Commit c08cf707 by fan.jiang

Extract the audience-package synchronization computation into its own standalone job to reduce compute resource usage

parent d9b73faa
type=command
command=sh -x adx_packagename_synchronize.sh
\ No newline at end of file
#!/usr/bin/env bash
source ../../dmp_env.sh
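# dt_today / dt_day: the data date (one day before ScheduleTime), in yyyymmdd form;
# dt_slash_today: the same date in yyyy/mm/dd form;
# dt_slash_day: ScheduleTime itself in yyyy/mm/dd form, used for the TO success flag below.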
dt_today=$(date -d "$ScheduleTime 1 days ago" +"%Y%m%d")
dt_slash_today=$(date -d "$ScheduleTime 1 days ago" +"%Y/%m/%d")
dt_day=$(date -d "$ScheduleTime 1 days ago" +"%Y%m%d")
dt_slash_day=$(date -d "$ScheduleTime" +"%Y/%m/%d")
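# Block until the upstream install-list partitions are complete before reading them.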
check_await "${DM_INSTALL_LIST}_v2/${dt_slash_today}/dsp_req/_SUCCESS"
check_await "${DM_INSTALL_LIST}_v2/${dt_slash_today}/btop/_SUCCESS"
check_await "${DM_INSTALL_LIST}_v2/${dt_slash_day}/TO/_SUCCESS"
OUTPUT_PATH1="${RTDMP_TMP_PACKAGE_NAME_PATH}/adx_packagename_synchronize/${dt_slash_today}/"
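# Remove any previous output for this date before the Spark job rewrites it.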
hadoop fs -rm -r "${OUTPUT_PATH1}"
spark-submit --class mobvista.dmp.datasource.dm.AdxPackagenameSynchronize \
--conf spark.yarn.executor.memoryOverhead=2048 \
--conf spark.default.parallelism=3000 \
--conf spark.sql.shuffle.partitions=3000 \
--conf spark.driver.maxResultSize=4g \
--conf spark.network.timeout=720s \
--files ${HIVE_SITE_PATH} \
--master yarn --deploy-mode cluster --executor-memory 8g --driver-memory 6g --executor-cores 6 --num-executors 50 \
../../${JAR} -dt_today ${dt_today} -output1 ${OUTPUT_PATH1} \
-dt_day ${dt_day} \
-coalesce 300
if [[ $? -ne 0 ]]; then
exit 255
fi
: '
Job notes
Write the device lists for the package names below to the S3 path and hand them to @梁晓鹏 Leo Liang and @李林梦 Linmeng Li for ingestion into RTDMP. There should be 14 audience packages in total. Where an audience name below is followed by a name in parentheses, the parenthesized name may be used as the ingested audience name (it is the easier one to read);
the package name actually stored in DMP is the one outside the parentheses, so the code filters devices by the name outside the parentheses.
Under business=dsp_req:
id3876827262021090301(id387682726_bes)
id3332062892021090301(id333206289_bes)
id13403763232021090301(id1340376323_bes)
id10442830592021090301(id1044283059_bes)
com.xunmeng.pinduoduo_bes
com.ss.android.ugc.aweme_iqiyi
id11421108952021090302(id1142110895_iqiyi)
com.xunmeng.pinduoduo_oppoziyou
com.xunmeng.pinduoduo_oppoziyou_notinstall
com.xunmeng.pinduoduo_oppoziyou_hist_notinstall
com.xunmeng.pinduoduo_oppolianmeng
com.xunmeng.pinduoduo_oppolianmeng_hist1year_notinstall
com.xunmeng.pinduoduo_oppolianmeng_histhalfyear_notinstall
Under business=btop & TO:
com.xunmeng.pinduoduo(com.xunmeng.pinduoduo_btop)
'
\ No newline at end of file
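The note above distinguishes the bracketed RTDMP ingestion names from the package names actually stored in DMP. A minimal, self-contained sketch of the normalization the job applies (the object name and sample values are illustrative, not from the repo): purely numeric iOS app ids get an `id` prefix so each output directory matches the audience name.

```scala
// Sketch only: mirrors the normalization in AdxPackagenameSynchronize.
object PackageNameNormalizationSketch {
  // A purely numeric package name is an iOS app id stored without its "id"
  // prefix; add the prefix so outputs line up with the RTDMP audience names.
  def normalize(packageName: String): String =
    if (packageName.matches("^\\d+$")) "id" + packageName else packageName

  def main(args: Array[String]): Unit = {
    Seq(
      "3876827262021090301",       // -> id3876827262021090301
      "id3876827262021090301",     // already prefixed, unchanged
      "com.xunmeng.pinduoduo_bes"  // Android-style name, unchanged
    ).foreach(p => println(s"$p -> ${normalize(p)}"))
  }
}
```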
@@ -3,44 +3,12 @@ source ../../dmp_env.sh
dt_today=$(date -d "$ScheduleTime 1 days ago" +"%Y%m%d")
dt_slash_today=$(date -d "$ScheduleTime 1 days ago" +"%Y/%m/%d")
dt_day=$(date -d "$ScheduleTime 1 days ago" +"%Y%m%d")
dt_slash_day=$(date -d "$ScheduleTime" +"%Y/%m/%d")
check_await "${DM_INSTALL_LIST}_v2/${dt_slash_today}/dsp_req/_SUCCESS"
check_await "${DM_INSTALL_LIST}_v2/${dt_slash_today}/btop/_SUCCESS"
check_await "${DM_INSTALL_LIST}_v2/${dt_slash_day}/TO/_SUCCESS"
OUTPUT_PATH1="${RTDMP_TMP_PACKAGE_NAME_PATH}/id1142110895/${dt_slash_today}/"
OUTPUT_PATH2="${RTDMP_TMP_PACKAGE_NAME_PATH}/id3876827262021090301/${dt_slash_today}/"
OUTPUT_PATH3="${RTDMP_TMP_PACKAGE_NAME_PATH}/id3332062892021090301/${dt_slash_today}/"
OUTPUT_PATH4="${RTDMP_TMP_PACKAGE_NAME_PATH}/id13403763232021090301/${dt_slash_today}/"
OUTPUT_PATH5="${RTDMP_TMP_PACKAGE_NAME_PATH}/id10442830592021090301/${dt_slash_today}/"
OUTPUT_PATH6="${RTDMP_TMP_PACKAGE_NAME_PATH}/com_xunmeng_pinduoduo_bes/${dt_slash_today}/"
OUTPUT_PATH7="${RTDMP_TMP_PACKAGE_NAME_PATH}/com_ss_android_ugc_aweme_iqiyi/${dt_slash_today}/"
OUTPUT_PATH8="${RTDMP_TMP_PACKAGE_NAME_PATH}/id11421108952021090302/${dt_slash_today}/"
OUTPUT_PATH9="${RTDMP_TMP_PACKAGE_NAME_PATH}/com_xunmeng_pinduoduo_oppoziyou/${dt_slash_today}/"
OUTPUT_PATH10="${RTDMP_TMP_PACKAGE_NAME_PATH}/com_xunmeng_pinduoduo_oppoziyou_notinstall/${dt_slash_today}/"
OUTPUT_PATH11="${RTDMP_TMP_PACKAGE_NAME_PATH}/com_xunmeng_pinduoduo_oppoziyou_hist_notinstall/${dt_slash_today}/"
OUTPUT_PATH12="${RTDMP_TMP_PACKAGE_NAME_PATH}/com_xunmeng_pinduoduo_oppolianmeng/${dt_slash_today}/"
OUTPUT_PATH13="${RTDMP_TMP_PACKAGE_NAME_PATH}/com_xunmeng_pinduoduo_oppolianmeng_hist1year_notinstall/${dt_slash_today}/"
OUTPUT_PATH14="${RTDMP_TMP_PACKAGE_NAME_PATH}/com_xunmeng_pinduoduo_oppolianmeng_histhalfyear_notinstall/${dt_slash_today}/"
OUTPUT_PATH15="${RTDMP_TMP_PACKAGE_NAME_PATH}/com_xunmeng_pinduoduo/${dt_slash_today}/"
hadoop fs -rm -r "${OUTPUT_PATH1}"
hadoop fs -rm -r "${OUTPUT_PATH2}"
hadoop fs -rm -r "${OUTPUT_PATH3}"
hadoop fs -rm -r "${OUTPUT_PATH4}"
hadoop fs -rm -r "${OUTPUT_PATH5}"
hadoop fs -rm -r "${OUTPUT_PATH6}"
hadoop fs -rm -r "${OUTPUT_PATH7}"
hadoop fs -rm -r "${OUTPUT_PATH8}"
hadoop fs -rm -r "${OUTPUT_PATH9}"
hadoop fs -rm -r "${OUTPUT_PATH10}"
hadoop fs -rm -r "${OUTPUT_PATH11}"
hadoop fs -rm -r "${OUTPUT_PATH12}"
hadoop fs -rm -r "${OUTPUT_PATH13}"
hadoop fs -rm -r "${OUTPUT_PATH14}"
hadoop fs -rm -r "${OUTPUT_PATH15}"
spark-submit --class mobvista.dmp.datasource.dm.RtdmpTmpId1142110895 \
@@ -50,14 +18,9 @@ spark-submit --class mobvista.dmp.datasource.dm.RtdmpTmpId1142110895 \
--conf spark.driver.maxResultSize=4g \
--conf spark.network.timeout=720s \
--files ${HIVE_SITE_PATH} \
--master yarn --deploy-mode cluster --executor-memory 8g --driver-memory 6g --executor-cores 6 --num-executors 130 \
--master yarn --deploy-mode cluster --executor-memory 8g --driver-memory 6g --executor-cores 6 --num-executors 70 \
../../${JAR} -dt_today ${dt_today} -output1 ${OUTPUT_PATH1} \
-dt_day ${dt_day} \
-output2 ${OUTPUT_PATH2} -output3 ${OUTPUT_PATH3} -output4 ${OUTPUT_PATH4} -output5 ${OUTPUT_PATH5} \
-output6 ${OUTPUT_PATH6} -output7 ${OUTPUT_PATH7} -output8 ${OUTPUT_PATH8} -output9 ${OUTPUT_PATH9} \
-output10 ${OUTPUT_PATH10} -output11 ${OUTPUT_PATH11} -output12 ${OUTPUT_PATH12} -output13 ${OUTPUT_PATH13} \
-output14 ${OUTPUT_PATH14} -output15 ${OUTPUT_PATH15} \
-coalesce 780
-coalesce 420
if [[ $? -ne 0 ]]; then
......
package mobvista.dmp.datasource.dm
import mobvista.dmp.common.CommonSparkJob
import mobvista.dmp.format.{RDDMultipleOutputFormat, TextMultipleOutputFormat}
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.compress.{CompressionCodec, GzipCodec}
import org.apache.spark.sql.SparkSession
import java.net.URI
/**
* @author jiangfan
* @date 2021/9/10 14:27
*/
class AdxPackagenameSynchronize extends CommonSparkJob with Serializable {
override protected def buildOptions(): Options = {
val options = new Options
options.addOption("coalesce", true, "[must] coalesce")
options.addOption("output1", true, "[must] output1")
options.addOption("dt_today", true, "[must] dt_today")
options.addOption("dt_day", true, "[must] dt_day")
options
}
override protected def run(args: Array[String]): Int = {
val commandLine = commParser.parse(options, args)
if (!checkMustOption(commandLine)) {
printUsage(options)
return -1
} else printOptions(commandLine)
val coalesce = commandLine.getOptionValue("coalesce")
val output1 = commandLine.getOptionValue("output1")
val dt_today = commandLine.getOptionValue("dt_today")
val dt_day = commandLine.getOptionValue("dt_day")
val spark = SparkSession.builder()
.appName("AdxPackagenameSynchronize")
.config("spark.rdd.compress", "true")
.config("spark.io.compression.codec", "snappy")
.config("spark.sql.orc.filterPushdown", "true")
.config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.enableHiveSupport()
.getOrCreate()
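// Emit gzip-compressed block output; both the legacy (mapreduce.output.*) and
// current (mapreduce.output.fileoutputformat.*) property names are set.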
val conf = spark.sparkContext.hadoopConfiguration
conf.set("mapreduce.output.compress", "true")
conf.set("mapreduce.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec")
conf.setBoolean("mapreduce.output.fileoutputformat.compress", true)
conf.set("mapreduce.output.fileoutputformat.compress.type", CompressionType.BLOCK.toString)
conf.setClass("mapreduce.output.fileoutputformat.compress.codec", classOf[GzipCodec], classOf[CompressionCodec])
FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output1), true)
try {
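// One query covers all 14 audiences: thirteen dsp_req package names on dt_today,
// unioned with com.xunmeng.pinduoduo from btop (dt_today) and TO (dt_day).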
val sql1 =
s"""
|select package_name, device_type, device_id from dwh.dm_install_list_v2 where dt='${dt_today}' and business='dsp_req'
| and package_name in ('3876827262021090301','id3876827262021090301',
| '3332062892021090301','id3332062892021090301',
| '13403763232021090301','id13403763232021090301',
| '10442830592021090301','id10442830592021090301',
| 'com.xunmeng.pinduoduo_bes',
| 'com.ss.android.ugc.aweme_iqiyi',
| '11421108952021090302','id11421108952021090302',
| 'com.xunmeng.pinduoduo_oppoziyou',
| 'com.xunmeng.pinduoduo_oppoziyou_notinstall',
| 'com.xunmeng.pinduoduo_oppoziyou_hist_notinstall',
| 'com.xunmeng.pinduoduo_oppolianmeng',
| 'com.xunmeng.pinduoduo_oppolianmeng_hist1year_notinstall',
| 'com.xunmeng.pinduoduo_oppolianmeng_histhalfyear_notinstall'
| )
| and device_type in ('imei','gaid','oaid','idfa','imeimd5','gaidmd5','oaidmd5','idfamd5')
|union
|select package_name, device_type, device_id from dwh.dm_install_list_v2 where ( (dt='${dt_today}' and business in ('btop')) or (dt='${dt_day}' and business in ('TO')) )
| and package_name in ('com.xunmeng.pinduoduo')
| and device_type in ('imei','gaid','oaid','idfa','imeimd5','gaidmd5','oaidmd5','idfamd5')
""".stripMargin
spark.sql(sql1).rdd.map(r => {
  val device_id = r.getAs[String]("device_id")
  val device_type = r.getAs[String]("device_type")
  var package_name = r.getAs[String]("package_name")
  // Purely numeric iOS app ids may be stored without the "id" prefix;
  // normalize so the output directory matches the audience name.
  if (package_name.matches("^\\d+$"))
    package_name = "id" + package_name
  (device_id, device_type, package_name)
})
  .coalesce(coalesce.toInt).map(t => {
    // The key encodes <output1>/<package_name>/<device_type>; the multiple-output
    // format splits records into one directory per audience and device type.
    (new Text(s"${output1}/${t._3}/${t._2}"), new Text(t._1))
  }).saveAsNewAPIHadoopFile(output1, classOf[Text], classOf[Text], classOf[RDDMultipleOutputFormat[_, _]], conf)
} finally {
spark.stop()
}
0
}
}
object AdxPackagenameSynchronize {
def main(args: Array[String]): Unit = {
new AdxPackagenameSynchronize().run(args)
}
}
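For reference, a hedged sketch of how the keyed records above fan out into per-audience directories. The write behavior of the project-internal RDDMultipleOutputFormat is assumed here (each value written under the directory named by its key), and the path and device values are illustrative.

```scala
// Sketch only: illustrates the output layout implied by the key format above.
object OutputLayoutSketch {
  def main(args: Array[String]): Unit = {
    val output1 = "s3://bucket/rtdmp_tmp/adx_packagename_synchronize/2021/09/10" // illustrative
    // (device_id, device_type, package_name) after normalization
    val records = Seq(
      ("device-a", "gaid", "com.xunmeng.pinduoduo_bes"),
      ("device-b", "idfa", "id11421108952021090302")
    )
    records.foreach { case (id, typ, pkg) =>
      // Each device id lands under <output1>/<package_name>/<device_type>/
      println(s"$output1/$pkg/$typ <- $id")
    }
  }
}
```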
@@ -21,22 +21,7 @@ class RtdmpTmpId1142110895 extends CommonSparkJob with Serializable {
val options = new Options
options.addOption("coalesce", true, "[must] coalesce")
options.addOption("output1", true, "[must] output1")
options.addOption("output2", true, "[must] output2")
options.addOption("output3", true, "[must] output3")
options.addOption("output4", true, "[must] output4")
options.addOption("output5", true, "[must] output5")
options.addOption("output6", true, "[must] output6")
options.addOption("output7", true, "[must] output7")
options.addOption("output8", true, "[must] output8")
options.addOption("output9", true, "[must] output9")
options.addOption("output10", true, "[must] output10")
options.addOption("output11", true, "[must] output11")
options.addOption("output12", true, "[must] output12")
options.addOption("output13", true, "[must] output13")
options.addOption("output14", true, "[must] output14")
options.addOption("output15", true, "[must] output15")
options.addOption("dt_today", true, "[must] dt_today")
options.addOption("dt_day", true, "[must] dt_day")
options
}
@@ -49,22 +34,7 @@ class RtdmpTmpId1142110895 extends CommonSparkJob with Serializable {
val coalesce = commandLine.getOptionValue("coalesce")
val output1 = commandLine.getOptionValue("output1")
val output2 = commandLine.getOptionValue("output2")
val output3 = commandLine.getOptionValue("output3")
val output4 = commandLine.getOptionValue("output4")
val output5 = commandLine.getOptionValue("output5")
val output6 = commandLine.getOptionValue("output6")
val output7 = commandLine.getOptionValue("output7")
val output8 = commandLine.getOptionValue("output8")
val output9 = commandLine.getOptionValue("output9")
val output10 = commandLine.getOptionValue("output10")
val output11 = commandLine.getOptionValue("output11")
val output12 = commandLine.getOptionValue("output12")
val output13 = commandLine.getOptionValue("output13")
val output14 = commandLine.getOptionValue("output14")
val output15 = commandLine.getOptionValue("output15")
val dt_today = commandLine.getOptionValue("dt_today")
val dt_day = commandLine.getOptionValue("dt_day")
val spark = SparkSession.builder()
.appName("RtdmpTmpId1142110895")
@@ -80,142 +50,16 @@ class RtdmpTmpId1142110895 extends CommonSparkJob with Serializable {
import spark.implicits._
FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output1), true)
FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output2), true)
FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output3), true)
FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output4), true)
FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output5), true)
FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output6), true)
FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output7), true)
FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output8), true)
FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output9), true)
FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output10), true)
FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output11), true)
FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output12), true)
FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output13), true)
FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output14), true)
FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output15), true)
try {
val sql1=
s"""
|select device_id, device_type from dwh.dm_install_list_v2 where dt='${dt_today}' and package_name in ('1142110895','id1142110895') and device_type not in ('androidid','android_id','ruid','upid')
|select device_id, device_type from dwh.dm_install_list_v2 where dt='${dt_today}' and package_name in ('1142110895','id1142110895') and device_type not in ('androidid','android_id','ruid')
""".stripMargin
spark.sql(sql1).rdd.flatMap(buildRes(_)).coalesce(coalesce.toInt)
.saveAsNewAPIHadoopFile(s"${output1}", classOf[Text], classOf[Text], classOf[TextMultipleOutputFormat], spark.sparkContext.hadoopConfiguration)
val sql2=
s"""
|select device_id, device_type from dwh.dm_install_list_v2 where dt='${dt_today}' and package_name in ('3876827262021090301','id3876827262021090301') and business='dsp_req' and device_type not in ('androidid','android_id','ruid','upid')
""".stripMargin
spark.sql(sql2).rdd.flatMap(buildRes(_)).coalesce(coalesce.toInt)
.saveAsNewAPIHadoopFile(s"${output2}", classOf[Text], classOf[Text], classOf[TextMultipleOutputFormat], spark.sparkContext.hadoopConfiguration)
val sql3=
s"""
|select device_id, device_type from dwh.dm_install_list_v2 where dt='${dt_today}' and package_name in ('3332062892021090301','id3332062892021090301') and business='dsp_req' and device_type not in ('androidid','android_id','ruid','upid')
""".stripMargin
spark.sql(sql3).rdd.flatMap(buildRes(_)).coalesce(coalesce.toInt)
.saveAsNewAPIHadoopFile(s"${output3}", classOf[Text], classOf[Text], classOf[TextMultipleOutputFormat], spark.sparkContext.hadoopConfiguration)
val sql4=
s"""
|select device_id, device_type from dwh.dm_install_list_v2 where dt='${dt_today}' and package_name in ('13403763232021090301','id13403763232021090301') and business='dsp_req' and device_type not in ('androidid','android_id','ruid','upid')
""".stripMargin
spark.sql(sql4).rdd.flatMap(buildRes(_)).coalesce(coalesce.toInt)
.saveAsNewAPIHadoopFile(s"${output4}", classOf[Text], classOf[Text], classOf[TextMultipleOutputFormat], spark.sparkContext.hadoopConfiguration)
val sql5=
s"""
|select device_id, device_type from dwh.dm_install_list_v2 where dt='${dt_today}' and package_name in ('10442830592021090301','id10442830592021090301') and business='dsp_req' and device_type not in ('androidid','android_id','ruid','upid')
""".stripMargin
spark.sql(sql5).rdd.flatMap(buildRes(_)).coalesce(coalesce.toInt)
.saveAsNewAPIHadoopFile(s"${output5}", classOf[Text], classOf[Text], classOf[TextMultipleOutputFormat], spark.sparkContext.hadoopConfiguration)
val sql6=
s"""
|select device_id, device_type from dwh.dm_install_list_v2 where dt='${dt_today}' and package_name in ('com.xunmeng.pinduoduo_bes') and business='dsp_req' and device_type not in ('androidid','android_id','ruid','upid')
""".stripMargin
spark.sql(sql6).rdd.flatMap(buildRes(_)).coalesce(coalesce.toInt)
.saveAsNewAPIHadoopFile(s"${output6}", classOf[Text], classOf[Text], classOf[TextMultipleOutputFormat], spark.sparkContext.hadoopConfiguration)
val sql7=
s"""
|select device_id, device_type from dwh.dm_install_list_v2 where dt='${dt_today}' and package_name in ('com.ss.android.ugc.aweme_iqiyi') and business='dsp_req' and device_type not in ('androidid','android_id','ruid','upid')
""".stripMargin
spark.sql(sql7).rdd.flatMap(buildRes(_)).coalesce(coalesce.toInt)
.saveAsNewAPIHadoopFile(s"${output7}", classOf[Text], classOf[Text], classOf[TextMultipleOutputFormat], spark.sparkContext.hadoopConfiguration)
val sql8=
s"""
|select device_id, device_type from dwh.dm_install_list_v2 where dt='${dt_today}' and package_name in ('11421108952021090302','id11421108952021090302') and business='dsp_req' and device_type not in ('androidid','android_id','ruid','upid')
""".stripMargin
spark.sql(sql8).rdd.flatMap(buildRes(_)).coalesce(coalesce.toInt)
.saveAsNewAPIHadoopFile(s"${output8}", classOf[Text], classOf[Text], classOf[TextMultipleOutputFormat], spark.sparkContext.hadoopConfiguration)
val sql9=
s"""
|select device_id, device_type from dwh.dm_install_list_v2 where dt='${dt_today}' and package_name in ('com.xunmeng.pinduoduo_oppoziyou') and business='dsp_req' and device_type not in ('androidid','android_id','ruid','upid')
""".stripMargin
spark.sql(sql9).rdd.flatMap(buildRes(_)).coalesce(coalesce.toInt)
.saveAsNewAPIHadoopFile(s"${output9}", classOf[Text], classOf[Text], classOf[TextMultipleOutputFormat], spark.sparkContext.hadoopConfiguration)
val sql10=
s"""
|select device_id, device_type from dwh.dm_install_list_v2 where dt='${dt_today}' and package_name in ('com.xunmeng.pinduoduo_oppoziyou_notinstall') and business='dsp_req' and device_type not in ('androidid','android_id','ruid','upid')
""".stripMargin
spark.sql(sql10).rdd.flatMap(buildRes(_)).coalesce(coalesce.toInt)
.saveAsNewAPIHadoopFile(s"${output10}", classOf[Text], classOf[Text], classOf[TextMultipleOutputFormat], spark.sparkContext.hadoopConfiguration)
val sql11=
s"""
|select device_id, device_type from dwh.dm_install_list_v2 where dt='${dt_today}' and package_name in ('com.xunmeng.pinduoduo_oppoziyou_hist_notinstall') and business='dsp_req' and device_type not in ('androidid','android_id','ruid','upid')
""".stripMargin
spark.sql(sql11).rdd.flatMap(buildRes(_)).coalesce(coalesce.toInt)
.saveAsNewAPIHadoopFile(s"${output11}", classOf[Text], classOf[Text], classOf[TextMultipleOutputFormat], spark.sparkContext.hadoopConfiguration)
val sql12=
s"""
|select device_id, device_type from dwh.dm_install_list_v2 where dt='${dt_today}' and package_name in ('com.xunmeng.pinduoduo_oppolianmeng') and business='dsp_req' and device_type not in ('androidid','android_id','ruid','upid')
""".stripMargin
spark.sql(sql12).rdd.flatMap(buildRes(_)).coalesce(coalesce.toInt)
.saveAsNewAPIHadoopFile(s"${output12}", classOf[Text], classOf[Text], classOf[TextMultipleOutputFormat], spark.sparkContext.hadoopConfiguration)
val sql13=
s"""
|select device_id, device_type from dwh.dm_install_list_v2 where dt='${dt_today}' and package_name in ('com.xunmeng.pinduoduo_oppolianmeng_hist1year_notinstall') and business='dsp_req' and device_type not in ('androidid','android_id','ruid','upid')
""".stripMargin
spark.sql(sql13).rdd.flatMap(buildRes(_)).coalesce(coalesce.toInt)
.saveAsNewAPIHadoopFile(s"${output13}", classOf[Text], classOf[Text], classOf[TextMultipleOutputFormat], spark.sparkContext.hadoopConfiguration)
val sql14=
s"""
|select device_id, device_type from dwh.dm_install_list_v2 where dt='${dt_today}' and package_name in ('com.xunmeng.pinduoduo_oppolianmeng_histhalfyear_notinstall') and business='dsp_req' and device_type not in ('androidid','android_id','ruid','upid')
""".stripMargin
spark.sql(sql14).rdd.flatMap(buildRes(_)).coalesce(coalesce.toInt)
.saveAsNewAPIHadoopFile(s"${output14}", classOf[Text], classOf[Text], classOf[TextMultipleOutputFormat], spark.sparkContext.hadoopConfiguration)
val sql15=
s"""
|select device_id, device_type from dwh.dm_install_list_v2 where ( (dt='${dt_today}' and business in ('btop')) or (dt='${dt_day}' and business in ('TO')) ) and package_name in ('com.xunmeng.pinduoduo') and device_type not in ('androidid','android_id','ruid','upid')
""".stripMargin
spark.sql(sql15).rdd.flatMap(buildRes(_)).coalesce(coalesce.toInt)
.saveAsNewAPIHadoopFile(s"${output15}", classOf[Text], classOf[Text], classOf[TextMultipleOutputFormat], spark.sparkContext.hadoopConfiguration)
} finally {
spark.stop()
}
......