Commit 7c68048f by Nikhil Bonte

ATLAS-4008: Cache getGuid and getStatus in GraphTransactionInterceptor

parent d4b15c7a
...@@ -23,6 +23,7 @@ import org.aopalliance.intercept.MethodInvocation; ...@@ -23,6 +23,7 @@ import org.aopalliance.intercept.MethodInvocation;
import org.apache.atlas.annotation.GraphTransaction; import org.apache.atlas.annotation.GraphTransaction;
import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.exception.NotFoundException; import org.apache.atlas.exception.NotFoundException;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.repository.graphdb.AtlasGraph; import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasVertex; import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.utils.AtlasPerfMetrics.MetricRecorder; import org.apache.atlas.utils.AtlasPerfMetrics.MetricRecorder;
...@@ -54,6 +55,30 @@ public class GraphTransactionInterceptor implements MethodInterceptor { ...@@ -54,6 +55,30 @@ public class GraphTransactionInterceptor implements MethodInterceptor {
private final AtlasGraph graph; private final AtlasGraph graph;
private static final ThreadLocal<Map<Object, String>> vertexGuidCache =
new ThreadLocal<Map<Object, String>>() {
@Override
public Map<Object, String> initialValue() {
return new HashMap<Object, String>();
}
};
private static final ThreadLocal<Map<Object, AtlasEntity.Status>> vertexStateCache =
new ThreadLocal<Map<Object, AtlasEntity.Status>>() {
@Override
public Map<Object, AtlasEntity.Status> initialValue() {
return new HashMap<Object, AtlasEntity.Status>();
}
};
private static final ThreadLocal<Map<Object, AtlasEntity.Status>> edgeStateCache =
new ThreadLocal<Map<Object, AtlasEntity.Status>>() {
@Override
public Map<Object, AtlasEntity.Status> initialValue() {
return new HashMap<Object, AtlasEntity.Status>();
}
};
@Inject @Inject
public GraphTransactionInterceptor(AtlasGraph graph) { public GraphTransactionInterceptor(AtlasGraph graph) {
this.graph = graph; this.graph = graph;
...@@ -117,7 +142,7 @@ public class GraphTransactionInterceptor implements MethodInterceptor { ...@@ -117,7 +142,7 @@ public class GraphTransactionInterceptor implements MethodInterceptor {
// Reset the boolean flags // Reset the boolean flags
isTxnOpen.set(Boolean.FALSE); isTxnOpen.set(Boolean.FALSE);
innerFailure.set(Boolean.FALSE); innerFailure.set(Boolean.FALSE);
guidVertexCache.get().clear(); clearCache();
List<PostTransactionHook> trxHooks = postTransactionHooks.get(); List<PostTransactionHook> trxHooks = postTransactionHooks.get();
...@@ -201,6 +226,9 @@ public class GraphTransactionInterceptor implements MethodInterceptor { ...@@ -201,6 +226,9 @@ public class GraphTransactionInterceptor implements MethodInterceptor {
public static void clearCache() { public static void clearCache() {
guidVertexCache.get().clear(); guidVertexCache.get().clear();
vertexGuidCache.get().clear();
vertexStateCache.get().clear();
edgeStateCache.get().clear();
} }
boolean logException(Throwable t) { boolean logException(Throwable t) {
...@@ -214,6 +242,72 @@ public class GraphTransactionInterceptor implements MethodInterceptor { ...@@ -214,6 +242,72 @@ public class GraphTransactionInterceptor implements MethodInterceptor {
} }
} }
public static void addToVertexGuidCache(Object vertexId, String guid) {
if (guid == null) {
removeFromVertexGuidCache(vertexId);
} else {
Map<Object, String> cache = vertexGuidCache.get();
cache.put(vertexId, guid);
}
}
public static void removeFromVertexGuidCache(Object vertexId) {
Map<Object, String> cache = vertexGuidCache.get();
cache.remove(vertexId);
}
public static String getVertexGuidFromCache(Object vertexId) {
Map<Object, String> cache = vertexGuidCache.get();
return cache.get(vertexId);
}
public static void addToVertexStateCache(Object vertexId, AtlasEntity.Status status) {
if (status == null) {
removeFromVertexStateCache(vertexId);
} else {
Map<Object, AtlasEntity.Status> cache = vertexStateCache.get();
cache.put(vertexId, status);
}
}
public static void removeFromVertexStateCache(Object vertexId) {
Map<Object, AtlasEntity.Status> cache = vertexStateCache.get();
cache.remove(vertexId);
}
public static AtlasEntity.Status getVertexStateFromCache(Object vertexId) {
Map<Object, AtlasEntity.Status> cache = vertexStateCache.get();
return cache.get(vertexId);
}
public static void addToEdgeStateCache(Object edgeId, AtlasEntity.Status status) {
if (status == null) {
removeFromEdgeStateCache(edgeId);
} else {
Map<Object, AtlasEntity.Status> cache = edgeStateCache.get();
cache.put(edgeId, status);
}
}
public static void removeFromEdgeStateCache(Object edgeId) {
Map<Object, AtlasEntity.Status> cache = edgeStateCache.get();
cache.remove(edgeId);
}
public static AtlasEntity.Status getEdgeStateFromCache(Object edgeId) {
Map<Object, AtlasEntity.Status> cache = edgeStateCache.get();
return cache.get(edgeId);
}
public static abstract class PostTransactionHook { public static abstract class PostTransactionHook {
protected PostTransactionHook() { protected PostTransactionHook() {
List<PostTransactionHook> trxHooks = postTransactionHooks.get(); List<PostTransactionHook> trxHooks = postTransactionHooks.get();
......
...@@ -24,6 +24,7 @@ import com.google.common.collect.HashBiMap; ...@@ -24,6 +24,7 @@ import com.google.common.collect.HashBiMap;
import org.apache.atlas.ApplicationProperties; import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasErrorCode; import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.AtlasException; import org.apache.atlas.AtlasException;
import org.apache.atlas.GraphTransactionInterceptor;
import org.apache.atlas.RequestContext; import org.apache.atlas.RequestContext;
import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.instance.AtlasEntity;
...@@ -815,8 +816,17 @@ public final class GraphHelper { ...@@ -815,8 +816,17 @@ public final class GraphHelper {
return element.getProperty(Constants.RELATIONSHIP_GUID_PROPERTY_KEY, String.class); return element.getProperty(Constants.RELATIONSHIP_GUID_PROPERTY_KEY, String.class);
} }
public static String getGuid(AtlasElement element) { public static String getGuid(AtlasVertex vertex) {
return element.<String>getProperty(Constants.GUID_PROPERTY_KEY, String.class); Object vertexId = vertex.getId();
String ret = GraphTransactionInterceptor.getVertexGuidFromCache(vertexId);
if (ret == null) {
ret = vertex.<String>getProperty(Constants.GUID_PROPERTY_KEY, String.class);
GraphTransactionInterceptor.addToVertexGuidCache(vertexId, ret);
}
return ret;
} }
public static String getHomeId(AtlasElement element) { public static String getHomeId(AtlasElement element) {
...@@ -870,10 +880,33 @@ public final class GraphHelper { ...@@ -870,10 +880,33 @@ public final class GraphHelper {
return element.getProperty(STATE_PROPERTY_KEY, String.class); return element.getProperty(STATE_PROPERTY_KEY, String.class);
} }
public static Status getStatus(AtlasElement element) { public static Status getStatus(AtlasVertex vertex) {
return (getState(element) == Id.EntityState.DELETED) ? Status.DELETED : Status.ACTIVE; Object vertexId = vertex.getId();
Status ret = GraphTransactionInterceptor.getVertexStateFromCache(vertexId);
if (ret == null) {
ret = (getState(vertex) == Id.EntityState.DELETED) ? Status.DELETED : Status.ACTIVE;
GraphTransactionInterceptor.addToVertexStateCache(vertexId, ret);
}
return ret;
}
public static Status getStatus(AtlasEdge edge) {
Object edgeId = edge.getId();
Status ret = GraphTransactionInterceptor.getEdgeStateFromCache(edgeId);
if (ret == null) {
ret = (getState(edge) == Id.EntityState.DELETED) ? Status.DELETED : Status.ACTIVE;
GraphTransactionInterceptor.addToEdgeStateCache(edgeId, ret);
}
return ret;
} }
public static AtlasRelationship.Status getEdgeStatus(AtlasElement element) { public static AtlasRelationship.Status getEdgeStatus(AtlasElement element) {
return (getState(element) == Id.EntityState.DELETED) ? AtlasRelationship.Status.DELETED : AtlasRelationship.Status.ACTIVE; return (getState(element) == Id.EntityState.DELETED) ? AtlasRelationship.Status.DELETED : AtlasRelationship.Status.ACTIVE;
} }
......
...@@ -961,10 +961,13 @@ public abstract class DeleteHandlerV1 { ...@@ -961,10 +961,13 @@ public abstract class DeleteHandlerV1 {
deleteRelationship(edge); deleteRelationship(edge);
} else { } else {
AtlasVertex outVertex = edge.getOutVertex(); AtlasVertex outVertex = edge.getOutVertex();
AtlasVertex inVertex = edge.getInVertex();
AtlasAttribute attribute = getAttributeForEdge(edge.getLabel());
deleteEdgeBetweenVertices(outVertex, inVertex, attribute); if (!isDeletedEntity(outVertex)) {
AtlasVertex inVertex = edge.getInVertex();
AtlasAttribute attribute = getAttributeForEdge(edge.getLabel());
deleteEdgeBetweenVertices(outVertex, inVertex, attribute);
}
} }
} }
} }
...@@ -972,6 +975,19 @@ public abstract class DeleteHandlerV1 { ...@@ -972,6 +975,19 @@ public abstract class DeleteHandlerV1 {
_deleteVertex(instanceVertex, force); _deleteVertex(instanceVertex, force);
} }
private boolean isDeletedEntity(AtlasVertex entityVertex) {
boolean ret = false;
String outGuid = GraphHelper.getGuid(entityVertex);
AtlasEntity.Status outState = GraphHelper.getStatus(entityVertex);
//If the reference vertex is marked for deletion, skip updating the reference
if (outState == AtlasEntity.Status.DELETED || (outGuid != null && RequestContext.get().isDeletedEntity(outGuid))) {
ret = true;
}
return ret;
}
public void deleteClassificationVertex(AtlasVertex classificationVertex, boolean force) { public void deleteClassificationVertex(AtlasVertex classificationVertex, boolean force) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Deleting classification vertex", string(classificationVertex)); LOG.debug("Deleting classification vertex", string(classificationVertex));
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
*/ */
package org.apache.atlas.repository.store.graph.v2; package org.apache.atlas.repository.store.graph.v2;
import org.apache.atlas.GraphTransactionInterceptor;
import org.apache.atlas.TestModules; import org.apache.atlas.TestModules;
import org.apache.atlas.TestUtilsV2; import org.apache.atlas.TestUtilsV2;
import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.instance.AtlasEntity;
...@@ -645,6 +646,7 @@ public class AtlasComplexAttributesTest extends AtlasEntityTestBase { ...@@ -645,6 +646,7 @@ public class AtlasComplexAttributesTest extends AtlasEntityTestBase {
AtlasEntityHeader entityDeleted = response.getFirstDeletedEntityByTypeName(ENTITY_TYPE_WITH_COMPLEX_COLLECTION_ATTR); AtlasEntityHeader entityDeleted = response.getFirstDeletedEntityByTypeName(ENTITY_TYPE_WITH_COMPLEX_COLLECTION_ATTR);
GraphTransactionInterceptor.clearCache();
AtlasEntityWithExtInfo deletedEntityWithExtInfo = entityStore.getById(entityDeleted.getGuid()); AtlasEntityWithExtInfo deletedEntityWithExtInfo = entityStore.getById(entityDeleted.getGuid());
AtlasVertex deletedEntityVertex = AtlasGraphUtilsV2.findByGuid(entityDeleted.getGuid()); AtlasVertex deletedEntityVertex = AtlasGraphUtilsV2.findByGuid(entityDeleted.getGuid());
Iterator<AtlasEdge> edges = deletedEntityVertex.getEdges(AtlasEdgeDirection.OUT).iterator(); Iterator<AtlasEdge> edges = deletedEntityVertex.getEdges(AtlasEdgeDirection.OUT).iterator();
......
...@@ -19,6 +19,7 @@ package org.apache.atlas.repository.store.graph.v2; ...@@ -19,6 +19,7 @@ package org.apache.atlas.repository.store.graph.v2;
import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet;
import org.apache.atlas.AtlasErrorCode; import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.GraphTransactionInterceptor;
import org.apache.atlas.RequestContext; import org.apache.atlas.RequestContext;
import org.apache.atlas.TestModules; import org.apache.atlas.TestModules;
import org.apache.atlas.TestUtilsV2; import org.apache.atlas.TestUtilsV2;
...@@ -859,6 +860,7 @@ public class AtlasEntityStoreV2Test extends AtlasEntityTestBase { ...@@ -859,6 +860,7 @@ public class AtlasEntityStoreV2Test extends AtlasEntityTestBase {
entityStore.createOrUpdate(new AtlasEntityStream(dbEntity), false); entityStore.createOrUpdate(new AtlasEntityStream(dbEntity), false);
entityStore.createOrUpdate(new AtlasEntityStream(db2Entity), false); entityStore.createOrUpdate(new AtlasEntityStream(db2Entity), false);
GraphTransactionInterceptor.clearCache();
final AtlasEntity tableEntity = TestUtilsV2.createTableEntity(dbEntity); final AtlasEntity tableEntity = TestUtilsV2.createTableEntity(dbEntity);
...@@ -873,6 +875,8 @@ public class AtlasEntityStoreV2Test extends AtlasEntityTestBase { ...@@ -873,6 +875,8 @@ public class AtlasEntityStoreV2Test extends AtlasEntityTestBase {
createdTblEntity.setAttribute("databaseComposite", null); createdTblEntity.setAttribute("databaseComposite", null);
final EntityMutationResponse tblUpdateResponse = entityStore.createOrUpdate(new AtlasEntityStream(createdTblEntity), true); final EntityMutationResponse tblUpdateResponse = entityStore.createOrUpdate(new AtlasEntityStream(createdTblEntity), true);
GraphTransactionInterceptor.clearCache();
final AtlasEntityHeader updatedTblHeader = tblUpdateResponse.getFirstEntityPartialUpdated(); final AtlasEntityHeader updatedTblHeader = tblUpdateResponse.getFirstEntityPartialUpdated();
final AtlasEntity updatedTblEntity = getEntityFromStore(updatedTblHeader); final AtlasEntity updatedTblEntity = getEntityFromStore(updatedTblHeader);
final AtlasEntity deletedDb2Entity = getEntityFromStore(db2Entity.getGuid()); final AtlasEntity deletedDb2Entity = getEntityFromStore(db2Entity.getGuid());
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment