package mobvista.dmp.common

import com.alibaba.fastjson.JSON
import mobvista.dmp.common.MobvistaConstant.deviceTypeSet
import mobvista.dmp.util.DateUtil
import org.apache.commons.cli.Options
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.joda.time.format.DateTimeFormat

import java.net.URI
import java.util
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._

/**
 * Generic job that merges one day's data into the accumulated install list
 * and writes the result as ORC.
 *
 * Subclasses only need to implement [[processDailyData]]; argument parsing,
 * loading of the previous day's snapshot, the merge itself and the final
 * write are all handled by [[run]].
 *
 * @author fengliang
 */
abstract class CommonInstallListOrc extends CommonSparkJob with Serializable {

  /**
   * Job entry point.
   *
   * Reads the `date` (yyyyMMdd), `business`, `coalesce` and `output` options, builds the
   * daily RDD via [[processDailyData]], loads the snapshot selected by
   * `MobvistaConstant.dmp_install_list_sql` for the previous day, merges both per
   * (device_id, device_type, platform) and writes the rows whose device type is in
   * `deviceTypeSet` to `output` as zlib-compressed ORC.
   *
   * For the `dsp_req_unmatch` business the daily data is written as-is (after key
   * normalisation) without joining the snapshot.
   *
   * @param args raw command line arguments
   * @return 1 when a mandatory option is missing, 0 otherwise
   */
  override protected def run(args: Array[String]): Int = {
    // Local import: only needed for the empty-object fallback used when parsing JSON.
    import com.alibaba.fastjson.JSONObject

    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      return 1
    }
    printOptions(commandLine)

    val date = commandLine.getOptionValue("date")
    val business = commandLine.getOptionValue("business")
    val coalesce = Integer.parseInt(commandLine.getOptionValue("coalesce"))
    val output = commandLine.getOptionValue("output")

    val dateTime = DateUtil.format(DateUtil.parse(date, "yyyyMMdd"), "yyyy-MM-dd")
    // Substituted for @expireDate in the snapshot query: 12 months before the run date.
    val expireDate = DateTimeFormat.forPattern("yyyy-MM-dd").parseDateTime(dateTime).minusMonths(12).toString("yyyy-MM-dd")

    val spark = MobvistaConstant.createSparkSession(s"DmpInstallList.$business.$date")

    val isAdnSdkV2 = business.equals("adn_sdk_v2")

    // Upper bound on the number of packages kept per device after a merge.
    val maxPackages = 1000

    // The helpers below are plain function values (not methods) so that the Spark
    // closures using them do not need to capture `this`.

    // Upper-cases a country code; null, blank or non-matching values become "".
    val normalizeCountry: String => String = raw =>
      Option(raw)
        .filter(c => StringUtils.isNotBlank(c))
        .map(_.toUpperCase)
        .filter(_.matches(MobvistaConstant.countryPtn))
        .getOrElse("")

    // Returns the value unchanged unless it is null/blank, in which case "" is returned.
    val blankToEmpty: String => String = raw =>
      if (StringUtils.isNotBlank(raw)) raw else ""

    // Folds the spelling variants of the android id device type into "androidid".
    val normalizeDeviceType: String => String = deviceType =>
      if (deviceType.equalsIgnoreCase("android_id") || deviceType.equalsIgnoreCase("androidid")) {
        "androidid"
      } else {
        deviceType
      }

    // Folds platform aliases into "android" / "ios"; anything else is kept as-is.
    val normalizePlatform: String => String = platform =>
      if (platform.equalsIgnoreCase("android") || platform.equalsIgnoreCase("android2") || platform.equalsIgnoreCase("adr")) {
        "android"
      } else if (platform.equalsIgnoreCase("ios") || platform.equalsIgnoreCase("ios2")) {
        "ios"
      } else {
        platform
      }

    // JSON.parseObject returns null for null/blank text; fall back to an empty object
    // so a missing payload does not surface as a NullPointerException.
    val parseJsonObject: String => JSONObject = text =>
      Option(JSON.parseObject(text)).getOrElse(new JSONObject())

