Commit 0982ab1d by fan.jiang

lazada: change datasource from table ods_dmp_user_info_daily to ods_dmp_user_info

parent b65e4bae
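
For context, a minimal self-contained sketch of the device selection this commit switches to, assuming a Hive-enabled SparkSession and the dwh.ods_dmp_user_info columns shown in the diff below (device_id, country, device_type, platform, dt, last_req_day). The dates and output path here are illustrative placeholders, not values from this commit; the real job is EtlLazadaActivitionDaily driven by the spark-submit wrapper.

import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

object LazadaGaidSketch {
  def main(args: Array[String]): Unit = {
    // Hive-enabled session, as the production job uses spark.sql against dwh tables.
    val spark = SparkSession.builder()
      .appName("lazada_gaid_sketch")
      .enableHiveSupport()
      .getOrCreate()

    // Illustrative values: in the shell wrapper, dt_today is ScheduleTime minus 2 days
    // and last_req_day is ScheduleTime minus 16 days.
    val today = "20210601"
    val lastReqDay = "2021-05-18"

    // New source: a single dt partition of ods_dmp_user_info restricted to recently
    // active devices, instead of a 30-day scan over ods_dmp_user_info_daily.
    val gaids = spark.sql(
      s"""
         |select device_id, country
         |  from dwh.ods_dmp_user_info
         | where dt = '$today'
         |   and last_req_day >= '$lastReqDay'
         |   and device_type = 'gaid'
         |   and platform = 'android'
         | group by device_id, country
       """.stripMargin).persist(StorageLevel.MEMORY_AND_DISK_SER)

    // One text output per country, mirroring the ID/TH/VN/PH/MY/SG writes in the diff.
    Seq("ID", "TH", "VN", "PH", "MY", "SG").foreach { cc =>
      gaids.rdd
        .filter(_.getAs[String]("country").toUpperCase() == cc)
        .map(_.getAs[String]("device_id"))
        .coalesce(60)
        .saveAsTextFile(s"/tmp/lazada_gaid_sketch/$cc") // placeholder output path
    }

    spark.stop()
  }
}
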
......@@ -4,6 +4,7 @@
source ../dmp_env.sh
dt_today=$(date -d "$ScheduleTime 2 days ago" +"%Y%m%d")
last_req_day=$(date -d "$ScheduleTime 16 days ago" +"%Y-%m-%d")
dt_six_days_ago=$(date -d "$ScheduleTime 8 days ago" +"%Y%m%d")
dt_30days_ago=$(date -d "$ScheduleTime 31 days ago" +"%Y%m%d")
......@@ -43,7 +44,9 @@ MY_GAID_OUTPUT_PATH="${LAZADA_OUTPUT_PATH}/${dt_slash_today}/gaid/MY"
SG_GAID_OUTPUT_PATH="${LAZADA_OUTPUT_PATH}/${dt_slash_today}/gaid/SG"
check_await "${ODS_DMP_USER_INFO_DAILY}/${dt_today}/_SUCCESS"
# check_await "${ODS_DMP_USER_INFO_DAILY}/${dt_today}/_SUCCESS"
check_await "${ODS_DMP_USER_INFO}/${dt_slash_one_day}/adn_request/_SUCCESS"
check_await "${ODS_DMP_USER_INFO}/${dt_slash_one_day}/dsp_req/_SUCCESS"
hadoop fs -rm -r "${GAID_OUTPUT_PATH}"
hadoop fs -rm -r "${TH_GAID_OUTPUT_PATH}"
......@@ -62,7 +65,7 @@ spark-submit --class mobvista.dmp.datasource.taobao.EtlLazadaActivitionDaily \
--files ${HIVE_SITE_PATH} \
--master yarn --deploy-mode cluster --executor-memory 8g --driver-memory 4g --executor-cores 4 --num-executors 90 ../${JAR} \
-gaidoutput "${GAID_OUTPUT_PATH}" \
-today ${dt_today} \
-today ${dt_today} -last_req_day ${last_req_day} \
-input_one_day ${INPUT_ONE_DAY} -input_two_day ${INPUT_TWO_DAY} -input_three_day ${INPUT_THREE_DAY} \
-th_gaidoutput "${TH_GAID_OUTPUT_PATH}" -vn_gaidoutput "${VN_GAID_OUTPUT_PATH}" -ph_gaidoutput "${PH_GAID_OUTPUT_PATH}" -my_gaidoutput "${MY_GAID_OUTPUT_PATH}" -sg_gaidoutput "${SG_GAID_OUTPUT_PATH}" \
-th_input_one_day ${TH_INPUT_ONE_DAY} -vn_input_one_day ${VN_INPUT_ONE_DAY} -ph_input_one_day ${PH_INPUT_ONE_DAY} -my_input_one_day ${MY_INPUT_ONE_DAY} -sg_input_one_day ${SG_INPUT_ONE_DAY} \
......
......@@ -40,6 +40,7 @@ class EtlLazadaActivitionDaily extends CommonSparkJob {
options.addOption("ph_input_three_day", true, "[must] ph_input_three_day")
options.addOption("my_input_three_day", true, "[must] my_input_three_day")
options.addOption("sg_input_three_day", true, "[must] sg_input_three_day")
options.addOption("last_req_day", true, "[must] last_req_day")
options
}
......@@ -79,6 +80,7 @@ class EtlLazadaActivitionDaily extends CommonSparkJob {
val ph_input_three_day = commandLine.getOptionValue("ph_input_three_day")
val my_input_three_day = commandLine.getOptionValue("my_input_three_day")
val sg_input_three_day = commandLine.getOptionValue("sg_input_three_day")
val last_req_day = commandLine.getOptionValue("last_req_day")
val spark = SparkSession.builder()
......@@ -105,21 +107,21 @@ class EtlLazadaActivitionDaily extends CommonSparkJob {
val sql2=
s"""
|select dev_id,country
|from dwh.ods_dmp_user_info_daily where dt between '${dt_30days_ago}' and '${today}'
| and dev_type='gaid'
|select device_id,country
|from dwh.ods_dmp_user_info where dt = '${today}' and last_req_day >= '${last_req_day}'
| and device_type='gaid'
| and platform='android'
| group by dev_id,country
| group by device_id,country
""".stripMargin
val dfCache: DataFrame = spark.sql(sql2).persist(StorageLevel.MEMORY_AND_DISK_SER)
// Per the 2021-05-21 requirement, lift the volume cap for all countries: take all available data, and no deduplication against historical data is needed.
dfCache.rdd.filter(_.getAs[String]("country").toUpperCase() == "ID").map(_.getAs[String]("dev_id")).coalesce(60).saveAsTextFile(gaidoutput)
dfCache.rdd.filter(_.getAs[String]("country").toUpperCase() == "TH").map(_.getAs[String]("dev_id")).coalesce(60).saveAsTextFile(th_gaidoutput)
dfCache.rdd.filter(_.getAs[String]("country").toUpperCase() == "VN").map(_.getAs[String]("dev_id")).coalesce(60).saveAsTextFile(vn_gaidoutput)
dfCache.rdd.filter(_.getAs[String]("country").toUpperCase() == "PH").map(_.getAs[String]("dev_id")).coalesce(60).saveAsTextFile(ph_gaidoutput)
dfCache.rdd.filter(_.getAs[String]("country").toUpperCase() == "MY").map(_.getAs[String]("dev_id")).coalesce(60).saveAsTextFile(my_gaidoutput)
dfCache.rdd.filter(_.getAs[String]("country").toUpperCase() == "SG").map(_.getAs[String]("dev_id")).coalesce(60).saveAsTextFile(sg_gaidoutput)
dfCache.rdd.filter(_.getAs[String]("country").toUpperCase() == "ID").map(_.getAs[String]("device_id")).coalesce(60).saveAsTextFile(gaidoutput)
dfCache.rdd.filter(_.getAs[String]("country").toUpperCase() == "TH").map(_.getAs[String]("device_id")).coalesce(60).saveAsTextFile(th_gaidoutput)
dfCache.rdd.filter(_.getAs[String]("country").toUpperCase() == "VN").map(_.getAs[String]("device_id")).coalesce(60).saveAsTextFile(vn_gaidoutput)
dfCache.rdd.filter(_.getAs[String]("country").toUpperCase() == "PH").map(_.getAs[String]("device_id")).coalesce(60).saveAsTextFile(ph_gaidoutput)
dfCache.rdd.filter(_.getAs[String]("country").toUpperCase() == "MY").map(_.getAs[String]("device_id")).coalesce(60).saveAsTextFile(my_gaidoutput)
dfCache.rdd.filter(_.getAs[String]("country").toUpperCase() == "SG").map(_.getAs[String]("device_id")).coalesce(60).saveAsTextFile(sg_gaidoutput)
} finally {
spark.stop()
......