From 8bde666ba1986f5b6c9e20cad82d6037a6739db9 Mon Sep 17 00:00:00 2001 From: Shwetha GS <sshivalingamurthy@hortonworks.com> Date: Fri, 1 Apr 2016 11:08:39 +0530 Subject: [PATCH] ATLAS-511 Ability to run multiple instances of Atlas Server with automatic failover to one active server (yhemanth via shwethags) --- addons/falcon-bridge/pom.xml | 5 +++++ addons/hive-bridge/pom.xml | 5 +++++ addons/sqoop-bridge/pom.xml | 5 +++++ addons/storm-bridge/pom.xml | 5 +++++ client/src/main/java/org/apache/atlas/AtlasClient.java | 6 ++++++ client/src/test/java/org/apache/atlas/AtlasClientTest.java | 30 ++++++++++++++++++++++++++++++ common/src/main/java/org/apache/atlas/AtlasConstants.java | 2 ++ distro/src/conf/atlas-application.properties | 11 ++++++++++- notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java | 4 ++++ notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java | 7 +++++++ notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java | 100 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++------------ notification/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java | 95 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-------- release-log.txt | 1 + repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java | 39 ++++++++++++++++++++++++++++++++++----- repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java | 41 +++++++++++++++++++++++++++++++++++------ repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java | 62 +++++++++++++++++++++++++++++++++++++++++++++++++++----------- repository/src/test/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepositoryHATest.java | 94 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ repository/src/test/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexerTest.java | 94 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ repository/src/test/java/org/apache/atlas/services/DefaultMetadataServiceMockTest.java | 104 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++--- server-api/pom.xml | 13 +++++++++++++ server-api/src/main/java/org/apache/atlas/ha/HAConfiguration.java | 196 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ server-api/src/main/java/org/apache/atlas/listener/ActiveStateChangeHandler.java | 49 +++++++++++++++++++++++++++++++++++++++++++++++++ server-api/src/main/test/org/apache/atlas/ha/HAConfigurationTest.java | 90 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ typesystem/src/main/java/org/apache/atlas/typesystem/types/TypeSystem.java | 10 +++++++++- typesystem/src/main/resources/atlas-application.properties | 6 +++++- typesystem/src/test/java/org/apache/atlas/typesystem/types/TypeSystemTest.java | 47 ++++++++++++++++++++++++++++++++++++++--------- webapp/src/main/java/org/apache/atlas/Atlas.java | 8 +++++--- webapp/src/main/java/org/apache/atlas/web/filters/ActiveServerFilter.java | 139 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java | 47 +++++++++++++++++++++++++++++++++++++---------- webapp/src/main/java/org/apache/atlas/web/service/ActiveInstanceElectorModule.java | 49 +++++++++++++++++++++++++++++++++++++++++++++++++ webapp/src/main/java/org/apache/atlas/web/service/ActiveInstanceElectorService.java | 197 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ webapp/src/main/java/org/apache/atlas/web/service/ActiveInstanceState.java | 109 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ webapp/src/main/java/org/apache/atlas/web/service/CuratorFactory.java | 94 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ webapp/src/main/java/org/apache/atlas/web/service/ServiceState.java | 96 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ webapp/src/test/java/org/apache/atlas/web/filters/ActiveServerFilterTest.java | 172 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ webapp/src/test/java/org/apache/atlas/web/service/ActiveInstanceElectorServiceTest.java | 364 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ webapp/src/test/java/org/apache/atlas/web/service/ActiveInstanceStateTest.java | 137 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ webapp/src/test/java/org/apache/atlas/web/service/ServiceStateTest.java | 67 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 38 files changed, 2530 insertions(+), 70 deletions(-) create mode 100644 repository/src/test/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepositoryHATest.java create mode 100644 repository/src/test/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexerTest.java create mode 100644 server-api/src/main/java/org/apache/atlas/ha/HAConfiguration.java create mode 100644 server-api/src/main/java/org/apache/atlas/listener/ActiveStateChangeHandler.java create mode 100644 server-api/src/main/test/org/apache/atlas/ha/HAConfigurationTest.java create mode 100644 webapp/src/main/java/org/apache/atlas/web/filters/ActiveServerFilter.java create mode 100644 webapp/src/main/java/org/apache/atlas/web/service/ActiveInstanceElectorModule.java create mode 100644 webapp/src/main/java/org/apache/atlas/web/service/ActiveInstanceElectorService.java create mode 100644 webapp/src/main/java/org/apache/atlas/web/service/ActiveInstanceState.java create mode 100644 webapp/src/main/java/org/apache/atlas/web/service/CuratorFactory.java create mode 100644 webapp/src/main/java/org/apache/atlas/web/service/ServiceState.java create mode 100644 webapp/src/test/java/org/apache/atlas/web/filters/ActiveServerFilterTest.java create mode 100644 webapp/src/test/java/org/apache/atlas/web/service/ActiveInstanceElectorServiceTest.java create mode 100644 webapp/src/test/java/org/apache/atlas/web/service/ActiveInstanceStateTest.java create mode 100644 webapp/src/test/java/org/apache/atlas/web/service/ServiceStateTest.java diff --git a/addons/falcon-bridge/pom.xml b/addons/falcon-bridge/pom.xml index ad345c5..afbc150 100644 --- a/addons/falcon-bridge/pom.xml +++ b/addons/falcon-bridge/pom.xml @@ -151,6 +151,11 @@ <version>${project.version}</version> </artifactItem> <artifactItem> + <groupId>${project.groupId}</groupId> + <artifactId>atlas-server-api</artifactId> + <version>${project.version}</version> + </artifactItem> + <artifactItem> <groupId>org.scala-lang</groupId> <artifactId>scala-compiler</artifactId> <version>${scala.version}</version> diff --git a/addons/hive-bridge/pom.xml b/addons/hive-bridge/pom.xml index 8bfbb13..720b6d1 100755 --- a/addons/hive-bridge/pom.xml +++ b/addons/hive-bridge/pom.xml @@ -229,6 +229,11 @@ <version>${project.version}</version> </artifactItem> <artifactItem> + <groupId>${project.groupId}</groupId> + <artifactId>atlas-server-api</artifactId> + <version>${project.version}</version> + </artifactItem> + <artifactItem> <groupId>org.scala-lang</groupId> <artifactId>scala-compiler</artifactId> <version>${scala.version}</version> diff --git a/addons/sqoop-bridge/pom.xml b/addons/sqoop-bridge/pom.xml index 343bb4e..4b5dbb1 100644 --- a/addons/sqoop-bridge/pom.xml +++ b/addons/sqoop-bridge/pom.xml @@ -234,6 +234,11 @@ <version>${project.version}</version> </artifactItem> <artifactItem> + <groupId>${project.groupId}</groupId> + <artifactId>atlas-server-api</artifactId> + <version>${project.version}</version> + </artifactItem> + <artifactItem> <groupId>org.scala-lang</groupId> <artifactId>scala-compiler</artifactId> <version>${scala.version}</version> diff --git a/addons/storm-bridge/pom.xml b/addons/storm-bridge/pom.xml index e3b4ed7..9efa568 100644 --- a/addons/storm-bridge/pom.xml +++ b/addons/storm-bridge/pom.xml @@ -184,6 +184,11 @@ <version>${project.version}</version> </artifactItem> <artifactItem> + <groupId>${project.groupId}</groupId> + <artifactId>atlas-server-api</artifactId> + <version>${project.version}</version> + </artifactItem> + <artifactItem> <groupId>org.scala-lang</groupId> <artifactId>scala-compiler</artifactId> <version>${scala.version}</version> diff --git a/client/src/main/java/org/apache/atlas/AtlasClient.java b/client/src/main/java/org/apache/atlas/AtlasClient.java index 1fc811a..18c0569 100755 --- a/client/src/main/java/org/apache/atlas/AtlasClient.java +++ b/client/src/main/java/org/apache/atlas/AtlasClient.java @@ -145,6 +145,12 @@ public class AtlasClient { return true; } catch (ClientHandlerException che) { return false; + } catch (AtlasServiceException ase) { + if (ase.getStatus().equals(ClientResponse.Status.SERVICE_UNAVAILABLE)) { + LOG.warn("Received SERVICE_UNAVAILABLE, server is not yet ready"); + return false; + } + throw ase; } } diff --git a/client/src/test/java/org/apache/atlas/AtlasClientTest.java b/client/src/test/java/org/apache/atlas/AtlasClientTest.java index 1e7eed1..6e1fbe2 100644 --- a/client/src/test/java/org/apache/atlas/AtlasClientTest.java +++ b/client/src/test/java/org/apache/atlas/AtlasClientTest.java @@ -28,6 +28,7 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import static org.testng.Assert.fail; public class AtlasClientTest { @@ -64,4 +65,33 @@ public class AtlasClientTest { new ClientHandlerException()); assertFalse(atlasClient.isServerReady()); } + + @Test + public void shouldReturnFalseIfServiceIsUnavailable() throws AtlasServiceException { + WebResource webResource = mock(WebResource.class); + AtlasClient atlasClient = new AtlasClient(webResource); + WebResource.Builder builder = setupBuilder(webResource); + ClientResponse response = mock(ClientResponse.class); + when(response.getStatus()).thenReturn(Response.Status.SERVICE_UNAVAILABLE.getStatusCode()); + when(response.getClientResponseStatus()).thenReturn(ClientResponse.Status.SERVICE_UNAVAILABLE); + + when(builder.method(AtlasClient.API.VERSION.getMethod(), ClientResponse.class, null)).thenReturn(response); + + assertFalse(atlasClient.isServerReady()); + } + + @Test(expectedExceptions = AtlasServiceException.class) + public void shouldThrowErrorIfAnyResponseOtherThanServiceUnavailable() throws AtlasServiceException { + WebResource webResource = mock(WebResource.class); + AtlasClient atlasClient = new AtlasClient(webResource); + WebResource.Builder builder = setupBuilder(webResource); + ClientResponse response = mock(ClientResponse.class); + when(response.getStatus()).thenReturn(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode()); + when(response.getClientResponseStatus()).thenReturn(ClientResponse.Status.INTERNAL_SERVER_ERROR); + + when(builder.method(AtlasClient.API.VERSION.getMethod(), ClientResponse.class, null)).thenReturn(response); + + atlasClient.isServerReady(); + fail("Should throw exception"); + } } diff --git a/common/src/main/java/org/apache/atlas/AtlasConstants.java b/common/src/main/java/org/apache/atlas/AtlasConstants.java index 85719c9..950ed6b 100644 --- a/common/src/main/java/org/apache/atlas/AtlasConstants.java +++ b/common/src/main/java/org/apache/atlas/AtlasConstants.java @@ -28,4 +28,6 @@ public final class AtlasConstants { public static final String CLUSTER_NAME_KEY = "atlas.cluster.name"; public static final String DEFAULT_CLUSTER_NAME = "primary"; public static final String CLUSTER_NAME_ATTRIBUTE = "clusterName"; + public static final String SYSTEM_PROPERTY_APP_PORT = "atlas.app.port"; + public static final String DEFAULT_APP_PORT_STR = "21000"; } diff --git a/distro/src/conf/atlas-application.properties b/distro/src/conf/atlas-application.properties index 453435b..00c5d5a 100755 --- a/distro/src/conf/atlas-application.properties +++ b/distro/src/conf/atlas-application.properties @@ -95,4 +95,13 @@ atlas.http.authentication.enabled=false atlas.http.authentication.type=simple ######### Server Properties ######### -atlas.rest.address=http://localhost:21000 \ No newline at end of file +atlas.rest.address=http://localhost:21000 + +######### High Availability Configuration ######## +atlas.server.ha.enabled=false +atlas.server.ids=id1 +atlas.server.address.id1=localhost:21000 +atlas.server.ha.zookeeper.connect=localhost:2181 +atlas.server.ha.zookeeper.retry.sleeptime.ms=1000 +atlas.server.ha.zookeeper.num.retries=3 +atlas.server.ha.zookeeper.session.timeout.ms=20000 \ No newline at end of file diff --git a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java index 2701039..889af11 100644 --- a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java +++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java @@ -127,6 +127,10 @@ public class KafkaNotification extends AbstractNotification implements Service { @Override public void start() throws AtlasException { + if (isHAEnabled()) { + LOG.info("Not starting embedded instances when HA is enabled."); + return; + } if (isEmbedded()) { try { startZk(); diff --git a/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java b/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java index 885242d..596f988 100644 --- a/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java +++ b/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java @@ -18,6 +18,7 @@ package org.apache.atlas.notification; import org.apache.atlas.AtlasException; +import org.apache.atlas.ha.HAConfiguration; import org.apache.commons.configuration.Configuration; import java.util.Arrays; @@ -30,12 +31,14 @@ public abstract class AbstractNotification implements NotificationInterface { private static final String PROPERTY_EMBEDDED = PROPERTY_PREFIX + ".embedded"; private final boolean embedded; + private final boolean isHAEnabled; // ----- Constructors ------------------------------------------------------ public AbstractNotification(Configuration applicationProperties) throws AtlasException { this.embedded = applicationProperties.getBoolean(PROPERTY_EMBEDDED, false); + this.isHAEnabled = HAConfiguration.isHAEnabled(applicationProperties); } @@ -50,6 +53,10 @@ public abstract class AbstractNotification implements NotificationInterface { return embedded; } + protected final boolean isHAEnabled() { + return isHAEnabled; + } + @Override public <T> void send(NotificationType type, List<T> messages) throws NotificationException { String[] strMessages = new String[messages.size()]; diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java index 2fcbcd3..ca53fd2 100644 --- a/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java +++ b/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java @@ -23,6 +23,8 @@ import kafka.consumer.ConsumerTimeoutException; import org.apache.atlas.ApplicationProperties; import org.apache.atlas.AtlasClient; import org.apache.atlas.AtlasException; +import org.apache.atlas.ha.HAConfiguration; +import org.apache.atlas.listener.ActiveStateChangeHandler; import org.apache.atlas.notification.hook.HookNotification; import org.apache.atlas.service.Service; import org.apache.commons.configuration.Configuration; @@ -30,39 +32,69 @@ import org.apache.hadoop.security.UserGroupInformation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; /** * Consumer of notifications from hooks e.g., hive hook etc. */ @Singleton -public class NotificationHookConsumer implements Service { +public class NotificationHookConsumer implements Service, ActiveStateChangeHandler { private static final Logger LOG = LoggerFactory.getLogger(NotificationHookConsumer.class); public static final String CONSUMER_THREADS_PROPERTY = "atlas.notification.hook.numthreads"; public static final String ATLAS_ENDPOINT_PROPERTY = "atlas.rest.address"; public static final int SERVER_READY_WAIT_TIME_MS = 1000; - @Inject private NotificationInterface notificationInterface; private ExecutorService executors; private String atlasEndpoint; + private Configuration applicationProperties; + private List<HookConsumer> consumers; + + @Inject + public NotificationHookConsumer(NotificationInterface notificationInterface) { + this.notificationInterface = notificationInterface; + } @Override public void start() throws AtlasException { - Configuration applicationProperties = ApplicationProperties.get(); + Configuration configuration = ApplicationProperties.get(); + startInternal(configuration, null); + } - atlasEndpoint = applicationProperties.getString(ATLAS_ENDPOINT_PROPERTY, "http://localhost:21000"); + void startInternal(Configuration configuration, + ExecutorService executorService) { + this.applicationProperties = configuration; + this.atlasEndpoint = applicationProperties.getString(ATLAS_ENDPOINT_PROPERTY, "http://localhost:21000"); + if (consumers == null) { + consumers = new ArrayList<>(); + } + if (executorService != null) { + executors = executorService; + } + if (!HAConfiguration.isHAEnabled(configuration)) { + LOG.info("HA is disabled, starting consumers inline."); + startConsumers(executorService); + } + } + + private void startConsumers(ExecutorService executorService) { int numThreads = applicationProperties.getInt(CONSUMER_THREADS_PROPERTY, 1); - List<NotificationConsumer<HookNotification.HookNotificationMessage>> consumers = + List<NotificationConsumer<HookNotification.HookNotificationMessage>> notificationConsumers = notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, numThreads); - executors = Executors.newFixedThreadPool(consumers.size()); - - for (final NotificationConsumer<HookNotification.HookNotificationMessage> consumer : consumers) { - executors.submit(new HookConsumer(consumer)); + if (executorService == null) { + executorService = Executors.newFixedThreadPool(notificationConsumers.size()); + } + executors = executorService; + for (final NotificationConsumer<HookNotification.HookNotificationMessage> consumer : notificationConsumers) { + HookConsumer hookConsumer = new HookConsumer(consumer); + consumers.add(hookConsumer); + executors.submit(hookConsumer); } } @@ -71,14 +103,52 @@ public class NotificationHookConsumer implements Service { //Allow for completion of outstanding work notificationInterface.close(); try { - if (executors != null && !executors.awaitTermination(5000, TimeUnit.MILLISECONDS)) { - LOG.error("Timed out waiting for consumer threads to shut down, exiting uncleanly"); + if (executors != null) { + stopConsumerThreads(); + executors.shutdownNow(); + if (!executors.awaitTermination(5000, TimeUnit.MILLISECONDS)) { + LOG.error("Timed out waiting for consumer threads to shut down, exiting uncleanly"); + } + executors = null; } } catch (InterruptedException e) { LOG.error("Failure in shutting down consumers"); } } + private void stopConsumerThreads() { + if (consumers != null) { + for (HookConsumer consumer : consumers) { + consumer.stop(); + } + consumers.clear(); + } + } + + /** + * Start Kafka consumer threads that read from Kafka topic when server is activated. + * + * Since the consumers create / update entities to the shared backend store, only the active instance + * should perform this activity. Hence, these threads are started only on server activation. + */ + @Override + public void instanceIsActive() { + LOG.info("Reacting to active state: initializing Kafka consumers"); + startConsumers(executors); + } + + /** + * Stop Kafka consumer threads that read from Kafka topic when server is de-activated. + * + * Since the consumers create / update entities to the shared backend store, only the active instance + * should perform this activity. Hence, these threads are stopped only on server deactivation. + */ + @Override + public void instanceIsPassive() { + LOG.info("Reacting to passive state: shutting down Kafka consumers."); + stop(); + } + static class Timer { public void sleep(int interval) throws InterruptedException { Thread.sleep(interval); @@ -87,6 +157,7 @@ public class NotificationHookConsumer implements Service { class HookConsumer implements Runnable { private final NotificationConsumer<HookNotification.HookNotificationMessage> consumer; + private final AtomicBoolean shouldRun = new AtomicBoolean(false); public HookConsumer(NotificationConsumer<HookNotification.HookNotificationMessage> consumer) { this.consumer = consumer; @@ -102,12 +173,13 @@ public class NotificationHookConsumer implements Service { @Override public void run() { + shouldRun.set(true); if (!serverAvailable(new NotificationHookConsumer.Timer())) { return; } - while (true) { + while (shouldRun.get()) { try { if (hasNext()) { HookNotification.HookNotificationMessage message = consumer.next(); @@ -177,5 +249,9 @@ public class NotificationHookConsumer implements Service { LOG.info("Atlas Server is ready, can start reading Kafka events."); return true; } + + public void stop() { + shouldRun.set(false); + } } } diff --git a/notification/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java b/notification/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java index 02255a7..177de6d 100644 --- a/notification/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java +++ b/notification/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java @@ -20,18 +20,43 @@ package org.apache.atlas.notification; import org.apache.atlas.AtlasClient; import org.apache.atlas.AtlasServiceException; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.atlas.ha.HAConfiguration; +import org.apache.commons.configuration.Configuration; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; + import static org.mockito.Mockito.*; import static org.testng.AssertJUnit.assertFalse; import static org.testng.AssertJUnit.assertTrue; public class NotificationHookConsumerTest { + @Mock + private NotificationInterface notificationInterface; + + @Mock + private AtlasClient atlasClient; + + @Mock + private Configuration configuration; + + @Mock + private ExecutorService executorService; + + @BeforeMethod + public void setup() { + MockitoAnnotations.initMocks(this); + } + @Test public void testConsumerCanProceedIfServerIsReady() throws InterruptedException, AtlasServiceException { - final AtlasClient atlasClient = mock(AtlasClient.class); - NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(); + NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface); NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class)) { @Override @@ -49,8 +74,7 @@ public class NotificationHookConsumerTest { @Test public void testConsumerWaitsNTimesIfServerIsNotReadyNTimes() throws AtlasServiceException, InterruptedException { - final AtlasClient atlasClient = mock(AtlasClient.class); - NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(); + NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface); NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class)) { @Override @@ -68,8 +92,7 @@ public class NotificationHookConsumerTest { @Test public void testConsumerProceedsWithFalseIfInterrupted() throws AtlasServiceException, InterruptedException { - final AtlasClient atlasClient = mock(AtlasClient.class); - NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(); + NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface); NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class)) { @Override @@ -86,8 +109,7 @@ public class NotificationHookConsumerTest { @Test public void testConsumerProceedsWithFalseOnAtlasServiceException() throws AtlasServiceException { - final AtlasClient atlasClient = mock(AtlasClient.class); - NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(); + NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface); NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class)) { @Override @@ -101,4 +123,61 @@ public class NotificationHookConsumerTest { assertFalse(hookConsumer.serverAvailable(timer)); } + + @Test + public void testConsumersStartedIfHAIsDisabled() { + when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(false); + when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1); + List<NotificationConsumer<Object>> consumers = new ArrayList(); + consumers.add(mock(NotificationConsumer.class)); + when(notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, 1)). + thenReturn(consumers); + NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface); + notificationHookConsumer.startInternal(configuration, executorService); + verify(notificationInterface).createConsumers(NotificationInterface.NotificationType.HOOK, 1); + verify(executorService).submit(any(NotificationHookConsumer.HookConsumer.class)); + } + + @Test + public void testConsumersAreNotStartedIfHAIsEnabled() { + when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true); + when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1); + List<NotificationConsumer<Object>> consumers = new ArrayList(); + consumers.add(mock(NotificationConsumer.class)); + when(notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, 1)). + thenReturn(consumers); + NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface); + notificationHookConsumer.startInternal(configuration, executorService); + verifyZeroInteractions(notificationInterface); + } + + @Test + public void testConsumersAreStartedWhenInstanceBecomesActive() { + when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true); + when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1); + List<NotificationConsumer<Object>> consumers = new ArrayList(); + consumers.add(mock(NotificationConsumer.class)); + when(notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, 1)). + thenReturn(consumers); + NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface); + notificationHookConsumer.startInternal(configuration, executorService); + notificationHookConsumer.instanceIsActive(); + verify(notificationInterface).createConsumers(NotificationInterface.NotificationType.HOOK, 1); + verify(executorService).submit(any(NotificationHookConsumer.HookConsumer.class)); + } + + @Test + public void testConsumersAreStoppedWhenInstanceBecomesPassive() { + when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true); + when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1); + List<NotificationConsumer<Object>> consumers = new ArrayList(); + consumers.add(mock(NotificationConsumer.class)); + when(notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, 1)). + thenReturn(consumers); + NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface); + notificationHookConsumer.startInternal(configuration, executorService); + notificationHookConsumer.instanceIsPassive(); + verify(notificationInterface).close(); + verify(executorService).shutdownNow(); + } } diff --git a/release-log.txt b/release-log.txt index aaef9e3..87e39e6 100644 --- a/release-log.txt +++ b/release-log.txt @@ -13,6 +13,7 @@ ATLAS-409 Atlas will not import avro tables with schema read from a file (dosset ATLAS-379 Create sqoop and falcon metadata addons (venkatnrangan,bvellanki,sowmyaramesh via shwethags) ALL CHANGES: +ATLAS-511 Ability to run multiple instances of Atlas Server with automatic failover to one active server (yhemanth via shwethags) ATLAS-577 Integrate entity audit with DefaultMetadataService (shwethags) ATLAS-588 import-hive.sh fails while importing partitions for a non-partitioned table (sumasai via shwethags) ATLAS-575 jetty-maven-plugin fails with ShutdownMonitorThread already started (shwethags) diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java b/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java index ae6e988..c4329a5 100644 --- a/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java +++ b/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java @@ -18,8 +18,12 @@ package org.apache.atlas.repository.audit; +import com.google.common.annotations.VisibleForTesting; +import com.google.inject.Singleton; import org.apache.atlas.ApplicationProperties; import org.apache.atlas.AtlasException; +import org.apache.atlas.ha.HAConfiguration; +import org.apache.atlas.listener.ActiveStateChangeHandler; import org.apache.atlas.service.Service; import org.apache.commons.configuration.Configuration; import org.apache.commons.lang.StringUtils; @@ -59,7 +63,8 @@ import java.util.List; * and only 1 version is kept, there can be just 1 audit event per entity id + timestamp. This is ok for one atlas server. * But if there are more than one atlas servers, we should use server id in the key */ -public class HBaseBasedAuditRepository implements Service, EntityAuditRepository { +@Singleton +public class HBaseBasedAuditRepository implements Service, EntityAuditRepository, ActiveStateChangeHandler { private static final Logger LOG = LoggerFactory.getLogger(HBaseBasedAuditRepository.class); public static final String CONFIG_PREFIX = "atlas.audit"; @@ -237,23 +242,47 @@ public class HBaseBasedAuditRepository implements Service, EntityAuditRepository @Override public void start() throws AtlasException { - Configuration atlasConf = ApplicationProperties.get(); + Configuration configuration = ApplicationProperties.get(); + startInternal(configuration, getHBaseConfiguration(configuration)); + } + + @VisibleForTesting + void startInternal(Configuration atlasConf, + org.apache.hadoop.conf.Configuration hbaseConf) throws AtlasException { String tableNameStr = atlasConf.getString(CONFIG_TABLE_NAME, DEFAULT_TABLE_NAME); tableName = TableName.valueOf(tableNameStr); try { - org.apache.hadoop.conf.Configuration hbaseConf = getHBaseConfiguration(atlasConf); - connection = ConnectionFactory.createConnection(hbaseConf); + connection = createConnection(hbaseConf); } catch (IOException e) { throw new AtlasException(e); } - createTableIfNotExists(); + if (!HAConfiguration.isHAEnabled(atlasConf)) { + LOG.info("HA is disabled. Hence creating table on startup."); + createTableIfNotExists(); + } + } + + @VisibleForTesting + protected Connection createConnection(org.apache.hadoop.conf.Configuration hbaseConf) throws IOException { + return ConnectionFactory.createConnection(hbaseConf); } @Override public void stop() throws AtlasException { close(connection); } + + @Override + public void instanceIsActive() throws AtlasException { + LOG.info("Reacting to active: Creating HBase table for Audit if required."); + createTableIfNotExists(); + } + + @Override + public void instanceIsPassive() { + LOG.info("Reacting to passive: No action for now."); + } } diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java index 7eccc58..e7e8fb9 100755 --- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java +++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java @@ -26,12 +26,15 @@ import com.thinkaurelius.titan.core.schema.TitanGraphIndex; import com.thinkaurelius.titan.core.schema.TitanManagement; import com.tinkerpop.blueprints.Edge; import com.tinkerpop.blueprints.Vertex; +import org.apache.atlas.ApplicationProperties; import org.apache.atlas.AtlasException; import org.apache.atlas.discovery.SearchIndexer; +import org.apache.atlas.ha.HAConfiguration; import org.apache.atlas.repository.Constants; import org.apache.atlas.repository.IndexCreationException; import org.apache.atlas.repository.IndexException; import org.apache.atlas.repository.RepositoryException; +import org.apache.atlas.listener.ActiveStateChangeHandler; import org.apache.atlas.typesystem.types.AttributeInfo; import org.apache.atlas.typesystem.types.ClassType; import org.apache.atlas.typesystem.types.DataTypes; @@ -39,6 +42,7 @@ import org.apache.atlas.typesystem.types.IDataType; import org.apache.atlas.typesystem.types.Multiplicity; import org.apache.atlas.typesystem.types.StructType; import org.apache.atlas.typesystem.types.TraitType; +import org.apache.commons.configuration.Configuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,7 +57,7 @@ import java.util.Map; /** * Adds index for properties of a given type when its added before any instances are added. */ -public class GraphBackedSearchIndexer implements SearchIndexer { +public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChangeHandler { private static final Logger LOG = LoggerFactory.getLogger(GraphBackedSearchIndexer.class); @@ -67,13 +71,16 @@ public class GraphBackedSearchIndexer implements SearchIndexer { @Inject public GraphBackedSearchIndexer(GraphProvider<TitanGraph> graphProvider) throws RepositoryException, - IndexException { + AtlasException { + this(graphProvider, ApplicationProperties.get()); + } + GraphBackedSearchIndexer(GraphProvider<TitanGraph> graphProvider, Configuration configuration) + throws IndexException, RepositoryException { this.titanGraph = graphProvider.get(); - - /* Create the transaction for indexing. - */ - initialize(); + if (!HAConfiguration.isHAEnabled(configuration)) { + initialize(); + } } /** @@ -355,6 +362,28 @@ public class GraphBackedSearchIndexer implements SearchIndexer { } } + /** + * Initialize global indices for Titan graph on server activation. + * + * Since the indices are shared state, we need to do this only from an active instance. + */ + @Override + public void instanceIsActive() throws AtlasException { + LOG.info("Reacting to active: initializing index"); + try { + initialize(); + } catch (RepositoryException e) { + throw new AtlasException("Error in reacting to active on initialization", e); + } catch (IndexException e) { + throw new AtlasException("Error in reacting to active on initialization", e); + } + } + + @Override + public void instanceIsPassive() { + LOG.info("Reacting to passive state: No action right now."); + } + /* Commenting this out since we do not need an index for edge label here private void createEdgeMixedIndex(String propertyName) { EdgeLabel edgeLabel = management.getEdgeLabel(propertyName); diff --git a/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java b/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java index 40728bc..cd1161a 100755 --- a/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java +++ b/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java @@ -22,9 +22,13 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.inject.Provider; + +import org.apache.atlas.ApplicationProperties; import org.apache.atlas.AtlasClient; import org.apache.atlas.AtlasException; import org.apache.atlas.classification.InterfaceAudience; +import org.apache.atlas.ha.HAConfiguration; +import org.apache.atlas.listener.ActiveStateChangeHandler; import org.apache.atlas.listener.EntityChangeListener; import org.apache.atlas.listener.TypesChangeListener; import org.apache.atlas.repository.MetadataRepository; @@ -58,6 +62,7 @@ import org.apache.atlas.typesystem.types.TypeUtils.Pair; import org.apache.atlas.typesystem.types.ValueConversionException; import org.apache.atlas.typesystem.types.utils.TypesUtil; import org.apache.atlas.utils.ParamChecker; +import org.apache.commons.configuration.Configuration; import org.codehaus.jettison.json.JSONArray; import org.codehaus.jettison.json.JSONException; import org.codehaus.jettison.json.JSONObject; @@ -71,13 +76,14 @@ import java.util.Collection; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; /** * Simple wrapper over TypeSystem and MetadataRepository services with hooks * for listening to changes to the repository. */ @Singleton -public class DefaultMetadataService implements MetadataService { +public class DefaultMetadataService implements MetadataService, ActiveStateChangeHandler { private static final Logger LOG = LoggerFactory.getLogger(DefaultMetadataService.class); @@ -89,6 +95,8 @@ public class DefaultMetadataService implements MetadataService { private final Collection<TypesChangeListener> typeChangeListeners = new LinkedHashSet<>(); private final Collection<EntityChangeListener> entityChangeListeners = new LinkedHashSet<>(); + private boolean wasInitialized = false; + @Inject DefaultMetadataService(final MetadataRepository repository, final ITypeStore typeStore, final IBootstrapTypesRegistrar typesRegistrar, @@ -96,14 +104,15 @@ public class DefaultMetadataService implements MetadataService { final Collection<Provider<EntityChangeListener>> entityListenerProviders) throws AtlasException { this(repository, typeStore, typesRegistrar, typeListenerProviders, entityListenerProviders, - TypeSystem.getInstance()); + TypeSystem.getInstance(), ApplicationProperties.get()); } DefaultMetadataService(final MetadataRepository repository, final ITypeStore typeStore, final IBootstrapTypesRegistrar typesRegistrar, final Collection<Provider<TypesChangeListener>> typeListenerProviders, final Collection<Provider<EntityChangeListener>> entityListenerProviders, - final TypeSystem typeSystem) throws AtlasException { + final TypeSystem typeSystem, + final Configuration configuration) throws AtlasException { this.typeStore = typeStore; this.typesRegistrar = typesRegistrar; this.typeSystem = typeSystem; @@ -117,25 +126,37 @@ public class DefaultMetadataService implements MetadataService { entityChangeListeners.add(provider.get()); } - restoreTypeSystem(); - - typesRegistrar.registerTypes(ReservedTypesRegistrar.getTypesDir(), typeSystem, this); + if (!HAConfiguration.isHAEnabled(configuration)) { + restoreTypeSystem(); + } } - private void restoreTypeSystem() { + private void restoreTypeSystem() throws AtlasException { LOG.info("Restoring type system from the store"); - try { - TypesDef typesDef = typeStore.restore(); + TypesDef typesDef = typeStore.restore(); + if (!wasInitialized) { + LOG.info("Initializing type system for the first time."); typeSystem.defineTypes(typesDef); // restore types before creating super types createSuperTypes(); - } catch (AtlasException e) { - throw new RuntimeException(e); + typesRegistrar.registerTypes(ReservedTypesRegistrar.getTypesDir(), typeSystem, this); + wasInitialized = true; + } else { + LOG.info("Type system was already initialized, refreshing cache."); + refreshCache(typesDef); } LOG.info("Restored type system from the store"); } + private void refreshCache(TypesDef typesDef) throws AtlasException { + TypeSystem.TransientTypeSystem transientTypeSystem + = typeSystem.createTransientTypeSystem(typesDef, true); + Map<String, IDataType> typesAdded = transientTypeSystem.getTypesAdded(); + LOG.info("Number of types got from transient type system: " + typesAdded.size()); + typeSystem.commitTypes(typesAdded); + } + private static final AttributeDefinition NAME_ATTRIBUTE = TypesUtil.createUniqueRequiredAttrDef("name", DataTypes.STRING_TYPE); private static final AttributeDefinition DESCRIPTION_ATTRIBUTE = @@ -683,4 +704,23 @@ public class DefaultMetadataService implements MetadataService { listener.onEntitiesDeleted(entities); } } + + /** + * Create or restore the {@link TypeSystem} cache on server activation. + * + * When an instance is passive, types could be created outside of its cache by the active instance. + * Hence, when this instance becomes active, it needs to restore the cache from the backend store. + * The first time initialization happens, the indices for these types also needs to be created. + * This must happen only from the active instance, as it updates shared backend state. + */ + @Override + public void instanceIsActive() throws AtlasException { + LOG.info("Reacting to active state: restoring type system"); + restoreTypeSystem(); + } + + @Override + public void instanceIsPassive() { + LOG.info("Reacting to passive state: no action right now"); + } } diff --git a/repository/src/test/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepositoryHATest.java b/repository/src/test/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepositoryHATest.java new file mode 100644 index 0000000..2f7edb4 --- /dev/null +++ b/repository/src/test/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepositoryHATest.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.repository.audit; + +import org.apache.atlas.AtlasException; +import org.apache.atlas.ha.HAConfiguration; +import org.apache.commons.configuration.Configuration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import java.io.IOException; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyZeroInteractions; +import static org.mockito.Mockito.when; + +public class HBaseBasedAuditRepositoryHATest { + + @Mock + private Configuration configuration; + + @Mock + private org.apache.hadoop.conf.Configuration hbaseConf; + + @Mock + private Connection connection; + + @BeforeMethod + public void setup() { + MockitoAnnotations.initMocks(this); + } + + @Test + public void testTableShouldNotBeCreatedOnStartIfHAIsEnabled() throws IOException, AtlasException { + when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true); + when(configuration.getString(HBaseBasedAuditRepository.CONFIG_TABLE_NAME, + HBaseBasedAuditRepository.DEFAULT_TABLE_NAME)). + thenReturn(HBaseBasedAuditRepository.DEFAULT_TABLE_NAME); + HBaseBasedAuditRepository auditRepository = new HBaseBasedAuditRepository() { + @Override + protected Connection createConnection(org.apache.hadoop.conf.Configuration hbaseConf) { + return connection; + } + }; + auditRepository.startInternal(configuration, hbaseConf); + + verifyZeroInteractions(connection); + } + + @Test + public void testShouldCreateTableWhenReactingToActive() throws AtlasException, IOException { + when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true); + when(configuration.getString(HBaseBasedAuditRepository.CONFIG_TABLE_NAME, + HBaseBasedAuditRepository.DEFAULT_TABLE_NAME)). + thenReturn(HBaseBasedAuditRepository.DEFAULT_TABLE_NAME); + TableName tableName = TableName.valueOf(HBaseBasedAuditRepository.DEFAULT_TABLE_NAME); + Admin admin = mock(Admin.class); + when(connection.getAdmin()).thenReturn(admin); + when(admin.tableExists(tableName)).thenReturn(true); + HBaseBasedAuditRepository auditRepository = new HBaseBasedAuditRepository() { + @Override + protected Connection createConnection(org.apache.hadoop.conf.Configuration hbaseConf) { + return connection; + } + }; + auditRepository.startInternal(configuration, hbaseConf); + auditRepository.instanceIsActive(); + + verify(connection).getAdmin(); + verify(admin).tableExists(tableName); + } +} diff --git a/repository/src/test/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexerTest.java b/repository/src/test/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexerTest.java new file mode 100644 index 0000000..87fdf87 --- /dev/null +++ b/repository/src/test/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexerTest.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.repository.graph; + +import com.thinkaurelius.titan.core.TitanGraph; +import com.thinkaurelius.titan.core.schema.TitanManagement; +import org.apache.atlas.AtlasException; +import org.apache.atlas.ha.HAConfiguration; +import org.apache.atlas.repository.Constants; +import org.apache.atlas.repository.IndexException; +import org.apache.atlas.repository.RepositoryException; +import org.apache.commons.configuration.Configuration; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyZeroInteractions; +import static org.mockito.Mockito.when; + +public class GraphBackedSearchIndexerTest { + + @Mock + private Configuration configuration; + + @Mock + private GraphProvider<TitanGraph> graphProvider; + + @Mock + private TitanGraph titanGraph; + + @Mock + private TitanManagement titanManagement; + + @BeforeMethod + public void setup() { + MockitoAnnotations.initMocks(this); + } + + @Test + public void testSearchIndicesAreInitializedOnConstructionWhenHAIsDisabled() throws IndexException, RepositoryException { + when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(false); + when(graphProvider.get()).thenReturn(titanGraph); + when(titanGraph.getManagementSystem()).thenReturn(titanManagement); + when(titanManagement.containsPropertyKey(Constants.VERTEX_TYPE_PROPERTY_KEY)).thenReturn(true); + + GraphBackedSearchIndexer graphBackedSearchIndexer = new GraphBackedSearchIndexer(graphProvider, configuration); + + verify(titanManagement).containsPropertyKey(Constants.VERTEX_TYPE_PROPERTY_KEY); + } + + @Test + public void testSearchIndicesAreNotInitializedOnConstructionWhenHAIsEnabled() throws IndexException, RepositoryException { + when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true); + when(graphProvider.get()).thenReturn(titanGraph); + when(titanGraph.getManagementSystem()).thenReturn(titanManagement); + when(titanManagement.containsPropertyKey(Constants.VERTEX_TYPE_PROPERTY_KEY)).thenReturn(true); + + GraphBackedSearchIndexer graphBackedSearchIndexer = new GraphBackedSearchIndexer(graphProvider, configuration); + + verifyZeroInteractions(titanManagement); + + } + + @Test + public void testIndicesAreReinitializedWhenServerBecomesActive() throws AtlasException { + when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true); + when(graphProvider.get()).thenReturn(titanGraph); + when(titanGraph.getManagementSystem()).thenReturn(titanManagement); + when(titanManagement.containsPropertyKey(Constants.VERTEX_TYPE_PROPERTY_KEY)).thenReturn(true); + + GraphBackedSearchIndexer graphBackedSearchIndexer = new GraphBackedSearchIndexer(graphProvider, configuration); + graphBackedSearchIndexer.instanceIsActive(); + + verify(titanManagement).containsPropertyKey(Constants.VERTEX_TYPE_PROPERTY_KEY); + } +} diff --git a/repository/src/test/java/org/apache/atlas/services/DefaultMetadataServiceMockTest.java b/repository/src/test/java/org/apache/atlas/services/DefaultMetadataServiceMockTest.java index 0685e19..effee2a 100644 --- a/repository/src/test/java/org/apache/atlas/services/DefaultMetadataServiceMockTest.java +++ b/repository/src/test/java/org/apache/atlas/services/DefaultMetadataServiceMockTest.java @@ -25,28 +25,126 @@ import org.apache.atlas.listener.TypesChangeListener; import org.apache.atlas.repository.MetadataRepository; import org.apache.atlas.repository.typestore.ITypeStore; import org.apache.atlas.typesystem.types.TypeSystem; +import org.apache.atlas.ha.HAConfiguration; +import org.apache.atlas.listener.TypesChangeListener; +import org.apache.atlas.repository.MetadataRepository; +import org.apache.atlas.repository.typestore.ITypeStore; +import org.apache.atlas.typesystem.TypesDef; +import org.apache.atlas.typesystem.types.IDataType; +import org.apache.atlas.typesystem.types.TypeSystem; +import org.apache.commons.configuration.Configuration; +import org.mockito.Matchers; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; import java.util.ArrayList; +import java.util.HashMap; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyZeroInteractions; import static org.mockito.Mockito.when; public class DefaultMetadataServiceMockTest { + @Mock + private IBootstrapTypesRegistrar typesRegistrar; + + @Mock + private TypeSystem typeSystem; + + @Mock + private MetadataRepository metadataRepository; + + @Mock + private ITypeStore typeStore; + + @Mock + private Configuration configuration; + + @BeforeMethod + public void setup() { + MockitoAnnotations.initMocks(this); + } + @Test public void testShouldInvokeTypesRegistrarOnCreation() throws AtlasException { - IBootstrapTypesRegistrar typesRegistrar = mock(IBootstrapTypesRegistrar.class); - TypeSystem typeSystem = mock(TypeSystem.class); when(typeSystem.isRegistered(any(String.class))).thenReturn(true); + when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(false); DefaultMetadataService defaultMetadataService = new DefaultMetadataService(mock(MetadataRepository.class), mock(ITypeStore.class), typesRegistrar, new ArrayList<Provider<TypesChangeListener>>(), - new ArrayList<Provider<EntityChangeListener>>(), typeSystem); + new ArrayList<Provider<EntityChangeListener>>(), typeSystem, configuration); + + verify(typesRegistrar).registerTypes(ReservedTypesRegistrar.getTypesDir(), + typeSystem, defaultMetadataService); + } + + @Test + public void testShouldNotRestoreTypesIfHAIsEnabled() throws AtlasException { + when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true); + + DefaultMetadataService defaultMetadataService = new DefaultMetadataService(metadataRepository, + typeStore, + typesRegistrar, new ArrayList<Provider<TypesChangeListener>>(), + new ArrayList<Provider<EntityChangeListener>>(), typeSystem, configuration); + + verifyZeroInteractions(typeStore); + verifyZeroInteractions(typeSystem); + verifyZeroInteractions(typesRegistrar); + } + + @Test + public void testShouldRestoreTypeSystemOnServerActive() throws AtlasException { + when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true); + + TypesDef typesDef = mock(TypesDef.class); + when(typeStore.restore()).thenReturn(typesDef); + when(typeSystem.isRegistered(any(String.class))).thenReturn(true); + + DefaultMetadataService defaultMetadataService = new DefaultMetadataService(metadataRepository, + typeStore, + typesRegistrar, new ArrayList<Provider<TypesChangeListener>>(), + new ArrayList<Provider<EntityChangeListener>>(), typeSystem, configuration); + defaultMetadataService.instanceIsActive(); + verify(typeStore).restore(); + verify(typeSystem).defineTypes(typesDef); verify(typesRegistrar).registerTypes(ReservedTypesRegistrar.getTypesDir(), typeSystem, defaultMetadataService); } + + @Test + public void testShouldOnlyRestoreCacheOnServerActiveIfAlreadyDoneOnce() throws AtlasException { + when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true); + + TypesDef typesDef = mock(TypesDef.class); + when(typeStore.restore()).thenReturn(typesDef); + when(typeSystem.isRegistered(any(String.class))).thenReturn(true); + + TypeSystem.TransientTypeSystem transientTypeSystem = mock(TypeSystem.TransientTypeSystem.class); + HashMap<String, IDataType> typesAdded = new HashMap<>(); + when(transientTypeSystem.getTypesAdded()).thenReturn(typesAdded); + when(typeSystem.createTransientTypeSystem(typesDef, true)). + thenReturn(transientTypeSystem); + DefaultMetadataService defaultMetadataService = new DefaultMetadataService(metadataRepository, + typeStore, + typesRegistrar, new ArrayList<Provider<TypesChangeListener>>(), + new ArrayList<Provider<EntityChangeListener>>(), typeSystem, configuration); + + defaultMetadataService.instanceIsActive(); + defaultMetadataService.instanceIsPassive(); + defaultMetadataService.instanceIsActive(); + + verify(typeStore, times(2)).restore(); + verify(typeSystem, times(1)).defineTypes(typesDef); + verify(typesRegistrar, times(1)). + registerTypes(ReservedTypesRegistrar.getTypesDir(), typeSystem, defaultMetadataService); + verify(typeSystem, times(1)).createTransientTypeSystem(typesDef, true); + verify(typeSystem, times(1)).commitTypes(typesAdded); + } } diff --git a/server-api/pom.xml b/server-api/pom.xml index 93a0358..d3e84c4 100644 --- a/server-api/pom.xml +++ b/server-api/pom.xml @@ -47,6 +47,19 @@ <groupId>org.apache.atlas</groupId> <artifactId>atlas-typesystem</artifactId> </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + </dependency> + <dependency> + <groupId>org.apache.atlas</groupId> + <artifactId>atlas-client</artifactId> + </dependency> </dependencies> </project> \ No newline at end of file diff --git a/server-api/src/main/java/org/apache/atlas/ha/HAConfiguration.java b/server-api/src/main/java/org/apache/atlas/ha/HAConfiguration.java new file mode 100644 index 0000000..06977c5 --- /dev/null +++ b/server-api/src/main/java/org/apache/atlas/ha/HAConfiguration.java @@ -0,0 +1,196 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.ha; + +import org.apache.atlas.AtlasConstants; +import org.apache.atlas.AtlasException; +import org.apache.atlas.security.SecurityProperties; +import org.apache.commons.configuration.Configuration; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.net.NetUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetSocketAddress; + +/** + * A wrapper for getting configuration entries related to HighAvailability. + */ +public class HAConfiguration { + + private static final Logger LOG = LoggerFactory.getLogger(HAConfiguration.class); + + public static final String ATLAS_SERVER_HA_PREFIX = "atlas.server.ha"; + public static final String ATLAS_SERVER_HA_ENABLED_KEY = ATLAS_SERVER_HA_PREFIX + ".enabled"; + public static final String ATLAS_SERVER_ADDRESS_PREFIX = "atlas.server.address."; + public static final String ATLAS_SERVER_IDS = "atlas.server.ids"; + public static final String HA_ZOOKEEPER_CONNECT = ATLAS_SERVER_HA_PREFIX + ".zookeeper.connect"; + public static final int DEFAULT_ZOOKEEPER_CONNECT_SLEEPTIME_MILLIS = 1000; + public static final String HA_ZOOKEEPER_RETRY_SLEEPTIME_MILLIS = ATLAS_SERVER_HA_PREFIX + ".zookeeper.retry.sleeptime.ms"; + public static final String HA_ZOOKEEPER_NUM_RETRIES = ATLAS_SERVER_HA_PREFIX + ".zookeeper.num.retries"; + public static final int DEFAULT_ZOOKEEPER_CONNECT_NUM_RETRIES = 3; + public static final String HA_ZOOKEEPER_SESSION_TIMEOUT_MS = ATLAS_SERVER_HA_PREFIX + ".zookeeper.session.timeout.ms"; + public static final int DEFAULT_ZOOKEEPER_SESSION_TIMEOUT_MILLIS = 20000; + + /** + * Return whether HA is enabled or not. + * @param configuration underlying configuration instance + * @return + */ + public static boolean isHAEnabled(Configuration configuration) { + return configuration.getBoolean(ATLAS_SERVER_HA_ENABLED_KEY, false); + } + + /** + * Return the ID corresponding to this Atlas instance. + * + * The match is done by looking for an ID configured in {@link HAConfiguration#ATLAS_SERVER_IDS} key + * that has a host:port entry for the key {@link HAConfiguration#ATLAS_SERVER_ADDRESS_PREFIX}+ID where + * the host is a local IP address and port is set in the system property + * {@link AtlasConstants#SYSTEM_PROPERTY_APP_PORT}. + * + * @param configuration + * @return + * @throws AtlasException if no ID is found that maps to a local IP Address or port + */ + public static String getAtlasServerId(Configuration configuration) throws AtlasException { + // ids are already trimmed by this method + String[] ids = configuration.getStringArray(ATLAS_SERVER_IDS); + String matchingServerId = null; + int appPort = Integer.parseInt(System.getProperty(AtlasConstants.SYSTEM_PROPERTY_APP_PORT)); + for (String id : ids) { + String hostPort = configuration.getString(ATLAS_SERVER_ADDRESS_PREFIX +id); + if (!StringUtils.isEmpty(hostPort)) { + InetSocketAddress socketAddress; + try { + socketAddress = NetUtils.createSocketAddr(hostPort); + } catch (Exception e) { + LOG.warn("Exception while trying to get socket address for " + hostPort, e); + continue; + } + if (!socketAddress.isUnresolved() + && NetUtils.isLocalAddress(socketAddress.getAddress()) + && appPort == socketAddress.getPort()) { + LOG.info("Found matched server id " + id + " with host port: " + hostPort); + matchingServerId = id; + break; + } + } else { + LOG.info("Could not find matching address entry for id: " + id); + } + } + if (matchingServerId == null) { + String msg = String.format("Could not find server id for this instance. " + + "Unable to find IDs matching any local host and port binding among %s", + StringUtils.join(ids, ",")); + throw new AtlasException(msg); + } + return matchingServerId; + } + + /** + * Get the web server address that a server instance with the passed ID is bound to. + * + * This method uses the property {@link SecurityProperties#TLS_ENABLED} to determine whether + * the URL is http or https. + * + * @param configuration underlying configuration + * @param serverId serverId whose host:port property is picked to build the web server address. + * @return + */ + public static String getBoundAddressForId(Configuration configuration, String serverId) { + String hostPort = configuration.getString(ATLAS_SERVER_ADDRESS_PREFIX +serverId); + boolean isSecure = configuration.getBoolean(SecurityProperties.TLS_ENABLED); + String protocol = (isSecure) ? "https://" : "http://"; + return protocol + hostPort; + } + + /** + * A collection of Zookeeper specific configuration that is used by High Availability code + */ + public static class ZookeeperProperties { + private String connectString; + private int retriesSleepTimeMillis; + private int numRetries; + private int sessionTimeout; + + public ZookeeperProperties(String connectString, int retriesSleepTimeMillis, int numRetries, + int sessionTimeout) { + this.connectString = connectString; + this.retriesSleepTimeMillis = retriesSleepTimeMillis; + this.numRetries = numRetries; + this.sessionTimeout = sessionTimeout; + } + + public String getConnectString() { + return connectString; + } + + public int getRetriesSleepTimeMillis() { + return retriesSleepTimeMillis; + } + + public int getNumRetries() { + return numRetries; + } + + public int getSessionTimeout() { + return sessionTimeout; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + ZookeeperProperties that = (ZookeeperProperties) o; + + if (retriesSleepTimeMillis != that.retriesSleepTimeMillis) return false; + if (numRetries != that.numRetries) return false; + if (sessionTimeout != that.sessionTimeout) return false; + return !(connectString != null ? !connectString.equals(that.connectString) : that.connectString != null); + + } + + @Override + public int hashCode() { + int result = connectString != null ? connectString.hashCode() : 0; + result = 31 * result + retriesSleepTimeMillis; + result = 31 * result + numRetries; + result = 31 * result + sessionTimeout; + return result; + } + } + + public static ZookeeperProperties getZookeeperProperties(Configuration configuration) { + String zookeeperConnectString = configuration.getString("atlas.kafka.zookeeper.connect"); + if (configuration.containsKey(HA_ZOOKEEPER_CONNECT)) { + zookeeperConnectString = configuration.getString(HA_ZOOKEEPER_CONNECT); + } + + int retriesSleepTimeMillis = configuration.getInt(HA_ZOOKEEPER_RETRY_SLEEPTIME_MILLIS, + DEFAULT_ZOOKEEPER_CONNECT_SLEEPTIME_MILLIS); + + int numRetries = configuration.getInt(HA_ZOOKEEPER_NUM_RETRIES, DEFAULT_ZOOKEEPER_CONNECT_NUM_RETRIES); + + int sessionTimeout = configuration.getInt(HA_ZOOKEEPER_SESSION_TIMEOUT_MS, + DEFAULT_ZOOKEEPER_SESSION_TIMEOUT_MILLIS); + return new ZookeeperProperties(zookeeperConnectString, retriesSleepTimeMillis, numRetries, sessionTimeout); + } +} diff --git a/server-api/src/main/java/org/apache/atlas/listener/ActiveStateChangeHandler.java b/server-api/src/main/java/org/apache/atlas/listener/ActiveStateChangeHandler.java new file mode 100644 index 0000000..87a69ef --- /dev/null +++ b/server-api/src/main/java/org/apache/atlas/listener/ActiveStateChangeHandler.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.listener; + +import org.apache.atlas.AtlasException; + +/** + * An interface that should be implemented by objects and services to react to changes in state of an Atlas server. + * + * The two state transitions we handle are (1) becoming active and (2) becoming passive. + */ +public interface ActiveStateChangeHandler { + + /** + * Callback that is invoked on an implementor when this instance of Atlas server is declared the leader. + * + * Any initialization that must be carried out by an implementor only when the server becomes active + * should happen on this callback. + * + * @throws {@link AtlasException} if anything is wrong on initialization + */ + void instanceIsActive() throws AtlasException; + + /** + * Callback that is invoked on an implementor when this instance of Atlas server is removed as the leader. + * + * Any cleanup that must be carried out by an implementor when the server becomes passive + * should happen on this callback. + * + * @throws {@link AtlasException} if anything is wrong on shutdown + */ + void instanceIsPassive() throws AtlasException; +} diff --git a/server-api/src/main/test/org/apache/atlas/ha/HAConfigurationTest.java b/server-api/src/main/test/org/apache/atlas/ha/HAConfigurationTest.java new file mode 100644 index 0000000..a7c9f37 --- /dev/null +++ b/server-api/src/main/test/org/apache/atlas/ha/HAConfigurationTest.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.ha; + +import org.apache.atlas.AtlasConstants; +import org.apache.atlas.AtlasException; +import org.apache.atlas.security.SecurityProperties; +import org.apache.commons.configuration.Configuration; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.fail; + +public class HAConfigurationTest { + + @Mock + private Configuration configuration; + + @BeforeMethod + public void setup() { + MockitoAnnotations.initMocks(this); + System.setProperty(AtlasConstants.SYSTEM_PROPERTY_APP_PORT, AtlasConstants.DEFAULT_APP_PORT_STR); + } + + @Test + public void testShouldSelectRightServerAddress() throws AtlasException { + when(configuration.getStringArray(HAConfiguration.ATLAS_SERVER_IDS)).thenReturn(new String[] {"id1", "id2"}); + when(configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +"id1")).thenReturn("127.0.0.1:31000"); + when(configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +"id2")).thenReturn("127.0.0.1:21000"); + + String atlasServerId = HAConfiguration.getAtlasServerId(configuration); + assertEquals(atlasServerId, "id2"); + } + + @Test(expectedExceptions = AtlasException.class) + public void testShouldFailIfNoIDsConfiguration() throws AtlasException { + when(configuration.getStringArray(HAConfiguration.ATLAS_SERVER_IDS)).thenReturn(new String[] {}); + HAConfiguration.getAtlasServerId(configuration); + fail("Should not return any server id if IDs not found in configuration"); + } + + @Test(expectedExceptions = AtlasException.class) + public void testShouldFailIfNoMatchingAddressForID() throws AtlasException { + when(configuration.getStringArray(HAConfiguration.ATLAS_SERVER_IDS)).thenReturn(new String[] {"id1", "id2"}); + when(configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +"id1")).thenReturn("127.0.0.1:31000"); + + HAConfiguration.getAtlasServerId(configuration); + fail("Should not return any server id if no matching address found for any ID"); + } + + @Test + public void testShouldReturnHTTPBoundAddress() { + when(configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +"id1")).thenReturn("127.0.0.1:21000"); + when(configuration.getBoolean(SecurityProperties.TLS_ENABLED)).thenReturn(false); + + String address = HAConfiguration.getBoundAddressForId(configuration, "id1"); + + assertEquals(address, "http://127.0.0.1:21000"); + } + + @Test + public void testShouldReturnHTTPSBoundAddress() { + when(configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +"id1")).thenReturn("127.0.0.1:21443"); + when(configuration.getBoolean(SecurityProperties.TLS_ENABLED)).thenReturn(true); + + String address = HAConfiguration.getBoundAddressForId(configuration, "id1"); + + assertEquals(address, "https://127.0.0.1:21443"); + } +} diff --git a/typesystem/src/main/java/org/apache/atlas/typesystem/types/TypeSystem.java b/typesystem/src/main/java/org/apache/atlas/typesystem/types/TypeSystem.java index b41f3db..402800e 100755 --- a/typesystem/src/main/java/org/apache/atlas/typesystem/types/TypeSystem.java +++ b/typesystem/src/main/java/org/apache/atlas/typesystem/types/TypeSystem.java @@ -27,11 +27,14 @@ import org.apache.atlas.classification.InterfaceAudience; import org.apache.atlas.typesystem.TypesDef; import org.apache.atlas.typesystem.exception.TypeExistsException; import org.apache.atlas.typesystem.exception.TypeNotFoundException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.inject.Singleton; import java.lang.reflect.Constructor; import java.text.SimpleDateFormat; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -43,6 +46,8 @@ import java.util.concurrent.ConcurrentHashMap; @Singleton @InterfaceAudience.Private public class TypeSystem { + private static final Logger LOG = LoggerFactory.getLogger(TypeSystem.class); + private static final TypeSystem INSTANCE = new TypeSystem(); private static ThreadLocal<SimpleDateFormat> dateFormat = new ThreadLocal<SimpleDateFormat>() { @Override @@ -333,7 +338,10 @@ public class TypeSystem { IDataType type = typeEntry.getValue(); //Add/replace the new type in the typesystem types.put(typeName, type); - typeCategoriesToTypeNamesMap.put(type.getTypeCategory(), typeName); + // ArrayListMultiMap allows duplicates - we want to avoid this during re-activation. + if (!typeCategoriesToTypeNamesMap.containsEntry(type.getTypeCategory(), typeName)) { + typeCategoriesToTypeNamesMap.put(type.getTypeCategory(), typeName); + } } } diff --git a/typesystem/src/main/resources/atlas-application.properties b/typesystem/src/main/resources/atlas-application.properties index 9a32e04..f753785 100644 --- a/typesystem/src/main/resources/atlas-application.properties +++ b/typesystem/src/main/resources/atlas-application.properties @@ -87,4 +87,8 @@ atlas.server.https.port=31443 hbase.security.authentication=simple -atlas.hook.falcon.synchronous=true \ No newline at end of file +atlas.hook.falcon.synchronous=true +######### High Availability Configuration ######## +atlas.server.ha.enabled=false +atlas.server.ids=id1 +atlas.server.address.id1=localhost:21000 diff --git a/typesystem/src/test/java/org/apache/atlas/typesystem/types/TypeSystemTest.java b/typesystem/src/test/java/org/apache/atlas/typesystem/types/TypeSystemTest.java index f9f5f21..a3be4c5 100755 --- a/typesystem/src/test/java/org/apache/atlas/typesystem/types/TypeSystemTest.java +++ b/typesystem/src/test/java/org/apache/atlas/typesystem/types/TypeSystemTest.java @@ -32,12 +32,14 @@ import org.testng.annotations.Test; import scala.actors.threadpool.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; import static org.apache.atlas.typesystem.types.utils.TypesUtil.createClassTypeDef; import static org.apache.atlas.typesystem.types.utils.TypesUtil.createRequiredAttrDef; import static org.apache.atlas.typesystem.types.utils.TypesUtil.createStructTypeDef; import static org.apache.atlas.typesystem.types.utils.TypesUtil.createTraitTypeDef; +import static org.testng.Assert.assertTrue; public class TypeSystemTest extends BaseTest { @@ -55,7 +57,7 @@ public class TypeSystemTest extends BaseTest { public void testGetTypeNames() throws Exception { getTypeSystem().defineEnumType("enum_test", new EnumValue("0", 0), new EnumValue("1", 1), new EnumValue("2", 2), new EnumValue("3", 3)); - Assert.assertTrue(getTypeSystem().getTypeNames().contains("enum_test")); + assertTrue(getTypeSystem().getTypeNames().contains("enum_test")); } @Test @@ -65,7 +67,7 @@ public class TypeSystemTest extends BaseTest { String typeDescription = typeName + description; getTypeSystem().defineEnumType(typeName, typeDescription, new EnumValue("0", 0), new EnumValue("1", 1), new EnumValue("2", 2), new EnumValue("3", 3)); - Assert.assertTrue(getTypeSystem().getTypeNames().contains(typeName)); + assertTrue(getTypeSystem().getTypeNames().contains(typeName)); IDataType type = getTypeSystem().getDataType(EnumType.class, typeName); Assert.assertNotNull(type); Assert.assertEquals(type.getDescription(), typeDescription); @@ -76,7 +78,7 @@ public class TypeSystemTest extends BaseTest { .createTraitTypeDef(typeName, typeDescription, ImmutableSet.<String>of(), TypesUtil.createRequiredAttrDef("type", DataTypes.STRING_TYPE)); getTypeSystem().defineTraitType(trait); - Assert.assertTrue(getTypeSystem().getTypeNames().contains(typeName)); + assertTrue(getTypeSystem().getTypeNames().contains(typeName)); type = getTypeSystem().getDataType(TraitType.class, typeName); Assert.assertNotNull(type); Assert.assertEquals(type.getDescription(), typeDescription); @@ -87,7 +89,7 @@ public class TypeSystemTest extends BaseTest { .createClassTypeDef(typeName, typeDescription, ImmutableSet.<String>of(), TypesUtil.createRequiredAttrDef("type", DataTypes.STRING_TYPE)); getTypeSystem().defineClassType(classType); - Assert.assertTrue(getTypeSystem().getTypeNames().contains(typeName)); + assertTrue(getTypeSystem().getTypeNames().contains(typeName)); type = getTypeSystem().getDataType(ClassType.class, typeName); Assert.assertNotNull(type); Assert.assertEquals(type.getDescription(), typeDescription); @@ -95,7 +97,7 @@ public class TypeSystemTest extends BaseTest { typeName = "struct_type"; typeDescription = typeName + description; getTypeSystem().defineStructType(typeName, typeDescription, true, createRequiredAttrDef("a", DataTypes.INT_TYPE)); - Assert.assertTrue(getTypeSystem().getTypeNames().contains(typeName)); + assertTrue(getTypeSystem().getTypeNames().contains(typeName)); type = getTypeSystem().getDataType(StructType.class, typeName); Assert.assertNotNull(type); Assert.assertEquals(type.getDescription(), typeDescription); @@ -106,7 +108,7 @@ public class TypeSystemTest extends BaseTest { public void testIsRegistered() throws Exception { getTypeSystem().defineEnumType("enum_test", new EnumValue("0", 0), new EnumValue("1", 1), new EnumValue("2", 2), new EnumValue("3", 3)); - Assert.assertTrue(getTypeSystem().isRegistered("enum_test")); + assertTrue(getTypeSystem().isRegistered("enum_test")); } @Test @@ -182,9 +184,9 @@ public class TypeSystemTest extends BaseTest { ClassType bc = ts.getDataType(ClassType.class, "B"); ClassType cc = ts.getDataType(ClassType.class, "C"); - Assert.assertTrue(ac.compareTo(bc) < 0); - Assert.assertTrue(bc.compareTo(cc) < 0); - Assert.assertTrue(ac.compareTo(cc) < 0); + assertTrue(ac.compareTo(bc) < 0); + assertTrue(bc.compareTo(cc) < 0); + assertTrue(ac.compareTo(cc) < 0); } @Test @@ -223,4 +225,31 @@ public class TypeSystemTest extends BaseTest { Assert.assertEquals(traitNames.size(), 4); Assert.assertEquals(classNames.size(), 3); } + + @Test + public void testTypeNamesAreNotDuplicated() { + TypeSystem typeSystem = getTypeSystem(); + ImmutableList<String> traitNames = typeSystem.getTypeNamesByCategory(DataTypes.TypeCategory.TRAIT); + int numTraits = traitNames.size(); + + HashMap<String, IDataType> typesAdded = new HashMap<>(); + String traitName = "dup_type_test" + random(); + TraitType traitType = new TraitType(typeSystem, traitName, null, null, 0); + typesAdded.put(traitName, traitType); + typeSystem.commitTypes(typesAdded); + + traitNames = typeSystem.getTypeNamesByCategory(DataTypes.TypeCategory.TRAIT); + Assert.assertEquals(traitNames.size(), numTraits+1); + + // add again with another trait this time + traitName = "dup_type_test" + random(); + TraitType traitTypeNew = new TraitType(typeSystem, traitName, null, null, 0); + typesAdded.put(traitName, traitTypeNew); + + typeSystem.commitTypes(typesAdded); + traitNames = typeSystem.getTypeNamesByCategory(DataTypes.TypeCategory.TRAIT); + Assert.assertEquals(traitNames.size(), numTraits+2); + } + + } diff --git a/webapp/src/main/java/org/apache/atlas/Atlas.java b/webapp/src/main/java/org/apache/atlas/Atlas.java index 2d2d619..58c386d 100755 --- a/webapp/src/main/java/org/apache/atlas/Atlas.java +++ b/webapp/src/main/java/org/apache/atlas/Atlas.java @@ -18,6 +18,7 @@ package org.apache.atlas; +import org.apache.atlas.security.SecurityProperties; import org.apache.atlas.web.service.EmbeddedServer; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.GnuParser; @@ -96,10 +97,11 @@ public final class Atlas { setApplicationHome(); Configuration configuration = ApplicationProperties.get(); - final String enableTLSFlag = configuration.getString("atlas.enableTLS"); + final String enableTLSFlag = configuration.getString(SecurityProperties.TLS_ENABLED); final int appPort = getApplicationPort(cmd, enableTLSFlag, configuration); + System.setProperty(AtlasConstants.SYSTEM_PROPERTY_APP_PORT, String.valueOf(appPort)); final boolean enableTLS = isTLSEnabled(enableTLSFlag, appPort); - configuration.setProperty("atlas.enableTLS", String.valueOf(enableTLS)); + configuration.setProperty(SecurityProperties.TLS_ENABLED, String.valueOf(enableTLS)); showStartupInfo(buildConfiguration, enableTLS, appPort); @@ -147,7 +149,7 @@ public final class Atlas { private static boolean isTLSEnabled(String enableTLSFlag, int appPort) { return Boolean.valueOf(StringUtils.isEmpty(enableTLSFlag) ? - System.getProperty("atlas.enableTLS", (appPort % 1000) == 443 ? "true" : "false") : enableTLSFlag); + System.getProperty(SecurityProperties.TLS_ENABLED, (appPort % 1000) == 443 ? "true" : "false") : enableTLSFlag); } private static void showStartupInfo(PropertiesConfiguration buildConfiguration, boolean enableTLS, int appPort) { diff --git a/webapp/src/main/java/org/apache/atlas/web/filters/ActiveServerFilter.java b/webapp/src/main/java/org/apache/atlas/web/filters/ActiveServerFilter.java new file mode 100644 index 0000000..49ab1ba --- /dev/null +++ b/webapp/src/main/java/org/apache/atlas/web/filters/ActiveServerFilter.java @@ -0,0 +1,139 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.web.filters; + +import org.apache.atlas.web.service.ActiveInstanceState; +import org.apache.atlas.web.service.ServiceState; +import org.apache.hadoop.http.HtmlQuoting; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.inject.Inject; +import javax.inject.Singleton; +import javax.servlet.Filter; +import javax.servlet.FilterChain; +import javax.servlet.FilterConfig; +import javax.servlet.ServletException; +import javax.servlet.ServletRequest; +import javax.servlet.ServletResponse; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import javax.ws.rs.HttpMethod; +import javax.ws.rs.core.HttpHeaders; +import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * A servlet {@link Filter} that redirects web requests from a passive Atlas server instance to an active one. + * + * All requests to an active instance pass through. Requests received by a passive instance are redirected + * by identifying the currently active server. Requests to servers which are in transition are returned with + * an error SERVICE_UNAVAILABLE. Identification of this state is carried out using + * {@link ServiceState} and {@link ActiveInstanceState}. + */ +@Singleton +public class ActiveServerFilter implements Filter { + + private static final Logger LOG = LoggerFactory.getLogger(ActiveServerFilter.class); + private final ActiveInstanceState activeInstanceState; + private ServiceState serviceState; + + @Inject + public ActiveServerFilter(ActiveInstanceState activeInstanceState, ServiceState serviceState) { + this.activeInstanceState = activeInstanceState; + this.serviceState = serviceState; + } + + @Override + public void init(FilterConfig filterConfig) throws ServletException { + LOG.info("ActiveServerFilter initialized"); + } + + /** + * Determines if this Atlas server instance is passive and redirects to active if so. + * + * @param servletRequest Request object from which the URL and other parameters are determined. + * @param servletResponse Response object to handle the redirect. + * @param filterChain Chain to pass through requests if the instance is Active. + * @throws IOException + * @throws ServletException + */ + @Override + public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, + FilterChain filterChain) throws IOException, ServletException { + if (isInstanceActive()) { + LOG.debug("Active. Passing request downstream"); + filterChain.doFilter(servletRequest, servletResponse); + } else if (serviceState.isInstanceInTransition()) { + HttpServletResponse httpServletResponse = (HttpServletResponse) servletResponse; + LOG.error("Instance in transition. Service may not be ready to return a result"); + httpServletResponse.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE); + } else { + HttpServletResponse httpServletResponse = (HttpServletResponse) servletResponse; + String activeServerAddress = activeInstanceState.getActiveServerAddress(); + if (activeServerAddress == null) { + LOG.error("Could not retrieve active server address as it is null. Cannot redirect request {}", + ((HttpServletRequest)servletRequest).getRequestURI()); + httpServletResponse.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE); + } else { + handleRedirect((HttpServletRequest) servletRequest, httpServletResponse, activeServerAddress); + } + } + } + + boolean isInstanceActive() { + return serviceState.getState() == ServiceState.ServiceStateValue.ACTIVE; + } + + private void handleRedirect(HttpServletRequest servletRequest, HttpServletResponse httpServletResponse, + String activeServerAddress) throws IOException { + HttpServletRequest httpServletRequest = servletRequest; + String requestURI = httpServletRequest.getRequestURI(); + String queryString = httpServletRequest.getQueryString(); + if ((queryString != null) && (!queryString.isEmpty())) { + requestURI += "?" + queryString; + } + String quotedUri = HtmlQuoting.quoteHtmlChars(requestURI); + if (quotedUri == null) { + quotedUri = "/"; + } + String redirectLocation = activeServerAddress + quotedUri; + LOG.info("Not active. Redirecting to {}", redirectLocation); + // A POST/PUT/DELETE require special handling by sending HTTP 307 instead of the regular 301/302. + // Reference: http://stackoverflow.com/questions/2068418/whats-the-difference-between-a-302-and-a-307-redirect + if (isUnsafeHttpMethod(httpServletRequest)) { + httpServletResponse.setHeader(HttpHeaders.LOCATION, redirectLocation); + httpServletResponse.setStatus(HttpServletResponse.SC_TEMPORARY_REDIRECT); + } else { + httpServletResponse.sendRedirect(redirectLocation); + } + } + + private boolean isUnsafeHttpMethod(HttpServletRequest httpServletRequest) { + String method = httpServletRequest.getMethod(); + return (method.equals(HttpMethod.POST)) || + (method.equals(HttpMethod.PUT)) || + (method.equals(HttpMethod.DELETE)); + } + + @Override + public void destroy() { + + } +} diff --git a/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java b/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java index dac89d7..6bfd780 100755 --- a/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java +++ b/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java @@ -34,11 +34,14 @@ import org.apache.atlas.ApplicationProperties; import org.apache.atlas.AtlasClient; import org.apache.atlas.AtlasException; import org.apache.atlas.RepositoryMetadataModule; +import org.apache.atlas.ha.HAConfiguration; import org.apache.atlas.notification.NotificationModule; import org.apache.atlas.repository.graph.GraphProvider; import org.apache.atlas.service.Services; +import org.apache.atlas.web.filters.ActiveServerFilter; import org.apache.atlas.web.filters.AtlasAuthenticationFilter; import org.apache.atlas.web.filters.AuditFilter; +import org.apache.atlas.web.service.ActiveInstanceElectorModule; import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.ConfigurationException; import org.slf4j.Logger; @@ -72,11 +75,26 @@ public class GuiceServletConfig extends GuiceServletContextListener { LoginProcessor loginProcessor = new LoginProcessor(); loginProcessor.login(); - injector = Guice.createInjector(getRepositoryModule(), new NotificationModule(), - new JerseyServletModule() { + injector = Guice.createInjector(getRepositoryModule(), new ActiveInstanceElectorModule(), + new NotificationModule(), new JerseyServletModule() { + + private Configuration appConfiguration = null; + + private Configuration getConfiguration() { + if (appConfiguration == null) { + try { + appConfiguration = ApplicationProperties.get(); + } catch (AtlasException e) { + LOG.warn("Could not load application configuration", e); + } + } + return appConfiguration; + } + @Override protected void configureServlets() { filter("/*").through(AuditFilter.class); + configureActiveServerFilterIfNecessary(); try { configureAuthenticationFilter(); } catch (ConfigurationException e) { @@ -92,15 +110,24 @@ public class GuiceServletConfig extends GuiceServletContextListener { serve("/" + AtlasClient.BASE_URI + "*").with(GuiceContainer.class, params); } + private void configureActiveServerFilterIfNecessary() { + Configuration configuration = getConfiguration(); + if ((configuration == null) || + !HAConfiguration.isHAEnabled(configuration)) { + LOG.info("HA configuration is disabled, not activating ActiveServerFilter"); + } else { + filter("/*").through(ActiveServerFilter.class); + } + } + private void configureAuthenticationFilter() throws ConfigurationException { - try { - Configuration configuration = ApplicationProperties.get(); - if (Boolean.valueOf(configuration.getString(HTTP_AUTHENTICATION_ENABLED))) { - LOG.info("Enabling AuthenticationFilter"); - filter("/*").through(AtlasAuthenticationFilter.class); - } - } catch (AtlasException e) { - LOG.warn("Error loading configuration and initializing authentication filter", e); + Configuration configuration = getConfiguration(); + if (configuration == null) { + throw new ConfigurationException("Could not load application configuration"); + } + if (Boolean.valueOf(configuration.getString(HTTP_AUTHENTICATION_ENABLED))) { + LOG.info("Enabling AuthenticationFilter"); + filter("/*").through(AtlasAuthenticationFilter.class); } } }); diff --git a/webapp/src/main/java/org/apache/atlas/web/service/ActiveInstanceElectorModule.java b/webapp/src/main/java/org/apache/atlas/web/service/ActiveInstanceElectorModule.java new file mode 100644 index 0000000..d662683 --- /dev/null +++ b/webapp/src/main/java/org/apache/atlas/web/service/ActiveInstanceElectorModule.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.web.service; + +import com.google.inject.AbstractModule; +import com.google.inject.multibindings.Multibinder; +import org.apache.atlas.notification.NotificationHookConsumer; +import org.apache.atlas.repository.audit.HBaseBasedAuditRepository; +import org.apache.atlas.repository.graph.GraphBackedSearchIndexer; +import org.apache.atlas.service.Service; +import org.apache.atlas.listener.ActiveStateChangeHandler; +import org.apache.atlas.services.DefaultMetadataService; +import org.apache.atlas.web.filters.ActiveServerFilter; + +/** + * A Guice module that registers the handlers of High Availability state change handlers and other services. + * + * Any new handler that should react to HA state change should be registered here. + */ +public class ActiveInstanceElectorModule extends AbstractModule { + @Override + protected void configure() { + Multibinder<ActiveStateChangeHandler> activeStateChangeHandlerBinder = + Multibinder.newSetBinder(binder(), ActiveStateChangeHandler.class); + activeStateChangeHandlerBinder.addBinding().to(GraphBackedSearchIndexer.class); + activeStateChangeHandlerBinder.addBinding().to(DefaultMetadataService.class); + activeStateChangeHandlerBinder.addBinding().to(NotificationHookConsumer.class); + activeStateChangeHandlerBinder.addBinding().to(HBaseBasedAuditRepository.class); + + Multibinder<Service> serviceBinder = Multibinder.newSetBinder(binder(), Service.class); + serviceBinder.addBinding().to(ActiveInstanceElectorService.class); + } +} diff --git a/webapp/src/main/java/org/apache/atlas/web/service/ActiveInstanceElectorService.java b/webapp/src/main/java/org/apache/atlas/web/service/ActiveInstanceElectorService.java new file mode 100644 index 0000000..9d7db6d --- /dev/null +++ b/webapp/src/main/java/org/apache/atlas/web/service/ActiveInstanceElectorService.java @@ -0,0 +1,197 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.web.service; + +import com.google.inject.Inject; +import com.google.inject.Provider; +import com.google.inject.Singleton; +import org.apache.atlas.ApplicationProperties; +import org.apache.atlas.AtlasException; +import org.apache.atlas.ha.HAConfiguration; +import org.apache.atlas.listener.ActiveStateChangeHandler; +import org.apache.atlas.service.Service; +import org.apache.commons.configuration.Configuration; +import org.apache.curator.framework.recipes.leader.LeaderLatch; +import org.apache.curator.framework.recipes.leader.LeaderLatchListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; + +/** + * A service that implements leader election to determine whether this Atlas server is Active. + * + * The service implements leader election through <a href="http://curator.apache.org/">Curator</a>'s + * {@link LeaderLatch} recipe. The service also implements {@link LeaderLatchListener} to get + * notified of changes to leadership state. Upon becoming leader, this instance is treated as the + * active Atlas instance and calls {@link ActiveStateChangeHandler}s to activate them. Conversely, + * on being removed from leadership, this instance is treated as a passive instance and calls + * {@link ActiveStateChangeHandler}s to deactivate them. + */ +@Singleton +public class ActiveInstanceElectorService implements Service, LeaderLatchListener { + + private static final Logger LOG = LoggerFactory.getLogger(ActiveInstanceElectorService.class); + + private final Configuration configuration; + private final ServiceState serviceState; + private final ActiveInstanceState activeInstanceState; + private Collection<Provider<ActiveStateChangeHandler>> activeStateChangeHandlerProviders; + private Collection<ActiveStateChangeHandler> activeStateChangeHandlers; + private CuratorFactory curatorFactory; + private LeaderLatch leaderLatch; + private String serverId; + + /** + * Create a new instance of {@link ActiveInstanceElectorService} + * @param activeStateChangeHandlerProviders The list of registered {@link ActiveStateChangeHandler}s that + * must be called back on state changes. + * @throws AtlasException + */ + @Inject + public ActiveInstanceElectorService( + Collection<Provider<ActiveStateChangeHandler>> activeStateChangeHandlerProviders, + CuratorFactory curatorFactory, ActiveInstanceState activeInstanceState, + ServiceState serviceState) + throws AtlasException { + this(ApplicationProperties.get(), activeStateChangeHandlerProviders, + curatorFactory, activeInstanceState, serviceState); + } + + ActiveInstanceElectorService(Configuration configuration, + Collection<Provider<ActiveStateChangeHandler>> activeStateChangeHandlerProviders, + CuratorFactory curatorFactory, ActiveInstanceState activeInstanceState, + ServiceState serviceState) { + this.configuration = configuration; + this.activeStateChangeHandlerProviders = activeStateChangeHandlerProviders; + this.activeStateChangeHandlers = new ArrayList<>(); + this.curatorFactory = curatorFactory; + this.activeInstanceState = activeInstanceState; + this.serviceState = serviceState; + } + + /** + * Join leader election on starting up. + * + * If Atlas High Availability configuration is disabled, this operation is a no-op. + * @throws AtlasException + */ + @Override + public void start() throws AtlasException { + if (!HAConfiguration.isHAEnabled(configuration)) { + LOG.info("HA is not enabled, no need to start leader election service"); + return; + } + cacheActiveStateChangeHandlers(); + serverId = HAConfiguration.getAtlasServerId(configuration); + joinElection(); + } + + private void joinElection() { + LOG.info("Starting leader election for {}", serverId); + leaderLatch = curatorFactory.leaderLatchInstance(serverId); + leaderLatch.addListener(this); + try { + leaderLatch.start(); + LOG.info("Leader latch started for {}.", serverId); + } catch (Exception e) { + LOG.info("Exception while starting leader latch for {}.", serverId, e); + } + } + + /** + * Leave leader election process and clean up resources on shutting down. + * + * If Atlas High Availability configuration is disabled, this operation is a no-op. + * @throws AtlasException + */ + @Override + public void stop() { + if (!HAConfiguration.isHAEnabled(configuration)) { + LOG.info("HA is not enabled, no need to stop leader election service"); + return; + } + try { + leaderLatch.close(); + curatorFactory.close(); + } catch (IOException e) { + LOG.error("Error closing leader latch", e); + } + } + + /** + * Call all registered {@link ActiveStateChangeHandler}s on being elected active. + * + * In addition, shared state information about this instance becoming active is updated + * using {@link ActiveInstanceState}. + */ + @Override + public void isLeader() { + LOG.warn("Server instance with server id {} is elected as leader", serverId); + serviceState.becomingActive(); + try { + for (ActiveStateChangeHandler handler : activeStateChangeHandlers) { + handler.instanceIsActive(); + } + activeInstanceState.update(serverId); + serviceState.setActive(); + } catch (Exception e) { + LOG.error("Got exception while activating", e); + notLeader(); + rejoinElection(); + } + } + + private void cacheActiveStateChangeHandlers() { + if (activeStateChangeHandlers.size()==0) { + for (Provider<ActiveStateChangeHandler> provider : activeStateChangeHandlerProviders) { + ActiveStateChangeHandler handler = provider.get(); + activeStateChangeHandlers.add(handler); + } + } + } + + private void rejoinElection() { + try { + leaderLatch.close(); + joinElection(); + } catch (IOException e) { + LOG.error("Error rejoining election", e); + } + } + + /** + * Call all registered {@link ActiveStateChangeHandler}s on becoming passive instance. + */ + @Override + public void notLeader() { + LOG.warn("Server instance with server id {} is removed as leader", serverId); + serviceState.becomingPassive(); + for (ActiveStateChangeHandler handler: activeStateChangeHandlers) { + try { + handler.instanceIsPassive(); + } catch (AtlasException e) { + LOG.error("Error while reacting to passive state.", e); + } + } + serviceState.setPassive(); + } +} diff --git a/webapp/src/main/java/org/apache/atlas/web/service/ActiveInstanceState.java b/webapp/src/main/java/org/apache/atlas/web/service/ActiveInstanceState.java new file mode 100644 index 0000000..88c3adb --- /dev/null +++ b/webapp/src/main/java/org/apache/atlas/web/service/ActiveInstanceState.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.web.service; + +import com.google.inject.Inject; +import org.apache.atlas.ApplicationProperties; +import org.apache.atlas.AtlasException; +import org.apache.atlas.ha.HAConfiguration; +import org.apache.commons.configuration.Configuration; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.recipes.locks.InterProcessMutex; +import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.data.Stat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.charset.Charset; + +/** + * An object that encapsulates storing and retrieving state related to an Active Atlas server. + * + * The current implementation uses Zookeeper to store and read this state from. It does this + * under a read-write lock implemented using Curator's {@link InterProcessReadWriteLock} to + * provide for safety across multiple processes. + */ +public class ActiveInstanceState { + + private final Configuration configuration; + private final CuratorFactory curatorFactory; + + public static final String APACHE_ATLAS_ACTIVE_SERVER_INFO = "/apache_atlas_active_server_info"; + private static final Logger LOG = LoggerFactory.getLogger(ActiveInstanceState.class); + + /** + * Create a new instance of {@link ActiveInstanceState}. + * @param curatorFactory an instance of {@link CuratorFactory} to get the {@link InterProcessReadWriteLock} + * @throws AtlasException + */ + @Inject + public ActiveInstanceState(CuratorFactory curatorFactory) throws AtlasException { + this(ApplicationProperties.get(), curatorFactory); + } + + /** + * Create a new instance of {@link ActiveInstanceState}. + * @param configuration an instance of {@link Configuration} created from Atlas configuration + * @param curatorFactory an instance of {@link CuratorFactory} to get the {@link InterProcessReadWriteLock} + * @throws AtlasException + */ + public ActiveInstanceState(Configuration configuration, CuratorFactory curatorFactory) { + this.configuration = configuration; + this.curatorFactory = curatorFactory; + } + + /** + * Update state of the active server instance. + * + * This method writes this instance's Server Address to a shared node in Zookeeper. + * This information is used by other passive instances to locate the current active server. + * @throws Exception + * @param serverId ID of this server instance + */ + public void update(String serverId) throws Exception { + CuratorFramework client = curatorFactory.clientInstance(); + String atlasServerAddress = HAConfiguration.getBoundAddressForId(configuration, serverId); + Stat serverInfo = client.checkExists().forPath(APACHE_ATLAS_ACTIVE_SERVER_INFO); + if (serverInfo == null) { + client.create().withMode(CreateMode.EPHEMERAL).forPath(APACHE_ATLAS_ACTIVE_SERVER_INFO); + } + client.setData().forPath(APACHE_ATLAS_ACTIVE_SERVER_INFO, + atlasServerAddress.getBytes(Charset.forName("UTF-8"))); + } + + /** + * Retrieve state of the active server instance. + * + * This method reads the active server location from the shared node in Zookeeper. + * @return the active server's address and port of form http://host-or-ip:port + */ + public String getActiveServerAddress() { + CuratorFramework client = curatorFactory.clientInstance(); + String serverAddress = null; + try { + byte[] bytes = client.getData().forPath(APACHE_ATLAS_ACTIVE_SERVER_INFO); + serverAddress = new String(bytes, Charset.forName("UTF-8")); + } catch (Exception e) { + LOG.error("Error getting active server address", e); + } + return serverAddress; + } + +} diff --git a/webapp/src/main/java/org/apache/atlas/web/service/CuratorFactory.java b/webapp/src/main/java/org/apache/atlas/web/service/CuratorFactory.java new file mode 100644 index 0000000..0bee340 --- /dev/null +++ b/webapp/src/main/java/org/apache/atlas/web/service/CuratorFactory.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.web.service; + +import com.google.inject.Singleton; +import org.apache.atlas.ApplicationProperties; +import org.apache.atlas.AtlasException; +import org.apache.atlas.ha.HAConfiguration; +import org.apache.commons.configuration.Configuration; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.recipes.leader.LeaderLatch; +import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock; +import org.apache.curator.retry.ExponentialBackoffRetry; + +/** + * A factory to create objects related to Curator. + * + * Allows for stubbing in tests. + */ +@Singleton +public class CuratorFactory { + public static final String APACHE_ATLAS_LEADER_ELECTOR_PATH = "/apache_atlas_leader_elector_path"; + + private final Configuration configuration; + private CuratorFramework curatorFramework; + + /** + * Initializes the {@link CuratorFramework} that is used for all interaction with Zookeeper. + * @throws AtlasException + */ + public CuratorFactory() throws AtlasException { + configuration = ApplicationProperties.get(); + initializeCuratorFramework(); + } + + private void initializeCuratorFramework() { + HAConfiguration.ZookeeperProperties zookeeperProperties = + HAConfiguration.getZookeeperProperties(configuration); + curatorFramework = CuratorFrameworkFactory.builder(). + connectString(zookeeperProperties.getConnectString()). + sessionTimeoutMs(zookeeperProperties.getSessionTimeout()). + retryPolicy(new ExponentialBackoffRetry( + zookeeperProperties.getRetriesSleepTimeMillis(), zookeeperProperties.getNumRetries())).build(); + curatorFramework.start(); + } + + /** + * Cleanup resources related to {@link CuratorFramework}. + * + * After this call, no further calls to any curator objects should be done. + */ + public void close() { + curatorFramework.close(); + } + + /** + * Returns a pre-created instance of {@link CuratorFramework}. + * + * This method can be called any number of times to access the {@link CuratorFramework} used in the + * application. + * @return + */ + public CuratorFramework clientInstance() { + return curatorFramework; + } + + /** + * Create a new instance {@link LeaderLatch} + * @param serverId the ID used to register this instance with curator. + * This ID should typically be obtained using + * {@link HAConfiguration#getAtlasServerId(Configuration)} + * @return + */ + public LeaderLatch leaderLatchInstance(String serverId) { + return new LeaderLatch(curatorFramework, APACHE_ATLAS_LEADER_ELECTOR_PATH, serverId); + } +} diff --git a/webapp/src/main/java/org/apache/atlas/web/service/ServiceState.java b/webapp/src/main/java/org/apache/atlas/web/service/ServiceState.java new file mode 100644 index 0000000..2d9e00a --- /dev/null +++ b/webapp/src/main/java/org/apache/atlas/web/service/ServiceState.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.web.service; + +import com.google.common.base.Preconditions; +import com.google.inject.Singleton; +import org.apache.atlas.ApplicationProperties; +import org.apache.atlas.AtlasException; +import org.apache.atlas.ha.HAConfiguration; +import org.apache.commons.configuration.Configuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A class that maintains the state of this instance. + * + * The states are maintained at a granular level, including in-transition states. The transitions are + * directed by {@link ActiveInstanceElectorService}. + */ +@Singleton +public class ServiceState { + + private static final Logger LOG = LoggerFactory.getLogger(ServiceState.class); + + public enum ServiceStateValue { + ACTIVE, + PASSIVE, + BECOMING_ACTIVE, + BECOMING_PASSIVE + } + + private Configuration configuration; + private volatile ServiceStateValue state; + + public ServiceState() throws AtlasException { + this(ApplicationProperties.get()); + } + + public ServiceState(Configuration configuration) { + this.configuration = configuration; + state = !HAConfiguration.isHAEnabled(configuration) ? + ServiceStateValue.ACTIVE : ServiceStateValue.PASSIVE; + } + + public ServiceStateValue getState() { + return state; + } + + public void becomingActive() { + LOG.warn("Instance becoming active from {}", state); + setState(ServiceStateValue.BECOMING_ACTIVE); + } + + private void setState(ServiceStateValue newState) { + Preconditions.checkState(HAConfiguration.isHAEnabled(configuration), + "Cannot change state as requested, as HA is not enabled for this instance."); + state = newState; + } + + public void setActive() { + LOG.warn("Instance is active from {}", state); + setState(ServiceStateValue.ACTIVE); + } + + public void becomingPassive() { + LOG.warn("Instance becoming passive from {}", state); + setState(ServiceStateValue.BECOMING_PASSIVE); + } + + public void setPassive() { + LOG.warn("Instance is passive from {}", state); + setState(ServiceStateValue.PASSIVE); + } + + public boolean isInstanceInTransition() { + ServiceStateValue state = getState(); + return state == ServiceStateValue.BECOMING_ACTIVE + || state == ServiceStateValue.BECOMING_PASSIVE; + } +} diff --git a/webapp/src/test/java/org/apache/atlas/web/filters/ActiveServerFilterTest.java b/webapp/src/test/java/org/apache/atlas/web/filters/ActiveServerFilterTest.java new file mode 100644 index 0000000..c6962fa --- /dev/null +++ b/webapp/src/test/java/org/apache/atlas/web/filters/ActiveServerFilterTest.java @@ -0,0 +1,172 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.web.filters; + +import org.apache.atlas.web.service.ActiveInstanceState; +import org.apache.atlas.web.service.ServiceState; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import javax.servlet.FilterChain; +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import javax.ws.rs.HttpMethod; + +import java.io.IOException; + +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; + +public class ActiveServerFilterTest { + + public static final String ACTIVE_SERVER_ADDRESS = "http://localhost:21000/"; + @Mock + private ActiveInstanceState activeInstanceState; + + @Mock + private HttpServletRequest servletRequest; + + @Mock + private HttpServletResponse servletResponse; + + @Mock + private FilterChain filterChain; + + @Mock + private ServiceState serviceState; + + @BeforeMethod + public void setUp() { + MockitoAnnotations.initMocks(this); + } + + @Test + public void testShouldPassThroughRequestsIfActive() throws IOException, ServletException { + when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.ACTIVE); + ActiveServerFilter activeServerFilter = new ActiveServerFilter(activeInstanceState, serviceState); + + activeServerFilter.doFilter(servletRequest, servletResponse, filterChain); + + verify(filterChain).doFilter(servletRequest, servletResponse); + } + + @Test + public void testShouldFailIfCannotRetrieveActiveServerAddress() throws IOException, ServletException { + when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.PASSIVE); + ActiveServerFilter activeServerFilter = new ActiveServerFilter(activeInstanceState, serviceState); + + when(activeInstanceState.getActiveServerAddress()).thenReturn(null); + + activeServerFilter.doFilter(servletRequest, servletResponse, filterChain); + + verify(servletResponse).sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE); + } + + @Test + public void testShouldRedirectRequestToActiveServerAddress() throws IOException, ServletException { + when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.PASSIVE); + ActiveServerFilter activeServerFilter = new ActiveServerFilter(activeInstanceState, serviceState); + + when(activeInstanceState.getActiveServerAddress()).thenReturn(ACTIVE_SERVER_ADDRESS); + when(servletRequest.getRequestURI()).thenReturn("types"); + when(servletRequest.getMethod()).thenReturn(HttpMethod.GET); + + activeServerFilter.doFilter(servletRequest, servletResponse, filterChain); + + verify(servletResponse).sendRedirect(ACTIVE_SERVER_ADDRESS+"types"); + } + + @Test + public void testRedirectedRequestShouldContainQueryParameters() throws IOException, ServletException { + when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.PASSIVE); + ActiveServerFilter activeServerFilter = new ActiveServerFilter(activeInstanceState, serviceState); + + when(activeInstanceState.getActiveServerAddress()).thenReturn(ACTIVE_SERVER_ADDRESS); + when(servletRequest.getMethod()).thenReturn(HttpMethod.GET); + when(servletRequest.getRequestURI()).thenReturn("types"); + when(servletRequest.getQueryString()).thenReturn("query=TRAIT"); + + activeServerFilter.doFilter(servletRequest, servletResponse, filterChain); + + verify(servletResponse).sendRedirect(ACTIVE_SERVER_ADDRESS+"types?query=TRAIT"); + + } + + @Test + public void testShouldRedirectPOSTRequest() throws IOException, ServletException { + when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.PASSIVE); + ActiveServerFilter activeServerFilter = new ActiveServerFilter(activeInstanceState, serviceState); + + when(activeInstanceState.getActiveServerAddress()).thenReturn(ACTIVE_SERVER_ADDRESS); + when(servletRequest.getMethod()).thenReturn(HttpMethod.POST); + when(servletRequest.getRequestURI()).thenReturn("types"); + + activeServerFilter.doFilter(servletRequest, servletResponse, filterChain); + + verify(servletResponse).setHeader("Location", ACTIVE_SERVER_ADDRESS+"types"); + verify(servletResponse).setStatus(HttpServletResponse.SC_TEMPORARY_REDIRECT); + } + + @Test + public void testShouldRedirectPUTRequest() throws IOException, ServletException { + when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.PASSIVE); + ActiveServerFilter activeServerFilter = new ActiveServerFilter(activeInstanceState, serviceState); + + when(activeInstanceState.getActiveServerAddress()).thenReturn(ACTIVE_SERVER_ADDRESS); + when(servletRequest.getMethod()).thenReturn(HttpMethod.PUT); + when(servletRequest.getRequestURI()).thenReturn("types"); + + activeServerFilter.doFilter(servletRequest, servletResponse, filterChain); + + verify(servletResponse).setHeader("Location", ACTIVE_SERVER_ADDRESS+"types"); + verify(servletResponse).setStatus(HttpServletResponse.SC_TEMPORARY_REDIRECT); + } + + @Test + public void testShouldRedirectDELETERequest() throws IOException, ServletException { + when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.PASSIVE); + ActiveServerFilter activeServerFilter = new ActiveServerFilter(activeInstanceState, serviceState); + + when(activeInstanceState.getActiveServerAddress()).thenReturn(ACTIVE_SERVER_ADDRESS); + when(servletRequest.getMethod()).thenReturn(HttpMethod.DELETE); + when(servletRequest.getRequestURI()). + thenReturn("api/atlas/entities/6ebb039f-eaa5-4b9c-ae44-799c7910545d/traits/test_tag_ha3"); + + activeServerFilter.doFilter(servletRequest, servletResponse, filterChain); + + verify(servletResponse).setHeader("Location", ACTIVE_SERVER_ADDRESS + + "api/atlas/entities/6ebb039f-eaa5-4b9c-ae44-799c7910545d/traits/test_tag_ha3"); + verify(servletResponse).setStatus(HttpServletResponse.SC_TEMPORARY_REDIRECT); + } + + @Test + public void testShouldReturnServiceUnavailableIfStateBecomingActive() throws IOException, ServletException { + when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.BECOMING_ACTIVE); + ActiveServerFilter activeServerFilter = new ActiveServerFilter(activeInstanceState, serviceState); + + activeServerFilter.doFilter(servletRequest, servletResponse, filterChain); + + verify(servletResponse).sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE); + } +} diff --git a/webapp/src/test/java/org/apache/atlas/web/service/ActiveInstanceElectorServiceTest.java b/webapp/src/test/java/org/apache/atlas/web/service/ActiveInstanceElectorServiceTest.java new file mode 100644 index 0000000..e6a46f7 --- /dev/null +++ b/webapp/src/test/java/org/apache/atlas/web/service/ActiveInstanceElectorServiceTest.java @@ -0,0 +1,364 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.web.service; + +import com.google.inject.Provider; +import org.apache.atlas.AtlasConstants; +import org.apache.atlas.AtlasException; +import org.apache.atlas.ha.HAConfiguration; +import org.apache.atlas.listener.ActiveStateChangeHandler; +import org.apache.commons.configuration.Configuration; +import org.apache.curator.framework.recipes.leader.LeaderLatch; +import org.mockito.InOrder; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; + +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyZeroInteractions; +import static org.mockito.Mockito.when; + +public class ActiveInstanceElectorServiceTest { + + @Mock + private Configuration configuration; + + @Mock + private CuratorFactory curatorFactory; + + @Mock + private ActiveInstanceState activeInstanceState; + + @Mock + private ServiceState serviceState; + + @BeforeMethod + public void setup() { + System.setProperty(AtlasConstants.SYSTEM_PROPERTY_APP_PORT, "21000"); + MockitoAnnotations.initMocks(this); + } + + @Test + public void testLeaderElectionIsJoinedOnStart() throws Exception { + when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true); + when(configuration.getStringArray(HAConfiguration.ATLAS_SERVER_IDS)).thenReturn(new String[] {"id1"}); + when(configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +"id1")).thenReturn("127.0.0.1:21000"); + LeaderLatch leaderLatch = mock(LeaderLatch.class); + when(curatorFactory.leaderLatchInstance("id1")).thenReturn(leaderLatch); + + ActiveInstanceElectorService activeInstanceElectorService = + new ActiveInstanceElectorService(configuration, new ArrayList(), curatorFactory, + activeInstanceState, serviceState); + activeInstanceElectorService.start(); + + verify(leaderLatch).start(); + } + + @Test + public void testListenerIsAddedForActiveInstanceCallbacks() throws Exception { + when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true); + when(configuration.getStringArray(HAConfiguration.ATLAS_SERVER_IDS)).thenReturn(new String[] {"id1"}); + when(configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +"id1")).thenReturn("127.0.0.1:21000"); + LeaderLatch leaderLatch = mock(LeaderLatch.class); + when(curatorFactory.leaderLatchInstance("id1")).thenReturn(leaderLatch); + + ActiveInstanceElectorService activeInstanceElectorService = + new ActiveInstanceElectorService(configuration, new ArrayList(), curatorFactory, + activeInstanceState, serviceState); + activeInstanceElectorService.start(); + + verify(leaderLatch).addListener(activeInstanceElectorService); + } + + @Test + public void testLeaderElectionIsNotStartedIfNotInHAMode() throws AtlasException { + when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(false); + + ActiveInstanceElectorService activeInstanceElectorService = + new ActiveInstanceElectorService(configuration, new ArrayList(), curatorFactory, + activeInstanceState, serviceState); + activeInstanceElectorService.start(); + + verifyZeroInteractions(curatorFactory); + } + + @Test + public void testLeaderElectionIsLeftOnStop() throws IOException, AtlasException { + when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true); + when(configuration.getStringArray(HAConfiguration.ATLAS_SERVER_IDS)).thenReturn(new String[] {"id1"}); + when(configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +"id1")).thenReturn("127.0.0.1:21000"); + + LeaderLatch leaderLatch = mock(LeaderLatch.class); + when(curatorFactory.leaderLatchInstance("id1")).thenReturn(leaderLatch); + + ActiveInstanceElectorService activeInstanceElectorService = + new ActiveInstanceElectorService(configuration, new ArrayList(), curatorFactory, + activeInstanceState, serviceState); + activeInstanceElectorService.start(); + activeInstanceElectorService.stop(); + + verify(leaderLatch).close(); + } + + @Test + public void testCuratorFactoryIsClosedOnStop() throws AtlasException { + + when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true); + when(configuration.getStringArray(HAConfiguration.ATLAS_SERVER_IDS)).thenReturn(new String[] {"id1"}); + when(configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +"id1")).thenReturn("127.0.0.1:21000"); + + LeaderLatch leaderLatch = mock(LeaderLatch.class); + when(curatorFactory.leaderLatchInstance("id1")).thenReturn(leaderLatch); + + ActiveInstanceElectorService activeInstanceElectorService = + new ActiveInstanceElectorService(configuration, new ArrayList(), curatorFactory, + activeInstanceState, serviceState); + activeInstanceElectorService.start(); + activeInstanceElectorService.stop(); + + verify(curatorFactory).close(); + } + + @Test + public void testNoActionOnStopIfHAModeIsDisabled() { + + when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(false); + + ActiveInstanceElectorService activeInstanceElectorService = + new ActiveInstanceElectorService(configuration, new ArrayList(), curatorFactory, + activeInstanceState, serviceState); + activeInstanceElectorService.stop(); + + verifyZeroInteractions(curatorFactory); + } + + @Test + public void testRegisteredHandlersAreNotifiedWhenInstanceIsActive() throws AtlasException { + when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true); + when(configuration.getStringArray(HAConfiguration.ATLAS_SERVER_IDS)).thenReturn(new String[] {"id1"}); + when(configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +"id1")).thenReturn("127.0.0.1:21000"); + LeaderLatch leaderLatch = mock(LeaderLatch.class); + when(curatorFactory.leaderLatchInstance("id1")).thenReturn(leaderLatch); + + Collection<Provider<ActiveStateChangeHandler>> changeHandlers = new ArrayList(); + final ActiveStateChangeHandler handler1 = mock(ActiveStateChangeHandler.class); + final ActiveStateChangeHandler handler2 = mock(ActiveStateChangeHandler.class); + + changeHandlers.add(new Provider<ActiveStateChangeHandler>() { + @Override + public ActiveStateChangeHandler get() { + return handler1; + } + }); + + changeHandlers.add(new Provider<ActiveStateChangeHandler>() { + @Override + public ActiveStateChangeHandler get() { + return handler2; + } + }); + + ActiveInstanceElectorService activeInstanceElectorService = + new ActiveInstanceElectorService(configuration, changeHandlers, curatorFactory, + activeInstanceState, serviceState); + activeInstanceElectorService.start(); + activeInstanceElectorService.isLeader(); + + verify(handler1).instanceIsActive(); + verify(handler2).instanceIsActive(); + } + + @Test + public void testSharedStateIsUpdatedWhenInstanceIsActive() throws Exception { + when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true); + when(configuration.getStringArray(HAConfiguration.ATLAS_SERVER_IDS)).thenReturn(new String[] {"id1"}); + when(configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +"id1")).thenReturn("127.0.0.1:21000"); + LeaderLatch leaderLatch = mock(LeaderLatch.class); + when(curatorFactory.leaderLatchInstance("id1")).thenReturn(leaderLatch); + + ActiveInstanceElectorService activeInstanceElectorService = + new ActiveInstanceElectorService(configuration, new ArrayList(), curatorFactory, + activeInstanceState, serviceState); + + activeInstanceElectorService.start(); + activeInstanceElectorService.isLeader(); + + verify(activeInstanceState).update("id1"); + } + + @Test + public void testRegisteredHandlersAreNotifiedOfPassiveWhenStateUpdateFails() throws Exception { + when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true); + when(configuration.getStringArray(HAConfiguration.ATLAS_SERVER_IDS)).thenReturn(new String[] {"id1"}); + when(configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +"id1")).thenReturn("127.0.0.1:21000"); + + LeaderLatch leaderLatch = mock(LeaderLatch.class); + when(curatorFactory.leaderLatchInstance("id1")).thenReturn(leaderLatch); + + Collection<Provider<ActiveStateChangeHandler>> changeHandlers = new ArrayList(); + final ActiveStateChangeHandler handler1 = mock(ActiveStateChangeHandler.class); + final ActiveStateChangeHandler handler2 = mock(ActiveStateChangeHandler.class); + + changeHandlers.add(new Provider<ActiveStateChangeHandler>() { + @Override + public ActiveStateChangeHandler get() { + return handler1; + } + }); + + changeHandlers.add(new Provider<ActiveStateChangeHandler>() { + @Override + public ActiveStateChangeHandler get() { + return handler2; + } + }); + + doThrow(new Exception()).when(activeInstanceState).update("id1"); + + ActiveInstanceElectorService activeInstanceElectorService = + new ActiveInstanceElectorService(configuration, changeHandlers, curatorFactory, + activeInstanceState, serviceState); + activeInstanceElectorService.start(); + activeInstanceElectorService.isLeader(); + + verify(handler1).instanceIsPassive(); + verify(handler2).instanceIsPassive(); + } + + @Test + public void testElectionIsRejoinedWhenStateUpdateFails() throws Exception { + + when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true); + when(configuration.getStringArray(HAConfiguration.ATLAS_SERVER_IDS)).thenReturn(new String[] {"id1"}); + when(configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +"id1")).thenReturn("127.0.0.1:21000"); + + LeaderLatch leaderLatch = mock(LeaderLatch.class); + when(curatorFactory.leaderLatchInstance("id1")).thenReturn(leaderLatch); + + doThrow(new Exception()).when(activeInstanceState).update("id1"); + + ActiveInstanceElectorService activeInstanceElectorService = + new ActiveInstanceElectorService(configuration, new ArrayList(), curatorFactory, + activeInstanceState, serviceState); + + activeInstanceElectorService.start(); + activeInstanceElectorService.isLeader(); + + InOrder inOrder = inOrder(leaderLatch, curatorFactory); + inOrder.verify(leaderLatch).close(); + inOrder.verify(curatorFactory).leaderLatchInstance("id1"); + inOrder.verify(leaderLatch).addListener(activeInstanceElectorService); + inOrder.verify(leaderLatch).start(); + } + + @Test + public void testRegisteredHandlersAreNotifiedOfPassiveWhenInstanceIsPassive() throws AtlasException { + + when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true); + when(configuration.getStringArray(HAConfiguration.ATLAS_SERVER_IDS)).thenReturn(new String[] {"id1"}); + when(configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +"id1")).thenReturn("127.0.0.1:21000"); + + LeaderLatch leaderLatch = mock(LeaderLatch.class); + when(curatorFactory.leaderLatchInstance("id1")).thenReturn(leaderLatch); + + Collection<Provider<ActiveStateChangeHandler>> changeHandlers = new ArrayList(); + final ActiveStateChangeHandler handler1 = mock(ActiveStateChangeHandler.class); + final ActiveStateChangeHandler handler2 = mock(ActiveStateChangeHandler.class); + + changeHandlers.add(new Provider<ActiveStateChangeHandler>() { + @Override + public ActiveStateChangeHandler get() { + return handler1; + } + }); + + changeHandlers.add(new Provider<ActiveStateChangeHandler>() { + @Override + public ActiveStateChangeHandler get() { + return handler2; + } + }); + + ActiveInstanceElectorService activeInstanceElectorService = + new ActiveInstanceElectorService(configuration, changeHandlers, curatorFactory, + activeInstanceState, serviceState); + activeInstanceElectorService.start(); + activeInstanceElectorService.notLeader(); + + verify(handler1).instanceIsPassive(); + verify(handler2).instanceIsPassive(); + } + + @Test + public void testActiveStateSetOnBecomingLeader() { + ActiveInstanceElectorService activeInstanceElectorService = + new ActiveInstanceElectorService(configuration, new ArrayList(), + curatorFactory, activeInstanceState, serviceState); + + activeInstanceElectorService.isLeader(); + + InOrder inOrder = inOrder(serviceState); + inOrder.verify(serviceState).becomingActive(); + inOrder.verify(serviceState).setActive(); + } + + @Test + public void testPassiveStateSetOnLoosingLeadership() { + ActiveInstanceElectorService activeInstanceElectorService = + new ActiveInstanceElectorService(configuration, new ArrayList(), + curatorFactory, activeInstanceState, serviceState); + + activeInstanceElectorService.notLeader(); + + InOrder inOrder = inOrder(serviceState); + inOrder.verify(serviceState).becomingPassive(); + inOrder.verify(serviceState).setPassive(); + } + + @Test + public void testPassiveStateSetIfActivationFails() throws Exception { + when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true); + when(configuration.getStringArray(HAConfiguration.ATLAS_SERVER_IDS)).thenReturn(new String[] {"id1"}); + when(configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +"id1")).thenReturn("127.0.0.1:21000"); + + LeaderLatch leaderLatch = mock(LeaderLatch.class); + when(curatorFactory.leaderLatchInstance("id1")).thenReturn(leaderLatch); + + doThrow(new Exception()).when(activeInstanceState).update("id1"); + + ActiveInstanceElectorService activeInstanceElectorService = + new ActiveInstanceElectorService(configuration, new ArrayList(), + curatorFactory, activeInstanceState, serviceState); + activeInstanceElectorService.start(); + activeInstanceElectorService.isLeader(); + + InOrder inOrder = inOrder(serviceState); + inOrder.verify(serviceState).becomingActive(); + inOrder.verify(serviceState).becomingPassive(); + inOrder.verify(serviceState).setPassive(); + } +} diff --git a/webapp/src/test/java/org/apache/atlas/web/service/ActiveInstanceStateTest.java b/webapp/src/test/java/org/apache/atlas/web/service/ActiveInstanceStateTest.java new file mode 100644 index 0000000..939d0ca --- /dev/null +++ b/webapp/src/test/java/org/apache/atlas/web/service/ActiveInstanceStateTest.java @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.web.service; + +import org.apache.atlas.ha.HAConfiguration; +import org.apache.commons.configuration.Configuration; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.api.CreateBuilder; +import org.apache.curator.framework.api.ExistsBuilder; +import org.apache.curator.framework.api.GetDataBuilder; +import org.apache.curator.framework.api.SetDataBuilder; +import org.apache.curator.framework.recipes.locks.InterProcessMutex; +import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.data.Stat; +import org.mockito.InOrder; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.testng.annotations.BeforeTest; +import org.testng.annotations.Test; + +import java.nio.charset.Charset; + +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.fail; +import static org.testng.Assert.assertNull; + +public class ActiveInstanceStateTest { + + private static final String HOST_PORT = "127.0.0.1:21000"; + public static final String SERVER_ADDRESS = "http://" + HOST_PORT; + @Mock + private Configuration configuration; + + @Mock + private CuratorFactory curatorFactory; + + @Mock + private CuratorFramework curatorFramework; + + @BeforeTest + public void setup() { + MockitoAnnotations.initMocks(this); + } + + @Test + public void testSharedPathIsCreatedIfNotExists() throws Exception { + + when(configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +"id1")).thenReturn(HOST_PORT); + + when(curatorFactory.clientInstance()).thenReturn(curatorFramework); + + ExistsBuilder existsBuilder = mock(ExistsBuilder.class); + when(curatorFramework.checkExists()).thenReturn(existsBuilder); + when(existsBuilder.forPath(ActiveInstanceState.APACHE_ATLAS_ACTIVE_SERVER_INFO)).thenReturn(null); + + CreateBuilder createBuilder = mock(CreateBuilder.class); + when(curatorFramework.create()).thenReturn(createBuilder); + when(createBuilder.withMode(CreateMode.EPHEMERAL)).thenReturn(createBuilder); + + SetDataBuilder setDataBuilder = mock(SetDataBuilder.class); + when(curatorFramework.setData()).thenReturn(setDataBuilder); + + ActiveInstanceState activeInstanceState = new ActiveInstanceState(configuration, curatorFactory); + activeInstanceState.update("id1"); + + verify(createBuilder).forPath(ActiveInstanceState.APACHE_ATLAS_ACTIVE_SERVER_INFO); + } + + @Test + public void testDataIsUpdatedWithAtlasServerAddress() throws Exception { + when(configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +"id1")).thenReturn(HOST_PORT); + + when(curatorFactory.clientInstance()).thenReturn(curatorFramework); + ExistsBuilder existsBuilder = mock(ExistsBuilder.class); + when(curatorFramework.checkExists()).thenReturn(existsBuilder); + when(existsBuilder.forPath(ActiveInstanceState.APACHE_ATLAS_ACTIVE_SERVER_INFO)).thenReturn(new Stat()); + + SetDataBuilder setDataBuilder = mock(SetDataBuilder.class); + when(curatorFramework.setData()).thenReturn(setDataBuilder); + + ActiveInstanceState activeInstanceState = new ActiveInstanceState(configuration, curatorFactory); + activeInstanceState.update("id1"); + + verify(setDataBuilder).forPath( + ActiveInstanceState.APACHE_ATLAS_ACTIVE_SERVER_INFO, + SERVER_ADDRESS.getBytes(Charset.forName("UTF-8"))); + } + + @Test + public void testShouldReturnActiveServerAddress() throws Exception { + when(curatorFactory.clientInstance()).thenReturn(curatorFramework); + + GetDataBuilder getDataBuilder = mock(GetDataBuilder.class); + when(curatorFramework.getData()).thenReturn(getDataBuilder); + when(getDataBuilder.forPath(ActiveInstanceState.APACHE_ATLAS_ACTIVE_SERVER_INFO)). + thenReturn(SERVER_ADDRESS.getBytes(Charset.forName("UTF-8"))); + + ActiveInstanceState activeInstanceState = new ActiveInstanceState(configuration, curatorFactory); + String actualServerAddress = activeInstanceState.getActiveServerAddress(); + + assertEquals(SERVER_ADDRESS, actualServerAddress); + } + + @Test + public void testShouldHandleExceptionsInFetchingServerAddress() throws Exception { + when(curatorFactory.clientInstance()).thenReturn(curatorFramework); + + GetDataBuilder getDataBuilder = mock(GetDataBuilder.class); + when(curatorFramework.getData()).thenReturn(getDataBuilder); + when(getDataBuilder.forPath(ActiveInstanceState.APACHE_ATLAS_ACTIVE_SERVER_INFO)). + thenThrow(new Exception()); + + ActiveInstanceState activeInstanceState = new ActiveInstanceState(configuration, curatorFactory); + assertNull(activeInstanceState.getActiveServerAddress()); + } +} diff --git a/webapp/src/test/java/org/apache/atlas/web/service/ServiceStateTest.java b/webapp/src/test/java/org/apache/atlas/web/service/ServiceStateTest.java new file mode 100644 index 0000000..77aba88 --- /dev/null +++ b/webapp/src/test/java/org/apache/atlas/web/service/ServiceStateTest.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.web.service; + +import org.apache.atlas.ha.HAConfiguration; +import org.apache.commons.configuration.Configuration; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.fail; + +public class ServiceStateTest { + + @Mock + private Configuration configuration; + + @BeforeMethod + public void setup() { + MockitoAnnotations.initMocks(this); + } + + @Test + public void testShouldBeActiveIfHAIsDisabled() { + when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(false); + + ServiceState serviceState = new ServiceState(configuration); + assertEquals(ServiceState.ServiceStateValue.ACTIVE, serviceState.getState()); + } + + @Test(expectedExceptions = IllegalStateException.class) + public void testShouldDisallowTransitionIfHAIsDisabled() { + when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(false); + + ServiceState serviceState = new ServiceState(configuration); + serviceState.becomingPassive(); + fail("Should not allow transition"); + } + + @Test + public void testShouldChangeStateIfHAIsEnabled() { + when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true); + + ServiceState serviceState = new ServiceState(configuration); + serviceState.becomingPassive(); + assertEquals(ServiceState.ServiceStateValue.BECOMING_PASSIVE, serviceState.getState()); + } +} -- libgit2 0.27.1