Commit 52ef9e7f by Madhan Neethiraj

ATLAS-2823: updated hooks to support asynchronous notifications

parent 1179aff8
......@@ -18,13 +18,10 @@
package org.apache.atlas.falcon.hook;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.atlas.AtlasConstants;
import org.apache.atlas.falcon.bridge.FalconBridge;
import org.apache.atlas.falcon.event.FalconEvent;
import org.apache.atlas.falcon.publisher.FalconEventPublisher;
import org.apache.atlas.hook.AtlasHook;
import org.apache.atlas.kafka.NotificationProvider;
import org.apache.atlas.model.notification.HookNotification;
import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityCreateRequest;
......@@ -32,17 +29,12 @@ import org.apache.falcon.FalconException;
import org.apache.falcon.entity.store.ConfigurationStore;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.process.Process;
import org.apache.hadoop.util.ShutdownHookManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* Falcon hook sends lineage information to the Atlas Service.
......@@ -50,27 +42,6 @@ import java.util.concurrent.TimeUnit;
public class FalconHook extends AtlasHook implements FalconEventPublisher {
private static final Logger LOG = LoggerFactory.getLogger(FalconHook.class);
public static final String CONF_PREFIX = "atlas.hook.falcon.";
private static final String MIN_THREADS = CONF_PREFIX + "minThreads";
private static final String MAX_THREADS = CONF_PREFIX + "maxThreads";
private static final String KEEP_ALIVE_TIME = CONF_PREFIX + "keepAliveTime";
public static final String QUEUE_SIZE = CONF_PREFIX + "queueSize";
public static final String CONF_SYNC = CONF_PREFIX + "synchronous";
public static final String HOOK_NUM_RETRIES = CONF_PREFIX + "numRetries";
// wait time determines how long we wait before we exit the jvm on
// shutdown. Pending requests after that will not be sent.
private static final int WAIT_TIME = 3;
private static ExecutorService executor;
private static final int minThreadsDefault = 5;
private static final int maxThreadsDefault = 5;
private static final long keepAliveTimeDefault = 10;
private static final int queueSizeDefault = 10000;
private static boolean sync;
private static ConfigurationStore STORE;
private enum Operation {
......@@ -80,45 +51,11 @@ public class FalconHook extends AtlasHook implements FalconEventPublisher {
static {
try {
// initialize the async facility to process hook calls. We don't
// want to do this inline since it adds plenty of overhead for the query.
int minThreads = atlasProperties.getInt(MIN_THREADS, minThreadsDefault);
int maxThreads = atlasProperties.getInt(MAX_THREADS, maxThreadsDefault);
long keepAliveTime = atlasProperties.getLong(KEEP_ALIVE_TIME, keepAliveTimeDefault);
int queueSize = atlasProperties.getInt(QUEUE_SIZE, queueSizeDefault);
sync = atlasProperties.getBoolean(CONF_SYNC, false);
executor = new ThreadPoolExecutor(minThreads, maxThreads, keepAliveTime, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(queueSize),
new ThreadFactoryBuilder().setNameFormat("Atlas Logger %d").build());
ShutdownHookManager.get().addShutdownHook(new Thread() {
@Override
public void run() {
try {
LOG.info("==> Shutdown of Atlas Falcon Hook");
executor.shutdown();
executor.awaitTermination(WAIT_TIME, TimeUnit.SECONDS);
executor = null;
} catch (InterruptedException ie) {
LOG.info("Interrupt received in shutdown.");
} finally {
LOG.info("<== Shutdown of Atlas Falcon Hook");
}
// shutdown client
}
}, AtlasConstants.ATLAS_SHUTDOWN_HOOK_PRIORITY);
STORE = ConfigurationStore.get();
notificationInterface = NotificationProvider.get();
} catch (Exception e) {
LOG.error("Caught exception initializing the falcon hook.", e);
}
LOG.info("Created Atlas Hook for Falcon");
}
......@@ -126,30 +63,12 @@ public class FalconHook extends AtlasHook implements FalconEventPublisher {
public void publish(final Data data) {
final FalconEvent event = data.getEvent();
try {
if (sync) {
fireAndForget(event);
} else {
executor.submit(new Runnable() {
@Override
public void run() {
try {
fireAndForget(event);
} catch (Throwable e) {
LOG.info("Atlas hook failed", e);
}
}
});
}
fireAndForget(event);
} catch (Throwable t) {
LOG.warn("Error in processing data {}", data, t);
}
}
@Override
protected String getNumberOfRetriesPropertyKey() {
return HOOK_NUM_RETRIES;
}
private void fireAndForget(FalconEvent event) throws FalconException, URISyntaxException {
LOG.info("Entered Atlas hook for Falcon hook operation {}", event.getOperation());
List<HookNotification> messages = new ArrayList<>();
......@@ -163,7 +82,7 @@ public class FalconHook extends AtlasHook implements FalconEventPublisher {
break;
}
notifyEntities(messages);
notifyEntities(messages, null);
}
private List<Referenceable> createEntities(FalconEvent event, String user) throws FalconException, URISyntaxException {
......
......@@ -26,6 +26,8 @@ import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext;
import org.apache.hadoop.hive.ql.hooks.HookContext;
import org.apache.hadoop.hive.ql.plan.HiveOperation;
import org.apache.hadoop.hive.shims.Utils;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -43,7 +45,6 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
private static final Logger LOG = LoggerFactory.getLogger(HiveHook.class);
public static final String CONF_PREFIX = "atlas.hook.hive.";
public static final String HOOK_NUM_RETRIES = CONF_PREFIX + "numRetries";
public static final String HOOK_DATABASE_NAME_CACHE_COUNT = CONF_PREFIX + "database.name.cache.count";
public static final String HOOK_TABLE_NAME_CACHE_COUNT = CONF_PREFIX + "table.name.cache.count";
public static final String CONF_CLUSTER_NAME = "atlas.cluster.name";
......@@ -72,11 +73,6 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
}
@Override
protected String getNumberOfRetriesPropertyKey() {
return HOOK_NUM_RETRIES;
}
@Override
public void run(HookContext hookContext) throws Exception {
if (LOG.isDebugEnabled()) {
LOG.debug("==> HiveHook.run({})", hookContext.getOperationName());
......@@ -153,7 +149,9 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
}
if (event != null) {
super.notifyEntities(event.getNotificationMessages());
final UserGroupInformation ugi = hookContext.getUgi() == null ? Utils.getUGI() : hookContext.getUgi();
super.notifyEntities(event.getNotificationMessages(), ugi);
}
} catch (Throwable t) {
LOG.error("HiveHook.run(): failed to process operation {}", hookContext.getOperationName(), t);
......
......@@ -52,8 +52,6 @@ import java.util.Date;
public class SqoopHook extends SqoopJobDataPublisher {
private static final Logger LOG = LoggerFactory.getLogger(SqoopHook.class);
public static final String CONF_PREFIX = "atlas.hook.sqoop.";
public static final String HOOK_NUM_RETRIES = CONF_PREFIX + "numRetries";
public static final String ATLAS_CLUSTER_NAME = "atlas.cluster.name";
public static final String DEFAULT_CLUSTER_NAME = "primary";
......@@ -71,8 +69,12 @@ public class SqoopHook extends SqoopJobDataPublisher {
public static final String OUTPUTS = "outputs";
public static final String ATTRIBUTE_DB = "db";
private static final AtlasHookImpl atlasHook;
static {
org.apache.hadoop.conf.Configuration.addDefaultResource("sqoop-site.xml");
atlasHook = new AtlasHookImpl();
}
@Override
......@@ -95,7 +97,7 @@ public class SqoopHook extends SqoopJobDataPublisher {
HookNotification message = new EntityCreateRequestV2(AtlasHook.getUser(), entities);
AtlasHook.notifyEntities(Collections.singletonList(message), atlasProperties.getInt(HOOK_NUM_RETRIES, 3));
atlasHook.sendNotification(message);
} catch(Exception e) {
LOG.error("SqoopHook.publish() failed", e);
......@@ -225,4 +227,10 @@ public class SqoopHook extends SqoopJobDataPublisher {
return name.toString();
}
private static class AtlasHookImpl extends AtlasHook {
public void sendNotification(HookNotification notification) {
super.notifyEntities(Collections.singletonList(notification), null);
}
}
}
......@@ -63,17 +63,10 @@ import java.util.Date;
public class StormAtlasHook extends AtlasHook implements ISubmitterHook {
public static final Logger LOG = org.slf4j.LoggerFactory.getLogger(StormAtlasHook.class);
private static final String CONF_PREFIX = "atlas.hook.storm.";
private static final String HOOK_NUM_RETRIES = CONF_PREFIX + "numRetries";
public static final String ANONYMOUS_OWNER = "anonymous"; // if Storm topology does not contain the owner instance; possible if Storm is running in unsecure mode.
public static final String HBASE_NAMESPACE_DEFAULT = "default";
public static final String ATTRIBUTE_DB = "db";
@Override
protected String getNumberOfRetriesPropertyKey() {
return HOOK_NUM_RETRIES;
}
/**
* This is the client-side hook that storm fires when a topology is added.
*
......@@ -106,7 +99,7 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook {
List<HookNotification> hookNotifications = Collections.singletonList(new EntityCreateRequestV2(user, entity));
notifyEntities(hookNotifications);
notifyEntities(hookNotifications, null);
} catch (Exception e) {
throw new RuntimeException("Atlas hook is unable to process the topology.", e);
}
......
......@@ -55,7 +55,7 @@ public class AtlasHookTest {
List<HookNotification> hookNotifications = new ArrayList<>();
doThrow(new NotificationException(new Exception())).when(notificationInterface)
.send(NotificationInterface.NotificationType.HOOK, hookNotifications);
AtlasHook.notifyEntitiesInternal(hookNotifications, 0, notificationInterface, false,
AtlasHook.notifyEntitiesInternal(hookNotifications, 0, null, notificationInterface, false,
failedMessagesLogger);
// if we've reached here, the method finished OK.
}
......@@ -69,7 +69,7 @@ public class AtlasHookTest {
};
doThrow(new NotificationException(new Exception())).when(notificationInterface)
.send(NotificationInterface.NotificationType.HOOK, hookNotifications);
AtlasHook.notifyEntitiesInternal(hookNotifications, 2, notificationInterface, false,
AtlasHook.notifyEntitiesInternal(hookNotifications, 2, null, notificationInterface, false,
failedMessagesLogger);
verify(notificationInterface, times(2)).
......@@ -86,7 +86,7 @@ public class AtlasHookTest {
doThrow(new NotificationException(new Exception(), Arrays.asList("test message")))
.when(notificationInterface)
.send(NotificationInterface.NotificationType.HOOK, hookNotifications);
AtlasHook.notifyEntitiesInternal(hookNotifications, 2, notificationInterface, true,
AtlasHook.notifyEntitiesInternal(hookNotifications, 2, null, notificationInterface, true,
failedMessagesLogger);
verify(failedMessagesLogger, times(1)).log("test message");
......@@ -98,7 +98,7 @@ public class AtlasHookTest {
doThrow(new NotificationException(new Exception(), Arrays.asList("test message")))
.when(notificationInterface)
.send(NotificationInterface.NotificationType.HOOK, hookNotifications);
AtlasHook.notifyEntitiesInternal(hookNotifications, 2, notificationInterface, false,
AtlasHook.notifyEntitiesInternal(hookNotifications, 2, null, notificationInterface, false,
failedMessagesLogger);
verifyZeroInteractions(failedMessagesLogger);
......@@ -114,7 +114,7 @@ public class AtlasHookTest {
doThrow(new NotificationException(new Exception(), Arrays.asList("test message1", "test message2")))
.when(notificationInterface)
.send(NotificationInterface.NotificationType.HOOK, hookNotifications);
AtlasHook.notifyEntitiesInternal(hookNotifications, 2, notificationInterface, true,
AtlasHook.notifyEntitiesInternal(hookNotifications, 2, null, notificationInterface, true,
failedMessagesLogger);
verify(failedMessagesLogger, times(1)).log("test message1");
......@@ -126,7 +126,7 @@ public class AtlasHookTest {
List<HookNotification> hookNotifications = new ArrayList<>();
doThrow(new RuntimeException("test message")).when(notificationInterface)
.send(NotificationInterface.NotificationType.HOOK, hookNotifications);
AtlasHook.notifyEntitiesInternal(hookNotifications, 2, notificationInterface, true,
AtlasHook.notifyEntitiesInternal(hookNotifications, 2, null, notificationInterface, true,
failedMessagesLogger);
verifyZeroInteractions(failedMessagesLogger);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment