1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
package mobvista.prd.datasource.source.mapreduce;
import com.google.common.collect.Lists;
import mobvista.prd.datasource.table.MergeAppIDMR;
import mobvista.prd.datasource.util.MRUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
/**
 * Created by Administrator on 2017/4/21 0021.
 * desc: computes the daily active and brand-new device counts for the ga, dsp,
 * m-system and 3s data sources.
 *
 * <p>The job reads five tab-separated inputs whose first two columns are
 * {@code device_id} and {@code device_type}. The mapper tags every record with
 * the source its input file belongs to (decided from substrings of the file
 * path), and the single reducer counts the devices that do not carry the
 * {@code oldDay} tag. The only output record is one line of seven counters:
 * {@code ga, dsp, m, 3s, all, idfa, gaid}.
 *
 * <p>The configuration values {@code day} and {@code oldDay} are read by the
 * tasks; {@link #main} does not set them, so they are presumably supplied as
 * {@code -D} generic options (confirm against the launcher script).
 */
public class SourceDailyMR {

    /** Number of input paths expected on the command line; one output path follows them. */
    private static final int INPUT_PATH_COUNT = 5;

    /**
     * Configures and runs the job.
     *
     * @param args generic Hadoop options followed by five input paths and one output path
     * @throws IOException            if the job cannot be set up or submitted
     * @throws ClassNotFoundException if a job class cannot be loaded
     * @throws InterruptedException   if waiting for the job is interrupted
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

        // Fail with a readable message instead of an ArrayIndexOutOfBoundsException below.
        if (otherArgs.length < INPUT_PATH_COUNT + 1) {
            System.err.println("Usage: SourceDailyMR [-D day=<day>] [-D oldDay=<oldDay>] "
                    + "<in1> <in2> <in3> <in4> <in5> <out>");
            System.exit(2);
        }

        Job job = Job.getInstance(conf, "daily ga dsp 3s m");
        // Locate the jar through this job's own class rather than an unrelated one.
        job.setJarByClass(SourceDailyMR.class);
        // A single reducer is required: the counters are accumulated in reducer fields
        // and written once in cleanup().
        job.setNumReduceTasks(1);

        job.setMapperClass(AllMergeMRMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setReducerClass(AllMergeMRReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        for (int i = 0; i < INPUT_PATH_COUNT; i++) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        }
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[INPUT_PATH_COUNT]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    /**
     * Emits {@code (device_id \t device_type, tag)} for every usable input line, where
     * the tag is {@code 3s}, {@code ga}, {@code dsp}, {@code m} or the {@code oldDay}
     * value, chosen from the path of the file the task is reading.
     */
    public static class AllMergeMRMapper extends Mapper<LongWritable, Text, Text, Text> {
        Text outKey = new Text();
        Text outValue = new Text();
        String today = "";
        String oldDay = "";

        /** Tag for the current task's input file; {@code null} when the file matches no source. */
        private String sourceTag;
        /** Whether {@link #sourceTag} has been computed for this task. */
        private boolean sourceTagResolved;

        /**
         * Reads the {@code day} and {@code oldDay} settings.
         *
         * @throws IllegalStateException if either setting is missing
         */
        @Override
        public void setup (Context context) {
            Configuration conf = context.getConfiguration();
            today = requireSetting(conf, "day");
            oldDay = requireSetting(conf, "oldDay");
        }

        /**
         * Tags one input line with its source and forwards it to the reducer.
         * Lines from files that match no source are dropped, as are lines with
         * fewer than two tab-separated columns (these are counted).
         */
        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            if (!sourceTagResolved) {
                // The input file is fixed for the whole task, so classify it only once.
                sourceTag = classify(resolveInputFile(context));
                sourceTagResolved = true;
            }
            if (sourceTag == null) {
                return;
            }

            String[] fields = value.toString().split("\t", -1);
            if (fields.length < 2) {
                // Malformed line: skip it rather than failing the whole task.
                context.getCounter("SourceDailyMR", "MALFORMED_LINES").increment(1);
                return;
            }

