package mobvista.dmp.datasource.ga.mapreduce; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.collect.Maps; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.compress.GzipCodec; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.Map; import java.util.regex.Pattern; /** * author: houying * date : 16-10-9 * desc : */ public class GaDeviceTotalMR extends Configured implements Tool { private static Pattern splitPtn = Pattern.compile("\t"); private static int GENDER_INDEX_IN_TOTAL = 26; private static final int DAILY_LENGTH = 32; public static class GaDeviceTotalMapper extends Mapper<LongWritable, Text, Text, Text> { private Pattern idfaPtn = Pattern.compile("^[0-9A-F\\-]+$"); private final Text text = new Text(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] array = splitPtn.split(value.toString()); if (array.length == 33 || array.length == DAILY_LENGTH) { String deviceId = array[0]; String platform = array[2]; platform = fixPlatform(platform, deviceId); String deviceType = getDeviceType(platform); text.set(deviceId + "\t" + deviceType); context.write(text, value); } } private String getDeviceType(String platform) { switch (platform) { case "ios": return "idfa"; case "android": return "gaid"; case "adr": return "gaid"; default: return "unknown"; } } private String fixPlatform(String platform, String deviceId) { if (idfaPtn.matcher(deviceId).matches()) { return "ios"; } return platform; } } public static class GaDeviceTotalReducer extends Reducer<Text, Text, Text, NullWritable> { private static final Logger logger = LoggerFactory.getLogger(GaDeviceTotalReducer.class); private String date = null; private Joiner joiner = Joiner.on("\t").useForNull(""); private Map<String, String> genderMap; @Override protected void setup(Context context) throws IOException, InterruptedException { date = context.getConfiguration().get("task.date"); genderMap = Maps.newHashMap(); genderMap.put("F", "F"); genderMap.put("FEMALE", "F"); genderMap.put("X", "F"); genderMap.put("M", "M"); genderMap.put("MALE", "M"); genderMap.put("Y", "M"); } @Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { String[] idAndType = splitPtn.split(key.toString()); if (idAndType.length != 2) { return; } String[] outputArray = null; String[] newRecord = null; for (Text value: values) { String[] array = splitPtn.split(value.toString()); if (outputArray == null && array.length == 33) { if(org.apache.commons.lang.StringUtils.isNotBlank(array[3]) && array[3].equalsIgnoreCase("GB") ){ array[3] = "UK"; } outputArray = array; } else if (newRecord == null && array.length == DAILY_LENGTH) { newRecord = array; } } // 将UK改为GB,明远需求,修改人冯亮 20170815 if (newRecord != null && newRecord.length >= 4) { String countryCode = newRecord[3]; if ("GB".equalsIgnoreCase(countryCode)) { newRecord[3] = "UK"; } } outputArray = merge(newRecord, outputArray); outputArray[0] = idAndType[0]; outputArray[1] = idAndType[1]; standardizingGender(outputArray); context.write(new Text(joiner.join(outputArray)), NullWritable.get()); } private void standardizingGender(String[] outputArray) { String gender = outputArray[GENDER_INDEX_IN_TOTAL]; //gender gender = genderMap.get(gender.toUpperCase()); if (gender != null) { outputArray[GENDER_INDEX_IN_TOTAL] = gender; } } private String[] merge(String[] newRecord, String[] outputArray) { Preconditions.checkArgument(newRecord != null || outputArray != null, "both newRecord and outputArray are null"); if (newRecord == null) { return outputArray; } if (outputArray == null) { outputArray = new String[33]; //copy System.arraycopy(newRecord, 2, outputArray, 2, 4); System.arraycopy(newRecord, 6, outputArray, 7, 25); //set first_payed if (!Strings.isNullOrEmpty(newRecord[5]) && isNumeric(newRecord[5]) && Integer.valueOf(newRecord[5]) > 0) { outputArray[6] = date; } //set update_time outputArray[32] = date; //return return outputArray; } //neither newRecord nor outputArray is null if (Strings.isNullOrEmpty(outputArray[32]) || outputArray[32].equals("NULL") ) { outputArray[32] = date; } if (outputArray[32].compareTo(date) >= 0) { return outputArray; } String createTime = outputArray[4]; System.arraycopy(newRecord, 2, outputArray, 2, 4); outputArray[4] = createTime; System.arraycopy(newRecord, 6, outputArray, 7, 25); //set first_payed if (!Strings.isNullOrEmpty(newRecord[5]) && isNumeric(newRecord[5]) && Integer.valueOf(newRecord[5]) > 0) { outputArray[6] = date; } outputArray[32] = date; //update time return outputArray; } } public static boolean isNumeric(String s) { if (s != null && !"".equals(s.trim())) return s.matches("^[0-9]*$"); else return false; } public static void main(String[] args) throws Exception { System.exit(ToolRunner.run(new Configuration(), new GaDeviceTotalMR(), args)); } @Override public int run(String[] args) throws Exception { Configuration conf = getConf(); conf.set("mapreduce.task.io.sort.mb", "500"); conf.set("mapreduce.reduce.java.opts", "-Xmx1536m"); conf.set("mapreduce.reduce.memory.mb", "2048"); conf.set("mapreduce.reduce.shuffle.parallelcopies", "50"); Job job = Job.getInstance(conf, "ga device total"); job.setJarByClass(GaDeviceTotalMR.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileInputFormat.addInputPath(job, new Path(args[1])); Path outputPath = new Path(args[2]); FileSystem fileSystem = outputPath.getFileSystem(conf); if (fileSystem.exists(outputPath)) { fileSystem.delete(outputPath, true); } FileOutputFormat.setOutputPath(job, outputPath); FileOutputFormat.setCompressOutput(job, true); FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class); job.setMapperClass(GaDeviceTotalMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setReducerClass(GaDeviceTotalReducer.class); job.setOutputFormatClass(TextOutputFormat.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); return job.waitForCompletion(true) ? 0 : 1; } }