Commit 15972dc8 by wang-jinfeng

optimize dmp

parent fd0560d0
@@ -4,25 +4,47 @@ source ../dmp_env.sh

 today=${ScheduleTime}

-date_time=$(date +"%Y-%m-%d %H" -d "-1 hour $today")
+date_time=$(date +"%Y-%m-%d.%H" -d "-1 hour $today")
 date_path=$(date +%Y/%m/%d/%H -d "-1 hour $today")

+part_num=$(hadoop fs -ls s3://mob-emr-test/dataplatform/rtdmp_pre/${date_path}/ | wc -l)
+
+if [[ ${part_num} -le 50 ]]; then
+  echo "This Dir No Data !!!"
+  partition=10
+  coalesce=10
+  executor=2
+  memory=4
+  core=2
+  flag=0
+else
+  partition=2000
+  coalesce=200
+  executor=8
+  memory=10
+  core=4
+  flag=1
+fi
+
 INPUT="s3://mob-emr-test/dataplatform/rtdmp_pre/${date_path}"

-OUTPUT="s3://mob-emr-test/dataplatform/rtdmp_deal/${date_path}/0"
+OUTPUT="s3://mob-emr-test/dataplatform/rtdmp_deal/${date_path}"
+
+before_date_path=$(date +%Y/%m/%d/%H -d "-2 hour $today")
+
+BEFORE_OUTPUT="s3://mob-emr-test/dataplatform/rtdmp/${before_date_path}"
+
+check_await "${BEFORE_OUTPUT}/_SUCCESS"

-spark-submit --class mobvista.dmp.datasource.rtdmp.RTDmpMainDeal \
- --name "RTDmpMainDeal.${date_time}" \
- --conf spark.sql.shuffle.partitions=10000 \
- --conf spark.default.parallelism=500 \
- --conf spark.kryoserializer.buffer.max=256m \
- --conf spark.speculation=true \
- --conf spark.speculation.quantile=0.9 \
- --conf spark.speculation.multiplier=1.3 \
- --conf spark.executor.extraJavaOptions="-XX:+UseG1GC" \
- --master yarn --deploy-mode cluster --executor-memory 4g --driver-memory 4g --executor-cores 4 --num-executors 50 \
- ../${JAR} -time "${date_time}" -data_utime "${date_time}" -input ${INPUT} -output ${OUTPUT} -coalesce 200 -partition 10000
+spark-submit --class mobvista.dmp.datasource.rtdmp.RTDmpMain \
+ --name "RTDmpMain.${date_time}" \
+ --conf spark.sql.shuffle.partitions=${partition} \
+ --conf spark.default.parallelism=${partition} \
+ --conf spark.kryoserializer.buffer.max=512m \
+ --conf spark.kryoserializer.buffer=64m \
+ --master yarn --deploy-mode cluster \
+ --executor-memory ${memory}g --driver-memory 6g --executor-cores ${core} --num-executors ${executor} \
+ .././DMP.jar \
+ -flag ${flag} -time ${date_time} -input ${INPUT} -output ${OUTPUT} -coalesce ${coalesce}

 if [[ $? -ne 0 ]]; then
   exit 255
...
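Reviewer note: check_await comes from dmp_env.sh (it is not part of this change) and, judging by its usage here, blocks until the upstream hour's _SUCCESS marker exists before the Spark job is submitted. A minimal Scala sketch of an equivalent wait with the Hadoop FileSystem API; the path, poll interval and retry limit below are illustrative only:

    import java.net.URI
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    object CheckAwaitSketch {
      // Poll until the marker file exists or the retry budget is exhausted.
      def awaitSuccess(uri: String, pollMs: Long = 60000L, maxTries: Int = 120): Boolean = {
        val fs = FileSystem.get(new URI(uri), new Configuration())
        val marker = new Path(uri)
        var tries = 0
        while (!fs.exists(marker) && tries < maxTries) {
          Thread.sleep(pollMs)
          tries += 1
        }
        fs.exists(marker)
      }

      def main(args: Array[String]): Unit = {
        // Illustrative marker path; the script waits on ${BEFORE_OUTPUT}/_SUCCESS.
        val ok = awaitSuccess("s3://mob-emr-test/dataplatform/rtdmp/2021/01/01/00/_SUCCESS")
        if (!ok) sys.exit(255) // mirror the script's non-zero exit on failure
      }
    }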
@@ -31,7 +31,6 @@ import scala.collection.{immutable, mutable}
 object Logic {

   def getResultFeature(session: CqlSession, iterator: Iterator[Row]): Iterator[AudienceInfo] = {
     val sql =
       """
         |select audience_data from rtdmp.audience_info where devid = '@devid'
@@ -39,7 +38,6 @@ object Logic {
     val res = new ArrayBuffer[AudienceInfo]()
     iterator.foreach(row => {
-      // val session = connector.openSession()
       val devId = row.getAs[String](0)
       val audience_data = row.getAs[String](1)
       val query_sql = sql.replace("@devid", devId)
@@ -49,7 +47,6 @@ object Logic {
       } else {
         new JSONObject().toJSONString
       }
-      // session.close()
       res.add(AudienceInfo(devId, audience_data, old_audience_data))
     })
     res.iterator()
...
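Reviewer note: the two deleted comment lines are the last trace of the old per-device openSession()/close() pattern; getResultFeature now receives a single CqlSession for the whole partition, opened by the caller through CassandraConnector.withSessionDo (see RTDmpMain below). A self-contained sketch of that lookup pattern, using a hypothetical case class and helper name rather than the project's own types:

    import com.datastax.oss.driver.api.core.CqlSession
    import scala.collection.mutable.ArrayBuffer

    case class DeviceAudience(devid: String, oldAudienceData: String)

    // One session serves every row of the partition instead of one session per device id.
    def lookupPartition(session: CqlSession, devids: Iterator[String]): Iterator[DeviceAudience] = {
      val out = new ArrayBuffer[DeviceAudience]()
      devids.foreach { devid =>
        // Plain string substitution mirrors the '@devid' placeholder above;
        // a prepared statement would be preferable in production code.
        val rs = session.execute(s"select audience_data from rtdmp.audience_info where devid = '$devid'")
        val row = rs.one()
        val old = if (row != null) row.getString("audience_data") else "{}"
        out += DeviceAudience(devid, old)
      }
      out.iterator
    }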
 package mobvista.dmp.datasource.rtdmp

-import java.net.URI
-import java.util
-import com.alibaba.fastjson.{JSONArray, JSONObject}
+import com.datastax.oss.driver.api.core.ConsistencyLevel
 import com.datastax.spark.connector._
 import com.datastax.spark.connector.cql.CassandraConnector
-import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
-import mobvista.dmp.util.{DateUtil, MD5Util, PropertyUtil}
+import com.datastax.spark.connector.rdd.ReadConf
+import mobvista.dmp.common.CommonSparkJob
+import mobvista.dmp.datasource.rtdmp.Constant.NewAudienceInfo
+import mobvista.dmp.util.{DateUtil, PropertyUtil}
 import org.apache.commons.cli.{BasicParser, Options}
-import org.apache.commons.lang.StringUtils
 import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.spark.sql.{Row, SaveMode, SparkSession}
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.{SaveMode, SparkSession}
 import org.apache.spark.storage.StorageLevel
+import java.net.URI
 import scala.collection.JavaConversions._
-import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer
+import scala.collection.JavaConverters._

 /**
   * @package: mobvista.dmp.datasource.rtdmp
@@ -31,12 +30,10 @@ class RTDmpMain extends CommonSparkJob with Serializable {
   def commandOptions(): Options = {
     val options = new Options()
     options.addOption("time", true, "time")
     options.addOption("input", true, "input")
     options.addOption("output", true, "output")
     options.addOption("coalesce", true, "coalesce")
-    options.addOption("partition", true, "partition")
-    // options.addOption("update_time_start", true, "update_time_start")
-    // options.addOption("update_time_end", true, "update_time_end")
-    options.addOption("data_utime", true, "data_utime")
+    options.addOption("flag", true, "flag")
     options
   }
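Reviewer note: with -partition and -data_utime removed, the CLI surface is now -time, -input, -output, -coalesce and -flag, matching the new submit line in the shell script. A hypothetical end-to-end parse with Commons CLI, for reference (argument values are made up):

    import org.apache.commons.cli.{BasicParser, Options}

    object OptionsSketch {
      def main(unused: Array[String]): Unit = {
        val options = new Options()
        Seq("time", "input", "output", "coalesce", "flag").foreach(o => options.addOption(o, true, o))

        // Example arguments; the dot in -time avoids an unquoted space when the script passes ${date_time}.
        val args = Array(
          "-flag", "1",
          "-time", "2021-01-01.00",
          "-input", "s3://mob-emr-test/dataplatform/rtdmp_pre/2021/01/01/00",
          "-output", "s3://mob-emr-test/dataplatform/rtdmp_deal/2021/01/01/00",
          "-coalesce", "200")

        val cmd = new BasicParser().parse(options, args)
        val time = cmd.getOptionValue("time").replace(".", " ") // back to "yyyy-MM-dd HH"
        val flag = Integer.parseInt(cmd.getOptionValue("flag"))
        println(s"time=$time, flag=$flag, coalesce=${cmd.getOptionValue("coalesce")}")
      }
    }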
@@ -45,203 +42,95 @@ class RTDmpMain extends CommonSparkJob with Serializable {
     val parser = new BasicParser()
     val options = commandOptions()
     val commandLine = parser.parse(options, args)

-    val time = commandLine.getOptionValue("time")
+    val time = commandLine.getOptionValue("time").replace(".", " ")
     val input = commandLine.getOptionValue("input")
     val output = commandLine.getOptionValue("output")
     val coalesce = commandLine.getOptionValue("coalesce")
-    val partition = commandLine.getOptionValue("partition")
-    // var update_time_start = commandLine.getOptionValue("update_time_start")
-    // var update_time_end = commandLine.getOptionValue("update_time_end")
-    val data_utime = commandLine.getOptionValue("data_utime")
+    val flag = Integer.parseInt(commandLine.getOptionValue("flag"))
+
+    val system = "rtdmp"
+    val region = "vg"

     val spark: SparkSession = SparkSession.builder()
-      .appName(s"RTDmpMain.$time")
+      .appName(s"RTDmpMainSpe.$time")
       .config("spark.rdd.compress", "true")
       .config("spark.io.compression.codec", "snappy")
       .config("spark.sql.orc.filterPushdown", "true")
       .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
-      .config("spark.cassandra.connection.host", PropertyUtil.getProperty("ip.properties", "aws_host"))
+      .config("spark.cassandra.connection.factory", s"mobvista.dmp.utils.cassandra.$system.${region.toUpperCase}Factory")
+      .config("spark.cassandra.connection.host", PropertyUtil.getProperty("ip.properties", s"$system.$region.host"))
       .config("spark.cassandra.connection.port", "9042")
-      .config("spark.cassandra.connection.factory", s"mobvista.dmp.utils.cassandra.AWSFactory")
-      .config("spark.cassandra.connection.connections_per_executor_max", "512")
-      .config("spark.cassandra.output.concurrent.writes", "32")
-      .config("spark.cassandra.concurrent.reads", "1024")
+      .config("spark.cassandra.connection.remoteConnectionsPerExecutor", "64")
+      .config("spark.cassandra.connection.localConnectionsPerExecutor", "32")
+      .config("spark.cassandra.query.retry.count", "10")
+      .config("spark.cassandra.connection.compression", "LZ4")
+      .config("spark.cassandra.input.consistency.level", "LOCAL_ONE")
+      .config("spark.cassandra.output.consistency.level", "LOCAL_ONE")
+      .config("spark.cassandra.input.fetch.sizeInRows", "2048")
+      .config("spark.cassandra.concurrent.reads", "2048")
+      .config("spark.cassandra.output.concurrent.writes", "16")
       .config("spark.cassandra.output.batch.grouping.buffer.size", "1024")
-      .config("spark.cassandra.connection.keep_alive_ms", "600000")
+      .config("spark.cassandra.output.batch.size.bytes", "1024")
+      .config("spark.cassandra.connection.keepAliveMS", "60000")
+      .config("spark.cassandra.auth.username", "U&6zBV$*wBuYUpJRq$hp")
+      .config("spark.cassandra.auth.password", "Z8tzjTMBe^M2#hat$nAJ")
       .getOrCreate()
-    // .config("spark.cassandra.input.consistency.level", "LOCAL_QUORUM")
-    // .config("spark.cassandra.output.consistency.level", "LOCAL_QUORUM")
-    // .config("spark.cassandra.connection.connections_per_executor_max", "8")

     val sc = spark.sparkContext
     try {
-      var mergeRDD = sc.emptyRDD[(String, (Int, String))]
+      import spark.implicits._
+      if (flag == 1) {
       val expire_time = DateUtil.getDayByString(time, "yyyy-MM-dd HH", -7)
-      // by default, process the previous hour's data
-      val update_time_start = DateUtil.format(time + ":00:00", "yyyy-MM-dd HH:mm:ss")
-      val update_time_end = DateUtil.format(time + ":59:59", "yyyy-MM-dd HH:mm:ss")
-      val audience_date_utime_start = DateUtil.parse(data_utime, "yyyy-MM-dd HH").getTime / 1000 - 28800
-      val audience_date_utime_end = DateUtil.parse(data_utime, "yyyy-MM-dd HH").getTime / 1000 - 25200
-      val map: util.Map[Integer, (JSONArray, Integer, Integer, JSONObject)] =
-        ServerUtil.request(update_time_start, update_time_end, audience_date_utime_start, audience_date_utime_end, 0, 0, 2)
-      println(s"map -->> $map")
-      map.foreach(t => {
-        val audienceId = Integer2int(t._1)
-        val audienceOp = t._2._2
-        val dmap = new mutable.HashMap[String, String]()
-        t._2._1.foreach(json => {
-          val jsonObject = json.asInstanceOf[JSONObject]
-          if (jsonObject.containsKey("s3_path") && StringUtils.isNotBlank(jsonObject.getString("s3_path"))) {
-            // (s3_path, update_date)
-            dmap.put(jsonObject.getString("s3_path"), jsonObject.getString("update_time"))
-          }
-        })
-        // check whether the update_date of any s3_path equals the current update_date, to filter for valid entries
-        // if (dmap.values.contains(time)) {
-        /**
-         * audienceOp == 0 and dmap.size >= 2 means a set-difference computation: devices that appeared in the
-         * previous partition but not in this one have their package set to -1 * audienceId so the downstream
-         * job can delete them; otherwise merge with the previous audience package.
-         */
-        val updateRDD = if (audienceOp == 1 && dmap.size >= 2) {
-          val list = dmap.toList.sortWith(_._2 > _._2).take(1) // sort by update_date descending and take the leading audience packages
-          val newAudience = sc.textFile(list.get(0)._1).map(r => { // First is the newest audience package
-            val device_id =
-              if (r.matches(MobvistaConstant.md5Ptn)) {
-                r
-              } else {
-                MD5Util.getMD5Str(r)
-              }
-            (device_id, (audienceId, list.get(0)._2))
-          })
-          /*
-          val oldAudience = sc.textFile(list.get(1)._1).map(r => { // Second is the old audience package, i.e. the previous version
-            val device_id =
-              if (r.matches(MobvistaConstant.md5Ptn)) {
-                r
-              } else {
-                MD5Util.getMD5Str(r)
-              }
-            (device_id, (audienceId, list.get(1)._2))
-          })
-          oldAudience.subtractByKey(newAudience).map(t => {
-            // multiply audienceId by -1 for the difference set, so the downstream job deletes that audienceId
-            val device_id =
-              if (t._1.matches(MobvistaConstant.md5Ptn)) {
-                t._1
-              } else {
-                MD5Util.getMD5Str(t._1)
-              }
-            (device_id, ((-1) * audienceId, t._2._2))
-            // (devId, ((-1) * audienceId, update_date))
-          }).union(newAudience) // merge with the newest audience package
-          */
-          newAudience
-        } else {
-          val audData = dmap.toList.sortWith(_._2 > _._2)
-          if (audData.nonEmpty) {
-            sc.textFile(audData.get(0)._1).map(r => { // take the newest audience package
-              val device_id =
-                if (r.matches(MobvistaConstant.md5Ptn)) {
-                  r
-                } else {
-                  MD5Util.getMD5Str(r)
-                }
-              (device_id, (audienceId, audData.get(0)._2))
-            })
-          } else { // if there is none, create an empty RDD
-            sc.emptyRDD[(String, (Int, String))]
-          }
-        }
-        // merge all audience packages
-        mergeRDD = mergeRDD.union(updateRDD)
-        // }
-      })
       val keyspace = "rtdmp"
       val tableName = "audience_info"
       val columns = SomeColumns("devid", "audience_data", "update_time")
-      val cassandraConnector = CassandraConnector(sc.getConf)
-      val set = ServerUtil.request(update_time_start, update_time_end, audience_date_utime_start, audience_date_utime_end, 0, 0, 4)
-        .retain((_, v) => v._2 == 1)
-        .keySet
-      println("audienceIds -->> " + set.mkString(","))
-      val df = mergeRDD.groupByKey().map(r => {
-        val devId = r._1
-        val jsonObject = new JSONObject()
-        // build the audienceId -> update_date JSONObject
-        r._2.foreach(t => {
-          jsonObject.put(t._1.toString, t._2)
-        })
-        Row(devId, jsonObject.toJSONString, time)
-      }).repartition(partition.toInt)
-        .mapPartitions(it => cassandraConnector.withSessionDo(session => {
-          Logic.getResultFeature(session, it)
-        }))
-        .mapPartitions(new CustomIteratorAudienceInfo(_, time, expire_time, set))
-      // .mapPartitions(Logic.parseAudienceInfo(_, expire_time))
-      df.persist(StorageLevel.MEMORY_AND_DISK_SER)
-      df.saveToCassandra(keyspace, tableName, columns)
-      val audienceSum = df.map(r => {
-        val array = new ArrayBuffer[(Int, Int)]()
-        MobvistaConstant.String2JSONObject(r.audience_data).keySet().foreach(k => {
-          if (map.keySet().contains(Integer.parseInt(k))) {
-            array.add((Integer.parseInt(k), 1))
-          }
-        })
-        array.iterator
-      }).flatMap(l => l)
-        .countByKey()
-      val audience_output = output + "/audience"
-      FileSystem.get(new URI(s"s3://mob-emr-test"), sc.hadoopConfiguration).delete(new Path(audience_output), true)
-      sc.parallelize(audienceSum.toList).coalesce(1).saveAsTextFile(audience_output)
-      val data_output = output + "/data"
-      FileSystem.get(new URI(s"s3://mob-emr-test"), sc.hadoopConfiguration).delete(new Path(data_output), true)
-      import spark.implicits._
-      df.mapPartitions(Logic.writeResult(cassandraConnector, _))
-        .repartition(coalesce.toInt)
+      val cassandraConnector = CassandraConnector(sc.getConf)
+
+      object ReadConfigurationOne {
+        implicit val readConf = ReadConf(Option(10000), 5, 2048, ConsistencyLevel.LOCAL_ONE, true)
+      }
+
+      val selectDF = spark.read.orc(input)
+      // only update the previous hour's data
+      val update_time_start = DateUtil.format(time + ":00:00", "yyyy-MM-dd HH:mm:ss")
+      val update_time_end = DateUtil.format(time + ":59:59", "yyyy-MM-dd HH:mm:ss")
+      val audience_date_utime_start = DateUtil.parse(time + ":00:00", "yyyy-MM-dd HH:mm:ss").getTime / 1000 - 28800
+      val audience_date_utime_end = DateUtil.parse(time + ":59:59", "yyyy-MM-dd HH:mm:ss").getTime / 1000 - 28800
+      val update_ids = ServerUtil.request(update_time_start, update_time_end, audience_date_utime_start, audience_date_utime_end, 0, 0, 2)
+        .asScala.keySet
+
+      val df = selectDF.mapPartitions(it => cassandraConnector.withSessionDo(session => {
+        Logic.getResultFeature(session, it)
+      })).toDF
+        .select(col("devid"), col("audience_data").alias("audience_ids"), col("audience_data"))
+        .rdd
+        .mapPartitions(v => new CustomMapPartition(v, update_time = time, expire_time, update_ids))
+
+      df.persist(StorageLevel.MEMORY_AND_DISK_SER)
+      FileSystem.get(new URI(s"s3://mob-emr-test"), sc.hadoopConfiguration).delete(new Path(output), true)
+
+      df.repartition(coalesce.toInt)
         .toDF
         .write
         .mode(SaveMode.Overwrite)
         .option("orc.compress", "zlib")
-        .orc(data_output)
-      // .mapPartitions(Logic.writeResult(cassandraConnector, _))
-      /*
-        .mapPartitions(Logic.parseResult(data_output, _))
-        .repartition(coalesce.toInt)
-        .saveAsNewAPIHadoopFile(data_output, classOf[Text], classOf[Text], classOf[RDDMultipleOutputFormat[_, _]])
-      val jsonArray = new JSONArray()
-      audienceSum.foreach(m => {
-        val jsonObject = new JSONObject()
-        jsonObject.put("id", m._1)
-        jsonObject.put("audience_data_status", 2)
-        jsonObject.put("audience_count", m._2)
-        jsonArray.add(jsonObject)
-      })
-      val jsonObject = ServerUtil.update(jsonArray)
-      if (jsonObject.getInteger("code") == 200) {
-        println("Audience Update OK!")
+        .orc(output)
+
+      df.saveToCassandra(keyspace, tableName, columns)
+      } else {
+        FileSystem.get(new URI(s"s3://mob-emr-test"), sc.hadoopConfiguration).delete(new Path(output), true)
+
+        Seq.empty[NewAudienceInfo].toDF
+          .write
+          .mode(SaveMode.Overwrite)
+          .option("orc.compress", "zlib")
+          .orc(output)
       }
-      */
     } finally {
       if (sc != null) {
...
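Reviewer note: in the flag == 0 branch the job still overwrites the output path, but with a zero-row ORC dataset, so downstream consumers of the directory keep working on empty hours. The same pattern in isolation, with an illustrative case class standing in for NewAudienceInfo and a made-up output path:

    import org.apache.spark.sql.{SaveMode, SparkSession}

    object EmptyOutputSketch {
      // Illustrative stand-in for Constant.NewAudienceInfo.
      case class AudienceRecord(devid: String, audience_ids: String, audience_data: String)

      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("EmptyOutputSketch").getOrCreate()
        import spark.implicits._

        val output = "s3://mob-emr-test/dataplatform/rtdmp_deal/2021/01/01/00" // example path
        Seq.empty[AudienceRecord].toDF()
          .write
          .mode(SaveMode.Overwrite)
          .option("orc.compress", "zlib")
          .orc(output)

        spark.stop()
      }
    }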