Datatory3SMain.java 9.17 KB
Newer Older
wang-jinfeng committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201
package mobvista.dmp.datasource.datatory;

import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.joran.JoranConfigurator;
import ch.qos.logback.core.joran.spi.JoranException;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import mobvista.dmp.common.ClickHouseJdbc;
import mobvista.dmp.datasource.baichuan.AsoDevice;
import mobvista.dmp.datasource.baichuan.BaiChuanServer;
import mobvista.dmp.util.DateUtil;
import mobvista.dmp.util.MySQLUtil;
import mobvista.dmp.util.PropertyUtil;
import org.slf4j.LoggerFactory;
import ru.yandex.clickhouse.ClickHouseConnection;
import ru.yandex.clickhouse.ClickHouseDataSource;
import ru.yandex.clickhouse.ClickHousePreparedStatement;
import ru.yandex.clickhouse.except.ClickHouseException;
import ru.yandex.clickhouse.settings.ClickHouseProperties;

import java.sql.SQLException;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.*;

/**
 * Command-line job that rebuilds one daily partition of the ClickHouse table
 * {@code dwh.sss_tracking_insight_daily}.
 *
 * <p>Usage: {@code Datatory3SMain <yyyyMMdd>}. For every entry of the
 * {@code http.private.server.ip} property (comma separated) one task is submitted
 * to a small thread pool. Each task obtains a connection through
 * {@link ClickHouseJdbc#connectionByTime}, drops the requested partition, waits,
 * and then re-inserts the rows for that day.
 *
 * <p>Exit codes: {@code 1} for a missing or malformed argument, {@code 255} when
 * any task fails.
 *
 * @package: mobvista.dmp.datasource.datatory
 * @author: wangjf
 * @date: 2019-08-31
 * @time: 07:49
 * @email: jinfeng.wang@mobvista.com
 * @phone: 152-1062-7698
 */
public class Datatory3SMain {
    /**
     * Comma-separated entries of {@code http.private.server.ip}; the index of each
     * entry is what gets passed to {@link ClickHouseJdbc#connectionByTime}.
     */
    private static final String[] SET_VALUES = PropertyUtil.getProperty("config.properties", "http.private.server.ip").split(",");

    /** How many times a task tries to obtain a connection before giving up. */
    private static final int CONNECT_ATTEMPTS = 3;

    /** Pause between two connection attempts, in milliseconds. */
    private static final long CONNECT_RETRY_DELAY_MS = 200L;

    /** Pause between the DROP PARTITION and the following INSERT, in milliseconds. */
    private static final long DROP_SETTLE_DELAY_MS = 120000L;

    /**
     * Entry point.
     *
     * @param args {@code args[0]} is the partition to rebuild, formatted {@code yyyyMMdd}
     * @throws JoranException if the logback configuration cannot be applied
     */
    public static void main(String[] args) throws JoranException {
        LoggerContext context = (LoggerContext) LoggerFactory.getILoggerFactory();
        JoranConfigurator configurator = new JoranConfigurator();
        configurator.setContext(context);
        context.reset();
        configurator.doConfigure(BaiChuanServer.class.getClassLoader().getResourceAsStream("logback-syslog.xml"));
        Logger logger = context.getLogger("Datatory");

        if (args.length < 1) {
            logger.info("Please Input Partition Date");
            System.exit(1);
            return;
        }
        String part = args[0];
        // The partition value is spliced into an ALTER TABLE statement, so accept
        // nothing but eight digits.
        if (!part.matches("\\d{8}")) {
            logger.error("Partition date must be formatted yyyyMMdd, got: " + part);
            System.exit(1);
            return;
        }
        String date = DateUtil.format(DateUtil.parse(part, "yyyyMMdd"), "yyyy-MM-dd");

        long start = System.currentTimeMillis();

        ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
                .setNameFormat("datatory-service-%d").build();
        ExecutorService datatoryPool = new ThreadPoolExecutor(3, 5,
                360L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());

        for (int ipId = 0; ipId < SET_VALUES.length; ipId++) {
            final int nodeId = ipId;
            datatoryPool.execute(() -> refreshNode(nodeId, part, date, logger));
        }

        // Stop accepting work and wait for the submitted tasks, so that the runtime
        // logged below covers the actual refresh and not just task submission.
        datatoryPool.shutdown();
        try {
            datatoryPool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.error("Interrupted while waiting for the refresh tasks", e);
            datatoryPool.shutdownNow();
        }

        long end = System.currentTimeMillis();
        logger.info("all runtime ==>> " + (end - start));
    }

    /**
     * Rebuilds the partition on one node: DROP PARTITION, wait, then INSERT.
     * Any failure is logged and terminates the JVM with exit code 255.
     *
     * @param nodeId index into {@link #SET_VALUES}, forwarded to the connection helper
     * @param part   partition to drop, {@code yyyyMMdd}
     * @param date   the same day formatted {@code yyyy-MM-dd}, used by the INSERT
     * @param logger logger to report progress and failures to
     */
    private static void refreshNode(int nodeId, String part, String date, Logger logger) {
        // try-with-resources closes each statement before the connection.
        try (ClickHouseConnection connection = connect(nodeId, logger)) {
            String dropSql = dropPartition(part);
            logger.info(dropSql);
            try (ClickHousePreparedStatement dropStatement =
                         (ClickHousePreparedStatement) connection.prepareStatement(dropSql)) {
                dropStatement.execute();
            }

            Thread.sleep(DROP_SETTLE_DELAY_MS);

            // Built once so that the statement logged is the statement executed.
            String insertSql = buildSql(date);
            logger.info(insertSql);
            try (ClickHousePreparedStatement insertStatement =
                         (ClickHousePreparedStatement) connection.prepareStatement(insertSql)) {
                insertStatement.execute();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.error("Refresh of node " + nodeId + " was interrupted", e);
            System.exit(255);
        } catch (SQLException | RuntimeException e) {
            // Task boundary: without this an unchecked failure would only kill the
            // pool thread and the process would still exit with 0.
            logger.error("Refresh of node " + nodeId + " failed", e);
            System.exit(255);
        }
    }

    /**
     * Obtains a connection for the given node, retrying a bounded number of times.
     *
     * @param nodeId index forwarded to {@link ClickHouseJdbc#connectionByTime}
     * @param logger logger used to report failed attempts
     * @return an open connection, never {@code null}
     * @throws SQLException         if no connection could be obtained
     * @throws InterruptedException if interrupted while pausing between attempts
     */
    private static ClickHouseConnection connect(int nodeId, Logger logger) throws SQLException, InterruptedException {
        SQLException lastFailure = null;
        for (int attempt = 1; attempt <= CONNECT_ATTEMPTS; attempt++) {
            try {
                ClickHouseConnection connection = ClickHouseJdbc.connectionByTime(nodeId);
                if (connection != null) {
                    return connection;
                }
            } catch (ClickHouseException e) {
                lastFailure = e;
                logger.warn("Connection attempt " + attempt + " to node " + nodeId + " failed", e);
            }
            if (attempt < CONNECT_ATTEMPTS) {
                Thread.sleep(CONNECT_RETRY_DELAY_MS);
            }
        }
        throw new SQLException("Could not connect to ClickHouse node " + nodeId
                + " after " + CONNECT_ATTEMPTS + " attempts", lastFailure);
    }

    /**
     * Builds the INSERT ... SELECT that fills {@code dwh.sss_tracking_insight_daily}
     * for one day by joining {@code dwh.ods_user_info} with
     * {@code dwh.sss_tracking_daily_all}.
     *
     * <p>The {@code ods_user_info} partition is looked up through
     * {@code MySQLUtil.getLastPartition("dwh", "ods_user_info")} on every call; the
     * returned value is parsed as {@code yyyyMMdd} (presumably the most recent
     * partition of that table - confirm against MySQLUtil).
     *
     * @param date day to load from {@code sss_tracking_daily_all}, {@code yyyy-MM-dd}
     * @return the complete SQL text
     */
    private static String buildSql(String date) {
        String partition = MySQLUtil.getLastPartition("dwh", "ods_user_info");
        String dt = DateUtil.format(DateUtil.parse(partition, "yyyyMMdd"), "yyyy-MM-dd");

        String insertSql = "INSERT INTO dwh.sss_tracking_insight_daily (device_id, device_model, os_version, country, city, age, gender, install_apps, interest, offer_id, event_name, event_type, log_type, dt) " +
                "SELECT device_id, COALESCE(b.device_model,a.device_model) device_model, COALESCE(b.os_version,a.os_version) os_version, COALESCE(b.country,a.country) country, b.city, a.age, a.gender, install_apps, interest, " +
                "offer_id, event_name, event_type, log_type, b.dt FROM (SELECT * FROM dwh.ods_user_info WHERE dt = '@dt' AND update_date = '@dt') a ALL INNER JOIN (SELECT * FROM dwh.sss_tracking_daily_all) b USING device_id WHERE b.dt = '@date'";

        // The template only contains the @date and @dt placeholders.
        return insertSql.replace("@date", date)
                .replace("@dt", dt);
    }

    /**
     * Builds the statement that drops one partition of
     * {@code dwh.sss_tracking_insight_daily}.
     *
     * @param part partition identifier, inserted verbatim into the statement
     * @return the complete SQL text
     */
    private static String dropPartition(String part) {
        String dropSql = "ALTER TABLE dwh.sss_tracking_insight_daily DROP PARTITION @part";

        return dropSql.replace("@part", part);
    }
}