package mobvista.dmp.test

import org.apache.commons.cli.{BasicParser, Options}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object mrandsparkCompare {

  /**
   * Entry point: compares an MR-produced text dataset against a Spark-produced
   * one. Prints both record counts, spot-check samples for two known keys,
   * the lines present in the MR output but absent from the Spark output
   * (`subtract`), and the size/sample of the intersection.
   *
   * Required arguments: -mrinput <path> -sparkinput <path>
   */
  def main(args: Array[String]): Unit = {
    // Hard-coded ids we spot-check in both datasets (one upper-case, one lower-case
    // variant — presumably to detect a casing mismatch between the two pipelines).
    val probeKeys = Seq(
      "F064D5F9-83B1-462A-9A2A-31F03F385A30",
      "17e6274b-41b9-4c96-b349-c386a7709165"
    )

    var sc: SparkContext = null
    try {
      val options = buildOptions()
      // NOTE(review): BasicParser is deprecated in commons-cli 1.3+ in favor of
      // DefaultParser; kept here to match the file's existing imports.
      val parser = new BasicParser
      val commandLine = parser.parse(options, args)

      val mr_input = commandLine.getOptionValue("mrinput")
      val spark_input = commandLine.getOptionValue("sparkinput")
      // getOptionValue returns null when a flag is absent; fail fast with a clear
      // message instead of an NPE deep inside sc.textFile.
      require(mr_input != null, "missing required option: -mrinput")
      require(spark_input != null, "missing required option: -sparkinput")

      println("*************************")
      // BUG FIX: the original labeled these "input"/"output", which was misleading —
      // both paths are inputs, one from the MR job and one from the Spark job.
      println(s"* mrinput = $mr_input")
      println(s"* sparkinput = $spark_input")
      println("*************************")

      val conf = new SparkConf()
        .setAppName("compare_mr_and_spark")
      sc = new SparkContext(conf)

      val mr_data = sc.textFile(mr_input)
      val spark_data = sc.textFile(spark_input)

      println("mr_data.count()=======================" + mr_data.count())
      probeKeys.foreach(key => printProbe(mr_data, key))

      println("spark_data.count()===============" + spark_data.count())
      probeKeys.foreach(key => printProbe(spark_data, key))

      val res = mr_data.subtract(spark_data)
      println("res.count()=======" + res.count())
      res.take(200).foreach(println)

      // Build the intersection ONCE. The original constructed it twice (count then
      // take), forcing Spark to re-read and re-shuffle both inputs a second time.
      val common = mr_data.intersection(spark_data)
      println("rdd intersection count()=======" + common.count())
      common.take(30).foreach(println)
    } finally {
      // Always release the SparkContext, even if parsing or a job fails.
      if (sc != null) {
        sc.stop()
      }
    }
  }

  /**
   * Prints up to 10 lines of `data` starting with `key`, then each line's length
   * (the lengths expose trailing-whitespace/encoding differences between pipelines).
   * Collects the sample once — the original called take(10) twice per probe,
   * launching two Spark jobs for the same data.
   */
  private def printProbe(data: RDD[String], key: String): Unit = {
    val sample = data.filter(_.startsWith(key)).take(10)
    sample.foreach(println)
    sample.foreach(line => println(line.length))
  }

  /** CLI options: both -mrinput and -sparkinput take a value and are required. */
  def buildOptions(): Options = {
    val options = new Options
    options.addOption("mrinput", true, "[must] mr input path")
    options.addOption("sparkinput", true, "[must] spark input path")
    options
  }
}