package mobvista.dmp.clickhouse.dsp

import java.text.SimpleDateFormat
import java.util.Date

import mobvista.dmp.utils.clickhouse.ClickHouseConnectionFactory
import mobvista.dmp.utils.clickhouse.ClickHouseSparkExt._
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.commons.lang3.StringUtils
import org.apache.spark.sql.SparkSession
import ru.yandex.clickhouse.ClickHouseDataSource

import scala.collection.mutable

/**
  * @package: mobvista.dmp.clickhouse.dsp
  * @author: wangjf
  * @date: 2019-10-17
  * @time: 10:20
  * @email: jinfeng.wang@mobvista.com
  * @phone: 152-1062-7698
  */
class DspEtlHourToCK extends Serializable {
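  // All runtime parameters are supplied on the command line and parsed with commons-cli.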
  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("date", true, "date")
    options.addOption("host", true, "host")
    options.addOption("cluster", true, "cluster")
    options.addOption("database", true, "database")
    options.addOption("table", true, "table")
    options.addOption("input", true, "input")
    options.addOption("region", true, "region")
    options.addOption("hour", true, "hour")
    options
  }

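  // "date" arrives as yyyy-MM-dd; the dt partition value uses yyyyMMdd, hence two formatters.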
  val sdf1 = new SimpleDateFormat("yyyy-MM-dd")
  val sdf2 = new SimpleDateFormat("yyyyMMdd")

  protected def run(args: Array[String]): Unit = {
    val parser = new BasicParser()
    val options = commandOptions()
    val commandLine = parser.parse(options, args)
    val date = commandLine.getOptionValue("date")
    val cluster = commandLine.getOptionValue("cluster")
    val host = commandLine.getOptionValue("host")
    val database = commandLine.getOptionValue("database")
    val table = commandLine.getOptionValue("table")
    val input = commandLine.getOptionValue("input")
    val region = commandLine.getOptionValue("region")
    val hour = commandLine.getOptionValue("hour")

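    // Spark session with Kryo serialization, lz4 compression, and ORC filter pushdown enabled.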
    val spark = SparkSession
      .builder()
      .appName("DspEtlHourToCK")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "lz4")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()
    try {

      val clusterName: Option[String] = Some(cluster)
      implicit val clickhouseDataSource: ClickHouseDataSource = ClickHouseConnectionFactory.get(host)
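      // Both clusterName and the implicit data source are picked up by the ClickHouseSparkExt
      // extension methods used below (dropPartition / saveToClickHouse).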

      val partDate = sdf2.format(sdf1.parse(date))

      import spark.implicits._


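      // Read the hourly DSP ORC dump and normalise every row into a RealtimeServiceHour record.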
      val dspDF = spark.read.orc(input)
        .rdd
        .map(r => {
          // Deduplicate the package ids carried in package_list.
          val packageIds = new mutable.HashSet[Int]()
          r.getAs[mutable.WrappedArray[Int]]("package_list").foreach(packageIds.add)
          // Map the raw gender flag to its numeric code: m -> 1, f -> 2, anything else -> 0.
          val genderRaw = r.getAs[Any]("gender")
          val gender = if (genderRaw != null && StringUtils.isNotBlank(genderRaw.toString)) {
            genderRaw.toString match {
              case "m" => 1
              case "f" => 2
              case _ => 0
            }
          } else {
            0
          }
          // Keep only two-letter country codes; blank out anything else.
          val countryCodeRaw = r.getAs[Any]("country_code")
          val countryCode = if (countryCodeRaw != null && countryCodeRaw.toString.length == 2) {
            countryCodeRaw.toString
          } else {
            ""
          }
          // Row version: ingestion timestamp in milliseconds.
          val version = new Date().getTime
          RealtimeServiceHour(r.getAs("device_id").toString.toUpperCase, r.getAs("platform"),
            gender, 0, countryCode,
            Array.empty[String], mutable.WrappedArray.make(packageIds.toArray), "", version)
        }).toDF
      /**
        * user_info save
        * (one-off database/table DDL is kept below for reference)
        */
      //  dspDF.createClickHouseDb(database, clusterName)
      //  dspDF.createClickHouseTable(database, table, Seq("dt", "hour", "region"), Constant.indexColumn, Constant.orderColumn, clusterName)
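      // dropPartition is an extension method on DataFrame, so an empty frame serves as the
      // receiver; dropping the target (dt, hour, region) partition first makes re-runs idempotent.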
      val emptyDF = spark.emptyDataFrame
      emptyDF.dropPartition(database, table, s"($partDate,'$hour','$region')", clusterName)

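      // Bulk-load this hour's rows into the (dt, hour, region) partition, 1,000,000 rows per batch.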
      dspDF.saveToClickHouse(database, table, Seq(date, hour, region), Seq("dt", "hour", "region"), clusterName, batchSize = 1000000)

    } finally {
      spark.stop()
    }
  }
}

object DspEtlHourToCK {
  def main(args: Array[String]): Unit = {
    new DspEtlHourToCK().run(args)
  }
}