package mobvista.dmp.datasource.age.mapreduce;

import com.google.common.collect.Maps;
import mobvista.dmp.util.MRUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.codehaus.jackson.map.ObjectMapper;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.math.BigDecimal;
import java.net.URI;
import java.util.Map;

/**
 * Created by liushuai on 2017/2/17 0017.
 */
public class CalcDeviceAgeMR {
    public static void main(String[] args) throws InterruptedException, IOException, ClassNotFoundException {
        Configuration conf = new Configuration();
        conf.set("mapreduce.map.speculative", "true");
        conf.set("mapreduce.reduce.speculative", "true");
        conf.set("mapreduce.task.io.sort.mb", "500");
        conf.set("mapreduce.reduce.java.opts", "-Xmx1536m");
        conf.set("mapreduce.reduce.memory.mb", "2048");
        conf.set("mapreduce.reduce.shuffle.parallelcopies", "50");
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

        conf.set("file", otherArgs[2]);
        conf.set("mapreduce.map.memory.mb", "1331");
        Job job = Job.getInstance(conf, "CalcDeviceAgeMR");

        job.setNumReduceTasks(0);
        job.setJarByClass(CalcDeviceAgeMR.class);
        FileOutputFormat.setCompressOutput(job, true);
        FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);

        job.setMapperClass(CalcDeviceYearMapper.class);
        //  取消part-r-00000新式文件输出
        LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setInputFormatClass(TextInputFormat.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        //  job.setOutputFormatClass(TextMultipleOutputFormat.class);

        //  MultipleOutputs.addNamedOutput(job, "age", TextOutputFormat.class,
        //          Text.class, Text.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    public static class CalcDeviceYearMapper extends Mapper<LongWritable, Text, Text, Text> {
        Map<String, String> packageMap = Maps.newHashMapWithExpectedSize(10000);
        Text outKey = new Text();
        //  Text outKey_v2 = new Text();
        Text outValue = new Text();
        ObjectMapper objectMapper = new ObjectMapper(); //转换器

        //  private String outPath;
        //  private String outPath_v2;

        public void setup(Context context) throws IOException, InterruptedException {
            Configuration conf = context.getConfiguration();
            //  outPath = conf.get("outPath");
            //  outPath_v2 = conf.get("outPath_v2");

            //读取CalcPackageDictMR的结果,放入Map中
            String path = conf.get("file");
            FileSystem fileSystem = FileSystem.get(URI.create(path), context.getConfiguration());
            BufferedReader reader = new BufferedReader(new InputStreamReader(fileSystem.open(new Path(path))));
            String readLine = "";
            while ((readLine = reader.readLine()) != null) {
                String[] arr = MRUtils.SPLITTER.split(readLine, -1);
                String age = arr[1].substring(1);
                packageMap.put(arr[0], age + "\t" + arr[2]);
            }
            reader.close();
        }

        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            String line = value.toString();
            String[] fields = MRUtils.SPLITTER.split(line, -1);
            if (fields.length != 4) {
                return;
            }
            String[] arr = Util.wellSplit.split(fields[1], -1);
            String score = "-1.0";
            int num = 0;
            String tag = "unknown";
            Map<Integer, Double> stage = Maps.newHashMap();
            Map<Integer, Integer> stageNum = Maps.newHashMap();
            Map<Integer, Double> st = Maps.newHashMap();
            for (String packageName : arr) {
                String ageRatioSource = packageMap.get(packageName);
                if (ageRatioSource == null) {
                    continue;
                }
                String[] dictValue = MRUtils.SPLITTER.split(ageRatioSource);
                if (dictValue[1].equals("confirm")) {
                    score = dictValue[0];
                    tag = dictValue[1];
                    num = 1;
                    break;
                } else if (dictValue[1].equals("unbelievable")) {
                    if (tag.equals("unknown")) {
                        score = "-1.0";
                        tag = dictValue[1];
                    }
                } else {
                    num += 1;
                    String[] item = Util.verticalLine.split(dictValue[0], -1);
                    for (String ageRatios : item) {
                        if (ageRatios == null) {
                            continue;
                        }
                        int generation = Integer.parseInt(Util.colonSplit.split(ageRatios, -1)[0]);//年龄段
                        double ageRatio = Double.parseDouble(Util.colonSplit.split(ageRatios, -1)[1]);//权重
                        Double stageAgeRatio = stage.get(generation);
                        if (stageAgeRatio == null) {
                            stage.put(generation, ageRatio);
                            stageNum.put(generation, 1);
                        } else {
                            double val = stageAgeRatio + ageRatio;
                            stage.put(generation, val);
                            stageNum.put(generation, stageNum.get(generation) + 1);
                        }
                    }
                    score = "";
                    tag = dictValue[1];
                }
            }
            if (num == 1 && tag.equals("confirm")) {
                score = "|" + score;
            }
            if (num == 1 && tag.equals("calc")) {
                for (Map.Entry<Integer, Double> entry : stage.entrySet()) {
                    score = score + '|' + entry.getKey() + ":" + entry.getValue();
                }
            }
            if (num > 1) {
                for (Map.Entry<Integer, Double> entry : stage.entrySet()) {
                    Integer gen = entry.getKey();
                    double rat = entry.getValue() / stageNum.get(gen);
                    st.put(gen, rat);
                }
                for (Map.Entry<Integer, Double> entry : st.entrySet()) {
                    score = score + '|' + entry.getKey() + ":" + entry.getValue();
                }
            }
            Map<String, Double> outMap = Maps.newHashMap();
            outMap.put("0-17", 0.0);
            outMap.put("18-24", 0.0);
            outMap.put("25-44", 0.0);
            outMap.put("45-59", 0.0);
            outMap.put("60+", 0.0);
            String[] scoreArr = score.split("\\|", -1);
            BigDecimal ageRatioDouble = null;  //小数转换
            for (String ageRatio : scoreArr) {
                if (!ageRatio.equals("") && !ageRatio.equals("-1.0")) {
                    String[] tmpRatio = Util.colonSplit.split(ageRatio, -1);
                    String generation = Util.getAge(Integer.parseInt(tmpRatio[0]));
                    if (generation != null && !generation.equals("")) {
                        ageRatioDouble = new BigDecimal(Double.parseDouble(tmpRatio[1]))
                                .setScale(6, BigDecimal.ROUND_HALF_UP);
                        outMap.put(generation, ageRatioDouble.doubleValue());
                    }
                } else if (ageRatio.equals("-1.0")) {
                    outMap = Maps.newHashMap();
                    outMap.put("unbelievable", -1.0);
                }
            }
            Map<String, String> ageMap = Maps.newHashMap();
            if (!fields[2].equals("null")) {
                String[] ageTags = fields[2].split("\\$", -1);
                for (String ageTag : ageTags) {
                    if (ageTag == null || ageTag.equals("")) {
                        return;
                    }
                    String[] ageForm = Util.wellSplit.split(ageTag, -1);
                    String generation = Util.getAge(Integer.parseInt(ageForm[0]));
                    if (!generation.equals("")) {
                        ageMap.put(generation, ageForm[1]);
                    }
                }
            } else {
                ageMap.put("null", "null");
            }
            Map<String, Map> lastMap = Maps.newLinkedHashMap();
            lastMap.put("age_and_source", ageMap);
            lastMap.put("age_and_proportion", outMap);
            String packageName = fields[1].replace("#", ",");//包名之间以逗号分隔


            String[] ss = Util.wellSplit.split(fields[3], -1);
            outValue.set(MRUtils.JOINER.join(objectMapper.writeValueAsString(lastMap), tag));
            outKey.set(MRUtils.JOINER.join(fields[0],  //device_id
                    ss[0],  //  device_type
                    packageName  //    包名
            ));
            context.write(outKey, outValue);
            /*
            outKey_v2.set(outPath_v2 + "," + MRUtils.JOINER.join(fields[0],  //device_id
                    ss[0],  //  device_type
                    packageName,  //    包名
                    ss[1]   //  update_date
            ));

            context.write(outKey_v2, outValue);
            */
            //  context.write(outKey, outValue
        }
    }
}