package mobvista.dmp.datasource.rtdmp

import java.net.URI

import com.alibaba.fastjson.{JSON, JSONArray, JSONObject}
import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
import mobvista.dmp.format.RDDMultipleOutputFormat
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.Text
import org.apache.spark.sql.SparkSession

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

/**
 * @package: mobvista.dmp.datasource.rtdmp
 * @author: wangjf
 * @date: 2020/7/13
 * @time: 11:25 AM
 * @email: jinfeng.wang@mobvista.com
 * @phone: 152-1062-7698
 */
class RTDmpAS extends CommonSparkJob with Serializable {

  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("input_audience", true, "input_audience")
    options.addOption("input_data", true, "input_data")
    options.addOption("output", true, "output")
    options.addOption("coalesce", true, "coalesce")
    options.addOption("time", true, "time")
    options
  }
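
  /**
   * A minimal invocation sketch. The jar name and all paths/values below are
   * hypothetical placeholders; only the option names come from commandOptions(),
   * and the time format ("yyyy-MM-dd HH") is inferred from the commented-out
   * DateUtil calls in run():
   *
   *   spark-submit --class mobvista.dmp.datasource.rtdmp.RTDmpAS \
   *     dmp.jar \
   *     -input_audience s3://mob-emr-test/path/to/audience \
   *     -input_data s3://mob-emr-test/path/to/data \
   *     -output s3://mob-emr-test/path/to/rtdmp_as \
   *     -coalesce 100 \
   *     -time "2020-07-13 11"
   */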
  override protected def run(args: Array[String]): Int = {
    val parser = new BasicParser()
    val options = commandOptions()
    val commandLine = parser.parse(options, args)

    val input_audience = commandLine.getOptionValue("input_audience")
    val input_data = commandLine.getOptionValue("input_data")
    val output = commandLine.getOptionValue("output")
    val coalesce = commandLine.getOptionValue("coalesce")
    val time = commandLine.getOptionValue("time")

    val spark: SparkSession = MobvistaConstant.createSparkSession(s"RTDmpAS.$time")
    val sc = spark.sparkContext
    try {
      //  Clear the previous output before writing.
      FileSystem.get(new URI(s"s3://mob-emr-test"), sc.hadoopConfiguration).delete(new Path(output), true)

      //  By default the previous hour's data would be processed:
      //  val update_time_start = DateUtil.format(time + ":00:00", "yyyy-MM-dd HH:mm:ss")
      //  val update_time_end = DateUtil.format(time + ":59:59", "yyyy-MM-dd HH:mm:ss")
      //  println("update_time_start -->> " + update_time_start + ", update_time_end -->> " + update_time_end)

      //  Instead, select all audience packages by bounding the window with a minimum and a maximum value.
      val audience_date_utime_start = 1577811600L
      val audience_date_utime_end = 4100731200L
      val update_time_start = "2000-01-01 00:00:00"
      val update_time_end = "2099-12-31 23:59:59"
      /*
      val audience_date_utime_start = DateUtil.parse(update_time_start, "yyyy-MM-dd HH").getTime / 1000 - 28800
      val audience_date_utime_end = DateUtil.parse(update_time_end, "yyyy-MM-dd HH").getTime / 1000 - 28800
      */
      import scala.collection.JavaConverters._

      val ids = ServerUtil.request(update_time_start, update_time_end, audience_date_utime_start, audience_date_utime_end, 0, 0, 4).asScala
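      //  Assumption: ServerUtil.request returns a java.util.Map keyed by audience id,
      //  whose value is a tuple in which the third element (_3) is the audience status;
      //  status 3 is separated out below and excluded from the output.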
      val trueId = ids.filter(kv => {
        kv._2._3 != 3
      }).keys.toSet
      val falseId = ids.filter(kv => {
        kv._2._3 == 3
      }).keys.toSet

      println("trueId -->> " + trueId)
      println("falseId -->> " + falseId)
      val audience_output = output + "/audience"
      FileSystem.get(new URI(s"s3://mob-emr-test"), sc.hadoopConfiguration).delete(new Path(audience_output), true)
      val audienceSum = sc.textFile(input_audience).map(r => {
        r.substring(1, r.length - 1).split(",", -1)
      }).filter(r => {
        //  Drop audiences whose status marks them as excluded.
        !falseId.contains(Integer.parseInt(r(0)))
      }).map(r => {
        (r(0), r(1))
      }).cache()

      audienceSum.coalesce(1).saveAsTextFile(audience_output)
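      //  Worked example (illustrative line from input_audience): the text line
      //  "(12345,678)" is stripped of its parentheses and split into
      //  Array("12345", "678"), i.e. (audience id, audience count).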
      val data_output = output + "/data"
      FileSystem.get(new URI(s"s3://mob-emr-test"), sc.hadoopConfiguration).delete(new Path(data_output), true)

      //  .mapPartitions(Logic.parseResult(data_output, trueId, falseId, _))
      //  Audience packages that are still being computed are not written out.
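      //  Assumption: each row's audience_info column holds JSON shaped like
      //  {"devid": "...", "install_list": {...}, "audience_id": [123, 456]},
      //  as implied by the field accesses below; the values here are illustrative.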
      spark.read.orc(input_data).rdd
        .map(row => {
          val array = new ArrayBuffer[(Text, Text)]()
          val audience_info = row.getAs[String]("audience_info")
          val jsonObject = MobvistaConstant.String2JSONObject(audience_info)
          val devid = jsonObject.getString("devid")
          val install_list = jsonObject.getJSONObject("install_list")
          val audience_id = JSON.parseArray(jsonObject.getJSONArray("audience_id").toJSONString, classOf[Integer]).asScala.toSet
          //  Keep only audience ids that are valid (in trueId) and not invalidated (in falseId).
          if (((audience_id -- falseId) & trueId).nonEmpty) {
            val newJSON = new JSONObject()
            newJSON.put("devid", devid)
            if (install_list != null && !install_list.isEmpty) {
              newJSON.put("install_list", install_list)
            }
            newJSON.put("audience_id", ((audience_id -- falseId) & trueId).asJava)
            val regionSet = row.getAs[mutable.WrappedArray[String]]("region")
            for (region <- regionSet) {
              //  The Text key encodes the target path, one output directory per region.
              array += ((new Text(data_output + "/" + region), new Text(newJSON.toJSONString)))
            }
          }
          array.iterator
        }).flatMap(l => l)
        .repartition(coalesce.toInt)
        .saveAsNewAPIHadoopFile(data_output, classOf[Text], classOf[Text], classOf[RDDMultipleOutputFormat[_, _]])
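      //  RDDMultipleOutputFormat is assumed to route each (key, value) pair to the
      //  location named by its Text key, so records land under data_output/<region>
      //  rather than in a single flat directory.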
      val jsonArray = new JSONArray()
      audienceSum.collect().foreach(m => {
        val jsonObject = new JSONObject()
        jsonObject.put("id", Integer.parseInt(m._1))
        //  audience_data_status = 2 is assumed to mark the audience as refreshed.
        jsonObject.put("audience_data_status", 2)
        jsonObject.put("audience_count", Integer.parseInt(m._2))
        jsonArray.add(jsonObject)
      })

      val jsonObject = ServerUtil.update(jsonArray)
      if (jsonObject.getInteger("code") == 200) {
        println("Audience Update OK!")
      }
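      //  Illustrative element of the jsonArray payload sent to ServerUtil.update
      //  (values are placeholders):
      //  {"id": 12345, "audience_data_status": 2, "audience_count": 678}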
    } finally {
      //  spark.stop() also shuts down the underlying SparkContext, so one stop suffices.
      if (spark != null) {
        spark.stop()
      }
    }
    0
  }
}

object RTDmpAS {
  def main(args: Array[String]): Unit = {
    new RTDmpAS().run(args)
  }
}