package mobvista.dmp.datasource.age.mapreduce; import com.google.common.collect.Maps; import mobvista.dmp.util.MRUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; import org.apache.orc.mapred.OrcStruct; import org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.type.JavaType; import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Set; /** * Created by liushuai on 2017/2/17 0017. */ public class DmpYearMRV2 { public enum Fields { DEVICE_ID("device_id", 0), DEVICE_TYPE("device_type", 1), PACKAGE_NAMES("package_names", 2), AGE("age", 3), TAG("tag", 4), UPDATE_DATE("update_date", 5); private String name; private int idx; Fields(String name, int idx) { this.name = name; this.idx = idx; } public String getName() { return name; } public int getIdx() { return idx; } } public static void main(String[] args) throws InterruptedException, IOException, ClassNotFoundException { Configuration conf = new Configuration(); conf.set("mapreduce.map.speculative", "true"); conf.set("mapreduce.reduce.speculative", "true"); conf.set("mapreduce.reduce.memory.mb", "1536"); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); Job job = Job.getInstance(conf, "DmpYearMRV2"); job.setNumReduceTasks(1); job.setJarByClass(DmpYearMRV2.class); job.setMapperClass(DmpYearOrcMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(NullWritable.class); job.setReducerClass(DmpYearReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } public static class DmpYearOrcMapper extends Mapper<NullWritable, OrcStruct, Text, NullWritable> { Text outKey = new Text(); ObjectMapper objectMapper = new ObjectMapper(); //转换器 JavaType mapType = objectMapper.getTypeFactory().constructMapType(Map.class, String.class, Map.class); int yes = 0; int no = 0; int other = 0; int valid = 0; int un = 0; Long una = 0L; Map<String, Integer> st = Maps.newHashMap(); Long allNum = 0L; public void map(NullWritable key, OrcStruct value, Context context) throws IOException, InterruptedException { if (value.getNumFields() != 6) { return; } // String line = value.toString(); // String[] fields = MRUtils.SPLITTER.split(line, -1); // if (fields.length != 5) { // return; // } String ages = value.getFieldValue(Fields.AGE.getIdx()).toString(); String tag = value.getFieldValue(Fields.TAG.getIdx()).toString(); String age; String maxAge; allNum = allNum + 1; Map<String, Map> ageMap = objectMapper.readValue(ages, mapType); Map<String, String> ageSource = ageMap.get("age_and_source"); Map<String, Double> ageProportion = ageMap.get("age_and_proportion"); List maxAgeList = getMaxRatio(ageProportion); maxAge = (String) maxAgeList.get(0); double maxRatio = (Double) maxAgeList.get(1); if (!ageSource.containsKey("null")) { if (tag.equals("unbelievable")) { un += 1; una += 1; return; } if (tag.equals("unknown")) { no += 1; other += 1; return; } if (maxRatio < 0.5) { un += 1; return; } Set<String> ageSet = ageSource.keySet(); for (String item : ageSet) { age = Util.colonSplit.split(item, -1)[0]; if (maxAge.equals(age)) { yes += 1; break; } else { no += 1; break; } } } if (tag.equals("unbelievable")) { una += 1; return; } if (tag.equals("unknown")) { other += 1; return; } if (maxRatio < 0.5) { una += 1; return; } Integer sum = st.get(maxAge); if (sum == null) { sum = 0; } st.put(maxAge, sum + 1); valid += 1; } public void cleanup(Context context) throws IOException, InterruptedException { StringBuilder values = new StringBuilder(); values.append(valid); for (Map.Entry<String, Integer> entry : st.entrySet()) { values.append("|"); values.append(entry.getKey()); values.append(":"); values.append(entry.getValue()); } outKey.set(MRUtils.JOINER.join(yes, no, un, allNum, other, una, values )); context.write(outKey, NullWritable.get()); } } public static class DmpYearReducer extends Reducer<Text, NullWritable, Text, NullWritable> { Text outKey = new Text(); int yes = 0; int no = 0; int other = 0; int valid = 0; int validAll = 0; int num = 0; int un = 0; Long una = 0L; Map<String, Integer> st = Maps.newHashMap(); Long allNum = 0L; double ratio = 0.0; double recall = 0.0; public void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException { String line = key.toString(); String[] fields = MRUtils.SPLITTER.split(line, -1); yes = yes + Integer.parseInt(fields[0]); no = no + Integer.parseInt(fields[1]); un = un + Integer.parseInt(fields[2]); allNum = allNum + Long.parseLong(fields[3]); other = other + Integer.parseInt(fields[4]); una = una + Long.parseLong(fields[5]); String[] arr = Util.verticalLine.split(fields[6], -1); valid = valid + Integer.parseInt(arr[0]); for (int i = 1; i < arr.length; i++) { String[] item = Util.colonSplit.split(arr[i], -1); String gen = item[0]; Integer sum = st.get(gen); if (sum == null) { sum = 0; } Integer part = Integer.parseInt(item[1]); st.put(gen, sum + part); validAll = validAll + part; } } public void cleanup(Context context) throws IOException, InterruptedException { StringBuilder value = new StringBuilder(); value.append(valid); for (Map.Entry<String, Integer> entry : st.entrySet()) { value.append("|"); value.append(entry.getKey()); value.append(":"); value.append(entry.getValue()); } if (yes + no != 0) { ratio = (double) yes / (yes + no); } if (allNum != 0) { recall = (double) validAll / allNum; } outKey.set(MRUtils.JOINER.join(yes, no, un, ratio, allNum, recall, other, una, validAll, value)); context.write(outKey, NullWritable.get()); } } public static List getMaxRatio(Map<String, Double> ageMap) { String maxAge = ""; double maxRatio = 0.0; for (Map.Entry<String, Double> entry : ageMap.entrySet()) { String ages = entry.getKey(); Double ratio = entry.getValue(); if (ages != null && !ages.equals("")) { if (maxAge.length() == 0) { maxAge = ages; maxRatio = ratio; } else if (maxRatio < ratio) { maxAge = ages; maxRatio = ratio; } } } List maxRatioList = new ArrayList<>(); maxRatioList.add(maxAge); maxRatioList.add(maxRatio); return maxRatioList; } }