package mobvista.dmp.datasource.id_mapping

import java.util.Locale

import mobvista.dmp.common.MobvistaConstant
import mobvista.dmp.datasource.id_mapping.Constant.{parseUA, process}
import org.apache.commons.lang3.StringUtils
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}

/**
 * Runs the `Constant.dsp_req_sql_v2` query, fills in device IDs (falling back to the
 * positional fields of the `exitid` column), and passes each row to `Constant.process`.
 *
 * @package: mobvista.dmp.datasource.id_mapping
 * @author: wangjf
 * @date: 2021/11/30
 * @time: 7:51 PM
 * @email: jinfeng.wang@mobvista.com
 */
class DspReq extends EtlDeviceIdDaily {

  /**
   * Executes the SQL template for one day (optionally restricted to one 4-hour window)
   * and maps every result row through `process`.
   *
   * @param date  value substituted for the `@date` placeholder of the SQL template
   * @param i     hour-window index: 0..5 restricts `hh` to 00-03, 04-07, ..., 20-23;
   *              any other value substitutes "" for `@hour` (no hour restriction)
   * @param spark session used to register the `parseUA` UDF and to run the query
   * @return one `process(...)` result per query row
   */
  override def processData(date: String, i: Int, spark: SparkSession): RDD[(String, Row)] = {
    spark.udf.register("parseUA", parseUA _)
    // DWD
    // val sql = Constant.dsp_req_sql.replace("@date", date)
    // ODS
    val sql = Constant.dsp_req_sql_v2
      .replace("@date", date)
      .replace("@hour", DspReq.hourFilter(i))

    // NOTE(review): coalesce without shuffle can only lower the partition count; if the
    // query yields fewer than 20000 partitions this call leaves the count unchanged.
    spark.sql(sql).coalesce(20000).rdd.map { row =>
      val platform = row.getAs[String]("platform")
      val isIos = "ios".equalsIgnoreCase(platform)
      val rawIdfa = row.getAs[String]("idfa")
      val rawGaid = row.getAs[String]("gaid")
      val exitId = row.getAs[String]("exitid")

      // exitid is split on "," by splitFun; its fields are only consulted when the split
      // yields at least 17 entries (the highest index read below is 16).
      val devIds = Option(exitId)
        .filter(s => StringUtils.isNotBlank(s))
        .map(s => splitFun(s, ","))
        .filter(_.length >= 17)

      // Field `idx` of exitid when exitid is usable, the field is non-blank, and it
      // matches `ptn` (if a pattern is supplied).
      def exitField(idx: Int, ptn: Option[String]): Option[String] =
        devIds
          .map(ids => ids(idx))
          .filter(v => StringUtils.isNotBlank(v) && ptn.forall(p => v.matches(p)))

      val didPattern = Some(MobvistaConstant.didPtn)

      // iOS: idfa falls back to exitid[1] (only when the column is blank), idfv comes from exitid[16].
      val idfa =
        if (isIos && StringUtils.isBlank(rawIdfa)) exitField(1, didPattern).getOrElse(rawIdfa)
        else rawIdfa
      val idfv = if (isIos) exitField(16, didPattern).getOrElse("") else ""

      // Every other platform: gaid falls back to exitid[0] (only when the column is blank);
      // oaid / imei / androidId come from exitid[12] / [4] / [7]. oaid has no pattern check.
      val gaid =
        if (!isIos && StringUtils.isBlank(rawGaid)) exitField(0, didPattern).getOrElse(rawGaid)
        else rawGaid
      val oaid = if (isIos) "" else exitField(12, None).getOrElse("")
      val imei =
        if (isIos) "" else exitField(4, Some(MobvistaConstant.imeiPtn)).getOrElse("")
      val androidId =
        if (isIos) "" else exitField(7, Some(MobvistaConstant.andriodIdPtn)).getOrElse("")

      val pkgName = row.getAs[String]("pkg_name")
      val networkType = row.getAs[String]("network_type")
      val country = DspReq.normalizeCountry(row.getAs[String]("country"))
      val ip = row.getAs[String]("ip")
      val ua = row.getAs[String]("ua")
      val brand = row.getAs[String]("brand")
      val model = row.getAs[String]("model")
      val osVersion = row.getAs[String]("os_version")
      val osvUpt = row.getAs[String]("osv_upt")
      val upt = row.getAs[String]("upt")
      val cnt = row.getAs[Long]("cnt")

      process(idfa, idfv, pkgName, imei, androidId, oaid, gaid, sysId = "", bkupId = "", country,
        ip, ua, brand, model, osVersion, osvUpt, upt, networkType, platform, cnt)
    }
  }
}

object DspReq {

  def main(args: Array[String]): Unit = {
    new DspReq().run(args)
  }

  /**
   * SQL fragment substituted for `@hour`: slot 0..5 selects the 4-hour `hh` window
   * starting at hour `slot * 4`; any other slot yields "" (no hour restriction).
   */
  private def hourFilter(slot: Int): String = slot match {
    case 0 => " AND hh BETWEEN '00' AND '03'"
    case 1 => " AND hh BETWEEN '04' AND '07'"
    case 2 => " AND hh BETWEEN '08' AND '11'"
    case 3 => " AND hh BETWEEN '12' AND '15'"
    case 4 => " AND hh BETWEEN '16' AND '19'"
    case 5 => " AND hh BETWEEN '20' AND '23'"
    case _ => ""
  }

  /**
   * Upper-cases a non-blank country value; blank or null input becomes "".
   * `Locale.ROOT` keeps the result independent of the JVM default locale
   * (under a Turkish default locale, `"in".toUpperCase()` would yield `"İN"`).
   */
  private def normalizeCountry(country: String): String =
    if (StringUtils.isNotBlank(country)) country.toUpperCase(Locale.ROOT) else ""
}