OpenApiServiceImpl.java 3.72 KB
Newer Older
shenggui.li committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115
package com.reyun.service.impl;

import com.google.common.base.Splitter;
import com.google.common.collect.Lists;
import com.reyun.service.IOpenApiService;
import com.reyun.util.AwsS3Util;
import com.reyun.util.ObjectStorageUtil;

import org.anarres.lzo.LzoAlgorithm;
import org.anarres.lzo.LzoDecompressor;
import org.anarres.lzo.LzoInputStream;
import org.anarres.lzo.LzoLibrary;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

/**
 * {@link IOpenApiService} implementation that loads tab-separated rows from
 * object storage.
 *
 * <p>Objects are listed in the {@code reyuntkio} bucket under
 * {@code warehouse/tkio/tkdm.db/tkdm.today_tt_day/ds=<ds>/}. Each object is
 * read as text, one row per line, with cells separated by a tab character.
 * Objects whose key ends with {@code .lzo_deflate} are decompressed with LZO1X
 * before being read.
 *
 * @author nolan
 * @date 13/09/2017
 */
@Service
public class OpenApiServiceImpl
        implements IOpenApiService
{
    private static final Logger logger = LoggerFactory.getLogger(OpenApiServiceImpl.class);

    /** Bucket that holds the data files. */
    private static final String BUCKET = "reyuntkio";

    /** Key prefix of a partition; the {@code ds} value and a trailing slash are appended. */
    private static final String KEY_PREFIX = "warehouse/tkio/tkdm.db/tkdm.today_tt_day/ds=";

    /** Key suffix marking an LZO-compressed object. */
    private static final String LZO_SUFFIX = ".lzo_deflate";

    /** Splits one line into trimmed cells; Splitter instances are immutable, so one is shared. */
    private static final Splitter TAB_SPLITTER = Splitter.on("\t").trimResults();

    /** Sort key used for a cell that is missing or not a valid long, so such rows sort last. */
    private static final long MISSING_SORT_KEY = Long.MIN_VALUE;

    /**
     * Orders rows by the numeric value of the cells at index 1, then 2, then 3,
     * each in descending order. Cells that are missing or not numeric are
     * treated as {@link #MISSING_SORT_KEY} instead of failing the sort.
     */
    private static final Comparator<Object[]> BY_SORT_COLUMNS_DESC = new Comparator<Object[]>()
    {
        @Override
        public int compare(Object[] left, Object[] right)
        {
            for (int column = 1; column <= 3; column++) {
                // Arguments are swapped (right before left) to get descending order.
                int result = Long.compare(sortKey(right, column), sortKey(left, column));
                if (result != 0) {
                    return result;
                }
            }
            return 0;
        }
    };

    /**
     * Reads every object of the given partition and returns all rows, sorted
     * descending by the numeric cells at index 1, 2 and 3.
     *
     * @param ds partition value appended to the key prefix
     * @return the sorted rows, one {@code Object[]} of string cells per line;
     *         an empty list when no objects are listed for the partition
     */
    @Override
    public List<Object[]> listInfo4Toutiao(String ds)
    {
        String prefix = KEY_PREFIX + ds + "/";

        // NOTE(review): the returned instance is not used for any read below. The call is
        // kept only in case getInstance() performs initialisation that other code relies
        // on - confirm and remove if it does not.
        AwsS3Util.getInstance();

        List<?> objectKeys = ObjectStorageUtil.listObjects(BUCKET, prefix);
        if (objectKeys == null || objectKeys.isEmpty()) {
            return Lists.newArrayList();
        }

        List<Object[]> rows = Lists.newArrayList();
        for (Object objectKey : objectKeys) {
            rows.addAll(readS3File(String.valueOf(objectKey)));
        }
        Collections.sort(rows, BY_SORT_COLUMNS_DESC);
        return rows;
    }

    /**
     * Reads one object and splits each of its lines into cells.
     *
     * <p>Failures are logged and not propagated: the rows read before the
     * failure (possibly none) are returned.
     *
     * @param s3key key of the object inside the bucket
     * @return one array of trimmed string cells per line of the object
     */
    private List<Object[]> readS3File(String s3key)
    {
        List<Object[]> rows = Lists.newArrayList();
        try {
            final byte[] fileBytes = ObjectStorageUtil.getObjectContentByteArray(BUCKET, s3key);
            InputStream payload = new ByteArrayInputStream(fileBytes);
            if (s3key.endsWith(LZO_SUFFIX)) {
                LzoDecompressor decompressor = LzoLibrary.getInstance().newDecompressor(LzoAlgorithm.LZO1X, null);
                payload = new LzoInputStream(payload, decompressor);
            }

            // Closing the reader also closes the wrapped streams. The charset is explicit so
            // that decoding does not depend on the platform default.
            // NOTE(review): UTF-8 is assumed for the stored files - confirm against the producer.
            try (BufferedReader reader = new BufferedReader(new InputStreamReader(payload, StandardCharsets.UTF_8))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    rows.add(TAB_SPLITTER.splitToList(line).toArray());
                }
            }
        }
        catch (Exception e) {
            logger.error("读取s3文件错误", e);
        }
        return rows;
    }

    /**
     * Returns the numeric sort key stored in {@code row[index]}.
     *
     * @param row   a row produced by {@link #readS3File(String)}
     * @param index index of the cell to read
     * @return the cell parsed as a long, or {@link #MISSING_SORT_KEY} when the
     *         row is {@code null}, too short, or the cell is not a valid long
     */
    private static long sortKey(Object[] row, int index)
    {
        if (row == null || index >= row.length) {
            return MISSING_SORT_KEY;
        }
        try {
            return Long.parseLong(String.valueOf(row[index]));
        }
        catch (NumberFormatException e) {
            return MISSING_SORT_KEY;
        }
    }
}