package mobvista.dmp.datasource.dm

import mobvista.dmp.common.CommonSparkJob
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.functions.{concat_ws, lit}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.storage.StorageLevel

import java.net.URI

/**
  * @author jiangfan
  * @date 2021/8/19 11:11
  */
class BtopTiktokrvGaid extends CommonSparkJob with Serializable {

  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("coalesce", true, "[must] coalesce")
    options.addOption("output01", true, "[must] output01")
    options.addOption("output02", true, "[must] output02")
    options.addOption("output03", true, "[must] output03")
    options.addOption("begin_day", true, "[must] begin_day")
    options.addOption("begin_day02", true, "[must] begin_day02")
    options.addOption("end_day", true, "[must] end_day")
    options
  }

  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      return -1
    } else {
      printOptions(commandLine)
    }

    val coalesce = commandLine.getOptionValue("coalesce")
    val output01 = commandLine.getOptionValue("output01")
    val output02 = commandLine.getOptionValue("output02")
    val output03 = commandLine.getOptionValue("output03")
    val begin_day = commandLine.getOptionValue("begin_day")
    val begin_day02 = commandLine.getOptionValue("begin_day02")
    val end_day = commandLine.getOptionValue("end_day")

    val spark = SparkSession.builder()
      .appName("BtopTiktokrvGaid")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

    // Clear the output directories up front so the job can be re-run idempotently.
    val fs = FileSystem.get(new URI("s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration)
    fs.delete(new Path(output01), true)
    fs.delete(new Path(output02), true)
    fs.delete(new Path(output03), true)

    try {
      // Average bid price (num) per GAID over the [begin_day02, end_day] window,
      // restricted to CN Android requests from nw_firm_id = 15 with bidtype = 0
      // and format = '1', excluding the all-zero device id.
      val sql5 =
        s"""
           |select gaid, avg(bidprice) num from uparpu_main.uparpu_tk_request_v2 where
           |concat(yyyy,mm,dd) <= '${end_day}' and concat(yyyy,mm,dd) >= '${begin_day02}'
           |and bidtype = 0 and format = '1' and os_platform = 1 and country_code = 'CN' and nw_firm_id = 15 and gaid != '${allZero}'
           |group by gaid
        """.stripMargin

      val df05: DataFrame = spark.sql(sql5).persist(StorageLevel.MEMORY_AND_DISK_SER)
      df05.createOrReplaceTempView("tmp_etl_gaid")

      // Median (p50) of the per-GAID average bid price: prices are scaled to
      // integer cents for percentile(), then scaled back.
      val gaidNum: Double = spark.sql("select percentile(int(num*100),0.5) from tmp_etl_gaid")
        .rdd.map(_.mkString).take(1)(0).toDouble / 100
      println("gaidNum=======" + gaidNum)

      // GAIDs whose average bid price exceeds the median are treated as high value
      // and written out in three layouts: tagged device rows, country rows, and bare ids.
      val gaidHighDF = df05.filter(df05.col("num") > gaidNum)
      gaidHighDF
        .select(concat_ws("\t", df05.col("gaid"), lit("gaid"), lit("android"), lit("[\"com.btop_tiktokrv_highvalue30_p50_gaid\"]")))
        .coalesce(coalesce.toInt).write.format("text").mode("overwrite").save(output01)
      gaidHighDF
        .select(concat_ws("\t", df05.col("gaid"), lit("gaid"), lit("android"), lit("CN")))
        .coalesce(coalesce.toInt).write.format("text").mode("overwrite").save(output02)
      gaidHighDF.select("gaid")
        .coalesce(coalesce.toInt).write.format("text").mode("overwrite").save(output03)
    } finally {
      spark.stop()
    }
    0
  }
}

object BtopTiktokrvGaid {
  def main(args: Array[String]): Unit = {
    new BtopTiktokrvGaid().run(args)
  }
}
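// Example spark-submit invocation (a sketch only: the jar name, S3 paths, and
// dates below are illustrative assumptions, not values from this repo — the
// option names come from buildOptions() above):
//
//   spark-submit --class mobvista.dmp.datasource.dm.BtopTiktokrvGaid \
//     --master yarn --deploy-mode cluster \
//     dmp.jar \
//     -coalesce 20 \
//     -output01 s3://mob-emr-test/dmp/btop_tiktokrv/tagged \
//     -output02 s3://mob-emr-test/dmp/btop_tiktokrv/country \
//     -output03 s3://mob-emr-test/dmp/btop_tiktokrv/gaid \
//     -begin_day 20210801 -begin_day02 20210720 -end_day 20210819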