package mobvista.dmp.datasource.rtdmp

import com.alibaba.fastjson.JSONObject
import com.datastax.spark.connector._
import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
import mobvista.dmp.util.{DateUtil, PropertyUtil}
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.spark.sql.{Row, SparkSession}

import scala.collection.JavaConverters._

/**
 * @package: mobvista.dmp.datasource.rtdmp
 * @author: wangjf
 * @date: 2020/7/13
 * @time: 11:25 AM
 * @email: jinfeng.wang@mobvista.com
 * @phone: 152-1062-7698
 */
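// Hypothetical launch command (jar name and resources are assumptions, not part of this file):
//   spark-submit --class mobvista.dmp.datasource.rtdmp.AudienceInfoWrite \
//     dmp.jar -datetime <dt partition of dwh.audience_merge>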
class AudienceInfoWrite extends CommonSparkJob with Serializable {

  // Timestamp formats: source values are hour-level "yyyyMMddHH"; they are
  // rewritten to "yyyy-MM-dd HH" before being stored in Cassandra.
  val slf1 = "yyyyMMddHH"
  val slf2 = "yyyy-MM-dd HH"

  def commandOptions(): Options = {
    val options = new Options()
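    // "datetime" selects the dt partition of dwh.audience_merge to load; presumably
    // hour-level (e.g. "2020071311"), matching the formats above, though the exact
    // layout depends on how the partition is written upstream.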
    options.addOption("datetime", true, "datetime")
    options
  }

  override protected def run(args: Array[String]): Int = {
    val parser = new BasicParser()
    val options = commandOptions()
    val commandLine = parser.parse(options, args)
    val datetime = commandLine.getOptionValue("datetime")

    val spark = SparkSession
      .builder()
      .appName("AudienceInfoWrite")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "lz4")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.cassandra.connection.host", PropertyUtil.getProperty("ip.properties", "aws_host"))
      .config("spark.cassandra.connection.port", "9042")
      .config("spark.cassandra.connection.remoteConnectionsPerExecutor", "15")
      .config("spark.cassandra.connection.localConnectionsPerExecutor", "12")
      .config("spark.cassandra.query.retry.count", "10")
      .config("spark.cassandra.connection.compression", "LZ4")
      .config("spark.cassandra.input.consistency.level", "LOCAL_ONE")
      .config("spark.cassandra.output.consistency.level", "LOCAL_ONE")
      .config("spark.cassandra.input.fetch.sizeInRows", "2048")
      .config("spark.cassandra.concurrent.reads", "2048")
      .config("spark.cassandra.output.concurrent.writes", "16")
      .config("spark.cassandra.output.batch.grouping.buffer.size", "1024")
      .config("spark.cassandra.output.batch.size.bytes", "1024")
      .config("spark.cassandra.connection.keepAliveMS", "60000")
      .enableHiveSupport()
      .getOrCreate()

    try {
      val keyspace = "rtdmp"
      val tableName = "audience_info"
      val columns = SomeColumns("devid", "audience_data", "update_time")
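      // Assumed layout of the target table (schema not shown in this file; the
      // primary key and column types below are guesses from the write path):
      //   CREATE TABLE rtdmp.audience_info (
      //     devid text PRIMARY KEY,
      //     audience_data text,
      //     update_time text
      //   );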
      val sql =
        s"""
          |SELECT devid, audience_map, update_time FROM dwh.audience_merge WHERE dt = '$datetime'
          |""".stripMargin

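      // audience_map is a JSON string of audienceId -> hour-level timestamp; every
      // value (and update_time) is reformatted from "yyyyMMddHH" to "yyyy-MM-dd HH".
      // Illustrative example: {"10001":"2020071311"} -> {"10001":"2020-07-13 11"}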
      val df = spark.sql(sql).rdd.map(r => {
        val devid = r.getAs[String]("devid")
        val audienceMap = r.getAs[String]("audience_map")
        val audienceData = new JSONObject()
        MobvistaConstant.String2JSONObject(audienceMap).asInstanceOf[java.util.Map[String, String]]
          .asScala.foreach(kv => {
            audienceData.put(kv._1, DateUtil.format(DateUtil.parse(kv._2, slf1), slf2))
          })
        val update_time = DateUtil.format(DateUtil.parse(r.getAs[String]("update_time"), slf1), slf2)
        Row(devid, audienceData.toJSONString, update_time)
      })

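      // Connector writes are Cassandra upserts: re-running the same dt overwrites
      // existing rows per devid (assuming devid is the primary key, as sketched above).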
      df.saveToCassandra(keyspace, tableName, columns)
    } finally {
      spark.stop()
    }
    0
  }
}

object AudienceInfoWrite {
  def main(args: Array[String]): Unit = {
    new AudienceInfoWrite().run(args)
  }
}