diff --git a/src/main/scala/mobvista/dmp/datasource/id_mapping/IDMappingGraphx.scala b/src/main/scala/mobvista/dmp/datasource/id_mapping/IDMappingGraphx.scala
index e97ca35..151739d 100644
--- a/src/main/scala/mobvista/dmp/datasource/id_mapping/IDMappingGraphx.scala
+++ b/src/main/scala/mobvista/dmp/datasource/id_mapping/IDMappingGraphx.scala
@@ -95,9 +95,9 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
     }).flatMap(l => l)
 
     val maxGraph = vertex.combineByKey(
-      (v: (String, JSONObject, String)) => Set(v),
-      (c: Set[(String, JSONObject, String)], v: (String, JSONObject, String)) => c ++ Seq(v),
-      (c1: Set[(String, JSONObject, String)], c2: Set[(String, JSONObject, String)]) => c1 ++ c2
+      (v: (String, String, String)) => Set(v),
+      (c: Set[(String, String, String)], v: (String, String, String)) => c ++ Seq(v),
+      (c1: Set[(String, String, String)], c2: Set[(String, String, String)]) => c1 ++ c2
     )
 
     val multiOneIDRDD = maxGraph.filter(kv => {
@@ -119,9 +119,9 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
       var idType = ""
       kv._2.foreach(it => {
         idType = it._3
-        oneID.put(it._1, it._2)
+        oneID.put(it._1, MobvistaConstant.String2JSONObject(it._2))
       })
-      (srcID, oneID, idType)
+      (srcID, oneID.toJSONString, idType)
     })
 
     FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(outPutPath), true)
@@ -161,7 +161,7 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
       val osv_upt = row.getAs[String]("osv_upt")
       val upt = row.getAs[String]("upt")
       val cnt = row.getAs[Long]("cnt")
-      val idfv_bundle = if (StringUtils.isNotBlank(idfv) && StringUtils.isNotBlank(pkg_name)) {
+      val idfv_bundle = if (StringUtils.isNotBlank(idfv)) {
         MD5Util.getMD5Str(idfv + pkg_name)
       } else {
         ""
@@ -176,7 +176,7 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
       } else {
         ""
       }
-      val bmosv_ipua_bundle = if (StringUtils.isNotBlank(ip) && StringUtils.isNotBlank(pkg_name)) {
+      val bmosv_ipua_bundle = if (StringUtils.isNotBlank(ip)) {
         MD5Util.getMD5Str(brand + model + os_version + ip + ua + pkg_name)
       } else {
         ""
@@ -201,7 +201,7 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
       val os_version = row.getAs[String]("os_version")
       val upt = row.getAs[String]("upt")
       val cnt = row.getAs[Long]("cnt")
-      val android_pkg = if (StringUtils.isNotBlank(android_id) && StringUtils.isNotBlank(pkg_name)) {
+      val android_pkg = if (StringUtils.isNotBlank(android_id)) {
         MD5Util.getMD5Str(android_id + pkg_name)
       } else {
         ""
@@ -211,7 +211,7 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
       } else {
         ""
       }
-      val bmosv_ipua_pkg = if (StringUtils.isNotBlank(ip) && StringUtils.isNotBlank(pkg_name)) {
+      val bmosv_ipua_pkg = if (StringUtils.isNotBlank(ip)) {
         MD5Util.getMD5Str(brand + model + os_version + ip + ua + pkg_name)
       } else {
         ""
@@ -227,8 +227,8 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
     }
   }
 
-  def processVertex(date: String, row: Row, ids: Array[String], mainIDSet: Set[String]): ArrayBuffer[(String, (String, JSONObject, String))] = {
-    val array = new ArrayBuffer[(String, (String, JSONObject, String))]()
+  def processVertex(date: String, row: Row, ids: Array[String], mainIDSet: Set[String]): ArrayBuffer[(String, (String, String, String))] = {
+    val array = new ArrayBuffer[(String, (String, String, String))]()
     implicit val formats = org.json4s.DefaultFormats
     //    val json = JSON.parseObject(Serialization.write(row))
     //  事件频次
@@ -243,15 +243,15 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
         jsonObject.put("active_type", date)
         jsonObject.put("cnt", cnt)
         val oneID = row.getAs[String](String.valueOf(ids(i)))
-        array += ((oneID, (oneID, jsonObject, oneIDType)))
+        array += ((oneID, (oneID, jsonObject.toJSONString, oneIDType)))
         for (j <- i + 1 until ids.length) {
           if (StringUtils.isNotBlank(row.getAs[String](String.valueOf(ids(j))))) {
             val srcType = ids(j)
             val srcOrg = row.getAs[String](String.valueOf(srcType))
             if (mainIDSet.contains(oneIDType)) {
-              array += ((srcOrg, (oneID, jsonObject, srcType)))
+              array += ((srcOrg, (oneID, jsonObject.toJSONString, srcType)))
             } else {
-              array += ((oneID, (srcOrg, jsonObject, srcType)))
+              array += ((oneID, (srcOrg, jsonObject.toJSONString, srcType)))
             }
           }
         }
@@ -261,18 +261,19 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
     array
   }
 
-  def updateOneID(kv: (String, Iterable[(String, JSONObject, String)]), mainIDSet: Set[String]): ArrayBuffer[(String, JSONObject, String)] = {
-    val array = new ArrayBuffer[(String, JSONObject, String)]()
+  def updateOneID(kv: (String, Set[(String, String, String)]), mainIDSet: Set[String]): ArrayBuffer[(String, String, String)] = {
+    val array = new ArrayBuffer[(String, String, String)]()
     val iters = kv._2
     val oneID = new JSONObject()
     iters.foreach(ir => {
-      oneID.put(ir._1, ir._2)
+      oneID.put(ir._1, MobvistaConstant.String2JSONObject(ir._2))
     })
     iters.filter(tp => {
-      !mainIDSet.contains(tp._2.getString("id_type"))
+      !mainIDSet.contains(MobvistaConstant.String2JSONObject(tp._2).getString("id_type"))
     }).foreach(itr => {
       val k = itr._1
-      array += ((k, oneID, itr._3))
+      val t = itr._3
+      array += ((k, oneID.toJSONString, t))
     })
     array
   }