JoypacResultAll.scala 5.91 KB
Newer Older
wang-jinfeng committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184
package mobvista.dmp.datasource.joypac

import java.net.URI

import com.alibaba.fastjson.JSON
import mobvista.dmp.common.CommonSparkJob
import mobvista.dmp.util.{DateUtil, MRUtils}
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.codehaus.jackson.map.ObjectMapper

import scala.collection.mutable

/**
  * @package: mobvista.dmp.datasource.joypac
  * @author: wangjf
  * @date: 2019-12-18
  * @time: 14:10:50
  * @email: jinfeng.wang@mobvista.com
  * @phone: 152-1062-7698
  */

class JoypacResultAll extends CommonSparkJob {

  /**
    * Declares the command-line options of this job: `date`, `output` and `coalesce`,
    * each described as `[must]`.
    */
  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("date", true, "[must] date")
    options.addOption("output", true, "[must] output")
    options.addOption("coalesce", true, "[must] coalesce")
    options
  }

  /**
    * Merges the `etl` partition of `dwh.joypac_result` for `date` with the `all`
    * partition of the previous day (`date` is read as `yyyyMMdd`) and writes the
    * result as zlib-compressed ORC to `output`.
    *
    * @param args command-line arguments carrying the `date`, `output` and `coalesce` options
    * @return 0 on success, -1 when `checkMustOption` rejects the command line
    * @throws NumberFormatException if `coalesce` is not an integer (raised before anything is deleted)
    */
  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      -1
    } else {
      printOptions(commandLine)
      val date = commandLine.getOptionValue("date")
      val output = commandLine.getOptionValue("output")
      // Parsed up front so a malformed value fails before the output path is deleted.
      val coalesce = Integer.parseInt(commandLine.getOptionValue("coalesce"))
      runJob(date, output, coalesce)
      0
    }
  }

  /**
    * Builds the Spark session, rebuilds `output` from scratch, and always stops the session.
    *
    * @param date     day whose `etl` partition is merged
    * @param output   destination directory; deleted recursively before writing
    * @param coalesce number of output partitions
    */
  private def runJob(date: String, output: String, coalesce: Int): Unit = {
    val spark = SparkSession.builder()
      .appName("JoypacResultAll")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

    try {
      // Use the FileSystem that owns `output` (any scheme/bucket), and delete inside `try`
      // so the session is still stopped if the delete fails.
      val outputPath = new Path(output)
      outputPath.getFileSystem(spark.sparkContext.hadoopConfiguration).delete(outputPath, true)

      val yesDate = DateUtil.getDayByString(date, "yyyyMMdd", -1)
      val etlRDD = readPart(spark, date, "etl")
      val allRDD = readPart(spark, yesDate, "all")

      import spark.implicits._

      etlRDD.fullOuterJoin(allRDD)
        .map(JoypacResultAll.map)
        .toDF
        .coalesce(coalesce)
        .write
        .mode(SaveMode.Overwrite)
        .option("orc.compress", "zlib")
        .orc(output)
    } finally {
      // SparkSession.stop() also stops the underlying SparkContext.
      spark.stop()
    }
  }

  /**
    * Reads one partition of `dwh.joypac_result` as a pair RDD.
    *
    * The key is `device_id` and `package_name` joined with `MRUtils.JOINER`. The value is
    * `platform`, `app_version`, `apps_info` and `update_date` joined the same way.
    *
    * @param spark active session
    * @param dt    value substituted for `@date`
    * @param part  value substituted for `@part` (`etl` or `all` in this job)
    */
  private def readPart(spark: SparkSession, dt: String, part: String) =
    spark.sql(JoypacResultAll.sql.replace("@date", dt).replace("@part", part))
      .rdd
      .map { r =>
        // Explicit type parameter: without one, getAs infers Nothing.
        (MRUtils.JOINER.join(r.getAs[Any]("device_id").toString, r.getAs[Any]("package_name").toString),
          MRUtils.JOINER.join(r.getAs("platform"), r.getAs("app_version"), r.getAs("apps_info"), r.getAs("update_date")))
      }

}

object JoypacResultAll {

  import scala.collection.JavaConverters._

  /** Query template for one partition of `dwh.joypac_result`; `@date` and `@part` are substituted textually. */
  val sql: String =
    """
      |SELECT * FROM dwh.joypac_result WHERE dt = '@date' AND part = '@part'
    """.stripMargin

  /** Jackson mapper used to serialize merged apps_info maps back to JSON. */
  val mapper = new ObjectMapper()

  /**
    * Turns one full-outer-join row (today's `etl` record, yesterday's `all` record) into a [[JoypacEntity]].
    *
    * The key is `device_id` and `package_name` joined by `MRUtils.JOINER`. Each value is `platform`,
    * `app_version`, `apps_info` (a JSON object) and `update_date` joined the same way. Cases:
    *  - both present: apps_info is merged by `mergeAppsInfo`; the other fields come from today's record;
    *  - today only: keeps entries whose schema_url flag is 1 and whose key is longer than one character,
    *    each with status 0;
    *  - yesterday only: the previous record is carried over unchanged;
    *  - neither (not produced by `fullOuterJoin`): empty fields.
    *
    * @param t (joined key, (today's value, yesterday's value))
    * @return the merged entity for this device/package pair
    */
  def map(t: (String, (Option[String], Option[String]))): JoypacEntity = {
    val (key, (dailyOpt, allOpt)) = t
    val keys = MRUtils.SPLITTER.split(key)
    val device_id = keys(0)
    val package_name = keys(1)

    // NOTE(review): if SPLITTER drops trailing empty fields (java.util.regex.Pattern.split does),
    // a value with an empty update_date makes index 3 below throw — confirm.
    (dailyOpt, allOpt) match {
      case (Some(daily), Some(all)) => // 1, 1
        val dVals = MRUtils.SPLITTER.split(daily)
        val aVals = MRUtils.SPLITTER.split(all)
        val merged = mergeAppsInfo(dVals(1), parseAppsInfo(dVals(2)), aVals(1), parseAppsInfo(aVals(2)))
        // Persist today's app_version, like platform and update_date. The merge compares today's version
        // with the stored one, so storing yesterday's version would keep a changed version from ever
        // matching again.
        JoypacEntity(device_id, dVals(0), dVals(1), package_name, toJson(merged), dVals(3))

      case (Some(daily), None) => // 1, 0
        val vals = MRUtils.SPLITTER.split(daily)
        // No previous record: keep entries whose schema_url flag is 1 (ignoring keys of length <= 1), status 0.
        val fresh = parseAppsInfo(vals(2)).collect { case (k, 1) if k.length > 1 => k -> 0 }
        JoypacEntity(device_id, vals(0), vals(1), package_name, toJson(fresh), vals(3))

      case (None, Some(all)) => // 0, 1
        val vals = MRUtils.SPLITTER.split(all)
        JoypacEntity(device_id, vals(0), vals(1), package_name, vals(2), vals(3))

      case (None, None) =>
        JoypacEntity(device_id, "", "", package_name, "", "")
    }
  }

  /**
    * Merges today's schema_url flags with yesterday's accumulated statuses.
    *
    * The three parts below have disjoint key sets:
    *  - key in both maps: status 1 if today's flag is 1, status 2 if it is 0;
    *  - key only in yesterday's map: kept with its old status, but only when app_version is unchanged;
    *  - key only in today's map with flag 1: added with status 0.
    *
    * @throws IllegalArgumentException if a key present in both maps has a flag other than 0 or 1
    */
  private def mergeAppsInfo(dailyVersion: String, daily: Map[String, Int],
                            allVersion: String, all: Map[String, Int]): Map[String, Int] = {
    // Direct lookups instead of scanning every pair of entries (O(n + m) rather than O(n * m)).
    val matched = daily.collect { case (k, flag) if all.contains(k) => k -> schemaStatus(flag) }
    val carried =
      if (dailyVersion == allVersion) all.filter { case (k, _) => !daily.contains(k) }
      else Map.empty[String, Int]
    val added = daily.collect { case (k, 1) if !all.contains(k) => k -> 0 }
    matched ++ carried ++ added
  }

  /** Maps today's schema_url flag for a key seen on both days to its status: 1 -> 1, 0 -> 2. */
  private def schemaStatus(flag: Int): Int = flag match {
    case 1 => 1
    case 0 => 2
    case other => throw new IllegalArgumentException(s"unexpected schema_url flag: $other")
  }

  /**
    * Parses an apps_info JSON object into `name -> Int`. Values may be JSON numbers or numeric strings.
    * A null parse result (e.g. for null input) yields an empty map.
    *
    * @throws NumberFormatException if a value is not an integer
    */
  private def parseAppsInfo(json: String): Map[String, Int] =
    Option(JSON.parse(json).asInstanceOf[java.util.Map[String, AnyRef]])
      .map(m => m.asScala.map { case (k, v) => k -> Integer.parseInt(String.valueOf(v)) }.toMap)
      .getOrElse(Map.empty[String, Int])

  /** Serializes a status map to a JSON object string with [[mapper]]. */
  private def toJson(m: Map[String, Int]): String = mapper.writeValueAsString(m.asJava)

  /**
    * Command-line entry point.
    *
    * NOTE(review): the status code returned by `run` is discarded, so a rejected command line
    * still returns normally from `main` — confirm whether the scheduler relies on that.
    */
  def main(args: Array[String]): Unit = {
    new JoypacResultAll().run(args)
  }
}