1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
package common.service.impl;
import com.google.common.base.Splitter;
import com.google.common.collect.Lists;
import common.service.IOpenApiService;
import dmp.model.OtPkgDevStats;
import dmp.repository.OtPkgDevStatsRepository;
import org.anarres.lzo.LzoAlgorithm;
import org.anarres.lzo.LzoDecompressor;
import org.anarres.lzo.LzoInputStream;
import org.anarres.lzo.LzoLibrary;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import util.AwsS3Util;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
/**
 * Loads package/device statistics export files from S3 and stores their rows
 * through {@link OtPkgDevStatsRepository}.
 *
 * <p>Every object found under {@code export/pkg_dev_stats/ds=<ds>/} is read as
 * tab-separated text. A line is used only when it has exactly three columns:
 * package name, device number and the {@code ds} value. Objects whose key ends
 * with {@code .lzo_deflate} are decompressed with LZO1X before being read.
 *
 * @author nolan
 * @date 13/09/2017
 */
@Service
public class OpenApiServiceImpl
        implements IOpenApiService
{
    private static final Logger logger = LoggerFactory.getLogger(OpenApiServiceImpl.class);

    /** S3 bucket the export files are listed and downloaded from. */
    private static final String BUCKET = "reyunbpu";

    /** Key prefix of the export; the requested {@code ds} value is appended to it. */
    private static final String KEY_PREFIX = "export/pkg_dev_stats/ds=";

    /** Key suffix that marks an object as LZO-compressed. */
    private static final String LZO_SUFFIX = ".lzo_deflate";

    /** Number of columns a line must have to be turned into a row. */
    private static final int COLUMN_COUNT = 3;

    /** Maximum number of rows passed to the repository in a single save call. */
    private static final int BATCH_SIZE = 2000;

    /** Splits a line on tabs and trims each column; built once and reused for every line. */
    private static final Splitter COLUMN_SPLITTER = Splitter.on("\t").trimResults();

    @Autowired OtPkgDevStatsRepository otPkgDevStatsRepository;

    /**
     * Reads every export file listed under the given {@code ds} and saves its rows.
     *
     * <p>Returns without doing anything when the key listing is {@code null} or empty.
     * A failure while reading one file is logged by {@link #readS3File} and does not
     * stop the remaining files from being processed.
     *
     * @param ds value appended to the {@code export/pkg_dev_stats/ds=} key prefix
     */
    @Override
    public void listInfo4AppData(String ds)
    {
        // NOTE(review): the original called AwsS3Util.getInstance() a second time for the
        // listing; this assumes getInstance() hands back the same shared instance - confirm.
        final AwsS3Util s3Util = AwsS3Util.getInstance();
        final List<?> keys = s3Util.getS3Keys(BUCKET, KEY_PREFIX + ds + "/");
        if (keys == null || keys.isEmpty()) {
            return;
        }
        for (Object key : keys) {
            readS3File(s3Util, String.valueOf(key));
        }
    }

    /**
     * Downloads one S3 object, parses it line by line and saves the resulting rows in
     * batches of at most {@link #BATCH_SIZE}.
     *
     * <p>Lines that do not have exactly {@link #COLUMN_COUNT} columns are skipped.
     * Any exception (download, decompression, a non-numeric device number, repository
     * failure) is logged and ends processing of this file; batches saved before the
     * failure stay saved, rows still waiting in the current batch are not saved.
     *
     * @param s3Util client used to download the object
     * @param s3key  key of the object inside the bucket
     */
    private void readS3File(AwsS3Util s3Util, String s3key)
    {
        // try-with-resources closes the reader (and the streams it wraps) on every path.
        try (BufferedReader reader = openReader(s3Util.downloadBytesFromS3(BUCKET, s3key), s3key)) {
            List<OtPkgDevStats> batch = new ArrayList<>(BATCH_SIZE);
            String line;
            while ((line = reader.readLine()) != null) {
                final List<String> columns = COLUMN_SPLITTER.splitToList(line);
                if (columns.size() != COLUMN_COUNT) {
                    continue;
                }
                final OtPkgDevStats row = new OtPkgDevStats();
                row.setPkgname(columns.get(0));
                row.setDevNum(Long.valueOf(columns.get(1)));
                row.setDs(columns.get(2));
                batch.add(row);
                if (batch.size() >= BATCH_SIZE) {
                    otPkgDevStatsRepository.save(batch);
                    // Start a fresh list instead of clearing, in case the repository
                    // keeps a reference to the list it was given.
                    batch = new ArrayList<>(BATCH_SIZE);
                }
            }
            // Save the trailing partial batch; without this, the last (< BATCH_SIZE) rows
            // of every file - and all rows of small files - would never be stored.
            if (!batch.isEmpty()) {
                otPkgDevStatsRepository.save(batch);
            }
        }
        catch (Exception e) {
            logger.error("读取s3文件错误", e);
        }
    }

    /**
     * Wraps downloaded bytes in a UTF-8 text reader, adding LZO1X decompression when
     * the key ends with {@link #LZO_SUFFIX}.
     *
     * @param fileBytes raw object content
     * @param s3key     key of the object; only its suffix is inspected
     * @return a buffered reader over the (decompressed) content
     * @throws IOException if the decompressing stream cannot be set up
     */
    private static BufferedReader openReader(byte[] fileBytes, String s3key)
            throws IOException
    {
        InputStream content = new ByteArrayInputStream(fileBytes);
        if (s3key.endsWith(LZO_SUFFIX)) {
            final LzoDecompressor decompressor =
                    LzoLibrary.getInstance().newDecompressor(LzoAlgorithm.LZO1X, null);
            content = new LzoInputStream(content, decompressor);
        }
        // Explicit charset so decoding does not depend on the platform default.
        return new BufferedReader(new InputStreamReader(content, StandardCharsets.UTF_8));
    }
}