package mobvista.dmp.datasource.adn.mapreduce;

import com.google.common.base.Joiner;
import mobvista.dmp.common.CommonMapReduce;
import mobvista.dmp.common.CommonMapper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.joda.time.format.DateTimeFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.regex.Pattern;

/**
 * MapReduce job that reads two tab-separated inputs, groups their rows by
 * device id and writes, per device, the attribute row carrying the greatest
 * update time.
 *
 * <p>Rows are told apart purely by column count: the mapper treats rows with at
 * least {@code column.min.num} columns as "daily" rows and rows with exactly 13
 * columns as "total" rows. The reducer then distinguishes the two by the number
 * of fields in the mapped value (11 for daily, 12 for total).
 *
 * <p>Arguments: {@code <input path> <input path> <output path>}. An existing
 * output path is deleted recursively before the job starts.
 */
public class AdnRequestDeviceTotalMR extends Configured implements Tool {

    /** Number of command-line arguments {@link #run(String[])} requires. */
    private static final int REQUIRED_ARG_COUNT = 3;

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new AdnRequestDeviceTotalMR(), args));
    }

    /**
     * Configures and runs the job.
     *
     * @param strings {@code [0]} and {@code [1]} are input paths, {@code [2]} is the output path
     * @return 0 when the job succeeds, 1 when it fails, 2 when too few arguments were given
     * @throws Exception if job setup or execution fails
     */
    @Override
    public int run(String[] strings) throws Exception {
        // Fail with a readable message instead of an ArrayIndexOutOfBoundsException.
        if (strings == null || strings.length < REQUIRED_ARG_COUNT) {
            System.err.println(
                    "Usage: AdnRequestDeviceTotalMR <input path> <input path> <output path>");
            return 2;
        }

        Configuration conf = getConf();
        conf.set("mapreduce.map.speculative", "true");
        conf.set("mapreduce.reduce.speculative", "true");
        conf.set("mapreduce.task.io.sort.mb", "500");
        conf.set("mapreduce.reduce.java.opts", "-Xmx1536m");
        conf.set("mapreduce.reduce.memory.mb", "2048");
        conf.set("mapreduce.reduce.shuffle.parallelcopies", "50");

        Job job = Job.getInstance(conf, "adn total");
        job.setJarByClass(AdnRequestDeviceTotalMR.class);

        Path outputPath = new Path(strings[2]);
        FileInputFormat.addInputPath(job, new Path(strings[0]));
        FileInputFormat.addInputPath(job, new Path(strings[1]));

        // The output directory must not exist when the job starts, so drop any previous run.
        FileSystem fileSystem = outputPath.getFileSystem(conf);
        if (fileSystem.exists(outputPath)) {
            fileSystem.delete(outputPath, true);
        }

        FileOutputFormat.setOutputPath(job, outputPath);
        FileOutputFormat.setCompressOutput(job, true);
        FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);

        job.setMapperClass(AdnTotalMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setReducerClass(AdnReducer.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    /**
     * Keeps, for every device id, the attribute row with the greatest update
     * time and emits it only when the device type is {@code idfa}, {@code gaid}
     * or {@code imei}.
     */
    public static class AdnReducer extends Reducer<Text, Text, Text, Text> {

        private static final Pattern SPLIT_PTN = Pattern.compile("\t");
        private static final Joiner TAB_JOINER = Joiner.on("\t");
        private static final String DATETIME_PATTERN = "yyyy-MM-dd HH:mm:ss";

        /** Field count of a value that came from the "total" input. */
        private static final int TOTAL_FIELD_COUNT = 12;
        /** Field count of a value that came from the "daily" input. */
        private static final int DAILY_FIELD_COUNT = 11;

        /**
         * Selects the most recently updated row among {@code values} and writes it.
         *
         * <p>Total rows carry their update time as text in field 11. Daily rows
         * carry an epoch-seconds timestamp in field 10, which is formatted as
         * {@code yyyy-MM-dd HH:mm:ss} (no explicit time zone is set on the
         * formatter). Update times are compared as strings. Values with any
         * other field count are ignored. A daily row whose timestamp is not a
         * valid number is skipped and counted instead of failing the task.
         *
         * @param key     the device id
         * @param values  tab-separated attribute rows for that device
         * @param context the reduce context used for output and counters
         */
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            String[] latest = null;
            String updateTime = null;

            for (Text value : values) {
                // No split limit: trailing empty fields are dropped, as in the original logic.
                final String[] fields = SPLIT_PTN.split(value.toString());

                final String candidateTime;
                if (fields.length == TOTAL_FIELD_COUNT) {
                    candidateTime = fields[11];
                } else if (fields.length == DAILY_FIELD_COUNT) {
                    try {
                        // Appending "000" turns epoch seconds into epoch milliseconds.
                        candidateTime = DateTimeFormat.forPattern(DATETIME_PATTERN)
                                .print(Long.parseLong(fields[10] + "000"));
                    } catch (NumberFormatException e) {
                        // One malformed record must not kill the whole reduce task.
                        context.getCounter("DMP", "reduce_invalid_timestamp").increment(1);
                        continue;
                    }
                } else {
                    continue;
                }

                if (updateTime == null || updateTime.compareTo(candidateTime) < 0) {
                    // split() returns a fresh array each time, so keeping the reference is safe.
                    latest = fields;
                    updateTime = candidateTime;
                }
            }

            if (latest == null) {
                return;
            }

            String deviceType = latest[0];
            boolean supportedType = "idfa".equals(deviceType)
                    || "gaid".equals(deviceType)
                    || "imei".equals(deviceType);
            if (!supportedType) {
                return;
            }

            // Fields 0-9 are the device attributes; the update time is written twice.
            String row = TAB_JOINER.join(
                    latest[0], latest[1], latest[2], latest[3], latest[4],
                    latest[5], latest[6], latest[7], latest[8], latest[9],
                    updateTime, updateTime);
            context.write(new Text(key.toString()), new Text(row));
        }
    }

    /**
     * Mapper that routes each input line by its column count: lines with at
     * least {@code column.min.num} columns go to {@code handleClick}, lines
     * with exactly 13 columns go to {@link #handleTotal}, and everything else
     * is counted as an exception.
     *
     * <p>{@code handleClick} and {@code regex} are not defined in this class;
     * presumably they are inherited from {@code AdnMapper}.
     */
    public static class AdnTotalMapper extends AdnMapper {

        private static final Logger logger = LoggerFactory.getLogger(AdnTotalMapper.class);

        /** Exact column count that identifies a "total" row. */
        private static final int TOTAL_COLUMN_COUNT = 13;

        private int exceptionCount = 0;
        private int columnNum;

        /**
         * Reads the minimum column count of a daily row from {@code column.min.num}.
         * The default is 20; an earlier, commented-out version of this line used 54.
         */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            this.columnNum = context.getConfiguration().getInt("column.min.num", 20);
        }

        /**
         * Emits a "total" row keyed by device id.
         *
         * <p>Column 0 is the device id, columns 1-10 are the device attributes
         * and column 12 is the update time. Column 11 is not read; an empty
         * string is emitted in its place so the value has 12 fields.
         *
         * @param fields  the 13 columns of the input line
         * @param context the map context used for output and counters
         */
        private void handleTotal(String[] fields, Context context)
                throws IOException, InterruptedException {
            String deviceId = fields[0];
            String deviceType = fields[1];
            String mac = fields[2];
            String platform = fields[3];
            String osVersion = fields[4];
            String sdkVersion = fields[5];
            String deviceModel = fields[6];
            String deviceBrand = fields[7];
            String screenSize = fields[8];
            String countryCode = fields[9];
            String language = fields[10];
            String updateTime = fields[12];

            // Original note (translated): "Change UK to GB, requested by Mingyuan,
            // modified by Feng Liang, 2017-08-15".
            // NOTE(review): the code does the opposite and maps GB to UK. Behavior is
            // left unchanged here; confirm which direction is intended.
            if ("GB".equalsIgnoreCase(countryCode)) {
                countryCode = "UK";
            }

            // A non-matching device id is only counted; the row is still emitted.
            // NOTE(review): the counter name is spelled "device_exeptions" and differs from
            // the "devivce_exceptions" counter used in map(); kept as-is in case
            // monitoring depends on the existing names.
            if (!deviceId.matches(regex)) {
                CommonMapReduce.setMetrics(context, "DMP", "device_exeptions", 1);
            }

            Text text = new Text(Joiner.on("\t").join(
                    deviceType, mac, platform, osVersion, sdkVersion, deviceModel,
                    deviceBrand, screenSize, countryCode, language, "", updateTime));
            context.write(new Text(deviceId), text);
        }

        /**
         * Splits the line on tabs, keeping trailing empty columns, and dispatches
         * it by column count.
         *
         * @param key     the input key (unused)
         * @param value   one tab-separated input line
         * @param context the map context
         */
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] fields = value.toString().split("\t", -1);
            if (fields.length >= columnNum) {
                // daily
                handleClick(fields, context);
            } else if (fields.length == TOTAL_COLUMN_COUNT) {
                // total
                handleTotal(fields, context);
            } else {
                // NOTE(review): counter name is spelled "devivce_exceptions"; kept as-is.
                CommonMapReduce.setMetrics(context, "DMP", "devivce_exceptions", 1);
                exceptionCount++;
            }
        }

        /** Logs how many lines matched neither the daily nor the total layout. */
        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            if (exceptionCount > 0) {
                logger.error("exception occurred {} times", exceptionCount);
            }
        }
    }
}