Commit 68561a03 by fan.jiang

com.topon_topltv_1015

parent ac61d839
type=command
command=sh -x com.topon_topltv_1015.sh
\ No newline at end of file
#!/usr/bin/env bash
source ../../dmp_env.sh
begin_day=$(date -d "$ScheduleTime 13 days ago" +"%Y-%m-%d")
end_day01=$(date -d "$ScheduleTime 7 days ago" +"%Y-%m-%d")
end_day02=$(date -d "$ScheduleTime 1 days ago" +"%Y-%m-%d")
dt_slash_today=$(date -d "$ScheduleTime 1 days ago" +"%Y/%m/%d")
check_await "${TMP_EGGPLANTS_OUTPUT_PATH}/${dt_slash_today}"
hadoop fs -test -e "${ODS_OTHER_DEVICE_DAILY}/${dt_slash_today}"
if [ $? -ne 0 ];then
hadoop fs -mkdir -p "${ODS_OTHER_DEVICE_DAILY}/${dt_slash_today}"
fi
OUTPUT_PATH01="${TMP_COM_TOPON_TOPLTV_1015_PATH}/${dt_slash_today}/01"
OUTPUT_PATH02="${TMP_COM_TOPON_TOPLTV_1015_PATH}/${dt_slash_today}/02"
hadoop fs -rm -r "${OUTPUT_PATH01}"
hadoop fs -rm -r "${OUTPUT_PATH02}"
spark-submit --class mobvista.dmp.datasource.dm.ComToponTopltv1015 \
--conf spark.yarn.executor.memoryOverhead=2048 \
--conf spark.default.parallelism=2000 \
--conf spark.sql.shuffle.partitions=2000 \
--conf spark.driver.maxResultSize=4g \
--conf spark.network.timeout=720s \
--master yarn --deploy-mode cluster --executor-memory 8g --driver-memory 6g --executor-cores 5 --num-executors 40 \
../../${JAR} -begin_day ${begin_day} -end_day01 ${end_day01} -end_day02 ${end_day02} -output01 ${OUTPUT_PATH01} -output02 ${OUTPUT_PATH02} -coalesce 400
if [[ $? -ne 0 ]]; then
exit 255
fi
hadoop distcp -m20 "${OUTPUT_PATH01}/*" "${TMP_EGGPLANTS_OUTPUT_PATH}/${dt_slash_today}/"
hadoop distcp -m20 "${OUTPUT_PATH02}/*" "${ODS_OTHER_DEVICE_DAILY}/${dt_slash_today}/"
...@@ -270,6 +270,7 @@ TMP_COM_YOUKU_PHONE_WAX_NOBID_PATH="s3://mob-emr-test/dataplatform/DataWareHouse ...@@ -270,6 +270,7 @@ TMP_COM_YOUKU_PHONE_WAX_NOBID_PATH="s3://mob-emr-test/dataplatform/DataWareHouse
TMP_COM_BTOP_TIKTOKRV_PATH="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/tmp/com_btop_tiktokrv" TMP_COM_BTOP_TIKTOKRV_PATH="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/tmp/com_btop_tiktokrv"
TMP_COM_BTOP_TIKTOKRV_GAID_PATH="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/tmp/com_btop_tiktokrv_gaid" TMP_COM_BTOP_TIKTOKRV_GAID_PATH="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/tmp/com_btop_tiktokrv_gaid"
TMP_REYUN_LAHUO_LIST_PATH="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/tmp/reyun_lahuo_list" TMP_REYUN_LAHUO_LIST_PATH="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/tmp/reyun_lahuo_list"
TMP_COM_TOPON_TOPLTV_1015_PATH="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/tmp/com.topon_topltv_1015"
RTDMP_COM_EG_ANDROID_ALIPAYGPHONE_REYUN_PATH="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/tmp/rtdmp_com_eg_android_AlipayGphone_reyun" RTDMP_COM_EG_ANDROID_ALIPAYGPHONE_REYUN_PATH="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/tmp/rtdmp_com_eg_android_AlipayGphone_reyun"
RTDMP_COM_TAOBAO_LITETAO_REYUN_PATH="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/tmp/rtdmp_com_taobao_litetao_reyun" RTDMP_COM_TAOBAO_LITETAO_REYUN_PATH="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/tmp/rtdmp_com_taobao_litetao_reyun"
RTDMP_COM_SS_ANDROID_UGC_AWEME_REYUN_PATH="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/tmp/rtdmp_com_ss_android_ugc_aweme_reyun" RTDMP_COM_SS_ANDROID_UGC_AWEME_REYUN_PATH="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/tmp/rtdmp_com_ss_android_ugc_aweme_reyun"
......
package mobvista.dmp.datasource.dm
import mobvista.dmp.common.CommonSparkJob
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{concat_ws, lit}
import org.apache.spark.storage.StorageLevel
import java.net.URI
/**
* @author jiangfan
* @date 2021/10/15 16:22
*/
class ComToponTopltv1015 extends CommonSparkJob with Serializable {
override protected def buildOptions(): Options = {
val options = new Options
options.addOption("coalesce", true, "[must] coalesce")
options.addOption("output01", true, "[must] output01")
options.addOption("output02", true, "[must] output02")
options.addOption("begin_day", true, "[must] begin_day")
options.addOption("end_day01", true, "[must] end_day01")
options.addOption("end_day02", true, "[must] end_day02")
options
}
override protected def run(args: Array[String]): Int = {
val commandLine = commParser.parse(options, args)
if (!checkMustOption(commandLine)) {
printUsage(options)
return -1
} else printOptions(commandLine)
val coalesce = commandLine.getOptionValue("coalesce")
val output01 = commandLine.getOptionValue("output01")
val output02 = commandLine.getOptionValue("output02")
val begin_day = commandLine.getOptionValue("begin_day")
val end_day01 = commandLine.getOptionValue("end_day01")
val end_day02 = commandLine.getOptionValue("end_day02")
val spark = SparkSession.builder()
.appName("ComToponTopltv1015")
.config("spark.rdd.compress", "true")
.config("spark.io.compression.codec", "snappy")
.config("spark.sql.orc.filterPushdown", "true")
.config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.enableHiveSupport()
.getOrCreate()
val sc = spark.sparkContext
import spark.implicits._
FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output01), true)
FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output02), true)
try {
val imei_sql=
s"""
|select device_id
|from
|(select device_id,row_number() over (partition by device_id order by LTV desc) as ranking
|from
|(
|select sum(case when imp.imei not in ('0','NULL') then bidprice else NULL end) as LTV,imp.imei as device_id
|from
|(select *,concat(yyyy,'-',mm,'-',dd) as day
|from uparpu_main.uparpu_tk_impression_v2
|where concat(yyyy,'-',mm,'-',dd) >= '${begin_day}'
|and concat(yyyy,'-',mm,'-',dd) <= '${end_day02}'
|) as imp
|inner join
|(select new_device_id,dt
|from uparpu_main.uparpu_new_user
|where dt >= '${begin_day}'
|and dt <= '${end_day01}'
|) as uu
|on imp.upid = uu.new_device_id and datediff(imp.day,uu.dt)<7
|group by imp.imei
|) as t
|) as t2
|where ranking/(select count(device_id)
|from
|(
|select sum(case when imp.imei not in ('0','NULL') then bidprice else NULL end) as LTV,imp.imei as device_id
|from
|(select *,concat(yyyy,'-',mm,'-',dd) as day
|from uparpu_main.uparpu_tk_impression_v2
|where concat(yyyy,'-',mm,'-',dd) >= '${begin_day}'
|and concat(yyyy,'-',mm,'-',dd) <= '${end_day02}'
|) as imp
|inner join
|(select new_device_id,dt
|from uparpu_main.uparpu_new_user
|where dt >= '${begin_day}'
|and dt <= '${end_day01}'
|) as uu
|on imp.upid = uu.new_device_id and datediff(imp.day,uu.dt)<7
|group by imp.imei
|) as t)<0.3
""".stripMargin
val oaid_sql =
s"""
|select device_id
|from
|(select device_id,row_number() over (partition by device_id order by LTV desc) as ranking
|from
|(
|select sum(case when imp.oaid not in ('0','NULL') then bidprice else NULL end) as LTV,imp.oaid as device_id
|from
|(select *,concat(yyyy,'-',mm,'-',dd) as day
|from uparpu_main.uparpu_tk_impression_v2
|where concat(yyyy,'-',mm,'-',dd) >= '${begin_day}'
|and concat(yyyy,'-',mm,'-',dd) <= '${end_day02}'
|) as imp
|inner join
|(select new_device_id,dt
|from uparpu_main.uparpu_new_user
|where dt >= '${begin_day}'
|and dt <= '${end_day01}'
|) as uu
|on imp.upid = uu.new_device_id and datediff(imp.day,uu.dt)<7
|group by imp.oaid
|) as t
|) as t2
|where ranking/(select count(device_id)
|from
|(
|select sum(case when imp.oaid not in ('0','NULL') then bidprice else NULL end) as LTV,imp.oaid as device_id
|from
|(select *,concat(yyyy,'-',mm,'-',dd) as day
|from uparpu_main.uparpu_tk_impression_v2
|where concat(yyyy,'-',mm,'-',dd) >= '${begin_day}'
|and concat(yyyy,'-',mm,'-',dd) <= '${end_day02}'
|) as imp
|inner join
|(select new_device_id,dt
|from uparpu_main.uparpu_new_user
|where dt >= '${begin_day}'
|and dt <= '${end_day01}'
|) as uu
|on imp.upid = uu.new_device_id and datediff(imp.day,uu.dt)<7
|group by imp.oaid
|) as t)<0.3
|""".stripMargin
println("imei_sql=="+imei_sql)
println("oaid_sql=="+oaid_sql)
val imei_df: DataFrame = spark.sql(imei_sql).persist(StorageLevel.MEMORY_AND_DISK_SER)
val imei_df_with_package_name = imei_df.select(concat_ws("\t", imei_df.col("device_id"), lit("imei"), lit("android"),lit("[\"com.topon_topltv_1015\"]")))
val imei_df_with_country = imei_df.select(concat_ws("\t", imei_df.col("device_id"), lit("imei"), lit("android"),lit("CN")))
val oaid_df: DataFrame = spark.sql(oaid_sql).persist(StorageLevel.MEMORY_AND_DISK_SER)
val oaid_df_with_package_name = oaid_df.select(concat_ws("\t", oaid_df.col("device_id"), lit("imei"), lit("android"),lit("[\"com.topon_topltv_1015\"]")))
val oaid_df_with_country = oaid_df.select(concat_ws("\t", oaid_df.col("device_id"), lit("imei"), lit("android"),lit("CN")))
imei_df_with_package_name.union(oaid_df_with_package_name).coalesce(coalesce.toInt).write.format("text").mode("overwrite").save(output01)
imei_df_with_country.union(oaid_df_with_country).coalesce(coalesce.toInt).write.format("text").mode("overwrite").save(output02)
} finally {
spark.stop()
}
0
}
}
object ComToponTopltv1015 {
def main(args: Array[String]): Unit = {
new ComToponTopltv1015().run(args)
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment