// CanglanPackageNames.scala 4 KB
package mobvista.dmp.datasource.dm

import mobvista.dmp.common.CommonSparkJob
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{concat_ws, lit}
import org.apache.spark.storage.StorageLevel

import java.net.URI


/**
 * Spark job that selects iOS devices having installed at least two of the
 * packages listed in a text file, and writes them as tab-separated text lines.
 *
 * Two outputs are produced:
 *  - `output01`: `device_id \t device_type \t ios \t <tag>` where the tag is
 *    `["id2021032502"]` for every selected device and, additionally,
 *    `["id2021032503"]` for devices with three or more matching installs.
 *  - `output02`: `device_id \t device_type \t ios \t UNKNOWN` for every
 *    selected device.
 *
 * Argument parsing helpers (`commParser`, `options`, `checkMustOption`,
 * `printUsage`, `printOptions`) are inherited from [[CommonSparkJob]].
 */
class CanglanPackageNames extends CommonSparkJob with Serializable {

  /** Declares the command-line options this job reads; every one takes a value. */
  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("coalesce", true, "[must] coalesce")
    options.addOption("output01", true, "[must] output01")
    options.addOption("output02", true, "[must] output02")
    options.addOption("dt_today", true, "[must] dt_today")
    options.addOption("update", true, "[must] update")
    options.addOption("Canglan_Package_Names", true, "[must] Canglan_Package_Names")
    options
  }

  /**
   * Parses the arguments, runs the query and writes both outputs.
   *
   * @param args raw command-line arguments
   * @return `0` on success, `-1` when `checkMustOption` rejects the arguments
   * @throws NumberFormatException if the `coalesce` value is not an integer;
   *                               this is raised before any output is deleted
   */
  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      -1
    } else {
      printOptions(commandLine)

      // Parsed once, up front: a malformed value must fail the job before the
      // existing output directories are removed, not after.
      val partitions = commandLine.getOptionValue("coalesce").toInt
      val output01 = commandLine.getOptionValue("output01")
      val output02 = commandLine.getOptionValue("output02")
      val dtToday = commandLine.getOptionValue("dt_today")
      val updateFrom = commandLine.getOptionValue("update")
      val packageNamesPath = commandLine.getOptionValue("Canglan_Package_Names")

      val spark = SparkSession.builder()
        .appName("CanglanPackageNames")
        .config("spark.rdd.compress", "true")
        .config("spark.io.compression.codec", "snappy")
        .config("spark.sql.orc.filterPushdown", "true")
        .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .enableHiveSupport()
        .getOrCreate()

      // Everything after session creation sits inside try/finally so the
      // session is stopped even when clearing the output directories fails.
      try {
        import spark.implicits._

        deleteOutput(spark, output01)
        deleteOutput(spark, output02)

        // One package name per input line, exposed to SQL as a temp view.
        spark.sparkContext
          .textFile(packageNamesPath)
          .toDF("pkg")
          .createOrReplaceTempView("package_names_tmp")

        val sql1 =
          s"""
             |select
             |   device_id,device_type,platform,installnum
             |from
             |(select
             | device_id,device_type,platform,count(1) as installnum
             |from
             |  dwh.dm_install_list_v2
             |where
             |  dt = "${dtToday}"  and  update_date >= "${updateFrom}"
             |and
             |  package_name in  (select pkg from package_names_tmp)
             |group by device_id,device_type,platform) a
             |where installnum>=2
          """.stripMargin

        // Persisted because the result feeds two separate write actions.
        val matched: DataFrame = spark.sql(sql1).persist(StorageLevel.MEMORY_AND_DISK_SER)

        // The SQL already restricts rows to installnum >= 2, so only the
        // platform needs filtering here.
        val iosDevices = matched.filter(matched.col("platform").equalTo("ios"))
        val iosWithThree = iosDevices.filter(iosDevices.col("installnum") >= 3)

        val tagged2 = iosLines(iosDevices, "[\"id2021032502\"]")
        val tagged3 = iosLines(iosWithThree, "[\"id2021032503\"]")
        val withCountry = iosLines(iosDevices, "UNKNOWN")

        tagged2.union(tagged3).coalesce(partitions).write.format("text").mode("overwrite").save(output01)
        withCountry.coalesce(partitions).write.format("text").mode("overwrite").save(output02)
        0
      } finally {
        spark.stop()
      }
    }
  }

  /**
   * Recursively deletes `output` on the file system that the path itself
   * designates, so the location removed is the same one later written to.
   */
  private def deleteOutput(spark: SparkSession, output: String): Unit = {
    val outputPath = new Path(output)
    val outputUri: URI = outputPath.toUri
    FileSystem.get(outputUri, spark.sparkContext.hadoopConfiguration).delete(outputPath, true)
  }

  /** Renders each device as a single `device_id \t device_type \t ios \t lastField` string column. */
  private def iosLines(devices: DataFrame, lastField: String): DataFrame =
    devices.select(
      concat_ws("\t", devices.col("device_id"), devices.col("device_type"), lit("ios"), lit(lastField))
    )

}

/** Command-line entry point for the [[CanglanPackageNames]] job. */
object CanglanPackageNames {

  /**
   * Instantiates the job and runs it with the given arguments.
   * The integer status returned by `run` is discarded.
   *
   * @param args raw command-line arguments forwarded to the job
   */
  def main(args: Array[String]): Unit = {
    val job = new CanglanPackageNames()
    job.run(args)
  }
}