Commit 0e7f8ea4 by nixonrodrigues, committed by Madhan Neethiraj

ATLAS-1766: updated NotificationConsumer implementation to use new Kafka Consumer API, to enable support for SASL_SSL protocol

Signed-off-by: Madhan Neethiraj <madhan@apache.org>
parent 5bb2dcbe
......@@ -74,9 +74,13 @@ atlas.kafka.zookeeper.session.timeout.ms=400
atlas.kafka.zookeeper.connection.timeout.ms=200
atlas.kafka.zookeeper.sync.time.ms=20
atlas.kafka.auto.commit.interval.ms=1000
atlas.kafka.auto.offset.reset=smallest
atlas.kafka.hook.group.id=atlas
atlas.kafka.auto.commit.enable=false
atlas.kafka.enable.auto.commit=false
atlas.kafka.auto.offset.reset=earliest
atlas.kafka.session.timeout.ms=30000
atlas.notification.create.topics=true
atlas.notification.replicas=1
atlas.notification.topics=ATLAS_HOOK,ATLAS_ENTITIES
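The new-consumer settings introduced above (enable.auto.commit, auto.offset.reset=earliest, session.timeout.ms) are the prerequisites for the SASL_SSL support named in the commit message: only the new Kafka consumer API honours the standard client security configs. A minimal sketch of a SASL_SSL-ready consumer configuration; the security keys are standard Kafka client configs rather than part of this commit, and the broker address and truststore path are hypothetical values:

import java.util.Properties;

public class SaslSslConsumerConfigSketch {
    public static Properties consumerProperties() {
        Properties props = new Properties();
        // settings mirrored from the new-consumer block above
        props.put("enable.auto.commit", "false");
        props.put("auto.offset.reset", "earliest");
        props.put("session.timeout.ms", "30000");
        // standard Kafka client security configs; values are hypothetical
        props.put("bootstrap.servers", "broker1:9093");
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.kerberos.service.name", "kafka");
        props.put("ssl.truststore.location", "/etc/security/kafka.client.truststore.jks");
        return props;
    }
}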
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.kafka;
import org.apache.atlas.notification.AbstractNotificationConsumer;
import org.apache.atlas.notification.MessageDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
/**
* Kafka specific notification consumer.
*
* @param <T> the notification type returned by this consumer
*/
public class AtlasKafkaConsumer<T> extends AbstractNotificationConsumer<T> {
private static final Logger LOG = LoggerFactory.getLogger(AtlasKafkaConsumer.class);
private final KafkaConsumer kafkaConsumer;
private final boolean autoCommitEnabled;
public AtlasKafkaConsumer(MessageDeserializer<T> deserializer, KafkaConsumer kafkaConsumer, boolean autoCommitEnabled) {
super(deserializer);
this.kafkaConsumer = kafkaConsumer;
this.autoCommitEnabled = autoCommitEnabled;
}
public List<AtlasKafkaMessage<T>> receive(long timeoutMilliSeconds) {
List<AtlasKafkaMessage<T>> messages = new ArrayList<>();
ConsumerRecords<?, ?> records = kafkaConsumer.poll(timeoutMilliSeconds);
if (records != null) {
for (ConsumerRecord<?, ?> record : records) {
if (LOG.isDebugEnabled()) {
LOG.debug("Received Message topic ={}, partition ={}, offset = {}, key = {}, value = {}",
record.topic(), record.partition(), record.offset(), record.key(), record.value());
}
T message = deserializer.deserialize(record.value().toString());
messages.add(new AtlasKafkaMessage<>(message, record.offset(), record.partition()));
}
}
return messages;
}
@Override
public void commit(TopicPartition partition, long offset) {
if (!autoCommitEnabled) {
if (LOG.isDebugEnabled()) {
LOG.info(" commiting the offset ==>> " + offset);
}
kafkaConsumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(offset)));
}
}
@Override
public void close() {
if (kafkaConsumer != null) {
kafkaConsumer.close();
}
}
}
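Taken together, receive() and commit(partition, offset) give callers per-message control that the old hasNext()/next() API lacked. A minimal usage sketch, assuming a hypothetical process() handler; this mirrors what NotificationHookConsumer does later in this diff:

import java.util.List;
import org.apache.atlas.kafka.AtlasKafkaMessage;
import org.apache.atlas.notification.NotificationConsumer;
import org.apache.kafka.common.TopicPartition;

public class ReceiveLoopSketch {
    // drains one poll's worth of messages, committing each one after it is handled
    static <T> void drainOnce(NotificationConsumer<T> consumer, String topic) {
        List<AtlasKafkaMessage<T>> batch = consumer.receive(1000L);
        for (AtlasKafkaMessage<T> msg : batch) {
            process(msg.getMessage());
            // AtlasKafkaConsumer.commit() is a no-op when auto-commit is enabled,
            // so this call is safe either way
            consumer.commit(new TopicPartition(topic, msg.getPartition()), msg.getOffset());
        }
    }

    private static <T> void process(T message) { /* application-specific handling */ }
}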
package org.apache.atlas.kafka;
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
public class AtlasKafkaMessage<T> {
private final T message;
private final long offset;
private final int partition;
public AtlasKafkaMessage(T message, long offset, int partition) {
this.message = message;
this.offset = offset;
this.partition = partition;
}
public T getMessage() {
return message;
}
public long getOffset() {
return offset;
}
public int getPartition() {
return partition;
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.kafka;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import org.apache.atlas.notification.AbstractNotificationConsumer;
import org.apache.atlas.notification.MessageDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Kafka specific notification consumer.
*
* @param <T> the notification type returned by this consumer
*/
public class KafkaConsumer<T> extends AbstractNotificationConsumer<T> {
private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumer.class);
private final int consumerId;
private final ConsumerIterator iterator;
private final ConsumerConnector consumerConnector;
private final boolean autoCommitEnabled;
private long lastSeenOffset;
// ----- Constructors ----------------------------------------------------
/**
* Create a Kafka consumer.
* @param deserializer the message deserializer used for this consumer
* @param stream the underlying Kafka stream
* @param consumerId an id value for this consumer
* @param consumerConnector the {@link ConsumerConnector} which created the underlying Kafka stream
* @param autoCommitEnabled true if consumer does not need to commit offsets explicitly, false otherwise.
*/
public KafkaConsumer(MessageDeserializer<T> deserializer, KafkaStream<String, String> stream, int consumerId,
ConsumerConnector consumerConnector, boolean autoCommitEnabled) {
super(deserializer);
this.consumerConnector = consumerConnector;
this.lastSeenOffset = 0;
this.iterator = stream.iterator();
this.consumerId = consumerId;
this.autoCommitEnabled = autoCommitEnabled;
}
// ----- NotificationConsumer --------------------------------------------
@Override
public boolean hasNext() {
return iterator.hasNext();
}
// ----- AbstractNotificationConsumer ------------------------------------
@Override
public String getNext() {
MessageAndMetadata message = iterator.next();
LOG.debug("Read message: conumerId: {}, topic - {}, partition - {}, offset - {}, message - {}",
consumerId, message.topic(), message.partition(), message.offset(), message.message());
lastSeenOffset = message.offset();
return (String) message.message();
}
@Override
protected String peekMessage() {
MessageAndMetadata message = (MessageAndMetadata) iterator.peek();
return (String) message.message();
}
@Override
public void commit() {
if (autoCommitEnabled) {
LOG.debug("Auto commit is disabled, not committing.");
} else {
consumerConnector.commitOffsets();
LOG.debug("Committed offset: {}", lastSeenOffset);
}
}
@Override
public void close() {
consumerConnector.shutdown();
}
}
......@@ -18,25 +18,22 @@
package org.apache.atlas.kafka;
import com.google.common.annotations.VisibleForTesting;
import kafka.consumer.Consumer;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.Time;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasException;
import org.apache.atlas.notification.AbstractNotification;
import org.apache.atlas.notification.MessageDeserializer;
import org.apache.atlas.notification.NotificationConsumer;
import org.apache.atlas.notification.NotificationException;
import org.apache.atlas.service.Service;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationConverter;
import org.apache.commons.lang.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
......@@ -56,10 +53,11 @@ import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.HashMap;
import java.util.ArrayList;
import java.util.Properties;
import java.util.concurrent.Future;
......@@ -83,9 +81,8 @@ public class KafkaNotification extends AbstractNotification implements Service {
private KafkaServer kafkaServer;
private ServerCnxnFactory factory;
private Properties properties;
private KafkaConsumer consumer = null;
private KafkaProducer producer = null;
private List<ConsumerConnector> consumerConnectors = new ArrayList<>();
private static final Map<NotificationType, String> TOPIC_MAP = new HashMap<NotificationType, String>() {
{
......@@ -126,8 +123,7 @@ public class KafkaNotification extends AbstractNotification implements Service {
"org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "roundrobin");
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "smallest");
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
}
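The property edits in this file track the old-consumer to new-consumer config renames standardised in Kafka 0.9+: auto.commit.enable becomes enable.auto.commit, and auto.offset.reset takes earliest/latest in place of smallest/largest. A sketch of that mapping (the helper name is hypothetical):

import java.util.Properties;

public class ConsumerConfigMigrationSketch {
    // translates the old high-level consumer keys touched by this commit
    // into their new-consumer equivalents
    public static Properties migrate(Properties old) {
        Properties updated = new Properties();
        updated.putAll(old);
        // auto.commit.enable -> enable.auto.commit
        if (old.containsKey("auto.commit.enable")) {
            updated.remove("auto.commit.enable");
            updated.put("enable.auto.commit", old.getProperty("auto.commit.enable"));
        }
        // auto.offset.reset values: smallest -> earliest, largest -> latest
        String reset = old.getProperty("auto.offset.reset");
        if ("smallest".equals(reset)) {
            updated.put("auto.offset.reset", "earliest");
        } else if ("largest".equals(reset)) {
            updated.put("auto.offset.reset", "latest");
        }
        return updated;
    }
}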
@VisibleForTesting
......@@ -171,34 +167,18 @@ public class KafkaNotification extends AbstractNotification implements Service {
public <T> List<NotificationConsumer<T>> createConsumers(NotificationType notificationType,
int numConsumers) {
return createConsumers(notificationType, numConsumers,
Boolean.valueOf(properties.getProperty("auto.commit.enable", "true")));
Boolean.valueOf(properties.getProperty("enable.auto.commit", "true")));
}
@VisibleForTesting
public <T> List<NotificationConsumer<T>> createConsumers(NotificationType notificationType,
int numConsumers, boolean autoCommitEnabled) {
String topic = TOPIC_MAP.get(notificationType);
Properties consumerProperties = getConsumerProperties(notificationType);
List<NotificationConsumer<T>> consumers = new ArrayList<>(numConsumers);
for (int i = 0; i < numConsumers; i++) {
ConsumerConnector consumerConnector = createConsumerConnector(consumerProperties);
Map<String, Integer> topicCountMap = new HashMap<>();
topicCountMap.put(topic, 1);
StringDecoder decoder = new StringDecoder(null);
Map<String, List<KafkaStream<String, String>>> streamsMap =
consumerConnector.createMessageStreams(topicCountMap, decoder, decoder);
List<KafkaStream<String, String>> kafkaConsumers = streamsMap.get(topic);
for (KafkaStream stream : kafkaConsumers) {
KafkaConsumer<T> kafkaConsumer =
createKafkaConsumer(notificationType.getClassType(), notificationType.getDeserializer(),
stream, i, consumerConnector, autoCommitEnabled);
consumers.add(kafkaConsumer);
}
consumerConnectors.add(consumerConnector);
}
List<NotificationConsumer<T>> consumers = new ArrayList<>();
AtlasKafkaConsumer kafkaConsumer = new AtlasKafkaConsumer(notificationType.getDeserializer(),
        getKafkaConsumer(consumerProperties, notificationType, autoCommitEnabled), autoCommitEnabled);
consumers.add(kafkaConsumer);
return consumers;
}
......@@ -208,11 +188,6 @@ public class KafkaNotification extends AbstractNotification implements Service {
producer.close();
producer = null;
}
for (ConsumerConnector consumerConnector : consumerConnectors) {
consumerConnector.shutdown();
}
consumerConnectors.clear();
}
......@@ -254,43 +229,31 @@ public class KafkaNotification extends AbstractNotification implements Service {
}
}
// ----- helper methods --------------------------------------------------
/**
* Create a Kafka consumer connector from the given properties.
*
* @param consumerProperties the properties for creating the consumer connector
*
* @return a new Kafka consumer connector
*/
protected ConsumerConnector createConsumerConnector(Properties consumerProperties) {
return Consumer.createJavaConsumerConnector(new kafka.consumer.ConsumerConfig(consumerProperties));
}
public KafkaConsumer getKafkaConsumer(Properties consumerProperties, NotificationType type, boolean autoCommitEnabled) {
    if (this.consumer == null) {
        try {
            String topic = TOPIC_MAP.get(type);
            consumerProperties.put("enable.auto.commit", autoCommitEnabled);
            this.consumer = new KafkaConsumer(consumerProperties);
            this.consumer.subscribe(Arrays.asList(topic));
        } catch (Exception e) {
            LOG.error("Exception in getKafkaConsumer ", e);
        }
    }
    return this.consumer;
}
/**
 * Create a Kafka consumer from the given Kafka stream.
 *
 * @param type the notification type to be returned by the consumer
 * @param deserializer the deserializer for the created consumers
 * @param stream the Kafka stream
 * @param consumerId the id for the new consumer
 * @param consumerConnector the connector which created the underlying Kafka stream
 *
 * @return a new Kafka consumer
 */
protected <T> org.apache.atlas.kafka.KafkaConsumer<T>
createKafkaConsumer(Class<T> type, MessageDeserializer<T> deserializer, KafkaStream stream,
        int consumerId, ConsumerConnector consumerConnector, boolean autoCommitEnabled) {
    return new org.apache.atlas.kafka.KafkaConsumer<>(deserializer, stream,
            consumerId, consumerConnector, autoCommitEnabled);
}
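getKafkaConsumer() above uses subscribe(), which joins a consumer group and lets the broker assign partitions; the unit tests later in this diff use assign(), which pins explicit partitions and bypasses group management. A side-by-side sketch, with a hypothetical broker address:

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class SubscribeVsAssignSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // hypothetical broker
        props.put("group.id", "atlas");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // group-managed assignment, as in getKafkaConsumer() above
        KafkaConsumer<String, String> groupManaged = new KafkaConsumer<>(props);
        groupManaged.subscribe(Arrays.asList("ATLAS_HOOK"));

        // explicit partition pinning, as in the unit tests below
        KafkaConsumer<String, String> pinned = new KafkaConsumer<>(props);
        pinned.assign(Arrays.asList(new TopicPartition("ATLAS_HOOK", 0)));
    }
}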
// Get properties for consumer request
private Properties getConsumerProperties(NotificationType type) {
// find the configured group id for the given notification type
String groupId = properties.getProperty(type.toString().toLowerCase() + "." + CONSUMER_GROUP_ID_PROPERTY);
if (groupId == null) {
String groupId = properties.getProperty(type.toString().toLowerCase() + "." + CONSUMER_GROUP_ID_PROPERTY);
if (StringUtils.isEmpty(groupId)) {
throw new IllegalStateException("No configuration group id set for the notification type " + type);
}
......@@ -298,7 +261,7 @@ public class KafkaNotification extends AbstractNotification implements Service {
consumerProperties.putAll(properties);
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
LOG.info("Consumer property: auto.commit.enable: {}", consumerProperties.getProperty("auto.commit.enable"));
LOG.info("Consumer property: atlas.kafka.enable.auto.commit: {}", consumerProperties.getProperty("enable.auto.commit"));
return consumerProperties;
}
......
......@@ -128,7 +128,7 @@ public abstract class AbstractMessageDeserializer<T> extends VersionedMessageDes
/**
* Deserializer for JSONArray.
*/
protected static final class JSONArrayDeserializer implements JsonDeserializer<JSONArray> {
public static final class JSONArrayDeserializer implements JsonDeserializer<JSONArray> {
@Override
public JSONArray deserialize(final JsonElement json, final Type type,
final JsonDeserializationContext context) {
......
......@@ -16,6 +16,7 @@
* limitations under the License.
*/
package org.apache.atlas.notification;
import org.apache.kafka.common.TopicPartition;
/**
* Abstract notification consumer.
......@@ -25,10 +26,9 @@ public abstract class AbstractNotificationConsumer<T> implements NotificationCon
/**
* Deserializer used to deserialize notification messages for this consumer.
*/
private final MessageDeserializer<T> deserializer;
protected final MessageDeserializer<T> deserializer;
// ----- Constructors ----------------------------------------------------
/**
* Construct an AbstractNotificationConsumer.
......@@ -40,34 +40,6 @@ public abstract class AbstractNotificationConsumer<T> implements NotificationCon
}
// ----- AbstractNotificationConsumer -------------------------------------
/**
* Get the next notification as a string.
*
* @return the next notification in string form
*/
protected abstract String getNext();
/**
* Get the next notification as a string without advancing.
*
* @return the next notification in string form
*/
protected abstract String peekMessage();
// ----- NotificationConsumer ---------------------------------------------
@Override
public T next() {
return deserializer.deserialize(getNext());
}
@Override
public T peek() {
return deserializer.deserialize(peekMessage());
}
public abstract void commit();
public abstract void commit(TopicPartition partition, long offset);
}
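With getNext()/peekMessage() removed and commit() re-declared with a partition and offset, a concrete consumer now only supplies receive(), commit() and close(). A compressed sketch of the new contract (class and field names are hypothetical; compare TestNotificationConsumer near the end of this diff):

import java.util.Collections;
import java.util.List;
import org.apache.atlas.kafka.AtlasKafkaMessage;
import org.apache.kafka.common.TopicPartition;

class SingleMessageConsumer<T> extends AbstractNotificationConsumer<T> {
    private final String json;

    SingleMessageConsumer(MessageDeserializer<T> deserializer, String json) {
        super(deserializer);
        this.json = json;
    }

    @Override
    public List<AtlasKafkaMessage<T>> receive(long timeoutMilliSeconds) {
        // -1/-1 stand in for offset and partition, as in the test double below
        return Collections.singletonList(new AtlasKafkaMessage<>(deserializer.deserialize(json), -1, -1));
    }

    @Override
    public void commit(TopicPartition partition, long offset) { /* no-op */ }

    @Override
    public void close() { /* no-op */ }
}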
......@@ -17,32 +17,16 @@
*/
package org.apache.atlas.notification;
import java.util.List;
import org.apache.kafka.common.TopicPartition;
import org.apache.atlas.kafka.AtlasKafkaMessage;
/**
* Atlas notification consumer. This consumer blocks until a notification can be read.
*
* @param <T> the class type of notifications returned by this consumer
*/
public interface NotificationConsumer<T> {
/**
* Returns true when the consumer has more notifications. Blocks until a notification becomes available.
*
* @return true when the consumer has notifications to be read
*/
boolean hasNext();
/**
* Returns the next notification.
*
* @return the next notification
*/
T next();
/**
* Returns the next notification without advancing.
*
* @return the next notification
*/
T peek();
/**
* Commit the offset of messages that have been successfully processed.
......@@ -51,7 +35,14 @@ public interface NotificationConsumer<T> {
* the consumer is ready to handle the next message, which could happen even after a normal or an abnormal
* restart.
*/
void commit();
void commit(TopicPartition partition, long offset);
void close();
/**
* Fetch data for the topics from Kafka.
* @param timeoutMilliSeconds poll timeout in milliseconds
* @return a list of messages, each carrying the Kafka message along with its partition id and offset
*/
List<AtlasKafkaMessage<T>> receive(long timeoutMilliSeconds);
}
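Callers written against the removed hasNext()/next() style can be bridged onto the new batch-oriented receive() with a small buffer. A sketch, assuming the caller accepts polling in place of the old blocking semantics:

import java.util.ArrayDeque;
import java.util.Deque;
import org.apache.atlas.kafka.AtlasKafkaMessage;
import org.apache.atlas.notification.NotificationConsumer;

public class IteratorStyleAdapter<T> {
    private final NotificationConsumer<T> consumer;
    private final Deque<AtlasKafkaMessage<T>> buffer = new ArrayDeque<>();

    public IteratorStyleAdapter(NotificationConsumer<T> consumer) {
        this.consumer = consumer;
    }

    // polls until at least one message is buffered, mimicking the old blocking
    // hasNext(); note this spins indefinitely if no message ever arrives
    public boolean hasNext() {
        while (buffer.isEmpty()) {
            buffer.addAll(consumer.receive(1000L));
        }
        return true;
    }

    public AtlasKafkaMessage<T> next() {
        hasNext();
        return buffer.poll();
    }
}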
......@@ -18,13 +18,9 @@
package org.apache.atlas.kafka;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import org.apache.atlas.notification.AbstractNotification;
import org.apache.atlas.notification.MessageVersion;
import org.apache.atlas.notification.NotificationConsumer;
import org.apache.atlas.notification.NotificationInterface;
import org.apache.atlas.notification.IncompatibleVersionException;
import org.apache.atlas.notification.VersionedMessage;
......@@ -33,6 +29,11 @@ import org.apache.atlas.notification.hook.HookNotification;
import org.apache.atlas.typesystem.IStruct;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.Struct;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.codehaus.jettison.json.JSONException;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
......@@ -42,7 +43,10 @@ import org.testng.annotations.Test;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
......@@ -57,8 +61,10 @@ public class KafkaConsumerTest {
private static final String TRAIT_NAME = "MyTrait";
@Mock
private ConsumerConnector consumerConnector;
private KafkaConsumer kafkaConsumer;
@BeforeMethod
public void setup() {
......@@ -66,9 +72,9 @@ public class KafkaConsumerTest {
}
@Test
public void testNext() throws Exception {
KafkaStream<String, String> stream = mock(KafkaStream.class);
ConsumerIterator<String, String> iterator = mock(ConsumerIterator.class);
public void testReceive() throws Exception {
MessageAndMetadata<String, String> messageAndMetadata = mock(MessageAndMetadata.class);
Referenceable entity = getEntity(TRAIT_NAME);
......@@ -78,29 +84,34 @@ public class KafkaConsumerTest {
String json = AbstractNotification.GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), message));
when(stream.iterator()).thenReturn(iterator);
when(iterator.hasNext()).thenReturn(true).thenReturn(false);
when(iterator.next()).thenReturn(messageAndMetadata).thenThrow(new NoSuchElementException());
kafkaConsumer.assign(Arrays.asList(new TopicPartition("ATLAS_HOOK", 0)));
List<ConsumerRecord> klist = new ArrayList<>();
klist.add(new ConsumerRecord<String, String>("ATLAS_HOOK",
0, 0L, "mykey", json));
TopicPartition tp = new TopicPartition("ATLAS_HOOK",0);
Map mp = new HashMap();
mp.put(tp,klist);
ConsumerRecords records = new ConsumerRecords(mp);
when(kafkaConsumer.poll(1000)).thenReturn(records);
when(messageAndMetadata.message()).thenReturn(json);
NotificationConsumer<HookNotification.HookNotificationMessage> consumer =
new KafkaConsumer<>(
NotificationInterface.NotificationType.HOOK.getDeserializer(), stream, 99,
consumerConnector, false);
assertTrue(consumer.hasNext());
AtlasKafkaConsumer consumer = new AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK.getDeserializer(), kafkaConsumer, false);
List<AtlasKafkaMessage<HookNotification.HookNotificationMessage>> messageList = consumer.receive(1000);
assertTrue(messageList.size() > 0);
HookNotification.HookNotificationMessage consumedMessage = consumer.next();
HookNotification.HookNotificationMessage consumedMessage = messageList.get(0).getMessage();
assertMessagesEqual(message, consumedMessage, entity);
assertFalse(consumer.hasNext());
}
@Test
public void testNextVersionMismatch() throws Exception {
KafkaStream<String, String> stream = mock(KafkaStream.class);
ConsumerIterator<String, String> iterator = mock(ConsumerIterator.class);
MessageAndMetadata<String, String> messageAndMetadata = mock(MessageAndMetadata.class);
Referenceable entity = getEntity(TRAIT_NAME);
......@@ -110,84 +121,56 @@ public class KafkaConsumerTest {
String json = AbstractNotification.GSON.toJson(new VersionedMessage<>(new MessageVersion("2.0.0"), message));
when(stream.iterator()).thenReturn(iterator);
when(iterator.hasNext()).thenReturn(true).thenReturn(false);
when(iterator.next()).thenReturn(messageAndMetadata).thenThrow(new NoSuchElementException());
when(messageAndMetadata.message()).thenReturn(json);
kafkaConsumer.assign(Arrays.asList(new TopicPartition("ATLAS_HOOK", 0)));
List<ConsumerRecord> klist = new ArrayList<>();
klist.add(new ConsumerRecord<String, String>("ATLAS_HOOK",
0, 0L, "mykey", json));
NotificationConsumer<HookNotification.HookNotificationMessage> consumer =
new KafkaConsumer<>(
NotificationInterface.NotificationType.HOOK.getDeserializer(), stream, 99,
consumerConnector, false);
TopicPartition tp = new TopicPartition("ATLAS_HOOK",0);
Map mp = new HashMap();
mp.put(tp,klist);
ConsumerRecords records = new ConsumerRecords(mp);
assertTrue(consumer.hasNext());
when(kafkaConsumer.poll(1000)).thenReturn(records);
when(messageAndMetadata.message()).thenReturn(json);
AtlasKafkaConsumer consumer = new AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK.getDeserializer(), kafkaConsumer, false);
try {
consumer.next();
List<AtlasKafkaMessage<HookNotification.HookNotificationMessage>> messageList = consumer.receive(1000);
assertTrue(messageList.size() > 0);
HookNotification.HookNotificationMessage consumedMessage = messageList.get(0).getMessage();
fail("Expected VersionMismatchException!");
} catch (IncompatibleVersionException e) {
e.printStackTrace();
}
assertFalse(consumer.hasNext());
}
@Test
public void testPeekMessage() throws Exception {
    KafkaStream<String, String> stream = mock(KafkaStream.class);
    ConsumerIterator<String, String> iterator = mock(ConsumerIterator.class);
    MessageAndMetadata<String, String> messageAndMetadata = mock(MessageAndMetadata.class);
    Referenceable entity = getEntity(TRAIT_NAME);
    HookNotification.EntityUpdateRequest message =
            new HookNotification.EntityUpdateRequest("user1", entity);
    String json = AbstractNotification.GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), message));
    when(stream.iterator()).thenReturn(iterator);
    when(iterator.hasNext()).thenReturn(true);
    when(iterator.peek()).thenReturn(messageAndMetadata);
    when(messageAndMetadata.message()).thenReturn(json);
    NotificationConsumer<HookNotification.HookNotificationMessage> consumer =
            new KafkaConsumer<>(
                    NotificationInterface.NotificationType.HOOK.getDeserializer(), stream, 99,
                    consumerConnector, false);
    assertTrue(consumer.hasNext());
    HookNotification.HookNotificationMessage consumedMessage = consumer.peek();
    assertMessagesEqual(message, consumedMessage, entity);
    assertTrue(consumer.hasNext());
}
@Test
public void testCommitIsCalledIfAutoCommitDisabled() {
    KafkaStream<String, String> stream = mock(KafkaStream.class);
    NotificationConsumer<HookNotification.HookNotificationMessage> consumer =
            new KafkaConsumer<>(
                    NotificationInterface.NotificationType.HOOK.getDeserializer(), stream, 99,
                    consumerConnector, false);
    consumer.commit();
    verify(consumerConnector).commitOffsets();
}
@Test
public void testCommitIsCalledIfAutoCommitDisabled() {
    TopicPartition tp = new TopicPartition("ATLAS_HOOK", 0);
    AtlasKafkaConsumer consumer = new AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK.getDeserializer(), kafkaConsumer, false);
    consumer.commit(tp, 1);
    verify(kafkaConsumer).commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(1)));
}
@Test
public void testCommitIsNotCalledIfAutoCommitEnabled() {
    KafkaStream<String, String> stream = mock(KafkaStream.class);
    NotificationConsumer<HookNotification.HookNotificationMessage> consumer =
            new KafkaConsumer<>(
                    NotificationInterface.NotificationType.HOOK.getDeserializer(), stream, 99,
                    consumerConnector, true);
    consumer.commit();
    verify(consumerConnector, never()).commitOffsets();
}
@Test
public void testCommitIsNotCalledIfAutoCommitEnabled() {
    TopicPartition tp = new TopicPartition("ATLAS_HOOK", 0);
    AtlasKafkaConsumer consumer = new AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK.getDeserializer(), kafkaConsumer, true);
    consumer.commit(tp, 1);
    verify(kafkaConsumer, never()).commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(1)));
}
private Referenceable getEntity(String traitName) {
......
......@@ -24,12 +24,13 @@ import org.apache.atlas.notification.MessageDeserializer;
import org.apache.atlas.notification.NotificationConsumer;
import org.apache.atlas.notification.NotificationException;
import org.apache.atlas.notification.NotificationInterface;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.testng.annotations.Test;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
......@@ -37,7 +38,7 @@ import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.atlas.kafka.AtlasKafkaConsumer;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
......@@ -55,36 +56,24 @@ public class KafkaNotificationMockTest {
public void testCreateConsumers() throws Exception {
Properties properties = mock(Properties.class);
when(properties.getProperty("entities.group.id")).thenReturn("atlas");
final ConsumerConnector consumerConnector = mock(ConsumerConnector.class);
Map<String, Integer> topicCountMap = new HashMap<>();
topicCountMap.put(KafkaNotification.ATLAS_ENTITIES_TOPIC, 1);
Map<String, List<KafkaStream<String, String>>> kafkaStreamsMap =
new HashMap<>();
List<KafkaStream<String, String>> kafkaStreams = new ArrayList<>();
KafkaStream kafkaStream = mock(KafkaStream.class);
kafkaStreams.add(kafkaStream);
kafkaStreamsMap.put(KafkaNotification.ATLAS_ENTITIES_TOPIC, kafkaStreams);
when(consumerConnector.createMessageStreams(
eq(topicCountMap), any(StringDecoder.class), any(StringDecoder.class))).thenReturn(kafkaStreamsMap);
final KafkaConsumer consumer1 = mock(KafkaConsumer.class);
final KafkaConsumer consumer2 = mock(KafkaConsumer.class);
final AtlasKafkaConsumer consumer1 = mock(AtlasKafkaConsumer.class);
final AtlasKafkaConsumer consumer2 = mock(AtlasKafkaConsumer.class);
KafkaNotification kafkaNotification =
new TestKafkaNotification(properties, consumerConnector, consumer1, consumer2);
new TestKafkaNotification(properties, consumer1, consumer2);
List<NotificationConsumer<String>> consumers =
List<NotificationConsumer<AtlasKafkaConsumer>> consumers =
kafkaNotification.createConsumers(NotificationInterface.NotificationType.ENTITIES, 2);
verify(consumerConnector, times(2)).createMessageStreams(
eq(topicCountMap), any(StringDecoder.class), any(StringDecoder.class));
assertEquals(consumers.size(), 2);
assertTrue(consumers.contains(consumer1));
assertTrue(consumers.contains(consumer2));
}
@Test
@SuppressWarnings("unchecked")
public void shouldSendMessagesSuccessfully() throws NotificationException,
......@@ -164,27 +153,28 @@ public class KafkaNotificationMockTest {
class TestKafkaNotification extends KafkaNotification {
private final ConsumerConnector consumerConnector;
private final KafkaConsumer consumer1;
private final KafkaConsumer consumer2;
private final AtlasKafkaConsumer consumer1;
private final AtlasKafkaConsumer consumer2;
TestKafkaNotification(Properties properties, ConsumerConnector consumerConnector,
KafkaConsumer consumer1, KafkaConsumer consumer2) {
TestKafkaNotification(Properties properties,
AtlasKafkaConsumer consumer1, AtlasKafkaConsumer consumer2) {
super(properties);
this.consumerConnector = consumerConnector;
this.consumer1 = consumer1;
this.consumer2 = consumer2;
}
@Override
protected ConsumerConnector createConsumerConnector(Properties consumerProperties) {
return consumerConnector;
public <T> List<NotificationConsumer<T>> createConsumers(NotificationType notificationType,
int numConsumers) {
List<NotificationConsumer<T>> consumerList = new ArrayList<>();
consumerList.add(consumer1);
consumerList.add(consumer2);
return consumerList;
}
@Override
protected <T> org.apache.atlas.kafka.KafkaConsumer<T>
createKafkaConsumer(Class<T> type, MessageDeserializer<T> deserializer, KafkaStream stream,
int consumerId, ConsumerConnector connector, boolean autoCommitEnabled) {
protected <T> AtlasKafkaConsumer<T>
createConsumers(Class<T> type, int consumerId, boolean autoCommitEnabled) {
if (consumerId == 0) {
return consumer1;
} else if (consumerId == 1) {
......
......@@ -28,6 +28,9 @@ import org.apache.commons.lang.RandomStringUtils;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import static org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage;
import java.util.List;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
......@@ -52,7 +55,7 @@ public class KafkaNotificationTest {
}
@Test
public void testNext() throws Exception {
public void testReceiveKafkaMessages() throws Exception {
kafkaNotification.send(NotificationInterface.NotificationType.HOOK,
new HookNotification.EntityCreateRequest("u1", new Referenceable("type")));
kafkaNotification.send(NotificationInterface.NotificationType.HOOK,
......@@ -64,44 +67,21 @@ public class KafkaNotificationTest {
NotificationConsumer<Object> consumer =
kafkaNotification.createConsumers(NotificationInterface.NotificationType.HOOK, 1).get(0);
assertTrue(consumer.hasNext());
HookNotification.HookNotificationMessage message = (HookNotification.HookNotificationMessage) consumer.next();
assertEquals(message.getUser(), "u1");
assertTrue(consumer.hasNext());
message = (HookNotification.HookNotificationMessage) consumer.next();
assertEquals(message.getUser(), "u2");
consumer.close();
//nothing committed(even though u1 and u2 are read), now should restart from u1
consumer = kafkaNotification.createConsumers(NotificationInterface.NotificationType.HOOK, 1).get(0);
assertTrue(consumer.hasNext());
message = (HookNotification.HookNotificationMessage) consumer.next();
assertEquals(message.getUser(), "u1");
consumer.commit();
assertTrue(consumer.hasNext());
message = (HookNotification.HookNotificationMessage) consumer.next();
assertEquals(message.getUser(), "u2");
consumer.close();
//u1 committed, u2 read, should start from u2
consumer = kafkaNotification.createConsumers(NotificationInterface.NotificationType.HOOK, 1).get(0);
assertTrue(consumer.hasNext());
message = (HookNotification.HookNotificationMessage) consumer.next();
assertEquals(message.getUser(), "u2");
assertTrue(consumer.hasNext());
message = (HookNotification.HookNotificationMessage) consumer.next();
assertEquals(message.getUser(), "u3");
consumer.commit();
consumer.close();
List<AtlasKafkaMessage<Object>> messages = null;
long startTime = System.currentTimeMillis(); // fetch starting time
while ((System.currentTimeMillis() - startTime) < 10000) {
messages = consumer.receive(1000L);
if (messages.size() > 0) {
break;
}
}
int i = 1;
for (AtlasKafkaMessage<Object> msg : messages) {
HookNotification.HookNotificationMessage message = (HookNotificationMessage) msg.getMessage();
assertEquals(message.getUser(), "u" + i++);
}
//u2, u3 read, but only u3 committed, should start from u4
consumer = kafkaNotification.createConsumers(NotificationInterface.NotificationType.HOOK, 1).get(0);
assertTrue(consumer.hasNext());
message = (HookNotification.HookNotificationMessage) consumer.next();
assertEquals(message.getUser(), "u4");
consumer.close();
}
}
......@@ -20,10 +20,12 @@ package org.apache.atlas.notification;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import org.apache.atlas.kafka.AtlasKafkaMessage;
import org.slf4j.Logger;
import org.testng.annotations.Test;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
......@@ -35,6 +37,7 @@ import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import org.apache.kafka.common.TopicPartition;
/**
* AbstractNotificationConsumer tests.
......@@ -44,7 +47,7 @@ public class AbstractNotificationConsumerTest {
private static final Gson GSON = new Gson();
@Test
public void testNext() throws Exception {
public void testReceive() throws Exception {
Logger logger = mock(Logger.class);
TestMessage testMessage1 = new TestMessage("sValue1", 99);
......@@ -52,7 +55,7 @@ public class AbstractNotificationConsumerTest {
TestMessage testMessage3 = new TestMessage("sValue3", 97);
TestMessage testMessage4 = new TestMessage("sValue4", 96);
List<String> jsonList = new LinkedList<>();
List jsonList = new LinkedList<>();
jsonList.add(GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage1)));
jsonList.add(GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage2)));
......@@ -62,25 +65,19 @@ public class AbstractNotificationConsumerTest {
Type versionedMessageType = new TypeToken<VersionedMessage<TestMessage>>(){}.getType();
NotificationConsumer<TestMessage> consumer =
new TestNotificationConsumer<>(versionedMessageType, jsonList, logger);
assertTrue(consumer.hasNext());
assertEquals(testMessage1, consumer.next());
new TestNotificationConsumer<>(versionedMessageType, jsonList, logger);
assertTrue(consumer.hasNext());
List<AtlasKafkaMessage<TestMessage>> messageList = consumer.receive(1000L);
assertEquals(testMessage2, consumer.next());
assertFalse(messageList.isEmpty());
assertTrue(consumer.hasNext());
assertEquals(testMessage1, messageList.get(0).getMessage());
assertEquals(testMessage3, consumer.next());
assertEquals(testMessage2, messageList.get(1).getMessage());
assertTrue(consumer.hasNext());
assertEquals(testMessage3, messageList.get(2).getMessage());
assertEquals(testMessage4, consumer.next());
assertFalse(consumer.hasNext());
assertEquals(testMessage4, messageList.get(3).getMessage());
}
@Test
......@@ -92,7 +89,7 @@ public class AbstractNotificationConsumerTest {
TestMessage testMessage3 = new TestMessage("sValue3", 97);
TestMessage testMessage4 = new TestMessage("sValue4", 96);
List<String> jsonList = new LinkedList<>();
List jsonList = new LinkedList<>();
String json1 = GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage1));
String json2 = GSON.toJson(new VersionedMessage<>(new MessageVersion("0.0.5"), testMessage2));
......@@ -108,26 +105,17 @@ public class AbstractNotificationConsumerTest {
NotificationConsumer<TestMessage> consumer =
new TestNotificationConsumer<>(versionedMessageType, jsonList, logger);
assertTrue(consumer.hasNext());
assertEquals(new TestMessage("sValue1", 99), consumer.next());
assertTrue(consumer.hasNext());
assertEquals(new TestMessage("sValue2", 98), consumer.next());
verify(logger).info(endsWith(json2));
List<AtlasKafkaMessage<TestMessage>> messageList = consumer.receive(1000L);
assertTrue(consumer.hasNext());
assertEquals(new TestMessage("sValue1", 99), messageList.get(0).getMessage());
assertEquals(new TestMessage("sValue3", 97), consumer.next());
verify(logger).info(endsWith(json3));
assertEquals(new TestMessage("sValue2", 98), messageList.get(1).getMessage());
assertTrue(consumer.hasNext());
assertEquals(new TestMessage("sValue3", 97), messageList.get(2).getMessage());
assertEquals(new TestMessage("sValue4", 96), consumer.next());
verify(logger).info(endsWith(json4));
assertEquals(new TestMessage("sValue4", 96), messageList.get(3).getMessage());
assertFalse(consumer.hasNext());
}
@Test
......@@ -137,7 +125,7 @@ public class AbstractNotificationConsumerTest {
TestMessage testMessage1 = new TestMessage("sValue1", 99);
TestMessage testMessage2 = new TestMessage("sValue2", 98);
List<String> jsonList = new LinkedList<>();
List jsonList = new LinkedList<>();
String json1 = GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage1));
String json2 = GSON.toJson(new VersionedMessage<>(new MessageVersion("2.0.0"), testMessage2));
......@@ -149,52 +137,19 @@ public class AbstractNotificationConsumerTest {
NotificationConsumer<TestMessage> consumer =
new TestNotificationConsumer<>(versionedMessageType, jsonList, logger);
assertTrue(consumer.hasNext());
assertEquals(testMessage1, consumer.next());
try {
List<AtlasKafkaMessage<TestMessage>> messageList = consumer.receive(1000L);
assertTrue(consumer.hasNext());
messageList.get(1).getMessage();
try {
consumer.next();
fail("Expected VersionMismatchException!");
} catch (IncompatibleVersionException e) {
verify(logger).error(endsWith(json2));
}
assertFalse(consumer.hasNext());
}
@Test
public void testPeek() throws Exception {
Logger logger = mock(Logger.class);
TestMessage testMessage1 = new TestMessage("sValue1", 99);
TestMessage testMessage2 = new TestMessage("sValue2", 98);
TestMessage testMessage3 = new TestMessage("sValue3", 97);
TestMessage testMessage4 = new TestMessage("sValue4", 96);
List<String> jsonList = new LinkedList<>();
jsonList.add(GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage1)));
jsonList.add(GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage2)));
jsonList.add(GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage3)));
jsonList.add(GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage4)));
Type versionedMessageType = new TypeToken<VersionedMessage<TestMessage>>(){}.getType();
NotificationConsumer<TestMessage> consumer =
new TestNotificationConsumer<>(versionedMessageType, jsonList, logger);
assertTrue(consumer.hasNext());
assertEquals(testMessage1, consumer.peek());
assertTrue(consumer.hasNext());
assertEquals(testMessage1, consumer.peek());
assertTrue(consumer.hasNext());
}
private static class TestMessage {
private String s;
......@@ -229,31 +184,16 @@ public class AbstractNotificationConsumerTest {
}
private static class TestNotificationConsumer<T> extends AbstractNotificationConsumer<T> {
private final List<String> messageList;
private final List<T> messageList;
private int index = 0;
public TestNotificationConsumer(Type versionedMessageType, List<String> messages, Logger logger) {
public TestNotificationConsumer(Type versionedMessageType, List<T> messages, Logger logger) {
super(new TestDeserializer<T>(versionedMessageType, logger));
this.messageList = messages;
}
@Override
protected String getNext() {
return messageList.get(index++);
}
@Override
protected String peekMessage() {
return messageList.get(index);
}
@Override
public boolean hasNext() {
return index < messageList.size();
}
@Override
public void commit() {
public void commit(TopicPartition partition, long offset) {
// do nothing.
}
......@@ -261,6 +201,15 @@ public class AbstractNotificationConsumerTest {
public void close() {
//do nothing
}
@Override
public List<AtlasKafkaMessage<T>> receive(long timeoutMilliSeconds) {
List<AtlasKafkaMessage<T>> tempMessageList = new ArrayList<>();
for (Object json : messageList) {
tempMessageList.add(new AtlasKafkaMessage<>(deserializer.deserialize((String) json), -1, -1));
}
return tempMessageList;
}
}
private static final class TestDeserializer<T> extends VersionedMessageDeserializer<T> {
......
......@@ -91,7 +91,13 @@ atlas.kafka.consumer.timeout.ms=4000
atlas.kafka.auto.commit.interval.ms=100
atlas.kafka.hook.group.id=atlas
atlas.kafka.entities.group.id=atlas_entities
atlas.kafka.auto.commit.enable=false
#atlas.kafka.auto.commit.enable=false
atlas.kafka.enable.auto.commit=false
atlas.kafka.auto.offset.reset=earliest
atlas.kafka.session.timeout.ms=30000
######### Entity Audit Configs #########
atlas.audit.hbase.tablename=ATLAS_ENTITY_AUDIT_EVENTS
......
......@@ -19,16 +19,15 @@ package org.apache.atlas.notification;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import kafka.consumer.ConsumerTimeoutException;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasException;
import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.RequestContext;
import org.apache.atlas.RequestContextV1;
import org.apache.atlas.ha.HAConfiguration;
import org.apache.atlas.kafka.AtlasKafkaMessage;
import org.apache.atlas.listener.ActiveStateChangeHandler;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.notification.hook.HookNotification;
import org.apache.atlas.notification.hook.HookNotification.EntityPartialUpdateRequest;
import org.apache.atlas.repository.converters.AtlasInstanceConverter;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
......@@ -46,7 +45,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import org.apache.kafka.common.TopicPartition;
import javax.inject.Inject;
import java.util.ArrayList;
import java.util.Date;
......@@ -135,14 +134,14 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
private void startConsumers(ExecutorService executorService) {
int numThreads = applicationProperties.getInt(CONSUMER_THREADS_PROPERTY, 1);
List<NotificationConsumer<HookNotification.HookNotificationMessage>> notificationConsumers =
List<NotificationConsumer<HookNotificationMessage>> notificationConsumers =
notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, numThreads);
if (executorService == null) {
executorService = Executors.newFixedThreadPool(notificationConsumers.size(),
new ThreadFactoryBuilder().setNameFormat(THREADNAME_PREFIX + " thread-%d").build());
}
executors = executorService;
for (final NotificationConsumer<HookNotification.HookNotificationMessage> consumer : notificationConsumers) {
for (final NotificationConsumer<HookNotificationMessage> consumer : notificationConsumers) {
HookConsumer hookConsumer = new HookConsumer(consumer);
consumers.add(hookConsumer);
executors.submit(hookConsumer);
......@@ -207,21 +206,14 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
}
class HookConsumer implements Runnable {
private final NotificationConsumer<HookNotification.HookNotificationMessage> consumer;
private final NotificationConsumer<HookNotificationMessage> consumer;
private final AtomicBoolean shouldRun = new AtomicBoolean(false);
private List<HookNotification.HookNotificationMessage> failedMessages = new ArrayList<>();
private List<HookNotificationMessage> failedMessages = new ArrayList<>();
public HookConsumer(NotificationConsumer<HookNotification.HookNotificationMessage> consumer) {
public HookConsumer(NotificationConsumer<HookNotificationMessage> consumer) {
this.consumer = consumer;
}
private boolean hasNext() {
try {
return consumer.hasNext();
} catch (ConsumerTimeoutException e) {
return false;
}
}
@Override
public void run() {
......@@ -233,8 +225,9 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
while (shouldRun.get()) {
try {
if (hasNext()) {
handleMessage(consumer.next());
List<AtlasKafkaMessage<HookNotificationMessage>> messages = consumer.receive(1000L);
for (AtlasKafkaMessage<HookNotificationMessage> msg : messages) {
handleMessage(msg);
}
} catch (Throwable t) {
LOG.warn("Failure in NotificationHookConsumer", t);
......@@ -243,7 +236,8 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
}
@VisibleForTesting
void handleMessage(HookNotificationMessage message) throws AtlasServiceException, AtlasException {
void handleMessage(AtlasKafkaMessage<HookNotificationMessage> kafkaMsg) throws AtlasServiceException, AtlasException {
HookNotificationMessage message = kafkaMsg.getMessage();
String messageUser = message.getUser();
// Used for intermediate conversions during create and update
AtlasEntity.AtlasEntitiesWithExtInfo entities;
......@@ -345,7 +339,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
RequestContextV1.clear();
}
}
commit();
commit(kafkaMsg);
}
private void recordFailedMessages() {
......@@ -356,9 +350,10 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
failedMessages.clear();
}
private void commit() {
private void commit(AtlasKafkaMessage<HookNotificationMessage> kafkaMessage) {
recordFailedMessages();
consumer.commit();
TopicPartition partition = new TopicPartition("ATLAS_HOOK", kafkaMessage.getPartition());
consumer.commit(partition, kafkaMessage.getOffset());
}
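Two things worth noting here: the topic is hardcoded to ATLAS_HOOK, and KafkaConsumer.commitSync treats a committed offset as the position of the next record to read, so committing getOffset() as above will re-deliver the just-processed message after a restart; consumers must therefore tolerate at-least-once delivery. A sketch of the next-offset convention (the helper name is hypothetical, and whether Atlas should add the +1 here is a design question outside this commit):

import org.apache.atlas.notification.NotificationConsumer;
import org.apache.kafka.common.TopicPartition;

public class CommitOffsetConventionSketch {
    // processedOffset is the offset of the record just handled
    public static void commitAfterProcessing(NotificationConsumer<?> consumer,
                                             String topic, int partition, long processedOffset) {
        // per the KafkaConsumer.commitSync contract, commit the offset of the
        // next record to read; committing processedOffset itself would replay it
        consumer.commit(new TopicPartition(topic, partition), processedOffset + 1);
    }
}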
boolean serverAvailable(Timer timer) {
......
......@@ -34,8 +34,6 @@ import org.apache.atlas.typesystem.types.TraitType;
import org.apache.atlas.typesystem.types.utils.TypesUtil;
import org.apache.atlas.web.integration.BaseResourceIT;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
......@@ -55,7 +53,7 @@ public class EntityNotificationIT extends BaseResourceIT {
private Id tableId;
private Id dbId;
private String traitName;
private NotificationConsumer<EntityNotification> notificationConsumer;
private NotificationConsumer notificationConsumer;
@BeforeClass
public void setUp() throws Exception {
......@@ -64,13 +62,9 @@ public class EntityNotificationIT extends BaseResourceIT {
Referenceable HiveDBInstance = createHiveDBInstanceBuiltIn(DATABASE_NAME);
dbId = createInstance(HiveDBInstance);
List<NotificationConsumer<EntityNotification>> consumers =
notificationInterface.createConsumers(NotificationInterface.NotificationType.ENTITIES, 1);
notificationConsumer = consumers.iterator().next();
notificationConsumer = notificationInterface.createConsumers(NotificationInterface.NotificationType.ENTITIES, 1).get(0);
}
@Test
public void testCreateEntity() throws Exception {
Referenceable tableInstance = createHiveTableInstanceBuiltIn(DATABASE_NAME, TABLE_NAME, dbId);
tableId = createInstance(tableInstance);
......@@ -81,7 +75,6 @@ public class EntityNotificationIT extends BaseResourceIT {
newNotificationPredicate(EntityNotification.OperationType.ENTITY_CREATE, HIVE_TABLE_TYPE_BUILTIN, guid));
}
@Test(dependsOnMethods = "testCreateEntity")
public void testUpdateEntity() throws Exception {
final String property = "description";
final String newValue = "New description!";
......@@ -94,7 +87,6 @@ public class EntityNotificationIT extends BaseResourceIT {
newNotificationPredicate(EntityNotification.OperationType.ENTITY_UPDATE, HIVE_TABLE_TYPE_BUILTIN, guid));
}
@Test
public void testDeleteEntity() throws Exception {
final String tableName = "table-" + randomString();
final String dbName = "db-" + randomString();
......@@ -116,7 +108,6 @@ public class EntityNotificationIT extends BaseResourceIT {
newNotificationPredicate(EntityNotification.OperationType.ENTITY_DELETE, HIVE_TABLE_TYPE_BUILTIN, guid));
}
@Test(dependsOnMethods = "testCreateEntity")
public void testAddTrait() throws Exception {
String superSuperTraitName = "SuperTrait" + randomString();
createTrait(superSuperTraitName);
......@@ -175,7 +166,6 @@ public class EntityNotificationIT extends BaseResourceIT {
assertEquals(2, Collections.frequency(allTraitNames, superTraitName));
}
@Test(dependsOnMethods = "testAddTrait")
public void testDeleteTrait() throws Exception {
final String guid = tableId._getId();
......
......@@ -22,6 +22,7 @@ import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasException;
import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.kafka.AtlasKafkaMessage;
import org.apache.atlas.kafka.KafkaNotification;
import org.apache.atlas.kafka.NotificationProvider;
import org.apache.atlas.model.instance.AtlasEntity;
......@@ -40,12 +41,21 @@ import org.testng.Assert;
import org.testng.annotations.AfterTest;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;
import static org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage;
import java.util.List;
import org.apache.atlas.kafka.AtlasKafkaConsumer;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.*;
import org.apache.commons.configuration.Configuration;
import org.apache.atlas.ApplicationProperties;
import static org.testng.Assert.*;
public class NotificationHookConsumerKafkaTest {
public static final String NAME = "name";
......@@ -80,6 +90,7 @@ public class NotificationHookConsumerKafkaTest {
@AfterTest
public void shutdown() {
kafkaNotification.close();
kafkaNotification.stop();
}
......@@ -87,21 +98,19 @@ public class NotificationHookConsumerKafkaTest {
public void testConsumerConsumesNewMessageWithAutoCommitDisabled() throws AtlasException, InterruptedException, AtlasBaseException {
try {
produceMessage(new HookNotification.EntityCreateRequest("test_user1", createEntity()));
NotificationConsumer<HookNotification.HookNotificationMessage> consumer =
createNewConsumer(kafkaNotification, false);
NotificationConsumer<HookNotificationMessage> consumer = createNewConsumer(kafkaNotification, false);
NotificationHookConsumer notificationHookConsumer =
new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
NotificationHookConsumer.HookConsumer hookConsumer =
notificationHookConsumer.new HookConsumer(consumer);
NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer);
consumeOneMessage(consumer, hookConsumer);
verify(atlasEntityStore).createOrUpdate(any(EntityStream.class), anyBoolean());
// produce another message, and make sure it moves ahead. If commit succeeded, this would work.
produceMessage(new HookNotification.EntityCreateRequest("test_user2", createEntity()));
consumeOneMessage(consumer, hookConsumer);
verify(atlasEntityStore, times(2)).createOrUpdate(any(EntityStream.class), anyBoolean());
verify(atlasEntityStore,times(2)).createOrUpdate(any(EntityStream.class), anyBoolean());
reset(atlasEntityStore);
}
finally {
......@@ -113,42 +122,49 @@ public class NotificationHookConsumerKafkaTest {
public void testConsumerRemainsAtSameMessageWithAutoCommitEnabled() throws Exception {
try {
produceMessage(new HookNotification.EntityCreateRequest("test_user3", createEntity()));
NotificationConsumer<HookNotification.HookNotificationMessage> consumer =
createNewConsumer(kafkaNotification, true);
NotificationConsumer<HookNotificationMessage> consumer = createNewConsumer(kafkaNotification, true);
assertNotNull(consumer);
NotificationHookConsumer notificationHookConsumer =
new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
NotificationHookConsumer.HookConsumer hookConsumer =
notificationHookConsumer.new HookConsumer(consumer);
NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer);
consumeOneMessage(consumer, hookConsumer);
verify(atlasEntityStore).createOrUpdate(any(EntityStream.class), anyBoolean());
// produce another message, but this will not be consumed, as commit code is not executed in hook consumer.
produceMessage(new HookNotification.EntityCreateRequest("test_user4", createEntity()));
consumeOneMessage(consumer, hookConsumer);
verify(atlasEntityStore, times(2)).createOrUpdate(any(EntityStream.class), anyBoolean());
verify(atlasEntityStore,times(2)).createOrUpdate(any(EntityStream.class), anyBoolean());
}
finally {
kafkaNotification.close();
}
}
NotificationConsumer<HookNotification.HookNotificationMessage> createNewConsumer(
KafkaNotification kafkaNotification, boolean autoCommitEnabled) {
return kafkaNotification.<HookNotification.HookNotificationMessage>createConsumers(
NotificationInterface.NotificationType.HOOK, 1, autoCommitEnabled).get(0);
AtlasKafkaConsumer<HookNotificationMessage> createNewConsumer(KafkaNotification kafkaNotification, boolean autoCommitEnabled) {
return (AtlasKafkaConsumer) kafkaNotification.createConsumers(NotificationInterface.NotificationType.HOOK, 1, autoCommitEnabled).get(0);
}
void consumeOneMessage(NotificationConsumer<HookNotification.HookNotificationMessage> consumer,
void consumeOneMessage(NotificationConsumer<HookNotificationMessage> consumer,
NotificationHookConsumer.HookConsumer hookConsumer) throws InterruptedException {
while (!consumer.hasNext()) {
Thread.sleep(1000);
}
try {
hookConsumer.handleMessage(consumer.next());
long startTime = System.currentTimeMillis(); //fetch starting time
while ((System.currentTimeMillis() - startTime) < 10000) {
List<AtlasKafkaMessage<HookNotificationMessage>> messages = consumer.receive(1000L);
for (AtlasKafkaMessage<HookNotificationMessage> msg : messages) {
hookConsumer.handleMessage(msg);
}
if (messages.size() > 0) {
break;
}
}
} catch (AtlasServiceException | AtlasException e) {
Assert.fail("Consumer failed with exception ", e);
}
......@@ -163,7 +179,10 @@ public class NotificationHookConsumerKafkaTest {
}
KafkaNotification startKafkaServer() throws AtlasException, InterruptedException {
KafkaNotification kafkaNotification = (KafkaNotification) notificationInterface;
Configuration applicationProperties = ApplicationProperties.get();
applicationProperties.setProperty("atlas.kafka.data", "target/" + RandomStringUtils.randomAlphanumeric(5)); // random data dir, so each run starts against a clean embedded broker
kafkaNotification = new KafkaNotification(applicationProperties);
kafkaNotification.start();
Thread.sleep(2000);
return kafkaNotification;
......@@ -173,8 +192,8 @@ public class NotificationHookConsumerKafkaTest {
return RandomStringUtils.randomAlphanumeric(10);
}
private void produceMessage(HookNotification.HookNotificationMessage message) throws NotificationException {
notificationInterface.send(NotificationInterface.NotificationType.HOOK, message);
private void produceMessage(HookNotificationMessage message) throws NotificationException {
kafkaNotification.send(NotificationInterface.NotificationType.HOOK, message);
}
}
......@@ -21,6 +21,7 @@ import org.apache.atlas.AtlasException;
import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.ha.HAConfiguration;
import org.apache.atlas.kafka.AtlasKafkaMessage;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.notification.hook.HookNotification;
......@@ -36,7 +37,7 @@ import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import org.apache.kafka.common.TopicPartition;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
......@@ -124,9 +125,8 @@ public class NotificationHookConsumerTest {
Referenceable mock = mock(Referenceable.class);
when(message.getEntities()).thenReturn(Arrays.asList(mock));
hookConsumer.handleMessage(message);
verify(consumer).commit();
hookConsumer.handleMessage(new AtlasKafkaMessage(message, -1, -1));
verify(consumer).commit(any(TopicPartition.class), anyLong());
}
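The verification above reflects the new commit signature: the handler now receives an AtlasKafkaMessage carrying the offset and partition along with the payload, so it can tell the consumer exactly which TopicPartition to commit. A sketch of that flow (the ATLAS_HOOK topic name and the getter names are assumptions, mirroring the (message, offset, partition) constructor used in this test):

// Sketch: derive the TopicPartition/offset pair for commit() from the
// AtlasKafkaMessage wrapper. Getter names are assumed, not taken from the patch.
void handleMessage(AtlasKafkaMessage<HookNotificationMessage> kafkaMessage) throws AtlasServiceException, AtlasException {
    HookNotificationMessage message = kafkaMessage.getMessage();
    // ... process the notification, e.g. atlasEntityStore.createOrUpdate(...) ...
    consumer.commit(new TopicPartition("ATLAS_HOOK", kafkaMessage.getPartition()), // topic name assumed
                    kafkaMessage.getOffset());
}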
@Test
......@@ -141,7 +141,7 @@ public class NotificationHookConsumerTest {
{ add(mock(Referenceable.class)); }
});
when(atlasEntityStore.createOrUpdate(any(EntityStream.class), anyBoolean())).thenThrow(new RuntimeException("Simulating exception in processing message"));
hookConsumer.handleMessage(message);
hookConsumer.handleMessage(new AtlasKafkaMessage(message, -1, -1));
verifyZeroInteractions(consumer);
}
......
......@@ -21,7 +21,6 @@ package org.apache.atlas.web.integration;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import kafka.consumer.ConsumerTimeoutException;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasClientV2;
......@@ -42,7 +41,9 @@ import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef.Cardinali
import org.apache.atlas.model.typedef.AtlasStructDef.AtlasConstraintDef;
import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.apache.atlas.notification.NotificationConsumer;
import org.apache.atlas.kafka.*;
import org.apache.atlas.notification.entity.EntityNotification;
import org.apache.atlas.notification.hook.HookNotification;
import org.apache.atlas.type.AtlasTypeUtil;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.Struct;
......@@ -634,14 +635,21 @@ public abstract class BaseResourceIT {
@Override
public boolean evaluate() throws Exception {
try {
while (consumer.hasNext() && System.currentTimeMillis() < maxCurrentTime) {
EntityNotification notification = consumer.next();
if (predicate.evaluate(notification)) {
pair.left = notification;
return true;
}
while (System.currentTimeMillis() < maxCurrentTime) {
List<AtlasKafkaMessage<EntityNotification>> messageList = consumer.receive(1000);
if (messageList.size() > 0) {
EntityNotification notification = messageList.get(0).getMessage();
if (predicate.evaluate(notification)) {
pair.left = notification;
return true;
}
} else {
LOG.info("no records received at {}; polling until {}", System.currentTimeMillis(), maxCurrentTime);
}
}
} catch(ConsumerTimeoutException e) {
} catch (Exception e) {
LOG.error("waitForNotification", e);
//ignore
}
return false;
......
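One consequence of the new API shows up in the catch block above: the old high-level consumer reported an empty stream by throwing kafka.consumer.ConsumerTimeoutException, whereas the poll-based consumer simply returns an empty batch. That makes "no records before the deadline" an ordinary return path rather than an exception; a sketch of the loop body in that style (same shape as the code above, but iterating the whole batch instead of only its first element):

// Sketch: a timeout is no longer an exception, so the loop stays flat and
// only genuine failures would escape to the caller.
while (System.currentTimeMillis() < maxCurrentTime) {
    List<AtlasKafkaMessage<EntityNotification>> batch = consumer.receive(1000L);
    for (AtlasKafkaMessage<EntityNotification> kafkaMessage : batch) {
        EntityNotification notification = kafkaMessage.getMessage();
        if (predicate.evaluate(notification)) {
            pair.left = notification;
            return true;
        }
    }
}
return false; // deadline reached without a matching notification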
......@@ -81,7 +81,6 @@ public class EntityJerseyResourceIT extends BaseResourceIT {
private static final String TRAITS = "traits";
private NotificationInterface notificationInterface = NotificationProvider.get();
private NotificationConsumer<EntityNotification> notificationConsumer;
@BeforeClass
public void setUp() throws Exception {
......@@ -89,10 +88,6 @@ public class EntityJerseyResourceIT extends BaseResourceIT {
createTypeDefinitionsV1();
List<NotificationConsumer<EntityNotification>> consumers =
notificationInterface.createConsumers(NotificationInterface.NotificationType.ENTITIES, 1);
notificationConsumer = consumers.iterator().next();
}
@Test
......@@ -218,29 +213,12 @@ public class EntityJerseyResourceIT extends BaseResourceIT {
assertEntityAudit(dbId, EntityAuditEvent.EntityAuditAction.ENTITY_CREATE);
waitForNotification(notificationConsumer, MAX_WAIT_TIME, new NotificationPredicate() {
@Override
public boolean evaluate(EntityNotification notification) throws Exception {
return notification != null && notification.getEntity().getId()._getId().equals(dbId);
}
});
JSONArray results = searchByDSL(String.format("%s where qualifiedName='%s'", DATABASE_TYPE_BUILTIN, dbName));
assertEquals(results.length(), 1);
//create entity again shouldn't create another instance with same unique attribute value
List<String> entityResults = atlasClientV1.createEntity(HiveDBInstance);
assertEquals(entityResults.size(), 0);
try {
waitForNotification(notificationConsumer, MAX_WAIT_TIME, new NotificationPredicate() {
@Override
public boolean evaluate(EntityNotification notification) throws Exception {
return notification != null && notification.getEntity().getId()._getId().equals(dbId);
}
});
} catch (Exception e) {
//expected timeout
}
results = searchByDSL(String.format("%s where qualifiedName='%s'", DATABASE_TYPE_BUILTIN, dbName));
assertEquals(results.length(), 1);
......
......@@ -55,7 +55,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.atlas.kafka.AtlasKafkaConsumer;
import static org.testng.Assert.*;
......@@ -72,8 +72,6 @@ public class EntityV2JerseyResourceIT extends BaseResourceIT {
private AtlasEntity dbEntity;
private AtlasEntity tableEntity;
private NotificationInterface notificationInterface = NotificationProvider.get();
private NotificationConsumer<EntityNotification> notificationConsumer;
@BeforeClass
public void setUp() throws Exception {
......@@ -81,10 +79,6 @@ public class EntityV2JerseyResourceIT extends BaseResourceIT {
createTypeDefinitionsV2();
List<NotificationConsumer<EntityNotification>> consumers =
notificationInterface.createConsumers(NotificationInterface.NotificationType.ENTITIES, 1);
notificationConsumer = consumers.iterator().next();
}
@Test
......@@ -166,14 +160,6 @@ public class EntityV2JerseyResourceIT extends BaseResourceIT {
assertEquals(results.length(), 1);
final AtlasEntity hiveDBInstanceV2 = createHiveDB();
// Do the notification thing here
waitForNotification(notificationConsumer, MAX_WAIT_TIME, new NotificationPredicate() {
@Override
public boolean evaluate(EntityNotification notification) throws Exception {
return notification != null && notification.getEntity().getId()._getId().equals(hiveDBInstanceV2.getGuid());
}
});
results = searchByDSL(String.format("%s where name='%s'", DATABASE_TYPE_V2, DATABASE_NAME));
assertEquals(results.length(), 1);
......