package mobvista.dmp.datasource.ga.mapreduce;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.*;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Created by fl on 2017/5/2.
 */
public class TestMR extends Configured implements Tool {

    public static class MyMapper extends Mapper<LongWritable, Text, NullWritable, Text> {

        // bundle id -> package name mapping, loaded from the side file in setup()
        private Map<String, String> bundleMap = new ConcurrentHashMap<String, String>();

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            BufferedReader bReader = null;
            try {
                FileSystem fs = FileSystem.get(context.getConfiguration());
                Reader reader = new InputStreamReader(
                        fs.open(new Path("s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/bundle_package_mapping/bundel-pkg.data")));
                bReader = new BufferedReader(reader);
                String line = bReader.readLine();
                while (line != null) {
                    String[] splits = StringUtils.splitPreserveAllTokens(line, "\t", -1);
                    // Skip malformed lines that do not contain both bundle id and package name
                    if (splits.length >= 2) {
                        bundleMap.put(splits[0], splits[1]);
                    }
                    line = bReader.readLine();
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                if (bReader != null) {
                    bReader.close();
                }
            }
        }

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // No-op: this test mapper only exercises the side-file loading in setup().
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = this.getConf();
        // conf.setBoolean("mapreduce.output.fileoutputformat.compress", true);
        // conf.setClass("mapreduce.output.fileoutputformat.compress.codec",
        //         GzipCodec.class, CompressionCodec.class);
        Job job = Job.getInstance(conf, "test");
        job.setNumReduceTasks(0);
        job.setJarByClass(TestMR.class);
        job.setMapperClass(MyMapper.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);
        // job.setInputFormatClass(TextInputFormat.class);
        // SequenceFileOutputFormat.setOutputCompressionType(job, SequenceFile.CompressionType.BLOCK);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileInputFormat.addInputPath(job, new Path(args[1]));
        FileOutputFormat.setOutputPath(job, new Path(args[2]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        // System.exit(ToolRunner.run(new Configuration(), new TestMR(), args));
        // Read a local file, print each line, and report the number of distinct lines.
        Reader reader = new InputStreamReader(new FileInputStream("/data0/sourceCode/spark-2.2.0-rc4/a.txt"));
        BufferedReader br = new BufferedReader(reader);
        String line = br.readLine();
        Set<String> set = new HashSet<>();
        while (line != null) {
            set.add(line);
            System.out.println(line);
            line = br.readLine();
        }
        br.close();
        System.out.println(set.size());
    }
}