package mobvista.dmp.datasource.rtdmp

import com.alibaba.fastjson.JSON
import com.fasterxml.jackson.databind.ObjectMapper
import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
import mobvista.dmp.datasource.rtdmp.Constant.AudienceMerge
import mobvista.dmp.util.DateUtil
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SaveMode

import java.net.URI
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
import scala.collection.mutable

/**
 * @package: mobvista.dmp.datasource.rtdmp
 * @author: wangjf
 * @date: 2020/8/17
 * @time: 10:57 AM
 * @email: jinfeng.wang@mobvista.com
 * @phone: 152-1062-7698
 */
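/**
 * Merges the current hourly RTDmp device/audience snapshot with the previous
 * dwh.audience_merge partition via a FULL OUTER JOIN, expiring stale audience
 * entries on both sides, and overwrites the output path with the result.
 */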
class RTDmpMerge extends CommonSparkJob with Serializable {
  //  entries whose last-seen time is not after expire_time are dropped everywhere
  var expire_time = ""

  //  audience ids present in the current batch; used by `process` so that a
  //  refreshed audience's membership comes solely from the daily side
  var updateAudienceIdSet = new mutable.HashSet[Int]()

  def commandOptions(): Options = {
    val options = new Options()
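    //  -input: ORC path of the hourly RTDmp snapshot
    //  -output: ORC path for the merged result (cleared before the write)
    //  -partition: number of output files to coalesce to
    //  -date_time: current batch hour, formatted yyyyMMddHH
    //  -old_time: dt partition of dwh.audience_merge to merge against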
    options.addOption("input", true, "input")
    options.addOption("output", true, "output")
    options.addOption("partition", true, "partition")
    options.addOption("date_time", true, "date_time")
    options.addOption("old_time", true, "old_time")
    options
  }

  override protected def run(args: Array[String]): Int = {
    val parser = new BasicParser()
    val options = commandOptions()
    val commandLine = parser.parse(options, args)
    val input = commandLine.getOptionValue("input")
    val output = commandLine.getOptionValue("output")
    val partition = commandLine.getOptionValue("partition")
    val date_time = commandLine.getOptionValue("date_time")
    val old_time = commandLine.getOptionValue("old_time")

    val spark = MobvistaConstant.createSparkSession(s"RTDmpMerge.$date_time")
    val sc = spark.sparkContext

    //  historical devices not updated since this threshold are dropped from the merge
    val update_time = DateUtil.getDayByString(date_time, "yyyyMMddHH", -14)

    //  audience entries last seen at or before this threshold are expired
    expire_time = DateUtil.getDayByString(date_time, "yyyyMMddHH", -2)

    val mapper = new ObjectMapper()
    try {
      import spark.implicits._
      //  val paths = input.split(",", -1)
      //  spark.read.orc(paths(0), paths(1), paths(2), paths(3), paths(4), paths(5))
      val daily_df = spark.read.orc(input).rdd.map(row => {

        val deviceId = row.getAs[String]("devid")

        val deviceType = row.getAs[String]("device_type")

        //  audience_data maps audienceId -> "yyyy-MM-dd HH"; normalize the hour to yyyyMMddHH
        val audienceMap = JSON.parseObject(row.getAs[String]("audience_data")).asInstanceOf[java.util.Map[String, String]]
          .map(kv => {
            val audienceTime = DateUtil.format(DateUtil.parse(kv._2 + ":00:00", "yyyy-MM-dd HH:mm:ss"), "yyyyMMddHH")
            (kv._1, audienceTime)
          })

        //  drop blank audience ids and entries last seen before expire_time
        audienceMap.retain((k, v) => v.compareTo(expire_time) > 0 && StringUtils.isNotBlank(k))

        AudienceMerge(deviceId, mapper.writeValueAsString(audienceMap.asJava), date_time, deviceType)
      }).toDF
        .dropDuplicates()
        //  daily_df is read twice (id collection below and the merge SQL), so cache it
        .cache()

      daily_df.createOrReplaceTempView("daily_rtdmp")

      //  Collect the audience ids of the current batch on the driver. Mutating
      //  updateAudienceIdSet inside the map above would only update executor-local
      //  copies of the closure, so the `process` UDF would never observe the ids;
      //  collecting here populates the set before it is shipped with the UDF.
      updateAudienceIdSet ++= daily_df.rdd.flatMap(row =>
        JSON.parseObject(row.getAs[String]("audience_map")).keySet().asScala.map(_.toInt)
      ).distinct().collect()

      spark.udf.register("process", process _)
      //  devices whose pruned audience map ended up empty are dropped
      val join_df = spark.sql(sql.replace("@dt", old_time).replace("@update_time", update_time))
        .filter("audience_map != '{}'")
        .dropDuplicates()

      //  clear any previous output before writing
      FileSystem.get(new URI("s3://mob-emr-test"), sc.hadoopConfiguration).delete(new Path(output), true)

      join_df.coalesce(partition.toInt)
        .write.mode(SaveMode.Overwrite)
        .option("orc.compress", "zlib")
        .orc(output)

    } finally {
      //  SparkSession.stop() also stops the underlying SparkContext
      if (spark != null) {
        spark.stop()
      }
    }
    0
  }

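  //  Full outer merge of the current batch (daily_rtdmp) against the previous
  //  snapshot of dwh.audience_merge: coalesce() prefers the daily row for
  //  devices present on both sides; historical rows are restricted to devices
  //  updated since @update_time, and their audience maps are pruned by the
  //  `process` UDF before merging.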
  val sql =
    """
      |SELECT
      |   coalesce(d.devid, m.devid) devid,
      |   coalesce(d.audience_map, m.audience_map) audience_map,
      |   coalesce(d.update_time, m.update_time) update_time,
      |   coalesce(d.device_type, m.device_type) device_type
      |   FROM daily_rtdmp d
      |   FULL OUTER JOIN
      |   (SELECT devid, process(audience_map) audience_map,
      |     update_time, device_type
      |     FROM dwh.audience_merge
      |     WHERE dt = '@dt' AND update_time >= '@update_time'
      |   ) m
      |   ON d.devid = m.devid
      |""".stripMargin

  //  Prune a historical audience map: keep entries that have not expired and
  //  whose audience id was not refreshed in the current batch (a refreshed
  //  audience's membership comes solely from the daily side).
  def process(audience_map: String): String = {
    val audienceMap = JSON.parseObject(audience_map).asInstanceOf[java.util.Map[String, String]]
    audienceMap.retain((k, v) => v.compareTo(expire_time) > 0 && !updateAudienceIdSet.contains(Integer.valueOf(k)))
    //  parseObject returns a fastjson JSONObject, so toString emits valid JSON
    audienceMap.toString
  }
}

object RTDmpMerge {
  def main(args: Array[String]): Unit = {
    new RTDmpMerge().run(args)
  }
}
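
//  A minimal submission sketch (cluster settings and paths are assumptions,
//  not taken from this file):
//
//    spark-submit --class mobvista.dmp.datasource.rtdmp.RTDmpMerge \
//      --master yarn --deploy-mode cluster dmp.jar \
//      -input s3://mob-emr-test/.../rtdmp/2020081710 \
//      -output s3://mob-emr-test/.../audience_merge/2020081710 \
//      -partition 100 -date_time 2020081710 -old_time 2020081709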