package mobvista.dmp.datasource.bigmedia_domestic

import java.net.URI

import com.google.gson.JsonObject
import mobvista.dmp.common.CommonSparkJob
import mobvista.dmp.datasource.age_gender.Constant
import mobvista.prd.datasource.util.GsonUtil
import org.apache.commons.cli.Options
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.storage.StorageLevel

/**
 * Daily ETL for domestic big-media logs: parses the raw JSON input, normalizes
 * device/platform/gender fields, joins dwh.device_id_md5_match to map MD5-hashed
 * device ids back to raw ids, and writes two ORC outputs (daily detail and gender labels).
 *
 * author andy.liu on 2019/11/4
 */
class BigMediaDomestic extends CommonSparkJob {

  /**
   * Declares the command-line options this job requires.
   *
   * @return the populated Options instance
   */
  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("bigmediainput", true, "[must] bigmediainput")
    options.addOption("outputdaily", true, "[must] outputdaily")
    options.addOption("outputgender", true, "[must] outputgender")
    options.addOption("coalesce", true, "[must] coalesce")
    options.addOption("last_sunday", true, "[must] last_sunday")
    options
  }

  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      return -1
    } else printOptions(commandLine)

    val bigmediainput = commandLine.getOptionValue("bigmediainput")
    val outputdaily = commandLine.getOptionValue("outputdaily")
    val outputgender = commandLine.getOptionValue("outputgender")
    val coalesce = commandLine.getOptionValue("coalesce")
    val last_sunday = commandLine.getOptionValue("last_sunday")
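    // last_sunday selects the dt partition of dwh.device_id_md5_match used in the join below.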


    val spark = SparkSession.builder()
      .appName("BigMediaDomestic")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

    // Clear both output directories so the ORC writes below start from a clean slate.
    val fs = FileSystem.get(new URI("s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration)
    fs.delete(new Path(outputdaily), true)
    fs.delete(new Path(outputgender), true)

    try {
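      // Each input line is one JSON record; an illustrative shape (field set inferred from
      // the reads below, not from a documented contract):
      //   {"device_id":"...","network":"wifi","uuid":"...","event_name":"install",
      //    "event_value":"1","timestamp_date":"2019-11-04","package_name":"com.example.app",
      //    "genders":"GENDER_MALE","age_min":"18","age_max":"24","os":["ios"]}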

      val inputsRDD = spark.sparkContext.textFile(bigmediainput).map(line => {
        val jsonObjLine = GsonUtil.String2JsonObject(line)

        // Read a string field defensively: jo.get(name) returns null for a missing key,
        // so guard for both absence and JSON null before calling getAsString.
        def genValbyName(name: String, jo: JsonObject): String = {
          val elem = jo.get(name)
          if (elem != null && !elem.isJsonNull) {
            elem.getAsString
          } else {
            ""
          }
        }

        var device_id = ""
        var network = ""
        var uuid = ""
        var event_name = ""
        var event_value = ""
        var timestamp_date = ""
        var package_name = ""
        var gender: String = "NONE"
        var age_min = ""
        var age_max = ""
        var device_type: String = "idfa"
        var platform = "ios"

        try {
          device_id = genValbyName("device_id", jsonObjLine)
          network = genValbyName("network", jsonObjLine)
          uuid = genValbyName("uuid", jsonObjLine)
          event_name = genValbyName("event_name", jsonObjLine)
          event_value = genValbyName("event_value", jsonObjLine)
          timestamp_date = genValbyName("timestamp_date", jsonObjLine)
          package_name = genValbyName("package_name", jsonObjLine)
          val genders = genValbyName("genders", jsonObjLine)
          if (StringUtils.isNotBlank(genders) && genders.equalsIgnoreCase("GENDER_MALE")) {
            gender = "m"
          } else if (StringUtils.isNotBlank(genders) && genders.equalsIgnoreCase("GENDER_FEMALE")) {
            gender = "f"
          }
          age_min = genValbyName("age_min", jsonObjLine)
          age_max = genValbyName("age_max", jsonObjLine)

          // "os" may be absent or malformed; only trust it when it is a non-empty JSON array.
          val osElem = jsonObjLine.get("os")
          if (osElem != null && osElem.isJsonArray && osElem.getAsJsonArray.size() > 0) {
            platform = osElem.getAsJsonArray.get(0).getAsString.toLowerCase()
          }
          if (platform.equalsIgnoreCase("ios")) {
            device_type = "idfa"
          } else if (platform.equalsIgnoreCase("android")) {
            device_type = "imei"
          }
        } catch {
          case e: Exception =>
            // Keep malformed records: the defaults above still produce a well-formed Row.
            e.printStackTrace()
        }
        Row(device_id, device_type, platform, network, uuid, event_name, event_value, timestamp_date, package_name, gender, age_min, age_max)
      })

      spark.createDataFrame(inputsRDD, Constant.schema_bigmedia_domestic).createOrReplaceTempView("ods_bigmedia_domestic_tmp")
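      // Constant.schema_bigmedia_domestic is expected to mirror the 12 string fields of the
      // Row above (device_id through age_max); any arity or order mismatch surfaces at runtime.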

      val sql =
        s"""
           |select /*+ mapjoin(t1) */ t2.device_id device_id,
           |device_type,
           |platform,
           |max(network) network,
           |max(uuid) uuid,
           |max(event_name) event_name,
           |max(event_value) event_value,
           |max(timestamp_date) timestamp_date,
           |max(package_name) package_name,
           |max(genders) genders,
           |min(age_min) age_min,
           |max(age_max) age_max,
           |'A' tag,
           |max(genders) label,
           |'bm' business
           |from ods_bigmedia_domestic_tmp t1
           |join (select device_id, device_id_md5 from dwh.device_id_md5_match where dt = '$last_sunday') t2
           |on (t1.device_id = t2.device_id_md5)
           |group by
           |t2.device_id,
           |device_type,
           |platform
         """.stripMargin

      val df = spark.sql(sql).coalesce(coalesce.toInt).persist(StorageLevel.MEMORY_AND_DISK_SER)

      df.select(
        col("device_id"), col("device_type"), col("platform"), col("network"), col("uuid"), col("event_name"), col("event_value"),
        col("timestamp_date"), col("package_name"), col("genders"), col("age_min"), col("age_max")
      ).write
        .mode(SaveMode.Overwrite)
        .option("orc.compress", "zlib")
        .orc(outputdaily)

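      // The gender/label output reuses the same aggregated frame; only tag/label/business differ.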
      df.select(
        col("device_id"), col("device_type"), col("tag"), col("label"), col("business")
      ).write
        .mode(SaveMode.Overwrite)
        .option("orc.compress", "zlib")
        .orc(outputgender)
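      // outputgender carries (device_id, device_type, tag, label, business); presumably the
      // input to the downstream age/gender pipeline (cf. the age_gender Constant import above).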

    } finally {
      spark.stop()
    }
    0
  }
}

object BigMediaDomestic {
  def main(args: Array[String]): Unit = {
    System.exit(new BigMediaDomestic().run(args))
  }
}
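
// Example invocation (illustrative; jar name and S3 paths are assumptions, option names
// match buildOptions above):
//   spark-submit --class mobvista.dmp.datasource.bigmedia_domestic.BigMediaDomestic \
//     dmp-job.jar \
//     -bigmediainput s3://path/to/bigmedia/input \
//     -outputdaily s3://path/to/output/daily \
//     -outputgender s3://path/to/output/gender \
//     -coalesce 100 \
//     -last_sunday 2019-11-03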