package mobvista.prd.datasource.countotherid;

import mobvista.dmp.util.MRUtils;
import mobvista.prd.datasource.newall.NewAllMergeMR;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.type.JavaType;

import java.io.IOException;
import java.util.Map;
import java.util.regex.Pattern;

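/**
 * Driver for two chained MapReduce jobs over JSON device records.
 *
 * <p>Job 1 ("Count other id") reads one JSON object per input line and emits a
 * de-duplicated list of (id, type) keys for the {@code imei}, {@code mac} and
 * {@code android_id} fields. Job 2 ("CountOtherIDMR count Job") runs only if
 * job 1 succeeds; it reads job 1's output and emits the second tab-separated
 * column of each line with a count of 1, aggregated by
 * {@code NewAllMergeMR.CountReducer} (defined elsewhere; presumably it sums the
 * counts per key).
 *
 * <p>Arguments (after generic Hadoop options): input path, job 1 output path,
 * job 2 output path, and the reducer counts for both jobs as {@code "n1,n2"}.
 *
 * Created by Administrator on 2017/5/17.
 */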
public class CountOtherIDMR {
    /**
     * Entry point: runs the distinct-id job and, if it succeeds, the counting job.
     *
     * <p>Expected arguments after Hadoop's generic options have been stripped:
     * <ol>
     *   <li>input path of the JSON records</li>
     *   <li>output path of the distinct-id job (also the input of the counting job)</li>
     *   <li>output path of the counting job</li>
     *   <li>reducer counts for the two jobs, comma separated, e.g. {@code "100,1"}</li>
     * </ol>
     *
     * <p>The JVM exits with 0 when both jobs succeed, 1 when either job fails and
     * 2 when the arguments are invalid.
     *
     * @param args command line arguments
     * @throws InterruptedException   if waiting for a job is interrupted
     * @throws IOException            if job submission or communication fails
     * @throws ClassNotFoundException if a job class cannot be resolved
     */
    public static void main(String[] args) throws InterruptedException, IOException, ClassNotFoundException {
        Configuration conf = new Configuration();
        conf.set("mapreduce.job.reduce.slowstart.completedmaps", "0.5");
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

        // Validate everything before submitting anything, so a bad reducer spec
        // cannot surface only after the first (expensive) job has already run.
        if (otherArgs.length < 4) {
            System.err.println("Usage: CountOtherIDMR <input> <distinctOutput> <countOutput> "
                    + "<distinctReducers>,<countReducers>");
            System.exit(2);
            return;
        }
        int[] reducers;
        try {
            reducers = parseReducerCounts(otherArgs[3]);
        } catch (IllegalArgumentException e) {
            System.err.println(e.getMessage());
            System.exit(2);
            return;
        }

        Job job = Job.getInstance(conf, "Count other id");
        job.setNumReduceTasks(reducers[0]);
        job.setJarByClass(CountOtherIDMR.class);
        job.setMapperClass(CountOtherIDMapper.class);
        job.setReducerClass(CountOtherIDReduce.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        if (!job.waitForCompletion(true)) {
            // Report the failure to the caller instead of returning normally,
            // which would make the process exit with status 0.
            System.exit(1);
            return;
        }

        Job countJob = Job.getInstance(conf, "CountOtherIDMR count Job");
        countJob.setNumReduceTasks(reducers[1]);
        countJob.setJarByClass(CountOtherIDMR.class);
        countJob.setMapperClass(CountMapper.class);
        countJob.setReducerClass(NewAllMergeMR.CountReducer.class);
        countJob.setMapOutputKeyClass(Text.class);
        countJob.setMapOutputValueClass(IntWritable.class);
        countJob.setOutputKeyClass(Text.class);
        countJob.setOutputValueClass(IntWritable.class);

        // The counting job consumes the distinct-id job's output directory.
        FileInputFormat.addInputPath(countJob, new Path(otherArgs[1]));
        FileOutputFormat.setOutputPath(countJob, new Path(otherArgs[2]));
        System.exit(countJob.waitForCompletion(true) ? 0 : 1);
    }

    /**
     * Parses the {@code "<distinctReducers>,<countReducers>"} argument.
     *
     * @param spec comma separated reducer counts; surrounding whitespace is ignored
     * @return a two element array: reducers for the first job, then for the second
     * @throws IllegalArgumentException if fewer than two values are given or a
     *                                  value is not an integer
     */
    private static int[] parseReducerCounts(String spec) {
        String[] parts = spec.split(",", -1);
        if (parts.length < 2) {
            throw new IllegalArgumentException(
                    "Expected reducer counts as <distinctReducers>,<countReducers> but got: " + spec);
        }
        try {
            return new int[] {Integer.parseInt(parts[0].trim()), Integer.parseInt(parts[1].trim())};
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Reducer counts must be integers but got: " + spec, e);
        }
    }

    /**
     * Mapper of the counting job.
     *
     * <p>Splits each input line on tab characters and, when the line has at least
     * two columns, emits the second column as the key with a count of 1. Lines
     * without a tab are skipped.
     */
    public static class CountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        /** Column separator of the input lines. */
        private static final Pattern TAB = Pattern.compile("\t");

        /** Constant count emitted for every accepted line. */
        private final IntWritable one = new IntWritable(1);

        /** Reused output key to avoid allocating a Text per record. */
        private final Text column = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Limit -1 keeps trailing empty columns, so "a\t" still has two columns.
            final String[] columns = TAB.split(value.toString(), -1);
            if (columns.length <= 1) {
                return;
            }
            column.set(columns[1]);
            context.write(column, one);
        }
    }

    public static class CountOtherIDMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
        Text outKey = new Text("1");
        ObjectMapper objectMapper = new ObjectMapper(); //转换器
        JavaType mapType = objectMapper.getTypeFactory().constructMapType(Map.class, String.class, String.class);
        public void map (LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            try {
                String line = value.toString();
                Map<String, String> map = objectMapper.readValue(line, mapType);
                String imeiId = map.get("imei");
                String macId = map.get("mac");
                String androidId = map.get("android_id");

                if (!StringUtils.isEmpty(imeiId) && !"0".equals(imeiId)) {
                    outKey.set(MRUtils.JOINER.join(imeiId, "imei"));
                    context.write(outKey, NullWritable.get());
                }
                if (!StringUtils.isEmpty(macId) && !"0".equals(macId)) {
                    outKey.set(MRUtils.JOINER.join(macId, "mac"));
                    context.write(outKey, NullWritable.get());
                }
                if (!StringUtils.isEmpty(androidId) && !"0".equals(androidId)) {
                    outKey.set(MRUtils.JOINER.join(androidId, "androidid"));
                    context.write(outKey, NullWritable.get());
                }

            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Reducer of the distinct-id job.
     *
     * <p>Writes every incoming key exactly once and ignores its values, so
     * duplicate keys emitted by the mappers collapse into a single output line.
     */
    public static class CountOtherIDReduce extends Reducer<Text, NullWritable, Text, NullWritable> {
        @Override
        protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            // The values carry no information; only the key's existence matters.
            final NullWritable nothing = NullWritable.get();
            context.write(key, nothing);
        }
    }
}