Commit 7c262b40 by apoorvnaik Committed by Madhan Neethiraj

ATLAS-1880: search API with support for entity/tag attribute filters

parent 8101883c
......@@ -22,6 +22,7 @@ import com.sun.jersey.api.client.WebResource;
import com.sun.jersey.core.util.MultivaluedMapImpl;
import org.apache.atlas.model.SearchFilter;
import org.apache.atlas.model.discovery.AtlasSearchResult;
import org.apache.atlas.model.discovery.SearchParameters;
import org.apache.atlas.model.instance.AtlasClassification;
import org.apache.atlas.model.instance.AtlasClassification.AtlasClassifications;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
......@@ -100,9 +101,13 @@ public class AtlasClientV2 extends AtlasBaseClient {
private static final String DISCOVERY_URI = BASE_URI + "v2/search";
private static final String DSL_URI = DISCOVERY_URI + "/dsl";
private static final String FULL_TEXT_URI = DISCOVERY_URI + "/fulltext";
private static final String BASIC_SEARCH_URI = DISCOVERY_URI + "/basic";
private static final String FACETED_SEARCH_URI = BASIC_SEARCH_URI;
private static final APIInfo DSL_SEARCH = new APIInfo(DSL_URI, HttpMethod.GET, Response.Status.OK);
private static final APIInfo FULL_TEXT_SEARCH = new APIInfo(FULL_TEXT_URI, HttpMethod.GET, Response.Status.OK);
private static final APIInfo BASIC_SEARCH = new APIInfo(BASIC_SEARCH_URI, HttpMethod.GET, Response.Status.OK);
private static final APIInfo FACETED_SEARCH = new APIInfo(FACETED_SEARCH_URI, HttpMethod.POST, Response.Status.OK);
public AtlasClientV2(String[] baseUrl, String[] basicAuthUserNamePassword) {
......@@ -398,6 +403,23 @@ public class AtlasClientV2 extends AtlasBaseClient {
return callAPI(FULL_TEXT_SEARCH, AtlasSearchResult.class, queryParams);
}
/**
 * Performs a basic (attribute/classification/text) search against the Atlas server
 * via the v2 basic-search REST endpoint.
 *
 * @param typeName               entity type name to restrict the search to (may be null)
 * @param classification         classification/tag name to restrict the search to (may be null)
 * @param query                  free-text query string (may be null)
 * @param excludeDeletedEntities whether DELETED entities are filtered out of the results
 * @param limit                  maximum number of results to return
 * @param offset                 starting offset of the result set, for pagination
 * @return the search result returned by the server
 * @throws AtlasServiceException if the REST call fails
 */
public AtlasSearchResult basicSearch(final String typeName, final String classification, final String query,
                                     final boolean excludeDeletedEntities, final int limit, final int offset) throws AtlasServiceException {
    final MultivaluedMap<String, String> params = new MultivaluedMapImpl();

    params.add("typeName", typeName);
    params.add("classification", classification);
    params.add(QUERY, query);
    params.add("excludeDeletedEntities", String.valueOf(excludeDeletedEntities));
    params.add(LIMIT, String.valueOf(limit));
    params.add(OFFSET, String.valueOf(offset));

    return callAPI(BASIC_SEARCH, AtlasSearchResult.class, params);
}
/**
 * Performs a faceted search by POSTing the given {@link SearchParameters}
 * (filters, type/classification names, pagination) to the basic-search endpoint.
 *
 * @param searchParameters the search criteria to send to the server
 * @return the search result returned by the server
 * @throws AtlasServiceException if the REST call fails
 */
public AtlasSearchResult facetedSearch(SearchParameters searchParameters) throws AtlasServiceException {
    return callAPI(FACETED_SEARCH, AtlasSearchResult.class, searchParameters);
}
private <T> T getTypeDefByName(final String name, Class<T> typeDefClass) throws AtlasServiceException {
String atlasPath = getAtlasPath(typeDefClass);
APIInfo apiInfo = new APIInfo(String.format(GET_BY_NAME_TEMPLATE, atlasPath, name), HttpMethod.GET, Response.Status.OK);
......
......@@ -96,7 +96,9 @@ public final class Constants {
public static final String QUALIFIED_NAME = "Referenceable.qualifiedName";
public static final String TYPE_NAME_PROPERTY_KEY = INTERNAL_PROPERTY_KEY_PREFIX + "typeName";
public static final String INDEX_SEARCH_MAX_RESULT_SET_SIZE = "atlas.graph.index.search.max-result-set-size";
public static final String INDEX_SEARCH_MAX_TYPES_COUNT = "atlas.graph.index.search.max-types-count";
public static final String INDEX_SEARCH_MAX_TAGS_COUNT = "atlas.graph.index.search.max-tags-count";
private Constants() {
}
......
......@@ -59,6 +59,23 @@
</layout>
</appender>
<!-- Uncomment the following for perf logs -->
<!--
<appender name="perf_appender" class="org.apache.log4j.DailyRollingFileAppender">
<param name="file" value="${atlas.log.dir}/atlas_perf.log" />
<param name="datePattern" value="'.'yyyy-MM-dd" />
<param name="append" value="true" />
<layout class="org.apache.log4j.PatternLayout">
<param name="ConversionPattern" value="%d|%t|%m%n" />
</layout>
</appender>
<logger name="org.apache.atlas.perf" additivity="false">
<level value="debug" />
<appender-ref ref="perf_appender" />
</logger>
-->
<logger name="org.apache.atlas" additivity="false">
<level value="info"/>
<appender-ref ref="FILE"/>
......
......@@ -47,7 +47,7 @@ public interface AtlasGraphQuery<V, E> {
* the specified list of values.
*
* @param propertyKey
* @param value
* @param values
* @return
*/
AtlasGraphQuery<V, E> in(String propertyKey, Collection<?> values);
......@@ -56,7 +56,6 @@ public interface AtlasGraphQuery<V, E> {
/**
* Executes the query and returns the matching vertices.
* @return
* @throws AtlasException
*/
Iterable<AtlasVertex<V, E>> vertices();
......@@ -66,16 +65,32 @@ public interface AtlasGraphQuery<V, E> {
*/
Iterable<AtlasEdge<V, E>> edges();
/**
 * Executes the query and returns up to {@code limit} matching vertices.
 * (No offset is applied by this overload.)
 * @param limit max number of vertices
 * @return the matching vertices, at most {@code limit} of them
 */
Iterable<AtlasVertex<V, E>> vertices(int limit);
/**
* Executes the query and returns the matching vertices from given offset till the max limit
* @param offset starting offset
* @param limit max number of vertices
* @return
*/
Iterable<AtlasVertex<V, E>> vertices(int offset, int limit);
/**
* Adds a predicate that the returned vertices must have the specified
* property and that its value matches the criterion specified.
*
* @param propertyKey
* @param value
* @param op
* @param values
* @return
*/
AtlasGraphQuery<V, E> has(String propertyKey, ComparisionOperator compMethod, Object values);
AtlasGraphQuery<V, E> has(String propertyKey, QueryOperator op, Object values);
/**
* Adds a predicate that the vertices returned must satisfy the
......@@ -94,17 +109,31 @@ public interface AtlasGraphQuery<V, E> {
AtlasGraphQuery<V, E> createChildQuery();
interface QueryOperator {}
/**
* Comparison operators that can be used in an AtlasGraphQuery.
*/
enum ComparisionOperator {
enum ComparisionOperator implements QueryOperator {
GREATER_THAN,
GREATER_THAN_EQUAL,
EQUAL,
LESS_THAN,
LESS_THAN_EQUAL,
NOT_EQUAL
}
/**
 * String/text matching operators that can be used in an AtlasGraphQuery predicate.
 */
enum MatchingOperator implements QueryOperator {
    CONTAINS,
    PREFIX,
    SUFFIX,
    REGEX
}
/**
* Adds all of the predicates that have been added to this query to the
* specified query.
* @param otherQuery
......
......@@ -36,6 +36,14 @@ public interface AtlasIndexQuery<V, E> {
Iterator<Result<V, E>> vertices();
/**
 * Gets the query results, starting at {@code offset} and returning at most {@code limit} of them.
 * @param offset starting offset
 * @param limit max number of results
 * @return iterator over the matching index-query results
 */
Iterator<Result<V, E>> vertices(int offset, int limit);
/**
* Query result from an index query.
*
* @param <V>
......
......@@ -42,15 +42,29 @@ public interface AtlasVertexQuery<V, E> {
Iterable<AtlasVertex<V, E>> vertices();
/**
* Returns the vertices that satisfy the query condition.
*
* @param limit Max number of vertices
* @return
*/
Iterable<AtlasVertex<V, E>> vertices(int limit);
/**
* Returns the incident edges that satisfy the query condition.
* @return
*/
Iterable<AtlasEdge<V, E>> edges();
/**
* Returns the incident edges that satisfy the query condition.
* @param limit Max number of edges
* @return
*/
Iterable<AtlasEdge<V, E>> edges(int limit);
/**
* Returns the number of elements that match the query.
* @return
*/
long count();
}
......@@ -17,12 +17,12 @@
*/
package org.apache.atlas.repository.graphdb.titan.query;
import java.util.Collection;
import org.apache.atlas.repository.graphdb.AtlasEdge;
import org.apache.atlas.repository.graphdb.AtlasGraphQuery.ComparisionOperator;
import org.apache.atlas.repository.graphdb.AtlasGraphQuery.QueryOperator;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import java.util.Collection;
/**
* Interfaces that provides a thin wrapper around GraphQuery (used by Titan0) and
* TitanGraphQuery (used by Titan 1).
......@@ -47,6 +47,22 @@ public interface NativeTitanGraphQuery<V, E> {
Iterable<AtlasEdge<V, E>> edges();
/**
* Executes graph query
* @param limit Max vertices to return
* @return
*/
Iterable<AtlasVertex<V, E>> vertices(int limit);
/**
* Executes graph query
* @param offset Starting offset
* @param limit Max vertices to return
* @return
*/
Iterable<AtlasVertex<V, E>> vertices(int offset, int limit);
/**
* Adds an in condition to the query.
*
* @param propertyName
......@@ -61,6 +77,5 @@ public interface NativeTitanGraphQuery<V, E> {
* @param op
* @param value
*/
void has(String propertyName, ComparisionOperator op, Object value);
void has(String propertyName, QueryOperator op, Object value);
}
......@@ -17,11 +17,8 @@
*/
package org.apache.atlas.repository.graphdb.titan.query;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.atlas.repository.graphdb.AtlasEdge;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasGraphQuery;
......@@ -33,6 +30,13 @@ import org.apache.atlas.repository.graphdb.titan.query.expr.OrCondition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
/**
* Abstract implementation of AtlasGraphQuery that is used by both Titan 0.5.4
* and Titan 1.0.0.
......@@ -123,11 +127,10 @@ public abstract class TitanGraphQuery<V, E> implements AtlasGraphQuery<V, E> {
@Override
public Iterable<AtlasVertex<V, E>> vertices() {
if (LOG.isDebugEnabled()) {
LOG.debug("Executing: " + queryCondition.toString());
LOG.debug("Executing: " + queryCondition);
}
//compute the overall result by unioning the results from all of the
//AndConditions together.
// Compute the overall result by combining the results of all the AndConditions (nested within OR) together.
Set<AtlasVertex<V, E>> result = new HashSet<>();
for(AndCondition andExpr : queryCondition.getAndTerms()) {
NativeTitanGraphQuery<V, E> andQuery = andExpr.create(getQueryFactory());
......@@ -141,11 +144,10 @@ public abstract class TitanGraphQuery<V, E> implements AtlasGraphQuery<V, E> {
@Override
public Iterable<AtlasEdge<V, E>> edges() {
if (LOG.isDebugEnabled()) {
LOG.debug("Executing: " + queryCondition.toString());
LOG.debug("Executing: " + queryCondition);
}
//compute the overall result by unioning the results from all of the
//AndConditions together.
// Compute the overall result by combining the results of all the AndConditions (nested within OR) together.
Set<AtlasEdge<V, E>> result = new HashSet<>();
for(AndCondition andExpr : queryCondition.getAndTerms()) {
NativeTitanGraphQuery<V, E> andQuery = andExpr.create(getQueryFactory());
......@@ -157,7 +159,46 @@ public abstract class TitanGraphQuery<V, E> implements AtlasGraphQuery<V, E> {
}
@Override
public AtlasGraphQuery<V, E> has(String propertyKey, ComparisionOperator operator,
public Iterable<AtlasVertex<V, E>> vertices(int limit) {
    // No-offset pagination: delegate to the offset-aware variant.
    return vertices(0, limit);
}
@Override
public Iterable<AtlasVertex<V, E>> vertices(int offset, int limit) {
    // Executes the query and returns at most 'limit' vertices, after skipping the
    // first 'offset' matches.
    if (LOG.isDebugEnabled()) {
        LOG.debug("Executing: " + queryCondition);
    }

    Preconditions.checkArgument(offset >= 0, "Offset must be non-negative");
    Preconditions.checkArgument(limit >= 0, "Limit must be non-negative");

    // Compute the overall result by combining the results of all the AndConditions (nested within OR) together.
    Set<AtlasVertex<V, E>> result = new HashSet<>();
    // Running count of matches seen across ALL OR-terms; used to skip 'offset' entries.
    long resultIdx = 0;

    for(AndCondition andExpr : queryCondition.getAndTerms()) {
        if (result.size() == limit) {
            break;
        }

        // Each OR-term is evaluated as its own native query; fetch offset+limit so
        // that, after skipping, up to 'limit' vertices remain.
        NativeTitanGraphQuery<V, E> andQuery = andExpr.create(getQueryFactory());

        for(AtlasVertex<V, E> vertex : andQuery.vertices(offset + limit)) {
            // NOTE(review): resultIdx is shared across OR-terms and 'result' is a Set,
            // so duplicate vertices matched by multiple terms are counted toward the
            // offset/limit bookkeeping but stored only once — confirm this pagination
            // behavior for multi-term OR queries is intended.
            if (resultIdx >= offset) {
                result.add(vertex);

                if (result.size() == limit) {
                    break;
                }
            }

            resultIdx++;
        }
    }

    return result;
}
@Override
public AtlasGraphQuery<V, E> has(String propertyKey, QueryOperator operator,
Object value) {
queryCondition.andWith(new HasPredicate(propertyKey, operator, value));
return this;
......
......@@ -17,12 +17,12 @@
*/
package org.apache.atlas.repository.graphdb.titan.query.expr;
import java.util.ArrayList;
import java.util.List;
import org.apache.atlas.repository.graphdb.titan.query.NativeTitanGraphQuery;
import org.apache.atlas.repository.graphdb.titan.query.NativeTitanQueryFactory;
import java.util.ArrayList;
import java.util.List;
/**
* Represents an AndCondition in a graph query. Only vertices that
* satisfy the conditions in all of the query predicates will be returned
......@@ -78,7 +78,7 @@ public class AndCondition {
/**
* Creates a NativeTitanGraphQuery that can be used to evaluate this condition.
*
* @param graph
* @param factory
* @return
*/
public <V, E> NativeTitanGraphQuery<V, E> create(NativeTitanQueryFactory<V, E> factory) {
......
......@@ -17,7 +17,7 @@
*/
package org.apache.atlas.repository.graphdb.titan.query.expr;
import org.apache.atlas.repository.graphdb.AtlasGraphQuery.ComparisionOperator;
import org.apache.atlas.repository.graphdb.AtlasGraphQuery.QueryOperator;
import org.apache.atlas.repository.graphdb.titan.query.NativeTitanGraphQuery;
/**
......@@ -27,11 +27,10 @@ import org.apache.atlas.repository.graphdb.titan.query.NativeTitanGraphQuery;
public class HasPredicate implements QueryPredicate {
private String propertyName;
private ComparisionOperator op;
private QueryOperator op;
private Object value;
public HasPredicate(String propertyName, ComparisionOperator op, Object value) {
super();
public HasPredicate(String propertyName, QueryOperator op, Object value) {
this.propertyName = propertyName;
this.op = op;
this.value = value;
......
......@@ -285,7 +285,33 @@ public class GraphCentricQueryBuilder implements TitanGraphQuery<GraphCentricQue
}
if (index.isCompositeIndex()) {
subcondition = indexCover((CompositeIndexType) index, conditions, subcover);
CompositeIndexType compositeIndex = (CompositeIndexType)index;
subcondition = indexCover(compositeIndex, conditions, subcover);
// if this is unique index, use it!!
if (compositeIndex.getCardinality() == Cardinality.SINGLE && subcondition != null) {
bestCandidate = null; // will cause the outer while() to bail out
candidateSubcover = subcover;
candidateSubcondition = subcondition;
candidateSupportsSort = supportsSort;
if (log.isDebugEnabled()) {
log.debug("selected unique index {}", compositeIndex.getName());
}
if (coveredClauses.isEmpty()) {
isSorted = candidateSupportsSort;
}
coveredClauses.clear();;
coveredClauses.addAll(candidateSubcover);
jointQuery = new JointIndexQuery();
jointQuery.add(compositeIndex, serializer.getQuery(compositeIndex, (List<Object[]>)candidateSubcondition));
break;
}
} else {
subcondition = indexCover((MixedIndexType) index, conditions, serializer, subcover);
if (coveredClauses.isEmpty() && !supportsSort
......
......@@ -19,6 +19,7 @@ package org.apache.atlas.repository.graphdb.titan0;
import java.util.Iterator;
import com.google.common.base.Preconditions;
import org.apache.atlas.repository.graphdb.AtlasIndexQuery;
import org.apache.atlas.repository.graphdb.AtlasVertex;
......@@ -56,6 +57,26 @@ public class Titan0IndexQuery implements AtlasIndexQuery<Titan0Vertex, Titan0Edg
return Iterators.transform(results, function);
}
/**
 * Runs the index query and returns results in the page [offset, offset + limit).
 * Delegates offset/limit handling to the wrapped TitanIndexQuery.
 */
@Override
public Iterator<Result<Titan0Vertex, Titan0Edge>> vertices(int offset, int limit) {
    Preconditions.checkArgument(offset >= 0, "Index offset should be greater than or equals to 0");
    Preconditions.checkArgument(limit >= 0, "Index limit should be greater than or equals to 0");

    Iterator<TitanIndexQuery.Result<Vertex>> rawResults =
            wrappedIndexQuery.offset(offset).limit(limit).vertices().iterator();

    // Adapt each raw Titan index result into the graphdb-abstraction Result type.
    Function<TitanIndexQuery.Result<Vertex>, AtlasIndexQuery.Result<Titan0Vertex, Titan0Edge>> toAtlasResult =
            new Function<TitanIndexQuery.Result<Vertex>, AtlasIndexQuery.Result<Titan0Vertex, Titan0Edge>>() {
                @Override
                public AtlasIndexQuery.Result<Titan0Vertex, Titan0Edge> apply(TitanIndexQuery.Result<Vertex> source) {
                    return new ResultImpl(source);
                }
            };

    return Iterators.transform(rawResults, toAtlasResult);
}
private final class ResultImpl implements AtlasIndexQuery.Result<Titan0Vertex, Titan0Edge> {
private TitanIndexQuery.Result<Vertex> wrappedResult;
......
......@@ -17,6 +17,7 @@
*/
package org.apache.atlas.repository.graphdb.titan0;
import com.google.common.base.Preconditions;
import org.apache.atlas.repository.graphdb.AtlasEdge;
import org.apache.atlas.repository.graphdb.AtlasEdgeDirection;
import org.apache.atlas.repository.graphdb.AtlasVertex;
......@@ -53,12 +54,26 @@ public class Titan0VertexQuery implements AtlasVertexQuery<Titan0Vertex, Titan0E
}
/** Returns at most {@code limit} vertices satisfying this vertex query. */
@Override
public Iterable<AtlasVertex<Titan0Vertex, Titan0Edge>> vertices(int limit) {
    Preconditions.checkArgument(limit >= 0, "Limit should be greater than or equals to 0");

    return graph.wrapVertices(vertexQuery.limit(limit).vertices());
}
/** Returns all incident edges satisfying this vertex query. */
@Override
public Iterable<AtlasEdge<Titan0Vertex, Titan0Edge>> edges() {
    return graph.wrapEdges(vertexQuery.edges());
}
/** Returns at most {@code limit} incident edges satisfying this vertex query. */
@Override
public Iterable<AtlasEdge<Titan0Vertex, Titan0Edge>> edges(int limit) {
    Preconditions.checkArgument(limit >= 0, "Limit should be greater than or equals to 0");

    return graph.wrapEdges(vertexQuery.limit(limit).edges());
}
@Override
public long count() {
    // Delegates the element count to the underlying Titan vertex query.
    return vertexQuery.count();
}
......
......@@ -17,21 +17,25 @@
*/
package org.apache.atlas.repository.graphdb.titan0.query;
import java.util.Collection;
import com.google.common.collect.Lists;
import com.thinkaurelius.titan.core.TitanGraphQuery;
import com.thinkaurelius.titan.core.attribute.Contain;
import com.thinkaurelius.titan.core.attribute.Text;
import com.thinkaurelius.titan.graphdb.query.TitanPredicate;
import com.tinkerpop.blueprints.Compare;
import com.tinkerpop.blueprints.Vertex;
import org.apache.atlas.repository.graphdb.AtlasEdge;
import org.apache.atlas.repository.graphdb.AtlasGraphQuery.ComparisionOperator;
import org.apache.atlas.repository.graphdb.AtlasGraphQuery.MatchingOperator;
import org.apache.atlas.repository.graphdb.AtlasGraphQuery.QueryOperator;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.graphdb.titan.query.NativeTitanGraphQuery;
import org.apache.atlas.repository.graphdb.titan0.Titan0GraphDatabase;
import org.apache.atlas.repository.graphdb.titan0.Titan0Edge;
import org.apache.atlas.repository.graphdb.titan0.Titan0Graph;
import org.apache.atlas.repository.graphdb.titan0.Titan0GraphDatabase;
import org.apache.atlas.repository.graphdb.titan0.Titan0Vertex;
import com.thinkaurelius.titan.core.TitanGraphQuery;
import com.thinkaurelius.titan.core.attribute.Contain;
import com.thinkaurelius.titan.graphdb.query.TitanPredicate;
import com.tinkerpop.blueprints.Compare;
import java.util.*;
/**
* Titan 0.5.4 implementation of NativeTitanGraphQuery.
......@@ -60,6 +64,28 @@ public class NativeTitan0GraphQuery implements NativeTitanGraphQuery<Titan0Verte
Iterable it = query.edges();
return graph.wrapEdges(it);
}
@Override
public Iterable<AtlasVertex<Titan0Vertex, Titan0Edge>> vertices(int limit) {
    // Apply the limit natively in Titan, then wrap the raw vertices in the graphdb abstraction.
    Iterable it = query.limit(limit).vertices();
    return graph.wrapVertices(it);
}
/**
 * Executes the graph query and returns vertices in the page [offset, offset + limit).
 * Titan 0.5.4's GraphQuery has no native offset support, so the first
 * {@code offset + limit} matches are fetched and the first {@code offset} of them
 * are discarded client-side.
 */
@Override
public Iterable<AtlasVertex<Titan0Vertex, Titan0Edge>> vertices(int offset, int limit) {
    List<Vertex>     result = new ArrayList<>(limit);
    Iterator<Vertex> iter   = query.limit(offset + limit).vertices().iterator();

    for (long resultIdx = 0; iter.hasNext() && result.size() < limit; resultIdx++) {
        if (resultIdx < offset) {
            // BUG FIX: consume the skipped vertex. Previously the iterator was never
            // advanced here, so the first 'limit' vertices were returned regardless
            // of the requested offset and pagination was broken.
            iter.next();
            continue;
        }

        result.add(iter.next());
    }

    return graph.wrapVertices(result);
}
@Override
public void in(String propertyName, Collection<?> values) {
......@@ -68,26 +94,48 @@ public class NativeTitan0GraphQuery implements NativeTitanGraphQuery<Titan0Verte
}
@Override
public void has(String propertyName, ComparisionOperator op, Object value) {
Compare c = getGremlinPredicate(op);
TitanPredicate pred = TitanPredicate.Converter.convert(c);
public void has(String propertyName, QueryOperator op, Object value) {
TitanPredicate pred;
if (op instanceof ComparisionOperator) {
Compare c = getGremlinPredicate((ComparisionOperator) op);
pred = TitanPredicate.Converter.convert(c);
} else {
pred = getGremlinPredicate((MatchingOperator) op);
}
query.has(propertyName, pred, value);
}
/**
 * Maps an abstract text-matching operator to Titan 0.5.4's Text predicate.
 * @throws RuntimeException if the operator has no Titan equivalent
 */
private Text getGremlinPredicate(MatchingOperator op) {
    switch (op) {
        case CONTAINS:
            return Text.CONTAINS;
        case PREFIX:
            return Text.PREFIX;
        case SUFFIX:
            // NOTE(review): SUFFIX maps to CONTAINS_REGEX, not a dedicated suffix
            // predicate — presumably a regex-based approximation; confirm intended.
            return Text.CONTAINS_REGEX;
        case REGEX:
            return Text.REGEX;
        default:
            throw new RuntimeException("Unsupported matching operator:" + op);
    }
}
private Compare getGremlinPredicate(ComparisionOperator op) {
switch (op) {
case EQUAL:
return Compare.EQUAL;
case GREATER_THAN_EQUAL:
return Compare.GREATER_THAN_EQUAL;
case LESS_THAN_EQUAL:
return Compare.LESS_THAN_EQUAL;
case NOT_EQUAL:
return Compare.NOT_EQUAL;
default:
throw new RuntimeException("Unsupported comparison operator:" + op);
case EQUAL:
return Compare.EQUAL;
case GREATER_THAN:
return Compare.GREATER_THAN;
case GREATER_THAN_EQUAL:
return Compare.GREATER_THAN_EQUAL;
case LESS_THAN:
return Compare.LESS_THAN;
case LESS_THAN_EQUAL:
return Compare.LESS_THAN_EQUAL;
case NOT_EQUAL:
return Compare.NOT_EQUAL;
default:
throw new RuntimeException("Unsupported comparison operator:" + op);
}
}
......
......@@ -19,6 +19,7 @@ package org.apache.atlas.repository.graphdb.titan1;
import java.util.Iterator;
import com.google.common.base.Preconditions;
import org.apache.atlas.repository.graphdb.AtlasIndexQuery;
import org.apache.atlas.repository.graphdb.AtlasVertex;
......@@ -56,6 +57,27 @@ public class Titan1IndexQuery implements AtlasIndexQuery<Titan1Vertex, Titan1Edg
return Iterators.transform(results, function);
}
/**
 * Runs the index query and returns results in the page [offset, offset + limit).
 * Delegates offset/limit handling to the underlying TitanIndexQuery.
 */
@Override
public Iterator<Result<Titan1Vertex, Titan1Edge>> vertices(int offset, int limit) {
    Preconditions.checkArgument(offset >= 0, "Index offset should be greater than or equals to 0");
    Preconditions.checkArgument(limit >= 0, "Index limit should be greater than or equals to 0");

    Iterator<TitanIndexQuery.Result<TitanVertex>> rawResults =
            query.offset(offset).limit(limit).vertices().iterator();

    // Adapt each raw Titan index result into the graphdb-abstraction Result type.
    Function<TitanIndexQuery.Result<TitanVertex>, Result<Titan1Vertex, Titan1Edge>> toAtlasResult =
            new Function<TitanIndexQuery.Result<TitanVertex>, Result<Titan1Vertex, Titan1Edge>>() {
                @Override
                public Result<Titan1Vertex, Titan1Edge> apply(TitanIndexQuery.Result<TitanVertex> source) {
                    return new ResultImpl(source);
                }
            };

    return Iterators.transform(rawResults, toAtlasResult);
}
/**
* Titan 1.0.0 implementation of AtlasIndexQuery.Result.
*/
......
......@@ -17,6 +17,7 @@
*/
package org.apache.atlas.repository.graphdb.titan1;
import com.google.common.base.Preconditions;
import org.apache.atlas.repository.graphdb.AtlasEdge;
import org.apache.atlas.repository.graphdb.AtlasEdgeDirection;
import org.apache.atlas.repository.graphdb.AtlasVertex;
......@@ -51,10 +52,23 @@ public class Titan1VertexQuery implements AtlasVertexQuery<Titan1Vertex, Titan1E
}
/** Returns at most {@code limit} vertices satisfying this vertex query. */
@Override
public Iterable<AtlasVertex<Titan1Vertex, Titan1Edge>> vertices(int limit) {
    Preconditions.checkArgument(limit >= 0, "Limit should be greater than or equals to 0");

    Iterable limited = query.limit(limit).vertices();
    return graph.wrapVertices(limited);
}
/** Returns all incident edges satisfying this vertex query. */
@Override
public Iterable<AtlasEdge<Titan1Vertex, Titan1Edge>> edges() {
    Iterable found = query.edges();
    return graph.wrapEdges(found);
}
/** Returns at most {@code limit} incident edges satisfying this vertex query. */
@Override
public Iterable<AtlasEdge<Titan1Vertex, Titan1Edge>> edges(int limit) {
    Preconditions.checkArgument(limit >= 0, "Limit should be greater than or equals to 0");

    Iterable limited = query.limit(limit).edges();
    return graph.wrapEdges(limited);
}
@Override
......
......@@ -17,11 +17,16 @@
*/
package org.apache.atlas.repository.graphdb.titan1.query;
import java.util.Collection;
import com.thinkaurelius.titan.core.TitanEdge;
import com.thinkaurelius.titan.core.TitanGraphQuery;
import com.thinkaurelius.titan.core.TitanVertex;
import com.thinkaurelius.titan.core.attribute.Contain;
import com.thinkaurelius.titan.core.attribute.Text;
import com.thinkaurelius.titan.graphdb.query.TitanPredicate;
import org.apache.atlas.repository.graphdb.AtlasEdge;
import org.apache.atlas.repository.graphdb.AtlasGraphQuery.ComparisionOperator;
import org.apache.atlas.repository.graphdb.AtlasGraphQuery.MatchingOperator;
import org.apache.atlas.repository.graphdb.AtlasGraphQuery.QueryOperator;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.graphdb.titan.query.NativeTitanGraphQuery;
import org.apache.atlas.repository.graphdb.titan1.Titan1Edge;
......@@ -29,11 +34,9 @@ import org.apache.atlas.repository.graphdb.titan1.Titan1Graph;
import org.apache.atlas.repository.graphdb.titan1.Titan1GraphDatabase;
import org.apache.atlas.repository.graphdb.titan1.Titan1Vertex;
import org.apache.tinkerpop.gremlin.process.traversal.Compare;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import com.thinkaurelius.titan.core.TitanGraphQuery;
import com.thinkaurelius.titan.core.TitanVertex;
import com.thinkaurelius.titan.core.attribute.Contain;
import com.thinkaurelius.titan.graphdb.query.TitanPredicate;
import java.util.*;
/**
* Titan 1.0.0 implementation of NativeTitanGraphQuery.
......@@ -61,32 +64,77 @@ public class NativeTitan1GraphQuery implements NativeTitanGraphQuery<Titan1Verte
}
@Override
public Iterable<AtlasVertex<Titan1Vertex, Titan1Edge>> vertices(int limit) {
    // Apply the limit natively in Titan, then wrap the raw vertices in the graphdb abstraction.
    Iterable<TitanVertex> it = query.limit(limit).vertices();
    return graph.wrapVertices(it);
}
/**
 * Executes the graph query and returns vertices in the page [offset, offset + limit).
 * Titan's GraphQuery has no native offset support here, so the first
 * {@code offset + limit} matches are fetched and the first {@code offset} of them
 * are discarded client-side.
 */
@Override
public Iterable<AtlasVertex<Titan1Vertex, Titan1Edge>> vertices(int offset, int limit) {
    List<Vertex>               result = new ArrayList<>(limit);
    Iterator<? extends Vertex> iter   = query.limit(offset + limit).vertices().iterator();

    for (long resultIdx = 0; iter.hasNext() && result.size() < limit; resultIdx++) {
        if (resultIdx < offset) {
            // BUG FIX: consume the skipped vertex. Previously the iterator was never
            // advanced here, so the first 'limit' vertices were returned regardless
            // of the requested offset and pagination was broken.
            iter.next();
            continue;
        }

        result.add(iter.next());
    }

    return graph.wrapVertices(result);
}
@Override
public void in(String propertyName, Collection<? extends Object> values) {
    // Membership predicate: the property value must be one of 'values' (Titan Contain.IN).
    query.has(propertyName, Contain.IN, values);
}
@Override
public void has(String propertyName, ComparisionOperator op, Object value) {
Compare c = getGremlinPredicate(op);
TitanPredicate pred = TitanPredicate.Converter.convert(c);
public void has(String propertyName, QueryOperator op, Object value) {
TitanPredicate pred;
if (op instanceof ComparisionOperator) {
Compare c = getGremlinPredicate((ComparisionOperator) op);
pred = TitanPredicate.Converter.convert(c);
} else {
pred = getGremlinPredicate((MatchingOperator)op);
}
query.has(propertyName, pred, value);
}
/**
 * Maps an abstract text-matching operator to Titan 1.0.0's Text predicate.
 * @throws RuntimeException if the operator has no Titan equivalent
 */
private Text getGremlinPredicate(MatchingOperator op) {
    switch (op) {
        case CONTAINS:
            return Text.CONTAINS;
        case PREFIX:
            return Text.PREFIX;
        case SUFFIX:
            // NOTE(review): SUFFIX maps to CONTAINS_REGEX, not a dedicated suffix
            // predicate — presumably a regex-based approximation; confirm intended.
            return Text.CONTAINS_REGEX;
        case REGEX:
            return Text.REGEX;
        default:
            throw new RuntimeException("Unsupported matching operator:" + op);
    }
}
private Compare getGremlinPredicate(ComparisionOperator op) {
switch (op) {
case EQUAL:
return Compare.eq;
case GREATER_THAN_EQUAL:
return Compare.gte;
case LESS_THAN_EQUAL:
return Compare.lte;
case NOT_EQUAL:
return Compare.neq;
default:
throw new RuntimeException("Unsupported comparison operator:" + op);
case EQUAL:
return Compare.eq;
case GREATER_THAN:
return Compare.gt;
case GREATER_THAN_EQUAL:
return Compare.gte;
case LESS_THAN:
return Compare.lt;
case LESS_THAN_EQUAL:
return Compare.lte;
case NOT_EQUAL:
return Compare.neq;
default:
throw new RuntimeException("Unsupported comparison operator:" + op);
}
}
......
......@@ -39,10 +39,9 @@ import static org.codehaus.jackson.annotate.JsonAutoDetect.Visibility.PUBLIC_ONL
@JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = PUBLIC_ONLY, fieldVisibility = NONE)
@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
@JsonIgnoreProperties(ignoreUnknown = true)
@XmlRootElement
@XmlAccessorType(XmlAccessType.PROPERTY)
public class AtlasSearchResult implements Serializable {
private AtlasQueryType queryType;
private SearchParameters searchParameters;
private String queryText;
private String type;
private String classification;
......@@ -59,11 +58,24 @@ public class AtlasSearchResult implements Serializable {
/**
 * Builds a result shell for a text-based search (DSL/full-text), with all
 * result fields explicitly reset to null.
 *
 * @param queryText the query string that produced this result
 * @param queryType the kind of query executed
 */
public AtlasSearchResult(String queryText, AtlasQueryType queryType) {
    setQueryText(queryText);
    setQueryType(queryType);
    setSearchParameters(null);
    setEntities(null);
    setAttributes(null);
    setFullTextResult(null);
}
/**
 * Builds a result shell for a parameterized BASIC search.
 *
 * @param searchParameters the criteria the search was run with; may be null
 */
public AtlasSearchResult(SearchParameters searchParameters) {
    setQueryType(AtlasQueryType.BASIC);

    // NOTE(review): unlike the (queryText, queryType) constructor, the result fields
    // are reset only when searchParameters is non-null; fields default to null anyway,
    // so behavior is the same — confirm the asymmetry is intentional.
    if (searchParameters != null) {
        setQueryText(searchParameters.getQuery());
        setSearchParameters(searchParameters);
        setEntities(null);
        setAttributes(null);
        setFullTextResult(null);
    }
}
public AtlasQueryType getQueryType() { return queryType; }
public void setQueryType(AtlasQueryType queryType) { this.queryType = queryType; }
......@@ -98,6 +110,7 @@ public class AtlasSearchResult implements Serializable {
if (o == null || getClass() != o.getClass()) return false;
AtlasSearchResult that = (AtlasSearchResult) o;
return Objects.equals(queryType, that.queryType) &&
Objects.equals(searchParameters, that.searchParameters) &&
Objects.equals(queryText, that.queryText) &&
Objects.equals(type, that.type) &&
Objects.equals(classification, that.classification) &&
......@@ -107,12 +120,13 @@ public class AtlasSearchResult implements Serializable {
}
@Override
public int hashCode() { return Objects.hash(queryText, queryType, entities, attributes, fullTextResult, type, classification); }
public int hashCode() { return Objects.hash(queryType, searchParameters, queryText, type, classification, entities, attributes, fullTextResult); }
@Override
public String toString() {
return "AtlasSearchResult{" +
"queryType=" + queryType +
", searchParameters='" + searchParameters + '\'' +
", queryText='" + queryText + '\'' +
", type=" + type +
", classification=" + classification +
......@@ -149,6 +163,14 @@ public class AtlasSearchResult implements Serializable {
}
}
/** Sets the search criteria this result was produced from. */
public void setSearchParameters(SearchParameters searchParameters) {
    this.searchParameters = searchParameters;
}
/** @return the search criteria this result was produced from, or null for non-parameterized searches */
public SearchParameters getSearchParameters() {
    return searchParameters;
}
public enum AtlasQueryType { DSL, FULL_TEXT, GREMLIN, BASIC, ATTRIBUTE }
@JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = PUBLIC_ONLY, fieldVisibility = NONE)
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.model.discovery;
import org.codehaus.jackson.annotate.JsonAutoDetect;
import org.codehaus.jackson.annotate.JsonCreator;
import org.codehaus.jackson.annotate.JsonIgnoreProperties;
import org.codehaus.jackson.annotate.JsonValue;
import org.codehaus.jackson.map.annotate.JsonSerialize;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import static org.codehaus.jackson.annotate.JsonAutoDetect.Visibility.NONE;
import static org.codehaus.jackson.annotate.JsonAutoDetect.Visibility.PUBLIC_ONLY;
@JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = PUBLIC_ONLY, fieldVisibility = NONE)
@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
@JsonIgnoreProperties(ignoreUnknown = true)
public class SearchParameters {
private String query;
private String typeName;
private String classification;
private boolean excludeDeletedEntities;
private int limit;
private int offset;
private FilterCriteria entityFilters;
private FilterCriteria tagFilters;
private Set<String> attributes;
/**
 * @return the free-text search query string
 */
public String getQuery() {
    return query;
}
/**
* Set query type
* @param query type
*/
public void setQuery(String query) {
this.query = query;
}
/**
* @return Type name to search on
*/
public String getTypeName() {
return typeName;
}
/**
* Set the type name to search on
* @param typeName type name
*/
public void setTypeName(String typeName) {
this.typeName = typeName;
}
/**
*
* @return Classification/tag to search on
*/
public String getClassification() {
return classification;
}
/**
* Set the classification/tag to search on
* @param classification classification/tag name
*/
public void setClassification(String classification) {
this.classification = classification;
}
/**
* @return True iff deleted entities are excluded
*/
public boolean getExcludeDeletedEntities() {
return excludeDeletedEntities;
}
/**
* Exclude deleted entities from search
* @param excludeDeletedEntities boolean flag
*/
public void setExcludeDeletedEntities(boolean excludeDeletedEntities) {
this.excludeDeletedEntities = excludeDeletedEntities;
}
/**
* @return Max number of results to be returned
*/
public int getLimit() {
return limit;
}
/**
* Restrict the results to the specified limit
* @param limit max number of results
*/
public void setLimit(int limit) {
this.limit = limit;
}
/**
* @return Offset(pagination) of the results
*/
public int getOffset() {
return offset;
}
/**
* @param offset
*/
public void setOffset(int offset) {
this.offset = offset;
}
/**
* Entity attribute filters for the type (if type name is specified)
* @return
*/
public FilterCriteria getEntityFilters() {
return entityFilters;
}
/**
* Filter the entities on this criteria
* @param entityFilters
*/
public void setEntityFilters(FilterCriteria entityFilters) {
this.entityFilters = entityFilters;
}
/**
* Tag attribute filters for the classification/tag (if tag name is specified)
* @return
*/
public FilterCriteria getTagFilters() {
return tagFilters;
}
/**
* Filter the tag/classification on this criteria
* @param tagFilters
*/
public void setTagFilters(FilterCriteria tagFilters) {
this.tagFilters = tagFilters;
}
/**
* Attribute values included in the results
* @return
*/
public Set<String> getAttributes() {
return attributes;
}
/**
* Return these attributes in the result response
* @param attributes
*/
public void setAttributes(Set<String> attributes) {
this.attributes = attributes;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
SearchParameters that = (SearchParameters) o;
return excludeDeletedEntities == that.excludeDeletedEntities &&
limit == that.limit &&
offset == that.offset &&
Objects.equals(query, that.query) &&
Objects.equals(typeName, that.typeName) &&
Objects.equals(classification, that.classification) &&
Objects.equals(entityFilters, that.entityFilters) &&
Objects.equals(tagFilters, that.tagFilters) &&
Objects.equals(attributes, that.attributes);
}
@Override
public int hashCode() {
return Objects.hash(query, typeName, classification, excludeDeletedEntities, limit, offset, entityFilters, tagFilters, attributes);
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder("SearchParameters{");
sb.append("query='").append(query).append('\'');
sb.append(", typeName='").append(typeName).append('\'');
sb.append(", classification='").append(classification).append('\'');
sb.append(", excludeDeletedEntities=").append(excludeDeletedEntities);
sb.append(", limit=").append(limit);
sb.append(", offset=").append(offset);
sb.append(", entityFilters=").append(entityFilters);
sb.append(", tagFilters=").append(tagFilters);
sb.append(", attributes=").append(attributes);
sb.append('}');
return sb.toString();
}
@JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = PUBLIC_ONLY, fieldVisibility = NONE)
@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
@JsonIgnoreProperties(ignoreUnknown = true)
public static class FilterCriteria {
// Can be presented as a group of conditions or a single condition
public enum Condition { AND, OR }
// Single condition
private String attributeName;
private Operator operator;
private String attributeValue;
// Complex conditions
private Condition condition;
private List<FilterCriteria> criterion;
public String getAttributeName() {
return attributeName;
}
public void setAttributeName(String attributeName) {
this.attributeName = attributeName;
}
public Operator getOperator() {
return operator;
}
public void setOperator(Operator operator) {
this.operator = operator;
}
public String getAttributeValue() {
return attributeValue;
}
public void setAttributeValue(String attributeValue) {
this.attributeValue = attributeValue;
}
public Condition getCondition() {
return condition;
}
public void setCondition(Condition condition) {
this.condition = condition;
}
public List<FilterCriteria> getCriterion() {
return criterion;
}
public void setCriterion(List<FilterCriteria> criterion) {
this.criterion = criterion;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
FilterCriteria that = (FilterCriteria) o;
return Objects.equals(attributeName, that.attributeName) &&
Objects.equals(operator, that.operator) &&
Objects.equals(attributeValue, that.attributeValue) &&
condition == that.condition &&
Objects.equals(criterion, that.criterion);
}
@Override
public int hashCode() {
return Objects.hash(attributeName, operator, attributeValue, condition, criterion);
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder("FilterCriteria{");
sb.append("attributeName='").append(attributeName).append('\'');
sb.append(", operator=").append(operator);
sb.append(", attributeValue='").append(attributeValue).append('\'');
sb.append(", condition=").append(condition);
sb.append(", criterion=").append(criterion);
sb.append('}');
return sb.toString();
}
}
/**
* Supported search operations
* Logical comparision operators can only be used with numbers or dates
* IN, LIKE, startsWith, endsWith, CONTAINS can only be used with strings or text
*/
public enum Operator {
LT(new String[]{"<", "lt"}),
GT(new String[]{">", "gt"}),
LTE(new String[]{"<=", "lte"}),
GTE(new String[]{">=", "gte"}),
EQ(new String[]{"eq", "="}),
NEQ(new String[]{"neq", "!="}),
IN(new String[]{"in", "IN"}),
LIKE(new String[]{"like", "LIKE"}),
STARTS_WITH(new String[]{"startsWith", "STARTSWITH", "begins_with", "BEGINS_WITH"}),
ENDS_WITH(new String[]{"endsWith", "ENDSWITH", "ends_with", "BEGINS_WITH"}),
CONTAINS(new String[]{"contains", "CONTAINS"})
;
static final Map<String, Operator> operatorsMap = new HashMap<>();
private String[] symbols;
static {
for (Operator operator : Operator.values()) {
for (String s : operator.symbols) {
operatorsMap.put(s, operator);
}
}
}
Operator(String[] symbols) {
this.symbols = symbols;
}
@JsonCreator
public static Operator fromString(String symbol) {
return operatorsMap.get(symbol);
}
@JsonValue
public String getSymbol() {
return symbols[0];
}
public String[] getSymbols() {
return symbols;
}
@Override
public String toString() {
return getSymbol();
}
}
}
......@@ -18,14 +18,12 @@
package org.apache.atlas.model.impexp;
import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
import org.codehaus.jackson.annotate.JsonAnySetter;
import org.codehaus.jackson.annotate.JsonAutoDetect;
import org.codehaus.jackson.annotate.JsonIgnore;
import org.codehaus.jackson.annotate.JsonIgnoreProperties;
import org.codehaus.jackson.map.annotate.JsonSerialize;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlRootElement;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
......@@ -33,12 +31,9 @@ import java.util.Map;
import static org.codehaus.jackson.annotate.JsonAutoDetect.Visibility.NONE;
import static org.codehaus.jackson.annotate.JsonAutoDetect.Visibility.PUBLIC_ONLY;
@JsonAutoDetect(getterVisibility=PUBLIC_ONLY, setterVisibility=PUBLIC_ONLY, fieldVisibility=NONE)
@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
@JsonIgnoreProperties(ignoreUnknown=true)
@XmlRootElement
@XmlAccessorType(XmlAccessType.PROPERTY)
public class AtlasImportRequest implements Serializable {
private static final long serialVersionUID = 1L;
public static final String TRANSFORMS_KEY = "transforms";
......@@ -97,4 +92,10 @@ public class AtlasImportRequest implements Serializable {
return (String) this.options.get(key);
}
}
/**
 * Records an option supplied as an unrecognized top-level property of the
 * import-request JSON (Jackson routes unknown properties here via
 * {@code @JsonAnySetter}).
 *
 * @param key   option name
 * @param value option value
 */
@JsonAnySetter
public void setOption(String key, String value) {
// options map is created lazily on the first unknown property
if (null == options) {
options = new HashMap<>();
}
options.put(key, value);
}}
......@@ -21,6 +21,7 @@ package org.apache.atlas.discovery;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.discovery.AtlasSearchResult;
import org.apache.atlas.model.discovery.SearchParameters;
public interface AtlasDiscoveryService {
/**
......@@ -56,4 +57,12 @@ public interface AtlasDiscoveryService {
*/
AtlasSearchResult searchUsingBasicQuery(String query, String type, String classification, String attrName,
String attrValuePrefix, boolean excludeDeletedEntities, int limit, int offset) throws AtlasBaseException;
/**
 * Search for entities matching the search criteria (basic/faceted search with
 * optional entity/tag attribute filters).
 *
 * @param searchParameters Search criteria: type, classification, filters, paging
 * @return Matching entities
 * @throws AtlasBaseException if the search could not be executed
 */
AtlasSearchResult searchUsingBasicQuery(SearchParameters searchParameters) throws AtlasBaseException;
}
......@@ -20,12 +20,14 @@ package org.apache.atlas.discovery;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasConfiguration;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.annotation.GraphTransaction;
import org.apache.atlas.discovery.graph.DefaultGraphPersistenceStrategy;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.discovery.AtlasSearchResult;
import org.apache.atlas.model.discovery.AtlasSearchResult.AtlasFullTextResult;
import org.apache.atlas.model.discovery.AtlasSearchResult.AtlasQueryType;
import org.apache.atlas.model.discovery.AtlasSearchResult.AttributeSearchResult;
import org.apache.atlas.model.discovery.SearchParameters;
import org.apache.atlas.model.instance.AtlasEntity.Status;
import org.apache.atlas.AtlasException;
import org.apache.atlas.model.instance.AtlasEntityHeader;
......@@ -86,23 +88,28 @@ public class EntityDiscoveryService implements AtlasDiscoveryService {
private final EntityGraphRetriever entityRetriever;
private final AtlasGremlinQueryProvider gremlinQueryProvider;
private final AtlasTypeRegistry typeRegistry;
private final SearchPipeline searchPipeline;
private final int maxResultSetSize;
private final int maxTypesCountInIdxQuery;
private final int maxTagsCountInIdxQuery;
@Inject
EntityDiscoveryService(MetadataRepository metadataRepository, AtlasTypeRegistry typeRegistry, AtlasGraph graph) throws AtlasException {
EntityDiscoveryService(MetadataRepository metadataRepository, AtlasTypeRegistry typeRegistry,
AtlasGraph graph, SearchPipeline searchPipeline) throws AtlasException {
this.graph = graph;
this.graphPersistenceStrategy = new DefaultGraphPersistenceStrategy(metadataRepository);
this.entityRetriever = new EntityGraphRetriever(typeRegistry);
this.gremlinQueryProvider = AtlasGremlinQueryProvider.INSTANCE;
this.typeRegistry = typeRegistry;
this.maxResultSetSize = ApplicationProperties.get().getInt("atlas.graph.index.search.max-result-set-size", 150);
this.maxTypesCountInIdxQuery = ApplicationProperties.get().getInt("atlas.graph.index.search.max-types-count", 10);
this.maxTagsCountInIdxQuery = ApplicationProperties.get().getInt("atlas.graph.index.search.max-tags-count", 10);
this.searchPipeline = searchPipeline;
this.maxResultSetSize = ApplicationProperties.get().getInt(Constants.INDEX_SEARCH_MAX_RESULT_SET_SIZE, 150);
this.maxTypesCountInIdxQuery = ApplicationProperties.get().getInt(Constants.INDEX_SEARCH_MAX_TYPES_COUNT, 10);
this.maxTagsCountInIdxQuery = ApplicationProperties.get().getInt(Constants.INDEX_SEARCH_MAX_TAGS_COUNT, 10);
}
@Override
@GraphTransaction
public AtlasSearchResult searchUsingDslQuery(String dslQuery, int limit, int offset) throws AtlasBaseException {
AtlasSearchResult ret = new AtlasSearchResult(dslQuery, AtlasQueryType.DSL);
GremlinQuery gremlinQuery = toGremlinQuery(dslQuery, limit, offset);
......@@ -155,6 +162,7 @@ public class EntityDiscoveryService implements AtlasDiscoveryService {
}
@Override
@GraphTransaction
public AtlasSearchResult searchUsingFullTextQuery(String fullTextQuery, boolean excludeDeletedEntities, int limit, int offset)
throws AtlasBaseException {
AtlasSearchResult ret = new AtlasSearchResult(fullTextQuery, AtlasQueryType.FULL_TEXT);
......@@ -170,6 +178,7 @@ public class EntityDiscoveryService implements AtlasDiscoveryService {
}
@Override
@GraphTransaction
public AtlasSearchResult searchUsingBasicQuery(String query, String typeName, String classification, String attrName,
String attrValuePrefix, boolean excludeDeletedEntities, int limit,
int offset) throws AtlasBaseException {
......@@ -393,6 +402,22 @@ public class EntityDiscoveryService implements AtlasDiscoveryService {
return ret;
}
/**
 * Runs a parameterized basic search through the search pipeline and converts
 * each matching vertex to an entity header carrying the requested attributes.
 *
 * @param searchParameters search criteria (type, classification, filters, paging)
 * @return search result with one entity header per matching vertex
 * @throws AtlasBaseException if the pipeline fails to execute the search
 */
@Override
@GraphTransaction
public AtlasSearchResult searchUsingBasicQuery(SearchParameters searchParameters) throws AtlasBaseException {
    AtlasSearchResult searchResult = new AtlasSearchResult(searchParameters);

    for (AtlasVertex vertex : searchPipeline.run(searchParameters)) {
        searchResult.addEntity(entityRetriever.toAtlasEntityHeader(vertex, searchParameters.getAttributes()));
    }

    return searchResult;
}
private String getQueryForFullTextSearch(String userKeyedString, String typeName, String classification) {
String typeFilter = getTypeFilter(typeRegistry, typeName, maxTypesCountInIdxQuery);
String classficationFilter = getClassificationFilter(typeRegistry, classification, maxTagsCountInIdxQuery);
......@@ -548,4 +573,5 @@ public class EntityDiscoveryService implements AtlasDiscoveryService {
public int getMaxResultSetSize() {
return maxResultSetSize;
}
}
......@@ -21,6 +21,7 @@ package org.apache.atlas.discovery;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.annotation.GraphTransaction;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.lineage.AtlasLineageInfo;
......@@ -62,6 +63,7 @@ public class EntityLineageService implements AtlasLineageService {
}
@Override
@GraphTransaction
public AtlasLineageInfo getAtlasLineageInfo(String guid, LineageDirection direction, int depth) throws AtlasBaseException {
AtlasLineageInfo lineageInfo;
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.discovery;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.discovery.SearchParameters;
import org.apache.atlas.model.discovery.SearchParameters.FilterCriteria;
import org.apache.atlas.model.discovery.SearchParameters.FilterCriteria.Condition;
import org.apache.atlas.model.discovery.SearchParameters.Operator;
import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.graphdb.AtlasEdge;
import org.apache.atlas.repository.graphdb.AtlasEdgeDirection;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasGraphQuery;
import org.apache.atlas.repository.graphdb.AtlasIndexQuery;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.store.graph.v1.AtlasGraphUtilsV1;
import org.apache.atlas.type.AtlasClassificationType;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasStructType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.utils.AtlasPerfTracer;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.inject.Inject;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import static org.apache.atlas.discovery.SearchPipeline.IndexResultType;
import static org.apache.atlas.discovery.SearchPipeline.PipelineContext;
import static org.apache.atlas.discovery.SearchPipeline.PipelineStep;
import static org.apache.atlas.repository.graphdb.AtlasGraphQuery.ComparisionOperator;
import static org.apache.atlas.repository.graphdb.AtlasGraphQuery.MatchingOperator;
/**
 * Search-pipeline step that evaluates the search criteria directly against the
 * graph (Gremlin): either refining candidate vertices produced by the preceding
 * index (Solr) step, or performing the full tag/entity filtering itself when no
 * index results are available.
 */
@Component
public class GremlinStep implements PipelineStep {
    private static final Logger LOG      = LoggerFactory.getLogger(GremlinStep.class);
    private static final Logger PERF_LOG = AtlasPerfTracer.getPerfLogger("GremlinSearchStep");

    private final AtlasGraph        graph;
    private final AtlasTypeRegistry typeRegistry;

    // selects whether filter attribute names are resolved against the tag or the entity type
    enum GremlinFilterQueryType { TAG, ENTITY }

    @Inject
    public GremlinStep(AtlasGraph graph, AtlasTypeRegistry typeRegistry) {
        this.graph        = graph;
        this.typeRegistry = typeRegistry;
    }

    /**
     * Executes the Gremlin stage: picks a processing strategy based on what the
     * index step produced (TAG/TEXT/ENTITY results, or nothing) and stores the
     * resulting vertex iterator back into the context.
     *
     * @param context pipeline state shared across steps/iterations
     * @throws AtlasBaseException if context is null or graph processing fails
     */
    @Override
    public void execute(PipelineContext context) throws AtlasBaseException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("==> GremlinStep.execute({})", context);
        }

        if (context == null) {
            throw new AtlasBaseException("Can't start search without any context");
        }

        AtlasPerfTracer perf = null;

        if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
            perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "GremlinSearchStep.execute(" + context + ")");
        }

        final Iterator<AtlasVertex> result;

        if (context.hasIndexResults()) {
            // We have some results from the indexed step, let's proceed accordingly
            if (context.getIndexResultType() == IndexResultType.TAG) {
                // Index search was done on tag and filters
                if (context.isTagProcessingComplete()) {
                    LOG.debug("GremlinStep.execute(): index has completely processed tag, further TAG filtering not needed");

                    // map each tag vertex to the entities carrying it (incoming edges)
                    Set<String> taggedVertexGUIDs = new HashSet<>();

                    Iterator<AtlasIndexQuery.Result> tagVertexIterator = context.getIndexResultsIterator();

                    while (tagVertexIterator.hasNext()) {
                        // Find out which Vertex has this outgoing edge
                        AtlasVertex vertex = tagVertexIterator.next().getVertex();
                        Iterable<AtlasEdge> edges = vertex.getEdges(AtlasEdgeDirection.IN);

                        for (AtlasEdge edge : edges) {
                            String guid = AtlasGraphUtilsV1.getIdFromVertex(edge.getOutVertex());
                            taggedVertexGUIDs.add(guid);
                        }
                    }

                    // No entities are tagged (actually this check is already done)
                    if (!taggedVertexGUIDs.isEmpty()) {
                        result = processEntity(taggedVertexGUIDs, context);
                    } else {
                        result = null;
                    }
                } else {
                    result = processTagAndEntity(Collections.<String>emptySet(), context);
                }
            } else if (context.getIndexResultType() == IndexResultType.TEXT) {
                // Index step processed full-text;
                Set<String> entityIDs = getVertexIDs(context.getIndexResultsIterator());

                result = processTagAndEntity(entityIDs, context);
            } else if (context.getIndexResultType() == IndexResultType.ENTITY) {
                // Index step processed entity and it's filters; tag filter wouldn't be set
                Set<String> entityIDs = getVertexIDs(context.getIndexResultsIterator());

                result = processEntity(entityIDs, context);
            } else {
                result = null;
            }
        } else {
            // No index results, need full processing in Gremlin
            if (context.getClassificationType() != null) {
                // Process tag and filters first, then entity filters
                result = processTagAndEntity(Collections.<String>emptySet(), context);
            } else {
                result = processEntity(Collections.<String>emptySet(), context);
            }
        }

        context.setGremlinResultIterator(result);

        AtlasPerfTracer.log(perf);

        if (LOG.isDebugEnabled()) {
            LOG.debug("<== GremlinStep.execute({})", context);
        }
    }

    /**
     * Builds (or reuses a cached) entity-filter query and returns the matching
     * vertices, optionally restricted to the given GUIDs.
     *
     * @param entityGUIDs candidate GUIDs from an earlier stage; empty means unrestricted
     * @param context     pipeline state (caches queries, carries offset/limit)
     * @return vertex iterator, or null when neither a type nor GUIDs are available
     */
    private Iterator<AtlasVertex> processEntity(Set<String> entityGUIDs, PipelineContext context) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("==> GremlinStep.processEntity(entityGUIDs={})", entityGUIDs);
        }

        final Iterator<AtlasVertex> ret;

        SearchParameters searchParameters = context.getSearchParameters();
        AtlasEntityType  entityType       = context.getEntityType();

        if (entityType != null) {
            AtlasGraphQuery entityFilterQuery = context.getGraphQuery("ENTITY_FILTER");

            if (entityFilterQuery == null) {
                entityFilterQuery = graph.query().in(Constants.TYPE_NAME_PROPERTY_KEY, entityType.getTypeAndAllSubTypes());

                if (searchParameters.getEntityFilters() != null) {
                    toGremlinFilterQuery(GremlinFilterQueryType.ENTITY, entityType, searchParameters.getEntityFilters(), entityFilterQuery, context);
                }

                if (searchParameters.getExcludeDeletedEntities()) {
                    entityFilterQuery.has(Constants.STATE_PROPERTY_KEY, "ACTIVE");
                }

                context.cacheGraphQuery("ENTITY_FILTER", entityFilterQuery);
            }

            // Now get all vertices
            if (CollectionUtils.isEmpty(entityGUIDs)) {
                ret = entityFilterQuery.vertices(context.getCurrentOffset(), context.getMaxLimit()).iterator();
            } else {
                AtlasGraphQuery guidQuery = graph.query().in(Constants.GUID_PROPERTY_KEY, entityGUIDs);

                // NOTE: entityFilterQuery is always non-null here (created above when missing),
                // so the else-branch below is defensive only
                if (entityFilterQuery != null) {
                    guidQuery.addConditionsFrom(entityFilterQuery);
                } else if (searchParameters.getExcludeDeletedEntities()) {
                    guidQuery.has(Constants.STATE_PROPERTY_KEY, "ACTIVE");
                }

                ret = guidQuery.vertices(context.getMaxLimit()).iterator();
            }
        } else if (CollectionUtils.isNotEmpty(entityGUIDs)) {
            AtlasGraphQuery guidQuery = graph.query().in(Constants.GUID_PROPERTY_KEY, entityGUIDs);

            if (searchParameters.getExcludeDeletedEntities()) {
                guidQuery.has(Constants.STATE_PROPERTY_KEY, "ACTIVE");
            }

            Iterable<AtlasVertex> vertices = guidQuery.vertices(context.getMaxLimit());

            ret = vertices.iterator();
        } else {
            ret = null;
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("<== GremlinStep.processEntity(entityGUIDs={})", entityGUIDs);
        }

        return ret;
    }

    /**
     * Applies tag filters first (smaller candidate set), resolves the matching
     * tag vertices to the entities carrying them, then delegates to
     * {@link #processEntity(Set, PipelineContext)} for entity-level filtering.
     *
     * @param entityGUIDs candidate GUIDs from an earlier stage; may be replaced by tag matches
     * @param context     pipeline state
     * @return vertex iterator, or null when no entity qualifies
     */
    private Iterator<AtlasVertex> processTagAndEntity(Set<String> entityGUIDs, PipelineContext context) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("==> GremlinStep.processTagAndEntity(entityGUIDs={})", entityGUIDs);
        }

        final Iterator<AtlasVertex> ret;

        AtlasClassificationType classificationType = context.getClassificationType();

        if (classificationType != null) {
            AtlasGraphQuery tagVertexQuery = context.getGraphQuery("TAG_VERTEX");

            if (tagVertexQuery == null) {
                tagVertexQuery = graph.query().in(Constants.TYPE_NAME_PROPERTY_KEY, classificationType.getTypeAndAllSubTypes());

                SearchParameters searchParameters = context.getSearchParameters();

                // Do tag filtering first as it'll return a smaller subset of vertices
                if (searchParameters.getTagFilters() != null) {
                    toGremlinFilterQuery(GremlinFilterQueryType.TAG, classificationType, searchParameters.getTagFilters(), tagVertexQuery, context);
                }

                context.cacheGraphQuery("TAG_VERTEX", tagVertexQuery);
            }

            if (tagVertexQuery != null) {
                Set<String> taggedVertexGuids = new HashSet<>();

                // Now get all vertices after adjusting offset for each iteration
                LOG.debug("Firing TAG query");

                Iterator<AtlasVertex> tagVertexIterator = tagVertexQuery.vertices(context.getCurrentOffset(), context.getMaxLimit()).iterator();

                while (tagVertexIterator.hasNext()) {
                    // Find out which Vertex has this outgoing edge
                    Iterable<AtlasEdge> edges = tagVertexIterator.next().getEdges(AtlasEdgeDirection.IN);

                    for (AtlasEdge edge : edges) {
                        String guid = AtlasGraphUtilsV1.getIdFromVertex(edge.getOutVertex());
                        taggedVertexGuids.add(guid);
                    }
                }

                entityGUIDs = taggedVertexGuids;
            }
        }

        if (!entityGUIDs.isEmpty()) {
            ret = processEntity(entityGUIDs, context);
        } else {
            ret = null;
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("<== GremlinStep.processTagAndEntity(entityGUIDs={})", entityGUIDs);
        }

        return ret;
    }

    /** Collects the GUIDs of the vertices behind the given index-result iterator. */
    private Set<String> getVertexIDs(Iterator<AtlasIndexQuery.Result> idxResultsIterator) {
        Set<String> guids = new HashSet<>();

        while (idxResultsIterator.hasNext()) {
            AtlasVertex vertex = idxResultsIterator.next().getVertex();
            String      guid   = AtlasGraphUtilsV1.getIdFromVertex(vertex);

            guids.add(guid);
        }

        return guids;
    }

    /** Collects the GUIDs of the given vertices. */
    private Set<String> getVertexIDs(Iterable<AtlasVertex> vertices) {
        Set<String> guids = new HashSet<>();

        for (AtlasVertex vertex : vertices) {
            String guid = AtlasGraphUtilsV1.getIdFromVertex(vertex);

            guids.add(guid);
        }

        return guids;
    }

    /**
     * Translates a FilterCriteria tree into conditions on the given graph query.
     * Composite criteria recurse (AND adds conditions in place, OR builds child
     * queries); leaf criteria add a single predicate unless the index step has
     * already applied that attribute (tracked in the context). The IN operator is
     * not supported here and is logged and ignored.
     *
     * @return the same query instance, for chaining
     */
    private AtlasGraphQuery toGremlinFilterQuery(GremlinFilterQueryType queryType, AtlasStructType type, FilterCriteria criteria,
                                                 AtlasGraphQuery query, PipelineContext context) {
        if (criteria.getCondition() != null) {
            if (criteria.getCondition() == Condition.AND) {
                for (FilterCriteria filterCriteria : criteria.getCriterion()) {
                    AtlasGraphQuery nestedQuery = toGremlinFilterQuery(queryType, type, filterCriteria, graph.query(), context);

                    query.addConditionsFrom(nestedQuery);
                }
            } else {
                List<AtlasGraphQuery> orConditions = new LinkedList<>();

                for (FilterCriteria filterCriteria : criteria.getCriterion()) {
                    AtlasGraphQuery nestedQuery = toGremlinFilterQuery(queryType, type, filterCriteria, graph.query(), context);

                    // FIXME: Something might not be right here as the queries are getting overwritten sometimes
                    orConditions.add(graph.query().createChildQuery().addConditionsFrom(nestedQuery));
                }

                if (!orConditions.isEmpty()) {
                    query.or(orConditions);
                }
            }
        } else {
            String   attrName  = criteria.getAttributeName();
            String   attrValue = criteria.getAttributeValue();
            Operator operator  = criteria.getOperator();

            try {
                // If attribute belongs to supertype then adjust the name accordingly
                final String  qualifiedAttributeName = type.getQualifiedAttributeName(attrName);
                final boolean attrProcessed          = (queryType == GremlinFilterQueryType.TAG)
                                                           ? context.hasProcessedTagAttribute(qualifiedAttributeName)
                                                           : context.hasProcessedEntityAttribute(qualifiedAttributeName);

                // Check if the qualifiedAttribute has been processed
                if (!attrProcessed) {
                    switch (operator) {
                        case LT:
                            query.has(qualifiedAttributeName, ComparisionOperator.LESS_THAN, attrValue);
                            break;
                        case LTE:
                            query.has(qualifiedAttributeName, ComparisionOperator.LESS_THAN_EQUAL, attrValue);
                            break;
                        case GT:
                            query.has(qualifiedAttributeName, ComparisionOperator.GREATER_THAN, attrValue);
                            break;
                        case GTE:
                            query.has(qualifiedAttributeName, ComparisionOperator.GREATER_THAN_EQUAL, attrValue);
                            break;
                        case EQ:
                            query.has(qualifiedAttributeName, ComparisionOperator.EQUAL, attrValue);
                            break;
                        case NEQ:
                            query.has(qualifiedAttributeName, ComparisionOperator.NOT_EQUAL, attrValue);
                            break;
                        case LIKE:
                            // TODO: Maybe we need to validate pattern
                            query.has(qualifiedAttributeName, MatchingOperator.REGEX, getLikeRegex(attrValue));
                            break;
                        case CONTAINS:
                            query.has(qualifiedAttributeName, MatchingOperator.REGEX, getContainsRegex(attrValue));
                            break;
                        case STARTS_WITH:
                            query.has(qualifiedAttributeName, MatchingOperator.PREFIX, attrValue);
                            break;
                        case ENDS_WITH:
                            query.has(qualifiedAttributeName, MatchingOperator.REGEX, getSuffixRegex(attrValue));
                            break; // fixed: missing break fell through into the IN warning below
                        case IN:
                            LOG.warn("{}: unsupported operator. Ignored", operator);
                            break;
                    }
                }
            } catch (AtlasBaseException e) {
                LOG.error("toGremlinFilterQuery(): failed for attrName=" + attrName + "; operator=" + operator + "; attrValue=" + attrValue, e);
            }
        }

        return query;
    }

    /** Regex matching any value that contains {@code attributeValue}. */
    private String getContainsRegex(String attributeValue) {
        return ".*" + attributeValue + ".*";
    }

    /** Regex matching any value that ends with {@code attributeValue}. */
    private String getSuffixRegex(String attributeValue) {
        return ".*" + attributeValue;
    }

    /** Regex for LIKE; currently identical to CONTAINS — TODO confirm wildcard semantics. */
    private String getLikeRegex(String attributeValue) {
        return ".*" + attributeValue + ".*";
    }
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.discovery;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.discovery.SearchParameters;
import org.apache.atlas.model.discovery.SearchParameters.FilterCriteria;
import org.apache.atlas.model.discovery.SearchParameters.FilterCriteria.Condition;
import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.graph.GraphBackedSearchIndexer;
import org.apache.atlas.repository.graphdb.*;
import org.apache.atlas.type.AtlasClassificationType;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasStructType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.util.SearchTracker;
import org.apache.atlas.utils.AtlasPerfTracer;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.builder.ToStringBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.inject.Inject;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@Component
public class SearchPipeline {
private static final Logger LOG = LoggerFactory.getLogger(SearchPipeline.class);
private static final Logger PERF_LOG = AtlasPerfTracer.getPerfLogger("SearchPipeline");
enum ExecutionMode { SOLR, GREMLIN, MIXED }
enum IndexResultType { TAG, ENTITY, TEXT }
private final SolrStep solrStep;
private final GremlinStep gremlinStep;
private final SearchTracker searchTracker;
private final AtlasTypeRegistry typeRegistry;
private final Configuration atlasConfiguration;
private final GraphBackedSearchIndexer indexer;
/**
 * Wires the pipeline with its two execution steps (Solr index step, Gremlin
 * graph step) plus the supporting services used to track, type-resolve and
 * configure searches.
 */
@Inject
public SearchPipeline(SolrStep solrStep, GremlinStep gremlinStep, SearchTracker searchTracker,
                      AtlasTypeRegistry typeRegistry, Configuration atlasConfiguration,
                      GraphBackedSearchIndexer indexer) {
    this.solrStep           = solrStep;
    this.gremlinStep        = gremlinStep;
    this.searchTracker      = searchTracker;
    this.typeRegistry       = typeRegistry;
    this.atlasConfiguration = atlasConfiguration;
    this.indexer            = indexer;
}
/**
 * Entry point: runs a parameterized search. Resolves the entity and
 * classification types, builds a pipeline context, registers the search with
 * the tracker (so it can be cancelled), then dispatches to the execution mode
 * chosen by {@code determineExecutionMode}.
 *
 * @param searchParameters search criteria
 * @return matching vertices (empty when the mode is unrecognized)
 * @throws AtlasBaseException if a pipeline step fails
 */
public List<AtlasVertex> run(SearchParameters searchParameters) throws AtlasBaseException {
    AtlasPerfTracer perf = null;

    if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
        perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "SearchPipeline.run(" + searchParameters + ")");
    }

    AtlasEntityType         entityType         = typeRegistry.getEntityTypeByName(searchParameters.getTypeName());
    AtlasClassificationType classificationType = typeRegistry.getClassificationTypeByName(searchParameters.getClassification());
    PipelineContext         context            = new PipelineContext(searchParameters, entityType, classificationType, indexer.getVertexIndexKeys());

    // register with the tracker so an in-flight search can be cancelled
    String searchId = searchTracker.add(context);

    List<AtlasVertex> results;

    try {
        ExecutionMode mode = determineExecutionMode(context);

        if (LOG.isDebugEnabled()) {
            LOG.debug("Execution mode {}", mode);
        }

        if (mode == ExecutionMode.SOLR) {
            results = runOnlySolr(context);
        } else if (mode == ExecutionMode.GREMLIN) {
            results = runOnlyGremlin(context);
        } else if (mode == ExecutionMode.MIXED) {
            results = runMixed(context);
        } else {
            results = Collections.emptyList();
        }
    } finally {
        searchTracker.remove(searchId);

        AtlasPerfTracer.log(perf);
    }

    return results;
}
/**
 * Executes the search entirely against the index, paging until the requested limit is
 * reached, the search is terminated, or the index returns no more results.
 * <p>
 * When the index was queried for tag (classification) vertices, the results are mapped
 * to the entities carrying the tag by walking the incoming edges, since callers expect
 * entity vertices.
 *
 * @param context per-search state (parameters, offsets, cached queries)
 * @return matching entity vertices
 * @throws AtlasBaseException if the index step fails
 */
private List<AtlasVertex> runOnlySolr(PipelineContext context) throws AtlasBaseException {
    // Only when there's no tag and query
    List<AtlasVertex> results = new ArrayList<>();

    while (results.size() < context.getSearchParameters().getLimit()) {
        if (context.getForceTerminate()) {
            LOG.debug("search has been terminated");
            break;
        }

        // Execute solr search only
        solrStep.execute(context);

        List<AtlasVertex> stepResults = getIndexResults(context);

        // Advance the paging cursor before accumulating, so the next round reads the next page.
        context.incrementSearchRound();

        addToResult(results, stepResults, context.getSearchParameters().getLimit());

        if (LOG.isDebugEnabled()) {
            LOG.debug("Pipeline iteration {}: stepResults={}; totalResult={}", context.getIterationCount(), stepResults.size(), results.size());
        }

        if (CollectionUtils.isEmpty(stepResults)) {
            // If no result is found any subsequent iteration then just stop querying the index
            break;
        }
    }

    if (context.getIndexResultType() == IndexResultType.TAG) {
        // Index returned tag vertices; resolve them to the entities they classify.
        // NOTE(review): assumes every IN edge of a tag vertex originates from an entity
        // vertex -- confirm no other edge types point at classification vertices.
        List<AtlasVertex> entityVertices = new ArrayList<>(results.size());

        for (AtlasVertex tagVertex : results) {
            Iterable<AtlasEdge> edges = tagVertex.getEdges(AtlasEdgeDirection.IN);

            for (AtlasEdge edge : edges) {
                AtlasVertex entityVertex = edge.getOutVertex();
                entityVertices.add(entityVertex);
            }
        }

        results = entityVertices;
    }

    return results;
}
/**
 * Executes the search entirely with the graph (Gremlin) step, paging until the
 * requested limit is filled, the search is force-terminated, or a round yields
 * no results.
 *
 * @param context per-search state (parameters, offsets, cached queries)
 * @return accumulated matching vertices, at most {@code limit} entries
 * @throws AtlasBaseException if the graph step fails
 */
private List<AtlasVertex> runOnlyGremlin(PipelineContext context) throws AtlasBaseException {
    final int               limit       = context.getSearchParameters().getLimit();
    final List<AtlasVertex> accumulated = new ArrayList<>();

    while (accumulated.size() < limit) {
        if (context.getForceTerminate()) {
            LOG.debug("search has been terminated");
            break;
        }

        gremlinStep.execute(context);

        final List<AtlasVertex> batch = getGremlinResults(context);

        context.incrementSearchRound();
        addToResult(accumulated, batch, limit);

        if (LOG.isDebugEnabled()) {
            LOG.debug("Pipeline iteration {}: stepResults={}; totalResult={}", context.getIterationCount(), batch.size(), accumulated.size());
        }

        // An empty round means the graph has been exhausted; stop paging.
        if (CollectionUtils.isEmpty(batch)) {
            break;
        }
    }

    return accumulated;
}
/*
 1. Index processes few attributes and then gremlin processes rest
 1.1 Iterate for gremlin till the index results are non null
 2. Index processes all attributes, gremlin has nothing to do

 Sometimes the result set might be less than the max limit and we need to iterate until the result set is full
 or the iteration doesn't return any results
 */
/**
 * Executes the search in two stages per round: the index narrows candidates, then the
 * graph step applies whatever filters the index could not evaluate.  Pages until the
 * limit is filled, the search is terminated, or the index runs dry.
 *
 * @param context per-search state (parameters, offsets, cached queries)
 * @return accumulated matching vertices
 * @throws AtlasBaseException if either step fails
 */
private List<AtlasVertex> runMixed(PipelineContext context) throws AtlasBaseException {
    List<AtlasVertex> results = new ArrayList<>();

    while (results.size() < context.getSearchParameters().getLimit()) {
        if (context.getForceTerminate()) {
            LOG.debug("search has been terminated");
            break;
        }

        // Execute Solr search and then pass it to the Gremlin step (if needed)
        solrStep.execute(context);

        if (!context.hasIndexResults()) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("No index results in iteration {}", context.getIterationCount());
            }

            // If no result is found any subsequent iteration then just stop querying the index
            break;
        }

        // Attributes partially processed by Solr, use gremlin to process remaining attribute(s)
        gremlinStep.execute(context);

        context.incrementSearchRound();

        List<AtlasVertex> stepResults = getGremlinResults(context);

        addToResult(results, stepResults, context.getSearchParameters().getLimit());

        if (LOG.isDebugEnabled()) {
            LOG.debug("Pipeline iteration {}: stepResults={}; totalResult={}", context.getIterationCount(), stepResults.size(), results.size());
        }
    }

    return results;
}
/**
 * Appends vertices from {@code stepResult} into {@code result} until {@code result}
 * reaches {@code maxLimit} entries.  Null-safe on both lists; a no-op when the result
 * list is already full.
 *
 * @param result     accumulator list (mutated in place)
 * @param stepResult vertices produced by the latest pipeline round
 * @param maxLimit   maximum size the accumulator may grow to
 */
private void addToResult(List<AtlasVertex> result, List<AtlasVertex> stepResult, int maxLimit) {
    if (result == null || stepResult == null) {
        return;
    }

    Iterator<AtlasVertex> source = stepResult.iterator();

    while (result.size() < maxLimit && source.hasNext()) {
        result.add(source.next());
    }
}
/**
 * Drains the context's index-result iterator into a list of vertices.
 * Returns an empty list when the context holds no (further) index results.
 *
 * @param pipelineContext per-search state holding the iterator published by the index step
 * @return vertices extracted from the index results; never null
 */
private List<AtlasVertex> getIndexResults(PipelineContext pipelineContext) {
    final List<AtlasVertex> vertices = new ArrayList<>();

    if (pipelineContext.hasIndexResults()) {
        for (Iterator<AtlasIndexQuery.Result> it = pipelineContext.getIndexResultsIterator(); it.hasNext(); ) {
            vertices.add(it.next().getVertex());
        }
    }

    return vertices;
}
/**
 * Drains the context's Gremlin-result iterator into a list of vertices.
 * Returns an empty list when the context holds no (further) graph results.
 *
 * @param pipelineContext per-search state holding the iterator published by the graph step
 * @return vertices extracted from the graph results; never null
 */
private List<AtlasVertex> getGremlinResults(PipelineContext pipelineContext) {
    final List<AtlasVertex> vertices = new ArrayList<>();

    if (pipelineContext.hasGremlinResults()) {
        for (Iterator<AtlasVertex> it = pipelineContext.getGremlinResultIterator(); it.hasNext(); ) {
            vertices.add(it.next());
        }
    }

    return vertices;
}
/**
 * Decides which backend(s) should run the search by scoring each request facet:
 * {@code solrCount} counts facets the index can fully evaluate, {@code gremlinCount}
 * counts facets only the graph can evaluate.  Exactly one Solr facet and no Gremlin
 * facets => SOLR; the inverse => GREMLIN; anything else => MIXED.
 * <p>
 * A facet is pushed to Gremlin when the type has too many subtypes to enumerate in an
 * index query, or when its filters reference attributes that are not vertex-indexed.
 *
 * @param context per-search state with resolved types and indexed keys
 * @return the execution mode to use for this search
 */
private ExecutionMode determineExecutionMode(PipelineContext context) {
    SearchParameters searchParameters = context.getSearchParameters();
    AtlasClassificationType classificationType = context.getClassificationType();
    AtlasEntityType entityType = context.getEntityType();
    int solrCount = 0;
    int gremlinCount = 0;

    if (StringUtils.isNotEmpty(searchParameters.getQuery())) {
        solrCount++;

        // __state index only exists in vertex_index
        if (searchParameters.getExcludeDeletedEntities()) {
            gremlinCount++;
        }
    }

    if (classificationType != null) {
        Set<String> typeAndAllSubTypes = classificationType.getTypeAndAllSubTypes();

        if (typeAndAllSubTypes.size() > atlasConfiguration.getInt(Constants.INDEX_SEARCH_MAX_TAGS_COUNT, 10)) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Classification type {} has too many subTypes ({}) to use in Solr search. Gremlin will be used to execute the search",
                        classificationType.getTypeName(), typeAndAllSubTypes.size());
            }

            gremlinCount++;
        } else {
            if (hasNonIndexedAttrViolation(classificationType, context.getIndexedKeys(), searchParameters.getTagFilters())) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Tag filters not suitable for Solr search. Gremlin will be used to execute the search");
                }

                gremlinCount++;
            } else {
                solrCount++;

                // __state index only exist in vertex_index
                if (searchParameters.getExcludeDeletedEntities()) {
                    gremlinCount++;
                }
            }
        }
    }

    if (entityType != null) {
        Set<String> typeAndAllSubTypes = entityType.getTypeAndAllSubTypes();

        if (typeAndAllSubTypes.size() > atlasConfiguration.getInt(Constants.INDEX_SEARCH_MAX_TYPES_COUNT, 10)) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Entity type {} has too many subTypes ({}) to use in Solr search. Gremlin will be used to execute the search",
                        entityType.getTypeName(), typeAndAllSubTypes.size());
            }

            gremlinCount++;
        } else {
            if (hasNonIndexedAttrViolation(entityType, context.getIndexedKeys(), searchParameters.getEntityFilters())) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Entity filters not suitable for Solr search. Gremlin will be used to execute the search");
                }

                gremlinCount++;
            } else {
                solrCount++;
            }
        }
    }

    // Pure SOLR/GREMLIN only when exactly one facet maps to that side; otherwise MIXED.
    ExecutionMode mode = ExecutionMode.MIXED;

    if (solrCount == 1 && gremlinCount == 0) {
        mode = ExecutionMode.SOLR;
    } else if (gremlinCount == 1 && solrCount == 0) {
        mode = ExecutionMode.GREMLIN;
    }

    return mode;
}
// If Index can't process all attributes and any of the non-indexed attribute is present in OR nested within AND
// then the only way is Gremlin
// A violation (here) is defined as presence of non-indexed attribute within any OR clause nested under an AND clause
// the reason being that the index would not be able to process the nested OR attribute which might result in
// exclusion of valid result (vertex)
/**
 * Convenience overload that starts the recursive check with the
 * "not yet inside an OR clause" state.
 */
