Commit f671611c by WangJinfeng

update export_reyun_daily

parent 9bc44af8
...@@ -70,6 +70,8 @@ OUTPUT_REYUN_INSTALL_PATH="s3://mob-emr-test/mv2reyun/tracking_install" ...@@ -70,6 +70,8 @@ OUTPUT_REYUN_INSTALL_PATH="s3://mob-emr-test/mv2reyun/tracking_install"
OUTPUT_REYUN_EVENT_PATH="s3://mob-emr-test/mv2reyun/dmp_event_daily" OUTPUT_REYUN_EVENT_PATH="s3://mob-emr-test/mv2reyun/dmp_event_daily"
OUTPUT_REYUN_USER_INFO_PATH="s3://mob-emr-test/mv2reyun/dmp_user_info"
ETL_DSP_REQ_DAILY="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/etl_dsp_request_daily" ETL_DSP_REQ_DAILY="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/etl_dsp_request_daily"
ETL_ADN_INSTALL_DAILY="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/etl_adn_install_daily" ETL_ADN_INSTALL_DAILY="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/etl_adn_install_daily"
ETL_ADN_SDK_CLICK_DAILY="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/etl_adn_sdk_click_daily" ETL_ADN_SDK_CLICK_DAILY="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/etl_adn_sdk_click_daily"
......
type=command type=command
dependencies=tracking_impression,tracking_click,tracking_install,event_daily dependencies=tracking_impression,tracking_click,tracking_install,event_daily,user_info
command=echo 'export reyun success!' command=echo 'export reyun success!'
\ No newline at end of file
...@@ -33,8 +33,8 @@ elif [[ ${log_type} = 'install' ]]; then ...@@ -33,8 +33,8 @@ elif [[ ${log_type} = 'install' ]]; then
fi fi
spark-submit --class mobvista.dmp.output.reyun.TrackingLog \ spark-submit --class mobvista.dmp.output.reyun.TrackingLog \
--master yarn --deploy-mode cluster --executor-memory 6g --driver-memory 4g --executor-cores 4 --num-executors 5 \ --master yarn --deploy-mode cluster --executor-memory 6g --driver-memory 4g --executor-cores 4 --num-executors 10 \
../../${JAR} -date ${dt} -log_type ${log_type} -output ${OUTPUT_PATH} -coalesce 20 ../../${JAR} -date ${dt} -log_type ${log_type} -output ${OUTPUT_PATH} -coalesce 100
if [ $? -ne 0 ]; then if [ $? -ne 0 ]; then
exit 255 exit 255
......
type=command
command=sh -x user_info.sh
\ No newline at end of file
#! /bin/bash
# # # # # # # # # # # # # # # # # # # # # #
# @author : wangjf
# # # # # # # # # # # # # # # # # # # # # #
source ../../dmp_env.sh
dt=$(date +"%Y%m%d" -d "-1 day $ScheduleTime")
date_path=$(date +"%Y%m/%d" -d "-1 day $ScheduleTime")
check_await ${ODS_DMP_USER_INFO_ALL}/${dt}/_SUCCESS
OUTPUT_PATH="${OUTPUT_REYUN_USER_INFO_PATH}/${date_path}"
spark-submit --class mobvista.dmp.output.reyun.UserInfo \
--master yarn --deploy-mode cluster --executor-memory 6g --driver-memory 4g --executor-cores 3 --num-executors 100 \
../../${JAR} -date ${dt} -output ${OUTPUT_PATH} -coalesce 1000
if [ $? -ne 0 ]; then
exit 255
fi
...@@ -22,7 +22,7 @@ object Constant { ...@@ -22,7 +22,7 @@ object Constant {
|SELECT created, app_id, advertiser_id, creative_id, platform, os_version, device_brand, device_model, country_code, network_type, |SELECT created, app_id, advertiser_id, creative_id, platform, os_version, device_brand, device_model, country_code, network_type,
| ip, imei, mac, dev_id, idfa, ext_campaignpackagename, ext_finalpackagename, ext_advinstalltime, ext_oaid, ext_eventtime | ip, imei, mac, dev_id, idfa, ext_campaignpackagename, ext_finalpackagename, ext_advinstalltime, ext_oaid, ext_eventtime
| FROM dwh.ods_adn_trackingnew_install | FROM dwh.ods_adn_trackingnew_install
| WHERE CONCAT(yyyy,mm,dd) = '@date' AND re = 'beijing' AND UPPER(country_code) = 'CN' | WHERE CONCAT(yyyy,mm,dd) = '@date'
|""".stripMargin |""".stripMargin
val tracking_click = val tracking_click =
...@@ -31,6 +31,13 @@ object Constant { ...@@ -31,6 +31,13 @@ object Constant {
| mac, dev_id, idfa, ext_packagename, ext_finalpackagename, ext_channel, ext_oaid, ext_advinstalltime, ext_eventtime, | mac, dev_id, idfa, ext_packagename, ext_finalpackagename, ext_channel, ext_oaid, ext_advinstalltime, ext_eventtime,
| ext_campaignpackagename | ext_campaignpackagename
| FROM dwh.ods_adn_trackingnew_click | FROM dwh.ods_adn_trackingnew_click
| WHERE CONCAT(yyyy,mm,dd) = '@date' AND re = 'beijing' AND UPPER(country_code) = 'CN' | WHERE CONCAT(yyyy,mm,dd) = '@date'
|""".stripMargin
val user_info =
"""
|SELECT dev_id, dev_id_md5, LOWER(dev_type) dev_type, LOWER(platform) platform, UPPER(country) country, install, interest, update_date
| FROM dwh.ods_dmp_user_info_all
| WHERE dt = '@date' AND UPPER(country) = 'CN'
|""".stripMargin |""".stripMargin
} }
...@@ -38,7 +38,7 @@ class EventDaily extends CommonSparkJob { ...@@ -38,7 +38,7 @@ class EventDaily extends CommonSparkJob {
val sql = val sql =
s""" s"""
|SELECT * FROM dwh.dmp_event_tag_daily where day='${date}' and upper(country) = 'CN' |SELECT * FROM dwh.dmp_event_tag_daily where day='${date}'
|""".stripMargin |""".stripMargin
val df = spark.sql(sql) val df = spark.sql(sql)
......
package mobvista.dmp.output.reyun
import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
import org.apache.commons.cli.Options
import org.apache.spark.sql.SaveMode
/**
* @package: mobvista.dmp.output.reyun
* @author: wangjf
* @date: 2021/9/14
* @time: 2:06 下午
* @email: jinfeng.wang@mobvista.com
*/
class UserInfo extends CommonSparkJob {
override protected def buildOptions(): Options = {
val options = new Options
options.addOption("date", true, "[must] date")
options.addOption("output", true, "[must] output")
options.addOption("coalesce", true, "[must] coalesce")
options
}
override protected def run(args: Array[String]): Int = {
val commandLine = commParser.parse(options, args)
if (!checkMustOption(commandLine)) {
printUsage(options)
return -1
} else {
printOptions(commandLine)
}
val date = commandLine.getOptionValue("date")
val output = commandLine.getOptionValue("output")
val coalesce = Integer.parseInt(commandLine.getOptionValue("coalesce"))
val spark = MobvistaConstant.createSparkSession(s"UserInfo.${date}")
try {
val sql = Constant.user_info.replace("@date", date)
val df = spark.sql(sql)
df.repartition(coalesce)
.write
.mode(SaveMode.Overwrite)
.option("orc.compress", "zlib")
.orc(output)
} finally {
spark.stop()
}
0
}
}
object UserInfo {
def main(args: Array[String]): Unit = {
new UserInfo().run(args)
}
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment