package mobvista.dmp.datasource.adn_request_other

import mobvista.dmp.common.CommonSparkJob
import org.apache.commons.lang.StringUtils
import org.apache.spark.sql.SparkSession

import scala.collection.mutable.ArrayBuffer

/**
 * Daily ETL job over raw ADN request logs.
 *
 * Reads text lines from the `input` path, splits each line with `splitFun`, keeps the
 * records accepted by [[filterData]], expands every record into one row per device
 * identifier and writes the distinct
 * `(device_id, device_type, platform, package_name)` rows to `output` as ORC.
 */
class EtlAdnRequestOtherDaily extends CommonSparkJob with Serializable {

  /**
   * Parses the command line and, when the mandatory options are present, runs the ETL.
   *
   * @param args command-line arguments; the `input` and `output` option values are used
   * @return 1 when the mandatory-option check fails (usage is printed), 0 once the
   *         output has been written
   */
  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      1
    } else {
      printOptions(commandLine)
      val input = commandLine.getOptionValue("input")
      val output = commandLine.getOptionValue("output")
      writeDailyDevices(input, output)
      0
    }
  }

  /**
   * Builds the Spark session, extracts the device rows from `input` and writes the
   * de-duplicated result to `output` as zlib-compressed ORC. The session is always
   * stopped, even when the job fails.
   *
   * @param input  path of the raw request log (read with `textFile`)
   * @param output path the ORC result is written to
   */
  private def writeDailyDevices(input: String, output: String): Unit = {
    val spark = SparkSession
      .builder()
      .appName("EtlAdnRequestOtherDaily")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.autoBroadcastJoinThreshold", "209715200")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .getOrCreate()

    import spark.implicits._
    try {
      spark.sparkContext
        .textFile(input)
        .map(splitFun)
        .filter(filterData)
        .flatMap(fields => toDeviceRows(fields))
        .toDF("device_id", "device_type", "platform", "package_name")
        .createOrReplaceTempView("t_daily")

      // Grouping by every selected column de-duplicates the rows; package names shorter
      // than 4 characters are dropped.
      val sql =
        """
          |select t.device_id, t.device_type, t.platform, t.package_name
          |from t_daily t
          |where length(t.package_name) >= 4
          |group by t.device_id, t.device_type, t.platform, t.package_name
        """.stripMargin
      spark.sql(sql)
        .write
        .option("orc.compress", "zlib")
        .orc(output)
    } finally {
      spark.stop()
    }
  }

  /**
   * Expands one record that already passed [[filterData]] into
   * `(device_id, device_type, platform, package_name)` rows.
   *
   * For platform "android" one row is produced for each non-blank identifier, in the
   * order gaid (index 15), imei (index 12), androidid (index 14). For any other platform
   * a single "idfa" row (index 16) is produced.
   *
   * @param array split fields of one log line; indices up to 19 are read
   * @return the rows derived from the record (possibly empty for android)
   */
  private def toDeviceRows(array: Array[String]): Seq[(String, String, String, String)] = {
    val platform = array(4)
    val extPackageName = array(19)

    if ("android".equals(platform)) {
      Seq(array(15) -> "gaid", array(12) -> "imei", array(14) -> "androidid")
        .collect {
          case (deviceId, deviceType) if StringUtils.isNotBlank(deviceId) =>
            (deviceId, deviceType, platform, extPackageName)
        }
    } else {
      Seq((array(16), "idfa", platform, extPackageName))
    }
  }

  /**
   * Decides whether a split log record is usable.
   *
   * A record is kept only when all of the following hold:
   *  - it has more than 20 fields;
   *  - at least one of gaid (15), idfa (16) or imei (12) is non-blank;
   *  - the platform (4) is "android" or "ios" and its identifiers are well formed;
   *  - the ext package name (19) is non-blank.
   *
   * @param array split fields of one log line
   * @return true when the record should be processed, false when it must be dropped
   */
  def filterData(array: Array[String]): Boolean = {
    if (array.length <= 20) {
      false
    } else {
      val gaid = array(15)
      val idfa = array(16)
      val imei = array(12)
      val platform = array(4)
      val extPackageName = array(19)

      // Drop records where gaid, idfa and imei are all blank.
      val hasAnyId =
        StringUtils.isNotBlank(gaid) || StringUtils.isNotBlank(idfa) || StringUtils.isNotBlank(imei)

      // Drop records whose identifiers are malformed or whose platform is unknown.
      // NOTE(review): the android case requires BOTH gaid and imei to match their
      // patterns, so a record with a valid gaid but no imei is dropped unless imeiPtn
      // accepts an empty string (imeiPtn is defined outside this file) - confirm this
      // is intended.
      def idsWellFormed: Boolean = platform match {
        case "android" => gaid.matches(didPtn) && !gaid.equals(allZero) && imei.matches(imeiPtn)
        case "ios" => idfa.matches(didPtn) && !idfa.equals(allZero)
        case _ => false
      }

      // The last condition drops records with a blank ext package name.
      hasAnyId && idsWellFormed && StringUtils.isNotBlank(extPackageName)
    }
  }
}

/** Command-line entry point for the `EtlAdnRequestOtherDaily` job. */
object EtlAdnRequestOtherDaily {

  /**
   * Runs the job with the given arguments.
   *
   * The status returned by `run` is propagated: a non-zero status terminates the JVM
   * with that exit code, so a usage error is not reported to the launcher as success.
   * A zero status returns normally.
   *
   * @param args command-line arguments forwarded to the job
   */
  def main(args: Array[String]): Unit = {
    val exitCode = new EtlAdnRequestOtherDaily().run(args)
    if (exitCode != 0) {
      sys.exit(exitCode)
    }
  }
}