package mobvista.dmp.datasource.dm

import java.util

import com.google.gson.{JsonArray, JsonObject}
import mobvista.dmp.datasource.dm.Constant.{pattern, str2Json}
import mobvista.dmp.util.MRUtils
import mobvista.prd.datasource.util.GsonUtil
import org.apache.commons.lang3.StringUtils
import org.apache.spark.broadcast.Broadcast

import scala.collection.JavaConverters._

/*
 * @package: mobvista.dmp.datasource.dm
 * @author: wangjf
 * @date: 2019/4/12
 * @time: 3:58 PM
 * @email: jinfeng.wang@mobvista.com
 * @phone: 152-1062-7698
 */

/** Helpers shared by the tag-merging iterators in this file. */
private object CustomerIteratorHelpers {

  /** Returns `date` when it matches `Constant.pattern`, otherwise "" (which the output treats as "no date"). */
  def validDate(date: String): String =
    if (pattern.matcher(date).matches()) date else ""

  /**
    * Returns the JSON text of the "tag" member of `json`, or "" when that member is missing.
    * (An inline `json.get("tag").toString` throws a NullPointerException for a missing "tag",
    * because Gson's `JsonObject.get` returns null for absent members.)
    */
  def tagText(json: JsonObject): String =
    Option(json.get("tag")).fold("")(_.toString)

  /**
    * Builds one element of the merged output array for `packageName`.
    * `date`, `tag` and `tagNew` are each omitted when blank; `tag` and `tagNew` are parsed with
    * `GsonUtil.String2JsonArray` and stored under "tag" and "tag_new" respectively.
    */
  def packageEntry(packageName: String, date: String, tag: String, tagNew: String): JsonObject = {
    val entry = new JsonObject
    entry.addProperty("package_name", packageName)
    if (StringUtils.isNotBlank(date)) entry.addProperty("date", date)
    if (StringUtils.isNotBlank(tag)) entry.add("tag", GsonUtil.String2JsonArray(tag))
    if (StringUtils.isNotBlank(tagNew)) entry.add("tag_new", GsonUtil.String2JsonArray(tagNew))
    entry
  }
}

/**
  * Adapts [[DmDeviceTag]] rows into `(device_id, ("new", payload))` pairs for [[CustomerIterator]].
  *
  * The payload is `package_name, device_type, platform, update_date, tags` joined with `MRUtils.JOINER`,
  * which is the field order [[CustomerIterator]] reads back for records labelled "new".
  */
class CustomerIteratorNew(iter: Iterator[DmDeviceTag]) extends Iterator[(String, (String, String))] {

  def hasNext: Boolean = iter.hasNext

  def next(): (String, (String, String)) = {
    val row = iter.next()
    (row.device_id, ("new", MRUtils.JOINER.join(row.package_name, row.device_type, row.platform, row.update_date, row.tags)))
  }
}

/**
  * Adapts [[DmInterestTag]] rows into `(device_id, ("old", payload))` pairs for [[CustomerIterator]].
  *
  * The payload is `device_type, platform, tags` joined with `MRUtils.JOINER`, which is the field order
  * [[CustomerIterator]] reads back for records labelled "old".
  */
class CustomerIteratorOld(iter: Iterator[DmInterestTag]) extends Iterator[(String, (String, String))] {

  def hasNext: Boolean = iter.hasNext

  def next(): (String, (String, String)) = {
    val row = iter.next()
    // Must be "old": under the "new" label CustomerIterator would parse this 3-field payload as
    // (package_name, device_type, platform, update_date, tags) and fail at the 4th field.
    (row.device_id, ("old", MRUtils.JOINER.join(row.device_type, row.platform, row.tags)))
  }
}

/**
  * Merges, per device, the "new" records from [[CustomerIteratorNew]] and the "old" records from
  * [[CustomerIteratorOld]] into one [[DmInterestTag]] whose tag field is a JSON array with one object per package.
  *
  * Each merged object has `package_name`, plus `date` (only when it matches `Constant.pattern`), `tag`
  * (the "tag" of the old JSON entry) and `tag_new` (the tags field of the new record), each omitted when blank.
  * Old JSON entries lacking `package_name` or `date` are appended unchanged after the merged entries.
  * `device_type` and `platform` come from the first record that supplies a non-blank value.
  *
  * NOTE(review): for a package seen in several records, the kept `date` is the one from whichever record
  * is processed first, so it depends on the order of the grouped Iterable.
  */
class CustomerIterator(iter: Iterator[(String, Iterable[(String, String)])]) extends Iterator[DmInterestTag] {

  import CustomerIteratorHelpers.{packageEntry, tagText, validDate}

  def hasNext: Boolean = iter.hasNext

  def next(): DmInterestTag = {
    val (deviceId, records) = iter.next()
    var deviceType = ""
    var platform = ""
    // package_name -> (date, "tag" JSON text from old records, tags text from new records); "" means absent.
    val merged = new util.HashMap[String, (String, String, String)]()
    // Old entries without package_name or date; copied to the output untouched.
    val passThrough = new JsonArray

    records.foreach {
      case ("new", payload) =>
        val fields = MRUtils.SPLITTER.split(payload)
        val packageName = fields(0)
        if (StringUtils.isBlank(deviceType)) deviceType = fields(1)
        if (StringUtils.isBlank(platform)) platform = fields(2)
        val updateDate = fields(3)
        val tags = fields(4)
        Option(merged.get(packageName)) match {
          case Some((date, oldTag, _)) => merged.put(packageName, (date, oldTag, tags))
          case None => merged.put(packageName, (validDate(updateDate), "", tags))
        }

      case ("old", payload) =>
        val fields = MRUtils.SPLITTER.split(payload)
        if (StringUtils.isBlank(deviceType)) deviceType = fields(0)
        if (StringUtils.isBlank(platform)) platform = fields(1)
        val entries = GsonUtil.String2JsonArray(fields(2))
        for (i <- 0 until entries.size) {
          val json = entries.get(i).getAsJsonObject
          if (!json.has("package_name") || !json.has("date")) {
            passThrough.add(json)
          } else {
            val packageName = json.get("package_name").getAsString
            val tag = tagText(json)
            Option(merged.get(packageName)) match {
              // An entry without "tag" keeps whatever tag is already stored for the package.
              case Some((date, oldTag, tags)) =>
                merged.put(packageName, (date, if (tag.isEmpty) oldTag else tag, tags))
              case None =>
                merged.put(packageName, (validDate(json.get("date").getAsString), tag, ""))
            }
          }
        }

      case _ => // Records with any other label are ignored.
    }

    val output = new JsonArray
    merged.asScala.foreach { case (packageName, (date, tag, tagNew)) =>
      output.add(packageEntry(packageName, date, tag, tagNew))
    }
    output.addAll(passThrough)
    DmInterestTag(deviceId, deviceType, platform, output.toString)
  }
}

/**
  * Variant of [[CustomerIterator]] whose "new" records carry one tag each:
  * `package_name, device_type, platform, tag_type, first_tag, second_tag, update_date` joined with `MRUtils.JOINER`.
  *
  * Every new record adds `str2Json(tag_id, first_tag, second_tag)` to the package's `tag_new` set, where
  * `tag_id` is looked up in `bMap` by the upper-cased key "tag_type-first_tag-second_tag" ("" when missing).
  * "old" records have the same `device_type, platform, tags` layout as in [[CustomerIterator]].
  * Unlike [[CustomerIterator]], dates here are not checked against `Constant.pattern`.
  *
  * @param bMap broadcast map from upper-cased tag code to tag id
  */
class CustomerIteratorV2(iter: Iterator[(String, Iterable[(String, String)])],
                         bMap: Broadcast[scala.collection.Map[String, String]]) extends Iterator[DmInterestTag] {

  import CustomerIteratorHelpers.{packageEntry, tagText}

  def hasNext: Boolean = iter.hasNext

  def next(): DmInterestTag = {
    val (deviceId, records) = iter.next()
    var deviceType = ""
    var platform = ""
    // package_name -> (date, "tag" JSON text from old records, set of tag JSON strings from new records).
    val merged = new util.HashMap[String, (String, String, util.HashSet[String])]()
    // Old entries without package_name or date; copied to the output untouched.
    val passThrough = new JsonArray
    // Read the broadcast once per device instead of once per record.
    val tagIds = bMap.value

    records.foreach {
      case ("new", payload) =>
        val fields = MRUtils.SPLITTER.split(payload)
        val packageName = fields(0)
        if (StringUtils.isBlank(deviceType)) deviceType = fields(1)
        if (StringUtils.isBlank(platform)) platform = fields(2)
        val tagType = fields(3)
        val firstTag = fields(4)
        val secondTag = fields(5)
        val updateDate = fields(6)
        val tagId = tagIds.getOrElse((tagType + "-" + firstTag + "-" + secondTag).toUpperCase, "")
        val tagJson = str2Json(tagId, firstTag, secondTag)
        Option(merged.get(packageName)) match {
          // The stored set is mutable, so adding to it updates the map entry in place.
          case Some((_, _, tagNew)) => tagNew.add(tagJson)
          case None =>
            val tagNew = new util.HashSet[String]()
            tagNew.add(tagJson)
            merged.put(packageName, (updateDate, "", tagNew))
        }

      case ("old", payload) =>
        val fields = MRUtils.SPLITTER.split(payload)
        if (StringUtils.isBlank(deviceType)) deviceType = fields(0)
        if (StringUtils.isBlank(platform)) platform = fields(1)
        val entries = GsonUtil.String2JsonArray(fields(2))
        for (i <- 0 until entries.size) {
          val json = entries.get(i).getAsJsonObject
          if (!json.has("package_name") || !json.has("date")) {
            passThrough.add(json)
          } else {
            val packageName = json.get("package_name").getAsString
            val tag = tagText(json)
            Option(merged.get(packageName)) match {
              // An entry without "tag" keeps whatever tag is already stored for the package.
              case Some((date, oldTag, tagNew)) =>
                merged.put(packageName, (date, if (tag.isEmpty) oldTag else tag, tagNew))
              case None =>
                merged.put(packageName, (json.get("date").getAsString, tag, new util.HashSet[String]()))
            }
          }
        }

      case _ => // Records with any other label are ignored.
    }

    val output = new JsonArray
    merged.asScala.foreach { case (packageName, (date, tag, tagNew)) =>
      // Each element is a JSON string produced by str2Json; wrap them into JSON array text explicitly
      // instead of relying on HashSet.toString formatting.
      val tagNewText = if (tagNew.isEmpty) "" else tagNew.asScala.mkString("[", ",", "]")
      output.add(packageEntry(packageName, date, tag, tagNewText))
    }
    output.addAll(passThrough)
    DmInterestTag(deviceId, deviceType, platform, output.toString)
  }
}