AdnInstallDaily.scala 5.12 KB
Newer Older
wang-jinfeng committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126
package mobvista.dmp.datasource.adn

import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
import mobvista.dmp.util.MRUtils
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.broadcast.Broadcast

import java.net.URI
import scala.collection.mutable.ArrayBuffer

/**
 * Spark job that flattens one day of ADN install tracking rows into one text line per
 * valid device identifier.
 *
 * Install rows from `dwh.ods_adn_trackingnew_install` are left-joined with
 * `dwh.dim_adn_campaign` on `campaign_id` to pick up the package name. For every
 * identifier column that passes its validation, a line joined with `MRUtils.JOINER` is
 * emitted in the field order:
 * device id, device type, platform, request id, campaign id, package name.
 *
 * Command line options: `-datetime`, `-output`, `-coalesce`.
 */
class AdnInstallDaily extends CommonSparkJob with Serializable {

  // Campaign id -> package name lookup read by getPkgName. Nothing in this class assigns it,
  // so it has to be set from outside before getPkgName is called.
  var cmpPkgMap: Broadcast[scala.collection.Map[String, String]] = null

  /**
   * Declares the command line options understood by this job.
   *
   * @return options `datetime`, `output` and `coalesce`, each taking one argument
   */
  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("datetime", true, "datetime")
    options.addOption("output", true, "output")
    options.addOption("coalesce", true, "coalesce")
    options
  }

  /**
   * Runs the extraction for the partition selected by `-datetime` and writes the result as
   * gzip-compressed text files under `-output`, replacing whatever was there before.
   *
   * @param args raw command line arguments, parsed with [[commandOptions]]
   * @return 0 on success; failures surface as exceptions
   * @throws NumberFormatException if `-coalesce` is missing or not an integer
   */
  override protected def run(args: Array[String]): Int = {
    val commandLine = new BasicParser().parse(commandOptions(), args)

    val datetime = commandLine.getOptionValue("datetime")
    val output = commandLine.getOptionValue("output")
    // Parsed up front so that a bad value fails before the existing output is deleted.
    val partitions = commandLine.getOptionValue("coalesce").toInt

    val spark = MobvistaConstant.createSparkSession(s"AdnInstallDaily.$datetime")

    val sql =
      s"""
         |SELECT request_id, adn.campaign_id campaign_id, cmp_pkg.package_name package_name, platform, gaid, idfa, imei,
         |  android_id, sysid, ruid, idfv, oaid
         |  FROM (
         |    SELECT request_id, campaign_id, platform, gaid, idfa, imei, dev_id android_id, getDevId(ext_sysid) sysid,
         |    getRuid(ext_algo) ruid, getDevId(cdn_ab) idfv, ext_oaid oaid
         |      FROM dwh.ods_adn_trackingnew_install WHERE CONCAT(yyyy,mm,dd) = '$datetime'
         |    ) adn LEFT JOIN
         |    (SELECT campaign_id, package_name FROM dwh.dim_adn_campaign WHERE CONCAT(year,month,day) = '$datetime') cmp_pkg
         |    ON adn.campaign_id = cmp_pkg.campaign_id
         |""".stripMargin

    try {

      spark.udf.register("getDevId", AdnConstant.getDevId _)
      spark.udf.register("getRuid", AdnConstant.getRuid _)

      // Result column (which is also the emitted device type) -> validation regex, listed in
      // emission order. The patterns are compiled once here and shipped with the closure
      // instead of being recompiled by String.matches for every row.
      val regexValidated = Seq(
        "gaid" -> MobvistaConstant.didPtn,
        "idfa" -> MobvistaConstant.didPtn,
        "imei" -> MobvistaConstant.imeiPtn,
        "android_id" -> MobvistaConstant.andriodIdPtn,
        "idfv" -> MobvistaConstant.didPtn,
        "oaid" -> MobvistaConstant.oaidPtb,
        "sysid" -> MobvistaConstant.didPtn
      ).map { case (column, regex) => column -> java.util.regex.Pattern.compile(regex) }

      val records = spark.sql(sql).dropDuplicates().rdd.flatMap { row =>
        val requestId = row.getAs[String]("request_id")
        val campaignId = row.getAs[String]("campaign_id")
        val platform = row.getAs[String]("platform")
        // The LEFT JOIN leaves package_name null for unknown campaigns; emit "" in that case.
        val rawPackageName = row.getAs[String]("package_name")
        val packageName = if (StringUtils.isNotBlank(rawPackageName)) rawPackageName else ""

        val regexRecords = regexValidated.flatMap { case (deviceType, pattern) =>
          val deviceId = row.getAs[String](deviceType)
          if (StringUtils.isNotBlank(deviceId) && pattern.matcher(deviceId).matches()) {
            Some(MRUtils.JOINER.join(deviceId, deviceType, platform, requestId, campaignId, packageName))
          } else {
            None
          }
        }

        // ruid has no regex; it is accepted when it is longer than 16 characters.
        val ruid = row.getAs[String]("ruid")
        val ruidRecord =
          if (StringUtils.isNotBlank(ruid) && ruid.length > 16) {
            Some(MRUtils.JOINER.join(ruid, "ruid", platform, requestId, campaignId, packageName))
          } else {
            None
          }

        regexRecords ++ ruidRecord
      }

      // Resolve the filesystem from the output path itself rather than a fixed bucket, so the
      // delete targets the same filesystem that saveAsTextFile writes to.
      val outputPath = new Path(output)
      outputPath.getFileSystem(spark.sparkContext.hadoopConfiguration).delete(outputPath, true)

      records.repartition(partitions)
        .saveAsTextFile(output, classOf[GzipCodec])
    } finally {
      if (spark != null) {
        spark.stop()
      }
    }
    0
  }

  /**
   * Looks up the package name of a campaign in [[cmpPkgMap]].
   *
   * A package name of the form `id<digits>` is returned without the `id` prefix.
   *
   * @param campaignId campaign identifier used as the lookup key
   * @return the package name, or "" when the campaign is not in the map
   */
  def getPkgName(campaignId: String): String = {
    val pkgName = cmpPkgMap.value.getOrElse(campaignId, "")
    if (StringUtils.isNotBlank(pkgName) && pkgName.matches("^id\\d+$")) {
      // The regex guarantees a single leading "id" followed only by digits, so removing
      // every "id" occurrence just strips that prefix.
      pkgName.replace("id", "")
    } else {
      pkgName
    }
  }
}

/** Command line entry point for the [[AdnInstallDaily]] job. */
object AdnInstallDaily {

  /**
   * Creates the job and runs it with the given arguments.
   * The Int status returned by `run` is discarded.
   *
   * @param args command line arguments forwarded unchanged to the job
   */
  def main(args: Array[String]): Unit = {
    val job = new AdnInstallDaily
    job.run(args)
  }
}