// RTDmpMergeFix.scala
package mobvista.dmp.datasource.rtdmp

import java.net.URI

import com.alibaba.fastjson.JSONObject
import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SaveMode

import scala.collection.mutable

/**
 * @package: mobvista.dmp.datasource.rtdmp
 * @author: wangjf
 * @date: 2020/8/17
 * @time: 10:57 AM
 * @email: jinfeng.wang@mobvista.com
 * @phone: 152-1062-7698
 */
/**
 * One-off fix job: re-reads a previous partition of `dwh.audience_merge`,
 * deduplicates it, re-encodes the audience list as a JSON map of
 * audience_id -> update_time, and rewrites it as zlib-compressed ORC.
 */
class RTDmpMergeFix extends CommonSparkJob with Serializable {
  // NOTE(review): never read or written anywhere in this class — kept only
  // for interface compatibility; confirm no external caller uses it before removing.
  var expire_time = ""

  /**
   * Builds the command-line options accepted by this job.
   *
   * @return options: input (unused), output path, target partition count,
   *         date_time (app-name suffix), old_time (source `dt` partition)
   */
  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("input", true, "input")
    options.addOption("output", true, "output")
    options.addOption("partition", true, "partition")
    options.addOption("date_time", true, "date_time")
    options.addOption("old_time", true, "old_time")
    options
  }

  /**
   * Job entry point.
   *
   * Reads `dwh.audience_merge` for dt = old_time, drops duplicate rows,
   * deletes any existing data at `output`, and writes the result as
   * zlib-compressed ORC coalesced into `partition` files.
   *
   * @param args raw CLI arguments, parsed via [[commandOptions]]
   * @return 0 on success; any failure propagates as an exception
   */
  override protected def run(args: Array[String]): Int = {
    val parser = new BasicParser()
    val options = commandOptions()
    val commandLine = parser.parse(options, args)
    //  val input = commandLine.getOptionValue("input")
    val output = commandLine.getOptionValue("output")
    val partition = commandLine.getOptionValue("partition")
    val date_time = commandLine.getOptionValue("date_time")
    val old_time = commandLine.getOptionValue("old_time")

    val spark = MobvistaConstant.createSparkSession(s"RTDmpMergeFix.$date_time")
    val sc = spark.sparkContext

    try {
      // Make `process` available to the SQL below.
      spark.udf.register("process", process _)
      val df = spark.sql(sql.replace("@dt", old_time))
        .dropDuplicates()

      // Pre-delete the target path. NOTE(review): the FileSystem is resolved
      // against the hard-coded `mob-emr-test` bucket — if `output` can point
      // to a different bucket/filesystem, this delete may miss it; confirm.
      FileSystem.get(new URI(s"s3://mob-emr-test"), sc.hadoopConfiguration).delete(new Path(output), true)

      df.coalesce(partition.toInt)
        .write.mode(SaveMode.Overwrite)
        .option("orc.compress", "zlib")
        .orc(output)

    } finally {
      // SparkSession.stop() stops the underlying SparkContext as well, so the
      // previous explicit sc.stop() (and the null checks on these vals) was
      // redundant; a single stop is sufficient.
      if (spark != null) {
        spark.stop()
      }
    }
    0
  }

  // Source query; `@dt` is substituted with the `old_time` argument at runtime.
  val sql =
    """
      |SELECT devid, process(audience_id, update_time) audience_map,
      |  update_time
      |  FROM dwh.audience_merge
      |  WHERE dt = '@dt'
      |""".stripMargin

  /**
   * UDF body: encodes a list of audience ids as a JSON object mapping each
   * id (as a string key) to the row's update_time.
   *
   * @param audienceIds audience ids for one device row
   * @param updateTime  update timestamp shared by all ids in the row
   * @return compact JSON string, e.g. {"101":"2020-08-17 10:00:00",...}
   */
  def process(audienceIds: mutable.WrappedArray[Int], updateTime: String): String = {
    val json = new JSONObject()
    audienceIds.foreach(audienceId => {
      json.put(String.valueOf(audienceId), updateTime)
    })
    json.toJSONString
  }
}

object RTDmpMergeFix {
  /** JVM entry point: builds the job and runs it (the status code from `run` is discarded). */
  def main(args: Array[String]): Unit = {
    val job = new RTDmpMergeFix()
    job.run(args)
  }
}