package mobvista.dmp.datasource.rtdmp;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import mobvista.dmp.common.Constants;
import mobvista.dmp.datasource.rtdmp.entity.KV;
import mobvista.dmp.datasource.rtdmp.entity.Tuple;
import mobvista.dmp.util.*;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpResponse;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicNameValuePair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ru.yandex.clickhouse.ClickHouseConnection;
import ru.yandex.clickhouse.ClickHouseDataSource;
import ru.yandex.clickhouse.response.ClickHouseResultSet;
import ru.yandex.clickhouse.settings.ClickHouseProperties;

import java.io.*;
import java.net.URISyntaxException;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.*;

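/**
 * Batch job that fetches rule-based RTDmp audiences: it queries the audience service,
 * resolves the matching device ids from ClickHouse and uploads them to S3.
 *
 * @package: mobvista.dmp.datasource.rtdmp
 * @author: wangjf
 * @date: 2021/8/3
 * @time: 10:59 AM
 * @email: jinfeng.wang@mobvista.com
 */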
public class RTDmpFetch {

    // Base URL of the RTDmp audience service; request paths such as "rtdmp/audience/query" are appended to it.
    private static String baseUrl = PropertyUtil.getProperty("config.properties", "rtdmp.url");

    // Comma-separated ClickHouse host groups; each entry is itself split on ':' into individual hosts.
    private static final String[] SET_VALUES = PropertyUtil.getProperty("config.properties", "http.private.host.server.ip").split(",");
    // ClickHouse JDBC settings read from config.properties.
    private static final String DRIVER = PropertyUtil.getProperty("config.properties", "datasource.clickhouse.driverClassName");
    // JDBC URL template; the literal text "host" is replaced with a concrete host before connecting.
    private static final String URL = PropertyUtil.getProperty("config.properties", "datasource.clickhouse.url");
    private static final String USERNAME = PropertyUtil.getProperty("config.properties", "datasource.clickhouse.username");
    private static final String PASSWORD = PropertyUtil.getProperty("config.properties", "datasource.clickhouse.password");
    private static final String DATABASE = PropertyUtil.getProperty("config.properties", "datasource.clickhouse.database");
    // Applied as both the socket timeout and the connection timeout of the ClickHouse connection.
    private static final int TIMEOUT = Integer.parseInt(PropertyUtil.getProperty("config.properties", "datasource.clickhouse.timeout"));

    // S3 bucket and key prefix used to build the s3_path reported back to the audience service.
    private static final String bucketName = PropertyUtil.getProperty("config.properties", "aws.s3.bucket");
    private static final String keyName = PropertyUtil.getProperty("config.properties", "aws.s3.key");

    // Local path prefix under which the intermediate device-id files are written.
    private static final String OUTPUT = PropertyUtil.getProperty("config.properties", "rtdmp.output.dir");

    // Hour-resolution formatter. NOTE(review): SimpleDateFormat is not thread-safe; avoid sharing it across pool threads.
    private static final SimpleDateFormat SDF = new SimpleDateFormat("yyyy-MM-dd HH");

    // Second-resolution formatter for the update-time window arguments.
    private static final SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    // Used to look up the latest partition of dwh.audience_merge_v1.
    private static final MySQLUtil mySqlUtil = new MySQLUtil();

    public static final Logger LOGGER = LoggerFactory.getLogger(RTDmpFetch.class);

    /**
     * Entry point: loads the rule-based audiences updated in the requested window and
     * fetches each of them on a worker pool.
     *
     * @param args optional {@code [update_time_start, update_time_end]} in
     *             {@code yyyy-MM-dd HH:mm:ss}; when fewer than two arguments are given the
     *             window defaults to 7 days ago 00:00:00 through yesterday 23:59:59
     */
    public static void main(String[] args) {
        String update_time_start;
        String update_time_end;
        if (args.length >= 2) {
            update_time_start = args[0];
            update_time_end = args[1];
        } else {
            Calendar cal = Calendar.getInstance();
            Date date = new Date();
            cal.setTime(date);
            cal.add(Calendar.DATE, -7);
            update_time_start = DateUtil.format(cal.getTime(), "yyyy-MM-dd ") + "00:00:00";
            cal.setTime(date);
            cal.add(Calendar.DATE, -1);
            update_time_end = DateUtil.format(cal.getTime(), "yyyy-MM-dd ") + "23:59:59";
        }
        // Output partition label (yyyyMMddHH) derived from the end of the window.
        String part = DateUtil.format(DateUtil.parse(update_time_end, "yyyy-MM-dd HH:mm:ss"), "yyyyMMddHH");
        JSONArray jsonArray = ruleAudienceInfo(update_time_start, update_time_end);
        jsonArray.sort(Comparator.comparing(obj -> ((JSONObject) obj).getIntValue("id")));

        ThreadFactory threadFactory = new ThreadFactoryBuilder()
                .setNameFormat("RTDmpFetch-%d").build();

        // The queue is bounded, so with more than 60 + 128 outstanding tasks a submission is
        // rejected. CallerRunsPolicy runs the overflow task on the submitting thread (natural
        // back-pressure) instead of throwing and silently dropping the remaining audiences.
        ExecutorService pool = new ThreadPoolExecutor(40, 60,
                120L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(128), threadFactory, new ThreadPoolExecutor.CallerRunsPolicy());

        try {
            for (int i = 0; i < jsonArray.size(); i++) {
                JSONObject audienceObject = jsonArray.getJSONObject(i);
                pool.execute(() -> {
                    try {
                        doFetch(audienceObject, part);
                    } catch (RuntimeException e) {
                        // Keep one failing audience from killing the worker (or, for a
                        // caller-run task, from aborting the submission loop) unlogged.
                        LOGGER.error("fetch " + audienceObject.getIntValue("id") + " aborted by unexpected error", e);
                    }
                });
            }
        } finally {
            // Always shut down: the core threads are non-daemon and would otherwise keep the JVM alive.
            pool.shutdown();
        }
    }

    /**
     * Runs {@link #fetch} for one audience and logs the outcome together with the worker thread name.
     *
     * @param audienceObject audience description; its "id" field is used for logging
     * @param part           output partition label passed through to {@code fetch}
     */
    private static void doFetch(JSONObject audienceObject, String part) {
        final int id = audienceObject.getIntValue("id");
        final String outcome = fetch(audienceObject, part) ? " success!" : " failure!";
        LOGGER.info(Thread.currentThread().getName() + " fetch " + id + outcome);
    }

    /**
     * Fetches one rule-based audience: waits until its source audiences and the
     * {@code dwh.audience_merge_v1} partition are ready, queries ClickHouse for the matching
     * device ids per device type and shard, uploads them, and reports the result to the
     * audience service.
     *
     * @param jsonObject audience description (id plus "group_rules" / "audience_rules")
     * @param part       output partition label used in the upload path
     * @return {@code false} when a per-shard ClickHouse query fails; {@code true} otherwise,
     *         including when the audience has no rules and nothing is fetched
     */
    private static boolean fetch(JSONObject jsonObject, String part) {
        // Today's processing window: midnight until the 14:00 deadline, in epoch seconds.
        String today = DateUtil.format(new Date(), "yyyy-MM-dd");
        long startTime = DateUtil.parse(today + " 00:00:00", "yyyy-MM-dd HH:mm:ss").getTime() / 1000;
        long endTime = DateUtil.parse(today + " 14:00:00", "yyyy-MM-dd HH:mm:ss").getTime() / 1000;

        long start = System.currentTimeMillis();
        int audienceId = jsonObject.getIntValue("id");
        LOGGER.info("checkRules -->> audienceId:" + audienceId + ", jsonObject:" + jsonObject + ", startTime:" + startTime + ", endTime:" + endTime);
        Tuple tuple = checkRules(jsonObject, startTime, endTime);
        if (tuple.getFlag()) {
            KV kv = mySqlUtil.getPartitionTime("dwh", "audience_merge_v1");
            //  retry getPartitionTime
            for (int i = 0; i < 5 && StringUtils.isBlank(kv.getK()); i++) {
                kv = mySqlUtil.getPartitionTime("dwh", "audience_merge_v1");
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    LOGGER.info(e.getMessage());
                }
            }
            if (StringUtils.isBlank(kv.getK())) {
                LOGGER.info("getPartitionTime Blank !!! Please Check.");
                System.exit(-1);
            }
            String dt = DateUtil.format(DateUtil.parse(kv.getK().substring(0, 8), "yyyyMMdd"), "yyyy-MM-dd");
            String hour = kv.getK().substring(8, 10);
            // time of insert partition
            String utime = kv.getV();

            String partTime = dt + " " + hour;

            // A per-call formatter: SimpleDateFormat is not thread-safe and this method runs on pool threads.
            String newPartTime = new SimpleDateFormat("yyyy-MM-dd HH").format(new Date(tuple.getUtime() * 1000));

            LOGGER.info("checkPartition -->> audienceId:" + audienceId + ", partTime:" + partTime + ", newPartTime:" + newPartTime);

            // Wait (until the deadline) for a partition at least as new as the source audiences.
            while (partTime.compareTo(newPartTime) < 0) {
                long nowTimestamp = DateUtil.parse(DateUtil.format(new Date(), "yyyy-MM-dd HH:mm:ss"), "yyyy-MM-dd HH:mm:ss").getTime() / 1000;
                LOGGER.info("checkPartition -->> audienceId:" + audienceId + ", nowTimestamp:" + nowTimestamp + ", partTime:" + partTime + ", newPartTime:" + newPartTime + ", sleep 300s");
                if (nowTimestamp > endTime) {
                    break;
                }
                try {
                    Thread.sleep(300000);
                } catch (InterruptedException e) {
                    LOGGER.info(e.getMessage());
                }
                KV refreshed = mySqlUtil.getPartitionTime("dwh", "audience_merge_v1");
                if (refreshed == null || StringUtils.isBlank(refreshed.getK())) {
                    // A transient lookup failure must not crash on substring(); keep the last known partition.
                    LOGGER.warn("checkPartition -->> audienceId:" + audienceId + ", getPartitionTime returned blank, keep waiting");
                    continue;
                }
                kv = refreshed;
                dt = DateUtil.format(DateUtil.parse(kv.getK().substring(0, 8), "yyyyMMdd"), "yyyy-MM-dd");
                hour = kv.getK().substring(8, 10);
                utime = kv.getV();

                partTime = dt + " " + hour;
            }

            long nowTime = DateUtil.parse(DateUtil.format(new Date(), "yyyy-MM-dd HH:mm:ss"), "yyyy-MM-dd HH:mm:ss").getTime() / 1000;
            if (Long.parseLong(utime) > (nowTime - 600) && nowTime < endTime) {
                //  Avoid missing data caused by background shard synchronisation of a fresh partition.
                try {
                    LOGGER.info("Sleep 10 min.");
                    Thread.sleep(600000);
                } catch (InterruptedException e) {
                    LOGGER.info(e.getMessage());
                }
            }

            ClickHouseProperties properties = new ClickHouseProperties();
            properties.setUser(USERNAME);
            properties.setPassword(PASSWORD);
            properties.setDatabase(DATABASE);
            properties.setSocketTimeout(TIMEOUT);
            properties.setConnectionTimeout(TIMEOUT);

            // Pick a random host group and a random host inside it, sized from the configuration
            // rather than assuming exactly 3 groups of 2 hosts.
            Random random = new Random();
            String[] ips0 = SET_VALUES[random.nextInt(SET_VALUES.length)].split(":");
            ClickHouseDataSource dataSource0 = new ClickHouseDataSource(URL.replace("host", ips0[random.nextInt(ips0.length)]), properties);

            // buildSql depends only on (dt, hour, jsonObject), so build the template once.
            String sqlTemplate = buildSql(dt, hour, jsonObject);

            String sql = sqlTemplate
                    .replace("@key", "device_type")
                    .replace("@table", "audience_merge_v1_all") + " GROUP BY device_type";
            LOGGER.info("checkDeviceType -->> audienceId:" + audienceId + ", sql -->>" + sql);
            Set<String> devTypeSet = queryDeviceTypes(dataSource0, sql);

            sql = sqlTemplate
                    .replace("@key", "COUNT(1) counts")
                    .replace("@table", "audience_merge_v1_all");
            int counts = queryAudienceCount(dataSource0, sql);
            LOGGER.info("checkCount -->> audienceId:" + audienceId + ", sql -->>" + sql + ", count:" + counts);

            // Placeholder row uploaded for a shard that holds no device of the requested type.
            final String placeholderSql = "SELECT 'c4ca4238a0b923820dcc509a6f75849b' devid";

            for (String devType : devTypeSet) {
                sql = sqlTemplate
                        .replace("@key", "devid")
                        .replace("@table", "audience_merge_v1") + " AND device_type = '" + devType + "'";

                LOGGER.info("checkDeviceId -->> audienceId:" + audienceId + ",sql -->> " + sql);
                for (int i = 0; i < SET_VALUES.length; i++) {
                    String[] ips = SET_VALUES[i].split(":");
                    ClickHouseDataSource dataSource = new ClickHouseDataSource(URL.replace("host", ips[random.nextInt(ips.length)]), properties);
                    try (ClickHouseConnection connection = dataSource.getConnection()) {
                        ClickHouseResultSet resultSet = (ClickHouseResultSet) connection.prepareStatement(sql).executeQuery();
                        try {
                            LOGGER.info("audienceId -->> " + audienceId + ",upload start!");
                            // Shard-local copy: an empty shard must not change the query or the
                            // device type used for the remaining shards.
                            String shardDevType = devType;
                            if (resultSet != null && resultSet.hasNext()) {
                                LOGGER.info("audienceId -->> " + audienceId + ",resultSet is not null");
                            } else {
                                shardDevType = "unknown";
                                LOGGER.info("audienceId -->> " + audienceId + ",resultSet is null");
                                if (resultSet != null) {
                                    resultSet.close();
                                }
                                resultSet = (ClickHouseResultSet) connection.prepareStatement(placeholderSql).executeQuery();
                            }
                            multipleFileUpload(resultSet, audienceId, shardDevType, part, i);
                        } finally {
                            if (resultSet != null) {
                                resultSet.close();
                            }
                        }
                    } catch (SQLException e) {
                        LOGGER.info("SQLException -->> " + e.getMessage());
                        return false;
                    }
                }
            }
            //  Report to the audience service only when data was found; otherwise leave it untouched.
            if (devTypeSet.size() > 0) {
                String s3Path = "s3://" + bucketName + "/" + keyName + "/" + audienceId + "/" + part + "/*/";
                JSONObject updateJSON = new JSONObject();
                updateJSON.put("id", audienceId);
                updateJSON.put("s3_path", s3Path);
                updateJSON.put("status", 1);
                updateJSON.put("audience_data_status", 1);
                updateJSON.put("audience_count", counts);
                JSONArray updateJSONArray = new JSONArray();
                updateJSONArray.add(updateJSON);
                update(updateJSONArray.toJSONString());
            }
        }
        long end = System.currentTimeMillis();
        LOGGER.info("audienceId -->> " + audienceId + ",runtime ==>> " + (end - start));
        return true;
    }

    /**
     * Runs a {@code GROUP BY device_type} query and collects the distinct device types.
     * A failing query is logged and yields whatever was collected so far (possibly nothing).
     */
    private static Set<String> queryDeviceTypes(ClickHouseDataSource dataSource, String sql) {
        Set<String> devTypeSet = new HashSet<>();
        try (ClickHouseConnection connection = dataSource.getConnection();
             ClickHouseResultSet resultSet = (ClickHouseResultSet) connection.prepareStatement(sql).executeQuery()) {
            while (resultSet.next()) {
                devTypeSet.add(resultSet.getString("device_type"));
            }
        } catch (SQLException e) {
            LOGGER.info("SQLException -->> " + e.getMessage());
        }
        return devTypeSet;
    }

    /**
     * Runs a {@code COUNT(1) counts} query and returns the count, or 0 when the query fails
     * or returns no row.
     */
    private static int queryAudienceCount(ClickHouseDataSource dataSource, String sql) {
        int counts = 0;
        try (ClickHouseConnection connection = dataSource.getConnection();
             ClickHouseResultSet resultSet = (ClickHouseResultSet) connection.prepareStatement(sql).executeQuery()) {
            while (resultSet.next()) {
                counts = resultSet.getInt("counts");
            }
        } catch (SQLException e) {
            LOGGER.info("SQLException -->> " + e.getMessage());
        }
        return counts;
    }

    /**
     * Queries the audience service for offline rule audiences (audience_type 3) updated
     * since {@code startTime}.
     *
     * @param startTime lower bound of the update time, sent as {@code update_time_start}
     * @param endTime   upper bound of the window; currently not sent to the service
     * @return the "data" array of a code-200 response, or an empty array on any failure
     */
    private static JSONArray ruleAudienceInfo(String startTime, String endTime) {

        final String serverUrl = baseUrl + "rtdmp/audience/query";
        HttpGet httpGet;
        try {
            URIBuilder uri = new URIBuilder(serverUrl)
                    .addParameter("update_time_start", startTime)
                    .addParameter("audience_type", "3")
                    .addParameter("is_offline", "1");
            LOGGER.info("ruleAudienceInfo.url: " + uri);
            httpGet = new HttpGet(uri.build());
        } catch (URISyntaxException e) {
            // Without a valid URL there is nothing to request.
            LOGGER.error("ruleAudienceInfo -->> invalid url: " + serverUrl, e);
            return new JSONArray();
        }

        // Apply the timeouts to the request, as update() does for its PUT.
        httpGet.setConfig(RequestConfig.custom()
                .setConnectTimeout(5000).setConnectionRequestTimeout(5000)
                .setSocketTimeout(5000).build());

        String key = UUID.randomUUID().toString();
        String token = MD5Util.getMD5Str(key);

        httpGet.setHeader("Auth-System", "dmp");
        httpGet.setHeader("Content-Type", "text/plain");
        httpGet.setHeader("key", key);
        httpGet.setHeader("token", token);

        JSONArray jsonArrayReturn = new JSONArray();
        try (CloseableHttpClient client = HttpClients.createDefault();
             CloseableHttpResponse response = client.execute(httpGet)) {
            StringBuilder result = new StringBuilder();
            if (response.getEntity() != null) {
                try (BufferedReader rd = new BufferedReader(
                        new InputStreamReader(response.getEntity().getContent(), "UTF-8"))) {
                    String line;
                    while ((line = rd.readLine()) != null) {
                        result.append(line);
                    }
                }
            }

            JSONObject jsonObject = Constants.String2JSONObject(result.toString());
            // getIntValue yields 0 for a missing code instead of an unboxing NullPointerException.
            if (jsonObject != null && jsonObject.getIntValue("code") == 200 && jsonObject.containsKey("data")) {
                JSONArray data = jsonObject.getJSONArray("data");
                if (data != null) {
                    jsonArrayReturn = data;
                }
            }
        } catch (IOException e) {
            LOGGER.info(e.getMessage());
            jsonArrayReturn = new JSONArray();
        }
        LOGGER.info("ruleAudienceInfo.result: " + jsonArrayReturn);

        return jsonArrayReturn;
    }

    /**
     * Sends an audience update to the audience service with an HTTP PUT and logs the parsed
     * response. Failures are logged and not propagated.
     *
     * @param requestBody JSON text sent as the request entity
     */
    private static void update(String requestBody) {

        LOGGER.info("rtdmp/update -->> requestBody -->> " + requestBody);

        RequestConfig requestConfig = RequestConfig.custom()
                .setConnectTimeout(5000).setConnectionRequestTimeout(5000)
                .setSocketTimeout(5000).build();

        final String serverUrl = baseUrl + "rtdmp/audience/update";
        HttpPut put = new HttpPut(serverUrl);
        String key = UUID.randomUUID().toString();
        String token = MD5Util.getMD5Str(key);

        put.setHeader("key", key);
        put.setHeader("token", token);
        put.setConfig(requestConfig);

        JSONObject jsonObject = new JSONObject();
        // try-with-resources releases the connection and the client on every path.
        try (CloseableHttpClient client = HttpClients.createDefault()) {
            put.setEntity(new StringEntity(requestBody));
            try (CloseableHttpResponse response = client.execute(put)) {
                StringBuilder result = new StringBuilder();
                if (response.getEntity() != null) {
                    try (BufferedReader rd = new BufferedReader(
                            new InputStreamReader(response.getEntity().getContent(), "UTF-8"))) {
                        String line;
                        while ((line = rd.readLine()) != null) {
                            result.append(line);
                        }
                    }
                }
                // parseObject returns null for an empty body; keep the empty object so the log line below is safe.
                JSONObject parsed = JSON.parseObject(result.toString());
                if (parsed != null) {
                    jsonObject = parsed;
                }
            }
        } catch (IOException e) {
            LOGGER.info("IOException -->> " + e.getMessage());
        }
        LOGGER.info("rtdmp/update -->> jsonObject -->> " + jsonObject.toJSONString());
    }

    /**
     * Collects the source audience ids referenced by the audience's rules and polls, every
     * 300 seconds, until all of them pass {@code checkSuccess} or {@code endTime} is reached.
     *
     * @param jsonObject audience description holding "group_rules" and/or "audience_rules"
     * @param startTime  epoch seconds handed to {@code checkSuccess}
     * @param endTime    epoch seconds after which polling stops
     * @return the successful check result; {@code (true, now)} once the deadline has passed;
     *         {@code (false, 0)} when the audience references no source audiences
     */
    private static Tuple checkRules(JSONObject jsonObject, long startTime, long endTime) {
        JSONArray sourceIds = new JSONArray();

        if (jsonObject.containsKey("group_rules")) {
            JSONArray groups = jsonObject.getJSONArray("group_rules");
            for (int idx = 0; idx < groups.size(); idx++) {
                JSONObject group = groups.getJSONObject(idx);
                sourceIds.addAll(group.getJSONArray("audiences"));
            }
        }

        // audience_rules are only consulted when group_rules contributed nothing.
        if (sourceIds.isEmpty() && jsonObject.containsKey("audience_rules")) {
            JSONObject rules = jsonObject.getJSONObject("audience_rules");
            for (String ruleKey : new String[]{"intersections", "union", "subtraction"}) {
                if (!rules.containsKey(ruleKey)) {
                    continue;
                }
                JSONArray ids = rules.getJSONArray(ruleKey);
                if (!ids.isEmpty()) {
                    sourceIds.addAll(ids);
                }
            }
        }

        if (sourceIds.size() <= 0) {
            return new Tuple(false, 0L);
        }

        long nowTime = DateUtil.parse(DateUtil.format(new Date(), "yyyy-MM-dd HH:mm:ss"), "yyyy-MM-dd HH:mm:ss").getTime() / 1000;
        while (true) {
            if (nowTime >= endTime) {
                // Deadline reached: proceed anyway, stamped with the current time.
                return new Tuple(true, nowTime);
            }
            Tuple tuple = checkSuccess(sourceIds, startTime);
            LOGGER.info("checkSuccess -->> flag:" + tuple.getFlag() + ", utime:" + tuple.getUtime());
            if (tuple.getFlag()) {
                return tuple;
            }
            try {
                LOGGER.info("checkRules sleep 300s");
                Thread.sleep(300000);
            } catch (InterruptedException e) {
                LOGGER.info(e.getMessage());
            }
            nowTime = DateUtil.parse(DateUtil.format(new Date(), "yyyy-MM-dd HH:mm:ss"), "yyyy-MM-dd HH:mm:ss").getTime() / 1000;
        }
    }

    /**
     * Builds the ClickHouse query template for an audience. The result still contains the
     * {@code @key} and {@code @table} placeholders, which callers substitute.
     *
     * @param dt         partition date, used in the {@code dt = '...'} predicate
     * @param hour       partition hour, used in the {@code hour = '...'} predicate
     * @param jsonObject audience description holding "group_rules" and/or "audience_rules"
     * @return the SQL text with the rule predicates appended
     */
    private static String buildSql(String dt, String hour, JSONObject jsonObject) {

        StringBuilder query = new StringBuilder("SELECT @key FROM dwh.@table WHERE dt = '")
                .append(dt).append("' AND hour = '").append(hour).append("' AND device_type != 'unknown' ");

        if (jsonObject.containsKey("group_rules")) {
            JSONArray groups = jsonObject.getJSONArray("group_rules");
            StringBuilder groupExpr = new StringBuilder();
            final int total = groups.size();
            for (int idx = 0; idx < total; idx++) {
                JSONObject current = Constants.String2JSONObject(groups.getJSONObject(idx).toString());
                // Each group is prepended, so the expression grows right-to-left.
                String fn;
                if (current.getString("inner_op").equals("|")) {
                    fn = "hasAny";
                } else {
                    fn = "hasAll";
                }
                groupExpr.insert(0, fn + "(audience_id," + current.getJSONArray("audiences") + ")");

                if (idx + 1 >= total) {
                    continue;
                }
                // The connector placed in front of what has been built so far comes from the NEXT group's outer_op.
                JSONObject following = Constants.String2JSONObject(groups.getJSONObject(idx + 1).toString());
                String outerOp = following.getString("outer_op");
                if (outerOp.equals("-")) {
                    groupExpr.insert(0, " AND NOT (").append(")");
                } else {
                    groupExpr.insert(0, outerOp.equals("|") ? " OR " : " AND ");
                }
            }
            if (StringUtils.isNotBlank(groupExpr)) {
                query.append("AND (").append(groupExpr).append(")");
            }
        }

        if (jsonObject.containsKey("audience_rules")) {
            JSONObject rules = jsonObject.getJSONObject("audience_rules");
            StringBuilder ruleExpr = new StringBuilder();

            //  hasAll
            JSONArray intersections = rules.containsKey("intersections") ? rules.getJSONArray("intersections") : null;
            if (intersections != null && !intersections.isEmpty()) {
                ruleExpr.append("hasAll(audience_id,").append(intersections).append(")");
            }

            //  hasAny
            JSONArray union = rules.containsKey("union") ? rules.getJSONArray("union") : null;
            if (union != null && !union.isEmpty()) {
                if (StringUtils.isNotBlank(ruleExpr)) {
                    ruleExpr.append(" OR ");
                }
                ruleExpr.append("hasAny(audience_id,").append(union).append(")");
            }

            if (StringUtils.isNotBlank(ruleExpr)) {
                query.append("AND (").append(ruleExpr).append(")");
            }

            //  not hasAny
            JSONArray subtraction = rules.containsKey("subtraction") ? rules.getJSONArray("subtraction") : null;
            if (subtraction != null && !subtraction.isEmpty()) {
                query.append(" AND NOT hasAny(audience_id,").append(subtraction).append(")");
            }
        }
        return query.toString();
    }

    /**
     * Checks whether every source audience has been refreshed since {@code startTime}.
     *
     * @param jsonArray ids of the source audiences
     * @param startTime epoch seconds; an audience whose "audience_data_utime" is earlier counts as not ready
     * @return {@code (true, latest utime)} when all audiences are ready; {@code (false, utime)}
     *         for the first stale audience; {@code (false, 0)} when an audience could not be
     *         looked up or carries no update time
     */
    private static Tuple checkSuccess(JSONArray jsonArray, long startTime) {
        Long futime = 0L;
        for (Object o : jsonArray) {
            JSONObject audienceObject = queryById(o.toString());
            Long utime = audienceObject == null ? null : audienceObject.getLong("audience_data_utime");
            if (utime == null) {
                // queryById yields an empty object on failure; treat that as "not ready yet"
                // instead of failing with a NullPointerException while unboxing.
                LOGGER.warn("checkSuccess -->> audience " + o + " has no audience_data_utime, treated as not ready");
                return new Tuple(false, 0L);
            }
            if (utime > futime) {
                futime = utime;
            }
            if (utime < startTime) {
                return new Tuple(false, utime);
            }
        }
        return new Tuple(true, futime);
    }

    /**
     * Looks up a single audience by id in the audience service.
     *
     * @param id audience id, sent as the {@code id} query parameter
     * @return the first element of the response's "data" array when the response code is 200;
     *         otherwise (including on any I/O or URL failure) an empty {@code JSONObject}
     */
    private static JSONObject queryById(String id) {

        final String serverUrl = baseUrl + "rtdmp/audience/query";
        HttpGet httpGet;
        try {
            httpGet = new HttpGet(new URIBuilder(serverUrl)
                    .addParameter("id", id)
                    .build());
        } catch (URISyntaxException e) {
            // No request can be built; return the same empty result as any other failure.
            LOGGER.error("queryById -->> invalid url: " + serverUrl + ", id: " + id, e);
            return new JSONObject();
        }

        // Apply the timeouts to the request, as update() does for its PUT.
        httpGet.setConfig(RequestConfig.custom()
                .setConnectTimeout(5000).setConnectionRequestTimeout(5000)
                .setSocketTimeout(5000).build());

        String key = UUID.randomUUID().toString();
        String token = MD5Util.getMD5Str(key);

        httpGet.setHeader("Auth-System", "dmp");
        httpGet.setHeader("Content-Type", "text/plain");
        httpGet.setHeader("key", key);
        httpGet.setHeader("token", token);

        JSONObject res = new JSONObject();
        try (CloseableHttpClient client = HttpClients.createDefault();
             CloseableHttpResponse response = client.execute(httpGet)) {
            StringBuilder result = new StringBuilder();
            if (response.getEntity() != null) {
                try (BufferedReader rd = new BufferedReader(
                        new InputStreamReader(response.getEntity().getContent(), "UTF-8"))) {
                    String line;
                    while ((line = rd.readLine()) != null) {
                        result.append(line);
                    }
                }
            }
            JSONObject jsonObject = Constants.String2JSONObject(result.toString());
            // getIntValue yields 0 for a missing code instead of an unboxing NullPointerException.
            if (jsonObject != null && jsonObject.getIntValue("code") == 200 && jsonObject.containsKey("data")) {
                JSONArray data = jsonObject.getJSONArray("data");
                if (data != null && data.size() > 0) {
                    res = data.getJSONObject(0);
                }
            }
        } catch (IOException e) {
            LOGGER.error("queryById -->> request failed, id: " + id, e);
        }
        return res;
    }

    /**
     * Streams the {@code devid} column of a result set into local text files of at most
     * 10,000,000 rows each, gzips every file, deletes the text original and uploads the
     * resulting archives.
     *
     * @param resultSet  rows exposing a "devid" column; not closed by this method
     * @param audienceId audience id, used in file names and passed to the uploader
     * @param deviceType device type passed to the uploader
     * @param part       partition label passed to the uploader
     * @param ip         index of the host group the rows came from, used in file names
     */
    private static void multipleFileUpload(ResultSet resultSet, int audienceId, String deviceType, String part, int ip) {

        final int maxRowsPerFile = 10000000;
        List<String> filePaths = new ArrayList<>();

        BufferedWriter out = null;
        try {
            int rows = 0;
            int fileIndex = 0;
            String fileName = null;

            while (resultSet.next()) {
                String devid = resultSet.getString("devid");
                if (out == null) {
                    // Start the next part; opening without append truncates any stale file from a previous run.
                    fileName = OUTPUT + audienceId + "." + ip + "_" + fileIndex + ".txt";
                    out = new BufferedWriter(new FileWriter(fileName, false));
                }
                out.write(devid);
                out.newLine();
                rows++;

                if (rows == maxRowsPerFile) {
                    out.close();
                    out = null;
                    String zipName = OUTPUT + audienceId + "." + ip + "_" + fileIndex + ".gz";
                    ZipFileUtil.compress(fileName, zipName);
                    filePaths.add(zipName);
                    //  Delete the original file once it has been compressed.
                    File file = new File(fileName);
                    if (file.exists()) {
                        file.delete();
                    }
                    rows = 0;
                    fileIndex++;
                }
            }

            // Flush the last, partially filled part without relying on ResultSet.isLast().
            if (out != null) {
                out.close();
                out = null;
                String zipName = OUTPUT + audienceId + "." + ip + "_" + fileIndex + ".gz";
                ZipFileUtil.compress(fileName, zipName);
                filePaths.add(zipName);
                //  Delete the original file once it has been compressed.
                File file = new File(fileName);
                if (file.exists()) {
                    file.delete();
                }
            }
        } catch (IOException | SQLException e) {
            LOGGER.info(e.getMessage());
        } finally {
            // Only reached with an open writer when an exception interrupted the loop.
            if (out != null) {
                try {
                    out.close();
                } catch (IOException e) {
                    LOGGER.info(e.getMessage());
                }
            }
        }

        // Upload whatever was completed, as before, even when the loop ended early.
        AwsUploadFileToS3 awsUploadFileToS3 = new AwsUploadFileToS3();
        awsUploadFileToS3.uploadFileList(filePaths, audienceId, deviceType, part);
    }
}