package mobvista.dmp.datasource.clever

import java.net.{URI, URLDecoder}
import java.util

import mobvista.dmp.common.{CommonSparkJob, UrlParser}
import mobvista.prd.datasource.util.GsonUtil
import org.apache.commons.lang.StringUtils
import org.apache.commons.net.util.Base64
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession

import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer

class ParseCleverDaily extends CommonSparkJob with Serializable {

  override val DATA_SPLIT = " "
  val FIELD_SPLIT = "\t"

  val urlParser = new UrlParser

  var word = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/=="
  var secret = "uVUoXc3pCnDFvb8lNJj9ZHEia7QYrfSmROkGKA0ehIdtzB64Mq2gP5syTL1wWx+/=="
  val pDict = new util.HashMap[Character, Character]
  for (i <- 0 until word.length) {
    pDict.put(secret.charAt(i), word.charAt(i))
  }

  word = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/=="
  secret = "vSoajc7dRzpWifGyNxZnV5k+DHLYhJ46lt0U3QrgEuq8sw/XMeBAT2Fb9P1OIKmC=="
  val cleverDict = new util.HashMap[Character, Character]
  for (i <- 0 until word.length) {
    cleverDict.put(secret.charAt(i), word.charAt(i))
  }

  def run(args: Array[String]): Int = {
    var sc: SparkContext = null
    try {
      val commandLine = commParser.parse(options, args)
      if (!checkMustOption(commandLine)) {
        printOptions(commandLine)
        return 1
      } else {
        printOptions(commandLine)
      }

      val input = commandLine.getOptionValue("input")
      val output = commandLine.getOptionValue("output")
      val parallelism = commandLine.getOptionValue("parallelism").toInt
      val coalesce = commandLine.getOptionValue("coalesce").toInt

      val spark = SparkSession.builder()
        .appName("ParseCleverDaily")
        .config("spark.rdd.compress", "true")
        .config("spark.default.parallelism", s"$parallelism")
        .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .getOrCreate()
      import spark.implicits._
      sc = spark.sparkContext

      FileSystem.get(new URI(s"s3://mob-emr-test"), sc.hadoopConfiguration).delete(new Path(output), true)

      val data = sc.textFile(input)
      val resultRDD = data.filter(record => record.contains("/openapi/clever?p="))
        .map(record => {
          // 获取参数P的值
          val splits = StringUtils.splitPreserveAllTokens(record, DATA_SPLIT, -1)
          var url = splits(6)
          url = s"$HTTPPREFIX/$url"
          urlParser.evaluate(url, QUERY, "p")
        })
        .filter(StringUtils.isNotEmpty(_))
        .map(pValue => {
          // 对P参数值进行Base64解码,并拼接成url
          val pDecode = decodeParams(pValue, pDict)
          s"$HTTPPREFIX/test?${pDecode}"
        })
        .filter(url => {
          // 1、过滤掉gaid或clever为null的数据
          // 2、过滤到clever解密后不是jsonArray的数据
          val gaid = urlParser.evaluate(url, QUERY, "gaid")
          val clever = urlParser.evaluate(url, QUERY, "clever")
          if (StringUtils.isNotEmpty(gaid) && StringUtils.isNotEmpty(clever)) {
            try {
              val cleverDecode = decodeParams(URLDecoder.decode(clever, ENCODING), cleverDict)
              if (cleverDecode.trim.startsWith("[") && cleverDecode.trim.endsWith("]")) {
                true
              } else {
                false
              }
            } catch {
              case e: Exception => {
                e.printStackTrace()
                false
              }
            }
          } else {
            false
          }
        })
        .flatMap(parseClever(_))
        .distinct()
        .coalesce(coalesce, true)
        .toDF()
        .write
        .format("ORC")
        .option("orc.compress", "zlib")
        .save(output)

    } finally {
      if (sc != null) {
        sc.stop()
      }
    }
    0
  }

  def parseClever(url: String): Array[CleverVO] = {
    val buffer = new ArrayBuffer[CleverVO]()
    val gaid = urlParser.evaluate(url, QUERY, "gaid")
    val model = urlParser.evaluate(url, QUERY, "model")
    val brand = urlParser.evaluate(url, QUERY, "brand")
    val clever = urlParser.evaluate(url, QUERY, "clever")

    val cleverDecode = decodeParams(URLDecoder.decode(clever, ENCODING), cleverDict)
    try {
      val jsonArray = GsonUtil.String2JsonArray(cleverDecode)
      jsonArray.foreach(element => {
        val jsonObject = element.getAsJsonObject
        val packageName = jsonObject.get("p").getAsString.replaceAll("[\\s\r\n]+", "")
        buffer += CleverVO(gaid, "gaid", "android", model, brand, packageName)
      })
    } catch {
      case e: Exception => {
        println("sdfsdfsdfs= " + cleverDecode)
        println("url= " + url)
        e.printStackTrace()
      }
    }
    buffer.toArray
  }

  def decodeParams(str: String, dict: util.Map[Character, Character]): String = {
    val array = str.toCharArray
    for (i <- 0 until array.length()) {
      try {
        array(i) = dict.get(array(i))
      } catch {
        case e: Exception => {
          println(str)
          println(i)
          e.printStackTrace()
        }
      }

    }
    val decode = String.valueOf(array)
    return new String(Base64.decodeBase64(decode))
  }
}

case class CleverVO(device_id: String, device_type: String, platform: String, model: String, brand: String, package_name: String) {
  override def hashCode() = {
    this.device_id.hashCode + this.device_type.hashCode
  }

  override def equals(obj: scala.Any): Boolean = {
    if (obj.isInstanceOf[CleverVO]) {
      val tmp = obj.asInstanceOf[CleverVO]
      return tmp.device_id.equals(this.device_id) && tmp.device_type.equals(this.device_type)
    }
    return false
  }
}

object ParseCleverDaily {
  def main(args: Array[String]): Unit = {
    new ParseCleverDaily().run(args)
  }
}