package mobvista.dmp.datasource.reyun

import mobvista.dmp.common.CommonSparkJob
import mobvista.dmp.util.DateUtil
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SparkSession

import java.net.URI

/**
 * @author jiangfan
 * @date 2021/7/23 18:00
 */

class ReyunLabelBaijiu  extends CommonSparkJob with Serializable{
  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("coalesce", true, "[must] coalesce")
    options.addOption("output", true, "[must] output")
    options.addOption("dt_today", true, "[must] dt_today")
    options
  }

  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      return -1
    } else printOptions(commandLine)

    val coalesce = commandLine.getOptionValue("coalesce")
    val output = commandLine.getOptionValue("output")
    val dt_today = commandLine.getOptionValue("dt_today")

    val spark = SparkSession.builder()
      .appName("ReyunLabelBaijiu")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

    val sc = spark.sparkContext
    import spark.implicits._

    FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output), true)

    val one_day_ago = DateUtil.getDayByString(dt_today, "yyyyMMdd", -1)
    val two_days_ago = DateUtil.getDayByString(dt_today, "yyyyMMdd", -2)

    try {
      val sql1=
        s"""
           |select distinct device_id
           |from
           |(select md5(imei) as device_id
           |from
           |(select app_id,
           |(case when imei not in ('00000000-0000-0000-0000-000000000000','0000000000000000','0','',' ')  then imei else null end) as imei,
           |(case when ext_oaid not in ('00000000-0000-0000-0000-000000000000','0000000000000000','0','',' ')  then ext_oaid else null end ) as oaid
           |from
           |dwh.ods_adn_trackingnew_request
           |where concat(yyyy,mm,dd) in ('${dt_today}')
           |and country_code = 'CN'
           |and platform = 'android'
           |union all
           |select app_id,(case when imei not in ('00000000-0000-0000-0000-000000000000','0000000000000000','0','',' ')  then imei else null end) as imei,
           |(case when ext_oaid not in ('00000000-0000-0000-0000-000000000000','0000000000000000','0','',' ')  then ext_oaid else null end ) as oaid
           |from dwh.ods_adn_trackingnew_hb_request
           |where concat(yyyy,mm,dd)  in ('${dt_today}')
           |and country_code = 'CN'
           |and platform = 'android'
           |) as tmp
           |where  tmp.imei is not null  and tmp.app_id in (select appid from dwh.reyun_label_baijiu_filter_appid)
           |union all
           |select md5(oaid) as device_id
           |from
           |(select app_id,
           |(case when imei not in ('00000000-0000-0000-0000-000000000000','0000000000000000','0','',' ')  then imei else null end) as imei,
           |(case when ext_oaid not in ('00000000-0000-0000-0000-000000000000','0000000000000000','0','',' ')  then ext_oaid else null end ) as oaid
           |from
           |dwh.ods_adn_trackingnew_request
           |where concat(yyyy,mm,dd)  in ('${dt_today}')
           |and country_code = 'CN'
           |and platform = 'android'
           |union all
           |select app_id,(case when imei not in ('00000000-0000-0000-0000-000000000000','0000000000000000','0','',' ')  then imei else null end) as imei,
           |(case when ext_oaid not in ('00000000-0000-0000-0000-000000000000','0000000000000000','0','',' ')  then ext_oaid else null end ) as oaid
           |from dwh.ods_adn_trackingnew_hb_request
           |where concat(yyyy,mm,dd) in ('${dt_today}')
           |and country_code = 'CN'
           |and platform = 'android'
           |) as tmp
           |where  tmp.oaid is not null and tmp.app_id in (select appid from dwh.reyun_label_baijiu_filter_appid)
           |union all
           |select if(imeimd5 is null ,md5(imei),imeimd5)  as device_id
           |from
           |(select  split(ext5,',')[4] as imei,split(ext5,',')[5] as imeimd5
           |from adn_dsp.log_adn_dsp_request_orc_hour
           |where concat(yr,mt,dt)  in ('${dt_today}')  and hh >='00'  and hh<'06'
           |and countrycode = 'CN'
           |and os = 'android'
           |and exchanges='xunfei'
           |) adx
           |union all
           |select if(oaidmd5 is null ,md5(oaid),oaidmd5)  as device_id
           |from
           |(select  split(ext5,',')[12] as oaid ,split(ext5,',')[13] as oaidmd5
           |from adn_dsp.log_adn_dsp_request_orc_hour
           |where concat(yr,mt,dt)  in ('${dt_today}')  and hh >='00'  and hh<'06'
           |and countrycode = 'CN'
           |and os = 'android'
           |and exchanges='xunfei'
           |) adx
           |) tmp
           |""".stripMargin

      println("sql=============="+sql1)

      spark.sql(sql1).rdd.map(_.mkString).coalesce(coalesce.toInt).saveAsTextFile(output)

    } finally {
      spark.stop()
    }
    0
  }
}

object ReyunLabelBaijiu {
  def main(args: Array[String]): Unit = {
     new ReyunLabelBaijiu().run(args)
  }
}