Commit ff5d6e34 by WangJinfeng

init id_mapping

parent a66a6656
...@@ -95,19 +95,11 @@ class IDMappingGraphx extends CommonSparkJob with Serializable { ...@@ -95,19 +95,11 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
}).flatMap(l => l) }).flatMap(l => l)
val maxGraph = vertex.combineByKey( val maxGraph = vertex.combineByKey(
(v: (String, JSONObject)) => Set(v), (v: (String, JSONObject, String)) => Set(v),
(c: Set[(String, JSONObject)], v: (String, JSONObject)) => c ++ Seq(v), (c: Set[(String, JSONObject, String)], v: (String, JSONObject, String)) => c ++ Seq(v),
(c1: Set[(String, JSONObject)], c2: Set[(String, JSONObject)]) => c1 ++ c2 (c1: Set[(String, JSONObject, String)], c2: Set[(String, JSONObject, String)]) => c1 ++ c2
) )
/*
FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(outPutPath), true)
maxGraph
.repartition(coalesce)
.saveAsTextFile(outPutPath, classOf[GzipCodec])
*/
val multiOneIDRDD = maxGraph.filter(kv => { val multiOneIDRDD = maxGraph.filter(kv => {
kv._2.size > 1 kv._2.size > 1
}).map(rs => { }).map(rs => {
...@@ -124,10 +116,12 @@ class IDMappingGraphx extends CommonSparkJob with Serializable { ...@@ -124,10 +116,12 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
}).map(kv => { }).map(kv => {
val oneID = new JSONObject() val oneID = new JSONObject()
val srcID = kv._1 val srcID = kv._1
var idType = ""
kv._2.foreach(it => { kv._2.foreach(it => {
idType = it._3
oneID.put(it._1, it._2) oneID.put(it._1, it._2)
}) })
(srcID, oneID) (srcID, oneID, idType)
}) })
FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(outPutPath), true) FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(outPutPath), true)
...@@ -233,8 +227,8 @@ class IDMappingGraphx extends CommonSparkJob with Serializable { ...@@ -233,8 +227,8 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
} }
} }
def processVertex(date: String, row: Row, ids: Array[String], mainIDSet: Set[String]): ArrayBuffer[(String, (String, JSONObject))] = { def processVertex(date: String, row: Row, ids: Array[String], mainIDSet: Set[String]): ArrayBuffer[(String, (String, JSONObject, String))] = {
val array = new ArrayBuffer[(String, (String, JSONObject))]() val array = new ArrayBuffer[(String, (String, JSONObject, String))]()
implicit val formats = org.json4s.DefaultFormats implicit val formats = org.json4s.DefaultFormats
// val json = JSON.parseObject(Serialization.write(row)) // val json = JSON.parseObject(Serialization.write(row))
// 事件频次 // 事件频次
...@@ -249,14 +243,15 @@ class IDMappingGraphx extends CommonSparkJob with Serializable { ...@@ -249,14 +243,15 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
jsonObject.put("active_type", date) jsonObject.put("active_type", date)
jsonObject.put("cnt", cnt) jsonObject.put("cnt", cnt)
val oneID = row.getAs[String](String.valueOf(ids(i))) val oneID = row.getAs[String](String.valueOf(ids(i)))
array += ((oneID, (oneID, jsonObject))) array += ((oneID, (oneID, jsonObject, oneIDType)))
for (j <- i + 1 until ids.length) { for (j <- i + 1 until ids.length) {
if (StringUtils.isNotBlank(row.getAs[String](String.valueOf(ids(j))))) { if (StringUtils.isNotBlank(row.getAs[String](String.valueOf(ids(j))))) {
val srcOrg = row.getAs[String](String.valueOf(ids(j))) val srcType = ids(j)
val srcOrg = row.getAs[String](String.valueOf(srcType))
if (mainIDSet.contains(oneIDType)) { if (mainIDSet.contains(oneIDType)) {
array += ((srcOrg, (oneID, jsonObject))) array += ((srcOrg, (oneID, jsonObject, srcType)))
} else { } else {
array += ((oneID, (srcOrg, jsonObject))) array += ((oneID, (srcOrg, jsonObject, srcType)))
} }
} }
} }
...@@ -266,19 +261,18 @@ class IDMappingGraphx extends CommonSparkJob with Serializable { ...@@ -266,19 +261,18 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
array array
} }
def updateOneID(kv: (String, Iterable[(String, JSONObject)]), mainIDSet: Set[String]): ArrayBuffer[(String, JSONObject)] = { def updateOneID(kv: (String, Iterable[(String, JSONObject, String)]), mainIDSet: Set[String]): ArrayBuffer[(String, JSONObject, String)] = {
val array = new ArrayBuffer[(String, JSONObject)]() val array = new ArrayBuffer[(String, JSONObject, String)]()
val iters = kv._2 val iters = kv._2
// val oneID = new ArrayBuffer[(String, String)]()
val oneID = new JSONObject() val oneID = new JSONObject()
iters.foreach(kv => { iters.foreach(kv => {
oneID.put(kv._1, kv._2) oneID.put(kv._1, kv._2)
}) })
iters.filter(tp => { iters.filter(tp => {
mainIDSet.contains(tp._2.getString("id_type")) !mainIDSet.contains(tp._2.getString("id_type"))
}).foreach(itr => { }).foreach(itr => {
val k = itr._1 val k = itr._1
array += ((k, oneID)) array += ((k, oneID, itr._3))
}) })
array array
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment