Commit 719ef9a5 by WangJinfeng

init id_mapping

parent e90fd86f
@@ -8,8 +8,9 @@ import mobvista.dmp.util.MD5Util
 import org.apache.commons.cli.{BasicParser, Options}
 import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.io.compress.GzipCodec
 import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.{Row, SaveMode, SparkSession}
+import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.storage.StorageLevel
 import java.net.URI
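The import changes track the commit's switch in output format: GzipCodec comes in because the job now writes gzip-compressed text files via saveAsTextFile, and SaveMode goes out because the DataFrameWriter ORC path that used SaveMode.Overwrite is commented out further down. A minimal sketch of the two write paths, with placeholder paths and invented data (none of it from the commit):

import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.sql.{SaveMode, SparkSession}

object WritePathSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("write-path-sketch").getOrCreate()
    import spark.implicits._
    val rdd = spark.sparkContext.parallelize(Seq(("device-1", "gaid", "{}")))

    // New path in this commit: one Tuple3.toString per record, gzip text files.
    rdd.repartition(1).saveAsTextFile("/tmp/oneid_text", classOf[GzipCodec])

    // Old path (now commented out in the diff): zlib-compressed ORC via DataFrame.
    rdd.toDF("device_id", "device_type", "one_id")
      .repartition(1)
      .write.mode(SaveMode.Overwrite)
      .option("orc.compress", "zlib")
      .orc("/tmp/oneid_orc")

    spark.stop()
  }
}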
@@ -157,23 +158,15 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
 oneIDJSON.put(oneID, json.getJSONObject(key))
 })
 })
-Result(srcId, srcType, oneIDJSON.toJSONString)
+(srcId, srcType, oneIDJSON.toJSONString)
 }).persist(StorageLevel.MEMORY_AND_DISK_SER)
 FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(outPutPath), true)
 import spark.implicits._
-midMergeOneIDRDD.toDF
-.repartition(coalesce)
-.write.mode(SaveMode.Overwrite)
-.option("orc.compress", "zlib")
-.orc(outPutPath)
+val end_time = sdf1.parse(date).getTime
 val resultOneID = midMergeOneIDRDD.mapPartitions(rs => {
 rs.map(r => {
-val device_id = r.device_id
-val device_type = r.device_type
-val one_id = MobvistaConstant.String2JSONObject(r.one_id)
+val device_id = r._1
+val device_type = r._2
+val one_id = MobvistaConstant.String2JSONObject(r._3)
 val keys = one_id.keySet().asScala
 var oneIDScore: OneIDScore = OneIDScore("", "", 0)
 keys.foreach(key => {
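Two related changes run through this hunk: the intermediate ORC write of midMergeOneIDRDD is dropped here (it reappears at the end of the method as a gzip text write), and records move from the Result case class to a plain Tuple3, so fields are read positionally (r._1) instead of by name (r.device_id). One side effect of the tuple representation, sketched below with invented data, is that saveAsTextFile will emit Tuple3.toString lines of the form (id,type,json):

// Positional access replacing the named Result fields; values are invented.
val r: (String, String, String) = ("8a3f0c", "gaid", """{"gaid":{"cnt":3,"active_date":"2021-07-01"}}""")
val device_id = r._1   // was r.device_id
val device_type = r._2 // was r.device_type
val one_id = r._3      // was r.one_id
// This is the line format saveAsTextFile writes for each record:
println(r) // (8a3f0c,gaid,{"gaid":{"cnt":3,"active_date":"2021-07-01"}})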
@@ -182,7 +175,7 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
 val id_type_score = scoreMap(id_type)
 val active_date = json.getString("active_date")
 val cnt = json.getIntValue("cnt")
-val days = (sdf1.parse(date).getTime - sdf1.parse(active_date).getTime) / 1000 / 3600 / 24 + 1
+val days = (end_time - sdf1.parse(active_date).getTime) / 1000 / 3600 / 24 + 1
 val score = id_type_score * 30 / days + 0.1 * cnt
 if (idSet.indexOf(id_type) < idSet.indexOf(oneIDScore.one_type) || idSet.indexOf(oneIDScore.one_type) == -1
 || (idSet.indexOf(id_type) == idSet.indexOf(oneIDScore.one_type) && score >= oneIDScore.one_score)) {
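The only change in this hunk is hoisting sdf1.parse(date).getTime out of the per-record loop into end_time (added in the previous hunk), so the run date is parsed once per job instead of once per record. The selection rule it feeds: prefer the ID type that ranks earliest in idSet, and within the same type keep the highest score, where the score decays with days since active_date and adds a small frequency bonus (0.1 * cnt). A self-contained sketch with hypothetical idSet and scoreMap values (the real ones are defined elsewhere in IDMappingGraphx):

import java.text.SimpleDateFormat

case class OneIDScore(one_id: String, one_type: String, one_score: Double)

object ScoreSketch {
  // Hypothetical priority order and per-type weights, for illustration only.
  val idSet = Seq("idfa", "gaid", "imei", "oaid", "android_id")
  val scoreMap = Map("idfa" -> 10.0, "gaid" -> 10.0, "imei" -> 9.0, "oaid" -> 8.0, "android_id" -> 7.0)

  def pickOneID(candidates: Seq[(String, String, String, Int)], date: String): OneIDScore = {
    val sdf = new SimpleDateFormat("yyyy-MM-dd")
    val end_time = sdf.parse(date).getTime // parsed once per job, as in the diff
    var best = OneIDScore("", "", 0)
    candidates.foreach { case (id, id_type, active_date, cnt) =>
      val days = (end_time - sdf.parse(active_date).getTime) / 1000 / 3600 / 24 + 1
      val score = scoreMap(id_type) * 30 / days + 0.1 * cnt // recency decay + frequency bonus
      if (idSet.indexOf(id_type) < idSet.indexOf(best.one_type) || idSet.indexOf(best.one_type) == -1
        || (idSet.indexOf(id_type) == idSet.indexOf(best.one_type) && score >= best.one_score)) {
        best = OneIDScore(id, id_type, score)
      }
    }
    best
  }

  def main(args: Array[String]): Unit = {
    // gaid seen 3 days back, 4 times: 10.0 * 30 / 3 + 0.4 = 100.4; it still beats the
    // higher-scoring imei because gaid ranks earlier in idSet.
    println(pickOneID(Seq(("g-1", "gaid", "2021-07-13", 4), ("i-1", "imei", "2021-07-15", 9)), "2021-07-15"))
  }
}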
@@ -193,18 +186,35 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
 json.put("one_id", oneIDScore.one_id)
 json.put("one_type", oneIDScore.one_type)
 json.put("one_score", oneIDScore.one_score)
-Result(device_id = device_id, device_type = device_type, one_id = json.toJSONString)
+(device_id, device_type, json.toJSONString)
 })
 })
 FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(resultOutPutPath), true)
+resultOneID.repartition(coalesce)
+.saveAsTextFile(resultOutPutPath, classOf[GzipCodec])
+/*
 resultOneID
 .toDF
 .repartition(coalesce)
 .write.mode(SaveMode.Overwrite)
 .option("orc.compress", "zlib")
 .orc(resultOutPutPath)
 midMergeOneIDRDD.unpersist(true)
+*/
+FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(outPutPath), true)
+/*
+midMergeOneIDRDD.toDF
+.repartition(coalesce)
+.write.mode(SaveMode.Overwrite)
+.option("orc.compress", "zlib")
+.orc(outPutPath)
+*/
+midMergeOneIDRDD.repartition(coalesce)
+.saveAsTextFile(outPutPath, classOf[GzipCodec])
 }

 def processData(row: Row, platform: String): Row = {
......
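One detail worth flagging: the /* ... */ block above also swallows midMergeOneIDRDD.unpersist(true), so the MEMORY_AND_DISK_SER cache created earlier in the method is never explicitly released; whether that is intentional is not clear from the commit. For reference, a minimal persist/unpersist lifecycle around multiple actions looks like this (local data, placeholder output path):

import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

object UnpersistSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("unpersist-sketch").getOrCreate()
    val rdd = spark.sparkContext.parallelize(1 to 10).persist(StorageLevel.MEMORY_AND_DISK_SER)
    rdd.count()                                     // first action materializes the cache
    rdd.saveAsTextFile("/tmp/unpersist_sketch_out") // later actions reuse it
    rdd.unpersist(true)                             // blocking release once no further action needs the RDD
    spark.stop()
  }
}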