package mobvista.dmp.datasource.dsp.mapreduce; import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.google.gson.JsonArray; import com.google.gson.JsonElement; import mobvista.dmp.common.CommonMapReduce; import mobvista.dmp.format.TextMultipleOutputFormat; import mobvista.dmp.util.MD5Util; import mobvista.dmp.util.MRUtils; import mobvista.prd.datasource.util.GsonUtil; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.compress.GzipCodec; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.apache.orc.mapred.OrcStruct; import org.codehaus.jackson.map.ObjectMapper; import java.io.IOException; import java.util.Map; import java.util.Set; import java.util.regex.Pattern; import static mobvista.dmp.common.CommonMapReduce.setMetrics; /** * author: zhi.liu * date : 18-07-18 * desc : * output: device_id, device_type, platform, country_code, ip, gender, birthday, maker, model, os_version, package_list, androidIds, datetime, segment */ public class DspDailyEtlMR3Demo extends Configured implements Tool { /** * struct<idfa:string, * gaid:string, * platform:string, * country:string, * ip:string, * gender:string, * birthday:string, * maker:string, * model:string, * osVersion:string, * packageName:string, * exitId:string, * time:string, * geoInfo:string, * longitude:string, * latitude:string> */ public enum Fields { IDFA("idfa", 0), GAID("gaid", 1), PLATFORM("platform", 2), COUNTRY("country", 3), IP("ip", 4), GENDER("gender", 5), BIRTHDAY("birthday", 6), MAKER("maker", 7), MODEL("model", 8), OS_VERSION("osVersion", 9), PKG_NAME("package_name", 10), EXT_ID("ext_id", 11), UPDATE_TIME("time",12), GEO_INFO("geoInfo", 13), LONGITUDE("longitude", 14), LATITUDE("latitude", 15), SEGMET("segment", 16), REGION("region", 17); private String name; private int idx; Fields(String name, int idx) { this.name = name; this.idx = idx; } public String getName() { return name; } public int getIdx() { return idx; } } public static class DspDailyEtlMapper3 extends Mapper<NullWritable, OrcStruct, Text, Text> { private Pattern iosPkgPtn = Pattern.compile("^\\d+$"); private Pattern adrPkgPtn = Pattern.compile("^[0-9a-zA-Z\\.]+$"); private Pattern deviceIdPtn = Pattern.compile("^[0-9a-fA-F]{8}(-[0-9a-fA-F]{4}){3}-[0-9a-fA-F]{12}$"); private Pattern idSplitPtn = Pattern.compile(","); private Text outKey = new Text(); private Text outValue = new Text(); @Override protected void map(NullWritable key, OrcStruct value, Context context) throws IOException, InterruptedException { if (value.getNumFields() < 16) { setMetrics(context, "DMP", "dsp_log_fields_num_error", 1); return; } String region =""; if (value.getFieldValue(Fields.REGION.getIdx()) != null) { region = value.getFieldValue(Fields.REGION.getIdx()).toString(); } String idfa = ""; if (value.getFieldValue(Fields.IDFA.getIdx()) != null) { String idfaTmp = ((Text) value.getFieldValue(Fields.IDFA.getIdx())).toString(); if (StringUtils.isNotBlank(idfaTmp) && deviceIdPtn.matcher(idfaTmp).matches()) { idfa = idfaTmp; } else { setMetrics(context, "DMP", "idfa_illegal_format", 1); } } String gaid = ""; if (value.getFieldValue(Fields.GAID.getIdx()) != null) { String gaidTmp = ((Text) value.getFieldValue(Fields.GAID.getIdx())).toString(); if (StringUtils.isNotBlank(gaidTmp) && deviceIdPtn.matcher(gaidTmp).matches()) { gaid = gaidTmp; } else { setMetrics(context, "DMP", "gaid_illegal_format", 1); } } String countryCode = ""; if (value.getFieldValue(Fields.COUNTRY.getIdx()) != null) { countryCode = value.getFieldValue(Fields.COUNTRY.getIdx()).toString(); } String platform = ""; if (value.getFieldValue(Fields.PLATFORM.getIdx()) != null) { platform = ((Text) value.getFieldValue(Fields.PLATFORM.getIdx())).toString(); } else { context.getCounter("DMP", "platformIsNull").increment(1l); // platform为空则不处理 return; } // String ids = ""; String androidId = ""; String imei = ""; String imeimd5 = ""; // gaid,idfa,idfamd5,idfasha1,imei,imeimd5,imeisha1,androidid,androididmd5,androididsha1,macmd5,macsha1 if (value.getFieldValue(Fields.EXT_ID.getIdx()) != null) { String ids = ((Text) value.getFieldValue(Fields.EXT_ID.getIdx())).toString(); if (StringUtils.isNotBlank(ids)) { String[] devIds = idSplitPtn.split(((Text) value.getFieldValue(Fields.EXT_ID.getIdx())).toString(), -1); if (devIds.length >= 8) { //!androidid.isEmpty() && androidid.matches(CommonMapReduce.andriodIdPtn) if (StringUtils.isNotBlank(devIds[7]) && devIds[7].matches(CommonMapReduce.andriodIdPtn)) { androidId = devIds[7]; } if (StringUtils.isNotBlank(devIds[4]) && devIds[4].matches(CommonMapReduce.imeiPtn) && "android".equals(platform) && StringUtils.isNotBlank(countryCode) && "CN".equalsIgnoreCase(countryCode)) { imei = devIds[4]; } if (StringUtils.isNotBlank(devIds[5]) && devIds[5].matches(CommonMapReduce.imeiMd5Ptn) && "android".equals(platform) && StringUtils.isNotBlank(countryCode) && "CN".equalsIgnoreCase(countryCode)) { imeimd5 = devIds[5]; } } else { // 设置计数器 setMetrics(context, "DMP", "extIdlength<8", 1); } } else { setMetrics(context, "DMP", "EXT_IDisNull", 1); } } // 过滤掉gaid和idfa全为空的情况 if (StringUtils.isBlank(gaid) && StringUtils.isBlank(idfa) && StringUtils.isBlank(androidId) && StringUtils.isBlank(imei) && StringUtils.isBlank(imeimd5)) { context.getCounter("DMP", "deviceIdError").increment(1l); // 不能同时为空 return; } String packageName = ""; if (value.getFieldValue(Fields.PKG_NAME.getIdx()) != null) { packageName = value.getFieldValue(Fields.PKG_NAME.getIdx()).toString(); } String deviceId = ""; String deviceType = null; String geoInfo = null; if (value.getFieldValue(Fields.GEO_INFO.getIdx()) != null) { geoInfo = value.getFieldValue(Fields.GEO_INFO.getIdx()).toString(); } String longitude = null; //经度 String latitude = null; //纬度 String segment = null; // double click segment if (value.getFieldValue(Fields.LONGITUDE.getIdx()) != null) { longitude = value.getFieldValue(Fields.LONGITUDE.getIdx()).toString(); } if (value.getFieldValue(Fields.LATITUDE.getIdx()) != null) { latitude = value.getFieldValue(Fields.LATITUDE.getIdx()).toString(); } if (value.getFieldValue(Fields.SEGMET.getIdx()) != null) { segment = value.getFieldValue(Fields.SEGMET.getIdx()).toString(); } // 去掉IOS中id开头包名中的id if (platform.equals("ios") && packageName.matches("^id\\d+$")) { packageName = packageName.replace("id", ""); } if (platform.equals("ios") && idfa.length() > 4 && checkPkgName("ios", packageName) && !CommonMapReduce.allZero.equals(idfa)) { deviceId = idfa; deviceType = "idfa"; } else if (platform.equals("android") && gaid.length() > 4 && checkPkgName("adr", packageName) && !CommonMapReduce.allZero.equals(gaid)) { deviceId = gaid; deviceType = "gaid"; } if (value.getFieldValue(Fields.PKG_NAME.getIdx()) != null) { packageName = value.getFieldValue(Fields.PKG_NAME.getIdx()).toString(); } String ip = ""; if (value.getFieldValue(Fields.IP.getIdx()) != null) { ip = value.getFieldValue(Fields.IP.getIdx()).toString(); } String gender = ""; if (value.getFieldValue(Fields.GENDER.getIdx()) != null) { gender = value.getFieldValue(Fields.GENDER.getIdx()).toString(); } String birthday = ""; if (value.getFieldValue(Fields.BIRTHDAY.getIdx()) != null) { birthday = value.getFieldValue(Fields.BIRTHDAY.getIdx()).toString(); } String maker = ""; if (value.getFieldValue(Fields.MAKER.getIdx()) != null) { maker = value.getFieldValue(Fields.MAKER.getIdx()).toString(); } String model = ""; if (value.getFieldValue(Fields.MODEL.getIdx()) != null) { model = value.getFieldValue(Fields.MODEL.getIdx()).toString(); } String osVersion = ""; if (value.getFieldValue(Fields.OS_VERSION.getIdx()) != null) { osVersion = value.getFieldValue(Fields.OS_VERSION.getIdx()).toString(); } String updateTime = ""; if (value.getFieldValue(Fields.UPDATE_TIME.getIdx()) != null) { updateTime = value.getFieldValue(Fields.UPDATE_TIME.getIdx()).toString(); } if (!deviceId.isEmpty()) { outKey.set(MRUtils.JOINER.join(deviceId, deviceType)); outValue.set(MRUtils.JOINER.join( platform, countryCode, ip, gender, birthday, maker, model, osVersion, packageName, androidId, updateTime, geoInfo, longitude, latitude, segment, region )); // size = 16 context.write(outKey, outValue); } if (!androidId.isEmpty()) { outKey.set(MRUtils.JOINER.join(androidId, "androidid")); outValue.set(MRUtils.JOINER.join( "android", countryCode, ip, gender, birthday, maker, model, osVersion, packageName, androidId, updateTime, geoInfo, longitude, latitude, segment, region )); // size = 15 context.write(outKey, outValue); } if (StringUtils.isNotBlank(imei)) { outKey.set(MRUtils.JOINER.join(imei, "imei")); outValue.set(MRUtils.JOINER.join( "android", countryCode, ip, gender, birthday, maker, model, osVersion, packageName, androidId, updateTime, geoInfo, longitude, latitude, segment, region )); context.write(outKey, outValue); // 成卓萌新增需求,该需求时间 2020-03-02 imei明文要变成md5类型数据输出 try { String imeiMd5 = MD5Util.getMD5Str(imei); outKey.set(MRUtils.JOINER.join(imeiMd5, "imeimd5")); outValue.set(MRUtils.JOINER.join( "android", countryCode, ip, gender, birthday, maker, model, osVersion, packageName, androidId, updateTime, geoInfo, longitude, latitude, segment, region )); context.write(outKey, outValue); } catch (Exception e) { e.printStackTrace(); } } if (StringUtils.isNotBlank(imeimd5)) { outKey.set(MRUtils.JOINER.join(imeimd5, "imeimd5")); outValue.set(MRUtils.JOINER.join( "android", countryCode, ip, gender, birthday, maker, model, osVersion, packageName, androidId, updateTime, geoInfo, longitude, latitude, segment, region )); context.write(outKey, outValue); } } private boolean checkPkgName(String platform, String pkg) { switch (platform) { case "ios": return iosPkgPtn.matcher(pkg).matches() || adrPkgPtn.matcher(pkg).matches(); case "adr": return adrPkgPtn.matcher(pkg).matches(); default: return false; } } } public static class DspDailyEtlReducer3 extends Reducer<Text, Text, Text, Text> { private ObjectMapper objectMapper = new ObjectMapper(); private Text outKey = new Text(); private Text outValue = new Text(); private String outPath; private String detailOutPath; private StringBuilder builder = new StringBuilder(); @Override protected void setup(Context context) throws IOException, InterruptedException { Configuration conf = context.getConfiguration(); detailOutPath = conf.get("detailOutPath"); outPath = conf.get("outPath"); } @Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { Set<String> pkgSet = Sets.newHashSet(); Set<String> regionSet = Sets.newHashSet(); Map<String, SegmentVO> segmentMap = Maps.newHashMap(); String[] outFields = null; //new String[11]; Set<String> androidId = Sets.newHashSet(); for (Text value : values) { String line = value.toString(); String[] array = MRUtils.SPLITTER.split(line, -1); if (array.length != 16) { continue; } if (outFields == null) { outFields = new String[14]; outFields[0] = key.toString(); //device_id \t device_type for (int i = 1; i < 12; i++) { outFields[i] = array[i - 1]; } } String packageNames= array[8]; if(StringUtils.isNotBlank(packageNames)){ String [] pkgNames = packageNames.split("#",-1); if(pkgNames != null && pkgNames.length != 0){ for(int i=0;i< pkgNames.length;i++){ pkgSet.add(pkgNames[i]); } } } if (!array[9].isEmpty()) { androidId.add(array[9]); } /*if(!array[15].isEmpty()){ regionSet.add(array[15]); }*/ String region= array[15]; if(StringUtils.isNotBlank(region)){ String [] regions = region.split("#",-1); if(regions != null && regions.length != 0){ for(int i=0;i< regions.length;i++){ regionSet.add(regions[i]); } } } builder.setLength(0); builder.append(detailOutPath) .append(",") //用于分隔输出路径和输出数据真实key,逗号左边为路径,后边为key .append(key.toString()) //device_id + "\t" + device_type .append("\t") .append(array[0]); //platform outKey.set(builder.toString()); outValue.set(MRUtils.JOINER.join( array[10], // time array[2], // ip array[11], // geo array[12], // longitude array[13] // latitude )); String segment = array[14]; if (segment.startsWith("[") && segment.contains("{")) { SegmentVO segmentVO = null; JsonArray segArray = GsonUtil.String2JsonArray(segment); for (JsonElement segElement : segArray) { try { segmentVO = GsonUtil.fromJson(segElement, SegmentVO.class); segmentMap.put(segmentVO.getId(), segmentVO); } catch (Exception e) { e.printStackTrace(); context.getCounter("DMP", "segment id error").increment(1l); } } } context.write(outKey, outValue); } if (outFields != null) { outFields[9] = objectMapper.writeValueAsString(pkgSet); outFields[10] = MRUtils.join(androidId, ","); if (!segmentMap.isEmpty()) { outFields[12] = objectMapper.writeValueAsString(segmentMap.values()); } else { outFields[12] = ""; } if (!regionSet.isEmpty()) { outFields[13] = objectMapper.writeValueAsString(regionSet); } else { outFields[13] = ""; } outKey.set(outPath + ", "); outValue.set(MRUtils.join(outFields, "\t")); context.write(outKey, outValue); } } } @Override public int run(String[] args) throws Exception { Configuration conf = getConf(); conf.set("mapreduce.map.speculative", "true"); conf.set("mapreduce.reduce.speculative", "true"); conf.set("mapreduce.task.io.sort.mb", "500"); conf.set("mapreduce.reduce.shuffle.parallelcopies", "50"); Job job = Job.getInstance(conf, "dsp orc daily etl job"); job.setJarByClass(DspDailyEtlMR3Demo.class); setInputPath(job, args); setOutputPath(job, args); Path outputPath = FileOutputFormat.getOutputPath(job); FileSystem fileSystem = outputPath.getFileSystem(job.getConfiguration()); if (fileSystem.exists(outputPath)) { fileSystem.delete(outputPath, true); } job.setMapperClass(DspDailyEtlMapper3.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setReducerClass(DspDailyEtlReducer3.class); job.setOutputFormatClass(TextMultipleOutputFormat.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); return job.waitForCompletion(true) ? 0 : 1; } protected void setOutputPath(Job job, String[] args) throws IOException { FileOutputFormat.setOutputPath(job, new Path(args[1])); FileOutputFormat.setCompressOutput(job, true); FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class); } protected void setInputPath(Job job, String[] args) throws IOException { FileInputFormat.addInputPath(job, new Path(args[0])); } public static void main(String[] args) throws Exception { System.exit(ToolRunner.run(new Configuration(), new DspDailyEtlMR3Demo(), args)); } }