package mobvista.dmp.datasource.adn

import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
import mobvista.dmp.util.MRUtils
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.compress.GzipCodec

import java.net.URI
import scala.collection.mutable.ArrayBuffer

/**
 * Daily extraction of device identifiers from ADN pre-click tracking logs.
 *
 * Reads one day's partition of `dwh.ods_adn_trackingnew_preclick`, validates each
 * candidate device id (gaid/idfa/imei/androidid/idfv/oaid/sysid/ruid) against its
 * pattern, and writes one joined line per valid id to a gzip-compressed text output.
 *
 * @package: mobvista.dmp.datasource.adn
 * @author: wangjf
 * @date: 2021/2/23
 * @time: 5:12 PM
 * @email: jinfeng.wang@mobvista.com
 */
class AdnPreClickDaily extends CommonSparkJob with Serializable {

  /**
   * Declares the command-line options this job accepts.
   *
   * @return Options with `datetime` (partition day, yyyyMMdd), `output` (target path)
   *         and `coalesce` (output partition count), all taking a value.
   */
  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("datetime", true, "datetime")
    options.addOption("output", true, "output")
    options.addOption("coalesce", true, "coalesce")
    options
  }

  /**
   * Job entry point: query the day's pre-click rows, keep valid device ids,
   * and save them as gzip text files under `output`.
   *
   * @param args raw CLI args parsed against [[commandOptions]]
   * @return 0 on completion (the SparkSession is always stopped in `finally`)
   */
  override protected def run(args: Array[String]): Int = {
    val parser = new BasicParser()
    val options = commandOptions()
    val commandLine = parser.parse(options, args)

    val datetime = commandLine.getOptionValue("datetime")
    val output = commandLine.getOptionValue("output")
    val coalesce = commandLine.getOptionValue("coalesce")

    val spark = MobvistaConstant.createSparkSession(s"AdnPreClickDaily.$datetime")

    // NOTE(review): `idfv` is aliased from `cdn_ab` and `sysid` from `ext_sysid`;
    // confirm these field mappings against the upstream table schema.
    val sql =
      s"""
         |SELECT request_id, campaign_id, platform, gaid, idfa, imei, dev_id android_id, getDevId(ext_sysid) sysid, getRuid(ext_algo) ruid, getDevId(cdn_ab) idfv, ext_oaid oaid
         |FROM dwh.ods_adn_trackingnew_preclick WHERE CONCAT(yyyy,mm,dd) = '$datetime' AND request_id != '0' AND LENGTH(campaign_id) >= 2
         |""".stripMargin

    try {
      spark.udf.register("getDevId", AdnConstant.getDevId _)
      spark.udf.register("getRuid", AdnConstant.getRuid _)

      // One output line per valid device id: value,type,request_id,campaign_id,platform.
      // flatMap replaces the original map(...).flatMap(identity); the per-id checks are
      // table-driven instead of eight copy-pasted if-blocks. Emit order is unchanged.
      val rdd = spark.sql(sql).rdd.flatMap(r => {
        val requestId = r.getAs[String]("request_id")
        val campaignId = r.getAs[String]("campaign_id")
        val platform = r.getAs[String]("platform")

        // (candidate value, id type tag, validity predicate).
        // `andriodIdPtn` / `oaidPtb` are project constants referenced as-is
        // (names look typo'd — NOTE(review): confirm in MobvistaConstant).
        val candidates = Seq[(String, String, String => Boolean)](
          (r.getAs[String]("gaid"), "gaid", _.matches(MobvistaConstant.didPtn)),
          (r.getAs[String]("idfa"), "idfa", _.matches(MobvistaConstant.didPtn)),
          (r.getAs[String]("imei"), "imei", _.matches(MobvistaConstant.imeiPtn)),
          (r.getAs[String]("android_id"), "androidid", _.matches(MobvistaConstant.andriodIdPtn)),
          (r.getAs[String]("idfv"), "idfv", _.matches(MobvistaConstant.didPtn)),
          (r.getAs[String]("oaid"), "oaid", _.matches(MobvistaConstant.oaidPtb)),
          (r.getAs[String]("sysid"), "sysid", _.matches(MobvistaConstant.didPtn)),
          // ruid has no pattern constant; original rule is a bare length check.
          (r.getAs[String]("ruid"), "ruid", _.length > 16)
        )

        candidates.collect {
          case (value, idType, isValid) if StringUtils.isNotBlank(value) && isValid(value) =>
            MRUtils.JOINER.join(value, idType, requestId, campaignId, platform)
        }
      })

      // Remove any previous output so saveAsTextFile does not fail on an existing path.
      FileSystem.get(new URI("s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration)
        .delete(new Path(output), true)

      rdd.repartition(coalesce.toInt)
        .saveAsTextFile(output, classOf[GzipCodec])
    } finally {
      if (spark != null) {
        spark.stop()
      }
    }
    0
  }
}

object AdnPreClickDaily {
  def main(args: Array[String]): Unit = {
    new AdnPreClickDaily().run(args)
  }
}