From d4fe641027d9c9244ffa03918efca140a73f3403 Mon Sep 17 00:00:00 2001 From: WangJinfeng <wjf20110627@163.com> Date: Mon, 20 Dec 2021 21:13:02 +0800 Subject: [PATCH] init id_mapping --- src/main/scala/mobvista/dmp/datasource/id_mapping/IDMappingGraphx.scala | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/src/main/scala/mobvista/dmp/datasource/id_mapping/IDMappingGraphx.scala b/src/main/scala/mobvista/dmp/datasource/id_mapping/IDMappingGraphx.scala index 39469ae..a5d1540 100644 --- a/src/main/scala/mobvista/dmp/datasource/id_mapping/IDMappingGraphx.scala +++ b/src/main/scala/mobvista/dmp/datasource/id_mapping/IDMappingGraphx.scala @@ -94,15 +94,14 @@ class IDMappingGraphx extends CommonSparkJob with Serializable { processVertex(date, row, idSet, idMainSet) }).flatMap(l => l) - val maxGraph = vertex.filter(kv => { + // 非主ID生成OneID + val multiOneIDRDD = vertex.filter(kv => { !idMainSet.contains(kv._2._3) }).combineByKey( (v: (String, String, String)) => Set(v), (c: Set[(String, String, String)], v: (String, String, String)) => c ++ Seq(v), (c1: Set[(String, String, String)], c2: Set[(String, String, String)]) => c1 ++ c2 - ) - - val multiOneIDRDD = maxGraph.map(rs => { + ).map(rs => { platform match { case "ios" => updateOneID(rs, Constant.iosMainIDSet) @@ -111,12 +110,12 @@ class IDMappingGraphx extends CommonSparkJob with Serializable { } }).flatMap(l => l) + // 主ID生成OneID val singleOneIDRDD = vertex.filter(kv => { idMainSet.contains(kv._2._3) - // kv._2.size == 1 }).map(kv => { val oneID = new JSONObject() - oneID.put(kv._2._1, kv._2._2) + oneID.put(kv._2._1, MobvistaConstant.String2JSONObject(kv._2._2)) (kv._1, oneID.toJSONString, kv._2._3) }) /* -- libgit2 0.27.1