package mobvista.dmp.datasource.id_mapping

import mobvista.dmp.common.MobvistaConstant
import mobvista.dmp.datasource.id_mapping.Constant.{parseUA, process}
import org.apache.commons.lang3.StringUtils
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}

/**
 * @package: mobvista.dmp.datasource.id_mapping
 * @author: wangjf
 * @date: 2021/11/30
 * @time: 7:51 下午
 * @email: jinfeng.wang@mobvista.com
 */
class DspReq extends EtlDeviceIdDaily {

  override def processData(date: String, i: Int, spark: SparkSession): RDD[(String, Row)] = {
    spark.udf.register("parseUA", parseUA _)
    //  DWD
    //  val sql = Constant.dsp_req_sql.replace("@date", date)
    //  ODS
    val hour = i match {
      case 0 =>
        " AND hh BETWEEN '00' AND '03'"
      case 1 =>
        " AND hh BETWEEN '04' AND '07'"
      case 2 =>
        " AND hh BETWEEN '08' AND '11'"
      case 3 =>
        " AND hh BETWEEN '12' AND '15'"
      case 4 =>
        " AND hh BETWEEN '16' AND '19'"
      case 5 =>
        " AND hh BETWEEN '20' AND '23'"
      case _ =>
        ""
    }
    val sql = Constant.dsp_req_sql_v2.replace("@date", date)
      .replace("@hour", hour)
    val rdd = spark.sql(sql).coalesce(20000).rdd.map(row => {
      var idfa = row.getAs[String]("idfa")
      var gaid = row.getAs[String]("gaid")
      val platform = row.getAs[String]("platform")
      val exitId = row.getAs[String]("exitid")
      var country = row.getAs[String]("country")
      val pkg_name = row.getAs[String]("pkg_name")
      val network_type = row.getAs[String]("network_type")
      country = if (StringUtils.isNotBlank(country)) {
        country.toUpperCase()
      } else {
        ""
      }
      val ip = row.getAs[String]("ip")
      val ua = row.getAs[String]("ua")
      val brand = row.getAs[String]("brand")
      val model = row.getAs[String]("model")
      val os_version = row.getAs[String]("os_version")
      val osv_upt = row.getAs[String]("osv_upt")
      val upt = row.getAs[String]("upt")
      val cnt = row.getAs[Long]("cnt")
      var idfv = ""
      var oaid = ""
      var imei = ""
      var androidId = ""
      if (StringUtils.isNotBlank(exitId)) {
        val devIds = splitFun(exitId, ",")
        if (devIds.length >= 17) {
          if ("ios".equalsIgnoreCase(platform)) {
            if (StringUtils.isBlank(idfa) && StringUtils.isNotBlank(devIds(1)) && devIds(1).matches(MobvistaConstant.didPtn)) {
              idfa = devIds(1)
            }
            if (StringUtils.isNotBlank(devIds(16)) && devIds(16).matches(MobvistaConstant.didPtn)) {
              idfv = devIds(16)
            }
          } else {
            if (StringUtils.isBlank(gaid) && StringUtils.isNotBlank(devIds(0)) && devIds(0).matches(MobvistaConstant.didPtn)) {
              gaid = devIds(0)
            }
            if (StringUtils.isNotBlank(devIds(12))) {
              oaid = devIds(12)
            }
            if (StringUtils.isNotBlank(devIds(4)) && devIds(4).matches(MobvistaConstant.imeiPtn)) {
              imei = devIds(4)
            }
            if (StringUtils.isNotBlank(devIds(7)) && devIds(7).matches(MobvistaConstant.andriodIdPtn)) {
              androidId = devIds(7)
            }
          }

        }
      }
      process(idfa, idfv, pkg_name, imei, androidId, oaid, gaid, sysId = "", bkupId = "", country, ip, ua, brand,
        model, os_version, osv_upt, upt, network_type, platform, cnt)
    })
    rdd
  }
}

object DspReq {
  def main(args: Array[String]): Unit = {
    new DspReq().run(args)
  }
}