From bb1c386a7c6c2d649f09b103105712a78ec32507 Mon Sep 17 00:00:00 2001 From: Madhan Neethiraj <madhan@apache.org> Date: Sat, 18 Feb 2017 23:01:13 -0800 Subject: [PATCH] ATLAS-1569: cleared contents of RequestContextV1 at the end of the request --- client/src/main/java/org/apache/atlas/AtlasEntitiesClientV2.java | 8 ++++---- webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java | 56 +++++++++++++++++++++++++++++++++----------------------- webapp/src/main/java/org/apache/atlas/web/filters/AuditFilter.java | 3 +++ 3 files changed, 40 insertions(+), 27 deletions(-) diff --git a/client/src/main/java/org/apache/atlas/AtlasEntitiesClientV2.java b/client/src/main/java/org/apache/atlas/AtlasEntitiesClientV2.java index 9ad9c16..b16bb58 100644 --- a/client/src/main/java/org/apache/atlas/AtlasEntitiesClientV2.java +++ b/client/src/main/java/org/apache/atlas/AtlasEntitiesClientV2.java @@ -43,11 +43,11 @@ public class AtlasEntitiesClientV2 extends AtlasBaseClient { private static final APIInfo GET_ENTITY_BY_GUID = new APIInfo(ENTITY_API + "guid/%s", HttpMethod.GET, Response.Status.OK); private static final APIInfo GET_ENTITY_BY_ATTRIBUTE = new APIInfo(ENTITY_API + "uniqueAttribute/type/%s", HttpMethod.GET, Response.Status.OK); - private static final APIInfo CREATE_ENTITY = new APIInfo(ENTITY_API, HttpMethod.POST, Response.Status.OK); - private static final APIInfo UPDATE_ENTITY = CREATE_ENTITY; - private static final APIInfo UPDATE_ENTITY_BY_ATTRIBUTE = new APIInfo(ENTITY_API + "uniqueAttribute/type/%s", HttpMethod.PUT, Response.Status.OK); + public static final APIInfo CREATE_ENTITY = new APIInfo(ENTITY_API, HttpMethod.POST, Response.Status.OK); + public static final APIInfo UPDATE_ENTITY = CREATE_ENTITY; + public static final APIInfo UPDATE_ENTITY_BY_ATTRIBUTE = new APIInfo(ENTITY_API + "uniqueAttribute/type/%s", HttpMethod.PUT, Response.Status.OK); private static final APIInfo DELETE_ENTITY_BY_GUID = new APIInfo(ENTITY_API + "guid/%s", HttpMethod.DELETE, Response.Status.OK); - private static final APIInfo DELETE_ENTITY_BY_ATTRIBUTE = new APIInfo(ENTITY_API + "uniqueAttribute/type/%s", HttpMethod.DELETE, Response.Status.OK); + public static final APIInfo DELETE_ENTITY_BY_ATTRIBUTE = new APIInfo(ENTITY_API + "uniqueAttribute/type/%s", HttpMethod.DELETE, Response.Status.OK); private static final APIInfo GET_ENTITIES_BY_GUIDS = new APIInfo(ENTITY_API + "bulk/", HttpMethod.GET, Response.Status.OK); private static final APIInfo CREATE_ENTITIES = new APIInfo(ENTITY_API + "bulk/", HttpMethod.POST, Response.Status.OK); diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java index c16fd66..44c7995 100644 --- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java +++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java @@ -22,9 +22,9 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.inject.Singleton; import kafka.consumer.ConsumerTimeoutException; import org.apache.atlas.ApplicationProperties; -import org.apache.atlas.AtlasClient; import org.apache.atlas.AtlasException; import org.apache.atlas.AtlasServiceException; +import org.apache.atlas.RequestContextV1; import org.apache.atlas.ha.HAConfiguration; import org.apache.atlas.listener.ActiveStateChangeHandler; import org.apache.atlas.model.instance.AtlasEntity; @@ -55,6 +55,10 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import static org.apache.atlas.AtlasEntitiesClientV2.CREATE_ENTITY; +import static org.apache.atlas.AtlasEntitiesClientV2.DELETE_ENTITY_BY_ATTRIBUTE; +import static org.apache.atlas.AtlasEntitiesClientV2.UPDATE_ENTITY; +import static org.apache.atlas.AtlasEntitiesClientV2.UPDATE_ENTITY_BY_ATTRIBUTE; import static org.apache.atlas.notification.hook.HookNotification.EntityCreateRequest; import static org.apache.atlas.notification.hook.HookNotification.EntityDeleteRequest; import static org.apache.atlas.notification.hook.HookNotification.EntityUpdateRequest; @@ -242,16 +246,18 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl AtlasEntity.AtlasEntitiesWithExtInfo entities; for (int numRetries = 0; numRetries < maxRetries; numRetries++) { if (LOG.isDebugEnabled()) { - LOG.debug("Running attempt {}", numRetries); + LOG.debug("handleMessage({}): attempt {}", message.getType().name(), numRetries); } try { + RequestContextV1.get().setUser(messageUser); + switch (message.getType()) { case ENTITY_CREATE: - if (LOG.isDebugEnabled()) { - LOG.debug("EntityCreate via hook"); - } EntityCreateRequest createRequest = (EntityCreateRequest) message; - audit(messageUser, AtlasClient.API.CREATE_ENTITY); + + if (numRetries == 0) { // audit only on the first attempt + audit(messageUser, CREATE_ENTITY.getMethod(), CREATE_ENTITY.getPath()); + } entities = instanceConverter.toAtlasEntities(createRequest.getEntities()); @@ -259,11 +265,12 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl break; case ENTITY_PARTIAL_UPDATE: - if (LOG.isDebugEnabled()) { - LOG.debug("EntityPartialUpdate via hook"); - } final EntityPartialUpdateRequest partialUpdateRequest = (EntityPartialUpdateRequest) message; - audit(messageUser, AtlasClient.API.UPDATE_ENTITY_PARTIAL); + + if (numRetries == 0) { // audit only on the first attempt + audit(messageUser, UPDATE_ENTITY_BY_ATTRIBUTE.getMethod(), + String.format(UPDATE_ENTITY_BY_ATTRIBUTE.getPath(), partialUpdateRequest.getTypeName())); + } Referenceable referenceable = partialUpdateRequest.getEntity(); entities = instanceConverter.toAtlasEntity(referenceable); @@ -280,11 +287,12 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl break; case ENTITY_DELETE: - if (LOG.isDebugEnabled()) { - LOG.debug("EntityDelete via hook"); - } final EntityDeleteRequest deleteRequest = (EntityDeleteRequest) message; - audit(messageUser, AtlasClient.API.DELETE_ENTITY); + + if (numRetries == 0) { // audit only on the first attempt + audit(messageUser, DELETE_ENTITY_BY_ATTRIBUTE.getMethod(), + String.format(DELETE_ENTITY_BY_ATTRIBUTE.getPath(), deleteRequest.getTypeName())); + } try { AtlasEntityType type = (AtlasEntityType) typeRegistry.getType(deleteRequest.getTypeName()); @@ -296,23 +304,23 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl break; case ENTITY_FULL_UPDATE: - if (LOG.isDebugEnabled()) { - LOG.debug("EntityFullUpdate via hook"); - } EntityUpdateRequest updateRequest = (EntityUpdateRequest) message; - audit(messageUser, AtlasClient.API.UPDATE_ENTITY); + + if (numRetries == 0) { // audit only on the first attempt + audit(messageUser, UPDATE_ENTITY.getMethod(), UPDATE_ENTITY.getPath()); + } entities = instanceConverter.toAtlasEntities(updateRequest.getEntities()); atlasEntityStore.createOrUpdate(new AtlasEntityStream(entities), false); break; default: - throw new IllegalStateException("Unhandled exception!"); + throw new IllegalStateException("Unknown notification type: " + message.getType().name()); } break; } catch (Throwable e) { - LOG.warn("Error handling message: {}", e.getMessage()); + LOG.warn("Error handling message", e); try{ LOG.info("Sleeping for {} ms before retry", consumerRetryInterval); Thread.sleep(consumerRetryInterval); @@ -328,6 +336,8 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl } return; } + } finally { + RequestContextV1.clear(); } } commit(); @@ -375,12 +385,12 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl } } - private void audit(String messageUser, AtlasClient.API api) { + private void audit(String messageUser, String method, String path) { if (LOG.isDebugEnabled()) { - LOG.debug("==> audit({},{})", messageUser, api); + LOG.debug("==> audit({},{}, {})", messageUser, method, path); } - AuditFilter.audit(messageUser, THREADNAME_PREFIX, api.getMethod(), LOCALHOST, api.getPath(), LOCALHOST, + AuditFilter.audit(messageUser, THREADNAME_PREFIX, method, LOCALHOST, path, LOCALHOST, DateTimeHelper.formatDateUTC(new Date())); } } diff --git a/webapp/src/main/java/org/apache/atlas/web/filters/AuditFilter.java b/webapp/src/main/java/org/apache/atlas/web/filters/AuditFilter.java index 030788a..d804f21 100755 --- a/webapp/src/main/java/org/apache/atlas/web/filters/AuditFilter.java +++ b/webapp/src/main/java/org/apache/atlas/web/filters/AuditFilter.java @@ -23,6 +23,7 @@ import com.google.inject.Singleton; import org.apache.atlas.AtlasClient; import org.apache.atlas.AtlasException; import org.apache.atlas.RequestContext; +import org.apache.atlas.RequestContextV1; import org.apache.atlas.metrics.Metrics; import org.apache.commons.configuration.Configuration; import org.apache.atlas.util.AtlasRepositoryConfiguration; @@ -73,6 +74,7 @@ public class AuditFilter implements Filter { currentThread.setName(formatName(oldName, requestId)); RequestContext requestContext = RequestContext.createContext(); requestContext.setUser(user); + RequestContextV1.get().setUser(user); recordAudit(httpRequest, requestTimeISO9601, user); filterChain.doFilter(request, response); } finally { @@ -81,6 +83,7 @@ public class AuditFilter implements Filter { currentThread.setName(oldName); recordMetrics(); RequestContext.clear(); + RequestContextV1.clear(); } } -- libgit2 0.27.1