package mobvista.dmp.common

import java.net.URI
import java.util

import com.google.gson.{JsonArray, JsonElement, JsonObject}
import mobvista.dmp.datasource.mpsdk.InstallInfo
import mobvista.prd.datasource.util.GsonUtil
import org.apache.commons.lang.StringUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.{Row, SparkSession}
import org.joda.time.format.DateTimeFormat

import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer

/**
  * Generic job that merges daily install data into the cumulative install list.
  *
  * Subclasses only need to implement [[processDailyData]], which turns one parsed daily
  * record into `(key, InstallInfo)` pairs; this class handles reading, validating, merging,
  * expiring and writing the data.
  *
  * Author: fengliang
  */
abstract class CommonInstallListV3 extends CommonSparkJob with Serializable {

  protected val FIELD_SPLIT = "\t"
  protected var DAILY_STORE_FORMAT = "ORC"

  /** History install entries dated before (job date - this many months) are dropped. */
  private val INSTALL_EXPIRE_MONTHS = 12

  /** Upper bound on the number of install entries written when daily data is merged. */
  private val MAX_INSTALL_ENTRIES = 1000

  /** Characters matched by this pattern are stripped from daily package names. */
  private val ILLEGAL_PACKAGE_CHARS = "[^0-9a-zA-Z\\.\\_]+"

  /**
    * Entry point: merges the daily data at `input` into yesterday's install list at
    * `oldInput` and writes the result, gzip-compressed, to `output`.
    *
    * Options read: `date` (parsed as yyyy-MM-dd), `file` (";"-separated package mapping),
    * `dailyFormat` ("orc", otherwise tab-separated text), `oldInput`, plus `input`, `output`,
    * `parallelism` and `coalesce`, which are not registered here (presumably registered by
    * CommonSparkJob — confirm).
    *
    * @param args command-line arguments
    * @return 0 on success, 1 when a mandatory option is missing
    */
  def run(args: Array[String]): Int = {
    var sc: SparkContext = null
    try {
      options.addOption("date", true, "[must] today")
      options.addOption("file", true, "[must] file")
      options.addOption("dailyFormat", true, "[must] dailyFormat")
      options.addOption("oldInput", true, "[must] yestoday install data path")
      val commandLine = commParser.parse(options, args)
      if (!checkMustOption(commandLine)) {
        printUsage(options)
        return 1
      }
      printOptions(commandLine)

      val date = commandLine.getOptionValue("date")
      val file = commandLine.getOptionValue("file")
      val input = commandLine.getOptionValue("input")
      val output = commandLine.getOptionValue("output")
      val oldInput = commandLine.getOptionValue("oldInput")
      val dailyFormat = commandLine.getOptionValue("dailyFormat")
      val parallelism = commandLine.getOptionValue("parallelism").toInt
      val coalesce = commandLine.getOptionValue("coalesce").toInt

      // Dates are compared as strings against this cut-off; NOTE(review): assumes install
      // dates use the same yyyy-MM-dd format — confirm.
      val expireDate = DateTimeFormat.forPattern("yyyy-MM-dd").parseDateTime(date)
        .minusMonths(INSTALL_EXPIRE_MONTHS)
        .toString("yyyy-MM-dd")

      val spark = SparkSession
        .builder()
        .appName("MPSDKTotal")
        .config("spark.rdd.compress", "true")
        .config("spark.default.parallelism", parallelism)
        .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .getOrCreate()
      // Assigned immediately so the finally block stops the context even if a later step fails.
      sc = spark.sparkContext

      // A retried attempt would otherwise fail because the output path already exists.
      // The file system is resolved from the output path itself, so the delete targets the
      // same location saveAsTextFile writes to, whatever its scheme or bucket.
      val outputPath = new Path(output)
      outputPath.getFileSystem(sc.hadoopConfiguration).delete(outputPath, true)

      // Each line is "<key>;<value>"; a trailing "://" is stripped from the key.
      val packageMap = sc.textFile(file)
        .map(_.split(";"))
        .map(fields => (fields(0).stripSuffix("://"), fields(1)))
        .collectAsMap()
      val bPackageMap = sc.broadcast(packageMap)

      val dailyData = if ("orc".equalsIgnoreCase(dailyFormat)) {
        spark.read.format(dailyFormat).load(input).rdd.map(convertDayRow)
      } else {
        sc.textFile(input).map(_.split("\t"))
      }

      // The grouped values are used as-is: mergeInstalls keeps one entry per package, so
      // duplicates are harmless, and groupByKey's partitioner can be reused by the join.
      val dailyRDD = dailyData
        .flatMap(processDailyData(_, date, bPackageMap))
        .filter { case (key, _) =>
          val fields = splitFun(key)
          isValidDeviceId(fields(0))
        }
        .groupByKey()

      // History lines: fields 0-2 form the key, field 3 is the JSON install array.
      val installRDD = sc.textFile(oldInput)
        .map(splitFun(_))
        .filter(fields => isValidDeviceId(fields(0)))
        .map(fields => (s"${fields(0)}$DATA_SPLIT${fields(1)}$DATA_SPLIT${fields(2)}", fields(3)))

      dailyRDD.fullOuterJoin(installRDD)
        .map { case (key, (dailyOpt, totalOpt)) =>
          val tags = (dailyOpt, totalOpt) match {
            case (Some(daily), history) => mergeInstalls(history, daily, date, expireDate)
            case (None, Some(total)) => dropExpired(total, expireDate)
            case (None, None) => "" // not produced by fullOuterJoin; kept for exhaustiveness
          }
          s"$key$DATA_SPLIT$tags"
        }
        .repartition(coalesce)
        .saveAsTextFile(output, classOf[GzipCodec])
    } finally {
      if (sc != null) {
        sc.stop()
      }
    }
    0
  }

  /**
    * Device-id check applied to the first key field of both daily and history records:
    * matches `didPtn` and is not `allZero`, or matches `imeiPtn` (members presumably
    * provided by CommonSparkJob).
    */
  private def isValidDeviceId(deviceId: String): Boolean =
    (deviceId.matches(didPtn) && !allZero.equals(deviceId)) || deviceId.matches(imeiPtn)

  /** A history entry is kept when its date is empty or not earlier than `expireDate`. */
  private def isNotExpired(installDate: String, expireDate: String): Boolean =
    StringUtils.isEmpty(installDate) || installDate.compareTo(expireDate) >= 0

  /**
    * Handles a key that only has history: drops expired entries and passes the remaining
    * JSON elements through unchanged (no de-duplication or size cap).
    */
  private def dropExpired(total: String, expireDate: String): String = {
    val kept = new JsonArray
    GsonUtil.String2JsonArray(total).foreach { element =>
      if (isNotExpired(getJsonValue(element, "date"), expireDate)) {
        kept.add(element)
      }
    }
    kept.toString
  }

  /**
    * Merges daily installs into the (optional) history JSON array for one key.
    *
    * Unexpired history entries are loaded first (a later duplicate package overwrites an
    * earlier one). Daily dates later than `date` are capped to `date`, and daily package
    * names are stripped of ILLEGAL_PACKAGE_CHARS. A daily install replaces a known package
    * only when its date is strictly later. At most MAX_INSTALL_ENTRIES entries are written,
    * in HashMap iteration order. "date" is omitted for entries with an empty date.
    */
  private def mergeInstalls(history: Option[String],
                            daily: Iterable[InstallInfo],
                            date: String,
                            expireDate: String): String = {
    val latestByPackage = new util.HashMap[String, String]()

    history.foreach { total =>
      GsonUtil.String2JsonArray(total).foreach { element =>
        val installDate = getJsonValue(element, "date")
        if (isNotExpired(installDate, expireDate)) {
          latestByPackage.put(getJsonValue(element, "package_name"), installDate)
        }
      }
    }

    daily.foreach { info =>
      val rawDate = info.getDate()
      val installDate = if (rawDate.compareTo(date) > 0) date else rawDate
      val packageName = info.getPackage_name().replaceAll(ILLEGAL_PACKAGE_CHARS, "")
      val known = latestByPackage.get(packageName)
      if (known == null || known.compareTo(installDate) < 0) {
        latestByPackage.put(packageName, installDate)
      }
    }

    val jsonArray = new JsonArray
    latestByPackage.entrySet().iterator().take(MAX_INSTALL_ENTRIES).foreach { entry =>
      val json = new JsonObject
      if (StringUtils.isNotEmpty(entry.getValue)) {
        json.addProperty("date", entry.getValue)
      }
      json.addProperty("package_name", entry.getKey)
      jsonArray.add(json)
    }
    jsonArray.toString
  }

  /**
    * Reads `key` from a JSON object element as a string.
    *
    * Throws IllegalStateException (from getAsJsonObject) when a non-null element is not a
    * JSON object.
    *
    * @param element a JSON element, expected to be an object
    * @param key     property name to read
    * @return the property value, or "" when the element is null/JsonNull or the property is
    *         missing or JSON null (entries written without "date" must read back as "")
    */
  def getJsonValue(element: JsonElement, key: String): String = {
    if (element == null || element.isJsonNull) {
      ""
    } else {
      val value = element.getAsJsonObject.get(key)
      if (value == null || value.isJsonNull) "" else value.getAsString
    }
  }

  /**
    * Converts a Row into an Array[String], one entry per column.
    *
    * Null columns map to null; other values map to their toString (identical to getString
    * for string columns).
    *
    * @param row a daily-data row
    * @return the column values as strings
    */
  def convertDayRow(row: Row): Array[String] =
    Array.tabulate(row.size)(i => if (row.isNullAt(i)) null else row.get(i).toString)

  /**
    * Parses one daily record into `(key, InstallInfo)` pairs; implemented by subclasses.
    *
    * Pairs whose key (split with splitFun) fails the device-id check on its first field are
    * dropped by run. NOTE(review): keys are presumably in the same three-field DATA_SPLIT
    * format as history keys so the full outer join can match them — confirm.
    *
    * @param array       one daily record: ORC columns as strings, or a tab-split text line
    * @param date        job date (yyyy-MM-dd)
    * @param bPackageMap broadcast map parsed from the `file` option
    * @return the install pairs extracted from the record
    */
  def processDailyData(array: Array[String], date: String, bPackageMap: Broadcast[scala.collection.Map[String, String]]): Array[(String, InstallInfo)]
}