Commit ffda3f49 by WangJinfeng

init id_mapping

parent d0d9e1ad
......@@ -94,7 +94,9 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
processVertex(date, row, idSet, idMainSet)
}).flatMap(l => l)
val maxGraph = vertex.combineByKey(
val maxGraph = vertex.filter(kv => {
!idMainSet.contains(kv._2._3)
}).combineByKey(
(v: (String, String, String)) => Set(v),
(c: Set[(String, String, String)], v: (String, String, String)) => c ++ Seq(v),
(c1: Set[(String, String, String)], c2: Set[(String, String, String)]) => c1 ++ c2
......@@ -111,10 +113,17 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
}
}).flatMap(l => l)
val singleOneIDRDD = maxGraph.filter(kv => {
kv._2.size == 1
val singleOneIDRDD = vertex.filter(kv => {
idMainSet.contains(kv._2._3)
// kv._2.size == 1
}).map(kv => {
val oneID = new JSONObject()
oneID.put(kv._2._1, kv._2._2)
(kv._1, oneID.toJSONString, kv._2._3)
})
/*
.map(kv => {
val oneID = new JSONObject()
val srcID = kv._1
var idType = ""
kv._2.foreach(it => {
......@@ -123,6 +132,7 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
})
(srcID, oneID.toJSONString, idType)
})
*/
FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(outPutPath), true)
......@@ -263,9 +273,12 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
def updateOneID(kv: (String, Set[(String, String, String)]), mainIDSet: Set[String]): ArrayBuffer[(String, String, String)] = {
val array = new ArrayBuffer[(String, String, String)]()
val tmpOneId = kv._1
val iters = kv._2
val oneID = new JSONObject()
iters.foreach(ir => {
iters.filter(ir => {
tmpOneId.equals(ir._1) || mainIDSet.contains(MobvistaConstant.String2JSONObject(ir._2).getString("id_type"))
}).foreach(ir => {
oneID.put(ir._1, MobvistaConstant.String2JSONObject(ir._2))
})
iters.filter(tp => {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment