package mobvista.dmp.datasource.gender; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; import org.apache.orc.mapred.OrcStruct; import org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.type.JavaType; import java.io.IOException; import java.util.Map; /** * Created by liushuai on 2017/1/18 0018. * desc :计算总的男性比例与总的性别数量 */ public class ValidateMRV2 { public enum Fields { DEVICE_ID("device_id", 0), DEVICE_TYPE("device_type", 1), PACKAGE_NAMES("package_names", 2), SOURCE("source", 3), GENDER("gender", 4), RATIO("ratio", 5), TAG("tag", 6), UPDATE_DATE("update_date", 7); private String name; private int idx; Fields(String name, int idx) { this.name = name; this.idx = idx; } public String getName() { return name; } public int getIdx() { return idx; } } public static void main(String[] args) throws InterruptedException, IOException, ClassNotFoundException { Configuration conf = new Configuration(); conf.set("mapreduce.map.speculative", "true"); conf.set("mapreduce.reduce.speculative", "true"); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); Job job = Job.getInstance(conf, "ValidateMRV2"); job.setNumReduceTasks(1); job.setJarByClass(ValidateMRV2.class); job.setMapperClass(ValidateMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(NullWritable.class); job.setReducerClass(ValidateReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } public static class ValidateMapper extends Mapper<NullWritable, OrcStruct, Text, NullWritable> { Text outKey = new Text(); int yes = 0; int no = 0; int other = 0; int female = 0; int man = 0; int mSource = 0; int fSource = 0; int oSource = 0; int mRight = 0; int fRight = 0; int oRight = 0; Long num = 0L; public void map(NullWritable key, OrcStruct value, Context context) { try { if (value.getNumFields() != 8) { return; } /* String line = value.toString(); String[] fields = MRUtils.SPLITTER.split(line, -1); if (fields.length != 7) { return; } */ String gender = value.getFieldValue(Fields.GENDER.getIdx()).toString(); String source = value.getFieldValue(Fields.SOURCE.getIdx()).toString(); int flag = 0; if (gender.equals("o")) { other += 1; } if (gender.equals("f")) { female += 1; } if (gender.equals("m")) { man += 1; } if (!source.equals("null")) { if (source.equals("m")) { mSource++; } else if (source.equals("f")) { fSource++; } else { oSource++; } if (!source.equals(gender)) { flag = 1; } if (flag == 0) { if (source.equals("m")) { mRight++; } else if (source.equals("f")) { fRight++; } else if (source.equals("o")) { oRight++; } yes += 1; } else { no += 1; } } num += 1; } catch (Exception e) { return; } } public void cleanup(Context context) { outKey.set(MRUtils.JOINER.join( yes, no, mSource, fSource, oSource, mRight, fRight, oRight, man, female, other, num )); try { context.write(outKey, NullWritable.get()); } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } } public static class ValidateReducer extends Reducer<Text, NullWritable, Text, NullWritable> { Text outKey = new Text(); int yes = 0; int no = 0; int other = 0; int female = 0; int man = 0; int mSource = 0; int fSource = 0; int oSource = 0; int mRight = 0; int fRight = 0; int oRight = 0; Long num = 0L; double ratio = 0.0; public void reduce(Text key, Iterable<NullWritable> values, Context context) { try { String line = key.toString(); String[] fields = MRUtils.SPLITTER.split(line, -1); yes += Integer.parseInt(fields[0]); no += Integer.parseInt(fields[1]); mSource += Integer.parseInt(fields[2]); fSource += Integer.parseInt(fields[3]); oSource += Integer.parseInt(fields[4]); mRight += Integer.parseInt(fields[5]); fRight += Integer.parseInt(fields[6]); oRight += Integer.parseInt(fields[7]); man += Integer.parseInt(fields[8]); female += Integer.parseInt(fields[9]); other += Integer.parseInt(fields[10]); num += Long.parseLong(fields[11]); } catch (Exception e) { return; } } public void cleanup(Context context) { if ((yes + no) != 0) { ratio = (double) (yes) / (yes + no); } outKey.set(MRUtils.JOINER.join( yes, no, ratio, mSource, fSource, oSource, mRight, fRight, oRight, man, female, other, num )); try { context.write(outKey, NullWritable.get()); } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } } }