Commit 14b970fb by WangJinfeng

fix dmp bug

parent 13331718
@@ -64,7 +64,7 @@ spark-submit --class mobvista.dmp.datasource.dm.ActiveTag \
 --conf spark.sql.files.maxPartitionBytes=268435456 \
 --conf spark.sql.adaptive.enabled=true \
 --conf spark.sql.adaptive.advisoryPartitionSizeInBytes=268435456 \
---master yarn --deploy-mode cluster --executor-memory 18g --driver-memory 4g --executor-cores 5 --num-executors 60 \
+--master yarn --deploy-mode cluster --executor-memory 12g --driver-memory 4g --executor-cores 5 --num-executors 60 \
 ../${JAR} \
 -date ${date} -output ${OUTPUT_PATH} -coalesce 1000 -days 29
@@ -66,7 +66,7 @@ spark-submit --class mobvista.dmp.datasource.dm.ActiveTag \
 --conf spark.sql.files.maxPartitionBytes=268435456 \
 --conf spark.sql.adaptive.enabled=true \
 --conf spark.sql.adaptive.advisoryPartitionSizeInBytes=268435456 \
---master yarn --deploy-mode cluster --executor-memory 18g --driver-memory 4g --executor-cores 5 --num-executors 40 \
+--master yarn --deploy-mode cluster --executor-memory 12g --driver-memory 4g --executor-cores 5 --num-executors 60 \
 ../${JAR} \
 -date ${date} -output ${OUTPUT_PATH} -coalesce 1000 -days 6
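Both ActiveTag hunks above trim executor memory from 18g to 12g (and the 6-day job also moves from 40 to 60 executors) while keeping adaptive query execution on with 256 MB advisory partitions. For context, a minimal sketch of how the same AQE settings could be applied on the session builder instead of through --conf flags; the object and app name below are illustrative only, not part of this repository:

import org.apache.spark.sql.SparkSession

// Minimal sketch (illustrative object/app name): the AQE knobs passed to
// spark-submit above, set on the builder instead of via --conf flags.
object ActiveTagSessionSketch {
  def build(): SparkSession = {
    SparkSession.builder()
      .appName("active-tag-sketch")
      .config("spark.sql.files.maxPartitionBytes", "268435456")               // 256 MB input splits
      .config("spark.sql.adaptive.enabled", "true")                           // turn on adaptive query execution
      .config("spark.sql.adaptive.advisoryPartitionSizeInBytes", "268435456") // AQE target partition size
      .getOrCreate()
  }
}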
-# !/bin/sh
+#!/bin/sh
 source ../dmp_env.sh
 ## date=$1
 ## deal_time="";
@@ -26,7 +26,6 @@ deal_time=$(date -d "$ScheduleTime 1 days ago" +"%Y%m%d")
 check_await "${GA_DAILY_PATH}/$date_path/_SUCCESS"
-echo "sdsdsds"
 echo "$deal_time"
 spark-submit --class mobvista.dmp.datasource.event_tag.Ga_purchase_event \
 --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
@@ -26,21 +26,21 @@ sleep 30
 output_path="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/dm_user_info/${date_path}"
 unmount_output_path="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/dm_user_info/${unmount_date_path}"
-# export SPARK_HOME="/data/hadoop-home/engineplus-k8s-spark-3.0.0-hadoop3.2"
-# export SPARK_CONF_DIR="/data/hadoop-config/command-home/engineplus-k8s-spark-3.0.0-online/conf"
+export SPARK_HOME="/data/hadoop-home/engineplus-k8s-spark-3.0.0-hadoop3.2"
+export SPARK_CONF_DIR="/data/hadoop-config/command-home/engineplus-k8s-spark-3.0.0-online/conf"
 spark-submit --class mobvista.dmp.datasource.retargeting.DeviceInfoJob \
 --name "DeviceInfoJob.wangjf.${date}" \
---conf spark.sql.shuffle.partitions=6000 \
---conf spark.default.parallelism=6000 \
+--conf spark.sql.broadcastTimeout=1200 \
+--conf spark.sql.shuffle.partitions=10000 \
+--conf spark.default.parallelism=10000 \
 --conf spark.kryoserializer.buffer.max=512m \
 --conf spark.kryoserializer.buffer=64m \
 --conf spark.sql.files.maxPartitionBytes=536870912 \
---conf spark.sql.autoBroadcastJoinThreshold=-1 \
 --conf spark.sql.adaptive.enabled=true \
 --conf spark.sql.adaptive.advisoryPartitionSizeInBytes=536870912 \
---master yarn --deploy-mode cluster --executor-memory 12g --driver-memory 10g --executor-cores 4 --num-executors 180 \
+--master yarn --deploy-mode cluster --executor-memory 12g --driver-memory 10g --executor-cores 4 --num-executors 100 \
 ../${JAR} \
 -date ${date} -output ${output_path} -coalesce 3000
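This dm_user_info hunk switches the job onto the Spark 3.0 home and conf dir, raises shuffle parallelism from 6000 to 10000, adds a 1200-second broadcast timeout, drops spark.sql.autoBroadcastJoinThreshold=-1 (which had disabled broadcast hash joins outright, the DeviceInfoJob builder below sets its own threshold instead), and scales executors back from 180 to 100. A minimal standalone sketch, assuming a stock Spark 3.x session, of the runtime-settable SQL confs among these; spark.default.parallelism is left out because it must be fixed before the SparkContext exists, which is why it stays on the spark-submit line:

import org.apache.spark.sql.SparkSession

// Minimal standalone sketch (illustrative, not repo code): the SQL confs touched
// by this script can also be adjusted on a live session at runtime.
object DmUserInfoConfSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("conf-sketch").getOrCreate()
    spark.conf.set("spark.sql.broadcastTimeout", "1200")     // seconds to wait for a broadcast before failing
    spark.conf.set("spark.sql.shuffle.partitions", "10000")  // post-shuffle partition count
    println(spark.conf.get("spark.sql.shuffle.partitions"))  // prints 10000
    spark.stop()
  }
}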
 package mobvista.dmp.datasource.age_gender
-import java.math.BigDecimal
-import java.text.SimpleDateFormat
-import java.util
-import java.util.Random
-import java.util.regex.Pattern
 import com.alibaba.fastjson.{JSON, JSONObject}
 import com.google.common.collect.Sets
 import mobvista.dmp.datasource.age.mapreduce.Util
@@ -16,13 +10,18 @@ import org.apache.spark.sql.Row
 import org.apache.spark.sql.types.{StringType, StructField, StructType}
 import org.codehaus.jackson.map.ObjectMapper
+import java.math.BigDecimal
+import java.text.SimpleDateFormat
+import java.util
+import java.util.Random
+import java.util.regex.Pattern
 import scala.collection.JavaConverters._
 /**
   * @package: mobvista.dmp.datasource.age
   * @author: wangjf
   * @create: 2018-09-13 16:01
-  **/
+  * */
 object Logic {
   private val wellSplit = Pattern.compile("#")
@@ -31,6 +30,7 @@ object Logic {
   private val lineSplit: Pattern = Pattern.compile("-")
   private val `match`: Pattern = Pattern.compile("^0*-0*-0*-0*-0*$")
   private val regex: Pattern = Pattern.compile("""^\d+$""")
+  private val ageRegex: Pattern = Pattern.compile("""\d{4}$""")
   private val matchingAgeSet: util.HashSet[String] = Sets.newHashSet("", "0", "1970", "GB", "null", "-")
   private val matchingGenderSet: util.HashSet[String] = Sets.newHashSet("f", "m")
@@ -516,7 +516,7 @@ object Logic {
   }
   def check_birthday(now: Int, birthday: String): Boolean = {
-    StringUtils.isNotBlank(birthday) && !matchingAgeSet.contains(birthday) && regex.matcher(birthday).matches() && (now - Integer.parseInt(birthday)) > 0 &&
+    StringUtils.isNotBlank(birthday) && !matchingAgeSet.contains(birthday) && ageRegex.matcher(birthday).matches() && (now - Integer.parseInt(birthday)) > 0 &&
       (now - Integer.parseInt(birthday)) < 100
   }
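The new ageRegex works together with this check_birthday change: since Matcher.matches() must consume the whole input, the pattern """\d{4}$""" only accepts strings of exactly four digits (a birth year), whereas the old """^\d+$""" accepted any all-digit string, including ones long enough to make Integer.parseInt throw. A standalone sketch of the difference (not part of the project's Logic object):

import java.util.regex.Pattern

// Standalone sketch (not the project's Logic object): matches() must consume the
// whole string, so the new pattern only accepts exactly four digits.
object BirthdayRegexSketch {
  private val anyDigits = Pattern.compile("""^\d+$""") // old check
  private val yearOnly = Pattern.compile("""\d{4}$""") // new check

  def main(args: Array[String]): Unit = {
    println(anyDigits.matcher("1990").matches())        // true
    println(yearOnly.matcher("1990").matches())         // true
    println(anyDigits.matcher("99999999999").matches()) // true  -> Integer.parseInt would throw NumberFormatException
    println(yearOnly.matcher("99999999999").matches())  // false -> rejected before parseInt is reached
  }
}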
@@ -14,8 +14,8 @@ object Ga_purchase_event {
     val spark = SparkSession.builder()
       .enableHiveSupport()
       .getOrCreate()
-    spark.conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
-    spark.conf.set("spark.kryoserializer.buffer.max", "300m")
+    // spark.conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
+    // spark.conf.set("spark.kryoserializer.buffer.max", "300m")
     //yyyyMMdd
     val loadTime = spark.conf.get("spark.app.loadTime")
     var year = loadTime.substring(0, 4)
@@ -25,7 +25,7 @@ object Ga_purchase_event {
     val table_name = spark.conf.get("spark.app.table")
     val db_name = spark.conf.get("spark.app.db_name")
     val outputPath = "s3://mob-emr-test/dataplatform/DataWareHouse/data/" + db_name + "/" + table_name
-    spark.conf.set("spark.kryoserializer.buffer.max", "300m")
+    // spark.conf.set("spark.kryoserializer.buffer.max", "300m")
     //***parquet**
     spark.sparkContext.hadoopConfiguration.set("mapreduce.fileoutputcommitter.algorithm.version", "2")
     spark.sparkContext.hadoopConfiguration.set("yarn.nodemanager.pmem-check-enabled", "false")
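The two spark.conf.set calls are commented out rather than moved because spark.serializer and the Kryo buffer size are read when the SparkContext is created; setting them on an already-built session does not change serialization. The submit script for this job already passes --conf spark.serializer=org.apache.spark.serializer.KryoSerializer (see the shell hunk above), which is the path that actually takes effect. A minimal standalone sketch, with illustrative names, of where such settings do belong in code:

import org.apache.spark.sql.SparkSession

// Minimal standalone sketch (illustrative, not repo code): serializer settings
// belong on the builder (or on spark-submit --conf), i.e. before the
// SparkContext exists; spark.conf.set afterwards does not change serialization.
object KryoConfigSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("kryo-config-sketch")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.kryoserializer.buffer.max", "300m")
      .getOrCreate()

    // Reads back the value that was fixed at context creation time.
    println(spark.sparkContext.getConf.get("spark.serializer"))
    spark.stop()
  }
}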
@@ -9,6 +9,7 @@ import mobvista.prd.datasource.util.GsonUtil
 import org.apache.commons.cli.{BasicParser, Options}
 import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.sql.{SaveMode, SparkSession}
 import java.net.URI
@@ -38,8 +39,8 @@ class DeviceInfoJob extends CommonSparkJob with Serializable {
   val sdf1 = new SimpleDateFormat("yyyy-MM-dd")
   val sdf2 = new SimpleDateFormat("yyyyMMdd")
-  var bMap: scala.collection.Map[String, String] = new mutable.HashMap[String, String]()
-  var packageMap: scala.collection.Map[String, Int] = new mutable.HashMap[String, Int]()
+  var bMap: Broadcast[scala.collection.Map[String, String]] = null
+  var packageMap: Broadcast[scala.collection.Map[String, Int]] = null
   override protected def run(args: Array[String]): Int = {
     val parser = new BasicParser()
@@ -56,6 +57,7 @@ class DeviceInfoJob extends CommonSparkJob with Serializable {
       .config("spark.sql.orc.filterPushdown", "true")
       .config("spark.io.compression.codec", "lz4")
       .config("spark.io.compression.lz4.blockSize", "64k")
+      .config("spark.sql.autoBroadcastJoinThreshold", "314572800")
       .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
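The builder now pins spark.sql.autoBroadcastJoinThreshold to 314572800 bytes. A tiny standalone sketch (illustrative, not repo code) spelling out that this is 300 MB, i.e. tables estimated up to roughly that size become candidates for broadcast hash joins rather than shuffle joins:

// Minimal sketch: 314572800 bytes = 300 * 1024 * 1024, a 300 MB cap on tables
// that Spark will consider for an automatic broadcast hash join.
object BroadcastThresholdSketch {
  def main(args: Array[String]): Unit = {
    val thresholdBytes = 300L * 1024L * 1024L
    assert(thresholdBytes == 314572800L)
    println(s"$thresholdBytes bytes = ${thresholdBytes / (1024 * 1024)} MB")
  }
}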
@@ -69,15 +71,17 @@ class DeviceInfoJob extends CommonSparkJob with Serializable {
     val sc = spark.sparkContext
     val code_sql = Constant.old2new_sql
-    bMap = spark.sql(code_sql).rdd.cache()
-      .map(r => {
-        (r.getAs("tag_code").toString, r.getAs("new_second_id").toString)
-      }).collectAsMap()
+    bMap = sc.broadcast(spark.sql(code_sql).rdd.map(r => {
+      (r.getAs("tag_code").toString, r.getAs("new_second_id").toString)
+    }).collectAsMap())
+    println("bMap.size ===>>> " + bMap.value.size)
-    val map = spark.sql(Constant.second2first_sql).rdd.cache()
-      .map(r => {
-        (r.getAs("new_second_id").toString, r.getAs("new_first_id").toString)
-      }).collectAsMap()
+    val map = sc.broadcast(spark.sql(Constant.second2first_sql).rdd.map(r => {
+      (r.getAs("new_second_id").toString, r.getAs("new_first_id").toString)
+    }).collectAsMap())
+    println("map.size ===>>> " + map.value.size)
     var package_sql =
       """
@@ -89,10 +93,9 @@ class DeviceInfoJob extends CommonSparkJob with Serializable {
       s"""
          |SELECT id, package_name FROM dwh.package_mapping WHERE dt = '${package_dt}'
       """.stripMargin
-    packageMap = spark.sql(package_sql).rdd.cache()
-      .map(r => {
-        (r.getAs("package_name").toString.toLowerCase, Integer.parseInt(r.getAs("id").toString))
-      }).collectAsMap()
+    packageMap = spark.sparkContext.broadcast(spark.sql(package_sql).rdd.map(r => {
+      (r.getAs("package_name").toString.toLowerCase, Integer.parseInt(r.getAs("id").toString))
+    }).collectAsMap())
     /*
     packageMap = sc.broadcast(Constant.jdbcConnection(spark, "mob_adn", "dmp_app_map").rdd.map(r => {
@@ -143,7 +146,7 @@ class DeviceInfoJob extends CommonSparkJob with Serializable {
         new JSONObject()
       }
       freObject.keySet().foreach(key => {
-        interest.add(map(key))
+        interest.add(map.value(key))
         interest.add(key)
       })
       /*
@@ -161,7 +164,7 @@ class DeviceInfoJob extends CommonSparkJob with Serializable {
       */
       val interestArr = r.getAs("interest").asInstanceOf[mutable.WrappedArray[String]]
       interestArr.foreach(i => {
-        interest.add(map(i))
+        interest.add(map.value(i))
         interest.add(i)
       })
@@ -177,7 +180,7 @@ class DeviceInfoJob extends CommonSparkJob with Serializable {
           val count = j.get("count").getAsInt
           cntJson.put("count", count)
           tag_week_jsonObject.put(tag_id, cntJson)
-          interest.add(map(tag_id))
+          interest.add(map.value(tag_id))
           interest.add(tag_id)
         }
       }
@@ -193,7 +196,7 @@ class DeviceInfoJob extends CommonSparkJob with Serializable {
           val count = j.get("count").getAsInt
           cntJson.put("count", count)
           tag_month_jsonObject.put(tag_id, cntJson)
-          interest.add(map(tag_id))
+          interest.add(map.value(tag_id))
           interest.add(tag_id)
         }
       }
@@ -224,10 +227,10 @@ class DeviceInfoJob extends CommonSparkJob with Serializable {
       val ins = inters.toUpperCase.split(",")
       if (ins.length >= 3) {
         val key = ins(0) + "-" + ins(1) + "-" + ins(2)
-        val vals = if (bMap.keySet.contains(key)) {
-          bMap(key)
+        val vals = if (bMap.value.keySet.contains(key)) {
+          bMap.value(key)
         } else {
-          bMap.getOrElse(key + "OTHER", "")
+          bMap.value.getOrElse(key + "OTHER", "")
         }
         if (StringUtils.isNotBlank(vals)) {
           set.add(vals)
@@ -238,10 +241,10 @@ class DeviceInfoJob extends CommonSparkJob with Serializable {
   }
   def getId(tag_code: String): String = {
-    val id = if (bMap.keySet.contains(tag_code.toUpperCase)) {
-      bMap(tag_code.toUpperCase)
+    val id = if (bMap.value.keySet.contains(tag_code.toUpperCase)) {
+      bMap.value(tag_code.toUpperCase)
     } else {
-      bMap.getOrElse(tag_code.toUpperCase + "OTHER", "")
+      bMap.value.getOrElse(tag_code.toUpperCase + "OTHER", "")
     }
     id
   }
@@ -252,9 +255,9 @@ class DeviceInfoJob extends CommonSparkJob with Serializable {
     if (StringUtils.isNotBlank(install)) {
       install.split(",").foreach(pkgs => {
         val pkd = pkgs.split("\\|")
-        if (pkd.nonEmpty && StringUtils.isNotBlank(pkd(0)) && packageMap.contains(pkd(0).toLowerCase)
+        if (pkd.nonEmpty && StringUtils.isNotBlank(pkd(0)) && packageMap.value.contains(pkd(0).toLowerCase)
         ) {
-          set.add(packageMap(pkd(0).toLowerCase))
+          set.add(packageMap.value(pkd(0).toLowerCase))
         }
       })
     }