package mobvista.dmp.datasource.baichuan;

import com.google.common.util.concurrent.ListenableFuture;
import ru.yandex.clickhouse.except.ClickHouseUnknownException;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;

/**
 * @package: mobvista.dmp.datasource.baichuan
 * @author: wangjf
 * @date: 2019-08-26
 * @time: 15:48
 * @email: jinfeng.wang@mobvista.com
 * @phone: 152-1062-7698
 */
public class ClickHouseUtils {

    /** Pause, in milliseconds, before the single retry that follows a {@link ClickHouseUnknownException}. */
    private static final long RETRY_DELAY_MILLIS = 200L;

    /**
     * Insert statement shared by all {@code insert} variants.
     * Target table layout, as recorded by the DDL that used to be kept here as commented-out code:
     * {@code dwh.baichuan_install_daily (dt Date, device_id String, device_type String, platform String,
     * package_name String)}, MergeTree partitioned by {@code toYYYYMMDD(dt)}, ordered by {@code (dt, device_id)}.
     */
    private static final String INSERT_INSTALL_SQL =
            "INSERT INTO dwh.baichuan_install_daily (dt, device_id, device_type, platform, package_name) VALUES (?, ?, ?, ?, ?)";

    /**
     * Reads one page of untagged ({@code tag = 0}) device IDs from {@code etl_baichuan_daily_all}.
     * <p>
     * {@code dt}, {@code appId} and {@code appOs} are substituted into the SQL text verbatim, so they must come
     * from a trusted source. The query is retried once, after a short pause, when the driver reports a
     * {@link ClickHouseUnknownException}.
     * <p>
     * The {@link Statement} behind the returned {@link ResultSet} is left open so the result can be read; the
     * caller is responsible for closing the result set (and its statement, e.g. via
     * {@code resultSet.getStatement()}).
     *
     * @param connection open ClickHouse connection
     * @param dt         partition date, inserted between single quotes
     * @param appId      application ID: 1 = tmail, 2 = taobao
     * @param appOs      device ID type: 1 = android, 2 = ios
     * @param offSet     row offset of the page
     * @param size       number of devices per page
     * @return result set with a single {@code device_id} column
     * @throws SQLException         if the query fails (after the retry, for unknown ClickHouse errors)
     * @throws InterruptedException if the thread is interrupted while waiting to retry
     */
    public static ResultSet query(Connection connection, String dt, String appId, String appOs, int offSet, int size) throws SQLException, InterruptedException {
        String sql = "SELECT device_id FROM etl_baichuan_daily_all WHERE dt = '@dt' AND app_id = @appId AND app_os = @appOs AND tag = 0 LIMIT @offSet, @size";
        sql = sql.replace("@dt", dt)
                .replace("@appId", appId)
                .replace("@appOs", appOs)
                .replace("@offSet", String.valueOf(offSet))
                .replace("@size", String.valueOf(size));
        return queryWithRetry(connection, sql);
    }

    /**
     * Same paging query as {@link #query}, but against {@code dwh.etl_baichuan_daily_all} and returning
     * {@code device_id} together with {@code app_os}.
     * <p>
     * Values are substituted into the SQL text verbatim and must be trusted. The caller owns the returned
     * {@link ResultSet} and the statement behind it.
     *
     * @param connection open ClickHouse connection
     * @param dt         partition date, inserted between single quotes
     * @param appId      application ID: 1 = tmail, 2 = taobao
     * @param appOs      device ID type: 1 = android, 2 = ios
     * @param offSet     row offset of the page
     * @param size       number of devices per page
     * @return result set with columns {@code device_id, app_os}
     * @throws SQLException         if the query fails (after the retry, for unknown ClickHouse errors)
     * @throws InterruptedException if the thread is interrupted while waiting to retry
     */
    public static ResultSet queryAli(Connection connection, String dt, String appId, String appOs, int offSet, int size) throws SQLException, InterruptedException {
        String sql = "SELECT device_id, app_os FROM dwh.etl_baichuan_daily_all WHERE dt ='@dt' AND app_id = @appId AND app_os = @appOs AND tag = 0 LIMIT @offSet, @size";
        sql = sql.replace("@dt", dt)
                .replace("@appId", appId)
                .replace("@appOs", appOs)
                .replace("@offSet", String.valueOf(offSet))
                .replace("@size", String.valueOf(size));
        return queryWithRetry(connection, sql);
    }

    /**
     * Reads one page of device IDs from {@code baichuan_exclude_ios_daily_all}; every row carries the constant
     * {@code 2} as {@code app_os}.
     * <p>
     * {@code dt} is substituted into the SQL text verbatim and must be trusted. The caller owns the returned
     * {@link ResultSet} and the statement behind it.
     *
     * @param connection open ClickHouse connection
     * @param dt         partition date, inserted between single quotes
     * @param offSet     row offset of the page
     * @param size       number of devices per page
     * @return result set with columns {@code device_id, app_os}
     * @throws SQLException         if the query fails (after the retry, for unknown ClickHouse errors)
     * @throws InterruptedException if the thread is interrupted while waiting to retry
     */
    public static ResultSet queryExcludeAli(Connection connection, String dt, int offSet, int size) throws SQLException, InterruptedException {
        String sql = "SELECT device_id, 2 app_os FROM baichuan_exclude_ios_daily_all WHERE dt ='@dt' LIMIT @offSet, @size";
        sql = sql.replace("@dt", dt)
                .replace("@offSet", String.valueOf(offSet))
                .replace("@size", String.valueOf(size));
        return queryWithRetry(connection, sql);
    }

    /**
     * Batch-inserts the given devices into {@code dwh.baichuan_install_daily}.
     * <p>
     * If building or sending the batch fails with a {@link ClickHouseUnknownException} or a
     * {@link NullPointerException} (for example a {@code null} element), the method pauses briefly and calls
     * {@code executeBatch()} once more, which sends whatever rows the statement still holds. A failure to
     * prepare the statement itself is propagated unchanged.
     *
     * @param connection   open ClickHouse connection
     * @param asoDeviceSet devices for which Baichuan returned {@code is_new_device = false}
     * @throws SQLException                    if preparing, sending or closing the statement fails
     * @throws InterruptedException            if the thread is interrupted while waiting to retry
     * @throws ConcurrentModificationException if the set is modified while it is being iterated
     */
    public static void insert(Connection connection, AtomicReference<Set<AsoDevice>> asoDeviceSet) throws SQLException, InterruptedException, ConcurrentModificationException {
        try (PreparedStatement preparedStatement = connection.prepareStatement(INSERT_INSTALL_SQL)) {
            try {
                for (AsoDevice deviceInfo : asoDeviceSet.get()) {
                    addToBatch(preparedStatement, deviceInfo);
                }
                preparedStatement.executeBatch();
            } catch (ClickHouseUnknownException | NullPointerException e) {
                // Best effort: wait, then flush the rows batched so far.
                Thread.sleep(RETRY_DELAY_MILLIS);
                preparedStatement.executeBatch();
            }
        }
    }

    /**
     * Batch-inserts the devices produced by the given futures into {@code dwh.baichuan_install_daily}.
     * <p>
     * A future that failed, or that yields {@code null}, is skipped. Sending the batch is retried once after a
     * short pause on {@link ClickHouseUnknownException} or {@link NullPointerException}. A failure to prepare
     * the statement itself is propagated unchanged.
     *
     * @param connection open ClickHouse connection
     * @param futures    pending device lookups; each is awaited in order
     * @throws SQLException                    if preparing, sending or closing the statement fails
     * @throws InterruptedException            if the thread is interrupted while awaiting a future or the retry
     * @throws ConcurrentModificationException if the list is modified while it is being iterated
     */
    public static void insert2(Connection connection, List<ListenableFuture<AsoDevice>> futures) throws SQLException, InterruptedException, ConcurrentModificationException {
        try (PreparedStatement preparedStatement = connection.prepareStatement(INSERT_INSTALL_SQL)) {
            try {
                for (ListenableFuture<AsoDevice> future : futures) {
                    try {
                        addToBatch(preparedStatement, future.get());
                    } catch (ExecutionException | NullPointerException ignored) {
                        // Deliberately skipped: a failed or empty lookup must not abort the whole batch.
                    }
                }
                preparedStatement.executeBatch();
            } catch (ClickHouseUnknownException | NullPointerException e) {
                // Best effort: wait, then flush the rows batched so far.
                Thread.sleep(RETRY_DELAY_MILLIS);
                preparedStatement.executeBatch();
            }
        }
    }

    /**
     * Adds one batch entry per element of {@code deviceSet} to the install-table insert and executes the batch,
     * retrying once after a short pause on {@link ClickHouseUnknownException} or {@link NullPointerException}.
     * <p>
     * NOTE(review): no statement parameters are bound before {@code addBatch()} - the binding code was already
     * disabled in the original, and a plain {@code String} does not carry the five column values. A JDBC driver
     * will most likely reject such a batch for a non-empty set; confirm whether this overload is still used.
     * {@code isNewDevice} is currently not read.
     *
     * @param connection  open ClickHouse connection
     * @param deviceSet   device identifiers; only the element count is used
     * @param isNewDevice unused
     * @throws SQLException         if preparing, sending or closing the statement fails
     * @throws InterruptedException if the thread is interrupted while waiting to retry
     */
    public static void insert(Connection connection, Set<String> deviceSet, boolean isNewDevice) throws SQLException, InterruptedException {
        try (PreparedStatement preparedStatement = connection.prepareStatement(INSERT_INSTALL_SQL)) {
            try {
                for (String deviceInfo : deviceSet) {
                    preparedStatement.addBatch();
                }
                preparedStatement.executeBatch();
            } catch (ClickHouseUnknownException | NullPointerException e) {
                Thread.sleep(RETRY_DELAY_MILLIS);
                preparedStatement.executeBatch();
            }
        }
    }

    /**
     * Tags the given devices in {@code dwh.etl_baichuan_daily} via an {@code ALTER TABLE ... UPDATE} statement.
     * The {@code tag} column marks a device as 0 = unidentified, 1 = new user, 2 = existing user.
     * <p>
     * A {@code null} or empty set is a no-op: there is nothing to put into the {@code IN (...)} list, and the
     * statement must never be sent without its {@code WHERE} clause. Device IDs are escaped before being placed
     * into the SQL text. The statement is retried once, after a short pause, on
     * {@link ClickHouseUnknownException}.
     *
     * @param connection  open ClickHouse connection
     * @param list        device IDs to tag
     * @param isNewDevice {@code true} to set {@code tag = 1} (new user), {@code false} to set {@code tag = 2}
     * @throws SQLException         if the statement fails (after the retry, for unknown ClickHouse errors)
     * @throws InterruptedException if the thread is interrupted while waiting to retry
     */
    public static void update(Connection connection, Set<String> list, boolean isNewDevice) throws SQLException, InterruptedException {
        if (list == null || list.isEmpty()) {
            return;
        }
        // Build the complete statement up front so that the retry can never run a half-built one.
        StringBuilder updateSql = new StringBuilder("ALTER TABLE dwh.etl_baichuan_daily ");
        updateSql.append(isNewDevice ? "UPDATE tag = 1" : "UPDATE tag = 2");
        updateSql.append(" WHERE device_id IN (");
        boolean first = true;
        for (String device : list) {
            if (!first) {
                updateSql.append(',');
            }
            updateSql.append('\'').append(escapeStringLiteral(String.valueOf(device))).append('\'');
            first = false;
        }
        updateSql.append(")");
        String sql = updateSql.toString();

        try {
            executeAndClose(connection, sql);
        } catch (ClickHouseUnknownException e) {
            Thread.sleep(RETRY_DELAY_MILLIS);
            executeAndClose(connection, sql);
        }
    }

    /** Runs {@code sql} as a query, retrying once after a short pause on {@link ClickHouseUnknownException}. */
    private static ResultSet queryWithRetry(Connection connection, String sql) throws SQLException, InterruptedException {
        try {
            return openQuery(connection, sql);
        } catch (ClickHouseUnknownException e) {
            Thread.sleep(RETRY_DELAY_MILLIS);
            return openQuery(connection, sql);
        }
    }

    /** Executes {@code sql} on a fresh statement; the statement is closed only if the query fails. */
    private static ResultSet openQuery(Connection connection, String sql) throws SQLException {
        Statement statement = connection.createStatement();
        try {
            return statement.executeQuery(sql);
        } catch (SQLException | RuntimeException e) {
            // The result set was never handed out, so nobody else can release this statement.
            try {
                statement.close();
            } catch (SQLException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
    }

    /** Executes {@code sql} on a fresh statement and always closes that statement. */
    private static void executeAndClose(Connection connection, String sql) throws SQLException {
        try (Statement statement = connection.createStatement()) {
            statement.execute(sql);
        }
    }

    /** Binds the five insert columns from {@code device} and queues the row on {@code statement}. */
    private static void addToBatch(PreparedStatement statement, AsoDevice device) throws SQLException {
        statement.setString(1, device.getDt());
        statement.setString(2, device.getDeviceId());
        statement.setString(3, device.getDeviceType());
        statement.setString(4, device.getPlatform());
        statement.setString(5, device.getPackageName());
        statement.addBatch();
    }

    /** Backslash-escapes backslashes and single quotes so {@code value} is safe inside a quoted SQL literal. */
    private static String escapeStringLiteral(String value) {
        return value.replace("\\", "\\\\").replace("'", "\\'");
    }

}