package mobvista.dmp.datasource.taobao

import java.net.URI
import java.security.MessageDigest

import mobvista.dmp.common.CommonSparkJob
import mobvista.dmp.format.TextMultipleOutputFormat
import org.apache.commons.cli.Options
import org.apache.commons.lang.StringUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.Text
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.{Row, SaveMode, SparkSession}

import scala.collection.mutable.ArrayBuffer


class EtlAliActivitionPostBackDaily extends CommonSparkJob with Serializable {


  /**
   * Declares the command-line options understood by this job.
   *
   * Every option takes a value and is described as `[must] <name>`.
   *
   * @return the populated commons-cli `Options`
   */
  override protected def buildOptions(): Options = {
    val optionNames = Seq(
      "output",
      "iosoutput",
      "oaidoutput",
      "coalesce",
      "update_date",
      "today",
      "dt_taobao_postback_day",
      "dt_dash_rec15day",
      "syn_to_3s",
      "syn_3s_day"
    )
    // addOption returns the same Options instance, so it can be threaded through a fold.
    optionNames.foldLeft(new Options) { (acc, name) =>
      acc.addOption(name, true, s"[must] $name")
    }
  }

  /** Values of path segment 8 of a postback file name that mark the file as OAID data. */
  private val oaidSourceDirIds: Set[String] =
    Set("1105", "473", "490", "495", "498", "500", "501", "504", "506", "507", "510", "648", "654", "781", "784")

  /**
   * Converts one joined row into the (output name, device id) pair that is written to the sync directory.
   *
   * The output name is `<syn_3s_day>_H_<seqNo>_<type>` for re-activation files (file name contains
   * `result_cuhuo` or `result_acc`) and `<syn_3s_day>_X_<type>` for everything else (acquisition).
   * Raw `imei` / `oaid` / `idfa` ids are MD5-hashed and their type is reported as `<type>md5`;
   * any other device type is passed through unchanged.
   *
   * Example file names:
   *   s3://mob-emr-test/adn/sync_srv/2021/05/27/1103/result_cuhuo_303907.20210527__0764c0f3bcb04a839c5b14d31715e93f.txt.gz
   *   s3://mob-emr-test/adn/sync_srv/2021/05/27/511/result_cuhuo_260935.20210527__40688f0306024a67aa5fe01796aef252.txt.gz
   *
   * @param channelIds broadcast lookup from channel key (the 6-character channel id taken from the
   *                   file name, suffixed with `_oaid` for OAID source directories) to sequence number
   * @param syn_3s_day prefix of the output name
   * @param sys_to_3s  unused; kept for interface compatibility
   * @param row        row exposing string columns `device_id`, `device_type` and `filename`
   * @return a single-element array, or an empty array when `device_id` is null
   */
  def buildRes(channelIds: Broadcast[Map[String, String]], syn_3s_day: String, sys_to_3s: String, row: Row): Array[Tuple2[Text, Text]] = {
    val rawDeviceId = row.getAs[String]("device_id")
    if (rawDeviceId == null) {
      // A null id can neither be hashed nor wrapped in a Text; skip the row instead of failing the task.
      Array.empty[Tuple2[Text, Text]]
    } else {
      val deviceType = row.getAs[String]("device_type")
      // The file name tells acquisition from re-activation and carries the channel id.
      val filename = row.getAs[String]("filename")

      val (deviceId, typeSuffix) = deviceType match {
        case "imei" | "oaid" | "idfa" => (hashMD5(rawDeviceId), s"${deviceType}md5")
        case other => (rawDeviceId, other)
      }

      // Original re-activation files, plus the later-added "acc" type.
      val isReactivation = filename.contains("result_cuhuo") || filename.contains("result_acc")
      val campaignTag =
        if (isReactivation) {
          // The channel id is the first 6 characters of the 4th "_"-separated token of the full path.
          val channelId = filename.split("_").lift(3).map(_.take(6)).getOrElse("")
          val isOaidSource = filename.split("/").lift(8).exists(oaidSourceDirIds.contains)
          val channelKey = if (isOaidSource) s"${channelId}_oaid" else channelId
          // Unknown channels fall back to sequence number "0".
          val seqNo = channelIds.value.getOrElse(channelKey, "0")
          s"H_${seqNo}"
        } else {
          // Acquisition files carry no sequence number.
          "X"
        }

      val outputpath = s"${syn_3s_day}_${campaignTag}_${typeSuffix}"
      // NOTE(review): the key keeps the trailing ", " exactly as before; presumably
      // TextMultipleOutputFormat parses it — confirm before changing.
      Array(Tuple2(new Text(s"${outputpath}, "), new Text(deviceId)))
    }
  }

  /**
   * Returns the lowercase hexadecimal MD5 digest of `content`.
   *
   * The input is encoded as UTF-8 so the digest does not depend on the JVM's default charset.
   *
   * @param content text to hash; must not be null
   * @return 32-character lowercase hex string
   */
  def hashMD5(content: String): String = {
    val md5 = MessageDigest.getInstance("MD5")
    val encoded = md5.digest(content.getBytes(java.nio.charset.StandardCharsets.UTF_8))
    encoded.map("%02x".format(_)).mkString
  }

  /**
   * Runs the daily postback ETL.
   *
   * Steps, in order:
   *  1. Parse and validate the command line; on failure print usage and return -1.
   *  2. Delete the four output locations (`output`, `syn_to_3s`, `iosoutput`, `oaidoutput`).
   *  3. Write three ORC data sets (android/imei, ios/idfa, android/oaid) built by joining the
   *     15-day activation tables with the daily Taobao postback table.
   *  4. Write the per-file device lists to `syn_to_3s` through `TextMultipleOutputFormat`.
   *
   * The Spark session is stopped in all cases once it has been created.
   *
   * @param args raw command-line arguments
   * @return 0 on success, -1 when a required option is missing
   */
  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      return -1
    } else printOptions(commandLine)

    val output = commandLine.getOptionValue("output")
    val iosoutput = commandLine.getOptionValue("iosoutput")
    val oaidoutput = commandLine.getOptionValue("oaidoutput")
    // Parsed up front so a malformed value fails before any existing output is deleted.
    val coalescePartitions = commandLine.getOptionValue("coalesce").toInt
    val today = commandLine.getOptionValue("today")
    val dt_taobao_postback_day = commandLine.getOptionValue("dt_taobao_postback_day")
    val dt_dash_rec15day = commandLine.getOptionValue("dt_dash_rec15day")
    val syn_to_3s = commandLine.getOptionValue("syn_to_3s")
    val syn_3s_day = commandLine.getOptionValue("syn_3s_day")

    val spark = SparkSession.builder()
      .appName("EtlAliActivitionPostBackDaily")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

    val sc = spark.sparkContext
    try {
      // Clear previous results. Done inside the try so the session is stopped even if a delete fails.
      val fs = FileSystem.get(new URI("s3://mob-emr-test"), sc.hadoopConfiguration)
      Seq(output, syn_to_3s, iosoutput, oaidoutput).foreach(target => fs.delete(new Path(target), true))

      // Runs a query and writes the result as zlib-compressed ORC without a _SUCCESS marker.
      def writeOrc(query: String, path: String): Unit =
        spark.sql(query).coalesce(coalescePartitions)
          .write
          .mode(SaveMode.Overwrite)
          .option("orc.compress", "zlib")
          .option("mapreduce.fileoutputcommitter.marksuccessfuljobs", false)
          .orc(path)

      val channel_ids_map = Map("149655" -> "0",
        "149647" -> "1",
        "149649" -> "2",
        "149650" -> "3",
        "149652" -> "4",
        "149654" -> "5",
        "149656" -> "6",
        "172393" -> "7",
        "184147" -> "8",
        "184287" -> "10",
        "184289" -> "11",
        "188844" -> "12",
        "149653" -> "13",
        "219343" -> "14",
        "219809" -> "15",
        "204543" -> "16",
        "227229" -> "17",
        "227229_oaid" -> "20",
        "204543_oaid" -> "21",
        "241357" -> "33",
        "241357_oaid" -> "34",
        "249582" -> "44",
        "249582_oaid" -> "45",
        "252594" -> "46",
        "252594_oaid" -> "47",
        "251827" -> "48",
        "251827_oaid" -> "49",
        "254029" -> "50",
        "254029_oaid" -> "51",
        "256294" -> "52",
        "256294_oaid" -> "53",
        "254944" -> "54",
        "254944_oaid" -> "55",
        "257904" -> "56",
        "257904_oaid" -> "57",
        "254343" -> "58",
        "254343_oaid" -> "59",
        "260935" -> "60",
        "260935_oaid" -> "61",
        "260951" -> "62",
        "260951_oaid" -> "63",
        "261865" -> "64",
        "261865_oaid" -> "65",
        "159702" -> "66",
        "159702_oaid" -> "67",
        "303907" -> "70",
        "303907_oaid" -> "71")

      // H_68 and H_69 are newly added oaidmd5-type pseudo package names for UC re-activation:
      // com.uc.foractivation.4b5a58 (after a strategy change on the UC partner side this package,
      // formerly an imeimd5 re-activation package, is now oaidmd5) and com.uc.foractivation.aff149.
      // They correspond to UC_4_HASH_OAID and UC_5_HASH_OAID respectively.

      val channelIds = sc.broadcast(channel_ids_map)
      // Auto-increment directory id -> device category it belongs to:
      //   imei  1103 479 483 485 487 488 491 493 494 497 499 502 508 514 515 577 580 647 653 776 777 782 783
      //   idfa  1104 480 481 482 484 486 489 492 496 503 505 509 511 512 513 578 646 652 778 779 780 785
      //   oaid  1105 473 490 495 498 500 501 504 506 507 510 648 654 781 784

      // Data to push today, e.g.
      //   1425793208|2020-05-06,1486121050|2020-05-16,1474992087|2020-05-10,1475887491|2020-04-25,1480638758|2020-05-15,1488312219|2020-05-10
      val sql1 =
        s"""
           |select
           |case when t1.device_type ='imei' then t1.device_id  when t1.device_type ='imeimd5' then  t1.device_id_md5  when t2.dev_id is not null then t2.dev_id end as device_id,
           |case when t1.device_type is null then 'imeimd5' else t1.device_type end as device_type,
           |'android' platform,
           |case when t2.dev_id is null then 'com.taobao.notforactivation' else concat('com.taobao.taobao,com.taobao.foractivation,',t2.package_name) end as packagename,
           |'CN' country,
           |'ali_activation' ali_type
           |from
           |( select device_id_md5,device_id,device_type,'android' platform,'CN' country
           |  from dwh.ali_user_activation_rec15days
           |    where dt='${today}'  and update_date >= '${dt_dash_rec15day}'
           |) t1 full outer join
           |( select X.dev_id,concat_ws(',', collect_set(X.channel_id)) package_name
           |from (select dev_id,case when input_file_name() like '%result_cuhuo%' then concat('com.taobao.foractivation.',substr(split( input_file_name(), '/')[9],14,6))  when input_file_name() like '%result_acc%' then concat('com.taobao.foractivation.',substr(split( input_file_name(), '/')[9],12,6))  end as channel_id
           |from dwh.ali_taobao_postback_activation_daily
           |where dt ='${dt_taobao_postback_day}' and (input_file_name() like '%result_cuhuo%' or input_file_name() like '%result_acc%') and split( input_file_name(), '/')[8] in ('1103','479','483','485','487','488','491','493','494','497','499','502','508','514','515','577','580','647','653','776','777','782','783') ) X
           |group by X.dev_id ) t2
           |on(t1.device_id_md5 = t2.dev_id)
           |union
           |select
           |case when t1.device_type ='imei' then t1.device_id  when t1.device_type ='imeimd5' then  t1.device_id_md5 when t2.dev_id is not null then t2.dev_id end as device_id,
           |case when t1.device_type is null then 'imeimd5' else t1.device_type end as device_type,
           |'android' platform,
           |case when t2.dev_id is null then 'com.taobao.notforacquisition' else 'com.taobao.foracquisition' end as packagename,
           |'CN' country,
           |'ali_acquisition' ali_type
           |from
           |( select device_id_md5,device_id,device_type,'android' platform,'CN' country
           |  from dwh.ali_user_activation_rec15days
           |    where dt='${today}'  and update_date >= '${dt_dash_rec15day}'
           |) t1 full outer join
           |(select dev_id
           |from dwh.ali_taobao_postback_activation_daily
           |where dt ='${dt_taobao_postback_day}' and input_file_name() like '%result_laxin%'   and split( input_file_name(), '/')[8] in ('1103','479','483','485','487','488','491','493','494','497','499','502','508','514','515','577','580','647','653','776','777','782','783')
           |group by dev_id) t2
           |on(t1.device_id_md5 = t2.dev_id)
        """.stripMargin

      writeOrc(sql1, output)

      val sql2 =
        s"""
           |select
           |case when t1.device_type ='idfa' then t1.device_id  when t1.device_type ='idfamd5' then  t1.device_id_md5 when t2.dev_id is not null then t2.dev_id end as device_id,
           |case when t1.device_type is null then 'idfamd5' else t1.device_type end as device_type,
           |'ios' platform,
           |case when t2.dev_id is null then '202005250004' else concat('387682726,202005250003,',t2.package_name) end as packagename,
           |'CN' country,
           |'ali_activation' ali_type
           |from
           |( select device_id_md5,device_id,device_type,'ios' platform,'CN' country
           |  from dwh.ali_ios_user_activation_rec15days
           |    where dt='${today}'  and update_date >= '${dt_dash_rec15day}'
           |) t1 full outer join
           |( select X.dev_id,concat_ws(',', collect_set(X.channel_id)) package_name
           |from (select dev_id,case when input_file_name() like '%result_cuhuo%' then concat('202005',substr(split( input_file_name(), '/')[9],14,6))  when input_file_name() like '%result_acc%' then concat('202005',substr(split( input_file_name(), '/')[9],12,6))  end as channel_id
           |from dwh.ali_taobao_postback_activation_daily
           |where dt ='${dt_taobao_postback_day}' and (input_file_name() like '%result_cuhuo%' or input_file_name() like '%result_acc%')  and split( input_file_name(), '/')[8] in ('1104','480','481','482','484','486','489','492','496','503','505','509','511','512','513','578','646','652','778','779','780','785')  ) X
           |group by X.dev_id ) t2
           |on(t1.device_id_md5 = t2.dev_id)
           |union
           |select
           |case when t1.device_type ='idfa' then t1.device_id  when t1.device_type ='idfamd5' then  t1.device_id_md5 when t2.dev_id is not null then t2.dev_id end as device_id,
           |case when t1.device_type is null then 'idfamd5' else t1.device_type end as device_type,
           |'ios' platform,
           |case when t2.dev_id is null then '202005250001' else '202005250000' end as packagename,
           |'CN' country,
           |'ali_acquisition' ali_type
           |from
           |( select device_id_md5,device_id,device_type,'ios' platform,'CN' country
           |  from dwh.ali_ios_user_activation_rec15days
           |    where dt='${today}'  and update_date >= '${dt_dash_rec15day}'
           |) t1 full outer join
           |(select dev_id
           |from dwh.ali_taobao_postback_activation_daily
           |where dt ='${dt_taobao_postback_day}' and input_file_name() like '%result_laxin%'  and split( input_file_name(), '/')[8] in ('1104','480','481','482','484','486','489','492','496','503','505','509','511','512','513','578','646','652','778','779','780','785')
           |group by dev_id) t2
           |on(t1.device_id_md5 = t2.dev_id)
        """.stripMargin

      writeOrc(sql2, iosoutput)

      val sql3 =
        s"""
           |select
           |case when t1.device_type ='oaid' then t1.device_id  when t1.device_type ='oaidmd5' then  t1.device_id_md5  when t2.dev_id is not null then t2.dev_id end as device_id,
           |case when t1.device_type is null then 'oaidmd5' else t1.device_type end as device_type,
           |'android' platform,
           |case when t2.dev_id is null then 'com.taobao.notforactivation' else concat('com.taobao.taobao,com.taobao.foractivation,',t2.package_name) end as packagename,
           |'CN' country,
           |'ali_activation' ali_type
           |from
           |( select device_id_md5,device_id,device_type,'android' platform,'CN' country
           |  from dwh.ali_oaid_user_activation_rec15days
           |    where dt='${today}'  and update_date >= '${dt_dash_rec15day}'
           |) t1 full outer join
           |( select X.dev_id,concat_ws(',', collect_set(X.channel_id)) package_name
           |from (select dev_id,case when input_file_name() like '%result_cuhuo%' then  concat('com.taobao.foractivation.',substr(split( input_file_name(), '/')[9],14,6),'_oaid')  when input_file_name() like '%result_acc%' then  concat('com.taobao.foractivation.',substr(split( input_file_name(), '/')[9],12,6),'_oaid')  end as channel_id
           |from dwh.ali_taobao_postback_activation_daily
           |where dt ='${dt_taobao_postback_day}' and (input_file_name() like '%result_cuhuo%' or input_file_name() like '%result_acc%') and split( input_file_name(), '/')[8] in ('1105','473','490','495','498','500','501','504','506','507','510','648','654','781','784') ) X
           |group by X.dev_id ) t2
           |on(t1.device_id_md5 = t2.dev_id)
           |union
           |select
           |case when t1.device_type ='oaid' then t1.device_id  when t1.device_type ='oaidmd5' then  t1.device_id_md5 when t2.dev_id is not null then t2.dev_id end as device_id,
           |case when t1.device_type is null then 'oaidmd5' else t1.device_type end as device_type,
           |'android' platform,
           |case when t2.dev_id is null then 'com.taobao.notforacquisition' else 'com.taobao.foracquisition' end as packagename,
           |'CN' country,
           |'ali_acquisition' ali_type
           |from
           |( select device_id_md5,device_id,device_type,'android' platform,'CN' country
           |  from dwh.ali_oaid_user_activation_rec15days
           |    where dt='${today}'  and update_date >= '${dt_dash_rec15day}'
           |) t1 full outer join
           |(select dev_id
           |from dwh.ali_taobao_postback_activation_daily
           |where dt ='${dt_taobao_postback_day}' and input_file_name() like '%result_laxin%'  and split( input_file_name(), '/')[8] in ('1105','473','490','495','498','500','501','504','506','507','510','648','654','781','784')
           |group by dev_id) t2
           |on(t1.device_id_md5 = t2.dev_id)
        """.stripMargin

      writeOrc(sql3, oaidoutput)

      val sql4 =
        s"""
           |select
           |case when t1.device_type ='imei' then t1.device_id  when t1.device_type ='imeimd5' then  t1.device_id_md5 when t2.dev_id is not null then t2.dev_id end as device_id,
           |case when t1.device_type is null then 'imeimd5' else t1.device_type end as device_type,
           |t2.filename
           |from
           |( select device_id_md5,device_id,device_type
           |  from dwh.ali_user_activation_rec15days
           |    where dt='${today}'  and update_date >= '${dt_dash_rec15day}'
           |) t1 right join
           |(select dev_id,input_file_name() filename
           |from dwh.ali_taobao_postback_activation_daily
           |where dt ='${dt_taobao_postback_day}'  and split( input_file_name(), '/')[8] in ('1103','479','483','485','487','488','491','493','494','497','499','502','508','514','515','577','580','647','653','776','777','782','783')
           |group by dev_id,input_file_name()) t2
           |on(t1.device_id_md5 = t2.dev_id)
           |union
           |select
           |case when t1.device_type ='idfa' then t1.device_id  when t1.device_type ='idfamd5' then  t1.device_id_md5 when t2.dev_id is not null then t2.dev_id end as device_id,
           |case when t1.device_type is null then 'idfamd5' else t1.device_type end as device_type,
           |t2.filename
           |from
           |( select device_id_md5,device_id,device_type
           |  from dwh.ali_ios_user_activation_rec15days
           |    where dt='${today}'  and update_date >= '${dt_dash_rec15day}'
           |) t1 right join
           |(select dev_id,input_file_name() filename
           |from dwh.ali_taobao_postback_activation_daily
           |where dt ='${dt_taobao_postback_day}' and split( input_file_name(), '/')[8] in ('1104','480','481','482','484','486','489','492','496','503','505','509','511','512','513','578','646','652','778','779','780','785')
           |group by dev_id,input_file_name()) t2
           |on(t1.device_id_md5 = t2.dev_id)
           |union
           |select
           |case when t1.device_type ='oaid' then t1.device_id  when t1.device_type ='oaidmd5' then  t1.device_id_md5 when t2.dev_id is not null then t2.dev_id end as device_id,
           |case when t1.device_type is null then 'oaidmd5' else t1.device_type end as device_type,
           |t2.filename
           |from
           |( select device_id_md5,device_id,device_type
           |  from dwh.ali_oaid_user_activation_rec15days
           |    where dt='${today}'  and update_date >= '${dt_dash_rec15day}'
           |) t1 right join
           |(select dev_id,input_file_name() filename
           |from dwh.ali_taobao_postback_activation_daily
           |where dt ='${dt_taobao_postback_day}' and split( input_file_name(), '/')[8] in ('1105','473','490','495','498','500','501','504','506','507','510','648','654','781','784')
           |group by dev_id,input_file_name()) t2
           |on(t1.device_id_md5 = t2.dev_id)
        """.stripMargin

      // One (output name, device id) record per row, written through the multi-output format.
      spark.sql(sql4).rdd.flatMap(buildRes(channelIds, syn_3s_day, syn_to_3s, _)).coalesce(100)
        .saveAsNewAPIHadoopFile(s"${syn_to_3s}", classOf[Text], classOf[Text], classOf[TextMultipleOutputFormat], spark.sparkContext.hadoopConfiguration)


    } finally {
      spark.stop()
    }
    0
  }
}

object EtlAliActivitionPostBackDaily {
  /**
   * Entry point: runs the job and propagates a non-zero status as the process exit code,
   * so that a failed argument check is visible to the caller.
   *
   * @param args raw command-line arguments, forwarded to `run`
   */
  def main(args: Array[String]): Unit = {
    val exitCode = new EtlAliActivitionPostBackDaily().run(args)
    if (exitCode != 0) {
      sys.exit(exitCode)
    }
  }
}