Commit 9011af37 by WangJinfeng

update Datatory logic

parent 2dc96cc2
...@@ -67,26 +67,49 @@ public class Datatory3SMain { ...@@ -67,26 +67,49 @@ public class Datatory3SMain {
properties.setSocketTimeout(timeout); properties.setSocketTimeout(timeout);
properties.setConnectionTimeout(timeout); properties.setConnectionTimeout(timeout);
/**
* foreach runAllNode
*/
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder() ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
.setNameFormat("datatory-service-%d").build(); .setNameFormat("datatory-service-%d").build();
ExecutorService datatoryPool = new ThreadPoolExecutor(3, 5, ExecutorService datatoryPool = new ThreadPoolExecutor(3, 5,
360L, TimeUnit.MILLISECONDS, 360L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy()); new LinkedBlockingQueue<>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
int randomIp = new Random().nextInt(3);
ClickHousePreparedStatement preparedStatementDrop = null;
logger.info(dropPartition(part));
ClickHouseConnection dropConnection = null;
try {
dropConnection = ClickHouseJdbc.connectionByTime(randomIp);
} catch (SQLException | InterruptedException e) {
try {
dropConnection = ClickHouseJdbc.connectionByTime(randomIp);
} catch (SQLException | InterruptedException ex) {
logger.info("ClickHouse Connection Failure!");
}
}
try {
preparedStatementDrop = (ClickHousePreparedStatement) dropConnection.prepareStatement(dropPartition(part));
preparedStatementDrop.execute();
Thread.sleep(120000);
} catch (SQLException | InterruptedException e) {
e.printStackTrace();
} finally {
try {
preparedStatementDrop.close();
dropConnection.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
for (int ipId = 0; ipId < SET_VALUES.length; ipId++) { for (int ipId = 0; ipId < SET_VALUES.length; ipId++) {
int finalIpId = ipId; int finalIpId = ipId;
String finalDate = date; String finalDate = date;
String finalPart = part;
datatoryPool.execute(() -> { datatoryPool.execute(() -> {
String[] ips = SET_VALUES[finalIpId].split(":"); String[] ips = SET_VALUES[finalIpId].split(":");
ClickHouseDataSource dataSource = new ClickHouseDataSource(url.replace("host", ips[new Random().nextInt(2)]), properties); ClickHouseDataSource dataSource = new ClickHouseDataSource(url.replace("host", ips[new Random().nextInt(2)]), properties);
ClickHouseConnection connection = null; ClickHouseConnection connection = null;
ClickHousePreparedStatement preparedStatement = null; ClickHousePreparedStatement preparedStatement = null;
ClickHousePreparedStatement preparedStatementDrop = null;
try { try {
try { try {
connection = ClickHouseJdbc.connectionByTime(finalIpId); connection = ClickHouseJdbc.connectionByTime(finalIpId);
...@@ -94,10 +117,6 @@ public class Datatory3SMain { ...@@ -94,10 +117,6 @@ public class Datatory3SMain {
Thread.sleep(200); Thread.sleep(200);
} }
assert connection != null; assert connection != null;
logger.info(dropPartition(finalPart));
preparedStatementDrop = (ClickHousePreparedStatement) connection.prepareStatement(dropPartition(finalPart));
preparedStatementDrop.execute();
Thread.sleep(120000);
logger.info(buildSql(finalDate)); logger.info(buildSql(finalDate));
preparedStatement = (ClickHousePreparedStatement) connection.prepareStatement(buildSql(finalDate)); preparedStatement = (ClickHousePreparedStatement) connection.prepareStatement(buildSql(finalDate));
preparedStatement.execute(); preparedStatement.execute();
...@@ -129,52 +148,10 @@ public class Datatory3SMain { ...@@ -129,52 +148,10 @@ public class Datatory3SMain {
e.printStackTrace(); e.printStackTrace();
} }
} }
if (preparedStatementDrop != null) {
try {
preparedStatementDrop.close();
} catch (SQLException e) {
e.printStackTrace();
}
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} }
}); });
} }
/*
for (int ipId = 0; ipId < SET_VALUES.length; ipId++) {
// Map<Integer, Set<String>> map = Collections.synchronizedMap(new HashMap<>(10));
ClickHouseDataSource dataSource = new ClickHouseDataSource(url.replace("host", SET_VALUES[ipId]), properties);
ClickHouseConnection connection = null;
ClickHousePreparedStatement preparedStatement = null;
try {
try {
connection = dataSource.getConnection();
} catch (ClickHouseException e) {
Thread.sleep(200);
}
assert connection != null;
preparedStatement = (ClickHousePreparedStatement) connection.prepareStatement(buildSql(date));
preparedStatement.execute();
} catch (SQLException | InterruptedException e) {
e.printStackTrace();
} finally {
if (connection != null) {
connection.close();
Thread.sleep(200);
}
if (preparedStatement != null) {
preparedStatement.close();
Thread.sleep(200);
}
}
}
*/
long end = System.currentTimeMillis(); long end = System.currentTimeMillis();
logger.info("all runtime ==>> " + (end - start)); logger.info("all runtime ==>> " + (end - start));
datatoryPool.shutdown(); datatoryPool.shutdown();
......
...@@ -67,26 +67,49 @@ public class DatatoryAdnMain { ...@@ -67,26 +67,49 @@ public class DatatoryAdnMain {
properties.setSocketTimeout(timeout); properties.setSocketTimeout(timeout);
properties.setConnectionTimeout(timeout); properties.setConnectionTimeout(timeout);
/**
* Thread
*/
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder() ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
.setNameFormat(" datatory-service-%d").build(); .setNameFormat(" datatory-service-%d").build();
ExecutorService datatoryPool = new ThreadPoolExecutor(3, 5, ExecutorService datatoryPool = new ThreadPoolExecutor(3, 5,
360L, TimeUnit.MILLISECONDS, 360L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy()); new LinkedBlockingQueue<>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
int randomIp = new Random().nextInt(3);
ClickHousePreparedStatement preparedStatementDrop = null;
logger.info(dropPartition(part));
ClickHouseConnection dropConnection = null;
try {
dropConnection = ClickHouseJdbc.connectionByTime(randomIp);
} catch (SQLException | InterruptedException e) {
try {
dropConnection = ClickHouseJdbc.connectionByTime(randomIp);
} catch (SQLException | InterruptedException ex) {
logger.info("ClickHouse Connection Failure!");
}
}
try {
preparedStatementDrop = (ClickHousePreparedStatement) dropConnection.prepareStatement(dropPartition(part));
preparedStatementDrop.execute();
Thread.sleep(120000);
} catch (SQLException | InterruptedException e) {
e.printStackTrace();
} finally {
try {
preparedStatementDrop.close();
dropConnection.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
for (int ipId = 0; ipId < SET_VALUES.length; ipId++) { for (int ipId = 0; ipId < SET_VALUES.length; ipId++) {
int finalIpId = ipId; int finalIpId = ipId;
String finalDate = date; String finalDate = date;
String finalPart = part;
datatoryPool.execute(() -> { datatoryPool.execute(() -> {
String[] ips = SET_VALUES[finalIpId].split(":"); // ips[new Random().nextInt(2)] String[] ips = SET_VALUES[finalIpId].split(":"); // ips[new Random().nextInt(2)]
ClickHouseDataSource dataSource = new ClickHouseDataSource(url.replace("host", ips[new Random().nextInt(2)]), properties); ClickHouseDataSource dataSource = new ClickHouseDataSource(url.replace("host", ips[new Random().nextInt(2)]), properties);
ClickHouseConnection connection = null; ClickHouseConnection connection = null;
ClickHousePreparedStatement preparedStatement = null; ClickHousePreparedStatement preparedStatement = null;
ClickHousePreparedStatement preparedStatementDrop = null;
try { try {
try { try {
connection = ClickHouseJdbc.connectionByTime(finalIpId); connection = ClickHouseJdbc.connectionByTime(finalIpId);
...@@ -94,10 +117,7 @@ public class DatatoryAdnMain { ...@@ -94,10 +117,7 @@ public class DatatoryAdnMain {
Thread.sleep(200); Thread.sleep(200);
} }
assert connection != null; assert connection != null;
logger.info(dropPartition(finalPart));
preparedStatementDrop = (ClickHousePreparedStatement) connection.prepareStatement(dropPartition(finalPart));
preparedStatementDrop.execute();
Thread.sleep(120000);
logger.info(buildSql(finalDate)); logger.info(buildSql(finalDate));
preparedStatement = (ClickHousePreparedStatement) connection.prepareStatement(buildSql(finalDate)); preparedStatement = (ClickHousePreparedStatement) connection.prepareStatement(buildSql(finalDate));
preparedStatement.execute(); preparedStatement.execute();
...@@ -129,49 +149,10 @@ public class DatatoryAdnMain { ...@@ -129,49 +149,10 @@ public class DatatoryAdnMain {
e.printStackTrace(); e.printStackTrace();
} }
} }
if (preparedStatementDrop != null) {
try {
preparedStatementDrop.close();
} catch (SQLException e) {
e.printStackTrace();
}
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} }
}); });
} }
/*
for (int ipId = 0; ipId < SET_VALUES.length; ipId++) {
ClickHouseDataSource dataSource = new ClickHouseDataSource(url.replace("host", SET_VALUES[ipId]), properties);
ClickHouseConnection connection = null;
ClickHousePreparedStatement preparedStatement = null;
try {
try {
connection = dataSource.getConnection();
} catch (ClickHouseException e) {
Thread.sleep(200);
}
assert connection != null;
preparedStatement = (ClickHousePreparedStatement) connection.prepareStatement(buildSql(date));
preparedStatement.execute();
} catch (SQLException | InterruptedException e) {
e.printStackTrace();
} finally {
if (connection != null) {
connection.close();
Thread.sleep(200);
}
if (preparedStatement != null) {
preparedStatement.close();
Thread.sleep(200);
}
}
}
*/
long end = System.currentTimeMillis(); long end = System.currentTimeMillis();
logger.info("all runtime ==>> " + (end - start)); logger.info("all runtime ==>> " + (end - start));
datatoryPool.shutdown(); datatoryPool.shutdown();
......
...@@ -15,6 +15,7 @@ import ru.yandex.clickhouse.ClickHousePreparedStatement; ...@@ -15,6 +15,7 @@ import ru.yandex.clickhouse.ClickHousePreparedStatement;
import java.sql.ResultSet; import java.sql.ResultSet;
import java.sql.SQLException; import java.sql.SQLException;
import java.util.Random;
import java.util.concurrent.*; import java.util.concurrent.*;
/** /**
...@@ -46,7 +47,7 @@ public class JoypacMain { ...@@ -46,7 +47,7 @@ public class JoypacMain {
} }
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
/** /*
* Thread * Thread
*/ */
...@@ -56,10 +57,37 @@ public class JoypacMain { ...@@ -56,10 +57,37 @@ public class JoypacMain {
ExecutorService datatoryPool = new ThreadPoolExecutor(3, 5, ExecutorService datatoryPool = new ThreadPoolExecutor(3, 5,
360L, TimeUnit.MILLISECONDS, 360L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy()); new LinkedBlockingQueue<>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
int randomIp = new Random().nextInt(3);
ClickHousePreparedStatement preparedStatementDrop = null;
logger.info(dropPartition(part));
ClickHouseConnection dropConnection = null;
try {
dropConnection = ClickHouseJdbc.connectionByTime(randomIp);
} catch (SQLException | InterruptedException e) {
try {
dropConnection = ClickHouseJdbc.connectionByTime(randomIp);
} catch (SQLException | InterruptedException ex) {
logger.info("ClickHouse Connection Failure!");
}
}
try {
preparedStatementDrop = (ClickHousePreparedStatement) dropConnection.prepareStatement(dropPartition(part));
preparedStatementDrop.execute();
Thread.sleep(120000);
} catch (SQLException | InterruptedException e) {
e.printStackTrace();
} finally {
try {
preparedStatementDrop.close();
dropConnection.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
for (int ipId = 0; ipId < ClickHouseJdbc.SET_VALUES.length; ipId++) { for (int ipId = 0; ipId < ClickHouseJdbc.SET_VALUES.length; ipId++) {
int finalIpId = ipId; int finalIpId = ipId;
String finalDate = date; String finalDate = date;
String finalPart = part;
datatoryPool.execute(() -> { datatoryPool.execute(() -> {
ClickHouseConnection connection = null; ClickHouseConnection connection = null;
...@@ -74,21 +102,14 @@ public class JoypacMain { ...@@ -74,21 +102,14 @@ public class JoypacMain {
} }
} }
// String[] ips = SET_VALUES[finalIpId].split(":");
// ClickHouseDataSource dataSource = new ClickHouseDataSource(URL.replace("host", ips[new Random().nextInt(2)]), properties);
// ClickHouseConnection connection = null;
ClickHousePreparedStatement preparedStatement = null; ClickHousePreparedStatement preparedStatement = null;
ClickHousePreparedStatement preparedStatementDrop = null;
try { try {
assert connection != null; assert connection != null;
logger.info(dropPartition(finalPart));
preparedStatementDrop = (ClickHousePreparedStatement) connection.prepareStatement(dropPartition(finalPart));
preparedStatementDrop.execute();
Thread.sleep(120000);
logger.info(buildSql(finalDate)); logger.info(buildSql(finalDate));
preparedStatement = (ClickHousePreparedStatement) connection.prepareStatement(buildSql(finalDate)); preparedStatement = (ClickHousePreparedStatement) connection.prepareStatement(buildSql(finalDate));
preparedStatement.execute(); preparedStatement.execute();
} catch (SQLException | InterruptedException e) { } catch (SQLException e) {
e.printStackTrace(); e.printStackTrace();
System.exit(255); System.exit(255);
} finally { } finally {
...@@ -116,18 +137,6 @@ public class JoypacMain { ...@@ -116,18 +137,6 @@ public class JoypacMain {
e.printStackTrace(); e.printStackTrace();
} }
} }
if (preparedStatementDrop != null) {
try {
preparedStatementDrop.close();
} catch (SQLException e) {
e.printStackTrace();
}
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} }
}); });
} }
...@@ -151,10 +160,8 @@ public class JoypacMain { ...@@ -151,10 +160,8 @@ public class JoypacMain {
} }
private static String dropPartition(String part) { private static String dropPartition(String part) {
String dropSql = "ALTER TABLE dwh.joypac_insight_daily DROP PARTITION @part"; String dropSql = "ALTER TABLE dwh.joypac_insight_daily ON CLUSTER cluster_1st DROP PARTITION @part";
return dropSql.replace("@part", part); return dropSql.replace("@part", part);
} }
// SELECT COUNT(1) FROM (SELECT * FROM dwh.ods_user_info WHERE dt = '2019-12-22') a ALL INNER JOIN (SELECT * FROM dwh.joypac_daily_all) b USING device_id
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment