package mobvista.dmp.datasource.ga.mapreduce;

import mobvista.dmp.common.CommonMapReduce;
import mobvista.dmp.common.CommonMapper;
import mobvista.dmp.common.InstallTotalReducer;
import mobvista.dmp.util.MRUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * MapReduce driver for the "ga install list total job".
 *
 * <p>Reads two input paths ({@code args[0]} and {@code args[1]}) as plain text, maps every
 * line with {@link GaInstallTotalMapper}, reduces with {@link InstallTotalReducer}, and
 * writes gzip-compressed output to {@code args[2]}.
 *
 * <p>The mapper tells the two record layouts apart by column count: 5 columns is treated
 * as a "ga install daily" record, 4 columns as a "ga install total" record.
 * NOTE(review): which of {@code args[0]} / {@code args[1]} holds which layout is not
 * visible here - confirm against the job launcher.
 *
 * author: houying
 * date  : 16-11-8
 */
public class GaInstallTotalMR extends CommonMapReduce {

    /**
     * @param name         job name handed to {@link CommonMapReduce}
     * @param mapperClass  mapper implementation for the job
     * @param reducerClass reducer implementation for the job
     */
    public GaInstallTotalMR(String name, Class<? extends Mapper> mapperClass,
                            Class<? extends Reducer> reducerClass) {
        super(name, mapperClass, reducerClass);
    }

    /**
     * Command-line entry point.
     *
     * @param args {@code args[0]}, {@code args[1]}: input paths; {@code args[2]}: output path
     * @throws Exception whatever {@code start} propagates
     */
    public static void main(String[] args) throws Exception {
        start(new GaInstallTotalMR("ga install list total job",
                GaInstallTotalMapper.class, InstallTotalReducer.class), args);
    }

    /** Forces line-oriented text input for the job. */
    @Override
    protected void otherSetting(Job job, String[] args) {
        job.setInputFormatClass(TextInputFormat.class);
    }

    /** Uses {@code args[2]} as the output directory and enables gzip output compression. */
    @Override
    protected void setOutputPath(Job job, String[] args) throws IOException {
        FileOutputFormat.setOutputPath(job, new Path(args[2]));
        FileOutputFormat.setCompressOutput(job, true);
        FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
    }

    /** Registers {@code args[0]} and {@code args[1]} as the job's input paths. */
    @Override
    protected void setInputPath(Job job, String[] args) throws IOException {
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileInputFormat.addInputPath(job, new Path(args[1]));
    }

    /**
     * Normalises both record layouts to a common key of
     * {@code (column 0, column 1, fixed platform)}.
     *
     * <ul>
     *   <li>5 columns ("ga install daily"): value is {@code column 3} joined with the
     *       {@code yyyy-MM-dd} date taken from the input file path.</li>
     *   <li>4 columns ("ga install total"): value is {@code column 3} unchanged.</li>
     * </ul>
     *
     * Column 0 is validated as a device id; column 2 is the platform. The meaning of
     * columns 1 and 3 is not visible in this file.
     */
    private static class GaInstallTotalMapper extends CommonMapper {

        /** Captures a {@code yyyy/MM/dd} directory segment from the input split path. */
        private static final Pattern DATE_PATTERN = Pattern.compile("/(\\d{4}/\\d{2}/\\d{2})");

        /** Precompiled once; {@code String.matches} would recompile it for every record. */
        private static final Pattern DEVICE_ID_PATTERN = Pattern.compile(CommonMapReduce.didPtn);

        /** Whether {@link #splitDate} has been computed for this mapper's split yet. */
        private boolean splitDateResolved;

        /** Date ({@code yyyy-MM-dd}) parsed from the split path, or {@code null} if absent. */
        private String splitDate;

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] array = MRUtils.SPLITTER.split(value.toString());

            // A line made only of delimiters can split to a zero-length array; treat it like
            // any other record without a usable device id instead of failing the task.
            // The counter name's spelling is kept as-is: it is runtime text that external
            // monitoring may depend on.
            if (array.length == 0 || !isValidDeviceId(array[0])) {
                CommonMapReduce.setMetrics(context, "DMP", "devivce_exceptions", 1);
                return;
            }

            if (array.length == 5) { // ga install daily
                String date = resolveSplitDate(context);
                if (date == null) {
                    // No date in the path: the record cannot be dated, so it is dropped,
                    // but counted so the loss is visible.
                    CommonMapReduce.setMetrics(context, "DMP", "path_date_error", 1);
                    return;
                }
                outKey.set(MRUtils.JOINER.join(array[0], array[1], fixPlatform(array[2])));
                outValue.set(MRUtils.JOINER.join(array[3], date));
            } else if (array.length == 4) { // ga install total
                outKey.set(MRUtils.JOINER.join(array[0], array[1], fixPlatform(array[2])));
                outValue.set(array[3]);
            } else {
                CommonMapReduce.setMetrics(context, "DMP", "column_num_error", 1);
                return;
            }
            context.write(outKey, outValue);
        }

        /** True when the id matches the device-id pattern and is not the all-zero id. */
        private static boolean isValidDeviceId(String deviceId) {
            return DEVICE_ID_PATTERN.matcher(deviceId).matches()
                    && !deviceId.equals(CommonMapReduce.allZero);
        }

        /**
         * Returns the date embedded in the current input split's path, formatted
         * {@code yyyy-MM-dd}, or {@code null} when the path has no {@code /yyyy/MM/dd}
         * segment. The result is cached because the split is the same for every record
         * this mapper instance processes.
         */
        private String resolveSplitDate(Context context) {
            if (!splitDateResolved) {
                String path = ((FileSplit) context.getInputSplit()).getPath().toString();
                Matcher m = DATE_PATTERN.matcher(path);
                splitDate = m.find() ? m.group(1).replace("/", "-") : null;
                splitDateResolved = true;
            }
            return splitDate;
        }

        /** Maps the short platform code {@code "adr"} to {@code "android"}; others pass through. */
        private String fixPlatform(String platform) {
            if (platform.equals("adr")) {
                return "android";
            } else {
                return platform;
            }
        }
    }
}