package mobvista.dmp.datasource.gender;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.IOException;
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/**
 * MapReduce job that computes, for every package name, how the gender labels of
 * the input records that mention the package are distributed.
 *
 * <p>Original author: liushuai, 2017-01-18.
 */
public class CalcPackageGenderMR {

    /** Job name; also used as the counter group for skipped records. */
    private static final String JOB_NAME = "CalcPackageGenderMR";

    /** Process exit status used when the required arguments are missing. */
    private static final int EXIT_USAGE = 2;

    /**
     * Configures and submits the job, then exits with status 0 on success and 1 on failure.
     *
     * @param args Hadoop generic options followed by the input path and the output path
     * @throws ClassNotFoundException if job submission fails to resolve a job class
     * @throws IOException            if job submission or monitoring fails on I/O
     * @throws InterruptedException   if the wait for job completion is interrupted
     */
    public static void main(String[] args) throws ClassNotFoundException, IOException, InterruptedException {
        Configuration conf = new Configuration();
        // Defaults are set before the generic options are parsed, so "-D" options
        // given on the command line are applied after them.
        conf.set("mapreduce.map.speculative", "true");
        conf.set("mapreduce.reduce.speculative", "true");
        conf.set("mapreduce.task.io.sort.mb", "500");
        conf.set("mapreduce.reduce.java.opts", "-Xmx1536m");
        conf.set("mapreduce.reduce.memory.mb", "2048");
        conf.set("mapreduce.reduce.shuffle.parallelcopies", "50");

        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 2) {
            // Fail with a readable message instead of an ArrayIndexOutOfBoundsException.
            System.err.println("Usage: " + JOB_NAME + " [generic options] <input path> <output path>");
            System.exit(EXIT_USAGE);
            return;
        }

        Job job = Job.getInstance(conf, JOB_NAME);
        FileOutputFormat.setCompressOutput(job, true);
        FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);

        job.setJarByClass(CalcPackageGenderMR.class);
        job.setMapperClass(CalcPackageGenderMapper.class);
        job.setReducerClass(CalcPackageGenderReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    /**
     * Emits one {@code (package, gender)} pair for every package listed in an input record.
     *
     * <p>A record is split with {@code MRUtils.SPLITTER} and must yield exactly five fields;
     * field 2 holds the package list (split with {@code MRUtils.SPLITTER1}) and field 3 the
     * gender label. The delimiters are defined in {@code MRUtils}, which is not visible here.
     */
    public static class CalcPackageGenderMapper extends Mapper<LongWritable, Text, Text, Text> {

        /** Number of fields a well-formed input record must have. */
        private static final int EXPECTED_FIELD_COUNT = 5;
        /** A record labelled "m" with at least this many packages has its label replaced by "null". */
        private static final int MALE_MAX_PACKAGE_COUNT = 30;
        /** A record labelled "m" with at most this many packages has its label replaced by "null". */
        private static final int MALE_MIN_PACKAGE_COUNT = 3;
        /** Package names shorter than this are not emitted. */
        private static final int MIN_PACKAGE_NAME_LENGTH = 2;

        Text outKey = new Text();
        Text outValue = new Text();

        /**
         * Maps one input line to zero or more {@code (package, gender)} pairs.
         *
         * @param key     input key supplied by the input format (unused)
         * @param value   one input record
         * @param context task context used to emit output and update counters
         * @throws IOException          if writing an output pair fails
         * @throws InterruptedException if the task is interrupted while writing
         */
        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] fields = MRUtils.SPLITTER.split(value.toString(), -1);
            if (fields.length != EXPECTED_FIELD_COUNT) {
                // Malformed records are still skipped, but the skip is now visible in the job counters.
                context.getCounter(JOB_NAME, "MALFORMED_RECORDS").increment(1);
                return;
            }

            String[] packages = MRUtils.SPLITTER1.split(fields[2], -1);

            String gender = fields[3];
            // NOTE(review): "m" labels on records with very many or very few packages are treated
            // as unlabelled; the rationale is not stated in this file — confirm with the data owner.
            if ("m".equals(gender)
                    && (packages.length >= MALE_MAX_PACKAGE_COUNT || packages.length <= MALE_MIN_PACKAGE_COUNT)) {
                gender = "null";
            }

            outValue.set(gender);
            for (String pkg : packages) {
                if (pkg.length() < MIN_PACKAGE_NAME_LENGTH) {
                    continue;
                }
                outKey.set(pkg);
                // Write failures propagate so the framework can fail/retry the task
                // instead of silently losing output.
                context.write(outKey, outValue);
            }
        }
    }

    /**
     * Aggregates the gender labels seen for one package into a single summary record.
     *
     * <p>The output value is produced by {@code MRUtils.JOINER} (separator defined elsewhere)
     * from: total count, count of values other than "null", count of "m", count of "f",
     * the ratio {@code m / labelled} (0.0 when nothing is labelled), and a string of
     * {@code |label:count} entries, one per distinct label.
     */
    public static class CalcPackageGenderReducer extends Reducer<Text, Text, Text, Text> {

        Text outValue = new Text();

        /**
         * Reduces all gender labels emitted for one package.
         *
         * @param key     package name
         * @param values  gender labels emitted by the mapper for this package
         * @param context task context used to emit the summary record
         * @throws IOException          if writing the summary record fails
         * @throws InterruptedException if the task is interrupted while writing
         */
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            Map<String, Integer> countByLabel = new HashMap<String, Integer>();
            int total = 0;      // every value, labelled or not
            int labelled = 0;   // values other than "null"
            int male = 0;       // values equal to "m"
            int female = 0;     // values equal to "f"

            for (Text val : values) {
                String label = val.toString();

                Integer seen = countByLabel.get(label);
                countByLabel.put(label, seen == null ? 1 : seen + 1);

                if (!"null".equals(label)) {
                    if ("m".equals(label)) {
                        male += 1;
                    } else if ("f".equals(label)) {
                        female += 1;
                    }
                    labelled += 1;
                }
                total += 1;
            }

            // Concatenate every distinct label with its count as "|label:count".
            StringBuilder breakdown = new StringBuilder();
            for (Map.Entry<String, Integer> entry : countByLabel.entrySet()) {
                int count = entry.getValue();
                breakdown.append("|");
                breakdown.append(entry.getKey());
                breakdown.append(":");
                breakdown.append(count);
            }

            // Share of male labels among labelled values; stays 0.0 when nothing is labelled.
            double maleRatio = 0.0;
            if (labelled != 0) {
                maleRatio = (double) male / labelled;
            }

            outValue.set(MRUtils.JOINER.join(total, labelled, male, female, maleRatio, breakdown.toString()));
            context.write(key, outValue);
        }
    }
}