package mobvista.dmp.common;

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.net.util.Base64;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.*;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

/**
 * author: houying
 * date  : 16-11-8
 * desc  :
 */
public abstract class CommonMapReduce extends Configured implements Tool {

    private final String name;
    private final Class<? extends Mapper> mapperClass;
    private final Class<? extends Reducer> reducerClass;
    public final static String didPtn = "^[0-9a-fA-F]{8}(-[0-9a-fA-F]{4}){3}-[0-9a-fA-F]{12}$";
    public final static String imeiPtn = "^([0-9]{15,17})$";
    public final static String imeiMd5Ptn = "^([a-fA-F0-9]{32})$";
    public final static String imeiMd5 = "imeimd5";
    public final static String oaidMd5 = "oaidmd5";
    public final static String andriodIdPtn = "^[a-zA-Z0-9]{16}$";
    public final static String allZero = "00000000-0000-0000-0000-000000000000";
    //  14~16位连续多位相同字符,非法IMEI过滤
    public final static String imeiPtnAll = "^([0-9])\1{14,16}";
    //  连续多位相同字符,非法 androidId 过滤
    public final static String andriodIdAll = "^[a-zA-Z0-9]\1{15}$";
    //  连续多位相同字符,非法 IMEI MD5 过滤
    public final static String umd5Ptn = "^([0-9A-Za-z])\1{29,31}";
    //  OAID
    public final static String oaidPtb = "^[0-9A-Za-z-]{16,64}$";
    public final static String md5Ptn = "^([a-fA-F0-9]{32})$";

    public CommonMapReduce(String name, Class<? extends Mapper> mapperClass, Class<? extends Reducer> reducerClass) {
        this.name = name;
        this.mapperClass = mapperClass;
        this.reducerClass = reducerClass;
    }

    public static void setMetrics(Mapper.Context context, String metricsGroup, String metricsName, long increaseNum) {
        context.getCounter(metricsGroup, metricsName).increment(increaseNum);
    }


    public static void setMetrics(Reducer.Context context, String metricsGroup, String metricsName, long increaseNum) {
        context.getCounter(metricsGroup, metricsName).increment(increaseNum);
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
       /* conf.set("mapreduce.map.speculative", "true");
        conf.set("mapreduce.reduce.speculative", "true");
        conf.set("mapreduce.task.io.sort.mb", "500");
        conf.set("mapreduce.reduce.java.opts", "-Xmx1536m");
        conf.set("mapreduce.reduce.memory.mb", "2048");
        conf.set("mapreduce.reduce.shuffle.parallelcopies", "50");*/
        Job job = Job.getInstance(conf, name);
        job.setJarByClass(CommonMapReduce.class);

        setInputPath(job, args);
        setOutputPath(job, args);

        Path outputPath = FileOutputFormat.getOutputPath(job);
        FileSystem fileSystem = outputPath.getFileSystem(job.getConfiguration());
        if (fileSystem.exists(outputPath)) {
            fileSystem.delete(outputPath, true);
        }

        job.setMapperClass(mapperClass);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        if (reducerClass != null) {
            job.setReducerClass(reducerClass);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(NullWritable.class);
        } else {
            job.setOutputKeyClass(job.getMapOutputKeyClass());
            job.setOutputValueClass(job.getMapOutputValueClass());
        }
        otherSetting(job, args);
        setConf(job, args);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    protected void otherSetting(Job job, String[] args) throws Exception {

    }

    protected void setOutputPath(Job job, String[] args) throws IOException {

    }

    protected void setInputPath(Job job, String[] args) throws IOException {

    }

    protected void setConf(Job job, String[] args) throws IOException {

    }

    public static void start(Tool tool, String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), tool, args));
    }


    /**
     * 从指定文件中读取需要广播的内容放入到Map中
     *
     * @param filePath   文件路径
     * @param conf       hadoop配置
     * @param keyIndex   作为map key的数据下标索引
     * @param valueIndex 作为map value的数据下标索引
     * @return
     * @throws Exception
     */
    public static Map<String, String> buildBroadcast(String filePath, Configuration conf, int keyIndex, int valueIndex) throws Exception {
        Map<String, String> map = new HashMap<String, String>();
        FileSystem fs = FileSystem.get(URI.create(filePath), conf);
        BufferedReader reader = null;
        try {
            InputStream in = fs.open(new Path(filePath));
            reader = new BufferedReader(new InputStreamReader(in));
            String line = reader.readLine();
            while (line != null) {
                String[] splits = line.split("\t", -1);
                map.put(splits[keyIndex], splits[valueIndex]);
                line = reader.readLine();
            }
        } finally {

        }
        return map;
    }

    /**
     * 将对象序列化成字符串
     *
     * @param obj
     * @return
     * @throws Exception
     */
    public static String encodeObject(Object obj) throws Exception {
        ObjectOutputStream oos = null;
        try {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            oos = new ObjectOutputStream(baos);
            oos.writeObject(obj);
            byte[] bytes = baos.toByteArray();
            return new String(Base64.encodeBase64(bytes));
        } finally {
            if (oos != null) {
                oos.flush();
                oos.close();
            }
        }
    }

    /**
     * 将字符串反序列化成对象
     *
     * @param str
     * @param clazz
     * @param <T>
     * @return
     * @throws Exception
     */
    public static <T extends Serializable> T decodeObject(String str, Class<T> clazz) throws Exception {
        ObjectInputStream ois = null;
        try {
            byte[] input = Base64.decodeBase64(str);
            ByteArrayInputStream bais = new ByteArrayInputStream(input);
            ois = new ObjectInputStream(bais);
            return (T) ois.readObject();
        } finally {
            if (ois != null) {
                ois.close();
            }
        }
    }

    public static boolean checkDeviceId(String deviceId) {

        boolean flag = StringUtils.isNotBlank(deviceId) &&
                ((deviceId.matches(didPtn) && !allZero.equals(deviceId)) ||
                        (deviceId.matches(imeiPtn) && !deviceId.matches(imeiPtnAll)) ||
                        (deviceId.matches(andriodIdPtn) && !deviceId.matches(andriodIdAll)) ||
                        (deviceId.matches(md5Ptn) && !deviceId.matches(umd5Ptn)) ||
                        deviceId.matches(oaidPtb) || deviceId.length() > 16);
        return flag;
    }
}