package mobvista.dmp.datasource.dsp

import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.commons.lang.StringUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SaveMode}

import java.net.URI

/**
 * @package: mobvista.dmp.datasource.dsp
 * @author: wangjf
 * @date: 2021/5/13
 * @time: 1:53 PM
 * @email: jinfeng.wang@mobvista.com
 */
class DspDeviceIdMapping extends CommonSparkJob with Serializable {

  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("date", true, "date")
    options.addOption("output", true, "output")
    options.addOption("coalesce", true, "coalesce")
    options
  }

  override protected def run(args: Array[String]): Int = {
    val parser = new BasicParser()
    val options = commandOptions()
    val commandLine = parser.parse(options, args)

    val date = commandLine.getOptionValue("date")
    val output = commandLine.getOptionValue("output")
    val coalesce = Integer.parseInt(commandLine.getOptionValue("coalesce"))

    val spark = MobvistaConstant.createSparkSession(s"DspDeviceIdMapping.${date}")
    val sc = spark.sparkContext

    val sql =
      s"""
         |SELECT idfa, gaid, exitid, platform FROM dwh.etl_dsp_request_daily_hours
         | WHERE dt = '${date}'
         |""".stripMargin

    val rdd = spark.sql(sql).rdd.mapPartitions(rs => {
      rs.map(r => {
        var idfa = r.getAs[String]("idfa")
        val platform = r.getAs[String]("platform")
        val exitId = r.getAs[String]("exitid")
        // Newly added: IDFV parsed out of the comma-separated exitid field
        var idfv = ""
        if (StringUtils.isNotBlank(exitId)) {
          val devIds = splitFun(exitId, ",")
          if (devIds.length >= 17) {
            if ("ios".equalsIgnoreCase(platform)) {
              // Fall back to the IDFA carried in exitid when the idfa column is blank
              if (StringUtils.isBlank(idfa) && StringUtils.isNotBlank(devIds(1)) && devIds(1).matches(MobvistaConstant.didPtn)) {
                idfa = devIds(1)
              }
              if (StringUtils.isNotBlank(devIds(16)) && devIds(16).matches(MobvistaConstant.didPtn)) {
                idfv = devIds(16)
              }
            }
          }
        }
        // Emit the (idfa, idfv) pair only when both ids are present; empty pairs are filtered out below
        if (StringUtils.isNotBlank(idfa) && StringUtils.isNotBlank(idfv)) {
          (idfa, idfv)
        } else {
          ("", "")
        }
      })
      /*
      var gaid = r.getAs[String]("idfa")
      val platform = r.getAs[String]("platform")
      val exitId = r.getAs[String]("exitid")
      // Newly added
      var idfv = ""
      var gaidmd5 = ""
      var androidId = ""
      var imei = ""
      var imeimd5 = ""
      var oaid = ""
      var oaidmd5 = ""
      if (StringUtils.isNotBlank(exitId)) {
        val devIds = splitFun(exitId, ",")
        if (devIds.length >= 17) {
          if ("ios".equalsIgnoreCase(platform)) {
            if (StringUtils.isBlank(idfa) && StringUtils.isNotBlank(devIds(1)) && devIds(1).matches(MobvistaConstant.didPtn)) {
              idfa = devIds(1)
            }
            if (StringUtils.isNotBlank(devIds(16)) && devIds(16).matches(MobvistaConstant.didPtn)) {
              idfv = devIds(16)
            }
            if (StringUtils.isNotBlank(idfa) && StringUtils.isNotBlank(idfv)) {
              (idfa, idfv)
            }
          } else {
            if (StringUtils.isBlank(gaid) && StringUtils.isNotBlank(devIds(0)) && devIds(0).matches(MobvistaConstant.didPtn)) {
              gaid = devIds(0)
            }
            if (StringUtils.isNotBlank(devIds(2)) && devIds(2).matches(MobvistaConstant.md5Ptn)) {
              gaidmd5 = devIds(2)
            }
            if (StringUtils.isNotBlank(devIds(12))) {
              oaid = devIds(12)
            }
            if (StringUtils.isNotBlank(devIds(13)) && devIds(13).matches(MobvistaConstant.md5Ptn)) {
              oaidmd5 = devIds(13)
            }
            if (StringUtils.isNotBlank(devIds(4)) && devIds(4).matches(MobvistaConstant.imeiPtn)) {
              imei = devIds(4)
            }
            if (StringUtils.isNotBlank(devIds(5)) && devIds(5).matches(MobvistaConstant.md5Ptn)) {
              imeimd5 = devIds(5)
            }
            if (StringUtils.isNotBlank(devIds(7)) && devIds(7).matches(MobvistaConstant.andriodIdPtn)) {
              androidId = devIds(7)
            }
          }
        }
      }
      */
    }).filter(r => {
      StringUtils.isNotBlank(r._1) && StringUtils.isNotBlank(r._2)
    }).map(r => {
      Row(r._1, r._2)
    })

    val pathUri = new URI(output)
    // Remove any existing output directory before writing the new mapping
    FileSystem.get(new URI(s"${pathUri.getScheme}://${pathUri.getHost}"), sc.hadoopConfiguration)
      .delete(new Path(output), true)

    val df = spark.createDataFrame(rdd, schema)
    df.dropDuplicates()
      .repartition(coalesce)
      .write
      .option("orc.compress", "zlib")
      .mode(SaveMode.Overwrite)
      .orc(output)

    0
  }

  val schema: StructType = StructType(Array(
    StructField("old_id", StringType),
    StructField("new_id", StringType)
  ))
}

object DspDeviceIdMapping {
  def main(args: Array[String]): Unit = {
    new DspDeviceIdMapping().run(args)
  }
}
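
// A minimal usage sketch, not part of the original job: it assumes the jar is launched through
// spark-submit with `mobvista.dmp.datasource.dsp.DspDeviceIdMapping` as the main class, and the
// option values below (date, output path, partition count) are hypothetical placeholders.
//
//   DspDeviceIdMapping.main(Array(
//     "-date", "2021-05-13",
//     "-output", "s3://example-bucket/dsp_device_id_mapping/2021-05-13",
//     "-coalesce", "100"
//   ))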