Commit a53c5f7c by WangJinfeng

init id_mapping

parent 1dac1a5f
......@@ -3,6 +3,7 @@ package mobvista.dmp.datasource.id_mapping
import com.alibaba.fastjson.JSONObject
import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
import mobvista.dmp.datasource.id_mapping.Constant._
import mobvista.dmp.util.MD5Util
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.fs.{FileSystem, Path}
......@@ -85,33 +86,18 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
case _ =>
""
}
val todayDF = spark.createDataFrame(spark.sql(dailySQL).rdd.map(row => {
val todayDF = spark.createDataFrame(spark.sql(dailySQL).sample(0.001).rdd.map(row => {
processData(row, platform)
}), schema = schame)
val vertex = todayDF.rdd.map(row => {
val res = processVertex(date, row, idSet.toArray, idMainSet)
/*
val res = platform match {
case "ios" =>
processVertex(date, rows, Constant.iosIDSet.toArray, Constant.iosMainIDSet)
case _ => {
country.toUpperCase match {
case "CN" =>
processVertex(date, rows, Constant.androidCNIDSet.toArray, Constant.androidMainIDSet)
case _ =>
processVertex(date, rows, Constant.androidIDSet.toArray, Constant.androidMainIDSet)
}
}
}
*/
res
processVertex(date, row, idSet.toArray, idMainSet)
}).flatMap(l => l)
val maxGraph = vertex.combineByKey(
(v: (String, JSONObject)) => Iterable(v),
(c: Iterable[(String, JSONObject)], v: (String, JSONObject)) => c ++ Seq(v),
(c1: Iterable[(String, JSONObject)], c2: Iterable[(String, JSONObject)]) => c1 ++ c2
(v: (String, JSONObject)) => Set(v),
(c: Set[(String, JSONObject)], v: (String, JSONObject)) => c ++ Seq(v),
(c1: Set[(String, JSONObject)], c2: Set[(String, JSONObject)]) => c1 ++ c2
)
FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(outPutPath), true)
......@@ -202,7 +188,8 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
""
}
// IosVert(idfa, sysid, idfv_bundle, bmosv_osv_upt, bmosv_upt, bmosv_ipua_bundle, xwho, user_id, bkupid, cnt)
Row(idfa, sysid, idfv_bundle, bmosv_osv_upt, bmosv_upt, bmosv_ipua_bundle, xwho, user_id, bkupid, cnt)
Row(idfa, sysid, MD5Util.getMD5Str(idfv_bundle), MD5Util.getMD5Str(bmosv_osv_upt), MD5Util.getMD5Str(bmosv_upt), MD5Util.getMD5Str(bmosv_ipua_bundle),
xwho, user_id, bkupid, cnt)
case "android" =>
val imei = row.getAs[String]("imei")
val android_id = row.getAs[String]("android_id")
......@@ -238,9 +225,9 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
}
// AdrVert(imei, gaid, oaid, sysid, android_pkg, bmosv_upt, bmosv_ipua_pkg, xwho, user_id, bkupid, cnt)
if ("CN".equalsIgnoreCase(country)) {
Row(imei, oaid, gaid, sysid, android_pkg, bmosv_upt, bmosv_ipua_pkg, xwho, user_id, bkupid, cnt)
Row(imei, oaid, gaid, sysid, MD5Util.getMD5Str(android_pkg), MD5Util.getMD5Str(bmosv_upt), MD5Util.getMD5Str(bmosv_ipua_pkg), xwho, user_id, bkupid, cnt)
} else {
Row(gaid, imei, oaid, sysid, android_pkg, bmosv_upt, bmosv_ipua_pkg, xwho, user_id, bkupid, cnt)
Row(gaid, imei, oaid, sysid, MD5Util.getMD5Str(android_pkg), MD5Util.getMD5Str(bmosv_upt), MD5Util.getMD5Str(bmosv_ipua_pkg), xwho, user_id, bkupid, cnt)
}
case _ =>
Row("")
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment