Commit 6f9684b4 by ashutoshm

ATLAS-2033: HookConsumer updated to address case where consumer is stopped…

ATLAS-2033: HookConsumer updated to address case where consumer is stopped before starting. Updated unit tests.
parent 02a6e477
......@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
......@@ -97,7 +97,9 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
private NotificationInterface notificationInterface;
private ExecutorService executors;
private Configuration applicationProperties;
private List<HookConsumer> consumers;
@VisibleForTesting
List<HookConsumer> consumers;
@Inject
public NotificationHookConsumer(NotificationInterface notificationInterface, AtlasEntityStore atlasEntityStore,
......@@ -212,6 +214,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
}
}
@VisibleForTesting
class HookConsumer extends ShutdownableThread {
private final NotificationConsumer<HookNotificationMessage> consumer;
private final AtomicBoolean shouldRun = new AtomicBoolean(false);
......@@ -419,6 +422,12 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
public void shutdown() {
LOG.info("==> HookConsumer shutdown()");
// handle the case where thread was not started at all
// and shutdown called
if(shouldRun.get() == false) {
return;
}
super.initiateShutdown();
shouldRun.set(false);
if (consumer != null) {
......@@ -428,7 +437,6 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
LOG.info("<== HookConsumer shutdown()");
}
}
private void audit(String messageUser, String method, String path) {
......
......@@ -33,11 +33,14 @@ import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.web.service.ServiceState;
import org.apache.commons.configuration.Configuration;
import org.apache.kafka.common.TopicPartition;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import org.apache.kafka.common.TopicPartition;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
......@@ -48,7 +51,6 @@ import static org.testng.AssertJUnit.assertFalse;
import static org.testng.AssertJUnit.assertTrue;
public class NotificationHookConsumerTest {
@Mock
private NotificationInterface notificationInterface;
......@@ -126,7 +128,7 @@ public class NotificationHookConsumerTest {
when(message.getEntities()).thenReturn(Arrays.asList(mock));
hookConsumer.handleMessage(new AtlasKafkaMessage(message, -1, -1));
verify(consumer).commit(any(TopicPartition.class),anyInt());
verify(consumer).commit(any(TopicPartition.class), anyInt());
}
@Test
......@@ -138,8 +140,10 @@ public class NotificationHookConsumerTest {
notificationHookConsumer.new HookConsumer(consumer);
HookNotification.EntityCreateRequest message = new HookNotification.EntityCreateRequest("user",
new ArrayList<Referenceable>() {
{ add(mock(Referenceable.class)); }
});
{
add(mock(Referenceable.class));
}
});
when(atlasEntityStore.createOrUpdate(any(EntityStream.class), anyBoolean())).thenThrow(new RuntimeException("Simulating exception in processing message"));
hookConsumer.handleMessage(new AtlasKafkaMessage(message, -1, -1));
......@@ -204,13 +208,44 @@ public class NotificationHookConsumerTest {
@Test
public void testConsumersAreStoppedWhenInstanceBecomesPassive() throws Exception {
when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.ACTIVE);
when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true);
when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
List<NotificationConsumer<Object>> consumers = new ArrayList();
consumers.add(mock(NotificationConsumer.class));
when(notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, 1)).
thenReturn(consumers);
NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
NotificationConsumer notificationConsumerMock = mock(NotificationConsumer.class);
consumers.add(notificationConsumerMock);
when(notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, 1)).thenReturn(consumers);
final NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
doAnswer(new Answer() {
@Override
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
notificationHookConsumer.consumers.get(0).start();
Thread.sleep(500);
return null;
}
}).when(executorService).submit(any(NotificationHookConsumer.HookConsumer.class));
notificationHookConsumer.startInternal(configuration, executorService);
notificationHookConsumer.instanceIsPassive();
verify(notificationInterface).close();
verify(executorService).shutdown();
verify(notificationConsumerMock).wakeup();
}
@Test
public void consumersStoppedBeforeStarting() throws Exception {
when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.ACTIVE);
when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true);
when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
List<NotificationConsumer<Object>> consumers = new ArrayList();
NotificationConsumer notificationConsumerMock = mock(NotificationConsumer.class);
consumers.add(notificationConsumerMock);
when(notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, 1)).thenReturn(consumers);
final NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
notificationHookConsumer.startInternal(configuration, executorService);
notificationHookConsumer.instanceIsPassive();
verify(notificationInterface).close();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment