Commit 7423addb by Sarath Subramanian

ATLAS-3563: Improve tag propagation performance using in-memory traversal

parent cf455a52
......@@ -37,6 +37,16 @@ public interface AtlasVertex<V, E> extends AtlasElement {
/**
* Gets the edges associated with this vertex going the
* specified direction that have the specified edgeLabels.
*
* @param direction
* @param edgeLabels
* @return
*/
Iterable<AtlasEdge<V, E>> getEdges(AtlasEdgeDirection direction, String[] edgeLabels);
/**
* Gets the edges associated with this vertex going the
* specified direction.
*
* @param in
......
......@@ -70,6 +70,14 @@ public class AtlasJanusVertex extends AtlasJanusElement<Vertex> implements Atlas
return graph.wrapEdges(edges);
}
@Override
public Iterable<AtlasEdge<AtlasJanusVertex, AtlasJanusEdge>> getEdges(AtlasEdgeDirection dir, String[] edgeLabels) {
Direction direction = AtlasJanusObjectFactory.createDirection(dir);
Iterator<Edge> edges = getWrappedElement().edges(direction, edgeLabels);
return graph.wrapEdges(edges);
}
private JanusGraphVertex getAsJanusVertex() {
return (JanusGraphVertex)getWrappedElement();
}
......
......@@ -160,6 +160,7 @@ public enum AtlasErrorCode {
INVALID_CUSTOM_ATTRIBUTE_VALUE(400, "ATLAS-400-00-9A", "Invalid value: {0} in custom attribute, value length is greater than {1}"),
INVALID_LABEL_LENGTH(400, "ATLAS-400-00-9B", "Invalid label: {0}, label size should not be greater than {1}"),
INVALID_LABEL_CHARACTERS(400, "ATLAS-400-00-9C", "Invalid label: {0}, label should contain alphanumeric characters, '_' or '-'"),
INVALID_PROPAGATION_TYPE(400, "ATLAS-400-00-9D", "Invalid propagation {0} for relationship-type={1}. Default value is {2}"),
UNAUTHORIZED_ACCESS(403, "ATLAS-403-00-001", "{0} is not authorized to perform {1}"),
......
......@@ -19,7 +19,6 @@ package org.apache.atlas.type;
import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_DATE;
import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_INT;
import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_LONG;
import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_STRING;
import static org.apache.atlas.type.Constants.*;
......@@ -29,6 +28,7 @@ import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.typedef.AtlasEntityDef;
import org.apache.atlas.model.typedef.AtlasEntityDef.AtlasRelationshipAttributeDef;
import org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags;
import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
import org.apache.atlas.type.AtlasBuiltInTypes.AtlasObjectIdType;
import org.apache.atlas.utils.AtlasEntityUtil;
......@@ -87,6 +87,7 @@ public class AtlasEntityType extends AtlasStructType {
private List<AtlasAttribute> dynAttributes = Collections.emptyList();
private List<AtlasAttribute> dynEvalTriggerAttributes = Collections.emptyList();
private Map<String,List<TemplateToken>> parsedTemplates = Collections.emptyMap();
private Set<String> tagPropagationEdges = Collections.emptySet();
public AtlasEntityType(AtlasEntityDef entityDef) {
super(entityDef);
......@@ -138,6 +139,7 @@ public class AtlasEntityType extends AtlasStructType {
this.allSubTypes = new HashSet<>(); // this will be populated in resolveReferencesPhase2()
this.typeAndAllSubTypes = new HashSet<>(); // this will be populated in resolveReferencesPhase2()
this.relationshipAttributes = new HashMap<>(); // this will be populated in resolveReferencesPhase3()
this.tagPropagationEdges = new HashSet<>(); // this will be populated in resolveReferencesPhase2()
this.typeAndAllSubTypes.add(this.getTypeName());
......@@ -230,6 +232,8 @@ public class AtlasEntityType extends AtlasStructType {
}
}
}
tagPropagationEdges.addAll(superType.tagPropagationEdges);
}
ownedRefAttributes = new ArrayList<>();
......@@ -254,6 +258,7 @@ public class AtlasEntityType extends AtlasStructType {
typeAndAllSubTypesQryStr = ""; // will be computed on next access
relationshipAttributes = Collections.unmodifiableMap(relationshipAttributes);
ownedRefAttributes = Collections.unmodifiableList(ownedRefAttributes);
tagPropagationEdges = Collections.unmodifiableSet(tagPropagationEdges);
entityDef.setSubTypes(subTypes);
......@@ -275,6 +280,8 @@ public class AtlasEntityType extends AtlasStructType {
this.parsedTemplates = parseDynAttributeTemplates();
populateDynFlagsInfo();
LOG.info("resolveReferencesPhase3({}): tagPropagationEdges={}", getTypeName(), tagPropagationEdges);
}
public Set<String> getSuperTypes() {
......@@ -347,6 +354,14 @@ public class AtlasEntityType extends AtlasStructType {
@VisibleForTesting
public void setDynEvalTriggerAttributes(List<AtlasAttribute> dynEvalTriggerAttributes) { this.dynEvalTriggerAttributes = dynEvalTriggerAttributes; }
public Set<String> getTagPropagationEdges() {
return this.tagPropagationEdges;
}
public String[] getTagPropagationEdgesArray() {
return CollectionUtils.isNotEmpty(tagPropagationEdges) ? tagPropagationEdges.toArray(new String[tagPropagationEdges.size()]) : null;
}
public Map<String,List<TemplateToken>> getParsedTemplates() { return parsedTemplates; }
public AtlasAttribute getRelationshipAttribute(String attributeName, String relationshipType) {
......@@ -377,6 +392,38 @@ public class AtlasEntityType extends AtlasStructType {
}
attributes.put(relationshipType.getTypeName(), attribute);
// determine if tags from this entity-type propagate via this relationship
PropagateTags propagation = relationshipType.getRelationshipDef().getPropagateTags();
if (propagation == null) {
propagation = PropagateTags.NONE;
}
final boolean propagatesTags;
switch (propagation) {
case BOTH:
propagatesTags = true;
break;
case ONE_TO_TWO:
propagatesTags = StringUtils.equals(relationshipType.getEnd1Type().getTypeName(), getTypeName());
break;
case TWO_TO_ONE:
propagatesTags = StringUtils.equals(relationshipType.getEnd2Type().getTypeName(), getTypeName());
break;
case NONE:
default:
propagatesTags = false;
break;
}
if (propagatesTags) {
tagPropagationEdges.add(relationshipType.getRelationshipLabel());
}
}
public Set<String> getAttributeRelationshipTypes(String attributeName) {
......
......@@ -60,9 +60,6 @@ import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.script.Bindings;
import javax.script.ScriptEngine;
import javax.script.ScriptException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
......@@ -84,10 +81,6 @@ import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.isRef
import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.BOTH;
import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.IN;
import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.OUT;
import static org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery.TAG_PROPAGATION_IMPACTED_INSTANCES;
import static org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery.TAG_PROPAGATION_IMPACTED_INSTANCES_EXCLUDE_RELATIONSHIP;
import static org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery.TAG_PROPAGATION_IMPACTED_INSTANCES_FOR_REMOVAL;
import static org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery.TAG_PROPAGATION_IMPACTED_INSTANCES_WITH_RESTRICTIONS;
/**
* Utility class for graph operations.
......@@ -728,130 +721,6 @@ public final class GraphHelper {
return result;
}
public List<AtlasVertex> getIncludedImpactedVerticesWithReferences(AtlasVertex entityVertex, String relationshipGuid) throws AtlasBaseException {
List<AtlasVertex> ret = new ArrayList<>();
List<AtlasVertex> impactedVertices = getImpactedVerticesWithReferences(getGuid(entityVertex), relationshipGuid);
ret.add(entityVertex);
if (CollectionUtils.isNotEmpty(impactedVertices)) {
ret.addAll(impactedVertices);
}
return ret;
}
public List<AtlasVertex> getImpactedVertices(String guid) throws AtlasBaseException {
ScriptEngine scriptEngine = graph.getGremlinScriptEngine();
Bindings bindings = scriptEngine.createBindings();
String query = queryProvider.getQuery(TAG_PROPAGATION_IMPACTED_INSTANCES);
List<AtlasVertex> ret = new ArrayList<>();
bindings.put("g", graph);
bindings.put("guid", guid);
try {
Object resultObj = graph.executeGremlinScript(scriptEngine, bindings, query, false);
if (resultObj instanceof List && CollectionUtils.isNotEmpty((List) resultObj)) {
List<?> results = (List) resultObj;
Object firstElement = results.get(0);
if (firstElement instanceof AtlasVertex) {
ret = (List<AtlasVertex>) results;
}
}
} catch (ScriptException e) {
throw new AtlasBaseException(AtlasErrorCode.GREMLIN_SCRIPT_EXECUTION_FAILED, e);
}
return ret;
}
public List<AtlasVertex> getPropagatedEntityVertices(AtlasVertex classificationVertex) throws AtlasBaseException {
List<AtlasVertex> ret = new ArrayList<>();
if (classificationVertex != null) {
String entityGuid = getClassificationEntityGuid(classificationVertex);
String classificationId = classificationVertex.getIdForDisplay();
List<AtlasVertex> impactedEntityVertices = getAllPropagatedEntityVertices(classificationVertex);
List<AtlasVertex> impactedEntityVerticesWithRestrictions = getImpactedVerticesWithRestrictions(entityGuid, classificationId);
if (impactedEntityVertices.size() > impactedEntityVerticesWithRestrictions.size()) {
ret = (List<AtlasVertex>) CollectionUtils.subtract(impactedEntityVertices, impactedEntityVerticesWithRestrictions);
} else {
ret = (List<AtlasVertex>) CollectionUtils.subtract(impactedEntityVerticesWithRestrictions, impactedEntityVertices);
}
}
return ret;
}
public List<AtlasVertex> getImpactedVerticesWithRestrictions(String guid, String classificationId) throws AtlasBaseException {
return getImpactedVerticesWithRestrictions(guid, classificationId, null);
}
public List<AtlasVertex> getImpactedVerticesWithRestrictions(String guid, String classificationId, String guidRelationshipToExclude) throws AtlasBaseException {
ScriptEngine scriptEngine = graph.getGremlinScriptEngine();
Bindings bindings = scriptEngine.createBindings();
List<AtlasVertex> ret = new ArrayList<>();
String query = queryProvider.getQuery(TAG_PROPAGATION_IMPACTED_INSTANCES_WITH_RESTRICTIONS);
bindings.put("g", graph);
bindings.put("guid", guid);
bindings.put("classificationId", classificationId);
if (guidRelationshipToExclude != null) {
query = queryProvider.getQuery(TAG_PROPAGATION_IMPACTED_INSTANCES_EXCLUDE_RELATIONSHIP);
bindings.put("guidRelationshipToExclude", guidRelationshipToExclude);
}
try {
Object resultObj = graph.executeGremlinScript(scriptEngine, bindings, query, false);
if (resultObj instanceof List && CollectionUtils.isNotEmpty((List) resultObj)) {
List<?> results = (List) resultObj;
Object firstElement = results.get(0);
if (firstElement instanceof AtlasVertex) {
ret = (List<AtlasVertex>) results;
}
}
} catch (ScriptException e) {
throw new AtlasBaseException(AtlasErrorCode.GREMLIN_SCRIPT_EXECUTION_FAILED, e);
}
return ret;
}
public List<AtlasVertex> getImpactedVerticesWithReferences(String guid, String relationshipGuid) throws AtlasBaseException {
ScriptEngine scriptEngine = graph.getGremlinScriptEngine();
Bindings bindings = scriptEngine.createBindings();
String query = queryProvider.getQuery(TAG_PROPAGATION_IMPACTED_INSTANCES_FOR_REMOVAL);
List<AtlasVertex> ret = new ArrayList<>();
bindings.put("g", graph);
bindings.put("guid", guid);
bindings.put("relationshipGuid", relationshipGuid);
try {
Object resultObj = graph.executeGremlinScript(scriptEngine, bindings, query, false);
if (resultObj instanceof List && CollectionUtils.isNotEmpty((List) resultObj)) {
List<?> results = (List) resultObj;
Object firstElement = results.get(0);
if (firstElement instanceof AtlasVertex) {
ret = (List<AtlasVertex>) results;
}
}
} catch (ScriptException e) {
throw new AtlasBaseException(AtlasErrorCode.GREMLIN_SCRIPT_EXECUTION_FAILED, e);
}
return ret;
}
/**
* Finds the Vertices that correspond to the given GUIDs. GUIDs
* that are not found in the graph will not be in the map.
......@@ -915,7 +784,7 @@ public final class GraphHelper {
return ret;
}
public static List<AtlasVertex> getClassificationVertices(AtlasEdge edge) {
public static List<AtlasVertex> getPropagatableClassifications(AtlasEdge edge) {
List<AtlasVertex> ret = new ArrayList<>();
if (edge != null && getStatus(edge) != DELETED) {
......@@ -935,26 +804,6 @@ public final class GraphHelper {
return ret;
}
public Map<AtlasVertex, List<AtlasVertex>> getClassificationPropagatedEntitiesMapping(List<AtlasVertex> classificationVertices) throws AtlasBaseException {
return getClassificationPropagatedEntitiesMapping(classificationVertices, null);
}
public Map<AtlasVertex, List<AtlasVertex>> getClassificationPropagatedEntitiesMapping(List<AtlasVertex> classificationVertices, String guidRelationshipToExclude) throws AtlasBaseException {
Map<AtlasVertex, List<AtlasVertex>> ret = new HashMap<>();
if (CollectionUtils.isNotEmpty(classificationVertices)) {
for (AtlasVertex classificationVertex : classificationVertices) {
String classificationId = classificationVertex.getIdForDisplay();
String sourceEntityId = getClassificationEntityGuid(classificationVertex);
List<AtlasVertex> entitiesPropagatingTo = getImpactedVerticesWithRestrictions(sourceEntityId, classificationId, guidRelationshipToExclude);
ret.put(classificationVertex, entitiesPropagatingTo);
}
}
return ret;
}
public static List<AtlasVertex> getPropagationEnabledClassificationVertices(AtlasVertex entityVertex) {
List<AtlasVertex> ret = new ArrayList<>();
Iterable edges = entityVertex.query().direction(AtlasEdgeDirection.OUT).label(CLASSIFICATION_LABEL).edges();
......
......@@ -402,7 +402,7 @@ public abstract class DeleteHandlerV1 {
private void addTagPropagation(AtlasVertex fromVertex, AtlasVertex toVertex, AtlasEdge edge) throws AtlasBaseException {
final List<AtlasVertex> classificationVertices = getPropagationEnabledClassificationVertices(fromVertex);
final List<AtlasVertex> propagatedEntityVertices = CollectionUtils.isNotEmpty(classificationVertices) ? graphHelper.getIncludedImpactedVerticesWithReferences(toVertex, getRelationshipGuid(edge)) : null;
final List<AtlasVertex> propagatedEntityVertices = CollectionUtils.isNotEmpty(classificationVertices) ? entityRetriever.getIncludedImpactedVerticesV2(toVertex, getRelationshipGuid(edge)) : null;
if (CollectionUtils.isNotEmpty(propagatedEntityVertices)) {
if (LOG.isDebugEnabled()) {
......@@ -490,9 +490,9 @@ public abstract class DeleteHandlerV1 {
return;
}
List<AtlasVertex> currentClassificationVertices = getClassificationVertices(edge);
Map<AtlasVertex, List<AtlasVertex>> currentClassificationsMap = graphHelper.getClassificationPropagatedEntitiesMapping(currentClassificationVertices);
Map<AtlasVertex, List<AtlasVertex>> updatedClassificationsMap = graphHelper.getClassificationPropagatedEntitiesMapping(currentClassificationVertices, getRelationshipGuid(edge));
List<AtlasVertex> currentClassificationVertices = getPropagatableClassifications(edge);
Map<AtlasVertex, List<AtlasVertex>> currentClassificationsMap = entityRetriever.getClassificationPropagatedEntitiesMapping(currentClassificationVertices);
Map<AtlasVertex, List<AtlasVertex>> updatedClassificationsMap = entityRetriever.getClassificationPropagatedEntitiesMapping(currentClassificationVertices, getRelationshipGuid(edge));
Map<AtlasVertex, List<AtlasVertex>> removePropagationsMap = new HashMap<>();
if (MapUtils.isNotEmpty(currentClassificationsMap) && MapUtils.isEmpty(updatedClassificationsMap)) {
......@@ -598,7 +598,7 @@ public abstract class DeleteHandlerV1 {
private void removeTagPropagation(AtlasVertex fromVertex, AtlasVertex toVertex, AtlasEdge edge) throws AtlasBaseException {
final List<AtlasVertex> classificationVertices = getPropagationEnabledClassificationVertices(fromVertex);
final List<AtlasVertex> impactedEntityVertices = CollectionUtils.isNotEmpty(classificationVertices) ? graphHelper.getIncludedImpactedVerticesWithReferences(toVertex, getRelationshipGuid(edge)) : null;
final List<AtlasVertex> impactedEntityVertices = CollectionUtils.isNotEmpty(classificationVertices) ? entityRetriever.getIncludedImpactedVerticesV2(toVertex, getRelationshipGuid(edge)) : null;
if (CollectionUtils.isNotEmpty(impactedEntityVertices)) {
if (LOG.isDebugEnabled()) {
......@@ -608,7 +608,7 @@ public abstract class DeleteHandlerV1 {
for (AtlasVertex classificationVertex : classificationVertices) {
String classificationName = getTypeName(classificationVertex);
AtlasVertex associatedEntityVertex = getAssociatedEntityVertex(classificationVertex);
List<AtlasVertex> referrals = graphHelper.getIncludedImpactedVerticesWithReferences(associatedEntityVertex, getRelationshipGuid(edge));
List<AtlasVertex> referrals = entityRetriever.getIncludedImpactedVerticesV2(associatedEntityVertex, getRelationshipGuid(edge));
for (AtlasVertex impactedEntityVertex : impactedEntityVertices) {
if (referrals.contains(impactedEntityVertex)) {
......
......@@ -1757,7 +1757,7 @@ public class EntityGraphMapper {
if (propagateTags) {
// compute propagatedEntityVertices only once
if (entitiesToPropagateTo == null) {
entitiesToPropagateTo = graphHelper.getImpactedVertices(guid);
entitiesToPropagateTo = entityRetriever.getImpactedVerticesV2(entityVertex);
}
if (CollectionUtils.isNotEmpty(entitiesToPropagateTo)) {
......@@ -2092,7 +2092,7 @@ public class EntityGraphMapper {
if (updatedTagPropagation != null && currentTagPropagation != updatedTagPropagation) {
if (updatedTagPropagation) {
if (CollectionUtils.isEmpty(entitiesToPropagateTo)) {
entitiesToPropagateTo = graphHelper.getImpactedVerticesWithRestrictions(guid, classificationVertex.getIdForDisplay());
entitiesToPropagateTo = entityRetriever.getImpactedVerticesV2(entityVertex, null, classificationVertex.getIdForDisplay());
}
if (CollectionUtils.isNotEmpty(entitiesToPropagateTo)) {
......
......@@ -35,6 +35,7 @@ import org.apache.atlas.model.instance.AtlasRelationship;
import org.apache.atlas.model.instance.AtlasRelationship.AtlasRelationshipWithExtInfo;
import org.apache.atlas.model.instance.AtlasStruct;
import org.apache.atlas.model.typedef.AtlasRelationshipDef;
import org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags;
import org.apache.atlas.model.typedef.AtlasRelationshipEndDef;
import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
import org.apache.atlas.repository.Constants;
......@@ -66,6 +67,7 @@ import javax.inject.Inject;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
......@@ -97,6 +99,9 @@ import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_INT;
import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_LONG;
import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_SHORT;
import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_STRING;
import static org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags.NONE;
import static org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags.ONE_TO_TWO;
import static org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags.TWO_TO_ONE;
import static org.apache.atlas.repository.Constants.CLASSIFICATION_ENTITY_GUID;
import static org.apache.atlas.repository.Constants.CLASSIFICATION_LABEL;
import static org.apache.atlas.repository.Constants.CLASSIFICATION_VALIDITY_PERIODS_KEY;
......@@ -397,6 +402,137 @@ public class EntityGraphRetriever {
return ret;
}
public void evaluateClassificationPropagation(AtlasVertex classificationVertex, List<AtlasVertex> entitiesToAddPropagation, List<AtlasVertex> entitiesToRemovePropagation) {
if (classificationVertex != null) {
String entityGuid = getClassificationEntityGuid(classificationVertex);
AtlasVertex entityVertex = AtlasGraphUtilsV2.findByGuid(entityGuid);
String classificationId = classificationVertex.getIdForDisplay();
List<AtlasVertex> propagatedEntities = getAllPropagatedEntityVertices(classificationVertex);
List<AtlasVertex> impactedEntities = getImpactedVerticesV2(entityVertex, null, classificationId);
List<AtlasVertex> entityVertices = (List<AtlasVertex>) CollectionUtils.subtract(propagatedEntities, impactedEntities);
if (CollectionUtils.isNotEmpty(entityVertices)) {
entitiesToRemovePropagation.addAll(entityVertices);
}
entityVertices = (List<AtlasVertex>) CollectionUtils.subtract(impactedEntities, propagatedEntities);
if (CollectionUtils.isNotEmpty(entityVertices)) {
entitiesToAddPropagation.addAll(entityVertices);
}
}
}
public Map<AtlasVertex, List<AtlasVertex>> getClassificationPropagatedEntitiesMapping(List<AtlasVertex> classificationVertices) {
return getClassificationPropagatedEntitiesMapping(classificationVertices, null);
}
public Map<AtlasVertex, List<AtlasVertex>> getClassificationPropagatedEntitiesMapping(List<AtlasVertex> classificationVertices, String relationshipGuidToExclude) {
Map<AtlasVertex, List<AtlasVertex>> ret = new HashMap<>();
if (CollectionUtils.isNotEmpty(classificationVertices)) {
for (AtlasVertex classificationVertex : classificationVertices) {
String classificationId = classificationVertex.getIdForDisplay();
String sourceEntityId = getClassificationEntityGuid(classificationVertex);
AtlasVertex sourceEntityVertex = AtlasGraphUtilsV2.findByGuid(sourceEntityId);
List<AtlasVertex> entitiesPropagatingTo = getImpactedVerticesV2(sourceEntityVertex, relationshipGuidToExclude, classificationId);
ret.put(classificationVertex, entitiesPropagatingTo);
}
}
return ret;
}
public List<AtlasVertex> getImpactedVerticesV2(AtlasVertex entityVertex) {
return getImpactedVerticesV2(entityVertex, null);
}
public List<AtlasVertex> getImpactedVerticesV2(AtlasVertex entityVertex, String relationshipGuidToExclude) {
List<AtlasVertex> ret = new ArrayList<>();
traverseImpactedVertices(entityVertex, relationshipGuidToExclude, null, new HashSet<>(), ret);
return ret;
}
public List<AtlasVertex> getIncludedImpactedVerticesV2(AtlasVertex entityVertex, String relationshipGuidToExclude) {
List<AtlasVertex> ret = new ArrayList<>(Arrays.asList(entityVertex));
traverseImpactedVertices(entityVertex, relationshipGuidToExclude, null, new HashSet<>(), ret);
return ret;
}
public List<AtlasVertex> getImpactedVerticesV2(AtlasVertex entityVertex, String relationshipGuidToExclude, String classificationId) {
List<AtlasVertex> ret = new ArrayList<>();
traverseImpactedVertices(entityVertex, relationshipGuidToExclude, classificationId, new HashSet<>(), ret);
return ret;
}
private void traverseImpactedVertices(AtlasVertex entityVertex, String relationshipGuidToExclude, String classificationId, Set<String> visitedVertices, List<AtlasVertex> result) {
visitedVertices.add(entityVertex.getIdForDisplay());
AtlasEntityType entityType = typeRegistry.getEntityTypeByName(getTypeName(entityVertex));
String[] tagPropagationEdges = entityType != null ? entityType.getTagPropagationEdgesArray() : null;
if (tagPropagationEdges != null) {
Iterable<AtlasEdge> propagationEdges = entityVertex.getEdges(AtlasEdgeDirection.BOTH, tagPropagationEdges);
for (AtlasEdge propagationEdge : propagationEdges) {
PropagateTags tagPropagation = getPropagateTags(propagationEdge);
if (tagPropagation == null || tagPropagation == NONE) {
continue;
} else if (tagPropagation == TWO_TO_ONE) {
if (isOutVertex(entityVertex, propagationEdge)) {
continue;
}
} else if (tagPropagation == ONE_TO_TWO) {
if (!isOutVertex(entityVertex, propagationEdge)) {
continue;
}
}
if (relationshipGuidToExclude != null) {
if (StringUtils.equals(getRelationshipGuid(propagationEdge), relationshipGuidToExclude)) {
continue;
}
}
if (classificationId != null) {
List<String> blockedClassificationIds = getBlockedClassificationIds(propagationEdge);
if (CollectionUtils.isNotEmpty(blockedClassificationIds) && blockedClassificationIds.contains(classificationId)) {
continue;
}
}
AtlasVertex adjacentVertex = getOtherVertex(propagationEdge, entityVertex);
if (!visitedVertices.contains(adjacentVertex.getIdForDisplay())) {
result.add(adjacentVertex);
traverseImpactedVertices(adjacentVertex, relationshipGuidToExclude, classificationId, visitedVertices, result);
}
}
}
}
private boolean isOutVertex(AtlasVertex vertex, AtlasEdge edge) {
return StringUtils.equals(vertex.getIdForDisplay(), edge.getOutVertex().getIdForDisplay());
}
private AtlasVertex getOtherVertex(AtlasEdge edge, AtlasVertex vertex) {
AtlasVertex outVertex = edge.getOutVertex();
AtlasVertex inVertex = edge.getInVertex();
return StringUtils.equals(outVertex.getIdForDisplay(), vertex.getIdForDisplay()) ? inVertex : outVertex;
}
private AtlasVertex getEntityVertex(AtlasObjectId objId) throws AtlasBaseException {
AtlasVertex ret = null;
......@@ -1390,7 +1526,7 @@ public class EntityGraphRetriever {
}
private void readClassificationsFromEdge(AtlasEdge edge, AtlasRelationshipWithExtInfo relationshipWithExtInfo, boolean extendedInfo) throws AtlasBaseException {
List<AtlasVertex> classificationVertices = getClassificationVertices(edge);
List<AtlasVertex> classificationVertices = getPropagatableClassifications(edge);
List<String> blockedClassificationIds = getBlockedClassificationIds(edge);
AtlasRelationship relationship = relationshipWithExtInfo.getRelationship();
Set<AtlasClassification> propagatedClassifications = new HashSet<>();
......
......@@ -65,32 +65,6 @@ public class AtlasGremlin3QueryProvider extends AtlasGremlin2QueryProvider {
return "g.V().range(0,1).toList()";
case GREMLIN_SEARCH_RETURNS_EDGE_ID:
return "g.E().range(0,1).toList()";
case TAG_PROPAGATION_IMPACTED_INSTANCES:
return "g.V().has('__guid', guid).aggregate('src')" +
".repeat(union(outE().has('__state', 'ACTIVE').has('tagPropagation', within('ONE_TO_TWO', 'BOTH')).inV(), " +
"inE().has('__state', 'ACTIVE').has('tagPropagation', within('TWO_TO_ONE', 'BOTH')).outV())" +
".dedup().where(without('src')).simplePath()).emit().toList();";
case TAG_PROPAGATION_IMPACTED_INSTANCES_WITH_RESTRICTIONS:
return "g.V().has('__guid', guid).aggregate('src')" +
".repeat(union(outE().has('__state', 'ACTIVE').has('tagPropagation', within('ONE_TO_TWO', 'BOTH')).not(has('blockedPropagatedClassifications', org.janusgraph.core.attribute.Text.textContains(classificationId))).inV(), " +
"inE().has('__state', 'ACTIVE').has('tagPropagation', within('TWO_TO_ONE', 'BOTH')).not(has('blockedPropagatedClassifications', org.janusgraph.core.attribute.Text.textContains(classificationId))).outV())" +
".dedup().where(without('src')).simplePath()).emit().toList();";
case TAG_PROPAGATION_IMPACTED_INSTANCES_FOR_REMOVAL:
return "g.V().has('__guid', guid).aggregate('src')" +
".repeat(union(outE().has('__state', 'ACTIVE').has('tagPropagation', within('ONE_TO_TWO', 'BOTH')).has('_r__guid', neq(relationshipGuid)).inV(), " +
"inE().has('__state', 'ACTIVE').has('tagPropagation', within('TWO_TO_ONE', 'BOTH')).has('_r__guid', neq(relationshipGuid)).outV())" +
".dedup().where(without('src')).simplePath()).emit().toList();";
case TAG_PROPAGATION_IMPACTED_INSTANCES_EXCLUDE_RELATIONSHIP:
return "g.V().has('__guid', guid).aggregate('src')" +
".repeat(union(outE().has('__state', 'ACTIVE').has('tagPropagation', within('ONE_TO_TWO', 'BOTH')).has('_r__guid', neq(guidRelationshipToExclude))" +
".not(has('blockedPropagatedClassifications', org.janusgraph.core.attribute.Text.textContains(classificationId))).inV(), " +
"inE().has('__state', 'ACTIVE').has('tagPropagation', within('TWO_TO_ONE', 'BOTH')).has('_r__guid', neq(guidRelationshipToExclude))" +
".not(has('blockedPropagatedClassifications', org.janusgraph.core.attribute.Text.textContains(classificationId))).outV())" +
".dedup().where(without('src')).simplePath()).emit().toList();";
}
return super.getQuery(gremlinQuery);
}
......
......@@ -85,10 +85,5 @@ public abstract class AtlasGremlinQueryProvider {
COMPARE_CONTAINS,
COMPARE_IS_NULL,
COMPARE_NOT_NULL,
TAG_PROPAGATION_IMPACTED_INSTANCES,
TAG_PROPAGATION_IMPACTED_INSTANCES_FOR_REMOVAL,
TAG_PROPAGATION_IMPACTED_INSTANCES_WITH_RESTRICTIONS,
TAG_PROPAGATION_IMPACTED_INSTANCES_EXCLUDE_RELATIONSHIP
}
}
......@@ -18,6 +18,7 @@
package org.apache.atlas.repository.tagpropagation;
import com.vividsolutions.jts.util.Assert;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.RequestContext;
import org.apache.atlas.TestModules;
import org.apache.atlas.discovery.AtlasLineageService;
......@@ -35,7 +36,6 @@ import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.apache.atlas.repository.graph.AtlasGraphProvider;
import org.apache.atlas.repository.impexp.ImportService;
import org.apache.atlas.repository.impexp.ZipFileResourceTestUtils;
import org.apache.atlas.repository.impexp.ZipSource;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.repository.store.graph.AtlasRelationshipStore;
import org.apache.atlas.runner.LocalSolrRunner;
......@@ -49,7 +49,6 @@ import org.testng.annotations.Guice;
import org.testng.annotations.Test;
import javax.inject.Inject;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
......@@ -320,20 +319,16 @@ public class ClassificationPropagationTest {
// validate tag2 is propagated to employees_union
assertClassificationExistInEntity(EMPLOYEES_UNION_TABLE, tag2);
//update propagation to BOTH for edge process3 --> employee_union
//update propagation to BOTH for edge process3 --> employee_union. This should fail
AtlasRelationship process3_employee_union_relationship = getRelationship(EMPLOYEES_UNION_PROCESS, EMPLOYEES_UNION_TABLE);
assertEquals(process3_employee_union_relationship.getPropagateTags(), ONE_TO_TWO);
process3_employee_union_relationship.setPropagateTags(BOTH);
relationshipStore.update(process3_employee_union_relationship);
// process3 should get 'tag4' from employee_union and employee_union should get tag3 from process3 (BOTH)
assertClassificationExistInEntity(EMPLOYEES_UNION_PROCESS, tag4);
assertClassificationExistInEntity(EMPLOYEES_UNION_TABLE, tag3);
//update propagation to ONE_TO_TWO for edge process3 --> employee_union
process3_employee_union_relationship.setPropagateTags(ONE_TO_TWO);
relationshipStore.update(process3_employee_union_relationship);
assertClassificationNotExistInEntity(EMPLOYEES_UNION_PROCESS, tag4);
try {
relationshipStore.update(process3_employee_union_relationship);
} catch (AtlasBaseException ex) {
assertEquals(ex.getAtlasErrorCode(), AtlasErrorCode.INVALID_PROPAGATION_TYPE);
}
//cleanup
deleteClassification(hdfs_employees, tag1);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment