RtdmpTmpId1142110895.scala
package mobvista.dmp.datasource.dm

import mobvista.dmp.common.CommonSparkJob
import mobvista.dmp.format.TextMultipleOutputFormat
import org.apache.commons.cli.Options
import org.apache.commons.lang.StringUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.Text
import org.apache.spark.sql.{Row, SparkSession}

import java.net.URI
import scala.collection.mutable.ArrayBuffer

/**
 * Exports (device_id, device_type) pairs for packages 1142110895 / id1142110895
 * from dwh.dm_install_list_v2 for a given day, writing them out via
 * TextMultipleOutputFormat keyed by device_type.
 *
 * @author jiangfan
 * @date 2021/8/25 11:47
 */
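/*
 * A minimal usage sketch (illustrative only; the jar name, paths, and values
 * below are assumptions, not from the source):
 *
 *   spark-submit --class mobvista.dmp.datasource.dm.RtdmpTmpId1142110895 dmp.jar \
 *     -coalesce 100 \
 *     -output1 s3://mob-emr-test/dataplatform/rtdmp_tmp/id1142110895 \
 *     -dt_today 2021-08-25
 */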

class RtdmpTmpId1142110895 extends CommonSparkJob with Serializable {
  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("coalesce", true, "[must] coalesce")
    options.addOption("output1", true, "[must] output1")
    options.addOption("dt_today", true, "[must] dt_today")
    options
  }

  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      return -1
    } else printOptions(commandLine)

    val coalesce = commandLine.getOptionValue("coalesce")
    val output1 = commandLine.getOptionValue("output1")
    val dt_today = commandLine.getOptionValue("dt_today")

    val spark = SparkSession.builder()
      .appName("RtdmpTmpId1142110895")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

    val sc = spark.sparkContext

    // Remove any existing output so re-runs of the job start from a clean path
    FileSystem.get(new URI(s"s3://mob-emr-test"), sc.hadoopConfiguration).delete(new Path(output1), true)

    try {
      val sql1 =
        s"""
           |select device_id, device_type
           |from dwh.dm_install_list_v2
           |where dt = '${dt_today}'
           |  and package_name in ('1142110895', 'id1142110895')
           |  and device_type not in ('androidid', 'android_id', 'ruid')
        """.stripMargin

      spark.sql(sql1).rdd
        .flatMap(buildRes)
        .repartition(coalesce.toInt)
        .saveAsNewAPIHadoopFile(s"${output1}", classOf[Text], classOf[Text],
          classOf[TextMultipleOutputFormat], spark.sparkContext.hadoopConfiguration)

    } finally {
      spark.stop()
    }
    0
  }

  def buildRes(row: Row): Array[(Text, Text)] = {
    val buffer = new ArrayBuffer[(Text, Text)]()
    val device_id = row.getAs[String]("device_id")
    val device_type = row.getAs[String]("device_type")
    if (StringUtils.isNotBlank(device_type)) {
      // The key embeds the device_type (with a ", " suffix), which TextMultipleOutputFormat
      // presumably uses to split records into per-device-type output files.
      buffer += ((new Text(s"${device_type}, "), new Text(device_id)))
    }
    buffer.toArray
  }
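
  // Illustrative example of the mapping above (values are made up, not from the source):
  //   Row(device_id = "00000000-1111-2222-3333-444444444444", device_type = "gaid")
  //     => (Text("gaid, "), Text("00000000-1111-2222-3333-444444444444"))
  // Rows with a blank device_type produce no output.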
}

object RtdmpTmpId1142110895 {
  def main(args: Array[String]): Unit = {
    new RtdmpTmpId1142110895().run(args)
  }
}