package mobvista.prd.datasource.source.mapreduce; import com.google.common.collect.Lists; import mobvista.prd.datasource.table.MergeAppIDMR; import mobvista.prd.datasource.util.MRUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.compress.GzipCodec; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; import java.io.File; import java.io.IOException; import java.util.List; /** * Created by Administrator on 2017/4/20 0020. * desc :输出格式: * ga与dsp,ga与M系统,ga与3s,dsp与M系统,dsp与3s,M系统与3s,ga日活,dsp日活,M日活,3s日活 */ public class SourceCrossMR { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); Job job = Job.getInstance(conf, "cross ga dsp 3s m"); job.setJarByClass(MergeAppIDMR.class); job.setNumReduceTasks(1); job.setMapperClass(CrossMRMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setReducerClass(CrossMRReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileInputFormat.addInputPath(job, new Path(otherArgs[1])); FileInputFormat.addInputPath(job, new Path(otherArgs[2])); FileInputFormat.addInputPath(job, new Path(otherArgs[3])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[4])); System.exit(job.waitForCompletion(true) ? 0 : 1); } public static class CrossMRMapper extends Mapper<LongWritable, Text, Text, Text> { Text outKey = new Text(); Text outValue = new Text(); public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String[] fields = line.split("\t", -1); String inputFile = context.getConfiguration().get("map.input.file"); if (inputFile.contains("ods_ga_device_daily")) { outValue.set("ga"); } else if (inputFile.contains("3s_install_daily")) { outValue.set("3s"); } else if (inputFile.contains("etl_dsp_request_daily")){ outValue.set("dsp"); } else if (inputFile.contains("etl_adn_sdk_request_daily")) { outValue.set("m"); } else { return; } outKey.set(MRUtils.JOINER.join(fields[0], fields[1]));//device_id,device_type context.write(outKey, outValue); } } public static class CrossMRReducer extends Reducer<Text, Text, Text, NullWritable> { Long ga = 0L; Long dsp = 0L; Long m = 0L; Long s = 0L; long gaAndDsp = 0L; long gaAnd3S = 0L; long gaAndM = 0L; long dspAndM = 0L; long dspAnd3S = 0L; long mAnd3S = 0L; long gaid = 0L; long idfa = 0L; long all = 0L; Text outKey = new Text(); public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { List<String> list = Lists.newArrayList(); String[] fields = key.toString().split("\t",-1); String deviceType = fields[1]; if (deviceType.contains("gaid")) { gaid++; } else if (deviceType.contains("idfa")) { idfa++; } for (Text val : values) { list.add(val.toString()); } if (list.contains("ga")) { ga++; } if (list.contains("dsp")) { dsp++; } if (list.contains("m")) { m++; } if (list.contains("3s")) { s++; } if (list.contains("ga") && list.contains("dsp")) { gaAndDsp++; } if (list.contains("ga") && list.contains("m")) { gaAndM++; } if (list.contains("ga") && list.contains("3s")) { gaAnd3S++; } if (list.contains("m") && list.contains("dsp")) { dspAndM++; } if (list.contains("3s") && list.contains("dsp")) { dspAnd3S++; } if (list.contains("m") && list.contains("3s")) { mAnd3S++; } all++; } public void cleanup (Context context) throws IOException, InterruptedException { outKey.set(MRUtils.JOINER.join( gaAndDsp, gaAndM, gaAnd3S, dspAndM, dspAnd3S, mAnd3S, ga, dsp, m, s, gaid, idfa, all )); context.write(outKey,NullWritable.get()); } } }