TestMR.java
package mobvista.dmp.datasource.ga.mapreduce;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.*;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Scratch MapReduce job that loads a bundle-id -> package-name mapping from S3
 * as a side input in the mapper's setup().
 *
 * Created by fl on 2017/5/2.
 */
public class TestMR extends Configured implements Tool {

    public static class MyMapper extends Mapper<LongWritable, Text, NullWritable, Text> {

        // Side-input lookup table (bundle id -> package name), populated once per mapper in setup().
        private final Map<String, String> bundleMap = new ConcurrentHashMap<>();

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // Load the tab-separated mapping file (bundle id \t package name) before any map() call.
            Path mappingPath = new Path("s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/bundle_package_mapping/bundel-pkg.data");
            // Resolve the FileSystem from the path itself so the s3:// URI works even
            // when the cluster's default FileSystem is HDFS.
            FileSystem fs = mappingPath.getFileSystem(context.getConfiguration());
            try (BufferedReader bReader = new BufferedReader(new InputStreamReader(fs.open(mappingPath)))) {
                String line;
                while ((line = bReader.readLine()) != null) {
                    String[] splits = StringUtils.splitPreserveAllTokens(line, "\t", -1);
                    // Skip malformed lines instead of failing on a missing column.
                    if (splits.length >= 2) {
                        bundleMap.put(splits[0], splits[1]);
                    }
                }
            }
        }

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Intentionally empty: this test job only exercises the side-input load
            // in setup() and emits no records.
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = this.getConf();
        // Uncomment to gzip-compress the job output:
//        conf.setBoolean("mapreduce.output.fileoutputformat.compress", true);
//        conf.setClass("mapreduce.output.fileoutputformat.compress.codec",
//                GzipCodec.class, CompressionCodec.class);

        Job job = Job.getInstance(this.getConf(), "test");
        job.setNumReduceTasks(0);
        job.setJarByClass(TestMR.class);
        job.setMapperClass(MyMapper.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        // Optional: explicit input format and block-compressed SequenceFile output.
//        job.setInputFormatClass(TextInputFormat.class);
//        SequenceFileOutputFormat.setOutputCompressionType(job, SequenceFile.CompressionType.BLOCK);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileInputFormat.addInputPath(job, new Path(args[1]));
        FileOutputFormat.setOutputPath(job, new Path(args[2]));
        return job.waitForCompletion(true) ? 0 : 1;
    }
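
    // Example invocation (hypothetical jar name and paths): run() expects two
    // input directories and one output directory, in that order.
    //
    //   hadoop jar dmp.jar mobvista.dmp.datasource.ga.mapreduce.TestMR \
    //       <input-path-1> <input-path-2> <output-path>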

    public static void main(String[] args) throws Exception {
//        System.exit(ToolRunner.run(new Configuration(), new TestMR(), args));
        // Ad-hoc local check: print a file's lines and count the distinct ones,
        // instead of submitting the job above.
        try (BufferedReader br = new BufferedReader(
                new InputStreamReader(new FileInputStream("/data0/sourceCode/spark-2.2.0-rc4/a.txt")))) {
            Set<String> set = new HashSet<>();
            String line;
            while ((line = br.readLine()) != null) {
                set.add(line);
                System.out.println(line);
            }
            System.out.println(set.size());
        }
    }
}