            outValue.set(sourceTag);
            outKey.set(MRUtils.JOINER.join(fields[0], fields[1])); // device_id, device_type
            context.write(outKey, outValue);
        }

        /** Returns the named configuration value, or fails with a descriptive message. */
        private static String requireSetting(Configuration conf, String name) {
            String value = conf.get(name);
            if (value == null) {
                throw new IllegalStateException(
                        "Missing required configuration value '" + name + "' (pass -D " + name + "=...)");
            }
            return value;
        }

        /**
         * Returns the path of the file this task reads. The {@code map.input.file}
         * property is consulted first, as before; when it is absent (it may not be
         * populated for new-API mappers) the path is taken from the task's file split.
         */
        private String resolveInputFile(Context context) throws IOException {
            String file = context.getConfiguration().get("map.input.file");
            if (file == null) {
                InputSplit split = context.getInputSplit();
                if (split instanceof FileSplit) {
                    file = ((FileSplit) split).getPath().toString();
                }
            }
            if (file == null) {
                throw new IOException("Cannot determine the input file of this map task");
            }
            return file;
        }

        /** Maps an input file path to its source tag, or {@code null} when nothing matches. */
        private String classify(String file) {
            if (file.contains(today)) {
                if (file.contains("3s")) {
                    return "3s";
                }
                if (file.contains("ga")) {
                    return "ga";
                }
                if (file.contains("dsp")) {
                    return "dsp";
                }
                if (file.contains("adn")) {
                    return "m";
                }
            }
            return file.contains(oldDay) ? oldDay : null;
        }
    }

    /**
     * Counts, over all device keys, the devices that have no {@code oldDay} tag:
     * per source ({@code ga}, {@code dsp}, {@code m}, {@code 3s}), in total, and
     * per device type ({@code idfa}, {@code gaid}). The totals are written as a
     * single record in {@link #cleanup}.
     */
    public static class AllMergeMRReducer extends Reducer<Text, Text, Text, NullWritable> {
        long ga = 0L;
        long dsp = 0L;
        long m = 0L;
        long s = 0L;
        long all = 0L;
        long idfa = 0L;
        long gaid = 0L;
        Text outKey = new Text();
        String today = "";
        String oldDay = "";

        /** Reads the {@code day} and {@code oldDay} settings. */
        @Override
        public void setup (Context context) {
            today = context.getConfiguration().get("day");
            oldDay = context.getConfiguration().get("oldDay");
        }

        /**
         * Updates the counters for one device. Devices that carry the
         * {@code oldDay} tag contribute to no counter.
         */
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // Only membership matters, so collect the distinct tags once.
            Set<String> tags = new HashSet<String>();
            for (Text val : values) {
                tags.add(val.toString());
            }
            if (tags.contains(oldDay)) {
                return;
            }

            all++;
            if (tags.contains("ga")) {
                ga++;
            }
            if (tags.contains("dsp")) {
                dsp++;
            }
            if (tags.contains("m")) {
                m++;
            }
            if (tags.contains("3s")) {
                s++;
            }

            // Limit -1 keeps a trailing empty device_type, so index 1 exists for "id\t".
            String[] keyParts = key.toString().split("\t", -1);
            if (keyParts.length > 1) {
                String type = keyParts[1];
                if (type.contains("idfa")) {
                    idfa++;
                }
                if (type.contains("gaid")) {
                    gaid++;
                }
            }
        }

        /** Writes the single output record: ga, dsp, m, 3s, all, idfa, gaid. */
        @Override
        public void cleanup (Context context) throws IOException, InterruptedException {
            outKey.set(MRUtils.JOINER.join(
                    ga,
                    dsp,
                    m,
                    s,
                    all,
                    idfa,
                    gaid
            ));
            context.write(outKey, NullWritable.get());
        }
    }
}