Commit 7a8e01b8 by WangJinfeng

fix rtdmp, adn_sdk_daily

parent 55b0991b
...@@ -26,8 +26,8 @@ spark-submit --class mobvista.dmp.datasource.adn_sdk.AdnSdkDaily \ ...@@ -26,8 +26,8 @@ spark-submit --class mobvista.dmp.datasource.adn_sdk.AdnSdkDaily \
--conf spark.app.loadTime=${LOG_TIME} \ --conf spark.app.loadTime=${LOG_TIME} \
--conf spark.app.input_path=${INPUT_PATH} \ --conf spark.app.input_path=${INPUT_PATH} \
--conf spark.app.output_path=${OUTPUT_PATH} \ --conf spark.app.output_path=${OUTPUT_PATH} \
--conf spark.sql.shuffle.partitions=5000 \ --conf spark.sql.shuffle.partitions=2000 \
--conf spark.default.parallelism=5000 \ --conf spark.default.parallelism=2000 \
--conf spark.shuffle.memoryFraction=0.4 \ --conf spark.shuffle.memoryFraction=0.4 \
--conf spark.storage.memoryFraction=0.4 \ --conf spark.storage.memoryFraction=0.4 \
--conf spark.driver.maxResultSize=8g \ --conf spark.driver.maxResultSize=8g \
...@@ -35,7 +35,7 @@ spark-submit --class mobvista.dmp.datasource.adn_sdk.AdnSdkDaily \ ...@@ -35,7 +35,7 @@ spark-submit --class mobvista.dmp.datasource.adn_sdk.AdnSdkDaily \
--conf spark.app.coalesce=60000 \ --conf spark.app.coalesce=60000 \
--files ${HIVE_SITE_PATH} \ --files ${HIVE_SITE_PATH} \
--jars ${JARS} \ --jars ${JARS} \
--master yarn --deploy-mode cluster --name adn_sdk_daily --executor-memory 18g --driver-memory 6g --executor-cores 5 --num-executors 80 \ --master yarn --deploy-mode cluster --name adn_sdk_daily --executor-memory 8g --driver-memory 6g --executor-cores 4 --num-executors 200 \
../${JAR} ../${JAR}
if [[ $? -ne 0 ]];then if [[ $? -ne 0 ]];then
......
...@@ -18,18 +18,20 @@ OLD_MERGE_INPUT="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/audience_ ...@@ -18,18 +18,20 @@ OLD_MERGE_INPUT="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/audience_
check_await ${OLD_MERGE_INPUT}/_SUCCESS check_await ${OLD_MERGE_INPUT}/_SUCCESS
sleep 120
OUTPUT="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/audience_merge/${date_path}" OUTPUT="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/audience_merge/${date_path}"
spark-submit --class mobvista.dmp.datasource.rtdmp.RTDmpMain \ spark-submit --class mobvista.dmp.datasource.rtdmp.RTDmpMain \
--name "RTDmpMain.${date_time}" \ --name "RTDmpMain.${date_time}" \
--conf spark.sql.shuffle.partitions=1000 \ --conf spark.sql.shuffle.partitions=2000 \
--conf spark.default.parallelism=1000 \ --conf spark.default.parallelism=2000 \
--conf spark.kryoserializer.buffer.max=512m \ --conf spark.kryoserializer.buffer.max=512m \
--conf spark.kryoserializer.buffer=64m \ --conf spark.kryoserializer.buffer=64m \
--master yarn --deploy-mode cluster \ --master yarn --deploy-mode cluster \
--executor-memory 18g --driver-memory 6g --executor-cores 5 --num-executors 40 \ --executor-memory 8g --driver-memory 6g --executor-cores 4 --num-executors 100 \
.././DMP.jar \ .././DMP.jar \
-datetime ${date_time} -old_datetime ${old_date_time} -input ${INPUT} -output ${OUTPUT} -coalesce 200 -datetime ${date_time} -old_datetime ${old_date_time} -input ${INPUT} -output ${OUTPUT} -coalesce 400
if [[ $? -ne 0 ]]; then if [[ $? -ne 0 ]]; then
exit 255 exit 255
......
package mobvista.dmp.datasource.adn_sdk package mobvista.dmp.datasource.adn_sdk
import java.net.URI
import java.text.SimpleDateFormat
import java.util.{Date, Properties}
import com.alibaba.fastjson.{JSON, JSONArray, JSONObject} import com.alibaba.fastjson.{JSON, JSONArray, JSONObject}
import mobvista.dmp.common.MobvistaConstant import mobvista.dmp.common.MobvistaConstant
import mobvista.dmp.datasource.apptag.Constant
import mobvista.dmp.datasource.datatory.ConstantV2
import org.apache.commons.lang3.StringUtils import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.broadcast.Broadcast import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types._ import org.apache.spark.sql.types._
import org.apache.spark.sql.{SparkSession, _} import org.apache.spark.sql._
import org.apache.spark.storage.StorageLevel import org.apache.spark.storage.StorageLevel
import java.net.URI
import java.text.SimpleDateFormat
import java.util.Date
import scala.collection.mutable import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.ArrayBuffer
import scala.util.control.Breaks._ import scala.util.control.Breaks._
...@@ -94,7 +92,7 @@ object AdnSdkDaily extends Serializable { ...@@ -94,7 +92,7 @@ object AdnSdkDaily extends Serializable {
} }
linesArr.toIterator linesArr.toIterator
} }
) ).repartition(2000)
filter_rdd.persist(StorageLevel.MEMORY_AND_DISK_SER) filter_rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)
/*{ /*{
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment