package mobvista.prd.datasource.country.interest.mapreduce; import com.google.common.base.Strings; import com.google.gson.JsonArray; import com.google.gson.JsonElement; import com.google.gson.JsonObject; import mobvista.prd.datasource.tag.mapreduce.InterestTagJob; import mobvista.prd.datasource.tag.mapreduce.reduce.CountReducer; import mobvista.prd.datasource.util.GsonUtil; import org.apache.commons.cli.BasicParser; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.HelpFormatter; import org.apache.commons.cli.Options; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.apache.log4j.Logger; import java.io.IOException; import java.util.HashSet; import java.util.Set; import java.util.regex.Pattern; /** * Created by liushuai on 2017/5/12 0012. * desc :统计分国家的兴趣数量 */ public class CountryInterestCountMR extends Configured implements Tool { private static final Logger log = Logger.getLogger(CountryInterestCountMR.class); public static void main(String[] args) { int exitCode = 0; try { exitCode = ToolRunner.run(new Configuration(), new CountryInterestCountMR(), args); } catch (Exception e) { exitCode = -1; log.error(e); } finally { System.exit(exitCode); } } @Override public int run(String[] args) throws Exception { Options options = buildOptions(); BasicParser parser = new BasicParser(); CommandLine commands = parser.parse(options, args); if (!checkMustOption(commands)) { printUsage(options); return 1; } String input = commands.getOptionValue("input"); String output = commands.getOptionValue("output"); String reduceNumStr = commands.getOptionValue("reduceNum"); int reduceNum = Strings.isNullOrEmpty(reduceNumStr) ? 50 : Integer.parseInt(reduceNumStr); log.info("*************************"); log.info("* input = " + input); log.info("* output = " + output); log.info("* reduceNum = " + reduceNum); log.info("*************************"); this.getConf().set("mapreduce.task.io.sort.mb", "512"); this.getConf().set("mapreduce.task.io.sort.factor", "100"); this.getConf().set("mapreduce.reduce.shuffle.parallelcopies", "50"); this.getConf().set("mapreduce.reduce.memory.mb", "1536"); Job job = Job.getInstance(this.getConf(), "CountryInterestCountMR"); job.setJarByClass(InterestTagJob.class); job.setNumReduceTasks(reduceNum); job.setMapperClass(CountryInterestCountMapper.class); job.setReducerClass(CountReducer.class); job.setCombinerClass(CountReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); job.setInputFormatClass(TextInputFormat.class); FileInputFormat.addInputPath(job, new Path(input)); FileOutputFormat.setOutputPath(job, new Path(output)); return job.waitForCompletion(true) ? 0 : 1; } public Options buildOptions() { Options options = new Options(); options.addOption("input", true, "[must] daily input path "); options.addOption("output", true, "[must] output path"); options.addOption("reduceNum", true, "number of reducer"); return options; } public boolean checkMustOption(CommandLine commands) { if (!(commands.hasOption("input"))) { log.info("please set input "); return false; } if (!(commands.hasOption("output"))) { log.info("please set output "); return false; } return true; } public void printUsage(Options options) { HelpFormatter help = new HelpFormatter(); help.printHelp("CountryInterestCountMR", options); } public static class CountryInterestCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> { private Text outKey = new Text(); private static final String dataSplit = "\t"; private StringBuilder builder = new StringBuilder(); private LongWritable outVal = new LongWritable(1); private Pattern pattern = Pattern.compile(dataSplit); private Set<String> set = new HashSet<String>(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { try { String[] valSplits = pattern.split(value.toString(), -1); if (valSplits.length != 4) { return; } set.clear(); JsonArray array = GsonUtil.String2JsonArray(valSplits[3]); for (JsonElement element : array) { JsonObject obj = element.getAsJsonObject(); JsonArray tagArray = obj.get("tag").getAsJsonArray(); for (JsonElement tagElement : tagArray) { JsonObject tagObj = tagElement.getAsJsonObject(); String firstLevel = tagObj.get("1") != null ? tagObj.get("1").getAsString() : ""; String sencondLevel = tagObj.get("2") != null ? tagObj.get("2").getAsString() : ""; builder.setLength(0); builder.append(valSplits[2]).append(dataSplit).append(firstLevel).append(dataSplit).append(sencondLevel); set.add(builder.toString()); } } for (String temp : set) { outKey.set(temp); context.write(outKey, outVal); } } catch (Exception e) { throw new RuntimeException(e); } } } }