Commit 46b9b7c8 by Sarath Subramanian

ATLAS-2927: Update lineage query for Process entities

parent 3b8a34c5
...@@ -41,7 +41,6 @@ import org.apache.atlas.type.AtlasEntityType; ...@@ -41,7 +41,6 @@ import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.type.AtlasTypeUtil; import org.apache.atlas.type.AtlasTypeUtil;
import org.apache.atlas.util.AtlasGremlinQueryProvider; import org.apache.atlas.util.AtlasGremlinQueryProvider;
import org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery;
import org.apache.atlas.v1.model.lineage.SchemaResponse.SchemaDetails; import org.apache.atlas.v1.model.lineage.SchemaResponse.SchemaDetails;
import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils; import org.apache.commons.collections.MapUtils;
...@@ -51,6 +50,8 @@ import org.slf4j.LoggerFactory; ...@@ -51,6 +50,8 @@ import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import javax.inject.Inject; import javax.inject.Inject;
import javax.script.ScriptEngine;
import javax.script.ScriptException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
...@@ -61,6 +62,7 @@ import java.util.stream.Collectors; ...@@ -61,6 +62,7 @@ import java.util.stream.Collectors;
import static org.apache.atlas.AtlasClient.DATA_SET_SUPER_TYPE; import static org.apache.atlas.AtlasClient.DATA_SET_SUPER_TYPE;
import static org.apache.atlas.AtlasClient.PROCESS_SUPER_TYPE; import static org.apache.atlas.AtlasClient.PROCESS_SUPER_TYPE;
import static org.apache.atlas.AtlasErrorCode.INSTANCE_LINEAGE_QUERY_FAILED;
import static org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection.BOTH; import static org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection.BOTH;
import static org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection.INPUT; import static org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection.INPUT;
import static org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection.OUTPUT; import static org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection.OUTPUT;
...@@ -203,16 +205,16 @@ public class EntityLineageService implements AtlasLineageService { ...@@ -203,16 +205,16 @@ public class EntityLineageService implements AtlasLineageService {
} }
private AtlasLineageInfo getLineageInfo(String guid, LineageDirection direction, int depth, boolean isDataSet) throws AtlasBaseException { private AtlasLineageInfo getLineageInfo(String guid, LineageDirection direction, int depth, boolean isDataSet) throws AtlasBaseException {
final Map<String, Object> bindings = new HashMap<>();
String lineageQuery = getLineageQuery(guid, direction, depth, isDataSet, bindings);
List results = executeGremlinScript(bindings, lineageQuery);
Map<String, AtlasEntityHeader> entities = new HashMap<>(); Map<String, AtlasEntityHeader> entities = new HashMap<>();
Set<LineageRelation> relations = new HashSet<>(); Set<LineageRelation> relations = new HashSet<>();
String lineageQuery = getLineageQuery(guid, direction, depth, isDataSet);
List edgeMapList = (List) graph.executeGremlinScript(lineageQuery, false); if (CollectionUtils.isNotEmpty(results)) {
for (Object result : results) {
if (CollectionUtils.isNotEmpty(edgeMapList)) { if (result instanceof Map) {
for (Object edgeMap : edgeMapList) { for (final Object o : ((Map) result).entrySet()) {
if (edgeMap instanceof Map) {
for (final Object o : ((Map) edgeMap).entrySet()) {
final Map.Entry entry = (Map.Entry) o; final Map.Entry entry = (Map.Entry) o;
Object value = entry.getValue(); Object value = entry.getValue();
...@@ -230,6 +232,8 @@ public class EntityLineageService implements AtlasLineageService { ...@@ -230,6 +232,8 @@ public class EntityLineageService implements AtlasLineageService {
LOG.warn("Invalid value of type {} found, ignoring", (value != null ? value.getClass().getSimpleName() : "null")); LOG.warn("Invalid value of type {} found, ignoring", (value != null ? value.getClass().getSimpleName() : "null"));
} }
} }
} else if (result instanceof AtlasEdge) {
processEdge((AtlasEdge) result, entities, relations);
} }
} }
} }
...@@ -237,6 +241,21 @@ public class EntityLineageService implements AtlasLineageService { ...@@ -237,6 +241,21 @@ public class EntityLineageService implements AtlasLineageService {
return new AtlasLineageInfo(guid, entities, relations, direction, depth); return new AtlasLineageInfo(guid, entities, relations, direction, depth);
} }
/**
 * Executes a lineage Gremlin script using a script engine obtained from the graph.
 *
 * The engine is acquired from {@code graph}, used for a single script execution and
 * always handed back through {@code graph.releaseGremlinScriptEngine} in the
 * {@code finally} block, whether the execution succeeds or fails.
 *
 * @param bindings     named values passed to the graph alongside the script
 * @param lineageQuery the Gremlin script text to execute
 * @return the script result, cast to a {@code List}
 * @throws AtlasBaseException with {@code INSTANCE_LINEAGE_QUERY_FAILED} (and the query text
 *                            as parameter) when execution raises a {@code ScriptException};
 *                            the original exception is attached as the cause
 */
private List executeGremlinScript(Map<String, Object> bindings, String lineageQuery) throws AtlasBaseException {
    ScriptEngine engine = graph.getGremlinScriptEngine();

    try {
        return (List) graph.executeGremlinScript(engine, bindings, lineageQuery, false);
    } catch (ScriptException e) {
        // Keep the ScriptException as the cause so the underlying Gremlin failure
        // (syntax error, missing binding, ...) is not lost when the error is reported.
        throw new AtlasBaseException(INSTANCE_LINEAGE_QUERY_FAILED, e, lineageQuery);
    } finally {
        graph.releaseGremlinScriptEngine(engine);
    }
}
private void processEdge(final AtlasEdge edge, final Map<String, AtlasEntityHeader> entities, final Set<LineageRelation> relations) throws AtlasBaseException { private void processEdge(final AtlasEdge edge, final Map<String, AtlasEntityHeader> entities, final Set<LineageRelation> relations) throws AtlasBaseException {
AtlasVertex inVertex = edge.getInVertex(); AtlasVertex inVertex = edge.getInVertex();
AtlasVertex outVertex = edge.getOutVertex(); AtlasVertex outVertex = edge.getOutVertex();
...@@ -274,29 +293,32 @@ public class EntityLineageService implements AtlasLineageService { ...@@ -274,29 +293,32 @@ public class EntityLineageService implements AtlasLineageService {
return ret; return ret;
} }
private String getLineageQuery(String entityGuid, LineageDirection direction, int depth, boolean isDataSet) { private String getLineageQuery(String entityGuid, LineageDirection direction, int depth, boolean isDataSet, Map<String, Object> bindings) {
String ret = null; String incomingFrom = null;
String outgoingTo = null;
String ret;
if (direction.equals(INPUT)) { if (direction.equals(INPUT)) {
ret = generateLineageQuery(entityGuid, depth, isDataSet, PROCESS_OUTPUTS_EDGE, PROCESS_INPUTS_EDGE); incomingFrom = PROCESS_OUTPUTS_EDGE;
outgoingTo = PROCESS_INPUTS_EDGE;
} else if (direction.equals(OUTPUT)) { } else if (direction.equals(OUTPUT)) {
ret = generateLineageQuery(entityGuid, depth, isDataSet, PROCESS_INPUTS_EDGE, PROCESS_OUTPUTS_EDGE); incomingFrom = PROCESS_INPUTS_EDGE;
outgoingTo = PROCESS_OUTPUTS_EDGE;
} }
return ret; bindings.put("guid", entityGuid);
} bindings.put("incomingEdgeLabel", incomingFrom);
bindings.put("outgoingEdgeLabel", outgoingTo);
private String generateLineageQuery(String entityGuid, int depth, boolean isDataSet, String incomingFrom, String outgoingTo) { bindings.put("depth", depth);
String lineageQuery;
if (depth < 1) { if (depth < 1) {
String query = isDataSet ? gremlinQueryProvider.getQuery(FULL_LINEAGE_DATASET) : gremlinQueryProvider.getQuery(FULL_LINEAGE_PROCESS); ret = isDataSet ? gremlinQueryProvider.getQuery(FULL_LINEAGE_DATASET) :
lineageQuery = String.format(query, entityGuid, incomingFrom, outgoingTo); gremlinQueryProvider.getQuery(FULL_LINEAGE_PROCESS);
} else { } else {
String query = isDataSet ? gremlinQueryProvider.getQuery(PARTIAL_LINEAGE_DATASET) : gremlinQueryProvider.getQuery(PARTIAL_LINEAGE_PROCESS); ret = isDataSet ? gremlinQueryProvider.getQuery(PARTIAL_LINEAGE_DATASET) :
lineageQuery = String.format(query, entityGuid, incomingFrom, outgoingTo, depth); gremlinQueryProvider.getQuery(PARTIAL_LINEAGE_PROCESS);
} }
return lineageQuery;
return ret;
} }
} }
\ No newline at end of file
...@@ -46,13 +46,13 @@ public class AtlasGremlin3QueryProvider extends AtlasGremlin2QueryProvider { ...@@ -46,13 +46,13 @@ public class AtlasGremlin3QueryProvider extends AtlasGremlin2QueryProvider {
case EXPORT_TYPE_ALL_FOR_TYPE: case EXPORT_TYPE_ALL_FOR_TYPE:
return "g.V().has('__typeName', within(typeName)).has('__guid').values('__guid').toList()"; return "g.V().has('__typeName', within(typeName)).has('__guid').values('__guid').toList()";
case FULL_LINEAGE_DATASET: case FULL_LINEAGE_DATASET:
return "g.V().has('__guid', '%s').repeat(__.inE('%s').as('e1').outV().outE('%s').as('e2').inV()).emit().select('e1', 'e2').toList()"; return "g.V().has('__guid', guid).repeat(__.inE(incomingEdgeLabel).as('e1').outV().outE(outgoingEdgeLabel).as('e2').inV()).emit().select('e1', 'e2').toList()";
case PARTIAL_LINEAGE_DATASET: case PARTIAL_LINEAGE_DATASET:
return "g.V().has('__guid', '%s').repeat(__.inE('%s').as('e1').outV().outE('%s').as('e2').inV()).times(%s).emit().select('e1', 'e2').toList()"; return "g.V().has('__guid', guid).repeat(__.inE(incomingEdgeLabel).as('e1').outV().outE(outgoingEdgeLabel).as('e2').inV()).times(depth).emit().select('e1', 'e2').toList()";
case FULL_LINEAGE_PROCESS: case FULL_LINEAGE_PROCESS:
return "g.V().has('__guid', '%s').repeat(__.outE('%s').as('e1').inV().inE('%s').as('e2').outV()).emit().select('e1', 'e2').toList()"; return "g.V().has('__guid', guid).outE(outgoingEdgeLabel).store('e').inV().repeat(__.inE(incomingEdgeLabel).store('e').outV().outE(outgoingEdgeLabel).store('e').inV()).cap('e').unfold().toList()";
case PARTIAL_LINEAGE_PROCESS: case PARTIAL_LINEAGE_PROCESS:
return "g.V().has('__guid', '%s').repeat(__.outE('%s').as('e1').inV().inE('%s').as('e2').outV()).times(%s).emit().select('e1', 'e2').toList()"; return "g.V().has('__guid', guid).outE(outgoingEdgeLabel).store('e').inV().repeat(__.inE(incomingEdgeLabel).store('e').outV().outE(outgoingEdgeLabel).store('e').inV()).times(depth).cap('e').unfold().toList()";
case TO_RANGE_LIST: case TO_RANGE_LIST:
return ".range(startIdx, endIdx).toList()"; return ".range(startIdx, endIdx).toList()";
case RELATIONSHIP_SEARCH: case RELATIONSHIP_SEARCH:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment