package mobvista.dmp.utils.clickhouse

import mobvista.dmp.utils.clickhouse.ClickHouseResultSetExt._
import mobvista.dmp.utils.clickhouse.Utils._
import ru.yandex.clickhouse.ClickHouseDataSource

/**
  * @package: mobvista.dmp.utils.clickhouse
  * @author: wangjf
  * @date: 2019/07/16
  * @time: 17:56:56
  * @email: jinfeng.wang@mobvista.com
  * @phone: 152-1062-7698
  */

case class ClickHouseClient(clusterNameO: Option[String] = None)
                           (implicit ds: ClickHouseDataSource) {

  def createDb(dbName: String) {
    query(s"create database if not exists $dbName")
  }

  def dropDb(dbName: String) {
    query(s"DROP DATABASE IF EXISTS $dbName")
  }

  def dropTable(dbName: String, tbName: String) {
    query(s"DROP TABLE IF EXISTS $dbName.$tbName")
  }

  def query(sql: String) = {
    using(ds.getConnection) { conn =>
      val statement = conn.createStatement()
      val rs = statement.executeQuery(sql)
      rs
    }
  }

  def queryCluster(sql: String) = {
    val resultSet = runOnAllNodes(sql)
    ClusterResultSet(resultSet)
  }

  def createDbCluster(dbName: String) = {
    runOnAllNodes(s"create database if not exists $dbName")
      .count(x => x._2 == null)
  }

  def dropDbCluster(dbName: String) = {
    runOnAllNodes(s"DROP DATABASE IF EXISTS $dbName")
      .count(x => x._2 == null)
  }

  def dropTableCluster(dbName: String, tbName: String) = {
    runOnAllNodes(s"DROP TABLE IF EXISTS $dbName.$tbName")
      .count(x => x._2 == null)
  }

  def getClusterNodes() = {
    val clusterName = isClusterNameProvided()
    using(ds.getConnection) { conn =>
      val statement = conn.createStatement()
      val rs = statement.executeQuery(s"select host_name, host_address from system.clusters where cluster == '$clusterName'")
      val r = rs.map(x => x.getString("host_address"))
      require(r.nonEmpty, s"cluster $clusterNameO not found")
      r
    }
  }

  private def runOnAllNodes(sql: String) = {
    getClusterNodes().map { nodeIp =>
      val nodeDs = ClickHouseConnectionFactory.get(nodeIp)
      val client = ClickHouseClient()(nodeDs)
      (nodeIp, client.query(sql))
    }
  }

  private def isClusterNameProvided() = {
    clusterNameO match {
      case None => throw new Exception("cluster name is requires")
      case Some(clusterName) => clusterName
    }
  }
}