package mobvista.dmp.datasource.iqiyi; import ch.qos.logback.classic.Logger; import ch.qos.logback.classic.LoggerContext; import ch.qos.logback.classic.joran.JoranConfigurator; import ch.qos.logback.core.joran.spi.JoranException; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.google.common.util.concurrent.*; import mobvista.dmp.common.ClickHouseJdbc; import mobvista.dmp.common.Constants; import mobvista.dmp.util.DateUtil; import mobvista.dmp.util.PropertyUtil; import org.apache.commons.codec.digest.DigestUtils; import org.apache.commons.lang.StringUtils; import org.apache.http.HttpResponse; import org.apache.http.client.config.RequestConfig; import org.apache.http.client.methods.HttpPost; import org.apache.http.entity.StringEntity; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; import org.slf4j.LoggerFactory; import org.springframework.scheduling.concurrent.CustomizableThreadFactory; import ru.yandex.clickhouse.ClickHouseConnection; import ru.yandex.clickhouse.except.ClickHouseUnknownException; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.util.*; import java.util.concurrent.*; /** * @package: mobvista.dmp.datasource.iqiyi * @author: wangjf * @date: 2020/9/10 * @time: 2:40 下午 * @email: jinfeng.wang@mobvista.com * @phone: 152-1062-7698 */ public class IQiYiRequest { public static Logger logger = null; static ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(200, 300, 500, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>(300), new CustomizableThreadFactory("IQiYiRequest-"), new ThreadPoolExecutor.CallerRunsPolicy()); private static String dt = DateUtil.format(new Date(), "yyyy-MM-dd"); public static void main(String[] args) throws InterruptedException, JoranException { LoggerContext context = (LoggerContext) LoggerFactory.getILoggerFactory(); JoranConfigurator configurator = new JoranConfigurator(); configurator.setContext(context); context.reset(); configurator.doConfigure(IQiYiMain.class.getClassLoader().getResourceAsStream("logback-syslog.xml")); logger = context.getLogger("IQiYiRequest"); int fors = 30; int concurrent = 10000; if (args.length >= 3) { fors = Integer.parseInt(args[0]); concurrent = Integer.parseInt(args[1]); dt = args[2]; } long start = System.currentTimeMillis(); List<ListenableFuture<String>> futures = new CopyOnWriteArrayList<>(); long min_start = System.currentTimeMillis(); for (int c = 1; c <= fors; c++) { long in_start = System.currentTimeMillis(); ListeningExecutorService listeningExecutor = MoreExecutors.listeningDecorator(poolExecutor); MoreExecutors.addDelayedShutdownHook(listeningExecutor, 5, TimeUnit.SECONDS); ClickHouseConnection connection = null; ResultSet resultSet = null; try { connection = ClickHouseJdbc.connectionByTime(c % 3); } catch (SQLException e) { try { connection = ClickHouseJdbc.connectionByTime(c % 3); } catch (SQLException ex) { logger.info("ClickHouse Connection Failure!"); } } String date = DateUtil.getDayByString(dt, "yyyy-MM-dd", -1); try { assert connection != null; resultSet = query(connection, date, concurrent * (c - 1), concurrent); } catch (SQLException e) { logger.info("ClickHouse Query Failure!"); } List<String> resultList = new CopyOnWriteArrayList<>(); try { while (resultSet.next()) { String device_ids = resultSet.getString("device_ids"); ListenableFuture listenableFuture = listeningExecutor.submit(() -> { JSONObject jsonObject = devicePeopleBatch(device_ids); return jsonObject.toJSONString(); }); Futures.addCallback(listenableFuture, new FutureCallback<String>() { @Override public void onSuccess(String result) { resultList.add(result); } @Override public void onFailure(Throwable t) { resultList.add(new JSONObject().toJSONString()); } }); futures.add(listenableFuture); } } catch (SQLException e) { logger.info("ClickHouse Execute Failure!"); } long min_end = System.currentTimeMillis(); logger.info("Times -->> " + c + ", Runtime -->> " + (min_end - in_start)); try { String datetime = DateUtil.format(new Date(), "yyyy-MM-dd HH"); insertIQiYi(connection, futures, datetime.substring(0, 10), datetime.substring(11, 13)); logger.info("Times -->> " + c + ", ClickHouse Insert Success! Size -->> " + futures.size()); } catch (SQLException e) { logger.info("ClickHouse Insert Failure!"); } finally { try { resultSet.close(); connection.close(); } catch (SQLException throwables) { logger.info("ClickHouse Connection Close!"); } } futures = new CopyOnWriteArrayList<>(); min_end = System.currentTimeMillis(); logger.info("ClickHouse Insert Success! Times -->> " + c + ", Runtime -->> " + (min_end - min_start)); min_start = System.currentTimeMillis(); } poolExecutor.shutdown(); long end = System.currentTimeMillis(); logger.info("all runtime -->> " + (end - start)); } public static ResultSet query(Connection connection, String dt, int offSet, int size) throws SQLException, InterruptedException { ResultSet resultSet; String sql = "SELECT device_ids FROM dwh.iqiyi_lahuo_daily_all WHERE dt = '@dt' LIMIT @offSet, @size"; sql = sql.replace("@dt", dt).replace("@offSet", String.valueOf(offSet)).replace("@size", String.valueOf(size)); try { resultSet = connection.createStatement().executeQuery(sql); return resultSet; } catch (ClickHouseUnknownException e) { Thread.sleep(100); resultSet = connection.createStatement().executeQuery(sql); return resultSet; } } public static void insertIQiYi(Connection connection, List<ListenableFuture<String>> futures, String dt, String hour) throws SQLException, InterruptedException { PreparedStatement preparedStatement = null; try { preparedStatement = connection.prepareStatement("INSERT INTO dwh.iqiyi_lahuo_result_daily (dt, hour, device_ids) VALUES (?, ?, ?)"); for (ListenableFuture<String> future : futures) { try { String result = future.get(); preparedStatement.setString(1, dt); preparedStatement.setString(2, hour); preparedStatement.setString(3, result); preparedStatement.addBatch(); } catch (ExecutionException e) { e.printStackTrace(); } } preparedStatement.executeBatch(); logger.info("INSERT SUCCESS!"); } catch (ClickHouseUnknownException | NullPointerException e) { logger.info("INSERT -->> " + e.getMessage()); Thread.sleep(100); assert preparedStatement != null; preparedStatement.executeBatch(); } finally { if (preparedStatement != null) { preparedStatement.close(); } } } public static JSONObject deviceDmpBatch(String deviceIds) { final String iqiyiUrl = PropertyUtil.getProperty("config.properties", "dmp.server.url"); String encryptType = "1"; String deviceIdType = "1"; JSONObject requestBody = new JSONObject(); requestBody.put("deviceIds", deviceIds); requestBody.put("encryptType", encryptType); requestBody.put("deviceIdType", deviceIdType); CloseableHttpClient client = HttpClients.createDefault(); RequestConfig requestConfig = RequestConfig.custom() .setConnectTimeout(1000).setConnectionRequestTimeout(1000) .setSocketTimeout(1000).build(); HttpPost post = new HttpPost(iqiyiUrl); post.setHeader("Content-Type", "application/json;charset=utf-8"); JSONObject jsonObject = new JSONObject(); HttpResponse response; try { post.setConfig(requestConfig); post.setEntity(new StringEntity(requestBody.toJSONString())); response = client.execute(post); BufferedReader rd = new BufferedReader( new InputStreamReader(response.getEntity().getContent())); StringBuilder result = new StringBuilder(); String line; while ((line = rd.readLine()) != null) { result.append(line); } jsonObject = Constants.String2JSONObject(result.toString()); } catch (IOException e) { logger.info("IOException -->> " + e.getMessage()); } finally { post.abort(); client.getConnectionManager().shutdown(); try { client.close(); } catch (IOException e) { logger.info("IOException -->> " + e.getMessage()); } } return jsonObject; } public static JSONObject devicePeopleBatch(String deviceIds) { final String iqiyiUrl = PropertyUtil.getProperty("config.properties", "iqiyi.lahuo.url"); String encryptType = "1"; String deviceIdType = "1"; /* String deviceIds = request.getString("deviceIds"); if (request.containsKey("encryptType")) { encryptType = request.getString("encryptType"); } if (request.containsKey("deviceIdType")) { deviceIdType = request.getString("deviceIdType"); } */ Map<String, String> map = new HashMap<>(); map.put("jobId", "40"); map.put("deviceIds", deviceIds); map.put("encryptType", encryptType); map.put("deviceIdType", deviceIdType); String secretKey = "57bdede217ae941222f470d047c3ceac"; String signResult = sign(map, secretKey); JSONObject requestBody = JSONObject.parseObject(JSON.toJSONString(map)); requestBody.put("sign", signResult); // logger.info("requestBody -->> " + requestBody); CloseableHttpClient client = HttpClients.createDefault(); RequestConfig requestConfig = RequestConfig.custom() .setConnectTimeout(3000).setConnectionRequestTimeout(3000) .setSocketTimeout(0).build(); HttpPost post = new HttpPost(iqiyiUrl); post.setHeader("Content-Type", "application/json;charset=utf-8"); JSONObject jsonObject = new JSONObject(); HttpResponse response; try { post.setConfig(requestConfig); post.setEntity(new StringEntity(requestBody.toJSONString())); response = client.execute(post); BufferedReader rd = new BufferedReader( new InputStreamReader(response.getEntity().getContent())); StringBuilder result = new StringBuilder(); String line; while ((line = rd.readLine()) != null) { result.append(line); } // logger.info("result -->> " + result.toString()); jsonObject = Constants.String2JSONObject(result.toString()); } catch (IOException e) { // logger.info("IOException -->> " + e.getMessage()); } finally { post.abort(); client.getConnectionManager().shutdown(); try { client.close(); } catch (IOException e) { // logger.info("IOException -->> " + e.getMessage()); } } return jsonObject; } public static String sign(Map<String, String> params, String secretKey) { SortedMap<String, String> sortedParams = new TreeMap<>(params); StringBuilder sb = new StringBuilder(); for (String key : sortedParams.keySet()) { String v = sortedParams.get(key); sb.append(key).append("=").append(StringUtils.defaultIfEmpty(v, "")).append("&"); } final String data = sb.append(secretKey).toString(); return DigestUtils.md5Hex(data); } }