package mobvista.dmp.datasource.dsp.mapreduce; import com.google.gson.JsonObject; import mobvista.dmp.common.CommonMapReduce; import mobvista.dmp.common.CommonMapper; import mobvista.dmp.util.MRUtils; import mobvista.dmp.util.Standardizer; import mobvista.prd.datasource.util.GsonUtil; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.compress.GzipCodec; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.codehaus.jackson.JsonNode; import org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.node.ObjectNode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.text.ParseException; /** * author: houying * date : 16-10-25 * desc : */ public class DspDeviceProfileTotalMR extends Configured implements Tool { private static final Logger logger = LoggerFactory.getLogger(DspDeviceProfileTotalMR.class); public static class ProfileTotalMapper extends CommonMapper { private ObjectMapper objectMapper; @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String val = value.toString(); String[] array = MRUtils.SPLITTER.split(val); if (array.length >= 13) { //daily ObjectNode objectNode = objectMapper.createObjectNode(); objectNode.put("device_id", array[0]); objectNode.put("device_type", array[1]); objectNode.put("platform", array[2]); objectNode.put("country", array[3]); objectNode.put("ip", array[4]); objectNode.put("gender", array[5]); objectNode.put("birthday", array[6]); objectNode.put("maker", array[7]); objectNode.put("model", array[8]); objectNode.put("os_v", array[9]); objectNode.put("os", array[2]); objectNode.put("time", array[12]); if(array[12].length()<10){ return; } objectNode.put("dmp_time", array[12].substring(0, 10)); // 将UK改为GB,明远需求,修改人冯亮 20170815 String countryCode = objectNode.get("country").getTextValue(); if ("GB".equalsIgnoreCase(countryCode)) { objectNode.put("country", "UK"); } outKey.set(MRUtils.JOINER.join(array[0], array[1])); outValue.set(objectMapper.writeValueAsString(objectNode)); context.write(outKey, outValue); } else { try { JsonNode dspProfile = objectMapper.readTree(val); // 将UK改为GB,明远需求,修改人冯亮 20170815 String countryCode = dspProfile.get("country").getTextValue(); if ("GB".equalsIgnoreCase(countryCode)) { JsonObject obj = GsonUtil.String2JsonObject(val); obj.addProperty("country", "UK"); value.set(obj.toString()); } outKey.set(MRUtils.JOINER.join(dspProfile.get("device_id"), dspProfile.get("device_type"))); context.write(outKey, value); } catch (Exception e) { CommonMapReduce.setMetrics(context, "DMP", "json_parse_error", 1); } } } @Override protected void setup(Context context) throws IOException, InterruptedException { objectMapper = new ObjectMapper(); } } public static class ProfileTotalReducer extends Reducer<Text, Text, NullWritable, Text> { private Text outKey = new Text(); private ObjectMapper objectMapper; @Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { String outValue = null; String tmpValue; JsonNode dspProfile; JsonNode dspProfileTmp; int count = 0; for (Text value: values) { if (count == 0){ outValue = value.toString(); count++; } else { tmpValue = value.toString(); try { dspProfile = objectMapper.readTree(outValue); dspProfileTmp = objectMapper.readTree(tmpValue); } catch(IOException e) { logger.info("json解析出错: {}", outValue); context.getCounter("dsp_profile_total", "json_parse_error").increment(1); continue; } String dspTime = dspProfile.get("dmp_time").asText(); String dspTimeTmp = dspProfileTmp.get("dmp_time").asText(); try { if (Standardizer.CompareDate(dspTime,dspTimeTmp) <= 0 ){ outValue = tmpValue; } } catch (ParseException e) { e.printStackTrace(); } } } if (outValue != null) { outKey.set(outValue); context.write(NullWritable.get(), outKey); } } @Override protected void setup(Context context) throws IOException, InterruptedException { objectMapper = new ObjectMapper(); } } @Override public int run(String[] args) throws Exception { Configuration conf = getConf(); conf.set("mapreduce.map.speculative", "true"); conf.set("mapreduce.reduce.speculative", "true"); conf.set("mapreduce.task.io.sort.mb", "500"); conf.set("mapreduce.reduce.java.opts", "-Xmx1536m"); conf.set("mapreduce.reduce.memory.mb", "2048"); conf.set("mapreduce.reduce.shuffle.parallelcopies", "50"); Job job = Job.getInstance(conf, "Profile job"); job.setJarByClass(DspDeviceProfileTotalMR.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileInputFormat.addInputPath(job, new Path(args[1])); Path output = new Path(args[2]); FileSystem system = output.getFileSystem(conf); if (system.exists(output)) { system.delete(output, true); } FileOutputFormat.setOutputPath(job, output); FileOutputFormat.setCompressOutput(job, true); FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class); job.setMapperClass(ProfileTotalMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setOutputFormatClass(TextOutputFormat.class); job.setOutputKeyClass(NullWritable.class); job.setOutputValueClass(Text.class); job.setReducerClass(ProfileTotalReducer.class); return job.waitForCompletion(true) ? 0 : 1; } public static void main(String[] args) throws Exception { System.exit(ToolRunner.run(new Configuration(), new DspDeviceProfileTotalMR(), args)); } }