package common.service.impl; import com.google.common.base.Splitter; import com.google.common.collect.Lists; import common.service.IOpenApiService; import dmp.model.OtPkgDevStats; import dmp.repository.OtPkgDevStatsRepository; import org.anarres.lzo.LzoAlgorithm; import org.anarres.lzo.LzoDecompressor; import org.anarres.lzo.LzoInputStream; import org.anarres.lzo.LzoLibrary; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import util.AwsS3Util; import java.io.BufferedReader; import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; import java.util.List; /** * description: * * @author nolan * @date 13/09/2017 */ @Service public class OpenApiServiceImpl implements IOpenApiService { private static Logger logger = LoggerFactory.getLogger(OpenApiServiceImpl.class); @Autowired OtPkgDevStatsRepository otPkgDevStatsRepository; @Override public void listInfo4AppData(String ds) { String skey = "export/pkg_dev_stats/ds=" + ds + "/"; final AwsS3Util s3Util = AwsS3Util.getInstance(); List itemList = AwsS3Util.getInstance().getS3Keys("reyunbpu", skey); if (itemList == null || itemList.size() == 0) { return; } for (int i = 0; i < itemList.size(); i++) { String s3key = String.valueOf(itemList.get(i)); readS3File(s3Util, s3key); } } private void readS3File(AwsS3Util s3Util, String s3key) { InputStream inputStream = null; BufferedReader br = null; try { final byte[] fileBytes = s3Util.downloadBytesFromS3("reyunbpu", s3key); inputStream = new ByteArrayInputStream(fileBytes); if (s3key.endsWith(".lzo_deflate")) { LzoAlgorithm algorithm = LzoAlgorithm.LZO1X; LzoDecompressor deCompressor = LzoLibrary.getInstance().newDecompressor(algorithm, null); LzoInputStream stream = new LzoInputStream(inputStream, deCompressor); br = new BufferedReader(new InputStreamReader(stream)); } else { br = new BufferedReader(new InputStreamReader(inputStream)); } String line = null; List<OtPkgDevStats> resultList = new ArrayList<>(); while ((line = br.readLine()) != null) { // System.out.println(line); Object[] array = Splitter.on("\t").trimResults().splitToList(line).toArray(); if (array.length == 3) { OtPkgDevStats opds = new OtPkgDevStats(); opds.setPkgname(array[0].toString()); opds.setDevNum(Long.valueOf(array[1].toString())); opds.setDs(array[2].toString()); resultList.add(opds); if (resultList.size() >= 2000) { System.out.println(resultList); otPkgDevStatsRepository.save(resultList); resultList = new ArrayList<>(); } } } } catch (Exception e) { logger.error("读取s3文件错误", e); } finally { try { if (br != null) { br.close(); } if (inputStream != null) { inputStream.close(); } } catch (IOException e) { e.printStackTrace(); } } } }