InstallTotalReducer.java 4.77 KB
Newer Older
wang-jinfeng committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111
package mobvista.dmp.common;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import mobvista.dmp.util.MRUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.type.JavaType;
import org.joda.time.format.DateTimeFormat;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * author: houying
 * date  : 16-11-9
 * desc  :
 */
public class InstallTotalReducer extends CommonReducer {
    private ObjectMapper objectMapper;
    private JavaType javaType;
    private String date;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        this.objectMapper = new ObjectMapper();
        this.javaType = objectMapper.getTypeFactory().constructMapType(HashMap.class, String.class, String.class);
        String tmpDate = context.getConfiguration().get("task.date");
        this.date = DateTimeFormat.forPattern("yyyy-MM-dd").parseDateTime(tmpDate).minusMonths(12).toString("yyyy-MM-dd");
    }

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        String[] device = MRUtils.SPLITTER.split(key.toString(), -1);
        if (device.length < 2) {
            System.out.println(key.toString());
            return;
        }
        // baddd133d15552d6	androidid	android	[{"date":"2019-03-23","package_name":"com.traveloka.android"}]
        //         if (device[0].equals("00000000-0000-0000-0000-000000000000") || (! device[1].equals("idfa")  && ! device[1].equals("gaid") && ! device[1].equals("imei") && ! device[1].equalsIgnoreCase("androidid") )) {
        if (!(device[1].equals("idfa") || device[1].equals("gaid") || device[1].equals("imei") ||
                device[1].equalsIgnoreCase("androidid") || device[1].equalsIgnoreCase("android_id") ||
                device[1].equalsIgnoreCase("sysid") || device[1].equalsIgnoreCase("imeimd5") ||
                device[1].equalsIgnoreCase("gaidmd5") || device[1].equalsIgnoreCase("idfamd5") ||
                device[1].equalsIgnoreCase("oaidmd5") || device[1].equalsIgnoreCase("oaid") ||
                device[1].equalsIgnoreCase("idfv") || device[1].equalsIgnoreCase("ruid"))) {
            CommonMapReduce.setMetrics(context, "DMP", "device_zero", 1);
            return;
        }


        String platform = device[2];
        Map<String, Map<String, String>> appMap = Maps.newHashMap();
        List<Map<String, String>> tmpList = Lists.newArrayList();
        for (Text value : values) {
            String val = value.toString();
            if (val.contains("\t")) { //pkg_name \t date
                String[] array = MRUtils.SPLITTER.split(val);
                Map<String, String> app = Maps.newHashMap();
                String packageName = array[0];
                if ("ios".equalsIgnoreCase(platform) && packageName.matches("^id[0-9]+$")) {
                    app.put("package_name", packageName.replace("id", ""));
                } else {
                    app.put("package_name", packageName);
                }
                /**
                 * daily 中所带的 package 的 install_date 可能过期
                 */
                if (array[1].compareTo(date) < 0) {
                    continue;
                }
                app.put("date", array[1]);
                tmpList.add(app);
            } else { //pkg install list
                JsonNode node = objectMapper.readTree(val);
                if (node.size() > 1000) { //安装app数超过1000
                    return;
                }
                for (JsonNode appNode : node) {
                    Map<String, String> app = objectMapper.readValue(appNode, javaType);
                    if (app.containsKey("date") && app.get("date").compareTo(date) < 0) {
                        continue;
                    }

                    String packageName = app.get("package_name");
                    if ("ios".equalsIgnoreCase(platform) && packageName.matches("^id[0-9]+$")) {
                        packageName = packageName.replace("id", "");
                        app.put("package_name", packageName);
                    }
                    appMap.put(packageName, app);
                }
            }
        }
        for (Map<String, String> app : tmpList) {
            appMap.put(app.get("package_name"), app);
        }
        if (appMap.isEmpty()) {
            return;
        }
        //   System.out.println("keyis:" + key.toString());
        out.set(MRUtils.JOINER.join(
                key.toString(),
                objectMapper.writeValueAsString(appMap.values())
        ));
        context.write(out, NullWritable.get());
    }
}