AdnClickJoinInstallJob.java 8.33 KB
Newer Older
wang-jinfeng committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179
package mobvista.dmp.datasource.adn.mapreduce;

import mobvista.dmp.common.CommonMapReduce;
import mobvista.dmp.datasource.ga.mapreduce.vo.TextPair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

/**
 * Created by fl on 2017/6/21.
 */
public class AdnClickJoinInstallJob extends Configured implements Tool {


    /**
     * Map side of the click/install join.
     *
     * <p>Every input line is split on tabs and emitted under the join key
     * {@code requestId + "\t" + campaignId}. The second half of the {@link TextPair} key and the
     * last column of the emitted value carry a source tag: {@code "1"} for install records and
     * {@code "2"} for click records.
     *
     * <ul>
     *   <li>Click records (input path contains {@code merge_click_pre_click}) are read as
     *       {@code deviceId, deviceType, requestId, campaignId, platform} and emitted as
     *       {@code deviceId \t deviceType \t platform \t 2}.</li>
     *   <li>All other input is treated as install data; column 3 is the request id, column 4 the
     *       campaign id, and the whole original line is emitted followed by {@code \t 1}.</li>
     * </ul>
     *
     * <p>Lines with too few columns are skipped and counted instead of failing the task.
     */
    public static class AdnClickJoinInstallMapper extends Mapper<LongWritable, Text, TextPair, Text> {
        private static final String dataSplit = "\t";
        private static final String CLICK_PATH_MARKER = "merge_click_pre_click";
        private static final String INSTALL_TAG = "1";
        private static final String CLICK_TAG = "2";
        private static final String COUNTER_GROUP = "AdnClickJoinInstall";
        /** A click line must provide columns 0..4. */
        private static final int CLICK_MIN_FIELDS = 5;
        /** An install line must provide columns 0..4 (request id and campaign id are 3 and 4). */
        private static final int INSTALL_MIN_FIELDS = 5;

        private final Text outValue = new Text();
        private final Pattern splitPtn = Pattern.compile("-");
        private final Pattern pattern = Pattern.compile(dataSplit);
        private final Pattern idPtn = Pattern.compile("0*-0*-0*-0*-0*");
        // Reused across records; a mapper instance is driven by a single thread, so the
        // unsynchronized StringBuilder is sufficient.
        private final StringBuilder keyBuffer = new StringBuilder();
        private final StringBuilder valBuffer = new StringBuilder();

        /**
         * Routes one input line to the click or install handler depending on the input file.
         *
         * @param key     byte offset of the line (unused)
         * @param value   the raw tab-separated line
         * @param context task context used to read the input location and write the output
         */
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] dataSplits = pattern.split(line, -1);
            if (isClickInput(context)) {
                writeClick(dataSplits, context);
            } else {
                writeInstall(line, dataSplits, context);
            }
        }

        /**
         * Tells whether the record currently being mapped comes from the click data set.
         *
         * <p>The location is looked up for every record rather than cached.
         * NOTE(review): input formats that combine several files into one split may update
         * {@code map.input.file} between files - confirm before hoisting this into setup().
         */
        private boolean isClickInput(Context context) {
            String filePath = context.getConfiguration().get("map.input.file");
            if (filePath == null) {
                // The property is not guaranteed to be populated; fall back to the split's string
                // form (for file-based splits it starts with the file path) instead of failing
                // with a NullPointerException.
                filePath = String.valueOf(context.getInputSplit());
            }
            return filePath.contains(CLICK_PATH_MARKER);
        }

        /** Emits one click record, dropping it when the device id, request id or campaign id is unusable. */
        private void writeClick(String[] dataSplits, Context context) throws IOException, InterruptedException {
            if (dataSplits.length < CLICK_MIN_FIELDS) {
                context.getCounter(COUNTER_GROUP, "MALFORMED_CLICK").increment(1);
                return;
            }
            String deviceId = dataSplits[0];
            String deviceType = dataSplits[1];
            String requestId = dataSplits[2];
            String campaignId = dataSplits[3];
            String platform = dataSplits[4];

            // Drop clicks whose device id is all zeros or has fewer than five dash-separated
            // parts, whose request id is "0", or whose campaign id is shorter than two characters.
            String[] didSplits = splitPtn.split(deviceId, -1);
            if (idPtn.matcher(deviceId).matches() || didSplits.length < 5
                    || "0".equals(requestId) || campaignId.length() < 2) {
                return;
            }

            keyBuffer.setLength(0);
            keyBuffer.append(requestId).append(dataSplit).append(campaignId);

            // The tag must be its own column: the reducer reads the last tab-separated column of
            // the value as the source tag and columns 0..2 as the click's device fields.
            valBuffer.setLength(0);
            valBuffer.append(deviceId).append(dataSplit).append(deviceType).append(dataSplit)
                    .append(platform).append(dataSplit).append(CLICK_TAG);

            outValue.set(valBuffer.toString());
            context.write(new TextPair(keyBuffer.toString(), CLICK_TAG), outValue);
        }

        /** Emits one install record: the untouched input line followed by the install tag. */
        private void writeInstall(String line, String[] dataSplits, Context context)
                throws IOException, InterruptedException {
            if (dataSplits.length < INSTALL_MIN_FIELDS) {
                context.getCounter(COUNTER_GROUP, "MALFORMED_INSTALL").increment(1);
                return;
            }
            String requestId = dataSplits[3];
            String campaignId = dataSplits[4];

            keyBuffer.setLength(0);
            keyBuffer.append(requestId).append(dataSplit).append(campaignId);

            valBuffer.setLength(0);
            valBuffer.append(line).append(dataSplit).append(INSTALL_TAG);

            outValue.set(valBuffer.toString());
            context.write(new TextPair(keyBuffer.toString(), INSTALL_TAG), outValue);
        }
    }

    /**
     * Reduce side of the click/install join.
     *
     * <p>All values of one group share the same {@code requestId \t campaignId} key. The last
     * tab-separated column of each value is its source tag; {@code "1"} marks an install record
     * and any other tag is treated as a click. Install records are buffered until the first click
     * is seen; at that point one output row is written per buffered install and the rest of the
     * group is ignored. A group without a click, or whose first value is a click, produces no
     * output.
     *
     * <p>Each output row is {@code deviceId \t deviceType \t platform \t campaignId \t pkgName}.
     * Campaign id and package name always come from the install record (columns 4 and 5). The
     * three device columns come from the install record unless its device id is unusable and its
     * device type is not one of imei / androidid / oaid, in which case the click's device columns
     * (0..2) are used instead.
     */
    public static class AndClickJoinInstallReducer extends Reducer<TextPair, Text, NullWritable, Text> {
        private static final String dataSplit = "\t";
        private static final String INSTALL_TAG = "1";
        // Compiled once: the previous String.matches call recompiled the expression for every
        // buffered install record.
        private static final Pattern ZERO_DEVICE_ID = Pattern.compile("0*-0*-0*-0*-0*");
        private static final Pattern DASH = Pattern.compile("-");
        private static final Pattern FIELD_SPLIT = Pattern.compile(dataSplit);

        private final Text outValue = new Text();
        private final StringBuilder buffer = new StringBuilder();
        private final List<String[]> installMessages = new ArrayList<String[]>();

        /**
         * Buffers the group's install records and joins them with the first click encountered.
         *
         * @param key     join key of the group
         * @param values  tagged install and click values of the group
         * @param context task context used to write the joined rows
         */
        @Override
        protected void reduce(TextPair key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            installMessages.clear();
            for (Text val : values) {
                String[] fields = FIELD_SPLIT.split(val.toString(), -1);
                if (INSTALL_TAG.equals(fields[fields.length - 1])) {
                    installMessages.add(fields);
                    continue;
                }
                // First click of the group: join it with every install buffered so far and stop.
                // When no install preceded it the loop is empty and nothing is written.
                for (String[] install : installMessages) {
                    writeJoined(install, fields, context);
                }
                return;
            }
        }

        /** Writes one output row for an install record matched with a click record. */
        private void writeJoined(String[] install, String[] click, Context context)
                throws IOException, InterruptedException {
            String deviceId = install[0];
            String deviceType = install[1];
            String campaignId = install[4];
            String pkgName = install[5];

            // A device id is unusable when it is all zeros or has fewer than five dash-separated
            // parts. Such short ids also cover the imei, androidid and oaid device types
            // (oaid added 2020-07-27), which are still written with the install's own fields.
            boolean unusableId = ZERO_DEVICE_ID.matcher(deviceId).matches() || DASH.split(deviceId).length < 5;
            boolean keepInstallDevice = "imei".equalsIgnoreCase(deviceType)
                    || "androidid".equalsIgnoreCase(deviceType)
                    || "oaid".equalsIgnoreCase(deviceType);
            String[] device = (unusableId && !keepInstallDevice) ? click : install;

            buffer.setLength(0);
            buffer.append(device[0]).append(dataSplit).append(device[1]).append(dataSplit).append(device[2])
                    .append(dataSplit).append(campaignId).append(dataSplit).append(pkgName);
            outValue.set(buffer.toString());
            context.write(NullWritable.get(), outValue);
        }
    }


    /**
     * Configures and runs the click/install join job.
     *
     * @param args {@code args[0]} and {@code args[1]} are the two input paths,
     *             {@code args[2]} is the output path
     * @return 0 when the job succeeds, 1 when it fails, 2 when too few arguments are given
     * @throws Exception if the job cannot be set up or submitted
     */
    @Override
    public int run(String[] args) throws Exception {
        // Fail with a readable message instead of an ArrayIndexOutOfBoundsException further down.
        if (args == null || args.length < 3) {
            System.err.println("Usage: AdnClickJoinInstallJob <input path 1> <input path 2> <output path>");
            return 2;
        }

        Configuration conf = this.getConf();
        conf.set("mapreduce.map.speculative", "true");
        conf.set("mapreduce.reduce.speculative", "true");
        conf.set("mapreduce.task.io.sort.mb", "500");
        conf.set("mapreduce.reduce.java.opts", "-Xmx1536m");
        conf.set("mapreduce.reduce.memory.mb", "2048");
        conf.set("mapreduce.reduce.shuffle.parallelcopies", "50");

        Job job = Job.getInstance(conf, "AdnClickJoinInstallJob");
        job.setJarByClass(AdnClickJoinInstallJob.class);
        job.setNumReduceTasks(100);
        job.setMapperClass(AdnClickJoinInstallMapper.class);
        job.setReducerClass(AndClickJoinInstallReducer.class);
        job.setMapOutputKeyClass(TextPair.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);
        // Partition and group on the first half of the TextPair key only, so all records of one
        // join key reach the same reduce call regardless of their source tag.
        job.setGroupingComparatorClass(TextPair.FirstComparator.class);
        job.setPartitionerClass(TextPair.FirstPartitioner.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileInputFormat.addInputPath(job, new Path(args[1]));
        FileOutputFormat.setOutputPath(job, new Path(args[2]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    /**
     * Command-line entry point: runs the job through {@link ToolRunner} and exits the JVM with
     * the job's return code, or with -1 when the run throws.
     *
     * @param args generic Hadoop options followed by the two input paths and the output path
     */
    public static void main(String[] args) {
        // Start from a failure code: the exit in the finally block must never report success
        // unless ToolRunner actually returned a result.
        int exitCode = -1;
        try {
            exitCode = ToolRunner.run(new Configuration(), new AdnClickJoinInstallJob(), args);
        } catch (Throwable t) {
            // Throwable rather than Exception so that Errors (for example OutOfMemoryError or
            // NoClassDefFoundError) are printed and reported as a failure instead of being
            // hidden by the System.exit call below.
            t.printStackTrace();
        } finally {
            System.exit(exitCode);
        }
    }
}