// TrackingInstallDailyMR.java 8.15 KB
package mobvista.dmp.datasource.tracking.mapreduce;

import mobvista.dmp.common.CommonMapReduce;
import mobvista.dmp.datasource.ga.mapreduce.vo.TextPair;
import mobvista.prd.datasource.util.MRUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.IOException;
import java.util.Iterator;
import java.util.regex.Pattern;

/**
 * Created by liushuai on 2017/3/20 0020.
 * desc  : cleans the daily app-install data of 3s devices
 */
public class TrackingInstallDailyMR {
    /**
     * Configures and submits the job, then exits the JVM with 0 on success and 1 on failure.
     *
     * <p>Generic Hadoop options are consumed by {@link GenericOptionsParser}. The remaining
     * arguments must be two input paths followed by the output path. Both inputs are fed to
     * the same mapper, which tells the record types apart by content, so their order does not
     * matter. The mapper reads the {@code task.date} configuration property, which can be
     * supplied as a generic option.
     *
     * @param args generic Hadoop options followed by {@code <input> <input> <output>}
     * @throws IOException            if the job cannot be created or submitted
     * @throws InterruptedException   if waiting for job completion is interrupted
     * @throws ClassNotFoundException if a job class cannot be resolved on submission
     */
    public static void main(String[] args) throws InterruptedException, IOException, ClassNotFoundException {

        Configuration conf = new Configuration();
        conf.set("mapreduce.map.speculative", "true");
        conf.set("mapreduce.reduce.speculative", "true");
        conf.set("mapreduce.task.io.sort.mb", "500");
        conf.set("mapreduce.reduce.java.opts", "-Xmx1536m");
        conf.set("mapreduce.reduce.memory.mb", "2048");
        conf.set("mapreduce.reduce.shuffle.parallelcopies", "50");
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

        // Fail with a readable message instead of an ArrayIndexOutOfBoundsException below.
        if (otherArgs.length < 3) {
            System.err.println("Usage: TrackingInstallDailyMR [generic options] "
                    + "<input path> <input path> <output path>");
            System.exit(2);
            return;
        }
        // The mapper stamps every tracking record with this property; warn early when it is absent.
        if (conf.get("task.date") == null) {
            System.err.println("WARN: configuration property task.date is not set");
        }

        Job job = Job.getInstance(conf, "TrackingInstallDailyMR");

        job.setJarByClass(TrackingInstallDailyMR.class);
        FileOutputFormat.setCompressOutput(job, true);
        FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);

        job.setMapperClass(TrackingInstallDailyMapper.class);
        job.setReducerClass(TrackingInstallDailyReducer.class);

        job.setMapOutputKeyClass(TextPair.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // Partition and group on the first half of the TextPair only, so that all records
        // sharing a join key reach a single reduce() call.
        job.setGroupingComparatorClass(TextPair.FirstComparator.class);
        job.setPartitionerClass(TextPair.FirstPartitioner.class);

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileInputFormat.addInputPath(job, new Path(otherArgs[1]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    public static class TrackingInstallDailyMapper extends Mapper<LongWritable, Text, TextPair, Text> {
        String date;
        Text outValue = new Text();
        TextPair outKey = new TextPair();

        public void setup(Context context) throws IOException {
            date = context.getConfiguration().get("task.date");
        }

        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();

            if (line.contains("\t")) {
                String[] fields = MRUtils.SPLITTER.split(line, -1);
                if (fields.length != 3) {
                    return;
                }
                outKey.set(fields[0], "0");
                outValue.set(MRUtils.JOINER.join(
                        fields[1], // 平台
                        fields[2],  // package_name
                        "0"
                ));
                context.write(outKey, outValue);
            } else {
                String[] fields = line.split(",", -1);
                if(fields != null && fields.length >= 29) {
                    // if (!fields[10].isEmpty() && ( fields[10].matches(CommonMapReduce.andriodIdPtn) || ( fields[10].matches(CommonMapReduce.didPtn) && !fields[10].equals(CommonMapReduce.allZero)))) {  // adrId 在该字段中
                    if (fields[10].matches(CommonMapReduce.didPtn) && !fields[10].equals(CommonMapReduce.allZero)) {
                        outKey.set(fields[1], "1");
                        outValue.set(MRUtils.JOINER.join(fields[10], fields[3].toUpperCase(), date, "1"));
                        context.write(outKey, outValue);
                    } else if (fields[28].matches(CommonMapReduce.didPtn) && !fields[28].equals(CommonMapReduce.allZero)) {
                        outKey.set(fields[1], "1");
                        outValue.set(MRUtils.JOINER.join(fields[28], fields[3].toUpperCase(), date, "1"));
                        context.write(outKey, outValue);
                    }

                    if (!fields[10].isEmpty() && fields[10].matches(CommonMapReduce.andriodIdPtn)) {
                        //   System.out.println("TrackingInstallDailyMapper:" + fields[10]);
                        outKey.set(fields[1], "1");
                        outValue.set(MRUtils.JOINER.join(fields[10], fields[3].toUpperCase(), date, "1"));
                        context.write(outKey, outValue);
                    }
                    if (!fields[11].isEmpty() && fields[11].matches(CommonMapReduce.imeiPtn)) {  // add imei信息
                        outKey.set(fields[1], "1");
                        outValue.set(MRUtils.JOINER.join("imei", fields[11], fields[3].toUpperCase(), date, "1"));
                        context.write(outKey, outValue);
                    }
                }
            }
        }
    }


    public static class TrackingInstallDailyReducer extends Reducer<TextPair, Text, Text, NullWritable> {
        private Text outKey = new Text();
        private static final String idfaRegex = "^[0-9A-F]{8}(-[0-9A-F]{4}){3}-[0-9A-F]{12}$";

        public void reduce(TextPair key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            String first = values.iterator().next().toString();
            String[] firstSplits = MRUtils.SPLITTER.split(first, -1);

            // 验证如果第一条数据不是来自campaign表或来自campaign表但包名不为空,则返回
            String packageName = firstSplits[1];
            String source = firstSplits[firstSplits.length - 1];
            if ("0".equals(source)) {
                if (StringUtils.isEmpty(packageName)) {
                    return;
                }
            } else {
                // 第一条不是campaign,不能匹配到package 故退出
                return;
            }


            for (Text value : values) {
                String[] fields = MRUtils.SPLITTER.split(value.toString(), -1);

                if(fields.length == 5){ // imei信息
                    outKey.set(MRUtils.JOINER.join(fields[1], "imei", "android", packageName, fields[3], fields[2]));
                    context.write(outKey, NullWritable.get());
                }else {
                    String deviceId = fields[0].replaceAll("\"", "").replaceAll("\\[", "");
                    String platform = getPlatform(firstSplits[0], deviceId);

                    String deviceType = getDeviceType(platform);
                    if ("ios".equals(platform)) {
                        packageName = packageName.replace("id", "");
                    }

                    if(deviceId.matches(CommonMapReduce.andriodIdPtn)) {
                        deviceType = "androidid";  //adr type
                    }
                    outKey.set(MRUtils.JOINER.join(deviceId, deviceType, platform, packageName, fields[2], fields[1]));
                    context.write(outKey, NullWritable.get());
                }
            }
        }

        private String getPlatform(String platform, String deviceId) {
            if (!"ios".equalsIgnoreCase(platform) && !"android".equalsIgnoreCase(platform)) {
                if (deviceId.matches(idfaRegex)) {
                    return "ios";
                } else {
                    return "android";
                }
            }
            return platform;
        }

        private String getDeviceType(String platform) {
            switch (platform) {
                case "ios":
                    return "idfa";
                case "android":
                    return "gaid";
                case "adr":
                    return "gaid";
                default:
                    return "unknown";
            }
        }
    }
}