1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package mobvista.dmp.common
import com.alibaba.fastjson.JSON
import mobvista.dmp.util.MRUtils
/**
 * Iterator adapter that converts joined (daily, total) install records into
 * [[DmpInstallList]] rows, one output row per upstream element.
 *
 * Each upstream element has the shape `(key, (dailyOpt, totalOpt))`:
 *  - `key` is split with `MRUtils.SPLITTER`; its first three fields become the
 *    first three constructor arguments of the emitted row.
 *  - `dailyOpt`, when present, is a JSON object string of installs for the current run.
 *  - `totalOpt`, when present, is a previously stored record which is split with
 *    `MRUtils.SPLITTER` into the install JSON (field 0) and its update date (field 1).
 *
 * NOTE(review): `MRUtils.SPLITTER.split` is assumed to return an indexable array
 * (e.g. `java.util.regex.Pattern.split`); a record with fewer fields than are
 * indexed here will throw an index-out-of-bounds exception — confirm upstream data.
 *
 * @param dateTime update date stamped on every row that contains daily data
 * @param iter     upstream iterator of `(key, (dailyOpt, totalOpt))` elements
 *
 * @package: mobvista.dmp.common
 * @author: wangjf
 * @date: 2020/4/1
 * @time: 2:49 PM
 * @email: jinfeng.wang@mobvista.com
 * @phone: 152-1062-7698
 */
class CustomInteratorList(dateTime: String, iter: Iterator[(String, (Option[String], Option[String]))]) extends Iterator[DmpInstallList] {

  import scala.collection.JavaConverters._

  /** Maximum number of entries a merged install JSON is allowed to grow to. */
  private val maxInstallEntries = 1000

  /** True while the upstream iterator still has elements. */
  def hasNext: Boolean = iter.hasNext

  /**
   * Consumes one upstream element and builds the corresponding row.
   *
   * @return a [[DmpInstallList]] whose package JSON and update date depend on
   *         which of the daily / total sides are present
   */
  def next: DmpInstallList = {
    val (key, (dailyOpt, totalOpt)) = iter.next()

    val (pkgs, updateDate) = (dailyOpt, totalOpt) match {
      // Both sides present: daily entries win, history fills the remaining room.
      case (Some(daily), Some(total)) =>
        (mergeInstalls(daily, total), dateTime)
      // Only daily data: store it as-is under the current date.
      case (Some(daily), None) =>
        (daily, dateTime)
      // Only historical data: carry the stored JSON and its stored date forward.
      case (None, Some(total)) =>
        val fields = MRUtils.SPLITTER.split(total)
        (fields(0), fields(1))
      // Neither side present: emit empty package list and empty date.
      case (None, None) =>
        ("", "")
    }

    val keys = MRUtils.SPLITTER.split(key)
    DmpInstallList(keys(0), keys(1), keys(2), "", pkgs, "", updateDate)
  }

  /**
   * Merges the historical install JSON into the daily install JSON.
   *
   * Daily entries are always kept. Historical entries whose key is not already
   * in the daily JSON are ordered by value, descending (string comparison), and
   * appended until the result holds `maxInstallEntries` entries. If the daily
   * JSON alone already reaches that size, nothing is appended.
   *
   * @param daily JSON object string of the current run's installs
   * @param total stored record; only its first `MRUtils.SPLITTER` field (the JSON) is used
   * @return the merged install JSON serialized as a string
   */
  private def mergeInstalls(daily: String, total: String): String = {
    val merged = JSON.parseObject(daily)
    // Values are treated as strings (presumably install timestamps — TODO confirm).
    val history = JSON.parse(MRUtils.SPLITTER.split(total)(0))
      .asInstanceOf[java.util.Map[String, String]].asScala

    history.toList
      .filter(kv => !merged.containsKey(kv._1))
      .sortWith((a, b) => a._2 > b._2)
      // Every appended key is new, so the remaining capacity is exactly how many fit.
      .take(maxInstallEntries - merged.size())
      .foreach { case (pkg, installTime) => merged.put(pkg, installTime) }

    merged.toJSONString
  }
}