package mobvista.prd.datasource.table;

import com.google.common.collect.Interner;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import mobvista.dmp.util.MRUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.List;
import java.util.Map;

/**
 * Created by Administrator on 2017/3/9 0009.
 * desc:根据setting日志得到的app_id与pkg_name对应关系表join抓取到的app_id,app_name,platform,impression
 *       如果一个pkg_name对应多个app_id,则取impression大的app_id
 *       得到app_id,pkg_name,app_name
 */
public class MergeAppIDMR {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

        conf.set("file", otherArgs[2]);

        Job job = Job.getInstance(conf, "MergeAppIDMR");

        job.setJarByClass(MergeAppIDMR.class);

        job.setMapperClass(MergeAppMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setReducerClass(MergeAppReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    public static class MergeAppMapper extends Mapper<LongWritable, Text, Text, Text> {
        Map<String, String> packageMap = Maps.newHashMap();
        Text outKey = new Text();
        Text outValue = new Text();

        public void setup(Context context) throws IOException, InterruptedException {
            //读取CalcPackageDictMR的结果,放入Map中
            String path = context.getConfiguration().get("file");
            FileSystem fileSystem = FileSystem.get(URI.create(path), context.getConfiguration());
            BufferedReader reader = new BufferedReader(new InputStreamReader(fileSystem.open(new Path(path))));
            String readLine = "";
            while ((readLine = reader.readLine()) != null) {
                String[] arr = readLine.split("\t", -1);
                packageMap.put(arr[2], arr[0] + "\t" + arr[1] + "\t" + arr[3]);
            }
            reader.close();
        }

        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            try {
                String line = value.toString();
                if (line.contains("\t")) {
                    String[] fields = line.split("\t", -1);
                    String appName = packageMap.get(fields[0]);
                    if (appName == null) {
                        return;
                    }
                    if (fields[1] == null || fields[1].equals("") || fields[1].length() == 0) {
                        return;
                    }

                    outKey.set(fields[0]);
                    outValue.set(fields[1] + "\t" + appName);
                    context.write(outKey, outValue);
                } else if (line.contains(",")) {
                    String[] fields = line.split(",", -1);
                    String appName = packageMap.get(fields[0]); //app_id
                    if (appName == null) {
                        return;
                    }
                    if (fields[1] == null || fields[1].equals("") || fields[1].length() == 0) {
                        return;
                    }
                    outKey.set(fields[1]);
                    outValue.set(fields[0] + "\t" + appName); //app_id,platform,app_name,impression
                    context.write(outKey, outValue);
                }
            } catch (Exception e) {
                return;
            }
        }
    }
    public static class MergeAppReducer extends Reducer<Text, Text, Text, NullWritable> {
        Text outKey = new Text();

        public void reduce (Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            List<String> list = Lists.newArrayList();
            for (Text val : values) {
                list.add(val.toString());
            }
            String out = "";
            if (list.size() > 1) {
                int max = 0;
                for (String line : list) {
                    String[] arr = line.split("\t",-1);
                    String num = arr[3].replace(",","");
                    if (Integer.parseInt(num) > max) {
                        max = Integer.parseInt(num);
                        out = arr[0] + "\t" + arr[2];
                    }
                }
            } else {
                String[] arr = list.get(0).split("\t",-1);
                out = arr[0] + "\t" + arr[2];
            }
            String[] fields = out.split("\t",-1);
            outKey.set(MRUtils.JOINER.join(fields[0],key,fields[1])); //app_id,pkg_name,app_name
            context.write(outKey, NullWritable.get());
        }
    }
}