package mobvista.dmp.datasource.ali;

import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.joran.JoranConfigurator;
import ch.qos.logback.core.joran.spi.JoranException;
import com.alibaba.fastjson.JSONObject;
import com.google.common.util.concurrent.*;
import mobvista.dmp.common.ClickHouseJdbc;
import mobvista.dmp.util.DateUtil;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.client.HttpClient;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicHeader;
import org.apache.http.protocol.HTTP;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import ru.yandex.clickhouse.ClickHouseConnection;
import ru.yandex.clickhouse.except.ClickHouseUnknownException;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.concurrent.*;

/**
 * Batch job that pages {@code device_ids} rows out of {@code dmp.uc_lahuo_daily_all} in ClickHouse,
 * posts every row to the UC crowd endpoint ({@code https://ugp.uc.cn/crowd/batch/v2}) on a thread
 * pool and writes the raw responses into {@code dmp.uc_lahuo_result_daily}.
 *
 * <p>Command line (all four arguments must be given, otherwise the defaults are used):
 * <ol>
 *   <li>number of rounds (default 30)</li>
 *   <li>rows fetched per round (default 10000)</li>
 *   <li>reference date, {@code yyyy-MM-dd} (default: today)</li>
 *   <li>device type, {@code IMEI_MD5} or {@code OAID_MD5} (default: empty)</li>
 * </ol>
 */
public class UCRequest {
    private static final String CROWD_BATCH_URL = "https://ugp.uc.cn/crowd/batch/v2";
    private static final String INSERT_SQL =
            "INSERT INTO dmp.uc_lahuo_result_daily (dt, hour, device_type,device_ids) VALUES (?, ?, ?, ?)";
    private static final String QUERY_SQL =
            "SELECT device_ids FROM dmp.uc_lahuo_daily_all WHERE dt = ? and device_type = ? LIMIT ?, ?";
    /** Connect, connection-request and socket timeout of the HTTP calls, in milliseconds. */
    private static final int HTTP_TIMEOUT_MILLIS = 4000;
    /** Collected responses are written to ClickHouse every this many rounds. */
    private static final int ROUNDS_PER_INSERT = 10;

    /**
     * Logger used by every method of this class. It has a default value so that the public helpers
     * can be called without going through {@link #main(String[])}, which re-assigns it after
     * loading {@code logback-syslog.xml}.
     */
    public static Logger logger = (Logger) LoggerFactory.getLogger("UCRequest");

    static ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(100, 180, 500, TimeUnit.MILLISECONDS,
            new LinkedBlockingDeque<>(180), new CustomizableThreadFactory("UCRequest-"), new ThreadPoolExecutor.CallerRunsPolicy());

    private static String dt = DateUtil.format(new Date(), "yyyy-MM-dd");

    /**
     * Entry point of the job; see the class comment for the meaning of {@code args}.
     *
     * @param args optional {@code [rounds, pageSize, dt, deviceType]}
     * @throws InterruptedException if the thread is interrupted while waiting for a retry or a result
     * @throws JoranException       if {@code logback-syslog.xml} cannot be applied
     */
    public static void main(String[] args) throws InterruptedException,JoranException {
        LoggerContext context = (LoggerContext) LoggerFactory.getILoggerFactory();
        JoranConfigurator configurator = new JoranConfigurator();
        configurator.setContext(context);
        context.reset();
        configurator.doConfigure(UCRequest.class.getClassLoader().getResourceAsStream("logback-syslog.xml"));
        logger = context.getLogger("UCRequest");

        int fors = 30;
        int concurrent = 10000;
        String deviceType = "";
        if (args.length >= 4) {
            fors = Integer.parseInt(args[0]);
            concurrent = Integer.parseInt(args[1]);
            dt = args[2];
            deviceType = args[3];
        }
        List<String> crowdCodesList = crowdCodesFor(deviceType);
        if (crowdCodesList == null) {
            logger.warn("No crowdCodes configured for device_type '{}', requests are sent with null crowdCodes", deviceType);
        }

        long start = System.currentTimeMillis();
        // Decorate the pool and register the shutdown hook once; doing it inside the loop
        // registered one JVM shutdown hook per round.
        ListeningExecutorService listeningExecutor = MoreExecutors.listeningDecorator(poolExecutor);
        MoreExecutors.addDelayedShutdownHook(listeningExecutor, 5, TimeUnit.SECONDS);
        // presumably the day before dt (offset -1) — verify against DateUtil.getDayByString
        String date = DateUtil.getDayByString(dt, "yyyy-MM-dd", -1);

        List<ListenableFuture<String>> futures = new CopyOnWriteArrayList<>();
        long insertWindowStart = System.currentTimeMillis();
        try {
            for (int c = 1; c <= fors; c++) {
                long roundStart = System.currentTimeMillis();
                ClickHouseConnection connection = openConnection(c);
                try {
                    if (connection != null) {
                        submitPage(listeningExecutor, connection, date, deviceType, crowdCodesList,
                                concurrent * (c - 1), concurrent, futures);
                    }
                    logger.info("Times -->> " + c + ", Runtime -->> " + (System.currentTimeMillis() - roundStart));

                    // Insert every ROUNDS_PER_INSERT rounds, and once more after the last round so
                    // that results are not dropped when fors is not a multiple of ROUNDS_PER_INSERT.
                    boolean insertDue = c % ROUNDS_PER_INSERT == 0 || (c == fors && !futures.isEmpty());
                    if (insertDue) {
                        int insertNo = (c + ROUNDS_PER_INSERT - 1) / ROUNDS_PER_INSERT;
                        if (connection == null) {
                            // Nothing was attempted, so keep the results for the next insert point.
                            logger.error("ClickHouse Insert Failure! No connection, pending results -->> " + futures.size());
                        } else {
                            boolean inserted = insertCollected(connection, futures, deviceType, insertNo);
                            futures = new CopyOnWriteArrayList<>();
                            if (inserted) {
                                logger.info("ClickHouse Insert Success! Times -->> " + insertNo + ", Runtime -->> "
                                        + (System.currentTimeMillis() - insertWindowStart));
                            }
                        }
                        insertWindowStart = System.currentTimeMillis();
                    }
                } finally {
                    // NOTE(review): assumes ClickHouseJdbc.connectionByTime returns a fresh
                    // connection on every call — confirm it is not a shared instance.
                    closeQuietly(connection);
                }
            }
        } finally {
            // Always release the pool; its core threads would otherwise keep the JVM alive
            // when the loop is left through an exception.
            poolExecutor.shutdown();
        }
        logger.info("all runtime -->> " + (System.currentTimeMillis() - start));
    }

    /**
     * Returns the crowd codes to request for a device type (list adjusted on 2021.05.12).
     *
     * @return the codes, or {@code null} when the device type is neither IMEI_MD5 nor OAID_MD5
     */
    private static List<String> crowdCodesFor(String deviceType) {
        if ("IMEI_MD5".equals(deviceType)) {
            return Arrays.asList("d3f521a0253bc032f245530c18e47348", "304fcdfc329107ce4817e0606dc4bc80");
        }
        if ("OAID_MD5".equals(deviceType)) {
            return Arrays.asList("aff149ab9af9da4d95b4a448f24d922f", "4b5a58c23b3bd5e171be0cc430593086");
        }
        return null;
    }

    /** Opens the connection for one round, retrying once; returns {@code null} if both attempts fail. */
    private static ClickHouseConnection openConnection(int round) {
        try {
            return ClickHouseJdbc.connectionByTime(round);
        } catch (SQLException first) {
            try {
                return ClickHouseJdbc.connectionByTime(round);
            } catch (SQLException second) {
                logger.error("ClickHouse Connection Failure!", second);
                return null;
            }
        }
    }

    /** Reads one page of rows and submits one HTTP task per row, appending the futures to {@code futures}. */
    private static void submitPage(ListeningExecutorService executor, Connection connection, String date,
                                   String deviceType, List<String> crowdCodes, int offset, int size,
                                   List<ListenableFuture<String>> futures) throws InterruptedException {
        ResultSet resultSet;
        try {
            resultSet = query(connection, date, deviceType, offset, size);
        } catch (SQLException e) {
            logger.error("ClickHouse Query Failure!", e);
            return;
        }
        try {
            while (resultSet.next()) {
                String deviceIds = resultSet.getString("device_ids");
                Callable<String> task =
                        () -> PostJSONSetTimeout(Arrays.asList(deviceIds.split(",")), deviceType, crowdCodes);
                futures.add(executor.submit(task));
            }
        } catch (SQLException e) {
            logger.error("ClickHouse Execute Failure!", e);
        } finally {
            try {
                resultSet.close();
            } catch (SQLException e) {
                logger.info("ResultSet close -->> " + e.getMessage());
            }
        }
    }

    /** Writes the collected responses with the current date and hour; returns whether the insert succeeded. */
    private static boolean insertCollected(Connection connection, List<ListenableFuture<String>> futures,
                                           String deviceType, int insertNo) throws InterruptedException {
        try {
            logger.info("Times -->> " + insertNo + ", ClickHouse begin Insert~~");
            String datetime = DateUtil.format(new Date(), "yyyy-MM-dd HH");
            insertUC(connection, futures, datetime.substring(0, 10), datetime.substring(11, 13), deviceType);
            logger.info("Times -->> " + insertNo + ", ClickHouse Insert Success! Size -->> " + futures.size());
            return true;
        } catch (SQLException e) {
            logger.error("ClickHouse Insert Failure!", e);
            return false;
        }
    }

    /** Closes a connection, logging instead of propagating a failure; {@code null} is ignored. */
    private static void closeQuietly(Connection connection) {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (SQLException e) {
            logger.info("Connection close -->> " + e.getMessage());
        }
    }

    /**
     * Fetches one page of {@code device_ids} from {@code dmp.uc_lahuo_daily_all}.
     *
     * <p>The values are bound as statement parameters instead of being pasted into the SQL text.
     * A {@link ClickHouseUnknownException} is retried once after 100 ms. The caller owns the
     * returned result set and must close it.
     *
     * @param connection  open connection to run the query on
     * @param dt          value matched against the {@code dt} column
     * @param device_type value matched against the {@code device_type} column
     * @param offSet      number of rows to skip
     * @param size        maximum number of rows to return
     * @return the open result set with a single {@code device_ids} column
     * @throws SQLException         if the query fails (after the retry, where one applies)
     * @throws InterruptedException if interrupted while waiting to retry
     */
    public static ResultSet query(Connection connection, String dt,String device_type, int offSet, int size) throws SQLException, InterruptedException {
        try {
            return runQuery(connection, dt, device_type, offSet, size);
        } catch (ClickHouseUnknownException e) {
            Thread.sleep(100);
            return runQuery(connection, dt, device_type, offSet, size);
        }
    }

    /** Executes the page query once; the statement is closed here only when execution fails. */
    private static ResultSet runQuery(Connection connection, String dt, String deviceType, int offset, int size) throws SQLException {
        PreparedStatement statement = connection.prepareStatement(QUERY_SQL);
        try {
            statement.setString(1, dt);
            statement.setString(2, deviceType);
            statement.setInt(3, offset);
            statement.setInt(4, size);
            return statement.executeQuery();
        } catch (SQLException | RuntimeException e) {
            try {
                statement.close();
            } catch (SQLException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
    }

    /**
     * Waits for every future and inserts the results into {@code dmp.uc_lahuo_result_daily} as one batch.
     *
     * <p>Futures that completed exceptionally are logged and skipped. When the batch fails with a
     * {@link ClickHouseUnknownException} or {@link NullPointerException}, {@code executeBatch()} is
     * called one more time after 100 ms.
     *
     * @param connection  open connection; {@code null} is rejected with an {@link SQLException}
     * @param futures     pending or completed HTTP tasks whose results are stored
     * @param dt          value for the {@code dt} column
     * @param hour        value for the {@code hour} column
     * @param device_type value for the {@code device_type} column
     * @throws SQLException         if there is no connection or the insert fails
     * @throws InterruptedException if interrupted while waiting for a result or for the retry
     */
    public static void insertUC(Connection connection, List<ListenableFuture<String>> futures, String dt, String hour,String device_type) throws SQLException, InterruptedException {
        if (connection == null) {
            throw new SQLException("ClickHouse connection is null, nothing inserted");
        }
        PreparedStatement preparedStatement = null;
        try {
            preparedStatement = connection.prepareStatement(INSERT_SQL);
            logger.info("INSERT begin===");
            for (ListenableFuture<String> future : futures) {
                try {
                    String result = future.get();
                    preparedStatement.setString(1, dt);
                    preparedStatement.setString(2, hour);
                    preparedStatement.setString(3, device_type);
                    preparedStatement.setString(4, result);
                    preparedStatement.addBatch();
                } catch (ExecutionException e) {
                    logger.warn("UC request task failed, result skipped", e.getCause());
                }
            }
            preparedStatement.executeBatch();
            logger.info("INSERT SUCCESS!");
        } catch (ClickHouseUnknownException | NullPointerException e) {
            // NullPointerException stays in the catch list so the existing retry behaviour is
            // unchanged; a null connection is rejected up front instead of being caught here.
            logger.info("INSERT -->> " + e.getMessage());
            if (preparedStatement == null) {
                // prepareStatement itself failed, so there is no batch to retry.
                throw e;
            }
            Thread.sleep(100);
            preparedStatement.executeBatch();
        } finally {
            if (preparedStatement != null) {
                preparedStatement.close();
            }
        }
    }

    /**
     * Posts one batch of uids to the UC crowd endpoint and returns the response body.
     *
     * @param uidsList       device ids sent as {@code uids}
     * @param uidType        sent as {@code uidType}
     * @param crowdCodesList sent as {@code crowdCodes}
     * @return the response body with line breaks removed; empty when the response has no entity
     * @throws RuntimeException wrapping any exception raised while sending or reading
     */
    public static String PostJSON( List<String> uidsList,String uidType,List<String> crowdCodesList) {
        JSONObject json = buildRequestBody(uidsList, uidType, crowdCodesList);
        CloseableHttpClient client = HttpClients.createDefault();
        HttpPost post = new HttpPost(CROWD_BATCH_URL);
        post.setHeader("Content-Type", "application/json");
        StringBuilder result = new StringBuilder();
        try {
            StringEntity s = new StringEntity(json.toString(), "utf-8");
            // NOTE(review): this stores a Content-Type header in the entity's Content-Encoding
            // slot; setContentType looks intended. Left as is so the request stays unchanged.
            s.setContentEncoding(new BasicHeader(HTTP.CONTENT_TYPE,
                    "application/json"));
            post.setConfig(requestConfig());
            post.setEntity(s);
            // Send the request and collect the response body.
            HttpResponse httpResponse = client.execute(post);
            appendBody(httpResponse, result);
        } catch (Exception e) {
            logger.error("请求异常", e);
            throw new RuntimeException(e);
        } finally {
            post.abort();
            closeClient(client);
        }
        return result.toString();
    }

    /**
     * Best-effort variant of {@link #PostJSON}: an {@link IOException} is not propagated, and
     * whatever part of the response was read before the failure (possibly nothing) is returned.
     *
     * @param uidsList       device ids sent as {@code uids}
     * @param uidType        sent as {@code uidType}
     * @param crowdCodesList sent as {@code crowdCodes}
     * @return the response body with line breaks removed, or the part read before an I/O failure
     */
    public static String PostJSONSetTimeout( List<String> uidsList, String uidType, List<String> crowdCodesList) {
        JSONObject requestBody = buildRequestBody(uidsList, uidType, crowdCodesList);
        CloseableHttpClient client = HttpClients.createDefault();
        HttpPost post = new HttpPost(CROWD_BATCH_URL);
        post.setHeader("Content-Type", "application/json;charset=utf-8");
        StringBuilder result = new StringBuilder();
        try {
            post.setConfig(requestConfig());
            // Encode the body as UTF-8 so that it matches the charset announced in the header.
            post.setEntity(new StringEntity(requestBody.toJSONString(), "utf-8"));
            HttpResponse response = client.execute(post);
            appendBody(response, result);
        } catch (IOException e) {
            // Error output was deliberately suppressed here to keep the job fast, so the failure
            // is only reported at debug level.
            logger.debug("IOException -->> {}", e.getMessage());
        } finally {
            post.abort();
            closeClient(client);
        }
        return result.toString();
    }

    /** Builds the JSON payload shared by both request methods. */
    private static JSONObject buildRequestBody(List<String> uidsList, String uidType, List<String> crowdCodesList) {
        JSONObject body = new JSONObject();
        body.put("uids", uidsList);
        body.put("uidType", uidType);
        body.put("crowdCodes", crowdCodesList);
        body.put("product", "UC");
        body.put("channel", "Mintegral");
        return body;
    }

    /** Request configuration applying {@link #HTTP_TIMEOUT_MILLIS} to all three timeouts. */
    private static RequestConfig requestConfig() {
        return RequestConfig.custom()
                .setConnectTimeout(HTTP_TIMEOUT_MILLIS).setConnectionRequestTimeout(HTTP_TIMEOUT_MILLIS)
                .setSocketTimeout(HTTP_TIMEOUT_MILLIS).build();
    }

    /**
     * Appends the response body, decoded as UTF-8 and without line breaks, to {@code sink}.
     * Lines read before a failure stay in {@code sink}; a response without an entity adds nothing.
     */
    private static void appendBody(HttpResponse response, StringBuilder sink) throws IOException {
        if (response.getEntity() == null) {
            return;
        }
        try (InputStream content = response.getEntity().getContent();
             BufferedReader reader = new BufferedReader(new InputStreamReader(content, "utf-8"))) {
            String line;
            while ((line = reader.readLine()) != null) {
                sink.append(line);
            }
        }
    }

    /** Closes the HTTP client, logging instead of propagating an {@link IOException}. */
    private static void closeClient(CloseableHttpClient client) {
        try {
            client.close();
        } catch (IOException e) {
            logger.info("IOException -->> " + e.getMessage());
        }
    }

}