package mobvista.dmp.datasource.ali; import ch.qos.logback.classic.Logger; import ch.qos.logback.classic.LoggerContext; import ch.qos.logback.classic.joran.JoranConfigurator; import ch.qos.logback.core.joran.spi.JoranException; import com.alibaba.fastjson.JSONObject; import com.google.common.util.concurrent.*; import mobvista.dmp.common.ClickHouseJdbc; import mobvista.dmp.util.DateUtil; import org.apache.http.HttpResponse; import org.apache.http.client.config.RequestConfig; import org.apache.http.client.methods.HttpPost; import org.apache.http.entity.StringEntity; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; import org.slf4j.LoggerFactory; import org.springframework.scheduling.concurrent.CustomizableThreadFactory; import ru.yandex.clickhouse.ClickHouseConnection; import ru.yandex.clickhouse.except.ClickHouseUnknownException; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.util.*; import java.util.concurrent.*; public class YOUKURequest { public static Logger logger = null; static ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(300, 500, 1000, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>(500), new CustomizableThreadFactory("YOUKURequest-"), new ThreadPoolExecutor.CallerRunsPolicy()); private static String dt = DateUtil.format(new Date(), "yyyy-MM-dd"); public static void main(String[] args) throws JoranException { LoggerContext context = (LoggerContext) LoggerFactory.getILoggerFactory(); JoranConfigurator configurator = new JoranConfigurator(); configurator.setContext(context); context.reset(); configurator.doConfigure(YOUKURequest.class.getClassLoader().getResourceAsStream("logback-syslog.xml")); logger = context.getLogger("YOUKURequest"); int fors = 30; int concurrent = 10000; String device_type=""; if (args.length >= 4) { fors = Integer.parseInt(args[0]); concurrent = Integer.parseInt(args[1]); dt = args[2]; device_type = args[3]; } long start = System.currentTimeMillis(); List<ListenableFuture<String>> futures = new CopyOnWriteArrayList<>(); long min_start = System.currentTimeMillis(); for (int c = 1; c <= fors; c++) { long in_start = System.currentTimeMillis(); ListeningExecutorService listeningExecutor = MoreExecutors.listeningDecorator(poolExecutor); MoreExecutors.addDelayedShutdownHook(listeningExecutor, 5, TimeUnit.SECONDS); ClickHouseConnection connection = null; ResultSet resultSet = null; try { connection = ClickHouseJdbc.connectionByTime(c); } catch (SQLException | InterruptedException e) { try { connection = ClickHouseJdbc.connectionByTime(c); } catch (SQLException | InterruptedException ex) { logger.info("ClickHouse Connection Failure!"); } } String date = DateUtil.getDayByString(dt, "yyyy-MM-dd", -1); try { assert connection != null; resultSet = query(connection, date, device_type,concurrent * (c - 1), concurrent); } catch (SQLException | InterruptedException e) { logger.info("ClickHouse Query Failure!"); } List<String> resultList = new CopyOnWriteArrayList<>(); try { while (resultSet.next()) { String device_ids = resultSet.getString("device_ids"); String finalDevice_type = device_type; ListenableFuture listenableFuture = listeningExecutor.submit(() -> { return getLaxinResult(device_ids, finalDevice_type); }); Futures.addCallback(listenableFuture, new FutureCallback<String>() { @Override public void onSuccess(String result) { resultList.add(result); } @Override public void onFailure(Throwable t) { resultList.add(new JSONObject().toJSONString()); } }); futures.add(listenableFuture); } } catch (SQLException e) { logger.info("ClickHouse Execute Failure!"); } long min_end = System.currentTimeMillis(); logger.info("Times -->> " + c + ", Runtime -->> " + (min_end - in_start)); if (c % 10 == 0) { try { logger.info("Times -->> " + c / 10 + ", ClickHouse begin Insert~~"); String datetime = DateUtil.format(new Date(), "yyyy-MM-dd HH"); insertYOUKU(connection, futures, datetime.substring(0, 10), datetime.substring(11, 13), device_type); logger.info("Times -->> " + c / 10 + ", ClickHouse Insert Success! Size -->> " + futures.size()); } catch (SQLException | InterruptedException e) { logger.info("ClickHouse Insert Failure!"); } futures = new CopyOnWriteArrayList<>(); min_end = System.currentTimeMillis(); logger.info("ClickHouse Insert Success! Times -->> " + c / 10 + ", Runtime -->> " + (min_end - min_start)); min_start = System.currentTimeMillis(); } } poolExecutor.shutdown(); long end = System.currentTimeMillis(); logger.info("all runtime -->> " + (end - start)); } public static ResultSet query(Connection connection, String dt, String device_type, int offSet, int size) throws SQLException, InterruptedException { ResultSet resultSet; String sql = "SELECT device_ids FROM dmp.youku_laxin_daily_all WHERE dt = '@dt' and device_type = '@device_type' LIMIT @offSet, @size"; sql = sql.replace("@dt", dt).replace("@device_type", device_type).replace("@offSet", String.valueOf(offSet)).replace("@size", String.valueOf(size)); try { resultSet = connection.createStatement().executeQuery(sql); return resultSet; } catch (ClickHouseUnknownException e) { Thread.sleep(100); resultSet = connection.createStatement().executeQuery(sql); return resultSet; } } public static void insertYOUKU(Connection connection, List<ListenableFuture<String>> futures, String dt, String hour,String device_type) throws SQLException, InterruptedException { PreparedStatement preparedStatement = null; try { preparedStatement = connection.prepareStatement("INSERT INTO dmp.youku_laxin_result_daily (dt, hour, device_type,device_ids) VALUES (?, ?, ?, ?)"); logger.info("INSERT begin==="); for (ListenableFuture<String> future : futures) { try { String result = future.get(); preparedStatement.setString(1, dt); preparedStatement.setString(2, hour); preparedStatement.setString(3, device_type); preparedStatement.setString(4, result); preparedStatement.addBatch(); } catch (ExecutionException e) { e.printStackTrace(); } } preparedStatement.executeBatch(); logger.info("INSERT SUCCESS!"); } catch (ClickHouseUnknownException | NullPointerException e) { logger.info("INSERT -->> " + e.getMessage()); Thread.sleep(100); assert preparedStatement != null; preparedStatement.executeBatch(); } finally { if (preparedStatement != null) { preparedStatement.close(); } } } public static String getLaxinResult(String deviceId,String deviceType) { final String youkuUrl = "https://youku-dsp.youku.com/open/query"; // Map<String, String> dataMap = new HashMap<>(); // dataMap.put("imei","008796754298691"); // dataMap.put("hashImei","3a0e6c21f3610532970eb7cd25b55efe"); // Map<String, String> hashImeimap = new HashMap<>(); // hashImeimap.put("hashImei","3a0e6c21f3610532970eb7cd25b55efe"); Map<String, String> dataMap = new HashMap<>(); if(deviceType.equals("OAID_MD5")){ dataMap.put("hashOaid",deviceId); }else if(deviceType.equals("IMEI_MD5")){ dataMap.put("hashImei",deviceId); } List<Map<String, String>> queryItems = Arrays.asList(dataMap); JSONObject requestBody = new JSONObject(); requestBody.put("company", "huishi"); requestBody.put("deviceOs", 2); requestBody.put("queryItems", queryItems); CloseableHttpClient client = HttpClients.createDefault(); RequestConfig requestConfig = RequestConfig.custom() .setConnectTimeout(3000).setConnectionRequestTimeout(3000) .setSocketTimeout(0).build(); HttpPost post = new HttpPost(youkuUrl); post.setHeader("Content-Type", "application/json;charset=utf-8"); JSONObject jsonObject = new JSONObject(); HttpResponse response; StringBuilder result = new StringBuilder(); try { post.setConfig(requestConfig); post.setEntity(new StringEntity(requestBody.toJSONString())); response = client.execute(post); BufferedReader rd = new BufferedReader( new InputStreamReader(response.getEntity().getContent())); String line; while ((line = rd.readLine()) != null) { result.append(line); } // logger.info("result -->> " + result.toString()); } catch (IOException e) { // logger.info("IOException -->> " + e.getMessage()); } finally { post.abort(); // client.getConnectionManager().shutdown(); try { client.close(); } catch (IOException e) { // logger.info("IOException -->> " + e.getMessage()); } } return result.toString(); } }