package mobvista.dmp.datasource.baichuan; import ch.qos.logback.classic.Logger; import ch.qos.logback.classic.LoggerContext; import ch.qos.logback.classic.joran.JoranConfigurator; import ch.qos.logback.core.joran.spi.JoranException; import com.alibaba.fastjson.JSONObject; import com.google.common.util.concurrent.*; import mobvista.dmp.common.ClickHouseJdbc; import mobvista.dmp.util.DateUtil; import org.slf4j.LoggerFactory; import org.springframework.scheduling.concurrent.CustomizableThreadFactory; import ru.yandex.clickhouse.ClickHouseConnection; import java.sql.ResultSet; import java.sql.SQLException; import java.util.Date; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** * @package: mobvista.dmp.datasource.baichuan * @author: wangjf * @date: 2019-08-31 * @time: 07:49 * @email: jinfeng.wang@mobvista.com * @phone: 152-1062-7698 */ public class BaiChuanMainV2 { private static String dt = DateUtil.format(new Date(), "yyyy-MM-dd"); static ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(100, 200, 500, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>(200), new CustomizableThreadFactory("BaiChuan"), new ThreadPoolExecutor.CallerRunsPolicy()); public static void main(String[] args) throws JoranException, InterruptedException { LoggerContext context = (LoggerContext) LoggerFactory.getILoggerFactory(); JoranConfigurator configurator = new JoranConfigurator(); configurator.setContext(context); context.reset(); configurator.doConfigure(BaiChuanMainV2.class.getClassLoader().getResourceAsStream("logback-syslog.xml")); Logger logger = context.getLogger("baichuan"); String appId = "2"; String appOs = "1"; int fors = 200; int concurrent = 2000; if (args.length >= 4) { appId = args[0]; appOs = args[1]; fors = Integer.parseInt(args[2]); concurrent = Integer.parseInt(args[3]); dt = args[4]; } long start = System.currentTimeMillis(); List<ListenableFuture<AsoDevice>> futures = new CopyOnWriteArrayList<>(); for (int c = 1; c <= fors; c++) { long min_start = System.currentTimeMillis(); ListeningExecutorService listeningExecutor = MoreExecutors.listeningDecorator(poolExecutor); MoreExecutors.addDelayedShutdownHook(listeningExecutor, 500, TimeUnit.SECONDS); ClickHouseConnection connection = null; ResultSet resultSet = null; try { connection = ClickHouseJdbc.connection(); } catch (SQLException e) { try { connection = ClickHouseJdbc.connection(); } catch (SQLException ex) { logger.info("ClickHouse Connection Failure!"); } } String date = DateUtil.getDayByString(dt, "yyyy-MM-dd", -1); try { resultSet = ClickHouseUtils.queryAli(connection, date, appId, appOs, concurrent * c, concurrent); } catch (SQLException e) { logger.info("ClickHouse Query Failure!"); } List<AsoDevice> resultList = new CopyOnWriteArrayList<>(); try { while (resultSet.next()) { String deviceId = resultSet.getString("device_id"); AsoDevice asoDevice = returnClass(appId, appOs, deviceId); int finalAppOs = Integer.parseInt(appOs); ListenableFuture listenableFuture = listeningExecutor.submit(() -> { JSONObject jsonObject = BaiChuanServerV2.request(logger, asoDevice, finalAppOs); if (jsonObject.getBoolean("result")) { asoDevice.setPackageName("0"); } return asoDevice; }); Futures.addCallback(listenableFuture, new FutureCallback<AsoDevice>() { @Override public void onSuccess(AsoDevice asoDevices) { resultList.add(asoDevices); } @Override public void onFailure(Throwable t) { asoDevice.setPackageName("0"); resultList.add(asoDevice); } }); futures.add(listenableFuture); } } catch (SQLException e) { logger.info("ClickHouse Execute Failure!"); } try { ClickHouseUtils.insert2(connection, futures); logger.info("Times -->> " + c + ", ClickHouse Insert Success! Size -->> " + futures.size()); } catch (SQLException e) { logger.info("ClickHouse Insert Failure!"); } futures = new CopyOnWriteArrayList<>(); long min_end = System.currentTimeMillis(); logger.info("Times -->> " + c + ", Runtime -->> " + (min_end - min_start)); } poolExecutor.shutdown(); long end = System.currentTimeMillis(); logger.info("all runtime -->> " + (end - start)); } private static AsoDevice returnClass(String appId, String appOs, String deviceId) { AsoDevice asoDevice = new AsoDevice(); if ("1".equals(appId)) { if ("1".equals(appOs)) { asoDevice.setDeviceId(deviceId); asoDevice.setDeviceType("imei"); asoDevice.setPlatform("android"); asoDevice.setPackageName("com.tmall.wireless"); } else if ("2".equals(appOs)) { asoDevice.setDeviceId(deviceId); asoDevice.setDeviceType("idfa"); asoDevice.setPlatform("ios"); asoDevice.setPackageName("518966501"); } else { asoDevice.setDeviceId(deviceId); asoDevice.setDeviceType("imeimd5"); asoDevice.setPlatform("android"); asoDevice.setPackageName("com.tmall.wireless"); } } else { if ("1".equals(appOs)) { asoDevice.setDeviceId(deviceId); asoDevice.setDeviceType("imei"); asoDevice.setPlatform("android"); asoDevice.setPackageName("com.taobao.taobao"); } else if ("2".equals(appOs)) { asoDevice.setDeviceId(deviceId); asoDevice.setDeviceType("idfa"); asoDevice.setPlatform("ios"); asoDevice.setPackageName("387682726"); } else { asoDevice.setDeviceId(deviceId); asoDevice.setDeviceType("imeimd5"); asoDevice.setPlatform("android"); asoDevice.setPackageName("com.taobao.taobao"); } } asoDevice.setDt(dt); return asoDevice; } }