package mobvista.dmp.datasource.rtdmp

import com.alibaba.fastjson.JSON
import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
import mobvista.dmp.util.{DateUtil, MySQLUtil}
import mobvista.dmp.utils.clickhouse.ClickHouseConnectionFactory
import mobvista.dmp.utils.clickhouse.ClickHouseSparkExt._
import org.apache.commons.cli.{BasicParser, Options}
import ru.yandex.clickhouse.ClickHouseDataSource

import java.text.SimpleDateFormat
import java.util.Calendar
import scala.collection.JavaConversions._
import scala.collection.mutable


/**
 * @package: mobvista.dmp.datasource.rtdmp
 * @author: wangjf
 * @date: 2020/8/17
 * @time: 10:57 AM
 * @email: jinfeng.wang@mobvista.com
 * @phone: 152-1062-7698
 */
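//  Hourly merge job: reads the dwh.audience_merge partition for the given hour,
//  strips audience ids that have not been updated within the last day, replaces
//  the matching ClickHouse partition with the fresh result, and records progress
//  in MySQL.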
class RTDmpMergeCK extends CommonSparkJob with Serializable {
  var expire_time = ""

  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("date_time", true, "run hour, formatted yyyyMMddHH")
    options.addOption("host", true, "ClickHouse host")
    options.addOption("cluster", true, "ClickHouse cluster name")
    options.addOption("database", true, "target ClickHouse database")
    options.addOption("table", true, "target ClickHouse table")
    options
  }
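
  //  Expected arguments, parsed by commons-cli's BasicParser (the values below
  //  are hypothetical; the option names come from commandOptions()):
  //    -date_time 2020081710 -host ch-host-01 -cluster cluster_1st \
  //    -database dwh -table audience_merge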

  override protected def run(args: Array[String]): Int = {
    val parser = new BasicParser()
    val options = commandOptions()
    val commandLine = parser.parse(options, args)
    val date_time = commandLine.getOptionValue("date_time")
    val cluster = commandLine.getOptionValue("cluster")
    val host = commandLine.getOptionValue("host")
    val database = commandLine.getOptionValue("database")
    val table = commandLine.getOptionValue("table")

    val spark = MobvistaConstant.createSparkSession(s"RTDmpMergeCK.$date_time")

    try {
      //  audiences whose last update is at or before this hour are filtered out in process()
      expire_time = DateUtil.getDayByString(date_time, "yyyyMMddHH", -1)

      //  empty DataFrame, used only as a handle for the dropPartition extension
      val tdf = spark.emptyDataFrame
      val sdf = new SimpleDateFormat("yyyyMMddHH")
      //  compute the expired partition: 6 hours before the current run hour
      val calendar = Calendar.getInstance()
      val date = sdf.parse(date_time)
      calendar.setTime(date)
      calendar.add(Calendar.HOUR_OF_DAY, -6)
      val expire_part = sdf.format(calendar.getTime)
      var dt_part = expire_part.substring(0, 8)
      var hour_part = expire_part.substring(8, 10)

      implicit val clickhouseDataSource: ClickHouseDataSource = ClickHouseConnectionFactory.get(host)

      val clusterName: Option[String] = Some(cluster)

      //  drop the expired partition, e.g. (20200817,'04'), across the cluster
      tdf.dropPartition(database, table, s"($dt_part,'$hour_part')", clusterName)

      spark.udf.register("process", process _)

      //  keep only devices that still belong to at least one active audience
      val df = spark.sql(sql.replace("@dt", date_time))
        .filter("size(audience_id) > 0")

      //  current partition; expire_time is exactly one day before date_time,
      //  so its hour digits match date_time's
      dt_part = date_time.substring(0, 8)
      hour_part = expire_time.substring(8, 10)
      //  render the partition date in the format used by the ClickHouse dt column
      val dt = MobvistaConstant.sdf1.format(MobvistaConstant.sdf2.parse(dt_part))

      //  drop the current partition first so re-runs of the same hour are idempotent
      tdf.dropPartition(database, table, s"($dt_part,'$hour_part')", clusterName)

      //  wait for the distributed DROP PARTITION to settle before re-inserting
      Thread.sleep(120000)

      df.saveToClickHouse(database, table, Seq(dt, hour_part), Seq("dt", "hour"), clusterName, batchSize = 200000)

      //  record the finished hour in MySQL for downstream consumers
      MySQLUtil.update(database, table, date_time)

    } finally {
      if (spark != null) {
        spark.stop()
      }
    }
    0
  }

  //  audience_map is a JSON object mapping audience_id -> last update hour (yyyyMMddHH)
  val sql =
    """
      |SELECT devid, process(audience_map) audience_id,
      |  CASE WHEN device_type IS NOT NULL AND device_type != '' THEN device_type ELSE 'unknown' END AS device_type
      |  FROM dwh.audience_merge
      |  WHERE dt = '@dt'
      |""".stripMargin

  //  Parses the audience_map JSON and keeps only the audience ids whose last
  //  update hour is strictly later than expire_time. The values share the
  //  fixed-width yyyyMMddHH format, so lexicographic String comparison matches
  //  chronological order.
  def process(audience_map: String): mutable.WrappedArray[Int] = {
    val set = new mutable.HashSet[Int]()
    val audienceMap = JSON.parseObject(audience_map).asInstanceOf[java.util.Map[String, String]]
    audienceMap.retain((_, v) => v.compareTo(expire_time) > 0).keys.foreach(k => {
      set.add(Integer.parseInt(k))
    })
    mutable.WrappedArray.make(set.toArray)
  }
}

object RTDmpMergeCK {
  def main(args: Array[String]): Unit = {
    new RTDmpMergeCK().run(args)
  }
}
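
//  A minimal local check of the process() filtering, with hypothetical values;
//  a sketch for manual testing only, not part of the production pipeline:
object RTDmpMergeCKProcessDemo {
  def main(args: Array[String]): Unit = {
    val job = new RTDmpMergeCK
    job.expire_time = "2020081610"
    //  100001 was updated after the cutoff ("2020081709" > "2020081610"); 100002 was not
    println(job.process("""{"100001":"2020081709","100002":"2020081509"}"""))
    //  expected output: WrappedArray(100001)
  }
}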