package mobvista.prd.datasource.countotherid; import mobvista.dmp.util.MRUtils; import mobvista.prd.datasource.newall.NewAllMergeMR; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; import org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.type.JavaType; import java.io.IOException; import java.util.Map; import java.util.regex.Pattern; /** * Created by Administrator on 2017/5/17 0017. */ public class CountOtherIDMR { public static void main(String[] args) throws InterruptedException, IOException, ClassNotFoundException { Configuration conf = new Configuration(); conf.set("mapreduce.job.reduce.slowstart.completedmaps", "0.5"); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); String numReducers = otherArgs[3]; String[] numrr = numReducers.split(",", -1); Job job = Job.getInstance(conf, "Count other id"); job.setNumReduceTasks(Integer.parseInt(numrr[0])); job.setJarByClass(CountOtherIDMR.class); job.setMapperClass(CountOtherIDMapper.class); job.setReducerClass(CountOtherIDReduce.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(NullWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); int exitCode = job.waitForCompletion(true) ? 0 : 1; if (exitCode == 0) { Job countJob = Job.getInstance(conf, "CountOtherIDMR count Job"); countJob.setNumReduceTasks(Integer.parseInt(numrr[1])); countJob.setJarByClass(CountOtherIDMR.class); countJob.setMapperClass(CountMapper.class); countJob.setReducerClass(NewAllMergeMR.CountReducer.class); countJob.setMapOutputKeyClass(Text.class); countJob.setMapOutputValueClass(IntWritable.class); countJob.setOutputKeyClass(Text.class); countJob.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(countJob, new Path(otherArgs[1])); FileOutputFormat.setOutputPath(countJob, new Path(otherArgs[2])); System.exit(countJob.waitForCompletion(true) ? 0 : 1); } } public static class CountMapper extends Mapper<LongWritable, Text, Text, IntWritable> { private Text outKey = new Text(); private Pattern pattern = Pattern.compile("\t"); private IntWritable outVal = new IntWritable(1); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] valSplits = pattern.split(value.toString(), -1); if (valSplits.length > 1) { outKey.set(valSplits[1]); context.write(outKey, outVal); } } } public static class CountOtherIDMapper extends Mapper<LongWritable, Text, Text, NullWritable> { Text outKey = new Text("1"); ObjectMapper objectMapper = new ObjectMapper(); //转换器 JavaType mapType = objectMapper.getTypeFactory().constructMapType(Map.class, String.class, String.class); public void map (LongWritable key, Text value, Context context) throws IOException, InterruptedException { try { String line = value.toString(); Map<String, String> map = objectMapper.readValue(line, mapType); String imeiId = map.get("imei"); String macId = map.get("mac"); String androidId = map.get("android_id"); if (!StringUtils.isEmpty(imeiId) && !"0".equals(imeiId)) { outKey.set(MRUtils.JOINER.join(imeiId, "imei")); context.write(outKey, NullWritable.get()); } if (!StringUtils.isEmpty(macId) && !"0".equals(macId)) { outKey.set(MRUtils.JOINER.join(macId, "mac")); context.write(outKey, NullWritable.get()); } if (!StringUtils.isEmpty(androidId) && !"0".equals(androidId)) { outKey.set(MRUtils.JOINER.join(androidId, "androidid")); context.write(outKey, NullWritable.get()); } } catch (Exception e) { e.printStackTrace(); } } } public static class CountOtherIDReduce extends Reducer<Text, NullWritable, Text, NullWritable> { @Override protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException { context.write(key, NullWritable.get()); } } }