Commit a355902f by wang-jinfeng

fix dmp

parent 381415e2
......@@ -18,8 +18,7 @@ host="ip-172-31-20-35.ec2.internal"
cluster="cluster_1st"
database="dwh"
table="etl_baichuan_daily"
# --conf spark.driver.extraJavaOptions="-XX:+UseG1GC" \
# --conf spark.executor.extraJavaOptions="-XX:+UseG1GC" \
spark-submit --class mobvista.dmp.datasource.baichuan.BaiChuanJob \
--name "mobvista.dmp.datasource.baichuan.BaiChuanJob_wangjf_${LOG_TIME}" \
--conf spark.sql.shuffle.partitions=1000 \
......@@ -27,7 +26,7 @@ spark-submit --class mobvista.dmp.datasource.baichuan.BaiChuanJob \
--conf spark.kryoserializer.buffer.max=256m \
--conf spark.sql.files.maxPartitionBytes=268435456 \
--files ${HIVE_SITE_PATH} \
--master yarn --deploy-mode cluster --executor-memory 4g --driver-memory 4g --executor-cores 2 --num-executors 10 \
--master yarn --deploy-mode cluster --executor-memory 6g --driver-memory 6g --executor-cores 2 --num-executors 20 \
../${JAR} -date ${LOG_TIME} -host ${host} -cluster ${cluster} -database ${database} -table ${table}
if [[ $? -ne 0 ]];then
......
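The resource bump above (4g driver/executors with 10 executors, now 6g with 20 executors) roughly doubles the job's YARN footprint. A back-of-envelope check, assuming Spark's default executor memory overhead of max(384 MB, 10% of executor memory) and 1g allocation rounding:

# rough footprint of the new settings (sketch; overhead and rounding assumed)
executors=20
exec_mem_gb=6
overhead_gb=1   # ceil(max(0.384g, 10% of 6g))
echo "$(( executors * (exec_mem_gb + overhead_gb) ))g executors + 6g driver"   # ~140g + driver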
type=command
dependencies=ali_exclude_ios_ck
dependencies=ali_ck
command=sh -x ali_request_ios.sh
\ No newline at end of file
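The file above is an Azkaban job definition; repointing dependencies from ali_exclude_ios_ck to ali_ck only resolves if a job with the new name exists in the same flow. A minimal sketch of what that upstream job file presumably looks like (hypothetical, not part of this commit):

# ali_ck.job (hypothetical sketch)
type=command
command=sh -x ali_ck.sh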
......@@ -17,8 +17,6 @@ table="alipay_lahuo_daily"
spark-submit --class mobvista.dmp.datasource.taobao.AlipayImeiLaHuoCK \
--name "AlipayImeiLaHuoCK.${LOG_TIME}" \
--conf spark.network.timeout=720s \
--conf spark.executor.heartbeatInterval=300s \
--conf spark.sql.shuffle.partitions=100 \
--conf spark.default.parallelism=100 \
--conf spark.kryoserializer.buffer.max=256m \
......
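This hunk, like the identical AlipayImeiLaHuoCK, AlipayLaHuoDF, and YOUKULaxinDF hunks below, drops the spark.executor.heartbeatInterval=300s override and reverts to Spark's 10s default. The documented constraint is that the heartbeat interval stay well below spark.network.timeout (720s here), which both the old and default values satisfy; a trivial guard one might keep in a submit wrapper (hypothetical helper, assumed values):

heartbeat_s=10          # spark.executor.heartbeatInterval default
network_timeout_s=720   # spark.network.timeout set above
if (( heartbeat_s * 10 > network_timeout_s )); then
  echo "WARN: heartbeat interval too close to network timeout" >&2
fi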
......@@ -17,8 +17,6 @@ table="alipay_lahuo_daily"
spark-submit --class mobvista.dmp.datasource.taobao.AlipayImeiLaHuoCK \
--name "AlipayImeiLaHuoCK.${LOG_TIME}" \
--conf spark.network.timeout=720s \
--conf spark.executor.heartbeatInterval=300s \
--conf spark.sql.shuffle.partitions=100 \
--conf spark.default.parallelism=100 \
--conf spark.kryoserializer.buffer.max=256m \
......
......@@ -17,8 +17,6 @@ table="alipay_lahuo_daily"
spark-submit --class mobvista.dmp.datasource.taobao.AlipayImeiLaHuoCK \
--name "AlipayImeiLaHuoCK.${LOG_TIME}" \
--conf spark.network.timeout=720s \
--conf spark.executor.heartbeatInterval=300s \
--conf spark.sql.shuffle.partitions=100 \
--conf spark.default.parallelism=100 \
--conf spark.kryoserializer.buffer.max=256m \
......
......@@ -17,8 +17,6 @@ table="alipay_lahuo_daily"
spark-submit --class mobvista.dmp.datasource.taobao.AlipayImeiLaHuoCK \
--name "AlipayImeiLaHuoCK.${LOG_TIME}" \
--conf spark.network.timeout=720s \
--conf spark.executor.heartbeatInterval=300s \
--conf spark.sql.shuffle.partitions=100 \
--conf spark.default.parallelism=100 \
--conf spark.kryoserializer.buffer.max=256m \
......
......@@ -14,8 +14,6 @@ hadoop fs -rm -r "${IMEIMD5_OUTPUT_PATH}"
spark-submit --class mobvista.dmp.datasource.taobao.AlipayLaHuoDF \
--name "AlipayLaHuoDF.${LOG_TIME}" \
--conf spark.network.timeout=720s \
--conf spark.executor.heartbeatInterval=300s \
--conf spark.sql.shuffle.partitions=1000 \
--conf spark.default.parallelism=1000 \
--conf spark.kryoserializer.buffer.max=256m \
......
......@@ -14,8 +14,6 @@ hadoop fs -rm -r "${IMEIMD5_OUTPUT_PATH}"
spark-submit --class mobvista.dmp.datasource.taobao.AlipayLaHuoDF \
--name "AlipayLaHuoDF.${LOG_TIME}" \
--conf spark.network.timeout=720s \
--conf spark.executor.heartbeatInterval=300s \
--conf spark.sql.shuffle.partitions=1000 \
--conf spark.default.parallelism=1000 \
--conf spark.kryoserializer.buffer.max=256m \
......
......@@ -14,8 +14,6 @@ hadoop fs -rm -r "${IMEIMD5_OUTPUT_PATH}"
spark-submit --class mobvista.dmp.datasource.taobao.AlipayLaHuoDF \
--name "AlipayLaHuoDF.${LOG_TIME}" \
--conf spark.network.timeout=720s \
--conf spark.executor.heartbeatInterval=300s \
--conf spark.sql.shuffle.partitions=1000 \
--conf spark.default.parallelism=1000 \
--conf spark.kryoserializer.buffer.max=256m \
......
......@@ -3,61 +3,41 @@
source ../../dmp_env.sh
LOG_TIME=$(date -d "$ScheduleTime" +"%Y-%m-%d")
if [[ ! -d "/home/hadoop/wangjf" ]]; then
mkdir /home/hadoop/wangjf
cd /home/hadoop/wangjf/
hdfs dfs -get s3://mob-emr-test/wangjf/data/pem/dataplatform_cn.pem .
chmod 600 dataplatform_cn.pem
fi
fors=400
concurrent=10000
java -cp ../../${JAR} mobvista.dmp.datasource.ali.UCRequest ${fors} ${concurrent} ${LOG_TIME} "IMEI_MD5"
#
#
##!/usr/bin/env bash
## # # # # # # # # # # # # # # # # # # # # #
## @author : jiangfan
## @date : 2021-01-19 18:00:51
## # # # # # # # # # # # # # # # # # # # # #
#
#source ../../dmp_env.sh
#
#LOG_TIME=$(date -d "$ScheduleTime" +"%Y-%m-%d")
#
## fors=100
#
## concurrent=10000
#
## java -cp /root/workspace/DMP-1.0.3-jar-with-dependencies.jar mobvista.dmp.datasource.ali.UCRequest ${fors} ${concurrent} ${LOG_TIME} "IMEI_MD5"
#
#if [[ ! -d "/home/hadoop/jiangfan" ]]; then
# mkdir /home/hadoop/jiangfan
#
# cd /home/hadoop/jiangfan/
#
# hdfs dfs -get s3://mob-emr-test/wangjf/data/pem/dataplatform_cn.pem .
#
# chmod 600 dataplatform_cn.pem
#fi
#
#fors=250
#concurrent=10000
#device_type="IMEI_MD5"
#
#ssh -o "StrictHostKeyChecking no" -i /home/hadoop/jiangfan/dataplatform_cn.pem -l root 182.92.177.185 "/root/jiangfan/uc_lahuo/exec_uc_request.sh ${fors} ${concurrent} ${LOG_TIME} ${device_type}"
#
#
#if [[ $? -ne 0 ]];then
# exit 255
#fi
#
#echo "kill java process begin"
#
#sleep 300
#
#shell=" -cp /root/jiangfan/uc_lahuo/DMP-1.0.3-jar-with-dependencies.jar mobvista.dmp.datasource.ali.UCRequest ${fors} ${concurrent} ${LOG_TIME} ${device_type}"
#
#ssh -o "StrictHostKeyChecking no" -i /home/hadoop/jiangfan/dataplatform_cn.pem -l root 182.92.177.185 "sh -x /root/jiangfan/uc_lahuo/check_process.sh '${shell}'"
#
#if [[ $? -ne 0 ]];then
# exit 255
#fi
#
device_type="IMEI_MD5"
shell=" -cp /root/workspace/DMP-1.0.3-jar-with-dependencies.jar mobvista.dmp.datasource.ali.UCRequest ${fors} ${concurrent} ${LOG_TIME} ${device_type}"
ssh -o "StrictHostKeyChecking no" -i /home/hadoop/wangjf/dataplatform_cn.pem -l root 182.92.177.185 "sh -x /root/workspace/check_process.sh '${shell}'"
if [[ $? -ne 0 ]]; then
exit 255
fi
ssh -o "StrictHostKeyChecking no" -i /home/hadoop/wangjf/dataplatform_cn.pem -l root 182.92.177.185 "/root/workspace/exec_uc.sh ${fors} ${concurrent} ${LOG_TIME} ${device_type} >/root/workspace/exec_uc_imei.log &"
if [[ $? -ne 0 ]]; then
exit 255
fi
sleep $((fors * 40))
ssh -o "StrictHostKeyChecking no" -i /home/hadoop/wangjf/dataplatform_cn.pem -l root 182.92.177.185 "sh -x /root/workspace/check_process.sh '${shell}'"
if [[ $? -ne 0 ]]; then
exit 255
fi
\ No newline at end of file
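The rewritten script runs the lahuo request remotely in three steps: clean up any stale java process via check_process.sh, start exec_uc.sh in the background, sleep $((fors * 40)) seconds (16000 s, about 4.5 hours, at fors=400), then call check_process.sh again to reap the finished process. check_process.sh itself is not in this commit; a hypothetical sketch, assuming it kills any java process whose command line matches the argument string (note ${shell} deliberately starts with a space, so "java${shell}" matches the full command line):

#!/usr/bin/env bash
# check_process.sh (hypothetical sketch): kill a lingering java process whose
# command line matches the passed-in argument string.
pattern="$1"
pids=$(pgrep -f "java${pattern}" || true)
if [[ -n "${pids}" ]]; then
  echo "killing pids: ${pids}"
  kill ${pids}
fi
exit 0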
......@@ -3,8 +3,41 @@
source ../../dmp_env.sh
LOG_TIME=$(date -d "$ScheduleTime" +"%Y-%m-%d")
fors=51
if [[ ! -d "/home/hadoop/wangjf" ]]; then
mkdir /home/hadoop/wangjf
cd /home/hadoop/wangjf/
hdfs dfs -get s3://mob-emr-test/wangjf/data/pem/dataplatform_cn.pem .
chmod 600 dataplatform_cn.pem
fi
fors=100
concurrent=10000
java -cp ../../${JAR} mobvista.dmp.datasource.ali.UCRequest ${fors} ${concurrent} ${LOG_TIME} "OAID_MD5"
device_type="OAID_MD5"
shell=" -cp /root/workspace/DMP-1.0.3-jar-with-dependencies.jar mobvista.dmp.datasource.ali.UCRequest ${fors} ${concurrent} ${LOG_TIME} ${device_type}"
ssh -o "StrictHostKeyChecking no" -i /home/hadoop/wangjf/dataplatform_cn.pem -l root 182.92.177.185 "sh -x /root/workspace/check_process.sh '${shell}'"
if [[ $? -ne 0 ]]; then
exit 255
fi
ssh -o "StrictHostKeyChecking no" -i /home/hadoop/wangjf/dataplatform_cn.pem -l root 182.92.177.185 "/root/workspace/exec_uc.sh ${fors} ${concurrent} ${LOG_TIME} ${device_type} >/root/workspace/exec_uc_oaid.log &"
if [[ $? -ne 0 ]]; then
exit 255
fi
sleep $((fors * 40))
ssh -o "StrictHostKeyChecking no" -i /home/hadoop/wangjf/dataplatform_cn.pem -l root 182.92.177.185 "sh -x /root/workspace/check_process.sh '${shell}'"
if [[ $? -ne 0 ]]; then
exit 255
fi
\ No newline at end of file
#!/usr/bin/env bash
source ../../dmp_env.sh
source ../../ga_rawdata_analysis/common/tools.sh
dt_today=$(date -d "$ScheduleTime" +"%Y%m%d")
dt_oneday_ago=$(date -d "$ScheduleTime 1 days ago" +"%Y%m%d")
dt_slash_today=$(date -d "$ScheduleTime" +"%Y/%m/%d")
update=$(date -d "$ScheduleTime" +"%Y-%m-%d")
dt_slash_oneday=$(date -d "$ScheduleTime 1 days ago" +"%Y/%m/%d")
expire_date=$(date -d "$ScheduleTime 61 days ago" +"%Y%m%d")
expire_date_path=$(date -d "$ScheduleTime 61 days ago" +"%Y/%m/%d")
check_await "${DM_INSTALL_LIST}_v2/${dt_slash_oneday}/dsp_req/_SUCCESS"
DMP_INSTALL_LIST_OUTPUT_PATH="${DM_INSTALL_LIST}_v2/${dt_slash_today}/uc_activation"
EXPIRE_OUTPUT_PATH="${UC_LAHUO_TMP_DAILY_TO_S3}/${expire_date_path}/uc_activation_other_data"
# OUTPUT_PATH01="${UC_LAHUO_TMP_DAILY_TO_S3}/${dt_slash_today}/uc_activation_other_data/4b5a58_ucbes"
# OUTPUT_PATH02="${UC_LAHUO_TMP_DAILY_TO_S3}/${dt_slash_today}/uc_activation_other_data/d3f521_ucbes"
# OUTPUT_PATH03="${UC_LAHUO_TMP_DAILY_TO_S3}/${dt_slash_today}/uc_activation_other_data/4b5a58_ucoppo"
# OUTPUT_PATH04="${UC_LAHUO_TMP_DAILY_TO_S3}/${dt_slash_today}/uc_activation_other_data/d3f521_ucoppo"
OUTPUT_PATH="${UC_LAHUO_TMP_DAILY_TO_S3}/${dt_slash_today}/uc_activation_other_data/"
hadoop fs -rm -r "${OUTPUT_PATH}"
spark-submit --class mobvista.dmp.datasource.taobao.UCOtherDataToDmpV2 \
--conf spark.network.timeout=720s \
--conf spark.default.parallelism=3000 \
--conf spark.sql.shuffle.partitions=3000 \
--conf spark.sql.broadcastTimeout=1200 \
--conf spark.sql.autoBroadcastJoinThreshold=31457280 \
--files ${HIVE_SITE_PATH} \
--master yarn --deploy-mode cluster --executor-memory 10g --driver-memory 6g --executor-cores 5 --num-executors 120 \
../../${JAR} \
-output ${OUTPUT_PATH} \
-dt_today ${dt_today} -dt_oneday_ago ${dt_oneday_ago} -update ${update}
if [ $? -ne 0 ]; then
exit 255
fi
mount_partition "uc_lahuo_tmp_daily_to_s3" "dt='${dt_today}', business='uc_activation_other_data'" "${OUTPUT_PATH}"
unmount_partition "uc_lahuo_tmp_daily_to_s3" "dt='${expire_date}', business='uc_activation_other_data'" "${EXPIRE_OUTPUT_PATH}"
HIVE_CMD=$(hive_func)
$HIVE_CMD -v -hivevar dt_today ${dt_today} -f uc_other_data_to_dmp.sql
if [ $? -ne 0 ]; then
exit 255
fi
hadoop fs -touchz ${DMP_INSTALL_LIST_OUTPUT_PATH}/_SUCCESS
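mount_partition, unmount_partition, and hive_func come from the sourced environment scripts and are not shown in this diff. A hypothetical sketch of the partition helpers, consistent with how they are called above:

# hypothetical sketch of the helpers (assumed to live in the sourced tools)
mount_partition() {    # $1=table  $2=partition spec  $3=data location
  hive -e "USE dwh; ALTER TABLE $1 ADD IF NOT EXISTS PARTITION ($2) LOCATION '$3';"
}
unmount_partition() {  # $1=table  $2=expired partition spec  $3=old location
  hive -e "USE dwh; ALTER TABLE $1 DROP IF EXISTS PARTITION ($2);"
}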
......@@ -9,16 +9,10 @@ source ../../dmp_env.sh
LOG_TIME=$(date -d "$ScheduleTime" +"%Y-%m-%d")
# fors=100
if [[ ! -d "/home/hadoop/wangjf" ]]; then
mkdir /home/hadoop/wangjf
# concurrent=10000
# java -cp /root/jiangfan/youku_laxin/DMP-1.0.3-jar-with-dependencies.jar mobvista.dmp.datasource.ali.YOUKURequest ${fors} ${concurrent} ${LOG_TIME} "IMEI_MD5"
if [[ ! -d "/home/hadoop/jiangfan" ]]; then
mkdir /home/hadoop/jiangfan
cd /home/hadoop/jiangfan/
cd /home/hadoop/wangjf/
hdfs dfs -get s3://mob-emr-test/wangjf/data/pem/dataplatform_cn.pem .
......@@ -26,39 +20,29 @@ if [[ ! -d "/home/hadoop/jiangfan" ]]; then
fi
fors=2500
concurrent=10000
device_type="IMEI_MD5"
ssh -o "StrictHostKeyChecking no" -i /home/hadoop/jiangfan/dataplatform_cn.pem -l root 182.92.177.185 "/root/jiangfan/youku_laxin/exec_youku_request.sh ${fors} ${concurrent} ${LOG_TIME} ${device_type}"
shell=" -cp /root/workspace/DMP-1.0.3-jar-with-dependencies.jar mobvista.dmp.datasource.ali.YOUKURequest ${fors} ${concurrent} ${LOG_TIME} ${device_type}"
ssh -o "StrictHostKeyChecking no" -i /home/hadoop/wangjf/dataplatform_cn.pem -l root 182.92.177.185 "sh -x /root/workspace/check_process.sh '${shell}'"
if [[ $? -ne 0 ]];then
if [[ $? -ne 0 ]]; then
exit 255
fi
echo "kill java process begin"
sleep 200
ssh -o "StrictHostKeyChecking no" -i /home/hadoop/wangjf/dataplatform_cn.pem -l root 182.92.177.185 "/root/workspace/exec_youku.sh ${fors} ${concurrent} ${LOG_TIME} ${device_type} >/root/workspace/exec_youku_imei.log &"
shell=" -cp /root/jiangfan/youku_laxin/DMP-1.0.3-jar-with-dependencies.jar mobvista.dmp.datasource.ali.YOUKURequest ${fors} ${concurrent} ${LOG_TIME} ${device_type}"
ssh -o "StrictHostKeyChecking no" -i /home/hadoop/jiangfan/dataplatform_cn.pem -l root 182.92.177.185 "sh -x /root/jiangfan/youku_laxin/check_process.sh '${shell}'"
if [[ $? -ne 0 ]];then
if [[ $? -ne 0 ]]; then
exit 255
fi
sleep $((fors * 15))
ssh -o "StrictHostKeyChecking no" -i /home/hadoop/wangjf/dataplatform_cn.pem -l root 182.92.177.185 "sh -x /root/workspace/check_process.sh '${shell}'"
##!/usr/bin/env bash
#
#source ../../dmp_env.sh
#
#LOG_TIME=$(date -d "$ScheduleTime" +"%Y-%m-%d")
#fors=51
#
#concurrent=10000
#
#java -cp ../../${JAR} mobvista.dmp.datasource.ali.YOUKURequest ${fors} ${concurrent} ${LOG_TIME} "IMEI_MD5"
if [[ $? -ne 0 ]]; then
exit 255
fi
\ No newline at end of file
......@@ -27,7 +27,7 @@ spark-submit --class mobvista.dmp.datasource.taobao.YoukuTmpDataToDmp \
--conf spark.sql.autoBroadcastJoinThreshold=31457280 \
--files ${HIVE_SITE_PATH} \
--master yarn --deploy-mode cluster --executor-memory 8g --driver-memory 4g --executor-cores 4 --num-executors 40 \
../../${JAR} -Input ${INPUT_PATH} -Output ${OUTPUT_PATH} \
../../${JAR} -Input "${INPUT_PATH}/*/*" -Output ${OUTPUT_PATH} \
-update ${update}
......
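The only change in this hunk widens the input path to a two-level glob, so the job reads part files nested one subdirectory below INPUT_PATH rather than only files sitting directly in it (layout illustrative):

# before: ${INPUT_PATH}       -> part files directly under the directory
# after:  ${INPUT_PATH}/*/*   -> part files one subdirectory down, e.g.
#         ${INPUT_PATH}/<subdir>/part-00000.gz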
......@@ -15,8 +15,6 @@ hadoop fs -rm -r "${OAIDMD5_OUTPUT_PATH}"
spark-submit --class mobvista.dmp.datasource.taobao.YOUKULaxinDF \
--name "YOUKULaxinDF.${LOG_TIME}" \
--conf spark.network.timeout=720s \
--conf spark.executor.heartbeatInterval=300s \
--conf spark.sql.shuffle.partitions=1000 \
--conf spark.default.parallelism=1000 \
--conf spark.kryoserializer.buffer.max=256m \
......
......@@ -9,16 +9,10 @@ source ../../dmp_env.sh
LOG_TIME=$(date -d "$ScheduleTime" +"%Y-%m-%d")
# fors=100
if [[ ! -d "/home/hadoop/wangjf" ]]; then
mkdir /home/hadoop/wangjf
# concurrent=10000
# java -cp /root/jiangfan/youku_laxin/DMP-1.0.3-jar-with-dependencies.jar mobvista.dmp.datasource.ali.YOUKURequest ${fors} ${concurrent} ${LOG_TIME} "IMEI_MD5"
if [[ ! -d "/home/hadoop/jiangfan" ]]; then
mkdir /home/hadoop/jiangfan
cd /home/hadoop/jiangfan/
cd /home/hadoop/wangjf/
hdfs dfs -get s3://mob-emr-test/wangjf/data/pem/dataplatform_cn.pem .
......@@ -26,26 +20,29 @@ if [[ ! -d "/home/hadoop/jiangfan" ]]; then
fi
fors=2500
concurrent=10000
device_type="OAID_MD5"
ssh -o "StrictHostKeyChecking no" -i /home/hadoop/jiangfan/dataplatform_cn.pem -l root 182.92.177.185 "/root/jiangfan/youku_laxin/exec_youku_request.sh ${fors} ${concurrent} ${LOG_TIME} ${device_type}"
shell=" -cp /root/workspace/DMP-1.0.3-jar-with-dependencies.jar mobvista.dmp.datasource.ali.YOUKURequest ${fors} ${concurrent} ${LOG_TIME} ${device_type}"
ssh -o "StrictHostKeyChecking no" -i /home/hadoop/wangjf/dataplatform_cn.pem -l root 182.92.177.185 "sh -x /root/workspace/check_process.sh '${shell}'"
if [[ $? -ne 0 ]];then
if [[ $? -ne 0 ]]; then
exit 255
fi
echo "kill java process begin"
ssh -o "StrictHostKeyChecking no" -i /home/hadoop/wangjf/dataplatform_cn.pem -l root 182.92.177.185 "/root/workspace/exec_youku.sh ${fors} ${concurrent} ${LOG_TIME} ${device_type} >/root/workspace/exec_youku_oaid.log &"
sleep 200
if [[ $? -ne 0 ]]; then
exit 255
fi
shell=" -cp /root/jiangfan/youku_laxin/DMP-1.0.3-jar-with-dependencies.jar mobvista.dmp.datasource.ali.YOUKURequest ${fors} ${concurrent} ${LOG_TIME} ${device_type}"
sleep $((fors * 15))
ssh -o "StrictHostKeyChecking no" -i /home/hadoop/jiangfan/dataplatform_cn.pem -l root 182.92.177.185 "sh -x /root/jiangfan/youku_laxin/check_process.sh '${shell}'"
ssh -o "StrictHostKeyChecking no" -i /home/hadoop/wangjf/dataplatform_cn.pem -l root 182.92.177.185 "sh -x /root/workspace/check_process.sh '${shell}'"
if [[ $? -ne 0 ]];then
if [[ $? -ne 0 ]]; then
exit 255
fi
#!/bin/bash
# # # # # # # # # # # # # # # # # # # # # #
# # # # # # # # # # # # # # # # # # # # # #
# @file : collect_package_name.sh
# @author: houying
# @date : 16-11-14
......@@ -9,7 +9,10 @@
source ../dmp_env.sh
BASE_PATH="$(cd "$(dirname $0)";pwd)"
BASE_PATH="$(
cd "$(dirname $0)"
pwd
)"
LOG_DATE=$(date -d "$ScheduleTime 1 days ago" "+%Y%m%d")
......@@ -40,11 +43,11 @@ campaign() {
and update_time>='$UPDATE'
group by package_name, platform
"
hive_cmd "use dwh;$SQL;" > ${campaign}
hive_cmd "use dwh;$SQL;" >${campaign}
hadoop fs -put ${campaign} "$PACKAGE_PATH"
}
:<<!
: <<!
install() {
check_await "$DM_INSTALL_LIST/$yes_year/$yes_month/$yes_day"
hive_cmd "
......@@ -82,19 +85,24 @@ install() {
--conf spark.driver.maxResultSize=4g \
--files ${HIVE_SITE_PATH} \
--master yarn --deploy-mode cluster --name CrawPkgsSpark --executor-memory 6g --driver-memory 4g --executor-cores 2 --num-executors 100 \
../${JAR} -pkginstallpath ${PACKAGE_INSTALL_PATH} -coalesce 10 \
../${JAR} -pkginstallpath ${PACKAGE_INSTALL_PATH} -coalesce 10 \
-yesday ${YESTERDAY}
if [ $? -ne 0 ]; then
exit 255
fi
install=$1
if [[ $? -ne 0 ]];then
if [[ $? -ne 0 ]]; then
exit 255
fi
hadoop fs -text ${PACKAGE_INSTALL_PATH}/* > ${install}
if [[ $? -ne 0 ]];then
hadoop fs -text ${PACKAGE_INSTALL_PATH}/* >${install}
if [[ $? -ne 0 ]]; then
exit 255
fi
hadoop fs -put ${install} "$PACKAGE_PATH"
if [[ $? -ne 0 ]];then
if [[ $? -ne 0 ]]; then
exit 255
fi
}
......@@ -102,23 +110,22 @@ install() {
bundle_pkg_mapping() {
path="$BUNDLE_PACKAGE_MAPPING_PATH/$year/$month/$day/"
check_await "$path/_SUCCESS"
hadoop fs -text $path/* | awk -F '\t' '{print $2"\tios"}' > bundle.data
hadoop fs -text $path/* | awk -F '\t' '{print $2"\tios"}' >bundle.data
hadoop fs -rm $PACKAGE_PATH/bundle.data
hadoop fs -put bundle.data $PACKAGE_PATH
}
hadoop fs -test -e ${PACKAGE_PATH}
if [ $? -ne 0 ];then
if [ $? -ne 0 ]; then
hadoop fs -mkdir -p ${PACKAGE_PATH}
fi
# Loop over the functions above to collect pkg_name
hadoop fs -rm ${PACKAGE_PATH}/*
for cmd in install campaign
do
for cmd in install campaign; do
${cmd} "$cmd.txt"
if [ $? -ne 0 ];then
if [ $? -ne 0 ]; then
exit 255
fi
done
......
......@@ -40,6 +40,10 @@ spark-submit --class mobvista.dmp.datasource.dsp.DspOrgLogEtlHoursDemo \
--master yarn --deploy-mode cluster --executor-memory 12g --driver-memory 6g --executor-cores 5 --num-executors 60 \
../${JAR} -yyyymmddhh $input_date -output $ETL_DSP_REQ_HOURS_PATH -outputmds $MDS_DSP_REQ_HOURS_PATH -coalesce 2000 || exit 1
if [ $? -ne 0 ];then
exit 255
fi
## mount_partition "etl_dsp_org_request_daily_hours" "dt='${dt}',hh='${hhpath}'" "$ETL_DSP_REQ_ORG_HOURS_PATH"
mount_partition "etl_dsp_request_daily_hours" "dt='${dt}',hh='${hhpath}'" "$ETL_DSP_REQ_HOURS_PATH"
......
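One behavioral note on the added guard: the spark-submit line already ends with || exit 1, so on failure the script exits before the check, and on success $? is 0, making the new block a no-op. A minimal demonstration:

some_command || exit 1   # a failure exits here with status 1
if [ $? -ne 0 ]; then    # reached only on success, so $? is always 0
  exit 255               # never fires
fi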
......@@ -41,15 +41,19 @@ check_await ${FB_GENDER_PATH}/_SUCCESS
check_await ${TP_GENDER_PATH}/_SUCCESS
spark-submit --class mobvista.dmp.datasource.age_gender.MergeInstallGender \
--name "MergeInstallGender.${LOG_TIME}" \
--conf spark.sql.shuffle.partitions=5000 \
--conf spark.default.parallelism=2000 \
--conf spark.storage.memoryFraction=0.4 \
--conf spark.shuffle.memoryFraction=0.4 \
--conf spark.sql.files.maxPartitionBytes=536870912 \
--conf spark.sql.adaptive.enabled=true \
--conf spark.sql.adaptive.advisoryPartitionSizeInBytes=536870912 \
--files ${HIVE_SITE_PATH} \
--master yarn --deploy-mode cluster --executor-memory 10g --driver-memory 6g --executor-cores 5 --num-executors 60 \
../${JAR} -date ${LOG_TIME} \
-ga_gender_path ${GA_GENDER_PATH} -dsp_gender_path ${DSP_GENDER_PATH} -fb_gender_path ${FB_GENDER_PATH} -tp_gender_path ${TP_GENDER_PATH} -gender_output ${GENDER_OUTPUT} -parallelism 2000
\ No newline at end of file
--name "MergeInstallGender.${LOG_TIME}" \
--conf spark.sql.shuffle.partitions=5000 \
--conf spark.default.parallelism=2000 \
--conf spark.storage.memoryFraction=0.4 \
--conf spark.shuffle.memoryFraction=0.4 \
--conf spark.sql.files.maxPartitionBytes=536870912 \
--conf spark.sql.adaptive.enabled=true \
--conf spark.sql.adaptive.advisoryPartitionSizeInBytes=536870912 \
--files ${HIVE_SITE_PATH} \
--master yarn --deploy-mode cluster --executor-memory 10g --driver-memory 6g --executor-cores 5 --num-executors 60 \
../${JAR} -date ${LOG_TIME} \
-ga_gender_path ${GA_GENDER_PATH} -dsp_gender_path ${DSP_GENDER_PATH} -fb_gender_path ${FB_GENDER_PATH} -tp_gender_path ${TP_GENDER_PATH} -gender_output ${GENDER_OUTPUT} -parallelism 2000
if [ $? -ne 0 ]; then
exit 255
fi
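Both the old and new submit enable adaptive query execution, so spark.sql.shuffle.partitions=5000 is effectively an upper bound: with AQE on, post-shuffle partitions are coalesced toward the advisory size set above.

echo "$((536870912 / 1024 / 1024)) MB"   # advisory partition size: 512 MB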
......@@ -32,8 +32,8 @@ import java.util.concurrent.*;
public class YOUKURequest {
public static Logger logger = null;
static ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(50, 100, 500, TimeUnit.MILLISECONDS,
new LinkedBlockingDeque<>(100), new CustomizableThreadFactory("YOUKURequest-"), new ThreadPoolExecutor.CallerRunsPolicy());
static ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(300, 500, 1000, TimeUnit.MILLISECONDS,
new LinkedBlockingDeque<>(500), new CustomizableThreadFactory("YOUKURequest-"), new ThreadPoolExecutor.CallerRunsPolicy());
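// Note: versus the previous 50-core/100-max pool with a 100-deep queue, this
// raises request concurrency several-fold; CallerRunsPolicy is unchanged, so
// once the 500-slot queue fills, the submitting thread runs the task itself,
// throttling the producer instead of rejecting work.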
private static String dt = DateUtil.format(new Date(), "yyyy-MM-dd");
......
package mobvista.dmp.datasource.taobao
import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.io.compress.{CompressionCodec, GzipCodec}
import org.apache.spark.sql.SaveMode
import java.net.URI
class UCOtherDataToDmpV2 extends CommonSparkJob with Serializable {
override protected def buildOptions(): Options = {
val options = new Options
options.addOption("dt_today", true, "[must] dt_today")
options.addOption("dt_oneday_ago", true, "[must] dt_oneday_ago")
options.addOption("update", true, "[must] update")
options.addOption("output", true, "[must] output")
options
}
override protected def run(args: Array[String]): Int = {
val commandLine = commParser.parse(options, args)
if (!checkMustOption(commandLine)) {
printUsage(options)
return -1
} else printOptions(commandLine)
val dt_today = commandLine.getOptionValue("dt_today")
val dt_oneday_ago = commandLine.getOptionValue("dt_oneday_ago")
val update = commandLine.getOptionValue("update")
val output = commandLine.getOptionValue("output")
val spark = MobvistaConstant.createSparkSession("UCOtherDataToDmp")
val sc = spark.sparkContext
FileSystem.get(new URI(s"s3://mob-emr-test"), sc.hadoopConfiguration).delete(new Path(output), true)
try {
val conf = spark.sparkContext.hadoopConfiguration
conf.set("mapreduce.output.compress", "true")
conf.set("mapreduce.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec")
conf.setBoolean("mapreduce.output.fileoutputformat.compress", true)
conf.set("mapreduce.output.fileoutputformat.compress.type", CompressionType.BLOCK.toString)
conf.setClass("mapreduce.output.fileoutputformat.compress.codec", classOf[GzipCodec], classOf[CompressionCodec])
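// Note: these mapreduce.output.* compression settings only matter for the
// saveAsNewAPIHadoopFile path, which is commented out below; the ORC write
// at the end compresses via option("orc.compress", "zlib") instead.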
val sql =
s"""
|SELECT device_id, COLLECT_SET(package_name) install_list
| FROM
| (
| SELECT device_id, package_name
| FROM dwh.dm_install_list_v2
| WHERE dt = '${dt_today}' AND business = 'uc_activation' AND device_type = 'imeimd5'
| AND package_name IN ('com.uc.foractivation.4b5a58','com.uc.foractivation.d3f521')
| UNION
| SELECT device_id, package_name
| FROM dwh.dm_install_list_v2
| WHERE dt = '${dt_oneday_ago}' AND business = 'dsp_req' AND device_type = 'imeimd5'
| AND package_name IN ('com.UCMobile_bes','com.ucmobile_oppo')
| ) t
| GROUP BY device_id
|""".stripMargin
/*
val df = spark.sql(sql).persist(StorageLevel.MEMORY_AND_DISK_SER)
val rdd = df.rdd.map(r => {
val deviceId = r.getAs[String]("device_id")
val deviceType = "imeimd5"
val platform = "android"
val installList = r.getAs[mutable.WrappedArray[String]]("install_list")
if (installList.contains("com.uc.foractivation.4b5a58") && installList.contains("com.UCMobile_bes")) {
(new Text(s"$output/4b5a58_ucbes"), new Text(MRUtils.JOINER.join(deviceId, deviceType, platform, "com.uc.foractivation.4b5a58_ucbes", update)))
}
if (installList.contains("com.uc.foractivation.d3f521") && installList.contains("com.UCMobile_bes")) {
(new Text(s"$output/d3f521_ucbes"), new Text(MRUtils.JOINER.join(deviceId, deviceType, platform, "com.uc.foractivation.d3f521_ucbes", update)))
}
if (installList.contains("com.uc.foractivation.4b5a58") && installList.contains("com.ucmobile_oppo")) {
(new Text(s"$output/4b5a58_ucoppo"), new Text(MRUtils.JOINER.join(deviceId, deviceType, platform, "com.uc.foractivation.4b5a58_ucoppo", update)))
}
if (installList.contains("com.uc.foractivation.d3f521") && installList.contains("com.ucmobile_oppo")) {
(new Text(s"$output/d3f521_ucoppo"), new Text(MRUtils.JOINER.join(deviceId, deviceType, platform, "com.uc.foractivation.d3f521_ucoppo", update)))
} else {
(new Text(""), new Text(""))
}
}).filter(t => {
StringUtils.isNotBlank(t._1.toString) && StringUtils.isNotBlank(t._2.toString)
})
println(s"count -->> ${rdd.count()}")
rdd.coalesce(50)
.saveAsNewAPIHadoopFile(output, classOf[Text], classOf[Text], classOf[RDDMultipleOutputFormat[_, _]], conf)
*/
val df = spark.sql(sql)
df.write
.mode(SaveMode.Overwrite)
.option("orc.compress", "zlib")
.orc(output)
} finally {
spark.stop()
}
0
}
}
object UCOtherDataToDmpV2 {
def main(args: Array[String]): Unit = {
new UCOtherDataToDmpV2().run(args)
}
}
\ No newline at end of file