package mobvista.dmp.datasource.dsp

import com.fasterxml.jackson.databind.ObjectMapper
import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
import mobvista.dmp.datasource.dsp.DspConstant.Value
import mobvista.dmp.datasource.dsp.mapreduce.SegmentVO
import mobvista.dmp.util.MD5Util
import mobvista.prd.datasource.util.GsonUtil
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.commons.lang.StringUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.sql.functions.{collect_set, max, udf}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

import java.net.URI
import java.util
import java.util.regex.Pattern
import scala.collection.JavaConversions._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer


class DspOrgEtlDailys extends CommonSparkJob with Serializable {

  private val iosPkgPtn = Pattern.compile("^\\d+$")
  private val adrPkgPtn = Pattern.compile("^[0-9a-zA-Z\\.]+$")

  override protected def run(args: Array[String]): Int = {
    val parser = new BasicParser()
    val options = commandOptions()
    val commandLine = parser.parse(options, args)

    val input = commandLine.getOptionValue("input")
    val output_etl_dsp_request_hour = commandLine.getOptionValue("output")
    val parallelism = commandLine.getOptionValue("parallelism").toInt
    val coalesce = commandLine.getOptionValue("coalesce").toInt


    val spark = SparkSession
      .builder()
      .config("spark.rdd.compress", "true")
      .config("spark.default.parallelism", s"${parallelism}")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()
    val sc = spark.sparkContext


    FileSystem.get(new URI(s"s3://mob-emr-test"), sc.hadoopConfiguration).delete(new Path(output_etl_dsp_request_hour), true)


    import spark.implicits._

    try {

      val mergePkgUdf = udf((pkgsArrays: mutable.WrappedArray[String]) => {
        var res = "[]"
        val pkgSet: util.Set[String] = new util.HashSet[String]()
        if (pkgsArrays != null && pkgsArrays.size != 0) {
          for (pkgs <- pkgsArrays) {
            if (StringUtils.isNotBlank(pkgs)) {
              val pkgsname = pkgs.split("#", -1)
              for (pkgname <- pkgsname) {
                pkgSet.add(pkgname)
              }
            }
          }
        }

        if (pkgSet.size() != 0) {
          res = new ObjectMapper().writeValueAsString(pkgSet)
        }
        res
      })

      val mergeAdridIdUdf = udf((adrsArrays: mutable.WrappedArray[String]) => {
        var res = ""
        val adrSet: util.Set[String] = new util.HashSet[String]()
        if (adrsArrays != null && adrsArrays.size != 0) {
          for (adrs <- adrsArrays) {
            if (StringUtils.isNotBlank(adrs)) {
              adrSet.add(adrs)
            }
          }
        }
        if (adrSet.size() != 0) {
          res = adrSet.mkString(",")
        }
        res
      })


      val mergeUdf = udf((segmentArrays: mutable.WrappedArray[String]) => {
        var ret = ""
        val segmentMap: mutable.HashMap[String, SegmentVO] = new mutable.HashMap[String, SegmentVO]()

        if (segmentArrays != null && segmentArrays.size != 0) {
          for (segment <- segmentArrays) {
            if (StringUtils.isNotBlank(segment) && segment.startsWith("[") && segment.contains("{")) {
              val jsonArray = GsonUtil.String2JsonArray(segment.toString)
              for (json <- jsonArray) { //mergeUdf:segmentArray2>[{"name":"carrier","value":"verified"}];{"name":"carrier","value":"verified"}
                // println("mergeUdf:segmentArray2>" + segmentArray.toString + ";" +json.toString)
                import mobvista.dmp.datasource.dsp.mapreduce.SegmentVO
                import mobvista.prd.datasource.util.GsonUtil
                val segmentVO = GsonUtil.fromJson(json, classOf[SegmentVO])
                segmentMap.put(segmentVO.getId, segmentVO)
              }
            }

          }
        }
        if (segmentMap.size != 0) {
          ret = new ObjectMapper().writeValueAsString(segmentMap.values())
        }
        ret
      })


      val mergeRegionUdf = udf((regsArrays: mutable.WrappedArray[String]) => {
        var res = ""
        val regSet: util.Set[String] = new util.HashSet[String]()
        if (regsArrays != null && regsArrays.size != 0) {
          for (regs <- regsArrays) {
            if (StringUtils.isNotBlank(regs)) {
              val regsname = regs.split("#", -1)
              for (regname <- regsname) {
                regSet.add(regname)
              }
            }
          }
        }

        if (regSet.size() != 0) {
          res = new ObjectMapper().writeValueAsString(regSet)
        }
        res
      })


      val midData = spark.read.schema(dspEtlSchema).orc(input).flatMap(parseMapData(_)).toDF("device_id", "device_type", "platform", "country_code", "ip", "gender", "birthday", "maker",
        "model", "os_version", "package_list", "androidids", "datetime", "segment", "region")


      midData
        .groupBy("device_id", "device_type")
        .agg(max("platform"),
          max("country_code"),
          max("ip"),
          max("gender"),
          max("birthday"),
          max("maker"),
          max("model"),
          max("os_version"),
          mergePkgUdf(collect_set("package_list")),
          mergeAdridIdUdf(collect_set("androidids")),
          max("datetime"),
          mergeUdf(collect_set("segment")),
          mergeRegionUdf(collect_set("region"))
        ).repartition(coalesce)
        .rdd.map(_.mkString("\t")).saveAsTextFile(output_etl_dsp_request_hour, classOf[GzipCodec])


    } finally {
      if (spark != null) {
        spark.stop()
      }
    }
    0
  }


  def parseMapData(row: Row): Array[DspReqVOoptimization] = {
    var arrayBuffer = new ArrayBuffer[DspReqVOoptimization]()

    //    val idfa = row.getAs[String]("idfa")
    var idfa = ""
    val idfaTmp = row.getAs[String]("idfa")
    if (StringUtils.isNotBlank(idfaTmp) && idfaTmp.matches(didPtn)) idfa = idfaTmp

    //    val gaid = row.getAs[String]("gaid")
    var gaid = ""
    val gaidTmp = row.getAs[String]("gaid")
    if (StringUtils.isNotBlank(gaidTmp) && gaidTmp.matches(didPtn)) gaid = gaidTmp


    val platform = row.getAs[String]("platform")
    val country = row.getAs[String]("country")
    val ip = row.getAs[String]("ip")
    val gender = row.getAs[String]("gender")
    val birthday = row.getAs[String]("birthday")
    val maker = row.getAs[String]("maker")
    val model = row.getAs[String]("model")
    val osVersion = row.getAs[String]("osVersion")
    var packageName = ""
    if (StringUtils.isNotBlank(row.getAs[String]("packageName"))) {
      packageName = row.getAs[String]("packageName")
    }

    val exitId = row.getAs[String]("exitId")
    val time = row.getAs[String]("datetime")
    var segment = ""
    if (StringUtils.isNotBlank(row.getAs[String]("segment"))) {
      segment = row.getAs[String]("segment")
    }
    val region = row.getAs[String]("region")

    var androidId = ""
    var imei = ""
    var imeimd5 = ""
    // 添加oaid  oaidmd5 解析
    var oaid = ""
    var oaidmd5 = ""

    //  新增
    var idfv = ""
    var gaidmd5 = ""

    if (StringUtils.isNotBlank(exitId)) {
      val devIds = splitFun(exitId, ",")
      if (devIds.length >= 17) {
        if ("ios".equalsIgnoreCase(platform)) {
          if (StringUtils.isBlank(idfa) && StringUtils.isNotBlank(devIds(1)) && devIds(1).matches(MobvistaConstant.didPtn)) {
            idfa = devIds(1)
          }
          if (StringUtils.isNotBlank(devIds(16)) && devIds(16).matches(MobvistaConstant.didPtn)) {
            idfv = devIds(16)
          }
        } else {
          if (StringUtils.isBlank(gaid) && StringUtils.isNotBlank(devIds(0)) && devIds(0).matches(MobvistaConstant.didPtn)) {
            gaid = devIds(0)
          }
          if (StringUtils.isNotBlank(devIds(2)) && devIds(2).matches(MobvistaConstant.md5Ptn)) {
            gaidmd5 = devIds(2)
          }
          if (StringUtils.isNotBlank(devIds(12))) {
            oaid = devIds(12)
          }
          if (StringUtils.isNotBlank(devIds(13)) && devIds(13).matches(MobvistaConstant.md5Ptn)) {
            oaidmd5 = devIds(13)
          }
          if (StringUtils.isNotBlank(devIds(4)) && devIds(4).matches(MobvistaConstant.imeiPtn)) {
            imei = devIds(4)
          }
          if (StringUtils.isNotBlank(devIds(5)) && devIds(5).matches(MobvistaConstant.md5Ptn)) {
            imeimd5 = devIds(5)
          }
          if (StringUtils.isNotBlank(devIds(7)) && devIds(7).matches(MobvistaConstant.andriodIdPtn)) {
            androidId = devIds(7)
          }
        }
      }

      /*
      if (devIds.length >= 14) {
        if (StringUtils.isNotBlank(devIds(7)) && devIds(7).matches(CommonMapReduce.andriodIdPtn)) {
          androidId = devIds(7)
        }

        if (StringUtils.isNotBlank(devIds(4)) && devIds(4).matches(CommonMapReduce.imeiPtn) && "android".equals(platform) && StringUtils.isNotBlank(country) && "CN".equalsIgnoreCase(country)) {
          imei = devIds(4)
        }

        if (StringUtils.isNotBlank(devIds(5)) && devIds(5).matches(CommonMapReduce.imeiMd5Ptn) && "android".equals(platform) && StringUtils.isNotBlank(country) && "CN".equalsIgnoreCase(country)) {
          imeimd5 = devIds(5)
        }

        if (StringUtils.isNotBlank(devIds(12)) && "android".equals(platform) && StringUtils.isNotBlank(country) && "CN".equalsIgnoreCase(country)) {
          oaid = devIds(12)
        }

        if (StringUtils.isNotBlank(devIds(13)) && devIds(13).matches(CommonMapReduce.imeiMd5Ptn) && "android".equals(platform) && StringUtils.isNotBlank(country) && "CN".equalsIgnoreCase(country)) {
          oaidmd5 = devIds(13)
        }
      }
      */
    }

    val value = Value(country, ip, gender, birthday, maker, model, osVersion, packageName, androidId, time, segment, region)

    var deviceType = ""
    var deviceId = ""


    if ("ios".equals(platform) && idfa.length > 4 && !allZero.equals(idfa)) {
      deviceId = idfa
      deviceType = "idfa"
    } else if ("android".equals(platform) && gaid.length > 4 && !allZero.equals(gaid)) {
      deviceId = gaid
      deviceType = "gaid"
    }

    arrayBuffer = addDatasV2(arrayBuffer, deviceId, gaidmd5, imei, imeimd5, oaid, oaidmd5, androidId, idfv, deviceType, platform, value)

    /*
    if (!deviceId.isEmpty) {
      arrayBuffer += DspReqVOoptimization(deviceId, deviceType, platform, country, ip, gender, birthday, maker, model, osVersion,
        packageName, androidId, time, segment, region)
    }

    if (!androidId.isEmpty) {
      arrayBuffer += DspReqVOoptimization(androidId, "androidid", "android", country, ip, gender, birthday, maker, model, osVersion,
        packageName, androidId, time, segment, region)
    }

    if (StringUtils.isNotBlank(imei)) {
      arrayBuffer += DspReqVOoptimization(imei, "imei", "android", country, ip, gender, birthday, maker, model, osVersion,
        packageName, androidId, time, segment, region)

      val imeimd5 = MD5Util.getMD5Str(imei)
      arrayBuffer += DspReqVOoptimization(imeimd5, "imeimd5", "android", country, ip, gender, birthday, maker, model, osVersion,
        packageName, androidId, time, segment, region)

    }

    if (StringUtils.isNotBlank(imeimd5)) {
      arrayBuffer += DspReqVOoptimization(imeimd5, "imeimd5", "android", country, ip, gender, birthday, maker, model, osVersion,
        packageName, androidId, time, segment, region)
    }

    if (StringUtils.isNotBlank(oaid)) {
      arrayBuffer += DspReqVOoptimization(oaid, "oaid", "android", country, ip, gender, birthday, maker, model, osVersion,
        packageName, androidId, time, segment, region)

      val oaidmd5 = MD5Util.getMD5Str(oaid)
      arrayBuffer += DspReqVOoptimization(oaidmd5, "oaidmd5", "android", country, ip, gender, birthday, maker, model, osVersion,
        packageName, androidId, time, segment, region)
    }

    if (StringUtils.isNotBlank(oaidmd5)) {
      arrayBuffer += DspReqVOoptimization(oaidmd5, "oaidmd5", "android", country, ip, gender, birthday, maker, model, osVersion,
        packageName, androidId, time, segment, region)
    }
    */

    //  添加dealid入库逻辑
    val exchanges = row.getAs[String]("exchanges")
    val dealerid = row.getAs[String]("dealerid")
    val dealeridArray: Array[String] = dealerid.split(",")
    if ("oppocn".equals(exchanges)) {
      if (dealeridArray.contains("2532") || dealeridArray.contains("2533")) {
        /*
        arrayBuffer = addDatas(arrayBuffer, imei, imeimd5, oaid, oaidmd5, deviceId, deviceType, platform, country, ip, gender, birthday, maker, model, osVersion,
          "com.taobao.taobao", androidId, time, segment, region)
        arrayBuffer = addDatas(arrayBuffer, imei, imeimd5, oaid, oaidmd5, deviceId, deviceType, platform, country, ip, gender, birthday, maker, model, osVersion,
          "com.taobao.taobao_oppo", androidId, time, segment, region)
        */
        value.packageName = "com.taobao.taobao"
        arrayBuffer = addDatasV2(arrayBuffer, deviceId, gaidmd5, imei, imeimd5, oaid, oaidmd5, androidId, idfv, deviceType, platform, value)
        value.packageName = "com.taobao.taobao_oppo"
        arrayBuffer = addDatasV2(arrayBuffer, deviceId, gaidmd5, imei, imeimd5, oaid, oaidmd5, androidId, idfv, deviceType, platform, value)
      } else {
        // 2020.11.26 需求 若adx(exchanges字段)为oppo且dealid不等于2532或2533,则伪包名为com.taobao.taobao_notinstall_oppo
        /*
        arrayBuffer = addDatas(arrayBuffer, imei, imeimd5, oaid, oaidmd5, deviceId, deviceType, platform, country, ip, gender, birthday, maker, model, osVersion,
          "com.taobao.taobao_notinstall_oppo", androidId, time, segment, region)
        */
        value.packageName = "com.taobao.taobao_notinstall_oppo"
        arrayBuffer = addDatasV2(arrayBuffer, deviceId, gaidmd5, imei, imeimd5, oaid, oaidmd5, androidId, idfv, deviceType, platform, value)
      }
      //  添加com.UCMobile 对应伪包名小写com.ucmobile_oppo
      if (dealeridArray.contains("2728")) {
        /*
        arrayBuffer = addDatas(arrayBuffer, imei, imeimd5, oaid, oaidmd5, deviceId, deviceType, platform, country, ip, gender, birthday, maker, model, osVersion,
          "com.UCMobile", androidId, time, segment, region)
        arrayBuffer = addDatas(arrayBuffer, imei, imeimd5, oaid, oaidmd5, deviceId, deviceType, platform, country, ip, gender, birthday, maker, model, osVersion,
          "com.ucmobile_oppo", androidId, time, segment, region)
        */
        value.packageName = "com.UCMobile"
        arrayBuffer = addDatasV2(arrayBuffer, deviceId, gaidmd5, imei, imeimd5, oaid, oaidmd5, androidId, idfv, deviceType, platform, value)
        value.packageName = "com.ucmobile_oppo"
        arrayBuffer = addDatasV2(arrayBuffer, deviceId, gaidmd5, imei, imeimd5, oaid, oaidmd5, androidId, idfv, deviceType, platform, value)
      }
      if (dealeridArray.contains("4059")) {
        value.packageName = "com.ss.android.ugc.aweme"
        arrayBuffer = addDatasV2(arrayBuffer, deviceId, gaidmd5, imei, imeimd5, oaid, oaidmd5, androidId, idfv, deviceType, platform, value)
        value.packageName = "com.ss.android.ugc.aweme_oppoziyou"
        arrayBuffer = addDatasV2(arrayBuffer, deviceId, gaidmd5, imei, imeimd5, oaid, oaidmd5, androidId, idfv, deviceType, platform, value)
      }
      if (dealeridArray.contains("4060")) {
        value.packageName = "com.ss.android.ugc.aweme_oppoziyou_notinstall"
        arrayBuffer = addDatasV2(arrayBuffer, deviceId, gaidmd5, imei, imeimd5, oaid, oaidmd5, androidId, idfv, deviceType, platform, value)
      }
      if (dealeridArray.contains("4061")) {
        value.packageName = "com.ss.android.ugc.aweme_oppoziyou_hist_notinstall"
        arrayBuffer = addDatasV2(arrayBuffer, deviceId, gaidmd5, imei, imeimd5, oaid, oaidmd5, androidId, idfv, deviceType, platform, value)
      }
      if (dealeridArray.contains("4053")) {
        value.packageName = "com.ss.android.ugc.aweme"
        arrayBuffer = addDatasV2(arrayBuffer, deviceId, gaidmd5, imei, imeimd5, oaid, oaidmd5, androidId, idfv, deviceType, platform, value)
        value.packageName = "com.ss.android.ugc.aweme_oppolianmeng"
        arrayBuffer = addDatasV2(arrayBuffer, deviceId, gaidmd5, imei, imeimd5, oaid, oaidmd5, androidId, idfv, deviceType, platform, value)
      }
      if (dealeridArray.contains("4054")) {
        value.packageName = "com.ss.android.ugc.aweme_oppolianmeng_hist1year_notinstall"
        arrayBuffer = addDatasV2(arrayBuffer, deviceId, gaidmd5, imei, imeimd5, oaid, oaidmd5, androidId, idfv, deviceType, platform, value)
      }
      if (dealeridArray.contains("4055")) {
        value.packageName = "com.ss.android.ugc.aweme_oppolianmeng_histhalfyear_notinstall"
        arrayBuffer = addDatasV2(arrayBuffer, deviceId, gaidmd5, imei, imeimd5, oaid, oaidmd5, androidId, idfv, deviceType, platform, value)
      }
      //  添加 oppo 在请求中开始传支付宝,京东,爱奇艺这几个包名的安装信息,并入库。 (2020-10-28 17:17 补充说明: com.eg.android.AlipayGphone_oppo伪包名大写,但已经入库,修改困难。以后入库伪包名统一定为小写,类似上面的 com.ucmobile_oppo)
      //  2020.11.11 添加 2783对应的咸鱼的 com.taobao.idlefish 和 com.taobao.idlefish_oppo 入库
      //  2020.11.26 添加 2840对应的 com.youku.phone_notinstall 和 com.youku.phone_notinstall_oppo 入库
      //  2020.12.24 添加 2889对应的 com.sankuai.meituan和 com.sankuai.meituan_oppo 入库
      //  2020.12.24 添加 2890对应的 com.meituan.itakeaway和 com.meituan.itakeaway_oppo 入库
      //  2021.01.06添加  快手、快手极速版、优酷
      //  2021.04.22 添加adx(exchanges字段)为oppo且dealid=3160,则伪包名为com.tencent.news_oppo和com.tencent.news
      var mapData: Map[Int, String] = Map(2716 -> "com.jingdong.app.mall",
        2717 -> "com.eg.android.AlipayGphone",
        2718 -> "com.qiyi.video",
        2783 -> "com.taobao.idlefish",
        2840 -> "com.youku.phone_notinstall",
        2889 -> "com.sankuai.meituan",
        2890 -> "com.meituan.itakeaway",
        2904 -> "com.smile.gifmaker",
        2905 -> "com.kuaishou.nebula",
        2906 -> "com.youku.phone",
        3160 -> "com.tencent.news")
      for (item <- mapData) {
        if (dealeridArray.contains(item._1.toString)) {
          /*
          arrayBuffer = addDatas(arrayBuffer, imei, imeimd5, oaid, oaidmd5, deviceId, deviceType, platform, country, ip, gender, birthday, maker, model, osVersion,
            mapData(num), androidId, time, segment, region)
          arrayBuffer = addDatas(arrayBuffer, imei, imeimd5, oaid, oaidmd5, deviceId, deviceType, platform, country, ip, gender, birthday, maker, model, osVersion,
            mapData(num) + "_oppo", androidId, time, segment, region)
          */
          value.packageName = item._2
          arrayBuffer = addDatasV2(arrayBuffer, deviceId, gaidmd5, imei, imeimd5, oaid, oaidmd5, androidId, idfv, deviceType, platform, value)
          value.packageName = item._2+"_oppo"
          arrayBuffer = addDatasV2(arrayBuffer, deviceId, gaidmd5, imei, imeimd5, oaid, oaidmd5, androidId, idfv, deviceType, platform, value)
        }
      }
      //2020.11.11添加快手、京东入库
      mapData = Map(2774 -> "com.smile.gifmaker_notinstall_oppo",
        2773 -> "com.jingdong.app.mallr_notinstall_oppo")
      for (item <- mapData) {
        if (dealeridArray.contains(item._1.toString)) {
          /*
          arrayBuffer = addDatas(arrayBuffer, imei, imeimd5, oaid, oaidmd5, deviceId, deviceType, platform, country, ip, gender, birthday, maker, model, osVersion,
            mapData(num), androidId, time, segment, region)
          */
          value.packageName = item._2
          arrayBuffer = addDatasV2(arrayBuffer, deviceId, gaidmd5, imei, imeimd5, oaid, oaidmd5, androidId, idfv, deviceType, platform, value)
        }
      }

    }
    if ("inmobi".equals(exchanges)) {
      if (dealeridArray.contains("1594807676568")) {
        /*
        arrayBuffer = addDatas(arrayBuffer, imei, imeimd5, oaid, oaidmd5, deviceId, deviceType, platform, country, ip, gender, birthday, maker, model, osVersion,
          "com.taobao.taobao_inmobi", androidId, time, segment, region)
        */
        value.packageName = "com.taobao.taobao_inmobi"
        arrayBuffer = addDatasV2(arrayBuffer, deviceId, gaidmd5, imei, imeimd5, oaid, oaidmd5, androidId, idfv, deviceType, platform, value)
      }
    }
    //2020.11.26 增加bes入库需求
    // wiki   https://confluence.mobvista.com/pages/viewpage.action?pageId=47976499
    //2021.04.22 添加adx(exchanges字段)为bes且dealid=100188,则伪包名为com.tencent.news_bes和com.tencent.news
    //2021.06.17 添加adx(exchanges字段)为bes且dealid=100310,则伪包名为com.taobao.litetao_bes和com.taobao.litetao
    val mapData_bes = Map(100193 -> "com.taobao.taobao",
      100189 -> "com.eg.android.AlipayGphone",
      100191 -> "com.jingdong.app.mall",
      100187 -> "com.UCMobile",
      100194 -> "com.taobao.idlefish",
      100195 -> "com.qiyi.video",
      100196 -> "com.smile.gifmaker",
      100197 -> "id387682726",
      100188 -> "com.tencent.news",
      100310 -> "com.taobao.litetao",
      100203 -> "com.ss.android.ugc.aweme")
    if ("bes".equals(exchanges)) {
      for (item <- mapData_bes) {
        if (dealeridArray.contains(item._1.toString)) {
          /*
          arrayBuffer = addDatas(arrayBuffer, imei, imeimd5, oaid, oaidmd5, deviceId, deviceType, platform, country, ip, gender, birthday, maker, model, osVersion,
            mapData_bes(num), androidId, time, segment, region)
          arrayBuffer = addDatas(arrayBuffer, imei, imeimd5, oaid, oaidmd5, deviceId, deviceType, platform, country, ip, gender, birthday, maker, model, osVersion,
            mapData_bes(num) + "_bes", androidId, time, segment, region)
          */
          value.packageName = item._2
          arrayBuffer = addDatasV2(arrayBuffer, deviceId, gaidmd5, imei, imeimd5, oaid, oaidmd5, androidId, idfv, deviceType, platform, value)
          value.packageName = item._2+"_bes"
          arrayBuffer = addDatasV2(arrayBuffer, deviceId, gaidmd5, imei, imeimd5, oaid, oaidmd5, androidId, idfv, deviceType, platform, value)
        }
      }
    }
    //2021.04.28  天级别从dsp请求日志(adn_dsp.log_adn_dsp_request_orc_hour)中拉取exchanges='tencent' 并且appid = 'com.tencent.news',入库伪包名为com.tencent.news_fromtencent
    if ("tencent".equals(exchanges)) {
      if (packageName.split("#", -1).contains("com.tencent.news")) {
        /*
        arrayBuffer = addDatas(arrayBuffer, imei, imeimd5, oaid, oaidmd5, deviceId, deviceType, platform, country, ip, gender, birthday, maker, model, osVersion,
          "com.tencent.news_fromtencent", androidId, time, segment, region)
        */
        value.packageName = "com.tencent.news_fromtencent"
        arrayBuffer = addDatasV2(arrayBuffer, deviceId, gaidmd5, imei, imeimd5, oaid, oaidmd5, androidId, idfv, deviceType, platform, value)
      }
    }
    //2021.06.17  天级别从dsp请求日志(adn_dsp.log_adn_dsp_request_orc_hour)中拉取exchanges='kuaishou' 并且appid 符合下列名称的入库
    val mapData_kuaishou = Map("com.smile.gifmaker" -> "com.smile.gifmaker_fromkuaishou",
      "com.kuaishou.nebula" -> "com.kuaishou.nebula_fromkuaishou",
      "440948110" -> "44094811020210617",
      "1472502819" -> "147250281920210617")
    if ("kuaishou".equals(exchanges)) {
      for (item <- mapData_kuaishou) {
        if (packageName.split("#", -1).contains(item._1)) {
          value.packageName = item._2
          arrayBuffer = addDatasV2(arrayBuffer, deviceId, gaidmd5, imei, imeimd5, oaid, oaidmd5, androidId, idfv, deviceType, platform, value)
        }
      }
    }

    val mapData_iqiyi = Map(5260 -> "com.taobao.taobao",
      1301 -> "com.UCMobile",
      10949 -> "com.eg.android.AlipayGphone",
      6242 -> "com.taobao.idlefish",
      3996 -> "com.sankuai.meituan",
      7156 -> "com.tencent.news")
    if("iqiyi".equals(exchanges)){
      for (item <- mapData_iqiyi) {
        if (dealeridArray.contains(item._1.toString)) {
          value.packageName = item._2
          arrayBuffer = addDatasV2(arrayBuffer, deviceId, gaidmd5, imei, imeimd5, oaid, oaidmd5, androidId, idfv, deviceType, platform, value)
          value.packageName = item._2+"_iqiyi"
          arrayBuffer = addDatasV2(arrayBuffer, deviceId, gaidmd5, imei, imeimd5, oaid, oaidmd5, androidId, idfv, deviceType, platform, value)
        }
      }
    //2021.06.17 若adx(exchanges字段)为iqiyi且dealid=112644(安装) 且os = 'android',则伪包名为com.taobao.litetao_iqiyi和com.taobao.litetao   若adx(exchanges字段)为iqiyi且dealid=112644(安装)且os='ios',则安装包名为134037632320210617和1340376323
      if (dealeridArray.contains("112644")) {
        if("android".equalsIgnoreCase(platform)){
          value.packageName = "com.taobao.litetao"
          arrayBuffer = addDatasV2(arrayBuffer, deviceId, gaidmd5, imei, imeimd5, oaid, oaidmd5, androidId, "", deviceType, "android", value)
          value.packageName = "com.taobao.litetao_iqiyi"
          arrayBuffer = addDatasV2(arrayBuffer, deviceId, gaidmd5, imei, imeimd5, oaid, oaidmd5, androidId, "", deviceType, "android", value)
        }else{
          value.packageName = "1340376323"
          arrayBuffer = addDatasV2(arrayBuffer, deviceId, "", "", "", "", "", "", idfv, deviceType, "ios", value)
          value.packageName = "134037632320210617"
          arrayBuffer = addDatasV2(arrayBuffer, deviceId, "", "", "", "", "", "", idfv, deviceType, "ios", value)
        }
      }
    }

    //adx=iqiyi 且os=android的imeiMD5和oaidMD5去重设备, 分别和com.taobao.foractivation.227229和com.taobao.foractivation.227229_oaid做差集   2020.12.15
    //2021.04.22下掉,该需求不需要了
    //    if("iqiyi".equals(exchanges)){
    //      arrayBuffer = addDatas(arrayBuffer,imei,imeimd5,oaid,oaidmd5,deviceId, deviceType, platform, country, ip, gender, birthday, maker, model, osVersion,
    //        "com.iqiyi_android.subtract_with_227229", androidId, time, segment,region)
    //    }
    arrayBuffer.toArray
  }


  def addDatas(arrayBuffer: ArrayBuffer[DspReqVOoptimization], imei: String, imeimd5: String, oaid: String, oaidmd5: String, deviceId: String, deviceType: String, platform: String, country: String, ip: String, gender: String, birthday: String, maker: String,
               model: String, osVersion: String, packageName: String, androidId: String, time: String, segment: String, region: String): ArrayBuffer[DspReqVOoptimization] = {
    if (!deviceId.isEmpty) {
      arrayBuffer += DspReqVOoptimization(deviceId, deviceType, platform, country, ip, gender, birthday, maker, model, osVersion,
        packageName, androidId, time, segment, region)
    }
    if (!androidId.isEmpty) {
      arrayBuffer += DspReqVOoptimization(androidId, "androidid", "android", country, ip, gender, birthday, maker, model, osVersion,
        packageName, androidId, time, segment, region)
    }
    if (StringUtils.isNotBlank(imei)) {
      arrayBuffer += DspReqVOoptimization(imei, "imei", "android", country, ip, gender, birthday, maker, model, osVersion,
        packageName, androidId, time, segment, region)
      val imeimd5 = MD5Util.getMD5Str(imei)
      arrayBuffer += DspReqVOoptimization(imeimd5, "imeimd5", "android", country, ip, gender, birthday, maker, model, osVersion,
        packageName, androidId, time, segment, region)
    }
    if (StringUtils.isNotBlank(imeimd5)) {
      arrayBuffer += DspReqVOoptimization(imeimd5, "imeimd5", "android", country, ip, gender, birthday, maker, model, osVersion,
        packageName, androidId, time, segment, region)
    }
    if (StringUtils.isNotBlank(oaid)) {
      arrayBuffer += DspReqVOoptimization(oaid, "oaid", "android", country, ip, gender, birthday, maker, model, osVersion,
        packageName, androidId, time, segment, region)

      val oaidmd5 = MD5Util.getMD5Str(oaid)
      arrayBuffer += DspReqVOoptimization(oaidmd5, "oaidmd5", "android", country, ip, gender, birthday, maker, model, osVersion,
        packageName, androidId, time, segment, region)

    }
    if (StringUtils.isNotBlank(oaidmd5)) {
      arrayBuffer += DspReqVOoptimization(oaidmd5, "oaidmd5", "android", country, ip, gender, birthday, maker, model, osVersion,
        packageName, androidId, time, segment, region)
    }
    arrayBuffer
  }

  def addDatasV2(arrayBuffer: ArrayBuffer[DspReqVOoptimization], deviceId: String, gaidmd5: String, imei: String, imeimd5: String,
                 oaid: String, oaidmd5: String, androidId: String, idfv: String, deviceType: String, platform: String, value: Value
                ): ArrayBuffer[DspReqVOoptimization] = {
    var gaidFlag = true
    if (StringUtils.isNotBlank(deviceId)) {
      arrayBuffer += DspReqVOoptimization(deviceId, deviceType, platform,
        value.country, value.ip, value.gender, value.birthday, value.maker, value.model, value.osVersion, value.packageName, value.androidId, value.time, value.segment, value.region)
      if ("android".equalsIgnoreCase(platform) && "gaid".equalsIgnoreCase(deviceType)) {
        val gaidmd5 = MD5Util.getMD5Str(deviceId)
        gaidFlag = false
        arrayBuffer += DspReqVOoptimization(gaidmd5, "gaidmd5", "android",
          value.country, value.ip, value.gender, value.birthday, value.maker, value.model, value.osVersion, value.packageName, value.androidId, value.time, value.segment, value.region)
      }
    }
    if (StringUtils.isNotBlank(gaidmd5) && gaidFlag) {
      arrayBuffer += DspReqVOoptimization(gaidmd5, "gaidmd5", "android",
        value.country, value.ip, value.gender, value.birthday, value.maker, value.model, value.osVersion, value.packageName, value.androidId, value.time, value.segment, value.region)
    }
    if (StringUtils.isNotBlank(deviceId)) {
      arrayBuffer += DspReqVOoptimization(deviceId, deviceType, platform,
        value.country, value.ip, value.gender, value.birthday, value.maker, value.model, value.osVersion, value.packageName, value.androidId, value.time, value.segment, value.region)
    }
    if (StringUtils.isNotBlank(androidId)) {
      arrayBuffer += DspReqVOoptimization(androidId, "androidid", "android",
        value.country, value.ip, value.gender, value.birthday, value.maker, value.model, value.osVersion, value.packageName, value.androidId, value.time, value.segment, value.region)
    }
    if (StringUtils.isNotBlank(idfv)) {
      arrayBuffer += DspReqVOoptimization(idfv, "idfv", "ios",
        value.country, value.ip, value.gender, value.birthday, value.maker, value.model, value.osVersion, value.packageName, value.androidId, value.time, value.segment, value.region)
    }
    var imeiFlag = true
    if (StringUtils.isNotBlank(imei)) {
      arrayBuffer += DspReqVOoptimization(imei, "imei", "android",
        value.country, value.ip, value.gender, value.birthday, value.maker, value.model, value.osVersion, value.packageName, value.androidId, value.time, value.segment, value.region)
      val imeimd5 = MD5Util.getMD5Str(imei)
      imeiFlag = false
      arrayBuffer += DspReqVOoptimization(imeimd5, "imeimd5", "android",
        value.country, value.ip, value.gender, value.birthday, value.maker, value.model, value.osVersion, value.packageName, value.androidId, value.time, value.segment, value.region)
    }
    if (StringUtils.isNotBlank(imeimd5) && imeiFlag) {
      arrayBuffer += DspReqVOoptimization(imeimd5, "imeimd5", "android",
        value.country, value.ip, value.gender, value.birthday, value.maker, value.model, value.osVersion, value.packageName, value.androidId, value.time, value.segment, value.region)
    }
    var oaidFlag = true
    if (StringUtils.isNotBlank(oaid)) {
      arrayBuffer += DspReqVOoptimization(oaid, "oaid", "android",
        value.country, value.ip, value.gender, value.birthday, value.maker, value.model, value.osVersion, value.packageName, value.androidId, value.time, value.segment, value.region)
      val oaidmd5 = MD5Util.getMD5Str(oaid)
      oaidFlag = false
      arrayBuffer += DspReqVOoptimization(oaidmd5, "oaidmd5", "android",
        value.country, value.ip, value.gender, value.birthday, value.maker, value.model, value.osVersion, value.packageName, value.androidId, value.time, value.segment, value.region)

    }
    if (StringUtils.isNotBlank(oaidmd5) && oaidFlag) {
      arrayBuffer += DspReqVOoptimization(oaidmd5, "oaidmd5", "android",
        value.country, value.ip, value.gender, value.birthday, value.maker, value.model, value.osVersion, value.packageName, value.androidId, value.time, value.segment, value.region)
    }
    arrayBuffer
  }


  def dspEtlSchema: StructType = {
    StructType(StructField("idfa", StringType) ::
      StructField("gaid", StringType) ::
      StructField("platform", StringType) ::
      StructField("country", StringType) ::
      StructField("ip", StringType) ::
      StructField("gender", StringType) ::
      StructField("birthday", StringType) ::
      StructField("maker", StringType) ::
      StructField("model", StringType) ::
      StructField("osVersion", StringType) ::
      StructField("packageName", StringType) ::
      StructField("exitId", StringType) ::
      StructField("datetime", StringType) ::
      StructField("segment", StringType) ::
      StructField("dealerid", StringType) ::
      StructField("exchanges", StringType) ::
      StructField("region", StringType)
      :: Nil)
  }


  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("input", true, "input")
    options.addOption("output", true, "output")
    options.addOption("parallelism", true, "parallelism")
    options.addOption("coalesce", true, "coalesce")
    options
  }
}

object DspOrgEtlDailys {
  def main(args: Array[String]): Unit = {
    new DspOrgEtlDailys().run(args)
  }
}