Commit b8a85cf4 by WangJinfeng

m/dsp merge ext_data

parent edf2bf9d
package mobvista.dmp.common package mobvista.dmp.common
import java.net.URI
import com.alibaba.fastjson.JSON import com.alibaba.fastjson.JSON
import mobvista.dmp.common.MobvistaConstant.deviceTypeSet import mobvista.dmp.common.MobvistaConstant.deviceTypeSet
import mobvista.dmp.util.{DateUtil, MRUtils} import mobvista.dmp.util.{DateUtil, MRUtils}
...@@ -10,6 +9,8 @@ import org.apache.hadoop.fs.{FileSystem, Path} ...@@ -10,6 +9,8 @@ import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession} import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.joda.time.format.DateTimeFormat import org.joda.time.format.DateTimeFormat
import java.net.URI
import java.util
import scala.collection.JavaConversions._ import scala.collection.JavaConversions._
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
...@@ -38,14 +39,7 @@ abstract class CommonInstallListOrc extends CommonSparkJob with Serializable { ...@@ -38,14 +39,7 @@ abstract class CommonInstallListOrc extends CommonSparkJob with Serializable {
val dateTime = DateUtil.format(DateUtil.parse(date, "yyyyMMdd"), "yyyy-MM-dd") val dateTime = DateUtil.format(DateUtil.parse(date, "yyyyMMdd"), "yyyy-MM-dd")
val expireDate = DateTimeFormat.forPattern("yyyy-MM-dd").parseDateTime(dateTime).minusMonths(12).toString("yyyy-MM-dd") val expireDate = DateTimeFormat.forPattern("yyyy-MM-dd").parseDateTime(dateTime).minusMonths(12).toString("yyyy-MM-dd")
val spark = SparkSession val spark = MobvistaConstant.createSparkSession(s"DmpInstallList.$business.$date")
.builder()
.appName(s"DmpInstallList.$business.$date")
.config("spark.rdd.compress", "true")
.config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.enableHiveSupport()
.getOrCreate()
val v2_flag = if (business.equals("adn_sdk_v2")) { val v2_flag = if (business.equals("adn_sdk_v2")) {
true true
...@@ -194,7 +188,33 @@ abstract class CommonInstallListOrc extends CommonSparkJob with Serializable { ...@@ -194,7 +188,33 @@ abstract class CommonInstallListOrc extends CommonSparkJob with Serializable {
installJson.putAll(installMap.asJava) installJson.putAll(installMap.asJava)
} }
pkgs = installJson.toString pkgs = installJson.toString
// Merge the daily partition's ext_data JSON into the historical ext_data JSON.
// array(1) holds the historical ext_data string; dailyOpt.get._2 the daily one.
val old_ext_data_json = JSON.parseObject(array(1))
val daily_ext_data_json = JSON.parseObject(dailyOpt.get._2)
// dev_tag: only propagate the daily flag when it is explicitly set to 1.
if (daily_ext_data_json.containsKey("dev_tag") && daily_ext_data_json.getInteger("dev_tag") == 1) {
  old_ext_data_json.put("dev_tag", daily_ext_data_json.getInteger("dev_tag"))
}
// Merge a string-array field (e.g. "strategy", "region"): union of the
// historical and daily values, deduplicated via HashSet. The key is only
// rewritten when the daily side contributes values, matching the original
// behavior of leaving the historical value untouched otherwise.
def mergeArrayField(key: String): Unit = {
  val merged = if (old_ext_data_json.containsKey(key)) {
    JSON.parseArray(old_ext_data_json.getString(key), classOf[String])
  } else {
    new util.ArrayList[String]()
  }
  if (daily_ext_data_json.containsKey(key)) {
    merged.addAll(JSON.parseArray(daily_ext_data_json.getString(key), classOf[String]))
    old_ext_data_json.put(key, new util.HashSet(merged))
  }
}
mergeArrayField("strategy")
// BUG FIX: the original "region" branch accumulated into the strategy list
// (ext_data_list) instead of region_list, so "region" ended up containing
// strategy values. Each field now merges into its own list.
mergeArrayField("region")
ext_data = old_ext_data_json.toJSONString
country = if (StringUtils.isNotBlank(dailyOpt.get._3)) { country = if (StringUtils.isNotBlank(dailyOpt.get._3)) {
dailyOpt.get._3 dailyOpt.get._3
} else { } else {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment