Commit f74e43c2 by Madhan Neethiraj

ATLAS-1961: Basic search improvement in use of Solr index for attribute filtering (#3)

parent 5527afb0
......@@ -58,7 +58,7 @@ public class ClassificationSearchProcessor extends SearchProcessor {
if (useSolrSearch) {
StringBuilder solrQuery = new StringBuilder();
constructTypeTestQuery(solrQuery, typeAndSubTypes);
constructTypeTestQuery(solrQuery, classificationType, typeAndSubTypes);
constructFilterQuery(solrQuery, classificationType, filterCriteria, solrAttributes);
String solrQueryString = STRAY_AND_PATTERN.matcher(solrQuery).replaceAll(")");
......@@ -95,20 +95,26 @@ public class ClassificationSearchProcessor extends SearchProcessor {
}
try {
int qryOffset = (nextProcessor == null) ? context.getSearchParameters().getOffset() : 0;
int limit = context.getSearchParameters().getLimit();
int resultIdx = qryOffset;
Set<String> processedGuids = new HashSet<>();
final int startIdx = context.getSearchParameters().getOffset();
final int limit = context.getSearchParameters().getLimit();
int qryOffset = nextProcessor == null ? startIdx : 0;
int resultIdx = qryOffset;
final Set<String> processedGuids = new HashSet<>();
final List<AtlasVertex> entityVertices = new ArrayList<>();
final List<AtlasVertex> classificationVertices = new ArrayList<>();
for (; ret.size() < limit; qryOffset += limit) {
entityVertices.clear();
classificationVertices.clear();
while (ret.size() < limit) {
if (context.terminateSearch()) {
LOG.warn("query terminated: {}", context.getSearchParameters());
break;
}
List<AtlasVertex> classificationVertices;
if (indexQuery != null) {
Iterator<AtlasIndexQuery.Result> queryResult = indexQuery.vertices(qryOffset, limit);
......@@ -116,7 +122,7 @@ public class ClassificationSearchProcessor extends SearchProcessor {
break;
}
classificationVertices = getVerticesFromIndexQueryResult(queryResult);
getVerticesFromIndexQueryResult(queryResult, classificationVertices);
} else {
Iterator<AtlasVertex> queryResult = allGraphQuery.vertices(qryOffset, limit).iterator();
......@@ -124,13 +130,9 @@ public class ClassificationSearchProcessor extends SearchProcessor {
break;
}
classificationVertices = getVertices(queryResult);
getVertices(queryResult, classificationVertices);
}
qryOffset += limit;
List<AtlasVertex> entityVertices = new ArrayList<>();
for (AtlasVertex classificationVertex : classificationVertices) {
Iterable<AtlasEdge> edges = classificationVertex.getEdges(AtlasEdgeDirection.IN);
......@@ -148,12 +150,12 @@ public class ClassificationSearchProcessor extends SearchProcessor {
}
}
entityVertices = super.filter(entityVertices);
super.filter(entityVertices);
for (AtlasVertex entityVertex : entityVertices) {
resultIdx++;
if (resultIdx < context.getSearchParameters().getOffset()) {
if (resultIdx <= startIdx) {
continue;
}
......@@ -176,7 +178,7 @@ public class ClassificationSearchProcessor extends SearchProcessor {
}
@Override
public List<AtlasVertex> filter(List<AtlasVertex> entityVertices) {
public void filter(List<AtlasVertex> entityVertices) {
if (LOG.isDebugEnabled()) {
LOG.debug("==> ClassificationSearchProcessor.filter({})", entityVertices.size());
}
......@@ -185,14 +187,13 @@ public class ClassificationSearchProcessor extends SearchProcessor {
query.addConditionsFrom(filterGraphQuery);
List<AtlasVertex> ret = getVertices(query.vertices().iterator());
entityVertices.clear();
getVertices(query.vertices().iterator(), entityVertices);
ret = super.filter(ret);
super.filter(entityVertices);
if (LOG.isDebugEnabled()) {
LOG.debug("<== ClassificationSearchProcessor.filter({}): ret.size()={}", entityVertices.size(), ret.size());
LOG.debug("<== ClassificationSearchProcessor.filter(): ret.size()={}", entityVertices.size());
}
return ret;
}
}
......@@ -20,6 +20,7 @@ package org.apache.atlas.discovery;
import org.apache.atlas.model.discovery.SearchParameters.FilterCriteria;
import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.graphdb.*;
import org.apache.atlas.repository.store.graph.v1.AtlasGraphUtilsV1;
import org.apache.atlas.type.AtlasClassificationType;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.utils.AtlasPerfTracer;
......@@ -59,7 +60,7 @@ public class EntitySearchProcessor extends SearchProcessor {
StringBuilder solrQuery = new StringBuilder();
if (typeSearchBySolr) {
constructTypeTestQuery(solrQuery, typeAndSubTypes);
constructTypeTestQuery(solrQuery, entityType, typeAndSubTypes);
}
if (attrSearchBySolr) {
......@@ -127,34 +128,48 @@ public class EntitySearchProcessor extends SearchProcessor {
}
try {
int qryOffset = (nextProcessor == null && (graphQuery == null || indexQuery == null)) ? context.getSearchParameters().getOffset() : 0;
int limit = context.getSearchParameters().getLimit();
int resultIdx = qryOffset;
final int startIdx = context.getSearchParameters().getOffset();
final int limit = context.getSearchParameters().getLimit();
int qryOffset = (nextProcessor == null && (graphQuery == null || indexQuery == null)) ? startIdx : 0;
int resultIdx = qryOffset;
final List<AtlasVertex> entityVertices = new ArrayList<>();
for (; ret.size() < limit; qryOffset += limit) {
entityVertices.clear();
while (ret.size() < limit) {
if (context.terminateSearch()) {
LOG.warn("query terminated: {}", context.getSearchParameters());
break;
}
List<AtlasVertex> vertices;
if (indexQuery != null) {
Iterator<AtlasIndexQuery.Result> queryResult = indexQuery.vertices(qryOffset, limit);
Iterator<AtlasIndexQuery.Result> idxQueryResult = indexQuery.vertices(qryOffset, limit);
if (!queryResult.hasNext()) { // no more results from solr - end of search
if (!idxQueryResult.hasNext()) { // no more results from solr - end of search
break;
}
vertices = getVerticesFromIndexQueryResult(queryResult);
while (idxQueryResult.hasNext()) {
AtlasVertex vertex = idxQueryResult.next().getVertex();
// skip non-entity vertices
if (!AtlasGraphUtilsV1.isEntityVertex(vertex)) {
LOG.warn("EntitySearchProcessor.execute(): ignoring non-entity vertex (id={})", vertex.getId()); // might cause duplicate entries in result
continue;
}
entityVertices.add(vertex);
}
if (graphQuery != null) {
AtlasGraphQuery guidQuery = context.getGraph().query().in(Constants.GUID_PROPERTY_KEY, getGuids(vertices));
AtlasGraphQuery guidQuery = context.getGraph().query().in(Constants.GUID_PROPERTY_KEY, getGuids(entityVertices));
guidQuery.addConditionsFrom(graphQuery);
vertices = getVertices(guidQuery.vertices().iterator());
getVertices(guidQuery.vertices().iterator(), entityVertices);
}
} else {
Iterator<AtlasVertex> queryResult = graphQuery.vertices(qryOffset, limit).iterator();
......@@ -163,21 +178,19 @@ public class EntitySearchProcessor extends SearchProcessor {
break;
}
vertices = getVertices(queryResult);
getVertices(queryResult, entityVertices);
}
qryOffset += limit;
vertices = super.filter(vertices);
super.filter(entityVertices);
for (AtlasVertex vertex : vertices) {
for (AtlasVertex entityVertex : entityVertices) {
resultIdx++;
if (resultIdx < context.getSearchParameters().getOffset()) {
if (resultIdx <= startIdx) {
continue;
}
ret.add(vertex);
ret.add(entityVertex);
if (ret.size() == limit) {
break;
......@@ -196,7 +209,7 @@ public class EntitySearchProcessor extends SearchProcessor {
}
@Override
public List<AtlasVertex> filter(List<AtlasVertex> entityVertices) {
public void filter(List<AtlasVertex> entityVertices) {
if (LOG.isDebugEnabled()) {
LOG.debug("==> EntitySearchProcessor.filter({})", entityVertices.size());
}
......@@ -205,14 +218,13 @@ public class EntitySearchProcessor extends SearchProcessor {
query.addConditionsFrom(filterGraphQuery);
List<AtlasVertex> ret = getVertices(query.vertices().iterator());
entityVertices.clear();
getVertices(query.vertices().iterator(), entityVertices);
ret = super.filter(ret);
super.filter(entityVertices);
if (LOG.isDebugEnabled()) {
LOG.debug("<== EntitySearchProcessor.filter({}): ret.size()={}", entityVertices.size(), ret.size());
LOG.debug("<== EntitySearchProcessor.filter(): ret.size()={}", entityVertices.size());
}
return ret;
}
}
......@@ -19,8 +19,10 @@ package org.apache.atlas.discovery;
import org.apache.atlas.model.discovery.SearchParameters;
import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.graph.GraphHelper;
import org.apache.atlas.repository.graphdb.AtlasIndexQuery;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.store.graph.v1.AtlasGraphUtilsV1;
import org.apache.atlas.utils.AtlasPerfTracer;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
......@@ -68,7 +70,7 @@ public class FullTextSearchProcessor extends SearchProcessor {
queryString.append(AND_STR).append("(").append(StringUtils.join(typeAndSubTypeNames, SPACE_STRING)).append(")");
} else {
LOG.warn("'{}' has too many subtypes ({}) to include in index-query; might cause poor performance",
context.getEntityType().getTypeName(), typeAndSubTypeNames.size());
context.getClassificationType().getTypeName(), typeAndSubTypeNames.size());
}
}
......@@ -92,11 +94,16 @@ public class FullTextSearchProcessor extends SearchProcessor {
}
try {
int qryOffset = nextProcessor == null ? context.getSearchParameters().getOffset() : 0;
int limit = context.getSearchParameters().getLimit();
int resultIdx = qryOffset;
final int startIdx = context.getSearchParameters().getOffset();
final int limit = context.getSearchParameters().getLimit();
int qryOffset = nextProcessor == null ? startIdx : 0;
int resultIdx = qryOffset;
final List<AtlasVertex> entityVertices = new ArrayList<>();
for (; ret.size() < limit; qryOffset += limit) {
entityVertices.clear();
while (ret.size() < limit) {
if (context.terminateSearch()) {
LOG.warn("query terminated: {}", context.getSearchParameters());
......@@ -109,20 +116,29 @@ public class FullTextSearchProcessor extends SearchProcessor {
break;
}
qryOffset += limit;
while (idxQueryResult.hasNext()) {
AtlasVertex vertex = idxQueryResult.next().getVertex();
List<AtlasVertex> vertices = getVerticesFromIndexQueryResult(idxQueryResult);
// skip non-entity vertices
if (!AtlasGraphUtilsV1.isEntityVertex(vertex)) {
LOG.warn("FullTextSearchProcessor.execute(): ignoring non-entity vertex (id={})", vertex.getId()); // might cause duplicate entries in result
continue;
}
entityVertices.add(vertex);
}
vertices = super.filter(vertices);
super.filter(entityVertices);
for (AtlasVertex vertex : vertices) {
for (AtlasVertex entityVertex : entityVertices) {
resultIdx++;
if (resultIdx < context.getSearchParameters().getOffset()) {
if (resultIdx <= startIdx) {
continue;
}
ret.add(vertex);
ret.add(entityVertex);
if (ret.size() == limit) {
break;
......
......@@ -50,6 +50,7 @@ public abstract class SearchProcessor {
public static final String SPACE_STRING = " ";
public static final String BRACE_OPEN_STR = "( ";
public static final String BRACE_CLOSE_STR = " )";
public static final char DOUBLE_QUOTE = '"';
private static final Map<SearchParameters.Operator, String> OPERATOR_MAP = new HashMap<>();
private static final char[] OFFENDING_CHARS = {'@', '/', ' '}; // This can grow as we discover corner cases
......@@ -87,8 +88,10 @@ public abstract class SearchProcessor {
public abstract List<AtlasVertex> execute();
public List<AtlasVertex> filter(List<AtlasVertex> entityVertices) {
return nextProcessor == null || CollectionUtils.isEmpty(entityVertices) ? entityVertices : nextProcessor.filter(entityVertices);
public void filter(List<AtlasVertex> entityVertices) {
    // Base implementation: delegate filtering to the next processor in the
    // chain. There is nothing to do when this is the last processor or when
    // no candidate vertices remain.
    if (nextProcessor == null || CollectionUtils.isEmpty(entityVertices)) {
        return;
    }

    nextProcessor.filter(entityVertices);
}
......@@ -178,12 +181,26 @@ public abstract class SearchProcessor {
return ret;
}
protected void constructTypeTestQuery(StringBuilder solrQuery, Set<String> typeAndAllSubTypes) {
protected void constructTypeTestQuery(StringBuilder solrQuery, AtlasStructType type, Set<String> typeAndAllSubTypes) {
String typeAndSubtypesString = StringUtils.join(typeAndAllSubTypes, SPACE_STRING);
solrQuery.append("v.\"").append(Constants.TYPE_NAME_PROPERTY_KEY).append("\": (")
.append(typeAndSubtypesString)
.append(")");
if (CollectionUtils.isNotEmpty(typeAndAllSubTypes)) {
if (solrQuery.length() > 0) {
solrQuery.append(AND_STR);
}
solrQuery.append("v.\"").append(Constants.TYPE_NAME_PROPERTY_KEY).append("\": (")
.append(typeAndSubtypesString)
.append(")");
}
if (type instanceof AtlasEntityType && context.getSearchParameters().getExcludeDeletedEntities()) {
if (solrQuery.length() > 0) {
solrQuery.append(AND_STR);
}
solrQuery.append("v.\"").append(Constants.STATE_PROPERTY_KEY).append("\":ACTIVE");
}
}
protected void constructFilterQuery(StringBuilder solrQuery, AtlasStructType type, FilterCriteria filterCriteria, Set<String> solrAttributes) {
......@@ -200,14 +217,6 @@ public abstract class SearchProcessor {
solrQuery.append(filterQuery);
}
}
if (type instanceof AtlasEntityType && context.getSearchParameters().getExcludeDeletedEntities()) {
if (solrQuery.length() > 0) {
solrQuery.append(AND_STR);
}
solrQuery.append("v.\"").append(Constants.STATE_PROPERTY_KEY).append("\":ACTIVE");
}
}
private String toSolrQuery(AtlasStructType type, FilterCriteria criteria, Set<String> solrAttributes, int level) {
......@@ -246,15 +255,10 @@ public abstract class SearchProcessor {
String ret = EMPTY_STRING;
try {
String qualifiedName = type.getQualifiedAttributeName(attrName);
if (OPERATOR_MAP.get(op) != null) {
if (hasOffendingChars(attrVal)) {
// FIXME: if attrVal has offending chars & op is contains, endsWith, startsWith, solr doesn't like it and results are skewed
ret = String.format(OPERATOR_MAP.get(op), qualifiedName, "\"" + attrVal + "\"");
} else {
ret = String.format(OPERATOR_MAP.get(op), qualifiedName, attrVal);
}
String qualifiedName = type.getQualifiedAttributeName(attrName);
ret = String.format(OPERATOR_MAP.get(op), qualifiedName, escapeIndexQueryValue(attrVal));
}
} catch (AtlasBaseException ex) {
LOG.warn(ex.getMessage());
......@@ -348,32 +352,28 @@ public abstract class SearchProcessor {
private String getLikeRegex(String attributeValue) { return ".*" + attributeValue + ".*"; }
protected List<AtlasVertex> getVerticesFromIndexQueryResult(Iterator<AtlasIndexQuery.Result> idxQueryResult) {
List<AtlasVertex> ret = new ArrayList<>();
protected List<AtlasVertex> getVerticesFromIndexQueryResult(Iterator<AtlasIndexQuery.Result> idxQueryResult, List<AtlasVertex> vertices) {
if (idxQueryResult != null) {
while (idxQueryResult.hasNext()) {
AtlasVertex vertex = idxQueryResult.next().getVertex();
ret.add(vertex);
vertices.add(vertex);
}
}
return ret;
return vertices;
}
protected List<AtlasVertex> getVertices(Iterator<AtlasVertex> vertices) {
List<AtlasVertex> ret = new ArrayList<>();
if (vertices != null) {
while (vertices.hasNext()) {
AtlasVertex vertex = vertices.next();
protected List<AtlasVertex> getVertices(Iterator<AtlasVertex> iterator, List<AtlasVertex> vertices) {
if (iterator != null) {
while (iterator.hasNext()) {
AtlasVertex vertex = iterator.next();
ret.add(vertex);
vertices.add(vertex);
}
}
return ret;
return vertices;
}
protected Set<String> getGuids(List<AtlasVertex> vertices) {
......@@ -402,7 +402,24 @@ public abstract class SearchProcessor {
return defaultValue;
}
private boolean hasOffendingChars(String str) {
return StringUtils.containsAny(str, OFFENDING_CHARS);
private String escapeIndexQueryValue(String value) {
    // Values containing characters that confuse the index query parser
    // (see OFFENDING_CHARS) are wrapped in double-quotes. A quote already
    // present at the start or end of the value is kept as-is, so an
    // already-quoted value is returned unchanged.
    // NOTE(review): embedded (non-boundary) double-quotes are not escaped
    // here — presumably callers never pass them; confirm against usage.
    if (!StringUtils.containsAny(value, OFFENDING_CHARS)) {
        return value;
    }

    final StringBuilder quoted = new StringBuilder(value.length() + 2);

    if (value.charAt(0) != DOUBLE_QUOTE) {
        quoted.append(DOUBLE_QUOTE);
    }

    quoted.append(value);

    if (value.charAt(value.length() - 1) != DOUBLE_QUOTE) {
        quoted.append(DOUBLE_QUOTE);
    }

    return quoted.toString();
}
}
......@@ -103,6 +103,10 @@ public class AtlasGraphUtilsV1 {
}
}
public static boolean isEntityVertex(AtlasVertex vertex) {
    // A vertex is considered an entity vertex only when it carries both a
    // non-empty guid and a non-empty type-name. Short-circuits on the guid
    // check, matching the original evaluation order.
    if (StringUtils.isEmpty(getIdFromVertex(vertex))) {
        return false;
    }

    return StringUtils.isNotEmpty(getTypeName(vertex));
}
public static boolean isReference(AtlasType type) {
return isReference(type.getTypeCategory());
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment