Commit 25f3002e by Ashutosh Mestry

ATLAS-3762: Improve Edge creator using Genuine iterator.

parent 204275cc
...@@ -45,6 +45,16 @@ public interface AtlasVertex<V, E> extends AtlasElement { ...@@ -45,6 +45,16 @@ public interface AtlasVertex<V, E> extends AtlasElement {
*/ */
Iterable<AtlasEdge<V, E>> getEdges(AtlasEdgeDirection direction, String[] edgeLabels); Iterable<AtlasEdge<V, E>> getEdges(AtlasEdgeDirection direction, String[] edgeLabels);
long getEdgesCount(AtlasEdgeDirection direction, String edgeLabel);
/**
* Does vertex have edges specified by the direction and label
* @param dir
* @param edgeLabel
* @return
*/
boolean hasEdges(AtlasEdgeDirection dir, String edgeLabel);
/** /**
* Gets the edges associated with this vertex going the * Gets the edges associated with this vertex going the
* specified direction. * specified direction.
......
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
package org.apache.atlas.repository.graphdb.janus; package org.apache.atlas.repository.graphdb.janus;
import com.google.common.base.Function; import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import org.apache.atlas.ApplicationProperties; import org.apache.atlas.ApplicationProperties;
...@@ -25,7 +26,19 @@ import org.apache.atlas.AtlasErrorCode; ...@@ -25,7 +26,19 @@ import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.AtlasException; import org.apache.atlas.AtlasException;
import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.groovy.GroovyExpression; import org.apache.atlas.groovy.GroovyExpression;
import org.apache.atlas.repository.graphdb.*; import org.apache.atlas.repository.graphdb.AtlasEdge;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasGraphIndexClient;
import org.apache.atlas.repository.graphdb.AtlasGraphManagement;
import org.apache.atlas.repository.graphdb.AtlasGraphQuery;
import org.apache.atlas.repository.graphdb.AtlasGraphTraversal;
import org.apache.atlas.repository.graphdb.AtlasIndexQuery;
import org.apache.atlas.repository.graphdb.AtlasIndexQueryParameter;
import org.apache.atlas.repository.graphdb.AtlasPropertyKey;
import org.apache.atlas.repository.graphdb.AtlasSchemaViolationException;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.graphdb.GraphIndexQueryParameters;
import org.apache.atlas.repository.graphdb.GremlinVersion;
import org.apache.atlas.repository.graphdb.janus.query.AtlasJanusGraphQuery; import org.apache.atlas.repository.graphdb.janus.query.AtlasJanusGraphQuery;
import org.apache.atlas.repository.graphdb.utils.IteratorToIterableAdapter; import org.apache.atlas.repository.graphdb.utils.IteratorToIterableAdapter;
import org.apache.atlas.type.AtlasType; import org.apache.atlas.type.AtlasType;
...@@ -62,9 +75,12 @@ import javax.script.ScriptEngine; ...@@ -62,9 +75,12 @@ import javax.script.ScriptEngine;
import javax.script.ScriptException; import javax.script.ScriptException;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
import java.util.*; import java.util.Collection;
import java.util.stream.Collectors; import java.util.HashSet;
import java.util.stream.StreamSupport; import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.apache.atlas.repository.Constants.INDEX_SEARCH_VERTEX_PREFIX_DEFAULT; import static org.apache.atlas.repository.Constants.INDEX_SEARCH_VERTEX_PREFIX_DEFAULT;
import static org.apache.atlas.repository.Constants.INDEX_SEARCH_VERTEX_PREFIX_PROPERTY; import static org.apache.atlas.repository.Constants.INDEX_SEARCH_VERTEX_PREFIX_PROPERTY;
...@@ -403,7 +419,11 @@ public class AtlasJanusGraph implements AtlasGraph<AtlasJanusVertex, AtlasJanusE ...@@ -403,7 +419,11 @@ public class AtlasJanusGraph implements AtlasGraph<AtlasJanusVertex, AtlasJanusE
} }
public Iterable<AtlasVertex<AtlasJanusVertex, AtlasJanusEdge>> wrapVertices(Iterable<? extends Vertex> it) { public Iterable<AtlasVertex<AtlasJanusVertex, AtlasJanusEdge>> wrapVertices(Iterable<? extends Vertex> it) {
return StreamSupport.stream(it.spliterator(), false).map(input -> GraphDbObjectFactory.createVertex(AtlasJanusGraph.this, input)).collect(Collectors.toList());
return Iterables.transform(it,
(Function<Vertex, AtlasVertex<AtlasJanusVertex, AtlasJanusEdge>>) input ->
GraphDbObjectFactory.createVertex(AtlasJanusGraph.this, input));
} }
public Iterable<AtlasEdge<AtlasJanusVertex, AtlasJanusEdge>> wrapEdges(Iterator<? extends Edge> it) { public Iterable<AtlasEdge<AtlasJanusVertex, AtlasJanusEdge>> wrapEdges(Iterator<? extends Edge> it) {
...@@ -413,7 +433,11 @@ public class AtlasJanusGraph implements AtlasGraph<AtlasJanusVertex, AtlasJanusE ...@@ -413,7 +433,11 @@ public class AtlasJanusGraph implements AtlasGraph<AtlasJanusVertex, AtlasJanusE
} }
public Iterable<AtlasEdge<AtlasJanusVertex, AtlasJanusEdge>> wrapEdges(Iterable<? extends Edge> it) { public Iterable<AtlasEdge<AtlasJanusVertex, AtlasJanusEdge>> wrapEdges(Iterable<? extends Edge> it) {
return StreamSupport.stream(it.spliterator(), false).map(input -> GraphDbObjectFactory.createEdge(AtlasJanusGraph.this, input)).collect(Collectors.toList());
return Iterables.transform(it,
(Function<Edge, AtlasEdge<AtlasJanusVertex, AtlasJanusEdge>>) input ->
GraphDbObjectFactory.createEdge(AtlasJanusGraph.this, input));
} }
public void addMultiProperties(Set<String> names) { public void addMultiProperties(Set<String> names) {
......
...@@ -20,12 +20,14 @@ package org.apache.atlas.repository.graphdb.janus; ...@@ -20,12 +20,14 @@ package org.apache.atlas.repository.graphdb.janus;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Iterator; import java.util.Iterator;
import java.util.stream.StreamSupport;
import org.apache.atlas.repository.graphdb.AtlasEdge; import org.apache.atlas.repository.graphdb.AtlasEdge;
import org.apache.atlas.repository.graphdb.AtlasEdgeDirection; import org.apache.atlas.repository.graphdb.AtlasEdgeDirection;
import org.apache.atlas.repository.graphdb.AtlasSchemaViolationException; import org.apache.atlas.repository.graphdb.AtlasSchemaViolationException;
import org.apache.atlas.repository.graphdb.AtlasVertex; import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.graphdb.AtlasVertexQuery; import org.apache.atlas.repository.graphdb.AtlasVertexQuery;
import org.apache.atlas.repository.graphdb.utils.IteratorToIterableAdapter;
import org.apache.tinkerpop.gremlin.structure.Direction; import org.apache.tinkerpop.gremlin.structure.Direction;
import org.apache.tinkerpop.gremlin.structure.Edge; import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.Vertex; import org.apache.tinkerpop.gremlin.structure.Vertex;
...@@ -78,6 +80,21 @@ public class AtlasJanusVertex extends AtlasJanusElement<Vertex> implements Atlas ...@@ -78,6 +80,21 @@ public class AtlasJanusVertex extends AtlasJanusElement<Vertex> implements Atlas
return graph.wrapEdges(edges); return graph.wrapEdges(edges);
} }
@Override
public long getEdgesCount(AtlasEdgeDirection dir, String edgeLabel) {
Direction direction = AtlasJanusObjectFactory.createDirection(dir);
Iterator<Edge> it = getWrappedElement().edges(direction, edgeLabel);
IteratorToIterableAdapter<Edge> iterable = new IteratorToIterableAdapter<>(it);
return StreamSupport.stream(iterable.spliterator(), true).count();
}
@Override
public boolean hasEdges(AtlasEdgeDirection dir, String edgeLabel) {
Direction direction = AtlasJanusObjectFactory.createDirection(dir);
Iterator<Edge> edges = getWrappedElement().edges(direction, edgeLabel);
return edges.hasNext();
}
private JanusGraphVertex getAsJanusVertex() { private JanusGraphVertex getAsJanusVertex() {
return (JanusGraphVertex)getWrappedElement(); return (JanusGraphVertex)getWrappedElement();
} }
......
...@@ -35,7 +35,6 @@ import org.apache.atlas.repository.graphdb.AtlasVertexQuery; ...@@ -35,7 +35,6 @@ import org.apache.atlas.repository.graphdb.AtlasVertexQuery;
import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2; import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
import org.apache.atlas.type.AtlasArrayType; import org.apache.atlas.type.AtlasArrayType;
import org.apache.atlas.type.AtlasMapType; import org.apache.atlas.type.AtlasMapType;
import org.apache.atlas.util.AtlasGremlinQueryProvider;
import org.apache.atlas.utils.AtlasPerfMetrics; import org.apache.atlas.utils.AtlasPerfMetrics;
import org.apache.atlas.v1.model.instance.Id; import org.apache.atlas.v1.model.instance.Id;
import org.apache.atlas.v1.model.instance.Referenceable; import org.apache.atlas.v1.model.instance.Referenceable;
...@@ -72,7 +71,6 @@ import java.util.List; ...@@ -72,7 +71,6 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.Set; import java.util.Set;
import java.util.UUID;
import static org.apache.atlas.model.instance.AtlasEntity.Status.ACTIVE; import static org.apache.atlas.model.instance.AtlasEntity.Status.ACTIVE;
import static org.apache.atlas.model.instance.AtlasEntity.Status.DELETED; import static org.apache.atlas.model.instance.AtlasEntity.Status.DELETED;
...@@ -269,6 +267,21 @@ public final class GraphHelper { ...@@ -269,6 +267,21 @@ public final class GraphHelper {
return ret; return ret;
} }
public static long getAdjacentEdgesCountByLabel(AtlasVertex instanceVertex, AtlasEdgeDirection direction, final String edgeLabel) {
AtlasPerfMetrics.MetricRecorder metric = RequestContext.get().startMetricRecord("getAdjacentEdgesCountByLabel");
if (LOG.isDebugEnabled()) {
LOG.debug("Finding edges for {} with label {}", string(instanceVertex), edgeLabel);
}
long ret = 0;
if(instanceVertex != null && edgeLabel != null) {
ret = instanceVertex.getEdgesCount(direction, edgeLabel);
}
RequestContext.get().endMetricRecord(metric);
return ret;
}
public static boolean isPropagationEnabled(AtlasVertex classificationVertex) { public static boolean isPropagationEnabled(AtlasVertex classificationVertex) {
boolean ret = false; boolean ret = false;
...@@ -437,6 +450,14 @@ public final class GraphHelper { ...@@ -437,6 +450,14 @@ public final class GraphHelper {
return getAdjacentEdgesByLabel(instanceVertex, AtlasEdgeDirection.OUT, edgeLabel); return getAdjacentEdgesByLabel(instanceVertex, AtlasEdgeDirection.OUT, edgeLabel);
} }
public static long getOutGoingEdgesCountByLabel(AtlasVertex instanceVertex, String edgeLabel) {
return getAdjacentEdgesCountByLabel(instanceVertex, AtlasEdgeDirection.OUT, edgeLabel);
}
public static long getInComingEdgesCountByLabel(AtlasVertex instanceVertex, String edgeLabel) {
return getAdjacentEdgesCountByLabel(instanceVertex, AtlasEdgeDirection.IN, edgeLabel);
}
public AtlasEdge getEdgeForLabel(AtlasVertex vertex, String edgeLabel, AtlasRelationshipEdgeDirection edgeDirection) { public AtlasEdge getEdgeForLabel(AtlasVertex vertex, String edgeLabel, AtlasRelationshipEdgeDirection edgeDirection) {
AtlasEdge ret; AtlasEdge ret;
......
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
package org.apache.atlas.repository.store.graph.v2; package org.apache.atlas.repository.store.graph.v2;
import org.apache.atlas.AtlasErrorCode; import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.RequestContext;
import org.apache.atlas.annotation.GraphTransaction; import org.apache.atlas.annotation.GraphTransaction;
import org.apache.atlas.authorize.AtlasAuthorizationUtils; import org.apache.atlas.authorize.AtlasAuthorizationUtils;
import org.apache.atlas.authorize.AtlasPrivilege; import org.apache.atlas.authorize.AtlasPrivilege;
...@@ -49,6 +50,7 @@ import org.apache.atlas.type.AtlasRelationshipType; ...@@ -49,6 +50,7 @@ import org.apache.atlas.type.AtlasRelationshipType;
import org.apache.atlas.type.AtlasStructType.AtlasAttribute; import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.type.AtlasTypeUtil; import org.apache.atlas.type.AtlasTypeUtil;
import org.apache.atlas.utils.AtlasPerfMetrics;
import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils; import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
...@@ -67,24 +69,25 @@ import java.util.Map; ...@@ -67,24 +69,25 @@ import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.Set; import java.util.Set;
import java.util.UUID; import java.util.UUID;
import java.util.function.Function;
import static org.apache.atlas.AtlasConfiguration.NOTIFICATION_RELATIONSHIPS_ENABLED;
import static org.apache.atlas.model.instance.AtlasEntity.Status.ACTIVE; import static org.apache.atlas.model.instance.AtlasEntity.Status.ACTIVE;
import static org.apache.atlas.model.instance.AtlasEntity.Status.DELETED; import static org.apache.atlas.model.instance.AtlasEntity.Status.DELETED;
import static org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags.*; import static org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags.BOTH;
import static org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags.NONE;
import static org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags.ONE_TO_TWO;
import static org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags.TWO_TO_ONE;
import static org.apache.atlas.repository.Constants.ENTITY_TYPE_PROPERTY_KEY; import static org.apache.atlas.repository.Constants.ENTITY_TYPE_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.HOME_ID_KEY; import static org.apache.atlas.repository.Constants.HOME_ID_KEY;
import static org.apache.atlas.repository.Constants.PROVENANCE_TYPE_KEY; import static org.apache.atlas.repository.Constants.PROVENANCE_TYPE_KEY;
import static org.apache.atlas.repository.Constants.RELATIONSHIPTYPE_TAG_PROPAGATION_KEY; import static org.apache.atlas.repository.Constants.RELATIONSHIPTYPE_TAG_PROPAGATION_KEY;
import static org.apache.atlas.repository.Constants.RELATIONSHIP_GUID_PROPERTY_KEY; import static org.apache.atlas.repository.Constants.RELATIONSHIP_GUID_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.VERSION_PROPERTY_KEY; import static org.apache.atlas.repository.Constants.VERSION_PROPERTY_KEY;
import static org.apache.atlas.AtlasConfiguration.NOTIFICATION_RELATIONSHIPS_ENABLED;
import static org.apache.atlas.repository.graph.GraphHelper.getBlockedClassificationIds; import static org.apache.atlas.repository.graph.GraphHelper.getBlockedClassificationIds;
import static org.apache.atlas.repository.graph.GraphHelper.getClassificationEntityGuid; import static org.apache.atlas.repository.graph.GraphHelper.getClassificationEntityGuid;
import static org.apache.atlas.repository.graph.GraphHelper.getClassificationName; import static org.apache.atlas.repository.graph.GraphHelper.getClassificationName;
import static org.apache.atlas.repository.graph.GraphHelper.getPropagatableClassifications; import static org.apache.atlas.repository.graph.GraphHelper.getPropagatableClassifications;
import static org.apache.atlas.repository.graph.GraphHelper.getIncomingEdgesByLabel;
import static org.apache.atlas.repository.graph.GraphHelper.getPropagateTags; import static org.apache.atlas.repository.graph.GraphHelper.getPropagateTags;
import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.getState; import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.getState;
import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.getTypeName; import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.getTypeName;
...@@ -776,23 +779,39 @@ public class AtlasRelationshipStoreV2 implements AtlasRelationshipStore { ...@@ -776,23 +779,39 @@ public class AtlasRelationshipStoreV2 implements AtlasRelationshipStore {
} }
public AtlasEdge getRelationshipEdge(AtlasVertex fromVertex, AtlasVertex toVertex, String relationshipLabel) { public AtlasEdge getRelationshipEdge(AtlasVertex fromVertex, AtlasVertex toVertex, String relationshipLabel) {
AtlasPerfMetrics.MetricRecorder metric = RequestContext.get().startMetricRecord("getRelationshipEdge");
AtlasEdge ret = null; AtlasEdge ret = null;
Iterator<AtlasEdge> edgesIterator = getIncomingEdgesByLabel(toVertex, relationshipLabel);
if (toVertex.hasEdges(AtlasEdgeDirection.IN, relationshipLabel) && fromVertex.hasEdges(AtlasEdgeDirection.OUT, relationshipLabel)) {
long fromVertexOutgoingEdgeCount = graphHelper.getOutGoingEdgesCountByLabel(fromVertex, relationshipLabel);
long toVertexIncomingEdgeCount = graphHelper.getInComingEdgesCountByLabel(toVertex, relationshipLabel);
if (toVertexIncomingEdgeCount < fromVertexOutgoingEdgeCount) {
Iterator<AtlasEdge> edgesIteratorIn = graphHelper.getIncomingEdgesByLabel(toVertex, relationshipLabel);
ret = getActiveEdgeFromList(edgesIteratorIn, fromVertex.getId(), e -> e.getOutVertex().getId());
} else {
Iterator<AtlasEdge> edgesIteratorOut = graphHelper.getOutGoingEdgesByLabel(fromVertex, relationshipLabel);
ret = getActiveEdgeFromList(edgesIteratorOut, toVertex.getId(), e -> e.getInVertex().getId());
}
}
RequestContext.get().endMetricRecord(metric);
return ret;
}
private AtlasEdge getActiveEdgeFromList(Iterator<AtlasEdge> edgesIterator, Object vertexIdToCompare, Function<AtlasEdge, Object> edgeIdFn) {
while (edgesIterator != null && edgesIterator.hasNext()) { while (edgesIterator != null && edgesIterator.hasNext()) {
AtlasEdge edge = edgesIterator.next(); AtlasEdge edge = edgesIterator.next();
if (edge != null) { if (edge != null) {
Status status = graphHelper.getStatus(edge); Status status = graphHelper.getStatus(edge);
if ((status == null || status == ACTIVE) && edge.getOutVertex().getId().equals(fromVertex.getId())) { if ((status == null || status == ACTIVE) && edgeIdFn.apply(edge).equals(vertexIdToCompare)) {
ret = edge; return edge;
break;
} }
} }
} }
return ret; return null;
} }
private Long getRelationshipVersion(AtlasRelationship relationship) { private Long getRelationshipVersion(AtlasRelationship relationship) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment