Commit b837c0ee by ashutoshm

ATLAS-2047: Exception Thrown by Kafka Consumer Ends up Filling Logs Due to Incorrect Handling

parent ef300f15
...@@ -84,6 +84,9 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl ...@@ -84,6 +84,9 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
public static final String CONSUMER_RETRIES_PROPERTY = "atlas.notification.hook.maxretries"; public static final String CONSUMER_RETRIES_PROPERTY = "atlas.notification.hook.maxretries";
public static final String CONSUMER_FAILEDCACHESIZE_PROPERTY = "atlas.notification.hook.failedcachesize"; public static final String CONSUMER_FAILEDCACHESIZE_PROPERTY = "atlas.notification.hook.failedcachesize";
public static final String CONSUMER_RETRY_INTERVAL = "atlas.notification.consumer.retry.interval"; public static final String CONSUMER_RETRY_INTERVAL = "atlas.notification.consumer.retry.interval";
public static final String CONSUMER_MIN_RETRY_INTERVAL = "atlas.notification.consumer.min.retry.interval";
public static final String CONSUMER_MAX_RETRY_INTERVAL = "atlas.notification.consumer.max.retry.interval";
public static final int SERVER_READY_WAIT_TIME_MS = 1000; public static final int SERVER_READY_WAIT_TIME_MS = 1000;
private final AtlasEntityStore atlasEntityStore; private final AtlasEntityStore atlasEntityStore;
...@@ -92,7 +95,11 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl ...@@ -92,7 +95,11 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
private final AtlasTypeRegistry typeRegistry; private final AtlasTypeRegistry typeRegistry;
private final int maxRetries; private final int maxRetries;
private final int failedMsgCacheSize; private final int failedMsgCacheSize;
private final int consumerRetryInterval;
@VisibleForTesting
final int consumerRetryInterval;
private final int minWaitDuration;
private final int maxWaitDuration;
private NotificationInterface notificationInterface; private NotificationInterface notificationInterface;
private ExecutorService executors; private ExecutorService executors;
...@@ -116,7 +123,8 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl ...@@ -116,7 +123,8 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
maxRetries = applicationProperties.getInt(CONSUMER_RETRIES_PROPERTY, 3); maxRetries = applicationProperties.getInt(CONSUMER_RETRIES_PROPERTY, 3);
failedMsgCacheSize = applicationProperties.getInt(CONSUMER_FAILEDCACHESIZE_PROPERTY, 20); failedMsgCacheSize = applicationProperties.getInt(CONSUMER_FAILEDCACHESIZE_PROPERTY, 20);
consumerRetryInterval = applicationProperties.getInt(CONSUMER_RETRY_INTERVAL, 500); consumerRetryInterval = applicationProperties.getInt(CONSUMER_RETRY_INTERVAL, 500);
minWaitDuration = applicationProperties.getInt(CONSUMER_MIN_RETRY_INTERVAL, consumerRetryInterval); // 500 ms by default
maxWaitDuration = applicationProperties.getInt(CONSUMER_MAX_RETRY_INTERVAL, minWaitDuration * 60); // 30 sec by default
} }
@Override @Override
...@@ -214,12 +222,64 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl ...@@ -214,12 +222,64 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
} }
} }
static class AdaptiveWaiter {
private final long increment;
private final long maxDuration;
private final long minDuration;
private final long resetInterval;
private long lastWaitAt;
@VisibleForTesting
long waitDuration;
public AdaptiveWaiter(long minDuration, long maxDuration, long increment) {
this.minDuration = minDuration;
this.maxDuration = maxDuration;
this.increment = increment;
this.waitDuration = minDuration;
this.lastWaitAt = 0;
this.resetInterval = maxDuration * 2;
}
public void pause(Exception ex) {
setWaitDurations();
try {
if (LOG.isDebugEnabled()) {
LOG.debug("{} in NotificationHookConsumer. Waiting for {} ms for recovery.", ex.getClass().getName(), waitDuration, ex);
}
Thread.sleep(waitDuration);
} catch (InterruptedException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("{} in NotificationHookConsumer. Waiting for recovery interrupted.", ex.getClass().getName(), e);
}
}
}
private void setWaitDurations() {
long timeSinceLastWait = (lastWaitAt == 0) ? 0 : System.currentTimeMillis() - lastWaitAt;
lastWaitAt = System.currentTimeMillis();
if (timeSinceLastWait > resetInterval) {
waitDuration = minDuration;
} else {
waitDuration += increment;
if (waitDuration > maxDuration) {
waitDuration = maxDuration;
}
}
}
}
@VisibleForTesting @VisibleForTesting
class HookConsumer extends ShutdownableThread { class HookConsumer extends ShutdownableThread {
private final NotificationConsumer<HookNotificationMessage> consumer; private final NotificationConsumer<HookNotificationMessage> consumer;
private final AtomicBoolean shouldRun = new AtomicBoolean(false); private final AtomicBoolean shouldRun = new AtomicBoolean(false);
private List<HookNotificationMessage> failedMessages = new ArrayList<>(); private List<HookNotificationMessage> failedMessages = new ArrayList<>();
private final AdaptiveWaiter adaptiveWaiter = new AdaptiveWaiter(minWaitDuration, maxWaitDuration, minWaitDuration);
public HookConsumer(NotificationConsumer<HookNotificationMessage> consumer) { public HookConsumer(NotificationConsumer<HookNotificationMessage> consumer) {
super("atlas-hook-consumer-thread", false); super("atlas-hook-consumer-thread", false);
this.consumer = consumer; this.consumer = consumer;
...@@ -242,16 +302,20 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl ...@@ -242,16 +302,20 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
for (AtlasKafkaMessage<HookNotificationMessage> msg : messages) { for (AtlasKafkaMessage<HookNotificationMessage> msg : messages) {
handleMessage(msg); handleMessage(msg);
} }
} catch (IllegalStateException ex) {
adaptiveWaiter.pause(ex);
} catch (Exception e) { } catch (Exception e) {
if (shouldRun.get()) { if (shouldRun.get()) {
LOG.warn("Exception in NotificationHookConsumer", e); LOG.warn("Exception in NotificationHookConsumer", e);
adaptiveWaiter.pause(e);
} else {
break;
} }
} }
} }
} finally { } finally {
if (consumer != null) { if (consumer != null) {
LOG.info("closing NotificationConsumer"); LOG.info("closing NotificationConsumer");
consumer.close(); consumer.close();
} }
...@@ -424,7 +488,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl ...@@ -424,7 +488,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
// handle the case where thread was not started at all // handle the case where thread was not started at all
// and shutdown called // and shutdown called
if(shouldRun.get() == false) { if (shouldRun.get() == false) {
return; return;
} }
...@@ -433,8 +497,8 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl ...@@ -433,8 +497,8 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
if (consumer != null) { if (consumer != null) {
consumer.wakeup(); consumer.wakeup();
} }
super.awaitShutdown();
super.awaitShutdown();
LOG.info("<== HookConsumer shutdown()"); LOG.info("<== HookConsumer shutdown()");
} }
} }
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.notification;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import static org.testng.Assert.assertEquals;
public class AdaptiveWaiterTest {
private final int maxDuration = 100;
private final int minDuration = 5;
private final int increment = 5;
private NotificationHookConsumer.AdaptiveWaiter waiter;
@BeforeClass
public void setup() {
waiter = new NotificationHookConsumer.AdaptiveWaiter(minDuration, maxDuration, increment);
}
@Test
public void basicTest() {
for (int i = 0; i < 20; i++) {
waiter.pause(new IllegalStateException());
}
assertEquals(waiter.waitDuration, 95);
}
@Test
public void resetTest() {
final int someHighAttemptNumber = 30;
for (int i = 0; i < someHighAttemptNumber; i++) {
waiter.pause(new IllegalStateException());
}
assertEquals(waiter.waitDuration, maxDuration);
}
@Test
public void longPauseResets() {
waiter.pause(new IllegalStateException());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
waiter.pause(new IllegalArgumentException());
assertEquals(waiter.waitDuration, 5);
}
}
...@@ -251,4 +251,55 @@ public class NotificationHookConsumerTest { ...@@ -251,4 +251,55 @@ public class NotificationHookConsumerTest {
verify(notificationInterface).close(); verify(notificationInterface).close();
verify(executorService).shutdown(); verify(executorService).shutdown();
} }
@Test
public void consumersThrowsIllegalStateExceptionThreadUsesPauseRetryLogic() throws Exception {
final NotificationHookConsumer notificationHookConsumer = setupNotificationHookConsumer();
doAnswer(new Answer() {
@Override
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
notificationHookConsumer.consumers.get(0).start();
Thread.sleep(1000);
return null;
}
}).when(executorService).submit(any(NotificationHookConsumer.HookConsumer.class));
notificationHookConsumer.startInternal(configuration, executorService);
Thread.sleep(1000);
assertTrue(notificationHookConsumer.consumers.get(0).isAlive());
notificationHookConsumer.consumers.get(0).shutdown();
}
@Test
public void consumersThrowsIllegalStateExceptionPauseRetryLogicIsInterrupted() throws Exception {
final NotificationHookConsumer notificationHookConsumer = setupNotificationHookConsumer();
doAnswer(new Answer() {
@Override
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
notificationHookConsumer.consumers.get(0).start();
Thread.sleep(1000);
return null;
}
}).when(executorService).submit(any(NotificationHookConsumer.HookConsumer.class));
notificationHookConsumer.startInternal(configuration, executorService);
Thread.sleep(1000);
notificationHookConsumer.consumers.get(0).shutdown();
assertFalse(notificationHookConsumer.consumers.get(0).isAlive());
}
private NotificationHookConsumer setupNotificationHookConsumer() throws AtlasException {
when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.ACTIVE);
when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true);
when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
List<NotificationConsumer<Object>> consumers = new ArrayList();
NotificationConsumer notificationConsumerMock = mock(NotificationConsumer.class);
when(notificationConsumerMock.receive()).thenThrow(new IllegalStateException());
consumers.add(notificationConsumerMock);
when(notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, 1)).thenReturn(consumers);
return new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
}
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment