package mobvista.prd.datasource.eggplants; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.compress.GzipCodec; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; import java.io.IOException; import java.util.List; import java.util.Map; /** * Created by Administrator on 2017/5/3 0003. */ public class EggplantsAppCountryAllMR { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); conf.set("mapreduce.reduce.memory.mb", "1280"); Job job = Job.getInstance(conf, "EggplantsAppCountryAllMR"); job.setJarByClass(EggplantsAppCountryAllMR.class); FileOutputFormat.setCompressOutput(job, true); FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class); job.setMapperClass(EggplantsMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); // job.setCombinerClass(EggplantsCombiner.class); job.setReducerClass(EggplantsCombiner.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileInputFormat.addInputPath(job, new Path(otherArgs[1])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[2])); System.exit(job.waitForCompletion(true) ? 0 : 1); } public static class EggplantsMapper extends Mapper<LongWritable, Text, Text, Text> { Map<String,String> packageMap = Maps.newHashMap(); Text outKey = new Text(); Text outValue = new Text(); // Set<String> matchingSet = Sets.newHashSet("com.toi.reader.activities","com.newsdog","com.eterno","com.uc.iflow","id.co.babe","com.jakarta.baca","org.detikcom.rss","com.snapdeal.main","com.flipkart.android","com.lazada.android","com.zalora.android","com.goibibo","com.makemytrip","com.gojek.app","com.traveloka.android","id785385147","id624639017","id742044692","id297606951","id368677368","id539179365","id944875099","id898244857"); // public void setup(Context context) throws IOException, InterruptedException { // //读取CalcPackageDictMR的结果,放入Map中 // String path = context.getConfiguration().get("file"); // FileSystem fileSystem = FileSystem.get(URI.create(path), context.getConfiguration()); // BufferedReader reader = new BufferedReader(new InputStreamReader(fileSystem.open(new Path(path)))); // String readLine = ""; // while ((readLine = reader.readLine()) != null) { // String[] arr = MRUtils.SPLITTER.split(readLine, -1); // String packageName = arr[3].replace("[","").replace("]","").replace("\"",""); // packageMap.put(arr[0] + "\t" + arr[1], packageName); // } // reader.close(); // } public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String[] fields = line.split("\t",-1); if (fields.length == 4) { outKey.set(fields[0] + "\t" + fields[1]); String packageName = fields[3].replace("[","").replace("]","").replace("\"",""); String[] arr = packageName.split(",",-1); for (String pakName : arr) { // if (matchingSet.contains(pakName)) { outValue.set(pakName); context.write(outKey,outValue); // } } } else if (fields.length > 5) { outKey.set(fields[0] + "\t" + fields[1]); outValue.set(fields[9]); context.write(outKey, outValue); } } } public static class EggplantsReducer extends Reducer<Text, Text, Text, Text> { Text outKey = new Text(); Text outValue = new Text(); int all = 0; public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { for (Text val:values) { all++; } outValue.set(all+""); context.write(key, outValue); } } public static class EggplantsCombiner extends Reducer<Text, Text, Text, Text> { Text outKey = new Text(); Text outValue = new Text(); int all = 0; public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { List<String> list = Lists.newArrayList(); String country = ""; for (Text val:values) { if (val.toString().length() == 2) { country = val.toString(); continue; } list.add(val.toString()); } if (country.equals("")) { return; } for (String pakName : list) { outKey.set(country+"\t"+pakName); outValue.set("1"); context.write(outKey, outValue); } } } }