Commit a66a6656 by WangJinfeng

init id_mapping

parent a53c5f7c
...@@ -45,7 +45,7 @@ object Constant { ...@@ -45,7 +45,7 @@ object Constant {
StructField("cnt", LongType) StructField("cnt", LongType)
)) ))
val iosIDSet = Set("idfa", "sysid", "xwho", "user_id", "idfv_bundle", "bmosv_osv_upt", "bmosv_upt", "bmosv_ipua_bundle", "bkupid") val iosIDSet = Array("idfa", "sysid", "xwho", "user_id", "idfv_bundle", "bmosv_osv_upt", "bmosv_upt", "bmosv_ipua_bundle", "bkupid")
val iosMainIDSet = Set("idfa", "sysid", "xwho", "user_id") val iosMainIDSet = Set("idfa", "sysid", "xwho", "user_id")
...@@ -118,9 +118,9 @@ object Constant { ...@@ -118,9 +118,9 @@ object Constant {
StructField("cnt", IntegerType) StructField("cnt", IntegerType)
)) ))
val androidCNIDSet = Set("imei", "oaid", "gaid", "sysid", "xwho", "user_id", "android_pkg", "bmosv_upt", "bmosv_ipua_pkg", "bkupid") val androidCNIDSet = Array("imei", "oaid", "gaid", "sysid", "xwho", "user_id", "android_pkg", "bmosv_upt", "bmosv_ipua_pkg", "bkupid")
val androidIDSet = Set("gaid", "imei", "oaid", "sysid", "xwho", "user_id", "android_pkg", "bmosv_upt", "bmosv_ipua_pkg", "bkupid") val androidIDSet = Array("gaid", "imei", "oaid", "sysid", "xwho", "user_id", "android_pkg", "bmosv_upt", "bmosv_ipua_pkg", "bkupid")
val androidMainIDSet = Set("imei", "gaid", "oaid", "sysid", "xwho", "user_id") val androidMainIDSet = Set("imei", "gaid", "oaid", "sysid", "xwho", "user_id")
......
...@@ -62,7 +62,7 @@ class IDMappingGraphx extends CommonSparkJob with Serializable { ...@@ -62,7 +62,7 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
var dailySQL = "" var dailySQL = ""
var schame: StructType = null var schame: StructType = null
var idSet: Set[String] = null var idSet: Array[String] = null
var idMainSet: Set[String] = null var idMainSet: Set[String] = null
// 1.今日数据加载 // 1.今日数据加载
platform match { platform match {
...@@ -100,13 +100,14 @@ class IDMappingGraphx extends CommonSparkJob with Serializable { ...@@ -100,13 +100,14 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
(c1: Set[(String, JSONObject)], c2: Set[(String, JSONObject)]) => c1 ++ c2 (c1: Set[(String, JSONObject)], c2: Set[(String, JSONObject)]) => c1 ++ c2
) )
/*
FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(outPutPath), true) FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(outPutPath), true)
maxGraph maxGraph
.repartition(coalesce) .repartition(coalesce)
.saveAsTextFile(outPutPath, classOf[GzipCodec]) .saveAsTextFile(outPutPath, classOf[GzipCodec])
*/
/*
val multiOneIDRDD = maxGraph.filter(kv => { val multiOneIDRDD = maxGraph.filter(kv => {
kv._2.size > 1 kv._2.size > 1
}).map(rs => { }).map(rs => {
...@@ -134,7 +135,6 @@ class IDMappingGraphx extends CommonSparkJob with Serializable { ...@@ -134,7 +135,6 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
multiOneIDRDD.union(singleOneIDRDD) multiOneIDRDD.union(singleOneIDRDD)
.repartition(coalesce) .repartition(coalesce)
.saveAsTextFile(outPutPath, classOf[GzipCodec]) .saveAsTextFile(outPutPath, classOf[GzipCodec])
*/
} }
...@@ -168,28 +168,27 @@ class IDMappingGraphx extends CommonSparkJob with Serializable { ...@@ -168,28 +168,27 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
val upt = row.getAs[String]("upt") val upt = row.getAs[String]("upt")
val cnt = row.getAs[Long]("cnt") val cnt = row.getAs[Long]("cnt")
val idfv_bundle = if (StringUtils.isNotBlank(idfv) && StringUtils.isNotBlank(pkg_name)) { val idfv_bundle = if (StringUtils.isNotBlank(idfv) && StringUtils.isNotBlank(pkg_name)) {
idfv + pkg_name MD5Util.getMD5Str(idfv + pkg_name)
} else { } else {
"" ""
} }
val bmosv_osv_upt = if (StringUtils.isNotBlank(osv_upt)) { val bmosv_osv_upt = if (StringUtils.isNotBlank(osv_upt)) {
brand + model + os_version + osv_upt MD5Util.getMD5Str(brand + model + os_version + osv_upt)
} else { } else {
"" ""
} }
val bmosv_upt = if (StringUtils.isNotBlank(upt)) { val bmosv_upt = if (StringUtils.isNotBlank(upt)) {
brand + model + os_version + upt MD5Util.getMD5Str(brand + model + os_version + upt)
} else { } else {
"" ""
} }
val bmosv_ipua_bundle = if (StringUtils.isNotBlank(ip) && StringUtils.isNotBlank(pkg_name)) { val bmosv_ipua_bundle = if (StringUtils.isNotBlank(ip) && StringUtils.isNotBlank(pkg_name)) {
brand + model + os_version + ip + ua + pkg_name MD5Util.getMD5Str(brand + model + os_version + ip + ua + pkg_name)
} else { } else {
"" ""
} }
// IosVert(idfa, sysid, idfv_bundle, bmosv_osv_upt, bmosv_upt, bmosv_ipua_bundle, xwho, user_id, bkupid, cnt) // IosVert(idfa, sysid, idfv_bundle, bmosv_osv_upt, bmosv_upt, bmosv_ipua_bundle, xwho, user_id, bkupid, cnt)
Row(idfa, sysid, MD5Util.getMD5Str(idfv_bundle), MD5Util.getMD5Str(bmosv_osv_upt), MD5Util.getMD5Str(bmosv_upt), MD5Util.getMD5Str(bmosv_ipua_bundle), Row(idfa, sysid, idfv_bundle, bmosv_osv_upt, bmosv_upt, bmosv_ipua_bundle, xwho, user_id, bkupid, cnt)
xwho, user_id, bkupid, cnt)
case "android" => case "android" =>
val imei = row.getAs[String]("imei") val imei = row.getAs[String]("imei")
val android_id = row.getAs[String]("android_id") val android_id = row.getAs[String]("android_id")
...@@ -209,25 +208,25 @@ class IDMappingGraphx extends CommonSparkJob with Serializable { ...@@ -209,25 +208,25 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
val upt = row.getAs[String]("upt") val upt = row.getAs[String]("upt")
val cnt = row.getAs[Long]("cnt") val cnt = row.getAs[Long]("cnt")
val android_pkg = if (StringUtils.isNotBlank(android_id) && StringUtils.isNotBlank(pkg_name)) { val android_pkg = if (StringUtils.isNotBlank(android_id) && StringUtils.isNotBlank(pkg_name)) {
android_id + pkg_name MD5Util.getMD5Str(android_id + pkg_name)
} else { } else {
"" ""
} }
val bmosv_upt = if (StringUtils.isNotBlank(upt)) { val bmosv_upt = if (StringUtils.isNotBlank(upt)) {
brand + model + os_version + upt MD5Util.getMD5Str(brand + model + os_version + upt)
} else { } else {
"" ""
} }
val bmosv_ipua_pkg = if (StringUtils.isNotBlank(ip) && StringUtils.isNotBlank(pkg_name)) { val bmosv_ipua_pkg = if (StringUtils.isNotBlank(ip) && StringUtils.isNotBlank(pkg_name)) {
brand + model + os_version + ip + ua + pkg_name MD5Util.getMD5Str(brand + model + os_version + ip + ua + pkg_name)
} else { } else {
"" ""
} }
// AdrVert(imei, gaid, oaid, sysid, android_pkg, bmosv_upt, bmosv_ipua_pkg, xwho, user_id, bkupid, cnt) // AdrVert(imei, gaid, oaid, sysid, android_pkg, bmosv_upt, bmosv_ipua_pkg, xwho, user_id, bkupid, cnt)
if ("CN".equalsIgnoreCase(country)) { if ("CN".equalsIgnoreCase(country)) {
Row(imei, oaid, gaid, sysid, MD5Util.getMD5Str(android_pkg), MD5Util.getMD5Str(bmosv_upt), MD5Util.getMD5Str(bmosv_ipua_pkg), xwho, user_id, bkupid, cnt) Row(imei, oaid, gaid, sysid, android_pkg, bmosv_upt, bmosv_ipua_pkg, xwho, user_id, bkupid, cnt)
} else { } else {
Row(gaid, imei, oaid, sysid, MD5Util.getMD5Str(android_pkg), MD5Util.getMD5Str(bmosv_upt), MD5Util.getMD5Str(bmosv_ipua_pkg), xwho, user_id, bkupid, cnt) Row(gaid, imei, oaid, sysid, android_pkg, bmosv_upt, bmosv_ipua_pkg, xwho, user_id, bkupid, cnt)
} }
case _ => case _ =>
Row("") Row("")
...@@ -252,11 +251,13 @@ class IDMappingGraphx extends CommonSparkJob with Serializable { ...@@ -252,11 +251,13 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
val oneID = row.getAs[String](String.valueOf(ids(i))) val oneID = row.getAs[String](String.valueOf(ids(i)))
array += ((oneID, (oneID, jsonObject))) array += ((oneID, (oneID, jsonObject)))
for (j <- i + 1 until ids.length) { for (j <- i + 1 until ids.length) {
val srcOrg = row.getAs[String](String.valueOf(ids(j))) if (StringUtils.isNotBlank(row.getAs[String](String.valueOf(ids(j))))) {
if (mainIDSet.contains(oneIDType)) { val srcOrg = row.getAs[String](String.valueOf(ids(j)))
array += ((srcOrg, (oneID, jsonObject))) if (mainIDSet.contains(oneIDType)) {
} else { array += ((srcOrg, (oneID, jsonObject)))
array += ((oneID, (srcOrg, jsonObject))) } else {
array += ((oneID, (srcOrg, jsonObject)))
}
} }
} }
flag = false flag = false
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment