1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
package mobvista.dmp.datasource.dm
import com.alibaba.fastjson.{JSON, JSONArray, JSONObject}
import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.Row
/**
* @package: mobvista.dmp.datasource.dm
* @author: wangjf
* @date: 2020/3/18
* @time: 5:11 PM
* @email: jinfeng.wang@mobvista.com
* @phone: 152-1062-7698
*/
/**
 * Converts input rows into [[DmInterestTag]] records by attaching interest tags to installed packages.
 *
 * Each row must provide the columns `device_id`, `device_type`, `platform` and `install_list`.
 * `install_list` is parsed as a JSON object mapping package name to install date. For every
 * package that has an entry in `oldMap` (keyed by package name) and/or `newMap` (keyed by
 * `packageName + "-" + platform`), a JSON object
 * `{"package_name": ..., "date": ..., "tag": ..., "tag_new": ...}` is added to the output array;
 * `tag` / `tag_new` are only present when the corresponding lookup hits.
 *
 * Package names of the form `id<digits>` are normalised to `<digits>` before the lookups.
 *
 * NOTE: `next` returns `null` when no installed package has any tag, so the iterator may yield
 * `null` elements; callers are expected to filter them out (verify at call sites).
 *
 * @param iter   input rows to convert
 * @param mapper not used by this class; kept for constructor compatibility
 * @param oldMap broadcast lookup from package name to tag string (written to `tag`)
 * @param newMap broadcast lookup from `package-platform` to tag string (written to `tag_new`)
 */
class CustomIterator(iter: Iterator[Row], mapper: ObjectMapper, oldMap: Broadcast[scala.collection.Map[String, String]], newMap: Broadcast[scala.collection.Map[String, String]]) extends Iterator[DmInterestTag] {

  /** Full-match extractor for ids like `id123456`, capturing the numeric part. */
  private val IosAppId = "id(\\d+)".r

  def hasNext: Boolean = {
    iter.hasNext
  }

  /**
   * Converts the next input row.
   *
   * @return a [[DmInterestTag]] whose last field is the JSON array of tagged packages,
   *         or `null` if none of the row's installed packages carries a tag
   */
  def next: DmInterestTag = {
    import scala.collection.JavaConverters._

    val row = iter.next
    // Explicit type argument: a bare `getAs("...")` infers `Nothing`, which can trigger a
    // ClassCastException (to scala.runtime.Nothing$) at runtime.
    val deviceId = row.getAs[Any]("device_id").toString
    val deviceType = row.getAs[Any]("device_type").toString
    val platform = row.getAs[Any]("platform").toString

    // Values are kept as AnyRef: fastjson may produce non-String values (e.g. numeric dates),
    // and viewing them as String would throw ClassCastException when they are read.
    val installList = JSON.parse(row.getAs[Any]("install_list").toString)
      .asInstanceOf[java.util.Map[String, AnyRef]]
      .asScala

    // Dereference the broadcasts once per row instead of once per package.
    val oldTags = oldMap.value
    val newTags = newMap.value

    val jsonArray = new JSONArray()
    installList.foreach { case (rawName, installDate) =>
      val packageName = rawName match {
        case IosAppId(digits) => digits
        case other => other
      }
      val oldTag = oldTags.get(packageName)
      val newTag = newTags.get(packageName + "-" + platform)
      // Only packages that carry at least one tag are emitted.
      if (oldTag.isDefined || newTag.isDefined) {
        val json = new JSONObject()
        json.put("package_name", packageName)
        json.put("date", installDate)
        oldTag.foreach(tag => json.put("tag", tag))
        newTag.foreach(tag => json.put("tag_new", tag))
        jsonArray.add(json)
      }
    }

    if (jsonArray.size() > 0) {
      DmInterestTag(deviceId, deviceType, platform, jsonArray.toString)
    } else {
      null
    }
  }
}