package mobvista.prd.datasource.eggplants;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.IOException;

/**
 * MapReduce job that counts how many input records share the same first two
 * tab-separated fields.
 *
 * <p>Each input line is split on tab characters. The mapper emits the first two
 * fields (joined by a tab) as the key and the third field as the value; the
 * reducer writes each key together with the number of values it received.
 * Job output is gzip-compressed.
 *
 * <p>Usage: {@code EggplantsCountMR [generic options] <input path> <output path>}
 *
 * <p>Created by Administrator on 2017/5/3 0003.
 */
public class EggplantsCountMR {
    /** Minimum number of tab-separated fields the mapper needs on each line. */
    private static final int REQUIRED_FIELDS = 3;

    /** Counter group for this job's diagnostic counters. */
    private static final String COUNTER_GROUP = "EggplantsCountMR";

    /** Counter incremented once per input line that has too few fields. */
    private static final String MALFORMED_COUNTER = "MALFORMED_LINES";

    /**
     * Configures and submits the job, then exits with the job's status.
     *
     * @param args Hadoop generic options followed by the input path and the
     *             output path
     * @throws IOException            if the job cannot be set up or submitted
     * @throws ClassNotFoundException if a job class cannot be loaded
     * @throws InterruptedException   if waiting for job completion is interrupted
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

        // Fail with a readable message instead of an ArrayIndexOutOfBoundsException
        // when the input/output paths are missing.
        if (otherArgs.length < 2) {
            System.err.println("Usage: EggplantsCountMR [generic options] <input path> <output path>");
            System.exit(2);
        }

        // Applied after the generic options were parsed into conf, so this value
        // takes precedence over one supplied on the command line.
        conf.set("mapreduce.reduce.memory.mb", "1280");

        // NOTE(review): the job name looks copied from EggplantsAppCountryAllMR.
        // It is kept unchanged in case external tooling keys on it -- confirm and rename.
        Job job = Job.getInstance(conf, "EggplantsAppCountryAllMR");

        // Locate the job jar through this class, which holds the mapper and reducer.
        job.setJarByClass(EggplantsCountMR.class);
        FileOutputFormat.setCompressOutput(job, true);
        FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);

        job.setMapperClass(EggplantsMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // No combiner is configured: the reducer counts the values it receives,
        // so collapsing values on the map side would change the result.

        job.setReducerClass(EggplantsReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    /**
     * Emits {@code field0 + "\t" + field1} as the key and {@code field2} as the
     * value for every input line that has at least three tab-separated fields.
     */
    public static class EggplantsMapper extends Mapper<LongWritable, Text, Text, Text> {
        // Reused across calls to avoid allocating a new Text per record.
        private final Text outKey = new Text();
        private final Text outValue = new Text();

        /**
         * Splits one input line on tabs and writes the key/value pair.
         *
         * @param key     input key supplied by the framework (not used)
         * @param value   one line of input text
         * @param context task context used to emit output and update counters
         * @throws IOException          if writing the output fails
         * @throws InterruptedException if the task is interrupted while writing
         */
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Limit -1 keeps trailing empty fields, so the length check below
            // reflects the real number of columns.
            String[] fields = value.toString().split("\t", -1);
            if (fields.length < REQUIRED_FIELDS) {
                // Skip short lines rather than failing the whole task; the
                // counter makes the number of dropped lines visible.
                context.getCounter(COUNTER_GROUP, MALFORMED_COUNTER).increment(1);
                return;
            }
            outKey.set(fields[0] + "\t" + fields[1]);
            outValue.set(fields[2]);
            context.write(outKey, outValue);
        }
    }

    /**
     * Writes each key together with the number of values received for it.
     */
    public static class EggplantsReducer extends Reducer<Text, Text, Text, Text> {
        // Reused across calls to avoid allocating a new Text per key.
        private final Text outValue = new Text();

        /**
         * Counts the values of one key and emits the count as decimal text.
         *
         * @param key     the grouped key (first two input fields joined by a tab)
         * @param values  all values emitted for {@code key}; only their number matters
         * @param context task context used to emit output
         * @throws IOException          if writing the output fails
         * @throws InterruptedException if the task is interrupted while writing
         */
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // long rather than int so a very frequent key cannot overflow the count.
            long count = 0;
            for (Text ignored : values) {
                count++;
            }
            outValue.set(Long.toString(count));
            context.write(key, outValue);
        }
    }
}