// CustomerIteratorInterest.scala (committed by wang-jinfeng)
package mobvista.dmp.datasource.dm

import com.google.gson.JsonArray
import mobvista.prd.datasource.util.GsonUtil
import org.apache.commons.lang3.StringUtils
import org.apache.spark.broadcast.Broadcast

/**
  * Iterator adapter that rewrites the `tags` of every `DmInterestTag` drawn from `iter`.
  *
  * `tags` is parsed with `GsonUtil.String2JsonArray`. For each element that has a
  * `package_name` field, the key `package_name + "-" + platform` is looked up in the
  * broadcast map; when the hit is not blank it is parsed with
  * `GsonUtil.String2JsonArray` and stored on that element under `tag_new`.
  * Elements without a `package_name` field, or without a non-blank hit, are
  * carried over as they are.
  *
  * @param iter source records
  * @param bMap broadcast lookup keyed by `package_name + "-" + platform`
  *
  * @package: mobvista.dmp.datasource.dm
  * @author: wangjf
  * @date: 2019/4/16
  * @time: 1:44 PM
  * @email: jinfeng.wang@mobvista.com
  * @phone: 152-1062-7698
  */
class CustomerIteratorInterest(iter: Iterator[DmInterestTag], bMap: Broadcast[scala.collection.Map[String, String]]) extends Iterator[DmInterestTag] {

  /** True while the wrapped iterator still has records. */
  def hasNext: Boolean = iter.hasNext

  /**
    * Pulls the next record from the wrapped iterator and returns a new
    * `DmInterestTag` with the same device id, device type and platform, and
    * with `tags` replaced by the serialized, enriched JSON array.
    */
  def next: DmInterestTag = {
    val record = iter.next
    val os = record.platform
    val parsedTags = GsonUtil.String2JsonArray(record.tags)
    val enriched = new JsonArray

    (0 until parsedTags.size).foreach { idx =>
      val entry = parsedTags.get(idx).getAsJsonObject
      if (entry.has("package_name")) {
        val lookupKey = entry.get("package_name").getAsString + "-" + os
        val mapped = bMap.value.getOrElse(lookupKey, "")
        // A missing key and a blank mapping are treated alike: no tag_new is added.
        if (StringUtils.isNotBlank(mapped)) {
          entry.add("tag_new", GsonUtil.String2JsonArray(mapped))
        }
      }
      // Every element is kept, whether or not it was enriched.
      enriched.add(entry)
    }

    DmInterestTag(record.device_id, record.device_type, os, enriched.toString)
  }
}