1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
package mobvista.dmp.utils.clickhouse
import mobvista.dmp.utils.clickhouse.ClickHouseResultSetExt._
import mobvista.dmp.utils.clickhouse.Utils._
import ru.yandex.clickhouse.ClickHouseDataSource
/**
  * Small helper for running SQL statements against ClickHouse, either through the
  * implicitly supplied data source or against every node of a named cluster.
  *
  * @package: mobvista.dmp.utils.clickhouse
  * @author: wangjf
  * @date: 2019/07/16
  * @time: 17:56:56
  * @email: jinfeng.wang@mobvista.com
  * @phone: 152-1062-7698
  *
  * @param clusterNameO optional cluster name; it must be defined before any of the
  *                     cluster-wide methods (`queryCluster`, `*Cluster`, `getClusterNodes`)
  *                     is called, otherwise they throw
  * @param ds           data source from which connections are obtained
  */
case class ClickHouseClient(clusterNameO: Option[String] = None)
                           (implicit ds: ClickHouseDataSource) {

  /** Runs `create database if not exists` for `dbName` through the data source. */
  def createDb(dbName: String): Unit = {
    query(s"create database if not exists $dbName")
  }

  /** Runs `DROP DATABASE IF EXISTS` for `dbName` through the data source. */
  def dropDb(dbName: String): Unit = {
    query(s"DROP DATABASE IF EXISTS $dbName")
  }

  /** Runs `DROP TABLE IF EXISTS` for `dbName.tbName` through the data source. */
  def dropTable(dbName: String, tbName: String): Unit = {
    query(s"DROP TABLE IF EXISTS $dbName.$tbName")
  }

  /**
    * Executes `sql` on a connection taken from the data source and returns whatever
    * `executeQuery` produced.
    *
    * NOTE(review): `using` is defined outside this file and presumably closes the
    * connection when the block ends; if so, the returned result set outlives its
    * connection. Confirm that callers can still read from it.
    *
    * @param sql statement text, passed to the driver unchanged
    * @return the result of `executeQuery` (may be `null`; see the `*Cluster` methods)
    */
  def query(sql: String) = {
    using(ds.getConnection) { conn =>
      conn.createStatement().executeQuery(sql)
    }
  }

  /**
    * Executes `sql` on every node of the configured cluster and wraps the per-node
    * results in a `ClusterResultSet`.
    */
  def queryCluster(sql: String) = {
    ClusterResultSet(runOnAllNodes(sql))
  }

  /**
    * Runs `create database if not exists` for `dbName` on every cluster node.
    *
    * @return number of nodes whose query result was `null`
    */
  def createDbCluster(dbName: String): Int =
    countNodesWithNullResult(s"create database if not exists $dbName")

  /**
    * Runs `DROP DATABASE IF EXISTS` for `dbName` on every cluster node.
    *
    * @return number of nodes whose query result was `null`
    */
  def dropDbCluster(dbName: String): Int =
    countNodesWithNullResult(s"DROP DATABASE IF EXISTS $dbName")

  /**
    * Runs `DROP TABLE IF EXISTS` for `dbName.tbName` on every cluster node.
    *
    * @return number of nodes whose query result was `null`
    */
  def dropTableCluster(dbName: String, tbName: String): Int =
    countNodesWithNullResult(s"DROP TABLE IF EXISTS $dbName.$tbName")

  /**
    * Looks up the `host_address` of every node that `system.clusters` lists for the
    * configured cluster.
    *
    * NOTE(review): the cluster name is interpolated into the SQL text as-is; this
    * assumes it comes from trusted configuration.
    *
    * @return the node addresses, in the collection type produced by the result-set
    *         extension's `map`
    * @throws Exception                if no cluster name was supplied
    * @throws IllegalArgumentException if the lookup yields no nodes
    */
  def getClusterNodes() = {
    val clusterName = isClusterNameProvided()
    using(ds.getConnection) { conn =>
      val statement = conn.createStatement()
      val rs = statement.executeQuery(s"select host_name, host_address from system.clusters where cluster == '$clusterName'")
      val addresses = rs.map(row => row.getString("host_address"))
      // Report the plain cluster name rather than the Option wrapper ("Some(...)").
      require(addresses.nonEmpty, s"cluster $clusterName not found")
      addresses
    }
  }

  /**
    * Executes `sql` once per cluster node, using a node-specific data source obtained
    * from `ClickHouseConnectionFactory`, and pairs each node address with its result.
    */
  private def runOnAllNodes(sql: String) = {
    getClusterNodes().map { nodeIp =>
      val nodeDs = ClickHouseConnectionFactory.get(nodeIp)
      // The per-node client needs no cluster name: it only runs a plain `query`.
      val nodeClient = ClickHouseClient()(nodeDs)
      (nodeIp, nodeClient.query(sql))
    }
  }

  /**
    * Executes `sql` on every node and counts the nodes whose result was `null`.
    *
    * NOTE(review): presumably the driver returns `null` for statements that produce no
    * result set (such as DDL), making this a count of nodes where the statement
    * completed; confirm against the JDBC driver in use.
    */
  private def countNodesWithNullResult(sql: String): Int =
    runOnAllNodes(sql).count { case (_, result) => result == null }

  /**
    * Returns the configured cluster name.
    *
    * @throws Exception if `clusterNameO` is empty
    */
  private def isClusterNameProvided(): String =
    clusterNameO.getOrElse(throw new Exception("cluster name is required"))
}