package mobvista.dmp.datasource.rtdmp

import com.datastax.spark.connector._
import mobvista.dmp.common.CommonSparkJob
import mobvista.dmp.util.PropertyUtil
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.spark.sql.{Row, SparkSession}

import scala.collection.mutable

/**
  * @package: mobvista.dmp.datasource.rtdmp
  * @author: wangjf
  * @date: 2020/7/13
  * @time: 11:25 AM
  * @email: jinfeng.wang@mobvista.com
  * @phone: 152-1062-7698
  */
class DeviceInfoWrite extends CommonSparkJob with Serializable {

  /**
    * Builds the CLI options for this job: a single "input" option that takes
    * an argument (the path of the ORC dataset to load).
    */
  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("input", true, "input")
    options
  }

  /**
    * Reads device records (device_id, device_type, region) from the ORC files
    * under `-input` and writes them to the Cassandra table
    * `rtdmp.recent_device_region`.
    *
    * @param args command-line arguments; expects `-input <orc path>`
    * @return 0 on success; exceptions propagate to the caller
    */
  override protected def run(args: Array[String]): Int = {
    val parser = new BasicParser()
    val options = commandOptions()
    val commandLine = parser.parse(options, args)
    val input = commandLine.getOptionValue("input")

    val spark = SparkSession
      .builder()
      .appName("DeviceInfoWrite")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "lz4")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.cassandra.connection.host", PropertyUtil.getProperty("ip.properties", "aws_host"))
      .config("spark.cassandra.connection.port", "9042")
      .config("spark.cassandra.connection.compression", "LZ4")
      .config("spark.cassandra.output.consistency.level", "LOCAL_QUORUM")
      .config("spark.cassandra.connection.remoteConnectionsPerExecutor", "32")
      .config("spark.cassandra.connection.localConnectionsPerExecutor", "64")
      .config("spark.cassandra.output.concurrent.writes", "512")
      .config("spark.cassandra.output.batch.grouping.buffer.size", "2048")
      .config("spark.cassandra.connection.keep_alive_ms", "60000")
      .enableHiveSupport()
      .getOrCreate()

    //  Earlier tuning alternatives, kept for reference:
    //  .config("spark.cassandra.connection.factory", s"mobvista.dmp.utils.cassandra.AWSFactory")
    //  .config("spark.cassandra.connection.connections_per_executor_max", "64")
    //  .config("spark.cassandra.output.concurrent.writes", "512")
    //  .config("spark.cassandra.output.batch.grouping.buffer.size", "1024")
    //  .config("spark.cassandra.output.consistency.level", "LOCAL_ONE")
    //  .config("spark.cassandra.connection.keep_alive_ms", "600000")
    try {

      val keyspace = "rtdmp"
      val tableName = "recent_device_region"
      val columns = SomeColumns("devid", "dev_type", "region")

      // Map each ORC record to (devid, dev_type, region). The region array is
      // collapsed to a Set so the connector writes it as a Cassandra set.
      val deviceRows = spark.read.orc(input).rdd.map(r => {
        Row(r.getAs[String]("device_id"), r.getAs[String]("device_type"), r.getAs[mutable.WrappedArray[String]]("region").toSet)
      })
      deviceRows.saveToCassandra(keyspace, tableName, columns)

    } finally {
      // SparkSession.stop() also stops the underlying SparkContext, so a
      // separate sc.stop() is redundant; and `spark` is a val returned by
      // getOrCreate(), so it can never be null — no null checks needed.
      spark.stop()
    }
    0
  }
}

/** Entry point: instantiates the job and delegates to its `run` method. */
object DeviceInfoWrite {
  def main(args: Array[String]): Unit = {
    val job = new DeviceInfoWrite()
    job.run(args)
  }
}