1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
package mobvista.dmp.datasource.dm.mapreduce;
import com.google.common.collect.Maps;
import mobvista.dmp.common.CommonMapReduce;
import mobvista.dmp.util.MRUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.type.JavaType;
import java.io.IOException;
import java.util.*;
import java.util.regex.Pattern;
/**
* Created by liushuai
* desc :整合dm_interest_tag目录下的数据,统一分区all
*/
public class DmInterestAllMR1 {
public static void main(String[] args) throws InterruptedException, IOException, ClassNotFoundException {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
Job job = Job.getInstance(conf, "DmInterestAllMR1");
job.setJarByClass(DmInterestAllMR1.class);
FileOutputFormat.setCompressOutput(job, true);
FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
job.setMapperClass(DmInterestAllMapper.class);
job.setReducerClass(DmInterestAllReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
public static class DmInterestAllMapper extends Mapper<LongWritable, Text, Text, Text> {
Text outKey = new Text();
Text outValue = new Text();
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] fields = MRUtils.SPLITTER.split(line,-1);
if (fields.length != 4) {
return;
}
if (fields[0].matches(CommonMapReduce.didPtn) && !CommonMapReduce.allZero.equals(fields[0])) {
outKey.set(MRUtils.JOINER.join(
fields[0], //device_id
fields[1], //device_type
fields[2] //platform
));
outValue.set(fields[3]);
context.write(outKey, outValue);
}
}
}
public static class DmInterestAllReducer extends Reducer<Text, Text, Text, NullWritable> {
private String date;
private Text outKey = new Text();
private ObjectMapper objectMapper = new ObjectMapper(); //转换器
private JavaType listType = objectMapper.getTypeFactory().constructCollectionType(ArrayList.class, Map.class);
@Override
protected void setup(Context context) throws IOException, InterruptedException {
date = context.getConfiguration().get("task.date");
}
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
List<String> arrList = new ArrayList<String>();
Map<String, List> outMap = Maps.newHashMap();
for (Text val : values) {//device_id相同的不同目录下的包
arrList.add(val.toString());
}
if (arrList.size() == 1) {//device_id一个,其他分区没有,直接输出
outKey.set(MRUtils.JOINER.join(key.toString(),arrList.get(0)));
context.write(outKey, NullWritable.get());
} else if (arrList.size() > 1) {//deivice_id有多个,不同分区
List<Map<String, String>> tagList = null;//暂时存储tag里的数据
Map<String, String> packageDateMap = new HashMap<String, String>(); // 存储package对应的最后一次访问date
for (String line : arrList) {
List<Map<String, Object>> packageNameList = objectMapper.readValue(line, listType);//json转换成list
for (Map tagMap : packageNameList) {
String date = (String) tagMap.get("date");
String packageName = (String) tagMap.get("package_name");
if (!outMap.containsKey(packageName)) {//不包含包名,第一次出现
tagList = (List) tagMap.get("tag");
outMap.put(packageName, tagList);
// 将date放到map中
if (StringUtils.isNotEmpty(date)) {
packageDateMap.put(packageName, date);
}
} else {//不是第一次出现
// 判断日期,如果date > oldDate,将package对应的日期换成date
String oldDate = packageDateMap.get(packageName);
if (StringUtils.isNotEmpty(date)
&& StringUtils.isNotEmpty(oldDate)
&& oldDate.compareTo(date) < 0) {
packageDateMap.put(packageName, date);
}
}
}
}
Map.Entry<String, List> entry = null;
Iterator<Map.Entry<String, List>> itr = outMap.entrySet().iterator();
List<Map<String, Object>> outList = new ArrayList<Map<String, Object>>();
while (itr.hasNext()) {
entry = itr.next();
String packageName = entry.getKey();
Map<String, Object> app = Maps.newHashMap();
if (StringUtils.isNotEmpty(packageName)) {
app.put("package_name", packageName);
}
if (StringUtils.isNotEmpty(packageDateMap.get(packageName))) {
app.put("date", packageDateMap.get(packageName));
}
app.put("tag", outMap.get(packageName));
outList.add(app);
}
outKey.set(MRUtils.JOINER.join(
key.toString(),//device_id,手机品牌,gaid或idfa
objectMapper.writeValueAsString(outList) // tag
));
context.write(outKey,NullWritable.get());
}
}
/**
* 这是一个临时方案,当数据时间大于当前认识时间时,取
* @param date
* @return
*/
protected String getCurrentDate(String date) {
if (this.date.compareTo(date) < 0) {
return this.date;
}
return date;
}
}
}