package mobvista.dmp.datasource.age.mapreduce;

import com.google.common.collect.Maps;
import mobvista.dmp.util.MRUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.orc.mapred.OrcStruct;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.type.JavaType;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

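/**
 * MapReduce job that tallies age-prediction statistics over six-column ORC records.
 * Each map task emits one summary line of counters, and a single reducer merges those
 * lines into one line of totals that also carries the yes / (yes + no) ratio and the
 * validAll / allNum recall.
 *
 * Created by liushuai on 2017/2/17 0017.
 */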
public class DmpYearMRV2 {

    /**
     * Columns of the input record. Each constant pairs a column name with the
     * zero-based position used to read that column from the input struct.
     */
    public enum Fields {
        DEVICE_ID("device_id", 0),
        DEVICE_TYPE("device_type", 1),
        PACKAGE_NAMES("package_names", 2),
        AGE("age", 3),
        TAG("tag", 4),
        UPDATE_DATE("update_date", 5);

        /** Column name. */
        private final String name;
        /** Zero-based column position. */
        private final int idx;

        Fields(String columnName, int position) {
            name = columnName;
            idx = position;
        }

        /**
         * @return the column name of this field
         */
        public String getName() {
            return this.name;
        }

        /**
         * @return the zero-based position of this field
         */
        public int getIdx() {
            return this.idx;
        }
    }

    /**
     * Configures and submits the job, then exits the JVM with 0 on success and 1 on failure.
     *
     * <p>Generic Hadoop options are consumed by {@link GenericOptionsParser}; of the remaining
     * arguments the first is the input path and the second is the output path. When fewer than
     * two remain, a usage message is printed to stderr and the JVM exits with status 2.
     *
     * @param args command-line arguments: {@code [generic options] <input path> <output path>}
     * @throws InterruptedException   if waiting for job completion is interrupted
     * @throws IOException            if the job cannot be set up or submitted
     * @throws ClassNotFoundException if a job class cannot be resolved during submission
     */
    public static void main(String[] args) throws InterruptedException, IOException, ClassNotFoundException {
        Configuration conf = new Configuration();
        // Defaults are set before parsing so that -D options on the command line can override them.
        conf.set("mapreduce.map.speculative", "true");
        conf.set("mapreduce.reduce.speculative", "true");
        conf.set("mapreduce.reduce.memory.mb", "1536");
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 2) {
            // Fail with a readable message instead of an ArrayIndexOutOfBoundsException below.
            System.err.println("Usage: DmpYearMRV2 [generic options] <input path> <output path>");
            System.exit(2);
            return;
        }

        Job job = Job.getInstance(conf, "DmpYearMRV2");
        // Exactly one reducer: DmpYearReducer keeps its totals in instance fields and writes a
        // single line from cleanup(), so all mapper summaries must reach the same reducer.
        job.setNumReduceTasks(1);
        job.setJarByClass(DmpYearMRV2.class);

        // NOTE(review): no input format is set here although the mapper consumes OrcStruct
        // values; presumably an ORC input format is supplied through -D options. Confirm.
        job.setMapperClass(DmpYearOrcMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setReducerClass(DmpYearReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    public static class DmpYearOrcMapper extends Mapper<NullWritable, OrcStruct, Text, NullWritable> {
        Text outKey = new Text();
        ObjectMapper objectMapper = new ObjectMapper(); //转换器
        JavaType mapType = objectMapper.getTypeFactory().constructMapType(Map.class, String.class, Map.class);
        int yes = 0;
        int no = 0;
        int other = 0;
        int valid = 0;
        int un = 0;
        Long una = 0L;
        Map<String, Integer> st = Maps.newHashMap();
        Long allNum = 0L;

        public void map(NullWritable key, OrcStruct value, Context context) throws IOException, InterruptedException {
            if (value.getNumFields() != 6) {
                return;
            }
            //  String line = value.toString();
            //  String[] fields = MRUtils.SPLITTER.split(line, -1);
            //  if (fields.length != 5) {
            //      return;
            //  }
            String ages = value.getFieldValue(Fields.AGE.getIdx()).toString();
            String tag = value.getFieldValue(Fields.TAG.getIdx()).toString();

            String age;
            String maxAge;
            allNum = allNum + 1;
            Map<String, Map> ageMap = objectMapper.readValue(ages, mapType);
            Map<String, String> ageSource = ageMap.get("age_and_source");
            Map<String, Double> ageProportion = ageMap.get("age_and_proportion");
            List maxAgeList = getMaxRatio(ageProportion);
            maxAge = (String) maxAgeList.get(0);
            double maxRatio = (Double) maxAgeList.get(1);
            if (!ageSource.containsKey("null")) {
                if (tag.equals("unbelievable")) {
                    un += 1;
                    una += 1;
                    return;
                }
                if (tag.equals("unknown")) {
                    no += 1;
                    other += 1;
                    return;
                }
                if (maxRatio < 0.5) {
                    un += 1;
                    return;
                }
                Set<String> ageSet = ageSource.keySet();
                for (String item : ageSet) {
                    age = Util.colonSplit.split(item, -1)[0];
                    if (maxAge.equals(age)) {
                        yes += 1;
                        break;
                    } else {
                        no += 1;
                        break;
                    }
                }
            }
            if (tag.equals("unbelievable")) {
                una += 1;
                return;
            }
            if (tag.equals("unknown")) {
                other += 1;
                return;
            }
            if (maxRatio < 0.5) {
                una += 1;
                return;
            }
            Integer sum = st.get(maxAge);
            if (sum == null) {
                sum = 0;
            }
            st.put(maxAge, sum + 1);
            valid += 1;
        }

        public void cleanup(Context context) throws IOException, InterruptedException {
            StringBuilder values = new StringBuilder();
            values.append(valid);
            for (Map.Entry<String, Integer> entry : st.entrySet()) {
                values.append("|");
                values.append(entry.getKey());
                values.append(":");
                values.append(entry.getValue());
            }
            outKey.set(MRUtils.JOINER.join(yes, no, un, allNum, other, una, values

            ));
            context.write(outKey, NullWritable.get());
        }
    }

    public static class DmpYearReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
        Text outKey = new Text();
        int yes = 0;
        int no = 0;
        int other = 0;
        int valid = 0;
        int validAll = 0;
        int num = 0;
        int un = 0;
        Long una = 0L;
        Map<String, Integer> st = Maps.newHashMap();
        Long allNum = 0L;
        double ratio = 0.0;
        double recall = 0.0;

        public void reduce(Text key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {

            String line = key.toString();
            String[] fields = MRUtils.SPLITTER.split(line, -1);
            yes = yes + Integer.parseInt(fields[0]);
            no = no + Integer.parseInt(fields[1]);
            un = un + Integer.parseInt(fields[2]);
            allNum = allNum + Long.parseLong(fields[3]);
            other = other + Integer.parseInt(fields[4]);
            una = una + Long.parseLong(fields[5]);
            String[] arr = Util.verticalLine.split(fields[6], -1);
            valid = valid + Integer.parseInt(arr[0]);
            for (int i = 1; i < arr.length; i++) {
                String[] item = Util.colonSplit.split(arr[i], -1);
                String gen = item[0];
                Integer sum = st.get(gen);
                if (sum == null) {
                    sum = 0;
                }
                Integer part = Integer.parseInt(item[1]);
                st.put(gen, sum + part);
                validAll = validAll + part;
            }
        }

        public void cleanup(Context context) throws IOException, InterruptedException {
            StringBuilder value = new StringBuilder();
            value.append(valid);
            for (Map.Entry<String, Integer> entry : st.entrySet()) {
                value.append("|");
                value.append(entry.getKey());
                value.append(":");
                value.append(entry.getValue());
            }
            if (yes + no != 0) {
                ratio = (double) yes / (yes + no);
            }
            if (allNum != 0) {
                recall = (double) validAll / allNum;
            }
            outKey.set(MRUtils.JOINER.join(yes, no, un, ratio, allNum, recall, other, una, validAll, value));
            context.write(outKey, NullWritable.get());
        }
    }

    /**
     * Finds the entry with the largest proportion among entries that have a non-empty key.
     *
     * <p>The first usable entry is taken unconditionally; a later entry replaces it only when
     * its proportion is strictly greater, so ties keep the earlier entry in iteration order.
     *
     * <p>Values are read as {@link Number} rather than {@code Double}: callers pass maps
     * decoded from untyped JSON, where a whole-number proportion such as {@code 1} arrives as
     * an {@code Integer} and would otherwise raise a {@link ClassCastException}. Entries whose
     * key is null or empty, or whose value is null or not numeric, are skipped.
     *
     * @param ageMap age key to proportion; may be null or empty
     * @return a two-element list: index 0 is the winning key as a {@code String} (empty when
     *         no entry was usable), index 1 is its proportion as a {@code Double} (0.0 when
     *         no entry was usable)
     */
    public static List getMaxRatio(Map<String, Double> ageMap) {
        String maxAge = "";
        double maxRatio = 0.0;
        if (ageMap != null) {
            // Wildcard view so that reading a value never inserts a cast to Double.
            Map<String, ?> untyped = ageMap;
            for (Map.Entry<String, ?> entry : untyped.entrySet()) {
                String ages = entry.getKey();
                Object rawRatio = entry.getValue();
                if (ages == null || ages.isEmpty() || !(rawRatio instanceof Number)) {
                    continue;
                }
                double ratio = ((Number) rawRatio).doubleValue();
                if (maxAge.isEmpty() || maxRatio < ratio) {
                    maxAge = ages;
                    maxRatio = ratio;
                }
            }
        }
        List<Object> maxRatioList = new ArrayList<>(2);
        maxRatioList.add(maxAge);
        maxRatioList.add(maxRatio);
        return maxRatioList;
    }
}