package mobvista.dmp.common; import org.apache.commons.lang3.StringUtils; import org.apache.commons.net.util.Base64; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import java.io.*; import java.net.URI; import java.util.HashMap; import java.util.Map; /** * author: houying * date : 16-11-8 * desc : */ public abstract class CommonMapReduce extends Configured implements Tool { private final String name; private final Class<? extends Mapper> mapperClass; private final Class<? extends Reducer> reducerClass; public final static String didPtn = "^[0-9a-fA-F]{8}(-[0-9a-fA-F]{4}){3}-[0-9a-fA-F]{12}$"; public final static String imeiPtn = "^([0-9]{15,17})$"; public final static String imeiMd5Ptn = "^([a-fA-F0-9]{32})$"; public final static String imeiMd5 = "imeimd5"; public final static String oaidMd5 = "oaidmd5"; public final static String andriodIdPtn = "^[a-zA-Z0-9]{16}$"; public final static String allZero = "00000000-0000-0000-0000-000000000000"; // 14~16位连续多位相同字符,非法IMEI过滤 public final static String imeiPtnAll = "^([0-9])\1{14,16}"; // 连续多位相同字符,非法 androidId 过滤 public final static String andriodIdAll = "^[a-zA-Z0-9]\1{15}$"; // 连续多位相同字符,非法 IMEI MD5 过滤 public final static String umd5Ptn = "^([0-9A-Za-z])\1{29,31}"; // OAID public final static String oaidPtb = "^[0-9A-Za-z-]{16,64}$"; public final static String md5Ptn = "^([a-fA-F0-9]{32})$"; public CommonMapReduce(String name, Class<? extends Mapper> mapperClass, Class<? extends Reducer> reducerClass) { this.name = name; this.mapperClass = mapperClass; this.reducerClass = reducerClass; } public static void setMetrics(Mapper.Context context, String metricsGroup, String metricsName, long increaseNum) { context.getCounter(metricsGroup, metricsName).increment(increaseNum); } public static void setMetrics(Reducer.Context context, String metricsGroup, String metricsName, long increaseNum) { context.getCounter(metricsGroup, metricsName).increment(increaseNum); } @Override public int run(String[] args) throws Exception { Configuration conf = getConf(); /* conf.set("mapreduce.map.speculative", "true"); conf.set("mapreduce.reduce.speculative", "true"); conf.set("mapreduce.task.io.sort.mb", "500"); conf.set("mapreduce.reduce.java.opts", "-Xmx1536m"); conf.set("mapreduce.reduce.memory.mb", "2048"); conf.set("mapreduce.reduce.shuffle.parallelcopies", "50");*/ Job job = Job.getInstance(conf, name); job.setJarByClass(CommonMapReduce.class); setInputPath(job, args); setOutputPath(job, args); Path outputPath = FileOutputFormat.getOutputPath(job); FileSystem fileSystem = outputPath.getFileSystem(job.getConfiguration()); if (fileSystem.exists(outputPath)) { fileSystem.delete(outputPath, true); } job.setMapperClass(mapperClass); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); if (reducerClass != null) { job.setReducerClass(reducerClass); job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); } else { job.setOutputKeyClass(job.getMapOutputKeyClass()); job.setOutputValueClass(job.getMapOutputValueClass()); } otherSetting(job, args); setConf(job, args); return job.waitForCompletion(true) ? 0 : 1; } protected void otherSetting(Job job, String[] args) throws Exception { } protected void setOutputPath(Job job, String[] args) throws IOException { } protected void setInputPath(Job job, String[] args) throws IOException { } protected void setConf(Job job, String[] args) throws IOException { } public static void start(Tool tool, String[] args) throws Exception { System.exit(ToolRunner.run(new Configuration(), tool, args)); } /** * 从指定文件中读取需要广播的内容放入到Map中 * * @param filePath 文件路径 * @param conf hadoop配置 * @param keyIndex 作为map key的数据下标索引 * @param valueIndex 作为map value的数据下标索引 * @return * @throws Exception */ public static Map<String, String> buildBroadcast(String filePath, Configuration conf, int keyIndex, int valueIndex) throws Exception { Map<String, String> map = new HashMap<String, String>(); FileSystem fs = FileSystem.get(URI.create(filePath), conf); BufferedReader reader = null; try { InputStream in = fs.open(new Path(filePath)); reader = new BufferedReader(new InputStreamReader(in)); String line = reader.readLine(); while (line != null) { String[] splits = line.split("\t", -1); map.put(splits[keyIndex], splits[valueIndex]); line = reader.readLine(); } } finally { } return map; } /** * 将对象序列化成字符串 * * @param obj * @return * @throws Exception */ public static String encodeObject(Object obj) throws Exception { ObjectOutputStream oos = null; try { ByteArrayOutputStream baos = new ByteArrayOutputStream(); oos = new ObjectOutputStream(baos); oos.writeObject(obj); byte[] bytes = baos.toByteArray(); return new String(Base64.encodeBase64(bytes)); } finally { if (oos != null) { oos.flush(); oos.close(); } } } /** * 将字符串反序列化成对象 * * @param str * @param clazz * @param <T> * @return * @throws Exception */ public static <T extends Serializable> T decodeObject(String str, Class<T> clazz) throws Exception { ObjectInputStream ois = null; try { byte[] input = Base64.decodeBase64(str); ByteArrayInputStream bais = new ByteArrayInputStream(input); ois = new ObjectInputStream(bais); return (T) ois.readObject(); } finally { if (ois != null) { ois.close(); } } } public static boolean checkDeviceId(String deviceId) { boolean flag = StringUtils.isNotBlank(deviceId) && ((deviceId.matches(didPtn) && !allZero.equals(deviceId)) || (deviceId.matches(imeiPtn) && !deviceId.matches(imeiPtnAll)) || (deviceId.matches(andriodIdPtn) && !deviceId.matches(andriodIdAll)) || (deviceId.matches(md5Ptn) && !deviceId.matches(umd5Ptn)) || deviceId.matches(oaidPtb) || deviceId.length() > 16); return flag; } }