From ed2755dcce37318ffce21e5ecd7692416aa49a1d Mon Sep 17 00:00:00 2001 From: Ashutosh Mestry <amestry@cloudera.com> Date: Mon, 2 Mar 2020 08:05:55 -0800 Subject: [PATCH] ATLAS-3643: PC Fx: StatusReporter: Introduce Status Reporting to Be used With PC Framework --- intg/src/main/java/org/apache/atlas/pc/StatusReporter.java | 75 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ intg/src/test/java/org/apache/atlas/pc/StatusReporterTest.java | 94 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 169 insertions(+) create mode 100644 intg/src/main/java/org/apache/atlas/pc/StatusReporter.java create mode 100644 intg/src/test/java/org/apache/atlas/pc/StatusReporterTest.java diff --git a/intg/src/main/java/org/apache/atlas/pc/StatusReporter.java b/intg/src/main/java/org/apache/atlas/pc/StatusReporter.java new file mode 100644 index 0000000..f84e8d0 --- /dev/null +++ b/intg/src/main/java/org/apache/atlas/pc/StatusReporter.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.pc; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Set; + +public class StatusReporter<T, U> { + private Map<T,U> producedItems = new LinkedHashMap<>(); + private Set<T> processedSet = new HashSet<>(); + + public void produced(T item, U index) { + this.producedItems.put(item, index); + } + + public void processed(T item) { + this.processedSet.add(item); + } + + public void processed(T[] index) { + this.processedSet.addAll(Arrays.asList(index)); + } + + public U ack() { + U ack = null; + U ret; + do { + ret = completionIndex(getFirstElement(this.producedItems)); + if (ret != null) { + ack = ret; + } + } while(ret != null); + + return ack; + } + + private Map.Entry<T, U> getFirstElement(Map<T, U> map) { + if (map.isEmpty()) { + return null; + } + + return map.entrySet().iterator().next(); + } + + private U completionIndex(Map.Entry<T, U> lookFor) { + U ack = null; + if (lookFor == null || !processedSet.contains(lookFor.getKey())) { + return ack; + } + + ack = lookFor.getValue(); + producedItems.remove(lookFor.getKey()); + processedSet.remove(lookFor); + return ack; + } +} diff --git a/intg/src/test/java/org/apache/atlas/pc/StatusReporterTest.java b/intg/src/test/java/org/apache/atlas/pc/StatusReporterTest.java new file mode 100644 index 0000000..3e50562 --- /dev/null +++ b/intg/src/test/java/org/apache/atlas/pc/StatusReporterTest.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.pc; + +import org.apache.commons.lang3.RandomUtils; +import org.testng.annotations.Test; + +import java.util.concurrent.BlockingQueue; + +import static org.testng.Assert.assertEquals; + +public class StatusReporterTest { + private static class IntegerConsumer extends WorkItemConsumer<Integer> { + private static ThreadLocal<Integer> payload = new ThreadLocal<Integer>(); + private Integer current; + + public IntegerConsumer(BlockingQueue<Integer> queue) { + super(queue); + } + + @Override + protected void doCommit() { + addResult(current); + } + + @Override + protected void processItem(Integer item) { + try { + this.current = item; + Thread.sleep(20 + RandomUtils.nextInt(5, 7)); + super.commit(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + + private class IntegerConsumerBuilder implements WorkItemBuilder<IntegerConsumer, Integer> { + @Override + public IntegerConsumer build(BlockingQueue<Integer> queue) { + return new IntegerConsumer(queue); + } + } + + private WorkItemManager<Integer, WorkItemConsumer> getWorkItemManger(IntegerConsumerBuilder cb, int numWorkers) { + return new WorkItemManager<>(cb, "IntegerConsumer", 5, numWorkers, true); + } + + @Test + public void statusReporting() throws InterruptedException { + final int maxItems = 50; + + IntegerConsumerBuilder cb = new IntegerConsumerBuilder(); + WorkItemManager<Integer, WorkItemConsumer> wi = getWorkItemManger(cb, 5); + StatusReporter<Integer, Integer> statusReporter = new StatusReporter<>(); + + for (int i = 0; i < maxItems; i++) { + wi.produce(i); + statusReporter.produced(i, i); + + extractResults(wi, statusReporter); + } + + wi.drain(); + extractResults(wi, statusReporter); + assertEquals(statusReporter.ack().intValue(), (maxItems - 1)); + wi.shutdown(); + } + + private void extractResults(WorkItemManager<Integer, WorkItemConsumer> wi, StatusReporter<Integer, Integer> statusReporter) { + Object result = null; + while((result = wi.getResults().poll()) != null) { + if (result == null || !(result instanceof Integer)) { + continue; + } + statusReporter.processed((Integer) result); + } + } +} -- libgit2 0.27.1