Commit 3ef225e1 by WangJinfeng

fix rtdmp

parent 953771b9
package mobvista.dmp.datasource.rtdmp
import com.alibaba.fastjson.JSONObject
import com.alibaba.fastjson.{JSON, JSONObject}
import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
import mobvista.dmp.datasource.rtdmp.Constant.AudienceMerge
import org.apache.commons.cli.{BasicParser, Options}
......@@ -89,12 +89,12 @@ class RTDmpMain extends CommonSparkJob with Serializable {
val opt1 = t._2._1
val opt2 = t._2._2
if (opt1.nonEmpty && opt2.nonEmpty) {
val new_audience = MobvistaConstant.String2JSONObject(opt1.get._1)
val new_audience = JSON.parseObject(opt1.get._1).asInstanceOf[java.util.Map[String, String]]
val old_audience = opt2.get._1
val retain_old_audience = MobvistaConstant.String2JSONObject(old_audience).asInstanceOf[java.util.Map[String, String]].asScala
val retain_old_audience = JSON.parseObject(old_audience).asInstanceOf[java.util.Map[String, String]].asScala
.retain((k, v) => !new_audience.keySet().contains(k) && v.compareTo(expire_time) > 0)
new_audience.putAll(retain_old_audience.asJava)
AudienceMerge(devid, new_audience.toJSONString, datetime, opt1.get._2)
AudienceMerge(devid, new ObjectMapper().writeValueAsString(new_audience), datetime, opt1.get._2)
} else if (opt1.nonEmpty && opt2.isEmpty) {
AudienceMerge(devid, opt1.get._1, datetime, opt1.get._2)
} else {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment