Commit f1bfe564 by Madhan Neethiraj

ATLAS-2723: search performance improvement: create composite index for…

ATLAS-2723: search performance improvement: create composite index for __traitNames with typeName and superTypeNames
parent bb2e5737
......@@ -25,6 +25,7 @@ import org.janusgraph.core.JanusGraphQuery;
import org.janusgraph.core.JanusGraphVertex;
import org.janusgraph.core.attribute.Contain;
import org.janusgraph.core.attribute.Text;
import org.janusgraph.graphdb.internal.ElementCategory;
import org.janusgraph.graphdb.query.JanusGraphPredicate;
import org.apache.atlas.repository.graphdb.AtlasEdge;
import org.apache.atlas.repository.graphdb.AtlasGraphQuery.ComparisionOperator;
......@@ -38,6 +39,9 @@ import org.apache.atlas.repository.graphdb.janus.AtlasJanusGraphDatabase;
import org.apache.atlas.repository.graphdb.janus.AtlasJanusVertex;
import org.apache.tinkerpop.gremlin.process.traversal.Compare;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.janusgraph.graphdb.query.graph.GraphCentricQueryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
......@@ -45,6 +49,7 @@ import java.util.*;
* Janus implementation of NativeTinkerpopGraphQuery.
*/
public class NativeJanusGraphQuery implements NativeTinkerpopGraphQuery<AtlasJanusVertex, AtlasJanusEdge> {
private static final Logger LOG = LoggerFactory.getLogger(NativeJanusGraphQuery.class);
private AtlasJanusGraph graph;
private JanusGraphQuery<?> query;
......@@ -69,20 +74,41 @@ public class NativeJanusGraphQuery implements NativeTinkerpopGraphQuery<AtlasJan
@Override
public Iterable<AtlasEdge<AtlasJanusVertex, AtlasJanusEdge>> edges(int limit) {
Iterable<JanusGraphEdge> it = query.limit(limit).edges();
if (LOG.isDebugEnabled()) {
if (query instanceof GraphCentricQueryBuilder) {
LOG.debug("NativeJanusGraphQuery.vertices({}): resultSize={}, {}", limit, getCountForDebugLog(it), ((GraphCentricQueryBuilder) query).constructQuery(ElementCategory.EDGE));
} else {
LOG.debug("NativeJanusGraphQuery.vertices({}): resultSize={}, {}", limit, getCountForDebugLog(it), query);
}
}
return graph.wrapEdges(it);
}
@Override
public Iterable<AtlasEdge<AtlasJanusVertex, AtlasJanusEdge>> edges(int offset, int limit) {
List<Edge> result = new ArrayList<>(limit);
Iterator<? extends Edge> iter = query.limit(offset + limit).edges().iterator();
Iterable<? extends Edge> it = query.limit(offset + limit).edges();
if (LOG.isDebugEnabled()) {
if (query instanceof GraphCentricQueryBuilder) {
LOG.debug("NativeJanusGraphQuery.vertices({}, {}): resultSize={}, {}", offset, limit, getCountForDebugLog(it), ((GraphCentricQueryBuilder) query).constructQuery(ElementCategory.EDGE));
} else {
LOG.debug("NativeJanusGraphQuery.vertices({}, {}): resultSize={}, {}", offset, limit, getCountForDebugLog(it), query);
}
}
Iterator<? extends Edge> iter = it.iterator();
for (long resultIdx = 0; iter.hasNext() && result.size() < limit; resultIdx++) {
Edge e = iter.next();
if (resultIdx < offset) {
continue;
}
result.add(iter.next());
result.add(e);
}
return graph.wrapEdges(result);
......@@ -91,20 +117,41 @@ public class NativeJanusGraphQuery implements NativeTinkerpopGraphQuery<AtlasJan
@Override
public Iterable<AtlasVertex<AtlasJanusVertex, AtlasJanusEdge>> vertices(int limit) {
Iterable<JanusGraphVertex> it = query.limit(limit).vertices();
if (LOG.isDebugEnabled()) {
if (query instanceof GraphCentricQueryBuilder) {
LOG.debug("NativeJanusGraphQuery.vertices({}): resultSize={}, {}", limit, getCountForDebugLog(it), ((GraphCentricQueryBuilder) query).constructQuery(ElementCategory.VERTEX));
} else {
LOG.debug("NativeJanusGraphQuery.vertices({}): resultSize={}, {}", limit, getCountForDebugLog(it), query);
}
}
return graph.wrapVertices(it);
}
@Override
public Iterable<AtlasVertex<AtlasJanusVertex, AtlasJanusEdge>> vertices(int offset, int limit) {
List<Vertex> result = new ArrayList<>(limit);
Iterator<? extends Vertex> iter = query.limit(offset + limit).vertices().iterator();
Iterable<JanusGraphVertex> it = query.limit(offset + limit).vertices();
if (LOG.isDebugEnabled()) {
if (query instanceof GraphCentricQueryBuilder) {
LOG.debug("NativeJanusGraphQuery.vertices({}, {}): resultSize={}, {}", offset, limit, getCountForDebugLog(it), ((GraphCentricQueryBuilder) query).constructQuery(ElementCategory.VERTEX));
} else {
LOG.debug("NativeJanusGraphQuery.vertices({}, {}): resultSize={}, {}", offset, limit, getCountForDebugLog(it), query);
}
}
Iterator<? extends Vertex> iter = it.iterator();
for (long resultIdx = 0; iter.hasNext() && result.size() < limit; resultIdx++) {
Vertex v = iter.next();
if (resultIdx < offset) {
continue;
}
result.add(iter.next());
result.add(v);
}
return graph.wrapVertices(result);
......@@ -169,4 +216,17 @@ public class NativeJanusGraphQuery implements NativeTinkerpopGraphQuery<AtlasJan
}
}
private int getCountForDebugLog(Iterable it) {
int ret = 0;
if (LOG.isDebugEnabled()) {
if (it != null) {
for (Iterator iter = it.iterator(); iter.hasNext(); iter.next()) {
ret++;
}
}
}
return ret;
}
}
......@@ -224,15 +224,15 @@ public class ClassificationSearchProcessor extends SearchProcessor {
break;
}
final boolean isLastResultPage;
if (indexQuery != null) {
Iterator<AtlasIndexQuery.Result> queryResult = indexQuery.vertices(qryOffset, limit);
if (!queryResult.hasNext()) { // no more results from index query - end of search
break;
}
getVerticesFromIndexQueryResult(queryResult, classificationVertices);
isLastResultPage = classificationVertices.size() < limit;
// Do in-memory filtering before the graph query
CollectionUtils.filter(classificationVertices, inMemoryPredicate);
} else {
......@@ -240,20 +240,16 @@ public class ClassificationSearchProcessor extends SearchProcessor {
// We can use single graph query to determine in this case
Iterator<AtlasVertex> queryResult = entityGraphQueryTraitNames.vertices(qryOffset, limit).iterator();
if (!queryResult.hasNext()) { // no more results - end of search
break;
}
getVertices(queryResult, entityVertices);
isLastResultPage = entityVertices.size() < limit;
} else {
Iterator<AtlasVertex> queryResult = tagGraphQueryWithAttributes.vertices(qryOffset, limit).iterator();
if (!queryResult.hasNext()) { // no more results - end of search
break;
}
getVertices(queryResult, classificationVertices);
isLastResultPage = classificationVertices.size() < limit;
// Do in-memory filtering before the graph query
CollectionUtils.filter(classificationVertices, inMemoryPredicate);
}
......@@ -288,6 +284,10 @@ public class ClassificationSearchProcessor extends SearchProcessor {
super.filter(entityVertices);
resultIdx = collectResultVertices(ret, startIdx, limit, resultIdx, entityVertices);
if (isLastResultPage) {
break;
}
}
} finally {
AtlasPerfTracer.log(perf);
......
......@@ -254,15 +254,15 @@ public class EntitySearchProcessor extends SearchProcessor {
break;
}
final boolean isLastResultPage;
if (indexQuery != null) {
Iterator<AtlasIndexQuery.Result> idxQueryResult = indexQuery.vertices(qryOffset, limit);
if (!idxQueryResult.hasNext()) { // no more results from index query - end of search
break;
}
getVerticesFromIndexQueryResult(idxQueryResult, entityVertices);
isLastResultPage = entityVertices.size() < limit;
// Do in-memory filtering before the graph query
CollectionUtils.filter(entityVertices, inMemoryPredicate);
......@@ -272,16 +272,18 @@ public class EntitySearchProcessor extends SearchProcessor {
} else {
Iterator<AtlasVertex> queryResult = graphQuery.vertices(qryOffset, limit).iterator();
if (!queryResult.hasNext()) { // no more results from query - end of search
break;
}
getVertices(queryResult, entityVertices);
isLastResultPage = entityVertices.size() < limit;
}
super.filter(entityVertices);
resultIdx = collectResultVertices(ret, startIdx, limit, resultIdx, entityVertices);
if (isLastResultPage) {
break;
}
}
} finally {
AtlasPerfTracer.log(perf);
......
......@@ -116,13 +116,14 @@ public class FullTextSearchProcessor extends SearchProcessor {
Iterator<AtlasIndexQuery.Result> idxQueryResult = indexQuery.vertices(qryOffset, limit);
if (!idxQueryResult.hasNext()) { // no more results from solr - end of search
break;
}
final boolean isLastResultPage;
int resultCount = 0;
while (idxQueryResult.hasNext()) {
AtlasVertex vertex = idxQueryResult.next().getVertex();
resultCount++;
// skip non-entity vertices
if (!AtlasGraphUtilsV2.isEntityVertex(vertex)) {
if (LOG.isDebugEnabled()) {
......@@ -139,9 +140,15 @@ public class FullTextSearchProcessor extends SearchProcessor {
entityVertices.add(vertex);
}
isLastResultPage = resultCount < limit;
super.filter(entityVertices);
resultIdx = collectResultVertices(ret, startIdx, limit, resultIdx, entityVertices);
if (isLastResultPage) {
break;
}
}
} finally {
AtlasPerfTracer.log(perf);
......
......@@ -260,19 +260,19 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang
}
// create vertex indexes
createVertexIndex(management, GUID_PROPERTY_KEY, String.class, true, SINGLE, true, true);
createVertexIndex(management, GUID_PROPERTY_KEY, String.class, true, SINGLE, true, false);
createVertexIndex(management, ENTITY_TYPE_PROPERTY_KEY, String.class, false, SINGLE, true, false);
createVertexIndex(management, SUPER_TYPES_PROPERTY_KEY, String.class, false, SET, true, false);
createVertexIndex(management, TIMESTAMP_PROPERTY_KEY, Long.class, false, SINGLE, false, false);
createVertexIndex(management, MODIFICATION_TIMESTAMP_PROPERTY_KEY, Long.class, false, SINGLE, false, false);
createVertexIndex(management, STATE_PROPERTY_KEY, String.class, false, SINGLE, false, false);
createVertexIndex(management, CREATED_BY_KEY, String.class, false, SINGLE, true, true);
createVertexIndex(management, MODIFIED_BY_KEY, String.class, false, SINGLE, true, true);
createVertexIndex(management, ENTITY_TYPE_PROPERTY_KEY, String.class, false, SINGLE, true, true);
createVertexIndex(management, SUPER_TYPES_PROPERTY_KEY, String.class, false, SET, true, true);
createVertexIndex(management, TRAIT_NAMES_PROPERTY_KEY, String.class, false, SET, true, true);
createVertexIndex(management, PROPAGATED_TRAIT_NAMES_PROPERTY_KEY, String.class, false, LIST, true, true);
createVertexIndex(management, TYPENAME_PROPERTY_KEY, String.class, true, SINGLE, true, true);
createVertexIndex(management, TYPENAME_PROPERTY_KEY, String.class, true, SINGLE, true, false);
createVertexIndex(management, VERTEX_TYPE_PROPERTY_KEY, String.class, false, SINGLE, true, true);
createVertexIndex(management, CLASSIFICATION_ENTITY_GUID, String.class, false, SINGLE, true, true);
createVertexIndex(management, CLASSIFICATION_ENTITY_GUID, String.class, false, SINGLE, true, false);
createVertexIndex(management, VERTEX_ID_IN_IMPORT_KEY, Long.class, false, SINGLE, true, false);
// create vertex-centric index
......@@ -499,8 +499,9 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang
if (propertyKey != null) {
if (createCompositeIndex) {
createVertexCompositeIndex(management, propertyClass, propertyKey, isUnique);
}
} else if (createCompositeIndexWithTypeAndSuperTypes) {
if (createCompositeIndexWithTypeAndSuperTypes) {
createVertexCompositeIndexWithTypeName(management, propertyClass, propertyKey);
createVertexCompositeIndexWithSuperTypeName(management, propertyClass, propertyKey);
}
......
......@@ -316,8 +316,8 @@ public class AtlasGraphUtilsV2 {
public static AtlasVertex findByTypeAndPropertyName(String typeName, String propertyName, Object attrVal) {
AtlasGraphQuery query = AtlasGraphProvider.getGraphInstance().query()
.has(Constants.ENTITY_TYPE_PROPERTY_KEY, typeName)
.has(Constants.STATE_PROPERTY_KEY, AtlasEntity.Status.ACTIVE.name())
.has(propertyName, attrVal);
.has(propertyName, attrVal)
.has(Constants.STATE_PROPERTY_KEY, AtlasEntity.Status.ACTIVE.name());
Iterator<AtlasVertex> results = query.vertices().iterator();
......@@ -329,8 +329,8 @@ public class AtlasGraphUtilsV2 {
public static AtlasVertex findBySuperTypeAndPropertyName(String typeName, String propertyName, Object attrVal) {
AtlasGraphQuery query = AtlasGraphProvider.getGraphInstance().query()
.has(Constants.SUPER_TYPES_PROPERTY_KEY, typeName)
.has(Constants.STATE_PROPERTY_KEY, AtlasEntity.Status.ACTIVE.name())
.has(propertyName, attrVal);
.has(propertyName, attrVal)
.has(Constants.STATE_PROPERTY_KEY, AtlasEntity.Status.ACTIVE.name());
Iterator<AtlasVertex> results = query.vertices().iterator();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment