ReyunLahuoList.scala 3.77 KB
Newer Older
fan.jiang committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92
package mobvista.dmp.datasource.dm

import mobvista.dmp.common.CommonSparkJob
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.io.compress.{CompressionCodec, GzipCodec}
import org.apache.spark.sql.functions.{concat_ws, lit}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.storage.StorageLevel

import java.net.URI

/**
 * @author jiangfan
 * @date 2021/9/15 17:13
 */

class ReyunLahuoList extends CommonSparkJob with Serializable {
  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("coalesce", true, "[must] coalesce")
    options.addOption("output01", true, "[must] output01")
    options.addOption("output02", true, "[must] output02")
    options.addOption("dt_today", true, "[must] dt_today")
    options.addOption("update_date", true, "[must] update_date")
    options
  }

  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      return -1
    } else printOptions(commandLine)

    val coalesce = commandLine.getOptionValue("coalesce")
    val output01 = commandLine.getOptionValue("output01")
    val output02 = commandLine.getOptionValue("output02")
    val dt_today = commandLine.getOptionValue("dt_today")
    val update_date = commandLine.getOptionValue("update_date")

    val spark = SparkSession.builder()
      .appName("ReyunLahuoList")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

    val conf = spark.sparkContext.hadoopConfiguration
    conf.set("mapreduce.output.compress", "true")
    conf.set("mapreduce.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec")
    conf.setBoolean("mapreduce.output.fileoutputformat.compress", true)
    conf.set("mapreduce.output.fileoutputformat.compress.type", CompressionType.BLOCK.toString)
    conf.setClass("mapreduce.output.fileoutputformat.compress.codec", classOf[GzipCodec], classOf[CompressionCodec])

    FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output01), true)
    FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output02), true)

    try {
      val sql1=
        s"""
           |select device_id, device_type, concat("[\\"",package_name,"_reyun","\\"]")  as  package_name
           |from
           |   dwh.dm_install_list_v2
           |where dt='${dt_today}' and business='reyun'  and update_date>='${update_date}'
           |  and package_name in ('com.ss.android.ugc.aweme','com.taobao.idlefish','com.taobao.taobao','com.UCMobile','com.xunmeng.pinduoduo','me.ele')
        """.stripMargin

      val df01: DataFrame = spark.sql(sql1).persist(StorageLevel.MEMORY_AND_DISK_SER)

      val data = df01.select(concat_ws("\t", df01.col("device_id"),  df01.col("device_type"),  lit("android"),df01.col("package_name")))
      val data_with_country = df01.select(concat_ws("\t", df01.col("device_id"),  df01.col("device_type"),  lit("android"),lit("CN")))

      data.coalesce(coalesce.toInt).write.format("text").mode("overwrite").save(output01)
      data_with_country.coalesce(coalesce.toInt).write.format("text").mode("overwrite").save(output02)

    } finally {
      spark.stop()
    }
    0
  }
}

object ReyunLahuoList {
  def main(args: Array[String]): Unit = {
    new ReyunLahuoList().run(args)
  }
}