Commit 5307e498 by Ashutosh Mestry

ATLAS-2996: Conditionally Prevent Notification Processing

parent 2068d7dd
......@@ -104,13 +104,13 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
public static final String CONSUMER_RETRY_INTERVAL = "atlas.notification.consumer.retry.interval";
public static final String CONSUMER_MIN_RETRY_INTERVAL = "atlas.notification.consumer.min.retry.interval";
public static final String CONSUMER_MAX_RETRY_INTERVAL = "atlas.notification.consumer.max.retry.interval";
public static final String CONSUMER_DISABLED = "atlas.notification.consumer.disabled";
public static final String CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633 = "atlas.notification.consumer.skip.hive_column_lineage.hive-20633";
public static final String CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD = "atlas.notification.consumer.skip.hive_column_lineage.hive-20633.inputs.threshold";
public static final int SERVER_READY_WAIT_TIME_MS = 1000;
private final AtlasEntityStore atlasEntityStore;
private final ServiceState serviceState;
private final AtlasInstanceConverter instanceConverter;
......@@ -121,6 +121,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
private final int maxWaitDuration;
private final boolean skipHiveColumnLineageHive20633;
private final int skipHiveColumnLineageHive20633InputsThreshold;
private final boolean consumerDisabled;
private NotificationInterface notificationInterface;
private ExecutorService executors;
......@@ -151,6 +152,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
skipHiveColumnLineageHive20633 = applicationProperties.getBoolean(CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, false);
skipHiveColumnLineageHive20633InputsThreshold = applicationProperties.getInt(CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, 15); // skip if avg # of inputs is > 15
consumerDisabled = applicationProperties.getBoolean(CONSUMER_DISABLED, false);
LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, skipHiveColumnLineageHive20633);
LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, skipHiveColumnLineageHive20633InputsThreshold);
......@@ -158,6 +160,12 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
@Override
public void start() throws AtlasException {
if (consumerDisabled) {
LOG.info("Hook consumer stopped. No hook messages will be processed. " +
"Set property '{}' to false to start consuming hook messages.", CONSUMER_DISABLED);
return;
}
startInternal(applicationProperties, null);
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment