package mobvista.dmp.datasource.dm

import mobvista.dmp.common.CommonSparkJob
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.functions.{concat_ws, lit}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.storage.StorageLevel

import java.net.URI

/**
  * Selects "top LTV" devices from TopOn (uparpu) impression data joined against
  * new-user records, and writes two tab-separated text outputs per device-id type
  * (imei and oaid):
  *   - output01: device_id \t id_type \t android \t ["com.topon_topltv_1015"]
  *   - output02: device_id \t id_type \t android \t CN
  *
  * LTV for a device is the sum of `bidprice` over impressions occurring within
  * 7 days of the device's new-user date (see [[buildTopLtvSql]]).
  *
  * @author jiangfan
  * @date 2021/10/15 16:22
  */
class ComToponTopltv1015 extends CommonSparkJob with Serializable {

  /** Declares the required command-line options for this job. */
  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("coalesce", true, "[must] coalesce")
    options.addOption("output01", true, "[must] output01")
    options.addOption("output02", true, "[must] output02")
    options.addOption("begin_day", true, "[must] begin_day")
    options.addOption("end_day01", true, "[must] end_day01")
    options.addOption("end_day02", true, "[must] end_day02")
    options
  }

  /**
    * Builds the top-LTV selection SQL for one device-id column ("imei" or "oaid").
    *
    * The aggregation computes, per device, the sum of `bidprice` for impressions
    * that happened within 7 days of the device's new-user date (`datediff < 7`);
    * ids equal to '0' or 'NULL' contribute NULL to the sum. The outer query keeps
    * devices whose rank divided by the total device count is below 0.3.
    *
    * NOTE(review): `row_number() over (partition by device_id order by LTV desc)`
    * partitions by the very column it ranks, so `ranking` is always 1 and the
    * `< 0.3` filter degenerates to `total_count > 3`. If the intent is "top 30%
    * of devices by LTV", the window should be `over (order by LTV desc)` with no
    * partition — confirm against expected output before changing. The SQL below
    * reproduces the original behavior unchanged.
    *
    * @param idColumn device-id column in the impression table ("imei" or "oaid")
    * @param beginDay inclusive lower bound (yyyy-MM-dd) for impressions and new users
    * @param endDay01 inclusive upper bound for the new-user date
    * @param endDay02 inclusive upper bound for the impression date
    */
  private def buildTopLtvSql(idColumn: String, beginDay: String, endDay01: String, endDay02: String): String = {
    // Shared per-device 7-day-LTV aggregation; interpolated twice below so the
    // ranked set and the COUNT(*) denominator stay in sync.
    val ltvAgg =
      s"""select sum(case when imp.${idColumn} not in ('0','NULL') then bidprice else NULL end) as LTV,imp.${idColumn} as device_id
         |from
         |(select *,concat(yyyy,'-',mm,'-',dd) as day
         |from uparpu_main.uparpu_tk_impression_v2
         |where concat(yyyy,'-',mm,'-',dd) >= '${beginDay}'
         |and concat(yyyy,'-',mm,'-',dd) <= '${endDay02}'
         |) as imp
         |inner join
         |(select new_device_id,dt
         |from uparpu_main.uparpu_new_user
         |where dt >= '${beginDay}'
         |and dt <= '${endDay01}'
         |) as uu
         |on imp.upid = uu.new_device_id and datediff(imp.day,uu.dt)<7
         |group by imp.${idColumn}""".stripMargin

    s"""
       |select device_id
       |from
       |(select device_id,row_number() over (partition by device_id order by LTV desc) as ranking
       |from
       |(
       |${ltvAgg}
       |) as t
       |) as t2
       |where ranking/(select count(device_id)
       |from
       |(
       |${ltvAgg}
       |) as t)<0.3
       |""".stripMargin
  }

  /**
    * Job entry point: parses options, deletes previous outputs, runs the imei and
    * oaid top-LTV queries, and writes the two text outputs.
    *
    * @return 0 on success, -1 when required options are missing
    */
  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      return -1
    } else printOptions(commandLine)

    val coalesce = commandLine.getOptionValue("coalesce")
    val output01 = commandLine.getOptionValue("output01")
    val output02 = commandLine.getOptionValue("output02")
    val begin_day = commandLine.getOptionValue("begin_day")
    val end_day01 = commandLine.getOptionValue("end_day01")
    val end_day02 = commandLine.getOptionValue("end_day02")

    val spark = SparkSession.builder()
      .appName("ComToponTopltv1015")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

    // Clear previous outputs so the overwrite is clean even on retries.
    // One FileSystem handle suffices for both paths (same scheme/authority).
    val fs = FileSystem.get(new URI("s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration)
    fs.delete(new Path(output01), true)
    fs.delete(new Path(output02), true)

    try {
      val imei_sql = buildTopLtvSql("imei", begin_day, end_day01, end_day02)
      val oaid_sql = buildTopLtvSql("oaid", begin_day, end_day01, end_day02)
      println("imei_sql==" + imei_sql)
      println("oaid_sql==" + oaid_sql)

      // Each result feeds two outputs, so cache to avoid recomputing the query.
      val imei_df: DataFrame = spark.sql(imei_sql).persist(StorageLevel.MEMORY_AND_DISK_SER)
      val imei_df_with_package_name = imei_df.select(concat_ws("\t", imei_df.col("device_id"), lit("imei"), lit("android"), lit("[\"com.topon_topltv_1015\"]")))
      val imei_df_with_country = imei_df.select(concat_ws("\t", imei_df.col("device_id"), lit("imei"), lit("android"), lit("CN")))

      val oaid_df: DataFrame = spark.sql(oaid_sql).persist(StorageLevel.MEMORY_AND_DISK_SER)
      val oaid_df_with_package_name = oaid_df.select(concat_ws("\t", oaid_df.col("device_id"), lit("oaid"), lit("android"), lit("[\"com.topon_topltv_1015\"]")))
      val oaid_df_with_country = oaid_df.select(concat_ws("\t", oaid_df.col("device_id"), lit("oaid"), lit("android"), lit("CN")))

      imei_df_with_package_name.union(oaid_df_with_package_name).coalesce(coalesce.toInt).write.format("text").mode("overwrite").save(output01)
      imei_df_with_country.union(oaid_df_with_country).coalesce(coalesce.toInt).write.format("text").mode("overwrite").save(output02)

      // Release cached blocks before shutdown (was leaked in the original).
      imei_df.unpersist()
      oaid_df.unpersist()
    } finally {
      spark.stop()
    }
    0
  }
}

object ComToponTopltv1015 {
  def main(args: Array[String]): Unit = {
    new ComToponTopltv1015().run(args)
  }
}