Commit 48e52249 by Madhan Neethiraj

ATLAS-2877: updated notification processing to wait only before retry

parent 3176d1a1
...@@ -294,7 +294,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl ...@@ -294,7 +294,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
class HookConsumer extends ShutdownableThread { class HookConsumer extends ShutdownableThread {
private final NotificationConsumer<HookNotification> consumer; private final NotificationConsumer<HookNotification> consumer;
private final AtomicBoolean shouldRun = new AtomicBoolean(false); private final AtomicBoolean shouldRun = new AtomicBoolean(false);
private final List<HookNotification> failedMessages = new ArrayList<>(); private final List<String> failedMessages = new ArrayList<>();
private final AdaptiveWaiter adaptiveWaiter = new AdaptiveWaiter(minWaitDuration, maxWaitDuration, minWaitDuration); private final AdaptiveWaiter adaptiveWaiter = new AdaptiveWaiter(minWaitDuration, maxWaitDuration, minWaitDuration);
@VisibleForTesting @VisibleForTesting
...@@ -523,26 +523,29 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl ...@@ -523,26 +523,29 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
} catch (Throwable e) { } catch (Throwable e) {
RequestContext.get().resetEntityGuidUpdates(); RequestContext.get().resetEntityGuidUpdates();
LOG.warn("Error handling message", e);
try {
LOG.info("Sleeping for {} ms before retry", consumerRetryInterval);
Thread.sleep(consumerRetryInterval);
} catch (InterruptedException ie) {
LOG.error("Notification consumer thread sleep interrupted");
}
if (numRetries == (maxRetries - 1)) { if (numRetries == (maxRetries - 1)) {
LOG.warn("Max retries exceeded for message {}", message, e); String strMessage = AbstractNotification.getMessageJson(message);
LOG.warn("Max retries exceeded for message {}", strMessage, e);
isFailedMsg = true; isFailedMsg = true;
failedMessages.add(message); failedMessages.add(strMessage);
if (failedMessages.size() >= failedMsgCacheSize) { if (failedMessages.size() >= failedMsgCacheSize) {
recordFailedMessages(); recordFailedMessages();
} }
return; return;
} else {
LOG.warn("Error handling message", e);
try {
LOG.info("Sleeping for {} ms before retry", consumerRetryInterval);
Thread.sleep(consumerRetryInterval);
} catch (InterruptedException ie) {
LOG.error("Notification consumer thread sleep interrupted");
}
} }
} finally { } finally {
RequestContext.clear(); RequestContext.clear();
...@@ -564,8 +567,8 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl ...@@ -564,8 +567,8 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
private void recordFailedMessages() { private void recordFailedMessages() {
//logging failed messages //logging failed messages
for (HookNotification message : failedMessages) { for (String message : failedMessages) {
FAILED_LOG.error("[DROPPED_NOTIFICATION] {}", AbstractNotification.getMessageJson(message)); FAILED_LOG.error("[DROPPED_NOTIFICATION] {}", message);
} }
failedMessages.clear(); failedMessages.clear();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment