Commit 967bf67e by Sarath Subramanian

ATLAS-2643: Re-evaluate tag propagation when a relationship edge is deleted

parent bdf16a5f
......@@ -86,6 +86,7 @@ import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelation
import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.IN;
import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.OUT;
import static org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery.TAG_PROPAGATION_IMPACTED_INSTANCES;
import static org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery.TAG_PROPAGATION_IMPACTED_INSTANCES_EXCLUDE_RELATIONSHIP;
import static org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery.TAG_PROPAGATION_IMPACTED_INSTANCES_FOR_REMOVAL;
import static org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery.TAG_PROPAGATION_IMPACTED_INSTANCES_WITH_RESTRICTIONS;
......@@ -879,15 +880,24 @@ public final class GraphHelper {
}
public List<AtlasVertex> getImpactedVerticesWithRestrictions(String guid, String classificationId) throws AtlasBaseException {
return getImpactedVerticesWithRestrictions(guid, classificationId, null);
}
public List<AtlasVertex> getImpactedVerticesWithRestrictions(String guid, String classificationId, String guidRelationshipToExclude) throws AtlasBaseException {
ScriptEngine scriptEngine = graph.getGremlinScriptEngine();
Bindings bindings = scriptEngine.createBindings();
String query = queryProvider.getQuery(TAG_PROPAGATION_IMPACTED_INSTANCES_WITH_RESTRICTIONS);
List<AtlasVertex> ret = new ArrayList<>();
String query = queryProvider.getQuery(TAG_PROPAGATION_IMPACTED_INSTANCES_WITH_RESTRICTIONS);
bindings.put("g", graph);
bindings.put("guid", guid);
bindings.put("classificationId", classificationId);
if (guidRelationshipToExclude != null) {
query = queryProvider.getQuery(TAG_PROPAGATION_IMPACTED_INSTANCES_EXCLUDE_RELATIONSHIP);
bindings.put("guidRelationshipToExclude", guidRelationshipToExclude);
}
try {
Object resultObj = graph.executeGremlinScript(scriptEngine, bindings, query, false);
......@@ -1013,6 +1023,26 @@ public final class GraphHelper {
return ret;
}
public Map<AtlasVertex, List<AtlasVertex>> getClassificationPropagatedEntitiesMapping(List<AtlasVertex> classificationVertices) throws AtlasBaseException {
return getClassificationPropagatedEntitiesMapping(classificationVertices, null);
}
public Map<AtlasVertex, List<AtlasVertex>> getClassificationPropagatedEntitiesMapping(List<AtlasVertex> classificationVertices, String guidRelationshipToExclude) throws AtlasBaseException {
Map<AtlasVertex, List<AtlasVertex>> ret = new HashMap<>();
if (CollectionUtils.isNotEmpty(classificationVertices)) {
for (AtlasVertex classificationVertex : classificationVertices) {
String classificationId = classificationVertex.getIdForDisplay();
String sourceEntityId = getClassificationEntityGuid(classificationVertex);
List<AtlasVertex> entitiesPropagatingTo = getImpactedVerticesWithRestrictions(sourceEntityId, classificationId, guidRelationshipToExclude);
ret.put(classificationVertex, entitiesPropagatingTo);
}
}
return ret;
}
public static List<AtlasVertex> getPropagationEnabledClassificationVertices(AtlasVertex entityVertex) {
List<AtlasVertex> ret = new ArrayList<>();
Iterable edges = entityVertex.query().direction(AtlasEdgeDirection.OUT).label(CLASSIFICATION_LABEL).edges();
......
......@@ -252,17 +252,8 @@ public class AtlasRelationshipStoreV1 implements AtlasRelationshipStore {
throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIP_ALREADY_DELETED, guid);
}
// remove tag propagations
List<AtlasVertex> propagatedClassificationVertices = getClassificationVertices(edge);
deleteHandler.deleteRelationships(Collections.singleton(edge));
for (AtlasVertex classificationVertex : propagatedClassificationVertices) {
List<AtlasVertex> removePropagationFromVertices = graphHelper.getPropagatedEntityVertices(classificationVertex);
deleteHandler.removeTagPropagation(classificationVertex, removePropagationFromVertices);
}
// notify entities for added/removed classification propagation
entityChangeNotifier.notifyPropagatedEntities();
......@@ -460,14 +451,14 @@ public class AtlasRelationshipStoreV1 implements AtlasRelationshipStore {
if (newTagPropagation != oldTagPropagation) {
List<AtlasVertex> currentClassificationVertices = getClassificationVertices(edge);
Map<AtlasVertex, List<AtlasVertex>> currentClassificationsMap = getClassificationPropagatedEntitiesMapping(currentClassificationVertices);
Map<AtlasVertex, List<AtlasVertex>> currentClassificationsMap = graphHelper.getClassificationPropagatedEntitiesMapping(currentClassificationVertices);
// Update propagation edge
AtlasGraphUtilsV1.setProperty(edge, Constants.RELATIONSHIPTYPE_TAG_PROPAGATION_KEY, newTagPropagation.name());
List<AtlasVertex> updatedClassificationVertices = getClassificationVertices(edge);
List<AtlasVertex> classificationVerticesUnion = (List<AtlasVertex>) CollectionUtils.union(currentClassificationVertices, updatedClassificationVertices);
Map<AtlasVertex, List<AtlasVertex>> updatedClassificationsMap = getClassificationPropagatedEntitiesMapping(classificationVerticesUnion);
Map<AtlasVertex, List<AtlasVertex>> updatedClassificationsMap = graphHelper.getClassificationPropagatedEntitiesMapping(classificationVerticesUnion);
// compute add/remove propagations list
Map<AtlasVertex, List<AtlasVertex>> addPropagationsMap = new HashMap<>();
......@@ -510,22 +501,6 @@ public class AtlasRelationshipStoreV1 implements AtlasRelationshipStore {
}
}
private Map<AtlasVertex, List<AtlasVertex>> getClassificationPropagatedEntitiesMapping(List<AtlasVertex> classificationVertices) throws AtlasBaseException {
Map<AtlasVertex, List<AtlasVertex>> ret = new HashMap<>();
if (CollectionUtils.isNotEmpty(classificationVertices)) {
for (AtlasVertex classificationVertex : classificationVertices) {
String classificationId = classificationVertex.getIdForDisplay();
String sourceEntityId = getClassificationEntityGuid(classificationVertex);
List<AtlasVertex> entitiesPropagatingTo = graphHelper.getImpactedVerticesWithRestrictions(sourceEntityId, classificationId);
ret.put(classificationVertex, entitiesPropagatingTo);
}
}
return ret;
}
private void validateRelationship(AtlasRelationship relationship) throws AtlasBaseException {
if (relationship == null) {
throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "AtlasRelationship is null");
......
......@@ -44,6 +44,7 @@ import org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdg
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -56,6 +57,7 @@ import static org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags.
import static org.apache.atlas.repository.Constants.CLASSIFICATION_EDGE_NAME_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.CLASSIFICATION_LABEL;
import static org.apache.atlas.repository.Constants.PROPAGATED_TRAIT_NAMES_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.RELATIONSHIP_GUID_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.TRAIT_NAMES_PROPERTY_KEY;
import static org.apache.atlas.repository.graph.GraphHelper.EDGE_LABEL_PREFIX;
import static org.apache.atlas.repository.graph.GraphHelper.addToPropagatedTraitNames;
......@@ -63,9 +65,9 @@ import static org.apache.atlas.repository.graph.GraphHelper.getAllClassification
import static org.apache.atlas.repository.graph.GraphHelper.getAssociatedEntityVertex;
import static org.apache.atlas.repository.graph.GraphHelper.getClassificationEdge;
import static org.apache.atlas.repository.graph.GraphHelper.getClassificationEdgeState;
import static org.apache.atlas.repository.graph.GraphHelper.getClassificationEdges;
import static org.apache.atlas.repository.graph.GraphHelper.getClassificationEntityGuid;
import static org.apache.atlas.repository.graph.GraphHelper.getClassificationName;
import static org.apache.atlas.repository.graph.GraphHelper.getClassificationVertices;
import static org.apache.atlas.repository.graph.GraphHelper.getGuid;
import static org.apache.atlas.repository.graph.GraphHelper.getPropagatedClassificationEdge;
import static org.apache.atlas.repository.graph.GraphHelper.getPropagatedEdges;
......@@ -74,7 +76,6 @@ import static org.apache.atlas.repository.graph.GraphHelper.getRelationshipGuid;
import static org.apache.atlas.repository.graph.GraphHelper.getTraitNames;
import static org.apache.atlas.repository.graph.GraphHelper.getTypeName;
import static org.apache.atlas.repository.graph.GraphHelper.isPropagatedClassificationEdge;
import static org.apache.atlas.repository.graph.GraphHelper.isRelationshipEdge;
import static org.apache.atlas.repository.graph.GraphHelper.string;
import static org.apache.atlas.repository.graph.GraphHelper.updateModificationMetadata;
import static org.apache.atlas.repository.store.graph.v1.AtlasGraphUtilsV1.getIdFromEdge;
......@@ -438,6 +439,49 @@ public abstract class DeleteHandlerV1 {
return ret;
}
public void removeTagPropagation(AtlasEdge edge) throws AtlasBaseException {
if (edge == null || !isRelationshipEdge(edge)) {
return;
}
List<AtlasVertex> currentClassificationVertices = getClassificationVertices(edge);
Map<AtlasVertex, List<AtlasVertex>> currentClassificationsMap = graphHelper.getClassificationPropagatedEntitiesMapping(currentClassificationVertices);
Map<AtlasVertex, List<AtlasVertex>> updatedClassificationsMap = graphHelper.getClassificationPropagatedEntitiesMapping(currentClassificationVertices, getRelationshipGuid(edge));
Map<AtlasVertex, List<AtlasVertex>> removePropagationsMap = new HashMap<>();
if (MapUtils.isNotEmpty(currentClassificationsMap) && MapUtils.isEmpty(updatedClassificationsMap)) {
removePropagationsMap.putAll(currentClassificationsMap);
} else {
for (AtlasVertex classificationVertex : updatedClassificationsMap.keySet()) {
List<AtlasVertex> currentPropagatingEntities = currentClassificationsMap.containsKey(classificationVertex) ? currentClassificationsMap.get(classificationVertex) : Collections.emptyList();
List<AtlasVertex> updatedPropagatingEntities = updatedClassificationsMap.containsKey(classificationVertex) ? updatedClassificationsMap.get(classificationVertex) : Collections.emptyList();
List<AtlasVertex> entitiesRemoved = (List<AtlasVertex>) CollectionUtils.subtract(currentPropagatingEntities, updatedPropagatingEntities);
if (CollectionUtils.isNotEmpty(entitiesRemoved)) {
removePropagationsMap.put(classificationVertex, entitiesRemoved);
}
}
}
for (AtlasVertex classificationVertex : removePropagationsMap.keySet()) {
removeTagPropagation(classificationVertex, removePropagationsMap.get(classificationVertex));
}
}
public boolean isRelationshipEdge(AtlasEdge edge) {
boolean ret = false;
if (edge != null) {
String outVertexType = getTypeName(edge.getOutVertex());
String inVertexType = getTypeName(edge.getInVertex());
ret = GraphHelper.isRelationshipEdge(edge) || edge.getPropertyKeys().contains(RELATIONSHIP_GUID_PROPERTY_KEY) ||
(typeRegistry.getEntityTypeByName(outVertexType) != null && typeRegistry.getEntityTypeByName(inVertexType) != null);
}
return ret;
}
public List<AtlasVertex> removeTagPropagation(AtlasVertex classificationVertex) throws AtlasBaseException {
List<AtlasVertex> ret = new ArrayList<>();
......
......@@ -52,6 +52,9 @@ public class HardDeleteHandlerV1 extends DeleteHandlerV1 {
LOG.debug("==> HardDeleteHandlerV1.deleteEdge({}, {})", GraphHelper.string(edge), force);
}
// re-evaluate tag propagation
removeTagPropagation(edge);
graphHelper.removeEdge(edge);
}
}
......@@ -68,6 +68,9 @@ public class SoftDeleteHandlerV1 extends DeleteHandlerV1 {
LOG.debug("==> SoftDeleteHandlerV1.deleteEdge({}, {})",GraphHelper.string(edge), force);
}
// re-evaluate tag propagation
removeTagPropagation(edge);
if (force) {
graphHelper.removeEdge(edge);
} else {
......
......@@ -79,6 +79,14 @@ public class AtlasGremlin3QueryProvider extends AtlasGremlin2QueryProvider {
".repeat(union(outE().has('__state', 'ACTIVE').has('tagPropagation', within('ONE_TO_TWO', 'BOTH')).has('_r__guid', neq(relationshipGuid)).inV(), " +
"inE().has('__state', 'ACTIVE').has('tagPropagation', within('TWO_TO_ONE', 'BOTH')).has('_r__guid', neq(relationshipGuid)).outV())" +
".dedup().where(without('src')).simplePath()).emit().toList();";
case TAG_PROPAGATION_IMPACTED_INSTANCES_EXCLUDE_RELATIONSHIP:
return "g.V().has('__guid', guid).aggregate('src')" +
".repeat(union(outE().has('__state', 'ACTIVE').has('tagPropagation', within('ONE_TO_TWO', 'BOTH')).has('_r__guid', neq(guidRelationshipToExclude))" +
".not(has('blockedPropagatedClassifications', org.janusgraph.core.attribute.Text.textContains(classificationId))).inV(), " +
"inE().has('__state', 'ACTIVE').has('tagPropagation', within('TWO_TO_ONE', 'BOTH')).has('_r__guid', neq(guidRelationshipToExclude))" +
".not(has('blockedPropagatedClassifications', org.janusgraph.core.attribute.Text.textContains(classificationId))).outV())" +
".dedup().where(without('src')).simplePath()).emit().toList();";
}
return super.getQuery(gremlinQuery);
}
......
......@@ -86,6 +86,7 @@ public abstract class AtlasGremlinQueryProvider {
TAG_PROPAGATION_IMPACTED_INSTANCES,
TAG_PROPAGATION_IMPACTED_INSTANCES_FOR_REMOVAL,
TAG_PROPAGATION_IMPACTED_INSTANCES_WITH_RESTRICTIONS
TAG_PROPAGATION_IMPACTED_INSTANCES_WITH_RESTRICTIONS,
TAG_PROPAGATION_IMPACTED_INSTANCES_EXCLUDE_RELATIONSHIP
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment