// AdnOrgLogEtlHours.scala (authors per GitLab blame: wang-jinfeng, fan.jiang)
package mobvista.dmp.datasource.adn

import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
import mobvista.dmp.util.MRUtils
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.commons.lang.StringUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.compress.GzipCodec

import java.net.URI


/**
 * Hourly ETL over the ADN request logs.
 *
 * For one hour (`datetime`) and one region (`region`) it reads `dwh.ods_adn_trackingnew_request`
 * together with the header-bidding requests in `dwh.ods_adn_trackingnew_request_tmp_hb_request`,
 * keeps only rows that carry at least one device ID accepted by the `MobvistaConstant` patterns,
 * and writes them as gzip-compressed text lines joined with `MRUtils.JOINER`.
 */
class AdnOrgLogEtlHours extends CommonSparkJob with Serializable {
  val TAB_DELIMITER = "\t"

  /** Command-line options parsed by [[run]]: `datetime`, `region`, `output`, `coalesce`. */
  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("datetime", true, "datetime")
    options.addOption("region", true, "region")
    options.addOption("output", true, "output")
    options.addOption("coalesce", true, "coalesce")
    options
  }

  /**
   * Runs the job.
   *
   * Options (all required):
   *  - `-datetime`: hour to process, compared with `CONCAT(yyyy,mm,dd,hh)` of the source tables
   *  - `-region`:   value of the `re` partition (`<region>_hb_request` for the header-bidding table)
   *  - `-output`:   output directory; it is deleted before the new data is written
   *  - `-coalesce`: number of output partitions (positive integer)
   *
   * @return 0 on success
   * @throws IllegalArgumentException if an option is missing/blank, or `-coalesce` is not a
   *                                  positive integer (NumberFormatException when not numeric)
   */
  override protected def run(args: Array[String]): Int = {
    val commandLine = new BasicParser().parse(commandOptions(), args)

    // Reads a mandatory option. Validation happens before Spark starts and before the output
    // directory is deleted: with a missing datetime/region the query would match nothing and the
    // existing output would be silently replaced by an empty result.
    def requiredOption(name: String): String = {
      val value = commandLine.getOptionValue(name)
      require(StringUtils.isNotBlank(value), s"missing required option -$name")
      value
    }

    val datetime = requiredOption("datetime")
    val region = requiredOption("region")
    val output = requiredOption("output")
    val coalesce = requiredOption("coalesce")
    val numPartitions = coalesce.toInt
    require(numPartitions > 0, s"option -coalesce must be a positive integer, got: $coalesce")

    val spark = MobvistaConstant.createSparkSession(s"AdnOrgLogEtlHours.$datetime.$region")

    // Header-bidding (hb) request data is also loaded into dmp. Its source is
    // s3://mob-ad/adn/hb-v1/request, which is already mounted as dwh.ods_adn_trackingnew_hb_request,
    // but that table's mount statement is not executed together with the one for
    // dwh.ods_adn_trackingnew_request used below.
    // The shell script that launches this job decides to run by checking the _SUCCESS file under the
    // path, so dwh.ods_adn_trackingnew_request may already be mounted while
    // dwh.ods_adn_trackingnew_hb_request is not, in which case the latter cannot see this hour's data.
    // Therefore dwh.ods_adn_trackingnew_request_tmp_hb_request is used instead: it is mounted at the
    // same time as dwh.ods_adn_trackingnew_request by
    // https://gitlab.mobvista.com/fan.jiang/ods_adn_trackingnew_click_merge/blob/master/job/ods_adn_trackingnew_request_merge.sh
    // Azkaban flow: https://dataplatform.mobvista.com:8443/manager?project=ods_adn_trackingnew_click_merge&flow=ods_adn_trackingnew_merge#executions
    val sql =
      s"""
         |SELECT date, time, created timestamp, app_id, platform, os_version, sdk_version, device_model, screen_size, country_code,
         |  language, strategy, ip, imei, mac, dev_id android_id, gaid, idfa, device_brand, getDevId(cdn_ab) idfv, ext_packagename package_name,
         |  getDevId(ext_sysid) sysid, ext_oaid oaid, getRuid(ext_algo) ruid
         |  FROM dwh.ods_adn_trackingnew_request WHERE CONCAT(yyyy,mm,dd,hh) = '$datetime' AND re = '$region'
         |  UNION
         |  SELECT date, time, created timestamp, app_id, platform, os_version, sdk_version, device_model, screen_size, country_code,
         |  language, strategy, ip, imei, mac, dev_id android_id, gaid, idfa, device_brand, getDevId(cdn_ab) idfv, ext_packagename package_name,
         |  getDevId(ext_sysid) sysid, ext_oaid oaid, getRuid(ext_algo) ruid
         |  FROM dwh.ods_adn_trackingnew_request_tmp_hb_request  WHERE CONCAT(yyyy,mm,dd,hh) = '$datetime' AND re = '${region}_hb_request'
         |""".stripMargin
    try {
      spark.udf.register("getDevId", AdnConstant.getDevId _)
      spark.udf.register("getRuid", AdnConstant.getRuid _)

      // UNION (i.e. UNION DISTINCT) in the query already removes duplicate rows, so no extra
      // dropDuplicates() shuffle is needed.
      val lines = spark.sql(sql).rdd.flatMap { r =>
        // True when `value` is non-blank and fully matches the regular expression `ptn`.
        val valid = (value: String, ptn: String) => StringUtils.isNotBlank(value) && value.matches(ptn)

        val date = r.getAs[String]("date")
        val time = r.getAs[String]("time")
        val timestamp = r.getAs[String]("timestamp")
        val appId = r.getAs[String]("app_id")
        val platform = r.getAs[String]("platform")
        val osVersion = r.getAs[String]("os_version")
        val sdkVersion = r.getAs[String]("sdk_version")
        val deviceModel = r.getAs[String]("device_model")
        val screenSize = r.getAs[String]("screen_size")
        val countryCode = r.getAs[String]("country_code")
        val language = r.getAs[String]("language")
        val strategy = r.getAs[String]("strategy")
        val ip = r.getAs[String]("ip")
        val imei = r.getAs[String]("imei")
        val mac = r.getAs[String]("mac")
        val androidId = r.getAs[String]("android_id")
        val gaid = r.getAs[String]("gaid")
        val idfa = r.getAs[String]("idfa")
        val deviceBrand = r.getAs[String]("device_brand")
        val idfv = r.getAs[String]("idfv")
        val packageName = r.getAs[String]("package_name")
        val sysId = r.getAs[String]("sysid")
        val oaid = r.getAs[String]("oaid")
        val ruid = r.getAs[String]("ruid")

        // NOTE(review): oaid and sysid are accepted by the first (iOS-style) branch, so they need no
        // re-check in the second branch; but it also means a row with a valid oaid/sysid is always
        // written with gaid/imei/android_id blanked. Confirm this routing is intended for Android rows.
        val line: Option[String] =
          if (valid(idfa, MobvistaConstant.didPtn) || valid(idfv, MobvistaConstant.didPtn) ||
            valid(oaid, MobvistaConstant.oaidPtb) || valid(sysId, MobvistaConstant.didPtn) ||
            (StringUtils.isNotBlank(ruid) && ruid.length > 16)) {
            // iOS-style record: imei, android_id and gaid are blanked; platform defaults to "ios".
            val plt = if (StringUtils.isNotBlank(platform)) platform else "ios"
            Some(MRUtils.JOINER.join(date, time, timestamp, appId, plt, osVersion, sdkVersion, deviceModel, screenSize,
              countryCode, language, ip, "", mac, "", "", idfa, deviceBrand, sysId, packageName, strategy, oaid, idfv,
              ruid))
          } else if (valid(gaid, MobvistaConstant.didPtn) || valid(imei, MobvistaConstant.imeiPtn) ||
            valid(androidId, MobvistaConstant.andriodIdPtn)) {
            // Android-style record: idfa, idfv and ruid are blanked; platform defaults to "android".
            val plt = if (StringUtils.isNotBlank(platform)) platform else "android"
            Some(MRUtils.JOINER.join(date, time, timestamp, appId, plt, osVersion, sdkVersion, deviceModel, screenSize,
              countryCode, language, ip, imei, mac, androidId, gaid, "", deviceBrand, sysId, packageName, strategy, oaid,
              "", ""))
          } else {
            // No usable device ID: drop the row.
            None
          }
        line
      }

      // Resolve the FileSystem from the output path itself so the delete targets the same
      // filesystem/bucket that saveAsTextFile writes to.
      val outputPath = new Path(output)
      FileSystem.get(outputPath.toUri, spark.sparkContext.hadoopConfiguration).delete(outputPath, true)

      // Despite the option name, a full shuffle (repartition) is used to balance the output files.
      lines.repartition(numPartitions)
        .saveAsTextFile(output, classOf[GzipCodec])
    } finally {
      spark.stop()
    }
    0
  }

  // NOTE(review): these options ("input", "output", "coalesce") differ from the ones run() actually
  // parses (see commandOptions()), and run() does not call this method. Confirm whether the
  // CommonSparkJob base class relies on it before aligning the two.
  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("input", true, "[must] input")
    options.addOption("output", true, "[must] output")
    options.addOption("coalesce", true, "[must] coalesce")
    options
  }
}

/** Command-line entry point for the [[AdnOrgLogEtlHours]] Spark job. */
object AdnOrgLogEtlHours {
  /**
   * Builds a fresh job instance and runs it with the raw command-line arguments.
   * The status code returned by `run` is not propagated.
   */
  def main(args: Array[String]): Unit = {
    val job = new AdnOrgLogEtlHours
    job.run(args)
  }
}