#!/bin/bash

source ../dmp_env.sh

date=$(date +"%Y-%m-%d" -d "$ScheduleTime")
LOG_TIME=$(date +%Y%m%d -d "-1 day $ScheduleTime")

yes_bef2_day=$(date -d "$ScheduleTime 2 days ago" +%Y%m%d)
yes_bef1_day=$(date -d "$ScheduleTime 1 day ago" +%Y%m%d)
yes_bef1_slack=$(date -d "$ScheduleTime 1 day ago" +%Y/%m/%d)
yes_bef2_slack=$(date -d "$ScheduleTime 2 days ago" +%Y/%m/%d)

# NOTE: never commit real AWS keys; the values below are redacted placeholders.
export AWS_ACCESS_KEY_ID="<AWS_ACCESS_KEY_ID>"
export AWS_SECRET_ACCESS_KEY="<AWS_SECRET_ACCESS_KEY>"
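
# A safer pattern (sketch): keep the keys in a local env file outside version
# control and source it here. The file name dmp_aws_credentials.env is an
# assumption, not part of the original pipeline.
if [[ -f ../dmp_aws_credentials.env ]]; then
    source ../dmp_aws_credentials.env
fi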

## Check that the GA _SUCCESS marker exists on S3
while true
do
    aws s3 ls "s3://mob-emr-test/dataplatform/datawarehourse/ga/${yes_bef2_day}/_SUCCESS"
    GA_IS_EXIST=$?

    if [[ ${GA_IS_EXIST} -eq 0 ]]
    then
        break
    fi
    sleep 300
done
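
# Bounded variant (sketch): give up after ~6 hours instead of blocking forever.
# The 72-attempt ceiling is an assumption, not an original requirement.
# for ((i = 0; i < 72; i++)); do
#     if aws s3 ls "s3://mob-emr-test/dataplatform/datawarehourse/ga/${yes_bef2_day}/_SUCCESS"; then
#         break
#     fi
#     sleep 300
# done
# [[ $i -lt 72 ]] || { echo "[GA _SUCCESS not found, giving up]"; exit 1; }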


# Download the data to local disk
aws s3 sync "s3://mob-emr-test/dataplatform/datawarehourse/ga/${yes_bef2_day}" "./realtime/ga_add/${yes_bef2_day}"
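
# Sketch: warn if the sync produced an empty local copy (the local path layout
# is assumed to mirror the S3 prefix above).
if [[ -z "$(ls -A "./realtime/ga_add/${yes_bef2_day}" 2>/dev/null)" ]]; then
    echo "[warning: local GA copy for ${yes_bef2_day} is empty]"
fi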


# Upload to the Hive warehouse
hadoop fs -rm -r "s3://mob-emr-test/dataplatform/datawarehourse/real-time-query/ga_add/${yes_bef2_day}"
hadoop fs -mkdir -p "s3://mob-emr-test/dataplatform/datawarehourse/real-time-query/ga_add/${yes_bef2_day}"
hadoop fs -put -p "./realtime/ga_add/${yes_bef2_day}" "s3://mob-emr-test/dataplatform/datawarehourse/real-time-query/ga_add/"
rm -rf ./realtime/ga_add/${yes_bef2_day}

echo "[ga data prepare End!]"

unset AWS_ACCESS_KEY_ID
unset AWS_SECRET_ACCESS_KEY

echo "[ga data prepare End!]"

#wait for table dsp_profile_total
#check_await "${DMP_ADN_DSP_PROFILE_TOTAL}/${yes_bef1_slack}/_SUCCESS"
#echo "[table dm_profile_total is ready!]"
check_await "${DSP_PROFILE_TOTAL}/${yes_bef1_slack}/_SUCCESS"
echo "[table dsp_profile_total is ready!]"

#wait for table ods_adn_device_total
check_await "${ADN_TOTAL_PATH}/${yes_bef1_slack}/_SUCCESS"
echo "[table ods_adn_device_total is ready!]"

check_await "${AGE_CALC_DEVICE}/${yes_bef1_slack}/_SUCCESS"
echo "[table dm_device_age is ready!]"

# The per-day gender checks are disabled; the most recent ready partition is
# resolved below via get_recently_date instead.
# check_await "${GENDER_CALC_DEVICE}/${yes_bef1_slack}/_SUCCESS"
# check_await "${GENDER_MERGE_DEVICE}/${yes_bef1_slack}/_SUCCESS"

GET_GENDER_DATE=$(get_recently_date "${GENDER_MERGE_DEVICE}" "${LOG_TIME}" "_SUCCESS")
echo "[table dm_device_gender is ready, using partition date ${GET_GENDER_DATE}]"
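
# Sketch: abort early if no ready partition was found (assumes get_recently_date
# prints nothing when no _SUCCESS marker qualifies), rather than passing an
# empty -gender_date to the Spark job below.
if [[ -z "${GET_GENDER_DATE}" ]]; then
    echo "[no ready gender partition found under ${GENDER_MERGE_DEVICE}]"
    exit 1
fi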


OUTPUT_PATH="${ODS_DMP_USER_INFO_DAILY}_v2/${yes_bef1_day}"

hadoop fs -test -e "${OUTPUT_PATH}"
if [[ $? -ne 0 ]]; then
    hadoop fs -mkdir -p "${OUTPUT_PATH}"
fi

# mount_partition "ods_dmp_user_info_daily" "dt='${yes_bef1_day}'" "${OUTPUT_PATH}"

# check_await ${DMP_EVENT_TAG_DAILY}/day=${yes_bef1_day}/tag_source=3s/_SUCCESS
# check_await ${DMP_EVENT_TAG_DAILY}/day=${yes_bef2_day}/tag_source=ga/_SUCCESS
check_await "${DM_DEVICE_TAG_PATH}/${yes_bef1_slack}/dc/dsp_req/_SUCCESS"
check_await "${DM_DEVICE_TAG_PATH}/${yes_bef1_slack}/manual/3s/_SUCCESS"
check_await "${DM_DEVICE_TAG_PATH}/${yes_bef1_slack}/manual/adn_install/_SUCCESS"
check_await "${DM_DEVICE_TAG_PATH}/${yes_bef1_slack}/manual/adn_request_other/_SUCCESS"
check_await "${DM_DEVICE_TAG_PATH}/${yes_bef1_slack}/manual/adn_request_sdk/_SUCCESS"
check_await "${DM_DEVICE_TAG_PATH}/${yes_bef1_slack}/manual/adn_sdk/_SUCCESS"
check_await "${DM_DEVICE_TAG_PATH}/${yes_bef1_slack}/manual/clever/_SUCCESS"
check_await "${DM_DEVICE_TAG_PATH}/${yes_bef1_slack}/manual/dsp_req/_SUCCESS"
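
# Equivalent loop form (sketch), avoiding eight copies of the same path:
# for tag in dc/dsp_req manual/3s manual/adn_install manual/adn_request_other \
#            manual/adn_request_sdk manual/adn_sdk manual/clever manual/dsp_req; do
#     check_await "${DM_DEVICE_TAG_PATH}/${yes_bef1_slack}/${tag}/_SUCCESS"
# done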


# Clear any previous output so the Spark job writes a fresh copy.
hadoop fs -rm -r "${OUTPUT_PATH}"

# --conf spark.memory.offHeap.enabled=true \
# --conf spark.memory.offHeap.size=10737418240 \
# --conf spark.sql.adaptive.enabled=true \
# --conf spark.sql.adaptive.advisoryPartitionSizeInBytes=536870912 \
spark-submit --class mobvista.dmp.datasource.device.OdsDmpUserInfoDailyV3 \
    --name "DMP_OdsDmpUserInfoDailyV3_${date}-wangjf" \
    --conf spark.network.timeout=720s \
    --conf spark.sql.shuffle.partitions=10000 \
    --conf spark.default.parallelism=10000 \
    --conf spark.yarn.executor.memoryOverhead=2048 \
    --conf spark.sql.files.maxPartitionBytes=536870912 \
    --conf spark.kryoserializer.buffer.max=512m \
    --conf spark.kryoserializer.buffer=64m \
    --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
    --conf spark.driver.extraJavaOptions="-XX:+UseG1GC" \
    --conf spark.executor.extraJavaOptions="-XX:+UseG1GC" \
    --master yarn \
    --deploy-mode cluster \
    --executor-memory 10G \
    --driver-memory 6G \
    --executor-cores 4 \
    --num-executors 180 \
    ../${JAR} \
    -cur_day ${date} -gender_date ${GET_GENDER_DATE} -output ${OUTPUT_PATH} -coalesce 2000



if [[ $? -ne 0 ]]; then
    echo "[spark job OdsDmpUserInfoDailyV3 failed!]"
    exit 255
fi

hadoop fs -touchz "${OUTPUT_PATH}/_SUCCESS"
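
# Sketch: confirm the marker actually landed before the script exits cleanly.
hadoop fs -test -e "${OUTPUT_PATH}/_SUCCESS" || exit 1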