package mobvista.dmp.datasource.adn

import java.net.URI

import mobvista.dmp.common.CommonSparkJob
import mobvista.dmp.datasource.adn.mapreduce.GetDevIdUtil
import mobvista.dmp.util.MRUtils
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{SaveMode, SparkSession}

import scala.collection.mutable.ArrayBuffer

/**
  * @package: mobvista.dmp.datasource.adn
  * @author: wangjf
  * @date: 2019-10-29
  * @time: 11:05
  * @email: jinfeng.wang@mobvista.com
  * @phone: 152-1062-7698
  */
class AdnRequestSdkHour extends CommonSparkJob with Serializable {

  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("input", true, "input")
    options.addOption("input_dict1", true, "input_dict1")
    options.addOption("input_dict2", true, "input_dict2")
    options.addOption("output", true, "output")
    options.addOption("coalesce", true, "coalesce")
    options.addOption("date", true, "date")
    options
  }

  override protected def run(args: Array[String]): Int = {
    val imeiPtn = "^([0-9]{15,17})$"

    val parser = new BasicParser()
    val options = commandOptions()
    val commandLine = parser.parse(options, args)

    val coalesce = commandLine.getOptionValue("coalesce")
    val input = commandLine.getOptionValue("input")
    val input_dict1 = commandLine.getOptionValue("input_dict1")
    val input_dict2 = commandLine.getOptionValue("input_dict2")
    val output = commandLine.getOptionValue("output")

    val spark = SparkSession
      .builder()
      .config("spark.rdd.compress", "true")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

    try {
      val sc = spark.sparkContext

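      // Clear any existing output so reruns of the same hour overwrite cleanly.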
      FileSystem.get(new URI("s3://mob-emr-test"), sc.hadoopConfiguration).delete(new Path(output), true)

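      // Build the app-id -> package-name dictionary (the dict files appear to be
      // "appId,packageName" CSV lines, matching how the map is consumed below) and
      // broadcast it so each executor holds a single read-only copy.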
      val bMap = sc.broadcast(sc.textFile(input_dict1).union(sc.textFile(input_dict2)).map(r => {
        val rs = r.split(",")
        (rs(0), rs(1))
      }).collectAsMap())

      val rdd = sc.textFile(input)
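      // Parse each raw request-log line; the numeric indices below follow the adn
      // request log's column layout (device ids, model, OS version, platform, app id).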
      val kv_rdd = rdd.map(l => {
        val res = new ArrayBuffer[String]()
        val array = MRUtils.SPLITTER.split(l)
        if (array.length >= 54) {
          val gaid = array(42)
          val idfa = array(43)
          val imei = array(35)
          val model = array(16)
          val osVersion = array(14)
          val androidId = array(37)
          val platform = array(13)
          val appId = array(4)
          var extSysid = ""
          if (array.length >= 118) extSysid = array(117)

          //  Workaround for a trailing-field split bug: append a sentinel ("A") so empty trailing fields survive the split.
          val value = MRUtils.JOINER.join(appId, model, osVersion, "A")
          var key = ""
          // Also key the record by the first-party (own) system id, when one is present.
          val sysIdType = GetDevIdUtil.getExtSysId(extSysid)
          if (StringUtils.isNotBlank(sysIdType)) {
            key = MRUtils.JOINER.join(sysIdType, platform.toLowerCase)
            res += key + ";" + value
          }

          val devIdType = GetDevIdUtil.getIdByIdAndPlatform(gaid, idfa, extSysid, platform)
          if (StringUtils.isNotBlank(devIdType)) {
            key = MRUtils.JOINER.join(devIdType, platform.toLowerCase)
            res += key + ";" + value
          }

          // IMEI keys are emitted for Android only; the pattern accepts 15-17 digit values.
          if (imei.matches(imeiPtn) && platform == "android") {
            key = MRUtils.JOINER.join(imei, "imei", platform.toLowerCase)
            res += key + ";" + value
          }
        }
        res
      }).coalesce(coalesce.toInt)

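      // Flatten the "key;value" strings into (key, value) pairs, then gather all
      // values under each device key so one row per (device, app) can be built below.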
      import spark.implicits._
      val df = kv_rdd
        .flatMap(l => l)
        .mapPartitions(mapFun)
        .combineByKey(
          (v: String) => Iterable(v),
          (c: Iterable[String], v: String) => c ++ Seq(v),
          (c1: Iterable[String], c2: Iterable[String]) => c1 ++ c2
        )
        .filter(r => {
          // Keep only well-formed keys: device_id, device_type and platform must all be present.
          val s = MRUtils.SPLITTER.split(r._1)
          s.length == 3 && StringUtils.isNotBlank(s(0)) && StringUtils.isNotBlank(s(1)) && StringUtils.isNotBlank(s(2))
        })
        .map(r => {
        val res = new ArrayBuffer[AdnRequestSdkClass]()
        val key = r._1
        val keys = MRUtils.SPLITTER.split(key)
        val device_id = keys(0)
        val device_type = keys(1)
        val platform = keys(2)
        var model = ""
        var osVersion = ""
        var pkgName = ""
        val values = r._2.iterator

        while (values.hasNext) {
          val vs = MRUtils.SPLITTER.split(values.next())
          // Take model / OS version from the first record that carries them.
          if (vs.length >= 3) {
            if (StringUtils.isBlank(model)) {
              model = vs(1)
            }
            if (StringUtils.isBlank(osVersion)) {
              osVersion = vs(2)
            }
          }
          // Resolve the package name for this app id; empty when the dictionary has no entry.
          pkgName = bMap.value.getOrElse(vs(0), "")
          res += AdnRequestSdkClass(device_id, device_type, platform, vs(0), pkgName, model, osVersion)
        }
        res
        })
        .flatMap(r => r)
        .toDF

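      // De-duplicate and write zlib-compressed ORC, using fewer partitions for the
      // final output than for the preceding stages.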
      df.dropDuplicates
        // Guard against coalesce(0) when the configured parallelism is below 10.
        .coalesce(math.max(1, coalesce.toInt / 10))
        .write
        .mode(SaveMode.Overwrite)
        .option("orc.compress", "zlib")
        .orc(output)

    } finally {
      if (spark != null) {
        spark.stop()
      }
    }

    0
  }

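  /**
    * Splits each "key;value" string emitted above into a (key, value) pair.
    */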
  def mapFun(iter: Iterator[String]): Iterator[(String, String)] = {
    iter.map(l => {
      val cur = l.split(";")
      (cur(0), cur(1))
    })
  }
}

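/**
  * Illustrative spark-submit invocation (jar name and paths are placeholders,
  * not taken from this repository):
  *
  *   spark-submit --class mobvista.dmp.datasource.adn.AdnRequestSdkHour \
  *     dmp-jobs.jar \
  *     -input <request_log_path> -input_dict1 <dict1_path> -input_dict2 <dict2_path> \
  *     -output <output_path> -coalesce 200 -date 2019-10-29
  */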
object AdnRequestSdkHour {
  def main(args: Array[String]): Unit = {
    new AdnRequestSdkHour().run(args)
  }
}