package mobvista.dmp.datasource.dm

import mobvista.dmp.common.CommonSparkJob
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{concat_ws, lit}
import org.apache.spark.storage.StorageLevel

import java.net.URI

/**
 * @author jiangfan
 * @date 2021/4/6 15:04
 */
class ShinnyPackageNames extends CommonSparkJob with Serializable {
  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("coalesce", true, "[must] coalesce")
    options.addOption("output01", true, "[must] output01")
    options.addOption("output02", true, "[must] output02")
    options.addOption("dt_today", true, "[must] dt_today")
    options.addOption("update", true, "[must] update")
    options.addOption("Shinny_Package_Names", true, "[must] Shinny_Package_Names")
    options
  }
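
  // A sketch of how this job might be launched (illustrative only: the jar name,
  // S3 paths and dates are placeholders, and the exact option syntax depends on
  // the commons-cli parser configured in CommonSparkJob):
  //
  //   spark-submit --class mobvista.dmp.datasource.dm.ShinnyPackageNames \
  //     --conf spark.reducer.maxBlocksInFlightPerAddress=3 \
  //     dmp.jar \
  //     -coalesce 20 \
  //     -output01 s3://mob-emr-test/.../shinny_tags \
  //     -output02 s3://mob-emr-test/.../shinny_country \
  //     -dt_today 2021-04-06 \
  //     -update 2021-03-30 \
  //     -Shinny_Package_Names s3://mob-emr-test/.../package_names.txt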

  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      return -1
    } else printOptions(commandLine)

    val coalesce = commandLine.getOptionValue("coalesce")
    val output01 = commandLine.getOptionValue("output01")
    val output02 = commandLine.getOptionValue("output02")
    val dt_today = commandLine.getOptionValue("dt_today")
    val update = commandLine.getOptionValue("update")
    val shinny_package_names = commandLine.getOptionValue("Shinny_Package_Names")

    val spark = SparkSession.builder()
      .appName("ShinnyPackageNames")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

    val sc = spark.sparkContext
    import spark.implicits._

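    // Delete any existing output so a rerun starts from a clean state.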
    val fs = FileSystem.get(new URI("s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration)
    fs.delete(new Path(output01), true)
    fs.delete(new Path(output02), true)

    try {
//      During development this job hit the following error:
//      org.apache.spark.shuffle.FetchFailedException: failed to allocate 16777216 byte(s) of direct memory (used: 5251268608, max: 5255331840)
//      Fix: add --conf spark.reducer.maxBlocksInFlightPerAddress=3 to the spark-submit command line.
//      Reference: https://stackoverflow.com/questions/60808693/spark-shuffle-memory-error-failed-to-allocate-direct-memory
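//      (Assumption, not from the original note: the same setting could instead be
//      applied on the SparkSession builder above via
//      .config("spark.reducer.maxBlocksInFlightPerAddress", "3") if a fixed value
//      works for every run.)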
      val package_names_rdd = sc.textFile(shinny_package_names)
      val package_names_df: DataFrame = package_names_rdd.toDF("pkg")
      package_names_df.createOrReplaceTempView("package_names_tmp")
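      // iOS App Store ids are purely numeric, while Android package names contain
      // letters and dots, so matching ^[0-9]+$ splits the input list by platform.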
      val iosPtn = "^[0-9]+$"
      spark.sql(s"""select pkg from package_names_tmp where pkg not rlike '${iosPtn}'""").createOrReplaceTempView("android_package_names_tmp")
      spark.sql(s"""select pkg from package_names_tmp where pkg rlike '${iosPtn}'""").createOrReplaceTempView("ios_package_names_tmp")
      spark.sql("select pkg from ios_package_names_tmp").show(1000, truncate = false)

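      // Count, per gaid device, how many packages from the Android target list
      // appear in its install list for this dt partition (bounded by update_date),
      // keeping only devices that match at least two of them.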
      val sql1 =
        s"""
           |select
           |  device_id, device_type, platform, installnum
           |from (
           |  select
           |    device_id, device_type, platform, count(1) as installnum
           |  from
           |    dwh.dm_install_list_v2
           |  where
           |    dt = "${dt_today}" and update_date >= "${update}" and device_type = 'gaid'
           |    and package_name in (select pkg from android_package_names_tmp)
           |  group by device_id, device_type, platform
           |) a
           |where installnum >= 2
        """.stripMargin
      val df01: DataFrame = spark.sql(sql1).persist(StorageLevel.MEMORY_AND_DISK_SER)
      val android_2_df = df01.filter(df01.col("installnum") >= 2 && df01.col("platform").equalTo("android"))
        .select(concat_ws("\t", df01.col("device_id"), df01.col("device_type"), lit("android"), lit("[\"shinny.2\"]")))
      val android_3_df = df01.filter(df01.col("installnum") >= 3 && df01.col("platform").equalTo("android"))
        .select(concat_ws("\t", df01.col("device_id"), df01.col("device_type"), lit("android"), lit("[\"shinny.3\"]")))
      val android_4_df = df01.filter(df01.col("installnum") >= 4 && df01.col("platform").equalTo("android"))
        .select(concat_ws("\t", df01.col("device_id"), df01.col("device_type"), lit("android"), lit("[\"shinny.4\"]")))
      val android_df_with_country = df01.filter(df01.col("installnum") >= 2 && df01.col("platform").equalTo("android"))
        .select(concat_ws("\t", df01.col("device_id"), df01.col("device_type"), lit("android"), lit("UNKNOWN")))
      android_2_df.union(android_3_df).union(android_4_df).coalesce(coalesce.toInt)
        .write.format("text").mode("overwrite").save(output01)
      android_df_with_country.coalesce(coalesce.toInt)
        .write.format("text").mode("overwrite").save(output02)
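      // output01 rows are tab-separated: device_id, device_type, "android", tag.
      // The >= 2 / >= 3 / >= 4 filters overlap, so a device with installnum >= 4
      // is written under all three tags ["shinny.2"], ["shinny.3"] and ["shinny.4"].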
//      Android and iOS data are processed in separate runs; the iOS variant of the
//      same pipeline is kept below for reference.
//      val sql2=
//        s"""
//           |select
//           |   device_id,device_type,platform,installnum
//           |from
//           |(select
//           | device_id,device_type,platform,count(1) as installnum
//           |from
//           |  dwh.dm_install_list_v2
//           |where
//           |  dt = "${dt_today}"  and  update_date >= "${update}" and device_type='idfa' and platform='ios'
//           |and
//           |  package_name in  (select pkg from ios_package_names_tmp)
//           |group by device_id,device_type,platform) a
//           |where installnum>=2
//        """.stripMargin
//      val df01: DataFrame = spark.sql(sql2).persist(StorageLevel.MEMORY_AND_DISK_SER)
//      val ios_2_df = df01.filter(df01.col("installnum")>=2 && df01.col("platform").equalTo("ios")).select(concat_ws("\t", df01.col("device_id"),  df01.col("device_type"),  lit("ios"),lit("[\"id2021040102\"]")))
//      val ios_3_df = df01.filter(df01.col("installnum")>=3 && df01.col("platform").equalTo("ios")).select(concat_ws("\t", df01.col("device_id"),  df01.col("device_type"),  lit("ios"),lit("[\"id2021040103\"]")))
//      val ios_4_df = df01.filter(df01.col("installnum")>=4 && df01.col("platform").equalTo("ios")).select(concat_ws("\t", df01.col("device_id"),  df01.col("device_type"),  lit("ios"),lit("[\"id2021040104\"]")))
//      val ios_df_with_country =  df01.filter(df01.col("installnum")>=2 && df01.col("platform").equalTo("ios")).select(concat_ws("\t", df01.col("device_id"),  df01.col("device_type"),  lit("ios"),lit("UNKNOWN")))
//      ios_2_df.union(ios_3_df).union(ios_4_df).coalesce(coalesce.toInt).write.format("text").mode("overwrite").save(output01)
//      ios_df_with_country.coalesce(coalesce.toInt).write.format("text").mode("overwrite").save(output02)
    } finally {
      spark.stop()
    }
    0
  }

}

object ShinnyPackageNames {
  def main(args: Array[String]): Unit = {
    new ShinnyPackageNames().run(args)
  }
}