package mobvista.dmp.datasource.dsp.mapreduce;

import com.google.gson.JsonObject;
import mobvista.dmp.common.CommonMapReduce;
import mobvista.dmp.common.CommonMapper;
import mobvista.dmp.util.MRUtils;
import mobvista.dmp.util.Standardizer;
import mobvista.prd.datasource.util.GsonUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.node.ObjectNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.text.ParseException;

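/**
 * author: houying
 * date  : 16-10-25
 * desc  : MapReduce job that reads DSP device profile records from two input paths
 *         (tab-style delimited daily records and previously produced JSON records),
 *         groups them by device_id and device_type, and writes one gzip-compressed
 *         JSON record per group, chosen by comparing the records' dmp_time values.
 */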
public class DspDeviceProfileTotalMR extends Configured implements Tool {
    private static final Logger logger = LoggerFactory.getLogger(DspDeviceProfileTotalMR.class);

    /**
     * Mapper that turns both kinds of input line into a JSON profile record keyed by
     * {@code device_id} and {@code device_type}.
     *
     * <ul>
     *   <li>Lines that split into at least {@value #DAILY_MIN_FIELDS} columns are treated as
     *       daily delimited records and converted to JSON.</li>
     *   <li>All other lines are parsed as JSON profile records and passed through, with the
     *       country code rewritten when necessary.</li>
     * </ul>
     *
     * In both cases a country code of {@code GB} (case-insensitive) is replaced by {@code UK}.
     */
    public static class ProfileTotalMapper extends CommonMapper {
        /** Minimum number of columns for a line to be handled as a daily delimited record. */
        private static final int DAILY_MIN_FIELDS = 13;
        /** Column index of the timestamp in a daily delimited record. */
        private static final int TIME_FIELD_INDEX = 12;
        /** Number of leading timestamp characters copied into {@code dmp_time}. */
        private static final int DMP_TIME_LENGTH = 10;

        private ObjectMapper objectMapper;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            objectMapper = new ObjectMapper();
        }

        /**
         * Dispatches one input line to the daily or the JSON handler, depending on how many
         * columns the line splits into.
         *
         * @param key     input offset supplied by the framework (unused)
         * @param value   the raw input line
         * @param context task context used for output and metrics
         * @throws IOException          if writing the output record fails
         * @throws InterruptedException if the task is interrupted while writing
         */
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String val = value.toString();
            String[] array = MRUtils.SPLITTER.split(val);
            if (array.length >= DAILY_MIN_FIELDS) {
                mapDailyRecord(array, context);
            } else {
                mapTotalRecord(val, value, context);
            }
        }

        /**
         * Converts one daily delimited record into a JSON profile and emits it.
         * Records whose timestamp column is shorter than {@value #DMP_TIME_LENGTH}
         * characters are silently dropped.
         */
        private void mapDailyRecord(String[] array, Context context) throws IOException, InterruptedException {
            String time = array[TIME_FIELD_INDEX];
            // Check before building the node: too-short timestamps cannot yield a dmp_time.
            if (time.length() < DMP_TIME_LENGTH) {
                return;
            }

            // Country code GB is rewritten to UK (requested by Mingyuan; changed by Feng Liang, 2017-08-15).
            String country = array[3];
            if ("GB".equalsIgnoreCase(country)) {
                country = "UK";
            }

            ObjectNode objectNode = objectMapper.createObjectNode();
            objectNode.put("device_id", array[0]);
            objectNode.put("device_type", array[1]);
            objectNode.put("platform", array[2]);
            objectNode.put("country", country);
            objectNode.put("ip", array[4]);
            objectNode.put("gender", array[5]);
            objectNode.put("birthday", array[6]);
            objectNode.put("maker", array[7]);
            objectNode.put("model", array[8]);
            objectNode.put("os_v", array[9]);
            // NOTE(review): "os" is filled from the same column as "platform" (index 2);
            // columns 10 and 11 are never read. Confirm this is intentional.
            objectNode.put("os", array[2]);
            objectNode.put("time", time);
            objectNode.put("dmp_time", time.substring(0, DMP_TIME_LENGTH));

            outKey.set(MRUtils.JOINER.join(array[0], array[1]));
            outValue.set(objectMapper.writeValueAsString(objectNode));
            context.write(outKey, outValue);
        }

        /**
         * Emits an existing JSON profile record under its device key, rewriting the country
         * code when it is GB. Records that cannot be parsed, or that lack {@code country},
         * {@code device_id} or {@code device_type}, are counted under the
         * {@code json_parse_error} metric and dropped.
         */
        private void mapTotalRecord(String val, Text value, Context context) throws IOException, InterruptedException {
            try {
                JsonNode dspProfile = objectMapper.readTree(val);

                // Country code GB is rewritten to UK (requested by Mingyuan; changed by Feng Liang, 2017-08-15).
                String countryCode = dspProfile.get("country").getTextValue();
                if ("GB".equalsIgnoreCase(countryCode)) {
                    JsonObject obj = GsonUtil.String2JsonObject(val);
                    obj.addProperty("country", "UK");
                    value.set(obj.toString());
                }

                // Use asText() so the key holds the raw field text. Passing the JsonNode itself
                // would stringify it via toString(), which for text nodes is the quoted JSON
                // form and would not match the keys produced for daily records.
                outKey.set(MRUtils.JOINER.join(
                        dspProfile.get("device_id").asText(),
                        dspProfile.get("device_type").asText()));
            } catch (Exception e) {
                CommonMapReduce.setMetrics(context, "DMP", "json_parse_error", 1);
                return;
            }
            // Written outside the try block so that output failures propagate to the
            // framework instead of being counted as JSON parse errors.
            context.write(outKey, value);
        }
    }

    /**
     * Reducer that emits exactly one JSON profile record per key.
     *
     * <p>When a key has several records, the one whose {@code dmp_time} compares as latest
     * (according to {@code Standardizer.CompareDate}) wins; on a tie the record seen later
     * wins. A key with a single record passes that record through without parsing it.
     */
    public static class ProfileTotalReducer extends Reducer<Text, Text, NullWritable, Text> {
        private static final String COUNTER_GROUP = "dsp_profile_total";
        private static final String DMP_TIME_FIELD = "dmp_time";

        private final Text outKey = new Text();
        private ObjectMapper objectMapper;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            objectMapper = new ObjectMapper();
        }

        /**
         * Selects the record to keep for one key and writes it as the output value.
         *
         * @param key     the grouping key produced by the mapper (not written to the output)
         * @param values  all JSON profile records sharing that key
         * @param context task context used for output and counters
         * @throws IOException          if writing the output record fails
         * @throws InterruptedException if the task is interrupted while writing
         */
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            String best = null;
            String bestTime = null;
            // The first record is only parsed once a second record shows up, so
            // single-record keys never pay for JSON parsing.
            boolean bestTimeResolved = false;

            for (Text value : values) {
                // toString() takes a copy, which matters because Hadoop reuses the Text instance.
                String candidate = value.toString();
                if (best == null) {
                    best = candidate;
                    continue;
                }
                if (!bestTimeResolved) {
                    bestTime = readDmpTime(best, context);
                    bestTimeResolved = true;
                }

                String candidateTime = readDmpTime(candidate, context);
                if (candidateTime == null) {
                    // Candidate is unusable (bad JSON or no dmp_time): keep the current record.
                    continue;
                }
                // A usable candidate always replaces a record whose time could not be read.
                if (bestTime == null || isSameOrNewer(candidateTime, bestTime, context)) {
                    best = candidate;
                    bestTime = candidateTime;
                }
            }

            if (best != null) {
                outKey.set(best);
                context.write(NullWritable.get(), outKey);
            }
        }

        /**
         * Extracts the {@code dmp_time} field from a JSON record.
         *
         * @return the field's text, or {@code null} if the record cannot be parsed or has no
         *         such field (a counter is incremented in either case)
         */
        private String readDmpTime(String json, Context context) {
            JsonNode profile;
            try {
                profile = objectMapper.readTree(json);
            } catch (IOException e) {
                logger.info("json解析出错: {}", json);
                context.getCounter(COUNTER_GROUP, "json_parse_error").increment(1);
                return null;
            }
            JsonNode timeNode = (profile == null) ? null : profile.get(DMP_TIME_FIELD);
            if (timeNode == null) {
                context.getCounter(COUNTER_GROUP, "dmp_time_missing").increment(1);
                return null;
            }
            return timeNode.asText();
        }

        /**
         * Tells whether {@code candidateTime} is not earlier than {@code currentTime}.
         * If the dates cannot be compared the current record is kept, so {@code false} is
         * returned and a counter is incremented.
         */
        private boolean isSameOrNewer(String candidateTime, String currentTime, Context context) {
            try {
                return Standardizer.CompareDate(currentTime, candidateTime) <= 0;
            } catch (ParseException e) {
                logger.warn("unparseable dmp_time, current={} candidate={}", currentTime, candidateTime, e);
                context.getCounter(COUNTER_GROUP, "date_parse_error").increment(1);
                return false;
            }
        }
    }

    /**
     * Configures and runs the job.
     *
     * @param args {@code args[0]} and {@code args[1]} are the two input paths,
     *             {@code args[2]} is the output path; an existing output path is deleted
     *             recursively before the job starts
     * @return 0 if the job succeeds, 1 if it fails, 2 if too few arguments were supplied
     * @throws Exception if job setup or execution fails
     */
    @Override
    public int run(String[] args) throws Exception {
        if (args == null || args.length < 3) {
            logger.error("Usage: {} <input path 1> <input path 2> <output path>",
                    DspDeviceProfileTotalMR.class.getSimpleName());
            return 2;
        }

        Configuration conf = getConf();
        conf.set("mapreduce.map.speculative", "true");
        conf.set("mapreduce.reduce.speculative", "true");
        conf.set("mapreduce.task.io.sort.mb", "500");
        conf.set("mapreduce.reduce.java.opts", "-Xmx1536m");
        conf.set("mapreduce.reduce.memory.mb", "2048");
        conf.set("mapreduce.reduce.shuffle.parallelcopies", "50");
        Job job = Job.getInstance(conf, "Profile job");
        job.setJarByClass(DspDeviceProfileTotalMR.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileInputFormat.addInputPath(job, new Path(args[1]));

        Path output = new Path(args[2]);

        // The job refuses to start if the output directory exists, so remove any previous run.
        FileSystem system = output.getFileSystem(conf);
        if (system.exists(output)) {
            system.delete(output, true);
        }
        FileOutputFormat.setOutputPath(job, output);
        FileOutputFormat.setCompressOutput(job, true);
        FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);

        job.setMapperClass(ProfileTotalMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);
        job.setReducerClass(ProfileTotalReducer.class);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    /**
     * Command-line entry point: runs this tool through {@link ToolRunner} with a fresh
     * {@link Configuration} and exits the JVM with the tool's return code.
     *
     * @param args command-line arguments, forwarded to {@link ToolRunner}
     * @throws Exception if the tool fails with an exception
     */
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        Tool tool = new DspDeviceProfileTotalMR();
        int exitCode = ToolRunner.run(configuration, tool, args);
        System.exit(exitCode);
    }
}