package mobvista.dmp.datasource.adn_sdk

import java.net.URI
import java.text.SimpleDateFormat
import java.util.{Date, Properties}
import com.alibaba.fastjson.{JSON, JSONArray, JSONObject}
import mobvista.dmp.common.MobvistaConstant
import mobvista.dmp.datasource.apptag.Constant
import mobvista.dmp.datasource.datatory.ConstantV2
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types._
import org.apache.spark.sql.{SparkSession, _}
import org.apache.spark.storage.StorageLevel

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.util.control.Breaks._

/**
 * Author: 刘凯, 2018-01-18 15:20
 * Ingests the raw adn_sdk logs into the etl_adn_sdk_daily table.
 */
object AdnSdkDaily extends Serializable {
  /** Regex for a canonical 8-4-4-4-12 hexadecimal identifier; used in `main` to validate `device_id`. */
  val didPtn: String = "^[0-9a-fA-F]{8}(-[0-9a-fA-F]{4}){3}-[0-9a-fA-F]{12}$"

  /** The all-zero identifier; `main` rejects a `device_id` equal to this value. */
  val allZero: String = "00000000-0000-0000-0000-000000000000"

  /** Regex for an IMEI: exactly 15 decimal digits. */
  val imeiPtn: String = "^([0-9]{15})$"

  /** Regex for an Android id: exactly 16 ASCII letters or digits. (The misspelled name is kept for callers.) */
  val andriodIdPtn: String = "^[a-zA-Z0-9]{16}$"


  /**
   * Entry point of the daily adn_sdk ETL job.
   *
   * Job parameters are read from the Spark conf: `spark.app.loadTime` (yyyyMMdd day used in the
   * Hive partition predicates), `spark.app.input_path`, `spark.app.output_path` and
   * `spark.app.coalesce`.
   *
   * Steps:
   *  1. Every parsable log line is expanded into up to three device rows (imei, androidid,
   *     device_id) with the layout
   *     `device_id, device_type, platform, data, model, country, sdk_version, brand, rid_n, hb`.
   *  2. `key=2000047` rows are joined with Hive tables to resolve campaign id and package name.
   *  3. `key=2000079` rows and `key=2000041` rows carrying `iex=1` are decoded by `parseCalData`.
   *  4. Both results are merged per device and written to `output_path` as zlib-compressed ORC.
   *
   * @param args command-line arguments; not used, all parameters come from the Spark conf
   */
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("AdnSdkDaily")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

    // Everything after session creation runs inside the try so the context is stopped even when
    // a required conf key is missing or the campaign lookup fails.
    try {
      val loadTime = spark.conf.get("spark.app.loadTime")
      val input_path = spark.conf.get("spark.app.input_path")
      val output_path = spark.conf.get("spark.app.output_path")
      val coalesce = Integer.parseInt(spark.conf.get("spark.app.coalesce"))

      val sc = spark.sparkContext
      // NOTE(review): the request signature is embedded in this URL; consider moving it to configuration.
      val url = "http://setting.rayjump.com/pkg?app_id=24282&sign=489942276c3cf759447d81719c104b95"
      val stringToString: java.util.Map[String, String] = AdnSdkTools.getCampaignidPackagenameMap(url)
      import scala.collection.JavaConverters._
      val campaignIdPackageNameMap: collection.Map[String, String] = mapAsScalaMapConverter(stringToString).asScala
      val campaignIdPackageNameMapBC: Broadcast[collection.Map[String, String]] = sc.broadcast(campaignIdPackageNameMap)

      val log_rdd = sc.textFile(input_path).coalesce(coalesce)
      val filter_rdd: RDD[Row] = log_rdd
        .filter(l => !MobvistaConstant.String2JSONObject(l).isEmpty)
        .flatMap { line =>
          // Rows are produced lazily per line (at most three) instead of buffering a whole partition.
          val etl_json = AdnSdkTools.getEtlJsonV2(line)
          val device_id = etl_json.getString("device_id")
          val device_type = etl_json.get("device_type")
          val platform = etl_json.get("platform")
          val data = etl_json.get("data")
          val model = etl_json.get("model")
          val brand = etl_json.get("brand")
          val country = etl_json.get("country")
          val sdk_version = etl_json.get("sdk_version")
          val imei = etl_json.getString("imei")
          val android_id = etl_json.getString("android_id")
          val rid_n = etl_json.getString("rid_n")
          val hb = etl_json.getString("hb")

          val rows = new ArrayBuffer[Row](3)
          if (StringUtils.isNotBlank(imei) && imei.matches(imeiPtn)) {
            rows += Row(imei, "imei", "android", data, model, country, sdk_version, brand, rid_n, hb)
          }
          if (StringUtils.isNotBlank(android_id) && android_id.matches(andriodIdPtn)) {
            rows += Row(android_id, "androidid", "android", data, model, country, sdk_version, brand, rid_n, hb)
          }
          if (StringUtils.isNotBlank(device_id) && device_id.matches(didPtn) && !allZero.equals(device_id)) {
            rows += Row(device_id, device_type, platform, data, model, country, sdk_version, brand, rid_n, hb)
          }
          rows
        }

      // filter_rdd feeds two independent branches below, so keep it cached.
      filter_rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)

      val input_schema = StructType(Array(
        StructField("device_id", StringType),
        StructField("device_type", StringType),
        StructField("platform", StringType),
        StructField("data", StringType),
        StructField("model", StringType),
        StructField("country", StringType),
        StructField("sdk_version", StringType),
        StructField("brand", StringType),
        StructField("rid_n", StringType),
        StructField("hb", StringType)
      ))

      // A record without a `data` field must be dropped, not fail the job with an NPE.
      val rdd_2000047 = filter_rdd.filter { row =>
        val data = row.getString(3)
        data != null && data.contains("key=2000047")
      }

      spark.createDataFrame(rdd_2000047, input_schema).filter("device_id!=''").createOrReplaceTempView("adn_sdk_input_2000047")
      import spark.implicits._
      val sql01 =
        s"""select requestid,campaignid from dwh.ods_adn_trackingnew_request
           | LATERAL VIEW explode(split(extra2,',')) test_tmp_table AS campaignid
           | where concat(yyyy,mm,dd)='${loadTime}' and requestid!='' and campaignid!=''
           | """.stripMargin
      val Requestid_Campaignid_df = spark.sql(sql01).select($"requestid".cast("string"), $"campaignid".cast("string"))
      Requestid_Campaignid_df.createOrReplaceTempView("Requestid_Campaignid")

      // History: campaign_list used to be read from MySQL over JDBC, but the last task of that read
      // kept running without finishing (probably data skew). The Hive table
      // dwh.ods_adn_campaign_list carries the same data, so it is used instead.
      // (Connection details intentionally omitted: credentials must not live in source.)
      val sql02 =
        s"""
           |select  id,package_name from dwh.ods_adn_campaign_list
              """.stripMargin
      val campaign_list_df = spark.sql(sql02).select($"id".cast("string"), $"package_name".cast("string"))
      campaign_list_df.createOrReplaceTempView("Campaignid_Packagename")

      // hb='0' rows resolve the campaign through the tracking request table, hb='1' rows through
      // the header-bidding bid table.
      val Key2000047_rdd: RDD[Row] = spark.sql(
        s"""
          |select t1.device_id,t1.device_type,t1.platform,t1.data,t1.model,t1.country,t1.sdk_version,t1.brand,t3.id,t3.package_name from (select * from adn_sdk_input_2000047 where hb='0') t1
          |join Requestid_Campaignid t2 on t1.rid_n=t2.requestid
          |join Campaignid_Packagename t3 on t2.campaignid=t3.id
          |union
          |select t1.device_id,t1.device_type,t1.platform,t1.data,t1.model,t1.country,t1.sdk_version,t1.brand,t2.campaign_id,t2.package_name from (select * from adn_sdk_input_2000047 where hb='1') t1
          |join (select get_json_object(ext_dsp, '$$.bd') package_name,bidid rid_n,get_json_object(ext_dsp, '$$.cid')  campaign_id from dwh.ods_adn_hb_v1_bid where concat(yyyy,mm,dd)='${loadTime}') t2 on t1.rid_n=t2.rid_n
          |""".stripMargin
      ).rdd.map(contents => parseKey2000047Data(contents, loadTime))

