package mobvista.dmp.datasource.dm import com.google.gson.JsonArray import mobvista.prd.datasource.util.GsonUtil import org.apache.commons.lang3.StringUtils import org.apache.spark.broadcast.Broadcast /** * @package: mobvista.dmp.datasource.dm * @author: wangjf * @date: 2019/4/16 * @time: 下午1:44 * @email: jinfeng.wang@mobvista.com * @phone: 152-1062-7698 */ class CustomerIteratorInterest(iter: Iterator[DmInterestTag], bMap: Broadcast[scala.collection.Map[String, String]]) extends Iterator[DmInterestTag] { def hasNext: Boolean = { iter.hasNext } def next: DmInterestTag = { val row = iter.next val device_id = row.device_id val device_type = row.device_type val platform = row.platform val tags = row.tags val jsonNode = GsonUtil.String2JsonArray(tags) val jsonArray = new JsonArray for (i <- 0 until jsonNode.size) { val json = jsonNode.get(i).getAsJsonObject if (json.has("package_name")) { val package_name = json.get("package_name").getAsString val tag_new = bMap.value.getOrElse(package_name + "-" + platform, "") if (StringUtils.isNotBlank(tag_new)) { json.add("tag_new", GsonUtil.String2JsonArray(tag_new)) } } jsonArray.add(json) } DmInterestTag(device_id, device_type, platform, jsonArray.toString) } }