package mobvista.dmp.datasource.ga.mapreduce; import com.google.common.base.Joiner; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.compress.GzipCodec; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.type.JavaType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.*; import java.util.regex.Matcher; import java.util.regex.Pattern; /** * author: houying * date : 16-10-24 * desc : */ public class GaRevenueMR extends Configured implements Tool { private static final Pattern splitPtn = Pattern.compile("\t"); private static final Joiner joiner = Joiner.on("\t").useForNull(""); private static Set<String> illegalValueSet = Sets.newHashSet("0", "0.0", "", "NULL"); private static ObjectMapper objectMapper = new ObjectMapper(); static class RevenueRecord { private double revenue; private String date; public RevenueRecord() { } public RevenueRecord(double revenue, String date) { this.revenue = revenue; this.date = date; } public double getRevenue() { return revenue; } public void setRevenue(double revenue) { this.revenue = revenue; } public String getDate() { return date; } public void setDate(String date) { this.date = date; } } public static class GaRevenueMapper extends Mapper<LongWritable, Text, Text, Text> { private static final Logger logger = LoggerFactory.getLogger(GaRevenueMapper.class); private Pattern datePtn = Pattern.compile("/(\\d{4}/\\d{2}/\\d{2})"); private Text outKey = new Text(); private Text outValue = new Text(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] array = splitPtn.split(value.toString()); String path = ((FileSplit)context.getInputSplit()).getPath().toString(); outKey.set(joiner.join(array[0], array[1])); if (array.length == 31) { //daily String revenueWeek = array[27]; String revenueMonth = array[28]; if (!illegalValueSet.contains(revenueWeek) || !illegalValueSet.contains(revenueMonth)) { String date = getDate(path); if (date == null) { return; } outValue.set(joiner.join(date, revenueWeek, revenueMonth)); context.write(outKey, outValue); } } else if (array.length == 4) { outValue.set(joiner.join(array[2], array[3])); context.write(outKey, outValue); } } private String getDate(String path) { Matcher matcher = datePtn.matcher(path); if (matcher.find()) { return matcher.group(1).replace("/", "-"); } else { logger.error("can not find date in path: {}", path); return null; } } } public static class GaRevenueReducer extends Reducer<Text, Text, Text, NullWritable> { private JavaType javaType = objectMapper.getTypeFactory().constructCollectionType(List.class, RevenueRecord.class); private Text outKey = new Text(); @Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { String[] idAndType = splitPtn.split(key.toString()); List<RevenueRecord> weekList = Lists.newArrayList(); List<RevenueRecord> monthList = Lists.newArrayList(); String oldWeekList = null; String oldMonthList = null; for (Text value: values) { String[] array = splitPtn.split(value.toString()); if (array.length == 2) { //old revenue oldWeekList = array[0]; oldMonthList = array[1]; } else if (array.length == 3) { String date = array[0]; if (!illegalValueSet.contains(array[1])) { weekList.add(new RevenueRecord(Double.valueOf(array[1]), date)); } if (!illegalValueSet.contains(array[2])) { monthList.add(new RevenueRecord(Double.valueOf(array[2]), date)); } } } outKey.set(joiner.join( idAndType[0], idAndType[1], listToJson(mergeRevenueList(oldWeekList, weekList)), listToJson(mergeRevenueList(oldMonthList, monthList)) )); context.write(outKey, NullWritable.get()); } private Collection<RevenueRecord> mergeRevenueList(String oldWeekList, final List<RevenueRecord> list) throws IOException { List<RevenueRecord> revenueRecords = oldWeekList == null ? Collections.<RevenueRecord>emptyList(): (List<RevenueRecord>) objectMapper.readValue(oldWeekList, javaType); Map<String, RevenueRecord> map = Maps.newHashMap(); for (RevenueRecord revenueRecord: revenueRecords) { map.put(revenueRecord.getDate(), revenueRecord); } for (RevenueRecord record: list) { map.put(record.getDate(), record); } return map.values(); } private String listToJson(Collection<RevenueRecord> list) throws IOException { return objectMapper.writeValueAsString(list); } } @Override public int run(String[] args) throws Exception { Configuration conf = getConf(); conf.set("mapreduce.task.io.sort.mb", "500"); conf.set("mapreduce.reduce.java.opts", "-Xmx1536m"); conf.set("mapreduce.reduce.memory.mb", "2048"); conf.set("mapreduce.reduce.shuffle.parallelcopies", "50"); Job job = Job.getInstance(conf, "ga revenue"); job.setJarByClass(GaRevenueMR.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileInputFormat.addInputPath(job, new Path(args[1])); Path outputPath = new Path(args[2]); FileSystem fileSystem = outputPath.getFileSystem(conf); if (fileSystem.exists(outputPath)) { fileSystem.delete(outputPath, true); } FileOutputFormat.setOutputPath(job, outputPath); FileOutputFormat.setCompressOutput(job, true); FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class); job.setInputFormatClass(TextInputFormat.class); job.setMapperClass(GaRevenueMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setReducerClass(GaRevenueReducer.class); job.setOutputFormatClass(TextOutputFormat.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); return job.waitForCompletion(true) ? 0 : 1; } public static void main(String[] args) throws Exception { System.exit(ToolRunner.run(new Configuration(), new GaRevenueMR(), args)); } }