package mobvista.dmp.datasource.id_mapping

import mobvista.dmp.common.MobvistaConstant.{sdf1, sdf2}
import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
import mobvista.dmp.datasource.id_mapping.Constant._
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{SaveMode, SparkSession}

import java.text.SimpleDateFormat
import scala.collection.JavaConverters._

 * @package: mobvista.dmp.datasource.id_mapping
 * @author: wangjf
 * @date: 2021/12/7
 * @time: 2:39 下午
 * @email:
class IDMappingGraphxResult extends CommonSparkJob with Serializable {
  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("country", true, "country")
    options.addOption("platform", true, "platform")
    options.addOption("date", true, "date")
    options.addOption("output", true, "output")
    options.addOption("coalesce", true, "coalesce")

  override protected def run(args: Array[String]): Int = {
    val parser = new BasicParser()
    val options = commandOptions()
    val commandLine = parser.parse(options, args)
    val country = commandLine.getOptionValue("country")
    val platform = commandLine.getOptionValue("platform")
    val date = commandLine.getOptionValue("date")
    val output = commandLine.getOptionValue("output")
    val coalesce = Integer.parseInt(commandLine.getOptionValue("coalesce"))

    val spark = MobvistaConstant.createSparkSession(s"IDMappingGraphxResult.$date.$country.$platform")

    try {
      oldAndTodayIdMapping(country.toUpperCase, platform, date, spark, output, coalesce)
    } finally {
      if (spark != null) {

  def oldAndTodayIdMapping(country: String, platform: String, date: String, spark: SparkSession, outPutPath: String,
                           coalesce: Int) = {
    val dailySQL =
         |SELECT * FROM ads.ads_device_id_mapping WHERE dt = '$date' AND source = '${country.toLowerCase}' AND platform = '$platform' AND `type` = 'mid'
    var idSet: Array[String] = null
    var scoreMap: Map[String, Double] = null
    platform match {
      case "ios" =>
        idSet = iosIDSet
        scoreMap = iosIDScoreMap
      case "android" => {
        scoreMap = androidIDScoreMap
        country match {
          case "CN" =>
            idSet = androidCNIDSet
          case _ =>
            idSet = androidIDSet
      case _ =>

    val schedule_date = sdf1.format(sdf2.parse(date))

    val resultOneID = spark.sql(dailySQL).rdd.mapPartitions(rs => { => {
        val device_id = r.getAs[String]("device_id")
        val device_type = r.getAs[String]("device_type")
        val one_id = MobvistaConstant.String2JSONObject(r.getAs[String]("one_id"))
        val update_date = r.getAs[String]("update_date")
        val keys = one_id.keySet().asScala
        var oneIDScore: OneIDScore = OneIDScore("", "", 0, "")
        keys.foreach(key => {
          val sdf = new SimpleDateFormat("yyyy-MM-dd")
          val json = one_id.getJSONObject(key)
          val id_type = json.getString("one_type")
          val id_type_score = scoreMap(id_type)
          val active_date = json.getString("one_date")
          val cnt = json.getLongValue("one_cnt")
          val days = (sdf.parse(schedule_date).getTime - sdf.parse(active_date).getTime) / 1000 / 3600 / 24 + 1
          val score = id_type_score * 30 / days + 0.1 * cnt
          if (idSet.indexOf(id_type) < idSet.indexOf(oneIDScore.one_type) || idSet.indexOf(oneIDScore.one_type) == -1
            || (idSet.indexOf(id_type) == idSet.indexOf(oneIDScore.one_type) && score >= oneIDScore.one_score)) {
            oneIDScore = OneIDScore(key, id_type, score, active_date)
        val json = new JSONObject()
        json.put("one_id", oneIDScore.one_id)
        json.put("type", oneIDScore.one_type)
        json.put("score", oneIDScore.one_score)
        json.put("version", oneIDScore.one_version)
        Result(device_id, device_type, json.toJSONString, update_date)

    FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(outPutPath), true)

    import spark.implicits._
      .option("orc.compress", "zlib")

         |ALTER TABLE ads.ads_device_id_mapping ADD IF NOT EXISTS PARTITION (dt='$date',source='${country.toLowerCase}',platform='$platform',`type`='result')
         | LOCATION '$outPutPath'

object IDMappingGraphxResult {
  def main(args: Array[String]): Unit = {
    new IDMappingGraphxResult().run(args)