package mobvista.dmp.datasource.zarola.mapreduce;

import com.google.common.collect.Sets;
import mobvista.dmp.common.CommonMapReduce;
import mobvista.dmp.util.MRUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.type.JavaType;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Set;
import java.util.regex.Pattern;

/**
 * Daily MapReduce job over Zalora install device-id files.
 *
 * <p>Each input line is expected to be a single device id. Lines from files whose path contains
 * "ios" are keyed as (id, "idfa", "ios"); lines from files whose path contains "android" are
 * keyed as (id, "gaid", "android"). The reducer emits one gzip-compressed text line per key,
 * with the key followed by a JSON array of the collected app identifiers.
 *
 * <p>Created by shuai.liu on 2017/3/14.
 */
public class ZarolaInstallDailyMR {

    /**
     * Configures and runs the job, exiting with 0 on success and 1 on failure.
     *
     * @param args generic Hadoop options followed by {@code <input path> <output path>}
     */
    public static void main(String[] args)
            throws InterruptedException, IOException, ClassNotFoundException {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        // Fail fast with a usage message instead of an ArrayIndexOutOfBoundsException below.
        if (otherArgs.length < 2) {
            System.err.println("Usage: ZarolaInstallDailyMR <input path> <output path>");
            System.exit(2);
        }

        Job job = Job.getInstance(conf, "Zarola_Install_Daily");
        job.setJarByClass(ZarolaInstallDailyMR.class);

        FileOutputFormat.setCompressOutput(job, true);
        FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);

        job.setMapperClass(ZarolaInstallTotalMapper.class);
        job.setReducerClass(ZarolaInstallTotalReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    /**
     * Validates device ids and tags each one with its id type, platform and app identifier.
     * The platform is chosen from the path of the file the record was read from.
     */
    public static class ZarolaInstallTotalMapper extends Mapper<LongWritable, Text, Text, Text> {
        String gaid = "gaid";
        String idfa = "idfa";
        String ios = "ios";
        String android = "android";
        Text outKey = new Text();
        Text outValue = new Text();
        /** Accepted device-id shape: 8-4-4-4-12 hex digits, either case. */
        protected String regex = "^[0-9a-fA-F]{8}(-[0-9a-fA-F]{4}){3}-[0-9a-fA-F]{12}$";

        /** {@link #regex} compiled once per task instead of once per record. */
        private Pattern devicePattern;
        /** Path of the file backing this task's split; resolved once in {@link #setup}. */
        private String inputFile;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            devicePattern = Pattern.compile(regex);
            inputFile = resolveInputFile(context);
        }

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            if (!devicePattern.matcher(line).matches()) {
                // NOTE(review): the metric name is misspelled; it is left unchanged so existing
                // counters and dashboards keep matching.
                CommonMapReduce.setMetrics(context, "DMP", "devivce_exceptions", 1);
                return;
            }

            if (inputFile.contains("ios")) {
                outKey.set(MRUtils.JOINER.join(line, idfa, ios));
                // NOTE(review): presumably the Zalora iOS App Store id — confirm.
                outValue.set("624639017");
            } else if (inputFile.contains("android")) {
                outKey.set(MRUtils.JOINER.join(line, gaid, android));
                outValue.set("com.zalora.android");
            } else {
                // Files that belong to neither platform are ignored.
                return;
            }
            context.write(outKey, outValue);
        }

        /**
         * Returns the path of the file this task reads.
         *
         * <p>The split's own path is preferred. The legacy {@code "map.input.file"} key is not
         * reliably populated under the new {@code mapreduce} API.
         *
         * @throws IOException if the path cannot be determined. Without it no record could be
         *     classified, so failing loudly beats an NPE or silently empty output.
         */
        private String resolveInputFile(Context context) throws IOException {
            InputSplit split = context.getInputSplit();
            if (split instanceof FileSplit) {
                return ((FileSplit) split).getPath().toString();
            }
            String configured = context.getConfiguration().get("map.input.file");
            if (configured == null) {
                throw new IOException("Cannot determine input file for split: " + split);
            }
            return configured;
        }
    }

    /**
     * Collapses the app identifiers collected for one device key into a JSON array. Each output
     * line is the key joined (via {@code MRUtils.JOINER}) with that array.
     */
    public static class ZarolaInstallTotalReducer extends Reducer<Text, Text, Text, NullWritable> {
        private ObjectMapper objectMapper;
        Text outKey = new Text();

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            this.objectMapper = new ObjectMapper();
        }

        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // A sorted set de-duplicates and keeps the JSON array order stable across runs,
            // unlike a HashSet.
            Set<String> appIds = Sets.newTreeSet();
            for (Text value : values) {
                appIds.add(value.toString());
            }
            outKey.set(MRUtils.JOINER.join(key.toString(), objectMapper.writeValueAsString(appIds)));
            context.write(outKey, NullWritable.get());
        }
    }
}