Commit f2a49bea by Sarath Subramanian

ATLAS-1988: added REST API to search for related entities

Signed-off-by: 's avatarMadhan Neethiraj <madhan@apache.org> (cherry picked from commit de60a6560113e311cc46a773bedd4071c7d413fe)
parent 63ed2a5d
...@@ -92,7 +92,10 @@ public enum AtlasErrorCode { ...@@ -92,7 +92,10 @@ public enum AtlasErrorCode {
RELATIONSHIP_INVALID_ENDTYPE(400, "ATLAS-400-00-045", "Invalid entity-type for relationship attribute ‘{0}’: entity specified (guid={1}) is of type ‘{2}’, but expected type is ‘{3}’"), RELATIONSHIP_INVALID_ENDTYPE(400, "ATLAS-400-00-045", "Invalid entity-type for relationship attribute ‘{0}’: entity specified (guid={1}) is of type ‘{2}’, but expected type is ‘{3}’"),
UNKNOWN_CLASSIFICATION(400, "ATLAS-400-00-046", "{0}: Unknown/invalid classification"), UNKNOWN_CLASSIFICATION(400, "ATLAS-400-00-046", "{0}: Unknown/invalid classification"),
INVALID_SEARCH_PARAMS(400, "ATLAS-400-00-047", "No search parameter was found. One of the following MUST be specified in the request; typeName, classification or queryText"), INVALID_SEARCH_PARAMS(400, "ATLAS-400-00-047", "No search parameter was found. One of the following MUST be specified in the request; typeName, classification or queryText"),
// All Not found enums go here INVALID_RELATIONSHIP_ATTRIBUTE(400, "ATLAS-400-00-048", "Expected attribute {0} to be a relationship but found type {}"),
INVALID_RELATIONSHIP_TYPE(400, "ATLAS-400-00-049", "Invalid entity type '{0}', guid '{1}' in relationship search"),
// All Not found enums go here
TYPE_NAME_NOT_FOUND(404, "ATLAS-404-00-001", "Given typename {0} was invalid"), TYPE_NAME_NOT_FOUND(404, "ATLAS-404-00-001", "Given typename {0} was invalid"),
TYPE_GUID_NOT_FOUND(404, "ATLAS-404-00-002", "Given type guid {0} was invalid"), TYPE_GUID_NOT_FOUND(404, "ATLAS-404-00-002", "Given type guid {0} was invalid"),
EMPTY_RESULTS(404, "ATLAS-404-00-004", "No result found for {0}"), EMPTY_RESULTS(404, "ATLAS-404-00-004", "No result found for {0}"),
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas;
public enum SortOrder {
ASCENDING, DESCENDING
}
\ No newline at end of file
...@@ -185,7 +185,7 @@ public class AtlasSearchResult implements Serializable { ...@@ -185,7 +185,7 @@ public class AtlasSearchResult implements Serializable {
'}'; '}';
} }
public enum AtlasQueryType { DSL, FULL_TEXT, GREMLIN, BASIC, ATTRIBUTE } public enum AtlasQueryType { DSL, FULL_TEXT, GREMLIN, BASIC, ATTRIBUTE, RELATIONSHIP }
@JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = PUBLIC_ONLY, fieldVisibility = NONE) @JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = PUBLIC_ONLY, fieldVisibility = NONE)
@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) @JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
......
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
package org.apache.atlas.discovery; package org.apache.atlas.discovery;
import org.apache.atlas.SortOrder;
import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.discovery.AtlasSearchResult; import org.apache.atlas.model.discovery.AtlasSearchResult;
import org.apache.atlas.model.discovery.SearchParameters; import org.apache.atlas.model.discovery.SearchParameters;
...@@ -65,4 +66,16 @@ public interface AtlasDiscoveryService { ...@@ -65,4 +66,16 @@ public interface AtlasDiscoveryService {
* @throws AtlasBaseException * @throws AtlasBaseException
*/ */
AtlasSearchResult searchWithParameters(SearchParameters searchParameters) throws AtlasBaseException; AtlasSearchResult searchWithParameters(SearchParameters searchParameters) throws AtlasBaseException;
/**
*
* @param guid unique ID of the entity.
* @param relation relation name.
* @param sortByAttribute sort the result using this attribute name, default value is 'name'
* @param sortOrder sorting order
* @param limit number of resultant rows (for pagination). [ limit > 0 ] and [ limit < maxlimit ]. -1 maps to atlas.search.defaultlimit property.
* @param offset offset to the results returned (for pagination). [ offset >= 0 ]. -1 maps to offset 0.
* @return AtlasSearchResult
*/
AtlasSearchResult searchRelatedEntities(String guid, String relation, String sortByAttribute, SortOrder sortOrder, int limit, int offset) throws AtlasBaseException;
} }
...@@ -21,6 +21,7 @@ import org.apache.atlas.ApplicationProperties; ...@@ -21,6 +21,7 @@ import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasConfiguration; import org.apache.atlas.AtlasConfiguration;
import org.apache.atlas.AtlasErrorCode; import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.AtlasException; import org.apache.atlas.AtlasException;
import org.apache.atlas.SortOrder;
import org.apache.atlas.annotation.GraphTransaction; import org.apache.atlas.annotation.GraphTransaction;
import org.apache.atlas.discovery.graph.DefaultGraphPersistenceStrategy; import org.apache.atlas.discovery.graph.DefaultGraphPersistenceStrategy;
import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.exception.AtlasBaseException;
...@@ -67,6 +68,7 @@ import scala.util.Either; ...@@ -67,6 +68,7 @@ import scala.util.Either;
import scala.util.parsing.combinator.Parsers.NoSuccess; import scala.util.parsing.combinator.Parsers.NoSuccess;
import javax.inject.Inject; import javax.inject.Inject;
import javax.script.Bindings;
import javax.script.ScriptEngine; import javax.script.ScriptEngine;
import javax.script.ScriptException; import javax.script.ScriptException;
import java.util.*; import java.util.*;
...@@ -74,10 +76,19 @@ import java.util.*; ...@@ -74,10 +76,19 @@ import java.util.*;
import static org.apache.atlas.AtlasErrorCode.CLASSIFICATION_NOT_FOUND; import static org.apache.atlas.AtlasErrorCode.CLASSIFICATION_NOT_FOUND;
import static org.apache.atlas.AtlasErrorCode.DISCOVERY_QUERY_FAILED; import static org.apache.atlas.AtlasErrorCode.DISCOVERY_QUERY_FAILED;
import static org.apache.atlas.AtlasErrorCode.UNKNOWN_TYPENAME; import static org.apache.atlas.AtlasErrorCode.UNKNOWN_TYPENAME;
import static org.apache.atlas.SortOrder.DESCENDING;
import static org.apache.atlas.model.TypeCategory.ARRAY;
import static org.apache.atlas.model.TypeCategory.MAP;
import static org.apache.atlas.model.TypeCategory.OBJECT_ID_TYPE;
import static org.apache.atlas.repository.graph.GraphHelper.EDGE_LABEL_PREFIX;
import static org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery.RELATIONSHIP_SEARCH;
import static org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery.RELATIONSHIP_SEARCH_DESCENDING_SORT;
import static org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery.RELATIONSHIP_SEARCH_ASCENDING_SORT;
@Component @Component
public class EntityDiscoveryService implements AtlasDiscoveryService { public class EntityDiscoveryService implements AtlasDiscoveryService {
private static final Logger LOG = LoggerFactory.getLogger(EntityDiscoveryService.class); private static final Logger LOG = LoggerFactory.getLogger(EntityDiscoveryService.class);
private static final String DEFAULT_SORT_ATTRIBUTE_NAME = "name";
private final AtlasGraph graph; private final AtlasGraph graph;
private final DefaultGraphPersistenceStrategy graphPersistenceStrategy; private final DefaultGraphPersistenceStrategy graphPersistenceStrategy;
...@@ -491,6 +502,98 @@ public class EntityDiscoveryService implements AtlasDiscoveryService { ...@@ -491,6 +502,98 @@ public class EntityDiscoveryService implements AtlasDiscoveryService {
return ret; return ret;
} }
@Override
@GraphTransaction
public AtlasSearchResult searchRelatedEntities(String guid, String relation, String sortByAttributeName,
SortOrder sortOrder, int limit, int offset) throws AtlasBaseException {
AtlasSearchResult ret = new AtlasSearchResult(AtlasQueryType.RELATIONSHIP);
if (StringUtils.isEmpty(guid) || StringUtils.isEmpty(relation)) {
throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "guid: '" + guid + "', relation: '" + relation + "'");
}
AtlasVertex entityVertex = entityRetriever.getEntityVertex(guid);
String entityTypeName = GraphHelper.getTypeName(entityVertex);
AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entityTypeName);
if (entityType == null) {
throw new AtlasBaseException(AtlasErrorCode.INVALID_RELATIONSHIP_TYPE, entityTypeName, guid);
}
AtlasAttribute attribute = entityType.getAttribute(relation);
if (attribute != null) {
if (isRelationshipAttribute(attribute)) {
relation = EDGE_LABEL_PREFIX + attribute.getQualifiedName();
} else {
throw new AtlasBaseException(AtlasErrorCode.INVALID_RELATIONSHIP_ATTRIBUTE, relation, attribute.getTypeName());
}
}
if (StringUtils.isEmpty(sortByAttributeName)) {
sortByAttributeName = DEFAULT_SORT_ATTRIBUTE_NAME;
}
AtlasAttribute sortByAttribute = entityType.getAttribute(sortByAttributeName);
if (sortByAttribute == null) {
sortByAttributeName = null;
sortOrder = null;
} else {
sortByAttributeName = sortByAttribute.getQualifiedName();
if (sortOrder == null) {
sortOrder = SortOrder.ASCENDING;
}
}
String relatedEntitiesQuery = getRelatedEntitiesQuery(sortOrder);
ScriptEngine scriptEngine = graph.getGremlinScriptEngine();
Bindings bindings = scriptEngine.createBindings();
QueryParams params = validateSearchParams(limit, offset);
bindings.put("g", graph);
bindings.put("guid", guid);
bindings.put("relation", relation);
bindings.put("sortAttributeName", sortByAttributeName);
bindings.put("offset", params.offset());
bindings.put("limit", params.offset() + params.limit());
try {
Object result = graph.executeGremlinScript(scriptEngine, bindings, relatedEntitiesQuery, false);
if (result instanceof List && CollectionUtils.isNotEmpty((List) result)) {
List<?> queryResult = (List) result;
Object firstElement = queryResult.get(0);
if (firstElement instanceof AtlasVertex) {
List<AtlasVertex> vertices = (List<AtlasVertex>) queryResult;
List<AtlasEntityHeader> resultList = new ArrayList<>(vertices.size());
for (AtlasVertex vertex : vertices) {
resultList.add(entityRetriever.toAtlasEntityHeader(vertex));
}
ret.setEntities(resultList);
}
}
if (ret.getEntities() == null) {
ret.setEntities(new ArrayList<AtlasEntityHeader>());
}
} catch (ScriptException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Gremlin script execution failed for relationship search query: " + e);
}
throw new AtlasBaseException(AtlasErrorCode.INTERNAL_ERROR, "Relationship search query failed");
} finally {
graph.releaseGremlinScriptEngine(scriptEngine);
}
return ret;
}
public int getMaxResultSetSize() { public int getMaxResultSetSize() {
return maxResultSetSize; return maxResultSetSize;
} }
...@@ -646,4 +749,35 @@ public class EntityDiscoveryService implements AtlasDiscoveryService { ...@@ -646,4 +749,35 @@ public class EntityDiscoveryService implements AtlasDiscoveryService {
return ""; return "";
} }
private boolean isRelationshipAttribute(AtlasAttribute attribute) throws AtlasBaseException {
boolean ret = true;
AtlasType attrType = attribute.getAttributeType();
if (attrType.getTypeCategory() == ARRAY) {
attrType = ((AtlasArrayType) attrType).getElementType();
} else if (attrType.getTypeCategory() == MAP) {
attrType = ((AtlasMapType) attrType).getValueType();
}
if (attrType.getTypeCategory() != OBJECT_ID_TYPE) {
ret = false;
}
return ret;
}
private String getRelatedEntitiesQuery(SortOrder sortOrder) {
final String ret;
if (sortOrder == null) {
ret = gremlinQueryProvider.getQuery(RELATIONSHIP_SEARCH);
} else if (sortOrder == DESCENDING) {
ret = gremlinQueryProvider.getQuery(RELATIONSHIP_SEARCH_DESCENDING_SORT);
} else {
ret = gremlinQueryProvider.getQuery(RELATIONSHIP_SEARCH_ASCENDING_SORT);
}
return ret;
}
} }
...@@ -143,7 +143,7 @@ public final class EntityGraphRetriever { ...@@ -143,7 +143,7 @@ public final class EntityGraphRetriever {
return atlasVertex != null ? mapVertexToAtlasEntityHeader(atlasVertex, attributes) : null; return atlasVertex != null ? mapVertexToAtlasEntityHeader(atlasVertex, attributes) : null;
} }
private AtlasVertex getEntityVertex(String guid) throws AtlasBaseException { public AtlasVertex getEntityVertex(String guid) throws AtlasBaseException {
AtlasVertex ret = AtlasGraphUtilsV1.findByGuid(guid); AtlasVertex ret = AtlasGraphUtilsV1.findByGuid(guid);
if (ret == null) { if (ret == null) {
......
...@@ -95,6 +95,12 @@ public class AtlasGremlin2QueryProvider extends AtlasGremlinQueryProvider { ...@@ -95,6 +95,12 @@ public class AtlasGremlin2QueryProvider extends AtlasGremlinQueryProvider {
return ".filter({it.getProperty('%s').endsWith(%s)})"; return ".filter({it.getProperty('%s').endsWith(%s)})";
case COMPARE_CONTAINS: case COMPARE_CONTAINS:
return ".filter({it.getProperty('%s').contains(%s)})"; return ".filter({it.getProperty('%s').contains(%s)})";
case RELATIONSHIP_SEARCH:
return "g.V('__guid', guid).both(relation)[offset..<limit].toList()";
case RELATIONSHIP_SEARCH_DESCENDING_SORT:
return "g.V('__guid', guid).both(relation)[offset..<limit].order{it.b.getProperty(sortAttributeName) <=> it.a.getProperty(sortAttributeName)}.toList()";
case RELATIONSHIP_SEARCH_ASCENDING_SORT:
return "g.V('__guid', guid).both(relation)[offset..<limit].order{it.a.getProperty(sortAttributeName) <=> it.b.getProperty(sortAttributeName)}.toList()";
} }
// Should never reach this point // Should never reach this point
return null; return null;
......
...@@ -61,6 +61,9 @@ public abstract class AtlasGremlinQueryProvider { ...@@ -61,6 +61,9 @@ public abstract class AtlasGremlinQueryProvider {
BASIC_SEARCH_STATE_FILTER, BASIC_SEARCH_STATE_FILTER,
TO_RANGE_LIST, TO_RANGE_LIST,
GUID_PREFIX_FILTER, GUID_PREFIX_FILTER,
RELATIONSHIP_SEARCH,
RELATIONSHIP_SEARCH_ASCENDING_SORT,
RELATIONSHIP_SEARCH_DESCENDING_SORT,
// Comparison clauses // Comparison clauses
COMPARE_LT, COMPARE_LT,
...@@ -72,6 +75,6 @@ public abstract class AtlasGremlinQueryProvider { ...@@ -72,6 +75,6 @@ public abstract class AtlasGremlinQueryProvider {
COMPARE_MATCHES, COMPARE_MATCHES,
COMPARE_STARTS_WITH, COMPARE_STARTS_WITH,
COMPARE_ENDS_WITH, COMPARE_ENDS_WITH,
COMPARE_CONTAINS, COMPARE_CONTAINS
} }
} }
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
package org.apache.atlas.web.rest; package org.apache.atlas.web.rest;
import org.apache.atlas.AtlasErrorCode; import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.SortOrder;
import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.discovery.AtlasDiscoveryService; import org.apache.atlas.discovery.AtlasDiscoveryService;
import org.apache.atlas.model.discovery.AtlasSearchResult; import org.apache.atlas.model.discovery.AtlasSearchResult;
...@@ -266,6 +267,44 @@ public class DiscoveryREST { ...@@ -266,6 +267,44 @@ public class DiscoveryREST {
} }
} }
/**
* Relationship search to search for related entities satisfying the search parameters
* @param guid Attribute name
* @param relation relationName
* @param sortByAttribute sort the result using this attribute name, default value is 'name'
* @param sortOrder sorting order
* @param limit limit the result set to only include the specified number of entries
* @param offset start offset of the result set (useful for pagination)
* @return Atlas search result
* @throws AtlasBaseException
*
* @HTTP 200 On successful search
* @HTTP 400 guid is not a valid entity type or attributeName is not a valid relationship attribute
*/
@GET
@Path("relationship")
@Consumes(Servlets.JSON_MEDIA_TYPE)
@Produces(Servlets.JSON_MEDIA_TYPE)
public AtlasSearchResult searchRelatedEntities(@QueryParam("guid") String guid,
@QueryParam("relation") String relation,
@QueryParam("sortBy") String sortByAttribute,
@QueryParam("sortOrder") SortOrder sortOrder,
@QueryParam("limit") int limit,
@QueryParam("offset") int offset) throws AtlasBaseException {
AtlasPerfTracer perf = null;
try {
if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "DiscoveryREST.relatedEntitiesSearchUsingGremlin(" + guid +
", " + relation + ", " + sortByAttribute + ", " + sortOrder + ", " + limit + ", " + offset + ")");
}
return atlasDiscoveryService.searchRelatedEntities(guid, relation, sortByAttribute, sortOrder, limit, offset);
} finally {
AtlasPerfTracer.log(perf);
}
}
private boolean isEmpty(SearchParameters.FilterCriteria filterCriteria) { private boolean isEmpty(SearchParameters.FilterCriteria filterCriteria) {
return filterCriteria == null || return filterCriteria == null ||
(StringUtils.isEmpty(filterCriteria.getAttributeName()) && CollectionUtils.isEmpty(filterCriteria.getCriterion())); (StringUtils.isEmpty(filterCriteria.getAttributeName()) && CollectionUtils.isEmpty(filterCriteria.getCriterion()));
......
...@@ -100,8 +100,30 @@ public class NotificationHookConsumerKafkaTest { ...@@ -100,8 +100,30 @@ public class NotificationHookConsumerKafkaTest {
produceMessage(new HookNotification.EntityCreateRequest("test_user1", createEntity())); produceMessage(new HookNotification.EntityCreateRequest("test_user1", createEntity()));
NotificationConsumer<HookNotificationMessage> consumer = createNewConsumer(kafkaNotification, false); NotificationConsumer<HookNotificationMessage> consumer = createNewConsumer(kafkaNotification, false);
NotificationHookConsumer notificationHookConsumer = NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer);
consumeOneMessage(consumer, hookConsumer);
verify(atlasEntityStore).createOrUpdate(any(EntityStream.class), anyBoolean());
// produce another message, and make sure it moves ahead. If commit succeeded, this would work.
produceMessage(new HookNotification.EntityCreateRequest("test_user2", createEntity()));
consumeOneMessage(consumer, hookConsumer);
verify(atlasEntityStore,times(2)).createOrUpdate(any(EntityStream.class), anyBoolean());
reset(atlasEntityStore);
}
finally {
kafkaNotification.close();
}
}
@Test
public void testConsumerConsumesNewMessageWithAutoCommitDisabled1() throws AtlasException, InterruptedException, AtlasBaseException {
try {
produceMessage(new HookNotification.EntityCreateRequest("test_user1", createEntity()));
NotificationConsumer<HookNotificationMessage> consumer = createNewConsumer(kafkaNotification, false);
NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer); NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer);
consumeOneMessage(consumer, hookConsumer); consumeOneMessage(consumer, hookConsumer);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment