package mobvista.dmp.test
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
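/**
 * Ad-hoc comparison of two line-oriented datasets: prints record counts,
 * spot-checks a few known device IDs in each input, then reports the
 * symmetric difference and the intersection of the two inputs.
 *
 * Example invocation (jar name and input paths below are placeholders,
 * not real locations):
 *
 *   spark-submit --class mobvista.dmp.test.DataCompare data-compare.jar \
 *     -input01 s3://bucket/path/one -input02 s3://bucket/path/two
 */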
object DataCompare {

  def main(args: Array[String]): Unit = {
    var sc: SparkContext = null
    try {
      val options = buildOptions()
      // BasicParser is deprecated in commons-cli 1.3+; DefaultParser is the
      // modern replacement, but BasicParser still works for these options.
      val parser = new BasicParser
      val commandLine = parser.parse(options, args)
      val input01 = commandLine.getOptionValue("input01")
      val input02 = commandLine.getOptionValue("input02")

      val spark = createSparkSession("mydatacompare")
      // Assign to the outer var (the original shadowed it with a local val,
      // so the finally block could never stop the context).
      sc = spark.sparkContext

      println("*************************")
      println(s"* input01 = $input01")
      println(s"* input02 = $input02")
      println("*************************")
      val data01 = sc.textFile(input01)

      // The BundleMatchMain outputs are stored as ORC; flatten each row to a
      // tab-separated line so both inputs can be compared as plain text.
      val data02: RDD[String] =
        if (input02.contains("mytest_BundleMatchMain") ||
            input02.contains("mytest_3s_BundleMatchMain") ||
            input02.contains("mytest_adn_install_BundleMatchMain") ||
            input02.contains("mytest_ga_BundleMatchMain")) {
          spark.read.orc(input02).show(false)
          spark.read.orc(input02).rdd.map(_.mkString("\t"))
        } else {
          sc.textFile(input02)
        }
println("data01.count()=======================" + data01.count())
// data01.take(100).foreach(println)
val rdd1: RDD[String] = data01.filter(_.startsWith("F064D5F9-83B1-462A-9A2A-31F03F385A30"))
rdd1.take(10).foreach(println)
rdd1.take(10).foreach((x:String)=>println(x.length))
val rdd3: RDD[String] = data01.filter(_.startsWith("17e6274b-41b9-4c96-b349-c386a7709165"))
rdd3.take(10).foreach(println)
rdd3.take(10).foreach((x:String)=>println(x.length))
val rdd5: RDD[String] = data01.filter(_.startsWith("e607767b-b7a7-44f2-a07c-1131154eda12"))
rdd5.take(10).foreach(println)
rdd5.take(10).foreach((x:String)=>println(x.length))
println( "data02.count()===============" + data02.count())
// data02.take(100).foreach(println)
val rdd2: RDD[String] = data02.filter(_.startsWith("F064D5F9-83B1-462A-9A2A-31F03F385A30"))
rdd2.take(10).foreach(println)
rdd2.take(10).foreach((x:String)=>println(x.length))
val rdd4: RDD[String] = data02.filter(_.startsWith("17e6274b-41b9-4c96-b349-c386a7709165"))
rdd4.take(10).foreach(println)
rdd4.take(10).foreach((x:String)=>println(x.length))
val rdd6: RDD[String] = data02.filter(_.startsWith("e607767b-b7a7-44f2-a07c-1131154eda12"))
rdd6.take(10).foreach(println)
rdd6.take(10).foreach((x:String)=>println(x.length))
      // Earlier experiments comparing trimmed records:
      // val res_trim = data01.map(_.trim()).subtract(data02.map(_.trim()))
      // println("res_trim.count() = " + res_trim.count())
      // val res_trim_test = data01
      //   .map(record => record.substring(0, record.lastIndexOf("\t")).trim())
      //   .subtract(data02.map(_.trim()))
      // res_trim_test.take(100).foreach(println)
      // println("res_trim_test.count() = " + res_trim_test.count())

      // Cache each diff so count() and take() don't recompute the subtract.
      val onlyIn01 = data01.subtract(data02).cache()
      println("data01.subtract(data02).count() = " + onlyIn01.count())
      onlyIn01.take(300).foreach(println)

      val onlyIn02 = data02.subtract(data01).cache()
      println("data02.subtract(data01).count() = " + onlyIn02.count())
      onlyIn02.take(200).foreach(println)

      val common = data01.intersection(data02).cache()
      println("data01.intersection(data02).count() = " + common.count())
      common.take(30).foreach(println)
    } finally {
      if (sc != null) {
        sc.stop()
      }
    }
  }
  def buildOptions(): Options = {
    val options = new Options
    options.addOption("input01", true, "[must] input path 01")
    options.addOption("input02", true, "[must] input path 02")
    options
  }
  def createSparkSession(appName: String): SparkSession = {
    SparkSession
      .builder()
      .appName(appName)
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "lz4")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()
  }
}