Commit bd0c5a8a by Ashutosh Mestry

ATLAS-2996: Conditionally Prevent Notification Processing. With support for HA mode.

parent 5307e498
...@@ -161,8 +161,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl ...@@ -161,8 +161,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
@Override @Override
public void start() throws AtlasException { public void start() throws AtlasException {
if (consumerDisabled) { if (consumerDisabled) {
LOG.info("Hook consumer stopped. No hook messages will be processed. " + LOG.info("No hook messages will be processed. {} = {}", CONSUMER_DISABLED, consumerDisabled);
"Set property '{}' to false to start consuming hook messages.", CONSUMER_DISABLED);
return; return;
} }
...@@ -205,6 +204,10 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl ...@@ -205,6 +204,10 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
public void stop() { public void stop() {
//Allow for completion of outstanding work //Allow for completion of outstanding work
try { try {
if (consumerDisabled) {
return;
}
stopConsumerThreads(); stopConsumerThreads();
if (executors != null) { if (executors != null) {
executors.shutdown(); executors.shutdown();
...@@ -244,6 +247,10 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl ...@@ -244,6 +247,10 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
*/ */
@Override @Override
public void instanceIsActive() { public void instanceIsActive() {
if (consumerDisabled) {
return;
}
LOG.info("Reacting to active state: initializing Kafka consumers"); LOG.info("Reacting to active state: initializing Kafka consumers");
startConsumers(executors); startConsumers(executors);
...@@ -257,6 +264,10 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl ...@@ -257,6 +264,10 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
*/ */
@Override @Override
public void instanceIsPassive() { public void instanceIsPassive() {
if (consumerDisabled) {
return;
}
LOG.info("Reacting to passive state: shutting down Kafka consumers."); LOG.info("Reacting to passive state: shutting down Kafka consumers.");
stop(); stop();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment