Commit f408e93e by Suma Shivaprasad

ATLAS-1129 Remove notification failed logs on retry and add sleep between…

ATLAS-1129 Remove notification failed logs on retry and add sleep between retries (svimal2106 via sumasai)
parent ec94d2ad
...@@ -65,6 +65,9 @@ atlas.kafka.auto.commit.enable=false ...@@ -65,6 +65,9 @@ atlas.kafka.auto.commit.enable=false
atlas.notification.create.topics=true atlas.notification.create.topics=true
atlas.notification.replicas=1 atlas.notification.replicas=1
atlas.notification.topics=ATLAS_HOOK,ATLAS_ENTITIES atlas.notification.topics=ATLAS_HOOK,ATLAS_ENTITIES
atlas.notification.log.failed.messages=true
atlas.notification.consumer.retry.interval=500
atlas.notification.hook.retry.interval=1000
# Enable for Kerberized Kafka clusters # Enable for Kerberized Kafka clusters
#atlas.notification.kafka.service.principal=kafka/_HOST@EXAMPLE.COM #atlas.notification.kafka.service.principal=kafka/_HOST@EXAMPLE.COM
#atlas.notification.kafka.keytab.location=/etc/security/keytabs/kafka.service.keytab #atlas.notification.kafka.keytab.location=/etc/security/keytabs/kafka.service.keytab
......
...@@ -54,6 +54,8 @@ public abstract class AtlasHook { ...@@ -54,6 +54,8 @@ public abstract class AtlasHook {
private static boolean logFailedMessages; private static boolean logFailedMessages;
private static FailedMessagesLogger failedMessagesLogger; private static FailedMessagesLogger failedMessagesLogger;
private static int notificationRetryInterval;
public static final String ATLAS_NOTIFICATION_RETRY_INTERVAL = "atlas.notification.hook.retry.interval";
public static final String ATLAS_NOTIFICATION_FAILED_MESSAGES_FILENAME_KEY = public static final String ATLAS_NOTIFICATION_FAILED_MESSAGES_FILENAME_KEY =
"atlas.notification.failed.messages.filename"; "atlas.notification.failed.messages.filename";
...@@ -76,6 +78,7 @@ public abstract class AtlasHook { ...@@ -76,6 +78,7 @@ public abstract class AtlasHook {
failedMessagesLogger.init(); failedMessagesLogger.init();
} }
notificationRetryInterval = atlasProperties.getInt(ATLAS_NOTIFICATION_RETRY_INTERVAL, 1000);
Injector injector = Guice.createInjector(new NotificationModule()); Injector injector = Guice.createInjector(new NotificationModule());
notifInterface = injector.getInstance(NotificationInterface.class); notifInterface = injector.getInstance(NotificationInterface.class);
...@@ -128,7 +131,14 @@ public abstract class AtlasHook { ...@@ -128,7 +131,14 @@ public abstract class AtlasHook {
} catch (Exception e) { } catch (Exception e) {
numRetries++; numRetries++;
if (numRetries < maxRetries) { if (numRetries < maxRetries) {
LOG.info("Failed to notify atlas for entity {}. Retrying", message, e); LOG.error("Notification send retry failed");
try {
LOG.info("Sleeping for {} ms before retry", notificationRetryInterval);
Thread.sleep(notificationRetryInterval);
} catch (InterruptedException ie){
LOG.error("Notification hook thread sleep interrupted");
}
} else { } else {
if (shouldLogFailedMessages && e instanceof NotificationException) { if (shouldLogFailedMessages && e instanceof NotificationException) {
List<String> failedMessages = ((NotificationException) e).getFailedMessages(); List<String> failedMessages = ((NotificationException) e).getFailedMessages();
......
...@@ -241,7 +241,6 @@ public class KafkaNotification extends AbstractNotification implements Service { ...@@ -241,7 +241,6 @@ public class KafkaNotification extends AbstractNotification implements Service {
LOG.debug("Sent message for topic - {}, partition - {}, offset - {}", response.topic(), LOG.debug("Sent message for topic - {}, partition - {}, offset - {}", response.topic(),
response.partition(), response.offset()); response.partition(), response.offset());
} catch (Exception e) { } catch (Exception e) {
LOG.warn("Could not send message - {}", context.getMessage(), e);
lastFailureException = e; lastFailureException = e;
failedMessages.add(context.getMessage()); failedMessages.add(context.getMessage());
} }
......
...@@ -10,6 +10,7 @@ ATLAS-1060 Add composite indexes for exact match performance improvements for al ...@@ -10,6 +10,7 @@ ATLAS-1060 Add composite indexes for exact match performance improvements for al
ATLAS-1127 Modify creation and modification timestamps to Date instead of Long(sumasai) ATLAS-1127 Modify creation and modification timestamps to Date instead of Long(sumasai)
ALL CHANGES: ALL CHANGES:
ATLAS-1129 Remove notification failed logs on retry and add sleep between retries (svimal2106 via sumasai)
ATLAS-1126 Fix NPE in getSchema calls (sumasai) ATLAS-1126 Fix NPE in getSchema calls (sumasai)
ATLAS-1125 Enable compression on hbase audit table (shwethags via sumasai) ATLAS-1125 Enable compression on hbase audit table (shwethags via sumasai)
ATLAS-1121 NPE while submitting topology in StormHook (ayubkhan via sumasai) ATLAS-1121 NPE while submitting topology in StormHook (ayubkhan via sumasai)
......
...@@ -54,11 +54,13 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl ...@@ -54,11 +54,13 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
public static final String CONSUMER_THREADS_PROPERTY = "atlas.notification.hook.numthreads"; public static final String CONSUMER_THREADS_PROPERTY = "atlas.notification.hook.numthreads";
public static final String CONSUMER_RETRIES_PROPERTY = "atlas.notification.hook.maxretries"; public static final String CONSUMER_RETRIES_PROPERTY = "atlas.notification.hook.maxretries";
public static final String CONSUMER_FAILEDCACHESIZE_PROPERTY = "atlas.notification.hook.failedcachesize"; public static final String CONSUMER_FAILEDCACHESIZE_PROPERTY = "atlas.notification.hook.failedcachesize";
public static final String CONSUMER_RETRY_INTERVAL="atlas.notification.consumer.retry.interval";
public static final int SERVER_READY_WAIT_TIME_MS = 1000; public static final int SERVER_READY_WAIT_TIME_MS = 1000;
private final LocalAtlasClient atlasClient; private final LocalAtlasClient atlasClient;
private final int maxRetries; private final int maxRetries;
private final int failedMsgCacheSize; private final int failedMsgCacheSize;
private final int consumerRetryInterval;
private NotificationInterface notificationInterface; private NotificationInterface notificationInterface;
private ExecutorService executors; private ExecutorService executors;
...@@ -74,6 +76,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl ...@@ -74,6 +76,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
maxRetries = applicationProperties.getInt(CONSUMER_RETRIES_PROPERTY, 3); maxRetries = applicationProperties.getInt(CONSUMER_RETRIES_PROPERTY, 3);
failedMsgCacheSize = applicationProperties.getInt(CONSUMER_FAILEDCACHESIZE_PROPERTY, 20); failedMsgCacheSize = applicationProperties.getInt(CONSUMER_FAILEDCACHESIZE_PROPERTY, 20);
consumerRetryInterval = applicationProperties.getInt(CONSUMER_RETRY_INTERVAL, 500);
} }
...@@ -246,7 +249,14 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl ...@@ -246,7 +249,14 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
break; break;
} catch (Throwable e) { } catch (Throwable e) {
LOG.warn("Error handling message", e); LOG.warn("Error handling message" + e.getMessage());
try{
LOG.info("Sleeping for {} ms before retry", consumerRetryInterval);
Thread.sleep(consumerRetryInterval);
}catch (InterruptedException ie){
LOG.error("Notification consumer thread sleep interrupted");
}
if (numRetries == (maxRetries - 1)) { if (numRetries == (maxRetries - 1)) {
LOG.warn("Max retries exceeded for message {}", message, e); LOG.warn("Max retries exceeded for message {}", message, e);
failedMessages.add(message); failedMessages.add(message);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment