package mobvista.dmp.datasource.dm.mapreduce;

import com.google.common.collect.Maps;
import mobvista.dmp.common.CommonMapReduce;
import mobvista.dmp.util.MRUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.type.JavaType;

import java.io.IOException;
import java.util.*;

/**
 * Created by liushuai
 * desc: Consolidates the data under the dm_interest_tag directory into a single unified partition, "all".
 */
public class DmInterestAllMR1 {
    /**
     * Job driver: configures and submits the {@code DmInterestAllMR1} MapReduce job.
     *
     * <p>Hadoop generic options are consumed by {@link GenericOptionsParser}; of the remaining
     * arguments the first is used as the input path and the second as the output path. Any
     * further arguments are ignored. Job output is gzip-compressed.
     *
     * <p>The JVM exits with status 0 when the job completes successfully, 1 when it fails,
     * and 2 when fewer than two path arguments are supplied.
     *
     * @param args generic Hadoop options followed by {@code <input path> <output path>}
     * @throws InterruptedException   propagated from job submission / completion wait
     * @throws IOException            propagated from argument parsing or job setup
     * @throws ClassNotFoundException propagated from job submission / completion wait
     */
    public static void main(String[] args) throws InterruptedException, IOException, ClassNotFoundException {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

        // Fail with a readable message instead of an ArrayIndexOutOfBoundsException
        // when the input/output paths are missing.
        if (otherArgs.length < 2) {
            System.err.println("Usage: DmInterestAllMR1 [generic options] <input path> <output path>");
            System.exit(2);
        }

        Job job = Job.getInstance(conf, "DmInterestAllMR1");
        job.setJarByClass(DmInterestAllMR1.class);

        // Map side emits (Text, Text); the reducer emits the whole record as the key.
        job.setMapperClass(DmInterestAllMapper.class);
        job.setReducerClass(DmInterestAllReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // Gzip-compress the final output files.
        FileOutputFormat.setCompressOutput(job, true);
        FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    public static class DmInterestAllMapper extends Mapper<LongWritable, Text, Text, Text> {
        Text outKey = new Text();
        Text outValue = new Text();

        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] fields = MRUtils.SPLITTER.split(line,-1);
            if (fields.length != 4) {
                return;
            }
            if (fields[0].matches(CommonMapReduce.didPtn) && !CommonMapReduce.allZero.equals(fields[0])) {
                outKey.set(MRUtils.JOINER.join(
                        fields[0], //device_id
                        fields[1], //device_type
                        fields[2]  //platform
                ));
                outValue.set(fields[3]);
                context.write(outKey, outValue);
            }
        }
    }

    /**
     * Reducer that merges all values seen for one device key into a single output record.
     *
     * <p>When a key has exactly one value, that value is written through unchanged. When it has
     * several, each value is parsed as a JSON array of objects and the entries are merged by
     * their {@code package_name}: the {@code tag} of the first occurrence of a package is kept,
     * and the lexicographically greatest non-empty {@code date} seen for that package is kept.
     * The merged list is serialized back to JSON and written, joined to the key with
     * {@code MRUtils.JOINER}. The output value is always {@link NullWritable}.
     */
    public static class DmInterestAllReducer extends Reducer<Text, Text, Text, NullWritable> {
        // Value of the "task.date" configuration entry; null when it is not set.
        private String date;
        // Reused across reduce() calls to avoid allocating a Text per record.
        private Text outKey = new Text();
        // JSON converter
        private ObjectMapper objectMapper = new ObjectMapper();
        // Target type for parsing a value: a list of generic maps.
        private JavaType listType = objectMapper.getTypeFactory().constructCollectionType(ArrayList.class, Map.class);

        /**
         * Reads the {@code task.date} configuration entry into {@link #date}.
         *
         * @param context task context providing the job configuration
         */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            date = context.getConfiguration().get("task.date");
        }

        /**
         * Writes one merged record for the given device key.
         *
         * @param key     device key produced by the mapper
         * @param values  all values emitted for this key
         * @param context used to emit the output record
         * @throws IOException          if a value cannot be parsed as JSON, the merged list cannot
         *                              be serialized, or the write fails
         * @throws InterruptedException propagated from {@code context.write}
         */
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // Copy to Strings first: the number of values decides which path is taken.
            List<String> payloads = new ArrayList<String>();
            for (Text val : values) {
                payloads.add(val.toString());
            }
            if (payloads.isEmpty()) {
                return;
            }

            String record;
            if (payloads.size() == 1) {
                // Only one value for this device: pass it through untouched.
                record = MRUtils.JOINER.join(key.toString(), payloads.get(0));
            } else {
                // Several values for this device: merge them per package.
                record = MRUtils.JOINER.join(
                        key.toString(),
                        objectMapper.writeValueAsString(mergeByPackage(payloads))
                );
            }
            outKey.set(record);
            context.write(outKey, NullWritable.get());
        }

        /**
         * Merges the JSON payloads of one device into one entry per package.
         *
         * @param payloads JSON arrays of objects carrying {@code package_name}, {@code date} and {@code tag}
         * @return one map per distinct package, holding {@code package_name} and {@code date} when
         *         they are non-empty, and always {@code tag}
         * @throws IOException if a payload cannot be parsed
         */
        private List<Map<String, Object>> mergeByPackage(List<String> payloads) throws IOException {
            Map<String, List<?>> tagsByPackage = Maps.newHashMap();
            // Latest date seen for each package.
            Map<String, String> latestDateByPackage = new HashMap<String, String>();

            for (String payload : payloads) {
                List<Map<String, Object>> apps = objectMapper.readValue(payload, listType);
                for (Map<String, Object> app : apps) {
                    String seenDate = (String) app.get("date");
                    String packageName = (String) app.get("package_name");

                    // The tag list of the first occurrence of a package wins.
                    if (!tagsByPackage.containsKey(packageName)) {
                        tagsByPackage.put(packageName, (List<?>) app.get("tag"));
                    }

                    // Keep the greatest non-empty date. A missing previous date counts as
                    // "older", so a package whose first occurrence had no date still picks
                    // up a date from a later occurrence.
                    if (StringUtils.isNotEmpty(seenDate)) {
                        String knownDate = latestDateByPackage.get(packageName);
                        if (StringUtils.isEmpty(knownDate) || knownDate.compareTo(seenDate) < 0) {
                            latestDateByPackage.put(packageName, seenDate);
                        }
                    }
                }
            }

            List<Map<String, Object>> merged = new ArrayList<Map<String, Object>>(tagsByPackage.size());
            for (Map.Entry<String, List<?>> entry : tagsByPackage.entrySet()) {
                String packageName = entry.getKey();
                Map<String, Object> app = Maps.newHashMap();
                if (StringUtils.isNotEmpty(packageName)) {
                    app.put("package_name", packageName);
                }
                String latestDate = latestDateByPackage.get(packageName);
                if (StringUtils.isNotEmpty(latestDate)) {
                    app.put("date", latestDate);
                }
                app.put("tag", entry.getValue());
                merged.add(app);
            }
            return merged;
        }

        /**
         * Temporary workaround: caps a data date at the configured task date.
         *
         * <p>When the task date sorts before the given date, the task date is returned;
         * otherwise the given date is returned. If either the task date or the given date is
         * {@code null}, the given date is returned unchanged.
         *
         * @param date the date found in the data
         * @return the earlier of the task date and {@code date} by string comparison
         */
        protected String getCurrentDate(String date) {
            if (this.date == null || date == null) {
                return date;
            }
            if (this.date.compareTo(date) < 0) {
                return this.date;
            }
            return date;
        }
    }

}