    // Reads `key` as a list of strings; a missing key or null value yields an empty list.
    val readStringList: (JSONObject, String) => util.List[String] = (json, key) =>
      if (json.containsKey(key)) {
        Option(JSON.parseArray(json.getString(key), classOf[String]))
          .getOrElse(new util.ArrayList[String]())
      } else {
        new util.ArrayList[String]()
      }

    try {
      spark.udf.register("check_device", MobvistaConstant.checkDeviceId _)
      spark.udf.register("filter_pkg", MobvistaConstant.filter_pkg _)
      spark.udf.register("get_filter_pkg", MobvistaConstant.get_filter_pkg _)

      // Remove output left behind by a previous attempt so that a retried job can write.
      // The FileSystem is resolved from the output path itself rather than from a
      // hard-coded bucket, so the cleanup targets the same location the write does.
      val outputPath = new Path(output)
      outputPath.getFileSystem(spark.sparkContext.hadoopConfiguration).delete(outputPath, true)

      // Daily data keyed by (device_id, device_type, platform).
      val dailyRDD = processDailyData(business, date, spark).rdd.map { row =>
        val deviceId = row.getAs[Any]("device_id").toString
        val deviceType = blankToEmpty(row.getAs[String]("device_type"))
        val platform = blankToEmpty(row.getAs[String]("platform"))
        val country = normalizeCountry(row.getAs[String]("country"))
        val extData = row.getAs[String]("ext_data")
        val installList = row.getAs[Any]("install_list").toString
        ((deviceId, deviceType, platform), (installList, extData, country))
      }

      val lastDate = DateUtil.format(DateUtil.getDay(date, "yyyyMMdd", -1), "yyyyMMdd")

      val installListSql = MobvistaConstant.dmp_install_list_sql
        .replace("@dt", lastDate)
        .replace("@business", business)
        .replace("@expireDate", expireDate)

      // Previous day's snapshot, keyed the same way as the daily data.
      val installRDD = spark.sql(installListSql).rdd.map { row =>
        val key = (
          row.getAs[Any]("device_id").toString,
          row.getAs[Any]("device_type").toString,
          row.getAs[Any]("platform").toString
        )
        val country = normalizeCountry(row.getAs[String]("country"))
        val installList = row.getAs[Any]("install_list").toString
        val extData = row.getAs[Any]("ext_data").toString
        val updateDate = row.getAs[Any]("update_date").toString
        (key, (installList, extData, updateDate, country))
      }

      import spark.implicits._

      val mergedRDD = if (business.equalsIgnoreCase("dsp_req_unmatch")) {
        // This business is not merged with the snapshot: emit the daily rows directly.
        dailyRDD.map { case ((deviceId, deviceType, platform), (pkgs, extData, country)) =>
          DmpInstallList(deviceId, normalizeDeviceType(deviceType), normalizePlatform(platform),
            country, pkgs, extData, dateTime)
        }
      } else {
        dailyRDD.fullOuterJoin(installRDD).map { case (key, (dailyOpt, totalOpt)) =>
          val (pkgs, extData, country, updateDate) = (dailyOpt, totalOpt) match {
            case (None, Some(total)) =>
              // Device only present in the snapshot: keep it, de-duplicating "region".
              val oldExt = parseJsonObject(total._2)
              oldExt.put("region", new util.HashSet[String](readStringList(oldExt, "region")))
              (total._1, oldExt.toJSONString, total._4, total._3)

            case (Some(daily), None) =>
              // Device seen for the first time today.
              (daily._1, daily._2, daily._3, dateTime)

            case (Some(daily), Some(total)) =>
              // Today's packages take precedence over the snapshot's.
              val installJson = parseJsonObject(daily._1).asInstanceOf[util.Map[String, String]]
              // Snapshot packages that are not reported again today.
              val installMap = parseJsonObject(total._1)
                .asInstanceOf[util.Map[String, String]]
                .asScala
                .retain((pkg, _) => !installJson.containsKey(pkg))

              if (isAdnSdkV2) {
                // adn_sdk_v2 only: a package may be reported as "<pkg>", "<pkg>.notinstall"
                // or "<pkg>.delete". Whichever variant arrives today supersedes the other
                // two variants stored in the snapshot. Other businesses are unaffected.
                installJson.keySet().asScala.foreach { pkg =>
                  if (StringUtils.isNotBlank(pkg)) {
                    val superseded =
                      if (pkg.endsWith(".notinstall")) {
                        val base = pkg.replace(".notinstall", "")
                        Seq(base, base + ".delete")
                      } else if (pkg.endsWith(".delete")) {
                        val base = pkg.replace(".delete", "")
                        Seq(base, base + ".notinstall")
                      } else {
                        Seq(pkg + ".notinstall", pkg + ".delete")
                      }
                    superseded.foreach(name => installMap.remove(name))
                  }
                }
              }

              if (installMap.size + installJson.size() > maxPackages) {
                // Over the cap: fill the remaining slots from the snapshot in descending
                // value order (per the original author's note the values are install
                // times, so the most recently active packages are kept first).
                installMap.toList.sortWith(_._2 > _._2).foreach { case (pkg, value) =>
                  if (installJson.size() < maxPackages) {
                    installJson.put(pkg, value)
                  }
                }
              } else {
                installJson.putAll(installMap.asJava)
              }

              val oldExt = parseJsonObject(total._2)
              val dailyExt = parseJsonObject(daily._2)
              // dev_tag is only ever switched on by the daily data, never cleared.
              if (dailyExt.containsKey("dev_tag") && dailyExt.getInteger("dev_tag") == 1) {
                oldExt.put("dev_tag", dailyExt.getInteger("dev_tag"))
              }
              // "strategy" and "region" are unioned, but only rewritten when the daily
              // data actually carries the field.
              Seq("strategy", "region").foreach { field =>
                val merged = readStringList(oldExt, field)
                if (dailyExt.containsKey(field)) {
                  merged.addAll(readStringList(dailyExt, field))
                  oldExt.put(field, new util.HashSet[String](merged))
                }
              }

              val mergedCountry = if (StringUtils.isNotBlank(daily._3)) daily._3 else total._4
              (installJson.toString, oldExt.toJSONString, mergedCountry, dateTime)

            case (None, None) =>
              // A full outer join never produces this combination; kept for totality.
              ("", "", "", "")
          }

          DmpInstallList(key._1, normalizeDeviceType(key._2), normalizePlatform(key._3),
            country, pkgs, extData, updateDate)
        }
      }

      mergedRDD
        .filter(r => deviceTypeSet.contains(r.device_type))
        .toDF
        .repartition(coalesce)
        .write
        .mode(SaveMode.Overwrite)
        .option("orc.compress", "zlib")
        .orc(output)
    } finally {
      if (spark != null) {
        spark.stop()
      }
    }
    0
  }

  /**
   * Declares the command line options read by [[run]]: `date`, `business`,
   * `coalesce` and `output`. Each option takes a value.
   */
  override protected def buildOptions(): Options = {
    val opts = new Options
    Seq(
      "date" -> "[must] date",
      "business" -> "[must] business",
      "coalesce" -> "[must] coalesce",
      "output" -> "[must] output path"
    ).foreach { case (name, description) => opts.addOption(name, true, description) }
    opts
  }

  /**
   * Produces the daily data to be merged into the install list.
   *
   * [[run]] reads the following columns from the returned frame: `device_id`,
   * `device_type`, `platform`, `country`, `ext_data` and `install_list`.
   * `device_id` and `install_list` must not be null; `install_list` and `ext_data`
   * are parsed as JSON objects when the device also exists in the snapshot.
   *
   * @param business business line (partition) being processed
   * @param date     run date, formatted as yyyyMMdd
   * @param spark    active Spark session
   * @return the daily data frame
   */
  def processDailyData(business: String, date: String, spark: SparkSession): DataFrame
}