Commit 953771b9 by WangJinfeng

fix rtdmp

parent 07920dc9
package mobvista.dmp.datasource.rtdmp
import com.datastax.oss.driver.api.core.ConsistencyLevel
import com.datastax.spark.connector._
import com.datastax.spark.connector.rdd.ReadConf
import mobvista.dmp.util.PropertyUtil
import com.alibaba.fastjson.JSONObject
import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
import mobvista.dmp.datasource.rtdmp.Constant.AudienceMerge
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.codehaus.jackson.map.ObjectMapper
import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer
import java.text.SimpleDateFormat
import java.util.Calendar
import scala.collection.JavaConverters._
/**
* @package: mobvista.dmp.datasource.rtdmp
@@ -23,6 +23,8 @@ class RTDmpMain extends CommonSparkJob with Serializable {
def commandOptions(): Options = {
val options = new Options()
options.addOption("datetime", true, "datetime")
options.addOption("input", true, "input")
options.addOption("output", true, "output")
options.addOption("coalesce", true, "coalesce")
options
@@ -33,60 +35,78 @@ class RTDmpMain extends CommonSparkJob with Serializable {
val parser = new BasicParser()
val options = commandOptions()
val commandLine = parser.parse(options, args)
val datetime = commandLine.getOptionValue("datetime")
val input = commandLine.getOptionValue("input")
val output = commandLine.getOptionValue("output")
val coalesce = commandLine.getOptionValue("coalesce")
val system = "rtdmp"
val region = "vg"
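// "rtdmp" / "vg" select the Cassandra connection factory class and the
// "rtdmp.vg.host" entry looked up in ip.properties below.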
val spark: SparkSession = SparkSession.builder()
.appName(s"RTDmpMainSpe")
.appName(s"RTDmpMain")
.config("spark.rdd.compress", "true")
.config("spark.io.compression.codec", "snappy")
.config("spark.sql.orc.filterPushdown", "true")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.config("spark.cassandra.connection.factory", s"mobvista.dmp.utils.cassandra.$system.${region.toUpperCase}Factory")
.config("spark.cassandra.connection.host", PropertyUtil.getProperty("ip.properties", s"$system.$region.host"))
.config("spark.cassandra.connection.port", "9042")
.config("spark.cassandra.connection.remoteConnectionsPerExecutor", "64")
.config("spark.cassandra.connection.localConnectionsPerExecutor", "32")
.config("spark.cassandra.query.retry.count", "10")
.config("spark.cassandra.connection.compression", "LZ4")
.config("spark.cassandra.input.consistency.level", "LOCAL_ONE")
.config("spark.cassandra.output.consistency.level", "LOCAL_ONE")
.config("spark.cassandra.input.fetch.sizeInRows", "2048")
.config("spark.cassandra.concurrent.reads", "2048")
.config("spark.cassandra.output.concurrent.writes", "16")
.config("spark.cassandra.output.batch.grouping.buffer.size", "1024")
.config("spark.cassandra.output.batch.size.bytes", "1024")
.config("spark.cassandra.connection.keepAliveMS", "60000")
.config("spark.cassandra.auth.username", "U&6zBV$*wBuYUpJRq$hp")
.config("spark.cassandra.auth.password", "Z8tzjTMBe^M2#hat$nAJ")
.getOrCreate()
val sc = spark.sparkContext
try {
import spark.implicits._
val keyspace = "rtdmp"
val tableName = "audience_info"
val sdf = new SimpleDateFormat("yyyyMMddHH")
val calendar = Calendar.getInstance()
val date = sdf.parse(datetime)
calendar.setTime(date)
calendar.set(Calendar.HOUR_OF_DAY, calendar.get(Calendar.HOUR_OF_DAY) - 24)
val expire_time = sdf.format(calendar.getTime)
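// expire_time is the same yyyyMMddHH value rolled back 24 hours, e.g. (assumed
// sample value) datetime = "2021010112" gives expire_time = "2020123112"; tags
// whose update time is not newer than this watermark are dropped in the merge.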
object ReadConfigurationOne {
implicit val readConf = ReadConf(Option(10000), 5, 2048, ConsistencyLevel.LOCAL_ONE, true)
}
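// Positional ReadConf arguments: splitCount = Some(10000), splitSizeInMB = 5,
// fetchSizeInRows = 2048, LOCAL_ONE reads, task metrics enabled (parameter
// order as in the spark-cassandra-connector ReadConf case class).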
val hour_rdd = spark.read.orc(input).rdd.map(row => {
val devid = row.getAs[String]("devid")
val audience_ids = row.getAs[String]("audience_ids").split(",")
val audience_data = new JSONObject()
audience_ids.foreach(audience_id => {
audience_data.put(audience_id, datetime)
})
val device_type = row.getAs[String]("device_type")
(devid, (audience_data.toJSONString, device_type))
})
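// Shape: (devid, (JSON of {audience_id -> datetime}, device_type)) -- the
// tags seen for each device in this hour's input.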
import ReadConfigurationOne._
val sql =
s"""
|SELECT * FROM dwh.audience_merge WHERE dt = '$datetime'
|""".stripMargin
val merge_rdd = spark.sql(sql).rdd
.map(row => {
val devid = row.getAs[String]("devid")
val audience_map = row.getAs[String]("audience_map")
val update_time = row.getAs[String]("update_time")
val device_type = row.getAs[String]("device_type")
(devid, (audience_map, update_time, device_type))
})
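// Shape: (devid, (audience_map JSON, update_time, device_type)) -- the
// previously merged state for each device.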
val df = hour_rdd.fullOuterJoin(merge_rdd).map(t => {
val devid = t._1
val opt1 = t._2._1
val opt2 = t._2._2
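// opt1 = this hour's tags, opt2 = previously merged state. Three cases:
// both present (merge, keeping old tags that are neither overridden by a
// new tag nor older than expire_time), new only (take as-is), old only
// (just drop expired tags).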
if (opt1.nonEmpty && opt2.nonEmpty) {
val new_audience = MobvistaConstant.String2JSONObject(opt1.get._1)
val old_audience = opt2.get._1
val retain_old_audience = MobvistaConstant.String2JSONObject(old_audience).asInstanceOf[java.util.Map[String, String]].asScala
.retain((k, v) => !new_audience.keySet().contains(k) && v.compareTo(expire_time) > 0)
new_audience.putAll(retain_old_audience.asJava)
AudienceMerge(devid, new_audience.toJSONString, datetime, opt1.get._2)
} else if (opt1.nonEmpty && opt2.isEmpty) {
AudienceMerge(devid, opt1.get._1, datetime, opt1.get._2)
} else {
val old_audience = opt2.get._1
val retain_old_audience = MobvistaConstant.String2JSONObject(old_audience).asInstanceOf[java.util.Map[String, String]].asScala
.retain((_, v) => v.compareTo(expire_time) > 0)
AudienceMerge(devid, new ObjectMapper().writeValueAsString(retain_old_audience.asJava), opt2.get._2, opt2.get._3)
}
})
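// df is an RDD[AudienceMerge]; it is written out below as ORC with zlib compression.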
df.toDF
.repartition(coalesce.toInt)
.write
.mode(SaveMode.Overwrite)
.option("orc.compress", "zlib")
......