From 9011af3715868a33465f03074061b5d08ba0ba82 Mon Sep 17 00:00:00 2001 From: WangJinfeng <wjf20110627@163.com> Date: Wed, 4 Aug 2021 17:49:09 +0800 Subject: [PATCH] update Datatory logic --- src/main/java/mobvista/dmp/datasource/datatory/Datatory3SMain.java | 81 +++++++++++++++++++++++++++++---------------------------------------------------- src/main/java/mobvista/dmp/datasource/datatory/DatatoryAdnMain.java | 81 +++++++++++++++++++++++++++++++-------------------------------------------------- src/main/java/mobvista/dmp/datasource/joypac/JoypacMain.java | 59 +++++++++++++++++++++++++++++++++-------------------------- 3 files changed, 93 insertions(+), 128 deletions(-) diff --git a/src/main/java/mobvista/dmp/datasource/datatory/Datatory3SMain.java b/src/main/java/mobvista/dmp/datasource/datatory/Datatory3SMain.java index ee2edc2..232f9b8 100644 --- a/src/main/java/mobvista/dmp/datasource/datatory/Datatory3SMain.java +++ b/src/main/java/mobvista/dmp/datasource/datatory/Datatory3SMain.java @@ -67,26 +67,49 @@ public class Datatory3SMain { properties.setSocketTimeout(timeout); properties.setConnectionTimeout(timeout); - /** - * foreach runAllNode - */ - ThreadFactory namedThreadFactory = new ThreadFactoryBuilder() .setNameFormat("datatory-service-%d").build(); ExecutorService datatoryPool = new ThreadPoolExecutor(3, 5, 360L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy()); + + int randomIp = new Random().nextInt(3); + ClickHousePreparedStatement preparedStatementDrop = null; + logger.info(dropPartition(part)); + ClickHouseConnection dropConnection = null; + try { + dropConnection = ClickHouseJdbc.connectionByTime(randomIp); + } catch (SQLException | InterruptedException e) { + try { + dropConnection = ClickHouseJdbc.connectionByTime(randomIp); + } catch (SQLException | InterruptedException ex) { + logger.info("ClickHouse Connection Failure!"); + } + } + try { + preparedStatementDrop = (ClickHousePreparedStatement) dropConnection.prepareStatement(dropPartition(part)); + preparedStatementDrop.execute(); + Thread.sleep(120000); + } catch (SQLException | InterruptedException e) { + e.printStackTrace(); + } finally { + try { + preparedStatementDrop.close(); + dropConnection.close(); + } catch (SQLException e) { + e.printStackTrace(); + } + } + for (int ipId = 0; ipId < SET_VALUES.length; ipId++) { int finalIpId = ipId; String finalDate = date; - String finalPart = part; datatoryPool.execute(() -> { String[] ips = SET_VALUES[finalIpId].split(":"); ClickHouseDataSource dataSource = new ClickHouseDataSource(url.replace("host", ips[new Random().nextInt(2)]), properties); ClickHouseConnection connection = null; ClickHousePreparedStatement preparedStatement = null; - ClickHousePreparedStatement preparedStatementDrop = null; try { try { connection = ClickHouseJdbc.connectionByTime(finalIpId); @@ -94,10 +117,6 @@ public class Datatory3SMain { Thread.sleep(200); } assert connection != null; - logger.info(dropPartition(finalPart)); - preparedStatementDrop = (ClickHousePreparedStatement) connection.prepareStatement(dropPartition(finalPart)); - preparedStatementDrop.execute(); - Thread.sleep(120000); logger.info(buildSql(finalDate)); preparedStatement = (ClickHousePreparedStatement) connection.prepareStatement(buildSql(finalDate)); preparedStatement.execute(); @@ -129,52 +148,10 @@ public class Datatory3SMain { e.printStackTrace(); } } - if (preparedStatementDrop != null) { - try { - preparedStatementDrop.close(); - } catch (SQLException e) { - e.printStackTrace(); - } - try { - Thread.sleep(200); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } } }); } - /* - for (int ipId = 0; ipId < SET_VALUES.length; ipId++) { - // Map<Integer, Set<String>> map = Collections.synchronizedMap(new HashMap<>(10)); - ClickHouseDataSource dataSource = new ClickHouseDataSource(url.replace("host", SET_VALUES[ipId]), properties); - ClickHouseConnection connection = null; - ClickHousePreparedStatement preparedStatement = null; - try { - try { - connection = dataSource.getConnection(); - } catch (ClickHouseException e) { - Thread.sleep(200); - } - assert connection != null; - preparedStatement = (ClickHousePreparedStatement) connection.prepareStatement(buildSql(date)); - preparedStatement.execute(); - } catch (SQLException | InterruptedException e) { - e.printStackTrace(); - } finally { - if (connection != null) { - connection.close(); - Thread.sleep(200); - } - if (preparedStatement != null) { - preparedStatement.close(); - Thread.sleep(200); - } - } - } - */ - long end = System.currentTimeMillis(); logger.info("all runtime ==>> " + (end - start)); datatoryPool.shutdown(); diff --git a/src/main/java/mobvista/dmp/datasource/datatory/DatatoryAdnMain.java b/src/main/java/mobvista/dmp/datasource/datatory/DatatoryAdnMain.java index 7981daa..a513792 100644 --- a/src/main/java/mobvista/dmp/datasource/datatory/DatatoryAdnMain.java +++ b/src/main/java/mobvista/dmp/datasource/datatory/DatatoryAdnMain.java @@ -67,26 +67,49 @@ public class DatatoryAdnMain { properties.setSocketTimeout(timeout); properties.setConnectionTimeout(timeout); - /** - * Thread - */ - ThreadFactory namedThreadFactory = new ThreadFactoryBuilder() .setNameFormat(" datatory-service-%d").build(); ExecutorService datatoryPool = new ThreadPoolExecutor(3, 5, 360L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy()); + + int randomIp = new Random().nextInt(3); + ClickHousePreparedStatement preparedStatementDrop = null; + logger.info(dropPartition(part)); + ClickHouseConnection dropConnection = null; + try { + dropConnection = ClickHouseJdbc.connectionByTime(randomIp); + } catch (SQLException | InterruptedException e) { + try { + dropConnection = ClickHouseJdbc.connectionByTime(randomIp); + } catch (SQLException | InterruptedException ex) { + logger.info("ClickHouse Connection Failure!"); + } + } + try { + preparedStatementDrop = (ClickHousePreparedStatement) dropConnection.prepareStatement(dropPartition(part)); + preparedStatementDrop.execute(); + Thread.sleep(120000); + } catch (SQLException | InterruptedException e) { + e.printStackTrace(); + } finally { + try { + preparedStatementDrop.close(); + dropConnection.close(); + } catch (SQLException e) { + e.printStackTrace(); + } + } + for (int ipId = 0; ipId < SET_VALUES.length; ipId++) { int finalIpId = ipId; String finalDate = date; - String finalPart = part; datatoryPool.execute(() -> { String[] ips = SET_VALUES[finalIpId].split(":"); // ips[new Random().nextInt(2)] ClickHouseDataSource dataSource = new ClickHouseDataSource(url.replace("host", ips[new Random().nextInt(2)]), properties); ClickHouseConnection connection = null; ClickHousePreparedStatement preparedStatement = null; - ClickHousePreparedStatement preparedStatementDrop = null; try { try { connection = ClickHouseJdbc.connectionByTime(finalIpId); @@ -94,10 +117,7 @@ public class DatatoryAdnMain { Thread.sleep(200); } assert connection != null; - logger.info(dropPartition(finalPart)); - preparedStatementDrop = (ClickHousePreparedStatement) connection.prepareStatement(dropPartition(finalPart)); - preparedStatementDrop.execute(); - Thread.sleep(120000); + logger.info(buildSql(finalDate)); preparedStatement = (ClickHousePreparedStatement) connection.prepareStatement(buildSql(finalDate)); preparedStatement.execute(); @@ -129,49 +149,10 @@ public class DatatoryAdnMain { e.printStackTrace(); } } - if (preparedStatementDrop != null) { - try { - preparedStatementDrop.close(); - } catch (SQLException e) { - e.printStackTrace(); - } - try { - Thread.sleep(200); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } } }); } - /* - for (int ipId = 0; ipId < SET_VALUES.length; ipId++) { - ClickHouseDataSource dataSource = new ClickHouseDataSource(url.replace("host", SET_VALUES[ipId]), properties); - ClickHouseConnection connection = null; - ClickHousePreparedStatement preparedStatement = null; - try { - try { - connection = dataSource.getConnection(); - } catch (ClickHouseException e) { - Thread.sleep(200); - } - assert connection != null; - preparedStatement = (ClickHousePreparedStatement) connection.prepareStatement(buildSql(date)); - preparedStatement.execute(); - } catch (SQLException | InterruptedException e) { - e.printStackTrace(); - } finally { - if (connection != null) { - connection.close(); - Thread.sleep(200); - } - if (preparedStatement != null) { - preparedStatement.close(); - Thread.sleep(200); - } - } - } - */ + long end = System.currentTimeMillis(); logger.info("all runtime ==>> " + (end - start)); datatoryPool.shutdown(); diff --git a/src/main/java/mobvista/dmp/datasource/joypac/JoypacMain.java b/src/main/java/mobvista/dmp/datasource/joypac/JoypacMain.java index 42dc447..89aee55 100644 --- a/src/main/java/mobvista/dmp/datasource/joypac/JoypacMain.java +++ b/src/main/java/mobvista/dmp/datasource/joypac/JoypacMain.java @@ -15,6 +15,7 @@ import ru.yandex.clickhouse.ClickHousePreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; +import java.util.Random; import java.util.concurrent.*; /** @@ -46,7 +47,7 @@ public class JoypacMain { } long start = System.currentTimeMillis(); - /** + /* * Thread */ @@ -56,10 +57,37 @@ public class JoypacMain { ExecutorService datatoryPool = new ThreadPoolExecutor(3, 5, 360L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy()); + + int randomIp = new Random().nextInt(3); + ClickHousePreparedStatement preparedStatementDrop = null; + logger.info(dropPartition(part)); + ClickHouseConnection dropConnection = null; + try { + dropConnection = ClickHouseJdbc.connectionByTime(randomIp); + } catch (SQLException | InterruptedException e) { + try { + dropConnection = ClickHouseJdbc.connectionByTime(randomIp); + } catch (SQLException | InterruptedException ex) { + logger.info("ClickHouse Connection Failure!"); + } + } + try { + preparedStatementDrop = (ClickHousePreparedStatement) dropConnection.prepareStatement(dropPartition(part)); + preparedStatementDrop.execute(); + Thread.sleep(120000); + } catch (SQLException | InterruptedException e) { + e.printStackTrace(); + } finally { + try { + preparedStatementDrop.close(); + dropConnection.close(); + } catch (SQLException e) { + e.printStackTrace(); + } + } for (int ipId = 0; ipId < ClickHouseJdbc.SET_VALUES.length; ipId++) { int finalIpId = ipId; String finalDate = date; - String finalPart = part; datatoryPool.execute(() -> { ClickHouseConnection connection = null; @@ -74,21 +102,14 @@ public class JoypacMain { } } - // String[] ips = SET_VALUES[finalIpId].split(":"); - // ClickHouseDataSource dataSource = new ClickHouseDataSource(URL.replace("host", ips[new Random().nextInt(2)]), properties); - // ClickHouseConnection connection = null; ClickHousePreparedStatement preparedStatement = null; - ClickHousePreparedStatement preparedStatementDrop = null; try { assert connection != null; - logger.info(dropPartition(finalPart)); - preparedStatementDrop = (ClickHousePreparedStatement) connection.prepareStatement(dropPartition(finalPart)); - preparedStatementDrop.execute(); - Thread.sleep(120000); + logger.info(buildSql(finalDate)); preparedStatement = (ClickHousePreparedStatement) connection.prepareStatement(buildSql(finalDate)); preparedStatement.execute(); - } catch (SQLException | InterruptedException e) { + } catch (SQLException e) { e.printStackTrace(); System.exit(255); } finally { @@ -116,18 +137,6 @@ public class JoypacMain { e.printStackTrace(); } } - if (preparedStatementDrop != null) { - try { - preparedStatementDrop.close(); - } catch (SQLException e) { - e.printStackTrace(); - } - try { - Thread.sleep(200); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } } }); } @@ -151,10 +160,8 @@ public class JoypacMain { } private static String dropPartition(String part) { - String dropSql = "ALTER TABLE dwh.joypac_insight_daily DROP PARTITION @part"; + String dropSql = "ALTER TABLE dwh.joypac_insight_daily ON CLUSTER cluster_1st DROP PARTITION @part"; return dropSql.replace("@part", part); } - - // SELECT COUNT(1) FROM (SELECT * FROM dwh.ods_user_info WHERE dt = '2019-12-22') a ALL INNER JOIN (SELECT * FROM dwh.joypac_daily_all) b USING device_id } -- libgit2 0.27.1