package mobvista.dmp.datasource.retargeting

import com.alibaba.fastjson.{JSON, JSONObject}
import mobvista.dmp.common.CommonSparkJob
import mobvista.dmp.util.DateUtil
import mobvista.prd.datasource.util.GsonUtil
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.{SaveMode, SparkSession}

import java.net.URI
import java.text.SimpleDateFormat
import java.util
import scala.collection.JavaConverters._
import scala.collection.mutable

/**
 * DeviceInfoJob: builds per-device profile records (install list, interest
 * tags, tag frequencies) and writes them to ORC for the retargeting pipeline.
 *
 * @package: mobvista.dmp.datasource.retargeting
 * @author: wangjf
 * @date: 2019/5/23
 * @time: 4:41 PM
 * @email: jinfeng.wang@mobvista.com
 * @phone: 152-1062-7698
 */
class DeviceInfoJob extends CommonSparkJob with Serializable {
  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("date", true, "date")
    options.addOption("output", true, "output")
    options.addOption("coalesce", true, "coalesce")
    options
  }
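
  //  Example invocation (argument values are illustrative only):
  //    -date 20190523 -output s3://mob-emr-test/dmp/retargeting/device_info -coalesce 200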

  //  Driver-side date formatters; SimpleDateFormat is not thread-safe, but
  //  these are only used in run() on the driver
  val sdf1 = new SimpleDateFormat("yyyy-MM-dd")
  val sdf2 = new SimpleDateFormat("yyyyMMdd")

  //  Broadcast lookup tables, initialized in run()
  var bMap: Broadcast[scala.collection.Map[String, String]] = _
  var packageMap: Broadcast[scala.collection.Map[String, Int]] = _

  override protected def run(args: Array[String]): Int = {
    val parser = new BasicParser()
    val options = commandOptions()
    val commandLine = parser.parse(options, args)
    val date = commandLine.getOptionValue("date")
    val output = commandLine.getOptionValue("output")
    val coalesce = commandLine.getOptionValue("coalesce")
    val spark = SparkSession
      .builder()
      .appName("DeviceInfoJob")
      .config("spark.rdd.compress", "true")
      .config("spark.shuffle.compress", "true")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.io.compression.codec", "lz4")
      .config("spark.io.compression.lz4.blockSize", "64k")
      .config("spark.sql.autoBroadcastJoinThreshold", "314572800")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()
    try {

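      //  Clear any previous output so overwrite re-runs are idempotent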
      FileSystem.get(new URI("s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output), true)

      val last_req_day = DateUtil.getDayByString(date, "yyyyMMdd", -13)
      val update_date = sdf1.format(sdf2.parse(last_req_day))

      val sc = spark.sparkContext
      val code_sql = Constant.old2new_sql
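      //  bMap: tag_code -> new_second_id (old-to-new tag id mapping), broadcast to executors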
      bMap = sc.broadcast(spark.sql(code_sql).rdd.map(r => {
        (r.getAs("tag_code").toString, r.getAs("new_second_id").toString)
      }).collectAsMap())

      println("bMap.size ===>>> " + bMap.value.size)

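      //  map: new_second_id -> new_first_id (second-level to first-level category)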
      val map = sc.broadcast(spark.sql(Constant.second2first_sql).rdd.map(r => {
        (r.getAs("new_second_id").toString, r.getAs("new_first_id").toString)
      }).collectAsMap())

      println("map.size ===>>> " + map.value.size)

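      //  packageMap: lower-cased package_name -> numeric id, from the latest
      //  dwh.package_mapping partition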
      var package_sql =
        """
          |SHOW PARTITIONS dwh.package_mapping
        """.stripMargin
      var partDF = spark.sql(package_sql)
      val package_dt = partDF.orderBy(partDF("partition").desc).first.getString(0).split("=")(1)
      package_sql =
        s"""
           |SELECT id, package_name FROM dwh.package_mapping WHERE dt = '${package_dt}'
        """.stripMargin
      packageMap = spark.sparkContext.broadcast(spark.sql(package_sql).rdd.map(r => {
        (r.getAs("package_name").toString.toLowerCase, Integer.parseInt(r.getAs("id").toString))
      }).collectAsMap())

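      //  Register the helper UDFs used by the SQL below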
      spark.udf.register("getId", getId _)
      spark.udf.register("getInstallList", getInstallList _)
      spark.udf.register("getInterestList", getInterestList _)
      spark.udf.register("getFrenquency", getFrenquency _) //  "Frenquency" spelling kept to match existing SQL
      spark.udf.register("checkDevice", Constant.checkDevice _)
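      //  Latest dmp_device_tag_statistics partition drives the "dm_active" temp view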
      val statistics_sql =
        """
          |SHOW PARTITIONS dwh.dmp_device_tag_statistics
        """.stripMargin
      partDF = spark.sql(statistics_sql)
      val freqDate = partDF.orderBy(partDF("partition").desc).first.getString(0).split("=")(1)
      val freqSql = Constant.statistics_sql.replace("@date", freqDate)
      spark.sql(freqSql).createOrReplaceTempView("dm_active")

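      //  dm_active_tag has two-level partitions: take the second-newest one,
      //  and if its sub-partition value is not "month", step the date back one day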
      val active_sql =
        """
          |SHOW PARTITIONS dwh.dm_active_tag
        """.stripMargin
      partDF = spark.sql(active_sql)
      val activeParts = partDF.orderBy(partDF("partition").desc).take(2)(1).getString(0).split("/")
      var activeDate = activeParts(0).split("=")(1)
      val part = activeParts(1).split("=")(1)
      if (!part.equals("month")) {
        activeDate = DateUtil.getDayByString(activeDate, "yyyyMMdd", -1)
      }

      val sql = Constant.ods_dmp_user_info_all_sql_distinct
        .replaceAll("@date", date)
        .replaceAll("@activeDate", activeDate)
        .replaceAll("@update_date", update_date)

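      //  For each device row: merge the frequency-JSON keys, the interest
      //  array, and the weekly/monthly tag counts into one interest set,
      //  adding each tag id together with its first-level parent from `map`
      //  (lookups assume every tag id exists in the second-to-first mapping)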
      import spark.implicits._
      spark.sql(sql)
        .rdd.map(r => {
        val interest: mutable.HashSet[String] = new mutable.HashSet[String]()
        val frequency = r.getAs[String]("frequency")
        val freObject = if (StringUtils.isNotBlank(frequency)) {
          JSON.parseObject(frequency)
        } else {
          new JSONObject()
        }
        freObject.keySet().asScala.foreach(key => {
          interest.add(map.value(key))
          interest.add(key)
        })
        val interestArr = r.getAs("interest").asInstanceOf[mutable.WrappedArray[String]]
        interestArr.foreach(i => {
          interest.add(map.value(i))
          interest.add(i)
        })

        //  Parse a JSON array of {tag_id, cnt, count} into a JSONObject keyed
        //  by tag_id, registering each tag (and its first-level parent) as an interest
        def tagCounts(column: String): JSONObject = {
          val obj = new JSONObject()
          val value = r.getAs[String](column)
          if (StringUtils.isNotBlank(value)) {
            for (json <- GsonUtil.String2JsonArray(value).asScala) {
              val j = json.getAsJsonObject
              val tag_id = j.get("tag_id").getAsString
              val cntJson = new JSONObject()
              cntJson.put("cnt", j.get("cnt").getAsInt)
              cntJson.put("count", j.get("count").getAsInt)
              obj.put(tag_id, cntJson)
              interest.add(map.value(tag_id))
              interest.add(tag_id)
            }
          }
          obj
        }
        val tag_week_jsonObject = tagCounts("tag_week")
        val tag_month_jsonObject = tagCounts("tag_month")

        DeviceInfoEntity(r.getAs("device_id").toString.toUpperCase, r.getAs("platform"), r.getAs("model"), r.getAs("os_version"),
          r.getAs("country"), r.getAs("age"), r.getAs("gender"), r.getAs("install"), mutable.WrappedArray.make(interest.toArray[String]),
          r.getAs("behavior"), freObject.toJSONString, tag_week_jsonObject.toJSONString, tag_month_jsonObject.toJSONString, r.getAs("region"),
          r.getAs("update_date"), r.getAs("publish_date"))
      }).toDF
        .repartition(coalesce.toInt)
        .write
        .mode(SaveMode.Overwrite)
        .option("orc.compress", "snappy")
        .orc(output)

    } finally {
      if (spark != null) {
        spark.stop()
      }
    }
    0
  }

  //  Build a Set of interest ids (inter_id) from the raw interest string
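  //  e.g. input "A,B,C#D,E,F" (values illustrative) yields lookup keys
  //  "A-B-C" and "D-E-F", each falling back to "<key>OTHER"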
  def getInterestList(interest: String): Array[String] = {
    val set = new util.HashSet[String]()
    if (StringUtils.isNotBlank(interest)) {
      interest.split("#").foreach(inters => {
        val ins = inters.toUpperCase.split(",")
        if (ins.length >= 3) {
          val key = ins(0) + "-" + ins(1) + "-" + ins(2)
          val vals = bMap.value.getOrElse(key, bMap.value.getOrElse(key + "OTHER", ""))
          if (StringUtils.isNotBlank(vals)) {
            set.add(vals)
          }
        }
      })
    }
    set.asScala.toArray
  }

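  //  Map a tag code to its new second-level id, falling back to the
  //  "<code>OTHER" bucket; returns "" when neither exists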
  def getId(tag_code: String): String = {
    val code = tag_code.toUpperCase
    bMap.value.getOrElse(code, bMap.value.getOrElse(code + "OTHER", ""))
  }

  //  Build a Set of package ids resolved from package names
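  //  e.g. input "com.foo|x,com.bar|y" (values illustrative) resolves the
  //  package names before '|' against packageMap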
  def getInstallList(install: String): Array[Int] = {
    val set = new util.HashSet[Int]()
    if (StringUtils.isNotBlank(install)) {
      install.split(",").foreach(pkgs => {
        val pkd = pkgs.split("\\|")
        if (pkd.nonEmpty && StringUtils.isNotBlank(pkd(0)) && packageMap.value.contains(pkd(0).toLowerCase)) {
          set.add(packageMap.value(pkd(0).toLowerCase))
        }
      })
    }
    set.asScala.toArray
  }

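  //  Parse "tag:count,tag:count" pairs into (tag, count) tuples; the
  //  "Frenquency" spelling is kept because the UDF is registered under it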
  def getFrenquency(frenquency: String): Array[(String, Int)] = {
    val set = new util.HashSet[(String, Int)]()
    if (StringUtils.isNotBlank(frenquency)) {
      frenquency.split(",").foreach(fn => {
        val fns = fn.split(":")
        //  Skip malformed entries that lack the ":count" part
        if (fns.length >= 2 && StringUtils.isNotBlank(fns(0))) {
          set.add((fns(0), fns(1).toInt))
        }
      })
    }
    set.asScala.toArray
  }
}

object DeviceInfoJob {
  def main(args: Array[String]): Unit = {
    new DeviceInfoJob().run(args)
  }
}