TrackingInstallDailyV2.scala
package mobvista.dmp.datasource.datatory

import mobvista.dmp.common.CommonSparkJob
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.commons.lang3.StringUtils
import org.apache.spark.sql.{SaveMode, SparkSession}

/**
  * @package: mobvista.dmp.datasource.datatory
  * @author: wangjf
  * @date: 2019-08-19 15:01:03
  * @email: jinfeng.wang@mobvista.com
  * @phone: 152-1062-7698
  */
class TrackingInstallDailyV2 extends CommonSparkJob with java.io.Serializable {
  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("date", true, "date")
    options.addOption("output", true, "output")

    options
  }

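  /**
    * Parses the CLI arguments, extracts the daily ADN install and 3s install
    * datasets via the SQL templates in ConstantV2, and writes each one as
    * snappy-compressed ORC under the given output path.
    */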
  override protected def run(args: Array[String]): Int = {
    val parser = new BasicParser()
    val options = commandOptions()
    val commandLine = parser.parse(options, args)
    val date = commandLine.getOptionValue("date")
    val output = commandLine.getOptionValue("output")

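    // Hive-enabled SparkSession with RDD/shuffle compression, ORC filter
    // pushdown, and Kryo serialization enabled.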
    val spark = SparkSession
      .builder()
      .appName("TrackingInstallDailyV2")
      .config("spark.rdd.compress", "true")
      .config("spark.shuffle.compress", "true")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.io.compression.codec", "lz4")
      .config("spark.io.compression.lz4.blockSize", "64k")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

    try {

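      // Publisher channel dimension loaded over JDBC; it is not referenced
      // again downstream in this job.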
      val campaign_list_cpi = ConstantV2.jdbcConnection(spark, "mob_adn", "publisher_channel")
        .select("id", "channel_name")

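      // Register the device-id validation UDF so the SQL template below can
      // reference it via the @check_deviceId placeholder.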
      spark.udf.register("check_deviceId", Constant.check_deviceId _)

      import spark.implicits._
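      // Fill in the @date and @check_deviceId placeholders of the ADN install
      // SQL template; the same `sql` var is reused for the 3s query below.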
      var sql = ConstantV2.tracking_adn_install_sql.replace("@date", date)
        .replace("@check_deviceId", "check_deviceId(dev_id)")

      spark.sql(sql)
        .rdd.map(r => {
          // getAs needs an explicit type parameter here; a bare getAs("...")
          // infers Nothing and fails at runtime with a ClassCastException.
          val idfa = r.getAs[String]("idfa")
          val gaid = r.getAs[String]("gaid")
          val imei = r.getAs[String]("imei")
          // Choose a single device id, preferring imei over idfa over gaid.
          var device_id = ""
          if (StringUtils.isNotBlank(imei)) {
            device_id = imei
          } else if (StringUtils.isNotBlank(idfa)) {
            device_id = idfa
          } else if (StringUtils.isNotBlank(gaid)) {
            device_id = gaid
          }
          AdnInstallClass(device_id, r.getAs[String]("platform"), r.getAs[String]("app_id"), r.getAs[String]("campaign_id"),
            r.getAs[String]("device_model"), r.getAs[String]("os_version"), r.getAs[String]("country_code"), r.getAs[String]("ios_ab"))
        }).filter(a => {
          StringUtils.isNotBlank(a.dev_id)
        }).toDF
        .write
        .mode(SaveMode.Overwrite)
        .option("orc.compress", "snappy")
        .orc(output + "/adn_install")

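      // Second pass: build the 3s install dataset from its SQL template for
      // the same date and write it next to the ADN output.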
      sql = ConstantV2.tracking_3s_install_sql_v2.replace("@date", date)

      spark.sql(sql)
        .rdd.map(r => {
          SSInstallClass(r.getAs[String]("device_id"), r.getAs[String]("offer_id"), r.getAs[String]("device_model"),
            r.getAs[String]("os_version"), r.getAs[String]("country"), r.getAs[String]("city"),
            r.getAs[String]("event_id"), r.getAs[String]("event_name"), r.getAs[String]("event_type"))
        }).filter(r => {
          StringUtils.isNotBlank(r.dev_id)
        }).toDF
        .write
        .mode(SaveMode.Overwrite)
        .option("orc.compress", "snappy")
        .orc(output + "/3s_install")

    } finally {
      // spark is a non-null val assigned before the try block, so no null check is needed.
      spark.stop()
    }
    0
  }
}

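/**
  * Entry point. An illustrative invocation might look like the following;
  * the jar name and output path are hypothetical placeholders, not taken
  * from the source:
  *
  *   spark-submit --class mobvista.dmp.datasource.datatory.TrackingInstallDailyV2 \
  *     dmp.jar -date 2019-08-19 -output s3://mob-emr-test/dmp/tracking_install_daily
  */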
object TrackingInstallDailyV2 {
  def main(args: Array[String]): Unit = {
    new TrackingInstallDailyV2().run(args)
  }
}