package mobvista.prd.datasource.table;

import com.google.common.collect.Interner;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import mobvista.dmp.util.MRUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

/**
 * Created by Administrator on 2017/3/9 0009.
 * desc: joins the app_id / pkg_name mapping derived from the setting logs with the crawled
 * app_id, app_name, platform, impression records.
 * If one pkg_name maps to several app_ids, the app_id with the largest impression is kept.
 * Output: app_id, pkg_name, app_name
 */
public class MergeAppIDMR {

    /** Configuration key under which the driver hands the dictionary file path to the mappers. */
    private static final String DICT_FILE_KEY = "file";

    /** Counter group used for records that are skipped instead of failing the task. */
    private static final String COUNTER_GROUP = "MergeAppIDMR";

    /**
     * Configures and runs the job, then exits the JVM with 0 on success and 1 on failure.
     *
     * @param args generic Hadoop options followed by: input path, output path, dictionary file path
     * @throws IOException            if job setup or execution fails on I/O
     * @throws ClassNotFoundException if a job class cannot be loaded
     * @throws InterruptedException   if waiting for job completion is interrupted
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 3) {
            // Fail with a readable message instead of an ArrayIndexOutOfBoundsException.
            System.err.println("Usage: MergeAppIDMR <input path> <output path> <package dict file>");
            System.exit(2);
        }
        conf.set(DICT_FILE_KEY, otherArgs[2]);

        Job job = Job.getInstance(conf, "MergeAppIDMR");
        job.setJarByClass(MergeAppIDMR.class);

        job.setMapperClass(MergeAppMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setReducerClass(MergeAppReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    /**
     * Map-side join of the input records against an in-memory dictionary loaded in
     * {@link #setup(Context)}. Records whose first field is not in the dictionary, or whose
     * second field is empty, are dropped.
     */
    public static class MergeAppMapper extends Mapper<LongWritable, Text, Text, Text> {
        // Dictionary: column 2 of the dictionary file -> "col0 \t col1 \t col3".
        // Per the original notes the joined value reads app_id, platform, app_name, impression,
        // so the dictionary columns are presumably platform, app_name, app_id, impression.
        final Map<String, String> packageMap = Maps.newHashMap();
        final Text outKey = new Text();
        final Text outValue = new Text();

        /**
         * Reads the result of CalcPackageDictMR (path taken from the "file" configuration entry)
         * into {@code packageMap}.
         *
         * @throws IOException if the dictionary file cannot be opened or read
         */
        @Override
        public void setup(Context context) throws IOException, InterruptedException {
            String path = context.getConfiguration().get(DICT_FILE_KEY);
            // FileSystem.get returns a shared, cached instance, so only the stream is closed here.
            FileSystem fileSystem = FileSystem.get(URI.create(path), context.getConfiguration());
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(fileSystem.open(new Path(path)), StandardCharsets.UTF_8))) {
                String readLine;
                while ((readLine = reader.readLine()) != null) {
                    String[] arr = readLine.split("\t", -1);
                    if (arr.length < 4) {
                        // A blank or truncated line must not kill every map task; count and skip it.
                        context.getCounter(COUNTER_GROUP, "MALFORMED_DICT_LINES").increment(1);
                        continue;
                    }
                    packageMap.put(arr[2], arr[0] + "\t" + arr[1] + "\t" + arr[3]);
                }
            }
        }

        /**
         * Emits one joined record per input line that matches the dictionary.
         * <ul>
         *   <li>Tab-separated line: key = field 0, value = field 1 + dictionary entry.
         *       NOTE(review): the column meaning of this layout is not visible here - confirm.</li>
         *   <li>Comma-separated line: field 0 is the app_id; key = field 1,
         *       value = app_id, platform, app_name, impression.</li>
         * </ul>
         * Lines containing neither separator are ignored. Write failures are propagated to the
         * framework rather than swallowed, so lost output or task interruption is never hidden.
         */
        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();

            // A line that contains the separator always splits into at least two fields,
            // so fields[0] and fields[1] are safe to read in both layouts.
            String[] fields;
            String joinKey;
            String leadingField;
            if (line.contains("\t")) {
                fields = line.split("\t", -1);
                joinKey = fields[0];
                leadingField = fields[1];
            } else if (line.contains(",")) {
                fields = line.split(",", -1);
                joinKey = fields[1];
                leadingField = fields[0];
            } else {
                return;
            }

            String dictEntry = packageMap.get(fields[0]);
            if (dictEntry == null || fields[1].isEmpty()) {
                return;
            }

            outKey.set(joinKey);
            outValue.set(leadingField + "\t" + dictEntry);
            context.write(outKey, outValue);
        }
    }

    /**
     * For every key, keeps the candidate with the largest impression and writes
     * {@code app_id, key, app_name} joined with {@code MRUtils.JOINER}.
     */
    public static class MergeAppReducer extends Reducer<Text, Text, Text, NullWritable> {
        final Text outKey = new Text();

        /**
         * Selects the best candidate among {@code values} (layout: app_id, platform, app_name,
         * impression) in a single pass. The first record wins ties, and it is also the fallback
         * when no impression is positive or parseable, so such groups still produce output
         * instead of failing the task.
         */
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            String bestAppId = null;
            String bestAppName = null;
            long bestImpression = Long.MIN_VALUE;

            for (Text val : values) {
                String[] arr = val.toString().split("\t", -1);
                if (arr.length < 3) {
                    // Without an app_name column the record cannot be emitted; count and skip it.
                    context.getCounter(COUNTER_GROUP, "MALFORMED_REDUCE_VALUES").increment(1);
                    continue;
                }
                long impression = arr.length > 3 ? parseImpression(arr[3]) : Long.MIN_VALUE;
                // Strict '>' keeps the earliest record on ties.
                if (bestAppId == null || impression > bestImpression) {
                    bestAppId = arr[0];
                    bestAppName = arr[2];
                    bestImpression = impression;
                }
            }

            if (bestAppId == null) {
                return;
            }
            outKey.set(MRUtils.JOINER.join(bestAppId, key, bestAppName)); // app_id, pkg_name, app_name
            context.write(outKey, NullWritable.get());
        }

        /**
         * Parses an impression count that may contain thousands separators (e.g. "1,234").
         *
         * @return the parsed count, or {@code Long.MIN_VALUE} when the text is not a number so
         *         that such records rank below every record with a valid impression
         */
        private static long parseImpression(String raw) {
            try {
                return Long.parseLong(raw.replace(",", "").trim());
            } catch (NumberFormatException ignored) {
                // Malformed impression: rank last instead of aborting the whole reduce task.
                return Long.MIN_VALUE;
            }
        }
    }
}