package mobvista.dmp.datasource.postback_3s

import java.net.URI

import mobvista.dmp.common.CommonSparkJob
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{SaveMode, SparkSession}


/**
  * Daily Spark job that merges the install and event postback tables for one
  * day into a single per-device ORC data set.
  *
  * Pipeline (all steps are Spark SQL over temporary views):
  *  1. Union the `today` partition of `dwh.ods_adn_trackingnew_postback_install`
  *     and `dwh.ods_adn_trackingnew_postback_event` for the supported trackers.
  *  2. Collect the distinct MD5 device ids and join them against the
  *     `last_sunday` partition of `dwh.device_id_md5_match` to recover raw ids.
  *  3. Pick the raw idfa/gaid when present, otherwise the id recovered from the
  *     MD5 mapping, and normalise the package name.
  *  4. Aggregate per device and write the result as zlib-compressed ORC.
  */
class PostBackDaily extends CommonSparkJob {

  /**
    * Declares the command-line options understood by this job.
    *
    * @return options `output`, `coalesce`, `today` and `last_sunday`, each
    *         taking one argument
    */
  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("output", true, "[must] output")
    options.addOption("coalesce", true, "[must] coalesce")
    options.addOption("today", true, "[must] today")
    options.addOption("last_sunday", true, "[must] last_sunday")
    options
  }

  /**
    * Parses the arguments, runs the pipeline and stops the Spark session.
    *
    * @param args raw command-line arguments
    * @return `0` on success, `-1` when `checkMustOption` rejects the arguments
    * @throws NumberFormatException    if `coalesce` is not an integer
    * @throws IllegalArgumentException if `coalesce` is not positive
    */
  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      -1
    } else {
      printOptions(commandLine)

      val output = commandLine.getOptionValue("output")
      val today = commandLine.getOptionValue("today")
      val lastSunday = commandLine.getOptionValue("last_sunday")

      // Parse and validate up front: a bad value must fail before the existing
      // output is deleted and before a Spark session is started.
      val partitions = commandLine.getOptionValue("coalesce").toInt
      require(partitions > 0, s"coalesce must be a positive integer, got $partitions")

      val spark = buildPostBackSession()
      try {
        writeDailyPostBack(spark, output, partitions, today, lastSunday)
      } finally {
        spark.stop()
      }
      0
    }
  }

  /** Creates (or reuses) the Hive-enabled Spark session used by this job. */
  private def buildPostBackSession(): SparkSession =
    SparkSession.builder()
      .appName("PostBackDaily")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

  /**
    * Clears `output`, then builds the per-device data set and writes it as ORC.
    *
    * @param spark      active session with Hive support
    * @param output     destination directory; deleted recursively before writing
    * @param partitions number of partitions the result is coalesced into
    * @param today      value compared with `concat(yyyy,mm,dd)` of the source tables
    * @param lastSunday value compared with `dt` of `dwh.device_id_md5_match`
    */
  private def writeDailyPostBack(spark: SparkSession,
                                 output: String,
                                 partitions: Int,
                                 today: String,
                                 lastSunday: String): Unit = {
    // Resolve the file system from the output location itself instead of a
    // fixed bucket, so the delete also works when the output lives elsewhere.
    FileSystem.get(new URI(output), spark.sparkContext.hadoopConfiguration)
      .delete(new Path(output), true)

    val sql1 =
      s"""
         |select UPPER(idfa) idfa,md5_idfa,LOWER(gaid) gaid,md5_gaid,lower(pl) platform,lower(app_id) package_name,country,install_time,cast(`date` as string) update_date,type  from dwh.ods_adn_trackingnew_postback_install where concat(yyyy,mm,dd) = '${today}' and type in ('appsflyer','min_appsflyer','tenjin','adjust','reyun') and app_id!="0"
         |union
         |select UPPER(idfa) idfa,md5_idfa,LOWER(gaid) gaid,md5_gaid,lower(pl) platform,lower(app_id) package_name,country,install_time,cast(`date` as string) update_date,type from dwh.ods_adn_trackingnew_postback_event  where  concat(yyyy,mm,dd) = '${today}' and type in ('appsflyer','min_appsflyer','tenjin','adjust','reyun') and app_id!="0"
      """.stripMargin
    spark.sql(sql1).createOrReplaceTempView("etl_3s_postback_daily")

    // Distinct MD5 ids seen today; this keeps the join against the mapping table small.
    spark.sql("select coalesce(md5_idfa,md5_gaid) device_id_md5 from etl_3s_postback_daily group by coalesce(md5_idfa,md5_gaid)")
      .cache()
      .createOrReplaceTempView("3s_postback_device_md5")

    val sql2 =
      s"""
         |select  b.device_id,a.device_id_md5
         |from 3s_postback_device_md5 a join
         |(select * from dwh.device_id_md5_match where dt='${lastSunday}') b
         |on a.device_id_md5=b.device_id_md5
      """.stripMargin
    spark.sql(sql2).createOrReplaceTempView("etl_3s_postback_md5")

    // The filter is applied to the resolved id (raw id or the one recovered
    // through the MD5 mapping). Filtering on a.device_id alone would drop every
    // row that relies on the mapping and make the left join pointless.
    val sql3 =
      """
        |select coalesce(a.device_id,b.device_id) device_id,
        |a.device_type,
        |a.platform,
        |a.package_name,
        |a.country,
        |a.install_time,
        |a.update_date,
        |a.type
        |from (
        |select case when platform = 'ios' and idfa is not null then idfa when  platform = 'android' and gaid is not null then gaid end as device_id,
        |case when platform = 'ios' and idfa is null then md5_idfa when  platform = 'android' and gaid is null then md5_gaid end as device_id_md5,
        |case when platform = 'ios'  then 'idfa' when platform = 'android'  then 'gaid' end as device_type,
        |platform,
        |case when instr(package_name,'id') = 1 then substr(package_name,3) else package_name end as package_name,
        |country,
        |install_time,
        |update_date,
        |type
        |from etl_3s_postback_daily ) a left join etl_3s_postback_md5 b on a.device_id_md5=b.device_id_md5
        |where coalesce(a.device_id,b.device_id) is not null and a.package_name is not null
      """.stripMargin
    spark.sql(sql3).createOrReplaceTempView("etl_3s_pre_daily")

    // One row per device: package names are collected into a '#'-separated
    // list, every other column keeps its maximum value.
    val sql4 =
      """
        |select device_id,
        |max(device_type) device_type,
        |max(platform) platform,
        |concat_ws('#',collect_set(package_name)) package_name,
        |max(country) country,
        |max(install_time) install_time,
        |max(update_date) update_date,
        |max(type) type
        |from etl_3s_pre_daily group by device_id
      """.stripMargin
    spark.sql(sql4).coalesce(partitions)
      .write
      .mode(SaveMode.Overwrite)
      .option("orc.compress", "zlib")
      .option("mapreduce.fileoutputcommitter.marksuccessfuljobs", false)
      .orc(output)
  }
}

/**
  * Command-line entry point for the [[PostBackDaily]] job.
  *
  * Instantiates the job and hands it the raw arguments; the integer status
  * returned by `run` is discarded.
  */
object PostBackDaily {
  def main(args: Array[String]): Unit = {
    val job = new PostBackDaily()
    job.run(args)
  }
}