DmToutiaoTotal.scala 2.39 KB
package mobvista.dmp.datasource.toutiao

import mobvista.dmp.common.CommonSparkJob
import org.apache.commons.cli.Options
import org.apache.spark.sql.SparkSession

/**
 * Spark job that rebuilds the cumulative Toutiao launch snapshot.
 *
 * It unions the rows of `dwh.etl_toutiao_launch_daily` for partition `dt = date` (each stamped
 * with `update_date = updateDate`) with the previous snapshot in `dwh.dm_toutiao_launch_total`
 * for partition `dt = yestoday`. It keeps one row per `(device_id, campaign_id)` — the one with
 * the greatest `update_date` — and writes the result as zlib-compressed ORC to `output`.
 */
class DmToutiaoTotal extends CommonSparkJob with Serializable {

  /**
   * Job entry point.
   *
   * @param args command-line arguments; see [[buildOptions]] for the accepted options
   * @return 0 after the snapshot has been written, or 1 when a required option is missing
   *         (usage is printed in that case). Failures during the Spark job propagate as exceptions.
   */
  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      1
    } else {
      printOptions(commandLine)

      val date = commandLine.getOptionValue("date")
      val output = commandLine.getOptionValue("output")
      val yestoday = commandLine.getOptionValue("yestoday")
      val updateDate = commandLine.getOptionValue("updateDate")

      val spark = SparkSession.builder()
        .appName("dmp_DmToutiaoTotal_fengliang")
        .config("spark.rdd.compress", "true")
        .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .enableHiveSupport()
        .getOrCreate()

      try {
        spark.sql(buildMergeSql(date, yestoday, updateDate))
          .write
          .option("orc.compress", "zlib")
          .orc(output)
      } finally {
        // getOrCreate never returns null, so the session is always stopped here,
        // even when the query or the write throws.
        spark.stop()
      }

      0
    }
  }

  /**
   * Builds the merge query for one daily run.
   *
   * Today's daily rows are tagged with `updateDate`, unioned with yesterday's snapshot, and
   * deduplicated per `(device_id, campaign_id)` by keeping the row with the largest `update_date`.
   * NOTE(review): the ordering assumes `update_date` values share a sortable format
   * (e.g. yyyyMMdd or yyyy-MM-dd) in both sources — confirm against the table schema.
   *
   * @param date       partition of `dwh.etl_toutiao_launch_daily` to merge in
   * @param yestoday   partition of `dwh.dm_toutiao_launch_total` holding the previous snapshot
   * @param updateDate value stamped on every row coming from today's daily partition
   * @return the Spark SQL statement to execute
   */
  private def buildMergeSql(date: String, yestoday: String, updateDate: String): String =
    s"""
       |select t.device_id, t.campaign_id, t.update_date
       |from (
       |  select t.device_id, t.campaign_id, t.update_date,
       |   row_number() over(partition by t.device_id, t.campaign_id order by t.update_date desc) as rk
       |  from (
       |    select t.device_id, t.campaign_id, '${updateDate}' as update_date
       |    from dwh.etl_toutiao_launch_daily t
       |    where t.dt='${date}'
       |    union all
       |    select t.device_id, t.campaign_id, t.update_date
       |    from dwh.dm_toutiao_launch_total t
       |    where t.dt='${yestoday}'
       |  ) t
       |) t
       |where t.rk = 1
     """.stripMargin

  /**
   * Declares the command-line options of this job; every one of them is marked as required.
   *
   * @return options `date`, `updateDate`, `yestoday` and `output`, each taking one argument
   */
  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("date", true, "[must] today date ")
    options.addOption("updateDate", true, "[must] update date ")
    options.addOption("yestoday", true, "[must] yestoday ")
    options.addOption("output", true, "[must] output path ")
    options
  }
}

object DmToutiaoTotal {

  /**
   * Launches the job.
   *
   * A non-zero status returned by `run` (for example, when a required option is missing) becomes
   * the process exit code, so that a scheduler does not record an incomplete run as a success.
   *
   * @param args command-line arguments forwarded unchanged to `run`
   */
  def main(args: Array[String]): Unit = {
    val exitCode = new DmToutiaoTotal().run(args)
    // Exit explicitly only on failure; the normal-completion path behaves exactly as before.
    if (exitCode != 0) {
      sys.exit(exitCode)
    }
  }
}