Commit 98769871 by Hemanth Yamijala

ATLAS-629 Kafka messages in ATLAS_HOOK might be lost in HA mode at the instant…

ATLAS-629 Kafka messages in ATLAS_HOOK might be lost in HA mode at the instant of failover. (yhemanth)
parent 07b8b4d3
......@@ -59,6 +59,7 @@ atlas.kafka.zookeeper.sync.time.ms=20
atlas.kafka.auto.commit.interval.ms=1000
atlas.kafka.auto.offset.reset=smallest
atlas.kafka.hook.group.id=atlas
atlas.kafka.auto.commit.enable=false
######### Hive Lineage Configs #########
......
......@@ -19,6 +19,7 @@ package org.apache.atlas.kafka;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import org.apache.atlas.notification.AbstractNotificationConsumer;
import org.apache.atlas.notification.MessageDeserializer;
......@@ -35,24 +36,29 @@ public class KafkaConsumer<T> extends AbstractNotificationConsumer<T> {
private final int consumerId;
private final ConsumerIterator iterator;
private final ConsumerConnector consumerConnector;
private final boolean autoCommitEnabled;
private long lastSeenOffset;
// ----- Constructors ----------------------------------------------------
/**
* Create a Kafka consumer.
*
* @param type the notification type returned by this consumer
* @param deserializer the message deserializer used for this consumer
* @param stream the underlying Kafka stream
* @param consumerId an id value for this consumer
* @param consumerConnector the {@link ConsumerConnector} which created the underlying Kafka stream
* @param autoCommitEnabled true if consumer does not need to commit offsets explicitly, false otherwise.
*/
public KafkaConsumer(Class<T> type,
MessageDeserializer<T> deserializer, KafkaStream<String, String> stream, int consumerId) {
public KafkaConsumer(MessageDeserializer<T> deserializer, KafkaStream<String, String> stream, int consumerId,
ConsumerConnector consumerConnector, boolean autoCommitEnabled) {
super(deserializer);
this.consumerConnector = consumerConnector;
this.lastSeenOffset = 0;
this.iterator = stream.iterator();
this.consumerId = consumerId;
this.autoCommitEnabled = autoCommitEnabled;
}
......@@ -71,6 +77,7 @@ public class KafkaConsumer<T> extends AbstractNotificationConsumer<T> {
MessageAndMetadata message = iterator.next();
LOG.debug("Read message: conumerId: {}, topic - {}, partition - {}, offset - {}, message - {}",
consumerId, message.topic(), message.partition(), message.offset(), message.message());
lastSeenOffset = message.offset();
return (String) message.message();
}
......@@ -79,4 +86,14 @@ public class KafkaConsumer<T> extends AbstractNotificationConsumer<T> {
MessageAndMetadata message = (MessageAndMetadata) iterator.peek();
return (String) message.message();
}
@Override
public void commit() {
if (autoCommitEnabled) {
LOG.debug("Auto commit is disabled, not committing.");
} else {
consumerConnector.commitOffsets();
LOG.debug("Committed offset: {}", lastSeenOffset);
}
}
}
......@@ -17,6 +17,7 @@
*/
package org.apache.atlas.kafka;
import com.google.common.annotations.VisibleForTesting;
import com.google.inject.Singleton;
import kafka.consumer.Consumer;
import kafka.consumer.KafkaStream;
......@@ -112,9 +113,6 @@ public class KafkaNotification extends AbstractNotification implements Service {
"org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
......@@ -123,6 +121,10 @@ public class KafkaNotification extends AbstractNotification implements Service {
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "smallest");
}
@VisibleForTesting
protected KafkaNotification(Properties properties) {
this.properties = properties;
}
// ----- Service ---------------------------------------------------------
......@@ -159,26 +161,34 @@ public class KafkaNotification extends AbstractNotification implements Service {
@Override
public <T> List<NotificationConsumer<T>> createConsumers(NotificationType notificationType,
int numConsumers) {
return createConsumers(notificationType, numConsumers,
Boolean.valueOf(properties.getProperty("auto.commit.enable", "true")));
}
@VisibleForTesting
public <T> List<NotificationConsumer<T>> createConsumers(NotificationType notificationType,
int numConsumers, boolean autoCommitEnabled) {
String topic = TOPIC_MAP.get(notificationType);
Properties consumerProperties = getConsumerProperties(notificationType);
ConsumerConnector consumerConnector = createConsumerConnector(consumerProperties);
Map<String, Integer> topicCountMap = new HashMap<>();
topicCountMap.put(topic, numConsumers);
StringDecoder decoder = new StringDecoder(null);
Map<String, List<KafkaStream<String, String>>> streamsMap =
consumerConnector.createMessageStreams(topicCountMap, decoder, decoder);
List<KafkaStream<String, String>> kafkaConsumers = streamsMap.get(topic);
List<NotificationConsumer<T>> consumers = new ArrayList<>(numConsumers);
int consumerId = 0;
for (KafkaStream stream : kafkaConsumers) {
KafkaConsumer<T> kafkaConsumer =
createKafkaConsumer(notificationType.getClassType(), notificationType.getDeserializer(),
stream, consumerId++);
consumers.add(kafkaConsumer);
for (int i = 0; i < numConsumers; i++) {
ConsumerConnector consumerConnector = createConsumerConnector(consumerProperties);
Map<String, Integer> topicCountMap = new HashMap<>();
topicCountMap.put(topic, 1);
StringDecoder decoder = new StringDecoder(null);
Map<String, List<KafkaStream<String, String>>> streamsMap =
consumerConnector.createMessageStreams(topicCountMap, decoder, decoder);
List<KafkaStream<String, String>> kafkaConsumers = streamsMap.get(topic);
for (KafkaStream stream : kafkaConsumers) {
KafkaConsumer<T> kafkaConsumer =
createKafkaConsumer(notificationType.getClassType(), notificationType.getDeserializer(),
stream, i, consumerConnector, autoCommitEnabled);
consumers.add(kafkaConsumer);
}
consumerConnectors.add(consumerConnector);
}
consumerConnectors.add(consumerConnector);
return consumers;
}
......@@ -245,12 +255,14 @@ public class KafkaNotification extends AbstractNotification implements Service {
* @param stream the Kafka stream
* @param consumerId the id for the new consumer
*
* @param consumerConnector
* @return a new Kafka consumer
*/
protected <T> org.apache.atlas.kafka.KafkaConsumer<T> createKafkaConsumer(Class<T> type,
MessageDeserializer<T> deserializer, KafkaStream stream,
int consumerId) {
return new org.apache.atlas.kafka.KafkaConsumer<T>(type, deserializer, stream, consumerId);
protected <T> org.apache.atlas.kafka.KafkaConsumer<T>
createKafkaConsumer(Class<T> type, MessageDeserializer<T> deserializer, KafkaStream stream,
int consumerId, ConsumerConnector consumerConnector, boolean autoCommitEnabled) {
return new org.apache.atlas.kafka.KafkaConsumer<T>(deserializer, stream,
consumerId, consumerConnector, autoCommitEnabled);
}
// Get properties for consumer request
......@@ -266,6 +278,7 @@ public class KafkaNotification extends AbstractNotification implements Service {
consumerProperties.putAll(properties);
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
LOG.info("Consumer property: auto.commit.enable: " + consumerProperties.getProperty("auto.commit.enable"));
return consumerProperties;
}
......
......@@ -17,6 +17,7 @@
*/
package org.apache.atlas.notification;
import com.google.common.annotations.VisibleForTesting;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonElement;
......@@ -46,7 +47,7 @@ public abstract class AbstractNotification implements NotificationInterface {
*/
public static final MessageVersion CURRENT_MESSAGE_VERSION = new MessageVersion("1.0.0");
private static final String PROPERTY_EMBEDDED = PROPERTY_PREFIX + ".embedded";
public static final String PROPERTY_EMBEDDED = PROPERTY_PREFIX + ".embedded";
private final boolean embedded;
private final boolean isHAEnabled;
......@@ -59,7 +60,6 @@ public abstract class AbstractNotification implements NotificationInterface {
registerTypeAdapter(JSONArray.class, new JSONArraySerializer()).
create();
// ----- Constructors ----------------------------------------------------
public AbstractNotification(Configuration applicationProperties) throws AtlasException {
......@@ -67,6 +67,11 @@ public abstract class AbstractNotification implements NotificationInterface {
this.isHAEnabled = HAConfiguration.isHAEnabled(applicationProperties);
}
@VisibleForTesting
protected AbstractNotification() {
embedded = false;
isHAEnabled = false;
}
// ----- NotificationInterface -------------------------------------------
......
......@@ -68,4 +68,6 @@ public abstract class AbstractNotificationConsumer<T> implements NotificationCon
public T peek() {
return deserializer.deserialize(peekMessage());
}
public abstract void commit();
}
......@@ -43,4 +43,13 @@ public interface NotificationConsumer<T> {
* @return the next notification
*/
T peek();
/**
* Commit the offset of messages that have been successfully processed.
*
* This API should be called when messages read with {@link #next()} have been successfully processed and
* the consumer is ready to handle the next message, which could happen even after a normal or an abnormal
* restart.
*/
void commit();
}
......@@ -20,6 +20,7 @@ package org.apache.atlas.kafka;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import org.apache.atlas.notification.AbstractNotification;
import org.apache.atlas.notification.MessageVersion;
......@@ -33,6 +34,9 @@ import org.apache.atlas.typesystem.IStruct;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.Struct;
import org.codehaus.jettison.json.JSONException;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import java.util.Collections;
......@@ -41,6 +45,8 @@ import java.util.List;
import java.util.NoSuchElementException;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.*;
......@@ -51,6 +57,14 @@ public class KafkaConsumerTest {
private static final String TRAIT_NAME = "MyTrait";
@Mock
private ConsumerConnector consumerConnector;
@BeforeMethod
public void setup() {
MockitoAnnotations.initMocks(this);
}
@Test
public void testNext() throws Exception {
KafkaStream<String, String> stream = mock(KafkaStream.class);
......@@ -70,8 +84,9 @@ public class KafkaConsumerTest {
when(messageAndMetadata.message()).thenReturn(json);
NotificationConsumer<HookNotification.HookNotificationMessage> consumer =
new KafkaConsumer<>(NotificationInterface.NotificationType.HOOK.getClassType(),
NotificationInterface.NotificationType.HOOK.getDeserializer(), stream, 99);
new KafkaConsumer<>(
NotificationInterface.NotificationType.HOOK.getDeserializer(), stream, 99,
consumerConnector, false);
assertTrue(consumer.hasNext());
......@@ -101,8 +116,9 @@ public class KafkaConsumerTest {
when(messageAndMetadata.message()).thenReturn(json);
NotificationConsumer<HookNotification.HookNotificationMessage> consumer =
new KafkaConsumer<>(NotificationInterface.NotificationType.HOOK.getClassType(),
NotificationInterface.NotificationType.HOOK.getDeserializer(), stream, 99);
new KafkaConsumer<>(
NotificationInterface.NotificationType.HOOK.getDeserializer(), stream, 99,
consumerConnector, false);
assertTrue(consumer.hasNext());
......@@ -135,8 +151,9 @@ public class KafkaConsumerTest {
when(messageAndMetadata.message()).thenReturn(json);
NotificationConsumer<HookNotification.HookNotificationMessage> consumer =
new KafkaConsumer<>(NotificationInterface.NotificationType.HOOK.getClassType(),
NotificationInterface.NotificationType.HOOK.getDeserializer(), stream, 99);
new KafkaConsumer<>(
NotificationInterface.NotificationType.HOOK.getDeserializer(), stream, 99,
consumerConnector, false);
assertTrue(consumer.hasNext());
......@@ -147,6 +164,32 @@ public class KafkaConsumerTest {
assertTrue(consumer.hasNext());
}
@Test
public void testCommitIsCalledIfAutoCommitDisabled() {
KafkaStream<String, String> stream = mock(KafkaStream.class);
NotificationConsumer<HookNotification.HookNotificationMessage> consumer =
new KafkaConsumer<>(
NotificationInterface.NotificationType.HOOK.getDeserializer(), stream, 99,
consumerConnector, false);
consumer.commit();
verify(consumerConnector).commitOffsets();
}
@Test
public void testCommitIsNotCalledIfAutoCommitEnabled() {
KafkaStream<String, String> stream = mock(KafkaStream.class);
NotificationConsumer<HookNotification.HookNotificationMessage> consumer =
new KafkaConsumer<>(
NotificationInterface.NotificationType.HOOK.getDeserializer(), stream, 99,
consumerConnector, true);
consumer.commit();
verify(consumerConnector, never()).commitOffsets();
}
private Referenceable getEntity(String traitName) {
Referenceable entity = EntityNotificationImplTest.getEntity("id");
List<IStruct> traitInfo = new LinkedList<>();
......
......@@ -17,26 +17,16 @@
*/
package org.apache.atlas.kafka;
import com.google.inject.Inject;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import org.apache.atlas.AtlasException;
import org.apache.atlas.notification.MessageDeserializer;
import org.apache.atlas.notification.NotificationConsumer;
import org.apache.atlas.notification.NotificationInterface;
import org.apache.atlas.notification.NotificationModule;
import org.apache.commons.configuration.Configuration;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
import java.util.Collections;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
......@@ -44,99 +34,80 @@ import java.util.Properties;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
@Guice(modules = NotificationModule.class)
public class KafkaNotificationTest {
@Inject
private KafkaNotification kafka;
@BeforeClass
public void setUp() throws Exception {
kafka.start();
}
@Test
@SuppressWarnings("unchecked")
public void testCreateConsumers() throws Exception {
Configuration configuration = mock(Configuration.class);
Iterator iterator = mock(Iterator.class);
ConsumerConnector consumerConnector = mock(ConsumerConnector.class);
KafkaStream kafkaStream1 = mock(KafkaStream.class);
KafkaStream kafkaStream2 = mock(KafkaStream.class);
String groupId = "groupId9999";
when(configuration.subset(KafkaNotification.PROPERTY_PREFIX)).thenReturn(configuration);
when(configuration.getKeys()).thenReturn(iterator);
when(iterator.hasNext()).thenReturn(true).thenReturn(false);
when(iterator.next()).thenReturn("entities." + KafkaNotification.CONSUMER_GROUP_ID_PROPERTY);
when(configuration.getList("entities." + KafkaNotification.CONSUMER_GROUP_ID_PROPERTY))
.thenReturn(Collections.<Object>singletonList(groupId));
Map<String, List<KafkaStream<String, String>>> streamsMap = new HashMap<>();
List<KafkaStream<String, String>> kafkaStreamList = new LinkedList<>();
kafkaStreamList.add(kafkaStream1);
kafkaStreamList.add(kafkaStream2);
streamsMap.put(KafkaNotification.ATLAS_ENTITIES_TOPIC, kafkaStreamList);
Properties properties = mock(Properties.class);
when(properties.getProperty("entities.group.id")).thenReturn("atlas");
final ConsumerConnector consumerConnector = mock(ConsumerConnector.class);
Map<String, Integer> topicCountMap = new HashMap<>();
topicCountMap.put(KafkaNotification.ATLAS_ENTITIES_TOPIC, 2);
topicCountMap.put(KafkaNotification.ATLAS_ENTITIES_TOPIC, 1);
when(consumerConnector.createMessageStreams(
eq(topicCountMap), any(StringDecoder.class), any(StringDecoder.class))).thenReturn(streamsMap);
TestKafkaNotification kafkaNotification = new TestKafkaNotification(configuration, consumerConnector);
Map<String, List<KafkaStream<String, String>>> kafkaStreamsMap =
new HashMap<>();
List<KafkaStream<String, String>> kafkaStreams = new ArrayList<>();
KafkaStream kafkaStream = mock(KafkaStream.class);
kafkaStreams.add(kafkaStream);
kafkaStreamsMap.put(KafkaNotification.ATLAS_ENTITIES_TOPIC, kafkaStreams);
List<NotificationConsumer<String>> consumers =
kafkaNotification.createConsumers(NotificationInterface.NotificationType.ENTITIES, 2);
when(consumerConnector.createMessageStreams(
eq(topicCountMap), any(StringDecoder.class), any(StringDecoder.class))).thenReturn(kafkaStreamsMap);
assertEquals(2, consumers.size());
final KafkaConsumer consumer1 = mock(KafkaConsumer.class);
final KafkaConsumer consumer2 = mock(KafkaConsumer.class);
// assert that all of the given kafka streams were used to create kafka consumers
List<KafkaStream> streams = kafkaNotification.kafkaStreams;
assertTrue(streams.contains(kafkaStream1));
assertTrue(streams.contains(kafkaStream2));
KafkaNotification kafkaNotification =
new TestKafkaNotification(properties, consumerConnector, consumer1, consumer2);
// assert that the given consumer group id was added to the properties used to create the consumer connector
Properties properties = kafkaNotification.myProperties;
assertEquals(groupId, properties.getProperty(ConsumerConfig.GROUP_ID_CONFIG));
}
List<NotificationConsumer<String>> consumers =
kafkaNotification.createConsumers(NotificationInterface.NotificationType.ENTITIES, 2);
@AfterClass
public void teardown() throws Exception {
kafka.stop();
verify(consumerConnector, times(2)).createMessageStreams(
eq(topicCountMap), any(StringDecoder.class), any(StringDecoder.class));
assertEquals(consumers.size(), 2);
assertTrue(consumers.contains(consumer1));
assertTrue(consumers.contains(consumer2));
}
// Extended kafka notification class for testing.
private static class TestKafkaNotification extends KafkaNotification {
class TestKafkaNotification extends KafkaNotification {
private final ConsumerConnector consumerConnector;
private final KafkaConsumer consumer1;
private final KafkaConsumer consumer2;
private Properties myProperties;
private List<KafkaStream> kafkaStreams = new LinkedList<>();
public TestKafkaNotification(Configuration applicationProperties,
ConsumerConnector consumerConnector) throws AtlasException {
super(applicationProperties);
TestKafkaNotification(Properties properties, ConsumerConnector consumerConnector,
KafkaConsumer consumer1, KafkaConsumer consumer2) {
super(properties);
this.consumerConnector = consumerConnector;
this.consumer1 = consumer1;
this.consumer2 = consumer2;
}
@Override
protected ConsumerConnector createConsumerConnector(Properties consumerProperties) {
this.myProperties = consumerProperties;
kafkaStreams.clear();
return consumerConnector;
}
@Override
protected <T> org.apache.atlas.kafka.KafkaConsumer<T> createKafkaConsumer(Class<T> type,
MessageDeserializer<T> deserializer,
KafkaStream stream,
int consumerId) {
kafkaStreams.add(stream);
return super.createKafkaConsumer(type, deserializer, stream, consumerId);
protected <T> org.apache.atlas.kafka.KafkaConsumer<T>
createKafkaConsumer(Class<T> type, MessageDeserializer<T> deserializer, KafkaStream stream,
int consumerId, ConsumerConnector connector, boolean autoCommitEnabled) {
if (consumerId == 0) {
return consumer1;
} else if (consumerId == 1) {
return consumer2;
}
return null;
}
}
}
......@@ -253,6 +253,11 @@ public class AbstractNotificationConsumerTest {
public boolean hasNext() {
return index < messageList.size();
}
@Override
public void commit() {
// do nothing.
}
}
private static final class TestDeserializer<T> extends VersionedMessageDeserializer<T> {
......
......@@ -20,6 +20,7 @@ ATLAS-409 Atlas will not import avro tables with schema read from a file (dosset
ATLAS-379 Create sqoop and falcon metadata addons (venkatnrangan,bvellanki,sowmyaramesh via shwethags)
ALL CHANGES:
ATLAS-629 Kafka messages in ATLAS_HOOK might be lost in HA mode at the instant of failover. (yhemanth)
ATLAS-758 hdfs location of hive table is pointing to old location even after rename ( sumasai )
ATLAS-667 Entity delete should check for required reverse references ( dkantor via sumasai )
ATLAS-738 Add query ability on system properties like guid, state, createdtime etc (shwethags)
......
......@@ -70,6 +70,7 @@ atlas.kafka.consumer.timeout.ms=100
atlas.kafka.auto.commit.interval.ms=100
atlas.kafka.hook.group.id=atlas
atlas.kafka.entities.group.id=atlas_entities
atlas.kafka.auto.commit.enable=false
######### Entity Audit Configs #########
atlas.audit.hbase.tablename=ATLAS_ENTITY_AUDIT_EVENTS
......
......@@ -17,6 +17,7 @@
*/
package org.apache.atlas.notification;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Inject;
import com.google.inject.Singleton;
......@@ -183,50 +184,55 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
while (shouldRun.get()) {
try {
if (hasNext()) {
HookNotification.HookNotificationMessage message = consumer.next();
atlasClient.setUser(message.getUser());
try {
switch (message.getType()) {
case ENTITY_CREATE:
HookNotification.EntityCreateRequest createRequest =
(HookNotification.EntityCreateRequest) message;
atlasClient.createEntity(createRequest.getEntities());
break;
handleMessage(consumer.next());
}
} catch (Throwable t) {
LOG.warn("Failure in NotificationHookConsumer", t);
}
}
}
case ENTITY_PARTIAL_UPDATE:
HookNotification.EntityPartialUpdateRequest partialUpdateRequest =
(HookNotification.EntityPartialUpdateRequest) message;
atlasClient.updateEntity(partialUpdateRequest.getTypeName(),
partialUpdateRequest.getAttribute(),
partialUpdateRequest.getAttributeValue(), partialUpdateRequest.getEntity());
break;
@VisibleForTesting
void handleMessage(HookNotification.HookNotificationMessage message) {
atlasClient.setUser(message.getUser());
try {
switch (message.getType()) {
case ENTITY_CREATE:
HookNotification.EntityCreateRequest createRequest =
(HookNotification.EntityCreateRequest) message;
atlasClient.createEntity(createRequest.getEntities());
break;
case ENTITY_DELETE:
HookNotification.EntityDeleteRequest deleteRequest =
(HookNotification.EntityDeleteRequest) message;
atlasClient.deleteEntity(deleteRequest.getTypeName(),
deleteRequest.getAttribute(),
deleteRequest.getAttributeValue());
break;
case ENTITY_PARTIAL_UPDATE:
HookNotification.EntityPartialUpdateRequest partialUpdateRequest =
(HookNotification.EntityPartialUpdateRequest) message;
atlasClient.updateEntity(partialUpdateRequest.getTypeName(),
partialUpdateRequest.getAttribute(),
partialUpdateRequest.getAttributeValue(), partialUpdateRequest.getEntity());
break;
case ENTITY_FULL_UPDATE:
HookNotification.EntityUpdateRequest updateRequest =
(HookNotification.EntityUpdateRequest) message;
atlasClient.updateEntities(updateRequest.getEntities());
break;
case ENTITY_DELETE:
HookNotification.EntityDeleteRequest deleteRequest =
(HookNotification.EntityDeleteRequest) message;
atlasClient.deleteEntity(deleteRequest.getTypeName(),
deleteRequest.getAttribute(),
deleteRequest.getAttributeValue());
break;
default:
throw new IllegalStateException("Unhandled exception!");
}
} catch (Exception e) {
//todo handle failures
LOG.warn("Error handling message {}", message, e);
}
}
} catch (Throwable t) {
LOG.warn("Failure in NotificationHookConsumer", t);
case ENTITY_FULL_UPDATE:
HookNotification.EntityUpdateRequest updateRequest =
(HookNotification.EntityUpdateRequest) message;
atlasClient.updateEntities(updateRequest.getEntities());
break;
default:
throw new IllegalStateException("Unhandled exception!");
}
} catch (Exception e) {
//todo handle failures
LOG.warn("Error handling message {}", message, e);
}
consumer.commit();
}
boolean serverAvailable(Timer timer) {
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.notification;
import com.google.inject.Inject;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasException;
import org.apache.atlas.LocalAtlasClient;
import org.apache.atlas.kafka.KafkaNotification;
import org.apache.atlas.notification.hook.HookNotification;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.commons.lang.RandomStringUtils;
import org.testng.annotations.AfterTest;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
@Guice(modules = NotificationModule.class)
public class NotificationHookConsumerKafkaTest {
@Inject
private NotificationInterface notificationInterface;
private KafkaNotification kafkaNotification;
@BeforeTest
public void setup() throws AtlasException {
kafkaNotification = startKafkaServer();
}
@AfterTest
public void shutdown() {
kafkaNotification.stop();
}
@Test
public void testConsumerConsumesNewMessageWithAutoCommitDisabled() throws AtlasException, InterruptedException {
produceMessage(new HookNotification.EntityCreateRequest("test_user1", createEntity()));
NotificationConsumer<HookNotification.HookNotificationMessage> consumer =
createNewConsumer(kafkaNotification, false);
LocalAtlasClient localAtlasClient = mock(LocalAtlasClient.class);
NotificationHookConsumer notificationHookConsumer =
new NotificationHookConsumer(kafkaNotification, localAtlasClient);
NotificationHookConsumer.HookConsumer hookConsumer =
notificationHookConsumer.new HookConsumer(consumer);
consumeOneMessage(consumer, hookConsumer);
verify(localAtlasClient).setUser("test_user1");
// produce another message, and make sure it moves ahead. If commit succeeded, this would work.
produceMessage(new HookNotification.EntityCreateRequest("test_user2", createEntity()));
consumeOneMessage(consumer, hookConsumer);
verify(localAtlasClient).setUser("test_user2");
kafkaNotification.close();
}
@Test
public void testConsumerRemainsAtSameMessageWithAutoCommitEnabled()
throws NotificationException, InterruptedException {
produceMessage(new HookNotification.EntityCreateRequest("test_user3", createEntity()));
NotificationConsumer<HookNotification.HookNotificationMessage> consumer =
createNewConsumer(kafkaNotification, true);
LocalAtlasClient localAtlasClient = mock(LocalAtlasClient.class);
NotificationHookConsumer notificationHookConsumer =
new NotificationHookConsumer(kafkaNotification, localAtlasClient);
NotificationHookConsumer.HookConsumer hookConsumer =
notificationHookConsumer.new HookConsumer(consumer);
consumeOneMessage(consumer, hookConsumer);
verify(localAtlasClient).setUser("test_user3");
// produce another message, but this will not be consumed, as commit code is not executed in hook consumer.
produceMessage(new HookNotification.EntityCreateRequest("test_user4", createEntity()));
consumeOneMessage(consumer, hookConsumer);
verify(localAtlasClient).setUser("test_user3");
kafkaNotification.close();
}
NotificationConsumer<HookNotification.HookNotificationMessage> createNewConsumer(
KafkaNotification kafkaNotification, boolean autoCommitEnabled) {
return kafkaNotification.<HookNotification.HookNotificationMessage>createConsumers(
NotificationInterface.NotificationType.HOOK, 1, autoCommitEnabled).get(0);
}
void consumeOneMessage(NotificationConsumer<HookNotification.HookNotificationMessage> consumer,
NotificationHookConsumer.HookConsumer hookConsumer) throws InterruptedException {
while (!consumer.hasNext()) {
Thread.sleep(1000);
}
hookConsumer.handleMessage(consumer.next());
}
Referenceable createEntity() {
final Referenceable entity = new Referenceable(AtlasClient.DATA_SET_SUPER_TYPE);
entity.set("name", "db" + randomString());
entity.set("description", randomString());
return entity;
}
KafkaNotification startKafkaServer() throws AtlasException {
KafkaNotification kafkaNotification = (KafkaNotification) notificationInterface;
kafkaNotification.start();
return kafkaNotification;
}
protected String randomString() {
return RandomStringUtils.randomAlphanumeric(10);
}
private void produceMessage(HookNotification.HookNotificationMessage message) throws NotificationException {
notificationInterface.send(NotificationInterface.NotificationType.HOOK, message);
}
}
......@@ -21,6 +21,7 @@ import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.LocalAtlasClient;
import org.apache.atlas.ha.HAConfiguration;
import org.apache.atlas.notification.hook.HookNotification;
import org.apache.commons.configuration.Configuration;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
......@@ -34,6 +35,7 @@ import java.util.concurrent.ExecutorService;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
......@@ -87,6 +89,40 @@ public class NotificationHookConsumerTest {
}
@Test
public void testCommitIsCalledWhenMessageIsProcessed() throws AtlasServiceException {
NotificationHookConsumer notificationHookConsumer =
new NotificationHookConsumer(notificationInterface, atlasClient);
NotificationConsumer consumer = mock(NotificationConsumer.class);
NotificationHookConsumer.HookConsumer hookConsumer =
notificationHookConsumer.new HookConsumer(consumer);
HookNotification.EntityCreateRequest message = mock(HookNotification.EntityCreateRequest.class);
when(message.getUser()).thenReturn("user");
when(message.getType()).thenReturn(HookNotification.HookNotificationType.ENTITY_CREATE);
hookConsumer.handleMessage(message);
verify(consumer).commit();
}
@Test
public void testCommitIsCalledEvenWhenMessageProcessingFails() throws AtlasServiceException {
NotificationHookConsumer notificationHookConsumer =
new NotificationHookConsumer(notificationInterface, atlasClient);
NotificationConsumer consumer = mock(NotificationConsumer.class);
NotificationHookConsumer.HookConsumer hookConsumer =
notificationHookConsumer.new HookConsumer(consumer);
HookNotification.EntityCreateRequest message = mock(HookNotification.EntityCreateRequest.class);
when(message.getUser()).thenReturn("user");
when(message.getType()).thenReturn(HookNotification.HookNotificationType.ENTITY_CREATE);
when(atlasClient.createEntity(any(List.class))).
thenThrow(new RuntimeException("Simulating exception in processing message"));
hookConsumer.handleMessage(message);
verify(consumer).commit();
}
@Test
public void testConsumerProceedsWithFalseIfInterrupted() throws AtlasServiceException, InterruptedException {
NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasClient);
NotificationHookConsumer.HookConsumer hookConsumer =
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment