Commit a3f58be6 by WangJinfeng

init id_mapping

parent b1f36887
@@ -437,6 +437,8 @@ DSP_DEVICE_MAPPING="s3://mob-emr-test/dataplatform/DataWareHouse/data/dsp/device
 ID_MAPPING="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwd/dwd_device_ids_inc_daily"
+ADS_DEVICE_MID_ID_MAPPING="s3://mob-emr-test/dataplatform/DataWareHouse/data/ads/ads_device_mid_id_mapping"
 ADS_DEVICE_ID_MAPPING="s3://mob-emr-test/dataplatform/DataWareHouse/data/ads/ads_device_id_mapping"
 JAR=./DMP.jar
......
@@ -19,8 +19,8 @@ spark-submit --class mobvista.dmp.datasource.id_mapping.DspReq \
 --name "EtlDeviceIdDaily.$BUSINESS.$LOG_TIME" \
 --conf spark.yarn.executor.memoryOverhead=2048 \
 --conf spark.network.timeout=720s \
---conf spark.sql.shuffle.partitions=10000 \
---conf spark.default.parallelism=10000 \
+--conf spark.sql.shuffle.partitions=20000 \
+--conf spark.default.parallelism=20000 \
 --master yarn --deploy-mode cluster --executor-memory 12g --driver-memory 8g --executor-cores 5 --num-executors 200 \
 ../${JAR} -date ${LOG_TIME} -business ${BUSINESS} -output ${OUTPUT_PATH} -coalesce 2000
......
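The two settings raised to 20000 above act on different code paths: spark.sql.shuffle.partitions applies to shuffles planned by the DataFrame/SQL engine, while spark.default.parallelism applies to RDD shuffles that do not pass an explicit partition count. A minimal sketch of the distinction, not part of this commit:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[2]")
  .appName("PartitionSettingsSketch")
  .config("spark.sql.shuffle.partitions", "20000")  // DataFrame/SQL shuffles
  .config("spark.default.parallelism", "20000")     // RDD shuffles without an explicit numPartitions
  .getOrCreate()
import spark.implicits._

// DataFrame path: this groupBy would shuffle into 20000 partitions.
val grouped = spark.range(0, 1000000L).groupBy($"id" % 10).count()

// RDD path: reduceByKey with no partition count falls back to spark.default.parallelism.
val reduced = spark.sparkContext.parallelize(1 to 1000000).map(x => (x % 10, 1)).reduceByKey(_ + _)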
 type=command
-command=sh -x id_mapping.sh
+dependencies=id_mapping_overseas_android,id_mapping_cn_android
+command=echo "id_mapping job end!"
\ No newline at end of file
@@ -2,7 +2,11 @@
 source ../dmp_env.sh
-LOG_TIME=$(date +%Y-%m-%d -d "-1 day $ScheduleTime")
+COUNTRY=$1
+PLATFORM=$2
+LOG_TIME=$(date +%Y%m%d -d "-1 day $ScheduleTime")
 date_path=$(date +'%Y/%m/%d' -d "-1 day $ScheduleTime")
@@ -10,28 +14,22 @@ ADN_REQUEST_INPUT_PATH=${ID_MAPPING}/${date_path}/adn_request
 DSP_INPUT_PATH=${ID_MAPPING}/${date_path}/dsp_req
-check_await "${ADN_REQUEST_INPUT_PATH}/_SUCCESS"
-check_await "${DSP_INPUT_PATH}/_SUCCESS"
-OUTPUT_PATH=${ADS_DEVICE_MID_ID_MAPPING}/${date_path}
-RESULT_OUTPUT_PATH=${ADS_DEVICE_ID_MAPPING}/${date_path}
-country="US"
-platform="ios"
+# check_await "${ADN_REQUEST_INPUT_PATH}/$PLATFORM/_SUCCESS"
+# check_await "${DSP_INPUT_PATH}/$PLATFORM/_SUCCESS"
+OUTPUT_PATH=${ADS_DEVICE_ID_MAPPING}/${date_path}/${COUNTRY}/${PLATFORM}
 spark-submit --class mobvista.dmp.datasource.id_mapping.IDMappingGraphx \
---name "IDMappingGraphx.${LOG_TIME}.${country}.${platform}" \
+--name "IDMappingGraphx.${LOG_TIME}.${COUNTRY}.${PLATFORM}" \
 --conf spark.yarn.executor.memoryOverhead=2048 \
 --conf spark.network.timeout=720s \
 --conf spark.sql.shuffle.partitions=10000 \
 --conf spark.default.parallelism=10000 \
---conf spark.executor.extraJavaOptions="-XX:+UseG1GC" \
---master yarn --deploy-mode cluster --executor-memory 12g --driver-memory 8g --executor-cores 5 --num-executors 100 \
-../${JAR} -date ${LOG_TIME} -country ${country} -platform ${platform} -output ${OUTPUT_PATH} -result_output ${RESULT_OUTPUT_PATH} -coalesce 500
+--master yarn --deploy-mode cluster --executor-memory 10g --driver-memory 6g --executor-cores 5 --num-executors 200 \
+../${JAR} -date ${LOG_TIME} -country ${COUNTRY} -platform ${PLATFORM} \
+-output ${OUTPUT_PATH}/mid -fre_output ${OUTPUT_PATH}/frequency -result_output ${OUTPUT_PATH}/result -coalesce 1000
 if [ $? -ne 0 ]; then
 exit 255
 fi
\ No newline at end of file
+type=command
+dependencies=id_mapping_cn_ios
+command=sh -x id_mapping.sh 'cn' 'android'
\ No newline at end of file

+type=command
+command=sh -x id_mapping.sh 'cn' 'ios'
\ No newline at end of file

+type=command
+dependencies=id_mapping_overseas_ios
+command=sh -x id_mapping.sh 'overseas' 'android'
\ No newline at end of file

+type=command
+command=sh -x id_mapping.sh 'overseas' 'ios'
\ No newline at end of file
@@ -32,7 +32,6 @@ unmount_output_path="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/dm_us
 spark-submit --class mobvista.dmp.datasource.retargeting.DeviceInfoJob \
 --name "DeviceInfoJob.wangjf.${date}" \
---conf spark.sql.broadcastTimeout=1200 \
 --conf spark.sql.shuffle.partitions=10000 \
 --conf spark.default.parallelism=10000 \
 --conf spark.kryoserializer.buffer.max=512m \
@@ -40,7 +39,7 @@ spark-submit --class mobvista.dmp.datasource.retargeting.DeviceInfoJob \
 --conf spark.sql.files.maxPartitionBytes=536870912 \
 --conf spark.sql.adaptive.enabled=true \
 --conf spark.sql.adaptive.advisoryPartitionSizeInBytes=536870912 \
---master yarn --deploy-mode cluster --executor-memory 12g --driver-memory 10g --executor-cores 4 --num-executors 100 \
+--master yarn --deploy-mode cluster --executor-memory 12g --driver-memory 10g --executor-cores 4 --num-executors 150 \
 ../${JAR} \
 -date ${date} -output ${output_path} -coalesce 3000
......
@@ -59,7 +59,7 @@ object Constant {
 """
 |SELECT b.device_id, UPPER(country) country, CAST(b.offer_id AS string) offer_id, COALESCE(a.id, b.event_name) id, COALESCE(a.event_name, b.event_name) event_name, COALESCE(a.event_type,'') event_type FROM
 | (SELECT devid device_id, MAX(country) country, event_name, uuid offer_id FROM dwh.ods_3s_trackingcsv_event_info
-| WHERE yyyy = '@year' and mm = '@month' and dd = '@day' AND devid IS NOT NULL AND devid <> '' GROUP BY devid, event_name, uuid) b
+| WHERE yyyymmdd = '@date' AND devid IS NOT NULL AND devid <> '' GROUP BY devid, event_name, uuid) b
 | LEFT JOIN
 | (SELECT CAST(id AS string) id, event_name, event_type, offer_id FROM dwh.ods_3s_trackingcsv_event_define WHERE yyyymmdd = '@date') a
 | ON a.offer_id = b.offer_id
......
@@ -75,13 +75,7 @@ class TrackingEventDaily extends CommonSparkJob with java.io.Serializable {
 FileSystem.get(new URI(s"s3://mob-emr-test"), sc.hadoopConfiguration).delete(new Path(output), true)
-val year = date.substring(0, 4)
-val month = date.substring(4, 6)
-val day = date.substring(6, 8)
-var sql = Constant.tracking_event_sql.replace("@year", year)
-.replace("@month", month)
-.replace("@day", day)
+var sql = Constant.tracking_event_sql.replace("@date", date)
 spark.sql(sql)
 .filter(r => {
......
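Since the event_info query above now filters on a single yyyymmdd partition column, one placeholder substitution replaces the old year/month/day triple. A condensed sketch of the substitution, with an abbreviated template and a hypothetical run date:

// Sketch: single @date substitution vs. the removed @year/@month/@day triple.
val template = "SELECT ... FROM dwh.ods_3s_trackingcsv_event_info WHERE yyyymmdd = '@date'"
val date     = "20211110"                             // hypothetical run date

val sql = template.replace("@date", date)

// Old equivalent, now removed:
// template.replace("@year", date.substring(0, 4))    // "2021"
//         .replace("@month", date.substring(4, 6))   // "11"
//         .replace("@day", date.substring(6, 8))     // "10"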
@@ -115,7 +115,7 @@ object Constant {
 StructField("xwho", StringType),
 StructField("user_id", StringType),
 StructField("bkupid", StringType),
-StructField("cnt", IntegerType)
+StructField("cnt", LongType)
 ))
 val androidCNIDSet = Array("imei", "oaid", "gaid", "sysid", "xwho", "user_id", "android_pkg", "bmosv_upt", "bmosv_ipua_pkg", "bkupid")
@@ -223,7 +223,7 @@ object Constant {
 val ios_id_mapping_sql: String =
 """
 |SELECT idfa, idfv, pkg_name, sysid, bkupid, xwho, user_id, country, ip, ua, brand, model, os_version, osv_upt, upt, count(1) cnt
-| FROM dwd.dwd_device_ios_ids_inc_daily WHERE dt = '@date'
+| FROM dwd.dwd_device_ios_ids_inc_daily WHERE dt = '@date' @filter_country
 | GROUP BY idfa, idfv, pkg_name, sysid, bkupid, xwho, user_id, country, ip, ua, brand, model, os_version, osv_upt, upt
 |""".stripMargin
@@ -231,7 +231,7 @@ object Constant {
 """
 |SELECT imei, android_id, pkg_name, oaid, gaid, sysid, bkupid, xwho, user_id, country, ip, ua, brand, model, os_version, osv_upt, upt, count(1) cnt
 | FROM dwd.dwd_device_android_ids_inc_daily WHERE dt = '@date' @filter_country
-| GROUP BY imei, android_id, oaid, gaid, sysid, bkupid, xwho, user_id, country, ip, ua, brand, model, os_version, osv_upt, upt
+| GROUP BY imei, android_id, pkg_name, oaid, gaid, sysid, bkupid, xwho, user_id, country, ip, ua, brand, model, os_version, osv_upt, upt
 |""".stripMargin
 val old_id_mapping_sql: String =
......
@@ -22,13 +22,17 @@ class DspReq extends EtlDeviceIdDaily {
 // ODS
 val hour = i match {
 case 0 =>
-" AND hh BETWEEN '00' AND '05'"
+" AND hh BETWEEN '00' AND '03'"
 case 1 =>
-" AND hh BETWEEN '06' AND '11'"
+" AND hh BETWEEN '04' AND '07'"
 case 2 =>
-" AND hh BETWEEN '12' AND '17'"
+" AND hh BETWEEN '08' AND '11'"
 case 3 =>
-" AND hh BETWEEN '18' AND '23'"
+" AND hh BETWEEN '12' AND '15'"
+case 4 =>
+" AND hh BETWEEN '16' AND '19'"
+case 5 =>
+" AND hh BETWEEN '20' AND '23'"
 case _ =>
 ""
 }
......
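The DSP request scan now walks six 4-hour windows instead of four 6-hour ones (paired with the "0 until 6" loop change below). A quick standalone sketch, not part of the commit, that checks the six ranges tile the day exactly once:

// Sketch: each hour of the day maps to exactly one of the six 4-hour buckets.
object HourBucketCheck {
  def bucket(hh: Int): Int = hh / 4 // 00-03 -> 0, 04-07 -> 1, ..., 20-23 -> 5

  def main(args: Array[String]): Unit = {
    val perBucket = (0 until 24).groupBy(bucket)
    require(perBucket.keySet == (0 until 6).toSet)
    perBucket.toSeq.sortBy(_._1).foreach { case (i, hours) =>
      println(f"case $i => hh BETWEEN '${hours.min}%02d' AND '${hours.max}%02d'")
    }
  }
}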
@@ -40,10 +40,9 @@ abstract class EtlDeviceIdDaily extends CommonSparkJob with Serializable {
 try {
 if ("dsp_req".equalsIgnoreCase(business)) {
-for (i <- 0 until 4) {
+for (i <- 0 until 6) {
 val df = processData(date, i, spark)
-.repartition(5000)
-.persist(StorageLevel.MEMORY_AND_DISK_SER)
+df.persist(StorageLevel.MEMORY_AND_DISK_SER)
 val iosTab = df.filter(plf => {
 "ios".equals(plf._1)
@@ -53,7 +52,7 @@ abstract class EtlDeviceIdDaily extends CommonSparkJob with Serializable {
 FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output + s"/ios/${i}"), true)
 spark.createDataFrame(iosTab, iosSchema)
-.coalesce(coalesce)
+.repartition(coalesce)
 .write.mode(SaveMode.Overwrite)
 .option("orc.compress", "zlib")
 .orc(output + s"/ios/${i}")
@@ -67,7 +66,7 @@ abstract class EtlDeviceIdDaily extends CommonSparkJob with Serializable {
 FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output + s"/android/${i}"), true)
 spark.createDataFrame(adrTab, adrSchema)
-.coalesce(coalesce)
+.repartition(coalesce)
 .write.mode(SaveMode.Overwrite)
 .option("orc.compress", "zlib")
 .orc(output + s"/android/${i}")
@@ -81,7 +80,7 @@ abstract class EtlDeviceIdDaily extends CommonSparkJob with Serializable {
 FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output + s"/other/${i}"), true)
 spark.createDataFrame(otherTab, otherSchema)
-.coalesce(coalesce)
+.repartition(coalesce)
 .write.mode(SaveMode.Overwrite)
 .option("orc.compress", "zlib")
 .orc(output + s"/other/${i}")
@@ -90,8 +89,7 @@ abstract class EtlDeviceIdDaily extends CommonSparkJob with Serializable {
 }
 } else {
 val df = processData(date, 0, spark)
-.repartition(5000)
-.persist(StorageLevel.MEMORY_AND_DISK_SER)
+df.persist(StorageLevel.MEMORY_AND_DISK_SER)
 val iosTab = df.filter(plf => {
 "ios".equals(plf._1)
......
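The writes above switch from coalesce(coalesce) to repartition(coalesce), and the upstream repartition(5000) before persist is dropped. A minimal illustration of the difference, assuming an existing SparkSession named spark (not part of the commit):

// coalesce(n) only merges existing partitions (no shuffle), so skew in the
// upstream partitioning carries straight through to the output files;
// repartition(n) forces a full shuffle into n roughly even partitions.
val wide = spark.range(0, 100000000L, 1, 5000)  // 5000 input partitions

val merged   = wide.coalesce(200)    // narrow: cheap, but partition sizes can stay uneven
val shuffled = wide.repartition(200) // wide: extra shuffle, but balanced output files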
 package mobvista.dmp.datasource.id_mapping
 import com.alibaba.fastjson.JSONObject
-import mobvista.dmp.common.MobvistaConstant.sdf1
+import mobvista.dmp.common.MobvistaConstant.{sdf1, sdf2}
 import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
 import mobvista.dmp.datasource.id_mapping.Constant._
-import mobvista.dmp.util.MD5Util
+import mobvista.dmp.utils.common.MD5Util.hashMD5
 import org.apache.commons.cli.{BasicParser, Options}
 import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.io.compress.GzipCodec
 import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.{Row, SaveMode, SparkSession}
 import org.apache.spark.storage.StorageLevel
 import java.net.URI
+import java.text.SimpleDateFormat
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
@@ -33,6 +33,7 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
 options.addOption("output", true, "output")
 options.addOption("coalesce", true, "coalesce")
 options.addOption("result_output", true, "result_output")
+options.addOption("fre_output", true, "fre_output")
 options
 }
@@ -45,12 +46,13 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
 val date = commandLine.getOptionValue("date")
 val output = commandLine.getOptionValue("output")
 val result_output = commandLine.getOptionValue("result_output")
+val fre_output = commandLine.getOptionValue("fre_output")
 val coalesce = Integer.parseInt(commandLine.getOptionValue("coalesce"))
 val spark = MobvistaConstant.createSparkSession(s"IDMappingGraphx.$date.$country.$platform")
 try {
-oldAndTodayIdMapping(country, platform, date, spark, output, result_output, coalesce)
+oldAndTodayIdMapping(country.toUpperCase, platform, date, spark, output, result_output, fre_output, coalesce)
 } finally {
 if (spark != null) {
 spark.stop()
@@ -61,7 +63,7 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
 def oldAndTodayIdMapping(country: String, platform: String, date: String, spark: SparkSession, outPutPath: String,
-resultOutPutPath: String, coalesce: Int) = {
+resultOutPutPath: String, frequencyOutPutPath: String, coalesce: Int) = {
 implicit val formats = org.json4s.DefaultFormats
@@ -73,11 +75,16 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
 // 1.今日数据加载 (load today's data)
 platform match {
 case "ios" =>
-dailySQL = Constant.ios_id_mapping_sql.replace("@date", date)
 schame = iosVertSchema
 idSet = iosIDSet
 idMainSet = iosMainIDSet
 scoreMap = iosIDScoreMap
+country match {
+case "CN" =>
+dailySQL = Constant.ios_id_mapping_sql.replace("@date", date).replace("@filter_country", s"AND country = '${country}'")
+case _ =>
+dailySQL = Constant.ios_id_mapping_sql.replace("@date", date).replace("@filter_country", s"")
+}
 case "android" => {
 schame = adrVertSchema
 idMainSet = androidMainIDSet
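For the CN run the daily SQL is restricted to Chinese traffic by substituting the @filter_country placeholder; for overseas runs the placeholder is simply dropped. A condensed sketch of that substitution, with the SQL template abbreviated (not part of the commit):

// Sketch: how @filter_country in ios_id_mapping_sql resolves per country.
val template =
  "SELECT ..., count(1) cnt FROM dwd.dwd_device_ios_ids_inc_daily " +
    "WHERE dt = '@date' @filter_country GROUP BY ..."

def dailySql(date: String, country: String): String = {
  val filter = country.toUpperCase match {
    case "CN" => "AND country = 'CN'" // CN job: keep only Chinese devices
    case _    => ""                   // overseas job: no country filter
  }
  template.replace("@date", date).replace("@filter_country", filter)
}

// dailySql("20211110", "cn")       -> ... WHERE dt = '20211110' AND country = 'CN' ...
// dailySql("20211110", "overseas") -> ... WHERE dt = '20211110'  ...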
@@ -94,12 +101,36 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
 case _ =>
 ""
 }
-val todayDF = spark.createDataFrame(spark.sql(dailySQL).rdd.map(row => {
+val df = spark.sql(dailySQL)
+FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(outPutPath), true)
+df.persist(StorageLevel.MEMORY_AND_DISK_SER)
+df.repartition(coalesce)
+.write.mode(SaveMode.Overwrite)
+.option("orc.compress", "zlib")
+.orc(frequencyOutPutPath)
+val fre_table = platform match {
+case "ios" =>
+"dws_device_id_ios_frequency"
+case _ =>
+"dws_device_id_android_frequency"
+}
+spark.sql(
+s"""
+|ALTER TABLE dws.$fre_table ADD IF NOT EXISTS PARTITION (dt='$date',source='${country.toLowerCase}')
+| LOCATION '$frequencyOutPutPath'
+|""".stripMargin)
+val todayDF = spark.createDataFrame(df.rdd.map(row => {
 processData(row, platform)
 }), schema = schame)
+val schedule_date = sdf1.format(sdf2.parse(date))
 val vertex = todayDF.rdd.map(row => {
-processVertex(date, row, idSet, idMainSet)
+processVertex(schedule_date, row, idSet, idMainSet)
 }).flatMap(l => l)
 val maxGraph = vertex.combineByKey(
@@ -142,7 +173,7 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
 val srcId = if (kv._1._1.matches(MobvistaConstant.md5Ptn)) {
 kv._1._1
 } else {
-MD5Util.getMD5Str(kv._1._1)
+hashMD5(kv._1._1)
 }
 val srcType = kv._1._2
 val oneIDJSON = new JSONObject()
@@ -153,29 +184,44 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
 val oneID = if (key.matches(MobvistaConstant.md5Ptn)) {
 key
 } else {
-MD5Util.getMD5Str(key)
+hashMD5(key)
 }
 oneIDJSON.put(oneID, json.getJSONObject(key))
 })
 })
-(srcId, srcType, oneIDJSON.toJSONString)
+Result(srcId, srcType, oneIDJSON.toJSONString)
 }).persist(StorageLevel.MEMORY_AND_DISK_SER)
-val end_time = sdf1.parse(date).getTime
+import spark.implicits._
+FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(outPutPath), true)
+midMergeOneIDRDD.toDF
+.repartition(coalesce)
+.write.mode(SaveMode.Overwrite)
+.option("orc.compress", "zlib")
+.orc(outPutPath)
+spark.sql(
+s"""
+|ALTER TABLE ads.ads_device_id_mapping ADD IF NOT EXISTS PARTITION (dt='$date',source='${country.toLowerCase}',platform='$platform',`type`='mid')
+| LOCATION '$outPutPath'
+|""".stripMargin)
 val resultOneID = midMergeOneIDRDD.mapPartitions(rs => {
 rs.map(r => {
-val device_id = r._1
-val device_type = r._2
-val one_id = MobvistaConstant.String2JSONObject(r._3)
+val device_id = r.device_id
+val device_type = r.device_type
+val one_id = MobvistaConstant.String2JSONObject(r.one_id)
 val keys = one_id.keySet().asScala
 var oneIDScore: OneIDScore = OneIDScore("", "", 0)
 keys.foreach(key => {
+val sdf = new SimpleDateFormat("yyyy-MM-dd")
 val json = one_id.getJSONObject(key)
 val id_type = json.getString("id_type")
 val id_type_score = scoreMap(id_type)
 val active_date = json.getString("active_date")
-val cnt = json.getIntValue("cnt")
-val days = (end_time - sdf1.parse(active_date).getTime) / 1000 / 3600 / 24 + 1
+val cnt = json.getLongValue("cnt")
+val days = (sdf.parse(schedule_date).getTime - sdf.parse(active_date).getTime) / 1000 / 3600 / 24 + 1
 val score = id_type_score * 30 / days + 0.1 * cnt
 if (idSet.indexOf(id_type) < idSet.indexOf(oneIDScore.one_type) || idSet.indexOf(oneIDScore.one_type) == -1
 || (idSet.indexOf(id_type) == idSet.indexOf(oneIDScore.one_type) && score >= oneIDScore.one_score)) {
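The selection score still follows score = id_type_score * 30 / days + 0.1 * cnt, only now days is counted from the schedule date instead of a precomputed end_time. A worked sketch with hypothetical weights and dates (not the real scoreMap values):

import java.text.SimpleDateFormat

// Illustrative only: recency boosts the score sharply, frequency adds 0.1 per hit.
object ScoreExample {
  def score(idTypeScore: Double, scheduleDate: String, activeDate: String, cnt: Long): Double = {
    val sdf  = new SimpleDateFormat("yyyy-MM-dd")
    val days = (sdf.parse(scheduleDate).getTime - sdf.parse(activeDate).getTime) / 1000 / 3600 / 24 + 1
    idTypeScore * 30 / days + 0.1 * cnt
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical id-type weight of 9.0 in both cases.
    println(score(9.0, "2021-11-10", "2021-11-09", 5))  // seen yesterday, 5 hits   -> 9*30/2 + 0.5 = 135.5
    println(score(9.0, "2021-11-10", "2021-10-11", 50)) // seen a month ago, 50 hits -> 9*30/31 + 5 ≈ 13.7
  }
}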
@@ -185,16 +231,13 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
 val json = new JSONObject()
 json.put("one_id", oneIDScore.one_id)
 json.put("one_type", oneIDScore.one_type)
-json.put("one_score", oneIDScore.one_score)
-(device_id, device_type, json.toJSONString)
+// json.put("one_score", oneIDScore.one_score)
+Result(device_id, device_type, json.toJSONString)
 })
 })
 FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(resultOutPutPath), true)
-resultOneID.repartition(coalesce)
-.saveAsTextFile(resultOutPutPath, classOf[GzipCodec])
-/*
 resultOneID
 .toDF
 .repartition(coalesce)
@@ -202,19 +245,13 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
 .option("orc.compress", "zlib")
 .orc(resultOutPutPath)
-midMergeOneIDRDD.unpersist(true)
-*/
-FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(outPutPath), true)
-/*
-midMergeOneIDRDD.toDF
-.repartition(coalesce)
-.write.mode(SaveMode.Overwrite)
-.option("orc.compress", "zlib")
-.orc(outPutPath)
-*/
-midMergeOneIDRDD.repartition(coalesce)
-.saveAsTextFile(outPutPath, classOf[GzipCodec])
+spark.sql(
+s"""
+|ALTER TABLE ads.ads_device_id_mapping ADD IF NOT EXISTS PARTITION (dt='$date',source='${country.toLowerCase}',platform='$platform',`type`='result')
+| LOCATION '$resultOutPutPath'
+|""".stripMargin)
+midMergeOneIDRDD.unpersist(true)
 }
 def processData(row: Row, platform: String): Row = {
@@ -247,22 +284,22 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
 val upt = row.getAs[String]("upt")
 val cnt = row.getAs[Long]("cnt")
 val idfv_bundle = if (StringUtils.isNotBlank(idfv)) {
-MD5Util.getMD5Str(idfv + pkg_name)
+hashMD5(idfv + pkg_name)
 } else {
 ""
 }
 val bmosv_osv_upt = if (StringUtils.isNotBlank(osv_upt)) {
-MD5Util.getMD5Str(brand + model + os_version + osv_upt)
+hashMD5(brand + model + os_version + osv_upt)
 } else {
 ""
 }
 val bmosv_upt = if (StringUtils.isNotBlank(upt)) {
-MD5Util.getMD5Str(brand + model + os_version + upt)
+hashMD5(brand + model + os_version + upt)
 } else {
 ""
 }
 val bmosv_ipua_bundle = if (StringUtils.isNotBlank(ip)) {
-MD5Util.getMD5Str(brand + model + os_version + ip + ua + pkg_name)
+hashMD5(brand + model + os_version + ip + ua + pkg_name)
 } else {
 ""
 }
@@ -287,17 +324,17 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
 val upt = row.getAs[String]("upt")
 val cnt = row.getAs[Long]("cnt")
 val android_pkg = if (StringUtils.isNotBlank(android_id)) {
-MD5Util.getMD5Str(android_id + pkg_name)
+hashMD5(android_id + pkg_name)
 } else {
 ""
 }
 val bmosv_upt = if (StringUtils.isNotBlank(upt)) {
-MD5Util.getMD5Str(brand + model + os_version + upt)
+hashMD5(brand + model + os_version + upt)
 } else {
 ""
 }
 val bmosv_ipua_pkg = if (StringUtils.isNotBlank(ip)) {
-MD5Util.getMD5Str(brand + model + os_version + ip + ua + pkg_name)
+hashMD5(brand + model + os_version + ip + ua + pkg_name)
 } else {
 ""
 }
@@ -356,9 +393,7 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
 }).foreach(ir => {
 oneID.put(ir._1, MobvistaConstant.String2JSONObject(ir._2))
 })
-iters.filter(tp => {
-!mainIDSet.contains(MobvistaConstant.String2JSONObject(tp._2).getString("id_type"))
-}).foreach(itr => {
+iters.foreach(itr => {
 val k = itr._1
 val t = itr._3
 array += (((k, t), oneID.toJSONString))
......
@@ -57,7 +57,7 @@ class DeviceInfoJob extends CommonSparkJob with Serializable {
 .config("spark.sql.orc.filterPushdown", "true")
 .config("spark.io.compression.codec", "lz4")
 .config("spark.io.compression.lz4.blockSize", "64k")
-.config("spark.sql.autoBroadcastJoinThreshold", "314572800")
+.config("spark.sql.autoBroadcastJoinThreshold", "-1")
 .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
 .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
 .enableHiveSupport()
......
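Setting spark.sql.autoBroadcastJoinThreshold to -1 turns automatic broadcast hash joins off entirely (previously any table under the ~300 MB threshold would have been broadcast). A small sketch of the effect, with placeholder table sizes and not part of the commit:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("BroadcastJoinSketch")
  .config("spark.sql.autoBroadcastJoinThreshold", "-1") // never broadcast automatically
  .getOrCreate()

val small = spark.range(0, 100L).toDF("id")
val large = spark.range(0, 10000000L).toDF("id")

large.join(small, "id").explain()            // now planned as a sort-merge join
large.join(broadcast(small), "id").explain() // broadcasting still possible via an explicit hint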