package mobvista.dmp.datasource.fmp

import java.text.SimpleDateFormat

import com.google.gson.{JsonArray, JsonObject}
import mobvista.dmp.datasource.dm.Constant.{allZero, didPtn}
import mobvista.dmp.util.DateUtil
import mobvista.prd.datasource.util.GsonUtil
import org.apache.commons.lang3.StringUtils
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.storage.StorageLevel

import scala.collection.JavaConversions._

/**
  * @package: mobvista.dmp.datasource.fmp
  * @author: wangjf
  * @date: 2019-06-10
  * @time: 16:57
  * @email: jinfeng.wang@mobvista.com
  * @phone: 152-1062-7698
  */
object Constant {

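  //  Formatters for converting between yyyy-MM-dd and yyyyMMdd. Note that
  //  SimpleDateFormat is not thread-safe; here they are only used on the driver.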
  val sdf1 = new SimpleDateFormat("yyyy-MM-dd")
  val sdf2 = new SimpleDateFormat("yyyyMMdd")

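  /** Deserializes a job-profile JSON string into a JobProfileEntity. */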
  def parseJsonStringJob(jsonString: String): JobProfileEntity = {
    GsonUtil.fromJson(GsonUtil.String2JsonObject(jsonString), classOf[JobProfileEntity])
  }

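  /** Deserializes a single dimension JsonObject into a DimensionEntity. */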
  def parseDimension(json: JsonObject): DimensionEntity = {
    GsonUtil.fromJson(json, classOf[DimensionEntity])
  }

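  /**
    * Builds and runs the user / active-tag queries described by jobProfileEntity
    * and returns the matched device ids, capped at the requested limit, together
    * with the total (uncapped) match count.
    */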
  def getDF(spark: SparkSession, jobProfileEntity: JobProfileEntity): (DataFrame, Int) = {

    import spark.implicits._
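    //  Resolve the latest dt partition of dwh.dm_user_info via SHOW PARTITIONS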
    var user_sql =
      """
        |SHOW PARTITIONS dwh.dm_user_info
      """.stripMargin
    var partDF = spark.sql(user_sql)
    var date = partDF.orderBy(partDF("partition").desc).first.getString(0).split("=")(1)

    user_sql =
      s"""
         |SELECT UPPER(device_id) device_id, frequency FROM dwh.dm_user_info WHERE dt = '${date}'
        """.stripMargin

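    //  Optionally restrict to devices updated within the last `last_req_day` days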
    if (jobProfileEntity.last_req_day != null) {
      val updateDate = DateUtil.getDayByString(date, "yyyyMMdd", -jobProfileEntity.last_req_day)
      val update_date = sdf1.format(sdf2.parse(updateDate))
      user_sql = user_sql + s" AND update_date >= '${update_date}'"
    }

    if (StringUtils.isNotBlank(jobProfileEntity.behavior) && GsonUtil.String2JsonArray(jobProfileEntity.behavior).size() > 0) {
      var behavior = ""
      GsonUtil.String2JsonArray(jobProfileEntity.behavior).foreach(j => {
        behavior += "'" + j.getAsString.toLowerCase + "',"
      })
      behavior = "(" + behavior.substring(0, behavior.length - 1) + ")"
      user_sql = user_sql + s" AND LOWER(behavior) IN ${behavior}"
    }

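    //  dwh.dm_active_tag partitions look like dt=YYYYMMDD/part=...; the
    //  second-latest one is used (take(2)(1)), presumably because the latest
    //  partition may still be loading.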
    var active_sql =
      """
        |SHOW PARTITIONS dwh.dm_active_tag
      """.stripMargin
    partDF = spark.sql(active_sql)
    val secondLatest = partDF.orderBy(partDF("partition").desc).take(2)(1).getString(0)
    date = secondLatest.split("/")(0).split("=")(1)

    val part = secondLatest.split("/")(1).split("=")(1)
    /*
    if (!part.equals("month")) {
      date = DateUtil.getDayByString(date, "yyyyMMdd", -1)
    }
    */

    active_sql =
      s"""
         |SELECT UPPER(device_id) device_id, tags, part FROM dwh.dm_active_tag WHERE dt = '${date}'
      """.stripMargin

    if (StringUtils.isNotBlank(jobProfileEntity.country) && GsonUtil.String2JsonArray(jobProfileEntity.country).size() > 0) {
      var country = ""
      GsonUtil.String2JsonArray(jobProfileEntity.country).foreach(j => {
        country += "'" + j.getAsString.toUpperCase + "',"
      })
      country = "(" + country.substring(0, country.length - 1) + ")"
      user_sql = user_sql + s" AND UPPER(country) IN $country"

      active_sql = active_sql + s" AND UPPER(country_code) IN $country"
    }
    if (StringUtils.isNotBlank(jobProfileEntity.platform) && GsonUtil.String2JsonArray(jobProfileEntity.platform).size() > 0) {
      var platform = ""
      //  var platform_ids = ""
      GsonUtil.String2JsonArray(jobProfileEntity.platform).foreach(j => {
        platform += "'" + j.getAsString.toUpperCase + "',"
        //  platform_ids += "'" + (if (j.getAsString.toUpperCase.equals("IOS")) "0" else "1") + "',"
      })
      platform = "(" + platform.substring(0, platform.length - 1) + ")"
      //  platform_ids = "(" + platform_ids.substring(0, platform_ids.length - 1) + ")"
      user_sql = user_sql + s" AND UPPER(platform) IN $platform"

      //  active_sql = active_sql + s" AND UPPER(platform) IN $platform_ids"
    }

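    //  Map the requested package names to internal ids via a broadcast of
    //  dwh.package_mapping; names without a mapping fall back to "-1".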
    if (StringUtils.isNotBlank(jobProfileEntity.package_name) && GsonUtil.String2JsonArray(jobProfileEntity.package_name).size() > 0) {
      var package_sql =
        """
          |SHOW PARTITIONS dwh.package_mapping
        """.stripMargin
      partDF = spark.sql(package_sql)
      date = partDF.orderBy(partDF("partition").desc).first.getString(0).split("=")(1)
      package_sql =
        s"""
           |SELECT id, package_name FROM dwh.package_mapping WHERE dt = '${date}'
        """.stripMargin
      val bMap = spark.sparkContext.broadcast(spark.sql(package_sql).rdd.map(r => {
        (r.getAs("package_name").toString.toUpperCase, r.getAs("id").toString)
      }).collectAsMap())

      spark.udf.register("filterByPackage", filterByPackage _)
      var package_name = ""
      GsonUtil.String2JsonArray(jobProfileEntity.package_name).foreach(j => {
        package_name += "" + bMap.value.getOrElse(j.getAsString.toUpperCase, "-1") + ","
      })
      package_name = package_name.substring(0, package_name.length - 1)
      user_sql = user_sql + s" AND filterByPackage(CONCAT_WS(',',install),'${package_name}')"
    }
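    //  A @check_deviceId placeholder is appended and swapped for the registered
    //  UDF call just before execution (the logged SQL shows the placeholder).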
    spark.udf.register("check_deviceId", check_deviceId _)

    user_sql = user_sql + " AND @check_deviceId"

    println("user_sql ==>> " + user_sql)
    val user_df = spark.sql(user_sql.replace("@check_deviceId", "check_deviceId(device_id)"))
    //  .rdd.map(r => {DeviceFrequencyEntity(r.getAs("device_id"), r.getAs("frequency"))})
    //  .persist(StorageLevel.MEMORY_AND_DISK_SER)

    active_sql = active_sql + " AND @check_deviceId"
    println("active_sql ==>> " + active_sql)
    val active_df = spark.sql(active_sql.replace("@check_deviceId", "check_deviceId(device_id)"))

    //  .rdd.map(r => {DeviceActiveEntity(r.getAs("device_id"), r.getAs("tags"), r.getAs("part"))})

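    //  With dimensions: join the user and active data (active filtered to the
    //  active_dimension of the first dimension entry) and evaluate each
    //  dimension; without dimensions: just dedupe the matched device ids.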
    val df = if (StringUtils.isNotBlank(jobProfileEntity.dimension) && GsonUtil.String2JsonArray(jobProfileEntity.dimension).size() > 0) {
      val code_sql: String =
        """
          |SELECT new_second_id, tag_id FROM
          | dwh.dm_old2new_tag
        """.stripMargin
      val bMap = spark.sparkContext.broadcast(spark.sql(code_sql).rdd.map(r => {
        (r.getAs("new_second_id").toString, r.getAs("tag_id").toString)
      }).collectAsMap())
      val dimensionEntity = parseDimension(GsonUtil.String2JsonArray(jobProfileEntity.dimension).get(0).getAsJsonArray.get(0).getAsJsonObject)
      val part = dimensionEntity.active_dimension.toLowerCase

      val tmp_df = active_df.toDF.filter(s"part = '${part}'").select("device_id", "tags")
      tmp_df.createOrReplaceTempView("active")
      user_df.toDF.createOrReplaceTempView("user")

      println("user_df.count ==>> " + user_df.count())
      println("tmp_df.count ==>> " + tmp_df.count())
      val join_sql =
        """
          |SELECT a.device_id device_id, a.frequency frequency, b.tags tags
          | FROM user a JOIN active b
          | ON a.device_id = b.device_id
        """.stripMargin

      val join_df = spark.sql(join_sql)
        .rdd.map(r => {
        DeviceCntCount(r.getAs("device_id"), r.getAs("frequency"), r.getAs("tags"))
      }).persist(StorageLevel.MEMORY_ONLY_SER)
      println("join_df.count ==>> " + join_df.count())

      val rdd = dimensionForeach(join_df, GsonUtil.String2JsonArray(jobProfileEntity.dimension), bMap, spark).dropDuplicates
      rdd
    } else {
      val user_rdd = user_df.map(r => {
        r.getAs("device_id").toString
      }).toDF
      user_rdd.dropDuplicates
    }
    df.cache
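    //  Cap the requested limit at 10,000,000 devices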
    val limit: Int = if (jobProfileEntity.limit != null && jobProfileEntity.limit <= 10000000) {
      jobProfileEntity.limit
    } else {
      10000000
    }

    (df.limit(limit), df.count.toInt)
  }
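
  /*
   * Hypothetical usage sketch (the field values below are illustrative
   * assumptions, not real data):
   *
   *   val job = parseJsonStringJob(
   *     """{"last_req_day":7,"country":"[\"US\"]","platform":"[\"ios\"]","limit":100000}""")
   *   val (devices, total) = getDF(spark, job)
   *   devices.show(10)
   */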

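  /** A device id is valid when it matches didPtn and is not the all-zero id. */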
  def check_deviceId(device_id: String): Boolean = {
    device_id.matches(didPtn) && !device_id.equals(allZero)
  }

  //  UDF: true if any id in `packages` (comma-separated installed-package ids)
  //  also appears in `pkgs` (comma-separated requested ids)
  def filterByPackage(packages: String, pkgs: String): Boolean = {
    val set = pkgs.split(",").toSet
    var flag = false
    val itr = packages.split(",").iterator
    //  val itr = packages.iterator
    while (itr.hasNext && !flag) {
      val rs = itr.next
      if (set.contains(rs)) {
        flag = true
      }
    }
    flag
  }

  //  Dimension evaluation, optimized for the case where all dimensions share
  //  one active_dimension (operates on pre-joined install + active data)
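  //  Outer JsonArray entries are intersected (AND); entries inside each inner
  //  array are unioned (OR), i.e. the result is an AND of ORs.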
  def dimensionForeach(df: RDD[DeviceCntCount], jsonArray: JsonArray, bMap: Broadcast[scala.collection.Map[String, String]], spark: SparkSession): DataFrame = {
    import spark.implicits._
    var outer_df = spark.emptyDataFrame
    jsonArray.foreach(json => {
      var inter_df = spark.emptyDataFrame
      json.getAsJsonArray.foreach(j => {
        val dimensionEntity = parseDimension(j.getAsJsonObject)
        //  val interest = bMap.value.getOrElse(dimensionEntity.interest, "0")
        val install_cnt = dimensionEntity.install_cnt
        val active_count = dimensionEntity.active_count
        inter_df = if (inter_df.rdd.isEmpty()) {
          parseCnt_Count(df, dimensionEntity.interest, bMap, install_cnt, active_count).toDF
        } else {
          inter_df.union(parseCnt_Count(df, dimensionEntity.interest, bMap, install_cnt, active_count).toDF)
        }
      })
      outer_df = if (outer_df.rdd.isEmpty()) {
        inter_df
      } else {
        outer_df.intersect(inter_df)
      }
    })
    outer_df
  }

  //  Dimension computation: per-dimension intersect of the install-count and
  //  active-count filters, combined as an AND of ORs as above
  def dimensionForeach(user_df: RDD[DeviceFrequencyEntity], active_df: RDD[DeviceActiveEntity], jsonArray: JsonArray, bMap: Broadcast[scala.collection.Map[String, String]], spark: SparkSession): DataFrame = {
    import spark.implicits._
    var outer_df = spark.emptyDataFrame
    jsonArray.foreach(json => {
      var inter_df = spark.emptyDataFrame
      json.getAsJsonArray.foreach(j => {
        val dimensionEntity = parseDimension(j.getAsJsonObject)
        val interest = bMap.value.getOrElse(dimensionEntity.interest, "0")
        val install_cnt = dimensionEntity.install_cnt
        val active_count = dimensionEntity.active_count
        val active_dimension = dimensionEntity.active_dimension
        inter_df = if (inter_df.rdd.isEmpty()) {
          parseInterestCnt(user_df, interest, install_cnt).toDF.intersect(parseInterestActive(active_df, dimensionEntity.interest, active_count, active_dimension).toDF)
        } else {
          inter_df.union(parseInterestCnt(user_df, interest, install_cnt).toDF.intersect(parseInterestActive(active_df, dimensionEntity.interest, active_count, active_dimension).toDF))
        }
      })
      outer_df = if (outer_df.rdd.isEmpty()) {
        inter_df
      } else {
        outer_df.intersect(inter_df)
      }
    })
    outer_df
  }

  //  Combined filter: install_cnt (install frequency) and active_count (active days)
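  //  Range strings are "min-max" (closed interval, e.g. "3-10") or a bare "min"
  //  (e.g. "5", open upper bound, parsed as (5, -1)).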
  def parseCnt_Count(df: RDD[DeviceCntCount], interest: String, bMap: Broadcast[scala.collection.Map[String, String]], install_cnt: String, active_count: String): RDD[String] = {
    val cnter =
      if (install_cnt.split("-").size == 2) {
        (Integer.parseInt(install_cnt.split("-")(0)), Integer.parseInt(install_cnt.split("-")(1)))
      } else {
        (Integer.parseInt(install_cnt.split("-")(0)), -1)
      }

    val active_cnter =
      if (active_count.split("-").size == 2) {
        (Integer.parseInt(active_count.split("-")(0)), Integer.parseInt(active_count.split("-")(1)))
      } else {
        (Integer.parseInt(active_count.split("-")(0)), -1)
      }
    //  val query_tag = bMap.value.getOrElse(interest, "0")
    val result = df.filter(d => {
      var freq_flag = false
      if (d.frequency != null) {
        for (i <- d.frequency.indices if !freq_flag) {
          val tag = d.frequency.get(i).asInstanceOf[GenericRowWithSchema].getAs("tag").toString
          val cnt = Integer.parseInt(d.frequency.get(i).asInstanceOf[GenericRowWithSchema].getAs("cnt").toString)
          freq_flag = tag.equals(interest) && (if (cnter._2 == -1) cnt >= cnter._1 else cnt >= cnter._1 && cnt <= cnter._2)
        }
      }
      var active_flag = false
      val tags = GsonUtil.String2JsonArray(d.tags)
      for (tag <- tags if !active_flag) {
        val active = GsonUtil.fromJson(tag.getAsJsonObject, classOf[ActiveTagEntity])
        val tag_id = active.tag_id
        val cnt = active.cnt
        active_flag = tag_id.equals(interest) && (if (active_cnter._2 == -1) cnt >= active_cnter._1 else cnt >= active_cnter._1 && cnt <= active_cnter._2)
      }
      freq_flag && active_flag
    }).map(r => {
      r.deviceId
    })
    result
  }

  //  install_cnt: install-frequency filter
  def parseInterestCnt(df: RDD[DeviceFrequencyEntity], interest: String, install_cnt: String): RDD[String] = {
    val cnter =
      if (install_cnt.split("-").size == 2) {
        (Integer.parseInt(install_cnt.split("-")(0)), Integer.parseInt(install_cnt.split("-")(1)))
      } else {
        (Integer.parseInt(install_cnt.split("-")(0)), -1)
      }
    val result = df.filter(d => {
      var flag = false
      if (d.frequency != null) {
        for (i <- d.frequency.indices if !flag) {
          val tag = d.frequency.get(i).asInstanceOf[GenericRowWithSchema].getAs("tag").toString
          val cnt = Integer.parseInt(d.frequency.get(i).asInstanceOf[GenericRowWithSchema].getAs("cnt").toString)
          flag = tag.equals(interest) && (if (cnter._2 == -1) cnt >= cnter._1 else cnt >= cnter._1 && cnt <= cnter._2)
        }
      }
      flag
    }).map(r => {
      r.device_id
    })
    result
  }

  //  active_count: active-days filter
  def parseInterestActive(df: RDD[DeviceActiveEntity], interest: String, active_count: String, active_dimension: String): RDD[String] = {
    val cnter =
      if (active_count.split("-").size == 2) {
        (Integer.parseInt(active_count.split("-")(0)), Integer.parseInt(active_count.split("-")(1)))
      } else {
        (Integer.parseInt(active_count.split("-")(0)), -1)
      }
    val result = df.filter(d => {
      d.part.equals(active_dimension)
    }).filter(d => {
      var flag = false
      val tags = GsonUtil.String2JsonArray(d.tags)
      for (tag <- tags if !flag) {
        val active = GsonUtil.fromJson(tag.getAsJsonObject, classOf[ActiveTagEntity])
        val tag_id = active.tag_id
        val cnt = active.cnt
        flag = tag_id.equals(interest) && (if (cnter._2 == -1) cnt >= cnter._1 else cnt >= cnter._1 && cnt <= cnter._2)
      }
      flag
    }).map(r => {
      r.device_id
    })
    result
  }

}