Commit 154dda0e by Madhan Neethiraj

ATLAS-2169: delete request fails when hard-delete is configured

parent ad6b07a9
...@@ -195,7 +195,7 @@ public class EntityMutationResponse { ...@@ -195,7 +195,7 @@ public class EntityMutationResponse {
public void addEntity(EntityOperation op, AtlasEntityHeader header) { public void addEntity(EntityOperation op, AtlasEntityHeader header) {
// if an entity is already included in CREATE, ignore subsequent UPDATE, PARTIAL_UPDATE // if an entity is already included in CREATE, ignore subsequent UPDATE, PARTIAL_UPDATE
if (op == EntityOperation.UPDATE || op == EntityOperation.PARTIAL_UPDATE) { if (op == EntityOperation.UPDATE || op == EntityOperation.PARTIAL_UPDATE) {
if (entityHeaderExists(getCreatedEntities(), header)) { if (entityHeaderExists(getCreatedEntities(), header.getGuid())) {
return; return;
} }
} }
...@@ -211,17 +211,42 @@ public class EntityMutationResponse { ...@@ -211,17 +211,42 @@ public class EntityMutationResponse {
mutatedEntities.put(op, opEntities); mutatedEntities.put(op, opEntities);
} }
if (!entityHeaderExists(opEntities, header)) { if (!entityHeaderExists(opEntities, header.getGuid())) {
opEntities.add(header); opEntities.add(header);
} }
} }
private boolean entityHeaderExists(List<AtlasEntityHeader> entityHeaders, AtlasEntityHeader newEntityHeader) { @JsonIgnore
public void addEntity(EntityOperation op, AtlasObjectId entity) {
if (mutatedEntities == null) {
mutatedEntities = new HashMap<>();
} else {
// if an entity is already included in CREATE, ignore subsequent UPDATE, PARTIAL_UPDATE
if (op == EntityOperation.UPDATE || op == EntityOperation.PARTIAL_UPDATE) {
if (entityHeaderExists(getCreatedEntities(), entity.getGuid())) {
return;
}
}
}
List<AtlasEntityHeader> opEntities = mutatedEntities.get(op);
if (opEntities == null) {
opEntities = new ArrayList<>();
mutatedEntities.put(op, opEntities);
}
if (!entityHeaderExists(opEntities, entity.getGuid())) {
opEntities.add(new AtlasEntityHeader(entity.getTypeName(), entity.getGuid(), entity.getUniqueAttributes()));
}
}
private boolean entityHeaderExists(List<AtlasEntityHeader> entityHeaders, String guid) {
boolean ret = false; boolean ret = false;
if (CollectionUtils.isNotEmpty(entityHeaders) && newEntityHeader != null) { if (CollectionUtils.isNotEmpty(entityHeaders) && guid != null) {
for (AtlasEntityHeader entityHeader : entityHeaders) { for (AtlasEntityHeader entityHeader : entityHeaders) {
if (StringUtils.equals(entityHeader.getGuid(), newEntityHeader.getGuid())) { if (StringUtils.equals(entityHeader.getGuid(), guid)) {
ret = true; ret = true;
break; break;
} }
......
...@@ -68,9 +68,6 @@ import java.util.Objects; ...@@ -68,9 +68,6 @@ import java.util.Objects;
import java.util.Set; import java.util.Set;
import java.util.UUID; import java.util.UUID;
import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.BOTH;
import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.IN;
import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.OUT;
/** /**
* Utility class for graph operations. * Utility class for graph operations.
...@@ -888,24 +885,23 @@ public final class GraphHelper { ...@@ -888,24 +885,23 @@ public final class GraphHelper {
* Guid and AtlasVertex combo * Guid and AtlasVertex combo
*/ */
public static class VertexInfo { public static class VertexInfo {
private String guid; private final AtlasObjectId entity;
private AtlasVertex vertex; private final AtlasVertex vertex;
private String typeName;
public VertexInfo(String guid, AtlasVertex vertex, String typeName) { public VertexInfo(AtlasObjectId entity, AtlasVertex vertex) {
this.guid = guid; this.entity = entity;
this.vertex = vertex; this.vertex = vertex;
this.typeName = typeName;
} }
public String getGuid() { public AtlasObjectId getEntity() { return entity; }
return guid;
}
public AtlasVertex getVertex() { public AtlasVertex getVertex() {
return vertex; return vertex;
} }
public String getGuid() {
return entity.getGuid();
}
public String getTypeName() { public String getTypeName() {
return typeName; return entity.getTypeName();
} }
@Override @Override
...@@ -913,14 +909,13 @@ public final class GraphHelper { ...@@ -913,14 +909,13 @@ public final class GraphHelper {
if (this == o) return true; if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false; if (o == null || getClass() != o.getClass()) return false;
VertexInfo that = (VertexInfo) o; VertexInfo that = (VertexInfo) o;
return Objects.equals(guid, that.guid) && return Objects.equals(entity, that.entity) &&
Objects.equals(vertex, that.vertex) && Objects.equals(vertex, that.vertex);
Objects.equals(typeName, that.typeName);
} }
@Override @Override
public int hashCode() { public int hashCode() {
return Objects.hash(guid, vertex, typeName); return Objects.hash(entity, vertex);
} }
} }
...@@ -1304,39 +1299,4 @@ public final class GraphHelper { ...@@ -1304,39 +1299,4 @@ public final class GraphHelper {
return StringUtils.isNotEmpty(edge.getLabel()) ? edgeLabel.startsWith("r:") : false; return StringUtils.isNotEmpty(edge.getLabel()) ? edgeLabel.startsWith("r:") : false;
} }
public static AtlasObjectId getReferenceObjectId(AtlasEdge edge, AtlasRelationshipEdgeDirection relationshipDirection,
AtlasVertex parentVertex) {
AtlasObjectId ret = null;
if (relationshipDirection == OUT) {
ret = getAtlasObjectIdForInVertex(edge);
} else if (relationshipDirection == IN) {
ret = getAtlasObjectIdForOutVertex(edge);
} else if (relationshipDirection == BOTH){
// since relationship direction is BOTH, edge direction can be inward or outward
// compare with parent entity vertex and pick the right reference vertex
if (verticesEquals(parentVertex, edge.getOutVertex())) {
ret = getAtlasObjectIdForInVertex(edge);
} else {
ret = getAtlasObjectIdForOutVertex(edge);
}
}
return ret;
}
public static AtlasObjectId getAtlasObjectIdForOutVertex(AtlasEdge edge) {
return new AtlasObjectId(getGuid(edge.getOutVertex()), getTypeName(edge.getOutVertex()));
}
public static AtlasObjectId getAtlasObjectIdForInVertex(AtlasEdge edge) {
return new AtlasObjectId(getGuid(edge.getInVertex()), getTypeName(edge.getInVertex()));
}
private static boolean verticesEquals(AtlasVertex vertexA, AtlasVertex vertexB) {
return StringUtils.equals(getGuid(vertexB), getGuid(vertexA));
}
} }
\ No newline at end of file
...@@ -158,7 +158,7 @@ public class AtlasEntityChangeNotifier { ...@@ -158,7 +158,7 @@ public class AtlasEntityChangeNotifier {
return; return;
} }
List<Referenceable> typedRefInsts = toReferenceables(entityHeaders); List<Referenceable> typedRefInsts = toReferenceables(entityHeaders, operation);
for (EntityChangeListener listener : entityChangeListeners) { for (EntityChangeListener listener : entityChangeListeners) {
try { try {
...@@ -180,11 +180,18 @@ public class AtlasEntityChangeNotifier { ...@@ -180,11 +180,18 @@ public class AtlasEntityChangeNotifier {
} }
} }
private List<Referenceable> toReferenceables(List<AtlasEntityHeader> entityHeaders) throws AtlasBaseException { private List<Referenceable> toReferenceables(List<AtlasEntityHeader> entityHeaders, EntityOperation operation) throws AtlasBaseException {
List<Referenceable> ret = new ArrayList<>(entityHeaders.size()); List<Referenceable> ret = new ArrayList<>(entityHeaders.size());
for (AtlasEntityHeader entityHeader : entityHeaders) { // delete notifications don't need all attributes. Hence the special handling for delete operation
ret.add(toReferenceable(entityHeader.getGuid())); if (operation == EntityOperation.DELETE) {
for (AtlasEntityHeader entityHeader : entityHeaders) {
ret.add(new Referenceable(entityHeader.getGuid(), entityHeader.getTypeName(), entityHeader.getAttributes()));
}
} else {
for (AtlasEntityHeader entityHeader : entityHeaders) {
ret.add(toReferenceable(entityHeader.getGuid()));
}
} }
return ret; return ret;
......
...@@ -647,14 +647,16 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore { ...@@ -647,14 +647,16 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
private EntityMutationResponse deleteVertices(Collection<AtlasVertex> deletionCandidates) throws AtlasBaseException { private EntityMutationResponse deleteVertices(Collection<AtlasVertex> deletionCandidates) throws AtlasBaseException {
EntityMutationResponse response = new EntityMutationResponse(); EntityMutationResponse response = new EntityMutationResponse();
deleteHandler.deleteEntities(deletionCandidates); RequestContextV1 req = RequestContextV1.get();
RequestContextV1 req = RequestContextV1.get();
for (AtlasObjectId id : req.getDeletedEntityIds()) { deleteHandler.deleteEntities(deletionCandidates); // this will update req with list of deleted/updated entities
response.addEntity(DELETE, EntityGraphMapper.constructHeader(id));
for (AtlasObjectId entity : req.getDeletedEntities()) {
response.addEntity(DELETE, entity);
} }
for (AtlasObjectId id : req.getUpdatedEntityIds()) { for (AtlasObjectId entity : req.getUpdatedEntities()) {
response.addEntity(UPDATE, EntityGraphMapper.constructHeader(id)); response.addEntity(UPDATE, entity);
} }
return response; return response;
......
...@@ -85,6 +85,7 @@ public class EntityGraphMapper { ...@@ -85,6 +85,7 @@ public class EntityGraphMapper {
private final AtlasGraph graph; private final AtlasGraph graph;
private final DeleteHandlerV1 deleteHandler; private final DeleteHandlerV1 deleteHandler;
private final AtlasTypeRegistry typeRegistry; private final AtlasTypeRegistry typeRegistry;
private final EntityGraphRetriever entityRetriever;
private final AtlasRelationshipStore relationshipStore; private final AtlasRelationshipStore relationshipStore;
@Inject @Inject
...@@ -92,6 +93,7 @@ public class EntityGraphMapper { ...@@ -92,6 +93,7 @@ public class EntityGraphMapper {
AtlasRelationshipStore relationshipStore) { AtlasRelationshipStore relationshipStore) {
this.deleteHandler = deleteHandler; this.deleteHandler = deleteHandler;
this.typeRegistry = typeRegistry; this.typeRegistry = typeRegistry;
this.entityRetriever = new EntityGraphRetriever(typeRegistry);
this.graph = atlasGraph; this.graph = atlasGraph;
this.relationshipStore = relationshipStore; this.relationshipStore = relationshipStore;
} }
...@@ -192,16 +194,16 @@ public class EntityGraphMapper { ...@@ -192,16 +194,16 @@ public class EntityGraphMapper {
RequestContextV1 req = RequestContextV1.get(); RequestContextV1 req = RequestContextV1.get();
for (AtlasObjectId id : req.getDeletedEntityIds()) { for (AtlasObjectId entity : req.getDeletedEntities()) {
resp.addEntity(DELETE, constructHeader(id)); resp.addEntity(DELETE, entity);
} }
for (AtlasObjectId id : req.getUpdatedEntityIds()) { for (AtlasObjectId entity : req.getUpdatedEntities()) {
if (isPartialUpdate) { if (isPartialUpdate) {
resp.addEntity(PARTIAL_UPDATE, constructHeader(id)); resp.addEntity(PARTIAL_UPDATE, entity);
} }
else { else {
resp.addEntity(UPDATE, constructHeader(id)); resp.addEntity(UPDATE, entity);
} }
} }
...@@ -423,7 +425,6 @@ public class EntityGraphMapper { ...@@ -423,7 +425,6 @@ public class EntityGraphMapper {
AtlasVertex attrVertex = context.getDiscoveryContext().getResolvedEntityVertex(getGuid(ctx.getValue())); AtlasVertex attrVertex = context.getDiscoveryContext().getResolvedEntityVertex(getGuid(ctx.getValue()));
recordEntityUpdate(attrVertex); recordEntityUpdate(attrVertex);
updateModificationMetadata(attrVertex);
} }
//delete old reference //delete old reference
...@@ -494,9 +495,13 @@ public class EntityGraphMapper { ...@@ -494,9 +495,13 @@ public class EntityGraphMapper {
} }
if (inverseUpdated) { if (inverseUpdated) {
updateModificationMetadata(inverseVertex); RequestContextV1 requestContext = RequestContextV1.get();
AtlasObjectId inverseEntityId = new AtlasObjectId(getIdFromVertex(inverseVertex), inverseType.getTypeName());
RequestContextV1.get().recordEntityUpdate(inverseEntityId); if (!requestContext.isDeletedEntity(GraphHelper.getGuid(inverseVertex))) {
updateModificationMetadata(inverseVertex);
requestContext.recordEntityUpdate(entityRetriever.toAtlasObjectId(inverseVertex));
}
} }
} }
...@@ -1441,31 +1446,14 @@ public class EntityGraphMapper { ...@@ -1441,31 +1446,14 @@ public class EntityGraphMapper {
return ret; return ret;
} }
private void recordEntityUpdate(AtlasVertex vertex) { private void recordEntityUpdate(AtlasVertex vertex) throws AtlasBaseException {
AtlasObjectId objectId = new AtlasObjectId(GraphHelper.getGuid(vertex), GraphHelper.getTypeName(vertex)); RequestContextV1 req = RequestContextV1.get();
RequestContextV1 req = RequestContextV1.get();
if (!objectIdsContain(req.getUpdatedEntityIds(), objectId) && !objectIdsContain(req.getCreatedEntityIds(), objectId)) {
req.recordEntityUpdate(objectId);
}
}
private boolean objectIdsContain(Collection<AtlasObjectId> objectIds, AtlasObjectId objectId) {
boolean ret = false;
if (CollectionUtils.isEmpty(objectIds)) { if (!req.isUpdatedEntity(GraphHelper.getGuid(vertex))) {
ret = false; updateModificationMetadata(vertex);
} else { req.recordEntityUpdate(entityRetriever.toAtlasObjectId(vertex));
for (AtlasObjectId id : objectIds) {
if (StringUtils.equals(id.getGuid(), objectId.getGuid())) {
ret = true;
break;
}
}
} }
return ret;
} }
private static void compactAttributes(AtlasEntity entity) { private static void compactAttributes(AtlasEntity entity) {
......
...@@ -147,6 +147,70 @@ public final class EntityGraphRetriever { ...@@ -147,6 +147,70 @@ public final class EntityGraphRetriever {
return atlasVertex != null ? mapVertexToAtlasEntityHeader(atlasVertex, attributes) : null; return atlasVertex != null ? mapVertexToAtlasEntityHeader(atlasVertex, attributes) : null;
} }
public AtlasEntityHeader toAtlasEntityHeader(AtlasEntity entity) {
AtlasEntityHeader ret = null;
String typeName = entity.getTypeName();
AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName);
if (entityType != null) {
Map<String, Object> uniqueAttributes = new HashMap<>();
for (AtlasAttribute attribute : entityType.getUniqAttributes().values()) {
Object attrValue = entity.getAttribute(attribute.getName());
if (attrValue != null) {
uniqueAttributes.put(attribute.getName(), attrValue);
}
}
ret = new AtlasEntityHeader(entity.getTypeName(), entity.getGuid(), uniqueAttributes);
}
return ret;
}
public AtlasObjectId toAtlasObjectId(AtlasVertex entityVertex) throws AtlasBaseException {
AtlasObjectId ret = null;
String typeName = entityVertex.getProperty(Constants.TYPE_NAME_PROPERTY_KEY, String.class);
AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName);
if (entityType != null) {
Map<String, Object> uniqueAttributes = new HashMap<>();
for (AtlasAttribute attribute : entityType.getUniqAttributes().values()) {
Object attrValue = getVertexAttribute(entityVertex, attribute);
if (attrValue != null) {
uniqueAttributes.put(attribute.getName(), attrValue);
}
}
ret = new AtlasObjectId(entityVertex.getProperty(Constants.GUID_PROPERTY_KEY, String.class), typeName, uniqueAttributes);
}
return ret;
}
public AtlasVertex getReferencedEntityVertex(AtlasEdge edge, AtlasRelationshipEdgeDirection relationshipDirection, AtlasVertex parentVertex) throws AtlasBaseException {
AtlasVertex entityVertex = null;
if (relationshipDirection == OUT) {
entityVertex = edge.getInVertex();
} else if (relationshipDirection == IN) {
entityVertex = edge.getOutVertex();
} else if (relationshipDirection == BOTH){
// since relationship direction is BOTH, edge direction can be inward or outward
// compare with parent entity vertex and pick the right reference vertex
if (StringUtils.equals(GraphHelper.getGuid(parentVertex), GraphHelper.getGuid(edge.getOutVertex()))) {
entityVertex = edge.getInVertex();
} else {
entityVertex = edge.getOutVertex();
}
}
return entityVertex;
}
public AtlasVertex getEntityVertex(String guid) throws AtlasBaseException { public AtlasVertex getEntityVertex(String guid) throws AtlasBaseException {
AtlasVertex ret = AtlasGraphUtilsV1.findByGuid(guid); AtlasVertex ret = AtlasGraphUtilsV1.findByGuid(guid);
......
...@@ -31,18 +31,15 @@ public class RequestContextV1 { ...@@ -31,18 +31,15 @@ public class RequestContextV1 {
private static final ThreadLocal<RequestContextV1> CURRENT_CONTEXT = new ThreadLocal<>(); private static final ThreadLocal<RequestContextV1> CURRENT_CONTEXT = new ThreadLocal<>();
private Set<AtlasObjectId> createdEntityIds = new LinkedHashSet<>(); private final Map<String, AtlasObjectId> updatedEntities = new HashMap<>();
private Set<AtlasObjectId> updatedEntityIds = new LinkedHashSet<>(); private final Map<String, AtlasObjectId> deletedEntities = new HashMap<>();
private Set<AtlasObjectId> deletedEntityIds = new LinkedHashSet<>(); private final Map<String, AtlasEntityWithExtInfo> entityCacheV2 = new HashMap<>();
private Map<String, AtlasEntityWithExtInfo> entityCacheV2 = new HashMap<>(); private final Metrics metrics = new Metrics();
private final long requestTime = System.currentTimeMillis();
private String user; private String user;
private final long requestTime;
private Metrics metrics = new Metrics();
private RequestContextV1() { private RequestContextV1() {
requestTime = System.currentTimeMillis();
} }
//To handle gets from background threads where createContext() is not called //To handle gets from background threads where createContext() is not called
...@@ -62,9 +59,9 @@ public class RequestContextV1 { ...@@ -62,9 +59,9 @@ public class RequestContextV1 {
RequestContextV1 instance = CURRENT_CONTEXT.get(); RequestContextV1 instance = CURRENT_CONTEXT.get();
if (instance != null) { if (instance != null) {
if (instance.entityCacheV2 != null) { instance.updatedEntities.clear();
instance.entityCacheV2.clear(); instance.deletedEntities.clear();
} instance.entityCacheV2.clear();
} }
CURRENT_CONTEXT.remove(); CURRENT_CONTEXT.remove();
...@@ -78,24 +75,16 @@ public class RequestContextV1 { ...@@ -78,24 +75,16 @@ public class RequestContextV1 {
this.user = user; this.user = user;
} }
public void recordEntityCreate(Collection<AtlasObjectId> createdEntityIds) { public void recordEntityUpdate(AtlasObjectId entity) {
this.createdEntityIds.addAll(createdEntityIds); if (entity != null && entity.getGuid() != null) {
} updatedEntities.put(entity.getGuid(), entity);
}
public void recordEntityCreate(AtlasObjectId createdEntityId) {
this.createdEntityIds.add(createdEntityId);
}
public void recordEntityUpdate(Collection<AtlasObjectId> updatedEntityIds) {
this.updatedEntityIds.addAll(updatedEntityIds);
}
public void recordEntityUpdate(AtlasObjectId entityId) {
this.updatedEntityIds.add(entityId);
} }
public void recordEntityDelete(AtlasObjectId entityId) { public void recordEntityDelete(AtlasObjectId entity) {
deletedEntityIds.add(entityId); if (entity != null && entity.getGuid() != null) {
deletedEntities.put(entity.getGuid(), entity);
}
} }
/** /**
...@@ -108,16 +97,12 @@ public class RequestContextV1 { ...@@ -108,16 +97,12 @@ public class RequestContextV1 {
} }
} }
public Collection<AtlasObjectId> getCreatedEntityIds() { public Collection<AtlasObjectId> getUpdatedEntities() {
return createdEntityIds; return updatedEntities.values();
}
public Collection<AtlasObjectId> getUpdatedEntityIds() {
return updatedEntityIds;
} }
public Collection<AtlasObjectId> getDeletedEntityIds() { public Collection<AtlasObjectId> getDeletedEntities() {
return deletedEntityIds; return deletedEntities.values();
} }
/** /**
...@@ -135,8 +120,12 @@ public class RequestContextV1 { ...@@ -135,8 +120,12 @@ public class RequestContextV1 {
return requestTime; return requestTime;
} }
public boolean isDeletedEntity(AtlasObjectId entityId) { public boolean isUpdatedEntity(String guid) {
return deletedEntityIds.contains(entityId); return updatedEntities.containsKey(guid);
}
public boolean isDeletedEntity(String guid) {
return deletedEntities.containsKey(guid);
} }
public static Metrics getMetrics() { public static Metrics getMetrics() {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment