package mobvista.dmp.datasource.gender;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.orc.mapred.OrcStruct;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.type.JavaType;

import java.io.IOException;
import java.util.Map;

/**
 * Created by liushuai on 2017/1/18 0018.
 * desc  : computes the overall male ratio and the overall per-gender counts
 */
public class ValidateMRV2 {

    /**
     * Columns of the input row, each paired with its column name and its
     * zero-based position. The position is what the mapper in this file passes
     * to {@code OrcStruct.getFieldValue(int)}.
     */
    public enum Fields {
        DEVICE_ID("device_id", 0),
        DEVICE_TYPE("device_type", 1),
        PACKAGE_NAMES("package_names", 2),
        SOURCE("source", 3),
        GENDER("gender", 4),
        RATIO("ratio", 5),
        TAG("tag", 6),
        UPDATE_DATE("update_date", 7);

        // Enum constants are shared singletons, so their state must be immutable.
        private final String name;
        private final int idx;

        Fields(String name, int idx) {
            this.name = name;
            this.idx = idx;
        }

        /**
         * @return the column name associated with this field
         */
        public String getName() {
            return name;
        }

        /**
         * @return the zero-based position of this field within a row
         */
        public int getIdx() {
            return idx;
        }
    }

    /**
     * Configures and runs the validation job, then exits the JVM.
     *
     * <p>After generic Hadoop options are stripped, two positional arguments
     * are required: the input path and the output path. The process exits with
     * status 0 when the job succeeds, 1 when it fails, and 2 when the
     * arguments are missing.
     *
     * @param args generic Hadoop options followed by {@code <input path> <output path>}
     * @throws InterruptedException   if waiting for job completion is interrupted
     * @throws IOException            if the job cannot be set up or submitted
     * @throws ClassNotFoundException if a job class cannot be resolved
     */
    public static void main(String[] args) throws InterruptedException, IOException, ClassNotFoundException {
        Configuration conf = new Configuration();
        // Set before option parsing so that -D options on the command line can still override them.
        conf.set("mapreduce.map.speculative", "true");
        conf.set("mapreduce.reduce.speculative", "true");
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

        // Fail with a usage message instead of an ArrayIndexOutOfBoundsException.
        if (otherArgs.length < 2) {
            System.err.println("Usage: ValidateMRV2 [generic options] <input path> <output path>");
            System.exit(2);
        }

        Job job = Job.getInstance(conf, "ValidateMRV2");
        // A single reducer is required: the reducer sums every mapper's partial
        // counts into instance fields and emits one total in cleanup().
        job.setNumReduceTasks(1);
        job.setJarByClass(ValidateMRV2.class);

        // NOTE(review): no input format is set here although the mapper consumes
        // OrcStruct values; presumably it is supplied through -D options — confirm.
        job.setMapperClass(ValidateMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setReducerClass(ValidateReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    public static class ValidateMapper extends Mapper<NullWritable, OrcStruct, Text, NullWritable> {
        Text outKey = new Text();
        int yes = 0;
        int no = 0;
        int other = 0;
        int female = 0;
        int man = 0;
        int mSource = 0;
        int fSource = 0;
        int oSource = 0;
        int mRight = 0;
        int fRight = 0;
        int oRight = 0;
        Long num = 0L;

        public void map(NullWritable key, OrcStruct value, Context context) {
            try {
                if (value.getNumFields() != 8) {
                    return;
                }
                /*
                String line = value.toString();
                String[] fields = MRUtils.SPLITTER.split(line, -1);
                if (fields.length != 7) {
                    return;
                }
                */
                String gender = value.getFieldValue(Fields.GENDER.getIdx()).toString();
                String source = value.getFieldValue(Fields.SOURCE.getIdx()).toString();

                int flag = 0;
                if (gender.equals("o")) {
                    other += 1;
                }
                if (gender.equals("f")) {
                    female += 1;
                }
                if (gender.equals("m")) {
                    man += 1;
                }
                if (!source.equals("null")) {
                    if (source.equals("m")) {
                        mSource++;
                    } else if (source.equals("f")) {
                        fSource++;
                    } else {
                        oSource++;
                    }
                    if (!source.equals(gender)) {
                        flag = 1;
                    }
                    if (flag == 0) {
                        if (source.equals("m")) {
                            mRight++;
                        } else if (source.equals("f")) {
                            fRight++;
                        } else if (source.equals("o")) {
                            oRight++;
                        }
                        yes += 1;
                    } else {
                        no += 1;
                    }
                }
                num += 1;
            } catch (Exception e) {
                return;
            }
        }

        public void cleanup(Context context) {
            outKey.set(MRUtils.JOINER.join(
                    yes,
                    no,
                    mSource,
                    fSource,
                    oSource,
                    mRight,
                    fRight,
                    oRight,
                    man,
                    female,
                    other,
                    num
            ));
            try {
                context.write(outKey, NullWritable.get());
            } catch (IOException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static class ValidateReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
        Text outKey = new Text();
        int yes = 0;
        int no = 0;
        int other = 0;
        int female = 0;
        int man = 0;
        int mSource = 0;
        int fSource = 0;
        int oSource = 0;
        int mRight = 0;
        int fRight = 0;
        int oRight = 0;
        Long num = 0L;
        double ratio = 0.0;

        public void reduce(Text key, Iterable<NullWritable> values, Context context) {
            try {
                String line = key.toString();
                String[] fields = MRUtils.SPLITTER.split(line, -1);
                yes += Integer.parseInt(fields[0]);
                no += Integer.parseInt(fields[1]);
                mSource += Integer.parseInt(fields[2]);
                fSource += Integer.parseInt(fields[3]);
                oSource += Integer.parseInt(fields[4]);
                mRight += Integer.parseInt(fields[5]);
                fRight += Integer.parseInt(fields[6]);
                oRight += Integer.parseInt(fields[7]);
                man += Integer.parseInt(fields[8]);
                female += Integer.parseInt(fields[9]);
                other += Integer.parseInt(fields[10]);
                num += Long.parseLong(fields[11]);
            } catch (Exception e) {
                return;
            }
        }

        public void cleanup(Context context) {
            if ((yes + no) != 0) {
                ratio = (double) (yes) / (yes + no);
            }
            outKey.set(MRUtils.JOINER.join(
                    yes,
                    no,
                    ratio,
                    mSource,
                    fSource,
                    oSource,
                    mRight,
                    fRight,
                    oRight,
                    man,
                    female,
                    other,
                    num
            ));
            try {
                context.write(outKey, NullWritable.get());
            } catch (IOException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}