package mobvista.dmp.common

import java.net.URI
import java.util

import com.alibaba.fastjson.JSONObject
import com.google.gson.{JsonArray, JsonElement, JsonObject}
import mobvista.dmp.datasource.mpsdk.InstallInfo
import mobvista.prd.datasource.util.GsonUtil
import org.apache.commons.lang.StringUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.SparkContext
import org.apache.spark.sql.{Row, SparkSession}
import org.joda.time.format.DateTimeFormat

import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer

/**
  * Generic job that merges one day's install data into the cumulative install list.
  *
  * A subclass only has to implement [[processDailyData]], which turns the rows of the daily
  * input into `(key, InstallInfo)` pairs; this class reads both inputs, merges them per key,
  * drops install entries older than 12 months and writes the result.
  *
  * Each output line is `key DATA_SPLIT jsonArray`, where every array element is an object with
  * `package_name` and (usually) `date`; at most [[MAX_INSTALLS_PER_KEY]] elements are written
  * per key.
  *
  * author: fengliang
  */
abstract class CommonInstallListV2 extends CommonSparkJob with Serializable {

  protected val FIELD_SPLIT = "\t"
  protected var DAILY_STORE_FORMAT = "ORC"

  /** Upper bound on the number of install entries written for a single key. */
  private val MAX_INSTALLS_PER_KEY = 1000

  /** Characters removed from package names taken from the daily data. */
  private val PACKAGE_NAME_ILLEGAL_CHARS = "[^0-9a-zA-Z\\.\\_]+"

  /**
    * Job entry point.
    *
    * Reads the daily data from `input` (format [[DAILY_STORE_FORMAT]]) and the previous install
    * list from `oldInput`, merges them per key and writes gzip-compressed text to `output`.
    * Install entries whose non-blank date is before `date - 12 months` are dropped.
    *
    * `date` and `oldInput` are registered here; `input`, `output`, `parallelism` and `coalesce`
    * are read from [[options]] as well — presumably registered by [[CommonSparkJob]] (not visible
    * here, TODO confirm).
    *
    * @param args command-line arguments
    * @return 1 when required options are missing, 0 on success
    */
  def run(args: Array[String]): Int = {
    var sc: SparkContext = null
    try {
      options.addOption("date", true, "[must] today")
      options.addOption("oldInput", true, "[must] yestoday install data path")
      val commandLine = commParser.parse(options, args)
      if (!checkMustOption(commandLine)) {
        printUsage(options)
        return 1
      } else {
        printOptions(commandLine)
      }

      val date = commandLine.getOptionValue("date")
      val input = commandLine.getOptionValue("input")
      val output = commandLine.getOptionValue("output")
      val oldInput = commandLine.getOptionValue("oldInput")
      val parallelism = commandLine.getOptionValue("parallelism").toInt
      val coalesce = commandLine.getOptionValue("coalesce").toInt

      // Entries with a date before this day (12 months before `date`) are considered expired.
      val expireDate = DateTimeFormat.forPattern("yyyy-MM-dd").parseDateTime(date).minusMonths(12).toString("yyyy-MM-dd")

      val spark = SparkSession
        .builder()
        .appName("MPSDKTotal")
        .config("spark.rdd.compress", "true")
        .config("spark.default.parallelism", parallelism)
        .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .getOrCreate()
      // Assign right away so `finally` stops the context even if a later step fails.
      sc = spark.sparkContext

      // A retried job would otherwise fail because the output path already exists.
      // Resolve the file system from the output path itself so the delete hits the same
      // bucket/scheme that saveAsTextFile writes to.
      val outputPath = new Path(output)
      outputPath.getFileSystem(sc.hadoopConfiguration).delete(outputPath, true)

      // key -> distinct InstallInfo records reported on `date`
      val dailyRDD = spark.read.format(DAILY_STORE_FORMAT).load(input).rdd
        .mapPartitions(processDailyData(_, date))
        .filter(tuple => {
          val fields = splitFun(tuple._1)
          isValidDevice(fields(0))
        })
        .groupByKey()
        .mapValues(infos => {
          val set = new util.HashSet[InstallInfo]()
          infos.foreach(info => set.add(info))
          set
        })

      // key (first three columns) -> stored JSON install list (fourth column)
      val installRDD = sc.textFile(oldInput)
        .map(splitFun(_))
        .filter(array => isValidDevice(array(0)))
        .map(splits => (s"${splits(0)}$DATA_SPLIT${splits(1)}$DATA_SPLIT${splits(2)}", splits(3)))

      dailyRDD.cogroup(installRDD)
        .map { case (key, (dailyIter, totalIter)) =>
          val tags =
            if (dailyIter.isEmpty && totalIter.nonEmpty) {
              keepUnexpired(totalIter, expireDate)
            } else if (dailyIter.nonEmpty && totalIter.isEmpty) {
              dailyInstallsToJson(dailyIter, date)
            } else if (dailyIter.nonEmpty && totalIter.nonEmpty) {
              mergeInstalls(dailyIter, totalIter, date, expireDate)
            } else {
              ""
            }
          s"${key}${DATA_SPLIT}${tags}"
        }
        .repartition(coalesce)
        .saveAsTextFile(output, classOf[GzipCodec])
    } finally {
      if (sc != null) {
        sc.stop()
      }
    }
    0
  }

  /**
    * Accepts a first key field that matches `didPtn` and is not `allZero`, or that matches
    * `imeiPtn`.
    */
  private def isValidDevice(deviceId: String): Boolean =
    (deviceId.matches(didPtn) && !allZero.equals(deviceId)) || deviceId.matches(imeiPtn)

  /** An entry is expired only when it has a non-blank date strictly before `expireDate`. */
  private def isExpired(installDate: String, expireDate: String): Boolean =
    StringUtils.isNotBlank(installDate) && installDate.compareTo(expireDate) < 0

  /** Caps an install date at the job date, so future-dated installs are recorded as `date`. */
  private def clampDate(installDate: String, date: String): String =
    if (installDate.compareTo(date) > 0) date else installDate

  /** Strips characters outside `[0-9a-zA-Z._]` from a package name. */
  private def cleanPackageName(packageName: String): String =
    packageName.replaceAll(PACKAGE_NAME_ILLEGAL_CHARS, "")

  /** Key has only stored installs: re-emit them unchanged, minus expired entries. */
  private def keepUnexpired(storedLists: Iterable[String], expireDate: String): String = {
    val kept = new JsonArray
    storedLists.foreach(stored => {
      GsonUtil.String2JsonArray(stored).foreach(element => {
        if (!isExpired(getJsonValue(element, "date"), expireDate)) {
          kept.add(element)
        }
      })
    })
    kept.toString
  }

  /** Key has only daily installs: emit them (date clamped, package sanitised), capped in size. */
  private def dailyInstallsToJson(dailySets: Iterable[util.HashSet[InstallInfo]], date: String): String = {
    val jsonArray = new JsonArray
    dailySets.foreach(installSet => {
      installSet.foreach(installInfo => {
        if (jsonArray.size() < MAX_INSTALLS_PER_KEY) {
          val json = new JsonObject
          json.addProperty("date", clampDate(installInfo.getDate(), date))
          json.addProperty("package_name", cleanPackageName(installInfo.getPackage_name()))
          jsonArray.add(json)
        }
      })
    })
    jsonArray.toString
  }

  /**
    * Key has both: start from the unexpired stored installs, then let each daily install
    * overwrite a package's date when the stored date is blank/missing or older.
    */
  private def mergeInstalls(dailySets: Iterable[util.HashSet[InstallInfo]], storedLists: Iterable[String],
                            date: String, expireDate: String): String = {
    // package_name -> latest known install date ("" when the stored entry had no date)
    val latest = new JSONObject()
    storedLists.foreach(stored => {
      GsonUtil.String2JsonArray(stored).foreach(element => {
        val installDate = getJsonValue(element, "date")
        if (!isExpired(installDate, expireDate)) {
          latest.put(getJsonValue(element, "package_name"), installDate)
        }
      })
    })

    dailySets.foreach(installSet => {
      installSet.foreach(installInfo => {
        val installDate = clampDate(installInfo.getDate(), date)
        val packageName = cleanPackageName(installInfo.getPackage_name())
        // get() returns null for packages not in the stored list yet; treat that as a blank date
        // instead of dereferencing it.
        val knownDate = Option(latest.get(packageName)).map(_.toString).getOrElse("")
        if (StringUtils.isBlank(knownDate) || knownDate.compareTo(installDate) < 0) {
          latest.put(packageName, installDate)
        }
      })
    })

    val jsonArray = new JsonArray
    latest.entrySet().foreach(entry => {
      if (jsonArray.size() < MAX_INSTALLS_PER_KEY) {
        val json = new JsonObject
        val installDate = entry.getValue.toString
        // Entries without a known date are written without a "date" property.
        if (StringUtils.isNotEmpty(installDate)) {
          json.addProperty("date", installDate)
        }
        json.addProperty("package_name", entry.getKey)
        jsonArray.add(json)
      }
    })
    jsonArray.toString
  }

  /**
    * Reads `key` from a JSON object element as a string.
    *
    * @param element a JSON element, expected to be an object
    * @param key     property name to read
    * @return the property value as a string; "" when `element` is null or not a JSON object,
    *         or when the property is missing or JSON null
    */
  def getJsonValue(element: JsonElement, key: String): String = {
    if (element == null || !element.isJsonObject) {
      ""
    } else {
      val value = element.getAsJsonObject.get(key)
      if (value == null || value.isJsonNull) "" else value.getAsString
    }
  }

  /**
    * Converts a row into an array of its columns read as strings.
    *
    * @param row row whose columns are all strings (`Row.getString` is used for each)
    * @return the column values in order
    */
  def convertDayRow(row: Row): Array[String] =
    (0 until row.size).map(i => row.getString(i)).toArray

  /**
    * Parses one partition of the daily data.
    *
    * The returned key is cogrouped with `oldInput` keys built from the first three columns
    * joined by `DATA_SPLIT`, so it must use the same layout to be merged; its first field
    * (after `splitFun`) is filtered by `didPtn`/`allZero`/`imeiPtn`.
    *
    * @param array rows of one partition of the daily input
    * @param date  job date, `yyyy-MM-dd`
    * @return `(key, InstallInfo)` pairs
    */
  def processDailyData(array: Iterator[Row], date: String): Iterator[Tuple2[String, InstallInfo]]
}

//  case class InstallListVO(key: String, package_name: String, date: String)