// EtlAliIosActivitionDaily.scala 6.27 KB
package mobvista.dmp.datasource.taobao

import java.net.URI

import mobvista.dmp.common.CommonSparkJob
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.storage.StorageLevel


/**
 * Daily Spark job that prepares iOS device ids (md5 form) for upload and maintains a
 * rolling device table.
 *
 * Steps, as implemented in [[run]]:
 *  1. Read `dwh.ods_dmp_user_info` for partition `dt = today`, keeping `idfa` / `idfamd5`
 *     devices with `upper(country) = 'CN'` and `last_req_day >= last_req_day`, and keep one
 *     row per md5 device id.
 *  2. Write the distinct md5 device count as text to `request_count_result`.
 *  3. Select md5 ids that do not appear in `dwh.ali_ios_user_activation_daily` between
 *     `dt_dash_rec14day` (dashes removed) and `yesterday`, capped at 30,000,000 rows, and
 *     write them as text to `outputdaily`.
 *  4. Union those devices (stamped with `dt_dash_today`) with yesterday's partition of
 *     `dwh.ali_ios_user_activation_rec15days`, keep the newest row per md5 id and write the
 *     result as zlib-compressed ORC to `output`.
 */
class EtlAliIosActivitionDaily extends CommonSparkJob {


  /**
   * Declares the command-line options of this job. Every option takes a value and is
   * described as mandatory (`[must]`).
   *
   * @return the option set used to parse the job arguments
   */
  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("output", true, "[must] output")
    options.addOption("outputdaily", true, "[must] outputdaily")
    options.addOption("coalesce", true, "[must] coalesce")
    options.addOption("today", true, "[must] today")
    options.addOption("yesterday", true, "[must] yesterday")
    options.addOption("dt_dash_today", true, "[must] dt_dash_today")
    options.addOption("last_req_day", true, "[must] last_req_day")
    options.addOption("dt_dash_rec14day", true, "[must] dt_dash_rec14day")
    options.addOption("request_count_result", true, "[must] request_count_result")
    options
  }

  /**
   * Parses the arguments and runs the ETL.
   *
   * @param args raw command-line arguments
   * @return `0` when the job finished, `-1` when the mandatory-option check failed
   * @throws NumberFormatException if the `coalesce` option is not an integer; this is now
   *                               raised before any output path is deleted
   */
  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      return -1
    } else printOptions(commandLine)

    val output = commandLine.getOptionValue("output")
    val outputdaily = commandLine.getOptionValue("outputdaily")
    // Parse the partition count up front: a malformed value should fail here, not after the
    // previous outputs were deleted and all the heavy queries already ran.
    val coalesce = commandLine.getOptionValue("coalesce").toInt
    val today = commandLine.getOptionValue("today")
    val dt_dash_today = commandLine.getOptionValue("dt_dash_today")
    val last_req_day = commandLine.getOptionValue("last_req_day")
    val dt_dash_rec14day = commandLine.getOptionValue("dt_dash_rec14day")
    val yesterday = commandLine.getOptionValue("yesterday")
    val request_count_result = commandLine.getOptionValue("request_count_result")


    val spark = SparkSession.builder()
      .appName("EtlAliActivitionDaily")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

    try {
      // Clear the previous outputs. This happens inside the try block so that a failing
      // delete still stops the SparkSession in the finally clause.
      val fs = FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration)
      Seq(output, outputdaily, request_count_result).foreach(p => fs.delete(new Path(p), true))

      // One row per md5 device id. Ordering by device_type ascending ranks 'idfa' rows ahead
      // of 'idfamd5' rows when both map to the same md5.
      val sql1=
        s"""
           |select XX.device_id_md5,XX.device_id,XX.device_type
           |FROM (select X.device_id_md5,X.device_id,X.device_type,
           |row_number() over(partition by X.device_id_md5 order by X.device_type asc) rk
           |from ( select device_id,device_type,
           |case when device_type = 'idfa' then MD5(device_id) when device_type = 'idfamd5' then device_id end as device_id_md5
           |from dwh.ods_dmp_user_info where dt ='${today}'
           | and device_type in ('idfa','idfamd5')
           | and last_req_day >='${last_req_day}'
           | and  upper(country) = 'CN' ) X ) XX
           | WHERE XX.rk= 1
        """.stripMargin

      // Persisted because the view is read by the count query and by both queries below.
      val dfCache: DataFrame = spark.sql(sql1).persist(StorageLevel.MEMORY_AND_DISK_SER)
      dfCache.createOrReplaceTempView("ods_user_info_daily")
      val sql2="select count(distinct device_id_md5) from ods_user_info_daily"
      spark.sql(sql2).rdd.map(_.mkString("==")).saveAsTextFile(request_count_result)

      // The dashed date has its dashes removed before being compared with the dt partition
      // column (presumably yyyyMMdd -- confirm against the table).
      val dt_14days_ago: String = dt_dash_rec14day.replace("-", "")
      // Today's devices minus the devices of the last 14 days = devices to upload to OSS.
      // Per the original author's note, 9f89c84a559f573636a47ff8daed0d33 is the md5 of
      // "00000000-0000-0000-0000-000000000000"; that device is filtered out.
      val sql3 =
        s"""
           |select t1.device_id_md5
           |from  ods_user_info_daily t1
           |left join (
           |  select dev_id device_id_md5
           |    from dwh.ali_ios_user_activation_daily where dt >='${dt_14days_ago}' and dt<='${yesterday}'
           |) t2
           |on(t1.device_id_md5 = t2.device_id_md5)
           |where t2.device_id_md5 is null and  t1.device_id_md5!='9f89c84a559f573636a47ff8daed0d33'
           |group by t1.device_id_md5 limit 30000000
        """.stripMargin

    // Alternative query kept by the original author (no 14-day exclusion, 150 rows):
    // val sql3 =
    //    s"""
    //       |select t1.device_id_md5
    //       |from  ods_user_info_daily t1
    //       |group by t1.device_id_md5  limit 150
    //    """.stripMargin

      // Persisted so the text output and the join below read the same cached result; the
      // `limit` has no ordering, so a recomputation is not guaranteed to pick the same rows.
      val dfCacheUpload: DataFrame = spark.sql(sql3).persist(StorageLevel.MEMORY_AND_DISK_SER)
      dfCacheUpload.createOrReplaceTempView("upload_data_daily")
      dfCacheUpload.rdd.map(_.mkString).coalesce(40).saveAsTextFile(outputdaily)
      // Devices of the last 15 days: today's devices merged with the last-14-day devices
      // taken from yesterday's partition; the newest update_date wins per md5 id.
      val sql4 =
        s"""
           |select t.device_id_md5,t.device_type,t.device_id, t.update_date
           |from (
           |  select t.device_id_md5,t.device_type,t.device_id,t.update_date,
           |    row_number() over(partition by t.device_id_md5 order by t.update_date desc) rk
           |  from (
           |     select device_id_md5,device_id,device_type, update_date
           |        from
           |     (select t1.device_id_md5,t1.device_id,t1.device_type, '${dt_dash_today}' as update_date
           |        from  ods_user_info_daily t1
           |     join  upload_data_daily t2
           |        on(t1.device_id_md5 = t2.device_id_md5) ) tmp_today
           |    union all
           |    select device_id_md5,device_id,device_type, update_date
           |    from dwh.ali_ios_user_activation_rec15days
           |    where dt='${yesterday}'  and update_date >= '${dt_dash_rec14day}'
           |  ) t
           |) t
           |where t.rk='1'
        """.stripMargin

      spark.sql(sql4).coalesce(coalesce)
        .write
        .mode(SaveMode.Overwrite)
        .option("orc.compress", "zlib")
        .option("mapreduce.fileoutputcommitter.marksuccessfuljobs", false)
        .orc(output)

    } finally {
      spark.stop()
    }
    0
  }
}

/** Command-line entry point for the [[EtlAliIosActivitionDaily]] job. */
object EtlAliIosActivitionDaily {
  /**
   * Runs the job and propagates a non-zero result as the process exit status.
   *
   * The value returned by `run` used to be discarded, so the `-1` returned on a failed
   * option check still ended the process with status 0. The success path (`0`) returns
   * normally without forcing a JVM exit.
   *
   * @param args raw command-line arguments, passed through to `run`
   */
  def main(args: Array[String]): Unit = {
    val exitCode = new EtlAliIosActivitionDaily().run(args)
    if (exitCode != 0) sys.exit(exitCode)
  }
}