1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
package mobvista.dmp.datasource.dm.mapreduce;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import mobvista.dmp.common.CommonMapReduce;
import mobvista.dmp.datasource.ga.mapreduce.vo.TextPairSort;
import mobvista.dmp.util.MRUtils;
import mobvista.prd.datasource.util.GsonUtil;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
import java.util.regex.Pattern;
/**
 * Created by liushuai
 * desc: merges the data under the dm_interest_tag directory into a single unified "all" partition.
 *
 * <p>Each input line is split with {@code MRUtils.SPLITTER} and must have exactly four fields:
 * deviceId, deviceType, platform, and a JSON array of interest elements. Each element is an object
 * that may carry {@code date}, {@code tag} and {@code package_name}. For every
 * (deviceId, deviceType, platform, package_name) group the job writes one gzip-compressed line,
 * joined with {@code MRUtils.JOINER}: deviceId, deviceType, platform, and a JSON object built from
 * the first value the reducer receives for that group.
 */
public class DmInterestAllMR extends Configured implements Tool {

    /** Placeholder date used when an interest element has no date; it is never written to the output. */
    private static final String DEFAULT_DATE = "1970-01-01";

    /** Hadoop counter group under which the mapper reports skipped input. */
    private static final String COUNTER_GROUP = "DmInterestAllMR";

    /**
     * Entry point: runs the job through {@link ToolRunner} and exits with its return code.
     *
     * @param args generic Hadoop options followed by {@code <input path> <output path>}
     * @throws Exception if the job cannot be run
     */
    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new DmInterestAllMR(), args));
    }

    /**
     * Configures and runs the job, blocking until it completes.
     *
     * @param args generic Hadoop options followed by {@code <input path> <output path>}
     * @return 0 if the job succeeded, 1 if it failed, 2 if the two required paths were not supplied
     * @throws Exception if job submission or execution fails
     */
    @Override
    public int run(String[] args) throws Exception {
        String[] otherArgs = new GenericOptionsParser(this.getConf(), args).getRemainingArgs();
        if (otherArgs.length < 2) {
            // Fail with a usage message instead of an ArrayIndexOutOfBoundsException below.
            System.err.println("Usage: DmInterestAllMR [generic options] <input path> <output path>");
            ToolRunner.printGenericCommandUsage(System.err);
            return 2;
        }

        // NOTE: these are applied after GenericOptionsParser has run, so they override any -D values
        // passed on the command line for the same keys.
        Configuration conf = this.getConf();
        conf.set("mapreduce.map.speculative", "true");
        conf.set("mapreduce.reduce.speculative", "true");
        conf.set("mapreduce.task.io.sort.mb", "500");
        conf.set("mapreduce.reduce.java.opts", "-Xmx1536m");
        conf.set("mapreduce.reduce.memory.mb", "2048");
        conf.set("mapreduce.reduce.shuffle.parallelcopies", "50");

        Job job = Job.getInstance(conf, "DmInterestAllMR");
        job.setJarByClass(DmInterestAllMR.class);
        job.setMapperClass(DmInterestAllMapper.class);
        job.setReducerClass(DmInterestAllReducer.class);
        job.setMapOutputKeyClass(TextPairSort.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        // Secondary sort: partition and group on the first part of the composite key only.
        job.setGroupingComparatorClass(TextPairSort.FirstComparator.class);
        job.setPartitionerClass(TextPairSort.FirstPartitioner.class);
        job.setInputFormatClass(TextInputFormat.class);
        FileOutputFormat.setCompressOutput(job, true);
        FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    /**
     * Explodes each device's interest array into one record per interest element.
     *
     * <p>Uses a secondary sort so that, for each package, records reach the reducer ordered by date,
     * newest first. The ordering itself is implemented by {@code TextPairSort} and is not visible
     * here; NOTE(review): confirm that TextPairSort sorts the second key part descending.
     */
    public static class DmInterestAllMapper extends Mapper<LongWritable, Text, TextPairSort, Text> {
        // Compiled once per JVM; String.matches would recompile each regex for every input record.
        private static final Pattern DID_PATTERN = Pattern.compile(CommonMapReduce.didPtn);
        private static final Pattern ANDROID_ID_PATTERN = Pattern.compile(CommonMapReduce.andriodIdPtn);
        private static final Pattern IMEI_PATTERN = Pattern.compile(CommonMapReduce.imeiPtn);

        Text outValue = new Text();
        private final TextPairSort outKey = new TextPairSort();

        /**
         * Emits one record per interest element of a well-formed line with an accepted device id.
         *
         * <p>Output key: (deviceId, deviceType, platform, package_name) joined with
         * {@code MRUtils.JOINER}, plus the element's date as the secondary sort part.
         * Output value: date and the element's {@code tag} JSON joined with {@code MRUtils.JOINER}.
         * Lines without exactly four fields and unaccepted device ids are ignored. Unparseable arrays
         * and elements that are not objects or have no {@code tag} are skipped and counted under
         * {@link #COUNTER_GROUP} instead of failing the task.
         */
        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] fields = MRUtils.SPLITTER.split(value.toString(), -1);
            if (fields.length != 4) {
                return;
            }
            String deviceId = fields[0];
            if (!isAcceptedDeviceId(deviceId)) {
                return;
            }
            String deviceType = fields[1];
            String platform = fields[2];

            JsonArray tagsArray = GsonUtil.String2JsonArray(fields[3]);
            if (tagsArray == null) {
                context.getCounter(COUNTER_GROUP, "skipped_unparsed_interest_array").increment(1);
                return;
            }
            for (JsonElement tagElement : tagsArray) {
                if (tagElement == null || !tagElement.isJsonObject()) {
                    context.getCounter(COUNTER_GROUP, "skipped_non_object_element").increment(1);
                    continue;
                }
                JsonObject jsonObject = tagElement.getAsJsonObject();
                JsonElement tagJson = jsonObject.get("tag");
                if (tagJson == null) {
                    context.getCounter(COUNTER_GROUP, "skipped_missing_tag").increment(1);
                    continue;
                }
                String date = getDate(jsonObject.get("date"));
                String packageName = getPackageName(jsonObject.get("package_name"));
                // The meaning of the third argument ("false") is defined by TextPairSort.
                outKey.set(MRUtils.JOINER.join(
                        deviceId,
                        deviceType,
                        platform,
                        packageName
                ), date, "false");
                outValue.set(MRUtils.JOINER.join(
                        date,
                        tagJson.toString()
                ));
                context.write(outKey, outValue);
            }
        }

        /**
         * Returns whether the device id is accepted: a non-all-zero match of
         * {@code CommonMapReduce.didPtn}, or a match of the Android-id or IMEI pattern.
         */
        private static boolean isAcceptedDeviceId(String deviceId) {
            return (DID_PATTERN.matcher(deviceId).matches() && !CommonMapReduce.allZero.equals(deviceId))
                    || ANDROID_ID_PATTERN.matcher(deviceId).matches()
                    || IMEI_PATTERN.matcher(deviceId).matches();
        }

        /** Returns the element's string value, or {@link #DEFAULT_DATE} when absent or JSON null. */
        private static String getDate(JsonElement element) {
            if (element != null && !element.isJsonNull()) {
                return element.getAsString();
            }
            return DEFAULT_DATE;
        }

        /** Returns the element's string value, or the empty string when absent or JSON null. */
        private static String getPackageName(JsonElement element) {
            if (element != null && !element.isJsonNull()) {
                return element.getAsString();
            }
            return "";
        }
    }

    /**
     * Writes one line per (deviceId, deviceType, platform, package_name) group.
     *
     * <p>Only the first value of each group is used. Given the mapper's secondary sort, this is meant
     * to be the most recent date. The emitted JSON object contains {@code date} (omitted when empty
     * or the placeholder date), {@code package_name} (omitted when empty) and {@code tag} (parsed via
     * {@code GsonUtil.String2JsonArray}).
     */
    public static class DmInterestAllReducer extends Reducer<TextPairSort, Text, Text, NullWritable> {
        private final Text outKey = new Text();

        @Override
        protected void reduce(TextPairSort key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Limit 4: the first three parts came from splitting the input line, so everything after
            // the third delimiter belongs to the package name.
            String[] keySplit = MRUtils.SPLITTER.split(key.getFirst().toString(), 4);
            String packageName = keySplit[3];

            // Take the most recent record, i.e. the first value of the group.
            // Limit 2 keeps the tag JSON intact even if it contains the delimiter.
            String[] valSplits = MRUtils.SPLITTER.split(values.iterator().next().toString(), 2);
            String date = valSplits[0];
            String tag = valSplits[1];

            JsonObject jsonObject = new JsonObject();
            if (StringUtils.isNotEmpty(date) && !DEFAULT_DATE.equals(date)) {
                jsonObject.addProperty("date", date);
            }
            if (StringUtils.isNotEmpty(packageName)) {
                jsonObject.addProperty("package_name", packageName);
            }
            jsonObject.add("tag", GsonUtil.String2JsonArray(tag));

            outKey.set(MRUtils.JOINER.join(
                    keySplit[0], // deviceId
                    keySplit[1], // deviceType
                    keySplit[2], // platform
                    jsonObject.toString()
            ));
            context.write(outKey, NullWritable.get());
        }
    }
}