Commit 8044ca48 by Madhan Neethiraj Committed by Suma Shivaprasad

ATLAS-1162: shutdown hooks to register with ShutdownHookManager, instead of System.Runtime

parent 96f2306f
...@@ -21,6 +21,7 @@ package org.apache.atlas.falcon.hook; ...@@ -21,6 +21,7 @@ package org.apache.atlas.falcon.hook;
import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Guice; import com.google.inject.Guice;
import com.google.inject.Injector; import com.google.inject.Injector;
import org.apache.atlas.AtlasConstants;
import org.apache.atlas.falcon.bridge.FalconBridge; import org.apache.atlas.falcon.bridge.FalconBridge;
import org.apache.atlas.falcon.event.FalconEvent; import org.apache.atlas.falcon.event.FalconEvent;
import org.apache.atlas.falcon.publisher.FalconEventPublisher; import org.apache.atlas.falcon.publisher.FalconEventPublisher;
...@@ -32,6 +33,7 @@ import org.apache.atlas.typesystem.Referenceable; ...@@ -32,6 +33,7 @@ import org.apache.atlas.typesystem.Referenceable;
import org.apache.falcon.entity.store.ConfigurationStore; import org.apache.falcon.entity.store.ConfigurationStore;
import org.apache.falcon.entity.v0.feed.Feed; import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.process.Process; import org.apache.falcon.entity.v0.process.Process;
import org.apache.hadoop.util.ShutdownHookManager;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -90,19 +92,23 @@ public class FalconHook extends AtlasHook implements FalconEventPublisher { ...@@ -90,19 +92,23 @@ public class FalconHook extends AtlasHook implements FalconEventPublisher {
new LinkedBlockingQueue<Runnable>(queueSize), new LinkedBlockingQueue<Runnable>(queueSize),
new ThreadFactoryBuilder().setNameFormat("Atlas Logger %d").build()); new ThreadFactoryBuilder().setNameFormat("Atlas Logger %d").build());
Runtime.getRuntime().addShutdownHook(new Thread() { ShutdownHookManager.get().addShutdownHook(new Thread() {
@Override @Override
public void run() { public void run() {
try { try {
LOG.info("==> Shutdown of Atlas Falcon Hook");
executor.shutdown(); executor.shutdown();
executor.awaitTermination(WAIT_TIME, TimeUnit.SECONDS); executor.awaitTermination(WAIT_TIME, TimeUnit.SECONDS);
executor = null; executor = null;
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
LOG.info("Interrupt received in shutdown."); LOG.info("Interrupt received in shutdown.");
} finally {
LOG.info("<== Shutdown of Atlas Falcon Hook");
} }
// shutdown client // shutdown client
} }
}); }, AtlasConstants.ATLAS_SHUTDOWN_HOOK_PRIORITY);
STORE = ConfigurationStore.get(); STORE = ConfigurationStore.get();
......
...@@ -50,6 +50,7 @@ import org.apache.hadoop.hive.ql.metadata.Partition; ...@@ -50,6 +50,7 @@ import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.plan.HiveOperation; import org.apache.hadoop.hive.ql.plan.HiveOperation;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ShutdownHookManager;
import org.json.JSONObject; import org.json.JSONObject;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -122,19 +123,23 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { ...@@ -122,19 +123,23 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
new LinkedBlockingQueue<Runnable>(queueSize), new LinkedBlockingQueue<Runnable>(queueSize),
new ThreadFactoryBuilder().setNameFormat("Atlas Logger %d").build()); new ThreadFactoryBuilder().setNameFormat("Atlas Logger %d").build());
Runtime.getRuntime().addShutdownHook(new Thread() { ShutdownHookManager.get().addShutdownHook(new Thread() {
@Override @Override
public void run() { public void run() {
try { try {
LOG.info("==> Shutdown of Atlas Hive Hook");
executor.shutdown(); executor.shutdown();
executor.awaitTermination(WAIT_TIME, TimeUnit.SECONDS); executor.awaitTermination(WAIT_TIME, TimeUnit.SECONDS);
executor = null; executor = null;
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
LOG.info("Interrupt received in shutdown."); LOG.info("Interrupt received in shutdown.");
} finally {
LOG.info("<== Shutdown of Atlas Hive Hook");
} }
// shutdown client // shutdown client
} }
}); }, AtlasConstants.ATLAS_SHUTDOWN_HOOK_PRIORITY);
} }
setupOperationMap(); setupOperationMap();
......
...@@ -32,5 +32,6 @@ public final class AtlasConstants { ...@@ -32,5 +32,6 @@ public final class AtlasConstants {
public static final String DEFAULT_APP_PORT_STR = "21000"; public static final String DEFAULT_APP_PORT_STR = "21000";
public static final String ATLAS_REST_ADDRESS_KEY = "atlas.rest.address"; public static final String ATLAS_REST_ADDRESS_KEY = "atlas.rest.address";
public static final String DEFAULT_ATLAS_REST_ADDRESS = "http://localhost:21000"; public static final String DEFAULT_ATLAS_REST_ADDRESS = "http://localhost:21000";
public static final int ATLAS_SHUTDOWN_HOOK_PRIORITY = 30;
} }
...@@ -131,9 +131,9 @@ public abstract class AtlasHook { ...@@ -131,9 +131,9 @@ public abstract class AtlasHook {
} catch (Exception e) { } catch (Exception e) {
numRetries++; numRetries++;
if (numRetries < maxRetries) { if (numRetries < maxRetries) {
LOG.error("Notification send retry failed"); LOG.error("Failed to send notification - attempt #" + numRetries + "; error=" + e.getMessage());
try { try {
LOG.info("Sleeping for {} ms before retry", notificationRetryInterval); LOG.debug("Sleeping for {} ms before retry", notificationRetryInterval);
Thread.sleep(notificationRetryInterval); Thread.sleep(notificationRetryInterval);
} catch (InterruptedException ie){ } catch (InterruptedException ie){
LOG.error("Notification hook thread sleep interrupted"); LOG.error("Notification hook thread sleep interrupted");
......
...@@ -10,6 +10,7 @@ ATLAS-1060 Add composite indexes for exact match performance improvements for al ...@@ -10,6 +10,7 @@ ATLAS-1060 Add composite indexes for exact match performance improvements for al
ATLAS-1127 Modify creation and modification timestamps to Date instead of Long(sumasai) ATLAS-1127 Modify creation and modification timestamps to Date instead of Long(sumasai)
ALL CHANGES: ALL CHANGES:
ATLAS-1162 Register shutdown hooks with Hadoop's ShutdownHookManager, instead of directly with Java Runtime (mneethiraj via sumasai)
ATLAS-1098 Atlas allows creation of tag with name "isa" which causes exceptions during search (apoorvnaik via kevalbhatt) ATLAS-1098 Atlas allows creation of tag with name "isa" which causes exceptions during search (apoorvnaik via kevalbhatt)
ATLAS-1160 Update Atlas hive hook to read configuration from atlas-application.properties instead of hive-site.xml (mneethiraj via kevalbhatt) ATLAS-1160 Update Atlas hive hook to read configuration from atlas-application.properties instead of hive-site.xml (mneethiraj via kevalbhatt)
ATLAS-1154 Errors in Eclipse with web.xml (davidrad via dkantor) ATLAS-1154 Errors in Eclipse with web.xml (davidrad via dkantor)
......
...@@ -30,6 +30,7 @@ import org.apache.commons.cli.ParseException; ...@@ -30,6 +30,7 @@ import org.apache.commons.cli.ParseException;
import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.util.ShutdownHookManager;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -52,16 +53,20 @@ public final class Atlas { ...@@ -52,16 +53,20 @@ public final class Atlas {
private static EmbeddedServer server; private static EmbeddedServer server;
static { static {
Runtime.getRuntime().addShutdownHook(new Thread() { ShutdownHookManager.get().addShutdownHook(new Thread() {
@Override @Override
public void run() { public void run() {
try { try {
LOG.info("==> Shutdown of Atlas");
shutdown(); shutdown();
} catch (Exception e) { } catch (Exception e) {
LOG.debug("Failed to shutdown", e); LOG.error("Failed to shutdown", e);
} finally {
LOG.info("<== Shutdown of Atlas");
} }
} }
}); }, AtlasConstants.ATLAS_SHUTDOWN_HOOK_PRIORITY);
} }
private static void shutdown() { private static void shutdown() {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment