Commit 1e4fa068 by fan.jiang

fix bug com.btop_tiktokrv_highvalue30_p50_gaid

parent a06ca89e
type=command
retries=3
dependencies=btop_tiktokrv_gaid
command=sh -x btop_tiktokrv.sh
\ No newline at end of file
type=command
command=sh -x btop_tiktokrv_gaid.sh
\ No newline at end of file
#!/usr/bin/env bash
source ../../dmp_env.sh
begin_day=$(date -d "$ScheduleTime 15 days ago" +"%Y%m%d")
begin_day02=$(date -d "$ScheduleTime 30 days ago" +"%Y%m%d")
end_day=$(date -d "$ScheduleTime 1 days ago" +"%Y%m%d")
dt_dash_today=$(date -d "$ScheduleTime 1 days ago" +"%Y/%m/%d")
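# Window illustration (hypothetical ScheduleTime of 2021-08-20):
#   begin_day     -> 20210805   (15-day lookback; parsed by the job but unused in its query)
#   begin_day02   -> 20210721   (30-day lookback that bounds the gaid query)
#   end_day       -> 20210819   (yesterday, the window's upper bound)
#   dt_dash_today -> 2021/08/19 (slash-formatted date used in the I/O paths)
# check_await (from dmp_env.sh) waits for the day's eggplants input before proceeding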
check_await "${TMP_EGGPLANTS_OUTPUT_PATH}/${dt_dash_today}"
# Create the ODS partition directory if it does not already exist
hadoop fs -test -e "${ODS_OTHER_DEVICE_DAILY}/${dt_dash_today}"
if [ $? -ne 0 ]; then
    hadoop fs -mkdir -p "${ODS_OTHER_DEVICE_DAILY}/${dt_dash_today}"
fi
OUTPUT_PATH01="${TMP_COM_BTOP_TIKTOKRV_GAID_PATH}/${dt_dash_today}/01"
OUTPUT_PATH02="${TMP_COM_BTOP_TIKTOKRV_GAID_PATH}/${dt_dash_today}/02"
hadoop fs -rm -r "${OUTPUT_PATH01}"
hadoop fs -rm -r "${OUTPUT_PATH02}"
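# Run the Spark job that builds both outputs: 01 = tagged audience rows, 02 = device/country rows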
spark-submit --class mobvista.dmp.datasource.dm.BtopTiktokrvGaid \
--conf spark.yarn.executor.memoryOverhead=2048 \
--conf spark.default.parallelism=3000 \
--conf spark.sql.shuffle.partitions=3000 \
--conf spark.driver.maxResultSize=4g \
--conf spark.network.timeout=720s \
--files ${HIVE_SITE_PATH} \
--master yarn --deploy-mode cluster --executor-memory 8g --driver-memory 6g --executor-cores 6 --num-executors 70 \
../../${JAR} -begin_day ${begin_day} -begin_day02 ${begin_day02} -end_day ${end_day} -output01 ${OUTPUT_PATH01} -output02 ${OUTPUT_PATH02} -coalesce 200
# Fail the task if the Spark job did not succeed
if [[ $? -ne 0 ]]; then
    exit 255
fi
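# Publish the results; distcp -m20 caps each copy at 20 parallel map tasks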
hadoop distcp -m20 "${OUTPUT_PATH01}/*" "${TMP_EGGPLANTS_OUTPUT_PATH}/${dt_dash_today}/"
hadoop distcp -m20 "${OUTPUT_PATH02}/*" "${ODS_OTHER_DEVICE_DAILY}/${dt_dash_today}/"
@@ -259,6 +259,8 @@ TMP_INTEREST_PATH="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/tmp/dm_
TMP_COM_YOUKU_PHONE_WAX_NOBID_PATH="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/tmp/com_youku_phone_wax_nobid"
TMP_COM_BTOP_TIKTOKRV_PATH="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/tmp/com_btop_tiktokrv"
TMP_COM_BTOP_TIKTOKRV_GAID_PATH="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/tmp/com_btop_tiktokrv_gaid"
RTDMP_COM_EG_ANDROID_ALIPAYGPHONE_REYUN_PATH="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/tmp/rtdmp_com_eg_android_AlipayGphone_reyun"
RTDMP_NORMAL_PATH="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/tmp/rtdmp_normal"
RTDMP_NORMAL_COUNT_RESULT="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/tmp/rtdmp_normal_count_result"
@@ -54,45 +54,45 @@ class BtopTiktokrv extends CommonSparkJob with Serializable {
FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output02), true)
try {
// val sql1=
// s"""
// |select imei,avg(bidprice) num from uparpu_main.uparpu_tk_request_v2 where
// |concat(yyyy,mm,dd)<='${end_day}' and concat(yyyy,mm,dd)>='${begin_day}'
// |and bidtype=0 and format='1' and os_platform=1 and country_code='CN' and nw_firm_id =15 and imei rlike '${imeiPtn}'
// |group by imei
// """.stripMargin
//
// val df01: DataFrame = spark.sql(sql1).persist(StorageLevel.MEMORY_AND_DISK_SER)
// df01.createOrReplaceTempView("tmp_etl_imei")
//
// val imeiNum: Double = spark.sql("select percentile(int(num*100),0.7) from tmp_etl_imei").rdd.map(_.mkString).take(1)(0).toDouble / 100
// println("imeiNum======="+imeiNum)
//
// val sql2=
// s"""
// |select android_id,avg(bidprice) num from uparpu_main.uparpu_tk_request_v2 where
// |concat(yyyy,mm,dd)<='${end_day}' and concat(yyyy,mm,dd)>='${begin_day}'
// |and bidtype=0 and format='1' and os_platform=1 and country_code='CN' and nw_firm_id =15 and android_id rlike '${andriodIdPtn}'
// |group by android_id
// """.stripMargin
//
// val df02: DataFrame = spark.sql(sql2).persist(StorageLevel.MEMORY_AND_DISK_SER)
// df02.createOrReplaceTempView("tmp_etl_android_id")
//
// val android_idNum: Double = spark.sql("select percentile(int(num*100),0.7) from tmp_etl_android_id").rdd.map(_.mkString).take(1)(0).toDouble / 100
// println("android_idNum======="+android_idNum)
//
// val imei_high_df = df01.filter(df01.col("num")>imeiNum).select(concat_ws("\t", df01.col("imei"), lit("imei"), lit("android"),lit("[\"com.btop_tiktokrv_highvalue15\"]")))
// val imei_low_df = df01.filter(df01.col("num")<=imeiNum).select(concat_ws("\t", df01.col("imei"), lit("imei"), lit("android"),lit("[\"com.btop_tiktokrv_15\"]")))
// val imei_all_df = df01.select(concat_ws("\t", df01.col("imei"), lit("imei"), lit("android"),lit("CN")))
//
// val android_id_high_df = df02.filter(df02.col("num")>android_idNum).select(concat_ws("\t", df02.col("android_id"), lit("android_id"), lit("android"),lit("[\"com.btop_tiktokrv_highvalue15\"]")))
// val android_id_low_df = df02.filter(df02.col("num")<=android_idNum).select(concat_ws("\t", df02.col("android_id"), lit("android_id"), lit("android"),lit("[\"com.btop_tiktokrv_15\"]")))
// val android_id_all_df = df02.select(concat_ws("\t", df02.col("android_id"), lit("android_id"), lit("android"),lit("CN")))
// imei_high_df.union(imei_low_df).union(android_id_high_df).union(android_id_low_df).coalesce(coalesce.toInt).write.format("text").mode("overwrite").save(output01)
// imei_all_df.union(android_id_all_df).coalesce(coalesce.toInt).write.format("text").mode("overwrite").save(output02)
val sql3=
@@ -124,19 +124,6 @@ class BtopTiktokrv extends CommonSparkJob with Serializable {
val oaidNum: Double = spark.sql("select percentile(int(num*100),0.5) from tmp_etl_oaid").rdd.map(_.mkString).take(1)(0).toDouble / 100
println("oaidNum======="+oaidNum)
-val sql5=
-s"""
-|select gaid,avg(bidprice) num from uparpu_main.uparpu_tk_request_v2 where
-|concat(yyyy,mm,dd)<='${end_day}' and concat(yyyy,mm,dd)>='${begin_day02}'
-|and bidtype=0 and format='1' and os_platform=1 and country_code='CN' and nw_firm_id =15 and gaid !='${allZero}'
-|group by gaid
-""".stripMargin
-val df05: DataFrame = spark.sql(sql5).persist(StorageLevel.MEMORY_AND_DISK_SER)
-df05.createOrReplaceTempView("tmp_etl_gaid")
-val gaidNum: Double = spark.sql("select percentile(int(num*100),0.5) from tmp_etl_gaid").rdd.map(_.mkString).take(1)(0).toDouble / 100
-println("gaidNum======="+gaidNum)
val imei_30_Days_high_df = df03.filter(df03.col("num")>imei_30_Days_Num).select(concat_ws("\t", df03.col("imei"), lit("imei"), lit("android"),lit("[\"com.btop_tiktokrv_highvalue30_p50\"]")))
val imei_30_Days_high_df_with_country = df03.filter(df03.col("num")>imei_30_Days_Num).select(concat_ws("\t", df03.col("imei"), lit("imei"), lit("android"),lit("CN")))
@@ -144,14 +131,11 @@ class BtopTiktokrv extends CommonSparkJob with Serializable {
val oaid_high_df = df04.filter(df04.col("num")>oaidNum).select(concat_ws("\t", df04.col("oaid"), lit("oaid"), lit("android"),lit("[\"com.btop_tiktokrv_highvalue30_p50\"]")))
val oaid_high_df_with_country = df04.filter(df04.col("num")>oaidNum).select(concat_ws("\t", df04.col("oaid"), lit("oaid"), lit("android"),lit("CN")))
-val gaid_high_df = df05.filter(df05.col("num")>gaidNum).select(concat_ws("\t", df05.col("gaid"), lit("gaid"), lit("android"),lit("[\"com.btop_tiktokrv_highvalue30_p50_gaid\"]")))
-val gaid_high_df_with_country = df05.filter(df05.col("num")>gaidNum).select(concat_ws("\t", df05.col("gaid"), lit("gaid"), lit("android"),lit("CN")))
// imei_high_df.union(imei_low_df).union(android_id_high_df).union(android_id_low_df).union(imei_30_Days_high_df).union(oaid_high_df).coalesce(coalesce.toInt).write.format("text").mode("overwrite").save(output01)
// imei_all_df.union(android_id_all_df).union(imei_30_Days_high_df_with_country).union(oaid_high_df_with_country).coalesce(coalesce.toInt).write.format("text").mode("overwrite").save(output02)
-imei_30_Days_high_df.union(oaid_high_df).union(gaid_high_df).coalesce(coalesce.toInt).write.format("text").mode("overwrite").save(output01)
-imei_30_Days_high_df_with_country.union(oaid_high_df_with_country).union(gaid_high_df_with_country).coalesce(coalesce.toInt).write.format("text").mode("overwrite").save(output02)
+imei_30_Days_high_df.union(oaid_high_df).coalesce(coalesce.toInt).write.format("text").mode("overwrite").save(output01)
+imei_30_Days_high_df_with_country.union(oaid_high_df_with_country).coalesce(coalesce.toInt).write.format("text").mode("overwrite").save(output02)
} finally {
spark.stop()
package mobvista.dmp.datasource.dm
import mobvista.dmp.common.CommonSparkJob
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{concat_ws, lit}
import org.apache.spark.storage.StorageLevel
import java.net.URI
/**
* @author jiangfan
* @date 2021/8/19 11:11
*/
class BtopTiktokrvGaid extends CommonSparkJob with Serializable {
  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("coalesce", true, "[must] coalesce")
    options.addOption("output01", true, "[must] output01")
    options.addOption("output02", true, "[must] output02")
    options.addOption("begin_day", true, "[must] begin_day")
    options.addOption("begin_day02", true, "[must] begin_day02")
    options.addOption("end_day", true, "[must] end_day")
    options
  }
  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      return -1
    } else printOptions(commandLine)

    val coalesce = commandLine.getOptionValue("coalesce")
    val output01 = commandLine.getOptionValue("output01")
    val output02 = commandLine.getOptionValue("output02")
    val begin_day = commandLine.getOptionValue("begin_day") // accepted for interface parity; unused below
    val begin_day02 = commandLine.getOptionValue("begin_day02")
    val end_day = commandLine.getOptionValue("end_day")

    val spark = SparkSession.builder()
      .appName("BtopTiktokrvGaid")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

    val sc = spark.sparkContext
    import spark.implicits._

    // Clear both output prefixes so the text writes start clean
    FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output01), true)
    FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output02), true)
    try {
      // Average bid price per gaid over the 30-day window, all-zero gaids excluded
      val sql5 =
        s"""
           |select gaid,avg(bidprice) num from uparpu_main.uparpu_tk_request_v2 where
           |concat(yyyy,mm,dd)<='${end_day}' and concat(yyyy,mm,dd)>='${begin_day02}'
           |and bidtype=0 and format='1' and os_platform=1 and country_code='CN' and nw_firm_id =15 and gaid !='${allZero}'
           |group by gaid
        """.stripMargin
      val df05: DataFrame = spark.sql(sql5).persist(StorageLevel.MEMORY_AND_DISK_SER)
      df05.createOrReplaceTempView("tmp_etl_gaid")

      // Median (p50) of the per-gaid averages, computed in integer cents and scaled back
      val gaidNum: Double = spark.sql("select percentile(int(num*100),0.5) from tmp_etl_gaid").rdd.map(_.mkString).take(1)(0).toDouble / 100
      println("gaidNum=======" + gaidNum)

      // Above-median devices, emitted as TSV: id, id type, platform, then tag array (01) or country (02)
      val gaid_high_df = df05.filter(df05.col("num") > gaidNum).select(concat_ws("\t", df05.col("gaid"), lit("gaid"), lit("android"), lit("[\"com.btop_tiktokrv_highvalue30_p50_gaid\"]")))
      val gaid_high_df_with_country = df05.filter(df05.col("num") > gaidNum).select(concat_ws("\t", df05.col("gaid"), lit("gaid"), lit("android"), lit("CN")))
      gaid_high_df.coalesce(coalesce.toInt).write.format("text").mode("overwrite").save(output01)
      gaid_high_df_with_country.coalesce(coalesce.toInt).write.format("text").mode("overwrite").save(output02)
    } finally {
      spark.stop()
    }
    0
  }
}
object BtopTiktokrvGaid {
  def main(args: Array[String]): Unit = {
    new BtopTiktokrvGaid().run(args)
  }
}
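The core of both jobs is the same thresholding trick: percentile() takes an integral column, so the per-device average bid price is scaled to integer cents with int(num*100), the p50 is taken, and the result is divided by 100 to recover the median price. Below is a self-contained sketch of that selection; the object name, the local master, and the toy rows standing in for uparpu_main.uparpu_tk_request_v2 are all hypothetical, while the percentile expression and the output row shape mirror the code above.

// Standalone sketch (hypothetical data) of the median-threshold selection.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{avg, concat_ws, lit}

object MedianThresholdSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("MedianThresholdSketch")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Stand-in for the per-request table: (gaid, bidprice)
    val requests = Seq(
      ("g1", 0.12), ("g1", 0.18), // g1 averages 0.15
      ("g2", 0.05),               // below the median
      ("g3", 0.40)                // above the median
    ).toDF("gaid", "bidprice")

    // Step 1: average bid price per device, as in sql5
    val byDevice = requests.groupBy("gaid").agg(avg("bidprice").as("num"))
    byDevice.createOrReplaceTempView("tmp_etl_gaid")

    // Step 2: p50 of the averages; percentile() wants an integral input,
    // hence the *100 (cents) and /100 round trip
    val gaidNum = spark.sql("select percentile(int(num*100),0.5) from tmp_etl_gaid")
      .rdd.map(_.mkString).take(1)(0).toDouble / 100

    // Step 3: keep devices strictly above the median and emit TSV rows
    val high = byDevice.filter($"num" > gaidNum)
      .select(concat_ws("\t", $"gaid", lit("gaid"), lit("android"),
        lit("[\"com.btop_tiktokrv_highvalue30_p50_gaid\"]")))
    high.show(false) // only g3 survives for this data

    spark.stop()
  }
}

Each output01 row is a tab-separated line of device id, id type ("gaid"), platform ("android"), and the tag array ["com.btop_tiktokrv_highvalue30_p50_gaid"]; output02 carries the country code "CN" in the last column instead. The distcp steps in the shell script then publish these to the eggplants and ODS paths respectively.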