package mobvista.dmp.datasource.rtdmp

import com.alibaba.fastjson.JSONObject
import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
import mobvista.dmp.datasource.rtdmp.Constant.AudienceMerge
import mobvista.dmp.util.DateUtil
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.codehaus.jackson.map.ObjectMapper

import java.text.SimpleDateFormat
import java.util.Calendar
import scala.collection.JavaConverters._

/**
 * @package: mobvista.dmp.datasource.rtdmp
 * @author: wangjf
 * @date: 2020/7/13
 * @time: 11:25 AM
 * @email: jinfeng.wang@mobvista.com
 * @phone: 152-1062-7698
 */
class RTDmpMain extends CommonSparkJob with Serializable {

  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("datetime", true, "datetime")
    options.addOption("old_datetime", true, "old_datetime")
    options.addOption("input", true, "input")
    options.addOption("output", true, "output")
    options.addOption("coalesce", true, "coalesce")
    options
  }

  override protected def run(args: Array[String]): Int = {

    val parser = new BasicParser()
    val options = commandOptions()
    val commandLine = parser.parse(options, args)
    val datetime = commandLine.getOptionValue("datetime")
    val old_datetime = commandLine.getOptionValue("old_datetime")
    val input = commandLine.getOptionValue("input")
    val output = commandLine.getOptionValue("output")
    val coalesce = commandLine.getOptionValue("coalesce")

    val spark: SparkSession = SparkSession.builder()
      .appName(s"RTDmpMain.${datetime}")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

    val sc = spark.sparkContext
    try {

      var sdf = new SimpleDateFormat("yyyyMMddHHmmss")
      //  By default, process the previous hour's data
      val update_time_start = DateUtil.format(sdf.parse(datetime + "0000"), "yyyy-MM-dd HH:mm:ss")
      val update_time_end = DateUtil.format(sdf.parse(datetime + "5959"), "yyyy-MM-dd HH:mm:ss")

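      //  The audience service also takes the window as epoch seconds; 28800 s = 8 h and 25200 s = 7 h,
      //  presumably shifting the local (UTC+8) hour boundaries onto the service's UTC update timestamps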
      val audience_date_utime_start = sdf.parse(datetime + "0000").getTime / 1000 - 28800
      val audience_date_utime_end = sdf.parse(datetime + "5959").getTime / 1000 - 25200

      val updateAudienceIds =
        ServerUtil.request(update_time_start, update_time_end, audience_date_utime_start, audience_date_utime_end, 0, 1, 2)
          .asScala.keys.toSet

      println(s"updateAudienceIds -->> ${updateAudienceIds.mkString(",")}")

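      //  expire_time: audience tags whose last update is more than 48 hours before datetime
      //  are dropped during the merge below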
      sdf = new SimpleDateFormat("yyyyMMddHH")
      val calendar = Calendar.getInstance()
      val date = sdf.parse(datetime)
      calendar.setTime(date)
      calendar.add(Calendar.HOUR_OF_DAY, -48)
      val expire_time = sdf.format(calendar.getTime)

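      //  Current-hour input: one row per device with comma-separated audience_ids;
      //  each audience id is keyed to the current datetime as its latest update marker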
      val hour_rdd = spark.read.orc(input).rdd.map(row => {
        val devid = row.getAs[String]("devid")
        val audience_ids = row.getAs[String]("audience_ids").split(",")
        val audience_data = new JSONObject()
        audience_ids.foreach(audience_id => {
          audience_data.put(audience_id, datetime)
        })
        val device_type = row.getAs[String]("device_type")
        (devid, (audience_data.toJSONString, device_type))
      })

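      //  Previously merged snapshot: the dwh.audience_merge partition for old_datetime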
      val sql =
        s"""
           |SELECT * FROM dwh.audience_merge WHERE dt = '$old_datetime'
           |""".stripMargin
      val merge_rdd = spark.sql(sql).rdd
        .map(row => {
          val devid = row.getAs[String]("devid")
          val audience_map = row.getAs[String]("audience_map")
          val update_time = row.getAs[String]("update_time")
          val device_type = row.getAs[String]("device_type")
          (devid, (audience_map, update_time, device_type))
        })

      import spark.implicits._

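      //  Full outer join of the current hour against the previous snapshot; the three cases are handled below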
      val df = hour_rdd.fullOuterJoin(merge_rdd)
        .mapPartitions(ts => {
          ts.map(t => {
            val devid = t._1
            val opt1 = t._2._1
            val opt2 = t._2._2
            if (opt1.nonEmpty && opt2.nonEmpty) {
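              //  Case 1: device seen this hour and in the snapshot -> keep old tags that were not
              //  refreshed server-side, are not superseded by new tags, and have not expired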
              val new_audience = MobvistaConstant.String2JSONObject(opt1.get._1).asInstanceOf[java.util.Map[String, String]]
              val old_audience = opt2.get._1
              val retain_old_audience = MobvistaConstant.String2JSONObject(old_audience).asInstanceOf[java.util.Map[String, String]].asScala
                .retain((k, v) => !updateAudienceIds.contains(k.toInt) && !new_audience.keySet().contains(k) && v.compareTo(expire_time) > 0)
              new_audience.putAll(retain_old_audience.asJava)
              AudienceMerge(devid, new ObjectMapper().writeValueAsString(new_audience), datetime, opt1.get._2)
            } else if (opt1.nonEmpty && opt2.isEmpty) {
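              //  Case 2: device only seen this hour -> take the new tags as-is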
              AudienceMerge(devid, opt1.get._1, datetime, opt1.get._2)
            } else {
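              //  Case 3: device only in the snapshot -> keep old tags that were not refreshed
              //  server-side and have not expired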
              val old_audience = opt2.get._1
              val retain_old_audience = MobvistaConstant.String2JSONObject(old_audience).asInstanceOf[java.util.Map[String, String]].asScala
                .retain((k, v) => !updateAudienceIds.contains(k.toInt) && v.compareTo(expire_time) > 0)
              AudienceMerge(devid, new ObjectMapper().writeValueAsString(retain_old_audience.asJava), opt2.get._2, opt2.get._3)
            }
          })
        }).filter(o => {
          !MobvistaConstant.String2JSONObject(o.audience_map).isEmpty
        })

      df.toDF
        .repartition(coalesce.toInt)
        .write
        .mode(SaveMode.Overwrite)
        .option("orc.compress", "zlib")
        .orc(output)

    } finally {
      if (sc != null) {
        sc.stop()
      }
      if (spark != null) {
        spark.stop()
      }
    }
    0
  }
}

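/**
 * Illustrative launch command (a sketch only: the jar name, paths, and resource flags below are
 * deployment-specific assumptions, not values taken from this repository):
 *
 * spark-submit --class mobvista.dmp.datasource.rtdmp.RTDmpMain dmp.jar \
 *   -datetime 2020071310 -old_datetime 2020071309 \
 *   -input <hour_input_orc_path> -output <merged_output_orc_path> -coalesce 200
 */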
object RTDmpMain {
  def main(args: Array[String]): Unit = {
    new RTDmpMain().run(args)
  }
}