package mobvista.dmp.datasource.rtdmp

import com.alibaba.fastjson.{JSONArray, JSONObject}
import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
import mobvista.dmp.format.RDDMultipleOutputFormat
import mobvista.dmp.util.{DateUtil, PropertyUtil}
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.commons.lang.StringUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.compress.{CompressionCodec, GzipCodec}
import org.apache.spark.storage.StorageLevel

import java.net.URI
import scala.collection.mutable

/**
 * @package: mobvista.dmp.datasource.rtdmp
 * @author: wangjf
 * @date: 2020/7/13
 * @time: 11:25 AM
 * @email: jinfeng.wang@mobvista.com
 * @phone: 152-1062-7698
 */
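//  Pipeline (as implemented below): pull device rows for the business/date
//  (optionally a single hour) from the configured source table, write one
//  gzip file per package_name/device_type to S3, then create new RTDmp
//  audiences or update existing ones through ServerUtil.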
class RTDmpRequest extends CommonSparkJob with Serializable {

  //  CLI options: date (yyyyMMdd), output path, coalesce partition count,
  //  source table, business line, and hour of day (hh).
  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("date", true, "date")
    options.addOption("output", true, "output")
    options.addOption("coalesce", true, "coalesce")
    options.addOption("table", true, "table")
    options.addOption("business", true, "business")
    options.addOption("hh", true, "hh")
    options
  }
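
  //  Example invocation (argument values illustrative):
  //    -date 20200713 -output s3://mob-emr-test/path/to/output -coalesce 100 \
  //    -table some_device_table -business my_business -hh 05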

  //  Whitelisted package names for the current business; populated from
  //  config.properties on the driver and captured by the check_package UDF
  //  closure when the query below runs.
  val package_name_set = new mutable.HashSet[String]()

  def check_package(package_name: String): Boolean = {
    package_name_set.contains(package_name)
  }
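
  //  The SQL templates are assumed to reference the UDF roughly as
  //    SELECT ... FROM @table WHERE dt = '@dt' @check_package
  //  with @check_package expanded to "AND check_package(package_name)" below.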

  override protected def run(args: Array[String]): Int = {
    val parser = new BasicParser()
    val options = commandOptions()
    val commandLine = parser.parse(options, args)
    val date = commandLine.getOptionValue("date")
    val output = commandLine.getOptionValue("output")
    val business = commandLine.getOptionValue("business")
    val table = commandLine.getOptionValue("table")
    val hh = commandLine.getOptionValue("hh")
    val coalesce = Integer.parseInt(commandLine.getOptionValue("coalesce"))
    val spark = MobvistaConstant.createSparkSession(s"RTDmpRequest.$date.$business")

    val update_date = DateUtil.format(DateUtil.parse(date, "yyyyMMdd"), "yyyy-MM-dd")

    val sc = spark.sparkContext
    try {

      //  Force gzip block compression on the job output; both the legacy
      //  (mapreduce.output.*) and current (mapreduce.output.fileoutputformat.*)
      //  keys are set, presumably for compatibility across Hadoop versions.
      val conf = spark.sparkContext.hadoopConfiguration
      conf.set("mapreduce.output.compress", "true")
      conf.set("mapreduce.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec")
      conf.setBoolean("mapreduce.output.fileoutputformat.compress", true)
      conf.set("mapreduce.output.fileoutputformat.compress.type", CompressionType.BLOCK.toString)
      conf.setClass("mapreduce.output.fileoutputformat.compress.codec", classOf[GzipCodec], classOf[CompressionCodec])

      //  Register the whitelist check as a Spark SQL UDF for the templates below.
      spark.udf.register("check_package", check_package _)

      val auto_business = PropertyUtil.getProperty("config.properties", "rtdmp.auto.business").split(",", -1).toSet[String]

      //  val noauto_business = PropertyUtil.getProperty("config.properties", s"rtdmp.noauto.business").split(",", -1).toSet[String]

      //  Choose the SQL template: auto businesses take every package,
      //  tencent has its own template, and all other businesses are
      //  restricted to their whitelisted package names via check_package.
      var device_sql = if (auto_business.contains(business)) {
        Constant.device_sql
          .replace("@dt", date)
          .replace("@business", business)
          .replace("@check_package", "")
          .replace("@table", table)
      } else if (business.equals("tencent")) {
        package_name_set ++= PropertyUtil.getProperty("config.properties", s"$business.package_name").split(",", -1)
        Constant.tencent_device_sql
          .replace("@dt", date)
          .replace("@check_package", "AND check_package(package_name)")
          .replace("@table", table)
      } else {
        package_name_set ++= PropertyUtil.getProperty("config.properties", s"$business.package_name").split(",", -1)
        Constant.device_sql
          .replace("@dt", date)
          .replace("@business", business)
          .replace("@check_package", "AND check_package(package_name)")
          .replace("@table", table)
      }

      //  Hourly runs filter on the requested hour; the 00 run covers the
      //  whole day and filters on update_date instead.
      if (!hh.equals("00")) {
        device_sql = device_sql.replace("@check_hr", s"AND hh = '$hh'")
          .replace("@check_update_date", "")
      } else {
        device_sql = device_sql.replace("@check_hr", "")
          .replace("@check_update_date", s"AND update_date = '$update_date'")
      }
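      //  e.g. hh = "05" expands @check_hr to "AND hh = '05'"; hh = "00" expands
      //  @check_update_date to "AND update_date = '2020-07-13'" for date = 20200713.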

      println(s"package_name_set.size -->> ${package_name_set.size}")

      //  Clear any previous output for this run before writing.
      FileSystem.get(new URI("s3://mob-emr-test"), sc.hadoopConfiguration).delete(new Path(output), true)

      //  Audience ids that have been stopped and must not be updated again.
      val stopAudience = PropertyUtil.getProperty("config.properties",
        "rtdmp.stop.audience").split(",", -1).toSet[String]
        .map(r => Integer.valueOf(r))

      val rdd = spark.sql(device_sql).rdd.map(r => {
        val device_id = r.getAs[String]("device_id")
        var device_type = r.getAs[String]("device_type")
        val platform = r.getAs[String]("platform")
        val package_name = r.getAs[String]("package_name")

        //  Normalize md5 device-type aliases (imeimd5 -> imei_md5, etc.).
        device_type = device_type match {
          case "imeimd5" => "imei_md5"
          case "gaidmd5" => "gaid_md5"
          case "oaidmd5" => "oaid_md5"
          case "idfamd5" => "idfa_md5"
          case _ => device_type
        }
        (device_id, device_type, platform, package_name)
      }).repartition(coalesce)
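      //  Each element is (device_id, device_type, platform, package_name),
      //  e.g. ("8c6f...", "gaid_md5", "android", "com.example.app") (values illustrative).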

      //  rdd is consumed twice (file write below, then mapRdd), so keep it cached.
      rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)

      //  Key each device by its target path so RDDMultipleOutputFormat writes
      //  one directory per package_name/device_type.
      rdd.map(t => {
        (new Text(s"$output/${t._4}/${t._2}"), new Text(t._1))
      }).saveAsNewAPIHadoopFile(output, classOf[Text], classOf[Text], classOf[RDDMultipleOutputFormat[_, _]], conf)
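      //  Expected layout given the keys above:
      //    $output/<package_name>/<device_type>/...  (gzip-compressed per the conf set earlier)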
      //  Distinct (package_name, (device_type, platform)) pairs, reused twice below.
      val mapRdd = rdd.map(t => {
        (t._4, (t._2, t._3))
      }).distinct(10)
        .cache()

      //  Count distinct (device_type, platform) pairs per package: a package
      //  with exactly one pair can point at a single device-type directory,
      //  otherwise its s3_path needs a wildcard.
      val pkgTypeCount = mapRdd.map(r => {
        val deviceType = r._2._1
        val platform = r._2._2
        val packageName = {
          if (business.equals("btop") && !r._1.contains("_btop")) {
            r._1 + "_btop"
          } else {
            r._1
          }
        }
        (packageName, (deviceType, platform))
      }).countByKey()
      //  One (device_type, platform) sample per package; collectAsMap keeps an
      //  arbitrary entry when a package has several pairs.
      val portalMap = mapRdd.collectAsMap()

      //  Split packages into new audiences (upload) and known ones (update).
      val uploadPkgSet = new mutable.HashSet[String]()
      val uploadJsonArray = new JSONArray()
      val updateJsonArray = new JSONArray()
      portalMap.foreach(m => {
        val jsonObject = new JSONObject()
        val package_name = {
          if (business.equals("btop") && !m._1.contains("_btop")) {
            m._1 + "_btop"
          } else {
            m._1
          }
        }
        //  Platform code for the portal: "ios" maps to 2, anything else to 1;
        //  a blank platform falls back to the iOS package-name pattern.
        var platform = if (StringUtils.isBlank(m._2._2)) {
          0
        } else if (m._2._2.equals("ios")) {
          2
        } else {
          1
        }
        if (platform == 0) {
          platform = if (MobvistaConstant.iosPkgPtn.matcher(m._1).matches()) {
            2
          } else {
            1
          }
        }
        val match_device_type = Logic.match_device_type(m._2._1)
        val map = Logic.getAudienceInfo(business)

        if (map.contains(package_name)) {
          //  Known audience: refresh its s3_path unless it has been stopped.
          if (!stopAudience.contains(map(package_name))) {
            jsonObject.put("id", map(package_name))
            //  jsonObject.put("audience_name", package_name)
            if (pkgTypeCount(package_name) == 1) {
              jsonObject.put("s3_path", s"$output/${m._1}/${m._2._1}/")
            } else {
              jsonObject.put("s3_path", s"$output/${m._1}/*/")
            }
            jsonObject.put("status", 1)
            jsonObject.put("audience_data_status", 1)
            updateJsonArray.add(jsonObject)
          }
        } else {
          //  New audience: register it with full creation metadata.
          if (pkgTypeCount(package_name) == 1) {
            jsonObject.put("s3_path", s"$output/${m._1}/${m._2._1}/")
          } else {
            jsonObject.put("s3_path", s"$output/${m._1}/*/")
          }
          jsonObject.put("platform", platform)
          jsonObject.put("match_device_type", match_device_type)
          jsonObject.put("audience_type", 2)
          jsonObject.put("data_update_method", 1)
          jsonObject.put("audience_name", package_name)
          jsonObject.put("status", 1)
          jsonObject.put("audience_gender", 3)
          jsonObject.put("audience_count", 1)
          jsonObject.put("is_sync_dmpserver", 1)
          jsonObject.put("audience_data_status", 1)
          uploadPkgSet.add(package_name)
          uploadJsonArray.add(jsonObject)
        }
      })
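      //  A new-audience entry looks roughly like (values illustrative):
      //    {"audience_name": "com.example.app", "platform": 1, "match_device_type": ...,
      //     "s3_path": "$output/com.example.app/*/", "status": 1, "audience_data_status": 1, ...}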

      val uploadJsonObject = ServerUtil.upload(uploadJsonArray)

      if (uploadJsonObject.getInteger("code") == 200) {
        println("RTDmp Upload OK,AudienceId -->> " + uploadJsonObject.getJSONArray("data"))
      } else {
        println("RTDmp Upload FAILED,code -->> " + uploadJsonObject.getInteger("code"))
      }

      //  Persist ids of the newly created audiences; Logic.getAudienceInfo
      //  reads them back so later runs treat these packages as known.
      Logic.writeAudienceInfo(business, Logic.getAudienceMap(uploadPkgSet))

      //  Update audiences one at a time, retrying each request until the
      //  server returns 200 (note: this loops forever on persistent failures).
      for (i <- 0 until updateJsonArray.size()) {
        val updateObject = new JSONArray()
        updateObject.add(updateJsonArray.getJSONObject(i))
        var flag = true
        while (flag) {
          val updateJsonObject = ServerUtil.update(updateObject)
          if (updateJsonObject.getInteger("code") == 200) {
            println("RTDmp Update OK,updateJson -->> " + updateObject)
            flag = false
          }
        }
      }

    } finally {
      if (spark != null) {
        //  spark.stop() also shuts down the underlying SparkContext.
        spark.stop()
      }
    }
    0
  }
}

object RTDmpRequest {
  def main(args: Array[String]): Unit = {
    new RTDmpRequest().run(args)
  }
}