package mobvista.dmp.util; import ch.qos.logback.classic.Logger; import ch.qos.logback.classic.LoggerContext; import ch.qos.logback.classic.joran.JoranConfigurator; import ch.qos.logback.core.joran.spi.JoranException; import com.google.common.util.concurrent.ThreadFactoryBuilder; import mobvista.dmp.datasource.baichuan.BaiChuanServer; import org.slf4j.LoggerFactory; import ru.yandex.clickhouse.ClickHouseConnection; import ru.yandex.clickhouse.ClickHouseDataSource; import ru.yandex.clickhouse.ClickHousePreparedStatement; import ru.yandex.clickhouse.except.ClickHouseException; import ru.yandex.clickhouse.settings.ClickHouseProperties; import java.sql.SQLException; import java.text.SimpleDateFormat; import java.util.Random; import java.util.concurrent.*; /** * @package: mobvista.dmp.datasource.realtime.service * @author: wangjf * @date: 2019-10-24 * @time: 17:29 * @email: jinfeng.wang@mobvista.com * @phone: 152-1062-7698 */ public class ClickHouseMain { private static final String[] SET_VALUES = PropertyUtil.getProperty("config.properties", "http.private.host.server.ip").split(","); private static String driver = PropertyUtil.getProperty("config.properties", "datasource.clickhouse.driverClassName"); private static String url = PropertyUtil.getProperty("config.properties", "datasource.clickhouse.url"); private static String username = PropertyUtil.getProperty("config.properties", "datasource.clickhouse.username"); private static String password = PropertyUtil.getProperty("config.properties", "datasource.clickhouse.password"); private static String database = PropertyUtil.getProperty("config.properties", "datasource.clickhouse.database"); private static int timeout = Integer.parseInt(PropertyUtil.getProperty("config.properties", "datasource.clickhouse.timeout")); static SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-MM-dd"); static SimpleDateFormat sdf2 = new SimpleDateFormat("yyyyMMdd"); public static void main(String[] args) throws JoranException { LoggerContext context = (LoggerContext) LoggerFactory.getILoggerFactory(); JoranConfigurator configurator = new JoranConfigurator(); configurator.setContext(context); context.reset(); configurator.doConfigure(BaiChuanServer.class.getClassLoader().getResourceAsStream("logback-syslog.xml")); Logger logger = context.getLogger("realtime-service"); String database = ""; String table = ""; if (args.length >= 2) { database = args[0]; table = args[1]; } else { logger.info("Please Input database table"); System.exit(1); } long start = System.currentTimeMillis(); ClickHouseProperties properties = new ClickHouseProperties(); properties.setUser(username); properties.setPassword(password); properties.setDatabase(database); properties.setSocketTimeout(timeout); properties.setConnectionTimeout(timeout); ThreadFactory namedThreadFactory = new ThreadFactoryBuilder() .setNameFormat("dmp-ods_user_info-%d").build(); ExecutorService realtimePool = new ThreadPoolExecutor(3, 5, 360L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy()); String partition = MySQLUtil.getLastPartition(database, table); String dropPartition = DateUtil.getDayByString(partition, "yyyyMMdd", -1); for (int ipId = 0; ipId < SET_VALUES.length; ipId++) { int finalIpId = ipId; String finalDataBase = database; String finalTable = table; String finalPatition = dropPartition; realtimePool.execute(() -> { String[] ips = SET_VALUES[finalIpId].split(":"); // ips[new Random().nextInt(2)] ClickHouseDataSource dataSource = new ClickHouseDataSource(url.replace("host", ips[new Random().nextInt(2)]), properties); ClickHouseConnection connection = null; ClickHousePreparedStatement preparedStatement = null; try { try { connection = dataSource.getConnection(); } catch (ClickHouseException e) { Thread.sleep(1000); System.exit(255); } assert connection != null; String sql = dropPartition(finalDataBase, finalTable, finalPatition); logger.info("SQL ==>> " + sql); preparedStatement = (ClickHousePreparedStatement) connection.prepareStatement(sql); preparedStatement.execute(); } catch (SQLException | InterruptedException e) { e.printStackTrace(); System.exit(255); } finally { if (connection != null) { try { connection.close(); } catch (SQLException e) { e.printStackTrace(); } } if (preparedStatement != null) { try { preparedStatement.close(); } catch (SQLException e) { e.printStackTrace(); } } } }); } realtimePool.shutdown(); long end = System.currentTimeMillis(); logger.info("all runtime ==>> " + (end - start)); } private static String dropPartition(String database, String table, String part) { String dropSql = "ALTER TABLE @database.@table DROP PARTITION @part"; return dropSql.replace("@database", database) .replace("@table", table) .replace("@part", part); } }