package mobvista.dmp.datasource.adn_request_sdk

import mobvista.dmp.common.CommonSparkJob
import mobvista.dmp.common.MobvistaConstant
import mobvista.dmp.datasource.adn.mapreduce.GetDevIdUtil
import mobvista.dmp.util.{DateUtil, MRUtils}
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SaveMode, SparkSession}

import java.net.URI
import java.util.Locale
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

/**
 * Daily ETL over ADN SDK request logs.
 *
 * Reads one day of `dwh.etl_adn_org_request_daily_hours`, expands every request into one row per
 * recognised device identifier, aggregates by (device_id, device_type, platform, app_id) and
 * writes the result to `output` as zlib-compressed ORC.
 *
 * @package: mobvista.dmp.datasource.adn_sdk
 * @author: wangjf
 * @date: 2020/4/2
 * @time: 8:34 PM
 * @email: jinfeng.wang@mobvista.com
 * @phone: 152-1062-7698
 */
class AdnRequestSdkEtlDaily extends CommonSparkJob with java.io.Serializable {

  /** Command-line options accepted by [[run]]; every option takes a value. */
  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("date", true, "date")
    options.addOption("output", true, "output")
    options.addOption("coalesce", true, "coalesce")
    options.addOption("appIdMapping", true, "appIdMapping")
    options.addOption("manualAppIdMapping", true, "manualAppIdMapping")
    options
  }

  /**
   * Runs the job.
   *
   * Options:
   *  - `date`: day to process, `yyyyMMdd`; selects the yt/mt/dt partition.
   *  - `output`: ORC output path; it is deleted before writing.
   *  - `coalesce`: the input is coalesced to twice this many partitions.
   *  - `appIdMapping` / `manualAppIdMapping`: comma-separated text files; the first field is the
   *    app_id and the second the package_name written to the output.
   *
   * @return 0 on success; failures surface as exceptions.
   */
  override protected def run(args: Array[String]): Int = {
    val commandLine = new BasicParser().parse(commandOptions(), args)

    val date = commandLine.getOptionValue("date")
    val output = commandLine.getOptionValue("output")
    val coalesce = Integer.parseInt(commandLine.getOptionValue("coalesce"))
    val appIdMapping = commandLine.getOptionValue("appIdMapping")
    val manualAppIdMapping = commandLine.getOptionValue("manualAppIdMapping")

    // `date` is sliced and interpolated into SQL below, so reject anything that is not yyyyMMdd.
    require(date != null && date.matches("\\d{8}"), s"date must be in yyyyMMdd format, got: $date")

    val spark = SparkSession
      .builder()
      .appName(s"AdnRequestSdkEtlDaily.$date")
      .config("spark.rdd.compress", "true")
      .config("spark.shuffle.compress", "true")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.io.compression.codec", "lz4")
      .config("spark.io.compression.lz4.blockSize", "64k")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()
    val sc = spark.sparkContext

    try {
      // Delete through the filesystem that owns the output path rather than a hard-coded bucket,
      // so an output outside s3://mob-emr-test does not fail with a wrong-filesystem error.
      val outputPath = new Path(output)
      outputPath.getFileSystem(sc.hadoopConfiguration).delete(outputPath, true)

      // app_id -> package_name, built from both mapping files.
      val bmap = sc.broadcast(
        sc.textFile(appIdMapping)
          .union(sc.textFile(manualAppIdMapping))
          .map(_.split(","))
          // Lines with no second field ("id", "id,", blank) would otherwise throw on r(1).
          .filter(r => r.length > 1 && StringUtils.isNotBlank(r(1)) && !"null".equals(r(1)))
          .map(r => (r(0), r(1)))
          .collectAsMap())

      val year = date.substring(0, 4)
      val month = date.substring(4, 6)
      val day = date.substring(6, 8)

      val sql =
        s"""
           |SELECT gaid, idfa, imei, androidid, extsysid, platform, appid, devicemodel, devicebrand, countrycode, strategy, oaid, idfv, ruid, osversion, rg
           | FROM dwh.etl_adn_org_request_daily_hours
           | WHERE yt = '$year' AND mt = '$month' AND dt = '$day'
           |""".stripMargin

      val rdd = spark.sql(sql)
        .coalesce(coalesce * 2)
        .rdd
        .flatMap(row => AdnRequestSdkEtlDaily.expandDeviceRows(row))

      val getPkgName = udf((appId: String) => bmap.value.getOrElse(appId, ""))

      // Keeps the part of each non-blank strategy before the first ';', de-duplicated, comma-joined.
      val filterSet = udf((strategyArray: mutable.WrappedArray[String]) => {
        val strategySet = new mutable.HashSet[String]()
        strategyArray.foreach { strategy =>
          if (StringUtils.isNotBlank(strategy)) {
            // limit -1 guarantees element 0 exists: ";" splits to an empty array with the default limit.
            strategySet.add(strategy.split(";", -1)(0))
          }
        }
        strategySet.mkString(",")
      })

      val update_date = DateUtil.format(DateUtil.parse(date, "yyyyMMdd"), "yyyy-MM-dd")

      spark.createDataFrame(rdd, schema)
        .groupBy("device_id", "device_type", "platform", "app_id")
        .agg(
          getPkgName(col("app_id")).alias("package_name"),
          lit(update_date).alias("update_date"),
          max("model").alias("model"),
          max("brand").alias("brand"),
          max("os_version").alias("os_version"),
          max("country").alias("country"),
          filterSet(collect_set("strategy")).alias("strategy"),
          concat_ws(",", collect_set("region")).alias("region"),
          max("dev_tag").alias("dev_tag")
        )
        .write
        .mode(SaveMode.Overwrite)
        .option("orc.compress", "zlib")
        .orc(output)
    } finally {
      spark.stop()
    }
    0
  }

  /** Schema of the rows produced by [[AdnRequestSdkEtlDaily.expandDeviceRows]]. */
  val schema: StructType = StructType(Array(
    StructField("device_id", StringType),
    StructField("device_type", StringType),
    StructField("platform", StringType),
    StructField("app_id", StringType),
    StructField("model", StringType),
    StructField("brand", StringType),
    StructField("os_version", StringType),
    StructField("country", StringType),
    StructField("strategy", StringType),
    StructField("region", StringType),
    StructField("dev_tag", IntegerType)))
}