private boolean hasNonIndexedAttrViolation(AtlasStructType structType, Set<String> indexKeys, FilterCriteria filterCriteria) {
    return hasNonIndexedAttrViolation(structType, indexKeys, filterCriteria, false);
}
/**
 * Recursively walks the filter tree looking for attributes that are not covered by the
 * vertex index.
 *
 * @param structType            the entity/classification type owning the attributes
 * @param indexKeys             qualified names of all vertex-indexed properties
 * @param filterCriteria        filter node to inspect (leaf or AND/OR group); may be null
 * @param enclosedInOrCondition true once any ancestor node was an OR group
 * @return true if a non-indexed (or unresolvable) attribute is referenced
 */
private boolean hasNonIndexedAttrViolation(AtlasStructType structType, Set<String> indexKeys, FilterCriteria filterCriteria, boolean enclosedInOrCondition) {
    if (filterCriteria == null) {
        return false;
    }

    boolean ret = false;

    Condition filterCondition = filterCriteria.getCondition();
    List<FilterCriteria> criterion = filterCriteria.getCriterion();

    if (filterCondition != null && CollectionUtils.isNotEmpty(criterion)) {
        // Once inside an OR, stay flagged for all descendants.
        if (!enclosedInOrCondition) {
            enclosedInOrCondition = filterCondition == Condition.OR;
        }

        // If we have nested criterion let's find any nested ORs with non-indexed attr
        for (FilterCriteria criteria : criterion) {
            ret |= hasNonIndexedAttrViolation(structType, indexKeys, criteria, enclosedInOrCondition);

            if (ret) {
                break;
            }
        }
    } else if (StringUtils.isNotEmpty(filterCriteria.getAttributeName())) {
        // If attribute qualified name doesn't exist in the vertex index we potentially might have a problem
        try {
            String qualifiedAttributeName = structType.getQualifiedAttributeName(filterCriteria.getAttributeName());

            ret = CollectionUtils.isEmpty(indexKeys) || !indexKeys.contains(qualifiedAttributeName);

            if (ret) {
                LOG.warn("search includes non-indexed attribute '{}'; might cause poor performance", qualifiedAttributeName);
            }
        } catch (AtlasBaseException e) {
            // Attribute cannot be resolved on this type: treat as a violation so Gremlin handles it.
            LOG.warn(e.getMessage());

            ret = true;
        }
    }

    // NOTE(review): the class comment defines a violation as a non-indexed attribute inside
    // an OR clause, but the OR-qualification below is disabled -- currently ANY non-indexed
    // attribute counts as a violation. Confirm the conservative behavior is intentional.
    // return ret && enclosedInOrCondition;
    return ret;
}
/**
 * A single stage of the search pipeline (e.g. the Solr index lookup or the Gremlin
 * traversal).  Implementations read their inputs from and publish their results into
 * the shared {@link PipelineContext}.
 */
public interface PipelineStep {
    void execute(PipelineContext context) throws AtlasBaseException;
}
/**
 * Mutable per-search state shared by all pipeline steps: the (immutable) search
 * request, the resolved types, paging cursors, bookkeeping of which filter attributes
 * the index step has already evaluated (so the graph step knows what remains), the
 * latest step results, and index/graph queries cached across paging rounds.
 */
public static class PipelineContext {
    // TODO: See if anything can be cached in the context
    private final SearchParameters searchParameters;
    private final AtlasEntityType entityType;
    private final AtlasClassificationType classificationType;
    private final Set<String> indexedKeys;

    private int iterationCount;       // number of completed search rounds
    private boolean forceTerminate;   // set externally (via SearchTracker) to cancel the search
    private int currentOffset;        // paging cursor, advanced by one page each round
    private int maxLimit;             // requested result-set size

    // Continuous processing stuff: attributes referenced by the tag/entity filters, and
    // the subset the index (Solr) step managed to evaluate; the difference is what the
    // graph (Gremlin) step still has to apply.
    private final Set<String> tagSearchAttributes = new HashSet<>();
    private final Set<String> entitySearchAttributes = new HashSet<>();
    private final Set<String> tagAttrProcessedBySolr = new HashSet<>();
    private final Set<String> entityAttrProcessedBySolr = new HashSet<>();

    // Results related stuff
    private IndexResultType indexResultType;
    private Iterator<AtlasIndexQuery.Result> indexResultsIterator;
    private Iterator<AtlasVertex> gremlinResultIterator;

    // Queries are built once per search and reused for subsequent paging rounds.
    private final Map<String, AtlasIndexQuery> cachedIndexQueries = new HashMap<>();
    private final Map<String, AtlasGraphQuery> cachedGraphQueries = new HashMap<>();

    public PipelineContext(SearchParameters searchParameters, AtlasEntityType entityType, AtlasClassificationType classificationType, Set<String> indexedKeys) {
        this.searchParameters = searchParameters;
        this.entityType = entityType;
        this.classificationType = classificationType;
        this.indexedKeys = indexedKeys;

        currentOffset = searchParameters.getOffset();
        maxLimit = searchParameters.getLimit();
    }

    public SearchParameters getSearchParameters() {
        return searchParameters;
    }

    public AtlasEntityType getEntityType() {
        return entityType;
    }

    public AtlasClassificationType getClassificationType() {
        return classificationType;
    }

    public Set<String> getIndexedKeys() { return indexedKeys; }

    public int getIterationCount() {
        return iterationCount;
    }

    public boolean getForceTerminate() {
        return forceTerminate;
    }

    public void setForceTerminate(boolean forceTerminate) {
        this.forceTerminate = forceTerminate;
    }

    /** @return true if the index step already evaluated the given tag attribute */
    public boolean hasProcessedTagAttribute(String attributeName) {
        return tagAttrProcessedBySolr.contains(attributeName);
    }

    /** @return true if the index step already evaluated the given entity attribute */
    public boolean hasProcessedEntityAttribute(String attributeName) {
        return entityAttrProcessedBySolr.contains(attributeName);
    }

    public Iterator<AtlasIndexQuery.Result> getIndexResultsIterator() {
        return indexResultsIterator;
    }

    public void setIndexResultsIterator(Iterator<AtlasIndexQuery.Result> indexResultsIterator) {
        this.indexResultsIterator = indexResultsIterator;
    }

    public Iterator<AtlasVertex> getGremlinResultIterator() {
        return gremlinResultIterator;
    }

    public void setGremlinResultIterator(Iterator<AtlasVertex> gremlinResultIterator) {
        this.gremlinResultIterator = gremlinResultIterator;
    }

    public boolean hasIndexResults() {
        return null != indexResultsIterator && indexResultsIterator.hasNext();
    }

    public boolean hasGremlinResults() {
        return null != gremlinResultIterator && gremlinResultIterator.hasNext();
    }

    /** @return true when the index evaluated every tag filter attribute (or there were none) */
    public boolean isTagProcessingComplete() {
        return CollectionUtils.isEmpty(tagSearchAttributes) ||
                CollectionUtils.isEqualCollection(tagSearchAttributes, tagAttrProcessedBySolr);
    }

    /** @return true when the index evaluated every entity filter attribute (or there were none) */
    public boolean isEntityProcessingComplete() {
        return CollectionUtils.isEmpty(entitySearchAttributes) ||
                CollectionUtils.isEqualCollection(entitySearchAttributes, entityAttrProcessedBySolr);
    }

    public boolean isProcessingComplete() {
        return isTagProcessingComplete() && isEntityProcessingComplete();
    }

    public void incrementOffset(int increment) {
        currentOffset += increment;
    }

    /** Completes a search round: bumps the iteration count and advances the page cursor. */
    public void incrementSearchRound() {
        iterationCount ++;
        incrementOffset(searchParameters.getLimit());
    }

    public int getCurrentOffset() {
        return currentOffset;
    }

    public boolean addTagSearchAttribute(String attribute) {
        return tagSearchAttributes.add(attribute);
    }

    public boolean addProcessedTagAttribute(String attribute) {
        return tagAttrProcessedBySolr.add(attribute);
    }

    public boolean addEntitySearchAttribute(String attribute) {
        // FIX: previously added to tagSearchAttributes (copy-paste bug), which corrupted
        // both isTagProcessingComplete() and isEntityProcessingComplete().
        return entitySearchAttributes.add(attribute);
    }

    public boolean addProcessedEntityAttribute(String attribute) {
        return entityAttrProcessedBySolr.add(attribute);
    }

    public void cacheGraphQuery(String name, AtlasGraphQuery graphQuery) {
        cachedGraphQueries.put(name, graphQuery);
    }

    public void cacheIndexQuery(String name, AtlasIndexQuery indexQuery) {
        cachedIndexQueries.put(name, indexQuery);
    }

    public AtlasIndexQuery getIndexQuery(String name){
        return cachedIndexQueries.get(name);
    }

    public AtlasGraphQuery getGraphQuery(String name) {
        return cachedGraphQueries.get(name);
    }

    public IndexResultType getIndexResultType() {
        return indexResultType;
    }

    public void setIndexResultType(IndexResultType indexResultType) {
        this.indexResultType = indexResultType;
    }

    public int getMaxLimit() {
        return maxLimit;
    }

    @Override
    public String toString() {
        return new ToStringBuilder(this)
                .append("iterationCount", iterationCount)
                .append("forceTerminate", forceTerminate)
                .append("currentOffset", currentOffset)
                .append("maxLimit", maxLimit)
                .append("searchParameters", searchParameters)
                .append("tagSearchAttributes", tagSearchAttributes)
                .append("entitySearchAttributes", entitySearchAttributes)
                .append("tagAttrProcessedBySolr", tagAttrProcessedBySolr)
                .append("entityAttrProcessedBySolr", entityAttrProcessedBySolr)
                .append("indexResultType", indexResultType)
                .append("cachedIndexQueries", cachedIndexQueries)
                .append("cachedGraphQueries", cachedGraphQueries)
                .toString();
    }
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.discovery;
import org.apache.atlas.discovery.SearchPipeline.IndexResultType;
import org.apache.atlas.discovery.SearchPipeline.PipelineContext;
import org.apache.atlas.discovery.SearchPipeline.PipelineStep;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.discovery.SearchParameters;
import org.apache.atlas.model.discovery.SearchParameters.Operator;
import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasIndexQuery;
import org.apache.atlas.type.*;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.inject.Inject;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
import static org.apache.atlas.model.discovery.SearchParameters.*;
/**
 * Pipeline step that runs the (partial) search against the Solr-backed index.  On the
 * first round it builds an index query from the search parameters and caches it in the
 * {@link PipelineContext}; subsequent rounds just page through the cached query at the
 * context's current offset.  A free-text query targets the full-text index; otherwise
 * a structured query against the vertex index is assembled from the type and filters.
 */
@Component
public class SolrStep implements PipelineStep {
    private static final Logger LOG = LoggerFactory.getLogger(SolrStep.class);

    // Mechanical query construction can leave dangling boolean operators or empty
    // parenthesis groups behind (e.g. when a filter clause could not be translated);
    // these patterns strip such fragments so Solr receives a valid query.
    private static final Pattern STRAY_AND_PATTERN      = Pattern.compile("(AND\\s+)+\\)");
    private static final Pattern STRAY_OR_PATTERN       = Pattern.compile("(OR\\s+)+\\)");
    private static final Pattern STRAY_ELLIPSIS_PATTERN = Pattern.compile("(\\(\\s*)\\)");

    private static final String AND_STR = " AND ";
    private static final String EMPTY_STRING = "";
    private static final String SPACE_STRING = " ";
    private static final String BRACE_OPEN_STR = "( ";
    private static final String BRACE_CLOSE_STR = " )";

    // Filter operator -> Solr query template; the %s placeholders are the qualified
    // attribute name and the attribute value, in that order.
    private static final Map<Operator, String> operatorMap = new HashMap<>();

    static
    {
        operatorMap.put(Operator.LT,"v.\"%s\": [* TO %s}");
        operatorMap.put(Operator.GT,"v.\"%s\": {%s TO *]");
        operatorMap.put(Operator.LTE,"v.\"%s\": [* TO %s]");
        operatorMap.put(Operator.GTE,"v.\"%s\": [%s TO *]");
        operatorMap.put(Operator.EQ,"v.\"%s\": %s");
        operatorMap.put(Operator.NEQ,"v.\"%s\": (NOT %s)");
        operatorMap.put(Operator.IN, "v.\"%s\": (%s)");
        operatorMap.put(Operator.LIKE, "v.\"%s\": (%s)");
        operatorMap.put(Operator.STARTS_WITH, "v.\"%s\": (%s*)");
        operatorMap.put(Operator.ENDS_WITH, "v.\"%s\": (*%s)");
        operatorMap.put(Operator.CONTAINS, "v.\"%s\": (*%s*)");
    }

    private final AtlasGraph graph;

    @Inject
    public SolrStep(AtlasGraph graph) {
        this.graph = graph;
    }

    /**
     * Runs one index round: full-text index when a free-text query is present,
     * structured vertex-index query otherwise.  Publishes the result iterator
     * (possibly null) into the context.
     *
     * @param context per-search state; must not be null
     * @throws AtlasBaseException when invoked without a context
     */
    @Override
    public void execute(PipelineContext context) throws AtlasBaseException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("==> SolrStep.execute({})", context);
        }

        if (context == null) {
            throw new AtlasBaseException("Can't start search without any context");
        }

        SearchParameters searchParameters = context.getSearchParameters();

        final Iterator<AtlasIndexQuery.Result> result;

        if (StringUtils.isNotEmpty(searchParameters.getQuery())) {
            result = executeAgainstFulltextIndex(context);
        } else {
            result = executeAgainstVertexIndex(context);
        }

        context.setIndexResultsIterator(result);

        if (LOG.isDebugEnabled()) {
            LOG.debug("<== SolrStep.execute({})", context);
        }
    }

    /**
     * Queries the full-text index with the user's free-text query, building and
     * caching the index query on the first round.
     */
    private Iterator<AtlasIndexQuery.Result> executeAgainstFulltextIndex(PipelineContext context) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("==> SolrStep.executeAgainstFulltextIndex()");
        }

        final Iterator<AtlasIndexQuery.Result> ret;

        AtlasIndexQuery query = context.getIndexQuery("FULLTEXT");

        if (query == null) {
            // Compute only once
            SearchParameters searchParameters = context.getSearchParameters();
            String indexQuery = String.format("v.\"%s\":(%s)", Constants.ENTITY_TEXT_PROPERTY_KEY, searchParameters.getQuery());

            query = graph.indexQuery(Constants.FULLTEXT_INDEX, indexQuery);
            context.cacheIndexQuery("FULLTEXT", query);
        }

        context.setIndexResultType(IndexResultType.TEXT);

        ret = query.vertices(context.getCurrentOffset(), context.getMaxLimit());

        if (LOG.isDebugEnabled()) {
            LOG.debug("<== SolrStep.executeAgainstFulltextIndex()");
        }

        return ret;
    }

    /**
     * Builds (once) and pages a structured query against the vertex index.  A tag
     * (classification) takes precedence over an entity type: when both are present
     * the index searches tag vertices and the entity filters are left to Gremlin.
     *
     * @return index results at the current page, or null when no query could be formed
     */
    private Iterator<AtlasIndexQuery.Result> executeAgainstVertexIndex(PipelineContext context) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("==> SolrStep.executeAgainstVertexIndex()");
        }

        final Iterator<AtlasIndexQuery.Result> ret;

        SearchParameters searchParameters = context.getSearchParameters();
        AtlasIndexQuery query = context.getIndexQuery("VERTEX_INDEX");

        if (query == null) {
            StringBuilder solrQuery = new StringBuilder();

            // If tag is specified then let's start processing using tag and it's attributes, entity filters will
            // be pushed to Gremlin
            if (context.getClassificationType() != null) {
                context.setIndexResultType(IndexResultType.TAG);

                constructTypeTestQuery(solrQuery, context.getClassificationType().getTypeAndAllSubTypes());
                constructFilterQuery(solrQuery, context.getClassificationType(), searchParameters.getTagFilters(), context);
            } else if (context.getEntityType() != null) {
                context.setIndexResultType(IndexResultType.ENTITY);

                constructTypeTestQuery(solrQuery, context.getEntityType().getTypeAndAllSubTypes());
                constructFilterQuery(solrQuery, context.getEntityType(), searchParameters.getEntityFilters(), context);

                // Set the status flag (the __state index only exists in the vertex index)
                if (searchParameters.getExcludeDeletedEntities()) {
                    if (solrQuery.length() > 0) {
                        solrQuery.append(AND_STR);   // use the shared constant for consistency
                    }

                    solrQuery.append("v.\"__state\":ACTIVE");
                }
            }

            // No query was formed, doesn't make sense to do anything beyond this point
            if (solrQuery.length() > 0) {
                // Clean up operator/parenthesis fragments left by untranslatable filter clauses.
                String validSolrQuery = STRAY_AND_PATTERN.matcher(solrQuery).replaceAll(")");

                validSolrQuery = STRAY_OR_PATTERN.matcher(validSolrQuery).replaceAll(")");
                validSolrQuery = STRAY_ELLIPSIS_PATTERN.matcher(validSolrQuery).replaceAll(EMPTY_STRING);

                query = graph.indexQuery(Constants.VERTEX_INDEX, validSolrQuery);
                context.cacheIndexQuery("VERTEX_INDEX", query);
            }
        }

        // Execute solr query and return the index results in the context
        if (query != null) {
            ret = query.vertices(context.getCurrentOffset(), context.getMaxLimit());
        } else {
            ret = null;
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("<== SolrStep.executeAgainstVertexIndex()");
        }

        return ret;
    }

    /**
     * Appends a clause restricting __typeName to the given type and all its subtypes.
     */
    private void constructTypeTestQuery(StringBuilder solrQuery, Set<String> typeAndAllSubTypes) {
        String typeAndSubtypesString = StringUtils.join(typeAndAllSubTypes, SPACE_STRING);

        solrQuery.append("v.\"__typeName\": (")
                .append(typeAndSubtypesString)
                .append(")");
    }

    /**
     * Translates the filter tree to a Solr clause and appends it (AND-ed) to the query.
     */
    private void constructFilterQuery(StringBuilder solrQuery, AtlasStructType type, FilterCriteria filterCriteria, PipelineContext context) {
        if (filterCriteria != null) {
            LOG.debug("Processing Filters");

            String filterQuery = toSolrQuery(type, filterCriteria, context);

            if (StringUtils.isNotEmpty(filterQuery)) {
                solrQuery.append(AND_STR).append(filterQuery);
            }
        }
    }

    private String toSolrQuery(AtlasStructType type, FilterCriteria criteria, PipelineContext context) {
        return toSolrQuery(type, criteria, context, new StringBuilder());
    }

    /**
     * Recursively renders a filter node: AND/OR groups become parenthesized joined
     * sub-expressions; leaves become single attribute expressions.  Clauses that cannot
     * be rendered collapse to the empty string (cleaned up later by the stray patterns).
     */
    private String toSolrQuery(AtlasStructType type, FilterCriteria criteria, PipelineContext context, StringBuilder sb) {
        if (criteria.getCondition() != null && CollectionUtils.isNotEmpty(criteria.getCriterion())) {
            StringBuilder nestedExpression = new StringBuilder();

            for (FilterCriteria filterCriteria : criteria.getCriterion()) {
                String nestedQuery = toSolrQuery(type, filterCriteria, context);

                if (StringUtils.isNotEmpty(nestedQuery)) {
                    if (nestedExpression.length() > 0) {
                        nestedExpression.append(SPACE_STRING).append(criteria.getCondition()).append(SPACE_STRING);
                    }

                    nestedExpression.append(nestedQuery);
                }
            }

            return nestedExpression.length() > 0 ? sb.append(BRACE_OPEN_STR).append(nestedExpression.toString()).append(BRACE_CLOSE_STR).toString() : EMPTY_STRING;
        } else {
            return toSolrExpression(type, criteria.getAttributeName(), criteria.getOperator(), criteria.getAttributeValue(), context);
        }
    }

    /**
     * Renders one leaf filter as a Solr expression, and records in the context which
     * attributes the search references and which ones the index actually evaluated
     * (so Gremlin can apply the remainder).  Returns the empty string for attributes
     * the index cannot handle (non-built-in type, not indexed, unsupported operator).
     */
    private String toSolrExpression(AtlasStructType type, String attrName, Operator op, String attrVal, PipelineContext context) {
        String ret = EMPTY_STRING;

        try {
            String indexKey = type.getQualifiedAttributeName(attrName);
            AtlasType attributeType = type.getAttributeType(attrName);

            // Record that this attribute participates in the search.
            // NOTE(review): assumes indexResultType was set before filter translation --
            // true for the call paths in executeAgainstVertexIndex.
            switch (context.getIndexResultType()) {
                case TAG:
                    context.addTagSearchAttribute(indexKey);
                    break;

                case ENTITY:
                    context.addEntitySearchAttribute(indexKey);
                    break;

                default:
                    // Do nothing
            }

            if (attributeType != null && AtlasTypeUtil.isBuiltInType(attributeType.getTypeName()) && context.getIndexedKeys().contains(indexKey)) {
                String template = operatorMap.get(op);   // single lookup instead of two

                if (template != null) {
                    // If there's a chance of multi-value then we need some additional processing here
                    switch (context.getIndexResultType()) {
                        case TAG:
                            context.addProcessedTagAttribute(indexKey);
                            break;

                        case ENTITY:
                            context.addProcessedEntityAttribute(indexKey);
                            break;
                    }

                    ret = String.format(template, indexKey, attrVal);
                }
            }
        } catch (AtlasBaseException ex) {
            // Attribute could not be resolved on this type; leave the clause empty.
            LOG.warn(ex.getMessage());
        }

        return ret;
    }
}
......@@ -68,8 +68,10 @@ import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.*;
......@@ -96,7 +98,10 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang
//allows injection of a dummy graph for testing
private IAtlasGraphProvider provider;
private boolean recomputeIndexedKeys = true;
private Set<String> vertexIndexKeys = new HashSet<>();
@Inject
public GraphBackedSearchIndexer(AtlasTypeRegistry typeRegistry) throws AtlasException {
this(new AtlasGraphProvider(), ApplicationProperties.get(), typeRegistry);
......@@ -130,6 +135,7 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang
if (management.containsPropertyKey(Constants.VERTEX_TYPE_PROPERTY_KEY)) {
LOG.info("Global indexes already exist for graph");
management.commit();
return;
}
......@@ -192,7 +198,6 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang
throw new RepositoryException(t);
}
}
private void createFullTextIndex(AtlasGraphManagement management) {
AtlasPropertyKey fullText =
......@@ -247,6 +252,34 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang
onAdd(dataTypes);
}
/**
 * Returns the names of all property keys covered by the vertex index.
 * <p>
 * The key set is computed lazily and cached; {@code recomputeIndexedKeys} is flipped
 * back to true elsewhere after index commits/rollbacks, forcing a refresh here.
 */
public Set<String> getVertexIndexKeys() {
    if (recomputeIndexedKeys) {
        AtlasGraphManagement management = null;

        try {
            management = provider.get().getManagementSystem();
        } catch (RepositoryException excp) {
            // Best effort: log and fall through to serve the previously cached keys.
            LOG.error("failed to get indexedKeys from graph", excp);
        }

        if (management != null) {
            recomputeIndexedKeys = false;

            // NOTE(review): assumes the vertex index always exists -- a null return from
            // getGraphIndex() would NPE below; confirm index creation precedes all callers.
            // NOTE(review): the management handle is not committed/rolled back/closed here;
            // verify whether the graph backend requires that.
            AtlasGraphIndex vertexIndex = management.getGraphIndex(Constants.VERTEX_INDEX);

            Set<String> indexKeys = new HashSet<>();

            for (AtlasPropertyKey fieldKey : vertexIndex.getFieldKeys()) {
                indexKeys.add(fieldKey.getName());
            }

            vertexIndexKeys = indexKeys;
        }
    }

    // NOTE(review): unsynchronized lazy init -- concurrent first calls may recompute
    // twice; verify that is acceptable or guard with synchronization.
    return vertexIndexKeys;
}
private void addIndexForType(AtlasGraphManagement management, AtlasBaseTypeDef typeDef) {
if (typeDef instanceof AtlasEnumDef) {
// Only handle complex types like Struct, Classification and Entity
......@@ -577,6 +610,8 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang
private void commit(AtlasGraphManagement management) throws IndexException {
try {
management.commit();
recomputeIndexedKeys = true;
} catch (Exception e) {
LOG.error("Index commit failed", e);
throw new IndexException("Index commit failed ", e);
......@@ -586,6 +621,8 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang
private void rollback(AtlasGraphManagement management) throws IndexException {
try {
management.rollback();
recomputeIndexedKeys = true;
} catch (Exception e) {
LOG.error("Index rollback failed ", e);
throw new IndexException("Index rollback failed ", e);
......
......@@ -30,12 +30,14 @@ import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.inject.Inject;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileNotFoundException;
@Component
public class ImportService {
private static final Logger LOG = LoggerFactory.getLogger(ImportService.class);
......@@ -46,6 +48,7 @@ public class ImportService {
private long startTimestamp;
private long endTimestamp;
@Inject
public ImportService(final AtlasTypeDefStore typeDefStore, final AtlasEntityStore entityStore, AtlasTypeRegistry typeRegistry) {
this.typeDefStore = typeDefStore;
this.entityStore = entityStore;
......
......@@ -39,7 +39,10 @@ import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
import java.util.Collection;
import java.util.Date;
import java.util.Iterator;
import java.util.Map;
/**
* Utility methods for Graph.
......
......@@ -46,10 +46,12 @@ import org.slf4j.LoggerFactory;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_BIGDECIMAL;
import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_BIGINTEGER;
......@@ -123,7 +125,7 @@ public final class EntityGraphRetriever {
}
public AtlasEntityHeader toAtlasEntityHeader(AtlasVertex entityVertex) throws AtlasBaseException {
return entityVertex != null ? mapVertexToAtlasEntityHeader(entityVertex) : null;
return toAtlasEntityHeader(entityVertex, Collections.<String>emptySet());
}
private AtlasVertex getEntityVertex(String guid) throws AtlasBaseException {
......@@ -185,6 +187,10 @@ public final class EntityGraphRetriever {
}
private AtlasEntityHeader mapVertexToAtlasEntityHeader(AtlasVertex entityVertex) throws AtlasBaseException {
return mapVertexToAtlasEntityHeader(entityVertex, Collections.<String>emptySet());
}
private AtlasEntityHeader mapVertexToAtlasEntityHeader(AtlasVertex entityVertex, Set<String> attributes) throws AtlasBaseException {
AtlasEntityHeader ret = new AtlasEntityHeader();
String typeName = entityVertex.getProperty(Constants.TYPE_NAME_PROPERTY_KEY, String.class);
......@@ -218,6 +224,20 @@ public final class EntityGraphRetriever {
if (displayText != null) {
ret.setDisplayText(displayText.toString());
}
if (CollectionUtils.isNotEmpty(attributes)) {
for (String attrName : attributes) {
if (ret.hasAttribute(attrName)) {
continue;
}
Object attrValue = getVertexAttribute(entityVertex, entityType.getAttribute(attrName));
if (attrValue != null) {
ret.setAttribute(attrName, attrValue);
}
}
}
}
return ret;
......@@ -556,4 +576,8 @@ public final class EntityGraphRetriever {
private Object getVertexAttribute(AtlasVertex vertex, AtlasAttribute attribute) throws AtlasBaseException {
return vertex != null && attribute != null ? mapVertexToAttribute(vertex, attribute, null) : null;
}
public AtlasEntityHeader toAtlasEntityHeader(AtlasVertex atlasVertex, Set<String> attributes) throws AtlasBaseException {
return atlasVertex != null ? mapVertexToAtlasEntityHeader(atlasVertex, attributes) : null;
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.util;
import org.apache.atlas.annotation.AtlasService;
import org.apache.atlas.discovery.SearchPipeline.PipelineContext;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
 * Service-wide registry of in-flight searches, keyed by search id (currently the name
 * of the thread executing the search).  Allows a running search to be looked up and
 * force-terminated from another thread.
 * <p>
 * Thread-safety: this is a shared singleton service mutated concurrently by request
 * threads, so the map must be a {@link ConcurrentHashMap}; the previous plain HashMap
 * was unsafe under concurrent add/remove/terminate.
 */
@AtlasService
public class SearchTracker {
    private final Map<String, PipelineContext> activeSearches = new ConcurrentHashMap<>();

    /**
     * Registers a search context and returns the id under which it is tracked.
     *
     * @param context the pipeline context of the search being started
     * @return the search id (the current thread's name)
     */
    public String add(PipelineContext context) {
        String searchId = Thread.currentThread().getName();

        activeSearches.put(searchId, context);

        return searchId;
    }

    /**
     * Terminates the identified search, if still active, by flagging its context.
     * Atomic remove-then-flag avoids the check-then-act race of containsKey()+remove().
     *
     * @param searchId id returned by {@link #add(PipelineContext)}
     * @return the terminated context, or null if no such search was active
     */
    public PipelineContext terminate(String searchId) {
        PipelineContext terminated = activeSearches.remove(searchId);

        if (terminated != null) {
            terminated.setForceTerminate(true);
        }

        return terminated;
    }

    /**
     * Deregisters a search without terminating it (normal completion path).
     *
     * @return the removed context, or null if not present
     */
    public PipelineContext remove(String id) {
        return activeSearches.remove(id);
    }

    /**
     * @return ids of all currently active searches (live view of the map's key set)
     */
    public Set<String> getActiveSearches() {
        return activeSearches.keySet();
    }
}
......@@ -17,19 +17,14 @@
*/
package org.apache.atlas;
import com.google.inject.AbstractModule;
import com.google.inject.Binder;
import com.google.inject.Provider;
import com.google.inject.Singleton;
import com.google.inject.matcher.Matchers;
import com.google.inject.multibindings.Multibinder;
import org.apache.atlas.annotation.GraphTransaction;
import org.apache.atlas.discovery.AtlasDiscoveryService;
import org.apache.atlas.discovery.AtlasLineageService;
import org.apache.atlas.discovery.DataSetLineageService;
import org.apache.atlas.discovery.DiscoveryService;
import org.apache.atlas.discovery.EntityDiscoveryService;
import org.apache.atlas.discovery.EntityLineageService;
import org.apache.atlas.discovery.LineageService;
import org.apache.atlas.discovery.*;
import org.apache.atlas.discovery.graph.GraphBackedDiscoveryService;
import org.apache.atlas.graph.GraphSandboxUtil;
import org.apache.atlas.listener.EntityChangeListener;
......@@ -61,6 +56,7 @@ import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.typesystem.types.TypeSystem;
import org.apache.atlas.typesystem.types.cache.TypeCache;
import org.apache.atlas.util.AtlasRepositoryConfiguration;
import org.apache.atlas.util.SearchTracker;
import org.apache.commons.configuration.Configuration;
import org.mockito.Mockito;
import org.slf4j.Logger;
......@@ -76,7 +72,7 @@ public class TestModules {
}
// Test only DI modules
public static class TestOnlyModule extends com.google.inject.AbstractModule {
public static class TestOnlyModule extends AbstractModule {
private static final Logger LOG = LoggerFactory.getLogger(TestOnlyModule.class);
......@@ -147,6 +143,11 @@ public class TestModules {
typeDefChangeListenerMultibinder.addBinding().to(DefaultMetadataService.class);
typeDefChangeListenerMultibinder.addBinding().to(GraphBackedSearchIndexer.class).asEagerSingleton();
bind(SearchPipeline.class).asEagerSingleton();
bind(SearchTracker.class).asEagerSingleton();
bind(SolrStep.class).asEagerSingleton();
bind(GremlinStep.class).asEagerSingleton();
bind(AtlasEntityStore.class).to(AtlasEntityStoreV1.class);
bind(AtlasRelationshipStore.class).to(AtlasRelationshipStoreV1.class);
......
......@@ -17,6 +17,7 @@
*/
package org.apache.atlas.services;
import org.apache.atlas.TestModules;
import org.apache.atlas.discovery.EntityDiscoveryService;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.typedef.AtlasEntityDef;
......@@ -24,12 +25,16 @@ import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.commons.lang.StringUtils;
import org.powermock.reflect.Whitebox;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
import javax.inject.Inject;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
@Guice(modules = TestModules.TestOnlyModule.class)
public class EntityDiscoveryServiceTest {
private final String TEST_TYPE = "test";
......@@ -47,6 +52,9 @@ public class EntityDiscoveryServiceTest {
private final int maxTypesCountInIdxQuery = 10;
@Inject
EntityDiscoveryService discoveryService;
@BeforeClass
public void init() throws AtlasBaseException {
......
......@@ -50,7 +50,7 @@ public class StaleTransactionCleanupFilter implements Filter {
@Override
public void doFilter(ServletRequest request, ServletResponse response, FilterChain filterChain)
throws IOException, ServletException {
LOG.info("Cleaning stale transactions");
LOG.debug("Cleaning stale transactions");
AtlasGraphProvider.getGraphInstance().rollback();
filterChain.doFilter(request, response);
}
......
......@@ -25,6 +25,7 @@ import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.authorize.AtlasActionTypes;
import org.apache.atlas.authorize.AtlasResourceTypes;
import org.apache.atlas.authorize.simple.AtlasAuthorizationUtils;
import org.apache.atlas.discovery.SearchPipeline;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.impexp.AtlasExportRequest;
import org.apache.atlas.model.impexp.AtlasExportResult;
......@@ -35,11 +36,9 @@ import org.apache.atlas.repository.impexp.ExportService;
import org.apache.atlas.repository.impexp.ImportService;
import org.apache.atlas.repository.impexp.ZipSink;
import org.apache.atlas.repository.impexp.ZipSource;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.services.MetricsService;
import org.apache.atlas.store.AtlasTypeDefStore;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.util.SearchTracker;
import org.apache.atlas.web.filters.AtlasCSRFPreventionFilter;
import org.apache.atlas.web.service.ServiceState;
import org.apache.atlas.web.util.Servlets;
......@@ -51,7 +50,6 @@ import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.security.core.Authentication;
import org.springframework.security.core.GrantedAuthority;
import org.springframework.security.core.context.SecurityContextHolder;
......@@ -62,9 +60,11 @@ import javax.inject.Singleton;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.WebApplicationException;
......@@ -109,14 +109,10 @@ public class AdminResource {
private final ServiceState serviceState;
private final MetricsService metricsService;
private final AtlasTypeRegistry typeRegistry;
private final AtlasTypeDefStore typesDefStore;
private final AtlasEntityStore entityStore;
private static Configuration atlasProperties;
private final ExportService exportService;
@Inject
ApplicationContext applicationContext;
private final ImportService importService;
private final SearchTracker activeSearches;
static {
try {
......@@ -128,15 +124,13 @@ public class AdminResource {
@Inject
public AdminResource(ServiceState serviceState, MetricsService metricsService,
AtlasTypeRegistry typeRegistry, AtlasTypeDefStore typeDefStore,
AtlasEntityStore entityStore, ExportService exportService) {
ExportService exportService, ImportService importService, SearchTracker activeSearches) {
this.serviceState = serviceState;
this.metricsService = metricsService;
this.typeRegistry = typeRegistry;
this.typesDefStore = typeDefStore;
this.entityStore = entityStore;
this.exportService = exportService;
this.importExportOperationLock = new ReentrantLock();
this.importService = importService;
this.activeSearches = activeSearches;
importExportOperationLock = new ReentrantLock();
}
/**
......@@ -377,7 +371,6 @@ public class AdminResource {
try {
AtlasImportRequest request = AtlasType.fromJson(jsonData, AtlasImportRequest.class);
ImportService importService = new ImportService(this.typesDefStore, this.entityStore, this.typeRegistry);
ZipSource zipSource = new ZipSource(inputStream);
result = importService.run(zipSource, request, Servlets.getUserName(httpServletRequest),
......@@ -412,7 +405,6 @@ public class AdminResource {
try {
AtlasImportRequest request = AtlasType.fromJson(jsonData, AtlasImportRequest.class);
ImportService importService = new ImportService(this.typesDefStore, this.entityStore, this.typeRegistry);
result = importService.run(request, Servlets.getUserName(httpServletRequest),
Servlets.getHostName(httpServletRequest),
AtlasAuthorizationUtils.getRequestIpAddress(httpServletRequest));
......@@ -431,6 +423,21 @@ public class AdminResource {
return result;
}
/**
 * Lists the ids of searches currently executing on this server.
 * Delegates to the injected {@code SearchTracker} field (also named
 * {@code activeSearches}); the ids can be passed to the DELETE
 * endpoint below to terminate a search.
 */
@GET
@Path("activeSearches")
@Produces(Servlets.JSON_MEDIA_TYPE)
public Set<String> getActiveSearches() {
    return activeSearches.getActiveSearches();
}
/**
 * Asks the search tracker to terminate the search registered under the
 * given id.
 *
 * @param searchId id of the active search to stop (as reported by GET activeSearches)
 * @return true if a matching active search was found and flagged for
 *         termination, false otherwise
 */
@DELETE
@Path("activeSearches/{id}")
@Produces(Servlets.JSON_MEDIA_TYPE)
public boolean terminateActiveSearch(@PathParam("id") String searchId) {
    return activeSearches.terminate(searchId) != null;
}
private String getEditableEntityTypes(Configuration config) {
String ret = DEFAULT_EDITABLE_ENTITY_TYPES;
......
......@@ -21,8 +21,10 @@ import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.discovery.AtlasDiscoveryService;
import org.apache.atlas.model.discovery.AtlasSearchResult;
import org.apache.atlas.model.discovery.SearchParameters;
import org.apache.atlas.utils.AtlasPerfTracer;
import org.apache.atlas.web.util.Servlets;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.stereotype.Service;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
......@@ -31,6 +33,7 @@ import javax.inject.Inject;
import javax.inject.Singleton;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
......@@ -213,6 +216,50 @@ public class DiscoveryREST {
}
}
/**
 * Attribute based search for entities satisfying the search parameters
 * @param parameters Search parameters
 * @return Atlas search result
 * @throws AtlasBaseException
 *
 * @HTTP 200 On successful search
 * @HTTP 400 Tag/Entity doesn't exist or Tag/entity filter is present without tag/type name
 */
@Path("basic")
@POST
@Consumes(Servlets.JSON_MEDIA_TYPE)
@Produces(Servlets.JSON_MEDIA_TYPE)
public AtlasSearchResult searchWithParameters(SearchParameters parameters) throws AtlasBaseException {
    AtlasPerfTracer perf = null;

    try {
        if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
            perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "DiscoveryREST.searchWithParameters(" + parameters + ")");
        }

        // a missing request body would otherwise NPE below and surface as HTTP 500
        if (parameters == null) {
            throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "Search parameters are required");
        }

        if (parameters.getLimit() < 0 || parameters.getOffset() < 0) {
            throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "Limit/offset should be non-negative");
        }

        // entity-attribute filters are meaningless without a type to apply them to
        if (StringUtils.isEmpty(parameters.getTypeName()) && !isEmpty(parameters.getEntityFilters())) {
            throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "EntityFilters specified without Type name");
        }

        // tag-attribute filters are meaningless without a classification to apply them to
        if (StringUtils.isEmpty(parameters.getClassification()) && !isEmpty(parameters.getTagFilters())) {
            throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "TagFilters specified without tag name");
        }

        return atlasDiscoveryService.searchUsingBasicQuery(parameters);
    } finally {
        AtlasPerfTracer.log(perf);
    }
}
/**
 * Tells whether a filter criteria carries no usable condition: either it
 * is null, or it names no attribute and holds no nested criteria.
 *
 * @param filterCriteria criteria to inspect (may be null)
 * @return true if there is nothing to filter on
 */
private boolean isEmpty(SearchParameters.FilterCriteria filterCriteria) {
    if (filterCriteria == null) {
        return true;
    }

    boolean hasNoAttribute = StringUtils.isEmpty(filterCriteria.getAttributeName());
    boolean hasNoCriterion = CollectionUtils.isEmpty(filterCriteria.getCriterion());

    return hasNoAttribute && hasNoCriterion;
}
private String escapeTypeName(String typeName) {
String ret;
......
......@@ -48,7 +48,7 @@ public class AdminResourceTest {
when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.ACTIVE);
AdminResource adminResource = new AdminResource(serviceState, null, null, null, null, null);
AdminResource adminResource = new AdminResource(serviceState, null, null, null, null);
Response response = adminResource.getStatus();
assertEquals(response.getStatus(), HttpServletResponse.SC_OK);
JSONObject entity = (JSONObject) response.getEntity();
......@@ -59,7 +59,7 @@ public class AdminResourceTest {
public void testResourceGetsValueFromServiceState() throws JSONException {
when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.PASSIVE);
AdminResource adminResource = new AdminResource(serviceState, null, null, null, null, null);
AdminResource adminResource = new AdminResource(serviceState, null, null, null, null);
Response response = adminResource.getStatus();
verify(serviceState).getState();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment