Commit ed2755dc by Ashutosh Mestry

ATLAS-3643: PC Fx: StatusReporter: Introduce Status Reporting to Be used With PC Framework

parent 6a49d94f
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.pc;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
public class StatusReporter<T, U> {
private Map<T,U> producedItems = new LinkedHashMap<>();
private Set<T> processedSet = new HashSet<>();
public void produced(T item, U index) {
this.producedItems.put(item, index);
}
public void processed(T item) {
this.processedSet.add(item);
}
public void processed(T[] index) {
this.processedSet.addAll(Arrays.asList(index));
}
public U ack() {
U ack = null;
U ret;
do {
ret = completionIndex(getFirstElement(this.producedItems));
if (ret != null) {
ack = ret;
}
} while(ret != null);
return ack;
}
private Map.Entry<T, U> getFirstElement(Map<T, U> map) {
if (map.isEmpty()) {
return null;
}
return map.entrySet().iterator().next();
}
private U completionIndex(Map.Entry<T, U> lookFor) {
U ack = null;
if (lookFor == null || !processedSet.contains(lookFor.getKey())) {
return ack;
}
ack = lookFor.getValue();
producedItems.remove(lookFor.getKey());
processedSet.remove(lookFor);
return ack;
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.pc;
import org.apache.commons.lang3.RandomUtils;
import org.testng.annotations.Test;
import java.util.concurrent.BlockingQueue;
import static org.testng.Assert.assertEquals;
public class StatusReporterTest {
private static class IntegerConsumer extends WorkItemConsumer<Integer> {
private static ThreadLocal<Integer> payload = new ThreadLocal<Integer>();
private Integer current;
public IntegerConsumer(BlockingQueue<Integer> queue) {
super(queue);
}
@Override
protected void doCommit() {
addResult(current);
}
@Override
protected void processItem(Integer item) {
try {
this.current = item;
Thread.sleep(20 + RandomUtils.nextInt(5, 7));
super.commit();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
private class IntegerConsumerBuilder implements WorkItemBuilder<IntegerConsumer, Integer> {
@Override
public IntegerConsumer build(BlockingQueue<Integer> queue) {
return new IntegerConsumer(queue);
}
}
private WorkItemManager<Integer, WorkItemConsumer> getWorkItemManger(IntegerConsumerBuilder cb, int numWorkers) {
return new WorkItemManager<>(cb, "IntegerConsumer", 5, numWorkers, true);
}
@Test
public void statusReporting() throws InterruptedException {
final int maxItems = 50;
IntegerConsumerBuilder cb = new IntegerConsumerBuilder();
WorkItemManager<Integer, WorkItemConsumer> wi = getWorkItemManger(cb, 5);
StatusReporter<Integer, Integer> statusReporter = new StatusReporter<>();
for (int i = 0; i < maxItems; i++) {
wi.produce(i);
statusReporter.produced(i, i);
extractResults(wi, statusReporter);
}
wi.drain();
extractResults(wi, statusReporter);
assertEquals(statusReporter.ack().intValue(), (maxItems - 1));
wi.shutdown();
}
private void extractResults(WorkItemManager<Integer, WorkItemConsumer> wi, StatusReporter<Integer, Integer> statusReporter) {
Object result = null;
while((result = wi.getResults().poll()) != null) {
if (result == null || !(result instanceof Integer)) {
continue;
}
statusReporter.processed((Integer) result);
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment