Commit bcec42e3 by apoorvnaik Committed by Madhan Neethiraj

ATLAS-1947: AtlasSearchResult to include referredEntity headers

parent 0d8f9f8d
......@@ -31,6 +31,7 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import static org.codehaus.jackson.annotate.JsonAutoDetect.Visibility.NONE;
......@@ -48,6 +49,7 @@ public class AtlasSearchResult implements Serializable {
private List<AtlasEntityHeader> entities;
private AttributeSearchResult attributes;
private List<AtlasFullTextResult> fullTextResult;
private Map<String, AtlasEntityHeader> referredEntities;
public AtlasSearchResult() {}
......@@ -62,6 +64,7 @@ public class AtlasSearchResult implements Serializable {
setEntities(null);
setAttributes(null);
setFullTextResult(null);
setReferredEntities(null);
}
public AtlasSearchResult(SearchParameters searchParameters) {
......@@ -73,6 +76,7 @@ public class AtlasSearchResult implements Serializable {
setEntities(null);
setAttributes(null);
setFullTextResult(null);
setReferredEntities(null);
}
}
......@@ -80,6 +84,14 @@ public class AtlasSearchResult implements Serializable {
public void setQueryType(AtlasQueryType queryType) { this.queryType = queryType; }
public SearchParameters getSearchParameters() {
return searchParameters;
}
public void setSearchParameters(SearchParameters searchParameters) {
this.searchParameters = searchParameters;
}
public String getQueryText() { return queryText; }
public void setQueryText(String queryText) { this.queryText = queryText; }
......@@ -104,6 +116,17 @@ public class AtlasSearchResult implements Serializable {
public void setFullTextResult(List<AtlasFullTextResult> fullTextResult) { this.fullTextResult = fullTextResult; }
public Map<String, AtlasEntityHeader> getReferredEntities() {
return referredEntities;
}
public void setReferredEntities(Map<String, AtlasEntityHeader> referredEntities) {
this.referredEntities = referredEntities;
}
@Override
public int hashCode() { return Objects.hash(queryType, searchParameters, queryText, type, classification, entities, attributes, fullTextResult, referredEntities); }
@Override
public boolean equals(Object o) {
if (this == o) return true;
......@@ -116,24 +139,8 @@ public class AtlasSearchResult implements Serializable {
Objects.equals(classification, that.classification) &&
Objects.equals(entities, that.entities) &&
Objects.equals(attributes, that.attributes) &&
Objects.equals(fullTextResult, that.fullTextResult);
}
@Override
public int hashCode() { return Objects.hash(queryType, searchParameters, queryText, type, classification, entities, attributes, fullTextResult); }
@Override
public String toString() {
return "AtlasSearchResult{" +
"queryType=" + queryType +
", searchParameters='" + searchParameters + '\'' +
", queryText='" + queryText + '\'' +
", type=" + type +
", classification=" + classification +
", entities=" + entities +
", attributes=" + attributes +
", fullTextResult=" + fullTextResult +
'}';
Objects.equals(fullTextResult, that.fullTextResult) &&
Objects.equals(referredEntities, that.referredEntities);
}
public void addEntity(AtlasEntityHeader newEntity) {
......@@ -163,12 +170,19 @@ public class AtlasSearchResult implements Serializable {
}
}
public void setSearchParameters(SearchParameters searchParameters) {
this.searchParameters = searchParameters;
}
public SearchParameters getSearchParameters() {
return searchParameters;
@Override
public String toString() {
return "AtlasSearchResult{" +
"queryType=" + queryType +
", searchParameters='" + searchParameters + '\'' +
", queryText='" + queryText + '\'' +
", type=" + type +
", classification=" + classification +
", entities=" + entities +
", attributes=" + attributes +
", fullTextResult=" + fullTextResult +
", referredEntities=" + referredEntities +
'}';
}
public enum AtlasQueryType { DSL, FULL_TEXT, GREMLIN, BASIC, ATTRIBUTE }
......
......@@ -207,9 +207,12 @@ public class SearchParameters {
return Objects.hash(query, typeName, classification, excludeDeletedEntities, limit, offset, entityFilters, tagFilters, attributes);
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder("SearchParameters{");
public StringBuilder toString(StringBuilder sb) {
if (sb == null) {
sb = new StringBuilder();
}
sb.append('{');
sb.append("query='").append(query).append('\'');
sb.append(", typeName='").append(typeName).append('\'');
sb.append(", classification='").append(classification).append('\'');
......@@ -220,7 +223,13 @@ public class SearchParameters {
sb.append(", tagFilters=").append(tagFilters);
sb.append(", attributes=").append(attributes);
sb.append('}');
return sb.toString();
return sb;
}
@Override
public String toString() {
return toString(new StringBuilder()).toString();
}
......@@ -297,16 +306,25 @@ public class SearchParameters {
return Objects.hash(attributeName, operator, attributeValue, condition, criterion);
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder("FilterCriteria{");
public StringBuilder toString(StringBuilder sb) {
if (sb == null) {
sb = new StringBuilder();
}
sb.append('{');
sb.append("attributeName='").append(attributeName).append('\'');
sb.append(", operator=").append(operator);
sb.append(", attributeValue='").append(attributeValue).append('\'');
sb.append(", condition=").append(condition);
sb.append(", criterion=").append(criterion);
sb.append('}');
return sb.toString();
return sb;
}
@Override
public String toString() {
return toString(new StringBuilder()).toString();
}
}
......
......@@ -64,5 +64,5 @@ public interface AtlasDiscoveryService {
* @return Matching entities
* @throws AtlasBaseException
*/
AtlasSearchResult searchUsingBasicQuery(SearchParameters searchParameters) throws AtlasBaseException;
AtlasSearchResult searchWithParameters(SearchParameters searchParameters) throws AtlasBaseException;
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.discovery;
import org.apache.atlas.model.discovery.SearchParameters.FilterCriteria;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.graphdb.*;
import org.apache.atlas.repository.store.graph.v1.AtlasGraphUtilsV1;
import org.apache.atlas.type.AtlasClassificationType;
import org.apache.atlas.utils.AtlasPerfTracer;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
public class ClassificationSearchProcessor extends SearchProcessor {
private static final Logger LOG = LoggerFactory.getLogger(ClassificationSearchProcessor.class);
private static final Logger PERF_LOG = AtlasPerfTracer.getPerfLogger("ClassificationSearchProcessor");
private final AtlasIndexQuery indexQuery;
private final AtlasGraphQuery allGraphQuery;
private final AtlasGraphQuery filterGraphQuery;
public ClassificationSearchProcessor(SearchContext context) {
super(context);
AtlasClassificationType classificationType = context.getClassificationType();
FilterCriteria filterCriteria = context.getSearchParameters().getTagFilters();
Set<String> typeAndSubTypes = classificationType.getTypeAndAllSubTypes();
Set<String> solrAttributes = new HashSet<>();
Set<String> gremlinAttributes = new HashSet<>();
Set<String> allAttributes = new HashSet<>();
processSearchAttributes(classificationType, filterCriteria, solrAttributes, gremlinAttributes, allAttributes);
// for classification search, if any attribute can't be handled by Solr - switch to all Gremlin
boolean useSolrSearch = typeAndSubTypes.size() <= MAX_CLASSIFICATION_TYPES_IN_INDEX_QUERY && CollectionUtils.isEmpty(gremlinAttributes) && canApplySolrFilter(classificationType, filterCriteria, false);
if (useSolrSearch) {
StringBuilder solrQuery = new StringBuilder();
constructTypeTestQuery(solrQuery, typeAndSubTypes);
constructFilterQuery(solrQuery, classificationType, filterCriteria, solrAttributes);
String solrQueryString = STRAY_AND_PATTERN.matcher(solrQuery).replaceAll(")");
solrQueryString = STRAY_OR_PATTERN.matcher(solrQueryString).replaceAll(")");
solrQueryString = STRAY_ELIPSIS_PATTERN.matcher(solrQueryString).replaceAll("");
indexQuery = context.getGraph().indexQuery(Constants.VERTEX_INDEX, solrQueryString);
} else {
indexQuery = null;
}
AtlasGraphQuery query = context.getGraph().query().in(Constants.TYPE_NAME_PROPERTY_KEY, typeAndSubTypes);
allGraphQuery = toGremlinFilterQuery(classificationType, filterCriteria, allAttributes, query);
query = context.getGraph().query().in(Constants.TRAIT_NAMES_PROPERTY_KEY, typeAndSubTypes);
filterGraphQuery = query; // TODO: filer based on tag attributes
}
@Override
public List<AtlasVertex> execute() {
if (LOG.isDebugEnabled()) {
LOG.debug("==> ClassificationSearchProcessor.execute({})", context);
}
List<AtlasVertex> ret = new ArrayList<>();
AtlasPerfTracer perf = null;
if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "ClassificationSearchProcessor.execute(" + context + ")");
}
try {
int qryOffset = (nextProcessor == null) ? context.getSearchParameters().getOffset() : 0;
int limit = context.getSearchParameters().getLimit();
int resultIdx = qryOffset;
Set<String> processedGuids = new HashSet<>();
while (ret.size() < limit) {
if (context.terminateSearch()) {
LOG.warn("query terminated: {}", context.getSearchParameters());
break;
}
List<AtlasVertex> classificationVertices;
if (indexQuery != null) {
Iterator<AtlasIndexQuery.Result> queryResult = indexQuery.vertices(qryOffset, limit);
if (!queryResult.hasNext()) { // no more results from solr - end of search
break;
}
classificationVertices = getVerticesFromIndexQueryResult(queryResult);
} else {
Iterator<AtlasVertex> queryResult = allGraphQuery.vertices(qryOffset, limit).iterator();
if (!queryResult.hasNext()) { // no more results - end of search
break;
}
classificationVertices = getVertices(queryResult);
}
qryOffset += limit;
List<AtlasVertex> entityVertices = new ArrayList<>();
for (AtlasVertex classificationVertex : classificationVertices) {
Iterable<AtlasEdge> edges = classificationVertex.getEdges(AtlasEdgeDirection.IN);
for (AtlasEdge edge : edges) {
AtlasVertex entityVertex = edge.getOutVertex();
String guid = AtlasGraphUtilsV1.getIdFromVertex(entityVertex);
if (!processedGuids.contains(guid)) {
if (!context.getSearchParameters().getExcludeDeletedEntities() || AtlasGraphUtilsV1.getState(entityVertex) == AtlasEntity.Status.ACTIVE) {
entityVertices.add(entityVertex);
}
processedGuids.add(guid);
}
}
}
entityVertices = super.filter(entityVertices);
for (AtlasVertex entityVertex : entityVertices) {
resultIdx++;
if (resultIdx < context.getSearchParameters().getOffset()) {
continue;
}
ret.add(entityVertex);
if (ret.size() == limit) {
break;
}
}
}
} finally {
AtlasPerfTracer.log(perf);
}
if (LOG.isDebugEnabled()) {
LOG.debug("<== ClassificationSearchProcessor.execute({}): ret.size()={}", context, ret.size());
}
return ret;
}
@Override
public List<AtlasVertex> filter(List<AtlasVertex> entityVertices) {
if (LOG.isDebugEnabled()) {
LOG.debug("==> ClassificationSearchProcessor.filter({})", entityVertices.size());
}
AtlasGraphQuery query = context.getGraph().query().in(Constants.GUID_PROPERTY_KEY, getGuids(entityVertices));
query.addConditionsFrom(filterGraphQuery);
List<AtlasVertex> ret = getVertices(query.vertices().iterator());
ret = super.filter(ret);
if (LOG.isDebugEnabled()) {
LOG.debug("<== ClassificationSearchProcessor.filter({}): ret.size()={}", entityVertices.size(), ret.size());
}
return ret;
}
}
......@@ -20,6 +20,7 @@ package org.apache.atlas.discovery;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasConfiguration;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.AtlasException;
import org.apache.atlas.annotation.GraphTransaction;
import org.apache.atlas.discovery.graph.DefaultGraphPersistenceStrategy;
import org.apache.atlas.exception.AtlasBaseException;
......@@ -29,8 +30,8 @@ import org.apache.atlas.model.discovery.AtlasSearchResult.AtlasQueryType;
import org.apache.atlas.model.discovery.AtlasSearchResult.AttributeSearchResult;
import org.apache.atlas.model.discovery.SearchParameters;
import org.apache.atlas.model.instance.AtlasEntity.Status;
import org.apache.atlas.AtlasException;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.query.Expressions.AliasExpression;
import org.apache.atlas.query.Expressions.Expression;
import org.apache.atlas.query.Expressions.SelectExpression;
......@@ -42,16 +43,16 @@ import org.apache.atlas.query.QueryProcessor;
import org.apache.atlas.query.SelectExpressionHelper;
import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.MetadataRepository;
import org.apache.atlas.repository.graph.GraphBackedSearchIndexer;
import org.apache.atlas.repository.graph.GraphHelper;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasIndexQuery;
import org.apache.atlas.repository.graphdb.AtlasIndexQuery.Result;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.store.graph.v1.EntityGraphRetriever;
import org.apache.atlas.type.AtlasClassificationType;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.*;
import org.apache.atlas.type.AtlasBuiltInTypes.AtlasObjectIdType;
import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.util.AtlasGremlinQueryProvider;
import org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery;
import org.apache.commons.collections.CollectionUtils;
......@@ -67,13 +68,7 @@ import scala.util.parsing.combinator.Parsers.NoSuccess;
import javax.inject.Inject;
import javax.script.ScriptEngine;
import javax.script.ScriptException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.*;
import static org.apache.atlas.AtlasErrorCode.CLASSIFICATION_NOT_FOUND;
import static org.apache.atlas.AtlasErrorCode.DISCOVERY_QUERY_FAILED;
......@@ -88,21 +83,20 @@ public class EntityDiscoveryService implements AtlasDiscoveryService {
private final EntityGraphRetriever entityRetriever;
private final AtlasGremlinQueryProvider gremlinQueryProvider;
private final AtlasTypeRegistry typeRegistry;
private final SearchPipeline searchPipeline;
private final GraphBackedSearchIndexer indexer;
private final int maxResultSetSize;
private final int maxTypesCountInIdxQuery;
private final int maxTagsCountInIdxQuery;
@Inject
EntityDiscoveryService(MetadataRepository metadataRepository, AtlasTypeRegistry typeRegistry,
AtlasGraph graph, SearchPipeline searchPipeline) throws AtlasException {
AtlasGraph graph, GraphBackedSearchIndexer indexer) throws AtlasException {
this.graph = graph;
this.graphPersistenceStrategy = new DefaultGraphPersistenceStrategy(metadataRepository);
this.entityRetriever = new EntityGraphRetriever(typeRegistry);
this.indexer = indexer;
this.gremlinQueryProvider = AtlasGremlinQueryProvider.INSTANCE;
this.typeRegistry = typeRegistry;
this.searchPipeline = searchPipeline;
this.maxResultSetSize = ApplicationProperties.get().getInt(Constants.INDEX_SEARCH_MAX_RESULT_SET_SIZE, 150);
this.maxTypesCountInIdxQuery = ApplicationProperties.get().getInt(Constants.INDEX_SEARCH_MAX_TYPES_COUNT, 10);
this.maxTagsCountInIdxQuery = ApplicationProperties.get().getInt(Constants.INDEX_SEARCH_MAX_TAGS_COUNT, 10);
......@@ -404,20 +398,85 @@ public class EntityDiscoveryService implements AtlasDiscoveryService {
@Override
@GraphTransaction
public AtlasSearchResult searchUsingBasicQuery(SearchParameters searchParameters) throws AtlasBaseException {
public AtlasSearchResult searchWithParameters(SearchParameters searchParameters) throws AtlasBaseException {
AtlasSearchResult ret = new AtlasSearchResult(searchParameters);
List<AtlasVertex> resultList = searchPipeline.run(searchParameters);
SearchContext context = new SearchContext(searchParameters, typeRegistry, graph, indexer.getVertexIndexKeys());
List<AtlasVertex> resultList = context.getSearchProcessor().execute();
// By default any attribute that shows up in the search parameter should be sent back in the response
// If additional values are requested then the entityAttributes will be a superset of the all search attributes
// and the explicitly requested attribute(s)
Set<String> resultAttributes = new HashSet<>();
Set<String> entityAttributes = new HashSet<>();
if (CollectionUtils.isNotEmpty(searchParameters.getAttributes())) {
resultAttributes.addAll(searchParameters.getAttributes());
}
for (String resultAttribute : resultAttributes) {
AtlasAttribute attribute = context.getEntityType().getAttribute(resultAttribute);
if (attribute != null) {
AtlasType attributeType = attribute.getAttributeType();
if (attributeType instanceof AtlasArrayType) {
attributeType = ((AtlasArrayType) attributeType).getElementType();
}
if (attributeType instanceof AtlasEntityType || attributeType instanceof AtlasObjectIdType) {
entityAttributes.add(resultAttribute);
}
}
}
for (AtlasVertex atlasVertex : resultList) {
AtlasEntityHeader entity = entityRetriever.toAtlasEntityHeader(atlasVertex, searchParameters.getAttributes());
AtlasEntityHeader entity = entityRetriever.toAtlasEntityHeader(atlasVertex, resultAttributes);
ret.addEntity(entity);
// populate ret.referredEntities
for (String entityAttribute : entityAttributes) {
Object attrValue = entity.getAttribute(entityAttribute);
if (attrValue instanceof AtlasObjectId) {
AtlasObjectId objId = (AtlasObjectId)attrValue;
if (ret.getReferredEntities() == null) {
ret.setReferredEntities(new HashMap<String, AtlasEntityHeader>());
}
if (!ret.getReferredEntities().containsKey(objId.getGuid())) {
ret.getReferredEntities().put(objId.getGuid(), entityRetriever.toAtlasEntityHeader(objId.getGuid()));
}
} else if (attrValue instanceof Collection) {
Collection objIds = (Collection)attrValue;
for (Object obj : objIds) {
if (obj instanceof AtlasObjectId) {
AtlasObjectId objId = (AtlasObjectId)obj;
if (ret.getReferredEntities() == null) {
ret.setReferredEntities(new HashMap<String, AtlasEntityHeader>());
}
if (!ret.getReferredEntities().containsKey(objId.getGuid())) {
ret.getReferredEntities().put(objId.getGuid(), entityRetriever.toAtlasEntityHeader(objId.getGuid()));
}
}
}
}
}
}
return ret;
}
public int getMaxResultSetSize() {
return maxResultSetSize;
}
private String getQueryForFullTextSearch(String userKeyedString, String typeName, String classification) {
String typeFilter = getTypeFilter(typeRegistry, typeName, maxTypesCountInIdxQuery);
String classficationFilter = getClassificationFilter(typeRegistry, classification, maxTagsCountInIdxQuery);
......@@ -447,28 +506,6 @@ public class EntityDiscoveryService implements AtlasDiscoveryService {
return String.format("v.\"%s\":(%s)", Constants.ENTITY_TEXT_PROPERTY_KEY, queryText.toString());
}
private static String getClassificationFilter(AtlasTypeRegistry typeRegistry, String classificationName, int maxTypesCountInIdxQuery) {
AtlasClassificationType classification = typeRegistry.getClassificationTypeByName(classificationName);
Set<String> typeAndSubTypes = classification != null ? classification.getTypeAndAllSubTypes() : null;
if(CollectionUtils.isNotEmpty(typeAndSubTypes) && typeAndSubTypes.size() <= maxTypesCountInIdxQuery) {
return String.format("(%s)", StringUtils.join(typeAndSubTypes, " "));
}
return "";
}
private static String getTypeFilter(AtlasTypeRegistry typeRegistry, String typeName, int maxTypesCountInIdxQuery) {
AtlasEntityType type = typeRegistry.getEntityTypeByName(typeName);
Set<String> typeAndSubTypes = type != null ? type.getTypeAndAllSubTypes() : null;
if(CollectionUtils.isNotEmpty(typeAndSubTypes) && typeAndSubTypes.size() <= maxTypesCountInIdxQuery) {
return String.format("(%s)", StringUtils.join(typeAndSubTypes, " "));
}
return "";
}
private List<AtlasFullTextResult> getIndexQueryResults(AtlasIndexQuery query, QueryParams params, boolean excludeDeletedEntities) throws AtlasBaseException {
List<AtlasFullTextResult> ret = new ArrayList<>();
Iterator<Result> iter = query.vertices();
......@@ -570,8 +607,25 @@ public class EntityDiscoveryService implements AtlasDiscoveryService {
return excludeDeletedEntities && GraphHelper.getStatus(vertex) == Status.DELETED;
}
public int getMaxResultSetSize() {
return maxResultSetSize;
private static String getClassificationFilter(AtlasTypeRegistry typeRegistry, String classificationName, int maxTypesCountInIdxQuery) {
AtlasClassificationType classification = typeRegistry.getClassificationTypeByName(classificationName);
Set<String> typeAndSubTypes = classification != null ? classification.getTypeAndAllSubTypes() : null;
if(CollectionUtils.isNotEmpty(typeAndSubTypes) && typeAndSubTypes.size() <= maxTypesCountInIdxQuery) {
return String.format("(%s)", StringUtils.join(typeAndSubTypes, " "));
}
return "";
}
private static String getTypeFilter(AtlasTypeRegistry typeRegistry, String typeName, int maxTypesCountInIdxQuery) {
AtlasEntityType type = typeRegistry.getEntityTypeByName(typeName);
Set<String> typeAndSubTypes = type != null ? type.getTypeAndAllSubTypes() : null;
if(CollectionUtils.isNotEmpty(typeAndSubTypes) && typeAndSubTypes.size() <= maxTypesCountInIdxQuery) {
return String.format("(%s)", StringUtils.join(typeAndSubTypes, " "));
}
return "";
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.discovery;
import org.apache.atlas.model.discovery.SearchParameters.FilterCriteria;
import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.graphdb.*;
import org.apache.atlas.type.AtlasClassificationType;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.utils.AtlasPerfTracer;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
public class EntitySearchProcessor extends SearchProcessor {
private static final Logger LOG = LoggerFactory.getLogger(EntitySearchProcessor.class);
private static final Logger PERF_LOG = AtlasPerfTracer.getPerfLogger("EntitySearchProcessor");
private final AtlasIndexQuery indexQuery;
private final AtlasGraphQuery partialGraphQuery;
private final AtlasGraphQuery allGraphQuery;
public EntitySearchProcessor(SearchContext context) {
super(context);
AtlasEntityType entityType = context.getEntityType();
AtlasClassificationType classificationType = context.getClassificationType();
FilterCriteria filterCriteria = context.getSearchParameters().getEntityFilters();
Set<String> typeAndSubTypes = entityType.getTypeAndAllSubTypes();
Set<String> solrAttributes = new HashSet<>();
Set<String> gremlinAttributes = new HashSet<>();
Set<String> allAttributes = new HashSet<>();
processSearchAttributes(entityType, filterCriteria, solrAttributes, gremlinAttributes, allAttributes);
boolean useSolrSearch = typeAndSubTypes.size() <= MAX_ENTITY_TYPES_IN_INDEX_QUERY && canApplySolrFilter(entityType, filterCriteria, false);
if (useSolrSearch) {
StringBuilder solrQuery = new StringBuilder();
constructTypeTestQuery(solrQuery, typeAndSubTypes);
constructFilterQuery(solrQuery, entityType, filterCriteria, solrAttributes);
String solrQueryString = STRAY_AND_PATTERN.matcher(solrQuery).replaceAll(")");
solrQueryString = STRAY_OR_PATTERN.matcher(solrQueryString).replaceAll(")");
solrQueryString = STRAY_ELIPSIS_PATTERN.matcher(solrQueryString).replaceAll("");
indexQuery = context.getGraph().indexQuery(Constants.VERTEX_INDEX, solrQueryString);
if (CollectionUtils.isNotEmpty(gremlinAttributes) || classificationType != null) {
AtlasGraphQuery query = context.getGraph().query();
addClassificationNameConditionIfNecessary(query);
partialGraphQuery = toGremlinFilterQuery(entityType, filterCriteria, gremlinAttributes, query);
} else {
partialGraphQuery = null;
}
} else {
indexQuery = null;
partialGraphQuery = null;
}
AtlasGraphQuery query = context.getGraph().query().in(Constants.TYPE_NAME_PROPERTY_KEY, typeAndSubTypes);
addClassificationNameConditionIfNecessary(query);
allGraphQuery = toGremlinFilterQuery(entityType, filterCriteria, allAttributes, query);
if (context.getSearchParameters().getExcludeDeletedEntities()) {
allGraphQuery.has(Constants.STATE_PROPERTY_KEY, "ACTIVE");
}
}
@Override
public List<AtlasVertex> execute() {
if (LOG.isDebugEnabled()) {
LOG.debug("==> EntitySearchProcessor.execute({})", context);
}
List<AtlasVertex> ret = new ArrayList<>();
AtlasPerfTracer perf = null;
if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "EntitySearchProcessor.execute(" + context + ")");
}
try {
int qryOffset = (nextProcessor == null) ? context.getSearchParameters().getOffset() : 0;
int limit = context.getSearchParameters().getLimit();
int resultIdx = qryOffset;
while (ret.size() < limit) {
if (context.terminateSearch()) {
LOG.warn("query terminated: {}", context.getSearchParameters());
break;
}
List<AtlasVertex> vertices;
if (indexQuery != null) {
Iterator<AtlasIndexQuery.Result> queryResult = indexQuery.vertices(qryOffset, limit);
if (!queryResult.hasNext()) { // no more results from solr - end of search
break;
}
vertices = getVerticesFromIndexQueryResult(queryResult);
if (partialGraphQuery != null) {
AtlasGraphQuery guidQuery = context.getGraph().query().in(Constants.GUID_PROPERTY_KEY, getGuids(vertices));
guidQuery.addConditionsFrom(partialGraphQuery);
vertices = getVertices(guidQuery.vertices().iterator());
}
} else {
Iterator<AtlasVertex> queryResult = allGraphQuery.vertices(qryOffset, limit).iterator();
if (!queryResult.hasNext()) { // no more results from query - end of search
break;
}
vertices = getVertices(queryResult);
}
qryOffset += limit;
vertices = super.filter(vertices);
for (AtlasVertex vertex : vertices) {
resultIdx++;
if (resultIdx < context.getSearchParameters().getOffset()) {
continue;
}
ret.add(vertex);
if (ret.size() == limit) {
break;
}
}
}
} finally {
AtlasPerfTracer.log(perf);
}
if (LOG.isDebugEnabled()) {
LOG.debug("<== EntitySearchProcessor.execute({}): ret.size()={}", context, ret.size());
}
return ret;
}
@Override
public List<AtlasVertex> filter(List<AtlasVertex> entityVertices) {
if (LOG.isDebugEnabled()) {
LOG.debug("==> EntitySearchProcessor.filter({})", entityVertices.size());
}
AtlasGraphQuery query = context.getGraph().query().in(Constants.GUID_PROPERTY_KEY, getGuids(entityVertices));
query.addConditionsFrom(allGraphQuery);
List<AtlasVertex> ret = getVertices(query.vertices().iterator());
ret = super.filter(ret);
if (LOG.isDebugEnabled()) {
LOG.debug("<== EntitySearchProcessor.filter({}): ret.size()={}", entityVertices.size(), ret.size());
}
return ret;
}
private void addClassificationNameConditionIfNecessary(AtlasGraphQuery query) {
if (context.getClassificationType() != null && !context.needClassificationProcessor()) {
query.in(Constants.TRAIT_NAMES_PROPERTY_KEY, context.getClassificationType().getTypeAndAllSubTypes());
}
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.discovery;
import org.apache.atlas.model.discovery.SearchParameters;
import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.graphdb.AtlasIndexQuery;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.utils.AtlasPerfTracer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
public class FullTextSearchProcessor extends SearchProcessor {
private static final Logger LOG = LoggerFactory.getLogger(FullTextSearchProcessor.class);
private static final Logger PERF_LOG = AtlasPerfTracer.getPerfLogger("FullTextSearchProcessor");
private final AtlasIndexQuery indexQuery;
public FullTextSearchProcessor(SearchContext context) {
super(context);
SearchParameters searchParameters = context.getSearchParameters();
String queryString = String.format("v.\"%s\":(%s)", Constants.ENTITY_TEXT_PROPERTY_KEY, searchParameters.getQuery());
indexQuery = context.getGraph().indexQuery(Constants.FULLTEXT_INDEX, queryString);
}
@Override
public List<AtlasVertex> execute() {
if (LOG.isDebugEnabled()) {
LOG.debug("==> FullTextSearchProcessor.execute({})", context);
}
List<AtlasVertex> ret = new ArrayList<>();
AtlasPerfTracer perf = null;
if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "FullTextSearchProcessor.execute(" + context + ")");
}
try {
int qryOffset = nextProcessor == null ? context.getSearchParameters().getOffset() : 0;
int limit = context.getSearchParameters().getLimit();
int resultIdx = qryOffset;
while (ret.size() < limit) {
if (context.terminateSearch()) {
LOG.warn("query terminated: {}", context.getSearchParameters());
break;
}
Iterator<AtlasIndexQuery.Result> idxQueryResult = indexQuery.vertices(qryOffset, limit);
if (!idxQueryResult.hasNext()) { // no more results from solr - end of search
break;
}
qryOffset += limit;
List<AtlasVertex> vertices = getVerticesFromIndexQueryResult(idxQueryResult);
vertices = super.filter(vertices);
for (AtlasVertex vertex : vertices) {
resultIdx++;
if (resultIdx < context.getSearchParameters().getOffset()) {
continue;
}
ret.add(vertex);
if (ret.size() == limit) {
break;
}
}
}
} finally {
AtlasPerfTracer.log(perf);
}
if (LOG.isDebugEnabled()) {
LOG.debug("<== FullTextSearchProcessor.execute({}): ret.size()={}", context, ret.size());
}
return ret;
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.discovery;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.discovery.SearchParameters;
import org.apache.atlas.model.discovery.SearchParameters.FilterCriteria;
import org.apache.atlas.model.discovery.SearchParameters.FilterCriteria.Condition;
import org.apache.atlas.model.discovery.SearchParameters.Operator;
import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.graphdb.AtlasEdge;
import org.apache.atlas.repository.graphdb.AtlasEdgeDirection;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasGraphQuery;
import org.apache.atlas.repository.graphdb.AtlasIndexQuery;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.store.graph.v1.AtlasGraphUtilsV1;
import org.apache.atlas.type.AtlasClassificationType;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasStructType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.utils.AtlasPerfTracer;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.inject.Inject;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import static org.apache.atlas.discovery.SearchPipeline.IndexResultType;
import static org.apache.atlas.discovery.SearchPipeline.PipelineContext;
import static org.apache.atlas.discovery.SearchPipeline.PipelineStep;
import static org.apache.atlas.repository.graphdb.AtlasGraphQuery.ComparisionOperator;
import static org.apache.atlas.repository.graphdb.AtlasGraphQuery.MatchingOperator;
@Component
public class GremlinStep implements PipelineStep {
private static final Logger LOG = LoggerFactory.getLogger(GremlinStep.class);
private static final Logger PERF_LOG = AtlasPerfTracer.getPerfLogger("GremlinSearchStep");
private final AtlasGraph graph;
private final AtlasTypeRegistry typeRegistry;
enum GremlinFilterQueryType { TAG, ENTITY }
@Inject
public GremlinStep(AtlasGraph graph, AtlasTypeRegistry typeRegistry) {
this.graph = graph;
this.typeRegistry = typeRegistry;
}
@Override
public void execute(PipelineContext context) throws AtlasBaseException {
if (LOG.isDebugEnabled()) {
LOG.debug("==> GremlinStep.execute({})", context);
}
if (context == null) {
throw new AtlasBaseException("Can't start search without any context");
}
AtlasPerfTracer perf = null;
if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "GremlinSearchStep.execute(" + context + ")");
}
final Iterator<AtlasVertex> result;
if (context.hasIndexResults()) {
// We have some results from the indexed step, let's proceed accordingly
if (context.getIndexResultType() == IndexResultType.TAG) {
// Index search was done on tag and filters
if (context.isTagProcessingComplete()) {
LOG.debug("GremlinStep.execute(): index has completely processed tag, further TAG filtering not needed");
Set<String> taggedVertexGUIDs = new HashSet<>();
Iterator<AtlasIndexQuery.Result> tagVertexIterator = context.getIndexResultsIterator();
while (tagVertexIterator.hasNext()) {
// Find out which Vertex has this outgoing edge
AtlasVertex vertex = tagVertexIterator.next().getVertex();
Iterable<AtlasEdge> edges = vertex.getEdges(AtlasEdgeDirection.IN);
for (AtlasEdge edge : edges) {
String guid = AtlasGraphUtilsV1.getIdFromVertex(edge.getOutVertex());
taggedVertexGUIDs.add(guid);
}
}
// No entities are tagged (actually this check is already done)
if (!taggedVertexGUIDs.isEmpty()) {
result = processEntity(taggedVertexGUIDs, context);
} else {
result = null;
}
} else {
result = processTagAndEntity(Collections.<String>emptySet(), context);
}
} else if (context.getIndexResultType() == IndexResultType.TEXT) {
// Index step processed full-text;
Set<String> entityIDs = getVertexIDs(context.getIndexResultsIterator());
result = processTagAndEntity(entityIDs, context);
} else if (context.getIndexResultType() == IndexResultType.ENTITY) {
// Index step processed entity and it's filters; tag filter wouldn't be set
Set<String> entityIDs = getVertexIDs(context.getIndexResultsIterator());
result = processEntity(entityIDs, context);
} else {
result = null;
}
} else {
// No index results, need full processing in Gremlin
if (context.getClassificationType() != null) {
// Process tag and filters first, then entity filters
result = processTagAndEntity(Collections.<String>emptySet(), context);
} else {
result = processEntity(Collections.<String>emptySet(), context);
}
}
context.setGremlinResultIterator(result);
AtlasPerfTracer.log(perf);
if (LOG.isDebugEnabled()) {
LOG.debug("<== GremlinStep.execute({})", context);
}
}
private Iterator<AtlasVertex> processEntity(Set<String> entityGUIDs, PipelineContext context) {
if (LOG.isDebugEnabled()) {
LOG.debug("==> GremlinStep.processEntity(entityGUIDs={})", entityGUIDs);
}
final Iterator<AtlasVertex> ret;
SearchParameters searchParameters = context.getSearchParameters();
AtlasEntityType entityType = context.getEntityType();
if (entityType != null) {
AtlasGraphQuery entityFilterQuery = context.getGraphQuery("ENTITY_FILTER");
if (entityFilterQuery == null) {
entityFilterQuery = graph.query().in(Constants.TYPE_NAME_PROPERTY_KEY, entityType.getTypeAndAllSubTypes());
if (searchParameters.getEntityFilters() != null) {
toGremlinFilterQuery(GremlinFilterQueryType.ENTITY, entityType, searchParameters.getEntityFilters(), entityFilterQuery, context);
}
if (searchParameters.getExcludeDeletedEntities()) {
entityFilterQuery.has(Constants.STATE_PROPERTY_KEY, "ACTIVE");
}
context.cacheGraphQuery("ENTITY_FILTER", entityFilterQuery);
}
// Now get all vertices
if (CollectionUtils.isEmpty(entityGUIDs)) {
ret = entityFilterQuery.vertices(context.getCurrentOffset(), context.getMaxLimit()).iterator();
} else {
AtlasGraphQuery guidQuery = graph.query().in(Constants.GUID_PROPERTY_KEY, entityGUIDs);
if (entityFilterQuery != null) {
guidQuery.addConditionsFrom(entityFilterQuery);
} else if (searchParameters.getExcludeDeletedEntities()) {
guidQuery.has(Constants.STATE_PROPERTY_KEY, "ACTIVE");
}
ret = guidQuery.vertices(context.getMaxLimit()).iterator();
}
} else if (CollectionUtils.isNotEmpty(entityGUIDs)) {
AtlasGraphQuery guidQuery = graph.query().in(Constants.GUID_PROPERTY_KEY, entityGUIDs);
if (searchParameters.getExcludeDeletedEntities()) {
guidQuery.has(Constants.STATE_PROPERTY_KEY, "ACTIVE");
}
Iterable<AtlasVertex> vertices = guidQuery.vertices(context.getMaxLimit());
ret = vertices.iterator();
} else {
ret = null;
}
if (LOG.isDebugEnabled()) {
LOG.debug("<== GremlinStep.processEntity(entityGUIDs={})", entityGUIDs);
}
return ret;
}
private Iterator<AtlasVertex> processTagAndEntity(Set<String> entityGUIDs, PipelineContext context) {
if (LOG.isDebugEnabled()) {
LOG.debug("==> GremlinStep.processTagAndEntity(entityGUIDs={})", entityGUIDs);
}
final Iterator<AtlasVertex> ret;
AtlasClassificationType classificationType = context.getClassificationType();
if (classificationType != null) {
AtlasGraphQuery tagVertexQuery = context.getGraphQuery("TAG_VERTEX");
if (tagVertexQuery == null) {
tagVertexQuery = graph.query().in(Constants.TYPE_NAME_PROPERTY_KEY, classificationType.getTypeAndAllSubTypes());
SearchParameters searchParameters = context.getSearchParameters();
// Do tag filtering first as it'll return a smaller subset of vertices
if (searchParameters.getTagFilters() != null) {
toGremlinFilterQuery(GremlinFilterQueryType.TAG, classificationType, searchParameters.getTagFilters(), tagVertexQuery, context);
}
context.cacheGraphQuery("TAG_VERTEX", tagVertexQuery);
}
if (tagVertexQuery != null) {
Set<String> taggedVertexGuids = new HashSet<>();
// Now get all vertices after adjusting offset for each iteration
LOG.debug("Firing TAG query");
Iterator<AtlasVertex> tagVertexIterator = tagVertexQuery.vertices(context.getCurrentOffset(), context.getMaxLimit()).iterator();
while (tagVertexIterator.hasNext()) {
// Find out which Vertex has this outgoing edge
Iterable<AtlasEdge> edges = tagVertexIterator.next().getEdges(AtlasEdgeDirection.IN);
for (AtlasEdge edge : edges) {
String guid = AtlasGraphUtilsV1.getIdFromVertex(edge.getOutVertex());
taggedVertexGuids.add(guid);
}
}
entityGUIDs = taggedVertexGuids;
}
}
if (!entityGUIDs.isEmpty()) {
ret = processEntity(entityGUIDs, context);
} else {
ret = null;
}
if (LOG.isDebugEnabled()) {
LOG.debug("<== GremlinStep.processTagAndEntity(entityGUIDs={})", entityGUIDs);
}
return ret;
}
private Set<String> getVertexIDs(Iterator<AtlasIndexQuery.Result> idxResultsIterator) {
Set<String> guids = new HashSet<>();
while (idxResultsIterator.hasNext()) {
AtlasVertex vertex = idxResultsIterator.next().getVertex();
String guid = AtlasGraphUtilsV1.getIdFromVertex(vertex);
guids.add(guid);
}
return guids;
}
private Set<String> getVertexIDs(Iterable<AtlasVertex> vertices) {
Set<String> guids = new HashSet<>();
for (AtlasVertex vertex : vertices) {
String guid = AtlasGraphUtilsV1.getIdFromVertex(vertex);
guids.add(guid);
}
return guids;
}
private AtlasGraphQuery toGremlinFilterQuery(GremlinFilterQueryType queryType, AtlasStructType type, FilterCriteria criteria,
AtlasGraphQuery query, PipelineContext context) {
if (criteria.getCondition() != null) {
if (criteria.getCondition() == Condition.AND) {
for (FilterCriteria filterCriteria : criteria.getCriterion()) {
AtlasGraphQuery nestedQuery = toGremlinFilterQuery(queryType, type, filterCriteria, graph.query(), context);
query.addConditionsFrom(nestedQuery);
}
} else {
List<AtlasGraphQuery> orConditions = new LinkedList<>();
for (FilterCriteria filterCriteria : criteria.getCriterion()) {
AtlasGraphQuery nestedQuery = toGremlinFilterQuery(queryType, type, filterCriteria, graph.query(), context);
// FIXME: Something might not be right here as the queries are getting overwritten sometimes
orConditions.add(graph.query().createChildQuery().addConditionsFrom(nestedQuery));
}
if (!orConditions.isEmpty()) {
query.or(orConditions);
}
}
} else {
String attrName = criteria.getAttributeName();
String attrValue = criteria.getAttributeValue();
Operator operator = criteria.getOperator();
try {
// If attribute belongs to supertype then adjust the name accordingly
final String qualifiedAttributeName;
final boolean attrProcessed;
if (queryType == GremlinFilterQueryType.TAG) {
qualifiedAttributeName = type.getQualifiedAttributeName(attrName);
attrProcessed = context.hasProcessedTagAttribute(qualifiedAttributeName);
} else {
qualifiedAttributeName = type.getQualifiedAttributeName(attrName);
attrProcessed = context.hasProcessedEntityAttribute(qualifiedAttributeName);
}
// Check if the qualifiedAttribute has been processed
if (!attrProcessed) {
switch (operator) {
case LT:
query.has(qualifiedAttributeName, ComparisionOperator.LESS_THAN, attrValue);
break;
case LTE:
query.has(qualifiedAttributeName, ComparisionOperator.LESS_THAN_EQUAL, attrValue);
break;
case GT:
query.has(qualifiedAttributeName, ComparisionOperator.GREATER_THAN, attrValue);
break;
case GTE:
query.has(qualifiedAttributeName, ComparisionOperator.GREATER_THAN_EQUAL, attrValue);
break;
case EQ:
query.has(qualifiedAttributeName, ComparisionOperator.EQUAL, attrValue);
break;
case NEQ:
query.has(qualifiedAttributeName, ComparisionOperator.NOT_EQUAL, attrValue);
break;
case LIKE:
// TODO: Maybe we need to validate pattern
query.has(qualifiedAttributeName, MatchingOperator.REGEX, getLikeRegex(attrValue));
break;
case CONTAINS:
query.has(qualifiedAttributeName, MatchingOperator.REGEX, getContainsRegex(attrValue));
break;
case STARTS_WITH:
query.has(qualifiedAttributeName, MatchingOperator.PREFIX, attrValue);
break;
case ENDS_WITH:
query.has(qualifiedAttributeName, MatchingOperator.REGEX, getSuffixRegex(attrValue));
break;
case IN:
LOG.warn("{}: unsupported operator. Ignored", operator);
break;
}
}
} catch (AtlasBaseException e) {
LOG.error("toGremlinFilterQuery(): failed for attrName=" + attrName + "; operator=" + operator + "; attrValue=" + attrValue, e);
}
}
return query;
}
private String getContainsRegex(String attributeValue) {
return ".*" + attributeValue + ".*";
}
private String getSuffixRegex(String attributeValue) {
return ".*" + attributeValue;
}
private String getLikeRegex(String attributeValue) { return ".*" + attributeValue + ".*"; }
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.discovery;
import org.apache.atlas.model.discovery.SearchParameters;
import org.apache.atlas.model.discovery.SearchParameters.FilterCriteria;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.type.AtlasClassificationType;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import java.util.Set;
public class SearchContext {
private final SearchParameters searchParameters;
private final AtlasTypeRegistry typeRegistry;
private final AtlasGraph graph;
private final Set<String> indexedKeys;
private final AtlasEntityType entityType;
private final AtlasClassificationType classificationType;
private SearchProcessor searchProcessor;
private boolean terminateSearch = false;
public SearchContext(SearchParameters searchParameters, AtlasTypeRegistry typeRegistry, AtlasGraph graph, Set<String> indexedKeys) {
this.searchParameters = searchParameters;
this.typeRegistry = typeRegistry;
this.graph = graph;
this.indexedKeys = indexedKeys;
this.entityType = typeRegistry.getEntityTypeByName(searchParameters.getTypeName());
this.classificationType = typeRegistry.getClassificationTypeByName(searchParameters.getClassification());
if (needFullTextrocessor()) {
addProcessor(new FullTextSearchProcessor(this));
}
if (needClassificationProcessor()) {
addProcessor(new ClassificationSearchProcessor(this));
}
if (needEntityProcessor()) {
addProcessor(new EntitySearchProcessor(this));
}
}
public SearchParameters getSearchParameters() { return searchParameters; }
public AtlasTypeRegistry getTypeRegistry() { return typeRegistry; }
public AtlasGraph getGraph() { return graph; }
public Set<String> getIndexedKeys() { return indexedKeys; }
public AtlasEntityType getEntityType() { return entityType; }
public AtlasClassificationType getClassificationType() { return classificationType; }
public SearchProcessor getSearchProcessor() { return searchProcessor; }
public boolean terminateSearch() { return this.terminateSearch; }
public void terminateSearch(boolean terminateSearch) { this.terminateSearch = terminateSearch; }
public StringBuilder toString(StringBuilder sb) {
if (sb == null) {
sb = new StringBuilder();
}
sb.append("searchParameters=");
if (searchParameters != null) {
searchParameters.toString(sb);
}
return sb;
}
@Override
public String toString() {
return toString(new StringBuilder()).toString();
}
public boolean needFullTextrocessor() {
return StringUtils.isNotEmpty(searchParameters.getQuery());
}
public boolean needClassificationProcessor() {
return classificationType != null && (hasAttributeFilter(searchParameters.getTagFilters()) || entityType == null);
}
public boolean needEntityProcessor() {
return entityType != null;
}
private boolean hasAttributeFilter(FilterCriteria filterCriteria) {
return filterCriteria != null &&
(CollectionUtils.isNotEmpty(filterCriteria.getCriterion()) || StringUtils.isNotEmpty(filterCriteria.getAttributeName()));
}
private void addProcessor(SearchProcessor processor) {
if (this.searchProcessor == null) {
this.searchProcessor = processor;
} else {
this.searchProcessor.addProcessor(processor);
}
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.discovery;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.discovery.SearchParameters;
import org.apache.atlas.model.discovery.SearchParameters.FilterCriteria;
import org.apache.atlas.model.discovery.SearchParameters.FilterCriteria.Condition;
import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.graph.GraphBackedSearchIndexer;
import org.apache.atlas.repository.graphdb.*;
import org.apache.atlas.type.AtlasClassificationType;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasStructType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.util.SearchTracker;
import org.apache.atlas.utils.AtlasPerfTracer;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.builder.ToStringBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.inject.Inject;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@Component
public class SearchPipeline {
private static final Logger LOG = LoggerFactory.getLogger(SearchPipeline.class);
private static final Logger PERF_LOG = AtlasPerfTracer.getPerfLogger("SearchPipeline");
enum ExecutionMode { SOLR, GREMLIN, MIXED }
enum IndexResultType { TAG, ENTITY, TEXT }
private final SolrStep solrStep;
private final GremlinStep gremlinStep;
private final SearchTracker searchTracker;
private final AtlasTypeRegistry typeRegistry;
private final Configuration atlasConfiguration;
private final GraphBackedSearchIndexer indexer;
@Inject
public SearchPipeline(SolrStep solrStep, GremlinStep gremlinStep, SearchTracker searchTracker, AtlasTypeRegistry typeRegistry, Configuration atlasConfiguration, GraphBackedSearchIndexer indexer) {
this.solrStep = solrStep;
this.gremlinStep = gremlinStep;
this.searchTracker = searchTracker;
this.typeRegistry = typeRegistry;
this.atlasConfiguration = atlasConfiguration;
this.indexer = indexer;
}
public List<AtlasVertex> run(SearchParameters searchParameters) throws AtlasBaseException {
final List<AtlasVertex> ret;
AtlasPerfTracer perf = null;
if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "SearchPipeline.run("+ searchParameters +")");
}
AtlasEntityType entityType = typeRegistry.getEntityTypeByName(searchParameters.getTypeName());
AtlasClassificationType classiType = typeRegistry.getClassificationTypeByName(searchParameters.getClassification());
PipelineContext context = new PipelineContext(searchParameters, entityType, classiType, indexer.getVertexIndexKeys());
String searchId = searchTracker.add(context); // For future cancellation
try {
ExecutionMode mode = determineExecutionMode(context);
if (LOG.isDebugEnabled()) {
LOG.debug("Execution mode {}", mode);
}
switch (mode) {
case SOLR:
ret = runOnlySolr(context);
break;
case GREMLIN:
ret = runOnlyGremlin(context);
break;
case MIXED:
ret = runMixed(context);
break;
default:
ret = Collections.emptyList();
}
} finally {
searchTracker.remove(searchId);
AtlasPerfTracer.log(perf);
}
return ret;
}
private List<AtlasVertex> runOnlySolr(PipelineContext context) throws AtlasBaseException {
// Only when there's no tag and query
List<AtlasVertex> results = new ArrayList<>();
while (results.size() < context.getSearchParameters().getLimit()) {
if (context.getForceTerminate()) {
LOG.debug("search has been terminated");
break;
}
// Execute solr search only
solrStep.execute(context);
List<AtlasVertex> stepResults = getIndexResults(context);
context.incrementSearchRound();
addToResult(results, stepResults, context.getSearchParameters().getLimit());
if (LOG.isDebugEnabled()) {
LOG.debug("Pipeline iteration {}: stepResults={}; totalResult={}", context.getIterationCount(), stepResults.size(), results.size());
}
if (CollectionUtils.isEmpty(stepResults)) {
// If no result is found any subsequent iteration then just stop querying the index
break;
}
}
if (context.getIndexResultType() == IndexResultType.TAG) {
List<AtlasVertex> entityVertices = new ArrayList<>(results.size());
for (AtlasVertex tagVertex : results) {
Iterable<AtlasEdge> edges = tagVertex.getEdges(AtlasEdgeDirection.IN);
for (AtlasEdge edge : edges) {
AtlasVertex entityVertex = edge.getOutVertex();
entityVertices.add(entityVertex);
}
}
results = entityVertices;
}
return results;
}
private List<AtlasVertex> runOnlyGremlin(PipelineContext context) throws AtlasBaseException {
List<AtlasVertex> results = new ArrayList<>();
while (results.size() < context.getSearchParameters().getLimit()) {
if (context.getForceTerminate()) {
LOG.debug("search has been terminated");
break;
}
gremlinStep.execute(context);
List<AtlasVertex> stepResults = getGremlinResults(context);
context.incrementSearchRound();
addToResult(results, stepResults, context.getSearchParameters().getLimit());
if (LOG.isDebugEnabled()) {
LOG.debug("Pipeline iteration {}: stepResults={}; totalResult={}", context.getIterationCount(), stepResults.size(), results.size());
}
if (CollectionUtils.isEmpty(stepResults)) {
// If no result is found any subsequent iteration then just stop querying the index
break;
}
}
return results;
}
/*
1. Index processes few attributes and then gremlin processes rest
1.1 Iterate for gremlin till the index results are non null
2. Index processes all attributes, gremlin has nothing to do
Sometimes the result set might be less than the max limit and we need to iterate until the result set is full
or the iteration doesn't return any results
*/
private List<AtlasVertex> runMixed(PipelineContext context) throws AtlasBaseException {
List<AtlasVertex> results = new ArrayList<>();
while (results.size() < context.getSearchParameters().getLimit()) {
if (context.getForceTerminate()) {
LOG.debug("search has been terminated");
break;
}
// Execute Solr search and then pass it to the Gremlin step (if needed)
solrStep.execute(context);
if (!context.hasIndexResults()) {
if (LOG.isDebugEnabled()) {
LOG.debug("No index results in iteration {}", context.getIterationCount());
}
// If no result is found any subsequent iteration then just stop querying the index
break;
}
// Attributes partially processed by Solr, use gremlin to process remaining attribute(s)
gremlinStep.execute(context);
context.incrementSearchRound();
List<AtlasVertex> stepResults = getGremlinResults(context);
addToResult(results, stepResults, context.getSearchParameters().getLimit());
if (LOG.isDebugEnabled()) {
LOG.debug("Pipeline iteration {}: stepResults={}; totalResult={}", context.getIterationCount(), stepResults.size(), results.size());
}
}
return results;
}
private void addToResult(List<AtlasVertex> result, List<AtlasVertex> stepResult, int maxLimit) {
if (result != null && stepResult != null && result.size() < maxLimit) {
for (AtlasVertex vertex : stepResult) {
result.add(vertex);
if (result.size() >= maxLimit) {
break;
}
}
}
}
private List<AtlasVertex> getIndexResults(PipelineContext pipelineContext) {
List<AtlasVertex> ret = new ArrayList<>();
if (pipelineContext.hasIndexResults()) {
Iterator<AtlasIndexQuery.Result> iter = pipelineContext.getIndexResultsIterator();
while(iter.hasNext()) {
ret.add(iter.next().getVertex());
}
}
return ret;
}
private List<AtlasVertex> getGremlinResults(PipelineContext pipelineContext) {
List<AtlasVertex> ret = new ArrayList<>();
if (pipelineContext.hasGremlinResults()) {
Iterator<AtlasVertex> iter = pipelineContext.getGremlinResultIterator();
while (iter.hasNext()) {
ret.add(iter.next());
}
}
return ret;
}
private ExecutionMode determineExecutionMode(PipelineContext context) {
SearchParameters searchParameters = context.getSearchParameters();
AtlasClassificationType classificationType = context.getClassificationType();
AtlasEntityType entityType = context.getEntityType();
int solrCount = 0;
int gremlinCount = 0;
if (StringUtils.isNotEmpty(searchParameters.getQuery())) {
solrCount++;
// __state index only exists in vertex_index
if (searchParameters.getExcludeDeletedEntities()) {
gremlinCount++;
}
}
if (classificationType != null) {
Set<String> typeAndAllSubTypes = classificationType.getTypeAndAllSubTypes();
if (typeAndAllSubTypes.size() > atlasConfiguration.getInt(Constants.INDEX_SEARCH_MAX_TAGS_COUNT, 10)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Classification type {} has too many subTypes ({}) to use in Solr search. Gremlin will be used to execute the search",
classificationType.getTypeName(), typeAndAllSubTypes.size());
}
gremlinCount++;
} else {
if (hasNonIndexedAttrViolation(classificationType, context.getIndexedKeys(), searchParameters.getTagFilters())) {
if (LOG.isDebugEnabled()) {
LOG.debug("Tag filters not suitable for Solr search. Gremlin will be used to execute the search");
}
gremlinCount++;
} else {
solrCount++;
// __state index only exist in vertex_index
if (searchParameters.getExcludeDeletedEntities()) {
gremlinCount++;
}
}
}
}
if (entityType != null) {
Set<String> typeAndAllSubTypes = entityType.getTypeAndAllSubTypes();
if (typeAndAllSubTypes.size() > atlasConfiguration.getInt(Constants.INDEX_SEARCH_MAX_TYPES_COUNT, 10)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Entity type {} has too many subTypes ({}) to use in Solr search. Gremlin will be used to execute the search",
entityType.getTypeName(), typeAndAllSubTypes.size());
}
gremlinCount++;
} else {
if (hasNonIndexedAttrViolation(entityType, context.getIndexedKeys(), searchParameters.getEntityFilters())) {
if (LOG.isDebugEnabled()) {
LOG.debug("Entity filters not suitable for Solr search. Gremlin will be used to execute the search");
}
gremlinCount++;
} else {
solrCount++;
}
}
}
ExecutionMode mode = ExecutionMode.MIXED;
if (solrCount == 1 && gremlinCount == 0) {
mode = ExecutionMode.SOLR;
} else if (gremlinCount == 1 && solrCount == 0) {
mode = ExecutionMode.GREMLIN;
}
return mode;
}
// If Index can't process all attributes and any of the non-indexed attribute is present in OR nested within AND
// then the only way is Gremlin
// A violation (here) is defined as presence of non-indexed attribute within any OR clause nested under an AND clause
// the reason being that the index would not be able to process the nested OR attribute which might result in
// exclusion of valid result (vertex)
private boolean hasNonIndexedAttrViolation(AtlasStructType structType, Set<String> indexKeys, FilterCriteria filterCriteria) {
return hasNonIndexedAttrViolation(structType, indexKeys, filterCriteria, false);
}
private boolean hasNonIndexedAttrViolation(AtlasStructType structType, Set<String> indexKeys, FilterCriteria filterCriteria, boolean enclosedInOrCondition) {
if (filterCriteria == null) {
return false;
}
boolean ret = false;
Condition filterCondition = filterCriteria.getCondition();
List<FilterCriteria> criterion = filterCriteria.getCriterion();
if (filterCondition != null && CollectionUtils.isNotEmpty(criterion)) {
if (!enclosedInOrCondition) {
enclosedInOrCondition = filterCondition == Condition.OR;
}
// If we have nested criterion let's find any nested ORs with non-indexed attr
for (FilterCriteria criteria : criterion) {
ret |= hasNonIndexedAttrViolation(structType, indexKeys, criteria, enclosedInOrCondition);
if (ret) {
break;
}
}
} else if (StringUtils.isNotEmpty(filterCriteria.getAttributeName())) {
// If attribute qualified name doesn't exist in the vertex index we potentially might have a problem
try {
String qualifiedAttributeName = structType.getQualifiedAttributeName(filterCriteria.getAttributeName());
ret = CollectionUtils.isEmpty(indexKeys) || !indexKeys.contains(qualifiedAttributeName);
if (ret) {
LOG.warn("search includes non-indexed attribute '{}'; might cause poor performance", qualifiedAttributeName);
}
} catch (AtlasBaseException e) {
LOG.warn(e.getMessage());
ret = true;
}
}
// return ret && enclosedInOrCondition;
return ret;
}
public interface PipelineStep {
void execute(PipelineContext context) throws AtlasBaseException;
}
public static class PipelineContext {
// TODO: See if anything can be cached in the context
private final SearchParameters searchParameters;
private final AtlasEntityType entityType;
private final AtlasClassificationType classificationType;
private final Set<String> indexedKeys;
private int iterationCount;
private boolean forceTerminate;
private int currentOffset;
private int maxLimit;
// Continuous processing stuff
private Set<String> tagSearchAttributes = new HashSet<>();
private Set<String> entitySearchAttributes = new HashSet<>();
private Set<String> tagAttrProcessedBySolr = new HashSet<>();
private Set<String> entityAttrProcessedBySolr = new HashSet<>();
// Results related stuff
private IndexResultType indexResultType;
private Iterator<AtlasIndexQuery.Result> indexResultsIterator;
private Iterator<AtlasVertex> gremlinResultIterator;
private Map<String, AtlasIndexQuery> cachedIndexQueries = new HashMap<>();
private Map<String, AtlasGraphQuery> cachedGraphQueries = new HashMap<>();
public PipelineContext(SearchParameters searchParameters, AtlasEntityType entityType, AtlasClassificationType classificationType, Set<String> indexedKeys) {
this.searchParameters = searchParameters;
this.entityType = entityType;
this.classificationType = classificationType;
this.indexedKeys = indexedKeys;
currentOffset = searchParameters.getOffset();
maxLimit = searchParameters.getLimit();
}
public SearchParameters getSearchParameters() {
return searchParameters;
}
public AtlasEntityType getEntityType() {
return entityType;
}
public AtlasClassificationType getClassificationType() {
return classificationType;
}
public Set<String> getIndexedKeys() { return indexedKeys; }
public int getIterationCount() {
return iterationCount;
}
public boolean getForceTerminate() {
return forceTerminate;
}
public void setForceTerminate(boolean forceTerminate) {
this.forceTerminate = forceTerminate;
}
public boolean hasProcessedTagAttribute(String attributeName) {
return tagAttrProcessedBySolr.contains(attributeName);
}
public boolean hasProcessedEntityAttribute(String attributeName) {
return entityAttrProcessedBySolr.contains(attributeName);
}
public Iterator<AtlasIndexQuery.Result> getIndexResultsIterator() {
return indexResultsIterator;
}
public void setIndexResultsIterator(Iterator<AtlasIndexQuery.Result> indexResultsIterator) {
this.indexResultsIterator = indexResultsIterator;
}
public Iterator<AtlasVertex> getGremlinResultIterator() {
return gremlinResultIterator;
}
public void setGremlinResultIterator(Iterator<AtlasVertex> gremlinResultIterator) {
this.gremlinResultIterator = gremlinResultIterator;
}
public boolean hasIndexResults() {
return null != indexResultsIterator && indexResultsIterator.hasNext();
}
public boolean hasGremlinResults() {
return null != gremlinResultIterator && gremlinResultIterator.hasNext();
}
public boolean isTagProcessingComplete() {
return CollectionUtils.isEmpty(tagSearchAttributes) ||
CollectionUtils.isEqualCollection(tagSearchAttributes, tagAttrProcessedBySolr);
}
public boolean isEntityProcessingComplete() {
return CollectionUtils.isEmpty(entitySearchAttributes) ||
CollectionUtils.isEqualCollection(entitySearchAttributes, entityAttrProcessedBySolr);
}
public boolean isProcessingComplete() {
return isTagProcessingComplete() && isEntityProcessingComplete();
}
public void incrementOffset(int increment) {
currentOffset += increment;
}
public void incrementSearchRound() {
iterationCount ++;
incrementOffset(searchParameters.getLimit());
}
public int getCurrentOffset() {
return currentOffset;
}
public boolean addTagSearchAttribute(String attribute) {
return tagSearchAttributes.add(attribute);
}
public boolean addProcessedTagAttribute(String attribute) {
return tagAttrProcessedBySolr.add(attribute);
}
public boolean addEntitySearchAttribute(String attribute) {
return tagSearchAttributes.add(attribute);
}
public boolean addProcessedEntityAttribute(String attribute) {
return entityAttrProcessedBySolr.add(attribute);
}
public void cacheGraphQuery(String name, AtlasGraphQuery graphQuery) {
cachedGraphQueries.put(name, graphQuery);
}
public void cacheIndexQuery(String name, AtlasIndexQuery indexQuery) {
cachedIndexQueries.put(name, indexQuery);
}
public AtlasIndexQuery getIndexQuery(String name){
return cachedIndexQueries.get(name);
}
public AtlasGraphQuery getGraphQuery(String name) {
return cachedGraphQueries.get(name);
}
public IndexResultType getIndexResultType() {
return indexResultType;
}
public void setIndexResultType(IndexResultType indexResultType) {
this.indexResultType = indexResultType;
}
public int getMaxLimit() {
return maxLimit;
}
@Override
public String toString() {
return new ToStringBuilder(this)
.append("iterationCount", iterationCount)
.append("forceTerminate", forceTerminate)
.append("currentOffset", currentOffset)
.append("maxLimit", maxLimit)
.append("searchParameters", searchParameters)
.append("tagSearchAttributes", tagSearchAttributes)
.append("entitySearchAttributes", entitySearchAttributes)
.append("tagAttrProcessedBySolr", tagAttrProcessedBySolr)
.append("entityAttrProcessedBySolr", entityAttrProcessedBySolr)
.append("indexResultType", indexResultType)
.append("cachedIndexQueries", cachedIndexQueries)
.append("cachedGraphQueries", cachedGraphQueries)
.toString();
}
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.discovery;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasException;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.discovery.SearchParameters;
import org.apache.atlas.model.discovery.SearchParameters.FilterCriteria;
import org.apache.atlas.model.discovery.SearchParameters.FilterCriteria.Condition;
import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.graphdb.*;
import org.apache.atlas.repository.store.graph.v1.AtlasGraphUtilsV1;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasStructType;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
import java.util.regex.Pattern;
public abstract class SearchProcessor {
private static final Logger LOG = LoggerFactory.getLogger(SearchProcessor.class);
public static final Pattern STRAY_AND_PATTERN = Pattern.compile("(AND\\s+)+\\)");
public static final Pattern STRAY_OR_PATTERN = Pattern.compile("(OR\\s+)+\\)");
public static final Pattern STRAY_ELIPSIS_PATTERN = Pattern.compile("(\\(\\s*)\\)");
public static final int MAX_RESULT_SIZE = getApplicationProperty(Constants.INDEX_SEARCH_MAX_RESULT_SET_SIZE, 150);
public static final int MAX_ENTITY_TYPES_IN_INDEX_QUERY = getApplicationProperty(Constants.INDEX_SEARCH_MAX_TYPES_COUNT, 10);
public static final int MAX_CLASSIFICATION_TYPES_IN_INDEX_QUERY = getApplicationProperty(Constants.INDEX_SEARCH_MAX_TAGS_COUNT, 10);
public static final String AND_STR = " AND ";
public static final String EMPTY_STRING = "";
public static final String SPACE_STRING = " ";
public static final String BRACE_OPEN_STR = "( ";
public static final String BRACE_CLOSE_STR = " )";
private static final Map<SearchParameters.Operator, String> OPERATOR_MAP = new HashMap<>();
static
{
OPERATOR_MAP.put(SearchParameters.Operator.LT,"v.\"%s\": [* TO %s}");
OPERATOR_MAP.put(SearchParameters.Operator.GT,"v.\"%s\": {%s TO *]");
OPERATOR_MAP.put(SearchParameters.Operator.LTE,"v.\"%s\": [* TO %s]");
OPERATOR_MAP.put(SearchParameters.Operator.GTE,"v.\"%s\": [%s TO *]");
OPERATOR_MAP.put(SearchParameters.Operator.EQ,"v.\"%s\": %s");
OPERATOR_MAP.put(SearchParameters.Operator.NEQ,"v.\"%s\": (NOT %s)");
OPERATOR_MAP.put(SearchParameters.Operator.IN, "v.\"%s\": (%s)");
OPERATOR_MAP.put(SearchParameters.Operator.LIKE, "v.\"%s\": (%s)");
OPERATOR_MAP.put(SearchParameters.Operator.STARTS_WITH, "v.\"%s\": (%s*)");
OPERATOR_MAP.put(SearchParameters.Operator.ENDS_WITH, "v.\"%s\": (*%s)");
OPERATOR_MAP.put(SearchParameters.Operator.CONTAINS, "v.\"%s\": (*%s*)");
}
protected final SearchContext context;
protected SearchProcessor nextProcessor;
protected SearchProcessor(SearchContext context) {
this.context = context;
}
public void addProcessor(SearchProcessor processor) {
if (nextProcessor == null) {
nextProcessor = processor;
} else {
nextProcessor.addProcessor(processor);
}
}
public abstract List<AtlasVertex> execute();
public List<AtlasVertex> filter(List<AtlasVertex> entityVertices) {
return nextProcessor == null ? entityVertices : nextProcessor.filter(entityVertices);
}
protected void processSearchAttributes(AtlasStructType structType, FilterCriteria filterCriteria, Set<String> solrFiltered, Set<String> gremlinFiltered, Set<String> allAttributes) {
if (structType == null || filterCriteria == null) {
return;
}
Condition filterCondition = filterCriteria.getCondition();
List<FilterCriteria> criterion = filterCriteria.getCriterion();
if (filterCondition != null && CollectionUtils.isNotEmpty(criterion)) {
for (SearchParameters.FilterCriteria criteria : criterion) {
processSearchAttributes(structType, criteria, solrFiltered, gremlinFiltered, allAttributes);
}
} else if (StringUtils.isNotEmpty(filterCriteria.getAttributeName())) {
try {
String attributeName = filterCriteria.getAttributeName();
String qualifiedName = structType.getQualifiedAttributeName(attributeName);
Set<String> indexedKeys = context.getIndexedKeys();
if (indexedKeys != null && indexedKeys.contains(qualifiedName)) {
solrFiltered.add(attributeName);
} else {
LOG.warn("search includes non-indexed attribute '{}'; might cause poor performance", qualifiedName);
gremlinFiltered.add(attributeName);
}
allAttributes.add(attributeName);
} catch (AtlasBaseException e) {
LOG.warn(e.getMessage());
}
}
}
//
// If filterCriteria contains any non-indexed attribute inside OR condition:
// Solr+Grelin can't be used. Need to use only Gremlin filter for all attributes. Examples:
// (OR idx-att1=x non-idx-attr=z)
// (AND idx-att1=x (OR idx-attr2=y non-idx-attr=z))
// Else
// Solr can be used for indexed-attribute filtering and Gremlin for non-indexed attributes. Examples:
// (AND idx-att1=x idx-attr2=y non-idx-attr=z)
// (AND (OR idx-att1=x idx-attr1=y) non-idx-attr=z)
// (AND (OR idx-att1=x idx-attr1=y) non-idx-attr=z (AND idx-attr2=xyz idx-attr2=abc))
//
protected boolean canApplySolrFilter(AtlasStructType structType, FilterCriteria filterCriteria, boolean insideOrCondition) {
if (filterCriteria == null) {
return true;
}
boolean ret = true;
Condition filterCondition = filterCriteria.getCondition();
List<FilterCriteria> criterion = filterCriteria.getCriterion();
Set<String> indexedKeys = context.getIndexedKeys();
if (filterCondition != null && CollectionUtils.isNotEmpty(criterion)) {
insideOrCondition = insideOrCondition || filterCondition == Condition.OR;
// If we have nested criterion let's find any nested ORs with non-indexed attr
for (FilterCriteria criteria : criterion) {
ret = canApplySolrFilter(structType, criteria, insideOrCondition);
if (!ret) {
break;
}
}
} else if (StringUtils.isNotEmpty(filterCriteria.getAttributeName())) {
try {
String qualifiedName = structType.getQualifiedAttributeName(filterCriteria.getAttributeName());
if (insideOrCondition && (indexedKeys == null || !indexedKeys.contains(qualifiedName))) {
ret = false;
}
} catch (AtlasBaseException e) {
LOG.warn(e.getMessage());
}
}
return ret;
}
protected void constructTypeTestQuery(StringBuilder solrQuery, Set<String> typeAndAllSubTypes) {
String typeAndSubtypesString = StringUtils.join(typeAndAllSubTypes, SPACE_STRING);
solrQuery.append("v.\"__typeName\": (")
.append(typeAndSubtypesString)
.append(")");
}
protected void constructFilterQuery(StringBuilder solrQuery, AtlasStructType type, FilterCriteria filterCriteria, Set<String> solrAttributes) {
if (filterCriteria != null) {
LOG.debug("Processing Filters");
String filterQuery = toSolrQuery(type, filterCriteria, solrAttributes);
if (StringUtils.isNotEmpty(filterQuery)) {
solrQuery.append(AND_STR).append(filterQuery);
}
}
if (type instanceof AtlasEntityType && context.getSearchParameters().getExcludeDeletedEntities()) {
solrQuery.append(AND_STR).append("v.\"__state\":").append("ACTIVE");
}
}
private String toSolrQuery(AtlasStructType type, FilterCriteria criteria, Set<String> solrAttributes) {
return toSolrQuery(type, criteria, solrAttributes, new StringBuilder());
}
private String toSolrQuery(AtlasStructType type, FilterCriteria criteria, Set<String> solrAttributes, StringBuilder sb) {
if (criteria.getCondition() != null && CollectionUtils.isNotEmpty(criteria.getCriterion())) {
StringBuilder nestedExpression = new StringBuilder();
for (FilterCriteria filterCriteria : criteria.getCriterion()) {
String nestedQuery = toSolrQuery(type, filterCriteria, solrAttributes);
if (StringUtils.isNotEmpty(nestedQuery)) {
if (nestedExpression.length() > 0) {
nestedExpression.append(SPACE_STRING).append(criteria.getCondition()).append(SPACE_STRING);
}
nestedExpression.append(nestedQuery);
}
}
return nestedExpression.length() > 0 ? sb.append(BRACE_OPEN_STR).append(nestedExpression.toString()).append(BRACE_CLOSE_STR).toString() : EMPTY_STRING;
} else if (solrAttributes.contains(criteria.getAttributeName())){
return toSolrExpression(type, criteria.getAttributeName(), criteria.getOperator(), criteria.getAttributeValue());
} else {
return EMPTY_STRING;
}
}
private String toSolrExpression(AtlasStructType type, String attrName, SearchParameters.Operator op, String attrVal) {
String ret = EMPTY_STRING;
try {
String qualifiedName = type.getQualifiedAttributeName(attrName);
if (OPERATOR_MAP.get(op) != null) {
ret = String.format(OPERATOR_MAP.get(op), qualifiedName, attrVal);
}
} catch (AtlasBaseException ex) {
LOG.warn(ex.getMessage());
}
return ret;
}
protected AtlasGraphQuery toGremlinFilterQuery(AtlasStructType type, FilterCriteria criteria, Set<String> gremlinAttributes, AtlasGraphQuery query) {
if (criteria != null) {
if (criteria.getCondition() != null) {
if (criteria.getCondition() == Condition.AND) {
for (FilterCriteria filterCriteria : criteria.getCriterion()) {
AtlasGraphQuery nestedQuery = toGremlinFilterQuery(type, filterCriteria, gremlinAttributes, context.getGraph().query());
query.addConditionsFrom(nestedQuery);
}
} else {
List<AtlasGraphQuery> orConditions = new LinkedList<>();
for (FilterCriteria filterCriteria : criteria.getCriterion()) {
AtlasGraphQuery nestedQuery = toGremlinFilterQuery(type, filterCriteria, gremlinAttributes, context.getGraph().query());
orConditions.add(context.getGraph().query().createChildQuery().addConditionsFrom(nestedQuery));
}
if (!orConditions.isEmpty()) {
query.or(orConditions);
}
}
} else if (gremlinAttributes.contains(criteria.getAttributeName())) {
String attrName = criteria.getAttributeName();
String attrValue = criteria.getAttributeValue();
SearchParameters.Operator operator = criteria.getOperator();
try {
final String qualifiedName = type.getQualifiedAttributeName(attrName);
switch (operator) {
case LT:
query.has(qualifiedName, AtlasGraphQuery.ComparisionOperator.LESS_THAN, attrValue);
break;
case LTE:
query.has(qualifiedName, AtlasGraphQuery.ComparisionOperator.LESS_THAN_EQUAL, attrValue);
break;
case GT:
query.has(qualifiedName, AtlasGraphQuery.ComparisionOperator.GREATER_THAN, attrValue);
break;
case GTE:
query.has(qualifiedName, AtlasGraphQuery.ComparisionOperator.GREATER_THAN_EQUAL, attrValue);
break;
case EQ:
query.has(qualifiedName, AtlasGraphQuery.ComparisionOperator.EQUAL, attrValue);
break;
case NEQ:
query.has(qualifiedName, AtlasGraphQuery.ComparisionOperator.NOT_EQUAL, attrValue);
break;
case LIKE:
// TODO: Maybe we need to validate pattern
query.has(qualifiedName, AtlasGraphQuery.MatchingOperator.REGEX, getLikeRegex(attrValue));
break;
case CONTAINS:
query.has(qualifiedName, AtlasGraphQuery.MatchingOperator.REGEX, getContainsRegex(attrValue));
break;
case STARTS_WITH:
query.has(qualifiedName, AtlasGraphQuery.MatchingOperator.PREFIX, attrValue);
break;
case ENDS_WITH:
query.has(qualifiedName, AtlasGraphQuery.MatchingOperator.REGEX, getSuffixRegex(attrValue));
break;
case IN:
LOG.warn("{}: unsupported operator. Ignored", operator);
break;
}
} catch (AtlasBaseException e) {
LOG.error("toGremlinFilterQuery(): failed for attrName=" + attrName + "; operator=" + operator + "; attrValue=" + attrValue, e);
}
}
}
return query;
}
private String getContainsRegex(String attributeValue) {
return ".*" + attributeValue + ".*";
}
private String getSuffixRegex(String attributeValue) {
return ".*" + attributeValue;
}
private String getLikeRegex(String attributeValue) { return ".*" + attributeValue + ".*"; }
protected List<AtlasVertex> getVerticesFromIndexQueryResult(Iterator<AtlasIndexQuery.Result> idxQueryResult) {
List<AtlasVertex> ret = new ArrayList<>();
if (idxQueryResult != null) {
while (idxQueryResult.hasNext()) {
AtlasVertex vertex = idxQueryResult.next().getVertex();
ret.add(vertex);
}
}
return ret;
}
protected List<AtlasVertex> getVertices(Iterator<AtlasVertex> vertices) {
List<AtlasVertex> ret = new ArrayList<>();
if (vertices != null) {
while (vertices.hasNext()) {
AtlasVertex vertex = vertices.next();
ret.add(vertex);
}
}
return ret;
}
protected Set<String> getGuids(List<AtlasVertex> vertices) {
Set<String> ret = new HashSet<>();
if (vertices != null) {
for(AtlasVertex vertex : vertices) {
String guid = AtlasGraphUtilsV1.getIdFromVertex(vertex);
if (StringUtils.isNotEmpty(guid)) {
ret.add(guid);
}
}
}
return ret;
}
private static int getApplicationProperty(String propertyName, int defaultValue) {
try {
return ApplicationProperties.get().getInt(propertyName, defaultValue);
} catch (AtlasException excp) {
// ignore
}
return defaultValue;
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.discovery;
import org.apache.atlas.discovery.SearchPipeline.IndexResultType;
import org.apache.atlas.discovery.SearchPipeline.PipelineContext;
import org.apache.atlas.discovery.SearchPipeline.PipelineStep;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.discovery.SearchParameters;
import org.apache.atlas.model.discovery.SearchParameters.Operator;
import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasIndexQuery;
import org.apache.atlas.type.*;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.inject.Inject;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
import static org.apache.atlas.model.discovery.SearchParameters.*;
@Component
public class SolrStep implements PipelineStep {
private static final Logger LOG = LoggerFactory.getLogger(SolrStep.class);
private static final Pattern STRAY_AND_PATTERN = Pattern.compile("(AND\\s+)+\\)");
private static final Pattern STRAY_OR_PATTERN = Pattern.compile("(OR\\s+)+\\)");
private static final Pattern STRAY_ELIPSIS_PATTERN = Pattern.compile("(\\(\\s*)\\)");
private static final String AND_STR = " AND ";
private static final String EMPTY_STRING = "";
private static final String SPACE_STRING = " ";
private static final String BRACE_OPEN_STR = "( ";
private static final String BRACE_CLOSE_STR = " )";
private static final Map<Operator, String> operatorMap = new HashMap<>();
static
{
operatorMap.put(Operator.LT,"v.\"%s\": [* TO %s}");
operatorMap.put(Operator.GT,"v.\"%s\": {%s TO *]");
operatorMap.put(Operator.LTE,"v.\"%s\": [* TO %s]");
operatorMap.put(Operator.GTE,"v.\"%s\": [%s TO *]");
operatorMap.put(Operator.EQ,"v.\"%s\": %s");
operatorMap.put(Operator.NEQ,"v.\"%s\": (NOT %s)");
operatorMap.put(Operator.IN, "v.\"%s\": (%s)");
operatorMap.put(Operator.LIKE, "v.\"%s\": (%s)");
operatorMap.put(Operator.STARTS_WITH, "v.\"%s\": (%s*)");
operatorMap.put(Operator.ENDS_WITH, "v.\"%s\": (*%s)");
operatorMap.put(Operator.CONTAINS, "v.\"%s\": (*%s*)");
}
private final AtlasGraph graph;
@Inject
public SolrStep(AtlasGraph graph) {
this.graph = graph;
}
@Override
public void execute(PipelineContext context) throws AtlasBaseException {
if (LOG.isDebugEnabled()) {
LOG.debug("==> SolrStep.execute({})", context);
}
if (context == null) {
throw new AtlasBaseException("Can't start search without any context");
}
SearchParameters searchParameters = context.getSearchParameters();
final Iterator<AtlasIndexQuery.Result> result;
if (StringUtils.isNotEmpty(searchParameters.getQuery())) {
result = executeAgainstFulltextIndex(context);
} else {
result = executeAgainstVertexIndex(context);
}
context.setIndexResultsIterator(result);
if (LOG.isDebugEnabled()) {
LOG.debug("<== SolrStep.execute({})", context);
}
}
private Iterator<AtlasIndexQuery.Result> executeAgainstFulltextIndex(PipelineContext context) {
if (LOG.isDebugEnabled()) {
LOG.debug("==> SolrStep.executeAgainstFulltextIndex()");
}
final Iterator<AtlasIndexQuery.Result> ret;
AtlasIndexQuery query = context.getIndexQuery("FULLTEXT");
if (query == null) {
// Compute only once
SearchParameters searchParameters = context.getSearchParameters();
String indexQuery = String.format("v.\"%s\":(%s)", Constants.ENTITY_TEXT_PROPERTY_KEY, searchParameters.getQuery());
query = graph.indexQuery(Constants.FULLTEXT_INDEX, indexQuery);
context.cacheIndexQuery("FULLTEXT", query);
}
context.setIndexResultType(IndexResultType.TEXT);
ret = query.vertices(context.getCurrentOffset(), context.getMaxLimit());
if (LOG.isDebugEnabled()) {
LOG.debug("<== SolrStep.executeAgainstFulltextIndex()");
}
return ret;
}
private Iterator<AtlasIndexQuery.Result> executeAgainstVertexIndex(PipelineContext context) {
if (LOG.isDebugEnabled()) {
LOG.debug("==> SolrStep.executeAgainstVertexIndex()");
}
final Iterator<AtlasIndexQuery.Result> ret;
SearchParameters searchParameters = context.getSearchParameters();
AtlasIndexQuery query = context.getIndexQuery("VERTEX_INDEX");
if (query == null) {
StringBuilder solrQuery = new StringBuilder();
// If tag is specified then let's start processing using tag and it's attributes, entity filters will
// be pushed to Gremlin
if (context.getClassificationType() != null) {
context.setIndexResultType(IndexResultType.TAG);
constructTypeTestQuery(solrQuery, context.getClassificationType().getTypeAndAllSubTypes());
constructFilterQuery(solrQuery, context.getClassificationType(), searchParameters.getTagFilters(), context);
} else if (context.getEntityType() != null) {
context.setIndexResultType(IndexResultType.ENTITY);
constructTypeTestQuery(solrQuery, context.getEntityType().getTypeAndAllSubTypes());
constructFilterQuery(solrQuery, context.getEntityType(), searchParameters.getEntityFilters(), context);
// Set the status flag
if (searchParameters.getExcludeDeletedEntities()) {
if (solrQuery.length() > 0) {
solrQuery.append(" AND ");
}
solrQuery.append("v.\"__state\":").append("ACTIVE");
}
}
// No query was formed, doesn't make sense to do anything beyond this point
if (solrQuery.length() > 0) {
String validSolrQuery = STRAY_AND_PATTERN.matcher(solrQuery).replaceAll(")");
validSolrQuery = STRAY_OR_PATTERN.matcher(validSolrQuery).replaceAll(")");
validSolrQuery = STRAY_ELIPSIS_PATTERN.matcher(validSolrQuery).replaceAll(EMPTY_STRING);
query = graph.indexQuery(Constants.VERTEX_INDEX, validSolrQuery);
context.cacheIndexQuery("VERTEX_INDEX", query);
}
}
// Execute solr query and return the index results in the context
if (query != null) {
ret = query.vertices(context.getCurrentOffset(), context.getMaxLimit());
} else {
ret = null;
}
if (LOG.isDebugEnabled()) {
LOG.debug("<== SolrStep.executeAgainstVertexIndex()");
}
return ret;
}
private void constructTypeTestQuery(StringBuilder solrQuery, Set<String> typeAndAllSubTypes) {
String typeAndSubtypesString = StringUtils.join(typeAndAllSubTypes, SPACE_STRING);
solrQuery.append("v.\"__typeName\": (")
.append(typeAndSubtypesString)
.append(")");
}
private void constructFilterQuery(StringBuilder solrQuery, AtlasStructType type, FilterCriteria filterCriteria, PipelineContext context) {
if (filterCriteria != null) {
LOG.debug("Processing Filters");
String filterQuery = toSolrQuery(type, filterCriteria, context);
if (StringUtils.isNotEmpty(filterQuery)) {
solrQuery.append(AND_STR).append(filterQuery);
}
}
}
private String toSolrQuery(AtlasStructType type, FilterCriteria criteria, PipelineContext context) {
return toSolrQuery(type, criteria, context, new StringBuilder());
}
private String toSolrQuery(AtlasStructType type, FilterCriteria criteria, PipelineContext context, StringBuilder sb) {
if (criteria.getCondition() != null && CollectionUtils.isNotEmpty(criteria.getCriterion())) {
StringBuilder nestedExpression = new StringBuilder();
for (FilterCriteria filterCriteria : criteria.getCriterion()) {
String nestedQuery = toSolrQuery(type, filterCriteria, context);
if (StringUtils.isNotEmpty(nestedQuery)) {
if (nestedExpression.length() > 0) {
nestedExpression.append(SPACE_STRING).append(criteria.getCondition()).append(SPACE_STRING);
}
nestedExpression.append(nestedQuery);
}
}
return nestedExpression.length() > 0 ? sb.append(BRACE_OPEN_STR).append(nestedExpression.toString()).append(BRACE_CLOSE_STR).toString() : EMPTY_STRING;
} else {
return toSolrExpression(type, criteria.getAttributeName(), criteria.getOperator(), criteria.getAttributeValue(), context);
}
}
private String toSolrExpression(AtlasStructType type, String attrName, Operator op, String attrVal, PipelineContext context) {
String ret = EMPTY_STRING;
try {
String indexKey = type.getQualifiedAttributeName(attrName);
AtlasType attributeType = type.getAttributeType(attrName);
switch (context.getIndexResultType()) {
case TAG:
context.addTagSearchAttribute(indexKey);
break;
case ENTITY:
context.addEntitySearchAttribute(indexKey);
break;
default:
// Do nothing
}
if (attributeType != null && AtlasTypeUtil.isBuiltInType(attributeType.getTypeName()) && context.getIndexedKeys().contains(indexKey)) {
if (operatorMap.get(op) != null) {
// If there's a chance of multi-value then we need some additional processing here
switch (context.getIndexResultType()) {
case TAG:
context.addProcessedTagAttribute(indexKey);
break;
case ENTITY:
context.addProcessedEntityAttribute(indexKey);
break;
}
ret = String.format(operatorMap.get(op), indexKey, attrVal);
}
}
} catch (AtlasBaseException ex) {
LOG.warn(ex.getMessage());
}
return ret;
}
}
......@@ -128,6 +128,10 @@ public final class EntityGraphRetriever {
return ret;
}
public AtlasEntityHeader toAtlasEntityHeader(String guid) throws AtlasBaseException {
return toAtlasEntityHeader(getEntityVertex(guid));
}
public AtlasEntityHeader toAtlasEntityHeader(AtlasVertex entityVertex) throws AtlasBaseException {
return toAtlasEntityHeader(entityVertex, Collections.<String>emptySet());
}
......@@ -233,14 +237,15 @@ public final class EntityGraphRetriever {
if (CollectionUtils.isNotEmpty(attributes)) {
for (String attrName : attributes) {
String nonQualifiedAttrName = toNonQualifiedName(attrName);
if (ret.hasAttribute(attrName)) {
continue;
}
Object attrValue = getVertexAttribute(entityVertex, entityType.getAttribute(attrName));
Object attrValue = getVertexAttribute(entityVertex, entityType.getAttribute(nonQualifiedAttrName));
if (attrValue != null) {
ret.setAttribute(attrName, attrValue);
ret.setAttribute(nonQualifiedAttrName, attrValue);
}
}
}
......@@ -249,6 +254,17 @@ public final class EntityGraphRetriever {
return ret;
}
private String toNonQualifiedName(String attrName) {
String ret;
if (attrName.contains(".")) {
String[] attributeParts = attrName.split("\\.");
ret = attributeParts[attributeParts.length - 1];
} else {
ret = attrName;
}
return ret;
}
private AtlasEntity mapSystemAttributes(AtlasVertex entityVertex, AtlasEntity entity) {
if (LOG.isDebugEnabled()) {
LOG.debug("Mapping system attributes for type {}", entity.getTypeName());
......
......@@ -18,7 +18,7 @@
package org.apache.atlas.util;
import org.apache.atlas.annotation.AtlasService;
import org.apache.atlas.discovery.SearchPipeline.PipelineContext;
import org.apache.atlas.discovery.SearchContext;
import java.util.HashMap;
import java.util.Map;
......@@ -26,13 +26,13 @@ import java.util.Set;
@AtlasService
public class SearchTracker {
private Map<String, PipelineContext> activeSearches = new HashMap<>();
private Map<String, SearchContext> activeSearches = new HashMap<>();
/**
*
* @param context
*/
public String add(PipelineContext context) {
public String add(SearchContext context) {
String searchId = Thread.currentThread().getName();
activeSearches.put(searchId, context);
......@@ -45,13 +45,13 @@ public class SearchTracker {
* @param searchId
* @return
*/
public PipelineContext terminate(String searchId) {
PipelineContext ret = null;
public SearchContext terminate(String searchId) {
SearchContext ret = null;
if (activeSearches.containsKey(searchId)) {
PipelineContext pipelineToTerminate = activeSearches.remove(searchId);
SearchContext pipelineToTerminate = activeSearches.remove(searchId);
pipelineToTerminate.setForceTerminate(true);
pipelineToTerminate.terminateSearch(true);
ret = pipelineToTerminate;
}
......@@ -59,7 +59,7 @@ public class SearchTracker {
return ret;
}
public PipelineContext remove(String id) {
public SearchContext remove(String id) {
return activeSearches.remove(id);
}
......
......@@ -143,10 +143,7 @@ public class TestModules {
typeDefChangeListenerMultibinder.addBinding().to(DefaultMetadataService.class);
typeDefChangeListenerMultibinder.addBinding().to(GraphBackedSearchIndexer.class).asEagerSingleton();
bind(SearchPipeline.class).asEagerSingleton();
bind(SearchTracker.class).asEagerSingleton();
bind(SolrStep.class).asEagerSingleton();
bind(GremlinStep.class).asEagerSingleton();
bind(AtlasEntityStore.class).to(AtlasEntityStoreV1.class);
bind(AtlasRelationshipStore.class).to(AtlasRelationshipStoreV1.class);
......
......@@ -25,7 +25,7 @@ import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.authorize.AtlasActionTypes;
import org.apache.atlas.authorize.AtlasResourceTypes;
import org.apache.atlas.authorize.simple.AtlasAuthorizationUtils;
import org.apache.atlas.discovery.SearchPipeline;
import org.apache.atlas.discovery.SearchContext;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.impexp.AtlasExportRequest;
import org.apache.atlas.model.impexp.AtlasExportResult;
......@@ -434,7 +434,7 @@ public class AdminResource {
@Path("activeSearches/{id}")
@Produces(Servlets.JSON_MEDIA_TYPE)
public boolean terminateActiveSearch(@PathParam("id") String searchId) {
SearchPipeline.PipelineContext terminate = activeSearches.terminate(searchId);
SearchContext terminate = activeSearches.terminate(searchId);
return null != terminate;
}
......
......@@ -249,7 +249,7 @@ public class DiscoveryREST {
throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "TagFilters specified without tag name");
}
return atlasDiscoveryService.searchUsingBasicQuery(parameters);
return atlasDiscoveryService.searchWithParameters(parameters);
} finally {
AtlasPerfTracer.log(perf);
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment