Commit 919120f6 by Suma Shivaprasad

ATLAS-352 Improve write performance on type and entity creation with Hbase(sumasai)

parent 91ad0218
......@@ -89,6 +89,16 @@
......@@ -58,6 +58,7 @@ def main():
p = os.pathsep
metadata_classpath = confdir + p \
+ os.path.join(web_app_dir, "atlas", "WEB-INF", "classes" ) + p \
+ os.path.join(web_app_dir, "atlas", "WEB-INF", "lib", "atlas-titan-${project.version}.jar" ) + p \
+ os.path.join(web_app_dir, "atlas", "WEB-INF", "lib", "*" ) + p \
+ os.path.join(metadata_home, "libext", "*")
if os.path.exists(hbase_conf_dir):
......@@ -48,7 +48,7 @@
......@@ -56,20 +56,16 @@ class TestMetadata(unittest.TestCase):
['-app', 'metadata_home\\server\\webapp\\atlas'],
['-Datlas.log.dir=metadata_home\\logs', '-Datlas.log.file=application.log', '-Datlas.home=metadata_home', '-Datlas.conf=metadata_home\\conf', '-Xmx1024m', '-XX:MaxPermSize=512m', '-Dlog4j.configuration=atlas-log4j.xml'], 'metadata_home\\logs')
['-app', 'metadata_home/server/webapp/atlas'],
['-Datlas.log.dir=metadata_home/logs', '-Datlas.log.file=application.log', '-Datlas.home=metadata_home', '-Datlas.conf=metadata_home/conf', '-Xmx1024m', '-XX:MaxPermSize=512m', '-Dlog4j.configuration=atlas-log4j.xml'], 'metadata_home/logs')
def test_jar_java_lookups_fail(self):
......@@ -409,6 +409,7 @@
......@@ -925,6 +926,12 @@
......@@ -9,6 +9,7 @@ ATLAS-54 Rename configs in hive hook (shwethags)
ATLAS-3 Mixed Index creation fails with Date types (sumasai via shwethags)
ATLAS-352 Improve write performance on type and entity creation with Hbase (sumasai)
ATLAS-350 Document jaas config details for atlas (tbeerbower via shwethags)
ATLAS-344 Document HBase permissions for secure cluster (tbeerbower via shwethags)
ATLAS-335 Kerberized cluster: Atlas fails to come up with hbase as backend (sumasai via shwethags)
......@@ -50,6 +50,11 @@
......@@ -85,52 +90,6 @@
<!-- Commenting out since titan-hbase classes are shaded for 1.x support -->
......@@ -27,6 +27,7 @@ import org.aopalliance.intercept.MethodInterceptor;
import org.apache.atlas.discovery.DiscoveryService;
import org.apache.atlas.discovery.HiveLineageService;
import org.apache.atlas.discovery.LineageService;
import org.apache.atlas.discovery.SearchIndexer;
import org.apache.atlas.discovery.graph.GraphBackedDiscoveryService;
import org.apache.atlas.listener.TypesChangeListener;
import org.apache.atlas.repository.MetadataRepository;
......@@ -18,9 +18,11 @@
package org.apache.atlas.repository.graph;
import com.thinkaurelius.titan.core.TitanFactory;
import com.thinkaurelius.titan.core.TitanGraph;
import com.thinkaurelius.titan.core.TitanIndexQuery;
import com.thinkaurelius.titan.core.util.TitanCleanup;
import com.thinkaurelius.titan.diskstorage.BackendException;
import com.thinkaurelius.titan.diskstorage.configuration.ReadConfiguration;
import com.thinkaurelius.titan.diskstorage.configuration.backend.CommonsConfiguration;
......@@ -30,6 +32,7 @@ import com.tinkerpop.blueprints.GraphQuery;
import com.tinkerpop.blueprints.Predicate;
import com.tinkerpop.blueprints.Vertex;
import org.apache.atlas.GraphTransaction;
import org.apache.atlas.RepositoryMetadataModule;
import org.apache.atlas.TestUtils;
import org.apache.atlas.repository.Constants;
import org.apache.atlas.typesystem.ITypedReferenceableInstance;
......@@ -43,6 +46,7 @@ import;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
......@@ -53,6 +57,7 @@ import java.util.Date;
import java.util.Random;
@Guice(modules = RepositoryMetadataModule.class)
public class GraphRepoMapperScaleTest {
private static final String DATABASE_NAME = "foo";
......@@ -61,50 +66,21 @@ public class GraphRepoMapperScaleTest {
private static final String INDEX_DIR =
System.getProperty("", "/tmp") + "/atlas-test" + new Random().nextLong();
private GraphProvider<TitanGraph> graphProvider = new GraphProvider<TitanGraph>() {
private TitanGraph graph = null;
//Ensure separate directory for graph provider to avoid issues with index merging
public TitanGraph get() {
try {
if (graph == null) {
synchronized (GraphRepoMapperScaleTest.class) {
if (graph == null) {
ReadConfiguration config = new CommonsConfiguration() {{
set("storage.backend", "inmemory");
set("", INDEX_DIR);
set("", "elasticsearch");
set("", "true");
set("", "false");
GraphDatabaseConfiguration graphconfig = new GraphDatabaseConfiguration(config);
graph =;
} catch (BackendException e) {
return graph;
GraphProvider<TitanGraph> graphProvider;
private GraphBackedMetadataRepository repositoryService;
private GraphBackedSearchIndexer searchIndexer;
private TypeSystem typeSystem = TypeSystem.getInstance();
private String dbGUID;
public void setUp() throws Exception {
//Make sure we can cleanup the index directory
repositoryService = new GraphBackedMetadataRepository(graphProvider);
searchIndexer = new GraphBackedSearchIndexer(graphProvider);
Collection<IDataType> typesAdded = TestUtils.createHiveTypes(typeSystem);
......@@ -112,11 +88,17 @@ public class GraphRepoMapperScaleTest {
public void tearDown() throws Exception {
try {
//TODO - Fix failure during shutdown while using BDB
} catch (Exception e) {
try {
FileUtils.deleteDirectory(new File(INDEX_DIR));
} catch (IOException ioe) {
System.err.println("Failed to cleanup index directory");
} catch (Exception e) {
......@@ -142,6 +124,10 @@ public class GraphRepoMapperScaleTest {
@Test(dependsOnMethods = "testSubmitEntity")
public void testSearchIndex() throws Exception {
//Elasticsearch requires some time before index is updated
searchWithOutIndex(Constants.GUID_PROPERTY_KEY, dbGUID);
searchWithOutIndex(Constants.ENTITY_TYPE_PROPERTY_KEY, "column_type");
searchWithOutIndex(Constants.ENTITY_TYPE_PROPERTY_KEY, TestUtils.TABLE_TYPE);
<?xml version="1.0" encoding="UTF-8"?>
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ See the License for the specific language governing permissions and
~ limitations under the License.
<project xmlns=""
<description>Apache Atlas Titan Overrides</description>
<name>Apache Atlas Titan</name>
\ No newline at end of file
......@@ -19,7 +19,6 @@ import;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.Delete;
public interface HBaseCompat {
......@@ -22,7 +22,6 @@ import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HConnectionManager;
public class HBaseCompat1_0 implements HBaseCompat {
......@@ -18,12 +18,19 @@ import;
import com.thinkaurelius.titan.core.attribute.Duration;
import com.thinkaurelius.titan.diskstorage.*;
import com.thinkaurelius.titan.diskstorage.keycolumnvalue.*;
import com.thinkaurelius.titan.diskstorage.locking.LocalLockMediator;
import com.thinkaurelius.titan.diskstorage.locking.PermanentLockingException;
import com.thinkaurelius.titan.diskstorage.util.KeyColumn;
import com.thinkaurelius.titan.diskstorage.util.RecordIterator;
import com.thinkaurelius.titan.diskstorage.util.StaticArrayBuffer;
import com.thinkaurelius.titan.diskstorage.util.StaticArrayEntry;
import com.thinkaurelius.titan.diskstorage.util.StaticArrayEntryList;
import com.thinkaurelius.titan.diskstorage.util.time.Timepoint;
import com.thinkaurelius.titan.diskstorage.util.time.Timestamps;
import com.thinkaurelius.titan.graphdb.configuration.GraphDatabaseConfiguration;
import com.thinkaurelius.titan.util.system.IOUtils;
import org.apache.hadoop.hbase.client.*;
......@@ -31,6 +38,7 @@ import org.apache.hadoop.hbase.filter.ColumnPaginationFilter;
import org.apache.hadoop.hbase.filter.ColumnRangeFilter;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -39,6 +47,10 @@ import javax.annotation.Nullable;
import java.util.*;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static com.thinkaurelius.titan.diskstorage.EntryMetaData.*;
* Here are some areas that might need work:
......@@ -71,7 +83,11 @@ public class HBaseKeyColumnValueStore implements KeyColumnValueStore {
private final ConnectionMask cnx;
HBaseKeyColumnValueStore(HBaseStoreManager storeManager, ConnectionMask cnx, String tableName, String columnFamily, String storeName) {
private LocalLockMediator<StoreTransaction> localLockMediator;
private Duration lockExpiryTime;
HBaseKeyColumnValueStore(HBaseStoreManager storeManager, ConnectionMask cnx, String tableName, String columnFamily, String storeName, LocalLockMediator<StoreTransaction> llm) {
this.storeManager = storeManager;
this.cnx = cnx;
this.tableName = tableName;
......@@ -79,6 +95,8 @@ public class HBaseKeyColumnValueStore implements KeyColumnValueStore {
this.storeName = storeName;
this.columnFamilyBytes = columnFamily.getBytes();
this.entryGetter = new HBaseGetter(storeManager.getMetaDataSchema(storeName));
this.localLockMediator = llm;
this.lockExpiryTime = storeManager.getStorageConfig().get(GraphDatabaseConfiguration.LOCK_EXPIRE);
......@@ -107,7 +125,15 @@ public class HBaseKeyColumnValueStore implements KeyColumnValueStore {
StaticBuffer column,
StaticBuffer expectedValue,
StoreTransaction txh) throws BackendException {
throw new UnsupportedOperationException();
KeyColumn lockID = new KeyColumn(key, column);
logger.debug("Attempting to acquireLock on {} ", lockID);
final Timepoint lockStartTime = Timestamps.NANO.getTime(System.nanoTime(), TimeUnit.NANOSECONDS);
boolean locked = localLockMediator.lock(lockID, txh, lockStartTime.add(lockExpiryTime));
if (!locked) {
throw new PermanentLockingException("Could not lock the keyColumn " + lockID + " on CF {} " + Bytes.toString(columnFamilyBytes));
((HBaseTransaction) txh).updateLocks(lockID, expectedValue);
......@@ -167,7 +193,9 @@ public class HBaseKeyColumnValueStore implements KeyColumnValueStore {
try {
table = cnx.getTable(tableName);
logger.debug("Get requests {} {} ", Bytes.toString(columnFamilyBytes), requests.size());
results = table.get(requests);
logger.debug("Get requests finished {} {} ", Bytes.toString(columnFamilyBytes), requests.size());
} finally {
......@@ -231,6 +259,7 @@ public class HBaseKeyColumnValueStore implements KeyColumnValueStore {
TableMask table = null;
logger.debug("Scan for row keys {} {} ", Bytes.toString(startKey), Bytes.toString(endKey));
try {
table = cnx.getTable(tableName);
return new RowIterator(table, table.getScanner(scan.setFilter(filters)), columnFamilyBytes);
......@@ -14,14 +14,6 @@
package com.thinkaurelius.titan.diskstorage.hbase;
import static com.thinkaurelius.titan.diskstorage.Backend.EDGESTORE_NAME;
import static com.thinkaurelius.titan.diskstorage.Backend.ID_STORE_NAME;
import static com.thinkaurelius.titan.diskstorage.Backend.INDEXSTORE_NAME;
import static com.thinkaurelius.titan.diskstorage.Backend.LOCK_STORE_SUFFIX;
import static com.thinkaurelius.titan.diskstorage.Backend.SYSTEM_MGMT_LOG_NAME;
import static com.thinkaurelius.titan.diskstorage.Backend.SYSTEM_TX_LOG_NAME;
import static com.thinkaurelius.titan.graphdb.configuration.GraphDatabaseConfiguration.SYSTEM_PROPERTIES_STORE_NAME;
import java.nio.ByteBuffer;
import java.util.ArrayList;
......@@ -34,8 +26,11 @@ import java.util.NavigableMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import com.thinkaurelius.titan.diskstorage.Backend;
import com.thinkaurelius.titan.diskstorage.configuration.ConfigElement;
import com.thinkaurelius.titan.diskstorage.keycolumnvalue.CustomizeStoreKCVSManager;
import com.thinkaurelius.titan.diskstorage.locking.LocalLockMediator;
import com.thinkaurelius.titan.diskstorage.locking.LocalLockMediators;
import com.thinkaurelius.titan.diskstorage.util.time.Timestamps;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
......@@ -51,6 +46,7 @@ import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.slf4j.Logger;
......@@ -236,15 +232,15 @@ public class HBaseStoreManager extends DistributedStoreManager implements KeyCol
private static final BiMap<String, String> SHORT_CF_NAME_MAP =
ImmutableBiMap.<String, String>builder()
.put(ID_STORE_NAME, "i")
.put(Backend.INDEXSTORE_NAME, "g")
.put(Backend.INDEXSTORE_NAME + Backend.LOCK_STORE_SUFFIX, "h")
.put(Backend.ID_STORE_NAME, "i")
.put(Backend.EDGESTORE_NAME, "e")
.put(Backend.EDGESTORE_NAME + Backend.LOCK_STORE_SUFFIX, "f")
.put(GraphDatabaseConfiguration.SYSTEM_PROPERTIES_STORE_NAME, "s")
.put(GraphDatabaseConfiguration.SYSTEM_PROPERTIES_STORE_NAME + Backend.LOCK_STORE_SUFFIX, "t")
.put(Backend.SYSTEM_MGMT_LOG_NAME, "m")
.put(Backend.SYSTEM_TX_LOG_NAME, "l")
private static final StaticBuffer FOUR_ZERO_BYTES = BufferUtil.zeroBuffer(4);
......@@ -275,6 +271,8 @@ public class HBaseStoreManager extends DistributedStoreManager implements KeyCol
// Mutable instance state
private final ConcurrentMap<String, HBaseKeyColumnValueStore> openStores;
private LocalLockMediator<StoreTransaction> llm;
public HBaseStoreManager(com.thinkaurelius.titan.diskstorage.configuration.Configuration config) throws BackendException {
super(config, PORT_DEFAULT);
......@@ -390,6 +388,7 @@ public class HBaseStoreManager extends DistributedStoreManager implements KeyCol
try {
......@@ -403,6 +402,7 @@ public class HBaseStoreManager extends DistributedStoreManager implements KeyCol
public void mutateMany(Map<String, Map<StaticBuffer, KCVMutation>> mutations, StoreTransaction txh) throws BackendException {
logger.debug("Enter mutateMany");
final MaskedTimestamp commitTime = new MaskedTimestamp(txh);
// In case of an addition and deletion with identical timestamps, the
// deletion tombstone wins.
......@@ -429,7 +429,9 @@ public class HBaseStoreManager extends DistributedStoreManager implements KeyCol
try {
table = cnx.getTable(tableName);
logger.debug("mutateMany : batch mutate started size {} ", batch.size());
table.batch(batch, new Object[batch.size()]);
logger.debug("mutateMany : batch mutate finished {} ", batch.size());
} finally {
......@@ -456,7 +458,9 @@ public class HBaseStoreManager extends DistributedStoreManager implements KeyCol
if (store == null) {
final String cfName = shortCfNames ? shortenCfName(longName) : longName;
HBaseKeyColumnValueStore newStore = new HBaseKeyColumnValueStore(this, cnx, tableName, cfName, longName);
final String llmPrefix = getName();
llm = LocalLockMediators.INSTANCE.<StoreTransaction>get(llmPrefix, times);
HBaseKeyColumnValueStore newStore = new HBaseKeyColumnValueStore(this, cnx, tableName, cfName, longName, llm);
store = openStores.putIfAbsent(longName, newStore); // nothing bad happens if we loose to other thread
......@@ -475,7 +479,7 @@ public class HBaseStoreManager extends DistributedStoreManager implements KeyCol
public StoreTransaction beginTransaction(final BaseTransactionConfig config) throws BackendException {
return new HBaseTransaction(config);
return new HBaseTransaction(config, llm);
......@@ -804,12 +808,7 @@ public class HBaseStoreManager extends DistributedStoreManager implements KeyCol
adm.addColumn(tableName, cdesc);
try {
logger.debug("Added HBase ColumnFamily {}, waiting for 1 sec. to propogate.", columnFamily);
} catch (InterruptedException ie) {
throw new TemporaryBackendException(ie);
} catch (TableNotFoundException ee) {
......@@ -832,6 +831,8 @@ public class HBaseStoreManager extends DistributedStoreManager implements KeyCol
if (ttlInSeconds > 0)
......@@ -14,8 +14,18 @@
package com.thinkaurelius.titan.diskstorage.hbase;
import com.thinkaurelius.titan.diskstorage.BackendException;
import com.thinkaurelius.titan.diskstorage.BaseTransactionConfig;
import com.thinkaurelius.titan.diskstorage.StaticBuffer;
import com.thinkaurelius.titan.diskstorage.common.AbstractStoreTransaction;
import com.thinkaurelius.titan.diskstorage.keycolumnvalue.StoreTransaction;
import com.thinkaurelius.titan.diskstorage.locking.LocalLockMediator;
import com.thinkaurelius.titan.diskstorage.util.KeyColumn;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.LinkedHashSet;
import java.util.Set;
* This class overrides and adds nothing compared with
......@@ -27,7 +37,39 @@ import com.thinkaurelius.titan.diskstorage.common.AbstractStoreTransaction;
public class HBaseTransaction extends AbstractStoreTransaction {
public HBaseTransaction(final BaseTransactionConfig config) {
private static final Logger log = LoggerFactory.getLogger(HBaseTransaction.class);
LocalLockMediator<StoreTransaction> llm;
Set<KeyColumn> keyColumnLocks = new LinkedHashSet<>();
public HBaseTransaction(final BaseTransactionConfig config, LocalLockMediator<StoreTransaction> llm) {
this.llm = llm;
public synchronized void rollback() throws BackendException {
log.debug("Rolled back transaction");
public synchronized void commit() throws BackendException {
log.debug("Committed transaction");
public void updateLocks(KeyColumn lockID, StaticBuffer expectedValue) {
private void deleteAllLocks() {
for(KeyColumn kc : keyColumnLocks) {
log.debug("Removed lock {} ", kc);
llm.unlock(kc, this);
......@@ -18,7 +18,6 @@ import;
import java.util.List;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Row;
* Copyright 2012-2013 Aurelius LLC
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package com.thinkaurelius.titan.diskstorage.locking;
import com.thinkaurelius.titan.diskstorage.util.time.Timepoint;
import com.thinkaurelius.titan.diskstorage.util.time.TimestampProvider;
import com.thinkaurelius.titan.diskstorage.locking.consistentkey.ExpectedValueCheckingTransaction;
import com.thinkaurelius.titan.diskstorage.util.KeyColumn;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
* This class resolves lock contention between two transactions on the same JVM.
* <p/>
* This is not just an optimization to reduce network traffic. Locks written by
* Titan to a distributed key-value store contain an identifier, the "Rid",
* which is unique only to the process level. The Rid can't tell which
* transaction in a process holds any given lock. This class prevents two
* transactions in a single process from concurrently writing the same lock to a
* distributed key-value store.
* @author Dan LaRocque <>
public class LocalLockMediator<T> {
private static final Logger log = LoggerFactory
* Namespace for which this mediator is responsible
* @see LocalLockMediatorProvider
private final String name;
private final TimestampProvider times;
private DelayQueue<ExpirableKeyColumn> expiryQueue = new DelayQueue<>();
private ExecutorService lockCleanerService = Executors.newFixedThreadPool(1, new ThreadFactory() {
public Thread newThread(Runnable runnable) {
Thread thread = Executors.defaultThreadFactory().newThread(runnable);
return thread;
* Maps a ({@code key}, {@code column}) pair to the local transaction
* holding a lock on that pair. Values in this map may have already expired
* according to {@link AuditRecord#expires}, in which case the lock should
* be considered invalid.
private final ConcurrentHashMap<KeyColumn, AuditRecord<T>> locks = new ConcurrentHashMap<KeyColumn, AuditRecord<T>>();
public LocalLockMediator(String name, TimestampProvider times) { = name;
this.times = times;
lockCleanerService.submit(new LockCleaner());
* Acquire the lock specified by {@code kc}.
* <p/>
* <p/>
* For any particular key-column, whatever value of {@code requestor} is
* passed to this method must also be passed to the associated later call to
* {@link #unlock(KeyColumn, ExpectedValueCheckingTransaction)}.
* <p/>
* If some requestor {@code r} calls this method on a KeyColumn {@code k}
* and this method returns true, then subsequent calls to this method by
* {@code r} on {@code l} merely attempt to update the {@code expiresAt}
* timestamp. This differs from typical lock reentrance: multiple successful
* calls to this method do not require an equal number of calls to
* {@code #unlock()}. One {@code #unlock()} call is enough, no matter how
* many times a {@code requestor} called {@code lock} beforehand. Note that
* updating the timestamp may fail, in which case the lock is considered to
* have expired and the calling context should assume it no longer holds the
* lock specified by {@code kc}.
* <p/>
* The number of nanoseconds elapsed since the UNIX Epoch is not readily
* available within the JVM. When reckoning expiration times, this method
* uses the approximation implemented by
* {@link com.thinkaurelius.titan.diskstorage.util.NanoTime#getApproxNSSinceEpoch(false)}.
* <p/>
* The current implementation of this method returns true when given an
* {@code expiresAt} argument in the past. Future implementations may return
* false instead.
* @param kc lock identifier
* @param requestor the object locking {@code kc}
* @param expires instant at which this lock will automatically expire
* @return true if the lock is acquired, false if it was not acquired
public boolean lock(KeyColumn kc, T requestor, Timepoint expires) {
assert null != kc;
assert null != requestor;
AuditRecord<T> audit = new AuditRecord<T>(requestor, expires);
AuditRecord<T> inmap = locks.putIfAbsent(kc, audit);
boolean success = false;
if (null == inmap) {
// Uncontended lock succeeded
if (log.isTraceEnabled()) {
log.trace("New local lock created: {} namespace={} txn={}",
new Object[]{kc, name, requestor});
success = true;
} else if (inmap.equals(audit)) {
// requestor has already locked kc; update expiresAt
success = locks.replace(kc, inmap, audit);
if (log.isTraceEnabled()) {
if (success) {
"Updated local lock expiration: {} namespace={} txn={} oldexp={} newexp={}",
new Object[]{kc, name, requestor, inmap.expires,
} else {
"Failed to update local lock expiration: {} namespace={} txn={} oldexp={} newexp={}",
new Object[]{kc, name, requestor, inmap.expires,
} else if (0 > inmap.expires.compareTo(times.getTime())) {
// the recorded lock has expired; replace it
success = locks.replace(kc, inmap, audit);
if (log.isTraceEnabled()) {
"Discarding expired lock: {} namespace={} txn={} expired={}",
new Object[]{kc, name, inmap.holder, inmap.expires});
} else {
// we lost to a valid lock
if (log.isTraceEnabled()) {
"Local lock failed: {} namespace={} txn={} (already owned by {})",
new Object[]{kc, name, requestor, inmap});
if (success) {
expiryQueue.add(new ExpirableKeyColumn(kc, expires));
return success;
* Release the lock specified by {@code kc} and which was previously
* locked by {@code requestor}, if it is possible to release it.
* @param kc lock identifier
* @param requestor the object which previously locked {@code kc}
public boolean unlock(KeyColumn kc, T requestor) {
if (!locks.containsKey(kc)) {"Local unlock failed: no locks found for {}", kc);
return false;
AuditRecord<T> unlocker = new AuditRecord<T>(requestor, null);
AuditRecord<T> holder = locks.get(kc);
if (!holder.equals(unlocker)) {
log.error("Local unlock of {} by {} failed: it is held by {}",
new Object[]{kc, unlocker, holder});
return false;
boolean removed = locks.remove(kc, unlocker);
if (removed) {
if (log.isTraceEnabled()) {
log.trace("Local unlock succeeded: {} namespace={} txn={}",
new Object[]{kc, name, requestor});
} else {
log.warn("Local unlock warning: lock record for {} disappeared "
+ "during removal; this suggests the lock either expired "
+ "while we were removing it, or that it was erroneously "
+ "unlocked multiple times.", kc);
// Even if !removed, we're finished unlocking, so return true
return true;
public String toString() {
return "LocalLockMediator [" + name + ", ~" + locks.size()
+ " current locks]";
* A record containing the local transaction that holds a lock and the
* lock's expiration time.
private static class AuditRecord<T> {
* The local transaction that holds/held the lock.
private final T holder;
* The expiration time of a the lock.
private final Timepoint expires;
* Cached hashCode.
private int hashCode;
private AuditRecord(T holder, Timepoint expires) {
this.holder = holder;
this.expires = expires;
* This implementation depends only on the lock holder and not on the
* lock expiration time.
public int hashCode() {
if (0 == hashCode)
hashCode = holder.hashCode();
return hashCode;
* This implementation depends only on the lock holder and not on the
* lock expiration time.
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
* This warning suppression is harmless because we are only going to
* call other.holder.equals(...), and since equals(...) is part of
* Object, it is guaranteed to be defined no matter the concrete
* type of parameter T.
AuditRecord other = (AuditRecord) obj;
if (holder == null) {
if (other.holder != null)
return false;
} else if (!holder.equals(other.holder))
return false;
return true;
public String toString() {
return "AuditRecord [txn=" + holder + ", expires=" + expires + "]";
private class LockCleaner implements Runnable {
public void run() {
try {
while (true) {
log.debug("Lock Cleaner service started");
ExpirableKeyColumn lock = expiryQueue.take();
log.debug("Expiring key column " + lock.getKeyColumn());
} catch (InterruptedException e) {
log.debug("Received interrupt. Exiting");
private static class ExpirableKeyColumn implements Delayed {
private Timepoint expiryTime;
private KeyColumn kc;
public ExpirableKeyColumn(KeyColumn keyColumn, Timepoint expiryTime) {
this.kc = keyColumn;
this.expiryTime = expiryTime;
public long getDelay(TimeUnit unit) {
return expiryTime.getTimestamp(TimeUnit.NANOSECONDS);
public int compareTo(Delayed o) {
if (this.expiryTime.getTimestamp(TimeUnit.NANOSECONDS) < ((ExpirableKeyColumn) o).expiryTime.getTimestamp(TimeUnit.NANOSECONDS)) {
return -1;
if (this.expiryTime.getTimestamp(TimeUnit.NANOSECONDS) > ((ExpirableKeyColumn) o).expiryTime.getTimestamp(TimeUnit.NANOSECONDS)) {
return 1;
return 0;
public KeyColumn getKeyColumn() {
return kc;
......@@ -41,6 +41,7 @@ import com.thinkaurelius.titan.diskstorage.indexing.IndexQuery;
import com.thinkaurelius.titan.diskstorage.indexing.KeyInformation;
import com.thinkaurelius.titan.diskstorage.indexing.RawQuery;
import com.thinkaurelius.titan.diskstorage.util.DefaultTransaction;
import com.thinkaurelius.titan.graphdb.configuration.GraphDatabaseConfiguration;
import com.thinkaurelius.titan.graphdb.configuration.PreInitializeConfigOptions;
import com.thinkaurelius.titan.graphdb.database.serialize.AttributeUtil;
import com.thinkaurelius.titan.graphdb.database.serialize.attribute.AbstractDecimal;
......@@ -89,8 +90,8 @@ import java.util.Map;
import java.util.TimeZone;
import java.util.UUID;
import static com.thinkaurelius.titan.graphdb.configuration.GraphDatabaseConfiguration.INDEX_MAX_RESULT_SET_SIZE;
import static com.thinkaurelius.titan.graphdb.configuration.GraphDatabaseConfiguration.INDEX_NS;
import static com.thinkaurelius.titan.core.attribute.Cmp.*;
import static com.thinkaurelius.titan.core.schema.Mapping.*;
* NOTE: Copied from titan for supporting sol5. Do not change
......@@ -115,7 +116,7 @@ public class Solr5Index implements IndexProvider {
public static final ConfigNamespace SOLR_NS =
new ConfigNamespace(INDEX_NS, "solr", "Solr index configuration");
new ConfigNamespace(GraphDatabaseConfiguration.INDEX_NS, "solr", "Solr index configuration");
public static final ConfigOption<String> SOLR_MODE = new ConfigOption<String>(SOLR_NS,"mode",
"The operation mode for Solr which is either via HTTP (`http`) or using SolrCloud (`cloud`)",
......@@ -182,7 +183,7 @@ public class Solr5Index implements IndexProvider {
private static final IndexFeatures SOLR_FEATURES = new IndexFeatures.Builder().supportsDocumentTTL()
.setDefaultStringMapping(Mapping.TEXT).supportedStringMappings(Mapping.TEXT, Mapping.STRING).build();
.setDefaultStringMapping(TEXT).supportedStringMappings(TEXT, STRING).build();
private final SolrClient solrClient;
private final Configuration configuration;
......@@ -200,7 +201,7 @@ public class Solr5Index implements IndexProvider {
mode = Mode.parse(config.get(SOLR_MODE));
dynFields = config.get(DYNAMIC_FIELDS);
keyFieldIds = parseKeyFieldsForCollections(config);
maxResults = config.get(INDEX_MAX_RESULT_SET_SIZE);
maxResults = config.get(GraphDatabaseConfiguration.INDEX_MAX_RESULT_SET_SIZE);
ttlField = config.get(TTL_FIELD);
waitSearcher = config.get(WAIT_SEARCHER);
......@@ -556,10 +557,10 @@ public class Solr5Index implements IndexProvider {
} else if (value instanceof String) {
Mapping map = getStringMapping(informations.get(key));
assert map==Mapping.TEXT || map==Mapping.STRING;
if (map==Mapping.TEXT && !titanPredicate.toString().startsWith("CONTAINS"))
assert map== TEXT || map== STRING;
if (map== TEXT && !titanPredicate.toString().startsWith("CONTAINS"))
throw new IllegalArgumentException("Text mapped string values only support CONTAINS queries and not: " + titanPredicate);
if (map==Mapping.STRING && titanPredicate.toString().startsWith("CONTAINS"))
if (map== STRING && titanPredicate.toString().startsWith("CONTAINS"))
throw new IllegalArgumentException("String mapped string values do not support CONTAINS queries: " + titanPredicate);
//Special case
......@@ -587,9 +588,9 @@ public class Solr5Index implements IndexProvider {
return (key + ":" + escapeValue(value) + "*");
} else if (titanPredicate == Text.REGEX || titanPredicate == Text.CONTAINS_REGEX) {
return (key + ":/" + value + "/");
} else if (titanPredicate == Cmp.EQUAL) {
} else if (titanPredicate == EQUAL) {
return (key + ":\"" + escapeValue(value) + "\"");
} else if (titanPredicate == Cmp.NOT_EQUAL) {
} else if (titanPredicate == NOT_EQUAL) {
return ("-" + key + ":\"" + escapeValue(value) + "\"");
} else {
throw new IllegalArgumentException("Relation is not supported for string value: " + titanPredicate);
......@@ -651,9 +652,9 @@ public class Solr5Index implements IndexProvider {
throw new IllegalArgumentException("Boolean types only support EQUAL or NOT_EQUAL");
} else if (value instanceof UUID) {
if (titanPredicate == Cmp.EQUAL) {
if (titanPredicate == EQUAL) {
return (key + ":\"" + escapeValue(value) + "\"");
} else if (titanPredicate == Cmp.NOT_EQUAL) {
} else if (titanPredicate == NOT_EQUAL) {
return ("-" + key + ":\"" + escapeValue(value) + "\"");
} else {
throw new IllegalArgumentException("Relation is not supported for uuid value: " + titanPredicate);
......@@ -779,8 +780,8 @@ public class Solr5Index implements IndexProvider {
public boolean supports(KeyInformation information, TitanPredicate titanPredicate) {
Class<?> dataType = information.getDataType();
Mapping mapping = Mapping.getMapping(information);
if (mapping!=Mapping.DEFAULT && !AttributeUtil.isString(dataType)) return false;
Mapping mapping = getMapping(information);
if (mapping!= DEFAULT && !AttributeUtil.isString(dataType)) return false;
if (Number.class.isAssignableFrom(dataType)) {
return titanPredicate instanceof Cmp;
......@@ -792,16 +793,16 @@ public class Solr5Index implements IndexProvider {
case TEXT:
return titanPredicate == Text.CONTAINS || titanPredicate == Text.CONTAINS_PREFIX || titanPredicate == Text.CONTAINS_REGEX;
case STRING:
return titanPredicate == Cmp.EQUAL || titanPredicate==Cmp.NOT_EQUAL || titanPredicate==Text.REGEX || titanPredicate==Text.PREFIX;
return titanPredicate == EQUAL || titanPredicate== NOT_EQUAL || titanPredicate==Text.REGEX || titanPredicate==Text.PREFIX;
// return (titanPredicate instanceof Text) || titanPredicate == Cmp.EQUAL || titanPredicate==Cmp.NOT_EQUAL;
} else if (dataType == Date.class) {
if (titanPredicate instanceof Cmp) return true;
} else if (dataType == Boolean.class) {
return titanPredicate == Cmp.EQUAL || titanPredicate == Cmp.NOT_EQUAL;
return titanPredicate == EQUAL || titanPredicate == NOT_EQUAL;
} else if (dataType == UUID.class) {
return titanPredicate == Cmp.EQUAL || titanPredicate==Cmp.NOT_EQUAL;
return titanPredicate == EQUAL || titanPredicate== NOT_EQUAL;
return false;
......@@ -809,11 +810,11 @@ public class Solr5Index implements IndexProvider {
public boolean supports(KeyInformation information) {
Class<?> dataType = information.getDataType();
Mapping mapping = Mapping.getMapping(information);
Mapping mapping = getMapping(information);
if (Number.class.isAssignableFrom(dataType) || dataType == Geoshape.class || dataType == Date.class || dataType == Boolean.class || dataType == UUID.class) {
if (mapping==Mapping.DEFAULT) return true;
if (mapping== DEFAULT) return true;
} else if (AttributeUtil.isString(dataType)) {
if (mapping==Mapping.DEFAULT || mapping==Mapping.TEXT || mapping==Mapping.STRING) return true;
if (mapping== DEFAULT || mapping== TEXT || mapping== STRING) return true;
return false;
......@@ -861,8 +862,8 @@ public class Solr5Index implements IndexProvider {
private static Mapping getStringMapping(KeyInformation information) {
assert AttributeUtil.isString(information.getDataType());
Mapping map = Mapping.getMapping(information);
if (map==Mapping.DEFAULT) map = Mapping.TEXT;
Mapping map = getMapping(information);
if (map== DEFAULT) map = TEXT;
return map;
* Copyright 2012-2013 Aurelius LLC
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package com.thinkaurelius.titan.diskstorage.locking;
import com.thinkaurelius.titan.diskstorage.hbase.HBaseTransaction;
import com.thinkaurelius.titan.diskstorage.util.time.TimestampProvider;
import com.thinkaurelius.titan.diskstorage.util.time.Timestamps;
import com.thinkaurelius.titan.diskstorage.StaticBuffer;
import com.thinkaurelius.titan.diskstorage.util.KeyColumn;
import com.thinkaurelius.titan.diskstorage.util.StaticArrayBuffer;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;
import java.util.concurrent.TimeUnit;
public class LocalLockMediatorTest {
private static final String LOCK_NAMESPACE = "test";
private static final StaticBuffer LOCK_ROW = StaticArrayBuffer.of(new byte[]{1});
private static final StaticBuffer LOCK_COL = StaticArrayBuffer.of(new byte[]{1});
private static final KeyColumn kc = new KeyColumn(LOCK_ROW, LOCK_COL);
private static final HBaseTransaction mockTx1 = Mockito.mock(HBaseTransaction.class);
private static final HBaseTransaction mockTx2 = Mockito.mock(HBaseTransaction.class);
public void testLock() throws InterruptedException {
TimestampProvider times = Timestamps.MICRO;
LocalLockMediator<HBaseTransaction> llm =
new LocalLockMediator<HBaseTransaction>(LOCK_NAMESPACE, times);
//Expire immediately
Assert.assertTrue(llm.lock(kc, mockTx1, times.getTime(0, TimeUnit.NANOSECONDS)));
Assert.assertTrue(llm.lock(kc, mockTx2, times.getTime(Long.MAX_VALUE, TimeUnit.NANOSECONDS)));
llm = new LocalLockMediator<HBaseTransaction>(LOCK_NAMESPACE, times);
//Expire later
Assert.assertTrue(llm.lock(kc, mockTx1, times.getTime(Long.MAX_VALUE, TimeUnit.NANOSECONDS)));
//So second lock should fail on same keyCol
Assert.assertFalse(llm.lock(kc, mockTx2, times.getTime(Long.MAX_VALUE, TimeUnit.NANOSECONDS)));
Assert.assertTrue(llm.unlock(kc, mockTx1));
//Now locking should succeed
Assert.assertTrue(llm.lock(kc, mockTx2, times.getTime(Long.MAX_VALUE, TimeUnit.NANOSECONDS)));
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment