Commit 30893c5e by Suma Shivaprasad

ATLAS-1111 Data loss is observed when atlas is restarted while hive_table…

ATLAS-1111 Data loss is observed when atlas is restarted while hive_table metadata ingestion into kafka topic is in-progress(shwethags via sumasai)
parent 7f2a4086
...@@ -41,7 +41,7 @@ public class Services { ...@@ -41,7 +41,7 @@ public class Services {
public void start() { public void start() {
try { try {
for (Service service : services) { for (Service service : services) {
LOG.debug("Starting service {}", service.getClass().getName()); LOG.info("Starting service {}", service.getClass().getName());
service.start(); service.start();
} }
} catch (Exception e) { } catch (Exception e) {
...@@ -51,7 +51,7 @@ public class Services { ...@@ -51,7 +51,7 @@ public class Services {
public void stop() { public void stop() {
for (Service service : services) { for (Service service : services) {
LOG.debug("Stopping service {}", service.getClass().getName()); LOG.info("Stopping service {}", service.getClass().getName());
try { try {
service.stop(); service.stop();
} catch (Throwable e) { } catch (Throwable e) {
......
...@@ -43,6 +43,14 @@ ...@@ -43,6 +43,14 @@
</layout> </layout>
</appender> </appender>
<appender name="FAILED" class="org.apache.log4j.DailyRollingFileAppender">
<param name="File" value="${atlas.log.dir}/failed.log"/>
<param name="Append" value="true"/>
<layout class="org.apache.log4j.PatternLayout">
<param name="ConversionPattern" value="%d %m"/>
</layout>
</appender>
<logger name="org.apache.atlas" additivity="false"> <logger name="org.apache.atlas" additivity="false">
<level value="info"/> <level value="info"/>
<appender-ref ref="FILE"/> <appender-ref ref="FILE"/>
...@@ -80,6 +88,11 @@ ...@@ -80,6 +88,11 @@
<appender-ref ref="AUDIT"/> <appender-ref ref="AUDIT"/>
</logger> </logger>
<logger name="FAILED" additivity="false">
<level value="info"/>
<appender-ref ref="AUDIT"/>
</logger>
<root> <root>
<priority value="warn"/> <priority value="warn"/>
<appender-ref ref="FILE"/> <appender-ref ref="FILE"/>
......
...@@ -96,4 +96,9 @@ public class KafkaConsumer<T> extends AbstractNotificationConsumer<T> { ...@@ -96,4 +96,9 @@ public class KafkaConsumer<T> extends AbstractNotificationConsumer<T> {
LOG.debug("Committed offset: {}", lastSeenOffset); LOG.debug("Committed offset: {}", lastSeenOffset);
} }
} }
@Override
public void close() {
consumerConnector.shutdown();
}
} }
...@@ -52,4 +52,6 @@ public interface NotificationConsumer<T> { ...@@ -52,4 +52,6 @@ public interface NotificationConsumer<T> {
* restart. * restart.
*/ */
void commit(); void commit();
void close();
} }
...@@ -156,7 +156,7 @@ public class HookNotification implements JsonDeserializer<HookNotification.HookN ...@@ -156,7 +156,7 @@ public class HookNotification implements JsonDeserializer<HookNotification.HookN
} }
} }
public List<Referenceable> getEntities() throws JSONException { public List<Referenceable> getEntities() {
return entities; return entities;
} }
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.kafka;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import org.apache.atlas.notification.MessageDeserializer;
import org.apache.atlas.notification.NotificationConsumer;
import org.apache.atlas.notification.NotificationException;
import org.apache.atlas.notification.NotificationInterface;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.testng.annotations.Test;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
public class KafkaNotificationMockTest {
@Test
@SuppressWarnings("unchecked")
public void testCreateConsumers() throws Exception {
Properties properties = mock(Properties.class);
when(properties.getProperty("entities.group.id")).thenReturn("atlas");
final ConsumerConnector consumerConnector = mock(ConsumerConnector.class);
Map<String, Integer> topicCountMap = new HashMap<>();
topicCountMap.put(KafkaNotification.ATLAS_ENTITIES_TOPIC, 1);
Map<String, List<KafkaStream<String, String>>> kafkaStreamsMap =
new HashMap<>();
List<KafkaStream<String, String>> kafkaStreams = new ArrayList<>();
KafkaStream kafkaStream = mock(KafkaStream.class);
kafkaStreams.add(kafkaStream);
kafkaStreamsMap.put(KafkaNotification.ATLAS_ENTITIES_TOPIC, kafkaStreams);
when(consumerConnector.createMessageStreams(
eq(topicCountMap), any(StringDecoder.class), any(StringDecoder.class))).thenReturn(kafkaStreamsMap);
final KafkaConsumer consumer1 = mock(KafkaConsumer.class);
final KafkaConsumer consumer2 = mock(KafkaConsumer.class);
KafkaNotification kafkaNotification =
new TestKafkaNotification(properties, consumerConnector, consumer1, consumer2);
List<NotificationConsumer<String>> consumers =
kafkaNotification.createConsumers(NotificationInterface.NotificationType.ENTITIES, 2);
verify(consumerConnector, times(2)).createMessageStreams(
eq(topicCountMap), any(StringDecoder.class), any(StringDecoder.class));
assertEquals(consumers.size(), 2);
assertTrue(consumers.contains(consumer1));
assertTrue(consumers.contains(consumer2));
}
@Test
@SuppressWarnings("unchecked")
public void shouldSendMessagesSuccessfully() throws NotificationException,
ExecutionException, InterruptedException {
Properties configProperties = mock(Properties.class);
KafkaNotification kafkaNotification = new KafkaNotification(configProperties);
Producer producer = mock(Producer.class);
String topicName = kafkaNotification.getTopicName(NotificationInterface.NotificationType.HOOK);
String message = "This is a test message";
Future returnValue = mock(Future.class);
when(returnValue.get()).thenReturn(new RecordMetadata(new TopicPartition(topicName, 0), 0, 0));
ProducerRecord expectedRecord = new ProducerRecord(topicName, message);
when(producer.send(expectedRecord)).thenReturn(returnValue);
kafkaNotification.sendInternalToProducer(producer,
NotificationInterface.NotificationType.HOOK, new String[]{message});
verify(producer).send(expectedRecord);
}
@Test
@SuppressWarnings("unchecked")
public void shouldThrowExceptionIfProducerFails() throws NotificationException,
ExecutionException, InterruptedException {
Properties configProperties = mock(Properties.class);
KafkaNotification kafkaNotification = new KafkaNotification(configProperties);
Producer producer = mock(Producer.class);
String topicName = kafkaNotification.getTopicName(NotificationInterface.NotificationType.HOOK);
String message = "This is a test message";
Future returnValue = mock(Future.class);
when(returnValue.get()).thenThrow(new RuntimeException("Simulating exception"));
ProducerRecord expectedRecord = new ProducerRecord(topicName, message);
when(producer.send(expectedRecord)).thenReturn(returnValue);
try {
kafkaNotification.sendInternalToProducer(producer,
NotificationInterface.NotificationType.HOOK, new String[]{message});
fail("Should have thrown NotificationException");
} catch (NotificationException e) {
assertEquals(e.getFailedMessages().size(), 1);
assertEquals(e.getFailedMessages().get(0), "This is a test message");
}
}
@Test
@SuppressWarnings("unchecked")
public void shouldCollectAllFailedMessagesIfProducerFails() throws NotificationException,
ExecutionException, InterruptedException {
Properties configProperties = mock(Properties.class);
KafkaNotification kafkaNotification = new KafkaNotification(configProperties);
Producer producer = mock(Producer.class);
String topicName = kafkaNotification.getTopicName(NotificationInterface.NotificationType.HOOK);
String message1 = "This is a test message1";
String message2 = "This is a test message2";
Future returnValue1 = mock(Future.class);
when(returnValue1.get()).thenThrow(new RuntimeException("Simulating exception"));
Future returnValue2 = mock(Future.class);
when(returnValue2.get()).thenThrow(new RuntimeException("Simulating exception"));
ProducerRecord expectedRecord1 = new ProducerRecord(topicName, message1);
when(producer.send(expectedRecord1)).thenReturn(returnValue1);
ProducerRecord expectedRecord2 = new ProducerRecord(topicName, message2);
when(producer.send(expectedRecord2)).thenReturn(returnValue1);
try {
kafkaNotification.sendInternalToProducer(producer,
NotificationInterface.NotificationType.HOOK, new String[]{message1, message2});
fail("Should have thrown NotificationException");
} catch (NotificationException e) {
assertEquals(e.getFailedMessages().size(), 2);
assertEquals(e.getFailedMessages().get(0), "This is a test message1");
assertEquals(e.getFailedMessages().get(1), "This is a test message2");
}
}
class TestKafkaNotification extends KafkaNotification {
private final ConsumerConnector consumerConnector;
private final KafkaConsumer consumer1;
private final KafkaConsumer consumer2;
TestKafkaNotification(Properties properties, ConsumerConnector consumerConnector,
KafkaConsumer consumer1, KafkaConsumer consumer2) {
super(properties);
this.consumerConnector = consumerConnector;
this.consumer1 = consumer1;
this.consumer2 = consumer2;
}
@Override
protected ConsumerConnector createConsumerConnector(Properties consumerProperties) {
return consumerConnector;
}
@Override
protected <T> org.apache.atlas.kafka.KafkaConsumer<T>
createKafkaConsumer(Class<T> type, MessageDeserializer<T> deserializer, KafkaStream stream,
int consumerId, ConsumerConnector connector, boolean autoCommitEnabled) {
if (consumerId == 0) {
return consumer1;
} else if (consumerId == 1) {
return consumer2;
}
return null;
}
}
}
...@@ -262,6 +262,11 @@ public class AbstractNotificationConsumerTest { ...@@ -262,6 +262,11 @@ public class AbstractNotificationConsumerTest {
public void commit() { public void commit() {
// do nothing. // do nothing.
} }
@Override
public void close() {
//do nothing
}
} }
private static final class TestDeserializer<T> extends VersionedMessageDeserializer<T> { private static final class TestDeserializer<T> extends VersionedMessageDeserializer<T> {
......
...@@ -75,16 +75,16 @@ public final class AtlasPluginClassLoader extends URLClassLoader { ...@@ -75,16 +75,16 @@ public final class AtlasPluginClassLoader extends URLClassLoader {
@Override @Override
public Class<?> findClass(String name) throws ClassNotFoundException { public Class<?> findClass(String name) throws ClassNotFoundException {
if (LOG.isDebugEnabled()) { if (LOG.isTraceEnabled()) {
LOG.debug("==> AtlasPluginClassLoader.findClass(" + name + ")"); LOG.trace("==> AtlasPluginClassLoader.findClass(" + name + ")");
} }
Class<?> ret = null; Class<?> ret = null;
try { try {
// first try to find the class in pluginClassloader // first try to find the class in pluginClassloader
if (LOG.isDebugEnabled()) { if (LOG.isTraceEnabled()) {
LOG.debug("AtlasPluginClassLoader.findClass(" + name + "): calling pluginClassLoader.findClass()"); LOG.trace("AtlasPluginClassLoader.findClass(" + name + "): calling pluginClassLoader.findClass()");
} }
ret = super.findClass(name); ret = super.findClass(name);
...@@ -93,8 +93,8 @@ public final class AtlasPluginClassLoader extends URLClassLoader { ...@@ -93,8 +93,8 @@ public final class AtlasPluginClassLoader extends URLClassLoader {
MyClassLoader savedClassLoader = getComponentClassLoader(); MyClassLoader savedClassLoader = getComponentClassLoader();
if (savedClassLoader != null) { if (savedClassLoader != null) {
if (LOG.isDebugEnabled()) { if (LOG.isTraceEnabled()) {
LOG.debug( LOG.trace(
"AtlasPluginClassLoader.findClass(" + name + "): calling componentClassLoader.findClass()"); "AtlasPluginClassLoader.findClass(" + name + "): calling componentClassLoader.findClass()");
} }
...@@ -102,8 +102,8 @@ public final class AtlasPluginClassLoader extends URLClassLoader { ...@@ -102,8 +102,8 @@ public final class AtlasPluginClassLoader extends URLClassLoader {
} }
} }
if (LOG.isDebugEnabled()) { if (LOG.isTraceEnabled()) {
LOG.debug("<== AtlasPluginClassLoader.findClass(" + name + "): " + ret); LOG.trace("<== AtlasPluginClassLoader.findClass(" + name + "): " + ret);
} }
return ret; return ret;
...@@ -111,16 +111,16 @@ public final class AtlasPluginClassLoader extends URLClassLoader { ...@@ -111,16 +111,16 @@ public final class AtlasPluginClassLoader extends URLClassLoader {
@Override @Override
public Class<?> loadClass(String name) throws ClassNotFoundException { public Class<?> loadClass(String name) throws ClassNotFoundException {
if (LOG.isDebugEnabled()) { if (LOG.isTraceEnabled()) {
LOG.debug("==> AtlasPluginClassLoader.loadClass(" + name + ")"); LOG.trace("==> AtlasPluginClassLoader.loadClass(" + name + ")");
} }
Class<?> ret = null; Class<?> ret = null;
try { try {
// first try to load the class from pluginClassloader // first try to load the class from pluginClassloader
if (LOG.isDebugEnabled()) { if (LOG.isTraceEnabled()) {
LOG.debug("AtlasPluginClassLoader.loadClass(" + name + "): calling pluginClassLoader.loadClass()"); LOG.trace("AtlasPluginClassLoader.loadClass(" + name + "): calling pluginClassLoader.loadClass()");
} }
ret = super.loadClass(name); ret = super.loadClass(name);
...@@ -129,8 +129,8 @@ public final class AtlasPluginClassLoader extends URLClassLoader { ...@@ -129,8 +129,8 @@ public final class AtlasPluginClassLoader extends URLClassLoader {
MyClassLoader savedClassLoader = getComponentClassLoader(); MyClassLoader savedClassLoader = getComponentClassLoader();
if (savedClassLoader != null) { if (savedClassLoader != null) {
if (LOG.isDebugEnabled()) { if (LOG.isTraceEnabled()) {
LOG.debug( LOG.trace(
"AtlasPluginClassLoader.loadClass(" + name + "): calling componentClassLoader.loadClass()"); "AtlasPluginClassLoader.loadClass(" + name + "): calling componentClassLoader.loadClass()");
} }
...@@ -138,8 +138,8 @@ public final class AtlasPluginClassLoader extends URLClassLoader { ...@@ -138,8 +138,8 @@ public final class AtlasPluginClassLoader extends URLClassLoader {
} }
} }
if (LOG.isDebugEnabled()) { if (LOG.isTraceEnabled()) {
LOG.debug("<== AtlasPluginClassLoader.loadClass(" + name + "): " + ret); LOG.trace("<== AtlasPluginClassLoader.loadClass(" + name + "): " + ret);
} }
return ret; return ret;
......
...@@ -6,6 +6,7 @@ INCOMPATIBLE CHANGES: ...@@ -6,6 +6,7 @@ INCOMPATIBLE CHANGES:
ATLAS-1060 Add composite indexes for exact match performance improvements for all attributes (sumasai via shwethags) ATLAS-1060 Add composite indexes for exact match performance improvements for all attributes (sumasai via shwethags)
ALL CHANGES: ALL CHANGES:
ATLAS-1111 Data loss is observed when atlas is restarted while hive_table metadata ingestion into kafka topic is in-progress(shwethags via sumasai)
ATLAS-1115 Show Tag / Taxonomy Listing in sorted order (Kalyanikashikar via sumasai) ATLAS-1115 Show Tag / Taxonomy Listing in sorted order (Kalyanikashikar via sumasai)
ATLAS-1117 Atlas start fails on trunk (jnhagelb via dkantor) ATLAS-1117 Atlas start fails on trunk (jnhagelb via dkantor)
ATLAS-1112 Hive table GET response from atlas server had duplicate column entries ( ayubkhan, mneethiraj via sumasai) ATLAS-1112 Hive table GET response from atlas server had duplicate column entries ( ayubkhan, mneethiraj via sumasai)
......
...@@ -41,7 +41,7 @@ public class GraphTransactionInterceptor implements MethodInterceptor { ...@@ -41,7 +41,7 @@ public class GraphTransactionInterceptor implements MethodInterceptor {
try { try {
Object response = invocation.proceed(); Object response = invocation.proceed();
titanGraph.commit(); titanGraph.commit();
LOG.debug("graph commit"); LOG.info("graph commit");
return response; return response;
} catch (Throwable t) { } catch (Throwable t) {
titanGraph.rollback(); titanGraph.rollback();
......
...@@ -57,6 +57,19 @@ ...@@ -57,6 +57,19 @@
</logger> </logger>
--> -->
<appender name="FAILED" class="org.apache.log4j.DailyRollingFileAppender">
<param name="File" value="${atlas.log.dir}/failed.log"/>
<param name="Append" value="true"/>
<layout class="org.apache.log4j.PatternLayout">
<param name="ConversionPattern" value="%d %m"/>
</layout>
</appender>
<logger name="FAILED" additivity="false">
<level value="info"/>
<appender-ref ref="AUDIT"/>
</logger>
<logger name="com.thinkaurelius.titan" additivity="false"> <logger name="com.thinkaurelius.titan" additivity="false">
<level value="info"/> <level value="info"/>
<appender-ref ref="console"/> <appender-ref ref="console"/>
......
...@@ -77,7 +77,7 @@ atlas.kafka.bootstrap.servers=localhost:19027 ...@@ -77,7 +77,7 @@ atlas.kafka.bootstrap.servers=localhost:19027
atlas.kafka.data=${sys:atlas.data}/kafka atlas.kafka.data=${sys:atlas.data}/kafka
atlas.kafka.zookeeper.session.timeout.ms=4000 atlas.kafka.zookeeper.session.timeout.ms=4000
atlas.kafka.zookeeper.sync.time.ms=20 atlas.kafka.zookeeper.sync.time.ms=20
atlas.kafka.consumer.timeout.ms=100 atlas.kafka.consumer.timeout.ms=4000
atlas.kafka.auto.commit.interval.ms=100 atlas.kafka.auto.commit.interval.ms=100
atlas.kafka.hook.group.id=atlas atlas.kafka.hook.group.id=atlas
atlas.kafka.entities.group.id=atlas_entities atlas.kafka.entities.group.id=atlas_entities
...@@ -122,4 +122,4 @@ atlas.auth.policy.file=${sys:user.dir}/distro/src/conf/policy-store.txt ...@@ -122,4 +122,4 @@ atlas.auth.policy.file=${sys:user.dir}/distro/src/conf/policy-store.txt
atlas.authentication.method.file=true atlas.authentication.method.file=true
atlas.authentication.method.ldap.type=none atlas.authentication.method.ldap.type=none
atlas.authentication.method.file.filename=${sys:user.dir}/distro/src/conf/users-credentials.properties atlas.authentication.method.file.filename=${sys:user.dir}/distro/src/conf/users-credentials.properties
atlas.authentication.method.kerberos=false atlas.authentication.method.kerberos=false
\ No newline at end of file
...@@ -24,6 +24,7 @@ import com.google.inject.Singleton; ...@@ -24,6 +24,7 @@ import com.google.inject.Singleton;
import kafka.consumer.ConsumerTimeoutException; import kafka.consumer.ConsumerTimeoutException;
import org.apache.atlas.ApplicationProperties; import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasException; import org.apache.atlas.AtlasException;
import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.LocalAtlasClient; import org.apache.atlas.LocalAtlasClient;
import org.apache.atlas.ha.HAConfiguration; import org.apache.atlas.ha.HAConfiguration;
import org.apache.atlas.listener.ActiveStateChangeHandler; import org.apache.atlas.listener.ActiveStateChangeHandler;
...@@ -46,11 +47,18 @@ import java.util.concurrent.atomic.AtomicBoolean; ...@@ -46,11 +47,18 @@ import java.util.concurrent.atomic.AtomicBoolean;
@Singleton @Singleton
public class NotificationHookConsumer implements Service, ActiveStateChangeHandler { public class NotificationHookConsumer implements Service, ActiveStateChangeHandler {
private static final Logger LOG = LoggerFactory.getLogger(NotificationHookConsumer.class); private static final Logger LOG = LoggerFactory.getLogger(NotificationHookConsumer.class);
private static Logger FAILED_LOG = LoggerFactory.getLogger("FAILED");
private static final String THREADNAME_PREFIX = NotificationHookConsumer.class.getSimpleName(); private static final String THREADNAME_PREFIX = NotificationHookConsumer.class.getSimpleName();
public static final String CONSUMER_THREADS_PROPERTY = "atlas.notification.hook.numthreads"; public static final String CONSUMER_THREADS_PROPERTY = "atlas.notification.hook.numthreads";
public static final String CONSUMER_RETRIES_PROPERTY = "atlas.notification.hook.maxretries";
public static final String CONSUMER_FAILEDCACHESIZE_PROPERTY = "atlas.notification.hook.failedcachesize";
public static final int SERVER_READY_WAIT_TIME_MS = 1000; public static final int SERVER_READY_WAIT_TIME_MS = 1000;
private final LocalAtlasClient atlasClient; private final LocalAtlasClient atlasClient;
private final int maxRetries;
private final int failedMsgCacheSize;
private NotificationInterface notificationInterface; private NotificationInterface notificationInterface;
private ExecutorService executors; private ExecutorService executors;
...@@ -58,20 +66,23 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl ...@@ -58,20 +66,23 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
private List<HookConsumer> consumers; private List<HookConsumer> consumers;
@Inject @Inject
public NotificationHookConsumer(NotificationInterface notificationInterface, LocalAtlasClient atlasClient) { public NotificationHookConsumer(NotificationInterface notificationInterface, LocalAtlasClient atlasClient)
throws AtlasException {
this.notificationInterface = notificationInterface; this.notificationInterface = notificationInterface;
this.atlasClient = atlasClient; this.atlasClient = atlasClient;
this.applicationProperties = ApplicationProperties.get();
maxRetries = applicationProperties.getInt(CONSUMER_RETRIES_PROPERTY, 3);
failedMsgCacheSize = applicationProperties.getInt(CONSUMER_FAILEDCACHESIZE_PROPERTY, 20);
} }
@Override @Override
public void start() throws AtlasException { public void start() throws AtlasException {
Configuration configuration = ApplicationProperties.get(); startInternal(applicationProperties, null);
startInternal(configuration, null);
} }
void startInternal(Configuration configuration, void startInternal(Configuration configuration, ExecutorService executorService) {
ExecutorService executorService) {
this.applicationProperties = configuration;
if (consumers == null) { if (consumers == null) {
consumers = new ArrayList<>(); consumers = new ArrayList<>();
} }
...@@ -103,16 +114,16 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl ...@@ -103,16 +114,16 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
@Override @Override
public void stop() { public void stop() {
//Allow for completion of outstanding work //Allow for completion of outstanding work
notificationInterface.close();
try { try {
stopConsumerThreads();
if (executors != null) { if (executors != null) {
stopConsumerThreads(); executors.shutdown();
executors.shutdownNow();
if (!executors.awaitTermination(5000, TimeUnit.MILLISECONDS)) { if (!executors.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
LOG.error("Timed out waiting for consumer threads to shut down, exiting uncleanly"); LOG.error("Timed out waiting for consumer threads to shut down, exiting uncleanly");
} }
executors = null; executors = null;
} }
notificationInterface.close();
} catch (InterruptedException e) { } catch (InterruptedException e) {
LOG.error("Failure in shutting down consumers"); LOG.error("Failure in shutting down consumers");
} }
...@@ -160,6 +171,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl ...@@ -160,6 +171,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
class HookConsumer implements Runnable { class HookConsumer implements Runnable {
private final NotificationConsumer<HookNotification.HookNotificationMessage> consumer; private final NotificationConsumer<HookNotification.HookNotificationMessage> consumer;
private final AtomicBoolean shouldRun = new AtomicBoolean(false); private final AtomicBoolean shouldRun = new AtomicBoolean(false);
private List<HookNotification.HookNotificationMessage> failedMessages = new ArrayList<>();
public HookConsumer(NotificationConsumer<HookNotification.HookNotificationMessage> consumer) { public HookConsumer(NotificationConsumer<HookNotification.HookNotificationMessage> consumer) {
this.consumer = consumer; this.consumer = consumer;
...@@ -193,45 +205,71 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl ...@@ -193,45 +205,71 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
} }
@VisibleForTesting @VisibleForTesting
void handleMessage(HookNotification.HookNotificationMessage message) { void handleMessage(HookNotification.HookNotificationMessage message) throws
atlasClient.setUser(message.getUser()); AtlasServiceException, AtlasException {
try { for (int numRetries = 0; numRetries < maxRetries; numRetries++) {
switch (message.getType()) { LOG.debug("Running attempt {}", numRetries);
case ENTITY_CREATE: try {
HookNotification.EntityCreateRequest createRequest = atlasClient.setUser(message.getUser());
switch (message.getType()) {
case ENTITY_CREATE:
HookNotification.EntityCreateRequest createRequest =
(HookNotification.EntityCreateRequest) message; (HookNotification.EntityCreateRequest) message;
atlasClient.createEntity(createRequest.getEntities()); atlasClient.createEntity(createRequest.getEntities());
break; break;
case ENTITY_PARTIAL_UPDATE: case ENTITY_PARTIAL_UPDATE:
HookNotification.EntityPartialUpdateRequest partialUpdateRequest = HookNotification.EntityPartialUpdateRequest partialUpdateRequest =
(HookNotification.EntityPartialUpdateRequest) message; (HookNotification.EntityPartialUpdateRequest) message;
atlasClient.updateEntity(partialUpdateRequest.getTypeName(), atlasClient.updateEntity(partialUpdateRequest.getTypeName(),
partialUpdateRequest.getAttribute(), partialUpdateRequest.getAttribute(),
partialUpdateRequest.getAttributeValue(), partialUpdateRequest.getEntity()); partialUpdateRequest.getAttributeValue(), partialUpdateRequest.getEntity());
break; break;
case ENTITY_DELETE: case ENTITY_DELETE:
HookNotification.EntityDeleteRequest deleteRequest = HookNotification.EntityDeleteRequest deleteRequest =
(HookNotification.EntityDeleteRequest) message; (HookNotification.EntityDeleteRequest) message;
atlasClient.deleteEntity(deleteRequest.getTypeName(), atlasClient.deleteEntity(deleteRequest.getTypeName(),
deleteRequest.getAttribute(), deleteRequest.getAttribute(),
deleteRequest.getAttributeValue()); deleteRequest.getAttributeValue());
break; break;
case ENTITY_FULL_UPDATE: case ENTITY_FULL_UPDATE:
HookNotification.EntityUpdateRequest updateRequest = HookNotification.EntityUpdateRequest updateRequest =
(HookNotification.EntityUpdateRequest) message; (HookNotification.EntityUpdateRequest) message;
atlasClient.updateEntities(updateRequest.getEntities()); atlasClient.updateEntities(updateRequest.getEntities());
break; break;
default: default:
throw new IllegalStateException("Unhandled exception!"); throw new IllegalStateException("Unhandled exception!");
}
break;
} catch (Throwable e) {
LOG.warn("Error handling message", e);
if (numRetries == (maxRetries - 1)) {
LOG.warn("Max retries exceeded for message {}", message, e);
failedMessages.add(message);
if (failedMessages.size() >= failedMsgCacheSize) {
recordFailedMessages();
}
return;
}
} }
} catch (Exception e) {
//todo handle failures
LOG.warn("Error handling message {}", message, e);
} }
commit();
}
private void recordFailedMessages() {
//logging failed messages
for (HookNotification.HookNotificationMessage message : failedMessages) {
FAILED_LOG.error("[DROPPED_NOTIFICATION] " + AbstractNotification.getMessageJson(message));
}
failedMessages.clear();
}
private void commit() {
recordFailedMessages();
consumer.commit(); consumer.commit();
} }
...@@ -260,6 +298,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl ...@@ -260,6 +298,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
public void stop() { public void stop() {
shouldRun.set(false); shouldRun.set(false);
consumer.close();
} }
} }
} }
...@@ -155,8 +155,11 @@ public class GuiceServletConfig extends GuiceServletContextListener { ...@@ -155,8 +155,11 @@ public class GuiceServletConfig extends GuiceServletContextListener {
@Override @Override
public void contextDestroyed(ServletContextEvent servletContextEvent) { public void contextDestroyed(ServletContextEvent servletContextEvent) {
super.contextDestroyed(servletContextEvent); LOG.info("Starting servlet context destroy");
if(injector != null) { if(injector != null) {
//stop services
stopServices();
TypeLiteral<GraphProvider<TitanGraph>> graphProviderType = new TypeLiteral<GraphProvider<TitanGraph>>() {}; TypeLiteral<GraphProvider<TitanGraph>> graphProviderType = new TypeLiteral<GraphProvider<TitanGraph>>() {};
Provider<GraphProvider<TitanGraph>> graphProvider = injector.getProvider(Key.get(graphProviderType)); Provider<GraphProvider<TitanGraph>> graphProvider = injector.getProvider(Key.get(graphProviderType));
final Graph graph = graphProvider.get().get(); final Graph graph = graphProvider.get().get();
...@@ -166,15 +169,13 @@ public class GuiceServletConfig extends GuiceServletContextListener { ...@@ -166,15 +169,13 @@ public class GuiceServletConfig extends GuiceServletContextListener {
} catch(Throwable t) { } catch(Throwable t) {
LOG.warn("Error while shutting down graph", t); LOG.warn("Error while shutting down graph", t);
} }
//stop services
stopServices();
} }
super.contextDestroyed(servletContextEvent);
} }
protected void stopServices() { protected void stopServices() {
LOG.debug("Stopping services"); LOG.info("Stopping services");
Services services = injector.getInstance(Services.class); Services services = injector.getInstance(Services.class);
services.stop(); services.stop();
} }
} }
\ No newline at end of file
...@@ -63,11 +63,11 @@ ...@@ -63,11 +63,11 @@
</filter-mapping> </filter-mapping>
<listener> <listener>
<listener-class>org.apache.atlas.web.listeners.GuiceServletConfig</listener-class> <listener-class>org.springframework.web.util.Log4jConfigListener</listener-class>
</listener> </listener>
<listener> <listener>
<listener-class>org.springframework.web.util.Log4jConfigListener</listener-class> <listener-class>org.apache.atlas.web.listeners.GuiceServletConfig</listener-class>
</listener> </listener>
<listener> <listener>
...@@ -80,4 +80,4 @@ ...@@ -80,4 +80,4 @@
</web-app> </web-app>
\ No newline at end of file
...@@ -21,11 +21,13 @@ package org.apache.atlas.notification; ...@@ -21,11 +21,13 @@ package org.apache.atlas.notification;
import com.google.inject.Inject; import com.google.inject.Inject;
import org.apache.atlas.AtlasClient; import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasException; import org.apache.atlas.AtlasException;
import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.LocalAtlasClient; import org.apache.atlas.LocalAtlasClient;
import org.apache.atlas.kafka.KafkaNotification; import org.apache.atlas.kafka.KafkaNotification;
import org.apache.atlas.notification.hook.HookNotification; import org.apache.atlas.notification.hook.HookNotification;
import org.apache.atlas.typesystem.Referenceable; import org.apache.atlas.typesystem.Referenceable;
import org.apache.commons.lang.RandomStringUtils; import org.apache.commons.lang.RandomStringUtils;
import org.testng.Assert;
import org.testng.annotations.AfterTest; import org.testng.annotations.AfterTest;
import org.testng.annotations.BeforeTest; import org.testng.annotations.BeforeTest;
import org.testng.annotations.Guice; import org.testng.annotations.Guice;
...@@ -54,7 +56,6 @@ public class NotificationHookConsumerKafkaTest { ...@@ -54,7 +56,6 @@ public class NotificationHookConsumerKafkaTest {
@Test @Test
public void testConsumerConsumesNewMessageWithAutoCommitDisabled() throws AtlasException, InterruptedException { public void testConsumerConsumesNewMessageWithAutoCommitDisabled() throws AtlasException, InterruptedException {
produceMessage(new HookNotification.EntityCreateRequest("test_user1", createEntity())); produceMessage(new HookNotification.EntityCreateRequest("test_user1", createEntity()));
NotificationConsumer<HookNotification.HookNotificationMessage> consumer = NotificationConsumer<HookNotification.HookNotificationMessage> consumer =
...@@ -68,7 +69,6 @@ public class NotificationHookConsumerKafkaTest { ...@@ -68,7 +69,6 @@ public class NotificationHookConsumerKafkaTest {
consumeOneMessage(consumer, hookConsumer); consumeOneMessage(consumer, hookConsumer);
verify(localAtlasClient).setUser("test_user1"); verify(localAtlasClient).setUser("test_user1");
// produce another message, and make sure it moves ahead. If commit succeeded, this would work. // produce another message, and make sure it moves ahead. If commit succeeded, this would work.
produceMessage(new HookNotification.EntityCreateRequest("test_user2", createEntity())); produceMessage(new HookNotification.EntityCreateRequest("test_user2", createEntity()));
consumeOneMessage(consumer, hookConsumer); consumeOneMessage(consumer, hookConsumer);
...@@ -77,10 +77,8 @@ public class NotificationHookConsumerKafkaTest { ...@@ -77,10 +77,8 @@ public class NotificationHookConsumerKafkaTest {
kafkaNotification.close(); kafkaNotification.close();
} }
@Test @Test(dependsOnMethods = "testConsumerConsumesNewMessageWithAutoCommitDisabled")
public void testConsumerRemainsAtSameMessageWithAutoCommitEnabled() public void testConsumerRemainsAtSameMessageWithAutoCommitEnabled() throws Exception {
throws NotificationException, InterruptedException {
produceMessage(new HookNotification.EntityCreateRequest("test_user3", createEntity())); produceMessage(new HookNotification.EntityCreateRequest("test_user3", createEntity()));
NotificationConsumer<HookNotification.HookNotificationMessage> consumer = NotificationConsumer<HookNotification.HookNotificationMessage> consumer =
...@@ -114,7 +112,14 @@ public class NotificationHookConsumerKafkaTest { ...@@ -114,7 +112,14 @@ public class NotificationHookConsumerKafkaTest {
while (!consumer.hasNext()) { while (!consumer.hasNext()) {
Thread.sleep(1000); Thread.sleep(1000);
} }
hookConsumer.handleMessage(consumer.next());
try {
hookConsumer.handleMessage(consumer.next());
} catch (AtlasServiceException e) {
Assert.fail("Consumer failed with exception ", e);
} catch (AtlasException e) {
Assert.fail("Consumer failed with exception ", e);
}
} }
Referenceable createEntity() { Referenceable createEntity() {
......
...@@ -18,10 +18,12 @@ ...@@ -18,10 +18,12 @@
package org.apache.atlas.notification; package org.apache.atlas.notification;
import org.apache.atlas.AtlasClient; import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasException;
import org.apache.atlas.AtlasServiceException; import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.LocalAtlasClient; import org.apache.atlas.LocalAtlasClient;
import org.apache.atlas.ha.HAConfiguration; import org.apache.atlas.ha.HAConfiguration;
import org.apache.atlas.notification.hook.HookNotification; import org.apache.atlas.notification.hook.HookNotification;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.Configuration;
import org.mockito.Mock; import org.mockito.Mock;
import org.mockito.MockitoAnnotations; import org.mockito.MockitoAnnotations;
...@@ -62,7 +64,7 @@ public class NotificationHookConsumerTest { ...@@ -62,7 +64,7 @@ public class NotificationHookConsumerTest {
} }
@Test @Test
public void testConsumerCanProceedIfServerIsReady() throws InterruptedException, AtlasServiceException { public void testConsumerCanProceedIfServerIsReady() throws Exception {
NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasClient); NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasClient);
NotificationHookConsumer.HookConsumer hookConsumer = NotificationHookConsumer.HookConsumer hookConsumer =
notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class)); notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class));
...@@ -75,7 +77,7 @@ public class NotificationHookConsumerTest { ...@@ -75,7 +77,7 @@ public class NotificationHookConsumerTest {
} }
@Test @Test
public void testConsumerWaitsNTimesIfServerIsNotReadyNTimes() throws AtlasServiceException, InterruptedException { public void testConsumerWaitsNTimesIfServerIsNotReadyNTimes() throws Exception {
NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasClient); NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasClient);
NotificationHookConsumer.HookConsumer hookConsumer = NotificationHookConsumer.HookConsumer hookConsumer =
notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class)); notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class));
...@@ -88,7 +90,7 @@ public class NotificationHookConsumerTest { ...@@ -88,7 +90,7 @@ public class NotificationHookConsumerTest {
} }
@Test @Test
public void testCommitIsCalledWhenMessageIsProcessed() throws AtlasServiceException { public void testCommitIsCalledWhenMessageIsProcessed() throws AtlasServiceException, AtlasException {
NotificationHookConsumer notificationHookConsumer = NotificationHookConsumer notificationHookConsumer =
new NotificationHookConsumer(notificationInterface, atlasClient); new NotificationHookConsumer(notificationInterface, atlasClient);
NotificationConsumer consumer = mock(NotificationConsumer.class); NotificationConsumer consumer = mock(NotificationConsumer.class);
...@@ -104,25 +106,22 @@ public class NotificationHookConsumerTest { ...@@ -104,25 +106,22 @@ public class NotificationHookConsumerTest {
} }
@Test @Test
public void testCommitIsCalledEvenWhenMessageProcessingFails() throws AtlasServiceException { public void testCommitIsNotCalledEvenWhenMessageProcessingFails() throws AtlasServiceException, AtlasException {
NotificationHookConsumer notificationHookConsumer = NotificationHookConsumer notificationHookConsumer =
new NotificationHookConsumer(notificationInterface, atlasClient); new NotificationHookConsumer(notificationInterface, atlasClient);
NotificationConsumer consumer = mock(NotificationConsumer.class); NotificationConsumer consumer = mock(NotificationConsumer.class);
NotificationHookConsumer.HookConsumer hookConsumer = NotificationHookConsumer.HookConsumer hookConsumer =
notificationHookConsumer.new HookConsumer(consumer); notificationHookConsumer.new HookConsumer(consumer);
HookNotification.EntityCreateRequest message = mock(HookNotification.EntityCreateRequest.class); HookNotification.EntityCreateRequest message = new HookNotification.EntityCreateRequest("user", new ArrayList<Referenceable>());
when(message.getUser()).thenReturn("user");
when(message.getType()).thenReturn(HookNotification.HookNotificationType.ENTITY_CREATE);
when(atlasClient.createEntity(any(List.class))). when(atlasClient.createEntity(any(List.class))).
thenThrow(new RuntimeException("Simulating exception in processing message")); thenThrow(new RuntimeException("Simulating exception in processing message"));
hookConsumer.handleMessage(message); hookConsumer.handleMessage(message);
verify(consumer).commit(); verifyZeroInteractions(consumer);
} }
@Test @Test
public void testConsumerProceedsWithFalseIfInterrupted() throws AtlasServiceException, InterruptedException { public void testConsumerProceedsWithFalseIfInterrupted() throws Exception {
NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasClient); NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasClient);
NotificationHookConsumer.HookConsumer hookConsumer = NotificationHookConsumer.HookConsumer hookConsumer =
notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class)); notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class));
...@@ -134,7 +133,7 @@ public class NotificationHookConsumerTest { ...@@ -134,7 +133,7 @@ public class NotificationHookConsumerTest {
} }
@Test @Test
public void testConsumerProceedsWithFalseOnAtlasServiceException() throws AtlasServiceException { public void testConsumerProceedsWithFalseOnAtlasServiceException() throws Exception {
NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasClient); NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasClient);
NotificationHookConsumer.HookConsumer hookConsumer = NotificationHookConsumer.HookConsumer hookConsumer =
notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class)); notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class));
...@@ -146,7 +145,7 @@ public class NotificationHookConsumerTest { ...@@ -146,7 +145,7 @@ public class NotificationHookConsumerTest {
} }
@Test @Test
public void testConsumersStartedIfHAIsDisabled() { public void testConsumersStartedIfHAIsDisabled() throws Exception {
when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(false); when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(false);
when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1); when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
List<NotificationConsumer<Object>> consumers = new ArrayList(); List<NotificationConsumer<Object>> consumers = new ArrayList();
...@@ -160,7 +159,7 @@ public class NotificationHookConsumerTest { ...@@ -160,7 +159,7 @@ public class NotificationHookConsumerTest {
} }
@Test @Test
public void testConsumersAreNotStartedIfHAIsEnabled() { public void testConsumersAreNotStartedIfHAIsEnabled() throws Exception {
when(configuration.containsKey(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true); when(configuration.containsKey(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true);
when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true); when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true);
when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1); when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
...@@ -174,7 +173,7 @@ public class NotificationHookConsumerTest { ...@@ -174,7 +173,7 @@ public class NotificationHookConsumerTest {
} }
@Test @Test
public void testConsumersAreStartedWhenInstanceBecomesActive() { public void testConsumersAreStartedWhenInstanceBecomesActive() throws Exception {
when(configuration.containsKey(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true); when(configuration.containsKey(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true);
when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true); when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true);
when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1); when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
...@@ -190,7 +189,7 @@ public class NotificationHookConsumerTest { ...@@ -190,7 +189,7 @@ public class NotificationHookConsumerTest {
} }
@Test @Test
public void testConsumersAreStoppedWhenInstanceBecomesPassive() { public void testConsumersAreStoppedWhenInstanceBecomesPassive() throws Exception {
when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true); when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true);
when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1); when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
List<NotificationConsumer<Object>> consumers = new ArrayList(); List<NotificationConsumer<Object>> consumers = new ArrayList();
...@@ -201,6 +200,6 @@ public class NotificationHookConsumerTest { ...@@ -201,6 +200,6 @@ public class NotificationHookConsumerTest {
notificationHookConsumer.startInternal(configuration, executorService); notificationHookConsumer.startInternal(configuration, executorService);
notificationHookConsumer.instanceIsPassive(); notificationHookConsumer.instanceIsPassive();
verify(notificationInterface).close(); verify(notificationInterface).close();
verify(executorService).shutdownNow(); verify(executorService).shutdown();
} }
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment