Commit 085d5c86 by Shwetha GS

ATLAS-347 Atlas search APIs should allow pagination of results (shwethags)

parent 67acb9d6
......@@ -53,7 +53,6 @@ import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.security.UserGroupInformation;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -218,7 +217,7 @@ public class HiveMetaStoreBridge {
private Referenceable getEntityReferenceFromDSL(String typeName, String dslQuery) throws Exception {
AtlasClient dgiClient = getAtlasClient();
JSONArray results = dgiClient.searchByDSL(dslQuery);
JSONArray results = dgiClient.searchByDSL(dslQuery, 1, 0);
if (results.length() == 0) {
return null;
} else {
......@@ -501,17 +500,6 @@ public class HiveMetaStoreBridge {
atlasClient.updateEntity(referenceable.getId().id, referenceable);
}
private Referenceable getEntityReferenceFromGremlin(String typeName, String gremlinQuery)
throws AtlasServiceException, JSONException {
AtlasClient client = getAtlasClient();
JSONArray results = client.searchByGremlin(gremlinQuery);
if (results.length() == 0) {
return null;
}
String guid = results.getJSONObject(0).getString(SEARCH_ENTRY_GUID_ATTR);
return new Referenceable(guid, typeName, null);
}
public Referenceable fillStorageDesc(StorageDescriptor storageDesc, String tableQualifiedName,
String sdQualifiedName, Id tableId) throws Exception {
LOG.debug("Filling storage descriptor information for " + storageDesc);
......
......@@ -32,7 +32,6 @@ import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.mapred.TextInputFormat;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.mockito.ArgumentMatcher;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
......@@ -41,7 +40,6 @@ import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import scala.actors.threadpool.Arrays;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.List;
......@@ -98,12 +96,12 @@ public class HiveMetaStoreBridgeTest {
// return existing table
when(atlasClient.searchByDSL(HiveMetaStoreBridge.getTableDSLQuery(CLUSTER_NAME, TEST_DB_NAME, TEST_TABLE_NAME,
HiveDataTypes.HIVE_TABLE.getName(), false))).thenReturn(
HiveDataTypes.HIVE_TABLE.getName(), false), 1, 0)).thenReturn(
getEntityReference("82e06b34-9151-4023-aa9d-b82103a50e77"));
when(atlasClient.getEntity("82e06b34-9151-4023-aa9d-b82103a50e77")).thenReturn(createTableReference());
String processQualifiedName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, hiveTables.get(0));
when(atlasClient.searchByDSL(HiveMetaStoreBridge.getProcessDSLQuery(HiveDataTypes.HIVE_PROCESS.getName(),
processQualifiedName))).thenReturn(getEntityReference("82e06b34-9151-4023-aa9d-b82103a50e77"));
processQualifiedName), 1, 0)).thenReturn(getEntityReference("82e06b34-9151-4023-aa9d-b82103a50e77"));
HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(CLUSTER_NAME, hiveClient, atlasClient);
bridge.importHiveMetadata(true);
......@@ -117,7 +115,7 @@ public class HiveMetaStoreBridgeTest {
private void returnExistingDatabase(String databaseName, AtlasClient atlasClient, String clusterName)
throws AtlasServiceException, JSONException {
when(atlasClient.searchByDSL(HiveMetaStoreBridge.getDatabaseDSLQuery(clusterName, databaseName,
HiveDataTypes.HIVE_DB.getName()))).thenReturn(
HiveDataTypes.HIVE_DB.getName()), 1, 0)).thenReturn(
getEntityReference("72e06b34-9151-4023-aa9d-b82103a50e76"));
}
......@@ -147,12 +145,11 @@ public class HiveMetaStoreBridgeTest {
returnExistingDatabase(TEST_DB_NAME, atlasClient, CLUSTER_NAME);
when(atlasClient.searchByDSL(HiveMetaStoreBridge.getTableDSLQuery(CLUSTER_NAME, TEST_DB_NAME,
TEST_TABLE_NAME,
HiveDataTypes.HIVE_TABLE.getName(), false))).thenReturn(
TEST_TABLE_NAME, HiveDataTypes.HIVE_TABLE.getName(), false), 1, 0)).thenReturn(
getEntityReference("82e06b34-9151-4023-aa9d-b82103a50e77"));
String processQualifiedName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, hiveTable);
when(atlasClient.searchByDSL(HiveMetaStoreBridge.getProcessDSLQuery(HiveDataTypes.HIVE_PROCESS.getName(),
processQualifiedName))).thenReturn(getEntityReference("82e06b34-9151-4023-aa9d-b82103a50e77"));
processQualifiedName), 1, 0)).thenReturn(getEntityReference("82e06b34-9151-4023-aa9d-b82103a50e77"));
when(atlasClient.getEntity("82e06b34-9151-4023-aa9d-b82103a50e77")).thenReturn(createTableReference());
Partition partition = mock(Partition.class);
......@@ -180,13 +177,12 @@ public class HiveMetaStoreBridgeTest {
when(hiveClient.getTable(TEST_DB_NAME, TEST_TABLE_NAME)).thenThrow(new RuntimeException("Timeout while reading data from hive metastore"));
when(atlasClient.searchByDSL(HiveMetaStoreBridge.getTableDSLQuery(CLUSTER_NAME, TEST_DB_NAME,
table2Name,
HiveDataTypes.HIVE_TABLE.getName(), false))).thenReturn(
table2Name, HiveDataTypes.HIVE_TABLE.getName(), false), 1, 0)).thenReturn(
getEntityReference("82e06b34-9151-4023-aa9d-b82103a50e77"));
when(atlasClient.getEntity("82e06b34-9151-4023-aa9d-b82103a50e77")).thenReturn(createTableReference());
String processQualifiedName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, hiveTables.get(1));
when(atlasClient.searchByDSL(HiveMetaStoreBridge.getProcessDSLQuery(HiveDataTypes.HIVE_PROCESS.getName(),
processQualifiedName))).thenReturn(getEntityReference("82e06b34-9151-4023-aa9d-b82103a50e77"));
processQualifiedName), 1, 0)).thenReturn(getEntityReference("82e06b34-9151-4023-aa9d-b82103a50e77"));
HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(CLUSTER_NAME, hiveClient, atlasClient);
try {
......@@ -206,13 +202,12 @@ public class HiveMetaStoreBridgeTest {
when(hiveClient.getTable(TEST_DB_NAME, TEST_TABLE_NAME)).thenThrow(new RuntimeException("Timeout while reading data from hive metastore"));
when(atlasClient.searchByDSL(HiveMetaStoreBridge.getTableDSLQuery(CLUSTER_NAME, TEST_DB_NAME,
table2Name,
HiveDataTypes.HIVE_TABLE.getName(), false))).thenReturn(
table2Name, HiveDataTypes.HIVE_TABLE.getName(), false), 10, 0)).thenReturn(
getEntityReference("82e06b34-9151-4023-aa9d-b82103a50e77"));
when(atlasClient.getEntity("82e06b34-9151-4023-aa9d-b82103a50e77")).thenReturn(createTableReference());
String processQualifiedName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, hiveTables.get(1));
when(atlasClient.searchByDSL(HiveMetaStoreBridge.getProcessDSLQuery(HiveDataTypes.HIVE_PROCESS.getName(),
processQualifiedName))).thenReturn(getEntityReference("82e06b34-9151-4023-aa9d-b82103a50e77"));
processQualifiedName), 10, 0)).thenReturn(getEntityReference("82e06b34-9151-4023-aa9d-b82103a50e77"));
HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(CLUSTER_NAME, hiveClient, atlasClient);
try {
......@@ -255,6 +250,4 @@ public class HiveMetaStoreBridgeTest {
return attrValue.equals(((Referenceable) o).get(attrName));
}
}
}
......@@ -131,12 +131,12 @@ public class SqoopHookIT {
waitFor(MAX_WAIT_TIME, new Predicate() {
@Override
public boolean evaluate() throws Exception {
JSONArray results = atlasClient.search(query);
JSONArray results = atlasClient.search(query, 10, 0);
return results.length() > 0;
}
});
JSONArray results = atlasClient.search(query);
JSONArray results = atlasClient.search(query, 10, 0);
JSONObject row = results.getJSONObject(0).getJSONObject("t");
return row.getString("id");
......
......@@ -139,7 +139,7 @@ public class StormAtlasHookIT {
String query = String.format("from %s where name = \"%s\"",
StormDataTypes.STORM_TOPOLOGY.getName(), TOPOLOGY_NAME);
JSONArray results = atlasClient.search(query);
JSONArray results = atlasClient.search(query, 10, 0);
JSONObject row = results.getJSONObject(0);
return row.has("$id$") ? row.getJSONObject("$id$").getString("id"): null;
......
......@@ -101,6 +101,8 @@ public class AtlasClient {
public static final String URI_TRAITS = "traits";
public static final String QUERY = "query";
public static final String LIMIT = "limit";
public static final String OFFSET = "offset";
public static final String QUERY_TYPE = "queryType";
public static final String ATTRIBUTE_NAME = "property";
public static final String ATTRIBUTE_VALUE = "value";
......@@ -479,7 +481,6 @@ public class AtlasClient {
//Search operations
SEARCH(BASE_URI + URI_SEARCH, HttpMethod.GET, Response.Status.OK),
SEARCH_DSL(BASE_URI + URI_SEARCH + "/dsl", HttpMethod.GET, Response.Status.OK),
SEARCH_GREMLIN(BASE_URI + URI_SEARCH + "/gremlin", HttpMethod.GET, Response.Status.OK),
SEARCH_FULL_TEXT(BASE_URI + URI_SEARCH + "/fulltext", HttpMethod.GET, Response.Status.OK),
//Lineage operations based on dataset name
......@@ -981,17 +982,21 @@ public class AtlasClient {
}
/**
* Search using gremlin/dsl/full text
* Search using dsl/full text
* @param searchQuery query string
* @return
* @param limit number of rows to be returned in the result, used for pagination. maxlimit > limit > 0. -1 maps to atlas.search.defaultlimit property value
* @param offset offset to the results returned, used for pagination. offset >= 0. -1 maps to offset 0
* @return Query results
* @throws AtlasServiceException
*/
public JSONArray search(final String searchQuery) throws AtlasServiceException {
public JSONArray search(final String searchQuery, final int limit, final int offset) throws AtlasServiceException {
JSONObject result = callAPIWithRetries(API.SEARCH, null, new ResourceCreator() {
@Override
public WebResource createResource() {
WebResource resource = getResource(API.SEARCH);
resource = resource.queryParam(QUERY, searchQuery);
resource = resource.queryParam(LIMIT, String.valueOf(limit));
resource = resource.queryParam(OFFSET, String.valueOf(offset));
return resource;
}
});
......@@ -1006,39 +1011,20 @@ public class AtlasClient {
/**
* Search given query DSL
* @param query DSL query
* @param limit number of rows to be returned in the result, used for pagination. maxlimit > limit > 0. -1 maps to atlas.search.defaultlimit property value
* @param offset offset to the results returned, used for pagination. offset >= 0. -1 maps to offset 0
* @return result json object
* @throws AtlasServiceException
*/
public JSONArray searchByDSL(final String query) throws AtlasServiceException {
public JSONArray searchByDSL(final String query, final int limit, final int offset) throws AtlasServiceException {
LOG.debug("DSL query: {}", query);
JSONObject result = callAPIWithRetries(API.SEARCH_DSL, null, new ResourceCreator() {
@Override
public WebResource createResource() {
WebResource resource = getResource(API.SEARCH_DSL);
resource = resource.queryParam(QUERY, query);
return resource;
}
});
try {
return result.getJSONArray(RESULTS);
} catch (JSONException e) {
throw new AtlasServiceException(e);
}
}
/**
* Search given gremlin query
* @param gremlinQuery Gremlin query
* @return result json object
* @throws AtlasServiceException
*/
public JSONArray searchByGremlin(final String gremlinQuery) throws AtlasServiceException {
LOG.debug("Gremlin query: " + gremlinQuery);
JSONObject result = callAPIWithRetries(API.SEARCH_GREMLIN, null, new ResourceCreator() {
@Override
public WebResource createResource() {
WebResource resource = getResource(API.SEARCH_GREMLIN);
resource = resource.queryParam(QUERY, gremlinQuery);
resource = resource.queryParam(LIMIT, String.valueOf(limit));
resource = resource.queryParam(OFFSET, String.valueOf(offset));
return resource;
}
});
......@@ -1052,15 +1038,20 @@ public class AtlasClient {
/**
* Search given full text search
* @param query Query
* @param limit number of rows to be returned in the result, used for pagination. maxlimit > limit > 0. -1 maps to atlas.search.defaultlimit property value
* @param offset offset to the results returned, used for pagination. offset >= 0. -1 maps to offset 0
* NOTE: Pagination is currently not implemented for full text search, so limit and offset are not used
* @return result json object
* @throws AtlasServiceException
*/
public JSONObject searchByFullText(final String query) throws AtlasServiceException {
public JSONObject searchByFullText(final String query, final int limit, final int offset) throws AtlasServiceException {
return callAPIWithRetries(API.SEARCH_FULL_TEXT, null, new ResourceCreator() {
@Override
public WebResource createResource() {
WebResource resource = getResource(API.SEARCH_FULL_TEXT);
resource = resource.queryParam(QUERY, query);
resource = resource.queryParam(LIMIT, String.valueOf(limit));
resource = resource.queryParam(OFFSET, String.valueOf(offset));
return resource;
}
});
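For illustration, a minimal caller-side sketch of the new paginated client signatures (the base URL and query strings are hypothetical, and this assumes the AtlasClient(String baseUrl) constructor):
<verbatim>
AtlasClient client = new AtlasClient("http://localhost:21000");

// First page of 10 rows, then the next page by advancing the offset
JSONArray page1 = client.searchByDSL("hive_table where name='sales_fact'", 10, 0);
JSONArray page2 = client.searchByDSL("hive_table where name='sales_fact'", 10, 10);

// Generic search; the server tries DSL first and falls back to full text
JSONArray rows = client.search("sales_fact", 10, 0);

// Full text search takes limit/offset too, though pagination is not implemented for it yet
JSONObject fullText = client.searchByFullText("sales", 10, 0);
</verbatim>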
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas;
import org.apache.commons.configuration.Configuration;
/**
* Utility for reading properties in atlas-application.properties.
*/
public final class AtlasProperties {
private static final Configuration APPLICATION_PROPERTIES;
private AtlasProperties() { }
static {
try {
APPLICATION_PROPERTIES = ApplicationProperties.get();
} catch (AtlasException e) {
throw new RuntimeException(e);
}
}
/**
* Enum that encapsulates each property name and its default value.
*/
public enum AtlasProperty {
SEARCH_MAX_LIMIT("atlas.search.maxlimit", 10000),
SEARCH_DEFAULT_LIMIT("atlas.search.defaultlimit", 100);
private final String propertyName;
private final Object defaultValue;
AtlasProperty(String propertyName, Object defaultValue) {
this.propertyName = propertyName;
this.defaultValue = defaultValue;
}
}
public static <T> T getProperty(AtlasProperty property) {
Object value = APPLICATION_PROPERTIES.getProperty(property.propertyName);
if (value == null) {
return (T) property.defaultValue;
} else {
return (T) value;
}
}
}
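As a usage sketch, callers read these settings through the typed getter and fall back to the enum defaults when the keys are absent from atlas-application.properties, mirroring DataSetLineageService and MetadataDiscoveryResource elsewhere in this commit:
<verbatim>
// 10000 unless atlas.search.maxlimit is set
int maxLimit = AtlasProperties.getProperty(AtlasProperties.AtlasProperty.SEARCH_MAX_LIMIT);
// 100 unless atlas.search.defaultlimit is set
int defaultLimit = AtlasProperties.getProperty(AtlasProperties.AtlasProperty.SEARCH_DEFAULT_LIMIT);
</verbatim>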
......@@ -158,7 +158,7 @@ public final class ParamChecker {
* @param maxValue
* @param name
*/
public static void lessThan(short value, short maxValue, String name) {
public static void lessThan(long value, long maxValue, String name) {
if (value <= 0) {
throw new IllegalArgumentException(name + " should be > 0, current value " + value);
}
......@@ -166,4 +166,10 @@ public final class ParamChecker {
throw new IllegalArgumentException(name + " should be <= " + maxValue + ", current value " + value);
}
}
public static void greaterThan(long value, long minValue, String name) {
if (value <= minValue) {
throw new IllegalArgumentException(name + " should be > " + minValue + ", current value " + value);
}
}
}
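Together, the two checks bound a value to a range; validateQueryParams in MetadataDiscoveryResource below uses them like this (a sketch with hypothetical local variables):
<verbatim>
// Throws IllegalArgumentException unless 0 < limit <= maxLimit
ParamChecker.lessThan(limit, maxLimit, "limit");
ParamChecker.greaterThan(limit, 0, "limit");

// Throws IllegalArgumentException unless offset >= 0
ParamChecker.greaterThan(offset, -1, "offset");
</verbatim>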
......@@ -134,6 +134,18 @@ atlas.lineage.hive.table.schema.query=hive_table where name=?, columns
</verbatim>
---++ Search Configs
Search APIs (DSL and full text search) support pagination through optional limit and offset arguments. The following configs relate to search pagination:
<verbatim>
# Default limit used when limit is not specified in API
atlas.search.defaultlimit=100
# Maximum limit allowed in the API. Caps the number of results that can be fetched in one call so that the Atlas server doesn't run out of memory
atlas.search.maxlimit=10000
</verbatim>
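For example, against a server on the default port (host and query are illustrative), the DSL search endpoint pages like this:
<verbatim>
# At most 10 results, skipping the first 5
curl 'http://localhost:21000/api/atlas/discovery/search/dsl?query=hive_table&limit=10&offset=5'

# limit/offset omitted (or -1): the server applies atlas.search.defaultlimit and offset 0
curl 'http://localhost:21000/api/atlas/discovery/search/dsl?query=hive_table'
</verbatim>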
---++ Notification Configs
Refer http://kafka.apache.org/documentation.html#configuration for Kafka configuration. All Kafka configs should be prefixed with 'atlas.kafka.'
......
......@@ -11,9 +11,9 @@ The grammar for the DSL is below.
<verbatim>
queryWithPath: query ~ opt(WITHPATH)
query: rep1sep(singleQuery, opt(COMMA))
query: querySrc ~ opt(loopExpression) ~ opt(selectClause) ~ opt(orderby) ~ opt(limitOffset)
singleQuery: singleQrySrc ~ opt(loopExpression) ~ opt(selectClause) ~ opt(orderby) ~ opt(limitOffset)
querySrc: rep1sep(singleQrySrc, opt(COMMA))
singleQrySrc = FROM ~ fromSrc ~ opt(WHERE) ~ opt(expr ^? notIdExpression) |
WHERE ~ (expr ^? notIdExpression) |
......@@ -22,7 +22,7 @@ singleQrySrc = FROM ~ fromSrc ~ opt(WHERE) ~ opt(expr ^? notIdExpression) |
fromSrc: identifier ~ AS ~ alias | identifier
orderby: ORDERBY ~ order ~ opt (sortOrder)
orderby: ORDERBY ~ expr ~ opt (sortOrder)
limitOffset: LIMIT ~ lmt ~ opt (offset)
......@@ -87,24 +87,30 @@ Language Notes:
* The _!WithPath_ clause can be used with transitive closure queries to retrieve the Path that
connects the two related Entities. (We also provide a higher level interface for Closure Queries
see scaladoc for 'org.apache.atlas.query.ClosureQuery')
* ORDERBY is optional. Orderby clause should be specified in single quote ('). When order by clause is specified case insensitive sorting is done in ascending order.
For sorting in descending order specify 'DESC' after order by clause. If no order by is specified then no default sorting is applied.
* ORDERBY is optional. When an order by clause is specified, case-insensitive sorting is done based on the column specified.
For sorting in descending order, specify 'DESC' after the order by clause. If no order by is specified, then no default sorting is applied.
* LIMIT is optional. It limits the maximum number of objects to be fetched, starting from the specified optional offset. If no offset is specified, the count starts from the beginning.
* There are a couple of predicate functions that differ from SQL:
* _is_ or _isa_ can be used to filter Entities that have a particular Trait.
* _has_ can be used to filter Entities that have a value for a particular Attribute.
* When querying for a space delimited multiple-word identifier, it needs to be enclosed within
backquote (`)
* Any identifiers or constants with special characters (space, $, ", {, }) should be enclosed within backquotes (`)
---+++ DSL Examples
* from DB
For the model:
Asset - attributes name, owner, description
DB - supertype Asset - attributes clusterName, parameters, comment
Column - supertype Asset - attributes type, comment
Table - supertype Asset - attributes db, columns, parameters, comment
Traits - PII, Log Data
DSL queries:
* from DB
* DB where name="Reporting" select name, owner
* DB where name="Reporting" select name, owner orderby 'name'
* DB where name="Reporting" select name, owner orderby name
* DB where name="Reporting" select name limit 10
* DB where name="Reporting" select name, owner limit 10 offset 0
* DB where name="Reporting" select name, owner orderby 'name' limit 10 offset 5
* DB where name="Reporting" select name, owner orderby 'name' desc limit 10 offset 5
* DB where name="Reporting" select name, owner orderby name limit 10 offset 5
* DB where name="Reporting" select name, owner orderby name desc limit 10 offset 5
* DB has name
* DB is !JdbcAccess
* Column where Column isa PII
......@@ -112,7 +118,6 @@ Language Notes:
* Table where name="sales_fact", columns as column select column.name, column.dataType, column.comment
* `Log Data`
---++ Full-text Search
Atlas also exposes a Lucene-style full-text search capability.
\ No newline at end of file
......@@ -6,6 +6,7 @@ INCOMPATIBLE CHANGES:
ALL CHANGES:
ATLAS-347 Atlas search APIs should allow pagination of results (shwethags)
ATLAS-639 Exception for lineage request (svimal2106 via shwethags)
ATLAS-1022 Update typesystem wiki with details (yhemanth via shwethags)
ATLAS-1021 Update Atlas architecture wiki (yhemanth via sumasai)
......
......@@ -22,12 +22,14 @@ import com.thinkaurelius.titan.core.TitanGraph;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasException;
import org.apache.atlas.AtlasProperties;
import org.apache.atlas.GraphTransaction;
import org.apache.atlas.discovery.graph.DefaultGraphPersistenceStrategy;
import org.apache.atlas.discovery.graph.GraphBackedDiscoveryService;
import org.apache.atlas.query.GremlinQueryResult;
import org.apache.atlas.query.InputLineageClosureQuery;
import org.apache.atlas.query.OutputLineageClosureQuery;
import org.apache.atlas.query.QueryParams;
import org.apache.atlas.repository.MetadataRepository;
import org.apache.atlas.repository.graph.GraphProvider;
import org.apache.atlas.typesystem.exception.EntityNotFoundException;
......@@ -173,7 +175,8 @@ public class DataSetLineageService implements LineageService {
private String getSchemaForId(String typeName, String guid) throws DiscoveryException {
final String schemaQuery =
String.format(propertiesConf.getString(DATASET_SCHEMA_QUERY_PREFIX + typeName), guid);
return discoveryService.searchByDSL(schemaQuery);
int limit = AtlasProperties.getProperty(AtlasProperties.AtlasProperty.SEARCH_MAX_LIMIT);
return discoveryService.searchByDSL(schemaQuery, new QueryParams(limit, 0));
}
@Override
......@@ -192,7 +195,7 @@ public class DataSetLineageService implements LineageService {
*/
private ReferenceableInstance validateDatasetNameExists(String datasetName) throws AtlasException {
final String tableExistsQuery = String.format(DATASET_NAME_EXISTS_QUERY, datasetName);
GremlinQueryResult queryResult = discoveryService.evaluate(tableExistsQuery);
GremlinQueryResult queryResult = discoveryService.evaluate(tableExistsQuery, new QueryParams(1, 0));
if (!(queryResult.rows().length() > 0)) {
throw new EntityNotFoundException(datasetName + " does not exist");
}
......@@ -207,7 +210,7 @@ public class DataSetLineageService implements LineageService {
*/
private String validateDatasetExists(String guid) throws AtlasException {
final String datasetExistsQuery = String.format(DATASET_EXISTS_QUERY, guid);
GremlinQueryResult queryResult = discoveryService.evaluate(datasetExistsQuery);
GremlinQueryResult queryResult = discoveryService.evaluate(datasetExistsQuery, new QueryParams(1, 0));
if (!(queryResult.rows().length() > 0)) {
throw new EntityNotFoundException("Dataset with guid = " + guid + " does not exist");
}
......
......@@ -18,6 +18,8 @@
package org.apache.atlas.discovery;
import org.apache.atlas.query.QueryParams;
import java.util.List;
import java.util.Map;
......@@ -27,17 +29,22 @@ import java.util.Map;
public interface DiscoveryService {
/**
* Full text search
* Searches using Full text query
* @param query query string
* @param queryParams Default query parameters like limit, offset
* @return results json
* @throws DiscoveryException
*/
String searchByFullText(String query) throws DiscoveryException;
String searchByFullText(String query, QueryParams queryParams) throws DiscoveryException;
/**
* Search using query DSL.
*
* @param dslQuery query in DSL format.
* @return JSON representing the type and results.
* Searches using DSL query
* @param dslQuery query string
* @param queryParams Default query parameters like limit, offset
* @return results json
* @throws DiscoveryException
*/
String searchByDSL(String dslQuery) throws DiscoveryException;
String searchByDSL(String dslQuery, QueryParams queryParams) throws DiscoveryException;
/**
* Assumes the User is familiar with the persistence structure of the Repository.
......
......@@ -142,7 +142,10 @@ public class DefaultGraphPersistenceStrategy implements GraphPersistenceStrategi
if (dataType.getName().equals(idType.getName())) {
structInstance.set(idType.typeNameAttrName(), GraphHelper.getProperty(structVertex, typeAttributeName()));
structInstance.set(idType.idAttrName(), GraphHelper.getProperty(structVertex, idAttributeName()));
structInstance.set(idType.stateAttrName(), GraphHelper.getProperty(structVertex, stateAttributeName()));
String stateValue = GraphHelper.getProperty(structVertex, stateAttributeName());
if (stateValue != null) {
structInstance.set(idType.stateAttrName(), stateValue);
}
} else {
metadataRepository.getGraphToInstanceMapper()
.mapVertexToInstance(structVertex, structInstance, structType.fieldMapping().fields);
......
......@@ -32,6 +32,7 @@ import org.apache.atlas.query.GremlinEvaluator;
import org.apache.atlas.query.GremlinQuery;
import org.apache.atlas.query.GremlinQueryResult;
import org.apache.atlas.query.GremlinTranslator;
import org.apache.atlas.query.QueryParams;
import org.apache.atlas.query.QueryParser;
import org.apache.atlas.query.QueryProcessor;
import org.apache.atlas.repository.Constants;
......@@ -83,8 +84,8 @@ public class GraphBackedDiscoveryService implements DiscoveryService {
// .html#query-string-syntax for query syntax
@Override
@GraphTransaction
public String searchByFullText(String query) throws DiscoveryException {
String graphQuery = String.format("v.%s:(%s)", Constants.ENTITY_TEXT_PROPERTY_KEY, query);
public String searchByFullText(String query, QueryParams queryParams) throws DiscoveryException {
String graphQuery = String.format("v.\"%s\":(%s)", Constants.ENTITY_TEXT_PROPERTY_KEY, query);
LOG.debug("Full text query: {}", graphQuery);
Iterator<TitanIndexQuery.Result<Vertex>> results =
titanGraph.indexQuery(Constants.FULLTEXT_INDEX, graphQuery).vertices().iterator();
......@@ -112,27 +113,20 @@ public class GraphBackedDiscoveryService implements DiscoveryService {
return response.toString();
}
/**
* Search using query DSL.
*
* @param dslQuery query in DSL format.
* @return JSON representing the type and results.
*/
@Override
@GraphTransaction
public String searchByDSL(String dslQuery) throws DiscoveryException {
LOG.info("Executing dsl query={}", dslQuery);
GremlinQueryResult queryResult = evaluate(dslQuery);
public String searchByDSL(String dslQuery, QueryParams queryParams) throws DiscoveryException {
GremlinQueryResult queryResult = evaluate(dslQuery, queryParams);
return queryResult.toJson();
}
public GremlinQueryResult evaluate(String dslQuery) throws DiscoveryException {
public GremlinQueryResult evaluate(String dslQuery, QueryParams queryParams) throws DiscoveryException {
LOG.info("Executing dsl query={}", dslQuery);
try {
Either<Parsers.NoSuccess, Expressions.Expression> either = QueryParser.apply(dslQuery);
Either<Parsers.NoSuccess, Expressions.Expression> either = QueryParser.apply(dslQuery, queryParams);
if (either.isRight()) {
Expressions.Expression expression = either.right().get();
return evaluate(expression);
return evaluate(dslQuery, expression);
} else {
throw new DiscoveryException("Invalid expression : " + dslQuery + ". " + either.left());
}
......@@ -141,8 +135,16 @@ public class GraphBackedDiscoveryService implements DiscoveryService {
}
}
public GremlinQueryResult evaluate(Expressions.Expression expression) {
private GremlinQueryResult evaluate(String dslQuery, Expressions.Expression expression) {
Expressions.Expression validatedExpression = QueryProcessor.validate(expression);
//If the final limit is 0, don't launch the query; return 0 rows
if (validatedExpression instanceof Expressions.LimitExpression
&& ((Expressions.LimitExpression) validatedExpression).limit().rawValue() == 0) {
return new GremlinQueryResult(dslQuery, validatedExpression.dataType(),
scala.collection.immutable.List.empty());
}
GremlinQuery gremlinQuery = new GremlinTranslator(validatedExpression, graphPersistenceStrategy).translate();
LOG.debug("Query = {}", validatedExpression);
LOG.debug("Expression Tree = {}", validatedExpression.treeString());
......@@ -176,13 +178,10 @@ public class GraphBackedDiscoveryService implements DiscoveryService {
}
}
private List<Map<String, String>> extractResult(Object o) throws DiscoveryException {
if (!(o instanceof List)) {
throw new DiscoveryException(String.format("Cannot process result %s", o.toString()));
}
List l = (List) o;
private List<Map<String, String>> extractResult(final Object o) throws DiscoveryException {
List<Map<String, String>> result = new ArrayList<>();
if (o instanceof List) {
List l = (List) o;
for (Object r : l) {
Map<String, String> oRow = new HashMap<>();
......@@ -211,6 +210,11 @@ public class GraphBackedDiscoveryService implements DiscoveryService {
result.add(oRow);
}
} else {
result.add(new HashMap<String, String>() {{
put("result", o.toString());
}});
}
return result;
}
}
......@@ -18,11 +18,8 @@
package org.apache.atlas.query
import java.util
import com.google.common.collect.ImmutableCollection
import org.apache.atlas.AtlasException
import org.apache.atlas.typesystem.ITypedInstance
import org.apache.atlas.typesystem.types.DataTypes.{ArrayType, PrimitiveType, TypeCategory}
import org.apache.atlas.typesystem.types._
......@@ -35,15 +32,15 @@ object Expressions {
extends AtlasException(message, cause, enableSuppression, writableStackTrace) {
def this(e: Expression, message: String) {
this(e, message, null, false, false)
this(e, message, null, false, true)
}
def this(e: Expression, message: String, cause: Throwable) {
this(e, message, cause, false, false)
this(e, message, cause, false, true)
}
def this(e: Expression, cause: Throwable) {
this(e, null, cause, false, false)
this(e, null, cause, false, true)
}
override def getMessage: String = {
......@@ -333,7 +330,7 @@ object Expressions {
def limit(lmt: Literal[Integer], offset : Literal[Integer]) = new LimitExpression(this, lmt, offset)
def order(odr: String, asc: Boolean) = new OrderExpression(this, odr, asc)
def order(odr: Expression, asc: Boolean) = new OrderExpression(this, odr, asc)
}
trait BinaryNode {
......@@ -788,9 +785,9 @@ object Expressions {
}
}
case class OrderExpression(child: Expression, odr: String, asc: Boolean) extends Expression with UnaryNode {
case class OrderExpression(child: Expression, odr: Expression, asc: Boolean) extends Expression with UnaryNode {
override def toString = s"$child order $odr asc $asc"
override def toString = s"$child orderby $odr asc $asc"
lazy val dataType = {
if (!resolved) {
......
......@@ -317,13 +317,20 @@ class GremlinTranslator(expr: Expression,
s"${genQuery(child, inSelect)}.path"
}
case order@OrderExpression(child, odr, asc) => {
var orderExpression = odr
if(odr.isInstanceOf[BackReference]) { orderExpression = odr.asInstanceOf[BackReference].reference }
else if (odr.isInstanceOf[AliasExpression]) { orderExpression = odr.asInstanceOf[AliasExpression].child}
val orderbyProperty = genQuery(orderExpression, false)
val bProperty = s"it.b.$orderbyProperty"
val aProperty = s"it.a.$orderbyProperty"
val aCondition = s"($aProperty != null ? $aProperty.toLowerCase(): $aProperty)"
val bCondition = s"($bProperty != null ? $bProperty.toLowerCase(): $bProperty)"
var orderby = ""
asc match {
//Builds a closure comparison function based on the order by clause provided in the DSL. It is used by the Gremlin order pipe to sort the results.
//Ordering is case insensitive.
case false=> orderby = s"order{(it.b.getProperty('$odr') !=null ? it.b.getProperty('$odr').toLowerCase(): it.b.getProperty('$odr')) <=> (it.a.getProperty('$odr') != null ? it.a.getProperty('$odr').toLowerCase(): it.a.getProperty('$odr'))}"//descending
case _ => orderby = s"order{(it.a.getProperty('$odr') != null ? it.a.getProperty('$odr').toLowerCase(): it.a.getProperty('$odr')) <=> (it.b.getProperty('$odr') !=null ? it.b.getProperty('$odr').toLowerCase(): it.b.getProperty('$odr'))}"
case false=> orderby = s"order{$bCondition <=> $aCondition}"//descending
case _ => orderby = s"order{$aCondition <=> $bCondition}"
}
s"""${genQuery(child, inSelect)}.$orderby"""
}
......
......@@ -93,7 +93,7 @@ trait ExpressionUtils {
input.limit(lmt, offset)
}
def order(input: Expression, odr: String, asc: Boolean) = {
def order(input: Expression, odr: Expression, asc: Boolean) = {
input.order(odr, asc)
}
......@@ -118,6 +118,9 @@ trait ExpressionUtils {
sngQuery2.transformUp(replaceIdWithField(leftSrcId, snglQuery1.field(leftSrcId.name)))
}
}
case class QueryParams(limit: Int, offset: Int)
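QueryParams is a plain case class, so the Java services and tests in this commit construct it directly; a minimal sketch:
<verbatim>
QueryParams queryParams = new QueryParams(100, 0); // limit = 100, offset = 0
String jsonResults = discoveryService.searchByDSL(dslQuery, queryParams);
</verbatim>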
/**
* Query parser is used to parse the DSL query. It uses Scala PackratParsers and pattern matching to extract the expressions.
* It builds up an expression tree.
......@@ -134,7 +137,12 @@ object QueryParser extends StandardTokenParsers with QueryKeywords with Expressi
override val lexical = new QueryLexer(queryreservedWords, querydelims)
def apply(input: String): Either[NoSuccess, Expression] = synchronized {
/**
* @param input query string
* @param queryParams query parameters containing limit and offset
* @return either a parse failure or the parsed expression tree
*/
def apply(input: String)(implicit queryParams: QueryParams = null): Either[NoSuccess, Expression] = synchronized {
phrase(queryWithPath)(new lexical.Scanner(input)) match {
case Success(r, x) => Right(r)
case f@Failure(m, x) => Left(f)
......@@ -142,23 +150,21 @@ object QueryParser extends StandardTokenParsers with QueryKeywords with Expressi
}
}
def queryWithPath = query ~ opt(WITHPATH) ^^ {
import scala.math._
def queryWithPath(implicit queryParams: QueryParams) = query ~ opt(WITHPATH) ^^ {
case q ~ None => q
case q ~ p => q.path()
}
def query: Parser[Expression] = rep1sep(singleQuery, opt(COMMA)) ^^ { l => l match {
case h :: Nil => h
case h :: t => t.foldLeft(h)(merge(_, _))
}
}
/**
* A singleQuery can have the following forms:
* 1. SrcQuery [select] [orderby desc] [limit x offset y] -> source query followed by an optional select clause, an optional order by clause and an optional limit/offset
* e.g. "hive_db where hive_db has name orderby 'hive_db.owner' limit 2 offset 1"
*
* @return
*/
def singleQuery = singleQrySrc ~ opt(loopExpression) ~ opt(selectClause) ~ opt(orderby) ~ opt(limitOffset) ^^ {
def query(implicit queryParams: QueryParams) = querySrc ~ opt(loopExpression) ~ opt(selectClause) ~ opt(orderby) ~ opt(limitOffset) ^^ {
case s ~ l ~ sel ~ odr ~ lmtoff => {
var expressiontree = s
if (l.isDefined) //Note: The order of if statements is important.
......@@ -169,18 +175,30 @@ object QueryParser extends StandardTokenParsers with QueryKeywords with Expressi
{
expressiontree = order(expressiontree, odr.get._1, odr.get._2)
}
if (lmtoff.isDefined)
{
expressiontree = limit(expressiontree, int (lmtoff.get._1), int (lmtoff.get._2))
}
if (sel.isDefined)
{
expressiontree = select(expressiontree, sel.get)
}
if (queryParams != null && lmtoff.isDefined)
{
val mylimit = int(min(queryParams.limit, max(lmtoff.get._1 - queryParams.offset, 0)))
val myoffset = int(queryParams.offset + lmtoff.get._2)
expressiontree = limit(expressiontree, mylimit, myoffset)
} else if(lmtoff.isDefined) {
expressiontree = limit(expressiontree, int(lmtoff.get._1), int(lmtoff.get._2))
} else if(queryParams != null) {
expressiontree = limit(expressiontree, int(queryParams.limit), int(queryParams.offset))
}
expressiontree
}
}
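To make the combination rule concrete, a worked example with API params limit=10, offset=5 (values chosen for illustration): for a DSL query ending in "limit 100 offset 1",
<verbatim>
final limit  = min(10, max(100 - 5, 0)) = 10
final offset = 5 + 1 = 6
</verbatim>
If the DSL limit does not exceed the API offset (say "limit 2" with API offset 5), the final limit collapses to 0 and evaluate() in GraphBackedDiscoveryService returns an empty result without launching the Gremlin query.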
def querySrc: Parser[Expression] = rep1sep(singleQrySrc, opt(COMMA)) ^^ { l => l match {
case h :: Nil => h
case h :: t => t.foldLeft(h)(merge(_, _))
}
}
/**
* A SingleQuerySrc can have the following forms:
* 1. FROM id [WHERE] [expr] -> from optionally followed by a filter
......@@ -218,14 +236,14 @@ object QueryParser extends StandardTokenParsers with QueryKeywords with Expressi
def fromSrc = identifier ~ AS ~ alias ^^ { case s ~ a ~ al => s.as(al)} |
identifier
def orderby = ORDERBY ~ order ~ opt (asce) ^^ {
def orderby = ORDERBY ~ expr ~ opt (asce) ^^ {
case o ~ odr ~ None => (odr, true)
case o ~ odr ~ asc => (odr, asc.get)
}
def limitOffset = LIMIT ~ lmt ~ opt (offset) ^^ {
case l ~ lt ~ None => (lt, 0)
case l ~ lt ~ of => (lt, of.get)
def limitOffset: Parser[(Int, Int)] = LIMIT ~ lmt ~ opt (offset) ^^ {
case l ~ lt ~ None => (lt.toInt, 0)
case l ~ lt ~ of => (lt.toInt, of.get.toInt)
}
def offset = OFFSET ~ ofset ^^ {
......@@ -237,7 +255,7 @@ object QueryParser extends StandardTokenParsers with QueryKeywords with Expressi
case _ => true
}
def loopExpression: Parser[(Expression, Option[Literal[Integer]], Option[String])] =
def loopExpression(implicit queryParams: QueryParams): Parser[(Expression, Option[Literal[Integer]], Option[String])] =
LOOP ~ (LPAREN ~> query <~ RPAREN) ~ opt(intConstant <~ TIMES) ~ opt(AS ~> alias) ^^ {
case l ~ e ~ None ~ a => (e, None, a)
case l ~ e ~ Some(i) ~ a => (e, Some(int(i)), a)
......@@ -297,8 +315,6 @@ object QueryParser extends StandardTokenParsers with QueryKeywords with Expressi
def ofset = intConstant
def order = ident | stringLit
def asc = ident | stringLit
def literal = booleanConstant ^^ {
......
......@@ -97,7 +97,7 @@ class Resolver(srcExpr: Option[Expression] = None, aliases: Map[String, Expressi
}
case order@OrderExpression(child, odr, asc) => {
val r = new Resolver(Some(child), child.namedExpressions)
return new OrderExpression(child.transformUp(r), odr, asc)
return new OrderExpression(child, odr.transformUp(r), asc)
}
case x => x
}
......
......@@ -22,7 +22,7 @@ import java.util
import java.util.concurrent.atomic.AtomicInteger
import org.apache.atlas.AtlasException
import org.apache.atlas.query.Expressions.{PathExpression, SelectExpression}
import org.apache.atlas.query.Expressions.{LimitExpression, PathExpression, SelectExpression}
import org.apache.atlas.repository.Constants
import org.apache.atlas.typesystem.types.DataTypes.{ArrayType, PrimitiveType, TypeCategory}
import org.apache.atlas.typesystem.types._
......@@ -80,7 +80,7 @@ object TypeUtils {
val resultAttr = new AttributeDefinition(resultAttrName, resultType.getName, Multiplicity.REQUIRED, false, null)
val typName = s"${TEMP_STRUCT_NAME_PREFIX}${tempStructCounter.getAndIncrement}"
val m : java.util.HashMap[String, IDataType[_]] = new util.HashMap[String, IDataType[_]]()
if ( pE.child.isInstanceOf[SelectExpression]) {
if (pE.child.isInstanceOf[SelectExpression] || pE.child.isInstanceOf[LimitExpression]) {
m.put(pE.child.dataType.getName, pE.child.dataType)
}
typSystem.defineQueryResultType(typName, m, pathAttr, resultAttr);
......
......@@ -23,6 +23,7 @@ import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasException;
import org.apache.atlas.BaseRepositoryTest;
import org.apache.atlas.RepositoryMetadataModule;
import org.apache.atlas.query.QueryParams;
import org.apache.atlas.typesystem.ITypedReferenceableInstance;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.Struct;
......@@ -117,7 +118,7 @@ public class DataSetLineageServiceTest extends BaseRepositoryTest {
@Test(dataProvider = "dslQueriesProvider")
public void testSearchByDSLQueries(String dslQuery) throws Exception {
System.out.println("Executing dslQuery = " + dslQuery);
String jsonResults = discoveryService.searchByDSL(dslQuery);
String jsonResults = discoveryService.searchByDSL(dslQuery, new QueryParams(100, 0));
assertNotNull(jsonResults);
JSONObject results = new JSONObject(jsonResults);
......
......@@ -30,6 +30,7 @@ import org.apache.atlas.RepositoryMetadataModule;
import org.apache.atlas.RequestContext;
import org.apache.atlas.TestUtils;
import org.apache.atlas.discovery.graph.GraphBackedDiscoveryService;
import org.apache.atlas.query.QueryParams;
import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.RepositoryException;
import org.apache.atlas.typesystem.IStruct;
......@@ -91,6 +92,7 @@ public class GraphBackedMetadataRepositoryTest {
private TypeSystem typeSystem;
private String guid;
private QueryParams queryParams = new QueryParams(100, 0);
@BeforeClass
public void setUp() throws Exception {
......@@ -424,7 +426,7 @@ public class GraphBackedMetadataRepositoryTest {
public void testSearchByDSLQuery() throws Exception {
String dslQuery = "hive_database as PII";
System.out.println("Executing dslQuery = " + dslQuery);
String jsonResults = discoveryService.searchByDSL(dslQuery);
String jsonResults = discoveryService.searchByDSL(dslQuery, queryParams);
Assert.assertNotNull(jsonResults);
JSONObject results = new JSONObject(jsonResults);
......@@ -457,7 +459,7 @@ public class GraphBackedMetadataRepositoryTest {
public void testSearchByDSLWithInheritance() throws Exception {
String dslQuery = "Person where name = 'Jane'";
System.out.println("Executing dslQuery = " + dslQuery);
String jsonResults = discoveryService.searchByDSL(dslQuery);
String jsonResults = discoveryService.searchByDSL(dslQuery, queryParams);
Assert.assertNotNull(jsonResults);
JSONObject results = new JSONObject(jsonResults);
......@@ -488,7 +490,7 @@ public class GraphBackedMetadataRepositoryTest {
TestUtils.dumpGraph(graphProvider.get());
System.out.println("Executing dslQuery = " + dslQuery);
String jsonResults = discoveryService.searchByDSL(dslQuery);
String jsonResults = discoveryService.searchByDSL(dslQuery, queryParams);
Assert.assertNotNull(jsonResults);
JSONObject results = new JSONObject(jsonResults);
......@@ -522,7 +524,7 @@ public class GraphBackedMetadataRepositoryTest {
//person in hr department whose name is john
Thread.sleep(sleepInterval);
String response = discoveryService.searchByFullText("john");
String response = discoveryService.searchByFullText("john", queryParams);
Assert.assertNotNull(response);
JSONArray results = new JSONArray(response);
Assert.assertEquals(results.length(), 1);
......@@ -530,7 +532,7 @@ public class GraphBackedMetadataRepositoryTest {
Assert.assertEquals(row.get("typeName"), "Person");
//person in hr department who lives in santa clara
response = discoveryService.searchByFullText("Jane AND santa AND clara");
response = discoveryService.searchByFullText("Jane AND santa AND clara", queryParams);
Assert.assertNotNull(response);
results = new JSONArray(response);
Assert.assertEquals(results.length(), 1);
......@@ -538,7 +540,7 @@ public class GraphBackedMetadataRepositoryTest {
Assert.assertEquals(row.get("typeName"), "Manager");
//search for person in hr department whose name starts is john/jahn
response = discoveryService.searchByFullText("hr AND (john OR jahn)");
response = discoveryService.searchByFullText("hr AND (john OR jahn)", queryParams);
Assert.assertNotNull(response);
results = new JSONArray(response);
Assert.assertEquals(results.length(), 1);
......
......@@ -31,6 +31,7 @@ import org.apache.atlas.RequestContext;
import org.apache.atlas.TestUtils;
import org.apache.atlas.discovery.graph.GraphBackedDiscoveryService;
import org.apache.atlas.listener.EntityChangeListener;
import org.apache.atlas.query.QueryParams;
import org.apache.atlas.repository.audit.EntityAuditRepository;
import org.apache.atlas.repository.audit.HBaseBasedAuditRepository;
import org.apache.atlas.repository.audit.HBaseTestUtils;
......@@ -230,8 +231,9 @@ public class DefaultMetadataServiceTest {
assertReferenceableEquals(instance, entity);
//Verify that search with reserved characters works - for string attribute
String responseJson = discoveryService.searchByDSL(
String.format("`%s` where `%s` = '%s'", typeDefinition.typeName, strAttrName, entity.get(strAttrName)));
String query =
String.format("`%s` where `%s` = '%s'", typeDefinition.typeName, strAttrName, entity.get(strAttrName));
String responseJson = discoveryService.searchByDSL(query, new QueryParams(1, 0));
JSONObject response = new JSONObject(responseJson);
assertEquals(response.getJSONArray("rows").length(), 1);
}
......
......@@ -229,22 +229,17 @@ public class LocalAtlasClient extends AtlasClient {
}
@Override
public JSONArray search(final String searchQuery) throws AtlasServiceException {
public JSONArray search(final String searchQuery, final int limit, final int offset) throws AtlasServiceException {
throw new IllegalStateException("Not supported in LocalAtlasClient");
}
@Override
public JSONArray searchByDSL(final String query) throws AtlasServiceException {
public JSONArray searchByDSL(final String query, final int limit, final int offset) throws AtlasServiceException {
throw new IllegalStateException("Not supported in LocalAtlasClient");
}
@Override
public JSONArray searchByGremlin(final String gremlinQuery) throws AtlasServiceException {
throw new IllegalStateException("Not supported in LocalAtlasClient");
}
@Override
public JSONObject searchByFullText(final String query) throws AtlasServiceException {
public JSONObject searchByFullText(final String query, final int limit, final int offset) throws AtlasServiceException {
throw new IllegalStateException("Not supported in LocalAtlasClient");
}
......
......@@ -462,7 +462,7 @@ public class QuickStart {
private void search() throws Exception {
for (String dslQuery : getDSLQueries()) {
JSONArray results = metadataServiceClient.search(dslQuery);
JSONArray results = metadataServiceClient.search(dslQuery, 10, 0);
if (results != null) {
System.out.println("query [" + dslQuery + "] returned [" + results.length() + "] rows");
} else {
......
......@@ -20,10 +20,13 @@ package org.apache.atlas.web.resources;
import com.google.common.base.Preconditions;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.utils.AtlasPerfTracer;
import org.apache.atlas.utils.ParamChecker;
import org.apache.atlas.AtlasProperties;
import org.apache.atlas.classification.InterfaceAudience;
import org.apache.atlas.discovery.DiscoveryException;
import org.apache.atlas.discovery.DiscoveryService;
import org.apache.atlas.query.QueryParams;
import org.apache.atlas.utils.AtlasPerfTracer;
import org.apache.atlas.utils.ParamChecker;
import org.apache.atlas.web.util.Servlets;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
......@@ -34,6 +37,7 @@ import org.slf4j.LoggerFactory;
import javax.inject.Inject;
import javax.inject.Singleton;
import javax.ws.rs.Consumes;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
......@@ -42,7 +46,6 @@ import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Response;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience;
/**
* Jersey Resource for metadata operations.
......@@ -56,6 +59,7 @@ public class MetadataDiscoveryResource {
private static final String QUERY_TYPE_DSL = "dsl";
private static final String QUERY_TYPE_GREMLIN = "gremlin";
private static final String QUERY_TYPE_FULLTEXT = "full-text";
private static final String LIMIT_OFFSET_DEFAULT = "-1";
private final DiscoveryService discoveryService;
......@@ -73,50 +77,49 @@ public class MetadataDiscoveryResource {
/**
* Search using a given query.
*
* @param query search query in raw gremlin or DSL format falling back to full text.
* @param query search query in DSL format falling back to full text.
* @param limit number of rows to be returned in the result, used for pagination. maxlimit > limit > 0. -1 maps to atlas.search.defaultlimit property value
* @param offset offset to the results returned, used for pagination. offset >= 0. -1 maps to offset 0
* @return JSON representing the type and results.
*/
@GET
@Path("search")
@Consumes(Servlets.JSON_MEDIA_TYPE)
@Produces(Servlets.JSON_MEDIA_TYPE)
public Response search(@QueryParam("query") String query) {
JSONObject response;
public Response search(@QueryParam("query") String query,
@DefaultValue(LIMIT_OFFSET_DEFAULT) @QueryParam("limit") int limit,
@DefaultValue(LIMIT_OFFSET_DEFAULT) @QueryParam("offset") int offset) {
AtlasPerfTracer perf = null;
try { // fall back to dsl
if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "MetadataDiscoveryResource.search(" + query + ")");
}
ParamChecker.notEmpty(query, "query cannot be null");
final String jsonResultStr = discoveryService.searchByDSL(query);
response = new DSLJSONResponseBuilder().results(jsonResultStr).query(query).build();
return Response.ok(response).build();
} catch (IllegalArgumentException e) {
LOG.error("Unable to get entity list for empty query", e);
throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.BAD_REQUEST));
} catch (Throwable throwable) {
LOG.error("Unable to get entity list for query {} using dsl", query, throwable);
return searchUsingFullText(query);
} finally {
AtlasPerfTracer.log(perf);
Response response = searchUsingQueryDSL(query, limit, offset);
if (response.getStatus() != Response.Status.OK.getStatusCode()) {
response = searchUsingFullText(query, limit, offset);
}
AtlasPerfTracer.log(perf);
return response;
}
/**
* Search using query DSL format.
*
* @param dslQuery search query in DSL format.
* @param limit number of rows to be returned in the result, used for pagination. maxlimit > limit > 0. -1 maps to atlas.search.defaultlimit property value
* @param offset offset to the results returned, used for pagination. offset >= 0. -1 maps to offset 0
* Limit and offset in the API are used in conjunction with limit and offset in the DSL query:
* Final limit = min(API limit, max(query limit - API offset, 0))
* Final offset = API offset + query offset
*
* @return JSON representing the type and results.
*/
@GET
@Path("search/dsl")
@Consumes(Servlets.JSON_MEDIA_TYPE)
@Produces(Servlets.JSON_MEDIA_TYPE)
public Response searchUsingQueryDSL(@QueryParam("query") String dslQuery) {
public Response searchUsingQueryDSL(@QueryParam("query") String dslQuery,
@DefaultValue(LIMIT_OFFSET_DEFAULT) @QueryParam("limit") int limit,
@DefaultValue(LIMIT_OFFSET_DEFAULT) @QueryParam("offset") int offset) {
AtlasPerfTracer perf = null;
try {
if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
......@@ -124,7 +127,8 @@ public class MetadataDiscoveryResource {
}
ParamChecker.notEmpty(dslQuery, "dslQuery cannot be null");
final String jsonResultStr = discoveryService.searchByDSL(dslQuery);
QueryParams queryParams = validateQueryParams(limit, offset);
final String jsonResultStr = discoveryService.searchByDSL(dslQuery, queryParams);
JSONObject response = new DSLJSONResponseBuilder().results(jsonResultStr).query(dslQuery).build();
......@@ -140,6 +144,28 @@ public class MetadataDiscoveryResource {
}
}
private QueryParams validateQueryParams(int limitParam, int offsetParam) {
int maxLimit = AtlasProperties.getProperty(AtlasProperties.AtlasProperty.SEARCH_MAX_LIMIT);
int defaultLimit = AtlasProperties.getProperty(AtlasProperties.AtlasProperty.SEARCH_DEFAULT_LIMIT);
int limit = defaultLimit;
boolean limitSet = (limitParam != Integer.valueOf(LIMIT_OFFSET_DEFAULT));
if (limitSet) {
ParamChecker.lessThan(limitParam, maxLimit, "limit");
ParamChecker.greaterThan(limitParam, 0, "limit");
limit = limitParam;
}
int offset = 0;
boolean offsetSet = (offsetParam != Integer.valueOf(LIMIT_OFFSET_DEFAULT));
if (offsetSet) {
ParamChecker.greaterThan(offsetParam, -1, "offset");
offset = offsetParam;
}
return new QueryParams(limit, offset);
}
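With the default configs (atlas.search.defaultlimit=100, atlas.search.maxlimit=10000), the validation above behaves as follows (illustrative values):
<verbatim>
limit=-1, offset=-1  ->  QueryParams(100, 0)   // defaults applied
limit=50, offset=20  ->  QueryParams(50, 20)
limit=0              ->  IllegalArgumentException("limit should be > 0, current value 0")
limit=20000          ->  IllegalArgumentException("limit should be <= 10000, current value 20000")
offset=-5            ->  IllegalArgumentException("offset should be > -1, current value -5")
</verbatim>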
/**
* Search using raw gremlin query format.
*
......@@ -189,13 +215,18 @@ public class MetadataDiscoveryResource {
* Search using full text search.
*
* @param query search query.
* @param limit number of rows to be returned in the result, used for pagination. maxlimit > limit > 0. -1 maps to atlas.search.defaultlimit property value
* @param offset offset to the results returned, used for pagination. offset >= 0. -1 maps to offset 0
* NOTE: Pagination is currently not implemented for full text search, so limit and offset are not used
* @return JSON representing the type and results.
*/
@GET
@Path("search/fulltext")
@Consumes(Servlets.JSON_MEDIA_TYPE)
@Produces(Servlets.JSON_MEDIA_TYPE)
public Response searchUsingFullText(@QueryParam("query") String query) {
public Response searchUsingFullText(@QueryParam("query") String query,
@DefaultValue(LIMIT_OFFSET_DEFAULT) @QueryParam("limit") int limit,
@DefaultValue(LIMIT_OFFSET_DEFAULT) @QueryParam("offset") int offset) {
AtlasPerfTracer perf = null;
try {
if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
......@@ -203,7 +234,8 @@ public class MetadataDiscoveryResource {
}
ParamChecker.notEmpty(query, "query cannot be null or empty");
final String jsonResultStr = discoveryService.searchByFullText(query);
QueryParams queryParams = validateQueryParams(limit, offset);
final String jsonResultStr = discoveryService.searchByFullText(query, queryParams);
JSONArray rowsJsonArr = new JSONArray(jsonResultStr);
JSONObject response = new FullTextJSonResponseBuilder().results(rowsJsonArr).query(query).build();
......
......@@ -72,8 +72,7 @@ public class NotificationHookConsumerIT extends BaseResourceIT {
waitFor(MAX_WAIT_TIME, new Predicate() {
@Override
public boolean evaluate() throws Exception {
JSONArray results = serviceClient.searchByDSL(String.format("%s where name='%s'", DATABASE_TYPE,
entity.get("name")));
JSONArray results = searchByDSL(String.format("%s where name='%s'", DATABASE_TYPE, entity.get("name")));
return results.length() == 1;
}
});
......@@ -90,8 +89,7 @@ public class NotificationHookConsumerIT extends BaseResourceIT {
waitFor(MAX_WAIT_TIME, new Predicate() {
@Override
public boolean evaluate() throws Exception {
JSONArray results = serviceClient.searchByDSL(String.format("%s where name='%s'", DATABASE_TYPE,
entity.get("name")));
JSONArray results = searchByDSL(String.format("%s where name='%s'", DATABASE_TYPE, entity.get("name")));
return results.length() == 1;
}
});
......@@ -146,14 +144,13 @@ public class NotificationHookConsumerIT extends BaseResourceIT {
waitFor(MAX_WAIT_TIME, new Predicate() {
@Override
public boolean evaluate() throws Exception {
JSONArray results = serviceClient.searchByDSL(String.format("%s where name='%s'", DATABASE_TYPE,
newName));
JSONArray results = searchByDSL(String.format("%s where name='%s'", DATABASE_TYPE, newName));
return results.length() == 1;
}
});
//no entity with the old qualified name
JSONArray results = serviceClient.searchByDSL(String.format("%s where name='%s'", DATABASE_TYPE, dbName));
JSONArray results = searchByDSL(String.format("%s where name='%s'", DATABASE_TYPE, dbName));
assertEquals(results.length(), 0);
}
......@@ -195,8 +192,7 @@ public class NotificationHookConsumerIT extends BaseResourceIT {
waitFor(MAX_WAIT_TIME, new Predicate() {
@Override
public boolean evaluate() throws Exception {
JSONArray results = serviceClient.searchByDSL(String.format("%s where name='%s'", DATABASE_TYPE,
dbName));
JSONArray results = searchByDSL(String.format("%s where name='%s'", DATABASE_TYPE, dbName));
return results.length() == 1;
}
});
......
......@@ -54,6 +54,7 @@ import org.apache.atlas.utils.ParamChecker;
import org.apache.atlas.web.util.Servlets;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.RandomStringUtils;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -342,4 +343,8 @@ public abstract class BaseResourceIT {
}
};
}
protected JSONArray searchByDSL(String dslQuery) throws AtlasServiceException {
return serviceClient.searchByDSL(dslQuery, 10, 0);
}
}
......@@ -178,8 +178,7 @@ public class EntityJerseyResourceIT extends BaseResourceIT {
}
});
JSONArray results =
serviceClient.searchByDSL(String.format("%s where name='%s'", DATABASE_TYPE, dbName));
JSONArray results = searchByDSL(String.format("%s where name='%s'", DATABASE_TYPE, dbName));
assertEquals(results.length(), 1);
//create entity again shouldn't create another instance with same unique attribute value
......@@ -197,7 +196,7 @@ public class EntityJerseyResourceIT extends BaseResourceIT {
//expected timeout
}
results = serviceClient.searchByDSL(String.format("%s where name='%s'", DATABASE_TYPE, dbName));
results = searchByDSL(String.format("%s where name='%s'", DATABASE_TYPE, dbName));
assertEquals(results.length(), 1);
//Test the same across references
......@@ -208,7 +207,7 @@ public class EntityJerseyResourceIT extends BaseResourceIT {
table.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, tableName);
serviceClient.createEntity(table);
results = serviceClient.searchByDSL(String.format("%s where name='%s'", DATABASE_TYPE, dbName));
results = searchByDSL(String.format("%s where name='%s'", DATABASE_TYPE, dbName));
assertEquals(results.length(), 1);
}
......