Commit 450dd4c9 by WangJinfeng

update dmp

parent d26c25ff
......@@ -28,12 +28,9 @@ spark-submit --class mobvista.dmp.datasource.adn_sdk.AdnSdkDaily \
--conf spark.app.output_path=${OUTPUT_PATH} \
--conf spark.sql.shuffle.partitions=2000 \
--conf spark.default.parallelism=2000 \
--conf spark.shuffle.memoryFraction=0.4 \
--conf spark.storage.memoryFraction=0.4 \
--conf spark.driver.maxResultSize=8g \
--conf spark.executor.extraJavaOptions="-XX:+UseG1GC" \
--conf spark.app.coalesce=60000 \
--master yarn --deploy-mode cluster --name adn_sdk_daily --executor-memory 8g --driver-memory 6g --executor-cores 4 --num-executors 200 \
--master yarn --deploy-mode cluster --name adn_sdk_daily --executor-memory 10g --driver-memory 6g --executor-cores 3 --num-executors 300 \
../${JAR}
if [[ $? -ne 0 ]];then
......
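Editor's note on the hunk above: the two dropped settings, spark.shuffle.memoryFraction and spark.storage.memoryFraction, belong to Spark's legacy (pre-1.6) memory model and are ignored under unified memory management, so removing them mainly declutters the submit command. A minimal sketch, assuming one wanted the explicit unified-model equivalents (values here are illustrative, not part of this commit):
import org.apache.spark.SparkConf
// Unified-memory counterparts of the removed legacy fractions (illustrative values).
val conf = new SparkConf()
  .set("spark.memory.fraction", "0.6")         // share of heap used for execution + storage
  .set("spark.memory.storageFraction", "0.5")  // part of that share protected from eviction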
......@@ -32,8 +32,8 @@ check_await "$OLD_INPUT_PATH/_SUCCESS"
spark-submit --class mobvista.dmp.datasource.baichuan.AliInstallList \
--conf spark.yarn.executor.memoryOverhead=2048 \
--conf spark.network.timeout=720s \
--conf spark.default.parallelism=400 \
--master yarn --deploy-mode cluster --name ali_install_wangjf --executor-memory 10g --driver-memory 4g --executor-cores 4 --num-executors 20 \
--conf spark.default.parallelism=1000 \
--master yarn --deploy-mode cluster --name ali_install_wangjf --executor-memory 8g --driver-memory 6g --executor-cores 3 --num-executors 50 \
../${JAR} -input ${INPUT_PATH} -oldInput ${OLD_INPUT_PATH} -output ${OUTPUT_PATH} -date ${dt} -parallelism 400 -coalesce 400
......
......@@ -2,7 +2,7 @@
source ../../dmp_env.sh
begin_day=$(date -d "$ScheduleTime 43 days ago" +"%Y-%m-%d")
begin_day=$(date -d "$ScheduleTime 14 days ago" +"%Y-%m-%d")
end_day01=$(date -d "$ScheduleTime 7 days ago" +"%Y-%m-%d")
end_day02=$(date -d "$ScheduleTime 1 days ago" +"%Y-%m-%d")
dt_slash_today=$(date -d "$ScheduleTime 1 days ago" +"%Y/%m/%d")
......
......@@ -6,6 +6,32 @@ COUNTRY=$1
PLATFORM=$2
if [[ ${COUNTRY} = 'cn' ]]; then
  if [[ ${PLATFORM} = 'android' ]]; then
    partition=1000
    executors=100
    cores=3
    coalesce=500
  else
    partition=1000
    executors=100
    cores=3
    coalesce=500
  fi
else
  if [[ ${PLATFORM} = 'android' ]]; then
    partition=10000
    executors=300
    cores=3
    coalesce=3000
  else
    partition=2000
    executors=100
    cores=3
    coalesce=500
  fi
fi
LOG_TIME=$(date +%Y%m%d -d "-1 day $ScheduleTime")
date_path=$(date +'%Y/%m/%d' -d "-1 day $ScheduleTime")
......@@ -14,21 +40,27 @@ ADN_REQUEST_INPUT_PATH=${ID_MAPPING}/${date_path}/adn_request
DSP_INPUT_PATH=${ID_MAPPING}/${date_path}/dsp_req
# check_await "${ADN_REQUEST_INPUT_PATH}/$PLATFORM/_SUCCESS"
check_await "${ADN_REQUEST_INPUT_PATH}/$PLATFORM/_SUCCESS"
# check_await "${DSP_INPUT_PATH}/$PLATFORM/_SUCCESS"
before_date_path=$(date +'%Y/%m/%d' -d "-2 day $ScheduleTime")
OLD_ID_MAPPING_PATH=${ADS_DEVICE_ID_MAPPING}/${before_date_path}/${COUNTRY}/${PLATFORM}
check_await "${OLD_ID_MAPPING_PATH}/mid/_SUCCESS"
OUTPUT_PATH=${ADS_DEVICE_ID_MAPPING}/${date_path}/${COUNTRY}/${PLATFORM}
spark-submit --class mobvista.dmp.datasource.id_mapping.IDMappingGraphx \
--name "IDMappingGraphx.${LOG_TIME}.${COUNTRY}.${PLATFORM}" \
--conf spark.yarn.executor.memoryOverhead=2048 \
--conf spark.network.timeout=720s \
--conf spark.sql.shuffle.partitions=10000 \
--conf spark.default.parallelism=10000 \
--master yarn --deploy-mode cluster --executor-memory 10g --driver-memory 6g --executor-cores 5 --num-executors 200 \
--conf spark.sql.shuffle.partitions=${partition} \
--conf spark.default.parallelism=${partition} \
--master yarn --deploy-mode cluster --executor-memory 8g --driver-memory 4g --executor-cores ${cores} --num-executors ${executors} \
../${JAR} -date ${LOG_TIME} -country ${COUNTRY} -platform ${PLATFORM} \
-output ${OUTPUT_PATH}/mid -fre_output ${OUTPUT_PATH}/frequency -result_output ${OUTPUT_PATH}/result -coalesce 1000
-output ${OUTPUT_PATH}/mid -fre_output ${OUTPUT_PATH}/frequency -result_output ${OUTPUT_PATH}/result -coalesce ${coalesce}
if [ $? -ne 0 ]; then
exit 255
......
type=command
command=sh -x id_mapping_write.sh
\ No newline at end of file
#! /bin/bash
source ../dmp_env.sh
LOG_TIME=$(date +%Y%m%d -d "-1 day $ScheduleTime")
date_path=$(date +'%Y/%m/%d' -d "-1 day $ScheduleTime")
SOURCES="cn overseas"
PLTS="ios android"
ID_MAPPING_PATH=${ADS_DEVICE_ID_MAPPING}/${date_path}
for sour in ${SOURCES}; do
  for plt in ${PLTS}; do
    check_await "${ID_MAPPING_PATH}/$sour/$plt/result/_SUCCESS"
  done
done
spark-submit --class mobvista.dmp.datasource.id_mapping.IDMappingWrite \
--name "IDMappingWrite.${LOG_TIME}" \
--conf spark.yarn.executor.memoryOverhead=2048 \
--conf spark.network.timeout=720s \
--conf spark.sql.shuffle.partitions=2000 \
--conf spark.default.parallelism=2000 \
--master yarn --deploy-mode cluster --executor-memory 8g --driver-memory 6g --executor-cores 4 --num-executors 8 \
../${JAR} -date ${LOG_TIME}
if [ $? -ne 0 ]; then
exit 255
fi
......@@ -33,7 +33,7 @@ public class Constants {
if (jsonObject.isEmpty()) {
jsonObject = new JSONObject();
}
} catch (JSONException e) {
} catch (JSONException | NumberFormatException e) {
jsonObject = new JSONObject();
}
} else {
......
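Editor's note: widening the catch to JSONException | NumberFormatException means a malformed numeric field now also falls back to an empty object instead of failing the task. A minimal sketch of the same defensive pattern in Scala, assuming fastjson (which the Scala sources in this commit appear to use for JSONObject):
import com.alibaba.fastjson.{JSON, JSONObject}
import scala.util.Try
// Parse a raw string into a JSONObject, falling back to an empty object on any parse failure.
def parseOrEmpty(raw: String): JSONObject =
  Try(JSON.parseObject(raw)).toOption.filter(_ != null).getOrElse(new JSONObject())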
......@@ -16,7 +16,7 @@ local_host=127.0.0.1
ali_host=192.168.17.122,192.168.17.123,192.168.17.124,192.168.17.145,192.168.17.146,192.168.17.147
aws_host=172.31.31.111,172.31.24.21,172.31.20.35,172.31.23.17,172.31.18.160,172.31.29.16
aws_host=172.31.31.111,172.31.24.21,172.31.23.17,172.31.18.160
rtdmp.vg.host=172.31.31.111,172.31.24.21,172.31.20.35,172.31.23.17,172.31.18.160,172.31.29.16
......
......@@ -3,8 +3,8 @@ package mobvista.dmp.datasource.dm
import mobvista.dmp.common.CommonSparkJob
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{concat_ws, lit}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.storage.StorageLevel
import java.net.URI
......@@ -51,14 +51,13 @@ class ComToponTopltv1015 extends CommonSparkJob with Serializable {
.getOrCreate()
val sc = spark.sparkContext
import spark.implicits._
FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output01), true)
FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output02), true)
try {
val imei_sql=
val imei_sql =
s"""
|select device_id
|from
......@@ -147,16 +146,16 @@ class ComToponTopltv1015 extends CommonSparkJob with Serializable {
|group by imp.oaid
|) as t)<0.3
|""".stripMargin
println("imei_sql=="+imei_sql)
println("oaid_sql=="+oaid_sql)
println("imei_sql==" + imei_sql)
println("oaid_sql==" + oaid_sql)
val imei_df: DataFrame = spark.sql(imei_sql).persist(StorageLevel.MEMORY_AND_DISK_SER)
val imei_df_with_package_name = imei_df.select(concat_ws("\t", imei_df.col("device_id"), lit("imei"), lit("android"),lit("[\"com.topon_topltv_1015\"]")))
val imei_df_with_country = imei_df.select(concat_ws("\t", imei_df.col("device_id"), lit("imei"), lit("android"),lit("CN")))
val imei_df_with_package_name = imei_df.select(concat_ws("\t", imei_df.col("device_id"), lit("imei"), lit("android"), lit("[\"com.topon_topltv_1015\"]")))
val imei_df_with_country = imei_df.select(concat_ws("\t", imei_df.col("device_id"), lit("imei"), lit("android"), lit("CN")))
val oaid_df: DataFrame = spark.sql(oaid_sql).persist(StorageLevel.MEMORY_AND_DISK_SER)
val oaid_df_with_package_name = oaid_df.select(concat_ws("\t", oaid_df.col("device_id"), lit("oaid"), lit("android"),lit("[\"com.topon_topltv_1015\"]")))
val oaid_df_with_country = oaid_df.select(concat_ws("\t", oaid_df.col("device_id"), lit("oaid"), lit("android"),lit("CN")))
val oaid_df_with_package_name = oaid_df.select(concat_ws("\t", oaid_df.col("device_id"), lit("oaid"), lit("android"), lit("[\"com.topon_topltv_1015\"]")))
val oaid_df_with_country = oaid_df.select(concat_ws("\t", oaid_df.col("device_id"), lit("oaid"), lit("android"), lit("CN")))
imei_df_with_package_name.union(oaid_df_with_package_name).coalesce(coalesce.toInt).write.format("text").mode("overwrite").save(output01)
imei_df_with_country.union(oaid_df_with_country).coalesce(coalesce.toInt).write.format("text").mode("overwrite").save(output02)
......@@ -170,6 +169,6 @@ class ComToponTopltv1015 extends CommonSparkJob with Serializable {
object ComToponTopltv1015 {
def main(args: Array[String]): Unit = {
new ComToponTopltv1015().run(args)
new ComToponTopltv1015().run(args)
}
}
......@@ -11,13 +11,13 @@ import org.apache.spark.sql.{SaveMode, SparkSession}
import java.net.URI
/**
* @package: mobvista.dmp.datasource.dm
* @author: wangjf
* @date: 2018/12/13
* @time: 6:42 PM
* @email: jinfeng.wang@mobvista.com
* @phone: 152-1062-7698
*/
* @package: mobvista.dmp.datasource.dm
* @author: wangjf
* @date: 2018/12/13
* @time: 6:42 PM
* @email: jinfeng.wang@mobvista.com
* @phone: 152-1062-7698
*/
class DmpDeviceInterest extends CommonSparkJob with Serializable {
override protected def run(args: Array[String]): Int = {
val commandLine = commParser.parse(options, args)
......@@ -108,11 +108,17 @@ class DmpDeviceInterest extends CommonSparkJob with Serializable {
jsonArray.add(json)
}
})
DmInterestTagV2(device_id = device_id, device_type = device_type, platform = platform, install = install_list.asJava.toString,
tags = jsonArray.toString, ext_data = ext_data, update_date = update_date)
if (install_list.size > 0) {
DmInterestTagV2(device_id = device_id, device_type = device_type, platform = platform, install = install_list.asJava.toString,
tags = jsonArray.toString, ext_data = ext_data, update_date = update_date)
} else {
null
}
})
rdd.repartition(coalesce).toDF()
rdd.filter(dm => {
dm != null
}).repartition(coalesce).toDF()
.write
.mode(SaveMode.Overwrite)
.option("orc.compress", "snappy")
......
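Editor's note: the change above emits null for devices whose install_list is empty and strips those rows with a filter before writing. A minimal sketch, with hypothetical names, of the same idea expressed as Option + flatMap, which never puts nulls into the RDD at all:
// Hypothetical, simplified record; the real job builds DmInterestTagV2 rows.
case class InterestRow(deviceId: String, installList: Seq[String])
def toRow(deviceId: String, installList: Seq[String]): Option[InterestRow] =
  if (installList.nonEmpty) Some(InterestRow(deviceId, installList)) else None
// usage sketch: rdd.flatMap { case (id, installs) => toRow(id, installs) } keeps only non-empty devices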
......@@ -371,23 +371,22 @@ class DspOrgEtlDailys extends CommonSparkJob with Serializable {
"4180" -> Array("me.ele_oppolianmeng_notinstall"),
"2840" -> Array("com.youku.phone_oppocn_notinstall"),
"4181" -> Array("me.ele_oppolianmeng_hist_notinstall"),
"2774" -> Array("com.smile.gifmaker_notinstall_oppo"), //2020.11.11添加快手、京东入库
"2774" -> Array("com.smile.gifmaker_notinstall_oppo"), //2020.11.11添加快手、京东入库
"2773" -> Array("com.jingdong.app.mallr_notinstall_oppo"),
"2716" -> Array("com.jingdong.app.mall","com.jingdong.app.mall_oppo"),
"2717" -> Array("com.eg.android.AlipayGphone","com.eg.android.AlipayGphone_oppo"), //补充说明: com.eg.android.AlipayGphone_oppo伪包名大写,但已经入库,修改困难。以后入库伪包名统一定为小写,类似上面的 com.ucmobile_oppo)
"2718" -> Array("com.qiyi.video","com.qiyi.video_oppo"),
"2783" -> Array("com.taobao.idlefish","com.taobao.idlefish_oppo"),
"2840" -> Array("com.youku.phone_notinstall","com.youku.phone_notinstall_oppo"),
"2889" -> Array("com.sankuai.meituan","com.sankuai.meituan_oppo"),
"2890" -> Array("com.meituan.itakeaway","com.meituan.itakeaway_oppo"),
"2904" -> Array("com.smile.gifmaker","com.smile.gifmaker_oppo"),
"2905" -> Array("com.kuaishou.nebula","com.kuaishou.nebula_oppo"),
"2906" -> Array("com.youku.phone","com.youku.phone_oppo"),
"3160" -> Array("com.tencent.news","com.tencent.news_oppo")
"2716" -> Array("com.jingdong.app.mall", "com.jingdong.app.mall_oppo"),
"2717" -> Array("com.eg.android.AlipayGphone", "com.eg.android.AlipayGphone_oppo"), //补充说明: com.eg.android.AlipayGphone_oppo伪包名大写,但已经入库,修改困难。以后入库伪包名统一定为小写,类似上面的 com.ucmobile_oppo)
"2718" -> Array("com.qiyi.video", "com.qiyi.video_oppo"),
"2783" -> Array("com.taobao.idlefish", "com.taobao.idlefish_oppo"),
"2840" -> Array("com.youku.phone_notinstall", "com.youku.phone_notinstall_oppo"),
"2889" -> Array("com.sankuai.meituan", "com.sankuai.meituan_oppo"),
"2890" -> Array("com.meituan.itakeaway", "com.meituan.itakeaway_oppo"),
"2904" -> Array("com.smile.gifmaker", "com.smile.gifmaker_oppo"),
"2905" -> Array("com.kuaishou.nebula", "com.kuaishou.nebula_oppo"),
"2906" -> Array("com.youku.phone", "com.youku.phone_oppo")
)
for (item <- mapData_oppo) {
if (dealeridArray.contains(item._1)) {
for( packageName <- item._2 ){
for (packageName <- item._2) {
value.packageName = packageName
arrayBuffer = addDatasV2(arrayBuffer, deviceId, gaidmd5, imei, imeimd5, oaid, oaidmd5, androidId, idfv, deviceType, platform, value)
}
......@@ -410,24 +409,36 @@ class DspOrgEtlDailys extends CommonSparkJob with Serializable {
val mapData_bes = Map("100193" -> Array("com.taobao.taobao","com.taobao.taobao_bes"),
"100189" -> Array("com.eg.android.AlipayGphone","com.eg.android.AlipayGphone_bes"),
"100191" -> Array("com.jingdong.app.mall","com.jingdong.app.mall_bes"),
"100187" -> Array("com.UCMobile","com.UCMobile_bes"),
"100194" -> Array("com.taobao.idlefish","com.taobao.idlefish_bes"),
"100195" -> Array("com.qiyi.video","com.qiyi.video_bes"),
"100196" -> Array("com.smile.gifmaker","com.smile.gifmaker_bes"),
"100188" -> Array("com.tencent.news","com.tencent.news_bes"),
"100310" -> Array("com.taobao.litetao","com.taobao.litetao_bes"),
"100203" -> Array("com.ss.android.ugc.aweme","com.ss.android.ugc.aweme_bes"),
"00251" -> Array("com.xunmeng.pinduoduo","com.xunmeng.pinduoduo_bes"),
"100400" -> Array("com.youku.phone","com.youku.phone_bes"),
"100187" -> Array("com.UCMobile", "com.UCMobile_bes"),
"100194" -> Array("com.taobao.idlefish", "com.taobao.idlefish_bes"),
"100195" -> Array("com.qiyi.video", "com.qiyi.video_bes"),
"100196" -> Array("com.smile.gifmaker", "com.smile.gifmaker_bes"),
"100188" -> Array("com.tencent.news", "com.tencent.news_bes"),
"100310" -> Array("com.taobao.litetao", "com.taobao.litetao_bes"),
"100203" -> Array("com.ss.android.ugc.aweme", "com.ss.android.ugc.aweme_bes"),
"00251" -> Array("com.xunmeng.pinduoduo", "com.xunmeng.pinduoduo_bes"),
"100400" -> Array("com.youku.phone", "com.youku.phone_bes"),
"100219" -> Array("com.youku.phone_bes_notinstall"),
"100304" -> Array("me.ele","me.ele_bes") )
val mapData_ios_bes = Map("100197" -> Array("id387682726","id3876827262021090301"),
"100345" -> Array("id333206289","id3332062892021090301"),
"100344" -> Array("id1340376323","id13403763232021090301"),
"100353" -> Array("id1044283059","id10442830592021090301"),
"100304" -> Array("me.ele", "me.ele_bes"),
"100263" -> Array("com.taobao.taobao_bes", "com.taobao.taobao"),
"100273" -> Array("com.taobao.taobao_bes", "com.taobao.taobao"),
"100423" -> Array("com.UCMobile_bes", "com.UCMobile"),
"100435" -> Array("com.UCMobile_bes", "com.UCMobile"),
"100431" -> Array("com.youku.phone_bes", "com.youku.phone"),
"100433" -> Array("com.youku.phone_bes", "com.youku.phone")
)
val mapData_ios_bes = Map("100197" -> Array("id387682726", "id3876827262021090301"),
"100345" -> Array("id333206289", "id3332062892021090301"),
"100344" -> Array("id1340376323", "id13403763232021090301"),
"100353" -> Array("id1044283059", "id10442830592021090301"),
"100220" -> Array("id3361414752022010401"),
"100368" -> Array("id507161324","id5071613242021091701") )
"100368" -> Array("id507161324", "id5071613242021091701"),
"100322" -> Array("id38768272620220119", "id387682726"),
"100430" -> Array("id38768272620220119", "id387682726"),
"100432" -> Array("id3361414752022010401"),
"100434" -> Array("id3361414752022010401")
)
if ("bes".equals(exchanges)) {
if("android".equalsIgnoreCase(platform)) {
......
......@@ -146,17 +146,18 @@ class DspOrgLogEtlHoursDemo extends CommonSparkJob with Serializable {
val exitId = row.getAs[String]("exitid")
val exchanges = row.getAs[String]("exchanges")
var dealerid = ""
if("mopub".equalsIgnoreCase(exchanges) || "oppocn".equalsIgnoreCase(exchanges) || "inmobi".equalsIgnoreCase(exchanges) || "bes".equalsIgnoreCase(exchanges) || "iqiyi".equalsIgnoreCase(exchanges) || "vivo".equalsIgnoreCase(exchanges) ){
if ("mopub".equalsIgnoreCase(exchanges) || "oppocn".equalsIgnoreCase(exchanges) || "inmobi".equalsIgnoreCase(exchanges) ||
"bes".equalsIgnoreCase(exchanges) || "iqiyi".equalsIgnoreCase(exchanges) || "vivo".equalsIgnoreCase(exchanges)) {
val ext3 = row.getAs[String]("ext3")
if(StringUtils.isNotBlank(ext3) && ext3.startsWith("{")){
try{
if (StringUtils.isNotBlank(ext3) && ext3.startsWith("{")) {
try {
val testObj = GsonUtil.String2JsonObject(ext3)
val dealids = testObj.get("dealids")
if(dealids != null && !dealids.isJsonNull){
val dealids = testObj.get("dealids")
if (dealids != null && !dealids.isJsonNull) {
dealerid = dealids.getAsString
}else{
val ruleids = testObj.get("ruleids")
if(ruleids != null && !ruleids.isJsonNull){
} else {
val ruleids = testObj.get("ruleids")
if (ruleids != null && !ruleids.isJsonNull) {
dealerid = ruleids.getAsString
}
}
......@@ -293,8 +294,8 @@ class DspOrgLogEtlHoursDemo extends CommonSparkJob with Serializable {
}
val exitId = row.getAs[String]("exitid")
val time = row.getAs[String]("time")
val geoInfo = row.getAs[String]("geoinfo")
val time = row.getAs[String]("time")
val geoInfo = row.getAs[String]("geoinfo")
val longitude = row.getAs[String]("longitude")
val latitude = row.getAs[String]("latitude")
var segment = ""
......
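Editor's note: the reformatted block above resolves dealerid from the ext3 JSON, preferring the dealids field and falling back to ruleids when dealids is null. A minimal standalone sketch of that fallback, using plain Gson (2.8.6+) instead of the project's GsonUtil helper:
import com.google.gson.JsonParser
// Prefer "dealids", fall back to "ruleids", return "" when neither is present or parsing fails.
def resolveDealerId(ext3: String): String = {
  if (ext3 == null || !ext3.trim.startsWith("{")) return ""
  try {
    val obj = JsonParser.parseString(ext3).getAsJsonObject
    val dealids = obj.get("dealids")
    if (dealids != null && !dealids.isJsonNull) dealids.getAsString
    else {
      val ruleids = obj.get("ruleids")
      if (ruleids != null && !ruleids.isJsonNull) ruleids.getAsString else ""
    }
  } catch {
    case _: Exception => ""
  }
}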
......@@ -407,19 +407,19 @@ object Constant {
(f_platform, rw)
}
case class Result(device_id: String, device_type: String, one_id: String) extends Serializable
case class Result(device_id: String, device_type: String, one_id: String, update_date: String) extends Serializable
case class OneIDScore(one_id: String, one_type: String, one_score: Double, one_version: String) extends Serializable
class CustomInterator(active_date: String, iter: Iterator[((String, String), Set[(String, String, Long)])],
idArray: Array[String], mainIDSet: Set[String]) extends Iterator[ArrayBuffer[((String, String), String)]] {
idArray: Array[String], mainIDSet: Set[String]) extends Iterator[ArrayBuffer[((String, String), (String, String))]] {
def hasNext: Boolean = {
iter.hasNext
}
def next: ArrayBuffer[((String, String), String)] = {
def next: ArrayBuffer[((String, String), (String, String))] = {
val kv = iter.next
val array = new ArrayBuffer[((String, String), String)]()
val array = new ArrayBuffer[((String, String), (String, String))]()
val tmpOneId = kv._1._1
val tmpOneIdType = kv._1._2
val iters = kv._2
......@@ -438,7 +438,7 @@ object Constant {
}
finalize()
})
array += (((tmpOneId, tmpOneIdType), oneID.toJSONString))
array += (((tmpOneId, tmpOneIdType), (oneID.toJSONString, active_date)))
// if (idArray.indexOf(tmpOneIdType) > minTypeIndex) {
iters.foreach(itr => {
var oneJSON = new JSONObject()
......@@ -447,7 +447,7 @@ object Constant {
} else {
oneJSON = oneID
}
array += (((itr._1, itr._2), oneJSON.toJSONString))
array += (((itr._1, itr._2), (oneJSON.toJSONString, active_date)))
finalize()
})
// }
......
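Editor's note: both the Result case class and the iterator's value type now carry the active date next to the one_id JSON, so downstream merging can tell which record is freshest. A small usage sketch of the widened Result; field values are illustrative only:
import mobvista.dmp.datasource.id_mapping.Constant.Result
// Illustrative values; update_date is what the merge step compares against the 7-day cutoff.
val r = Result(
  device_id = "oaid-123",
  device_type = "oaid",
  one_id = """{"oaid":{"one_id":"oaid-123","one_type":"oaid","one_cnt":1}}""",
  update_date = "2022-01-16")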
......@@ -50,7 +50,18 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
// val fre_output = commandLine.getOptionValue("fre_output")
val coalesce = Integer.parseInt(commandLine.getOptionValue("coalesce"))
val spark = MobvistaConstant.createSparkSession(s"IDMappingGraphx.$date.$country.$platform")
val spark = SparkSession
.builder()
.appName(s"IDMappingGraphx.$date.$country.$platform")
.config("spark.rdd.compress", "true")
.config("spark.io.compression.codec", "lz4")
.config("spark.sql.orc.filterPushdown", "true")
.config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.config("spark.kryo.registrationRequired", "false")
.config("spark.kryo.registrator", "mobvista.dmp.datasource.id_mapping.MyRegisterKryo")
.enableHiveSupport()
.getOrCreate()
try {
oldAndTodayIdMapping(country.toUpperCase, platform, date, spark, output, result_output, coalesce)
......@@ -62,7 +73,6 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
0
}
def oldAndTodayIdMapping(country: String, platform: String, date: String, spark: SparkSession, outPutPath: String,
resultOutPutPath: String, coalesce: Int) = {
......@@ -162,31 +172,47 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
json.put("one_cnt", t._3)
oneID.put(t._1, json)
})
(kv._1, oneID.toJSONString)
(kv._1, (oneID.toJSONString, schedule_date))
})
// multiOneIDRDD.persist(StorageLevel.MEMORY_AND_DISK_SER)
// singleOneIDRDD.persist(StorageLevel.MEMORY_AND_DISK_SER)
val yesDate = DateUtil.getDayByString(date, "yyyyMMdd", -1)
val updateDate = sdf1.format(sdf2.parse(DateUtil.getDayByString(date, "yyyyMMdd", -7)))
// spark.udf.register("filterAction",filterAction _)
val oldMidMergeOneIDRDD = spark.sql(
s"""
|SELECT device_id, device_type, one_id
|SELECT device_id, device_type, one_id, update_date
| FROM ads.ads_device_id_mapping WHERE dt = '$yesDate' AND source = '${country.toLowerCase}' AND platform = '$platform' AND `type` = 'mid'
|""".stripMargin)
.rdd.map(row => {
((row.getAs[String]("device_id"), row.getAs[String]("device_type")), row.getAs[String]("one_id"))
.rdd
.map(row => {
val update_date = if (StringUtils.isNotBlank(row.getAs[String]("update_date"))) {
row.getAs[String]("update_date")
} else {
schedule_date
}
((row.getAs[String]("device_id"), row.getAs[String]("device_type")), (row.getAs[String]("one_id"), update_date))
}).filter(rs => {
filterAction(rs._1._2, idMainSet) || (!filterAction(rs._1._2, idMainSet) && rs._2._2.compareTo(updateDate) >= 0)
})
val midMergeOneIDRDD = spark.sparkContext.union(Seq(singleOneIDRDD, multiOneIDRDD, oldMidMergeOneIDRDD))
val midMergeOneIDRDD = spark.sparkContext.union(Seq(multiOneIDRDD, singleOneIDRDD, oldMidMergeOneIDRDD))
.combineByKey(
(v: String) => Set(v),
(c: Set[String], v: String) => c ++ Seq(v),
(c1: Set[String], c2: Set[String]) => c1 ++ c2
(v: (String, String)) => Set(v),
(c: Set[(String, String)], v: (String, String)) => c ++ Seq(v),
(c1: Set[(String, String)], c2: Set[(String, String)]) => c1 ++ c2
).map(kv => {
val srcId = kv._1._1
val srcType = kv._1._2
var update_date = ""
val oneIDJSON = new JSONObject()
kv._2.foreach(js => {
val json = MobvistaConstant.String2JSONObject(js)
kv._2.foreach(ou => {
val json = MobvistaConstant.String2JSONObject(ou._1)
val keys = json.keySet().asScala
keys.foreach(key => {
if (oneIDJSON.containsKey(key) && oneIDJSON.getJSONObject(key).getString("one_date")
......@@ -195,13 +221,17 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
oneIDJSON.put(key, json.getJSONObject(key))
}
})
if (StringUtils.isBlank(update_date) || update_date.compareTo(ou._2) < 0) {
update_date = ou._2
}
})
Result(srcId, srcType, oneIDJSON.toJSONString)
}).persist(StorageLevel.MEMORY_AND_DISK_SER)
FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(outPutPath), true)
Result(srcId, srcType, oneIDJSON.toJSONString, update_date)
})
import spark.implicits._
// midMergeOneIDRDD.persist(StorageLevel.MEMORY_AND_DISK_SER)
FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(outPutPath), true)
midMergeOneIDRDD.toDF
.repartition(coalesce)
......@@ -220,6 +250,7 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
val device_id = r.device_id
val device_type = r.device_type
val one_id = MobvistaConstant.String2JSONObject(r.one_id)
val update_date = r.update_date
val keys = one_id.keySet().asScala
var oneIDScore: OneIDScore = OneIDScore("", "", 0, "")
keys.foreach(key => {
......@@ -241,7 +272,7 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
json.put("one_type", oneIDScore.one_type)
json.put("one_score", oneIDScore.one_score)
json.put("one_version", oneIDScore.one_version)
Result(device_id, device_type, json.toJSONString)
Result(device_id, device_type, json.toJSONString, update_date)
})
})
......@@ -263,6 +294,10 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
midMergeOneIDRDD.unpersist(true)
}
def filterAction(device_type: String, mainIDSet: Set[String]): Boolean = {
mainIDSet.contains(device_type)
}
def processData(row: Row, platform: String): Row = {
platform match {
case "ios" =>
......
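Editor's note: the core of this change is the merge step. Values are now (one_id JSON, update_date) pairs, yesterday's mid table is pre-filtered so non-main IDs with an update_date older than seven days are dropped, and combineByKey collects all pairs per (device_id, device_type) before the newest update_date wins. A minimal, self-contained sketch of that merge on toy data (local mode, hypothetical values, not the job's real inputs):
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().master("local[2]").appName("merge-sketch").getOrCreate()
val sc = spark.sparkContext
// Toy (device_id, device_type) keys with (oneIdJson, updateDate) values, standing in for
// the union of singleOneIDRDD, multiOneIDRDD and yesterday's mid table.
val pairs = sc.parallelize(Seq(
  (("dev1", "oaid"), ("""{"oaid":{"one_cnt":1}}""", "2022-01-15")),
  (("dev1", "oaid"), ("""{"imei":{"one_cnt":2}}""", "2022-01-16"))
))
val merged = pairs.combineByKey(
  (v: (String, String)) => Set(v),
  (c: Set[(String, String)], v: (String, String)) => c + v,
  (c1: Set[(String, String)], c2: Set[(String, String)]) => c1 ++ c2
).mapValues(vs => vs.map(_._2).max)   // this sketch keeps only the newest update_date per key
merged.collect().foreach(println)     // (("dev1","oaid"), 2022-01-16)
spark.stop()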
package mobvista.dmp.datasource.id_mapping
import com.datastax.spark.connector.{SomeColumns, _}
import mobvista.dmp.common.CommonSparkJob
import mobvista.dmp.datasource.id_mapping.Constant.Result
import mobvista.dmp.util.PropertyUtil
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.spark.sql.SparkSession
/**
* @package: mobvista.dmp.datasource.id_mapping
* @author: wangjf
* @date: 2021/11/30
* @time: 7:51 PM
* @email: jinfeng.wang@mobvista.com
*/
class IDMappingWrite extends CommonSparkJob with Serializable {
def commandOptions(): Options = {
val options = new Options()
options.addOption("date", true, "date")
options
}
override protected def run(args: Array[String]): Int = {
val parser = new BasicParser()
val options = commandOptions()
val commandLine = parser.parse(options, args)
val date = commandLine.getOptionValue("date")
val spark: SparkSession = SparkSession.builder()
.appName(s"IDMappingWrite.$date")
.config("spark.rdd.compress", "true")
.config("spark.io.compression.codec", "snappy")
.config("spark.sql.orc.filterPushdown", "true")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.config("spark.cassandra.connection.host", PropertyUtil.getProperty("ip.properties", "aws_host"))
.config("spark.cassandra.connection.port", "9042")
.config("spark.cassandra.query.retry.count", "3")
.config("spark.cassandra.connection.compression", "LZ4")
.config("spark.cassandra.input.consistency.level", "LOCAL_ONE")
.config("spark.cassandra.output.consistency.level", "LOCAL_ONE")
.config("spark.cassandra.connection.remoteConnectionsPerExecutor", "16")
.config("spark.cassandra.connection.localConnectionsPerExecutor", "8")
.config("spark.cassandra.input.fetch.sizeInRows", "4096")
.config("spark.cassandra.concurrent.reads", "4096")
.config("spark.cassandra.output.concurrent.writes", "32")
.config("spark.cassandra.output.batch.grouping.buffer.size", "2048")
.config("spark.cassandra.output.batch.size.bytes", "2048")
.config("spark.cassandra.connection.keepAliveMS", "60000")
.config("spark.cassandra.auth.username", "chMjFHCD5J7NrXk3gKE8")
.config("spark.cassandra.auth.password", "mAQRjmcTMtYRx6iz43FM")
.enableHiveSupport()
.getOrCreate()
try {
val keyspace = "one_service"
val tableName = "id_mapping"
val columns = SomeColumns("device_id", "device_type", "one_id")
val sql =
s"""
|SELECT device_id, device_type, one_id, update_date FROM ads.ads_device_id_mapping
| WHERE dt = '${date}' AND `type` = 'result'
|""".stripMargin
val df = spark.sql(sql)
.rdd
.map(row => {
Result(row.getAs[String]("device_id"), row.getAs[String]("device_type"), row.getAs[String]("one_id"),
row.getAs[String]("update_date"))
})
df.saveToCassandra(keyspace, tableName, columns)
} finally {
if (spark != null) {
spark.stop()
}
}
0
}
}
object IDMappingWrite {
def main(args: Array[String]): Unit = {
new IDMappingWrite().run(args)
}
}
package mobvista.dmp.datasource.id_mapping
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator
/**
* @package: mobvista.dmp.datasource.id_mapping
* @author: wangjf
* @date: 2022/1/16
* @time: 3:16 PM
* @email: jinfeng.wang@mobvista.com
*/
class MyRegisterKryo extends KryoRegistrator {
override def registerClasses(kryo: Kryo): Unit = {
kryo.register(classOf[Constant.Result])
}
}
\ No newline at end of file
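Editor's note: MyRegisterKryo only registers Constant.Result and is wired into the session through the spark.kryo.registrator setting shown in IDMappingGraphx above. A minimal sketch of how a stricter setup might look; this commit keeps registrationRequired=false, so the stricter value below is purely illustrative:
import org.apache.spark.SparkConf
// Illustrative only: with registrationRequired=true, serializing any unregistered class fails fast,
// so every class crossing a shuffle (including Constant.Result) must be covered by the registrator.
val strictConf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "mobvista.dmp.datasource.id_mapping.MyRegisterKryo")
  .set("spark.kryo.registrationRequired", "true")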