1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
package mobvista.dmp.datasource.age.mapreduce;
import mobvista.dmp.util.MRUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.type.JavaType;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.List;
/**
 * MapReduce job that collects, per device id, its package names, its device type and its most
 * recent activity date.
 *
 * <p>Mapper input lines are split with {@code MRUtils.SPLITTER} and must have exactly four fields:
 * {@code device_id}, {@code device_type}, a third field that this job does not read, and a JSON
 * array of {@link ExtractDeviceMould} entries. The reducer writes (gzip-compressed) one record per
 * device id whose value is {@code MRUtils.JOINER.join("B", package_names, device_type + "#" +
 * update_date)}, with package names separated by {@code '#'}.
 *
 * <p>Created by wangjf on 2018-8-22 15:54:27.
 */
public class ExtractDeviceMRV2 {

    /** Pattern used to parse activity dates when choosing the most recent one. */
    private static final String DATE_PATTERN = "yyyy-MM-dd";

    /**
     * Configures and submits the job, then exits with 0 on success and 1 on failure.
     *
     * @param args generic Hadoop options followed by {@code <input path> <output path>}
     */
    public static void main(String[] args) throws ClassNotFoundException, IOException, InterruptedException {
        Configuration conf = new Configuration();
        conf.set("mapreduce.map.speculative", "true");
        conf.set("mapreduce.reduce.speculative", "true");
        conf.set("mapreduce.task.io.sort.mb", "500");
        conf.set("mapreduce.reduce.java.opts", "-Xmx1536m");
        conf.set("mapreduce.reduce.memory.mb", "2048");
        conf.set("mapreduce.reduce.shuffle.parallelcopies", "50");

        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 2) {
            // Fail with a readable message instead of an ArrayIndexOutOfBoundsException.
            System.err.println("Usage: ExtractDeviceMRV2 [generic options] <input path> <output path>");
            System.exit(2);
        }

        Job job = Job.getInstance(conf, "ExtractDeviceMR");
        FileOutputFormat.setCompressOutput(job, true);
        FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
        job.setJarByClass(ExtractDeviceMRV2.class);
        job.setMapperClass(ExtractMapper.class);
        job.setReducerClass(ExtractReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    /**
     * Returns whichever of {@code current} and {@code candidate} is the later date, as original text.
     *
     * <p>A blank or {@code null} candidate never replaces {@code current}. A non-blank candidate
     * always replaces a blank {@code current}. A parseable candidate replaces an unparseable
     * {@code current}; an unparseable candidate never replaces a non-blank {@code current}.
     * Otherwise the candidate wins only if it is strictly after {@code current}.
     *
     * @param format    date format used for parsing (not thread-safe; must be confined to the caller)
     * @param current   best date seen so far, possibly blank
     * @param candidate newly observed date, possibly blank or {@code null}
     * @return the later of the two dates, per the rules above
     */
    private static String latestDate(SimpleDateFormat format, String current, String candidate) {
        if (StringUtils.isBlank(candidate)) {
            // Previously a null candidate reached format.parse(null) and threw an uncaught NPE.
            return current;
        }
        if (StringUtils.isBlank(current)) {
            return candidate;
        }
        long candidateMillis;
        try {
            candidateMillis = format.parse(candidate).getTime();
        } catch (ParseException ignored) {
            // Unparseable candidate: keep what we already have.
            return current;
        }
        try {
            return candidateMillis > format.parse(current).getTime() ? candidate : current;
        } catch (ParseException ignored) {
            // Current value is not a real date but the candidate is: prefer the valid one.
            return candidate;
        }
    }

    /**
     * Turns each valid input line into {@code device_id -> package_names, device_type, update_date}.
     *
     * <p>Lines are dropped when they do not have exactly four fields, when the JSON field is blank
     * or contains no usable package name, or when the device id fails the shape checks below.
     */
    public static class ExtractMapper extends Mapper<LongWritable, Text, Text, Text> {
        ObjectMapper objectMapper = new ObjectMapper();
        JavaType javaType = objectMapper.getTypeFactory()
                .constructCollectionType(ArrayList.class, ExtractDeviceMould.class);
        Text outKey = new Text();
        Text outValue = new Text();
        // Mapper.run() calls map() sequentially on one instance, so reusing this
        // (non-thread-safe) SimpleDateFormat per instance is safe and avoids one allocation per record.
        private final SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_PATTERN);

        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] fields = MRUtils.SPLITTER.split(value.toString(), -1);
            if (fields.length != 4 || StringUtils.isBlank(fields[3])) {
                return;
            }

            String deviceId = fields[0];
            // Require exactly five '-'-separated groups in the device id.
            if (Util.lineSplit.split(deviceId, -1).length != 5) {
                return;
            }
            // Ids matching Util.match are rejected (NOTE(review): presumably placeholder ids - confirm).
            if (Util.match.matcher(deviceId).matches()) {
                return;
            }

            // Parse the JSON array of package entries with Jackson.
            List<ExtractDeviceMould> packageList = objectMapper.readValue(fields[3], javaType);
            if (packageList == null || packageList.isEmpty()) {
                // No package names (or a JSON null): nothing to emit.
                return;
            }

            StringBuilder packageNames = new StringBuilder();
            String updateDate = "";
            for (ExtractDeviceMould mould : packageList) {
                if (mould == null) {
                    continue;
                }
                String packageName = mould.getPackage_name();
                if (StringUtils.isNotBlank(packageName)) {
                    // Skip missing names rather than emitting the literal text "null".
                    packageNames.append('#').append(packageName);
                }
                // Track the most recent activity date across all entries.
                updateDate = latestDate(dateFormat, updateDate, mould.getDate());
            }

            if (packageNames.length() == 0) {
                return;
            }
            outKey.set(deviceId);
            outValue.set(MRUtils.JOINER.join(
                    packageNames.substring(1), // package_names, '#'-separated (leading '#' dropped)
                    fields[1],                 // device_type
                    updateDate                 // update_date
            ));
            context.write(outKey, outValue);
        }
    }

    /**
     * Merges all mapper records for one device id into a single output record.
     *
     * <p>Package names are concatenated with {@code '#'} (duplicates are not removed). The device
     * type is taken from the last value iterated. The update date is the latest parseable date.
     */
    public static class ExtractReducer extends Reducer<Text, Text, Text, Text> {
        Text outKey = new Text();
        Text outValue = new Text();
        // Reducer.run() calls reduce() sequentially on one instance, so this is safe to reuse.
        private final SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_PATTERN);

        @Override
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            String deviceId = key.toString();
            if (deviceId.isEmpty()) {
                return;
            }

            StringBuilder packageNames = new StringBuilder();
            String deviceType = "";
            String updateDate = "";
            for (Text value : values) {
                // Mapper value layout: package_names, device_type, update_date.
                String[] arr = MRUtils.SPLITTER.split(value.toString(), -1);
                if (arr.length < 3) {
                    continue; // malformed value; previously an ArrayIndexOutOfBoundsException
                }
                deviceType = arr[1];
                // The device's most recent activity date.
                updateDate = latestDate(dateFormat, updateDate, arr[2]);
                packageNames.append('#').append(arr[0]);
            }

            if (packageNames.length() == 0) {
                // Every value was malformed; substring(1) would otherwise throw.
                return;
            }
            outKey.set(deviceId);
            outValue.set(MRUtils.JOINER.join("B", packageNames.substring(1), deviceType + "#" + updateDate));
            context.write(outKey, outValue);
        }
    }
}