package mobvista.dmp.datasource.rtdmp

import com.alibaba.fastjson.{JSONArray, JSONObject}
import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
import mobvista.dmp.datasource.rtdmp.Constant.AudienceInfoPre
import mobvista.dmp.util.{DateUtil, MD5Util}
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.commons.lang.StringUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.slf4j.LoggerFactory

import java.net.URI
import java.util
import scala.collection.JavaConversions._
import scala.collection.mutable

/**
 * @package: mobvista.dmp.datasource.rtdmp
 * @author: wangjf
 * @date: 2020/7/13
 * @time: 11:25 AM
 * @email: jinfeng.wang@mobvista.com
 * @phone: 152-1062-7698
 */
class RTDmpMainPre extends CommonSparkJob with Serializable {
  private val logger = LoggerFactory.getLogger(classOf[RTDmpMainPre])

  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("time", true, "time")
    options.addOption("output", true, "output")
    options.addOption("coalesce", true, "coalesce")
    options.addOption("data_utime", true, "data_utime")
    options
  }
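
  //  Option semantics, inferred from run() below:
  //    time        hour to process, "yyyy-MM-dd.HH" (the "." is replaced with a space before parsing)
  //    output      S3 path for the ORC output
  //    coalesce    number of output partitions
  //    data_utime  audience data update hour, same "yyyy-MM-dd.HH" format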

  override protected def run(args: Array[String]): Int = {

    val parser = new BasicParser()
    val options = commandOptions()
    val commandLine = parser.parse(options, args)
    val time = commandLine.getOptionValue("time").replace(".", " ")
    val output = commandLine.getOptionValue("output")
    val coalesce = commandLine.getOptionValue("coalesce")
    val data_utime = commandLine.getOptionValue("data_utime").replace(".", " ")

    val spark: SparkSession = SparkSession.builder()
      .appName(s"RTDmpMainPre.$time")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .getOrCreate()

    val sc = spark.sparkContext
    try {

      var mergeRDD = sc.emptyRDD[(String, Int)]

      //  By default, process the previous hour's data
      val update_time_start = DateUtil.format(time + ":00:00", "yyyy-MM-dd HH:mm:ss")
      val update_time_end = DateUtil.format(time + ":59:59", "yyyy-MM-dd HH:mm:ss")

      //  Audience data_utime window in epoch seconds: 28800 s (8 h) to 25200 s (7 h) before data_utime,
      //  i.e. a one-hour window (the 8-hour shift looks like a UTC+8 offset adjustment)
      val audience_date_utime_start = DateUtil.parse(data_utime, "yyyy-MM-dd HH").getTime / 1000 - 28800
      val audience_date_utime_end = DateUtil.parse(data_utime, "yyyy-MM-dd HH").getTime / 1000 - 25200

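      //  Shape of the map returned by ServerUtil.request, inferred from its usage below
      //  (ServerUtil itself is the authoritative contract):
      //    key   -> audienceId
      //    value -> (JSONArray of {"s3_path", "update_time"} objects, audience op flag, another flag, extra JSONObject)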
      val map: util.Map[Integer, (JSONArray, Integer, Integer, JSONObject)] =
        ServerUtil.request(update_time_start, update_time_end, audience_date_utime_start, audience_date_utime_end, 0, 1, 2)

      map.foreach(t => {
        val audienceId = Integer2int(t._1)
        val audienceOp = t._2._2
        val dmap = new mutable.HashMap[String, String]()
        t._2._1.foreach(json => {
          val jsonObject = json.asInstanceOf[JSONObject]
          if (jsonObject.containsKey("s3_path") && StringUtils.isNotBlank(jsonObject.getString("s3_path"))) {
            //  (s3_path, update_time)
            dmap.put(jsonObject.getString("s3_path"), jsonObject.getString("update_time"))
          }
        })

        //  Validity filter: check whether any s3_path's update_time matches the current update time
        //  if (dmap.values.contains(time)) {
        /**
         * When audienceOp == 1 and dmap.size >= 2, only the newest audience package is loaded.
         * The original diff logic (kept commented out below) marked devices that appeared in the previous
         * partition but not in this one with -1 * audienceId so that downstream jobs could delete them,
         * then merged that result with the newest package.
         */
        val updateRDD = if (audienceOp == 1 && dmap.size >= 2) {
          val list = dmap.toList.sortWith(_._2 > _._2).take(1) //  sort by update_time descending and keep only the newest package
          val pathUri = new URI(list.get(0)._1)
          val newAudience = if (FileSystem.get(new URI(s"${pathUri.getScheme}://${pathUri.getHost}"), sc.hadoopConfiguration)
            .exists(new Path(pathUri.toString.replace("*", "")))) {
            sc.textFile(list.get(0)._1).repartition(100)
              .filter(r => {
                r.length <= 64
              })
              .map(r => { //  the first entry is the newest audience package
                val device_id =
                  if (r.matches(MobvistaConstant.md5Ptn)) {
                    r
                  } else {
                    MD5Util.getMD5Str(r)
                  }
                (device_id, audienceId)
              })
          } else {
            sc.emptyRDD[(String, Int)]
          }
          /*
          val oldAudience = sc.textFile(list.get(1)._1).repartition(100).map(r => { //  the second entry is the previous version of the audience package
            val device_id =
              if (r.matches(MobvistaConstant.md5Ptn)) {
                r
              } else {
                MD5Util.getMD5Str(r)
              }
            (device_id, (audienceId, list.get(1)._2))
          })
          oldAudience.subtractByKey(newAudience).map(t => {
            //  multiply audienceId by -1 for the difference set, so downstream jobs can delete that audienceId
            val device_id =
              if (t._1.matches(MobvistaConstant.md5Ptn)) {
                t._1
              } else {
                MD5Util.getMD5Str(t._1)
              }
            (device_id, ((-1) * audienceId, t._2._2))
            //  (devId, ((-1) * audienceId, update_date))
          }).union(newAudience) //  merge with the newest audience package
          */
          newAudience
        } else if (dmap.size == 1) {
          val audData = dmap.toList.sortWith(_._2 > _._2).take(1) //  exactly one package in this branch
          val pathUri = new URI(audData.get(0)._1)
          if (FileSystem.get(new URI(s"${pathUri.getScheme}://${pathUri.getHost}"), sc.hadoopConfiguration)
            .exists(new Path(audData.get(0)._1.replace("*", "")))) {
            sc.textFile(audData.get(0)._1)
              .filter(r => {
                r.length <= 64
              })
              .map(r => { //  load the newest audience package
                val device_id =
                  if (r.matches(MobvistaConstant.md5Ptn)) {
                    r
                  } else {
                    MD5Util.getMD5Str(r)
                  }
                (device_id, audienceId)
              })
          } else { //  no valid path, fall back to an empty RDD
            sc.emptyRDD[(String, Int)]
          }
        } else {
          sc.emptyRDD[(String, Int)]
        }
        //  merge all audience packages
        mergeRDD = mergeRDD.union(updateRDD)
      })

      val df = mergeRDD.repartition(1000).groupByKey().map(r => {
        val devId = r._1
        val set = new mutable.HashSet[Int]()
        //  collect all audienceIds seen for this device into a set
        r._2.foreach(t => {
          set.add(t)
        })
        (devId, set)
      })
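      //  At this point each record is (device_id, Set(audienceId, ...)); a hypothetical
      //  ("5f4dcc3b5aa765d61d8327deb882cf99", Set(123, 456)) will be written below as
      //  AudienceInfoPre("5f4dcc3b5aa765d61d8327deb882cf99", "123,456").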

      import spark.implicits._

      FileSystem.get(new URI(s"s3://mob-emr-test"), sc.hadoopConfiguration).delete(new Path(output), true)
      df.map(r => {
        AudienceInfoPre(r._1, r._2.mkString(","))
      }).repartition(coalesce.toInt)
        .toDF
        .write
        .mode(SaveMode.Overwrite)
        .option("orc.compress", "zlib")
        .orc(output)

    } finally {
      if (sc != null) {
        sc.stop()
      }
      if (spark != null) {
        spark.stop()
      }
    }
    0
  }
}

object RTDmpMainPre {
  def main(args: Array[String]): Unit = {
    new RTDmpMainPre().run(args)
  }
}
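
//  Hypothetical submit sketch (jar name and paths are placeholders, not the project's actual deployment):
//  spark-submit --class mobvista.dmp.datasource.rtdmp.RTDmpMainPre dmp.jar \
//    -time 2020-07-13.11 -output s3://bucket/path/rtdmp_pre/2020/07/13/11 -coalesce 100 -data_utime 2020-07-13.11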