package mobvista.dmp.datasource.rtdmp

import com.alibaba.fastjson.{JSONArray, JSONObject}
import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
import mobvista.dmp.datasource.rtdmp.Constant.AudienceInfoPre
import mobvista.dmp.datasource.rtdmp.Logic.getDevType
import mobvista.dmp.util.{DateUtil, MD5Util}
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.commons.lang.StringUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.{FileSplit, TextInputFormat}
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.slf4j.LoggerFactory

import java.net.URI
import java.util
import scala.collection.JavaConversions._
import scala.collection.mutable

/**
 * @package: mobvista.dmp.datasource.rtdmp
 * @author: wangjf
 * @date: 2020/7/13
 * @time: 11:25 AM
 * @email: jinfeng.wang@mobvista.com
 * @phone: 152-1062-7698
 */
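//  Overview (inferred from the code below): this job queries the audience packages updated in the given hour,
//  loads each package's device-id files from S3, normalizes every device id to MD5, merges all packages into
//  (device_id, audienceId set, device_type) rows, and writes them as ORC to the given output path.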
class RTDmpMainPre extends CommonSparkJob with Serializable {
  private val logger = LoggerFactory.getLogger(classOf[RTDmpMainPre])

  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("time", true, "time")
    options.addOption("output", true, "output")
    options.addOption("coalesce", true, "coalesce")
    options.addOption("data_utime", true, "data_utime")
    options
  }
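
  //  Expected arguments (example values below are illustrative only):
  //    -time 2020-07-13.11           hour to process; '.' is replaced with ' ' and parsed as "yyyy-MM-dd HH"
  //    -output s3://<bucket>/<path>  output directory, overwritten on each run
  //    -coalesce 100                 number of output partitions
  //    -data_utime 2020-07-13.11     audience-data update time, same "yyyy-MM-dd.HH" input format as -time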

  override protected def run(args: Array[String]): Int = {

    val parser = new BasicParser()
    val options = commandOptions()
    val commandLine = parser.parse(options, args)
    val time = commandLine.getOptionValue("time").replace(".", " ")
    val output = commandLine.getOptionValue("output")
    val coalesce = commandLine.getOptionValue("coalesce")
    val data_utime = commandLine.getOptionValue("data_utime").replace(".", " ")

    val spark: SparkSession = SparkSession.builder()
      .appName(s"RTDmpMainPre.$time")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .getOrCreate()

    val sc = spark.sparkContext
    try {

      var mergeRDD = sc.emptyRDD[(String, (Int, String))]

      //  By default, process the previous hour's data
      val update_time_start = DateUtil.format(time + ":00:00", "yyyy-MM-dd HH:mm:ss")
      val update_time_end = DateUtil.format(time + ":59:59", "yyyy-MM-dd HH:mm:ss")

      val audience_date_utime_start = DateUtil.parse(data_utime, "yyyy-MM-dd HH").getTime / 1000 - 28800
      val audience_date_utime_end = DateUtil.parse(data_utime, "yyyy-MM-dd HH").getTime / 1000 - 25200
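      //  The offsets above: 28800 s = 8 h and 25200 s = 7 h, so the two values bound a one-hour window;
      //  the 8-hour shift presumably converts the UTC+8 wall-clock time in data_utime to UTC epoch seconds.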

      val map: util.Map[Integer, (JSONArray, Integer, Integer, JSONObject)] =
        ServerUtil.request(update_time_start, update_time_end, audience_date_utime_start, audience_date_utime_end, 0, 1, 2)
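      //  As consumed below, the returned map is keyed by audienceId; the first tuple element is a JSONArray of
      //  {s3_path, update_time} entries and the second is the audience operation flag. The remaining fields are
      //  not used in this job.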

      val fc = classOf[TextInputFormat]
      val kc = classOf[LongWritable]
      val vc = classOf[Text]
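      //  These classes are passed to newAPIHadoopFile below so that mapPartitionsWithInputSplit can expose the
      //  input split, whose parent directory name encodes the device type of each package file.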

      map.foreach(t => {
        val audienceId = Integer2int(t._1)
        val audienceOp = t._2._2
        val dmap = new mutable.HashMap[String, String]()
        t._2._1.foreach(json => {
          val jsonObject = json.asInstanceOf[JSONObject]
          if (jsonObject.containsKey("s3_path") && StringUtils.isNotBlank(jsonObject.getString("s3_path"))) {
            //  (s3_path, update_date)
            dmap.put(jsonObject.getString("s3_path"), jsonObject.getString("update_time"))
          }
        })

        //  Check whether any s3_path's update_date equals the current update_date, to filter out invalid entries
        //  if (dmap.values.contains(time)) {
        /**
         * When audienceOp == 0 and dmap.size >= 2, a set difference is computed: devices that appeared in the
         * previous partition but not in this one have their package set to -1 * audienceId so that downstream
         * jobs can delete them; otherwise the data is merged with the previous audience package.
         */
        val updateRDD = if (audienceOp == 1 && dmap.size >= 2) {
          val list = dmap.toList.sortWith(_._2 > _._2).take(1) //  sort by update_date in descending order and keep the most recent audience package
          val pathUri = new URI(list.get(0)._1)
          val newAudience = if (FileSystem.get(new URI(s"${pathUri.getScheme}://${pathUri.getHost}"), sc.hadoopConfiguration)
            .exists(new Path(pathUri.toString.replace("*", "")))) {
            val rdd = sc.newAPIHadoopFile(list.get(0)._1, fc, kc, vc, sc.hadoopConfiguration)
            val linesWithFileNames = rdd.asInstanceOf[NewHadoopRDD[LongWritable, Text]]
              .mapPartitionsWithInputSplit((inputSplit, iterator) => {
                val file = inputSplit.asInstanceOf[FileSplit]
                iterator.map(tup => (file.getPath.getParent.getName, tup._2))
              })
            linesWithFileNames.repartition(100)
              .filter(r => {
                r._2.toString.length <= 64
              }).map(r => { //  the head of the sorted list is the newest audience package
              val device_id =
                if (r._2.toString.matches(MobvistaConstant.md5Ptn)) {
                  r._2.toString
                } else {
                  MD5Util.getMD5Str(r._2.toString)
                }
              val device_type = if (r._1.endsWith("md5")) {
                r._1
              } else {
                s"${r._1}_md5"
              }
              (device_id, (audienceId, getDevType(device_type)))
            })
          } else {
            sc.emptyRDD[(String, (Int, String))]
          }
          /*
          val oldAudience = sc.textFile(list.get(1)._1).repartition(100).map(r => { //  the second element is the old audience package, i.e. the previous version
            val device_id =
              if (r.matches(MobvistaConstant.md5Ptn)) {
                r
              } else {
                MD5Util.getMD5Str(r)
              }
            (device_id, (audienceId, list.get(1)._2))
          })
          oldAudience.subtractByKey(newAudience).map(t => {
            //  multiply audienceId by -1 for the difference set, so downstream jobs know to delete this audienceId
            val device_id =
              if (t._1.matches(MobvistaConstant.md5Ptn)) {
                t._1
              } else {
                MD5Util.getMD5Str(t._1)
              }
            (device_id, ((-1) * audienceId, t._2._2))
            //  (devId, ((-1) * audienceId, update_date))
          }).union(newAudience) //  union with the newest audience package
          */
          newAudience
        } else if (dmap.size == 1) {
          val audData = dmap.toList.sortWith(_._2 > _._2).take(1)
          val pathUri = new URI(audData.get(0)._1)
          if (audData.nonEmpty && FileSystem.get(new URI(s"${pathUri.getScheme}://${pathUri.getHost}"), sc.hadoopConfiguration)
            .exists(new Path(audData.get(0)._1.replace("*", "")))) {
            val rdd = sc.newAPIHadoopFile(audData.get(0)._1, fc, kc, vc, sc.hadoopConfiguration)
            val linesWithFileNames = rdd.asInstanceOf[NewHadoopRDD[LongWritable, Text]]
              .mapPartitionsWithInputSplit((inputSplit, iterator) => {
                val file = inputSplit.asInstanceOf[FileSplit]
                iterator.map(tup => (file.getPath.getParent.getName, tup._2))
              })

            linesWithFileNames
              .filter(r => {
                r._2.toString.length <= 64
              })
              .map(r => { //  take the newest audience package
                val device_id =
                  if (r._2.toString.matches(MobvistaConstant.md5Ptn)) {
                    r._2.toString
                  } else {
                    MD5Util.getMD5Str(r._2.toString)
                  }
                val device_type = if (r._1.endsWith("md5")) {
                  r._1
                } else {
                  s"${r._1}_md5"
                }
                (device_id, (audienceId, getDevType(device_type)))
              })
          } else { //  if the path does not exist, create an empty RDD
            sc.emptyRDD[(String, (Int, String))]
          }
        } else {
          sc.emptyRDD[(String, (Int, String))]
        }
        //  merge all audience packages into one RDD
        mergeRDD = mergeRDD.union(updateRDD)
      })

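      //  Group by device id: collect every audienceId attached to the device and keep one device type per device.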
      val df = mergeRDD.repartition(1000).groupByKey().map(r => {
        val devId = r._1
        var deviceType = ""
        val set = new mutable.HashSet[Int]()
        //  collect every audienceId for this device and keep the first non-blank device type
        r._2.foreach(t => {
          set.add(t._1)
          if (StringUtils.isBlank(deviceType)) {
            deviceType = t._2
          }
        })
        (devId, set, deviceType)
      })

      import spark.implicits._

      FileSystem.get(new URI(s"s3://mob-emr-test"), sc.hadoopConfiguration).delete(new Path(output), true)
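      //  The existing output is deleted above and rewritten: one row per device with the comma-joined audienceIds
      //  and the device type, stored as zlib-compressed ORC.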
      df.map(r => {
        AudienceInfoPre(r._1, r._2.mkString(","), r._3)
      }).repartition(coalesce.toInt)
        .toDF
        .write
        .mode(SaveMode.Overwrite)
        .option("orc.compress", "zlib")
        .orc(output)

    } finally {
      if (sc != null) {
        sc.stop()
      }
      if (spark != null) {
        spark.stop()
      }
    }
    0
  }
}

object RTDmpMainPre {
  def main(args: Array[String]): Unit = {
    new RTDmpMainPre().run(args)
  }
}
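
//  Example invocation (hypothetical paths and values, for illustration only):
//    spark-submit --class mobvista.dmp.datasource.rtdmp.RTDmpMainPre \
//      <application jar> \
//      -time 2020-07-13.11 -output s3://<bucket>/rtdmp_pre/2020-07-13.11 \
//      -coalesce 100 -data_utime 2020-07-13.11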