package mobvista.dmp.datasource.dsp

import com.fasterxml.jackson.databind.ObjectMapper
import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
import mobvista.dmp.datasource.dsp.DspConstant.Value
import mobvista.dmp.datasource.dsp.mapreduce.SegmentVO
import mobvista.dmp.util.MD5Util
import mobvista.prd.datasource.util.GsonUtil
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.commons.lang.StringUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.sql.functions.{collect_set, max, udf}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

import java.net.URI
import java.util
import java.util.regex.Pattern
// NOTE: the implicit views from JavaConversions are load-bearing in this file
// (`segmentMap.values()`, `adrSet.mkString`, `for (json <- jsonArray)`); do not
// swap it for the explicit converters without rewriting those call sites.
import scala.collection.JavaConversions._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

/**
  * Daily ETL of DSP request logs.
  *
  * Reads ORC request rows, expands every row into one record per device identifier
  * (see [[parseMapData]]), groups the records by `(device_id, device_type)` and writes
  * the aggregated result as gzip-compressed, tab-separated text.
  */
class DspOrgEtlDailys extends CommonSparkJob with Serializable {

  private val iosPkgPtn = Pattern.compile("^\\d+$")
  private val adrPkgPtn = Pattern.compile("^[0-9a-zA-Z\\.]+$")

  // ---------------------------------------------------------------------------------
  // Deal-id / package-name lookup tables. They are fields (not locals of parseMapData)
  // so they are built once per job instance instead of once per input row.
  // ---------------------------------------------------------------------------------

  // exchanges == "oppocn": deal id -> package; emitted as both `pkg` and `pkg + "_oppo"`.
  // History of this table:
  //   - oppo started sending install information for Alipay, JD and iQiyi in requests.
  //     (Note 2020-10-28: the pseudo package com.eg.android.AlipayGphone_oppo contains
  //     upper-case letters but is already ingested and hard to change. New pseudo package
  //     names are to be lower-case, like com.ucmobile_oppo.)
  //   - 2020-11-11: 2783 -> Xianyu, com.taobao.idlefish / com.taobao.idlefish_oppo
  //   - 2020-11-26: 2840 -> com.youku.phone_notinstall / com.youku.phone_notinstall_oppo
  //   - 2020-12-24: 2889 -> com.sankuai.meituan / com.sankuai.meituan_oppo
  //   - 2020-12-24: 2890 -> com.meituan.itakeaway / com.meituan.itakeaway_oppo
  //   - 2021-01-06: Kuaishou, Kuaishou Lite, Youku
  //   - 2021-04-22: 3160 -> com.tencent.news / com.tencent.news_oppo
  private val oppoPairedDeals: Seq[(String, String)] = Seq(
    "2716" -> "com.jingdong.app.mall",
    "2717" -> "com.eg.android.AlipayGphone",
    "2718" -> "com.qiyi.video",
    "2783" -> "com.taobao.idlefish",
    "2840" -> "com.youku.phone_notinstall",
    "2889" -> "com.sankuai.meituan",
    "2890" -> "com.meituan.itakeaway",
    "2904" -> "com.smile.gifmaker",
    "2905" -> "com.kuaishou.nebula",
    "2906" -> "com.youku.phone",
    "3160" -> "com.tencent.news"
  )

  // exchanges == "oppocn": deal id -> pseudo package emitted as-is (2020-11-11: Kuaishou, JD).
  // NOTE(review): "mallr" looks like a typo of "mall", but the value is written to the
  // output and may already be ingested downstream, so it is intentionally left untouched.
  private val oppoSingleDeals: Seq[(String, String)] = Seq(
    "2774" -> "com.smile.gifmaker_notinstall_oppo",
    "2773" -> "com.jingdong.app.mallr_notinstall_oppo"
  )

  // exchanges == "bes": deal id -> package; emitted as both `pkg` and `pkg + "_bes"`.
  //   - 2020-11-26: bes ingestion requirement,
  //     wiki https://confluence.mobvista.com/pages/viewpage.action?pageId=47976499
  //   - 2021-04-22: 100188 -> com.tencent.news / com.tencent.news_bes
  //   - 2021-06-17: 100310 -> com.taobao.litetao / com.taobao.litetao_bes
  private val besPairedDeals: Seq[(String, String)] = Seq(
    "100193" -> "com.taobao.taobao",
    "100189" -> "com.eg.android.AlipayGphone",
    "100191" -> "com.jingdong.app.mall",
    "100187" -> "com.UCMobile",
    "100194" -> "com.taobao.idlefish",
    "100195" -> "com.qiyi.video",
    "100196" -> "com.smile.gifmaker",
    "100197" -> "id387682726",
    "100188" -> "com.tencent.news",
    "100310" -> "com.taobao.litetao",
    "100203" -> "com.ss.android.ugc.aweme"
  )

  // exchanges == "kuaishou" (2021-06-17): requested package name -> pseudo package to emit.
  private val kuaishouPackages: Seq[(String, String)] = Seq(
    "com.smile.gifmaker" -> "com.smile.gifmaker_fromkuaishou",
    "com.kuaishou.nebula" -> "com.kuaishou.nebula_fromkuaishou",
    "440948110" -> "44094811020210617",
    "1472502819" -> "147250281920210617"
  )

  // exchanges == "iqiyi": deal id -> package; emitted as both `pkg` and `pkg + "_iqiyi"`.
  private val iqiyiPairedDeals: Seq[(String, String)] = Seq(
    "5260" -> "com.taobao.taobao",
    "1301" -> "com.UCMobile",
    "10949" -> "com.eg.android.AlipayGphone",
    "6242" -> "com.taobao.idlefish",
    "3996" -> "com.sankuai.meituan",
    "7156" -> "com.tencent.news"
  )

  /**
    * Job entry point.
    *
    * Options (see [[commandOptions]]): `input` (ORC path), `output` (text output path, deleted
    * before writing), `parallelism` (spark.default.parallelism) and `coalesce` (number of
    * output partitions, applied with `repartition`).
    *
    * @param args raw command-line arguments
    * @return 0 when the job finishes; failures propagate as exceptions
    */
  override protected def run(args: Array[String]): Int = {
    val commandLine = new BasicParser().parse(commandOptions(), args)
    val input = commandLine.getOptionValue("input")
    val output_etl_dsp_request_hour = commandLine.getOptionValue("output")
    val parallelism = commandLine.getOptionValue("parallelism").toInt
    val coalesce = commandLine.getOptionValue("coalesce").toInt

    val spark = SparkSession
      .builder()
      .config("spark.rdd.compress", "true")
      .config("spark.default.parallelism", s"${parallelism}")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

    try {
      // The delete lives inside the try so the session is stopped even when it fails.
      val sc = spark.sparkContext
      FileSystem.get(new URI(s"s3://mob-emr-test"), sc.hadoopConfiguration)
        .delete(new Path(output_etl_dsp_request_hour), true)

      import spark.implicits._

      // Splits every collected value on '#' and returns the distinct parts as a JSON array;
      // "[]" when nothing was collected.
      val mergePkgUdf = udf((pkgsArrays: mutable.WrappedArray[String]) => {
        var res = "[]"
        val pkgSet: util.Set[String] = new util.HashSet[String]()
        if (pkgsArrays != null && pkgsArrays.size != 0) {
          for (pkgs <- pkgsArrays) {
            if (StringUtils.isNotBlank(pkgs)) {
              for (pkgname <- pkgs.split("#", -1)) {
                pkgSet.add(pkgname)
              }
            }
          }
        }
        if (pkgSet.size() != 0) {
          res = new ObjectMapper().writeValueAsString(pkgSet)
        }
        res
      })

      // Joins the distinct non-blank values with ','; "" when there are none.
      val mergeAdridIdUdf = udf((adrsArrays: mutable.WrappedArray[String]) => {
        var res = ""
        val adrSet: util.Set[String] = new util.HashSet[String]()
        if (adrsArrays != null && adrsArrays.size != 0) {
          for (adrs <- adrsArrays) {
            if (StringUtils.isNotBlank(adrs)) {
              adrSet.add(adrs)
            }
          }
        }
        if (adrSet.size() != 0) {
          res = adrSet.mkString(",")
        }
        res
      })

      // Parses every collected JSON array of segments, keeps one SegmentVO per segment id
      // (a later value replaces an earlier one) and serialises the survivors as JSON.
      // Example element: [{"name":"carrier","value":"verified"}]
      val mergeUdf = udf((segmentArrays: mutable.WrappedArray[String]) => {
        var ret = ""
        val segmentMap: mutable.HashMap[String, SegmentVO] = new mutable.HashMap[String, SegmentVO]()
        if (segmentArrays != null && segmentArrays.size != 0) {
          for (segment <- segmentArrays) {
            if (StringUtils.isNotBlank(segment) && segment.startsWith("[") && segment.contains("{")) {
              val jsonArray = GsonUtil.String2JsonArray(segment.toString)
              for (json <- jsonArray) {
                val segmentVO = GsonUtil.fromJson(json, classOf[SegmentVO])
                segmentMap.put(segmentVO.getId, segmentVO)
              }
            }
          }
        }
        if (segmentMap.size != 0) {
          // `values()` (with parentheses) resolves through the JavaConversions view to
          // java.util.Map#values, so Jackson receives a java.util.Collection.
          ret = new ObjectMapper().writeValueAsString(segmentMap.values())
        }
        ret
      })

      // Same as mergePkgUdf, except that the empty result is "" instead of "[]".
      val mergeRegionUdf = udf((regsArrays: mutable.WrappedArray[String]) => {
        var res = ""
        val regSet: util.Set[String] = new util.HashSet[String]()
        if (regsArrays != null && regsArrays.size != 0) {
          for (regs <- regsArrays) {
            if (StringUtils.isNotBlank(regs)) {
              for (regname <- regs.split("#", -1)) {
                regSet.add(regname)
              }
            }
          }
        }
        if (regSet.size() != 0) {
          res = new ObjectMapper().writeValueAsString(regSet)
        }
        res
      })

      val midData = spark.read.schema(dspEtlSchema).orc(input)
        .flatMap(parseMapData(_))
        .toDF("device_id", "device_type", "platform", "country_code", "ip", "gender", "birthday",
          "maker", "model", "os_version", "package_list", "androidids", "datetime", "segment", "region")

      // The column order below is the field order of the tab-separated output.
      midData
        .groupBy("device_id", "device_type")
        .agg(max("platform"),
          max("country_code"),
          max("ip"),
          max("gender"),
          max("birthday"),
          max("maker"),
          max("model"),
          max("os_version"),
          mergePkgUdf(collect_set("package_list")),
          mergeAdridIdUdf(collect_set("androidids")),
          max("datetime"),
          mergeUdf(collect_set("segment")),
          mergeRegionUdf(collect_set("region"))
        ).repartition(coalesce)
        .rdd.map(_.mkString("\t")).saveAsTextFile(output_etl_dsp_request_hour, classOf[GzipCodec])
    } finally {
      if (spark != null) {
        spark.stop()
      }
    }
    0
  }

  /**
    * Expands one request row into one record per device identifier found in it.
    *
    * The row is first emitted with its own `packageName`; it is then emitted again with a
    * pseudo package name for every exchange / deal-id rule that matches (oppocn, inmobi, bes,
    * tencent, kuaishou, iqiyi).
    *
    * A null `dealerid` column is treated as "no deal ids" instead of failing the task.
    *
    * @param row a row with the columns of [[dspEtlSchema]]
    * @return every record produced for the row; may be empty
    */
  def parseMapData(row: Row): Array[DspReqVOoptimization] = {
    val arrayBuffer = new ArrayBuffer[DspReqVOoptimization]()

    var idfa = ""
    val idfaTmp = row.getAs[String]("idfa")
    if (StringUtils.isNotBlank(idfaTmp) && idfaTmp.matches(didPtn)) idfa = idfaTmp

    var gaid = ""
    val gaidTmp = row.getAs[String]("gaid")
    if (StringUtils.isNotBlank(gaidTmp) && gaidTmp.matches(didPtn)) gaid = gaidTmp

    val platform = row.getAs[String]("platform")
    val country = row.getAs[String]("country")
    val ip = row.getAs[String]("ip")
    val gender = row.getAs[String]("gender")
    val birthday = row.getAs[String]("birthday")
    val maker = row.getAs[String]("maker")
    val model = row.getAs[String]("model")
    val osVersion = row.getAs[String]("osVersion")

    val rawPackageName = row.getAs[String]("packageName")
    val packageName = if (StringUtils.isNotBlank(rawPackageName)) rawPackageName else ""

    val exitId = row.getAs[String]("exitId")
    val time = row.getAs[String]("datetime")

    val rawSegment = row.getAs[String]("segment")
    val segment = if (StringUtils.isNotBlank(rawSegment)) rawSegment else ""

    val region = row.getAs[String]("region")

    var androidId = ""
    var imei = ""
    var imeimd5 = ""
    // oaid / oaidmd5 are parsed as well.
    var oaid = ""
    var oaidmd5 = ""
    // Identifiers added later.
    var idfv = ""
    var gaidmd5 = ""

    // exitId is a comma-separated list of identifiers addressed by position; it is only
    // used when it has at least 17 fields.
    if (StringUtils.isNotBlank(exitId)) {
      val devIds = splitFun(exitId, ",")
      if (devIds.length >= 17) {
        if ("ios".equalsIgnoreCase(platform)) {
          if (StringUtils.isBlank(idfa) && StringUtils.isNotBlank(devIds(1)) && devIds(1).matches(MobvistaConstant.didPtn)) {
            idfa = devIds(1)
          }
          if (StringUtils.isNotBlank(devIds(16)) && devIds(16).matches(MobvistaConstant.didPtn)) {
            idfv = devIds(16)
          }
        } else {
          if (StringUtils.isBlank(gaid) && StringUtils.isNotBlank(devIds(0)) && devIds(0).matches(MobvistaConstant.didPtn)) {
            gaid = devIds(0)
          }
          if (StringUtils.isNotBlank(devIds(2)) && devIds(2).matches(MobvistaConstant.md5Ptn)) {
            gaidmd5 = devIds(2)
          }
          if (StringUtils.isNotBlank(devIds(12))) {
            oaid = devIds(12)
          }
          if (StringUtils.isNotBlank(devIds(13)) && devIds(13).matches(MobvistaConstant.md5Ptn)) {
            oaidmd5 = devIds(13)
          }
          if (StringUtils.isNotBlank(devIds(4)) && devIds(4).matches(MobvistaConstant.imeiPtn)) {
            imei = devIds(4)
          }
          if (StringUtils.isNotBlank(devIds(5)) && devIds(5).matches(MobvistaConstant.md5Ptn)) {
            imeimd5 = devIds(5)
          }
          if (StringUtils.isNotBlank(devIds(7)) && devIds(7).matches(MobvistaConstant.andriodIdPtn)) {
            androidId = devIds(7)
          }
        }
      }
    }

    val value = Value(country, ip, gender, birthday, maker, model, osVersion, packageName, androidId, time, segment, region)

    // Primary identifier: idfa on ios, gaid on android; it must be longer than 4 characters
    // and differ from the all-zero id. Otherwise both stay empty.
    val (deviceId, deviceType) =
      if ("ios".equals(platform) && idfa.length > 4 && !allZero.equals(idfa)) (idfa, "idfa")
      else if ("android".equals(platform) && gaid.length > 4 && !allZero.equals(gaid)) (gaid, "gaid")
      else ("", "")

    // The row itself, with its original package name.
    addDatasV2(arrayBuffer, deviceId, gaidmd5, imei, imeimd5, oaid, oaidmd5, androidId, idfv, deviceType, platform, value)

    // Emits the row's identifiers once more under the given (pseudo) package name.
    def emit(pkg: String): Unit = {
      value.packageName = pkg
      addDatasV2(arrayBuffer, deviceId, gaidmd5, imei, imeimd5, oaid, oaidmd5, androidId, idfv, deviceType, platform, value)
      ()
    }

    // Deal-id based ingestion logic.
    val exchanges = row.getAs[String]("exchanges")
    val dealerid = row.getAs[String]("dealerid")
    // A null dealerid used to throw a NullPointerException here.
    val dealeridArray: Array[String] = if (dealerid == null) Array.empty[String] else dealerid.split(",")

    if ("oppocn".equals(exchanges)) {
      if (dealeridArray.contains("2532") || dealeridArray.contains("2533")) {
        emit("com.taobao.taobao")
        emit("com.taobao.taobao_oppo")
      } else {
        // 2020-11-26 requirement: when adx (the exchanges field) is oppo and the deal id is
        // neither 2532 nor 2533, the pseudo package name is com.taobao.taobao_notinstall_oppo.
        emit("com.taobao.taobao_notinstall_oppo")
      }
      // com.UCMobile; its pseudo package name is the lower-case com.ucmobile_oppo.
      if (dealeridArray.contains("2728")) {
        emit("com.UCMobile")
        emit("com.ucmobile_oppo")
      }
      if (dealeridArray.contains("4059")) {
        emit("com.ss.android.ugc.aweme")
        emit("com.ss.android.ugc.aweme_oppoziyou")
      }
      if (dealeridArray.contains("4060")) {
        emit("com.ss.android.ugc.aweme_oppoziyou_notinstall")
      }
      if (dealeridArray.contains("4061")) {
        emit("com.ss.android.ugc.aweme_oppoziyou_hist_notinstall")
      }
      if (dealeridArray.contains("4053")) {
        emit("com.ss.android.ugc.aweme")
        emit("com.ss.android.ugc.aweme_oppolianmeng")
      }
      if (dealeridArray.contains("4054")) {
        emit("com.ss.android.ugc.aweme_oppolianmeng_hist1year_notinstall")
      }
      if (dealeridArray.contains("4055")) {
        emit("com.ss.android.ugc.aweme_oppolianmeng_histhalfyear_notinstall")
      }
      for ((dealId, pkg) <- oppoPairedDeals if dealeridArray.contains(dealId)) {
        emit(pkg)
        emit(pkg + "_oppo")
      }
      for ((dealId, pkg) <- oppoSingleDeals if dealeridArray.contains(dealId)) {
        emit(pkg)
      }
    }

    if ("inmobi".equals(exchanges)) {
      if (dealeridArray.contains("1594807676568")) {
        emit("com.taobao.taobao_inmobi")
      }
    }

    if ("bes".equals(exchanges)) {
      for ((dealId, pkg) <- besPairedDeals if dealeridArray.contains(dealId)) {
        emit(pkg)
        emit(pkg + "_bes")
      }
    }

    // 2021-04-28: daily, from the dsp request log (adn_dsp.log_adn_dsp_request_orc_hour), rows
    // with exchanges = 'tencent' and appid = 'com.tencent.news' are ingested under the pseudo
    // package name com.tencent.news_fromtencent.
    if ("tencent".equals(exchanges)) {
      if (packageName.split("#", -1).contains("com.tencent.news")) {
        emit("com.tencent.news_fromtencent")
      }
    }

    // 2021-06-17: daily, from the same log, rows with exchanges = 'kuaishou' whose appid is one
    // of the keys of kuaishouPackages are ingested under the mapped pseudo package name.
    if ("kuaishou".equals(exchanges)) {
      val requestedPackages = packageName.split("#", -1)
      for ((requested, pkg) <- kuaishouPackages if requestedPackages.contains(requested)) {
        emit(pkg)
      }
    }

    if ("iqiyi".equals(exchanges)) {
      for ((dealId, pkg) <- iqiyiPairedDeals if dealeridArray.contains(dealId)) {
        emit(pkg)
        emit(pkg + "_iqiyi")
      }
      // 2021-06-17: adx iqiyi and deal id 112644 (installed):
      //   os = android -> com.taobao.litetao and com.taobao.litetao_iqiyi
      //   otherwise    -> 1340376323 and 134037632320210617 (ios)
      if (dealeridArray.contains("112644")) {
        if ("android".equalsIgnoreCase(platform)) {
          // idfv is deliberately withheld and the platform forced to "android".
          for (pkg <- Seq("com.taobao.litetao", "com.taobao.litetao_iqiyi")) {
            value.packageName = pkg
            addDatasV2(arrayBuffer, deviceId, gaidmd5, imei, imeimd5, oaid, oaidmd5, androidId, "", deviceType, "android", value)
          }
        } else {
          // Only the primary id and idfv are emitted, with the platform forced to "ios".
          for (pkg <- Seq("1340376323", "134037632320210617")) {
            value.packageName = pkg
            addDatasV2(arrayBuffer, deviceId, "", "", "", "", "", "", idfv, deviceType, "ios", value)
          }
        }
      }
    }

    // 2020-12-15: adx = iqiyi and os = android devices (imeiMD5 / oaidMD5, de-duplicated) were
    // diffed against com.taobao.foractivation.227229 and com.taobao.foractivation.227229_oaid
    // under the pseudo package com.iqiyi_android.subtract_with_227229.
    // 2021-04-22: removed, the requirement is no longer needed.

    arrayBuffer.toArray
  }

  /**
    * Legacy record builder that takes every field as a separate argument; [[parseMapData]]
    * uses [[addDatasV2]] instead.
    *
    * Unlike [[addDatasV2]], a supplied `imeimd5` / `oaidmd5` is appended even when the MD5 of
    * the raw `imei` / `oaid` has already been appended.
    *
    * @param arrayBuffer buffer the records are appended to (mutated in place)
    * @return the same `arrayBuffer` instance
    */
  def addDatas(arrayBuffer: ArrayBuffer[DspReqVOoptimization], imei: String, imeimd5: String, oaid: String, oaidmd5: String, deviceId: String, deviceType: String, platform: String,
               country: String, ip: String, gender: String, birthday: String, maker: String, model: String, osVersion: String, packageName: String, androidId: String, time: String, segment: String, region: String): ArrayBuffer[DspReqVOoptimization] = {

    // One output record; only the identifier, its type and the platform vary.
    def record(id: String, idType: String, os: String): DspReqVOoptimization =
      DspReqVOoptimization(id, idType, os, country, ip, gender, birthday, maker, model, osVersion, packageName, androidId, time, segment, region)

    if (!deviceId.isEmpty) {
      arrayBuffer += record(deviceId, deviceType, platform)
    }
    if (!androidId.isEmpty) {
      arrayBuffer += record(androidId, "androidid", "android")
    }
    if (StringUtils.isNotBlank(imei)) {
      arrayBuffer += record(imei, "imei", "android")
      arrayBuffer += record(MD5Util.getMD5Str(imei), "imeimd5", "android")
    }
    if (StringUtils.isNotBlank(imeimd5)) {
      arrayBuffer += record(imeimd5, "imeimd5", "android")
    }
    if (StringUtils.isNotBlank(oaid)) {
      arrayBuffer += record(oaid, "oaid", "android")
      arrayBuffer += record(MD5Util.getMD5Str(oaid), "oaidmd5", "android")
    }
    if (StringUtils.isNotBlank(oaidmd5)) {
      arrayBuffer += record(oaidmd5, "oaidmd5", "android")
    }
    arrayBuffer
  }

  /**
    * Appends one record per non-blank identifier, all sharing the attributes of `value`
    * (read at call time, so the current `value.packageName` is the one recorded).
    *
    * For gaid, imei and oaid the MD5 of the raw identifier is appended next to it; the
    * separately supplied `gaidmd5` / `imeimd5` / `oaidmd5` is used only when the raw
    * identifier did not produce one.
    *
    * The primary `(deviceId, deviceType, platform)` record is appended once. It used to be
    * appended twice; the job's aggregation uses only `max` and `collect_set`, so the
    * duplicate never changed the output and only inflated the shuffle.
    *
    * @param arrayBuffer buffer the records are appended to (mutated in place)
    * @param deviceId    primary identifier (idfa or gaid); may be blank
    * @param deviceType  type of `deviceId`
    * @param platform    platform recorded with the primary identifier
    * @param value       shared request attributes
    * @return the same `arrayBuffer` instance
    */
  def addDatasV2(arrayBuffer: ArrayBuffer[DspReqVOoptimization], deviceId: String, gaidmd5: String, imei: String, imeimd5: String, oaid: String, oaidmd5: String,
                 androidId: String, idfv: String, deviceType: String, platform: String, value: Value
                ): ArrayBuffer[DspReqVOoptimization] = {

    // One output record; only the identifier, its type and the platform vary.
    def record(id: String, idType: String, os: String): DspReqVOoptimization =
      DspReqVOoptimization(id, idType, os, value.country, value.ip, value.gender, value.birthday, value.maker, value.model,
        value.osVersion, value.packageName, value.androidId, value.time, value.segment, value.region)

    val hasDeviceId = StringUtils.isNotBlank(deviceId)
    // True when gaidmd5 is derived from the raw gaid, which takes precedence over the supplied one.
    val gaidMd5Derived = hasDeviceId && "android".equalsIgnoreCase(platform) && "gaid".equalsIgnoreCase(deviceType)

    if (hasDeviceId) {
      arrayBuffer += record(deviceId, deviceType, platform)
      if (gaidMd5Derived) {
        arrayBuffer += record(MD5Util.getMD5Str(deviceId), "gaidmd5", "android")
      }
    }
    if (!gaidMd5Derived && StringUtils.isNotBlank(gaidmd5)) {
      arrayBuffer += record(gaidmd5, "gaidmd5", "android")
    }

    if (StringUtils.isNotBlank(androidId)) {
      arrayBuffer += record(androidId, "androidid", "android")
    }
    if (StringUtils.isNotBlank(idfv)) {
      arrayBuffer += record(idfv, "idfv", "ios")
    }

    if (StringUtils.isNotBlank(imei)) {
      arrayBuffer += record(imei, "imei", "android")
      arrayBuffer += record(MD5Util.getMD5Str(imei), "imeimd5", "android")
    } else if (StringUtils.isNotBlank(imeimd5)) {
      arrayBuffer += record(imeimd5, "imeimd5", "android")
    }

    if (StringUtils.isNotBlank(oaid)) {
      arrayBuffer += record(oaid, "oaid", "android")
      arrayBuffer += record(MD5Util.getMD5Str(oaid), "oaidmd5", "android")
    } else if (StringUtils.isNotBlank(oaidmd5)) {
      arrayBuffer += record(oaidmd5, "oaidmd5", "android")
    }

    arrayBuffer
  }

  /** Schema applied to the ORC input; every column is read as a string. */
  def dspEtlSchema: StructType = {
    StructType(StructField("idfa", StringType) ::
      StructField("gaid", StringType) ::
      StructField("platform", StringType) ::
      StructField("country", StringType) ::
      StructField("ip", StringType) ::
      StructField("gender", StringType) ::
      StructField("birthday", StringType) ::
      StructField("maker", StringType) ::
      StructField("model", StringType) ::
      StructField("osVersion", StringType) ::
      StructField("packageName", StringType) ::
      StructField("exitId", StringType) ::
      StructField("datetime", StringType) ::
      StructField("segment", StringType) ::
      StructField("dealerid", StringType) ::
      StructField("exchanges", StringType) ::
      StructField("region", StringType) ::
      Nil)
  }

  /** Command-line options understood by [[run]]; each of them takes a value. */
  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("input", true, "input")
    options.addOption("output", true, "output")
    options.addOption("parallelism", true, "parallelism")
    options.addOption("coalesce", true, "coalesce")
    options
  }
}

object DspOrgEtlDailys {
  /** Runs the job with the given arguments; the status returned by `run` is not used. */
  def main(args: Array[String]): Unit = {
    new DspOrgEtlDailys().run(args)
  }
}