1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
#!/bin/bash
# Build the daily full snapshot of dwh.ods_dmp_user_info_all:
# merge the previous full snapshot (dt = 2 days ago) with the daily
# increment (dt = 1 day ago) via the OdsDmpUserInfoAll Spark job, write it
# to dt = yesterday, mount that partition, and unmount the 7-day-old one.
#
# Requires (from ../dmp_env.sh and the environment):
#   ScheduleTime              - reference date for all day arithmetic
#   ODS_DMP_USER_INFO_ALL     - HDFS/S3 root of the full-snapshot table
#   ODS_DMP_USER_INFO_DAILY   - HDFS/S3 root of the daily-increment table
#   HIVE_SITE_PATH, JAR       - Spark job configuration and jar name
#   check_await, mount_partition, unmount_partition - helper functions
source ../dmp_env.sh

# Day stamps derived from ScheduleTime (yyyymmdd).
cur_date=$(date +"%Y-%m-%d" -d "$ScheduleTime")
yes_bef2_day=$(date -d "$ScheduleTime 2 days ago" +%Y%m%d)
yes_bef1_day=$(date -d "$ScheduleTime 1 days ago" +%Y%m%d)
unmount_day=$(date -d "$ScheduleTime 7 days ago" +%Y%m%d)

OUTPUT_PATH="${ODS_DMP_USER_INFO_ALL}/${yes_bef1_day}"
UNMOUNT_OUTPUT_PATH="${ODS_DMP_USER_INFO_ALL}/${unmount_day}"

# Block until both upstream partitions have published their _SUCCESS marker.
check_await "${ODS_DMP_USER_INFO_ALL}/${yes_bef2_day}/_SUCCESS"
check_await "${ODS_DMP_USER_INFO_DAILY}/${yes_bef1_day}/_SUCCESS"

# Clear any stale output from a previous run; -f keeps a missing path from
# being treated as an error (the deprecated '-rmr' had no such guard).
hadoop fs -rm -r -f "${OUTPUT_PATH}"

date_path=$(date +%Y/%m/%d -d "-1 day $ScheduleTime")
INPUT_PUBLISH_PATH="s3://mob-emr-test/dataplatform/DataWareHouse/data/dev/realtime_service_device/${date_path}/*/*/*"

# NOTE: a legacy hive_cmd implementation of this merge (full outer join of
# ods_dmp_user_info_all dt-2 with ods_dmp_user_info_daily dt-1) was removed;
# see git history. The Spark job below replaced it.

spark-submit --class mobvista.dmp.datasource.device.OdsDmpUserInfoAll \
  --name "OdsDmpUserInfoAll.${yes_bef1_day}" \
  --conf spark.sql.shuffle.partitions=20000 \
  --conf spark.default.parallelism=2000 \
  --conf spark.sql.files.maxPartitionBytes=268435456 \
  --conf spark.sql.adaptive.enabled=true \
  --conf spark.sql.adaptive.advisoryPartitionSizeInBytes=268435456 \
  --files "${HIVE_SITE_PATH}" \
  --master yarn \
  --deploy-mode cluster \
  --executor-memory 10G \
  --driver-memory 6G \
  --executor-cores 3 \
  --num-executors 300 \
  "../${JAR}" \
  -cur_day "${cur_date}" -output "${OUTPUT_PATH}" -coalesce 5000 -input "${INPUT_PUBLISH_PATH}" \
  || exit 255

# Register the freshly written partition in the Hive metastore.
mount_partition "ods_dmp_user_info_all" "dt='${yes_bef1_day}'" "${OUTPUT_PATH}" || exit 255

# Retire the partition that has aged out of the 7-day retention window.
unmount_partition "ods_dmp_user_info_all" "dt='${unmount_day}'" "${UNMOUNT_OUTPUT_PATH}"