package mobvista.dmp.datasource.id_mapping

import com.alibaba.fastjson.JSONObject
import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
import mobvista.dmp.datasource.id_mapping.Constant._
import mobvista.dmp.util.MD5Util
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Row, SparkSession}

import java.net.URI
import scala.collection.mutable.ArrayBuffer

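/**
 * Spark job that builds the daily ID-mapping ("OneID") output for one platform and country
 * selection and writes it as gzip-compressed text.
 *
 * @package: mobvista.dmp.datasource.id_mapping
 * @author: wangjf
 * @date: 2021/12/7
 * @time: 2:39 PM
 * @email: jinfeng.wang@mobvista.com
 */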
class IDMappingGraphx extends CommonSparkJob with Serializable {
  /**
   * Declares the command-line options understood by this job.
   *
   * Every option takes a value and uses its own name as the description.
   *
   * @return the option set: country, platform, date, output, coalesce
   */
  def commandOptions(): Options = {
    val optionNames = Seq("country", "platform", "date", "output", "coalesce")
    optionNames.foldLeft(new Options()) { (acc, optionName) =>
      acc.addOption(optionName, true, optionName)
    }
  }

  /**
   * Job entry point: parses the command-line arguments, creates the Spark session and runs the
   * ID-mapping build, stopping the session afterwards whether or not the build succeeded.
   *
   * @param args command-line arguments carrying country, platform, date, output and coalesce
   * @return 0 once the build has finished without throwing
   */
  override protected def run(args: Array[String]): Int = {
    val cli = new BasicParser().parse(commandOptions(), args)

    def argOf(name: String): String = cli.getOptionValue(name)

    val country = argOf("country")
    val platform = argOf("platform")
    val date = argOf("date")
    val output = argOf("output")
    val partitions = Integer.parseInt(argOf("coalesce"))

    val session = MobvistaConstant.createSparkSession(s"IDMappingGraphx.$date.$country.$platform")

    try {
      //  The same location is passed for both the output path and the edge output path.
      oldAndTodayIdMapping(country, platform, date, session, output, output, partitions)
    } finally {
      Option(session).foreach(_.stop())
    }
    0
  }


  /**
   * Builds the ID mapping for one day and writes it to `outPutPath` as gzip-compressed text.
   *
   * The platform-specific daily SQL is executed, each row is normalised by `processData`, expanded
   * into keyed records by `processVertex`, grouped per key, and finally turned into
   * `(id, oneIdJson, idType)` tuples that are repartitioned and saved.
   *
   * @param country        country selector; for Android, "CN" filters `country = 'CN'` and any other
   *                       value filters `country != '<value>'`
   * @param platform       "ios" or "android"
   * @param date           value substituted for `@date` in the daily SQL and handed to `processVertex`
   * @param spark          session used to run the SQL and to obtain the Hadoop configuration
   * @param outPutPath     output directory; it is deleted recursively before the result is written
   * @param edgeoutPutPath not used by this method; kept so existing callers keep compiling
   * @param coalesce       number of partitions of the written output
   * @throws IllegalArgumentException if `platform` is neither "ios" nor "android"
   */
  def oldAndTodayIdMapping(country: String, platform: String, date: String, spark: SparkSession, outPutPath: String,
                           edgeoutPutPath: String, coalesce: Int): Unit = {

    //  Not referenced explicitly below; retained in case a callee resolves it implicitly.
    implicit val formats = org.json4s.DefaultFormats

    //  1. Load today's data: choose the SQL, the vertex schema and the ID type lists for the platform.
    //  An unknown platform fails here instead of running an empty SQL statement with a null schema.
    val (dailySQL, vertSchema, idSet, idMainSet): (String, StructType, Array[String], Set[String]) =
      platform match {
        case "ios" =>
          (Constant.ios_id_mapping_sql.replace("@date", date), iosVertSchema, iosIDSet, iosMainIDSet)
        case "android" =>
          val datedSQL = Constant.android_id_mapping_sql.replace("@date", date)
          country match {
            case "CN" =>
              (datedSQL.replace("@filter_country", s"AND country = '${country}'"),
                adrVertSchema, androidCNIDSet, androidMainIDSet)
            case _ =>
              //  NOTE(review): this excludes only the country that was passed in; confirm that
              //  callers pass a value for which "everything except it" is the intended selection.
              (datedSQL.replace("@filter_country", s"AND country != '${country}'"),
                adrVertSchema, androidIDSet, androidMainIDSet)
          }
        case other =>
          throw new IllegalArgumentException(s"Unsupported platform '$other', expected 'ios' or 'android'")
      }

    val todayDF = spark.createDataFrame(spark.sql(dailySQL).rdd.map(row => processData(row, platform)),
      schema = vertSchema)

    val vertex = todayDF.rdd.flatMap(row => processVertex(date, row, idSet, idMainSet))

    //  Collect, per key, the distinct (id, payloadJson, idType) records attached to it.
    val maxGraph = vertex.combineByKey(
      (v: (String, String, String)) => Set(v),
      (c: Set[(String, String, String)], v: (String, String, String)) => c + v,
      (c1: Set[(String, String, String)], c2: Set[(String, String, String)]) => c1 ++ c2
    )

    //  Every grouped set holds at least one record, so a single pass covers both cases and the
    //  grouped data does not have to be scanned twice.
    val oneIDRDD = maxGraph.flatMap { case (srcID, members) =>
      if (members.size > 1) {
        //  Several records share the key: generate the OneID for the non-main IDs.
        updateOneID((srcID, members), idMainSet)
      } else {
        //  Exactly one record: the OneID is that record's own payload.
        val (id, payload, idType) = members.head
        val oneID = new JSONObject()
        oneID.put(id, MobvistaConstant.String2JSONObject(payload))
        ArrayBuffer((srcID, oneID.toJSONString, idType))
      }
    }

    //  Resolve the file system from the output path itself so the delete always targets the same
    //  location that saveAsTextFile writes to, whatever bucket or scheme the path uses.
    val outputDir = new Path(outPutPath)
    outputDir.getFileSystem(spark.sparkContext.hadoopConfiguration).delete(outputDir, true)

    oneIDRDD
      .repartition(coalesce)
      .saveAsTextFile(outPutPath, classOf[GzipCodec])
  }


  /**
   * Converts one raw row of the daily SQL result into a vertex row for the given platform.
   *
   * For "ios", `idfa` and `idfv` are blanked unless they match `didPtn` and do not match `allZero`.
   * For both platforms, derived IDs are MD5 hashes of concatenated columns and are "" when their
   * guarding column is blank.
   *
   * @param row      input row; columns are read by name
   * @param platform "ios" or "android"; any other value yields a single-column row holding ""
   * @return the vertex row; its column order is the one the caller pairs with the vertex schema
   */
  def processData(row: Row, platform: String): Row = {
    //  Reads a string column by name.
    def str(field: String): String = row.getAs[String](field)

    //  MD5 of `payload` when `guard` is non-blank, otherwise "". `payload` is passed by name so the
    //  concatenation is only performed when it is actually hashed.
    def md5When(guard: String)(payload: => String) =
      if (StringUtils.isNotBlank(guard)) MD5Util.getMD5Str(payload) else ""

    platform match {
      case "ios" =>
        //  Keeps a device ID only when it has the expected format and is not the all-zero value.
        def cleanDeviceId(raw: String): String =
          if (StringUtils.isNotBlank(raw) && raw.matches(didPtn) && !raw.matches(allZero)) raw else ""

        val idfa = cleanDeviceId(str("idfa"))
        val idfv = cleanDeviceId(str("idfv"))
        val pkgName = str("pkg_name")
        val sysid = str("sysid")
        val bkupid = str("bkupid")
        val xwho = str("xwho")
        val userId = str("user_id")
        val ip = str("ip")
        val ua = str("ua")
        val brand = str("brand")
        val model = str("model")
        val osVersion = str("os_version")
        val osvUpt = str("osv_upt")
        val upt = str("upt")
        val cnt = row.getAs[Long]("cnt")

        val idfvBundle = md5When(idfv)(idfv + pkgName)
        val bmosvOsvUpt = md5When(osvUpt)(brand + model + osVersion + osvUpt)
        val bmosvUpt = md5When(upt)(brand + model + osVersion + upt)
        val bmosvIpuaBundle = md5When(ip)(brand + model + osVersion + ip + ua + pkgName)

        //  IosVert(idfa, sysid, idfv_bundle, bmosv_osv_upt, bmosv_upt, bmosv_ipua_bundle, xwho, user_id, bkupid, cnt)
        Row(idfa, sysid, idfvBundle, bmosvOsvUpt, bmosvUpt, bmosvIpuaBundle, xwho, userId, bkupid, cnt)

      case "android" =>
        val imei = str("imei")
        val androidId = str("android_id")
        val pkgName = str("pkg_name")
        val oaid = str("oaid")
        val gaid = str("gaid")
        val sysid = str("sysid")
        val bkupid = str("bkupid")
        val xwho = str("xwho")
        val userId = str("user_id")
        val country = str("country")
        val ip = str("ip")
        val ua = str("ua")
        val brand = str("brand")
        val model = str("model")
        val osVersion = str("os_version")
        val upt = str("upt")
        val cnt = row.getAs[Long]("cnt")

        val androidPkg = md5When(androidId)(androidId + pkgName)
        val bmosvUpt = md5When(upt)(brand + model + osVersion + upt)
        val bmosvIpuaPkg = md5When(ip)(brand + model + osVersion + ip + ua + pkgName)

        //  AdrVert(imei, gaid, oaid, sysid, android_pkg, bmosv_upt, bmosv_ipua_pkg, xwho, user_id, bkupid, cnt)
        //  NOTE(review): the first three columns are ordered differently for CN and non-CN rows and
        //  neither order matches the layout written above; confirm against the Android vertex schema.
        if ("CN".equalsIgnoreCase(country)) {
          Row(imei, oaid, gaid, sysid, androidPkg, bmosvUpt, bmosvIpuaPkg, xwho, userId, bkupid, cnt)
        } else {
          Row(gaid, imei, oaid, sysid, androidPkg, bmosvUpt, bmosvIpuaPkg, xwho, userId, bkupid, cnt)
        }

      case _ =>
        Row("")
    }
  }

  /**
   * Expands one vertex row into keyed records.
   *
   * The first ID type in `ids` (the last entry is never considered as a starting point) whose column
   * is non-blank becomes the anchor. One record keyed by the anchor value is emitted for the anchor
   * itself, followed by one record for every later ID type whose column is non-blank. When the anchor
   * type is in `mainIDSet` those later records are keyed by the later ID's value and carry the anchor
   * value; otherwise they are keyed by the anchor value and carry the later ID's value.
   *
   * @param date      stored in the payload under "active_type" (per the original author: the active
   *                  date, used for computing weight)
   * @param row       vertex row; ID columns are read by name, and "cnt" is read as a Long
   * @param ids       ID type names in the order they are examined
   * @param mainIDSet ID types treated as main IDs
   * @return records of the form (key, (id, payloadJson, idType)); empty when no anchor is found
   */
  def processVertex(date: String, row: Row, ids: Array[String], mainIDSet: Set[String]): ArrayBuffer[(String, (String, String, String))] = {
    val records = new ArrayBuffer[(String, (String, String, String))]()

    def fieldOf(idType: String): String = row.getAs[String](String.valueOf(idType))

    //  Event frequency
    val cnt = row.getAs[Long]("cnt")

    val anchorIdx = ids.dropRight(1).indexWhere(idType => StringUtils.isNotBlank(fieldOf(idType)))
    if (anchorIdx >= 0) {
      val anchorType = ids(anchorIdx)
      val anchorValue = fieldOf(anchorType)

      val attrs = new JSONObject()
      attrs.put("id_type", anchorType)
      attrs.put("active_type", date)
      attrs.put("cnt", cnt)
      //  The payload and the main-ID check are the same for every record produced from this row.
      val payload = attrs.toJSONString
      val anchorIsMain = mainIDSet.contains(anchorType)

      records += ((anchorValue, (anchorValue, payload, anchorType)))

      ids.drop(anchorIdx + 1).foreach { linkedType =>
        val linkedValue = fieldOf(linkedType)
        if (StringUtils.isNotBlank(linkedValue)) {
          val record =
            if (anchorIsMain) (linkedValue, (anchorValue, payload, linkedType))
            else (anchorValue, (linkedValue, payload, linkedType))
          records += record
        }
      }
    }
    records
  }

  /**
   * Builds the OneID JSON for a key that has several records attached and assigns it to the
   * records whose payload "id_type" is not a main ID type.
   *
   * The OneID object maps a record's id to its parsed payload for every record whose id equals the
   * key or whose payload "id_type" is in `mainIDSet`.
   *
   * @param kv        the key together with its (id, payloadJson, idType) records
   * @param mainIDSet ID types treated as main IDs
   * @return one (id, oneIdJson, idType) tuple per record whose payload "id_type" is not in `mainIDSet`
   */
  def updateOneID(kv: (String, Set[(String, String, String)]), mainIDSet: Set[String]): ArrayBuffer[(String, String, String)] = {
    val (groupKey, members) = kv

    //  Parse every payload once, keeping the set's iteration order.
    val parsed = members.toSeq.map(member => (member, MobvistaConstant.String2JSONObject(member._2)))

    val merged = new JSONObject()
    parsed.foreach { case (member, payload) =>
      if (groupKey.equals(member._1) || mainIDSet.contains(payload.getString("id_type"))) {
        merged.put(member._1, payload)
      }
    }
    val mergedJson = merged.toJSONString

    val result = new ArrayBuffer[(String, String, String)]()
    parsed.foreach { case (member, payload) =>
      if (!mainIDSet.contains(payload.getString("id_type"))) {
        result += ((member._1, mergedJson, member._3))
      }
    }
    result
  }
}

/** Command-line launcher for the [[IDMappingGraphx]] job. */
object IDMappingGraphx {
  /**
   * Creates the job and runs it with the given arguments; the job's return code is not used.
   *
   * @param args command-line arguments forwarded to the job
   */
  def main(args: Array[String]): Unit = {
    val job = new IDMappingGraphx()
    job.run(args)
  }
}