package mobvista.dmp.datasource.rtdmp;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import mobvista.dmp.common.Constants;
import mobvista.dmp.datasource.rtdmp.entity.KV;
import mobvista.dmp.datasource.rtdmp.entity.Tuple;
import mobvista.dmp.util.*;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpResponse;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicNameValuePair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ru.yandex.clickhouse.ClickHouseConnection;
import ru.yandex.clickhouse.ClickHouseDataSource;
import ru.yandex.clickhouse.response.ClickHouseResultSet;
import ru.yandex.clickhouse.settings.ClickHouseProperties;

import java.io.*;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.*;

/**
 * Offline export job for rule-based RTDmp audiences.
 *
 * <p>For every audience returned by the RTDmp query API (audience_type=3, is_offline=1), it does the following:
 * <ol>
 *   <li>Waits until all audiences it depends on have a fresh {@code audience_data_utime}.</li>
 *   <li>Waits until the {@code dwh.audience_merge_v1} partition has caught up.</li>
 *   <li>Writes the matching devids from every ClickHouse shard to compressed {@code .gz} files.</li>
 *   <li>Hands those files to {@code AwsUploadFileToS3}.</li>
 *   <li>Reports the S3 path and row count back through the RTDmp update API.</li>
 * </ol>
 *
 * @package: mobvista.dmp.datasource.rtdmp
 * @author: wangjf
 * @date: 2021/8/3
 * @time: 10:59 AM
 * @email: jinfeng.wang@mobvista.com
 */
public class RTDmpFetch {

    private static final String baseUrl = PropertyUtil.getProperty("config.properties", "rtdmp.url");

    /** Comma-separated host groups, one per ClickHouse shard; the hosts inside a group are separated by ':'. */
    private static final String[] SET_VALUES = PropertyUtil.getProperty("config.properties", "http.private.host.server.ip").split(",");

    private static final String DRIVER = PropertyUtil.getProperty("config.properties", "datasource.clickhouse.driverClassName");

    /** JDBC URL template; the literal "host" is replaced by the chosen ClickHouse host. */
    private static final String URL = PropertyUtil.getProperty("config.properties", "datasource.clickhouse.url");

    private static final String USERNAME = PropertyUtil.getProperty("config.properties", "datasource.clickhouse.username");

    private static final String PASSWORD = PropertyUtil.getProperty("config.properties", "datasource.clickhouse.password");

    private static final String DATABASE = PropertyUtil.getProperty("config.properties", "datasource.clickhouse.database");

    private static final int TIMEOUT = Integer.parseInt(PropertyUtil.getProperty("config.properties", "datasource.clickhouse.timeout"));

    private static final String bucketName = PropertyUtil.getProperty("config.properties", "aws.s3.bucket");

    private static final String keyName = PropertyUtil.getProperty("config.properties", "aws.s3.key");

    private static final String OUTPUT = PropertyUtil.getProperty("config.properties", "rtdmp.output.dir");

    /**
     * Per-thread "yyyy-MM-dd HH" formatter. A ThreadLocal is needed because SimpleDateFormat is not
     * thread-safe and {@link #fetch} runs concurrently on the worker pool.
     */
    private static final ThreadLocal<SimpleDateFormat> HOUR_FORMAT =
            ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd HH"));

    /** Timeouts (milliseconds) applied to every RTDmp API request. */
    private static final RequestConfig REQUEST_CONFIG = RequestConfig.custom()
            .setConnectTimeout(5000).setConnectionRequestTimeout(5000)
            .setSocketTimeout(5000).build();

    /** Number of devids written to one text file before it is compressed and a new file is started. */
    private static final int MAX_LINES_PER_FILE = 10_000_000;

    /** Query used when a shard has no rows for a device type; its single devid is uploaded under "unknown". */
    private static final String PLACEHOLDER_SQL = "SELECT 'c4ca4238a0b923820dcc509a6f75849b' devid";

    private static final MySQLUtil mySqlUtil = new MySQLUtil();

    public static final Logger LOGGER = LoggerFactory.getLogger(RTDmpFetch.class);

    /**
     * Entry point of the export job.
     *
     * @param args optional {@code [update_time_start, update_time_end]}, both "yyyy-MM-dd HH:mm:ss".
     *             Defaults to 00:00:00 seven days ago through 23:59:59 yesterday.
     */
    public static void main(String[] args) {
        String update_time_start;
        String update_time_end;
        if (args.length >= 2) {
            update_time_start = args[0];
            update_time_end = args[1];
        } else {
            Calendar cal = Calendar.getInstance();
            Date date = new Date();
            cal.setTime(date);
            cal.add(Calendar.DATE, -7);
            update_time_start = DateUtil.format(cal.getTime(), "yyyy-MM-dd ") + "00:00:00";
            cal.setTime(date);
            cal.add(Calendar.DATE, -1);
            update_time_end = DateUtil.format(cal.getTime(), "yyyy-MM-dd ") + "23:59:59";
        }
        // "yyyyMMddHH" of the window end; used as the S3/partition folder for every audience in this run.
        String part = DateUtil.format(DateUtil.parse(update_time_end, "yyyy-MM-dd HH:mm:ss"), "yyyyMMddHH");
        JSONArray jsonArray = ruleAudienceInfo(update_time_start, update_time_end);
        jsonArray.sort(Comparator.comparing(obj -> ((JSONObject) obj).getIntValue("id")));

        ThreadFactory threadFactory = new ThreadFactoryBuilder()
                .setNameFormat("RTDmpFetch-%d").build();
        // The queue is sized so that every audience fits. With AbortPolicy, an overflow used to throw
        // out of main() before shutdown() ran. The non-daemon core threads then kept the JVM alive forever.
        ExecutorService pool = new ThreadPoolExecutor(40, 60, 120L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(Math.max(128, jsonArray.size())), threadFactory,
                new ThreadPoolExecutor.AbortPolicy());
        try {
            for (int i = 0; i < jsonArray.size(); i++) {
                JSONObject audienceObject = jsonArray.getJSONObject(i);
                pool.execute(() -> doFetch(audienceObject, part));
            }
        } finally {
            pool.shutdown();
        }
    }

    /** Runs {@link #fetch} for one audience and logs its outcome, including unexpected runtime errors. */
    private static void doFetch(JSONObject audienceObject, String part) {
        int audienceId = audienceObject.getIntValue("id");
        boolean success;
        try {
            success = fetch(audienceObject, part);
        } catch (RuntimeException e) {
            // Without this catch, the error only reaches the pool thread's default uncaught-exception handler.
            LOGGER.error(Thread.currentThread().getName() + " fetch " + audienceId + " error", e);
            success = false;
        }
        if (success) {
            LOGGER.info(Thread.currentThread().getName() + " fetch " + audienceId + " success!");
        } else {
            LOGGER.info(Thread.currentThread().getName() + " fetch " + audienceId + " failure!");
        }
    }

