Commit ab95c1a7 by Shwetha GS

ATLAS-1119 Add retries for edge label creation (sumasai via shwethags)

parent bed2a70d
......@@ -6,6 +6,7 @@ INCOMPATIBLE CHANGES:
ATLAS-1060 Add composite indexes for exact match performance improvements for all attributes (sumasai via shwethags)
ALL CHANGES:
ATLAS-1119 Add retries for edge label creation (sumasai via shwethags)
ATLAS-1111 Data loss is observed when atlas is restarted while hive_table metadata ingestion into kafka topic is in-progress(shwethags via sumasai)
ATLAS-1115 Show Tag / Taxonomy Listing in sorted order (Kalyanikashikar via sumasai)
ATLAS-1117 Atlas start fails on trunk (jnhagelb via dkantor)
......
......@@ -85,7 +85,7 @@ public abstract class DeleteHandler {
}
// Get GUIDs and vertices for all deletion candidates.
Set<VertexInfo> compositeVertices = GraphHelper.getCompositeVertices(instanceVertex);
Set<VertexInfo> compositeVertices = graphHelper.getCompositeVertices(instanceVertex);
// Record all deletion candidate GUIDs in RequestContext
// and gather deletion candidate vertices.
......@@ -158,7 +158,7 @@ public abstract class DeleteHandler {
DataTypes.TypeCategory elementTypeCategory = elementType.getTypeCategory();
if (elementTypeCategory == DataTypes.TypeCategory.STRUCT ||
elementTypeCategory == DataTypes.TypeCategory.CLASS) {
Iterator<Edge> edges = GraphHelper.getOutGoingEdgesByLabel(instanceVertex, edgeLabel);
Iterator<Edge> edges = graphHelper.getOutGoingEdgesByLabel(instanceVertex, edgeLabel);
if (edges != null) {
while (edges.hasNext()) {
Edge edge = edges.next();
......@@ -228,7 +228,7 @@ public abstract class DeleteHandler {
public void deleteEdgeReference(Vertex outVertex, String edgeLabel, DataTypes.TypeCategory typeCategory,
boolean isComposite) throws AtlasException {
Edge edge = GraphHelper.getEdgeForLabel(outVertex, edgeLabel);
Edge edge = graphHelper.getEdgeForLabel(outVertex, edgeLabel);
if (edge != null) {
deleteEdgeReference(edge, typeCategory, isComposite, false);
}
......@@ -295,7 +295,7 @@ public abstract class DeleteHandler {
case CLASS:
//If its class attribute, its the only edge between two vertices
if (attributeInfo.multiplicity.nullAllowed()) {
edge = GraphHelper.getEdgeForLabel(outVertex, edgeLabel);
edge = graphHelper.getEdgeForLabel(outVertex, edgeLabel);
if (shouldUpdateReverseAttribute) {
GraphHelper.setProperty(outVertex, propertyName, null);
}
......
......@@ -267,7 +267,7 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
try {
final String entityTypeName = GraphHelper.getTypeName(instanceVertex);
String relationshipLabel = GraphHelper.getTraitLabel(entityTypeName, traitNameToBeDeleted);
Edge edge = GraphHelper.getEdgeForLabel(instanceVertex, relationshipLabel);
Edge edge = graphHelper.getEdgeForLabel(instanceVertex, relationshipLabel);
if(edge != null) {
deleteHandler.deleteEdgeReference(edge, DataTypes.TypeCategory.TRAIT, false, true);
......
......@@ -18,19 +18,23 @@
package org.apache.atlas.repository.graph;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.Stack;
import java.util.UUID;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.thinkaurelius.titan.core.TitanGraph;
import com.thinkaurelius.titan.core.TitanProperty;
import com.thinkaurelius.titan.core.TitanVertex;
import com.tinkerpop.blueprints.Direction;
import com.tinkerpop.blueprints.Edge;
import com.tinkerpop.blueprints.Element;
import com.tinkerpop.blueprints.Graph;
import com.tinkerpop.blueprints.GraphQuery;
import com.tinkerpop.blueprints.Vertex;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasException;
import org.apache.atlas.RequestContext;
import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.RepositoryException;
import org.apache.atlas.typesystem.IReferenceableInstance;
import org.apache.atlas.typesystem.ITypedInstance;
import org.apache.atlas.typesystem.ITypedReferenceableInstance;
......@@ -48,18 +52,15 @@ import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.thinkaurelius.titan.core.TitanGraph;
import com.thinkaurelius.titan.core.TitanProperty;
import com.thinkaurelius.titan.core.TitanVertex;
import com.tinkerpop.blueprints.Direction;
import com.tinkerpop.blueprints.Edge;
import com.tinkerpop.blueprints.Element;
import com.tinkerpop.blueprints.Graph;
import com.tinkerpop.blueprints.GraphQuery;
import com.tinkerpop.blueprints.Vertex;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.Stack;
import java.util.UUID;
/**
* Utility class for graph operations.
......@@ -71,18 +72,50 @@ public final class GraphHelper {
private static final TypeSystem typeSystem = TypeSystem.getInstance();
private static final GraphHelper INSTANCE = new GraphHelper(TitanGraphProvider.getGraphInstance());
public static final String RETRY_COUNT = "atlas.graph.storage.num.retries";
public static final String RETRY_DELAY = "atlas.graph.storage.retry.sleeptime.ms";
private static volatile GraphHelper INSTANCE;
private TitanGraph titanGraph;
private static int maxRetries;
public static long retrySleepTimeMillis;
private GraphHelper(TitanGraph titanGraph) {
@VisibleForTesting
GraphHelper(TitanGraph titanGraph) {
this.titanGraph = titanGraph;
try {
maxRetries = ApplicationProperties.get().getInt(RETRY_COUNT, 3);
retrySleepTimeMillis = ApplicationProperties.get().getLong(RETRY_DELAY, 1000);
} catch (AtlasException e) {
LOG.error("Could not load configuration. Setting to default value for " + RETRY_COUNT, e);
}
}
public static GraphHelper getInstance() {
if ( INSTANCE == null) {
synchronized (GraphHelper.class) {
if (INSTANCE == null) {
INSTANCE = new GraphHelper(TitanGraphProvider.getGraphInstance());
}
}
}
return INSTANCE;
}
@VisibleForTesting
static GraphHelper getInstance(TitanGraph graph) {
if ( INSTANCE == null) {
synchronized (GraphHelper.class) {
if (INSTANCE == null) {
INSTANCE = new GraphHelper(graph);
}
}
}
return INSTANCE;
}
public Vertex createVertexWithIdentity(ITypedReferenceableInstance typedInstance, Set<String> superTypeNames) {
final String guid = UUID.randomUUID().toString();
......@@ -135,19 +168,41 @@ public final class GraphHelper {
return edge;
}
public Edge getOrCreateEdge(Vertex outVertex, Vertex inVertex, String edgeLabel) {
Iterator<Edge> edges = GraphHelper.getAdjacentEdgesByLabel(inVertex, Direction.IN, edgeLabel);
public Edge getOrCreateEdge(Vertex outVertex, Vertex inVertex, String edgeLabel) throws RepositoryException {
for (int numRetries = 0; numRetries < maxRetries; numRetries++) {
try {
LOG.debug("Running edge creation attempt {}", numRetries);
Iterator<Edge> edges = getAdjacentEdgesByLabel(inVertex, Direction.IN, edgeLabel);
while (edges.hasNext()) {
Edge edge = edges.next();
if (edge.getVertex(Direction.OUT).getId().toString().equals(outVertex.getId().toString())) {
Id.EntityState edgeState = getState(edge);
if (edgeState == null || edgeState == Id.EntityState.ACTIVE) {
return edge;
}
}
}
return addEdge(outVertex, inVertex, edgeLabel);
} catch (Exception e) {
LOG.warn(String.format("Exception while trying to create edge from %s to %s with label %s. Retrying",
vertexString(outVertex), vertexString(inVertex), edgeLabel), e);
if (numRetries == (maxRetries - 1)) {
LOG.error("Max retries exceeded for edge creation {} {} {} ", outVertex, inVertex, edgeLabel, e);
throw new RepositoryException("Edge creation failed after retries", e);
}
while (edges.hasNext()) {
Edge edge = edges.next();
if (edge.getVertex(Direction.OUT).getId().toString().equals(outVertex.getId().toString())) {
Id.EntityState edgeState = getState(edge);
if (edgeState == null || edgeState == Id.EntityState.ACTIVE) {
return edge;
try {
LOG.info("Retrying with delay of {} ms ", retrySleepTimeMillis);
Thread.sleep(retrySleepTimeMillis);
} catch(InterruptedException ie) {
LOG.warn("Retry interrupted during edge creation ");
throw new RepositoryException("Retry interrupted during edge creation", ie);
}
}
}
return addEdge(outVertex, inVertex, edgeLabel);
return null;
}
......@@ -202,7 +257,7 @@ public final class GraphHelper {
//In some cases of parallel APIs, the edge is added, but get edge by label doesn't return the edge. ATLAS-1104
//So traversing all the edges
public static Iterator<Edge> getAdjacentEdgesByLabel(Vertex instanceVertex, Direction direction, final String edgeLabel) {
public Iterator<Edge> getAdjacentEdgesByLabel(Vertex instanceVertex, Direction direction, final String edgeLabel) {
LOG.debug("Finding edges for {} with label {}", string(instanceVertex), edgeLabel);
if(instanceVertex != null && edgeLabel != null) {
final Iterator<Edge> iterator = instanceVertex.getEdges(direction).iterator();
......@@ -239,7 +294,7 @@ public final class GraphHelper {
return null;
}
public static Iterator<Edge> getOutGoingEdgesByLabel(Vertex instanceVertex, String edgeLabel) {
public Iterator<Edge> getOutGoingEdgesByLabel(Vertex instanceVertex, String edgeLabel) {
return getAdjacentEdgesByLabel(instanceVertex, Direction.OUT, edgeLabel);
}
......@@ -250,8 +305,8 @@ public final class GraphHelper {
* @param edgeLabel
* @return
*/
public static Edge getEdgeForLabel(Vertex vertex, String edgeLabel) {
Iterator<Edge> iterator = GraphHelper.getAdjacentEdgesByLabel(vertex, Direction.OUT, edgeLabel);
public Edge getEdgeForLabel(Vertex vertex, String edgeLabel) {
Iterator<Edge> iterator = getAdjacentEdgesByLabel(vertex, Direction.OUT, edgeLabel);
Edge latestDeletedEdge = null;
long latestDeletedEdgeTime = Long.MIN_VALUE;
......@@ -269,7 +324,6 @@ public final class GraphHelper {
}
}
}
//If the vertex is deleted, return latest deleted edge
LOG.debug("Found {}", latestDeletedEdge == null ? "null" : string(latestDeletedEdge));
return latestDeletedEdge;
......@@ -513,7 +567,7 @@ public final class GraphHelper {
* @return set of VertexInfo for all composite entities
* @throws AtlasException
*/
public static Set<VertexInfo> getCompositeVertices(Vertex entityVertex) throws AtlasException {
public Set<VertexInfo> getCompositeVertices(Vertex entityVertex) throws AtlasException {
Set<VertexInfo> result = new HashSet<>();
Stack<Vertex> vertices = new Stack<>();
vertices.push(entityVertex);
......@@ -535,7 +589,7 @@ public final class GraphHelper {
String edgeLabel = GraphHelper.getEdgeLabel(classType, attributeInfo);
switch (attributeInfo.dataType().getTypeCategory()) {
case CLASS:
Edge edge = GraphHelper.getEdgeForLabel(vertex, edgeLabel);
Edge edge = getEdgeForLabel(vertex, edgeLabel);
if (edge != null && GraphHelper.getState(edge) == Id.EntityState.ACTIVE) {
Vertex compositeVertex = edge.getVertex(Direction.IN);
vertices.push(compositeVertex);
......@@ -547,7 +601,7 @@ public final class GraphHelper {
if (elementTypeCategory != TypeCategory.CLASS) {
continue;
}
Iterator<Edge> edges = GraphHelper.getOutGoingEdgesByLabel(vertex, edgeLabel);
Iterator<Edge> edges = getOutGoingEdgesByLabel(vertex, edgeLabel);
if (edges != null) {
while (edges.hasNext()) {
edge = edges.next();
......@@ -569,7 +623,7 @@ public final class GraphHelper {
if (keys != null) {
for (String key : keys) {
String mapEdgeLabel = GraphHelper.getQualifiedNameForMapKey(edgeLabel, key);
edge = GraphHelper.getEdgeForLabel(vertex, mapEdgeLabel);
edge = getEdgeForLabel(vertex, mapEdgeLabel);
if (edge != null && GraphHelper.getState(edge) == Id.EntityState.ACTIVE) {
Vertex compositeVertex = edge.getVertex(Direction.IN);
vertices.push(compositeVertex);
......
......@@ -163,7 +163,7 @@ public final class GraphToTypedInstanceMapper {
Edge edge;
if (edgeId == null) {
edge = GraphHelper.getEdgeForLabel(instanceVertex, relationshipLabel);
edge = graphHelper.getEdgeForLabel(instanceVertex, relationshipLabel);
} else {
edge = graphHelper.getEdgeByEdgeId(instanceVertex, relationshipLabel, edgeId);
}
......@@ -269,7 +269,7 @@ public final class GraphToTypedInstanceMapper {
Edge edge;
if (edgeId == null) {
edge = GraphHelper.getEdgeForLabel(instanceVertex, relationshipLabel);
edge = graphHelper.getEdgeForLabel(instanceVertex, relationshipLabel);
} else {
edge = graphHelper.getEdgeByEdgeId(instanceVertex, relationshipLabel, edgeId);
}
......@@ -296,7 +296,7 @@ public final class GraphToTypedInstanceMapper {
TraitType traitType, ITypedStruct traitInstance) throws AtlasException {
String relationshipLabel = GraphHelper.getTraitLabel(typedInstanceTypeName, traitName);
LOG.debug("Finding edge for {} -> label {} ", instanceVertex, relationshipLabel);
Iterator<Edge> edgeIterator = GraphHelper.getOutGoingEdgesByLabel(instanceVertex, relationshipLabel);
Iterator<Edge> edgeIterator = graphHelper.getOutGoingEdgesByLabel(instanceVertex, relationshipLabel);
while (edgeIterator.hasNext()) {
Edge edge = edgeIterator.next();
final Vertex traitInstanceVertex = edge.getVertex(Direction.IN);
......
......@@ -211,9 +211,9 @@ public final class TypedInstanceToGraphMapper {
case STRUCT:
case CLASS:
String edgeLabel = GraphHelper.getEdgeLabel(typedInstance, attributeInfo);
String edgeLabel = graphHelper.getEdgeLabel(typedInstance, attributeInfo);
Edge currentEdge = GraphHelper.getEdgeForLabel(instanceVertex, edgeLabel);
Edge currentEdge = graphHelper.getEdgeForLabel(instanceVertex, edgeLabel);
String newEdgeId = addOrUpdateReference(instanceVertex, attributeInfo, attributeInfo.dataType(),
attrValue, currentEdge, edgeLabel, operation);
......
......@@ -282,7 +282,7 @@ public class GraphBackedTypeStore implements ITypeStore {
private ImmutableSet<String> getSuperTypes(Vertex vertex) {
Set<String> superTypes = new HashSet<>();
Iterator<Edge> edges = GraphHelper.getOutGoingEdgesByLabel(vertex, SUPERTYPE_EDGE_LABEL);
Iterator<Edge> edges = graphHelper.getOutGoingEdgesByLabel(vertex, SUPERTYPE_EDGE_LABEL);
while (edges.hasNext()) {
Edge edge = edges.next();
superTypes.add((String) edge.getVertex(Direction.IN).getProperty(Constants.TYPENAME_PROPERTY_KEY));
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.graph;
import com.thinkaurelius.titan.core.TitanGraph;
import com.thinkaurelius.titan.core.TitanVertex;
import com.tinkerpop.blueprints.Direction;
import com.tinkerpop.blueprints.Edge;
import org.apache.atlas.repository.RepositoryException;
import org.mockito.MockitoAnnotations;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import java.util.Iterator;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
public class GraphHelperMockTest {
private GraphHelper graphHelperInstance;
private TitanGraph graph;
@BeforeClass
public void setup() {
MockitoAnnotations.initMocks(this);
graph = mock(TitanGraph.class);
graphHelperInstance = GraphHelper.getInstance(graph);
}
@Test(expectedExceptions = RepositoryException.class)
public void testGetOrCreateEdgeLabelWithMaxRetries() throws Exception {
final String edgeLabel = "testLabel";
TitanVertex v1 = mock(TitanVertex.class);
TitanVertex v2 = mock(TitanVertex.class);
Iterable noEdgesIterable = new Iterable<Edge>() {
@Override
public Iterator<Edge> iterator() {
return new Iterator<Edge>() {
@Override
public boolean hasNext() {
return false;
}
@Override
public Edge next() {
return null;
}
@Override
public void remove() {
}
};
}
};
when(v2.getEdges(Direction.IN)).thenReturn(noEdgesIterable);
when(v1.getEdges(Direction.OUT)).thenReturn(noEdgesIterable);
when(v1.getId()).thenReturn(new String("1234"));
when(v2.getId()).thenReturn(new String("5678"));
when(graph.addEdge(null, v1, v2, edgeLabel)).thenThrow(new RuntimeException("Unique property constraint violated"));
graphHelperInstance.getOrCreateEdge(v1, v2, edgeLabel);
}
@Test
public void testGetOrCreateEdgeLabelWithRetries() throws Exception {
final String edgeLabel = "testLabel";
TitanVertex v1 = mock(TitanVertex.class);
TitanVertex v2 = mock(TitanVertex.class);
Edge edge = mock(Edge.class);
Iterable noEdgesIterable = new Iterable<Edge>() {
@Override
public Iterator<Edge> iterator() {
return new Iterator<Edge>() {
@Override
public boolean hasNext() {
return false;
}
@Override
public Edge next() {
return null;
}
@Override
public void remove() {
}
};
}
};
when(v2.getEdges(Direction.IN)).thenReturn(noEdgesIterable);
when(v1.getEdges(Direction.OUT)).thenReturn(noEdgesIterable);
when(v1.getId()).thenReturn(new String("v1"));
when(v2.getId()).thenReturn(new String("v2"));
when(edge.getId()).thenReturn(new String("edge"));
when(graph.addEdge(null, v1, v2, edgeLabel))
.thenThrow(new RuntimeException("Unique property constraint violated")).thenReturn(edge);
Edge redge = graphHelperInstance.getOrCreateEdge(v1, v2, edgeLabel);
assertEquals(edge, redge);
}
}
......@@ -117,7 +117,7 @@ public class GraphHelperTest {
}
}
Vertex deptVertex = GraphHelper.getInstance().getVertexForGUID(deptGuid);
Set<VertexInfo> compositeVertices = GraphHelper.getCompositeVertices(deptVertex);
Set<VertexInfo> compositeVertices = GraphHelper.getInstance().getCompositeVertices(deptVertex);
HashMap<String, VertexInfo> verticesByGuid = new HashMap<>();
for (VertexInfo vertexInfo: compositeVertices) {
verticesByGuid.put(vertexInfo.getGuid(), vertexInfo);
......@@ -148,7 +148,7 @@ public class GraphHelperTest {
v1.addEdge("l1", v2);
v1.addEdge("l2", v2);
Iterator<Edge> iterator = GraphHelper.getOutGoingEdgesByLabel(v1, "l1");
Iterator<Edge> iterator = GraphHelper.getInstance().getOutGoingEdgesByLabel(v1, "l1");
assertTrue(iterator.hasNext());
assertTrue(iterator.hasNext());
assertNotNull(iterator.next());
......
......@@ -223,7 +223,7 @@ public class GraphBackedTypeStoreTest {
private int countOutgoingEdges(Vertex typeVertex, String edgeLabel) {
Iterator<Edge> outGoingEdgesByLabel = GraphHelper.getOutGoingEdgesByLabel(typeVertex, edgeLabel);
Iterator<Edge> outGoingEdgesByLabel = GraphHelper.getInstance().getOutGoingEdgesByLabel(typeVertex, edgeLabel);
int edgeCount = 0;
for (Iterator<Edge> iterator = outGoingEdgesByLabel; iterator.hasNext();) {
iterator.next();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment