Commit f8dd5dcb by Suma Shivaprasad

ATLAS-346 Atlas server loses messages sent from Hive hook if restarted after…

ATLAS-346 Atlas server loses messages sent from Hive hook if restarted after unclean shutdown (yhmenath via sumasai)
parent 6dfbda11
......@@ -67,5 +67,11 @@
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
......@@ -19,6 +19,7 @@
package org.apache.atlas;
import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientHandlerException;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.WebResource;
import com.sun.jersey.api.client.config.DefaultClientConfig;
......@@ -67,6 +68,7 @@ public class AtlasClient {
public static final String DATATYPE = "dataType";
public static final String BASE_URI = "api/atlas/";
public static final String ADMIN_VERSION = "admin/version";
public static final String TYPES = "types";
public static final String URI_ENTITY = "entities";
public static final String URI_SEARCH = "discovery/search";
......@@ -126,11 +128,29 @@ public class AtlasClient {
service = client.resource(UriBuilder.fromUri(baseUrl).build());
}
// for testing
AtlasClient(WebResource service) {
this.service = service;
}
protected Configuration getClientProperties() throws AtlasException {
return ApplicationProperties.get();
}
enum API {
public boolean isServerReady() throws AtlasServiceException {
WebResource resource = getResource(API.VERSION);
try {
callAPIWithResource(API.VERSION, resource);
return true;
} catch (ClientHandlerException che) {
return false;
}
}
public enum API {
//Admin operations
VERSION(BASE_URI + ADMIN_VERSION, HttpMethod.GET, Response.Status.OK),
//Type operations
CREATE_TYPE(BASE_URI + TYPES, HttpMethod.POST, Response.Status.CREATED),
......
......@@ -22,6 +22,7 @@ import com.google.inject.Singleton;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasException;
import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.service.Service;
import org.apache.commons.configuration.Configuration;
import org.codehaus.jettison.json.JSONArray;
......@@ -42,6 +43,7 @@ public class NotificationHookConsumer implements Service {
public static final String CONSUMER_THREADS_PROPERTY = "atlas.notification.hook.numthreads";
public static final String ATLAS_ENDPOINT_PROPERTY = "atlas.rest.address";
public static final int SERVER_READY_WAIT_TIME_MS = 1000;
@Inject
private NotificationInterface notificationInterface;
......@@ -77,15 +79,32 @@ public class NotificationHookConsumer implements Service {
}
}
static class Timer {
public void sleep(int interval) throws InterruptedException {
Thread.sleep(interval);
}
}
class HookConsumer implements Runnable {
private final NotificationConsumer<JSONArray> consumer;
private final AtlasClient client;
public HookConsumer(NotificationConsumer<JSONArray> consumer) {
this(atlasClient, consumer);
}
public HookConsumer(AtlasClient client, NotificationConsumer<JSONArray> consumer) {
this.client = client;
this.consumer = consumer;
}
@Override
public void run() {
if (!serverAvailable(new NotificationHookConsumer.Timer())) {
return;
}
while(consumer.hasNext()) {
JSONArray entityJson = consumer.next();
LOG.info("Processing message {}", entityJson);
......@@ -98,5 +117,28 @@ public class NotificationHookConsumer implements Service {
}
}
}
boolean serverAvailable(Timer timer) {
try {
while (!client.isServerReady()) {
try {
LOG.info("Atlas Server is not ready. Waiting for {} milliseconds to retry...",
SERVER_READY_WAIT_TIME_MS);
timer.sleep(SERVER_READY_WAIT_TIME_MS);
} catch (InterruptedException e) {
LOG.info("Interrupted while waiting for Atlas Server to become ready, " +
"exiting consumer thread.", e);
return false;
}
}
} catch (AtlasServiceException e) {
LOG.info(
"Handled AtlasServiceException while waiting for Atlas Server to become ready, " +
"exiting consumer thread.", e);
return false;
}
LOG.info("Atlas Server is ready, can start reading Kafka events.");
return true;
}
}
}
......@@ -323,7 +323,7 @@
<node.version>v0.10.30</node.version>
<slf4j.version>1.7.7</slf4j.version>
<jetty.version>9.2.12.v20150709</jetty.version>
<jersey.version>1.10</jersey.version>
<jersey.version>1.19</jersey.version>
<jackson.version>1.8.3</jackson.version>
<tinkerpop.version>2.6.0</tinkerpop.version>
<titan.version>0.5.4</titan.version>
......
......@@ -14,6 +14,7 @@ ATLAS-54 Rename configs in hive hook (shwethags)
ATLAS-3 Mixed Index creation fails with Date types (sumasai via shwethags)
ALL CHANGES:
ATLAS-346 Atlas server loses messages sent from Hive hook if restarted after unclean shutdown (yhemanth via sumasai)
ATLAS-382 Fixed Hive Bridge doc for ATLAS cluster name (sumasai)
ATLAS-244 UI: Add Tag Tab (darshankumar89 via sumasai)
ATLAS-376 UI: Use the Schema API of the backend to populate details for Schema tab (darshankumar89 via sumasai)
......
......@@ -124,7 +124,7 @@ public class GuiceServletConfig extends GuiceServletContextListener {
}
protected void startServices() {
LOG.debug("Starting services");
LOG.info("Starting services");
Services services = injector.getInstance(Services.class);
services.start();
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment