diff --git a/src/main/scala/mobvista/dmp/datasource/id_mapping/IDMappingGraphx.scala b/src/main/scala/mobvista/dmp/datasource/id_mapping/IDMappingGraphx.scala
index aa6307d..2683618 100644
--- a/src/main/scala/mobvista/dmp/datasource/id_mapping/IDMappingGraphx.scala
+++ b/src/main/scala/mobvista/dmp/datasource/id_mapping/IDMappingGraphx.scala
@@ -95,19 +95,11 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
     }).flatMap(l => l)
 
     val maxGraph = vertex.combineByKey(
-      (v: (String, JSONObject)) => Set(v),
-      (c: Set[(String, JSONObject)], v: (String, JSONObject)) => c ++ Seq(v),
-      (c1: Set[(String, JSONObject)], c2: Set[(String, JSONObject)]) => c1 ++ c2
+      (v: (String, JSONObject, String)) => Set(v),
+      (c: Set[(String, JSONObject, String)], v: (String, JSONObject, String)) => c ++ Seq(v),
+      (c1: Set[(String, JSONObject, String)], c2: Set[(String, JSONObject, String)]) => c1 ++ c2
     )
-    /*
-    FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(outPutPath), true)
-
-    maxGraph
-      .repartition(coalesce)
-      .saveAsTextFile(outPutPath, classOf[GzipCodec])
-    */
-
     val multiOneIDRDD = maxGraph.filter(kv => {
       kv._2.size > 1
     }).map(rs => {
@@ -124,10 +116,12 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
     }).map(kv => {
       val oneID = new JSONObject()
       val srcID = kv._1
+      var idType = ""
       kv._2.foreach(it => {
+        idType = it._3
        oneID.put(it._1, it._2)
       })
-      (srcID, oneID)
+      (srcID, oneID, idType)
     })
 
     FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(outPutPath), true)
@@ -233,8 +227,8 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
     }
   }
 
-  def processVertex(date: String, row: Row, ids: Array[String], mainIDSet: Set[String]): ArrayBuffer[(String, (String, JSONObject))] = {
-    val array = new ArrayBuffer[(String, (String, JSONObject))]()
+  def processVertex(date: String, row: Row, ids: Array[String], mainIDSet: Set[String]): ArrayBuffer[(String, (String, JSONObject, String))] = {
+    val array = new ArrayBuffer[(String, (String, JSONObject, String))]()
     implicit val formats = org.json4s.DefaultFormats
     // val json = JSON.parseObject(Serialization.write(row))
     // event frequency
@@ -249,14 +243,15 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
         jsonObject.put("active_type", date)
         jsonObject.put("cnt", cnt)
         val oneID = row.getAs[String](String.valueOf(ids(i)))
-        array += ((oneID, (oneID, jsonObject)))
+        array += ((oneID, (oneID, jsonObject, oneIDType)))
         for (j <- i + 1 until ids.length) {
           if (StringUtils.isNotBlank(row.getAs[String](String.valueOf(ids(j))))) {
-            val srcOrg = row.getAs[String](String.valueOf(ids(j)))
+            val srcType = ids(j)
+            val srcOrg = row.getAs[String](String.valueOf(srcType))
             if (mainIDSet.contains(oneIDType)) {
-              array += ((srcOrg, (oneID, jsonObject)))
+              array += ((srcOrg, (oneID, jsonObject, srcType)))
             } else {
-              array += ((oneID, (srcOrg, jsonObject)))
+              array += ((oneID, (srcOrg, jsonObject, srcType)))
             }
           }
         }
@@ -266,19 +261,18 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
     array
   }
 
-  def updateOneID(kv: (String, Iterable[(String, JSONObject)]), mainIDSet: Set[String]): ArrayBuffer[(String, JSONObject)] = {
-    val array = new ArrayBuffer[(String, JSONObject)]()
+  def updateOneID(kv: (String, Iterable[(String, JSONObject, String)]), mainIDSet: Set[String]): ArrayBuffer[(String, JSONObject, String)] = {
+    val array = new ArrayBuffer[(String, JSONObject, String)]()
     val iters = kv._2
-    // val oneID = new ArrayBuffer[(String, String)]()
     val oneID = new JSONObject()
     iters.foreach(kv => {
       oneID.put(kv._1, kv._2)
     })
     iters.filter(tp => {
-      mainIDSet.contains(tp._2.getString("id_type"))
+      !mainIDSet.contains(tp._2.getString("id_type"))
     }).foreach(itr => {
       val k = itr._1
-      array += ((k, oneID))
+      array += ((k, oneID, itr._3))
     })
     array
   }
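Review note: the first hunk widens the vertex payload from `(id, json)` pairs to `(id, json, idType)` triples before the `combineByKey` aggregation. Below is a minimal sketch, not taken from the PR, that shows the three combiner functions type-check and group the triples per key exactly as in the patched code. The sample keys (`"gaid-1"`, `"imei-1"`), the `meta` helper, and the object name are hypothetical; it assumes Spark and fastjson on the classpath, run in local mode.

```scala
import com.alibaba.fastjson.JSONObject
import org.apache.spark.sql.SparkSession

object CombineByKeySketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate()
    val sc = spark.sparkContext

    // Hypothetical helper: build the per-edge metadata JSON carrying the id type.
    def meta(idType: String): JSONObject = {
      val j = new JSONObject()
      j.put("id_type", idType)
      j
    }

    // Vertices keyed by source ID; the payload now carries the ID type as a third element.
    val vertex = sc.parallelize(Seq(
      ("gaid-1", ("imei-1", meta("imei"), "imei")),
      ("gaid-1", ("oaid-1", meta("oaid"), "oaid")),
      ("gaid-2", ("imei-2", meta("imei"), "imei"))
    ))

    // Same combiner shape as the patched IDMappingGraphx: collect all triples per key.
    val maxGraph = vertex.combineByKey(
      (v: (String, JSONObject, String)) => Set(v),
      (c: Set[(String, JSONObject, String)], v: (String, JSONObject, String)) => c ++ Seq(v),
      (c1: Set[(String, JSONObject, String)], c2: Set[(String, JSONObject, String)]) => c1 ++ c2
    )

    // gaid-1 aggregates two triples (types imei and oaid); gaid-2 aggregates one.
    maxGraph.collect().foreach { case (k, set) => println(s"$k -> ${set.map(_._3)}") }
    spark.stop()
  }
}
```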
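Review note: the last hunk flips the filter in `updateOneID` from `mainIDSet.contains(...)` to `!mainIDSet.contains(...)`, so the merged JSON is now emitted keyed by the non-main IDs, each row tagged with its own ID type as the third element. A self-contained re-statement of that behavior with hypothetical data (assumes fastjson only; the sample group and `meta` helper are invented for illustration):

```scala
import com.alibaba.fastjson.JSONObject
import scala.collection.mutable.ArrayBuffer

object UpdateOneIDSketch {
  // Mirrors the patched updateOneID: merge all (id -> json) pairs into one
  // JSON object, then emit it once per NON-main ID, tagged with that ID's type.
  def updateOneID(kv: (String, Iterable[(String, JSONObject, String)]),
                  mainIDSet: Set[String]): ArrayBuffer[(String, JSONObject, String)] = {
    val array = new ArrayBuffer[(String, JSONObject, String)]()
    val oneID = new JSONObject()
    kv._2.foreach(t => oneID.put(t._1, t._2))
    kv._2.filter(t => !mainIDSet.contains(t._2.getString("id_type")))
      .foreach(t => array += ((t._1, oneID, t._3)))
    array
  }

  def main(args: Array[String]): Unit = {
    def meta(idType: String): JSONObject = {
      val j = new JSONObject()
      j.put("id_type", idType)
      j
    }
    // Hypothetical group: one main ID (imei) and one non-main ID (ruid).
    val group = ("gaid-1", Seq(
      ("imei-1", meta("imei"), "imei"), // main ID: no longer keys an output row
      ("ruid-1", meta("ruid"), "ruid")  // non-main ID: keys the emitted row
    ))
    // Prints one row keyed by ruid-1, carrying the merged JSON and id type "ruid".
    updateOneID(group, Set("imei")).foreach(println)
  }
}
```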