// GaInstallDaily.scala (2.11 KB)
// Committed by wang-jinfeng
package mobvista.dmp.datasource.ga

import java.net.URI
import java.util.regex.Pattern

import mobvista.dmp.util.MRUtils
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.hadoop.fs.{FileSystem, Path}

import scala.collection.mutable.ArrayBuffer

/**
 * Spark job that rewrites the daily GA install feed.
 *
 * Reads pipe-delimited text lines from `input_path_install`, keeps the rows that split into
 * exactly four fields and whose second field is `ios` or `android` (case-insensitive), and
 * writes one joined record per kept row to `output` as text files.
 *
 * Command-line options (all mandatory):
 *  - `input_path_install`: path of the input text data
 *  - `output`: destination path; any existing content is deleted recursively first
 *  - `coalesce`: positive number of partitions to coalesce to before writing
 */
object GaInstallDaily {

  private val splitPtn = Pattern.compile("\\|")

  /**
   * Job entry point.
   *
   * @param args command-line arguments, see the option list on [[buildOptions]]
   * @throws IllegalArgumentException if a mandatory option is missing, or if `coalesce` is not
   *                                  a positive integer (a non-numeric value surfaces as
   *                                  `NumberFormatException`, a subclass)
   */
  def main(args: Array[String]): Unit = {
    val commandLine = new BasicParser().parse(buildOptions(), args)

    // commons-cli returns null for an absent option; fail early with a readable message
    // instead of letting the null travel into Spark/Hadoop.
    def required(name: String): String =
      Option(commandLine.getOptionValue(name)).getOrElse(
        throw new IllegalArgumentException(s"Missing required option: -$name"))

    val input_path_install = required("input_path_install")
    val output = required("output")
    val coalesce = required("coalesce").trim.toInt
    require(coalesce > 0, s"coalesce must be a positive integer, got $coalesce")

    val spark = mobvista.dmp.common.MobvistaConstant.createSparkSession("GaInstallDaily")
    val sc = spark.sparkContext

    try {
      // Resolve the filesystem from the output path itself rather than a fixed bucket URI, so
      // the cleanup always targets the same filesystem saveAsTextFile writes to below.
      val outputPath = new Path(output)
      outputPath.getFileSystem(sc.hadoopConfiguration).delete(outputPath, true)

      sc.textFile(input_path_install)
        .flatMap(line => parseflatMapData(line))
        .coalesce(coalesce)
        .saveAsTextFile(output)
    } finally {
      // Runs even when the cleanup above fails, so the context is never left running.
      sc.stop()
    }
  }

  /**
   * Converts one raw input line into zero or one output records.
   *
   * The line is split on `|`. `Pattern.split` drops trailing empty strings, so a line whose
   * last field is empty yields fewer than four fields and is discarded like any other
   * malformed line.
   *
   * For a four-field line the second field selects the id type and platform code:
   *  - `ios` (any case) gives `idfa` / `ios`
   *  - `android` (any case) gives `gaid` / `adr`
   *
   * The record is built with `MRUtils.JOINER` from: field 0, id type, platform code, field 2,
   * field 3. Field 0 is presumably the device id (inferred from the idfa/gaid labels); the
   * meaning of fields 2 and 3 is not visible here.
   *
   * @param line raw pipe-delimited input line
   * @return a single-element array holding the joined record, or an empty array when the line
   *         does not have exactly four fields or names an unrecognised platform
   */
  def parseflatMapData(line: String): Array[String] =
    splitPtn.split(line) match {
      case Array(id, os, third, fourth) =>
        val idTypeAndPlatform: Option[(String, String)] =
          if (os.equalsIgnoreCase("ios")) Some(("idfa", "ios"))
          else if (os.equalsIgnoreCase("android")) Some(("gaid", "adr"))
          else None

        idTypeAndPlatform match {
          case Some((idType, platform)) =>
            Array(MRUtils.JOINER.join(id, idType, platform, third, fourth))
          case None =>
            Array.empty[String]
        }
      case _ =>
        Array.empty[String]
    }

  /**
   * Declares the command-line options understood by [[main]].
   *
   * Each option takes a value. They are not flagged as required on the `Options` object
   * itself; [[main]] checks for their presence after parsing.
   *
   * @return the option definitions for `input_path_install`, `output` and `coalesce`
   */
  def buildOptions(): Options = {
    val options = new Options
    options.addOption("input_path_install", true, "[must]  input path")
    options.addOption("output", true, "[must] output path")
    options.addOption("coalesce", true, "[must] coalesce")
    options
  }
}