mrandsparkCompareORC.scala
package mobvista.dmp.test

import org.apache.commons.cli.{BasicParser, Options}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructField, StructType}

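/**
 * Ad-hoc consistency check between two ORC datasets that are expected to be
 * identical: one produced by a MapReduce job, the other by its Spark rewrite.
 * Prints row counts, sample rows for two known device ids, the rows that
 * appear only in the MR output (subtract), and the size of the intersection.
 */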
object mrandsparkCompareORC {

  def main(args: Array[String]): Unit = {
    // Declared outside the try block so the session can be stopped in finally.
    var spark: SparkSession = null

    try {
      val options = buildOptions()
      val parser = new BasicParser
      val commandLine = parser.parse(options, args)

      val mr_input = commandLine.getOptionValue("mrinput")
      val spark_input = commandLine.getOptionValue("sparkinput")

      println("*************************")
      println(s"* input = $mr_input")
      println(s"* output = $spark_input")
      println("*************************")


      spark = SparkSession
        .builder()
        .config("spark.rdd.compress", "true")
        .config("spark.default.parallelism", "2000")
        .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .enableHiveSupport()
        .getOrCreate()

      // Read both ORC outputs with the same explicit schema, and cache the
      // RDDs since each one is scanned several times below.
      val mr_data = spark.read.schema(dspEtlSchema).orc(mr_input).rdd.cache()
      val spark_data = spark.read.schema(dspEtlSchema).orc(spark_input).rdd.cache()

      println(s"mr_data.count() = ${mr_data.count()}")
      // Spot-check two known device ids in the MR output.
      mr_data.filter(_.getAs[String]("idfa") == "F064D5F9-83B1-462A-9A2A-31F03F385A30")
        .take(10).foreach(println)
      mr_data.filter(_.getAs[String]("idfa") == "17e6274b-41b9-4c96-b349-c386a7709165")
        .take(10).foreach(println)

      println(s"spark_data.count() = ${spark_data.count()}")
      // Spot-check the same device ids in the Spark output.
      spark_data.filter(_.getAs[String]("idfa") == "F064D5F9-83B1-462A-9A2A-31F03F385A30")
        .take(10).foreach(println)
      spark_data.filter(_.getAs[String]("idfa") == "17e6274b-41b9-4c96-b349-c386a7709165")
        .take(10).foreach(println)

      // Rows present in the MR output but not in the Spark output.
      val mrOnly = mr_data.subtract(spark_data)
      println(s"mr-only (subtract) count = ${mrOnly.count()}")
      mrOnly.take(200).foreach(println)

      // Rows common to both outputs; computed once and reused.
      val common = mr_data.intersection(spark_data)
      println(s"intersection count = ${common.count()}")
      common.take(30).foreach(println)
    } finally {
      if (spark != null) {
        spark.stop()
      }
    }
  }

  def buildOptions(): Options = {
    val options = new Options
    options.addOption("mrinput", true, "[must] mr input path")
    options.addOption("sparkinput", true, "[must] spark input path")
    options
  }
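
  // Illustrative invocation (jar name and paths are hypothetical). Commons
  // CLI's BasicParser accepts single-dash long options:
  //   spark-submit --class mobvista.dmp.test.mrandsparkCompareORC dmp.jar \
  //     -mrinput s3://bucket/mr/output -sparkinput s3://bucket/spark/output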


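  // Explicit schema for the dsp ETL output. Every column is declared as a
  // string so the two datasets compare cell-for-cell in the RDD set operations.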
  def dspEtlSchema: StructType = {
    StructType(StructField("idfa", StringType) ::
      StructField("gaid", StringType) ::
      StructField("platform", StringType) ::
      StructField("country", StringType) ::
      StructField("ip", StringType) ::
      StructField("gender", StringType) ::
      StructField("birthday", StringType) ::
      StructField("maker", StringType) ::
      StructField("model", StringType) ::
      StructField("osVersion", StringType) ::
      StructField("packageName", StringType) ::
      StructField("exitId", StringType) ::
      StructField("datetime", StringType) ::
      StructField("segment", StringType) ::
      StructField("dealerid", StringType) ::
      StructField("exchanges", StringType) ::
      StructField("region", StringType)
      :: Nil)
  }

}