#!/bin/bash
# RTDmpMerge hourly driver.
#
# Waits for the previous hour's rtdmp_deal output and the 2-hours-ago
# audience_merge snapshot, runs the Spark RTDmpMerge job for the previous
# hour, mounts the resulting Hive partition, then drops the partition (and
# its S3 path) that expired 25 hours ago.
#
# Inputs:
#   ScheduleTime (env) or $1 — schedule time, any string GNU date -d accepts.
# Requires:
#   ../dmp_env.sh providing check_await, mount_partition, unmount_partition
#   and the JAR variable.

source ../dmp_env.sh

today=${ScheduleTime:-$1}

# Hour keys: curr_time is the hour being produced, old_time the prior one.
old_time=$(date +"%Y%m%d%H" -d "-2 hour $today")
curr_time=$(date +"%Y%m%d%H" -d "-1 hour $today")

old_date_path=$(date +%Y/%m/%d/%H -d "-2 hour $today")
date_path=$(date +%Y/%m/%d/%H -d "-1 hour $today")

BASE_PATH="s3://mob-emr-test/dataplatform/rtdmp_deal"

HOUR_1_DATE=$(date +%Y/%m/%d/%H -d "-1 hour $today")
# NOTE: the job formerly merged the last 6 hours of rtdmp_deal; only the most
# recent hour is used now. Kept for reference:
# HOUR_2_DATE=$(date +%Y/%m/%d/%H -d "-2 hour $today")
# HOUR_3_DATE=$(date +%Y/%m/%d/%H -d "-3 hour $today")
# HOUR_4_DATE=$(date +%Y/%m/%d/%H -d "-4 hour $today")
# HOUR_5_DATE=$(date +%Y/%m/%d/%H -d "-5 hour $today")
# HOUR_6_DATE=$(date +%Y/%m/%d/%H -d "-6 hour $today")
# INPUT="${BASE_PATH}/${HOUR_1_DATE},${BASE_PATH}/${HOUR_2_DATE},${BASE_PATH}/${HOUR_3_DATE},${BASE_PATH}/${HOUR_4_DATE},${BASE_PATH}/${HOUR_5_DATE},${BASE_PATH}/${HOUR_6_DATE}"

INPUT="${BASE_PATH}/${HOUR_1_DATE}"

# Block until the upstream hour has published its _SUCCESS marker.
check_await "${BASE_PATH}/${HOUR_1_DATE}/_SUCCESS"
# check_await ${BASE_PATH}/${HOUR_2_DATE}/_SUCCESS
# check_await ${BASE_PATH}/${HOUR_3_DATE}/_SUCCESS
# check_await ${BASE_PATH}/${HOUR_4_DATE}/_SUCCESS
# check_await ${BASE_PATH}/${HOUR_5_DATE}/_SUCCESS
# check_await ${BASE_PATH}/${HOUR_6_DATE}/_SUCCESS

MERGE_INPUT="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/audience_merge/${old_date_path}"

check_await "${MERGE_INPUT}/_SUCCESS"

# Grace period after the _SUCCESS marker appears before reading the data.
sleep 120

OUTPUT="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/audience_merge/${date_path}"

# FIX: the '-output' argument was on a new line without a trailing '\', so it
# ran as a separate command and the Spark job was launched without an output
# path. The continuation is restored below.
spark-submit --class mobvista.dmp.datasource.rtdmp.RTDmpMerge \
  --name "RTDmpMerge.wangjf.${curr_time}" \
  --conf spark.sql.shuffle.partitions=1000 \
  --conf spark.default.parallelism=1000 \
  --conf spark.kryoserializer.buffer.max=256m \
  --conf spark.speculation=false \
  --conf spark.speculation.quantile=0.9 \
  --conf spark.speculation.multiplier=1.3 \
  --conf spark.executor.extraJavaOptions="-XX:+UseG1GC" \
  --master yarn --deploy-mode cluster --executor-memory 18g --driver-memory 6g --executor-cores 5 --num-executors 20 \
  "../${JAR}" -date_time "${curr_time}" -old_time "${old_time}" -input "${INPUT}" \
  -output "${OUTPUT}" -partition 100

if [[ $? -ne 0 ]]; then
  exit 255
fi

# Register the freshly written hour as a Hive partition.
mount_partition "audience_merge" "dt='${curr_time}'" "$OUTPUT"

expire_time=$(date +"%Y%m%d%H" -d "-25 hour $today")
expire_date_path=$(date +%Y/%m/%d/%H -d "-25 hour $today")

EXPIRE_OUTPUT_PATH="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/audience_merge/${expire_date_path}"

# Drop the expired partition and delete its backing S3 path.
unmount_partition "audience_merge" "dt='${expire_time}'" "${EXPIRE_OUTPUT_PATH}"