package mobvista.dmp.datasource.mpsdk

import java.net.{URI, URLDecoder}
import java.util

import mobvista.dmp.common.{CommonSparkJob, UrlParser}
import mobvista.prd.datasource.util.GsonUtil
import org.apache.commons.cli.Options
import org.apache.commons.lang.StringUtils
import org.apache.commons.net.util.Base64
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{SaveMode, SparkSession}

import scala.collection.mutable.ArrayBuffer

/**
  * Parses daily MPSDK data.
  * 1. All MPSDK data comes from the Android platform.
  * 2. Only records carrying the "lfq" parameter are kept.
  * 3. The "adid" parameter holds the gaid and "mc" holds the AndroidId (base64-obfuscated).
  *    When adid is empty, mc is matched against the existing mapping to recover the gaid;
  *    records whose gaid cannot be recovered are discarded.
  */
class ParseMPSDKDaily extends CommonSparkJob with Serializable {

  // Field separator of a raw log line.
  val dataSplit = " "
  // Charset used when URL-decoding parameter values.
  val encode = "UTF-8"
  // Dummy prefix so the raw query string can be parsed as a full URL by UrlParser.
  val httpPrefix = "http://test.com"

  /**
    * Job entry point: parses the daily MPSDK log, joins it with the
    * gaid/android_id mapping and writes the joined result as ORC.
    *
    * @param args command-line arguments, see [[buildOptions]]
    * @return 0 on success, 1 when a mandatory option is missing
    */
  def run(args: Array[String]): Int = {

    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      return 1
    } else {
      printOptions(commandLine)
    }

    val input = commandLine.getOptionValue("input")
    val output = commandLine.getOptionValue("output")
    val mappingPath = commandLine.getOptionValue("mappingPath")
    // NOTE(review): parallelism is parsed but never used below; kept so the CLI
    // contract (and the failure on a malformed value) stays unchanged.
    val parallelism = commandLine.getOptionValue("parallelism").toInt
    val coalesce = commandLine.getOptionValue("coalesce").toInt

    // Build the character-substitution dictionary used to de-obfuscate mc/lfq.
    val dict = buildDict()

    val spark = SparkSession.builder()
      .appName("ParseMPSDK")
      .config("spark.rdd.compress", "true")
      .config("spark.speculation", "true")
      .config("spark.speculation.quantile", "0.9")
      .config("spark.speculation.multiplier", "1")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .getOrCreate()

    import spark.implicits._
    val sc = spark.sparkContext

    try {
      // Parse the daily log: keep only records containing "lfq", take the 7th
      // space-separated field (the query string) and extract the parameters.
      sc.textFile(input)
        .filter(record => record.contains("lfq"))
        .map(splitFun(_, dataSplit)(6))
        .map(parseData(_, dict))
        .map(array => MpSDKVO(array(0), array(1), array(2), array(3), array(4), array(5), array(6)))
        .toDF()
        .createOrReplaceTempView("tmp_sdk_daily")

      // Load the id-mapping log as (gaid, android_id, dmp_time) rows.
      sc.textFile(mappingPath)
        .map(record => {
          val json = GsonUtil.String2JsonObject(record)
          val gaid = json.get("gaid").getAsString
          val adrId = json.get("android_id").getAsString
          val dmpTime = json.get("dmp_time").getAsString
          (gaid, adrId, dmpTime)
        })
        .toDF("gaid", "android_id", "dmp_time")
        .createOrReplaceTempView("tmp_mapping")

      // Keep the most recent gaid per android_id, discarding placeholder '0' ids.
      val mappingSql =
        """
          |select t.gaid, t.android_id
          |from (
          |  select t.*, row_number() over(partition by t.android_id order by t.dmp_time desc) as rk
          |  from tmp_mapping t
          |  where t.gaid <> '0' and t.android_id <> '0'
          |) t
          |where t.rk = 1
        """.stripMargin
      spark.sql(mappingSql)
        .createOrReplaceTempView("tmp_gaid_aid_mapping")

      // Branch 1: records without a valid gaid are recovered through the mapping.
      // Branch 2: records that already carry a valid gaid pass straight through.
      // The outer filter keeps only rows whose final device_id is a valid UUID.
      val resultSql =
        """
          |select *
          |from (
          |  select a.gaid as device_id, b.android_id, b.appid, b.brand, b.model, b.package_list, b.app_pname
          |  from tmp_gaid_aid_mapping a
          |  join (
          |    select t.device_id, t.android_id, t.appid, t.brand, t.model, t.package_list, t.app_pname
          |    from tmp_sdk_daily t
          |    where t.device_id not rlike '^[0-9a-fA-F]{8}(-[0-9a-fA-F]{4}){3}-[0-9a-fA-F]{12}$'
          |     and t.android_id is not null and t.android_id <> ''
          |     and t.package_list like '[%'
          |  ) b on a.android_id=b.android_id
          |
          |  union all
          |
          |  select t.device_id, t.android_id, t.appid, t.brand, t.model, t.package_list, t.app_pname
          |  from tmp_sdk_daily t
          |  where t.device_id rlike '^[0-9a-fA-F]{8}(-[0-9a-fA-F]{4}){3}-[0-9a-fA-F]{12}$'
          |    and t.package_list like '[%'
          |) t
          |where t.device_id rlike '^[0-9a-fA-F]{8}(-[0-9a-fA-F]{4}){3}-[0-9a-fA-F]{12}$'
        """.stripMargin
      // Remove any previous output before writing (belt-and-braces with Overwrite).
      FileSystem.get(new URI(s"s3://mob-emr-test"), sc.hadoopConfiguration).delete(new Path(output), true)
      spark.sql(resultSql)
        .repartition(coalesce)
        .write
        .mode(SaveMode.Overwrite)
        .option("orc.compress", "zlib")
        .orc(output)

    } finally {
      if (spark != null) {
        spark.stop()
      }
    }
    0
  }

  /**
    * Builds the character-substitution dictionary used to reverse the MPSDK
    * base64 obfuscation: maps each character of the scrambled alphabet back to
    * the standard base64 alphabet.
    *
    * @return mutable map from obfuscated character to standard base64 character
    */
  def buildDict(): util.HashMap[Character, Character] = {
    // Standard base64 alphabet and its scrambled counterpart (same length).
    val word = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/="
    val secret = "vSoajc7dRzpWifGyNxZnV5k+DHLYhJ46lt0U3QrgEuq8sw/XMeBAT2Fb9P1OIKmC="
    val dict = new util.HashMap[Character, Character]
    for (i <- 0 until word.length) {
      dict.put(secret.charAt(i), word.charAt(i))
    }
    dict
  }

  /**
    * De-obfuscates an mc/lfq value: substitutes each character through the
    * dictionary, then base64-decodes the result.
    *
    * @param mc   obfuscated parameter value
    * @param dict substitution dictionary built by [[buildDict]]
    * @return the decoded plain-text value
    * @throws RuntimeException when a character is not present in the dictionary
    *                          (dict.get returns null and unboxing throws NPE)
    */
  def decodeMC(mc: String, dict: util.Map[Character, Character]): String = {
    val array = mc.toCharArray
    try {
      // Fixed: Array.length is parameterless — `array.length()` did not compile.
      for (i <- 0 until array.length) {
        array(i) = dict.get(array(i))
      }
    } catch {
      case _: NullPointerException =>
        // Fixed message: the old concatenation rendered as "dict = nulltrue".
        throw new RuntimeException(s"dict == null: ${dict == null}\n mc = $mc")
    }
    val decode = String.valueOf(array)
    new String(Base64.decodeBase64(decode))
  }

  /**
    * Keeps only mapping records where both gaid and android_id are present and
    * are not all-zero placeholders.
    *
    * @param record one JSON line of the mapping log
    * @return true when both ids are valid
    */
  def filterMappingData(record: String): Boolean = {
    val json = GsonUtil.String2JsonObject(record)
    val gaidElement = json.get("gaid")
    val adrElement = json.get("android_id")
    gaidElement != null && adrElement != null && {
      val gaid = gaidElement.getAsString
      val adrId = adrElement.getAsString
      StringUtils.isNotEmpty(gaid) && !gaid.matches("^0+$") &&
        StringUtils.isNotEmpty(adrId) && !adrId.matches("^0+$")
    }
  }

  /**
    * Extracts the tracked URL parameters from one raw query string.
    * mc and lfq values are additionally de-obfuscated via [[decodeMC]];
    * a missing or undecodable parameter yields an empty string, so the
    * result always has exactly 7 elements (same order as `params`).
    *
    * @param record raw query-string part of a log line
    * @param dict   substitution dictionary for mc/lfq decoding
    * @return array of parameter values: adid, mc, appid, brand, model, lfq, app_pname
    */
  def parseData(record: String, dict: util.Map[Character, Character]): Array[String] = {
    val urlParser = new UrlParser
    val url = s"$httpPrefix/$record"
    val buffer = new ArrayBuffer[String]
    val params = Array("adid", "mc", "appid", "brand", "model", "lfq", "app_pname")
    params.foreach(param => {
      var paramValue = urlParser.evaluate(url, QUERY, param)
      if (StringUtils.isNotEmpty(paramValue)) {
        try {
          // Only URL-decode values that actually contain percent-escapes.
          if (paramValue.contains("%")) {
            paramValue = URLDecoder.decode(paramValue, encode)
          }
          if ("mc".equals(param) || "lfq".equals(param)) {
            buffer += decodeMC(paramValue, dict)
          } else {
            buffer += paramValue
          }
        } catch {
          // Best-effort: an undecodable value degrades to an empty column.
          case e: Exception => {
            buffer += ""
          }
        }
      } else {
        buffer += ""
      }
    })
    buffer.toArray
  }

  /**
    * Declares the command-line options accepted by this job.
    *
    * @return the populated Options instance
    */
  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("input", true, "[must] input path")
    options.addOption("mappingPath", true, "[must] gaid and androidId mapping data path")
    options.addOption("output", true, "[must] output path")
    options.addOption("parallelism", true, "parallelism of shuffle operation")
    options.addOption("coalesce", true, "number of output files")
    options
  }
}

/**
  * Value object for one parsed MPSDK record (one row of tmp_sdk_daily).
  *
  * @param device_id    gaid (advertising id), may be invalid/empty in raw data
  * @param android_id   decoded Android id (mc parameter)
  * @param appid        application id
  * @param brand        device brand
  * @param model        device model
  * @param package_list decoded lfq parameter (installed package list)
  * @param app_pname    application package name
  */
case class MpSDKVO(
    device_id: String,
    android_id: String,
    appid: String,
    brand: String,
    model: String,
    package_list: String,
    app_pname: String)

object ParseMPSDKDaily {
  /** CLI entry point: instantiates the job and runs it with the given arguments. */
  def main(args: Array[String]): Unit = {
    val job = new ParseMPSDKDaily
    job.run(args)
  }
}