Commit b4a69415 by Madhan Neethiraj

ATLAS-1577: audit event generated for an entity overwrites previous event for the entity

parent 2fb3057b
...@@ -21,7 +21,7 @@ package org.apache.atlas.repository.audit; ...@@ -21,7 +21,7 @@ package org.apache.atlas.repository.audit;
import org.apache.atlas.AtlasException; import org.apache.atlas.AtlasException;
import org.apache.atlas.EntityAuditEvent; import org.apache.atlas.EntityAuditEvent;
import org.apache.atlas.EntityAuditEvent.EntityAuditAction; import org.apache.atlas.EntityAuditEvent.EntityAuditAction;
import org.apache.atlas.RequestContext; import org.apache.atlas.RequestContextV1;
import org.apache.atlas.listener.EntityChangeListener; import org.apache.atlas.listener.EntityChangeListener;
import org.apache.atlas.typesystem.IReferenceableInstance; import org.apache.atlas.typesystem.IReferenceableInstance;
import org.apache.atlas.typesystem.IStruct; import org.apache.atlas.typesystem.IStruct;
...@@ -57,10 +57,8 @@ public class EntityAuditListener implements EntityChangeListener { ...@@ -57,10 +57,8 @@ public class EntityAuditListener implements EntityChangeListener {
@Override @Override
public void onEntitiesAdded(Collection<ITypedReferenceableInstance> entities) throws AtlasException { public void onEntitiesAdded(Collection<ITypedReferenceableInstance> entities) throws AtlasException {
List<EntityAuditEvent> events = new ArrayList<>(); List<EntityAuditEvent> events = new ArrayList<>();
long currentTime = RequestContext.get().getRequestTime();
for (ITypedReferenceableInstance entity : entities) { for (ITypedReferenceableInstance entity : entities) {
EntityAuditEvent event = createEvent(entity, currentTime, EntityAuditAction.ENTITY_CREATE); EntityAuditEvent event = createEvent(entity, EntityAuditAction.ENTITY_CREATE);
events.add(event); events.add(event);
} }
...@@ -70,10 +68,8 @@ public class EntityAuditListener implements EntityChangeListener { ...@@ -70,10 +68,8 @@ public class EntityAuditListener implements EntityChangeListener {
@Override @Override
public void onEntitiesUpdated(Collection<ITypedReferenceableInstance> entities) throws AtlasException { public void onEntitiesUpdated(Collection<ITypedReferenceableInstance> entities) throws AtlasException {
List<EntityAuditEvent> events = new ArrayList<>(); List<EntityAuditEvent> events = new ArrayList<>();
long currentTime = RequestContext.get().getRequestTime();
for (ITypedReferenceableInstance entity : entities) { for (ITypedReferenceableInstance entity : entities) {
EntityAuditEvent event = createEvent(entity, currentTime, EntityAuditAction.ENTITY_UPDATE); EntityAuditEvent event = createEvent(entity, EntityAuditAction.ENTITY_UPDATE);
events.add(event); events.add(event);
} }
...@@ -82,7 +78,7 @@ public class EntityAuditListener implements EntityChangeListener { ...@@ -82,7 +78,7 @@ public class EntityAuditListener implements EntityChangeListener {
@Override @Override
public void onTraitAdded(ITypedReferenceableInstance entity, IStruct trait) throws AtlasException { public void onTraitAdded(ITypedReferenceableInstance entity, IStruct trait) throws AtlasException {
EntityAuditEvent event = createEvent(entity, RequestContext.get().getRequestTime(), EntityAuditAction.TAG_ADD, EntityAuditEvent event = createEvent(entity, EntityAuditAction.TAG_ADD,
"Added trait: " + InstanceSerialization.toJson(trait, true)); "Added trait: " + InstanceSerialization.toJson(trait, true));
auditRepository.putEvents(event); auditRepository.putEvents(event);
...@@ -90,8 +86,7 @@ public class EntityAuditListener implements EntityChangeListener { ...@@ -90,8 +86,7 @@ public class EntityAuditListener implements EntityChangeListener {
@Override @Override
public void onTraitDeleted(ITypedReferenceableInstance entity, String traitName) throws AtlasException { public void onTraitDeleted(ITypedReferenceableInstance entity, String traitName) throws AtlasException {
EntityAuditEvent event = createEvent(entity, RequestContext.get().getRequestTime(), EntityAuditAction.TAG_DELETE, EntityAuditEvent event = createEvent(entity, EntityAuditAction.TAG_DELETE, "Deleted trait: " + traitName);
"Deleted trait: " + traitName);
auditRepository.putEvents(event); auditRepository.putEvents(event);
} }
...@@ -99,10 +94,8 @@ public class EntityAuditListener implements EntityChangeListener { ...@@ -99,10 +94,8 @@ public class EntityAuditListener implements EntityChangeListener {
@Override @Override
public void onEntitiesDeleted(Collection<ITypedReferenceableInstance> entities) throws AtlasException { public void onEntitiesDeleted(Collection<ITypedReferenceableInstance> entities) throws AtlasException {
List<EntityAuditEvent> events = new ArrayList<>(); List<EntityAuditEvent> events = new ArrayList<>();
long currentTime = RequestContext.get().getRequestTime();
for (ITypedReferenceableInstance entity : entities) { for (ITypedReferenceableInstance entity : entities) {
EntityAuditEvent event = createEvent(entity, currentTime, EntityAuditAction.ENTITY_DELETE, "Deleted entity"); EntityAuditEvent event = createEvent(entity, EntityAuditAction.ENTITY_DELETE, "Deleted entity");
events.add(event); events.add(event);
} }
...@@ -113,16 +106,16 @@ public class EntityAuditListener implements EntityChangeListener { ...@@ -113,16 +106,16 @@ public class EntityAuditListener implements EntityChangeListener {
return auditRepository.listEvents(guid, null, (short) 10); return auditRepository.listEvents(guid, null, (short) 10);
} }
private EntityAuditEvent createEvent(ITypedReferenceableInstance entity, long ts, EntityAuditAction action) private EntityAuditEvent createEvent(ITypedReferenceableInstance entity, EntityAuditAction action)
throws AtlasException { throws AtlasException {
String detail = getAuditEventDetail(entity, action); String detail = getAuditEventDetail(entity, action);
return createEvent(entity, ts, action, detail); return createEvent(entity, action, detail);
} }
private EntityAuditEvent createEvent(ITypedReferenceableInstance entity, long ts, EntityAuditAction action, String details) private EntityAuditEvent createEvent(ITypedReferenceableInstance entity, EntityAuditAction action, String details)
throws AtlasException { throws AtlasException {
return new EntityAuditEvent(entity.getId()._getId(), ts, RequestContext.get().getUser(), action, details, entity); return new EntityAuditEvent(entity.getId()._getId(), RequestContextV1.get().getRequestTime(), RequestContextV1.get().getUser(), action, details, entity);
} }
private String getAuditEventDetail(ITypedReferenceableInstance entity, EntityAuditAction action) throws AtlasException { private String getAuditEventDetail(ITypedReferenceableInstance entity, EntityAuditAction action) throws AtlasException {
......
...@@ -64,6 +64,10 @@ public class RequestContext { ...@@ -64,6 +64,10 @@ public class RequestContext {
} }
} }
} }
// ensure that RequestContextV1 is also initialized for this request
RequestContextV1.get();
return CURRENT_CONTEXT.get(); return CURRENT_CONTEXT.get();
} }
...@@ -111,6 +115,8 @@ public class RequestContext { ...@@ -111,6 +115,8 @@ public class RequestContext {
public void setUser(String user) { public void setUser(String user) {
this.user = user; this.user = user;
RequestContextV1.get().setUser(user);
} }
public void recordEntityCreate(Collection<String> createdEntityIds) { public void recordEntityCreate(Collection<String> createdEntityIds) {
......
...@@ -24,6 +24,7 @@ import kafka.consumer.ConsumerTimeoutException; ...@@ -24,6 +24,7 @@ import kafka.consumer.ConsumerTimeoutException;
import org.apache.atlas.ApplicationProperties; import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasException; import org.apache.atlas.AtlasException;
import org.apache.atlas.AtlasServiceException; import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.RequestContext;
import org.apache.atlas.RequestContextV1; import org.apache.atlas.RequestContextV1;
import org.apache.atlas.ha.HAConfiguration; import org.apache.atlas.ha.HAConfiguration;
import org.apache.atlas.listener.ActiveStateChangeHandler; import org.apache.atlas.listener.ActiveStateChangeHandler;
...@@ -249,7 +250,8 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl ...@@ -249,7 +250,8 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
LOG.debug("handleMessage({}): attempt {}", message.getType().name(), numRetries); LOG.debug("handleMessage({}): attempt {}", message.getType().name(), numRetries);
} }
try { try {
RequestContextV1.get().setUser(messageUser); RequestContext requestContext = RequestContext.createContext();
requestContext.setUser(messageUser);
switch (message.getType()) { switch (message.getType()) {
case ENTITY_CREATE: case ENTITY_CREATE:
...@@ -337,6 +339,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl ...@@ -337,6 +339,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
return; return;
} }
} finally { } finally {
RequestContext.clear();
RequestContextV1.clear(); RequestContextV1.clear();
} }
} }
......
...@@ -74,7 +74,6 @@ public class AuditFilter implements Filter { ...@@ -74,7 +74,6 @@ public class AuditFilter implements Filter {
currentThread.setName(formatName(oldName, requestId)); currentThread.setName(formatName(oldName, requestId));
RequestContext requestContext = RequestContext.createContext(); RequestContext requestContext = RequestContext.createContext();
requestContext.setUser(user); requestContext.setUser(user);
RequestContextV1.get().setUser(user);
recordAudit(httpRequest, requestTimeISO9601, user); recordAudit(httpRequest, requestTimeISO9601, user);
filterChain.doFilter(request, response); filterChain.doFilter(request, response);
} finally { } finally {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment