Commit 92d02817 by apoorvnaik Committed by Madhan Neethiraj

ATLAS-1436: Metrics caching and UTs (Part 2)

parent bf377abb
...@@ -207,6 +207,7 @@ atlas.feature.taxonomy.enable=true ...@@ -207,6 +207,7 @@ atlas.feature.taxonomy.enable=true
############ Atlas Metric/Stats configs ################ ############ Atlas Metric/Stats configs ################
# Format: atlas.metric.query.<key>.<name> # Format: atlas.metric.query.<key>.<name>
atlas.metric.query.cache.ttlInSecs=900
#atlas.metric.query.general.typeCount= #atlas.metric.query.general.typeCount=
#atlas.metric.query.general.typeUnusedCount= #atlas.metric.query.general.typeUnusedCount=
#atlas.metric.query.general.entityCount= #atlas.metric.query.general.entityCount=
......
...@@ -61,7 +61,7 @@ public class AtlasMetrics { ...@@ -61,7 +61,7 @@ public class AtlasMetrics {
} }
@JsonIgnore @JsonIgnore
public void addData(String groupKey, String key, Integer value) { public void addData(String groupKey, String key, Number value) {
Map<String, Map<String, Number>> data = this.data; Map<String, Map<String, Number>> data = this.data;
if (data == null) { if (data == null) {
data = new HashMap<>(); data = new HashMap<>();
......
...@@ -17,18 +17,21 @@ ...@@ -17,18 +17,21 @@
*/ */
package org.apache.atlas.services; package org.apache.atlas.services;
import com.google.common.annotations.VisibleForTesting;
import com.google.inject.Singleton; import com.google.inject.Singleton;
import org.apache.atlas.ApplicationProperties; import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasException; import org.apache.atlas.AtlasException;
import org.apache.atlas.model.metrics.AtlasMetrics; import org.apache.atlas.model.metrics.AtlasMetrics;
import org.apache.atlas.repository.graph.AtlasGraphProvider; import org.apache.atlas.repository.graph.AtlasGraphProvider;
import org.apache.atlas.repository.graphdb.AtlasGraph; import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import javax.inject.Inject;
import javax.script.ScriptException; import javax.script.ScriptException;
import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
...@@ -37,6 +40,8 @@ public class MetricsService { ...@@ -37,6 +40,8 @@ public class MetricsService {
private static final Logger LOG = LoggerFactory.getLogger(MetricsService.class); private static final Logger LOG = LoggerFactory.getLogger(MetricsService.class);
public static final String METRIC_QUERY_PREFIX = "atlas.metric.query."; public static final String METRIC_QUERY_PREFIX = "atlas.metric.query.";
public static final String METRIC_QUERY_CACHE_TTL = "atlas.metric.query.cache.ttlInSecs";
public static final int DEFAULT_CACHE_TTL_IN_SECS = 900;
public static final String TYPE = "type"; public static final String TYPE = "type";
public static final String ENTITY = "entity"; public static final String ENTITY = "entity";
...@@ -55,35 +60,107 @@ public class MetricsService { ...@@ -55,35 +60,107 @@ public class MetricsService {
public static final String METRIC_TAG_COUNT = TAG + "Count"; public static final String METRIC_TAG_COUNT = TAG + "Count";
public static final String METRIC_ENTITIES_PER_TAG = TAG + "Entities"; public static final String METRIC_ENTITIES_PER_TAG = TAG + "Entities";
private static AtlasGraph atlasGraph; public static final String METRIC_COLLECTION_TIME = "collectionTime";
private static Configuration configuration;
public MetricsService() throws AtlasException { private static Configuration configuration = null;
atlasGraph = AtlasGraphProvider.getGraphInstance(); private final AtlasGraph atlasGraph;
configuration = ApplicationProperties.get(); private final AtlasTypeRegistry atlasTypeRegistry;
private final int cacheTTLInSecs;
private AtlasMetrics cachedMetrics = null;
private long cacheExpirationTime = 0;
@Inject
public MetricsService(AtlasTypeRegistry typeRegistry) throws AtlasException {
this(ApplicationProperties.get(), AtlasGraphProvider.getGraphInstance(), typeRegistry);
}
@VisibleForTesting
MetricsService(Configuration configuration, AtlasGraph graph, AtlasTypeRegistry typeRegistry) {
MetricsService.configuration = configuration;
atlasTypeRegistry = typeRegistry;
atlasGraph = graph;
cacheTTLInSecs = configuration != null ? configuration.getInt(METRIC_QUERY_CACHE_TTL, DEFAULT_CACHE_TTL_IN_SECS)
: DEFAULT_CACHE_TTL_IN_SECS;
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public AtlasMetrics getMetrics() { public AtlasMetrics getMetrics() {
if (!isCacheValid()) {
AtlasMetrics metrics = new AtlasMetrics(); AtlasMetrics metrics = new AtlasMetrics();
for (MetricQuery metricQuery : MetricQuery.values()) { for (MetricQuery metricQuery : MetricQuery.values()) {
try { try {
Object result = atlasGraph.executeGremlinScript(metricQuery.query, false); if (LOG.isDebugEnabled()) {
LOG.debug("Executing query: {}", metricQuery);
}
if (metricQuery == MetricQuery.ENTITIES_PER_TYPE) {
Collection<String> entityDefNames = atlasTypeRegistry.getAllEntityDefNames();
for (String entityDefName : entityDefNames) {
String formattedQuery = String.format(metricQuery.query, entityDefName);
executeGremlinQuery(metrics, metricQuery.type, entityDefName, formattedQuery);
}
} else {
executeGremlinQuery(metrics, metricQuery.type, metricQuery.name, metricQuery.query);
}
} catch (ScriptException e) {
LOG.error("Gremlin execution failed for metric {}", metricQuery, e);
}
}
long collectionTime = System.currentTimeMillis();
metrics.addData(GENERAL, METRIC_COLLECTION_TIME, collectionTime);
this.cachedMetrics = metrics;
this.cacheExpirationTime = (collectionTime + cacheTTLInSecs * 1000);
}
return cachedMetrics;
}
private void executeGremlinQuery(AtlasMetrics metrics, String type, String name, String query) throws ScriptException {
Object result = atlasGraph.executeGremlinScript(query, false);
if (result instanceof Number) { if (result instanceof Number) {
metrics.addData(metricQuery.type, metricQuery.name, ((Number) result).intValue()); metrics.addData(type, name, ((Number) result).intValue());
} else if (result instanceof List) { } else if (result instanceof List) {
for (Map resultMap : (List<Map>) result) { for (Map resultMap : (List<Map>) result) {
metrics.addData(metricQuery.type, (String) resultMap.get("key"), ((Number) resultMap.get("value")).intValue()); metrics.addData(type, (String) resultMap.get("key"), ((Number) resultMap.get("value")).intValue());
} }
} else { } else {
LOG.warn("Unhandled return type {} for {}. Ignoring", result.getClass().getSimpleName(), metricQuery); String returnClassName = result != null ? result.getClass().getSimpleName() : "null";
LOG.warn("Unhandled return type {} for {}. Ignoring", returnClassName, query);
} }
} catch (ScriptException e) {
LOG.error("Gremlin execution failed for metric {}", metricQuery, e);
} }
private boolean isCacheValid() {
boolean valid = cachedMetrics != null && System.currentTimeMillis() < cacheExpirationTime;
if (LOG.isDebugEnabled()) {
LOG.debug("cachedMetrics: {}", cachedMetrics != null);
LOG.debug("cacheExpirationTime: {}", cacheExpirationTime);
LOG.debug("valid: {}", valid);
}
return valid;
}
private static String getQuery(String type, String name, String defaultQuery) {
String ret = configuration != null ? configuration.getString(METRIC_QUERY_PREFIX + type + "." + name, defaultQuery)
: defaultQuery;
if (LOG.isDebugEnabled()) {
LOG.debug("query for {}.{}: {}", type, name, ret);
} }
return metrics; return ret;
} }
/** /**
...@@ -92,35 +169,27 @@ public class MetricsService { ...@@ -92,35 +169,27 @@ public class MetricsService {
* The default behavior is to read from the properties and override the statically type query if the configured * The default behavior is to read from the properties and override the statically type query if the configured
* query is not blank/empty. * query is not blank/empty.
*/ */
enum MetricQuery { private enum MetricQuery {
TYPE_COUNT(GENERAL, METRIC_TYPE_COUNT, "g.V().has('__type', 'typeSystem').filter({it.'__type.category'.name() != 'TRAIT'}).count()"), TYPE_COUNT(GENERAL, METRIC_TYPE_COUNT, "g.V().has('__type', 'typeSystem').filter({it.'__type.category'.name() != 'TRAIT'}).count()"),
UNUSED_TYPE_COUNT(GENERAL, METRIC_TYPE_UNUSED_COUNT, "g.V('__type', 'typeSystem').filter({ it.'__type.category'.name() != 'TRAIT' && it.inE.count() == 0}).count()"), UNUSED_TYPE_COUNT(GENERAL, METRIC_TYPE_UNUSED_COUNT, "g.V('__type', 'typeSystem').filter({ it.'__type.category'.name() != 'TRAIT' && it.inE.count() == 0}).count()"),
ENTITY_COUNT(GENERAL, METRIC_ENTITY_COUNT, "g.V().has('__superTypeNames', T.in, ['Referenceable']).count()"), ENTITY_COUNT(GENERAL, METRIC_ENTITY_COUNT, "g.V().has('__superTypeNames', T.in, ['Referenceable']).count()"),
TAGS_COUNT(GENERAL, METRIC_TAG_COUNT, "g.V().has('__type', 'typeSystem').filter({it.'__type.category'.name() == 'TRAIT'}).count()"), TAGS_COUNT(GENERAL, METRIC_TAG_COUNT, "g.V().has('__type', 'typeSystem').filter({it.'__type.category'.name() == 'TRAIT'}).count()"),
DELETED_ENTITY_COUNT(GENERAL, METRIC_ENTITY_DELETED, "g.V().has('__superTypeNames', T.in, ['Referenceable']).has('__status', 'DELETED').count()"), DELETED_ENTITY_COUNT(GENERAL, METRIC_ENTITY_DELETED, "g.V().has('__superTypeNames', T.in, ['Referenceable']).has('__status', 'DELETED').count()"),
ENTITIES_PER_TYPE(ENTITY, METRIC_TYPE_ENTITIES, "g.V().has('__type', 'typeSystem').has('__type.name').filter({it.'__type.category'.name() != 'TRAIT'}).transform{[key: it.'__type.name', value: it.inE.count()]}.dedup().toList()"), ENTITIES_PER_TYPE(ENTITY, METRIC_TYPE_ENTITIES, "g.V().has('__typeName', T.in, ['%s']).count()"),
TAGGED_ENTITIES(ENTITY, METRIC_TAGGED_ENTITIES, "g.V().has('__superTypeNames', T.in, ['Referenceable']).has('__traitNames').count()"), TAGGED_ENTITIES(ENTITY, METRIC_TAGGED_ENTITIES, "g.V().has('__superTypeNames', T.in, ['Referenceable']).has('__traitNames').count()"),
TAGS_PER_ENTITY(TAG, METRIC_TAGS_PER_ENTITY, "g.V().has('__superTypeNames', T.in, ['Referenceable']).has('__traitNames').transform{[ key: it.'Referenceable.qualifiedName', value: it.'__traitNames'.size()]}.dedup().toList()"), TAGS_PER_ENTITY(TAG, METRIC_TAGS_PER_ENTITY, "g.V().has('__superTypeNames', T.in, ['Referenceable']).has('__traitNames').transform{[ key: it.'Referenceable.qualifiedName', value: it.'__traitNames'.size()]}.dedup().toList()"),
; ;
private String type;
private String name;
private String query;
private static String getQuery(String type, String name) { private final String type;
String metricQueryKey = METRIC_QUERY_PREFIX + type + "." + name; private final String name;
if (LOG.isDebugEnabled()) { private final String query;
LOG.debug("Looking for configured query {}", metricQueryKey);
}
return configuration.getString(metricQueryKey, "");
}
MetricQuery(String type, String name, String query) { MetricQuery(String type, String name, String query) {
this.type = type; this.type = type;
this.name = name; this.name = name;
String configuredQuery = getQuery(type, name); this.query = MetricsService.getQuery(type, name, query);
this.query = StringUtils.isNotEmpty(configuredQuery) ? configuredQuery : query;
} }
@Override @Override
......
package org.apache.atlas.services;
import org.apache.atlas.model.metrics.AtlasMetrics;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.commons.configuration.Configuration;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import javax.script.ScriptException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.*;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
public class MetricsServiceTest {
private Configuration mockConfig = mock(Configuration.class);
private AtlasTypeRegistry mockTypeRegistry = mock(AtlasTypeRegistry.class);
private AtlasGraph mockGraph = mock(AtlasGraph.class);
private MetricsService metricsService;
private List<Map> mockMapList = new ArrayList<>();
private Number mockCount = 10;
@BeforeClass
public void init() throws ScriptException {
Map<String, Object> aMockMap = new HashMap<>();
Map<String, Object> bMockMap = new HashMap<>();
Map<String, Object> cMockMap = new HashMap<>();
aMockMap.put("key", "a");
aMockMap.put("value", 1);
bMockMap.put("key", "b");
bMockMap.put("value", 2);
cMockMap.put("key", "c");
cMockMap.put("value", 3);
mockMapList.add(aMockMap);
mockMapList.add(bMockMap);
mockMapList.add(cMockMap);
when(mockConfig.getInt(anyString(), anyInt())).thenReturn(5);
assertEquals(mockConfig.getInt("test", 1), 5);
when(mockTypeRegistry.getAllEntityDefNames()).thenReturn(Arrays.asList("a", "b", "c"));
setupMockGraph();
metricsService = new MetricsService(mockConfig, mockGraph, mockTypeRegistry);
}
private void setupMockGraph() throws ScriptException {
if (mockGraph == null) mockGraph = mock(AtlasGraph.class);
when(mockGraph.executeGremlinScript(anyString(), eq(false))).thenAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
if (((String)invocationOnMock.getArguments()[0]).contains("count()")) {
return mockCount;
} else {
return mockMapList;
}
}
});
}
@Test
public void testGetMetrics() throws InterruptedException, ScriptException {
assertNotNull(metricsService);
AtlasMetrics metrics = metricsService.getMetrics();
assertNotNull(metrics);
Number aCount = metrics.getMetric("entity", "a");
assertNotNull(aCount);
assertEquals(aCount, 10);
Number bCount = metrics.getMetric("entity", "b");
assertNotNull(bCount);
assertEquals(bCount, 10);
Number cCount = metrics.getMetric("entity", "c");
assertNotNull(cCount);
assertEquals(cCount, 10);
Number aTags = metrics.getMetric("tag", "a");
assertNotNull(aTags);
assertEquals(aTags, 1);
Number bTags = metrics.getMetric("tag", "b");
assertNotNull(bTags);
assertEquals(bTags, 2);
Number cTags = metrics.getMetric("tag", "c");
assertNotNull(cTags);
assertEquals(cTags, 3);
verify(mockGraph, atLeastOnce()).executeGremlinScript(anyString(), anyBoolean());
// Subsequent call within the cache timeout window
metricsService.getMetrics();
verifyZeroInteractions(mockGraph);
// Now test the cache refresh
Thread.sleep(6000);
metricsService.getMetrics();
verify(mockGraph, atLeastOnce()).executeGremlinScript(anyString(), anyBoolean());
}
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment