package mobvista.dmp.datasource.dm

import com.alibaba.fastjson.JSON
import mobvista.dmp.common.CommonSparkJob
import mobvista.dmp.util.DateUtil
import org.apache.commons.cli.Options
import org.apache.commons.lang3.StringUtils
import org.apache.spark.sql.functions._

/**
  * @package: mobvista.dmp.datasource.dm
  * @author: wangjf
  * @date: 2020/5/25
  * @time: 6:30 下午
  * @email: jinfeng.wang@mobvista.com
  * @phone: 152-1062-7698
  */
class ValidateDmpInstallList extends CommonSparkJob with Serializable {
  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      printOptions(commandLine)
      return 1
    } else {
      printOptions(commandLine)
    }

    val date = commandLine.getOptionValue("date")

    val spark = mobvista.dmp.common.MobvistaConstant.createSparkSession("ValidateDmpInstallList")

    spark.udf.register("get_pkg_size", get_pkg_size _)
    spark.udf.register("filter", filter _)
    val update_date = DateUtil.format(DateUtil.getDay(date, "yyyyMMdd", 0), "yyyy-MM-dd")

    val sql = common_sql.replace("@dt", date).replace("@update_date", update_date).replace("@last_day", update_date)
    spark.sql(sql.replace("@system", "m")).select(
      count(lit(1)).alias("m_uv"),
      sum(col("pkg_size")).alias("m_pkg_size")
    ).foreach(r => {
      println("m_uv -->> " + r.getAs[Int]("m_uv") + ",m_pkg_size -->> " + r.getAs[Int]("m_pkg_size"))
    })

    spark.sql(sql.replace("@system", "dsp")).select(
      count(lit(1)).alias("dsp_uv"),
      sum(col("pkg_size")).alias("dsp_pkg_size")
    ).foreach(r => {
      println("dsp_uv -->> " + r.getAs[Int]("dsp_uv") + ",dsp_pkg_size -->> " + r.getAs[Int]("dsp_pkg_size"))
    })

    0
  }

  def get_pkg_size(install_list: String) = {
    import scala.collection.JavaConverters._
    val installMap = JSON.parse(install_list).asInstanceOf[java.util.Map[String, String]].asScala
    val pkg_size = installMap.retain((k, _) => !k.equals("0000000000") && !k.equals("com.nonepkg.com.nonepkg")).toMap.asJava.size()
    pkg_size
  }

  def filter(ext_data: String, system: String, last_day: String) = {
    var flag = false
    val ext_json = mobvista.dmp.common.MobvistaConstant.String2JSONObject(ext_data)
    if (system.equals("m") && ext_json.containsKey("m")) {
      val m_json = ext_json.getJSONObject("m")
      val last_date = m_json.getString("last_date")
      var isMstrategy = false
      if (m_json.containsKey("strategy")) {
        val strategyList = m_json.getJSONArray("strategy").toArray
        for (strategy <- strategyList if strategy.toString.contains("MNormalAlphaModelRanker")) {
          isMstrategy = true
        }
      }
      val dev_tag = m_json.getString("dev_tag")
      flag = last_date.equals(last_day) && StringUtils.isNotBlank(dev_tag) && dev_tag.equals("1") && isMstrategy
    }
    if (system.equals("dsp") && ext_json.containsKey("dsp")) {
      val dsp_json = ext_json.getJSONObject("dsp")
      val last_date = dsp_json.getString("last_date")
      flag = last_date.equals(last_day)
    }
    flag
  }

  val common_sql =
    """
      |SELECT device_id,get_pkg_size(install_list) pkg_size FROM dwh.dmp_install_list
      |  WHERE dt = '@dt' AND update_date = '@update_date' AND business = '14days' AND filter(ext_data,'@system','@last_day')
      |""".stripMargin

  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("date", true, "[must] date")
    options
  }
}

object ValidateDmpInstallList {

  def main(args: Array[String]): Unit = {
    new ValidateDmpInstallList().run(args)
  }
}