package mobvista.dmp.common; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import mobvista.dmp.util.MRUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.compress.GzipCodec; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.codehaus.jackson.JsonNode; import org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.type.JavaType; import java.io.IOException; import java.util.HashMap; import java.util.List; import java.util.Map; /** * author: houying * date : 16-11-9 * desc : */ public class InterestDeviceDistinctMR extends CommonMapReduce { public static class InstallDistinctMapper extends Mapper<LongWritable, Text, Text, NullWritable> { private ObjectMapper objectMapper; private JavaType javaType; private Map<String, String> broadcast; private Text outKey = new Text(); @Override protected void setup(Context context) throws IOException, InterruptedException { objectMapper = new ObjectMapper(); javaType = objectMapper.getTypeFactory().constructMapType(HashMap.class, String.class, String.class); try { // 将broadcast反序列化成map对象 Configuration conf = context.getConfiguration(); String tag = conf.get(Constants.MR_BROADCAST_STR); broadcast = buildBroadcast(tag, conf, 0, 3); } catch (Exception e) { throw new RuntimeException(e); } } @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] array = MRUtils.SPLITTER.split(value.toString(), -1); if (array.length != 4) { CommonMapReduce.setMetrics(context, "DMP", "column_num_error", 1); return; } String deviceId = array[0]; if (checkDeviceId(deviceId)) { String deviceType = array[1]; String platform = array[2]; JsonNode appList = objectMapper.readTree(array[3]); List<Map<String, Object>> tagList = Lists.newArrayList(); for (JsonNode app : appList) { Map<String, String> map = objectMapper.readValue(app, javaType); String date = map.get("date"); String packageName = map.get("package_name"); if ("ios".equalsIgnoreCase(platform) && packageName.matches("^id[0-9]+$")) { packageName = packageName.replace("id", ""); } String tags = broadcast.get(packageName); if (tags != null) { Map<String, Object> tag = Maps.newHashMap(); tag.put("package_name", packageName); tag.put("tag", objectMapper.readValue(tags, List.class)); tag.put("date", date); tagList.add(tag); } } if (tagList.size() > 0) { outKey.set(MRUtils.JOINER.join( deviceId, deviceType, platform, objectMapper.writeValueAsString(tagList) )); context.write(outKey, NullWritable.get()); } } } } public static class InstallDistinctReducer extends CommonReducer { private ObjectMapper objectMapper = new ObjectMapper(); @Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { List<Map<String, Object>> appList = Lists.newArrayList(); for (Text text : values) { String[] array = MRUtils.SPLITTER.split(text.toString(), -1); if (array.length != 3) { continue; } Map<String, Object> app = Maps.newHashMap(); app.put("package_name", array[0]); app.put("tag", objectMapper.readValue(array[1], List.class)); app.put("date", array[2]); appList.add(app); } if (appList.size() > 0) { out.set(MRUtils.JOINER.join( key.toString(), objectMapper.writeValueAsString(appList) )); context.write(out, NullWritable.get()); } } } public static void main(String[] args) throws Exception { start(new InterestDeviceDistinctMR(args[args.length - 1], InstallDistinctMapper.class, null), args); } public InterestDeviceDistinctMR(String name, Class<? extends Mapper> mapperClass, Class<? extends Reducer> reducerClass) { super(name, mapperClass, reducerClass); } @Override protected void otherSetting(Job job, String[] args) throws Exception { // 读取广播内容,用于map side join Configuration conf = job.getConfiguration(); conf.set(Constants.MR_BROADCAST_STR, args[0]); job.setNumReduceTasks(0); job.setMapOutputValueClass(NullWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); job.setInputFormatClass(TextInputFormat.class); } @Override protected void setOutputPath(Job job, String[] args) throws IOException { FileOutputFormat.setOutputPath(job, new Path(args[2])); FileOutputFormat.setCompressOutput(job, true); FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class); } @Override protected void setInputPath(Job job, String[] args) throws IOException { FileInputFormat.addInputPath(job, new Path(args[1])); } }