Commit cf455a52 by Sarath Subramanian

ATLAS-3558: Improve lineage performance using in-memory traversal

parent 6435223d
...@@ -62,7 +62,8 @@ public enum AtlasConfiguration { ...@@ -62,7 +62,8 @@ public enum AtlasConfiguration {
CUSTOM_ATTRIBUTE_KEY_MAX_LENGTH("atlas.custom.attribute.key.max.length", 50), CUSTOM_ATTRIBUTE_KEY_MAX_LENGTH("atlas.custom.attribute.key.max.length", 50),
CUSTOM_ATTRIBUTE_VALUE_MAX_LENGTH("atlas.custom.attribute.value.max.length", 500), CUSTOM_ATTRIBUTE_VALUE_MAX_LENGTH("atlas.custom.attribute.value.max.length", 500),
LABEL_MAX_LENGTH("atlas.entity.label.max.length", 50), LABEL_MAX_LENGTH("atlas.entity.label.max.length", 50),
IMPORT_TEMP_DIRECTORY("atlas.import.temp.directory", ""); IMPORT_TEMP_DIRECTORY("atlas.import.temp.directory", ""),
LINEAGE_USING_GREMLIN("atlas.lineage.query.use.gremlin", false);
private static final Configuration APPLICATION_PROPERTIES; private static final Configuration APPLICATION_PROPERTIES;
......
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
package org.apache.atlas.discovery; package org.apache.atlas.discovery;
import org.apache.atlas.AtlasConfiguration;
import org.apache.atlas.AtlasErrorCode; import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.annotation.GraphTransaction; import org.apache.atlas.annotation.GraphTransaction;
import org.apache.atlas.authorize.AtlasAuthorizationUtils; import org.apache.atlas.authorize.AtlasAuthorizationUtils;
...@@ -33,6 +34,7 @@ import org.apache.atlas.model.lineage.AtlasLineageInfo; ...@@ -33,6 +34,7 @@ import org.apache.atlas.model.lineage.AtlasLineageInfo;
import org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection; import org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection;
import org.apache.atlas.model.lineage.AtlasLineageInfo.LineageRelation; import org.apache.atlas.model.lineage.AtlasLineageInfo.LineageRelation;
import org.apache.atlas.repository.graphdb.AtlasEdge; import org.apache.atlas.repository.graphdb.AtlasEdge;
import org.apache.atlas.repository.graphdb.AtlasEdgeDirection;
import org.apache.atlas.repository.graphdb.AtlasGraph; import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasVertex; import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2; import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
...@@ -67,6 +69,8 @@ import static org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection.B ...@@ -67,6 +69,8 @@ import static org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection.B
import static org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection.INPUT; import static org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection.INPUT;
import static org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection.OUTPUT; import static org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection.OUTPUT;
import static org.apache.atlas.repository.Constants.RELATIONSHIP_GUID_PROPERTY_KEY; import static org.apache.atlas.repository.Constants.RELATIONSHIP_GUID_PROPERTY_KEY;
import static org.apache.atlas.repository.graphdb.AtlasEdgeDirection.IN;
import static org.apache.atlas.repository.graphdb.AtlasEdgeDirection.OUT;
import static org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery.FULL_LINEAGE_DATASET; import static org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery.FULL_LINEAGE_DATASET;
import static org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery.FULL_LINEAGE_PROCESS; import static org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery.FULL_LINEAGE_PROCESS;
import static org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery.PARTIAL_LINEAGE_DATASET; import static org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery.PARTIAL_LINEAGE_DATASET;
...@@ -76,9 +80,10 @@ import static org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery. ...@@ -76,9 +80,10 @@ import static org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery.
public class EntityLineageService implements AtlasLineageService { public class EntityLineageService implements AtlasLineageService {
private static final Logger LOG = LoggerFactory.getLogger(EntityLineageService.class); private static final Logger LOG = LoggerFactory.getLogger(EntityLineageService.class);
private static final String PROCESS_INPUTS_EDGE = "__Process.inputs"; private static final String PROCESS_INPUTS_EDGE = "__Process.inputs";
private static final String PROCESS_OUTPUTS_EDGE = "__Process.outputs"; private static final String PROCESS_OUTPUTS_EDGE = "__Process.outputs";
private static final String COLUMNS = "columns"; private static final String COLUMNS = "columns";
private static final boolean LINEAGE_USING_GREMLIN = AtlasConfiguration.LINEAGE_USING_GREMLIN.getBoolean();
private final AtlasGraph graph; private final AtlasGraph graph;
private final AtlasGremlinQueryProvider gremlinQueryProvider; private final AtlasGremlinQueryProvider gremlinQueryProvider;
...@@ -116,21 +121,12 @@ public class EntityLineageService implements AtlasLineageService { ...@@ -116,21 +121,12 @@ public class EntityLineageService implements AtlasLineageService {
if (!isProcess) { if (!isProcess) {
throw new AtlasBaseException(AtlasErrorCode.INVALID_LINEAGE_ENTITY_TYPE, guid, entity.getTypeName()); throw new AtlasBaseException(AtlasErrorCode.INVALID_LINEAGE_ENTITY_TYPE, guid, entity.getTypeName());
} }
} }
if (direction != null) { if (LINEAGE_USING_GREMLIN) {
if (direction.equals(INPUT)) { ret = getLineageInfoV1(guid, direction, depth, isDataSet);
ret = getLineageInfo(guid, INPUT, depth, isDataSet);
} else if (direction.equals(OUTPUT)) {
ret = getLineageInfo(guid, OUTPUT, depth, isDataSet);
} else if (direction.equals(BOTH)) {
ret = getBothLineageInfo(guid, depth, isDataSet);
} else {
throw new AtlasBaseException(AtlasErrorCode.INSTANCE_LINEAGE_INVALID_PARAMS, "direction", direction.toString());
}
} else { } else {
throw new AtlasBaseException(AtlasErrorCode.INSTANCE_LINEAGE_INVALID_PARAMS, "direction", null); ret = getLineageInfoV2(guid, direction, depth, isDataSet);
} }
return ret; return ret;
...@@ -204,6 +200,20 @@ public class EntityLineageService implements AtlasLineageService { ...@@ -204,6 +200,20 @@ public class EntityLineageService implements AtlasLineageService {
return columnIds.contains(e.getValue().getGuid()); return columnIds.contains(e.getValue().getGuid());
} }
private AtlasLineageInfo getLineageInfoV1(String guid, LineageDirection direction, int depth, boolean isDataSet) throws AtlasBaseException {
AtlasLineageInfo ret;
if (direction.equals(INPUT)) {
ret = getLineageInfo(guid, INPUT, depth, isDataSet);
} else if (direction.equals(OUTPUT)) {
ret = getLineageInfo(guid, OUTPUT, depth, isDataSet);
} else {
ret = getBothLineageInfoV1(guid, depth, isDataSet);
}
return ret;
}
private AtlasLineageInfo getLineageInfo(String guid, LineageDirection direction, int depth, boolean isDataSet) throws AtlasBaseException { private AtlasLineageInfo getLineageInfo(String guid, LineageDirection direction, int depth, boolean isDataSet) throws AtlasBaseException {
final Map<String, Object> bindings = new HashMap<>(); final Map<String, Object> bindings = new HashMap<>();
String lineageQuery = getLineageQuery(guid, direction, depth, isDataSet, bindings); String lineageQuery = getLineageQuery(guid, direction, depth, isDataSet, bindings);
...@@ -241,6 +251,122 @@ public class EntityLineageService implements AtlasLineageService { ...@@ -241,6 +251,122 @@ public class EntityLineageService implements AtlasLineageService {
return new AtlasLineageInfo(guid, entities, relations, direction, depth); return new AtlasLineageInfo(guid, entities, relations, direction, depth);
} }
private AtlasLineageInfo getLineageInfoV2(String guid, LineageDirection direction, int depth, boolean isDataSet) throws AtlasBaseException {
AtlasLineageInfo ret = initializeLineageInfo(guid, direction, depth);
if (depth == 0) {
depth = -1;
}
if (isDataSet) {
AtlasVertex datasetVertex = AtlasGraphUtilsV2.findByGuid(guid);
if (direction == INPUT || direction == BOTH) {
traverseEdges(datasetVertex, true, depth, ret);
}
if (direction == OUTPUT || direction == BOTH) {
traverseEdges(datasetVertex, false, depth, ret);
}
} else {
AtlasVertex processVertex = AtlasGraphUtilsV2.findByGuid(guid);
// make one hop to the next dataset vertices from process vertex and traverse with 'depth = depth - 1'
if (direction == INPUT || direction == BOTH) {
Iterable<AtlasEdge> processEdges = processVertex.getEdges(AtlasEdgeDirection.OUT, PROCESS_INPUTS_EDGE);
for (AtlasEdge processEdge : processEdges) {
addEdgeToResult(processEdge, ret);
AtlasVertex datasetVertex = processEdge.getInVertex();
traverseEdges(datasetVertex, true, depth - 1, ret);
}
}
if (direction == OUTPUT || direction == BOTH) {
Iterable<AtlasEdge> processEdges = processVertex.getEdges(AtlasEdgeDirection.OUT, PROCESS_OUTPUTS_EDGE);
for (AtlasEdge processEdge : processEdges) {
addEdgeToResult(processEdge, ret);
AtlasVertex datasetVertex = processEdge.getInVertex();
traverseEdges(datasetVertex, false, depth - 1, ret);
}
}
}
return ret;
}
private void traverseEdges(AtlasVertex datasetVertex, boolean isInput, int depth, AtlasLineageInfo ret) throws AtlasBaseException {
traverseEdges(datasetVertex, isInput, depth, new HashSet<>(), ret);
}
private void traverseEdges(AtlasVertex datasetVertex, boolean isInput, int depth, Set<String> visitedVertices, AtlasLineageInfo ret) throws AtlasBaseException {
if (depth != 0) {
// keep track of visited vertices to avoid circular loop
visitedVertices.add(getId(datasetVertex));
Iterable<AtlasEdge> incomingEdges = datasetVertex.getEdges(IN, isInput ? PROCESS_OUTPUTS_EDGE : PROCESS_INPUTS_EDGE);
for (AtlasEdge incomingEdge : incomingEdges) {
AtlasVertex processVertex = incomingEdge.getOutVertex();
Iterable<AtlasEdge> outgoingEdges = processVertex.getEdges(OUT, isInput ? PROCESS_INPUTS_EDGE : PROCESS_OUTPUTS_EDGE);
for (AtlasEdge outgoingEdge : outgoingEdges) {
AtlasVertex entityVertex = outgoingEdge.getInVertex();
if (entityVertex != null) {
addEdgeToResult(incomingEdge, ret);
addEdgeToResult(outgoingEdge, ret);
if (!visitedVertices.contains(getId(entityVertex))) {
traverseEdges(entityVertex, isInput, depth - 1, visitedVertices, ret);
}
}
}
}
}
}
private void addEdgeToResult(AtlasEdge edge, AtlasLineageInfo lineageInfo) throws AtlasBaseException {
if (!lineageContainsEdge(lineageInfo, edge)) {
processEdge(edge, lineageInfo);
}
}
private boolean lineageContainsEdge(AtlasLineageInfo lineageInfo, AtlasEdge edge) {
boolean ret = false;
if (lineageInfo != null && CollectionUtils.isNotEmpty(lineageInfo.getRelations()) && edge != null) {
String relationGuid = AtlasGraphUtilsV2.getEncodedProperty(edge, RELATIONSHIP_GUID_PROPERTY_KEY, String.class);
Set<LineageRelation> relations = lineageInfo.getRelations();
for (LineageRelation relation : relations) {
if (relation.getRelationshipId().equals(relationGuid)) {
ret = true;
break;
}
}
}
return ret;
}
private void processEdge(final AtlasEdge edge, final AtlasLineageInfo lineageInfo) throws AtlasBaseException {
processEdge(edge, lineageInfo.getGuidEntityMap(), lineageInfo.getRelations());
}
private AtlasLineageInfo initializeLineageInfo(String guid, LineageDirection direction, int depth) {
return new AtlasLineageInfo(guid, new HashMap<>(), new HashSet<>(), direction, depth);
}
private static String getId(AtlasVertex vertex) {
return vertex.getIdForDisplay();
}
private List executeGremlinScript(Map<String, Object> bindings, String lineageQuery) throws AtlasBaseException { private List executeGremlinScript(Map<String, Object> bindings, String lineageQuery) throws AtlasBaseException {
List ret; List ret;
ScriptEngine engine = graph.getGremlinScriptEngine(); ScriptEngine engine = graph.getGremlinScriptEngine();
...@@ -281,7 +407,7 @@ public class EntityLineageService implements AtlasLineageService { ...@@ -281,7 +407,7 @@ public class EntityLineageService implements AtlasLineageService {
} }
} }
private AtlasLineageInfo getBothLineageInfo(String guid, int depth, boolean isDataSet) throws AtlasBaseException { private AtlasLineageInfo getBothLineageInfoV1(String guid, int depth, boolean isDataSet) throws AtlasBaseException {
AtlasLineageInfo inputLineage = getLineageInfo(guid, INPUT, depth, isDataSet); AtlasLineageInfo inputLineage = getLineageInfo(guid, INPUT, depth, isDataSet);
AtlasLineageInfo outputLineage = getLineageInfo(guid, OUTPUT, depth, isDataSet); AtlasLineageInfo outputLineage = getLineageInfo(guid, OUTPUT, depth, isDataSet);
AtlasLineageInfo ret = inputLineage; AtlasLineageInfo ret = inputLineage;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment