package mobvista.dmp.datasource.rtdmp

import java.net.URI

import com.alibaba.fastjson.{JSON, JSONArray, JSONObject}
import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
import mobvista.dmp.format.RDDMultipleOutputFormat
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.Text
import org.apache.spark.sql.SparkSession

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

/**
  * @package: mobvista.dmp.datasource.rtdmp
  * @author: wangjf
  * @date: 2020/7/13
  * @time: 11:25 AM
  * @email: jinfeng.wang@mobvista.com
  * @phone: 152-1062-7698
  */
class RTDmpAS extends CommonSparkJob with Serializable {

  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("input_audience", true, "input_audience")
    options.addOption("input_data", true, "input_data")
    options.addOption("output", true, "output")
    options.addOption("coalesce", true, "coalesce")
    options.addOption("time", true, "time")
    options
  }
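  //  Usage sketch (option values below are illustrative, not taken from the source):
  //    -input_audience <audience summary text path> -input_data <ORC input path>
  //    -output <output root> -coalesce <output partition count> -time <run hour label>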

  override protected def run(args: Array[String]): Int = {

    val parser = new BasicParser()
    val options = commandOptions()
    val commandLine = parser.parse(options, args)
    val input_audience = commandLine.getOptionValue("input_audience")
    val input_data = commandLine.getOptionValue("input_data")
    val output = commandLine.getOptionValue("output")
    val coalesce = commandLine.getOptionValue("coalesce")
    val time = commandLine.getOptionValue("time")

    val spark: SparkSession = MobvistaConstant.createSparkSession(s"RTDmpAS.$time")

    val sc = spark.sparkContext
    try {

      val fileSystem = FileSystem.get(new URI("s3://mob-emr-test"), sc.hadoopConfiguration)
      fileSystem.delete(new Path(output), true)

      //  By default, the previous hour's data is computed
      //  val update_time_start = DateUtil.format(time + ":00:00", "yyyy-MM-dd HH:mm:ss")
      //  val update_time_end = DateUtil.format(time + ":59:59", "yyyy-MM-dd HH:mm:ss")

      //  println("update_time_start -->> " + update_time_start + ", update_time_end -->> " + update_time_end)

      //  Select all audience packages by passing a minimum and a maximum bound

      val audience_date_utime_start = 1577811600L
      val audience_date_utime_end = 4100731200L
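      //  Epoch-second bounds covering roughly 2020-01-01 through late 2099,
      //  i.e. effectively all audiences.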

      val update_time_start = "2000-01-01 00:00:00"
      val update_time_end = "2099-12-31 23:59:59"

      /*
      val audience_date_utime_start = DateUtil.parse(update_time_start, "yyyy-MM-dd HH").getTime / 1000 - 28800
      val audience_date_utime_end = DateUtil.parse(update_time_end, "yyyy-MM-dd HH").getTime / 1000 - 28800
      */

      import scala.collection.JavaConverters._
      val ids = ServerUtil.request(update_time_start, update_time_end, audience_date_utime_start, audience_date_utime_end, 0, 0, 4).asScala
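      //  The third element of each value tuple is treated here as the audience status:
      //  status 3 marks audiences to exclude (falseId); all others are kept (trueId).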

      val trueId = ids.filter(kv => {
        kv._2._3 != 3
      }).keys.toSet

      val falseId = ids.filter(kv => {
        kv._2._3 == 3
      }).keys.toSet

      println("trueId -->> " + trueId)
      println("falseId -->> " + falseId)

      val audience_output = output + "/audience"
      fileSystem.delete(new Path(audience_output), true)
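      //  input_audience lines appear to be "(id,count)": strip the surrounding
      //  parentheses, split on ",", and drop ids flagged for removal (falseId).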

      val audienceSum = sc.textFile(input_audience).map(r => {
        r.substring(1, r.length - 1).split(",", -1)
      }).filter(r => {
        !falseId.contains(Integer.parseInt(r(0)))
      }).map(r => {
        (r(0), r(1))
      }).cache()

      audienceSum.coalesce(1).saveAsTextFile(audience_output)

      val data_output = output + "/data"
      fileSystem.delete(new Path(data_output), true)


      //  .mapPartitions(Logic.parseResult(data_output, trueId, falseId, _))

      //  Audiences that are still being computed are not written out.
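      //  Each ORC row is expected to carry an audience_info JSON string of the form
      //  {"devid": ..., "install_list": {...}, "audience_id": [...]} plus a region array;
      //  records are keyed by <data_output>/<region> so RDDMultipleOutputFormat can split the output by region.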
      spark.read.orc(input_data).rdd
        .map(row => {
          val array = new ArrayBuffer[(Text, Text)]()
          val audience_info = row.getAs[String]("audience_info")
          val jsonObject = MobvistaConstant.String2JSONObject(audience_info)
          val devid = jsonObject.getString("devid")
          val install_list = jsonObject.getJSONObject("install_list")
          val audience_id = JSON.parseArray(jsonObject.getJSONArray("audience_id").toJSONString, classOf[Integer]).asScala.toSet
          if (((audience_id -- falseId) & trueId).nonEmpty) {
            val newJSON = new JSONObject()
            newJSON.put("devid", devid)
            if (install_list != null && !install_list.isEmpty) {
              newJSON.put("install_list", install_list)
            }
            newJSON.put("audience_id", ((audience_id -- falseId) & trueId).asJava)
            val regionSet = row.getAs[mutable.WrappedArray[String]]("region")
            for (region <- regionSet) {
              array += ((new Text(data_output + "/" + region), new Text(newJSON.toJSONString)))
            }
          }
          array.iterator
        }).flatMap(l => l)
        .repartition(coalesce.toInt)
        .saveAsNewAPIHadoopFile(data_output, classOf[Text], classOf[Text], classOf[RDDMultipleOutputFormat[_, _]])

      val jsonArray = new JSONArray()
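      //  One entry per surviving audience; audience_data_status = 2 is assumed to mark
      //  the audience data as produced, and audience_count comes from the summary line.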
      audienceSum.collect().foreach(m => {
        val jsonObject = new JSONObject()
        jsonObject.put("id", Integer.parseInt(m._1))
        jsonObject.put("audience_data_status", 2)
        jsonObject.put("audience_count", Integer.parseInt(m._2))
        jsonArray.add(jsonObject)
      })

      val jsonObject = ServerUtil.update(jsonArray)
      if (jsonObject.getInteger("code") == 200) {
        println("Audience Update OK!")
      }

    } finally {
      if (sc != null) {
        sc.stop()
      }
      if (spark != null) {
        spark.stop()
      }
    }
    0
  }
}

object RTDmpAS {

  def main(args: Array[String]): Unit = {
    new RTDmpAS().run(args)
  }
}