package mobvista.prd.datasource.source.mapreduce;

import com.google.common.base.Strings;
import mobvista.dmp.util.MRUtils;
import mobvista.prd.datasource.country.interest.mapreduce.CountryInterestCountMR;
import mobvista.prd.datasource.tag.mapreduce.InterestTagJob;
import mobvista.prd.datasource.tag.mapreduce.reduce.CountReducer;
import org.apache.commons.cli.BasicParser;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Logger;

import java.io.IOException;

/**
 * Hadoop {@link Tool} that runs the "CountCountryDspMMR" MapReduce job.
 *
 * <p>Each text input line is split with {@code MRUtils.SPLITTER}. The mapper emits the key
 * {@code MRUtils.JOINER.join(fields[2], fields[3])} with the value {@code 1}.
 * {@link CountReducer} is used as both combiner and reducer. It presumably sums the values per
 * key — TODO confirm, CountReducer is not visible here. Given that, the output would be one count
 * per (field 2, field 3) pair. Judging by the class name those columns are likely country and DSP;
 * NOTE(review): confirm against the input schema.
 *
 * <p>Command-line options:
 * <ul>
 *   <li>{@code -input} (required): input path.</li>
 *   <li>{@code -output} (required): output path.</li>
 *   <li>{@code -reduceNum} (optional): number of reduce tasks, default {@value #DEFAULT_REDUCE_NUM}.</li>
 * </ul>
 *
 * Created by Administrator on 2017/5/16 0016.
 */
public class CountCountryDspMMR extends Configured implements Tool {
    private static final Logger log = Logger.getLogger(CountCountryDspMMR.class);

    /** Number of reduce tasks used when {@code -reduceNum} is not given. */
    private static final int DEFAULT_REDUCE_NUM = 50;

    /**
     * Entry point: runs the tool through {@link ToolRunner} and exits with its status.
     *
     * <p>Exit codes: 0 on success, 1 when options are missing/invalid or the job fails, -1 when
     * the run throws an {@link Exception}. An {@link Error} is not caught. It propagates so the JVM
     * exits non-zero, instead of being masked by an exit code of 0.
     *
     * @param args command-line arguments; generic Hadoop options are handled by ToolRunner
     */
    public static void main(String[] args) {
        int exitCode;
        try {
            exitCode = ToolRunner.run(new Configuration(), new CountCountryDspMMR(), args);
        } catch (Exception e) {
            exitCode = -1;
            // Pass the throwable as the second argument so log4j records the full stack trace;
            // log.error(e) alone logs only e.toString().
            log.error("CountCountryDspMMR failed", e);
        }
        // Deliberately outside any finally block: exiting from finally would report status 0
        // when an Error escapes ToolRunner.run.
        System.exit(exitCode);
    }

    /**
     * Parses the options, configures the job and blocks until it finishes.
     *
     * @param args tool arguments (generic Hadoop options already removed by ToolRunner)
     * @return 0 if the job succeeded, 1 if options are missing/invalid or the job failed
     * @throws Exception if option parsing, job submission or job execution fails
     */
    @Override
    public int run(String[] args) throws Exception {
        Options options = buildOptions();
        CommandLine commands = new BasicParser().parse(options, args);
        if (!checkMustOption(commands)) {
            printUsage(options);
            return 1;
        }

        String input = commands.getOptionValue("input");
        String output = commands.getOptionValue("output");
        String reduceNumStr = commands.getOptionValue("reduceNum");

        int reduceNum;
        try {
            reduceNum = Strings.isNullOrEmpty(reduceNumStr)
                    ? DEFAULT_REDUCE_NUM
                    : Integer.parseInt(reduceNumStr.trim());
        } catch (NumberFormatException e) {
            reduceNum = -1;
        }
        // 0 reducers would turn this counting job into a map-only job that writes raw (key, 1)
        // pairs, so only positive values are accepted.
        if (reduceNum < 1) {
            log.error("reduceNum must be a positive integer, got: " + reduceNumStr);
            printUsage(options);
            return 1;
        }

        log.info("*************************");
        log.info("* input = " + input);
        log.info("* output = " + output);
        log.info("* reduceNum = " + reduceNum);
        log.info("*************************");

        // Must be set before Job.getInstance, which takes its own copy of the configuration.
        Configuration conf = this.getConf();
        conf.set("mapreduce.task.io.sort.mb", "512");
        conf.set("mapreduce.task.io.sort.factor", "100");
        conf.set("mapreduce.reduce.shuffle.parallelcopies", "50");

        Job job = Job.getInstance(conf, "CountCountryDspMMR");
        job.setJarByClass(CountCountryDspMMR.class);
        job.setNumReduceTasks(reduceNum);

        job.setMapperClass(CountCountryDspMMapper.class);
        job.setReducerClass(CountReducer.class);
        job.setCombinerClass(CountReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        job.setInputFormatClass(TextInputFormat.class);
        FileInputFormat.addInputPath(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    /**
     * Builds the command-line option definitions accepted by {@link #run(String[])}.
     *
     * @return options {@code input}, {@code output} and {@code reduceNum}, each taking a value
     */
    public Options buildOptions() {
        Options options = new Options();
        options.addOption("input", true, "[must] daily input path ");
        options.addOption("output", true, "[must] output path");
        options.addOption("reduceNum", true, "number of reducer");
        return options;
    }

    /**
     * Checks that the required options are present, logging the first missing one.
     *
     * @param commands parsed command line
     * @return {@code true} if both {@code input} and {@code output} are set
     */
    public boolean checkMustOption(CommandLine commands) {
        if (!commands.hasOption("input")) {
            log.info("please set input ");
            return false;
        }
        if (!commands.hasOption("output")) {
            log.info("please set output ");
            return false;
        }
        return true;
    }

    /**
     * Prints the usage/help text for this tool to standard output.
     *
     * @param options the option definitions to describe
     */
    public void printUsage(Options options) {
        HelpFormatter help = new HelpFormatter();
        help.printHelp("CountCountryDspMMR", options);
    }

    /**
     * Emits {@code (JOINER.join(fields[2], fields[3]), 1)} for every input line.
     *
     * <p>Lines with fewer than {@value #MIN_FIELDS} fields are skipped and counted under the
     * counter group {@value #COUNTER_GROUP}, counter {@value #MALFORMED_COUNTER}, instead of
     * failing the task.
     */
    public static class CountCountryDspMMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        /** The key uses field indexes 2 and 3, so at least four fields are required. */
        private static final int MIN_FIELDS = 4;
        private static final String COUNTER_GROUP = "CountCountryDspMMR";
        private static final String MALFORMED_COUNTER = "MALFORMED_LINES";

        // Reused across map() calls to avoid allocating a Writable per record.
        private final Text outKey = new Text();
        private final LongWritable outValue = new LongWritable(1);

        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // The limit of -1 is presumably a regex Pattern.split limit that keeps trailing
            // empty fields (assumes SPLITTER is a java.util.regex.Pattern — confirm in MRUtils).
            String[] fields = MRUtils.SPLITTER.split(value.toString(), -1);
            if (fields.length < MIN_FIELDS) {
                // One short/blank line must not kill the whole task with an
                // ArrayIndexOutOfBoundsException; the counter keeps the drop visible.
                context.getCounter(COUNTER_GROUP, MALFORMED_COUNTER).increment(1);
                return;
            }
            outKey.set(MRUtils.JOINER.join(fields[2], fields[3]));
            context.write(outKey, outValue);
        }
    }
}