package mobvista.dmp.datasource.ali;

import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.joran.JoranConfigurator;
import ch.qos.logback.core.joran.spi.JoranException;
import com.alibaba.fastjson.JSONObject;
import com.google.common.util.concurrent.*;
import mobvista.dmp.common.ClickHouseJdbc;
import mobvista.dmp.util.DateUtil;
import org.apache.http.HttpResponse;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import ru.yandex.clickhouse.ClickHouseConnection;
import ru.yandex.clickhouse.except.ClickHouseUnknownException;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.*;
import java.util.concurrent.*;

/**
 * Batch job that reads device ids from {@code dmp.youku_laxin_daily_all} in ClickHouse,
 * posts each one to the YOUKU query endpoint on a shared thread pool, and writes the raw
 * responses into {@code dmp.youku_laxin_result_daily}.
 *
 * <p>Command line (all four arguments must be given, otherwise the defaults are used):
 * {@code <fors> <concurrent> <dt yyyy-MM-dd> <device_type>}.
 */
public class YOUKURequest {
    /** Configured by {@link #main(String[])}; {@code null} until then. */
    public static Logger logger = null;

    static ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(300, 500, 1000, TimeUnit.MILLISECONDS,
            new LinkedBlockingDeque<>(500), new CustomizableThreadFactory("YOUKURequest-"), new ThreadPoolExecutor.CallerRunsPolicy());

    private static String dt = DateUtil.format(new Date(), "yyyy-MM-dd");

    /** Number of fetched pages accumulated before their results are written to ClickHouse. */
    private static final int BATCHES_PER_INSERT = 10;

    /** Pause before the single retry of a failed ClickHouse call, in milliseconds. */
    private static final long RETRY_DELAY_MS = 100L;

    /**
     * Entry point: pages through the source table {@code fors} times, {@code concurrent} rows
     * per page, and flushes the collected responses every {@link #BATCHES_PER_INSERT} pages
     * (plus once more at the end if a partial group is left over).
     *
     * @param args optional {@code fors, concurrent, dt, device_type}
     * @throws JoranException if the logback configuration cannot be loaded
     */
    public static void main(String[] args) throws JoranException {
        LoggerContext context = (LoggerContext) LoggerFactory.getILoggerFactory();
        JoranConfigurator configurator = new JoranConfigurator();
        configurator.setContext(context);
        context.reset();
        configurator.doConfigure(YOUKURequest.class.getClassLoader().getResourceAsStream("logback-syslog.xml"));
        logger = context.getLogger("YOUKURequest");

        int fors = 30;
        int concurrent = 10000;
        String device_type = "";
        if (args.length >= 4) {
            fors = Integer.parseInt(args[0]);
            concurrent = Integer.parseInt(args[1]);
            dt = args[2];
            device_type = args[3];
        }

        long start = System.currentTimeMillis();

        // Decorate the pool and register the shutdown hook once; doing it per page would
        // pile up one JVM shutdown hook per iteration.
        ListeningExecutorService listeningExecutor = MoreExecutors.listeningDecorator(poolExecutor);
        MoreExecutors.addDelayedShutdownHook(listeningExecutor, 5, TimeUnit.SECONDS);

        // The source partition is the day before dt; it does not change between pages.
        String date = DateUtil.getDayByString(dt, "yyyy-MM-dd", -1);

        // Only this thread touches the list, so a plain ArrayList is sufficient.
        List<ListenableFuture<String>> futures = new ArrayList<>();
        long min_start = System.currentTimeMillis();
        try {
            for (int c = 1; c <= fors; c++) {
                long in_start = System.currentTimeMillis();
                ClickHouseConnection connection = openConnection(c);
                try {
                    if (connection != null) {
                        submitPage(listeningExecutor, connection, date, device_type,
                                concurrent * (c - 1), concurrent, futures);
                    }
                    logger.info("Times -->> " + c + ", Runtime -->> " + (System.currentTimeMillis() - in_start));

                    // Also flush after the last page so a trailing partial group is not lost.
                    boolean trailingGroup = c == fors && !futures.isEmpty();
                    if (c % BATCHES_PER_INSERT == 0 || trailingGroup) {
                        int round = (c + BATCHES_PER_INSERT - 1) / BATCHES_PER_INSERT;
                        boolean inserted = flush(connection, futures, device_type, round);
                        futures = new ArrayList<>();
                        long elapsed = System.currentTimeMillis() - min_start;
                        if (inserted) {
                            logger.info("ClickHouse Insert Success! Times -->> " + round + ", Runtime -->> " + elapsed);
                        } else {
                            logger.warn("ClickHouse Insert Failed! Times -->> " + round + ", Runtime -->> " + elapsed);
                        }
                        min_start = System.currentTimeMillis();
                    }
                } finally {
                    closeConnection(connection);
                }
            }
        } finally {
            poolExecutor.shutdown();
        }
        long end = System.currentTimeMillis();
        logger.info("all runtime -->> " + (end - start));
    }

