Commit 83d05397 by Hemanth Yamijala

ATLAS-503 Lock exceptions occurring due to concurrent updates to backend stores (yhemanth)

parent 59268875
...@@ -119,3 +119,7 @@ atlas.auth.policy.file=${sys:atlas.home}/conf/policy-store.txt ...@@ -119,3 +119,7 @@ atlas.auth.policy.file=${sys:atlas.home}/conf/policy-store.txt
#########authorizer impl class ######### #########authorizer impl class #########
atlas.authorizer.impl=SIMPLE atlas.authorizer.impl=SIMPLE
######### Performance Configs #########
#atlas.graph.storage.lock.retries=10
#atlas.graph.storage.cache.db-cache-time=120000
\ No newline at end of file
...@@ -220,3 +220,18 @@ atlas.client.ha.sleep.interval.ms=5000 ...@@ -220,3 +220,18 @@ atlas.client.ha.sleep.interval.ms=5000
# Set the following property to true, to enable the setup steps to run on each server start. Default = false. # Set the following property to true, to enable the setup steps to run on each server start. Default = false.
atlas.server.run.setup.on.start=false atlas.server.run.setup.on.start=false
</verbatim> </verbatim>
---++ Performance configuration items
The following properties can be used to tune performance of Atlas under specific circumstances:
<verbatim>
# The number of times Atlas code tries to acquire a lock (to ensure consistency) while committing a transaction.
# This should be related to the amount of concurrency expected to be supported by the server. For example, with retries set to 10, up to 100 threads can concurrently create types in the Atlas system.
# If this is set to a low value (default is 3), concurrent operations might fail with a PermanentLockingException.
atlas.graph.storage.lock.retries=10
# Milliseconds to wait before evicting a cached entry. This should be greater than atlas.graph.storage.lock.wait-time multiplied by atlas.graph.storage.lock.retries.
# If this is set to a low value (default is 10000), warnings on transactions taking too long will occur in the Atlas application log.
atlas.graph.storage.cache.db-cache-time=120000
</verbatim>
\ No newline at end of file
...@@ -22,6 +22,7 @@ ATLAS-409 Atlas will not import avro tables with schema read from a file (dosset ...@@ -22,6 +22,7 @@ ATLAS-409 Atlas will not import avro tables with schema read from a file (dosset
ATLAS-379 Create sqoop and falcon metadata addons (venkatnrangan,bvellanki,sowmyaramesh via shwethags) ATLAS-379 Create sqoop and falcon metadata addons (venkatnrangan,bvellanki,sowmyaramesh via shwethags)
ALL CHANGES: ALL CHANGES:
ATLAS-503 Lock exceptions occurring due to concurrent updates to backend stores (yhemanth)
ATLAS-766 Atlas policy file does not honour standard hash as comment format ( saqeeb.s via sumasai ) ATLAS-766 Atlas policy file does not honour standard hash as comment format ( saqeeb.s via sumasai )
ATLAS-843 Atlas UI: Feature to search terms in left navigation. (Kalyanikashikar via sumasai) ATLAS-843 Atlas UI: Feature to search terms in left navigation. (Kalyanikashikar via sumasai)
ATLAS-731 Remove dashboard module in Atlas, replaced by dashboardv2 (kevalbhatt18 via sumasai) ATLAS-731 Remove dashboard module in Atlas, replaced by dashboardv2 (kevalbhatt18 via sumasai)
......
...@@ -20,6 +20,7 @@ import com.google.common.collect.Iterables; ...@@ -20,6 +20,7 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators; import com.google.common.collect.Iterators;
import com.thinkaurelius.titan.core.attribute.Duration; import com.thinkaurelius.titan.core.attribute.Duration;
import com.thinkaurelius.titan.diskstorage.*; import com.thinkaurelius.titan.diskstorage.*;
import com.thinkaurelius.titan.diskstorage.configuration.Configuration;
import com.thinkaurelius.titan.diskstorage.keycolumnvalue.*; import com.thinkaurelius.titan.diskstorage.keycolumnvalue.*;
import com.thinkaurelius.titan.diskstorage.locking.LocalLockMediator; import com.thinkaurelius.titan.diskstorage.locking.LocalLockMediator;
import com.thinkaurelius.titan.diskstorage.locking.PermanentLockingException; import com.thinkaurelius.titan.diskstorage.locking.PermanentLockingException;
...@@ -50,8 +51,6 @@ import java.util.*; ...@@ -50,8 +51,6 @@ import java.util.*;
import java.util.Map; import java.util.Map;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import static com.thinkaurelius.titan.diskstorage.EntryMetaData.*;
/** /**
* Here are some areas that might need work: * Here are some areas that might need work:
* <p/> * <p/>
...@@ -85,7 +84,9 @@ public class HBaseKeyColumnValueStore implements KeyColumnValueStore { ...@@ -85,7 +84,9 @@ public class HBaseKeyColumnValueStore implements KeyColumnValueStore {
private LocalLockMediator<StoreTransaction> localLockMediator; private LocalLockMediator<StoreTransaction> localLockMediator;
private Duration lockExpiryTime; private final Duration lockExpiryTimeMs;
private final Duration lockMaxWaitTimeMs;
private final Integer lockMaxRetries;
HBaseKeyColumnValueStore(HBaseStoreManager storeManager, ConnectionMask cnx, String tableName, String columnFamily, String storeName, LocalLockMediator<StoreTransaction> llm) { HBaseKeyColumnValueStore(HBaseStoreManager storeManager, ConnectionMask cnx, String tableName, String columnFamily, String storeName, LocalLockMediator<StoreTransaction> llm) {
this.storeManager = storeManager; this.storeManager = storeManager;
...@@ -96,7 +97,10 @@ public class HBaseKeyColumnValueStore implements KeyColumnValueStore { ...@@ -96,7 +97,10 @@ public class HBaseKeyColumnValueStore implements KeyColumnValueStore {
this.columnFamilyBytes = columnFamily.getBytes(); this.columnFamilyBytes = columnFamily.getBytes();
this.entryGetter = new HBaseGetter(storeManager.getMetaDataSchema(storeName)); this.entryGetter = new HBaseGetter(storeManager.getMetaDataSchema(storeName));
this.localLockMediator = llm; this.localLockMediator = llm;
this.lockExpiryTime = storeManager.getStorageConfig().get(GraphDatabaseConfiguration.LOCK_EXPIRE); Configuration storageConfig = storeManager.getStorageConfig();
this.lockExpiryTimeMs = storageConfig.get(GraphDatabaseConfiguration.LOCK_EXPIRE);
this.lockMaxWaitTimeMs = storageConfig.get(GraphDatabaseConfiguration.LOCK_WAIT);
this.lockMaxRetries = storageConfig.get(GraphDatabaseConfiguration.LOCK_RETRY);
} }
@Override @Override
...@@ -128,14 +132,37 @@ public class HBaseKeyColumnValueStore implements KeyColumnValueStore { ...@@ -128,14 +132,37 @@ public class HBaseKeyColumnValueStore implements KeyColumnValueStore {
KeyColumn lockID = new KeyColumn(key, column); KeyColumn lockID = new KeyColumn(key, column);
logger.debug("Attempting to acquireLock on {} ", lockID); logger.debug("Attempting to acquireLock on {} ", lockID);
final Timepoint lockStartTime = Timestamps.NANO.getTime(System.nanoTime(), TimeUnit.NANOSECONDS); int trialCount = 0;
boolean locked = localLockMediator.lock(lockID, txh, lockStartTime.add(lockExpiryTime)); boolean locked;
while (trialCount < lockMaxRetries) {
final Timepoint lockStartTime = Timestamps.MILLI.getTime(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
locked = localLockMediator.lock(lockID, txh, lockStartTime.add(lockExpiryTimeMs));
trialCount++;
if (!locked) { if (!locked) {
throw new PermanentLockingException("Could not lock the keyColumn " + lockID + " on CF {} " + Bytes.toString(columnFamilyBytes)); handleLockFailure(txh, lockID, trialCount);
} else {
logger.debug("Acquired lock on {}, {}", lockID, txh);
break;
}
} }
((HBaseTransaction) txh).updateLocks(lockID, expectedValue); ((HBaseTransaction) txh).updateLocks(lockID, expectedValue);
} }
/**
 * Handles a failed attempt to take the local lock for {@code lockID}.
 * <p/>
 * While fewer than {@code lockMaxRetries} attempts have been made, the calling thread
 * sleeps for the configured lock wait time so that the caller can try again. Once the
 * attempt budget is used up, the failure is reported as permanent.
 *
 * @param txh        transaction on whose behalf the lock was requested (used only in messages)
 * @param lockID     key/column pair that could not be locked
 * @param trialCount number of lock attempts made so far, including the one that just failed
 * @throws PermanentLockingException if the attempt budget is exhausted, or if the thread is
 *                                   interrupted while waiting before the next attempt
 */
void handleLockFailure(StoreTransaction txh, KeyColumn lockID, int trialCount) throws PermanentLockingException {
    if (trialCount >= lockMaxRetries) {
        // No attempts left: give up and surface the failure to the caller.
        throw new PermanentLockingException("Could not lock the keyColumn " +
                lockID + " on CF " + Bytes.toString(columnFamilyBytes));
    }
    try {
        // Back off before the caller retries the lock.
        Thread.sleep(lockMaxWaitTimeMs.getLength(TimeUnit.MILLISECONDS));
    } catch (InterruptedException e) {
        // Restore the interrupt flag so code further up the stack can still observe it.
        Thread.currentThread().interrupt();
        throw new PermanentLockingException(
                "Interrupted while waiting for acquiring lock for transaction "
                        + txh + " lockID " + lockID + " on retry " + trialCount, e);
    }
}
@Override @Override
public KeyIterator getKeys(KeyRangeQuery query, StoreTransaction txh) throws BackendException { public KeyIterator getKeys(KeyRangeQuery query, StoreTransaction txh) throws BackendException {
return executeKeySliceQuery(query.getKeyStart().as(StaticBuffer.ARRAY_FACTORY), return executeKeySliceQuery(query.getKeyStart().as(StaticBuffer.ARRAY_FACTORY),
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.thinkaurelius.titan.diskstorage.hbase;
import com.thinkaurelius.titan.diskstorage.BackendException;
import com.thinkaurelius.titan.diskstorage.EntryMetaData;
import com.thinkaurelius.titan.diskstorage.StaticBuffer;
import com.thinkaurelius.titan.diskstorage.configuration.Configuration;
import com.thinkaurelius.titan.diskstorage.locking.LocalLockMediator;
import com.thinkaurelius.titan.diskstorage.locking.PermanentLockingException;
import com.thinkaurelius.titan.diskstorage.util.KeyColumn;
import com.thinkaurelius.titan.diskstorage.util.time.StandardDuration;
import com.thinkaurelius.titan.diskstorage.util.time.Timepoint;
import com.thinkaurelius.titan.graphdb.configuration.GraphDatabaseConfiguration;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;
import java.util.concurrent.TimeUnit;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.fail;
/**
 * Unit tests for the lock retry behaviour of {@code HBaseKeyColumnValueStore.acquireLock}.
 * <p/>
 * All collaborators are Mockito mocks. The storage configuration is stubbed with a lock
 * expiry of 300 ms, a lock wait of 10 ms and {@value #LOCK_RETRIES} lock retries, and the
 * {@code LocalLockMediator} mock is scripted per test to succeed or fail.
 */
public class HBaseKeyColumnValueStoreTest {

    /** Value stubbed for {@code GraphDatabaseConfiguration.LOCK_RETRY} in every test. */
    private static final int LOCK_RETRIES = 3;

    @Mock
    HBaseStoreManager storeManager;

    @Mock
    ConnectionMask connectionMask;

    @Mock
    LocalLockMediator localLockMediator;

    @Mock
    StaticBuffer key;

    @Mock
    StaticBuffer column;

    @Mock
    StaticBuffer expectedValue;

    @Mock
    HBaseTransaction transaction;

    @Mock
    Configuration storageConfig;

    /** Lock identifier built from the mocked key and column; recreated for every test. */
    private KeyColumn lockID;

    /**
     * Creates fresh mocks and applies the stubbing shared by all tests.
     */
    @BeforeMethod
    public void setup() {
        MockitoAnnotations.initMocks(this);

        when(storeManager.getMetaDataSchema("hbase")).thenReturn(new EntryMetaData[] {EntryMetaData.TIMESTAMP});
        when(storeManager.getStorageConfig()).thenReturn(storageConfig);
        when(storageConfig.get(GraphDatabaseConfiguration.LOCK_EXPIRE)).thenReturn(
                new StandardDuration(300L, TimeUnit.MILLISECONDS));
        when(storageConfig.get(GraphDatabaseConfiguration.LOCK_WAIT)).thenReturn(
                new StandardDuration(10L, TimeUnit.MILLISECONDS));
        when(storageConfig.get(GraphDatabaseConfiguration.LOCK_RETRY)).thenReturn(LOCK_RETRIES);

        lockID = new KeyColumn(key, column);
    }

    /**
     * A lock granted on the first attempt is recorded on the transaction without any retry.
     */
    @Test
    public void shouldSucceedInLockingIfLockMediatorSucceeds() throws BackendException {
        when(localLockMediator.lock(eq(lockID), eq(transaction), any(Timepoint.class))).
                thenReturn(true);

        newStore().acquireLock(key, column, expectedValue, transaction);

        verify(transaction).updateLocks(lockID, expectedValue);
        verify(localLockMediator, times(1)).lock(eq(lockID), eq(transaction), any(Timepoint.class));
    }

    /**
     * Two failed attempts followed by a success result in exactly three lock calls
     * and the lock being recorded on the transaction.
     */
    @Test
    public void shouldRetryRightNumberOfTimesIfLockMediationFails() throws BackendException {
        when(localLockMediator.lock(eq(lockID), eq(transaction), any(Timepoint.class))).
                thenReturn(false).thenReturn(false).thenReturn(true);

        newStore().acquireLock(key, column, expectedValue, transaction);

        verify(transaction).updateLocks(lockID, expectedValue);
        verify(localLockMediator, times(3)).lock(eq(lockID), eq(transaction), any(Timepoint.class));
    }

    /**
     * When every configured attempt fails, a {@link PermanentLockingException} is raised
     * after exactly {@value #LOCK_RETRIES} lock calls and nothing is recorded on the transaction.
     */
    @Test(expectedExceptions = PermanentLockingException.class)
    public void shouldThrowExceptionAfterConfiguredRetriesIfLockMediationFails() throws BackendException {
        when(localLockMediator.lock(eq(lockID), eq(transaction), any(Timepoint.class))).
                thenReturn(false).thenReturn(false).thenReturn(false);

        try {
            newStore().acquireLock(key, column, expectedValue, transaction);
        } catch (PermanentLockingException e) {
            // Check the attempt count and the absence of side effects, then let the
            // exception propagate so that expectedExceptions is still satisfied.
            verify(localLockMediator, times(LOCK_RETRIES)).lock(eq(lockID), eq(transaction), any(Timepoint.class));
            verify(transaction, times(0)).updateLocks(lockID, expectedValue);
            throw e;
        }
        fail("Should fail as lock could not be acquired after 3 retries.");
    }

    /**
     * Builds the store under test from the mocked collaborators.
     */
    private HBaseKeyColumnValueStore newStore() {
        return new HBaseKeyColumnValueStore(storeManager, connectionMask, "titan", "e", "hbase", localLockMediator);
    }
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment