package mobvista.dmp.datasource.age.mapreduce;

import com.google.common.collect.Maps;
import mobvista.dmp.util.MRUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.codehaus.jackson.map.ObjectMapper;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Map;

/**
 * Map-only MapReduce job that derives a per-device age distribution from the
 * packages listed on each input record, using a package dictionary file that
 * every mapper loads during {@code setup}.
 *
 * <p>Remaining command-line arguments (after Hadoop generic options):
 * <ol>
 *   <li>input path</li>
 *   <li>output path (gzip-compressed text)</li>
 *   <li>path of the package dictionary file (per the original author's note,
 *       the output of CalcPackageDictMR)</li>
 * </ol>
 *
 * Created by liushuai on 2017/2/17 0017.
 */
public class CalcDeviceAgeMR {

    /**
     * Configures and submits the job, then exits the JVM with status 0 on
     * success, 1 on job failure, or 2 when too few arguments were supplied.
     *
     * @param args Hadoop generic options followed by input path, output path
     *             and dictionary file path
     */
    public static void main(String[] args) throws InterruptedException, IOException, ClassNotFoundException {
        Configuration conf = new Configuration();
        conf.set("mapreduce.map.speculative", "true");
        conf.set("mapreduce.reduce.speculative", "true");
        conf.set("mapreduce.task.io.sort.mb", "500");
        conf.set("mapreduce.reduce.java.opts", "-Xmx1536m");
        conf.set("mapreduce.reduce.memory.mb", "2048");
        conf.set("mapreduce.reduce.shuffle.parallelcopies", "50");

        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 3) {
            // Fail with a readable message instead of an ArrayIndexOutOfBoundsException.
            System.err.println("Usage: CalcDeviceAgeMR <input path> <output path> <package dict file>");
            System.exit(2);
        }
        // Must be set before Job.getInstance(conf): the mapper reads "file" in setup().
        conf.set("file", otherArgs[2]);
        conf.set("mapreduce.map.memory.mb", "1331");

        Job job = Job.getInstance(conf, "CalcDeviceAgeMR");
        job.setNumReduceTasks(0);
        job.setJarByClass(CalcDeviceAgeMR.class);

        FileOutputFormat.setCompressOutput(job, true);
        FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);

        job.setMapperClass(CalcDeviceYearMapper.class);

        // Original author's note: suppress the default part-r-00000 style output files.
        LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setInputFormatClass(TextInputFormat.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    /**
     * Mapper that looks up every package of a device in the package dictionary
     * and emits the resulting age distribution as JSON.
     */
    public static class CalcDeviceYearMapper extends Mapper<LongWritable, Text, Text, Text> {
        /** package name -> "ageRatios \t tag", filled once in {@link #setup}. */
        Map<String, String> packageMap = Maps.newHashMapWithExpectedSize(10000);
        Text outKey = new Text();
        Text outValue = new Text();
        /** JSON serializer for the output value. */
        ObjectMapper objectMapper = new ObjectMapper();

        /**
         * Loads the package dictionary named by the {@code file} configuration
         * entry into {@link #packageMap}.
         *
         * <p>Each dictionary line is split with {@code MRUtils.SPLITTER}; column 0
         * is the package name, column 1 (minus its first character) is the age
         * ratio string and column 2 is the tag.
         *
         * @throws IOException if the dictionary cannot be opened or read
         */
        @Override
        public void setup(Context context) throws IOException, InterruptedException {
            Configuration conf = context.getConfiguration();
            String path = conf.get("file");
            // The FileSystem handle is intentionally not closed here; only the stream is.
            FileSystem fileSystem = FileSystem.get(URI.create(path), conf);

            // try-with-resources: the reader is closed even when a line fails to parse.
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(fileSystem.open(new Path(path)), StandardCharsets.UTF_8))) {
                String readLine;
                while ((readLine = reader.readLine()) != null) {
                    String[] arr = MRUtils.SPLITTER.split(readLine, -1);
                    // Drop the first character of the age column before storing it.
                    String age = arr[1].substring(1);
                    packageMap.put(arr[0], age + "\t" + arr[2]);
                }
            }
        }

        /**
         * Processes one input record of exactly four {@code MRUtils.SPLITTER}
         * separated fields: device id, package list, age tags ('$'-joined, or
         * the literal {@code "null"}) and a field whose first
         * {@code Util.wellSplit} piece is the device type. Records with any
         * other field count are skipped.
         *
         * <p>Output key: device id, device type and the package list with
         * {@code '#'} replaced by {@code ','}. Output value: a JSON object with
         * {@code age_and_source} and {@code age_and_proportion}, joined with the
         * resulting tag.
         */
        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] fields = MRUtils.SPLITTER.split(line, -1);
            if (fields.length != 4) {
                return;
            }

            String[] arr = Util.wellSplit.split(fields[1], -1);
            String score = "-1.0";
            int num = 0; // number of dictionary hits that contributed ratios
            String tag = "unknown";
            Map<Integer, Double> stage = Maps.newHashMap();     // age bracket -> summed weight
            Map<Integer, Integer> stageNum = Maps.newHashMap(); // age bracket -> number of contributions
            Map<Integer, Double> st = Maps.newHashMap();        // age bracket -> averaged weight

            for (String packageName : arr) {
                String ageRatioSource = packageMap.get(packageName);
                if (ageRatioSource == null) {
                    continue;
                }
                String[] dictValue = MRUtils.SPLITTER.split(ageRatioSource);
                if (dictValue[1].equals("confirm")) {
                    // A confirmed package overrides everything seen so far.
                    score = dictValue[0];
                    tag = dictValue[1];
                    num = 1;
                    break;
                } else if (dictValue[1].equals("unbelievable")) {
                    // Only takes effect while no other tag has been assigned.
                    if (tag.equals("unknown")) {
                        score = "-1.0";
                        tag = dictValue[1];
                    }
                } else {
                    num += 1;
                    String[] item = Util.verticalLine.split(dictValue[0], -1);
                    for (String ageRatios : item) {
                        if (ageRatios == null) {
                            continue;
                        }
                        String[] pair = Util.colonSplit.split(ageRatios, -1);
                        int generation = Integer.parseInt(pair[0]);    // age bracket
                        double ageRatio = Double.parseDouble(pair[1]); // weight
                        Double stageAgeRatio = stage.get(generation);
                        if (stageAgeRatio == null) {
                            stage.put(generation, ageRatio);
                            stageNum.put(generation, 1);
                        } else {
                            stage.put(generation, stageAgeRatio + ageRatio);
                            stageNum.put(generation, stageNum.get(generation) + 1);
                        }
                    }
                    score = "";
                    tag = dictValue[1];
                }
            }

            if (num == 1 && tag.equals("confirm")) {
                score = "|" + score;
            }
            if (num == 1 && tag.equals("calc")) {
                StringBuilder sb = new StringBuilder(score);
                for (Map.Entry<Integer, Double> entry : stage.entrySet()) {
                    sb.append('|').append(entry.getKey()).append(":").append(entry.getValue());
                }
                score = sb.toString();
            }
            if (num > 1) {
                // Average the summed weight of every age bracket over its contributions.
                for (Map.Entry<Integer, Double> entry : stage.entrySet()) {
                    Integer gen = entry.getKey();
                    st.put(gen, entry.getValue() / stageNum.get(gen));
                }
                StringBuilder sb = new StringBuilder(score);
                for (Map.Entry<Integer, Double> entry : st.entrySet()) {
                    sb.append('|').append(entry.getKey()).append(":").append(entry.getValue());
                }
                score = sb.toString();
            }

            Map<String, Double> outMap = Maps.newHashMap();
            outMap.put("0-17", 0.0);
            outMap.put("18-24", 0.0);
            outMap.put("25-44", 0.0);
            outMap.put("45-59", 0.0);
            outMap.put("60+", 0.0);

            String[] scoreArr = score.split("\\|", -1);
            for (String ageRatio : scoreArr) {
                if (!ageRatio.equals("") && !ageRatio.equals("-1.0")) {
                    String[] tmpRatio = Util.colonSplit.split(ageRatio, -1);
                    String generation = Util.getAge(Integer.parseInt(tmpRatio[0]));
                    if (generation != null && !generation.equals("")) {
                        // Round to 6 decimals. The double-based constructor is kept on purpose
                        // so rounding results stay identical to previously produced output.
                        BigDecimal ageRatioDouble = new BigDecimal(Double.parseDouble(tmpRatio[1]))
                                .setScale(6, RoundingMode.HALF_UP);
                        outMap.put(generation, ageRatioDouble.doubleValue());
                    }
                } else if (ageRatio.equals("-1.0")) {
                    outMap = Maps.newHashMap();
                    outMap.put("unbelievable", -1.0);
                }
            }

            Map<String, String> ageMap = Maps.newHashMap();
            if (!fields[2].equals("null")) {
                String[] ageTags = fields[2].split("\\$", -1);
                for (String ageTag : ageTags) {
                    if (ageTag == null || ageTag.equals("")) {
                        // NOTE(review): an empty tag drops the whole record; confirm this
                        // is intended rather than skipping just the empty tag.
                        return;
                    }
                    String[] ageForm = Util.wellSplit.split(ageTag, -1);
                    String generation = Util.getAge(Integer.parseInt(ageForm[0]));
                    // Null-safe, consistent with the check on the ratio side above.
                    if (generation != null && !generation.equals("")) {
                        ageMap.put(generation, ageForm[1]);
                    }
                }
            } else {
                ageMap.put("null", "null");
            }

            // Linked map keeps the two JSON members in a stable order.
            Map<String, Map<String, ?>> lastMap = Maps.newLinkedHashMap();
            lastMap.put("age_and_source", ageMap);
            lastMap.put("age_and_proportion", outMap);

            String packageName = fields[1].replace("#", ","); // package names separated by commas
            String[] ss = Util.wellSplit.split(fields[3], -1);

            outValue.set(MRUtils.JOINER.join(objectMapper.writeValueAsString(lastMap), tag));
            outKey.set(MRUtils.JOINER.join(
                    fields[0],   // device_id
                    ss[0],       // device_type
                    packageName  // package names
            ));
            context.write(outKey, outValue);
        }
    }
}