      val cal_rdd = filter_rdd
        .filter { row =>
          val data = row.getString(3)
          data != null &&
            (data.contains("key=2000079") || (data.contains("key=2000041") && data.contains("iex=1")))
        }
        .map(contents => parseCalData(contents, loadTime, campaignIdPackageNameMapBC))

      val install_schema = StructType(Array(
        StructField("device_id", StringType),
        StructField("device_type", StringType),
        StructField("platform", StringType),
        StructField("model", StringType),
        StructField("country", StringType),
        StructField("sdk_version", StringType),
        StructField("brand", StringType),
        StructField("install_list", StringType),
        StructField("version", IntegerType)
      ))

      spark.createDataFrame(cal_rdd.union(Key2000047_rdd), install_schema).filter("device_id!=''").createOrReplaceTempView("adn_sdk_tab")
      spark.udf.register("merge", merge _)
      // De-duplicate: one output row per (device_id, device_type, platform, version).
      val install_re = spark.sql("select device_id,device_type,platform,max(model) model,max(brand) brand,max(country) country,max(sdk_version) sdk_version,merge(collect_set(install_list)) install_list, version from adn_sdk_tab group by device_id,device_type,platform,version")

      // Resolve the file system from the output path itself so the delete targets the same
      // location the write below uses, whatever bucket or scheme it lives on.
      val outputDir = new Path(output_path)
      val outputFs: FileSystem = outputDir.getFileSystem(sc.hadoopConfiguration)
      outputFs.delete(outputDir, true)

      install_re.repartition(100).write.format("orc").option("orc.compress", "zlib").mode("overwrite").save(output_path)
    } finally {
      spark.sparkContext.stop()
    }
  }

  /**
   * Concatenates several JSON-array strings into one JSON array; registered in `main` as the
   * Spark SQL UDF `merge`.
   *
   * Each input string is decoded with `String2JSONArray`, so null, empty or unparsable strings
   * contribute nothing. Elements are appended in input order and are not de-duplicated.
   *
   * @param installList JSON-array strings to combine
   * @return the JSON text of the combined array
   */
  def merge(installList: mutable.WrappedArray[String]): String =
    installList
      .foldLeft(new JSONArray())((combined, raw) => combined.fluentAddAll(String2JSONArray(raw)))
      .toJSONString

  /**
   * Leniently parses a JSON-array string.
   *
   * @param str JSON text; may be null or empty
   * @return the parsed array, or a fresh empty array when `str` is null or empty, when parsing
   *         throws any `Exception`, or when the parsed array has no elements
   */
  def String2JSONArray(str: String): JSONArray =
    Option(str).filter(_.nonEmpty) match {
      case Some(text) =>
        try {
          val parsed = JSON.parseArray(text)
          if (parsed.isEmpty) new JSONArray() else parsed
        } catch {
          case _: Exception => new JSONArray()
        }
      case None =>
        new JSONArray()
    }

  /**
   * Decodes the `cal=` payload of one device row into an install-list row.
   *
   * Input row layout (as built in `main`):
   * `device_id, device_type, platform, data, model, country, sdk_version, brand, ...`.
   * The value following `cal=` (up to the next `&`) is decoded with `AdnSdkTools.evaluate` and
   * parsed as a JSON array.
   *
   *  - If `data` contains `key=2000079`, an entry is emitted for every element whose `campaignid`
   *    is present in the broadcast map (the mapped value becomes the package name); the row is
   *    tagged `version = 1`.
   *  - Otherwise an entry is emitted for every element with `iex == 1`, using the element's own
   *    `packageName`; the row is tagged `version = 0`.
   *
   * Errors are best effort: a `JSONException` is swallowed, any other `Exception` is printed.
   * Processing stops at the first failing element; entries collected before it are kept.
   *
   * @param row                 device row produced in `main`
   * @param log_day             processing day as `yyyyMMdd`
   * @param stringToStringMapBC broadcast map from campaign id to package name
   * @return `Row(device_id, device_type, platform, model, country, sdk_version, brand,
   *         install_list JSON text, version)`
   */
  def parseCalData(row: Row, log_day: String, stringToStringMapBC: Broadcast[collection.Map[String, String]]): Row = {
    val data = row.getString(3)
    val sdk_version = row.getString(6)
    val package_array = new JSONArray()
    var version = 0 // 0 = default, 1 = key=2000079
    try {
      val calStart = data.indexOf("cal=")
      if (calStart > -1) {
        val cal = data.substring(calStart + 4).split("&")(0)
        val receive_day = log_day.substring(0, 4) + "-" + log_day.substring(4, 6) + "-" + log_day.substring(6, 8)
        val install_array = JSON.parseArray(AdnSdkTools.evaluate(cal))
        if (install_array != null) {
          // One formatter per row instead of one per element; it never leaves this call.
          val dayFormat = new SimpleDateFormat("yyyy-MM-dd")
          if (data.contains("key=2000079")) {
            // Tag the row before iterating: entries kept after a mid-loop failure are still
            // key=2000079 entries and must not be reported as version 0.
            version = 1
            for (i <- 0 until install_array.size()) {
              val obj = install_array.getJSONObject(i)
              val rawCampaignId = obj.get("campaignid")
              if (rawCampaignId != null) {
                val campaignid = rawCampaignId.toString
                // Elements whose campaign id is unknown to the broadcast map are skipped.
                stringToStringMapBC.value.get(campaignid).foreach { package_name =>
                  package_array.add(
                    buildInstallEntry(obj, package_name, campaignid, sdk_version, log_day, receive_day, dayFormat))
                }
              }
            }
          } else {
            for (i <- 0 until install_array.size()) {
              val obj = install_array.getJSONObject(i)
              val package_name = obj.get("packageName").toString
              val iex: Int = if (obj.get("iex") != null) obj.getInteger("iex").intValue() else 0
              val campaignid = if (obj.get("campaignid") != null) obj.get("campaignid").toString else ""
              if (iex == 1) {
                package_array.add(
                  buildInstallEntry(obj, package_name, campaignid, sdk_version, log_day, receive_day, dayFormat))
              }
            }
          }
        }
      }
    } catch {
      case _: com.alibaba.fastjson.JSONException => // malformed payload: keep what was collected
      case e: Exception => e.printStackTrace()
    }

    Row(
      row(0),
      row(1),
      row(2),
      row(4),
      row(5),
      row(6),
      row(7),
      package_array.toString,
      version
    )
  }

  /** Builds one install-list entry (`date`, `package_name`, `label`, `campaignid`, `vc`) from a payload element. */
  private def buildInstallEntry(obj: JSONObject, packageName: String, campaignId: String, sdkVersion: String,
                                logDay: String, receiveDay: String, dayFormat: SimpleDateFormat): JSONObject = {
    val label = if (obj.get("label") != null) obj.get("label").toString else ""
    val vc = if (obj.get("vc") != null && obj.getString("vc").nonEmpty) obj.get("vc").toString else ""
    val entry = new JSONObject()
    entry.put("date", resolveInstallDate(obj, logDay, receiveDay, dayFormat))
    entry.put("package_name", decoratePackageName(packageName, label, sdkVersion))
    entry.put("label", label)
    entry.put("campaignid", campaignId)
    entry.put("vc", vc)
    entry
  }

  /**
   * Picks the entry date: the day derived from the element's `ts` (formatted in the JVM default
   * time zone), unless `ts` is missing, at most four characters long, or more than two calendar
   * years older than the log day; in those cases the log day is used.
   */
  private def resolveInstallDate(obj: JSONObject, logDay: String, receiveDay: String,
                                 dayFormat: SimpleDateFormat): String = {
    val hasTimestamp = obj.get("ts") != null && obj.getString("ts").nonEmpty && obj.getString("ts").length() > 4
    if (hasTimestamp) {
      val installDay = dayFormat.format(new Date(obj.getLong("ts").longValue()))
      val installYear = Integer.parseInt(installDay.substring(0, 4))
      val logYear = Integer.parseInt(logDay.substring(0, 4))
      if (logYear - installYear > 2) receiveDay else installDay
    } else {
      receiveDay
    }
  }

  /**
   * Appends `.notinstall` (label `ni`) or `.delete` (label `if`) to the package name when the
   * label is non-blank and the SDK version compares `>= "MAL_14"`. A null SDK version never
   * qualifies.
   */
  private def decoratePackageName(packageName: String, label: String, sdkVersion: String): String = {
    // NOTE(review): this is a lexicographic string comparison (e.g. "MAL_9" >= "MAL_14" is true);
    // confirm that matches the intended version ordering.
    val labelApplies = StringUtils.isNotBlank(label) && sdkVersion != null && sdkVersion >= "MAL_14"
    if (!labelApplies) packageName
    else if ("if".equalsIgnoreCase(label)) packageName + ".delete"
    else if ("ni".equalsIgnoreCase(label)) packageName + ".notinstall"
    else packageName
  }

  /**
   * Builds the install-list row for a `key=2000047` record that has already been joined with
   * its campaign id and package name.
   *
   * Input columns (see the SQL in `main`): 0 device_id, 1 device_type, 2 platform, 3 data,
   * 4 model, 5 country, 6 sdk_version, 7 brand, 8 campaign id, 9 package name.
   *
   * The install list holds a single entry dated with the log day, with empty `label` and `vc`.
   * The row is always tagged `version = 1`. If the entry cannot be built (for example `log_day`
   * is shorter than eight characters), the exception is printed and the install list is `[]`.
   *
   * @param row     joined row as described above
   * @param log_day processing day as `yyyyMMdd`
   * @return `Row(device_id, device_type, platform, model, country, sdk_version, brand,
   *         install_list JSON text, version)`
   */
  def parseKey2000047Data(row: Row, log_day: String): Row = {
    val campaignid = row.getString(8)
    val package_name = row.getString(9)
    val package_array = new JSONArray()
    try {
      val receive_day = log_day.substring(0, 4) + "-" + log_day.substring(4, 6) + "-" + log_day.substring(6, 8)
      // These records carry no label, so the package name is stored without any suffix.
      val install_json = new JSONObject()
      install_json.put("date", receive_day)
      install_json.put("package_name", package_name)
      install_json.put("label", "")
      install_json.put("campaignid", campaignid)
      install_json.put("vc", "")
      package_array.add(install_json)
    } catch {
      case e: Exception => e.printStackTrace()
    }

    Row(
      row(0),
      row(1),
      row(2),
      row(4),
      row(5),
      row(6),
      row(7),
      package_array.toString,
      1 // version 1: same partition family as key=2000079 (adn_sdk_v2)
    )
  }

}


/**
 * Simple pair of a campaign id and a package name.
 *
 * NOTE(review): not referenced anywhere in this file; presumably used by other jobs (confirm).
 *
 * @param campaignId  campaign identifier
 * @param packageName package name associated with the campaign
 */
case class campaignId_packageName(
  campaignId: String,
  packageName: String
)