# rtdmp_request.sh 3.95 KB
#!/bin/bash

source ../dmp_env.sh

# Resolve the run parameters for the requested business.
#
# Inputs:
#   $1           - business name (one of the case labels below)
#   ScheduleTime - reference time read from the environment and handed to
#                  `date -d` together with a relative day offset
# Sets:
#   date       - target day as YYYYMMDD
#   date_path  - target day as YYYY/MM/DD (plus /0<slot> for alipay_*)
#   hh         - 6-hour slot number (1-4) for alipay_*, "0" otherwise
#   partition, executors, coalesce - Spark sizing knobs used further down
#
# NOTE(review): a business name that matches no case label leaves date,
# date_path, partition, executors and coalesce unset, exactly as before.

# Map an hour of day (00-23) to its 6-hour slot number (1-4).
# The value is forced to base 10: `date +%H` zero-pads its output, and bash
# arithmetic would otherwise reject "08" and "09" as invalid octal literals.
#   $1 - hour of day, with or without a leading zero
# Prints the slot number on stdout.
six_hour_slot() {
  echo $(( 10#$1 / 6 + 1 ))
}

business=$1

hh="0"
today=${ScheduleTime}
day_offset=""
hourly="false"

# One row per business: day offset relative to ${today} and Spark sizing.
case "${business}" in
  ali_activation)
    day_offset="-2 day"; partition=1000; executors=20;  coalesce=200 ;;
  dsp_req)
    day_offset="-1 day"; partition=2000; executors=100; coalesce=100 ;;
  btop)
    day_offset="-1 day"; partition=20;   executors=20;  coalesce=40 ;;
  uc_activation)
    day_offset="0 day";  partition=100;  executors=20;  coalesce=40 ;;
  iqiyi_activation | youku_acquisition)
    day_offset="0 day";  partition=100;  executors=10;  coalesce=40 ;;
  alipay_activation | alipay_acquisition)
    # These two are partitioned by 6-hour slot as well as by day.
    day_offset="1 day";  partition=100;  executors=10;  coalesce=40
    hourly="true" ;;
  tencent)
    day_offset="-1 day"; partition=1000; executors=20;  coalesce=100 ;;
  other)
    day_offset="-2 day"; partition=1000; executors=20;  coalesce=40 ;;
esac

if [[ -n "${day_offset}" ]]; then
  date=$(date +"%Y%m%d" -d "${day_offset} ${today}")
  date_path=$(date +%Y/%m/%d -d "${day_offset} ${today}")
  if [[ "${hourly}" == "true" ]]; then
    hour=$(date +"%H" -d "${day_offset} ${today}")
    hh=$(six_hour_slot "${hour}")
    # Slot directories are named 01..04.
    date_path="${date_path}/0${hh}"
  fi
fi

# Choose the upstream dataset for this business: INPUT is the location whose
# _SUCCESS marker is awaited, table is the name handed to the Spark job.
# Every business without a dedicated source falls back to the
# dm_install_list_v2 layout, which is additionally keyed by business name.
case "${business}" in
  alipay_activation)
    table="etl_alipay_activation_daily"
    INPUT="${ALIPAY_ACTIVATION_DAILY_PATH}/$date_path"
    ;;
  alipay_acquisition)
    table="etl_alipay_acquisition_daily"
    INPUT="${ALIPAY_ACQUISITION_DAILY_PATH}/$date_path"
    ;;
  tencent)
    table="etl_com_tencent_news_daily"
    INPUT="${ETL_COM_TENCENT_NEWS_DAILY}/$date_path"
    ;;
  *)
    table="dm_install_list_v2"
    INPUT="${DM_INSTALL_LIST}_v2/$date_path/${business}"
    ;;
esac

# check_await is not defined in this file (presumably it comes from
# ../dmp_env.sh and waits for the marker to appear - confirm there).
check_await ${INPUT}/_SUCCESS

# Destination prefix for this run's result; later steps reference it too.
OUTPUT="s3://mob-emr-test/dataplatform/rtdmp_request/${date_path}/${business}"

# Launch the RTDmpRequest Spark job on YARN in cluster mode and stop the
# script with status 255 if the submission reports failure.
# The -hh argument is the slot number with a leading zero ("00" when no slot
# applies). JAR is not set in this file (presumably by ../dmp_env.sh).
if ! spark-submit \
  --class mobvista.dmp.datasource.rtdmp.RTDmpRequest \
  --name "RTDmpRequest.${date}.${business}" \
  --conf spark.sql.shuffle.partitions=${partition} \
  --conf spark.default.parallelism=${partition} \
  --conf spark.kryoserializer.buffer.max=256m \
  --conf spark.sql.adaptive.enabled=true \
  --conf spark.sql.adaptive.advisoryPartitionSizeInBytes=268435456 \
  --master yarn \
  --deploy-mode cluster \
  --executor-memory 10g \
  --driver-memory 4g \
  --executor-cores 3 \
  --num-executors ${executors} \
  ../${JAR} \
  -date "${date}" \
  -hh "0${hh}" \
  -output ${OUTPUT} \
  -coalesce ${coalesce} \
  -business ${business} \
  -table ${table}; then
  exit 255
fi

# The com.taobao.notforactivation audience package is very large and is not
# used by the business side yet; it hurts computation performance, so its
# output is deleted for the time being.
if [[ ${business} = 'ali_activation' ]]; then
  RM_OUTPUT="s3://mob-emr-test/dataplatform/rtdmp_request/${date_path}/${business}/com.taobao.notforactivation"
  if hadoop fs -ls "$RM_OUTPUT" >/dev/null 2>&1; then
    # `hadoop fs` matches the existence check above (`hadoop dfs` is the
    # deprecated spelling). The pattern is quoted so that the wildcard is
    # expanded by the Hadoop shell rather than by local pathname expansion.
    # The removal stays best-effort: its exit status is not checked.
    hadoop fs -rm -r "${RM_OUTPUT}/*"
  fi
fi

# Publish an empty _OK marker under the output prefix; stop with status 255
# if the marker cannot be created.
# Uses `hadoop fs`, the same entry point as the existence checks in this
# script, instead of the deprecated `hadoop dfs` alias.
if ! hadoop fs -touchz "${OUTPUT}/_OK"; then
  exit 255
fi

# Retention: delete the output directory of the day 365 days before ${today}.
expire_date_path=$(date +%Y/%m/%d -d "-365 day $today")

# Safety guard: if `date` fails (e.g. an unparsable ${today}) the substitution
# is empty and the delete target would collapse to the rtdmp_request root,
# wiping every day's output. Only proceed with a well-formed YYYY/MM/DD value.
if [[ "${expire_date_path}" =~ ^[0-9]{4}/[0-9]{2}/[0-9]{2}$ ]]; then
  EXPIRE_OUTPUT_PATH="s3://mob-emr-test/dataplatform/rtdmp_request/${expire_date_path}"
  if hadoop fs -ls "$EXPIRE_OUTPUT_PATH" >/dev/null 2>&1; then
    hadoop fs -rm -r "${EXPIRE_OUTPUT_PATH}"
  fi
else
  echo "skip expiry cleanup: unexpected expire date path '${expire_date_path}'" >&2
fi