Commit f29a2b7b by Ashutosh Mestry

ATLAS-2634: Avoid duplicate message processing.

parent 015b8bf3
......@@ -241,8 +241,10 @@ public class KafkaNotification extends AbstractNotification implements Service {
}
@VisibleForTesting
public
// Get properties for consumer request
private Properties getConsumerProperties(NotificationType type) {
Properties getConsumerProperties(NotificationType type) {
// find the configured group id for the given notification type
String groupId = properties.getProperty(type.toString().toLowerCase() + "." + CONSUMER_GROUP_ID_PROPERTY);
......
......@@ -55,6 +55,7 @@ import org.apache.atlas.web.filters.AuditFilter;
import org.apache.atlas.web.filters.AuditFilter.AuditLog;
import org.apache.atlas.web.service.ServiceState;
import org.apache.commons.configuration.Configuration;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -297,10 +298,14 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
private final List<HookNotification> failedMessages = new ArrayList<>();
private final AdaptiveWaiter adaptiveWaiter = new AdaptiveWaiter(minWaitDuration, maxWaitDuration, minWaitDuration);
@VisibleForTesting
final FailedCommitOffsetRecorder failedCommitOffsetRecorder;
public HookConsumer(NotificationConsumer<HookNotification> consumer) {
super("atlas-hook-consumer-thread", false);
this.consumer = consumer;
failedCommitOffsetRecorder = new FailedCommitOffsetRecorder();
}
@Override
......@@ -358,6 +363,11 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
}
try {
if(failedCommitOffsetRecorder.isMessageReplayed(kafkaMsg.getOffset())) {
commit(kafkaMsg);
return;
}
// Used for intermediate conversions during create and update
for (int numRetries = 0; numRetries < maxRetries; numRetries++) {
if (LOG.isDebugEnabled()) {
......@@ -558,11 +568,17 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
}
private void commit(AtlasKafkaMessage<HookNotification> kafkaMessage) {
boolean commitSucceessStatus = false;
try {
recordFailedMessages();
TopicPartition partition = new TopicPartition("ATLAS_HOOK", kafkaMessage.getPartition());
consumer.commit(partition, kafkaMessage.getOffset() + 1);
commitSucceessStatus = true;
} finally {
failedCommitOffsetRecorder.recordIfFailed(commitSucceessStatus, kafkaMessage.getOffset());
}
}
boolean serverAvailable(Timer timer) {
......@@ -612,4 +628,24 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
LOG.info("<== HookConsumer shutdown()");
}
}
static class FailedCommitOffsetRecorder {
private Long currentOffset;
public void recordIfFailed(boolean commitStatus, long offset) {
if(commitStatus) {
currentOffset = null;
} else {
currentOffset = offset;
}
}
public boolean isMessageReplayed(long offset) {
return currentOffset != null && currentOffset == offset;
}
public Long getCurrentOffset() {
return currentOffset;
}
}
}
......@@ -34,6 +34,9 @@ import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.web.service.ServiceState;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.testng.Assert;
......@@ -42,6 +45,7 @@ import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;
import java.util.List;
import java.util.Properties;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
......@@ -115,6 +119,38 @@ public class NotificationHookConsumerKafkaTest {
reset(atlasEntityStore);
}
@Test
public void consumerConsumesNewMessageButCommitThrowsAnException_MessageOffsetIsRecorded() throws AtlasException, InterruptedException, AtlasBaseException {
ExceptionThrowingCommitConsumer consumer = createNewConsumerThatThrowsExceptionInCommit(kafkaNotification, true);
NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer);
NotificationHookConsumer.FailedCommitOffsetRecorder failedCommitOffsetRecorder = hookConsumer.failedCommitOffsetRecorder;
produceMessage(new HookNotificationV1.EntityCreateRequest("test_user2", createEntity()));
try {
produceMessage(new HookNotificationV1.EntityCreateRequest("test_user1", createEntity()));
consumeOneMessage(consumer, hookConsumer);
consumeOneMessage(consumer, hookConsumer);
}
catch(KafkaException ex) {
assertTrue(true, "ExceptionThrowing consumer throws an excepion.");
}
assertTrue(failedCommitOffsetRecorder.getCurrentOffset() > -1);
consumer.disableCommitExpcetion();
produceMessage(new HookNotificationV1.EntityCreateRequest("test_user1", createEntity()));
consumeOneMessage(consumer, hookConsumer);
consumeOneMessage(consumer, hookConsumer);
assertNull(failedCommitOffsetRecorder.getCurrentOffset());
reset(atlasEntityStore);
}
@Test(dependsOnMethods = "testConsumerConsumesNewMessageWithAutoCommitDisabled")
public void testConsumerRemainsAtSameMessageWithAutoCommitEnabled() throws Exception {
produceMessage(new HookNotificationV1.EntityCreateRequest("test_user3", createEntity()));
......@@ -140,6 +176,12 @@ public class NotificationHookConsumerKafkaTest {
return (AtlasKafkaConsumer) kafkaNotification.createConsumers(NotificationInterface.NotificationType.HOOK, 1, autoCommitEnabled).get(0);
}
ExceptionThrowingCommitConsumer createNewConsumerThatThrowsExceptionInCommit(KafkaNotification kafkaNotification, boolean autoCommitEnabled) {
Properties prop = kafkaNotification.getConsumerProperties(NotificationInterface.NotificationType.HOOK);
KafkaConsumer consumer = kafkaNotification.getKafkaConsumer(prop, NotificationInterface.NotificationType.HOOK, true);
return new ExceptionThrowingCommitConsumer(NotificationInterface.NotificationType.HOOK, consumer, autoCommitEnabled, 1000);
}
void consumeOneMessage(NotificationConsumer<HookNotification> consumer,
NotificationHookConsumer.HookConsumer hookConsumer) throws InterruptedException {
try {
......@@ -205,4 +247,28 @@ public class NotificationHookConsumerKafkaTest {
}
}
private static class ExceptionThrowingCommitConsumer extends AtlasKafkaConsumer {
private boolean exceptionThrowingEnabled;
public ExceptionThrowingCommitConsumer(NotificationInterface.NotificationType notificationType,
KafkaConsumer kafkaConsumer, boolean autoCommitEnabled, long pollTimeoutMilliSeconds) {
super(notificationType, kafkaConsumer, autoCommitEnabled, pollTimeoutMilliSeconds);
exceptionThrowingEnabled = true;
}
@Override
public void commit(TopicPartition partition, long offset) {
if(exceptionThrowingEnabled) {
throw new KafkaException("test case verifying exception");
}
else {
super.commit(partition, offset);
}
}
public void disableCommitExpcetion() {
exceptionThrowingEnabled = false;
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment