Commit b32e547e by apoorvnaik Committed by Madhan Neethiraj

ATLAS-2489: Lineage info to include relationship guid

Change-Id: Ie26d1ad07b6ec66beb42830ad154a9dd81e7933f Signed-off-by: 's avatarMadhan Neethiraj <madhan@apache.org>
parent 5ebd7070
......@@ -145,12 +145,14 @@ public class AtlasLineageInfo implements Serializable {
public static class LineageRelation {
private String fromEntityId;
private String toEntityId;
private String relationshipId;
public LineageRelation() { }
public LineageRelation(String fromEntityId, String toEntityId) {
public LineageRelation(String fromEntityId, String toEntityId, final String relationshipId) {
this.fromEntityId = fromEntityId;
this.toEntityId = toEntityId;
this.relationshipId = relationshipId;
}
public String getFromEntityId() {
......@@ -169,18 +171,27 @@ public class AtlasLineageInfo implements Serializable {
this.toEntityId = toEntityId;
}
public String getRelationshipId() {
return relationshipId;
}
public void setRelationshipId(final String relationshipId) {
this.relationshipId = relationshipId;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
LineageRelation that = (LineageRelation) o;
return Objects.equals(fromEntityId, that.fromEntityId) &&
Objects.equals(toEntityId, that.toEntityId);
Objects.equals(toEntityId, that.toEntityId) &&
Objects.equals(relationshipId, that.relationshipId);
}
@Override
public int hashCode() {
return Objects.hash(fromEntityId, toEntityId);
return Objects.hash(fromEntityId, toEntityId, relationshipId);
}
@Override
......@@ -188,6 +199,7 @@ public class AtlasLineageInfo implements Serializable {
return "LineageRelation{" +
"fromEntityId='" + fromEntityId + '\'' +
", toEntityId='" + toEntityId + '\'' +
", relationshipId='" + relationshipId + '\'' +
'}';
}
}
......
......@@ -33,6 +33,8 @@ import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.lineage.AtlasLineageInfo;
import org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection;
import org.apache.atlas.model.lineage.AtlasLineageInfo.LineageRelation;
import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.graphdb.AtlasEdge;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.store.graph.v1.AtlasGraphUtilsV1;
......@@ -46,13 +48,14 @@ import org.apache.atlas.v1.model.lineage.SchemaResponse.SchemaDetails;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import javax.inject.Inject;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
......@@ -60,8 +63,10 @@ import java.util.stream.Collectors;
@Service
public class EntityLineageService implements AtlasLineageService {
private static final String INPUT_PROCESS_EDGE = "__Process.inputs";
private static final String OUTPUT_PROCESS_EDGE = "__Process.outputs";
private static final Logger LOG = LoggerFactory.getLogger(EntityLineageService.class);
private static final String PROCESS_INPUTS_EDGE = "__Process.inputs";
private static final String PROCESS_OUTPUTS_EDGE = "__Process.outputs";
private static final String COLUMNS = "columns";
private final AtlasGraph graph;
......@@ -162,7 +167,7 @@ public class EntityLineageService implements AtlasLineageService {
List<String> ret = new ArrayList<>();
Object columnObjs = entity.getAttribute(COLUMNS);
if (columnObjs != null && columnObjs instanceof List) {
if (columnObjs instanceof List) {
for (Object pkObj : (List) columnObjs) {
if (pkObj instanceof AtlasObjectId) {
ret.add(((AtlasObjectId) pkObj).getGuid());
......@@ -182,42 +187,59 @@ public class EntityLineageService implements AtlasLineageService {
Set<LineageRelation> relations = new HashSet<>();
String lineageQuery = getLineageQuery(guid, direction, depth);
List paths = (List) graph.executeGremlinScript(lineageQuery, true);
if (CollectionUtils.isNotEmpty(paths)) {
for (Object path : paths) {
if (path instanceof List) {
List vertices = (List) path;
List edgeMapList = (List) graph.executeGremlinScript(lineageQuery, false);
if (CollectionUtils.isNotEmpty(vertices)) {
AtlasEntityHeader prev = null;
if (CollectionUtils.isNotEmpty(edgeMapList)) {
for (Object edgeMap : edgeMapList) {
if (edgeMap instanceof Map) {
for (final Object o : ((Map) edgeMap).entrySet()) {
final Map.Entry entry = (Map.Entry) o;
Object value = entry.getValue();
for (Object vertex : vertices) {
if (!(vertex instanceof AtlasVertex)) {
continue;
if (value instanceof List) {
for (Object elem : (List) value) {
if (elem instanceof AtlasEdge) {
processEdge((AtlasEdge) elem, entities, relations);
} else {
LOG.warn("Invalid value of type {} found, ignoring", (elem != null ? elem.getClass().getSimpleName() : "null"));
}
AtlasEntityHeader entity = entityRetriever.toAtlasEntityHeader((AtlasVertex) vertex);
if (!entities.containsKey(entity.getGuid())) {
entities.put(entity.getGuid(), entity);
}
if (prev != null) {
if (direction.equals(LineageDirection.INPUT)) {
relations.add(new LineageRelation(entity.getGuid(), prev.getGuid()));
} else if (direction.equals(LineageDirection.OUTPUT)) {
relations.add(new LineageRelation(prev.getGuid(), entity.getGuid()));
} else if (value instanceof AtlasEdge) {
processEdge((AtlasEdge) value, entities, relations);
} else {
LOG.warn("Invalid value of type {} found, ignoring", (value != null ? value.getClass().getSimpleName() : "null"));
}
}
prev = entity;
}
}
}
return new AtlasLineageInfo(guid, entities, relations, direction, depth);
}
private void processEdge(final AtlasEdge edge, final Map<String, AtlasEntityHeader> entities, final Set<LineageRelation> relations) throws AtlasBaseException {
AtlasVertex inVertex = edge.getInVertex();
AtlasVertex outVertex = edge.getOutVertex();
String inGuid = AtlasGraphUtilsV1.getIdFromVertex(inVertex);
String outGuid = AtlasGraphUtilsV1.getIdFromVertex(outVertex);
String relationGuid = AtlasGraphUtilsV1.getProperty(edge, Constants.RELATIONSHIP_GUID_PROPERTY_KEY, String.class);
boolean isInputEdge = edge.getLabel().equalsIgnoreCase(PROCESS_INPUTS_EDGE);
if (!entities.containsKey(inGuid)) {
AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeader(inVertex);
entities.put(inGuid, entityHeader);
}
return new AtlasLineageInfo(guid, entities, relations, direction, depth);
if (!entities.containsKey(outGuid)) {
AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeader(outVertex);
entities.put(outGuid, entityHeader);
}
if (isInputEdge) {
relations.add(new LineageRelation(inGuid, outGuid, relationGuid));
} else {
relations.add(new LineageRelation(outGuid, inGuid, relationGuid));
}
}
private AtlasLineageInfo getBothLineageInfo(String guid, int depth) throws AtlasBaseException {
......@@ -236,10 +258,10 @@ public class EntityLineageService implements AtlasLineageService {
String lineageQuery = null;
if (direction.equals(LineageDirection.INPUT)) {
lineageQuery = generateLineageQuery(entityGuid, depth, OUTPUT_PROCESS_EDGE, INPUT_PROCESS_EDGE);
lineageQuery = generateLineageQuery(entityGuid, depth, PROCESS_OUTPUTS_EDGE, PROCESS_INPUTS_EDGE);
} else if (direction.equals(LineageDirection.OUTPUT)) {
lineageQuery = generateLineageQuery(entityGuid, depth, INPUT_PROCESS_EDGE, OUTPUT_PROCESS_EDGE);
lineageQuery = generateLineageQuery(entityGuid, depth, PROCESS_INPUTS_EDGE, PROCESS_OUTPUTS_EDGE);
}
return lineageQuery;
......
......@@ -44,13 +44,9 @@ public class AtlasGremlin3QueryProvider extends AtlasGremlin2QueryProvider {
case EXPORT_BY_GUID_CONNECTED_OUT_EDGE:
return "g.V().has('__guid', startGuid).outE().inV().has('__guid').project('__guid', 'isProcess').by('__guid').by(map {it.get().values('__superTypeNames').toSet().contains('Process')}).dedup().toList()";
case FULL_LINEAGE:
return "g.V().has('__guid', '%s').repeat(__.in('%s').out('%s'))." +
"emit(has('__superTypeNames').and().properties('__superTypeNames').hasValue('DataSet'))." +
"path().toList()";
return "g.V().has('__guid', '%s').repeat(__.inE('%s').as('e1').outV().outE('%s').as('e2').inV()).emit().select('e1', 'e2').toList()";
case PARTIAL_LINEAGE:
return "g.V().has('__guid', '%s').repeat(__.in('%s').out('%s')).times(%s)." +
"emit(has('__superTypeNames').and().properties('__superTypeNames').hasValue('DataSet'))." +
"path().toList()";
return "g.V().has('__guid', '%s').repeat(__.inE('%s').as('e1').outV().outE('%s').as('e2').inV()).times(%s).emit().select('e1', 'e2').toList()";
case TO_RANGE_LIST:
return ".range(startIdx, endIdx).toList()";
case RELATIONSHIP_SEARCH:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment