Commit 8c7555e2 by WangJinfeng

fix dmp_install_list

parent ba8366b1
...@@ -138,7 +138,19 @@ abstract class CommonInstallListOrc extends CommonSparkJob with Serializable { ...@@ -138,7 +138,19 @@ abstract class CommonInstallListOrc extends CommonSparkJob with Serializable {
if (dailyOpt.isEmpty && totalOpt.isDefined) { if (dailyOpt.isEmpty && totalOpt.isDefined) {
val installListDate = MRUtils.SPLITTER.split(totalOpt.get, -1) val installListDate = MRUtils.SPLITTER.split(totalOpt.get, -1)
pkgs = installListDate(0) pkgs = installListDate(0)
ext_data = installListDate(1) val old_ext_data_json = JSON.parseObject(installListDate(1))
val region_list = if (old_ext_data_json.containsKey("region")) {
JSON.parseArray(old_ext_data_json.getString("region"), classOf[String])
/*
.filter(r => {
MobvistaConstant.regionSet.contains(r)
}).asJava
*/
} else {
new util.ArrayList[String]()
}
old_ext_data_json.put("region", new util.HashSet(region_list))
ext_data = old_ext_data_json.toJSONString
updateDate = installListDate(2) updateDate = installListDate(2)
country = installListDate(3) country = installListDate(3)
} else if (dailyOpt.isDefined && totalOpt.isEmpty) { } else if (dailyOpt.isDefined && totalOpt.isEmpty) {
...@@ -205,12 +217,17 @@ abstract class CommonInstallListOrc extends CommonSparkJob with Serializable { ...@@ -205,12 +217,17 @@ abstract class CommonInstallListOrc extends CommonSparkJob with Serializable {
} }
val region_list = if (old_ext_data_json.containsKey("region")) { val region_list = if (old_ext_data_json.containsKey("region")) {
JSON.parseArray(old_ext_data_json.getString("region"), classOf[String]) JSON.parseArray(old_ext_data_json.getString("region"), classOf[String])
/*
.filter(r => {
MobvistaConstant.regionSet.contains(r)
}).asJava
*/
} else { } else {
new util.ArrayList[String]() new util.ArrayList[String]()
} }
if (daily_ext_data_json.containsKey("region")) { if (daily_ext_data_json.containsKey("region")) {
ext_data_list.addAll(JSON.parseArray(daily_ext_data_json.getString("region"), classOf[String])) region_list.addAll(JSON.parseArray(daily_ext_data_json.getString("region"), classOf[String]))
old_ext_data_json.put("region", new util.HashSet(ext_data_list)) old_ext_data_json.put("region", new util.HashSet(region_list))
} }
ext_data = old_ext_data_json.toJSONString ext_data = old_ext_data_json.toJSONString
......
package mobvista.dmp.common package mobvista.dmp.common
import java.net.URI
import com.alibaba.fastjson.JSONObject import com.alibaba.fastjson.JSONObject
import mobvista.dmp.util.DateUtil import mobvista.dmp.util.DateUtil
import org.apache.commons.cli.{BasicParser, Options} import org.apache.commons.cli.{BasicParser, Options}
import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{SaveMode, SparkSession} import org.apache.spark.sql.{SaveMode, SparkSession}
import java.net.URI
import scala.collection.mutable import scala.collection.mutable
/** /**
...@@ -65,10 +64,12 @@ class InstallListDailyV2 extends CommonSparkJob with Serializable { ...@@ -65,10 +64,12 @@ class InstallListDailyV2 extends CommonSparkJob with Serializable {
activeDev.createOrReplaceTempView("active_dev") activeDev.createOrReplaceTempView("active_dev")
spark.udf.register("merge", merge _) spark.udf.register("merge", merge _)
spark.udf.register("filterInstall", filterInstall _)
spark.udf.register("udf_mergeExtData", mobvista.dmp.datasource.dm.Constant.mergeExtData _) spark.udf.register("udf_mergeExtData", mobvista.dmp.datasource.dm.Constant.mergeExtData _)
sql = sql.replace("@date", date).replace("@before_date", before_date) sql = sql.replace("@date", date).replace("@before_date", before_date)
val df = spark.sql(sql).filter(
val df = spark.sql(sql) "filterInstall(install_list)"
)
df.repartition(coalesce.toInt) df.repartition(coalesce.toInt)
.write .write
.mode(SaveMode.Overwrite) .mode(SaveMode.Overwrite)
...@@ -126,12 +127,17 @@ class InstallListDailyV2 extends CommonSparkJob with Serializable { ...@@ -126,12 +127,17 @@ class InstallListDailyV2 extends CommonSparkJob with Serializable {
val installJSONObject = new JSONObject val installJSONObject = new JSONObject
installList.iterator.foreach(install => { installList.iterator.foreach(install => {
val installMap = MobvistaConstant.String2JSONObject(install).asInstanceOf[java.util.Map[String, String]].asScala val installMap = MobvistaConstant.String2JSONObject(install).asInstanceOf[java.util.Map[String, String]].asScala
installMap.retain((k, v) => !installJSONObject.containsKey(k) || installJSONObject.getString(k).compareTo(v) < 0).foreach(kv => { installMap.retain((k, v) => (!installJSONObject.containsKey(k) || installJSONObject.getString(k).compareTo(v) < 0)
&& !k.equalsIgnoreCase("0000000000") && !k.equalsIgnoreCase("com.nonepkg.nonepkg")).foreach(kv => {
installJSONObject.put(kv._1, kv._2) installJSONObject.put(kv._1, kv._2)
}) })
}) })
installJSONObject.toJSONString installJSONObject.toJSONString
} }
def filterInstall(installList: String): Boolean = {
MobvistaConstant.String2JSONObject(installList).size() > 0
}
} }
object InstallListDailyV2 { object InstallListDailyV2 {
......
...@@ -312,6 +312,8 @@ object MobvistaConstant { ...@@ -312,6 +312,8 @@ object MobvistaConstant {
} }
} }
val regionSet: mutable.Set[String] = mutable.Set("virginia", "frankfurt", "singapore", "seoul")
val countryPtn = "^([A-Z]{2,3})$" val countryPtn = "^([A-Z]{2,3})$"
// IDFA/GAID // IDFA/GAID
......
...@@ -759,12 +759,16 @@ object Constant { ...@@ -759,12 +759,16 @@ object Constant {
val regionSet = new mutable.HashSet[String]() val regionSet = new mutable.HashSet[String]()
import scala.collection.JavaConversions._ import scala.collection.JavaConversions._
if (data_1.containsKey("region")) { if (data_1.containsKey("region")) {
data_1.getJSONArray("region").foreach(v => { data_1.getJSONArray("region").filter(r => {
StringUtils.isNotBlank(r.toString)
}).foreach(v => {
regionSet.add(v.toString) regionSet.add(v.toString)
}) })
} }
if (data_2.containsKey("region")) { if (data_2.containsKey("region")) {
data_2.getJSONArray("region").foreach(v => { data_2.getJSONArray("region").filter(r => {
StringUtils.isNotBlank(r.toString)
}).foreach(v => {
regionSet.add(v.toString) regionSet.add(v.toString)
}) })
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment