Commit c40d552d by fan.jiang

develop com_reyun_practicaltool.job

parent 7dddf739
type=command
command=sh -x com_reyun_practicaltool.sh
#!/usr/bin/env bash
source ../../dmp_env.sh
dt_slash_today=$(date -d "$ScheduleTime 1 days ago" +"%Y/%m/%d")
check_await "${TMP_EGGPLANTS_OUTPUT_PATH}/${dt_slash_today}"
hadoop fs -test -e "${ODS_OTHER_DEVICE_DAILY}/${dt_slash_today}"
if [ $? -ne 0 ];then
hadoop fs -mkdir -p "${ODS_OTHER_DEVICE_DAILY}/${dt_slash_today}"
fi
OUTPUT_PATH01="${TMP_COM_REYUN_PRACTICALTOOL_PATH}/${dt_slash_today}/01"
OUTPUT_PATH02="${TMP_COM_REYUN_PRACTICALTOOL_PATH}/${dt_slash_today}/02"
hadoop fs -rm -r "${OUTPUT_PATH01}"
hadoop fs -rm -r "${OUTPUT_PATH02}"
spark-submit --class mobvista.dmp.datasource.dm.ComReyunPracticaltool \
--conf spark.yarn.executor.memoryOverhead=2048 \
--conf spark.default.parallelism=2000 \
--conf spark.sql.shuffle.partitions=2000 \
--conf spark.driver.maxResultSize=4g \
--conf spark.network.timeout=720s \
--master yarn --deploy-mode cluster --executor-memory 8g --driver-memory 6g --executor-cores 5 --num-executors 10 \
../../${JAR} -output01 ${OUTPUT_PATH01} -output02 ${OUTPUT_PATH02} -coalesce 400
if [[ $? -ne 0 ]]; then
exit 255
fi
hadoop distcp -m20 "${OUTPUT_PATH01}/*" "${TMP_EGGPLANTS_OUTPUT_PATH}/${dt_slash_today}/"
hadoop distcp -m20 "${OUTPUT_PATH02}/*" "${ODS_OTHER_DEVICE_DAILY}/${dt_slash_today}/"
: '
任务说明
运营侧有需要针对热云提供给我们的标签,拉取对应标签的设备进行伪包名入库的需求
运行以下sql
select
device_id
from dwh.device_tag_weight_event_all_weekly
where dt='20210922'
and id_type in('oaid','imei')
and tag_code = '0414' ---需要更改tag_code
group by device_id
tag_code和人群包的命名关系如下
一共有5个,需要限制不同的tag_code
0414-com.reyun_practicaltool
041404-com.reyun_clean
040507-com.reyun_smalltool
041403-com.reyun_wifi
041406-com.reyun_security
'
\ No newline at end of file
...@@ -271,6 +271,7 @@ TMP_COM_BTOP_TIKTOKRV_PATH="s3://mob-emr-test/dataplatform/DataWareHouse/data/dw ...@@ -271,6 +271,7 @@ TMP_COM_BTOP_TIKTOKRV_PATH="s3://mob-emr-test/dataplatform/DataWareHouse/data/dw
TMP_COM_BTOP_TIKTOKRV_GAID_PATH="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/tmp/com_btop_tiktokrv_gaid" TMP_COM_BTOP_TIKTOKRV_GAID_PATH="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/tmp/com_btop_tiktokrv_gaid"
TMP_REYUN_LAHUO_LIST_PATH="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/tmp/reyun_lahuo_list" TMP_REYUN_LAHUO_LIST_PATH="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/tmp/reyun_lahuo_list"
TMP_COM_TOPON_TOPLTV_1015_PATH="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/tmp/com.topon_topltv_1015" TMP_COM_TOPON_TOPLTV_1015_PATH="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/tmp/com.topon_topltv_1015"
TMP_COM_REYUN_PRACTICALTOOL_PATH="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/tmp/com.reyun_practicaltool"
RTDMP_COM_EG_ANDROID_ALIPAYGPHONE_REYUN_PATH="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/tmp/rtdmp_com_eg_android_AlipayGphone_reyun" RTDMP_COM_EG_ANDROID_ALIPAYGPHONE_REYUN_PATH="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/tmp/rtdmp_com_eg_android_AlipayGphone_reyun"
RTDMP_COM_TAOBAO_LITETAO_REYUN_PATH="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/tmp/rtdmp_com_taobao_litetao_reyun" RTDMP_COM_TAOBAO_LITETAO_REYUN_PATH="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/tmp/rtdmp_com_taobao_litetao_reyun"
RTDMP_COM_SS_ANDROID_UGC_AWEME_REYUN_PATH="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/tmp/rtdmp_com_ss_android_ugc_aweme_reyun" RTDMP_COM_SS_ANDROID_UGC_AWEME_REYUN_PATH="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/tmp/rtdmp_com_ss_android_ugc_aweme_reyun"
......
package mobvista.dmp.datasource.dm
import mobvista.dmp.common.CommonSparkJob
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{concat_ws, lit}
import org.apache.spark.storage.StorageLevel
import java.net.URI
/**
* @author jiangfan
* @date 2021/10/20 17:45
*/
class ComReyunPracticaltool extends CommonSparkJob with Serializable {
override protected def buildOptions(): Options = {
val options = new Options
options.addOption("coalesce", true, "[must] coalesce")
options.addOption("output01", true, "[must] output01")
options.addOption("output02", true, "[must] output02")
options
}
override protected def run(args: Array[String]): Int = {
val commandLine = commParser.parse(options, args)
if (!checkMustOption(commandLine)) {
printUsage(options)
return -1
} else printOptions(commandLine)
val coalesce = commandLine.getOptionValue("coalesce")
val output01 = commandLine.getOptionValue("output01")
val output02 = commandLine.getOptionValue("output02")
val spark = SparkSession.builder()
.appName("ComReyunPracticaltool")
.config("spark.rdd.compress", "true")
.config("spark.io.compression.codec", "snappy")
.config("spark.sql.orc.filterPushdown", "true")
.config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.enableHiveSupport()
.getOrCreate()
FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output01), true)
FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output02), true)
try {
val sql01 =
s"""
|select
|device_id,id_type as device_type,
| case when tag_code =='0414' then concat("[\\"","com.reyun_practicaltool","\\"]")
| when tag_code = '041404' then concat("[\\"","com.reyun_clean","\\"]")
| when tag_code = '040507' then concat("[\\"","com.reyun_smalltool","\\"]")
| when tag_code = '041403' then concat("[\\"","com.reyun_wifi","\\"]")
| when tag_code = '041406' then concat("[\\"","com.reyun_security","\\"]")
| end as package_name
|from dwh.device_tag_weight_event_all_weekly
|where dt='20210922'
|and id_type in ('oaid','imei')
|and tag_code in ('0414','041404','040507','041403','041406')
|group by device_id,id_type,tag_code
""".stripMargin
println("imei_sql==" + sql01)
val df: DataFrame = spark.sql(sql01).persist(StorageLevel.MEMORY_AND_DISK_SER)
val df_with_package_name = df.select(concat_ws("\t", df.col("device_id"), df.col("device_type"), lit("android"), df.col("package_name")))
val df_with_country = df.select(concat_ws("\t", df.col("device_id"), df.col("device_type"), lit("android"), lit("CN")))
df_with_package_name.coalesce(coalesce.toInt).write.format("text").mode("overwrite").save(output01)
df_with_country.coalesce(coalesce.toInt).write.format("text").mode("overwrite").save(output02)
} finally {
spark.stop()
}
0
}
}
object ComReyunPracticaltool {
def main(args: Array[String]): Unit = {
new ComReyunPracticaltool().run(args)
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment