MergeAppIDMR.java 5.65 KB
Newer Older
wang-jinfeng committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139
package mobvista.prd.datasource.table;

import com.google.common.collect.Interner;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import mobvista.dmp.util.MRUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

/**
 * Created by Administrator on 2017/3/9.
 *
 * <p>Joins the app_id / pkg_name mapping table derived from setting logs with a crawled app
 * dictionary (app_id, app_name, platform, impression) and produces {@code app_id, pkg_name,
 * app_name} rows, one per pkg_name.
 *
 * <p>If a pkg_name maps to several app_ids, the app_id with the largest impression is kept.
 *
 * <p>Usage (after generic Hadoop options):
 * {@code MergeAppIDMR <input path> <output path> <dictionary file>}
 */
public class MergeAppIDMR {
    /** Configuration key under which the dictionary file path is handed to the mappers. */
    private static final String DICT_FILE_KEY = "file";

    /** Hadoop counter group used to report skipped records. */
    private static final String COUNTER_GROUP = "MergeAppIDMR";

    /**
     * Configures and runs the job, exiting with 0 on success, 1 on job failure and 2 on bad usage.
     *
     * @param args input path, output path and dictionary file path (Hadoop generic options allowed)
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 3) {
            System.err.println("Usage: MergeAppIDMR <input path> <output path> <dictionary file>");
            System.exit(2);
        }

        conf.set(DICT_FILE_KEY, otherArgs[2]);

        Job job = Job.getInstance(conf, "MergeAppIDMR");

        job.setJarByClass(MergeAppIDMR.class);

        job.setMapperClass(MergeAppMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setReducerClass(MergeAppReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    /**
     * Looks up each input (app_id, pkg_name) pair in the dictionary and emits
     * {@code pkg_name -> app_id \t platform \t app_name \t impression}.
     *
     * <p>Input lines are tab- or comma-separated; a tab wins when both appear. Lines with neither
     * delimiter, an unknown app_id, or an empty pkg_name are dropped.
     *
     * <p>NOTE(review): the original tab-delimited branch emitted app_id as the key and pkg_name as
     * the first value column. That contradicts the reducer's app_id,pkg_name,app_name output
     * layout, so both delimiters are now handled identically (column 0 = app_id, column 1 =
     * pkg_name). Confirm the tab-separated input really uses that column order.
     */
    public static class MergeAppMapper extends Mapper<LongWritable, Text, Text, Text> {
        /** app_id -> "platform \t app_name \t impression", loaded from the dictionary in setup(). */
        final Map<String, String> packageMap = Maps.newHashMap();
        final Text outKey = new Text();
        final Text outValue = new Text();

        /**
         * Loads the output of CalcPackageDictMR (the file named by the {@code file} config key)
         * into {@link #packageMap}.
         *
         * <p>Each line is expected to have at least four tab-separated columns. Column 2 is the
         * lookup key (app_id); columns 0, 1 and 3 form the value. Shorter lines are skipped and
         * counted instead of failing the task.
         */
        @Override
        public void setup(Context context) throws IOException, InterruptedException {
            Configuration conf = context.getConfiguration();
            String path = conf.get(DICT_FILE_KEY);
            if (path == null) {
                throw new IllegalStateException(
                        "Missing configuration '" + DICT_FILE_KEY + "' (dictionary file path)");
            }
            FileSystem fileSystem = FileSystem.get(URI.create(path), conf);
            // Hadoop Text is UTF-8, so read the dictionary as UTF-8 rather than the platform default.
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(fileSystem.open(new Path(path)), StandardCharsets.UTF_8))) {
                String readLine;
                while ((readLine = reader.readLine()) != null) {
                    String[] arr = readLine.split("\t", -1);
                    if (arr.length < 4) {
                        context.getCounter(COUNTER_GROUP, "malformed_dict_line").increment(1);
                        continue;
                    }
                    packageMap.put(arr[2], arr[0] + "\t" + arr[1] + "\t" + arr[3]);
                }
            }
        }

        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String delimiter;
            if (line.contains("\t")) {
                delimiter = "\t";
            } else if (line.contains(",")) {
                delimiter = ",";
            } else {
                return;
            }
            // The delimiter is present, so split(-1) always yields at least two fields.
            String[] fields = line.split(delimiter, -1);
            String appId = fields[0];
            String pkgName = fields[1];
            String appInfo = packageMap.get(appId);
            if (appInfo == null || pkgName.isEmpty()) {
                return;
            }

            outKey.set(pkgName);
            outValue.set(appId + "\t" + appInfo); // app_id, platform, app_name, impression
            context.write(outKey, outValue);
        }
    }

    /**
     * For each pkg_name, keeps the candidate with the largest impression and writes
     * {@code app_id, pkg_name, app_name} via {@code MRUtils.JOINER}.
     *
     * <p>Ties keep the first candidate seen. Impressions may contain thousands separators (",").
     * An impression that cannot be parsed ranks below every parseable one, but a key always
     * produces output when it has at least one candidate.
     */
    public static class MergeAppReducer extends Reducer<Text, Text, Text, NullWritable> {
        final Text outKey = new Text();

        @Override
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            String bestAppId = null;
            String bestAppName = null;
            long bestImpression = Long.MIN_VALUE;
            for (Text val : values) {
                // Value layout written by the mapper: app_id \t platform \t app_name \t impression
                String[] arr = val.toString().split("\t", -1);
                if (arr.length < 4) {
                    continue;
                }
                long impression = parseImpression(arr[3]);
                // The first candidate is always taken, so all-zero or negative impressions still
                // yield output.
                if (bestAppId == null || impression > bestImpression) {
                    bestAppId = arr[0];
                    bestAppName = arr[2];
                    bestImpression = impression;
                }
            }
            if (bestAppId == null) {
                return;
            }
            outKey.set(MRUtils.JOINER.join(bestAppId, key, bestAppName)); // app_id, pkg_name, app_name
            context.write(outKey, NullWritable.get());
        }

        /**
         * Parses an impression count, stripping thousands separators.
         *
         * @return the parsed value, or {@link Long#MIN_VALUE} when it is not a valid long
         */
        private static long parseImpression(String raw) {
            try {
                return Long.parseLong(raw.replace(",", "").trim());
            } catch (NumberFormatException e) {
                return Long.MIN_VALUE;
            }
        }
    }
}