    /**
     * Opens a ClickHouse connection for page {@code c}, retrying once on failure.
     *
     * @return the connection, or {@code null} if both attempts failed
     */
    private static ClickHouseConnection openConnection(int c) {
        try {
            return ClickHouseJdbc.connectionByTime(c);
        } catch (SQLException | InterruptedException firstFailure) {
            try {
                return ClickHouseJdbc.connectionByTime(c);
            } catch (SQLException e) {
                logger.error("ClickHouse Connection Failure!", e);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                logger.error("ClickHouse Connection Failure!", e);
            }
            return null;
        }
    }

    /**
     * Reads one page of device ids and submits one YOUKU request per row, appending the
     * resulting futures to {@code futures}. Failures are logged and end the page early.
     */
    private static void submitPage(ListeningExecutorService executor, Connection connection, String date,
                                   String deviceType, int offset, int size,
                                   List<ListenableFuture<String>> futures) {
        ResultSet resultSet;
        try {
            resultSet = query(connection, date, deviceType, offset, size);
        } catch (SQLException e) {
            logger.error("ClickHouse Query Failure!", e);
            return;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.error("ClickHouse Query Failure!", e);
            return;
        }
        try {
            while (resultSet.next()) {
                final String deviceIds = resultSet.getString("device_ids");
                Callable<String> task = () -> getLaxinResult(deviceIds, deviceType);
                futures.add(executor.submit(task));
            }
        } catch (SQLException e) {
            logger.error("ClickHouse Execute Failure!", e);
        } finally {
            closeResultSet(resultSet);
        }
    }

    /**
     * Writes the collected responses, stamped with the current date and hour.
     *
     * @return {@code true} if the insert completed without an exception
     */
    private static boolean flush(Connection connection, List<ListenableFuture<String>> futures,
                                 String deviceType, int round) {
        try {
            logger.info("Times -->> " + round + ", ClickHouse begin Insert~~");
            String datetime = DateUtil.format(new Date(), "yyyy-MM-dd HH");
            insertYOUKU(connection, futures, datetime.substring(0, 10), datetime.substring(11, 13), deviceType);
            logger.info("Times -->> " + round + ", ClickHouse Insert Success! Size -->> " + futures.size());
            return true;
        } catch (SQLException e) {
            logger.error("ClickHouse Insert Failure!", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.error("ClickHouse Insert Failure!", e);
        }
        return false;
    }

    /** Closes a result set together with the statement that produced it; errors are logged. */
    private static void closeResultSet(ResultSet resultSet) {
        if (resultSet == null) {
            return;
        }
        try {
            // Fetch the owner first: getStatement() is not guaranteed to work once closed.
            java.sql.Statement owner = resultSet.getStatement();
            resultSet.close();
            if (owner != null) {
                owner.close();
            }
        } catch (SQLException e) {
            logger.warn("ClickHouse ResultSet close failure", e);
        }
    }

    /** Closes a connection if one was opened; errors are logged. */
    private static void closeConnection(Connection connection) {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (SQLException e) {
            logger.warn("ClickHouse Connection close failure", e);
        }
    }

    /**
     * Selects one page of {@code device_ids} from {@code dmp.youku_laxin_daily_all}.
     * A {@link ClickHouseUnknownException} triggers one retry after a short pause.
     *
     * <p>The returned result set keeps its statement open; callers should close both
     * (the statement is reachable through {@link ResultSet#getStatement()}).
     *
     * @param connection  open ClickHouse connection
     * @param dt          partition date, {@code yyyy-MM-dd}
     * @param device_type device type filter
     * @param offSet      number of rows to skip
     * @param size        maximum number of rows to return
     * @return the open result set
     * @throws SQLException         if the query fails (after the retry, where applicable)
     * @throws InterruptedException if interrupted while waiting to retry
     */
    public static ResultSet query(Connection connection, String dt, String device_type, int offSet, int size) throws SQLException, InterruptedException {
        // Bound parameters instead of string substitution: dt and device_type come from the command line.
        final String sql = "SELECT device_ids FROM dmp.youku_laxin_daily_all WHERE dt = ? and device_type = ? LIMIT ?, ?";
        try {
            return runPageQuery(connection, sql, dt, device_type, offSet, size);
        } catch (ClickHouseUnknownException e) {
            Thread.sleep(RETRY_DELAY_MS);
            return runPageQuery(connection, sql, dt, device_type, offSet, size);
        }
    }

    /** Prepares, binds and executes the page query; the statement is closed if execution fails. */
    private static ResultSet runPageQuery(Connection connection, String sql, String dt, String deviceType,
                                          int offset, int size) throws SQLException {
        PreparedStatement statement = connection.prepareStatement(sql);
        try {
            statement.setString(1, dt);
            statement.setString(2, deviceType);
            statement.setInt(3, offset);
            statement.setInt(4, size);
            return statement.executeQuery();
        } catch (SQLException | RuntimeException e) {
            try {
                statement.close();
            } catch (SQLException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
    }

    /**
     * Waits for every future and batch-inserts the successful results into
     * {@code dmp.youku_laxin_result_daily}. Futures that completed exceptionally are logged
     * and skipped. A failed {@code executeBatch} is retried once after a short pause.
     *
     * @param connection  open ClickHouse connection
     * @param futures     pending YOUKU responses
     * @param dt          value for the {@code dt} column
     * @param hour        value for the {@code hour} column
     * @param device_type value for the {@code device_type} column
     * @throws SQLException         if {@code connection} is {@code null} or the insert fails
     * @throws InterruptedException if interrupted while waiting for a future or for the retry
     */
    public static void insertYOUKU(Connection connection, List<ListenableFuture<String>> futures, String dt, String hour, String device_type) throws SQLException, InterruptedException {
        if (connection == null) {
            throw new SQLException("No ClickHouse connection available for insert");
        }
        try (PreparedStatement preparedStatement = connection.prepareStatement(
                "INSERT INTO dmp.youku_laxin_result_daily (dt, hour, device_type,device_ids) VALUES (?, ?, ?, ?)")) {
            logger.info("INSERT begin===");
            int rows = 0;
            for (ListenableFuture<String> future : futures) {
                try {
                    String result = future.get();
                    preparedStatement.setString(1, dt);
                    preparedStatement.setString(2, hour);
                    preparedStatement.setString(3, device_type);
                    preparedStatement.setString(4, result);
                    preparedStatement.addBatch();
                    rows++;
                } catch (ExecutionException e) {
                    logger.warn("YOUKU request failed, row skipped", e.getCause());
                }
            }
            if (rows == 0) {
                logger.info("INSERT skipped, no rows");
                return;
            }
            try {
                preparedStatement.executeBatch();
            } catch (ClickHouseUnknownException | NullPointerException e) {
                // NOTE(review): NullPointerException is kept from the original retry condition;
                // presumably the driver can raise it during executeBatch - confirm.
                logger.info("INSERT -->> " + e.getMessage());
                Thread.sleep(RETRY_DELAY_MS);
                preparedStatement.executeBatch();
            }
            logger.info("INSERT SUCCESS!");
        }
    }

    /**
     * Posts one query to the YOUKU endpoint and returns the response body.
     *
     * <p>{@code OAID_MD5} is sent as {@code hashOaid}, {@code IMEI_MD5} as {@code hashImei};
     * any other device type sends an empty query item.
     *
     * @param deviceId   value taken from the {@code device_ids} column
     * @param deviceType {@code OAID_MD5} or {@code IMEI_MD5}
     * @return the response body with line breaks removed; empty (or partial) if an I/O error
     *         occurred or the response carried no entity
     */
    public static String getLaxinResult(String deviceId, String deviceType) {
        final String youkuUrl = "https://youku-dsp.youku.com/open/query";

        Map<String, String> dataMap = new HashMap<>();
        if ("OAID_MD5".equals(deviceType)) {
            dataMap.put("hashOaid", deviceId);
        } else if ("IMEI_MD5".equals(deviceType)) {
            dataMap.put("hashImei", deviceId);
        }

        JSONObject requestBody = new JSONObject();
        requestBody.put("company", "huishi");
        requestBody.put("deviceOs", 2);
        requestBody.put("queryItems", Collections.singletonList(dataMap));

        // NOTE(review): a socket timeout of 0 means "wait forever"; a stalled response will
        // hold a worker thread (and the insert waiting on it) indefinitely - confirm intended.
        RequestConfig requestConfig = RequestConfig.custom()
                .setConnectTimeout(3000).setConnectionRequestTimeout(3000)
                .setSocketTimeout(0).build();

        HttpPost post = new HttpPost(youkuUrl);
        post.setHeader("Content-Type", "application/json;charset=utf-8");
        post.setConfig(requestConfig);
        // Encode explicitly so the body matches the charset announced in the header.
        post.setEntity(new StringEntity(requestBody.toJSONString(), java.nio.charset.StandardCharsets.UTF_8));

        StringBuilder result = new StringBuilder();
        try (CloseableHttpClient client = HttpClients.createDefault()) {
            HttpResponse response = client.execute(post);
            if (response.getEntity() != null) {
                try (BufferedReader rd = new BufferedReader(new InputStreamReader(
                        response.getEntity().getContent(), java.nio.charset.StandardCharsets.UTF_8))) {
                    String line;
                    while ((line = rd.readLine()) != null) {
                        result.append(line);
                    }
                }
            }
        } catch (IOException e) {
            // Best effort: a failed request yields whatever was read so far (usually nothing).
            if (logger != null) {
                logger.debug("YOUKU request IOException -->> " + e.getMessage());
            }
        } finally {
            post.abort();
        }
        return result.toString();
    }

}