Commit 3fe9507c by fan.jiang

产出人群包com.taobao.litetao_reyun com.ss.android.ugc.aweme_reyun …

产出人群包com.taobao.litetao_reyun com.ss.android.ugc.aweme_reyun com.taobao.litetao_btop com.ss.android.ugc.aweme_btop
parent ce4575ba
#!/usr/bin/env bash
# Pull device ids out of dwh.dm_install_list_v2 (daily partitions) and stage
# them as rtdmp audience packages:
#   com.eg.android.AlipayGphone  in business=reyun          -> com.eg.android.AlipayGphone_reyun
#   com.taobao.litetao           in business=reyun and btop -> com.taobao.litetao_{reyun,btop}
#   com.ss.android.ugc.aweme     in business=reyun and btop -> com.ss.android.ugc.aweme_{reyun,btop}

source ../../dmp_env.sh

# Data date: the day before the scheduled run time, in both compact and
# slash-separated (partition-path) form.
dt_today=$(date -d "$ScheduleTime 1 days ago" +"%Y%m%d")
dt_dash_today=$(date -d "$ScheduleTime 1 days ago" +"%Y/%m/%d")

# Wait for the input partitions to land before reading them.
business="reyun"
INPUT_PATH="${DM_INSTALL_LIST}_v2/${dt_dash_today}/${business}"
check_await "${INPUT_PATH}/_SUCCESS"

business1="reyun"
INPUT_PATH1="${DM_INSTALL_LIST}_v2/${dt_dash_today}/${business1}"
# NOTE(review): INPUT_PATH1 is byte-identical to INPUT_PATH (both business=reyun),
# so the second check_await on it was dropped as redundant.

business2="btop"
INPUT_PATH2="${DM_INSTALL_LIST}_v2/${dt_dash_today}/${business2}"
check_await "${INPUT_PATH2}/_SUCCESS"

# One output location per audience package. OUTPUT_PATH and OUTPUT_PATH1 point
# at the same location (AlipayGphone/reyun); OUTPUT_PATH is kept for backward
# compatibility with callers of this variable.
OUTPUT_PATH="${RTDMP_COM_EG_ANDROID_ALIPAYGPHONE_REYUN_PATH}/${dt_dash_today}/"
OUTPUT_PATH1="${RTDMP_COM_EG_ANDROID_ALIPAYGPHONE_REYUN_PATH}/${dt_dash_today}/"
OUTPUT_PATH2="${RTDMP_COM_TAOBAO_LITETAO_REYUN_PATH}/${dt_dash_today}/"
OUTPUT_PATH3="${RTDMP_COM_SS_ANDROID_UGC_AWEME_REYUN_PATH}/${dt_dash_today}/"
OUTPUT_PATH4="${RTDMP_COM_TAOBAO_LITETAO_BTOP_PATH}/${dt_dash_today}/"
OUTPUT_PATH5="${RTDMP_COM_SS_ANDROID_UGC_AWEME_BTOP_PATH}/${dt_dash_today}/"

# Clear any previous run's data; each distinct path is deleted exactly once
# (previously the AlipayGphone path was deleted twice via OUTPUT_PATH and
# OUTPUT_PATH1).
for out in "${OUTPUT_PATH1}" "${OUTPUT_PATH2}" "${OUTPUT_PATH3}" "${OUTPUT_PATH4}" "${OUTPUT_PATH5}"; do
  hadoop fs -rm -r "${out}"
