1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
#!/bin/bash
# Submit the RTDmpRequest Spark job for one business line, then maintain
# its S3 output (success marker, ad-hoc cleanup, 365-day expiry purge).
# Usage: <script> <business>  (ali_activation | dsp_req | btop | uc_activation |
#        iqiyi_activation | alipay_activation | alipay_acquisition |
#        youku_acquisition | tencent | other)
# Relies on ../dmp_env.sh for ScheduleTime, the *_PATH variables,
# DM_INSTALL_LIST, check_await and ${JAR}.
source ../dmp_env.sh
# Business line to process; drives date offset, input path and Spark sizing.
business=$1
# Hour bucket suffix; stays "0" except for the alipay_* businesses.
hh="0"
# Schedule baseline timestamp — presumably exported by dmp_env.sh or the
# scheduler; NOTE(review): assumed to be in a format 'date -d' accepts — confirm.
today=${ScheduleTime}
# Per-business configuration: target date (offset from ${today}), input
# partition path, and Spark sizing (shuffle partitions / executors / output
# coalesce). Unknown businesses abort instead of submitting a mis-sized job.
case "${business}" in
  ali_activation)
    date=$(date +"%Y%m%d" -d "-2 day $today")
    date_path=$(date +%Y/%m/%d -d "-2 day $today")
    partition=1000
    executors=20
    coalesce=200
    ;;
  dsp_req)
    date=$(date +"%Y%m%d" -d "-1 day $today")
    date_path=$(date +%Y/%m/%d -d "-1 day $today")
    partition=2000
    executors=100
    coalesce=100
    ;;
  btop)
    date=$(date +"%Y%m%d" -d "-1 day $today")
    date_path=$(date +%Y/%m/%d -d "-1 day $today")
    partition=20
    executors=20
    coalesce=40
    ;;
  uc_activation)
    date=$(date +"%Y%m%d" -d "0 day $today")
    date_path=$(date +%Y/%m/%d -d "0 day $today")
    partition=100
    executors=20
    coalesce=40
    ;;
  iqiyi_activation)
    date=$(date +"%Y%m%d" -d "0 day $today")
    date_path=$(date +%Y/%m/%d -d "0 day $today")
    partition=100
    executors=10
    coalesce=40
    ;;
  alipay_activation|alipay_acquisition)
    # Both alipay businesses share the same schedule and sizing.
    date=$(date +"%Y%m%d" -d "1 day $today")
    # %H is zero-padded ("08", "09"); force base 10 — the previous
    # 'let hour=08' failed with "value too great for base" (octal parse).
    hour=$((10#$(date +"%H" -d "1 day $today")))
    # Map the hour into one of four 6-hour buckets: 1..4.
    hh=$(( hour / 6 + 1 ))
    date_path=$(date +%Y/%m/%d/0${hh} -d "1 day $today")
    partition=100
    executors=10
    coalesce=40
    ;;
  youku_acquisition)
    date=$(date +"%Y%m%d" -d "0 day $today")
    date_path=$(date +%Y/%m/%d -d "0 day $today")
    partition=100
    executors=10
    coalesce=40
    ;;
  tencent)
    date=$(date +"%Y%m%d" -d "-1 day $today")
    date_path=$(date +%Y/%m/%d -d "-1 day $today")
    partition=1000
    executors=20
    coalesce=100
    ;;
  other)
    date=$(date +"%Y%m%d" -d "-2 day $today")
    date_path=$(date +%Y/%m/%d -d "-2 day $today")
    partition=1000
    executors=20
    coalesce=40
    ;;
  *)
    # Previously an unmatched business fell through with partition/executors/
    # coalesce unset and a broken spark-submit; fail fast instead.
    echo "Unknown business: '${business}'" >&2
    exit 255
    ;;
esac
# Resolve the HDFS/S3 input partition and the logical table name for this
# business. Businesses without a dedicated source read from the shared
# dm_install_list_v2 partition keyed by business.
case "${business}" in
  alipay_activation)
    INPUT="${ALIPAY_ACTIVATION_DAILY_PATH}/$date_path"
    table="etl_alipay_activation_daily"
    ;;
  alipay_acquisition)
    INPUT="${ALIPAY_ACQUISITION_DAILY_PATH}/$date_path"
    table="etl_alipay_acquisition_daily"
    ;;
  tencent)
    INPUT="${ETL_COM_TENCENT_NEWS_DAILY}/$date_path"
    table="etl_com_tencent_news_daily"
    ;;
  *)
    INPUT="${DM_INSTALL_LIST}_v2/$date_path/${business}"
    table="dm_install_list_v2"
    ;;
esac
# Block until the upstream _SUCCESS marker lands (helper from dmp_env.sh).
check_await "${INPUT}/_SUCCESS"

OUTPUT="s3://mob-emr-test/dataplatform/rtdmp_request/${date_path}/${business}"

# Launch the RTDmpRequest job on YARN. All expansions quoted (SC2086);
# sizing knobs (partition/executors/coalesce) come from the business config.
spark-submit --class mobvista.dmp.datasource.rtdmp.RTDmpRequest \
  --name "RTDmpRequest.${date}.${business}" \
  --conf spark.sql.shuffle.partitions="${partition}" \
  --conf spark.default.parallelism="${partition}" \
  --conf spark.kryoserializer.buffer.max=256m \
  --conf spark.sql.adaptive.enabled=true \
  --conf spark.sql.adaptive.advisoryPartitionSizeInBytes=268435456 \
  --master yarn --deploy-mode cluster --executor-memory 10g --driver-memory 4g --executor-cores 3 --num-executors "${executors}" \
  ../"${JAR}" -date "${date}" -hh "0${hh}" -output "${OUTPUT}" -coalesce "${coalesce}" -business "${business}" -table "${table}"
if [[ $? -ne 0 ]]; then
  exit 255
fi
# The com.taobao.notforactivation audience is very large and not yet used by
# the business side; it hurts compute performance, so drop it for now.
# (Translated from the original Chinese comment.)
if [[ ${business} = 'ali_activation' ]]; then
  RM_OUTPUT="s3://mob-emr-test/dataplatform/rtdmp_request/${date_path}/${business}/com.taobao.notforactivation"
  if hadoop fs -ls "$RM_OUTPUT" >/dev/null 2>&1; then
    # 'hadoop dfs' is deprecated in favour of 'hadoop fs'; ':?' aborts if
    # RM_OUTPUT were ever empty, preventing an accidental 'rm -r /*'.
    hadoop fs -rm -r "${RM_OUTPUT:?}"/*
  fi
fi
# Publish the completion marker for downstream consumers.
# ('hadoop dfs' is deprecated; 'hadoop fs' is the supported form.)
hadoop fs -touchz "${OUTPUT}/_OK"
if [[ $? -ne 0 ]]; then
  exit 255
fi

# Purge request output older than 365 days.
expire_date_path=$(date +%Y/%m/%d -d "-365 day $today")
# ':?' aborts if the date computation produced an empty string — otherwise
# the rm below would delete the entire rtdmp_request/ root.
EXPIRE_OUTPUT_PATH="s3://mob-emr-test/dataplatform/rtdmp_request/${expire_date_path:?}"
if hadoop fs -ls "$EXPIRE_OUTPUT_PATH" >/dev/null 2>&1; then
  hadoop fs -rm -r "$EXPIRE_OUTPUT_PATH"
fi