AdnClickJoinInstallMR.java 7.37 KB
package mobvista.dmp.datasource.adn.mapreduce;

import com.google.common.collect.Maps;
import mobvista.dmp.util.MRUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.Map;
import java.util.regex.Pattern;

/**
 * Created by Administrator on 2017/6/13 0013.
 */
public class AdnClickJoinInstallMR {
    public static void main (String[] args) throws InterruptedException, IOException, ClassNotFoundException {
        Configuration conf = new Configuration();
        conf.set("mapreduce.map.speculative", "true");
        conf.set("mapreduce.reduce.speculative", "true");
        conf.set("mapreduce.task.io.sort.mb", "500");
        conf.set("mapreduce.reduce.java.opts", "-Xmx1536m");
        conf.set("mapreduce.reduce.memory.mb", "2048");
        conf.set("mapreduce.reduce.shuffle.parallelcopies", "50");
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

        Job job = Job.getInstance(conf, "AdnClickDailyMR");
        job.setJarByClass(AdnClickJoinInstallMR.class);

        FileOutputFormat.setCompressOutput(job, true);
        FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);

        job.setMapperClass(AdnClickJoinInstallMapper.class);
        job.setReducerClass(AdnClickJoinInstallReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    public static class AdnClickJoinInstallMapper extends Mapper<LongWritable, Text, Text, Text> {
        private Text outKey = new Text();
        private Text outValue = new Text();
        private Pattern idPtn = Pattern.compile("0*-0*-0*-0*-0*");
        private Pattern splitPtn = Pattern.compile("-");
        private Map<String, String> map = Maps.newHashMap();
        private static final String dataSplit = "\t";

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            BufferedReader reader = null;
            try {
                String path = context.getConfiguration().get("installFile");
                FileSystem fileSystem = FileSystem.get(URI.create(path), context.getConfiguration());
                reader = new BufferedReader(new InputStreamReader(fileSystem.open(new Path(path))));

                String line = "";
                while ((line = reader.readLine()) != null) {
                    String[] lineSplits = MRUtils.SPLITTER.split(line, -1);
                    String deviceId = lineSplits[0];
                    String requestId = lineSplits[3];
                    String campaignId = lineSplits[4];
                    String[] fields = splitPtn.split(deviceId, -1);
                    if (fields.length < 5 || idPtn.matcher(deviceId).matches()) {
                        map.put(requestId + dataSplit + campaignId, line);
                    } else {
                        map.put(line, "1");
                    }
                }
            } finally {
                if (reader != null) {
                    reader.close();
                }
            }
        }


        public void map (LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] array = MRUtils.SPLITTER.split(line, -1);
            String deviceId = array[0];
            String[] didSplits = splitPtn.split(deviceId, -1);
            String requestId = array[2];
            String campaignId = array[3];
            String platform = array[4];
            if (idPtn.matcher(deviceId).matches() || didSplits.length < 5 || "0".equals(requestId) || campaignId.length()<2) {
                return;
            } else {
                String mapKey = requestId + dataSplit + campaignId;
                if (map.containsKey(mapKey)) {
                    String[] fields = MRUtils.SPLITTER.split(map.get(mapKey), -1);
                    String tempPlatform = fields[2].equals("0") ? platform : fields[2];
                    String newLine = MRUtils.JOINER.join(deviceId, array[1], tempPlatform, fields[4], fields[5]); //id, type, platform, campaign_id, package_name
                    map.put(fields[3] + dataSplit + fields[4], newLine); //request_id, campaign_id
                }
            }
        }
        public void cleanup(Context context) throws IOException, InterruptedException {
            for (Map.Entry<String, String> entry : map.entrySet()) {
                if (!"1".equals(entry.getValue())) {
                    // 此时为之前device_id不合法数据
                    outKey.set(entry.getKey());
                    outValue.set(entry.getValue());
                    context.write(outKey, outValue);
                } else {
                    outKey.set(entry.getKey());
                    outValue.set("1");
                    context.write(outKey, outValue);
                }
            }
        }
    }
    public static class AdnClickJoinInstallReducer extends Reducer<Text, Text, Text, NullWritable> {
        private Text outKey = new Text();
        public void reduce (Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            String[] line = MRUtils.SPLITTER.split(key.toString(),-1);
            if (line.length > 2) { //device_id 合法
                outKey.set(MRUtils.JOINER.join(line[0], line[1], line[2], line[4], line[5])); // device_id, device_type, platform, campaign_id, package_name
                context.write(outKey, NullWritable.get());
            } else if (line.length == 2) {  //request_id, campaign_id
                String newLine = "";
                String oldLine = "";
                for (Text val : values) {
                    String[] fields = MRUtils.SPLITTER.split(val.toString(), -1);
                    if (fields.length == 5) {  //device_id, device_type
                        newLine = val.toString();
                        break;
                    } else {
                        oldLine = val.toString();
                    }
                }
                if (!newLine.equals("")) {
                    outKey.set(newLine);
                    context.write(outKey, NullWritable.get());
                } else if (!oldLine.equals("")) {
                    String[] lineSplits = MRUtils.SPLITTER.split(oldLine, -1);
                    outKey.set(MRUtils.JOINER.join(lineSplits[0], lineSplits[1], lineSplits[2], lineSplits[4], lineSplits[5]));
                    context.write(outKey, NullWritable.get());
                }
            }
        }
    }
}