package mobvista.dmp.datasource.dm

import com.alibaba.fastjson.{JSONArray, JSONObject}
import mobvista.dmp.common.CommonSparkJob
import mobvista.dmp.datasource.rtdmp.RtdmpNormal.doGetJson
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.functions.{concat, concat_ws, lit}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.storage.StorageLevel

import java.net.URI
import java.util
import scala.collection.mutable.ArrayBuffer

/**
 * @author jiangfan
 * @date 2021/3/29 17:25
 */

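/**
 * Queries the RTDMP portal audience API (is_install_app=1), loads each audience's
 * device-id list from the S3 path returned by the API, and writes three outputs:
 * a package-tagged device list (output01), a country-tagged device list (output02),
 * and the overall row count (rtdmp_normal_count_result).
 */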
class RtdmpNormal extends CommonSparkJob with Serializable {
  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("coalesce", true, "[must] coalesce")
    options.addOption("output01", true, "[must] output01")
    options.addOption("output02", true, "[must] output02")
    options.addOption("dt_today", true, "[must] dt_today")
    options.addOption("rtdmp_normal_count_result", true, "[must] rtdmp_normal_count_result")
    options
  }
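
  // Illustrative spark-submit invocation; paths and values are placeholders, and the
  // option prefix style depends on the CLI parser configured in CommonSparkJob:
  //   spark-submit --class mobvista.dmp.datasource.dm.RtdmpNormal <job-jar> \
  //     -coalesce 40 \
  //     -output01 s3://<bucket>/rtdmp_normal/output01 \
  //     -output02 s3://<bucket>/rtdmp_normal/output02 \
  //     -dt_today 2021-03-29 \
  //     -rtdmp_normal_count_result s3://<bucket>/rtdmp_normal/count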

  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      return -1
    } else printOptions(commandLine)

    val coalesce = commandLine.getOptionValue("coalesce")
    val output01 = commandLine.getOptionValue("output01")
    val output02 = commandLine.getOptionValue("output02")
    val dt_today = commandLine.getOptionValue("dt_today") // required option, but not referenced below
    val rtdmp_normal_count_result = commandLine.getOptionValue("rtdmp_normal_count_result")

    val spark = SparkSession.builder()
      .appName("RtdmpNormal")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

    val sc = spark.sparkContext
    import spark.implicits._

    // Remove any previous output so the job can be re-run idempotently; the FileSystem is
    // resolved from each path rather than a hard-coded bucket.
    Seq(output01, output02, rtdmp_normal_count_result).foreach { path =>
      FileSystem.get(new URI(path), sc.hadoopConfiguration).delete(new Path(path), true)
    }

    try {
      //      curl -H 'key: installapp' -H 'token: 52c1f9c84a4dbe6d6e08e668f134f23f'  'http://portal-rtdmp.mintegral.com/rtdmp/audience/query?page=1&is_install_app=1'
      // RTDMP match_device_type code -> device-id type name
      val deviceTypeMap: util.Map[String, String] = new util.HashMap[String, String]
      deviceTypeMap.put("1", "imei")
      deviceTypeMap.put("2", "idfa")
      deviceTypeMap.put("3", "gaid")
      deviceTypeMap.put("4", "oaid")
      deviceTypeMap.put("5", "android_id")
      deviceTypeMap.put("6", "imeimd5")
      deviceTypeMap.put("7", "idfamd5")
      deviceTypeMap.put("8", "gaidmd5")
      deviceTypeMap.put("9", "oaidmd5")
      deviceTypeMap.put("10", "android_idmd5")
      // RTDMP platform code -> platform name
      val platformMap: util.Map[String, String] = new util.HashMap[String, String]
      platformMap.put("1", "android")
      platformMap.put("2", "ios")
      val url = "http://portal-rtdmp.mintegral.com/rtdmp/audience/query?page=1&is_install_app=1"
      val jsonObject: JSONObject = doGetJson(url)
      println(jsonObject)
      val data: JSONArray = jsonObject.getJSONArray("data")

      val result = new ArrayBuffer[(String, String, String, String, String)]()

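      // Each entry in "data" is expected to carry: s3_path (device-id list location on S3),
      // match_device_type (numeric code), platform (numeric code), package_name and country_code.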
      for (i <- 0 until data.size) {
        val row: JSONObject = data.getJSONObject(i)
        println(row)
        val s3_path: String = row.getString("s3_path")
        val match_device_type: String = row.getString("match_device_type")
        val device_type: String = deviceTypeMap.get(match_device_type)
        val platform: String = platformMap.get(row.getString("platform"))
        val package_name = row.getString("package_name")
        val country_code = row.getString("country_code")
        println(s3_path)
        println(device_type)
        println(platform)
        // Keep only audiences whose required fields are present and non-empty (this also guards
        // against null fields and unmapped type/platform codes); oaid-based device types are excluded.
        val allPresent = Seq(s3_path, device_type, platform, package_name).forall(v => v != null && v.nonEmpty)
        if (allPresent && !device_type.contains("oaid"))
          result += ((s3_path, device_type, platform, package_name, country_code))
      }

      println(result)
      val array: Array[(String, String, String, String, String)] = result.toArray

      // Union the device-id lists of all selected audiences into a single RDD.
      var inputDataRdd: RDD[DmpDailyDataInformation] = spark.sparkContext.emptyRDD[DmpDailyDataInformation]

      for (index <- array.indices) {
        println(index)
        val (inputPath, device_type, platform, package_name, country_code) = array(index)
        println(inputPath)
        // Derive both the md5 and the plain variant of the device type; each device id is
        // tagged below according to whether it looks like an md5 hash (32 characters).
        val (device_type_md5, device_type_not_md5) =
          if (device_type.contains("md5")) (device_type, device_type.replace("md5", ""))
          else (device_type + "md5", device_type)
        val pathUri = new URI(inputPath)
        // Skip non-existent S3 paths such as
        // s3://mob-emr-test/dataplatform/rtdmp_request/2021/07/10/dsp_req/com.taobao.idlefish_bes/*/
        if (FileSystem.get(new URI(s"${pathUri.getScheme}://${pathUri.getHost}"), sc.hadoopConfiguration)
          .exists(new Path(pathUri.toString.replace("*", "")))) {
          inputDataRdd = inputDataRdd.union(spark.sparkContext.textFile(inputPath).map(row => {
            if (row.length == 32) {
              DmpDailyDataInformation(row, device_type_md5, platform, package_name, country_code)
            } else {
              DmpDailyDataInformation(row, device_type_not_md5, platform, package_name, country_code)
            }
          }))
        } else {
          // Unioning an empty RDD would be a no-op, so a missing path only needs to be logged.
          println(inputPath + " does not exist, skipping")
        }
      }

      val df: DataFrame = inputDataRdd.toDF().persist(StorageLevel.MEMORY_AND_DISK_SER)
      df.createOrReplaceTempView("tmp_normal_rtdmp_data")

      // Write the overall row count so downstream steps can sanity-check the run.
      val sql1 = "select count(*) from tmp_normal_rtdmp_data"
      spark.sql(sql1).rdd.map(_.mkString("==")).saveAsTextFile(rtdmp_normal_count_result)

      // output01: device_id \t device_type \t platform \t ["package_name"]
      df.select(concat_ws("\t", df.col("device_id"), df.col("device_type"), df.col("platform"),
          concat(lit("[\""), df.col("package_name"), lit("\"]"))))
        .coalesce(coalesce.toInt)
        .write.format("text").mode("overwrite").save(output01)

      // output02: device_id \t device_type \t platform \t country
      df.select(concat_ws("\t", df.col("device_id"), df.col("device_type"), df.col("platform"), df.col("country")))
        .coalesce(coalesce.toInt)
        .write.format("text").mode("overwrite").save(output02)
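
      // Illustrative output lines (all values below are made-up placeholders):
      //   output01: 0123456789abcdef0123456789abcdef \t gaidmd5 \t android \t ["com.example.app"]
      //   output02: 0123456789abcdef0123456789abcdef \t gaidmd5 \t android \t US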

    } finally {
      spark.stop()
    }
    0
  }
}

object RtdmpNormal {
  def main(args: Array[String]): Unit = {
    new RtdmpNormal().run(args)
  }
}


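// One device record produced by the job: the raw device id plus the audience metadata
// (device-id type, platform, package name and country) attached to it.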
case class DmpDailyDataInformation(device_id: String, device_type: String, platform: String, package_name: String, country: String)