// AdnRequestDeviceTotalMR.java (7.93 KB)
package mobvista.dmp.datasource.adn.mapreduce;


import com.google.common.base.Joiner;
import mobvista.dmp.common.CommonMapReduce;
import mobvista.dmp.common.CommonMapper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Arrays;
import java.util.regex.Pattern;


public class AdnRequestDeviceTotalMR extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new AdnRequestDeviceTotalMR(), args));
    }

    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();
        conf.set("mapreduce.map.speculative", "true");
        conf.set("mapreduce.reduce.speculative", "true");
        conf.set("mapreduce.task.io.sort.mb", "500");
        conf.set("mapreduce.reduce.java.opts", "-Xmx1536m");
        conf.set("mapreduce.reduce.memory.mb", "2048");
        conf.set("mapreduce.reduce.shuffle.parallelcopies", "50");

        Job job = Job.getInstance(conf, "adn total");
        job.setJarByClass(AdnRequestDeviceTotalMR.class);
        Path outputPath = new Path(strings[2]);
        FileInputFormat.addInputPath(job, new Path(strings[0]));
        FileInputFormat.addInputPath(job, new Path(strings[1]));
        FileSystem fileSystem = outputPath.getFileSystem(conf);
        if (fileSystem.exists(outputPath)) {
            fileSystem.delete(outputPath, true);
        }
        FileOutputFormat.setOutputPath(job, outputPath);
        FileOutputFormat.setCompressOutput(job, true);
        FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);

        //job.setInputFormatClass(TextInputFormat.class);
        job.setMapperClass(AdnTotalMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setReducerClass(AdnReducer.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static class AdnReducer extends Reducer<Text, Text, Text, Text> {
        private Pattern splitPtn = Pattern.compile("\t");

        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            String deviceId = key.toString();
            String deviceType = null;
            String mac = null;
            String platform = null;
            String osVersion = null;
            String sdkVersion = null;
            String deviceModel = null;
            String deviceBrand = null;
            String screenSize = null;
            String countryCode = null;
            String language = null;
            String updateTime = null;
            for (Text value: values) {
                final String[] fields = splitPtn.split(value.toString());
                if (fields.length == 12) { //total表
                    if (updateTime == null || updateTime.compareTo(fields[11]) < 0) {
                        deviceType = fields[0];
                        mac = fields[1];
                        platform = fields[2];
                        osVersion = fields[3];
                        sdkVersion = fields[4];
                        deviceModel = fields[5];
                        deviceBrand = fields[6];
                        screenSize = fields[7];
                        countryCode = fields[8];
                        language = fields[9];
                        updateTime = fields[11];
                    }
                } else if (fields.length == 11) {//daily
                    String timestamp = fields[10];
                    final String datetime = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss").print(Long.valueOf(timestamp + "000"));
                    if (updateTime == null || updateTime.compareTo(datetime) < 0) {
                        deviceType = fields[0];
                        mac = fields[1];
                        platform = fields[2];
                        osVersion = fields[3];
                        sdkVersion = fields[4];
                        deviceModel = fields[5];
                        deviceBrand = fields[6];
                        screenSize = fields[7];
                        countryCode = fields[8];
                        language = fields[9];
                        updateTime = datetime;
                    }
                }
            }
            if (deviceType == null || !(deviceType.equals("idfa") || deviceType.equals("gaid") || deviceType.equals("imei"))) {
                return;
            }
            context.write(new Text(deviceId), new Text(Joiner.on("\t").join(deviceType, mac, platform, osVersion,
                    sdkVersion, deviceModel, deviceBrand, screenSize, countryCode, language,
                    updateTime, updateTime)));
        }
    }

    public static class AdnTotalMapper extends AdnMapper {
        private static final Logger logger = LoggerFactory.getLogger(AdnTotalMapper.class);
        private int exceptionCount = 0;
        private int columnNum;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            //this.columnNum = context.getConfiguration().getInt("column.min.num", 54);
            this.columnNum = context.getConfiguration().getInt("column.min.num", 20);
        }

        private void handleTotal(String[] fields, Context context) throws IOException, InterruptedException {
            String deviceId = fields[0];
            String deviceType = fields[1];
            String mac = fields[2];
            String platform = fields[3];
            String osVersion = fields[4];
            String sdkVersion = fields[5];
            String deviceModel = fields[6];
            String deviceBrand = fields[7];
            String screenSize = fields[8];
            String countryCode = fields[9];
            String language = fields[10];
            String updateTime = fields[12];

            // 将UK改为GB,明远需求,修改人冯亮 20170815
            if ("GB".equalsIgnoreCase(countryCode)) {
                countryCode = "UK";
            }

            if (!deviceId.matches(regex)) {
                CommonMapReduce.setMetrics(context, "DMP", "device_exeptions", 1);
            }

            Text text = new Text(Joiner.on("\t").join(deviceType, mac, platform, osVersion, sdkVersion, deviceModel,
                    deviceBrand, screenSize, countryCode, language, "", updateTime));
            context.write(new Text(deviceId), text);
        }

        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] fields = value.toString().split("\t",-1);
            if (fields.length >= columnNum) { //daily
                handleClick(fields, context);
            } else if (fields.length == 13) { //total
                handleTotal(fields, context);
            } else {
                CommonMapReduce.setMetrics(context,"DMP","devivce_exceptions",1);
                exceptionCount++;
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            if (exceptionCount > 0) {
                logger.error("exception occurred {} times", exceptionCount);
            }
        }
    }
}