package mobvista.dmp.datasource.dm

import com.alibaba.fastjson.{JSON, JSONArray, JSONObject}
import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.Row

import scala.collection.JavaConverters._

/**
 * @package: mobvista.dmp.datasource.dm
 * @author: wangjf
 * @date: 2020/3/18
 * @time: 5:11 PM
 * @email: jinfeng.wang@mobvista.com
 * @phone: 152-1062-7698
 */

/**
 * Converts each input [[Row]] into a [[DmInterestTag]] by attaching interest tags to the
 * packages listed in the row's `install_list` column.
 *
 * Each row is read through the columns `device_id`, `device_type`, `platform` and
 * `install_list`. `install_list` is parsed as a JSON object whose keys are package names
 * and whose values are written out as the install `date`.
 * NOTE(review): that schema is inferred from how the column is used here; confirm it against the producer.
 *
 * For every installed package:
 *  - old-style tags are looked up in `oldMap` by package name;
 *  - new-style tags are looked up in `newMap` by `"<package>-<platform>"`.
 * Packages found in neither map are dropped from the output.
 *
 * @param iter   source rows
 * @param mapper currently unused; kept so existing call sites keep compiling
 * @param oldMap broadcast lookup: package name -> old tag value
 * @param newMap broadcast lookup: "package-platform" -> new tag value
 */
class CustomIterator(iter: Iterator[Row], mapper: ObjectMapper,
                     oldMap: Broadcast[scala.collection.Map[String, String]],
                     newMap: Broadcast[scala.collection.Map[String, String]]) extends Iterator[DmInterestTag] {

  /**
   * Matches App Store style ids such as `id284882215`.
   *
   * The pattern is a raw string, so `\d` is the regex digit class rather than an escaped backslash.
   */
  private[this] val appStoreId = """^id\d+$""".r

  override def hasNext: Boolean = iter.hasNext

  /**
   * Converts the next row.
   *
   * @return the tagged record, or `null` when none of the row's packages has an old or new tag.
   *         The `null` is kept for compatibility with existing callers.
   *         NOTE(review): callers presumably filter these out; confirm.
   */
  override def next(): DmInterestTag = {
    val row = iter.next()
    // Explicit type parameter: without it `getAs` infers `Nothing`.
    val deviceId = row.getAs[Any]("device_id").toString
    val deviceType = row.getAs[Any]("device_type").toString
    val platform = row.getAs[Any]("platform").toString

    // Read the broadcast values once per row instead of once per installed package.
    val oldTags = oldMap.value
    val newTags = newMap.value

    val tagged = new JSONArray()
    parseInstalls(row.getAs[Any]("install_list")).foreach { case (rawName, installDate) =>
      tagInstall(normalizePackageName(rawName), installDate, platform, oldTags, newTags)
        .foreach(json => tagged.add(json))
    }

    if (tagged.isEmpty) null
    else DmInterestTag(deviceId, deviceType, platform, tagged.toString)
  }

  /**
   * Parses the `install_list` column as a JSON object of package name -> install value.
   *
   * A null column, or a payload that fastjson parses to null, yields no installs.
   * Values are kept as `AnyRef` so non-string dates do not fail a cast.
   */
  private[this] def parseInstalls(raw: Any): Iterable[(String, AnyRef)] =
    Option(raw)
      .flatMap(value => Option(JSON.parseObject(value.toString)))
      .map(obj => (obj: java.util.Map[String, AnyRef]).asScala)
      .getOrElse(Iterable.empty[(String, AnyRef)])

  /** Strips the leading `id` from App Store ids like `id284882215`; other names pass through unchanged. */
  private[this] def normalizePackageName(name: String): String =
    if (appStoreId.pattern.matcher(name).matches()) name.stripPrefix("id") else name

  /**
   * Builds the JSON entry for one installed package.
   *
   * @return `Some(entry)` with `package_name`, `date` and whichever of `tag` / `tag_new` were found;
   *         `None` when the package has neither an old nor a new tag.
   */
  private[this] def tagInstall(packageName: String,
                               installDate: AnyRef,
                               platform: String,
                               oldTags: scala.collection.Map[String, String],
                               newTags: scala.collection.Map[String, String]): Option[JSONObject] = {
    val oldTag = oldTags.get(packageName)
    val newTag = newTags.get(packageName + "-" + platform)
    if (oldTag.isEmpty && newTag.isEmpty) None
    else {
      val json = new JSONObject()
      json.put("package_name", packageName)
      json.put("date", installDate)
      oldTag.foreach(tag => json.put("tag", tag))
      newTag.foreach(tag => json.put("tag_new", tag))
      Some(json)
    }
  }
}