package mobvista.dmp.clickhouse.dsp

import java.text.SimpleDateFormat
import java.util.Date

import mobvista.dmp.utils.clickhouse.ClickHouseConnectionFactory
import mobvista.dmp.utils.clickhouse.ClickHouseSparkExt._
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.commons.lang3.StringUtils
import org.apache.spark.sql.SparkSession
import ru.yandex.clickhouse.ClickHouseDataSource

import scala.collection.mutable

/**
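  * Hourly ETL: reads DSP device rows from ORC input, normalizes them into
  * RealtimeServiceHour records, drops the target (dt, hour, region) partition,
  * and bulk-loads the result into ClickHouse.
  *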
  * @package: mobvista.dmp.clickhouse.dsp
  * @author: wangjf
  * @date: 2019-10-17
  * @time: 10:20
  * @email: jinfeng.wang@mobvista.com
  * @phone: 152-1062-7698
  */
class DspEtlHourToCK extends Serializable {
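  /** CLI options accepted by this job; every option takes a single string argument. */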
  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("date", true, "date")
    options.addOption("host", true, "host")
    options.addOption("cluster", true, "cluster")
    options.addOption("database", true, "database")
    options.addOption("table", true, "table")
    options.addOption("input", true, "input")
    options.addOption("region", true, "region")
    options.addOption("hour", true, "hour")
    options
  }

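  // Note: SimpleDateFormat is not thread-safe; these instances are only used on the driver.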
  val sdf1 = new SimpleDateFormat("yyyy-MM-dd")
  val sdf2 = new SimpleDateFormat("yyyyMMdd")

  protected def run(args: Array[String]): Unit = {
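    // BasicParser is deprecated in newer commons-cli releases; DefaultParser is its replacement.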
    val parser = new BasicParser()
    val options = commandOptions()
    val commandLine = parser.parse(options, args)
    val date = commandLine.getOptionValue("date")
    val cluster = commandLine.getOptionValue("cluster")
    val host = commandLine.getOptionValue("host")
    val database = commandLine.getOptionValue("database")
    val table = commandLine.getOptionValue("table")
    val input = commandLine.getOptionValue("input")
    val region = commandLine.getOptionValue("region")
    val hour = commandLine.getOptionValue("hour")

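    // Build the SparkSession with Hive support; Kryo serialization, lz4 compression,
    // and ORC filter pushdown are enabled up front.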
    val spark = SparkSession
      .builder()
      .appName("DspEtlHourToCK")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "lz4")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()
    try {

      val clusterName: Option[String] = Some(cluster)
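      // Implicit ClickHouseDataSource consumed by the ClickHouseSparkExt extension methods below.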
      implicit val clickhouseDataSource: ClickHouseDataSource = ClickHouseConnectionFactory.get(host)

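      // Convert the yyyy-MM-dd "date" argument into the yyyyMMdd form used in the partition expression.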
      val partDate = sdf2.format(sdf1.parse(date))

      import spark.implicits._

      //  val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
      //  println(sdf.format(new Date()))

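      // Read the hourly ORC input and normalize each row into a RealtimeServiceHour record.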
      val dspDF = spark.read.orc(input)
        .rdd
        .map(r => {
          val set = r.getAs[mutable.WrappedArray[Int]]("package_list").toSet
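          // Map the gender string to a numeric code: "m" -> 1, "f" -> 2, missing or anything else -> 0.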
          val gender = if (r.getAs("gender") != null && StringUtils.isNotBlank(r.getAs("gender").toString)) {
            r.getAs("gender").toString match {
              case "m" => 1
              case "f" => 2
              case _ => 0
            }
          } else {
            0
          }
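          // Ingestion timestamp, used as the record's version value.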
          val version = new Date().getTime
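          // Build the output record; field order must match the RealtimeServiceHour case class in this package.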
          val countryCode = r.getAs[String]("country_code")
          RealtimeServiceHour(r.getAs("device_id").toString.toUpperCase, r.getAs("platform"),
            gender, 0,
            if (countryCode != null && countryCode.length == 2) countryCode else "",
            Array.empty[String], mutable.WrappedArray.make(set.toArray), "", version)
        }).toDF
      /**
        * user_info save
        */
      //  dspDF.createClickHouseDb(database, clusterName)
      //  dspDF.createClickHouseTable(database, table, Seq("dt", "hour", "region"), Constant.indexColumn, Constant.orderColumn, clusterName)
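      // Drop any existing (dt, hour, region) partition first so re-running this hour stays idempotent.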
      val emptyDF = spark.emptyDataFrame
      emptyDF.dropPartition(database, table, s"($partDate,'$hour','$region')", clusterName)

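      // Bulk-insert into the partitioned table in batches of 1,000,000 rows.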
      dspDF.saveToClickHouse(database, table, Seq(date, hour, region), Seq("dt", "hour", "region"), clusterName, batchSize = 1000000)

    } finally {
      spark.stop()
    }
  }
}

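/**
  * Example submission; the jar name and option values below are illustrative only:
  *
  * spark-submit --class mobvista.dmp.clickhouse.dsp.DspEtlHourToCK dmp.jar \
  *   -date 2019-10-17 -hour 10 -region cn \
  *   -host <clickhouse-host> -cluster <cluster-name> \
  *   -database <database> -table <table> \
  *   -input s3://<bucket>/path/to/hour/orc
  */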
object DspEtlHourToCK {
  def main(args: Array[String]): Unit = {
    new DspEtlHourToCK().run(args)
  }
}