// CustomInteratorList.scala 1.85 KB
package mobvista.dmp.common

import com.alibaba.fastjson.JSON
import mobvista.dmp.util.MRUtils

/**
  * Iterator adapter that turns joined (daily, historical) install records into
  * [[DmpInstallList]] rows, producing exactly one row per input element.
  *
  * Each input element has the shape `(key, (dailyOpt, totalOpt))`:
  *  - `key` is split with `MRUtils.SPLITTER`; its first three fields become the first
  *    three arguments of the emitted row.
  *  - `dailyOpt` is the install payload for `dateTime`. When a historical record is
  *    also present it is parsed as a JSON object.
  *  - `totalOpt` is the historical record. It is split with `MRUtils.SPLITTER`:
  *    field 0 is the install JSON and field 1 is the date it was last updated.
  *
  * @param dateTime update date stamped on every row that has a daily payload
  * @param iter     the joined records to convert
  *
  * @package: mobvista.dmp.common
  * @author: wangjf
  * @date: 2020/4/1
  * @time: 2:49 PM
  * @email: jinfeng.wang@mobvista.com
  * @phone: 152-1062-7698
  */
class CustomInteratorList(dateTime: String, iter: Iterator[(String, (Option[String], Option[String]))]) extends Iterator[DmpInstallList] {

  import scala.collection.JavaConverters._

  /** Upper bound on the number of packages kept in a merged install list. */
  private[this] val maxPackages = 1000

  override def hasNext: Boolean = iter.hasNext

  /**
    * Converts the next joined record into a [[DmpInstallList]].
    *
    *  - historical only: the stored install JSON and its stored update date are kept as-is;
    *  - daily only: the daily payload is used and the update date is `dateTime`;
    *  - both: the two are merged (see `mergeInstalls`) and the update date is `dateTime`;
    *  - neither: both the package list and the update date are empty strings.
    *
    * @return the row built from the record key, the package list and the update date
    */
  override def next(): DmpInstallList = {
    val (key, (dailyOpt, totalOpt)) = iter.next()

    val (pkgs, updateDate) = (dailyOpt, totalOpt) match {
      case (None, Some(total)) =>
        // NOTE(review): assumes the historical record always has at least two
        // fields; a record without the date field fails on the index access.
        val fields = MRUtils.SPLITTER.split(total)
        (fields(0), fields(1))
      case (Some(daily), None) =>
        (daily, dateTime)
      case (Some(daily), Some(total)) =>
        (mergeInstalls(daily, total), dateTime)
      case (None, None) =>
        ("", "")
    }

    val keys = MRUtils.SPLITTER.split(key)
    DmpInstallList(keys(0), keys(1), keys(2), "", pkgs, "", updateDate)
  }

  /**
    * Merges the daily install JSON with the historical install JSON.
    *
    * Daily entries always win. Historical entries whose key is not already present
    * are added in descending order of their value until the merged object holds
    * `maxPackages` entries, so the most recently active packages are stored first.
    *
    * @param daily install JSON object for the current day
    * @param total historical record; field 0 (after splitting) is its install JSON
    * @return the merged install list serialized as a JSON string
    */
  private def mergeInstalls(daily: String, total: String): String = {
    val merged = JSON.parseObject(daily)
    // NOTE(review): values are treated as strings and compared lexicographically;
    // presumably they are install dates — confirm against the producer.
    val historical = JSON.parse(MRUtils.SPLITTER.split(total)(0))
      .asInstanceOf[java.util.Map[String, String]].asScala

    historical.toList
      .filterNot { case (pkg, _) => merged.containsKey(pkg) }
      .sortWith((a, b) => a._2 > b._2)
      // Every remaining key is new to `merged`, so each put grows it by exactly one.
      .take(maxPackages - merged.size())
      .foreach(kv => merged.put(kv._1, kv._2))

    merged.toJSONString
  }
}