From 5307e498a3ecef5d4d24ad7ad9518052b212914a Mon Sep 17 00:00:00 2001 From: Ashutosh Mestry <amestry@hortonworks.com> Date: Mon, 17 Dec 2018 14:31:23 -0800 Subject: [PATCH] ATLAS-2996: Conditionally Prevent Notification Processing --- webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java index 1cde3d0..b955948 100644 --- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java +++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java @@ -104,13 +104,13 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl public static final String CONSUMER_RETRY_INTERVAL = "atlas.notification.consumer.retry.interval"; public static final String CONSUMER_MIN_RETRY_INTERVAL = "atlas.notification.consumer.min.retry.interval"; public static final String CONSUMER_MAX_RETRY_INTERVAL = "atlas.notification.consumer.max.retry.interval"; + public static final String CONSUMER_DISABLED = "atlas.notification.consumer.disabled"; public static final String CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633 = "atlas.notification.consumer.skip.hive_column_lineage.hive-20633"; public static final String CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD = "atlas.notification.consumer.skip.hive_column_lineage.hive-20633.inputs.threshold"; public static final int SERVER_READY_WAIT_TIME_MS = 1000; - private final AtlasEntityStore atlasEntityStore; private final ServiceState serviceState; private final AtlasInstanceConverter instanceConverter; @@ -121,6 +121,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl private final int maxWaitDuration; private final boolean skipHiveColumnLineageHive20633; private final int skipHiveColumnLineageHive20633InputsThreshold; + private final boolean consumerDisabled; private NotificationInterface notificationInterface; private ExecutorService executors; @@ -151,6 +152,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl skipHiveColumnLineageHive20633 = applicationProperties.getBoolean(CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, false); skipHiveColumnLineageHive20633InputsThreshold = applicationProperties.getInt(CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, 15); // skip if avg # of inputs is > 15 + consumerDisabled = applicationProperties.getBoolean(CONSUMER_DISABLED, false); LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, skipHiveColumnLineageHive20633); LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, skipHiveColumnLineageHive20633InputsThreshold); @@ -158,6 +160,12 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl @Override public void start() throws AtlasException { + if (consumerDisabled) { + LOG.info("Hook consumer stopped. No hook messages will be processed. " + + "Set property '{}' to false to start consuming hook messages.", CONSUMER_DISABLED); + return; + } + startInternal(applicationProperties, null); } -- libgit2 0.27.1