package mobvista.dmp.datasource.iqiyi; import ch.qos.logback.classic.LoggerContext; import ch.qos.logback.classic.joran.JoranConfigurator; import ch.qos.logback.core.joran.spi.JoranException; import com.alibaba.fastjson.JSONObject; import com.google.common.util.concurrent.*; import mobvista.dmp.util.DateUtil; import mobvista.dmp.util.PropertyUtil; import org.slf4j.LoggerFactory; import org.springframework.scheduling.concurrent.CustomizableThreadFactory; import java.io.BufferedReader; import java.io.File; import java.io.FileInputStream; import java.io.InputStreamReader; import java.util.Date; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** * @package: mobvista.dmp.datasource.iqiyi * @author: wangjf * @date: 2020/9/11 * @time: 10:02 下午 * @email: jinfeng.wang@mobvista.com * @phone: 152-1062-7698 */ public class IQiYiReadFile { public static ch.qos.logback.classic.Logger logger = null; private static final String INPUT = PropertyUtil.getProperty("config.properties", "iqiyi.lahuo.input.dir"); private static String dt = DateUtil.format(new Date(), "yyyyMMdd"); public static void main(String[] args) throws JoranException { LoggerContext context = (LoggerContext) LoggerFactory.getILoggerFactory(); JoranConfigurator configurator = new JoranConfigurator(); configurator.setContext(context); context.reset(); configurator.doConfigure(IQiYiMain.class.getClassLoader().getResourceAsStream("logback-syslog.xml")); logger = context.getLogger("IQiYiRequest"); ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(300, 500, 500, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>(500), new CustomizableThreadFactory("IQiYiRequest-"), new ThreadPoolExecutor.CallerRunsPolicy()); int size = 1; if (args.length >= 2) { size = Integer.parseInt(args[0]); dt = args[1]; } for (int i = 0; i < size; i++) { ListeningExecutorService listeningExecutor = MoreExecutors.listeningDecorator(poolExecutor); MoreExecutors.addDelayedShutdownHook(listeningExecutor, 5, TimeUnit.SECONDS); long start = System.currentTimeMillis(); String filePath = INPUT + dt + "_" + i + ".txt"; try { List<ListenableFuture<String>> futures = new CopyOnWriteArrayList<>(); List<String> resultList = new CopyOnWriteArrayList<>(); File file = new File(filePath); // 判断文件是否存在 if (file.isFile() && file.exists()) { // 考虑到编码格式 InputStreamReader read = new InputStreamReader(new FileInputStream(file)); BufferedReader bufferedReader = new BufferedReader(read); String lineTxt; int j = 1; long in_start = System.currentTimeMillis(); while ((lineTxt = bufferedReader.readLine()) != null) { // logger.info(lineTxt); final String deviceIds = lineTxt; ListenableFuture listenableFuture = listeningExecutor.submit(() -> { JSONObject jsonObject = IQiYiRequest.devicePeopleBatch(deviceIds); return jsonObject.toJSONString(); }); Futures.addCallback(listenableFuture, new FutureCallback<String>() { @Override public void onSuccess(String result) { resultList.add(result); } @Override public void onFailure(Throwable t) { resultList.add(new JSONObject().toJSONString()); } }); futures.add(listenableFuture); /* if (j % 100000 == 0) { ClickHouseConnection connection = null; try { connection = ClickHouseJdbc.connection(); } catch (SQLException e) { try { connection = ClickHouseJdbc.connection(); } catch (SQLException ex) { logger.info("ClickHouse Connection Failure!"); } } try { String datetime = DateUtil.format(new Date(), "yyyy-MM-dd HH"); IQiYiRequest.insertIQiYi(connection, futures, datetime.substring(0, 10), datetime.substring(11, 13)); logger.info("Times -->> " + resultList.size() / 100000 + ", ClickHouse Insert Success! Size -->> " + resultList.size()); } catch (SQLException e) { logger.info("ClickHouse Insert Failure!"); } long in_end = System.currentTimeMillis(); logger.info("Times -->> " + j / 100000 + ", Insert Size -->> " + futures.size() + ", Insert Run Time -->> " + (in_end - in_start)); futures = new CopyOnWriteArrayList<>(); in_start = System.currentTimeMillis(); } */ // j++; } bufferedReader.close(); read.close(); } else { System.out.println("找不到指定的文件"); } } catch (Exception e) { System.out.println("读取文件内容出错"); e.printStackTrace(); } poolExecutor.shutdown(); long end = System.currentTimeMillis(); logger.info("Read File." + filePath + " SUCCESS! Run Time -->> " + (end - start)); } } }