Commit cc55fe8b by WangJinfeng

init id_mapping

parent d4fe6410
...@@ -94,14 +94,16 @@ class IDMappingGraphx extends CommonSparkJob with Serializable { ...@@ -94,14 +94,16 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
processVertex(date, row, idSet, idMainSet) processVertex(date, row, idSet, idMainSet)
}).flatMap(l => l) }).flatMap(l => l)
// 非主ID生成OneID val maxGraph = vertex.combineByKey(
val multiOneIDRDD = vertex.filter(kv => {
!idMainSet.contains(kv._2._3)
}).combineByKey(
(v: (String, String, String)) => Set(v), (v: (String, String, String)) => Set(v),
(c: Set[(String, String, String)], v: (String, String, String)) => c ++ Seq(v), (c: Set[(String, String, String)], v: (String, String, String)) => c ++ Seq(v),
(c1: Set[(String, String, String)], c2: Set[(String, String, String)]) => c1 ++ c2 (c1: Set[(String, String, String)], c2: Set[(String, String, String)]) => c1 ++ c2
).map(rs => { )
// 非主ID生成OneID
val multiOneIDRDD = maxGraph.filter(kv => {
kv._2.size > 1
}).map(rs => {
platform match { platform match {
case "ios" => case "ios" =>
updateOneID(rs, Constant.iosMainIDSet) updateOneID(rs, Constant.iosMainIDSet)
...@@ -111,16 +113,10 @@ class IDMappingGraphx extends CommonSparkJob with Serializable { ...@@ -111,16 +113,10 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
}).flatMap(l => l) }).flatMap(l => l)
// 主ID生成OneID // 主ID生成OneID
val singleOneIDRDD = vertex.filter(kv => { val singleOneIDRDD = maxGraph.filter(kv => {
idMainSet.contains(kv._2._3) kv._2.size == 1
}).map(kv => { }).map(kv => {
val oneID = new JSONObject() val oneID = new JSONObject()
oneID.put(kv._2._1, MobvistaConstant.String2JSONObject(kv._2._2))
(kv._1, oneID.toJSONString, kv._2._3)
})
/*
.map(kv => {
val oneID = new JSONObject()
val srcID = kv._1 val srcID = kv._1
var idType = "" var idType = ""
kv._2.foreach(it => { kv._2.foreach(it => {
...@@ -129,7 +125,6 @@ class IDMappingGraphx extends CommonSparkJob with Serializable { ...@@ -129,7 +125,6 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
}) })
(srcID, oneID.toJSONString, idType) (srcID, oneID.toJSONString, idType)
}) })
*/
FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(outPutPath), true) FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(outPutPath), true)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment