DatatoryAdnMain.java 8.17 KB
Newer Older
wang-jinfeng committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75
package mobvista.dmp.datasource.datatory;

import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.joran.JoranConfigurator;
import ch.qos.logback.core.joran.spi.JoranException;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import mobvista.dmp.common.ClickHouseJdbc;
import mobvista.dmp.datasource.baichuan.AsoDevice;
import mobvista.dmp.datasource.baichuan.BaiChuanServer;
import mobvista.dmp.util.DateUtil;
import mobvista.dmp.util.MySQLUtil;
import mobvista.dmp.util.PropertyUtil;
import org.slf4j.LoggerFactory;
import ru.yandex.clickhouse.ClickHouseConnection;
import ru.yandex.clickhouse.ClickHouseDataSource;
import ru.yandex.clickhouse.ClickHousePreparedStatement;
import ru.yandex.clickhouse.except.ClickHouseException;
import ru.yandex.clickhouse.settings.ClickHouseProperties;

import java.sql.SQLException;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.*;

/**
 * @package: mobvista.dmp.datasource.datatory
 * @author: wangjf
 * @date: 2019-08-31
 * @time: 07:49
 * @email: jinfeng.wang@mobvista.com
 * @phone: 152-1062-7698
 */
public class DatatoryAdnMain {
    /** Comma-separated entries of {@code http.private.server.ip} from config.properties; main submits one insert task per entry. */
    private static final String[] SET_VALUES = PropertyUtil.getProperty("config.properties", "http.private.server.ip").split(",");
    /** Value of {@code datasource.clickhouse.driverClassName} from config.properties. */
    private static String driver = PropertyUtil.getProperty("config.properties", "datasource.clickhouse.driverClassName");
    /** Value of {@code datasource.clickhouse.url} from config.properties. */
    private static String url = PropertyUtil.getProperty("config.properties", "datasource.clickhouse.url");
    /** Value of {@code datasource.clickhouse.username} from config.properties. */
    private static String username = PropertyUtil.getProperty("config.properties", "datasource.clickhouse.username");
    /** Value of {@code datasource.clickhouse.password} from config.properties. */
    private static String password = PropertyUtil.getProperty("config.properties", "datasource.clickhouse.password");
    /** Value of {@code datasource.clickhouse.database} from config.properties. */
    private static String database = PropertyUtil.getProperty("config.properties", "datasource.clickhouse.database");
    /** Value of {@code datasource.clickhouse.timeout} from config.properties, parsed as an int (unit not visible here — TODO confirm). */
    private static int timeout = Integer.parseInt(PropertyUtil.getProperty("config.properties", "datasource.clickhouse.timeout"));

    // NOTE(review): no method of this class reads or writes this set.
    private static Set<AsoDevice> deviceInfoSet = new CopyOnWriteArraySet<>();

    public static void main(String[] args) throws JoranException {
        LoggerContext context = (LoggerContext) LoggerFactory.getILoggerFactory();
        JoranConfigurator configurator = new JoranConfigurator();
        configurator.setContext(context);
        context.reset();
        configurator.doConfigure(BaiChuanServer.class.getClassLoader().getResourceAsStream("logback-syslog.xml"));
        Logger logger = context.getLogger("Datatory.Adn");

        String date = "";
        String part = "";
        if (args.length >= 1) {
            part = args[0];
            date = DateUtil.format(DateUtil.parse(part, "yyyyMMdd"), "yyyy-MM-dd");
        } else {
            logger.info("Please Input Partition Date");
            System.exit(1);
        }
        long start = System.currentTimeMillis();
        ClickHouseProperties properties = new ClickHouseProperties();
        properties.setUser(username);
        properties.setPassword(password);
        properties.setDatabase(database);
        properties.setSocketTimeout(timeout);
        properties.setConnectionTimeout(timeout);

        ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
                .setNameFormat(" datatory-service-%d").build();

        ExecutorService datatoryPool = new ThreadPoolExecutor(3, 5,
                360L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
WangJinfeng committed
76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104

        int randomIp = new Random().nextInt(3);
        ClickHousePreparedStatement preparedStatementDrop = null;
        logger.info(dropPartition(part));
        ClickHouseConnection dropConnection = null;
        try {
            dropConnection = ClickHouseJdbc.connectionByTime(randomIp);
        } catch (SQLException | InterruptedException e) {
            try {
                dropConnection = ClickHouseJdbc.connectionByTime(randomIp);
            } catch (SQLException | InterruptedException ex) {
                logger.info("ClickHouse Connection Failure!");
            }
        }
        try {
            preparedStatementDrop = (ClickHousePreparedStatement) dropConnection.prepareStatement(dropPartition(part));
            preparedStatementDrop.execute();
            Thread.sleep(120000);
        } catch (SQLException | InterruptedException e) {
            e.printStackTrace();
        } finally {
            try {
                preparedStatementDrop.close();
                dropConnection.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }

wang-jinfeng committed
105 106 107 108 109 110 111 112 113 114 115 116 117 118 119
        for (int ipId = 0; ipId < SET_VALUES.length; ipId++) {
            int finalIpId = ipId;
            String finalDate = date;
            datatoryPool.execute(() -> {
                String[] ips = SET_VALUES[finalIpId].split(":"); // ips[new Random().nextInt(2)]
                ClickHouseDataSource dataSource = new ClickHouseDataSource(url.replace("host", ips[new Random().nextInt(2)]), properties);
                ClickHouseConnection connection = null;
                ClickHousePreparedStatement preparedStatement = null;
                try {
                    try {
                        connection = ClickHouseJdbc.connectionByTime(finalIpId);
                    } catch (ClickHouseException e) {
                        Thread.sleep(200);
                    }
                    assert connection != null;
WangJinfeng committed
120

wang-jinfeng committed
121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154
                    logger.info(buildSql(finalDate));
                    preparedStatement = (ClickHousePreparedStatement) connection.prepareStatement(buildSql(finalDate));
                    preparedStatement.execute();
                } catch (SQLException | InterruptedException e) {
                    e.printStackTrace();
                    System.exit(255);
                } finally {
                    if (connection != null) {
                        try {
                            connection.close();
                        } catch (SQLException e) {
                            e.printStackTrace();
                        }
                        try {
                            Thread.sleep(200);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    if (preparedStatement != null) {
                        try {
                            preparedStatement.close();
                        } catch (SQLException e) {
                            e.printStackTrace();
                        }
                        try {
                            Thread.sleep(200);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
        }
WangJinfeng committed
155

wang-jinfeng committed
156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174
        long end = System.currentTimeMillis();
        logger.info("all runtime ==>> " + (end - start));
        datatoryPool.shutdown();
    }

    /**
     * Produces the INSERT ... SELECT statement that fills
     * {@code dwh.adn_tracking_insight_daily} for one tracking day.
     *
     * <p>The statement joins {@code dwh.ods_user_info}, restricted to the latest partition
     * reported by {@code MySQLUtil.getLastPartition("dwh", "ods_user_info")}, with
     * {@code dwh.adn_tracking_daily_all} on {@code device_id}, keeping rows whose
     * {@code b.dt} equals {@code date}.
     *
     * @param date value substituted for the {@code @date} placeholder
     * @return the statement with all placeholders substituted
     */
    private static String buildSql(String date) {
        final String template =
                "INSERT INTO dwh.adn_tracking_insight_daily(device_id, device_model, os_version, country, city, age, gender, "
                        + "install_apps, interest, campaign_id, event_name, event_type, app_id, log_type,dt) "
                        + "SELECT device_id,COALESCE(b.device_model,a.device_model) device_model,"
                        + "COALESCE(b.os_version,a.os_version) os_version,COALESCE(b.country,a.country) country,"
                        + "b.city,a.age,a.gender,install_apps,interest, "
                        + "campaign_id, event_name, event_type, app_id, log_type,b.dt "
                        + "FROM (SELECT * FROM dwh.ods_user_info WHERE dt = '@dt' AND update_date = '@dt') a "
                        + "ALL INNER JOIN (SELECT * FROM dwh.adn_tracking_daily_all) b USING device_id "
                        + "WHERE b.dt = '@date'";

        // The latest user-info partition arrives as yyyyMMdd; the query needs yyyy-MM-dd.
        String latestPartition = MySQLUtil.getLastPartition("dwh", "ods_user_info");
        String userInfoDay = DateUtil.format(DateUtil.parse(latestPartition, "yyyyMMdd"), "yyyy-MM-dd");

        String withTrackingDay = template.replace("@date", date);
        String withUserInfoDay = withTrackingDay.replace("@dt", userInfoDay);
        // The template contains no @update_date placeholder, so this substitution changes nothing.
        return withUserInfoDay.replace("@update_date", userInfoDay);
    }

    private static String dropPartition(String part) {
WangJinfeng committed
175
        String dropSql = "ALTER TABLE dwh.adn_tracking_insight_daily ON CLUSTER cluster_1st DROP PARTITION @part";
wang-jinfeng committed
176 177 178 179

        return dropSql.replace("@part", part);
    }
}