1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package mobvista.dmp.datasource.datatory
import java.net.URI
import mobvista.dmp.clickhouse.tracking.AdnTrackingEntity
import mobvista.dmp.common.CommonSparkJob
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.{SaveMode, SparkSession}
import scala.collection.mutable
/**
* @package: mobvista.dmp.datasource.datatory
* @author: wangjf
* @date: 2019/11/25
* @time: 15:00
* @email: jinfeng.wang@mobvista.com
* @phone: 152-1062-7698
*/
/**
 * Daily Spark job that merges ADN tracking data for one date and writes it as ORC.
 *
 * Mixes in `java.io.Serializable` because the `getEventType` UDF registered in `run`
 * is a method of this instance, so the instance travels with the UDF.
 */
class AdnTrackingMergeDaily extends CommonSparkJob with java.io.Serializable {

  /**
   * Options understood by [[run]]:
   * `-date` (substituted for `@date` in the SQL templates) and `-output` (ORC output path).
   */
  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("date", true, "date")
    options.addOption("output", true, "output")
    options
  }

  /** Event name -> event type lookup; broadcast by `run`, read by [[getEventType]]. */
  var eventMap: Broadcast[scala.collection.Map[String, String]] = null

  /**
   * Runs the merge for one date.
   *
   *  1. Deletes `output` recursively if it exists.
   *  2. Broadcasts an event-name -> event-type map built from `Constant.event_sql`.
   *  3. Registers the `getEventType` UDF and the `event_join` temp view
   *     (presumably consumed by `Constant.adn_tracking_merge_sql` — not visible here).
   *  4. Runs `Constant.adn_tracking_merge_sql`, drops duplicate rows, reduces the
   *     campaign_id / event_name / event_type / app_id arrays to distinct non-blank
   *     values, and writes [[AdnTrackingEntity]] rows to `output` as snappy ORC.
   *
   * @param args command line, see [[commandOptions]]
   * @return 0 on success; failures propagate as exceptions
   * @throws IllegalArgumentException if `-date` or `-output` is missing or blank
   */
  override protected def run(args: Array[String]): Int = {
    val commandLine = new BasicParser().parse(commandOptions(), args)
    val date = commandLine.getOptionValue("date")
    val output = commandLine.getOptionValue("output")
    // Fail fast, before starting Spark, instead of with an NPE from String.replace later on.
    require(StringUtils.isNotBlank(date), "missing required option: date")
    require(StringUtils.isNotBlank(output), "missing required option: output")

    val spark = SparkSession
      .builder()
      .appName("AdnTrackingMergeDaily")
      .config("spark.rdd.compress", "true")
      .config("spark.shuffle.compress", "true")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.io.compression.codec", "lz4")
      .config("spark.io.compression.lz4.blockSize", "64k")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()
    import spark.implicits._
    val sc = spark.sparkContext

    // Distinct, non-blank elements of an array column, in first-seen order; a null
    // array yields an empty one. Kept as a local function value (not a method) so the
    // RDD closure below does not need to capture this job instance.
    val distinctNonBlank: Seq[String] => mutable.WrappedArray[String] = values =>
      mutable.WrappedArray.make[String](
        Option(values).getOrElse(Seq.empty[String]).filter(StringUtils.isNotBlank(_)).distinct.toArray)

    try {
      // Resolve the FileSystem from the output path itself: a hard-coded bucket URI
      // fails with "Wrong FS" as soon as output lives in another bucket or scheme.
      val outputPath = new Path(output)
      outputPath.getFileSystem(sc.hadoopConfiguration).delete(outputPath, true)

      // Explicit getAs type arguments: without one (and no expected type) T is inferred as Nothing.
      // Rows without an event name cannot be looked up and are skipped; a missing type maps to "".
      eventMap = sc.broadcast(
        spark.sql(Constant.event_sql.replace("@date", date))
          .rdd
          .filter(r => r.getAs[String]("event_name") != null)
          .map(r => (r.getAs[String]("event_name"), Option(r.getAs[String]("event_type")).getOrElse("")))
          .collectAsMap())
      spark.udf.register("getEventType", getEventType _)

      spark.sql(Constant.adn_tracking_install_join_event_sql.replace("@date", date))
        .createOrReplaceTempView("event_join")

      spark.sql(Constant.adn_tracking_merge_sql.replace("@date", date))
        .dropDuplicates()
        .rdd
        .map(r => AdnTrackingEntity(r.getAs("device_id"), r.getAs("device_model"), r.getAs("os_version"),
          r.getAs("country"), r.getAs("city"),
          distinctNonBlank(r.getAs[Seq[String]]("campaign_id")),
          distinctNonBlank(r.getAs[Seq[String]]("event_name")),
          distinctNonBlank(r.getAs[Seq[String]]("event_type")),
          distinctNonBlank(r.getAs[Seq[String]]("app_id")),
          r.getAs("log_type")))
        .toDF()
        .write
        .mode(SaveMode.Overwrite)
        .option("orc.compress", "snappy")
        .orc(output)
    } finally {
      spark.stop()
    }
    0
  }

  /**
   * UDF body: the event type broadcast for `eventName`, or "" when the name is unknown.
   * Requires `eventMap` to have been set by [[run]].
   */
  def getEventType(eventName: String): String = {
    eventMap.value.getOrElse(eventName, "")
  }
}
/** Command-line launcher for the [[AdnTrackingMergeDaily]] job. */
object AdnTrackingMergeDaily {

  /**
   * Creates a fresh job instance and runs it with the raw command-line arguments.
   * The integer returned by the job is not turned into a process exit code.
   */
  def main(args: Array[String]): Unit = {
    val job = new AdnTrackingMergeDaily
    job.run(args)
  }
}