Commit 8063de4a by Sarath Subramanian

ATLAS-2558: Reevaluate propagated tags based on blocked classifications in relationship edge

parent f622751d
......@@ -79,6 +79,7 @@ import static org.apache.atlas.repository.Constants.CLASSIFICATION_EDGE_STATE_PR
import static org.apache.atlas.repository.Constants.CLASSIFICATION_ENTITY_GUID;
import static org.apache.atlas.repository.Constants.CLASSIFICATION_LABEL;
import static org.apache.atlas.repository.Constants.CLASSIFICATION_EDGE_NAME_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.CLASSIFICATION_VERTEX_NAME_KEY;
import static org.apache.atlas.repository.Constants.CLASSIFICATION_VERTEX_PROPAGATE_KEY;
import static org.apache.atlas.repository.Constants.PROPAGATED_TRAIT_NAMES_PROPERTY_KEY;
import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.BOTH;
......@@ -86,6 +87,7 @@ import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelation
import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.OUT;
import static org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery.TAG_PROPAGATION_IMPACTED_INSTANCES;
import static org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery.TAG_PROPAGATION_IMPACTED_INSTANCES_FOR_REMOVAL;
import static org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery.TAG_PROPAGATION_IMPACTED_INSTANCES_WITH_RESTRICTIONS;
/**
* Utility class for graph operations.
......@@ -479,6 +481,22 @@ public final class GraphHelper {
return ret;
}
public static List<AtlasVertex> getAllPropagatedEntityVertices(AtlasVertex classificationVertex) {
List<AtlasVertex> ret = new ArrayList<>();
if (classificationVertex != null) {
List<AtlasEdge> edges = getPropagatedEdges(classificationVertex);
if (CollectionUtils.isNotEmpty(edges)) {
for (AtlasEdge edge : edges) {
ret.add(edge.getOutVertex());
}
}
}
return ret;
}
public static Iterator<AtlasEdge> getIncomingEdgesByLabel(AtlasVertex instanceVertex, String edgeLabel) {
return getAdjacentEdgesByLabel(instanceVertex, AtlasEdgeDirection.IN, edgeLabel);
}
......@@ -849,6 +867,53 @@ public final class GraphHelper {
return ret;
}
public List<AtlasVertex> getPropagatedEntityVertices(AtlasVertex classificationVertex) throws AtlasBaseException {
List<AtlasVertex> ret = new ArrayList<>();
if (classificationVertex != null) {
String entityGuid = getClassificationEntityGuid(classificationVertex);
String classificationId = classificationVertex.getIdForDisplay();
List<AtlasVertex> impactedEntityVertices = getAllPropagatedEntityVertices(classificationVertex);
List<AtlasVertex> impactedEntityVerticesWithRestrictions = getImpactedVerticesWithRestrictions(entityGuid, classificationId);
if (impactedEntityVertices.size() > impactedEntityVerticesWithRestrictions.size()) {
ret = (List<AtlasVertex>) CollectionUtils.subtract(impactedEntityVertices, impactedEntityVerticesWithRestrictions);
} else {
ret = (List<AtlasVertex>) CollectionUtils.subtract(impactedEntityVerticesWithRestrictions, impactedEntityVertices);
}
}
return ret;
}
public List<AtlasVertex> getImpactedVerticesWithRestrictions(String guid, String classificationId) throws AtlasBaseException {
ScriptEngine scriptEngine = graph.getGremlinScriptEngine();
Bindings bindings = scriptEngine.createBindings();
String query = queryProvider.getQuery(TAG_PROPAGATION_IMPACTED_INSTANCES_WITH_RESTRICTIONS);
List<AtlasVertex> ret = new ArrayList<>();
bindings.put("g", graph);
bindings.put("guid", guid);
bindings.put("classificationId", classificationId);
try {
Object resultObj = graph.executeGremlinScript(scriptEngine, bindings, query, false);
if (resultObj instanceof List && CollectionUtils.isNotEmpty((List) resultObj)) {
List<?> results = (List) resultObj;
Object firstElement = results.get(0);
if (firstElement instanceof AtlasVertex) {
ret = (List<AtlasVertex>) results;
}
}
} catch (ScriptException e) {
throw new AtlasBaseException(AtlasErrorCode.GREMLIN_SCRIPT_EXECUTION_FAILED, e);
}
return ret;
}
public List<AtlasVertex> getImpactedVerticesWithReferences(String guid, String relationshipGuid) throws AtlasBaseException {
ScriptEngine scriptEngine = graph.getGremlinScriptEngine();
Bindings bindings = scriptEngine.createBindings();
......@@ -936,6 +1001,49 @@ public final class GraphHelper {
return ret;
}
public static List<AtlasVertex> getClassificationVertices(AtlasEdge edge) {
List<AtlasVertex> ret = new ArrayList<>();
if (edge != null) {
PropagateTags propagateTags = getPropagateTags(edge);
AtlasVertex outVertex = edge.getOutVertex();
AtlasVertex inVertex = edge.getInVertex();
if (propagateTags == PropagateTags.ONE_TO_TWO || propagateTags == PropagateTags.BOTH) {
ret.addAll(getPropagationEnabledClassificationVertices(outVertex));
}
if (propagateTags == PropagateTags.TWO_TO_ONE || propagateTags == PropagateTags.BOTH) {
ret.addAll(getPropagationEnabledClassificationVertices(inVertex));
}
}
return ret;
}
public static List<AtlasVertex> getPropagationEnabledClassificationVertices(AtlasVertex entityVertex) {
List<AtlasVertex> ret = new ArrayList<>();
Iterable edges = entityVertex.query().direction(AtlasEdgeDirection.OUT).label(CLASSIFICATION_LABEL).edges();
if (edges != null) {
Iterator<AtlasEdge> iterator = edges.iterator();
while (iterator.hasNext()) {
AtlasEdge edge = iterator.next();
if (edge != null) {
AtlasVertex classificationVertex = edge.getInVertex();
if (isPropagationEnabled(classificationVertex)) {
ret.add(classificationVertex);
}
}
}
}
return ret;
}
public static List<AtlasEdge> getClassificationEdges(AtlasVertex entityVertex) {
return getClassificationEdges(entityVertex, false);
}
......@@ -1033,6 +1141,14 @@ public final class GraphHelper {
return (getState(element) == Id.EntityState.DELETED) ? AtlasRelationship.Status.DELETED : AtlasRelationship.Status.ACTIVE;
}
public static String getClassificationName(AtlasVertex classificationVertex) {
return AtlasGraphUtilsV1.getProperty(classificationVertex, CLASSIFICATION_VERTEX_NAME_KEY, String.class);
}
public static String getClassificationEntityGuid(AtlasVertex classificationVertex) {
return AtlasGraphUtilsV1.getProperty(classificationVertex, CLASSIFICATION_ENTITY_GUID, String.class);
}
public static AtlasClassification.PropagationState getClassificationEdgeState(AtlasEdge edge) {
AtlasClassification.PropagationState ret = null;
......
......@@ -33,6 +33,7 @@ import org.apache.atlas.repository.graphdb.AtlasEdge;
import org.apache.atlas.repository.graphdb.AtlasEdgeDirection;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.type.AtlasArrayType;
import org.apache.atlas.type.AtlasClassificationType;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasMapType;
import org.apache.atlas.type.AtlasStructType;
......@@ -49,6 +50,7 @@ import java.util.*;
import static org.apache.atlas.model.instance.AtlasClassification.PropagationState.ACTIVE;
import static org.apache.atlas.model.instance.AtlasEntity.Status.DELETED;
import static org.apache.atlas.repository.Constants.CLASSIFICATION_EDGE_NAME_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.CLASSIFICATION_LABEL;
import static org.apache.atlas.repository.Constants.PROPAGATED_TRAIT_NAMES_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.TRAIT_NAMES_PROPERTY_KEY;
......@@ -56,6 +58,9 @@ import static org.apache.atlas.repository.graph.GraphHelper.EDGE_LABEL_PREFIX;
import static org.apache.atlas.repository.graph.GraphHelper.addToPropagatedTraitNames;
import static org.apache.atlas.repository.graph.GraphHelper.getClassificationEdgeState;
import static org.apache.atlas.repository.graph.GraphHelper.getClassificationEdges;
import static org.apache.atlas.repository.graph.GraphHelper.getClassificationEntityGuid;
import static org.apache.atlas.repository.graph.GraphHelper.getClassificationName;
import static org.apache.atlas.repository.graph.GraphHelper.getPropagatedClassificationEdge;
import static org.apache.atlas.repository.graph.GraphHelper.getPropagatedEdges;
import static org.apache.atlas.repository.graph.GraphHelper.getTraitNames;
import static org.apache.atlas.repository.graph.GraphHelper.getTypeName;
......@@ -353,36 +358,94 @@ public abstract class DeleteHandlerV1 {
}
}
public List<AtlasVertex> addTagPropagation(AtlasVertex classificationVertex, List<AtlasVertex> propagatedEntityVertices) {
List<AtlasVertex> ret = null;
if (CollectionUtils.isNotEmpty(propagatedEntityVertices) && classificationVertex != null) {
String classificationName = getTypeName(classificationVertex);
AtlasClassificationType classificationType = typeRegistry.getClassificationTypeByName(classificationName);
for (AtlasVertex propagatedEntityVertex : propagatedEntityVertices) {
AtlasEdge existingEdge = getPropagatedClassificationEdge(propagatedEntityVertex, classificationVertex);
if (existingEdge != null) {
continue;
}
String entityTypeName = getTypeName(propagatedEntityVertex);
AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entityTypeName);
if (classificationType.canApplyToEntityType(entityType)) {
if (LOG.isDebugEnabled()) {
LOG.debug(" --> Adding propagated classification: [{}] to {} ({}) using edge label: [{}]", classificationName, getTypeName(propagatedEntityVertex),
GraphHelper.getGuid(propagatedEntityVertex), CLASSIFICATION_LABEL);
}
if (ret == null) {
ret = new ArrayList<>();
}
ret.add(propagatedEntityVertex);
graphHelper.addClassificationEdge(propagatedEntityVertex, classificationVertex, true);
addToPropagatedTraitNames(propagatedEntityVertex, classificationName);
}
}
}
return ret;
}
public List<AtlasVertex> removeTagPropagation(AtlasVertex classificationVertex) throws AtlasBaseException {
List<AtlasVertex> ret = new ArrayList<>();
if (classificationVertex != null) {
String classificationName = getTypeName(classificationVertex);
List<AtlasEdge> propagatedEdges = getPropagatedEdges(classificationVertex);
List<AtlasEdge> propagatedEdges = getPropagatedEdges(classificationVertex);
if (CollectionUtils.isNotEmpty(propagatedEdges)) {
for (AtlasEdge propagatedEdge : propagatedEdges) {
AtlasVertex propagatedEntityVertex = propagatedEdge.getOutVertex();
deletePropagatedEdge(propagatedEdge);
if (LOG.isDebugEnabled()) {
LOG.debug("Removing propagated classification: [{}] from: [{}][{}] with edge label: [{}]", classificationName,
getTypeName(propagatedEntityVertex), GraphHelper.getGuid(propagatedEntityVertex), CLASSIFICATION_LABEL);
}
ret.add(propagatedEdge.getOutVertex());
}
}
}
if (getClassificationEdgeState(propagatedEdge) == ACTIVE) {
removeFromPropagatedTraitNames(propagatedEntityVertex, classificationName);
}
return ret;
}
deleteEdge(propagatedEdge, true);
public void removeTagPropagation(AtlasVertex classificationVertex, List<AtlasVertex> entityVertices) throws AtlasBaseException {
if (classificationVertex != null && CollectionUtils.isNotEmpty(entityVertices)) {
String classificationName = getClassificationName(classificationVertex);
String entityGuid = getClassificationEntityGuid(classificationVertex);
updateModificationMetadata(propagatedEntityVertex);
for (AtlasVertex entityVertex : entityVertices) {
AtlasEdge propagatedEdge = getPropagatedClassificationEdge(entityVertex, classificationName, entityGuid);
ret.add(propagatedEntityVertex);
if (propagatedEdge != null) {
deletePropagatedEdge(propagatedEdge);
}
}
}
}
return ret;
public void deletePropagatedEdge(AtlasEdge edge) throws AtlasBaseException {
String classificationName = AtlasGraphUtilsV1.getProperty(edge, CLASSIFICATION_EDGE_NAME_PROPERTY_KEY, String.class);
AtlasVertex entityVertex = edge.getOutVertex();
if (LOG.isDebugEnabled()) {
LOG.debug("Removing propagated classification: [{}] from: [{}][{}] with edge label: [{}]", classificationName,
getTypeName(entityVertex), GraphHelper.getGuid(entityVertex), CLASSIFICATION_LABEL);
}
if (getClassificationEdgeState(edge) == ACTIVE) {
removeFromPropagatedTraitNames(entityVertex, classificationName);
}
deleteEdge(edge, true);
updateModificationMetadata(entityVertex);
}
private void removeFromPropagatedTraitNames(AtlasVertex entityVertex, String classificationName) {
......
......@@ -1362,7 +1362,7 @@ public class EntityGraphMapper {
LOG.debug("Propagating tag: [{}][{}] to {}", classificationName, entityTypeName, getTypeNames(entitiesToPropagateTo));
}
List<AtlasVertex> entitiesPropagatedTo = addTagPropagation(classificationVertex, entitiesToPropagateTo);
List<AtlasVertex> entitiesPropagatedTo = deleteHandler.addTagPropagation(classificationVertex, entitiesToPropagateTo);
if (entitiesPropagatedTo != null) {
for (AtlasVertex entityPropagatedTo : entitiesPropagatedTo) {
......@@ -1427,7 +1427,7 @@ public class EntityGraphMapper {
// remove classification from propagated entities if propagation is turned on
if (isPropagationEnabled(classificationVertex)) {
List<AtlasVertex> impactedVertices = removeTagPropagation(classificationVertex);
List<AtlasVertex> impactedVertices = deleteHandler.removeTagPropagation(classificationVertex);
if (CollectionUtils.isNotEmpty(impactedVertices)) {
for (AtlasVertex impactedVertex : impactedVertices) {
......@@ -1567,7 +1567,7 @@ public class EntityGraphMapper {
}
}
List<AtlasVertex> entitiesPropagatedTo = addTagPropagation(classificationVertex, entitiesToPropagateTo);
List<AtlasVertex> entitiesPropagatedTo = deleteHandler.addTagPropagation(classificationVertex, entitiesToPropagateTo);
if (entitiesPropagatedTo != null) {
for (AtlasVertex entityPropagatedTo : entitiesPropagatedTo) {
......@@ -1576,7 +1576,7 @@ public class EntityGraphMapper {
}
}
} else {
List<AtlasVertex> impactedVertices = removeTagPropagation(classificationVertex);
List<AtlasVertex> impactedVertices = deleteHandler.removeTagPropagation(classificationVertex);
if (CollectionUtils.isNotEmpty(impactedVertices)) {
if (removedPropagations == null) {
......@@ -1670,49 +1670,6 @@ public class EntityGraphMapper {
}
}
private List<AtlasVertex> addTagPropagation(AtlasVertex classificationVertex, List<AtlasVertex> propagatedEntityVertices) {
List<AtlasVertex> ret = null;
if (CollectionUtils.isNotEmpty(propagatedEntityVertices) && classificationVertex != null) {
String classificationName = getTypeName(classificationVertex);
AtlasClassificationType classificationType = typeRegistry.getClassificationTypeByName(classificationName);
for (AtlasVertex propagatedEntityVertex : propagatedEntityVertices) {
AtlasEdge existingEdge = getPropagatedClassificationEdge(propagatedEntityVertex, classificationVertex);
if (existingEdge != null) {
continue;
}
String entityTypeName = getTypeName(propagatedEntityVertex);
AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entityTypeName);
if (classificationType.canApplyToEntityType(entityType)) {
if (LOG.isDebugEnabled()) {
LOG.debug(" --> Adding propagated classification: [{}] to {} ({}) using edge label: [{}]", classificationName, getTypeName(propagatedEntityVertex),
GraphHelper.getGuid(propagatedEntityVertex), CLASSIFICATION_LABEL);
}
if (ret == null) {
ret = new ArrayList<>();
}
ret.add(propagatedEntityVertex);
graphHelper.addClassificationEdge(propagatedEntityVertex, classificationVertex, true);
addToPropagatedTraitNames(propagatedEntityVertex, classificationName);
}
}
}
return ret;
}
private List<AtlasVertex> removeTagPropagation(AtlasVertex classificationVertex) throws AtlasBaseException {
return deleteHandler.removeTagPropagation(classificationVertex);
}
private AtlasEdge mapClassification(EntityOperation operation, final EntityMutationContext context, AtlasClassification classification,
AtlasEntityType entityType, AtlasVertex parentInstanceVertex, AtlasVertex traitInstanceVertex)
throws AtlasBaseException {
......
......@@ -81,7 +81,27 @@ import static org.apache.atlas.repository.Constants.CLASSIFICATION_ENTITY_GUID;
import static org.apache.atlas.repository.Constants.CLASSIFICATION_LABEL;
import static org.apache.atlas.repository.Constants.CLASSIFICATION_VALIDITY_PERIODS_KEY;
import static org.apache.atlas.repository.Constants.TERM_ASSIGNMENT_LABEL;
import static org.apache.atlas.repository.graph.GraphHelper.*;
import static org.apache.atlas.repository.graph.GraphHelper.EDGE_LABEL_PREFIX;
import static org.apache.atlas.repository.graph.GraphHelper.addToPropagatedTraitNames;
import static org.apache.atlas.repository.graph.GraphHelper.getAdjacentEdgesByLabel;
import static org.apache.atlas.repository.graph.GraphHelper.getAllClassificationEdges;
import static org.apache.atlas.repository.graph.GraphHelper.getAllTraitNames;
import static org.apache.atlas.repository.graph.GraphHelper.getAssociatedEntityVertex;
import static org.apache.atlas.repository.graph.GraphHelper.getBlockedClassificationIds;
import static org.apache.atlas.repository.graph.GraphHelper.getClassificationEdge;
import static org.apache.atlas.repository.graph.GraphHelper.getClassificationEdgeState;
import static org.apache.atlas.repository.graph.GraphHelper.getClassificationVertices;
import static org.apache.atlas.repository.graph.GraphHelper.getGuid;
import static org.apache.atlas.repository.graph.GraphHelper.getIncomingEdgesByLabel;
import static org.apache.atlas.repository.graph.GraphHelper.getOutGoingEdgesByLabel;
import static org.apache.atlas.repository.graph.GraphHelper.getPropagateTags;
import static org.apache.atlas.repository.graph.GraphHelper.getPropagatedClassificationEdge;
import static org.apache.atlas.repository.graph.GraphHelper.getPropagationEnabledClassificationVertices;
import static org.apache.atlas.repository.graph.GraphHelper.getRelationshipGuid;
import static org.apache.atlas.repository.graph.GraphHelper.getTypeName;
import static org.apache.atlas.repository.graph.GraphHelper.isPropagatedClassificationEdge;
import static org.apache.atlas.repository.graph.GraphHelper.isPropagationEnabled;
import static org.apache.atlas.repository.graph.GraphHelper.removeFromPropagatedTraitNames;
import static org.apache.atlas.repository.store.graph.v1.AtlasGraphUtilsV1.getIdFromVertex;
import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection;
import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.BOTH;
......@@ -455,29 +475,6 @@ public final class EntityGraphRetriever {
}
}
public List<AtlasVertex> getPropagationEnabledClassificationVertices(AtlasVertex entityVertex) {
List<AtlasVertex> ret = new ArrayList<>();
Iterable edges = entityVertex.query().direction(AtlasEdgeDirection.OUT).label(CLASSIFICATION_LABEL).edges();
if (edges != null) {
Iterator<AtlasEdge> iterator = edges.iterator();
while (iterator.hasNext()) {
AtlasEdge edge = iterator.next();
if (edge != null) {
AtlasVertex classificationVertex = edge.getInVertex();
if (isPropagationEnabled(classificationVertex)) {
ret.add(classificationVertex);
}
}
}
}
return ret;
}
public List<AtlasClassification> getAllClassifications(AtlasVertex entityVertex) throws AtlasBaseException {
List<AtlasClassification> ret = new ArrayList<>();
Iterable edges = entityVertex.query().direction(AtlasEdgeDirection.OUT).label(CLASSIFICATION_LABEL).edges();
......@@ -1033,24 +1030,6 @@ public final class EntityGraphRetriever {
relationship.setBlockedPropagatedClassifications(blockedClassifications);
}
public List<AtlasVertex> getClassificationVertices(AtlasEdge edge) {
List<AtlasVertex> ret = new ArrayList<>();
if (edge != null) {
PropagateTags propagateTags = getPropagateTags(edge);
if (propagateTags == PropagateTags.ONE_TO_TWO || propagateTags == PropagateTags.BOTH) {
ret.addAll(getPropagationEnabledClassificationVertices(edge.getOutVertex()));
}
if (propagateTags == PropagateTags.TWO_TO_ONE || propagateTags == PropagateTags.BOTH) {
ret.addAll(getPropagationEnabledClassificationVertices(edge.getInVertex()));
}
}
return ret;
}
private void mapAttributes(AtlasEdge edge, AtlasRelationship relationship) throws AtlasBaseException {
AtlasType objType = typeRegistry.getType(relationship.getTypeName());
......
......@@ -66,6 +66,12 @@ public class AtlasGremlin3QueryProvider extends AtlasGremlin2QueryProvider {
"inE().has('__state', 'ACTIVE').has('tagPropagation', within('TWO_TO_ONE', 'BOTH')).outV())" +
".dedup().where(without('src')).simplePath()).emit().toList();";
case TAG_PROPAGATION_IMPACTED_INSTANCES_WITH_RESTRICTIONS:
return "g.V().has('__guid', guid).aggregate('src')" +
".repeat(union(outE().has('__state', 'ACTIVE').has('tagPropagation', within('ONE_TO_TWO', 'BOTH')).not(has('blockedPropagatedClassifications', org.janusgraph.core.attribute.Text.textContains(classificationId))).inV(), " +
"inE().has('__state', 'ACTIVE').has('tagPropagation', within('TWO_TO_ONE', 'BOTH')).not(has('blockedPropagatedClassifications', org.janusgraph.core.attribute.Text.textContains(classificationId))).outV())" +
".dedup().where(without('src')).simplePath()).emit().toList();";
case TAG_PROPAGATION_IMPACTED_INSTANCES_FOR_REMOVAL:
return "g.V().has('__guid', guid).aggregate('src')" +
".repeat(union(outE().has('__state', 'ACTIVE').has('tagPropagation', within('ONE_TO_TWO', 'BOTH')).has('_r__guid', neq(relationshipGuid)).inV(), " +
......
......@@ -84,6 +84,7 @@ public abstract class AtlasGremlinQueryProvider {
COMPARE_NOT_NULL,
TAG_PROPAGATION_IMPACTED_INSTANCES,
TAG_PROPAGATION_IMPACTED_INSTANCES_FOR_REMOVAL
TAG_PROPAGATION_IMPACTED_INSTANCES_FOR_REMOVAL,
TAG_PROPAGATION_IMPACTED_INSTANCES_WITH_RESTRICTIONS
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment