package mobvista.dmp.datasource.adn.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.IOException;

/**
 * MapReduce driver that merges two input paths into one gzip-compressed output
 * containing each distinct input record exactly once.
 *
 * <p>The mapper emits every input value as a key, and the reducer writes each
 * distinct key a single time, so records that appear more than once (within one
 * input or across both) are collapsed.
 *
 * <p>Usage: {@code MergeAdnClickMR [generic options] <input1> <input2> <output>}
 *
 * <p>Exit status: {@code 0} when the job succeeds, {@code 1} when the job runs
 * but fails, {@code -1} when the arguments are invalid or an exception is
 * raised while setting up or running the job.
 *
 * Created by Administrator on 2017/6/12 0012.
 */
public class MergeAdnClickMR {
    /** Exit status reported when the job completes successfully. */
    private static final int EXIT_SUCCESS = 0;
    /** Exit status reported when the job ran but did not complete successfully. */
    private static final int EXIT_JOB_FAILED = 1;
    /** Exit status reported for bad arguments or an exception during setup/execution. */
    private static final int EXIT_ERROR = -1;
    /** Number of positional arguments required after generic options are stripped. */
    private static final int REQUIRED_ARGS = 3;

    /**
     * Command-line entry point; terminates the JVM with the job's exit status.
     *
     * <p>{@code System.exit} is deliberately not placed in a {@code finally}
     * block: doing so would turn an uncaught {@link Error} (for example a
     * missing class or an out-of-memory condition) into a silent exit with
     * status 0. Errors are instead left to propagate out of {@code main}.
     *
     * @param args generic Hadoop options followed by two input paths and one output path
     */
    public static void main (String[] args) {
        System.exit(run(args));
    }

    /**
     * Configures and runs the merge job.
     *
     * @param args raw command-line arguments, including any generic Hadoop options
     * @return the process exit status (see the class documentation)
     */
    private static int run(String[] args) {
        try {
            Configuration conf = new Configuration();
            conf.set("mapreduce.map.speculative", "true");
            conf.set("mapreduce.reduce.speculative", "true");
            // NOTE(review): the sort buffer must fit inside the map task heap, which is
            // not configured here - confirm the cluster default leaves room for 500 MB.
            conf.set("mapreduce.task.io.sort.mb", "500");
            conf.set("mapreduce.reduce.java.opts", "-Xmx1536m");
            conf.set("mapreduce.reduce.memory.mb", "2048");
            conf.set("mapreduce.reduce.shuffle.parallelcopies", "50");
            String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

            // Fail with a readable message instead of an ArrayIndexOutOfBoundsException.
            // Extra trailing arguments are still ignored, as before.
            if (otherArgs.length < REQUIRED_ARGS) {
                System.err.println(
                        "Usage: MergeAdnClickMR [generic options] <input1> <input2> <output>");
                return EXIT_ERROR;
            }

            Job job = Job.getInstance(conf, "MergeAdnClickMR");
            job.setJarByClass(MergeAdnClickMR.class);

            FileOutputFormat.setCompressOutput(job, true);
            FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);

            job.setMapperClass(MergeAdnClickMapper.class);
            // The reducer only emits each key once, and its input and output types are
            // the same as the map output types, so it is safe to run any number of times
            // as a combiner. This drops duplicates before the shuffle.
            job.setCombinerClass(MergeAdnClickReducer.class);
            job.setReducerClass(MergeAdnClickReducer.class);

            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(NullWritable.class);

            FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
            FileInputFormat.addInputPath(job, new Path(otherArgs[1]));
            FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));

            return job.waitForCompletion(true) ? EXIT_SUCCESS : EXIT_JOB_FAILED;
        } catch (Exception e) {
            // Top-level boundary: report the failure and signal it through the exit status.
            e.printStackTrace();
            return EXIT_ERROR;
        }
    }

    /**
     * Emits each input value unchanged as the output key, paired with a
     * {@link NullWritable} placeholder, so that identical records are grouped
     * under the same key.
     */
    public static class MergeAdnClickMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
        /**
         * Forwards the record as the map output key.
         *
         * @param key     input key; not used
         * @param value   the input record
         * @param context task context used to emit the output pair
         */
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            context.write(value, NullWritable.get());
        }
    }

    /**
     * Writes each distinct key exactly once, regardless of how many times it
     * was emitted. Also registered as the job's combiner.
     */
    public static class MergeAdnClickReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
        /**
         * Emits the key once and ignores its values.
         *
         * @param key     a distinct record
         * @param values  placeholder values for the key; not read
         * @param context task context used to emit the output pair
         */
        @Override
        protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            context.write(key, NullWritable.get());
        }
    }
}