package mobvista.dmp.datasource.dm.mapreduce;

import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import mobvista.dmp.common.CommonMapReduce;
import mobvista.dmp.datasource.ga.mapreduce.vo.TextPairSort;
import mobvista.dmp.util.MRUtils;
import mobvista.prd.datasource.util.GsonUtil;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
import java.util.regex.Pattern;

/**
 * Created by liushuai
 * desc: consolidates the data under the dm_interest_tag directory into the unified "all" partition.
 *
 * <p>Each input line carries four fields (device id, device type, platform, interest JSON array).
 * The mapper emits one record per element of the interest array; the reducer writes one output
 * line per (device id, device type, platform, package name) group, built from the first value it
 * receives for that group.
 *
 * <p>Usage: {@code DmInterestAllMR [generic options] <input path> <output path>}
 */
public class DmInterestAllMR extends Configured implements Tool {

    /** Exit code returned by {@link #run(String[])} when the input/output paths are missing. */
    private static final int EXIT_USAGE = 2;

    /** Date written by the mapper when an interest element has no usable {@code date} member. */
    private static final String DEFAULT_DATE = "1970-01-01";

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new DmInterestAllMR(), args));
    }

    /**
     * Configures and runs the job.
     *
     * @param args command-line arguments; after generic Hadoop options are removed, the first
     *             remaining argument is the input path and the second is the output path
     * @return 0 if the job succeeded, 1 if it failed, 2 if the required paths were not supplied
     * @throws Exception if job setup or execution fails
     */
    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = this.getConf();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 2) {
            // Fail with a readable message instead of an ArrayIndexOutOfBoundsException below.
            System.err.println("Usage: DmInterestAllMR [generic options] <input path> <output path>");
            return EXIT_USAGE;
        }

        // These are applied after the generic options have been parsed, so they take
        // precedence over any -D values given on the command line for the same keys.
        conf.set("mapreduce.map.speculative", "true");
        conf.set("mapreduce.reduce.speculative", "true");
        conf.set("mapreduce.task.io.sort.mb", "500");
        conf.set("mapreduce.reduce.java.opts", "-Xmx1536m");
        conf.set("mapreduce.reduce.memory.mb", "2048");
        conf.set("mapreduce.reduce.shuffle.parallelcopies", "50");

        Job job = Job.getInstance(conf, "DmInterestAllMR");
        job.setJarByClass(DmInterestAllMR.class);

        job.setMapperClass(DmInterestAllMapper.class);
        job.setReducerClass(DmInterestAllReducer.class);

        job.setMapOutputKeyClass(TextPairSort.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // Group and partition on the first component of the composite key only, so that all
        // records sharing that component reach a single reduce() call.
        job.setGroupingComparatorClass(TextPairSort.FirstComparator.class);
        job.setPartitionerClass(TextPairSort.FirstPartitioner.class);

        job.setInputFormatClass(TextInputFormat.class);

        FileOutputFormat.setCompressOutput(job, true);
        FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    /**
     * Uses secondary sort so that, for each package, records are ordered by date descending.
     *
     * <p>Lines that do not split into exactly four fields, or whose device id fails validation,
     * are dropped silently.
     */
    public static class DmInterestAllMapper extends Mapper<LongWritable, Text, TextPairSort, Text> {

        // Compiled once per JVM; String.matches() would recompile the regex for every record.
        private static final Pattern DID_PATTERN = Pattern.compile(CommonMapReduce.didPtn);
        private static final Pattern ANDROID_ID_PATTERN = Pattern.compile(CommonMapReduce.andriodIdPtn);
        private static final Pattern IMEI_PATTERN = Pattern.compile(CommonMapReduce.imeiPtn);

        // Writables are reused across map() calls to avoid allocating per record.
        private final Text outValue = new Text();
        private final TextPairSort outKey = new TextPairSort();

        /**
         * Emits one (composite key, "date + tag") pair per element of the line's interest array.
         *
         * @param key     input key supplied by the input format (unused)
         * @param value   one input line
         * @param context sink for the emitted pairs
         */
        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] fields = MRUtils.SPLITTER.split(value.toString(), -1);
            if (fields.length != 4) {
                return;
            }

            String deviceId = fields[0];
            if (!isValidDeviceId(deviceId)) {
                return;
            }
            String deviceType = fields[1];
            String platform = fields[2];
            String interest = fields[3];

            JsonArray tagsArray = GsonUtil.String2JsonArray(interest);
            for (JsonElement tagElement : tagsArray) {
                JsonObject jsonObject = tagElement.getAsJsonObject();
                String date = getDate(jsonObject.get("date"));
                // NOTE(review): an element without a "tag" member makes get() return null and
                // this line throw NullPointerException — confirm upstream always supplies it.
                String tag = jsonObject.get("tag").toString();
                String packageName = getPackageName(jsonObject.get("package_name"));

                // First component identifies the group; the date is the secondary sort field.
                // Presumably "false" asks TextPairSort for descending order — TODO confirm.
                outKey.set(
                        MRUtils.JOINER.join(deviceId, deviceType, platform, packageName),
                        date,
                        "false");
                outValue.set(MRUtils.JOINER.join(date, tag));
                context.write(outKey, outValue);
            }
        }

        /**
         * Returns whether the device id fully matches one of the accepted patterns; a match on
         * {@code didPtn} is rejected when the id equals {@code CommonMapReduce.allZero}.
         */
        private static boolean isValidDeviceId(String deviceId) {
            if (DID_PATTERN.matcher(deviceId).matches()
                    && !CommonMapReduce.allZero.equals(deviceId)) {
                return true;
            }
            return ANDROID_ID_PATTERN.matcher(deviceId).matches()
                    || IMEI_PATTERN.matcher(deviceId).matches();
        }

        /** Returns the element as a string, or "1970-01-01" when it is absent or JSON null. */
        private String getDate(JsonElement element) {
            if (element != null && !element.isJsonNull()) {
                return element.getAsString();
            }
            return DEFAULT_DATE;
        }

        /** Returns the element as a string, or the empty string when it is absent or JSON null. */
        private String getPackageName(JsonElement element) {
            if (element != null && !element.isJsonNull()) {
                return element.getAsString();
            }
            return "";
        }
    }

    /**
     * Writes one output line per key group: device id, device type, platform and a JSON object
     * holding the tag array plus, when present, the date and package name.
     *
     * <p>Only the first value of each group is used; the remaining values are ignored.
     */
    public static class DmInterestAllReducer extends Reducer<TextPairSort, Text, Text, NullWritable> {

        // Reused across reduce() calls to avoid allocating per group.
        private final Text outKey = new Text();

        /**
         * @param key     composite key whose first component is
         *                deviceId / deviceType / platform / packageName joined by the mapper
         * @param values  "date + tag" values for the group, in the order the framework delivers them
         * @param context sink for the output line
         */
        @Override
        protected void reduce(TextPairSort key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            String[] keySplit = MRUtils.SPLITTER.split(key.getFirst().toString(), -1);
            String packageName = keySplit[3];

            // Take the most recent data, i.e. the first record.
            String value = values.iterator().next().toString();
            String[] valSplits = MRUtils.SPLITTER.split(value, -1);
            String date = valSplits[0];
            String tag = valSplits[1];

            JsonObject jsonObject = new JsonObject();
            // The mapper's placeholder date is not written back out.
            if (StringUtils.isNotEmpty(date) && !DEFAULT_DATE.equals(date)) {
                jsonObject.addProperty("date", date);
            }
            if (StringUtils.isNotEmpty(packageName)) {
                jsonObject.addProperty("package_name", packageName);
            }
            jsonObject.add("tag", GsonUtil.String2JsonArray(tag));

            outKey.set(MRUtils.JOINER.join(
                    keySplit[0], // deviceId
                    keySplit[1], // deviceType
                    keySplit[2], // platform
                    jsonObject.toString()
            ));
            context.write(outKey, NullWritable.get());
        }
    }
}