// AdnPreClickDaily.scala (4.4 KB)
package mobvista.dmp.datasource.adn

import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
import mobvista.dmp.util.MRUtils
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.compress.GzipCodec

import java.net.URI
import scala.collection.mutable.ArrayBuffer

/**
 * Spark job that extracts device identifiers from one day of ADN pre-click tracking logs.
 *
 * Rows of `dwh.ods_adn_trackingnew_preclick` for the day given by `-datetime` are read, and
 * every identifier on a row that passes its check is emitted as one text line joined with
 * `MRUtils.JOINER` in the field order `deviceId, deviceType, requestId, campaignId, platform`.
 * The lines are written as gzip-compressed text to `-output`; an existing `-output` is
 * deleted recursively before writing.
 *
 * @package: mobvista.dmp.datasource.adn
 * @author: wangjf
 * @date: 2021/2/23
 * @time: 5:12 PM
 * @email: jinfeng.wang@mobvista.com
 */
class AdnPreClickDaily extends CommonSparkJob with Serializable {

  /**
   * Declares the command-line options understood by this job.
   *
   * @return the options `datetime`, `output` and `coalesce`, each taking one argument
   */
  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("datetime", true, "datetime")
    options.addOption("output", true, "output")
    options.addOption("coalesce", true, "coalesce")
    options
  }

  /**
   * Parses the arguments, runs the extraction query and writes the result.
   *
   * @param args command-line arguments: `-datetime` (starting with `yyyyMMdd`), `-output`
   *             (target directory) and `-coalesce` (number of output partitions)
   * @return 0 when the job completes; failures surface as exceptions
   * @throws IllegalArgumentException if an argument is missing or malformed
   */
  override protected def run(args: Array[String]): Int = {
    val commandLine = new BasicParser().parse(commandOptions(), args)

    val datetime = commandLine.getOptionValue("datetime")
    val output = commandLine.getOptionValue("output")
    val coalesce = commandLine.getOptionValue("coalesce")

    // Validate everything before a Spark session is started and, more importantly, before the
    // existing output is deleted, so that a bad argument cannot destroy a previous result.
    require(datetime != null && datetime.matches("\\d{8}.*"),
      s"datetime must start with yyyyMMdd, got: $datetime")
    require(StringUtils.isNotBlank(output), "output must be provided")
    val partitions = scala.util.Try(coalesce.toInt).getOrElse(
      throw new IllegalArgumentException(s"coalesce must be an integer, got: $coalesce"))
    require(partitions > 0, s"coalesce must be positive, got: $partitions")

    val year = datetime.substring(0, 4)
    val month = datetime.substring(4, 6)
    val day = datetime.substring(6, 8)

    val sql =
      s"""
         |SELECT request_id, campaign_id, platform, gaid, idfa, imei, dev_id android_id, getDevId(ext_sysid) sysid, getRuid(ext_algo) ruid, getDevId(cdn_ab) idfv, ext_oaid oaid
         |FROM dwh.ods_adn_trackingnew_preclick WHERE yyyy = '$year' AND mm = '$month' AND dd = '$day' AND request_id != '0' AND LENGTH(campaign_id) >= 2
         |""".stripMargin

    val spark = MobvistaConstant.createSparkSession(s"AdnPreClickDaily.$datetime")

    try {
      // The query calls these UDFs by name, so they must be registered before it runs.
      spark.udf.register("getDevId", AdnConstant.getDevId _)
      spark.udf.register("getRuid", AdnConstant.getRuid _)

      val rdd = spark.sql(sql).rdd.flatMap(r => {
        val requestId = r.getAs[String]("request_id")
        val campaignId = r.getAs[String]("campaign_id")
        val platform = r.getAs[String]("platform")

        // (device type tag, source column, check applied to non-blank values only).
        // The sequence order is the order in which lines are emitted for a row.
        val rules = Seq[(String, String, String => Boolean)](
          ("gaid", "gaid", (v: String) => v.matches(MobvistaConstant.didPtn)),
          ("idfa", "idfa", (v: String) => v.matches(MobvistaConstant.didPtn)),
          ("imei", "imei", (v: String) => v.matches(MobvistaConstant.imeiPtn)),
          ("androidid", "android_id", (v: String) => v.matches(MobvistaConstant.andriodIdPtn)),
          ("idfv", "idfv", (v: String) => v.matches(MobvistaConstant.didPtn)),
          ("oaid", "oaid", (v: String) => v.matches(MobvistaConstant.oaidPtb)),
          ("sysid", "sysid", (v: String) => v.matches(MobvistaConstant.didPtn)),
          ("ruid", "ruid", (v: String) => v.length > 16)
        )

        rules.flatMap { case (deviceType, column, isValid) =>
          val value = r.getAs[String](column)
          // isNotBlank guards against null, so isValid never sees a null value.
          if (StringUtils.isNotBlank(value) && isValid(value)) {
            Seq(MRUtils.JOINER.join(value, deviceType, requestId, campaignId, platform))
          } else {
            Seq.empty[String]
          }
        }
      })

      // Resolve the file system from the output path itself, so the directory that is cleared
      // is always the one that is written below, whatever bucket or scheme it uses.
      val outputPath = new Path(output)
      val fs: FileSystem = outputPath.getFileSystem(spark.sparkContext.hadoopConfiguration)
      fs.delete(outputPath, true)

      rdd.repartition(partitions)
        .saveAsTextFile(output, classOf[GzipCodec])
    } finally {
      if (spark != null) {
        spark.stop()
      }
    }
    0
  }

}

/** Command-line entry point for the `AdnPreClickDaily` job. */
object AdnPreClickDaily {

  /**
   * Instantiates the job and runs it with the raw command-line arguments.
   * The integer returned by `run` is discarded.
   *
   * @param args arguments forwarded unchanged to `run`
   */
  def main(args: Array[String]): Unit = {
    val job = new AdnPreClickDaily()
    job.run(args)
  }
}