package mobvista.dmp.datasource.id_mapping

import com.alibaba.fastjson.JSONObject
import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
import mobvista.dmp.datasource.id_mapping.Constant._
import mobvista.dmp.util.MD5Util
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Row, SparkSession}

import java.net.URI
import scala.collection.mutable.ArrayBuffer

/**
 * Spark job that reads one day of device-ID records for a platform, derives
 * (key, (id, json, idType)) vertex records from each row, groups them by key
 * and writes the resulting OneID records as gzip-compressed text.
 *
 * @package: mobvista.dmp.datasource.id_mapping
 * @author: wangjf
 * @date: 2021/12/7
 * @time: 2:39 PM
 * @email: jinfeng.wang@mobvista.com
 */
class IDMappingGraphx extends CommonSparkJob with Serializable {

  /** Declares the command-line options read by [[run]]; every option takes a value. */
  def commandOptions(): Options = {
    val options = new Options()
    Seq("country", "platform", "date", "output", "coalesce").foreach { name =>
      options.addOption(name, true, name)
    }
    options
  }

  /**
   * Parses the command line, creates the Spark session, runs the mapping and
   * always stops the session afterwards.
   *
   * @param args command-line arguments matching [[commandOptions]]
   * @return 0 when the job completes without throwing
   */
  override protected def run(args: Array[String]): Int = {
    val commandLine = new BasicParser().parse(commandOptions(), args)
    val country = commandLine.getOptionValue("country")
    val platform = commandLine.getOptionValue("platform")
    val date = commandLine.getOptionValue("date")
    val output = commandLine.getOptionValue("output")
    val coalesce = Integer.parseInt(commandLine.getOptionValue("coalesce"))

    val spark = MobvistaConstant.createSparkSession(s"IDMappingGraphx.$date.$country.$platform")
    try {
      // The same path is passed for both the result and the edge output.
      oldAndTodayIdMapping(country, platform, date, spark, output, output, coalesce)
    } finally {
      Option(spark).foreach(_.stop())
    }
    0
  }

  /**
   * Builds the OneID records for one day and writes them to `outPutPath`.
   *
   * Any existing data under `outPutPath` is deleted recursively before writing.
   *
   * @param country        country argument; "CN" selects the CN Android ID set and an
   *                       equality filter, any other value selects the default Android
   *                       ID set and an inequality filter on that same value
   * @param platform       "ios" or "android"
   * @param date           value substituted for `@date` in the daily SQL and stored in
   *                       every vertex record
   * @param spark          active Spark session
   * @param outPutPath     output directory for the gzip text files
   * @param edgeoutPutPath not used by this method
   * @param coalesce       number of partitions of the written output
   * @throws IllegalArgumentException if `platform` is neither "ios" nor "android"
   */
  def oldAndTodayIdMapping(country: String, platform: String, date: String, spark: SparkSession,
                           outPutPath: String, edgeoutPutPath: String, coalesce: Int): Unit = {
    // 1. Load today's data: choose SQL, row schema and ID sets for the platform.
    val (dailySQL, vertSchema, idSet, idMainSet): (String, StructType, Array[String], Set[String]) =
      platform match {
        case "ios" =>
          (Constant.ios_id_mapping_sql.replace("@date", date), iosVertSchema, iosIDSet, iosMainIDSet)
        case "android" =>
          val datedSQL = Constant.android_id_mapping_sql.replace("@date", date)
          country match {
            case "CN" =>
              (datedSQL.replace("@filter_country", s"AND country = '${country}'"),
                adrVertSchema, androidCNIDSet, androidMainIDSet)
            case _ =>
              // NOTE(review): this excludes only the country that was passed in, not 'CN';
              // confirm that is the intended filter for non-CN runs.
              (datedSQL.replace("@filter_country", s"AND country != '${country}'"),
                adrVertSchema, androidIDSet, androidMainIDSet)
          }
        case other =>
          // Fail fast instead of running an empty SQL string with a null schema.
          throw new IllegalArgumentException(s"Unsupported platform: $other")
      }

    val todayDF = spark.createDataFrame(
      spark.sql(dailySQL).rdd.map(row => processData(row, platform)),
      schema = vertSchema)

    val vertex = todayDF.rdd.flatMap(row => processVertex(date, row, idSet, idMainSet))

    // Collect the distinct (id, json, idType) tuples attached to every key.
    val maxGraph = vertex.combineByKey(
      (v: (String, String, String)) => Set(v),
      (c: Set[(String, String, String)], v: (String, String, String)) => c + v,
      (c1: Set[(String, String, String)], c2: Set[(String, String, String)]) => c1 ++ c2
    )

    // Keys with more than one tuple: OneID generated via updateOneID.
    // idMainSet is already the platform-specific main-ID set chosen above.
    val multiOneIDRDD = maxGraph
      .filter { case (_, tuples) => tuples.size > 1 }
      .flatMap(kv => updateOneID(kv, idMainSet))

    // Keys with exactly one tuple: the OneID contains just that tuple.
    val singleOneIDRDD = maxGraph
      .filter { case (_, tuples) => tuples.size == 1 }
      .map { case (srcID, tuples) =>
        val (id, attrs, idType) = tuples.head // safe: size == 1
        val oneID = new JSONObject()
        oneID.put(id, MobvistaConstant.String2JSONObject(attrs))
        (srcID, oneID.toJSONString, idType)
      }

    // Delete on the filesystem that actually owns the output path rather than
    // on a hard-coded bucket.
    val outputDir = new Path(outPutPath)
    val outputUri: URI = outputDir.toUri
    FileSystem.get(outputUri, spark.sparkContext.hadoopConfiguration).delete(outputDir, true)

    multiOneIDRDD.union(singleOneIDRDD)
      .repartition(coalesce)
      .saveAsTextFile(outPutPath, classOf[GzipCodec])
  }

