1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package mobvista.dmp.datasource.rtdmp;
import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.joran.JoranConfigurator;
import ch.qos.logback.core.joran.spi.JoranException;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import mobvista.dmp.common.ClickHouseJdbc;
import mobvista.dmp.datasource.iqiyi.IQiYiMain;
import mobvista.dmp.util.DateUtil;
import org.apache.commons.lang.StringUtils;
import org.slf4j.LoggerFactory;
import ru.yandex.clickhouse.ClickHouseConnection;
import scala.Tuple4;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Date;
import java.util.Map;
import java.util.Objects;
/**
 * Estimates the device count of audience packages and reports the counts back.
 *
 * <p>{@link #main(String[])} fetches audience descriptions through {@code ServerUtil.request},
 * runs one ClickHouse {@code COUNT} query per audience against the newest partition reported
 * for {@code dwh.audience_merge}, and passes the collected counts to {@code ServerUtil.update}.
 *
 * @package: mobvista.dmp.datasource.rtdmp
 * @author: wangjf
 * @date: 2020/9/15
 * @time: 3:04 PM
 * @email: jinfeng.wang@mobvista.com
 * @phone: 152-1062-7698
 */
public class Estimate {

    /** Logback logger; stays {@code null} until {@link #main(String[])} has configured logging. */
    public static Logger logger = null;

    /**
     * Entry point.
     *
     * @param args optional {@code [start, end]} date-time strings; when fewer than two
     *             arguments are given, the current hour ({@code HH:00:00} to {@code HH:59:59})
     *             is used instead
     * @throws JoranException if the {@code logback-syslog.xml} configuration cannot be applied
     */
    public static void main(String[] args) throws JoranException {
        LoggerContext context = (LoggerContext) LoggerFactory.getILoggerFactory();
        JoranConfigurator configurator = new JoranConfigurator();
        configurator.setContext(context);
        context.reset();
        configurator.doConfigure(IQiYiMain.class.getClassLoader().getResourceAsStream("logback-syslog.xml"));
        logger = context.getLogger("Estimate");

        // Read the clock once so both bounds are guaranteed to describe the same hour.
        String currentHour = DateUtil.format(new Date(), "yyyy-MM-dd HH");
        String start = currentHour + ":00:00";
        String end = currentHour + ":59:59";
        if (args.length >= 2) {
            start = args[0];
            end = args[1];
        } else {
            logger.info("Please Input DateTime!");
        }

        // NOTE(review): these look like epoch-second bounds wide enough to match every
        // audience; the exact meaning is defined by ServerUtil.request - confirm there.
        Long audienceDateUtimeStart = 1577811600L;
        Long audienceDateUtimeEnd = 4100731200L;

        // Estimate the device count of the audience packages that were updated in this time window.
        Map<Integer, Tuple4<JSONArray, Integer, Integer, JSONObject>> audienceInfo =
                ServerUtil.request(start, end, audienceDateUtimeStart, audienceDateUtimeEnd, 3, 2, 4);
        logger.info("audiemceInfo -->> " + audienceInfo);

        JSONArray results = new JSONArray();
        for (Map.Entry<Integer, Tuple4<JSONArray, Integer, Integer, JSONObject>> entry : audienceInfo.entrySet()) {
            Tuple4<JSONArray, Integer, Integer, JSONObject> info = entry.getValue();
            // Tuple slot 3 is used as the audience type and slot 4 as the rule object.
            int count = estimate(entry.getKey(), info._3(), info._4());

            JSONObject item = new JSONObject();
            item.put("id", entry.getKey());
            item.put("audience_count", count);
            results.add(item);
        }
        logger.info("result -->> " + results);
        ServerUtil.update(results);
    }

