Commit 3c38857b by fan.jiang

insert com.eg.android.AlipayGphone_reyun,com.taobao.litetao_reyun,com.ss.android.ugc.aweme_reyun,com.taobao.litetao_btop,com.ss.android.ugc.aweme_btop to dmp
parent f671611c
@@ -6,26 +6,29 @@
 source ../../dmp_env.sh
 dt_today=$(date -d "$ScheduleTime 1 days ago" +"%Y%m%d")
-dt_dash_today=$(date -d "$ScheduleTime 1 days ago" +"%Y/%m/%d")
+dt_slash_today=$(date -d "$ScheduleTime 1 days ago" +"%Y/%m/%d")
+update=$(date -d "$ScheduleTime 15 days ago" +"%Y-%m-%d")
+
+check_await "${TMP_EGGPLANTS_OUTPUT_PATH}/${dt_slash_today}"
+
+hadoop fs -test -e "${ODS_OTHER_DEVICE_DAILY}/${dt_slash_today}"
+if [ $? -ne 0 ];then
+  hadoop fs -mkdir -p "${ODS_OTHER_DEVICE_DAILY}/${dt_slash_today}"
+fi
 
 business1="reyun"
-INPUT_PATH1="${DM_INSTALL_LIST}_v2/${dt_dash_today}/${business1}"
+INPUT_PATH1="${DM_INSTALL_LIST}_v2/${dt_slash_today}/${business1}"
 check_await ${INPUT_PATH1}/_SUCCESS
 
 business2="btop"
-INPUT_PATH2="${DM_INSTALL_LIST}_v2/${dt_dash_today}/${business2}"
+INPUT_PATH2="${DM_INSTALL_LIST}_v2/${dt_slash_today}/${business2}"
 check_await ${INPUT_PATH2}/_SUCCESS
 
-OUTPUT_PATH1="${RTDMP_COM_EG_ANDROID_ALIPAYGPHONE_REYUN_PATH}/${dt_dash_today}/"
-OUTPUT_PATH2="${RTDMP_COM_TAOBAO_LITETAO_REYUN_PATH}/${dt_dash_today}/"
-OUTPUT_PATH3="${RTDMP_COM_SS_ANDROID_UGC_AWEME_REYUN_PATH}/${dt_dash_today}/"
-OUTPUT_PATH4="${RTDMP_COM_TAOBAO_LITETAO_BTOP_PATH}/${dt_dash_today}/"
-OUTPUT_PATH5="${RTDMP_COM_SS_ANDROID_UGC_AWEME_BTOP_PATH}/${dt_dash_today}/"
+OUTPUT_PATH1="${RTDMP_TMP_PACKAGE_NAME_PATH}/btop_reyun_to_other/${dt_slash_today}/01"
+OUTPUT_PATH2="${RTDMP_TMP_PACKAGE_NAME_PATH}/btop_reyun_to_other/${dt_slash_today}/02"
 
 hadoop fs -rm -r "${OUTPUT_PATH1}"
 hadoop fs -rm -r "${OUTPUT_PATH2}"
-hadoop fs -rm -r "${OUTPUT_PATH3}"
-hadoop fs -rm -r "${OUTPUT_PATH4}"
-hadoop fs -rm -r "${OUTPUT_PATH5}"
 
 spark-submit --class mobvista.dmp.datasource.dm.ComEgAndroidAlipayGphoneReyun \
 --conf spark.yarn.executor.memoryOverhead=2048 \
@@ -34,11 +37,25 @@ spark-submit --class mobvista.dmp.datasource.dm.ComEgAndroidAlipayGphoneReyun \
 --conf spark.driver.maxResultSize=4g \
 --conf spark.network.timeout=720s \
 --files ${HIVE_SITE_PATH} \
---master yarn --deploy-mode cluster --executor-memory 8g --driver-memory 6g --executor-cores 6 --num-executors 20 \
-../../${JAR} -dt_today ${dt_today} -output1 ${OUTPUT_PATH1} -output2 ${OUTPUT_PATH2} -output3 ${OUTPUT_PATH3} -output4 ${OUTPUT_PATH4} -output5 ${OUTPUT_PATH5} \
+--master yarn --deploy-mode cluster --executor-memory 8g --driver-memory 6g --executor-cores 6 --num-executors 10 \
+../../${JAR} -dt_today ${dt_today} -output1 ${OUTPUT_PATH1} -output2 ${OUTPUT_PATH2} -update ${update} \
 -coalesce 200
 
 if [[ $? -ne 0 ]]; then
   exit 255
 fi
+
+hadoop distcp -m20 "${OUTPUT_PATH1}/*" "${TMP_EGGPLANTS_OUTPUT_PATH}/${dt_slash_today}/"
+hadoop distcp -m20 "${OUTPUT_PATH2}/*" "${ODS_OTHER_DEVICE_DAILY}/${dt_slash_today}/"
+
+: '
+Business note:
+Load the devices under these five pseudo package names
+com.eg.android.AlipayGphone_reyun
+com.taobao.litetao_reyun
+com.ss.android.ugc.aweme_reyun
+com.taobao.litetao_btop
+com.ss.android.ugc.aweme_btop
+into the business=other partition of dmp at daily granularity.
+'
\ No newline at end of file
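
Note on the two feeds above: the following is a minimal local sketch, not part of the commit, that reproduces the two tab-separated row layouts the job writes (the device row and the output column names mirror the job; the sample values are hypothetical).

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{concat_ws, lit}

// Sketch only: show both output row layouts on one fake device row.
object RowFormatSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("RowFormatSketch").getOrCreate()
    import spark.implicits._
    // Hypothetical row mirroring the columns selected from dwh.dm_install_list_v2.
    val df = Seq(("861234567890123", "imei", "[\"com.eg.android.AlipayGphone_reyun\"]"))
      .toDF("device_id", "device_type", "package_name")
    // Feed 1 (TMP_EGGPLANTS_OUTPUT_PATH): device_id <TAB> device_type <TAB> android <TAB> ["<package>_<business>"]
    df.select(concat_ws("\t", df.col("device_id"), df.col("device_type"), lit("android"), df.col("package_name"))).show(false)
    // Feed 2 (ODS_OTHER_DEVICE_DAILY): device_id <TAB> device_type <TAB> android <TAB> CN
    df.select(concat_ws("\t", df.col("device_id"), df.col("device_type"), lit("android"), lit("CN"))).show(false)
    spark.stop()
  }
}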
@@ -5,7 +5,9 @@ import mobvista.dmp.format.TextMultipleOutputFormat
 import org.apache.commons.cli.Options
 import org.apache.commons.lang.StringUtils
 import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.io.SequenceFile.CompressionType
 import org.apache.hadoop.io.Text
+import org.apache.hadoop.io.compress.{CompressionCodec, GzipCodec}
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.sql.{DataFrame, Row, SparkSession}
 import org.apache.spark.sql.functions.{concat_ws, lit}
@@ -25,10 +27,8 @@ class ComEgAndroidAlipayGphoneReyun extends CommonSparkJob with Serializable {
     options.addOption("coalesce", true, "[must] coalesce")
     options.addOption("output1", true, "[must] output1")
     options.addOption("output2", true, "[must] output2")
-    options.addOption("output3", true, "[must] output3")
-    options.addOption("output4", true, "[must] output4")
-    options.addOption("output5", true, "[must] output5")
     options.addOption("dt_today", true, "[must] dt_today")
+    options.addOption("update", true, "[must] update")
 
     options
   }
@@ -42,10 +42,8 @@ class ComEgAndroidAlipayGphoneReyun extends CommonSparkJob with Serializable {
     val coalesce = commandLine.getOptionValue("coalesce")
     val output1 = commandLine.getOptionValue("output1")
     val output2 = commandLine.getOptionValue("output2")
-    val output3 = commandLine.getOptionValue("output3")
-    val output4 = commandLine.getOptionValue("output4")
-    val output5 = commandLine.getOptionValue("output5")
     val dt_today = commandLine.getOptionValue("dt_today")
+    val update = commandLine.getOptionValue("update")
 
     val spark = SparkSession.builder()
       .appName("ComEgAndroidAlipayGphoneReyun")
...@@ -62,66 +60,49 @@ class ComEgAndroidAlipayGphoneReyun extends CommonSparkJob with Serializable { ...@@ -62,66 +60,49 @@ class ComEgAndroidAlipayGphoneReyun extends CommonSparkJob with Serializable {
FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output1), true) FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output1), true)
FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output2), true) FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output2), true)
FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output3), true)
FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output4), true)
FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output5), true)
try { try {
val sql1=
s"""
|select device_id, device_type from dwh.dm_install_list_v2 where dt='${dt_today}' and business='reyun' and package_name='com.eg.android.AlipayGphone' and device_type not in ('androidid','android_id')
""".stripMargin
spark.sql(sql1).rdd.flatMap(buildRes(_)).coalesce(coalesce.toInt) val conf = spark.sparkContext.hadoopConfiguration
.saveAsNewAPIHadoopFile(s"${output1}", classOf[Text], classOf[Text], classOf[TextMultipleOutputFormat], spark.sparkContext.hadoopConfiguration) conf.set("mapreduce.output.compress", "true")
conf.set("mapreduce.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec")
conf.setBoolean("mapreduce.output.fileoutputformat.compress", true)
conf.set("mapreduce.output.fileoutputformat.compress.type", CompressionType.BLOCK.toString)
conf.setClass("mapreduce.output.fileoutputformat.compress.codec", classOf[GzipCodec], classOf[CompressionCodec])
val sql2= val sql1=
s"""
|select device_id, device_type from dwh.dm_install_list_v2 where dt='${dt_today}' and business='reyun' and package_name='com.taobao.litetao' and device_type not in ('androidid','android_id')
""".stripMargin
spark.sql(sql2).rdd.flatMap(buildRes(_)).coalesce(coalesce.toInt)
.saveAsNewAPIHadoopFile(s"${output2}", classOf[Text], classOf[Text], classOf[TextMultipleOutputFormat], spark.sparkContext.hadoopConfiguration)
val sql3=
s"""
|select device_id, device_type from dwh.dm_install_list_v2 where dt='${dt_today}' and business='reyun' and package_name='com.ss.android.ugc.aweme' and device_type not in ('androidid','android_id')
""".stripMargin
spark.sql(sql3).rdd.flatMap(buildRes(_)).coalesce(coalesce.toInt)
.saveAsNewAPIHadoopFile(s"${output3}", classOf[Text], classOf[Text], classOf[TextMultipleOutputFormat], spark.sparkContext.hadoopConfiguration)
val sql4=
s""" s"""
|select device_id, device_type from dwh.dm_install_list_v2 where dt='${dt_today}' and business='btop' and package_name='com.taobao.litetao' and device_type not in ('androidid','android_id') |select
| device_id, device_type ,concat("[\\"",package_name,"_",business,"\\"]") as package_name
|from
| dwh.dm_install_list_v2
|where dt='${dt_today}' and business='reyun' and package_name='com.eg.android.AlipayGphone'
| and device_type in ('imei','gaid','oaid','idfa','imeimd5','gaidmd5','oaidmd5','idfamd5')
| and update_date >= "${update}"
|union
|select
| device_id, device_type ,concat("[\\"",package_name,"_",business,"\\"]") as package_name
|from
| dwh.dm_install_list_v2
|where dt='${dt_today}' and business in ('reyun','btop')
| and package_name in ('com.taobao.litetao','com.ss.android.ugc.aweme')
| and device_type in ('imei','gaid','oaid','idfa','imeimd5','gaidmd5','oaidmd5','idfamd5')
| and update_date >= "${update}"
""".stripMargin """.stripMargin
spark.sql(sql4).rdd.flatMap(buildRes(_)).coalesce(coalesce.toInt) val df01: DataFrame = spark.sql(sql1).persist(StorageLevel.MEMORY_AND_DISK_SER)
.saveAsNewAPIHadoopFile(s"${output4}", classOf[Text], classOf[Text], classOf[TextMultipleOutputFormat], spark.sparkContext.hadoopConfiguration)
val sql5=
s"""
|select device_id, device_type from dwh.dm_install_list_v2 where dt='${dt_today}' and business='btop' and package_name='com.ss.android.ugc.aweme' and device_type not in ('androidid','android_id')
""".stripMargin
spark.sql(sql5).rdd.flatMap(buildRes(_)).coalesce(coalesce.toInt) val data01 = df01.select(concat_ws("\t", df01.col("device_id"), df01.col("device_type"), lit("android"),df01.col("package_name")))
.saveAsNewAPIHadoopFile(s"${output5}", classOf[Text], classOf[Text], classOf[TextMultipleOutputFormat], spark.sparkContext.hadoopConfiguration) val data01_with_country = df01.select(concat_ws("\t", df01.col("device_id"), df01.col("device_type"), lit("android"),lit("CN")))
data01.coalesce(coalesce.toInt).write.format("text").mode("overwrite").save(output1)
data01_with_country.coalesce(coalesce.toInt).write.format("text").mode("overwrite").save(output2)
} finally { } finally {
spark.stop() spark.stop()
} }
0 0
} }
def buildRes( row: Row): Array[Tuple2[Text, Text]] = {
val buffer = new ArrayBuffer[Tuple2[Text, Text]]()
val device_id = row.getAs[String]("device_id")
val device_type = row.getAs[String]("device_type")
if (StringUtils.isNotBlank(device_type)) {
buffer += Tuple2(new Text(s"${device_type}, "), new Text(device_id))
}
buffer.toArray
}
} }
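
One detail worth noting in the rewritten job: gzip is enabled by mutating the Hadoop configuration shared by the whole SparkContext. For text-source writes like these, Spark also accepts a per-write compression option, which keeps the codec local to the write instead of leaking into other jobs on the same context. A minimal self-contained sketch, not part of the commit; the sample line and output path are hypothetical:

import org.apache.spark.sql.SparkSession

// Sketch only: per-write gzip via the text data source option, equivalent in
// effect to the global mapreduce.output.fileoutputformat.compress settings.
object GzipWriteSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("GzipWriteSketch").getOrCreate()
    import spark.implicits._
    // One already-formatted output line, matching the job's tab-separated layout.
    val lines = Seq("861234567890123\timei\tandroid\tCN").toDF("value")
    lines.coalesce(1)
      .write
      .option("compression", "gzip") // yields gzip-compressed part files
      .mode("overwrite")
      .text("/tmp/gzip_write_sketch") // hypothetical local path
    spark.stop()
  }
}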