  /** Returns `id` when it is non-blank, matches `didPtn` and does not match `allZero`; otherwise "". */
  private def sanitizeDeviceId(id: String): String =
    if (StringUtils.isNotBlank(id) && id.matches(didPtn) && !id.matches(allZero)) id else ""

  /** MD5 of `payload` when `guard` is non-blank, otherwise "". `payload` is only evaluated when needed. */
  private def md5WhenPresent(guard: String, payload: => String): String =
    if (StringUtils.isNotBlank(guard)) MD5Util.getMD5Str(payload) else ""

  /**
   * Converts one raw SQL row into a positional vertex row for the platform.
   *
   * @param row      row produced by the daily SQL
   * @param platform "ios" or "android"
   * @return the vertex row; `Row("")` for any other platform
   */
  def processData(row: Row, platform: String): Row = {
    def str(field: String): String = row.getAs[String](field)

    platform match {
      case "ios" =>
        val idfa = sanitizeDeviceId(str("idfa"))
        val idfv = sanitizeDeviceId(str("idfv"))
        val pkg_name = str("pkg_name")
        val sysid = str("sysid")
        val bkupid = str("bkupid")
        val xwho = str("xwho")
        val user_id = str("user_id")
        val ip = str("ip")
        val ua = str("ua")
        val brand = str("brand")
        val model = str("model")
        val os_version = str("os_version")
        val osv_upt = str("osv_upt")
        val upt = str("upt")
        val cnt = row.getAs[Long]("cnt")

        val idfv_bundle = md5WhenPresent(idfv, idfv + pkg_name)
        val bmosv_osv_upt = md5WhenPresent(osv_upt, brand + model + os_version + osv_upt)
        val bmosv_upt = md5WhenPresent(upt, brand + model + os_version + upt)
        val bmosv_ipua_bundle = md5WhenPresent(ip, brand + model + os_version + ip + ua + pkg_name)

        // IosVert(idfa, sysid, idfv_bundle, bmosv_osv_upt, bmosv_upt, bmosv_ipua_bundle, xwho, user_id, bkupid, cnt)
        Row(idfa, sysid, idfv_bundle, bmosv_osv_upt, bmosv_upt, bmosv_ipua_bundle, xwho, user_id, bkupid, cnt)

      case "android" =>
        val imei = str("imei")
        val android_id = str("android_id")
        val pkg_name = str("pkg_name")
        val oaid = str("oaid")
        val gaid = str("gaid")
        val sysid = str("sysid")
        val bkupid = str("bkupid")
        val xwho = str("xwho")
        val user_id = str("user_id")
        val country = str("country")
        val ip = str("ip")
        val ua = str("ua")
        val brand = str("brand")
        val model = str("model")
        val os_version = str("os_version")
        val upt = str("upt")
        val cnt = row.getAs[Long]("cnt")

        val android_pkg = md5WhenPresent(android_id, android_id + pkg_name)
        val bmosv_upt = md5WhenPresent(upt, brand + model + os_version + upt)
        val bmosv_ipua_pkg = md5WhenPresent(ip, brand + model + os_version + ip + ua + pkg_name)

        // AdrVert(imei, gaid, oaid, sysid, android_pkg, bmosv_upt, bmosv_ipua_pkg, xwho, user_id, bkupid, cnt)
        // NOTE(review): the first three values are placed in a different order for CN and
        // non-CN rows although both go through the same schema - confirm this is intended.
        if ("CN".equalsIgnoreCase(country)) {
          Row(imei, oaid, gaid, sysid, android_pkg, bmosv_upt, bmosv_ipua_pkg, xwho, user_id, bkupid, cnt)
        } else {
          Row(gaid, imei, oaid, sysid, android_pkg, bmosv_upt, bmosv_ipua_pkg, xwho, user_id, bkupid, cnt)
        }

      case _ =>
        Row("")
    }
  }

  /**
   * Emits the vertex records for one row.
   *
   * The first column in `ids` (excluding the last entry) with a non-blank value becomes the
   * anchor ID. A record keyed by the anchor is emitted for the anchor itself, then one record
   * for every later non-blank column in `ids`: keyed by that column's value when the anchor's
   * type is in `mainIDSet`, otherwise keyed by the anchor. Nothing is emitted when no anchor
   * is found.
   *
   * @param date      stored under "active_type" in the JSON attached to every record
   * @param row       vertex row; must contain a Long "cnt" column and the columns named in `ids`
   * @param ids       ID column names in priority order
   * @param mainIDSet ID types treated as main IDs
   * @return records of the form (key, (id, json, idType))
   */
  def processVertex(date: String, row: Row, ids: Array[String], mainIDSet: Set[String]): ArrayBuffer[(String, (String, String, String))] = {
    val array = new ArrayBuffer[(String, (String, String, String))]()
    // Event frequency.
    val cnt = row.getAs[Long]("cnt")

    // NOTE(review): the last entry of ids is never considered as an anchor, so a row whose
    // only non-blank ID is the last one yields no records - confirm this is intended.
    val anchorIdx = (0 to ids.length - 2).find(i => StringUtils.isNotBlank(row.getAs[String](ids(i))))

    anchorIdx.foreach { i =>
      val oneIDType = ids(i)
      val oneID = row.getAs[String](oneIDType)

      val jsonObject = new JSONObject()
      jsonObject.put("id_type", oneIDType)
      // date: the active date, used to compute the weight.
      jsonObject.put("active_type", date)
      jsonObject.put("cnt", cnt)
      val attrs = jsonObject.toJSONString // serialised once, shared by all records of this row

      array += ((oneID, (oneID, attrs, oneIDType)))

      val anchorIsMain = mainIDSet.contains(oneIDType)
      for (j <- i + 1 until ids.length) {
        val srcType = ids(j)
        val srcOrg = row.getAs[String](srcType)
        if (StringUtils.isNotBlank(srcOrg)) {
          if (anchorIsMain) {
            array += ((srcOrg, (oneID, attrs, srcType)))
          } else {
            array += ((oneID, (srcOrg, attrs, srcType)))
          }
        }
      }
    }
    array
  }

  /**
   * Builds the OneID for a key that has several tuples attached.
   *
   * The OneID JSON contains every tuple whose ID equals the key or whose JSON "id_type" is in
   * `mainIDSet`. One output record carrying that OneID JSON is produced for every tuple whose
   * "id_type" is not in `mainIDSet`.
   *
   * @param kv        key and its set of (id, json, idType) tuples
   * @param mainIDSet ID types treated as main IDs
   * @return records of the form (id, oneIdJson, idType)
   */
  def updateOneID(kv: (String, Set[(String, String, String)]), mainIDSet: Set[String]): ArrayBuffer[(String, String, String)] = {
    val (tmpOneId, iters) = kv

    // Parse each tuple's JSON exactly once; set iteration order is kept.
    val parsed = iters.toSeq.map { case (id, attrs, idType) =>
      val json = MobvistaConstant.String2JSONObject(attrs)
      (id, idType, json, mainIDSet.contains(json.getString("id_type")))
    }

    val oneID = new JSONObject()
    parsed.foreach { case (id, _, json, isMain) =>
      if (tmpOneId.equals(id) || isMain) {
        oneID.put(id, json)
      }
    }
    val oneIDJson = oneID.toJSONString

    val array = new ArrayBuffer[(String, String, String)]()
    parsed.foreach { case (id, idType, _, isMain) =>
      if (!isMain) {
        array += ((id, oneIDJson, idType))
      }
    }
    array
  }
}

object IDMappingGraphx {
  /** Entry point: runs the job with the given command-line arguments. */
  def main(args: Array[String]): Unit = {
    new IDMappingGraphx().run(args)
  }
}