done
# Launch the Spark job that writes the five audience packages.
# NOTE(review): the diff view elided some --conf lines between memoryOverhead
# and network.timeout; restore them from the repository version if present.
# Two residue lines were removed here: a stale diff hunk marker that would have
# been passed to spark-submit as garbage arguments, and the pre-refactor
# single-output argument line ("-output ${OUTPUT_PATH} -coalesce 200", no
# trailing backslash) which terminated the command early and caused the next
# line to try to execute the jar file directly.
spark-submit --class mobvista.dmp.datasource.dm.ComEgAndroidAlipayGphoneReyun \
 --conf spark.yarn.executor.memoryOverhead=2048 \
 --conf spark.network.timeout=720s \
 --files ${HIVE_SITE_PATH} \
 --master yarn --deploy-mode cluster --executor-memory 8g --driver-memory 6g --executor-cores 6 --num-executors 20 \
 ../../${JAR} -dt_today ${dt_today} -output1 ${OUTPUT_PATH1} -output2 ${OUTPUT_PATH2} -output3 ${OUTPUT_PATH3} -output4 ${OUTPUT_PATH4} -output5 ${OUTPUT_PATH5} \
 -coalesce 200
if [[ $? -ne 0 ]]; then
......
......@@ -261,6 +261,10 @@ TMP_COM_YOUKU_PHONE_WAX_NOBID_PATH="s3://mob-emr-test/dataplatform/DataWareHouse
# Staging paths for btop tiktokrv temp data.
TMP_COM_BTOP_TIKTOKRV_PATH="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/tmp/com_btop_tiktokrv"
TMP_COM_BTOP_TIKTOKRV_GAID_PATH="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/tmp/com_btop_tiktokrv_gaid"
# rtdmp audience-package output roots, one per <package>_<business> audience
# produced by the ComEgAndroidAlipayGphoneReyun job (daily subdirectories are
# appended by the driver script).
RTDMP_COM_EG_ANDROID_ALIPAYGPHONE_REYUN_PATH="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/tmp/rtdmp_com_eg_android_AlipayGphone_reyun"
RTDMP_COM_TAOBAO_LITETAO_REYUN_PATH="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/tmp/rtdmp_com_taobao_litetao_reyun"
RTDMP_COM_SS_ANDROID_UGC_AWEME_REYUN_PATH="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/tmp/rtdmp_com_ss_android_ugc_aweme_reyun"
RTDMP_COM_TAOBAO_LITETAO_BTOP_PATH="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/tmp/rtdmp_com_taobao_litetao_btop"
RTDMP_COM_SS_ANDROID_UGC_AWEME_BTOP_PATH="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/tmp/rtdmp_com_ss_android_ugc_aweme_btop"
......
......@@ -23,7 +23,11 @@ class ComEgAndroidAlipayGphoneReyun extends CommonSparkJob with Serializable {
/**
 * Declares the command-line options this job accepts. Every option takes a
 * value and is required by convention (description prefix "[must]").
 *
 * NOTE(review): "output" looks superseded by output1..output5 after the job
 * was split into five audience outputs; it is kept here so existing callers
 * that still pass -output do not fail option parsing — confirm and remove.
 */
override protected def buildOptions(): Options = {
  val opts = new Options
  // Registration order matches the historical one: coalesce, output(s), date.
  Seq("coalesce", "output", "output1", "output2", "output3", "output4", "output5", "dt_today")
    .foreach(name => opts.addOption(name, true, s"[must] $name"))
  opts
}
......@@ -36,7 +40,11 @@ class ComEgAndroidAlipayGphoneReyun extends CommonSparkJob with Serializable {
} else printOptions(commandLine)
// Read required job arguments parsed from the command line.
val coalesce = commandLine.getOptionValue("coalesce")
// NOTE(review): "output" appears to be a leftover from before the job was
// split into five outputs (output1..output5); confirm it is still needed —
// if -output is not passed, this is null and later use of it would NPE.
val output = commandLine.getOptionValue("output")
val output1 = commandLine.getOptionValue("output1")
val output2 = commandLine.getOptionValue("output2")
val output3 = commandLine.getOptionValue("output3")
val output4 = commandLine.getOptionValue("output4")
val output5 = commandLine.getOptionValue("output5")
val dt_today = commandLine.getOptionValue("dt_today")
val spark = SparkSession.builder()
......@@ -52,16 +60,52 @@ class ComEgAndroidAlipayGphoneReyun extends CommonSparkJob with Serializable {
val sc = spark.sparkContext
import spark.implicits._
// Clear the previous day's outputs before writing.
// NOTE(review): the first delete uses `output` (pre-refactor single output),
// which points at the same location as `output1` in the driver script —
// likely stale; confirm and remove together with the "output" option.
FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output), true)
FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output1), true)
FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output2), true)
FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output3), true)
FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output4), true)
FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output5), true)
try {
// AlipayGphone devices from the reyun partition.
// NOTE(review): this string literal contains TWO concatenated SELECTs (the
// old and new sides of a diff); as written sql1 is not valid SQL. Keep only
// the second line (the one excluding androidid/android_id device types).
val sql1=
s"""
|select device_id, device_type from dwh.dm_install_list_v2 where dt='${dt_today}' and business='reyun' and package_name='com.eg.android.AlipayGphone'
|select device_id, device_type from dwh.dm_install_list_v2 where dt='${dt_today}' and business='reyun' and package_name='com.eg.android.AlipayGphone' and device_type not in ('androidid','android_id')
""".stripMargin
// NOTE(review): .saveAsNewAPIHadoopFile is chained twice below (old `output`
// target, then new `output1`); the second call is applied to Unit and will
// not compile — drop the `output` line.
spark.sql(sql1).rdd.flatMap(buildRes(_)).coalesce(coalesce.toInt)
.saveAsNewAPIHadoopFile(s"${output}", classOf[Text], classOf[Text], classOf[TextMultipleOutputFormat], spark.sparkContext.hadoopConfiguration)
.saveAsNewAPIHadoopFile(s"${output1}", classOf[Text], classOf[Text], classOf[TextMultipleOutputFormat], spark.sparkContext.hadoopConfiguration)
// com.taobao.litetao devices from the reyun partition -> output2.
val sql2=
s"""
|select device_id, device_type from dwh.dm_install_list_v2 where dt='${dt_today}' and business='reyun' and package_name='com.taobao.litetao' and device_type not in ('androidid','android_id')
""".stripMargin
spark.sql(sql2).rdd.flatMap(buildRes(_)).coalesce(coalesce.toInt)
.saveAsNewAPIHadoopFile(s"${output2}", classOf[Text], classOf[Text], classOf[TextMultipleOutputFormat], spark.sparkContext.hadoopConfiguration)
// com.ss.android.ugc.aweme devices from the reyun partition -> output3.
val sql3=
s"""
|select device_id, device_type from dwh.dm_install_list_v2 where dt='${dt_today}' and business='reyun' and package_name='com.ss.android.ugc.aweme' and device_type not in ('androidid','android_id')
""".stripMargin
spark.sql(sql3).rdd.flatMap(buildRes(_)).coalesce(coalesce.toInt)
.saveAsNewAPIHadoopFile(s"${output3}", classOf[Text], classOf[Text], classOf[TextMultipleOutputFormat], spark.sparkContext.hadoopConfiguration)
// com.taobao.litetao devices from the btop partition -> output4.
val sql4=
s"""
|select device_id, device_type from dwh.dm_install_list_v2 where dt='${dt_today}' and business='btop' and package_name='com.taobao.litetao' and device_type not in ('androidid','android_id')
""".stripMargin
spark.sql(sql4).rdd.flatMap(buildRes(_)).coalesce(coalesce.toInt)
.saveAsNewAPIHadoopFile(s"${output4}", classOf[Text], classOf[Text], classOf[TextMultipleOutputFormat], spark.sparkContext.hadoopConfiguration)
// com.ss.android.ugc.aweme devices from the btop partition -> output5.
val sql5=
s"""
|select device_id, device_type from dwh.dm_install_list_v2 where dt='${dt_today}' and business='btop' and package_name='com.ss.android.ugc.aweme' and device_type not in ('androidid','android_id')
""".stripMargin
spark.sql(sql5).rdd.flatMap(buildRes(_)).coalesce(coalesce.toInt)
.saveAsNewAPIHadoopFile(s"${output5}", classOf[Text], classOf[Text], classOf[TextMultipleOutputFormat], spark.sparkContext.hadoopConfiguration)
} finally {
spark.stop()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment