package mobvista.dmp.datasource.gender;

import com.google.common.collect.Maps;
import mobvista.dmp.format.TextMultipleOutputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Map;

/**
 * MapReduce job that derives a gender label and ratio for each input record from a
 * package dictionary.
 *
 * <p>author: liushuai<br>
 * date: 2017-01-18<br>
 * desc: computes the ratio according to the package labels.
 */
public class CalcDeviceGenderMR {

    /**
     * Configures and submits the job, then exits the JVM with 0 on success or 1 on failure.
     *
     * <p>Remaining (non-generic) arguments, in order:
     * <ol>
     *   <li>input path</li>
     *   <li>output path</li>
     *   <li>path of the package dictionary file loaded by the mapper</li>
     * </ol>
     * The mapper additionally reads the configuration keys {@code outPath} and
     * {@code outPath_v2}; this method does not set them, so they are presumably supplied
     * through generic options (e.g. {@code -D}) - verify against the launch script.
     *
     * @param args command-line arguments, including any Hadoop generic options
     * @throws InterruptedException   if waiting for job completion is interrupted
     * @throws IOException            if job setup or submission fails
     * @throws ClassNotFoundException if a job class cannot be resolved
     */
    public static void main(String[] args) throws InterruptedException, IOException, ClassNotFoundException {
        Configuration conf = new Configuration();
        conf.set("mapreduce.map.speculative", "true");
        conf.set("mapreduce.reduce.speculative", "true");
        conf.set("mapreduce.task.io.sort.mb", "500");
        conf.set("mapreduce.reduce.java.opts", "-Xmx1536m");
        conf.set("mapreduce.reduce.memory.mb", "2048");
        conf.set("mapreduce.reduce.shuffle.parallelcopies", "50");

        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 3) {
            // Fail with a readable message instead of an ArrayIndexOutOfBoundsException.
            System.err.println("Usage: CalcDeviceGenderMR <input path> <output path> <package dict file>");
            System.exit(2);
        }
        conf.set("file", otherArgs[2]);

        Job job = Job.getInstance(conf, "CalcDeviceGenderMR");
        job.setJarByClass(CalcDeviceGenderMR.class);

        FileOutputFormat.setCompressOutput(job, true);
        FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);

        job.setMapperClass(CalcDeviceGenderMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        job.setOutputFormatClass(TextMultipleOutputFormat.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    /**
     * Mapper that scores each record's '#'-separated package list against the package
     * dictionary and emits the resulting label, ratio and tag under two output keys
     * (one prefixed with {@code outPath}, one with {@code outPath_v2}).
     */
    public static class CalcDeviceGenderMapper extends Mapper<LongWritable, Text, Text, Text> {

        /** Counter group used to report records that were skipped. */
        private static final String COUNTER_GROUP = "CalcDeviceGenderMR";

        /** Dictionary: first column of the dictionary file -> "second column \t third column". */
        Map<String, String> packageMap = Maps.newHashMap();
        Text outKey = new Text();
        Text outKey_v2 = new Text();
        Text outValue = new Text();
        private String outPath;
        private String outPath_v2;

        /**
         * Loads the package dictionary named by the {@code file} configuration key into
         * {@link #packageMap} and caches the {@code outPath} / {@code outPath_v2} settings.
         *
         * @param context task context providing the job configuration
         * @throws IOException          if the dictionary file cannot be opened or read
         * @throws InterruptedException declared for compatibility with {@link Mapper#setup}
         */
        @Override
        public void setup(Context context) throws IOException, InterruptedException {
            Configuration conf = context.getConfiguration();
            // These values are constant for the task, so read them once rather than per record.
            outPath = conf.get("outPath");
            outPath_v2 = conf.get("outPath_v2");

            // Read the output of CalcPackageDictMR and put it into the map.
            String path = conf.get("file");
            FileSystem fileSystem = FileSystem.get(URI.create(path), conf);
            // try-with-resources closes the stream even when a read or parse fails.
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(fileSystem.open(new Path(path)), StandardCharsets.UTF_8))) {
                String readLine;
                while ((readLine = reader.readLine()) != null) {
                    String[] arr = MRUtils.SPLITTER.split(readLine, -1);
                    packageMap.put(arr[0], arr[1] + "\t" + arr[2]);
                }
            }
        }

        /**
         * Scores one tab-separated record and writes it under both output keys.
         *
         * <p>The third field is a '#'-separated package list. Each package found in the
         * dictionary contributes its score; a dictionary entry tagged {@code confirm}
         * overrides everything seen so far and stops the scan. The ratio is the mean score:
         * no match yields label {@code o} (ratio -1), a ratio of at least 0.5 yields
         * {@code m}, and a lower ratio yields {@code f} with the ratio replaced by
         * {@code 1 - ratio}.
         *
         * <p>Records that cannot be parsed are skipped and counted in job counters. Unlike
         * parse failures, failures of {@code context.write} are propagated so that output
         * errors are not silently dropped.
         *
         * @param key     input key supplied by the input format (unused)
         * @param value   one tab-separated input line
         * @param context task context used for output and counters
         * @throws IOException          if writing an output record fails
         * @throws InterruptedException if writing an output record is interrupted
         */
        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] fields;
            String packageName;
            try {
                fields = value.toString().split("\t", -1);
                String[] packages = fields[2].split("#", -1);

                double ratio = -1; // -1 is the "no dictionary match" sentinel
                double score = 0.0;
                int num = 0;
                String tag = "unknown";
                for (String pkg : packages) {
                    if (pkg.length() < 2) {
                        continue;
                    }
                    String dictEntry = packageMap.get(pkg);
                    if (dictEntry == null) {
                        continue;
                    }
                    String[] dictValue = MRUtils.SPLITTER.split(dictEntry);
                    if ("confirm".equals(dictValue[1])) {
                        // A confirmed entry replaces the accumulated score and ends the scan.
                        score = Double.parseDouble(dictValue[0]);
                        tag = dictValue[1];
                        num = 1;
                        break;
                    }
                    num += 1;
                    score += Double.parseDouble(dictValue[0]);
                    tag = dictValue[1];
                }
                if (num >= 1) {
                    ratio = score / num;
                }

                String label = "";
                if (ratio == -1) {
                    label = "o";
                } else if (ratio >= 0.5) {
                    label = "m";
                } else if (ratio < 0.5) {
                    label = "f";
                    ratio = 1 - ratio;
                }

                packageName = fields[2].replace("#", ","); // comma-separated
                // Includes fields[3]; per the original comment this adds the update_date field.
                outKey.set(outPath + "," + MRUtils.JOINER.join(fields[0], fields[1], packageName, fields[3]));
                outValue.set(MRUtils.JOINER.join(label, ratio, tag));
            } catch (RuntimeException e) {
                // Malformed record (missing fields, bad number, ...): skip it, but leave a trace.
                context.getCounter(COUNTER_GROUP, "malformed_record").increment(1);
                return;
            }
            context.write(outKey, outValue);

            try {
                // Same key plus the second '#'-separated part of fields[4].
                outKey_v2.set(outPath_v2 + "," + MRUtils.JOINER.join(fields[0], fields[1], packageName,
                        fields[3], fields[4].split("#")[1]));
            } catch (RuntimeException e) {
                // The first record has already been written; only the v2 record is skipped.
                context.getCounter(COUNTER_GROUP, "malformed_record_v2").increment(1);
                return;
            }
            context.write(outKey_v2, outValue);
        }
    }
}