1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package mobvista.dmp.datasource.rtdmp
import com.alibaba.fastjson.JSONObject
import mobvista.dmp.common.MobvistaConstant
import mobvista.dmp.datasource.rtdmp.Constant.{AudienceInfo, NewAudienceInfo}
/**
 * Iterator adapter that lazily converts each incoming [[AudienceInfo]] into a
 * [[NewAudienceInfo]] whose audience payload is a JSON object of
 * `audienceId -> timestamp` entries.
 *
 * For every record the payload is built from two sources:
 *  - the ids listed in `audience_data` (comma separated); each id that parses to a
 *    positive integer is stamped with `update_time`;
 *  - the entries already present in `old_audience_data`; an entry is kept only when
 *    its value compares greater than `expire_time`.
 * Ids coming from `audience_data` overwrite old entries with the same key.
 *
 * @package: mobvista.dmp.datasource.rtdmp
 * @author: wangjf
 * @date: 2020/8/4
 * @time: 4:00 PM
 * @email: jinfeng.wang@mobvista.com
 * @phone: 152-1062-7698
 *
 * @param audinceInfos source records, consumed one at a time
 * @param update_time  value written for every id found in `audience_data`; also used
 *                     as the update time of the emitted record
 * @param expire_time  threshold for old entries; compared with `String.compareTo`, so
 *                     ordering is lexicographic (presumably both sides are timestamps
 *                     in a sortable format -- confirm against the caller)
 * @param set          currently NOT consulted: the filter that excluded ids contained
 *                     in this set is disabled. Kept so existing callers keep compiling.
 */
class CustomIteratorAudienceInfoV2(audinceInfos: Iterator[AudienceInfo], update_time: String, expire_time: String, set: java.util.Set[Integer])
extends Iterator[NewAudienceInfo] {

  // Explicit converters only; the deprecated implicit JavaConversions are intentionally not used.
  import scala.collection.JavaConverters._

  /** True while the underlying iterator still has records. */
  override def hasNext: Boolean = audinceInfos.hasNext

  /**
   * Consumes the next source record and returns its merged representation.
   *
   * @return a [[NewAudienceInfo]] carrying the device id, `update_time`, the merged
   *         audience JSON rendered via `toString`, and an empty string as last field
   * @throws NumberFormatException if `audience_data` contains a non-empty token that
   *                               is not a valid integer
   */
  override def next(): Constant.NewAudienceInfo = {
    val audienceInfo = audinceInfos.next()

    // Fresh entries: every positive audience id of this record, stamped with update_time.
    val new_json = new JSONObject()
    audienceInfo.audience_data
      .split(",", -1)
      // split(.., -1) keeps empty tokens (empty input, trailing comma); Integer.parseInt("")
      // would throw, so blank tokens are skipped instead of aborting the whole iterator.
      .filter(_.nonEmpty)
      .foreach { id =>
        if (Integer.parseInt(id) > 0) {
          new_json.put(id, update_time)
        }
      }

    // Filter the old audience_data, keeping only the audienceId mappings whose stored
    // update date has not expired (value > expire_time).
    // NOTE(review): values are treated as String; a non-String value in the parsed JSON
    // fails with ClassCastException here -- same as before this rewrite.
    val old_json = MobvistaConstant.String2JSONObject(audienceInfo.old_audience_data)
      .asInstanceOf[java.util.Map[String, String]]
    // asScala is a live view, so retain() removes the expired keys from old_json itself.
    old_json.asScala.retain((_, stamp) => stamp.compareTo(expire_time) > 0)

    // Fresh entries win over surviving old ones that share the same audience id.
    old_json.putAll(new_json.asInstanceOf[java.util.Map[String, String]])

    NewAudienceInfo(audienceInfo.devid, update_time, old_json.toString, "")
  }
}