Commit 66db526b by WangJinfeng

support reyun tracking impression

parent e1702143
......@@ -59,10 +59,15 @@ ADN_EVENT_PATH="s3://mob-ad/adn/tracking-v3/event"
ADN_REQUEST_PATH="s3://mob-ad/adn/tracking-v3/request"
HB_REQUEST_PATH="s3://mob-ad/adn/hb-v1/request"
ADN_PRE_CLICK_PATH="s3://mob-ad/adn/tracking-v3/pre_click"
# Root of the ADN impression logs. tracking_daily.sh waits on
# <root>/<yyyy>/<mm>/<dd>/beijing/<HH>/_SUCCESS under this prefix.
ADN_IMPRESSION_PATH="s3://mob-ad/adn/tracking-v3/impression"
ADN_ADX_REQ_ORG="s3://mob-ad/adn/adx-v1/request"
ADN_DSP_PATH="s3://mob-ad/adn/dsp_orc/request"
DMP_ADN_REQUEST_DEVICE_IDS="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/etl_ids_mapping"
# Output roots for the Reyun tracking export, one per log type.
# tracking_daily.sh appends <yyyy>/<mm>/<dd> to the root matching its log_type argument.
OUTPUT_REYUN_IMPRESSION_PATH="s3://mob-emr-test/reyun/tracking_impression"
OUTPUT_REYUN_CLICK_PATH="s3://mob-emr-test/reyun/tracking_click"
OUTPUT_REYUN_INSTALL_PATH="s3://mob-emr-test/reyun/tracking_install"
ETL_DSP_REQ_DAILY="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/etl_dsp_request_daily"
ETL_ADN_INSTALL_DAILY="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/etl_adn_install_daily"
ETL_ADN_SDK_CLICK_DAILY="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/etl_adn_sdk_click_daily"
......
# Runs tracking_daily.sh as a shell command.
# NOTE(review): tracking_daily.sh reads its log type from $1 (impression|click|install),
# but no argument is passed here - confirm which log type this job is meant to export.
type=command
command=sh -x tracking_daily.sh
\ No newline at end of file
# Runs after the tracking_impression job (see dependencies) and only echoes a success message.
type=command
dependencies=tracking_impression
command=echo 'tracking output success!'
\ No newline at end of file
#! /bin/bash

# # # # # # # # # # # # # # # # # # # # # #
# @author : wangjf
# # # # # # # # # # # # # # # # # # # # # #

# Exports the previous day's ADN tracking logs of one type through the
# mobvista.dmp.output.reyun.TrackingLog Spark job.
#
# Usage: tracking_daily.sh <impression|click|install>
#
# Reads ScheduleTime, JAR, the ADN_*_PATH / OUTPUT_REYUN_*_PATH variables and
# check_await from ../../dmp_env.sh (presumably - they are not defined in this script).
#
# Exit status: 1 on a missing or unknown log type, 255 when spark-submit fails.

source ../../dmp_env.sh

# The job processes the day before ScheduleTime.
dt=$(date +"%Y%m%d" -d "-1 day $ScheduleTime")
date_path=$(date +"%Y/%m/%d" -d "-1 day $ScheduleTime")

log_type=$1

hours="00 01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23"

# Resolve input and output locations together so they can never disagree, and
# fail fast on an unexpected log type instead of waiting on an empty path.
# `case` is used rather than `[[ ]]` because the job files launch this script via `sh`.
case "${log_type}" in
  impression)
    INPUT_PATH="${ADN_IMPRESSION_PATH}/${date_path}/beijing"
    OUTPUT_PATH="${OUTPUT_REYUN_IMPRESSION_PATH}/${date_path}"
    ;;
  click)
    INPUT_PATH="${ADN_CLICK_PATH}/${date_path}/beijing"
    OUTPUT_PATH="${OUTPUT_REYUN_CLICK_PATH}/${date_path}"
    ;;
  install)
    INPUT_PATH="${ADN_INSTALL_PATH}/${date_path}/beijing"
    OUTPUT_PATH="${OUTPUT_REYUN_INSTALL_PATH}/${date_path}"
    ;;
  *)
    echo "Usage: $0 <impression|click|install> (got '${log_type}')" >&2
    exit 1
    ;;
esac

# Every hourly partition of the day must carry its _SUCCESS marker before the export starts.
for hour in ${hours}; do
  check_await "${INPUT_PATH}/${hour}/_SUCCESS"
done

spark-submit --class mobvista.dmp.output.reyun.TrackingLog \
  --master yarn --deploy-mode cluster --executor-memory 6g --driver-memory 4g --executor-cores 4 --num-executors 5 \
  "../../${JAR}" -date "${dt}" -log_type "${log_type}" -output "${OUTPUT_PATH}" -coalesce 20

if [ $? -ne 0 ]; then
  exit 255
