package mobvista.dmp.datasource.id_mapping

import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
import mobvista.dmp.datasource.id_mapping.Constant._
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.storage.StorageLevel

import java.net.URI

/**
 * @package: mobvista.dmp.datasource.id_mapping
 * @author: wangjf
 * @date: 2021/11/30
 * @time: 10:35 AM
 * @email: jinfeng.wang@mobvista.com
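 *
 * Usage (illustrative values; the output path is a placeholder):
 *   --business dsp_req --date 2021-11-30 --output <s3_output_path> --coalesce 100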
 */
abstract class EtlDeviceIdDaily extends CommonSparkJob with Serializable {
  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("business", true, "business")
    options.addOption("date", true, "date")
    options.addOption("output", true, "output")
    options.addOption("coalesce", true, "coalesce")
    options
  }

  override protected def run(args: Array[String]): Int = {
    val parser = new BasicParser()
    val options = commandOptions()
    val commandLine = parser.parse(options, args)
    val date = commandLine.getOptionValue("date")
    val business = commandLine.getOptionValue("business")
    val output = commandLine.getOptionValue("output")
    val coalesce = commandLine.getOptionValue("coalesce").toInt

    val spark = MobvistaConstant.createSparkSession(s"EtlDeviceIdDaily.$business.$date")

    try {
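      // dsp_req input is split into 6 slices; every other business runs as a single pass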
      if ("dsp_req".equalsIgnoreCase(business)) {
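        // one pass per slice; each slice writes its own /ios/<i>, /android/<i> and /other/<i> output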
        for (i <- 0 until 6) {
          val df = processData(date, i, spark)
            .repartition(coalesce)
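          // cache: the slice is scanned three times below (ios / android / other)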
          df.persist(StorageLevel.MEMORY_AND_DISK_SER)

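          // Split the (platform, row) pairs and write one ORC dataset per platform,
          // clearing any previous output for the slice first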
          val iosTab = df.filter(_._1 == "ios").map(_._2)

          FileSystem.get(new URI("s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration)
            .delete(new Path(s"$output/ios/$i"), true)

          spark.createDataFrame(iosTab, iosSchema)
            .write.mode(SaveMode.Overwrite)
            .option("orc.compress", "zlib")
            .orc(s"$output/ios/$i")

          val adrTab = df.filter(_._1 == "android").map(_._2)

          FileSystem.get(new URI("s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration)
            .delete(new Path(s"$output/android/$i"), true)

          spark.createDataFrame(adrTab, adrSchema)
            .write.mode(SaveMode.Overwrite)
            .option("orc.compress", "zlib")
            .orc(s"$output/android/$i")

          val otherTab = df.filter(_._1 == "other").map(_._2)

          FileSystem.get(new URI("s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration)
            .delete(new Path(s"$output/other/$i"), true)

          spark.createDataFrame(otherTab, otherSchema)
            .write.mode(SaveMode.Overwrite)
            .option("orc.compress", "zlib")
            .orc(s"$output/other/$i")

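          // release the cached slice before processing the next one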
          df.unpersist(true)
        }
      } else {
        val df = processData(date, 0, spark)
          .repartition(coalesce)
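        // cache: the RDD is scanned three times below (ios / android / other)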
        df.persist(StorageLevel.MEMORY_AND_DISK_SER)

        val iosTab = df.filter(_._1 == "ios").map(_._2)

        FileSystem.get(new URI("s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration)
          .delete(new Path(s"$output/ios"), true)

        spark.createDataFrame(iosTab, iosSchema)
          .write.mode(SaveMode.Overwrite)
          .option("orc.compress", "zlib")
          .orc(s"$output/ios")

        val adrTab = df.filter(_._1 == "android").map(_._2)

        FileSystem.get(new URI("s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration)
          .delete(new Path(s"$output/android"), true)

        spark.createDataFrame(adrTab, adrSchema)
          .write.mode(SaveMode.Overwrite)
          .option("orc.compress", "zlib")
          .orc(s"$output/android")

        val otherTab = df.filter(_._1 == "other").map(_._2)

        FileSystem.get(new URI("s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration)
          .delete(new Path(s"$output/other"), true)

        spark.createDataFrame(otherTab, otherSchema)
          .write.mode(SaveMode.Overwrite)
          .option("orc.compress", "zlib")
          .orc(s"$output/other")
      }
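      // Register today's ios and android output as Hive partitions
      // (the "other" output is written but not registered)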
      spark.sql(
        s"""
           |ALTER TABLE dws.dws_device_id_ios_frequency ADD IF NOT EXISTS PARTITION (dt='$date',source='$business')
           | LOCATION '$output/ios'
           |""".stripMargin)

      spark.sql(
        s"""
           |ALTER TABLE dws.dws_device_id_android_frequency ADD IF NOT EXISTS PARTITION (dt='$date',source='$business')
           | LOCATION '$output/android'
           |""".stripMargin)
    } finally {
      if (spark != null) {
        spark.stop()
      }
    }
    0
  }

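  /**
   * Implemented by each business-specific subclass: reads the source data for `date`
   * (the slice index `i` is only meaningful for dsp_req) and returns (platform, row)
   * pairs, where platform is "ios", "android" or "other".
   */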
  def processData(date: String, i: Int, spark: SparkSession): RDD[(String, Row)]
}