Commit 8bde666b by Shwetha GS

ATLAS-511 Ability to run multiple instances of Atlas Server with automatic…

ATLAS-511 Ability to run multiple instances of Atlas Server with automatic failover to one active server (yhemanth via shwethags)
parent bca454e1
...@@ -151,6 +151,11 @@ ...@@ -151,6 +151,11 @@
<version>${project.version}</version> <version>${project.version}</version>
</artifactItem> </artifactItem>
<artifactItem> <artifactItem>
<groupId>${project.groupId}</groupId>
<artifactId>atlas-server-api</artifactId>
<version>${project.version}</version>
</artifactItem>
<artifactItem>
<groupId>org.scala-lang</groupId> <groupId>org.scala-lang</groupId>
<artifactId>scala-compiler</artifactId> <artifactId>scala-compiler</artifactId>
<version>${scala.version}</version> <version>${scala.version}</version>
......
...@@ -229,6 +229,11 @@ ...@@ -229,6 +229,11 @@
<version>${project.version}</version> <version>${project.version}</version>
</artifactItem> </artifactItem>
<artifactItem> <artifactItem>
<groupId>${project.groupId}</groupId>
<artifactId>atlas-server-api</artifactId>
<version>${project.version}</version>
</artifactItem>
<artifactItem>
<groupId>org.scala-lang</groupId> <groupId>org.scala-lang</groupId>
<artifactId>scala-compiler</artifactId> <artifactId>scala-compiler</artifactId>
<version>${scala.version}</version> <version>${scala.version}</version>
......
...@@ -234,6 +234,11 @@ ...@@ -234,6 +234,11 @@
<version>${project.version}</version> <version>${project.version}</version>
</artifactItem> </artifactItem>
<artifactItem> <artifactItem>
<groupId>${project.groupId}</groupId>
<artifactId>atlas-server-api</artifactId>
<version>${project.version}</version>
</artifactItem>
<artifactItem>
<groupId>org.scala-lang</groupId> <groupId>org.scala-lang</groupId>
<artifactId>scala-compiler</artifactId> <artifactId>scala-compiler</artifactId>
<version>${scala.version}</version> <version>${scala.version}</version>
......
...@@ -184,6 +184,11 @@ ...@@ -184,6 +184,11 @@
<version>${project.version}</version> <version>${project.version}</version>
</artifactItem> </artifactItem>
<artifactItem> <artifactItem>
<groupId>${project.groupId}</groupId>
<artifactId>atlas-server-api</artifactId>
<version>${project.version}</version>
</artifactItem>
<artifactItem>
<groupId>org.scala-lang</groupId> <groupId>org.scala-lang</groupId>
<artifactId>scala-compiler</artifactId> <artifactId>scala-compiler</artifactId>
<version>${scala.version}</version> <version>${scala.version}</version>
......
...@@ -145,6 +145,12 @@ public class AtlasClient { ...@@ -145,6 +145,12 @@ public class AtlasClient {
return true; return true;
} catch (ClientHandlerException che) { } catch (ClientHandlerException che) {
return false; return false;
} catch (AtlasServiceException ase) {
if (ase.getStatus().equals(ClientResponse.Status.SERVICE_UNAVAILABLE)) {
LOG.warn("Received SERVICE_UNAVAILABLE, server is not yet ready");
return false;
}
throw ase;
} }
} }
......
...@@ -28,6 +28,7 @@ import static org.junit.Assert.assertFalse; ...@@ -28,6 +28,7 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import static org.testng.Assert.fail;
public class AtlasClientTest { public class AtlasClientTest {
...@@ -64,4 +65,33 @@ public class AtlasClientTest { ...@@ -64,4 +65,33 @@ public class AtlasClientTest {
new ClientHandlerException()); new ClientHandlerException());
assertFalse(atlasClient.isServerReady()); assertFalse(atlasClient.isServerReady());
} }
@Test
public void shouldReturnFalseIfServiceIsUnavailable() throws AtlasServiceException {
WebResource webResource = mock(WebResource.class);
AtlasClient atlasClient = new AtlasClient(webResource);
WebResource.Builder builder = setupBuilder(webResource);
ClientResponse response = mock(ClientResponse.class);
when(response.getStatus()).thenReturn(Response.Status.SERVICE_UNAVAILABLE.getStatusCode());
when(response.getClientResponseStatus()).thenReturn(ClientResponse.Status.SERVICE_UNAVAILABLE);
when(builder.method(AtlasClient.API.VERSION.getMethod(), ClientResponse.class, null)).thenReturn(response);
assertFalse(atlasClient.isServerReady());
}
@Test(expectedExceptions = AtlasServiceException.class)
public void shouldThrowErrorIfAnyResponseOtherThanServiceUnavailable() throws AtlasServiceException {
WebResource webResource = mock(WebResource.class);
AtlasClient atlasClient = new AtlasClient(webResource);
WebResource.Builder builder = setupBuilder(webResource);
ClientResponse response = mock(ClientResponse.class);
when(response.getStatus()).thenReturn(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode());
when(response.getClientResponseStatus()).thenReturn(ClientResponse.Status.INTERNAL_SERVER_ERROR);
when(builder.method(AtlasClient.API.VERSION.getMethod(), ClientResponse.class, null)).thenReturn(response);
atlasClient.isServerReady();
fail("Should throw exception");
}
} }
...@@ -28,4 +28,6 @@ public final class AtlasConstants { ...@@ -28,4 +28,6 @@ public final class AtlasConstants {
public static final String CLUSTER_NAME_KEY = "atlas.cluster.name"; public static final String CLUSTER_NAME_KEY = "atlas.cluster.name";
public static final String DEFAULT_CLUSTER_NAME = "primary"; public static final String DEFAULT_CLUSTER_NAME = "primary";
public static final String CLUSTER_NAME_ATTRIBUTE = "clusterName"; public static final String CLUSTER_NAME_ATTRIBUTE = "clusterName";
public static final String SYSTEM_PROPERTY_APP_PORT = "atlas.app.port";
public static final String DEFAULT_APP_PORT_STR = "21000";
} }
...@@ -96,3 +96,12 @@ atlas.http.authentication.type=simple ...@@ -96,3 +96,12 @@ atlas.http.authentication.type=simple
######### Server Properties ######### ######### Server Properties #########
atlas.rest.address=http://localhost:21000 atlas.rest.address=http://localhost:21000
######### High Availability Configuration ########
atlas.server.ha.enabled=false
atlas.server.ids=id1
atlas.server.address.id1=localhost:21000
atlas.server.ha.zookeeper.connect=localhost:2181
atlas.server.ha.zookeeper.retry.sleeptime.ms=1000
atlas.server.ha.zookeeper.num.retries=3
atlas.server.ha.zookeeper.session.timeout.ms=20000
\ No newline at end of file
...@@ -127,6 +127,10 @@ public class KafkaNotification extends AbstractNotification implements Service { ...@@ -127,6 +127,10 @@ public class KafkaNotification extends AbstractNotification implements Service {
@Override @Override
public void start() throws AtlasException { public void start() throws AtlasException {
if (isHAEnabled()) {
LOG.info("Not starting embedded instances when HA is enabled.");
return;
}
if (isEmbedded()) { if (isEmbedded()) {
try { try {
startZk(); startZk();
......
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
package org.apache.atlas.notification; package org.apache.atlas.notification;
import org.apache.atlas.AtlasException; import org.apache.atlas.AtlasException;
import org.apache.atlas.ha.HAConfiguration;
import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.Configuration;
import java.util.Arrays; import java.util.Arrays;
...@@ -30,12 +31,14 @@ public abstract class AbstractNotification implements NotificationInterface { ...@@ -30,12 +31,14 @@ public abstract class AbstractNotification implements NotificationInterface {
private static final String PROPERTY_EMBEDDED = PROPERTY_PREFIX + ".embedded"; private static final String PROPERTY_EMBEDDED = PROPERTY_PREFIX + ".embedded";
private final boolean embedded; private final boolean embedded;
private final boolean isHAEnabled;
// ----- Constructors ------------------------------------------------------ // ----- Constructors ------------------------------------------------------
public AbstractNotification(Configuration applicationProperties) throws AtlasException { public AbstractNotification(Configuration applicationProperties) throws AtlasException {
this.embedded = applicationProperties.getBoolean(PROPERTY_EMBEDDED, false); this.embedded = applicationProperties.getBoolean(PROPERTY_EMBEDDED, false);
this.isHAEnabled = HAConfiguration.isHAEnabled(applicationProperties);
} }
...@@ -50,6 +53,10 @@ public abstract class AbstractNotification implements NotificationInterface { ...@@ -50,6 +53,10 @@ public abstract class AbstractNotification implements NotificationInterface {
return embedded; return embedded;
} }
protected final boolean isHAEnabled() {
return isHAEnabled;
}
@Override @Override
public <T> void send(NotificationType type, List<T> messages) throws NotificationException { public <T> void send(NotificationType type, List<T> messages) throws NotificationException {
String[] strMessages = new String[messages.size()]; String[] strMessages = new String[messages.size()];
......
...@@ -23,6 +23,8 @@ import kafka.consumer.ConsumerTimeoutException; ...@@ -23,6 +23,8 @@ import kafka.consumer.ConsumerTimeoutException;
import org.apache.atlas.ApplicationProperties; import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient; import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasException; import org.apache.atlas.AtlasException;
import org.apache.atlas.ha.HAConfiguration;
import org.apache.atlas.listener.ActiveStateChangeHandler;
import org.apache.atlas.notification.hook.HookNotification; import org.apache.atlas.notification.hook.HookNotification;
import org.apache.atlas.service.Service; import org.apache.atlas.service.Service;
import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.Configuration;
...@@ -30,39 +32,69 @@ import org.apache.hadoop.security.UserGroupInformation; ...@@ -30,39 +32,69 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/** /**
* Consumer of notifications from hooks e.g., hive hook etc. * Consumer of notifications from hooks e.g., hive hook etc.
*/ */
@Singleton @Singleton
public class NotificationHookConsumer implements Service { public class NotificationHookConsumer implements Service, ActiveStateChangeHandler {
private static final Logger LOG = LoggerFactory.getLogger(NotificationHookConsumer.class); private static final Logger LOG = LoggerFactory.getLogger(NotificationHookConsumer.class);
public static final String CONSUMER_THREADS_PROPERTY = "atlas.notification.hook.numthreads"; public static final String CONSUMER_THREADS_PROPERTY = "atlas.notification.hook.numthreads";
public static final String ATLAS_ENDPOINT_PROPERTY = "atlas.rest.address"; public static final String ATLAS_ENDPOINT_PROPERTY = "atlas.rest.address";
public static final int SERVER_READY_WAIT_TIME_MS = 1000; public static final int SERVER_READY_WAIT_TIME_MS = 1000;
@Inject
private NotificationInterface notificationInterface; private NotificationInterface notificationInterface;
private ExecutorService executors; private ExecutorService executors;
private String atlasEndpoint; private String atlasEndpoint;
private Configuration applicationProperties;
private List<HookConsumer> consumers;
@Inject
public NotificationHookConsumer(NotificationInterface notificationInterface) {
this.notificationInterface = notificationInterface;
}
@Override @Override
public void start() throws AtlasException { public void start() throws AtlasException {
Configuration applicationProperties = ApplicationProperties.get(); Configuration configuration = ApplicationProperties.get();
startInternal(configuration, null);
}
atlasEndpoint = applicationProperties.getString(ATLAS_ENDPOINT_PROPERTY, "http://localhost:21000"); void startInternal(Configuration configuration,
ExecutorService executorService) {
this.applicationProperties = configuration;
this.atlasEndpoint = applicationProperties.getString(ATLAS_ENDPOINT_PROPERTY, "http://localhost:21000");
if (consumers == null) {
consumers = new ArrayList<>();
}
if (executorService != null) {
executors = executorService;
}
if (!HAConfiguration.isHAEnabled(configuration)) {
LOG.info("HA is disabled, starting consumers inline.");
startConsumers(executorService);
}
}
private void startConsumers(ExecutorService executorService) {
int numThreads = applicationProperties.getInt(CONSUMER_THREADS_PROPERTY, 1); int numThreads = applicationProperties.getInt(CONSUMER_THREADS_PROPERTY, 1);
List<NotificationConsumer<HookNotification.HookNotificationMessage>> consumers = List<NotificationConsumer<HookNotification.HookNotificationMessage>> notificationConsumers =
notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, numThreads); notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, numThreads);
executors = Executors.newFixedThreadPool(consumers.size()); if (executorService == null) {
executorService = Executors.newFixedThreadPool(notificationConsumers.size());
for (final NotificationConsumer<HookNotification.HookNotificationMessage> consumer : consumers) { }
executors.submit(new HookConsumer(consumer)); executors = executorService;
for (final NotificationConsumer<HookNotification.HookNotificationMessage> consumer : notificationConsumers) {
HookConsumer hookConsumer = new HookConsumer(consumer);
consumers.add(hookConsumer);
executors.submit(hookConsumer);
} }
} }
...@@ -71,14 +103,52 @@ public class NotificationHookConsumer implements Service { ...@@ -71,14 +103,52 @@ public class NotificationHookConsumer implements Service {
//Allow for completion of outstanding work //Allow for completion of outstanding work
notificationInterface.close(); notificationInterface.close();
try { try {
if (executors != null && !executors.awaitTermination(5000, TimeUnit.MILLISECONDS)) { if (executors != null) {
stopConsumerThreads();
executors.shutdownNow();
if (!executors.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
LOG.error("Timed out waiting for consumer threads to shut down, exiting uncleanly"); LOG.error("Timed out waiting for consumer threads to shut down, exiting uncleanly");
} }
executors = null;
}
} catch (InterruptedException e) { } catch (InterruptedException e) {
LOG.error("Failure in shutting down consumers"); LOG.error("Failure in shutting down consumers");
} }
} }
private void stopConsumerThreads() {
if (consumers != null) {
for (HookConsumer consumer : consumers) {
consumer.stop();
}
consumers.clear();
}
}
/**
* Start Kafka consumer threads that read from Kafka topic when server is activated.
*
* Since the consumers create / update entities to the shared backend store, only the active instance
* should perform this activity. Hence, these threads are started only on server activation.
*/
@Override
public void instanceIsActive() {
LOG.info("Reacting to active state: initializing Kafka consumers");
startConsumers(executors);
}
/**
* Stop Kafka consumer threads that read from Kafka topic when server is de-activated.
*
* Since the consumers create / update entities to the shared backend store, only the active instance
* should perform this activity. Hence, these threads are stopped only on server deactivation.
*/
@Override
public void instanceIsPassive() {
LOG.info("Reacting to passive state: shutting down Kafka consumers.");
stop();
}
static class Timer { static class Timer {
public void sleep(int interval) throws InterruptedException { public void sleep(int interval) throws InterruptedException {
Thread.sleep(interval); Thread.sleep(interval);
...@@ -87,6 +157,7 @@ public class NotificationHookConsumer implements Service { ...@@ -87,6 +157,7 @@ public class NotificationHookConsumer implements Service {
class HookConsumer implements Runnable { class HookConsumer implements Runnable {
private final NotificationConsumer<HookNotification.HookNotificationMessage> consumer; private final NotificationConsumer<HookNotification.HookNotificationMessage> consumer;
private final AtomicBoolean shouldRun = new AtomicBoolean(false);
public HookConsumer(NotificationConsumer<HookNotification.HookNotificationMessage> consumer) { public HookConsumer(NotificationConsumer<HookNotification.HookNotificationMessage> consumer) {
this.consumer = consumer; this.consumer = consumer;
...@@ -102,12 +173,13 @@ public class NotificationHookConsumer implements Service { ...@@ -102,12 +173,13 @@ public class NotificationHookConsumer implements Service {
@Override @Override
public void run() { public void run() {
shouldRun.set(true);
if (!serverAvailable(new NotificationHookConsumer.Timer())) { if (!serverAvailable(new NotificationHookConsumer.Timer())) {
return; return;
} }
while (true) { while (shouldRun.get()) {
try { try {
if (hasNext()) { if (hasNext()) {
HookNotification.HookNotificationMessage message = consumer.next(); HookNotification.HookNotificationMessage message = consumer.next();
...@@ -177,5 +249,9 @@ public class NotificationHookConsumer implements Service { ...@@ -177,5 +249,9 @@ public class NotificationHookConsumer implements Service {
LOG.info("Atlas Server is ready, can start reading Kafka events."); LOG.info("Atlas Server is ready, can start reading Kafka events.");
return true; return true;
} }
public void stop() {
shouldRun.set(false);
}
} }
} }
...@@ -20,18 +20,43 @@ package org.apache.atlas.notification; ...@@ -20,18 +20,43 @@ package org.apache.atlas.notification;
import org.apache.atlas.AtlasClient; import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasServiceException; import org.apache.atlas.AtlasServiceException;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.atlas.ha.HAConfiguration;
import org.apache.commons.configuration.Configuration;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test; import org.testng.annotations.Test;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import static org.mockito.Mockito.*; import static org.mockito.Mockito.*;
import static org.testng.AssertJUnit.assertFalse; import static org.testng.AssertJUnit.assertFalse;
import static org.testng.AssertJUnit.assertTrue; import static org.testng.AssertJUnit.assertTrue;
public class NotificationHookConsumerTest { public class NotificationHookConsumerTest {
@Mock
private NotificationInterface notificationInterface;
@Mock
private AtlasClient atlasClient;
@Mock
private Configuration configuration;
@Mock
private ExecutorService executorService;
@BeforeMethod
public void setup() {
MockitoAnnotations.initMocks(this);
}
@Test @Test
public void testConsumerCanProceedIfServerIsReady() throws InterruptedException, AtlasServiceException { public void testConsumerCanProceedIfServerIsReady() throws InterruptedException, AtlasServiceException {
final AtlasClient atlasClient = mock(AtlasClient.class); NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface);
NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer();
NotificationHookConsumer.HookConsumer hookConsumer = NotificationHookConsumer.HookConsumer hookConsumer =
notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class)) { notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class)) {
@Override @Override
...@@ -49,8 +74,7 @@ public class NotificationHookConsumerTest { ...@@ -49,8 +74,7 @@ public class NotificationHookConsumerTest {
@Test @Test
public void testConsumerWaitsNTimesIfServerIsNotReadyNTimes() throws AtlasServiceException, InterruptedException { public void testConsumerWaitsNTimesIfServerIsNotReadyNTimes() throws AtlasServiceException, InterruptedException {
final AtlasClient atlasClient = mock(AtlasClient.class); NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface);
NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer();
NotificationHookConsumer.HookConsumer hookConsumer = NotificationHookConsumer.HookConsumer hookConsumer =
notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class)) { notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class)) {
@Override @Override
...@@ -68,8 +92,7 @@ public class NotificationHookConsumerTest { ...@@ -68,8 +92,7 @@ public class NotificationHookConsumerTest {
@Test @Test
public void testConsumerProceedsWithFalseIfInterrupted() throws AtlasServiceException, InterruptedException { public void testConsumerProceedsWithFalseIfInterrupted() throws AtlasServiceException, InterruptedException {
final AtlasClient atlasClient = mock(AtlasClient.class); NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface);
NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer();
NotificationHookConsumer.HookConsumer hookConsumer = NotificationHookConsumer.HookConsumer hookConsumer =
notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class)) { notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class)) {
@Override @Override
...@@ -86,8 +109,7 @@ public class NotificationHookConsumerTest { ...@@ -86,8 +109,7 @@ public class NotificationHookConsumerTest {
@Test @Test
public void testConsumerProceedsWithFalseOnAtlasServiceException() throws AtlasServiceException { public void testConsumerProceedsWithFalseOnAtlasServiceException() throws AtlasServiceException {
final AtlasClient atlasClient = mock(AtlasClient.class); NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface);
NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer();
NotificationHookConsumer.HookConsumer hookConsumer = NotificationHookConsumer.HookConsumer hookConsumer =
notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class)) { notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class)) {
@Override @Override
...@@ -101,4 +123,61 @@ public class NotificationHookConsumerTest { ...@@ -101,4 +123,61 @@ public class NotificationHookConsumerTest {
assertFalse(hookConsumer.serverAvailable(timer)); assertFalse(hookConsumer.serverAvailable(timer));
} }
@Test
public void testConsumersStartedIfHAIsDisabled() {
when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(false);
when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
List<NotificationConsumer<Object>> consumers = new ArrayList();
consumers.add(mock(NotificationConsumer.class));
when(notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, 1)).
thenReturn(consumers);
NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface);
notificationHookConsumer.startInternal(configuration, executorService);
verify(notificationInterface).createConsumers(NotificationInterface.NotificationType.HOOK, 1);
verify(executorService).submit(any(NotificationHookConsumer.HookConsumer.class));
}
@Test
public void testConsumersAreNotStartedIfHAIsEnabled() {
when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true);
when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
List<NotificationConsumer<Object>> consumers = new ArrayList();
consumers.add(mock(NotificationConsumer.class));
when(notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, 1)).
thenReturn(consumers);
NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface);
notificationHookConsumer.startInternal(configuration, executorService);
verifyZeroInteractions(notificationInterface);
}
@Test
public void testConsumersAreStartedWhenInstanceBecomesActive() {
when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true);
when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
List<NotificationConsumer<Object>> consumers = new ArrayList();
consumers.add(mock(NotificationConsumer.class));
when(notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, 1)).
thenReturn(consumers);
NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface);
notificationHookConsumer.startInternal(configuration, executorService);
notificationHookConsumer.instanceIsActive();
verify(notificationInterface).createConsumers(NotificationInterface.NotificationType.HOOK, 1);
verify(executorService).submit(any(NotificationHookConsumer.HookConsumer.class));
}
@Test
public void testConsumersAreStoppedWhenInstanceBecomesPassive() {
when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true);
when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
List<NotificationConsumer<Object>> consumers = new ArrayList();
consumers.add(mock(NotificationConsumer.class));
when(notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, 1)).
thenReturn(consumers);
NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface);
notificationHookConsumer.startInternal(configuration, executorService);
notificationHookConsumer.instanceIsPassive();
verify(notificationInterface).close();
verify(executorService).shutdownNow();
}
} }
...@@ -13,6 +13,7 @@ ATLAS-409 Atlas will not import avro tables with schema read from a file (dosset ...@@ -13,6 +13,7 @@ ATLAS-409 Atlas will not import avro tables with schema read from a file (dosset
ATLAS-379 Create sqoop and falcon metadata addons (venkatnrangan,bvellanki,sowmyaramesh via shwethags) ATLAS-379 Create sqoop and falcon metadata addons (venkatnrangan,bvellanki,sowmyaramesh via shwethags)
ALL CHANGES: ALL CHANGES:
ATLAS-511 Ability to run multiple instances of Atlas Server with automatic failover to one active server (yhemanth via shwethags)
ATLAS-577 Integrate entity audit with DefaultMetadataService (shwethags) ATLAS-577 Integrate entity audit with DefaultMetadataService (shwethags)
ATLAS-588 import-hive.sh fails while importing partitions for a non-partitioned table (sumasai via shwethags) ATLAS-588 import-hive.sh fails while importing partitions for a non-partitioned table (sumasai via shwethags)
ATLAS-575 jetty-maven-plugin fails with ShutdownMonitorThread already started (shwethags) ATLAS-575 jetty-maven-plugin fails with ShutdownMonitorThread already started (shwethags)
......
...@@ -18,8 +18,12 @@ ...@@ -18,8 +18,12 @@
package org.apache.atlas.repository.audit; package org.apache.atlas.repository.audit;
import com.google.common.annotations.VisibleForTesting;
import com.google.inject.Singleton;
import org.apache.atlas.ApplicationProperties; import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasException; import org.apache.atlas.AtlasException;
import org.apache.atlas.ha.HAConfiguration;
import org.apache.atlas.listener.ActiveStateChangeHandler;
import org.apache.atlas.service.Service; import org.apache.atlas.service.Service;
import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
...@@ -59,7 +63,8 @@ import java.util.List; ...@@ -59,7 +63,8 @@ import java.util.List;
* and only 1 version is kept, there can be just 1 audit event per entity id + timestamp. This is ok for one atlas server. * and only 1 version is kept, there can be just 1 audit event per entity id + timestamp. This is ok for one atlas server.
* But if there are more than one atlas servers, we should use server id in the key * But if there are more than one atlas servers, we should use server id in the key
*/ */
public class HBaseBasedAuditRepository implements Service, EntityAuditRepository { @Singleton
public class HBaseBasedAuditRepository implements Service, EntityAuditRepository, ActiveStateChangeHandler {
private static final Logger LOG = LoggerFactory.getLogger(HBaseBasedAuditRepository.class); private static final Logger LOG = LoggerFactory.getLogger(HBaseBasedAuditRepository.class);
public static final String CONFIG_PREFIX = "atlas.audit"; public static final String CONFIG_PREFIX = "atlas.audit";
...@@ -237,23 +242,47 @@ public class HBaseBasedAuditRepository implements Service, EntityAuditRepository ...@@ -237,23 +242,47 @@ public class HBaseBasedAuditRepository implements Service, EntityAuditRepository
@Override @Override
public void start() throws AtlasException { public void start() throws AtlasException {
Configuration atlasConf = ApplicationProperties.get(); Configuration configuration = ApplicationProperties.get();
startInternal(configuration, getHBaseConfiguration(configuration));
}
@VisibleForTesting
void startInternal(Configuration atlasConf,
org.apache.hadoop.conf.Configuration hbaseConf) throws AtlasException {
String tableNameStr = atlasConf.getString(CONFIG_TABLE_NAME, DEFAULT_TABLE_NAME); String tableNameStr = atlasConf.getString(CONFIG_TABLE_NAME, DEFAULT_TABLE_NAME);
tableName = TableName.valueOf(tableNameStr); tableName = TableName.valueOf(tableNameStr);
try { try {
org.apache.hadoop.conf.Configuration hbaseConf = getHBaseConfiguration(atlasConf); connection = createConnection(hbaseConf);
connection = ConnectionFactory.createConnection(hbaseConf);
} catch (IOException e) { } catch (IOException e) {
throw new AtlasException(e); throw new AtlasException(e);
} }
if (!HAConfiguration.isHAEnabled(atlasConf)) {
LOG.info("HA is disabled. Hence creating table on startup.");
createTableIfNotExists(); createTableIfNotExists();
} }
}
@VisibleForTesting
protected Connection createConnection(org.apache.hadoop.conf.Configuration hbaseConf) throws IOException {
return ConnectionFactory.createConnection(hbaseConf);
}
@Override @Override
public void stop() throws AtlasException { public void stop() throws AtlasException {
close(connection); close(connection);
} }
@Override
public void instanceIsActive() throws AtlasException {
LOG.info("Reacting to active: Creating HBase table for Audit if required.");
createTableIfNotExists();
}
@Override
public void instanceIsPassive() {
LOG.info("Reacting to passive: No action for now.");
}
} }
...@@ -26,12 +26,15 @@ import com.thinkaurelius.titan.core.schema.TitanGraphIndex; ...@@ -26,12 +26,15 @@ import com.thinkaurelius.titan.core.schema.TitanGraphIndex;
import com.thinkaurelius.titan.core.schema.TitanManagement; import com.thinkaurelius.titan.core.schema.TitanManagement;
import com.tinkerpop.blueprints.Edge; import com.tinkerpop.blueprints.Edge;
import com.tinkerpop.blueprints.Vertex; import com.tinkerpop.blueprints.Vertex;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasException; import org.apache.atlas.AtlasException;
import org.apache.atlas.discovery.SearchIndexer; import org.apache.atlas.discovery.SearchIndexer;
import org.apache.atlas.ha.HAConfiguration;
import org.apache.atlas.repository.Constants; import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.IndexCreationException; import org.apache.atlas.repository.IndexCreationException;
import org.apache.atlas.repository.IndexException; import org.apache.atlas.repository.IndexException;
import org.apache.atlas.repository.RepositoryException; import org.apache.atlas.repository.RepositoryException;
import org.apache.atlas.listener.ActiveStateChangeHandler;
import org.apache.atlas.typesystem.types.AttributeInfo; import org.apache.atlas.typesystem.types.AttributeInfo;
import org.apache.atlas.typesystem.types.ClassType; import org.apache.atlas.typesystem.types.ClassType;
import org.apache.atlas.typesystem.types.DataTypes; import org.apache.atlas.typesystem.types.DataTypes;
...@@ -39,6 +42,7 @@ import org.apache.atlas.typesystem.types.IDataType; ...@@ -39,6 +42,7 @@ import org.apache.atlas.typesystem.types.IDataType;
import org.apache.atlas.typesystem.types.Multiplicity; import org.apache.atlas.typesystem.types.Multiplicity;
import org.apache.atlas.typesystem.types.StructType; import org.apache.atlas.typesystem.types.StructType;
import org.apache.atlas.typesystem.types.TraitType; import org.apache.atlas.typesystem.types.TraitType;
import org.apache.commons.configuration.Configuration;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -53,7 +57,7 @@ import java.util.Map; ...@@ -53,7 +57,7 @@ import java.util.Map;
/** /**
* Adds index for properties of a given type when its added before any instances are added. * Adds index for properties of a given type when its added before any instances are added.
*/ */
public class GraphBackedSearchIndexer implements SearchIndexer { public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChangeHandler {
private static final Logger LOG = LoggerFactory.getLogger(GraphBackedSearchIndexer.class); private static final Logger LOG = LoggerFactory.getLogger(GraphBackedSearchIndexer.class);
...@@ -67,14 +71,17 @@ public class GraphBackedSearchIndexer implements SearchIndexer { ...@@ -67,14 +71,17 @@ public class GraphBackedSearchIndexer implements SearchIndexer {
@Inject @Inject
public GraphBackedSearchIndexer(GraphProvider<TitanGraph> graphProvider) throws RepositoryException, public GraphBackedSearchIndexer(GraphProvider<TitanGraph> graphProvider) throws RepositoryException,
IndexException { AtlasException {
this(graphProvider, ApplicationProperties.get());
}
GraphBackedSearchIndexer(GraphProvider<TitanGraph> graphProvider, Configuration configuration)
throws IndexException, RepositoryException {
this.titanGraph = graphProvider.get(); this.titanGraph = graphProvider.get();
if (!HAConfiguration.isHAEnabled(configuration)) {
/* Create the transaction for indexing.
*/
initialize(); initialize();
} }
}
/** /**
* Initializes the indices for the graph - create indices for Global Vertex Keys * Initializes the indices for the graph - create indices for Global Vertex Keys
...@@ -355,6 +362,28 @@ public class GraphBackedSearchIndexer implements SearchIndexer { ...@@ -355,6 +362,28 @@ public class GraphBackedSearchIndexer implements SearchIndexer {
} }
} }
/**
* Initialize global indices for Titan graph on server activation.
*
* Since the indices are shared state, we need to do this only from an active instance.
*/
@Override
public void instanceIsActive() throws AtlasException {
LOG.info("Reacting to active: initializing index");
try {
initialize();
} catch (RepositoryException e) {
throw new AtlasException("Error in reacting to active on initialization", e);
} catch (IndexException e) {
throw new AtlasException("Error in reacting to active on initialization", e);
}
}
@Override
public void instanceIsPassive() {
LOG.info("Reacting to passive state: No action right now.");
}
/* Commenting this out since we do not need an index for edge label here /* Commenting this out since we do not need an index for edge label here
private void createEdgeMixedIndex(String propertyName) { private void createEdgeMixedIndex(String propertyName) {
EdgeLabel edgeLabel = management.getEdgeLabel(propertyName); EdgeLabel edgeLabel = management.getEdgeLabel(propertyName);
......
...@@ -22,9 +22,13 @@ import com.google.common.base.Preconditions; ...@@ -22,9 +22,13 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet;
import com.google.inject.Provider; import com.google.inject.Provider;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient; import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasException; import org.apache.atlas.AtlasException;
import org.apache.atlas.classification.InterfaceAudience; import org.apache.atlas.classification.InterfaceAudience;
import org.apache.atlas.ha.HAConfiguration;
import org.apache.atlas.listener.ActiveStateChangeHandler;
import org.apache.atlas.listener.EntityChangeListener; import org.apache.atlas.listener.EntityChangeListener;
import org.apache.atlas.listener.TypesChangeListener; import org.apache.atlas.listener.TypesChangeListener;
import org.apache.atlas.repository.MetadataRepository; import org.apache.atlas.repository.MetadataRepository;
...@@ -58,6 +62,7 @@ import org.apache.atlas.typesystem.types.TypeUtils.Pair; ...@@ -58,6 +62,7 @@ import org.apache.atlas.typesystem.types.TypeUtils.Pair;
import org.apache.atlas.typesystem.types.ValueConversionException; import org.apache.atlas.typesystem.types.ValueConversionException;
import org.apache.atlas.typesystem.types.utils.TypesUtil; import org.apache.atlas.typesystem.types.utils.TypesUtil;
import org.apache.atlas.utils.ParamChecker; import org.apache.atlas.utils.ParamChecker;
import org.apache.commons.configuration.Configuration;
import org.codehaus.jettison.json.JSONArray; import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException; import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject; import org.codehaus.jettison.json.JSONObject;
...@@ -71,13 +76,14 @@ import java.util.Collection; ...@@ -71,13 +76,14 @@ import java.util.Collection;
import java.util.LinkedHashSet; import java.util.LinkedHashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
/** /**
* Simple wrapper over TypeSystem and MetadataRepository services with hooks * Simple wrapper over TypeSystem and MetadataRepository services with hooks
* for listening to changes to the repository. * for listening to changes to the repository.
*/ */
@Singleton @Singleton
public class DefaultMetadataService implements MetadataService { public class DefaultMetadataService implements MetadataService, ActiveStateChangeHandler {
private static final Logger LOG = LoggerFactory.getLogger(DefaultMetadataService.class); private static final Logger LOG = LoggerFactory.getLogger(DefaultMetadataService.class);
...@@ -89,6 +95,8 @@ public class DefaultMetadataService implements MetadataService { ...@@ -89,6 +95,8 @@ public class DefaultMetadataService implements MetadataService {
private final Collection<TypesChangeListener> typeChangeListeners = new LinkedHashSet<>(); private final Collection<TypesChangeListener> typeChangeListeners = new LinkedHashSet<>();
private final Collection<EntityChangeListener> entityChangeListeners = new LinkedHashSet<>(); private final Collection<EntityChangeListener> entityChangeListeners = new LinkedHashSet<>();
private boolean wasInitialized = false;
@Inject @Inject
DefaultMetadataService(final MetadataRepository repository, final ITypeStore typeStore, DefaultMetadataService(final MetadataRepository repository, final ITypeStore typeStore,
final IBootstrapTypesRegistrar typesRegistrar, final IBootstrapTypesRegistrar typesRegistrar,
...@@ -96,14 +104,15 @@ public class DefaultMetadataService implements MetadataService { ...@@ -96,14 +104,15 @@ public class DefaultMetadataService implements MetadataService {
final Collection<Provider<EntityChangeListener>> entityListenerProviders) final Collection<Provider<EntityChangeListener>> entityListenerProviders)
throws AtlasException { throws AtlasException {
this(repository, typeStore, typesRegistrar, typeListenerProviders, entityListenerProviders, this(repository, typeStore, typesRegistrar, typeListenerProviders, entityListenerProviders,
TypeSystem.getInstance()); TypeSystem.getInstance(), ApplicationProperties.get());
} }
DefaultMetadataService(final MetadataRepository repository, final ITypeStore typeStore, DefaultMetadataService(final MetadataRepository repository, final ITypeStore typeStore,
final IBootstrapTypesRegistrar typesRegistrar, final IBootstrapTypesRegistrar typesRegistrar,
final Collection<Provider<TypesChangeListener>> typeListenerProviders, final Collection<Provider<TypesChangeListener>> typeListenerProviders,
final Collection<Provider<EntityChangeListener>> entityListenerProviders, final Collection<Provider<EntityChangeListener>> entityListenerProviders,
final TypeSystem typeSystem) throws AtlasException { final TypeSystem typeSystem,
final Configuration configuration) throws AtlasException {
this.typeStore = typeStore; this.typeStore = typeStore;
this.typesRegistrar = typesRegistrar; this.typesRegistrar = typesRegistrar;
this.typeSystem = typeSystem; this.typeSystem = typeSystem;
...@@ -117,25 +126,37 @@ public class DefaultMetadataService implements MetadataService { ...@@ -117,25 +126,37 @@ public class DefaultMetadataService implements MetadataService {
entityChangeListeners.add(provider.get()); entityChangeListeners.add(provider.get());
} }
if (!HAConfiguration.isHAEnabled(configuration)) {
restoreTypeSystem(); restoreTypeSystem();
}
typesRegistrar.registerTypes(ReservedTypesRegistrar.getTypesDir(), typeSystem, this);
} }
private void restoreTypeSystem() { private void restoreTypeSystem() throws AtlasException {
LOG.info("Restoring type system from the store"); LOG.info("Restoring type system from the store");
try {
TypesDef typesDef = typeStore.restore(); TypesDef typesDef = typeStore.restore();
if (!wasInitialized) {
LOG.info("Initializing type system for the first time.");
typeSystem.defineTypes(typesDef); typeSystem.defineTypes(typesDef);
// restore types before creating super types // restore types before creating super types
createSuperTypes(); createSuperTypes();
} catch (AtlasException e) { typesRegistrar.registerTypes(ReservedTypesRegistrar.getTypesDir(), typeSystem, this);
throw new RuntimeException(e); wasInitialized = true;
} else {
LOG.info("Type system was already initialized, refreshing cache.");
refreshCache(typesDef);
} }
LOG.info("Restored type system from the store"); LOG.info("Restored type system from the store");
} }
private void refreshCache(TypesDef typesDef) throws AtlasException {
TypeSystem.TransientTypeSystem transientTypeSystem
= typeSystem.createTransientTypeSystem(typesDef, true);
Map<String, IDataType> typesAdded = transientTypeSystem.getTypesAdded();
LOG.info("Number of types got from transient type system: " + typesAdded.size());
typeSystem.commitTypes(typesAdded);
}
private static final AttributeDefinition NAME_ATTRIBUTE = private static final AttributeDefinition NAME_ATTRIBUTE =
TypesUtil.createUniqueRequiredAttrDef("name", DataTypes.STRING_TYPE); TypesUtil.createUniqueRequiredAttrDef("name", DataTypes.STRING_TYPE);
private static final AttributeDefinition DESCRIPTION_ATTRIBUTE = private static final AttributeDefinition DESCRIPTION_ATTRIBUTE =
...@@ -683,4 +704,23 @@ public class DefaultMetadataService implements MetadataService { ...@@ -683,4 +704,23 @@ public class DefaultMetadataService implements MetadataService {
listener.onEntitiesDeleted(entities); listener.onEntitiesDeleted(entities);
} }
} }
/**
* Create or restore the {@link TypeSystem} cache on server activation.
*
* When an instance is passive, types could be created outside of its cache by the active instance.
* Hence, when this instance becomes active, it needs to restore the cache from the backend store.
* The first time initialization happens, the indices for these types also needs to be created.
* This must happen only from the active instance, as it updates shared backend state.
*/
@Override
public void instanceIsActive() throws AtlasException {
LOG.info("Reacting to active state: restoring type system");
restoreTypeSystem();
}
@Override
public void instanceIsPassive() {
LOG.info("Reacting to passive state: no action right now");
}
} }
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.audit;
import org.apache.atlas.AtlasException;
import org.apache.atlas.ha.HAConfiguration;
import org.apache.commons.configuration.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import java.io.IOException;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
public class HBaseBasedAuditRepositoryHATest {
@Mock
private Configuration configuration;
@Mock
private org.apache.hadoop.conf.Configuration hbaseConf;
@Mock
private Connection connection;
@BeforeMethod
public void setup() {
MockitoAnnotations.initMocks(this);
}
@Test
public void testTableShouldNotBeCreatedOnStartIfHAIsEnabled() throws IOException, AtlasException {
when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true);
when(configuration.getString(HBaseBasedAuditRepository.CONFIG_TABLE_NAME,
HBaseBasedAuditRepository.DEFAULT_TABLE_NAME)).
thenReturn(HBaseBasedAuditRepository.DEFAULT_TABLE_NAME);
HBaseBasedAuditRepository auditRepository = new HBaseBasedAuditRepository() {
@Override
protected Connection createConnection(org.apache.hadoop.conf.Configuration hbaseConf) {
return connection;
}
};
auditRepository.startInternal(configuration, hbaseConf);
verifyZeroInteractions(connection);
}
@Test
public void testShouldCreateTableWhenReactingToActive() throws AtlasException, IOException {
when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true);
when(configuration.getString(HBaseBasedAuditRepository.CONFIG_TABLE_NAME,
HBaseBasedAuditRepository.DEFAULT_TABLE_NAME)).
thenReturn(HBaseBasedAuditRepository.DEFAULT_TABLE_NAME);
TableName tableName = TableName.valueOf(HBaseBasedAuditRepository.DEFAULT_TABLE_NAME);
Admin admin = mock(Admin.class);
when(connection.getAdmin()).thenReturn(admin);
when(admin.tableExists(tableName)).thenReturn(true);
HBaseBasedAuditRepository auditRepository = new HBaseBasedAuditRepository() {
@Override
protected Connection createConnection(org.apache.hadoop.conf.Configuration hbaseConf) {
return connection;
}
};
auditRepository.startInternal(configuration, hbaseConf);
auditRepository.instanceIsActive();
verify(connection).getAdmin();
verify(admin).tableExists(tableName);
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.graph;
import com.thinkaurelius.titan.core.TitanGraph;
import com.thinkaurelius.titan.core.schema.TitanManagement;
import org.apache.atlas.AtlasException;
import org.apache.atlas.ha.HAConfiguration;
import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.IndexException;
import org.apache.atlas.repository.RepositoryException;
import org.apache.commons.configuration.Configuration;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
public class GraphBackedSearchIndexerTest {
@Mock
private Configuration configuration;
@Mock
private GraphProvider<TitanGraph> graphProvider;
@Mock
private TitanGraph titanGraph;
@Mock
private TitanManagement titanManagement;
@BeforeMethod
public void setup() {
MockitoAnnotations.initMocks(this);
}
@Test
public void testSearchIndicesAreInitializedOnConstructionWhenHAIsDisabled() throws IndexException, RepositoryException {
when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(false);
when(graphProvider.get()).thenReturn(titanGraph);
when(titanGraph.getManagementSystem()).thenReturn(titanManagement);
when(titanManagement.containsPropertyKey(Constants.VERTEX_TYPE_PROPERTY_KEY)).thenReturn(true);
GraphBackedSearchIndexer graphBackedSearchIndexer = new GraphBackedSearchIndexer(graphProvider, configuration);
verify(titanManagement).containsPropertyKey(Constants.VERTEX_TYPE_PROPERTY_KEY);
}
@Test
public void testSearchIndicesAreNotInitializedOnConstructionWhenHAIsEnabled() throws IndexException, RepositoryException {
when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true);
when(graphProvider.get()).thenReturn(titanGraph);
when(titanGraph.getManagementSystem()).thenReturn(titanManagement);
when(titanManagement.containsPropertyKey(Constants.VERTEX_TYPE_PROPERTY_KEY)).thenReturn(true);
GraphBackedSearchIndexer graphBackedSearchIndexer = new GraphBackedSearchIndexer(graphProvider, configuration);
verifyZeroInteractions(titanManagement);
}
@Test
public void testIndicesAreReinitializedWhenServerBecomesActive() throws AtlasException {
when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true);
when(graphProvider.get()).thenReturn(titanGraph);
when(titanGraph.getManagementSystem()).thenReturn(titanManagement);
when(titanManagement.containsPropertyKey(Constants.VERTEX_TYPE_PROPERTY_KEY)).thenReturn(true);
GraphBackedSearchIndexer graphBackedSearchIndexer = new GraphBackedSearchIndexer(graphProvider, configuration);
graphBackedSearchIndexer.instanceIsActive();
verify(titanManagement).containsPropertyKey(Constants.VERTEX_TYPE_PROPERTY_KEY);
}
}
...@@ -25,28 +25,126 @@ import org.apache.atlas.listener.TypesChangeListener; ...@@ -25,28 +25,126 @@ import org.apache.atlas.listener.TypesChangeListener;
import org.apache.atlas.repository.MetadataRepository; import org.apache.atlas.repository.MetadataRepository;
import org.apache.atlas.repository.typestore.ITypeStore; import org.apache.atlas.repository.typestore.ITypeStore;
import org.apache.atlas.typesystem.types.TypeSystem; import org.apache.atlas.typesystem.types.TypeSystem;
import org.apache.atlas.ha.HAConfiguration;
import org.apache.atlas.listener.TypesChangeListener;
import org.apache.atlas.repository.MetadataRepository;
import org.apache.atlas.repository.typestore.ITypeStore;
import org.apache.atlas.typesystem.TypesDef;
import org.apache.atlas.typesystem.types.IDataType;
import org.apache.atlas.typesystem.types.TypeSystem;
import org.apache.commons.configuration.Configuration;
import org.mockito.Matchers;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test; import org.testng.annotations.Test;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
public class DefaultMetadataServiceMockTest { public class DefaultMetadataServiceMockTest {
@Mock
private IBootstrapTypesRegistrar typesRegistrar;
@Mock
private TypeSystem typeSystem;
@Mock
private MetadataRepository metadataRepository;
@Mock
private ITypeStore typeStore;
@Mock
private Configuration configuration;
@BeforeMethod
public void setup() {
MockitoAnnotations.initMocks(this);
}
@Test @Test
public void testShouldInvokeTypesRegistrarOnCreation() throws AtlasException { public void testShouldInvokeTypesRegistrarOnCreation() throws AtlasException {
IBootstrapTypesRegistrar typesRegistrar = mock(IBootstrapTypesRegistrar.class);
TypeSystem typeSystem = mock(TypeSystem.class);
when(typeSystem.isRegistered(any(String.class))).thenReturn(true); when(typeSystem.isRegistered(any(String.class))).thenReturn(true);
when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(false);
DefaultMetadataService defaultMetadataService = new DefaultMetadataService(mock(MetadataRepository.class), DefaultMetadataService defaultMetadataService = new DefaultMetadataService(mock(MetadataRepository.class),
mock(ITypeStore.class), mock(ITypeStore.class),
typesRegistrar, new ArrayList<Provider<TypesChangeListener>>(), typesRegistrar, new ArrayList<Provider<TypesChangeListener>>(),
new ArrayList<Provider<EntityChangeListener>>(), typeSystem); new ArrayList<Provider<EntityChangeListener>>(), typeSystem, configuration);
verify(typesRegistrar).registerTypes(ReservedTypesRegistrar.getTypesDir(),
typeSystem, defaultMetadataService);
}
@Test
public void testShouldNotRestoreTypesIfHAIsEnabled() throws AtlasException {
when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true);
DefaultMetadataService defaultMetadataService = new DefaultMetadataService(metadataRepository,
typeStore,
typesRegistrar, new ArrayList<Provider<TypesChangeListener>>(),
new ArrayList<Provider<EntityChangeListener>>(), typeSystem, configuration);
verifyZeroInteractions(typeStore);
verifyZeroInteractions(typeSystem);
verifyZeroInteractions(typesRegistrar);
}
@Test
public void testShouldRestoreTypeSystemOnServerActive() throws AtlasException {
when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true);
TypesDef typesDef = mock(TypesDef.class);
when(typeStore.restore()).thenReturn(typesDef);
when(typeSystem.isRegistered(any(String.class))).thenReturn(true);
DefaultMetadataService defaultMetadataService = new DefaultMetadataService(metadataRepository,
typeStore,
typesRegistrar, new ArrayList<Provider<TypesChangeListener>>(),
new ArrayList<Provider<EntityChangeListener>>(), typeSystem, configuration);
defaultMetadataService.instanceIsActive();
verify(typeStore).restore();
verify(typeSystem).defineTypes(typesDef);
verify(typesRegistrar).registerTypes(ReservedTypesRegistrar.getTypesDir(), verify(typesRegistrar).registerTypes(ReservedTypesRegistrar.getTypesDir(),
typeSystem, defaultMetadataService); typeSystem, defaultMetadataService);
} }
@Test
public void testShouldOnlyRestoreCacheOnServerActiveIfAlreadyDoneOnce() throws AtlasException {
when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true);
TypesDef typesDef = mock(TypesDef.class);
when(typeStore.restore()).thenReturn(typesDef);
when(typeSystem.isRegistered(any(String.class))).thenReturn(true);
TypeSystem.TransientTypeSystem transientTypeSystem = mock(TypeSystem.TransientTypeSystem.class);
HashMap<String, IDataType> typesAdded = new HashMap<>();
when(transientTypeSystem.getTypesAdded()).thenReturn(typesAdded);
when(typeSystem.createTransientTypeSystem(typesDef, true)).
thenReturn(transientTypeSystem);
DefaultMetadataService defaultMetadataService = new DefaultMetadataService(metadataRepository,
typeStore,
typesRegistrar, new ArrayList<Provider<TypesChangeListener>>(),
new ArrayList<Provider<EntityChangeListener>>(), typeSystem, configuration);
defaultMetadataService.instanceIsActive();
defaultMetadataService.instanceIsPassive();
defaultMetadataService.instanceIsActive();
verify(typeStore, times(2)).restore();
verify(typeSystem, times(1)).defineTypes(typesDef);
verify(typesRegistrar, times(1)).
registerTypes(ReservedTypesRegistrar.getTypesDir(), typeSystem, defaultMetadataService);
verify(typeSystem, times(1)).createTransientTypeSystem(typesDef, true);
verify(typeSystem, times(1)).commitTypes(typesAdded);
}
} }
...@@ -47,6 +47,19 @@ ...@@ -47,6 +47,19 @@
<groupId>org.apache.atlas</groupId> <groupId>org.apache.atlas</groupId>
<artifactId>atlas-typesystem</artifactId> <artifactId>atlas-typesystem</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
</dependency>
<dependency>
<groupId>org.apache.atlas</groupId>
<artifactId>atlas-client</artifactId>
</dependency>
</dependencies> </dependencies>
</project> </project>
\ No newline at end of file
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.ha;
import org.apache.atlas.AtlasConstants;
import org.apache.atlas.AtlasException;
import org.apache.atlas.security.SecurityProperties;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.net.NetUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
/**
* A wrapper for getting configuration entries related to HighAvailability.
*/
public class HAConfiguration {
private static final Logger LOG = LoggerFactory.getLogger(HAConfiguration.class);
public static final String ATLAS_SERVER_HA_PREFIX = "atlas.server.ha";
public static final String ATLAS_SERVER_HA_ENABLED_KEY = ATLAS_SERVER_HA_PREFIX + ".enabled";
public static final String ATLAS_SERVER_ADDRESS_PREFIX = "atlas.server.address.";
public static final String ATLAS_SERVER_IDS = "atlas.server.ids";
public static final String HA_ZOOKEEPER_CONNECT = ATLAS_SERVER_HA_PREFIX + ".zookeeper.connect";
public static final int DEFAULT_ZOOKEEPER_CONNECT_SLEEPTIME_MILLIS = 1000;
public static final String HA_ZOOKEEPER_RETRY_SLEEPTIME_MILLIS = ATLAS_SERVER_HA_PREFIX + ".zookeeper.retry.sleeptime.ms";
public static final String HA_ZOOKEEPER_NUM_RETRIES = ATLAS_SERVER_HA_PREFIX + ".zookeeper.num.retries";
public static final int DEFAULT_ZOOKEEPER_CONNECT_NUM_RETRIES = 3;
public static final String HA_ZOOKEEPER_SESSION_TIMEOUT_MS = ATLAS_SERVER_HA_PREFIX + ".zookeeper.session.timeout.ms";
public static final int DEFAULT_ZOOKEEPER_SESSION_TIMEOUT_MILLIS = 20000;
/**
* Return whether HA is enabled or not.
* @param configuration underlying configuration instance
* @return
*/
public static boolean isHAEnabled(Configuration configuration) {
return configuration.getBoolean(ATLAS_SERVER_HA_ENABLED_KEY, false);
}
/**
* Return the ID corresponding to this Atlas instance.
*
* The match is done by looking for an ID configured in {@link HAConfiguration#ATLAS_SERVER_IDS} key
* that has a host:port entry for the key {@link HAConfiguration#ATLAS_SERVER_ADDRESS_PREFIX}+ID where
* the host is a local IP address and port is set in the system property
* {@link AtlasConstants#SYSTEM_PROPERTY_APP_PORT}.
*
* @param configuration
* @return
* @throws AtlasException if no ID is found that maps to a local IP Address or port
*/
public static String getAtlasServerId(Configuration configuration) throws AtlasException {
// ids are already trimmed by this method
String[] ids = configuration.getStringArray(ATLAS_SERVER_IDS);
String matchingServerId = null;
int appPort = Integer.parseInt(System.getProperty(AtlasConstants.SYSTEM_PROPERTY_APP_PORT));
for (String id : ids) {
String hostPort = configuration.getString(ATLAS_SERVER_ADDRESS_PREFIX +id);
if (!StringUtils.isEmpty(hostPort)) {
InetSocketAddress socketAddress;
try {
socketAddress = NetUtils.createSocketAddr(hostPort);
} catch (Exception e) {
LOG.warn("Exception while trying to get socket address for " + hostPort, e);
continue;
}
if (!socketAddress.isUnresolved()
&& NetUtils.isLocalAddress(socketAddress.getAddress())
&& appPort == socketAddress.getPort()) {
LOG.info("Found matched server id " + id + " with host port: " + hostPort);
matchingServerId = id;
break;
}
} else {
LOG.info("Could not find matching address entry for id: " + id);
}
}
if (matchingServerId == null) {
String msg = String.format("Could not find server id for this instance. " +
"Unable to find IDs matching any local host and port binding among %s",
StringUtils.join(ids, ","));
throw new AtlasException(msg);
}
return matchingServerId;
}
/**
* Get the web server address that a server instance with the passed ID is bound to.
*
* This method uses the property {@link SecurityProperties#TLS_ENABLED} to determine whether
* the URL is http or https.
*
* @param configuration underlying configuration
* @param serverId serverId whose host:port property is picked to build the web server address.
* @return
*/
public static String getBoundAddressForId(Configuration configuration, String serverId) {
String hostPort = configuration.getString(ATLAS_SERVER_ADDRESS_PREFIX +serverId);
boolean isSecure = configuration.getBoolean(SecurityProperties.TLS_ENABLED);
String protocol = (isSecure) ? "https://" : "http://";
return protocol + hostPort;
}
/**
* A collection of Zookeeper specific configuration that is used by High Availability code
*/
public static class ZookeeperProperties {
private String connectString;
private int retriesSleepTimeMillis;
private int numRetries;
private int sessionTimeout;
public ZookeeperProperties(String connectString, int retriesSleepTimeMillis, int numRetries,
int sessionTimeout) {
this.connectString = connectString;
this.retriesSleepTimeMillis = retriesSleepTimeMillis;
this.numRetries = numRetries;
this.sessionTimeout = sessionTimeout;
}
public String getConnectString() {
return connectString;
}
public int getRetriesSleepTimeMillis() {
return retriesSleepTimeMillis;
}
public int getNumRetries() {
return numRetries;
}
public int getSessionTimeout() {
return sessionTimeout;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ZookeeperProperties that = (ZookeeperProperties) o;
if (retriesSleepTimeMillis != that.retriesSleepTimeMillis) return false;
if (numRetries != that.numRetries) return false;
if (sessionTimeout != that.sessionTimeout) return false;
return !(connectString != null ? !connectString.equals(that.connectString) : that.connectString != null);
}
@Override
public int hashCode() {
int result = connectString != null ? connectString.hashCode() : 0;
result = 31 * result + retriesSleepTimeMillis;
result = 31 * result + numRetries;
result = 31 * result + sessionTimeout;
return result;
}
}
public static ZookeeperProperties getZookeeperProperties(Configuration configuration) {
String zookeeperConnectString = configuration.getString("atlas.kafka.zookeeper.connect");
if (configuration.containsKey(HA_ZOOKEEPER_CONNECT)) {
zookeeperConnectString = configuration.getString(HA_ZOOKEEPER_CONNECT);
}
int retriesSleepTimeMillis = configuration.getInt(HA_ZOOKEEPER_RETRY_SLEEPTIME_MILLIS,
DEFAULT_ZOOKEEPER_CONNECT_SLEEPTIME_MILLIS);
int numRetries = configuration.getInt(HA_ZOOKEEPER_NUM_RETRIES, DEFAULT_ZOOKEEPER_CONNECT_NUM_RETRIES);
int sessionTimeout = configuration.getInt(HA_ZOOKEEPER_SESSION_TIMEOUT_MS,
DEFAULT_ZOOKEEPER_SESSION_TIMEOUT_MILLIS);
return new ZookeeperProperties(zookeeperConnectString, retriesSleepTimeMillis, numRetries, sessionTimeout);
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.listener;
import org.apache.atlas.AtlasException;
/**
* An interface that should be implemented by objects and services to react to changes in state of an Atlas server.
*
* The two state transitions we handle are (1) becoming active and (2) becoming passive.
*/
public interface ActiveStateChangeHandler {
/**
* Callback that is invoked on an implementor when this instance of Atlas server is declared the leader.
*
* Any initialization that must be carried out by an implementor only when the server becomes active
* should happen on this callback.
*
* @throws {@link AtlasException} if anything is wrong on initialization
*/
void instanceIsActive() throws AtlasException;
/**
* Callback that is invoked on an implementor when this instance of Atlas server is removed as the leader.
*
* Any cleanup that must be carried out by an implementor when the server becomes passive
* should happen on this callback.
*
* @throws {@link AtlasException} if anything is wrong on shutdown
*/
void instanceIsPassive() throws AtlasException;
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.ha;
import org.apache.atlas.AtlasConstants;
import org.apache.atlas.AtlasException;
import org.apache.atlas.security.SecurityProperties;
import org.apache.commons.configuration.Configuration;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.fail;
public class HAConfigurationTest {
@Mock
private Configuration configuration;
@BeforeMethod
public void setup() {
MockitoAnnotations.initMocks(this);
System.setProperty(AtlasConstants.SYSTEM_PROPERTY_APP_PORT, AtlasConstants.DEFAULT_APP_PORT_STR);
}
@Test
public void testShouldSelectRightServerAddress() throws AtlasException {
when(configuration.getStringArray(HAConfiguration.ATLAS_SERVER_IDS)).thenReturn(new String[] {"id1", "id2"});
when(configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +"id1")).thenReturn("127.0.0.1:31000");
when(configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +"id2")).thenReturn("127.0.0.1:21000");
String atlasServerId = HAConfiguration.getAtlasServerId(configuration);
assertEquals(atlasServerId, "id2");
}
@Test(expectedExceptions = AtlasException.class)
public void testShouldFailIfNoIDsConfiguration() throws AtlasException {
when(configuration.getStringArray(HAConfiguration.ATLAS_SERVER_IDS)).thenReturn(new String[] {});
HAConfiguration.getAtlasServerId(configuration);
fail("Should not return any server id if IDs not found in configuration");
}
@Test(expectedExceptions = AtlasException.class)
public void testShouldFailIfNoMatchingAddressForID() throws AtlasException {
when(configuration.getStringArray(HAConfiguration.ATLAS_SERVER_IDS)).thenReturn(new String[] {"id1", "id2"});
when(configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +"id1")).thenReturn("127.0.0.1:31000");
HAConfiguration.getAtlasServerId(configuration);
fail("Should not return any server id if no matching address found for any ID");
}
@Test
public void testShouldReturnHTTPBoundAddress() {
when(configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +"id1")).thenReturn("127.0.0.1:21000");
when(configuration.getBoolean(SecurityProperties.TLS_ENABLED)).thenReturn(false);
String address = HAConfiguration.getBoundAddressForId(configuration, "id1");
assertEquals(address, "http://127.0.0.1:21000");
}
@Test
public void testShouldReturnHTTPSBoundAddress() {
when(configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +"id1")).thenReturn("127.0.0.1:21443");
when(configuration.getBoolean(SecurityProperties.TLS_ENABLED)).thenReturn(true);
String address = HAConfiguration.getBoundAddressForId(configuration, "id1");
assertEquals(address, "https://127.0.0.1:21443");
}
}
...@@ -27,11 +27,14 @@ import org.apache.atlas.classification.InterfaceAudience; ...@@ -27,11 +27,14 @@ import org.apache.atlas.classification.InterfaceAudience;
import org.apache.atlas.typesystem.TypesDef; import org.apache.atlas.typesystem.TypesDef;
import org.apache.atlas.typesystem.exception.TypeExistsException; import org.apache.atlas.typesystem.exception.TypeExistsException;
import org.apache.atlas.typesystem.exception.TypeNotFoundException; import org.apache.atlas.typesystem.exception.TypeNotFoundException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.inject.Singleton; import javax.inject.Singleton;
import java.lang.reflect.Constructor; import java.lang.reflect.Constructor;
import java.text.SimpleDateFormat; import java.text.SimpleDateFormat;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
...@@ -43,6 +46,8 @@ import java.util.concurrent.ConcurrentHashMap; ...@@ -43,6 +46,8 @@ import java.util.concurrent.ConcurrentHashMap;
@Singleton @Singleton
@InterfaceAudience.Private @InterfaceAudience.Private
public class TypeSystem { public class TypeSystem {
private static final Logger LOG = LoggerFactory.getLogger(TypeSystem.class);
private static final TypeSystem INSTANCE = new TypeSystem(); private static final TypeSystem INSTANCE = new TypeSystem();
private static ThreadLocal<SimpleDateFormat> dateFormat = new ThreadLocal<SimpleDateFormat>() { private static ThreadLocal<SimpleDateFormat> dateFormat = new ThreadLocal<SimpleDateFormat>() {
@Override @Override
...@@ -333,9 +338,12 @@ public class TypeSystem { ...@@ -333,9 +338,12 @@ public class TypeSystem {
IDataType type = typeEntry.getValue(); IDataType type = typeEntry.getValue();
//Add/replace the new type in the typesystem //Add/replace the new type in the typesystem
types.put(typeName, type); types.put(typeName, type);
// ArrayListMultiMap allows duplicates - we want to avoid this during re-activation.
if (!typeCategoriesToTypeNamesMap.containsEntry(type.getTypeCategory(), typeName)) {
typeCategoriesToTypeNamesMap.put(type.getTypeCategory(), typeName); typeCategoriesToTypeNamesMap.put(type.getTypeCategory(), typeName);
} }
} }
}
public class TransientTypeSystem extends TypeSystem { public class TransientTypeSystem extends TypeSystem {
......
...@@ -88,3 +88,7 @@ atlas.server.https.port=31443 ...@@ -88,3 +88,7 @@ atlas.server.https.port=31443
hbase.security.authentication=simple hbase.security.authentication=simple
atlas.hook.falcon.synchronous=true atlas.hook.falcon.synchronous=true
######### High Availability Configuration ########
atlas.server.ha.enabled=false
atlas.server.ids=id1
atlas.server.address.id1=localhost:21000
...@@ -32,12 +32,14 @@ import org.testng.annotations.Test; ...@@ -32,12 +32,14 @@ import org.testng.annotations.Test;
import scala.actors.threadpool.Arrays; import scala.actors.threadpool.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap;
import java.util.List; import java.util.List;
import static org.apache.atlas.typesystem.types.utils.TypesUtil.createClassTypeDef; import static org.apache.atlas.typesystem.types.utils.TypesUtil.createClassTypeDef;
import static org.apache.atlas.typesystem.types.utils.TypesUtil.createRequiredAttrDef; import static org.apache.atlas.typesystem.types.utils.TypesUtil.createRequiredAttrDef;
import static org.apache.atlas.typesystem.types.utils.TypesUtil.createStructTypeDef; import static org.apache.atlas.typesystem.types.utils.TypesUtil.createStructTypeDef;
import static org.apache.atlas.typesystem.types.utils.TypesUtil.createTraitTypeDef; import static org.apache.atlas.typesystem.types.utils.TypesUtil.createTraitTypeDef;
import static org.testng.Assert.assertTrue;
public class TypeSystemTest extends BaseTest { public class TypeSystemTest extends BaseTest {
...@@ -55,7 +57,7 @@ public class TypeSystemTest extends BaseTest { ...@@ -55,7 +57,7 @@ public class TypeSystemTest extends BaseTest {
public void testGetTypeNames() throws Exception { public void testGetTypeNames() throws Exception {
getTypeSystem().defineEnumType("enum_test", new EnumValue("0", 0), new EnumValue("1", 1), new EnumValue("2", 2), getTypeSystem().defineEnumType("enum_test", new EnumValue("0", 0), new EnumValue("1", 1), new EnumValue("2", 2),
new EnumValue("3", 3)); new EnumValue("3", 3));
Assert.assertTrue(getTypeSystem().getTypeNames().contains("enum_test")); assertTrue(getTypeSystem().getTypeNames().contains("enum_test"));
} }
@Test @Test
...@@ -65,7 +67,7 @@ public class TypeSystemTest extends BaseTest { ...@@ -65,7 +67,7 @@ public class TypeSystemTest extends BaseTest {
String typeDescription = typeName + description; String typeDescription = typeName + description;
getTypeSystem().defineEnumType(typeName, typeDescription, new EnumValue("0", 0), new EnumValue("1", 1), new EnumValue("2", 2), getTypeSystem().defineEnumType(typeName, typeDescription, new EnumValue("0", 0), new EnumValue("1", 1), new EnumValue("2", 2),
new EnumValue("3", 3)); new EnumValue("3", 3));
Assert.assertTrue(getTypeSystem().getTypeNames().contains(typeName)); assertTrue(getTypeSystem().getTypeNames().contains(typeName));
IDataType type = getTypeSystem().getDataType(EnumType.class, typeName); IDataType type = getTypeSystem().getDataType(EnumType.class, typeName);
Assert.assertNotNull(type); Assert.assertNotNull(type);
Assert.assertEquals(type.getDescription(), typeDescription); Assert.assertEquals(type.getDescription(), typeDescription);
...@@ -76,7 +78,7 @@ public class TypeSystemTest extends BaseTest { ...@@ -76,7 +78,7 @@ public class TypeSystemTest extends BaseTest {
.createTraitTypeDef(typeName, typeDescription, ImmutableSet.<String>of(), .createTraitTypeDef(typeName, typeDescription, ImmutableSet.<String>of(),
TypesUtil.createRequiredAttrDef("type", DataTypes.STRING_TYPE)); TypesUtil.createRequiredAttrDef("type", DataTypes.STRING_TYPE));
getTypeSystem().defineTraitType(trait); getTypeSystem().defineTraitType(trait);
Assert.assertTrue(getTypeSystem().getTypeNames().contains(typeName)); assertTrue(getTypeSystem().getTypeNames().contains(typeName));
type = getTypeSystem().getDataType(TraitType.class, typeName); type = getTypeSystem().getDataType(TraitType.class, typeName);
Assert.assertNotNull(type); Assert.assertNotNull(type);
Assert.assertEquals(type.getDescription(), typeDescription); Assert.assertEquals(type.getDescription(), typeDescription);
...@@ -87,7 +89,7 @@ public class TypeSystemTest extends BaseTest { ...@@ -87,7 +89,7 @@ public class TypeSystemTest extends BaseTest {
.createClassTypeDef(typeName, typeDescription, ImmutableSet.<String>of(), .createClassTypeDef(typeName, typeDescription, ImmutableSet.<String>of(),
TypesUtil.createRequiredAttrDef("type", DataTypes.STRING_TYPE)); TypesUtil.createRequiredAttrDef("type", DataTypes.STRING_TYPE));
getTypeSystem().defineClassType(classType); getTypeSystem().defineClassType(classType);
Assert.assertTrue(getTypeSystem().getTypeNames().contains(typeName)); assertTrue(getTypeSystem().getTypeNames().contains(typeName));
type = getTypeSystem().getDataType(ClassType.class, typeName); type = getTypeSystem().getDataType(ClassType.class, typeName);
Assert.assertNotNull(type); Assert.assertNotNull(type);
Assert.assertEquals(type.getDescription(), typeDescription); Assert.assertEquals(type.getDescription(), typeDescription);
...@@ -95,7 +97,7 @@ public class TypeSystemTest extends BaseTest { ...@@ -95,7 +97,7 @@ public class TypeSystemTest extends BaseTest {
typeName = "struct_type"; typeName = "struct_type";
typeDescription = typeName + description; typeDescription = typeName + description;
getTypeSystem().defineStructType(typeName, typeDescription, true, createRequiredAttrDef("a", DataTypes.INT_TYPE)); getTypeSystem().defineStructType(typeName, typeDescription, true, createRequiredAttrDef("a", DataTypes.INT_TYPE));
Assert.assertTrue(getTypeSystem().getTypeNames().contains(typeName)); assertTrue(getTypeSystem().getTypeNames().contains(typeName));
type = getTypeSystem().getDataType(StructType.class, typeName); type = getTypeSystem().getDataType(StructType.class, typeName);
Assert.assertNotNull(type); Assert.assertNotNull(type);
Assert.assertEquals(type.getDescription(), typeDescription); Assert.assertEquals(type.getDescription(), typeDescription);
...@@ -106,7 +108,7 @@ public class TypeSystemTest extends BaseTest { ...@@ -106,7 +108,7 @@ public class TypeSystemTest extends BaseTest {
public void testIsRegistered() throws Exception { public void testIsRegistered() throws Exception {
getTypeSystem().defineEnumType("enum_test", new EnumValue("0", 0), new EnumValue("1", 1), new EnumValue("2", 2), getTypeSystem().defineEnumType("enum_test", new EnumValue("0", 0), new EnumValue("1", 1), new EnumValue("2", 2),
new EnumValue("3", 3)); new EnumValue("3", 3));
Assert.assertTrue(getTypeSystem().isRegistered("enum_test")); assertTrue(getTypeSystem().isRegistered("enum_test"));
} }
@Test @Test
...@@ -182,9 +184,9 @@ public class TypeSystemTest extends BaseTest { ...@@ -182,9 +184,9 @@ public class TypeSystemTest extends BaseTest {
ClassType bc = ts.getDataType(ClassType.class, "B"); ClassType bc = ts.getDataType(ClassType.class, "B");
ClassType cc = ts.getDataType(ClassType.class, "C"); ClassType cc = ts.getDataType(ClassType.class, "C");
Assert.assertTrue(ac.compareTo(bc) < 0); assertTrue(ac.compareTo(bc) < 0);
Assert.assertTrue(bc.compareTo(cc) < 0); assertTrue(bc.compareTo(cc) < 0);
Assert.assertTrue(ac.compareTo(cc) < 0); assertTrue(ac.compareTo(cc) < 0);
} }
@Test @Test
...@@ -223,4 +225,31 @@ public class TypeSystemTest extends BaseTest { ...@@ -223,4 +225,31 @@ public class TypeSystemTest extends BaseTest {
Assert.assertEquals(traitNames.size(), 4); Assert.assertEquals(traitNames.size(), 4);
Assert.assertEquals(classNames.size(), 3); Assert.assertEquals(classNames.size(), 3);
} }
@Test
public void testTypeNamesAreNotDuplicated() {
TypeSystem typeSystem = getTypeSystem();
ImmutableList<String> traitNames = typeSystem.getTypeNamesByCategory(DataTypes.TypeCategory.TRAIT);
int numTraits = traitNames.size();
HashMap<String, IDataType> typesAdded = new HashMap<>();
String traitName = "dup_type_test" + random();
TraitType traitType = new TraitType(typeSystem, traitName, null, null, 0);
typesAdded.put(traitName, traitType);
typeSystem.commitTypes(typesAdded);
traitNames = typeSystem.getTypeNamesByCategory(DataTypes.TypeCategory.TRAIT);
Assert.assertEquals(traitNames.size(), numTraits+1);
// add again with another trait this time
traitName = "dup_type_test" + random();
TraitType traitTypeNew = new TraitType(typeSystem, traitName, null, null, 0);
typesAdded.put(traitName, traitTypeNew);
typeSystem.commitTypes(typesAdded);
traitNames = typeSystem.getTypeNamesByCategory(DataTypes.TypeCategory.TRAIT);
Assert.assertEquals(traitNames.size(), numTraits+2);
}
} }
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
package org.apache.atlas; package org.apache.atlas;
import org.apache.atlas.security.SecurityProperties;
import org.apache.atlas.web.service.EmbeddedServer; import org.apache.atlas.web.service.EmbeddedServer;
import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser; import org.apache.commons.cli.GnuParser;
...@@ -96,10 +97,11 @@ public final class Atlas { ...@@ -96,10 +97,11 @@ public final class Atlas {
setApplicationHome(); setApplicationHome();
Configuration configuration = ApplicationProperties.get(); Configuration configuration = ApplicationProperties.get();
final String enableTLSFlag = configuration.getString("atlas.enableTLS"); final String enableTLSFlag = configuration.getString(SecurityProperties.TLS_ENABLED);
final int appPort = getApplicationPort(cmd, enableTLSFlag, configuration); final int appPort = getApplicationPort(cmd, enableTLSFlag, configuration);
System.setProperty(AtlasConstants.SYSTEM_PROPERTY_APP_PORT, String.valueOf(appPort));
final boolean enableTLS = isTLSEnabled(enableTLSFlag, appPort); final boolean enableTLS = isTLSEnabled(enableTLSFlag, appPort);
configuration.setProperty("atlas.enableTLS", String.valueOf(enableTLS)); configuration.setProperty(SecurityProperties.TLS_ENABLED, String.valueOf(enableTLS));
showStartupInfo(buildConfiguration, enableTLS, appPort); showStartupInfo(buildConfiguration, enableTLS, appPort);
...@@ -147,7 +149,7 @@ public final class Atlas { ...@@ -147,7 +149,7 @@ public final class Atlas {
private static boolean isTLSEnabled(String enableTLSFlag, int appPort) { private static boolean isTLSEnabled(String enableTLSFlag, int appPort) {
return Boolean.valueOf(StringUtils.isEmpty(enableTLSFlag) ? return Boolean.valueOf(StringUtils.isEmpty(enableTLSFlag) ?
System.getProperty("atlas.enableTLS", (appPort % 1000) == 443 ? "true" : "false") : enableTLSFlag); System.getProperty(SecurityProperties.TLS_ENABLED, (appPort % 1000) == 443 ? "true" : "false") : enableTLSFlag);
} }
private static void showStartupInfo(PropertiesConfiguration buildConfiguration, boolean enableTLS, int appPort) { private static void showStartupInfo(PropertiesConfiguration buildConfiguration, boolean enableTLS, int appPort) {
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.web.filters;
import org.apache.atlas.web.service.ActiveInstanceState;
import org.apache.atlas.web.service.ServiceState;
import org.apache.hadoop.http.HtmlQuoting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.inject.Inject;
import javax.inject.Singleton;
import javax.servlet.Filter;
import javax.servlet.FilterChain;
import javax.servlet.FilterConfig;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.HttpMethod;
import javax.ws.rs.core.HttpHeaders;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* A servlet {@link Filter} that redirects web requests from a passive Atlas server instance to an active one.
*
* All requests to an active instance pass through. Requests received by a passive instance are redirected
* by identifying the currently active server. Requests to servers which are in transition are returned with
* an error SERVICE_UNAVAILABLE. Identification of this state is carried out using
* {@link ServiceState} and {@link ActiveInstanceState}.
*/
@Singleton
public class ActiveServerFilter implements Filter {
private static final Logger LOG = LoggerFactory.getLogger(ActiveServerFilter.class);
private final ActiveInstanceState activeInstanceState;
private ServiceState serviceState;
@Inject
public ActiveServerFilter(ActiveInstanceState activeInstanceState, ServiceState serviceState) {
this.activeInstanceState = activeInstanceState;
this.serviceState = serviceState;
}
@Override
public void init(FilterConfig filterConfig) throws ServletException {
LOG.info("ActiveServerFilter initialized");
}
/**
* Determines if this Atlas server instance is passive and redirects to active if so.
*
* @param servletRequest Request object from which the URL and other parameters are determined.
* @param servletResponse Response object to handle the redirect.
* @param filterChain Chain to pass through requests if the instance is Active.
* @throws IOException
* @throws ServletException
*/
@Override
public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse,
FilterChain filterChain) throws IOException, ServletException {
if (isInstanceActive()) {
LOG.debug("Active. Passing request downstream");
filterChain.doFilter(servletRequest, servletResponse);
} else if (serviceState.isInstanceInTransition()) {
HttpServletResponse httpServletResponse = (HttpServletResponse) servletResponse;
LOG.error("Instance in transition. Service may not be ready to return a result");
httpServletResponse.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
} else {
HttpServletResponse httpServletResponse = (HttpServletResponse) servletResponse;
String activeServerAddress = activeInstanceState.getActiveServerAddress();
if (activeServerAddress == null) {
LOG.error("Could not retrieve active server address as it is null. Cannot redirect request {}",
((HttpServletRequest)servletRequest).getRequestURI());
httpServletResponse.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
} else {
handleRedirect((HttpServletRequest) servletRequest, httpServletResponse, activeServerAddress);
}
}
}
boolean isInstanceActive() {
return serviceState.getState() == ServiceState.ServiceStateValue.ACTIVE;
}
private void handleRedirect(HttpServletRequest servletRequest, HttpServletResponse httpServletResponse,
String activeServerAddress) throws IOException {
HttpServletRequest httpServletRequest = servletRequest;
String requestURI = httpServletRequest.getRequestURI();
String queryString = httpServletRequest.getQueryString();
if ((queryString != null) && (!queryString.isEmpty())) {
requestURI += "?" + queryString;
}
String quotedUri = HtmlQuoting.quoteHtmlChars(requestURI);
if (quotedUri == null) {
quotedUri = "/";
}
String redirectLocation = activeServerAddress + quotedUri;
LOG.info("Not active. Redirecting to {}", redirectLocation);
// A POST/PUT/DELETE require special handling by sending HTTP 307 instead of the regular 301/302.
// Reference: http://stackoverflow.com/questions/2068418/whats-the-difference-between-a-302-and-a-307-redirect
if (isUnsafeHttpMethod(httpServletRequest)) {
httpServletResponse.setHeader(HttpHeaders.LOCATION, redirectLocation);
httpServletResponse.setStatus(HttpServletResponse.SC_TEMPORARY_REDIRECT);
} else {
httpServletResponse.sendRedirect(redirectLocation);
}
}
private boolean isUnsafeHttpMethod(HttpServletRequest httpServletRequest) {
String method = httpServletRequest.getMethod();
return (method.equals(HttpMethod.POST)) ||
(method.equals(HttpMethod.PUT)) ||
(method.equals(HttpMethod.DELETE));
}
@Override
public void destroy() {
}
}
...@@ -34,11 +34,14 @@ import org.apache.atlas.ApplicationProperties; ...@@ -34,11 +34,14 @@ import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient; import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasException; import org.apache.atlas.AtlasException;
import org.apache.atlas.RepositoryMetadataModule; import org.apache.atlas.RepositoryMetadataModule;
import org.apache.atlas.ha.HAConfiguration;
import org.apache.atlas.notification.NotificationModule; import org.apache.atlas.notification.NotificationModule;
import org.apache.atlas.repository.graph.GraphProvider; import org.apache.atlas.repository.graph.GraphProvider;
import org.apache.atlas.service.Services; import org.apache.atlas.service.Services;
import org.apache.atlas.web.filters.ActiveServerFilter;
import org.apache.atlas.web.filters.AtlasAuthenticationFilter; import org.apache.atlas.web.filters.AtlasAuthenticationFilter;
import org.apache.atlas.web.filters.AuditFilter; import org.apache.atlas.web.filters.AuditFilter;
import org.apache.atlas.web.service.ActiveInstanceElectorModule;
import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.configuration.ConfigurationException;
import org.slf4j.Logger; import org.slf4j.Logger;
...@@ -72,11 +75,26 @@ public class GuiceServletConfig extends GuiceServletContextListener { ...@@ -72,11 +75,26 @@ public class GuiceServletConfig extends GuiceServletContextListener {
LoginProcessor loginProcessor = new LoginProcessor(); LoginProcessor loginProcessor = new LoginProcessor();
loginProcessor.login(); loginProcessor.login();
injector = Guice.createInjector(getRepositoryModule(), new NotificationModule(), injector = Guice.createInjector(getRepositoryModule(), new ActiveInstanceElectorModule(),
new JerseyServletModule() { new NotificationModule(), new JerseyServletModule() {
private Configuration appConfiguration = null;
private Configuration getConfiguration() {
if (appConfiguration == null) {
try {
appConfiguration = ApplicationProperties.get();
} catch (AtlasException e) {
LOG.warn("Could not load application configuration", e);
}
}
return appConfiguration;
}
@Override @Override
protected void configureServlets() { protected void configureServlets() {
filter("/*").through(AuditFilter.class); filter("/*").through(AuditFilter.class);
configureActiveServerFilterIfNecessary();
try { try {
configureAuthenticationFilter(); configureAuthenticationFilter();
} catch (ConfigurationException e) { } catch (ConfigurationException e) {
...@@ -92,16 +110,25 @@ public class GuiceServletConfig extends GuiceServletContextListener { ...@@ -92,16 +110,25 @@ public class GuiceServletConfig extends GuiceServletContextListener {
serve("/" + AtlasClient.BASE_URI + "*").with(GuiceContainer.class, params); serve("/" + AtlasClient.BASE_URI + "*").with(GuiceContainer.class, params);
} }
private void configureActiveServerFilterIfNecessary() {
Configuration configuration = getConfiguration();
if ((configuration == null) ||
!HAConfiguration.isHAEnabled(configuration)) {
LOG.info("HA configuration is disabled, not activating ActiveServerFilter");
} else {
filter("/*").through(ActiveServerFilter.class);
}
}
private void configureAuthenticationFilter() throws ConfigurationException { private void configureAuthenticationFilter() throws ConfigurationException {
try { Configuration configuration = getConfiguration();
Configuration configuration = ApplicationProperties.get(); if (configuration == null) {
throw new ConfigurationException("Could not load application configuration");
}
if (Boolean.valueOf(configuration.getString(HTTP_AUTHENTICATION_ENABLED))) { if (Boolean.valueOf(configuration.getString(HTTP_AUTHENTICATION_ENABLED))) {
LOG.info("Enabling AuthenticationFilter"); LOG.info("Enabling AuthenticationFilter");
filter("/*").through(AtlasAuthenticationFilter.class); filter("/*").through(AtlasAuthenticationFilter.class);
} }
} catch (AtlasException e) {
LOG.warn("Error loading configuration and initializing authentication filter", e);
}
} }
}); });
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.web.service;
import com.google.inject.AbstractModule;
import com.google.inject.multibindings.Multibinder;
import org.apache.atlas.notification.NotificationHookConsumer;
import org.apache.atlas.repository.audit.HBaseBasedAuditRepository;
import org.apache.atlas.repository.graph.GraphBackedSearchIndexer;
import org.apache.atlas.service.Service;
import org.apache.atlas.listener.ActiveStateChangeHandler;
import org.apache.atlas.services.DefaultMetadataService;
import org.apache.atlas.web.filters.ActiveServerFilter;
/**
* A Guice module that registers the handlers of High Availability state change handlers and other services.
*
* Any new handler that should react to HA state change should be registered here.
*/
public class ActiveInstanceElectorModule extends AbstractModule {
@Override
protected void configure() {
Multibinder<ActiveStateChangeHandler> activeStateChangeHandlerBinder =
Multibinder.newSetBinder(binder(), ActiveStateChangeHandler.class);
activeStateChangeHandlerBinder.addBinding().to(GraphBackedSearchIndexer.class);
activeStateChangeHandlerBinder.addBinding().to(DefaultMetadataService.class);
activeStateChangeHandlerBinder.addBinding().to(NotificationHookConsumer.class);
activeStateChangeHandlerBinder.addBinding().to(HBaseBasedAuditRepository.class);
Multibinder<Service> serviceBinder = Multibinder.newSetBinder(binder(), Service.class);
serviceBinder.addBinding().to(ActiveInstanceElectorService.class);
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.web.service;
import com.google.inject.Inject;
import com.google.inject.Provider;
import com.google.inject.Singleton;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasException;
import org.apache.atlas.ha.HAConfiguration;
import org.apache.atlas.listener.ActiveStateChangeHandler;
import org.apache.atlas.service.Service;
import org.apache.commons.configuration.Configuration;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
/**
* A service that implements leader election to determine whether this Atlas server is Active.
*
* The service implements leader election through <a href="http://curator.apache.org/">Curator</a>'s
* {@link LeaderLatch} recipe. The service also implements {@link LeaderLatchListener} to get
* notified of changes to leadership state. Upon becoming leader, this instance is treated as the
* active Atlas instance and calls {@link ActiveStateChangeHandler}s to activate them. Conversely,
* on being removed from leadership, this instance is treated as a passive instance and calls
* {@link ActiveStateChangeHandler}s to deactivate them.
*/
@Singleton
public class ActiveInstanceElectorService implements Service, LeaderLatchListener {
private static final Logger LOG = LoggerFactory.getLogger(ActiveInstanceElectorService.class);
private final Configuration configuration;
private final ServiceState serviceState;
private final ActiveInstanceState activeInstanceState;
private Collection<Provider<ActiveStateChangeHandler>> activeStateChangeHandlerProviders;
private Collection<ActiveStateChangeHandler> activeStateChangeHandlers;
private CuratorFactory curatorFactory;
private LeaderLatch leaderLatch;
private String serverId;
/**
* Create a new instance of {@link ActiveInstanceElectorService}
* @param activeStateChangeHandlerProviders The list of registered {@link ActiveStateChangeHandler}s that
* must be called back on state changes.
* @throws AtlasException
*/
@Inject
public ActiveInstanceElectorService(
Collection<Provider<ActiveStateChangeHandler>> activeStateChangeHandlerProviders,
CuratorFactory curatorFactory, ActiveInstanceState activeInstanceState,
ServiceState serviceState)
throws AtlasException {
this(ApplicationProperties.get(), activeStateChangeHandlerProviders,
curatorFactory, activeInstanceState, serviceState);
}
ActiveInstanceElectorService(Configuration configuration,
Collection<Provider<ActiveStateChangeHandler>> activeStateChangeHandlerProviders,
CuratorFactory curatorFactory, ActiveInstanceState activeInstanceState,
ServiceState serviceState) {
this.configuration = configuration;
this.activeStateChangeHandlerProviders = activeStateChangeHandlerProviders;
this.activeStateChangeHandlers = new ArrayList<>();
this.curatorFactory = curatorFactory;
this.activeInstanceState = activeInstanceState;
this.serviceState = serviceState;
}
/**
* Join leader election on starting up.
*
* If Atlas High Availability configuration is disabled, this operation is a no-op.
* @throws AtlasException
*/
@Override
public void start() throws AtlasException {
if (!HAConfiguration.isHAEnabled(configuration)) {
LOG.info("HA is not enabled, no need to start leader election service");
return;
}
cacheActiveStateChangeHandlers();
serverId = HAConfiguration.getAtlasServerId(configuration);
joinElection();
}
private void joinElection() {
LOG.info("Starting leader election for {}", serverId);
leaderLatch = curatorFactory.leaderLatchInstance(serverId);
leaderLatch.addListener(this);
try {
leaderLatch.start();
LOG.info("Leader latch started for {}.", serverId);
} catch (Exception e) {
LOG.info("Exception while starting leader latch for {}.", serverId, e);
}
}
/**
* Leave leader election process and clean up resources on shutting down.
*
* If Atlas High Availability configuration is disabled, this operation is a no-op.
* @throws AtlasException
*/
@Override
public void stop() {
if (!HAConfiguration.isHAEnabled(configuration)) {
LOG.info("HA is not enabled, no need to stop leader election service");
return;
}
try {
leaderLatch.close();
curatorFactory.close();
} catch (IOException e) {
LOG.error("Error closing leader latch", e);
}
}
/**
* Call all registered {@link ActiveStateChangeHandler}s on being elected active.
*
* In addition, shared state information about this instance becoming active is updated
* using {@link ActiveInstanceState}.
*/
@Override
public void isLeader() {
LOG.warn("Server instance with server id {} is elected as leader", serverId);
serviceState.becomingActive();
try {
for (ActiveStateChangeHandler handler : activeStateChangeHandlers) {
handler.instanceIsActive();
}
activeInstanceState.update(serverId);
serviceState.setActive();
} catch (Exception e) {
LOG.error("Got exception while activating", e);
notLeader();
rejoinElection();
}
}
private void cacheActiveStateChangeHandlers() {
if (activeStateChangeHandlers.size()==0) {
for (Provider<ActiveStateChangeHandler> provider : activeStateChangeHandlerProviders) {
ActiveStateChangeHandler handler = provider.get();
activeStateChangeHandlers.add(handler);
}
}
}
private void rejoinElection() {
try {
leaderLatch.close();
joinElection();
} catch (IOException e) {
LOG.error("Error rejoining election", e);
}
}
/**
* Call all registered {@link ActiveStateChangeHandler}s on becoming passive instance.
*/
@Override
public void notLeader() {
LOG.warn("Server instance with server id {} is removed as leader", serverId);
serviceState.becomingPassive();
for (ActiveStateChangeHandler handler: activeStateChangeHandlers) {
try {
handler.instanceIsPassive();
} catch (AtlasException e) {
LOG.error("Error while reacting to passive state.", e);
}
}
serviceState.setPassive();
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.web.service;
import com.google.inject.Inject;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasException;
import org.apache.atlas.ha.HAConfiguration;
import org.apache.commons.configuration.Configuration;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.charset.Charset;
/**
* An object that encapsulates storing and retrieving state related to an Active Atlas server.
*
* The current implementation uses Zookeeper to store and read this state from. It does this
* under a read-write lock implemented using Curator's {@link InterProcessReadWriteLock} to
* provide for safety across multiple processes.
*/
public class ActiveInstanceState {
private final Configuration configuration;
private final CuratorFactory curatorFactory;
public static final String APACHE_ATLAS_ACTIVE_SERVER_INFO = "/apache_atlas_active_server_info";
private static final Logger LOG = LoggerFactory.getLogger(ActiveInstanceState.class);
/**
* Create a new instance of {@link ActiveInstanceState}.
* @param curatorFactory an instance of {@link CuratorFactory} to get the {@link InterProcessReadWriteLock}
* @throws AtlasException
*/
@Inject
public ActiveInstanceState(CuratorFactory curatorFactory) throws AtlasException {
this(ApplicationProperties.get(), curatorFactory);
}
/**
* Create a new instance of {@link ActiveInstanceState}.
* @param configuration an instance of {@link Configuration} created from Atlas configuration
* @param curatorFactory an instance of {@link CuratorFactory} to get the {@link InterProcessReadWriteLock}
* @throws AtlasException
*/
public ActiveInstanceState(Configuration configuration, CuratorFactory curatorFactory) {
this.configuration = configuration;
this.curatorFactory = curatorFactory;
}
/**
* Update state of the active server instance.
*
* This method writes this instance's Server Address to a shared node in Zookeeper.
* This information is used by other passive instances to locate the current active server.
* @throws Exception
* @param serverId ID of this server instance
*/
public void update(String serverId) throws Exception {
CuratorFramework client = curatorFactory.clientInstance();
String atlasServerAddress = HAConfiguration.getBoundAddressForId(configuration, serverId);
Stat serverInfo = client.checkExists().forPath(APACHE_ATLAS_ACTIVE_SERVER_INFO);
if (serverInfo == null) {
client.create().withMode(CreateMode.EPHEMERAL).forPath(APACHE_ATLAS_ACTIVE_SERVER_INFO);
}
client.setData().forPath(APACHE_ATLAS_ACTIVE_SERVER_INFO,
atlasServerAddress.getBytes(Charset.forName("UTF-8")));
}
/**
* Retrieve state of the active server instance.
*
* This method reads the active server location from the shared node in Zookeeper.
* @return the active server's address and port of form http://host-or-ip:port
*/
public String getActiveServerAddress() {
CuratorFramework client = curatorFactory.clientInstance();
String serverAddress = null;
try {
byte[] bytes = client.getData().forPath(APACHE_ATLAS_ACTIVE_SERVER_INFO);
serverAddress = new String(bytes, Charset.forName("UTF-8"));
} catch (Exception e) {
LOG.error("Error getting active server address", e);
}
return serverAddress;
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.web.service;
import com.google.inject.Singleton;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasException;
import org.apache.atlas.ha.HAConfiguration;
import org.apache.commons.configuration.Configuration;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
import org.apache.curator.retry.ExponentialBackoffRetry;
/**
* A factory to create objects related to Curator.
*
* Allows for stubbing in tests.
*/
@Singleton
public class CuratorFactory {
public static final String APACHE_ATLAS_LEADER_ELECTOR_PATH = "/apache_atlas_leader_elector_path";
private final Configuration configuration;
private CuratorFramework curatorFramework;
/**
* Initializes the {@link CuratorFramework} that is used for all interaction with Zookeeper.
* @throws AtlasException
*/
public CuratorFactory() throws AtlasException {
configuration = ApplicationProperties.get();
initializeCuratorFramework();
}
private void initializeCuratorFramework() {
HAConfiguration.ZookeeperProperties zookeeperProperties =
HAConfiguration.getZookeeperProperties(configuration);
curatorFramework = CuratorFrameworkFactory.builder().
connectString(zookeeperProperties.getConnectString()).
sessionTimeoutMs(zookeeperProperties.getSessionTimeout()).
retryPolicy(new ExponentialBackoffRetry(
zookeeperProperties.getRetriesSleepTimeMillis(), zookeeperProperties.getNumRetries())).build();
curatorFramework.start();
}
/**
* Cleanup resources related to {@link CuratorFramework}.
*
* After this call, no further calls to any curator objects should be done.
*/
public void close() {
curatorFramework.close();
}
/**
* Returns a pre-created instance of {@link CuratorFramework}.
*
* This method can be called any number of times to access the {@link CuratorFramework} used in the
* application.
* @return
*/
public CuratorFramework clientInstance() {
return curatorFramework;
}
/**
* Create a new instance {@link LeaderLatch}
* @param serverId the ID used to register this instance with curator.
* This ID should typically be obtained using
* {@link HAConfiguration#getAtlasServerId(Configuration)}
* @return
*/
public LeaderLatch leaderLatchInstance(String serverId) {
return new LeaderLatch(curatorFramework, APACHE_ATLAS_LEADER_ELECTOR_PATH, serverId);
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.web.service;
import com.google.common.base.Preconditions;
import com.google.inject.Singleton;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasException;
import org.apache.atlas.ha.HAConfiguration;
import org.apache.commons.configuration.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A class that maintains the state of this instance.
*
* The states are maintained at a granular level, including in-transition states. The transitions are
* directed by {@link ActiveInstanceElectorService}.
*/
@Singleton
public class ServiceState {
private static final Logger LOG = LoggerFactory.getLogger(ServiceState.class);
public enum ServiceStateValue {
ACTIVE,
PASSIVE,
BECOMING_ACTIVE,
BECOMING_PASSIVE
}
private Configuration configuration;
private volatile ServiceStateValue state;
public ServiceState() throws AtlasException {
this(ApplicationProperties.get());
}
public ServiceState(Configuration configuration) {
this.configuration = configuration;
state = !HAConfiguration.isHAEnabled(configuration) ?
ServiceStateValue.ACTIVE : ServiceStateValue.PASSIVE;
}
public ServiceStateValue getState() {
return state;
}
public void becomingActive() {
LOG.warn("Instance becoming active from {}", state);
setState(ServiceStateValue.BECOMING_ACTIVE);
}
private void setState(ServiceStateValue newState) {
Preconditions.checkState(HAConfiguration.isHAEnabled(configuration),
"Cannot change state as requested, as HA is not enabled for this instance.");
state = newState;
}
public void setActive() {
LOG.warn("Instance is active from {}", state);
setState(ServiceStateValue.ACTIVE);
}
public void becomingPassive() {
LOG.warn("Instance becoming passive from {}", state);
setState(ServiceStateValue.BECOMING_PASSIVE);
}
public void setPassive() {
LOG.warn("Instance is passive from {}", state);
setState(ServiceStateValue.PASSIVE);
}
public boolean isInstanceInTransition() {
ServiceStateValue state = getState();
return state == ServiceStateValue.BECOMING_ACTIVE
|| state == ServiceStateValue.BECOMING_PASSIVE;
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.web.filters;
import org.apache.atlas.web.service.ActiveInstanceState;
import org.apache.atlas.web.service.ServiceState;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import javax.servlet.FilterChain;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.HttpMethod;
import java.io.IOException;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
public class ActiveServerFilterTest {
public static final String ACTIVE_SERVER_ADDRESS = "http://localhost:21000/";
@Mock
private ActiveInstanceState activeInstanceState;
@Mock
private HttpServletRequest servletRequest;
@Mock
private HttpServletResponse servletResponse;
@Mock
private FilterChain filterChain;
@Mock
private ServiceState serviceState;
@BeforeMethod
public void setUp() {
MockitoAnnotations.initMocks(this);
}
@Test
public void testShouldPassThroughRequestsIfActive() throws IOException, ServletException {
when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.ACTIVE);
ActiveServerFilter activeServerFilter = new ActiveServerFilter(activeInstanceState, serviceState);
activeServerFilter.doFilter(servletRequest, servletResponse, filterChain);
verify(filterChain).doFilter(servletRequest, servletResponse);
}
@Test
public void testShouldFailIfCannotRetrieveActiveServerAddress() throws IOException, ServletException {
when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.PASSIVE);
ActiveServerFilter activeServerFilter = new ActiveServerFilter(activeInstanceState, serviceState);
when(activeInstanceState.getActiveServerAddress()).thenReturn(null);
activeServerFilter.doFilter(servletRequest, servletResponse, filterChain);
verify(servletResponse).sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
}
@Test
public void testShouldRedirectRequestToActiveServerAddress() throws IOException, ServletException {
when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.PASSIVE);
ActiveServerFilter activeServerFilter = new ActiveServerFilter(activeInstanceState, serviceState);
when(activeInstanceState.getActiveServerAddress()).thenReturn(ACTIVE_SERVER_ADDRESS);
when(servletRequest.getRequestURI()).thenReturn("types");
when(servletRequest.getMethod()).thenReturn(HttpMethod.GET);
activeServerFilter.doFilter(servletRequest, servletResponse, filterChain);
verify(servletResponse).sendRedirect(ACTIVE_SERVER_ADDRESS+"types");
}
@Test
public void testRedirectedRequestShouldContainQueryParameters() throws IOException, ServletException {
when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.PASSIVE);
ActiveServerFilter activeServerFilter = new ActiveServerFilter(activeInstanceState, serviceState);
when(activeInstanceState.getActiveServerAddress()).thenReturn(ACTIVE_SERVER_ADDRESS);
when(servletRequest.getMethod()).thenReturn(HttpMethod.GET);
when(servletRequest.getRequestURI()).thenReturn("types");
when(servletRequest.getQueryString()).thenReturn("query=TRAIT");
activeServerFilter.doFilter(servletRequest, servletResponse, filterChain);
verify(servletResponse).sendRedirect(ACTIVE_SERVER_ADDRESS+"types?query=TRAIT");
}
@Test
public void testShouldRedirectPOSTRequest() throws IOException, ServletException {
when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.PASSIVE);
ActiveServerFilter activeServerFilter = new ActiveServerFilter(activeInstanceState, serviceState);
when(activeInstanceState.getActiveServerAddress()).thenReturn(ACTIVE_SERVER_ADDRESS);
when(servletRequest.getMethod()).thenReturn(HttpMethod.POST);
when(servletRequest.getRequestURI()).thenReturn("types");
activeServerFilter.doFilter(servletRequest, servletResponse, filterChain);
verify(servletResponse).setHeader("Location", ACTIVE_SERVER_ADDRESS+"types");
verify(servletResponse).setStatus(HttpServletResponse.SC_TEMPORARY_REDIRECT);
}
@Test
public void testShouldRedirectPUTRequest() throws IOException, ServletException {
when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.PASSIVE);
ActiveServerFilter activeServerFilter = new ActiveServerFilter(activeInstanceState, serviceState);
when(activeInstanceState.getActiveServerAddress()).thenReturn(ACTIVE_SERVER_ADDRESS);
when(servletRequest.getMethod()).thenReturn(HttpMethod.PUT);
when(servletRequest.getRequestURI()).thenReturn("types");
activeServerFilter.doFilter(servletRequest, servletResponse, filterChain);
verify(servletResponse).setHeader("Location", ACTIVE_SERVER_ADDRESS+"types");
verify(servletResponse).setStatus(HttpServletResponse.SC_TEMPORARY_REDIRECT);
}
@Test
public void testShouldRedirectDELETERequest() throws IOException, ServletException {
when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.PASSIVE);
ActiveServerFilter activeServerFilter = new ActiveServerFilter(activeInstanceState, serviceState);
when(activeInstanceState.getActiveServerAddress()).thenReturn(ACTIVE_SERVER_ADDRESS);
when(servletRequest.getMethod()).thenReturn(HttpMethod.DELETE);
when(servletRequest.getRequestURI()).
thenReturn("api/atlas/entities/6ebb039f-eaa5-4b9c-ae44-799c7910545d/traits/test_tag_ha3");
activeServerFilter.doFilter(servletRequest, servletResponse, filterChain);
verify(servletResponse).setHeader("Location", ACTIVE_SERVER_ADDRESS
+ "api/atlas/entities/6ebb039f-eaa5-4b9c-ae44-799c7910545d/traits/test_tag_ha3");
verify(servletResponse).setStatus(HttpServletResponse.SC_TEMPORARY_REDIRECT);
}
@Test
public void testShouldReturnServiceUnavailableIfStateBecomingActive() throws IOException, ServletException {
when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.BECOMING_ACTIVE);
ActiveServerFilter activeServerFilter = new ActiveServerFilter(activeInstanceState, serviceState);
activeServerFilter.doFilter(servletRequest, servletResponse, filterChain);
verify(servletResponse).sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.web.service;
import com.google.inject.Provider;
import org.apache.atlas.AtlasConstants;
import org.apache.atlas.AtlasException;
import org.apache.atlas.ha.HAConfiguration;
import org.apache.atlas.listener.ActiveStateChangeHandler;
import org.apache.commons.configuration.Configuration;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
public class ActiveInstanceElectorServiceTest {
@Mock
private Configuration configuration;
@Mock
private CuratorFactory curatorFactory;
@Mock
private ActiveInstanceState activeInstanceState;
@Mock
private ServiceState serviceState;
@BeforeMethod
public void setup() {
System.setProperty(AtlasConstants.SYSTEM_PROPERTY_APP_PORT, "21000");
MockitoAnnotations.initMocks(this);
}
@Test
public void testLeaderElectionIsJoinedOnStart() throws Exception {
when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true);
when(configuration.getStringArray(HAConfiguration.ATLAS_SERVER_IDS)).thenReturn(new String[] {"id1"});
when(configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +"id1")).thenReturn("127.0.0.1:21000");
LeaderLatch leaderLatch = mock(LeaderLatch.class);
when(curatorFactory.leaderLatchInstance("id1")).thenReturn(leaderLatch);
ActiveInstanceElectorService activeInstanceElectorService =
new ActiveInstanceElectorService(configuration, new ArrayList(), curatorFactory,
activeInstanceState, serviceState);
activeInstanceElectorService.start();
verify(leaderLatch).start();
}
@Test
public void testListenerIsAddedForActiveInstanceCallbacks() throws Exception {
when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true);
when(configuration.getStringArray(HAConfiguration.ATLAS_SERVER_IDS)).thenReturn(new String[] {"id1"});
when(configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +"id1")).thenReturn("127.0.0.1:21000");
LeaderLatch leaderLatch = mock(LeaderLatch.class);
when(curatorFactory.leaderLatchInstance("id1")).thenReturn(leaderLatch);
ActiveInstanceElectorService activeInstanceElectorService =
new ActiveInstanceElectorService(configuration, new ArrayList(), curatorFactory,
activeInstanceState, serviceState);
activeInstanceElectorService.start();
verify(leaderLatch).addListener(activeInstanceElectorService);
}
@Test
public void testLeaderElectionIsNotStartedIfNotInHAMode() throws AtlasException {
when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(false);
ActiveInstanceElectorService activeInstanceElectorService =
new ActiveInstanceElectorService(configuration, new ArrayList(), curatorFactory,
activeInstanceState, serviceState);
activeInstanceElectorService.start();
verifyZeroInteractions(curatorFactory);
}
@Test
public void testLeaderElectionIsLeftOnStop() throws IOException, AtlasException {
when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true);
when(configuration.getStringArray(HAConfiguration.ATLAS_SERVER_IDS)).thenReturn(new String[] {"id1"});
when(configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +"id1")).thenReturn("127.0.0.1:21000");
LeaderLatch leaderLatch = mock(LeaderLatch.class);
when(curatorFactory.leaderLatchInstance("id1")).thenReturn(leaderLatch);
ActiveInstanceElectorService activeInstanceElectorService =
new ActiveInstanceElectorService(configuration, new ArrayList(), curatorFactory,
activeInstanceState, serviceState);
activeInstanceElectorService.start();
activeInstanceElectorService.stop();
verify(leaderLatch).close();
}
@Test
public void testCuratorFactoryIsClosedOnStop() throws AtlasException {
when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true);
when(configuration.getStringArray(HAConfiguration.ATLAS_SERVER_IDS)).thenReturn(new String[] {"id1"});
when(configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +"id1")).thenReturn("127.0.0.1:21000");
LeaderLatch leaderLatch = mock(LeaderLatch.class);
when(curatorFactory.leaderLatchInstance("id1")).thenReturn(leaderLatch);
ActiveInstanceElectorService activeInstanceElectorService =
new ActiveInstanceElectorService(configuration, new ArrayList(), curatorFactory,
activeInstanceState, serviceState);
activeInstanceElectorService.start();
activeInstanceElectorService.stop();
verify(curatorFactory).close();
}
@Test
public void testNoActionOnStopIfHAModeIsDisabled() {
when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(false);
ActiveInstanceElectorService activeInstanceElectorService =
new ActiveInstanceElectorService(configuration, new ArrayList(), curatorFactory,
activeInstanceState, serviceState);
activeInstanceElectorService.stop();
verifyZeroInteractions(curatorFactory);
}
@Test
public void testRegisteredHandlersAreNotifiedWhenInstanceIsActive() throws AtlasException {
when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true);
when(configuration.getStringArray(HAConfiguration.ATLAS_SERVER_IDS)).thenReturn(new String[] {"id1"});
when(configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +"id1")).thenReturn("127.0.0.1:21000");
LeaderLatch leaderLatch = mock(LeaderLatch.class);
when(curatorFactory.leaderLatchInstance("id1")).thenReturn(leaderLatch);
Collection<Provider<ActiveStateChangeHandler>> changeHandlers = new ArrayList();
final ActiveStateChangeHandler handler1 = mock(ActiveStateChangeHandler.class);
final ActiveStateChangeHandler handler2 = mock(ActiveStateChangeHandler.class);
changeHandlers.add(new Provider<ActiveStateChangeHandler>() {
@Override
public ActiveStateChangeHandler get() {
return handler1;
}
});
changeHandlers.add(new Provider<ActiveStateChangeHandler>() {
@Override
public ActiveStateChangeHandler get() {
return handler2;
}
});
ActiveInstanceElectorService activeInstanceElectorService =
new ActiveInstanceElectorService(configuration, changeHandlers, curatorFactory,
activeInstanceState, serviceState);
activeInstanceElectorService.start();
activeInstanceElectorService.isLeader();
verify(handler1).instanceIsActive();
verify(handler2).instanceIsActive();
}
@Test
public void testSharedStateIsUpdatedWhenInstanceIsActive() throws Exception {
when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true);
when(configuration.getStringArray(HAConfiguration.ATLAS_SERVER_IDS)).thenReturn(new String[] {"id1"});
when(configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +"id1")).thenReturn("127.0.0.1:21000");
LeaderLatch leaderLatch = mock(LeaderLatch.class);
when(curatorFactory.leaderLatchInstance("id1")).thenReturn(leaderLatch);
ActiveInstanceElectorService activeInstanceElectorService =
new ActiveInstanceElectorService(configuration, new ArrayList(), curatorFactory,
activeInstanceState, serviceState);
activeInstanceElectorService.start();
activeInstanceElectorService.isLeader();
verify(activeInstanceState).update("id1");
}
@Test
public void testRegisteredHandlersAreNotifiedOfPassiveWhenStateUpdateFails() throws Exception {
when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true);
when(configuration.getStringArray(HAConfiguration.ATLAS_SERVER_IDS)).thenReturn(new String[] {"id1"});
when(configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +"id1")).thenReturn("127.0.0.1:21000");
LeaderLatch leaderLatch = mock(LeaderLatch.class);
when(curatorFactory.leaderLatchInstance("id1")).thenReturn(leaderLatch);
Collection<Provider<ActiveStateChangeHandler>> changeHandlers = new ArrayList();
final ActiveStateChangeHandler handler1 = mock(ActiveStateChangeHandler.class);
final ActiveStateChangeHandler handler2 = mock(ActiveStateChangeHandler.class);
changeHandlers.add(new Provider<ActiveStateChangeHandler>() {
@Override
public ActiveStateChangeHandler get() {
return handler1;
}
});
changeHandlers.add(new Provider<ActiveStateChangeHandler>() {
@Override
public ActiveStateChangeHandler get() {
return handler2;
}
});
doThrow(new Exception()).when(activeInstanceState).update("id1");
ActiveInstanceElectorService activeInstanceElectorService =
new ActiveInstanceElectorService(configuration, changeHandlers, curatorFactory,
activeInstanceState, serviceState);
activeInstanceElectorService.start();
activeInstanceElectorService.isLeader();
verify(handler1).instanceIsPassive();
verify(handler2).instanceIsPassive();
}
@Test
public void testElectionIsRejoinedWhenStateUpdateFails() throws Exception {
when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true);
when(configuration.getStringArray(HAConfiguration.ATLAS_SERVER_IDS)).thenReturn(new String[] {"id1"});
when(configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +"id1")).thenReturn("127.0.0.1:21000");
LeaderLatch leaderLatch = mock(LeaderLatch.class);
when(curatorFactory.leaderLatchInstance("id1")).thenReturn(leaderLatch);
doThrow(new Exception()).when(activeInstanceState).update("id1");
ActiveInstanceElectorService activeInstanceElectorService =
new ActiveInstanceElectorService(configuration, new ArrayList(), curatorFactory,
activeInstanceState, serviceState);
activeInstanceElectorService.start();
activeInstanceElectorService.isLeader();
InOrder inOrder = inOrder(leaderLatch, curatorFactory);
inOrder.verify(leaderLatch).close();
inOrder.verify(curatorFactory).leaderLatchInstance("id1");
inOrder.verify(leaderLatch).addListener(activeInstanceElectorService);
inOrder.verify(leaderLatch).start();
}
@Test
public void testRegisteredHandlersAreNotifiedOfPassiveWhenInstanceIsPassive() throws AtlasException {
when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true);
when(configuration.getStringArray(HAConfiguration.ATLAS_SERVER_IDS)).thenReturn(new String[] {"id1"});
when(configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +"id1")).thenReturn("127.0.0.1:21000");
LeaderLatch leaderLatch = mock(LeaderLatch.class);
when(curatorFactory.leaderLatchInstance("id1")).thenReturn(leaderLatch);
Collection<Provider<ActiveStateChangeHandler>> changeHandlers = new ArrayList();
final ActiveStateChangeHandler handler1 = mock(ActiveStateChangeHandler.class);
final ActiveStateChangeHandler handler2 = mock(ActiveStateChangeHandler.class);
changeHandlers.add(new Provider<ActiveStateChangeHandler>() {
@Override
public ActiveStateChangeHandler get() {
return handler1;
}
});
changeHandlers.add(new Provider<ActiveStateChangeHandler>() {
@Override
public ActiveStateChangeHandler get() {
return handler2;
}
});
ActiveInstanceElectorService activeInstanceElectorService =
new ActiveInstanceElectorService(configuration, changeHandlers, curatorFactory,
activeInstanceState, serviceState);
activeInstanceElectorService.start();
activeInstanceElectorService.notLeader();
verify(handler1).instanceIsPassive();
verify(handler2).instanceIsPassive();
}
@Test
public void testActiveStateSetOnBecomingLeader() {
ActiveInstanceElectorService activeInstanceElectorService =
new ActiveInstanceElectorService(configuration, new ArrayList(),
curatorFactory, activeInstanceState, serviceState);
activeInstanceElectorService.isLeader();
InOrder inOrder = inOrder(serviceState);
inOrder.verify(serviceState).becomingActive();
inOrder.verify(serviceState).setActive();
}
@Test
public void testPassiveStateSetOnLoosingLeadership() {
ActiveInstanceElectorService activeInstanceElectorService =
new ActiveInstanceElectorService(configuration, new ArrayList(),
curatorFactory, activeInstanceState, serviceState);
activeInstanceElectorService.notLeader();
InOrder inOrder = inOrder(serviceState);
inOrder.verify(serviceState).becomingPassive();
inOrder.verify(serviceState).setPassive();
}
@Test
public void testPassiveStateSetIfActivationFails() throws Exception {
when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true);
when(configuration.getStringArray(HAConfiguration.ATLAS_SERVER_IDS)).thenReturn(new String[] {"id1"});
when(configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +"id1")).thenReturn("127.0.0.1:21000");
LeaderLatch leaderLatch = mock(LeaderLatch.class);
when(curatorFactory.leaderLatchInstance("id1")).thenReturn(leaderLatch);
doThrow(new Exception()).when(activeInstanceState).update("id1");
ActiveInstanceElectorService activeInstanceElectorService =
new ActiveInstanceElectorService(configuration, new ArrayList(),
curatorFactory, activeInstanceState, serviceState);
activeInstanceElectorService.start();
activeInstanceElectorService.isLeader();
InOrder inOrder = inOrder(serviceState);
inOrder.verify(serviceState).becomingActive();
inOrder.verify(serviceState).becomingPassive();
inOrder.verify(serviceState).setPassive();
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.web.service;
import org.apache.atlas.ha.HAConfiguration;
import org.apache.commons.configuration.Configuration;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.CreateBuilder;
import org.apache.curator.framework.api.ExistsBuilder;
import org.apache.curator.framework.api.GetDataBuilder;
import org.apache.curator.framework.api.SetDataBuilder;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;
import java.nio.charset.Charset;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.fail;
import static org.testng.Assert.assertNull;
public class ActiveInstanceStateTest {
private static final String HOST_PORT = "127.0.0.1:21000";
public static final String SERVER_ADDRESS = "http://" + HOST_PORT;
@Mock
private Configuration configuration;
@Mock
private CuratorFactory curatorFactory;
@Mock
private CuratorFramework curatorFramework;
@BeforeTest
public void setup() {
MockitoAnnotations.initMocks(this);
}
@Test
public void testSharedPathIsCreatedIfNotExists() throws Exception {
when(configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +"id1")).thenReturn(HOST_PORT);
when(curatorFactory.clientInstance()).thenReturn(curatorFramework);
ExistsBuilder existsBuilder = mock(ExistsBuilder.class);
when(curatorFramework.checkExists()).thenReturn(existsBuilder);
when(existsBuilder.forPath(ActiveInstanceState.APACHE_ATLAS_ACTIVE_SERVER_INFO)).thenReturn(null);
CreateBuilder createBuilder = mock(CreateBuilder.class);
when(curatorFramework.create()).thenReturn(createBuilder);
when(createBuilder.withMode(CreateMode.EPHEMERAL)).thenReturn(createBuilder);
SetDataBuilder setDataBuilder = mock(SetDataBuilder.class);
when(curatorFramework.setData()).thenReturn(setDataBuilder);
ActiveInstanceState activeInstanceState = new ActiveInstanceState(configuration, curatorFactory);
activeInstanceState.update("id1");
verify(createBuilder).forPath(ActiveInstanceState.APACHE_ATLAS_ACTIVE_SERVER_INFO);
}
@Test
public void testDataIsUpdatedWithAtlasServerAddress() throws Exception {
when(configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +"id1")).thenReturn(HOST_PORT);
when(curatorFactory.clientInstance()).thenReturn(curatorFramework);
ExistsBuilder existsBuilder = mock(ExistsBuilder.class);
when(curatorFramework.checkExists()).thenReturn(existsBuilder);
when(existsBuilder.forPath(ActiveInstanceState.APACHE_ATLAS_ACTIVE_SERVER_INFO)).thenReturn(new Stat());
SetDataBuilder setDataBuilder = mock(SetDataBuilder.class);
when(curatorFramework.setData()).thenReturn(setDataBuilder);
ActiveInstanceState activeInstanceState = new ActiveInstanceState(configuration, curatorFactory);
activeInstanceState.update("id1");
verify(setDataBuilder).forPath(
ActiveInstanceState.APACHE_ATLAS_ACTIVE_SERVER_INFO,
SERVER_ADDRESS.getBytes(Charset.forName("UTF-8")));
}
@Test
public void testShouldReturnActiveServerAddress() throws Exception {
when(curatorFactory.clientInstance()).thenReturn(curatorFramework);
GetDataBuilder getDataBuilder = mock(GetDataBuilder.class);
when(curatorFramework.getData()).thenReturn(getDataBuilder);
when(getDataBuilder.forPath(ActiveInstanceState.APACHE_ATLAS_ACTIVE_SERVER_INFO)).
thenReturn(SERVER_ADDRESS.getBytes(Charset.forName("UTF-8")));
ActiveInstanceState activeInstanceState = new ActiveInstanceState(configuration, curatorFactory);
String actualServerAddress = activeInstanceState.getActiveServerAddress();
assertEquals(SERVER_ADDRESS, actualServerAddress);
}
@Test
public void testShouldHandleExceptionsInFetchingServerAddress() throws Exception {
when(curatorFactory.clientInstance()).thenReturn(curatorFramework);
GetDataBuilder getDataBuilder = mock(GetDataBuilder.class);
when(curatorFramework.getData()).thenReturn(getDataBuilder);
when(getDataBuilder.forPath(ActiveInstanceState.APACHE_ATLAS_ACTIVE_SERVER_INFO)).
thenThrow(new Exception());
ActiveInstanceState activeInstanceState = new ActiveInstanceState(configuration, curatorFactory);
assertNull(activeInstanceState.getActiveServerAddress());
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.web.service;
import org.apache.atlas.ha.HAConfiguration;
import org.apache.commons.configuration.Configuration;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.fail;
public class ServiceStateTest {
@Mock
private Configuration configuration;
@BeforeMethod
public void setup() {
MockitoAnnotations.initMocks(this);
}
@Test
public void testShouldBeActiveIfHAIsDisabled() {
when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(false);
ServiceState serviceState = new ServiceState(configuration);
assertEquals(ServiceState.ServiceStateValue.ACTIVE, serviceState.getState());
}
@Test(expectedExceptions = IllegalStateException.class)
public void testShouldDisallowTransitionIfHAIsDisabled() {
when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(false);
ServiceState serviceState = new ServiceState(configuration);
serviceState.becomingPassive();
fail("Should not allow transition");
}
@Test
public void testShouldChangeStateIfHAIsEnabled() {
when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true);
ServiceState serviceState = new ServiceState(configuration);
serviceState.becomingPassive();
assertEquals(ServiceState.ServiceStateValue.BECOMING_PASSIVE, serviceState.getState());
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment