package mobvista.dmp.datasource.taobao

import java.net.URI
import java.nio.charset.StandardCharsets
import java.security.MessageDigest

import mobvista.dmp.common.CommonSparkJob
import mobvista.dmp.format.TextMultipleOutputFormat
import org.apache.commons.cli.Options
import org.apache.commons.lang.StringUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.Text
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.{Row, SaveMode, SparkSession}

import scala.collection.mutable.ArrayBuffer

/**
 * Daily ETL over the Ali/Taobao activation post-back data.
 *
 * The job runs four queries that join the 15-day activation tables
 * (`dwh.ali_user_activation_rec15days`, `dwh.ali_ios_user_activation_rec15days`,
 * `dwh.ali_oaid_user_activation_rec15days`) with
 * `dwh.ali_taobao_postback_activation_daily`:
 *
 *  - three ORC outputs (`output`, `iosoutput`, `oaidoutput`), one per device family;
 *  - one text output (`syn_to_3s`) written through [[TextMultipleOutputFormat]], whose
 *    records are produced by [[buildRes]].
 */
class EtlAliActivitionPostBackDaily extends CommonSparkJob with Serializable {

  import EtlAliActivitionPostBackDaily._

  /** Declares the command line options understood by this job. */
  override protected def buildOptions(): Options = {
    val opts = new Options
    opts.addOption("output", true, "[must] output")
    opts.addOption("iosoutput", true, "[must] iosoutput")
    opts.addOption("oaidoutput", true, "[must] oaidoutput")
    opts.addOption("coalesce", true, "[must] coalesce")
    // update_date is still declared as an option, but run() no longer reads its value.
    opts.addOption("update_date", true, "[must] update_date")
    opts.addOption("today", true, "[must] today")
    opts.addOption("dt_taobao_postback_day", true, "[must] dt_taobao_postback_day")
    opts.addOption("dt_dash_rec15day", true, "[must] dt_dash_rec15day")
    opts.addOption("syn_to_3s", true, "[must] syn_to_3s")
    opts.addOption("syn_3s_day", true, "[must] syn_3s_day")
    opts
  }

  /**
   * Converts one row of the `syn_to_3s` query into at most one (key, value) record.
   *
   * The key is `"<outputpath>, "` and the value is the device id. For re-activation
   * files (`result_cuhuo` / `result_acc`) the output path is
   * `<syn_3s_day>_H_<seqNo>_<type>`; for every other file (acquisition) it is
   * `<syn_3s_day>_X_<type>`. Raw `imei` / `oaid` / `idfa` ids are MD5-hashed and their
   * type gets an `md5` suffix; any other device type is passed through unchanged.
   *
   * Rows whose `device_id` is null yield no record (previously they raised a
   * NullPointerException and failed the whole job).
   *
   * @param channelIds broadcast map from channel id (optionally suffixed with `_oaid`)
   *                   to the sequence number used in the output path; unknown
   *                   channels map to "0"
   * @param syn_3s_day day string used as the prefix of the output path
   * @param sys_to_3s  unused; kept for interface compatibility
   * @param row        row exposing the string columns `device_id`, `device_type`
   *                   and `filename`
   * @return an array holding zero or one record
   */
  def buildRes(channelIds: Broadcast[Map[String, String]], syn_3s_day: String, sys_to_3s: String, row: Row): Array[Tuple2[Text, Text]] = {
    val buffer = new ArrayBuffer[Tuple2[Text, Text]]()
    val device_type = row.getAs[String]("device_type")
    val filename = row.getAs[String]("filename")

    Option(row.getAs[String]("device_id")).foreach { rawDeviceId =>
      // Raw identifiers are hashed before being emitted; already-hashed types are kept as is.
      val (deviceId, typeSuffix) =
        if (HashedDeviceTypes.contains(device_type)) (hashMD5(rawDeviceId), s"${device_type}md5")
        else (rawDeviceId, device_type)

      // Decide between acquisition and re-activation, and resolve the package sequence number.
      // filename examples:
      //   s3://mob-emr-test/adn/sync_srv/2021/05/27/1103/result_cuhuo_303907.20210527__0764c0f3bcb04a839c5b14d31715e93f.txt.gz
      //   s3://mob-emr-test/adn/sync_srv/2021/05/27/511/result_cuhuo_260935.20210527__40688f0306024a67aa5fe01796aef252.txt.gz
      val outputpath =
        if (filename.contains("result_cuhuo") || filename.contains("result_acc")) {
          // Existing re-activation (cuhuo) files plus the newly added acc type.
          // In the examples above, path segment 8 is the folder id and the first six
          // characters of the fourth "_"-separated token are the channel id.
          val folderId = filename.split("/")(8)
          val channelId = filename.split("_")(3).substring(0, 6)
          // Files located in an oaid folder are looked up under "<channel>_oaid".
          val seqNoPre = if (OaidFolderIds.contains(folderId)) channelId + "_oaid" else channelId
          val seqNo = channelIds.value.getOrElse(seqNoPre, "0")
          s"${syn_3s_day}_H_${seqNo}_${typeSuffix}"
        } else {
          // Acquisition (new users).
          s"${syn_3s_day}_X_${typeSuffix}"
        }

      if (StringUtils.isNotBlank(outputpath)) {
        buffer += Tuple2(new Text(s"${outputpath}, "), new Text(deviceId))
      }
    }
    buffer.toArray
  }

  /**
   * Returns the lower-case hexadecimal MD5 digest of `content`.
   *
   * The text is encoded as UTF-8 explicitly so the digest does not depend on the
   * default charset of the JVM that happens to run the task.
   *
   * @param content text to hash; must not be null
   * @return 32-character hexadecimal digest
   */
  def hashMD5(content: String): String = {
    val md5 = MessageDigest.getInstance("MD5")
    val encoded = md5.digest(content.getBytes(StandardCharsets.UTF_8))
    encoded.map("%02x".format(_)).mkString
  }

  /**
   * Parses the arguments and runs the four extraction steps.
   *
   * @param args command line arguments, see [[buildOptions]]
   * @return -1 when a mandatory option is missing, 0 after a successful run
   */
  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      -1
    } else {
      printOptions(commandLine)

      val output = commandLine.getOptionValue("output")
      val iosoutput = commandLine.getOptionValue("iosoutput")
      val oaidoutput = commandLine.getOptionValue("oaidoutput")
      // Parsed once, before anything is created or deleted, so a malformed value fails fast.
      val partitions = commandLine.getOptionValue("coalesce").toInt
      val today = commandLine.getOptionValue("today")
      val dt_taobao_postback_day = commandLine.getOptionValue("dt_taobao_postback_day")
      val dt_dash_rec15day = commandLine.getOptionValue("dt_dash_rec15day")
      val syn_to_3s = commandLine.getOptionValue("syn_to_3s")
      val syn_3s_day = commandLine.getOptionValue("syn_3s_day")

      val spark = SparkSession.builder()
        .appName("EtlAliActivitionPostBackDaily")
        .config("spark.rdd.compress", "true")
        .config("spark.io.compression.codec", "snappy")
        .config("spark.sql.orc.filterPushdown", "true")
        .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .enableHiveSupport()
        .getOrCreate()

      try {
        // Clean the target directories inside the try block so the session is
        // stopped even when a deletion fails.
        val fs = FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration)
        Seq(output, syn_to_3s, iosoutput, oaidoutput).foreach(dir => fs.delete(new Path(dir), true))

        // H_68 and H_69 are reserved for the newly added UC re-activation oaidmd5 pseudo
        // package names com.uc.foractivation.4b5a58 (formerly an imeimd5 re-activation
        // package; it became oaidmd5 after the UC integration partner changed its policy)
        // and com.uc.foractivation.aff149, corresponding to UC_4_HASH_OAID and
        // UC_5_HASH_OAID respectively.
        val channelIds = spark.sparkContext.broadcast(ChannelSeqNos)

        writeOrc(spark, androidImeiSql(today, dt_dash_rec15day, dt_taobao_postback_day), partitions, output)
        writeOrc(spark, iosIdfaSql(today, dt_dash_rec15day, dt_taobao_postback_day), partitions, iosoutput)
        writeOrc(spark, androidOaidSql(today, dt_dash_rec15day, dt_taobao_postback_day), partitions, oaidoutput)

        spark.sql(syncTo3sSql(today, dt_dash_rec15day, dt_taobao_postback_day))
          .rdd
          .flatMap(buildRes(channelIds, syn_3s_day, syn_to_3s, _))
          .coalesce(100)
          .saveAsNewAPIHadoopFile(s"${syn_to_3s}", classOf[Text], classOf[Text],
            classOf[TextMultipleOutputFormat], spark.sparkContext.hadoopConfiguration)
      } finally {
        spark.stop()
      }
      0
    }
  }

  /** Runs `sql` and writes the result to `path` as zlib-compressed ORC without a _SUCCESS marker. */
  private def writeOrc(spark: SparkSession, sql: String, partitions: Int, path: String): Unit =
    spark.sql(sql).coalesce(partitions)
      .write
      .mode(SaveMode.Overwrite)
      .option("orc.compress", "zlib")
      .option("mapreduce.fileoutputcommitter.marksuccessfuljobs", false)
      .orc(path)

  /**
   * Android imei / imeimd5 query: activation rows unioned with acquisition rows.
   *
   * The channel id is cut out of the file name: going by the example file names in
   * [[buildRes]], it starts at character 14 after `result_cuhuo_` and, assuming the
   * same layout for acc files, at character 12 after `result_acc_`.
   */
  private def androidImeiSql(today: String, dt_dash_rec15day: String, dt_taobao_postback_day: String): String =
    s"""
       |select
       |case when t1.device_type ='imei' then t1.device_id when t1.device_type ='imeimd5' then t1.device_id_md5 when t2.dev_id is not null then t2.dev_id end as device_id,
       |case when t1.device_type is null then 'imeimd5' else t1.device_type end as device_type,
       |'android' platform,
       |case when t2.dev_id is null then 'com.taobao.notforactivation' else concat('com.taobao.taobao,com.taobao.foractivation,',t2.package_name) end as packagename,
       |'CN' country,
       |'ali_activation' ali_type
       |from
       |( select device_id_md5,device_id,device_type,'android' platform,'CN' country
       | from dwh.ali_user_activation_rec15days
       | where dt='${today}' and update_date >= '${dt_dash_rec15day}'
       |) t1 full outer join
       |( select X.dev_id,concat_ws(',', collect_set(X.channel_id)) package_name
       |from (select dev_id,case when input_file_name() like '%result_cuhuo%' then concat('com.taobao.foractivation.',substr(split( input_file_name(), '/')[9],14,6)) when input_file_name() like '%result_acc%' then concat('com.taobao.foractivation.',substr(split( input_file_name(), '/')[9],12,6)) end as channel_id
       |from dwh.ali_taobao_postback_activation_daily
       |where dt ='${dt_taobao_postback_day}' and (input_file_name() like '%result_cuhuo%' or input_file_name() like '%result_acc%') and split( input_file_name(), '/')[8] in (${ImeiFolderSqlList}) ) X
       |group by X.dev_id ) t2
       |on(t1.device_id_md5 = t2.dev_id)
       |union
       |select
       |case when t1.device_type ='imei' then t1.device_id when t1.device_type ='imeimd5' then t1.device_id_md5 when t2.dev_id is not null then t2.dev_id end as device_id,
       |case when t1.device_type is null then 'imeimd5' else t1.device_type end as device_type,
       |'android' platform,
       |case when t2.dev_id is null then 'com.taobao.notforacquisition' else 'com.taobao.foracquisition' end as packagename,
       |'CN' country,
       |'ali_acquisition' ali_type
       |from
       |( select device_id_md5,device_id,device_type,'android' platform,'CN' country
       | from dwh.ali_user_activation_rec15days
       | where dt='${today}' and update_date >= '${dt_dash_rec15day}'
       |) t1 full outer join
       |(select dev_id
       |from dwh.ali_taobao_postback_activation_daily
       |where dt ='${dt_taobao_postback_day}' and input_file_name() like '%result_laxin%' and split( input_file_name(), '/')[8] in (${ImeiFolderSqlList})
       |group by dev_id) t2
       |on(t1.device_id_md5 = t2.dev_id)
      """.stripMargin

  /** iOS idfa / idfamd5 query: activation rows unioned with acquisition rows. */
  private def iosIdfaSql(today: String, dt_dash_rec15day: String, dt_taobao_postback_day: String): String =
    s"""
       |select
       |case when t1.device_type ='idfa' then t1.device_id when t1.device_type ='idfamd5' then t1.device_id_md5 when t2.dev_id is not null then t2.dev_id end as device_id,
       |case when t1.device_type is null then 'idfamd5' else t1.device_type end as device_type,
       |'ios' platform,
       |case when t2.dev_id is null then '202005250004' else concat('387682726,202005250003,',t2.package_name) end as packagename,
       |'CN' country,
       |'ali_activation' ali_type
       |from
       |( select device_id_md5,device_id,device_type,'ios' platform,'CN' country
       | from dwh.ali_ios_user_activation_rec15days
       | where dt='${today}' and update_date >= '${dt_dash_rec15day}'
       |) t1 full outer join
       |( select X.dev_id,concat_ws(',', collect_set(X.channel_id)) package_name
       |from (select dev_id,case when input_file_name() like '%result_cuhuo%' then concat('202005',substr(split( input_file_name(), '/')[9],14,6)) when input_file_name() like '%result_acc%' then concat('202005',substr(split( input_file_name(), '/')[9],12,6)) end as channel_id
       |from dwh.ali_taobao_postback_activation_daily
       |where dt ='${dt_taobao_postback_day}' and (input_file_name() like '%result_cuhuo%' or input_file_name() like '%result_acc%') and split( input_file_name(), '/')[8] in (${IdfaFolderSqlList}) ) X
       |group by X.dev_id ) t2
       |on(t1.device_id_md5 = t2.dev_id)
       |union
       |select
       |case when t1.device_type ='idfa' then t1.device_id when t1.device_type ='idfamd5' then t1.device_id_md5 when t2.dev_id is not null then t2.dev_id end as device_id,
       |case when t1.device_type is null then 'idfamd5' else t1.device_type end as device_type,
       |'ios' platform,
       |case when t2.dev_id is null then '202005250001' else '202005250000' end as packagename,
       |'CN' country,
       |'ali_acquisition' ali_type
       |from
       |( select device_id_md5,device_id,device_type,'ios' platform,'CN' country
       | from dwh.ali_ios_user_activation_rec15days
       | where dt='${today}' and update_date >= '${dt_dash_rec15day}'
       |) t1 full outer join
       |(select dev_id
       |from dwh.ali_taobao_postback_activation_daily
       |where dt ='${dt_taobao_postback_day}' and input_file_name() like '%result_laxin%' and split( input_file_name(), '/')[8] in (${IdfaFolderSqlList})
       |group by dev_id) t2
       |on(t1.device_id_md5 = t2.dev_id)
      """.stripMargin

  /** Android oaid / oaidmd5 query: activation rows unioned with acquisition rows. */
  private def androidOaidSql(today: String, dt_dash_rec15day: String, dt_taobao_postback_day: String): String =
    s"""
       |select
       |case when t1.device_type ='oaid' then t1.device_id when t1.device_type ='oaidmd5' then t1.device_id_md5 when t2.dev_id is not null then t2.dev_id end as device_id,
       |case when t1.device_type is null then 'oaidmd5' else t1.device_type end as device_type,
       |'android' platform,
       |case when t2.dev_id is null then 'com.taobao.notforactivation' else concat('com.taobao.taobao,com.taobao.foractivation,',t2.package_name) end as packagename,
       |'CN' country,
       |'ali_activation' ali_type
       |from
       |( select device_id_md5,device_id,device_type,'android' platform,'CN' country
       | from dwh.ali_oaid_user_activation_rec15days
       | where dt='${today}' and update_date >= '${dt_dash_rec15day}'
       |) t1 full outer join
       |( select X.dev_id,concat_ws(',', collect_set(X.channel_id)) package_name
       |from (select dev_id,case when input_file_name() like '%result_cuhuo%' then concat('com.taobao.foractivation.',substr(split( input_file_name(), '/')[9],14,6),'_oaid') when input_file_name() like '%result_acc%' then concat('com.taobao.foractivation.',substr(split( input_file_name(), '/')[9],12,6),'_oaid') end as channel_id
       |from dwh.ali_taobao_postback_activation_daily
       |where dt ='${dt_taobao_postback_day}' and (input_file_name() like '%result_cuhuo%' or input_file_name() like '%result_acc%') and split( input_file_name(), '/')[8] in (${OaidFolderSqlList}) ) X
       |group by X.dev_id ) t2
       |on(t1.device_id_md5 = t2.dev_id)
       |union
       |select
       |case when t1.device_type ='oaid' then t1.device_id when t1.device_type ='oaidmd5' then t1.device_id_md5 when t2.dev_id is not null then t2.dev_id end as device_id,
       |case when t1.device_type is null then 'oaidmd5' else t1.device_type end as device_type,
       |'android' platform,
       |case when t2.dev_id is null then 'com.taobao.notforacquisition' else 'com.taobao.foracquisition' end as packagename,
       |'CN' country,
       |'ali_acquisition' ali_type
       |from
       |( select device_id_md5,device_id,device_type,'android' platform,'CN' country
       | from dwh.ali_oaid_user_activation_rec15days
       | where dt='${today}' and update_date >= '${dt_dash_rec15day}'
       |) t1 full outer join
       |(select dev_id
       |from dwh.ali_taobao_postback_activation_daily
       |where dt ='${dt_taobao_postback_day}' and input_file_name() like '%result_laxin%' and split( input_file_name(), '/')[8] in (${OaidFolderSqlList})
       |group by dev_id) t2
       |on(t1.device_id_md5 = t2.dev_id)
      """.stripMargin

  /**
   * Query feeding [[buildRes]]: every post-back device of the day together with the
   * file it came from, for the imei, idfa and oaid folders in turn.
   */
  private def syncTo3sSql(today: String, dt_dash_rec15day: String, dt_taobao_postback_day: String): String =
    s"""
       |select
       |case when t1.device_type ='imei' then t1.device_id when t1.device_type ='imeimd5' then t1.device_id_md5 when t2.dev_id is not null then t2.dev_id end as device_id,
       |case when t1.device_type is null then 'imeimd5' else t1.device_type end as device_type,
       |t2.filename
       |from
       |( select device_id_md5,device_id,device_type
       | from dwh.ali_user_activation_rec15days
       | where dt='${today}' and update_date >= '${dt_dash_rec15day}'
       |) t1 right join
       |(select dev_id,input_file_name() filename
       |from dwh.ali_taobao_postback_activation_daily
       |where dt ='${dt_taobao_postback_day}' and split( input_file_name(), '/')[8] in (${ImeiFolderSqlList})
       |group by dev_id,input_file_name()) t2
       |on(t1.device_id_md5 = t2.dev_id)
       |union
       |select
       |case when t1.device_type ='idfa' then t1.device_id when t1.device_type ='idfamd5' then t1.device_id_md5 when t2.dev_id is not null then t2.dev_id end as device_id,
       |case when t1.device_type is null then 'idfamd5' else t1.device_type end as device_type,
       |t2.filename
       |from
       |( select device_id_md5,device_id,device_type
       | from dwh.ali_ios_user_activation_rec15days
       | where dt='${today}' and update_date >= '${dt_dash_rec15day}'
       |) t1 right join
       |(select dev_id,input_file_name() filename
       |from dwh.ali_taobao_postback_activation_daily
       |where dt ='${dt_taobao_postback_day}' and split( input_file_name(), '/')[8] in (${IdfaFolderSqlList})
       |group by dev_id,input_file_name()) t2
       |on(t1.device_id_md5 = t2.dev_id)
       |union
       |select
       |case when t1.device_type ='oaid' then t1.device_id when t1.device_type ='oaidmd5' then t1.device_id_md5 when t2.dev_id is not null then t2.dev_id end as device_id,
       |case when t1.device_type is null then 'oaidmd5' else t1.device_type end as device_type,
       |t2.filename
       |from
       |( select device_id_md5,device_id,device_type
       | from dwh.ali_oaid_user_activation_rec15days
       | where dt='${today}' and update_date >= '${dt_dash_rec15day}'
       |) t1 right join
       |(select dev_id,input_file_name() filename
       |from dwh.ali_taobao_postback_activation_daily
       |where dt ='${dt_taobao_postback_day}' and split( input_file_name(), '/')[8] in (${OaidFolderSqlList})
       |group by dev_id,input_file_name()) t2
       |on(t1.device_id_md5 = t2.dev_id)
      """.stripMargin
}

object EtlAliActivitionPostBackDaily {

  // Auto-increment folder id (path segment 8 of the post-back file) -> device category.
  private val ImeiFolderIds: Seq[String] = Seq(
    "1103", "479", "483", "485", "487", "488", "491", "493", "494", "497", "499", "502",
    "508", "514", "515", "577", "580", "647", "653", "776", "777", "782", "783")

  private val IdfaFolderIds: Seq[String] = Seq(
    "1104", "480", "481", "482", "484", "486", "489", "492", "496", "503", "505", "509",
    "511", "512", "513", "578", "646", "652", "778", "779", "780", "785")

  private val OaidFolderIds: Seq[String] = Seq(
    "1105", "473", "490", "495", "498", "500", "501", "504", "506", "507", "510", "648",
    "654", "781", "784")

  // The same lists rendered as the body of a SQL IN (...) clause, e.g. '1103','479',...
  private val ImeiFolderSqlList: String = sqlInList(ImeiFolderIds)
  private val IdfaFolderSqlList: String = sqlInList(IdfaFolderIds)
  private val OaidFolderSqlList: String = sqlInList(OaidFolderIds)

  // Raw device id types that are MD5-hashed before being emitted by buildRes.
  private val HashedDeviceTypes: Set[String] = Set("imei", "oaid", "idfa")

