package mobvista.dmp.datasource.baichuan; import com.google.common.util.concurrent.ListenableFuture; import ru.yandex.clickhouse.except.ClickHouseUnknownException; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.util.ConcurrentModificationException; import java.util.List; import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicReference; /** * @package: mobvista.dmp.datasource.baichuan * @author: wangjf * @date: 2019-08-26 * @time: 15:48 * @email: jinfeng.wang@mobvista.com * @phone: 152-1062-7698 */ public class ClickHouseUtils { /** * @param appId 应用ID,1-tmail,2-taobao * @param appOs 设备ID类型,1-android,2-ios * @param offSet 偏移量 * @param size 每页查询设备量 * @return * @throws SQLException */ public static ResultSet query(Connection connection, String dt, String appId, String appOs, int offSet, int size) throws SQLException, InterruptedException { ResultSet resultSet; String sql = "SELECT device_id FROM etl_baichuan_daily_all WHERE dt = '@dt' AND app_id = @appId AND app_os = @appOs AND tag = 0 LIMIT @offSet, @size"; sql = sql.replace("@dt", dt) .replace("@appId", appId) .replace("@appOs", appOs).replace("@offSet", String.valueOf(offSet)).replace("@size", String.valueOf(size)); try { resultSet = connection.createStatement().executeQuery(sql); return resultSet; } catch (ClickHouseUnknownException e) { Thread.sleep(200); resultSet = connection.createStatement().executeQuery(sql); return resultSet; } } public static ResultSet queryAli(Connection connection, String dt, String appId, String appOs, int offSet, int size) throws SQLException, InterruptedException { ResultSet resultSet; String sql = "SELECT device_id, app_os FROM dwh.etl_baichuan_daily_all WHERE dt ='@dt' AND app_id = @appId AND app_os = @appOs AND tag = 0 LIMIT @offSet, @size"; sql = sql.replace("@dt", dt).replace("@appId", appId) .replace("@appOs", appOs).replace("@offSet", String.valueOf(offSet)) .replace("@size", String.valueOf(size)); try { resultSet = connection.createStatement().executeQuery(sql); return resultSet; } catch (ClickHouseUnknownException e) { Thread.sleep(200); resultSet = connection.createStatement().executeQuery(sql); return resultSet; } } public static ResultSet queryExcludeAli(Connection connection, String dt, int offSet, int size) throws SQLException, InterruptedException { ResultSet resultSet; String sql = "SELECT device_id, 2 app_os FROM baichuan_exclude_ios_daily_all WHERE dt ='@dt' LIMIT @offSet, @size"; sql = sql.replace("@dt", dt) .replace("@offSet", String.valueOf(offSet)).replace("@size", String.valueOf(size)); try { resultSet = connection.createStatement().executeQuery(sql); return resultSet; } catch (ClickHouseUnknownException e) { Thread.sleep(200); resultSet = connection.createStatement().executeQuery(sql); return resultSet; } } /** * @param asoDeviceSet 百川返回 is_new_device 为 false 的设备列表 * @throws SQLException */ public static void insert(Connection connection, AtomicReference<Set<AsoDevice>> asoDeviceSet) throws SQLException, InterruptedException, ConcurrentModificationException { PreparedStatement preparedStatement = null; try { /* String createSql = "CREATE TABLE IF NOT EXISTS dwh.baichuan_install_daily (dt Date, device_id String, device_type String, platform String, package_name String) ENGINE = MergeTree()" + " PARTITION BY toYYYYMMDD(dt) ORDER BY (dt, device_id) SETTINGS index_granularity = 8192"; connection.createStatement().execute(createSql); String createAllSql = "CREATE TABLE IF NOT EXISTS dwh.baichuan_install_daily_all (dt Date, device_id String, device_type String, platform String, package_name String) " + "ENGINE = Distributed(cluster_1st,dwh,baichuan_install_daily,rand())"; connection.createStatement().execute(createAllSql); */ preparedStatement = connection.prepareStatement("INSERT INTO dwh.baichuan_install_daily (dt, device_id, device_type, platform, package_name) VALUES (?, ?, ?, ?, ?)"); for (AsoDevice deviceInfo : asoDeviceSet.get()) { preparedStatement.setString(1, deviceInfo.getDt()); preparedStatement.setString(2, deviceInfo.getDeviceId()); preparedStatement.setString(3, deviceInfo.getDeviceType()); preparedStatement.setString(4, deviceInfo.getPlatform()); preparedStatement.setString(5, deviceInfo.getPackageName()); preparedStatement.addBatch(); } preparedStatement.executeBatch(); } catch (ClickHouseUnknownException | NullPointerException e) { Thread.sleep(200); assert preparedStatement != null; preparedStatement.executeBatch(); } finally { if (preparedStatement != null) { preparedStatement.close(); } } } /** * @throws SQLException */ public static void insert2(Connection connection, List<ListenableFuture<AsoDevice>> futures) throws SQLException, InterruptedException, ConcurrentModificationException { PreparedStatement preparedStatement = null; try { preparedStatement = connection.prepareStatement("INSERT INTO dwh.baichuan_install_daily (dt, device_id, device_type, platform, package_name) VALUES (?, ?, ?, ?, ?)"); for (ListenableFuture<AsoDevice> future : futures) { try { AsoDevice iQiYiClass = future.get(); preparedStatement.setString(1, iQiYiClass.getDt()); preparedStatement.setString(2, iQiYiClass.getDeviceId()); preparedStatement.setString(3, iQiYiClass.getDeviceType()); preparedStatement.setString(4, iQiYiClass.getPlatform()); preparedStatement.setString(5, iQiYiClass.getPackageName()); preparedStatement.addBatch(); } catch (ExecutionException | NullPointerException e) { // NO } } preparedStatement.executeBatch(); } catch (ClickHouseUnknownException | NullPointerException e) { Thread.sleep(200); assert preparedStatement != null; preparedStatement.executeBatch(); } finally { if (preparedStatement != null) { preparedStatement.close(); } } } public static void insert(Connection connection, Set<String> deviceSet, boolean isNewDevice) throws SQLException, InterruptedException { PreparedStatement preparedStatement = null; try { /* String createSql = "CREATE TABLE IF NOT EXISTS dwh.baichuan_install_daily (dt Date, device_id String, device_type String, platform String, package_name String) ENGINE = MergeTree()" + " PARTITION BY toYYYYMMDD(dt) ORDER BY (dt, device_id) SETTINGS index_granularity = 8192"; connection.createStatement().execute(createSql); String createAllSql = "CREATE TABLE IF NOT EXISTS dwh.baichuan_install_daily_all (dt Date, device_id String, device_type String, platform String, package_name String) " + "ENGINE = Distributed(cluster_1st,dwh,baichuan_install_daily,rand())"; connection.createStatement().execute(createAllSql); */ preparedStatement = connection.prepareStatement("INSERT INTO dwh.baichuan_install_daily (dt, device_id, device_type, platform, package_name) VALUES (?, ?, ?, ?, ?)"); for (String deviceInfo : deviceSet) { // preparedStatement.setString(1, deviceInfo.getDt()); // preparedStatement.setString(2, deviceInfo.getDeviceId()); // preparedStatement.setString(3, deviceInfo.getDeviceType()); // preparedStatement.setString(4, deviceInfo.getPlatform()); // preparedStatement.setString(5, deviceInfo.getPackageName()); preparedStatement.addBatch(); } preparedStatement.executeBatch(); } catch (ClickHouseUnknownException | NullPointerException e) { Thread.sleep(200); assert preparedStatement != null; preparedStatement.executeBatch(); } finally { if (preparedStatement != null) { preparedStatement.close(); } } } /** * @param list 设备列表 * @param isNewDevice 是否是新用户 * @throws SQLException */ public static void update(Connection connection, Set<String> list, boolean isNewDevice) throws SQLException, InterruptedException { StringBuilder updateSql = new StringBuilder(); try { updateSql.append("ALTER TABLE dwh.etl_baichuan_daily "); /** * @parm tag 用户标注该设备是 0-未识别,1-新用户,2-老用户 */ if (isNewDevice) { updateSql.append("UPDATE tag = 1"); } else { updateSql.append("UPDATE tag = 2"); } StringBuilder inSql = new StringBuilder(); for (String device : list) { inSql.append("'").append(device).append("',"); } updateSql.append(" WHERE device_id IN (").append(inSql, 0, inSql.length() - 1).append(")"); connection.createStatement().execute(String.valueOf(updateSql)); } catch (ClickHouseUnknownException | NullPointerException e) { Thread.sleep(200); assert connection != null; connection.createStatement().execute(String.valueOf(updateSql)); } } }