package mobvista.dmp.datasource.baichuan

import java.net.URI

import mobvista.dmp.datasource.baichuan.Constant.BaiChuan
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SaveMode
import org.apache.spark.storage.StorageLevel

/**
  * @package: mobvista.dmp.datasource.baichuan
  * @author: wangjf
  * @date: 2019-08-29
  * @time: 20:52
  * @email: jinfeng.wang@mobvista.com
  * @phone: 152-1062-7698
  */
class BaiChuanEtlJob extends java.io.Serializable {

  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("date", true, "date")
    options.addOption("output", true, "output")
    options
  }
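
  /**
    * Example invocation (jar name and output path are hypothetical, shown only
    * to illustrate the expected arguments):
    *
    *   spark-submit --class mobvista.dmp.datasource.baichuan.BaiChuanEtlJob \
    *     dmp-jobs.jar -date 2019-08-29 -output s3://mob-emr-test/baichuan/daily
    */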

  protected def run(args: Array[String]): Unit = {
    val parser = new BasicParser()
    val options = commandOptions()
    val commandLine = parser.parse(options, args)
    val date = commandLine.getOptionValue("date")
    val output = commandLine.getOptionValue("output")

    val spark = mobvista.dmp.common.MobvistaConstant.createSparkSession(s"BaiChuanEtlJob.$date")
    try {

      // register the hasNo UDF (defined in Constant) for use in the SQL run below
      spark.udf.register("hasNo", Constant.hasNo _)

      val df = spark.sql(Constant.filter_sql.replace("@date", date))
        .persist(StorageLevel.MEMORY_AND_DISK_SER)
      df.createOrReplaceTempView("ods_user_info_daily")
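      // the per-app queries built from Constant.etl_sql below are assumed to read from this view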

      // iOS App Store IDs (387682726 = Taobao, 518966501 = Tmall) and the Android package names
      val set = Set("387682726", "518966501", "com.taobao.taobao", "com.tmall.wireless")
      val itr = set.iterator

      import spark.implicits._
      /**
        * Handle the Tmall and Taobao devices of each OS separately; appId: 2 = Taobao, 1 = Tmall
        */
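
      /**
        * Output layout, one directory per appId_osId pair:
        *   2_2 Taobao iOS, 1_2 Tmall iOS, 2_1 Taobao IMEI,
        *   1_1 Tmall IMEI, 2_3 Taobao MD5 IMEI, 1_3 Tmall MD5 IMEI
        */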
      while (itr.hasNext) {
        var output_dir = output
        var output_imei = output
        var sql = Constant.etl_sql
        var sql_imeimd5 = Constant.etl_sql
        var imeiFlag = false

        /**
          * osId: 1 = Android (IMEI), 2 = iOS (IDFA), 3 = Android (MD5-hashed IMEI)
          */

        val ir = itr.next()
        ir match {
          case "387682726" =>
            sql = sql.replace("@package", "387682726").replace("@appId", "2")
              .replace("@appOs", "ios").replace("@devType", "idfa").replace("@osId", "2")
            output_dir = output_dir + "/" + "2_2"
          case "518966501" =>
            sql = sql.replace("@package", "518966501").replace("@appId", "1")
              .replace("@appOs", "ios").replace("@devType", "idfa").replace("@osId", "2")
            output_dir = output_dir + "/" + "1_2"
          case "com.taobao.taobao" =>
            sql = sql.replace("@package", "com.taobao.taobao").replace("@appId", "2")
              .replace("@appOs", "android").replace("@devType", "imei").replace("@osId", "1")
            output_dir = output_dir + "/" + "2_1"
            sql_imeimd5 = sql_imeimd5.replace("@package", "com.taobao.taobao").replace("@appId", "2")
              .replace("@appOs", "android").replace("@devType", "imeimd5").replace("@osId", "3")
            imeiFlag = true
            output_imei = output_imei + "/" + "2_3"
          case "com.tmall.wireless" =>
            sql = sql.replace("@package", "com.tmall.wireless").replace("@appId", "1")
              .replace("@appOs", "android").replace("@devType", "imei").replace("@osId", "1")
            output_dir = output_dir + "/" + "1_1"
            sql_imeimd5 = sql_imeimd5.replace("@package", "com.tmall.wireless").replace("@appId", "1")
              .replace("@appOs", "android").replace("@devType", "imeimd5").replace("@osId", "3")
            imeiFlag = true
            output_imei = output_imei + "/" + "1_3"
        }
        //  device info extracted from the install data
        val dff = spark.sql(sql).rdd.map(r => {
          BaiChuan(r.getAs("dev_id"), Integer.parseInt(r.getAs("app_id").toString), Integer.parseInt(r.getAs("app_os").toString))
        }).toDF.dropDuplicates

        // clear the target path explicitly; the write below also uses SaveMode.Overwrite
        FileSystem.get(new URI("s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output_dir), true)
        dff.write
          .mode(SaveMode.Overwrite)
          .option("orc.compress", "zlib")
          .orc(output_dir)

        if (imeiFlag) {
          val imeiDff = spark.sql(sql_imeimd5).rdd.map(r => {
            BaiChuan(r.getAs("dev_id"), Integer.parseInt(r.getAs("app_id").toString), Integer.parseInt(r.getAs("app_os").toString))
          }).toDF.dropDuplicates

          FileSystem.get(new URI("s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output_imei), true)
          imeiDff.write
            .mode(SaveMode.Overwrite)
            .option("orc.compress", "zlib")
            .orc(output_imei)
        }
      }
    } finally {
      spark.stop()
    }
  }
}

object BaiChuanEtlJob {
  def main(args: Array[String]): Unit = {
    new BaiChuanEtlJob().run(args)
  }
}