Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
A
atlas
Project
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
dataplatform
atlas
Commits
572c0b80
Commit
572c0b80
authored
5 years ago
by
Ashutosh Mestry
Committed by
Sarath Subramanian
5 years ago
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
ATLAS-3090: PC Framework: Improve Existing Producer Consumer Framework
Signed-off-by:
Sarath Subramanian
<
ssubramanian@hortonworks.com
>
parent
f029a4e0
Show whitespace changes
Inline
Side-by-side
Showing
6 changed files
with
380 additions
and
32 deletions
+380
-32
JsonNodeProcessManager.java
...itory/graphdb/janus/migration/JsonNodeProcessManager.java
+17
-12
WorkItemConsumer.java
intg/src/main/java/org/apache/atlas/pc/WorkItemConsumer.java
+38
-12
WorkItemManager.java
intg/src/main/java/org/apache/atlas/pc/WorkItemManager.java
+64
-7
WorkItemConsumerTest.java
...c/test/java/org/apache/atlas/pc/WorkItemConsumerTest.java
+3
-1
WorkItemConsumerWithResultsTest.java
.../org/apache/atlas/pc/WorkItemConsumerWithResultsTest.java
+108
-0
WorkItemManagerWithResultsTest.java
...a/org/apache/atlas/pc/WorkItemManagerWithResultsTest.java
+150
-0
No files found.
graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/JsonNodeProcessManager.java
View file @
572c0b80
...
@@ -31,6 +31,7 @@ import java.util.List;
...
@@ -31,6 +31,7 @@ import java.util.List;
import
java.util.Map
;
import
java.util.Map
;
import
java.util.NoSuchElementException
;
import
java.util.NoSuchElementException
;
import
java.util.concurrent.BlockingQueue
;
import
java.util.concurrent.BlockingQueue
;
import
java.util.concurrent.atomic.AtomicLong
;
public
class
JsonNodeProcessManager
{
public
class
JsonNodeProcessManager
{
private
static
class
Consumer
extends
WorkItemConsumer
<
JsonNode
>
{
private
static
class
Consumer
extends
WorkItemConsumer
<
JsonNode
>
{
...
@@ -42,9 +43,9 @@ public class JsonNodeProcessManager {
...
@@ -42,9 +43,9 @@ public class JsonNodeProcessManager {
protected
final
Graph
bulkLoadGraph
;
protected
final
Graph
bulkLoadGraph
;
protected
final
ParseElement
parseElement
;
protected
final
ParseElement
parseElement
;
private
final
long
batchSize
;
private
final
long
batchSize
;
private
long
counter
;
private
AtomicLong
counter
;
private
final
MappedElementCache
cache
;
private
final
MappedElementCache
cache
;
private
final
List
<
JsonNode
>
nodes
=
new
ArrayList
<>(
);
private
static
ThreadLocal
<
List
<
JsonNode
>>
nodes
=
ThreadLocal
.
withInitial
(()
->
new
ArrayList
<>()
);
public
Consumer
(
BlockingQueue
<
JsonNode
>
workQueue
,
Graph
graph
,
Graph
bulkLoadGraph
,
ParseElement
parseElement
,
long
batchSize
)
{
public
Consumer
(
BlockingQueue
<
JsonNode
>
workQueue
,
Graph
graph
,
Graph
bulkLoadGraph
,
ParseElement
parseElement
,
long
batchSize
)
{
super
(
workQueue
);
super
(
workQueue
);
...
@@ -53,7 +54,7 @@ public class JsonNodeProcessManager {
...
@@ -53,7 +54,7 @@ public class JsonNodeProcessManager {
this
.
bulkLoadGraph
=
bulkLoadGraph
;
this
.
bulkLoadGraph
=
bulkLoadGraph
;
this
.
parseElement
=
parseElement
;
this
.
parseElement
=
parseElement
;
this
.
batchSize
=
batchSize
;
this
.
batchSize
=
batchSize
;
this
.
counter
=
0
;
this
.
counter
=
new
AtomicLong
(
0
)
;
this
.
cache
=
new
MappedElementCache
();
this
.
cache
=
new
MappedElementCache
();
}
}
...
@@ -63,8 +64,8 @@ public class JsonNodeProcessManager {
...
@@ -63,8 +64,8 @@ public class JsonNodeProcessManager {
Map
<
String
,
Object
>
result
=
parseElement
.
parse
(
bulkLoadGraph
,
cache
,
node
);
Map
<
String
,
Object
>
result
=
parseElement
.
parse
(
bulkLoadGraph
,
cache
,
node
);
if
(
result
==
null
)
{
if
(
result
==
null
)
{
nodes
.
add
(
node
);
addNode
(
node
);
commitConditionally
(
counter
++
);
commitConditionally
(
counter
.
getAndIncrement
()
);
}
else
{
}
else
{
commitBulk
();
commitBulk
();
cache
.
clearAll
();
cache
.
clearAll
();
...
@@ -77,6 +78,10 @@ public class JsonNodeProcessManager {
...
@@ -77,6 +78,10 @@ public class JsonNodeProcessManager {
}
}
}
}
private
void
addNode
(
JsonNode
node
)
{
nodes
.
get
().
add
(
node
);
}
@Override
@Override
protected
void
commitDirty
()
{
protected
void
commitDirty
()
{
super
.
commitDirty
();
super
.
commitDirty
();
...
@@ -89,18 +94,18 @@ public class JsonNodeProcessManager {
...
@@ -89,18 +94,18 @@ public class JsonNodeProcessManager {
}
}
private
void
commitConditionally
(
long
index
)
{
private
void
commitConditionally
(
long
index
)
{
if
(
index
%
batchSize
==
0
&&
nodes
.
size
()
>
0
)
{
if
(
index
%
batchSize
==
0
&&
nodes
.
get
().
size
()
>
0
)
{
commitBulk
();
commitBulk
();
}
}
}
}
private
void
commitBulk
()
{
private
void
commitBulk
()
{
commit
(
bulkLoadGraph
,
nodes
.
size
());
commit
(
bulkLoadGraph
,
nodes
.
get
().
size
());
nodes
.
clear
();
nodes
.
get
().
clear
();
}
}
private
void
commitRegular
()
{
private
void
commitRegular
()
{
commit
(
graph
,
nodes
.
size
());
commit
(
graph
,
nodes
.
get
().
size
());
cache
.
clearAll
();
cache
.
clearAll
();
}
}
...
@@ -139,15 +144,15 @@ public class JsonNodeProcessManager {
...
@@ -139,15 +144,15 @@ public class JsonNodeProcessManager {
}
}
private
void
retryBatchCommit
()
{
private
void
retryBatchCommit
()
{
display
(
"Waiting with [{} nodes] for 1 secs."
,
nodes
.
size
());
display
(
"Waiting with [{} nodes] for 1 secs."
,
nodes
.
get
().
size
());
try
{
try
{
Thread
.
sleep
(
WAIT_DURATION_AFTER_COMMIT_EXCEPTION
);
Thread
.
sleep
(
WAIT_DURATION_AFTER_COMMIT_EXCEPTION
);
for
(
JsonNode
n
:
nodes
)
{
for
(
JsonNode
n
:
nodes
.
get
()
)
{
parseElement
.
parse
(
bulkLoadGraph
,
cache
,
n
);
parseElement
.
parse
(
bulkLoadGraph
,
cache
,
n
);
}
}
commitBulk
();
commitBulk
();
display
(
"Done!: After re-adding {}."
,
nodes
.
size
());
display
(
"Done!: After re-adding {}."
,
nodes
.
get
().
size
());
}
catch
(
Exception
ex
)
{
}
catch
(
Exception
ex
)
{
error
(
"retryBatchCommit: Failed! Potential data loss."
,
ex
);
error
(
"retryBatchCommit: Failed! Potential data loss."
,
ex
);
}
}
...
...
This diff is collapsed.
Click to expand it.
intg/src/main/java/org/apache/atlas/pc/WorkItemConsumer.java
View file @
572c0b80
...
@@ -22,24 +22,31 @@ import org.slf4j.Logger;
...
@@ -22,24 +22,31 @@ import org.slf4j.Logger;
import
org.slf4j.LoggerFactory
;
import
org.slf4j.LoggerFactory
;
import
java.util.concurrent.BlockingQueue
;
import
java.util.concurrent.BlockingQueue
;
import
java.util.concurrent.CountDownLatch
;
import
java.util.concurrent.TimeUnit
;
import
java.util.concurrent.TimeUnit
;
import
java.util.concurrent.atomic.AtomicBoolean
;
import
java.util.concurrent.atomic.AtomicLong
;
public
abstract
class
WorkItemConsumer
<
T
>
implements
Runnable
{
public
abstract
class
WorkItemConsumer
<
T
>
implements
Runnable
{
private
static
final
Logger
LOG
=
LoggerFactory
.
getLogger
(
WorkItemConsumer
.
class
);
private
static
final
Logger
LOG
=
LoggerFactory
.
getLogger
(
WorkItemConsumer
.
class
);
private
static
final
int
POLLING_DURATION_SECONDS
=
5
;
private
static
final
int
POLLING_DURATION_SECONDS
=
5
;
private
static
final
int
DEFAULT_COMMIT_TIME_IN_MS
=
15000
;
private
final
BlockingQueue
<
T
>
queue
;
private
final
BlockingQueue
<
T
>
queue
;
private
boolean
isDirty
=
false
;
private
AtomicBoolean
isDirty
=
new
AtomicBoolean
(
false
);
private
long
maxCommitTimeInMs
=
0
;
private
AtomicLong
maxCommitTimeInMs
=
new
AtomicLong
(
0
);
private
CountDownLatch
countdownLatch
;
private
BlockingQueue
<
Object
>
results
;
public
WorkItemConsumer
(
BlockingQueue
<
T
>
queue
)
{
public
WorkItemConsumer
(
BlockingQueue
<
T
>
queue
)
{
this
.
queue
=
queue
;
this
.
queue
=
queue
;
}
}
public
void
run
()
{
public
void
run
()
{
while
(!
Thread
.
currentThread
().
isInterrupted
())
{
try
{
try
{
while
(!
Thread
.
currentThread
().
isInterrupted
())
{
T
item
=
queue
.
poll
(
POLLING_DURATION_SECONDS
,
TimeUnit
.
SECONDS
);
T
item
=
queue
.
poll
(
POLLING_DURATION_SECONDS
,
TimeUnit
.
SECONDS
);
if
(
item
==
null
)
{
if
(
item
==
null
)
{
...
@@ -47,21 +54,24 @@ public abstract class WorkItemConsumer<T> implements Runnable {
...
@@ -47,21 +54,24 @@ public abstract class WorkItemConsumer<T> implements Runnable {
return
;
return
;
}
}
isDirty
=
true
;
isDirty
.
set
(
true
);
processItem
(
item
);
processItem
(
item
);
}
}
catch
(
InterruptedException
e
)
{
}
catch
(
InterruptedException
e
)
{
LOG
.
error
(
"WorkItemConsumer: Interrupted: "
,
e
);
LOG
.
error
(
"WorkItemConsumer: Interrupted: "
,
e
);
}
}
finally
{
maxCommitTimeInMs
.
set
(
0
);
countdownLatch
.
countDown
();
}
}
}
}
public
long
getMaxCommitTimeSeconds
()
{
public
long
getMaxCommitTimeInMs
()
{
return
(
this
.
maxCommitTimeInMs
>
0
?
this
.
maxCommitTimeInMs
/
1000
:
15
);
long
commitTime
=
this
.
maxCommitTimeInMs
.
get
();
return
((
commitTime
>
DEFAULT_COMMIT_TIME_IN_MS
)
?
commitTime
:
DEFAULT_COMMIT_TIME_IN_MS
);
}
}
protected
void
commitDirty
()
{
protected
void
commitDirty
()
{
if
(!
isDirty
)
{
if
(!
isDirty
.
get
()
)
{
return
;
return
;
}
}
...
@@ -78,16 +88,32 @@ public abstract class WorkItemConsumer<T> implements Runnable {
...
@@ -78,16 +88,32 @@ public abstract class WorkItemConsumer<T> implements Runnable {
updateCommitTime
((
end
-
start
));
updateCommitTime
((
end
-
start
));
isDirty
=
false
;
isDirty
.
set
(
false
)
;
}
}
protected
abstract
void
doCommit
();
protected
abstract
void
doCommit
();
protected
abstract
void
processItem
(
T
item
);
protected
abstract
void
processItem
(
T
item
);
protected
void
addResult
(
Object
value
)
{
try
{
results
.
put
(
value
);
}
catch
(
InterruptedException
e
)
{
LOG
.
error
(
"Interrupted while adding result: {}"
,
value
);
}
}
protected
void
updateCommitTime
(
long
commitTime
)
{
protected
void
updateCommitTime
(
long
commitTime
)
{
if
(
this
.
maxCommitTimeInMs
<
commitTime
)
{
if
(
this
.
maxCommitTimeInMs
.
get
()
<
commitTime
)
{
this
.
maxCommitTimeInMs
=
commitTime
;
this
.
maxCommitTimeInMs
.
set
(
commitTime
);
}
}
}
public
void
setCountDownLatch
(
CountDownLatch
countdownLatch
)
{
this
.
countdownLatch
=
countdownLatch
;
}
public
<
V
>
void
setResults
(
BlockingQueue
<
Object
>
queue
)
{
this
.
results
=
queue
;
}
}
}
}
This diff is collapsed.
Click to expand it.
intg/src/main/java/org/apache/atlas/pc/WorkItemManager.java
View file @
572c0b80
...
@@ -17,7 +17,7 @@
...
@@ -17,7 +17,7 @@
*/
*/
package
org
.
apache
.
atlas
.
pc
;
package
org
.
apache
.
atlas
.
pc
;
import
org.apache.curator.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder
;
import
org.slf4j.Logger
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
org.slf4j.LoggerFactory
;
...
@@ -28,19 +28,51 @@ import java.util.concurrent.*;
...
@@ -28,19 +28,51 @@ import java.util.concurrent.*;
public
class
WorkItemManager
<
T
,
U
extends
WorkItemConsumer
>
{
public
class
WorkItemManager
<
T
,
U
extends
WorkItemConsumer
>
{
private
static
final
Logger
LOG
=
LoggerFactory
.
getLogger
(
WorkItemManager
.
class
);
private
static
final
Logger
LOG
=
LoggerFactory
.
getLogger
(
WorkItemManager
.
class
);
private
final
int
numWorkers
;
private
final
BlockingQueue
<
T
>
workQueue
;
private
final
BlockingQueue
<
T
>
workQueue
;
private
final
ExecutorService
service
;
private
final
ExecutorService
service
;
private
final
List
<
U
>
consumers
=
new
ArrayList
<>();
private
final
List
<
U
>
consumers
=
new
ArrayList
<>();
private
CountDownLatch
countdownLatch
;
private
BlockingQueue
<
Object
>
resultsQueue
;
public
WorkItemManager
(
WorkItemBuilder
builder
,
int
batchSize
,
int
numWorkers
)
{
public
WorkItemManager
(
WorkItemBuilder
builder
,
String
namePrefix
,
int
batchSize
,
int
numWorkers
,
boolean
collectResults
)
{
this
.
numWorkers
=
numWorkers
;
workQueue
=
new
LinkedBlockingQueue
<>(
batchSize
*
numWorkers
);
workQueue
=
new
LinkedBlockingQueue
<>(
batchSize
*
numWorkers
);
service
=
Executors
.
newFixedThreadPool
(
numWorkers
);
service
=
Executors
.
newFixedThreadPool
(
numWorkers
,
new
ThreadFactoryBuilder
().
setNameFormat
(
namePrefix
+
"-%d"
).
build
());
createConsumers
(
builder
,
numWorkers
,
collectResults
);
execute
();
}
public
WorkItemManager
(
WorkItemBuilder
builder
,
int
batchSize
,
int
numWorkers
)
{
this
(
builder
,
"workItem"
,
batchSize
,
numWorkers
,
false
);
}
public
void
setResultsCollection
(
BlockingQueue
<
Object
>
resultsQueue
)
{
this
.
resultsQueue
=
resultsQueue
;
}
private
void
createConsumers
(
WorkItemBuilder
builder
,
int
numWorkers
,
boolean
collectResults
)
{
if
(
collectResults
)
{
setResultsCollection
(
new
LinkedBlockingQueue
<>());
}
for
(
int
i
=
0
;
i
<
numWorkers
;
i
++)
{
for
(
int
i
=
0
;
i
<
numWorkers
;
i
++)
{
U
c
=
(
U
)
builder
.
build
(
workQueue
);
U
c
=
(
U
)
builder
.
build
(
workQueue
);
service
.
submit
(
c
);
consumers
.
add
(
c
);
consumers
.
add
(
c
);
if
(
collectResults
)
{
c
.
setResults
(
resultsQueue
);
}
}
}
private
void
execute
()
{
this
.
countdownLatch
=
new
CountDownLatch
(
numWorkers
);
for
(
U
c
:
consumers
)
{
c
.
setCountDownLatch
(
countdownLatch
);
service
.
execute
(
c
);
}
}
}
}
...
@@ -52,6 +84,27 @@ public class WorkItemManager<T, U extends WorkItemConsumer> {
...
@@ -52,6 +84,27 @@ public class WorkItemManager<T, U extends WorkItemConsumer> {
}
}
}
}
public
void
checkProduce
(
T
item
)
{
if
(
countdownLatch
.
getCount
()
==
0
)
{
execute
();
}
produce
(
item
);
}
public
void
drain
()
{
try
{
if
(
countdownLatch
==
null
||
countdownLatch
.
getCount
()
==
0
)
{
return
;
}
LOG
.
debug
(
"Drain: Stated! Queue size: {}"
,
workQueue
.
size
());
this
.
countdownLatch
.
await
();
LOG
.
debug
(
"Drain: Done! Queue size: {}"
,
workQueue
.
size
());
}
catch
(
InterruptedException
ex
)
{
Thread
.
currentThread
().
interrupt
();
}
}
public
void
shutdown
()
throws
InterruptedException
{
public
void
shutdown
()
throws
InterruptedException
{
int
avgCommitTimeSeconds
=
getAvgCommitTimeSeconds
()
*
2
;
int
avgCommitTimeSeconds
=
getAvgCommitTimeSeconds
()
*
2
;
...
@@ -63,13 +116,17 @@ public class WorkItemManager<T, U extends WorkItemConsumer> {
...
@@ -63,13 +116,17 @@ public class WorkItemManager<T, U extends WorkItemConsumer> {
LOG
.
info
(
"WorkItemManager: Shutdown done!"
);
LOG
.
info
(
"WorkItemManager: Shutdown done!"
);
}
}
public
BlockingQueue
getResults
()
{
return
this
.
resultsQueue
;
}
private
int
getAvgCommitTimeSeconds
()
{
private
int
getAvgCommitTimeSeconds
()
{
int
commitTimeSeconds
=
0
;
int
commitTimeSeconds
=
0
;
for
(
U
c
:
consumers
)
{
for
(
U
c
:
consumers
)
{
commitTimeSeconds
+=
c
.
getMaxCommitTime
Second
s
();
commitTimeSeconds
+=
c
.
getMaxCommitTime
InM
s
();
}
}
return
commitTimeSeconds
/
consumers
.
size
()
;
return
(
commitTimeSeconds
/
consumers
.
size
())
/
1000
;
}
}
}
}
This diff is collapsed.
Click to expand it.
intg/src/test/java/org/apache/atlas/pc/WorkItemConsumerTest.java
View file @
572c0b80
...
@@ -21,6 +21,7 @@ package org.apache.atlas.pc;
...
@@ -21,6 +21,7 @@ package org.apache.atlas.pc;
import
org.testng.annotations.Test
;
import
org.testng.annotations.Test
;
import
java.util.concurrent.BlockingQueue
;
import
java.util.concurrent.BlockingQueue
;
import
java.util.concurrent.CountDownLatch
;
import
java.util.concurrent.LinkedBlockingQueue
;
import
java.util.concurrent.LinkedBlockingQueue
;
import
static
org
.
testng
.
Assert
.
assertFalse
;
import
static
org
.
testng
.
Assert
.
assertFalse
;
...
@@ -28,12 +29,13 @@ import static org.testng.Assert.assertTrue;
...
@@ -28,12 +29,13 @@ import static org.testng.Assert.assertTrue;
public
class
WorkItemConsumerTest
{
public
class
WorkItemConsumerTest
{
private
class
IntegerConsumerSpy
extends
WorkItemConsumer
<
Integer
>
{
static
class
IntegerConsumerSpy
extends
WorkItemConsumer
<
Integer
>
{
boolean
commitDirtyCalled
=
false
;
boolean
commitDirtyCalled
=
false
;
private
boolean
updateCommitTimeCalled
;
private
boolean
updateCommitTimeCalled
;
public
IntegerConsumerSpy
(
BlockingQueue
<
Integer
>
queue
)
{
public
IntegerConsumerSpy
(
BlockingQueue
<
Integer
>
queue
)
{
super
(
queue
);
super
(
queue
);
setCountDownLatch
(
new
CountDownLatch
(
1
));
}
}
@Override
@Override
...
...
This diff is collapsed.
Click to expand it.
intg/src/test/java/org/apache/atlas/pc/WorkItemConsumerWithResultsTest.java
0 → 100644
View file @
572c0b80
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org
.
apache
.
atlas
.
pc
;
import
org.testng.annotations.Test
;
import
java.util.concurrent.BlockingQueue
;
import
java.util.concurrent.CountDownLatch
;
import
java.util.concurrent.LinkedBlockingQueue
;
import
static
org
.
testng
.
Assert
.
assertEquals
;
import
static
org
.
testng
.
Assert
.
assertTrue
;
public
class
WorkItemConsumerWithResultsTest
{
private
class
IntegerConsumerSpy
extends
WorkItemConsumer
<
Integer
>
{
int
payload
=
-
1
;
public
IntegerConsumerSpy
(
BlockingQueue
<
Integer
>
queue
)
{
super
(
queue
);
}
@Override
protected
void
doCommit
()
{
addResult
(
payload
);
}
@Override
protected
void
processItem
(
Integer
item
)
{
payload
=
item
.
intValue
();
}
@Override
protected
void
commitDirty
()
{
super
.
commitDirty
();
}
}
private
class
IntegerConsumerThrowingError
extends
WorkItemConsumer
<
Integer
>
{
int
payload
=
-
1
;
public
IntegerConsumerThrowingError
(
BlockingQueue
<
Integer
>
queue
)
{
super
(
queue
);
}
@Override
protected
void
doCommit
()
{
throw
new
NullPointerException
();
}
@Override
protected
void
processItem
(
Integer
item
)
{
payload
=
item
.
intValue
();
}
@Override
protected
void
commitDirty
()
{
super
.
commitDirty
();
}
}
@Test
public
void
runningConsumerWillPopulateResults
()
{
CountDownLatch
countDownLatch
=
new
CountDownLatch
(
1
);
BlockingQueue
<
Integer
>
bc
=
new
LinkedBlockingQueue
<>(
5
);
LinkedBlockingQueue
<
Object
>
results
=
new
LinkedBlockingQueue
<>();
IntegerConsumerSpy
ic
=
new
IntegerConsumerSpy
(
bc
);
ic
.
setResults
(
results
);
ic
.
setCountDownLatch
(
countDownLatch
);
ic
.
run
();
assertTrue
(
bc
.
isEmpty
());
assertEquals
(
results
.
size
(),
bc
.
size
());
assertEquals
(
countDownLatch
.
getCount
(),
0
);
}
@Test
public
void
errorInConsumerWillDecrementCountdownLatch
()
{
CountDownLatch
countDownLatch
=
new
CountDownLatch
(
1
);
BlockingQueue
<
Integer
>
bc
=
new
LinkedBlockingQueue
<>(
5
);
LinkedBlockingQueue
<
Object
>
results
=
new
LinkedBlockingQueue
<>();
IntegerConsumerThrowingError
ic
=
new
IntegerConsumerThrowingError
(
bc
);
ic
.
setCountDownLatch
(
countDownLatch
);
ic
.
setResults
(
results
);
ic
.
run
();
assertTrue
(
bc
.
isEmpty
());
assertTrue
(
results
.
isEmpty
());
assertEquals
(
countDownLatch
.
getCount
(),
0
);
}
}
This diff is collapsed.
Click to expand it.
intg/src/test/java/org/apache/atlas/pc/WorkItemManagerWithResultsTest.java
0 → 100644
View file @
572c0b80
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org
.
apache
.
atlas
.
pc
;
import
org.apache.commons.lang3.RandomUtils
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
org.testng.annotations.Test
;
import
java.util.HashSet
;
import
java.util.Set
;
import
java.util.concurrent.BlockingQueue
;
import
static
org
.
testng
.
Assert
.
assertEquals
;
import
static
org
.
testng
.
Assert
.
assertTrue
;
public
class
WorkItemManagerWithResultsTest
{
private
static
final
Logger
LOG
=
LoggerFactory
.
getLogger
(
WorkItemManagerWithResultsTest
.
class
);
private
static
class
IntegerConsumer
extends
WorkItemConsumer
<
Integer
>
{
private
static
ThreadLocal
<
Integer
>
payload
=
new
ThreadLocal
<
Integer
>();
public
IntegerConsumer
(
BlockingQueue
<
Integer
>
queue
)
{
super
(
queue
);
}
@Override
protected
void
doCommit
()
{
if
(
getPayload
()
==
-
1
)
{
LOG
.
debug
(
"Skipping:"
);
return
;
}
incrementPayload
(
100
);
addResult
(
getPayload
());
setPayload
(
0
);
}
@Override
protected
void
processItem
(
Integer
item
)
{
try
{
setPayload
(
item
.
intValue
());
Thread
.
sleep
(
20
+
RandomUtils
.
nextInt
(
5
,
7
));
super
.
commit
();
}
catch
(
InterruptedException
e
)
{
e
.
printStackTrace
();
}
}
public
void
setPayload
(
int
v
)
{
payload
.
set
(
v
);
}
public
int
getPayload
()
{
return
payload
.
get
();
}
public
void
incrementPayload
(
int
v
)
{
payload
.
set
(
payload
.
get
()
+
v
);
}
}
private
class
IntegerConsumerBuilder
implements
WorkItemBuilder
<
IntegerConsumer
,
Integer
>
{
@Override
public
IntegerConsumer
build
(
BlockingQueue
<
Integer
>
queue
)
{
return
new
IntegerConsumer
(
queue
);
}
}
private
WorkItemManager
<
Integer
,
WorkItemConsumer
>
getWorkItemManger
(
IntegerConsumerBuilder
cb
,
int
numWorkers
)
{
return
new
WorkItemManager
<>(
cb
,
"IntegerConsumer"
,
5
,
numWorkers
,
true
);
}
@Test
public
void
drainTest
()
throws
InterruptedException
{
final
int
maxItems
=
50
;
IntegerConsumerBuilder
cb
=
new
IntegerConsumerBuilder
();
WorkItemManager
<
Integer
,
WorkItemConsumer
>
wi
=
getWorkItemManger
(
cb
,
5
);
for
(
int
i
=
0
;
i
<
maxItems
;
i
++)
{
wi
.
produce
(
i
);
}
wi
.
drain
();
assertEquals
(
wi
.
getResults
().
size
(),
maxItems
);
Set
<
Integer
>
set
=
new
HashSet
<
Integer
>(
wi
.
getResults
());
assertEquals
(
set
.
size
(),
maxItems
);
wi
.
shutdown
();
}
@Test
public
void
drainCheckProduceTest
()
throws
InterruptedException
{
IntegerConsumerBuilder
cb
=
new
IntegerConsumerBuilder
();
WorkItemManager
<
Integer
,
WorkItemConsumer
>
wi
=
getWorkItemManger
(
cb
,
2
);
for
(
int
i
=
0
;
i
<
5
;
i
++)
{
repeatedDrainAndProduce
(
i
,
wi
);
}
wi
.
shutdown
();
}
private
void
repeatedDrainAndProduce
(
int
runCount
,
WorkItemManager
<
Integer
,
WorkItemConsumer
>
wi
)
{
final
int
maxItems
=
100
;
int
halfWay
=
maxItems
/
2
;
LOG
.
info
(
"Run: {}"
,
runCount
);
wi
.
getResults
().
clear
();
for
(
int
i
=
0
;
i
<
maxItems
;
i
++)
{
if
(
i
==
halfWay
)
{
wi
.
drain
();
Set
<
Integer
>
set
=
new
HashSet
<
Integer
>(
wi
.
getResults
());
assertEquals
(
wi
.
getResults
().
size
(),
halfWay
,
"halfWay: total count"
);
assertEquals
(
set
.
size
(),
halfWay
,
"halfWay: set match"
);
}
wi
.
checkProduce
(
i
);
}
wi
.
drain
();
assertEquals
(
wi
.
getResults
().
size
(),
maxItems
,
"total count"
);
Set
<
Integer
>
set
=
new
HashSet
<
Integer
>(
wi
.
getResults
());
assertEquals
(
set
.
size
(),
maxItems
,
"set count"
);
for
(
int
i
=
100
;
i
<
100
+
maxItems
;
i
++)
{
assertTrue
(
set
.
contains
(
i
),
"Could not test: "
+
i
);
}
}
}
This diff is collapsed.
Click to expand it.
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment