package mobvista.prd.datasource.source.mapreduce; import com.google.common.base.Strings; import com.google.common.collect.Maps; import mobvista.dmp.util.MRUtils; import mobvista.prd.datasource.tag.mapreduce.InterestTagJob; import mobvista.prd.datasource.tag.mapreduce.reduce.CountReducer; import org.apache.commons.cli.BasicParser; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.HelpFormatter; import org.apache.commons.cli.Options; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.apache.log4j.Logger; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.net.URI; import java.util.Map; /** * Created by Administrator on 2017/5/16 0016. 
* desc : adds app_name to the output key.
 *
 * <p>Counts input records grouped by (source, package name, country, app_name). The app name is
 * looked up in the optional {@code -iosFile} / {@code -adrFile} dictionaries (tab-separated
 * "package name \t app name" files); packages found in neither dictionary get an empty app name.
 */
public class CountDspJoinMMR extends Configured implements Tool {
    private static final Logger log = Logger.getLogger(CountDspJoinMMR.class);

    /** Number of reduce tasks used when {@code -reduceNum} is not given. */
    private static final int DEFAULT_REDUCE_NUM = 50;

    /**
     * Command-line entry point: runs this tool through {@link ToolRunner} and exits the JVM with
     * the tool's return code, or {@code -1} if it threw.
     */
    public static void main(String[] args) {
        int exitCode = 0;
        try {
            exitCode = ToolRunner.run(new Configuration(), new CountDspJoinMMR(), args);
        } catch (Exception e) {
            exitCode = -1;
            // Pass the throwable separately so log4j records the full stack trace;
            // log.error(e) alone only logs e.toString().
            log.error("CountDspJoinMMR failed", e);
        } finally {
            System.exit(exitCode);
        }
    }

    /**
     * Parses the command line, configures the job and runs it to completion.
     *
     * @param args command-line arguments (generic Hadoop options already removed by ToolRunner)
     * @return 0 on success, 1 on missing mandatory options or job failure
     * @throws Exception if option parsing or job submission fails
     */
    @Override
    public int run(String[] args) throws Exception {
        Options options = buildOptions();
        BasicParser parser = new BasicParser();
        CommandLine commands = parser.parse(options, args);
        if (!checkMustOption(commands)) {
            printUsage(options);
            return 1;
        }

        String input = commands.getOptionValue("input");
        String output = commands.getOptionValue("output");
        String reduceNumStr = commands.getOptionValue("reduceNum");
        String iosFile = commands.getOptionValue("iosFile");
        String adrFile = commands.getOptionValue("adrFile");
        int reduceNum = Strings.isNullOrEmpty(reduceNumStr)
                ? DEFAULT_REDUCE_NUM
                : Integer.parseInt(reduceNumStr);

        log.info("*************************");
        log.info("* input = " + input);
        log.info("* output = " + output);
        log.info("* iosFile = " + iosFile);
        log.info("* adrFile = " + adrFile);
        log.info("* reduceNum = " + reduceNum);
        log.info("*************************");

        Configuration conf = this.getConf();
        conf.set("mapreduce.task.io.sort.mb", "512");
        conf.set("mapreduce.task.io.sort.factor", "100");
        conf.set("mapreduce.reduce.shuffle.parallelcopies", "50");
        // The dictionary paths are optional. Configuration.set rejects null values, so forward
        // only the ones that were supplied; the mapper skips any dictionary that is absent.
        if (!Strings.isNullOrEmpty(iosFile)) {
            conf.set("iosFile", iosFile);
        }
        if (!Strings.isNullOrEmpty(adrFile)) {
            conf.set("adrFile", adrFile);
        }

        Job job = Job.getInstance(conf, "CountDspJoinMMR");
        // Locate the job jar via this class: it is the one that contains CrossMRMapper.
        job.setJarByClass(CountDspJoinMMR.class);
        job.setNumReduceTasks(reduceNum);
        job.setMapperClass(CrossMRMapper.class);
        job.setReducerClass(CountReducer.class);
        job.setCombinerClass(CountReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        job.setInputFormatClass(TextInputFormat.class);
        FileInputFormat.addInputPath(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    /** Builds the command-line options accepted by {@link #run(String[])}. */
    public Options buildOptions() {
        Options options = new Options();
        options.addOption("input", true, "[must] daily input path ");
        options.addOption("output", true, "[must] output path");
        options.addOption("iosFile", true, "ios app file ");
        options.addOption("adrFile", true, "adr app file");
        options.addOption("reduceNum", true, "number of reducer");
        return options;
    }

    /**
     * Checks that the mandatory options ({@code input}, {@code output}) are present.
     *
     * @return true when all mandatory options are present, false otherwise (the missing one is logged)
     */
    public boolean checkMustOption(CommandLine commands) {
        if (!commands.hasOption("input")) {
            log.info("please set input ");
            return false;
        }
        if (!commands.hasOption("output")) {
            log.info("please set output ");
            return false;
        }
        return true;
    }

    /** Prints the usage help for this tool. */
    public void printUsage(Options options) {
        HelpFormatter help = new HelpFormatter();
        help.printHelp("CountDspJoinMMR", options);
    }

    /**
     * Emits a count of 1 per valid input record, keyed by (source, package name, country, app_name).
     *
     * <p>Input lines are split on tabs. Only lines yielding exactly 5 fields are used: field 2 is
     * the package name, field 3 the source and field 4 the country. Other lines are skipped.
     */
    public static class CrossMRMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        Text outKey = new Text();
        LongWritable outVal = new LongWritable(1);
        // Package name -> app name, loaded once per task in setup().
        Map<String, String> packageMap = Maps.newHashMap();

        /**
         * Loads the package-name -> app-name dictionaries (CalcPackageDictMR output) into
         * {@link #packageMap}. The adr dictionary is read last, so it wins on duplicate packages.
         */
        @Override
        public void setup(Context context) throws IOException, InterruptedException {
            Configuration conf = context.getConfiguration();
            readAppPkgAndName(conf.get("iosFile"), conf);
            readAppPkgAndName(conf.get("adrFile"), conf);
        }

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] fields = value.toString().split("\t");
            if (fields.length != 5) {
                return;
            }
            String appName = packageMap.get(fields[2]);
            if (appName == null) {
                appName = "";
            }
            // Key order: source, package name, country, app_name.
            outKey.set(MRUtils.JOINER.join(fields[3], fields[2], fields[4], appName));
            context.write(outKey, outVal);
        }

        /**
         * Reads every regular file under {@code path} (or {@code path} itself if it is a file) as
         * UTF-8 tab-separated "package name \t app name" lines into {@link #packageMap}.
         *
         * @param path dictionary path; null or empty means "no dictionary" and is ignored
         * @param conf configuration used to resolve the file system
         * @throws IOException if listing or reading the files fails
         */
        protected void readAppPkgAndName(String path, Configuration conf) throws IOException {
            if (Strings.isNullOrEmpty(path)) {
                return;
            }
            FileSystem fileSystem = FileSystem.get(URI.create(path), conf);
            for (FileStatus status : fileSystem.listStatus(new Path(path))) {
                if (!status.isFile()) {
                    // Subdirectories cannot be opened as streams.
                    continue;
                }
                // Hadoop text output is UTF-8; don't depend on the node's default charset.
                try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                        fileSystem.open(status.getPath()),
                        java.nio.charset.StandardCharsets.UTF_8))) {
                    String line;
                    while ((line = reader.readLine()) != null) {
                        String[] arr = line.split("\t", -1);
                        if (arr.length < 2) {
                            // Malformed line (no tab): no app name to map.
                            continue;
                        }
                        packageMap.put(arr[0], arr[1]);
                    }
                }
            }
        }
    }
}