package mobvista.dmp.datasource.baichuan

import mobvista.dmp.utils.clickhouse.ClickHouseConnectionFactory
import mobvista.dmp.utils.clickhouse.ClickHouseSparkExt._
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel
import ru.yandex.clickhouse.ClickHouseDataSource

/**
  * @package: mobvista.dmp.datasource.baichuan
  * @author: wangjf
  * @date: 2019-08-29
  * @time: 20:52
  * @email: jinfeng.wang@mobvista.com
  * @phone: 152-1062-7698
  */
class BaiChuanJob extends java.io.Serializable {

  /**
    * Builds the CLI options accepted by this job.
    *
    * @return options: -date (partition day, yyyyMMdd assumed — confirm against scheduler),
    *         -host (ClickHouse host), -cluster (ClickHouse cluster name),
    *         -database and -table (write target).
    */
  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("date", true, "date")
    options.addOption("host", true, "host")
    options.addOption("cluster", true, "cluster")
    options.addOption("database", true, "database")
    options.addOption("table", true, "table")
    options
  }

  /**
    * Per-package substitution values for the `Constant.etl_sql` template.
    *
    * osId semantics (from the original job): 2 = ios, 1 = android, 3 = android imei-md5.
    * `needsImeiMd5` marks Android packages that require a second extraction pass
    * with devType "imeimd5" / osId "3".
    */
  private case class AppMeta(appId: String,
                             appOs: String,
                             devType: String,
                             osId: String,
                             needsImeiMd5: Boolean)

  /** Fills every placeholder of the etl_sql template for one package. */
  private def buildSql(pkg: String, meta: AppMeta, devType: String, osId: String): String =
    Constant.etl_sql
      .replace("@package", pkg)
      .replace("@appId", meta.appId)
      .replace("@appOs", meta.appOs)
      .replace("@devType", devType)
      .replace("@osId", osId)

  /**
    * Job body: reads the daily install/user-info data for `-date`, extracts device
    * rows for the Taobao/Tmall packages (iOS idfa, Android imei and imei-md5),
    * drops the target ClickHouse partition, and writes each result set to
    * ClickHouse. Always stops the SparkSession on exit.
    *
    * @param args raw CLI arguments, parsed with [[commandOptions]]
    */
  protected def run(args: Array[String]): Unit = {
    val parser = new BasicParser()
    val commandLine = parser.parse(commandOptions(), args)
    val date = commandLine.getOptionValue("date")
    val cluster = commandLine.getOptionValue("cluster")
    val host = commandLine.getOptionValue("host")
    val database = commandLine.getOptionValue("database")
    val table = commandLine.getOptionValue("table")

    val spark = SparkSession
      .builder()
      .appName("BaiChuanJob")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "lz4")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .enableHiveSupport()
      .getOrCreate()
    try {
      //  fix: Option(...) maps a missing -cluster argument (null) to None instead of Some(null)
      val clusterName: Option[String] = Option(cluster)

      spark.udf.register("hasNo", Constant.hasNo _)
      //  cached because every package below re-queries the temp view built from it
      val df = spark.sql(Constant.filter_sql.replace("@date", date))
        .persist(StorageLevel.MEMORY_AND_DISK_SER)
      df.createOrReplaceTempView("ods_user_info_daily")

      implicit val clickhouseDataSource: ClickHouseDataSource = ClickHouseConnectionFactory.get(host)

      val update_date = Constant.sdf1.format(Constant.sdf2.parse(date))

      //  drop the target partition once before (re-)writing it
      spark.emptyDataFrame.dropPartition(database, table, date, clusterName)

      import spark.implicits._

      /**
        * 天猫 / 淘宝 packages and their template values. App-store ids are iOS
        * (idfa), bundle ids are Android (imei, plus an imei-md5 pass).
        */
      val apps = Seq(
        "387682726"          -> AppMeta("2", "ios", "idfa", "2", needsImeiMd5 = false),
        "518966501"          -> AppMeta("1", "ios", "idfa", "2", needsImeiMd5 = false),
        "com.taobao.taobao"  -> AppMeta("2", "android", "imei", "1", needsImeiMd5 = true),
        "com.tmall.wireless" -> AppMeta("1", "android", "imei", "1", needsImeiMd5 = true)
      )

      //  runs one extraction query and normalizes rows into BaiChuanCK records
      def extract(sql: String) =
        spark.sql(sql).rdd.map { r =>
          BaiChuanCK(
            r.getAs("dev_id"),
            Integer.parseInt(r.getAs("app_id").toString),
            Integer.parseInt(r.getAs("app_os").toString),
            0,
            update_date)
        }.toDF.dropDuplicates

      apps.foreach { case (pkg, meta) =>
        //  device info gathered from install data
        val dff = extract(buildSql(pkg, meta, meta.devType, meta.osId))

        if (meta.needsImeiMd5) {
          val imeiDff = extract(buildSql(pkg, meta, "imeimd5", "3"))
          imeiDff.saveToClickHouse(database, table, Seq(update_date), Seq("dt"), clusterName, batchSize = 100000)
        }
        dff.saveToClickHouse(database, table, Seq(update_date), Seq("dt"), clusterName, batchSize = 100000)
      }

      //  fix: release the cached dataset; the original never unpersisted it
      df.unpersist()
    } finally {
      //  getOrCreate never returns null, so stopping unconditionally is safe
      spark.stop()
    }
  }
}

object BaiChuanJob {
  /** JVM entry point: constructs the job and delegates to its `run` method. */
  def main(args: Array[String]): Unit = {
    val job = new BaiChuanJob()
    job.run(args)
  }
}