Commit 47d4d588 by Saqeeb Shaikh, committed by Madhan Neethiraj

ATLAS-3133: enhanced Atlas server to process notifications from multiple Kafka topics

parent d59c0d08
AtlasConfiguration.java

@@ -19,6 +19,7 @@
 package org.apache.atlas;

 import org.apache.commons.configuration.Configuration;
+import org.apache.commons.lang.StringUtils;

 /**
  * Enum that encapsulated each property name and its default value.
@@ -39,6 +40,9 @@ public enum AtlasConfiguration {
     NOTIFICATION_HOOK_TOPIC_NAME("atlas.notification.hook.topic.name", "ATLAS_HOOK"),
     NOTIFICATION_ENTITIES_TOPIC_NAME("atlas.notification.entities.topic.name", "ATLAS_ENTITIES"),
+    NOTIFICATION_HOOK_CONSUMER_TOPIC_NAMES("atlas.notification.hook.consumer.topic.names", "ATLAS_HOOK"), // a comma separated list of topic names
+    NOTIFICATION_ENTITIES_CONSUMER_TOPIC_NAMES("atlas.notification.entities.consumer.topic.names", "ATLAS_ENTITIES"), // a comma separated list of topic names
+
     NOTIFICATION_MESSAGE_MAX_LENGTH_BYTES("atlas.notification.message.max.length.bytes", (1000 * 1000)),
     NOTIFICATION_MESSAGE_COMPRESSION_ENABLED("atlas.notification.message.compression.enabled", true),
     NOTIFICATION_SPLIT_MESSAGE_SEGMENTS_WAIT_TIME_SECONDS("atlas.notification.split.message.segments.wait.time.seconds", 15 * 60),
@@ -84,6 +88,28 @@ public enum AtlasConfiguration {
         return APPLICATION_PROPERTIES.getString(propertyName, defaultValue.toString());
     }

+    public String[] getStringArray() {
+        String[] ret = APPLICATION_PROPERTIES.getStringArray(propertyName);
+
+        if (ret == null || ret.length == 0 || (ret.length == 1 && StringUtils.isEmpty(ret[0]))) {
+            if (defaultValue != null) {
+                ret = StringUtils.split(defaultValue.toString(), ',');
+            }
+        }
+
+        return ret;
+    }
+
+    public String[] getStringArray(String... defaultValue) {
+        String[] ret = APPLICATION_PROPERTIES.getStringArray(propertyName);
+
+        if (ret == null || ret.length == 0 || (ret.length == 1 && StringUtils.isEmpty(ret[0]))) {
+            ret = defaultValue;
+        }
+
+        return ret;
+    }
+
     public Object get() {
         Object value = APPLICATION_PROPERTIES.getProperty(propertyName);

         return value == null ? defaultValue : value;
...
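For illustration, here is a minimal standalone sketch (not part of the commit) of the fallback behavior the new getStringArray() accessor introduces: a missing, empty, or single-blank property value falls back to the comma-separated default. The two-topic value below is a hypothetical setting of atlas.notification.hook.consumer.topic.names.

    import org.apache.commons.lang.StringUtils;

    public class GetStringArrayDemo {
        // mirrors AtlasConfiguration.getStringArray(): fall back to the default
        // when the property is missing, empty, or a single blank entry
        static String[] resolve(String[] configured, String defaultValue) {
            String[] ret = configured;

            if (ret == null || ret.length == 0 || (ret.length == 1 && StringUtils.isEmpty(ret[0]))) {
                ret = StringUtils.split(defaultValue, ',');
            }

            return ret;
        }

        public static void main(String[] args) {
            // property not set: falls back to the single default topic
            System.out.println(resolve(new String[0], "ATLAS_HOOK").length); // 1

            // hypothetical multi-topic setting: atlas.notification.hook.consumer.topic.names=ATLAS_HOOK,ATLAS_HOOK_1
            System.out.println(resolve(new String[] {"ATLAS_HOOK", "ATLAS_HOOK_1"}, "ATLAS_HOOK").length); // 2
        }
    }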
AtlasMetrics.java

@@ -55,8 +55,7 @@ public class AtlasMetrics {
     public static final String STAT_NOTIFY_FAILED_COUNT_CURR_HOUR = PREFIX_NOTIFICATION + "currentHourFailed";
     public static final String STAT_NOTIFY_START_TIME_CURR_HOUR = PREFIX_NOTIFICATION + "currentHourStartTime";
     public static final String STAT_NOTIFY_LAST_MESSAGE_PROCESSED_TIME = PREFIX_NOTIFICATION + "lastMessageProcessedTime";
-    public static final String STAT_NOTIFY_START_OFFSET = PREFIX_NOTIFICATION + "offsetStart";
-    public static final String STAT_NOTIFY_CURRENT_OFFSET = PREFIX_NOTIFICATION + "offsetCurrent";
+    public static final String STAT_NOTIFY_TOPIC_OFFSETS = PREFIX_NOTIFICATION + "topicOffsets";
     public static final String STAT_NOTIFY_COUNT_PREV_DAY = PREFIX_NOTIFICATION + "previousDay";
     public static final String STAT_NOTIFY_AVG_TIME_PREV_DAY = PREFIX_NOTIFICATION + "previousDayAvgTime";
     public static final String STAT_NOTIFY_CREATES_COUNT_PREV_DAY = PREFIX_NOTIFICATION + "previousDayEntityCreates";
...
AtlasKafkaConsumer.java

@@ -88,7 +88,7 @@ public class AtlasKafkaConsumer<T> extends AbstractNotificationConsumer<T> {
                 continue;
             }

-            messages.add(new AtlasKafkaMessage(message, record.offset(), record.partition()));
+            messages.add(new AtlasKafkaMessage(message, record.offset(), record.topic(), record.partition()));
         }
     }
...
AtlasKafkaMessage.java

@@ -18,15 +18,17 @@
 package org.apache.atlas.kafka;

+import org.apache.kafka.common.TopicPartition;
+
 public class AtlasKafkaMessage<T> {
     private final T message;
     private final long offset;
-    private final int partition;
+    private final TopicPartition topicPartition;

-    public AtlasKafkaMessage(T message, long offset, int partition) {
+    public AtlasKafkaMessage(T message, long offset, String topic, int partition) {
         this.message = message;
         this.offset = offset;
-        this.partition = partition;
+        this.topicPartition = new TopicPartition(topic, partition);
     }

     public T getMessage() {
@@ -37,8 +39,16 @@ public class AtlasKafkaMessage<T> {
         return offset;
     }

+    public TopicPartition getTopicPartition() {
+        return topicPartition;
+    }
+
+    public String getTopic() {
+        return topicPartition.topic();
+    }
+
     public int getPartition() {
-        return partition;
+        return topicPartition.partition();
     }
 }
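A short usage sketch (with hypothetical payload, offset, and partition values, not taken from the commit) of what the reworked AtlasKafkaMessage now exposes; the TopicPartition it returns is what NotificationHookConsumer later in this commit passes to commit().

    import org.apache.atlas.kafka.AtlasKafkaMessage;
    import org.apache.kafka.common.TopicPartition;

    public class AtlasKafkaMessageDemo {
        public static void main(String[] args) {
            // hypothetical values: payload "hello", offset 42, topic ATLAS_HOOK, partition 0
            AtlasKafkaMessage<String> msg = new AtlasKafkaMessage<>("hello", 42L, "ATLAS_HOOK", 0);

            TopicPartition tp = msg.getTopicPartition();

            System.out.println(tp);                  // ATLAS_HOOK-0
            System.out.println(msg.getTopic());      // ATLAS_HOOK
            System.out.println(msg.getPartition());  // 0
            System.out.println(msg.getOffset() + 1); // 43 -- the offset a consumer would commit next
        }
    }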
KafkaNotification.java

@@ -57,20 +57,30 @@ public class KafkaNotification extends AbstractNotification implements Service {
     public static final String ATLAS_ENTITIES_TOPIC = AtlasConfiguration.NOTIFICATION_ENTITIES_TOPIC_NAME.getString();

     protected static final String CONSUMER_GROUP_ID_PROPERTY = "group.id";

+    private static final String[] ATLAS_HOOK_CONSUMER_TOPICS = AtlasConfiguration.NOTIFICATION_HOOK_CONSUMER_TOPIC_NAMES.getStringArray(ATLAS_HOOK_TOPIC);
+    private static final String[] ATLAS_ENTITIES_CONSUMER_TOPICS = AtlasConfiguration.NOTIFICATION_ENTITIES_CONSUMER_TOPIC_NAMES.getStringArray(ATLAS_ENTITIES_TOPIC);
+
     private static final String DEFAULT_CONSUMER_CLOSED_ERROR_MESSAGE = "This consumer has already been closed.";

-    private static final Map<NotificationType, String> TOPIC_MAP = new HashMap<NotificationType, String>() {
+    private static final Map<NotificationType, String> PRODUCER_TOPIC_MAP = new HashMap<NotificationType, String>() {
         {
             put(NotificationType.HOOK, ATLAS_HOOK_TOPIC);
             put(NotificationType.ENTITIES, ATLAS_ENTITIES_TOPIC);
         }
     };

+    private static final Map<NotificationType, String[]> CONSUMER_TOPICS_MAP = new HashMap<NotificationType, String[]>() {
+        {
+            put(NotificationType.HOOK, trimAndPurge(ATLAS_HOOK_CONSUMER_TOPICS));
+            put(NotificationType.ENTITIES, trimAndPurge(ATLAS_ENTITIES_CONSUMER_TOPICS));
+        }
+    };
+
     private final Properties properties;
     private final Long pollTimeOutMs;
-    private KafkaConsumer consumer;
-    private KafkaProducer producer;
+    private final Map<NotificationType, List<KafkaConsumer>> consumers = new HashMap<>();
+    private final Map<NotificationType, KafkaProducer> producers = new HashMap<>();
     private String consumerClosedErrorMsg;

     // ----- Constructors ----------------------------------------------------
@@ -125,8 +135,8 @@ public class KafkaNotification extends AbstractNotification implements Service {
     }

     @VisibleForTesting
-    String getTopicName(NotificationType notificationType) {
-        return TOPIC_MAP.get(notificationType);
+    String getProducerTopicName(NotificationType notificationType) {
+        return PRODUCER_TOPIC_MAP.get(notificationType);
     }

     // ----- Service ---------------------------------------------------------
@@ -156,10 +166,43 @@ public class KafkaNotification extends AbstractNotification implements Service {
     public <T> List<NotificationConsumer<T>> createConsumers(NotificationType notificationType, int numConsumers, boolean autoCommitEnabled) {
         LOG.info("==> KafkaNotification.createConsumers(notificationType={}, numConsumers={}, autoCommitEnabled={})", notificationType, numConsumers, autoCommitEnabled);

-        Properties consumerProperties = getConsumerProperties(notificationType);
-
-        AtlasKafkaConsumer kafkaConsumer = new AtlasKafkaConsumer(notificationType, getKafkaConsumer(consumerProperties, notificationType, autoCommitEnabled), autoCommitEnabled, pollTimeOutMs);
-
-        List<NotificationConsumer<T>> consumers = Collections.singletonList(kafkaConsumer);
+        String[] topics = CONSUMER_TOPICS_MAP.get(notificationType);
+
+        if (numConsumers < topics.length) {
+            LOG.warn("consumers count {} is fewer than number of topics {}. Creating {} consumers, so that consumer count is equal to number of topics.", numConsumers, topics.length, topics.length);
+
+            numConsumers = topics.length;
+        } else if (numConsumers > topics.length) {
+            LOG.warn("consumers count {} is higher than number of topics {}. Creating {} consumers, so that consumer count is equal to number of topics.", numConsumers, topics.length, topics.length);
+
+            numConsumers = topics.length;
+        }
+
+        List<KafkaConsumer> notificationConsumers = this.consumers.get(notificationType);
+
+        if (notificationConsumers == null) {
+            notificationConsumers = new ArrayList<>(numConsumers);
+
+            this.consumers.put(notificationType, notificationConsumers);
+        }
+
+        List<NotificationConsumer<T>> consumers = new ArrayList<>();
+        Properties consumerProperties = getConsumerProperties(notificationType);
+
+        consumerProperties.put("enable.auto.commit", autoCommitEnabled);
+
+        for (int i = 0; i < numConsumers; i++) {
+            KafkaConsumer existingConsumer = notificationConsumers.size() > i ? notificationConsumers.get(i) : null;
+            KafkaConsumer kafkaConsumer = getOrCreateKafkaConsumer(existingConsumer, consumerProperties, notificationType, i);
+
+            if (notificationConsumers.size() > i) {
+                notificationConsumers.set(i, kafkaConsumer);
+            } else {
+                notificationConsumers.add(kafkaConsumer);
+            }
+
+            consumers.add(new AtlasKafkaConsumer(notificationType, kafkaConsumer, autoCommitEnabled, pollTimeOutMs));
+        }

         LOG.info("<== KafkaNotification.createConsumers(notificationType={}, numConsumers={}, autoCommitEnabled={})", notificationType, numConsumers, autoCommitEnabled);
@@ -170,29 +213,33 @@ public class KafkaNotification extends AbstractNotification implements Service {
     public void close() {
         LOG.info("==> KafkaNotification.close()");

-        if (producer != null) {
-            producer.close();
-
-            producer = null;
+        for (KafkaProducer producer : producers.values()) {
+            if (producer != null) {
+                try {
+                    producer.close();
+                } catch (Throwable t) {
+                    LOG.error("failed to close Kafka producer. Ignoring", t);
+                }
+            }
         }

+        producers.clear();
+
         LOG.info("<== KafkaNotification.close()");
     }

     // ----- AbstractNotification --------------------------------------------

     @Override
-    public void sendInternal(NotificationType type, List<String> messages) throws NotificationException {
-        if (producer == null) {
-            createProducer();
-        }
-
-        sendInternalToProducer(producer, type, messages);
+    public void sendInternal(NotificationType notificationType, List<String> messages) throws NotificationException {
+        KafkaProducer producer = getOrCreateProducer(notificationType);
+
+        sendInternalToProducer(producer, notificationType, messages);
     }

     @VisibleForTesting
-    void sendInternalToProducer(Producer p, NotificationType type, List<String> messages) throws NotificationException {
-        String topic = TOPIC_MAP.get(type);
+    void sendInternalToProducer(Producer p, NotificationType notificationType, List<String> messages) throws NotificationException {
+        String topic = PRODUCER_TOPIC_MAP.get(notificationType);

         List<MessageContext> messageContexts = new ArrayList<>();

         for (String message : messages) {
@@ -229,53 +276,82 @@ public class KafkaNotification extends AbstractNotification implements Service {
         }
     }

+    // Get properties for consumer request
     @VisibleForTesting
-    public KafkaConsumer getKafkaConsumer(Properties consumerProperties, NotificationType type, boolean autoCommitEnabled) {
-        if (consumer == null || !isKafkaConsumerOpen(consumer)) {
-            try {
-                String topic = TOPIC_MAP.get(type);
-
-                consumerProperties.put("enable.auto.commit", autoCommitEnabled);
-
-                this.consumer = new KafkaConsumer(consumerProperties);
-
-                this.consumer.subscribe(Arrays.asList(topic));
-            } catch (Exception ee) {
-                LOG.error("Exception in getKafkaConsumer ", ee);
-            }
-        }
-
-        return this.consumer;
-    }
-
-    @VisibleForTesting
-    public
-    // Get properties for consumer request
-    Properties getConsumerProperties(NotificationType type) {
+    public Properties getConsumerProperties(NotificationType notificationType) {
         // find the configured group id for the given notification type
-        String groupId = properties.getProperty(type.toString().toLowerCase() + "." + CONSUMER_GROUP_ID_PROPERTY);
+        String groupId = properties.getProperty(notificationType.toString().toLowerCase() + "." + CONSUMER_GROUP_ID_PROPERTY);

         if (StringUtils.isEmpty(groupId)) {
-            throw new IllegalStateException("No configuration group id set for the notification type " + type);
+            throw new IllegalStateException("No configuration group id set for the notification type " + notificationType);
         }

         Properties consumerProperties = new Properties();

         consumerProperties.putAll(properties);
         consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

         return consumerProperties;
     }

+    @VisibleForTesting
+    public KafkaConsumer getOrCreateKafkaConsumer(KafkaConsumer existingConsumer, Properties consumerProperties, NotificationType notificationType, int idxConsumer) {
+        KafkaConsumer ret = existingConsumer;
+
+        try {
+            if (ret == null || !isKafkaConsumerOpen(ret)) {
+                String[] topics = CONSUMER_TOPICS_MAP.get(notificationType);
+                String topic = topics[idxConsumer % topics.length];
+
+                LOG.debug("Creating new KafkaConsumer for topic : {}, index : {}", topic, idxConsumer);
+
+                ret = new KafkaConsumer(consumerProperties);
+
+                ret.subscribe(Arrays.asList(topic));
+            }
+        } catch (Exception ee) {
+            LOG.error("Exception in getKafkaConsumer ", ee);
+        }
+
+        return ret;
+    }
+
+    private KafkaProducer getOrCreateProducer(NotificationType notificationType) {
+        LOG.debug("==> KafkaNotification.getOrCreateProducer()");
+
+        KafkaProducer ret = producers.get(notificationType);
+
+        if (ret == null) {
+            synchronized (this) {
+                ret = producers.get(notificationType);
+
+                if (ret == null) {
+                    ret = new KafkaProducer(properties);
+
+                    producers.put(notificationType, ret);
+                }
+            }
+        }
+
+        LOG.debug("<== KafkaNotification.getOrCreateProducer()");
+
+        return ret;
+    }
+
-    private synchronized void createProducer() {
-        LOG.info("==> KafkaNotification.createProducer()");
-
-        if (producer == null) {
-            producer = new KafkaProducer(properties);
-        }
-
-        LOG.info("<== KafkaNotification.createProducer()");
-    }
+    public static String[] trimAndPurge(String[] strings) {
+        List<String> ret = new ArrayList<>();
+
+        if (strings != null) {
+            for (int i = 0; i < strings.length; i++) {
+                String str = StringUtils.trim(strings[i]);
+
+                if (StringUtils.isNotEmpty(str)) {
+                    ret.add(str);
+                }
+            }
+        }
+
+        return ret.toArray(new String[ret.size()]);
+    }

     private class MessageContext {
...
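To make the consumer-to-topic assignment concrete, here is a standalone sketch (assuming a hypothetical two-topic value for atlas.notification.hook.consumer.topic.names, with deliberately messy whitespace) of the trimAndPurge() cleanup above and the topics[idxConsumer % topics.length] subscription index used in getOrCreateKafkaConsumer(). Since createConsumers() forces the consumer count to match the topic count, each consumer ends up subscribed to exactly one topic.

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.commons.lang.StringUtils;

    public class ConsumerTopicAssignmentDemo {
        // mirrors KafkaNotification.trimAndPurge(): trim entries, drop blanks
        static String[] trimAndPurge(String[] strings) {
            List<String> ret = new ArrayList<>();
            if (strings != null) {
                for (String s : strings) {
                    String str = StringUtils.trim(s);
                    if (StringUtils.isNotEmpty(str)) {
                        ret.add(str);
                    }
                }
            }
            return ret.toArray(new String[ret.size()]);
        }

        public static void main(String[] args) {
            // hypothetical raw config value, before cleanup
            String[] topics = trimAndPurge(" ATLAS_HOOK , ATLAS_HOOK_1 ,, ".split(","));

            // createConsumers() forces numConsumers == topics.length, so consumer i
            // subscribes to topics[i % topics.length] -- one distinct topic each
            for (int i = 0; i < topics.length; i++) {
                System.out.println("consumer " + i + " -> " + topics[i % topics.length]);
            }
            // prints:
            // consumer 0 -> ATLAS_HOOK
            // consumer 1 -> ATLAS_HOOK_1
        }
    }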
...@@ -54,7 +54,8 @@ import static org.testng.Assert.*; ...@@ -54,7 +54,8 @@ import static org.testng.Assert.*;
public class KafkaConsumerTest { public class KafkaConsumerTest {
private static final String TRAIT_NAME = "MyTrait"; private static final String TRAIT_NAME = "MyTrait";
private final String ATLAS_HOOK_TOPIC = AtlasConfiguration.NOTIFICATION_HOOK_TOPIC_NAME.getString(); private static final String ATLAS_HOOK_TOPIC = AtlasConfiguration.NOTIFICATION_HOOK_TOPIC_NAME.getString();
private static final String[] ATLAS_HOOK_CONSUMER_TOPICS = KafkaNotification.trimAndPurge(AtlasConfiguration.NOTIFICATION_HOOK_CONSUMER_TOPIC_NAMES.getStringArray(ATLAS_HOOK_TOPIC));
@Mock @Mock
...@@ -67,11 +68,25 @@ public class KafkaConsumerTest { ...@@ -67,11 +68,25 @@ public class KafkaConsumerTest {
@Test @Test
public void testReceive() throws Exception { public void testReceive() throws Exception {
Referenceable entity = getEntity(TRAIT_NAME); for (String topic : ATLAS_HOOK_CONSUMER_TOPICS) {
EntityUpdateRequest message = new EntityUpdateRequest("user1", entity); String traitName = TRAIT_NAME + "_" + topic;
Referenceable entity = getEntity(traitName);
EntityUpdateRequest message = new EntityUpdateRequest("user1", entity);
List<AtlasKafkaMessage<HookNotification>> messageList = testReceiveHelper(message, topic);
assertTrue(messageList.size() > 0);
HookNotification consumedMessage = messageList.get(0).getMessage();
assertMessagesEqual(message, consumedMessage, entity);
}
}
private List<AtlasKafkaMessage<HookNotification>> testReceiveHelper(EntityUpdateRequest message, String topic) throws Exception {
String json = AtlasType.toV1Json(new AtlasNotificationMessage<>(new MessageVersion("1.0.0"), message)); String json = AtlasType.toV1Json(new AtlasNotificationMessage<>(new MessageVersion("1.0.0"), message));
TopicPartition tp = new TopicPartition(ATLAS_HOOK_TOPIC, 0); TopicPartition tp = new TopicPartition(topic, 0);
List<ConsumerRecord<String, String>> klist = Collections.singletonList(new ConsumerRecord<>(ATLAS_HOOK_TOPIC, 0, 0L, "mykey", json)); List<ConsumerRecord<String, String>> klist = Collections.singletonList(new ConsumerRecord<>(topic, 0, 0L, "mykey", json));
Map mp = Collections.singletonMap(tp, klist); Map mp = Collections.singletonMap(tp, klist);
ConsumerRecords records = new ConsumerRecords(mp); ConsumerRecords records = new ConsumerRecords(mp);
...@@ -81,12 +96,7 @@ public class KafkaConsumerTest { ...@@ -81,12 +96,7 @@ public class KafkaConsumerTest {
AtlasKafkaConsumer consumer = new AtlasKafkaConsumer(NotificationType.HOOK, kafkaConsumer, false, 100L); AtlasKafkaConsumer consumer = new AtlasKafkaConsumer(NotificationType.HOOK, kafkaConsumer, false, 100L);
List<AtlasKafkaMessage<HookNotification>> messageList = consumer.receive(); List<AtlasKafkaMessage<HookNotification>> messageList = consumer.receive();
return messageList;
assertTrue(messageList.size() > 0);
HookNotification consumedMessage = messageList.get(0).getMessage();
assertMessagesEqual(message, consumedMessage, entity);
} }
@Test @Test
......
KafkaNotificationMockTest.java

@@ -74,7 +74,7 @@ public class KafkaNotificationMockTest {
         KafkaNotification kafkaNotification = new KafkaNotification(configProperties);

         Producer producer = mock(Producer.class);
-        String topicName = kafkaNotification.getTopicName(NotificationInterface.NotificationType.HOOK);
+        String topicName = kafkaNotification.getProducerTopicName(NotificationInterface.NotificationType.HOOK);
         String message = "This is a test message";

         Future returnValue = mock(Future.class);
         TopicPartition topicPartition = new TopicPartition(topicName, 0);
@@ -96,7 +96,7 @@ public class KafkaNotificationMockTest {
         KafkaNotification kafkaNotification = new KafkaNotification(configProperties);

         Producer producer = mock(Producer.class);
-        String topicName = kafkaNotification.getTopicName(NotificationInterface.NotificationType.HOOK);
+        String topicName = kafkaNotification.getProducerTopicName(NotificationInterface.NotificationType.HOOK);
         String message = "This is a test message";

         Future returnValue = mock(Future.class);
         when(returnValue.get()).thenThrow(new RuntimeException("Simulating exception"));
@@ -121,7 +121,7 @@ public class KafkaNotificationMockTest {
         KafkaNotification kafkaNotification = new KafkaNotification(configProperties);

         Producer producer = mock(Producer.class);
-        String topicName = kafkaNotification.getTopicName(NotificationInterface.NotificationType.HOOK);
+        String topicName = kafkaNotification.getProducerTopicName(NotificationInterface.NotificationType.HOOK);
         String message1 = "This is a test message1";
         String message2 = "This is a test message2";

         Future returnValue1 = mock(Future.class);
...
AbstractNotificationConsumerTest.java

@@ -183,6 +183,8 @@ public class AbstractNotificationConsumerTest {
     }

     private static class TestNotificationConsumer extends AbstractNotificationConsumer<TestMessage> {
+        private static final String TEST_TOPIC_NAME = "TEST_TOPIC";
+
         private final List<TestMessage> messageList;
         private int index = 0;
@@ -217,7 +219,7 @@ public class AbstractNotificationConsumerTest {
         public List<AtlasKafkaMessage<TestMessage>> receive(long timeoutMilliSeconds) {
             List<AtlasKafkaMessage<TestMessage>> tempMessageList = new ArrayList();

             for(Object json : messageList) {
-                tempMessageList.add(new AtlasKafkaMessage(deserializer.deserialize((String) json), -1, -1));
+                tempMessageList.add(new AtlasKafkaMessage(deserializer.deserialize((String) json), -1, TEST_TOPIC_NAME, -1));
             }

             return tempMessageList;
         }
...
AtlasMetricsCounter.java

@@ -24,6 +24,7 @@ import java.time.Instant;
 import java.time.LocalDateTime;
 import java.time.LocalTime;
 import java.time.ZoneOffset;
+import java.util.concurrent.atomic.AtomicLong;

 import static org.apache.atlas.util.AtlasMetricsCounter.Period.*;
@@ -87,10 +88,10 @@ public class AtlasMetricsCounter {
         }
     }

-    public Stats report() {
+    public StatsReport report() {
         updateForTime(clock.instant());

-        return new Stats(stats, dayStartTime.toEpochMilli(), hourStartTime.toEpochMilli());
+        return new StatsReport(stats, dayStartTime.toEpochMilli(), hourStartTime.toEpochMilli());
     }

     // visible only for testing
@@ -179,16 +180,15 @@ public class AtlasMetricsCounter {
         return LocalDateTime.of(time.toLocalDate().plusDays(1), LocalTime.MIN).toInstant(ZoneOffset.UTC);
     }

     public static class Stats {
         private static final int NUM_PERIOD = Period.values().length;

-        private final long dayStartTimeMs;
-        private final long hourStartTimeMs;
-        private final long[] count = new long[NUM_PERIOD];
-        private final long[] measureSum = new long[NUM_PERIOD];
-        private final long[] measureMin = new long[NUM_PERIOD];
-        private final long[] measureMax = new long[NUM_PERIOD];
+        private final AtomicLong[] count = new AtomicLong[NUM_PERIOD];
+        private final AtomicLong[] measureSum = new AtomicLong[NUM_PERIOD];
+        private final AtomicLong[] measureMin = new AtomicLong[NUM_PERIOD];
+        private final AtomicLong[] measureMax = new AtomicLong[NUM_PERIOD];

         public Stats() {
@@ -200,7 +200,57 @@ public class AtlasMetricsCounter {
             }
         }

-        public Stats(Stats other, long dayStartTimeMs, long hourStartTimeMs) {
+        public void addCount(Period period, long num) {
+            count[period.ordinal()].addAndGet(num);
+        }
+
+        public void addMeasure(Period period, long measure) {
+            int idx = period.ordinal();
+
+            measureSum[idx].addAndGet(measure);
+
+            if (measureMin[idx].get() > measure) {
+                measureMin[idx].set(measure);
+            }
+
+            if (measureMax[idx].get() < measure) {
+                measureMax[idx].set(measure);
+            }
+        }
+
+        private void copy(Period src, Period dest) {
+            int srcIdx = src.ordinal();
+            int destIdx = dest.ordinal();
+
+            count[destIdx].set(count[srcIdx].get());
+            measureSum[destIdx].set(measureSum[srcIdx].get());
+            measureMin[destIdx].set(measureMin[srcIdx].get());
+            measureMax[destIdx].set(measureMax[srcIdx].get());
+        }
+
+        private void reset(Period period) {
+            int idx = period.ordinal();
+
+            count[idx] = new AtomicLong(0);
+            measureSum[idx] = new AtomicLong(0);
+            measureMin[idx] = new AtomicLong(Long.MAX_VALUE);
+            measureMax[idx] = new AtomicLong(Long.MIN_VALUE);
+        }
+    }
+
+    public static class StatsReport {
+        private static final int NUM_PERIOD = Period.values().length;
+
+        private final long dayStartTimeMs;
+        private final long hourStartTimeMs;
+        private final long[] count = new long[NUM_PERIOD];
+        private final long[] measureSum = new long[NUM_PERIOD];
+        private final long[] measureMin = new long[NUM_PERIOD];
+        private final long[] measureMax = new long[NUM_PERIOD];
+
+        public StatsReport(Stats other, long dayStartTimeMs, long hourStartTimeMs) {
             this.dayStartTimeMs = dayStartTimeMs;
             this.hourStartTimeMs = hourStartTimeMs;
@@ -229,46 +279,9 @@ public class AtlasMetricsCounter {
             return c != 0 ? (measureSum[idx] / c) : 0;
         }

-        public void addCount(Period period, long num) {
-            count[period.ordinal()] += num;
-        }
-
-        public void addMeasure(Period period, long measure) {
-            int idx = period.ordinal();
-
-            measureSum[idx] += measure;
-
-            if (measureMin[idx] > measure) {
-                measureMin[idx] = measure;
-            }
-
-            if (measureMax[idx] < measure) {
-                measureMax[idx] = measure;
-            }
-        }
-
-        private void copy(Period src, Period dest) {
-            int srcIdx = src.ordinal();
-            int destIdx = dest.ordinal();
-
-            count[destIdx] = count[srcIdx];
-            measureSum[destIdx] = measureSum[srcIdx];
-            measureMin[destIdx] = measureMin[srcIdx];
-            measureMax[destIdx] = measureMax[srcIdx];
-        }
-
-        private void reset(Period period) {
-            int idx = period.ordinal();
-
-            count[idx] = 0;
-            measureSum[idx] = 0;
-            measureMin[idx] = Long.MAX_VALUE;
-            measureMax[idx] = Long.MIN_VALUE;
-        }
-
-        private void copy(long[] src, long[] dest) {
+        private void copy(AtomicLong[] src, long[] dest) {
             for (int i = 0; i < dest.length; i++) {
-                dest[i] = src[i];
+                dest[i] = src[i].get();
             }
         }
     }
...
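For context, a simplified standalone sketch (one counter cell instead of one per Period, not the commit's code) of the accumulate-then-snapshot split introduced here: writers update AtomicLong cells in the live Stats, while report() copies them into the plain long[] fields of a StatsReport snapshot.

    import java.util.concurrent.atomic.AtomicLong;

    public class StatsSnapshotDemo {
        // simplified: a single period instead of Period.values().length cells
        static final AtomicLong count = new AtomicLong(0);
        static final AtomicLong measureSum = new AtomicLong(0);

        public static void main(String[] args) throws InterruptedException {
            Runnable worker = () -> {
                for (int i = 0; i < 1000; i++) {
                    count.incrementAndGet(); // like Stats.addCount(period, 1)
                    measureSum.addAndGet(5); // like Stats.addMeasure(period, 5)
                }
            };

            Thread t1 = new Thread(worker), t2 = new Thread(worker);
            t1.start(); t2.start();
            t1.join();  t2.join();

            // like StatsReport: copy the live AtomicLong cells into plain longs
            long countSnapshot = count.get();                       // 2000
            long avg           = measureSum.get() / countSnapshot;  // 5

            System.out.println(countSnapshot + " messages, avg measure " + avg);
        }
    }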
AtlasMetricsUtil.java

@@ -21,7 +21,7 @@ import org.apache.atlas.model.instance.EntityMutationResponse;
 import org.apache.atlas.repository.Constants;
 import org.apache.atlas.repository.graphdb.AtlasGraph;
 import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
-import org.apache.atlas.util.AtlasMetricsCounter.Stats;
+import org.apache.atlas.util.AtlasMetricsCounter.StatsReport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,16 +50,15 @@ public class AtlasMetricsUtil {
     private static final String STATUS_CONNECTED = "connected";
     private static final String STATUS_NOT_CONNECTED = "not-connected";

     private final AtlasGraph graph;
     private long serverStartTime = 0;
     private long serverActiveTime = 0;
-    private long msgOffsetStart = -1;
-    private long msgOffsetCurrent = 0;
+    private final Map<String, TopicStats> topicStats = new HashMap<>();
     private final AtlasMetricsCounter messagesProcessed = new AtlasMetricsCounter("messagesProcessed");
     private final AtlasMetricsCounter messagesFailed = new AtlasMetricsCounter("messagesFailed");
     private final AtlasMetricsCounter entityCreates = new AtlasMetricsCounter("entityCreates");
     private final AtlasMetricsCounter entityUpdates = new AtlasMetricsCounter("entityUpdates");
     private final AtlasMetricsCounter entityDeletes = new AtlasMetricsCounter("entityDeletes");

     @Inject
     public AtlasMetricsUtil(AtlasGraph graph) {
@@ -83,7 +82,7 @@ public class AtlasMetricsUtil {
         serverActiveTime = System.currentTimeMillis();
     }

-    public void onNotificationProcessingComplete(long msgOffset, NotificationStat stats) {
+    public void onNotificationProcessingComplete(String topicName, int partition, long msgOffset, NotificationStat stats) {
         messagesProcessed.incrWithMeasure(stats.timeTakenMs);
         entityCreates.incrBy(stats.entityCreates);
         entityUpdates.incrBy(stats.entityUpdates);
@@ -93,21 +92,33 @@ public class AtlasMetricsUtil {
             messagesFailed.incr();
         }

-        if (msgOffsetStart == -1) {
-            msgOffsetStart = msgOffset;
+        TopicStats topicStat = topicStats.get(topicName);
+
+        if (topicStat == null) {
+            topicStat = new TopicStats(topicName);
+
+            topicStats.put(topicName, topicStat);
         }

-        msgOffsetCurrent = ++msgOffset;
+        TopicPartitionStat partitionStat = topicStat.get(partition);
+
+        if (partitionStat == null) {
+            partitionStat = new TopicPartitionStat(topicName, partition, msgOffset, msgOffset);
+
+            topicStat.set(partition, partitionStat);
+        }
+
+        partitionStat.setCurrentOffset(msgOffset + 1);
     }

     public Map<String, Object> getStats() {
         Map<String, Object> ret = new HashMap<>();

-        Stats messagesProcessed = this.messagesProcessed.report();
-        Stats messagesFailed = this.messagesFailed.report();
-        Stats entityCreates = this.entityCreates.report();
-        Stats entityUpdates = this.entityUpdates.report();
-        Stats entityDeletes = this.entityDeletes.report();
+        StatsReport messagesProcessed = this.messagesProcessed.report();
+        StatsReport messagesFailed = this.messagesFailed.report();
+        StatsReport entityCreates = this.entityCreates.report();
+        StatsReport entityUpdates = this.entityUpdates.report();
+        StatsReport entityDeletes = this.entityDeletes.report();

         ret.put(STAT_SERVER_START_TIMESTAMP, serverStartTime);
         ret.put(STAT_SERVER_ACTIVE_TIMESTAMP, serverActiveTime);
@@ -115,8 +126,20 @@ public class AtlasMetricsUtil {
         ret.put(STAT_SERVER_STATUS_BACKEND_STORE, getBackendStoreStatus() ? STATUS_CONNECTED : STATUS_NOT_CONNECTED);
         ret.put(STAT_SERVER_STATUS_INDEX_STORE, getIndexStoreStatus() ? STATUS_CONNECTED : STATUS_NOT_CONNECTED);

-        ret.put(STAT_NOTIFY_START_OFFSET, msgOffsetStart);
-        ret.put(STAT_NOTIFY_CURRENT_OFFSET, msgOffsetCurrent);
+        Map<String, Map<String, Long>> topicOffsets = new HashMap<>();
+
+        for (TopicStats tStat : topicStats.values()) {
+            for (TopicPartitionStat tpStat : tStat.partitionStats.values()) {
+                Map<String, Long> tpOffsets = new HashMap<>();
+
+                tpOffsets.put("offsetStart", tpStat.startOffset);
+                tpOffsets.put("offsetCurrent", tpStat.currentOffset);
+
+                topicOffsets.put(tpStat.topicName + "-" + tpStat.partition, tpOffsets);
+            }
+        }
+
+        ret.put(STAT_NOTIFY_TOPIC_OFFSETS, topicOffsets);

         ret.put(STAT_NOTIFY_LAST_MESSAGE_PROCESSED_TIME, this.messagesProcessed.getLastIncrTime().toEpochMilli());
         ret.put(STAT_NOTIFY_COUNT_TOTAL, messagesProcessed.getCount(ALL));
@@ -297,4 +320,58 @@ public class AtlasMetricsUtil {
             return collection != null ? collection.size() : 0;
         }
     }
+
+    class TopicStats {
+        private final String topicName;
+        private final Map<Integer, TopicPartitionStat> partitionStats = new HashMap<>();
+
+        public TopicStats(String topicName) {
+            this.topicName = topicName;
+        }
+
+        public String getTopicName() { return topicName; }
+
+        public Map<Integer, TopicPartitionStat> getPartitionStats() { return partitionStats; }
+
+        public TopicPartitionStat get(Integer partition) { return partitionStats.get(partition); }
+
+        public void set(Integer partition, TopicPartitionStat partitionStat) {
+            partitionStats.put(partition, partitionStat);
+        }
+    }
+
+    class TopicPartitionStat {
+        private final String topicName;
+        private final int partition;
+        private final long startOffset;
+        private long currentOffset;
+
+        public TopicPartitionStat(String topicName, int partition, long startOffset, long currentOffset) {
+            this.topicName = topicName;
+            this.partition = partition;
+            this.startOffset = startOffset;
+            this.currentOffset = currentOffset;
+        }
+
+        public String getTopicName() { return topicName; }
+
+        public int getPartition() { return partition; }
+
+        public long getStartOffset() { return startOffset; }
+
+        public long getCurrentOffset() { return currentOffset; }
+
+        public void setCurrentOffset(long currentOffset) {
+            this.currentOffset = currentOffset;
+        }
+    }
 }
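For illustration, a minimal sketch of the shape the new topicOffsets stat takes in getStats(), assuming a single message was processed from partition 0 of ATLAS_HOOK at offset 42; the "<topic>-<partition>" key format mirrors the loop above.

    import java.util.HashMap;
    import java.util.Map;

    public class TopicOffsetsShapeDemo {
        public static void main(String[] args) {
            // mirrors AtlasMetricsUtil.getStats(): one entry per topic-partition
            Map<String, Map<String, Long>> topicOffsets = new HashMap<>();

            Map<String, Long> tpOffsets = new HashMap<>();
            tpOffsets.put("offsetStart", 42L);   // first offset seen for this partition
            tpOffsets.put("offsetCurrent", 43L); // last processed offset + 1

            topicOffsets.put("ATLAS_HOOK-0", tpOffsets); // key: "<topic>-<partition>"

            // prints e.g. {ATLAS_HOOK-0={offsetCurrent=43, offsetStart=42}}
            System.out.println(topicOffsets);
        }
    }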
MetricsServiceTest.java

@@ -227,10 +227,10 @@ public class MetricsServiceTest {
     private void processMessage(Instant instant) {
         clock.setInstant(instant);

-        metricsUtil.onNotificationProcessingComplete(++msgOffset, new AtlasMetricsUtil.NotificationStat(true, 1));
+        metricsUtil.onNotificationProcessingComplete("ATLAS_HOOK", 0, ++msgOffset, new AtlasMetricsUtil.NotificationStat(true, 1));

         for (int i = 0; i < 10; i++) {
-            metricsUtil.onNotificationProcessingComplete(msgOffset++, new AtlasMetricsUtil.NotificationStat(false, 1));
+            metricsUtil.onNotificationProcessingComplete("ATLAS_HOOK", 0, msgOffset++, new AtlasMetricsUtil.NotificationStat(false, 1));
         }

         clock.setInstant(null);
...
...@@ -23,7 +23,6 @@ import kafka.utils.ShutdownableThread; ...@@ -23,7 +23,6 @@ import kafka.utils.ShutdownableThread;
import org.apache.atlas.ApplicationProperties; import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient; import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasClientV2; import org.apache.atlas.AtlasClientV2;
import org.apache.atlas.AtlasConfiguration;
import org.apache.atlas.AtlasException; import org.apache.atlas.AtlasException;
import org.apache.atlas.AtlasServiceException; import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.RequestContext; import org.apache.atlas.RequestContext;
...@@ -118,7 +117,6 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl ...@@ -118,7 +117,6 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
private static final String ATTRIBUTE_QUALIFIED_NAME = "qualifiedName"; private static final String ATTRIBUTE_QUALIFIED_NAME = "qualifiedName";
private static final String THREADNAME_PREFIX = NotificationHookConsumer.class.getSimpleName(); private static final String THREADNAME_PREFIX = NotificationHookConsumer.class.getSimpleName();
private static final String ATLAS_HOOK_TOPIC = AtlasConfiguration.NOTIFICATION_HOOK_TOPIC_NAME.getString();
public static final String CONSUMER_THREADS_PROPERTY = "atlas.notification.hook.numthreads"; public static final String CONSUMER_THREADS_PROPERTY = "atlas.notification.hook.numthreads";
public static final String CONSUMER_RETRIES_PROPERTY = "atlas.notification.hook.maxretries"; public static final String CONSUMER_RETRIES_PROPERTY = "atlas.notification.hook.maxretries";
...@@ -701,7 +699,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl ...@@ -701,7 +699,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
stats.timeTakenMs = System.currentTimeMillis() - startTime; stats.timeTakenMs = System.currentTimeMillis() - startTime;
metricsUtil.onNotificationProcessingComplete(kafkaMsg.getOffset(), stats); metricsUtil.onNotificationProcessingComplete(kafkaMsg.getTopic(), kafkaMsg.getPartition(), kafkaMsg.getOffset(), stats);
if (stats.timeTakenMs > largeMessageProcessingTimeThresholdMs) { if (stats.timeTakenMs > largeMessageProcessingTimeThresholdMs) {
String strMessage = AbstractNotification.getMessageJson(message); String strMessage = AbstractNotification.getMessageJson(message);
...@@ -785,9 +783,8 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl ...@@ -785,9 +783,8 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
try { try {
recordFailedMessages(); recordFailedMessages();
TopicPartition partition = new TopicPartition(ATLAS_HOOK_TOPIC, kafkaMessage.getPartition()); consumer.commit(kafkaMessage.getTopicPartition(), kafkaMessage.getOffset() + 1);
consumer.commit(partition, kafkaMessage.getOffset() + 1);
commitSucceessStatus = true; commitSucceessStatus = true;
} finally { } finally {
failedCommitOffsetRecorder.recordIfFailed(commitSucceessStatus, kafkaMessage.getOffset()); failedCommitOffsetRecorder.recordIfFailed(commitSucceessStatus, kafkaMessage.getOffset());
......
NotificationHookConsumerKafkaTest.java

@@ -182,7 +182,10 @@ public class NotificationHookConsumerKafkaTest {
     ExceptionThrowingCommitConsumer createNewConsumerThatThrowsExceptionInCommit(KafkaNotification kafkaNotification, boolean autoCommitEnabled) {
         Properties prop = kafkaNotification.getConsumerProperties(NotificationInterface.NotificationType.HOOK);

-        KafkaConsumer consumer = kafkaNotification.getKafkaConsumer(prop, NotificationInterface.NotificationType.HOOK, true);
+        prop.put("enable.auto.commit", autoCommitEnabled);
+
+        KafkaConsumer consumer = kafkaNotification.getOrCreateKafkaConsumer(null, prop, NotificationInterface.NotificationType.HOOK, 0);

         return new ExceptionThrowingCommitConsumer(NotificationInterface.NotificationType.HOOK, consumer, autoCommitEnabled, 1000);
     }
...
NotificationHookConsumerTest.java

@@ -22,6 +22,7 @@ import org.apache.atlas.AtlasServiceException;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.ha.HAConfiguration;
 import org.apache.atlas.kafka.AtlasKafkaMessage;
+import org.apache.atlas.kafka.KafkaNotification;
 import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
 import org.apache.atlas.model.instance.EntityMutationResponse;
 import org.apache.atlas.model.notification.HookNotification.HookNotificationType;
@@ -136,7 +137,7 @@ public class NotificationHookConsumerTest {
         when(message.getType()).thenReturn(HookNotificationType.ENTITY_CREATE);
         when(message.getEntities()).thenReturn(Arrays.asList(mock));

-        hookConsumer.handleMessage(new AtlasKafkaMessage(message, -1, -1));
+        hookConsumer.handleMessage(new AtlasKafkaMessage(message, -1, KafkaNotification.ATLAS_HOOK_TOPIC, -1));

         verify(consumer).commit(any(TopicPartition.class), anyInt());
     }
@@ -150,7 +151,7 @@ public class NotificationHookConsumerTest {
         when(atlasEntityStore.createOrUpdate(any(EntityStream.class), anyBoolean())).thenThrow(new RuntimeException("Simulating exception in processing message"));

-        hookConsumer.handleMessage(new AtlasKafkaMessage(message, -1, -1));
+        hookConsumer.handleMessage(new AtlasKafkaMessage(message, -1, KafkaNotification.ATLAS_HOOK_TOPIC, -1));

         verifyZeroInteractions(consumer);
     }
...