package mobvista.dmp.utils.clickhouse import mobvista.dmp.utils.clickhouse.ClickHouseResultSetExt._ import mobvista.dmp.utils.clickhouse.Utils._ import ru.yandex.clickhouse.ClickHouseDataSource /** * @package: mobvista.dmp.utils.clickhouse * @author: wangjf * @date: 2019/07/16 * @time: 17:56:56 * @email: jinfeng.wang@mobvista.com * @phone: 152-1062-7698 */ case class ClickHouseClient(clusterNameO: Option[String] = None) (implicit ds: ClickHouseDataSource) { def createDb(dbName: String) { query(s"create database if not exists $dbName") } def dropDb(dbName: String) { query(s"DROP DATABASE IF EXISTS $dbName") } def dropTable(dbName: String, tbName: String) { query(s"DROP TABLE IF EXISTS $dbName.$tbName") } def query(sql: String) = { using(ds.getConnection) { conn => val statement = conn.createStatement() val rs = statement.executeQuery(sql) rs } } def queryCluster(sql: String) = { val resultSet = runOnAllNodes(sql) ClusterResultSet(resultSet) } def createDbCluster(dbName: String) = { runOnAllNodes(s"create database if not exists $dbName") .count(x => x._2 == null) } def dropDbCluster(dbName: String) = { runOnAllNodes(s"DROP DATABASE IF EXISTS $dbName") .count(x => x._2 == null) } def dropTableCluster(dbName: String, tbName: String) = { runOnAllNodes(s"DROP TABLE IF EXISTS $dbName.$tbName") .count(x => x._2 == null) } def getClusterNodes() = { val clusterName = isClusterNameProvided() using(ds.getConnection) { conn => val statement = conn.createStatement() val rs = statement.executeQuery(s"select host_name, host_address from system.clusters where cluster == '$clusterName'") val r = rs.map(x => x.getString("host_address")) require(r.nonEmpty, s"cluster $clusterNameO not found") r } } private def runOnAllNodes(sql: String) = { getClusterNodes().map { nodeIp => val nodeDs = ClickHouseConnectionFactory.get(nodeIp) val client = ClickHouseClient()(nodeDs) (nodeIp, client.query(sql)) } } private def isClusterNameProvided() = { clusterNameO match { case None => throw new Exception("cluster name is requires") case Some(clusterName) => clusterName } } }