package mobvista.dmp.datasource.realtime.service; import ch.qos.logback.classic.Logger; import ch.qos.logback.classic.LoggerContext; import ch.qos.logback.classic.joran.JoranConfigurator; import ch.qos.logback.core.joran.spi.JoranException; import com.google.common.util.concurrent.ThreadFactoryBuilder; import mobvista.dmp.datasource.baichuan.AsoDevice; import mobvista.dmp.datasource.baichuan.BaiChuanServer; import mobvista.dmp.util.DateUtil; import mobvista.dmp.util.MySQLUtil; import mobvista.dmp.util.PropertyUtil; import org.slf4j.LoggerFactory; import ru.yandex.clickhouse.ClickHouseConnection; import ru.yandex.clickhouse.ClickHouseDataSource; import ru.yandex.clickhouse.ClickHousePreparedStatement; import ru.yandex.clickhouse.except.ClickHouseException; import ru.yandex.clickhouse.settings.ClickHouseProperties; import java.sql.SQLException; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.HashSet; import java.util.Random; import java.util.Set; import java.util.concurrent.*; /** * @package: mobvista.dmp.datasource.realtime.service * @author: wangjf * @date: 2019-10-24 * @time: 17:29 * @email: jinfeng.wang@mobvista.com * @phone: 152-1062-7698 */ public class RealTimeServiceMain { private static final String[] SET_VALUES = PropertyUtil.getProperty("config.properties", "http.private.host.server.ip").split(","); private static String driver = PropertyUtil.getProperty("config.properties", "datasource.clickhouse.driverClassName"); private static String url = PropertyUtil.getProperty("config.properties", "datasource.clickhouse.url"); private static String username = PropertyUtil.getProperty("config.properties", "datasource.clickhouse.username"); private static String password = PropertyUtil.getProperty("config.properties", "datasource.clickhouse.password"); private static String database = PropertyUtil.getProperty("config.properties", "datasource.clickhouse.database"); private static int timeout = Integer.parseInt(PropertyUtil.getProperty("config.properties", "datasource.clickhouse.timeout")); private static Set<AsoDevice> deviceInfoSet = new CopyOnWriteArraySet<>(); public static void main(String[] args) throws JoranException { LoggerContext context = (LoggerContext) LoggerFactory.getILoggerFactory(); JoranConfigurator configurator = new JoranConfigurator(); configurator.setContext(context); context.reset(); configurator.doConfigure(BaiChuanServer.class.getClassLoader().getResourceAsStream("logback-syslog.xml")); Logger logger = context.getLogger("realtime-service"); String date = ""; String hour = ""; // String region = ""; int expire = -12; if (args.length >= 2) { date = args[0]; hour = args[1]; // region = args[2]; } else { logger.info("Please Input date hour region"); System.exit(1); } long start = System.currentTimeMillis(); ClickHouseProperties properties = new ClickHouseProperties(); properties.setUser(username); properties.setPassword(password); properties.setDatabase(database); properties.setSocketTimeout(timeout); properties.setConnectionTimeout(timeout); ThreadFactory namedThreadFactory = new ThreadFactoryBuilder() .setNameFormat("realtime-service-%d").build(); ExecutorService realtimePool = new ThreadPoolExecutor(3, 5, 360L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy()); for (int ipId = 0; ipId < SET_VALUES.length; ipId++) { int finalIpId = ipId; String finalDate = date; String finalHour = hour; // String finalRegion = region; realtimePool.execute(() -> { String[] ips = SET_VALUES[finalIpId].split(":"); ClickHouseDataSource dataSource = new ClickHouseDataSource(url.replace("host", ips[new Random().nextInt(2)]), properties); ClickHouseConnection connection = null; ClickHousePreparedStatement preparedStatement = null; ClickHousePreparedStatement preparedStatementPart = null; SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-MM-dd"); SimpleDateFormat sdf2 = new SimpleDateFormat("yyyyMMdd"); try { try { connection = dataSource.getConnection(); } catch (ClickHouseException e) { Thread.sleep(500); System.exit(255); } assert connection != null; /* preparedStatementPart = (ClickHousePreparedStatement) connection.prepareStatement(getLastPart("dwh", "ods_user_info")); ResultSet resultSet = preparedStatementPart.executeQuery(); String partition = ""; while (resultSet.next()) { partition = resultSet.getString("partition"); } */ String partition = MySQLUtil.getLastPartition("dwh", "ods_user_info"); String dt = sdf1.format(sdf2.parse(partition)); Set<String> regionSet = new HashSet<>(); regionSet.add("cn"); regionSet.add("tokyo"); regionSet.add("virginia"); for (String region : regionSet) { String part = sdf2.format(sdf1.parse(finalDate)); preparedStatement = (ClickHousePreparedStatement) connection.prepareStatement(dropPartition(part, finalHour, region)); preparedStatement.execute(); Thread.sleep(1000); } String expireDate = DateUtil.getDayByString(finalDate, "yyyy-MM-dd", expire); System.out.println("SQL ===>>> " + buildSql(finalDate, finalHour, dt, expireDate)); preparedStatement = (ClickHousePreparedStatement) connection.prepareStatement(buildSql(finalDate, finalHour, dt, expireDate)); preparedStatement.execute(); } catch (SQLException | InterruptedException | ParseException e) { System.exit(255); e.printStackTrace(); } finally { if (connection != null) { try { connection.close(); } catch (SQLException e) { e.printStackTrace(); } } if (preparedStatement != null) { try { preparedStatement.close(); } catch (SQLException e) { e.printStackTrace(); } } if (preparedStatementPart != null) { try { preparedStatementPart.close(); } catch (SQLException e) { e.printStackTrace(); } } } }); } realtimePool.shutdown(); long end = System.currentTimeMillis(); logger.info("all runtime ==>> " + (end - start)); } private static String buildSql(String date, String hour, String dt, String expireDate) { // AND a.publish_date > '@expire_date' String insertSql = "INSERT INTO dwc.realtime_service_result (dt, hour, region, device_id, country, age, gender, install_apps, interest, frequency, flag) " + "SELECT b.dt, b.hour, arrayJoin(arrayPushBack(a.region, b.region)) region, b.device_id, UPPER(COALESCE(b.country,a.country)) country, if(b.age != 0,b.age,a.age) age, " + "if(b.gender != 0,b.gender,a.gender) gender," + "arrayDistinct(arrayConcat(a.install_apps,b.install_apps)) install_apps, arrayDistinct(arrayConcat(a.interest,b.interest)) interest, a.frequency, " + "if(hasAll(a.install_apps,b.install_apps) AND a.publish_date > '@expire_date', 0, 1) flag FROM (SELECT * FROM dwh.ods_user_info WHERE dt = '@partition') a ALL INNER JOIN (SELECT * FROM dwh.realtime_service_hour_all) b " + "USING device_id WHERE b.dt = '@date' AND b.hour = '@hour'"; // .replace("@region", region) return insertSql.replace("@date", date) .replace("@hour", hour) .replace("@partition", dt) .replace("@expire_date", expireDate); } private static String getLastPartitionFromMySQL(String database, String table) { String sql = "SELECT partition FROM system.parts WHERE database = '@database' AND table = '@table' GROUP BY partition ORDER BY partition DESC LIMIT 1"; return sql.replace("@database", database).replace("@table", table); } private static String getLastPart(String database, String table) { String sql = "SELECT partition FROM system.parts WHERE database = '@database' AND table = '@table' GROUP BY partition ORDER BY partition DESC LIMIT 1"; return sql.replace("@database", database).replace("@table", table); } private static String dropPartition(String part, String hour, String region) { String dropSql = "ALTER TABLE dwc.realtime_service_result DROP PARTITION (@part,'@hour','@region')"; return dropSql.replace("@part", part) .replace("@hour", hour) .replace("@region", region); } }