TrackingMergeDaily.scala
package mobvista.dmp.datasource.datatory

import java.net.URI

import mobvista.dmp.clickhouse.tracking.TrackingEntity
import mobvista.dmp.common.CommonSparkJob
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{SaveMode, SparkSession}

import scala.collection.mutable

/**
  * Merges the daily tracking install/event logs into de-duplicated
  * TrackingEntity records and writes them to S3 as ORC.
  *
  * @package: mobvista.dmp.datasource.datatory
  * @author: wangjf
  * @date: 2019/11/25
  * @time: 15:00
  * @email: jinfeng.wang@mobvista.com
  * @phone: 152-1062-7698
  */
class TrackingMergeDaily extends CommonSparkJob with java.io.Serializable {
  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("date", true, "date")
    options.addOption("output", true, "output")

    options
  }

  override protected def run(args: Array[String]): Int = {
    val parser = new BasicParser()
    val options = commandOptions()
    val commandLine = parser.parse(options, args)
    val date = commandLine.getOptionValue("date")
    val output = commandLine.getOptionValue("output")

    val spark = SparkSession
      .builder()
      .appName("TrackingMergeDaily")
      .config("spark.rdd.compress", "true")
      .config("spark.shuffle.compress", "true")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.io.compression.codec", "lz4")
      .config("spark.io.compression.lz4.blockSize", "64k")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

    import spark.implicits._
    val sc = spark.sparkContext
    try {
      // Remove any previous output so the overwrite for this date starts clean.
      FileSystem.get(new URI("s3://mob-emr-test"), sc.hadoopConfiguration).delete(new Path(output), true)

      // Join the daily install log with the event log for the given date.
      var sql = Constant.tracking_install_join_event_sql.replace("@date", date)
      spark.sql(sql).createOrReplaceTempView("event_join")

      // Merge the joined tracking data for the day.
      sql = Constant.tracking_merge_sql.replace("@date", date)

      spark.sql(sql)
        .dropDuplicates()
        .rdd.map(r => {
          // Collect the non-blank values of an array column into a set.
          def nonBlankSet(column: String): mutable.HashSet[String] = {
            val set = new mutable.HashSet[String]()
            r.getAs[mutable.WrappedArray[String]](column).foreach(c => {
              if (StringUtils.isNotBlank(c)) {
                set.add(c)
              }
            })
            set
          }

          val offerSet = nonBlankSet("offer_id")
          val idSet = nonBlankSet("id")
          val eventNameSet = nonBlankSet("event_name")
          val eventTypeSet = nonBlankSet("event_type")

          // Fall back to the event ids when no event names are present.
          val eventName = if (eventNameSet.nonEmpty) {
            eventNameSet
          } else if (idSet.nonEmpty) {
            idSet
          } else {
            new mutable.HashSet[String]()
          }

          TrackingEntity(r.getAs("device_id"), r.getAs("device_model"), r.getAs("os_version"),
            r.getAs("country"), r.getAs("city"), mutable.WrappedArray.make(offerSet.toArray),
            mutable.WrappedArray.make(eventName.toArray), mutable.WrappedArray.make(eventTypeSet.toArray), r.getAs("log_type"))
        }).toDF
        .write
        .mode(SaveMode.Overwrite)
        .option("orc.compress", "snappy")
        .orc(output)

    } finally {
      if (spark != null) {
        spark.stop()
      }
    }
    0
  }
}
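
// TrackingEntity is defined in mobvista.dmp.clickhouse.tracking and is not shown here.
// A sketch consistent with how it is constructed above (field names and types are
// assumptions, not the repository's actual definition) would be:
//
//   case class TrackingEntity(device_id: String, device_model: String, os_version: String,
//                             country: String, city: String, offer_id: Seq[String],
//                             event_name: Seq[String], event_type: Seq[String], log_type: String)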

object TrackingMergeDaily {
  def main(args: Array[String]): Unit = {
    new TrackingMergeDaily().run(args)
  }
}
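
// A hypothetical launch command for this job; the jar name, cluster settings and the
// output path below are illustrative assumptions, not taken from this repository.
// Only the -date and -output option names come from commandOptions() above:
//
//   spark-submit \
//     --class mobvista.dmp.datasource.datatory.TrackingMergeDaily \
//     --master yarn \
//     --deploy-mode cluster \
//     dmp.jar \
//     -date 2019-11-25 \
//     -output s3://mob-emr-test/dmp/tracking_merge_daily/2019-11-25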