Commit 07920dc9 by WangJinfeng

fix rtdmp

parent d2703fa6
@@ -2,20 +2,14 @@ package mobvista.dmp.datasource.rtdmp
import com.datastax.oss.driver.api.core.ConsistencyLevel
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
import com.datastax.spark.connector.rdd.ReadConf
import mobvista.dmp.common.CommonSparkJob
import mobvista.dmp.datasource.rtdmp.Constant.NewAudienceInfo
-import mobvista.dmp.util.{DateUtil, PropertyUtil}
+import mobvista.dmp.util.PropertyUtil
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.storage.StorageLevel
import java.net.URI
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
/**
* @package: mobvista.dmp.datasource.rtdmp
@@ -29,11 +23,8 @@ class RTDmpMain extends CommonSparkJob with Serializable {
def commandOptions(): Options = {
val options = new Options()
options.addOption("time", true, "time")
options.addOption("input", true, "input")
options.addOption("output", true, "output")
options.addOption("coalesce", true, "coalesce")
options.addOption("flag", true, "flag")
options
}
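
For context, a minimal sketch of how a Commons CLI `Options` set like this one is parsed (option names come from the hunk above; the sample values and the demo object are hypothetical):

```scala
import org.apache.commons.cli.{BasicParser, Options}

object OptionsDemo {
  def main(args: Array[String]): Unit = {
    // Same registration style as the job: name, hasArg, description.
    val options = new Options()
    options.addOption("time", true, "time")
    options.addOption("output", true, "output")
    options.addOption("coalesce", true, "coalesce")

    // BasicParser accepts single-dash args:
    //   -time 2024-01-01.10 -output s3://bucket/path -coalesce 100
    val cmd = new BasicParser().parse(options, args)
    val time = cmd.getOptionValue("time").replace(".", " ") // "2024-01-01 10", as in run()
    println(s"time=$time output=${cmd.getOptionValue("output")} coalesce=${cmd.getOptionValue("coalesce")}")
  }
}
```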
@@ -42,16 +33,13 @@ class RTDmpMain extends CommonSparkJob with Serializable {
val parser = new BasicParser()
val options = commandOptions()
val commandLine = parser.parse(options, args)
val time = commandLine.getOptionValue("time").replace(".", " ")
val input = commandLine.getOptionValue("input")
val output = commandLine.getOptionValue("output")
val coalesce = commandLine.getOptionValue("coalesce")
val flag = Integer.parseInt(commandLine.getOptionValue("flag"))
val system = "rtdmp"
val region = "vg"
val spark: SparkSession = SparkSession.builder()
.appName(s"RTDmpMainSpe.$time")
.appName(s"RTDmpMainSpe")
.config("spark.rdd.compress", "true")
.config("spark.io.compression.codec", "snappy")
.config("spark.sql.orc.filterPushdown", "true")
@@ -79,59 +67,31 @@ class RTDmpMain extends CommonSparkJob with Serializable {
try {
import spark.implicits._
if (flag == 1) {
val expire_time = DateUtil.getDayByString(time, "yyyy-MM-dd HH", -7)
val keyspace = "rtdmp"
val tableName = "audience_info"
val columns = SomeColumns("devid", "audience_data", "update_time")
val cassandraConnector = CassandraConnector(sc.getConf)
object ReadConfigurationOne {
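// Positional ReadConf tuning for the scan: an explicit split count (10000),
// a small split size, a larger fetch size, and LOCAL_ONE reads; consult the
// connector's ReadConf case class for the exact parameter order.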
implicit val readConf = ReadConf(Option(10000), 5, 2048, ConsistencyLevel.LOCAL_ONE, true)
}
val selectDF = spark.read.orc(input)
// Only update data from the previous hour
val update_time_start = DateUtil.format(time + ":00:00", "yyyy-MM-dd HH:mm:ss")
val update_time_end = DateUtil.format(time + ":59:59", "yyyy-MM-dd HH:mm:ss")
val audience_date_utime_start = DateUtil.parse(time + ":00:00", "yyyy-MM-dd HH:mm:ss").getTime / 1000 - 28800
val audience_date_utime_end = DateUtil.parse(time + ":59:59", "yyyy-MM-dd HH:mm:ss").getTime / 1000 - 28800
val update_ids = ServerUtil.request(update_time_start, update_time_end, audience_date_utime_start, audience_date_utime_end, 0, 0, 2)
.asScala.keySet
import ReadConfigurationOne._
val df = selectDF.mapPartitions(it => cassandraConnector.withSessionDo(session => {
Logic.getResultFeature(session, it)
})).toDF
.select(col("devid"), col("audience_data").alias("audience_ids"), col("audience_data"))
.rdd
.mapPartitions(v => new CustomMapPartition(v, update_time = time, expire_time, update_ids))
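// Snapshot the entire rtdmp.audience_info table via the connector so it can be archived to ORC below.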
val cassandraDF = sc.cassandraTable(keyspace, tableName)
.mapPartitions(irs => {
val res = new ArrayBuffer[(String, String, String)]()
irs.foreach(r => {
res.add(r.getString("devid"), r.getString("audience_data"), r.getString("update_time"))
})
res.iterator
}).toDF("devid", "audience_data", "update_time")
df.persist(StorageLevel.MEMORY_AND_DISK_SER)
FileSystem.get(new URI(s"s3://mob-emr-test"), sc.hadoopConfiguration).delete(new Path(output), true)
-df.repartition(coalesce.toInt)
+cassandraDF.repartition(coalesce.toInt)
.toDF
.write
.mode(SaveMode.Overwrite)
.option("orc.compress", "zlib")
.orc(output)
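// Upsert the merged rows back into Cassandra using the (devid, audience_data, update_time) column mapping.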
df.saveToCassandra(keyspace, tableName, columns)
} else {
FileSystem.get(new URI(s"s3://mob-emr-test"), sc.hadoopConfiguration).delete(new Path(output), true)
Seq.empty[NewAudienceInfo].toDF
.write
.mode(SaveMode.Overwrite)
.option("orc.compress", "zlib")
.orc(output)
}
} finally {
if (sc != null) {
sc.stop()
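
Taken together, the hunk above is the spark-cassandra-connector's usual RDD round trip: `sc.cassandraTable` for the scan, `saveToCassandra` for the upsert. A minimal, self-contained sketch of that pattern (keyspace, table, and column names are from the diff; the local master and host are assumptions):

```scala
import com.datastax.spark.connector._
import org.apache.spark.sql.SparkSession

object CassandraRoundTrip {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")                                     // assumption: local test run
      .appName("CassandraRoundTrip")
      .config("spark.cassandra.connection.host", "127.0.0.1") // placeholder host
      .getOrCreate()
    val sc = spark.sparkContext

    // Read: full-table scan returning CassandraRow objects.
    val rows = sc.cassandraTable("rtdmp", "audience_info")
      .map(r => (r.getString("devid"), r.getString("audience_data"), r.getString("update_time")))

    // Write: tuple positions map onto the named columns.
    rows.saveToCassandra("rtdmp", "audience_info",
      SomeColumns("devid", "audience_data", "update_time"))

    spark.stop()
  }
}
```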