    /**
     * Exports one audience.
     *
     * @param jsonObject audience definition as returned by the RTDmp query API
     * @param part       "yyyyMMddHH" folder name used for the uploaded files
     * @return false when a ClickHouse query for the device-id upload failed, true otherwise
     */
    private static boolean fetch(JSONObject jsonObject, String part) {
        // Today's 00:00:00 and 14:00:00 as epoch seconds: the freshness threshold and the wait deadline.
        long startTime = DateUtil.parse(DateUtil.format(new Date(), "yyyy-MM-dd") + " 00:00:00", "yyyy-MM-dd HH:mm:ss").getTime() / 1000;
        long endTime = DateUtil.parse(DateUtil.format(new Date(), "yyyy-MM-dd") + " 14:00:00", "yyyy-MM-dd HH:mm:ss").getTime() / 1000;
        long start = System.currentTimeMillis();
        int audienceId = jsonObject.getIntValue("id");
        LOGGER.info("checkRules -->> audienceId:" + audienceId + ", jsonObject:" + jsonObject + ", startTime:" + startTime + ", endTime:" + endTime);
        Tuple tuple = checkRules(jsonObject, startTime, endTime);
        if (tuple.getFlag()) {
            KV kv = getPartitionTimeWithRetry();
            if (isBlankPartition(kv)) {
                LOGGER.info("getPartitionTime Blank !!! Please Check.");
                System.exit(-1);
                return false;
            }
            String partTime = toPartTime(kv.getK());
            String newPartTime = HOUR_FORMAT.get().format(new Date(tuple.getUtime() * 1000L));
            LOGGER.info("checkPartition -->> audienceId:" + audienceId + ", partTime:" + partTime + ", newPartTime:" + newPartTime);
            // Poll every 5 minutes until the merged partition reaches the hour of the newest source update,
            // or until the daily deadline passes.
            while (partTime.compareTo(newPartTime) < 0) {
                long nowTimestamp = nowSeconds();
                LOGGER.info("checkPartition -->> audienceId:" + audienceId + ", nowTimestamp:" + nowTimestamp + ", partTime:" + partTime + ", newPartTime:" + newPartTime + ", sleep 300s");
                if (nowTimestamp > endTime) {
                    break;
                }
                sleepQuietly(300000L);
                KV latest = mySqlUtil.getPartitionTime("dwh", "audience_merge_v1");
                // Keep the last good value if this poll returned nothing, instead of failing on substring().
                if (!isBlankPartition(latest)) {
                    kv = latest;
                    partTime = toPartTime(kv.getK());
                }
            }
            String dt = partitionDate(kv.getK());
            String hour = kv.getK().substring(8, 10);
            // Time at which the partition was inserted.
            String utime = kv.getV();

            long nowTime = nowSeconds();
            if (Long.parseLong(utime) > (nowTime - 600) && nowTime < endTime) {
                // Avoid missing data while the backend is still syncing shards.
                LOGGER.info("Sleep 10 min.");
                sleepQuietly(600000L);
            }

            ClickHouseProperties properties = clickHouseProperties();
            String randomGroup = SET_VALUES[ThreadLocalRandom.current().nextInt(SET_VALUES.length)];
            ClickHouseDataSource dataSource0 = new ClickHouseDataSource(URL.replace("host", pickHost(randomGroup)), properties);

            String sql = buildSql(dt, hour, jsonObject)
                    .replace("@key", "device_type")
                    .replace("@table", "audience_merge_v1_all") + " GROUP BY device_type";
            LOGGER.info("checkDeviceType -->> audienceId:" + audienceId + ", sql -->>" + sql);
            Set<String> devTypeSet = queryDeviceTypes(dataSource0, sql);

            sql = buildSql(dt, hour, jsonObject)
                    .replace("@key", "COUNT(1) counts")
                    .replace("@table", "audience_merge_v1_all");
            long counts = queryCount(dataSource0, sql);
            LOGGER.info("checkCount -->> audienceId:" + audienceId + ", sql -->>" + sql + ", count:" + counts);

            if (!uploadDeviceIds(jsonObject, dt, hour, devTypeSet, audienceId, part, properties)) {
                return false;
            }

            // Report back only when there is updated data; otherwise leave the audience untouched.
            if (!devTypeSet.isEmpty()) {
                String s3Path = "s3://" + bucketName + "/" + keyName + "/" + audienceId + "/" + part + "/*/";
                JSONObject updateJSON = new JSONObject();
                updateJSON.put("id", audienceId);
                updateJSON.put("s3_path", s3Path);
                updateJSON.put("status", 1);
                updateJSON.put("audience_data_status", 1);
                updateJSON.put("audience_count", counts);
                JSONArray updateJSONArray = new JSONArray();
                updateJSONArray.add(updateJSON);
                update(updateJSONArray.toJSONString());
            }
        }
        long end = System.currentTimeMillis();
        LOGGER.info("audienceId -->> " + audienceId + ",runtime ==>> " + (end - start));
        return true;
    }

    /** Reads the audience_merge_v1 partition marker, re-reading up to 5 more times (10 ms apart) while blank. */
    private static KV getPartitionTimeWithRetry() {
        KV kv = mySqlUtil.getPartitionTime("dwh", "audience_merge_v1");
        for (int attempt = 0; attempt < 5 && isBlankPartition(kv); attempt++) {
            sleepQuietly(10L);
            kv = mySqlUtil.getPartitionTime("dwh", "audience_merge_v1");
        }
        return kv;
    }

    /** True when the partition marker is missing or its key is blank. */
    private static boolean isBlankPartition(KV kv) {
        return kv == null || StringUtils.isBlank(kv.getK());
    }

    /** Converts the first 8 chars of a "yyyyMMddHH..." partition key to "yyyy-MM-dd". */
    private static String partitionDate(String partition) {
        return DateUtil.format(DateUtil.parse(partition.substring(0, 8), "yyyyMMdd"), "yyyy-MM-dd");
    }

    /** Converts a "yyyyMMddHH..." partition key to the comparable "yyyy-MM-dd HH" form. */
    private static String toPartTime(String partition) {
        return partitionDate(partition) + " " + partition.substring(8, 10);
    }

    /** Current time in epoch seconds. */
    private static long nowSeconds() {
        return System.currentTimeMillis() / 1000;
    }

    /** Sleeps for the given time; an interrupt is logged and the sleep ends early. */
    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            // Not re-asserted on purpose: the polling loops call this repeatedly and a set flag would turn them
            // into busy loops. main() only calls shutdown(), never shutdownNow(), so workers are not interrupted here.
            LOGGER.info(e.getMessage());
        }
    }

    /** Builds the ClickHouse connection properties from the configured credentials and timeout. */
    private static ClickHouseProperties clickHouseProperties() {
        ClickHouseProperties properties = new ClickHouseProperties();
        properties.setUser(USERNAME);
        properties.setPassword(PASSWORD);
        properties.setDatabase(DATABASE);
        properties.setSocketTimeout(TIMEOUT);
        properties.setConnectionTimeout(TIMEOUT);
        return properties;
    }

    /** Picks a random host out of a ':'-separated host group. */
    private static String pickHost(String hostGroup) {
        String[] hosts = hostGroup.split(":");
        return hosts[ThreadLocalRandom.current().nextInt(hosts.length)];
    }

    /** Runs the GROUP BY device_type query and returns the distinct device types (empty on SQL error). */
    private static Set<String> queryDeviceTypes(ClickHouseDataSource dataSource, String sql) {
        Set<String> devTypeSet = new HashSet<>();
        try (ClickHouseConnection connection = dataSource.getConnection();
             PreparedStatement statement = connection.prepareStatement(sql);
             ResultSet resultSet = statement.executeQuery()) {
            while (resultSet.next()) {
                devTypeSet.add(resultSet.getString("device_type"));
            }
        } catch (SQLException e) {
            LOGGER.info("SQLException -->> " + e.getMessage());
        }
        return devTypeSet;
    }

    /** Runs the COUNT(1) query and returns the last "counts" value read (0 on SQL error). */
    private static long queryCount(ClickHouseDataSource dataSource, String sql) {
        long counts = 0;
        try (ClickHouseConnection connection = dataSource.getConnection();
             PreparedStatement statement = connection.prepareStatement(sql);
             ResultSet resultSet = statement.executeQuery()) {
            while (resultSet.next()) {
                counts = resultSet.getLong("counts");
            }
        } catch (SQLException e) {
            LOGGER.info("SQLException -->> " + e.getMessage());
        }
        return counts;
    }

    /**
     * Queries devids per device type on every shard and writes/uploads them.
     *
     * @return false as soon as any shard query fails
     */
    private static boolean uploadDeviceIds(JSONObject jsonObject, String dt, String hour, Set<String> devTypeSet,
                                           int audienceId, String part, ClickHouseProperties properties) {
        for (String devType : devTypeSet) {
            // NOTE(review): devType comes from ClickHouse and is concatenated into SQL unescaped.
            String sql = buildSql(dt, hour, jsonObject)
                    .replace("@key", "devid")
                    .replace("@table", "audience_merge_v1")
                    + " AND device_type = '" + devType + "'";
            LOGGER.info("checkDeviceId -->> audienceId:" + audienceId + ",sql -->> " + sql);
            for (int shard = 0; shard < SET_VALUES.length; shard++) {
                ClickHouseDataSource dataSource = new ClickHouseDataSource(URL.replace("host", pickHost(SET_VALUES[shard])), properties);
                try (ClickHouseConnection connection = dataSource.getConnection();
                     PreparedStatement statement = connection.prepareStatement(sql);
                     ClickHouseResultSet resultSet = (ClickHouseResultSet) statement.executeQuery()) {
                    LOGGER.info("audienceId -->> " + audienceId + ",upload start!");
                    if (resultSet != null && resultSet.hasNext()) {
                        LOGGER.info("audienceId -->> " + audienceId + ",resultSet is not null");
                        multipleFileUpload(resultSet, audienceId, devType, part, shard);
                    } else {
                        LOGGER.info("audienceId -->> " + audienceId + ",resultSet is null");
                        // Uses locals so the placeholder query/type do not leak into later shards of this devType.
                        try (PreparedStatement placeholder = connection.prepareStatement(PLACEHOLDER_SQL);
                             ResultSet placeholderRows = placeholder.executeQuery()) {
                            multipleFileUpload(placeholderRows, audienceId, "unknown", part, shard);
                        }
                    }
                } catch (SQLException e) {
                    LOGGER.info("SQLException -->> " + e.getMessage());
                    return false;
                }
            }
        }
        return true;
    }

    /**
     * Fetches the rule-based audiences from the RTDmp query API.
     *
     * @param startTime sent as {@code update_time_start} ("yyyy-MM-dd HH:mm:ss")
     * @param endTime   currently not sent to the API
     * @return the "data" array of a code-200 response, or an empty array on any failure
     */
    private static JSONArray ruleAudienceInfo(String startTime, String endTime) {
        final String serverUrl = baseUrl + "rtdmp/audience/query";
        JSONArray jsonArrayReturn = new JSONArray();
        try (CloseableHttpClient client = HttpClients.createDefault()) {
            URIBuilder uri = new URIBuilder(serverUrl)
                    .addParameter("update_time_start", startTime)
                    .addParameter("audience_type", "3")
                    .addParameter("is_offline", "1");
            LOGGER.info("ruleAudienceInfo.url: " + uri);
            HttpGet httpGet = newAuthorizedGet(uri);
            String body;
            try (CloseableHttpResponse response = client.execute(httpGet)) {
                body = readBody(response);
            }
            JSONObject jsonObject = Constants.String2JSONObject(body);
            if (jsonObject != null && jsonObject.getIntValue("code") == 200 && jsonObject.containsKey("data")) {
                JSONArray data = jsonObject.getJSONArray("data");
                if (data != null) {
                    jsonArrayReturn = data;
                }
            }
        } catch (IOException | URISyntaxException e) {
            LOGGER.info(e.getMessage());
        }
        LOGGER.info("ruleAudienceInfo.result: " + jsonArrayReturn);
        return jsonArrayReturn;
    }

    /** Sends the audience update (a JSON array string) to the RTDmp update API and logs the response. */
    private static void update(String requestBody) {
        LOGGER.info("rtdmp/update -->> requestBody -->> " + requestBody);
        final String serverUrl = baseUrl + "rtdmp/audience/update";
        HttpPut put = new HttpPut(serverUrl);
        String key = UUID.randomUUID().toString();
        put.setHeader("key", key);
        put.setHeader("token", MD5Util.getMD5Str(key));
        put.setConfig(REQUEST_CONFIG);
        JSONObject jsonObject = null;
        try (CloseableHttpClient client = HttpClients.createDefault()) {
            put.setEntity(new StringEntity(requestBody));
            try (CloseableHttpResponse response = client.execute(put)) {
                jsonObject = JSON.parseObject(readBody(response));
            }
        } catch (IOException e) {
            LOGGER.info("IOException -->> " + e.getMessage());
        }
        // parseObject returns null for an empty body.
        if (jsonObject == null) {
            jsonObject = new JSONObject();
        }
        LOGGER.info("rtdmp/update -->> jsonObject -->> " + jsonObject.toJSONString());
    }

    /** Builds an authenticated GET (Auth-System/key/token headers, request timeouts) for the given URI. */
    private static HttpGet newAuthorizedGet(URIBuilder uri) throws URISyntaxException {
        HttpGet httpGet = new HttpGet(uri.build());
        String key = UUID.randomUUID().toString();
        httpGet.setHeader("Auth-System", "dmp");
        httpGet.setHeader("Content-Type", "text/plain");
        httpGet.setHeader("key", key);
        httpGet.setHeader("token", MD5Util.getMD5Str(key));
        httpGet.setConfig(REQUEST_CONFIG);
        return httpGet;
    }

    /** Reads the whole response body as UTF-8, concatenating lines without separators; "" when there is no entity. */
    private static String readBody(CloseableHttpResponse response) throws IOException {
        if (response.getEntity() == null) {
            return "";
        }
        StringBuilder result = new StringBuilder();
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(response.getEntity().getContent(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                result.append(line);
            }
        }
        return result.toString();
    }

    /**
     * Waits (polling every 5 minutes until {@code endTime}) for every audience referenced by the rules to be
     * refreshed since {@code startTime}.
     *
     * @return (true, newest utime) once all are fresh; (true, now) when the deadline passed;
     *         (false, 0) when the audience references no other audiences
     */
    private static Tuple checkRules(JSONObject jsonObject, long startTime, long endTime) {
        JSONArray audienceIds = collectReferencedAudiences(jsonObject);
        if (audienceIds.isEmpty()) {
            return new Tuple(false, 0L);
        }
        long nowTime = nowSeconds();
        while (nowTime < endTime) {
            Tuple tuple = checkSuccess(audienceIds, startTime);
            LOGGER.info("checkSuccess -->> flag:" + tuple.getFlag() + ", utime:" + tuple.getUtime());
            if (tuple.getFlag()) {
                return tuple;
            }
            LOGGER.info("checkRules sleep 300s");
            sleepQuietly(300000L);
            nowTime = nowSeconds();
        }
        return new Tuple(true, nowTime);
    }

    /**
     * Collects the audience ids from group_rules[*].audiences; if none, from audience_rules
     * intersections, union and subtraction (in that order).
     */
    private static JSONArray collectReferencedAudiences(JSONObject jsonObject) {
        JSONArray audienceIds = new JSONArray();
        JSONArray groupRules = jsonObject.getJSONArray("group_rules");
        if (groupRules != null) {
            for (int i = 0; i < groupRules.size(); i++) {
                JSONArray audiences = groupRules.getJSONObject(i).getJSONArray("audiences");
                if (audiences != null) {
                    audienceIds.addAll(audiences);
                }
            }
        }
        if (audienceIds.isEmpty() && jsonObject.containsKey("audience_rules")) {
            JSONObject audienceRules = jsonObject.getJSONObject("audience_rules");
            if (audienceRules != null) {
                for (String ruleKey : Arrays.asList("intersections", "union", "subtraction")) {
                    JSONArray ids = audienceRules.getJSONArray(ruleKey);
                    if (ids != null && !ids.isEmpty()) {
                        audienceIds.addAll(ids);
                    }
                }
            }
        }
        return audienceIds;
    }

    /**
     * Builds the ClickHouse query template for the given partition and the audience's rules.
     * The caller substitutes "@key" (selected expression) and "@table".
     */
    private static String buildSql(String dt, String hour, JSONObject jsonObject) {
        StringBuilder sql = new StringBuilder();
        sql.append("SELECT @key FROM dwh.@table WHERE dt = '").append(dt)
                .append("' AND hour = '").append(hour)
                .append("' AND device_type != 'unknown' ");
        JSONArray groupRules = jsonObject.getJSONArray("group_rules");
        if (groupRules != null) {
            // NOTE(review): each group is prepended, so the expression reads from the last group back to the first.
            // A "-" outer_op wraps everything built so far in NOT (...). No other parentheses are added, so mixed
            // AND/OR fall back to SQL precedence. Confirm this matches the intended group semantics.
            StringBuilder ruleSql = new StringBuilder();
            for (int i = 0; i < groupRules.size(); i++) {
                JSONObject json = Constants.String2JSONObject(groupRules.getJSONObject(i).toString());
                String function = "|".equals(json.getString("inner_op")) ? "hasAny" : "hasAll";
                ruleSql.insert(0, function + "(audience_id," + json.getJSONArray("audiences") + ")");
                if (i + 1 < groupRules.size()) {
                    JSONObject outerJson = Constants.String2JSONObject(groupRules.getJSONObject(i + 1).toString());
                    String outerOp = outerJson.getString("outer_op");
                    if ("-".equals(outerOp)) {
                        ruleSql.insert(0, " AND NOT (").append(")");
                    } else if ("|".equals(outerOp)) {
                        ruleSql.insert(0, " OR ");
                    } else {
                        ruleSql.insert(0, " AND ");
                    }
                }
            }
            if (StringUtils.isNotBlank(ruleSql)) {
                sql.append("AND (").append(ruleSql).append(")");
            }
        }
        if (jsonObject.containsKey("audience_rules")) {
            JSONObject audienceRules = jsonObject.getJSONObject("audience_rules");
            if (audienceRules != null) {
                JSONArray intersections = audienceRules.getJSONArray("intersections");
                JSONArray union = audienceRules.getJSONArray("union");
                JSONArray subtraction = audienceRules.getJSONArray("subtraction");
                StringBuilder ruleSql = new StringBuilder();
                // hasAll
                if (intersections != null && !intersections.isEmpty()) {
                    ruleSql.append("hasAll(audience_id,").append(intersections).append(")");
                }
                // hasAny, OR-ed with the hasAll clause when both exist
                if (union != null && !union.isEmpty()) {
                    if (StringUtils.isNotBlank(ruleSql)) {
                        ruleSql.append(" OR ");
                    }
                    ruleSql.append("hasAny(audience_id,").append(union).append(")");
                }
                if (StringUtils.isNotBlank(ruleSql)) {
                    sql.append("AND (").append(ruleSql).append(")");
                }
                // not hasAny
                if (subtraction != null && !subtraction.isEmpty()) {
                    sql.append(" AND NOT hasAny(audience_id,").append(subtraction).append(")");
                }
            }
        }
        return sql.toString();
    }

    /**
     * Checks whether every audience in {@code jsonArray} has {@code audience_data_utime >= startTime}.
     *
     * @return (true, max utime) when all are fresh; (false, first stale utime) otherwise; (false, 0)
     *         when an audience could not be read
     */
    private static Tuple checkSuccess(JSONArray jsonArray, long startTime) {
        long futime = 0L;
        for (Object o : jsonArray) {
            JSONObject audienceObject = queryById(o.toString());
            Long utime = audienceObject.getLong("audience_data_utime");
            if (utime == null) {
                // queryById returns an empty object on failure; treat it as "not ready" so the caller re-polls.
                return new Tuple(false, 0L);
            }
            if (utime > futime) {
                futime = utime;
            }
            if (utime < startTime) {
                return new Tuple(false, utime);
            }
        }
        return new Tuple(true, futime);
    }

    /** Looks up one audience by id; returns an empty JSONObject when the lookup fails or finds nothing. */
    private static JSONObject queryById(String id) {
        final String serverUrl = baseUrl + "rtdmp/audience/query";
        JSONObject res = new JSONObject();
        try (CloseableHttpClient client = HttpClients.createDefault()) {
            HttpGet httpGet = newAuthorizedGet(new URIBuilder(serverUrl).addParameter("id", id));
            String body;
            try (CloseableHttpResponse response = client.execute(httpGet)) {
                body = readBody(response);
            }
            JSONObject jsonObject = Constants.String2JSONObject(body);
            if (jsonObject != null && jsonObject.getIntValue("code") == 200 && jsonObject.containsKey("data")) {
                JSONArray data = jsonObject.getJSONArray("data");
                if (data != null && !data.isEmpty()) {
                    res = data.getJSONObject(0);
                }
            }
        } catch (IOException | URISyntaxException e) {
            LOGGER.info(e.getMessage());
        }
        return res;
    }

    /**
     * Writes the "devid" column of {@code resultSet} to text files of at most {@link #MAX_LINES_PER_FILE} lines.
     * Each file is named OUTPUT + audienceId + "." + ip + "_" + index + ".txt". It is compressed to a
     * matching ".gz", and the text file is deleted afterwards. The compressed files are then passed to
     * AwsUploadFileToS3. Errors are logged; whatever was compressed before the error is still uploaded.
     */
    private static void multipleFileUpload(ResultSet resultSet, int audienceId, String deviceType, String part, int ip) {
        List<String> filePaths = new ArrayList<>();
        BufferedWriter out = null;
        try {
            int linesInFile = 0;
            int fileIndex = 0;
            String fileName = null;
            while (resultSet.next()) {
                if (out == null) {
                    fileName = OUTPUT + audienceId + "." + ip + "_" + fileIndex + ".txt";
                    File file = new File(fileName);
                    // Drop a stale file with the same name before starting a new chunk.
                    if (file.exists()) {
                        file.delete();
                    }
                    out = new BufferedWriter(new FileWriter(fileName, true));
                }
                out.write(resultSet.getString("devid"));
                out.newLine();
                linesInFile++;
                boolean full = linesInFile == MAX_LINES_PER_FILE;
                if (full || resultSet.isLast()) {
                    out.close();
                    out = null;
                    String zipName = OUTPUT + audienceId + "." + ip + "_" + fileIndex + ".gz";
                    ZipFileUtil.compress(fileName, zipName);
                    filePaths.add(zipName);
                    // Delete the source file once it has been compressed.
                    File file = new File(fileName);
                    if (file.exists()) {
                        file.delete();
                    }
                    if (full) {
                        linesInFile = 0;
                        fileIndex++;
                    }
                }
            }
        } catch (IOException | SQLException e) {
            LOGGER.info(e.getMessage());
        } finally {
            if (out != null) {
                try {
                    out.close();
                } catch (IOException e) {
                    LOGGER.info(e.getMessage());
                }
            }
        }
        AwsUploadFileToS3 awsUploadFileToS3 = new AwsUploadFileToS3();
        awsUploadFileToS3.uploadFileList(filePaths, audienceId, deviceType, part);
    }
}