package mobvista.dmp.clickhouse.adn_report

import java.text.SimpleDateFormat

import mobvista.dmp.utils.clickhouse.ClickHouseConnectionFactory
import mobvista.dmp.utils.clickhouse.ClickHouseSparkExt._
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.spark.sql.SparkSession
import ru.yandex.clickhouse.ClickHouseDataSource

/**
  * @package: mobvista.dmp.clickhouse.adn_report
  * @author: wangjf
  * @date: 2020/2/19
  * @time: 5:09 PM
  * @email: jinfeng.wang@mobvista.com
  * @phone: 152-1062-7698
  */
class ExtractToClickHouse extends Serializable {

  // input dates arrive as yyyyMMdd; the ClickHouse pdate partition uses yyyy-MM-dd
  val sdf1 = new SimpleDateFormat("yyyy-MM-dd")
  val sdf2 = new SimpleDateFormat("yyyyMMdd")

  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("date", true, "date")
    options.addOption("host", true, "host")
    options.addOption("cluster", true, "cluster")
    options.addOption("database", true, "database")
    options.addOption("table", true, "table")
    options
  }
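
  // BasicParser parses these GNU-style, e.g. "-date 20200219 -host <host> -cluster <cluster>
  // -database <db> -table <table>" (values illustrative); getOptionValue returns null for
  // any option that was not supplied, so all five are expected on the command line.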
  protected def run(args: Array[String]): Unit = {
    val parser = new BasicParser()
    val options = commandOptions()
    val commandLine = parser.parse(options, args)

    val date = commandLine.getOptionValue("date")
    val cluster = commandLine.getOptionValue("cluster")
    val host = commandLine.getOptionValue("host")
    val database = commandLine.getOptionValue("database")
    val table = commandLine.getOptionValue("table")

    val spark = SparkSession
      .builder()
      .appName(s"ExtractToClickHouse.$database.$table.$date")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "lz4")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()
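
    // Hive -> DataFrame -> ClickHouse: render the query, reshape rows to the
    // target ClickHouse schema, then create the database/table and bulk-insert.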
    try {
      val sql = Constant.ss_creative_report_hour_sql
        .replace("@database", database)
        .replace("@table", table)
        .replace("@date", date)
      val df = spark.sql(sql)
      val cdf = spark.createDataFrame(df.rdd.map(Constant.precess(_, table)), Constant.getSchema(table))

      // ClickHouse params
      val clusterName: Option[String] = Some(cluster)
      implicit val clickhouseDataSource: ClickHouseDataSource = ClickHouseConnectionFactory.get(host)

      // convert the yyyyMMdd input into the yyyy-MM-dd value used for the pdate partition
      val partition = sdf1.format(sdf2.parse(date))

      // drop partition before reload (currently disabled)
      // val tdf = spark.emptyDataFrame
      // tdf.dropPartition(database, table, date, clusterName)

      cdf.createClickHouseDb(database, clusterName)
      val indexColumn = Constant.getIndexColumn(table)
      cdf.createClickHouseTable(database, table, Seq("pdate"), indexColumn, Seq(), clusterName)
      cdf.saveToClickHouse(database, table, Seq(partition), Seq("pdate"), clusterName, batchSize = 2000000)
    } finally {
      if (spark != null) {
        spark.stop()
      }
    }
  }
}
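
/**
  * Entry point. A sketch of an invocation (all values illustrative, not from the source):
  *   spark-submit --class mobvista.dmp.clickhouse.adn_report.ExtractToClickHouse dmp.jar \
  *     -date 20200219 -host <clickhouse_host> -cluster <cluster_name> -database <db> -table <table>
  */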
object ExtractToClickHouse {
  def main(args: Array[String]): Unit = {
    new ExtractToClickHouse().run(args)
  }
}