package mobvista.dmp.test
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructField, StructType}
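
/**
 * Ad-hoc check that an MR job and its Spark rewrite produced the same ORC
 * output: total counts, spot-checks on two hard-coded device ids, a one-way
 * set difference (MR rows missing from the Spark output), and the
 * intersection of the two Row sets.
 *
 * A sketch of how it might be submitted; the jar name and S3 paths below are
 * placeholders, not values taken from this repo:
 *
 *   spark-submit --class mobvista.dmp.test.mrandsparkCompareORC \
 *     dmp-jobs.jar \
 *     -mrinput s3://bucket/mr-output \
 *     -sparkinput s3://bucket/spark-output
 */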
object mrandsparkCompareORC {

  def main(args: Array[String]): Unit = {
    // Declared outside the try so the finally block can actually stop the
    // session that gets created below.
    var spark: SparkSession = null
    try {
      val options = buildOptions()
      val parser = new BasicParser
      val commandLine = parser.parse(options, args)

      val mr_input = commandLine.getOptionValue("mrinput")
      val spark_input = commandLine.getOptionValue("sparkinput")

      println("*************************")
      println(s"* mrinput    = $mr_input")
      println(s"* sparkinput = $spark_input")
      println("*************************")

      spark = SparkSession
        .builder()
        .config("spark.rdd.compress", "true")
        .config("spark.default.parallelism", "2000")
        .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .enableHiveSupport()
        .getOrCreate()
      // Read both ORC outputs with the same explicit schema so the Row-level
      // set operations below compare field-for-field.
      val mr_data = spark.read.schema(dspEtlSchema).orc(mr_input).rdd
      val spark_data = spark.read.schema(dspEtlSchema).orc(spark_input).rdd

      println("mr_data.count() = " + mr_data.count())

      // Spot-check two specific device ids in the MR output.
      val rdd1 = mr_data.filter(_.getAs[String]("idfa") == "F064D5F9-83B1-462A-9A2A-31F03F385A30")
      rdd1.take(10).foreach(println)
      val rdd3 = mr_data.filter(_.getAs[String]("idfa") == "17e6274b-41b9-4c96-b349-c386a7709165")
      rdd3.take(10).foreach(println)

      println("spark_data.count() = " + spark_data.count())

      // The same two ids in the Spark output.
      val rdd2 = spark_data.filter(_.getAs[String]("idfa") == "F064D5F9-83B1-462A-9A2A-31F03F385A30")
      rdd2.take(10).foreach(println)
      val rdd4 = spark_data.filter(_.getAs[String]("idfa") == "17e6274b-41b9-4c96-b349-c386a7709165")
      rdd4.take(10).foreach(println)

      // Rows present in the MR output but missing from the Spark output.
      val res = mr_data.subtract(spark_data)
      println("res.count() = " + res.count())
      res.take(200).foreach(println)

      // Rows the two outputs agree on, computed once instead of twice.
      val common = mr_data.intersection(spark_data)
      println("intersection count() = " + common.count())
      common.take(30).foreach(println)
    } finally {
      if (spark != null) {
        spark.stop()
      }
    }
  }
  def buildOptions(): Options = {
    val options = new Options
    options.addOption("mrinput", true, "[must] mr input path")
    options.addOption("sparkinput", true, "[must] spark input path")
    options
  }
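
  // Note: commons-cli's addOption(opt, hasArg, description) does not enforce
  // "[must]"; both options stay optional at parse time, and a missing one
  // simply comes back as null from getOptionValue. If hard enforcement were
  // wanted, one way (an assumption, not something this file does) would be
  // Option.setRequired(true), which makes parse() fail fast:
  //
  //   val mrOpt = new org.apache.commons.cli.Option("mrinput", true, "mr input path")
  //   mrOpt.setRequired(true)
  //   options.addOption(mrOpt)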
  // All 17 columns are read as strings; both inputs are expected to share
  // this layout, since it is applied to each ORC read above.
  def dspEtlSchema: StructType = {
    StructType(
      StructField("idfa", StringType) ::
        StructField("gaid", StringType) ::
        StructField("platform", StringType) ::
        StructField("country", StringType) ::
        StructField("ip", StringType) ::
        StructField("gender", StringType) ::
        StructField("birthday", StringType) ::
        StructField("maker", StringType) ::
        StructField("model", StringType) ::
        StructField("osVersion", StringType) ::
        StructField("packageName", StringType) ::
        StructField("exitId", StringType) ::
        StructField("datetime", StringType) ::
        StructField("segment", StringType) ::
        StructField("dealerid", StringType) ::
        StructField("exchanges", StringType) ::
        StructField("region", StringType) ::
        Nil)
  }
}