package mobvista.dmp.format;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.RCFile;
import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;

import java.io.IOException;

/**
 * Output format for the {@code org.apache.hadoop.mapreduce} API that writes each
 * record's {@link BytesRefArrayWritable} value into an {@link RCFile}.
 *
 * <p>The record key is accepted for API compatibility but is never written.
 *
 * @param <K> key type (unused by the writer)
 * @param <V> value type (unused by the writer; values are handled as
 *            {@link BytesRefArrayWritable})
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class RCFileOutputFormat<K extends WritableComparable, V extends BytesRefArrayWritable>
    extends FileOutputFormat<WritableComparable, BytesRefArrayWritable> {

  /**
   * Stores the number of columns in the given configuration under
   * {@link RCFile#COLUMN_NUMBER_CONF_STR}.
   *
   * @param conf      configuration instance that receives the column number
   * @param columnNum column number for RCFile's writer; must be positive
   * @throws IllegalArgumentException if {@code columnNum} is not positive
   */
  public static void setColumnNumber(Configuration conf, int columnNum) {
    // An explicit check instead of `assert`: assertions are disabled unless the
    // JVM is started with -ea, so an invalid value would otherwise be stored silently.
    if (columnNum <= 0) {
      throw new IllegalArgumentException(
          "columnNum must be positive, got " + columnNum);
    }
    conf.setInt(RCFile.COLUMN_NUMBER_CONF_STR, columnNum);
  }

  /**
   * Returns the number of columns set in the configuration for writers.
   *
   * @param conf configuration instance to read the column number from
   * @return number of columns for RCFile's writer, or {@code 0} if none has been set
   */
  public static int getColumnNumber(Configuration conf) {
    return conf.getInt(RCFile.COLUMN_NUMBER_CONF_STR, 0);
  }

  /**
   * Creates a record writer that appends every value to an {@link RCFile.Writer}
   * opened on the task's default work file.
   *
   * <p>When output compression is enabled for the job, the configured output
   * compressor class is instantiated, with {@link DefaultCodec} passed as the
   * default; otherwise the writer is created without a codec.
   *
   * @param job the task attempt context; also passed to the RCFile writer as its
   *            progress reporter
   * @return a writer whose {@code write} ignores the key and appends the value
   * @throws IOException          if the file system or the RCFile writer cannot be opened
   * @throws InterruptedException declared by the overridden method
   */
  @Override
  public RecordWriter<WritableComparable, BytesRefArrayWritable> getRecordWriter(
      TaskAttemptContext job) throws IOException, InterruptedException {
    Configuration conf = job.getConfiguration();

    // Empty extension: the work file carries no suffix.
    Path file = getDefaultWorkFile(job, "");
    FileSystem fs = file.getFileSystem(conf);

    CompressionCodec codec = null;
    if (getCompressOutput(job)) {
      Class<? extends CompressionCodec> codecClass =
          getOutputCompressorClass(job, DefaultCodec.class);
      codec = ReflectionUtils.newInstance(codecClass, conf);
    }

    final RCFile.Writer out = new RCFile.Writer(fs, conf, file, job, codec);

    return new RecordWriter<WritableComparable, BytesRefArrayWritable>() {

      @Override
      public void close(TaskAttemptContext context) throws IOException {
        out.close();
      }

      @Override
      public void write(WritableComparable key, BytesRefArrayWritable value)
          throws IOException {
        // Only the value is appended; the key is intentionally unused.
        out.append(value);
      }
    };
  }
}