Commit 3d9c4d26 by fan.jiang

reyun_lahuo_list

parent 0963a022
type=command
retries=3
command=sh -x reyun_lahuo_list.sh
#!/usr/bin/env bash
source ../../dmp_env.sh
source ./../../ga_rawdata_analysis/common/tools.sh
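# Dates derived from the scheduler time: yesterday for the day partition, seven days back for the update-window filter.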
dt_today=$(date -d "$ScheduleTime 1 days ago" +"%Y%m%d")
dt_slash_today=$(date -d "$ScheduleTime 1 days ago" +"%Y/%m/%d")
update_date=$(date -d "$ScheduleTime 7 days ago" +"%Y-%m-%d")
INPUT_PATH1="${DM_INSTALL_LIST}_v2/${dt_slash_today}/reyun"
check_await ${INPUT_PATH1}/_SUCCESS
check_await "${TMP_EGGPLANTS_OUTPUT_PATH}/${dt_slash_today}"
# Create the ODS other-device daily dir if it does not exist yet.
if ! hadoop fs -test -e "${ODS_OTHER_DEVICE_DAILY}/${dt_slash_today}"; then
hadoop fs -mkdir -p "${ODS_OTHER_DEVICE_DAILY}/${dt_slash_today}"
fi
OUTPUT_PATH01="${TMP_REYUN_LAHUO_LIST_PATH}/${dt_slash_today}/01"
OUTPUT_PATH02="${TMP_REYUN_LAHUO_LIST_PATH}/${dt_slash_today}/02"
hadoop fs -rm -r "${OUTPUT_PATH01}"
hadoop fs -rm -r "${OUTPUT_PATH02}"
spark-submit --class mobvista.dmp.datasource.dm.ReyunLahuoList \
--conf spark.yarn.executor.memoryOverhead=2048 \
--conf spark.default.parallelism=3000 \
--conf spark.sql.shuffle.partitions=3000 \
--conf spark.network.timeout=720s \
--files ${HIVE_SITE_PATH} \
--master yarn --deploy-mode cluster --executor-memory 8g --driver-memory 4g --executor-cores 6 --num-executors 10 \
../../${JAR} -dt_today ${dt_today} -output01 ${OUTPUT_PATH01} -output02 ${OUTPUT_PATH02} -update_date ${update_date} \
-coalesce 200
if [[ $? -ne 0 ]]; then
exit 255
fi
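# Publish the results: output01 feeds the eggplants tmp path, output02 the ODS other-device daily partition.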
hadoop distcp -m20 "${OUTPUT_PATH01}/*" "${TMP_EGGPLANTS_OUTPUT_PATH}/${dt_slash_today}/"
hadoop distcp -m20 "${OUTPUT_PATH02}/*" "${ODS_OTHER_DEVICE_DAILY}/${dt_slash_today}/"
: '
Requirement
Load the most recent seven days of data for the six packages below, taken from the reyun
partition of the install list, into the "other" partition of the daily install list.
"com.ss.android.ugc.aweme","com.taobao.idlefish","com.taobao.taobao","com.UCMobile","com.xunmeng.pinduoduo","me.ele"
'
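# For reference, both outputs written by the Scala job are tab-separated text. Going by the
# concat_ws calls in ReyunLahuoList, records look like the following (the device id and
# device type here are made-up illustrations, not actual data):
#   6f9619ff-8b86-d011-b42d-00cf4fc964ff<TAB>oaid<TAB>android<TAB>["com.taobao.taobao_reyun"]   (output01)
#   6f9619ff-8b86-d011-b42d-00cf4fc964ff<TAB>oaid<TAB>android<TAB>CN                            (output02)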
@@ -267,6 +267,7 @@ TMP_INTEREST_PATH="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/tmp/dm_
TMP_COM_YOUKU_PHONE_WAX_NOBID_PATH="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/tmp/com_youku_phone_wax_nobid"
TMP_COM_BTOP_TIKTOKRV_PATH="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/tmp/com_btop_tiktokrv"
TMP_COM_BTOP_TIKTOKRV_GAID_PATH="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/tmp/com_btop_tiktokrv_gaid"
TMP_REYUN_LAHUO_LIST_PATH="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/tmp/reyun_lahuo_list"
RTDMP_COM_EG_ANDROID_ALIPAYGPHONE_REYUN_PATH="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/tmp/rtdmp_com_eg_android_AlipayGphone_reyun"
RTDMP_COM_TAOBAO_LITETAO_REYUN_PATH="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/tmp/rtdmp_com_taobao_litetao_reyun"
RTDMP_COM_SS_ANDROID_UGC_AWEME_REYUN_PATH="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/tmp/rtdmp_com_ss_android_ugc_aweme_reyun"
package mobvista.dmp.datasource.dm
import mobvista.dmp.common.CommonSparkJob
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.io.compress.{CompressionCodec, GzipCodec}
import org.apache.spark.sql.functions.{concat_ws, lit}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.storage.StorageLevel
import java.net.URI
/**
* @author jiangfan
* @date 2021/9/15 17:13
*/
class ReyunLahuoList extends CommonSparkJob with Serializable {
override protected def buildOptions(): Options = {
val options = new Options
options.addOption("coalesce", true, "[must] coalesce")
options.addOption("output01", true, "[must] output01")
options.addOption("output02", true, "[must] output02")
options.addOption("dt_today", true, "[must] dt_today")
options.addOption("update_date", true, "[must] update_date")
options
}
override protected def run(args: Array[String]): Int = {
val commandLine = commParser.parse(options, args)
if (!checkMustOption(commandLine)) {
printUsage(options)
return -1
} else printOptions(commandLine)
val coalesce = commandLine.getOptionValue("coalesce")
val output01 = commandLine.getOptionValue("output01")
val output02 = commandLine.getOptionValue("output02")
val dt_today = commandLine.getOptionValue("dt_today")
val update_date = commandLine.getOptionValue("update_date")
val spark = SparkSession.builder()
.appName("ReyunLahuoList")
.config("spark.rdd.compress", "true")
.config("spark.io.compression.codec", "snappy")
.config("spark.sql.orc.filterPushdown", "true")
.config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.enableHiveSupport()
.getOrCreate()
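// Request gzip-compressed output at the Hadoop output-format level (applies to the text
// writer below when no explicit compression option is set on it).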
val conf = spark.sparkContext.hadoopConfiguration
conf.set("mapreduce.output.compress", "true")
conf.set("mapreduce.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec")
conf.setBoolean("mapreduce.output.fileoutputformat.compress", true)
conf.set("mapreduce.output.fileoutputformat.compress.type", CompressionType.BLOCK.toString)
conf.setClass("mapreduce.output.fileoutputformat.compress.codec", classOf[GzipCodec], classOf[CompressionCodec])
FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output01), true)
FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output02), true)
try {
val sql1=
s"""
|select device_id, device_type, concat("[\\"",package_name,"_reyun","\\"]") as package_name
|from
| dwh.dm_install_list_v2
|where dt='${dt_today}' and business='reyun' and update_date>='${update_date}'
| and package_name in ('com.ss.android.ugc.aweme','com.taobao.idlefish','com.taobao.taobao','com.UCMobile','com.xunmeng.pinduoduo','me.ele')
""".stripMargin
val df01: DataFrame = spark.sql(sql1).persist(StorageLevel.MEMORY_AND_DISK_SER)
// output01 record: device_id <TAB> device_type <TAB> android <TAB> ["<package>_reyun"]
val data = df01.select(concat_ws("\t", df01.col("device_id"), df01.col("device_type"), lit("android"), df01.col("package_name")))
// output02 record: device_id <TAB> device_type <TAB> android <TAB> CN
val data_with_country = df01.select(concat_ws("\t", df01.col("device_id"), df01.col("device_type"), lit("android"), lit("CN")))
data.coalesce(coalesce.toInt).write.format("text").mode("overwrite").save(output01)
data_with_country.coalesce(coalesce.toInt).write.format("text").mode("overwrite").save(output02)
} finally {
spark.stop()
}
0
}
}
object ReyunLahuoList {
def main(args: Array[String]): Unit = {
new ReyunLahuoList().run(args)
}
}