#!/bin/bash

source ../dmp_env.sh
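
# dmp_env.sh (sourced above) is assumed to define ODS_DMP_USER_INFO_ALL,
# ODS_DMP_USER_INFO_DAILY, JAR, and the check_await / mount_partition /
# unmount_partition helpers used below.
# Example invocation (assumed; ScheduleTime is normally injected by the scheduler):
#   ScheduleTime="2024-05-10" ./ods_dmp_user_info_all.sh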
# QUEUE_NAME="dataplatform"

# ScheduleTime=${ScheduleTime:-$1}
date=$(date +"%Y-%m-%d" -d "$ScheduleTime")

yes_bef2_day=$(date -d "$ScheduleTime 2 days ago" +%Y%m%d)
yes_bef1_day=$(date -d "$ScheduleTime 1 days ago" +%Y%m%d)
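# Example (assumed values): for ScheduleTime=2024-05-10 this yields
#   date=2024-05-10, yes_bef1_day=20240509, yes_bef2_day=20240508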

OUTPUT_PATH="${ODS_DMP_USER_INFO_ALL}/${yes_bef1_day}"

unmount_day=$(date -d "$ScheduleTime 7 days ago" +%Y%m%d)
UNMOUNT_OUTPUT_PATH="${ODS_DMP_USER_INFO_ALL}/${unmount_day}"
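# The 7-day-old snapshot's partition is unmounted at the end of this script
# (see unmount_partition below), capping how many daily snapshots stay mounted.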

# hadoop fs -test -e ${OUTPUT_PATH}
# if [ $? -ne 0 ];then
# hadoop fs -mkdir -p  ${OUTPUT_PATH}
# fi

# Wait for upstream data: the previous full snapshot and yesterday's daily
# increment must both have their _SUCCESS flags before the merge can run.
check_await "${ODS_DMP_USER_INFO_ALL}/${yes_bef2_day}/_SUCCESS"
check_await "${ODS_DMP_USER_INFO_DAILY}/${yes_bef1_day}/_SUCCESS"

# Clear any partial output from a previous run so re-runs are idempotent
# (-rm -r -f replaces the deprecated -rmr and tolerates a missing path).
hadoop fs -rm -r -f "${OUTPUT_PATH}"

date_path=$(date +%Y/%m/%d -d "-1 day $ScheduleTime")
INPUT_PUBLISH_PATH="s3://mob-emr-test/dataplatform/DataWareHouse/data/dev/realtime_service_device/${date_path}/*/*/*"
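# The trailing /*/*/* glob (subdirectory layout assumed) lets Spark read all
# leaf files under the day's realtime_service_device output.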

## hive_cmd "
## use dwh;
## set mapreduce.map.memory.mb=2560;
## et mapreduce.map.java.opts=-Xmx2048m;
## et mapreduce.reduce.memory.mb=2560;
## et mapreduce.reduce.java.opts=-Xmx2048m;
## insert overwrite table dwh.ods_dmp_user_info_all partition(dt='${yes_bef1_day}')
## select coalesce(b.dev_id,a.dev_id) as dev_id,
## coalesce(b.dev_id_md5,a.dev_id_md5) as dev_id_md5,
## coalesce(b.dev_type,a.dev_type) as dev_type,
## coalesce(b.platform,a.platform) as platform,
## coalesce(b.install,a.install) as install,
## coalesce(b.interest,a.interest) as interest,
## coalesce(b.model,a.model) as model,
## coalesce(b.country,a.country) as country,
## coalesce(b.osversion,a.osversion) as osversion,
## coalesce(b.age,a.age) as age,
## coalesce(b.gender,a.gender) as gender,
## coalesce(b.behavior,a.behavior) as behavior,
## coalesce(b.update_date,a.update_date) as update_date
## from (select * from dwh.ods_dmp_user_info_all where dt='${yes_bef2_day}') a
## full join  (select * from dwh.ods_dmp_user_info_daily where dt='${yes_bef1_day}' )  b
## on a.dev_id=b.dev_id;
## "


# hadoop fs -rmr ${OUTPUT_PATH}
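# Earlier Hive implementation of the same full-join merge, kept commented out
# for reference; the active Spark job below replaced it.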

#  hive_cmd "
#  set hive.cli.print.header=false;
#  set hive.optimize.index.filter=true;
#  set mapreduce.task.io.sort.mb=512;
#  set mapreduce.map.speculative=true;
#  set mapreduce.reduce.speculative=true;
#
#  use dwh;
#  set mapreduce.map.memory.mb=2560;
#  set mapreduce.map.java.opts=-Xmx2048m;
#  set mapreduce.reduce.memory.mb=2560;
#  set mapreduce.reduce.java.opts=-Xmx2048m;
#  insert overwrite table dwh.ods_dmp_user_info_all partition(dt='${yes_bef1_day}')
#  select coalesce(b.dev_id,a.dev_id) as dev_id,
#  coalesce(b.dev_id_md5,a.dev_id_md5) as dev_id_md5,
#  coalesce(b.dev_type,a.dev_type) as dev_type,
#  coalesce(b.platform,a.platform) as platform,
#  coalesce(b.install,a.install) as install,
#  coalesce(b.interest,a.interest) as interest,
#  coalesce(b.model,a.model) as model,
#  coalesce(b.country,a.country) as country,
#  coalesce(b.osversion,a.osversion) as osversion,
#  coalesce(b.age,a.age) as age,
#  coalesce(b.gender,a.gender) as gender,
#  coalesce(b.behavior,a.behavior) as behavior,
#  coalesce(b.update_date,a.update_date) as update_date
#  from (select * from dwh.ods_dmp_user_info_all where dt='${yes_bef2_day}') a
#  full join  (select * from dwh.ods_dmp_user_info_daily where dt='${yes_bef1_day}' )  b
#  on a.dev_id=b.dev_id;
#  "

# --conf spark.memory.offHeap.enabled=true \
# --conf spark.memory.offHeap.size=6442450944 \
# --conf spark.sql.adaptive.enabled=true \
# --conf spark.sql.adaptive.advisoryPartitionSizeInBytes=268435456 \
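
# Spark merge job: full-joins the previous full snapshot with yesterday's daily
# increment (the same logic as the Hive reference above) and writes the merged
# snapshot to ${OUTPUT_PATH}.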

spark-submit --class mobvista.dmp.datasource.device.OdsDmpUserInfoAll \
    --name "OdsDmpUserInfoAll.${yes_bef1_day}" \
    --conf spark.sql.shuffle.partitions=20000 \
    --conf spark.default.parallelism=2000 \
    --conf spark.sql.files.maxPartitionBytes=268435456 \
    --conf spark.sql.adaptive.enabled=true \
    --conf spark.sql.adaptive.advisoryPartitionSizeInBytes=268435456 \
    --master yarn \
    --deploy-mode cluster \
    --executor-memory 10G \
    --driver-memory 6G \
    --executor-cores 3 \
    --num-executors 300 \
    "../${JAR}" \
    -cur_day "${date}" -output "${OUTPUT_PATH}" -coalesce 5000 -input "${INPUT_PUBLISH_PATH}"

if [[ $? -ne 0 ]]; then
    exit 255
fi
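
# mount_partition / unmount_partition are assumed (defined in dmp_env.sh) to add
# and drop the Hive partition metadata for the given dt and location, roughly:
#   ALTER TABLE ods_dmp_user_info_all ADD PARTITION (dt='...') LOCATION '...';
#   ALTER TABLE ods_dmp_user_info_all DROP PARTITION (dt='...');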

mount_partition "ods_dmp_user_info_all" "dt='${yes_bef1_day}'" "${OUTPUT_PATH}"
if [[ $? -ne 0 ]]; then
    exit 255
fi
# hadoop fs -touchz ${OUTPUT_PATH}/_SUCCESS

unmount_partition "ods_dmp_user_info_all" "dt='${unmount_day}'" "${UNMOUNT_OUTPUT_PATH}"