package mobvista.dmp.datasource.dm.mapreduce; import com.google.gson.JsonArray; import mobvista.dmp.util.MRUtils; import mobvista.prd.datasource.util.GsonUtil; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.compress.GzipCodec; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import java.io.IOException; public class DmInterestAllCombineMR extends Configured implements Tool { @Override public int run(String[] args) throws Exception { String[] otherArgs = new GenericOptionsParser(this.getConf(), args).getRemainingArgs(); this.getConf().set("mapreduce.map.speculative", "true"); this.getConf().set("mapreduce.reduce.speculative", "true"); this.getConf().set("mapreduce.task.io.sort.mb", "500"); this.getConf().set("mapreduce.reduce.java.opts", "-Xmx1536m"); this.getConf().set("mapreduce.reduce.memory.mb", "2048"); this.getConf().set("mapreduce.reduce.shuffle.parallelcopies", "50"); Job job = Job.getInstance(this.getConf(), "DmInterestAllCombineMR"); job.setJarByClass(DmInterestAllCombineMR.class); job.setMapperClass(DmInterestAllCombineMapper.class); job.setReducerClass(DmInterestAllCombineReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); FileOutputFormat.setCompressOutput(job, true); FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); return job.waitForCompletion(true) ? 0 : 1; } /** * */ public static class DmInterestAllCombineMapper extends Mapper<LongWritable, Text, Text, Text> { private Text outKey = new Text(); private Text outValue = new Text(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { try { String[] valueSplits = MRUtils.SPLITTER.split(value.toString(), -1); if (valueSplits.length != 4 ) { return; } outKey.set(MRUtils.JOINER.join( valueSplits[0], // deviceId valueSplits[1], // deviceType valueSplits[2] // platform )); outValue.set(valueSplits[3]); // jsonObject.toString context.write(outKey, outValue); } catch (Exception e) { throw new RuntimeException(value.toString()); } } } /** * */ public static class DmInterestAllCombineReducer extends Reducer<Text, Text, Text, Text> { private Text outValue = new Text(); @Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { JsonArray array = new JsonArray(); for (Text value: values) { array.add(GsonUtil.String2JsonObject(value.toString())); } outValue.set(array.toString()); context.write(key, outValue); } } public static void main(String[] args) throws Exception { System.exit(ToolRunner.run(new Configuration(), new DmInterestAllCombineMR(), args)); } }