package mobvista.dmp.datasource.dsp.mapreduce; import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.google.gson.JsonArray; import com.google.gson.JsonElement; import com.google.gson.JsonObject; import mobvista.dmp.common.CommonMapReduce; import mobvista.dmp.common.CommonMapper; import mobvista.dmp.format.TextMultipleOutputFormat; import mobvista.dmp.util.MRUtils; import mobvista.prd.datasource.util.GsonUtil; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.compress.GzipCodec; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.codehaus.jackson.map.ObjectMapper; import java.io.IOException; import java.util.Map; import java.util.Set; import java.util.regex.Pattern; /** * author: houying * date : 17-2-17 * desc : * output: device_id, device_type, platform, country_code, ip, gender, birthday, maker, model, os_version, package_list, androidIds, datetime, segment */ public class DspDailyEtlMR extends CommonMapReduce { public enum Fields{ IDFA("idfa", 37), GAID("gaid", 34), PKG_NAME("package_name", 20), PLATFORM("platform",29), UPDATE_TIME("time",0), IP("ip",26), MAKER("maker",27), MODEL("model",28), OS_VERSION("os_version",30), COUNTRY_CODE("country_code",33), BIRTHDAY("birthday",39), GENDER("gender",40), EXT_ID("ext_id", 15), JSON_MSG("json_msg", 6); private String name ; private int idx; Fields(String name, int idx){ this.name = name; this.idx = idx; } public String getName(){ return name; } public int getIdx(){ return idx; } } public static class DspDailyEtlMapper extends CommonMapper { private Pattern iosPkgPtn = Pattern.compile("^\\d+$"); private Pattern adrPkgPtn = Pattern.compile("^[0-9a-zA-Z\\.]+$"); private Pattern deviceIdPtn = Pattern.compile("^[0-9a-fA-F]{8}(-[0-9a-fA-F]{4}){3}-[0-9a-fA-F]{12}$"); private Pattern idSplitPtn = Pattern.compile(","); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] array = MRUtils.SPLITTER.split(value.toString()); if (array.length < 52) { CommonMapReduce.setMetrics(context, "DMP", "dsp_log_fields_num_error", 1); return; } String platform = array[Fields.PLATFORM.getIdx()]; String packageName = array[Fields.PKG_NAME.getIdx()]; String idfa = array[Fields.IDFA.getIdx()]; String gaid = array[Fields.GAID.getIdx()]; String jsonMsg = array[Fields.JSON_MSG.getIdx()]; String deviceId = null; String deviceType = null; String geoInfo = null; String longitude = null; //经度 String latitude = null; //纬度 String segment = null; // double click segment // 去掉IOS中id开头包名中的id if (platform.equals("ios") && packageName.matches("^id\\d+$")) { packageName = packageName.replace("id", ""); } if (platform.equals("ios") && idfa.length() > 4 && checkPkgName("ios", packageName)) { deviceId = idfa; deviceType = "idfa"; } else if (platform.equals("android") && gaid.length() > 4 && checkPkgName("adr", packageName)) { deviceId = gaid; deviceType = "gaid"; } else { return; } if (!deviceIdPtn.matcher(deviceId).matches()) { setMetrics(context, "DMP", "device_id_illegal_format", 1); return; } String[] ids = idSplitPtn.split(array[Fields.EXT_ID.getIdx()], -1); if (ids.length <= 5) { return; } String androidId = ids[5]; if (jsonMsg.startsWith("{")) { //处理jsonMsg,获取geo属性值 JsonObject json = GsonUtil.String2JsonObject(jsonMsg); JsonElement element = json.get("device"); if (element != null && !element.isJsonNull()) { JsonElement geoElement = element.getAsJsonObject().get("geo"); if (geoElement != null && !geoElement.isJsonNull()) { geoInfo = geoElement.toString(); JsonObject geoJson = geoElement.getAsJsonObject(); JsonElement lonElement = geoJson.get("lon"); if (lonElement != null && !lonElement.isJsonNull()) { longitude = lonElement.toString(); } JsonElement latElement = geoJson.get("lat"); if (latElement != null && !latElement.isJsonNull()) { latitude = latElement.toString(); } } } // 获取segment信息 JsonElement userElement = json.get("user"); if (userElement != null && !userElement.isJsonNull()) { JsonElement dataElement = userElement.getAsJsonObject().get("data"); if (dataElement != null && !dataElement.isJsonNull()) { JsonArray dataArray = dataElement.getAsJsonArray(); for (JsonElement dataEle : dataArray) { JsonElement segElement = dataEle.getAsJsonObject().get("segment"); if (segElement != null && !segElement.isJsonNull()) { segment = segElement.toString(); } } } } } outKey.set(MRUtils.JOINER.join(deviceId, deviceType)); outValue.set(MRUtils.JOINER.join( platform, array[Fields.COUNTRY_CODE.getIdx()], array[Fields.IP.getIdx()], array[Fields.GENDER.getIdx()], array[Fields.BIRTHDAY.getIdx()], array[Fields.MAKER.getIdx()], array[Fields.MODEL.getIdx()], array[Fields.OS_VERSION.getIdx()], array[Fields.PKG_NAME.getIdx()], //8 androidId, array[Fields.UPDATE_TIME.getIdx()], //10 geoInfo, longitude, latitude, segment )); // size = 15 context.write(outKey, outValue); } private boolean checkPkgName(String platform, String pkg) { switch (platform) { case "ios": return iosPkgPtn.matcher(pkg).matches() || adrPkgPtn.matcher(pkg).matches(); case "adr": return adrPkgPtn.matcher(pkg).matches(); default: return false; } } } public static class DspDailyEtlReducer extends Reducer<Text, Text, Text, Text> { private ObjectMapper objectMapper = new ObjectMapper(); private Text outKey = new Text(); private Text outValue = new Text(); private String outPath; private String detailOutPath; private StringBuilder builder = new StringBuilder(); @Override protected void setup(Context context) throws IOException, InterruptedException { Configuration conf = context.getConfiguration(); detailOutPath = conf.get("detailOutPath"); outPath = conf.get("outPath"); } @Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { Set<String> pkgSet = Sets.newHashSet(); Map<String, SegmentVO> segmentMap = Maps.newHashMap(); String[] outFields = null; //new String[11]; Set<String> androidId = Sets.newHashSet(); for (Text value: values) { String line = value.toString(); String[] array = MRUtils.SPLITTER.split(line, -1); if (array.length != 15) { continue; } if (outFields == null) { outFields = new String[13]; outFields[0] = key.toString(); //device_id \t device_type for (int i = 1; i < 12; i++) { outFields[i] = array[i - 1]; } } pkgSet.add(array[8]); if (!array[9].isEmpty()) { androidId.add(array[9]); } builder.setLength(0); builder.append(detailOutPath) .append(",") //用于分隔输出路径和输出数据真实key,逗号左边为路径,后边为key .append(key.toString()) //device_id + "\t" + device_type .append("\t") .append(array[0]); //platform outKey.set(builder.toString()); outValue.set(MRUtils.JOINER.join( array[10], // time array[2], // ip array[11], // geo array[12], // longitude array[13] // latitude )); String segment = array[14]; if (segment.startsWith("[") && segment.contains("{")) { SegmentVO segmentVO = null; JsonArray segArray = GsonUtil.String2JsonArray(segment); for (JsonElement segElement : segArray) { try { segmentVO = GsonUtil.fromJson(segElement, SegmentVO.class); segmentMap.put(segmentVO.getId(), segmentVO); } catch (Exception e) { e.printStackTrace(); context.getCounter("DMP", "segment id error").increment(1l); } } } context.write(outKey, outValue); } if (outFields != null) { outFields[9] = objectMapper.writeValueAsString(pkgSet); outFields[10] = MRUtils.join(androidId, ","); if (!segmentMap.isEmpty()) { outFields[12] = objectMapper.writeValueAsString(segmentMap.values()); } else { outFields[12] = ""; } outKey.set(outPath + ", "); outValue.set(MRUtils.join(outFields, "\t")); context.write(outKey, outValue); } } } @Override protected void otherSetting(Job job, String[] args) throws Exception { job.setOutputFormatClass(TextMultipleOutputFormat.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); } @Override protected void setOutputPath(Job job, String[] args) throws IOException { FileOutputFormat.setOutputPath(job, new Path(args[1])); FileOutputFormat.setCompressOutput(job, true); FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class); } @Override protected void setInputPath(Job job, String[] args) throws IOException { FileInputFormat.addInputPath(job, new Path(args[0])); } public DspDailyEtlMR(String name, Class<? extends Mapper> mapperClass, Class<? extends Reducer> reducerClass) { super(name, mapperClass, reducerClass); } public static void main(String[] args) throws Exception { start(new DspDailyEtlMR("dsp daily etl job", DspDailyEtlMapper.class, DspDailyEtlReducer.class), args); } }