#!/bin/bash

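# Hourly RTDmp merge: wait for the latest rtdmp_deal hour and the previous
# audience_merge partition, run the RTDmpMerge Spark job, then register the
# new partition and expire the one that is 25 hours old.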
source ../dmp_env.sh

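# Schedule hour: prefer ScheduleTime from the environment, fall back to the first CLI argument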
today=${ScheduleTime:-$1}

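# old_* refers to the previous hour's output (T-2h); curr_time/date_path refer to the hour being produced (T-1h)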
old_time=$(date +"%Y%m%d%H" -d "-2 hour $today")

curr_time=$(date +"%Y%m%d%H" -d "-1 hour $today")

old_date_path=$(date +%Y/%m/%d/%H -d "-2 hour $today")

date_path=$(date +%Y/%m/%d/%H -d "-1 hour $today")

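# Upstream rtdmp_deal output; currently only the latest hour is consumed
# (the 2h-6h inputs below are kept commented out for reference)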
BASE_PATH="s3://mob-emr-test/dataplatform/rtdmp_deal"

HOUR_1_DATE=$(date +%Y/%m/%d/%H -d "-1 hour $today")
# HOUR_2_DATE=$(date +%Y/%m/%d/%H -d "-2 hour $today")
# HOUR_3_DATE=$(date +%Y/%m/%d/%H -d "-3 hour $today")
# HOUR_4_DATE=$(date +%Y/%m/%d/%H -d "-4 hour $today")
# HOUR_5_DATE=$(date +%Y/%m/%d/%H -d "-5 hour $today")
# HOUR_6_DATE=$(date +%Y/%m/%d/%H -d "-6 hour $today")

# INPUT="${BASE_PATH}/${HOUR_1_DATE},${BASE_PATH}/${HOUR_2_DATE},${BASE_PATH}/${HOUR_3_DATE},${BASE_PATH}/${HOUR_4_DATE},${BASE_PATH}/${HOUR_5_DATE},${BASE_PATH}/${HOUR_6_DATE}"

INPUT="${BASE_PATH}/${HOUR_1_DATE}"

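# Wait until the upstream hourly input is complete (_SUCCESS marker present)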
check_await "${BASE_PATH}/${HOUR_1_DATE}/_SUCCESS"
# check_await ${BASE_PATH}/${HOUR_2_DATE}/_SUCCESS
# check_await ${BASE_PATH}/${HOUR_3_DATE}/_SUCCESS
# check_await ${BASE_PATH}/${HOUR_4_DATE}/_SUCCESS
# check_await ${BASE_PATH}/${HOUR_5_DATE}/_SUCCESS
# check_await ${BASE_PATH}/${HOUR_6_DATE}/_SUCCESS

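# The previous hour's audience_merge output must also be ready before merging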
MERGE_INPUT="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/audience_merge/${old_date_path}"

check_await "${MERGE_INPUT}/_SUCCESS"

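# Extra buffer after the readiness checks before the job starts reading the inputs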
sleep 120

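# Destination for this hour's merged audience data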
OUTPUT="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/audience_merge/${date_path}"

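# Run the RTDmpMerge Spark job for this hour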
spark-submit --class mobvista.dmp.datasource.rtdmp.RTDmpMerge \
     --name "RTDmpMerge.wangjf.${curr_time}" \
     --conf spark.sql.shuffle.partitions=1000 \
     --conf spark.default.parallelism=1000 \
     --conf spark.kryoserializer.buffer.max=256m \
     --conf spark.speculation=false \
     --conf spark.speculation.quantile=0.9 \
     --conf spark.speculation.multiplier=1.3 \
     --conf spark.executor.extraJavaOptions="-XX:+UseG1GC" \
     --master yarn --deploy-mode cluster --executor-memory 18g --driver-memory 6g  --executor-cores 5 --num-executors 20 \
     "../${JAR}" -date_time "${curr_time}" -old_time "${old_time}" -input "${INPUT}" -output "${OUTPUT}" -partition 100

if [[ $? -ne 0 ]]; then
    exit 255
fi

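# Register this hour's output path as partition dt=${curr_time} of the audience_merge table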
mount_partition "audience_merge" "dt='${curr_time}'" "$OUTPUT"

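# Retention: compute the partition that is now 25 hours old and roll it off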
expire_time=$(date +"%Y%m%d%H" -d "-25 hour $today")

expire_date_path=$(date +%Y/%m/%d/%H -d "-25 hour $today")

EXPIRE_OUTPUT_PATH="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/audience_merge/${expire_date_path}"
# Drop the expired partition and delete its corresponding path
unmount_partition "audience_merge" "dt='${expire_time}'" "${EXPIRE_OUTPUT_PATH}"