package mobvista.dmp.datasource.ga.mapreduce;

import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.type.JavaType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * author: houying
 * date  : 16-10-24
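 * desc  : MapReduce job that merges the weekly/monthly revenue values found in daily GA rows
 *         into the previously accumulated per-key revenue history, emitted as JSON lists.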
 */
public class GaRevenueMR extends Configured implements Tool {
    // Splits input lines, map-output values and reduce keys on the tab delimiter.
    private static final Pattern splitPtn = Pattern.compile("\t");
    // Joins fields with a tab; a null field is written as an empty string.
    private static final Joiner joiner = Joiner.on("\t").useForNull("");
    // Revenue strings that are treated as "no revenue" and are not turned into records.
    private static Set<String> illegalValueSet = Sets.newHashSet("0", "0.0", "", "NULL");
    // Shared JSON mapper used by the reducer to read the old revenue lists and write the merged ones.
    private static ObjectMapper objectMapper = new ObjectMapper();

    /**
     * One revenue observation: a revenue amount together with the date it was observed on.
     *
     * Instances are converted to and from JSON by the shared {@code objectMapper} in the
     * reducer, which is why the class is a plain bean with getters and setters.
     */
    static class RevenueRecord {
        // Revenue amount parsed from the daily row.
        private double revenue;
        // Date string of the observation; the reducer uses it as the de-duplication key.
        private String date;

        /** No-argument constructor; presumably required by the JSON mapper when deserializing. */
        public RevenueRecord() {
        }

        /**
         * Creates a fully populated record.
         *
         * @param revenue revenue amount
         * @param date    date string of the observation
         */
        public RevenueRecord(double revenue, String date) {
            this.revenue = revenue;
            this.date = date;
        }

        /** @return the revenue amount */
        public double getRevenue() {
            return revenue;
        }

        /** @param revenue the revenue amount to store */
        public void setRevenue(double revenue) {
            this.revenue = revenue;
        }

        /** @return the date string of the observation */
        public String getDate() {
            return date;
        }

        /** @param date the date string to store */
        public void setDate(String date) {
            this.date = date;
        }
    }

    /**
     * Mapper that keys every recognised input line by its first two tab-separated fields.
     *
     * <ul>
     *   <li>Daily rows (31 fields): emits {@code date \t revenueWeek \t revenueMonth}, where the
     *       date is taken from the {@code /yyyy/MM/dd} segment of the input file path. Rows whose
     *       week and month revenue are both "illegal" (see {@code illegalValueSet}) are dropped.</li>
     *   <li>Old revenue rows (4 fields): emits fields 2 and 3 unchanged.</li>
     *   <li>Lines with any other field count, including blank lines, are ignored.</li>
     * </ul>
     */
    public static class GaRevenueMapper extends Mapper<LongWritable, Text, Text, Text> {
        private static final Logger logger = LoggerFactory.getLogger(GaRevenueMapper.class);

        /** Field count that identifies a daily row. */
        private static final int DAILY_FIELD_COUNT = 31;
        /** Field count that identifies a previously produced (old) revenue row. */
        private static final int OLD_REVENUE_FIELD_COUNT = 4;
        /** Positions of the week / month revenue columns inside a daily row. */
        private static final int REVENUE_WEEK_INDEX = 27;
        private static final int REVENUE_MONTH_INDEX = 28;

        /** Extracts the {@code yyyy/MM/dd} segment from an input path. */
        private static final Pattern datePtn = Pattern.compile("/(\\d{4}/\\d{2}/\\d{2})");

        private final Text outKey = new Text();
        private final Text outValue = new Text();

        /**
         * Classifies the line by its field count and emits the matching key/value pair.
         *
         * @param key     byte offset of the line (unused)
         * @param value   one tab-separated input line
         * @param context task context used to read the input split and to emit output
         */
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // NOTE(review): the default split drops trailing empty fields, so the field counts
            // below are counts after that trimming; kept as-is to preserve row classification.
            String[] array = splitPtn.split(value.toString());

            if (array.length == DAILY_FIELD_COUNT) {
                String revenueWeek = array[REVENUE_WEEK_INDEX];
                String revenueMonth = array[REVENUE_MONTH_INDEX];
                if (illegalValueSet.contains(revenueWeek) && illegalValueSet.contains(revenueMonth)) {
                    return; // nothing usable in this row
                }
                // The path is only needed for rows that are actually emitted.
                String path = ((FileSplit) context.getInputSplit()).getPath().toString();
                String date = getDate(path);
                if (date == null) {
                    return;
                }
                outKey.set(joiner.join(array[0], array[1]));
                outValue.set(joiner.join(date, revenueWeek, revenueMonth));
                context.write(outKey, outValue);
            } else if (array.length == OLD_REVENUE_FIELD_COUNT) {
                outKey.set(joiner.join(array[0], array[1]));
                outValue.set(joiner.join(array[2], array[3]));
                context.write(outKey, outValue);
            }
            // Any other field count (including blank or single-field lines, which previously
            // raised ArrayIndexOutOfBoundsException while building the key) is skipped.
        }

        /**
         * Extracts the date from an input path.
         *
         * @param path input file path expected to contain a {@code /yyyy/MM/dd} segment
         * @return the date formatted as {@code yyyy-MM-dd}, or {@code null} (after logging an
         *         error) when the path contains no such segment
         */
        private String getDate(String path) {
            Matcher matcher = datePtn.matcher(path);
            if (!matcher.find()) {
                logger.error("can not find date in path: {}", path);
                return null;
            }
            return matcher.group(1).replace("/", "-");
        }
    }

    /**
     * Reducer that merges the new daily revenue values of a key into its old revenue lists.
     *
     * Each incoming value is either the old revenue pair (2 fields: week-list JSON, month-list
     * JSON) or a daily observation (3 fields: date, week revenue, month revenue). The output line
     * is {@code key \t weekListJson \t monthListJson} with a {@code NullWritable} value.
     */
    public static class GaRevenueReducer extends Reducer<Text, Text, Text, NullWritable> {
        /** Field count of an old revenue value: week-list JSON, month-list JSON. */
        private static final int OLD_REVENUE_FIELD_COUNT = 2;
        /** Field count of a daily value: date, week revenue, month revenue. */
        private static final int DAILY_FIELD_COUNT = 3;

        private final JavaType javaType =
                objectMapper.getTypeFactory().constructCollectionType(List.class, RevenueRecord.class);
        private final Text outKey = new Text();

        /**
         * Collects the daily week/month revenue records of the key, merges them into the old
         * lists (if any) and writes one output line.
         *
         * @param key     the two leading input fields joined by a tab
         * @param values  old revenue pair and/or daily observations for this key
         * @param context task context used to emit the merged line
         * @throws IOException if an old list is not valid JSON or serialization fails
         */
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // Limit -1 keeps a trailing empty field, so a key whose second field is empty still
            // yields two elements instead of failing on idAndType[1].
            String[] idAndType = splitPtn.split(key.toString(), -1);
            List<RevenueRecord> weekList = Lists.newArrayList();
            List<RevenueRecord> monthList = Lists.newArrayList();
            String oldWeekList = null;
            String oldMonthList = null;

            for (Text value : values) {
                // Limit -1 is required here: a daily value with an empty month revenue
                // ("date\tweek\t") would otherwise lose its last field, split into 2 elements
                // and be mistaken for the old revenue pair.
                String[] array = splitPtn.split(value.toString(), -1);
                if (array.length == OLD_REVENUE_FIELD_COUNT) { // old revenue
                    oldWeekList = array[0];
                    oldMonthList = array[1];
                } else if (array.length == DAILY_FIELD_COUNT) {
                    String date = array[0];
                    if (!illegalValueSet.contains(array[1])) {
                        weekList.add(new RevenueRecord(Double.parseDouble(array[1]), date));
                    }
                    if (!illegalValueSet.contains(array[2])) {
                        monthList.add(new RevenueRecord(Double.parseDouble(array[2]), date));
                    }
                }
            }

            String weekJson = listToJson(mergeRevenueList(oldWeekList, weekList));
            String monthJson = listToJson(mergeRevenueList(oldMonthList, monthList));
            outKey.set(joiner.join(idAndType[0], idAndType[1], weekJson, monthJson));
            context.write(outKey, NullWritable.get());
        }

        /**
         * Merges newly observed records into an old JSON list, keeping one record per date.
         * When a date occurs in both, the new record replaces the old one.
         *
         * @param oldListJson JSON array of old records, or {@code null} when there is none
         * @param list        records observed in this run
         * @return the merged records; iteration order is that of a {@code HashMap} and therefore
         *         unspecified
         * @throws IOException if {@code oldListJson} cannot be parsed
         */
        private Collection<RevenueRecord> mergeRevenueList(String oldListJson, final List<RevenueRecord> list) throws IOException {
            List<RevenueRecord> oldRecords;
            if (oldListJson == null) {
                oldRecords = Collections.emptyList();
            } else {
                oldRecords = objectMapper.readValue(oldListJson, javaType);
            }

            Map<String, RevenueRecord> byDate = Maps.newHashMap();
            for (RevenueRecord oldRecord : oldRecords) {
                byDate.put(oldRecord.getDate(), oldRecord);
            }
            // New records are inserted last so they win over old ones with the same date.
            for (RevenueRecord record : list) {
                byDate.put(record.getDate(), record);
            }
            return byDate.values();
        }

        /**
         * Serializes the records to a JSON array string.
         *
         * @param list records to serialize
         * @return JSON representation of {@code list}
         * @throws IOException if serialization fails
         */
        private String listToJson(Collection<RevenueRecord> list) throws IOException {
            return objectMapper.writeValueAsString(list);
        }
    }

    /**
     * Configures and runs the revenue merge job.
     *
     * @param args {@code args[0]} and {@code args[1]} are input paths, {@code args[2]} is the
     *             output path; an existing output path is deleted recursively before the job runs
     * @return 0 if the job succeeded, 1 if it failed, 2 if the arguments are missing
     * @throws Exception if job setup or execution fails
     */
    @Override
    public int run(String[] args) throws Exception {
        // Fail with a readable message instead of ArrayIndexOutOfBoundsException.
        if (args == null || args.length < 3) {
            System.err.println("Usage: GaRevenueMR <input path 1> <input path 2> <output path>");
            ToolRunner.printGenericCommandUsage(System.err);
            return 2;
        }

        Configuration conf = getConf();
        conf.set("mapreduce.task.io.sort.mb", "500");
        conf.set("mapreduce.reduce.java.opts", "-Xmx1536m");
        conf.set("mapreduce.reduce.memory.mb", "2048");
        conf.set("mapreduce.reduce.shuffle.parallelcopies", "50");

        Job job = Job.getInstance(conf, "ga revenue");
        job.setJarByClass(GaRevenueMR.class);

        // Both inputs are read by the same mapper, which tells them apart by field count.
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileInputFormat.addInputPath(job, new Path(args[1]));

        // Remove a previous output directory so the job does not fail on an existing path.
        Path outputPath = new Path(args[2]);
        FileSystem fileSystem = outputPath.getFileSystem(conf);
        if (fileSystem.exists(outputPath)) {
            fileSystem.delete(outputPath, true);
        }
        FileOutputFormat.setOutputPath(job, outputPath);
        FileOutputFormat.setCompressOutput(job, true);
        FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setMapperClass(GaRevenueMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setReducerClass(GaRevenueReducer.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    /**
     * Command-line entry point: runs the job through {@code ToolRunner} and exits the JVM
     * with the code returned by the tool.
     *
     * @param args command-line arguments forwarded to the tool
     * @throws Exception if the tool fails
     */
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        GaRevenueMR tool = new GaRevenueMR();
        int exitCode = ToolRunner.run(configuration, tool, args);
        System.exit(exitCode);
    }
}