Commit cc601d73 by Ashutosh Mestry

ATLAS-3642: PC fx: WorkItemManager getResults Modification.

parent e8661ecb
...@@ -21,6 +21,7 @@ package org.apache.atlas.pc; ...@@ -21,6 +21,7 @@ package org.apache.atlas.pc;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.Queue;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
...@@ -37,7 +38,7 @@ public abstract class WorkItemConsumer<T> implements Runnable { ...@@ -37,7 +38,7 @@ public abstract class WorkItemConsumer<T> implements Runnable {
private final AtomicBoolean isDirty = new AtomicBoolean(false); private final AtomicBoolean isDirty = new AtomicBoolean(false);
private final AtomicLong maxCommitTimeInMs = new AtomicLong(DEFAULT_COMMIT_TIME_IN_MS); private final AtomicLong maxCommitTimeInMs = new AtomicLong(DEFAULT_COMMIT_TIME_IN_MS);
private CountDownLatch countdownLatch; private CountDownLatch countdownLatch;
private BlockingQueue<Object> results; private Queue<Object> results;
public WorkItemConsumer(BlockingQueue<T> queue) { public WorkItemConsumer(BlockingQueue<T> queue) {
this.queue = queue; this.queue = queue;
...@@ -101,11 +102,7 @@ public abstract class WorkItemConsumer<T> implements Runnable { ...@@ -101,11 +102,7 @@ public abstract class WorkItemConsumer<T> implements Runnable {
protected abstract void processItem(T item); protected abstract void processItem(T item);
protected void addResult(Object value) { protected void addResult(Object value) {
try { results.add(value);
results.put(value);
} catch (InterruptedException e) {
LOG.error("Interrupted while adding result: {}", value);
}
} }
protected void updateCommitTime(long commitTime) { protected void updateCommitTime(long commitTime) {
...@@ -118,7 +115,7 @@ public abstract class WorkItemConsumer<T> implements Runnable { ...@@ -118,7 +115,7 @@ public abstract class WorkItemConsumer<T> implements Runnable {
this.countdownLatch = countdownLatch; this.countdownLatch = countdownLatch;
} }
public <V> void setResults(BlockingQueue<Object> queue) { public <V> void setResults(Queue<Object> queue) {
this.results = queue; this.results = queue;
} }
} }
...@@ -23,6 +23,7 @@ import org.slf4j.LoggerFactory; ...@@ -23,6 +23,7 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Queue;
import java.util.concurrent.*; import java.util.concurrent.*;
public class WorkItemManager<T, U extends WorkItemConsumer> { public class WorkItemManager<T, U extends WorkItemConsumer> {
...@@ -33,7 +34,7 @@ public class WorkItemManager<T, U extends WorkItemConsumer> { ...@@ -33,7 +34,7 @@ public class WorkItemManager<T, U extends WorkItemConsumer> {
private final ExecutorService service; private final ExecutorService service;
private final List<U> consumers = new ArrayList<>(); private final List<U> consumers = new ArrayList<>();
private CountDownLatch countdownLatch; private CountDownLatch countdownLatch;
private BlockingQueue<Object> resultsQueue; private Queue<Object> resultsQueue;
public WorkItemManager(WorkItemBuilder builder, String namePrefix, int batchSize, int numWorkers, boolean collectResults) { public WorkItemManager(WorkItemBuilder builder, String namePrefix, int batchSize, int numWorkers, boolean collectResults) {
this.numWorkers = numWorkers; this.numWorkers = numWorkers;
...@@ -49,13 +50,13 @@ public class WorkItemManager<T, U extends WorkItemConsumer> { ...@@ -49,13 +50,13 @@ public class WorkItemManager<T, U extends WorkItemConsumer> {
this(builder, "workItemConsumer", batchSize, numWorkers, false); this(builder, "workItemConsumer", batchSize, numWorkers, false);
} }
public void setResultsCollection(BlockingQueue<Object> resultsQueue) { public void setResultsCollection(Queue<Object> resultsQueue) {
this.resultsQueue = resultsQueue; this.resultsQueue = resultsQueue;
} }
private void createConsumers(WorkItemBuilder builder, int numWorkers, boolean collectResults) { private void createConsumers(WorkItemBuilder builder, int numWorkers, boolean collectResults) {
if (collectResults) { if (collectResults) {
setResultsCollection(new LinkedBlockingQueue<>()); setResultsCollection(new ConcurrentLinkedQueue<>());
} }
for (int i = 0; i < numWorkers; i++) { for (int i = 0; i < numWorkers; i++) {
...@@ -124,7 +125,7 @@ public class WorkItemManager<T, U extends WorkItemConsumer> { ...@@ -124,7 +125,7 @@ public class WorkItemManager<T, U extends WorkItemConsumer> {
LOG.info("WorkItemManager: Shutdown done!"); LOG.info("WorkItemManager: Shutdown done!");
} }
public BlockingQueue getResults() { public Queue getResults() {
return this.resultsQueue; return this.resultsQueue;
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment