Commit 9d586698 by WangJinfeng

update lazada_rtdmp

parent b3907a3b
type=command
dependencies=merge_install
dependencies=merge_install,merge_event
command=bash -x lazada_rtdmp.sh
\ No newline at end of file
type=command
dependencies=etl_event
command=bash -x merge_event.sh
\ No newline at end of file
#!/bin/bash
# merge_event.sh — merge yesterday's ADN tracking events into the rolling
# dwh.merge_adn_tracking_event table by submitting the Spark MergeEventJob,
# then mount the new partition.
source ../../dmp_env.sh

# ScheduleTime is the scheduler-injected run timestamp; the job processes
# data for the previous day (dt = ScheduleTime - 1 day).
today=${ScheduleTime}
date=$(date +"%Y%m%d" -d "-1 day $today")
date_path=$(date +"%Y/%m/%d/" -d "-1 day $today")
OUTPUT="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/merge_adn_tracking_event/${date_path}"

# The merge is incremental: today's output full-outer-joins yesterday's merged
# partition, so wait for the day-before's _SUCCESS marker before starting.
old_date_path=$(date +"%Y/%m/%d/" -d "-2 day $today")
OLD_OUTPUT="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/merge_adn_tracking_event/${old_date_path}"
check_await "$OLD_OUTPUT/_SUCCESS"

# NOTE: the YARN application name previously said "MergeInstallJob" (copy-paste
# from merge_install.sh) although the submitted class is MergeEventJob; fixed so
# the job is identifiable in the ResourceManager UI.
spark-submit --class mobvista.dmp.datasource.rtdmp.lazada.MergeEventJob \
 --name "MergeEventJob.${date}" \
 --conf spark.sql.shuffle.partitions=4000 \
 --conf spark.default.parallelism=4000 \
 --conf spark.kryoserializer.buffer.max=256m \
 --conf spark.executor.extraJavaOptions="-XX:+UseG1GC" \
 --master yarn --deploy-mode cluster --executor-memory 6g --driver-memory 4g --executor-cores 4 --num-executors 5 \
 ../../${JAR} -dt ${date} -output ${OUTPUT}

# Propagate Spark failure to the scheduler with a non-zero exit code.
if [[ $? -ne 0 ]]; then
 exit 255
fi

# Register the freshly written S3 path as the dt partition of the Hive table.
mount_partition "merge_adn_tracking_event" "dt='${date}'" "$OUTPUT"
type=command
dependencies=etl_install,etl_event
dependencies=etl_install
command=bash -x merge_install.sh
\ No newline at end of file
......@@ -13,7 +13,7 @@ object Constant {
"""
|SELECT campaign_id,gaid
| FROM dwh.ods_adn_trackingnew_install
| WHERE CONCAT(yyyy,mm,dd) = @dt AND UPPER(country_code) IN ('ID','PH')
| WHERE CONCAT(yyyy,mm,dd) = @dt AND UPPER(country_code) IN ('ID','PH','MY','TH','VN')
| AND ext_campaignpackagename = 'com.lazada.android'
| GROUP BY campaign_id,gaid
|""".stripMargin
......@@ -22,7 +22,7 @@ object Constant {
"""
|SELECT campaign_id,gaid
| FROM dwh.ods_adn_tracking_ss_event
| WHERE CONCAT(yyyy,mm,dd) = @dt AND country in('ID','PH') AND event_name = 'REGISTRATION'
| WHERE CONCAT(yyyy,mm,dd) = @dt AND country in ('ID','PH','MY','TH','VN') AND event_name = 'REGISTRATION'
| GROUP BY campaign_id,gaid
|""".stripMargin
......@@ -45,12 +45,31 @@ object Constant {
| ON t1.campaign_id = t2.campaign_id AND t1.gaid = t2.gaid
|""".stripMargin
val merge_event_sql: String =
"""
|SELECT
| COALESCE(t1.campaign_id, t2.campaign_id) campaign_id,
| COALESCE(t1.gaid, t2.gaid) gaid,
| COALESCE(t1.update_date, t2.update_date) update_date
| FROM
| (SELECT campaign_id, gaid, '@new_date' update_date
| FROM dwh.etl_adn_tracking_event
| WHERE dt = '@dt'
| ) t1
| FULL OUTER JOIN
| (SELECT campaign_id, gaid, update_date
| FROM dwh.merge_adn_tracking_event
| WHERE dt = '@before_date' AND update_date > '@update_date'
| ) t2
| ON t1.campaign_id = t2.campaign_id AND t1.gaid = t2.gaid
|""".stripMargin
val process_rtdmp_audience_sql: String =
"""
|SELECT t2.gaid
| FROM
| (SELECT campaign_id,gaid
| FROM dwh.etl_adn_tracking_event WHERE dt = '@dt'
| FROM dwh.merge_adn_tracking_event WHERE dt = '@dt'
| ) t1
| RIGHT JOIN
| (SELECT campaign_id,gaid
......
package mobvista.dmp.datasource.rtdmp.lazada
import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
import mobvista.dmp.util.DateUtil
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.spark.sql.SaveMode
/**
* @package: mobvista.dmp.datasource.rtdmp.lazada
* @author: wangjf
* @date: 2021/8/5
* @time: 7:23 下午
* @email: jinfeng.wang@mobvista.com
*/
/**
 * Spark job that rebuilds the rolling merged ADN tracking-event table for one
 * day: new rows from dwh.etl_adn_tracking_event (stamped with today's date)
 * are full-outer-joined with the previous day's merged partition, and rows
 * older than the 7-day retention window are dropped. The result is written as
 * zlib-compressed ORC to the given output path.
 */
class MergeEventJob extends CommonSparkJob with Serializable {

  /** Declares the command-line options this job accepts: -dt, -tb_type, -output. */
  def commandOptions(): Options = {
    val opts = new Options()
    // Each option takes a value; the description simply echoes the option name.
    Seq("dt", "tb_type", "output").foreach(name => opts.addOption(name, true, name))
    opts
  }

  /**
   * Entry point invoked by the framework.
   *
   * @param args raw command-line args; -dt is the run date (yyyyMMdd) and
   *             -output the destination path for the ORC result.
   * @return 0 on success (any exception propagates after the session is stopped).
   */
  override protected def run(args: Array[String]): Int = {
    val cmd = new BasicParser().parse(commandOptions(), args)
    val dt = cmd.getOptionValue("dt")
    val output = cmd.getOptionValue("output")

    val spark = MobvistaConstant.createSparkSession(s"MergeEventJob.${dt}")
    val sc = spark.sparkContext
    try {
      // dt reformatted as yyyy-MM-dd; used as the update_date stamp for new rows.
      val stampDate = MobvistaConstant.sdf1.format(MobvistaConstant.sdf2.parse(dt))
      // Retention cutoff (dt - 7 days, yyyy-MM-dd): older rows are filtered out
      // of the carried-forward partition by the SQL template.
      val cutoffDate = DateUtil.getDay(DateUtil.parse(dt, "yyyyMMdd"), "yyyy-MM-dd", -7)
      // Previous day's partition (yyyyMMdd) — the base being merged into.
      val prevDt = DateUtil.getDay(DateUtil.parse(dt, "yyyyMMdd"), "yyyyMMdd", -1)

      // Fill the placeholders of the shared SQL template in the same order as before.
      val mergeSql = Constant.merge_event_sql
        .replace("@dt", dt)
        .replace("@new_date", stampDate)
        .replace("@update_date", cutoffDate)
        .replace("@before_date", prevDt)

      spark.sql(mergeSql)
        .coalesce(100)
        .write
        .option("orc.compress", "zlib")
        .mode(SaveMode.Overwrite)
        .orc(output)
    } finally {
      // Shut down both handles defensively, mirroring the original cleanup order.
      if (sc != null) sc.stop()
      if (spark != null) spark.stop()
    }
    0
  }
}
/** Launcher: constructs the job and delegates straight to its run method. */
object MergeEventJob {
  def main(args: Array[String]): Unit = {
    val job = new MergeEventJob()
    job.run(args)
  }
}
\ No newline at end of file
......@@ -35,7 +35,7 @@ class MergeInstallJob extends CommonSparkJob with Serializable {
try {
val new_date = MobvistaConstant.sdf1.format(MobvistaConstant.sdf2.parse(dt))
val update_date = DateUtil.getDay(DateUtil.parse(dt, "yyyyMMdd"), "yyyy-MM-dd", -30)
val update_date = DateUtil.getDay(DateUtil.parse(dt, "yyyyMMdd"), "yyyy-MM-dd", -8)
val before_date = DateUtil.getDay(DateUtil.parse(dt, "yyyyMMdd"), "yyyyMMdd", -1)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment