OpenApiServiceImpl.java 3.71 KB
Newer Older
zhangxiaoyan committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113
package common.service.impl;

import com.google.common.base.Splitter;
import com.google.common.collect.Lists;
import common.service.IOpenApiService;
import dmp.model.OtPkgDevStats;
import dmp.repository.OtPkgDevStatsRepository;
import org.anarres.lzo.LzoAlgorithm;
import org.anarres.lzo.LzoDecompressor;
import org.anarres.lzo.LzoInputStream;
import org.anarres.lzo.LzoLibrary;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import util.AwsS3Util;

import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

/**
 * Imports package/device statistics files from S3 into the
 * {@link OtPkgDevStatsRepository}.
 *
 * <p>For a given {@code ds} value, every object listed under
 * {@code export/pkg_dev_stats/ds=<ds>/} in the {@code reyunbpu} bucket is
 * downloaded, optionally LZO-decompressed (keys ending in
 * {@code .lzo_deflate}), parsed as tab-separated text and saved in batches.
 *
 * @author nolan
 * @date 13/09/2017
 */
@Service
public class OpenApiServiceImpl
        implements IOpenApiService
{
    private static final Logger logger = LoggerFactory.getLogger(OpenApiServiceImpl.class);

    /** S3 bucket that is listed and downloaded from. */
    private static final String BUCKET = "reyunbpu";

    /** Keys with this suffix are run through the LZO1X decompressor before parsing. */
    private static final String LZO_SUFFIX = ".lzo_deflate";

    /** A line is only accepted when it splits into exactly this many tab-separated fields. */
    private static final int FIELD_COUNT = 3;

    /** Number of parsed rows handed to the repository in one {@code save} call. */
    private static final int BATCH_SIZE = 2000;

    /** Splits a line on tabs and trims whitespace around each field. */
    private static final Splitter TAB_SPLITTER = Splitter.on("\t").trimResults();

    @Autowired OtPkgDevStatsRepository otPkgDevStatsRepository;

    /**
     * Imports every S3 object found under {@code export/pkg_dev_stats/ds=<ds>/}.
     *
     * <p>Returns without doing anything when the key listing is {@code null}
     * or empty. A failure while processing one object is logged and does not
     * stop the remaining objects from being processed.
     *
     * @param ds value substituted into the {@code ds=} segment of the key prefix
     */
    @Override
    public void listInfo4AppData(String ds)
    {
        String keyPrefix = "export/pkg_dev_stats/ds=" + ds + "/";

        final AwsS3Util s3Util = AwsS3Util.getInstance();
        List<?> keys = s3Util.getS3Keys(BUCKET, keyPrefix);
        if (keys == null || keys.isEmpty()) {
            return;
        }

        for (Object key : keys) {
            readS3File(s3Util, String.valueOf(key));
        }
    }

    /**
     * Downloads one S3 object, decodes it to text and saves its rows.
     *
     * <p>Any exception (download, decompression, parsing I/O, repository
     * save, stream close) is logged and swallowed so the caller can continue
     * with the next key. Batches saved before the failure are not undone here.
     *
     * @param s3Util client used for the download
     * @param s3key  key of the object to import
     */
    private void readS3File(AwsS3Util s3Util, String s3key)
    {
        try {
            final byte[] fileBytes = s3Util.downloadBytesFromS3(BUCKET, s3key);
            if (fileBytes == null) {
                logger.warn("No content downloaded for S3 key {}", s3key);
                return;
            }

            InputStream content = new ByteArrayInputStream(fileBytes);
            if (s3key.endsWith(LZO_SUFFIX)) {
                LzoDecompressor decompressor =
                        LzoLibrary.getInstance().newDecompressor(LzoAlgorithm.LZO1X, null);
                content = new LzoInputStream(content, decompressor);
            }

            // NOTE(review): the export is assumed to be UTF-8 text; confirm with the
            // producer. An explicit charset keeps decoding independent of the host default.
            try (BufferedReader reader =
                    new BufferedReader(new InputStreamReader(content, StandardCharsets.UTF_8))) {
                saveRecords(reader, s3key);
            }
        }
        catch (Exception e) {
            logger.error("读取s3文件错误", e);
        }
    }

    /**
     * Reads all lines from {@code reader} and saves the parsed rows in batches
     * of {@link #BATCH_SIZE}, including the final, partially filled batch.
     *
     * @param reader source of tab-separated lines
     * @param s3key  key the lines came from, used only in log messages
     * @throws IOException if reading a line fails
     */
    private void saveRecords(BufferedReader reader, String s3key)
            throws IOException
    {
        List<OtPkgDevStats> batch = new ArrayList<>(BATCH_SIZE);
        String line;
        while ((line = reader.readLine()) != null) {
            OtPkgDevStats stats = parseLine(line, s3key);
            if (stats == null) {
                continue;
            }
            batch.add(stats);
            if (batch.size() >= BATCH_SIZE) {
                otPkgDevStatsRepository.save(batch);
                // Start a fresh list rather than clearing, so the list already
                // handed to the repository is never mutated afterwards.
                batch = new ArrayList<>(BATCH_SIZE);
            }
        }

        // Flush the rows left over after the last full batch; without this
        // the tail of every file (or a whole small file) would be dropped.
        if (!batch.isEmpty()) {
            otPkgDevStatsRepository.save(batch);
        }
    }

    /**
     * Converts one tab-separated line into an {@link OtPkgDevStats}.
     *
     * <p>The three fields are, in order: package name, device count, ds.
     *
     * @param line  raw line read from the file
     * @param s3key key the line came from, used only in log messages
     * @return the populated entity, or {@code null} when the line does not have
     *         exactly {@link #FIELD_COUNT} fields or its device count is not a
     *         valid {@code long}
     */
    private static OtPkgDevStats parseLine(String line, String s3key)
    {
        List<String> fields = TAB_SPLITTER.splitToList(line);
        if (fields.size() != FIELD_COUNT) {
            return null;
        }

        Long devNum;
        try {
            devNum = Long.valueOf(fields.get(1));
        }
        catch (NumberFormatException e) {
            // Skip the bad row instead of abandoning the rest of the file.
            logger.warn("Skipping line with non-numeric device count in {}: {}", s3key, line);
            return null;
        }

        OtPkgDevStats stats = new OtPkgDevStats();
        stats.setPkgname(fields.get(0));
        stats.setDevNum(devNum);
        stats.setDs(fields.get(2));
        return stats;
    }
}