package mobvista.dmp.datasource.ga.mapreduce.map; import mobvista.dmp.common.CommonMapReduce; import mobvista.dmp.datasource.ga.mapreduce.vo.TextPair; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import java.io.IOException; import java.util.regex.Pattern; /** * * Created by fl on 2017/4/26. */ public class GaActiveTotalMapper extends Mapper<LongWritable, Text, TextPair, Text> { private String dataType; private String dataSplit = "\t"; private TextPair outKey = new TextPair(); private Text outValue = new Text(); private Pattern pattern = Pattern.compile(dataSplit); private Pattern dayPattern = Pattern.compile("\\|"); private StringBuilder keyBuilder = new StringBuilder(); private StringBuilder valBuilder = new StringBuilder(); @Override protected void setup(Context context) throws IOException, InterruptedException { FileSplit split = (FileSplit) context.getInputSplit(); if (split.getPath().toString().contains("dau-device-data-export")) { dataType = "0"; // daily data } else { dataType = "1"; // total data } System.out.println(split.getPath().toString()); } @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String valStr = value.toString(); try { String[] valSplits = null; if ("0".equals(dataType)) { valSplits = dayPattern.split(value.toString(), -1); } else { valSplits = pattern.split(value.toString(), -1); } if (valSplits.length >= 2) { String deviceId = valSplits[0]; String platform = valSplits[1]; if (StringUtils.isNotEmpty(platform) && deviceId.matches(CommonMapReduce.didPtn) && !deviceId.equals(CommonMapReduce.allZero)) { keyBuilder.setLength(0); keyBuilder.append(deviceId).append(dataSplit).append(platform.toLowerCase()); int count = 0; valBuilder.setLength(0); for (String split : valSplits) { if (valBuilder.length() == 0) { valBuilder.append(split); } else { if (count == 1) { // convert platform to lowerCase valBuilder.append(dataSplit).append(split.toLowerCase()); } else { valBuilder.append(dataSplit).append(split); } } count++; } outKey.set(keyBuilder.toString(), dataType); outValue.set(valBuilder.toString()); context.write(outKey, outValue); } } else { context.getCounter("DMP", "dataLengthError").increment(1l); } } catch (Exception e) { System.out.println("type = " + dataType); System.out.println("val = " + valStr); throw new RuntimeException(e); } } }