Commit 8dbadaec by WangJinfeng

update rtdmp

parent 03189e98
type=command
dependencies=rtdmp_estimate
dependencies=rtdmp_as_v2
command=echo 'rtdmp success!'
\ No newline at end of file
package mobvista.dmp.datasource.rtdmp
import com.alibaba.fastjson.{JSONArray, JSONObject}
import com.alibaba.fastjson.JSONObject
import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
import mobvista.dmp.format.TextMultipleOutputFormat
import mobvista.dmp.util.DateUtil
......@@ -87,90 +87,6 @@ class RTDmpASV2 extends CommonSparkJob with Serializable {
val rdd = spark.read.orc(input_data).rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)
/*
val data_v2_output = output + "/data_v2"
FileSystem.get(new URI(s"s3://mob-emr-test"), sc.hadoopConfiguration).delete(new Path(data_v2_output), true)
// val foractivationIdSet = Logic.getForactivationIdSet(1)
// 2020-12-30 14:28:58,RTDmp 数据新增人群包更新时间
// 运算人群包不进行产出。
rdd.map(row => {
val array = new ArrayBuffer[(Text, Text)]()
val jsonObject = new JSONObject()
// case class NewAudienceInfo(devid: String, update_time: String, audience_data: String)
val deviceId = row.getAs[String]("devid")
val audienceData = MobvistaConstant.String2JSONObject(row.getAs[String]("audience_data"))
val audienceMap = new mutable.HashMap[String, Long]()
val audience_id = new mutable.HashSet[Integer]()
for (entry: java.util.Map.Entry[String, Object] <- audienceData.entrySet) {
audience_id.add(entry.getKey.toInt)
audienceMap.put(entry.getKey, DateUtil.parse(entry.getValue.asInstanceOf[String] + ":59:59", "yyyy-MM-dd HH:mm:ss").getTime / 1000 - 28800)
}
/*
val audience_info = row.getAs[String]("audience_info")
val jsonObject = MobvistaConstant.String2JSONObject(audience_info)
import scala.collection.JavaConversions._
val audience_id = JSON.parseArray(jsonObject.getJSONArray("audience_id").toJSONString, classOf[Integer]).toSet
val audienceIds = (audience_id.asJava -- falseId) & trueId
*/
val audienceIds = (audience_id.asJava -- falseId) & trueId
val audienceMaps = audienceMap.retain((k, _) => !falseId.contains(Integer.parseInt(k)) && trueId.contains(Integer.parseInt(k)))
if (audienceIds.nonEmpty) {
jsonObject.put("devid", deviceId)
// jsonObject.put("audience_id", audienceIds.asJava)
jsonObject.put("audience_id_v2", audienceMaps.asJava)
// 2020-11-20 14:43:31 移除按 region 输出逻辑
/*
val regionSet = row.getAs("region").asInstanceOf[mutable.WrappedArray[String]]
for (region <- regionSet) {
if (StringUtils.isNotBlank(region) && !region.equalsIgnoreCase("null")) {
// 淘宝拉活输出路径
if ((Constant.foractivationIdSet & audienceIds & update_ids).nonEmpty) {
array.add((new Text(s"$data_output/foractivation/$region, "), new Text(jsonObject.toJSONString)))
}
// adx 输出路径
if ((Constant.adxIdSet & audienceIds & update_ids).nonEmpty) {
array.add((new Text(s"$data_output/adx/$region, "), new Text(jsonObject.toJSONString)))
}
// 其他人群包默认输出路径
if ((Constant.foractivationIdSet & audienceIds & update_ids).isEmpty && (Constant.adxIdSet & audienceIds & update_ids).isEmpty) {
array.add((new Text(s"$data_output/normal/$region, "), new Text(jsonObject.toJSONString)))
}
}
}
*/
// 淘宝拉活输出路径
if ((audienceIds & update_ids).nonEmpty)
array.add((new Text(s"$data_v2_output/foractivation, "), new Text(jsonObject.toJSONString)))
// adx 输出路径
if ((Constant.adxIdSet & audienceIds & update_ids).nonEmpty)
array.add((new Text(s"$data_v2_output/adx, "), new Text(jsonObject.toJSONString)))
// 其他人群包默认输出路径
if ((audienceIds & update_ids).isEmpty)
array.add((new Text(s"$data_v2_output/normal, "), new Text(jsonObject.toJSONString)))
/**
* // 淘宝拉活输出路径
* if ((foractivationIdSet & audienceIds & update_ids).nonEmpty)
* array.add((new Text(s"${data_output}/foractivation, "), new Text(jsonObject.toJSONString)))
* // adx 输出路径
* if ((Constant.adxIdSet & audienceIds & update_ids).nonEmpty)
* array.add((new Text(s"${data_output}/adx, "), new Text(jsonObject.toJSONString)))
* // 其他人群包默认输出路径
* if ((foractivationIdSet & audienceIds & update_ids).isEmpty && (Constant.adxIdSet & audienceIds & update_ids).isEmpty)
* array.add((new Text(s"${data_output}/normal, "), new Text(jsonObject.toJSONString)))
*/
}
array.iterator
}).flatMap(l => l)
.repartition(coalesce.toInt)
.saveAsNewAPIHadoopFile(data_v2_output, classOf[Text], classOf[Text], classOf[TextMultipleOutputFormat])
*/
val data_output = output + "/data"
FileSystem.get(new URI(s"s3://mob-emr-test"), sc.hadoopConfiguration).delete(new Path(data_output), true)
......@@ -222,16 +138,6 @@ class RTDmpASV2 extends CommonSparkJob with Serializable {
array.add((audienceId, 1))
}
})
/*
val audienceInfo = MobvistaConstant.String2JSONObject(row.getAs[String]("audience_info"))
audienceInfo.getJSONArray("audience_id").iterator().foreach(k => {
val audienceId = Integer.parseInt(k.toString)
if (update_ids.contains(audienceId)) {
array.add((audienceId, 1))
}
})
*/
array.iterator
}).flatMap(l => l)
.combineByKey(
......@@ -244,20 +150,6 @@ class RTDmpASV2 extends CommonSparkJob with Serializable {
FileSystem.get(new URI(s"s3://mob-emr-test"), sc.hadoopConfiguration).delete(new Path(audience_output), true)
audienceSum.coalesce(1).saveAsTextFile(audience_output)
val jsonArray = new JSONArray()
audienceSum.collect().foreach(m => {
val jsonObject = new JSONObject()
jsonObject.put("id", m._1)
jsonObject.put("audience_data_status", 2)
jsonObject.put("audience_count", m._2)
jsonArray.add(jsonObject)
})
val jsonObject = ServerUtil.update(jsonArray)
if (jsonObject.getInteger("code") == 200) {
println("Audience Update OK!")
}
} finally {
if (sc != null) {
sc.stop()
......
......@@ -51,6 +51,7 @@ class RTDmpMergeCK extends CommonSparkJob with Serializable {
spark.udf.register("process", process _)
val df = spark.sql(sql.replace("@dt", date_time))
.filter("size(audience_id) > 0")
implicit val clickhouseDataSource: ClickHouseDataSource = ClickHouseConnectionFactory.get(host)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment