Commit b243a363 by apoorvnaik

ATLAS-2597: Batched metric query execution

Change-Id: I4d1e5fa006b7fe459cf66daaaa9410d858c3c47e
parent 5cfb0228
......@@ -24,10 +24,8 @@ import org.apache.atlas.model.metrics.AtlasMetrics;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.util.AtlasGremlinQueryProvider;
import org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -37,6 +35,10 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery;
import static org.apache.atlas.util.AtlasGremlinQueryProvider.INSTANCE;
@AtlasService
public class MetricsService {
......@@ -62,9 +64,12 @@ public class MetricsService {
protected static final String METRIC_TAG_COUNT = TAG + "Count";
protected static final String METRIC_ENTITIES_PER_TAG = TAG + "Entities";
public static final String METRIC_QUERY_PREFIX = "atlas.metric.query.";
public static final String METRIC_QUERY_CACHE_TTL = "atlas.metric.query.cache.ttlInSecs";
public static final int DEFAULT_CACHE_TTL_IN_SECS = 900;
public static final String METRIC_QUERY_PREFIX = "atlas.metric.query.";
public static final String METRIC_QUERY_CACHE_TTL = "atlas.metric.query.cache.ttlInSecs";
public static final String METRIC_QUERY_GREMLIN_TYPES_BATCH_SIZE = "atlas.metric.query.gremlin.typesBatchSize";
public static final int DEFAULT_CACHE_TTL_IN_SECS = 900;
public static final int DEFAULT_GREMLIN_BATCH_SIZE = 25;
public static final String METRIC_COLLECTION_TIME = "collectionTime";
......@@ -74,6 +79,7 @@ public class MetricsService {
private final AtlasGraph atlasGraph;
private final AtlasTypeRegistry typeRegistry;
private final int cacheTTLInSecs;
private final int gremlinBatchSize;
private AtlasMetrics cachedMetrics = null;
private long cacheExpirationTime = 0;
......@@ -81,7 +87,7 @@ public class MetricsService {
@Inject
public MetricsService(final Configuration configuration, final AtlasGraph graph, final AtlasTypeRegistry typeRegistry) {
this(configuration, graph, typeRegistry, AtlasGremlinQueryProvider.INSTANCE);
this(configuration, graph, typeRegistry, INSTANCE);
}
@VisibleForTesting
......@@ -90,6 +96,8 @@ public class MetricsService {
atlasGraph = graph;
cacheTTLInSecs = configuration != null ? configuration.getInt(METRIC_QUERY_CACHE_TTL, DEFAULT_CACHE_TTL_IN_SECS)
: DEFAULT_CACHE_TTL_IN_SECS;
gremlinBatchSize = configuration != null ? configuration.getInt(METRIC_QUERY_GREMLIN_TYPES_BATCH_SIZE, DEFAULT_GREMLIN_BATCH_SIZE)
: DEFAULT_GREMLIN_BATCH_SIZE;
gremlinQueryProvider = queryProvider;
this.typeRegistry = typeRegistry;
}
......@@ -109,36 +117,41 @@ public class MetricsService {
int tagCount = 0;
Collection<String> classificationDefNames = typeRegistry.getAllClassificationDefNames()
.stream()
.map(x -> "'" + x + "'")
.collect(Collectors.toSet());
String classificationNamesCSV = StringUtils.join(classificationDefNames, ',');
Map<String, Number> activeCountMap = new HashMap<>();
Map<String, Number> deletedCountMap = new HashMap<>();
List<String> classificationDefNames = typeRegistry.getAllClassificationDefNames()
.stream()
.map(x -> "'" + x + "'")
.collect(Collectors.toList());
if (CollectionUtils.isNotEmpty(classificationDefNames)) {
tagCount = classificationDefNames.size();
}
metrics.addMetric(GENERAL, METRIC_TAG_COUNT, tagCount);
Collection<String> entityDefNames = typeRegistry.getAllEntityDefNames()
.stream()
.map(x -> "'" + x + "'")
.collect(Collectors.toList());
String entityNamesCSV = StringUtils.join(entityDefNames, ',');
String entityAndClassificationNamesCSV = entityNamesCSV + "," + classificationNamesCSV;
IntStream
.range(0, (classificationDefNames.size() + gremlinBatchSize - 1) / gremlinBatchSize)
.mapToObj(i -> classificationDefNames.subList(i * gremlinBatchSize, Math.min(classificationDefNames.size(), (i + 1) * gremlinBatchSize)))
.forEach(batch -> captureCounts(batch, activeCountMap, deletedCountMap));
String query = String.format(gremlinQueryProvider.getQuery(AtlasGremlinQuery.ENTITY_ACTIVE_METRIC), entityAndClassificationNamesCSV);
Map<String, Number> activeCountMap = extractCounts(query);
query = String.format(gremlinQueryProvider.getQuery(AtlasGremlinQuery.ENTITY_DELETED_METRIC), entityAndClassificationNamesCSV);
Map<String, Number> deletedCountMap = extractCounts(query);
List<String> entityDefNames = typeRegistry.getAllEntityDefNames()
.stream()
.map(x -> "'" + x + "'")
.collect(Collectors.toList());
IntStream
.range(0, (entityDefNames.size() + gremlinBatchSize - 1) / gremlinBatchSize)
.mapToObj(i -> entityDefNames.subList(i * gremlinBatchSize, Math.min(entityDefNames.size(), (i + 1) * gremlinBatchSize)))
.forEach(batch -> captureCounts(batch, activeCountMap, deletedCountMap));
int totalEntities = 0;
Map<String, Number> activeEntityCount = new HashMap<>();
Map<String, Number> activeEntityCount = new HashMap<>();
Map<String, Number> deletedEntityCount = new HashMap<>();
for (String entityDefName : typeRegistry.getAllEntityDefNames()) {
Number activeCount = activeCountMap.getOrDefault(entityDefName, null);
Number activeCount = activeCountMap.getOrDefault(entityDefName, null);
Number deletedCount = deletedCountMap.getOrDefault(entityDefName, null);
if (activeCount != null) {
......@@ -181,6 +194,16 @@ public class MetricsService {
return cachedMetrics;
}
private void captureCounts(List<String> typeNames, Map<String, Number> activeCountMap, Map<String, Number> deletedCountMap) {
String typeNamesAsStr = String.join(",", typeNames);
String query = String.format(gremlinQueryProvider.getQuery(AtlasGremlinQuery.ENTITY_ACTIVE_METRIC), typeNamesAsStr);
activeCountMap.putAll(extractCounts(query));
query = String.format(gremlinQueryProvider.getQuery(AtlasGremlinQuery.ENTITY_DELETED_METRIC), typeNamesAsStr);
deletedCountMap.putAll(extractCounts(query));
}
private Map<String, Number> extractCounts(final String query) {
Map<String, Number> ret = new HashMap<>();
try {
......
......@@ -43,10 +43,13 @@ import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
public class MetricsServiceTest {
private Configuration mockConfig = mock(Configuration.class);
private AtlasTypeRegistry mockTypeRegistry = mock(AtlasTypeRegistry.class);
private AtlasGraph mockGraph = mock(AtlasGraph.class);
private Configuration mockConfig = mock(Configuration.class);
private Configuration mockConfig1 = mock(Configuration.class);
private AtlasTypeRegistry mockTypeRegistry = mock(AtlasTypeRegistry.class);
private AtlasGraph mockGraph = mock(AtlasGraph.class);
private AtlasGraph mockGraph1 = mock(AtlasGraph.class);
private MetricsService metricsService;
private MetricsService metricsService1;
private List<Map> activeEntityCountList = new ArrayList<>();
private List<Map> deletedEntityCountList = new ArrayList<>();
......@@ -66,16 +69,18 @@ public class MetricsServiceTest {
deletedEntityCount.put("b", 5);
deletedEntityCountList.add(deletedEntityCount);
when(mockConfig.getInt(anyString(), anyInt())).thenReturn(5);
assertEquals(mockConfig.getInt("test", 1), 5);
when(mockConfig.getString(anyString(), anyString())).thenReturn("toList()", "toList()");
when(mockConfig.getInt(eq(MetricsService.METRIC_QUERY_GREMLIN_TYPES_BATCH_SIZE), anyInt())).thenReturn(25);
when(mockConfig.getInt(eq(MetricsService.METRIC_QUERY_CACHE_TTL), anyInt())).thenReturn(900);
when(mockConfig1.getInt(eq(MetricsService.METRIC_QUERY_GREMLIN_TYPES_BATCH_SIZE), anyInt())).thenReturn(2);
when(mockTypeRegistry.getAllTypeNames()).thenReturn(Arrays.asList("a", "b", "c", "d", "e", "f"));
when(mockTypeRegistry.getAllEntityDefNames()).thenReturn(Arrays.asList("a", "b", "c"));
when(mockTypeRegistry.getAllClassificationDefNames()).thenReturn(Arrays.asList("d", "e", "f"));
// when(mockTypeRegistry.getAllEntityDefNames()).thenReturn(Arrays.asList("a", "b", "c"));
setupMockGraph();
metricsService = new MetricsService(mockConfig, mockGraph, mockTypeRegistry, new AtlasGremlin3QueryProvider());
metricsService1 = new MetricsService(mockConfig1, mockGraph1, mockTypeRegistry, new AtlasGremlin3QueryProvider());
}
catch(Exception e) {
throw new SkipException("MetricsServicesTest: init failed!", e);
......@@ -91,9 +96,21 @@ public class MetricsServiceTest {
private void setupMockGraph() throws AtlasBaseException {
if (mockGraph == null) mockGraph = mock(AtlasGraph.class);
when(mockGraph.executeGremlinScript(anyString(), eq(false)))
.thenReturn(activeEntityCountList)
.thenReturn(deletedEntityCountList);
if (mockGraph1 == null) mockGraph1 = mock(AtlasGraph.class);
when(mockGraph.executeGremlinScript(anyString(), eq(false))).thenAnswer(invocationOnMock -> {
if (((String)invocationOnMock.getArguments()[0]).contains("ACTIVE")) {
return activeEntityCountList;
} else {
return deletedEntityCountList;
}
});
when(mockGraph1.executeGremlinScript(anyString(), eq(false))).thenAnswer(invocationOnMock -> {
if (((String)invocationOnMock.getArguments()[0]).contains("ACTIVE")) {
return activeEntityCountList;
} else {
return deletedEntityCountList;
}
});
}
@Test
......@@ -124,7 +141,14 @@ public class MetricsServiceTest {
assertEquals(taggedEntityMetric.get("e"), 10);
assertEquals(taggedEntityMetric.get("f"), 15);
verify(mockGraph, atLeastOnce()).executeGremlinScript(anyString(), anyBoolean());
// 2 calls for entity types and 2 calls for classification types
verify(mockGraph, times(4)).executeGremlinScript(anyString(), anyBoolean());
// Test batched calls
metricsService1.getMetrics(false);
// 3 classifications, 3 entity types & batch size = 2 and 2 calls per batch, total batches = 4, total calls = 8
// 2 for entity and 2 for classification
verify(mockGraph1, times(8)).executeGremlinScript(anyString(), anyBoolean());
// Subsequent call within the cache timeout window
metricsService.getMetrics(false);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment