// CalcDeviceGenderMR.java 5.67 KB
package mobvista.dmp.datasource.gender;

import com.google.common.collect.Maps;
import mobvista.dmp.format.TextMultipleOutputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.Map;

public class CalcDeviceGenderMR {
    /**
     * author:liushuai
     * date:2017-01-18
     * desc  :根据标签计算比例
     */
    public static void main(String[] args) throws InterruptedException, IOException, ClassNotFoundException {
        Configuration conf = new Configuration();
        conf.set("mapreduce.map.speculative", "true");
        conf.set("mapreduce.reduce.speculative", "true");
        conf.set("mapreduce.task.io.sort.mb", "500");
        conf.set("mapreduce.reduce.java.opts", "-Xmx1536m");
        conf.set("mapreduce.reduce.memory.mb", "2048");
        conf.set("mapreduce.reduce.shuffle.parallelcopies", "50");
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

        conf.set("file", otherArgs[2]);
        Job job = Job.getInstance(conf, "CalcDeviceGenderMR");
        job.setJarByClass(CalcDeviceGenderMR.class);

        FileOutputFormat.setCompressOutput(job, true);
        FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
        job.setMapperClass(CalcDeviceGenderMapper.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        job.setOutputFormatClass(TextMultipleOutputFormat.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    public static class CalcDeviceGenderMapper extends Mapper<LongWritable, Text, Text, Text> {
        Map<String, String> packageMap = Maps.newHashMap();
        Text outKey = new Text();
        Text outKey_v2 = new Text();
        Text outValue = new Text();
        private String outPath;
        private String outPath_v2;

        public void setup(Context context) throws IOException, InterruptedException {
            //读取CalcPackageDictMR的结果,放入Map中
            String path = context.getConfiguration().get("file");
            FileSystem fileSystem = FileSystem.get(URI.create(path), context.getConfiguration());
            BufferedReader reader = new BufferedReader(new InputStreamReader(fileSystem.open(new Path(path))));

            String readLine = "";
            while ((readLine = reader.readLine()) != null) {
                String[] arr = MRUtils.SPLITTER.split(readLine, -1);
                packageMap.put(arr[0], arr[1] + "\t" + arr[2]);
            }
            reader.close();
        }

        public void map(LongWritable key, Text value, Context context) {
            Configuration conf = context.getConfiguration();
            outPath = conf.get("outPath");
            outPath_v2 = conf.get("outPath_v2");
            try {
                String label = "";
                String line = value.toString();
                String[] fields = line.split("\t", -1);
                String[] arr = fields[2].split("#", -1);
                double ratio = -1;
                double score = 0.0;
                int num = 0;
                String tag = "unknown";
                for (int i = 0; i < arr.length; i++) {
                    if (arr[i].length() < 2) {
                        continue;
                    }
                    if (packageMap.containsKey(arr[i])) {
                        String[] dictValue = MRUtils.SPLITTER.split(packageMap.get(arr[i]));
                        if (dictValue[1].equals("confirm")) {
                            score = Double.parseDouble(dictValue[0]);
                            tag = dictValue[1];
                            num = 1;
                            break;
                        } else {
                            num += 1;
                            score += Double.parseDouble(dictValue[0]);
                            tag = dictValue[1];
                        }
                    }
                }
                if (num >= 1) {
                    ratio = score / num;
                }
                if (ratio == -1) {
                    label = "o";
                } else if (ratio >= 0.5) {
                    label = "m";
                } else if (ratio < 0.5) {
                    label = "f";
                    ratio = 1 - ratio;
                }
                String packageName = fields[2].replace("#", ",");//以逗号分隔
                outKey.set(outPath + "," + MRUtils.JOINER.join(fields[0], fields[1], packageName, fields[3]));  //  加入
                // update_date 字段
                outValue.set(MRUtils.JOINER.join(label, ratio, tag));
                context.write(outKey, outValue);

                outKey_v2.set(outPath_v2 + "," + MRUtils.JOINER.join(fields[0], fields[1], packageName, fields[3],
                        fields[4].split("#")[1]));  //  加入 update_date 字段
                context.write(outKey_v2, outValue);
            } catch (Exception e) {
                return;
            }
        }
    }
}