package mobvista.dmp.datasource.dsp.mapreduce;

import com.google.common.base.Preconditions;
import mobvista.dmp.common.CommonMapReduce;
import mobvista.dmp.util.MRUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.node.ObjectNode;

import java.io.IOException;
import java.util.Iterator;

/**
 * MapReduce job that extracts device identifiers from DSP log lines.
 *
 * <p>The mapper turns every non-iOS input line into a small JSON object holding
 * the identifiers listed in {@link Ids} plus a {@code dmp_time} field, keyed by
 * the gaid value. The reducer writes only the first JSON value seen for each key.
 * Output is gzip-compressed text.
 *
 * <p>Usage: {@code DspDeviceIdsMR <input path> <output path>}; the configuration
 * property {@code task.date} must be set.
 *
 * author: houying
 * date  : 16-10-25
 */
public class DspDeviceIdsMR extends Configured implements Tool {

    /**
     * Columns of a split DSP log line, each with a name and its index in the
     * array produced by {@code MRUtils.SPLITTER}.
     */
    public enum Fields {
        DEVICE_GAID("gaid", 34),
        DEVICE_IDFA("idfa", 37),
        DEVICE_PLATFORM("platform", 29),
        DEVICE_IDS("ids", 15);

        private final String name;
        private final int idx;

        Fields(String name, int idx) {
            this.name = name;
            this.idx = idx;
        }

        /** @return the name associated with this column */
        public String getName() {
            return name;
        }

        /** @return the zero-based index of this column in the split line */
        public int getIdx() {
            return idx;
        }
    }

    /**
     * Identifiers taken from the comma-separated DSP "ids" column.
     *
     * <p>According to the original author's note, the column is ordered as:
     * gaid, idfa, idfamd5, idfasha1, imei, imeimd5, imeisha1, androidid,
     * androididmd5, androididsha1, macmd5, macsha1. Each constant carries the
     * JSON field name to emit and the position of the value within that list.
     */
    public enum Ids {
        GAID("gaid", 0),
        IDFA("idfa", 1),
        IMEI("imei", 4),
        ANDROID_ID("android_id", 7);

        private final String name;
        private final int idx;

        Ids(String name, int idx) {
            this.name = name;
            this.idx = idx;
        }

        /** @return the JSON field name used for this identifier */
        public String getName() {
            return name;
        }

        /** @return the zero-based position of this identifier in the ids list */
        public int getIdx() {
            return idx;
        }
    }

    /**
     * Emits one {@code (gaid, json)} pair per valid non-iOS DSP log line.
     */
    public static class ProfileMapper extends Mapper<LongWritable, Text, Text, Text> {

        /** Minimum number of columns a DSP log line must have to be processed. */
        private static final int MIN_FIELD_COUNT = 52;

        /**
         * Minimum number of entries the ids column must contain so that every
         * index declared in {@link Ids} can be read safely.
         */
        private static final int MIN_IDS_COUNT = requiredIdsCount();

        // Kept public and non-final so existing external references keep compiling.
        public static String ANDROID = "android";
        public static String IOS = "ios";
        public static String idName = "device_id";

        private final Text outKey = new Text();
        private final Text outValue = new Text();
        private String date = null;
        private ObjectMapper objectMapper = null;

        /** Returns the largest index declared in {@link Ids} plus one. */
        private static int requiredIdsCount() {
            int maxIdx = 0;
            for (Ids id : Ids.values()) {
                maxIdx = Math.max(maxIdx, id.getIdx());
            }
            return maxIdx + 1;
        }

        /**
         * Reads the required {@code task.date} property and creates the JSON mapper.
         *
         * @throws NullPointerException if {@code task.date} is not configured
         */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            date = Preconditions.checkNotNull(context.getConfiguration().get("task.date"));
            objectMapper = new ObjectMapper();
        }

        /**
         * Parses one DSP log line and writes its device identifiers as JSON.
         *
         * <p>Lines with fewer than {@value #MIN_FIELD_COUNT} columns are reported
         * through the {@code dsp_length_err} metric, and lines whose ids column is
         * too short through {@code dsp_ids_err}; both are skipped. Lines whose
         * platform is {@code "ios"} are skipped silently.
         *
         * @param key     input key supplied by the input format (unused)
         * @param value   one raw DSP log line
         * @param context task context used for output and metrics
         */
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] array = MRUtils.SPLITTER.split(value.toString());
            if (array.length < MIN_FIELD_COUNT) {
                CommonMapReduce.setMetrics(context, "DMP", "dsp_length_err", 1);
                return;
            }

            // iOS only uses idfa, so no ID mapping is needed for it.
            String platform = array[Fields.DEVICE_PLATFORM.getIdx()];
            if ("ios".equals(platform)) {
                return;
            }

            // Limit -1 keeps trailing empty entries so positions stay aligned.
            String[] idsArray = array[Fields.DEVICE_IDS.getIdx()].trim().split(",", -1);
            if (idsArray.length < MIN_IDS_COUNT) {
                // The highest index read below is ANDROID_ID (7); a shorter list
                // would otherwise fail the task with ArrayIndexOutOfBoundsException.
                CommonMapReduce.setMetrics(context, "DMP", "dsp_ids_err", 1);
                return;
            }

            ObjectNode dspProfile = objectMapper.createObjectNode();
            for (Ids id : Ids.values()) {
                dspProfile.put(id.getName(), idsArray[id.getIdx()]);
            }
            dspProfile.put("dmp_time", date);

            // Key by gaid: the same string that was just stored in the JSON object.
            outKey.set(idsArray[Ids.GAID.getIdx()]);
            outValue.set(dspProfile.toString());
            context.write(outKey, outValue);
        }
    }

    /**
     * Writes a single JSON record per key, discarding any further values.
     */
    public static class ProfileReducer extends Reducer<Text, Text, NullWritable, Text> {

        private final Text outValue = new Text();

        /**
         * Emits the first value received for {@code key}; the key itself is not written.
         *
         * @param key     the gaid the values were grouped by
         * @param values  JSON records sharing that key
         * @param context task context used for output
         */
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            Iterator<Text> it = values.iterator();
            if (it.hasNext()) {
                outValue.set(it.next().toString());
                context.write(NullWritable.get(), outValue);
            }
        }
    }

    /**
     * Configures and runs the job.
     *
     * <p>An existing output directory is deleted recursively before the job starts.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the output path
     * @return 0 if the job succeeded, 1 if it failed, 2 if arguments are missing
     * @throws Exception if job setup or execution fails
     */
    @Override
    public int run(String[] args) throws Exception {
        if (args == null || args.length < 2) {
            System.err.println("Usage: DspDeviceIdsMR <input path> <output path>");
            return 2;
        }

        Configuration conf = getConf();
        conf.set("mapreduce.map.speculative", "true");
        conf.set("mapreduce.reduce.speculative", "true");
        conf.set("mapreduce.task.io.sort.mb", "500");
        conf.set("mapreduce.reduce.java.opts", "-Xmx1536m");
        conf.set("mapreduce.reduce.memory.mb", "2048");
        conf.set("mapreduce.reduce.shuffle.parallelcopies", "50");

        Job job = Job.getInstance(conf, "Dsp Profile job");
        job.setJarByClass(DspDeviceIdsMR.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));

        // Remove stale output first; the job cannot write into an existing directory.
        Path output = new Path(args[1]);
        FileSystem system = output.getFileSystem(conf);
        if (system.exists(output)) {
            system.delete(output, true);
        }
        FileOutputFormat.setOutputPath(job, output);
        FileOutputFormat.setCompressOutput(job, true);
        FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);

        job.setMapperClass(ProfileMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setReducerClass(ProfileReducer.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    /**
     * Command-line entry point; exits the JVM with the status returned by {@link #run}.
     *
     * @param args command-line arguments, passed through {@link ToolRunner}
     * @throws Exception if the job cannot be run
     */
    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new DspDeviceIdsMR(), args));
    }
}