package mobvista.dmp.format; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.io.RCFile; import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.DefaultCodec; import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.TaskID; import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.ReflectionUtils; import java.io.IOException; import java.text.NumberFormat; import java.util.HashMap; import java.util.Iterator; @SuppressWarnings("rawtypes") public abstract class RCMultipleOutputFormat<K extends WritableComparable, V extends BytesRefArrayWritable> extends FileOutputFormat<K, V> { private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance(); /** * set number of columns into the given configuration. * * @param conf configuration instance which need to set the column number * @param columnNum column number for RCFile's Writer */ public static void setColumnNumber(Configuration conf, int columnNum) { assert columnNum > 0; conf.setInt(RCFile.COLUMN_NUMBER_CONF_STR, columnNum); } /** * Returns the number of columns set in the conf for writers. * * @param conf * @return number of columns for RCFile's writer */ public static int getColumnNumber(Configuration conf) { return conf.getInt(RCFile.COLUMN_NUMBER_CONF_STR, 0); } private MultiRecordWriter writer = null; public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException { if (writer == null) { writer = new MultiRecordWriter(job, getTaskOutputPath(job)); } return writer; } private Path getTaskOutputPath(TaskAttemptContext conf) throws IOException { Path workPath = null; OutputCommitter committer = super.getOutputCommitter(conf); if (committer instanceof FileOutputCommitter) { workPath = ((FileOutputCommitter) committer).getWorkPath(); } else { Path outputPath = super.getOutputPath(conf); if (outputPath == null) { throw new IOException("Undefined job output-path"); } workPath = outputPath; } return workPath; } protected abstract String generateFileNameForKeyValue(K key, V value, String name); protected abstract K generateActualKey(K key, V value); public class MultiRecordWriter extends RecordWriter<K, V> { private HashMap<String, RecordWriter<K, V>> recordWriters = null; private TaskAttemptContext job = null; private Path workPath = null; public MultiRecordWriter(TaskAttemptContext job, Path workPath) { super(); this.job = job; this.workPath = workPath; recordWriters = new HashMap<String, RecordWriter<K, V>>(); } @Override public void close(TaskAttemptContext context) throws IOException, InterruptedException { Iterator<RecordWriter<K, V>> values = this.recordWriters.values().iterator(); while (values.hasNext()) { values.next().close(context); } this.recordWriters.clear(); } @Override public void write(K key, V value) throws IOException, InterruptedException { //得到输出文件名 TaskID taskId = job.getTaskAttemptID().getTaskID(); int partition = taskId.getId(); String baseName = generateFileNameForKeyValue(key, value, NUMBER_FORMAT.format(partition)); RecordWriter<K, V> rw = this.recordWriters.get(baseName); if (rw == null) { rw = getBaseRecordWriter(job, baseName); this.recordWriters.put(baseName, rw); } key = generateActualKey(key, value); rw.write(key, value); } // ${mapred.out.dir}/_temporary/_${taskid}/${nameWithExtension} private RecordWriter<K, V> getBaseRecordWriter(TaskAttemptContext job, String baseName) throws IOException, InterruptedException { Configuration conf = job.getConfiguration(); Path path = new Path(workPath, baseName); FileSystem fs = path.getFileSystem(conf); CompressionCodec codec = null; if (getCompressOutput(job)) { Class<?> codecClass = getOutputCompressorClass(job, DefaultCodec.class); codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf); } final RCFile.Writer out = new RCFile.Writer(fs, conf, path, job, codec); return new RecordWriter<K, V>() { @Override public void close(TaskAttemptContext job) throws IOException { out.close(); } @Override public void write(WritableComparable key, BytesRefArrayWritable value) throws IOException { out.append(value); } }; } } }