package mobvista.dmp.datasource.rtdmp
import com.alibaba.fastjson.JSONObject
import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
import mobvista.dmp.datasource.rtdmp.Constant.AudienceMerge
import mobvista.dmp.util.DateUtil
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.codehaus.jackson.map.ObjectMapper
import java.text.SimpleDateFormat
import java.util.Calendar
import scala.collection.JavaConverters._
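
// Hourly RTDMP merge job: reads this hour's device -> audience_ids ORC from `input`,
// joins it with the previous dwh.audience_merge partition (dt = old_datetime), drops
// audience tags that are expired (older than 48 hours) or whose audience id was
// reported as updated in this hour's window, and writes the merged snapshot as ORC to `output`.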
/**
 * @package: mobvista.dmp.datasource.rtdmp
 * @author: wangjf
 * @date: 2020/7/13
 * @time: 11:25 AM
 * @email: jinfeng.wang@mobvista.com
 * @phone: 152-1062-7698
 */
class RTDmpMain extends CommonSparkJob with Serializable {

  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("datetime", true, "datetime")
    options.addOption("old_datetime", true, "old_datetime")
    options.addOption("input", true, "input")
    options.addOption("output", true, "output")
    options.addOption("coalesce", true, "coalesce")
    options
  }
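
  // Expected option values, as used below: `datetime` and `old_datetime` are hour partitions in
  // yyyyMMddHH form (old_datetime presumably being the previous run's dt partition of dwh.audience_merge),
  // `input`/`output` are ORC paths, and `coalesce` is the number of output partitions.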
  override protected def run(args: Array[String]): Int = {
    val parser = new BasicParser()
    val options = commandOptions()
    val commandLine = parser.parse(options, args)

    val datetime = commandLine.getOptionValue("datetime")
    val old_datetime = commandLine.getOptionValue("old_datetime")
    val input = commandLine.getOptionValue("input")
    val output = commandLine.getOptionValue("output")
    val coalesce = commandLine.getOptionValue("coalesce")

    val spark: SparkSession = SparkSession.builder()
      .appName(s"RTDmpMain.$datetime")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()
    val sc = spark.sparkContext
    try {
      var sdf = new SimpleDateFormat("yyyyMMddHHmmss")

      // By default, process the previous hour's data: build the update-time window
      // covering the full hour [datetime:00:00, datetime:59:59].
      val update_time_start = DateUtil.format(sdf.parse(datetime + "0000"), "yyyy-MM-dd HH:mm:ss")
      val update_time_end = DateUtil.format(sdf.parse(datetime + "5959"), "yyyy-MM-dd HH:mm:ss")
      // Same bounds as epoch seconds, shifted by 28800 s (8 h) / 25200 s (7 h), presumably timezone adjustments.
      val audience_date_utime_start = sdf.parse(datetime + "0000").getTime / 1000 - 28800
      val audience_date_utime_end = sdf.parse(datetime + "5959").getTime / 1000 - 25200

      // Audience ids reported as updated within this window; their stale tags are dropped below.
      val updateAudienceIds =
        ServerUtil.request(update_time_start, update_time_end, audience_date_utime_start, audience_date_utime_end, 0, 1, 2)
          .asScala.keys.toSet
      println(s"updateAudienceIds -->> ${updateAudienceIds.mkString(",")}")

      sdf = new SimpleDateFormat("yyyyMMddHH")
      val calendar = Calendar.getInstance()
      val date = sdf.parse(datetime)
      calendar.setTime(date)
      // Tags older than 48 hours (relative to the current hour) are treated as expired.
      calendar.set(Calendar.HOUR_OF_DAY, calendar.get(Calendar.HOUR_OF_DAY) - 48)
      val expire_time = sdf.format(calendar.getTime)

      // Hourly input: one row per device with a comma-separated list of audience ids,
      // mapped to (devid, (JSON of audience_id -> datetime, device_type)).
      val hour_rdd = spark.read.orc(input).rdd.map(row => {
        val devid = row.getAs[String]("devid")
        val audience_ids = row.getAs[String]("audience_ids").split(",")
        val audience_data = new JSONObject()
        audience_ids.foreach(audience_id => {
          audience_data.put(audience_id, datetime)
        })
        val device_type = row.getAs[String]("device_type")
        (devid, (audience_data.toJSONString, device_type))
      })

      // Previously accumulated audience_map per device, from the old partition of dwh.audience_merge.
      val sql =
        s"""
           |SELECT * FROM dwh.audience_merge WHERE dt = '$old_datetime'
           |""".stripMargin
      val merge_rdd = spark.sql(sql).rdd
        .map(row => {
          val devid = row.getAs[String]("devid")
          val audience_map = row.getAs[String]("audience_map")
          val update_time = row.getAs[String]("update_time")
          val device_type = row.getAs[String]("device_type")
          (devid, (audience_map, update_time, device_type))
        })

      import spark.implicits._
      // Full outer join of this hour's tags with the accumulated map:
      //   both sides present -> keep old tags that are not re-updated, not overwritten by this hour's tags,
      //                         and not expired, then merge in the new tags
      //   only new side      -> take this hour's tags as-is
      //   only old side      -> keep old tags that are not re-updated and not expired
      val df = hour_rdd.fullOuterJoin(merge_rdd)
        .mapPartitions(ts => {
          ts.map(t => {
            val devid = t._1
            val opt1 = t._2._1
            val opt2 = t._2._2
            if (opt1.nonEmpty && opt2.nonEmpty) {
              val new_audience = MobvistaConstant.String2JSONObject(opt1.get._1).asInstanceOf[java.util.Map[String, String]]
              val old_audience = opt2.get._1
              val retain_old_audience = MobvistaConstant.String2JSONObject(old_audience).asInstanceOf[java.util.Map[String, String]].asScala
                .retain((k, v) => !updateAudienceIds.contains(k.toInt) && !new_audience.keySet().contains(k) && v.compareTo(expire_time) > 0)
              new_audience.putAll(retain_old_audience.asJava)
              AudienceMerge(devid, new ObjectMapper().writeValueAsString(new_audience), datetime, opt1.get._2)
            } else if (opt1.nonEmpty && opt2.isEmpty) {
              AudienceMerge(devid, opt1.get._1, datetime, opt1.get._2)
            } else {
              val old_audience = opt2.get._1
              val retain_old_audience = MobvistaConstant.String2JSONObject(old_audience).asInstanceOf[java.util.Map[String, String]].asScala
                .retain((k, v) => !updateAudienceIds.contains(k.toInt) && v.compareTo(expire_time) > 0)
              AudienceMerge(devid, new ObjectMapper().writeValueAsString(retain_old_audience.asJava), opt2.get._2, opt2.get._3)
            }
          })
        }).filter(o => {
          // Drop devices whose merged audience_map ended up empty.
          !MobvistaConstant.String2JSONObject(o.audience_map).isEmpty
        })

      // Write the merged snapshot as zlib-compressed ORC with the requested number of partitions.
      df.toDF
        .repartition(coalesce.toInt)
        .write
        .mode(SaveMode.Overwrite)
        .option("orc.compress", "zlib")
        .orc(output)
    } finally {
      if (sc != null) {
        sc.stop()
      }
      if (spark != null) {
        spark.stop()
      }
    }
    0
  }
}

object RTDmpMain {
  def main(args: Array[String]): Unit = {
    new RTDmpMain().run(args)
  }
}
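
// A minimal spark-submit sketch for this job (jar name and paths below are illustrative, not from this repo):
//
//   spark-submit \
//     --class mobvista.dmp.datasource.rtdmp.RTDmpMain \
//     dmp.jar \
//     -datetime 2020071311 -old_datetime 2020071310 \
//     -input s3://bucket/rtdmp/hour=2020071311 \
//     -output s3://bucket/audience_merge/dt=2020071311 \
//     -coalesce 200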