#!/bin/bash

source ../dmp_env.sh

date=$(date +"%Y-%m-%d" -d "$ScheduleTime")
LOG_TIME=$(date +%Y%m%d -d "-1 day $ScheduleTime")

yes_bef2_day=$(date -d "$ScheduleTime 2 days ago" +%Y%m%d)
yes_bef1_day=$(date -d "$ScheduleTime 1 day ago" +%Y%m%d)
yes_bef1_slack=$(date -d "$ScheduleTime 1 day ago" +%Y/%m/%d)
yes_bef2_slack=$(date -d "$ScheduleTime 2 days ago" +%Y/%m/%d)
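# Illustrative values, assuming GNU date and ScheduleTime=2024-05-03:
#   date=2024-05-03        LOG_TIME=20240502
#   yes_bef1_day=20240502  yes_bef2_day=20240501
#   yes_bef1_slack=2024/05/02  yes_bef2_slack=2024/05/01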

export AWS_ACCESS_KEY_ID=AKIAIBMYT3BZHBYDOMSQ
export AWS_SECRET_ACCESS_KEY=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

## Check that the GA _SUCCESS marker exists before proceeding
while true
do
    aws s3 ls "s3://mob-emr-test/dataplatform/datawarehourse/ga/${yes_bef2_day}/_SUCCESS"
    GA_IS_EXIST=$?

    if [[ ${GA_IS_EXIST} -eq 0 ]]
    then
        break
    fi
    sleep 300
done
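# A bounded variant of the poll above (sketch, not enabled): gives up after
# 24 attempts (~2 hours) instead of waiting forever. The retry budget and
# exit code are illustrative choices, not part of the original job.
# for i in $(seq 1 24); do
#     aws s3 ls "s3://mob-emr-test/dataplatform/datawarehourse/ga/${yes_bef2_day}/_SUCCESS" && break
#     [[ ${i} -eq 24 ]] && exit 255
#     sleep 300
# done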


# Download the GA data to local disk
aws s3 sync "s3://mob-emr-test/dataplatform/datawarehourse/ga/${yes_bef2_day}" "./realtime/ga_add/${yes_bef2_day}"


# Upload it into the Hive warehouse location ("hadoop fs" replaces the deprecated "hadoop dfs")
hadoop fs -rm -r s3://mob-emr-test/dataplatform/datawarehourse/real-time-query/ga_add/${yes_bef2_day}
hadoop fs -mkdir -p s3://mob-emr-test/dataplatform/datawarehourse/real-time-query/ga_add/${yes_bef2_day}
hadoop fs -put -p ./realtime/ga_add/${yes_bef2_day} s3://mob-emr-test/dataplatform/datawarehourse/real-time-query/ga_add/
rm -rf ./realtime/ga_add/${yes_bef2_day}
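# The local round-trip above could likely be replaced by a direct S3-to-S3
# copy; a sketch using distcp (same source/target paths, untested here):
# hadoop distcp "s3://mob-emr-test/dataplatform/datawarehourse/ga/${yes_bef2_day}" \
#     "s3://mob-emr-test/dataplatform/datawarehourse/real-time-query/ga_add/${yes_bef2_day}"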

echo "[ga data prepare End!]"

unset AWS_ACCESS_KEY_ID
unset AWS_SECRET_ACCESS_KEY

echo "[ga data prepare End!]"

# Wait for table dsp_profile_total
# check_await "${DMP_ADN_DSP_PROFILE_TOTAL}/${yes_bef1_slack}/_SUCCESS"
# echo "[table dm_profile_total is ready!]"
check_await "${DSP_PROFILE_TOTAL}/${yes_bef1_slack}/_SUCCESS"
echo "[table dsp_profile_total is ready!]"

# Wait for table ods_adn_device_total
check_await "${ADN_TOTAL_PATH}/${yes_bef1_slack}/_SUCCESS"
echo "[table ods_adn_device_total is ready!]"

check_await "${AGE_CALC_DEVICE}/${yes_bef1_slack}/_SUCCESS"
echo "[table dm_device_age is ready!]"

# check_await "${GENDER_CALC_DEVICE}/${yes_bef1_slack}/_SUCCESS"
# check_await "${GENDER_MERGE_DEVICE}/${yes_bef1_slack}/_SUCCESS"
echo "[table dm_device_gender is ready!]"

GET_GENDER_DATE=$(get_recently_date "${GENDER_MERGE_DEVICE}" "${LOG_TIME}" "_SUCCESS")
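# get_recently_date is also defined in dmp_env.sh; presumably it walks back
# day by day from ${LOG_TIME} and returns the newest date whose _SUCCESS
# marker exists under ${GENDER_MERGE_DEVICE}. Roughly (hypothetical sketch):
# d=${LOG_TIME}
# until hadoop fs -test -e "${GENDER_MERGE_DEVICE}/$(date -d "${d}" +%Y/%m/%d)/_SUCCESS"; do
#     d=$(date -d "${d} -1 day" +%Y%m%d)
# done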


OUTPUT_PATH="${ODS_DMP_USER_INFO_DAILY}_v2/${yes_bef1_day}"

hadoop fs -test -e ${OUTPUT_PATH}
if [[ $? -ne 0 ]]; then
    hadoop fs -mkdir -p ${OUTPUT_PATH}
fi
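# Note: hadoop fs -mkdir -p already exits 0 when the directory exists, so
# the -test guard above is defensive rather than strictly required.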

# mount_partition "ods_dmp_user_info_daily" "dt='${yes_bef1_day}'" "${OUTPUT_PATH}"

# check_await ${DMP_EVENT_TAG_DAILY}/day=${yes_bef1_day}/tag_source=3s/_SUCCESS
# check_await ${DMP_EVENT_TAG_DAILY}/day=${yes_bef2_day}/tag_source=ga/_SUCCESS
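# Wait for every dm_device_tag source partition to land before launching Spark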
check_await ${DM_DEVICE_TAG_PATH}/${yes_bef1_slack}/dc/dsp_req/_SUCCESS
check_await ${DM_DEVICE_TAG_PATH}/${yes_bef1_slack}/manual/3s/_SUCCESS
check_await ${DM_DEVICE_TAG_PATH}/${yes_bef1_slack}/manual/adn_install/_SUCCESS
check_await ${DM_DEVICE_TAG_PATH}/${yes_bef1_slack}/manual/adn_request_other/_SUCCESS
check_await ${DM_DEVICE_TAG_PATH}/${yes_bef1_slack}/manual/adn_request_sdk/_SUCCESS
check_await ${DM_DEVICE_TAG_PATH}/${yes_bef1_slack}/manual/adn_sdk/_SUCCESS
check_await ${DM_DEVICE_TAG_PATH}/${yes_bef1_slack}/manual/clever/_SUCCESS
check_await ${DM_DEVICE_TAG_PATH}/${yes_bef1_slack}/manual/dsp_req/_SUCCESS


hadoop fs -rm -r ${OUTPUT_PATH}

# --conf spark.memory.offHeap.enabled=true \
# --conf spark.memory.offHeap.size=10737418240 \
# --conf spark.sql.adaptive.enabled=true \
# --conf spark.sql.adaptive.advisoryPartitionSizeInBytes=536870912 \
spark-submit --class mobvista.dmp.datasource.device.OdsDmpUserInfoDailyV3 \
    --name "DMP_OdsDmpUserInfoDailyV3_${date}-wangjf" \
    --conf spark.network.timeout=720s \
    --conf spark.sql.shuffle.partitions=10000 \
    --conf spark.default.parallelism=10000 \
    --conf spark.yarn.executor.memoryOverhead=2048 \
    --conf spark.sql.files.maxPartitionBytes=536870912 \
    --conf spark.kryoserializer.buffer.max=512m \
    --conf spark.kryoserializer.buffer=64m \
    --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
    --conf spark.driver.extraJavaOptions="-XX:+UseG1GC" \
    --conf spark.executor.extraJavaOptions="-XX:+UseG1GC" \
    --master yarn \
    --deploy-mode cluster \
    --executor-memory 10G \
    --driver-memory 6G \
    --executor-cores 4 \
    --num-executors 180 \
    ../${JAR} \
    -cur_day ${date} -gender_date ${GET_GENDER_DATE} -output ${OUTPUT_PATH} -coalesce 2000
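# Rough sizing of the request above (illustrative arithmetic): 180 executors
# x (10G heap + 2G memoryOverhead) ~= 2.1TB of YARN memory and 180 x 4 = 720
# cores, against 10000-way shuffles and 512MB (536870912-byte) input splits.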



if [[ $? -ne 0 ]]; then
    exit 255
fi

hadoop fs -touchz ${OUTPUT_PATH}/_SUCCESS
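# The _SUCCESS marker is what downstream consumers (cf. check_await above)
# poll for to know this partition is complete.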