DspDailyEtlMR.java 12.1 KB
package mobvista.dmp.datasource.dsp.mapreduce;

import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import mobvista.dmp.common.CommonMapReduce;
import mobvista.dmp.common.CommonMapper;
import mobvista.dmp.format.TextMultipleOutputFormat;
import mobvista.dmp.util.MRUtils;
import mobvista.prd.datasource.util.GsonUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.codehaus.jackson.map.ObjectMapper;

import java.io.IOException;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;

/**
 * author: houying
 * date  : 17-2-17
 * desc  : Daily ETL over DSP request logs. The mapper keys every usable log line by
 *         (device_id, device_type); the reducer writes one aggregated record per device under
 *         the configured "outPath" and one detail record per log line under "detailOutPath".
 * output: device_id, device_type, platform, country_code, ip, gender, birthday, maker, model, os_version, package_list, androidIds, datetime, segment
 * detail: key = device_id, device_type, platform; value = datetime, ip, geo, longitude, latitude
 */
public class DspDailyEtlMR extends CommonMapReduce {

    /** Logical name and zero-based column position of each field read from a split DSP log line. */
    public enum Fields {
        IDFA("idfa", 37),
        GAID("gaid", 34),
        PKG_NAME("package_name", 20),
        PLATFORM("platform", 29),
        UPDATE_TIME("time", 0),
        IP("ip", 26),
        MAKER("maker", 27),
        MODEL("model", 28),
        OS_VERSION("os_version", 30),
        COUNTRY_CODE("country_code", 33),
        BIRTHDAY("birthday", 39),
        GENDER("gender", 40),
        EXT_ID("ext_id", 15),
        JSON_MSG("json_msg", 6);

        private final String name;
        private final int idx;

        Fields(String name, int idx) {
            this.name = name;
            this.idx = idx;
        }

        /** @return the logical column name */
        public String getName() {
            return name;
        }

        /** @return the zero-based column index inside the split log line */
        public int getIdx() {
            return idx;
        }
    }

    /**
     * Keeps DSP log lines that carry a well-formed IDFA (platform "ios") or GAID (platform
     * "android") together with an acceptable package name, and emits
     * key = device_id, device_type and a 15-field value consumed by {@link DspDailyEtlReducer}.
     */
    public static class DspDailyEtlMapper extends CommonMapper {
        /** Lines with fewer columns are counted as "dsp_log_fields_num_error" and dropped. */
        private static final int MIN_FIELD_COUNT = 52;
        /** Position of the Android ID inside the comma-separated ext_id column. */
        private static final int EXT_ID_ANDROID_ID_POS = 5;

        private static final Pattern IOS_PKG_PTN = Pattern.compile("^\\d+$");
        private static final Pattern ADR_PKG_PTN = Pattern.compile("^[0-9a-zA-Z\\.]+$");
        /** iOS package names logged as "id" followed by digits, e.g. "id123456". */
        private static final Pattern IOS_ID_PREFIXED_PKG_PTN = Pattern.compile("^id\\d+$");
        /** 8-4-4-4-12 hex-digit layout; applied to both IDFA and GAID values. */
        private static final Pattern DEVICE_ID_PTN =
                Pattern.compile("^[0-9a-fA-F]{8}(-[0-9a-fA-F]{4}){3}-[0-9a-fA-F]{12}$");
        private static final Pattern ID_SPLIT_PTN = Pattern.compile(",");

        /** Values extracted from the json_msg column; every field may be null. */
        private static final class JsonMsgInfo {
            String geoInfo;
            String longitude;
            String latitude;
            String segment;
        }

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] array = MRUtils.SPLITTER.split(value.toString());
            if (array.length < MIN_FIELD_COUNT) {
                CommonMapReduce.setMetrics(context, "DMP", "dsp_log_fields_num_error", 1);
                return;
            }
            String platform = array[Fields.PLATFORM.getIdx()];
            String packageName = array[Fields.PKG_NAME.getIdx()];
            String idfa = array[Fields.IDFA.getIdx()];
            String gaid = array[Fields.GAID.getIdx()];

            // Remove the leading "id" from iOS package names such as "id123456" -> "123456".
            if (platform.equals("ios") && IOS_ID_PREFIXED_PKG_PTN.matcher(packageName).matches()) {
                packageName = packageName.substring(2);
            }

            String deviceId;
            String deviceType;
            if (platform.equals("ios") && idfa.length() > 4 && checkPkgName("ios", packageName)) {
                deviceId = idfa;
                deviceType = "idfa";
            } else if (platform.equals("android") && gaid.length() > 4 && checkPkgName("adr", packageName)) {
                deviceId = gaid;
                deviceType = "gaid";
            } else {
                return;
            }
            if (!DEVICE_ID_PTN.matcher(deviceId).matches()) {
                setMetrics(context, "DMP", "device_id_illegal_format", 1);
                return;
            }
            String[] ids = ID_SPLIT_PTN.split(array[Fields.EXT_ID.getIdx()], -1);
            if (ids.length <= EXT_ID_ANDROID_ID_POS) {
                return;
            }
            String androidId = ids[EXT_ID_ANDROID_ID_POS];

            JsonMsgInfo jsonInfo;
            try {
                jsonInfo = parseJsonMsg(array[Fields.JSON_MSG.getIdx()]);
            } catch (RuntimeException e) {
                // One malformed json_msg must not fail the whole task: keep the record
                // without geo/segment information and count the occurrence instead.
                setMetrics(context, "DMP", "json_msg_parse_error", 1);
                jsonInfo = new JsonMsgInfo();
            }

            outKey.set(MRUtils.JOINER.join(deviceId, deviceType));
            outValue.set(MRUtils.JOINER.join(
                    platform,                               // 0
                    array[Fields.COUNTRY_CODE.getIdx()],    // 1
                    array[Fields.IP.getIdx()],              // 2
                    array[Fields.GENDER.getIdx()],          // 3
                    array[Fields.BIRTHDAY.getIdx()],        // 4
                    array[Fields.MAKER.getIdx()],           // 5
                    array[Fields.MODEL.getIdx()],           // 6
                    array[Fields.OS_VERSION.getIdx()],      // 7
                    packageName,                            // 8: normalized package name
                    androidId,                              // 9
                    array[Fields.UPDATE_TIME.getIdx()],     // 10
                    jsonInfo.geoInfo,                       // 11
                    jsonInfo.longitude,                     // 12
                    jsonInfo.latitude,                      // 13
                    jsonInfo.segment                        // 14
            )); // size = 15
            context.write(outKey, outValue);
        }

        /**
         * Extracts device.geo (its raw JSON plus the lon/lat members) and the segments of every
         * user.data[] entry from the json_msg column. Anything that is absent, or not shaped as
         * expected, is left null; a value not starting with "{" yields an all-null result.
         *
         * @param jsonMsg raw json_msg column value
         * @return the extracted values, never null
         */
        private static JsonMsgInfo parseJsonMsg(String jsonMsg) {
            JsonMsgInfo info = new JsonMsgInfo();
            if (!jsonMsg.startsWith("{")) {
                return info;
            }
            JsonObject json = GsonUtil.String2JsonObject(jsonMsg);
            if (json == null) {
                return info;
            }

            JsonElement device = json.get("device");
            if (device != null && device.isJsonObject()) {
                JsonElement geo = device.getAsJsonObject().get("geo");
                if (geo != null && !geo.isJsonNull()) {
                    info.geoInfo = geo.toString();
                    if (geo.isJsonObject()) {
                        JsonObject geoJson = geo.getAsJsonObject();
                        JsonElement lon = geoJson.get("lon"); // longitude
                        if (lon != null && !lon.isJsonNull()) {
                            info.longitude = lon.toString();
                        }
                        JsonElement lat = geoJson.get("lat"); // latitude
                        if (lat != null && !lat.isJsonNull()) {
                            info.latitude = lat.toString();
                        }
                    }
                }
            }

            // Segment info: merge the segment arrays of all user.data[] entries so that
            // no provider's segments are lost (the reducer de-duplicates them by id).
            JsonElement user = json.get("user");
            if (user != null && user.isJsonObject()) {
                JsonElement data = user.getAsJsonObject().get("data");
                if (data != null && data.isJsonArray()) {
                    JsonArray merged = new JsonArray();
                    boolean found = false;
                    for (JsonElement dataEle : data.getAsJsonArray()) {
                        if (!dataEle.isJsonObject()) {
                            continue;
                        }
                        JsonElement seg = dataEle.getAsJsonObject().get("segment");
                        if (seg != null && seg.isJsonArray()) {
                            found = true;
                            for (JsonElement s : seg.getAsJsonArray()) {
                                merged.add(s);
                            }
                        }
                    }
                    if (found) {
                        info.segment = merged.toString();
                    }
                }
            }
            return info;
        }

        /**
         * @param platform "ios" or "adr"; any other value is rejected
         * @param pkg      package name to validate
         * @return true when pkg is all digits or alphanumeric/dots (ios), or alphanumeric/dots (adr)
         */
        private boolean checkPkgName(String platform, String pkg) {
            switch (platform) {
            case "ios":
                return IOS_PKG_PTN.matcher(pkg).matches() || ADR_PKG_PTN.matcher(pkg).matches();
            case "adr":
                return ADR_PKG_PTN.matcher(pkg).matches();
            default:
                return false;
            }
        }
    }

    /**
     * Aggregates all mapper records of one device.
     *
     * <p>Input value layout (15 fields): 0 platform, 1 country_code, 2 ip, 3 gender, 4 birthday,
     * 5 maker, 6 model, 7 os_version, 8 package_name, 9 android_id, 10 time, 11 geo,
     * 12 longitude, 13 latitude, 14 segment. Records with any other field count are skipped.
     *
     * <p>Output keys are "path,realKey": the text left of the comma selects the output path of
     * TextMultipleOutputFormat and the text right of it is the real output key.
     */
    public static class DspDailyEtlReducer extends Reducer<Text, Text, Text, Text> {
        private static final int MAP_VALUE_SIZE = 15;

        private ObjectMapper objectMapper = new ObjectMapper();
        private Text outKey = new Text();
        private Text outValue = new Text();
        private String outPath;
        private String detailOutPath;
        private StringBuilder builder = new StringBuilder();

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            Configuration conf = context.getConfiguration();
            detailOutPath = conf.get("detailOutPath");
            outPath = conf.get("outPath");
        }

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            String deviceKey = key.toString(); // device_id \t device_type
            Set<String> pkgSet = Sets.newHashSet();
            Set<String> androidIds = Sets.newHashSet();
            Map<String, SegmentVO> segmentMap = Maps.newHashMap();
            String[] outFields = null;

            for (Text value : values) {
                String[] array = MRUtils.SPLITTER.split(value.toString(), -1);
                if (array.length != MAP_VALUE_SIZE) {
                    continue;
                }
                if (outFields == null) {
                    // Device attributes come from the first valid record of this device;
                    // slots 9, 10 and 12 are overwritten with aggregates after the loop.
                    outFields = new String[13];
                    outFields[0] = deviceKey;
                    System.arraycopy(array, 0, outFields, 1, 11);
                }
                pkgSet.add(array[8]);
                if (!array[9].isEmpty()) {
                    androidIds.add(array[9]);
                }
                collectSegments(array[14], segmentMap, context);

                // Detail record: one per input record.
                builder.setLength(0);
                builder.append(detailOutPath)
                        .append(",")           // separates output path (left) from real key (right)
                        .append(deviceKey)
                        .append("\t")
                        .append(array[0]);     // platform
                outKey.set(builder.toString());
                outValue.set(MRUtils.JOINER.join(
                        array[10], // time
                        array[2],  // ip
                        array[11], // geo
                        array[12], // longitude
                        array[13]  // latitude
                ));
                context.write(outKey, outValue);
            }

            if (outFields != null) {
                outFields[9] = objectMapper.writeValueAsString(pkgSet);
                outFields[10] = MRUtils.join(androidIds, ",");
                outFields[12] = segmentMap.isEmpty() ? "" : objectMapper.writeValueAsString(segmentMap.values());
                outKey.set(outPath + ", ");
                outValue.set(MRUtils.join(outFields, "\t"));
                context.write(outKey, outValue);
            }
        }

        /**
         * Parses a mapper segment field (a JSON array of segment objects) and adds each segment to
         * segmentMap keyed by its id. Fields that do not look like an array of objects are ignored;
         * unparsable arrays and unconvertible elements are counted rather than failing the task.
         */
        private void collectSegments(String segment, Map<String, SegmentVO> segmentMap, Context context) {
            if (!segment.startsWith("[") || !segment.contains("{")) {
                return;
            }
            JsonArray segArray;
            try {
                segArray = GsonUtil.String2JsonArray(segment);
            } catch (Exception e) {
                context.getCounter("DMP", "segment parse error").increment(1L);
                return;
            }
            if (segArray == null) {
                return;
            }
            for (JsonElement segElement : segArray) {
                try {
                    SegmentVO segmentVO = GsonUtil.fromJson(segElement, SegmentVO.class);
                    segmentMap.put(segmentVO.getId(), segmentVO);
                } catch (Exception e) {
                    context.getCounter("DMP", "segment id error").increment(1L);
                }
            }
        }
    }

    /** Routes all output through TextMultipleOutputFormat with Text keys and values. */
    @Override
    protected void otherSetting(Job job, String[] args) throws Exception {
        job.setOutputFormatClass(TextMultipleOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
    }

    /** Uses args[1] as the job output directory, gzip-compressed. */
    @Override
    protected void setOutputPath(Job job, String[] args) throws IOException {
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        FileOutputFormat.setCompressOutput(job, true);
        FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
    }

    /** Uses args[0] as the job input path. */
    @Override
    protected void setInputPath(Job job, String[] args) throws IOException {
        FileInputFormat.addInputPath(job, new Path(args[0]));
    }

    public DspDailyEtlMR(String name, Class<? extends Mapper> mapperClass, Class<? extends Reducer> reducerClass) {
        super(name, mapperClass, reducerClass);
    }

    public static void main(String[] args) throws Exception {
        start(new DspDailyEtlMR("dsp daily etl job", DspDailyEtlMapper.class, DspDailyEtlReducer.class), args);
    }
}