Commit 0267eecd by nixonrodrigues Committed by Madhan Neethiraj

ATLAS-1944: updated handling of shutdown in KafkaConsumer

Change-Id: I07cbe1955cd08005660f5189f30f0690809ce1b1 Signed-off-by: 's avatarMadhan Neethiraj <madhan@apache.org>
parent b59460ff
......@@ -96,4 +96,11 @@ public class AtlasKafkaConsumer<T> extends AbstractNotificationConsumer<T> {
kafkaConsumer.close();
}
}
@Override
public void wakeup() {
if (kafkaConsumer != null) {
kafkaConsumer.wakeup();
}
}
}
......@@ -39,6 +39,8 @@ public interface NotificationConsumer<T> {
void close();
void wakeup();
/**
* Fetch data for the topics from Kafka
* @return List containing kafka message and partionId and offset.
......@@ -53,4 +55,5 @@ public interface NotificationConsumer<T> {
List<AtlasKafkaMessage<T>> receive(long timeoutMilliSeconds);
}
......@@ -203,6 +203,11 @@ public class AbstractNotificationConsumerTest {
}
@Override
public void wakeup() {
}
@Override
public List<AtlasKafkaMessage<T>> receive() {
return receive(1000L);
}
......
......@@ -168,12 +168,16 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
}
private void stopConsumerThreads() {
LOG.info("==> stopConsumerThreads()");
if (consumers != null) {
for (HookConsumer consumer : consumers) {
consumer.stop();
consumer.shutdown();
}
consumers.clear();
}
LOG.info("<== stopConsumerThreads()");
}
/**
......@@ -218,21 +222,35 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
@Override
public void doWork() {
LOG.info("==> HookConsumer doWork()");
shouldRun.set(true);
if (!serverAvailable(new NotificationHookConsumer.Timer())) {
return;
}
try {
while (shouldRun.get()) {
try {
List<AtlasKafkaMessage<HookNotificationMessage>> messages = consumer.receive();
for (AtlasKafkaMessage<HookNotificationMessage> msg : messages) {
handleMessage(msg);
}
} catch (Throwable t) {
LOG.warn("Failure in NotificationHookConsumer", t);
} catch (Exception e) {
if (shouldRun.get()) {
LOG.warn("Exception in NotificationHookConsumer", e);
}
}
}
} finally {
if (consumer != null) {
LOG.info("closing NotificationConsumer");
consumer.close();
}
LOG.info("<== HookConsumer doWork()");
}
}
......@@ -369,7 +387,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
private void commit(AtlasKafkaMessage<HookNotificationMessage> kafkaMessage) {
recordFailedMessages();
TopicPartition partition = new TopicPartition("ATLAS_HOOK", kafkaMessage.getPartition());
consumer.commit(partition, kafkaMessage.getOffset());
consumer.commit(partition, kafkaMessage.getOffset() + 1);
}
boolean serverAvailable(Timer timer) {
......@@ -397,11 +415,18 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
@Override
public void shutdown() {
LOG.info("==> HookConsumer shutdown()");
super.initiateShutdown();
shouldRun.set(false);
consumer.close();
if (consumer != null) {
consumer.wakeup();
}
super.awaitShutdown();
LOG.info("<== HookConsumer shutdown()");
}
}
private void audit(String messageUser, String method, String path) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment