Commit fae00825 by apoorvnaik Committed by Madhan Neethiraj

ATLAS-2044: In-memory filtering for correctness after index query

parent fa11d13a
...@@ -87,6 +87,8 @@ public class ClassificationSearchProcessor extends SearchProcessor { ...@@ -87,6 +87,8 @@ public class ClassificationSearchProcessor extends SearchProcessor {
indexQueryString = STRAY_ELIPSIS_PATTERN.matcher(indexQueryString).replaceAll(""); indexQueryString = STRAY_ELIPSIS_PATTERN.matcher(indexQueryString).replaceAll("");
this.indexQuery = graph.indexQuery(Constants.VERTEX_INDEX, indexQueryString); this.indexQuery = graph.indexQuery(Constants.VERTEX_INDEX, indexQueryString);
constructInMemoryPredicate(classificationType, filterCriteria, indexAttributes);
} else { } else {
indexQuery = null; indexQuery = null;
} }
...@@ -184,6 +186,9 @@ public class ClassificationSearchProcessor extends SearchProcessor { ...@@ -184,6 +186,9 @@ public class ClassificationSearchProcessor extends SearchProcessor {
getVertices(queryResult, classificationVertices); getVertices(queryResult, classificationVertices);
} }
// Do in-memory filtering before the graph query
CollectionUtils.filter(classificationVertices, inMemoryPredicate);
for (AtlasVertex classificationVertex : classificationVertices) { for (AtlasVertex classificationVertex : classificationVertices) {
Iterable<AtlasEdge> edges = classificationVertex.getEdges(AtlasEdgeDirection.IN); Iterable<AtlasEdge> edges = classificationVertex.getEdges(AtlasEdgeDirection.IN);
...@@ -208,19 +213,7 @@ public class ClassificationSearchProcessor extends SearchProcessor { ...@@ -208,19 +213,7 @@ public class ClassificationSearchProcessor extends SearchProcessor {
super.filter(entityVertices); super.filter(entityVertices);
for (AtlasVertex entityVertex : entityVertices) { resultIdx = collectResultVertices(ret, startIdx, limit, resultIdx, entityVertices);
resultIdx++;
if (resultIdx <= startIdx) {
continue;
}
ret.add(entityVertex);
if (ret.size() == limit) {
break;
}
}
} }
} finally { } finally {
AtlasPerfTracer.log(perf); AtlasPerfTracer.log(perf);
......
...@@ -71,6 +71,8 @@ public class EntitySearchProcessor extends SearchProcessor { ...@@ -71,6 +71,8 @@ public class EntitySearchProcessor extends SearchProcessor {
if (attrSearchByIndex) { if (attrSearchByIndex) {
constructFilterQuery(indexQuery, entityType, filterCriteria, indexAttributes); constructFilterQuery(indexQuery, entityType, filterCriteria, indexAttributes);
constructInMemoryPredicate(entityType, filterCriteria, indexAttributes);
} else { } else {
graphAttributes.addAll(indexAttributes); graphAttributes.addAll(indexAttributes);
} }
...@@ -165,11 +167,10 @@ public class EntitySearchProcessor extends SearchProcessor { ...@@ -165,11 +167,10 @@ public class EntitySearchProcessor extends SearchProcessor {
break; break;
} }
while (idxQueryResult.hasNext()) { getVerticesFromIndexQueryResult(idxQueryResult, entityVertices);
AtlasVertex vertex = idxQueryResult.next().getVertex();
entityVertices.add(vertex); // Do in-memory filtering before the graph query
} CollectionUtils.filter(entityVertices, inMemoryPredicate);
if (graphQuery != null) { if (graphQuery != null) {
AtlasGraphQuery guidQuery = context.getGraph().query().in(Constants.GUID_PROPERTY_KEY, getGuids(entityVertices)); AtlasGraphQuery guidQuery = context.getGraph().query().in(Constants.GUID_PROPERTY_KEY, getGuids(entityVertices));
...@@ -191,19 +192,7 @@ public class EntitySearchProcessor extends SearchProcessor { ...@@ -191,19 +192,7 @@ public class EntitySearchProcessor extends SearchProcessor {
super.filter(entityVertices); super.filter(entityVertices);
for (AtlasVertex entityVertex : entityVertices) { resultIdx = collectResultVertices(ret, startIdx, limit, resultIdx, entityVertices);
resultIdx++;
if (resultIdx <= startIdx) {
continue;
}
ret.add(entityVertex);
if (ret.size() == limit) {
break;
}
}
} }
} finally { } finally {
AtlasPerfTracer.log(perf); AtlasPerfTracer.log(perf);
......
...@@ -141,19 +141,7 @@ public class FullTextSearchProcessor extends SearchProcessor { ...@@ -141,19 +141,7 @@ public class FullTextSearchProcessor extends SearchProcessor {
super.filter(entityVertices); super.filter(entityVertices);
for (AtlasVertex entityVertex : entityVertices) { resultIdx = collectResultVertices(ret, startIdx, limit, resultIdx, entityVertices);
resultIdx++;
if (resultIdx <= startIdx) {
continue;
}
ret.add(entityVertex);
if (ret.size() == limit) {
break;
}
}
} }
} finally { } finally {
AtlasPerfTracer.log(perf); AtlasPerfTracer.log(perf);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment