Commit 12a03fcf by WangJinfeng

fix tracking etl job

parent 03a77d82
@@ -28,6 +28,6 @@ if [ $? -ne 0 ]; then
   exit 255
 fi
-common_mount_partition "dwd" "dwd_device_ios_ids_inc_daily" "dt='${LOG_TIME}',active_type='${BUSINESS}'" "${OUTPUT_PATH}/ios"
-common_mount_partition "dwd" "dwd_device_android_ids_inc_daily" "dt='${LOG_TIME}',active_type='${BUSINESS}'" "${OUTPUT_PATH}/android"
+# common_mount_partition "dwd" "dwd_device_ios_ids_inc_daily" "dt='${LOG_TIME}',active_type='${BUSINESS}'" "${OUTPUT_PATH}/ios"
+# common_mount_partition "dwd" "dwd_device_android_ids_inc_daily" "dt='${LOG_TIME}',active_type='${BUSINESS}'" "${OUTPUT_PATH}/android"

@@ -28,6 +28,6 @@ if [ $? -ne 0 ]; then
   exit 255
 fi
-common_mount_partition "dwd" "dwd_device_ios_ids_inc_daily" "dt='${LOG_TIME}',active_type='${BUSINESS}'" "${OUTPUT_PATH}/ios"
-common_mount_partition "dwd" "dwd_device_android_ids_inc_daily" "dt='${LOG_TIME}',active_type='${BUSINESS}'" "${OUTPUT_PATH}/android"
+# common_mount_partition "dwd" "dwd_device_ios_ids_inc_daily" "dt='${LOG_TIME}',active_type='${BUSINESS}'" "${OUTPUT_PATH}/ios"
+# common_mount_partition "dwd" "dwd_device_android_ids_inc_daily" "dt='${LOG_TIME}',active_type='${BUSINESS}'" "${OUTPUT_PATH}/android"
@@ -17,11 +17,19 @@ class AdnRequest extends EtlDeviceIdDaily {
   override def processData(date: String, i: Int, spark: SparkSession): RDD[(String, Row)] = {
     spark.udf.register("getDevId", getDevId _)
     spark.udf.register("parseUA", parseUA _)
+    val year = date.substring(0, 4)
+    val month = date.substring(4, 6)
+    val day = date.substring(6, 8)
     // DWD
     // val sql = Constant.adn_request_sql.replace("@date", date)
     // ODS
-    val sql = Constant.adn_request_sql_v3.replace("@date", date)
-    val rdd = spark.sql(sql).coalesce(5000).rdd.map(row => {
+    val sql = Constant.adn_request_sql_v3
+      .replace("@year", year)
+      .replace("@month", month)
+      .replace("@day", day)
+    val rdd = spark.sql(sql).coalesce(20000).rdd.map(row => {
       val gaid = row.getAs[String]("gaid")
       val idfa = row.getAs[String]("idfa")
       val imei = row.getAs[String]("imei")
@@ -40,6 +48,7 @@ class AdnRequest extends EtlDeviceIdDaily {
       val osv_upt = row.getAs[String]("osv_upt")
       val upt = row.getAs[String]("upt")
       val network_type = row.getAs[String]("network_type")
+      val cnt = row.getAs[Long]("cnt")
       var sysId = ""
       var bkupId = ""
       if (StringUtils.isNotBlank(extSysid)) {
@@ -56,7 +65,7 @@ class AdnRequest extends EtlDeviceIdDaily {
         }
       }
       process(idfa, idfv, pkg_name, imei, androidId, oaid, gaid, sysId, bkupId, country, ip, ua, brand, model, os_version,
-        osv_upt, upt, network_type, platform)
+        osv_upt, upt, network_type, platform, cnt)
     })
     rdd
   }
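Note on the date filter in AdnRequest (a sketch, not part of the commit): the single @date placeholder is replaced by three placeholders, one per partition column. Assuming date arrives as "yyyyMMdd", the substitution expands as below; filtering yyyy/mm/dd directly, rather than through CONCAT(yyyy,mm,dd) = '@date', is the form that typically lets Hive/Spark prune partitions of dwh.ods_adn_trackingnew_request instead of scanning the whole table, which is presumably the point of the change.

    // Minimal sketch, assuming `date` is "yyyyMMdd"; "20211201" is a hypothetical value.
    val date = "20211201"
    val year = date.substring(0, 4)   // "2021"
    val month = date.substring(4, 6)  // "12"
    val day = date.substring(6, 8)    // "01"
    val sql = Constant.adn_request_sql_v3
      .replace("@year", year)
      .replace("@month", month)
      .replace("@day", day)
    // => ... WHERE yyyy = '2021' AND mm = '12' AND dd = '01'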
@@ -70,7 +70,8 @@ object Constant {
     StructField("osv_upt", StringType), // OS update time
     StructField("upt", StringType), // boot time
     StructField("event_type", StringType), // event type
-    StructField("network_type", StringType) // network type, used to distinguish API/SDK
+    StructField("network_type", StringType), // network type, used to distinguish API/SDK
+    StructField("cnt", LongType) // event frequency
   ))
   /**
@@ -148,7 +149,8 @@ object Constant {
     StructField("osv_upt", StringType), // OS update time
     StructField("upt", StringType), // boot time
     StructField("event_type", StringType), // event type
-    StructField("network_type", StringType) // network type, used to distinguish API/SDK
+    StructField("network_type", StringType), // network type, used to distinguish API/SDK
+    StructField("cnt", LongType) // event frequency
   ))
   val otherSchema: StructType = StructType(Array(
@@ -172,7 +174,9 @@ object Constant {
     StructField("osv_upt", StringType), // OS update time
     StructField("upt", StringType), // boot time
     StructField("event_type", StringType), // event type
-    StructField("network_type", StringType) // network type, used to distinguish API/SDK
+    StructField("network_type", StringType), // network type, used to distinguish API/SDK
+    StructField("cnt", LongType) // event frequency
   ))
   val dsp_req_sql =
@@ -186,9 +190,11 @@ object Constant {
     """
       |SELECT idfa, googleadid gaid, ext5 exitid, os platform, countrycode country, deviceip ip,
       |  parseUA(ext6) ua, make brand, model, osv os_version, '0' osv_upt, '0' upt, cpackagename pkg_name,
-      |  cncttype network_type
+      |  cncttype network_type, COUNT(1) cnt
       |  FROM adn_dsp.log_adn_dsp_request_orc_hour
       |  WHERE CONCAT(yr,mt,dt) = '@date' @hour
+      |  GROUP BY idfa, googleadid, ext5, os, countrycode, deviceip,
+      |    parseUA(ext6), make, model, osv, cpackagename, cncttype
       |""".stripMargin
   val adn_request_sql =
@@ -215,9 +221,12 @@ object Constant {
     """
       |SELECT idfa, gaid, ext_oaid oaid, getDevId(cdn_ab) idfv, ext_packagename pkg_name, dev_id androidid, ext_sysid extsysid, imei, platform,
      |  country_code country, ip, parseUA(ext_systemuseragent) ua, ext_brand brand, ext_model model, os_version,
-      |  COALESCE(get_json_object(ext_data,'$.osvut'),0) osv_upt, COALESCE(get_json_object(ext_data,'$.upt'),0) upt, network_type
+      |  COALESCE(get_json_object(ext_data,'$.osvut'),0) osv_upt, COALESCE(get_json_object(ext_data,'$.upt'),0) upt, network_type, COUNT(1) cnt
       |  FROM dwh.ods_adn_trackingnew_request
-      |  WHERE CONCAT(yyyy,mm,dd) = '@date'
+      |  WHERE yyyy = '@year' AND mm = '@month' AND dd = '@day'
+      |  GROUP BY idfa, gaid, ext_oaid, getDevId(cdn_ab), ext_packagename, dev_id, ext_sysid, imei, platform,
+      |    country_code, ip, parseUA(ext_systemuseragent), ext_brand, ext_model, os_version,
+      |    COALESCE(get_json_object(ext_data,'$.osvut'),0), COALESCE(get_json_object(ext_data,'$.upt'),0), network_type
       |""".stripMargin
   val ios_id_mapping_sql: String =
@@ -272,7 +281,7 @@ object Constant {
   def process(idfa: String, idfv: String, pkg_name: String, imei: String, androidId: String, oaid: String, gaid: String, sysId: String,
               bkupId: String, country: String, ip: String, ua: String, brand: String, model: String, os_version: String, osv_upt: String,
-              upt: String, network_type: String, platform: String): (String, Row) = {
+              upt: String, network_type: String, platform: String, cnt: Long): (String, Row) = {
     val f_idfa = if (StringUtils.isNotBlank(idfa) && idfa.matches(didPtn) && !idfa.matches(allZero)) {
       idfa
     } else {
@@ -375,16 +384,16 @@ object Constant {
       "other"
     }
     val rw = if ("ios".equals(f_platform)) {
-      Row(f_idfa, f_idfv, pkg_name, f_sysId, f_bkupId, "", "", f_country, f_ip, f_ua, f_brand, f_model, f_osv, f_osv_upt, f_upt, "", f_network_type)
+      Row(f_idfa, f_idfv, pkg_name, f_sysId, f_bkupId, "", "", f_country, f_ip, f_ua, f_brand, f_model, f_osv, f_osv_upt, f_upt, "", f_network_type, cnt)
     } else if ("android".equals(f_platform)) {
-      Row(f_imei, f_androidId, pkg_name, f_oaid, f_gaid, f_sysId, f_bkupId, "", "", f_country, f_ip, f_ua, f_brand, f_model, f_osv, f_osv_upt, f_upt, "", f_network_type)
+      Row(f_imei, f_androidId, pkg_name, f_oaid, f_gaid, f_sysId, f_bkupId, "", "", f_country, f_ip, f_ua, f_brand, f_model, f_osv, f_osv_upt, f_upt, "", f_network_type, cnt)
     } else {
-      Row(f_idfa, f_idfv, pkg_name, f_imei, f_androidId, f_oaid, f_gaid, f_sysId, f_bkupId, "", "", f_country, f_ip, f_ua, f_brand, f_model, f_osv, f_osv_upt, f_upt, "", f_network_type)
+      Row(f_idfa, f_idfv, pkg_name, f_imei, f_androidId, f_oaid, f_gaid, f_sysId, f_bkupId, "", "", f_country, f_ip, f_ua, f_brand, f_model, f_osv, f_osv_upt, f_upt, "", f_network_type, cnt)
     }
     (f_platform, rw)
   }
   case class Result(device_id: String, device_type: String, one_id: String) extends Serializable
-  case class OneIDScore(one_id: String, one_type: String, one_score: Double) extends Serializable
+  case class OneIDScore(one_id: String, one_type: String, one_score: Double, one_version: String) extends Serializable
 }
\ No newline at end of file
@@ -58,6 +58,7 @@ class DspReq extends EtlDeviceIdDaily {
       val os_version = row.getAs[String]("os_version")
       val osv_upt = row.getAs[String]("osv_upt")
       val upt = row.getAs[String]("upt")
+      val cnt = row.getAs[Long]("cnt")
       var idfv = ""
       var oaid = ""
       var imei = ""
@@ -90,7 +91,7 @@ class DspReq extends EtlDeviceIdDaily {
         }
       }
       process(idfa, idfv, pkg_name, imei, androidId, oaid, gaid, sysId = "", bkupId = "", country, ip, ua, brand,
-        model, os_version, osv_upt, upt, network_type, platform)
+        model, os_version, osv_upt, upt, network_type, platform, cnt)
     })
     rdd
   }
@@ -42,6 +42,7 @@ abstract class EtlDeviceIdDaily extends CommonSparkJob with Serializable {
       if ("dsp_req".equalsIgnoreCase(business)) {
         for (i <- 0 until 6) {
           val df = processData(date, i, spark)
+            .repartition(coalesce)
           df.persist(StorageLevel.MEMORY_AND_DISK_SER)
           val iosTab = df.filter(plf => {
@@ -52,7 +53,6 @@ abstract class EtlDeviceIdDaily extends CommonSparkJob with Serializable {
           FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output + s"/ios/${i}"), true)
           spark.createDataFrame(iosTab, iosSchema)
-            .repartition(coalesce)
             .write.mode(SaveMode.Overwrite)
             .option("orc.compress", "zlib")
             .orc(output + s"/ios/${i}")
@@ -66,7 +66,6 @@ abstract class EtlDeviceIdDaily extends CommonSparkJob with Serializable {
           FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output + s"/android/${i}"), true)
           spark.createDataFrame(adrTab, adrSchema)
-            .repartition(coalesce)
             .write.mode(SaveMode.Overwrite)
             .option("orc.compress", "zlib")
             .orc(output + s"/android/${i}")
@@ -80,7 +79,6 @@ abstract class EtlDeviceIdDaily extends CommonSparkJob with Serializable {
           FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output + s"/other/${i}"), true)
           spark.createDataFrame(otherTab, otherSchema)
-            .repartition(coalesce)
             .write.mode(SaveMode.Overwrite)
             .option("orc.compress", "zlib")
             .orc(output + s"/other/${i}")
@@ -89,6 +87,7 @@ abstract class EtlDeviceIdDaily extends CommonSparkJob with Serializable {
         }
       } else {
         val df = processData(date, 0, spark)
+          .repartition(coalesce)
         df.persist(StorageLevel.MEMORY_AND_DISK_SER)
         val iosTab = df.filter(plf => {
@@ -99,7 +98,6 @@ abstract class EtlDeviceIdDaily extends CommonSparkJob with Serializable {
         FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output + "/ios"), true)
         spark.createDataFrame(iosTab, iosSchema)
-          .coalesce(coalesce)
           .write.mode(SaveMode.Overwrite)
           .option("orc.compress", "zlib")
           .orc(output + "/ios")
@@ -113,7 +111,6 @@ abstract class EtlDeviceIdDaily extends CommonSparkJob with Serializable {
         FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output + s"/android"), true)
         spark.createDataFrame(adrTab, adrSchema)
-          .coalesce(coalesce)
           .write.mode(SaveMode.Overwrite)
           .option("orc.compress", "zlib")
           .orc(output + s"/android")
@@ -127,11 +124,21 @@ abstract class EtlDeviceIdDaily extends CommonSparkJob with Serializable {
         FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output + s"/other"), true)
         spark.createDataFrame(otherTab, otherSchema)
-          .coalesce(coalesce)
           .write.mode(SaveMode.Overwrite)
           .option("orc.compress", "zlib")
           .orc(output + s"/other")
       }
+      spark.sql(
+        s"""
+           |ALTER TABLE dws.dws_device_id_ios_frequency ADD IF NOT EXISTS PARTITION (dt='$date',source='${business}')
+           | LOCATION '$output/ios'
+           |""".stripMargin)
+      spark.sql(
+        s"""
+           |ALTER TABLE dws.dws_device_id_android_frequency ADD IF NOT EXISTS PARTITION (dt='$date',source='${business}')
+           | LOCATION '$output/android'
+           |""".stripMargin)
     } finally {
       if (spark != null) {
         spark.stop()
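Two things change at the tail of EtlDeviceIdDaily. First, the shuffle happens once up front: the RDD returned by processData is repartitioned before being persisted, so the per-platform filters and ORC writes no longer repartition or coalesce individually. Second, the ios/android output directories are registered as partitions of the dws frequency tables via ALTER TABLE ... ADD IF NOT EXISTS PARTITION. A condensed sketch of the write-side pattern for one platform; processData, coalesce, output, business, iosSchema and date are assumed to be in scope as in the class, and the filter/map on the tuple RDD is an assumption since the diff elides the filter body:

    import org.apache.spark.sql.SaveMode
    import org.apache.spark.storage.StorageLevel

    // Repartition once, cache, then reuse the same RDD for every filtered output.
    val df = processData(date, 0, spark).repartition(coalesce)
    df.persist(StorageLevel.MEMORY_AND_DISK_SER)
    val iosTab = df.filter(plf => "ios".equals(plf._1)).map(_._2)
    spark.createDataFrame(iosTab, iosSchema)
      .write.mode(SaveMode.Overwrite)
      .option("orc.compress", "zlib")
      .orc(output + "/ios")
    // Expose the freshly written ORC files through the Hive metastore.
    spark.sql(
      s"""
         |ALTER TABLE dws.dws_device_id_ios_frequency ADD IF NOT EXISTS PARTITION (dt='$date',source='${business}')
         | LOCATION '$output/ios'
         |""".stripMargin)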