package mobvista.dmp.datasource.dm.mapreduce;

import com.google.common.collect.Maps;
import mobvista.dmp.common.CommonMapReduce;
import mobvista.dmp.util.MRUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.type.JavaType;

import java.io.IOException;
import java.util.*;
import java.util.regex.Pattern;

/**
 * Created by liushuai
 * desc: merges the data under the dm_interest_tag directory into one unified "all" partition.
 *
 * <p>Each input line is split with {@code MRUtils.SPLITTER} into exactly four fields:
 * device_id, device_type, platform, and a JSON array of package entries. Records are grouped by
 * (device_id, device_type, platform); when a device has several payloads they are merged into a
 * single JSON array (see {@link DmInterestAllReducer}). Output is gzip-compressed text.
 *
 * <p>Usage: {@code DmInterestAllMR1 [generic options] <input path> <output path>}. The reducer
 * reads the {@code task.date} configuration key.
 */
public class DmInterestAllMR1 {

    public static void main(String[] args)
            throws InterruptedException, IOException, ClassNotFoundException {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        // Fail fast with a usage hint instead of an ArrayIndexOutOfBoundsException below.
        if (otherArgs.length < 2) {
            System.err.println("Usage: DmInterestAllMR1 <input path> <output path>");
            System.exit(2);
        }

        Job job = Job.getInstance(conf, "DmInterestAllMR1");
        job.setJarByClass(DmInterestAllMR1.class);

        FileOutputFormat.setCompressOutput(job, true);
        FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);

        job.setMapperClass(DmInterestAllMapper.class);
        job.setReducerClass(DmInterestAllReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    /**
     * Keys each well-formed record by (device_id, device_type, platform) and emits its JSON
     * payload (the fourth field) as the value.
     *
     * <p>Lines that do not have exactly four fields are dropped, as are records whose device_id
     * does not match {@code CommonMapReduce.didPtn} or equals {@code CommonMapReduce.allZero}.
     */
    public static class DmInterestAllMapper extends Mapper<LongWritable, Text, Text, Text> {
        /**
         * Device-id regex, compiled once rather than on every record as {@code String.matches}
         * would do. Matching semantics are identical.
         */
        private static final Pattern DEVICE_ID_PATTERN = Pattern.compile(CommonMapReduce.didPtn);

        private final Text outKey = new Text();
        private final Text outValue = new Text();

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] fields = MRUtils.SPLITTER.split(value.toString(), -1);
            if (fields.length != 4) {
                return;
            }
            String deviceId = fields[0];
            if (DEVICE_ID_PATTERN.matcher(deviceId).matches()
                    && !CommonMapReduce.allZero.equals(deviceId)) {
                outKey.set(MRUtils.JOINER.join(
                        deviceId,  // device_id
                        fields[1], // device_type
                        fields[2]  // platform
                ));
                outValue.set(fields[3]);
                context.write(outKey, outValue);
            }
        }
    }

    /**
     * Merges all JSON payloads of one device into a single output line.
     *
     * <p>A device with exactly one payload is written through unchanged. A device with several
     * payloads (one per source partition) has its package entries merged by
     * {@link #mergePackages(List)}. The output line is the incoming key joined with the JSON
     * array via {@code MRUtils.JOINER}.
     */
    public static class DmInterestAllReducer extends Reducer<Text, Text, Text, NullWritable> {
        /** Value of the {@code task.date} configuration key; only used by getCurrentDate. */
        private String date;
        private final Text outKey = new Text();
        /** JSON (de)serializer, reused across reduce calls. */
        private final ObjectMapper objectMapper = new ObjectMapper();
        /** Parse target for one payload: a list of maps, one map per package entry. */
        private final JavaType listType =
                objectMapper.getTypeFactory().constructCollectionType(ArrayList.class, Map.class);

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            date = context.getConfiguration().get("task.date");
        }

        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Payloads of the same device coming from the different source directories.
            List<String> jsonValues = new ArrayList<String>();
            for (Text val : values) {
                jsonValues.add(val.toString());
            }

            String mergedJson;
            if (jsonValues.size() == 1) {
                // The device exists in only one partition: pass its payload through untouched.
                mergedJson = jsonValues.get(0);
            } else if (jsonValues.size() > 1) {
                // The device exists in several partitions: merge their package lists.
                mergedJson = objectMapper.writeValueAsString(mergePackages(jsonValues));
            } else {
                return;
            }

            outKey.set(MRUtils.JOINER.join(
                    key.toString(), // device_id, device_type, platform
                    mergedJson      // tag
            ));
            context.write(outKey, NullWritable.get());
        }

        /**
         * Merges several JSON package lists for one device into a single list.
         *
         * <p>Each payload is parsed as a JSON array of objects carrying "package_name", "date"
         * and "tag" keys. For every distinct package, the tag list of the first entry seen is
         * kept, and the latest non-empty date across all entries is kept. Dates are compared as
         * plain strings, which assumes a fixed-width, lexicographically sortable format
         * (NOTE(review): confirm the upstream date format).
         *
         * @param jsonValues the JSON payloads of one device
         * @return one map per distinct package, holding "package_name" (when non-empty), "date"
         *     (when some entry had a non-empty date) and "tag"
         * @throws IOException if a payload cannot be parsed as a JSON list of objects
         */
        private List<Map<String, Object>> mergePackages(List<String> jsonValues) throws IOException {
            // package_name -> tag list of that package's first occurrence.
            Map<String, List<?>> tagsByPackage = Maps.newHashMap();
            // package_name -> latest non-empty date seen for that package.
            Map<String, String> latestDateByPackage = new HashMap<String, String>();

            for (String json : jsonValues) {
                List<Map<String, Object>> entries = objectMapper.readValue(json, listType);
                for (Map<String, Object> entry : entries) {
                    String entryDate = (String) entry.get("date");
                    String packageName = (String) entry.get("package_name");

                    if (!tagsByPackage.containsKey(packageName)) {
                        tagsByPackage.put(packageName, (List<?>) entry.get("tag"));
                    }

                    // A package's first entry may carry no date; that must not block a dated
                    // later entry from being recorded, so "nothing stored yet" also accepts.
                    if (StringUtils.isNotEmpty(entryDate)) {
                        String oldDate = latestDateByPackage.get(packageName);
                        if (oldDate == null || oldDate.compareTo(entryDate) < 0) {
                            latestDateByPackage.put(packageName, entryDate);
                        }
                    }
                }
            }

            List<Map<String, Object>> merged = new ArrayList<Map<String, Object>>();
            for (Map.Entry<String, List<?>> e : tagsByPackage.entrySet()) {
                String packageName = e.getKey();
                Map<String, Object> app = Maps.newHashMap();
                if (StringUtils.isNotEmpty(packageName)) {
                    app.put("package_name", packageName);
                }
                String latestDate = latestDateByPackage.get(packageName);
                if (StringUtils.isNotEmpty(latestDate)) {
                    app.put("date", latestDate);
                }
                app.put("tag", e.getValue());
                merged.add(app);
            }
            return merged;
        }

        /**
         * Temporary workaround: caps a record date at the task date.
         *
         * <p>Returns the task date ({@code task.date}) when it sorts before {@code date},
         * otherwise returns {@code date} unchanged; the comparison is lexicographic. If either
         * value is null, {@code date} is returned unchanged.
         *
         * @param date the record's date
         * @return the earlier of {@code date} and the task date
         */
        protected String getCurrentDate(String date) {
            if (this.date == null || date == null) {
                return date;
            }
            return this.date.compareTo(date) < 0 ? this.date : date;
        }
    }
}