Commit e9771023 by fan.jiang

com.lazada_noregister_30p, com.lazada_noregister_70p

parent 41268e5d
type=command
command=sh -x com_lazada_noregister_70p_30p.sh
#!/usr/bin/env bash
source ../../dmp_env.sh
first_day=$(date -d "$ScheduleTime 2 days ago" +"%Y%m%d")
second_day=$(date -d "$ScheduleTime 1 days ago" +"%Y%m%d")
dt_slash_today=$(date -d "$ScheduleTime 1 days ago" +"%Y/%m/%d")
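# For illustration only (assumed ScheduleTime of 2021-11-13): the lines above would yield
# first_day=20211111, second_day=20211112 and dt_slash_today=2021/11/12, i.e. the job reads
# the two preceding days of logs and publishes under yesterday's dt partition.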
check_await "${TMP_EGGPLANTS_OUTPUT_PATH}/${dt_slash_today}"
if ! hadoop fs -test -e "${ODS_OTHER_DEVICE_DAILY}/${dt_slash_today}"; then
hadoop fs -mkdir -p "${ODS_OTHER_DEVICE_DAILY}/${dt_slash_today}"
fi
OUTPUT_PATH01="${TMP_COM_LAZADA_NOREGISTER_70P_30P_PATH}/${dt_slash_today}/01"
OUTPUT_PATH02="${TMP_COM_LAZADA_NOREGISTER_70P_30P_PATH}/${dt_slash_today}/02"
hadoop fs -rm -r "${OUTPUT_PATH01}"
hadoop fs -rm -r "${OUTPUT_PATH02}"
spark-submit --class mobvista.dmp.datasource.dm.ComLazadaNoregister70p30p \
--conf spark.yarn.executor.memoryOverhead=2048 \
--conf spark.default.parallelism=2000 \
--conf spark.sql.shuffle.partitions=2000 \
--conf spark.driver.maxResultSize=4g \
--conf spark.network.timeout=720s \
--master yarn --deploy-mode cluster --executor-memory 8g --driver-memory 6g --executor-cores 5 --num-executors 20 \
../../${JAR} -first_day ${first_day} -second_day ${second_day} -output01 ${OUTPUT_PATH01} -output02 ${OUTPUT_PATH02} -coalesce 200
if [[ $? -ne 0 ]]; then
exit 255
fi
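# Publish both result sets under the same dt: output01 (device_id + package-name rows) is
# copied into ${TMP_EGGPLANTS_OUTPUT_PATH}, output02 (device_id + country rows) into
# ${ODS_OTHER_DEVICE_DAILY}.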
hadoop distcp -m20 "${OUTPUT_PATH01}/*" "${TMP_EGGPLANTS_OUTPUT_PATH}/${dt_slash_today}/"
hadoop distcp -m20 "${OUTPUT_PATH02}/*" "${ODS_OTHER_DEVICE_DAILY}/${dt_slash_today}/"
: '
Job description
Each day, take the installs from the previous day (e.g. 20211111) and join them against the
registration events from the previous day through today (e.g. 20211111-20211112), restricted
to event_name = "REGISTRATION". The device ids that cannot be matched to any registration are
randomly split at a 7:3 ratio into the two audience packages to be updated.
'
@@ -283,6 +283,7 @@ TMP_REYUN_LAHUO_LIST_PATH="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh
TMP_COM_TOPON_TOPLTV_1015_PATH="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/tmp/com.topon_topltv_1015"
TMP_COM_REYUN_PRACTICALTOOL_PATH="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/tmp/com.reyun_practicaltool"
TMP_COM_3APP_XIANJINDAI_PATH="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/tmp/com.3app.xianjindai"
TMP_COM_LAZADA_NOREGISTER_70P_30P_PATH="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/tmp/com_lazada_noregister_70p_30p"
RTDMP_COM_EG_ANDROID_ALIPAYGPHONE_REYUN_PATH="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/tmp/rtdmp_com_eg_android_AlipayGphone_reyun"
RTDMP_COM_TAOBAO_LITETAO_REYUN_PATH="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/tmp/rtdmp_com_taobao_litetao_reyun"
RTDMP_COM_SS_ANDROID_UGC_AWEME_REYUN_PATH="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/tmp/rtdmp_com_ss_android_ugc_aweme_reyun"
...
package mobvista.dmp.datasource.dm
import mobvista.dmp.common.CommonSparkJob
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{concat_ws, lit}
import org.apache.spark.storage.StorageLevel
import java.net.URI
/**
* @author jiangfan
* @date 2021/11/12 14:36
*/
class ComLazadaNoregister70p30p extends CommonSparkJob with Serializable {
override protected def buildOptions(): Options = {
val options = new Options
options.addOption("coalesce", true, "[must] coalesce")
options.addOption("output01", true, "[must] output01")
options.addOption("output02", true, "[must] output02")
options.addOption("first_day", true, "[must] first_day")
options.addOption("second_day", true, "[must] second_day")
options
}
override protected def run(args: Array[String]): Int = {
val commandLine = commParser.parse(options, args)
if (!checkMustOption(commandLine)) {
printUsage(options)
return -1
} else printOptions(commandLine)
val coalesce = commandLine.getOptionValue("coalesce")
val output01 = commandLine.getOptionValue("output01")
val output02 = commandLine.getOptionValue("output02")
val first_day = commandLine.getOptionValue("first_day")
val second_day = commandLine.getOptionValue("second_day")
val spark = SparkSession.builder()
.appName("ComLazadaNoregister70p30p")
.config("spark.rdd.compress", "true")
.config("spark.io.compression.codec", "snappy")
.config("spark.sql.orc.filterPushdown", "true")
.config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.enableHiveSupport()
.getOrCreate()
val sc = spark.sparkContext
import spark.implicits._
FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output01), true)
FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output02), true)
val yyyy01 = first_day.substring(0, 4)
val mm01 = first_day.substring(4, 6)
val dd01 = first_day.substring(6, 8)
val yyyy02 = second_day.substring(0, 4)
val mm02 = second_day.substring(4, 6)
val dd02 = second_day.substring(6, 8)
var time_limit = ""
// Build explicit partition predicates instead of writing concat(yyyy, mm, dd) = '20211111'
// in the SQL, so Hive only touches the relevant partitions rather than scanning the whole
// table's partition metadata.
if (yyyy01 == yyyy02 && mm01 == mm02) {
time_limit = s"""yyyy = '${yyyy01}' and mm = '${mm01}' and dd >= '${dd01}' and dd <= '${dd02}' """
} else {
time_limit = s"""( (yyyy = '${yyyy01}' and mm = '${mm01}' and dd = '${dd01}') or (yyyy = '${yyyy02}' and mm = '${mm02}' and dd = '${dd02}') ) """
}
println(time_limit)
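// Illustrative value (assumed first_day=20211111, second_day=20211112):
//   yyyy = '2021' and mm = '11' and dd >= '11' and dd <= '12'
// which lets Hive prune down to the two daily partitions instead of evaluating
// concat(yyyy, mm, dd) against every partition's metadata.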
try {
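// sql01 in words: t1 = distinct (campaign_id, gaid) installs of com.lazada.android in ID on
// first_day; t2 = distinct (campaign_id, gaid) REGISTRATION events in ID within the
// first_day..second_day window. The left join with "t2.gaid is null" keeps only devices that
// never registered, and each surviving gaid is assigned a random bucket num (effectively
// uniform over 1..100) that drives the 70/30 split below.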
val sql01 =
s"""
|select
|t1.gaid device_id,cast(ceiling(rand() * 100) as int) num
|from
|(select
|campaign_id,gaid
|from
|dwh.ods_adn_trackingnew_install
|where
|yyyy = '${yyyy01}' and mm= '${mm01}' and dd='${dd01}'
|and
|country_code in ('ID')
|and ext_campaignpackagename = 'com.lazada.android'
|group by campaign_id,gaid) t1
|left join
|(select
|campaign_id,gaid
|from
|dwh.ods_adn_tracking_ss_event
|where
| ${time_limit}
|and
| country in ('ID')
|and
| event_name = 'REGISTRATION'
|group by campaign_id,gaid
|) t2
|on t1.campaign_id = t2.campaign_id and t1.gaid = t2.gaid
|where
|t2.gaid is null
|group by t1.gaid
|""".stripMargin
println("sql01===="+sql01)
val all_df: DataFrame = spark.sql(sql01).persist(StorageLevel.MEMORY_AND_DISK_SER)
val df01_with_package_name = all_df.filter(all_df.col("num")<=30).select(concat_ws("\t", all_df.col("device_id"), lit("gaid"), lit("android"),lit("[\"com.lazada_noregister_30p\"]")))
val df01_with_country = all_df.filter(all_df.col("num")<=30).select(concat_ws("\t", all_df.col("device_id"), lit("gaid"), lit("android"),lit("ID")))
val df02_with_package_name = all_df.filter(all_df.col("num")>30).select(concat_ws("\t", all_df.col("device_id"), lit("gaid"), lit("android"),lit("[\"com.lazada_noregister_70p\"]")))
val df02_with_country = all_df.filter(all_df.col("num")>30).select(concat_ws("\t", all_df.col("device_id"), lit("gaid"), lit("android"),lit("ID")))
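// num <= 30 keeps ~30% of the unmatched devices (tagged com.lazada_noregister_30p) and
// num > 30 the remaining ~70% (com.lazada_noregister_70p); output01 rows carry the
// package-name tag, output02 rows the country code "ID", both written as tab-separated text.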
df01_with_package_name.union(df02_with_package_name).coalesce(coalesce.toInt).write.format("text").mode("overwrite").save(output01)
df01_with_country.union(df02_with_country).coalesce(coalesce.toInt).write.format("text").mode("overwrite").save(output02)
} finally {
spark.stop()
}
0
}
}
object ComLazadaNoregister70p30p {
def main(args: Array[String]): Unit = {
new ComLazadaNoregister70p30p().run(args)
}
}