Commit af979033 by fan.jiang

com.3app.xianjindai

parent d35048e7
type=command
command=sh -x com_3app_xianjindai.sh
#!/usr/bin/env bash
source ../../dmp_env.sh
# Work on the previous day's partition; ScheduleTime is presumably supplied by the scheduler via dmp_env.sh
to_day=$(date -d "$ScheduleTime 1 days ago" +"%Y%m%d")
dt_slash_today=$(date -d "$ScheduleTime 1 days ago" +"%Y/%m/%d")
# Block until the upstream eggplants output for the day is available
check_await "${TMP_EGGPLANTS_OUTPUT_PATH}/${dt_slash_today}"
# Create the ODS output directory if it does not exist yet
if ! hadoop fs -test -e "${ODS_OTHER_DEVICE_DAILY}/${dt_slash_today}"; then
    hadoop fs -mkdir -p "${ODS_OTHER_DEVICE_DAILY}/${dt_slash_today}"
fi
OUTPUT_PATH01="${TMP_COM_3APP_XIANJINDAI_PATH}/${dt_slash_today}/01"
OUTPUT_PATH02="${TMP_COM_3APP_XIANJINDAI_PATH}/${dt_slash_today}/02"
# Remove any leftover output from a previous run; -f keeps a missing path from raising an error
hadoop fs -rm -r -f "${OUTPUT_PATH01}"
hadoop fs -rm -r -f "${OUTPUT_PATH02}"
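# Run the Spark job; -coalesce 400 caps the number of output files the job writes per dataset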
spark-submit --class mobvista.dmp.datasource.dm.Com3appXianjindai \
--conf spark.yarn.executor.memoryOverhead=2048 \
--conf spark.default.parallelism=2000 \
--conf spark.sql.shuffle.partitions=2000 \
--conf spark.driver.maxResultSize=4g \
--conf spark.network.timeout=720s \
--master yarn --deploy-mode cluster --executor-memory 8g --driver-memory 6g --executor-cores 5 --num-executors 40 \
../../${JAR} -to_day ${to_day} -output01 ${OUTPUT_PATH01} -output02 ${OUTPUT_PATH02} -coalesce 400
if [[ $? -ne 0 ]]; then
exit 255
fi
# Publish the results: output01 feeds the eggplants path, output02 the ODS device daily path
hadoop distcp -m20 "${OUTPUT_PATH01}/*" "${TMP_EGGPLANTS_OUTPUT_PATH}/${dt_slash_today}/"
hadoop distcp -m20 "${OUTPUT_PATH02}/*" "${ODS_OTHER_DEVICE_DAILY}/${dt_slash_today}/"
: '
Task description
Daily warehouse ingestion of the cash-loan ("xianjindai") audience package.
'
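# A minimal invocation sketch, assuming ScheduleTime is exported by the scheduler
# (the job file above runs this script as `sh -x com_3app_xianjindai.sh`):
#   export ScheduleTime="2021-11-05"
#   sh -x com_3app_xianjindai.sh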
@@ -282,6 +282,7 @@ TMP_COM_BTOP_TIKTOKRV_GAID_PATH="s3://mob-emr-test/dataplatform/DataWareHouse/da
TMP_REYUN_LAHUO_LIST_PATH="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/tmp/reyun_lahuo_list"
TMP_COM_TOPON_TOPLTV_1015_PATH="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/tmp/com.topon_topltv_1015"
TMP_COM_REYUN_PRACTICALTOOL_PATH="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/tmp/com.reyun_practicaltool"
TMP_COM_3APP_XIANJINDAI_PATH="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/tmp/com.3app.xianjindai"
RTDMP_COM_EG_ANDROID_ALIPAYGPHONE_REYUN_PATH="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/tmp/rtdmp_com_eg_android_AlipayGphone_reyun"
RTDMP_COM_TAOBAO_LITETAO_REYUN_PATH="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/tmp/rtdmp_com_taobao_litetao_reyun"
RTDMP_COM_SS_ANDROID_UGC_AWEME_REYUN_PATH="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/tmp/rtdmp_com_ss_android_ugc_aweme_reyun"
package mobvista.dmp.datasource.dm
import mobvista.dmp.common.CommonSparkJob
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.io.compress.{CompressionCodec, GzipCodec}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{concat_ws, lit}
import org.apache.spark.storage.StorageLevel
import java.net.URI
/**
* @author jiangfan
* @date 2021/11/4 14:50
*/
class Com3appXianjindai extends CommonSparkJob with Serializable {
override protected def buildOptions(): Options = {
val options = new Options
options.addOption("coalesce", true, "[must] coalesce")
options.addOption("output01", true, "[must] output01")
options.addOption("output02", true, "[must] output02")
options.addOption("to_day", true, "[must] to_day")
options
}
override protected def run(args: Array[String]): Int = {
val commandLine = commParser.parse(options, args)
if (!checkMustOption(commandLine)) {
printUsage(options)
return -1
} else printOptions(commandLine)
val coalesce = commandLine.getOptionValue("coalesce")
val output01 = commandLine.getOptionValue("output01")
val output02 = commandLine.getOptionValue("output02")
val to_day = commandLine.getOptionValue("to_day")
val spark = SparkSession.builder()
.appName("Com3appXianjindai")
.config("spark.rdd.compress", "true")
.config("spark.io.compression.codec", "snappy")
.config("spark.sql.orc.filterPushdown", "true")
.config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.enableHiveSupport()
.getOrCreate()
val conf = spark.sparkContext.hadoopConfiguration
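// Gzip-compress the text output written below; Spark's text source presumably honors
// these MapReduce output-compression settings on the Hadoop configuration.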
conf.set("mapreduce.output.compress", "true")
conf.set("mapreduce.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec")
conf.setBoolean("mapreduce.output.fileoutputformat.compress", true)
conf.set("mapreduce.output.fileoutputformat.compress.type", CompressionType.BLOCK.toString)
conf.setClass("mapreduce.output.fileoutputformat.compress.codec", classOf[GzipCodec], classOf[CompressionCodec])
// Delete any stale output before writing (the shell script also clears these paths)
val fs = FileSystem.get(new URI("s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration)
fs.delete(new Path(output01), true)
fs.delete(new Path(output02), true)
try {
// The requirement originally read adn_dsp.log_adn_dsp_request_orc_hour, but scanning that table is
// expensive; dwh.etl_dsp_request_daily_hours keeps the same data, so we read it instead to cut the cost.
val sql01 =
  s"""
     |select device_id
     |from
     |(
     |  select device_id, count(distinct t1.appid) as app_num
     |  from
     |  (
     |    select appid, device_id
     |    from
     |    (
     |      select appid, device_id
     |      from
     |      (
     |        select packagename, split(exitid, ',')[0] as device_id
     |        from dwh.etl_dsp_request_daily_hours
     |        where dt = '${to_day}'
     |        and country = 'MX' and platform = 'android'
     |      ) tmpdata lateral view explode(split(packagename, "#")) num as appid
     |    ) t01
     |    where device_id not in ('0', '', '-', '00000000-0000-0000-0000-000000000000') and device_id rlike '${didPtn}'
     |    group by appid, device_id
     |  ) t1
     |  group by device_id
     |) t2
     |where app_num >= 3
     |group by device_id
  """.stripMargin
println("sql01=="+sql01)
val gaid_df: DataFrame = spark.sql(sql01).persist(StorageLevel.MEMORY_AND_DISK_SER)
val gaid_df_with_package_name = gaid_df.select(concat_ws("\t", gaid_df.col("device_id"), lit("gaid"), lit("android"),lit("[\"com.3app.xianjindai\"]")))
val gaid_df_with_country = gaid_df.select(concat_ws("\t", gaid_df.col("device_id"), lit("gaid"), lit("android"),lit("MX")))
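// Each output row is a tab-separated quadruple: device_id, id type ("gaid"), platform ("android"),
// then either the audience package list (output01) or the country code (output02).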
gaid_df_with_package_name.coalesce(coalesce.toInt).write.format("text").mode("overwrite").save(output01)
gaid_df_with_country.coalesce(coalesce.toInt).write.format("text").mode("overwrite").save(output02)
} finally {
spark.stop()
}
0
}
}
object Com3appXianjindai {
def main(args: Array[String]): Unit = {
new Com3appXianjindai().run(args)
}
}