package mobvista.dmp.datasource.joypac; import ch.qos.logback.classic.Logger; import ch.qos.logback.classic.LoggerContext; import ch.qos.logback.classic.joran.JoranConfigurator; import ch.qos.logback.core.joran.spi.JoranException; import com.google.common.util.concurrent.ThreadFactoryBuilder; import mobvista.dmp.common.ClickHouseJdbc; import mobvista.dmp.datasource.baichuan.BaiChuanServer; import mobvista.dmp.util.DateUtil; import mobvista.dmp.util.MySQLUtil; import org.slf4j.LoggerFactory; import ru.yandex.clickhouse.ClickHouseConnection; import ru.yandex.clickhouse.ClickHousePreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.util.concurrent.*; /** * @package: mobvista.dmp.datasource.joypac * @author: wangjf * @date: 2019-12-24 * @time: 16:18:02 * @email: jinfeng.wang@mobvista.com * @phone: 152-1062-7698 */ public class JoypacMain { public static void main(String[] args) throws JoranException { LoggerContext context = (LoggerContext) LoggerFactory.getILoggerFactory(); JoranConfigurator configurator = new JoranConfigurator(); configurator.setContext(context); context.reset(); configurator.doConfigure(BaiChuanServer.class.getClassLoader().getResourceAsStream("logback-syslog.xml")); Logger logger = context.getLogger("Joypac"); String date = ""; String part = ""; if (args.length >= 1) { part = args[0]; date = DateUtil.format(DateUtil.parse(part, "yyyyMMdd"), "yyyy-MM-dd"); } else { logger.info("Please Input Partition Date"); System.exit(1); } long start = System.currentTimeMillis(); /** * Thread */ ThreadFactory namedThreadFactory = new ThreadFactoryBuilder() .setNameFormat("joypac-service-%d").build(); ExecutorService datatoryPool = new ThreadPoolExecutor(3, 5, 360L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy()); for (int ipId = 0; ipId < ClickHouseJdbc.SET_VALUES.length; ipId++) { int finalIpId = ipId; String finalDate = date; String finalPart = part; datatoryPool.execute(() -> { ClickHouseConnection connection = null; ResultSet resultSet; try { connection = ClickHouseJdbc.connectionByTime(finalIpId); } catch (SQLException | InterruptedException e) { try { connection = ClickHouseJdbc.connectionByTime(finalIpId); } catch (SQLException | InterruptedException ex) { logger.info("ClickHouse Connection Failure!"); } } // String[] ips = SET_VALUES[finalIpId].split(":"); // ClickHouseDataSource dataSource = new ClickHouseDataSource(URL.replace("host", ips[new Random().nextInt(2)]), properties); // ClickHouseConnection connection = null; ClickHousePreparedStatement preparedStatement = null; ClickHousePreparedStatement preparedStatementDrop = null; try { assert connection != null; logger.info(dropPartition(finalPart)); preparedStatementDrop = (ClickHousePreparedStatement) connection.prepareStatement(dropPartition(finalPart)); preparedStatementDrop.execute(); Thread.sleep(120000); logger.info(buildSql(finalDate)); preparedStatement = (ClickHousePreparedStatement) connection.prepareStatement(buildSql(finalDate)); preparedStatement.execute(); } catch (SQLException | InterruptedException e) { e.printStackTrace(); System.exit(255); } finally { if (connection != null) { try { connection.close(); } catch (SQLException e) { e.printStackTrace(); } try { Thread.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } } if (preparedStatement != null) { try { preparedStatement.close(); } catch (SQLException e) { e.printStackTrace(); } try { Thread.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } } if (preparedStatementDrop != null) { try { preparedStatementDrop.close(); } catch (SQLException e) { e.printStackTrace(); } try { Thread.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } } } }); } long end = System.currentTimeMillis(); logger.info("all runtime ==>> " + (end - start)); datatoryPool.shutdown(); } private static String buildSql(String date) { String partition = MySQLUtil.getLastPartition("dwh", "ods_user_info"); String dt = DateUtil.format(DateUtil.parse(partition, "yyyyMMdd"), "yyyy-MM-dd"); String insertSql = "INSERT INTO dwh.joypac_insight_daily (device_id, country, age, gender, install_apps, interest, dt) " + "SELECT device_id, a.country, a.age, a.gender, install_apps, interest, b.dt FROM (" + "SELECT * FROM dwh.ods_user_info WHERE dt = '@dt' AND update_date = '@dt') a ALL INNER JOIN (SELECT * FROM dwh.joypac_daily_all) b USING device_id WHERE b.dt = '@date'"; return insertSql.replace("@date", date) .replace("@dt", dt) .replace("@update_date", dt); } private static String dropPartition(String part) { String dropSql = "ALTER TABLE dwh.joypac_insight_daily DROP PARTITION @part"; return dropSql.replace("@part", part); } // SELECT COUNT(1) FROM (SELECT * FROM dwh.ods_user_info WHERE dt = '2019-12-22') a ALL INNER JOIN (SELECT * FROM dwh.joypac_daily_all) b USING device_id }