package mobvista.dmp.datasource.joypac

import java.net.URI

import com.google.gson.JsonObject
import mobvista.dmp.common.CommonSparkJob
import mobvista.dmp.datasource.dm.Constant.{allZero, andriodIdPtn, didPtn, imeiPtn}
import mobvista.dmp.util.DateUtil
import mobvista.dmp.utils.clickhouse.ClickHouseConnectionFactory
import mobvista.prd.datasource.util.GsonUtil
import org.apache.commons.cli.Options
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{SaveMode, SparkSession}
import ru.yandex.clickhouse.ClickHouseDataSource

/**
  * @package: mobvista.dmp.datasource.joypac
  * @author: wangjf
  * @date: 2019-12-18
  * @time: 14:10:50
  * @email: jinfeng.wang@mobvista.com
  * @phone: 152-1062-7698
  */

class JoypacResultEtl extends CommonSparkJob {

  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("input", true, "[must] input")
    options.addOption("date", true, "[must] date")
    options.addOption("output", true, "[must] output")
    options.addOption("coalesce", true, "[must] coalesce")
    options.addOption("host", true, "host")
    options.addOption("cluster", true, "cluster")
    options.addOption("database", true, "database")
    options.addOption("table", true, "table")
    options
  }
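
  // A hypothetical invocation for reference; every path, host, and cluster name
  // below is a placeholder, not a value taken from this repository:
  //
  //   spark-submit --class mobvista.dmp.datasource.joypac.JoypacResultEtl dmp.jar \
  //     -input s3://bucket/joypac/20191218 -date 20191218 \
  //     -output s3://mob-emr-test/joypac/etl/20191218 -coalesce 10 \
  //     -host ch-host.internal -cluster cluster_1st -database dwh -table joypac_result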

  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      return -1
    } else {
      printOptions(commandLine)
    }

    val input = commandLine.getOptionValue("input")
    val date = commandLine.getOptionValue("date")
    val output = commandLine.getOptionValue("output")
    val coalesce = commandLine.getOptionValue("coalesce")
    val cluster = commandLine.getOptionValue("cluster")
    val host = commandLine.getOptionValue("host")
    val database = commandLine.getOptionValue("database")
    val table = commandLine.getOptionValue("table")

    val spark = SparkSession.builder()
      .appName("JoypacResultEtl")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()
    val sc = spark.sparkContext

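    // Normalize the yyyyMMdd job date to yyyy-MM-dd for the update_date column.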
    val updateDate = DateUtil.format(DateUtil.parse(date, "yyyyMMdd"), "yyyy-MM-dd")

    // Remove any previous output so the job always starts from a clean path.
    FileSystem.get(new URI("s3://mob-emr-test"), sc.hadoopConfiguration).delete(new Path(output), true)
    try {
      import spark.implicits._
      val df = sc.textFile(input).map(r => {
        val json = GsonUtil.String2JsonObject(r)

        // Pull each field defensively; missing / null / blank values become "".
        val idfa = JoypacResultEtl.getString(json, "idfa")
        val idfv = JoypacResultEtl.getString(json, "idfv")
        val app_version = JoypacResultEtl.getString(json, "app_version")
        val package_name = JoypacResultEtl.getString(json, "package_name")
        val platform = JoypacResultEtl.getString(json, "platform")

        // apps_info arrives as a JSON-array string of single-key objects. Flatten
        // it into one JSON-object string: escape backslashes, strip the per-element
        // braces, then turn the surrounding brackets into braces.
        val appsInfoRaw = JoypacResultEtl.getString(json, "apps_info")
        val apps_info = if (appsInfoRaw.startsWith("[") && appsInfoRaw.endsWith("]")) {
          appsInfoRaw
            .replace("\\", "\\\\")
            .replace("{", "").replace("}", "")
            .replace("[", "{").replace("]", "}")
        } else {
          (new JsonObject).toString
        }
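
        // For example (a hypothetical record), the flattening above rewrites
        //   [{"com.app.a":"1.0"},{"com.app.b":"2.3"}]
        // into
        //   {"com.app.a":"1.0","com.app.b":"2.3"}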
        val deviceId = if (JoypacResultEtl.check_deviceId(idfa)) {
          idfa
        } else {
          idfv
        }
        if (StringUtils.isNotBlank(deviceId)) {
          JoypacEntity(deviceId, platform, app_version, package_name, apps_info, updateDate)
        } else {
          null
        }
      }).filter(j => {
        j != null
      })

      df.toDF
        .dropDuplicates()
        .coalesce(coalesce.toInt)
        .write
        .mode(SaveMode.Overwrite)
        .option("orc.compress", "zlib")
        .orc(output)
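
      // Sanity check (hypothetical, e.g. from spark-shell): read the ORC output
      // back and eyeball a few rows:
      //   spark.read.orc(output).show(10, truncate = false)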

      import mobvista.dmp.utils.clickhouse.ClickHouseSparkExt._
      // Option(...) maps a missing -cluster (getOptionValue returns null) to None
      // instead of Some(null).
      val clusterName: Option[String] = Option(cluster)

      implicit val clickhouseDataSource: ClickHouseDataSource = ClickHouseConnectionFactory.get(host)

      // dropPartition only issues DDL, so an empty DataFrame is enough to drive
      // the extension method; this clears the date's partition before re-loading.
      val tdf = spark.emptyDataFrame
      tdf.dropPartition(database, table, date, clusterName)

      // Upper-case the device ids for the ClickHouse copy and dedupe again,
      // since the case change can collapse previously distinct rows.
      val df_ck = df.map(r => {
        DeviceId(r.device_id.toUpperCase)
      }).toDF
        .dropDuplicates()

      df_ck.createClickHouseDb(database, clusterName)
      df_ck.createClickHouseTable(database, table, Seq("dt"), JoypacResultEtl.indexColumn, Seq(), clusterName)
      df_ck.saveToClickHouse(database, table, Seq(updateDate), Seq("dt"), clusterName)
    } finally {
      // spark.stop() also stops the underlying SparkContext, so one call suffices.
      spark.stop()
    }
    0
  }

}
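
// JoypacEntity and DeviceId are assumed to be case classes defined elsewhere in
// this package; judging from the usage above they are shaped roughly as:
//   case class JoypacEntity(device_id: String, platform: String, app_version: String,
//                           package_name: String, apps_info: String, update_date: String)
//   case class DeviceId(device_id: String)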

object JoypacResultEtl {

  val indexColumn: Seq[String] = Seq("device_id")
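
  /**
    * Reads a string field from a Gson JsonObject, returning "" when the key is
    * absent, is an explicit JSON null, or holds a blank value. The isJsonNull
    * guard avoids the UnsupportedOperationException that getAsString throws on
    * JsonNull; otherwise this is the same check the ETL previously inlined per field.
    */
  def getString(json: JsonObject, key: String): String = {
    val element = json.get(key)
    if (element != null && !element.isJsonNull && StringUtils.isNotBlank(element.getAsString)) {
      element.getAsString
    } else {
      ""
    }
  }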

  def check_deviceId(device_id: String): Boolean = {
    StringUtils.isNotBlank(device_id) &&
      ((device_id.matches(didPtn) && !device_id.equals(allZero)) ||
        device_id.matches(imeiPtn) || device_id.matches(andriodIdPtn))
  }
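
  // For instance, an all-zero id (allZero, presumably the zeroed-out UUID) is
  // rejected even though it matches didPtn, while a well-formed IDFA/IDFV or
  // IMEI/Android-ID-shaped value passes. The exact patterns live in
  // mobvista.dmp.datasource.dm.Constant.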

  def main(args: Array[String]): Unit = {
    new JoypacResultEtl().run(args)
  }
}