package mobvista.prd.datasource.eggplants;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.IOException;
import java.util.List;
import java.util.Map;

/**
 * Created by Administrator on 2017/5/3 0003.
 */
public class EggplantsAppCountryAllMR {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        conf.set("mapreduce.reduce.memory.mb", "1280");

        Job job = Job.getInstance(conf, "EggplantsAppCountryAllMR");
        job.setJarByClass(EggplantsAppCountryAllMR.class);
        FileOutputFormat.setCompressOutput(job, true);
        FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);

        job.setMapperClass(EggplantsMapper.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

    //    job.setCombinerClass(EggplantsCombiner.class);

        job.setReducerClass(EggplantsCombiner.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileInputFormat.addInputPath(job, new Path(otherArgs[1]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
    public static class EggplantsMapper extends Mapper<LongWritable, Text, Text, Text> {
        Map<String,String> packageMap = Maps.newHashMap();
        Text outKey = new Text();
        Text outValue = new Text();

 //       Set<String> matchingSet = Sets.newHashSet("com.toi.reader.activities","com.newsdog","com.eterno","com.uc.iflow","id.co.babe","com.jakarta.baca","org.detikcom.rss","com.snapdeal.main","com.flipkart.android","com.lazada.android","com.zalora.android","com.goibibo","com.makemytrip","com.gojek.app","com.traveloka.android","id785385147","id624639017","id742044692","id297606951","id368677368","id539179365","id944875099","id898244857");
//        public void setup(Context context) throws IOException, InterruptedException {
//            //读取CalcPackageDictMR的结果,放入Map中
//            String path = context.getConfiguration().get("file");
//            FileSystem fileSystem = FileSystem.get(URI.create(path), context.getConfiguration());
//            BufferedReader reader = new BufferedReader(new InputStreamReader(fileSystem.open(new Path(path))));
//            String readLine = "";
//            while ((readLine = reader.readLine()) != null) {
//                String[] arr = MRUtils.SPLITTER.split(readLine, -1);
//                String packageName = arr[3].replace("[","").replace("]","").replace("\"","");
//                packageMap.put(arr[0] + "\t" + arr[1], packageName);
//            }
//            reader.close();
//        }
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] fields = line.split("\t",-1);
            if (fields.length == 4) {
                outKey.set(fields[0] + "\t" + fields[1]);
                String packageName = fields[3].replace("[","").replace("]","").replace("\"","");
                String[] arr = packageName.split(",",-1);
                for (String pakName : arr) {
//                    if (matchingSet.contains(pakName)) {
                        outValue.set(pakName);
                        context.write(outKey,outValue);
//                    }
                }

            } else if (fields.length > 5) {
                outKey.set(fields[0] + "\t" + fields[1]);
                outValue.set(fields[9]);
                context.write(outKey, outValue);
            }
        }
    }

    public static class EggplantsReducer extends Reducer<Text, Text, Text, Text> {
        Text outKey = new Text();
        Text outValue = new Text();
        int all = 0;
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            for (Text val:values) {
                all++;
            }
            outValue.set(all+"");
            context.write(key, outValue);
        }
    }

    public static class EggplantsCombiner extends Reducer<Text, Text, Text, Text> {
        Text outKey = new Text();
        Text outValue = new Text();
        int all = 0;
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            List<String> list = Lists.newArrayList();
            String country = "";
            for (Text val:values) {
                if (val.toString().length() == 2) {
                    country = val.toString();
                    continue;
                }
                list.add(val.toString());
            }
            if (country.equals("")) {
                return;
            }
            for (String pakName : list) {
                outKey.set(country+"\t"+pakName);
                outValue.set("1");
                context.write(outKey, outValue);
            }
        }
    }
}