package mobvista.dmp.datasource.ga.mapreduce;

import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import mobvista.dmp.common.CommonMapReduce;
import mobvista.dmp.common.CommonMapper;
import mobvista.dmp.common.CommonReducer;
import mobvista.dmp.util.MRUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;

/**
 * author: houying
 * date  : 16-11-25
 * desc  : Reads two inputs, keys every record by (device_id, platform), and emits a
 *         default tag line for each key that never appears in the 4-column input.
 *
 * <p>Arguments as used by this class: {@code args[0]} and {@code args[1]} are the two
 * input paths, {@code args[2]} is the output path (gzip-compressed).
 */
public class GaSubtractInterestMR extends CommonMapReduce {

    /** Marker written by the mapper for 3-column records (ga_device_total per the original comment). */
    private static final String SOURCE_GA_DEVICE = "1";

    /** Marker written by the mapper for 4-column records (dm_interest_tag per the original comment). */
    private static final String SOURCE_INTEREST_TAG = "2";

    /**
     * Creates the job description and forwards it unchanged to {@link CommonMapReduce}.
     *
     * @param name         job name
     * @param mapperClass  mapper implementation
     * @param reducerClass reducer implementation
     */
    public GaSubtractInterestMR(String name, Class<? extends Mapper> mapperClass,
                                Class<? extends Reducer> reducerClass) {
        super(name, mapperClass, reducerClass);
    }

    /**
     * Entry point: hands a configured job instance and the raw arguments to {@code start}.
     *
     * @param args command-line arguments, passed through to {@code start}
     * @throws Exception whatever {@code start} propagates
     */
    public static void main(String[] args) throws Exception {
        start(new GaSubtractInterestMR("ga subtract interest job", GaMapper.class, GaReducer.class), args);
    }

    /**
     * Tags each input record with the source it came from, decided purely by column count,
     * and keys it by the joined (device_id, platform) pair.
     */
    public static class GaMapper extends CommonMapper {

        /**
         * Emits {@code (device_id + platform) -> source marker}. Records whose column count is
         * neither 3 nor 4 are dropped without being written.
         */
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Limit -1 is passed through to the splitter exactly as before.
            String[] array = MRUtils.SPLITTER.split(value.toString(), -1);
            String source;
            if (array.length == 3) { // ga_device_total
                source = SOURCE_GA_DEVICE;
                outKey.set(MRUtils.JOINER.join(
                        array[0], array[1] // device_id, platform
                ));
            } else if (array.length == 4) { // dm_interest_tag
                source = SOURCE_INTEREST_TAG;
                outKey.set(MRUtils.JOINER.join(
                        array[0], array[2] // device_id, platform
                ));
            } else {
                return;
            }
            outValue.set(source);
            context.write(outKey, outValue);
        }
    }

    /**
     * Writes one output line for every key that has no {@code "2"} marker among its values,
     * i.e. every device that was not seen in the 4-column input.
     */
    public static class GaReducer extends CommonReducer {

        /** Default tag JSON per device type ("idfa" / "gaid"); filled in {@link #setup}. */
        private Map<String, String> tagMap;

        /** Builds the device-type to default-tag lookup once per reducer task. */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            tagMap = Maps.newHashMap();
            tagMap.put("idfa", "[{\"tag\":[{\"id\":\"34\",\"1\":\"Games\"}]}]");
            tagMap.put("gaid", "[{\"tag\":[{\"id\":\"69\",\"1\":\"Games\"}]}]");
        }

        /**
         * Emits {@code device_id, deviceType, platform, tag} when the key has no {@code "2"}
         * marker and its platform maps to a known device type; otherwise writes nothing.
         */
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Only the presence of the "2" marker matters, so stop at the first one.
            boolean hasInterestTag = false;
            for (Text text : values) {
                if (SOURCE_INTEREST_TAG.equals(text.toString())) {
                    hasInterestTag = true;
                    break;
                }
            }
            if (hasInterestTag) {
                return;
            }

            String[] array = MRUtils.SPLITTER.split(key.toString(), -1);
            if (array.length < 2) {
                // Key lacks a platform field; skip it, as the mapper does for malformed records,
                // rather than failing the task with an ArrayIndexOutOfBoundsException.
                return;
            }

            // Locale.ROOT keeps the comparison independent of the JVM default locale
            // (a Turkish default locale lower-cases "I" to a dotless "ı", breaking "ios").
            String platform = array[1].toLowerCase(Locale.ROOT);
            String deviceType = "";
            if (platform.equals("ios")) {
                deviceType = "idfa";
            } else if (platform.contains("a")) {
                // Loose match kept from the original: any platform containing the letter 'a'.
                deviceType = "gaid";
            }

            String tag = tagMap.get(deviceType);
            if (tag == null) {
                // Unrecognised platform: no default tag to write.
                return;
            }
            out.set(MRUtils.JOINER.join(
                    array[0], deviceType, array[1], tag
            ));
            context.write(out, NullWritable.get());
        }
    }

    /** Writes gzip-compressed output to the path given in {@code args[2]}. */
    @Override
    protected void setOutputPath(Job job, String[] args) throws IOException {
        FileOutputFormat.setOutputPath(job, new Path(args[2]));
        FileOutputFormat.setCompressOutput(job, true);
        FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
    }

    /** Registers both input paths, {@code args[0]} and {@code args[1]}, on the job. */
    @Override
    protected void setInputPath(Job job, String[] args) throws IOException {
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileInputFormat.addInputPath(job, new Path(args[1]));
    }
}