package mobvista.dmp.datasource.dsp.mapreduce;

import com.google.common.base.Preconditions;
import mobvista.dmp.common.CommonMapReduce;
import mobvista.dmp.util.MRUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.node.ObjectNode;

import java.io.IOException;
import java.util.Iterator;

/**
 * author: houying
 * date  : 16-10-25
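 * desc  : MapReduce job that reads adn_dsp daily logs, extracts the device ids
 *         (gaid, idfa, imei, android_id) of non-iOS records and writes one JSON record per gaid.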
 */
public class DspDeviceIdsMR extends Configured implements Tool {
    /**
     * Columns of a split input line, each paired with a logical name and
     * its zero-based position in the split array.
     */
    public enum Fields {
        DEVICE_GAID("gaid", 34),
        DEVICE_IDFA("idfa", 37),
        DEVICE_PLATFORM("platform", 29),
        DEVICE_IDS("ids", 15);

        /** Logical column name. */
        private final String name;
        /** Zero-based column position. */
        private final int idx;

        Fields(String columnName, int columnIdx) {
            name = columnName;
            idx = columnIdx;
        }

        /** @return the logical column name */
        public String getName() {
            return this.name;
        }

        /** @return the zero-based column position */
        public int getIdx() {
            return this.idx;
        }
    }

    /**
     * Device ids carried in the comma-separated dsp "ids" column, each paired
     * with its output name and its zero-based position inside that column.
     *
     * <p>Column order of the ids list:
     * gaid,idfa,idfamd5,idfasha1,imei,imeimd5,imeisha1,androidid,androididmd5,androididsha1,macmd5,macsha1
     */
    public enum Ids {
        GAID("gaid", 0),
        IDFA("idfa", 1),
        IMEI("imei", 4),
        ANDROID_ID("android_id", 7);

        /** Name used for this id in the output record. */
        private final String name;
        /** Zero-based position of this id inside the ids list. */
        private final int idx;

        Ids(String idName, int position) {
            name = idName;
            idx = position;
        }

        /** @return the output name of this id */
        public String getName() {
            return this.name;
        }

        /** @return the zero-based position of this id inside the ids list */
        public int getIdx() {
            return this.idx;
        }
    }



    /**
     * Turns each non-iOS input line into a JSON record holding the ids listed in
     * {@link Ids} plus a {@code dmp_time} field, keyed by gaid.
     *
     * <p>Lines with fewer than {@link #MIN_FIELD_COUNT} columns are counted under the
     * {@code dsp_length_err} metric; lines whose ids column is too short to hold every
     * id in {@link Ids} are counted under {@code dsp_ids_err}. Both kinds are dropped.
     */
    public static class ProfileMapper extends Mapper<LongWritable, Text, Text, Text> {
        /** Minimum number of columns a line must have to be processed (adn_dsp daily). */
        private static final int MIN_FIELD_COUNT = 52;
        /**
         * Minimum number of entries the ids column must contain so that every index
         * declared in {@link Ids} can be read without running off the array.
         */
        private static final int MIN_IDS_COUNT = requiredIdsCount();

        private final Text outKey = new Text();
        private final Text outValue = new Text();
        private String date = null;
        private ObjectMapper objectMapper = null;

        // Left public and non-final so existing external readers/writers keep compiling.
        public static String ANDROID = "android";
        public static String IOS = "ios";
        public static String idName = "device_id";

        /** Returns the largest index declared in {@link Ids} plus one. */
        private static int requiredIdsCount() {
            int maxIdx = 0;
            for (Ids id : Ids.values()) {
                maxIdx = Math.max(maxIdx, id.getIdx());
            }
            return maxIdx + 1;
        }

        /**
         * Emits {@code (gaid, json)} for one input line, or drops the line.
         *
         * @param key     input key, unused
         * @param value   one raw input line, split with {@code MRUtils.SPLITTER}
         * @param context task context used for output and metrics
         */
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] array = MRUtils.SPLITTER.split(value.toString());
            if (array.length < MIN_FIELD_COUNT) {
                CommonMapReduce.setMetrics(context, "DMP", "dsp_length_err", 1);
                return;
            }

            String platform = array[Fields.DEVICE_PLATFORM.getIdx()];
            if ("ios".equals(platform)) {
                // iOS uses only idfa, so no id mapping is needed.
                return;
            }

            // Limit -1 keeps trailing empty entries so positions stay aligned with Ids indexes.
            String[] idsArray = array[Fields.DEVICE_IDS.getIdx()].trim().split(",", -1);
            if (idsArray.length < MIN_IDS_COUNT) {
                // Too short to hold every id we read (highest index is ANDROID_ID).
                CommonMapReduce.setMetrics(context, "DMP", "dsp_ids_err", 1);
                return;
            }

            ObjectNode dspProfile = objectMapper.createObjectNode();
            for (Ids id : Ids.values()) {
                dspProfile.put(id.getName(), idsArray[id.getIdx()]);
            }
            dspProfile.put("dmp_time", date);

            // NOTE(review): records with an empty gaid all share the same key, and the
            // reducer writes only one value per key — confirm this is intended.
            outKey.set(idsArray[Ids.GAID.getIdx()]);
            outValue.set(dspProfile.toString());
            context.write(outKey, outValue);
        }

        /**
         * Reads the mandatory {@code task.date} setting and creates the JSON mapper.
         *
         * @throws NullPointerException if {@code task.date} is not set in the configuration
         */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            date = Preconditions.checkNotNull(context.getConfiguration().get("task.date"),
                    "task.date must be set in the job configuration");
            objectMapper = new ObjectMapper();
        }
    }

    /**
     * Writes exactly one record per key: the first value received for that key.
     * Any further values for the same key are ignored.
     */
    public static class ProfileReducer extends Reducer<Text, Text, NullWritable, Text> {
        private final Text outValue = new Text();

        /**
         * @param key     grouping key, not written to the output
         * @param values  values grouped under {@code key}; only the first one is emitted
         * @param context task context used for output
         */
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            Iterator<Text> it = values.iterator();
            if (it.hasNext()) {
                outValue.set(it.next().toString());
                context.write(NullWritable.get(), outValue);
            }
        }
    }

    /**
     * Configures and runs the job, blocking until it finishes.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} is the output path;
     *             an existing output path is deleted recursively before the job starts
     * @return 0 if the job succeeded, 1 if it failed, 2 if the arguments are missing
     * @throws Exception if job setup or execution fails
     */
    @Override
    public int run(String[] args) throws Exception {
        if (args == null || args.length < 2) {
            System.err.println("Usage: DspDeviceIdsMR <input path> <output path>");
            return 2;
        }

        Configuration conf = getConf();
        conf.set("mapreduce.map.speculative", "true");
        conf.set("mapreduce.reduce.speculative", "true");
        conf.set("mapreduce.task.io.sort.mb", "500");
        conf.set("mapreduce.reduce.java.opts", "-Xmx1536m");
        conf.set("mapreduce.reduce.memory.mb", "2048");
        conf.set("mapreduce.reduce.shuffle.parallelcopies", "50");
        Job job = Job.getInstance(conf, "Dsp Profile job");
        job.setJarByClass(DspDeviceIdsMR.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));

        Path output = new Path(args[1]);

        // The job refuses to start if the output directory exists, so clear it first.
        FileSystem system = output.getFileSystem(conf);
        if (system.exists(output)) {
            system.delete(output, true);
        }
        FileOutputFormat.setOutputPath(job, output);
        FileOutputFormat.setCompressOutput(job, true);
        FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);

        job.setMapperClass(ProfileMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setReducerClass(ProfileReducer.class);

        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    /**
     * Command-line entry point: runs this tool through {@link ToolRunner} and
     * exits the JVM with the tool's return code.
     *
     * @param args command-line arguments forwarded to {@link #run(String[])}
     */
    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new Configuration(), new DspDeviceIdsMR(), args);
        System.exit(exitCode);
    }
}