Commit bcb128af by ashutoshm Committed by Madhan Neethiraj

ATLAS-1889: fix to handle concurrent calls to update tags for an entity

parent e0abdb3c
......@@ -17,6 +17,7 @@
package org.apache.atlas;
import com.google.common.annotations.VisibleForTesting;
import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;
import org.apache.atlas.exception.AtlasBaseException;
......@@ -29,12 +30,18 @@ import org.springframework.stereotype.Component;
import javax.inject.Inject;
import javax.ws.rs.core.Response;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
@Component
public class GraphTransactionInterceptor implements MethodInterceptor {
private static final Logger LOG = LoggerFactory.getLogger(GraphTransactionInterceptor.class);
@VisibleForTesting
private static final ObjectUpdateSynchronizer OBJECT_UPDATE_SYNCHRONIZER = new ObjectUpdateSynchronizer();
private static final ThreadLocal<List<PostTransactionHook>> postTransactionHooks = new ThreadLocal<>();
private final AtlasGraph graph;
......@@ -82,9 +89,19 @@ public class GraphTransactionInterceptor implements MethodInterceptor {
}
}
}
OBJECT_UPDATE_SYNCHRONIZER.releaseLockedObjects();
}
}
public static void lockObjectAndReleasePostCommit(final String guid) {
OBJECT_UPDATE_SYNCHRONIZER.lockObject(guid);
}
public static void lockObjectAndReleasePostCommit(final List<String> guids) {
OBJECT_UPDATE_SYNCHRONIZER.lockObject(guids);
}
boolean logException(Throwable t) {
if (t instanceof AtlasBaseException) {
Response.Status httpCode = ((AtlasBaseException) t).getAtlasErrorCode().getHttpCode();
......@@ -110,4 +127,107 @@ public class GraphTransactionInterceptor implements MethodInterceptor {
public abstract void onComplete(boolean isSuccess);
}
private static class RefCountedReentrantLock extends ReentrantLock {
private int refCount;
public RefCountedReentrantLock() {
this.refCount = 0;
}
public int increment() {
return ++refCount;
}
public int decrement() {
return --refCount;
}
public int getRefCount() { return refCount; }
}
public static class ObjectUpdateSynchronizer {
private final Map<String, RefCountedReentrantLock> guidLockMap = new ConcurrentHashMap<>();
private final ThreadLocal<List<String>> lockedGuids = new ThreadLocal<List<String>>() {
@Override
protected List<String> initialValue() {
return new ArrayList<>();
}
};
public void lockObject(final List<String> guids) {
if (LOG.isDebugEnabled()) {
LOG.debug("==> lockObject(): guids: {}", guids);
}
Collections.sort(guids);
for (String g : guids) {
lockObject(g);
}
}
private void lockObject(final String guid) {
if (LOG.isDebugEnabled()) {
LOG.debug("==> lockObject(): guid: {}, guidLockMap.size: {}", guid, guidLockMap.size());
}
ReentrantLock lock = getOrCreateObjectLock(guid);
lock.lock();
lockedGuids.get().add(guid);
if (LOG.isDebugEnabled()) {
LOG.debug("<== lockObject(): guid: {}, guidLockMap.size: {}", guid, guidLockMap.size());
}
}
public void releaseLockedObjects() {
if (LOG.isDebugEnabled()) {
LOG.debug("==> releaseLockedObjects(): lockedGuids.size: {}", lockedGuids.get().size());
}
for (String guid : lockedGuids.get()) {
releaseObjectLock(guid);
}
lockedGuids.get().clear();
if (LOG.isDebugEnabled()) {
LOG.debug("<== releaseLockedObjects(): lockedGuids.size: {}", lockedGuids.get().size());
}
}
private RefCountedReentrantLock getOrCreateObjectLock(String guid) {
synchronized (guidLockMap) {
RefCountedReentrantLock ret = guidLockMap.get(guid);
if (ret == null) {
ret = new RefCountedReentrantLock();
guidLockMap.put(guid, ret);
}
ret.increment();
return ret;
}
}
private RefCountedReentrantLock releaseObjectLock(String guid) {
synchronized (guidLockMap) {
RefCountedReentrantLock lock = guidLockMap.get(guid);
if (lock != null && lock.isHeldByCurrentThread()) {
int refCount = lock.decrement();
if (refCount == 0) {
guidLockMap.remove(guid);
}
lock.unlock();
} else {
LOG.warn("releaseLockedObjects: {} Attempting to release a lock not held by current thread.", guid);
}
return lock;
}
}
}
}
......@@ -21,6 +21,7 @@ package org.apache.atlas.repository.graph;
import com.google.common.base.Preconditions;
import org.apache.atlas.AtlasException;
import org.apache.atlas.CreateUpdateEntitiesResult;
import org.apache.atlas.GraphTransactionInterceptor;
import org.apache.atlas.RequestContext;
import org.apache.atlas.annotation.GraphTransaction;
import org.apache.atlas.model.instance.GuidMapping;
......@@ -49,7 +50,16 @@ import org.springframework.stereotype.Component;
import javax.inject.Inject;
import javax.inject.Singleton;
import java.util.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* An implementation backed by a Graph database provided
......@@ -303,6 +313,7 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
LOG.debug("Adding a new trait={} for entities={}", traitInstance.getTypeName(), entityGuids);
}
GraphTransactionInterceptor.lockObjectAndReleasePostCommit(entityGuids);
for (String entityGuid : entityGuids) {
addTraitImpl(entityGuid, traitInstance);
}
......@@ -321,12 +332,12 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
Preconditions.checkNotNull(guid, "guid cannot be null");
Preconditions.checkNotNull(traitInstance, "Trait instance cannot be null");
GraphTransactionInterceptor.lockObjectAndReleasePostCommit(guid);
addTraitImpl(guid, traitInstance);
}
private void addTraitImpl(String guid, ITypedStruct traitInstance) throws RepositoryException {
final String traitName = traitInstance.getTypeName();
if (LOG.isDebugEnabled()) {
LOG.debug("Adding a new trait={} for entity={}", traitName, guid);
}
......@@ -365,9 +376,8 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
@Override
@GraphTransaction
public void deleteTrait(String guid, String traitNameToBeDeleted) throws TraitNotFoundException, EntityNotFoundException, RepositoryException {
if (LOG.isDebugEnabled()) {
LOG.debug("Deleting trait={} from entity={}", traitNameToBeDeleted, guid);
}
LOG.debug("Deleting trait={} from entity={}", traitNameToBeDeleted, guid);
GraphTransactionInterceptor.lockObjectAndReleasePostCommit(guid);
AtlasVertex instanceVertex = graphHelper.getVertexForGUID(guid);
......@@ -383,11 +393,11 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
AtlasEdge edge = graphHelper.getEdgeForLabel(instanceVertex, relationshipLabel);
if(edge != null) {
deleteHandler.deleteEdgeReference(edge, DataTypes.TypeCategory.TRAIT, false, true);
// update the traits in entity once trait removal is successful
traitNames.remove(traitNameToBeDeleted);
updateTraits(instanceVertex, traitNames);
}
// update the traits in entity once trait removal is successful
traitNames.remove(traitNameToBeDeleted);
updateTraits(instanceVertex, traitNames);
} catch (Exception e) {
throw new RepositoryException(e);
}
......
......@@ -19,6 +19,7 @@ package org.apache.atlas.repository.store.graph.v1;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.GraphTransactionInterceptor;
import org.apache.atlas.RequestContextV1;
import org.apache.atlas.annotation.GraphTransaction;
import org.apache.atlas.exception.AtlasBaseException;
......@@ -456,6 +457,7 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
LOG.debug("Adding classifications={} to entity={}", classifications, guid);
}
GraphTransactionInterceptor.lockObjectAndReleasePostCommit(guid);
for (AtlasClassification classification : classifications) {
validateAndNormalize(classification);
}
......@@ -484,6 +486,7 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "classifications(s) not specified");
}
GraphTransactionInterceptor.lockObjectAndReleasePostCommit(guid);
List<AtlasClassification> updatedClassifications = new ArrayList<>();
for (AtlasClassification newClassification : newClassifications) {
......@@ -527,6 +530,8 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
LOG.debug("Adding classification={} to entities={}", classification, guids);
}
GraphTransactionInterceptor.lockObjectAndReleasePostCommit(guids);
validateAndNormalize(classification);
List<AtlasClassification> classifications = Collections.singletonList(classification);
......@@ -557,6 +562,8 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
LOG.debug("Deleting classifications={} from entity={}", classificationNames, guid);
}
GraphTransactionInterceptor.lockObjectAndReleasePostCommit(guid);
entityGraphMapper.deleteClassifications(guid, classificationNames);
// notify listeners on classification deletion
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.utils;
import org.apache.atlas.GraphTransactionInterceptor;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.RandomStringUtils;
import org.springframework.util.CollectionUtils;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import java.util.ArrayList;
import java.util.List;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
public class ObjectUpdateSynchronizerTest {
private static final GraphTransactionInterceptor.ObjectUpdateSynchronizer objectUpdateSynchronizer = new GraphTransactionInterceptor.ObjectUpdateSynchronizer();
private final List<Integer> outputList = new ArrayList<>();
private final int MAX_COUNT = 10;
class CounterThread extends Thread {
String ids[];
public CounterThread(String id) {
this.ids = new String[1];
this.ids[0] = id;
}
public void setIds(String... ids) {
this.ids = ids;
}
public void run() {
objectUpdateSynchronizer.lockObject(CollectionUtils.arrayToList(ids));
for (int i = 0; i < MAX_COUNT; i++) {
outputList.add(i);
RandomStringUtils.randomAlphabetic(20);
}
objectUpdateSynchronizer.releaseLockedObjects();
}
}
@BeforeMethod
public void clearOutputList() {
outputList.clear();
}
@Test
public void singleThreadRun() throws InterruptedException {
verifyMultipleThreadRun(1);
}
@Test
public void twoThreadsAccessingDifferntGuids_DoNotSerialize() throws InterruptedException {
CounterThread th[] = getCounterThreads(false, 2);
startCounterThreads(th);
waitForThreadsToEnd(th);
assertArrayNotEquals(populateExpectedArrayOutput(2));
}
@Test
public void twoThreadsAccessingSameGuid_Serialize() throws InterruptedException {
verifyMultipleThreadRun(2);
}
@Test
public void severalThreadsAccessingSameGuid_Serialize() throws InterruptedException {
verifyMultipleThreadRun(10);
}
@Test
public void severalThreadsSequentialAccessingListOfGuids() throws InterruptedException {
CounterThread th[] = getCounterThreads(false, 10);
int i = 0;
th[i++].setIds("1", "2", "3", "4", "5");
th[i++].setIds("1", "2", "3", "4");
th[i++].setIds("1", "2", "3");
th[i++].setIds("1", "2");
th[i++].setIds("1");
th[i++].setIds("1", "2");
th[i++].setIds("1", "2", "3");
th[i++].setIds("1", "2", "3", "4");
th[i++].setIds("1", "2", "3", "4", "5");
th[i++].setIds("1");
startCounterThreads(th);
waitForThreadsToEnd(th);
assertArrayEquals(populateExpectedArrayOutput(th.length));
}
@Test
public void severalThreadsNonSequentialAccessingListOfGuids() throws InterruptedException {
CounterThread th[] = getCounterThreads(false, 5);
int i = 0;
th[i++].setIds("2", "1", "3", "4", "5");
th[i++].setIds("3", "2", "4", "1");
th[i++].setIds("2", "3", "1");
th[i++].setIds("1", "2");
th[i++].setIds("1");
startCounterThreads(th);
waitForThreadsToEnd(th);
assertArrayEquals(populateExpectedArrayOutput(th.length));
}
@Test
public void severalThreadsAccessingOverlappingListOfGuids() throws InterruptedException {
CounterThread th[] = getCounterThreads(false, 5);
int i = 0;
th[i++].setIds("1", "2", "3", "4", "5");
th[i++].setIds("3", "4", "5", "6");
th[i++].setIds("5", "6", "7");
th[i++].setIds("7", "8");
th[i++].setIds("8");
startCounterThreads(th);
waitForThreadsToEnd(th);
assertArrayNotEquals(populateExpectedArrayOutput(th.length));
}
@Test
public void severalThreadsAccessingOverlappingListOfGuids2() throws InterruptedException {
CounterThread th[] = getCounterThreads(false, 3);
int i = 0;
th[i++].setIds("1", "2", "3", "4", "5");
th[i++].setIds("6", "7", "8", "9");
th[i++].setIds("4", "5", "6");
startCounterThreads(th);
waitForThreadsToEnd(th);
assertArrayNotEquals(populateExpectedArrayOutput(th.length));
}
@Test
public void severalThreadsAccessingOverlappingListOfGuidsEnsuringSerialOutput() throws InterruptedException {
CounterThread th[] = getCounterThreads(false, 5);
int i = 0;
th[i++].setIds("1", "2", "3", "4", "7");
th[i++].setIds("3", "4", "5", "7");
th[i++].setIds("5", "6", "7");
th[i++].setIds("7", "8");
th[i++].setIds("7");
startCounterThreads(th);
waitForThreadsToEnd(th);
assertArrayEquals(populateExpectedArrayOutput(th.length));
}
private void verifyMultipleThreadRun(int limit) throws InterruptedException {
CounterThread[] th = getCounterThreads(limit);
startCounterThreads(th);
waitForThreadsToEnd(th);
assertArrayEquals(populateExpectedArrayOutput(limit));
}
private void startCounterThreads(CounterThread[] th) {
for (int i = 0; i < th.length; i++) {
th[i].start();
}
}
private CounterThread[] getCounterThreads(int limit) {
return getCounterThreads(true, limit);
}
private CounterThread[] getCounterThreads(boolean sameId, int limit) {
CounterThread th[] = new CounterThread[limit];
for (Integer i = 0; i < limit; i++) {
th[i] = new CounterThread(sameId ? "1" : i.toString());
}
return th;
}
private void assertArrayEquals(List<Integer> expected) {
assertEquals(outputList.toArray(), expected.toArray());
}
private void assertArrayNotEquals(List<Integer> expected) {
assertFalse(ArrayUtils.isEquals(outputList.toArray(), expected));
}
private void waitForThreadsToEnd(CounterThread... threads) throws InterruptedException {
for (Thread t : threads) {
t.join();
}
}
private List<Integer> populateExpectedArrayOutput(int limit) {
List<Integer> list = new ArrayList<>();
for (int i = 0; i < limit*MAX_COUNT; i+=MAX_COUNT) {
for (int j = 0; j < MAX_COUNT; j++) {
list.add(j);
}
}
return list;
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment