Commit 1dc7f549 by apoorvnaik Committed by Madhan Neethiraj

ATLAS-2115: Basic search updates to fix performance regression

parent 8348f221
......@@ -344,7 +344,9 @@ public class SearchParameters {
LIKE(new String[]{"like", "LIKE"}),
STARTS_WITH(new String[]{"startsWith", "STARTSWITH", "begins_with", "BEGINS_WITH"}),
ENDS_WITH(new String[]{"endsWith", "ENDSWITH", "ends_with", "BEGINS_WITH"}),
CONTAINS(new String[]{"contains", "CONTAINS"})
CONTAINS(new String[]{"contains", "CONTAINS"}),
CONTAINS_ANY(new String[]{"containsAny", "CONTAINSANY", "contains_any", "CONTAINS_ANY"}),
CONTAINS_ALL(new String[]{"containsAll", "CONTAINSALL", "contains_all", "CONTAINS_ALL"})
;
static final Map<String, Operator> operatorsMap = new HashMap<>();
......
......@@ -30,8 +30,10 @@ import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.store.graph.v1.AtlasGraphUtilsV1;
import org.apache.atlas.type.AtlasClassificationType;
import org.apache.atlas.util.AtlasGremlinQueryProvider;
import org.apache.atlas.util.SearchPredicateUtil;
import org.apache.atlas.utils.AtlasPerfTracer;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.Predicate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -51,11 +53,14 @@ public class ClassificationSearchProcessor extends SearchProcessor {
private static final Logger PERF_LOG = AtlasPerfTracer.getPerfLogger("ClassificationSearchProcessor");
private final AtlasIndexQuery indexQuery;
private final AtlasGraphQuery allGraphQuery;
private final AtlasGraphQuery tagGraphQueryWithAttributes;
private final AtlasGraphQuery entityGraphQueryTraitNames;
private final Predicate entityPredicateTraitNames;
private final String gremlinTagFilterQuery;
private final Map<String, Object> gremlinQueryBindings;
public ClassificationSearchProcessor(SearchContext context) {
super(context);
......@@ -88,19 +93,19 @@ public class ClassificationSearchProcessor extends SearchProcessor {
this.indexQuery = graph.indexQuery(Constants.VERTEX_INDEX, indexQueryString);
constructInMemoryPredicate(classificationType, filterCriteria, indexAttributes);
inMemoryPredicate = constructInMemoryPredicate(classificationType, filterCriteria, indexAttributes);
} else {
indexQuery = null;
}
AtlasGraphQuery query = graph.query().in(Constants.TYPE_NAME_PROPERTY_KEY, typeAndSubTypes);
allGraphQuery = toGraphFilterQuery(classificationType, filterCriteria, allAttributes, query);
if (context.getSearchParameters().getTagFilters() != null) {
// Now filter on the tag attributes
AtlasGremlinQueryProvider queryProvider = AtlasGremlinQueryProvider.INSTANCE;
tagGraphQueryWithAttributes = toGraphFilterQuery(classificationType, filterCriteria, allAttributes, graph.query().in(Constants.TYPE_NAME_PROPERTY_KEY, typeAndSubTypes));
entityGraphQueryTraitNames = null;
entityPredicateTraitNames = null;
gremlinQueryBindings = new HashMap<>();
StringBuilder gremlinQuery = new StringBuilder();
......@@ -122,6 +127,11 @@ public class ClassificationSearchProcessor extends SearchProcessor {
LOG.debug("gremlinTagFilterQuery={}", gremlinTagFilterQuery);
}
} else {
tagGraphQueryWithAttributes = null;
entityGraphQueryTraitNames = graph.query().in(Constants.TRAIT_NAMES_PROPERTY_KEY, typeAndSubTypes);
entityPredicateTraitNames = SearchPredicateUtil.getContainsAnyPredicateGenerator()
.generatePredicate(Constants.TRAIT_NAMES_PROPERTY_KEY, classificationType.getTypeAndAllSubTypes(), List.class);
gremlinTagFilterQuery = null;
gremlinQueryBindings = null;
}
......@@ -176,38 +186,56 @@ public class ClassificationSearchProcessor extends SearchProcessor {
}
getVerticesFromIndexQueryResult(queryResult, classificationVertices);
// Do in-memory filtering before the graph query
CollectionUtils.filter(classificationVertices, inMemoryPredicate);
} else {
Iterator<AtlasVertex> queryResult = allGraphQuery.vertices(qryOffset, limit).iterator();
if (context.getSearchParameters().getTagFilters() == null) {
// We can use single graph query to determine in this case
Iterator<AtlasVertex> queryResult = entityGraphQueryTraitNames.vertices(qryOffset, limit).iterator();
if (!queryResult.hasNext()) { // no more results - end of search
break;
}
if (!queryResult.hasNext()) { // no more results - end of search
break;
}
getVertices(queryResult, classificationVertices);
}
getVertices(queryResult, entityVertices);
} else {
Iterator<AtlasVertex> queryResult = tagGraphQueryWithAttributes.vertices(qryOffset, limit).iterator();
// Do in-memory filtering before the graph query
CollectionUtils.filter(classificationVertices, inMemoryPredicate);
if (!queryResult.hasNext()) { // no more results - end of search
break;
}
for (AtlasVertex classificationVertex : classificationVertices) {
Iterable<AtlasEdge> edges = classificationVertex.getEdges(AtlasEdgeDirection.IN);
getVertices(queryResult, classificationVertices);
for (AtlasEdge edge : edges) {
AtlasVertex entityVertex = edge.getOutVertex();
// Do in-memory filtering before the graph query
CollectionUtils.filter(classificationVertices, inMemoryPredicate);
}
}
if (activeOnly && AtlasGraphUtilsV1.getState(entityVertex) != AtlasEntity.Status.ACTIVE) {
continue;
}
// Since tag filters are present, we need to collect the entity vertices after filtering the classification
// vertex results (as these might be lower in number)
if (CollectionUtils.isNotEmpty(classificationVertices)) {
for (AtlasVertex classificationVertex : classificationVertices) {
Iterable<AtlasEdge> edges = classificationVertex.getEdges(AtlasEdgeDirection.IN);
String guid = AtlasGraphUtilsV1.getIdFromVertex(entityVertex);
for (AtlasEdge edge : edges) {
AtlasVertex entityVertex = edge.getOutVertex();
if (processedGuids.contains(guid)) {
continue;
}
if (activeOnly && AtlasGraphUtilsV1.getState(entityVertex) != AtlasEntity.Status.ACTIVE) {
continue;
}
String guid = AtlasGraphUtilsV1.getIdFromVertex(entityVertex);
entityVertices.add(entityVertex);
if (processedGuids.contains(guid)) {
continue;
}
processedGuids.add(guid);
entityVertices.add(entityVertex);
processedGuids.add(guid);
}
}
}
......@@ -254,6 +282,8 @@ public class ClassificationSearchProcessor extends SearchProcessor {
LOG.warn(e.getMessage(), e);
}
}
} else if (entityPredicateTraitNames != null) {
CollectionUtils.filter(entityVertices, entityPredicateTraitNames);
}
super.filter(entityVertices);
......
......@@ -24,12 +24,16 @@ import org.apache.atlas.repository.graphdb.AtlasIndexQuery;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.type.AtlasClassificationType;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.util.SearchPredicateUtil;
import org.apache.atlas.utils.AtlasPerfTracer;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.Predicate;
import org.apache.commons.collections.PredicateUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
......@@ -41,7 +45,8 @@ public class EntitySearchProcessor extends SearchProcessor {
private final AtlasIndexQuery indexQuery;
private final AtlasGraphQuery graphQuery;
private final AtlasGraphQuery filterGraphQuery;
private Predicate graphQueryPredicate;
private Predicate filterGraphQueryPredicate;
public EntitySearchProcessor(SearchContext context) {
super(context);
......@@ -54,10 +59,18 @@ public class EntitySearchProcessor extends SearchProcessor {
final Set<String> graphAttributes = new HashSet<>();
final Set<String> allAttributes = new HashSet<>();
final AtlasClassificationType classificationType = context.getClassificationType();
final boolean filterClassification = classificationType != null && !context.needClassificationProcessor();
final AtlasClassificationType classificationType = context.getClassificationType();
final boolean filterClassification = classificationType != null && !context.needClassificationProcessor();
final Set<String> classificationTypeAndSubTypes = classificationType != null ? classificationType.getTypeAndAllSubTypes() : Collections.EMPTY_SET;
final Predicate typeNamePredicate = SearchPredicateUtil.getINPredicateGenerator()
.generatePredicate(Constants.TYPE_NAME_PROPERTY_KEY, typeAndSubTypes, String.class);
final Predicate traitPredicate = SearchPredicateUtil.getContainsAnyPredicateGenerator()
.generatePredicate(Constants.TRAIT_NAMES_PROPERTY_KEY, classificationTypeAndSubTypes, List.class);
final Predicate activePredicate = SearchPredicateUtil.getEQPredicateGenerator()
.generatePredicate(Constants.STATE_PROPERTY_KEY, "ACTIVE", String.class);
processSearchAttributes(entityType, filterCriteria, indexAttributes, graphAttributes, allAttributes);
final boolean typeSearchByIndex = !filterClassification && typeAndSubTypesQryStr.length() <= MAX_QUERY_STR_LENGTH_TYPES;
......@@ -72,7 +85,7 @@ public class EntitySearchProcessor extends SearchProcessor {
if (attrSearchByIndex) {
constructFilterQuery(indexQuery, entityType, filterCriteria, indexAttributes);
constructInMemoryPredicate(entityType, filterCriteria, indexAttributes);
inMemoryPredicate = constructInMemoryPredicate(entityType, filterCriteria, indexAttributes);
} else {
graphAttributes.addAll(indexAttributes);
}
......@@ -97,31 +110,71 @@ public class EntitySearchProcessor extends SearchProcessor {
if (!typeSearchByIndex) {
query.in(Constants.TYPE_NAME_PROPERTY_KEY, typeAndSubTypes);
// Construct a parallel in-memory predicate
if (graphQueryPredicate != null) {
graphQueryPredicate = PredicateUtils.andPredicate(graphQueryPredicate, typeNamePredicate);
} else {
graphQueryPredicate = typeNamePredicate;
}
}
// If we need to filter on the trait names then we need to build the query and equivalent in-memory predicate
if (filterClassification) {
query.in(Constants.TRAIT_NAMES_PROPERTY_KEY, classificationType.getTypeAndAllSubTypes());
query.in(Constants.TRAIT_NAMES_PROPERTY_KEY, classificationTypeAndSubTypes);
// Construct a parallel in-memory predicate
if (graphQueryPredicate != null) {
graphQueryPredicate = PredicateUtils.andPredicate(graphQueryPredicate, traitPredicate);
} else {
graphQueryPredicate = traitPredicate;
}
}
graphQuery = toGraphFilterQuery(entityType, filterCriteria, graphAttributes, query);
// Prepare in-memory predicate for attribute filtering
Predicate attributePredicate = constructInMemoryPredicate(entityType, filterCriteria, graphAttributes);
if (attributePredicate != null) {
if (graphQueryPredicate != null) {
graphQueryPredicate = PredicateUtils.andPredicate(graphQueryPredicate, attributePredicate);
} else {
graphQueryPredicate = attributePredicate;
}
}
// Filter condition for the STATUS
if (context.getSearchParameters().getExcludeDeletedEntities() && this.indexQuery == null) {
graphQuery.has(Constants.STATE_PROPERTY_KEY, "ACTIVE");
if (graphQueryPredicate != null) {
graphQueryPredicate = PredicateUtils.andPredicate(graphQueryPredicate, activePredicate);
} else {
graphQueryPredicate = activePredicate;
}
}
} else {
graphQuery = null;
graphQueryPredicate = null;
}
AtlasGraphQuery query = context.getGraph().query().in(Constants.TYPE_NAME_PROPERTY_KEY, typeAndSubTypes);
if (filterClassification) {
query.in(Constants.TRAIT_NAMES_PROPERTY_KEY, classificationType.getTypeAndAllSubTypes());
// Prepare the graph query and in-memory filter for the filtering phase
filterGraphQueryPredicate = typeNamePredicate;
Predicate attributesPredicate = constructInMemoryPredicate(entityType, filterCriteria, allAttributes);
if (attributesPredicate != null) {
filterGraphQueryPredicate = PredicateUtils.andPredicate(filterGraphQueryPredicate, attributesPredicate);
}
filterGraphQuery = toGraphFilterQuery(entityType, filterCriteria, allAttributes, query);
if (filterClassification) {
filterGraphQueryPredicate = PredicateUtils.andPredicate(filterGraphQueryPredicate, traitPredicate);
}
// Filter condition for the STATUS
if (context.getSearchParameters().getExcludeDeletedEntities()) {
filterGraphQuery.has(Constants.STATE_PROPERTY_KEY, "ACTIVE");
filterGraphQueryPredicate = PredicateUtils.andPredicate(filterGraphQueryPredicate, activePredicate);
}
}
......@@ -172,18 +225,8 @@ public class EntitySearchProcessor extends SearchProcessor {
// Do in-memory filtering before the graph query
CollectionUtils.filter(entityVertices, inMemoryPredicate);
if (graphQuery != null) {
Set<String> guids = getGuids(entityVertices);
entityVertices.clear();
if (CollectionUtils.isNotEmpty(guids)) {
AtlasGraphQuery guidQuery = context.getGraph().query().in(Constants.GUID_PROPERTY_KEY, guids);
guidQuery.addConditionsFrom(graphQuery);
getVertices(guidQuery.vertices().iterator(), entityVertices);
}
if (graphQueryPredicate != null) {
CollectionUtils.filter(entityVertices, graphQueryPredicate);
}
} else {
Iterator<AtlasVertex> queryResult = graphQuery.vertices(qryOffset, limit).iterator();
......@@ -216,16 +259,11 @@ public class EntitySearchProcessor extends SearchProcessor {
LOG.debug("==> EntitySearchProcessor.filter({})", entityVertices.size());
}
Set<String> guids = getGuids(entityVertices);
entityVertices.clear();
if (CollectionUtils.isNotEmpty(guids)) {
AtlasGraphQuery query = context.getGraph().query().in(Constants.GUID_PROPERTY_KEY, guids);
query.addConditionsFrom(filterGraphQuery);
getVertices(query.vertices().iterator(), entityVertices);
// Since we already have the entity vertices, a in-memory filter will be faster than fetching the same
// vertices again with the required filtering
if (filterGraphQueryPredicate != null) {
LOG.debug("Filtering in-memory");
CollectionUtils.filter(entityVertices, filterGraphQueryPredicate);
}
super.filter(entityVertices);
......
......@@ -23,18 +23,20 @@ import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.discovery.SearchParameters;
import org.apache.atlas.model.discovery.SearchParameters.FilterCriteria;
import org.apache.atlas.model.discovery.SearchParameters.FilterCriteria.Condition;
import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.graphdb.AtlasGraphQuery;
import org.apache.atlas.repository.graphdb.AtlasIndexQuery;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.store.graph.v1.AtlasGraphUtilsV1;
import org.apache.atlas.type.AtlasArrayType;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasEnumType;
import org.apache.atlas.type.AtlasStructType;
import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.util.AtlasGremlinQueryProvider;
import org.apache.atlas.util.SearchPredicateUtil.VertexAttributePredicateGenerator;
import org.apache.atlas.util.SearchPredicateUtil.*;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.Predicate;
import org.apache.commons.collections.PredicateUtils;
......@@ -44,15 +46,7 @@ import org.slf4j.LoggerFactory;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.*;
import java.util.regex.Pattern;
import static org.apache.atlas.util.SearchPredicateUtil.*;
......@@ -269,14 +263,16 @@ public abstract class SearchProcessor {
}
}
protected void constructInMemoryPredicate(AtlasStructType type, FilterCriteria filterCriteria, Set<String> indexAttributes) {
protected Predicate constructInMemoryPredicate(AtlasStructType type, FilterCriteria filterCriteria, Set<String> indexAttributes) {
Predicate ret = null;
if (filterCriteria != null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Processing Filters");
}
inMemoryPredicate = toInMemoryPredicate(type, filterCriteria, indexAttributes);
ret = toInMemoryPredicate(type, filterCriteria, indexAttributes);
}
return ret;
}
protected void constructGremlinFilterQuery(StringBuilder gremlinQuery, Map<String, Object> queryBindings, AtlasStructType structType, FilterCriteria filterCriteria) {
......@@ -411,59 +407,61 @@ public abstract class SearchProcessor {
VertexAttributePredicateGenerator predicate = OPERATOR_PREDICATE_MAP.get(op);
if (attribute != null && predicate != null) {
final AtlasType attrType = attribute.getAttributeType();
final String attributeType = attrType.getTypeName().toLowerCase();
final AtlasType attrType = attribute.getAttributeType();
final Class attrClass;
final Object attrValue;
switch (attributeType) {
case "string":
switch (attrType.getTypeName()) {
case AtlasBaseTypeDef.ATLAS_TYPE_STRING:
attrClass = String.class;
attrValue = attrVal;
break;
case "short":
case AtlasBaseTypeDef.ATLAS_TYPE_SHORT:
attrClass = Short.class;
attrValue = Short.parseShort(attrVal);
break;
case "int":
case AtlasBaseTypeDef.ATLAS_TYPE_INT:
attrClass = Integer.class;
attrValue = Integer.parseInt(attrVal);
break;
case "biginteger":
case AtlasBaseTypeDef.ATLAS_TYPE_BIGINTEGER:
attrClass = BigInteger.class;
attrValue = new BigInteger(attrVal);
break;
case "boolean":
case AtlasBaseTypeDef.ATLAS_TYPE_BOOLEAN:
attrClass = Boolean.class;
attrValue = Boolean.parseBoolean(attrVal);
break;
case "byte":
case AtlasBaseTypeDef.ATLAS_TYPE_BYTE:
attrClass = Byte.class;
attrValue = Byte.parseByte(attrVal);
break;
case "long":
case "date":
case AtlasBaseTypeDef.ATLAS_TYPE_LONG:
case AtlasBaseTypeDef.ATLAS_TYPE_DATE:
attrClass = Long.class;
attrValue = Long.parseLong(attrVal);
break;
case "float":
case AtlasBaseTypeDef.ATLAS_TYPE_FLOAT:
attrClass = Float.class;
attrValue = Float.parseFloat(attrVal);
break;
case "double":
case AtlasBaseTypeDef.ATLAS_TYPE_DOUBLE:
attrClass = Double.class;
attrValue = Double.parseDouble(attrVal);
break;
case "bigdecimal":
case AtlasBaseTypeDef.ATLAS_TYPE_BIGDECIMAL:
attrClass = BigDecimal.class;
attrValue = new BigDecimal(attrVal);
break;
default:
if (attrType instanceof AtlasEnumType) {
attrClass = String.class;
} else if (attrType instanceof AtlasArrayType) {
attrClass = List.class;
} else {
attrClass = Object.class;
}
attrValue = attrVal;
break;
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment