package mobvista.dmp.test

import org.apache.commons.cli.{BasicParser, Options}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

/**
 * Ad-hoc Spark driver that compares two text datasets line by line and prints
 * a diagnostic report to stdout.
 *
 * The two datasets are passed as `-mrinput <path>` and `-sparkinput <path>`
 * (presumably the output of a MapReduce job and of its Spark counterpart —
 * confirm with the job owner). The report contains, in order:
 *
 *  1. the line count of the `mrinput` dataset, followed by up to 10 sample
 *     lines (and their lengths) for each prefix in [[SampleIdPrefixes]];
 *  2. the same for the `sparkinput` dataset;
 *  3. the number of lines present in `mrinput` but not in `sparkinput`,
 *     followed by up to 200 of them;
 *  4. the number of lines in the intersection of both datasets, followed by
 *     up to 30 of them.
 */
object mrandsparkCompare {

  /** Line prefixes whose records are printed as spot-check samples. */
  private val SampleIdPrefixes: Seq[String] = Seq(
    "F064D5F9-83B1-462A-9A2A-31F03F385A30",
    "17e6274b-41b9-4c96-b349-c386a7709165"
  )

  /** Maximum number of sample lines printed per prefix. */
  private val SampleSize = 10

  /** Maximum number of `mrinput`-only lines printed. */
  private val DiffPreviewSize = 200

  /** Maximum number of common lines printed. */
  private val IntersectionPreviewSize = 30

  /**
   * Entry point.
   *
   * @param args command line; must contain `-mrinput <path>` and `-sparkinput <path>`
   * @throws IllegalArgumentException if either option is missing or blank
   */
  def main(args: Array[String]): Unit = {
    val commandLine = new BasicParser().parse(buildOptions(), args)

    // Validate before creating the SparkContext so a bad invocation fails
    // immediately with a clear message instead of deep inside Spark.
    val mrInput = requireOption("mrinput", commandLine.getOptionValue("mrinput"))
    val sparkInput = requireOption("sparkinput", commandLine.getOptionValue("sparkinput"))

    println("*************************")
    println(s"* mr input = $mrInput")
    println(s"* spark input = $sparkInput")
    println("*************************")

    val conf = new SparkConf().setAppName("compare_mr_and_spark")
    val sc = new SparkContext(conf)
    try {
      // Both datasets feed several independent actions below; caching avoids
      // re-reading them from storage for every one of those jobs.
      val mrData = sc.textFile(mrInput).cache()
      val sparkData = sc.textFile(sparkInput).cache()
      report(mrData, sparkData)
    } finally {
      sc.stop()
    }
  }

  /**
   * Declares the command-line options understood by [[main]].
   *
   * @return options `mrinput` and `sparkinput`, each taking one argument
   */
  def buildOptions(): Options = {
    val options = new Options
    options.addOption("mrinput", true, "[must] mr input path")
    options.addOption("sparkinput", true, "[must] spark input path")
    options
  }

  /** Returns `value`, or throws `IllegalArgumentException` when it is null or blank. */
  private def requireOption(name: String, value: String): String = {
    require(value != null && value.trim.nonEmpty, s"missing required option: -$name <path>")
    value
  }

  /** Prints the full comparison report for the two datasets. */
  private def report(mrData: RDD[String], sparkData: RDD[String]): Unit = {
    println("mr_data.count()=======================" + mrData.count())
    SampleIdPrefixes.foreach(printSamples(mrData, _))

    println("spark_data.count()===============" + sparkData.count())
    SampleIdPrefixes.foreach(printSamples(sparkData, _))

    // Lines that exist in the mr dataset but not in the spark dataset.
    val onlyInMr = mrData.subtract(sparkData)
    println("res.count()=======" + onlyInMr.count())
    onlyInMr.take(DiffPreviewSize).foreach(println)

    // Build the intersection once so both actions share one lineage
    // (and its shuffle output) instead of shuffling the inputs twice.
    val common = mrData.intersection(sparkData)
    println("rdd intersection count()=======" + common.count())
    common.take(IntersectionPreviewSize).foreach(println)
  }

  /**
   * Prints up to [[SampleSize]] lines of `data` that start with `prefix`,
   * followed by the length of each of those lines.
   */
  private def printSamples(data: RDD[String], prefix: String): Unit = {
    // Take the sample once; lines and lengths then describe the same records.
    val sample = data.filter(_.startsWith(prefix)).take(SampleSize)
    sample.foreach(println)
    sample.foreach(line => println(line.length))
  }
}