package mobvista.dmp.datasource.ga.mapreduce; import com.google.common.base.Strings; import mobvista.dmp.datasource.ga.mapreduce.map.GaActiveTotalMapper; import mobvista.dmp.datasource.ga.mapreduce.reduce.GaActiveTotalReducer; import mobvista.dmp.datasource.ga.mapreduce.vo.TextPair; import org.apache.commons.cli.BasicParser; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.HelpFormatter; import org.apache.commons.cli.Options; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.GzipCodec; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.apache.log4j.Logger; /** * merge ga daily device data into total * Created by fl on 2017/4/26. */ public class GaActiveTotalMR extends Configured implements Tool { private static final Logger log = Logger.getLogger(GaActiveTotalMR.class); @Override public int run(String[] args) throws Exception { Options options = buildOptions(); BasicParser parser = new BasicParser(); CommandLine commands = parser.parse(options, args); if (!checkMustOption(commands)) { printUsage(options); return 1; } String dailypath = commands.getOptionValue("dailypath"); String totalpath = commands.getOptionValue("totalpath"); String outputpath = commands.getOptionValue("outputpath"); String reduceNumStr = commands.getOptionValue("reduceNum"); int reduceNum = Strings.isNullOrEmpty(reduceNumStr) ? 50 : Integer.parseInt(reduceNumStr); log.info("*************************"); log.info("* dailypath = " + dailypath); log.info("* totalpath = " + totalpath); log.info("* outputpath = " + outputpath); log.info("* reduceNum = " + reduceNum); log.info("*************************"); this.getConf().set("mapreduce.task.io.sort.mb", "500"); this.getConf().set("mapreduce.reduce.java.opts", "-Xmx1536m"); this.getConf().set("mapreduce.reduce.memory.mb", "2048"); this.getConf().set("mapreduce.reduce.shuffle.parallelcopies", "50"); Job job = Job.getInstance(this.getConf(), "GaActiveTotalMR"); job.setNumReduceTasks(reduceNum); job.setJarByClass(GaActiveTotalMR.class); job.setMapperClass(GaActiveTotalMapper.class); job.setReducerClass(GaActiveTotalReducer.class); job.setMapOutputKeyClass(TextPair.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(NullWritable.class); job.setOutputValueClass(Text.class); job.setGroupingComparatorClass(TextPair.FirstComparator.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); FileInputFormat.addInputPath(job, new Path(dailypath)); FileInputFormat.addInputPath(job, new Path(totalpath)); FileOutputFormat.setOutputPath(job, new Path(outputpath)); FileOutputFormat.setCompressOutput(job, true); FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class); return job.waitForCompletion(true) ? 0 : 1; } public Options buildOptions() { Options options = new Options(); options.addOption("dailypath", true, "[must] daily input path "); options.addOption("totalpath", true, "[must] total input path"); options.addOption("outputpath", true, "[must] output path"); options.addOption("reduceNum", true, "number of reducer"); return options; } public boolean checkMustOption(CommandLine commands) { if (!(commands.hasOption("dailypath"))) { log.info("please set dailypath "); return false; } if (!(commands.hasOption("totalpath"))) { log.info("please set totalpath "); return false; } if (!(commands.hasOption("outputpath"))) { log.info("please set outputpath "); return false; } return true; } public void printUsage(Options options) { HelpFormatter help = new HelpFormatter(); help.printHelp("HotNewsSortJob", options); } public static void main(String[] args) { int exitCode = 0; try { exitCode = ToolRunner.run(new Configuration(), new GaActiveTotalMR(), args); } catch (Exception e) { exitCode = -1; log.error(e); } finally { System.exit(exitCode); } } }