package mobvista.dmp.datasource.dm

import mobvista.dmp.common.CommonSparkJob
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.functions.{concat_ws, lit}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.storage.StorageLevel

import java.net.URI


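/**
  * Builds the "com.btop_tiktokrv_highvalue30_p50" audience: computes each device's
  * average bid price from uparpu_main.uparpu_tk_request_v2 over the
  * begin_day02..end_day window, keeps the imei/oaid devices above the median (p50),
  * and writes them as tab-separated text to output01 (tagged by package name)
  * and output02 (tagged by country).
  */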
class BtopTiktokrv extends CommonSparkJob with Serializable {
  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("coalesce", true, "[must] coalesce")
    options.addOption("output01", true, "[must] output01")
    options.addOption("output02", true, "[must] output02")
    options.addOption("begin_day", true, "[must] begin_day")
    options.addOption("begin_day02", true, "[must] begin_day02")
    options.addOption("end_day", true, "[must] end_day")
    options
  }

  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      return -1
    } else printOptions(commandLine)

    val coalesce = commandLine.getOptionValue("coalesce")
    val output01 = commandLine.getOptionValue("output01")
    val output02 = commandLine.getOptionValue("output02")
    val begin_day = commandLine.getOptionValue("begin_day")
    val begin_day02 = commandLine.getOptionValue("begin_day02")
    val end_day = commandLine.getOptionValue("end_day")

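    // Spark session with Kryo serialization, snappy RDD compression,
    // ORC filter pushdown, and Hive support enabled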
    val spark = SparkSession.builder()
      .appName("BtopTiktokrv")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

    // Clear any previous output so the overwrite writes below start clean
    val fs = FileSystem.get(new URI("s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration)
    fs.delete(new Path(output01), true)
    fs.delete(new Path(output02), true)

    try {
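      // Earlier variant, kept commented out below: a p70 cutoff over the
      // begin_day..end_day window that also emitted low-value
      // ("com.btop_tiktokrv_15") rows keyed by imei and android_id.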
//      val sql1=
//        s"""
//           |select imei,avg(bidprice) num from uparpu_main.uparpu_tk_request_v2 where
//           |concat(yyyy,mm,dd)<='${end_day}' and  concat(yyyy,mm,dd)>='${begin_day}'
//           |and bidtype=0 and format='1' and os_platform=1 and country_code='CN' and nw_firm_id =15 and imei rlike '${imeiPtn}'
//           |group by imei
//        """.stripMargin
//
//      val df01: DataFrame = spark.sql(sql1).persist(StorageLevel.MEMORY_AND_DISK_SER)
//      df01.createOrReplaceTempView("tmp_etl_imei")
//
//      val imeiNum: Double = spark.sql("select percentile(int(num*100),0.7) from tmp_etl_imei").rdd.map(_.mkString).take(1)(0).toDouble / 100
//      println("imeiNum======="+imeiNum)
//
//      val sql2=
//        s"""
//           |select android_id,avg(bidprice) num from uparpu_main.uparpu_tk_request_v2 where
//           |concat(yyyy,mm,dd)<='${end_day}' and  concat(yyyy,mm,dd)>='${begin_day}'
//           |and bidtype=0 and format='1' and os_platform=1 and country_code='CN' and nw_firm_id =15 and android_id rlike '${andriodIdPtn}'
//           |group by android_id
//        """.stripMargin
//
//      val df02: DataFrame = spark.sql(sql2).persist(StorageLevel.MEMORY_AND_DISK_SER)
//      df02.createOrReplaceTempView("tmp_etl_android_id")
//
//      val android_idNum: Double = spark.sql("select percentile(int(num*100),0.7) from tmp_etl_android_id").rdd.map(_.mkString).take(1)(0).toDouble / 100
//      println("android_idNum======="+android_idNum)
//
//      val imei_high_df = df01.filter(df01.col("num")>imeiNum).select(concat_ws("\t", df01.col("imei"),  lit("imei"),  lit("android"),lit("[\"com.btop_tiktokrv_highvalue15\"]")))
//      val imei_low_df = df01.filter(df01.col("num")<=imeiNum).select(concat_ws("\t", df01.col("imei"),  lit("imei"),  lit("android"),lit("[\"com.btop_tiktokrv_15\"]")))
//      val imei_all_df = df01.select(concat_ws("\t", df01.col("imei"),  lit("imei"),  lit("android"),lit("CN")))
//
//      val android_id_high_df = df02.filter(df02.col("num")>android_idNum).select(concat_ws("\t", df02.col("android_id"),  lit("android_id"),  lit("android"),lit("[\"com.btop_tiktokrv_highvalue15\"]")))
//      val android_id_low_df = df02.filter(df02.col("num")<=android_idNum).select(concat_ws("\t", df02.col("android_id"),  lit("android_id"),  lit("android"),lit("[\"com.btop_tiktokrv_15\"]")))
//      val android_id_all_df = df02.select(concat_ws("\t", df02.col("android_id"),  lit("android_id"),  lit("android"),lit("CN")))


//      imei_high_df.union(imei_low_df).union(android_id_high_df).union(android_id_low_df).coalesce(coalesce.toInt).write.format("text").mode("overwrite").save(output01)
//      imei_all_df.union(android_id_all_df).coalesce(coalesce.toInt).write.format("text").mode("overwrite").save(output02)


      // Average bid price per imei over the begin_day02..end_day window,
      // restricted to CN Android bidding traffic (bidtype = 0, format = '1', nw_firm_id = 15)
      val sql3 =
        s"""
           |select imei, avg(bidprice) num from uparpu_main.uparpu_tk_request_v2 where
           |concat(yyyy,mm,dd) <= '${end_day}' and concat(yyyy,mm,dd) >= '${begin_day02}'
           |and bidtype = 0 and format = '1' and os_platform = 1 and country_code = 'CN' and nw_firm_id = 15 and imei rlike '${imeiPtn}'
           |group by imei
        """.stripMargin

      val df03: DataFrame = spark.sql(sql3).persist(StorageLevel.MEMORY_AND_DISK_SER)
      df03.createOrReplaceTempView("tmp_etl_imei_30_days")

      // Hive's percentile UDF only accepts integral input, so prices are scaled to
      // int(num*100) before taking the median (p50) and divided back by 100 afterwards
      val imei_30_Days_Num: Double = spark.sql("select percentile(int(num*100),0.5) from tmp_etl_imei_30_days").rdd.map(_.mkString).take(1)(0).toDouble / 100
      println("imei_30_Days_Num=======" + imei_30_Days_Num)


      // Same average over the same window, keyed by oaid
      val sql4 =
        s"""
           |select oaid, avg(bidprice) num from uparpu_main.uparpu_tk_request_v2 where
           |concat(yyyy,mm,dd) <= '${end_day}' and concat(yyyy,mm,dd) >= '${begin_day02}'
           |and bidtype = 0 and format = '1' and os_platform = 1 and country_code = 'CN' and nw_firm_id = 15 and (oaid rlike '${oaidAnotherPtn}' or oaid rlike '${didPtn}')
           |group by oaid
        """.stripMargin

      val df04: DataFrame = spark.sql(sql4).persist(StorageLevel.MEMORY_AND_DISK_SER)
      df04.createOrReplaceTempView("tmp_etl_oaid")

      // Same p50 cutoff for oaid
      val oaidNum: Double = spark.sql("select percentile(int(num*100),0.5) from tmp_etl_oaid").rdd.map(_.mkString).take(1)(0).toDouble / 100
      println("oaidNum=======" + oaidNum)


      // Devices above the cutoff, as tab-separated "id \t id_type \t platform \t tag" rows;
      // the *_with_country variants carry the country code in place of the tag
      val imei_30_Days_high_df = df03.filter(df03.col("num") > imei_30_Days_Num).select(concat_ws("\t", df03.col("imei"), lit("imei"), lit("android"), lit("[\"com.btop_tiktokrv_highvalue30_p50\"]")))
      val imei_30_Days_high_df_with_country = df03.filter(df03.col("num") > imei_30_Days_Num).select(concat_ws("\t", df03.col("imei"), lit("imei"), lit("android"), lit("CN")))

      val oaid_high_df = df04.filter(df04.col("num") > oaidNum).select(concat_ws("\t", df04.col("oaid"), lit("oaid"), lit("android"), lit("[\"com.btop_tiktokrv_highvalue30_p50\"]")))
      val oaid_high_df_with_country = df04.filter(df04.col("num") > oaidNum).select(concat_ws("\t", df04.col("oaid"), lit("oaid"), lit("android"), lit("CN")))


//      imei_high_df.union(imei_low_df).union(android_id_high_df).union(android_id_low_df).union(imei_30_Days_high_df).union(oaid_high_df).coalesce(coalesce.toInt).write.format("text").mode("overwrite").save(output01)
//      imei_all_df.union(android_id_all_df).union(imei_30_Days_high_df_with_country).union(oaid_high_df_with_country).coalesce(coalesce.toInt).write.format("text").mode("overwrite").save(output02)
      // Tagged rows go to output01, country rows to output02
      imei_30_Days_high_df.union(oaid_high_df).coalesce(coalesce.toInt).write.format("text").mode("overwrite").save(output01)
      imei_30_Days_high_df_with_country.union(oaid_high_df_with_country).coalesce(coalesce.toInt).write.format("text").mode("overwrite").save(output02)

    } finally {
      spark.stop()
    }
    0
  }
}

object BtopTiktokrv {
  def main(args: Array[String]): Unit = {
    new BtopTiktokrv().run(args)
  }
}
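
// A hypothetical spark-submit invocation (jar name, paths, and dates are
// illustrative, not taken from any deployment script; dates follow the
// yyyymmdd format implied by the concat(yyyy,mm,dd) comparisons above):
//
//   spark-submit --class mobvista.dmp.datasource.dm.BtopTiktokrv dmp.jar \
//     -coalesce 40 \
//     -output01 s3://mob-emr-test/dataplatform/btop_tiktokrv/tag \
//     -output02 s3://mob-emr-test/dataplatform/btop_tiktokrv/country \
//     -begin_day 20210101 -begin_day02 20201210 -end_day 20210108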