  // Channel id (optionally suffixed with "_oaid") -> sequence number used in the H_<seqNo> output path.
  private val ChannelSeqNos: Map[String, String] = Map(
    "149655" -> "0", "149647" -> "1", "149649" -> "2", "149650" -> "3", "149652" -> "4",
    "149654" -> "5", "149656" -> "6", "172393" -> "7", "184147" -> "8", "184287" -> "10",
    "184289" -> "11", "188844" -> "12", "149653" -> "13", "219343" -> "14", "219809" -> "15",
    "204543" -> "16", "227229" -> "17", "227229_oaid" -> "20", "204543_oaid" -> "21",
    "241357" -> "33", "241357_oaid" -> "34", "249582" -> "44", "249582_oaid" -> "45",
    "252594" -> "46", "252594_oaid" -> "47", "251827" -> "48", "251827_oaid" -> "49",
    "254029" -> "50", "254029_oaid" -> "51", "256294" -> "52", "256294_oaid" -> "53",
    "254944" -> "54", "254944_oaid" -> "55", "257904" -> "56", "257904_oaid" -> "57",
    "254343" -> "58", "254343_oaid" -> "59", "260935" -> "60", "260935_oaid" -> "61",
    "260951" -> "62", "260951_oaid" -> "63", "261865" -> "64", "261865_oaid" -> "65",
    "159702" -> "66", "159702_oaid" -> "67", "303907" -> "70", "303907_oaid" -> "71")

  /** Renders ids as a comma-separated list of single-quoted SQL string literals. */
  private def sqlInList(ids: Seq[String]): String = ids.map(id => s"'$id'").mkString(",")

  /** Entry point; the status code returned by `run` is not propagated (unchanged behaviour). */
  def main(args: Array[String]): Unit = {
    new EtlAliActivitionPostBackDaily().run(args)
  }
}