object AdnRequestSdkEtlDaily {

  def main(args: Array[String]): Unit = {
    new AdnRequestSdkEtlDaily().run(args)
  }

  /** True when `id` is non-blank, matches `didPtn` and does not match `allZero`. */
  private def isValidDid(id: String): Boolean =
    StringUtils.isNotBlank(id) &&
      id.matches(MobvistaConstant.didPtn) &&
      !id.matches(MobvistaConstant.allZero)

  /** True when `id` is non-blank and fully matches the regex `pattern`. */
  private def matchesNonBlank(id: String, pattern: String): Boolean =
    StringUtils.isNotBlank(id) && id.matches(pattern)

  /**
   * Expands one request row into zero or more rows laid out like the class's `schema`.
   *
   * iOS: emits ruid (only when longer than 16 chars), idfa, sysid and idfv. All of them get
   * dev_tag 1 except idfv, which gets 0 when a valid idfa is also present.
   *
   * Android: emits gaid and oaid with dev_tag 1, and sysid, imei and androidId with dev_tag 0.
   *
   * Any other, or missing, platform yields no rows.
   */
  private def expandDeviceRows(row: Row): Seq[Row] = {
    val gaid = row.getAs[String]("gaid")
    val idfa = row.getAs[String]("idfa")
    val imei = row.getAs[String]("imei")
    val androidId = row.getAs[String]("androidid")
    val extSysid = row.getAs[String]("extsysid")
    // Null-safe and locale-independent: a null platform used to throw; "" falls to `case _`.
    val platform = Option(row.getAs[String]("platform")).map(_.toLowerCase(Locale.ROOT)).getOrElse("")
    val appId = row.getAs[String]("appid")
    val model = row.getAs[String]("devicemodel")
    val brand = row.getAs[String]("devicebrand")
    // A null country used to throw; keep it null so the downstream max() simply ignores it.
    val country = Option(row.getAs[String]("countrycode")).map(_.toUpperCase(Locale.ROOT)).orNull
    val strategy = row.getAs[String]("strategy")
    val oaid = row.getAs[String]("oaid")
    val idfv = row.getAs[String]("idfv")
    val ruid = row.getAs[String]("ruid")
    val osVersion = row.getAs[String]("osversion")
    val region = row.getAs[String]("rg")

    // Self-owned system id: the first MRUtils.SPLITTER field of what GetDevIdUtil extracts from extsysid.
    val sysIdType = GetDevIdUtil.getExtSysId(extSysid)
    val sysId = if (StringUtils.isNotBlank(sysIdType)) {
      val idType = MRUtils.SPLITTER.split(sysIdType, -1)
      idType(0)
    } else {
      ""
    }

    val rows = new ArrayBuffer[Row]()
    def emit(deviceId: String, deviceType: String, devTag: Int): Unit =
      rows += Row(deviceId, deviceType, platform, appId, model, brand, osVersion, country, strategy, region, devTag)

    platform match {
      case "ios" =>
        val hasIdfa = isValidDid(idfa)
        if (StringUtils.isNotBlank(ruid) && ruid.length > 16) emit(ruid, "ruid", 1)
        if (hasIdfa) emit(idfa, "idfa", 1)
        // NOTE(review): on iOS sysid keeps dev_tag 1 even alongside a valid idfa (Android uses 0);
        // preserved from the original logic - confirm this is intended.
        if (StringUtils.isNotBlank(sysId)) emit(sysId, "sysid", 1)
        if (isValidDid(idfv)) emit(idfv, "idfv", if (hasIdfa) 0 else 1)
      case "android" =>
        if (isValidDid(gaid)) emit(gaid, "gaid", 1)
        if (isValidDid(oaid)) emit(oaid, "oaid", 1)
        if (StringUtils.isNotBlank(sysId)) emit(sysId, "sysid", 0)
        if (matchesNonBlank(imei, MobvistaConstant.imeiPtn)) emit(imei, "imei", 0)
        if (matchesNonBlank(androidId, MobvistaConstant.andriodIdPtn)) emit(androidId, "androidId", 0)
      case _ =>
    }
    rows
  }
}