#!/bin/bash
# Merge one hour of DSP + ADN SDK request ETL output into ClickHouse
# (table dwh.realtime_service_hour) via a Spark job.
# Usage: <script> <region> [<schedule_time>]
#   $1 - region (e.g. "cn"); selects the input partition and Spark sizing
#   $2 - fallback schedule time; the ScheduleTime env var takes precedence
source ../dmp_env.sh
region=$1
# Scheduler-injected ScheduleTime wins over the CLI argument.
today=${ScheduleTime:-$2}
# Process the hour *before* the schedule time, formatted "yyyy/MM/dd HH".
datetime=$(date +"%Y/%m/%d %H" -d "1 hour ago $today")
date=${datetime:0:10}   # "yyyy/MM/dd"
hour=${datetime:11:2}   # "HH"
date_path="${date}/${region}/${hour}"
INPUT_DSP_PATH="s3://mob-emr-test/dataplatform/DataWareHouse/data/dev/etl_dsp_request_hour/${date_path}"
INPUT_ADN_PATH="s3://mob-emr-test/dataplatform/DataWareHouse/data/dev/etl_adn_sdk_request_hour/${date_path}"
# NOTE(review): before_date_path is computed but never used below — confirm
# whether it can be removed.
before_date_path=$(date +"%Y/%m/%d" -d "2 days ago $today")
INPUT_APP_TAG="s3://mob-emr-test/dataplatform/data/dwh/app_tag_id"
LOG_TIME=$(date +"%Y%m%d" -d "1 days ago $today")
# get_recently_dir comes from dmp_env.sh; presumably resolves the most recent
# app_tag partition at or before LOG_TIME — verify against dmp_env.sh.
INPUT_APP_TAG_PATH=$(get_recently_dir "${INPUT_APP_TAG}" "${LOG_TIME}" "")
# Block until upstream has finished writing the DSP hourly partition
# (check_await is provided by dmp_env.sh).
check_await "${INPUT_DSP_PATH}/_SUCCESS"
# Grace period after the _SUCCESS marker appears.
sleep 60

# ClickHouse connection target.
host="ip-172-31-20-35.ec2.internal"
cluster="cluster_1st"
database="dwh"
table="realtime_service_hour"

# Spark sizing per region: "cn" gets a small footprint, everything else large.
case "${region}" in
  cn)
    parallelism=10
    partition=10
    executors=5
    ;;
  *)
    parallelism=1000
    partition=1000
    executors=20
    ;;
esac
# Submit the merge job to YARN. On any Spark failure, fail the whole task with
# exit 255 so the scheduler retries. The deferred `if [[ $? -ne 0 ]]` pattern
# was replaced with a direct `|| exit 255` (robust against commands being
# inserted in between), and all expansions are quoted (SC2086).
# HIVE_SITE_PATH and JAR are expected from dmp_env.sh.
# NOTE(review): only the DSP input is awaited above; INPUT_ADN_PATH is passed
# without a _SUCCESS check — confirm this is intentional.
spark-submit --class mobvista.dmp.clickhouse.realtime.MergeEtlHourToCK \
  --name "MergeEtlHourToCK_${date}_${hour}_${region}" \
  --conf spark.sql.shuffle.partitions="${partition}" \
  --conf spark.default.parallelism="${parallelism}" \
  --conf spark.kryoserializer.buffer.max=512m \
  --conf spark.kryoserializer.buffer=64m \
  --conf spark.sql.files.maxPartitionBytes=268435456 \
  --conf spark.driver.extraJavaOptions="-XX:+UseG1GC" \
  --conf spark.executor.extraJavaOptions="-XX:+UseG1GC" \
  --files "${HIVE_SITE_PATH}" \
  --master yarn --deploy-mode cluster --executor-memory 6g --driver-memory 6g --executor-cores 2 --num-executors "${executors}" \
  "../${JAR}" -date "${date}" -host "${host}" -cluster "${cluster}" -database "${database}" -table "${table}" \
  --input_dsp "${INPUT_DSP_PATH}" --input_adn "${INPUT_ADN_PATH}" --region "${region}" --hour "${hour}" --app_tag_input "${INPUT_APP_TAG_PATH}" \
  || exit 255