RealTimeServiceMain.java 9.65 KB
package mobvista.dmp.datasource.realtime.service;

import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.joran.JoranConfigurator;
import ch.qos.logback.core.joran.spi.JoranException;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import mobvista.dmp.datasource.baichuan.AsoDevice;
import mobvista.dmp.datasource.baichuan.BaiChuanServer;
import mobvista.dmp.util.DateUtil;
import mobvista.dmp.util.MySQLUtil;
import mobvista.dmp.util.PropertyUtil;
import org.slf4j.LoggerFactory;
import ru.yandex.clickhouse.ClickHouseConnection;
import ru.yandex.clickhouse.ClickHouseDataSource;
import ru.yandex.clickhouse.ClickHousePreparedStatement;
import ru.yandex.clickhouse.except.ClickHouseException;
import ru.yandex.clickhouse.settings.ClickHouseProperties;

import java.sql.SQLException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.HashSet;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.*;

/**
 * Batch job that rebuilds {@code dwc.realtime_service_result} for one date/hour.
 *
 * <p>For every entry of the {@code http.private.host.server.ip} property (presumably one
 * entry per ClickHouse shard, each a ':'-separated list of replica hosts — confirm against
 * the deployment config), one task:
 * <ol>
 *   <li>connects to one randomly chosen host of that entry,</li>
 *   <li>drops the (date, hour, region) partitions of the result table for each region, and</li>
 *   <li>re-inserts the join of {@code dwh.realtime_service_hour_all} with the latest
 *       {@code dwh.ods_user_info} partition.</li>
 * </ol>
 * Any failure in a task logs the cause and terminates the JVM with exit code 255.
 *
 * @package: mobvista.dmp.datasource.realtime.service
 * @author: wangjf
 * @date: 2019-10-24
 * @time: 17:29
 * @email: jinfeng.wang@mobvista.com
 * @phone: 152-1062-7698
 */
public class RealTimeServiceMain {
    /** One entry per task; each entry is split on ':' into candidate hosts. */
    private static final String[] SET_VALUES = PropertyUtil.getProperty("config.properties", "http.private.host.server.ip").split(",");
    /** Read for completeness; the JDBC driver class is not referenced directly in this class. */
    private static final String DRIVER = PropertyUtil.getProperty("config.properties", "datasource.clickhouse.driverClassName");
    /** JDBC URL template; the literal token "host" is replaced with the chosen host. */
    private static final String URL = PropertyUtil.getProperty("config.properties", "datasource.clickhouse.url");
    private static final String USERNAME = PropertyUtil.getProperty("config.properties", "datasource.clickhouse.username");
    private static final String PASSWORD = PropertyUtil.getProperty("config.properties", "datasource.clickhouse.password");
    private static final String DATABASE = PropertyUtil.getProperty("config.properties", "datasource.clickhouse.database");
    private static final int TIMEOUT = Integer.parseInt(PropertyUtil.getProperty("config.properties", "datasource.clickhouse.timeout"));

    /** Day offset (relative to the run date) used for the {@code publish_date} freshness check. */
    private static final int EXPIRE_DAYS = -12;

    /** Regions whose (date, hour, region) partitions are dropped before re-inserting. */
    private static final String[] REGIONS = {"cn", "tokyo", "virginia"};

    /** Exit code used for any task failure. */
    private static final int FAILURE_EXIT_CODE = 255;

    /**
     * Entry point.
     *
     * @param args {@code args[0]} = date in {@code yyyy-MM-dd}, {@code args[1]} = hour
     * @throws JoranException if the logback configuration cannot be applied
     */
    public static void main(String[] args) throws JoranException {
        Logger logger = configureLogger();

        if (args.length < 2) {
            logger.error("Please Input date hour region");
            System.exit(1);
            return;
        }
        String date = args[0];
        String hour = args[1];

        long start = System.currentTimeMillis();
        ClickHouseProperties properties = buildClickHouseProperties();

        ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
                .setNameFormat("realtime-service-%d").build();
        ExecutorService realtimePool = new ThreadPoolExecutor(3, 5,
                360L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());

        for (String shard : SET_VALUES) {
            realtimePool.execute(() -> refreshShard(shard, date, hour, properties, logger));
        }
        realtimePool.shutdown();
        // Wait for the tasks so the runtime below covers the actual work, not just submission.
        try {
            realtimePool.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.error("Interrupted while waiting for realtime-service tasks", e);
        }
        long end = System.currentTimeMillis();
        logger.info("all runtime ==>> " + (end - start));
    }

    /** Resets logback and applies {@code logback-syslog.xml}; returns the job's logger. */
    private static Logger configureLogger() throws JoranException {
        LoggerContext context = (LoggerContext) LoggerFactory.getILoggerFactory();
        JoranConfigurator configurator = new JoranConfigurator();
        configurator.setContext(context);
        context.reset();
        configurator.doConfigure(BaiChuanServer.class.getClassLoader().getResourceAsStream("logback-syslog.xml"));
        return context.getLogger("realtime-service");
    }

    /** Builds ClickHouse connection properties from the static configuration values. */
    private static ClickHouseProperties buildClickHouseProperties() {
        ClickHouseProperties properties = new ClickHouseProperties();
        properties.setUser(USERNAME);
        properties.setPassword(PASSWORD);
        properties.setDatabase(DATABASE);
        properties.setSocketTimeout(TIMEOUT);
        properties.setConnectionTimeout(TIMEOUT);
        return properties;
    }

    /**
     * Drops and rebuilds the result partitions for one shard entry.
     *
     * <p>Runs on a pool thread; any failure logs and exits the JVM with 255.
     */
    private static void refreshShard(String shard, String date, String hour,
                                     ClickHouseProperties properties, Logger logger) {
        String[] replicas = shard.split(":");
        // Pick any listed host (previously hard-coded to the first two, which failed for one host).
        String host = replicas[ThreadLocalRandom.current().nextInt(replicas.length)];
        ClickHouseDataSource dataSource = new ClickHouseDataSource(URL.replace("host", host), properties);

        // SimpleDateFormat is not thread-safe, so each task creates its own instances.
        SimpleDateFormat dashed = new SimpleDateFormat("yyyy-MM-dd");
        SimpleDateFormat compact = new SimpleDateFormat("yyyyMMdd");

        try (ClickHouseConnection connection = openConnection(dataSource, host, logger)) {
            String partition = MySQLUtil.getLastPartition("dwh", "ods_user_info");
            if (partition == null || partition.isEmpty()) {
                throw new IllegalStateException("No partition found for dwh.ods_user_info");
            }
            String dt = dashed.format(compact.parse(partition));
            String part = compact.format(dashed.parse(date));

            for (String region : REGIONS) {
                execute(connection, dropPartition(part, hour, region));
                Thread.sleep(1000);
            }

            String expireDate = DateUtil.getDayByString(date, "yyyy-MM-dd", EXPIRE_DAYS);
            String insertSql = buildSql(date, hour, dt, expireDate);
            System.out.println("SQL ===>>> " + insertSql);
            execute(connection, insertSql);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            fail(logger, "Interrupted while refreshing shard " + shard, e);
        } catch (SQLException | ParseException | RuntimeException e) {
            // Task boundary: every failure (including unchecked ones) must fail the whole job.
            fail(logger, "Failed to refresh shard " + shard, e);
        }
    }

    /**
     * Opens a connection; on a {@link ClickHouseException} logs, waits briefly and exits with 255.
     * Other {@link SQLException}s propagate to the caller.
     */
    private static ClickHouseConnection openConnection(ClickHouseDataSource dataSource, String host,
                                                       Logger logger) throws SQLException {
        try {
            return dataSource.getConnection();
        } catch (ClickHouseException e) {
            logger.error("Cannot connect to ClickHouse host " + host, e);
            try {
                Thread.sleep(500);
            } catch (InterruptedException ignored) {
                // Exiting anyway; just preserve the interrupt status.
                Thread.currentThread().interrupt();
            }
            System.exit(FAILURE_EXIT_CODE);
            return null; // unreachable: System.exit does not return
        }
    }

    /** Prepares, executes and closes a single statement. */
    private static void execute(ClickHouseConnection connection, String sql) throws SQLException {
        try (ClickHousePreparedStatement statement = (ClickHousePreparedStatement) connection.prepareStatement(sql)) {
            statement.execute();
        }
    }

    /** Logs the failure with its cause, then terminates the JVM with exit code 255. */
    private static void fail(Logger logger, String message, Exception cause) {
        logger.error(message, cause);
        System.exit(FAILURE_EXIT_CODE);
    }

    /**
     * Builds the INSERT ... SELECT that fills {@code dwc.realtime_service_result}.
     *
     * <p>Values are substituted textually; they come from command-line args and internal
     * lookups, not from untrusted input.
     *
     * @param date       run date, {@code yyyy-MM-dd}
     * @param hour       run hour
     * @param dt         latest {@code dwh.ods_user_info} partition, {@code yyyy-MM-dd}
     * @param expireDate cutoff compared against {@code a.publish_date} for the flag column
     * @return the SQL text
     */
    private static String buildSql(String date, String hour, String dt, String expireDate) {
        String insertSql = "INSERT INTO dwc.realtime_service_result (dt, hour, region, device_id, country, age, gender, install_apps, interest, frequency, flag) " +
                "SELECT b.dt, b.hour, arrayJoin(arrayPushBack(a.region, b.region)) region, b.device_id, UPPER(COALESCE(b.country,a.country)) country, if(b.age != 0,b.age,a.age) age, " +
                "if(b.gender != 0,b.gender,a.gender) gender," +
                "arrayDistinct(arrayConcat(a.install_apps,b.install_apps)) install_apps, arrayDistinct(arrayConcat(a.interest,b.interest)) interest, a.frequency, " +
                "if(hasAll(a.install_apps,b.install_apps) AND a.publish_date > '@expire_date', 0, 1) flag FROM (SELECT * FROM dwh.ods_user_info WHERE dt = '@partition') a ALL INNER JOIN (SELECT * FROM dwh.realtime_service_hour_all) b " +
                "USING device_id WHERE b.dt = '@date' AND b.hour = '@hour'";

        return insertSql.replace("@date", date)
                .replace("@hour", hour)
                .replace("@partition", dt)
                .replace("@expire_date", expireDate);
    }

    /**
     * Builds a ClickHouse {@code system.parts} query for the newest partition of a table.
     * Not used at present (the job reads the partition via MySQLUtil); kept as an alternative lookup.
     */
    private static String getLastPart(String database, String table) {
        String sql = "SELECT partition FROM system.parts WHERE database = '@database' AND table = '@table' GROUP BY partition ORDER BY partition DESC LIMIT 1";
        return sql.replace("@database", database).replace("@table", table);
    }

    /**
     * Builds the DROP PARTITION statement for one (date, hour, region) partition of the result table.
     *
     * @param part   date in {@code yyyyMMdd}, inserted unquoted
     * @param hour   hour, inserted quoted
     * @param region region name, inserted quoted
     * @return the SQL text
     */
    private static String dropPartition(String part, String hour, String region) {
        String dropSql = "ALTER TABLE dwc.realtime_service_result DROP PARTITION (@part,'@hour','@region')";

        return dropSql.replace("@part", part)
                .replace("@hour", hour)
                .replace("@region", region);
    }
}