Commit ec00aed1 by Sarath Subramanian Committed by Madhan Neethiraj

ATLAS-2481: entity-classification edges - labelled as 'classifiedAs', with additional properties

parent 02f382a2
......@@ -116,9 +116,12 @@ public final class Constants {
public static final String ATTRIBUTE_NAME_VERSION = "version";
public static final String TEMP_STRUCT_NAME_PREFIX = "__tempQueryResultStruct";
public static final String CLASSIFICATION_ENTITY_GUID = INTERNAL_PROPERTY_KEY_PREFIX + "entityGuid";
public static final String CLASSIFICATION_PROPAGATE_KEY = INTERNAL_PROPERTY_KEY_PREFIX + "propagate";
public static final String CLASSIFICATION_VALIDITY_PERIODS_KEY = INTERNAL_PROPERTY_KEY_PREFIX + "validityPeriods";
public static final String CLASSIFICATION_ENTITY_GUID = INTERNAL_PROPERTY_KEY_PREFIX + "entityGuid";
public static final String CLASSIFICATION_VALIDITY_PERIODS_KEY = INTERNAL_PROPERTY_KEY_PREFIX + "validityPeriods";
public static final String CLASSIFICATION_VERTEX_PROPAGATE_KEY = INTERNAL_PROPERTY_KEY_PREFIX + "propagate";
public static final String CLASSIFICATION_EDGE_NAME_PROPERTY_KEY = INTERNAL_PROPERTY_KEY_PREFIX + "name";
public static final String CLASSIFICATION_EDGE_IS_PROPAGATED_PROPERTY_KEY = INTERNAL_PROPERTY_KEY_PREFIX + "isPropagated";
public static final String CLASSIFICATION_LABEL = "classifiedAs";
private Constants() {
}
......
......@@ -105,6 +105,15 @@ public interface AtlasGraphManagement {
AtlasGraphIndex getGraphIndex(String indexName);
/**
* Checks if a vertex-centric edge exists already.
*
* @param label
* @param indexName
* @return
*/
boolean edgeIndexExist(String label, String indexName);
/**
* Creates a mixed Vertex index for the graph.
*
* @param name the name of the index to create
......@@ -123,6 +132,16 @@ public interface AtlasGraphManagement {
void createEdgeMixedIndex(String index, String backingIndex, List<AtlasPropertyKey> propertyKeys);
/**
* Creates a vertex-centric edge index for the graph.
*
* @param label edge label name
* @param indexName name of the edge index
* @param edgeDirection direction of the edge to index
* @param propertyKeys edge property keys to be added to the index
*/
void createEdgeIndex(String label, String indexName, AtlasEdgeDirection edgeDirection, List<AtlasPropertyKey> propertyKeys);
/**
* Creates a full text index for the given property.
*
* @param index the name of the index to create
......
......@@ -52,18 +52,32 @@ public interface AtlasGraphQuery<V, E> {
*/
AtlasGraphQuery<V, E> in(String propertyKey, Collection<?> values);
/**
* Executes the query and returns the matching edges.
* @return
*/
Iterable<AtlasEdge<V, E>> edges();
/**
* Executes the query and returns the matching vertices.
* Executes the query and returns the matching edges till the max limit
* @param limit max number of vertices
* @return
*/
Iterable<AtlasVertex<V, E>> vertices();
Iterable<AtlasEdge<V, E>> edges(int limit);
/**
* Executes the query and returns the matching edges.
* Executes the query and returns the matching edges from given offset till the max limit
* @param offset starting offset
* @param limit max number of vertices
* @return
*/
Iterable<AtlasEdge<V, E>> edges();
Iterable<AtlasEdge<V, E>> edges(int offset, int limit);
/**
* Executes the query and returns the matching vertices.
* @return
*/
Iterable<AtlasVertex<V, E>> vertices();
/**
* Executes the query and returns the matching vertices from given offset till the max limit
......
......@@ -67,4 +67,21 @@ public interface AtlasVertexQuery<V, E> {
* @return
*/
long count();
/**
* Specifies the edge label that should be queried.
*
* @param label
* @return
*/
AtlasVertexQuery<V, E> label(String label);
/**
* Returns edges that matches property key and value.
*
* @param key
* @param value
* @return
*/
AtlasVertexQuery<V, E> has(String key, Object value);
}
......@@ -48,6 +48,21 @@ public interface NativeTinkerpopGraphQuery<V, E> {
/**
* Executes graph query
* @param limit Max edges to return
* @return
*/
Iterable<AtlasEdge<V, E>> edges(int limit);
/**
* Executes graph query
* @param offset Starting offset
* @param limit Max edges to return
* @return
*/
Iterable<AtlasEdge<V, E>> edges(int offset, int limit);
/**
* Executes graph query
* @param limit Max vertices to return
* @return
*/
......
......@@ -157,6 +157,45 @@ public abstract class TinkerpopGraphQuery<V, E> implements AtlasGraphQuery<V, E>
}
@Override
public Iterable<AtlasEdge<V, E>> edges(int limit) {
return edges(0, limit);
}
@Override
public Iterable<AtlasEdge<V, E>> edges(int offset, int limit) {
if (LOG.isDebugEnabled()) {
LOG.debug("Executing: " + queryCondition);
}
Preconditions.checkArgument(offset >= 0, "Offset must be non-negative");
Preconditions.checkArgument(limit >= 0, "Limit must be non-negative");
// Compute the overall result by combining the results of all the AndConditions (nested within OR) together.
Set<AtlasEdge<V, E>> result = new HashSet<>();
long resultIdx = 0;
for(AndCondition andExpr : queryCondition.getAndTerms()) {
if (result.size() == limit) {
break;
}
NativeTinkerpopGraphQuery<V, E> andQuery = andExpr.create(getQueryFactory());
for(AtlasEdge<V, E> edge : andQuery.edges(offset + limit)) {
if (resultIdx >= offset) {
result.add(edge);
if (result.size() == limit) {
break;
}
}
resultIdx++;
}
}
return result;
}
@Override
public Iterable<AtlasVertex<V, E>> vertices(int limit) {
return vertices(0, limit);
}
......
......@@ -18,6 +18,8 @@
package org.apache.atlas.repository.graphdb.janus;
import com.google.common.base.Preconditions;
import org.apache.atlas.repository.graphdb.AtlasEdgeDirection;
import org.apache.tinkerpop.gremlin.structure.Direction;
import org.janusgraph.core.Cardinality;
import org.janusgraph.core.EdgeLabel;
import org.janusgraph.core.PropertyKey;
......@@ -83,6 +85,22 @@ public class AtlasJanusGraphManagement implements AtlasGraphManagement {
}
@Override
public void createEdgeIndex(String label, String indexName, AtlasEdgeDirection edgeDirection, List<AtlasPropertyKey> propertyKeys) {
EdgeLabel edgeLabel = management.getEdgeLabel(label);
if (edgeLabel == null) {
edgeLabel = management.makeEdgeLabel(label).make();
}
Direction direction = AtlasJanusObjectFactory.createDirection(edgeDirection);
PropertyKey[] keys = AtlasJanusObjectFactory.createPropertyKeys(propertyKeys);
if (management.getRelationIndex(edgeLabel, indexName) == null) {
management.buildEdgeIndex(edgeLabel, indexName, direction, keys);
}
}
@Override
public void createFullTextMixedIndex(String indexName, String backingIndex, List<AtlasPropertyKey> propertyKeys) {
IndexBuilder indexBuilder = management.buildIndex(indexName, Vertex.class);
......@@ -192,6 +210,13 @@ public class AtlasJanusGraphManagement implements AtlasGraphManagement {
}
@Override
public boolean edgeIndexExist(String label, String indexName) {
EdgeLabel edgeLabel = management.getEdgeLabel(label);
return edgeLabel != null && management.getRelationIndex(edgeLabel, indexName) != null;
}
@Override
public void createVertexCompositeIndex(String propertyName, boolean isUnique, List<AtlasPropertyKey> propertyKeys) {
IndexBuilder indexBuilder = management.buildIndex(propertyName, Vertex.class);
......
......@@ -25,6 +25,8 @@ import org.apache.tinkerpop.gremlin.structure.Direction;
import org.janusgraph.core.Cardinality;
import org.janusgraph.core.PropertyKey;
import java.util.ArrayList;
import java.util.List;
/**
* Factory that serves up instances of Janus/Tinkerpop classes that correspond to
......@@ -61,7 +63,7 @@ public final class AtlasJanusObjectFactory {
/**
* Converts a Multiplicity to a Cardinality.
*
* @param multiplicity
* @param cardinality
* @return
*/
public static Cardinality createCardinality(AtlasCardinality cardinality) {
......@@ -82,4 +84,16 @@ public final class AtlasJanusObjectFactory {
return ((AtlasJanusPropertyKey)key).getWrappedPropertyKey();
}
public static PropertyKey[] createPropertyKeys(List<AtlasPropertyKey> keys) {
PropertyKey[] ret = new PropertyKey[keys.size()];
int i = 0;
for (AtlasPropertyKey key : keys) {
ret[i] = createPropertyKey(key);
i++;
}
return ret;
}
}
......@@ -76,5 +76,15 @@ public class AtlasJanusVertexQuery implements AtlasVertexQuery<AtlasJanusVertex,
return query.count();
}
@Override
public AtlasVertexQuery<AtlasJanusVertex, AtlasJanusEdge> label(String label) {
query.labels(label);
return this;
}
@Override
public AtlasVertexQuery<AtlasJanusVertex, AtlasJanusEdge> has(String key, Object value) {
query.has(key, value);
return this;
}
}
......@@ -17,6 +17,7 @@
*/
package org.apache.atlas.repository.graphdb.janus.query;
import org.apache.tinkerpop.gremlin.structure.Edge;
import org.janusgraph.core.JanusGraphEdge;
import org.janusgraph.core.JanusGraphQuery;
import org.janusgraph.core.JanusGraphVertex;
......@@ -64,6 +65,28 @@ public class NativeJanusGraphQuery implements NativeTinkerpopGraphQuery<AtlasJan
}
@Override
public Iterable<AtlasEdge<AtlasJanusVertex, AtlasJanusEdge>> edges(int limit) {
Iterable<JanusGraphEdge> it = query.limit(limit).edges();
return graph.wrapEdges(it);
}
@Override
public Iterable<AtlasEdge<AtlasJanusVertex, AtlasJanusEdge>> edges(int offset, int limit) {
List<Edge> result = new ArrayList<>(limit);
Iterator<? extends Edge> iter = query.limit(offset + limit).edges().iterator();
for (long resultIdx = 0; iter.hasNext() && result.size() < limit; resultIdx++) {
if (resultIdx < offset) {
continue;
}
result.add(iter.next());
}
return graph.wrapEdges(result);
}
@Override
public Iterable<AtlasVertex<AtlasJanusVertex, AtlasJanusEdge>> vertices(int limit) {
Iterable<JanusGraphVertex> it = query.limit(limit).vertices();
return graph.wrapVertices(it);
......
......@@ -20,14 +20,15 @@ package org.apache.atlas.repository.graphdb.titan0;
import com.thinkaurelius.titan.core.Cardinality;
import com.thinkaurelius.titan.core.EdgeLabel;
import com.thinkaurelius.titan.core.PropertyKey;
import com.thinkaurelius.titan.core.schema.Mapping;
import com.thinkaurelius.titan.core.schema.PropertyKeyMaker;
import com.thinkaurelius.titan.core.schema.TitanGraphIndex;
import com.thinkaurelius.titan.core.schema.TitanManagement;
import com.tinkerpop.blueprints.Direction;
import com.tinkerpop.blueprints.Edge;
import com.tinkerpop.blueprints.Element;
import com.tinkerpop.blueprints.Vertex;
import org.apache.atlas.repository.graphdb.AtlasCardinality;
import org.apache.atlas.repository.graphdb.AtlasEdgeDirection;
import org.apache.atlas.repository.graphdb.AtlasEdgeLabel;
import org.apache.atlas.repository.graphdb.AtlasGraphIndex;
import org.apache.atlas.repository.graphdb.AtlasGraphManagement;
......@@ -62,6 +63,20 @@ public class Titan0GraphManagement implements AtlasGraphManagement {
}
@Override
public void createEdgeIndex(String label, String indexName, AtlasEdgeDirection edgeDirection, List<AtlasPropertyKey> propertyKeys) {
EdgeLabel edgeLabel = management.getEdgeLabel(label);
if (edgeLabel == null) {
edgeLabel = management.makeEdgeLabel(label).make();
}
Direction direction = TitanObjectFactory.createDirection(edgeDirection);
PropertyKey[] keys = TitanObjectFactory.createPropertyKeys(propertyKeys);
management.buildEdgeIndex(edgeLabel, indexName, direction, keys);
}
@Override
public void createFullTextMixedIndex(String index, String backingIndex, List<AtlasPropertyKey> propertyKeys) {
}
......@@ -201,4 +216,10 @@ public class Titan0GraphManagement implements AtlasGraphManagement {
return GraphDbObjectFactory.createGraphIndex(index);
}
@Override
public boolean edgeIndexExist(String label, String indexName) {
EdgeLabel edgeLabel = management.getEdgeLabel(label);
return edgeLabel != null && management.getRelationIndex(edgeLabel, indexName) != null;
}
}
......@@ -77,4 +77,16 @@ public class Titan0VertexQuery implements AtlasVertexQuery<Titan0Vertex, Titan0E
public long count() {
return vertexQuery.count();
}
@Override
public AtlasVertexQuery<Titan0Vertex, Titan0Edge> label(String label) {
vertexQuery.labels(label);
return this;
}
@Override
public AtlasVertexQuery<Titan0Vertex, Titan0Edge> has(String key, Object value) {
vertexQuery.has(key, value);
return this;
}
}
......@@ -25,6 +25,9 @@ import com.thinkaurelius.titan.core.Cardinality;
import com.thinkaurelius.titan.core.PropertyKey;
import com.tinkerpop.blueprints.Direction;
import java.util.ArrayList;
import java.util.List;
/**
* Factory that serves up instances of Titan/Tinkerpop classes that correspond to
* graph database abstraction layer/Atlas classes.
......@@ -60,7 +63,7 @@ public final class TitanObjectFactory {
/**
* Converts a Multiplicity to a Cardinality.
*
* @param multiplicity
* @param cardinality
* @return
*/
public static Cardinality createCardinality(AtlasCardinality cardinality) {
......@@ -81,4 +84,16 @@ public final class TitanObjectFactory {
return ((Titan0PropertyKey)key).getWrappedPropertyKey();
}
public static PropertyKey[] createPropertyKeys(List<AtlasPropertyKey> keys) {
PropertyKey[] ret = new PropertyKey[keys.size()];
int i = 0;
for (AtlasPropertyKey key : keys) {
ret[i] = createPropertyKey(key);
i++;
}
return ret;
}
}
......@@ -23,6 +23,7 @@ import com.thinkaurelius.titan.core.attribute.Contain;
import com.thinkaurelius.titan.core.attribute.Text;
import com.thinkaurelius.titan.graphdb.query.TitanPredicate;
import com.tinkerpop.blueprints.Compare;
import com.tinkerpop.blueprints.Edge;
import com.tinkerpop.blueprints.Vertex;
import org.apache.atlas.repository.graphdb.AtlasEdge;
import org.apache.atlas.repository.graphdb.AtlasGraphQuery.ComparisionOperator;
......@@ -64,6 +65,29 @@ public class NativeTitan0GraphQuery implements NativeTinkerpopGraphQuery<Titan0V
Iterable it = query.edges();
return graph.wrapEdges(it);
}
@Override
public Iterable<AtlasEdge<Titan0Vertex, Titan0Edge>> edges(int limit) {
Iterable it = query.limit(limit).edges();
return graph.wrapEdges(it);
}
@Override
public Iterable<AtlasEdge<Titan0Vertex, Titan0Edge>> edges(int offset, int limit) {
List<Edge> result = new ArrayList<>(limit);
Iterator<Edge> iter = query.limit(offset + limit).edges().iterator();
for (long resultIdx = 0; iter.hasNext() && result.size() < limit; resultIdx++) {
if (resultIdx < offset) {
continue;
}
result.add(iter.next());
}
return graph.wrapEdges(result);
}
@Override
public Iterable<AtlasVertex<Titan0Vertex, Titan0Edge>> vertices(int limit) {
Iterable it = query.limit(limit).vertices();
......
......@@ -125,6 +125,8 @@ public enum AtlasErrorCode {
CLASSIFICATION_UPDATE_FROM_PROPAGATED_ENTITY(400, "ATLAS-400-00-06B", "Update to classification {0} is not allowed from propagated entity"),
CLASSIFICATION_DELETE_FROM_PROPAGATED_ENTITY(400, "ATLAS-400-00-06C", "Delete of classification {0} is not allowed from propagated entity"),
CLASSIFICATION_NOT_ASSOCIATED_WITH_ENTITY(400, "ATLAS-400-00-06D", "Classification {0} is not associated with entity"),
NO_CLASSIFICATIONS_FOUND_FOR_ENTITY(400, "ATLAS-400-00-06E", "No classifications associated with entity: {0}"),
INVALID_CLASSIFICATION_PARAMS(400, "ATLAS-400-00-06F", "Invalid classification parameters passed for {0} operation for entity: {1}"),
UNAUTHORIZED_ACCESS(403, "ATLAS-403-00-001", "{0} is not authorized to perform {1}"),
......
......@@ -462,7 +462,7 @@ public class EntityDiscoveryService implements AtlasDiscoveryService {
AtlasEntityHeader entity = entityRetriever.toAtlasEntityHeader(atlasVertex, resultAttributes);
if(searchParameters.getIncludeClassificationAttributes()) {
entity.setClassifications(entityRetriever.getClassifications(atlasVertex));
entity.setClassifications(entityRetriever.getAllClassifications(atlasVertex));
}
ret.addEntity(entity);
......
......@@ -46,7 +46,7 @@ enum GremlinClause {
TEXT_CONTAINS("has('%s', org.janusgraph.core.attribute.Text.textRegex(%s))"),
TEXT_PREFIX("has('%s', org.janusgraph.core.attribute.Text.textPrefix(%s))"),
TEXT_SUFFIX("has('%s', org.janusgraph.core.attribute.Text.textRegex(\".*\" + %s))"),
TRAIT("or(has('__traitNames', within('%s')), has('__propagatedTraitNames', within('%s')))"),
TRAIT("outE('classifiedAs').has('__name', within('%s')).outV()"),
SELECT_NOOP_FN("def f(r){ r }; "),
SELECT_FN("def f(r){ t=[[%s]]; %s r.each({t.add([%s])}); t.unique(); }; "),
SELECT_ONLY_AGG_FN("def f(r){ t=[[%s]]; %s t.add([%s]); t;}; "),
......
......@@ -37,6 +37,7 @@ import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.IndexException;
import org.apache.atlas.repository.RepositoryException;
import org.apache.atlas.repository.graphdb.AtlasCardinality;
import org.apache.atlas.repository.graphdb.AtlasEdgeDirection;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasGraphIndex;
import org.apache.atlas.repository.graphdb.AtlasGraphManagement;
......@@ -59,6 +60,7 @@ import javax.inject.Inject;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
......@@ -66,6 +68,9 @@ import java.util.Set;
import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.*;
import static org.apache.atlas.repository.Constants.BACKING_INDEX;
import static org.apache.atlas.repository.Constants.CLASSIFICATION_EDGE_IS_PROPAGATED_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.CLASSIFICATION_LABEL;
import static org.apache.atlas.repository.Constants.CLASSIFICATION_EDGE_NAME_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.CREATED_BY_KEY;
import static org.apache.atlas.repository.Constants.EDGE_INDEX;
import static org.apache.atlas.repository.Constants.ENTITY_TEXT_PROPERTY_KEY;
......@@ -280,6 +285,11 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang
createVertexIndex(management, TYPENAME_PROPERTY_KEY, String.class, true, SINGLE, true, true);
createVertexIndex(management, VERTEX_TYPE_PROPERTY_KEY, String.class, false, SINGLE, true, true);
// create vertex-centric index
createVertexCentricIndex(management, CLASSIFICATION_LABEL, AtlasEdgeDirection.BOTH, CLASSIFICATION_EDGE_NAME_PROPERTY_KEY, String.class, SINGLE);
createVertexCentricIndex(management, CLASSIFICATION_LABEL, AtlasEdgeDirection.BOTH, CLASSIFICATION_EDGE_IS_PROPAGATED_PROPERTY_KEY, Boolean.class, SINGLE);
createVertexCentricIndex(management, CLASSIFICATION_LABEL, AtlasEdgeDirection.BOTH, Arrays.asList(CLASSIFICATION_EDGE_NAME_PROPERTY_KEY, CLASSIFICATION_EDGE_IS_PROPAGATED_PROPERTY_KEY));
// create edge indexes
createEdgeIndex(management, RELATIONSHIP_GUID_PROPERTY_KEY, String.class, SINGLE, true);
......@@ -475,6 +485,55 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang
return propertyKey;
}
private void createVertexCentricIndex(AtlasGraphManagement management, String edgeLabel, AtlasEdgeDirection edgeDirection,
String propertyName, Class propertyClass, AtlasCardinality cardinality) {
AtlasPropertyKey propertyKey = management.getPropertyKey(propertyName);
if (propertyKey == null) {
propertyKey = management.makePropertyKey(propertyName, propertyClass, cardinality);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Creating vertex-centric index for edge label: {} direction: {} for property: {} of type: {} ",
edgeLabel, edgeDirection.name(), propertyName, propertyClass.getName());
}
final String indexName = edgeLabel + propertyKey.getName();
if (!management.edgeIndexExist(edgeLabel, indexName)) {
management.createEdgeIndex(edgeLabel, indexName, edgeDirection, Collections.singletonList(propertyKey));
LOG.info("Created vertex-centric index for edge label: {} direction: {} for property: {} of type: {}",
edgeLabel, edgeDirection.name(), propertyName, propertyClass.getName());
}
}
private void createVertexCentricIndex(AtlasGraphManagement management, String edgeLabel, AtlasEdgeDirection edgeDirection, List<String> propertyNames) {
if (LOG.isDebugEnabled()) {
LOG.debug("Creating vertex-centric index for edge label: {} direction: {} for properties: {}",
edgeLabel, edgeDirection.name(), propertyNames);
}
String indexName = edgeLabel;
List<AtlasPropertyKey> propertyKeys = new ArrayList<>();
for (String propertyName : propertyNames) {
AtlasPropertyKey propertyKey = management.getPropertyKey(propertyName);
if (propertyKey != null) {
propertyKeys.add(propertyKey);
indexName = indexName + propertyKey.getName();
}
}
if (!management.edgeIndexExist(edgeLabel, indexName) && CollectionUtils.isNotEmpty(propertyKeys)) {
management.createEdgeIndex(edgeLabel, indexName, edgeDirection, propertyKeys);
LOG.info("Created vertex-centric index for edge label: {} direction: {} for properties: {}", edgeLabel, edgeDirection.name(), propertyNames);
}
}
private AtlasPropertyKey createEdgeIndex(AtlasGraphManagement management, String propertyName, Class propertyClass,
AtlasCardinality cardinality, boolean createCompositeIndex) {
AtlasPropertyKey propertyKey = management.getPropertyKey(propertyName);
......
......@@ -26,11 +26,11 @@ import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.AtlasException;
import org.apache.atlas.RequestContextV1;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.instance.AtlasClassification;
import org.apache.atlas.model.instance.AtlasEntity.Status;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.instance.AtlasRelationship;
import org.apache.atlas.model.typedef.AtlasRelationshipDef;
import org.apache.atlas.repository.graphdb.AtlasVertexQuery;
import org.apache.atlas.util.AtlasGremlinQueryProvider;
import org.apache.atlas.v1.model.instance.Id;
import org.apache.atlas.v1.model.instance.Referenceable;
......@@ -74,8 +74,11 @@ import java.util.Set;
import java.util.UUID;
import static org.apache.atlas.model.instance.AtlasEntity.Status.ACTIVE;
import static org.apache.atlas.repository.Constants.CLASSIFICATION_EDGE_IS_PROPAGATED_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.CLASSIFICATION_LABEL;
import static org.apache.atlas.repository.Constants.CLASSIFICATION_EDGE_NAME_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.CLASSIFICATION_VERTEX_PROPAGATE_KEY;
import static org.apache.atlas.repository.Constants.PROPAGATED_TRAIT_NAMES_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.TRAIT_NAMES_PROPERTY_KEY;
import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.BOTH;
import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.IN;
import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.OUT;
......@@ -181,24 +184,39 @@ public final class GraphHelper {
return vertexWithoutIdentity;
}
public AtlasEdge addClassificationEdge(AtlasVertex entityVertex, AtlasVertex classificationVertex, boolean isPropagated) {
AtlasEdge ret = addEdge(entityVertex, classificationVertex, CLASSIFICATION_LABEL);
if (ret != null) {
AtlasGraphUtilsV1.setProperty(ret, CLASSIFICATION_EDGE_NAME_PROPERTY_KEY, getTypeName(classificationVertex));
AtlasGraphUtilsV1.setProperty(ret, CLASSIFICATION_EDGE_IS_PROPAGATED_PROPERTY_KEY, isPropagated);
}
return ret;
}
public AtlasEdge addEdge(AtlasVertex fromVertex, AtlasVertex toVertex, String edgeLabel) {
AtlasEdge ret;
if (LOG.isDebugEnabled()) {
LOG.debug("Adding edge for {} -> label {} -> {}", string(fromVertex), edgeLabel, string(toVertex));
}
AtlasEdge edge = graph.addEdge(fromVertex, toVertex, edgeLabel);
ret = graph.addEdge(fromVertex, toVertex, edgeLabel);
setProperty(edge, Constants.STATE_PROPERTY_KEY, Id.EntityState.ACTIVE.name());
setProperty(edge, Constants.TIMESTAMP_PROPERTY_KEY, RequestContextV1.get().getRequestTime());
setProperty(edge, Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY, RequestContextV1.get().getRequestTime());
setProperty(edge, Constants.CREATED_BY_KEY, RequestContextV1.get().getUser());
setProperty(edge, Constants.MODIFIED_BY_KEY, RequestContextV1.get().getUser());
if (ret != null) {
setProperty(ret, Constants.STATE_PROPERTY_KEY, Id.EntityState.ACTIVE.name());
setProperty(ret, Constants.TIMESTAMP_PROPERTY_KEY, RequestContextV1.get().getRequestTime());
setProperty(ret, Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY, RequestContextV1.get().getRequestTime());
setProperty(ret, Constants.CREATED_BY_KEY, RequestContextV1.get().getUser());
setProperty(ret, Constants.MODIFIED_BY_KEY, RequestContextV1.get().getUser());
if (LOG.isDebugEnabled()) {
LOG.debug("Added {}", string(edge));
if (LOG.isDebugEnabled()) {
LOG.debug("Added {}", string(ret));
}
}
return edge;
return ret;
}
public AtlasEdge getOrCreateEdge(AtlasVertex outVertex, AtlasVertex inVertex, String edgeLabel) throws RepositoryException {
......@@ -364,6 +382,91 @@ public final class GraphHelper {
return null;
}
public static boolean isPropagationEnabled(AtlasVertex classificationVertex) {
boolean ret = false;
if (classificationVertex != null) {
Boolean enabled = AtlasGraphUtilsV1.getProperty(classificationVertex, CLASSIFICATION_VERTEX_PROPAGATE_KEY, Boolean.class);
ret = (enabled == null) ? true : enabled;
}
return ret;
}
public static AtlasVertex getClassificationVertex(AtlasVertex entityVertex, String classificationName) {
AtlasVertex ret = null;
AtlasEdge edge = getClassificationEdge(entityVertex, classificationName);
if (edge != null) {
ret = edge.getInVertex();
}
return ret;
}
public static AtlasEdge getClassificationEdge(AtlasVertex entityVertex, String classificationName) {
AtlasEdge ret = null;
Iterable edges = entityVertex.query().direction(AtlasEdgeDirection.OUT).label(CLASSIFICATION_LABEL)
.has(CLASSIFICATION_EDGE_IS_PROPAGATED_PROPERTY_KEY, false)
.has(CLASSIFICATION_EDGE_NAME_PROPERTY_KEY, classificationName).edges();
if (edges != null) {
Iterator<AtlasEdge> iterator = edges.iterator();
if (iterator.hasNext()) {
AtlasEdge edge = iterator.next();
ret = (edge != null) ? edge : null;
}
}
return ret;
}
public static List<String> getPropagatedEntities(AtlasVertex classificationVertex) {
List<String> ret = new ArrayList<>();
List<AtlasVertex> entityVertices = getPropagatedEntityVertices(classificationVertex);
if (CollectionUtils.isNotEmpty(entityVertices)) {
for (AtlasVertex entityVertex : entityVertices) {
ret.add(getGuid(entityVertex));
}
}
return ret;
}
public static List<AtlasVertex> getPropagatedEntityVertices(AtlasVertex classificationVertex) {
List<AtlasVertex> ret = new ArrayList<>();
List<AtlasEdge> propagatedEdges = getPropagatedEdges(classificationVertex);
if (CollectionUtils.isNotEmpty(propagatedEdges)) {
for (AtlasEdge propagatedEdge : propagatedEdges) {
ret.add(propagatedEdge.getOutVertex());
}
}
return ret;
}
public static List<AtlasEdge> getPropagatedEdges(AtlasVertex classificationVertex) {
List<AtlasEdge> ret = new ArrayList<>();
Iterable edges = classificationVertex.query().direction(AtlasEdgeDirection.IN).label(CLASSIFICATION_LABEL)
.has(CLASSIFICATION_EDGE_IS_PROPAGATED_PROPERTY_KEY, true)
.has(CLASSIFICATION_EDGE_NAME_PROPERTY_KEY, getTypeName(classificationVertex)).edges();
if (edges != null) {
Iterator<AtlasEdge> iterator = edges.iterator();
while (iterator.hasNext()) {
AtlasEdge edge = iterator.next();
ret.add(edge);
}
}
return ret;
}
public static Iterator<AtlasEdge> getIncomingEdgesByLabel(AtlasVertex instanceVertex, String edgeLabel) {
return getAdjacentEdgesByLabel(instanceVertex, AtlasEdgeDirection.IN, edgeLabel);
}
......@@ -783,52 +886,31 @@ public final class GraphHelper {
return traitName;
}
public static String getPropagatedEdgeLabel(String classificationName) {
return "propagated:" + classificationName;
public static List<String> getTraitNames(AtlasVertex entityVertex) {
return getTraitNames(entityVertex, false);
}
public static List<String> getAllTraitNames(AtlasVertex<?, ?> entityVertex) {
ArrayList<String> ret = new ArrayList<>();
if (entityVertex != null) {
Collection<String> traitNames = entityVertex.getPropertyValues(TRAIT_NAMES_PROPERTY_KEY, String.class);
if (CollectionUtils.isNotEmpty(traitNames)) {
ret.addAll(traitNames);
}
traitNames = entityVertex.getPropertyValues(PROPAGATED_TRAIT_NAMES_PROPERTY_KEY, String.class);
if (CollectionUtils.isNotEmpty(traitNames)) {
ret.addAll(traitNames);
}
}
return ret;
public static List<String> getAllTraitNames(AtlasVertex entityVertex) {
return getTraitNames(entityVertex, null);
}
public static List<String> getTraitNames(AtlasVertex<?,?> entityVertex) {
ArrayList<String> ret = new ArrayList<>();
if (entityVertex != null) {
Collection<String> traitNames = entityVertex.getPropertyValues(TRAIT_NAMES_PROPERTY_KEY, String.class);
public static List<String> getTraitNames(AtlasVertex entityVertex, Boolean propagated) {
List<String> ret = new ArrayList<>();
AtlasVertexQuery query = entityVertex.query().direction(AtlasEdgeDirection.OUT).label(CLASSIFICATION_LABEL);
if (CollectionUtils.isNotEmpty(traitNames)) {
ret.addAll(traitNames);
}
if (propagated != null) {
query = query.has(CLASSIFICATION_EDGE_IS_PROPAGATED_PROPERTY_KEY, propagated);
}
return ret;
}
Iterable edges = query.edges();
public static List<String> getPropagatedTraitNames(AtlasVertex<?,?> entityVertex) {
ArrayList<String> ret = new ArrayList<>();
if (edges != null) {
Iterator<AtlasEdge> iterator = edges.iterator();
if (entityVertex != null) {
Collection<String> traitNames = entityVertex.getPropertyValues(PROPAGATED_TRAIT_NAMES_PROPERTY_KEY, String.class);
while (iterator.hasNext()) {
AtlasEdge edge = iterator.next();
if (CollectionUtils.isNotEmpty(traitNames)) {
ret.addAll(traitNames);
ret.add(AtlasGraphUtilsV1.getProperty(edge, CLASSIFICATION_EDGE_NAME_PROPERTY_KEY, String.class));
}
}
......@@ -918,25 +1000,6 @@ public final class GraphHelper {
return element.getProperty(Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY, Long.class);
}
public List<AtlasVertex> getPropagatedEntityVerticesFromClassification(AtlasVertex classificationVertex) {
List<AtlasVertex> ret = new ArrayList<>();
if (classificationVertex != null) {
String classificationName = getTypeName(classificationVertex);
Iterator<AtlasEdge> iterator = getIncomingEdgesByLabel(classificationVertex, getPropagatedEdgeLabel(classificationName));
while (iterator != null && iterator.hasNext()) {
AtlasEdge propagatedEdge = iterator.next();
if (propagatedEdge != null) {
ret.add(propagatedEdge.getOutVertex());
}
}
}
return ret;
}
/**
* For the given type, finds an unique attribute and checks if there is an existing instance with the same
* unique value
......@@ -1092,18 +1155,17 @@ public final class GraphHelper {
}
public static AtlasVertex getAssociatedEntityVertex(AtlasVertex classificationVertex) {
AtlasVertex ret = null;
if (classificationVertex != null) {
Iterator<AtlasEdge> iterator = getIncomingEdgesByLabel(classificationVertex, getTypeName(classificationVertex));
while (iterator != null && iterator.hasNext()) {
AtlasVertex ret = null;
Iterable edges = classificationVertex.query().direction(AtlasEdgeDirection.IN).label(CLASSIFICATION_LABEL)
.has(CLASSIFICATION_EDGE_IS_PROPAGATED_PROPERTY_KEY, false)
.has(CLASSIFICATION_EDGE_NAME_PROPERTY_KEY, getTypeName(classificationVertex)).edges();
if (edges != null) {
Iterator<AtlasEdge> iterator = edges.iterator();
if (iterator != null && iterator.hasNext()) {
AtlasEdge edge = iterator.next();
if (edge != null) {
ret = edge.getOutVertex();
break;
}
ret = edge.getOutVertex();
}
}
......@@ -1564,7 +1626,7 @@ public final class GraphHelper {
}
public static void removePropagatedTraitNameFromVertex(AtlasVertex entityVertex, String propagatedTraitName) {
List<String> propagatedTraitNames = getPropagatedTraitNames(entityVertex);
List<String> propagatedTraitNames = getTraitNames(entityVertex, true);
if (CollectionUtils.isNotEmpty(propagatedTraitNames) && propagatedTraitNames.contains(propagatedTraitName)) {
propagatedTraitNames.remove(propagatedTraitName);
......
......@@ -40,6 +40,7 @@ import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
import org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -47,12 +48,12 @@ import org.slf4j.LoggerFactory;
import java.util.*;
import static org.apache.atlas.model.instance.AtlasEntity.Status.DELETED;
import static org.apache.atlas.repository.Constants.CLASSIFICATION_LABEL;
import static org.apache.atlas.repository.Constants.PROPAGATED_TRAIT_NAMES_PROPERTY_KEY;
import static org.apache.atlas.repository.graph.GraphHelper.EDGE_LABEL_PREFIX;
import static org.apache.atlas.repository.graph.GraphHelper.addListProperty;
import static org.apache.atlas.repository.graph.GraphHelper.getIncomingEdgesByLabel;
import static org.apache.atlas.repository.graph.GraphHelper.getPropagatedEdgeLabel;
import static org.apache.atlas.repository.graph.GraphHelper.getPropagatedTraitNames;
import static org.apache.atlas.repository.graph.GraphHelper.getPropagatedEdges;
import static org.apache.atlas.repository.graph.GraphHelper.getTraitNames;
import static org.apache.atlas.repository.graph.GraphHelper.getTypeName;
import static org.apache.atlas.repository.graph.GraphHelper.isRelationshipEdge;
import static org.apache.atlas.repository.graph.GraphHelper.string;
......@@ -350,21 +351,20 @@ public abstract class DeleteHandlerV1 {
}
}
public void removeTagPropagation(AtlasVertex classificationVertex) throws AtlasBaseException {
if (classificationVertex != null) {
String classificationName = getTypeName(classificationVertex);
Iterator<AtlasEdge> iterator = getIncomingEdgesByLabel(classificationVertex, getPropagatedEdgeLabel(classificationName));
public List<AtlasVertex> removeTagPropagation(AtlasVertex classificationVertex) throws AtlasBaseException {
List<AtlasVertex> ret = new ArrayList<>();
// remove classification from propagated entity vertices
while (iterator != null && iterator.hasNext()) {
AtlasEdge propagatedEdge = iterator.next();
if (classificationVertex != null) {
String classificationName = getTypeName(classificationVertex);
List<AtlasEdge> propagatedEdges = getPropagatedEdges(classificationVertex);
if (propagatedEdge != null) {
if (CollectionUtils.isNotEmpty(propagatedEdges)) {
for (AtlasEdge propagatedEdge : propagatedEdges) {
AtlasVertex propagatedEntityVertex = propagatedEdge.getOutVertex();
if (LOG.isDebugEnabled()) {
LOG.debug("Removing propagated classification: [{}] from: [{}][{}] with edge label: [{}]", classificationName,
getTypeName(propagatedEntityVertex), GraphHelper.getGuid(propagatedEntityVertex), getPropagatedEdgeLabel(classificationName));
getTypeName(propagatedEntityVertex), GraphHelper.getGuid(propagatedEntityVertex), CLASSIFICATION_LABEL);
}
removePropagatedTraitName(propagatedEntityVertex, classificationName);
......@@ -372,14 +372,18 @@ public abstract class DeleteHandlerV1 {
deleteEdge(propagatedEdge, true);
updateModificationMetadata(propagatedEntityVertex);
ret.add(propagatedEntityVertex);
}
}
}
return ret;
}
private void removePropagatedTraitName(AtlasVertex entityVertex, String classificationName) {
if (entityVertex != null && StringUtils.isNotEmpty(classificationName)) {
List<String> propagatedTraitNames = getPropagatedTraitNames(entityVertex);
List<String> propagatedTraitNames = getTraitNames(entityVertex, true);
propagatedTraitNames.remove(classificationName);
......
......@@ -22,7 +22,6 @@ import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.AtlasException;
import org.apache.atlas.RequestContextV1;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.TimeBoundary;
import org.apache.atlas.model.TypeCategory;
import org.apache.atlas.model.instance.AtlasClassification;
import org.apache.atlas.model.instance.AtlasEntity;
......@@ -74,15 +73,19 @@ import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.DE
import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.PARTIAL_UPDATE;
import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.UPDATE;
import static org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef.Cardinality.SET;
import static org.apache.atlas.repository.Constants.CLASSIFICATION_LABEL;
import static org.apache.atlas.repository.Constants.PROPAGATED_TRAIT_NAMES_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.STATE_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.TRAIT_NAMES_PROPERTY_KEY;
import static org.apache.atlas.repository.graph.GraphHelper.addListProperty;
import static org.apache.atlas.repository.graph.GraphHelper.getPropagatedEdgeLabel;
import static org.apache.atlas.repository.graph.GraphHelper.getClassificationEdge;
import static org.apache.atlas.repository.graph.GraphHelper.getClassificationVertex;
import static org.apache.atlas.repository.graph.GraphHelper.getPropagatedEntities;
import static org.apache.atlas.repository.graph.GraphHelper.getTraitLabel;
import static org.apache.atlas.repository.graph.GraphHelper.getTraitNames;
import static org.apache.atlas.repository.graph.GraphHelper.getTypeName;
import static org.apache.atlas.repository.graph.GraphHelper.getTypeNames;
import static org.apache.atlas.repository.graph.GraphHelper.isPropagationEnabled;
import static org.apache.atlas.repository.graph.GraphHelper.isRelationshipEdge;
import static org.apache.atlas.repository.graph.GraphHelper.string;
import static org.apache.atlas.repository.graph.GraphHelper.updateModificationMetadata;
......@@ -260,7 +263,6 @@ public class EntityGraphMapper {
AtlasGraphUtilsV1.addProperty(ret, Constants.SUPER_TYPES_PROPERTY_KEY, classificationType.getAllSuperTypes());
AtlasGraphUtilsV1.setProperty(ret, Constants.CLASSIFICATION_ENTITY_GUID, classification.getEntityGuid());
AtlasGraphUtilsV1.setProperty(ret, Constants.CLASSIFICATION_PROPAGATE_KEY, classification.isPropagate());
return ret;
}
......@@ -1308,10 +1310,10 @@ public class EntityGraphMapper {
throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, guid);
}
String entityTypeName = AtlasGraphUtilsV1.getTypeName(entityVertex);
final AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entityTypeName);
List<AtlasVertex> propagatedEntityVertices = null;
List<AtlasClassification> propagagedClassifications = null;
final String entityTypeName = AtlasGraphUtilsV1.getTypeName(entityVertex);
final AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entityTypeName);
List<AtlasVertex> entitiesToPropagateTo = null;
Map<AtlasVertex, List<AtlasClassification>> propagations = null;
for (AtlasClassification classification : classifications) {
String classificationName = classification.getTypeName();
......@@ -1338,22 +1340,30 @@ public class EntityGraphMapper {
if (propagateTags) {
// compute propagatedEntityVertices only once
if (propagatedEntityVertices == null) {
propagatedEntityVertices = graphHelper.getImpactedVertices(guid);
if (entitiesToPropagateTo == null) {
entitiesToPropagateTo = graphHelper.getImpactedVertices(guid);
}
if (CollectionUtils.isNotEmpty(propagatedEntityVertices)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Propagating tag: [{}][{}] to {}", classificationName, entityTypeName, getTypeNames(propagatedEntityVertices));
if (CollectionUtils.isNotEmpty(entitiesToPropagateTo)) {
if (propagations == null) {
propagations = new HashMap<>(entitiesToPropagateTo.size());
for (AtlasVertex entityToPropagateTo : entitiesToPropagateTo) {
propagations.put(entityToPropagateTo, new ArrayList<>());
}
}
if (propagagedClassifications == null) {
propagagedClassifications = new ArrayList<>();
if (LOG.isDebugEnabled()) {
LOG.debug("Propagating tag: [{}][{}] to {}", classificationName, entityTypeName, getTypeNames(entitiesToPropagateTo));
}
propagagedClassifications.add(classification);
List<AtlasVertex> entitiesPropagatedTo = addTagPropagation(classificationVertex, entitiesToPropagateTo);
addTagPropagation(classificationVertex, propagatedEntityVertices);
if (entitiesPropagatedTo != null) {
for (AtlasVertex entityPropagatedTo : entitiesPropagatedTo) {
propagations.get(entityPropagatedTo).add(classification);
}
}
} else {
if (LOG.isDebugEnabled()) {
LOG.debug(" --> Not propagating classification: [{}][{}] - no entities found to propagate to.", getTypeName(classificationVertex), entityTypeName);
......@@ -1369,171 +1379,235 @@ public class EntityGraphMapper {
// notify listeners on classification addition
List<AtlasVertex> notificationVertices = new ArrayList<AtlasVertex>() {{ add(entityVertex); }};
if (CollectionUtils.isNotEmpty(propagatedEntityVertices)) {
notificationVertices.addAll(propagatedEntityVertices);
if (CollectionUtils.isNotEmpty(entitiesToPropagateTo)) {
notificationVertices.addAll(entitiesToPropagateTo);
}
for (AtlasVertex vertex : notificationVertices) {
String entityGuid = GraphHelper.getGuid(vertex);
AtlasEntityWithExtInfo entityWithExtInfo = instanceConverter.getAndCacheEntity(entityGuid);
AtlasEntity entity = (entityWithExtInfo != null) ? entityWithExtInfo.getEntity() : null;
List<AtlasClassification> addedClassifications = StringUtils.equals(entityGuid, guid) ? classifications : Collections.emptyList();
List<AtlasClassification> addedClassifications = StringUtils.equals(entityGuid, guid) ? classifications : propagations.get(vertex);
entityChangeNotifier.onClassificationAddedToEntity(entity, addedClassifications);
if (CollectionUtils.isNotEmpty(addedClassifications)) {
entityChangeNotifier.onClassificationAddedToEntity(entity, addedClassifications);
}
}
}
}
public void deleteClassifications(String guid, List<String> classificationNames) throws AtlasBaseException {
if (CollectionUtils.isNotEmpty(classificationNames)) {
AtlasVertex entityVertex = AtlasGraphUtilsV1.findByGuid(guid);
public void deleteClassifications(String entityGuid, List<String> classificationNames) throws AtlasBaseException {
if (CollectionUtils.isEmpty(classificationNames)) {
throw new AtlasBaseException(AtlasErrorCode.INVALID_CLASSIFICATION_PARAMS, "delete", entityGuid);
}
if (entityVertex == null) {
throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, guid);
}
AtlasVertex entityVertex = AtlasGraphUtilsV1.findByGuid(entityGuid);
List<String> traitNames = getTraitNames(entityVertex);
if (entityVertex == null) {
throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, entityGuid);
}
List<String> traitNames = getTraitNames(entityVertex);
validateClassificationExists(traitNames, classificationNames);
if (CollectionUtils.isEmpty(traitNames)) {
throw new AtlasBaseException(AtlasErrorCode.NO_CLASSIFICATIONS_FOUND_FOR_ENTITY, entityGuid);
}
Set<String> impactedEntities = new HashSet<String>() {{ add(guid); }};
validateClassificationExists(traitNames, classificationNames);
for (String classificationName : classificationNames) {
AtlasEdge classificationEdge = graphHelper.getEdgeForLabel(entityVertex, getTraitLabel(classificationName));
AtlasVertex classificationVertex = classificationEdge.getInVertex();
Map<AtlasVertex, List<String>> removedClassifications = new HashMap<>();
// remove classification from propagated entity vertices
boolean propagationEnabled = entityRetriever.isPropagationEnabled(classificationVertex);
for (String classificationName : classificationNames) {
AtlasVertex classificationVertex = getClassificationVertex(entityVertex, classificationName);
if (propagationEnabled) {
List<AtlasVertex> impactedEntityVertices = graphHelper.getPropagatedEntityVerticesFromClassification(classificationVertex);
// remove classification from propagated entities if propagation is turned on
if (isPropagationEnabled(classificationVertex)) {
List<AtlasVertex> impactedVertices = removeTagPropagation(classificationVertex);
if (CollectionUtils.isNotEmpty(impactedEntityVertices)) {
removeTagPropagation(classificationVertex);
if (CollectionUtils.isNotEmpty(impactedVertices)) {
for (AtlasVertex impactedVertex : impactedVertices) {
List<String> classifications = removedClassifications.get(impactedVertex);
for (AtlasVertex impactedEntityVertex : impactedEntityVertices) {
impactedEntities.add(GraphHelper.getGuid(impactedEntityVertex));
if (classifications == null) {
classifications = new ArrayList<>();
removedClassifications.put(impactedVertex, classifications);
}
classifications.add(classificationName);
}
}
}
// remove classification from associated entity vertex
if (LOG.isDebugEnabled()) {
LOG.debug("Removing classification: [{}] from: [{}][{}] with edge label: [{}]", classificationName, getTypeName(entityVertex), guid, getTraitLabel(classificationName));
}
// remove classifications from associated entity
if (LOG.isDebugEnabled()) {
LOG.debug("Removing classification: [{}] from: [{}][{}] with edge label: [{}]", classificationName,
getTypeName(entityVertex), entityGuid, CLASSIFICATION_LABEL);
}
deleteHandler.deleteEdgeReference(classificationEdge, CLASSIFICATION, false, true, entityVertex);
AtlasEdge edge = getClassificationEdge(entityVertex, classificationName);
traitNames.remove(classificationName);
}
deleteHandler.deleteEdgeReference(edge, CLASSIFICATION, false, true, entityVertex);
updateTraitNamesProperty(entityVertex, traitNames);
traitNames.remove(classificationName);
}
updateModificationMetadata(entityVertex);
removedClassifications.put(entityVertex, classificationNames);
for (String entityGuid : impactedEntities) {
AtlasEntityWithExtInfo entityWithExtInfo = instanceConverter.getAndCacheEntity(entityGuid);
AtlasEntity entity = (entityWithExtInfo != null) ? entityWithExtInfo.getEntity() : null;
List<String> deletedClassificationNames = StringUtils.equals(entityGuid, guid) ? classificationNames : Collections.emptyList();
updateTraitNamesProperty(entityVertex, traitNames);
entityChangeNotifier.onClassificationDeletedFromEntity(entity, deletedClassificationNames);
}
updateModificationMetadata(entityVertex);
for (Map.Entry<AtlasVertex, List<String>> entry : removedClassifications.entrySet()) {
String guid = GraphHelper.getGuid(entry.getKey());
List<String> deletedClassificationNames = entry.getValue();
AtlasEntityWithExtInfo entityWithExtInfo = instanceConverter.getAndCacheEntity(guid);
AtlasEntity entity = (entityWithExtInfo != null) ? entityWithExtInfo.getEntity() : null;
entityChangeNotifier.onClassificationDeletedFromEntity(entity, deletedClassificationNames);
}
}
public void updateClassifications(EntityMutationContext context, String guid, List<AtlasClassification> classifications) throws AtlasBaseException {
if (CollectionUtils.isNotEmpty(classifications)) {
AtlasVertex entityVertex = AtlasGraphUtilsV1.findByGuid(guid);
if (CollectionUtils.isEmpty(classifications)) {
throw new AtlasBaseException(AtlasErrorCode.INVALID_CLASSIFICATION_PARAMS, "update", guid);
}
if (entityVertex == null) {
throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, guid);
AtlasVertex entityVertex = AtlasGraphUtilsV1.findByGuid(guid);
if (entityVertex == null) {
throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, guid);
}
String entityTypeName = AtlasGraphUtilsV1.getTypeName(entityVertex);
AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entityTypeName);
List<AtlasClassification> updatedClassifications = new ArrayList<>();
List<AtlasVertex> entitiesToPropagateTo = new ArrayList<>();
Map<AtlasVertex, List<AtlasClassification>> addedPropagations = null;
Map<AtlasVertex, List<String>> removedPropagations = null;
for (AtlasClassification classification : classifications) {
String classificationName = classification.getTypeName();
String classificationEntityGuid = classification.getEntityGuid();
if (StringUtils.isNotEmpty(classificationEntityGuid) && !StringUtils.equalsIgnoreCase(guid, classificationEntityGuid)) {
throw new AtlasBaseException(AtlasErrorCode.CLASSIFICATION_UPDATE_FROM_PROPAGATED_ENTITY, classificationName);
}
String entityTypeName = AtlasGraphUtilsV1.getTypeName(entityVertex);
AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entityTypeName);
List<AtlasClassification> updatedClassifications = new ArrayList<>();
List<AtlasVertex> propagatedEntityVertices = new ArrayList<>();
AtlasVertex classificationVertex = getClassificationVertex(entityVertex, classificationName);
for (AtlasClassification classification : classifications) {
String classificationName = classification.getTypeName();
String classificationEntityGuid = classification.getEntityGuid();
if (classificationVertex == null) {
throw new AtlasBaseException(AtlasErrorCode.CLASSIFICATION_NOT_ASSOCIATED_WITH_ENTITY, classificationName);
}
if (StringUtils.isNotEmpty(classificationEntityGuid) && !StringUtils.equalsIgnoreCase(guid, classificationEntityGuid)) {
throw new AtlasBaseException(AtlasErrorCode.CLASSIFICATION_UPDATE_FROM_PROPAGATED_ENTITY, classificationName);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Updating classification {} for entity {}", classification, guid);
}
String relationshipLabel = getTraitLabel(entityTypeName, classificationName);
AtlasEdge classificationEdge = graphHelper.getEdgeForLabel(entityVertex, relationshipLabel);
AtlasClassification currentClassification = entityRetriever.toAtlasClassification(classificationVertex);
if (classificationEdge == null) {
throw new AtlasBaseException(AtlasErrorCode.CLASSIFICATION_NOT_ASSOCIATED_WITH_ENTITY, classificationName);
}
validateAndNormalizeForUpdate(classification);
if (LOG.isDebugEnabled()) {
LOG.debug("Updating classification {} for entity {}", classification, guid);
Map<String, Object> classificationAttributes = classification.getAttributes();
if (MapUtils.isNotEmpty(classificationAttributes)) {
for (String attributeName : classificationAttributes.keySet()) {
currentClassification.setAttribute(attributeName, classificationAttributes.get(attributeName));
}
}
AtlasVertex classificationVertex = classificationEdge.getInVertex();
AtlasClassification currentClassification = entityRetriever.toAtlasClassification(classificationVertex);
if (LOG.isDebugEnabled()) {
LOG.debug("updating vertex {} for trait {}", string(classificationVertex), classificationName);
}
validateAndNormalizeForUpdate(classification);
mapClassification(EntityOperation.UPDATE, context, classification, entityType, entityVertex, classificationVertex);
Map<String, Object> classificationAttributes = classification.getAttributes();
// handle update of 'propagate' flag
boolean currentTagPropagation = currentClassification.isPropagate();
boolean updatedTagPropagation = classification.isPropagate();
if (MapUtils.isNotEmpty(classificationAttributes)) {
for (String attributeName : classificationAttributes.keySet()) {
currentClassification.setAttribute(attributeName, classificationAttributes.get(attributeName));
// compute propagatedEntityVertices once and use it for subsequent iterations and notifications
if (currentTagPropagation != updatedTagPropagation) {
if (updatedTagPropagation) {
if (CollectionUtils.isEmpty(entitiesToPropagateTo)) {
entitiesToPropagateTo = graphHelper.getImpactedVertices(guid);
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("updating vertex {} for trait {}", string(classificationVertex), classificationName);
}
mapClassification(EntityOperation.UPDATE, context, classification, entityType, entityVertex, classificationVertex);
if (CollectionUtils.isNotEmpty(entitiesToPropagateTo)) {
if (addedPropagations == null) {
addedPropagations = new HashMap<>(entitiesToPropagateTo.size());
// handle update of 'propagate' flag
boolean currentTagPropagation = currentClassification.isPropagate();
boolean updatedTagPropagation = classification.isPropagate();
for (AtlasVertex entityToPropagateTo : entitiesToPropagateTo) {
addedPropagations.put(entityToPropagateTo, new ArrayList<>());
}
}
// compute propagatedEntityVertices once and use it for subsequent iterations and notifications
if (CollectionUtils.isEmpty(propagatedEntityVertices)) {
propagatedEntityVertices = (currentTagPropagation) ? graphHelper.getPropagatedEntityVerticesFromClassification(classificationVertex) :
graphHelper.getImpactedVertices(guid);
}
List<AtlasVertex> entitiesPropagatedTo = addTagPropagation(classificationVertex, entitiesToPropagateTo);
if (currentTagPropagation != updatedTagPropagation) {
if (updatedTagPropagation) {
addTagPropagation(classificationVertex, propagatedEntityVertices);
} else {
removeTagPropagation(classificationVertex);
if (entitiesPropagatedTo != null) {
for (AtlasVertex entityPropagatedTo : entitiesPropagatedTo) {
addedPropagations.get(entityPropagatedTo).add(classification);
}
}
}
} else {
List<AtlasVertex> impactedVertices = removeTagPropagation(classificationVertex);
AtlasGraphUtilsV1.setProperty(classificationVertex, Constants.CLASSIFICATION_PROPAGATE_KEY, updatedTagPropagation);
}
if (CollectionUtils.isNotEmpty(impactedVertices)) {
if (removedPropagations == null) {
removedPropagations = new HashMap<>();
updatedClassifications.add(currentClassification);
}
for (AtlasVertex impactedVertex : impactedVertices) {
List<String> removedClassifications = removedPropagations.get(impactedVertex);
// notify listeners on classification update
List<AtlasVertex> notificationVertices = new ArrayList<AtlasVertex>() {{ add(entityVertex); }};
if (removedClassifications == null) {
removedClassifications = new ArrayList<>();
removedPropagations.put(impactedVertex, removedClassifications);
}
if (CollectionUtils.isNotEmpty(propagatedEntityVertices)) {
notificationVertices.addAll(propagatedEntityVertices);
removedClassifications.add(classification.getTypeName());
}
}
}
}
}
for (AtlasVertex vertex : notificationVertices) {
String entityGuid = GraphHelper.getGuid(vertex);
AtlasEntityWithExtInfo entityWithExtInfo = instanceConverter.getAndCacheEntity(entityGuid);
AtlasEntity entity = (entityWithExtInfo != null) ? entityWithExtInfo.getEntity() : null;
List<AtlasClassification> updatedClassificationList = StringUtils.equals(entityGuid, guid) ? updatedClassifications : Collections.emptyList();
updatedClassifications.add(currentClassification);
}
// notify listeners on classification update
List<AtlasVertex> notificationVertices = new ArrayList<AtlasVertex>() {{ add(entityVertex); }};
if (CollectionUtils.isNotEmpty(entitiesToPropagateTo)) {
notificationVertices.addAll(entitiesToPropagateTo);
}
for (AtlasVertex vertex : notificationVertices) {
String entityGuid = GraphHelper.getGuid(vertex);
AtlasEntityWithExtInfo entityWithExtInfo = instanceConverter.getAndCacheEntity(entityGuid);
AtlasEntity entity = (entityWithExtInfo != null) ? entityWithExtInfo.getEntity() : null;
List<AtlasClassification> updatedClassificationList = StringUtils.equals(entityGuid, guid) ? updatedClassifications : Collections.emptyList();
entityChangeNotifier.onClassificationUpdatedToEntity(entity, updatedClassificationList);
entityChangeNotifier.onClassificationUpdatedToEntity(entity, updatedClassificationList);
}
if (removedPropagations != null) {
for (Map.Entry<AtlasVertex, List<String>> entry : removedPropagations.entrySet()) {
AtlasVertex vertex = entry.getKey();
List<String> removedClassifications = entry.getValue();
String entityGuid = GraphHelper.getGuid(vertex);
AtlasEntityWithExtInfo entityWithExtInfo = instanceConverter.getAndCacheEntity(entityGuid);
AtlasEntity entity = (entityWithExtInfo != null) ? entityWithExtInfo.getEntity() : null;
entityChangeNotifier.onClassificationDeletedFromEntity(entity, removedClassifications);
}
}
}
private void addTagPropagation(AtlasVertex classificationVertex, List<AtlasVertex> propagatedEntityVertices) {
private List<AtlasVertex> addTagPropagation(AtlasVertex classificationVertex, List<AtlasVertex> propagatedEntityVertices) {
List<AtlasVertex> ret = null;
if (CollectionUtils.isNotEmpty(propagatedEntityVertices) && classificationVertex != null) {
String classificationName = getTypeName(classificationVertex);
AtlasClassificationType classificationType = typeRegistry.getClassificationTypeByName(classificationName);
......@@ -1545,19 +1619,27 @@ public class EntityGraphMapper {
if (classificationType.canApplyToEntityType(entityType)) {
if (LOG.isDebugEnabled()) {
LOG.debug(" --> Adding propagated classification: [{}] to {} ({}) using edge label: [{}]", classificationName, getTypeName(propagatedEntityVertex),
GraphHelper.getGuid(propagatedEntityVertex), getPropagatedEdgeLabel(classificationName));
GraphHelper.getGuid(propagatedEntityVertex), CLASSIFICATION_LABEL);
}
graphHelper.addEdge(propagatedEntityVertex, classificationVertex, getPropagatedEdgeLabel(classificationName));
if (ret == null) {
ret = new ArrayList<>();
}
ret.add(propagatedEntityVertex);
graphHelper.addClassificationEdge(propagatedEntityVertex, classificationVertex, true);
addListProperty(propagatedEntityVertex, PROPAGATED_TRAIT_NAMES_PROPERTY_KEY, classificationName);
}
}
}
return ret;
}
private void removeTagPropagation(AtlasVertex classificationVertex) throws AtlasBaseException {
deleteHandler.removeTagPropagation(classificationVertex);
private List<AtlasVertex> removeTagPropagation(AtlasVertex classificationVertex) throws AtlasBaseException {
return deleteHandler.removeTagPropagation(classificationVertex);
}
private AtlasEdge mapClassification(EntityOperation operation, final EntityMutationContext context, AtlasClassification classification,
......@@ -1571,17 +1653,18 @@ public class EntityGraphMapper {
// if 'null', don't update existing value in the classification
}
AtlasGraphUtilsV1.setProperty(traitInstanceVertex, Constants.CLASSIFICATION_VERTEX_PROPAGATE_KEY, classification.isPropagate());
// map all the attributes to this newly created AtlasVertex
mapAttributes(classification, traitInstanceVertex, operation, context);
// add an edge to the newly created AtlasVertex from the parent
String relationshipLabel = getTraitLabel(entityType.getTypeName(), classification.getTypeName());
AtlasEdge ret = getClassificationEdge(parentInstanceVertex, getTypeName(traitInstanceVertex));
try {
return graphHelper.getOrCreateEdge(parentInstanceVertex, traitInstanceVertex, relationshipLabel);
} catch (RepositoryException e) {
throw new AtlasBaseException(AtlasErrorCode.INTERNAL_ERROR, e);
if (ret == null) {
ret = graphHelper.addClassificationEdge(parentInstanceVertex, traitInstanceVertex, false);
}
return ret;
}
public void deleteClassifications(String guid) throws AtlasBaseException {
......
......@@ -63,7 +63,6 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
......@@ -72,6 +71,7 @@ import java.util.Set;
import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.*;
import static org.apache.atlas.repository.Constants.*;
import static org.apache.atlas.repository.graph.GraphHelper.EDGE_LABEL_PREFIX;
import static org.apache.atlas.repository.graph.GraphHelper.addListProperty;
import static org.apache.atlas.repository.graph.GraphHelper.edgeExists;
import static org.apache.atlas.repository.graph.GraphHelper.getAdjacentEdgesByLabel;
import static org.apache.atlas.repository.graph.GraphHelper.getAllTraitNames;
......@@ -80,12 +80,9 @@ import static org.apache.atlas.repository.graph.GraphHelper.getGuid;
import static org.apache.atlas.repository.graph.GraphHelper.getIncomingEdgesByLabel;
import static org.apache.atlas.repository.graph.GraphHelper.getOutGoingEdgesByLabel;
import static org.apache.atlas.repository.graph.GraphHelper.getPropagateTags;
import static org.apache.atlas.repository.graph.GraphHelper.getPropagatedEdgeLabel;
import static org.apache.atlas.repository.graph.GraphHelper.getPropagatedTraitNames;
import static org.apache.atlas.repository.graph.GraphHelper.getRelationshipGuid;
import static org.apache.atlas.repository.graph.GraphHelper.getTraitLabel;
import static org.apache.atlas.repository.graph.GraphHelper.getTraitNames;
import static org.apache.atlas.repository.graph.GraphHelper.getTypeName;
import static org.apache.atlas.repository.graph.GraphHelper.isPropagationEnabled;
import static org.apache.atlas.repository.graph.GraphHelper.removePropagatedTraitNameFromVertex;
import static org.apache.atlas.repository.store.graph.v1.AtlasGraphUtilsV1.getIdFromVertex;
import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection;
......@@ -181,7 +178,7 @@ public final class EntityGraphRetriever {
public AtlasEntityHeader toAtlasEntityHeaderWithClassifications(AtlasVertex entityVertex, Set<String> attributes) throws AtlasBaseException {
AtlasEntityHeader ret = toAtlasEntityHeader(entityVertex, attributes);
ret.setClassifications(getClassifications(entityVertex));
ret.setClassifications(getAllClassifications(entityVertex));
return ret;
}
......@@ -446,120 +443,18 @@ public final class EntityGraphRetriever {
}
}
public boolean isPropagationEnabled(AtlasVertex classificationVertex) {
boolean ret = false;
if (classificationVertex != null) {
Boolean enabled = AtlasGraphUtilsV1.getProperty(classificationVertex, CLASSIFICATION_PROPAGATE_KEY, Boolean.class);
ret = enabled == null ? true : enabled;
}
return ret;
}
public List<AtlasClassification> getClassifications(String guid) throws AtlasBaseException {
AtlasVertex instanceVertex = AtlasGraphUtilsV1.findByGuid(guid);
if (instanceVertex == null) {
throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, guid);
}
return getClassifications(instanceVertex);
}
public List<AtlasClassification> getClassifications(AtlasVertex instanceVertex) throws AtlasBaseException {
final List<AtlasClassification> classifications = getClassifications(instanceVertex, null);
final List<AtlasClassification> propagatedClassifications = getPropagatedClassifications(instanceVertex, null);
classifications.addAll(propagatedClassifications);
return classifications;
}
public AtlasClassification getClassification(String guid, String classificationName) throws AtlasBaseException {
AtlasVertex instanceVertex = AtlasGraphUtilsV1.findByGuid(guid);
if (instanceVertex == null) {
throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, guid);
}
List<AtlasClassification> classifications = null;
try {
classifications = getClassifications(instanceVertex, classificationName);
} catch (AtlasBaseException excp) {
// ignore and look for propagated classifications
classifications = getPropagatedClassifications(instanceVertex, classificationName);
}
if(CollectionUtils.isEmpty(classifications)) {
throw new AtlasBaseException(AtlasErrorCode.CLASSIFICATION_NOT_FOUND, classificationName);
}
return classifications.get(0);
}
private List<AtlasClassification> getClassifications(AtlasVertex instanceVertex, String classificationNameFilter) throws AtlasBaseException {
List<AtlasClassification> ret = new ArrayList<>();
List<String> classificationNames = getTraitNames(instanceVertex);
if (CollectionUtils.isNotEmpty(classificationNames)) {
if (StringUtils.isNotEmpty(classificationNameFilter)) {
if (classificationNames.contains(classificationNameFilter)) {
ret.add(getClassification(instanceVertex, classificationNameFilter));
}
} else {
for (String classificationName : classificationNames) {
ret.add(getClassification(instanceVertex, classificationName));
}
}
}
if (ret.isEmpty() && StringUtils.isNotEmpty(classificationNameFilter)) {
throw new AtlasBaseException(AtlasErrorCode.CLASSIFICATION_NOT_FOUND, classificationNameFilter);
}
return ret;
}
private List<AtlasClassification> getPropagatedClassifications(AtlasVertex instanceVertex, String classificationNameFilter) throws AtlasBaseException {
List<AtlasClassification> ret = new ArrayList<>();
List<String> classificationNames = getPropagatedTraitNames(instanceVertex);
if (CollectionUtils.isNotEmpty(classificationNames)) {
if (StringUtils.isNotEmpty(classificationNameFilter)) {
if (classificationNames.contains(classificationNameFilter)) {
ret.addAll(getAllPropagatedClassifications(instanceVertex, classificationNameFilter));
}
} else {
for (String classificationName : new HashSet<>(classificationNames)) {
ret.addAll(getAllPropagatedClassifications(instanceVertex, classificationName));
}
}
}
if (ret.isEmpty() && StringUtils.isNotEmpty(classificationNameFilter)) {
throw new AtlasBaseException(AtlasErrorCode.CLASSIFICATION_NOT_FOUND, classificationNameFilter);
}
return ret;
}
private List<AtlasClassification> getAllPropagatedClassifications(AtlasVertex vertex, String classificationName) throws AtlasBaseException {
List<AtlasClassification> ret = new ArrayList<>();
String edgeLabel = getPropagatedEdgeLabel(classificationName);
Iterable<AtlasEdge> edges = vertex.getEdges(AtlasEdgeDirection.OUT, edgeLabel);
public List<AtlasClassification> getAllClassifications(AtlasVertex entityVertex) throws AtlasBaseException {
List<AtlasClassification> ret = new ArrayList<>();
Iterable edges = entityVertex.query().direction(AtlasEdgeDirection.OUT).label(CLASSIFICATION_LABEL).edges();
if (edges != null) {
for (Iterator<AtlasEdge> iterator = edges.iterator(); iterator.hasNext(); ) {
Iterator<AtlasEdge> iterator = edges.iterator();
while (iterator.hasNext()) {
AtlasEdge edge = iterator.next();
if (edge != null) {
AtlasClassification classification = toAtlasClassification(edge.getInVertex());
ret.add(classification);
ret.add(toAtlasClassification(edge.getInVertex()));
}
}
}
......@@ -567,42 +462,21 @@ public final class EntityGraphRetriever {
return ret;
}
protected List<AtlasVertex> getPropagationEnabledClassificationVertices(AtlasVertex instanceVertex) {
List<AtlasVertex> ret = new ArrayList<>();
List<AtlasVertex> classificationVertices = getPropagationEnabledClassificationVertices(instanceVertex, false);
List<AtlasVertex> propagatedClassificationVertices = getPropagationEnabledClassificationVertices(instanceVertex, true);
protected List<AtlasVertex> getPropagationEnabledClassificationVertices(AtlasVertex entityVertex) {
List<AtlasVertex> ret = new ArrayList<>();
Iterable edges = entityVertex.query().direction(AtlasEdgeDirection.OUT).label(CLASSIFICATION_LABEL).edges();
if (CollectionUtils.isNotEmpty(classificationVertices)) {
ret.addAll(classificationVertices);
}
if (CollectionUtils.isNotEmpty(propagatedClassificationVertices)) {
ret.addAll(propagatedClassificationVertices);
}
return ret;
}
private List<AtlasVertex> getPropagationEnabledClassificationVertices(AtlasVertex vertex, boolean propagated) {
List<AtlasVertex> ret = new ArrayList<>();
List<String> classificationNames = (propagated) ? getPropagatedTraitNames(vertex) : getTraitNames(vertex);
if (CollectionUtils.isNotEmpty(classificationNames)) {
for (String classificationName : classificationNames) {
String traitLabel = (propagated) ? getPropagatedEdgeLabel(classificationName) : getTraitLabel(classificationName);
Iterable<AtlasEdge> edges = vertex.getEdges(AtlasEdgeDirection.OUT, traitLabel);
if (edges != null) {
Iterator<AtlasEdge> iterator = edges.iterator();
if (edges != null) {
for (Iterator<AtlasEdge> iterator = edges.iterator(); iterator.hasNext(); ) {
AtlasEdge edge = iterator.next();
while (iterator.hasNext()) {
AtlasEdge edge = iterator.next();
if (edge != null) {
AtlasVertex classificationVertex = edge.getInVertex();
if (edge != null) {
AtlasVertex classificationVertex = edge.getInVertex();
if (isPropagationEnabled(classificationVertex)) {
ret.add(classificationVertex);
}
}
if (isPropagationEnabled(classificationVertex)) {
ret.add(classificationVertex);
}
}
}
......@@ -611,32 +485,10 @@ public final class EntityGraphRetriever {
return ret;
}
public AtlasClassification getClassification(AtlasVertex instanceVertex, String classificationName) throws AtlasBaseException {
AtlasClassification ret = getClassification(instanceVertex, classificationName, false);
// if no classification with the given name was directly associated, look for a propagated classification
if (ret == null) {
ret = getClassification(instanceVertex, classificationName, true);
}
return ret;
}
private AtlasClassification getClassification(AtlasVertex instanceVertex, String classificationName, boolean propagated) throws AtlasBaseException {
String traitLabel = (propagated) ? getPropagatedEdgeLabel(classificationName) : getTraitLabel(classificationName);
Iterable<AtlasEdge> edges = instanceVertex.getEdges(AtlasEdgeDirection.OUT, traitLabel);
AtlasEdge edge = (edges != null && edges.iterator().hasNext()) ? edges.iterator().next() : null;
AtlasClassification ret = edge != null ? toAtlasClassification(edge.getInVertex()) : null;
return ret;
}
private void mapClassifications(AtlasVertex entityVertex, AtlasEntity entity) throws AtlasBaseException {
final List<AtlasClassification> classifications = getClassifications(entityVertex, null);
final List<AtlasClassification> propagatedClassifications = getPropagatedClassifications(entityVertex, null);
final List<AtlasClassification> classifications = getAllClassifications(entityVertex);
entity.setClassifications(classifications);
entity.addClassifications(propagatedClassifications);
}
private Object mapVertexToAttribute(AtlasVertex entityVertex, AtlasAttribute attribute, AtlasEntityExtInfo entityExtInfo) throws AtlasBaseException {
......@@ -1107,7 +959,6 @@ public final class EntityGraphRetriever {
for (AtlasVertex classificationVertex : classificationVertices) {
String classificationName = getTypeName(classificationVertex);
String propagatedEdgeLabel = getPropagatedEdgeLabel(classificationName);
AtlasVertex associatedEntityVertex = getAssociatedEntityVertex(classificationVertex);
AtlasClassificationType classificationType = typeRegistry.getClassificationTypeByName(classificationName);
......@@ -1119,10 +970,10 @@ public final class EntityGraphRetriever {
}
continue;
} else if (edgeExists(impactedEntityVertex, classificationVertex, propagatedEdgeLabel)) {
} else if (edgeExists(impactedEntityVertex, classificationVertex, CLASSIFICATION_LABEL)) {
if (LOG.isDebugEnabled()) {
LOG.debug(" --> Propagated classification edge already exists from [{}] --> [{}][{}] using edge label: [{}]",
getTypeName(impactedEntityVertex), getTypeName(classificationVertex), getTypeName(associatedEntityVertex), propagatedEdgeLabel);
getTypeName(impactedEntityVertex), getTypeName(classificationVertex), getTypeName(associatedEntityVertex), CLASSIFICATION_LABEL);
}
continue;
......@@ -1142,12 +993,12 @@ public final class EntityGraphRetriever {
if (LOG.isDebugEnabled()) {
LOG.debug(" --> Creating propagated classification edge from [{}] --> [{}][{}] using edge label: [{}]",
getTypeName(impactedEntityVertex), getTypeName(classificationVertex), getTypeName(associatedEntityVertex), propagatedEdgeLabel);
getTypeName(impactedEntityVertex), getTypeName(classificationVertex), getTypeName(associatedEntityVertex), CLASSIFICATION_LABEL);
}
graphHelper.addEdge(impactedEntityVertex, classificationVertex, propagatedEdgeLabel);
graphHelper.addClassificationEdge(impactedEntityVertex, classificationVertex, true);
GraphHelper.addListProperty(impactedEntityVertex, PROPAGATED_TRAIT_NAMES_PROPERTY_KEY, classificationName);
addListProperty(impactedEntityVertex, PROPAGATED_TRAIT_NAMES_PROPERTY_KEY, classificationName);
}
}
}
......@@ -1164,19 +1015,18 @@ public final class EntityGraphRetriever {
for (AtlasVertex classificationVertex : classificationVertices) {
String classificationName = getTypeName(classificationVertex);
String propagatedEdgeLabel = getPropagatedEdgeLabel(classificationName);
AtlasVertex associatedEntityVertex = getAssociatedEntityVertex(classificationVertex);
List<AtlasVertex> referrals = graphHelper.getIncludedImpactedVerticesWithReferences(associatedEntityVertex, getRelationshipGuid(edge));
List<AtlasVertex> referrals = graphHelper.getIncludedImpactedVerticesWithReferences(associatedEntityVertex, getRelationshipGuid(edge));
for (AtlasVertex impactedEntityVertex : impactedEntityVertices) {
if (referrals.contains(impactedEntityVertex)) {
if (LOG.isDebugEnabled()) {
if (StringUtils.equals(getGuid(impactedEntityVertex), getGuid(associatedEntityVertex))) {
LOG.debug(" --> Not removing propagated classification edge from [{}] --> [{}][{}] with edge label: [{}], since [{}] is associated with [{}]",
getTypeName(impactedEntityVertex), getTypeName(classificationVertex), getTypeName(associatedEntityVertex), propagatedEdgeLabel, classificationName, getTypeName(associatedEntityVertex));
getTypeName(impactedEntityVertex), getTypeName(classificationVertex), getTypeName(associatedEntityVertex), CLASSIFICATION_LABEL, classificationName, getTypeName(associatedEntityVertex));
} else {
LOG.debug(" --> Not removing propagated classification edge from [{}] --> [{}][{}] with edge label: [{}], since [{}] is propagated through other path",
getTypeName(impactedEntityVertex), getTypeName(classificationVertex), getTypeName(associatedEntityVertex), propagatedEdgeLabel, classificationName);
getTypeName(impactedEntityVertex), getTypeName(classificationVertex), getTypeName(associatedEntityVertex), CLASSIFICATION_LABEL, classificationName);
}
}
......@@ -1184,14 +1034,14 @@ public final class EntityGraphRetriever {
}
// remove propagated classification edge and classificationName from propagatedTraitNames vertex property
if (edgeExists(impactedEntityVertex, classificationVertex, propagatedEdgeLabel)) {
if (edgeExists(impactedEntityVertex, classificationVertex, CLASSIFICATION_LABEL)) {
try {
if (LOG.isDebugEnabled()) {
LOG.debug(" --> Removing propagated classification edge from [{}] --> [{}][{}] with edge label: [{}]",
getTypeName(impactedEntityVertex), getTypeName(classificationVertex), getTypeName(associatedEntityVertex), propagatedEdgeLabel);
getTypeName(impactedEntityVertex), getTypeName(classificationVertex), getTypeName(associatedEntityVertex), CLASSIFICATION_LABEL);
}
AtlasEdge propagatedEdge = graphHelper.getOrCreateEdge(impactedEntityVertex, classificationVertex, propagatedEdgeLabel);
AtlasEdge propagatedEdge = graphHelper.getOrCreateEdge(impactedEntityVertex, classificationVertex, CLASSIFICATION_LABEL);
graphHelper.removeEdge(propagatedEdge);
......@@ -1202,7 +1052,7 @@ public final class EntityGraphRetriever {
} else {
if (LOG.isDebugEnabled()) {
LOG.debug(" --> Not removing propagated classification edge from [{}] --> [{}][{}] using edge label: [{}], since edge doesn't exist",
getTypeName(impactedEntityVertex), getTypeName(classificationVertex), getTypeName(associatedEntityVertex), propagatedEdgeLabel);
getTypeName(impactedEntityVertex), getTypeName(classificationVertex), getTypeName(associatedEntityVertex), CLASSIFICATION_LABEL);
}
}
}
......
......@@ -36,13 +36,13 @@ import static org.testng.Assert.fail;
public class GremlinQueryComposerTest {
@Test
public void classification() {
String expected = "g.V().or(has('__traitNames', within('PII')), has('__propagatedTraitNames', within('PII'))).dedup().limit(25).toList()";
String expected = "g.V().outE('classifiedAs').has('__name', within('PII')).outV().dedup().limit(25).toList()";
verify("PII", expected);
}
@Test()
public void dimension() {
String expected = "g.V().has('__typeName', 'Table').or(has('__traitNames', within('Dimension')), has('__propagatedTraitNames', within('Dimension'))).dedup().limit(25).toList()";
String expected = "g.V().has('__typeName', 'Table').outE('classifiedAs').has('__name', within('Dimension')).outV().dedup().limit(25).toList()";
verify("Table isa Dimension", expected);
verify("Table is Dimension", expected);
verify("Table where Table is Dimension", expected);
......@@ -295,14 +295,14 @@ public class GremlinQueryComposerTest {
@Test
public void keywordsInWhereClause() {
verify("Table as t where t has name and t isa Dimension",
"g.V().has('__typeName', 'Table').as('t').and(__.has('Table.name'),__.or(has('__traitNames', within('Dimension')), has('__propagatedTraitNames', within('Dimension')))).dedup().limit(25).toList()");
"g.V().has('__typeName', 'Table').as('t').and(__.has('Table.name'),__.outE('classifiedAs').has('__name', within('Dimension')).outV()).dedup().limit(25).toList()");
verify("Table as t where t has name and t.name = 'sales_fact'",
"g.V().has('__typeName', 'Table').as('t').and(__.has('Table.name'),__.has('Table.name', eq('sales_fact'))).dedup().limit(25).toList()");
verify("Table as t where t is Dimension and t.name = 'sales_fact'",
"g.V().has('__typeName', 'Table').as('t').and(__.or(has('__traitNames', within('Dimension')), has('__propagatedTraitNames', within('Dimension'))),__.has('Table.name', eq('sales_fact'))).dedup().limit(25).toList()");
verify("Table isa 'Dimension' and name = 'sales_fact'", "g.V().has('__typeName', 'Table').and(__.or(has('__traitNames', within('Dimension')), has('__propagatedTraitNames', within('Dimension'))),__.has('Table.name', eq('sales_fact'))).dedup().limit(25).toList()");
"g.V().has('__typeName', 'Table').as('t').and(__.outE('classifiedAs').has('__name', within('Dimension')).outV(),__.has('Table.name', eq('sales_fact'))).dedup().limit(25).toList()");
verify("Table isa 'Dimension' and name = 'sales_fact'", "g.V().has('__typeName', 'Table').and(__.outE('classifiedAs').has('__name', within('Dimension')).outV(),__.has('Table.name', eq('sales_fact'))).dedup().limit(25).toList()");
verify("Table has name and name = 'sales_fact'", "g.V().has('__typeName', 'Table').and(__.has('Table.name'),__.has('Table.name', eq('sales_fact'))).dedup().limit(25).toList()");
verify("Table is 'Dimension' and Table has owner and name = 'sales_fact'", "g.V().has('__typeName', 'Table').and(__.or(has('__traitNames', within('Dimension')), has('__propagatedTraitNames', within('Dimension'))),__.has('Table.owner'),__.has('Table.name', eq('sales_fact'))).dedup().limit(25).toList()");
verify("Table is 'Dimension' and Table has owner and name = 'sales_fact'", "g.V().has('__typeName', 'Table').and(__.outE('classifiedAs').has('__name', within('Dimension')).outV(),__.has('Table.owner'),__.has('Table.name', eq('sales_fact'))).dedup().limit(25).toList()");
verify("Table has name and Table has owner and name = 'sales_fact'", "g.V().has('__typeName', 'Table').and(__.has('Table.name'),__.has('Table.owner'),__.has('Table.name', eq('sales_fact'))).dedup().limit(25).toList()");
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment