Commit 3cebcc3f by WangJinfeng

fix event_tag,mapreduce job

parent c9ec291d
@@ -1045,3 +1045,8 @@ export SPARK_CONF_DIR="/data/hadoop-config/command-home/engineplus-k8s-spark-3.1
 export JAVA_HOME="/usr/lib/jvm/jdk1.8.0_131"
 export HIVE_CONF_DIR="/data/hadoop-config/command-home/apache-hive-2.3.3-offline/conf"
+# Avoid no-op MapReduce job runs
+export HADOOP_USER_CLASSPATH_FIRST=yes
+export HADOOP_CLASSPATH="${HADOOP_CLASSPATH}:/data/hadoop-alternative/hive/lib/*"
@@ -51,24 +51,13 @@ echo "3ss_offer_event_spec.csv file success exist, and then can start"
 spark-submit --class mobvista.dmp.datasource.event_tag.Event_tag \
  --master yarn --deploy-mode cluster \
  --conf spark.akka.frameSize=100 \
- --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
- --conf spark.driver.memory=4G
+ --conf spark.driver.memory=4G \
+ --queue root.dataplatform \
  --num-executors 40 --executor-cores 4 \
  --executor-memory 8G --conf spark.shuffle.file.buffer=256k \
- --conf spark.reducer.maxSizeInFlight=100m \
- --conf spark.shuffle.io.maxRetries=60 \
- --conf spark.shuffle.io.retryWait=60s \
- --conf spark.network.timeout=1200 \
- --conf spark.shuffle.compress=true \
  --conf spark.io.compression.codec=lz4 \
  --conf spark.driver.maxResultSize=2024M \
- --conf spark.shuffle.spill.compress=true \
  --name event_tag_daily --conf spark.app.name=event_tag_daily \
  --conf spark.app.db_name=dwh --conf spark.app.table=dmp_event_tag_daily \
  --conf spark.app.loadTime=${deal_time} \
  --conf spark.sql.shuffle.partitions=500 \
- --conf spark.yarn.executor.memoryOverhead=3048 \
- --conf spark.locality.wait=0 \
- --conf spark.shuffle.io.numConnectionsPerPeer=10 \
- --conf spark.shuffle.consolidateFiles=true \
  ../DMP.jar
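
Note: the spark.app.db_name, spark.app.table and spark.app.loadTime values passed above are ordinary Spark conf entries that the driver reads back at runtime. A minimal sketch of that pattern, reading with getOption and falling back to the defaults used in this script (the object name is illustrative, not part of the repo):

import org.apache.spark.sql.SparkSession

object ReadAppConfSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().enableHiveSupport().getOrCreate()
    // getOption avoids a NoSuchElementException when a --conf entry is omitted
    val dbName   = spark.conf.getOption("spark.app.db_name").getOrElse("dwh")
    val table    = spark.conf.getOption("spark.app.table").getOrElse("dmp_event_tag_daily")
    val loadTime = spark.conf.getOption("spark.app.loadTime").getOrElse("") // yyyyMMdd
    println(s"writing $dbName.$table for $loadTime")
    spark.stop()
  }
}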
@@ -27,4 +27,13 @@ deal_time=$(date -d "$ScheduleTime 1 days ago" +"%Y%m%d")
 echo "$deal_time"
 sleep 10s
-spark-submit --class mobvista.dmp.datasource.event_tag.Dmp_event_source --master yarn --deploy-mode cluster --conf spark.akka.frameSize=100 --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.driver.memory=2G --queue root.dataplatform --num-executors 35 --executor-cores 2 --executor-memory 5G --conf spark.shuffle.file.buffer=256k --conf spark.reducer.maxSizeInFlight=100m --conf spark.shuffle.io.maxRetries=60 --conf spark.shuffle.io.retryWait=60s --conf spark.network.timeout=1200 --conf spark.shuffle.compress=true --conf spark.io.compression.codec=lz4 --conf spark.driver.maxResultSize=2024M --conf spark.shuffle.spill.compress=true --name event_tag_source_daily --conf spark.app.loadTime=${deal_time} --conf spark.app.coalesce_num=500 --conf spark.yarn.executor.memoryOverhead=3048 --conf spark.locality.wait=0 --conf spark.shuffle.io.numConnectionsPerPeer=10 --conf spark.shuffle.consolidateFiles=true --conf spark.dynamicAllocation.enabled=false ../DMP.jar
+spark-submit --class mobvista.dmp.datasource.event_tag.Dmp_event_source \
+ --master yarn --deploy-mode cluster --conf spark.akka.frameSize=100 \
+ --conf spark.driver.memory=2G \
+ --num-executors 35 --executor-cores 2 --executor-memory 5G \
+ --conf spark.shuffle.file.buffer=256k \
+ --conf spark.shuffle.compress=true --conf spark.io.compression.codec=lz4 \
+ --conf spark.driver.maxResultSize=2024M \
+ --name event_tag_source_daily --conf spark.app.loadTime=${deal_time} \
+ --conf spark.app.coalesce_num=500 --conf spark.yarn.executor.memoryOverhead=3048 \
+ ../DMP.jar
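
The Dmp_event_source job additionally takes spark.app.coalesce_num. That key is project-specific, not a built-in Spark setting; a sketch of the usual pattern such a knob drives, with placeholder data and output path rather than the project's real ones:

import org.apache.spark.sql.{SaveMode, SparkSession}

object CoalesceNumSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().getOrCreate()
    // Default mirrors --conf spark.app.coalesce_num=500 from the script above
    val coalesceNum = spark.conf.getOption("spark.app.coalesce_num").map(_.toInt).getOrElse(500)
    val df = spark.range(0, 1000000).toDF("id")   // placeholder data
    df.coalesce(coalesceNum)                      // cap the number of output files
      .write.mode(SaveMode.Overwrite)
      .orc("/tmp/coalesce_num_sketch")            // placeholder output path
    spark.stop()
  }
}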
@@ -5,6 +5,8 @@ user="andy.liu"
 map_memory="mapreduce.map.memory.mb=2048"
 reduce_memory="mapreduce.reduce.memory.mb=3072"
+export HIVE_CONF_DIR="/data/hadoop-config/command-home/apache-hive-2.3.3-offline/conf"
 # export HIVE_CONF_DIR=/data/azkaban-hadoop/command-home/hive-offline/conf
 function hive_func(){
......
 package mobvista.dmp.datasource.event_tag

-import java.text.SimpleDateFormat
-import java.util.Date
 import com.alibaba.fastjson.JSON
-import org.apache.spark.sql.{SaveMode, SparkSession, _}
+import org.apache.spark.sql._
 import org.apache.spark.sql.types._
+import java.text.SimpleDateFormat
+import java.util.Date

 /**
  * 刘凯 2017-12-18 15:20
  * Extracts events from the 3s event source logs
  */
 object Dmp_event_source {
   def main(args: Array[String]) {
     val spark = SparkSession.builder()
       .enableHiveSupport()
       .getOrCreate()
-    spark.conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
-    spark.conf.set("spark.kryoserializer.buffer.max", "300m")
     //yyyyMMdd
     val loadTime = spark.conf.get("spark.app.loadTime")
     var year = loadTime.substring(0, 4)
     var month = loadTime.substring(4, 6)
     val day = loadTime.substring(6, 8)
     val coalesce_num_str = spark.conf.get("spark.app.coalesce_num")
-    var coalesce_num=500
-    if(coalesce_num_str.nonEmpty){
-      coalesce_num=Integer.parseInt(coalesce_num_str)
-    }
-    spark.conf.set("spark.kryoserializer.buffer.max", "300m")
+    var coalesce_num = 500
+    if (coalesce_num_str.nonEmpty) {
+      coalesce_num = Integer.parseInt(coalesce_num_str)
+    }
     // ORC settings
     //OrcConfigUtil.configSettings(sparkContext)
-    spark.sparkContext.hadoopConfiguration.set("mapreduce.fileoutputcommitter.algorithm.version", "2")
-    spark.sparkContext.hadoopConfiguration.set("yarn.nodemanager.pmem-check-enabled", "false")
-    spark.sparkContext.hadoopConfiguration.set("yarn.nodemanager.vmem-check-enabled", "false")
+    //spark.sparkContext.hadoopConfiguration.set("mapreduce.fileoutputcommitter.algorithm.version", "2")
+    //spark.sparkContext.hadoopConfiguration.set("yarn.nodemanager.pmem-check-enabled", "false")
+    //spark.sparkContext.hadoopConfiguration.set("yarn.nodemanager.vmem-check-enabled", "false")
     val jobContext = spark.sqlContext
     val normal_path="s3://mob-ad/3s/trackinglogs/event/"+year+"/"+month+"/"+day+"/singapore/*/info/*"
......
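
The removed spark.conf.set("spark.serializer", ...) calls above could not take effect anyway: the serializer is fixed when the SparkContext is created, so changing it on an already-created session is a no-op. If Kryo were still wanted, it would have to be supplied before getOrCreate() (or via --conf on spark-submit); a minimal sketch, not the project's code:

import org.apache.spark.sql.SparkSession

object KryoBeforeSessionSketch {
  def main(args: Array[String]): Unit = {
    // spark.serializer and spark.kryoserializer.buffer.max are read at
    // SparkContext startup, so set them before getOrCreate(), not after.
    val spark = SparkSession.builder()
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.kryoserializer.buffer.max", "300m")
      .enableHiveSupport()
      .getOrCreate()
    println(spark.sparkContext.getConf.get("spark.serializer"))
    spark.stop()
  }
}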
@@ -2,8 +2,8 @@ package mobvista.dmp.datasource.event_tag
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.spark.MobvistaSparkHadoopUtil
+import org.apache.spark.sql._
 import org.apache.spark.sql.types._
-import org.apache.spark.sql.{Column, SaveMode, SparkSession, _}

 /**
  * 刘凯 2017-12-18 15:20
@@ -14,8 +14,6 @@ object Event_tag {
     val spark = SparkSession.builder()
       .enableHiveSupport()
       .getOrCreate()
-    spark.conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
-    spark.conf.set("spark.kryoserializer.buffer.max", "300m")
     //yyyyMMdd
     val loadTime = spark.conf.get("spark.app.loadTime")
     var year = loadTime.substring(0, 4)
@@ -27,17 +25,16 @@ object Event_tag {
     val outputPath = "s3://mob-emr-test/dataplatform/DataWareHouse/data/" + db_name + "/" + table_name
     val categories_package_3s = "s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/dim_categories_package_3s/tmp/categories.txt"
-    spark.conf.set("spark.kryoserializer.buffer.max", "300m")
     val event_type_brocast = spark.sparkContext.broadcast(new EventTypeUtils())
     //***parquet**
-    spark.sparkContext.hadoopConfiguration.set("mapreduce.fileoutputcommitter.algorithm.version", "2")
-    spark.sparkContext.hadoopConfiguration.set("yarn.nodemanager.pmem-check-enabled", "false")
-    spark.sparkContext.hadoopConfiguration.set("yarn.nodemanager.vmem-check-enabled", "false")
+    //spark.sparkContext.hadoopConfiguration.set("mapreduce.fileoutputcommitter.algorithm.version", "2")
+    //spark.sparkContext.hadoopConfiguration.set("yarn.nodemanager.pmem-check-enabled", "false")
+    //spark.sparkContext.hadoopConfiguration.set("yarn.nodemanager.vmem-check-enabled", "false")
     val jobContext = spark.sqlContext
     /**
      * categories_package_3s
      */
     val categories_LOGRDD = spark.sparkContext.textFile(categories_package_3s)
     val categoriesRdd = categories_LOGRDD.filter { x => !x.contains("catego") }.map(_.split("\t")).filter(_.length > 4).map(p => Row(p(0).toInt, p(1), p(2), p(3), p(4)))
     val categories_schema = StructType(Array(
......
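
The hunk ends right where categories_schema is declared. The code above follows the standard RDD[Row] plus StructType route to a DataFrame; a self-contained sketch of that pattern with hypothetical field names, since the real schema is truncated in this diff:

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object CategoriesSchemaSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().getOrCreate()
    // Field names are illustrative only; the actual columns are not shown above.
    val categoriesSchema = StructType(Array(
      StructField("id", IntegerType),
      StructField("package_name", StringType),
      StructField("category_1", StringType),
      StructField("category_2", StringType),
      StructField("category_3", StringType)))
    val rows = spark.sparkContext.parallelize(Seq(Row(1, "com.example.app", "Games", "Casual", "Puzzle")))
    // Same pattern as categoriesRdd + categories_schema in the code above
    val categoriesDf = spark.createDataFrame(rows, categoriesSchema)
    categoriesDf.show(false)
    spark.stop()
  }
}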