package mobvista.dmp.test
import java.net.URI
import java.util

import mobvista.dmp.common.CommonSparkJob
import mobvista.prd.datasource.util.GsonUtil
import org.apache.commons.lang.StringUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SparkSession

import scala.collection.JavaConversions._

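/**
  * Parses \001-delimited device report records: flattens the interest-tag JSON array
  * (column 4) into "firstTag->secondTag" pairs and the install-list JSON array
  * (column 5) into a de-duplicated, comma-separated list of package names.
  */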
class ParseReport extends CommonSparkJob with Serializable {
  val split = ","
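  // iOS packages in the report are numeric iTunes IDs, so only all-digit names are kept for idfa devices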
  val iosPkgRegex = "^[0-9]+$"
  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      return -1
    } else {
      printOptions(commandLine)
    }

    val input = commandLine.getOptionValue("input")
    val output = commandLine.getOptionValue("output")
    val parallelism = commandLine.getOptionValue("parallelism").toInt
    val coalesce = commandLine.getOptionValue("coalesce").toInt

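    // Build the Spark session with the requested default parallelism and Kryo serialization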
    val spark = SparkSession
      .builder()
      .appName("ParseReport")
      .config("spark.rdd.compress", "true")
      .config("spark.default.parallelism", s"${parallelism}")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()
    val sc = spark.sparkContext
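    // Delete any existing output so the job can be re-run without failing on an existing path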
    FileSystem.get(new URI("s3://mob-emr-test"), sc.hadoopConfiguration).delete(new Path(output), true)
    try {
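      // Read the raw report and split each record on Hive's default field delimiter (\001)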
      sc.textFile(input)
        .map(splitFun(_, "\001"))
        .map(array => {
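          // Column 4 holds the interest tags as a JSON array; flatten it into "firstTag->secondTag" pairs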
          val interest = array(4)
          if (interest.startsWith("[")) {
            val set = new util.HashSet[String]()
            GsonUtil.String2JsonArray(interest)
              .foreach(element => {
                element.getAsJsonObject
                  .get("tag")
                  .getAsJsonArray
                  .foreach(tagElement => {
                    val tagObj = tagElement.getAsJsonObject
                    val firstTag = if (tagObj.get("1") != null) tagObj.get("1").getAsString else ""
                    val secondTag = if (tagObj.get("2") != null) tagObj.get("2").getAsString else ""
                    set.add(s"${firstTag}->${secondTag}")
                  })
              })
            array(4) = set.mkString(split)
          }

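          // Column 5 holds the install list as a JSON array; keep only package names valid for the device type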
          val deviceType = array(1)
          val installList = array(5)
          if (StringUtils.isNotEmpty(installList) && installList.startsWith("[")) {
            val set = new util.HashSet[String]()
            GsonUtil.String2JsonArray(installList)
              .foreach(element => {
                val packageName = element.getAsJsonObject.get("package_name").getAsString
                if (("idfa".equals(deviceType) && packageName.matches(iosPkgRegex))
                  || "gaid".equals(deviceType)) {
                  set.add(packageName)
                }
              })
            array(5) = set.mkString(split)
          }
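          // Re-join the (possibly rewritten) columns into a single output record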
          array.mkString(DATA_SPLIT)
        })
        // Apply the requested output partition count before writing
        .coalesce(coalesce)
        .saveAsTextFile(output)

    } finally {
      if (spark != null) {
        spark.stop()
      }
    }
    0
  }
}
object ParseReport {
  def main(args: Array[String]): Unit = {
    new ParseReport().run(args)
  }
}