Commit a06ca89e by fan.jiang

com.btop_tiktokrv_highvalue30_p50_gaid

parent f2a6ea79
@@ -124,6 +124,19 @@ class BtopTiktokrv extends CommonSparkJob with Serializable {
     val oaidNum: Double = spark.sql("select percentile(int(num*100),0.5) from tmp_etl_oaid").rdd.map(_.mkString).take(1)(0).toDouble / 100
     println("oaidNum======="+oaidNum)
+    val sql5=
+      s"""
+         |select gaid,avg(bidprice) num from uparpu_main.uparpu_tk_request_v2 where
+         |concat(yyyy,mm,dd)<='${end_day}' and concat(yyyy,mm,dd)>='${begin_day02}'
+         |and bidtype=0 and format='1' and os_platform=1 and country_code='CN' and nw_firm_id =15 and gaid !='${allZero}'
+         |group by gaid
+      """.stripMargin
+    val df05: DataFrame = spark.sql(sql5).persist(StorageLevel.MEMORY_AND_DISK_SER)
+    df05.createOrReplaceTempView("tmp_etl_gaid")
+    val gaidNum: Double = spark.sql("select percentile(int(num*100),0.5) from tmp_etl_gaid").rdd.map(_.mkString).take(1)(0).toDouble / 100
+    println("gaidNum======="+gaidNum)
     val imei_30_Days_high_df = df03.filter(df03.col("num")>imei_30_Days_Num).select(concat_ws("\t", df03.col("imei"), lit("imei"), lit("android"),lit("[\"com.btop_tiktokrv_highvalue30_p50\"]")))
     val imei_30_Days_high_df_with_country = df03.filter(df03.col("num")>imei_30_Days_Num).select(concat_ws("\t", df03.col("imei"), lit("imei"), lit("android"),lit("CN")))
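
Note on the median computation added above: `percentile(int(num*100),0.5)` takes the exact p50 of the per-gaid average bid price. The value is scaled by 100 and cast to int presumably because the exact percentile aggregate (Hive's UDAF, which this Hive-style SQL targets) expects an integral argument; dividing by 100 afterwards restores two decimal places. A minimal local sketch of the same trick on synthetic data (the table contents and `local[*]` session are illustrative, not the production job):

```scala
import org.apache.spark.sql.SparkSession

object PercentileSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("p50-sketch").getOrCreate()
    import spark.implicits._

    // Synthetic stand-in for tmp_etl_gaid: one avg(bidprice) per gaid.
    Seq(("g1", 0.42), ("g2", 1.10), ("g3", 0.75))
      .toDF("gaid", "num")
      .createOrReplaceTempView("tmp_etl_gaid")

    // Scale to integer "cents" for the exact percentile UDAF, then scale back.
    val gaidNum: Double = spark
      .sql("select percentile(int(num*100),0.5) from tmp_etl_gaid")
      .first().getDouble(0) / 100

    println("gaidNum=======" + gaidNum) // 0.75: the median of the three averages
    spark.stop()
  }
}
```

Here `first().getDouble(0)` is an equivalent, more direct way to pull the scalar than the job's `rdd.map(_.mkString).take(1)(0).toDouble`.
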
@@ -131,11 +144,14 @@ class BtopTiktokrv extends CommonSparkJob with Serializable {
     val oaid_high_df = df04.filter(df04.col("num")>oaidNum).select(concat_ws("\t", df04.col("oaid"), lit("oaid"), lit("android"),lit("[\"com.btop_tiktokrv_highvalue30_p50\"]")))
     val oaid_high_df_with_country = df04.filter(df04.col("num")>oaidNum).select(concat_ws("\t", df04.col("oaid"), lit("oaid"), lit("android"),lit("CN")))
+    val gaid_high_df = df05.filter(df05.col("num")>gaidNum).select(concat_ws("\t", df05.col("gaid"), lit("gaid"), lit("android"),lit("[\"com.btop_tiktokrv_highvalue30_p50_gaid\"]")))
+    val gaid_high_df_with_country = df05.filter(df05.col("num")>gaidNum).select(concat_ws("\t", df05.col("gaid"), lit("gaid"), lit("android"),lit("CN")))
 //    imei_high_df.union(imei_low_df).union(android_id_high_df).union(android_id_low_df).union(imei_30_Days_high_df).union(oaid_high_df).coalesce(coalesce.toInt).write.format("text").mode("overwrite").save(output01)
 //    imei_all_df.union(android_id_all_df).union(imei_30_Days_high_df_with_country).union(oaid_high_df_with_country).coalesce(coalesce.toInt).write.format("text").mode("overwrite").save(output02)
-    imei_30_Days_high_df.union(oaid_high_df).coalesce(coalesce.toInt).write.format("text").mode("overwrite").save(output01)
-    imei_30_Days_high_df_with_country.union(oaid_high_df_with_country).coalesce(coalesce.toInt).write.format("text").mode("overwrite").save(output02)
+    imei_30_Days_high_df.union(oaid_high_df).union(gaid_high_df).coalesce(coalesce.toInt).write.format("text").mode("overwrite").save(output01)
+    imei_30_Days_high_df_with_country.union(oaid_high_df_with_country).union(gaid_high_df_with_country).coalesce(coalesce.toInt).write.format("text").mode("overwrite").save(output02)
     } finally {
       spark.stop()
...
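
For context on what the new branch emits: each gaid whose average bid price exceeds the p50 threshold becomes one tab-separated text row, `<gaid>\tgaid\tandroid\t["com.btop_tiktokrv_highvalue30_p50_gaid"]` for output01 and `<gaid>\tgaid\tandroid\tCN` for output02. A self-contained sketch of that serialization (synthetic rows; the hard-coded threshold stands in for the computed gaidNum):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{concat_ws, lit}

object SegmentRowSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("segment-row-sketch").getOrCreate()
    import spark.implicits._

    val df05 = Seq(("g1", 0.42), ("g2", 1.10)).toDF("gaid", "num")
    val gaidNum = 0.75 // stand-in for the computed median

    // Same shape as gaid_high_df in the diff: id, id type, platform, segment tag.
    val gaid_high_df = df05.filter(df05.col("num") > gaidNum)
      .select(concat_ws("\t", df05.col("gaid"), lit("gaid"), lit("android"),
        lit("[\"com.btop_tiktokrv_highvalue30_p50_gaid\"]")))

    // show() replaces the production write.format("text").save(output01)
    // so the sketch stays local; only g2 (1.10 > 0.75) qualifies.
    gaid_high_df.show(false)
    spark.stop()
  }
}
```

Since the change only appends the gaid rows to the existing imei/oaid unions, both output paths keep their row format; consumers that key on the second (id type) column can pick up the new `gaid` rows without any schema change.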