package mobvista.dmp.datasource.adn.mapreduce; import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.collect.Maps; import com.google.common.collect.Sets; import mobvista.dmp.common.CommonMapReduce; import mobvista.dmp.common.CommonMapper; import mobvista.dmp.common.CommonReducer; import mobvista.dmp.util.MD5Util; import mobvista.dmp.util.MRUtils; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.compress.GzipCodec; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.net.URI; import java.util.Map; import java.util.Set; /** * author: walt * date : 16-11-29 * desc : 从ods_adn_trackingnew_request中抽取appid字段,匹配出package_name输出到etl_adn_sdk_request_daily */ public class AdnSdkRequestPkgDailyMR extends CommonMapReduce { private static final Logger logger = LoggerFactory.getLogger(AdnSdkRequestPkgDailyMR.class); public AdnSdkRequestPkgDailyMR(String name, Class<? extends Mapper> mapperClass, Class<? extends Reducer> reducerClass) { super(name, mapperClass, reducerClass); } public static void main(String[] args) throws Exception { start(new AdnSdkRequestPkgDailyMR("adn sdk request pkg daily job", AdnSdkMapper.class, AdnSdkReducer.class), args); } public static class AdnSdkMapper extends CommonMapper { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] array = MRUtils.SPLITTER.split(value.toString(), -1); /* if (array.length >= 54) { String gaid = array[42]; String idfa = array[43]; String imei = array[35]; String model = array[16]; String osVersion = array[14]; String androidId = array[37]; String platform = array[13]; String appId = array[4]; String extSysid = ""; if (array.length >= 118) { extSysid = array[117]; } */ if (array.length >= 22) { String gaid = array[15]; String idfa = array[16]; String imei = array[12]; String model = array[7]; String osVersion = array[5]; String androidId = array[14]; String platform = array[4]; String appId = array[3]; String extSysid = array[18]; String oaid = array[21]; String idfv = ""; if (array.length >= 23) { idfv = array[22]; } String ruid = ""; if (array.length >= 24) { ruid = array[23]; } if ("android".equals(platform) && !gaid.isEmpty()) { CommonMapReduce.setMetrics(context, "DMP", "gaid", 1); if ("0".equals(gaid)) { CommonMapReduce.setMetrics(context, "DMP", "gaid_zero", 1); } } if ("android".equals(platform) && !imei.isEmpty()) { CommonMapReduce.setMetrics(context, "DMP", "imei", 1); if ("0".equals(imei)) { CommonMapReduce.setMetrics(context, "DMP", "imei_zero", 1); } } if ("ios".equals(platform)) { CommonMapReduce.setMetrics(context, "DMP", "ios", 1); if ("0".equals(idfa)) { CommonMapReduce.setMetrics(context, "DMP", "idfa_zero", 1); } } outValue.set(MRUtils.JOINER.join(appId, model, osVersion)); //新增自有Id String sysIdType = GetDevIdUtil.getExtSysId(extSysid); if (StringUtils.isNotBlank(sysIdType)) { outKey.set(MRUtils.JOINER.join(sysIdType, platform.toLowerCase())); context.write(outKey, outValue); } String devIdType = GetDevIdUtil.getIdByIdAndPlatform(gaid, idfa, extSysid, platform); if (StringUtils.isNotBlank(devIdType)) { outKey.set(MRUtils.JOINER.join(devIdType, platform.toLowerCase())); context.write(outKey, outValue); } if (!imei.isEmpty() && imei.matches(CommonMapReduce.imeiPtn) && "android".equals(platform)) { outKey.set(MRUtils.JOINER.join(imei, "imei", platform.toLowerCase())); context.write(outKey, outValue); // 卓萌需求,imei转换成imeimd5类型 2020.03.16 try { String imeiMd5 = MD5Util.getMD5Str(imei); outKey.set(MRUtils.JOINER.join(imeiMd5, "imeimd5", platform.toLowerCase())); context.write(outKey, outValue); } catch (Exception e) { e.printStackTrace(); } } /* if (gaid.matches(CommonMapReduce.didPtn) && !gaid.equals(CommonMapReduce.allZero)) { outKey.set(MRUtils.JOINER.join(gaid, "gaid", platform.toLowerCase())); context.write(outKey, outValue); } if (idfa.matches(CommonMapReduce.didPtn) && !idfa.equals(CommonMapReduce.allZero)) { outKey.set(MRUtils.JOINER.join(idfa, "idfa", platform.toLowerCase())); context.write(outKey, outValue); } */ if (!androidId.isEmpty() && androidId.matches(CommonMapReduce.andriodIdPtn) && "android".equals(platform)) { outKey.set(MRUtils.JOINER.join(androidId, "androidid", "android")); context.write(outKey, outValue); } // 卓盟需求,新增oaid字段 2020.07.27 if (!oaid.isEmpty() && !CommonMapReduce.allZero.equals(oaid) && "android".equals(platform) && !oaid.matches("^0+$")) { outKey.set(MRUtils.JOINER.join(oaid, "oaid", "android")); context.write(outKey, outValue); } // 算法需求,新增 idfv 字段 2020.08.03 if (StringUtils.isNotBlank(idfv) && !CommonMapReduce.allZero.equals(idfv) && "ios".equals(platform) && idfv.matches(didPtn)) { outKey.set(MRUtils.JOINER.join(idfv, "idfv", "ios")); context.write(outKey, outValue); } // 算法需求,新增 ruid 字段 2021.02.25 if (StringUtils.isNotBlank(ruid) && ruid.length() > 16) { outKey.set(MRUtils.JOINER.join(ruid, "ruid", "ios")); context.write(outKey, outValue); } } } } public static class AdnSdkReducer extends CommonReducer { private Map<String, String> appIdMap; private String date; @Override protected void setup(Context context) throws IOException, InterruptedException { appIdMap = Maps.newHashMap(); for (URI uri : context.getCacheFiles()) { logger.info("load cache {}", uri.getPath()); FileSystem fileSystem = FileSystem.get(uri, context.getConfiguration()); BufferedReader reader = new BufferedReader(new InputStreamReader(fileSystem.open(new Path(uri.getPath())))); String line; while ((line = reader.readLine()) != null) { String[] array = line.split(","); if (array[1].isEmpty() || array[1].equals("null")) { continue; } appIdMap.put(array[0], array[1]); } reader.close(); } logger.info("app id map: {}", appIdMap); date = Preconditions.checkNotNull(context.getConfiguration().get("task.date")); } @Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { String model = ""; String osVersion = ""; String device = key.toString(); Set<String> set = Sets.newHashSet(); for (Text text : values) { // 如此去重当数据倾斜时间会出现OOM问题 String[] valSplits = MRUtils.SPLITTER.split(text.toString(), -1); set.add(valSplits[0]); model = valSplits[1]; osVersion = valSplits[2]; } for (String appId : set) { String pkgName = Strings.nullToEmpty(appIdMap.get(appId)); if (pkgName.isEmpty()) { //no pkgname for this appid CommonMapReduce.setMetrics(context, "DMP", "appid_no_pkg", 1); } out.set(MRUtils.JOINER.join( device, appId, pkgName, date, model, osVersion )); //device_id device_type platform app_id pkg_name date context.write(out, NullWritable.get()); } } } @Override protected void otherSetting(Job job, String[] args) throws Exception { Path mapping = new Path(args[1]); for (FileStatus status : mapping.getFileSystem(job.getConfiguration()).listStatus(mapping)) { logger.info("add file {} into cache", status.getPath().toString()); job.addCacheFile(new URI(status.getPath().toString())); } // 添加人工appid与package对应关系文件 job.addCacheFile(URI.create(args[3])); } @Override protected void setOutputPath(Job job, String[] args) throws IOException { FileOutputFormat.setOutputPath(job, new Path(args[2])); FileOutputFormat.setCompressOutput(job, true); FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class); } @Override protected void setInputPath(Job job, String[] args) throws IOException { FileInputFormat.addInputPath(job, new Path(args[0])); } }