    /**
     * Counts the rows matching one audience in the newest partition.
     *
     * <p>The partition is looked up on {@code dwh.audience_merge}; the count itself is run
     * against {@code dwh.audience_merge_all}. Every JDBC resource opened here is closed
     * before the method returns.
     *
     * @param audienceId     audience identifier
     * @param audience_type  audience type; {@code 3} selects the rule-based query
     * @param audience_rules rule object, read only when {@code audience_type} is {@code 3}
     * @return the count read from ClickHouse, or {@code 0} when no partition exists or the
     *         query fails
     * @throws IllegalStateException if no ClickHouse connection can be obtained
     */
    private static int estimate(int audienceId, int audience_type, JSONObject audience_rules) {
        ClickHouseConnection opened;
        try {
            opened = ClickHouseJdbc.connection();
        } catch (SQLException e) {
            logger.error("ClickHouse Connection Failure!", e);
            throw new IllegalStateException("ClickHouse Connection Failure!", e);
        } catch (InterruptedException e) {
            // Keep the interrupt visible to whoever owns this thread.
            Thread.currentThread().interrupt();
            logger.error("ClickHouse Connection Failure!", e);
            throw new IllegalStateException("ClickHouse Connection Failure!", e);
        }
        if (opened == null) {
            // Previously guarded only by an assert, which is a no-op unless the JVM runs with -ea.
            throw new IllegalStateException("ClickHouse Connection Failure!");
        }

        int count = 0;
        try (ClickHouseConnection connection = opened) {
            String dt = "";
            String hour = "";
            try (ResultSet partition = ClickHouseJdbc.getMaxPartition(connection, "dwh", "audience_merge")) {
                while (partition != null && partition.next()) {
                    // Strip the surrounding bracket characters and the quotes, then split
                    // into the date part and the hour part.
                    String partStr = partition.getString("partition");
                    String[] parts = partStr.substring(1, partStr.length() - 1).replace("'", "").split(",", -1);
                    dt = DateUtil.format(DateUtil.parse(parts[0], "yyyyMMdd"), "yyyy-MM-dd");
                    hour = parts[1];
                }
            }
            if (dt.isEmpty()) {
                // Without a partition the query could only run with empty filters; skip it.
                logger.warn("No partition found for dwh.audience_merge, audience " + audienceId + " counted as 0");
                return 0;
            }

            String sql = buildSql(dt, hour, audienceId, audience_type, audience_rules)
                    .replace("@table", "audience_merge_all")
                    .replace("@key", "COUNT(1) count");
            logger.info("sql -->> " + sql);

            try (Statement statement = connection.createStatement();
                 ResultSet resultSet = statement.executeQuery(sql)) {
                while (resultSet.next()) {
                    count = resultSet.getInt("count");
                }
            }
        } catch (SQLException e) {
            // Best effort: report whatever was read so far, but keep the stack trace in the log.
            logger.error("ClickHouse query failed for audience " + audienceId, e);
        }
        return count;
    }

    /**
     * Builds the query template for one audience. The returned SQL still contains the
     * {@code @key} and {@code @table} placeholders, which the caller substitutes.
     *
     * <p>For {@code audience_type == 3} the filter is assembled from the rule object:
     * {@code intersections} becomes {@code hasAll}, {@code union} becomes {@code hasAny}
     * (joined to the former with {@code OR}), and {@code subtraction} becomes
     * {@code NOT hasAny}. Missing, null or empty arrays are skipped. Any other type yields a
     * plain {@code has(audience_id, audienceId)} filter.
     *
     * @param dt             partition date inserted into the {@code dt} filter
     * @param hour           partition hour inserted into the {@code hour} filter
     * @param audienceId     audience identifier, used when the type is not {@code 3}
     * @param audience_type  audience type
     * @param audience_rules rule object; must not be {@code null} when the type is {@code 3}
     * @return the SQL template
     */
    private static String buildSql(String dt, String hour, int audienceId, int audience_type, JSONObject audience_rules) {
        String base = "SELECT @key FROM dwh.@table WHERE dt = '" + dt + "' AND hour = '" + hour + "'";
        if (audience_type != 3) {
            return base + " AND has(audience_id," + audienceId + ")";
        }

        JSONArray intersections = nonEmptyArray(audience_rules, "intersections");
        JSONArray union = nonEmptyArray(audience_rules, "union");
        JSONArray subtraction = nonEmptyArray(audience_rules, "subtraction");

        StringBuilder ruleSql = new StringBuilder();
        if (intersections != null) {
            ruleSql.append("hasAll(audience_id,").append(intersections).append(")");
        }
        if (union != null) {
            if (ruleSql.length() > 0) {
                ruleSql.append(" OR ");
            }
            ruleSql.append("hasAny(audience_id,").append(union).append(")");
        }

        StringBuilder sql = new StringBuilder(base).append(' ');
        if (ruleSql.length() > 0) {
            sql.append("AND (").append(ruleSql).append(")");
        }
        if (subtraction != null) {
            sql.append(" AND NOT hasAny(audience_id,").append(subtraction).append(")");
        }
        return sql.toString();
    }

    /**
     * Returns the array stored under {@code key}, or {@code null} when the key is absent,
     * maps to {@code null}, or maps to an empty array.
     */
    private static JSONArray nonEmptyArray(JSONObject rules, String key) {
        if (!rules.containsKey(key)) {
            return null;
        }
        JSONArray values = rules.getJSONArray(key);
        return (values == null || values.isEmpty()) ? null : values;
    }
}