package mobvista.dmp.datasource.rtdmp

import com.alibaba.fastjson.{JSONArray, JSONObject}
import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
import mobvista.dmp.format.RDDMultipleOutputFormat
import mobvista.dmp.util.DateUtil
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.compress.{CompressionCodec, GzipCodec}
import org.apache.spark.storage.StorageLevel

import java.net.URI
import scala.collection.JavaConverters.asScalaSetConverter
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

/**
 * @package: mobvista.dmp.datasource.rtdmp
 * @author: wangjf
 * @date: 2020/7/13
 * @time: 11:25 AM
 * @email: jinfeng.wang@mobvista.com
 * @phone: 152-1062-7698
 */
class RTDmpRequestDaily extends CommonSparkJob with Serializable {

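  /**
   * Command-line options: date (yyyy-MM-dd), output (S3 base path), coalesce
   * (partition count for the final write), hh (hour-of-day, accepted but not
   * read anywhere in this job).
   */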
  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("date", true, "date")
    options.addOption("output", true, "output")
    options.addOption("coalesce", true, "coalesce")
    options.addOption("hh", true, "hh")
    options
  }

  override protected def run(args: Array[String]): Int = {
    val parser = new BasicParser()
    val options = commandOptions()
    val commandLine = parser.parse(options, args)
    val date = commandLine.getOptionValue("date")
    val output = commandLine.getOptionValue("output")
    val coalesce = commandLine.getOptionValue("coalesce").toInt
    val spark = MobvistaConstant.createSparkSession(s"RTDmpRequestDaily.$date")

    val sc = spark.sparkContext
    try {

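      //  Gzip block compression for the output; both the legacy mapreduce.output.*
      //  keys and the current mapreduce.output.fileoutputformat.* keys are set so
      //  either output API picks up the codec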
      val conf = spark.sparkContext.hadoopConfiguration
      conf.set("mapreduce.output.compress", "true")
      conf.set("mapreduce.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec")
      conf.setBoolean("mapreduce.output.fileoutputformat.compress", true)
      conf.set("mapreduce.output.fileoutputformat.compress.type", CompressionType.BLOCK.toString)
      conf.setClass("mapreduce.output.fileoutputformat.compress.codec", classOf[GzipCodec], classOf[CompressionCodec])

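      //  Register Logic's UDFs; judging by their names, they map raw device ids
      //  and types to canonical forms for matching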
      spark.udf.register("changeDeviceId", Logic.changeDeviceId _)
      spark.udf.register("changeDeviceType", Logic.changeDeviceType _)

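      //  One day's install list (business = '14days'), ids/types normalized by
      //  the UDFs registered above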
      val sql =
        s"""
           |SELECT changeDeviceId(device_type,device_id) device_id, changeDeviceType(device_type) device_type, install_list
           |  FROM dwh.dmp_install_list WHERE dt = '${date}' AND business = '14days'
           |""".stripMargin

      //  Query window covers the entire day being processed
      val update_time_start = DateUtil.format(date + " 00:00:00", "yyyy-MM-dd HH:mm:ss")
      val update_time_end = DateUtil.format(date + " 23:59:59", "yyyy-MM-dd HH:mm:ss")

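      //  28800 s = 8 h, presumably shifting local CST (UTC+8) day bounds to UTC
      //  epoch seconds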
      val audience_date_utime_start = DateUtil.parse(date + " 00:00:00", "yyyy-MM-dd HH:mm:ss").getTime / 1000 - 28800
      val audience_date_utime_end = DateUtil.parse(date + " 23:59:59", "yyyy-MM-dd HH:mm:ss").getTime / 1000 - 28800

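      //  Fetch audiences updated inside the window; the literal arguments (0, 2, "6")
      //  look like status/type filters defined by RTDmpServer.query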
      val json: JSONObject =
        RTDmpServer.query(update_time_start, update_time_end, 0,
          2, audience_date_utime_start, audience_date_utime_end, "6")

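      //  Clear any previous output for this run before writing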
      FileSystem.get(new URI("s3://mob-emr-test"), sc.hadoopConfiguration).delete(new Path(output), true)

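      //  audience id -> package names it targets, parsed from the portal response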
      val map = new mutable.HashMap[Integer, Array[String]]()
      for (id <- json.keySet().asScala) {
        val pkgSet = json.getJSONObject(id).getString("installed_package_name").split(",", -1)
        map.put(Integer.valueOf(id), pkgSet)
      }

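      //  For every device row, emit (device_id, device_type, audienceId) for each
      //  audience whose package list overlaps the device's install list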
      val rdd = spark.sql(sql).rdd.map(row => {
        val deviceId = row.getAs[String]("device_id")
        val deviceType = row.getAs[String]("device_type")
        val install_list = MobvistaConstant.String2JSONObject(row.getAs[String]("install_list")).keySet().asScala
        val array = new ArrayBuffer[(String, String, Integer)]()
        for ((id, pkgs) <- map) {
          //  non-empty-intersection test; exists/contains avoids building an
          //  intersection set for every row
          if (pkgs.exists(install_list.contains)) {
            array += ((deviceId, deviceType, id))
          }
        }
        array
      }).flatMap(l => l).persist(StorageLevel.MEMORY_AND_DISK_SER)

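      //  RDDMultipleOutputFormat appears to route each record to the directory
      //  named by its Text key, i.e. $output/<audienceId>/<deviceType>/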
      rdd.repartition(coalesce).map(t => {
        (new Text(s"$output/${t._3}/${t._2}"), new Text(t._1))
      }).saveAsNewAPIHadoopFile(output, classOf[Text], classOf[Text], classOf[RDDMultipleOutputFormat[_, _]], conf)

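      //  Distinct (audienceId, deviceType) pairs: countByKey yields the number of
      //  device types per audience, collectAsMap keeps one pair per audience id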
      val mRdd = rdd.map(r => {
        (r._3, r._2)
      }).distinct(10)
        .cache()
      val pkgTypeCount = mRdd.countByKey()

      val portalMap = mRdd.collectAsMap()

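      //  Push each audience's new S3 path to the portal, one update call per id;
      //  a wildcard path is sent when an audience spans multiple device types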
      portalMap.foreach(m => {
        val updateJsonArray = new JSONArray()
        val jsonObject = new JSONObject()
        jsonObject.put("id", m._1)
        if (pkgTypeCount(m._1) == 1) {
          jsonObject.put("s3_path", s"$output/${m._1}/${m._2}/")
        } else {
          jsonObject.put("s3_path", s"$output/${m._1}/*/")
        }
        updateJsonArray.add(jsonObject)
        ServerUtil.update(updateJsonArray)
      })

    } finally {
      if (spark != null) {
        //  spark.stop() also stops the underlying SparkContext, so a separate
        //  sc.stop() is unnecessary
        spark.stop()
      }
    }
    0
  }
}

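/**
 * Illustrative launch command (jar name and output path are placeholders, not
 * taken from the source):
 *
 * spark-submit --class mobvista.dmp.datasource.rtdmp.RTDmpRequestDaily \
 *   dmp.jar -date 2020-07-13 -output s3://mob-emr-test/path/to/rtdmp_request -coalesce 100
 */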
object RTDmpRequestDaily {
  def main(args: Array[String]): Unit = {
    new RTDmpRequestDaily().run(args)
  }
}