TrackingInstallDaily.scala 5.65 KB
Newer Older
wang-jinfeng committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144
package mobvista.dmp.datasource.tracking_3s

import java.net.URI

import mobvista.dmp.common.{CommonMapReduce, CommonSparkJob}
import mobvista.prd.datasource.util.MRUtils
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.commons.lang.StringUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row

import scala.collection.mutable.ArrayBuffer

object TrackingInstallDaily {

  /** Upper-case IDFA format (8-4-4-4-12 hex groups); used to guess "ios" when the campaign platform is unknown. */
  val idfaRegex = "^[0-9A-F]{8}(-[0-9A-F]{4}){3}-[0-9A-F]{12}$"

  /** Options that must be present (all are marked "[must]" in buildOptions). */
  private val requiredOptions = Seq("campaign_input_path", "input_path_3s", "output", "date", "coalesce")

  /** Highest column index read from a 3s record is 28, so a usable record needs at least 29 columns. */
  private val minFields3s = 29

  /**
   * Joins the daily 3s tracking data with the campaign table and writes gzip-compressed device records.
   *
   * Options: campaign_input_path, input_path_3s, output, date, coalesce (all required).
   * The output path is deleted before the job writes to it.
   */
  def main(args: Array[String]): Unit = {
    val options = buildOptions()
    val commandLine = new BasicParser().parse(options, args)

    // Fail fast with a clear message instead of an NPE / NumberFormatException further down.
    val missing = requiredOptions.filter(name => StringUtils.isBlank(commandLine.getOptionValue(name)))
    require(missing.isEmpty, s"missing required option(s): ${missing.mkString(", ")}")

    val campaign_input_path = commandLine.getOptionValue("campaign_input_path")
    val input_path_3s = commandLine.getOptionValue("input_path_3s")
    val output = commandLine.getOptionValue("output")
    val datetime = commandLine.getOptionValue("date")
    val coalesce = commandLine.getOptionValue("coalesce").toInt

    val spark = mobvista.dmp.common.MobvistaConstant.createSparkSession("TrackingInstallDaily")
    val sc = spark.sparkContext

    try {
      // Delete previous output on the filesystem the output path actually belongs to (the one
      // saveAsTextFile writes to), rather than a FileSystem bound to a hard-coded bucket.
      val outputPath = new Path(output)
      val outputFs: FileSystem = outputPath.getFileSystem(sc.hadoopConfiguration)
      outputFs.delete(outputPath, true)

      // Campaign lines must split into exactly 3 columns with a non-empty third column.
      // Column 0 is the join key (matched against 3s column 1); columns 1 and 2 are used as
      // (platform, packageName) by parseMapData. Split once, with the same splitter used for filtering.
      val campaignRecords: RDD[(String, (String, String))] = sc.textFile(campaign_input_path)
        .filter(_.contains("\t"))
        .map(line => MRUtils.SPLITTER.split(line, -1))
        .filter(cols => cols.length == 3 && !StringUtils.isEmpty(cols(2)))
        .map(cols => (cols(0), (cols(1), cols(2))))

      // NOTE(review): if a key appears more than once, collectAsMap keeps only one of the values.
      val campaignMap: collection.Map[String, (String, String)] = campaignRecords.collectAsMap()
      val campaignMapBC: Broadcast[collection.Map[String, (String, String)]] = sc.broadcast(campaignMap)

      // The 3s input is deliberately NOT pre-filtered on "\t", to be the same as
      // mobvista.dmp.datasource.tracking.mapreduce.TrackingInstallDailyMR.
      sc.textFile(input_path_3s)
        .flatMap(line => parseflatMapData(line, campaignMapBC, datetime))
        .map(record => parseMapData(record, campaignMapBC))
        .coalesce(coalesce)
        .saveAsTextFile(output, classOf[GzipCodec])
    } finally {
      sc.stop()
    }
  }

