package mobvista.dmp.datasource.adn

import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
import mobvista.dmp.util.MRUtils
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.commons.lang.StringUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.compress.GzipCodec

import java.net.URI

class AdnOrgLogEtlHours extends CommonSparkJob with Serializable {
  val TAB_DELIMITER = "\t"

  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("datetime", true, "datetime")
    options.addOption("region", true, "region")
    options.addOption("output", true, "output")
    options.addOption("coalesce", true, "coalesce")
    options
  }

  override protected def run(args: Array[String]): Int = {
    val parser = new BasicParser()
    val options = commandOptions()
    val commandLine = parser.parse(options, args)

    val coalesce = commandLine.getOptionValue("coalesce")
    val datetime = commandLine.getOptionValue("datetime")
    val region = commandLine.getOptionValue("region")
    val output = commandLine.getOptionValue("output")

    val spark = MobvistaConstant.createSparkSession(s"AdnOrgLogEtlHours.$datetime.$region")
    // Ingest HB request data into DMP. The source lives at s3://mob-ad/adn/hb-v1/request, which is already mounted as the table
    // dwh.ods_adn_trackingnew_hb_request. However, that table's partition-mount statement does not run at the same time as the one
    // for dwh.ods_adn_trackingnew_request (used below). The shell script that launches this job is triggered by the _SUCCESS file
    // under the path, so dwh.ods_adn_trackingnew_request can already be mounted while dwh.ods_adn_trackingnew_hb_request is not,
    // in which case the hb_request table cannot yet see that hour's data. We therefore read
    // dwh.ods_adn_trackingnew_request_tmp_hb_request instead, whose partitions are mounted together with those of
    // dwh.ods_adn_trackingnew_request by https://gitlab.mobvista.com/fan.jiang/ods_adn_trackingnew_click_merge/blob/master/job/ods_adn_trackingnew_request_merge.sh
    // Azkaban flow: https://dataplatform.mobvista.com:8443/manager?project=ods_adn_trackingnew_click_merge&flow=ods_adn_trackingnew_merge#executions
    val sql =
      s"""
         |SELECT date, time, created timestamp, app_id, platform, os_version, sdk_version, device_model, screen_size, country_code,
         |  language, strategy, ip, imei, mac, dev_id android_id, gaid, idfa, device_brand, getDevId(cdn_ab) idfv, ext_packagename package_name,
         |  getDevId(ext_sysid) sysid, ext_oaid oaid, getRuid(ext_algo) ruid
         |  FROM dwh.ods_adn_trackingnew_request WHERE CONCAT(yyyy,mm,dd,hh) = '$datetime' AND re = '$region'
         |  UNION
         |  SELECT date, time, created timestamp, app_id, platform, os_version, sdk_version, device_model, screen_size, country_code,
         |  language, strategy, ip, imei, mac, dev_id android_id, gaid, idfa, device_brand, getDevId(cdn_ab) idfv, ext_packagename package_name,
         |  getDevId(ext_sysid) sysid, ext_oaid oaid, getRuid(ext_algo) ruid
         |  FROM dwh.ods_adn_trackingnew_request_tmp_hb_request WHERE CONCAT(yyyy,mm,dd,hh) = '$datetime' AND re = '${region}_hb_request'
         |""".stripMargin
    try {

      // Register the UDFs referenced in the SQL above. getDevId and getRuid come from AdnConstant and are
      // assumed to extract a usable device id / ruid from the raw ext_* payload fields.
      spark.udf.register("getDevId", AdnConstant.getDevId _)
      spark.udf.register("getRuid", AdnConstant.getRuid _)

      // Deduplicate the unioned rows, then flatten each row into a TSV record when at least one device id validates.
      val rdd = spark.sql(sql).dropDuplicates().rdd.map(r => {
        val date = r.getAs[String]("date")
        val time = r.getAs[String]("time")
        val timestamp = r.getAs[String]("timestamp")
        val appId = r.getAs[String]("app_id")
        val platform = r.getAs[String]("platform")
        val osVersion = r.getAs[String]("os_version")
        val sdkVersion = r.getAs[String]("sdk_version")
        val deviceModel = r.getAs[String]("device_model")
        val screenSize = r.getAs[String]("screen_size")
        val countryCode = r.getAs[String]("country_code")
        val language = r.getAs[String]("language")
        val strategy = r.getAs[String]("strategy")
        val ip = r.getAs[String]("ip")
        val imei = r.getAs[String]("imei")
        val mac = r.getAs[String]("mac")
        val androidId = r.getAs[String]("android_id")
        val gaid = r.getAs[String]("gaid")
        val idfa = r.getAs[String]("idfa")
        val deviceBrand = r.getAs[String]("device_brand")
        val idfv = r.getAs[String]("idfv")
        val packageName = r.getAs[String]("package_name")
        val sysId = r.getAs[String]("sysid")
        val oaid = r.getAs[String]("oaid")
        val ruid = r.getAs[String]("ruid")

        // Branch priority: a row with any valid iOS-side id (idfa / idfv / oaid / sysid / long ruid) becomes an iOS
        // record; otherwise a valid Android-side id (gaid / imei / android_id / oaid / sysid) makes it an Android record;
        // rows with no valid id map to null and are dropped by the filter below.
        if ((StringUtils.isNotBlank(idfa) && idfa.matches(MobvistaConstant.didPtn)) ||
          (StringUtils.isNotBlank(idfv) && idfv.matches(MobvistaConstant.didPtn)) ||
          (StringUtils.isNotBlank(oaid) && oaid.matches(MobvistaConstant.oaidPtb)) ||
          (StringUtils.isNotBlank(sysId) && sysId.matches(MobvistaConstant.didPtn)) ||
          (StringUtils.isNotBlank(ruid) && ruid.length > 16)) {
          val plt = if (StringUtils.isNotBlank(platform)) {
            platform
          } else {
            "ios"
          }
          MRUtils.JOINER.join(date, time, timestamp, appId, plt, osVersion, sdkVersion, deviceModel, screenSize, countryCode,
            language, ip, "", mac, "", "", idfa, deviceBrand, sysId, packageName, strategy, oaid, idfv, ruid)
        } else if ((StringUtils.isNotBlank(gaid) && gaid.matches(MobvistaConstant.didPtn)) ||
          (StringUtils.isNotBlank(imei) && imei.matches(MobvistaConstant.imeiPtn)) ||
          (StringUtils.isNotBlank(androidId) && androidId.matches(MobvistaConstant.andriodIdPtn)) ||
          (StringUtils.isNotBlank(oaid) && oaid.matches(MobvistaConstant.oaidPtb)) ||
          (StringUtils.isNotBlank(sysId) && sysId.matches(MobvistaConstant.didPtn))) {
          val plt = if (StringUtils.isNotBlank(platform)) {
            platform
          } else {
            "android"
          }
          MRUtils.JOINER.join(date, time, timestamp, appId, plt, osVersion, sdkVersion, deviceModel, screenSize, countryCode,
            language, ip, imei, mac, androidId, gaid, "", deviceBrand, sysId, packageName, strategy, oaid, "", "")
        } else {
          null
        }
      }).filter(l => {
        StringUtils.isNotBlank(l)
      })

      // Delete any previous output so reruns of the same hour are idempotent.
      FileSystem.get(new URI("s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output), true)

      rdd.repartition(coalesce.toInt)
        .saveAsTextFile(output, classOf[GzipCodec])
    } finally {
      if (spark != null) {
        spark.stop()
      }
    }
    0
  }

  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("datetime", true, "[must] datetime")
    options.addOption("region", true, "[must] region")
    options.addOption("output", true, "[must] output")
    options.addOption("coalesce", true, "[must] coalesce")
    options
  }
}

object AdnOrgLogEtlHours {
  def main(args: Array[String]): Unit = {
    new AdnOrgLogEtlHours().run(args)
  }
}
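
// A minimal launch sketch (illustrative assumptions only: the jar name, datetime, region, output
// path, and coalesce values below are not taken from the actual deployment scripts):
//
//   spark-submit \
//     --class mobvista.dmp.datasource.adn.AdnOrgLogEtlHours \
//     dmp.jar \
//     -datetime 2024010100 \
//     -region cn \
//     -output s3://mob-emr-test/adn_org_log_etl_hours/2024010100/cn \
//     -coalesce 100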