Commit 986ad516 by WangJinfeng

fix rtdmp

parent c89c29dc
package mobvista.dmp.datasource.rtdmp
import com.alibaba.fastjson.{JSON, JSONObject}
import com.alibaba.fastjson.JSONObject
import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
import mobvista.dmp.datasource.rtdmp.Constant.AudienceMerge
import org.apache.commons.cli.{BasicParser, Options}
......@@ -85,26 +85,29 @@ class RTDmpMain extends CommonSparkJob with Serializable {
import spark.implicits._
val df = hour_rdd.fullOuterJoin(merge_rdd).map(t => {
val devid = t._1
val opt1 = t._2._1
val opt2 = t._2._2
if (opt1.nonEmpty && opt2.nonEmpty) {
val new_audience = MobvistaConstant.String2JSONObject(opt1.get._1).asInstanceOf[java.util.Map[String, String]]
val old_audience = opt2.get._1
val retain_old_audience = MobvistaConstant.String2JSONObject(old_audience).asInstanceOf[java.util.Map[String, String]].asScala
.retain((k, v) => !new_audience.keySet().contains(k) && v.compareTo(expire_time) > 0)
new_audience.putAll(retain_old_audience.asJava)
AudienceMerge(devid, new ObjectMapper().writeValueAsString(new_audience), datetime, opt1.get._2)
} else if (opt1.nonEmpty && opt2.isEmpty) {
AudienceMerge(devid, opt1.get._1, datetime, opt1.get._2)
} else {
val old_audience = opt2.get._1
val retain_old_audience = MobvistaConstant.String2JSONObject(old_audience).asInstanceOf[java.util.Map[String, String]].asScala
.retain((_, v) => v.compareTo(expire_time) > 0)
AudienceMerge(devid, new ObjectMapper().writeValueAsString(retain_old_audience.asJava), opt2.get._2, opt2.get._3)
}
}).filter(o => {
val df = hour_rdd.fullOuterJoin(merge_rdd)
.mapPartitions(ts => {
ts.map(t => {
val devid = t._1
val opt1 = t._2._1
val opt2 = t._2._2
if (opt1.nonEmpty && opt2.nonEmpty) {
val new_audience = MobvistaConstant.String2JSONObject(opt1.get._1).asInstanceOf[java.util.Map[String, String]]
val old_audience = opt2.get._1
val retain_old_audience = MobvistaConstant.String2JSONObject(old_audience).asInstanceOf[java.util.Map[String, String]].asScala
.retain((k, v) => !new_audience.keySet().contains(k) && v.compareTo(expire_time) > 0)
new_audience.putAll(retain_old_audience.asJava)
AudienceMerge(devid, new ObjectMapper().writeValueAsString(new_audience), datetime, opt1.get._2)
} else if (opt1.nonEmpty && opt2.isEmpty) {
AudienceMerge(devid, opt1.get._1, datetime, opt1.get._2)
} else {
val old_audience = opt2.get._1
val retain_old_audience = MobvistaConstant.String2JSONObject(old_audience).asInstanceOf[java.util.Map[String, String]].asScala
.retain((_, v) => v.compareTo(expire_time) > 0)
AudienceMerge(devid, new ObjectMapper().writeValueAsString(retain_old_audience.asJava), opt2.get._2, opt2.get._3)
}
})
}).filter(o => {
!MobvistaConstant.String2JSONObject(o.audience_map).isEmpty
})
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment