Commit 730e68c1
authored Aug 03, 2021 by WangJinfeng
parent d5349c12

add rtdmp into dmp project

Showing 14 changed files with 1029 additions and 5 deletions
Changed files:

  azkaban/adn/package/adn_install_total_v2.sh                     +4   -0
  azkaban/adn/package/adn_request_other_install_v2.sh             +4   -0
  azkaban/adn/package/adn_request_pkg_total_v2.sh                 +4   -0
  azkaban/rtdmp/rtdmp_alter.sh                                    +2   -2
  azkaban/rtdmp/rtdmp_fetch.job                                   +2   -2
  azkaban/rtdmp/rtdmp_fetch_v2.sh                                 +19  -0
  pom.xml                                                         +5   -0
  src/main/java/mobvista/dmp/datasource/rtdmp/RTDmpFetch.java     +570 -0
  src/main/java/mobvista/dmp/datasource/rtdmp/entity/Tuple.java   +29  -0
  src/main/java/mobvista/dmp/util/AwsUploadFileToS3.java          +68  -0
  src/main/java/mobvista/dmp/util/XferMgrProgress.java            +250 -0
  src/main/java/mobvista/dmp/util/ZipFileUtil.java                +64  -0
  src/main/resources/config.properties                            +7   -0
  src/main/resources/logback.xml                                  +1   -1
azkaban/adn/package/adn_install_total_v2.sh

@@ -25,6 +25,10 @@ hadoop jar ../../${JAR} mobvista.dmp.main.ParseInstallRCFile \
    -Dmapreduce.fileoutputcommitter.algorithm.version=2 \
    $INPUT_PATH $OUTPUT_PATH ${REDUCE_NUM}

if [ $? -ne 0 ]; then
    exit 255
fi

: '
business="adn_install"
azkaban/adn/package/adn_request_other_install_v2.sh

@@ -24,6 +24,10 @@ hadoop jar ../../${JAR} mobvista.dmp.main.ParseInstallRCFile \
    -Dmapreduce.fileoutputcommitter.algorithm.version=2 \
    $INPUT_PATH $OUTPUT_PATH ${REDUCE_NUM}

if [ $? -ne 0 ]; then
    exit 255
fi

: '
business="adn_request_other"
azkaban/adn/package/adn_request_pkg_total_v2.sh

@@ -25,6 +25,10 @@ hadoop jar ../../${JAR} mobvista.dmp.main.ParseInstallRCFile \
    -Dmapreduce.fileoutputcommitter.algorithm.version=2 \
    $INPUT_PATH $OUTPUT_PATH ${REDUCE_NUM}

if [ $? -ne 0 ]; then
    exit 255
fi

: '
business="adn_request_sdk"
azkaban/rtdmp/rtdmp_alter.sh

@@ -4,8 +4,8 @@ source ../dmp_env.sh
 today=${ScheduleTime:-$1}
-start_time=$(date +"%Y-%m-%d %H:00:00" -d "-168 hours $today")
-end_time=$(date +"%Y-%m-%d %H:59:59" -d "-1 hours $today")
+start_time=$(date +"%Y-%m-%d 00:00:00" -d "-7 days $today")
+end_time=$(date +"%Y-%m-%d 23:59:59" -d "-1 days $today")
 java -cp ../${JAR} mobvista.dmp.datasource.rtdmp.RTDmpAlter "${start_time}" "${end_time}"
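Note: for an illustrative $today of 2021-08-03 12:00:00 (assuming GNU date), the old calls produced a rolling, hour-anchored window, 2021-07-27 12:00:00 through 2021-08-03 11:59:59; the new calls align the window to whole calendar days, 2021-07-27 00:00:00 through 2021-08-02 23:59:59, the same window that the new rtdmp_fetch_v2.sh below computes.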
azkaban/rtdmp/rtdmp_fetch.job

 type=command
-command=bash -x rtdmp_fetch.sh
+command=bash -x rtdmp_fetch_v2.sh
 \ No newline at end of file
azkaban/rtdmp/rtdmp_fetch_v2.sh (new file, mode 100644)

#!/bin/bash

source ../dmp_env.sh

today=${ScheduleTime:-$1}

start_time=$(date +"%Y-%m-%d 00:00:00" -d "-7 days $today")
end_time=$(date +"%Y-%m-%d 23:59:59" -d "-1 days $today")

if [[ ! -d "/home/hadoop/wangjf/DmpServer/data" ]]; then
    mkdir -p /home/hadoop/wangjf/DmpServer/data
fi

java -cp ../${JAR} mobvista.dmp.datasource.rtdmp.RTDmpFetch "${start_time}" "${end_time}"

if [[ $? -ne 0 ]]; then
    exit 255
fi
\ No newline at end of file
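Note: Azkaban launches this script through the rtdmp_fetch.job entry above, which supplies ScheduleTime; because of today=${ScheduleTime:-$1}, a manual run can instead pass the anchor time as the first argument, e.g. bash -x rtdmp_fetch_v2.sh "2021-08-03 12:00:00".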
pom.xml

@@ -288,6 +288,11 @@
             <artifactId>alipay-sdk-java</artifactId>
             <version>4.10.192.ALL</version>
         </dependency>
+        <dependency>
+            <groupId>com.amazonaws</groupId>
+            <artifactId>aws-java-sdk-s3</artifactId>
+            <version>1.11.437</version>
+        </dependency>
     </dependencies>
     <build>
src/main/java/mobvista/dmp/datasource/rtdmp/RTDmpFetch.java (new file, mode 100644)

package mobvista.dmp.datasource.rtdmp;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import mobvista.dmp.common.MobvistaConstant;
import mobvista.dmp.datasource.rtdmp.entity.Tuple;
import mobvista.dmp.util.*;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpResponse;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicNameValuePair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ru.yandex.clickhouse.ClickHouseConnection;
import ru.yandex.clickhouse.ClickHouseDataSource;
import ru.yandex.clickhouse.response.ClickHouseResultSet;
import ru.yandex.clickhouse.settings.ClickHouseProperties;

import java.io.*;
import java.net.URISyntaxException;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.text.SimpleDateFormat;
import java.util.*;

/**
 * @package: mobvista.dmp.datasource.rtdmp
 * @author: wangjf
 * @date: 2021/8/3
 * @time: 10:59 AM
 * @email: jinfeng.wang@mobvista.com
 */
public class RTDmpFetch {
    private static String baseUrl = PropertyUtil.getProperty("config.properties", "rtdmp.url");
    private static final String[] SET_VALUES = PropertyUtil.getProperty("config.properties", "http.private.host.server.ip").split(",");
    private static final String DRIVER = PropertyUtil.getProperty("config.properties", "datasource.clickhouse.driverClassName");
    private static final String URL = PropertyUtil.getProperty("config.properties", "datasource.clickhouse.url");
    private static final String USERNAME = PropertyUtil.getProperty("config.properties", "datasource.clickhouse.username");
    private static final String PASSWORD = PropertyUtil.getProperty("config.properties", "datasource.clickhouse.password");
    private static final String DATABASE = PropertyUtil.getProperty("config.properties", "datasource.clickhouse.database");
    private static final int TIMEOUT = Integer.parseInt(PropertyUtil.getProperty("config.properties", "datasource.clickhouse.timeout"));
    private static final String bucketName = PropertyUtil.getProperty("config.properties", "aws.s3.bucket");
    private static final String keyName = PropertyUtil.getProperty("config.properties", "aws.s3.key");
    private static final String OUTPUT = PropertyUtil.getProperty("config.properties", "rtdmp.output.dir");
    private static final SimpleDateFormat SDF = new SimpleDateFormat("yyyy-MM-dd HH");
    private static final MySQLUtil mySqlUtil = new MySQLUtil();
    public static final Logger LOGGER = LoggerFactory.getLogger(RTDmpFetch.class);

    public static void main(String[] args) {
        String update_time_start;
        String update_time_end;
        if (args.length >= 2) {
            update_time_start = args[0];
            update_time_end = args[1];
        } else {
            Calendar cal = Calendar.getInstance();
            Date date = new Date();
            cal.setTime(date);
            cal.add(Calendar.DATE, -7);
            update_time_start = DateUtil.format(cal.getTime(), "yyyy-MM-dd ") + "00:00:00";
            cal.setTime(date);
            cal.add(Calendar.DATE, -1);
            update_time_end = DateUtil.format(cal.getTime(), "yyyy-MM-dd ") + "23:59:59";
        }
        String part = DateUtil.format(DateUtil.parse(update_time_end, "yyyy-MM-dd HH:mm:ss"), "yyyyMMddHH");
        JSONArray jsonArray = ruleAudienceInfo(update_time_start, update_time_end);
        jsonArray.sort(Comparator.comparing(obj -> ((JSONObject) obj).getIntValue("id")));
        for (int i = 0; i < jsonArray.size(); i++) {
            JSONObject audienceObject = jsonArray.getJSONObject(i);
            doFetch(audienceObject, part);
        }
    }

    private static void doFetch(JSONObject audienceObject, String part) {
        int audienceId = audienceObject.getIntValue("id");
        boolean success = fetch(audienceObject, part);
        if (success) {
            LOGGER.info(Thread.currentThread().getName() + " fetch " + audienceId + " success!");
        } else {
            LOGGER.info(Thread.currentThread().getName() + " fetch " + audienceId + " failure!");
        }
    }

    private static boolean fetch(JSONObject jsonObject, String part) {
        long startTime = DateUtil.parse(DateUtil.format(new Date(), "yyyy-MM-dd") + " 00:00:00", "yyyy-MM-dd HH:mm:ss").getTime() / 1000;
        long endTime = DateUtil.parse(DateUtil.format(new Date(), "yyyy-MM-dd") + " 14:00:00", "yyyy-MM-dd HH:mm:ss").getTime() / 1000;
        long start = System.currentTimeMillis();
        int audienceId = jsonObject.getIntValue("id");
        LOGGER.info("checkRules -->> audienceId:" + audienceId + ", jsonObject:" + jsonObject + ", startTime:" + startTime + ", endTime:" + endTime);
        Tuple tuple = checkRules(jsonObject, startTime, endTime);
        if (tuple.getFlag()) {
            String partition = mySqlUtil.getLastPartition("dwh", "audience_merge");
            String dt = DateUtil.format(DateUtil.parse(partition.substring(0, 8), "yyyyMMdd"), "yyyy-MM-dd");
            String hour = partition.substring(8, 10);
            String partTime = dt + " " + hour;
            String newPartTime = SDF.format(new Date(tuple.getUtime() * 1000));
            LOGGER.info("checkPartition -->> audienceId:" + audienceId + ", partTime:" + partTime + ", newPartTime:" + newPartTime);
            while (partTime.compareTo(newPartTime) < 0) {
                long nowTimestamp = DateUtil.parse(DateUtil.format(new Date(), "yyyy-MM-dd HH:mm:ss"), "yyyy-MM-dd HH:mm:ss").getTime() / 1000;
                LOGGER.info("checkPartition -->> audienceId:" + audienceId + ", nowTimestamp:" + nowTimestamp + ", partTime:" + partTime + ", newPartTime:" + newPartTime + ", sleep 300s");
                if (nowTimestamp > endTime) {
                    break;
                }
                try {
                    Thread.sleep(300000);
                } catch (InterruptedException e) {
                    LOGGER.info(e.getMessage());
                }
                partition = mySqlUtil.getLastPartition("dwh", "audience_merge");
                dt = DateUtil.format(DateUtil.parse(partition.substring(0, 8), "yyyyMMdd"), "yyyy-MM-dd");
                hour = partition.substring(8, 10);
                partTime = dt + " " + hour;
            }
            long nowTime = DateUtil.parse(DateUtil.format(new Date(), "yyyy-MM-dd HH:mm:ss"), "yyyy-MM-dd HH:mm:ss").getTime() / 1000;
            if (tuple.getUtime() > (nowTime - 7200) && nowTime < endTime) {
                // Avoid missing data while background shards are still synchronizing
                try {
                    LOGGER.info("Sleep 10 min.");
                    Thread.sleep(600000);
                } catch (InterruptedException e) {
                    LOGGER.info(e.getMessage());
                }
            }
            ClickHouseProperties properties = new ClickHouseProperties();
            properties.setUser(USERNAME);
            properties.setPassword(PASSWORD);
            properties.setDatabase(DATABASE);
            properties.setSocketTimeout(TIMEOUT);
            properties.setConnectionTimeout(TIMEOUT);
            String[] ips0 = SET_VALUES[new Random().nextInt(3)].split(":");
            int random0 = new Random().nextInt(2);
            String sql = buildSql(dt, hour, jsonObject)
                    .replace("@key", "device_type")
                    .replace("@table", "audience_merge_all")
                    + " GROUP BY device_type";
            LOGGER.info("checkDeviceType -->> audienceId:" + audienceId + ", sql -->>" + sql);
            ClickHouseDataSource dataSource0 = new ClickHouseDataSource(URL.replace("host", ips0[random0]), properties);
            ClickHouseConnection connection0 = null;
            ClickHouseResultSet resultSet0 = null;
            Set<String> devTypeSet = new HashSet<>();
            try {
                connection0 = dataSource0.getConnection();
                resultSet0 = (ClickHouseResultSet) connection0.prepareStatement(sql).executeQuery();
                while (resultSet0.next()) {
                    String deviceType = resultSet0.getString("device_type");
                    devTypeSet.add(deviceType);
                }
            } catch (SQLException e) {
                LOGGER.info("SQLException -->> " + e.getMessage());
            } finally {
                try {
                    connection0.close();
                    resultSet0.close();
                } catch (SQLException throwables) {
                    LOGGER.info(throwables.getMessage());
                }
            }
            sql = buildSql(dt, hour, jsonObject)
                    .replace("@key", "COUNT(1) counts")
                    .replace("@table", "audience_merge_all");
            int counts = 0;
            try {
                connection0 = dataSource0.getConnection();
                resultSet0 = (ClickHouseResultSet) connection0.prepareStatement(sql).executeQuery();
                while (resultSet0.next()) {
                    counts = resultSet0.getInt("counts");
                }
            } catch (SQLException e) {
                LOGGER.info("SQLException -->> " + e.getMessage());
            } finally {
                try {
                    connection0.close();
                    resultSet0.close();
                } catch (SQLException throwables) {
                    LOGGER.info(throwables.getMessage());
                }
            }
            LOGGER.info("checkCount -->> audienceId:" + audienceId + ", sql -->>" + sql + ", count:" + counts);
            for (String devType : devTypeSet) {
                sql = buildSql(dt, hour, jsonObject)
                        .replace("@key", "devid")
                        .replace("@table", "audience_merge")
                        + " AND device_type = '" + devType + "'";
                LOGGER.info("checkDeviceId -->> audienceId:" + audienceId + ",sql -->> " + sql);
                for (int i = 0; i < SET_VALUES.length; i++) {
                    String[] ips = SET_VALUES[i].split(":");
                    int random = new Random().nextInt(2);
                    ClickHouseDataSource dataSource = new ClickHouseDataSource(URL.replace("host", ips[random]), properties);
                    ClickHouseConnection connection;
                    ClickHouseResultSet resultSet;
                    try {
                        connection = dataSource.getConnection();
                        resultSet = (ClickHouseResultSet) connection.prepareStatement(sql).executeQuery();
                    } catch (SQLException e) {
                        LOGGER.info("SQLException -->> " + e.getMessage());
                        return false;
                    }
                    LOGGER.info("audienceId -->> " + audienceId + ",upload start!");
                    try {
                        if (resultSet != null && resultSet.hasNext()) {
                            LOGGER.info("audienceId -->> " + audienceId + ",resultSet is not null");
                        } else {
                            devType = "unknown";
                            LOGGER.info("audienceId -->> " + audienceId + ",resultSet is null");
                            sql = "SELECT 'c4ca4238a0b923820dcc509a6f75849b' devid";
                            try {
                                resultSet = (ClickHouseResultSet) connection.prepareStatement(sql).executeQuery();
                            } catch (SQLException e) {
                                LOGGER.info("SQLException -->> " + e.getMessage());
                                return false;
                            }
                        }
                        multipleFileUpload(resultSet, audienceId, devType, part, i);
                    } catch (SQLException throwables) {
                        LOGGER.info("SQLException -->> " + throwables.getMessage());
                        return false;
                    }
                }
            }
            // Only push an update when new data was produced; otherwise leave the audience untouched
            if (devTypeSet.size() > 0) {
                StringBuilder s3Path = new StringBuilder();
                s3Path.append("s3://").append(bucketName).append("/").append(keyName).append("/")
                        .append(audienceId).append("/").append(part).append("/*/");
                JSONObject updateJSON = new JSONObject();
                updateJSON.put("id", audienceId);
                updateJSON.put("s3_path", s3Path);
                updateJSON.put("status", 1);
                updateJSON.put("audience_data_status", 1);
                updateJSON.put("audience_count", counts);
                JSONArray updateJSONArray = new JSONArray();
                updateJSONArray.add(updateJSON);
                update(updateJSONArray.toJSONString());
            }
        }
        long end = System.currentTimeMillis();
        LOGGER.info("audienceId -->> " + audienceId + ",runtime ==>> " + (end - start));
        return true;
    }

    private static JSONArray ruleAudienceInfo(String startTime, String endTime) {
        CloseableHttpClient client = HttpClients.createDefault();
        List<BasicNameValuePair> formparams = new ArrayList<>();
        final String serverUrl = baseUrl + "rtdmp/audience/query";
        URIBuilder uri = new URIBuilder();
        try {
            uri = new URIBuilder(serverUrl)
                    .addParameter("update_time_start", startTime)
                    .addParameter("update_time_end", endTime)
                    .addParameter("audience_type", "3")
                    .addParameter("is_offline", "1");
        } catch (URISyntaxException e) {
            e.printStackTrace();
        }
        LOGGER.info("ruleAudienceInfo.url: " + uri);
        RequestConfig requestConfig = RequestConfig.custom()
                .setConnectTimeout(5000).setConnectionRequestTimeout(5000)
                .setSocketTimeout(5000).build();
        HttpGet httpGet = new HttpGet();
        try {
            httpGet = new HttpGet(uri.build());
        } catch (URISyntaxException e) {
            LOGGER.info(e.getMessage());
        }
        String key = UUID.randomUUID().toString();
        String token = MD5Util.getMD5Str(key);
        httpGet.setHeader("Auth-System", "dmp");
        httpGet.setHeader("Content-Type", "text/plain");
        httpGet.setHeader("key", key);
        httpGet.setHeader("token", token);
        JSONArray jsonArray = new JSONArray();
        CloseableHttpResponse response;
        try {
            response = client.execute(httpGet);
            BufferedReader rd = new BufferedReader(new InputStreamReader(response.getEntity().getContent()));
            StringBuilder result = new StringBuilder();
            String line;
            while ((line = rd.readLine()) != null) {
                result.append(line);
            }
            JSONObject jsonObject = MobvistaConstant.String2JSONObject(result.toString());
            if (jsonObject.getInteger("code") == 200 && jsonObject.containsKey("data")) {
                jsonArray = jsonObject.getJSONArray("data");
            }
        } catch (IOException e) {
            LOGGER.info(e.getMessage());
            jsonArray = new JSONArray();
        } finally {
            httpGet.abort();
        }
        LOGGER.info("ruleAudienceInfo.result: " + jsonArray);
        return jsonArray;
    }

    private static void update(String requestBody) {
        LOGGER.info("rtdmp/update -->> requestBody -->> " + requestBody);
        CloseableHttpClient client = HttpClients.createDefault();
        RequestConfig requestConfig = RequestConfig.custom()
                .setConnectTimeout(5000).setConnectionRequestTimeout(5000)
                .setSocketTimeout(5000).build();
        final String serverUrl = baseUrl + "rtdmp/audience/update";
        HttpPut put = new HttpPut(serverUrl);
        String key = UUID.randomUUID().toString();
        String token = MD5Util.getMD5Str(key);
        put.setHeader("key", key);
        put.setHeader("token", token);
        JSONObject jsonObject = new JSONObject();
        HttpResponse response;
        try {
            put.setConfig(requestConfig);
            put.setEntity(new StringEntity(requestBody));
            response = client.execute(put);
            BufferedReader rd = new BufferedReader(new InputStreamReader(response.getEntity().getContent()));
            StringBuilder result = new StringBuilder();
            String line;
            while ((line = rd.readLine()) != null) {
                result.append(line);
            }
            jsonObject = JSON.parseObject(result.toString());
        } catch (IOException e) {
            LOGGER.info("IOException -->> " + e.getMessage());
        } finally {
            put.abort();
        }
        LOGGER.info("rtdmp/update -->> jsonObject -->> " + jsonObject.toJSONString());
    }

    private static Tuple checkRules(JSONObject jsonObject, long startTime, long endTime) {
        JSONArray audienceIds = new JSONArray();
        if (jsonObject.containsKey("audience_rules")) {
            JSONObject audience_rules = jsonObject.getJSONObject("audience_rules");
            if (audience_rules.containsKey("intersections")) {
                audienceIds.addAll(audience_rules.getJSONArray("intersections"));
            }
            if (audience_rules.containsKey("union")) {
                audienceIds.addAll(audience_rules.getJSONArray("union"));
            }
            if (audience_rules.containsKey("subtraction")) {
                audienceIds.addAll(audience_rules.getJSONArray("subtraction"));
            }
        }
        if (audienceIds.size() > 0) {
            long nowTime = DateUtil.parse(DateUtil.format(new Date(), "yyyy-MM-dd HH:mm:ss"), "yyyy-MM-dd HH:mm:ss").getTime() / 1000;
            while (nowTime < endTime) {
                Tuple tuple = checkSuccess(audienceIds, startTime);
                LOGGER.info("checkSuccess -->> flag:" + tuple.getFlag() + ", utime:" + tuple.getUtime());
                if (tuple.getFlag()) {
                    return tuple;
                }
                try {
                    LOGGER.info("checkRules sleep 300s");
                    Thread.sleep(300000);
                } catch (InterruptedException e) {
                    LOGGER.info(e.getMessage());
                }
                nowTime = DateUtil.parse(DateUtil.format(new Date(), "yyyy-MM-dd HH:mm:ss"), "yyyy-MM-dd HH:mm:ss").getTime() / 1000;
            }
            return new Tuple(true, nowTime);
        }
        return new Tuple(false, 0L);
    }

    private static String buildSql(String dt, String hour, JSONObject jsonObject) {
        StringBuilder sql = new StringBuilder();
        sql.append("SELECT @key FROM dwh.@table WHERE dt = '").append(dt)
                .append("' AND hour = '").append(hour)
                .append("' AND device_type != 'unknown' ");
        if (jsonObject.containsKey("audience_rules")) {
            JSONObject audience_rules = jsonObject.getJSONObject("audience_rules");
            // String[] rules = jsonObject.getString("audience_rules_str").split(";", -1);
            StringBuilder ruleSql = new StringBuilder();
            // hasAll
            if (audience_rules.containsKey("intersections") && !audience_rules.getJSONArray("intersections").isEmpty()) {
                ruleSql.append("hasAll(audience_id,").append(audience_rules.getJSONArray("intersections")).append(")");
            }
            // hasAny
            if (audience_rules.containsKey("union") && !audience_rules.getJSONArray("union").isEmpty()) {
                if (StringUtils.isNotBlank(ruleSql)) {
                    ruleSql.append(" OR hasAny(audience_id,").append(audience_rules.getJSONArray("union")).append(")");
                } else {
                    ruleSql.append("hasAny(audience_id,").append(audience_rules.getJSONArray("union")).append(")");
                }
            }
            if (StringUtils.isNotBlank(ruleSql)) {
                sql.append("AND (").append(ruleSql).append(")");
            }
            // not hasAny
            if (audience_rules.containsKey("subtraction") && !audience_rules.getJSONArray("subtraction").isEmpty()) {
                sql.append(" AND NOT hasAny(audience_id,").append(audience_rules.getJSONArray("subtraction")).append(")");
            }
        }
        return sql.toString();
    }

    private static Tuple checkSuccess(JSONArray jsonArray, long startTime) {
        Long futime = 0L;
        for (Object o : jsonArray) {
            JSONObject audienceObject = queryById(o.toString());
            Long utime = audienceObject.getLong("audience_data_utime");
            if (utime > futime) {
                futime = utime;
            }
            if (utime < startTime) {
                return new Tuple(false, utime);
            }
        }
        return new Tuple(true, futime);
    }

    private static JSONObject queryById(String id) {
        CloseableHttpClient client = HttpClients.createDefault();
        List<BasicNameValuePair> formparams = new ArrayList<>();
        final String serverUrl = baseUrl + "rtdmp/audience/query";
        URIBuilder uri = new URIBuilder();
        try {
            uri = new URIBuilder(serverUrl).addParameter("id", id);
        } catch (URISyntaxException e) {
            e.printStackTrace();
        }
        RequestConfig requestConfig = RequestConfig.custom()
                .setConnectTimeout(5000).setConnectionRequestTimeout(5000)
                .setSocketTimeout(5000).build();
        HttpGet httpGet = null;
        try {
            httpGet = new HttpGet(uri.build());
        } catch (URISyntaxException e) {
            e.printStackTrace();
        }
        String key = UUID.randomUUID().toString();
        String token = MD5Util.getMD5Str(key);
        httpGet.setHeader("Auth-System", "dmp");
        httpGet.setHeader("Content-Type", "text/plain");
        httpGet.setHeader("key", key);
        httpGet.setHeader("token", token);
        JSONObject res = new JSONObject();
        CloseableHttpResponse response;
        try {
            response = client.execute(httpGet);
            BufferedReader rd = new BufferedReader(new InputStreamReader(response.getEntity().getContent()));
            StringBuilder result = new StringBuilder();
            String line;
            while ((line = rd.readLine()) != null) {
                result.append(line);
            }
            JSONObject jsonObject = MobvistaConstant.String2JSONObject(result.toString());
            if (jsonObject.getInteger("code") == 200 && jsonObject.containsKey("data")
                    && jsonObject.getJSONArray("data").size() > 0) {
                res = jsonObject.getJSONArray("data").getJSONObject(0);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            httpGet.abort();
        }
        return res;
    }

    private static void multipleFileUpload(ResultSet resultSet, int audienceId, String deviceType, String part, int ip) {
        List<String> filePaths = new ArrayList<>();
        try {
            int i = 0;
            FileWriter fstream = null;
            BufferedWriter out = null;
            int j = 0;
            while (resultSet.next()) {
                String devid = resultSet.getString("devid");
                String fileName = OUTPUT + audienceId + "." + ip + "_" + j + ".txt";
                File file = new File(fileName);
                if (file.exists() && i == 0) {
                    file.delete();
                    try {
                        fstream = new FileWriter(fileName, true);
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                    out = new BufferedWriter(fstream);
                } else if (!file.exists()) {
                    try {
                        fstream = new FileWriter(fileName, true);
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                    out = new BufferedWriter(fstream);
                }
                i = i + 1;
                out.write(devid);
                out.newLine();
                if (i == 10000000) {
                    i = 0;
                    out.close();
                    String zipName = OUTPUT + audienceId + "." + ip + "_" + j + ".gz";
                    ZipFileUtil.compress(fileName, zipName);
                    filePaths.add(zipName);
                    // Delete the source file as soon as it has been compressed
                    if (file.exists()) {
                        file.delete();
                    }
                    j = j + 1;
                } else if (i <= 10000000 && resultSet.isLast()) {
                    out.close();
                    String zipName = OUTPUT + audienceId + "." + ip + "_" + j + ".gz";
                    ZipFileUtil.compress(fileName, zipName);
                    filePaths.add(zipName);
                    // Delete the source file as soon as it has been compressed
                    if (file.exists()) {
                        file.delete();
                    }
                }
            }
        } catch (IOException | SQLException e) {
            LOGGER.info(e.getMessage());
        }
        AwsUploadFileToS3 awsUploadFileToS3 = new AwsUploadFileToS3();
        awsUploadFileToS3.uploadFileList(filePaths, audienceId, deviceType, part);
    }
}
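Note: to make the @key/@table templating concrete, a hypothetical audience whose audience_rules are {"intersections":[101,102],"union":[103],"subtraction":[104]} yields, for dt = 2021-08-02 and hour = 23:

    SELECT @key FROM dwh.@table WHERE dt = '2021-08-02' AND hour = '23' AND device_type != 'unknown' AND (hasAll(audience_id,[101,102]) OR hasAny(audience_id,[103])) AND NOT hasAny(audience_id,[104])

fetch() then substitutes @key and @table three times: device_type against audience_merge_all (plus GROUP BY device_type), COUNT(1) counts against audience_merge_all, and devid against audience_merge with a device_type filter appended.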
src/main/java/mobvista/dmp/datasource/rtdmp/entity/Tuple.java (new file, mode 100644)

package mobvista.dmp.datasource.rtdmp.entity;

public class Tuple {
    private Boolean flag;
    private Long utime;

    public Boolean getFlag() {
        return flag;
    }

    public void setFlag(Boolean flag) {
        this.flag = flag;
    }

    public Long getUtime() {
        return utime;
    }

    public void setUtime(Long utime) {
        this.utime = utime;
    }

    public Tuple(Boolean flag, Long utime) {
        this.flag = flag;
        this.utime = utime;
    }
}
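Note: as used by checkRules/checkSuccess above, flag records whether every upstream audience already has data newer than the window start, and utime carries the freshest audience_data_utime seen, which fetch() compares against the latest audience_merge partition.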
src/main/java/mobvista/dmp/util/AwsUploadFileToS3.java (new file, mode 100644)

package mobvista.dmp.util;

import com.amazonaws.AmazonServiceException;
import com.amazonaws.SdkClientException;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.transfer.MultipleFileUpload;
import com.amazonaws.services.s3.transfer.TransferManager;
import com.amazonaws.services.s3.transfer.TransferManagerBuilder;

import java.io.File;
import java.util.ArrayList;
import java.util.List;

/**
 * @package: mobvista.dmp.util
 * @author: wangjf
 * @date: 2021/8/3
 * @time: 11:22 AM
 * @email: jinfeng.wang@mobvista.com
 */
public class AwsUploadFileToS3 {
    private static Regions clientRegion = Regions.US_EAST_1;
    private static AWSCredentials credentials = new BasicAWSCredentials(
            PropertyUtil.getProperty("config.properties", "aws.accessKey"),
            PropertyUtil.getProperty("config.properties", "aws.secretKey")
    );
    private static AmazonS3 s3Client = AmazonS3ClientBuilder.standard()
            .withRegion(clientRegion)
            .withCredentials(new AWSStaticCredentialsProvider(credentials))
            .build();

    public void uploadFileList(List<String> file_paths, Integer jobId, String deviceType, String part) {
        String bucketName = PropertyUtil.getProperty("config.properties", "aws.s3.bucket");
        String keyName = PropertyUtil.getProperty("config.properties", "aws.s3.key") + "/" + jobId + "/" + part + "/" + deviceType;
        ArrayList<File> files = new ArrayList<File>();
        for (String path : file_paths) {
            files.add(new File(path));
        }
        TransferManager xfer_mgr = TransferManagerBuilder.standard()
                .withS3Client(s3Client)
                .build();
        try {
            MultipleFileUpload xfer = xfer_mgr.uploadFileList(bucketName, keyName,
                    new File(PropertyUtil.getProperty("config.properties", "rtdmp.output.dir")), files);
            XferMgrProgress.showTransferProgress(xfer);
            XferMgrProgress.waitForCompletion(xfer);
        } catch (AmazonServiceException e) {
            System.err.println(e.getErrorMessage());
            System.exit(1);
        }
        xfer_mgr.shutdownNow(false);
        for (String path : file_paths) {
            File deleteFile = new File(path);
            // Delete the local file as soon as it has been uploaded
            if (deleteFile.exists()) {
                deleteFile.delete();
            }
        }
    }
}
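Note: with the config.properties values added in this commit, uploads land under s3://mob-emr-test/dataplatform/fmp/data/rtdmp/<jobId>/<part>/<deviceType>/; the trailing /*/ wildcard in the s3_path that RTDmpFetch reports back to the rtdmp service stands for this device-type directory.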
src/main/java/mobvista/dmp/util/XferMgrProgress.java (new file, mode 100644)

package mobvista.dmp.util;

import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.event.ProgressEvent;
import com.amazonaws.event.ProgressListener;
import com.amazonaws.services.s3.transfer.*;

import java.io.File;
import java.util.ArrayList;
import java.util.Collection;

/**
 * @package: mobvista.dmp.util
 * @author: wangjf
 * @date: 2021/8/3
 * @time: 11:26 AM
 * @email: jinfeng.wang@mobvista.com
 */
public class XferMgrProgress {
    // waits for the transfer to complete, catching any exceptions that occur.
    public static void waitForCompletion(Transfer xfer) {
        // snippet-start:[s3.java1.s3_xfer_mgr_progress.wait_for_transfer]
        try {
            xfer.waitForCompletion();
        } catch (AmazonServiceException e) {
            System.err.println("Amazon service error: " + e.getMessage());
            System.exit(1);
        } catch (AmazonClientException e) {
            System.err.println("Amazon client error: " + e.getMessage());
            System.exit(1);
        } catch (InterruptedException e) {
            System.err.println("Transfer interrupted: " + e.getMessage());
            System.exit(1);
        }
        // snippet-end:[s3.java1.s3_xfer_mgr_progress.wait_for_transfer]
    }

    // Prints progress while waiting for the transfer to finish.
    public static void showTransferProgress(Transfer xfer) {
        // snippet-start:[s3.java1.s3_xfer_mgr_progress.poll]
        // print the transfer's human-readable description
        System.out.println(xfer.getDescription());
        // print an empty progress bar...
        printProgressBar(0.0);
        // update the progress bar while the xfer is ongoing.
        do {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                return;
            }
            // Note: so_far and total aren't used, they're just for
            // documentation purposes.
            TransferProgress progress = xfer.getProgress();
            long so_far = progress.getBytesTransferred();
            long total = progress.getTotalBytesToTransfer();
            double pct = progress.getPercentTransferred();
            eraseProgressBar();
            printProgressBar(pct);
        } while (xfer.isDone() == false);
        // print the final state of the transfer.
        Transfer.TransferState xfer_state = xfer.getState();
        System.out.println(": " + xfer_state);
        // snippet-end:[s3.java1.s3_xfer_mgr_progress.poll]
    }

    // Prints progress of a multiple file upload while waiting for it to finish.
    public static void showMultiUploadProgress(MultipleFileUpload multi_upload) {
        // print the upload's human-readable description
        System.out.println(multi_upload.getDescription());
        // snippet-start:[s3.java1.s3_xfer_mgr_progress.substranferes]
        Collection<? extends Upload> sub_xfers = new ArrayList<Upload>();
        sub_xfers = multi_upload.getSubTransfers();
        do {
            System.out.println("\nSubtransfer progress:\n");
            for (Upload u : sub_xfers) {
                System.out.println("  " + u.getDescription());
                if (u.isDone()) {
                    Transfer.TransferState xfer_state = u.getState();
                    System.out.println("  " + xfer_state);
                } else {
                    TransferProgress progress = u.getProgress();
                    double pct = progress.getPercentTransferred();
                    printProgressBar(pct);
                    System.out.println();
                }
            }
            // wait a bit before the next update.
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                return;
            }
        } while (multi_upload.isDone() == false);
        // print the final state of the transfer.
        Transfer.TransferState xfer_state = multi_upload.getState();
        System.out.println("\nMultipleFileUpload " + xfer_state);
        // snippet-end:[s3.java1.s3_xfer_mgr_progress.substranferes]
    }

    // prints a simple text progressbar: [#####     ]
    public static void printProgressBar(double pct) {
        // if bar_size changes, then change erase_bar (in eraseProgressBar) to
        // match.
        final int bar_size = 40;
        final String empty_bar = "                                        ";
        final String filled_bar = "########################################";
        int amt_full = (int) (bar_size * (pct / 100.0));
        System.out.format("  [%s%s]", filled_bar.substring(0, amt_full),
                empty_bar.substring(0, bar_size - amt_full));
    }

    // erases the progress bar.
    public static void eraseProgressBar() {
        // erase_bar is bar_size (from printProgressBar) + 4 chars.
        final String erase_bar = "\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b";
        System.out.format(erase_bar);
    }

    public static void uploadFileWithListener(String file_path, String bucket_name, String key_prefix, boolean pause) {
        System.out.println("file: " + file_path + (pause ? " (pause)" : ""));
        String key_name = null;
        if (key_prefix != null) {
            key_name = key_prefix + '/' + file_path;
        } else {
            key_name = file_path;
        }
        // snippet-start:[s3.java1.s3_xfer_mgr_progress.progress_listener]
        File f = new File(file_path);
        TransferManager xfer_mgr = TransferManagerBuilder.standard().build();
        try {
            Upload u = xfer_mgr.upload(bucket_name, key_name, f);
            // print an empty progress bar...
            printProgressBar(0.0);
            u.addProgressListener(new ProgressListener() {
                public void progressChanged(ProgressEvent e) {
                    double pct = e.getBytesTransferred() * 100.0 / e.getBytes();
                    eraseProgressBar();
                    printProgressBar(pct);
                }
            });
            // block with Transfer.waitForCompletion()
            XferMgrProgress.waitForCompletion(u);
            // print the final state of the transfer.
            Transfer.TransferState xfer_state = u.getState();
            System.out.println(": " + xfer_state);
        } catch (AmazonServiceException e) {
            System.err.println(e.getErrorMessage());
            System.exit(1);
        }
        xfer_mgr.shutdownNow();
        // snippet-end:[s3.java1.s3_xfer_mgr_progress.progress_listener]
    }

    public static void uploadDirWithSubprogress(String dir_path, String bucket_name, String key_prefix, boolean recursive, boolean pause) {
        System.out.println("directory: " + dir_path + (recursive ? " (recursive)" : "") + (pause ? " (pause)" : ""));
        TransferManager xfer_mgr = new TransferManager();
        try {
            MultipleFileUpload multi_upload = xfer_mgr.uploadDirectory(bucket_name, key_prefix, new File(dir_path), recursive);
            // loop with Transfer.isDone()
            XferMgrProgress.showMultiUploadProgress(multi_upload);
            // or block with Transfer.waitForCompletion()
            XferMgrProgress.waitForCompletion(multi_upload);
        } catch (AmazonServiceException e) {
            System.err.println(e.getErrorMessage());
            System.exit(1);
        }
        xfer_mgr.shutdownNow();
    }

    public static void main(String[] args) {
        final String USAGE = "\n" +
                "Usage:\n" +
                "    XferMgrProgress [--recursive] [--pause] <s3_path> <local_path>\n\n" +
                "Where:\n" +
                "    --recursive - Only applied if local_path is a directory.\n" +
                "                  Copies the contents of the directory recursively.\n\n" +
                "    --pause     - Attempt to pause+resume the upload. This may not work for\n" +
                "                  small files.\n\n" +
                "    s3_path     - The S3 destination (bucket/path) to upload the file(s) to.\n\n" +
                "    local_path  - The path to a local file or directory path to upload to S3.\n\n" +
                "Examples:\n" +
                "    XferMgrProgress public_photos/cat_happy.png my_photos/funny_cat.png\n" +
                "    XferMgrProgress public_photos my_photos/cat_sad.png\n" +
                "    XferMgrProgress public_photos my_photos\n\n";

        if (args.length < 2) {
            System.out.println(USAGE);
            System.exit(1);
        }

        int cur_arg = 0;
        boolean recursive = false;
        boolean pause = false;

        // first, parse any switches
        while (args[cur_arg].startsWith("--")) {
            if (args[cur_arg].equals("--recursive")) {
                recursive = true;
            } else if (args[cur_arg].equals("--pause")) {
                pause = true;
            } else {
                System.out.println("Unknown argument: " + args[cur_arg]);
                System.out.println(USAGE);
                System.exit(1);
            }
            cur_arg += 1;
        }

        // only the first '/' character is of interest to get the bucket name.
        // Subsequent ones are part of the key name.
        String[] s3_path = args[cur_arg].split("/", 2);
        cur_arg += 1;

        String bucket_name = s3_path[0];
        String key_prefix = null;
        if (s3_path.length > 1) {
            key_prefix = s3_path[1];
        }

        String local_path = args[cur_arg];

        // check to see if local path is a directory or file...
        File f = new File(args[cur_arg]);
        if (f.exists() == false) {
            System.out.println("Input path doesn't exist: " + args[cur_arg]);
            System.exit(1);
        } else if (f.isDirectory()) {
            uploadDirWithSubprogress(local_path, bucket_name, key_prefix, recursive, pause);
        } else {
            uploadFileWithListener(local_path, bucket_name, key_prefix, pause);
        }
    }
}
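Note: this class appears to be adapted from the AWS SDK for Java 1.x TransferManager sample code (the snippet-start/snippet-end markers are the upstream sample annotations); only waitForCompletion and showTransferProgress are called from AwsUploadFileToS3 in this commit.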
src/main/java/mobvista/dmp/util/ZipFileUtil.java (new file, mode 100644)

package mobvista.dmp.util;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.*;
import java.util.zip.GZIPOutputStream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

/**
 * @package: mobvista.dmp.util
 * @author: wangjf
 * @date: 2021/8/3
 * @time: 11:21 AM
 * @email: jinfeng.wang@mobvista.com
 */
public class ZipFileUtil {
    public static final Logger logger = LoggerFactory.getLogger(ZipFileUtil.class);

    /**
     * Compress a single file into a ZIP archive.
     */
    public static void zipFile(String filePath, String zipPath) {
        try {
            File file = new File(filePath);
            File zipFile = new File(zipPath);
            InputStream input = new FileInputStream(file);
            ZipOutputStream zipOut = new ZipOutputStream(new FileOutputStream(zipFile));
            zipOut.putNextEntry(new ZipEntry(file.getName()));
            int temp = 0;
            while ((temp = input.read()) != -1) {
                zipOut.write(temp);
            }
            input.close();
            zipOut.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void compress(String filePath, String gzipPath) {
        byte[] buffer = new byte[1024];
        try {
            // Specify Name and Path of Output GZip file here
            GZIPOutputStream gos = new GZIPOutputStream(new FileOutputStream(gzipPath));
            // Specify the input file here
            FileInputStream fis = new FileInputStream(filePath);
            // Read from input file and write to output GZip file
            int length;
            // fis.read(buffer) fills the buffer and returns the number of bytes read;
            // it returns -1 once the end of the file is reached.
            while ((length = fis.read(buffer)) > 0) {
                gos.write(buffer, 0, length);
            }
            fis.close();
            gos.finish();
            gos.close();
            System.out.println(filePath + ",File Compressed!");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
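Note: despite the class name, the method this commit actually calls is compress(), which writes a plain gzip stream and so matches the .gz names RTDmpFetch builds, e.g. (hypothetical audience id 123, host index 0, file index 0) ZipFileUtil.compress(OUTPUT + "123.0_0.txt", OUTPUT + "123.0_0.gz"); zipFile() produces a ZIP archive and is not referenced elsewhere in the diff.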
src/main/resources/config.properties

@@ -77,6 +77,13 @@ rtdmp.noauto.business=dsp_req,btop
rtdmp.auto.business=ali_activation
aws.s3.bucket=mob-emr-test
aws.s3.key=dataplatform/fmp/data/rtdmp
aws.accessKey=AKIA35FGARBHBMBRCBHG
aws.secretKey=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
rtdmp.output.dir=/home/hadoop/wangjf/DmpServer/data/
# alipay_activation,alipay_acquisition,uc_activation,iqiyi_activation,youku_acquisition
ali_activation.package_name=com.taobao.foractivation.172393,com.taobao.foractivation.184287,com.taobao.foractivation.184289, \
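Note: this hunk adds only the S3 and output-directory settings; the other keys the new code reads (rtdmp.url, http.private.host.server.ip, and the datasource.clickhouse.* properties) are presumably defined in the unchanged part of the file that the diff elides.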
src/main/resources/logback.xml

@@ -21,7 +21,7 @@
             </Pattern>
         </layout>
     </appender>
-    <root level="warn">
+    <root level="info">
         <appender-ref ref="console"/>
     </root>
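Note: raising the root logger from warn to info makes the new job's output visible, since RTDmpFetch logs everything, including its SQLException handling, through LOGGER.info.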