Commit fff94633 by Sarath Subramanian

ATLAS-2751: Atlas is not consuming messages from ATLAS_HOOK topic after…

ATLAS-2751: Atlas is not consuming messages from ATLAS_HOOK topic after recovering from zookeeper connection timeout.
parent ce5ffeb7
......@@ -56,6 +56,8 @@ public class KafkaNotification extends AbstractNotification implements Service {
public static final String ATLAS_ENTITIES_TOPIC = "ATLAS_ENTITIES";
protected static final String CONSUMER_GROUP_ID_PROPERTY = "group.id";
private static final String DEFAULT_CONSUMER_CLOSED_ERROR_MESSAGE = "This consumer has already been closed.";
private static final Map<NotificationType, String> TOPIC_MAP = new HashMap<NotificationType, String>() {
{
put(NotificationType.HOOK, ATLAS_HOOK_TOPIC);
......@@ -67,6 +69,7 @@ public class KafkaNotification extends AbstractNotification implements Service {
private final Long pollTimeOutMs;
private KafkaConsumer consumer;
private KafkaProducer producer;
private String consumerClosedErrorMsg;
// ----- Constructors ----------------------------------------------------
......@@ -85,8 +88,9 @@ public class KafkaNotification extends AbstractNotification implements Service {
Configuration kafkaConf = ApplicationProperties.getSubsetConfiguration(applicationProperties, PROPERTY_PREFIX);
properties = ConfigurationConverter.getProperties(kafkaConf);
pollTimeOutMs = kafkaConf.getLong("poll.timeout.ms", 1000);
properties = ConfigurationConverter.getProperties(kafkaConf);
pollTimeOutMs = kafkaConf.getLong("poll.timeout.ms", 1000);
consumerClosedErrorMsg = kafkaConf.getString("error.message.consumer_closed", DEFAULT_CONSUMER_CLOSED_ERROR_MESSAGE);
//Override default configs
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
......@@ -223,7 +227,7 @@ public class KafkaNotification extends AbstractNotification implements Service {
public KafkaConsumer getKafkaConsumer(Properties consumerProperties, NotificationType type, boolean autoCommitEnabled) {
if(this.consumer == null) {
if (consumer == null || !isKafkaConsumerOpen(consumer)) {
try {
String topic = TOPIC_MAP.get(type);
......@@ -287,4 +291,19 @@ public class KafkaNotification extends AbstractNotification implements Service {
return message;
}
}
// kafka-client doesn't have method to check if consumer is open, hence checking list topics and catching exception
private boolean isKafkaConsumerOpen(KafkaConsumer consumer) {
boolean ret = true;
try {
consumer.listTopics();
} catch (IllegalStateException ex) {
if (ex.getMessage().equalsIgnoreCase(consumerClosedErrorMsg)) {
ret = false;
}
}
return ret;
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment