// CustomIteratorAudienceInfoV2.scala (1.71 KB)
package mobvista.dmp.datasource.rtdmp

import com.alibaba.fastjson.JSONObject
import mobvista.dmp.common.MobvistaConstant
import mobvista.dmp.datasource.rtdmp.Constant.{AudienceInfo, NewAudienceInfo}

/**
 * Iterator adapter that converts each [[AudienceInfo]] pulled from `audinceInfos` into a
 * [[NewAudienceInfo]] carrying a merged "audience id -> timestamp" map.
 *
 * For every source record:
 *  - each positive audience id in the comma-separated `audience_data` is stamped with `update_time`;
 *  - entries parsed from `old_audience_data` are kept only when their value compares strictly
 *    greater than `expire_time` (`String.compareTo`, i.e. lexicographic order);
 *  - when an id is present on both sides, the freshly stamped entry wins.
 *
 * @param audinceInfos source records, consumed one per call to `next()`
 * @param update_time  value stored for every id found in `audience_data`; also passed as the
 *                     second field of the produced [[NewAudienceInfo]]
 * @param expire_time  cut-off; old entries whose value is not strictly greater are dropped
 * @param set          not read by the current implementation (the filter that consulted it is
 *                     disabled, see `next()`); kept so existing call sites stay valid
 * @package: mobvista.dmp.datasource.rtdmp
 * @author: wangjf
 * @date: 2020/8/4
 * @time: 4:00 PM
 * @email: jinfeng.wang@mobvista.com
 * @phone: 152-1062-7698
 */
class CustomIteratorAudienceInfoV2(audinceInfos: Iterator[AudienceInfo], update_time: String, expire_time: String, set: java.util.Set[Integer])
  extends Iterator[NewAudienceInfo] {

  // Only the explicit `.asScala` decorators are used; the implicit
  // `scala.collection.JavaConversions` are deprecated and hide where a Java
  // collection is being wrapped.
  import scala.collection.JavaConverters._

  /** True while the wrapped iterator still has records to convert. */
  override def hasNext: Boolean = audinceInfos.hasNext

  /**
   * Converts the next source record.
   *
   * @return `NewAudienceInfo(devid, update_time, mergedMap.toString, "")`
   * @throws NumberFormatException if `audience_data` contains a non-blank token that is not an integer
   */
  override def next(): Constant.NewAudienceInfo = {
    val audienceInfo = audinceInfos.next()

    // Stamp every positive audience id of the current record with `update_time`.
    // A null/empty `audience_data` and blank tokens (e.g. from a trailing comma) are
    // skipped instead of failing inside Integer.parseInt.
    val freshEntries = new JSONObject()
    Option(audienceInfo.audience_data).getOrElse("")
      .split(",", -1)
      .map(_.trim)
      .filter(_.nonEmpty)
      .foreach { audienceId =>
        if (Integer.parseInt(audienceId) > 0) {
          freshEntries.put(audienceId, update_time)
        }
      }

    // Filter the old audience_data, keeping only the audienceId mappings whose stored
    // timestamp has not expired.
    // NOTE(review): the cast assumes every stored value is a String — confirm against
    // the writer of `old_audience_data`.
    val merged = MobvistaConstant.String2JSONObject(audienceInfo.old_audience_data)
      .asInstanceOf[java.util.Map[String, String]]
    // `asScala` is a live view, so `retain` removes the expired entries from `merged` itself.
    // Disabled stricter variant, which also dropped ids contained in `set`:
    //   !set.contains(Integer.parseInt(k)) && v.compareTo(expire_time) > 0
    merged.asScala.retain((_, stamp) => stamp.compareTo(expire_time) > 0)

    // Fresh entries override surviving old ones that share the same audience id.
    merged.putAll(freshEntries.asInstanceOf[java.util.Map[String, String]])
    NewAudienceInfo(audienceInfo.devid, update_time, merged.toString, "")
  }
}