package mobvista.dmp.datasource.dm

import java.net.URI
import java.text.SimpleDateFormat
import java.util
import java.util.Date

import com.google.gson.{JsonArray, JsonObject}
import mobvista.dmp.common.CommonSparkJob
import mobvista.prd.datasource.util.GsonUtil
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{SaveMode, SparkSession}

/**
  * @author wangjf
  */
class DmInterestAll extends CommonSparkJob with Serializable {
  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      printOptions(commandLine)
      return 1
    } else {
      printOptions(commandLine)
    }

    val date = commandLine.getOptionValue("date")
    val ga_date = commandLine.getOptionValue("ga_date")
    val output = commandLine.getOptionValue("output")
    val coalesce = commandLine.getOptionValue("coalesce")

    val spark = SparkSession.builder()
      .appName("DmInterestAll")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()
    val sc = spark.sparkContext

    FileSystem.get(new URI(s"s3://mob-emr-test"), sc.hadoopConfiguration).delete(new Path(output), true)

    try {

      spark.udf.register("check_deviceId", Constant.check_deviceId _)
      spark.udf.register("combineJson", combineJsonArray _)

      /*
      val interest_sql = Constant.interest_sql.replace("@date", date).replace("@ga_date", ga_date)
        .replace("@check_deviceId", "check_deviceId(device_id)").replace("@combineJson", "combineJson")
      */
      val interest_sql = Constant.old_interest_sql.replace("@date", date).replace("@ga_date", ga_date)
        .replace("@check_deviceId", "check_deviceId(device_id)").replace("@combineJson", "combineJson")

      val df = spark.sql(interest_sql).repartition(coalesce.toInt)

      df.toDF
        .write.mode(SaveMode.Overwrite)
        .option("orc.compress", "zlib")
        .orc(output)

    } finally {
      if (spark != null) {
        spark.stop()
      }
    }
    0
  }


  /**
    * @desc
    * UDF 合并 jsonArray 并,将 date 置为最新的日期,减少 shuffle 操作和磁盘 io
    * @param tags
    * jsonArray
    * @return
    */
  def combineJsonArray(tags: String): String = {
    val jsonArray = new JsonArray
    val map: java.util.Map[String, (Date, JsonObject)] = new util.HashMap[String, (Date, JsonObject)]()
    val sdf: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd")
    tags.split(";").foreach(tag => {
      val jsonNode = GsonUtil.String2JsonArray(tag)
      for (i <- 0 until jsonNode.size) {
        val json = jsonNode.get(i).getAsJsonObject
        if (!json.has("package_name") || !json.has("date")) {
          jsonArray.add(json)
        } else if (map.keySet.contains(json.get("package_name").getAsString)) {
          if (map.get(json.get("package_name").getAsString)._1.before(sdf.parse(json.get("date").getAsString))) {
            map.put(json.get("package_name").getAsString, (sdf.parse(json.get("date").getAsString), json))
          }
        } else {
          map.put(json.get("package_name").getAsString, (sdf.parse(json.get("date").getAsString), json))
        }
      }
    })
    import scala.collection.JavaConversions._
    for (key <- map.keySet()) {
      jsonArray.add(map.get(key)._2)
    }
    jsonArray.toString
  }

  def toJsonArraySize(tags: String): Int = {
    GsonUtil.String2JsonArray(tags).size()
  }

  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("date", true, "[must] date")
    options.addOption("ga_date", true, "[must] ga_date")
    options.addOption("output", true, "[must] output")
    options.addOption("coalesce", true, "[must] coalesce")
    options
  }
}

object DmInterestAll {
  def main(args: Array[String]): Unit = {
    new DmInterestAll().run(args)
  }
}