Commit bb1c386a by Madhan Neethiraj

ATLAS-1569: cleared contents of RequestContextV1 at the end of the request

parent 343d0b1f
......@@ -43,11 +43,11 @@ public class AtlasEntitiesClientV2 extends AtlasBaseClient {
private static final APIInfo GET_ENTITY_BY_GUID = new APIInfo(ENTITY_API + "guid/%s", HttpMethod.GET, Response.Status.OK);
private static final APIInfo GET_ENTITY_BY_ATTRIBUTE = new APIInfo(ENTITY_API + "uniqueAttribute/type/%s", HttpMethod.GET, Response.Status.OK);
private static final APIInfo CREATE_ENTITY = new APIInfo(ENTITY_API, HttpMethod.POST, Response.Status.OK);
private static final APIInfo UPDATE_ENTITY = CREATE_ENTITY;
private static final APIInfo UPDATE_ENTITY_BY_ATTRIBUTE = new APIInfo(ENTITY_API + "uniqueAttribute/type/%s", HttpMethod.PUT, Response.Status.OK);
public static final APIInfo CREATE_ENTITY = new APIInfo(ENTITY_API, HttpMethod.POST, Response.Status.OK);
public static final APIInfo UPDATE_ENTITY = CREATE_ENTITY;
public static final APIInfo UPDATE_ENTITY_BY_ATTRIBUTE = new APIInfo(ENTITY_API + "uniqueAttribute/type/%s", HttpMethod.PUT, Response.Status.OK);
private static final APIInfo DELETE_ENTITY_BY_GUID = new APIInfo(ENTITY_API + "guid/%s", HttpMethod.DELETE, Response.Status.OK);
private static final APIInfo DELETE_ENTITY_BY_ATTRIBUTE = new APIInfo(ENTITY_API + "uniqueAttribute/type/%s", HttpMethod.DELETE, Response.Status.OK);
public static final APIInfo DELETE_ENTITY_BY_ATTRIBUTE = new APIInfo(ENTITY_API + "uniqueAttribute/type/%s", HttpMethod.DELETE, Response.Status.OK);
private static final APIInfo GET_ENTITIES_BY_GUIDS = new APIInfo(ENTITY_API + "bulk/", HttpMethod.GET, Response.Status.OK);
private static final APIInfo CREATE_ENTITIES = new APIInfo(ENTITY_API + "bulk/", HttpMethod.POST, Response.Status.OK);
......
......@@ -22,9 +22,9 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Singleton;
import kafka.consumer.ConsumerTimeoutException;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasException;
import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.RequestContextV1;
import org.apache.atlas.ha.HAConfiguration;
import org.apache.atlas.listener.ActiveStateChangeHandler;
import org.apache.atlas.model.instance.AtlasEntity;
......@@ -55,6 +55,10 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.atlas.AtlasEntitiesClientV2.CREATE_ENTITY;
import static org.apache.atlas.AtlasEntitiesClientV2.DELETE_ENTITY_BY_ATTRIBUTE;
import static org.apache.atlas.AtlasEntitiesClientV2.UPDATE_ENTITY;
import static org.apache.atlas.AtlasEntitiesClientV2.UPDATE_ENTITY_BY_ATTRIBUTE;
import static org.apache.atlas.notification.hook.HookNotification.EntityCreateRequest;
import static org.apache.atlas.notification.hook.HookNotification.EntityDeleteRequest;
import static org.apache.atlas.notification.hook.HookNotification.EntityUpdateRequest;
......@@ -242,16 +246,18 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
AtlasEntity.AtlasEntitiesWithExtInfo entities;
for (int numRetries = 0; numRetries < maxRetries; numRetries++) {
if (LOG.isDebugEnabled()) {
LOG.debug("Running attempt {}", numRetries);
LOG.debug("handleMessage({}): attempt {}", message.getType().name(), numRetries);
}
try {
RequestContextV1.get().setUser(messageUser);
switch (message.getType()) {
case ENTITY_CREATE:
if (LOG.isDebugEnabled()) {
LOG.debug("EntityCreate via hook");
}
EntityCreateRequest createRequest = (EntityCreateRequest) message;
audit(messageUser, AtlasClient.API.CREATE_ENTITY);
if (numRetries == 0) { // audit only on the first attempt
audit(messageUser, CREATE_ENTITY.getMethod(), CREATE_ENTITY.getPath());
}
entities = instanceConverter.toAtlasEntities(createRequest.getEntities());
......@@ -259,11 +265,12 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
break;
case ENTITY_PARTIAL_UPDATE:
if (LOG.isDebugEnabled()) {
LOG.debug("EntityPartialUpdate via hook");
}
final EntityPartialUpdateRequest partialUpdateRequest = (EntityPartialUpdateRequest) message;
audit(messageUser, AtlasClient.API.UPDATE_ENTITY_PARTIAL);
if (numRetries == 0) { // audit only on the first attempt
audit(messageUser, UPDATE_ENTITY_BY_ATTRIBUTE.getMethod(),
String.format(UPDATE_ENTITY_BY_ATTRIBUTE.getPath(), partialUpdateRequest.getTypeName()));
}
Referenceable referenceable = partialUpdateRequest.getEntity();
entities = instanceConverter.toAtlasEntity(referenceable);
......@@ -280,11 +287,12 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
break;
case ENTITY_DELETE:
if (LOG.isDebugEnabled()) {
LOG.debug("EntityDelete via hook");
}
final EntityDeleteRequest deleteRequest = (EntityDeleteRequest) message;
audit(messageUser, AtlasClient.API.DELETE_ENTITY);
if (numRetries == 0) { // audit only on the first attempt
audit(messageUser, DELETE_ENTITY_BY_ATTRIBUTE.getMethod(),
String.format(DELETE_ENTITY_BY_ATTRIBUTE.getPath(), deleteRequest.getTypeName()));
}
try {
AtlasEntityType type = (AtlasEntityType) typeRegistry.getType(deleteRequest.getTypeName());
......@@ -296,23 +304,23 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
break;
case ENTITY_FULL_UPDATE:
if (LOG.isDebugEnabled()) {
LOG.debug("EntityFullUpdate via hook");
}
EntityUpdateRequest updateRequest = (EntityUpdateRequest) message;
audit(messageUser, AtlasClient.API.UPDATE_ENTITY);
if (numRetries == 0) { // audit only on the first attempt
audit(messageUser, UPDATE_ENTITY.getMethod(), UPDATE_ENTITY.getPath());
}
entities = instanceConverter.toAtlasEntities(updateRequest.getEntities());
atlasEntityStore.createOrUpdate(new AtlasEntityStream(entities), false);
break;
default:
throw new IllegalStateException("Unhandled exception!");
throw new IllegalStateException("Unknown notification type: " + message.getType().name());
}
break;
} catch (Throwable e) {
LOG.warn("Error handling message: {}", e.getMessage());
LOG.warn("Error handling message", e);
try{
LOG.info("Sleeping for {} ms before retry", consumerRetryInterval);
Thread.sleep(consumerRetryInterval);
......@@ -328,6 +336,8 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
}
return;
}
} finally {
RequestContextV1.clear();
}
}
commit();
......@@ -375,12 +385,12 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
}
}
private void audit(String messageUser, AtlasClient.API api) {
private void audit(String messageUser, String method, String path) {
if (LOG.isDebugEnabled()) {
LOG.debug("==> audit({},{})", messageUser, api);
LOG.debug("==> audit({},{}, {})", messageUser, method, path);
}
AuditFilter.audit(messageUser, THREADNAME_PREFIX, api.getMethod(), LOCALHOST, api.getPath(), LOCALHOST,
AuditFilter.audit(messageUser, THREADNAME_PREFIX, method, LOCALHOST, path, LOCALHOST,
DateTimeHelper.formatDateUTC(new Date()));
}
}
......@@ -23,6 +23,7 @@ import com.google.inject.Singleton;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasException;
import org.apache.atlas.RequestContext;
import org.apache.atlas.RequestContextV1;
import org.apache.atlas.metrics.Metrics;
import org.apache.commons.configuration.Configuration;
import org.apache.atlas.util.AtlasRepositoryConfiguration;
......@@ -73,6 +74,7 @@ public class AuditFilter implements Filter {
currentThread.setName(formatName(oldName, requestId));
RequestContext requestContext = RequestContext.createContext();
requestContext.setUser(user);
RequestContextV1.get().setUser(user);
recordAudit(httpRequest, requestTimeISO9601, user);
filterChain.doFilter(request, response);
} finally {
......@@ -81,6 +83,7 @@ public class AuditFilter implements Filter {
currentThread.setName(oldName);
recordMetrics();
RequestContext.clear();
RequestContextV1.clear();
}
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment