package mobvista.dmp.datasource.adn.mapreduce;

import mobvista.dmp.common.CommonMapReduce;
import mobvista.dmp.common.ListCache;
import mobvista.dmp.util.MRUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
import java.util.regex.Pattern;

/**
 * author: houying
 * date  : 16-11-1
 * desc  : Joins the ADN daily install log with campaign_list (on campaign id)
 *         to resolve the package name of each installing device.
 */
public class AdnInstallDailyMR extends Configured implements Tool {

    /** 8-4-4-4-12 hexadecimal device-id layout; used here to validate the idfv. */
    private static final Pattern DID_PTN =
            Pattern.compile("^[0-9a-fA-F]{8}(-[0-9a-fA-F]{4}){3}-[0-9a-fA-F]{12}$");
    /** Package name written as {@code id<digits>}. */
    private static final Pattern ID_IOS_PKG_PTN = Pattern.compile("^id\\d+$");
    /** Package name written as digits only. */
    private static final Pattern IOS_PKG_PTN = Pattern.compile("^\\d+$");
    /** Package name made of letters, digits and dots. */
    private static final Pattern ADR_PKG_PTN = Pattern.compile("^[0-9a-zA-Z\\.]+$");
    /** A string consisting only of zeros (used to reject placeholder oaid values). */
    private static final Pattern ONLY_ZEROS_PTN = Pattern.compile("^0+$");
    /** Precompiled once instead of calling {@code String.matches} for every record. */
    private static final Pattern IMEI_PTN = Pattern.compile(CommonMapReduce.imeiPtn);
    private static final Pattern ANDROID_ID_PTN = Pattern.compile(CommonMapReduce.andriodIdPtn);

    /** Separates the device part from the package name in the map output value. */
    private static final String PKG_SEPARATOR = "#";
    private static final String PLATFORM_ANDROID = "android";
    private static final String PLATFORM_IOS = "ios";

    /** Lines with at least this many columns are treated as install v3 log records. */
    private static final int INSTALL_LOG_MIN_COLUMNS = 59;
    /** Campaign-list rows must have at least 3 columns because column 2 is read. */
    private static final int CAMPAIGN_LIST_MIN_COLUMNS = 3;
    /** Lines with at most this many columns are treated as campaign-list rows. */
    private static final int CAMPAIGN_LIST_MAX_COLUMNS = 5;

    /**
     * Emits {@code campaignId -> value} pairs for both inputs.
     * <ul>
     *   <li>Install log record: one value per usable device id, shaped as
     *       {@code <joined device fields>#<packageName>}.</li>
     *   <li>Campaign-list row: a single value {@code #<packageName>}.</li>
     * </ul>
     * Lines matching neither shape are ignored.
     */
    public static class CampaignIdJoinMapper extends Mapper<LongWritable, Text, Text, Text> {
        private final Text outKey = new Text();
        private final Text outValue = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] array = MRUtils.SPLITTER.split(value.toString(), -1);
            if (array.length >= INSTALL_LOG_MIN_COLUMNS) {
                // install v3 log
                mapInstallLog(array, context);
            } else if (array.length >= CAMPAIGN_LIST_MIN_COLUMNS
                    && array.length <= CAMPAIGN_LIST_MAX_COLUMNS) {
                // campaign list; the lower bound keeps blank/short lines from
                // throwing ArrayIndexOutOfBoundsException on array[2].
                String campaignId = array[0];
                String packageName = array[2];
                outKey.set(campaignId);
                emit(context, PKG_SEPARATOR + packageName);
            }
        }

        /** Emits one value per recognised device identifier of an install log record. */
        private void mapInstallLog(String[] array, Context context)
                throws IOException, InterruptedException {
            String campaignId = array[7];
            String gaid = array[42];
            String idfa = array[43];
            String platform = array[13];
            String requestId = array[33]; // changed from extra5 to requestId (Feng Liang, 2017-06-16)
            String imei = array[35];
            String androidId = array[37];
            String idfv = GetDevIdUtil.getIdfv(array[55]);

            // The package name column is optional and only kept when it looks valid.
            String packageName = "";
            if (array.length >= 72 && isValidPackageName(array[71])) {
                packageName = array[71];
            }
            outKey.set(campaignId);

            String extSysid = "";
            if (array.length >= 118) {
                extSysid = array[117];
            }
            String oaid = "";
            if (array.length >= 135) {
                oaid = array[134];
            }

            // In-house (self-owned) id.
            String sysIdType = GetDevIdUtil.getExtSysId(extSysid);
            if (StringUtils.isNotBlank(sysIdType)) {
                emit(context, MRUtils.JOINER.join(sysIdType, platform, requestId)
                        + PKG_SEPARATOR + packageName);
            }

            String devIdType = GetDevIdUtil.getIdByIdAndPlatform(gaid, idfa, extSysid, platform);
            if (StringUtils.isNotBlank(devIdType)) {
                emit(context, MRUtils.JOINER.join(devIdType, platform, requestId)
                        + PKG_SEPARATOR + packageName);
            }

            // imei, added 2019-01-14
            if (!imei.isEmpty() && IMEI_PTN.matcher(imei).matches()
                    && PLATFORM_ANDROID.equals(platform)) {
                emit(context, MRUtils.JOINER.join(imei, "imei", platform, requestId)
                        + PKG_SEPARATOR + packageName);
            }

            // android id, added 2019-01-14
            if (!androidId.isEmpty() && ANDROID_ID_PTN.matcher(androidId).matches()
                    && PLATFORM_ANDROID.equals(platform)) {
                emit(context, MRUtils.JOINER.join(androidId, "androidid", "android", requestId)
                        + PKG_SEPARATOR + packageName);
            }

            // oaid, added 2020-07-27
            if (!oaid.isEmpty() && !oaid.equals(CommonMapReduce.allZero)
                    && PLATFORM_ANDROID.equals(platform)
                    && !ONLY_ZEROS_PTN.matcher(oaid).matches()) {
                emit(context, MRUtils.JOINER.join(oaid, "oaid", "android", requestId)
                        + PKG_SEPARATOR + packageName);
            }

            // idfv, added 2020-08-04
            if (!idfv.isEmpty() && !idfv.equals(CommonMapReduce.allZero)
                    && PLATFORM_IOS.equals(platform)
                    && DID_PTN.matcher(idfv).matches()) {
                emit(context, MRUtils.JOINER.join(idfv, "idfv", "ios", requestId)
                        + PKG_SEPARATOR + packageName);
            }
        }

        /** Returns true when the candidate matches one of the accepted package-name shapes. */
        private static boolean isValidPackageName(String candidate) {
            return ID_IOS_PKG_PTN.matcher(candidate).matches()
                    || IOS_PKG_PTN.matcher(candidate).matches()
                    || ADR_PKG_PTN.matcher(candidate).matches();
        }

        /** Writes {@code value} under the campaign id currently held in {@code outKey}. */
        private void emit(Context context, String value) throws IOException, InterruptedException {
            outValue.set(value);
            context.write(outKey, outValue);
        }
    }

    /**
     * Joins the device values of one campaign id with that campaign's package name
     * and writes one line per device:
     * device_id, device_type, platform, request_id, campaign_id, package_name.
     */
    public static class CampaignIdJoinReducer extends Reducer<Text, Text, Text, NullWritable> {
        private final Text outKey = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            String campaignId = key.toString();
            String campaignPackage = null;
            ListCache<String> listCache = ListCache.newStringListCache();
            for (Text value : values) {
                String tmp = value.toString();
                if (tmp.startsWith(PKG_SEPARATOR)) {
                    // campaign-list value: "#<packageName>"
                    campaignPackage = tmp.substring(1);
                } else {
                    listCache.add(tmp); // device_id,device_type,...#packageName
                }
            }
            listCache.flushAndClose();

            // Loop-invariant: does the campaign carry an "id<digits>" package name?
            boolean campaignHasIdPackage = StringUtils.isNotBlank(campaignPackage)
                    && ID_IOS_PKG_PTN.matcher(campaignPackage).matches();

            for (String device : listCache) {
                // The mapper appends "#<packageName>" where packageName is empty or
                // [0-9a-zA-Z.]+, so the LAST '#' is always the separator even if the
                // device part itself happens to contain '#'.
                int separatorIdx = device.lastIndexOf('#');
                String deviceFields = device.substring(0, separatorIdx);
                String loggedPackage = device.substring(separatorIdx + 1);

                // Resolved per device. The previous implementation reassigned the
                // campaign-level variable here, so the package chosen for one device
                // leaked into the decision for every following device.
                String packageName;
                if (campaignHasIdPackage) {
                    // Drop the leading "id" from the package name for iOS devices.
                    boolean isIos = PLATFORM_IOS.equals(MRUtils.SPLITTER.split(device, -1)[2]);
                    packageName = isIos ? campaignPackage.substring(2) : campaignPackage;
                } else {
                    packageName = loggedPackage;
                }

                // device_id, device_type, platform, request_id, campaign_id, package_name
                outKey.set(MRUtils.JOINER.join(deviceFields, campaignId, packageName));
                context.write(outKey, NullWritable.get());
            }
        }
    }

    /**
     * Configures and runs the join job.
     *
     * @param args {@code args[0]} and {@code args[1]} are the two input paths,
     *             {@code args[2]} is the output path (deleted first if it exists)
     * @return 0 on success, 1 if the job failed, 2 if too few arguments were given
     */
    @Override
    public int run(String[] args) throws Exception {
        if (args == null || args.length < 3) {
            System.err.println("Usage: AdnInstallDailyMR <input path 1> <input path 2> <output path>");
            return 2;
        }

        Configuration conf = getConf();
        conf.set("mapreduce.map.speculative", "true");
        conf.set("mapreduce.reduce.speculative", "true");
        conf.set("mapreduce.task.io.sort.mb", "500");
        conf.set("mapreduce.reduce.java.opts", "-Xmx1536m");
        conf.set("mapreduce.reduce.memory.mb", "2048");
        conf.set("mapreduce.reduce.shuffle.parallelcopies", "50");

        Job job = Job.getInstance(conf, "adn install daily");
        job.setJarByClass(AdnInstallDailyMR.class);

        Path outputPath = new Path(args[2]);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileInputFormat.addInputPath(job, new Path(args[1]));

        // Remove stale output so the job can be re-run for the same target path.
        FileSystem fileSystem = outputPath.getFileSystem(conf);
        if (fileSystem.exists(outputPath)) {
            fileSystem.delete(outputPath, true);
        }
        FileOutputFormat.setOutputPath(job, outputPath);

        job.setMapperClass(CampaignIdJoinMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setNumReduceTasks(10);
        job.setReducerClass(CampaignIdJoinReducer.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new AdnInstallDailyMR(), args));
    }
}