AdnRequest.scala 2.31 KB
Newer Older
WangJinfeng committed
1 2
package mobvista.dmp.datasource.id_mapping

WangJinfeng committed
3
import mobvista.dmp.datasource.id_mapping.Constant.{getDevId, parseUA, process}
WangJinfeng committed
4 5 6 7 8 9 10 11 12 13 14 15 16
import org.apache.commons.lang3.StringUtils
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}

/**
 * Job that runs `Constant.adn_request_sql_v3` for one date and passes the
 * identifier and device columns of every result row to `Constant.process`.
 *
 * @package: mobvista.dmp.datasource.id_mapping
 * @author: wangjf
 * @date: 2021/11/30
 * @time: 7:51 PM
 * @email: jinfeng.wang@mobvista.com
 */
class AdnRequest extends EtlDeviceIdDaily {

  /**
   * Runs the request query for `date` and maps each row through `process`.
   *
   * @param date  value substituted for the `@date` placeholder in the SQL text
   * @param i     not used by this implementation
   * @param spark session used to register the UDFs and execute the query
   * @return one `(String, Row)` pair per query row, as returned by `process`
   */
  override def processData(date: String, i: Int, spark: SparkSession): RDD[(String, Row)] = {
    // Registered under these names so SQL text can call them
    // (presumably used by the query below; the SQL is not visible here).
    spark.udf.register("getDevId", getDevId _)
    spark.udf.register("parseUA", parseUA _)

    // The query labelled "DWD" was:
    //   val sql = Constant.adn_request_sql.replace("@date", date)
    // The query labelled "ODS" is the one in use:
    val sql = Constant.adn_request_sql_v3.replace("@date", date)

    // coalesce(5000) lowers the partition count to 5000 when there are more;
    // it never increases it.
    spark.sql(sql).coalesce(5000).rdd.map { row =>
      val gaid = row.getAs[String]("gaid")
      val idfa = row.getAs[String]("idfa")
      val imei = row.getAs[String]("imei")
      val androidId = row.getAs[String]("androidid")
      val extSysid = row.getAs[String]("extsysid")
      val platform = row.getAs[String]("platform")
      val oaid = row.getAs[String]("oaid")
      val idfv = row.getAs[String]("idfv")
      val pkg_name = row.getAs[String]("pkg_name")
      val country = row.getAs[String]("country")
      val ip = row.getAs[String]("ip")
      val ua = row.getAs[String]("ua")
      val brand = row.getAs[String]("brand")
      val model = row.getAs[String]("model")
      val os_version = row.getAs[String]("os_version")
      val osv_upt = row.getAs[String]("osv_upt")
      val upt = row.getAs[String]("upt")
      val network_type = row.getAs[String]("network_type")

      // extsysid is split on "," keeping trailing empty fields (limit -1).
      // Field 0 becomes sysId. Field 1 becomes bkupId only when the value has
      // exactly two fields. Null/blank input and blank fields yield "".
      // Kept inline (not an instance method) so the closure does not capture `this`.
      val (sysId, bkupId) =
        if (StringUtils.isNotBlank(extSysid)) {
          val parts = extSysid.split(",", -1)
          val first = if (StringUtils.isNotBlank(parts(0))) parts(0) else ""
          val second =
            if (parts.length == 2 && StringUtils.isNotBlank(parts(1))) parts(1) else ""
          (first, second)
        } else {
          ("", "")
        }

      process(idfa, idfv, pkg_name, imei, androidId, oaid, gaid, sysId, bkupId, country, ip, ua, brand, model, os_version,
        osv_upt, upt, network_type, platform)
    }
  }
}

/** Command-line entry point: creates an `AdnRequest` and hands it the raw arguments via `run`. */
object AdnRequest {
  def main(args: Array[String]): Unit = {
    // `run` is not defined in this file; presumably inherited from EtlDeviceIdDaily.
    val job = new AdnRequest()
    job.run(args)
  }
}