Commit c89c29dc by WangJinfeng

fix rtdmp

parent 19af5d6f
......@@ -90,9 +90,9 @@ class RTDmpMain extends CommonSparkJob with Serializable {
val opt1 = t._2._1
val opt2 = t._2._2
if (opt1.nonEmpty && opt2.nonEmpty) {
val new_audience = JSON.parseObject(opt1.get._1).asInstanceOf[java.util.Map[String, String]]
val new_audience = MobvistaConstant.String2JSONObject(opt1.get._1).asInstanceOf[java.util.Map[String, String]]
val old_audience = opt2.get._1
val retain_old_audience = JSON.parseObject(old_audience).asInstanceOf[java.util.Map[String, String]].asScala
val retain_old_audience = MobvistaConstant.String2JSONObject(old_audience).asInstanceOf[java.util.Map[String, String]].asScala
.retain((k, v) => !new_audience.keySet().contains(k) && v.compareTo(expire_time) > 0)
new_audience.putAll(retain_old_audience.asJava)
AudienceMerge(devid, new ObjectMapper().writeValueAsString(new_audience), datetime, opt1.get._2)
......@@ -104,6 +104,8 @@ class RTDmpMain extends CommonSparkJob with Serializable {
.retain((_, v) => v.compareTo(expire_time) > 0)
AudienceMerge(devid, new ObjectMapper().writeValueAsString(retain_old_audience.asJava), opt2.get._2, opt2.get._3)
}
}).filter(o => {
!MobvistaConstant.String2JSONObject(o.audience_map).isEmpty
})
df.toDF
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment