package mobvista.dmp.datasource.age.mapreduce;

import mobvista.dmp.util.MRUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.type.JavaType;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * Created by liushuai on 2017/2/16.
 */
public class ExtractDeviceMR {

    public static void main(String[] args) throws ClassNotFoundException, IOException, InterruptedException {
        Configuration conf = new Configuration();
        conf.set("mapreduce.map.speculative", "true");
        conf.set("mapreduce.reduce.speculative", "true");
        conf.set("mapreduce.task.io.sort.mb", "500");
        conf.set("mapreduce.reduce.java.opts", "-Xmx1536m");
        conf.set("mapreduce.reduce.memory.mb", "2048");
        conf.set("mapreduce.reduce.shuffle.parallelcopies", "50");

        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

        Job job = Job.getInstance(conf, "ExtractDeviceMR");
        FileOutputFormat.setCompressOutput(job, true);
        FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
        job.setJarByClass(ExtractDeviceMR.class);
        job.setMapperClass(ExtractMapper.class);
        job.setReducerClass(ExtractReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
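
    // Input record layout (a sketch inferred from the Mapper below; the actual
    // field delimiter is whatever MRUtils.SPLITTER is configured with, which is
    // not visible in this file):
    //   device_id <sep> device_type <sep> <unused> <sep> [{"package_name": "..."}, ...]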

    public static class ExtractMapper extends Mapper<LongWritable, Text, Text, Text> {

        ObjectMapper objectMapper = new ObjectMapper();
        JavaType javaType = objectMapper.getTypeFactory()
                .constructCollectionType(ArrayList.class, ExtractDeviceMould.class);
        Text outKey = new Text();
        Text outValue = new Text();

        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] fields = MRUtils.SPLITTER.split(line, -1);
            if (fields.length != 4) {
                return;
            }
            String deviceId = fields[0];
            // A well-formed device id splits into five segments on "-".
            String[] tmpDeviceId = Util.lineSplit.split(deviceId, -1);
            if (tmpDeviceId.length != 5) {
                return;
            }
            // Drop device ids that match the exclusion pattern.
            if (Util.match.matcher(deviceId).matches()) {
                return;
            }
            if (fields[3].equals("")) {
                return;
            }
            // Parse the JSON array with Jackson.
            List<ExtractDeviceMould> packageList = objectMapper.readValue(fields[3], javaType);
            if (packageList.isEmpty()) {
                // No package_name at all, discard the record.
                return;
            }
            StringBuilder outputValue = new StringBuilder();
            for (ExtractDeviceMould deviceMould : packageList) {
                // Append each package_name, "#"-separated.
                outputValue.append("#");
                outputValue.append(deviceMould.getPackage_name());
            }
            if (outputValue.length() > 0) {
                outKey.set(fields[0]); // device_id
                outValue.set(MRUtils.JOINER.join(
                        outputValue.toString(), // package_names
                        fields[1]               // device_type
                ));
                context.write(outKey, outValue);
            }
        }
    }

    public static class ExtractReducer extends Reducer<Text, Text, Text, Text> {

        Text outKey = new Text();
        Text outValue = new Text();

        @Override
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            String deviceId = key.toString();
            if (deviceId.equals("")) {
                return;
            }
            StringBuilder outputValue = new StringBuilder();
            String deviceType = "";
            for (Text value : values) {
                String[] arr = MRUtils.SPLITTER.split(value.toString(), -1);
                deviceType = arr[1];
                outputValue.append(arr[0]); // each arr[0] already starts with "#"
            }
            // Strip the leading "#" before emitting.
            String packageNames = outputValue.substring(1);
            outKey.set(deviceId);
            outValue.set(MRUtils.JOINER.join("B", packageNames, deviceType));
            context.write(outKey, outValue);
        }
    }
}
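
// Example invocation (a sketch; the jar name and HDFS paths below are
// assumptions, not taken from this repository):
//
//   hadoop jar dmp.jar mobvista.dmp.datasource.age.mapreduce.ExtractDeviceMR \
//       hdfs:///input/device_logs hdfs:///output/extract_device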