Commit afb9e618 by Suma Shivaprasad

ATLAS-346 Atlas server loses messages sent from Hive hook if restarted after…

ATLAS-346 Atlas server loses messages sent from Hive hook if restarted after unclean shutdown (yhmenath via sumasai)
parent 6dfbda11
......@@ -67,5 +67,11 @@
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
......@@ -19,6 +19,7 @@
package org.apache.atlas;
import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientHandlerException;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.WebResource;
import com.sun.jersey.api.client.config.DefaultClientConfig;
......@@ -67,6 +68,7 @@ public class AtlasClient {
public static final String DATATYPE = "dataType";
public static final String BASE_URI = "api/atlas/";
public static final String ADMIN_VERSION = "admin/version";
public static final String TYPES = "types";
public static final String URI_ENTITY = "entities";
public static final String URI_SEARCH = "discovery/search";
......@@ -126,11 +128,29 @@ public class AtlasClient {
service = client.resource(UriBuilder.fromUri(baseUrl).build());
}
// for testing
AtlasClient(WebResource service) {
this.service = service;
}
protected Configuration getClientProperties() throws AtlasException {
return ApplicationProperties.get();
}
enum API {
public boolean isServerReady() throws AtlasServiceException {
WebResource resource = getResource(API.VERSION);
try {
callAPIWithResource(API.VERSION, resource);
return true;
} catch (ClientHandlerException che) {
return false;
}
}
public enum API {
//Admin operations
VERSION(BASE_URI + ADMIN_VERSION, HttpMethod.GET, Response.Status.OK),
//Type operations
CREATE_TYPE(BASE_URI + TYPES, HttpMethod.POST, Response.Status.CREATED),
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas;
import com.sun.jersey.api.client.ClientHandlerException;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.WebResource;
import org.testng.annotations.Test;
import javax.ws.rs.core.Response;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class AtlasClientTest {
@Test
public void shouldVerifyServerIsReady() throws AtlasServiceException {
WebResource webResource = mock(WebResource.class);
AtlasClient atlasClient = new AtlasClient(webResource);
WebResource.Builder builder = setupBuilder(webResource);
ClientResponse response = mock(ClientResponse.class);
when(response.getStatus()).thenReturn(Response.Status.OK.getStatusCode());
when(response.getEntity(String.class)).thenReturn("{\"Version\":\"version-rrelease\",\"Name\":\"apache-atlas\"," +
"\"Description\":\"Metadata Management and Data Governance Platform over Hadoop\"}");
when(builder.method(AtlasClient.API.VERSION.getMethod(), ClientResponse.class, null)).thenReturn(response);
assertTrue(atlasClient.isServerReady());
}
private WebResource.Builder setupBuilder(WebResource webResource) {
WebResource adminVersionResource = mock(WebResource.class);
when(webResource.path(AtlasClient.API.VERSION.getPath())).thenReturn(adminVersionResource);
WebResource.Builder builder = mock(WebResource.Builder.class);
when(adminVersionResource.accept(AtlasClient.JSON_MEDIA_TYPE)).thenReturn(builder);
when(builder.type(AtlasClient.JSON_MEDIA_TYPE)).thenReturn(builder);
return builder;
}
@Test
public void shouldReturnFalseIfServerIsNotReady() throws AtlasServiceException {
WebResource webResource = mock(WebResource.class);
AtlasClient atlasClient = new AtlasClient(webResource);
WebResource.Builder builder = setupBuilder(webResource);
when(builder.method(AtlasClient.API.VERSION.getMethod(), ClientResponse.class, null)).thenThrow(
new ClientHandlerException());
assertFalse(atlasClient.isServerReady());
}
}
......@@ -22,6 +22,7 @@ import com.google.inject.Singleton;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasException;
import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.service.Service;
import org.apache.commons.configuration.Configuration;
import org.codehaus.jettison.json.JSONArray;
......@@ -42,6 +43,7 @@ public class NotificationHookConsumer implements Service {
public static final String CONSUMER_THREADS_PROPERTY = "atlas.notification.hook.numthreads";
public static final String ATLAS_ENDPOINT_PROPERTY = "atlas.rest.address";
public static final int SERVER_READY_WAIT_TIME_MS = 1000;
@Inject
private NotificationInterface notificationInterface;
......@@ -77,15 +79,32 @@ public class NotificationHookConsumer implements Service {
}
}
static class Timer {
public void sleep(int interval) throws InterruptedException {
Thread.sleep(interval);
}
}
class HookConsumer implements Runnable {
private final NotificationConsumer<JSONArray> consumer;
private final AtlasClient client;
public HookConsumer(NotificationConsumer<JSONArray> consumer) {
this(atlasClient, consumer);
}
public HookConsumer(AtlasClient client, NotificationConsumer<JSONArray> consumer) {
this.client = client;
this.consumer = consumer;
}
@Override
public void run() {
if (!serverAvailable(new NotificationHookConsumer.Timer())) {
return;
}
while(consumer.hasNext()) {
JSONArray entityJson = consumer.next();
LOG.info("Processing message {}", entityJson);
......@@ -98,5 +117,28 @@ public class NotificationHookConsumer implements Service {
}
}
}
boolean serverAvailable(Timer timer) {
try {
while (!client.isServerReady()) {
try {
LOG.info("Atlas Server is not ready. Waiting for {} milliseconds to retry...",
SERVER_READY_WAIT_TIME_MS);
timer.sleep(SERVER_READY_WAIT_TIME_MS);
} catch (InterruptedException e) {
LOG.info("Interrupted while waiting for Atlas Server to become ready, " +
"exiting consumer thread.", e);
return false;
}
}
} catch (AtlasServiceException e) {
LOG.info(
"Handled AtlasServiceException while waiting for Atlas Server to become ready, " +
"exiting consumer thread.", e);
return false;
}
LOG.info("Atlas Server is ready, can start reading Kafka events.");
return true;
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.notification;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasServiceException;
import org.testng.annotations.Test;
import static org.mockito.Mockito.*;
import static org.testng.AssertJUnit.assertFalse;
import static org.testng.AssertJUnit.assertTrue;
public class NotificationHookConsumerTest {
@Test
public void testConsumerCanProceedIfServerIsReady() throws InterruptedException, AtlasServiceException {
AtlasClient atlasClient = mock(AtlasClient.class);
NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer();
NotificationHookConsumer.HookConsumer hookConsumer =
notificationHookConsumer.new HookConsumer(atlasClient, mock(NotificationConsumer.class));
NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class);
when(atlasClient.isServerReady()).thenReturn(true);
assertTrue(hookConsumer.serverAvailable(timer));
verifyZeroInteractions(timer);
}
@Test
public void testConsumerWaitsNTimesIfServerIsNotReadyNTimes() throws AtlasServiceException, InterruptedException {
AtlasClient atlasClient = mock(AtlasClient.class);
NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer();
NotificationHookConsumer.HookConsumer hookConsumer =
notificationHookConsumer.new HookConsumer(atlasClient, mock(NotificationConsumer.class));
NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class);
when(atlasClient.isServerReady()).thenReturn(false, false, false, true);
assertTrue(hookConsumer.serverAvailable(timer));
verify(timer, times(3)).sleep(NotificationHookConsumer.SERVER_READY_WAIT_TIME_MS);
}
@Test
public void testConsumerProceedsWithFalseIfInterrupted() throws AtlasServiceException, InterruptedException {
AtlasClient atlasClient = mock(AtlasClient.class);
NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer();
NotificationHookConsumer.HookConsumer hookConsumer =
notificationHookConsumer.new HookConsumer(atlasClient, mock(NotificationConsumer.class));
NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class);
doThrow(new InterruptedException()).when(timer).sleep(NotificationHookConsumer.SERVER_READY_WAIT_TIME_MS);
when(atlasClient.isServerReady()).thenReturn(false);
assertFalse(hookConsumer.serverAvailable(timer));
}
@Test
public void testConsumerProceedsWithFalseOnAtlasServiceException() throws AtlasServiceException {
AtlasClient atlasClient = mock(AtlasClient.class);
NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer();
NotificationHookConsumer.HookConsumer hookConsumer =
notificationHookConsumer.new HookConsumer(atlasClient, mock(NotificationConsumer.class));
NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class);
when(atlasClient.isServerReady()).thenThrow(new AtlasServiceException(AtlasClient.API.VERSION, new Exception()));
assertFalse(hookConsumer.serverAvailable(timer));
}
}
......@@ -323,7 +323,7 @@
<node.version>v0.10.30</node.version>
<slf4j.version>1.7.7</slf4j.version>
<jetty.version>9.2.12.v20150709</jetty.version>
<jersey.version>1.10</jersey.version>
<jersey.version>1.19</jersey.version>
<jackson.version>1.8.3</jackson.version>
<tinkerpop.version>2.6.0</tinkerpop.version>
<titan.version>0.5.4</titan.version>
......
......@@ -14,6 +14,7 @@ ATLAS-54 Rename configs in hive hook (shwethags)
ATLAS-3 Mixed Index creation fails with Date types (sumasai via shwethags)
ALL CHANGES:
ATLAS-346 Atlas server loses messages sent from Hive hook if restarted after unclean shutdown (yhemanth via sumasai)
ATLAS-382 Fixed Hive Bridge doc for ATLAS cluster name (sumasai)
ATLAS-244 UI: Add Tag Tab (darshankumar89 via sumasai)
ATLAS-376 UI: Use the Schema API of the backend to populate details for Schema tab (darshankumar89 via sumasai)
......
......@@ -124,7 +124,7 @@ public class GuiceServletConfig extends GuiceServletContextListener {
}
protected void startServices() {
LOG.debug("Starting services");
LOG.info("Starting services");
Services services = injector.getInstance(Services.class);
services.start();
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment