Commit 09118ca7 by Sidharth Committed by Sarath Subramanian

ATLAS-3477: Add entity purge API in Admin Resource

parent 714423c9
......@@ -36,7 +36,8 @@ public enum AtlasPrivilege {
RELATIONSHIP_ADD("add-relationship"),
RELATIONSHIP_UPDATE("update-relationship"),
RELATIONSHIP_REMOVE("remove-relationship");
RELATIONSHIP_REMOVE("remove-relationship"),
ADMIN_PURGE("admin-purge");
private final String type;
......
......@@ -49,6 +49,7 @@ import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.core.Response;
import java.util.List;
import java.util.Map;
import java.util.Set;
public class AtlasClientV2 extends AtlasBaseClient {
// Type APIs
......@@ -63,6 +64,11 @@ public class AtlasClientV2 extends AtlasBaseClient {
private static final String GET_BY_NAME_TEMPLATE = TYPES_API + "%s/name/%s";
private static final String GET_BY_GUID_TEMPLATE = TYPES_API + "%s/guid/%s";
private static final String ENTITY_BULK_API = ENTITY_API + "bulk/";
//Admin Entity Purge
private static final String ADMIN_API = BASE_URI + "admin/";
private static final String ENTITY_PURGE_API = ADMIN_API + "purge/";
// Lineage APIs
private static final String LINEAGE_URI = BASE_URI + "v2/lineage/";
......@@ -362,6 +368,10 @@ public class AtlasClientV2 extends AtlasBaseClient {
return callAPI(API_V2.DELETE_ENTITIES_BY_GUIDS, EntityMutationResponse.class, "guid", guids);
}
public EntityMutationResponse purgeEntitiesByGuids(Set<String> guids) throws AtlasServiceException {
return callAPI(API_V2.PURGE_ENTITIES_BY_GUIDS, EntityMutationResponse.class, guids);
}
public AtlasClassifications getClassifications(String guid) throws AtlasServiceException {
return callAPI(formatPathParameters(API_V2.GET_CLASSIFICATIONS, guid), AtlasClassifications.class, null);
}
......@@ -555,6 +565,7 @@ public class AtlasClientV2 extends AtlasBaseClient {
public static final API_V2 CREATE_ENTITIES = new API_V2(ENTITY_BULK_API, HttpMethod.POST, Response.Status.OK);
public static final API_V2 UPDATE_ENTITIES = new API_V2(ENTITY_BULK_API, HttpMethod.POST, Response.Status.OK);
public static final API_V2 DELETE_ENTITIES_BY_GUIDS = new API_V2(ENTITY_BULK_API, HttpMethod.DELETE, Response.Status.OK);
public static final API_V2 PURGE_ENTITIES_BY_GUIDS = new API_V2(ENTITY_PURGE_API, HttpMethod.DELETE, Response.Status.OK);
public static final API_V2 GET_CLASSIFICATIONS = new API_V2(ENTITY_API + "guid/%s/classifications", HttpMethod.GET, Response.Status.OK);
public static final API_V2 ADD_CLASSIFICATIONS = new API_V2(ENTITY_API + "guid/%s/classifications", HttpMethod.POST, Response.Status.NO_CONTENT);
public static final API_V2 UPDATE_CLASSIFICATIONS = new API_V2(ENTITY_API + "guid/%s/classifications", HttpMethod.PUT, Response.Status.NO_CONTENT);
......
......@@ -56,6 +56,14 @@ public interface EntityChangeListenerV2 {
*/
void onEntitiesDeleted(List<AtlasEntity> entities, boolean isImport) throws AtlasBaseException;
/**
* This is upon purging entities from the repository.
*
* @param entities the purged entities
*/
void onEntitiesPurged(List<AtlasEntity> entities) throws AtlasBaseException;
/**
* This is upon adding new classifications to an entity.
*
......@@ -124,6 +132,13 @@ public interface EntityChangeListenerV2 {
void onRelationshipsDeleted(List<AtlasRelationship> relationships, boolean isImport) throws AtlasBaseException;
/**
* This is upon purging relationships from the repository.
*
* @param relationships the purged relationships
*/
void onRelationshipsPurged(List<AtlasRelationship> relationships) throws AtlasBaseException;
/**
* This is upon add new labels to an entity.
*
* @param entity the entity
......
......@@ -50,7 +50,7 @@ public class EntityAuditEventV2 implements Serializable {
ENTITY_IMPORT_CREATE, ENTITY_IMPORT_UPDATE, ENTITY_IMPORT_DELETE,
CLASSIFICATION_ADD, CLASSIFICATION_DELETE, CLASSIFICATION_UPDATE,
PROPAGATED_CLASSIFICATION_ADD, PROPAGATED_CLASSIFICATION_DELETE, PROPAGATED_CLASSIFICATION_UPDATE,
TERM_ADD, TERM_DELETE, LABEL_ADD, LABEL_DELETE;
TERM_ADD, TERM_DELETE, LABEL_ADD, LABEL_DELETE, ENTITY_PURGE;
public static EntityAuditActionV2 fromString(String strValue) {
switch (strValue) {
......@@ -60,6 +60,8 @@ public class EntityAuditEventV2 implements Serializable {
return ENTITY_UPDATE;
case "ENTITY_DELETE":
return ENTITY_DELETE;
case "ENTITY_PURGE":
return ENTITY_PURGE;
case "ENTITY_IMPORT_CREATE":
return ENTITY_IMPORT_CREATE;
case "ENTITY_IMPORT_UPDATE":
......
......@@ -115,6 +115,14 @@ public class EntityMutationResponse {
}
@JsonIgnore
public List<AtlasEntityHeader> getPurgedEntities() {
if ( mutatedEntities != null) {
return mutatedEntities.get(EntityOperation.PURGE);
}
return null;
}
@JsonIgnore
public AtlasEntityHeader getFirstEntityCreated() {
final List<AtlasEntityHeader> entitiesByOperation = getEntitiesByOperation(EntityOperation.CREATE);
if ( entitiesByOperation != null && entitiesByOperation.size() > 0) {
......
......@@ -48,7 +48,8 @@ public class EntityMutations implements Serializable {
CREATE,
UPDATE,
PARTIAL_UPDATE,
DELETE
DELETE,
PURGE
}
public static final class EntityMutation implements Serializable {
......
......@@ -54,6 +54,7 @@ import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV
import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.CLASSIFICATION_UPDATE;
import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.ENTITY_CREATE;
import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.ENTITY_DELETE;
import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.ENTITY_PURGE;
import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.ENTITY_IMPORT_CREATE;
import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.ENTITY_IMPORT_DELETE;
import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.ENTITY_IMPORT_UPDATE;
......@@ -133,6 +134,23 @@ public class EntityAuditListenerV2 implements EntityChangeListenerV2 {
}
@Override
public void onEntitiesPurged(List<AtlasEntity> entities) throws AtlasBaseException {
MetricRecorder metric = RequestContext.get().startMetricRecord("entityAudit");
List<EntityAuditEventV2> events = new ArrayList<>();
for (AtlasEntity entity : entities) {
EntityAuditEventV2 event = createEvent(entity, ENTITY_PURGE, "Purged entity");
events.add(event);
}
auditRepository.putEventsV2(events);
RequestContext.get().endMetricRecord(metric);
}
@Override
public void onClassificationsAdded(AtlasEntity entity, List<AtlasClassification> classifications) throws AtlasBaseException {
if (CollectionUtils.isNotEmpty(classifications)) {
MetricRecorder metric = RequestContext.get().startMetricRecord("entityAudit");
......@@ -470,6 +488,9 @@ public class EntityAuditListenerV2 implements EntityChangeListenerV2 {
case ENTITY_DELETE:
ret = "Deleted: ";
break;
case ENTITY_PURGE:
ret = "Purged: ";
break;
case CLASSIFICATION_ADD:
ret = "Added classification: ";
break;
......@@ -521,4 +542,11 @@ public class EntityAuditListenerV2 implements EntityChangeListenerV2 {
LOG.debug("Relationship(s) deleted from repository(" + relationships.size() + ")");
}
}
@Override
public void onRelationshipsPurged(List<AtlasRelationship> relationships) throws AtlasBaseException {
if (LOG.isDebugEnabled()) {
LOG.debug("Relationship(s) purged from repository(" + relationships.size() + ")");
}
}
}
......@@ -203,11 +203,17 @@ public interface AtlasEntityStore {
*/
String getGuidByUniqueAttributes(AtlasEntityType entityType, Map<String, Object> uniqAttributes) throws AtlasBaseException;
/*
* Return list of deleted entity guids
*/
EntityMutationResponse deleteByIds(List<String> guid) throws AtlasBaseException;
/*
* Return list of purged entity guids
*/
EntityMutationResponse purgeByIds(Set<String> guids) throws AtlasBaseException;
/**
* Add classification(s)
*/
......
......@@ -106,9 +106,16 @@ public abstract class DeleteHandlerV1 {
String guid = AtlasGraphUtilsV2.getIdFromVertex(instanceVertex);
AtlasEntity.Status state = getState(instanceVertex);
if (state == DELETED || requestContext.isDeletedEntity(guid)) {
boolean needToSkip = requestContext.isPurgeRequested() ? (state == ACTIVE || requestContext.isPurgedEntity(guid)) :
(state == DELETED || requestContext.isDeletedEntity(guid));
if (needToSkip) {
if (LOG.isDebugEnabled()) {
LOG.debug("Skipping deletion of {} as it is already deleted", guid);
if(RequestContext.get().isPurgeRequested()) {
LOG.debug("Skipping purging of {} as it is active or already purged", guid);
} else {
LOG.debug("Skipping deletion of {} as it is already deleted", guid);
}
}
continue;
......@@ -149,10 +156,15 @@ public abstract class DeleteHandlerV1 {
public void deleteRelationships(Collection<AtlasEdge> edges, final boolean forceDelete) throws AtlasBaseException {
for (AtlasEdge edge : edges) {
boolean isInternal = isInternalType(edge.getInVertex()) && isInternalType(edge.getOutVertex());
boolean needToSkip = !isInternal && (RequestContext.get().isPurgeRequested() ? getState(edge) == ACTIVE : getState(edge) == DELETED);
if (!isInternal && getState(edge) == DELETED) {
if (needToSkip) {
if (LOG.isDebugEnabled()) {
LOG.debug("Skipping deletion of {} as it is already deleted", getIdFromEdge(edge));
if(RequestContext.get().isPurgeRequested()) {
LOG.debug("Skipping purging of {} as it is active or already purged", getIdFromEdge(edge));
} else{
LOG.debug("Skipping deletion of {} as it is already deleted", getIdFromEdge(edge));
}
}
continue;
......@@ -183,8 +195,10 @@ public abstract class DeleteHandlerV1 {
AtlasVertex vertex = vertices.pop();
AtlasEntity.Status state = getState(vertex);
if (state == DELETED) {
//If the reference vertex is marked for deletion, skip it
//In case of purge If the reference vertex is active then skip it or else
//If the vertex marked for deletion, skip it
boolean needToSkip = RequestContext.get().isPurgeRequested() ? (state == ACTIVE) : (state == DELETED);
if (needToSkip) {
continue;
}
......@@ -221,7 +235,9 @@ public abstract class DeleteHandlerV1 {
} else {
AtlasEdge edge = graphHelper.getEdgeForLabel(vertex, edgeLabel);
if (edge == null || getState(edge) == DELETED) {
needToSkip = (edge == null || RequestContext.get().isPurgeRequested() ?
getState(edge) == ACTIVE : getState(edge) == DELETED);
if (needToSkip) {
continue;
}
......@@ -274,7 +290,9 @@ public abstract class DeleteHandlerV1 {
if (CollectionUtils.isNotEmpty(edges)) {
for (AtlasEdge edge : edges) {
if (edge == null || getState(edge) == DELETED) {
needToSkip = (edge == null || RequestContext.get().isPurgeRequested() ?
getState(edge) == ACTIVE : getState(edge) == DELETED);
if (needToSkip) {
continue;
}
......@@ -838,8 +856,10 @@ public abstract class DeleteHandlerV1 {
final String outId = GraphHelper.getGuid(outVertex);
final Status state = getState(outVertex);
if (state == DELETED || (outId != null && RequestContext.get().isDeletedEntity(outId))) {
//If the reference vertex is marked for deletion, skip updating the reference
boolean needToSkip = RequestContext.get().isPurgeRequested() ? state == ACTIVE || (outId != null && RequestContext.get().isPurgedEntity(outId)) :
state == DELETED || (outId != null && RequestContext.get().isDeletedEntity(outId));
if (needToSkip) {
return;
}
......@@ -954,7 +974,8 @@ public abstract class DeleteHandlerV1 {
for (AtlasEdge edge : incomingEdges) {
Status edgeState = getState(edge);
if (edgeState == ACTIVE) {
boolean isProceed = RequestContext.get().isPurgeRequested()? edgeState == DELETED : edgeState == ACTIVE;
if (isProceed) {
if (isRelationshipEdge(edge)) {
deleteRelationship(edge);
} else {
......
......@@ -102,6 +102,7 @@ public class AtlasEntityChangeNotifier {
List<AtlasEntityHeader> updatedEntities = entityMutationResponse.getUpdatedEntities();
List<AtlasEntityHeader> partiallyUpdatedEntities = entityMutationResponse.getPartialUpdatedEntities();
List<AtlasEntityHeader> deletedEntities = entityMutationResponse.getDeletedEntities();
List<AtlasEntityHeader> purgedEntities = entityMutationResponse.getPurgedEntities();
// complete full text mapping before calling toReferenceables(), from notifyListners(), to
// include all vertex updates in the current graph-transaction
......@@ -113,6 +114,7 @@ public class AtlasEntityChangeNotifier {
notifyListeners(updatedEntities, EntityOperation.UPDATE, isImport);
notifyListeners(partiallyUpdatedEntities, EntityOperation.PARTIAL_UPDATE, isImport);
notifyListeners(deletedEntities, EntityOperation.DELETE, isImport);
notifyListeners(purgedEntities, EntityOperation.PURGE, isImport);
notifyPropagatedEntities();
}
......@@ -340,7 +342,7 @@ public class AtlasEntityChangeNotifier {
private void notifyV1Listeners(List<AtlasEntityHeader> entityHeaders, EntityOperation operation, boolean isImport) throws AtlasBaseException {
if (instanceConverter != null) {
if (operation != EntityOperation.PURGE && instanceConverter != null) {
List<Referenceable> typedRefInsts = toReferenceables(entityHeaders, operation);
for (EntityChangeListener listener : entityChangeListeners) {
......@@ -381,6 +383,10 @@ public class AtlasEntityChangeNotifier {
case DELETE:
listener.onEntitiesDeleted(entities, isImport);
break;
case PURGE:
listener.onEntitiesPurged(entities);
break;
}
}
}
......@@ -399,6 +405,9 @@ public class AtlasEntityChangeNotifier {
case DELETE:
listener.onRelationshipsDeleted(relationships, isImport);
break;
case PURGE:
listener.onRelationshipsPurged(relationships);
break;
}
}
}
......@@ -485,7 +494,7 @@ public class AtlasEntityChangeNotifier {
final AtlasEntity entity;
// delete notifications don't need all attributes. Hence the special handling for delete operation
if (operation == EntityOperation.DELETE) {
if (operation == EntityOperation.DELETE || operation == EntityOperation.PURGE) {
entity = new AtlasEntity(entityHeader);
} else {
String entityGuid = entityHeader.getGuid();
......@@ -586,12 +595,23 @@ public class AtlasEntityChangeNotifier {
List<AtlasEntityHeader> updatedEntities = resp.getUpdatedEntities();
List<AtlasEntityHeader> partialUpdatedEntities = resp.getPartialUpdatedEntities();
List<AtlasEntityHeader> deletedEntities = resp.getDeletedEntities();
List<AtlasEntityHeader> purgedEntities = resp.getPurgedEntities();
// remove entities with DELETED status from created & updated lists
purgeDeletedEntities(createdEntities);
purgeDeletedEntities(updatedEntities);
purgeDeletedEntities(partialUpdatedEntities);
// remove entities purged in this mutation from created & updated lists
if (purgedEntities != null) {
for (AtlasEntityHeader entity : purgedEntities) {
purgeEntity(entity.getGuid(), deletedEntities);
purgeEntity(entity.getGuid(), createdEntities);
purgeEntity(entity.getGuid(), updatedEntities);
purgeEntity(entity.getGuid(), partialUpdatedEntities);
}
}
// remove entities deleted in this mutation from created & updated lists
if (deletedEntities != null) {
for (AtlasEntityHeader entity : deletedEntities) {
......
......@@ -22,6 +22,7 @@ import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.GraphTransactionInterceptor;
import org.apache.atlas.RequestContext;
import org.apache.atlas.annotation.GraphTransaction;
import org.apache.atlas.authorize.AtlasAdminAccessRequest;
import org.apache.atlas.authorize.AtlasAuthorizationUtils;
import org.apache.atlas.authorize.AtlasEntityAccessRequest;
import org.apache.atlas.authorize.AtlasPrivilege;
......@@ -43,6 +44,7 @@ import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.repository.store.graph.EntityGraphDiscovery;
import org.apache.atlas.repository.store.graph.EntityGraphDiscoveryContext;
import org.apache.atlas.repository.store.graph.v1.DeleteHandlerDelegate;
import org.apache.atlas.store.DeleteType;
import org.apache.atlas.type.AtlasClassificationType;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
......@@ -69,8 +71,7 @@ import java.util.Objects;
import java.util.Set;
import static java.lang.Boolean.FALSE;
import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.DELETE;
import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.UPDATE;
import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.*;
import static org.apache.atlas.repository.Constants.IS_INCOMPLETE_PROPERTY_KEY;
import static org.apache.atlas.repository.graph.GraphHelper.getCustomAttributes;
import static org.apache.atlas.repository.graph.GraphHelper.isEntityIncomplete;
......@@ -475,6 +476,42 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {
@Override
@GraphTransaction
public EntityMutationResponse purgeByIds(Set<String> guids) throws AtlasBaseException {
if (CollectionUtils.isEmpty(guids)) {
throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "Guid(s) not specified");
}
AtlasAuthorizationUtils.verifyAccess(new AtlasAdminAccessRequest(AtlasPrivilege.ADMIN_IMPORT), "purge entity: guids=", guids);
Collection<AtlasVertex> purgeCandidates = new ArrayList<>();
for (String guid : guids) {
AtlasVertex vertex = AtlasGraphUtilsV2.findDeletedByGuid(guid);
if (vertex == null) {
// Entity does not exist - treat as non-error, since the caller
// wanted to delete the entity and it's already gone.
LOG.warn("Purge request ignored for non-existent/active entity with guid " + guid);
continue;
}
purgeCandidates.add(vertex);
}
if (purgeCandidates.isEmpty()) {
LOG.info("No purge candidate entities were found for guids: " + guids + " which is already deleted");
}
EntityMutationResponse ret = purgeVertices(purgeCandidates);
// Notify the change listeners
entityChangeNotifier.onEntitiesMutated(ret, false);
return ret;
}
@Override
@GraphTransaction
public EntityMutationResponse deleteByUniqueAttributes(AtlasEntityType entityType, Map<String, Object> uniqAttributes) throws AtlasBaseException {
if (MapUtils.isEmpty(uniqAttributes)) {
throw new AtlasBaseException(AtlasErrorCode.INSTANCE_BY_UNIQUE_ATTRIBUTE_NOT_FOUND, uniqAttributes.toString());
......@@ -1124,6 +1161,21 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {
return response;
}
private EntityMutationResponse purgeVertices(Collection<AtlasVertex> purgeCandidates) throws AtlasBaseException {
EntityMutationResponse response = new EntityMutationResponse();
RequestContext req = RequestContext.get();
req.setDeleteType(DeleteType.HARD);
req.setPurgeRequested(true);
deleteDelegate.getHandler().deleteEntities(purgeCandidates); // this will update req with list of purged entities
for (AtlasEntityHeader entity : req.getDeletedEntities()) {
response.addEntity(PURGE, entity);
}
return response;
}
private void validateAndNormalize(AtlasClassification classification) throws AtlasBaseException {
AtlasClassificationType type = typeRegistry.getClassificationTypeByName(classification.getTypeName());
......
......@@ -353,6 +353,26 @@ public class AtlasGraphUtilsV2 {
return ret;
}
public static AtlasVertex findDeletedByGuid(String guid) {
AtlasVertex ret = GraphTransactionInterceptor.getVertexFromCache(guid);
if (ret == null) {
AtlasGraphQuery query = getGraphInstance().query()
.has(Constants.GUID_PROPERTY_KEY, guid)
.has(STATE_PROPERTY_KEY, Status.DELETED.name());
Iterator<AtlasVertex> results = query.vertices().iterator();
ret = results.hasNext() ? results.next() : null;
if (ret != null) {
GraphTransactionInterceptor.addToVertexCache(guid, ret);
}
}
return ret;
}
public static String getTypeNameFromGuid(String guid) {
String ret = null;
......
......@@ -51,6 +51,7 @@ public class RequestContext {
private final long requestTime = System.currentTimeMillis();
private final Map<String, AtlasEntityHeader> updatedEntities = new HashMap<>();
private final Map<String, AtlasEntityHeader> deletedEntities = new HashMap<>();
private final Map<String, AtlasEntityHeader> purgedEntities = new HashMap<>();
private final Map<String, AtlasEntity> entityCache = new HashMap<>();
private final Map<String, AtlasEntityWithExtInfo> entityExtInfoCache = new HashMap<>();
private final Map<String, List<AtlasClassification>> addedPropagations = new HashMap<>();
......@@ -64,6 +65,7 @@ public class RequestContext {
private String clientIPAddress;
private List<String> forwardedAddresses;
private DeleteType deleteType = DeleteType.DEFAULT;
private boolean isPurgeRequested = false;
private int maxAttempts = 1;
private int attemptCount = 1;
private boolean isImportInProgress = false;
......@@ -108,6 +110,7 @@ public class RequestContext {
public void clearCache() {
this.updatedEntities.clear();
this.deletedEntities.clear();
this.purgedEntities.clear();
this.entityCache.clear();
this.entityExtInfoCache.clear();
this.addedPropagations.clear();
......@@ -179,6 +182,10 @@ public class RequestContext {
isImportInProgress = importInProgress;
}
public boolean isPurgeRequested() { return isPurgeRequested; }
public void setPurgeRequested(boolean isPurgeRequested) { this.isPurgeRequested = isPurgeRequested; }
public boolean isInNotificationProcessing() {
return isInNotificationProcessing;
}
......@@ -215,13 +222,18 @@ public class RequestContext {
}
}
public void recordEntityDelete(AtlasEntityHeader entity) {
if (entity != null && entity.getGuid() != null) {
deletedEntities.put(entity.getGuid(), entity);
}
}
public void recordEntityPurge(AtlasEntityHeader entity) {
if (entity != null && entity.getGuid() != null) {
purgedEntities.put(entity.getGuid(), entity);
}
}
public void recordAddedPropagation(String guid, AtlasClassification classification) {
if (StringUtils.isNotEmpty(guid) && classification != null) {
List<AtlasClassification> classifications = addedPropagations.get(guid);
......@@ -302,6 +314,10 @@ public class RequestContext {
return deletedEntities.values();
}
public Collection<AtlasEntityHeader> getPurgedEntities() {
return purgedEntities.values();
}
/**
* Checks if an instance with the given guid is in the cache for this request. Either returns the instance
* or null if it is not in the cache.
......@@ -329,7 +345,9 @@ public class RequestContext {
return deletedEntities.containsKey(guid);
}
public boolean isPurgedEntity(String guid) {
return purgedEntities.containsKey(guid);
}
public MetricRecorder startMetricRecord(String name) { return metrics != null ? metrics.getMetricRecorder(name) : null; }
......
......@@ -97,6 +97,11 @@ public class EntityNotificationListenerV2 implements EntityChangeListenerV2 {
}
@Override
public void onEntitiesPurged(List<AtlasEntity> entities) throws AtlasBaseException {
// do nothing -> notification not sent out for term purged from entities as its been sent in case of delete
}
@Override
public void onClassificationsAdded(AtlasEntity entity, List<AtlasClassification> classifications) throws AtlasBaseException {
notifyEntityEvents(Collections.singletonList(entity), CLASSIFICATION_ADD);
}
......@@ -296,4 +301,9 @@ public class EntityNotificationListenerV2 implements EntityChangeListenerV2 {
public void onRelationshipsDeleted(List<AtlasRelationship> relationships, boolean isImport) throws AtlasBaseException {
notifyRelationshipEvents(relationships, RELATIONSHIP_DELETE);
}
@Override
public void onRelationshipsPurged(List<AtlasRelationship> relationships) throws AtlasBaseException {
// do nothing -> notification not sent out for term purged from entities as its been sent in case of delete
}
}
\ No newline at end of file
......@@ -37,6 +37,7 @@ import org.apache.atlas.model.impexp.ExportImportAuditEntry;
import org.apache.atlas.model.impexp.MigrationStatus;
import org.apache.atlas.model.instance.AtlasCheckStateRequest;
import org.apache.atlas.model.instance.AtlasCheckStateResult;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.model.metrics.AtlasMetrics;
import org.apache.atlas.model.patches.AtlasPatch.AtlasPatches;
import org.apache.atlas.repository.impexp.AtlasServerService;
......@@ -56,6 +57,7 @@ import org.apache.atlas.utils.AtlasPerfTracer;
import org.apache.atlas.web.filters.AtlasCSRFPreventionFilter;
import org.apache.atlas.web.service.ServiceState;
import org.apache.atlas.web.util.Servlets;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
......@@ -431,6 +433,30 @@ public class AdminResource {
return result;
}
@DELETE
@Path("/purge")
@Consumes(Servlets.JSON_MEDIA_TYPE)
@Produces(Servlets.JSON_MEDIA_TYPE)
public EntityMutationResponse purgeByIds(Set<String> guids) throws AtlasBaseException {
if (CollectionUtils.isNotEmpty(guids)) {
for (String guid : guids) {
Servlets.validateQueryParamLength("guid", guid);
}
}
AtlasPerfTracer perf = null;
try {
if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "AdminResource.purgeByGuids(" + guids + ")");
}
return entityStore.purgeByIds(guids);
} finally {
AtlasPerfTracer.log(perf);
}
}
@POST
@Path("/importfile")
@Produces(Servlets.JSON_MEDIA_TYPE)
......
......@@ -21,6 +21,7 @@ package org.apache.atlas.web.integration;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.google.common.collect.Lists;
import com.sun.jersey.api.client.ClientResponse;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.EntityAuditEvent;
......@@ -49,6 +50,8 @@ import org.testng.annotations.Test;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.testng.Assert.*;
......@@ -60,6 +63,8 @@ public class EntityV2JerseyResourceIT extends BaseResourceIT {
private static final Logger LOG = LoggerFactory.getLogger(EntityV2JerseyResourceIT.class);
private static final String ENTITY_NOTIFICATION_VERSION_PROPERTY = "atlas.notification.entity.version";
private final String DATABASE_NAME = "db" + randomString();
private final String TABLE_NAME = "table" + randomString();
private String traitName;
......@@ -755,20 +760,8 @@ public class EntityV2JerseyResourceIT extends BaseResourceIT {
@Test
public void testDeleteEntities() throws Exception {
// Create 2 database entities
AtlasEntity db1 = new AtlasEntity(DATABASE_TYPE_V2);
String dbName1 = randomString();
db1.setAttribute("name", dbName1);
db1.setAttribute(NAME, dbName1);
db1.setAttribute("clusterName", randomString());
db1.setAttribute("description", randomString());
AtlasEntityHeader entity1Header = createEntity(db1);
AtlasEntity db2 = new AtlasEntity(DATABASE_TYPE_V2);
String dbName2 = randomString();
db2.setAttribute("name", dbName2);
db2.setAttribute(NAME, dbName2);
db2.setAttribute("clusterName", randomString());
db2.setAttribute("description", randomString());
AtlasEntityHeader entity2Header = createEntity(db2);
AtlasEntityHeader entity1Header = createRandomDatabaseEntity();
AtlasEntityHeader entity2Header = createRandomDatabaseEntity();
// Delete the database entities
EntityMutationResponse deleteResponse = atlasClientV2.deleteEntitiesByGuids(Arrays.asList(entity1Header.getGuid(), entity2Header.getGuid()));
......@@ -782,6 +775,49 @@ public class EntityV2JerseyResourceIT extends BaseResourceIT {
}
@Test
public void testPurgeEntities() throws Exception {
// Create 2 database entities
AtlasEntityHeader entity1Header = createRandomDatabaseEntity();
AtlasEntityHeader entity2Header = createRandomDatabaseEntity();
ApplicationProperties.get().setProperty(ENTITY_NOTIFICATION_VERSION_PROPERTY, "v2");
// Delete the database entities
EntityMutationResponse deleteResponse = atlasClientV2.deleteEntitiesByGuids(Arrays.asList(entity1Header.getGuid(), entity2Header.getGuid()));
// Verify that deleteEntities() response has database entity guids
assertNotNull(deleteResponse);
assertNotNull(deleteResponse.getEntitiesByOperation(EntityMutations.EntityOperation.DELETE));
assertEquals(deleteResponse.getEntitiesByOperation(EntityMutations.EntityOperation.DELETE).size(), 2);
Thread.sleep(1000);
// Purge the database entities
Set<String> guids = Stream.of(entity1Header.getGuid(), entity2Header.getGuid()).collect(Collectors.toSet());
EntityMutationResponse purgeResponse = atlasClientV2.purgeEntitiesByGuids(guids);
// Verify that purgeEntities() response has database entity guids
assertNotNull(purgeResponse);
assertNotNull(purgeResponse.getEntitiesByOperation(EntityMutations.EntityOperation.PURGE));
assertEquals(purgeResponse.getEntitiesByOperation(EntityMutations.EntityOperation.PURGE).size(), 2);
}
@Test
public void testPurgeEntitiesWithoutDelete() throws Exception {
// Create 2 database entities
AtlasEntityHeader entity1Header = createRandomDatabaseEntity();
AtlasEntityHeader entity2Header = createRandomDatabaseEntity();
// Purge the database entities without delete
Set<String> guids = Stream.of(entity1Header.getGuid(), entity2Header.getGuid()).collect(Collectors.toSet());
EntityMutationResponse purgeResponse = atlasClientV2.purgeEntitiesByGuids(guids);
// Verify that purgeEntities() response has database entity guids
assertNotNull(purgeResponse);
assertNull(purgeResponse.getEntitiesByOperation(EntityMutations.EntityOperation.PURGE));
}
@Test
public void testDeleteEntityByUniqAttribute() throws Exception {
// Create database entity
AtlasEntity hiveDB = createHiveDB(DATABASE_NAME + randomUTF8());
......@@ -802,4 +838,14 @@ public class EntityV2JerseyResourceIT extends BaseResourceIT {
put(name, value);
}};
}
private AtlasEntityHeader createRandomDatabaseEntity() {
AtlasEntity db = new AtlasEntity(DATABASE_TYPE_V2);
String dbName = randomString();
db.setAttribute("name", dbName);
db.setAttribute(NAME, dbName);
db.setAttribute("clusterName", randomString());
db.setAttribute("description", randomString());
return createEntity(db);
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment