// Estimate.java 5.73 KB
package mobvista.dmp.datasource.rtdmp;

import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.joran.JoranConfigurator;
import ch.qos.logback.core.joran.spi.JoranException;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import mobvista.dmp.common.ClickHouseJdbc;
import mobvista.dmp.datasource.iqiyi.IQiYiMain;
import mobvista.dmp.util.DateUtil;
import org.apache.commons.lang.StringUtils;
import org.slf4j.LoggerFactory;
import ru.yandex.clickhouse.ClickHouseConnection;
import scala.Tuple4;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Date;
import java.util.Map;
import java.util.Objects;

/**
 * Command-line job that estimates the size of audience packages.
 *
 * <p>{@link #main(String[])} asks {@code ServerUtil} for the audiences in a time window, runs one
 * ClickHouse {@code COUNT} query per audience against the newest partition of
 * {@code dwh.audience_merge}, and sends the resulting counts back through
 * {@code ServerUtil.update}.
 *
 * @package: mobvista.dmp.datasource.rtdmp
 * @author: wangjf
 * @date: 2020/9/15
 * @time: 3:04 PM
 * @email: jinfeng.wang@mobvista.com
 * @phone: 152-1062-7698
 */
public class Estimate {

    /** Audience type whose size is computed from its rule object instead of its own id. */
    private static final int AUDIENCE_TYPE_WITH_RULES = 3;

    /** Message used both for the log line and the exception raised when no connection is available. */
    private static final String CONNECTION_FAILURE = "ClickHouse Connection Failure!";

    /** Logger configured by {@link #main(String[])}; it is {@code null} until main has run. */
    public static Logger logger = null;

    /**
     * Entry point.
     *
     * @param args optional {@code [start, end]} date-time strings; when fewer than two arguments
     *             are given the window defaults to the current hour
     *             ({@code yyyy-MM-dd HH:00:00} to {@code yyyy-MM-dd HH:59:59})
     * @throws JoranException if the logback configuration cannot be applied
     * @throws IllegalStateException if no ClickHouse connection can be obtained
     */
    public static void main(String[] args) throws JoranException {

        LoggerContext context = (LoggerContext) LoggerFactory.getILoggerFactory();
        JoranConfigurator configurator = new JoranConfigurator();
        configurator.setContext(context);
        context.reset();
        configurator.doConfigure(IQiYiMain.class.getClassLoader().getResourceAsStream("logback-syslog.xml"));
        logger = context.getLogger("Estimate");

        // Format the current hour once so both bounds are derived from the same instant;
        // two separate new Date() calls could straddle an hour boundary.
        String currentHour = DateUtil.format(new Date(), "yyyy-MM-dd HH");
        String start = currentHour + ":00:00";
        String end = currentHour + ":59:59";

        if (args.length >= 2) {
            start = args[0];
            end = args[1];
        } else {
            logger.info("Please Input DateTime!");
        }
        // NOTE(review): these look like epoch-second bounds forwarded to the audience server —
        // confirm their exact meaning against ServerUtil.request.
        Long audienceDateUtimeStart = 1577811600L;
        Long audienceDateUtimeEnd = 4100731200L;

        // Estimate the device count of the audience packages updated within this time window.
        // NOTE(review): the meaning of the trailing 3, 2, 4 arguments is defined by ServerUtil.request.
        Map<Integer, Tuple4<JSONArray, Integer, Integer, JSONObject>> audienceInfo =
                ServerUtil.request(start, end, audienceDateUtimeStart, audienceDateUtimeEnd, 3, 2, 4);

        logger.info("audiemceInfo -->> {}", audienceInfo);
        JSONArray jsonArray = new JSONArray();
        for (Map.Entry<Integer, Tuple4<JSONArray, Integer, Integer, JSONObject>> entry : audienceInfo.entrySet()) {
            Tuple4<JSONArray, Integer, Integer, JSONObject> info = entry.getValue();
            // _3 is passed on as the audience type and _4 as the audience rules.
            int count = estimate(entry.getKey(), info._3(), info._4());
            JSONObject jsonObject = new JSONObject();
            jsonObject.put("id", entry.getKey());
            jsonObject.put("audience_count", count);
            jsonArray.add(jsonObject);
        }
        logger.info("result -->> {}", jsonArray);
        ServerUtil.update(jsonArray);
    }

    /**
     * Counts the rows matching one audience in the newest partition of {@code dwh.audience_merge}.
     *
     * <p>A failure while reading the partition or running the count is logged and yields
     * {@code 0}, as before. A failure to obtain a connection aborts the run instead.
     *
     * @param audienceId     id of the audience being estimated
     * @param audience_type  audience type; see {@link #buildSql}
     * @param audience_rules rule object used when the type is rule-based
     * @return the value of the {@code count} column, or {@code 0} if the query failed
     * @throws IllegalStateException if no ClickHouse connection can be obtained
     */
    private static int estimate(int audienceId, int audience_type, JSONObject audience_rules) {
        int count = 0;
        // NOTE(review): closing the connection assumes ClickHouseJdbc.connection() hands out a
        // fresh connection per call rather than a shared instance — confirm.
        try (ClickHouseConnection connection = openConnection()) {
            String dt = "";
            String hour = "";
            try (ResultSet partition = ClickHouseJdbc.getMaxPartition(connection, "dwh", "audience_merge")) {
                // If several rows come back, the last one wins.
                while (partition.next()) {
                    String partStr = partition.getString("partition");
                    // Drop the first and last character and the single quotes, then split on commas:
                    // the first field is a yyyyMMdd date, the second the hour.
                    String[] parts = partStr.substring(1, partStr.length() - 1).replace("'", "").split(",", -1);
                    dt = DateUtil.format(DateUtil.parse(parts[0], "yyyyMMdd"), "yyyy-MM-dd");
                    hour = parts[1];
                }
            }
            String sql = buildSql(dt, hour, audienceId, audience_type, audience_rules)
                    .replace("@table", "audience_merge_all")
                    .replace("@key", "COUNT(1) count");
            logger.info("sql -->> {}", sql);
            try (Statement statement = connection.createStatement();
                 ResultSet resultSet = statement.executeQuery(sql)) {
                while (resultSet.next()) {
                    count = resultSet.getInt("count");
                }
            }
        } catch (SQLException e) {
            // Best effort: keep the previous contract of reporting 0, but keep the stack trace.
            logger.error("Estimate query failed for audience " + audienceId, e);
        }
        return count;
    }

    /**
     * Opens a ClickHouse connection, failing loudly when none can be obtained.
     *
     * @return a non-null connection
     * @throws IllegalStateException if the connection attempt fails, is interrupted, or yields
     *                               {@code null}
     */
    private static ClickHouseConnection openConnection() {
        ClickHouseConnection connection;
        try {
            connection = ClickHouseJdbc.connection();
        } catch (SQLException e) {
            logger.error(CONNECTION_FAILURE, e);
            throw new IllegalStateException(CONNECTION_FAILURE, e);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            logger.error(CONNECTION_FAILURE, e);
            throw new IllegalStateException(CONNECTION_FAILURE, e);
        }
        if (connection == null) {
            logger.error(CONNECTION_FAILURE);
            throw new IllegalStateException(CONNECTION_FAILURE);
        }
        return connection;
    }

    /**
     * Builds the query template for one audience. The returned text still contains the
     * {@code @key} and {@code @table} placeholders, which the caller substitutes.
     *
     * <p>For the rule-based type the filter is
     * {@code (hasAll(intersections) OR hasAny(union)) AND NOT hasAny(subtraction)}, where each
     * part is emitted only when the corresponding rule array is present and non-empty. For any
     * other type the filter is {@code has(audience_id, audienceId)}.
     *
     * @param dt             partition date, {@code yyyy-MM-dd}
     * @param hour           partition hour
     * @param audienceId     audience id used for non-rule-based audiences
     * @param audience_type  audience type
     * @param audience_rules rules with optional {@code intersections}, {@code union} and
     *                       {@code subtraction} arrays; only read for the rule-based type
     * @return the SQL template
     */
    private static String buildSql(String dt, String hour, int audienceId, int audience_type, JSONObject audience_rules) {
        String prefix = "SELECT @key FROM dwh.@table WHERE dt = '" + dt + "' AND hour = '" + hour + "' ";
        if (audience_type != AUDIENCE_TYPE_WITH_RULES) {
            return prefix + "AND has(audience_id," + audienceId + ")";
        }

        JSONArray intersections = nonEmptyRule(audience_rules, "intersections");
        JSONArray union = nonEmptyRule(audience_rules, "union");
        JSONArray subtraction = nonEmptyRule(audience_rules, "subtraction");

        StringBuilder ruleSql = new StringBuilder();
        //  hasAll
        if (intersections != null) {
            ruleSql.append("hasAll(audience_id,").append(intersections).append(")");
        }
        //  hasAny
        if (union != null) {
            if (ruleSql.length() > 0) {
                ruleSql.append(" OR ");
            }
            ruleSql.append("hasAny(audience_id,").append(union).append(")");
        }

        StringBuilder sql = new StringBuilder(prefix);
        if (ruleSql.length() > 0) {
            sql.append("AND (").append(ruleSql).append(")");
        }
        //  not hasAny
        if (subtraction != null) {
            sql.append(" AND NOT hasAny(audience_id,").append(subtraction).append(")");
        }
        return sql.toString();
    }

    /**
     * Looks up one rule array.
     *
     * @param rules rule object to read from
     * @param key   rule name
     * @return the array stored under {@code key}, or {@code null} when the key is absent, maps to
     *         {@code null}, or maps to an empty array
     */
    private static JSONArray nonEmptyRule(JSONObject rules, String key) {
        if (!rules.containsKey(key)) {
            return null;
        }
        JSONArray ids = rules.getJSONArray(key);
        return (ids == null || ids.isEmpty()) ? null : ids;
    }
}