package mobvista.dmp.datasource.iqiyi;

import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.joran.JoranConfigurator;
import ch.qos.logback.core.joran.spi.JoranException;
import com.alibaba.fastjson.JSONObject;
import com.google.common.util.concurrent.*;
import mobvista.dmp.util.DateUtil;
import mobvista.dmp.util.PropertyUtil;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStreamReader;
import java.util.Date;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * @package: mobvista.dmp.datasource.iqiyi
 * @author: wangjf
 * @date: 2020/9/11
 * @time: 10:02 下午
 * @email: jinfeng.wang@mobvista.com
 * @phone: 152-1062-7698
 */
public class IQiYiReadFile {

    public static ch.qos.logback.classic.Logger logger = null;

    private static final String INPUT = PropertyUtil.getProperty("config.properties", "iqiyi.lahuo.input.dir");

    private static String dt = DateUtil.format(new Date(), "yyyyMMdd");

    public static void main(String[] args) throws JoranException {
        LoggerContext context = (LoggerContext) LoggerFactory.getILoggerFactory();
        JoranConfigurator configurator = new JoranConfigurator();
        configurator.setContext(context);
        context.reset();
        configurator.doConfigure(IQiYiMain.class.getClassLoader().getResourceAsStream("logback-syslog.xml"));
        logger = context.getLogger("IQiYiRequest");


        ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(300, 500, 500, TimeUnit.MILLISECONDS,
                new LinkedBlockingDeque<>(500), new CustomizableThreadFactory("IQiYiRequest-"), new ThreadPoolExecutor.CallerRunsPolicy());

        int size = 1;
        if (args.length >= 2) {
            size = Integer.parseInt(args[0]);
            dt = args[1];
        }

        for (int i = 0; i < size; i++) {
            ListeningExecutorService listeningExecutor = MoreExecutors.listeningDecorator(poolExecutor);
            MoreExecutors.addDelayedShutdownHook(listeningExecutor, 5, TimeUnit.SECONDS);
            long start = System.currentTimeMillis();
            String filePath = INPUT + dt + "_" + i + ".txt";
            try {
                List<ListenableFuture<String>> futures = new CopyOnWriteArrayList<>();
                List<String> resultList = new CopyOnWriteArrayList<>();
                File file = new File(filePath);
                // 判断文件是否存在
                if (file.isFile() && file.exists()) {
                    // 考虑到编码格式
                    InputStreamReader read = new InputStreamReader(new FileInputStream(file));
                    BufferedReader bufferedReader = new BufferedReader(read);
                    String lineTxt;
                    int j = 1;
                    long in_start = System.currentTimeMillis();
                    while ((lineTxt = bufferedReader.readLine()) != null) {
                        //  logger.info(lineTxt);
                        final String deviceIds = lineTxt;
                        ListenableFuture listenableFuture = listeningExecutor.submit(() -> {
                            JSONObject jsonObject = IQiYiRequest.devicePeopleBatch(deviceIds);
                            return jsonObject.toJSONString();
                        });

                        Futures.addCallback(listenableFuture, new FutureCallback<String>() {
                            @Override
                            public void onSuccess(String result) {
                                resultList.add(result);
                            }

                            @Override
                            public void onFailure(Throwable t) {
                                resultList.add(new JSONObject().toJSONString());
                            }
                        });
                        futures.add(listenableFuture);
                        /*
                        if (j % 100000 == 0) {
                            ClickHouseConnection connection = null;
                            try {
                                connection = ClickHouseJdbc.connection();
                            } catch (SQLException e) {
                                try {
                                    connection = ClickHouseJdbc.connection();
                                } catch (SQLException ex) {
                                    logger.info("ClickHouse Connection Failure!");
                                }
                            }
                            try {
                                String datetime = DateUtil.format(new Date(), "yyyy-MM-dd HH");
                                IQiYiRequest.insertIQiYi(connection, futures, datetime.substring(0, 10), datetime.substring(11, 13));
                                logger.info("Times -->> " + resultList.size() / 100000 + ", ClickHouse Insert Success! Size -->> " + resultList.size());
                            } catch (SQLException e) {
                                logger.info("ClickHouse Insert Failure!");
                            }

                            long in_end = System.currentTimeMillis();
                            logger.info("Times -->> " + j / 100000 + ", Insert Size -->> " + futures.size() + ", Insert Run Time -->> " + (in_end - in_start));
                            futures = new CopyOnWriteArrayList<>();
                            in_start = System.currentTimeMillis();
                        }
                        */

                        //  j++;
                    }
                    bufferedReader.close();
                    read.close();
                } else {
                    System.out.println("找不到指定的文件");
                }
            } catch (Exception e) {
                System.out.println("读取文件内容出错");
                e.printStackTrace();
            }
            poolExecutor.shutdown();
            long end = System.currentTimeMillis();
            logger.info("Read File." + filePath + " SUCCESS! Run Time -->> " + (end - start));
        }
    }
}