JoypacDaily.scala 4.01 KB
Newer Older
wang-jinfeng committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130
package mobvista.dmp.datasource.joypac

import java.net.URI
import java.util

import mobvista.dmp.common.CommonSparkJob
import mobvista.prd.datasource.util.GsonUtil
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{SaveMode, SparkSession}

import scala.collection.JavaConversions._

/**
  * @package: mobvista.dmp.datasource.joypac
  * @author: wangjf
  * @date: 2019/3/4
  * @time: 下午1:44
  * @email: jinfeng.wang@mobvista.com
  * @phone: 152-1062-7698
  */
class JoypacDaily extends CommonSparkJob {

  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("input", true, "[must] input")
    options.addOption("file", true, "[must] file")
    options.addOption("output", true, "[must] output")
    options.addOption("coalesce", true, "[must] coalesce")
    options
  }

  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      return -1
    } else {
      printOptions(commandLine)
    }

    val input = commandLine.getOptionValue("input")
    val file = commandLine.getOptionValue("file")
    val output = commandLine.getOptionValue("output")
    val coalesce = commandLine.getOptionValue("coalesce")

    val spark = SparkSession.builder()
      .appName("JoypacDaily")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()
    val sc = spark.sparkContext
    FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output), true)

    try {

      val packageMap = sc.textFile(file).map(_.split(";")).map(r =>
        if (r(0).endsWith("://")) {
          (r(0).substring(0, r(0).length - 3), r(1))
        } else {
          (r(0), r(1))
        }
      ).collectAsMap()

      val bPackageMap = sc.broadcast(packageMap)

      val rdd = spark.read.format("orc").load(input).rdd
        .filter(r => {
          bPackageMap.value.contains(r.getAs("package_name").toString)
        })
        .map(r => {
          val device_id = r.getAs("idfa").toString
          val device_type = "idfa"
          val platform_id = r.getAs("platform").toString
          val platform = if (platform_id.equals("1")) {
            "android"
          } else if (platform_id.equals("2")) {
            "ios"
          } else {
            "other"
          }
          val set = new util.HashSet[String]()
          val jsonArray = GsonUtil.String2JsonArray(r.getAs("apps_info").toString)
          jsonArray.foreach(element => {
            val ir = element.getAsJsonObject.entrySet().iterator()
            while (ir.hasNext) {
              val itr = ir.next()
              val package_name = itr.getKey
              val code = itr.getValue.getAsString
              if (code.equals("1")) {
                if (bPackageMap.value.contains(package_name)) {
                  set.add(bPackageMap.value(package_name))
                }
              }
            }
          })

          val bundle_id = bPackageMap.value(r.getAs("package_name").toString)
          set.add(bundle_id)

          var package_name = ""
          set.iterator().foreach(r => {
            package_name += r + ";"
          })
          val country = "CN"
          Joypac(device_id, device_type, platform, package_name.substring(0, package_name.length - 1), bundle_id, country)
        })
      import spark.implicits._
      rdd.toDF().coalesce(coalesce.toInt).write
        .mode(SaveMode.Overwrite)
        .option("orc.compress", "zlib")
        .orc(output)
    } finally {
      sc.stop()
      spark.stop()
    }
    0
  }
}

object JoypacDaily {
  def main(args: Array[String]): Unit = {
    new JoypacDaily().run(args)
  }
}