// DataCompare.scala
package mobvista.dmp.test

import org.apache.commons.cli.{BasicParser, Options}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

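/**
 * Ad-hoc job that compares two datasets line by line: it prints the row counts,
 * samples records starting with a few hard-coded device ids from each side, and
 * then prints the set differences and the intersection of the two inputs.
 *
 * Example launch (jar name and paths are illustrative, not real locations):
 *   spark-submit --class mobvista.dmp.test.DataCompare datacompare.jar \
 *     -input01 s3://bucket/path/one -input02 s3://bucket/path/two
 */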
object DataCompare {

  def main(args: Array[String]): Unit = {
    var sc: SparkContext = null

    try {
      val options = buildOptions()
      val parser = new BasicParser
      val commandLine = parser.parse(options, args)

      val input01 = commandLine.getOptionValue("input01")
      val input02 = commandLine.getOptionValue("input02")

      val spark = createSparkSession("mydatacompare")
      val sc = spark.sparkContext
      println("*************************")
      println(s"* input01 = $input01")
      println(s"* input02 = $input02")
      println("*************************")

      val data01 = sc.textFile(input01)
      val data02: RDD[String] =
        if (input02.contains("mytest_BundleMatchMain") ||
            input02.contains("mytest_3s_BundleMatchMain") ||
            input02.contains("mytest_adn_install_BundleMatchMain") ||
            input02.contains("mytest_ga_BundleMatchMain")) {
          val orcData = spark.read.orc(input02)
          orcData.show(false)
          orcData.rdd.map(_.mkString("\t"))
        } else {
          sc.textFile(input02)
        }
      println("data01.count()=======================" + data01.count())
     // data01.take(100).foreach(println)
      val rdd1: RDD[String] = data01.filter(_.startsWith("F064D5F9-83B1-462A-9A2A-31F03F385A30"))
      rdd1.take(10).foreach(println)
      rdd1.take(10).foreach((x:String)=>println(x.length))
      val rdd3: RDD[String] = data01.filter(_.startsWith("17e6274b-41b9-4c96-b349-c386a7709165"))
      rdd3.take(10).foreach(println)
      rdd3.take(10).foreach((x:String)=>println(x.length))
      val rdd5: RDD[String] = data01.filter(_.startsWith("e607767b-b7a7-44f2-a07c-1131154eda12"))
      rdd5.take(10).foreach(println)
      rdd5.take(10).foreach((x:String)=>println(x.length))
      println( "data02.count()===============" + data02.count())
    //  data02.take(100).foreach(println)
      val rdd2: RDD[String] = data02.filter(_.startsWith("F064D5F9-83B1-462A-9A2A-31F03F385A30"))
      rdd2.take(10).foreach(println)
      rdd2.take(10).foreach((x:String)=>println(x.length))
      val rdd4: RDD[String] = data02.filter(_.startsWith("17e6274b-41b9-4c96-b349-c386a7709165"))
      rdd4.take(10).foreach(println)
      rdd4.take(10).foreach((x:String)=>println(x.length))
      val rdd6: RDD[String] = data02.filter(_.startsWith("e607767b-b7a7-44f2-a07c-1131154eda12"))
      rdd6.take(10).foreach(println)
      rdd6.take(10).foreach((x:String)=>println(x.length))
      // Alternative comparisons on trimmed lines, kept for reference:
      // val res_trim = data01.map(_.trim()).subtract(data02.map(_.trim()))
      // println("res_trim.count() = " + res_trim.count())

      // val res_trim_test = data01.map(r => r.substring(0, r.lastIndexOf("\t")).trim()).subtract(data02.map(_.trim()))
      // res_trim_test.take(100).foreach(println)
      // println("res_trim_test.count() = " + res_trim_test.count())

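      // Full set comparison: rows only in data01, rows only in data02, and rows in both.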
      println( "data01.subtract(data02).count()=======" + data01.subtract(data02).count() )
      data01.subtract(data02).take(300).foreach(println)
      println( "data02.subtract(data01).count()==========" + data02.subtract(data01).count() )
      data02.subtract(data01).take(200).foreach(println)
      println( "data01.intersection(data02).count()======="+data01.intersection(data02).count() )
      data01.intersection(data02).take(30).foreach(println)
    } finally {
      if (sc != null) {
        sc.stop()
      }
    }
  }

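  /** Command-line options: -input01 and -input02, the two dataset paths to compare. */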
  def buildOptions(): Options = {
    val options = new Options
    options.addOption("input01", true, "[must] input path 01")
    options.addOption("input02", true, "[must] input path 02")
    options
  }
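
  /** Builds a Hive-enabled SparkSession with Kryo serialization and ORC filter pushdown. */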
  def createSparkSession(appName: String): SparkSession = {
    SparkSession
      .builder()
      .appName(appName)
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "lz4")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()
  }
}