Commit 0963a022 by WangJinfeng

update rtdmp_request_v2.sh,export reyun daily

parent 02cb5410
......@@ -64,9 +64,11 @@ ADN_ADX_REQ_ORG="s3://mob-ad/adn/adx-v1/request"
ADN_DSP_PATH="s3://mob-ad/adn/dsp_orc/request"
DMP_ADN_REQUEST_DEVICE_IDS="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/etl_ids_mapping"
OUTPUT_REYUN_IMPRESSION_PATH="s3://mob-emr-test/reyun/tracking_impression"
OUTPUT_REYUN_CLICK_PATH="s3://mob-emr-test/reyun/tracking_click"
OUTPUT_REYUN_INSTALL_PATH="s3://mob-emr-test/reyun/tracking_install"
OUTPUT_REYUN_IMPRESSION_PATH="s3://mob-emr-test/mv2reyun/tracking_impression"
OUTPUT_REYUN_CLICK_PATH="s3://mob-emr-test/mv2reyun/tracking_click"
OUTPUT_REYUN_INSTALL_PATH="s3://mob-emr-test/mv2reyun/tracking_install"
OUTPUT_REYUN_EVENT_PATH="s3://mob-emr-test/mv2reyun/dmp_event_daily"
ETL_DSP_REQ_DAILY="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/etl_dsp_request_daily"
ETL_ADN_INSTALL_DAILY="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/etl_adn_install_daily"
......
type=command
command=sh -x event_daily.sh
\ No newline at end of file
#! /bin/bash
# # # # # # # # # # # # # # # # # # # # # #
# @author : wangjf
# # # # # # # # # # # # # # # # # # # # # #
source ../../dmp_env.sh
dt=$(date +"%Y%m%d" -d "-1 day $ScheduleTime")
date_path=$(date +"%Y/%m/%d" -d "-1 day $ScheduleTime")
check_await ${DMP_EVENT_TAG_DAILY}/day=${dt}/tag_source=3s/_SUCCESS
check_await ${DMP_EVENT_TAG_DAILY}/day=${dt}/tag_source=tp/tag_type=other_purchase/_SUCCESS
OUTPUT_PATH="${OUTPUT_REYUN_EVENT_PATH}/${date_path}"
spark-submit --class mobvista.dmp.output.reyun.EventDaily \
--master yarn --deploy-mode cluster --executor-memory 4g --driver-memory 4g --executor-cores 2 --num-executors 5 \
../../${JAR} -date ${dt} -output ${OUTPUT_PATH} -coalesce 10
if [ $? -ne 0 ]; then
exit 255
fi
type=command
dependencies=tracking_impression,tracking_click,tracking_install,event_daily
command=echo 'export reyun success!'
\ No newline at end of file
type=command
command=sh -x tracking_daily.sh
\ No newline at end of file
command=sh -x tracking_daily.sh 'click'
\ No newline at end of file
type=command
dependencies=tracking_impression
command=echo 'tracking output success!'
\ No newline at end of file
type=command
command=sh -x tracking_daily.sh
\ No newline at end of file
command=sh -x tracking_daily.sh 'install'
\ No newline at end of file
......@@ -9,7 +9,7 @@ INPUT_PATH="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/tmp/rtdmp_tmp_
audience_names="adx_goodreq7 sdk_goodreq7 sdk_wangzhuan_goodreq7"
for audience_name in ${audience_names}; do
S3_PATH="${INPUT_PATH}/${audience_name}/*/"
S3_PATH="${INPUT_PATH}/${audience_name}"
check_await ${S3_PATH}/_SUCCESS
java -cp ../${JAR} mobvista.dmp.datasource.rtdmp.RTDmpRequestV2 "${audience_name}" "${S3_PATH}"
done
......@@ -33,7 +33,7 @@ public class RTDmpRequestV2 {
if (map.containsKey(audience_name)) {
jsonObject = new JSONObject();
jsonObject.put("id", map.get(audience_name));
jsonObject.put("s3_path", s3_path);
jsonObject.put("s3_path", s3_path + "/*/");
jsonObject.put("status", 1);
jsonObject.put("audience_data_status", 1);
JSONArray updateJsonArray = new JSONArray();
......@@ -41,7 +41,7 @@ public class RTDmpRequestV2 {
ServerUtil.update(updateJsonArray);
} else {
jsonObject = new JSONObject();
jsonObject.put("s3_path", s3_path);
jsonObject.put("s3_path", s3_path + "/*/");
jsonObject.put("platform", 1);
jsonObject.put("match_device_type", "1,3,6,9");
jsonObject.put("audience_type", 2);
......
package mobvista.dmp.output.reyun
import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
import org.apache.commons.cli.Options
import org.apache.spark.sql.SaveMode
/**
* @package: mobvista.dmp.output.reyun
* @author: wangjf
* @date: 2021/9/14
* @time: 2:06 下午
* @email: jinfeng.wang@mobvista.com
*/
class EventDaily extends CommonSparkJob {
override protected def buildOptions(): Options = {
val options = new Options
options.addOption("date", true, "[must] date")
options.addOption("output", true, "[must] output")
options.addOption("coalesce", true, "[must] coalesce")
options
}
override protected def run(args: Array[String]): Int = {
val commandLine = commParser.parse(options, args)
if (!checkMustOption(commandLine)) {
printUsage(options)
return -1
} else {
printOptions(commandLine)
}
val date = commandLine.getOptionValue("date")
val output = commandLine.getOptionValue("output")
val coalesce = Integer.parseInt(commandLine.getOptionValue("coalesce"))
val spark = MobvistaConstant.createSparkSession(s"EventDaily.${date}")
try {
val sql =
s"""
|SELECT * FROM dwh.dmp_event_tag_daily where day='${date}' and upper(country) = 'CN'
|""".stripMargin
val df = spark.sql(sql)
df.repartition(coalesce)
.write
.mode(SaveMode.Overwrite)
.option("orc.compress", "zlib")
.orc(output)
} finally {
spark.stop()
}
0
}
}
object EventDaily {
def main(args: Array[String]): Unit = {
new EventDaily().run(args)
}
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment