package mobvista.dmp.common;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import mobvista.dmp.util.MRUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.type.JavaType;
import org.joda.time.format.DateTimeFormat;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

/**
 * author: houying
 * date  : 16-11-9
 * desc  : Merges, per device key, an existing installed-app list (a JSON array value) with
 *         individual "package \t date" records, drops entries whose date is older than
 *         twelve months before {@code task.date}, and writes the key joined with the merged
 *         list serialized as JSON.
 */
public class InstallTotalReducer extends CommonReducer {

    /** iOS package names of the form {@code id<digits>}; the {@code id} prefix is stripped. */
    private static final Pattern IOS_NUMERIC_ID = Pattern.compile("^id[0-9]+$");

    /** The key must provide device id, device type and platform (indices 0..2). */
    private static final int MIN_KEY_FIELDS = 3;

    /** Devices whose install list holds more apps than this are dropped entirely. */
    private static final int MAX_INSTALLED_APPS = 1000;

    private ObjectMapper objectMapper;
    private JavaType javaType;

    /** Cut-off date ({@code yyyy-MM-dd}): {@code task.date} minus twelve months. */
    private String date;

    /**
     * Prepares the JSON mapper and computes the cut-off date from the {@code task.date}
     * configuration value, which is parsed with the pattern {@code yyyy-MM-dd}.
     */
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        this.objectMapper = new ObjectMapper();
        this.javaType = objectMapper.getTypeFactory()
                .constructMapType(HashMap.class, String.class, String.class);
        String tmpDate = context.getConfiguration().get("task.date");
        this.date = DateTimeFormat.forPattern("yyyy-MM-dd")
                .parseDateTime(tmpDate)
                .minusMonths(12)
                .toString("yyyy-MM-dd");
    }

    /**
     * Merges all values of one device key into a single app list and emits it.
     *
     * <p>Sample input (from the original author's note):
     * {@code baddd133d15552d6 androidid android [{"date":"2019-03-23","package_name":"com.traveloka.android"}]}
     *
     * <p>Values containing a tab are treated as {@code package \t date} records; all other
     * values are parsed as a JSON array of app objects. Records from tab-separated values are
     * applied last, so they override JSON entries with the same package name.
     *
     * @param key     device key; split with {@code MRUtils.SPLITTER} into at least
     *                device id, device type and platform
     * @param values  install records for the device
     * @param context task context used for counters and output
     */
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        String[] device = MRUtils.SPLITTER.split(key.toString(), -1);
        // device[2] (platform) is read below, so three fields are required. The previous
        // "< 2" guard let two-field keys through and crashed on the platform lookup.
        if (device.length < MIN_KEY_FIELDS) {
            System.out.println(key.toString());
            return;
        }

        if (!isSupportedDeviceType(device[1])) {
            CommonMapReduce.setMetrics(context, "DMP", "device_zero", 1);
            return;
        }

        String platform = device[2];
        Map<String, Map<String, String>> appMap = Maps.newHashMap();
        List<Map<String, String>> tmpList = Lists.newArrayList();

        for (Text value : values) {
            String val = value.toString();
            if (val.contains("\t")) {
                // pkg_name \t date
                String[] array = MRUtils.SPLITTER.split(val);
                if (array.length < 2) {
                    // Malformed record (e.g. missing date field): skip instead of failing the task.
                    continue;
                }
                // The install_date carried by a daily record may already be expired.
                // NOTE(review): string comparison assumes dates are formatted yyyy-MM-dd.
                if (array[1].compareTo(date) < 0) {
                    continue;
                }
                Map<String, String> app = Maps.newHashMap();
                app.put("package_name", normalizePackageName(platform, array[0]));
                app.put("date", array[1]);
                tmpList.add(app);
            } else {
                // pkg install list
                JsonNode node = objectMapper.readTree(val);
                if (node.size() > MAX_INSTALLED_APPS) {
                    // More than 1000 installed apps: drop the whole device.
                    return;
                }
                for (JsonNode appNode : node) {
                    Map<String, String> app = objectMapper.readValue(appNode, javaType);
                    String installDate = app.get("date");
                    // Entries without a date (absent or JSON null) are kept.
                    if (installDate != null && installDate.compareTo(date) < 0) {
                        continue;
                    }
                    String packageName = normalizePackageName(platform, app.get("package_name"));
                    if (packageName != null) {
                        app.put("package_name", packageName);
                    }
                    // An entry lacking package_name is still stored (under a null key),
                    // matching how such entries were already handled for non-iOS platforms.
                    appMap.put(packageName, app);
                }
            }
        }

        // Applied after the JSON lists so that daily records win on duplicate package names.
        for (Map<String, String> app : tmpList) {
            appMap.put(app.get("package_name"), app);
        }
        if (appMap.isEmpty()) {
            return;
        }

        // NOTE(review): `out` is not declared in this class; presumably a reusable Text
        // provided by CommonReducer — confirm.
        out.set(MRUtils.JOINER.join(
                key.toString(),
                objectMapper.writeValueAsString(appMap.values())
        ));
        context.write(out, NullWritable.get());
    }

    /**
     * Returns whether the device type is one this reducer processes. {@code idfa},
     * {@code gaid} and {@code imei} are matched case-sensitively; the remaining types are
     * matched ignoring case.
     */
    private static boolean isSupportedDeviceType(String type) {
        return type.equals("idfa")
                || type.equals("gaid")
                || type.equals("imei")
                || type.equalsIgnoreCase("androidid")
                || type.equalsIgnoreCase("android_id")
                || type.equalsIgnoreCase("sysid")
                || type.equalsIgnoreCase("imeimd5")
                || type.equalsIgnoreCase("gaidmd5")
                || type.equalsIgnoreCase("idfamd5")
                || type.equalsIgnoreCase("oaidmd5")
                || type.equalsIgnoreCase("oaid")
                || type.equalsIgnoreCase("idfv")
                || type.equalsIgnoreCase("ruid");
    }

    /**
     * Strips the leading {@code id} from iOS package names of the form {@code id<digits>};
     * every other name (including {@code null}) is returned unchanged.
     */
    private static String normalizePackageName(String platform, String packageName) {
        if (packageName != null
                && "ios".equalsIgnoreCase(platform)
                && IOS_NUMERIC_ID.matcher(packageName).matches()) {
            // The pattern guarantees the only "id" is the two-character prefix.
            return packageName.substring(2);
        }
        return packageName;
    }
}