Commit b866e48f by sidmishra Committed by Sarath Subramanian

ATLAS-3727: Added REST API to get Admin Audit Details for an Admin Audit Guid…

ATLAS-3727: Added REST API to get Admin Audit Details for an Admin Audit Guid and Changed get entity audit Rest to support operation based filter Signed-off-by: 's avatarSarath Subramanian <sarath@apache.org>
parent c1779231
......@@ -21,6 +21,7 @@ package org.apache.atlas.model.audit;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.AtlasBaseModelObject;
import java.io.Serializable;
......@@ -36,10 +37,33 @@ public class AtlasAuditEntry extends AtlasBaseModelObject implements Serializabl
private static final long serialVersionUID = 1L;
public enum AuditOperation {
PURGE,
EXPORT,
IMPORT,
IMPORT_DELETE_REPL
PURGE("PURGE"),
EXPORT("EXPORT"),
IMPORT("IMPORT"),
IMPORT_DELETE_REPL("IMPORT_DELETE_REPL");
private final String type;
AuditOperation(String type) {
this.type = type;
}
public EntityAuditEventV2.EntityAuditActionV2 toEntityAuditActionV2() throws AtlasBaseException {
switch (this.type) {
case "PURGE":
return EntityAuditEventV2.EntityAuditActionV2.ENTITY_PURGE;
default:
try {
return EntityAuditEventV2.EntityAuditActionV2.fromString(this.type);
} catch (IllegalArgumentException e) {
throw new AtlasBaseException("Invalid operation for Entity Audit Event V2: " + this.type);
}
}
}
public String getType() {
return type;
}
}
private String userName;
......
......@@ -22,7 +22,9 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.type.AtlasType;
import org.apache.commons.lang.StringUtils;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
......@@ -242,4 +244,26 @@ public class EntityAuditEventV2 implements Serializable {
return sb.toString();
}
@JsonIgnore
public AtlasEntityHeader getEntityHeader() {
AtlasEntityHeader ret = null;
String jsonPartFromDetails = getJsonPartFromDetails();
if(StringUtils.isNotEmpty(jsonPartFromDetails)) {
ret = AtlasType.fromJson(jsonPartFromDetails, AtlasEntityHeader.class);
}
return ret;
}
private String getJsonPartFromDetails() {
String ret = null;
if(StringUtils.isNotEmpty(details)) {
int bracketStartPosition = details.indexOf("{");
if(bracketStartPosition != -1) {
ret = details.substring(bracketStartPosition);
}
}
return ret;
}
}
\ No newline at end of file
......@@ -86,7 +86,7 @@ public abstract class AbstractStorageBasedAuditRepository implements Service, En
@Override
public List<Object> listEvents(String entityId, String startKey, short maxResults) throws AtlasBaseException {
List ret = listEventsV2(entityId, startKey, maxResults);
List ret = listEventsV2(entityId, null, startKey, maxResults);
try {
if (CollectionUtils.isEmpty(ret)) {
......
......@@ -28,6 +28,7 @@ import org.apache.atlas.model.audit.AtlasAuditEntry.AuditOperation;
import org.apache.atlas.model.audit.AuditSearchParameters;
import org.apache.atlas.model.discovery.AtlasSearchResult;
import org.apache.atlas.model.discovery.SearchParameters;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.repository.ogm.AtlasAuditEntryDTO;
import org.apache.atlas.repository.ogm.DataAccess;
......@@ -104,14 +105,25 @@ public class AtlasAuditService {
searchParameters.setAttributes(getAuditEntityAttributes());
AtlasSearchResult result = discoveryService.searchWithParameters(searchParameters);
return toAtlasAuditEntry(result);
return toAtlasAuditEntries(result);
}
public AtlasAuditEntry toAtlasAuditEntry(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) {
AtlasAuditEntry ret = null;
if(entityWithExtInfo != null && entityWithExtInfo.getEntity() != null) {
ret = AtlasAuditEntryDTO.from(entityWithExtInfo.getEntity().getGuid(),
entityWithExtInfo.getEntity().getAttributes());
}
return ret;
}
private Set<String> getAuditEntityAttributes() {
return AtlasAuditEntryDTO.getAttributes();
}
private List<AtlasAuditEntry> toAtlasAuditEntry(AtlasSearchResult result) {
private List<AtlasAuditEntry> toAtlasAuditEntries(AtlasSearchResult result) {
List<AtlasAuditEntry> ret = new ArrayList<>();
if(CollectionUtils.isNotEmpty(result.getEntities())) {
......
......@@ -162,7 +162,7 @@ public class CassandraBasedAuditRepository extends AbstractStorageBasedAuditRepo
}
@Override
public List<EntityAuditEventV2> listEventsV2(String entityId, String startKey, short maxResults) throws AtlasBaseException {
public List<EntityAuditEventV2> listEventsV2(String entityId, EntityAuditEventV2.EntityAuditActionV2 auditAction, String startKey, short maxResults) throws AtlasBaseException {
if (LOG.isDebugEnabled()) {
LOG.debug("Listing events for entity id {}, starting timestamp {}, #records {}", entityId, startKey, maxResults);
}
......
......@@ -144,8 +144,7 @@ public class EntityAuditListenerV2 implements EntityChangeListenerV2 {
List<EntityAuditEventV2> events = new ArrayList<>();
for (AtlasEntity entity : entities) {
EntityAuditEventV2 event = createEvent(entity, ENTITY_PURGE, "Purged entity");
EntityAuditEventV2 event = createEvent(entity, ENTITY_PURGE);
events.add(event);
}
......
......@@ -71,12 +71,13 @@ public interface EntityAuditRepository {
/**
* List events for the given entity id in decreasing order of timestamp, from the given timestamp. Returns n results
* @param entityId entity id
* @param auditAction operation to be used for search at HBase column
* @param startKey key for the first event to be returned, used for pagination
* @param n number of events to be returned
* @param maxResultCount Max numbers of events to be returned
* @return list of events
* @throws AtlasBaseException
*/
List<EntityAuditEventV2> listEventsV2(String entityId, String startKey, short n) throws AtlasBaseException;
List<EntityAuditEventV2> listEventsV2(String entityId, EntityAuditEventV2.EntityAuditActionV2 auditAction, String startKey, short maxResultCount) throws AtlasBaseException;
/***
* List events for given time range where classifications have been added, deleted or updated.
......
......@@ -42,8 +42,10 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.BinaryPrefixComparator;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.PageFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.io.compress.Compression;
......@@ -105,6 +107,7 @@ public class HBaseBasedAuditRepository extends AbstractStorageBasedAuditReposito
private static final String FIELD_SEPARATOR = ":";
private static final long ATLAS_HBASE_KEYVALUE_DEFAULT_SIZE = 1024 * 1024;
private static Configuration APPLICATION_PROPERTIES = null;
private static final int DEFAULT_CACHING = 200;
private static boolean persistEntityDefinition;
......@@ -223,10 +226,9 @@ public class HBaseBasedAuditRepository extends AbstractStorageBasedAuditReposito
}
}
@Override
public List<EntityAuditEventV2> listEventsV2(String entityId, String startKey, short n) throws AtlasBaseException {
public List<EntityAuditEventV2> listEventsV2(String entityId, EntityAuditActionV2 auditAction, String startKey, short maxResultCount) throws AtlasBaseException {
if (LOG.isDebugEnabled()) {
LOG.debug("Listing events for entity id {}, starting timestamp {}, #records {}", entityId, startKey, n);
LOG.debug("Listing events for entity id {}, operation {}, starting key{}, maximum result count {}", entityId, auditAction.toString(), startKey, maxResultCount);
}
Table table = null;
......@@ -238,14 +240,30 @@ public class HBaseBasedAuditRepository extends AbstractStorageBasedAuditReposito
/**
* Scan Details:
* In hbase, the events are stored in increasing order of timestamp. So, doing reverse scan to get the latest event first
* Page filter is set to limit the number of results returned.
* Page filter is set to limit the number of results returned if needed
* Stop row is set to the entity id to avoid going past the current entity while scanning
* small is set to true to optimise RPC calls as the scanner is created per request
* SingleColumnValueFilter is been used to match the operation at COLUMN_FAMILY->COLUMN_ACTION
* Small is set to true to optimise RPC calls as the scanner is created per request
* setCaching(DEFAULT_CACHING) will increase the payload size to DEFAULT_CACHING rows per remote call and
* both types of next() take these settings into account.
*/
Scan scan = new Scan().setReversed(true).setFilter(new PageFilter(n))
.setStopRow(Bytes.toBytes(entityId))
.setCaching(n)
.setSmall(true);
Scan scan = new Scan().setReversed(true)
.setCaching(DEFAULT_CACHING)
.setSmall(true);
if(maxResultCount > -1) {
scan.setFilter(new PageFilter(maxResultCount));
}
if (auditAction != null) {
Filter filterAction = new SingleColumnValueFilter(COLUMN_FAMILY,
COLUMN_ACTION, CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes(auditAction.toString())));
scan.setFilter(filterAction);
}
if(StringUtils.isNotBlank(entityId)) {
scan.setStopRow(Bytes.toBytes(entityId));
}
if (StringUtils.isEmpty(startKey)) {
//Set start row to entity id + max long value
......@@ -260,13 +278,14 @@ public class HBaseBasedAuditRepository extends AbstractStorageBasedAuditReposito
Result result;
//PageFilter doesn't ensure n results are returned. The filter is per region server.
//So, adding extra check on n here
while ((result = scanner.next()) != null && events.size() < n) {
//PageFilter doesn't ensure maxResultCount results are returned. The filter is per region server.
//So, adding extra check on maxResultCount
while ((result = scanner.next()) != null && (maxResultCount == -1 || events.size() < maxResultCount)) {
EntityAuditEventV2 event = fromKeyV2(result.getRow());
//In case the user sets random start key, guarding against random events
if (!event.getEntityId().equals(entityId)) {
//In case the user sets random start key, guarding against random events if entityId is provided
if (StringUtils.isNotBlank(entityId) && !event.getEntityId().equals(entityId)) {
continue;
}
......@@ -286,7 +305,8 @@ public class HBaseBasedAuditRepository extends AbstractStorageBasedAuditReposito
}
if (LOG.isDebugEnabled()) {
LOG.debug("Got events for entity id {}, starting timestamp {}, #records {}", entityId, startKey, events.size());
LOG.debug("Got events for entity id {}, operation {}, starting key{}, maximum result count {}, #records returned {}",
entityId, auditAction.toString(), startKey, maxResultCount, events.size());
}
return events;
......@@ -304,7 +324,7 @@ public class HBaseBasedAuditRepository extends AbstractStorageBasedAuditReposito
@Override
public List<Object> listEvents(String entityId, String startKey, short maxResults) throws AtlasBaseException {
List ret = listEventsV2(entityId, startKey, maxResults);
List ret = listEventsV2(entityId, null, startKey, maxResults);
try {
if (CollectionUtils.isEmpty(ret)) {
......
......@@ -102,7 +102,7 @@ public class InMemoryEntityAuditRepository implements EntityAuditRepository {
}
@Override
public List<EntityAuditEventV2> listEventsV2(String entityId, String startKey, short maxResults) {
public List<EntityAuditEventV2> listEventsV2(String entityId, EntityAuditEventV2.EntityAuditActionV2 auditAction, String startKey, short maxResults) {
List<EntityAuditEventV2> events = new ArrayList<>();
String myStartKey = startKey;
......@@ -137,7 +137,7 @@ public class InMemoryEntityAuditRepository implements EntityAuditRepository {
@Override
public List<Object> listEvents(String entityId, String startKey, short maxResults) {
List events = listEventsV2(entityId, startKey, maxResults);
List events = listEventsV2(entityId, null, startKey, maxResults);
if (CollectionUtils.isEmpty(events)) {
events = listEventsV1(entityId, startKey, maxResults);
......
......@@ -63,7 +63,7 @@ public class NoopEntityAuditRepository implements EntityAuditRepository {
}
@Override
public List<EntityAuditEventV2> listEventsV2(String entityId, String startKey, short n) {
public List<EntityAuditEventV2> listEventsV2(String entityId, EntityAuditEventV2.EntityAuditActionV2 auditAction, String startKey, short maxResultCount) {
return Collections.emptyList();
}
......
......@@ -114,7 +114,7 @@ public class AuditRepositoryTestBase {
eventRepository.putEventsV2(event);
List<EntityAuditEventV2> events = eventRepository.listEventsV2(event.getEntityId(), null, (short) 10);
List<EntityAuditEventV2> events = eventRepository.listEventsV2(event.getEntityId(), null, null, (short) 10);
assertEquals(events.size(), 1);
assertEventV2Equals(events.get(0), event);
......@@ -140,14 +140,14 @@ public class AuditRepositoryTestBase {
}
//Use ts for which there is no event - ts + 2
List<EntityAuditEventV2> events = eventRepository.listEventsV2(id2, null, (short) 3);
List<EntityAuditEventV2> events = eventRepository.listEventsV2(id2, null, null, (short) 3);
assertEquals(events.size(), 3);
assertEventV2Equals(events.get(0), expectedEvents.get(0));
assertEventV2Equals(events.get(1), expectedEvents.get(1));
assertEventV2Equals(events.get(2), expectedEvents.get(2));
//Use last event's timestamp for next list(). Should give only 1 event and shouldn't include events from other id
events = eventRepository.listEventsV2(id2, events.get(2).getEventKey(), (short) 3);
events = eventRepository.listEventsV2(id2, null, events.get(2).getEventKey(), (short) 3);
assertEquals(events.size(), 1);
assertEventV2Equals(events.get(0), expectedEvents.get(2));
}
......
......@@ -32,6 +32,8 @@ import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.audit.AtlasAuditEntry;
import org.apache.atlas.model.audit.AtlasAuditEntry.AuditOperation;
import org.apache.atlas.model.audit.AuditSearchParameters;
import org.apache.atlas.model.audit.EntityAuditEventV2;
import org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2;
import org.apache.atlas.model.impexp.AtlasExportRequest;
import org.apache.atlas.model.impexp.AtlasExportResult;
import org.apache.atlas.model.impexp.AtlasImportRequest;
......@@ -46,6 +48,7 @@ import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.model.metrics.AtlasMetrics;
import org.apache.atlas.model.patches.AtlasPatch.AtlasPatches;
import org.apache.atlas.repository.audit.AtlasAuditService;
import org.apache.atlas.repository.audit.EntityAuditRepository;
import org.apache.atlas.repository.impexp.AtlasServerService;
import org.apache.atlas.repository.impexp.ExportImportAuditService;
import org.apache.atlas.repository.impexp.ExportService;
......@@ -94,6 +97,7 @@ import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
......@@ -151,6 +155,7 @@ public class AdminResource {
private final AtlasPatchManager patchManager;
private final AtlasAuditService auditService;
private final String defaultUIVersion;
private final EntityAuditRepository auditRepository;
static {
try {
......@@ -166,7 +171,7 @@ public class AdminResource {
MigrationProgressService migrationProgressService,
AtlasServerService serverService,
ExportImportAuditService exportImportAuditService, AtlasEntityStore entityStore,
AtlasPatchManager patchManager, AtlasAuditService auditService) {
AtlasPatchManager patchManager, AtlasAuditService auditService, EntityAuditRepository auditRepository) {
this.serviceState = serviceState;
this.metricsService = metricsService;
this.exportService = exportService;
......@@ -180,6 +185,8 @@ public class AdminResource {
this.importExportOperationLock = new ReentrantLock();
this.patchManager = patchManager;
this.auditService = auditService;
this.auditRepository = auditRepository;
if (atlasProperties != null) {
defaultUIVersion = atlasProperties.getString(DEFAULT_UI_VERSION, UI_VERSION_V2);
} else {
......@@ -588,7 +595,7 @@ public class AdminResource {
try {
if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "getAtlasAudit(" + auditSearchParameters + ")");
perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "AdminResource.getAtlasAudits(" + auditSearchParameters + ")");
}
return auditService.get(auditSearchParameters);
......@@ -597,6 +604,49 @@ public class AdminResource {
}
}
@GET
@Path("/audit/{auditGuid}/details")
@Consumes(Servlets.JSON_MEDIA_TYPE)
@Produces(Servlets.JSON_MEDIA_TYPE)
public List<AtlasEntityHeader> getAuditDetails(@PathParam("auditGuid") String auditGuid,
@QueryParam("limit") @DefaultValue("10") int limit,
@QueryParam("offset") @DefaultValue("0") int offset) throws AtlasBaseException {
AtlasPerfTracer perf = null;
try {
if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "AdminResource.getAuditDetails(" + auditGuid + ", " + limit + ", " + offset + ")");
}
List<AtlasEntityHeader> ret = new ArrayList<>();
AtlasAuditEntry auditEntry = auditService.toAtlasAuditEntry(entityStore.getById(auditGuid, false, true));
if(auditEntry != null && StringUtils.isNotEmpty(auditEntry.getResult())) {
String[] listOfResultGuid = auditEntry.getResult().split(",");
EntityAuditActionV2 auditAction = auditEntry.getOperation().toEntityAuditActionV2();
if(offset <= listOfResultGuid.length) {
for(int index=offset; index < listOfResultGuid.length && index < (offset + limit); index++) {
List<EntityAuditEventV2> events = auditRepository.listEventsV2(listOfResultGuid[index], auditAction, null, (short)1);
for (EntityAuditEventV2 event : events) {
AtlasEntityHeader entityHeader = event.getEntityHeader();
if(entityHeader != null) {
ret.add(entityHeader);
}
}
}
}
}
return ret;
} finally {
AtlasPerfTracer.log(perf);
}
}
@GET
@Path("activeSearches")
@Produces(Servlets.JSON_MEDIA_TYPE)
......
......@@ -25,6 +25,7 @@ import org.apache.atlas.bulkimport.BulkImportResponse;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.TypeCategory;
import org.apache.atlas.model.audit.EntityAuditEventV2;
import org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2;
import org.apache.atlas.model.instance.AtlasClassification;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
......@@ -796,6 +797,7 @@ public class EntityREST {
@GET
@Path("{guid}/audit")
public List<EntityAuditEventV2> getAuditEvents(@PathParam("guid") String guid, @QueryParam("startKey") String startKey,
@QueryParam("auditAction") EntityAuditActionV2 auditAction,
@QueryParam("count") @DefaultValue("100") short count) throws AtlasBaseException {
AtlasPerfTracer perf = null;
......@@ -804,16 +806,21 @@ public class EntityREST {
perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "EntityREST.getAuditEvents(" + guid + ", " + startKey + ", " + count + ")");
}
List events = auditRepository.listEvents(guid, startKey, count);
List<EntityAuditEventV2> ret = new ArrayList<>();
for (Object event : events) {
if (event instanceof EntityAuditEventV2) {
ret.add((EntityAuditEventV2) event);
} else if (event instanceof EntityAuditEvent) {
ret.add(instanceConverter.toV2AuditEvent((EntityAuditEvent) event));
} else {
LOG.warn("unknown entity-audit event type {}. Ignored", event != null ? event.getClass().getCanonicalName() : "null");
List<EntityAuditEventV2> ret = new ArrayList<>();
if(auditAction != null) {
ret = auditRepository.listEventsV2(guid, auditAction, startKey, count);
} else {
List events = auditRepository.listEvents(guid, startKey, count);
for (Object event : events) {
if (event instanceof EntityAuditEventV2) {
ret.add((EntityAuditEventV2) event);
} else if (event instanceof EntityAuditEvent) {
ret.add(instanceConverter.toV2AuditEvent((EntityAuditEvent) event));
} else {
LOG.warn("unknown entity-audit event type {}. Ignored", event != null ? event.getClass().getCanonicalName() : "null");
}
}
}
......
......@@ -51,7 +51,7 @@ public class AdminResourceTest {
when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.ACTIVE);
AdminResource adminResource = new AdminResource(serviceState, null, null, null, null, null, null, null, null, null, null, null);
AdminResource adminResource = new AdminResource(serviceState, null, null, null, null, null, null, null, null, null, null, null, null);
Response response = adminResource.getStatus();
assertEquals(response.getStatus(), HttpServletResponse.SC_OK);
JsonNode entity = AtlasJson.parseToV1JsonNode((String) response.getEntity());
......@@ -62,7 +62,7 @@ public class AdminResourceTest {
public void testResourceGetsValueFromServiceState() throws IOException {
when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.PASSIVE);
AdminResource adminResource = new AdminResource(serviceState, null, null, null, null, null, null, null, null, null, null, null);
AdminResource adminResource = new AdminResource(serviceState, null, null, null, null, null, null, null, null, null, null, null, null);
Response response = adminResource.getStatus();
verify(serviceState).getState();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment