EtlAdnRequestOtherDaily.scala 3.84 KB
Newer Older
wang-jinfeng committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137
package mobvista.dmp.datasource.adn_request_other

import mobvista.dmp.common.CommonSparkJob
import org.apache.commons.lang.StringUtils
import org.apache.spark.sql.SparkSession

import scala.collection.mutable.ArrayBuffer

/**
 * Daily ETL job that extracts device identifiers from raw ADN request log lines.
 *
 * The job reads text records from the `input` path, splits each line with `splitFun`
 * (defined outside this file), keeps the rows accepted by [[filterData]], emits one
 * `(device_id, device_type, platform, package_name)` row per usable identifier,
 * de-duplicates those rows and writes them as zlib-compressed ORC to the `output` path.
 */
class EtlAdnRequestOtherDaily extends CommonSparkJob with Serializable {

  /**
   * Parses the command line and, when the mandatory options are present, runs the job.
   *
   * @param args command-line arguments; the `input` and `output` option values are used
   * @return 1 when `checkMustOption` rejects the command line (usage is printed),
   *         0 once the job has completed; failures inside the job propagate as exceptions
   */
  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      1
    } else {
      printOptions(commandLine)
      exportDevices(commandLine.getOptionValue("input"), commandLine.getOptionValue("output"))
      0
    }
  }

  /**
   * Builds the Spark session, runs the extraction pipeline and stops the session
   * afterwards, whether or not the pipeline succeeded.
   *
   * @param input  path of the text records to read
   * @param output path the ORC result is written to
   */
  private def exportDevices(input: String, output: String): Unit = {
    val spark = SparkSession
      .builder()
      .appName("EtlAdnRequestOtherDaily")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.autoBroadcastJoinThreshold", "209715200")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .getOrCreate()

    import spark.implicits._
    try {
      spark.sparkContext
        .textFile(input)
        .map(splitFun)
        .filter(filterData)
        .flatMap { fields =>
          val platform = fields(4)
          val extPackageName = fields(19)

          // Pair every candidate identifier with the device_type label it is stored under.
          val identifiers: Seq[(String, String)] =
            if ("android".equals(platform)) {
              // An Android row may carry several identifiers; only non-blank ones are kept.
              Seq(
                (fields(15), "gaid"),
                (fields(12), "imei"),
                (fields(14), "androidid")
              ).filter { case (deviceId, _) => StringUtils.isNotBlank(deviceId) }
            } else {
              // filterData only lets "android" and "ios" through, so this branch is iOS.
              Seq((fields(16), "idfa"))
            }

          identifiers.map { case (deviceId, deviceType) =>
            (deviceId, deviceType, platform, extPackageName)
          }
        }
        .toDF("device_id", "device_type", "platform", "package_name")
        .createOrReplaceTempView("t_daily")

      // Grouping by every selected column de-duplicates the rows.
      val sql =
        """
          |select t.device_id, t.device_type, t.platform, t.package_name
          |from t_daily t
          |where length(t.package_name) >= 4
          |group by t.device_id, t.device_type, t.platform, t.package_name
        """.stripMargin
      spark.sql(sql)
        .write
        .option("orc.compress", "zlib")
        .orc(output)
    } finally {
      spark.stop()
    }
  }

  /**
   * Decides whether a split log record should be processed.
   *
   * A record is accepted only when all of the following hold:
   *  - it has more than 20 fields;
   *  - at least one of GAID (field 15), IDFA (field 16) and IMEI (field 12) is non-blank;
   *  - the platform (field 4) is "android" or "ios" and its identifiers pass the
   *    pattern checks below;
   *  - the package name (field 19) is non-blank.
   *
   * @param array fields of one log record
   * @return true when the record should be kept, false otherwise
   */
  def filterData(array: Array[String]): Boolean = {
    if (array.length <= 20) {
      false
    } else {
      val gaid = array(15)
      val idfa = array(16)
      val imei = array(12)
      val platform = array(4)
      val extPackageName = array(19)

      // Drop rows in which GAID, IDFA and IMEI are all blank.
      val hasAnyDeviceId =
        StringUtils.isNotBlank(gaid) ||
          StringUtils.isNotBlank(idfa) ||
          StringUtils.isNotBlank(imei)

      // Drop rows whose identifiers are malformed for their platform, or whose platform
      // is neither "android" nor "ios". Written as a def so the pattern checks only run
      // once the blank check above has passed.
      // NOTE(review): the Android case requires BOTH the GAID and the IMEI to match their
      // patterns, although the extraction stage treats each identifier as optional.
      // Unless imeiPtn accepts a blank value, Android rows without an IMEI are rejected
      // here - confirm that this is intended.
      def deviceIdsWellFormed: Boolean = platform match {
        case "android" =>
          gaid.matches(didPtn) && !gaid.equals(allZero) && imei.matches(imeiPtn)
        case "ios" =>
          idfa.matches(didPtn) && !idfa.equals(allZero)
        case _ =>
          false
      }

      // The last condition drops rows with a blank package name.
      hasAnyDeviceId && deviceIdsWellFormed && StringUtils.isNotBlank(extPackageName)
    }
  }
}

/**
 * Command-line entry point for the [[EtlAdnRequestOtherDaily]] job.
 */
object EtlAdnRequestOtherDaily {

  /**
   * Creates the job and runs it with the given arguments.
   *
   * @param args command-line arguments, passed to the job unchanged
   */
  def main(args: Array[String]): Unit = {
    val job = new EtlAdnRequestOtherDaily
    // The Int status returned by run is not used here.
    job.run(args)
  }
}