package mobvista.dmp.format;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.orc.OrcFile;
import org.apache.orc.Writer;
import org.apache.orc.mapreduce.OrcMapreduceRecordWriter;

import java.io.IOException;
import java.text.NumberFormat;
import java.util.HashMap;

/**
 * A {@link FileOutputFormat} that writes ORC files into per-key
 * subdirectories: each record lands in
 * {@code ${output.dir}/${key}/${taskId}[.codecExt].orc}, where the task id
 * is zero-padded to five digits, so a single job can fan records out to
 * multiple ORC outputs.
 */
public class MultipleOrcOutputFormat<K extends WritableComparable<?>, V extends Writable & WritableComparable>
        extends FileOutputFormat<K, V> {

    private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance();

    static {
        NUMBER_FORMAT.setMinimumIntegerDigits(5);
        NUMBER_FORMAT.setGroupingUsed(false);
    }

    private MultiRecordWriter writer = null;

    @Override
    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job)
            throws IOException, InterruptedException {
        if (writer == null) {
            writer = new MultiRecordWriter(job, getTaskOutputPath(job));
        }
        return writer;
    }

    /**
     * Resolves the task's work path: the committer's temporary work
     * directory when a {@link FileOutputCommitter} is in use, otherwise the
     * job output path.
     */
    private Path getTaskOutputPath(TaskAttemptContext conf) throws IOException {
        OutputCommitter committer = super.getOutputCommitter(conf);
        if (committer instanceof FileOutputCommitter) {
            return ((FileOutputCommitter) committer).getWorkPath();
        }
        Path outputPath = getOutputPath(conf);
        if (outputPath == null) {
            throw new IOException("Undefined job output-path");
        }
        return outputPath;
    }

    /**
     * Maps a record to its output file name. Override to change the layout;
     * by default the key becomes a subdirectory of the work path.
     */
    protected String generateFileNameForKeyValue(K key, V value, String name) {
        return key.toString() + "/" + name;
    }

    public class MultiRecordWriter extends RecordWriter<K, V> {
        /** One underlying ORC writer per generated file name. */
        private final HashMap<String, RecordWriter<NullWritable, V>> recordWriters;
        private final TaskAttemptContext job;
        private final Path workPath;

        public MultiRecordWriter(TaskAttemptContext job, Path workPath) {
            this.job = job;
            this.workPath = workPath;
            this.recordWriters = new HashMap<>();
        }

        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            for (RecordWriter<NullWritable, V> rw : recordWriters.values()) {
                rw.close(context);
            }
            recordWriters.clear();
        }

        @Override
        public void write(K key, V value) throws IOException, InterruptedException {
            TaskID taskId = job.getTaskAttemptID().getTaskID();
            int partition = taskId.getId();
            String baseName = generateFileNameForKeyValue(key, value, NUMBER_FORMAT.format(partition));
            RecordWriter<NullWritable, V> rw = recordWriters.get(baseName);
            if (rw == null) {
                rw = getBaseRecordWriter(job, baseName);
                recordWriters.put(baseName, rw);
            }
            rw.write(NullWritable.get(), value);
        }

        // ${mapred.out.dir}/_temporary/_${taskid}/${nameWithExtension}
        protected RecordWriter<NullWritable, V> getBaseRecordWriter(TaskAttemptContext job, String baseName)
                throws IOException, InterruptedException {
            Configuration conf = job.getConfiguration();
            FileSystem fs = workPath.getFileSystem(conf);

            // ORC compresses internally (configured via "orc.compress"), so the
            // codec chosen here only contributes its extension to the file name.
            String extension = ".orc";
            if (getCompressOutput(job)) {
                Class<? extends CompressionCodec> codecClass =
                        getOutputCompressorClass(job, GzipCodec.class);
                CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf);
                extension = codec.getDefaultExtension() + ".orc";
            }

            Path file = new Path(workPath, baseName + extension);
            // OrcFile.createWriter() fails if the target already exists, so
            // remove any leftover file from a previous attempt first. (The
            // original code also pre-created the file with FileSystem.create(),
            // which leaked the stream and made createWriter() fail; that call
            // has been dropped.)
            if (fs.exists(file)) {
                fs.delete(file, true);
            }
            Writer writer = OrcFile.createWriter(file,
                    org.apache.orc.mapred.OrcOutputFormat.buildOptions(conf));
            return new OrcMapreduceRecordWriter<V>(writer);
        }
    }
}
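
/*
 * Usage sketch (not part of the original file; the class name, job name,
 * schema, and paths below are assumptions for illustration). It shows the
 * one piece of wiring this format needs: OrcOutputFormat.buildOptions(conf)
 * reads the ORC schema from "orc.mapred.output.schema", so that property
 * must be set before the first record is written. Fully qualified names are
 * used so the sketch adds no imports to the file above.
 */
class MultipleOrcOutputFormatUsageExample {
    public static void main(String[] args) throws Exception {
        org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
        // Schema applied by OrcFile.createWriter() to every per-key file.
        conf.set("orc.mapred.output.schema", "struct<device_id:string,platform:string>");

        org.apache.hadoop.mapreduce.Job job =
                org.apache.hadoop.mapreduce.Job.getInstance(conf, "multiple-orc-output-example");
        job.setJarByClass(MultipleOrcOutputFormatUsageExample.class);

        // Keys select the output subdirectory; values must be ORC-writable,
        // e.g. org.apache.orc.mapred.OrcStruct. (Mapper/reducer setup that
        // actually produces such pairs is omitted here.)
        job.setOutputKeyClass(org.apache.hadoop.io.Text.class);
        job.setOutputValueClass(org.apache.orc.mapred.OrcStruct.class);
        job.setOutputFormatClass(MultipleOrcOutputFormat.class);

        org.apache.hadoop.mapreduce.lib.input.FileInputFormat.addInputPath(
                job, new org.apache.hadoop.fs.Path(args[0]));
        org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.setOutputPath(
                job, new org.apache.hadoop.fs.Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}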