Commit d2703fa6 by WangJinfeng

rtdmp_merge remove

parent 5675c153
```diff
@@ -8,6 +8,8 @@ import mobvista.dmp.utils.clickhouse.ClickHouseSparkExt._
 import org.apache.commons.cli.{BasicParser, Options}
 import ru.yandex.clickhouse.ClickHouseDataSource
+import java.text.SimpleDateFormat
+import java.util.Calendar
 import scala.collection.JavaConversions._
 import scala.collection.mutable
@@ -48,27 +50,37 @@ class RTDmpMergeCK extends CommonSparkJob with Serializable {
     try {
       expire_time = DateUtil.getDayByString(date_time, "yyyyMMddHH", -1)
-      spark.udf.register("process", process _)
-
-      val df = spark.sql(sql.replace("@dt", date_time))
-        .filter("size(audience_id) > 0")
-
+      val tdf = spark.emptyDataFrame
+      val sdf = new SimpleDateFormat("yyyyMMddHH")
+      // drop expire partition
+      val calendar = Calendar.getInstance()
+      var date = sdf.parse(date_time)
+      calendar.setTime(date)
+      calendar.set(Calendar.HOUR_OF_DAY, calendar.get(Calendar.HOUR_OF_DAY) - 6)
+      val expire_part = sdf.format(calendar.getTime)
+      var dt_part = expire_part.substring(0, 8)
+      var hour_part = expire_part.substring(8, 10)
       implicit val clickhouseDataSource: ClickHouseDataSource = ClickHouseConnectionFactory.get(host)
       val clusterName = Some(cluster): Option[String]
-      val date = date_time.substring(0, 8)
-      val dt = MobvistaConstant.sdf1.format(MobvistaConstant.sdf2.parse(date))
-      val hour = date_time.substring(8, 10)
-      val tdf = spark.emptyDataFrame
-      // drop curr partition
-      tdf.dropPartition(database, table, s"($date,'$hour')", clusterName)
+      tdf.dropPartition(database, table, s"($dt_part,'$hour_part')", clusterName)
+      spark.udf.register("process", process _)
+
+      val df = spark.sql(sql.replace("@dt", date_time))
+        .filter("size(audience_id) > 0")
+
+      dt_part = date_time.substring(0, 8)
+      hour_part = expire_time.substring(8, 10)
+      val dt = MobvistaConstant.sdf1.format(MobvistaConstant.sdf2.parse(dt_part))
+      tdf.dropPartition(database, table, s"($dt_part,'$hour_part')", clusterName)
 
       Thread.sleep(120000)
-      df.saveToClickHouse(database, table, Seq(dt, hour), Seq("dt", "hour"), clusterName, batchSize = 200000)
+      df.saveToClickHouse(database, table, Seq(dt, hour_part), Seq("dt", "hour"), clusterName, batchSize = 200000)
     } finally {
       if (spark != null) {
```
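The substance of the change is the partition housekeeping: instead of dropping only the current `(dt, hour)` partition before the insert, the job now first drops an "expire" partition six hours back (computed with `Calendar`), then drops the write-target partition derived from `date_time`/`expire_time`, and only then saves. Below is a minimal, self-contained sketch of that key arithmetic; `PartitionKeys`, `shiftedPartition`, and the dashed `dt` pattern are illustrative assumptions, not names from this repo (the job itself goes through `MobvistaConstant.sdf1`/`sdf2`, whose exact patterns are not shown in the diff).

```scala
import java.text.SimpleDateFormat
import java.util.Calendar

// Hypothetical helper mirroring the Calendar arithmetic in RTDmpMergeCK;
// the object and method names are assumptions for illustration only.
object PartitionKeys {
  private val hourFmt = new SimpleDateFormat("yyyyMMddHH")
  private val dayFmt  = new SimpleDateFormat("yyyyMMdd")
  private val dashFmt = new SimpleDateFormat("yyyy-MM-dd")

  /** (dt, hour) partition key shifted offsetHours from dateTime ("yyyyMMddHH"). */
  def shiftedPartition(dateTime: String, offsetHours: Int): (String, String) = {
    val calendar = Calendar.getInstance()
    calendar.setTime(hourFmt.parse(dateTime))
    calendar.add(Calendar.HOUR_OF_DAY, offsetHours)
    val part = hourFmt.format(calendar.getTime)
    (part.substring(0, 8), part.substring(8, 10))
  }

  def main(args: Array[String]): Unit = {
    val dateTime = "2021120103" // 03:00, so a -6h shift crosses the day boundary
    val (expDt, expHour) = shiftedPartition(dateTime, -6)
    println(s"drop expire partition: ($expDt,'$expHour')") // (20211130,'21')

    // Dashed dt as the job derives it via MobvistaConstant.sdf1/sdf2
    // (assuming sdf2 parses yyyyMMdd and sdf1 formats yyyy-MM-dd).
    val dt = dashFmt.format(dayFmt.parse(dateTime.substring(0, 8)))
    println(s"dt = $dt") // 2021-12-01
  }
}
```

The sketch uses `Calendar.add` rather than the diff's `set(HOUR_OF_DAY, get(HOUR_OF_DAY) - 6)`; the two behave the same here because `Calendar` is lenient by default and rolls negative hours over the day boundary. The two-minute `Thread.sleep` between the drops and `saveToClickHouse` presumably leaves time for the distributed partition drop to propagate across the cluster before new rows land in the same partition.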