package mobvista.dmp.datasource.adn.mapreduce; import com.google.common.base.Preconditions; import mobvista.dmp.common.CommonMapReduce; import mobvista.dmp.util.MRUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.compress.GzipCodec; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.node.ObjectNode; import java.io.IOException; import java.util.Iterator; /** * author: houying * date : 16-10-25 * desc : */ public class AdnDeviceIdsMR extends Configured implements Tool { public enum Fields{ /* DEVICE_GAID("gaid",42), DEVICE_IDFA("idfa",43), DEVICE_IMEI("imei",35), DEVICE_MAC("mac",36), DEVICE_ANDROID_ID("android_id",37), DEVICE_PLATFORM("platform",13);*/ DEVICE_GAID("gaid",15), DEVICE_IDFA("idfa",16), DEVICE_IMEI("imei",12), DEVICE_MAC("mac",13), DEVICE_ANDROID_ID("android_id",14), DEVICE_PLATFORM("platform",4); private String name ; private int idx; Fields(String name, int idx){ this.name = name; this.idx = idx; } public String getName(){ return name; } public int getIdx(){ return idx; } } public static class ProfileMapper extends Mapper<LongWritable, Text, Text, Text> { private Text outKey = new Text(); private Text outValue = new Text(); private String date = null; private MultipleOutputs multipleOutputs = null; private ObjectMapper objectMapper = null; public static String ANDROID = "android"; public static String IOS = "ios"; public static String idName = "device_id"; @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] array = MRUtils.SPLITTER.split(value.toString(),-1); // if (array.length >= 54) { //adn request daily if (array.length >= 20) { //adn request daily ObjectNode dspProfile = objectMapper.createObjectNode(); int idx = Fields.DEVICE_PLATFORM.getIdx(); String platform = array[idx]; if ( "ios".equals(platform) ){//ios 只使用idfa 不需要做id映射 return ; } String name = null; for (Fields Field: Fields.values()){ idx = Field.getIdx(); name = Field.getName(); String idValue = array[idx]; dspProfile.put(name,idValue); } dspProfile.put("dmp_time", date); outKey.set(dspProfile.get(Fields.DEVICE_GAID.getName()).asText()); outValue.set(dspProfile.toString()); context.write(outKey, outValue); } else { CommonMapReduce.setMetrics(context, "DMP", "adn_request_length_err", 1); return; } } @Override protected void setup(Context context) throws IOException, InterruptedException { date = Preconditions.checkNotNull(context.getConfiguration().get("task.date")); objectMapper = new ObjectMapper(); } } public static class ProfileReducer extends Reducer<Text, Text, NullWritable, Text> { private Text outValue = new Text(); @Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { String value; Iterator<Text> it = values.iterator(); if (it.hasNext()) { value = it.next().toString(); outValue.set(value); context.write(NullWritable.get(), outValue); } } } public int run(String[] args) throws Exception { Configuration conf = getConf(); conf.set("mapreduce.map.speculative", "true"); conf.set("mapreduce.reduce.speculative", "true"); conf.set("mapreduce.task.io.sort.mb", "500"); conf.set("mapreduce.reduce.java.opts", "-Xmx1536m"); conf.set("mapreduce.reduce.memory.mb", "2048"); conf.set("mapreduce.reduce.shuffle.parallelcopies", "50"); Job job = Job.getInstance(conf, "adn device ids job"); job.setJarByClass(AdnDeviceIdsMR.class); FileInputFormat.addInputPath(job, new Path(args[0])); //FileInputFormat.addInputPath(job, new Path(args[1])); Path output = new Path(args[1]); FileSystem system = output.getFileSystem(conf); if (system.exists(output)) { system.delete(output, true); } FileOutputFormat.setOutputPath(job, output); FileOutputFormat.setCompressOutput(job, true); FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class); job.setMapperClass(ProfileMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setReducerClass(ProfileReducer.class); job.setOutputFormatClass(TextOutputFormat.class); job.setOutputKeyClass(NullWritable.class); job.setOutputValueClass(Text.class); return job.waitForCompletion(true) ? 0 : 1; } public static void main(String[] args) throws Exception { System.exit(ToolRunner.run(new Configuration(), new AdnDeviceIdsMR(), args)); } }