Commit 30f76130 by WangJinfeng

init id_mapping

parent 3e38d98c
@@ -57,9 +57,9 @@ day=${date:6:2}
 # Backup S3 path
 output="${BACKFLOW_OUTPUT}/${keyspace}/${table}/${date_path}/${region}/"
-export SPARK_HOME="/data/hadoop-home/engineplus-k8s-spark-3.0.0-hadoop3.2"
-export SPARK_CONF_DIR="/data/hadoop-config/command-home/engineplus-k8s-spark-3.0.0-online/conf"
+# export SPARK_HOME="/data/hadoop-home/engineplus-k8s-spark-3.0.0-hadoop3.2"
+# export SPARK_CONF_DIR="/data/hadoop-config/command-home/engineplus-k8s-spark-3.0.0-online/conf"
 spark-submit --class mobvista.dmp.datasource.backflow.BackFlow \
 --name "BackFlow.${keyspace}.${table}.${region}" \
...
@@ -21,9 +21,8 @@ spark-submit --class mobvista.dmp.datasource.id_mapping.AdnRequest \
 --conf spark.network.timeout=720s \
 --conf spark.sql.shuffle.partitions=3000 \
 --conf spark.default.parallelism=3000 \
---conf spark.driver.extraJavaOptions="-XX:+UseG1GC" \
 --conf spark.executor.extraJavaOptions="-XX:+UseG1GC" \
---master yarn --deploy-mode cluster --executor-memory 8g --driver-memory 6g --executor-cores 5 --num-executors 200 \
+--master yarn --deploy-mode cluster --executor-memory 10g --driver-memory 8g --executor-cores 5 --num-executors 200 \
 ../${JAR} -date ${LOG_TIME} -business ${BUSINESS} -output ${OUTPUT_PATH} -coalesce 1000
 if [ $? -ne 0 ]; then
...
@@ -19,11 +19,10 @@ spark-submit --class mobvista.dmp.datasource.id_mapping.DspReq \
 --name "EtlDeviceIdDaily.$BUSINESS.$LOG_TIME" \
 --conf spark.yarn.executor.memoryOverhead=2048 \
 --conf spark.network.timeout=720s \
---conf spark.sql.shuffle.partitions=3000 \
---conf spark.default.parallelism=3000 \
---conf spark.driver.extraJavaOptions="-XX:+UseG1GC" \
+--conf spark.sql.shuffle.partitions=5000 \
+--conf spark.default.parallelism=5000 \
 --conf spark.executor.extraJavaOptions="-XX:+UseG1GC" \
---master yarn --deploy-mode cluster --executor-memory 8g --driver-memory 6g --executor-cores 5 --num-executors 200 \
+--master yarn --deploy-mode cluster --executor-memory 10g --driver-memory 12g --executor-cores 5 --num-executors 200 \
 ../${JAR} -date ${LOG_TIME} -business ${BUSINESS} -output ${OUTPUT_PATH} -coalesce 1000
 if [ $? -ne 0 ]; then
...
 type=command
+dependencies=adn_request_id_mapping,dsp_id_mapping
 command=echo "ID Mapping End!"
\ No newline at end of file
@@ -34,6 +34,7 @@ spark-submit --class mobvista.dmp.datasource.dm.EtlRuidMapping \
 --conf spark.executor.extraJavaOptions="-XX:+UseG1GC" \
 --conf spark.sql.adaptive.enabled=true \
 --conf spark.sql.adaptive.advisoryPartitionSizeInBytes=536870912 \
+--conf spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive=true \
 --master yarn --deploy-mode cluster --executor-memory 10g --driver-memory 4g --executor-cores 5 --num-executors 100 \
 ../${JAR} \
 -date ${date} -output $OUTPUT_PATH
...
@@ -26,9 +26,9 @@ sleep 30
 output_path="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/dm_user_info/${date_path}"
 unmount_output_path="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/dm_user_info/${unmount_date_path}"
-export SPARK_HOME="/data/hadoop-home/engineplus-k8s-spark-3.0.0-hadoop3.2"
-export SPARK_CONF_DIR="/data/hadoop-config/command-home/engineplus-k8s-spark-3.0.0-online/conf"
+# export SPARK_HOME="/data/hadoop-home/engineplus-k8s-spark-3.0.0-hadoop3.2"
+# export SPARK_CONF_DIR="/data/hadoop-config/command-home/engineplus-k8s-spark-3.0.0-online/conf"
 spark-submit --class mobvista.dmp.datasource.retargeting.DeviceInfoJob \
 --name "DeviceInfoJob.wangjf.${date}" \
...
 type=command
-dependencies=rtdmp_request_daily,rtdmp_request_tencent,rtdmp_request_other,rtdmp_request_dsp,rtdmp_request_btop,rtdmp_request_youku_acquisition,rtdmp_request_alive
+dependencies=rtdmp_request_daily,rtdmp_request_tencent,rtdmp_request_other,rtdmp_request_dsp,rtdmp_request_btop,rtdmp_request_alive
 command=echo "RTDmp Request Success !!!"
\ No newline at end of file
@@ -122,14 +122,15 @@ class OdsDmpUserInfoAll extends CommonSparkJob with Serializable {
 | coalesce(b.gender,a.gender,10) as gender,
 | coalesce(b.behavior,a.behavior) as behavior,
 | coalesce(b.update_date,a.update_date) as update_date,
+| coalesce(b.merge_bus,a.merge_bus) as merge_bus,
 | coalesce(b.publish_date,a.publish_date) as publish_date
 |from
-| (select dev_id,dev_id_md5,dev_type,platform,install,interest,model,case when upper(country) = 'GB' then 'UK' else upper(country) end as country ,osversion,age,gender,behavior,update_date,
+| (select dev_id,dev_id_md5,dev_type,platform,install,interest,model,case when upper(country) = 'GB' then 'UK' else upper(country) end as country ,osversion,age,gender,behavior,update_date,merge_bus,
 | '$yesBef1Part' as publish_date
 | from dwh.ods_dmp_user_info_daily where dt='$yesBef1Str'
 | ) b
 | full outer join
-| (select dev_id,dev_id_md5,dev_type,platform,install,interest,model,case when upper(country) = 'GB' then 'UK' else upper(country) end as country ,osversion,age,gender,behavior,update_date,publish_date
+| (select dev_id,dev_id_md5,dev_type,platform,install,interest,model,case when upper(country) = 'GB' then 'UK' else upper(country) end as country ,osversion,age,gender,behavior,update_date,merge_bus,publish_date
 | from dwh.ods_dmp_user_info_all where dt = '$yesBef2DayStr' and update_date >= '$befYearPart') a
 | on a.dev_id = b.dev_id
 """.stripMargin
...
@@ -76,15 +76,16 @@ class OdsDmpUserInfoAllV2 extends CommonSparkJob with Serializable {
 | coalesce(b.gender,a.gender,10) as gender,
 | coalesce(b.behavior,a.behavior) as behavior,
 | coalesce(b.update_date,a.update_date) as update_date,
+| coalesce(b.merge_bus,a.merge_bus) as merge_bus,
 | case when b.publish_date is not null and b.publish_date != '' then b.publish_date
 | when a.publish_date is not null and a.publish_date != '' then a.publish_date
 | else '${yesBef14Part}' end as publish_date
 |from
-| (select dev_id,dev_id_md5,dev_type,platform,install,interest,model,case when upper(country) = 'GB' then 'UK' else upper(country) end as country ,osversion, age, gender,behavior,update_date,publish_date
+| (select dev_id,dev_id_md5,dev_type,platform,install,interest,model,case when upper(country) = 'GB' then 'UK' else upper(country) end as country ,osversion, age, gender,behavior,update_date,merge_bus,publish_date
 | from dwh.ods_dmp_user_info_all_v2 where dt = '${yesBef2DayStr}' and update_date > '${yesBef14Part}') a
 | full outer join
 | (
-| select dev_id,dev_id_md5,dev_type,platform,install,interest,model,case when upper(country) = 'GB' then 'UK' else upper(country) end as country ,osversion, age, gender,behavior,update_date,
+| select dev_id,dev_id_md5,dev_type,platform,install,interest,model,case when upper(country) = 'GB' then 'UK' else upper(country) end as country ,osversion, age, gender,behavior,update_date,merge_bus,
 | '${yesBef1Part}' as publish_date
 | from dwh.ods_dmp_user_info_daily where dt='${yesBef1Str}'
 | ) b
...
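Note on the two hunks above (OdsDmpUserInfoAll / OdsDmpUserInfoAllV2): the new merge_bus column rides through the existing full-outer-join pattern, where coalesce(b.x, a.x) prefers yesterday's daily row (b) and falls back to the accumulated snapshot (a). A minimal local sketch of that merge rule, with toy table names and values rather than the real dwh tables:

```scala
import org.apache.spark.sql.SparkSession

// Illustrative only: "daily" plays the role of b, "all_snapshot" the role of a.
object MergeBusSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("merge_bus_sketch").getOrCreate()
    import spark.implicits._

    Seq(("dev1", "adn", "20211201")).toDF("dev_id", "merge_bus", "update_date")
      .createOrReplaceTempView("daily")          // b: yesterday's daily rows
    Seq(("dev1", "dsp", "20211130"), ("dev2", "dsp", "20211129"))
      .toDF("dev_id", "merge_bus", "update_date")
      .createOrReplaceTempView("all_snapshot")   // a: accumulated snapshot

    spark.sql(
      """
        |SELECT coalesce(b.dev_id, a.dev_id)           AS dev_id,
        |       coalesce(b.merge_bus, a.merge_bus)     AS merge_bus,
        |       coalesce(b.update_date, a.update_date) AS update_date
        |  FROM daily b
        |  FULL OUTER JOIN all_snapshot a ON a.dev_id = b.dev_id
        |""".stripMargin).show(false)
    // dev1 keeps the daily values (adn / 20211201); dev2 survives from the snapshot.
    spark.stop()
  }
}
```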
 package mobvista.dmp.datasource.device
-import java.net.URI
-import java.text.SimpleDateFormat
 import mobvista.dmp.common.CommonSparkJob
 import mobvista.dmp.util.DateUtil
 import mobvista.prd.datasource.util.GsonUtil
@@ -11,6 +8,8 @@ import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.storage.StorageLevel
+import java.net.URI
+import java.text.SimpleDateFormat
 import scala.collection.mutable
 class OdsDmpUserInfoDailyV2 extends CommonSparkJob with Serializable {
@@ -168,13 +167,12 @@ class OdsDmpUserInfoDailyV2 extends CommonSparkJob with Serializable {
 | group by t.device_id
 """.stripMargin
-spark.udf.register("mergeToStr", mergeToStr _)
+spark.udf.register("mergeToStr", mergeToStrV2 _)
 hql =
 s"""
-|select lower(device_id) device_id, max(device_type) device_type, max(platform) platform, mergeToStr(collect_set(install_list)) install
+|select device_id, device_type, platform, mergeToStr(install_list) install, merge_bus
 | from dwh.dmp_install_list
 | where dt = '$yesBef1Str' and business = '14days' and update_date = '$yesBef1Part'
-| group by lower(device_id)
 """.stripMargin
 /*
 hql =
@@ -430,6 +428,7 @@ class OdsDmpUserInfoDailyV2 extends CommonSparkJob with Serializable {
 | coalesce(b.age,'10') age,
 | coalesce(c.gender,'10') gender,
 | d.behavior,
+| coalesce(e.merge_bus,'') merge_bus,
 | "$yesBef1Part" update_date
 |from dm_pkg_insterest_model_os_country a
 | left outer join tmp_age_daily b on a.dev_id = b.device_id
@@ -513,6 +512,15 @@ class OdsDmpUserInfoDailyV2 extends CommonSparkJob with Serializable {
 installStr.substring(1)
 }
+def mergeToStrV2(installList: String): String = {
+val installStr = new StringBuffer
+val installJSON = mobvista.dmp.common.MobvistaConstant.String2JSONObject(installList)
+installJSON.keySet().iterator().foreach(k => {
+installStr.append(",").append(k).append("|").append(installJSON.getString(k))
+})
+installStr.substring(1)
+}
 def parseAge(row: Row): Tuple3[String, Int, Double] = {
 val deviceId = row.getString(0)
 val age = row.getString(1)
...
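The mergeToStrV2 helper added above flattens the install-list JSON into a comma-separated pkg|date string. A standalone sketch of the same transformation, assuming a fastjson-style parser in place of the in-house MobvistaConstant.String2JSONObject helper (sample input is illustrative):

```scala
import com.alibaba.fastjson.JSON
import scala.collection.JavaConverters._

object MergeToStrSketch {
  // Same output shape as mergeToStrV2: "pkg1|date1,pkg2|date2"
  def mergeToStr(installList: String): String = {
    val json = JSON.parseObject(installList)
    json.keySet().asScala.map(k => s"$k|${json.getString(k)}").mkString(",")
  }

  def main(args: Array[String]): Unit = {
    println(mergeToStr("""{"com.example.app":"2021-12-01","com.example.game":"2021-11-20"}"""))
    // -> com.example.app|2021-12-01,com.example.game|2021-11-20 (key order may vary)
  }
}
```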
 package mobvista.dmp.datasource.id_mapping
-import mobvista.dmp.common.MobvistaConstant
-import mobvista.dmp.datasource.adn.AdnConstant
+import mobvista.dmp.datasource.id_mapping.Constant.{getDevId, parseUA}
 import org.apache.commons.lang3.StringUtils
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{Row, SparkSession}
@@ -15,43 +14,60 @@ import org.apache.spark.sql.{Row, SparkSession}
 */
 class AdnRequest extends EtlDeviceIdDaily {
-override def processData(date: String, spark: SparkSession): RDD[(String, Row)] = {
-spark.udf.register("getDevId", AdnConstant.getDevId _)
+override def processData(date: String, i: Int, spark: SparkSession): RDD[(String, Row)] = {
+spark.udf.register("getDevId", getDevId _)
+spark.udf.register("parseUA", parseUA _)
 // DWD
 // val sql = Constant.adn_request_sql.replace("@date", date)
 // ODS
 val sql = Constant.adn_request_sql_v2.replace("@date", date)
-val rdd = spark.sql(sql).coalesce(60000).rdd.map(row => {
+val rdd = spark.sql(sql).coalesce(2000).rdd.map(row => {
 val gaid = row.getAs[String]("gaid")
 val idfa = row.getAs[String]("idfa")
 val imei = row.getAs[String]("imei")
 val androidId = row.getAs[String]("androidid")
 val extSysid = row.getAs[String]("extsysid")
-val platform = row.getAs[String]("platform").toLowerCase
-val oaid = row.getAs[String]("oaid")
-val idfv = row.getAs[String]("idfv")
-val country = row.getAs[String]("country")
-val arr = extSysid.split(",", -1)
-val sysId = if (StringUtils.isNotBlank(arr(0)) &&
-arr(0).matches(MobvistaConstant.didPtn) &&
-!arr(0).matches(MobvistaConstant.allZero)) {
-arr(0)
-} else {
-""
-}
-val bkupId = if (arr.length == 2 && StringUtils.isNotBlank(arr(1)) &&
-arr(1).matches(MobvistaConstant.didPtn) &&
-!arr(1).matches(MobvistaConstant.allZero)) {
-arr(1)
-} else {
-""
-}
+var platform = row.getAs[String]("platform")
+platform = if (StringUtils.isNotBlank(platform)) {
+platform.toLowerCase()
+} else {
+""
+}
+val oaid = row.getAs[String]("oaid")
+val idfv = row.getAs[String]("idfv")
+var country = row.getAs[String]("country")
+val ip = row.getAs[String]("ip")
+val ua = row.getAs[String]("ua")
+val brand = row.getAs[String]("brand")
+val model = row.getAs[String]("model")
+val os_version = row.getAs[String]("os_version")
+val osv_upt = row.getAs[String]("osv_upt")
+val upt = row.getAs[String]("upt")
+country = if (StringUtils.isNotBlank(country)) {
+country.toUpperCase()
+} else {
+""
+}
+var sysId = ""
+var bkupId = ""
+if (StringUtils.isNotBlank(extSysid)) {
+val arr = extSysid.split(",", -1)
+sysId = if (StringUtils.isNotBlank(arr(0))) {
+arr(0)
+} else {
+""
+}
+bkupId = if (arr.length == 2 && StringUtils.isNotBlank(arr(1))) {
+arr(1)
+} else {
+""
+}
+}
 val rw = if ("ios".equalsIgnoreCase(platform)) {
-Row(idfa, idfv, sysId, bkupId, "", "", country)
+Row(idfa, idfv, sysId, bkupId, "", "", country, ip, ua, brand, model, os_version, osv_upt, upt)
 } else {
-Row(imei, androidId, oaid, gaid, sysId, bkupId, "", "", country)
+Row(imei, androidId, oaid, gaid, sysId, bkupId, "", "", country, ip, ua, brand, model, os_version, osv_upt, upt)
 }
 (platform, rw)
 })
...
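In the reworked AdnRequest block above, the didPtn/allZero regex checks are dropped and extsysid is only split when it is non-blank, with arr(0) becoming sysId and arr(1) becoming bkupId. A tiny isolated sketch of that parsing rule (the helper name and sample values are hypothetical):

```scala
import org.apache.commons.lang3.StringUtils

object ExtSysidSketch {
  // Mirrors the new AdnRequest logic: "sysid,bkupid" -> (sysid, bkupid); blanks stay ""
  def splitExtSysid(extSysid: String): (String, String) = {
    if (StringUtils.isNotBlank(extSysid)) {
      val arr = extSysid.split(",", -1)
      val sysId = if (StringUtils.isNotBlank(arr(0))) arr(0) else ""
      val bkupId = if (arr.length == 2 && StringUtils.isNotBlank(arr(1))) arr(1) else ""
      (sysId, bkupId)
    } else {
      ("", "")
    }
  }

  def main(args: Array[String]): Unit = {
    println(splitExtSysid("abc-123,def-456")) // (abc-123,def-456)
    println(splitExtSysid("abc-123,"))        // (abc-123,)
    println(splitExtSysid(""))                // (,)
  }
}
```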
@@ -4,6 +4,8 @@ import mobvista.dmp.common.MobvistaConstant
 import org.apache.commons.lang.StringUtils
 import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import java.net.URLDecoder
 /**
 * @package: mobvista.dmp.datasource.id_mapping
 * @author: wangjf
@@ -23,7 +25,14 @@ object Constant {
 StructField("bkupid", StringType),
 StructField("xwho", StringType),
 StructField("user_id", StringType),
-StructField("country", StringType)
+StructField("country", StringType),
+StructField("ip", StringType), // IP
+StructField("ua", StringType), // UA
+StructField("brand", StringType), // device brand
+StructField("model", StringType), // device model
+StructField("os_version", StringType), // OS version
+StructField("osv_upt", StringType), // OS update time
+StructField("upt", StringType) // device boot time
 ))
 case class AdrTab(imei: String, android_id: String, oaid: String, gaid: String, sysid: String, bkupid: String, xwho: String, user_id: String)
@@ -34,48 +43,67 @@ object Constant {
 StructField("android_id", StringType),
 StructField("oaid", StringType),
 StructField("gaid", StringType),
-StructField("sysid", StringType),
-StructField("bkupid", StringType),
-StructField("xwho", StringType),
-StructField("user_id", StringType),
-StructField("country", StringType)
+StructField("sysid", StringType), // MTG in-house ID
+StructField("bkupid", StringType), // MTG in-house backup ID
+StructField("xwho", StringType), // tkio account
+StructField("user_id", StringType), // tkio account
+StructField("country", StringType), // country
+StructField("ip", StringType), // IP
+StructField("ua", StringType), // UA
+StructField("brand", StringType), // device brand
+StructField("model", StringType), // device model
+StructField("os_version", StringType), // OS version
+StructField("osv_upt", StringType), // OS update time
+StructField("upt", StringType) // device boot time
 ))
 val dsp_req_sql =
 """
-|SELECT idfa, gaid, exitid, platform
+|SELECT idfa, gaid, exitid, platform, country
 | FROM dwh.etl_dsp_request_daily_hours
 | WHERE dt = '@date'
 |""".stripMargin
 val dsp_req_sql_v2 =
 """
-|SELECT idfa, googleadid gaid, ext5 exitid, os platform, countrycode country
+|SELECT idfa, googleadid gaid, ext5 exitid, os platform, countrycode country, deviceip ip,
+| parseUA(ext6) ua, make brand, model, osv os_version, '0' osv_upt, '0' upt
 | FROM adn_dsp.log_adn_dsp_request_orc_hour
-| WHERE CONCAT(yr,mt,dt) = '@date'
-| GROUP BY idfa, googleadid, ext5, os, countrycode
+| WHERE CONCAT(yr,mt,dt) = '@date' @hour
+| GROUP BY idfa, googleadid, ext5, os, countrycode, deviceip, parseUA(ext6), make, model, osv
 |""".stripMargin
 val adn_request_sql =
 """
-|SELECT idfa, gaid, oaid, idfv, androidid, extsysid, imei, platform
+|SELECT idfa, gaid, oaid, idfv, androidid, extsysid, imei, platform, country
 | FROM dwh.etl_adn_org_request_daily_hours
 | WHERE CONCAT(yt,mt,dt) = '@date'
 |""".stripMargin
+// ext_data
 val adn_request_sql_v2 =
 """
-|SELECT idfa, gaid, oaid, getDevId(cdn_ab) idfv, dev_id androidid,
-| ext_sysid extsysid, imei, platform, country_code country
+|SELECT idfa, gaid, ext_oaid oaid, getDevId(cdn_ab) idfv, dev_id androidid, ext_sysid extsysid, imei, platform,
+| country_code country, ip, parseUA(ext_systemuseragent) ua, ext_brand brand, ext_model model, os_version,
+| COALESCE(get_json_object(ext_data,'$.osvut'),0) osv_upt, COALESCE(get_json_object(ext_data,'$.upt'),0) upt
 | FROM dwh.ods_adn_trackingnew_request
 | WHERE CONCAT(yyyy,mm,dd) = '@date'
-| GROUP BY idfa, gaid, oaid, cdn_ab, dev_id, ext_sysid, imei, platform, country_code
+| GROUP BY idfa, gaid, oaid, cdn_ab, dev_id, ext_sysid, imei, platform, country_code, ip, parseUA(ext_systemuseragent),
+| ext_brand, ext_model, os_version, COALESCE(get_json_object(ext_data,'$.osvut'),0), COALESCE(get_json_object(ext_data,'$.upt'),0)
 |""".stripMargin
 val sss_sql =
 """
 |""".stripMargin
+def parseUA(ua: String): String = {
+if (StringUtils.isNotBlank(ua)) {
+URLDecoder.decode(ua.replaceAll("%(?![0-9a-fA-F]{2})", "%25"), "UTF-8")
+} else {
+""
+}
+}
 def getDevId(devId: String): String = {
 var dev_id = ""
 if (StringUtils.isNotBlank(devId) && !",".equals(devId)) {
...
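The new parseUA above URL-decodes the user agent; the replaceAll first escapes any '%' that is not followed by two hex digits, so URLDecoder.decode cannot throw IllegalArgumentException on malformed escapes. A self-contained sketch of the same idea (sample strings are illustrative):

```scala
import java.net.URLDecoder

object ParseUaSketch {
  // Escape bare '%' before decoding so URLDecoder.decode cannot fail on malformed sequences.
  def parseUA(ua: String): String =
    if (ua != null && ua.trim.nonEmpty)
      URLDecoder.decode(ua.replaceAll("%(?![0-9a-fA-F]{2})", "%25"), "UTF-8")
    else ""

  def main(args: Array[String]): Unit = {
    println(parseUA("Mozilla%2F5.0%20(Linux%3B%20Android%2011)")) // Mozilla/5.0 (Linux; Android 11)
    println(parseUA("100% free UA"))                              // bare % is kept, no exception
    println(parseUA(null))                                        // empty string
  }
}
```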
 package mobvista.dmp.datasource.id_mapping
 import mobvista.dmp.common.MobvistaConstant
+import mobvista.dmp.datasource.id_mapping.Constant.parseUA
 import org.apache.commons.lang3.StringUtils
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{Row, SparkSession}
@@ -14,17 +15,43 @@ import org.apache.spark.sql.{Row, SparkSession}
 */
 class DspReq extends EtlDeviceIdDaily {
-override def processData(date: String, spark: SparkSession): RDD[(String, Row)] = {
+override def processData(date: String, i: Int, spark: SparkSession): RDD[(String, Row)] = {
+spark.udf.register("parseUA", parseUA _)
 // DWD
 // val sql = Constant.dsp_req_sql.replace("@date", date)
 // ODS
+val hour = i match {
+case 0 =>
+" AND hh BETWEEN '00' AND '05'"
+case 1 =>
+" AND hh BETWEEN '06' AND '11'"
+case 2 =>
+" AND hh BETWEEN '12' AND '17'"
+case 3 =>
+" AND hh BETWEEN '18' AND '23'"
+case _ =>
+""
+}
 val sql = Constant.dsp_req_sql_v2.replace("@date", date)
-val rdd = spark.sql(sql).coalesce(60000).rdd.map(row => {
+.replace("@hour", hour)
+val rdd = spark.sql(sql).coalesce(2000).rdd.map(row => {
 var idfa = row.getAs[String]("idfa")
 var gaid = row.getAs[String]("gaid")
 val platform = row.getAs[String]("platform")
 val exitId = row.getAs[String]("exitid")
-val country = row.getAs[String]("country").toUpperCase()
+var country = row.getAs[String]("country")
+country = if (StringUtils.isNotBlank(country)) {
+country.toUpperCase()
+} else {
+""
+}
+val ip = row.getAs[String]("ip")
+val ua = row.getAs[String]("ua")
+val brand = row.getAs[String]("brand")
+val model = row.getAs[String]("model")
+val os_version = row.getAs[String]("os_version")
+val osv_upt = row.getAs[String]("osv_upt")
+val upt = row.getAs[String]("upt")
 var idfv = ""
 var oaid = ""
 var imei = ""
@@ -53,12 +80,13 @@ class DspReq extends EtlDeviceIdDaily {
 androidId = devIds(7)
 }
 }
 }
 }
 val rw = if ("ios".equalsIgnoreCase(platform)) {
-Row(idfa, idfv, "", "", "", "", country)
+Row(idfa, idfv, "", "", "", "", country, ip, ua, brand, model, os_version, osv_upt, upt)
 } else {
-Row(imei, androidId, oaid, gaid, "", "", "", "", country)
+Row(imei, androidId, oaid, gaid, "", "", "", "", country, ip, ua, brand, model, os_version, osv_upt, upt)
 }
 (platform, rw)
 })
...
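The hour match added above splits one day of adn_dsp.log_adn_dsp_request_orc_hour into four 6-hour windows; the chosen predicate is substituted into dsp_req_sql_v2 through the @hour placeholder. A small sketch of that substitution (the SQL text is abridged from the diff):

```scala
object DspHourWindowSketch {
  // Same mapping as DspReq.processData: quarter index -> extra WHERE predicate.
  def hourPredicate(i: Int): String = i match {
    case 0 => " AND hh BETWEEN '00' AND '05'"
    case 1 => " AND hh BETWEEN '06' AND '11'"
    case 2 => " AND hh BETWEEN '12' AND '17'"
    case 3 => " AND hh BETWEEN '18' AND '23'"
    case _ => ""
  }

  def main(args: Array[String]): Unit = {
    val template = "SELECT ... FROM adn_dsp.log_adn_dsp_request_orc_hour WHERE CONCAT(yr,mt,dt) = '@date' @hour"
    val sql = template.replace("@date", "20211201").replace("@hour", hourPredicate(2))
    println(sql)
    // ... WHERE CONCAT(yr,mt,dt) = '20211201'  AND hh BETWEEN '12' AND '17'
  }
}
```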
@@ -39,36 +39,73 @@ abstract class EtlDeviceIdDaily extends CommonSparkJob with Serializable {
 val spark = MobvistaConstant.createSparkSession(s"EtlDeviceIdDaily.$business.$date")
 try {
-val df = processData(date, spark).distinct(20000)
-.persist(StorageLevel.MEMORY_AND_DISK_SER)
-val iosTab = df.filter(plf => {
-"ios".equals(plf._1)
-}).map(row => {
-row._2
-})
-FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output + "/ios"), true)
-spark.createDataFrame(iosTab, iosSchema)
-.coalesce(coalesce)
-.write.mode(SaveMode.Overwrite)
-.option("orc.compress", "zlib")
-.orc(output + "/ios")
-val adrTab = df.filter(plf => {
-"android".equals(plf._1)
-}).map(row => {
-row._2
-})
-FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output + "/android"), true)
-spark.createDataFrame(adrTab, adrSchema)
-.coalesce(coalesce)
-.write.mode(SaveMode.Overwrite)
-.option("orc.compress", "zlib")
-.orc(output + "/android")
+if ("dsp_req".equalsIgnoreCase(business)) {
+for (i <- 0 until 4) {
+val df = processData(date, i, spark)
+.repartition(20000)
+.persist(StorageLevel.MEMORY_AND_DISK_SER)
+val iosTab = df.filter(plf => {
+"ios".equals(plf._1)
+}).map(row => {
+row._2
+})
+FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output + s"/ios/${i}"), true)
+spark.createDataFrame(iosTab, iosSchema)
+.coalesce(coalesce)
+.write.mode(SaveMode.Overwrite)
+.option("orc.compress", "zlib")
+.orc(output + s"/ios/${i}")
+val adrTab = df.filter(plf => {
+"android".equals(plf._1)
+}).map(row => {
+row._2
+})
+FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output + s"/android/${i}"), true)
+spark.createDataFrame(adrTab, adrSchema)
+.coalesce(coalesce)
+.write.mode(SaveMode.Overwrite)
+.option("orc.compress", "zlib")
+.orc(output + s"/android/${i}")
+df.unpersist(true)
+}
+} else {
+val df = processData(date, 0, spark)
+.repartition(20000)
+.persist(StorageLevel.MEMORY_AND_DISK_SER)
+val iosTab = df.filter(plf => {
+"ios".equals(plf._1)
+}).map(row => {
+row._2
+})
+FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output + "/ios"), true)
+spark.createDataFrame(iosTab, iosSchema)
+.coalesce(coalesce)
+.write.mode(SaveMode.Overwrite)
+.option("orc.compress", "zlib")
+.orc(output + "/ios")
+val adrTab = df.filter(plf => {
+"android".equals(plf._1)
+}).map(row => {
+row._2
+})
+FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output + s"/android"), true)
+spark.createDataFrame(adrTab, adrSchema)
+.coalesce(coalesce)
+.write.mode(SaveMode.Overwrite)
+.option("orc.compress", "zlib")
+.orc(output + s"/android")
+}
 } finally {
 if (spark != null) {
 spark.stop()
@@ -77,5 +114,5 @@ abstract class EtlDeviceIdDaily extends CommonSparkJob with Serializable {
 0
 }
-def processData(date: String, spark: SparkSession): RDD[(String, Row)]
+def processData(date: String, i: Int, spark: SparkSession): RDD[(String, Row)]
 }
\ No newline at end of file
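With the restructured driver above, the dsp_req business is read and written in four passes (one per 6-hour window), while other businesses keep the single-pass layout. A rough sketch of the resulting per-platform output directories (bucket and date are hypothetical; the real prefix comes from the job's -output argument):

```scala
object OutputLayoutSketch {
  def main(args: Array[String]): Unit = {
    val output = "s3://example-bucket/id_mapping/20211201" // hypothetical -output value
    // dsp_req: one sub-directory per 6-hour window, per platform
    val dspReqPaths = for (platform <- Seq("ios", "android"); i <- 0 until 4)
      yield s"$output/$platform/$i"
    // every other business (e.g. adn_request): a single directory per platform
    val otherPaths = Seq(s"$output/ios", s"$output/android")
    (dspReqPaths ++ otherPaths).foreach(println)
  }
}
```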