package mobvista.dmp.datasource.rtdmp

import com.alibaba.fastjson.{JSONArray, JSONObject}
import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
import mobvista.dmp.format.RDDMultipleOutputFormat
import mobvista.dmp.util.DateUtil
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.compress.{CompressionCodec, GzipCodec}
import org.apache.spark.storage.StorageLevel

import java.net.URI
import scala.collection.JavaConverters.asScalaSetConverter
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

/**
 * @package: mobvista.dmp.datasource.rtdmp
 * @author: wangjf
 * @date: 2020/7/13
 * @time: 11:25 AM
 * @email: jinfeng.wang@mobvista.com
 * @phone: 152-1062-7698
 */
class RTDmpRequestDaily extends CommonSparkJob with Serializable {

  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("date", true, "date")
    options.addOption("output", true, "output")
    options.addOption("coalesce", true, "coalesce")
    options.addOption("hh", true, "hh")
    options
  }

  override protected def run(args: Array[String]): Int = {
    val parser = new BasicParser()
    val options = commandOptions()
    val commandLine = parser.parse(options, args)
    val date = commandLine.getOptionValue("date")
    val output = commandLine.getOptionValue("output")
    val coalesce = Integer.parseInt(commandLine.getOptionValue("coalesce"))
    val spark = MobvistaConstant.createSparkSession(s"RTDmpRequestDaily.$date")

    val sc = spark.sparkContext
    try {

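      // Configure Gzip block compression for the job output; both the legacy
      // mapreduce.output.* and the mapreduce.output.fileoutputformat.*
      // property names are set for compatibility.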
      val conf = spark.sparkContext.hadoopConfiguration
      conf.set("mapreduce.output.compress", "true")
      conf.set("mapreduce.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec")
      conf.setBoolean("mapreduce.output.fileoutputformat.compress", true)
      conf.set("mapreduce.output.fileoutputformat.compress.type", CompressionType.BLOCK.toString)
      conf.setClass("mapreduce.output.fileoutputformat.compress.codec", classOf[GzipCodec], classOf[CompressionCodec])

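      // Register the UDFs (Logic.changeDeviceId / Logic.changeDeviceType)
      // used below to rewrite the raw device_id and device_type columns.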
      spark.udf.register("changeDeviceId", Logic.changeDeviceId _)
      spark.udf.register("changeDeviceType", Logic.changeDeviceType _)

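      // One row per device from the 14-day install list; install_list is a
      // JSON object whose keys are the installed package names.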
      val sql =
        s"""
           |SELECT changeDeviceId(device_type,device_id) device_id, changeDeviceType(device_type) device_type, install_list
           |  FROM dwh.dmp_install_list WHERE dt = '${date}' AND business = '14days'
           |""".stripMargin

      //  Query window: the full day being processed (00:00:00 – 23:59:59)
      val update_time_start = DateUtil.format(date + " 00:00:00", "yyyy-MM-dd HH:mm:ss")
      val update_time_end = DateUtil.format(date + " 23:59:59", "yyyy-MM-dd HH:mm:ss")

      //  The same window as epoch seconds, shifted by -28800s (8 hours, UTC+8 -> UTC)
      val audience_date_utime_start = DateUtil.parse(date + " 00:00:00", "yyyy-MM-dd HH:mm:ss").getTime / 1000 - 28800
      val audience_date_utime_end = DateUtil.parse(date + " 23:59:59", "yyyy-MM-dd HH:mm:ss").getTime / 1000 - 28800

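      // Query the RTDmp server for audiences updated within the window; the
      // response maps audience id -> audience info (installed_package_name, ...).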
      val json: JSONObject =
        RTDmpServer.query(update_time_start, update_time_end, 0,
          2, audience_date_utime_start, audience_date_utime_end, "6")

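      // Delete any existing output so re-runs of the same day are idempotent.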
      FileSystem.get(new URI(s"s3://mob-emr-test"), sc.hadoopConfiguration).delete(new Path(output), true)

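      // Build audience id -> installed package names from the server response.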
      val map = new mutable.HashMap[Integer, Array[String]]()
      for (id <- json.keySet().toArray()) {
        val pkgSet = json.getJSONObject(String.valueOf(id)).getString("installed_package_name").split(",", -1)
        map.put(Integer.parseInt(String.valueOf(id)), pkgSet)
      }

      val rdd = spark.sql(sql).rdd.map(row => {
        val deviceId = row.getAs[String]("device_id")
        val deviceType = row.getAs[String]("device_type")
        val install_list = MobvistaConstant.String2JSONObject(row.getAs[String]("install_list")).keySet().asScala

        //  Emit (deviceId, deviceType, audienceId) for every audience whose
        //  package set overlaps the device's install list.
        val array = new ArrayBuffer[(String, String, Integer)]()
        for (key <- map.keySet) {
          if (map(key).toSet.intersect(install_list).nonEmpty) {
            array += ((deviceId, deviceType, key))
          }
        }
        array
      }).flatMap(l => l).persist(StorageLevel.MEMORY_AND_DISK_SER)

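      // Key each record with its target path $output/<audienceId>/<deviceType>;
      // RDDMultipleOutputFormat splits the records across those directories.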
      rdd.repartition(coalesce).map(t => {
        (new Text(s"$output/${t._3}/${t._2}"), new Text(t._1))
      }).saveAsNewAPIHadoopFile(output, classOf[Text], classOf[Text], classOf[RDDMultipleOutputFormat[_, _]], conf)

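      // Distinct (audienceId, deviceType) pairs; countByKey yields the number
      // of device types written per audience, which decides the path format below.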
      val mRdd = rdd.map(r => {
        (r._3, r._2)
      }).distinct(10)
        .cache()
      val pkgTypeCount = mRdd.countByKey()

      val portalMap = mRdd.collectAsMap()

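      // Report the refreshed S3 path for each audience back to the server
      // (one ServerUtil.update call per audience). An audience written under a
      // single device type gets that exact directory; otherwise a wildcard
      // over all device-type subdirectories.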
      portalMap.foreach(m => {
        val updateJsonArray = new JSONArray()
        val jsonObject = new JSONObject()
        jsonObject.put("id", m._1)
        if (pkgTypeCount(m._1) == 1) {
          jsonObject.put("s3_path", s"$output/${m._1}/${m._2}/")
        } else {
          jsonObject.put("s3_path", s"$output/${m._1}/*/")
        }
        updateJsonArray.add(jsonObject)
        ServerUtil.update(updateJsonArray)
      })

    } finally {
      if (sc != null) {
        sc.stop()
      }
      if (spark != null) {
        spark.stop()
      }
    }
    0
  }
}

object RTDmpRequestDaily {
  def main(args: Array[String]): Unit = {
    new RTDmpRequestDaily().run(args)
  }
}
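
// A hypothetical submission sketch: the class and option names come from this
// file, while the jar name, output path, and resource settings are assumptions
// to be adjusted for the actual cluster:
//
//   spark-submit --master yarn --deploy-mode cluster \
//     --class mobvista.dmp.datasource.rtdmp.RTDmpRequestDaily \
//     dmp.jar -date 2020-07-13 -output s3://mob-emr-test/rtdmp/request_daily -coalesce 100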