package mobvista.dmp.datasource.gender;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.IOException;

/**
 * Created by liushuai on 2017/1/18 0018.
 * desc: computes the overall male ratio and the overall per-gender counts
 * (translated from the original header comment).
 *
 * <p>What the code does: every map task tallies, over its input lines, how often
 * column 3 and column 4 (0-based) agree, plus per-gender counts for each of the
 * two columns, and emits one summary record when it finishes. A single reducer
 * sums all summaries and writes one line that additionally contains the
 * agreement ratio {@code yes / (yes + no)}.
 *
 * <p>{@code MRUtils.SPLITTER} and {@code MRUtils.JOINER} are defined elsewhere in
 * the project; the delimiter they use is not visible from this file.
 */
public class ValidateMR {

    /** Hadoop counter group used to report records that had to be skipped. */
    private static final String COUNTER_GROUP = "ValidateMR";

    /**
     * Configures and runs the job.
     *
     * @param args generic Hadoop options followed by {@code <input path> <output path>}
     * @throws IOException            if the job cannot be set up or fails on I/O
     * @throws InterruptedException   if waiting for the job is interrupted
     * @throws ClassNotFoundException if a job class cannot be resolved
     */
    public static void main(String[] args) throws InterruptedException, IOException, ClassNotFoundException {
        Configuration conf = new Configuration();
        conf.set("mapreduce.map.speculative", "true");
        conf.set("mapreduce.reduce.speculative", "true");

        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 2) {
            // Fail with a readable message instead of an ArrayIndexOutOfBoundsException.
            System.err.println("Usage: ValidateMR [generic options] <input path> <output path>");
            System.exit(2);
        }

        Job job = Job.getInstance(conf, "ValidateMR");
        // A single reducer is required: the reducer keeps job-wide totals in its fields.
        job.setNumReduceTasks(1);
        job.setJarByClass(ValidateMR.class);

        job.setMapperClass(ValidateMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setReducerClass(ValidateReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    /**
     * Tallies gender statistics for one input split and emits them as a single
     * joined record from {@link #cleanup}.
     *
     * <p>Emitted field order:
     * {@code yes, no, mSource, fSource, oSource, mRight, fRight, oRight, man, female, other, num}.
     */
    public static class ValidateMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

        /** Only lines with exactly this many columns are counted. */
        private static final int EXPECTED_FIELDS = 7;

        private final Text outKey = new Text();

        // Primitive longs: no per-record boxing and no int overflow on large splits.
        private long yes = 0;      // column 3 is set and equals column 4
        private long no = 0;       // column 3 is set and differs from column 4
        private long other = 0;    // column 4 == "o"
        private long female = 0;   // column 4 == "f"
        private long man = 0;      // column 4 == "m"
        private long mSource = 0;  // column 3 == "m"
        private long fSource = 0;  // column 3 == "f"
        private long oSource = 0;  // column 3 is set and is neither "m" nor "f"
        private long mRight = 0;   // agreeing records where column 3 == "m"
        private long fRight = 0;   // agreeing records where column 3 == "f"
        private long oRight = 0;   // agreeing records where column 3 == "o"
        private long num = 0;      // well-formed records seen

        /**
         * Updates the tallies from one input line. Lines that do not split into
         * exactly {@value #EXPECTED_FIELDS} columns are skipped and reported through
         * the {@code malformedInputLines} counter.
         *
         * @param key     byte offset of the line (unused)
         * @param value   the input line
         * @param context task context, used only for the skip counter
         */
        @Override
        public void map(LongWritable key, Text value, Context context) {
            String[] fields = MRUtils.SPLITTER.split(value.toString(), -1);
            if (fields.length != EXPECTED_FIELDS) {
                context.getCounter(COUNTER_GROUP, "malformedInputLines").increment(1);
                return;
            }

            // NOTE(review): column 3 looks like the reference ("source") gender and
            // column 4 the gender being validated - confirm against the producer.
            String sourceGender = fields[3];
            String resultGender = fields[4];

            if ("o".equals(resultGender)) {
                other++;
            } else if ("f".equals(resultGender)) {
                female++;
            } else if ("m".equals(resultGender)) {
                man++;
            }

            // The literal string "null" marks a record without a source gender.
            if (!"null".equals(sourceGender)) {
                boolean sourceIsMale = "m".equals(sourceGender);
                boolean sourceIsFemale = "f".equals(sourceGender);

                if (sourceIsMale) {
                    mSource++;
                } else if (sourceIsFemale) {
                    fSource++;
                } else {
                    oSource++;
                }

                if (sourceGender.equals(resultGender)) {
                    if (sourceIsMale) {
                        mRight++;
                    } else if (sourceIsFemale) {
                        fRight++;
                    } else if ("o".equals(sourceGender)) {
                        // Only a literal "o" counts here, although oSource above
                        // counts every non-m/f value.
                        oRight++;
                    }
                    yes++;
                } else {
                    no++;
                }
            }

            num++;
        }

        /**
         * Emits the accumulated tallies as one record.
         *
         * @param context task context to write to
         * @throws IOException          if the record cannot be written
         * @throws InterruptedException if the write is interrupted
         */
        @Override
        public void cleanup(Context context) throws IOException, InterruptedException {
            outKey.set(MRUtils.JOINER.join(
                    yes, no,
                    mSource, fSource, oSource,
                    mRight, fRight, oRight,
                    man, female, other,
                    num
            ));
            // Let write failures propagate so the task fails instead of silently
            // producing no output.
            context.write(outKey, NullWritable.get());
        }
    }

    /**
     * Sums the per-mapper summaries and writes one line:
     * {@code yes, no, ratio, mSource, fSource, oSource, mRight, fRight, oRight, man, female, other, num},
     * where {@code ratio = yes / (yes + no)}, or {@code 0.0} when both are zero.
     */
    public static class ValidateReducer extends Reducer<Text, NullWritable, Text, NullWritable> {

        /** Number of numeric columns in a mapper summary record. */
        private static final int SUMMARY_FIELDS = 12;

        private final Text outKey = new Text();

        // Job-wide totals; long because they sum over every map task.
        private long yes = 0;
        private long no = 0;
        private long other = 0;
        private long female = 0;
        private long man = 0;
        private long mSource = 0;
        private long fSource = 0;
        private long oSource = 0;
        private long mRight = 0;
        private long fRight = 0;
        private long oRight = 0;
        private long num = 0;

        /**
         * Adds one distinct mapper summary to the totals, once per map task that
         * emitted it. A summary that cannot be parsed is skipped as a whole and
         * reported through the {@code malformedSummaries} counter.
         *
         * @param key     a mapper summary record
         * @param values  one entry per map task that emitted exactly this summary
         * @param context task context, used only for the skip counter
         */
        @Override
        public void reduce(Text key, Iterable<NullWritable> values, Context context) {
            // Identical summaries from different map tasks are grouped under one key,
            // so the number of values is the number of times the summary must be added.
            long copies = 0;
            for (NullWritable ignored : values) {
                copies++;
            }

            String[] fields = MRUtils.SPLITTER.split(key.toString(), -1);
            if (fields.length < SUMMARY_FIELDS) {
                context.getCounter(COUNTER_GROUP, "malformedSummaries").increment(1);
                return;
            }

            // Parse everything before touching the totals, so a bad column can never
            // leave them partially updated.
            long[] parsed = new long[SUMMARY_FIELDS];
            try {
                for (int i = 0; i < SUMMARY_FIELDS; i++) {
                    parsed[i] = Long.parseLong(fields[i]);
                }
            } catch (NumberFormatException e) {
                context.getCounter(COUNTER_GROUP, "malformedSummaries").increment(1);
                return;
            }

            yes += parsed[0] * copies;
            no += parsed[1] * copies;
            mSource += parsed[2] * copies;
            fSource += parsed[3] * copies;
            oSource += parsed[4] * copies;
            mRight += parsed[5] * copies;
            fRight += parsed[6] * copies;
            oRight += parsed[7] * copies;
            man += parsed[8] * copies;
            female += parsed[9] * copies;
            other += parsed[10] * copies;
            num += parsed[11] * copies;
        }

        /**
         * Writes the final totals together with the agreement ratio.
         *
         * @param context task context to write to
         * @throws IOException          if the record cannot be written
         * @throws InterruptedException if the write is interrupted
         */
        @Override
        public void cleanup(Context context) throws IOException, InterruptedException {
            long judged = yes + no;
            double ratio = judged != 0 ? (double) yes / judged : 0.0;

            outKey.set(MRUtils.JOINER.join(
                    yes, no, ratio,
                    mSource, fSource, oSource,
                    mRight, fRight, oRight,
                    man, female, other,
                    num
            ));
            // Let write failures propagate so the job fails instead of silently
            // producing no output.
            context.write(outKey, NullWritable.get());
        }
    }
}