package mobvista.dmp.format;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.RCFile;
import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;
import java.io.IOException;
import java.text.NumberFormat;
import java.util.HashMap;
import java.util.Iterator;
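/**
 * An abstract {@link FileOutputFormat} that writes RCFile output to multiple
 * files within one task: the target file for each record is chosen by
 * {@link #generateFileNameForKeyValue}, and one {@link RCFile.Writer} is
 * opened lazily per distinct file name and closed when the task finishes.
 */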
@SuppressWarnings("rawtypes")
public abstract class RCMultipleOutputFormat<K extends WritableComparable, V extends BytesRefArrayWritable> extends
FileOutputFormat<K, V>
{
    private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance();
    static
    {
        // Match Hadoop's part-file naming: five digits, no grouping separators.
        // The default NumberFormat would render partition 1234 as "1,234",
        // which is not a valid part-file suffix.
        NUMBER_FORMAT.setMinimumIntegerDigits(5);
        NUMBER_FORMAT.setGroupingUsed(false);
    }
    /**
     * Sets the number of columns in the given configuration.
     *
     * @param conf configuration instance on which to set the column number
     * @param columnNum number of columns for RCFile's Writer
     */
public static void setColumnNumber(Configuration conf, int columnNum)
{
assert columnNum > 0;
conf.setInt(RCFile.COLUMN_NUMBER_CONF_STR, columnNum);
}
    /**
     * Returns the number of columns set in the conf for writers.
     *
     * @param conf configuration instance to read the column number from
     * @return number of columns for RCFile's writer
     */
public static int getColumnNumber(Configuration conf)
{
return conf.getInt(RCFile.COLUMN_NUMBER_CONF_STR, 0);
}
private MultiRecordWriter writer = null;
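    /**
     * Returns the task's single {@link MultiRecordWriter}, creating it on
     * first use; every record written by the task attempt is routed through it.
     */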
public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) throws IOException,
InterruptedException
{
if (writer == null)
{
writer = new MultiRecordWriter(job, getTaskOutputPath(job));
}
return writer;
}
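    /**
     * Resolves the directory this task should write into: the committer's
     * work path when a {@link FileOutputCommitter} is in use, otherwise the
     * job's configured output path.
     */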
private Path getTaskOutputPath(TaskAttemptContext conf) throws IOException
{
Path workPath = null;
OutputCommitter committer = super.getOutputCommitter(conf);
if (committer instanceof FileOutputCommitter)
{
workPath = ((FileOutputCommitter) committer).getWorkPath();
} else
{
Path outputPath = super.getOutputPath(conf);
if (outputPath == null)
{
throw new IOException("Undefined job output-path");
}
workPath = outputPath;
}
return workPath;
}
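    /**
     * Returns the file name, relative to the task work path, that the given
     * record should be written to; {@code name} is the zero-padded partition
     * number of the current task.
     */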
protected abstract String generateFileNameForKeyValue(K key, V value, String name);
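    /**
     * Returns the key to pass to the underlying writer. Note that the RCFile
     * writer appends only the value, so subclasses typically return the key
     * unchanged or strip routing information from it.
     */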
protected abstract K generateActualKey(K key, V value);
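    /**
     * A {@link RecordWriter} that lazily opens one RCFile writer per distinct
     * file name and dispatches each record to the writer for its name.
     */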
public class MultiRecordWriter extends RecordWriter<K, V>
{
private HashMap<String, RecordWriter<K, V>> recordWriters = null;
private TaskAttemptContext job = null;
private Path workPath = null;
public MultiRecordWriter(TaskAttemptContext job, Path workPath)
{
super();
this.job = job;
this.workPath = workPath;
recordWriters = new HashMap<String, RecordWriter<K, V>>();
}
@Override
public void close(TaskAttemptContext context) throws IOException, InterruptedException
{
Iterator<RecordWriter<K, V>> values = this.recordWriters.values().iterator();
while (values.hasNext())
{
values.next().close(context);
}
this.recordWriters.clear();
}
@Override
public void write(K key, V value) throws IOException, InterruptedException
{
            // Determine the output file name for this record
TaskID taskId = job.getTaskAttemptID().getTaskID();
int partition = taskId.getId();
String baseName = generateFileNameForKeyValue(key, value, NUMBER_FORMAT.format(partition));
RecordWriter<K, V> rw = this.recordWriters.get(baseName);
if (rw == null)
{
rw = getBaseRecordWriter(job, baseName);
this.recordWriters.put(baseName, rw);
}
key = generateActualKey(key, value);
rw.write(key, value);
}
        // Opens an RCFile writer under the task work path:
        // ${mapred.out.dir}/_temporary/_${taskid}/${nameWithExtension}
private RecordWriter<K, V> getBaseRecordWriter(TaskAttemptContext job, String baseName)
throws IOException, InterruptedException
{
Configuration conf = job.getConfiguration();
Path path = new Path(workPath, baseName);
FileSystem fs = path.getFileSystem(conf);
CompressionCodec codec = null;
if (getCompressOutput(job))
{
Class<?> codecClass = getOutputCompressorClass(job, DefaultCodec.class);
codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
}
final RCFile.Writer out = new RCFile.Writer(fs, conf, path, job, codec);
return new RecordWriter<K, V>()
{
@Override
public void close(TaskAttemptContext job) throws IOException
{
out.close();
}
            @Override
            public void write(K key, V value) throws IOException
            {
                // RCFile rows carry all columns in the value; the key is not persisted.
                out.append(value);
            }
};
}
}
}
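For reference, here is a minimal sketch of a concrete subclass. The class name CountryRCOutputFormat, the Text key type, the directory-per-key naming scheme, and the driver settings are illustrative assumptions, not part of the original code:

// --- Usage sketch (illustrative; not part of the original class) ---
// A hypothetical concrete subclass that routes each record into a
// subdirectory named after its Text key, e.g. "US/part-00000".
package mobvista.dmp.format;

import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
import org.apache.hadoop.io.Text;

public class CountryRCOutputFormat extends RCMultipleOutputFormat<Text, BytesRefArrayWritable>
{
    @Override
    protected String generateFileNameForKeyValue(Text key, BytesRefArrayWritable value, String name)
    {
        // "name" is the zero-padded partition number of the current task.
        return key.toString() + "/part-" + name;
    }

    @Override
    protected Text generateActualKey(Text key, BytesRefArrayWritable value)
    {
        // RCFile persists only the value, so the key passes through unchanged.
        return key;
    }
}

// Hypothetical driver wiring (column count and output path are examples):
//   Job job = Job.getInstance(conf, "rc-multi-output");
//   job.setOutputFormatClass(CountryRCOutputFormat.class);
//   RCMultipleOutputFormat.setColumnNumber(job.getConfiguration(), 4);
//   FileOutputFormat.setOutputPath(job, new Path("/tmp/rc-out"));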