package mobvista.dmp.datasource.adn.mapreduce;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import mobvista.dmp.common.CommonMapReduce;
import mobvista.dmp.common.CommonMapper;
import mobvista.dmp.common.CommonReducer;
import mobvista.dmp.util.MRUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.type.JavaType;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

/**
 * author: walt
 * date  : 16-11-10
 * desc  : aggregates the app_id entries of ADN SDK requests per device.
 *
 * <p>The job reads two input paths ({@code args[0]} and {@code args[1]}) and writes to
 * {@code args[2]}. Input lines are told apart by their column count: 4 columns are treated
 * as an existing per-device app list, 6 columns as a daily record.
 */
public class AdnSdkRequestAppidTotalMR extends CommonMapReduce {

    /**
     * Creates the job description.
     *
     * @param name         job name passed to the parent class
     * @param mapperClass  mapper implementation
     * @param reducerClass reducer implementation
     */
    public AdnSdkRequestAppidTotalMR(String name, Class<? extends Mapper> mapperClass,
                                     Class<? extends Reducer> reducerClass) {
        super(name, mapperClass, reducerClass);
    }

    /**
     * Keys every input line by its first three columns so that the total and the daily
     * records of one device end up in the same reduce call.
     */
    public static class AdnAppidMapper extends CommonMapper {
        private int exceptionCount = 0;
        protected String regex = "^[0-9a-fA-F]{8}(-[0-9a-fA-F]{4}){3}-[0-9a-fA-F]{12}$";

        // Compiled form of `regex`, cached so the pattern is not recompiled for every record.
        // `compiledRegex` remembers which source string the cache was built from, because
        // `regex` is a protected, non-final field that a subclass may reassign.
        private String compiledRegex;
        private Pattern compiledPattern;

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            System.out.println("exception count is " + exceptionCount);
        }

        /**
         * Routes a line by its column count: 4 columns go to {@link #handleTotal}, 6 columns
         * to {@link #handleDaily}; anything else is only counted as an exception.
         */
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // limit -1 keeps trailing empty columns so the column count stays meaningful
            String[] array = MRUtils.SPLITTER.split(value.toString(), -1);
            if (array.length == 4) {
                // app id list
                handleTotal(array, context);
                // The record is emitted regardless; a malformed device id is only counted.
                if (!matchesDeviceIdFormat(array[0])) {
                    CommonMapReduce.setMetrics(context, "DMP", "device_exeptions", 1);
                }
            } else if (array.length == 6) {
                // daily
                handleDaily(array, context);
            } else {
                exceptionCount++;
                CommonMapReduce.setMetrics(context, "DMP", "exception_Count", 1);
            }
        }

        /**
         * Tests a device id against {@link #regex} using a cached compiled pattern.
         *
         * @param deviceId first column of a total record
         * @return {@code true} when the whole id matches the pattern
         */
        private boolean matchesDeviceIdFormat(String deviceId) {
            if (compiledPattern == null || !regex.equals(compiledRegex)) {
                compiledPattern = Pattern.compile(regex);
                compiledRegex = regex;
            }
            return compiledPattern.matcher(deviceId).matches();
        }

        /**
         * Emits one daily record. The key is built from columns 0-2, the value from
         * columns 3 and 5, both joined with {@code MRUtils.JOINER}.
         *
         * @param array   the six columns; per the original author's note these are: device id
         *                (gaid/idfa), id type ("gaid"/"idfa"), platform ("ios"/"android"),
         *                app id, package name, date
         * @param context the context
         * @throws IOException          the io exception
         * @throws InterruptedException the interrupted exception
         */
        private void handleDaily(String[] array, Context context)
                throws IOException, InterruptedException {
            // outKey / outValue are not declared here; presumably reusable fields of CommonMapper.
            outKey.set(MRUtils.JOINER.join(array[0], array[1], array[2]));
            outValue.set(MRUtils.JOINER.join(array[3], array[5]));
            context.write(outKey, outValue);
        }

        /**
         * Emits one total record. The key is built from columns 0-2 joined with
         * {@code MRUtils.JOINER}; the value is column 3 unchanged (the reducer parses it
         * as JSON).
         *
         * @param array   the four columns of a total record
         * @param context the context
         * @throws IOException          the io exception
         * @throws InterruptedException the interrupted exception
         */
        private void handleTotal(String[] array, Context context)
                throws IOException, InterruptedException {
            outKey.set(MRUtils.JOINER.join(array[0], array[1], array[2]));
            outValue.set(array[3]);
            context.write(outKey, outValue);
        }
    }

    /**
     * Merges, for one device key, the previously accumulated app list with the daily
     * records and writes the combined list as a JSON array.
     */
    public static class AdnAppidReducer extends CommonReducer {
        private ObjectMapper objectMapper;
        private JavaType mapType;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            objectMapper = new ObjectMapper();
            mapType = objectMapper.getTypeFactory()
                    .constructMapType(HashMap.class, String.class, String.class);
        }

        /**
         * Builds one output line per key: the key followed by a JSON array of app entries.
         *
         * <p>A value containing a tab is treated as a daily record (app id, date); any other
         * value is parsed as a JSON array of app objects. Daily records are applied after all
         * JSON entries, so for the same app id the daily entry replaces the accumulated one
         * no matter in which order the values arrive.
         *
         * @throws IOException if a non-tab value is not valid JSON, or on write failure
         */
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            String device = key.toString();
            Map<String, Map<String, String>> appidMap = Maps.newHashMap();
            List<String[]> tmpList = Lists.newArrayList();

            for (Text text : values) {
                String value = text.toString();
                if (value.contains("\t")) {
                    // Limit -1 keeps trailing empty fields. Without it a record with an empty
                    // date would split into a single element and record[1] below would throw
                    // ArrayIndexOutOfBoundsException. (Assumes SPLITTER splits on the tab
                    // tested above, which then guarantees at least two elements - confirm
                    // against MRUtils.)
                    tmpList.add(MRUtils.SPLITTER.split(value, -1));
                } else {
                    for (JsonNode node : objectMapper.readTree(value)) {
                        Map<String, String> app = objectMapper.readValue(node, mapType);
                        appidMap.put(app.get("app_id"), app);
                    }
                }
            }

            // Applied last so that daily records win over accumulated entries.
            for (String[] record : tmpList) {
                Map<String, String> app = Maps.newHashMap();
                app.put("app_id", record[0]);
                app.put("date", record[1]);
                appidMap.put(record[0], app);
            }

            // `out` is not declared here; presumably a reusable Text field of CommonReducer.
            out.set(MRUtils.JOINER.join(
                    device, objectMapper.writeValueAsString(appidMap.values())));
            context.write(out, NullWritable.get());
        }
    }

    /**
     * Job entry point; delegates to the inherited {@code start} with this job's mapper
     * and reducer.
     *
     * @param args two input paths followed by the output path (see the path setters below)
     */
    public static void main(String[] args) throws Exception {
        start(new AdnSdkRequestAppidTotalMR("adn request sdk appid total job",
                AdnAppidMapper.class, AdnAppidReducer.class), args);
    }

    /** No additional job configuration is needed. */
    @Override
    protected void otherSetting(Job job, String[] args) throws Exception {
    }

    /** Uses {@code args[2]} as the output directory. */
    @Override
    protected void setOutputPath(Job job, String[] args) throws IOException {
        FileOutputFormat.setOutputPath(job, new Path(args[2]));
    }

    /** Registers {@code args[0]} and {@code args[1]} as input paths. */
    @Override
    protected void setInputPath(Job job, String[] args) throws IOException {
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileInputFormat.addInputPath(job, new Path(args[1]));
    }
}