1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
package mobvista.dmp.format;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.orc.OrcFile;
import org.apache.orc.Writer;
import org.apache.orc.mapreduce.OrcMapreduceRecordWriter;
import java.io.IOException;
import java.text.NumberFormat;
import java.util.HashMap;
import java.util.Iterator;
/**
 * A {@link org.apache.hadoop.mapreduce.lib.output.FileOutputFormat} that writes each record
 * into an ORC file whose path is derived from the record's key: output lands under
 * {@code <workPath>/<key>/<partition>.orc}, so records with distinct keys are split into
 * distinct output files within a single task.
 */
public class MultipleOrcOutputFormat<K extends WritableComparable<?>, V extends Writable & WritableComparable> extends FileOutputFormat<K, V> {

    /** Formats the task partition id as a zero-padded, at-least-5-digit string (e.g. "00007"). */
    private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance();

    static {
        NUMBER_FORMAT.setMinimumIntegerDigits(5);
        NUMBER_FORMAT.setGroupingUsed(false);
    }

    // Cached so repeated getRecordWriter() calls within one task attempt share the same
    // per-key writer map (and therefore do not open duplicate files for the same key).
    private MultipleOrcOutputFormat.MultiRecordWriter writer = null;

    /**
     * Returns the (lazily created, cached) multi-file record writer for this task attempt.
     *
     * @param job the current task attempt context
     * @return a writer that fans records out to per-key ORC files
     * @throws IOException if the task output path cannot be resolved
     */
    @Override
    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job)
            throws IOException, InterruptedException {
        if (writer == null) {
            writer = new MultipleOrcOutputFormat.MultiRecordWriter(job, getTaskOutputPath(job));
        }
        return writer;
    }

    /**
     * Resolves the directory this task should write into: the committer's temporary work
     * path when a {@link FileOutputCommitter} is in use, otherwise the job's output path.
     *
     * @param conf the current task attempt context
     * @return the task's output directory
     * @throws IOException if neither a work path nor a job output path is available
     */
    private Path getTaskOutputPath(TaskAttemptContext conf) throws IOException {
        OutputCommitter committer = super.getOutputCommitter(conf);
        if (committer instanceof FileOutputCommitter) {
            return ((FileOutputCommitter) committer).getWorkPath();
        }
        Path outputPath = super.getOutputPath(conf);
        if (outputPath == null) {
            throw new IOException("Undefined job output-path");
        }
        return outputPath;
    }

    /**
     * Builds the relative file name for a record: the key becomes a subdirectory and
     * {@code name} (the zero-padded partition id) becomes the file's base name.
     * Subclasses may override to change the partitioning scheme.
     *
     * @param key   the record key (its {@code toString()} is used as the directory name)
     * @param value the record value (unused by the default implementation)
     * @param name  the partition-derived base file name
     * @return the relative path {@code key/name}
     */
    protected String generateFileNameForKeyValue(K key, V value, String name) {
        return key.toString() + "/" + name;
    }

    /**
     * Fans records out to one ORC writer per generated base name, creating writers lazily
     * and closing them all when the task finishes.
     */
    public class MultiRecordWriter extends RecordWriter<K, V> {
        // One underlying ORC writer per generated file name (i.e. per distinct key).
        private HashMap<String, RecordWriter<NullWritable, V>> recordWriters = null;
        private TaskAttemptContext job = null;
        private Path workPath = null;

        public MultiRecordWriter(TaskAttemptContext job, Path workPath) {
            super();
            this.job = job;
            this.workPath = workPath;
            recordWriters = new HashMap<String, RecordWriter<NullWritable, V>>();
        }

        /**
         * Closes every open per-key writer and clears the writer map.
         */
        @Override
        public void close(TaskAttemptContext context) throws IOException,
                InterruptedException {
            Iterator<RecordWriter<NullWritable, V>> values = this.recordWriters.values()
                    .iterator();
            while (values.hasNext()) {
                values.next().close(context);
            }
            this.recordWriters.clear();
        }

        /**
         * Routes {@code value} to the ORC file derived from {@code key}, creating the
         * underlying writer on first use. The key itself is not written to the file;
         * it only determines the destination path.
         */
        @Override
        public void write(K key, V value) throws IOException,
                InterruptedException {
            TaskID taskId = job.getTaskAttemptID().getTaskID();
            int partition = taskId.getId();
            String baseName = generateFileNameForKeyValue(key, value,
                    NUMBER_FORMAT.format(partition));
            RecordWriter<NullWritable, V> rw = this.recordWriters.get(baseName);
            if (rw == null) {
                rw = getBaseRecordWriter(job, baseName);
                this.recordWriters.put(baseName, rw);
            }
            rw.write(NullWritable.get(), value);
        }

        /**
         * Creates the underlying ORC record writer for one output file.
         * Output path: ${mapred.out.dir}/_temporary/_${taskid}/${nameWithExtension}
         *
         * <p>NOTE(review): in the compressed branch the codec only contributes its file
         * extension to the name — the ORC stream itself is not wrapped in the codec.
         * ORC compression is presumably configured separately via the ORC writer options
         * built from {@code conf}; confirm against the job configuration.
         *
         * @param job      the current task attempt context
         * @param baseName the relative file name (without the ".orc" extension)
         * @return a record writer backed by a new ORC file
         * @throws IOException if the file system cannot be reached or the file created
         */
        protected RecordWriter<NullWritable, V> getBaseRecordWriter(TaskAttemptContext job,
                String baseName) throws IOException, InterruptedException, FileAlreadyExistsException {
            Configuration conf = job.getConfiguration();
            boolean isCompressed = getCompressOutput(job);
            FileSystem fs = workPath.getFileSystem(conf);
            Path file;
            if (isCompressed) {
                Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(
                        job, GzipCodec.class);
                CompressionCodec codec = ReflectionUtils.newInstance(
                        codecClass, conf);
                // The codec's default extension is embedded in the name for discoverability.
                file = new Path(workPath, baseName
                        + codec.getDefaultExtension() + ".orc");
            } else {
                file = new Path(workPath, baseName + ".orc");
            }
            // Remove any stale file from a previous (failed) attempt: OrcFile.createWriter
            // creates the file itself and does not overwrite an existing one.
            if (fs.exists(file)) {
                fs.delete(file, true);
            }
            // Bug fix: the original uncompressed branch also opened an FSDataOutputStream
            // via fs.create(file, false) that was never used and never closed — leaking the
            // stream and pre-creating the file so OrcFile.createWriter could fail with
            // FileAlreadyExistsException. The ORC writer must create the file itself.
            Writer orcWriter = OrcFile.createWriter(file,
                    org.apache.orc.mapred.OrcOutputFormat.buildOptions(conf));
            return new OrcMapreduceRecordWriter<V>(orcWriter);
        }
    }
}