Commit 2dc96cc2 by WangJinfeng

update RTDmpRequest filter by define update_date

parent b7a056cd
...@@ -129,4 +129,6 @@ tencent.package_name=com.tencent.news_bes,com.tencent.news_bes_7,com.tencent.new ...@@ -129,4 +129,6 @@ tencent.package_name=com.tencent.news_bes,com.tencent.news_bes_7,com.tencent.new
youku_acquisition.package_name=com.youku.foracquisition_imei,com.youku.foracquisition_oaid youku_acquisition.package_name=com.youku.foracquisition_imei,com.youku.foracquisition_oaid
rtdmp.stop.audience=433,406,405,407,390,395,389,123,238,388,1133,1134,1135 rtdmp.stop.audience=433,406,405,407,390,395,389,123,238,388,1133,1134,1135
\ No newline at end of file
define.package_name.update_date=com.sankuai.meituan_oppo:0,com.sankuai.meituan_bes:0,com.sankuai.meituan_iqiyi:0
\ No newline at end of file
...@@ -85,8 +85,8 @@ object Constant { ...@@ -85,8 +85,8 @@ object Constant {
// AND check_package(package_name) // AND check_package(package_name)
val device_sql = val device_sql =
""" """
|SELECT device_id, device_type, platform, package_name |SELECT device_id, device_type, platform, package_name, update_date
| FROM dwh.@table WHERE dt = '@dt' AND business = '@business' @check_update_date @check_package @check_hr | FROM dwh.@table WHERE dt = '@dt' AND business = '@business' @check_package @check_hr
| GROUP BY device_id, device_type, platform, package_name | GROUP BY device_id, device_type, platform, package_name
|""".stripMargin |""".stripMargin
......
...@@ -99,10 +99,8 @@ class RTDmpRequest extends CommonSparkJob with Serializable { ...@@ -99,10 +99,8 @@ class RTDmpRequest extends CommonSparkJob with Serializable {
if (!hh.equals("00")) { if (!hh.equals("00")) {
device_sql = device_sql.replace("@check_hr", s"AND hh = '$hh'") device_sql = device_sql.replace("@check_hr", s"AND hh = '$hh'")
.replace("@check_update_date", "")
} else { } else {
device_sql = device_sql.replace("@check_hr", "") device_sql = device_sql.replace("@check_hr", "")
.replace("@check_update_date", s"AND update_date = '$update_date'")
} }
println(s"package_name_set.size -->> ${package_name_set.size}") println(s"package_name_set.size -->> ${package_name_set.size}")
...@@ -116,7 +114,22 @@ class RTDmpRequest extends CommonSparkJob with Serializable { ...@@ -116,7 +114,22 @@ class RTDmpRequest extends CommonSparkJob with Serializable {
Integer.valueOf(r) Integer.valueOf(r)
}) })
val rdd = spark.sql(device_sql).rdd.map(r => { val filterMap: mutable.Map[String, Int] = new mutable.HashMap[String, Int]()
PropertyUtil.getProperty("config.properties", "define.package_name.update_date").split(",", -1)
.foreach(str => {
val kv = str.split(":", -1)
filterMap.put(kv(0), Integer.valueOf(kv(1)))
})
val rdd = spark.sql(device_sql).rdd
.filter(r => {
val package_name = r.getAs[String]("package_name")
val update_date = r.getAs[String]("update_date")
val days: Int = filterMap.getOrElse(package_name, 0)
val filterDate = DateUtil.getDay(DateUtil.parse(date, "yyyyMMdd"), "yyyy-MM-dd", -days)
update_date.compareTo(filterDate) >= 0
}).map(r => {
val device_id = r.getAs[String]("device_id") val device_id = r.getAs[String]("device_id")
var device_type = r.getAs[String]("device_type") var device_type = r.getAs[String]("device_type")
val platform = r.getAs[String]("platform") val platform = r.getAs[String]("platform")
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment