package mobvista.dmp.device.mapreduce; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.compress.GzipCodec; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.map.WrappedMapper; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer; import org.apache.hadoop.mapreduce.task.ReduceContextImpl; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.joda.time.DateTime; import org.joda.time.format.DateTimeFormat; import org.joda.time.format.DateTimeFormatter; import java.io.IOException; import java.util.Arrays; import java.util.Collections; import java.util.regex.Pattern; /** * author: houying * date : 16-10-8 * desc : */ public class DmpDeviceTotalMR extends Configured implements Tool { private static Pattern splitPtn = Pattern.compile("\t"); public static class DmpDeviceMapper extends Mapper<LongWritable, Text, Text, Text> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String[] array = splitPtn.split(line); int length = array.length; if (length == 13 || length == 34 || length == 45) { //adn || ga || dmp context.write(new Text(array[0]), value); } } } public static class DmpDeviceReducer extends Reducer<Text, Text, Text, NullWritable> { private static Joiner joiner = Joiner.on("\t").useForNull(""); private DateTimeFormatter formatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss"); @Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { String id = key.toString(); String[] outputArray = null; String[] adnArray = null; String[] gaArray = null; for (Text value : values) { String[] array = splitPtn.split(value.toString()); if ((adnArray == null || ((adnArray[1].equals("androidid") || adnArray[1].equals("android_id")) && array[1].equals("gaid"))) && array.length == 13) { // adn adnArray = array; } else if (gaArray == null && array.length == 34) { //ga gaArray = array; } else if (outputArray == null && array.length == 45) { outputArray = array; } } if (outputArray == null) { outputArray = new String[45]; Arrays.fill(outputArray, ""); } outputArray[0] = id; if (gaArray != null) { fillInGa(gaArray, outputArray); } if (adnArray != null) { fillInAdn(adnArray, outputArray); } for (int i = 2; i < outputArray.length; i++) { if (outputArray[i].equals("NULL")) { outputArray[i] = ""; } } context.write(new Text(joiner.join(outputArray)), NullWritable.get()); } /** * 向outputArray中填充adn数据字段 * * @param array adn数据字段 * @param outputArray 输出数组 */ private void fillInAdn(String[] array, String[] outputArray) { outputArray[1] = array[1]; //device_type outputArray[2] = array[3]; //platform outputArray[3] = array[4]; //os version outputArray[4] = array[6]; //device model outputArray[5] = array[7]; //device brand outputArray[6] = array[9]; //country code outputArray[7] = array[10];//language outputArray[8] = array[2]; //mac outputArray[43] = array[11]; //adn_app_click_list outputArray[44] = array[12]; //update time } /** * 向outputArray中填充ga数据字段 * * @param array ga数据字段 * @param outputArray 输出数组 */ private void fillInGa(String[] array, String[] outputArray) throws IOException { outputArray[1] = array[1]; //device_type outputArray[9] = array[26]; //gender outputArray[10] = array[27]; //birth_year // outputArray[11] = ""; //profession // outputArray[12] = ""; //education level // outputArray[13] = ""; //income range // outputArray[14] = ""; //family status // outputArray[15] = ""; //hobby //outputArray[16:37] = array[4:25] System.arraycopy(array, 4, outputArray, 16, 22); outputArray[38] = array[28]; //revenue_week outputArray[39] = array[29]; //revenue_month outputArray[40] = array[30]; //average_session_per_app outputArray[41] = array[31]; //average_days_per_app outputArray[42] = array[32]; //installation list outputArray[44] = formatter.print(DateTime.now()); } } public int run(String[] strings) throws Exception { Configuration conf = getConf(); conf.set("mapreduce.map.speculative", "true"); conf.set("mapreduce.reduce.speculative", "true"); conf.set("mapreduce.task.io.sort.mb", "500"); conf.set("mapreduce.reduce.java.opts", "-Xmx1536m"); conf.set("mapreduce.reduce.memory.mb", "2048"); conf.set("mapreduce.reduce.shuffle.parallelcopies", "50"); Job job = Job.getInstance(conf, "dmp device total"); job.setJarByClass(DmpDeviceTotalMR.class); FileInputFormat.addInputPath(job, new Path(strings[0])); FileInputFormat.addInputPath(job, new Path(strings[1])); FileInputFormat.addInputPath(job, new Path(strings[2])); Path outputPath = new Path(strings[3]); FileSystem fileSystem = outputPath.getFileSystem(conf); if (fileSystem.exists(outputPath)) { fileSystem.delete(outputPath, true); } FileOutputFormat.setOutputPath(job, outputPath); FileOutputFormat.setCompressOutput(job, true); FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class); job.setInputFormatClass(TextInputFormat.class); job.setInputFormatClass(TextInputFormat.class); job.setMapperClass(DmpDeviceMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setReducerClass(DmpDeviceReducer.class); job.setOutputFormatClass(TextOutputFormat.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); return job.waitForCompletion(true) ? 0 : 1; } public static void main(String[] args) throws Exception { System.exit(ToolRunner.run(new Configuration(), new DmpDeviceTotalMR(), args)); /* String gaTotal = "00000B0A-46ED-4F9D-A101-7D8BF004CFF6\tidfa\tios\tIQ\t2016-09-07\t0\t\t1\t1\t0\t1\t0\t0\t0\t0\t0\t0\t0\t24\t36.0\t24\t36.0\t24\t36.0\t527708077401556\t\tfemale\t\t0.0\t0.0\t8.0\t1.0\t[{\"date\": \"20160907\", \"appName\": \"949498190\"}]\t20161006"; String adnTotal = "00000B0A-46ED-4F9D-A101-7D8BF004CFF6\tidfa\t0\tandroid\t5.0\t0\tx6a\tvivo\t0\tCN\t0\t[{\"publisher_id\":\"6028\",\"app_version\":\"0\",\"app_id\":\"20660\",\"datetime\":\"2016-09-06 21:44:49\"}]\t2016-09-06 21:44:49"; Iterable<Text> iterable = Lists.newArrayList(new Text(gaTotal), new Text(adnTotal)); new DmpDeviceReducer().reduce(new Text("00000B0A-46ED-4F9D-A101-7D8BF004CFF6\tidfa"), iterable, null);*/ } }