ParquetDemo.scala 2.41 KB
Newer Older
wang-jinfeng committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71
package mobvista.dmp.demo

import java.io.{BufferedReader, FileInputStream, InputStreamReader}

import mobvista.prd.datasource.util.GsonUtil
import org.apache.commons.lang.StringUtils
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer
/**
  * Spark demo job that compares the `mp` partition of `dm_install_list`
  * (2017/08/09) with the other install sources of the same day: it counts the
  * distinct packages and devices seen in `mp`, and how many of them do not
  * appear in any of the other sources.
  *
  * Created by fl on 2017/6/22.
  *
  * The class is `Serializable` because the RDD closures built in `run` call
  * the instance method `doMap`, so Spark has to ship `this` to the executors.
  * Without it the job fails with "Task not serializable".
  */
class ParquetDemo extends Serializable {
  val dataSplit = "\t"

  /**
    * Runs the comparison and prints the four resulting counts.
    *
    * @param args command-line arguments (currently unused)
    * @return 0 once all counts have been computed
    */
  def run(args: Array[String]): Int = {
    val conf = new SparkConf().setAppName("ParquetDemo")
      .set("spark.default.parallelism", "200")
    val sc = new SparkContext(conf)
    try {
      val tuple = sc
        .textFile("s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/dm_install_list/2017/08/09/mp")
        .flatMap(doMap(_))
      // Both the package and the device projections below read this RDD.
      tuple.cache()

      val pkgUnique = uniqueValues(tuple.map(_._2))
      val didUnique = uniqueValues(tuple.map(_._1))
      println(s"mp unique packages: ${pkgUnique.count()}")
      println(s"mp unique devices: ${didUnique.count()}")

      val installTuple = sc
        .textFile("s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/dm_install_list/2017/08/09/{3s,adn_install,adn_request_sdk,adserver,dsp_req,ga,other}")
        .flatMap(doMap(_))
      val installPkgUnique = uniqueValues(installTuple.map(_._2))
      val installDidUnique = uniqueValues(installTuple.map(_._1))

      println(s"packages only in mp: ${pkgUnique.subtract(installPkgUnique).count()}")
      println(s"devices only in mp: ${didUnique.subtract(installDidUnique).count()}")
      0
    } finally {
      // Release executors and cached blocks even when one of the jobs fails.
      sc.stop()
    }
  }

  /**
    * De-duplicates `values`. Uses `reduceByKey` (rather than `distinct()`) so
    * the shuffle keeps honouring `spark.default.parallelism`.
    */
  private def uniqueValues(values: org.apache.spark.rdd.RDD[String]): org.apache.spark.rdd.RDD[String] =
    values.map(value => (value, 1)).reduceByKey(_ + _).map(_._1)

  /**
    * Expands one tab-separated record into `(device key, package name)` pairs.
    *
    * Field 0 is read as the device id, field 1 as the device type and field 3
    * as a JSON array of objects carrying a `package_name` member. The device
    * key is `device_id<TAB>device_type`.
    *
    * @param record one input line
    * @return one pair per array element that has a non-null `package_name`;
    *         empty when the record has fewer than four fields
    */
  def doMap(record: String): Array[Tuple2[String, String]] = {
    // Explicit converters instead of the deprecated implicit JavaConversions.
    import scala.collection.JavaConverters._

    val splits = StringUtils.splitPreserveAllTokens(record, dataSplit, -1)
    if (splits == null || splits.length < 4) {
      // Malformed line: skip it rather than fail the task on splits(3).
      Array.empty[Tuple2[String, String]]
    } else {
      val deviceKey = s"${splits(0)}$dataSplit${splits(1)}"
      GsonUtil.String2JsonArray(splits(3)).asScala.flatMap { element =>
        // `get` returns null when the member is absent; skip such entries.
        Option(element.getAsJsonObject.get("package_name"))
          .filterNot(_.isJsonNull)
          .map(name => Tuple2(deviceKey, name.getAsString))
      }.toArray
    }
  }

}

object ParquetDemo {
  /** File scanned when no path is passed on the command line. */
  private val DefaultInputPath = "/Users/fl/Downloads/ios.txt"

  /** A well-formed record has at least this many tab-separated fields. */
  private val MinFieldCount = 4

  /**
    * Scans a local text file and prints, for every line with fewer than four
    * tab-separated fields, its 1-based line number followed by the line itself.
    *
    * @param args optional; `args(0)` overrides the default input path
    */
  def main(args: Array[String]): Unit = {
    val path = args.headOption.getOrElse(DefaultInputPath)
    val reader = new BufferedReader(new InputStreamReader(new FileInputStream(path)))
    try {
      Iterator
        .continually(reader.readLine())
        .takeWhile(_ != null) // readLine signals end of file with null
        .zipWithIndex
        .foreach { case (line, index) =>
          // limit -1 keeps trailing empty fields, so "a\tb\t\t" counts as 4.
          if (line.split("\t", -1).length < MinFieldCount) {
            println(index + 1)
            println(line)
          }
        }
    } finally {
      // Close the file handle even if reading fails part-way through.
      reader.close()
    }
  }
}