package mobvista.dmp.datasource.joypac

import java.net.URI

import com.alibaba.fastjson.JSON
import mobvista.dmp.common.CommonSparkJob
import mobvista.dmp.util.{DateUtil, MRUtils}
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.codehaus.jackson.map.ObjectMapper

import scala.collection.mutable

/**
  * Merges the daily Joypac "etl" partition of dwh.joypac_result into the
  * previous day's cumulative "all" snapshot and writes the merged result as ORC.
  *
  * @package: mobvista.dmp.datasource.joypac
  * @author: wangjf
  * @date: 2019-12-18
  * @time: 14:10:50
  * @email: jinfeng.wang@mobvista.com
  * @phone: 152-1062-7698
  */
class JoypacResultAll extends CommonSparkJob {

  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("date", true, "[must] date")
    options.addOption("output", true, "[must] output")
    options.addOption("coalesce", true, "[must] coalesce")
    options
  }
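
  // Example invocation (a sketch, not verified against this repo's launcher;
  // the jar name, output path, and exact flag syntax depend on how
  // CommonSparkJob wires commons-cli and are assumptions):
  //   spark-submit --class mobvista.dmp.datasource.joypac.JoypacResultAll dmp.jar \
  //     -date 20191218 -output s3://mob-emr-test/.../joypac_result_all -coalesce 10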
  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      return -1
    } else {
      printOptions(commandLine)
    }
    val date = commandLine.getOptionValue("date")
    val output = commandLine.getOptionValue("output")
    val coalesce = commandLine.getOptionValue("coalesce")

    val spark = SparkSession.builder()
      .appName("JoypacResultAll")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()
    val sc = spark.sparkContext
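
    // Clear any previous output so reruns of the same date are idempotent.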
    FileSystem.get(new URI("s3://mob-emr-test"), sc.hadoopConfiguration).delete(new Path(output), true)

    try {
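      // Daily increment: today's "etl" partition, keyed by (device_id, package_name).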
      val etlDF = spark.sql(JoypacResultAll.sql.replace("@date", date).replace("@part", "etl"))
        .rdd.map(r => {
        (MRUtils.JOINER.join(r.getAs("device_id").toString, r.getAs("package_name").toString),
          MRUtils.JOINER.join(r.getAs("platform"), r.getAs("app_version"), r.getAs("apps_info"), r.getAs("update_date")))
      })
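
      // Cumulative history: yesterday's "all" partition, same key and value layout.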
      val yesDate = DateUtil.getDayByString(date, "yyyyMMdd", -1)
      val allDF = spark.sql(JoypacResultAll.sql.replace("@date", yesDate).replace("@part", "all"))
        .rdd.map(r => {
        (MRUtils.JOINER.join(r.getAs("device_id").toString, r.getAs("package_name").toString),
          MRUtils.JOINER.join(r.getAs("platform"), r.getAs("app_version"), r.getAs("apps_info"), r.getAs("update_date")))
      })

      import spark.implicits._
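
      // The full outer join keeps keys seen only today, only historically, or
      // in both; JoypacResultAll.map reconciles the two sides per key.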
      val df = etlDF.fullOuterJoin(allDF)
        .map(JoypacResultAll.map)
        .toDF

      df.coalesce(Integer.parseInt(coalesce))
        .write
        .mode(SaveMode.Overwrite)
        .option("orc.compress", "zlib")
        .orc(output)
    } finally {
      sc.stop()
      spark.stop()
    }
    0
  }
}
object JoypacResultAll {
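  // Partition query template; "@date" and "@part" are substituted with the
  // target partition values before execution.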
  val sql: String =
    """
      |SELECT * FROM dwh.joypac_result WHERE dt = '@date' AND part = '@part'
    """.stripMargin

  val mapper = new ObjectMapper()
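
  // Input: (key, (dailyOpt, allOpt)), where the key joins device_id and
  // package_name, and each side, when present, packs platform, app_version,
  // apps_info (a JSON map) and update_date via MRUtils.JOINER.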
  def map(t: (String, (Option[String], Option[String]))): JoypacEntity = {
    val key = t._1
    val keys = MRUtils.SPLITTER.split(key)
    val device_id = keys(0)
    val package_name = keys(1)
    var platform = ""
    var app_version = ""
    var apps_info = ""
    var update_date = ""
    val valOpt = t._2
    val dailyOpt = valOpt._1
    val allOpt = valOpt._2
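
    // Three merge cases follow: both sides present (reconcile status maps),
    // daily only (initialize new entries), history only (carry the old record
    // forward). Per the branch comments below: schema_url = 1 maps to
    // status = 1, schema_url = 0 maps to status = 2, and newly seen entries
    // start at status = 0.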
    if (dailyOpt.isDefined && allOpt.isDefined) { // both daily and historical records exist
      val dVals = MRUtils.SPLITTER.split(dailyOpt.get)
      platform = dVals(0)
      val d_app_version = dVals(1)
      import scala.collection.JavaConverters._
      val dMap = JSON.parse(dVals(2)).asInstanceOf[java.util.Map[String, String]].asScala
        .map(kv => (kv._1, Integer.parseInt(kv._2)))
      val aVals = MRUtils.SPLITTER.split(allOpt.get)
      val a_app_version = aVals(1)
      val aMap = JSON.parse(aVals(2)).asInstanceOf[java.util.Map[String, Int]].asScala
      val map = new mutable.HashMap[String, Int]()
      for (a_kv <- aMap) {
        for (d_kv <- dMap) {
          if (d_kv._1.equals(a_kv._1)) {
            val status = d_kv._2 match {
              case 1 => // schema_url = 1, so status = 1
                1
              case 0 => // schema_url = 0, so status = 2
                2
              // Defensive default (an assumption; the original match was
              // non-exhaustive and would throw a MatchError here): keep the
              // previous status unchanged.
              case _ =>
                a_kv._2
            }
            map.put(d_kv._1, status)
          }
        }
      }
      for (kv <- aMap) {
        // If the old and new app_version are the same, carry forward the old
        // apps_info entries that did not join, keeping their original values.
        if (d_app_version.equals(a_app_version) && !map.contains(kv._1)) {
          map.put(kv._1, kv._2)
        }
      }
      for (kv <- dMap) {
        // Emit the new apps_info entries with schema_url = 1 that did not
        // join, initializing status = 0.
        if (!map.contains(kv._1) && kv._2 == 1) {
          map.put(kv._1, 0)
        }
      }
      app_version = a_app_version
      apps_info = mapper.writeValueAsString(map.asJava)
      update_date = dVals(3)
    } else if (dailyOpt.isDefined && allOpt.isEmpty) { // daily record only: a new device/package
      val vals = MRUtils.SPLITTER.split(dailyOpt.get)
      platform = vals(0)
      app_version = vals(1)
      import scala.collection.JavaConverters._
      // Keep only entries with schema_url = 1 and a non-trivial key, and
      // initialize their status to 0.
      val map = JSON.parse(vals(2)).asInstanceOf[java.util.Map[String, String]].asScala
        .retain((k, v) => Integer.parseInt(v) == 1 && k.length > 1).map(kv => {
        (kv._1, 0)
      })
      apps_info = mapper.writeValueAsString(map.asJava)
      update_date = vals(3)
    } else if (dailyOpt.isEmpty && allOpt.isDefined) { // historical record only: carry it forward unchanged
      val vals = MRUtils.SPLITTER.split(allOpt.get)
      platform = vals(0)
      app_version = vals(1)
      apps_info = vals(2)
      update_date = vals(3)
    }
    JoypacEntity(device_id, platform, app_version, package_name, apps_info, update_date)
  }

  def main(args: Array[String]): Unit = {
    new JoypacResultAll().run(args)
  }
}