package mobvista.dmp.datasource.dm

import mobvista.dmp.common.CommonSparkJob
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.io.compress.{CompressionCodec, GzipCodec}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{concat_ws, lit}
import org.apache.spark.storage.StorageLevel

import java.net.URI

/**
 * @author jiangfan
 * @date 2021/11/4 14:50
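 *
 * Selects MX Android devices that requested at least three distinct apps on the given
 * day and writes them out twice: once with a fixed package list, once with the country.
 *
 * Example submission (jar name, S3 paths and date format are illustrative only):
 * {{{
 * spark-submit --class mobvista.dmp.datasource.dm.Com3appXianjindai dmp.jar \
 *   -coalesce 20 \
 *   -output01 s3://mob-emr-test/.../output01 \
 *   -output02 s3://mob-emr-test/.../output02 \
 *   -to_day 2021-11-04
 * }}}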
 */
class Com3appXianjindai extends CommonSparkJob with Serializable {
  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("coalesce", true, "[must] coalesce")
    options.addOption("output01", true, "[must] output01")
    options.addOption("output02", true, "[must] output02")
    options.addOption("to_day", true, "[must] to_day")
    options
  }

  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      return -1
    } else printOptions(commandLine)

    val coalesce = commandLine.getOptionValue("coalesce")
    val output01 = commandLine.getOptionValue("output01")
    val output02 = commandLine.getOptionValue("output02")
    val to_day = commandLine.getOptionValue("to_day")

    val spark = SparkSession.builder()
      .appName("Com3appXianjindai")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

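    // Enable block-level gzip compression for file output. The authoritative keys are the
    // mapreduce.output.fileoutputformat.compress* ones below; the first two entries use an
    // older key spelling and are likely redundant, but are kept to preserve behavior.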
    val conf = spark.sparkContext.hadoopConfiguration
    conf.set("mapreduce.output.compress", "true")
    conf.set("mapreduce.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec")
    conf.setBoolean("mapreduce.output.fileoutputformat.compress", true)
    conf.set("mapreduce.output.fileoutputformat.compress.type", CompressionType.BLOCK.toString)
    conf.setClass("mapreduce.output.fileoutputformat.compress.codec", classOf[GzipCodec], classOf[CompressionCodec])

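    // Pre-delete both output paths on S3 so reruns start clean, in addition to the
    // overwrite mode used by the writers below.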
    FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output01), true)
    FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output02), true)

    try {

      // The requirement originally read from adn_dsp.log_adn_dsp_request_orc_hour, but that
      // table is expensive to scan. dwh.etl_dsp_request_daily_hours keeps the relevant data
      // from it, so we read that table instead to reduce the computation.
      val sql01 =
        s"""
           |select device_id
           |from (
           |    select device_id, count(distinct t1.appid) as app_num
           |    from (
           |        select appid, device_id
           |        from (
           |            select appid, device_id
           |            from (
           |                select packagename, split(exitid, ',')[0] as device_id
           |                from dwh.etl_dsp_request_daily_hours
           |                where dt = '${to_day}'
           |                  and country = 'MX'
           |                  and platform = 'android'
           |            ) tmpdata lateral view explode(split(packagename, "#")) num as appid
           |        ) t01
           |        where device_id not in ('0', '', '-', '00000000-0000-0000-0000-000000000000')
           |          and device_id rlike '${didPtn}'
           |        group by appid, device_id
           |    ) t1
           |    group by device_id
           |) t2
           |where app_num >= 3
           |group by device_id
        """.stripMargin

      println("sql01=="+sql01)


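      // The matched device ids feed both outputs, so persist the result once.
      // Each output row is tab-separated, e.g. <gaid>\tgaid\tandroid\t["com.3app.xianjindai"]
      // for output01 and <gaid>\tgaid\tandroid\tMX for output02.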
      val gaid_df: DataFrame = spark.sql(sql01).persist(StorageLevel.MEMORY_AND_DISK_SER)
      val gaid_df_with_package_name = gaid_df.select(concat_ws("\t", gaid_df.col("device_id"), lit("gaid"), lit("android"), lit("[\"com.3app.xianjindai\"]")))
      val gaid_df_with_country = gaid_df.select(concat_ws("\t", gaid_df.col("device_id"), lit("gaid"), lit("android"), lit("MX")))


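      // With the Hadoop codec settings above, these text files should come out gzip-compressed.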
      gaid_df_with_package_name.coalesce(coalesce.toInt).write.format("text").mode("overwrite").save(output01)
      gaid_df_with_country.coalesce(coalesce.toInt).write.format("text").mode("overwrite").save(output02)

      // Release the cached result once both outputs are written.
      gaid_df.unpersist()

    } finally {
      spark.stop()
    }
    0
  }
}

object Com3appXianjindai {
  def main(args: Array[String]): Unit = {
    new Com3appXianjindai().run(args)
  }
}