Commit 8cc12be1 by Sarath Subramanian

ATLAS-2905: Generate lineage information for process entities

parent d5f46e3f
...@@ -173,7 +173,7 @@ public enum AtlasErrorCode { ...@@ -173,7 +173,7 @@ public enum AtlasErrorCode {
RELATIONSHIPDEF_END_TYPE_NAME_NOT_FOUND(404, "ATLAS-404-00-00E", "RelationshipDef {0} endDef typename {0} cannot be found"), RELATIONSHIPDEF_END_TYPE_NAME_NOT_FOUND(404, "ATLAS-404-00-00E", "RelationshipDef {0} endDef typename {0} cannot be found"),
RELATIONSHIP_ALREADY_DELETED(404, "ATLAS-404-00-00F", "Attempting to delete a relationship which is already deleted : {0}"), RELATIONSHIP_ALREADY_DELETED(404, "ATLAS-404-00-00F", "Attempting to delete a relationship which is already deleted : {0}"),
INVALID_ENTITY_GUID_FOR_CLASSIFICATION_UPDATE(404, "ATLAS-404-00-010", "Updating entityGuid of classification is not allowed."), INVALID_ENTITY_GUID_FOR_CLASSIFICATION_UPDATE(404, "ATLAS-404-00-010", "Updating entityGuid of classification is not allowed."),
INSTANCE_GUID_NOT_DATASET(404, "ATLAS-404-00-011", "Given instance guid {0} is not a dataset"), INVALID_LINEAGE_ENTITY_TYPE(404, "ATLAS-404-00-011", "Given instance guid {0} with type {1} is not a valid lineage entity type."),
INSTANCE_GUID_DELETED(404, "ATLAS-404-00-012", "Given instance guid {0} has been deleted"), INSTANCE_GUID_DELETED(404, "ATLAS-404-00-012", "Given instance guid {0} has been deleted"),
NO_PROPAGATED_CLASSIFICATIONS_FOUND_FOR_ENTITY(404, "ATLAS-404-00-013", "No propagated classifications associated with entity: {0}"), NO_PROPAGATED_CLASSIFICATIONS_FOUND_FOR_ENTITY(404, "ATLAS-404-00-013", "No propagated classifications associated with entity: {0}"),
......
...@@ -19,7 +19,6 @@ ...@@ -19,7 +19,6 @@
package org.apache.atlas.discovery; package org.apache.atlas.discovery;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasErrorCode; import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.annotation.GraphTransaction; import org.apache.atlas.annotation.GraphTransaction;
import org.apache.atlas.authorize.AtlasAuthorizationUtils; import org.apache.atlas.authorize.AtlasAuthorizationUtils;
...@@ -33,7 +32,6 @@ import org.apache.atlas.model.instance.AtlasObjectId; ...@@ -33,7 +32,6 @@ import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.lineage.AtlasLineageInfo; import org.apache.atlas.model.lineage.AtlasLineageInfo;
import org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection; import org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection;
import org.apache.atlas.model.lineage.AtlasLineageInfo.LineageRelation; import org.apache.atlas.model.lineage.AtlasLineageInfo.LineageRelation;
import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.graphdb.AtlasEdge; import org.apache.atlas.repository.graphdb.AtlasEdge;
import org.apache.atlas.repository.graphdb.AtlasGraph; import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasVertex; import org.apache.atlas.repository.graphdb.AtlasVertex;
...@@ -61,7 +59,16 @@ import java.util.Map; ...@@ -61,7 +59,16 @@ import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static org.apache.atlas.AtlasClient.DATA_SET_SUPER_TYPE;
import static org.apache.atlas.AtlasClient.PROCESS_SUPER_TYPE;
import static org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection.BOTH;
import static org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection.INPUT;
import static org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection.OUTPUT;
import static org.apache.atlas.repository.Constants.RELATIONSHIP_GUID_PROPERTY_KEY; import static org.apache.atlas.repository.Constants.RELATIONSHIP_GUID_PROPERTY_KEY;
import static org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery.FULL_LINEAGE_DATASET;
import static org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery.FULL_LINEAGE_PROCESS;
import static org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery.PARTIAL_LINEAGE_DATASET;
import static org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery.PARTIAL_LINEAGE_PROCESS;
@Service @Service
public class EntityLineageService implements AtlasLineageService { public class EntityLineageService implements AtlasLineageService {
...@@ -87,7 +94,7 @@ public class EntityLineageService implements AtlasLineageService { ...@@ -87,7 +94,7 @@ public class EntityLineageService implements AtlasLineageService {
@Override @Override
@GraphTransaction @GraphTransaction
public AtlasLineageInfo getAtlasLineageInfo(String guid, LineageDirection direction, int depth) throws AtlasBaseException { public AtlasLineageInfo getAtlasLineageInfo(String guid, LineageDirection direction, int depth) throws AtlasBaseException {
AtlasLineageInfo lineageInfo; AtlasLineageInfo ret;
AtlasEntityHeader entity = entityRetriever.toAtlasEntityHeaderWithClassifications(guid); AtlasEntityHeader entity = entityRetriever.toAtlasEntityHeaderWithClassifications(guid);
...@@ -95,17 +102,28 @@ public class EntityLineageService implements AtlasLineageService { ...@@ -95,17 +102,28 @@ public class EntityLineageService implements AtlasLineageService {
AtlasEntityType entityType = atlasTypeRegistry.getEntityTypeByName(entity.getTypeName()); AtlasEntityType entityType = atlasTypeRegistry.getEntityTypeByName(entity.getTypeName());
if (entityType == null || !entityType.getTypeAndAllSuperTypes().contains(AtlasClient.DATA_SET_SUPER_TYPE)) { if (entityType == null) {
throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_DATASET, guid); throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_NOT_FOUND, entity.getTypeName());
}
boolean isDataSet = entityType.getTypeAndAllSuperTypes().contains(DATA_SET_SUPER_TYPE);
if (!isDataSet) {
boolean isProcess = entityType.getTypeAndAllSuperTypes().contains(PROCESS_SUPER_TYPE);
if (!isProcess) {
throw new AtlasBaseException(AtlasErrorCode.INVALID_LINEAGE_ENTITY_TYPE, guid, entity.getTypeName());
}
} }
if (direction != null) { if (direction != null) {
if (direction.equals(LineageDirection.INPUT)) { if (direction.equals(INPUT)) {
lineageInfo = getLineageInfo(guid, LineageDirection.INPUT, depth); ret = getLineageInfo(guid, INPUT, depth, isDataSet);
} else if (direction.equals(LineageDirection.OUTPUT)) { } else if (direction.equals(OUTPUT)) {
lineageInfo = getLineageInfo(guid, LineageDirection.OUTPUT, depth); ret = getLineageInfo(guid, OUTPUT, depth, isDataSet);
} else if (direction.equals(LineageDirection.BOTH)) { } else if (direction.equals(BOTH)) {
lineageInfo = getBothLineageInfo(guid, depth); ret = getBothLineageInfo(guid, depth, isDataSet);
} else { } else {
throw new AtlasBaseException(AtlasErrorCode.INSTANCE_LINEAGE_INVALID_PARAMS, "direction", direction.toString()); throw new AtlasBaseException(AtlasErrorCode.INSTANCE_LINEAGE_INVALID_PARAMS, "direction", direction.toString());
} }
...@@ -113,7 +131,7 @@ public class EntityLineageService implements AtlasLineageService { ...@@ -113,7 +131,7 @@ public class EntityLineageService implements AtlasLineageService {
throw new AtlasBaseException(AtlasErrorCode.INSTANCE_LINEAGE_INVALID_PARAMS, "direction", null); throw new AtlasBaseException(AtlasErrorCode.INSTANCE_LINEAGE_INVALID_PARAMS, "direction", null);
} }
return lineageInfo; return ret;
} }
@Override @Override
...@@ -184,10 +202,10 @@ public class EntityLineageService implements AtlasLineageService { ...@@ -184,10 +202,10 @@ public class EntityLineageService implements AtlasLineageService {
return columnIds.contains(e.getValue().getGuid()); return columnIds.contains(e.getValue().getGuid());
} }
private AtlasLineageInfo getLineageInfo(String guid, LineageDirection direction, int depth) throws AtlasBaseException { private AtlasLineageInfo getLineageInfo(String guid, LineageDirection direction, int depth, boolean isDataSet) throws AtlasBaseException {
Map<String, AtlasEntityHeader> entities = new HashMap<>(); Map<String, AtlasEntityHeader> entities = new HashMap<>();
Set<LineageRelation> relations = new HashSet<>(); Set<LineageRelation> relations = new HashSet<>();
String lineageQuery = getLineageQuery(guid, direction, depth); String lineageQuery = getLineageQuery(guid, direction, depth, isDataSet);
List edgeMapList = (List) graph.executeGremlinScript(lineageQuery, false); List edgeMapList = (List) graph.executeGremlinScript(lineageQuery, false);
...@@ -244,38 +262,39 @@ public class EntityLineageService implements AtlasLineageService { ...@@ -244,38 +262,39 @@ public class EntityLineageService implements AtlasLineageService {
} }
} }
private AtlasLineageInfo getBothLineageInfo(String guid, int depth) throws AtlasBaseException { private AtlasLineageInfo getBothLineageInfo(String guid, int depth, boolean isDataSet) throws AtlasBaseException {
AtlasLineageInfo inputLineage = getLineageInfo(guid, LineageDirection.INPUT, depth); AtlasLineageInfo inputLineage = getLineageInfo(guid, INPUT, depth, isDataSet);
AtlasLineageInfo outputLineage = getLineageInfo(guid, LineageDirection.OUTPUT, depth); AtlasLineageInfo outputLineage = getLineageInfo(guid, OUTPUT, depth, isDataSet);
AtlasLineageInfo ret = inputLineage; AtlasLineageInfo ret = inputLineage;
ret.getRelations().addAll(outputLineage.getRelations()); ret.getRelations().addAll(outputLineage.getRelations());
ret.getGuidEntityMap().putAll(outputLineage.getGuidEntityMap()); ret.getGuidEntityMap().putAll(outputLineage.getGuidEntityMap());
ret.setLineageDirection(LineageDirection.BOTH); ret.setLineageDirection(BOTH);
return ret; return ret;
} }
private String getLineageQuery(String entityGuid, LineageDirection direction, int depth) { private String getLineageQuery(String entityGuid, LineageDirection direction, int depth, boolean isDataSet) {
String lineageQuery = null; String ret = null;
if (direction.equals(LineageDirection.INPUT)) { if (direction.equals(INPUT)) {
lineageQuery = generateLineageQuery(entityGuid, depth, PROCESS_OUTPUTS_EDGE, PROCESS_INPUTS_EDGE); ret = generateLineageQuery(entityGuid, depth, isDataSet, PROCESS_OUTPUTS_EDGE, PROCESS_INPUTS_EDGE);
} else if (direction.equals(LineageDirection.OUTPUT)) { } else if (direction.equals(OUTPUT)) {
lineageQuery = generateLineageQuery(entityGuid, depth, PROCESS_INPUTS_EDGE, PROCESS_OUTPUTS_EDGE); ret = generateLineageQuery(entityGuid, depth, isDataSet, PROCESS_INPUTS_EDGE, PROCESS_OUTPUTS_EDGE);
} }
return lineageQuery; return ret;
} }
private String generateLineageQuery(String entityGuid, int depth, String incomingFrom, String outgoingTo) { private String generateLineageQuery(String entityGuid, int depth, boolean isDataSet, String incomingFrom, String outgoingTo) {
String lineageQuery; String lineageQuery;
if (depth < 1) { if (depth < 1) {
String query = gremlinQueryProvider.getQuery(AtlasGremlinQuery.FULL_LINEAGE); String query = isDataSet ? gremlinQueryProvider.getQuery(FULL_LINEAGE_DATASET) : gremlinQueryProvider.getQuery(FULL_LINEAGE_PROCESS);
lineageQuery = String.format(query, entityGuid, incomingFrom, outgoingTo); lineageQuery = String.format(query, entityGuid, incomingFrom, outgoingTo);
} else { } else {
String query = gremlinQueryProvider.getQuery(AtlasGremlinQuery.PARTIAL_LINEAGE); String query = isDataSet ? gremlinQueryProvider.getQuery(PARTIAL_LINEAGE_DATASET) : gremlinQueryProvider.getQuery(PARTIAL_LINEAGE_PROCESS);
lineageQuery = String.format(query, entityGuid, incomingFrom, outgoingTo, depth); lineageQuery = String.format(query, entityGuid, incomingFrom, outgoingTo, depth);
} }
return lineageQuery; return lineageQuery;
......
...@@ -44,13 +44,13 @@ public class AtlasGremlin2QueryProvider extends AtlasGremlinQueryProvider { ...@@ -44,13 +44,13 @@ public class AtlasGremlin2QueryProvider extends AtlasGremlinQueryProvider {
return "g.V().has('__typeName',typeName).filter({it.getProperty(attrName).matches(attrValue)}).has('__guid').__guid.toList()"; return "g.V().has('__typeName',typeName).filter({it.getProperty(attrName).matches(attrValue)}).has('__guid').__guid.toList()";
case EXPORT_TYPE_DEFAULT: case EXPORT_TYPE_DEFAULT:
return "g.V().has('__typeName',typeName).has(attrName, attrValue).has('__guid').__guid.toList()"; return "g.V().has('__typeName',typeName).has(attrName, attrValue).has('__guid').__guid.toList()";
case FULL_LINEAGE: case FULL_LINEAGE_DATASET:
return "g.V('__guid', '%s').as('src').in('%s').out('%s')." + return "g.V('__guid', '%s').as('src').in('%s').out('%s')." +
"loop('src', {((it.path.contains(it.object)) ? false : true)}, " + "loop('src', {((it.path.contains(it.object)) ? false : true)}, " +
"{((it.object.'__superTypeNames') ? " + "{((it.object.'__superTypeNames') ? " +
"(it.object.'__superTypeNames'.contains('DataSet')) : false)})." + "(it.object.'__superTypeNames'.contains('DataSet')) : false)})." +
"path().toList()"; "path().toList()";
case PARTIAL_LINEAGE: case PARTIAL_LINEAGE_DATASET:
return "g.V('__guid', '%s').as('src').in('%s').out('%s')." + return "g.V('__guid', '%s').as('src').in('%s').out('%s')." +
"loop('src', {it.loops <= %s}, {((it.object.'__superTypeNames') ? " + "loop('src', {it.loops <= %s}, {((it.object.'__superTypeNames') ? " +
"(it.object.'__superTypeNames'.contains('DataSet')) : false)})." + "(it.object.'__superTypeNames'.contains('DataSet')) : false)})." +
......
...@@ -45,10 +45,14 @@ public class AtlasGremlin3QueryProvider extends AtlasGremlin2QueryProvider { ...@@ -45,10 +45,14 @@ public class AtlasGremlin3QueryProvider extends AtlasGremlin2QueryProvider {
return "g.V().has('__guid', startGuid).outE().inV().has('__guid').project('__guid', 'isProcess').by('__guid').by(map {it.get().values('__superTypeNames').toSet().contains('Process')}).dedup().toList()"; return "g.V().has('__guid', startGuid).outE().inV().has('__guid').project('__guid', 'isProcess').by('__guid').by(map {it.get().values('__superTypeNames').toSet().contains('Process')}).dedup().toList()";
case EXPORT_TYPE_ALL_FOR_TYPE: case EXPORT_TYPE_ALL_FOR_TYPE:
return "g.V().has('__typeName', within(typeName)).has('__guid').values('__guid').toList()"; return "g.V().has('__typeName', within(typeName)).has('__guid').values('__guid').toList()";
case FULL_LINEAGE: case FULL_LINEAGE_DATASET:
return "g.V().has('__guid', '%s').repeat(__.inE('%s').as('e1').outV().outE('%s').as('e2').inV()).emit().select('e1', 'e2').toList()"; return "g.V().has('__guid', '%s').repeat(__.inE('%s').as('e1').outV().outE('%s').as('e2').inV()).emit().select('e1', 'e2').toList()";
case PARTIAL_LINEAGE: case PARTIAL_LINEAGE_DATASET:
return "g.V().has('__guid', '%s').repeat(__.inE('%s').as('e1').outV().outE('%s').as('e2').inV()).times(%s).emit().select('e1', 'e2').toList()"; return "g.V().has('__guid', '%s').repeat(__.inE('%s').as('e1').outV().outE('%s').as('e2').inV()).times(%s).emit().select('e1', 'e2').toList()";
case FULL_LINEAGE_PROCESS:
return "g.V().has('__guid', '%s').repeat(__.outE('%s').as('e1').inV().inE('%s').as('e2').outV()).emit().select('e1', 'e2').toList()";
case PARTIAL_LINEAGE_PROCESS:
return "g.V().has('__guid', '%s').repeat(__.outE('%s').as('e1').inV().inE('%s').as('e2').outV()).times(%s).emit().select('e1', 'e2').toList()";
case TO_RANGE_LIST: case TO_RANGE_LIST:
return ".range(startIdx, endIdx).toList()"; return ".range(startIdx, endIdx).toList()";
case RELATIONSHIP_SEARCH: case RELATIONSHIP_SEARCH:
......
...@@ -53,8 +53,10 @@ public abstract class AtlasGremlinQueryProvider { ...@@ -53,8 +53,10 @@ public abstract class AtlasGremlinQueryProvider {
EXPORT_TYPE_DEFAULT, EXPORT_TYPE_DEFAULT,
// Lineage Queries // Lineage Queries
FULL_LINEAGE, FULL_LINEAGE_DATASET,
PARTIAL_LINEAGE, FULL_LINEAGE_PROCESS,
PARTIAL_LINEAGE_DATASET,
PARTIAL_LINEAGE_PROCESS,
// Discovery Queries // Discovery Queries
BASIC_SEARCH_TYPE_FILTER, BASIC_SEARCH_TYPE_FILTER,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment