package mobvista.dmp.datasource.ali; import ch.qos.logback.classic.Logger; import ch.qos.logback.classic.LoggerContext; import ch.qos.logback.classic.joran.JoranConfigurator; import ch.qos.logback.core.joran.spi.JoranException; import com.alibaba.fastjson.JSONObject; import com.google.common.util.concurrent.*; import mobvista.dmp.common.ClickHouseJdbc; import mobvista.dmp.util.DateUtil; import org.apache.http.HttpResponse; import org.apache.http.HttpStatus; import org.apache.http.client.HttpClient; import org.apache.http.client.config.RequestConfig; import org.apache.http.client.methods.HttpPost; import org.apache.http.entity.StringEntity; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.DefaultHttpClient; import org.apache.http.impl.client.HttpClientBuilder; import org.apache.http.impl.client.HttpClients; import org.apache.http.message.BasicHeader; import org.apache.http.protocol.HTTP; import org.slf4j.LoggerFactory; import org.springframework.scheduling.concurrent.CustomizableThreadFactory; import ru.yandex.clickhouse.ClickHouseConnection; import ru.yandex.clickhouse.except.ClickHouseUnknownException; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.util.Arrays; import java.util.Date; import java.util.List; import java.util.concurrent.*; public class UCRequest { public static Logger logger = null; static ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(100, 180, 500, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>(180), new CustomizableThreadFactory("UCRequest-"), new ThreadPoolExecutor.CallerRunsPolicy()); private static String dt = DateUtil.format(new Date(), "yyyy-MM-dd"); public static void main(String[] args) throws InterruptedException,JoranException { LoggerContext context = (LoggerContext) LoggerFactory.getILoggerFactory(); JoranConfigurator configurator = new JoranConfigurator(); configurator.setContext(context); context.reset(); configurator.doConfigure(UCRequest.class.getClassLoader().getResourceAsStream("logback-syslog.xml")); logger = context.getLogger("UCRequest"); int fors = 30; int concurrent = 10000; String device_type=""; if (args.length >= 4) { fors = Integer.parseInt(args[0]); concurrent = Integer.parseInt(args[1]); dt = args[2]; device_type = args[3]; } List<String> crowdCodesList =null; // 2021.05.12 调整crowdCodes if(device_type.equals("IMEI_MD5")){ crowdCodesList = Arrays.asList("d3f521a0253bc032f245530c18e47348","304fcdfc329107ce4817e0606dc4bc80"); }else if(device_type.equals("OAID_MD5")){ crowdCodesList = Arrays.asList("aff149ab9af9da4d95b4a448f24d922f","4b5a58c23b3bd5e171be0cc430593086"); } long start = System.currentTimeMillis(); List<ListenableFuture<String>> futures = new CopyOnWriteArrayList<>(); long min_start = System.currentTimeMillis(); for (int c = 1; c <= fors; c++) { long in_start = System.currentTimeMillis(); ListeningExecutorService listeningExecutor = MoreExecutors.listeningDecorator(poolExecutor); MoreExecutors.addDelayedShutdownHook(listeningExecutor, 5, TimeUnit.SECONDS); ClickHouseConnection connection = null; ResultSet resultSet = null; try { connection = ClickHouseJdbc.connectionByTime(c); } catch (SQLException e) { try { connection = ClickHouseJdbc.connectionByTime(c); } catch (SQLException ex) { logger.info("ClickHouse Connection Failure!"); } } String date = DateUtil.getDayByString(dt, "yyyy-MM-dd", -1); try { assert connection != null; resultSet = query(connection, date, device_type,concurrent * (c - 1), concurrent); } catch (SQLException e) { logger.info("ClickHouse Query Failure!"); } List<String> resultList = new CopyOnWriteArrayList<>(); try { while (resultSet.next()) { String device_ids = resultSet.getString("device_ids"); String finalDevice_type = device_type; List<String> finalCrowdCodesList = crowdCodesList; ListenableFuture listenableFuture = listeningExecutor.submit(() -> { String[] split = device_ids.split(","); List<String> uidsList = Arrays.asList(split); return PostJSONSetTimeout(uidsList, finalDevice_type, finalCrowdCodesList); }); Futures.addCallback(listenableFuture, new FutureCallback<String>() { @Override public void onSuccess(String result) { resultList.add(result); } @Override public void onFailure(Throwable t) { resultList.add(new JSONObject().toJSONString()); } }); futures.add(listenableFuture); } } catch (SQLException e) { logger.info("ClickHouse Execute Failure!"); } long min_end = System.currentTimeMillis(); logger.info("Times -->> " + c + ", Runtime -->> " + (min_end - in_start)); if (c % 10 == 0) { try { logger.info("Times -->> " + c / 10 + ", ClickHouse begin Insert~~"); String datetime = DateUtil.format(new Date(), "yyyy-MM-dd HH"); insertUC(connection, futures, datetime.substring(0, 10), datetime.substring(11, 13), device_type); logger.info("Times -->> " + c / 10 + ", ClickHouse Insert Success! Size -->> " + futures.size()); } catch (SQLException e) { logger.info("ClickHouse Insert Failure!"); } futures = new CopyOnWriteArrayList<>(); min_end = System.currentTimeMillis(); logger.info("ClickHouse Insert Success! Times -->> " + c / 10 + ", Runtime -->> " + (min_end - min_start)); min_start = System.currentTimeMillis(); } } poolExecutor.shutdown(); long end = System.currentTimeMillis(); logger.info("all runtime -->> " + (end - start)); } public static ResultSet query(Connection connection, String dt,String device_type, int offSet, int size) throws SQLException, InterruptedException { ResultSet resultSet; String sql = "SELECT device_ids FROM dmp.uc_lahuo_daily_all WHERE dt = '@dt' and device_type = '@device_type' LIMIT @offSet, @size"; sql = sql.replace("@dt", dt).replace("@device_type", device_type).replace("@offSet", String.valueOf(offSet)).replace("@size", String.valueOf(size)); try { resultSet = connection.createStatement().executeQuery(sql); return resultSet; } catch (ClickHouseUnknownException e) { Thread.sleep(100); resultSet = connection.createStatement().executeQuery(sql); return resultSet; } } public static void insertUC(Connection connection, List<ListenableFuture<String>> futures, String dt, String hour,String device_type) throws SQLException, InterruptedException { PreparedStatement preparedStatement = null; try { preparedStatement = connection.prepareStatement("INSERT INTO dmp.uc_lahuo_result_daily (dt, hour, device_type,device_ids) VALUES (?, ?, ?, ?)"); logger.info("INSERT begin==="); for (ListenableFuture<String> future : futures) { try { String result = future.get(); preparedStatement.setString(1, dt); preparedStatement.setString(2, hour); preparedStatement.setString(3, device_type); preparedStatement.setString(4, result); preparedStatement.addBatch(); } catch (ExecutionException e) { e.printStackTrace(); } } preparedStatement.executeBatch(); logger.info("INSERT SUCCESS!"); } catch (ClickHouseUnknownException | NullPointerException e) { logger.info("INSERT -->> " + e.getMessage()); Thread.sleep(100); assert preparedStatement != null; preparedStatement.executeBatch(); } finally { if (preparedStatement != null) { preparedStatement.close(); } } } public static String PostJSON( List<String> uidsList,String uidType,List<String> crowdCodesList) { String url = "https://ugp.uc.cn/crowd/batch/v2"; JSONObject json = new JSONObject(); // List<String> uidsList = Arrays.asList("00200072f915c1ca9dee20cedeb275a5", "00019c55395e6f73ccae5b5ef3aec28c", "0015685ac69cd22edf8e8ebe1e9cd71f", "00156dcfe3263865123d0aace1643688","002b3c08245db0c7c70742c4480d3d72"); // List<String> crowdCodesList = Arrays.asList("d3f521a0253bc032f245530c18e47348", "4b5a58c23b3bd5e171be0cc430593086","33c7777291f475dd2e3d15872a3b9c76","304fcdfc329107ce4817e0606dc4bc80"); json.put("uids", uidsList); json.put("uidType", uidType); json.put("crowdCodes", crowdCodesList); json.put("product", "UC"); json.put("channel", "Mintegral"); // HttpClient client = new DefaultHttpClient(); // HttpClient client = HttpClientBuilder.create().build(); CloseableHttpClient client = HttpClients.createDefault(); RequestConfig requestConfig = RequestConfig.custom() .setConnectTimeout(4000).setConnectionRequestTimeout(4000) .setSocketTimeout(4000).build(); HttpPost post = new HttpPost(url); post.setHeader("Content-Type", "application/json"); String result = ""; try { StringEntity s = new StringEntity(json.toString(), "utf-8"); s.setContentEncoding(new BasicHeader(HTTP.CONTENT_TYPE, "application/json")); post.setConfig(requestConfig); post.setEntity(s); // 发送请求 HttpResponse httpResponse = client.execute(post); // 获取响应输入流 InputStream inStream = httpResponse.getEntity().getContent(); BufferedReader reader = new BufferedReader(new InputStreamReader( inStream, "utf-8")); StringBuilder strber = new StringBuilder(); String line = null; while ((line = reader.readLine()) != null) strber.append(line); inStream.close(); result = strber.toString(); } catch (Exception e) { System.out.println("请求异常"); throw new RuntimeException(e); }finally{ post.abort(); try { client.close(); } catch (IOException e) { logger.info("IOException -->> " + e.getMessage()); } } return result; } public static String PostJSONSetTimeout( List<String> uidsList, String uidType, List<String> crowdCodesList) { String url = "https://ugp.uc.cn/crowd/batch/v2"; JSONObject requestBody = new JSONObject(); // List<String> crowdCodesList = Arrays.asList("d3f521a0253bc032f245530c18e47348","4b5a58c23b3bd5e171be0cc430593086","33c7777291f475dd2e3d15872a3b9c76","304fcdfc329107ce4817e0606dc4bc80"); requestBody.put("uids", uidsList); requestBody.put("uidType", uidType); requestBody.put("crowdCodes", crowdCodesList); requestBody.put("product", "UC"); requestBody.put("channel", "Mintegral"); CloseableHttpClient client = HttpClients.createDefault(); RequestConfig requestConfig = RequestConfig.custom() .setConnectTimeout(4000).setConnectionRequestTimeout(4000) .setSocketTimeout(4000).build(); HttpPost post = new HttpPost(url); post.setHeader("Content-Type", "application/json;charset=utf-8"); HttpResponse response; StringBuilder result = new StringBuilder(); try { post.setConfig(requestConfig); post.setEntity(new StringEntity(requestBody.toJSONString())); response = client.execute(post); BufferedReader rd = new BufferedReader( new InputStreamReader(response.getEntity().getContent())); String line; while ((line = rd.readLine()) != null) { result.append(line); } } catch (IOException e) { // e.printStackTrace(); // 屏蔽错误信息输出语句,提高运行速度 // System.out.println("IOException="+e.getMessage()); } finally { post.abort(); try { client.close(); } catch (IOException e) { logger.info("IOException -->> " + e.getMessage()); } } return result.toString(); } }