Commit 677c9f77 by fan.jiang

com_btop_tiktokrv rtdmp: fix bug
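Summary of the change (as read from the diff below): both com_btop_tiktokrv shell scripts gain extra rtdmp output paths under ${RTDMP_TMP_PACKAGE_NAME_PATH}/com_btop_tiktokrv/${dt_dash_today} (imei and oaid for BtopTiktokrv, gaid for BtopTiktokrvGaid), clear those paths before the run, and pass them to spark-submit as -output03/-output04. The corresponding Spark jobs add the matching command-line options, delete the targets on S3, and write the filtered device-id lists there as plain text.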

parent 275b71e0
@@ -18,9 +18,12 @@ fi
 OUTPUT_PATH01="${TMP_COM_BTOP_TIKTOKRV_PATH}/${dt_dash_today}/01"
 OUTPUT_PATH02="${TMP_COM_BTOP_TIKTOKRV_PATH}/${dt_dash_today}/02"
+OUTPUT_PATH03="${RTDMP_TMP_PACKAGE_NAME_PATH}/com_btop_tiktokrv/${dt_dash_today}/imei"
+OUTPUT_PATH04="${RTDMP_TMP_PACKAGE_NAME_PATH}/com_btop_tiktokrv/${dt_dash_today}/oaid"
 hadoop fs -rm -r "${OUTPUT_PATH01}"
 hadoop fs -rm -r "${OUTPUT_PATH02}"
+hadoop fs -rm -r "${OUTPUT_PATH03}"
+hadoop fs -rm -r "${OUTPUT_PATH04}"
 spark-submit --class mobvista.dmp.datasource.dm.BtopTiktokrv \
 --conf spark.yarn.executor.memoryOverhead=2048 \
@@ -29,7 +32,7 @@ spark-submit --class mobvista.dmp.datasource.dm.BtopTiktokrv \
 --conf spark.network.timeout=720s \
 --files ${HIVE_SITE_PATH} \
 --master yarn --deploy-mode cluster --executor-memory 8g --driver-memory 6g --executor-cores 6 --num-executors 70 \
-../../${JAR} -begin_day ${begin_day} -begin_day02 ${begin_day02} -end_day ${end_day} -output01 ${OUTPUT_PATH01} -output02 ${OUTPUT_PATH02} -coalesce 200
+../../${JAR} -begin_day ${begin_day} -begin_day02 ${begin_day02} -end_day ${end_day} -output01 ${OUTPUT_PATH01} -output02 ${OUTPUT_PATH02} -output03 ${OUTPUT_PATH03} -output04 ${OUTPUT_PATH04} -coalesce 200
 if [[ $? -ne 0 ]]; then
......
@@ -18,9 +18,10 @@ fi
 OUTPUT_PATH01="${TMP_COM_BTOP_TIKTOKRV_GAID_PATH}/${dt_dash_today}/01"
 OUTPUT_PATH02="${TMP_COM_BTOP_TIKTOKRV_GAID_PATH}/${dt_dash_today}/02"
+OUTPUT_PATH03="${RTDMP_TMP_PACKAGE_NAME_PATH}/com_btop_tiktokrv/${dt_dash_today}/gaid"
 hadoop fs -rm -r "${OUTPUT_PATH01}"
 hadoop fs -rm -r "${OUTPUT_PATH02}"
+hadoop fs -rm -r "${OUTPUT_PATH03}"
 spark-submit --class mobvista.dmp.datasource.dm.BtopTiktokrvGaid \
 --conf spark.yarn.executor.memoryOverhead=2048 \
@@ -30,7 +31,7 @@ spark-submit --class mobvista.dmp.datasource.dm.BtopTiktokrvGaid \
 --conf spark.network.timeout=720s \
 --files ${HIVE_SITE_PATH} \
 --master yarn --deploy-mode cluster --executor-memory 8g --driver-memory 6g --executor-cores 6 --num-executors 70 \
-../../${JAR} -begin_day ${begin_day} -begin_day02 ${begin_day02} -end_day ${end_day} -output01 ${OUTPUT_PATH01} -output02 ${OUTPUT_PATH02} -coalesce 200
+../../${JAR} -begin_day ${begin_day} -begin_day02 ${begin_day02} -end_day ${end_day} -output01 ${OUTPUT_PATH01} -output02 ${OUTPUT_PATH02} -output03 ${OUTPUT_PATH03} -coalesce 200
 if [[ $? -ne 0 ]]; then
......
@@ -17,6 +17,8 @@ class BtopTiktokrv extends CommonSparkJob with Serializable {
 options.addOption("coalesce", true, "[must] coalesce")
 options.addOption("output01", true, "[must] output01")
 options.addOption("output02", true, "[must] output02")
+options.addOption("output03", true, "[must] output03")
+options.addOption("output04", true, "[must] output04")
 options.addOption("begin_day", true, "[must] begin_day")
 options.addOption("begin_day02", true, "[must] begin_day02")
 options.addOption("end_day", true, "[must] end_day")
@@ -33,6 +35,8 @@ class BtopTiktokrv extends CommonSparkJob with Serializable {
 val coalesce = commandLine.getOptionValue("coalesce")
 val output01 = commandLine.getOptionValue("output01")
 val output02 = commandLine.getOptionValue("output02")
+val output03 = commandLine.getOptionValue("output03")
+val output04 = commandLine.getOptionValue("output04")
 val begin_day = commandLine.getOptionValue("begin_day")
 val begin_day02 = commandLine.getOptionValue("begin_day02")
 val end_day = commandLine.getOptionValue("end_day")
@@ -52,6 +56,8 @@ class BtopTiktokrv extends CommonSparkJob with Serializable {
 FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output01), true)
 FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output02), true)
+FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output03), true)
+FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output04), true)
 try {
 // val sql1=
@@ -137,6 +143,9 @@ class BtopTiktokrv extends CommonSparkJob with Serializable {
 imei_30_Days_high_df.union(oaid_high_df).coalesce(coalesce.toInt).write.format("text").mode("overwrite").save(output01)
 imei_30_Days_high_df_with_country.union(oaid_high_df_with_country).coalesce(coalesce.toInt).write.format("text").mode("overwrite").save(output02)
+df03.filter(df03.col("num")>imei_30_Days_Num).select("imei").coalesce(coalesce.toInt).write.format("text").mode("overwrite").save(output03)
+df04.filter(df04.col("num")>oaidNum).select("oaid").coalesce(coalesce.toInt).write.format("text").mode("overwrite").save(output04)
 } finally {
 spark.stop()
 }
......
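Note: df03/df04 and the thresholds imei_30_Days_Num and oaidNum referenced by the new write lines are not defined in the hunks shown here; presumably they are built earlier in this class, outside the visible context.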
@@ -20,6 +20,7 @@ class BtopTiktokrvGaid extends CommonSparkJob with Serializable {
 options.addOption("coalesce", true, "[must] coalesce")
 options.addOption("output01", true, "[must] output01")
 options.addOption("output02", true, "[must] output02")
+options.addOption("output03", true, "[must] output03")
 options.addOption("begin_day", true, "[must] begin_day")
 options.addOption("begin_day02", true, "[must] begin_day02")
 options.addOption("end_day", true, "[must] end_day")
@@ -36,6 +37,7 @@ class BtopTiktokrvGaid extends CommonSparkJob with Serializable {
 val coalesce = commandLine.getOptionValue("coalesce")
 val output01 = commandLine.getOptionValue("output01")
 val output02 = commandLine.getOptionValue("output02")
+val output03 = commandLine.getOptionValue("output03")
 val begin_day = commandLine.getOptionValue("begin_day")
 val begin_day02 = commandLine.getOptionValue("begin_day02")
 val end_day = commandLine.getOptionValue("end_day")
@@ -55,6 +57,7 @@ class BtopTiktokrvGaid extends CommonSparkJob with Serializable {
 FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output01), true)
 FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output02), true)
+FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output03), true)
 try {
 val sql5=
@@ -77,6 +80,8 @@ class BtopTiktokrvGaid extends CommonSparkJob with Serializable {
 gaid_high_df.coalesce(coalesce.toInt).write.format("text").mode("overwrite").save(output01)
 gaid_high_df_with_country.coalesce(coalesce.toInt).write.format("text").mode("overwrite").save(output02)
+df05.filter(df05.col("num")>gaidNum).select("gaid").coalesce(coalesce.toInt).write.format("text").mode("overwrite").save(output03)
 } finally {
 spark.stop()
 }
......
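For context, here is a minimal, self-contained sketch of the filter-and-write pattern the new output03/output04 lines use. The table, column values, threshold, and output path below (deviceStats, minNum, /tmp/rtdmp_sketch/imei) are made up for illustration and are not taken from the repository; the real jobs run on YARN against Hive-backed DataFrames.

```scala
import org.apache.spark.sql.SparkSession

object RtdmpExportSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("RtdmpExportSketch")
      .master("local[*]") // local master only for this sketch; the real jobs use --master yarn
      .getOrCreate()
    import spark.implicits._

    // Toy stand-in for df03/df04/df05: one row per device id with an activity count.
    val deviceStats = Seq(("imei-001", 35L), ("imei-002", 12L), ("imei-003", 40L))
      .toDF("imei", "num")

    val minNum = 30L                          // stand-in for thresholds like imei_30_Days_Num / oaidNum / gaidNum
    val outputPath = "/tmp/rtdmp_sketch/imei" // stand-in for the -output03 rtdmp path

    // Keep only ids above the threshold and dump them as plain text, one id per line,
    // mirroring df03.filter(col("num") > threshold).select("imei").write.format("text").save(...)
    deviceStats
      .filter($"num" > minNum)
      .select("imei")
      .coalesce(1)
      .write.format("text").mode("overwrite").save(outputPath)

    spark.stop()
  }
}
```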