  /**
   * Extracts device-id records from one comma-separated 3s line.
   *
   * Lines with fewer than 29 columns, or whose column 1 is not a key of the campaign map, yield nothing.
   * Otherwise, records are emitted with MRUtils.JOINER in two shapes:
   *  - 4 fields: key, deviceId, upper-cased column 3, date. The deviceId is column 10 if it is a valid
   *    non-zero didPtn id, else column 28 if that is. An extra record is added when column 10
   *    matches andriodIdPtn.
   *  - 5 fields: key, "imei", imei (column 11), upper-cased column 3, date.
   *
   * @param line one raw 3s record
   * @param bmap broadcast campaign map: key -> (platform, packageName)
   * @param date value appended to every emitted record
   * @return zero or more joined records
   */
  def parseflatMapData(line: String, bmap: Broadcast[collection.Map[String, (String, String)]], date: String): Array[String] = {
    val fields: Array[String] = line.split(",", -1)
    // Check the column count BEFORE reading any column: blank/short lines used to throw
    // ArrayIndexOutOfBoundsException on fields(1) and fail the whole task.
    if (fields.length < minFields3s || !bmap.value.contains(fields(1))) {
      Array.empty[String]
    } else {
      val key = fields(1)
      val col3 = fields(3).toUpperCase
      val id10 = fields(10)
      val id28 = fields(28)
      val imei = fields(11)
      val records = new ArrayBuffer[String]()

      if (isValidDid(id10)) {
        records += MRUtils.JOINER.join(key, id10, col3, date)
      } else if (isValidDid(id28)) {
        records += MRUtils.JOINER.join(key, id28, col3, date)
      }
      if (id10.nonEmpty && id10.matches(CommonMapReduce.andriodIdPtn)) {
        records += MRUtils.JOINER.join(key, id10, col3, date)
      }
      if (imei.nonEmpty && imei.matches(CommonMapReduce.imeiPtn)) {
        records += MRUtils.JOINER.join(key, "imei", imei, col3, date)
      }
      records.toArray
    }
  }

  /** True when `id` matches didPtn and is not the all-zero placeholder id. */
  private def isValidDid(id: String): Boolean =
    id.matches(CommonMapReduce.didPtn) && !id.equals(CommonMapReduce.allZero)

  /**
   * Converts a record produced by parseflatMapData into the final output line:
   * deviceId, deviceType, platform, packageName, date, upper-cased 3s column 3.
   *
   * Assumes MRUtils.SPLITTER splits on the same separator MRUtils.JOINER joins with (TODO confirm).
   * The key (field 0) is expected to be in `bmap`, because parseflatMapData only emits matched keys.
   * Otherwise this throws NoSuchElementException.
   */
  def parseMapData(line: String, bmap: Broadcast[collection.Map[String, (String, String)]]): String = {
    val fields: Array[String] = MRUtils.SPLITTER.split(line, -1)
    val (campaignPlatform, campaignPackage) = bmap.value(fields(0))
    if (fields.length == 5) {
      // IMEI record: key, "imei", imei, col3, date.
      MRUtils.JOINER.join(fields(2), "imei", "android", campaignPackage, fields(4), fields(3))
    } else {
      // Device-id record: key, deviceId, col3, date. Strip stray quote/bracket characters from the id.
      val deviceId = fields(1).replaceAll("\"", "").replaceAll("\\[", "")
      val platform = getPlatform(campaignPlatform, deviceId)
      val deviceType =
        if (deviceId.matches(CommonMapReduce.andriodIdPtn)) "androidid" else getDeviceType(platform)
      // NOTE(review): replace() removes every "id" substring, not just a leading "id"; kept for
      // parity with the MR job. Also, an upper-case "IOS" platform does not hit this branch.
      val packageName = if ("ios" == platform) campaignPackage.replace("id", "") else campaignPackage
      MRUtils.JOINER.join(deviceId, deviceType, platform, packageName, fields(3), fields(2))
    }
  }

  /**
   * Returns the campaign platform if it is "ios" or "android" (case-insensitive, original casing kept).
   * Otherwise, guesses "ios" when the device id looks like an IDFA, else "android".
   */
  private def getPlatform(platform: String, deviceId: String): String =
    if ("ios".equalsIgnoreCase(platform) || "android".equalsIgnoreCase(platform)) platform
    else if (deviceId.matches(idfaRegex)) "ios"
    else "android"

  /** Maps an exact (case-sensitive) platform name to its device-id type. */
  private def getDeviceType(platform: String): String = platform match {
    case "ios"             => "idfa"
    case "android" | "adr" => "gaid"
    case _                 => "unknown"
  }

  /** Command-line options accepted by main; every option takes a value and is required. */
  def buildOptions(): Options = {
    val options = new Options
    options.addOption("campaign_input_path", true, "[must] campaign input path")
    options.addOption("input_path_3s", true, "[must] 3s input path")
    options.addOption("output", true, "[must] output path")
    options.addOption("date", true, "[must] date")
    options.addOption("coalesce", true, "[must] coalesce")
    options
  }
}