package mobvista.dmp.datasource.dm

import mobvista.dmp.common.CommonSparkJob
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{concat_ws, lit}
import org.apache.spark.storage.StorageLevel

import java.net.URI

/**
 * TopOn top-LTV device extraction: ranks imei and oaid devices by their 7-day LTV
 * (sum of bidprice over uparpu_tk_impression_v2 rows joined to uparpu_new_user on upid),
 * keeps the top 30%, and writes two tab-separated feeds: a package-name feed
 * ("com.topon_topltv_1015") and a country feed ("CN").
 *
 * @author jiangfan
 * @date 2021/10/15 16:22
 */

class ComToponTopltv1015 extends CommonSparkJob with Serializable {
  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("coalesce", true, "[must] coalesce")
    options.addOption("output01", true, "[must] output01")
    options.addOption("output02", true, "[must] output02")
    options.addOption("begin_day", true, "[must] begin_day")
    options.addOption("end_day01", true, "[must] end_day01")
    options.addOption("end_day02", true, "[must] end_day02")
    options
  }

  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      return -1
    } else printOptions(commandLine)

    val coalesce = commandLine.getOptionValue("coalesce")
    val output01 = commandLine.getOptionValue("output01")
    val output02 = commandLine.getOptionValue("output02")
    val begin_day = commandLine.getOptionValue("begin_day")
    val end_day01 = commandLine.getOptionValue("end_day01")
    val end_day02 = commandLine.getOptionValue("end_day02")

    // Hive support is required to read the uparpu_main.* tables queried below
    val spark = SparkSession.builder()
      .appName("ComToponTopltv1015")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

    // Clear any previous output so a rerun fully overwrites both paths
    val fs = FileSystem.get(new URI("s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration)
    fs.delete(new Path(output01), true)
    fs.delete(new Path(output02), true)

    try {

      // imei feed: per-device LTV = sum of bidprice over impressions that fall within
      // 7 days of the device's first-launch date (uparpu_new_user.dt);
      // keep the devices whose LTV rank is in the top 30%
      val imei_sql =
        s"""
           |select device_id
           |from
           |(select device_id,row_number() over (order by LTV desc) as ranking
           |from
           |(
           |select sum(case when imp.imei not in ('0','NULL')  then bidprice else NULL end) as LTV,imp.imei as device_id
           |from
           |(select *,concat(yyyy,'-',mm,'-',dd) as day
           |from uparpu_main.uparpu_tk_impression_v2
           |where concat(yyyy,'-',mm,'-',dd) >= '${begin_day}'
           |and concat(yyyy,'-',mm,'-',dd) <= '${end_day02}'
           |) as imp
           |inner join
           |(select new_device_id,dt
           |from uparpu_main.uparpu_new_user
           |where dt >=  '${begin_day}'
           |and dt <=  '${end_day01}'
           |) as uu
           |on imp.upid = uu.new_device_id and datediff(imp.day,uu.dt)<7
           |group by imp.imei
           |) as t
           |) as t2
           |where ranking/(select count(device_id)
           |from
           |(
           |select sum(case when imp.imei not in ('0','NULL')  then bidprice else NULL end) as LTV,imp.imei as device_id
           |from
           |(select *,concat(yyyy,'-',mm,'-',dd)  as day
           |from uparpu_main.uparpu_tk_impression_v2
           |where concat(yyyy,'-',mm,'-',dd) >= '${begin_day}'
           |and concat(yyyy,'-',mm,'-',dd) <= '${end_day02}'
           |) as imp
           |inner join
           |(select new_device_id,dt
           |from uparpu_main.uparpu_new_user
           |where dt >=  '${begin_day}'
           |and dt <= '${end_day01}'
           |) as uu
           |on imp.upid = uu.new_device_id and datediff(imp.day,uu.dt)<7
           |group by imp.imei
           |) as t)<0.3
        """.stripMargin

      // oaid feed: identical LTV ranking, keyed on oaid instead of imei
      val oaid_sql =
        s"""
           |select device_id
           |from
           |(select device_id,row_number() over (order by LTV desc) as ranking
           |from
           |(
           |select sum(case when imp.oaid not in ('0','NULL')  then bidprice else NULL end) as LTV,imp.oaid as device_id
           |from
           |(select *,concat(yyyy,'-',mm,'-',dd) as day
           |from uparpu_main.uparpu_tk_impression_v2
           |where concat(yyyy,'-',mm,'-',dd) >= '${begin_day}'
           |and concat(yyyy,'-',mm,'-',dd) <= '${end_day02}'
           |) as imp
           |inner join
           |(select new_device_id,dt
           |from uparpu_main.uparpu_new_user
           |where dt >=  '${begin_day}'
           |and dt <=  '${end_day01}'
           |) as uu
           |on imp.upid = uu.new_device_id and datediff(imp.day,uu.dt)<7
           |group by imp.oaid
           |) as t
           |) as t2
           |where ranking/(select count(device_id)
           |from
           |(
           |select sum(case when imp.oaid not in ('0','NULL')  then bidprice else NULL end) as LTV,imp.oaid as device_id
           |from
           |(select *,concat(yyyy,'-',mm,'-',dd)  as day
           |from uparpu_main.uparpu_tk_impression_v2
           |where concat(yyyy,'-',mm,'-',dd) >= '${begin_day}'
           |and concat(yyyy,'-',mm,'-',dd) <= '${end_day02}'
           |) as imp
           |inner join
           |(select new_device_id,dt
           |from uparpu_main.uparpu_new_user
           |where dt >=  '${begin_day}'
           |and dt <= '${end_day01}'
           |) as uu
           |on imp.upid = uu.new_device_id and datediff(imp.day,uu.dt)<7
           |group by imp.oaid
           |) as t)<0.3
           |""".stripMargin
      println("imei_sql=="+imei_sql)
      println("oaid_sql=="+oaid_sql)

      val imei_df: DataFrame = spark.sql(imei_sql).persist(StorageLevel.MEMORY_AND_DISK_SER)
      val imei_df_with_package_name = imei_df.select(concat_ws("\t", imei_df.col("device_id"), lit("imei"), lit("android"), lit("[\"com.topon_topltv_1015\"]")))
      val imei_df_with_country = imei_df.select(concat_ws("\t", imei_df.col("device_id"), lit("imei"), lit("android"), lit("CN")))

      val oaid_df: DataFrame = spark.sql(oaid_sql).persist(StorageLevel.MEMORY_AND_DISK_SER)
      val oaid_df_with_package_name = oaid_df.select(concat_ws("\t", oaid_df.col("device_id"), lit("oaid"), lit("android"), lit("[\"com.topon_topltv_1015\"]")))
      val oaid_df_with_country = oaid_df.select(concat_ws("\t", oaid_df.col("device_id"), lit("oaid"), lit("android"), lit("CN")))

      // Merge imei and oaid rows for each feed and write as plain text,
      // coalesced to the requested number of output files
      imei_df_with_package_name.union(oaid_df_with_package_name).coalesce(coalesce.toInt).write.format("text").mode("overwrite").save(output01)
      imei_df_with_country.union(oaid_df_with_country).coalesce(coalesce.toInt).write.format("text").mode("overwrite").save(output02)

    } finally {
      spark.stop()
    }
    0
  }
}

object ComToponTopltv1015 {
  def main(args: Array[String]): Unit = {
    new ComToponTopltv1015().run(args)
  }
}
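
// Example launch (illustrative only; the actual jar, dates, paths and coalesce count
// come from whatever scheduling job drives this class):
//   spark-submit --class mobvista.dmp.datasource.dm.ComToponTopltv1015 <dmp-job-jar> \
//     -begin_day 2021-10-01 -end_day01 2021-10-08 -end_day02 2021-10-15 \
//     -coalesce 20 -output01 <s3-package-name-output> -output02 <s3-country-output>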