package mobvista.dmp.datasource.iqiyi;

import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.joran.JoranConfigurator;
import ch.qos.logback.core.joran.spi.JoranException;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.google.common.util.concurrent.*;
import mobvista.dmp.common.ClickHouseJdbc;
import mobvista.dmp.common.Constants;
import mobvista.dmp.util.DateUtil;
import mobvista.dmp.util.PropertyUtil;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.http.HttpResponse;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import ru.yandex.clickhouse.ClickHouseConnection;
import ru.yandex.clickhouse.except.ClickHouseUnknownException;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.*;
import java.util.concurrent.*;

/**
 * @package: mobvista.dmp.datasource.iqiyi
 * @author: wangjf
 * @date: 2020/9/10
 * @time: 2:40 下午
 * @email: jinfeng.wang@mobvista.com
 * @phone: 152-1062-7698
 */
public class IQiYiRequest {

    public static Logger logger = null;

    static ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(200, 300, 500, TimeUnit.MILLISECONDS,
            new LinkedBlockingDeque<>(300), new CustomizableThreadFactory("IQiYiRequest-"), new ThreadPoolExecutor.CallerRunsPolicy());

    private static String dt = DateUtil.format(new Date(), "yyyy-MM-dd");

    public static void main(String[] args) throws InterruptedException, JoranException {
        LoggerContext context = (LoggerContext) LoggerFactory.getILoggerFactory();
        JoranConfigurator configurator = new JoranConfigurator();
        configurator.setContext(context);
        context.reset();
        configurator.doConfigure(IQiYiMain.class.getClassLoader().getResourceAsStream("logback-syslog.xml"));
        logger = context.getLogger("IQiYiRequest");

        int fors = 30;
        int concurrent = 10000;
        if (args.length >= 3) {
            fors = Integer.parseInt(args[0]);
            concurrent = Integer.parseInt(args[1]);
            dt = args[2];
        }

        long start = System.currentTimeMillis();
        List<ListenableFuture<String>> futures = new CopyOnWriteArrayList<>();

        long min_start = System.currentTimeMillis();
        for (int c = 1; c <= fors; c++) {
            long in_start = System.currentTimeMillis();
            ListeningExecutorService listeningExecutor = MoreExecutors.listeningDecorator(poolExecutor);
            MoreExecutors.addDelayedShutdownHook(listeningExecutor, 5, TimeUnit.SECONDS);
            ClickHouseConnection connection = null;
            ResultSet resultSet = null;
            try {
                connection = ClickHouseJdbc.connectionByTime(c % 3);
            } catch (SQLException e) {
                try {
                    connection = ClickHouseJdbc.connectionByTime(c % 3);
                } catch (SQLException ex) {
                    logger.info("ClickHouse Connection Failure!");
                }
            }
            String date = DateUtil.getDayByString(dt, "yyyy-MM-dd", -1);
            try {
                assert connection != null;
                resultSet = query(connection, date, concurrent * (c - 1), concurrent);
            } catch (SQLException e) {
                logger.info("ClickHouse Query Failure!");
            }

            List<String> resultList = new CopyOnWriteArrayList<>();
            try {
                while (resultSet.next()) {
                    String device_ids = resultSet.getString("device_ids");
                    ListenableFuture listenableFuture = listeningExecutor.submit(() -> {
                        JSONObject jsonObject = devicePeopleBatch(device_ids);
                        return jsonObject.toJSONString();
                    });

                    Futures.addCallback(listenableFuture, new FutureCallback<String>() {
                        @Override
                        public void onSuccess(String result) {
                            resultList.add(result);
                        }

                        @Override
                        public void onFailure(Throwable t) {
                            resultList.add(new JSONObject().toJSONString());
                        }
                    });
                    futures.add(listenableFuture);
                }
            } catch (SQLException e) {
                logger.info("ClickHouse Execute Failure!");
            }
            long min_end = System.currentTimeMillis();
            logger.info("Times -->> " + c + ", Runtime -->> " + (min_end - in_start));
            try {
                String datetime = DateUtil.format(new Date(), "yyyy-MM-dd HH");
                insertIQiYi(connection, futures, datetime.substring(0, 10), datetime.substring(11, 13));
                logger.info("Times -->> " + c + ", ClickHouse Insert Success! Size -->> " + futures.size());
            } catch (SQLException e) {
                logger.info("ClickHouse Insert Failure!");
            } finally {
                try {
                    resultSet.close();
                    connection.close();
                } catch (SQLException throwables) {
                    logger.info("ClickHouse Connection Close!");
                }
            }
            futures = new CopyOnWriteArrayList<>();
            min_end = System.currentTimeMillis();
            logger.info("ClickHouse Insert Success! Times -->> " + c + ", Runtime -->> " + (min_end - min_start));
            min_start = System.currentTimeMillis();
        }
        poolExecutor.shutdown();
        long end = System.currentTimeMillis();
        logger.info("all runtime -->> " + (end - start));
    }

    public static ResultSet query(Connection connection, String dt, int offSet, int size) throws SQLException, InterruptedException {
        ResultSet resultSet;
        String sql = "SELECT device_ids FROM dwh.iqiyi_lahuo_daily_all WHERE dt = '@dt' LIMIT @offSet, @size";
        sql = sql.replace("@dt", dt).replace("@offSet", String.valueOf(offSet)).replace("@size", String.valueOf(size));
        try {
            resultSet = connection.createStatement().executeQuery(sql);
            return resultSet;
        } catch (ClickHouseUnknownException e) {
            Thread.sleep(100);
            resultSet = connection.createStatement().executeQuery(sql);
            return resultSet;
        }
    }

    public static void insertIQiYi(Connection connection, List<ListenableFuture<String>> futures, String dt, String hour) throws SQLException, InterruptedException {
        PreparedStatement preparedStatement = null;
        try {
            preparedStatement = connection.prepareStatement("INSERT INTO dwh.iqiyi_lahuo_result_daily (dt, hour, device_ids) VALUES (?, ?, ?)");
            for (ListenableFuture<String> future : futures) {
                try {
                    String result = future.get();
                    preparedStatement.setString(1, dt);
                    preparedStatement.setString(2, hour);
                    preparedStatement.setString(3, result);
                    preparedStatement.addBatch();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
            }
            preparedStatement.executeBatch();
            logger.info("INSERT SUCCESS!");
        } catch (ClickHouseUnknownException | NullPointerException e) {
            logger.info("INSERT -->> " + e.getMessage());
            Thread.sleep(100);
            assert preparedStatement != null;
            preparedStatement.executeBatch();
        } finally {
            if (preparedStatement != null) {
                preparedStatement.close();
            }
        }
    }

    public static JSONObject deviceDmpBatch(String deviceIds) {

        final String iqiyiUrl = PropertyUtil.getProperty("config.properties", "dmp.server.url");

        String encryptType = "1";
        String deviceIdType = "1";

        JSONObject requestBody = new JSONObject();
        requestBody.put("deviceIds", deviceIds);
        requestBody.put("encryptType", encryptType);
        requestBody.put("deviceIdType", deviceIdType);

        CloseableHttpClient client = HttpClients.createDefault();

        RequestConfig requestConfig = RequestConfig.custom()
                .setConnectTimeout(1000).setConnectionRequestTimeout(1000)
                .setSocketTimeout(1000).build();

        HttpPost post = new HttpPost(iqiyiUrl);
        post.setHeader("Content-Type", "application/json;charset=utf-8");
        JSONObject jsonObject = new JSONObject();
        HttpResponse response;
        try {
            post.setConfig(requestConfig);
            post.setEntity(new StringEntity(requestBody.toJSONString()));
            response = client.execute(post);
            BufferedReader rd = new BufferedReader(
                    new InputStreamReader(response.getEntity().getContent()));
            StringBuilder result = new StringBuilder();
            String line;
            while ((line = rd.readLine()) != null) {
                result.append(line);
            }
            jsonObject = Constants.String2JSONObject(result.toString());
        } catch (IOException e) {
            logger.info("IOException -->> " + e.getMessage());
        } finally {
            post.abort();
            client.getConnectionManager().shutdown();
            try {
                client.close();
            } catch (IOException e) {
                logger.info("IOException -->> " + e.getMessage());
            }
        }
        return jsonObject;
    }

    public static JSONObject devicePeopleBatch(String deviceIds) {

        final String iqiyiUrl = PropertyUtil.getProperty("config.properties", "iqiyi.lahuo.url");

        String encryptType = "1";
        String deviceIdType = "1";
        /*
        String deviceIds = request.getString("deviceIds");
        if (request.containsKey("encryptType")) {
            encryptType = request.getString("encryptType");
        }
        if (request.containsKey("deviceIdType")) {
            deviceIdType = request.getString("deviceIdType");
        }
        */

        Map<String, String> map = new HashMap<>();
        map.put("jobId", "40");
        map.put("deviceIds", deviceIds);
        map.put("encryptType", encryptType);
        map.put("deviceIdType", deviceIdType);
        String secretKey = "57bdede217ae941222f470d047c3ceac";
        String signResult = sign(map, secretKey);

        JSONObject requestBody = JSONObject.parseObject(JSON.toJSONString(map));
        requestBody.put("sign", signResult);

        //  logger.info("requestBody -->> " + requestBody);

        CloseableHttpClient client = HttpClients.createDefault();

        RequestConfig requestConfig = RequestConfig.custom()
                .setConnectTimeout(3000).setConnectionRequestTimeout(3000)
                .setSocketTimeout(0).build();

        HttpPost post = new HttpPost(iqiyiUrl);
        post.setHeader("Content-Type", "application/json;charset=utf-8");
        JSONObject jsonObject = new JSONObject();
        HttpResponse response;
        try {
            post.setConfig(requestConfig);
            post.setEntity(new StringEntity(requestBody.toJSONString()));
            response = client.execute(post);
            BufferedReader rd = new BufferedReader(
                    new InputStreamReader(response.getEntity().getContent()));
            StringBuilder result = new StringBuilder();
            String line;
            while ((line = rd.readLine()) != null) {
                result.append(line);
            }
            //  logger.info("result -->> " + result.toString());
            jsonObject = Constants.String2JSONObject(result.toString());
        } catch (IOException e) {
            //  logger.info("IOException -->> " + e.getMessage());
        } finally {
            post.abort();
            client.getConnectionManager().shutdown();
            try {
                client.close();
            } catch (IOException e) {
                //  logger.info("IOException -->> " + e.getMessage());
            }
        }
        return jsonObject;
    }

    public static String sign(Map<String, String> params, String secretKey) {
        SortedMap<String, String> sortedParams = new TreeMap<>(params);
        StringBuilder sb = new StringBuilder();
        for (String key : sortedParams.keySet()) {
            String v = sortedParams.get(key);
            sb.append(key).append("=").append(StringUtils.defaultIfEmpty(v,
                    "")).append("&");
        }
        final String data = sb.append(secretKey).toString();
        return DigestUtils.md5Hex(data);
    }
}