Commit 37c8a4d1 by Jeff Hagelberg Committed by Vimal Sharma

Cache of compiled DSL queries

parent d204df78
...@@ -70,7 +70,6 @@ ...@@ -70,7 +70,6 @@
<groupId>com.google.guava</groupId> <groupId>com.google.guava</groupId>
<artifactId>guava</artifactId> <artifactId>guava</artifactId>
<version>${guava.version}</version> <version>${guava.version}</version>
<scope>test</scope>
</dependency> </dependency>
</dependencies> </dependencies>
</project> </project>
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.utils;
import java.text.DateFormat;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Fixed size LRU Cache.
*
*/
public class LruCache<K, V> extends LinkedHashMap<K, V>{
private static final long serialVersionUID = 8715233786643882558L;
private static final Logger LOGGER = LoggerFactory.getLogger(LruCache.class.getName());
/**
* Specifies the number evictions that pass before a warning is logged.
*/
private final int evictionWarningThrottle;
// The number of evictions since the last warning was logged.
private long evictionsSinceWarning = 0;
// When the last eviction warning was issued.
private Date lastEvictionWarning = new Date();
// The maximum number of entries the cache holds.
private final int capacity;
/**
*
* @param cacheSize The size of the cache.
* @param evictionWarningThrottle The number evictions that pass before a warning is logged.
*/
public LruCache(int cacheSize, int evictionWarningThrottle) {
super(cacheSize, 0.75f, true);
this.evictionWarningThrottle = evictionWarningThrottle;
this.capacity = cacheSize;
}
@Override
protected boolean removeEldestEntry(Map.Entry<K,V> eldest) {
if( size() > capacity) {
evictionWarningIfNeeded();
return true;
}
return false;
}
/**
* Logs a warning if a threshold number of evictions has occurred since the
* last warning.
*/
private void evictionWarningIfNeeded() {
// If not logging eviction warnings, just return.
if (evictionWarningThrottle <= 0) {
return;
}
evictionsSinceWarning++;
if (evictionsSinceWarning >= evictionWarningThrottle) {
DateFormat dateFormat = DateFormat.getDateTimeInstance();
if (LOGGER.isInfoEnabled()) {
LOGGER.info("There have been " + evictionsSinceWarning
+ " evictions from the cache since "
+ dateFormat.format(lastEvictionWarning));
}
evictionsSinceWarning = 0;
lastEvictionWarning = new Date();
}
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.utils;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang.RandomStringUtils;
import org.testng.annotations.Test;
/**
* Tests the LruCache.
*/
@Test
public class LruCacheTest {
/**
* Tests the basic operations on the cache.
*/
@Test
public void testBasicOps() throws Exception {
LruCache<String, String> cache = new LruCache<>(1000, 0);
// Get the static cache and populate it. Its size and other
// characteristics depend on the bootstrap properties that are hard to
// control in a test. So it is hard to see that if we add more entries
// than the size of the cache one is evicted, or that it gets reaped at
// the right time. However, a lot of this type of functionality is
// tested by the underlying LruCache's test.
// Note that query handle IDs are of the form sessionID::queryID
String h1 = createHandle("s1::", "1::");
String q1 = createQuery();
String h2 = createHandle("s1::", "2::");
String q2 = createQuery();
String h3 = createHandle("s2::", "1::");
String q3 = createQuery();
String h4 = createHandle("s1::", "3::");
String q4 = createQuery();
String h5 = createHandle("s3::", null);
String q5 = createQuery();
String h5b = createHandle("s3::", null);
String q5b = createQuery();
String h6 = createHandle(null, "3::");
String q6 = createQuery();
String h6b = createHandle(null, "3::");
String q6b = createQuery();
// Test put and get.
cache.put(h1, q1);
cache.put(h2, q2);
cache.put(h3, q3);
cache.put(h4, q4);
cache.put(h5, q5);
cache.put(h6, q6);
assertEquals(cache.get(h1), q1);
assertEquals(cache.get(h2), q2);
assertEquals(cache.get(h3), q3);
assertEquals(cache.get(h4), q4);
assertEquals(cache.get(h5), q5);
assertEquals(cache.remove(h1), q1);
assertEquals(cache.remove(h2), q2);
assertEquals(cache.remove(h3), q3);
assertEquals(cache.remove(h4), q4);
assertEquals(cache.remove(h5), q5);
assertNull(cache.remove(h5b));
assertEquals(q6, cache.remove(h6));
assertNull(cache.remove(h6b));
cache.put(h5b, q5b);
cache.put(h6b, q6b);
assertEquals(q5b, cache.remove(h5));
assertNull(cache.remove(h5b));
assertEquals(q6b, cache.remove(h6));
assertNull(cache.remove(h6b));
}
@Test
public void testMapOperations() {
Map<String, String> reference = new HashMap<>();
reference.put("name", "Fred");
reference.put("occupation", "student");
reference.put("height", "5'11");
reference.put("City", "Littleton");
reference.put("State", "MA");
LruCache<String, String> map = new LruCache<>(10, 10);
map.putAll(reference);
assertEquals(map.size(), reference.size());
assertEquals(map.keySet().size(), reference.keySet().size());
assertTrue(map.keySet().containsAll(reference.keySet()));
assertTrue(reference.keySet().containsAll(map.keySet()));
assertEquals(reference.entrySet().size(), map.entrySet().size());
for(Map.Entry<String, String> entry : map.entrySet()) {
assertTrue(reference.containsKey(entry.getKey()));
assertEquals(entry.getValue(), reference.get(entry.getKey()));
assertTrue(map.containsKey(entry.getKey()));
assertTrue(map.containsValue(entry.getValue()));
assertTrue(map.values().contains(entry.getValue()));
}
assertTrue(reference.equals(map));
assertTrue(map.equals(reference));
}
@Test
public void testReplaceValueInMap() {
LruCache<String, String> map = new LruCache<>(10, 10);
map.put("name", "Fred");
map.put("name", "George");
assertEquals(map.get("name"), "George");
assertEquals(map.size(), 1);
}
@Test
public void testOrderUpdatedWhenAddExisting() {
LruCache<String, String> map = new LruCache<>(2, 10);
map.put("name", "Fred");
map.put("age", "15");
map.put("name", "George");
//age should be evicted
map.put("height", "5'3\"");
//age is now least recently used
assertFalse(map.containsKey("age"));
}
@Test
public void testMapRemove() {
LruCache<String, String> map = new LruCache<>(10, 10);
map.put("name", "Fred");
map.put("occupation", "student");
map.put("height", "5'11");
map.put("City", "Littleton");
map.put("State", "MA");
assertMapHasSize(map, 5);
assertTrue(map.containsKey("State"));
map.remove("State");
assertMapHasSize(map, 4);
assertFalse(map.containsKey("State"));
}
private void assertMapHasSize(LruCache<String, String> map, int size) {
assertEquals(map.size(), size);
assertEquals(map.keySet().size(), size);
assertEquals(map.values().size(), size);
assertEquals(map.entrySet().size(), size);
}
@Test
public void testEvict() {
LruCache<String, String> map = new LruCache<>(5, 10);
map.put("name", "Fred");
map.put("occupation", "student");
map.put("height", "5'11");
map.put("City", "Littleton");
map.put("State", "MA");
assertMapHasSize(map, 5);
//name should be evicted next
assertTrue(map.containsKey("name"));
map.put("zip", "01460");
assertFalse(map.containsKey("name"));
assertMapHasSize(map, 5);
map.get("occupation");
//height should be evicted next
assertTrue(map.containsKey("height"));
map.put("country", "USA");
assertFalse(map.containsKey("height"));
assertMapHasSize(map, 5);
}
/**
* Create a fake query handle for testing.
*
* @param queryPrefix
* @param pkgPrefix
* @return a new query handle.
*/
private String createHandle(String s1, String s2) {
return s1 + ": " + s2 + ":select x from x in y";
}
/**
* Create a mock IInternalQuery.
*
* @return a mock IInternalQuery.
* @throws QueryException
*/
private String createQuery() {
return RandomStringUtils.randomAlphabetic(10);
}
}
...@@ -218,3 +218,17 @@ atlas.metric.query.cache.ttlInSecs=900 ...@@ -218,3 +218,17 @@ atlas.metric.query.cache.ttlInSecs=900
#atlas.metric.query.entity.entityTagged= #atlas.metric.query.entity.entityTagged=
# #
#atlas.metric.query.tags.entityTags= #atlas.metric.query.tags.entityTags=
######### Compiled Query Cache Configuration #########
# The size of the compiled query cache. Older queries will be evicted from the cache
# when we reach the capacity.
#atlas.CompiledQueryCache.capacity=1000
# Allows notifications when items are evicted from the compiled query
# cache because it has become full. A warning will be issued when
# the specified number of evictions have occurred. If the eviction
# warning threshold <= 0, no eviction warnings will be issued.
#atlas.CompiledQueryCache.evictionWarningThrottle=0
\ No newline at end of file
...@@ -9,6 +9,7 @@ ATLAS-1060 Add composite indexes for exact match performance improvements for al ...@@ -9,6 +9,7 @@ ATLAS-1060 Add composite indexes for exact match performance improvements for al
ATLAS-1127 Modify creation and modification timestamps to Date instead of Long(sumasai) ATLAS-1127 Modify creation and modification timestamps to Date instead of Long(sumasai)
ALL CHANGES: ALL CHANGES:
ATLAS-1387 Compiled Query Cache (jnhagelberg@us.ibm.com via svimal2106)
ATLAS-1312 Update QuickStart to use the v2 APIs for types and entities creation (sarath.kum4r@gmail.com via mneethiraj) ATLAS-1312 Update QuickStart to use the v2 APIs for types and entities creation (sarath.kum4r@gmail.com via mneethiraj)
ATLAS-1498 added unit-tests to validate handling of array/map/struct attributes in entity create/update (sumasai via mneethiraj) ATLAS-1498 added unit-tests to validate handling of array/map/struct attributes in entity create/update (sumasai via mneethiraj)
ATLAS-1114 Performance improvements for create/update entity (jnhagelb) ATLAS-1114 Performance improvements for create/update entity (jnhagelb)
......
...@@ -18,6 +18,17 @@ ...@@ -18,6 +18,17 @@
package org.apache.atlas.discovery.graph; package org.apache.atlas.discovery.graph;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import javax.inject.Inject;
import javax.inject.Singleton;
import javax.script.ScriptException;
import org.apache.atlas.AtlasClient; import org.apache.atlas.AtlasClient;
import org.apache.atlas.GraphTransaction; import org.apache.atlas.GraphTransaction;
import org.apache.atlas.discovery.DiscoveryException; import org.apache.atlas.discovery.DiscoveryException;
...@@ -38,24 +49,17 @@ import org.apache.atlas.repository.graphdb.AtlasEdge; ...@@ -38,24 +49,17 @@ import org.apache.atlas.repository.graphdb.AtlasEdge;
import org.apache.atlas.repository.graphdb.AtlasGraph; import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasIndexQuery; import org.apache.atlas.repository.graphdb.AtlasIndexQuery;
import org.apache.atlas.repository.graphdb.AtlasVertex; import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.util.CompiledQueryCacheKey;
import org.apache.atlas.util.NoopGremlinQuery;
import org.codehaus.jettison.json.JSONArray; import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException; import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject; import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import scala.util.Either; import scala.util.Either;
import scala.util.parsing.combinator.Parsers; import scala.util.parsing.combinator.Parsers;
import javax.inject.Inject;
import javax.inject.Singleton;
import javax.script.ScriptException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
/** /**
* Graph backed implementation of Search. * Graph backed implementation of Search.
*/ */
...@@ -124,42 +128,57 @@ public class GraphBackedDiscoveryService implements DiscoveryService { ...@@ -124,42 +128,57 @@ public class GraphBackedDiscoveryService implements DiscoveryService {
} }
public GremlinQueryResult evaluate(String dslQuery, QueryParams queryParams) throws DiscoveryException { public GremlinQueryResult evaluate(String dslQuery, QueryParams queryParams) throws DiscoveryException {
if(LOG.isDebugEnabled()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Executing dsl query={}", dslQuery); LOG.debug("Executing dsl query={}", dslQuery);
} }
try { try {
Either<Parsers.NoSuccess, Expressions.Expression> either = QueryParser.apply(dslQuery, queryParams); GremlinQuery gremlinQuery = parseAndTranslateDsl(dslQuery, queryParams);
if (either.isRight()) { if(gremlinQuery instanceof NoopGremlinQuery) {
Expressions.Expression expression = either.right().get(); return new GremlinQueryResult(dslQuery, ((NoopGremlinQuery)gremlinQuery).getDataType(), Collections.emptyList());
return evaluate(dslQuery, expression);
} else {
throw new DiscoveryException("Invalid expression : " + dslQuery + ". " + either.left());
} }
return new GremlinEvaluator(gremlinQuery, graphPersistenceStrategy, graph).evaluate();
} catch (Exception e) { // unable to catch ExpressionException } catch (Exception e) { // unable to catch ExpressionException
throw new DiscoveryException("Invalid expression : " + dslQuery, e); throw new DiscoveryException("Invalid expression : " + dslQuery, e);
} }
} }
private GremlinQueryResult evaluate(String dslQuery, Expressions.Expression expression) { private GremlinQuery parseAndTranslateDsl(String dslQuery, QueryParams queryParams) throws DiscoveryException {
Expressions.Expression validatedExpression = QueryProcessor.validate(expression);
//If the final limit is 0, don't launch the query, return with 0 rows CompiledQueryCacheKey entry = new CompiledQueryCacheKey(dslQuery, queryParams);
if (validatedExpression instanceof Expressions.LimitExpression GremlinQuery gremlinQuery = QueryProcessor.compiledQueryCache().get(entry);
&& ((Integer)((Expressions.LimitExpression) validatedExpression).limit().rawValue()) == 0) { if(gremlinQuery == null) {
return new GremlinQueryResult(dslQuery, validatedExpression.dataType(), Collections.emptyList()); Expressions.Expression validatedExpression = parseQuery(dslQuery, queryParams);
}
GremlinQuery gremlinQuery = new GremlinTranslator(validatedExpression, graphPersistenceStrategy).translate(); //If the final limit is 0, don't launch the query, return with 0 rows
if (validatedExpression instanceof Expressions.LimitExpression
&& ((Integer)((Expressions.LimitExpression) validatedExpression).limit().rawValue()) == 0) {
gremlinQuery = new NoopGremlinQuery(validatedExpression.dataType());
}
else {
gremlinQuery = new GremlinTranslator(validatedExpression, graphPersistenceStrategy).translate();
if (LOG.isDebugEnabled()) {
LOG.debug("Query = {}", validatedExpression);
LOG.debug("Expression Tree = {}", validatedExpression.treeString());
LOG.debug("Gremlin Query = {}", gremlinQuery.queryStr());
}
}
QueryProcessor.compiledQueryCache().put(entry, gremlinQuery);
}
return gremlinQuery;
}
if (LOG.isDebugEnabled()) { private Expressions.Expression parseQuery(String dslQuery, QueryParams queryParams) throws DiscoveryException {
LOG.debug("Query = {}", validatedExpression); Either<Parsers.NoSuccess, Expressions.Expression> either = QueryParser.apply(dslQuery, queryParams);
LOG.debug("Expression Tree = {}", validatedExpression.treeString()); if (either.isRight()) {
LOG.debug("Gremlin Query = {}", gremlinQuery.queryStr()); Expressions.Expression expression = either.right().get();
Expressions.Expression validatedExpression = QueryProcessor.validate(expression);
return validatedExpression;
} else {
throw new DiscoveryException("Invalid expression : " + dslQuery + ". " + either.left());
} }
return new GremlinEvaluator(gremlinQuery, graphPersistenceStrategy, graph).evaluate();
} }
/** /**
...@@ -182,12 +201,12 @@ public class GraphBackedDiscoveryService implements DiscoveryService { ...@@ -182,12 +201,12 @@ public class GraphBackedDiscoveryService implements DiscoveryService {
throw new DiscoveryException(se); throw new DiscoveryException(se);
} }
} }
private List<Map<String, String>> extractResult(final Object o) throws DiscoveryException { private List<Map<String, String>> extractResult(final Object o) throws DiscoveryException {
List<Map<String, String>> result = new ArrayList<>(); List<Map<String, String>> result = new ArrayList<>();
if (o instanceof List) { if (o instanceof List) {
List l = (List) o; List l = (List) o;
for (Object value : l) { for (Object value : l) {
Map<String, String> oRow = new HashMap<>(); Map<String, String> oRow = new HashMap<>();
if (value instanceof Map) { if (value instanceof Map) {
...@@ -205,7 +224,7 @@ public class GraphBackedDiscoveryService implements DiscoveryService { ...@@ -205,7 +224,7 @@ public class GraphBackedDiscoveryService implements DiscoveryService {
oRow.put(key, propertyValue.toString()); oRow.put(key, propertyValue.toString());
} }
} }
} else if (value instanceof String) { } else if (value instanceof String) {
oRow.put("", value.toString()); oRow.put("", value.toString());
} else if(value instanceof AtlasEdge) { } else if(value instanceof AtlasEdge) {
...@@ -220,14 +239,14 @@ public class GraphBackedDiscoveryService implements DiscoveryService { ...@@ -220,14 +239,14 @@ public class GraphBackedDiscoveryService implements DiscoveryService {
} else { } else {
throw new DiscoveryException(String.format("Cannot process result %s", String.valueOf(value))); throw new DiscoveryException(String.format("Cannot process result %s", String.valueOf(value)));
} }
result.add(oRow); result.add(oRow);
} }
} }
else { else {
result.add(new HashMap<String, String>() {{ result.add(new HashMap<String, String>() {{
put("result", o.toString()); put("result", o.toString());
}}); }});
} }
return result; return result;
} }
......
...@@ -43,6 +43,9 @@ public class AtlasRepositoryConfiguration { ...@@ -43,6 +43,9 @@ public class AtlasRepositoryConfiguration {
private static Logger LOG = LoggerFactory.getLogger(AtlasRepositoryConfiguration.class); private static Logger LOG = LoggerFactory.getLogger(AtlasRepositoryConfiguration.class);
public static final int DEFAULT_COMPILED_QUERY_CACHE_EVICTION_WARNING_THROTTLE = 0;
public static final int DEFAULT_COMPILED_QUERY_CACHE_CAPACITY = 1000;
public static final String TYPE_CACHE_IMPLEMENTATION_PROPERTY = "atlas.TypeCache.impl"; public static final String TYPE_CACHE_IMPLEMENTATION_PROPERTY = "atlas.TypeCache.impl";
public static final String AUDIT_EXCLUDED_OPERATIONS = "atlas.audit.excludes"; public static final String AUDIT_EXCLUDED_OPERATIONS = "atlas.audit.excludes";
private static List<String> skippedOperations = null; private static List<String> skippedOperations = null;
...@@ -70,7 +73,7 @@ public class AtlasRepositoryConfiguration { ...@@ -70,7 +73,7 @@ public class AtlasRepositoryConfiguration {
public static Class<? extends EntityAuditRepository> getAuditRepositoryImpl() { public static Class<? extends EntityAuditRepository> getAuditRepositoryImpl() {
try { try {
Configuration config = ApplicationProperties.get(); Configuration config = ApplicationProperties.get();
return ApplicationProperties.getClass(config, return ApplicationProperties.getClass(config,
AUDIT_REPOSITORY_IMPLEMENTATION_PROPERTY, HBaseBasedAuditRepository.class.getName(), EntityAuditRepository.class); AUDIT_REPOSITORY_IMPLEMENTATION_PROPERTY, HBaseBasedAuditRepository.class.getName(), EntityAuditRepository.class);
} catch (AtlasException e) { } catch (AtlasException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
...@@ -83,7 +86,7 @@ public class AtlasRepositoryConfiguration { ...@@ -83,7 +86,7 @@ public class AtlasRepositoryConfiguration {
public static Class<? extends DeleteHandler> getDeleteHandlerImpl() { public static Class<? extends DeleteHandler> getDeleteHandlerImpl() {
try { try {
Configuration config = ApplicationProperties.get(); Configuration config = ApplicationProperties.get();
return ApplicationProperties.getClass(config, return ApplicationProperties.getClass(config,
DELETE_HANDLER_IMPLEMENTATION_PROPERTY, SoftDeleteHandler.class.getName(), DeleteHandler.class); DELETE_HANDLER_IMPLEMENTATION_PROPERTY, SoftDeleteHandler.class.getName(), DeleteHandler.class);
} catch (AtlasException e) { } catch (AtlasException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
...@@ -99,15 +102,50 @@ public class AtlasRepositoryConfiguration { ...@@ -99,15 +102,50 @@ public class AtlasRepositoryConfiguration {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
} }
public static final String COMPILED_QUERY_CACHE_CAPACITY = "atlas.CompiledQueryCache.capacity";
/**
* Get the configuration property that specifies the size of the compiled query
* cache. This is an optional property. A default is used if it is not
* present.
*
* @return the size to be used when creating the compiled query cache.
*/
public static int getCompiledQueryCacheCapacity() {
try {
return ApplicationProperties.get().getInt(COMPILED_QUERY_CACHE_CAPACITY, DEFAULT_COMPILED_QUERY_CACHE_CAPACITY);
} catch (AtlasException e) {
throw new RuntimeException(e);
}
}
public static final String COMPILED_QUERY_CACHE_EVICTION_WARNING_THROTTLE = "atlas.CompiledQueryCache.evictionWarningThrottle";
/**
* Get the configuration property that specifies the number evictions that pass
* before a warning is logged. This is an optional property. A default is
* used if it is not present.
*
* @return the number of evictions before a warning is logged.
*/
public static int getCompiledQueryCacheEvictionWarningThrottle() {
try {
return ApplicationProperties.get().getInt(COMPILED_QUERY_CACHE_EVICTION_WARNING_THROTTLE, DEFAULT_COMPILED_QUERY_CACHE_EVICTION_WARNING_THROTTLE);
} catch (AtlasException e) {
throw new RuntimeException(e);
}
}
private static final String GRAPH_DATABASE_IMPLEMENTATION_PROPERTY = "atlas.graphdb.backend"; private static final String GRAPH_DATABASE_IMPLEMENTATION_PROPERTY = "atlas.graphdb.backend";
private static final String DEFAULT_GRAPH_DATABASE_IMPLEMENTATION_CLASS = "org.apache.atlas.repository.graphdb.titan0.Titan0GraphDatabase"; private static final String DEFAULT_GRAPH_DATABASE_IMPLEMENTATION_CLASS = "org.apache.atlas.repository.graphdb.titan0.Titan0GraphDatabase";
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public static Class<? extends GraphDatabase> getGraphDatabaseImpl() { public static Class<? extends GraphDatabase> getGraphDatabaseImpl() {
try { try {
Configuration config = ApplicationProperties.get(); Configuration config = ApplicationProperties.get();
return ApplicationProperties.getClass(config, return ApplicationProperties.getClass(config,
GRAPH_DATABASE_IMPLEMENTATION_PROPERTY, DEFAULT_GRAPH_DATABASE_IMPLEMENTATION_CLASS, GraphDatabase.class); GRAPH_DATABASE_IMPLEMENTATION_PROPERTY, DEFAULT_GRAPH_DATABASE_IMPLEMENTATION_CLASS, GraphDatabase.class);
} catch (AtlasException e) { } catch (AtlasException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.util;
import org.apache.atlas.query.QueryParams;
/**
* Represents a key for an entry in the compiled query cache.
*
*/
public class CompiledQueryCacheKey {
private final String dslQuery;
private final QueryParams queryParams;
public CompiledQueryCacheKey(String dslQuery, QueryParams queryParams) {
super();
this.dslQuery = dslQuery;
this.queryParams = queryParams;
}
public CompiledQueryCacheKey(String dslQuery) {
super();
this.dslQuery = dslQuery;
this.queryParams = null;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((dslQuery == null) ? 0 : dslQuery.hashCode());
result = prime * result + ((queryParams == null) ? 0 : queryParams.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (!(obj instanceof CompiledQueryCacheKey)) {
return false;
}
CompiledQueryCacheKey other = (CompiledQueryCacheKey) obj;
if (! equals(dslQuery, other.dslQuery)) {
return false;
}
if (! equals(queryParams, other.queryParams)) {
return false;
}
return true;
}
private static boolean equals(Object o1, Object o2) {
if(o1 == o2) {
return true;
}
if(o1 == null) {
return o2 == null;
}
return o1.equals(o2);
}
}
\ No newline at end of file
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.util;
import org.apache.atlas.query.GremlinQuery;
import org.apache.atlas.typesystem.types.IDataType;
/**
* Represents a query that we know will have no results.
*
*/
public class NoopGremlinQuery extends GremlinQuery {
private final IDataType dataType;
public NoopGremlinQuery(IDataType dataType) {
super(null, null, null);
this.dataType = dataType;
}
public IDataType getDataType() {
return dataType;
}
}
\ No newline at end of file
...@@ -21,10 +21,18 @@ package org.apache.atlas.query ...@@ -21,10 +21,18 @@ package org.apache.atlas.query
import org.apache.atlas.repository.graphdb.AtlasGraph import org.apache.atlas.repository.graphdb.AtlasGraph
import org.apache.atlas.query.Expressions._ import org.apache.atlas.query.Expressions._
import org.slf4j.{Logger, LoggerFactory} import org.slf4j.{Logger, LoggerFactory}
import org.apache.atlas.util.AtlasRepositoryConfiguration
import org.apache.atlas.utils.LruCache
import org.apache.atlas.util.CompiledQueryCacheKey
import java.util.Collections
object QueryProcessor { object QueryProcessor {
val LOG : Logger = LoggerFactory.getLogger("org.apache.atlas.query.QueryProcessor") val LOG : Logger = LoggerFactory.getLogger("org.apache.atlas.query.QueryProcessor")
val compiledQueryCache = Collections.synchronizedMap(new LruCache[CompiledQueryCacheKey, GremlinQuery](
AtlasRepositoryConfiguration.getCompiledQueryCacheCapacity(),
AtlasRepositoryConfiguration.getCompiledQueryCacheEvictionWarningThrottle()));
def evaluate(e: Expression, g: AtlasGraph[_,_], gP : GraphPersistenceStrategies = null): def evaluate(e: Expression, g: AtlasGraph[_,_], gP : GraphPersistenceStrategies = null):
GremlinQueryResult = { GremlinQueryResult = {
...@@ -33,11 +41,28 @@ object QueryProcessor { ...@@ -33,11 +41,28 @@ object QueryProcessor {
strategy = GraphPersistenceStrategy1(g); strategy = GraphPersistenceStrategy1(g);
} }
val e1 = validate(e) //convert the query expression to DSL so we can check whether or not it is in the compiled
val q = new GremlinTranslator(e1, strategy).translate() //query cache and avoid validating/translating it again if it is.
LOG.debug("Query: " + e1) val dsl = e.toString();
LOG.debug("Expression Tree:\n" + e1.treeString) val cacheKey = new CompiledQueryCacheKey(dsl);
LOG.debug("Gremlin Query: " + q.queryStr) var q = compiledQueryCache.get(cacheKey);
if(q == null) {
//query was not found in the compiled query cache. Validate
//and translate it, then cache the result.
val e1 = validate(e)
q = new GremlinTranslator(e1, strategy).translate()
compiledQueryCache.put(cacheKey, q);
if(LOG.isDebugEnabled()) {
LOG.debug("Validated Query: " + e1)
LOG.debug("Expression Tree:\n" + e1.treeString);
}
}
if(LOG.isDebugEnabled()) {
LOG.debug("DSL Query: " + dsl);
LOG.debug("Gremlin Query: " + q.queryStr)
}
new GremlinEvaluator(q, strategy, g).evaluate() new GremlinEvaluator(q, strategy, g).evaluate()
} }
......
package org.apache.atlas.util;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotSame;
import org.apache.atlas.query.QueryParams;
import org.testng.annotations.Test;
/**
* Tests hashcode/equals behavior of CompiledQueryCacheKey
*
*
*/
public class CompiledQueryCacheKeyTest {
@Test
public void testNoQueryParams() {
CompiledQueryCacheKey e1 = new CompiledQueryCacheKey("query 1");
CompiledQueryCacheKey e2 = new CompiledQueryCacheKey("query 1");
CompiledQueryCacheKey e3 = new CompiledQueryCacheKey("query 2");
assertKeysEqual(e1, e2);
assertKeysDifferent(e2, e3);
}
@Test
public void testWithQueryParams() {
CompiledQueryCacheKey e1 = new CompiledQueryCacheKey("query 1", new QueryParams(10,10));
CompiledQueryCacheKey e2 = new CompiledQueryCacheKey("query 1", new QueryParams(10,10));
CompiledQueryCacheKey e3 = new CompiledQueryCacheKey("query 2", new QueryParams(10,10));
assertKeysEqual(e1, e2);
assertKeysDifferent(e2, e3);
}
@Test
public void testOnlyQueryParamsDifferent() {
CompiledQueryCacheKey e1 = new CompiledQueryCacheKey("query 1", new QueryParams(10,10));
CompiledQueryCacheKey e2 = new CompiledQueryCacheKey("query 1", new QueryParams(20,10));
assertKeysDifferent(e1, e2);
}
@Test
public void testOnlyDslDifferent() {
CompiledQueryCacheKey e1 = new CompiledQueryCacheKey("query 1", new QueryParams(10,10));
CompiledQueryCacheKey e2 = new CompiledQueryCacheKey("query 2", new QueryParams(10,10));
assertKeysDifferent(e1, e2);
}
@Test
public void testMixOfQueryParamsAndNone() {
CompiledQueryCacheKey e1 = new CompiledQueryCacheKey("query 1", new QueryParams(10,10));
CompiledQueryCacheKey e2 = new CompiledQueryCacheKey("query 1");
assertKeysDifferent(e1, e2);
}
private void assertKeysEqual(CompiledQueryCacheKey e1, CompiledQueryCacheKey e2) {
assertEquals(e1.hashCode(), e2.hashCode());
assertEquals(e1, e2);
assertEquals(e2, e1);
}
private void assertKeysDifferent(CompiledQueryCacheKey e1, CompiledQueryCacheKey e2) {
assertNotSame(e1.hashCode(), e2.hashCode());
assertNotSame(e1, e2);
assertNotSame(e2, e1);
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment