package mobvista.dmp.clickhouse.realtime

import java.net.URI
import java.text.SimpleDateFormat

import com.datastax.spark.connector._
import mobvista.dmp.common.CommonSparkJob
import mobvista.dmp.util.PropertyUtil
import mobvista.prd.datasource.util.GsonUtil
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.sql.{Row, SparkSession}

import scala.collection.mutable

/**
  * @package: mobvista.dmp.clickhouse.realtime
  * @author: wangjf
  * @date: 2019-10-24
  * @time: 14:42
  * @email: jinfeng.wang@mobvista.com
  * @phone: 152-1062-7698
  * @desc: read from ClickHouse, write to Cassandra and S3
  */
class ReadFromCKWriteCS extends CommonSparkJob with Serializable {
  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("date", true, "date")
    options.addOption("hour", true, "hour")
    options.addOption("region", true, "region")
    options.addOption("host", true, "host")
    options.addOption("cluster", true, "cluster")
    options.addOption("database", true, "database")
    options.addOption("table", true, "table")
    options.addOption("output", true, "output")
    options.addOption("part", true, "part")
    options
  }

  override protected def run(args: Array[String]): Int = {
    val parser = new BasicParser()
    val options = commandOptions()
    val commandLine = parser.parse(options, args)
    val date = commandLine.getOptionValue("date")
    val hour = commandLine.getOptionValue("hour")
    val region = commandLine.getOptionValue("region")
    val host = commandLine.getOptionValue("host")
    val cluster = commandLine.getOptionValue("cluster")
    val database = commandLine.getOptionValue("database")
    val table = commandLine.getOptionValue("table")
    val output = commandLine.getOptionValue("output")
    val part = commandLine.getOptionValue("part")

    // Map the deployment region to its Cassandra region prefix and the per-executor connection cap.
    var connections_per_executor_max = 10
    var reg = ""
    region match {
      case "cn" =>
        reg = "cn"
        connections_per_executor_max = 3 * 2
      case "tokyo" =>
        reg = "hk"
        connections_per_executor_max = 6 * 2
      case "virginia" =>
        reg = "vg"
        connections_per_executor_max = 6 * 2
    }

    // Build the SparkSession with Hive support plus the ClickHouse (source) and Cassandra (sink) connector settings.
    val spark = SparkSession
      .builder()
      .appName(s"ReadFromCKWriteCS.$date.$hour.$reg")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "lz4")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.clickhouse.driver", "ru.yandex.clickhouse.ClickHouseDriver")
      .config("spark.clickhouse.url", PropertyUtil.getProperty("config.properties", "spark.clickhouse.url"))
      .config("spark.clickhouse.connection.per.executor.max", "10")
      .config("spark.clickhouse.metrics.enable", "true")
      .config("spark.clickhouse.socket.timeout.ms", "300000")
      .config("spark.clickhouse.cluster.auto-discovery", "true")
      .config("spark.cassandra.connection.host", PropertyUtil.getProperty("ip.properties", reg + "_host"))
      .config("spark.cassandra.connection.port", "9042")
      .config("spark.cassandra.connection.factory", s"mobvista.dmp.utils.cassandra.${reg.toUpperCase}Factory")
      .config("spark.cassandra.connection.connections_per_executor_max", String.valueOf(connections_per_executor_max))
      .config("spark.cassandra.output.concurrent.writes", "2048")
      .config("spark.cassandra.output.batch.size.rows", "1024")
      .config("spark.cassandra.output.batch.grouping.buffer.size", "2048")
      .config("spark.cassandra.connection.keep_alive_ms", "600000")
      .enableHiveSupport()
      .getOrCreate()

    try {
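      // The job runs in two phases: first it dumps device ids from ClickHouse to S3 as gzipped text,
      // then it re-reads the full user features and writes them to Cassandra.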
      val sdf1 = new SimpleDateFormat("yyyy-MM-dd")
      val sdf2 = new SimpleDateFormat("yyyy/MM/dd")

      // Broadcast the tag_code -> tag_id mapping so executors can translate interest/frequency codes into ids.
      val bMap = spark.sparkContext.broadcast(spark.sql(Constant.id_old2new_sql).rdd.map(r => {
        (r.getAs("tag_code").toString, r.getAs("tag_id").toString)
      }).collectAsMap())

      val sc = spark.sparkContext

      // Normalise the input date from yyyy/MM/dd to the yyyy-MM-dd form used in the ClickHouse queries.
      val dt = sdf1.format(sdf2.parse(date))
      import io.clickhouse.spark.connector._

      // Phase 1: build the device-id query against the ClickHouse cluster.
      var query = Constant.readFromCKToS3_sql
        .replace("@database", database)
        .replace("@table", table)
        .replace("@dt", dt)
        .replace("@hour", hour)
        .replace("@region", region)

      var df = sc.clickhouseTable(query, cluster)
        .withCustomPartitioning(Constant.buildPart(part.toInt))

      // Remove any previous output before writing the device-id dump to S3.
      FileSystem.get(new URI("s3://mob-emr-test"), sc.hadoopConfiguration).delete(new Path(output), true)

      df.map(r => {
        r.getAs("device_id").toString
      }).coalesce(10)
        .saveAsTextFile(output, classOf[GzipCodec])

      // Phase 2: re-read the user features from ClickHouse for the Cassandra write.
      query = Constant.readFromCKToCa_sql
        .replace("@database", database)
        .replace("@table", table)
        .replace("@dt", dt)
        .replace("@hour", hour)
        .replace("@region", region)
      df = sc.clickhouseTable(query, cluster)
        .withCustomPartitioning(Constant.buildPart(part.toInt))

      val keyspace = "dmp_realtime_service"
      val tableName = "dmp_user_features"
      // "frequency" is written with collection-overwrite semantics so existing entries are replaced rather than appended.
      val columns = SomeColumns("device_id", "age", "gender", "install_apps", "interest", "frequency" overwrite)
      df.map(r => {
        val device_id = r.getAs("device_id").toString
        val age = r.getAs("age").asInstanceOf[Int]
        val gender = r.getAs("gender").asInstanceOf[Int]
        // Strip the surrounding brackets from the ClickHouse array representations.
        var install_apps = r.getAs("install_apps").toString
        if (install_apps.length > 2) {
          install_apps = install_apps.substring(1, install_apps.length - 1)
        }
        var interest = r.getAs("interest").toString
        if (interest.length > 2) {
          interest = interest.substring(1, interest.length - 1)
        }
        // Translate interest tag codes to numeric tag ids via the broadcast mapping.
        val interest_set = new mutable.HashSet[Int]()
        interest.replace("\"", "").replace("'", "")
          .split(",").foreach(code => {
          if (bMap.value.keySet.contains(code) && StringUtils.isNotBlank(bMap.value(code))) {
            interest_set.add(bMap.value(code).toInt)
          }
        })

        // Parse the frequency JSON: every known tag code contributes a (tag_id, count) pair,
        // and its tag id is also added to the interest set.
        val frequencySet = new mutable.HashSet[(String, Int)]()
        val frequency = r.getAs("frequency").toString
        import scala.collection.JavaConversions._
        val json = GsonUtil.String2JsonObject(frequency)
        json.entrySet().foreach(j => {
          if (StringUtils.isNotBlank(j.getKey) && bMap.value.keySet.contains(j.getKey)) {
            frequencySet.add((bMap.value(j.getKey), j.getValue.getAsInt))
            interest_set.add(bMap.value(j.getKey).toInt)
          }
        })

        Row(device_id, age, gender, install_apps, interest_set.mkString(","), mutable.WrappedArray.make(frequencySet.toArray))
      }).saveToCassandra(keyspace, tableName, columns)

      /*
      implicit val clickhouseDataSource: ClickHouseDataSource = ClickHouseConnectionFactory.get(host)

      val clusterName = Some(cluster): Option[String]
      val tdf = spark.emptyDataFrame
      val partition = sdf3.format(sdf2.parse(date))
      tdf.dropPartition(database, table, s"($partition,'$hour','$region')", clusterName)
      */

    } finally {
      if (spark != null) {
        spark.stop()
      }
    }
    0
  }
}

object ReadFromCKWriteCS {
  def main(args: Array[String]): Unit = {
    new ReadFromCKWriteCS().run(args)
  }
}
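
// A hypothetical invocation (values are illustrative only; the region must be one of cn / tokyo / virginia,
// and the date must be given as yyyy/MM/dd to match the SimpleDateFormat parsing above):
//
//   spark-submit --class mobvista.dmp.clickhouse.realtime.ReadFromCKWriteCS <dmp-jar> \
//     -date 2019/10/24 -hour 14 -region virginia -host <ck-host> -cluster <ck-cluster> \
//     -database <ck-database> -table <ck-table> -output s3://mob-emr-test/<output-path> -part 10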