package mobvista.dmp.datasource.dm

import java.net.URI

import mobvista.dmp.common.CommonSparkJob
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.storage.StorageLevel


class YoukuPhoneWaxNobid extends CommonSparkJob with Serializable {
  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("coalesce", true, "[must] coalesce")
    options.addOption("output01", true, "[must] output01")
    options.addOption("output02", true, "[must] output02")
    options.addOption("begin_day", true, "[must] begin_day")
    options.addOption("end_day", true, "[must] end_day")
    options
  }

  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      return -1
    } else printOptions(commandLine)

    val coalesce = commandLine.getOptionValue("coalesce")
    val output01 = commandLine.getOptionValue("output01")
    val output02 = commandLine.getOptionValue("output02")
    val begin_day = commandLine.getOptionValue("begin_day")
    val end_day = commandLine.getOptionValue("end_day")

    val spark = SparkSession.builder()
      .appName("YoukuPhoneWaxNobid")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

    val sc = spark.sparkContext
    import spark.implicits._

    FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output01), true)
    FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output02), true)

    try {
      val sql=
        s"""
           |select
           |case when split(devids,',')[5]!='' then split(devids,',')[5]
           |when split(devids,',')[13]!='' then split(devids,',')[13] else split(devids,',')[7] end as devid,
           |case when split(devids,',')[5]!='' then 'imeimd5'
           |when split(devids,',')[13]!='' then 'oaidmd5' else 'androidid' end as device_type
           |from adn_dsp.log_adn_dsp_retarget_request_hour_v2
           |where concat(yr,mt,dt) >= '${begin_day}' and concat(yr,mt,dt) <= '${end_day}'
           |and countrycode='CN'
           |and os=1
           |and exchanges='wax'
           |and `describe` like '%rank%'
           |group by case when split(devids,',')[5]!='' then 'imeimd5'
           |when split(devids,',')[13]!='' then 'oaidmd5' else 'androidid' end ,
           |case when split(devids,',')[5]!='' then split(devids,',')[5]
           |when split(devids,',')[13]!='' then split(devids,',')[13] else split(devids,',')[7] end
        """.stripMargin


      val df01: DataFrame = spark.sql(sql).persist(StorageLevel.MEMORY_AND_DISK_SER)

      df01.withColumn("platform", lit("android")).withColumn("package_name", lit("[\"com.youku.phone_wax_nobid\"]")).coalesce(coalesce.toInt)
        .rdd.map(_.mkString("\t")).saveAsTextFile(output01)

      df01.withColumn("platform", lit("android")).withColumn("country", lit("CN")).coalesce(coalesce.toInt)
        .rdd.map(_.mkString("\t")).saveAsTextFile(output02)


      val sql1=
        s"""
           |select
           |case when split(devids,',')[5]!='' then split(devids,',')[5]
           |when split(devids,',')[13]!='' then split(devids,',')[13] else split(devids,',')[7] end as devid,
           |case when split(devids,',')[5]!='' then 'imeimd5'
           |when split(devids,',')[13]!='' then 'oaidmd5' else 'androidid' end as device_type,
           |'android' as platform,
           |'["com.youku.phone_wax_nobid"]' as package_name
           |from adn_dsp.log_adn_dsp_retarget_request_hour_v2
           |where concat(yr,mt,dt) >= '${begin_day}' and concat(yr,mt,dt) <= '${end_day}'
           |and countrycode='CN'
           |and os=1
           |and exchanges='wax'
           |and `describe` like '%rank%'
           |group by case when split(devids,',')[5]!='' then 'imeimd5'
           |when split(devids,',')[13]!='' then 'oaidmd5' else 'androidid' end ,
           |case when split(devids,',')[5]!='' then split(devids,',')[5]
           |when split(devids,',')[13]!='' then split(devids,',')[13] else split(devids,',')[7] end
        """.stripMargin

      //      spark.sql(sql1).coalesce(coalesce.toInt).rdd.map(_.mkString("\t")).saveAsTextFile(output01)

      val sql2=
        s"""
           |select
           |case when split(devids,',')[5]!='' then split(devids,',')[5]
           |when split(devids,',')[13]!='' then split(devids,',')[13] else split(devids,',')[7] end as devid,
           |case when split(devids,',')[5]!='' then 'imeimd5'
           |when split(devids,',')[13]!='' then 'oaidmd5' else 'androidid' end as device_type,
           |'android' as platform,
           |'CN' as country
           |from adn_dsp.log_adn_dsp_retarget_request_hour_v2
           |where concat(yr,mt,dt) >= '${begin_day}' and concat(yr,mt,dt) <= '${end_day}'
           |and countrycode='CN'
           |and os=1
           |and exchanges='wax'
           |and `describe` like '%rank%'
           |group by case when split(devids,',')[5]!='' then 'imeimd5'
           |when split(devids,',')[13]!='' then 'oaidmd5' else 'androidid' end ,
           |case when split(devids,',')[5]!='' then split(devids,',')[5]
           |when split(devids,',')[13]!='' then split(devids,',')[13] else split(devids,',')[7] end
        """.stripMargin

//      spark.sql(sql2).coalesce(coalesce.toInt).rdd.map(_.mkString("\t")).saveAsTextFile(output02)

    } finally {
      spark.stop()
    }
    0
  }
}

object YoukuPhoneWaxNobid {
  def main(args: Array[String]): Unit = {
    new YoukuPhoneWaxNobid().run(args)
  }
}