fi
# Runs tracking_daily.sh with the 'impression' log type.
type=command
command=sh -x tracking_daily.sh 'impression'
\ No newline at end of file
# Runs tracking_daily.sh as a shell command.
# NOTE(review): tracking_daily.sh reads its log type from $1 (impression|click|install),
# but no argument is passed here - confirm which log type this job is meant to export.
type=command
command=sh -x tracking_daily.sh
\ No newline at end of file
package mobvista.dmp.output.reyun
/**
* SQL templates for the Reyun tracking-log export.
*
* Each template reads one `dwh.ods_adn_trackingnew_*` table and keeps only rows with
* `re = 'beijing'` and a country code equal to `CN` (compared case-insensitively via UPPER).
* The literal `@date` is a placeholder that the caller must replace with the day to export;
* it is compared against `CONCAT(yyyy,mm,dd)`, i.e. a `yyyyMMdd` string.
*
* @package: mobvista.dmp.output.reyun
* @author: wangjf
* @date: 2021/9/14
* @time: 2:14 PM
* @email: jinfeng.wang@mobvista.com
*/
object Constant {
// Impression rows; same column list as tracking_click.
val tracking_impression =
"""
|SELECT created, app_id, creative_id, platform, os_version, device_brand, device_model, country_code, network_type, ip, imei,
| mac, dev_id, idfa, ext_packagename, ext_finalpackagename, ext_channel, ext_oaid, ext_advinstalltime, ext_eventtime,
| ext_campaignpackagename
| FROM dwh.ods_adn_trackingnew_impression
| WHERE CONCAT(yyyy,mm,dd) = '@date' AND re = 'beijing' AND UPPER(country_code) = 'CN'
|""".stripMargin
// Install rows; unlike impression/click this selects advertiser_id and omits
// ext_packagename and ext_channel.
val tracking_install =
"""
|SELECT created, app_id, advertiser_id, creative_id, platform, os_version, device_brand, device_model, country_code, network_type,
| ip, imei, mac, dev_id, idfa, ext_campaignpackagename, ext_finalpackagename, ext_advinstalltime, ext_oaid, ext_eventtime
| FROM dwh.ods_adn_trackingnew_install
| WHERE CONCAT(yyyy,mm,dd) = '@date' AND re = 'beijing' AND UPPER(country_code) = 'CN'
|""".stripMargin
// Click rows; same column list as tracking_impression.
val tracking_click =
"""
|SELECT created, app_id, creative_id, platform, os_version, device_brand, device_model, country_code, network_type, ip, imei,
| mac, dev_id, idfa, ext_packagename, ext_finalpackagename, ext_channel, ext_oaid, ext_advinstalltime, ext_eventtime,
| ext_campaignpackagename
| FROM dwh.ods_adn_trackingnew_click
| WHERE CONCAT(yyyy,mm,dd) = '@date' AND re = 'beijing' AND UPPER(country_code) = 'CN'
|""".stripMargin
}
package mobvista.dmp.output.reyun
import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
import mobvista.dmp.output.reyun.Constant._
import org.apache.commons.cli.Options
import org.apache.spark.sql.SaveMode
/**
 * Spark job that runs one of the tracking SQL templates from [[Constant]] for a given day
 * and writes the result as zlib-compressed ORC, overwriting the output location.
 *
 * @package: mobvista.dmp.output.reyun
 * @author: wangjf
 * @date: 2021/9/14
 * @time: 2:06 PM
 * @email: jinfeng.wang@mobvista.com
 */
class TrackingLog extends CommonSparkJob {

  /** Declares the command-line options read by `run`: log_type, date, output and coalesce. */
  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("log_type", true, "[must] log_type")
    options.addOption("date", true, "[must] date")
    options.addOption("output", true, "[must] output")
    options.addOption("coalesce", true, "[must] coalesce")
    options
  }

  /**
   * Picks the SQL template for a log type.
   *
   * @throws IllegalArgumentException when the log type is not impression, install or click
   */
  private def sqlTemplateFor(logType: String): String =
    logType match {
      case "impression" => tracking_impression
      case "install" => tracking_install
      case "click" => tracking_click
      case other =>
        throw new IllegalArgumentException(
          s"Unsupported log_type '$other'; expected one of: impression, install, click")
    }

  /**
   * Parses the arguments, runs the query for the requested log type and date, and writes
   * the result to the output path.
   *
   * @param args command-line arguments: -log_type, -date, -output, -coalesce
   * @return 0 on success, -1 when `checkMustOption` rejects the arguments
   * @throws IllegalArgumentException when log_type is not a supported value
   */
  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      -1
    } else {
      printOptions(commandLine)

      val log_type = commandLine.getOptionValue("log_type")
      val date = commandLine.getOptionValue("date")
      val output = commandLine.getOptionValue("output")
      val coalesce = commandLine.getOptionValue("coalesce").toInt

      // Validate the log type before a Spark session is created, so a bad argument
      // fails immediately with a readable message instead of a MatchError mid-job.
      val sql = sqlTemplateFor(log_type).replace("@date", date)

      val spark = MobvistaConstant.createSparkSession(s"TrackingLog.${log_type}.${date}")
      try {
        // Note: despite the option name, the output partition count is set with
        // repartition (not coalesce).
        spark.sql(sql)
          .repartition(coalesce)
          .write
          .mode(SaveMode.Overwrite)
          .option("orc.compress", "zlib")
          .orc(output)
      } finally {
        spark.stop()
      }
      0
    }
  }
}
/** Entry point used by spark-submit (`--class mobvista.dmp.output.reyun.TrackingLog`). */
object TrackingLog {

  /**
   * Runs the job and propagates a non-zero result as the process exit status, so that a
   * rejected argument list (run returning -1) is not reported to the caller as success.
   */
  def main(args: Array[String]): Unit = {
    val exitCode = new TrackingLog().run(args)
    if (exitCode != 0) {
      sys.exit(exitCode)
    }
  }
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment