package mobvista.dmp.datasource.dsp

import mobvista.dmp.common.{CommonMapReduce, CommonSparkJob}
import mobvista.prd.datasource.util.GsonUtil
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.commons.lang.StringUtils
import org.apache.hadoop.fs.Path
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.{Row, SaveMode, SparkSession}

import scala.collection.JavaConverters._
import scala.util.control.NonFatal

/**
 * Spark job that reads one day of mopub DSP request logs (two Hive sources), maps each
 * request's dealer ids to package names via an app-info file, and writes one ORC row per
 * (device_id, device_type) with the collected package names and dealer ids.
 *
 * Command-line options:
 *  - `yyyymmdd`: date partition to read
 *  - `output`:   ORC output path (deleted before writing)
 *  - `appFile`:  tab-separated text file whose first two columns are dealer id and package name
 *  - `coalesce`: number of output partitions
 */
class DspDealeridRetarget extends CommonSparkJob with Serializable {

  /** Declares the command-line options accepted by [[run]]. */
  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("yyyymmdd", true, "yyyymmdd")
    options.addOption("output", true, "output")
    options.addOption("appFile", true, "appFile")
    options.addOption("coalesce", true, "coalesce")
    options
  }

  /**
   * Job entry point.
   *
   * @param args raw command-line arguments, parsed with [[commandOptions]]
   * @return 0 once the output has been written
   */
  override protected def run(args: Array[String]): Int = {
    val commandLine = new BasicParser().parse(commandOptions(), args)
    val coalesce = commandLine.getOptionValue("coalesce")
    val yyyymmdd = commandLine.getOptionValue("yyyymmdd")
    val output = commandLine.getOptionValue("output")
    val appFile = commandLine.getOptionValue("appFile")

    val spark = SparkSession
      .builder()
      .config("spark.rdd.compress", "true")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

    try {
      // NOTE(review): iOS mappings are the app-file lines containing "202004" and Android
      // mappings are the lines containing "com."; a line containing both lands in both maps.
      // Confirm these markers with the owner of the app file.
      val iosAppInfoBC = spark.sparkContext.broadcast(loadAppInfo(spark, appFile, "202004"))
      val adrAppInfoBC = spark.sparkContext.broadcast(loadAppInfo(spark, appFile, "com."))

      // Delete the previous output on the file system that owns the output path itself,
      // instead of a hard-coded bucket (Hadoop rejects paths of another scheme/authority,
      // and scheme-less paths would otherwise be deleted on S3 but written to defaultFS).
      val outputPath = new Path(output)
      outputPath.getFileSystem(spark.sparkContext.hadoopConfiguration).delete(outputPath, true)

      import spark.implicits._

      val requestSql =
        s"""
           |select dealerid,idfa,gaid,exitid,country,platform
           | from dwh.etl_dsp_request_daily_hours where dt ='${yyyymmdd}' and exchanges='mopub' and dealerid !=''
        """.stripMargin
      val orgRequestSql =
        s"""
           |select ext3,idfa,googleadid gaid,ext5 exitid,countrycode country,os platform
           |from adn_dsp.log_adn_dsp_org_request_orc_hour where concat(yr,mt,dt) ='${yyyymmdd}'
           |and exchanges='mopub'
        """.stripMargin

      val requestRows = spark.sql(requestSql).filter(filterData _).rdd
        .map(parseMapData(iosAppInfoBC, adrAppInfoBC, _))
      val orgRequestRows = spark.sql(orgRequestSql).filter(filterData _).rdd
        .map(parseMapOrgData(iosAppInfoBC, adrAppInfoBC, _))

      requestRows.union(orgRequestRows)
        .flatMap(rows => rows)
        .toDF("device_id", "device_type", "platform", "packagename", "country", "dealerid")
        .createOrReplaceTempView("dsp_org_etl_hours")

      // One row per device: '#'-joined distinct package names and dealer ids.
      val aggregateSql =
        """
          |select device_id,
          |device_type,
          |max(platform) platform,
          |concat_ws('#',collect_set(packagename)) package_name,
          |min(country) country,
          |concat_ws('#',collect_set(dealerid)) dealerids
          |from dsp_org_etl_hours
          |group by device_id,device_type
        """.stripMargin

      spark.sql(aggregateSql).coalesce(coalesce.toInt)
        .write
        .mode(SaveMode.Overwrite)
        .option("orc.compress", "zlib")
        .option("mapreduce.fileoutputcommitter.marksuccessfuljobs", false)
        .orc(output)
    } finally {
      spark.stop()
    }
    0
  }

  /**
   * Loads the (dealer id -> package name) pairs from `appFile`, keeping only lines that
   * contain `marker` and have at least two tab-separated columns, and collects them to the driver.
   */
  private def loadAppInfo(spark: SparkSession, appFile: String, marker: String): collection.Map[String, String] =
    spark.sparkContext.textFile(appFile)
      .filter(_.contains(marker))
      .map(_.split("\t"))
      .filter(_.length >= 2)
      .map(buildAppInfo(_))
      .collectAsMap()

  /** Turns a split app-file line into a (dealer id, package name) pair from its first two columns. */
  def buildAppInfo(array: Array[String]): Tuple2[String, String] = {
    val dealerid = array(0)
    val pkgName = array(1)
    (dealerid, pkgName)
  }

  /**
   * Builds the output rows for one request: one row per dealer id that has a known package.
   *
   * @param dealeridsStr JSON array text of dealer ids; blank means "no dealer ids"
   * @return tuples of (device_id, device_type, platform, package_name, country, dealer_id);
   *         empty when no usable device id can be resolved or no dealer id maps to a package
   */
  def genRes(iosAppInfoBC: Broadcast[collection.Map[String, String]],
             adrAppInfoBC: Broadcast[collection.Map[String, String]],
             dealeridsStr: String,
             idfa: String,
             gaid: String,
             platform: String,
             exitId: String,
             country: String): Iterator[(String, String, String, String, String, String)] = {
    val (idfagaidmd5, imei, imeimd5) = extractExtIds(exitId, country)
    val (deviceid, devicetype) = resolveDevice(platform, country, idfa, gaid, idfagaidmd5, imei, imeimd5)

    // A blank device id would merge unrelated requests into one "" device in the final
    // group-by, and a blank dealer-id string cannot yield any rows, so emit nothing for either.
    if (StringUtils.isBlank(deviceid) || StringUtils.isBlank(dealeridsStr)) {
      Iterator.empty
    } else {
      val appInfo = if ("ios".equalsIgnoreCase(platform)) iosAppInfoBC.value else adrAppInfoBC.value
      GsonUtil.String2JsonArray(dealeridsStr).asScala.iterator
        .map(_.getAsString)
        .flatMap { dealerid =>
          appInfo.get(dealerid).iterator.map(pkgName => (deviceid, devicetype, platform, pkgName, country, dealerid))
        }
    }
  }

  /**
   * Picks the device id and its type for a request, in priority order:
   * iOS: idfa, then idfamd5; Android in CN: imei, then imeimd5; Android elsewhere: gaid, then gaidmd5.
   *
   * @return (device id, device type), or ("", "") when no candidate is non-blank
   */
  private def resolveDevice(platform: String, country: String, idfa: String, gaid: String,
                            idfagaidmd5: String, imei: String, imeimd5: String): (String, String) = {
    val isAndroid = "android".equalsIgnoreCase(platform)
    if ("ios".equalsIgnoreCase(platform)) firstNonBlank(idfa -> "idfa", idfagaidmd5 -> "idfamd5")
    else if (isAndroid && "CN".equalsIgnoreCase(country)) firstNonBlank(imei -> "imei", imeimd5 -> "imeimd5")
    else if (isAndroid) firstNonBlank(gaid -> "gaid", idfagaidmd5 -> "gaidmd5")
    else ("", "")
  }

  /** Returns the first (id, type) pair whose id is non-blank, or ("", "") if none is. */
  private def firstNonBlank(candidates: (String, String)*): (String, String) =
    candidates.find { case (id, _) => StringUtils.isNotBlank(id) }.getOrElse(("", ""))

  /**
   * Extracts the secondary ids from the comma-separated `exitId` field.
   * Index 2 is the idfa/gaid MD5; indexes 4 and 5 (imei and imei MD5) are kept only for country CN.
   * Each id must match its CommonMapReduce pattern.
   *
   * @return (idfagaidmd5, imei, imeimd5), each "" when absent, invalid, or `exitId` has fewer than 6 fields
   */
  private def extractExtIds(exitId: String, country: String): (String, String, String) = {
    if (StringUtils.isBlank(exitId)) {
      ("", "", "")
    } else {
      val devIds = splitFun(exitId, ",")
      if (devIds.length < 6) {
        ("", "", "")
      } else {
        val isCN = StringUtils.isNotBlank(country) && "CN".equalsIgnoreCase(country)
        val md5Candidate = devIds(2)
        val imeiCandidate = devIds(4)
        val imeiMd5Candidate = devIds(5)
        val idfagaidmd5 =
          if (StringUtils.isNotBlank(md5Candidate) && md5Candidate.matches(CommonMapReduce.imeiMd5Ptn)) md5Candidate
          else ""
        val imei =
          if (isCN && StringUtils.isNotBlank(imeiCandidate) && imeiCandidate.matches(CommonMapReduce.imeiPtn)) imeiCandidate
          else ""
        val imeimd5 =
          if (isCN && StringUtils.isNotBlank(imeiMd5Candidate) && imeiMd5Candidate.matches(CommonMapReduce.imeiMd5Ptn)) imeiMd5Candidate
          else ""
        (idfagaidmd5, imei, imeimd5)
      }
    }
  }

  /**
   * Maps a row of `adn_dsp.log_adn_dsp_org_request_orc_hour`, whose dealer ids live in the
   * `dealids` member of the JSON object stored in `ext3`, to output rows via [[genRes]].
   */
  def parseMapOrgData(iosAppInfoBC: Broadcast[collection.Map[String, String]],
                      adrAppInfoBC: Broadcast[collection.Map[String, String]],
                      row: Row): Iterator[Tuple6[String, String, String, String, String, String]] = {
    val dealeridsStr = parseDealIds(row.getAs[String]("ext3"))
    val idfa = row.getAs[String]("idfa")
    val gaid = row.getAs[String]("gaid")
    val platform = row.getAs[String]("platform")
    val exitId = row.getAs[String]("exitid")
    val country = row.getAs[String]("country")

    val resStr =
      if (StringUtils.isBlank(dealeridsStr)) {
        ""
      } else {
        // Remove escaping backslashes, then drop the first and last characters (the wrapping
        // quotes/brackets). Values shorter than 2 characters have nothing inside them; the
        // original substring call threw StringIndexOutOfBoundsException on those.
        val unescaped = dealeridsStr.replace("\\", "")
        if (unescaped.length >= 2) unescaped.substring(1, unescaped.length - 1) else ""
      }
    genRes(iosAppInfoBC, adrAppInfoBC, resStr, idfa, gaid, platform, exitId, country)
  }

  /**
   * Returns the raw JSON text of the `dealids` member of `ext3`, or "" when `ext3` is blank,
   * does not start with '{', lacks a non-null `dealids`, or cannot be parsed (best effort:
   * parse failures are printed and ignored).
   */
  private def parseDealIds(ext3: String): String = {
    if (StringUtils.isBlank(ext3) || !ext3.startsWith("{")) {
      ""
    } else {
      try {
        val dealids = GsonUtil.String2JsonObject(ext3).get("dealids")
        if (dealids != null && !dealids.isJsonNull) dealids.toString else ""
      } catch {
        case NonFatal(e) =>
          e.printStackTrace()
          ""
      }
    }
  }

  /** Maps a row of `dwh.etl_dsp_request_daily_hours` (dealer ids in `dealerid`) to output rows via [[genRes]]. */
  def parseMapData(iosAppInfoBC: Broadcast[collection.Map[String, String]],
                   adrAppInfoBC: Broadcast[collection.Map[String, String]],
                   row: Row): Iterator[Tuple6[String, String, String, String, String, String]] = {
    val dealerid = row.getAs[String]("dealerid")
    val idfa = row.getAs[String]("idfa")
    val gaid = row.getAs[String]("gaid")
    val platform = row.getAs[String]("platform")
    val exitId = row.getAs[String]("exitid")
    val country = row.getAs[String]("country")
    genRes(iosAppInfoBC, adrAppInfoBC, dealerid, idfa, gaid, platform, exitId, country)
  }

  /**
   * Keeps a request row only if its platform is exactly "ios" or "android" and it carries at
   * least one usable id: a valid idfa (iOS) or gaid (Android) matching `didPtn` and not all-zero,
   * or any of the secondary ids found by [[extractExtIds]].
   */
  def filterData(row: Row): Boolean = {
    val idfa = row.getAs[String]("idfa")
    val gaid = row.getAs[String]("gaid")
    val platform = row.getAs[String]("platform")
    val exitId = row.getAs[String]("exitid")
    val country = row.getAs[String]("country")

    def isValidDid(id: String): Boolean =
      StringUtils.isNotBlank(id) && id.matches(didPtn) && !allZero.equals(id)

    val isIos = "ios".equals(platform)
    val isAndroid = "android".equals(platform)
    if (!isIos && !isAndroid) {
      false
    } else {
      val hasPrimaryId = (isIos && isValidDid(idfa)) || (isAndroid && isValidDid(gaid))
      val (idfagaidmd5, imei, imeimd5) = extractExtIds(exitId, country)
      hasPrimaryId || Seq(idfagaidmd5, imei, imeimd5).exists(id => StringUtils.isNotBlank(id))
    }
  }
}

object DspDealeridRetarget {
  def main(args: Array[String]): Unit = {
    new DspDealeridRetarget().run(args)
  }
}