package mobvista.dmp.datasource.adn_request_sdk

import mobvista.dmp.common.CommonSparkJob
import mobvista.dmp.datasource.adn.mapreduce.GetDevIdUtil
import mobvista.dmp.util.{DateUtil, MRUtils}
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SaveMode, SparkSession}

import java.net.URI
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

/**
 * Daily ETL job that explodes ADN SDK request logs into one record per device identifier.
 *
 * For the day selected by `-date` (format `yyyyMMdd`) the job reads
 * `dwh.etl_adn_org_request_daily_hours`, emits one row for every usable identifier found in a
 * request (ruid / idfa / sysid / idfv on iOS; gaid / oaid / sysid / imei / androidId on
 * Android), aggregates the rows by (device_id, device_type, platform, app_id) and writes the
 * result to `-output` as zlib-compressed ORC.
 *
 * @package: mobvista.dmp.datasource.adn_request_sdk
 * @author: wangjf
 * @date: 2020/4/2
 * @time: 8:34 PM
 * @email: jinfeng.wang@mobvista.com
 * @phone: 152-1062-7698
 */
class AdnRequestSdkEtlDaily extends CommonSparkJob with java.io.Serializable {

  /**
   * Declares the command-line options understood by this job.
   *
   * @return options `date`, `output`, `coalesce`, `appIdMapping` and `manualAppIdMapping`,
   *         each of which takes a value
   */
  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("date", true, "date")
    options.addOption("output", true, "output")
    options.addOption("coalesce", true, "coalesce")
    options.addOption("appIdMapping", true, "appIdMapping")
    options.addOption("manualAppIdMapping", true, "manualAppIdMapping")
    options
  }

  /**
   * Runs the ETL for one day.
   *
   * @param args command-line arguments, see [[commandOptions]]
   * @return 0 when the job completes; failures surface as exceptions
   */
  override protected def run(args: Array[String]): Int = {
    // Local import so the validation patterns can be referenced by their short name.
    import mobvista.dmp.common.MobvistaConstant

    val commandLine = new BasicParser().parse(commandOptions(), args)
    val date = commandLine.getOptionValue("date")
    val output = commandLine.getOptionValue("output")
    val coalesce = Integer.parseInt(commandLine.getOptionValue("coalesce"))
    val appIdMapping = commandLine.getOptionValue("appIdMapping")
    val manualAppIdMapping = commandLine.getOptionValue("manualAppIdMapping")

    val spark = SparkSession
      .builder()
      .appName(s"AdnRequestSdkEtlDaily.$date")
      .config("spark.rdd.compress", "true")
      .config("spark.shuffle.compress", "true")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.io.compression.codec", "lz4")
      .config("spark.io.compression.lz4.blockSize", "64k")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

    val sc = spark.sparkContext
    try {
      // Remove any previous output first.
      // NOTE(review): the file system is resolved against a hard-coded bucket, so this
      // presumably expects `output` to live on s3://mob-emr-test; confirm with the scheduler.
      FileSystem.get(new URI(s"s3://mob-emr-test"), sc.hadoopConfiguration).delete(new Path(output), true)

      // app_id -> package_name lookup built from "appId,packageName,..." lines. The manual
      // mapping is unioned after the generated one. Lines with fewer than two fields are
      // skipped: String.split drops trailing empty fields, so "123," yields a one-element
      // array and indexing field 1 would throw.
      val bmap = sc.broadcast(
        sc.textFile(appIdMapping)
          .union(sc.textFile(manualAppIdMapping))
          .map(_.split(","))
          .collect {
            case fields if fields.length >= 2 && StringUtils.isNotBlank(fields(1)) && !"null".equals(fields(1)) =>
              (fields(0), fields(1))
          }
          .collectAsMap())

      val year = date.substring(0, 4)
      val month = date.substring(4, 6)
      val day = date.substring(6, 8)

      val sql =
        s"""
           |SELECT gaid, idfa, imei, androidid, extsysid, platform, appid, devicemodel, devicebrand, countrycode, strategy, oaid, idfv, ruid, osversion, rg
           |  FROM dwh.etl_adn_org_request_daily_hours
           |  WHERE yt = '$year' AND mt = '$month' AND dt = '$day'
           |""".stripMargin

      val rdd = spark.sql(sql).coalesce(coalesce * 2).rdd.flatMap(row => {
        val linesArr = new ArrayBuffer[Row]()
        val gaid = row.getAs[String]("gaid")
        val idfa = row.getAs[String]("idfa")
        val imei = row.getAs[String]("imei")
        val androidId = row.getAs[String]("androidid")
        val extSysid = row.getAs[String]("extsysid")
        // A NULL platform falls through to the catch-all case below instead of raising an NPE.
        val platform = Option(row.getAs[String]("platform")).map(_.toLowerCase).getOrElse("")
        val appId = row.getAs[String]("appid")
        val model = row.getAs[String]("devicemodel")
        val brand = row.getAs[String]("devicebrand")
        // A NULL country stays NULL (like the other pass-through columns) instead of raising an NPE.
        val country = Option(row.getAs[String]("countrycode")).map(_.toUpperCase).orNull
        val strategy = row.getAs[String]("strategy")
        val oaid = row.getAs[String]("oaid")
        val idfv = row.getAs[String]("idfv")
        val ruid = row.getAs[String]("ruid")
        val osVersion = row.getAs[String]("osversion")
        val region = row.getAs[String]("rg")

        // In-house system id: first field of the value returned by GetDevIdUtil, or "" when absent.
        val sysIdType = GetDevIdUtil.getExtSysId(extSysid)
        val sysId =
          if (StringUtils.isNotBlank(sysIdType)) MRUtils.SPLITTER.split(sysIdType, -1)(0)
          else ""
        val hasSysId = StringUtils.isNotBlank(sysId)

        // True when `id` is non-blank, matches the device-id pattern and is not the all-zero id.
        def isValidDid(id: String): Boolean =
          StringUtils.isNotBlank(id) &&
            id.matches(MobvistaConstant.didPtn) &&
            !id.matches(MobvistaConstant.allZero)

        // Appends one output record for the given identifier; all other columns come from the request.
        def emit(deviceId: String, deviceType: String, devTag: Int): Unit = {
          linesArr += Row(deviceId, deviceType, platform, appId, model, brand, osVersion, country, strategy, region, devTag)
        }

        // Rows emitted for one request always differ in device_type, so they end up in
        // different groups downstream and their relative order within a request is irrelevant.
        platform match {
          case "ios" =>
            if (StringUtils.isNotBlank(ruid) && ruid.length > 16) {
              emit(ruid, "ruid", 1)
            }
            val hasIdfa = isValidDid(idfa)
            if (hasIdfa) {
              emit(idfa, "idfa", 1)
            }
            if (hasSysId) {
              emit(sysId, "sysid", 1)
            }
            // idfv is tagged 0 only when a valid idfa was seen on the same request.
            if (isValidDid(idfv)) {
              emit(idfv, "idfv", if (hasIdfa) 0 else 1)
            }
          case "android" =>
            // gaid and oaid are tagged 1; sysid, imei and androidId are always tagged 0.
            if (isValidDid(gaid)) {
              emit(gaid, "gaid", 1)
            }
            if (isValidDid(oaid)) {
              emit(oaid, "oaid", 1)
            }
            if (hasSysId) {
              emit(sysId, "sysid", 0)
            }
            if (StringUtils.isNotBlank(imei) && imei.matches(MobvistaConstant.imeiPtn)) {
              emit(imei, "imei", 0)
            }
            if (StringUtils.isNotBlank(androidId) && androidId.matches(MobvistaConstant.andriodIdPtn)) {
              emit(androidId, "androidId", 0)
            }
          case _ => // other platforms (or a missing platform) produce no rows
        }
        linesArr
      })

      // Resolves an app id to its package name; unknown ids map to "".
      val getPkgName = udf((app_id: String) => {
        bmap.value.getOrElse(app_id, "")
      })

      // Keeps the part before the first ';' of every non-blank strategy, de-duplicates and
      // joins with ','. A strategy made only of separators (e.g. ";") splits to an empty
      // array and is skipped rather than raising ArrayIndexOutOfBoundsException.
      val filterSet = udf((strategyArray: Seq[String]) => {
        val strategySet = new mutable.HashSet[String]()
        strategySet ++= strategyArray.iterator
          .filter(s => StringUtils.isNotBlank(s))
          .flatMap(_.split(";").headOption)
        strategySet.mkString(",")
      })

      val update_date = DateUtil.format(DateUtil.parse(date, "yyyyMMdd"), "yyyy-MM-dd")

      spark.createDataFrame(rdd, schema).groupBy("device_id", "device_type", "platform", "app_id")
        .agg(
          getPkgName(col("app_id")).alias("package_name"),
          lit(update_date).alias("update_date"),
          max("model").alias("model"),
          max("brand").alias("brand"),
          max("os_version").alias("os_version"),
          max("country").alias("country"),
          filterSet(collect_set("strategy")).alias("strategy"),
          concat_ws(",", collect_set("region")).alias("region"),
          max("dev_tag").alias("dev_tag")
        ).write
        .mode(SaveMode.Overwrite)
        .option("orc.compress", "zlib")
        .orc(output)

    } finally {
      spark.stop()
    }
    0
  }

  /** Schema of the per-identifier rows produced in [[run]] before aggregation. */
  val schema: StructType = StructType(Array(
    StructField("device_id", StringType),
    StructField("device_type", StringType),
    StructField("platform", StringType),
    StructField("app_id", StringType),
    StructField("model", StringType),
    StructField("brand", StringType),
    StructField("os_version", StringType),
    StructField("country", StringType),
    StructField("strategy", StringType),
    StructField("region", StringType),
    StructField("dev_tag", IntegerType)))
}

object AdnRequestSdkEtlDaily {

  /**
   * Program entry point: creates the job and runs it with the raw command-line arguments.
   * The integer returned by `run` is discarded.
   *
   * @param args command-line arguments forwarded unchanged to the job
   */
  def main(args: Array[String]): Unit = {
    val job = new AdnRequestSdkEtlDaily()
    job.run(args)
  }
}