package mobvista.dmp.datasource.ga.mapreduce;

import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Map;
import java.util.regex.Pattern;

/**
 * author: houying
 * date  : 16-10-9
 * desc  :
 */
public class GaDeviceTotalMR extends Configured implements Tool {
    private static Pattern splitPtn = Pattern.compile("\t");
    private static int GENDER_INDEX_IN_TOTAL = 26;
    private static final int DAILY_LENGTH = 32;

    public static class GaDeviceTotalMapper extends Mapper<LongWritable, Text, Text, Text> {
        private Pattern idfaPtn = Pattern.compile("^[0-9A-F\\-]+$");
        private final Text text = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] array = splitPtn.split(value.toString());
            if (array.length == 33 || array.length == DAILY_LENGTH) {
                String deviceId = array[0];
                String platform = array[2];
                platform = fixPlatform(platform, deviceId);
                String deviceType = getDeviceType(platform);
                text.set(deviceId + "\t" + deviceType);
                context.write(text, value);
            }
        }

        private String getDeviceType(String platform) {
            switch (platform) {
                case "ios":
                    return "idfa";
                case "android":
                    return "gaid";
                case "adr":
                    return "gaid";
                default:
                    return "unknown";
            }
        }

        private String fixPlatform(String platform, String deviceId) {
            if (idfaPtn.matcher(deviceId).matches()) {
                return "ios";
            }
            return platform;
        }
    }

    public static class GaDeviceTotalReducer extends Reducer<Text, Text, Text, NullWritable> {
        private static final Logger logger = LoggerFactory.getLogger(GaDeviceTotalReducer.class);

        private String date = null;
        private Joiner joiner = Joiner.on("\t").useForNull("");
        private Map<String, String> genderMap;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            date = context.getConfiguration().get("task.date");
            genderMap = Maps.newHashMap();
            genderMap.put("F", "F");
            genderMap.put("FEMALE", "F");
            genderMap.put("X", "F");
            genderMap.put("M", "M");
            genderMap.put("MALE", "M");
            genderMap.put("Y", "M");
        }

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            String[] idAndType = splitPtn.split(key.toString());
            if (idAndType.length != 2) {
                return;
            }
            String[] outputArray = null;
            String[] newRecord = null;
            for (Text value: values) {
                String[] array = splitPtn.split(value.toString());
                if (outputArray == null && array.length == 33) {
                    if(org.apache.commons.lang.StringUtils.isNotBlank(array[3]) && array[3].equalsIgnoreCase("GB") ){
                        array[3] = "UK";
                    }
                    outputArray = array;
                } else if (newRecord == null && array.length == DAILY_LENGTH) {
                    newRecord = array;
                }
            }

            // 将UK改为GB,明远需求,修改人冯亮 20170815
            if (newRecord != null && newRecord.length >= 4) {
                String countryCode = newRecord[3];
                if ("GB".equalsIgnoreCase(countryCode)) {
                    newRecord[3] = "UK";
                }
            }

            outputArray = merge(newRecord, outputArray);
            outputArray[0] = idAndType[0];
            outputArray[1] = idAndType[1];
            standardizingGender(outputArray);
            context.write(new Text(joiner.join(outputArray)), NullWritable.get());
        }

        private void standardizingGender(String[] outputArray) {
            String gender = outputArray[GENDER_INDEX_IN_TOTAL]; //gender
            gender = genderMap.get(gender.toUpperCase());
            if (gender != null) {
                outputArray[GENDER_INDEX_IN_TOTAL] = gender;
            }
        }

        private String[] merge(String[] newRecord, String[] outputArray) {
            Preconditions.checkArgument(newRecord != null || outputArray != null,
                    "both newRecord and outputArray are null");
            if (newRecord == null) {
                return outputArray;
            }
            if (outputArray == null) {
                outputArray = new String[33];
                //copy
                System.arraycopy(newRecord, 2, outputArray, 2, 4);
                System.arraycopy(newRecord, 6, outputArray, 7, 25);
                //set first_payed
                if (!Strings.isNullOrEmpty(newRecord[5]) && isNumeric(newRecord[5]) && Integer.valueOf(newRecord[5]) > 0) {
                    outputArray[6] = date;
                }
                //set update_time
                outputArray[32] = date;
                //return
                return outputArray;
            }
            //neither newRecord nor outputArray is null
            if (Strings.isNullOrEmpty(outputArray[32]) || outputArray[32].equals("NULL") ) {
                outputArray[32] = date;
            }
            if (outputArray[32].compareTo(date) >= 0) {
                return outputArray;
            }
            String createTime = outputArray[4];
            System.arraycopy(newRecord, 2, outputArray, 2, 4);
            outputArray[4] = createTime;
            System.arraycopy(newRecord, 6, outputArray, 7, 25);
            //set first_payed
            if (!Strings.isNullOrEmpty(newRecord[5]) && isNumeric(newRecord[5]) && Integer.valueOf(newRecord[5]) > 0) {
                outputArray[6] = date;
            }
            outputArray[32] = date; //update time
            return outputArray;
        }
    }


    public  static boolean isNumeric(String s) {
        if (s != null && !"".equals(s.trim()))
            return s.matches("^[0-9]*$");
        else
            return false;
    }


    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new GaDeviceTotalMR(), args));
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        conf.set("mapreduce.task.io.sort.mb", "500");
        conf.set("mapreduce.reduce.java.opts", "-Xmx1536m");
        conf.set("mapreduce.reduce.memory.mb", "2048");
        conf.set("mapreduce.reduce.shuffle.parallelcopies", "50");

        Job job = Job.getInstance(conf, "ga device total");
        job.setJarByClass(GaDeviceTotalMR.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileInputFormat.addInputPath(job, new Path(args[1]));

        Path outputPath = new Path(args[2]);
        FileSystem fileSystem = outputPath.getFileSystem(conf);
        if (fileSystem.exists(outputPath)) {
            fileSystem.delete(outputPath, true);
        }
        FileOutputFormat.setOutputPath(job, outputPath);
        FileOutputFormat.setCompressOutput(job, true);
        FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
        job.setMapperClass(GaDeviceTotalMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setReducerClass(GaDeviceTotalReducer.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        return job.waitForCompletion(true) ? 0 : 1;
    }
}