package mobvista.dmp.device.mapreduce;

import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.map.WrappedMapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer;
import org.apache.hadoop.mapreduce.task.ReduceContextImpl;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.regex.Pattern;

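/**
 * author: houying
 * date : 16-10-8
 * desc : MapReduce job that groups tab-separated adn (13 columns), ga (34 columns) and
 *        dmp (45 columns) device records by their first column and writes one merged
 *        45-column record per key.
 */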
public class DmpDeviceTotalMR extends Configured implements Tool {
    /** Tab delimiter pattern, compiled once and used by both the mapper and the reducer to split records. */
    private static final Pattern splitPtn = Pattern.compile("\t");

    /**
     * Mapper that re-keys each recognised input line by its first tab-separated column.
     * <p>
     * A line is recognised purely by its column count: 13 (adn), 34 (ga) or 45 (dmp).
     * Every other line is silently dropped. The emitted value is the untouched input line.
     * <p>
     * NOTE(review): {@code Pattern.split(CharSequence)} discards trailing empty strings, so a
     * record whose last column(s) are empty is counted with fewer columns and skipped here;
     * confirm that this is intended.
     */
    public static class DmpDeviceMapper extends Mapper<LongWritable, Text, Text, Text> {

        /**
         * Emits {@code (firstColumn, line)} for lines with a recognised column count.
         *
         * @param key     input key supplied by the input format (not used)
         * @param value   one input line
         * @param context sink for the emitted pair
         */
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            final String[] fields = splitPtn.split(value.toString());
            switch (fields.length) {
                case 13: // adn
                case 34: // ga
                case 45: // dmp
                    context.write(new Text(fields[0]), value);
                    break;
                default:
                    // unrecognised column count: nothing is emitted for this line
                    break;
            }
        }
    }

    /**
     * Reducer that merges all records sharing one key into a single 45-column, tab-joined line.
     * <p>
     * Records are classified by column count: 13 = adn, 34 = ga, 45 = dmp. At most one record of
     * each kind is kept per key. The kept dmp record (or a blank 45-column row when there is none)
     * is the base; ga columns are copied in first and adn columns afterwards, so adn wins for the
     * columns both write (device_type at index 1 and update time at index 44).
     * <p>
     * NOTE(review): {@code Pattern.split(CharSequence)} discards trailing empty strings, so a record
     * ending in empty columns is seen here with a smaller column count; confirm inputs never end so.
     */
    public static class DmpDeviceReducer extends Reducer<Text, Text, Text, NullWritable> {
        /** Column count identifying an adn record. */
        private static final int ADN_FIELD_COUNT = 13;
        /** Column count identifying a ga record. */
        private static final int GA_FIELD_COUNT = 34;
        /** Column count identifying a dmp record; also the width of every output row. */
        private static final int DMP_FIELD_COUNT = 45;

        private static final Joiner JOINER = Joiner.on("\t").useForNull("");
        private static final DateTimeFormatter UPDATE_TIME_FORMAT = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss");

        /**
         * Merges the records of one key and writes exactly one output line.
         *
         * @param key     grouping key; its string form is written to output column 0
         * @param values  raw tab-separated records for this key
         * @param context sink for the merged line (written as key, with a {@link NullWritable} value)
         */
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            String id = key.toString();
            String[] outputArray = null;
            String[] adnArray = null;
            String[] gaArray = null;
            for (Text value : values) {
                String[] array = splitPtn.split(value.toString());
                // Classify on column count first, so array[1] is only read on a full-width adn record.
                if (array.length == ADN_FIELD_COUNT) {
                    if (shouldTakeAdn(adnArray, array)) {
                        adnArray = array;
                    }
                } else if (array.length == GA_FIELD_COUNT) {
                    if (gaArray == null) { // keep the first ga record only
                        gaArray = array;
                    }
                } else if (array.length == DMP_FIELD_COUNT) {
                    if (outputArray == null) { // keep the first dmp record only
                        outputArray = array;
                    }
                }
            }
            if (outputArray == null) {
                outputArray = new String[DMP_FIELD_COUNT];
                Arrays.fill(outputArray, "");
            }
            outputArray[0] = id;
            if (gaArray != null) {
                fillInGa(gaArray, outputArray);
            }
            if (adnArray != null) {
                fillInAdn(adnArray, outputArray);
            }
            // Columns 0 and 1 are left untouched; a literal "NULL" elsewhere becomes an empty string.
            for (int i = 2; i < outputArray.length; i++) {
                if ("NULL".equals(outputArray[i])) {
                    outputArray[i] = "";
                }
            }
            context.write(new Text(JOINER.join(outputArray)), NullWritable.get());
        }

        /**
         * Decides whether {@code candidate} should become the adn record kept for the key.
         * The first adn record is always taken; afterwards a kept record whose device type is
         * "androidid" or "android_id" is replaced by a candidate whose device type is "gaid".
         *
         * @param current   adn record kept so far, or {@code null} if none
         * @param candidate adn record (13 columns) being considered
         * @return {@code true} if {@code candidate} should replace {@code current}
         */
        private static boolean shouldTakeAdn(String[] current, String[] candidate) {
            if (current == null) {
                return true;
            }
            String currentType = current[1];
            boolean currentIsAndroidId = "androidid".equals(currentType) || "android_id".equals(currentType);
            return currentIsAndroidId && "gaid".equals(candidate[1]);
        }

        /**
         * Copies the adn data fields into outputArray.
         *
         * @param array       adn data fields
         * @param outputArray output array
         */
        private void fillInAdn(String[] array, String[] outputArray) {
            outputArray[1] = array[1]; //device_type
            outputArray[2] = array[3]; //platform
            outputArray[3] = array[4]; //os version
            outputArray[4] = array[6]; //device model
            outputArray[5] = array[7]; //device brand
            outputArray[6] = array[9]; //country code
            outputArray[7] = array[10];//language
            outputArray[8] = array[2]; //mac
            outputArray[43] = array[11]; //adn_app_click_list
            outputArray[44] = array[12]; //update time
        }

        /**
         * Copies the ga data fields into outputArray and stamps the update time with the current time.
         *
         * @param array       ga data fields
         * @param outputArray output array
         */
        private void fillInGa(String[] array, String[] outputArray) {
            outputArray[1] = array[1]; //device_type
            outputArray[9] = array[26]; //gender
            outputArray[10] = array[27]; //birth_year
            // Indices 11-15 (profession, education level, income range, family status, hobby)
            // are not populated from ga data and keep whatever the base row holds.
            //outputArray[16:37] = array[4:25]
            System.arraycopy(array, 4, outputArray, 16, 22);
            outputArray[38] = array[28]; //revenue_week
            outputArray[39] = array[29]; //revenue_month
            outputArray[40] = array[30]; //average_session_per_app
            outputArray[41] = array[31]; //average_days_per_app
            outputArray[42] = array[32]; //installation list
            outputArray[44] = UPDATE_TIME_FORMAT.print(DateTime.now()); //update time
        }
    }

    /**
     * Configures and runs the "dmp device total" job.
     * <p>
     * Any existing output directory is deleted recursively before the job starts, and the
     * output is written as gzip-compressed text.
     *
     * @param strings {@code strings[0]}, {@code strings[1]} and {@code strings[2]} are input paths;
     *                {@code strings[3]} is the output path; extra elements are ignored
     * @return 0 if the job completed successfully, 1 otherwise
     * @throws IllegalArgumentException if fewer than four paths are supplied
     * @throws Exception                if job setup or execution fails
     */
    public int run(String[] strings) throws Exception {
        // Fail with a readable message instead of a bare ArrayIndexOutOfBoundsException.
        if (strings == null || strings.length < 4) {
            throw new IllegalArgumentException(
                    "Usage: DmpDeviceTotalMR <input1> <input2> <input3> <output>");
        }

        Configuration conf = getConf();
        conf.set("mapreduce.map.speculative", "true");
        conf.set("mapreduce.reduce.speculative", "true");
        conf.set("mapreduce.task.io.sort.mb", "500");
        conf.set("mapreduce.reduce.java.opts", "-Xmx1536m");
        conf.set("mapreduce.reduce.memory.mb", "2048");
        conf.set("mapreduce.reduce.shuffle.parallelcopies", "50");

        Job job = Job.getInstance(conf, "dmp device total");
        job.setJarByClass(DmpDeviceTotalMR.class);

        // The first three arguments are all inputs to the same mapper.
        for (int i = 0; i < 3; i++) {
            FileInputFormat.addInputPath(job, new Path(strings[i]));
        }

        Path outputPath = new Path(strings[3]);
        FileSystem fileSystem = outputPath.getFileSystem(conf);
        // Remove a pre-existing output directory so the job can write to it.
        if (fileSystem.exists(outputPath)) {
            fileSystem.delete(outputPath, true);
        }
        FileOutputFormat.setOutputPath(job, outputPath);
        FileOutputFormat.setCompressOutput(job, true);
        FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setMapperClass(DmpDeviceMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setReducerClass(DmpDeviceReducer.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    /**
     * Command-line entry point: runs this tool through {@link ToolRunner} and exits the JVM
     * with the status the tool returns.
     *
     * @param args command-line arguments, handed to {@link ToolRunner#run}
     * @throws Exception if the tool fails with an exception
     */
    public static void main(String[] args) throws Exception {
        final int exitCode = ToolRunner.run(new Configuration(), new DmpDeviceTotalMR(), args);
        System.exit(exitCode);
/*
        Disabled ad-hoc check that feeds one ga record and one adn record to the reducer:

        String gaTotal = "00000B0A-46ED-4F9D-A101-7D8BF004CFF6\tidfa\tios\tIQ\t2016-09-07\t0\t\t1\t1\t0\t1\t0\t0\t0\t0\t0\t0\t0\t24\t36.0\t24\t36.0\t24\t36.0\t527708077401556\t\tfemale\t\t0.0\t0.0\t8.0\t1.0\t[{\"date\": \"20160907\", \"appName\": \"949498190\"}]\t20161006";
        String adnTotal = "00000B0A-46ED-4F9D-A101-7D8BF004CFF6\tidfa\t0\tandroid\t5.0\t0\tx6a\tvivo\t0\tCN\t0\t[{\"publisher_id\":\"6028\",\"app_version\":\"0\",\"app_id\":\"20660\",\"datetime\":\"2016-09-06 21:44:49\"}]\t2016-09-06 21:44:49";
        Iterable<Text> iterable = Lists.newArrayList(new Text(gaTotal), new Text(adnTotal));
        new DmpDeviceReducer().reduce(new Text("00000B0A-46ED-4F9D-A101-7D8BF004CFF6\tidfa"), iterable, null);*/
    }
}