Commit 427a5422 by WangJinfeng

fix bug: accept the secondary device id only when it matches didPtn, emit ADN ETL records per platform, re-enable broadcast joins, and broadcast the package mapping

parent 5f24407c
@@ -437,10 +437,10 @@ DSP_DEVICE_MAPPING="s3://mob-emr-test/dataplatform/DataWareHouse/data/dsp/device
ID_MAPPING="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwd/dwd_device_ids_inc_daily"
ADS_DEVICE_MID_ID_MAPPING="s3://mob-emr-test/dataplatform/DataWareHouse/data/ads/ads_device_mid_id_mapping"
ADS_DEVICE_ID_MAPPING="s3://mob-emr-test/dataplatform/DataWareHouse/data/ads/ads_device_id_mapping"
ADS_DEVICE_TAG_ALL_DAILY="s3://mob-emr-test/reyun/dmp/ads/ads_device_tag_all_weekly"
JAR=./DMP.jar
# Check for the _SUCCESS file; if it does not exist yet, keep polling in a loop
......
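Note: the translated comment above describes the guard used before reading upstream output: poll until the _SUCCESS marker appears. A minimal Scala sketch of that pattern with the Hadoop FileSystem API (the path, interval, and retry budget are illustrative, not values from this repo):

```scala
import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object SuccessPoller {
  // Returns true once dir/_SUCCESS exists, false after maxAttempts checks.
  def waitForSuccess(dir: String, intervalMs: Long = 60000L, maxAttempts: Int = 60): Boolean = {
    val fs = FileSystem.get(new URI(dir), new Configuration())
    val marker = new Path(dir, "_SUCCESS")
    (0 until maxAttempts).exists { _ =>
      val found = fs.exists(marker)
      if (!found) Thread.sleep(intervalMs) // wait before the next check
      found
    }
  }
}
```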
@@ -66,7 +66,7 @@ object AdnConstant {
val ids = devId.split(",", -1)
if (StringUtils.isNotBlank(ids(0)) && ids(0).matches(MobvistaConstant.didPtn) && !ids(0).matches(MobvistaConstant.allZero)) {
dev_id = ids(0)
- } else if (ids.length == 2 && !ids(1).matches(MobvistaConstant.didPtn) && !ids(1).matches(MobvistaConstant.allZero)) {
+ } else if (ids.length == 2 && ids(1).matches(MobvistaConstant.didPtn) && !ids(1).matches(MobvistaConstant.allZero)) {
dev_id = ids(1)
}
}
......
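Review note: the one-character fix above is easy to miss, so here is the corrected selection logic as a standalone sketch. The second id in an "id1,id2" pair must now itself match the device-id pattern (the old negated check let malformed ids through). `didPtn` and `allZero` are illustrative stand-ins for the real `MobvistaConstant` values:

```scala
import org.apache.commons.lang3.StringUtils

object DevIdPick {
  // Stand-ins for MobvistaConstant.didPtn / MobvistaConstant.allZero (assumed UUID-shaped).
  val didPtn = "[0-9a-fA-F]{8}(-[0-9a-fA-F]{4}){3}-[0-9a-fA-F]{12}"
  val allZero = "0{8}(-0{4}){3}-0{12}"

  def pick(devId: String): String = {
    val ids = devId.split(",", -1)
    if (StringUtils.isNotBlank(ids(0)) && ids(0).matches(didPtn) && !ids(0).matches(allZero)) {
      ids(0)
    } else if (ids.length == 2 && ids(1).matches(didPtn) && !ids(1).matches(allZero)) {
      ids(1) // fixed branch: the fallback id must be a valid, non-zero device id
    } else {
      ""
    }
  }
}
```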
@@ -125,7 +125,7 @@ class AdnOrgLogEtlHours extends CommonSparkJob with Serializable {
} else {
""
}
- var f_platform = if (StringUtils.isNotBlank(platform)) {
+ val f_platform = if (StringUtils.isNotBlank(platform)) {
platform.toLowerCase()
} else {
""
@@ -145,24 +145,14 @@ class AdnOrgLogEtlHours extends CommonSparkJob with Serializable {
} else {
""
}
- f_platform = if (f_platform.contains("ios") || f_platform.contains("iphone") || f_brand.toLowerCase.contains("apple")
-   || f_model.toLowerCase.contains("iphone") || f_model.toLowerCase.contains("ipad") || f_osv.toLowerCase.contains("ios")
-   || StringUtils.isNotBlank(f_idfa) || StringUtils.isNotBlank(f_idfv) || StringUtils.isNotBlank(f_ruid)) {
-   "ios"
- } else if (f_platform.contains("android") || f_osv.toLowerCase.contains("android") ||
-   StringUtils.isNotBlank(f_imei) || StringUtils.isNotBlank(f_androidId) || StringUtils.isNotBlank(f_oaid) || StringUtils.isNotBlank(f_gaid)) {
-   "android"
- } else {
-   "other"
- }
- if ((StringUtils.isNotBlank(f_idfa) || StringUtils.isNotBlank(f_idfv) || StringUtils.isNotBlank(f_imei) || StringUtils.isNotBlank(f_androidId) ||
-   StringUtils.isNotBlank(f_oaid) || StringUtils.isNotBlank(f_gaid) || StringUtils.isNotBlank(f_sysId) || StringUtils.isNotBlank(f_ruid)) &&
-   !"other".equals(f_platform)) {
-   MRUtils.JOINER.join(date, time, timestamp, appId, f_platform, osVersion, sdkVersion, deviceModel, screenSize, countryCode,
-     language, ip, f_imei, mac, f_androidId, f_gaid, f_idfa, deviceBrand, f_sysId, packageName, strategy, f_oaid, f_idfv, f_ruid)
+ if (f_platform.equals("ios")) {
+   MRUtils.JOINER.join(date, time, timestamp, appId, f_platform, f_osv, sdkVersion, f_model, screenSize, countryCode,
+     language, ip, "", mac, "", "", f_idfa, f_brand, f_sysId, packageName, strategy, "", f_idfv, f_ruid)
+ } else if (f_platform.equals("android")) {
+   MRUtils.JOINER.join(date, time, timestamp, appId, f_platform, f_osv, sdkVersion, f_model, screenSize, countryCode,
+     language, ip, f_imei, mac, f_androidId, f_gaid, "", f_brand, f_sysId, packageName, strategy, f_oaid, "", "")
} else {
-   null
+   ""
}
/*
if ((StringUtils.isNotBlank(idfa) && idfa.matches(MobvistaConstant.didPtn)) || (StringUtils.isNotBlank(idfv) && idfv.matches(MobvistaConstant.didPtn)) ||
@@ -188,9 +178,7 @@ class AdnOrgLogEtlHours extends CommonSparkJob with Serializable {
null
}
*/
- }).filter(l => {
-   StringUtils.isNotBlank(l)
- })
+ }).filter(l => StringUtils.isNotBlank(l))
FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output), true)
......
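Review note: the rewritten block emits one fixed-width record per platform and blanks the ids that cannot belong to it, so iOS rows carry only idfa/idfv/ruid and Android rows only imei/androidId/gaid/oaid; anything else becomes "" and is dropped by the trailing filter. A condensed sketch of that shape (the Record case class is hypothetical, standing in for the MRUtils.JOINER field list):

```scala
case class Record(platform: String, imei: String, androidId: String, gaid: String,
                  oaid: String, idfa: String, idfv: String, ruid: String)

// Keep only ids that are valid for the detected platform, mirroring the
// per-platform JOINER branches in the diff above.
def normalize(r: Record): Option[Record] = r.platform match {
  case "ios"     => Some(r.copy(imei = "", androidId = "", gaid = "", oaid = ""))
  case "android" => Some(r.copy(idfa = "", idfv = "", ruid = ""))
  case _         => None // unknown platform: like the "" branch removed by the filter
}
```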
@@ -57,7 +57,7 @@ class DeviceInfoJob extends CommonSparkJob with Serializable {
.config("spark.sql.orc.filterPushdown", "true")
.config("spark.io.compression.codec", "lz4")
.config("spark.io.compression.lz4.blockSize", "64k")
.config("spark.sql.autoBroadcastJoinThreshold", "-1")
.config("spark.sql.autoBroadcastJoinThreshold", "367001600")
.config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.enableHiveSupport()
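Review note: moving spark.sql.autoBroadcastJoinThreshold from -1 (broadcast joins disabled) to 367001600 bytes (350 MB) lets the optimizer broadcast any join side whose estimated size is under that limit. A small sketch of checking the setting and forcing a broadcast regardless of it (largeDf/smallDf and the join key are placeholders):

```scala
import org.apache.spark.sql.functions.broadcast

// The effective threshold can be read back at runtime.
println(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))

// A broadcast can also be requested explicitly, independent of the threshold.
val joined = largeDf.join(broadcast(smallDf), Seq("device_id"))
```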
@@ -93,6 +93,13 @@ class DeviceInfoJob extends CommonSparkJob with Serializable {
s"""
|SELECT id, package_name FROM dwh.package_mapping WHERE dt = '${package_dt}'
""".stripMargin
+ /*
+ val packageMap = spark.sql(package_sql).rdd.map(r => {
+   (r.getAs("package_name").toString.toLowerCase, Integer.parseInt(r.getAs("id").toString))
+ }).cache().collectAsMap()
+ packageMap
+ */
packageMap = spark.sparkContext.broadcast(spark.sql(package_sql).rdd.map(r => {
(r.getAs("package_name").toString.toLowerCase, Integer.parseInt(r.getAs("id").toString))
}).collectAsMap())
......
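Review note: the commented-out collectAsMap-on-a-cached-RDD pattern left the whole map to be captured in every task closure; the new code builds the package_name → id map once on the driver and distributes it as a broadcast variable that executors read through .value. A minimal sketch of the pattern, with column types assumed from the query above:

```scala
import org.apache.spark.broadcast.Broadcast

// Driver side: build the lookup once and broadcast it.
val packageMap: Broadcast[scala.collection.Map[String, Int]] =
  spark.sparkContext.broadcast(
    spark.sql(package_sql).rdd
      .map(r => (r.getAs("package_name").toString.toLowerCase, r.getAs("id").toString.toInt))
      .collectAsMap())

// Executor side: look values up through .value, e.g.
// rdd.map(pkg => packageMap.value.getOrElse(pkg, -1))
```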