package mobvista.dmp.datasource.datatory

import java.net.URI

import mobvista.dmp.common.CommonSparkJob
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Dataset, Row, SaveMode, SparkSession}
import org.apache.spark.storage.StorageLevel

/**
  * @package: mobvista.dmp.datasource.datatory
  * @author: wangjf
  * @date: 2019/4/25
  * @time: 3:19 PM
  * @email: jinfeng.wang@mobvista.com
  * @phone: 152-1062-7698
  */
class TrackingEventDaily extends CommonSparkJob with java.io.Serializable {
  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("date", true, "date")
    options.addOption("before_date", true, "before_date")
    options.addOption("output", true, "output")
    options.addOption("info_output", true, "info_output")
    options
  }

  override protected def run(args: Array[String]): Int = {
    val parser = new BasicParser()
    val options = commandOptions()
    val commandLine = parser.parse(options, args)
    val date = commandLine.getOptionValue("date")
    val before_date = commandLine.getOptionValue("before_date")
    val output = commandLine.getOptionValue("output")
    val info_output = commandLine.getOptionValue("info_output")

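    // Build the Spark session with ORC filter pushdown, lz4 RDD/shuffle compression,
    // Kryo serialization and Hive support.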
    val spark = SparkSession
      .builder()
      .appName("TrackingEventDaily")
      .config("spark.rdd.compress", "true")
      .config("spark.shuffle.compress", "true")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.io.compression.codec", "lz4")
      .config("spark.io.compression.lz4.blockSize", "64k")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

    val sc = spark.sparkContext
    try {
      import spark.implicits._

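      // Earlier approach, kept for reference: broadcast a network_cid -> campaign-id map
      // and look it up through a registered getCampaignIds UDF.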
      /*
      campaingMaps = sc.broadcast(Constant.jdbcConnection(spark, "mob_adn", "campaign_list")
        .select("id", "network_cid")
        .groupBy("network_cid")
        .agg(concat_ws(",", collect_set("id")))
        .filter(r => {
          StringUtils.isNotBlank(r.getAs("network_cid").toString)
        }).rdd.map(r => {
        (r.getAs("network_cid").toString, r.getAs("id").toString)
      }).collectAsMap())

      spark.udf.register("getCampaignIds", getCampaignIds _)

      Constant.jdbcConnection(spark, "mob_adn", "campaign_list")
        .rdd.map(r => {
        (r.getAs("network_cid").toString, r.getAs("id"))
      }).saveAsTextFile(output, classOf[GzipCodec])
      */

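      // Remove any previous output under the S3 path before writing today's result.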
      FileSystem.get(new URI(s"s3://mob-emr-test"), sc.hadoopConfiguration).delete(new Path(output), true)

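      // Run the daily tracking-event query for the given date, keep only rows with a
      // valid device id, and write the result as snappy-compressed ORC.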
      var sql = Constant.tracking_event_sql.replace("@date", date)

      spark.sql(sql)
        .filter(r => {
          Constant.check_deviceId(r.getAs("device_id"))
        })
        .write
        .mode(SaveMode.Overwrite)
        .option("orc.compress", "snappy")
        .orc(output)

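      // Load the campaign list via JDBC, keep rows with a non-blank network_cid and
      // register them as the temp view campaign_tb.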
      val campaignRDD = Constant.jdbcConnection(spark, "mob_adn", "campaign_list")
        .select("id", "network_cid")
        .filter(r => {
          r.getAs("network_cid") != null && StringUtils.isNotBlank(r.getAs("network_cid").toString)
        }).rdd.map(r => {
        Row(r.getAs("network_cid").toString, r.getAs("id").toString)
      })
      val schema = StructType(Array(
        StructField("network_cid", StringType),
        StructField("id", StringType)))
      spark.createDataFrame(campaignRDD, schema = schema).createOrReplaceTempView("campaign_tb")

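      // Union event info from both sources for the given date, drop duplicates and
      // rows without an offer_id, and map the rest into EventInfo records.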
      sql = Constant.event_info_sql.replace("@date", date)
      val df_1 = spark.sql(sql)
      sql = Constant.tracking_event_info_sql.replace("@date", date)
      val df_2 = spark.sql(sql)

      val df: Dataset[Row] = df_1.union(df_2)
        .dropDuplicates
        .rdd
        .filter(r => {
          r.getAs("offer_id") != null && StringUtils.isNotBlank(r.getAs("offer_id").toString)
        })
        .map(r => {
          EventInfo(r.getAs("id"), r.getAs("event_name"), r.getAs("event_type"), r.getAs("offer_id"))
        }).toDF

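      // Merge today's events with the accumulated snapshot from before_date, dedupe,
      // and write the full event_info set back as a single snappy ORC file.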
      val oldDF = spark.sql(Constant.all_event_info_sql.replace("@before_date", before_date))

      val newDF = oldDF.union(df).dropDuplicates.repartition(1).persist(StorageLevel.MEMORY_ONLY_SER)
      newDF
        .write
        .mode(SaveMode.Overwrite)
        .option("orc.compress", "snappy")
        .orc(info_output)

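      // Overwrite the MySQL event_info table with the merged snapshot.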
      Constant.writeMySQL(newDF, "event_info", SaveMode.Overwrite)
    } finally {
      if (spark != null) {
        spark.stop()
      }
    }
    0
  }

  /*
  def getCampaignIds(id: String): String = {
    campaingMaps.value.getOrElse(id, "")
  }
  */
}

object TrackingEventDaily {
  def main(args: Array[String]): Unit = {
    new TrackingEventDaily().run(args)
  }
}