Commit ebc4502a by Suma Shivaprasad

ATLAS-114 Upgrade HBase client to 1.1.2 (sumasai)

parent 5bc6f6bd
@@ -248,3 +248,20 @@ WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWIS
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.
-----------------------------------------------------------------------
Titan Apache 2.0 License
-----------------------------------------------------------------------
Copyright 2012-2013 Aurelius LLC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
@@ -17,6 +17,7 @@
# limitations under the License.
import getpass
import os
import re
import platform
import subprocess
from threading import Thread
@@ -31,7 +32,7 @@ CONF = "conf"
LOG="logs"
WEBAPP="server" + os.sep + "webapp"
DATA="data"
-ENV_KEYS = ["JAVA_HOME", "METADATA_OPTS", "METADATA_LOG_DIR", "METADATA_PID_DIR", "METADATA_CONF", "METADATACPPATH", "METADATA_DATA_DIR", "METADATA_HOME_DIR", "METADATA_EXPANDED_WEBAPP_DIR"]
+ENV_KEYS = ["JAVA_HOME", "METADATA_OPTS", "METADATA_LOG_DIR", "METADATA_PID_DIR", "METADATA_CONF", "METADATACPPATH", "METADATA_DATA_DIR", "METADATA_HOME_DIR", "METADATA_EXPANDED_WEBAPP_DIR", "HBASE_CONF_DIR"]
METADATA_CONF = "METADATA_CONF" METADATA_CONF = "METADATA_CONF"
METADATA_LOG = "METADATA_LOG_DIR" METADATA_LOG = "METADATA_LOG_DIR"
METADATA_PID = "METADATA_PID_DIR" METADATA_PID = "METADATA_PID_DIR"
...@@ -39,6 +40,7 @@ METADATA_WEBAPP = "METADATA_EXPANDED_WEBAPP_DIR" ...@@ -39,6 +40,7 @@ METADATA_WEBAPP = "METADATA_EXPANDED_WEBAPP_DIR"
METADATA_OPTS = "METADATA_OPTS" METADATA_OPTS = "METADATA_OPTS"
METADATA_DATA = "METADATA_DATA_DIR" METADATA_DATA = "METADATA_DATA_DIR"
METADATA_HOME = "METADATA_HOME_DIR" METADATA_HOME = "METADATA_HOME_DIR"
HBASE_CONF_DIR = "HBASE_CONF_DIR"
IS_WINDOWS = platform.system() == "Windows"
ON_POSIX = 'posix' in sys.builtin_module_names
DEBUG = False
@@ -60,6 +62,10 @@ def confDir(dir):
    localconf = os.path.join(dir, CONF)
    return os.environ.get(METADATA_CONF, localconf)
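# Resolve the HBase client config dir: $HBASE_CONF_DIR wins if set,
# otherwise fall back to <parent of the Atlas conf dir>/hbase/conf.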
def hbaseConfDir(atlasConfDir):
    parentDir = os.path.dirname(atlasConfDir)
    return os.environ.get(HBASE_CONF_DIR, os.path.join(parentDir, "hbase", CONF))
def logDir(dir):
    localLog = os.path.join(dir, LOG)
    return os.environ.get(METADATA_LOG, localLog)
@@ -320,8 +326,14 @@ def win_exist_pid(pid):
    return False
def server_already_running(pid):
    print "Atlas server is already running under process %s" % pid
    sys.exit()
def server_pid_not_running(pid):
    print "The Server is no longer running with pid %s" %pid
def grep(file, value):
    for line in open(file).readlines():
        if re.match(value, line):
            return line
    return None
@@ -25,6 +25,8 @@ METADATA_LOG_OPTS="-Datlas.log.dir=%s -Datlas.log.file=application.log"
METADATA_COMMAND_OPTS="-Datlas.home=%s"
METADATA_CONFIG_OPTS="-Datlas.conf=%s"
DEFAULT_JVM_OPTS="-Xmx1024m -XX:MaxPermSize=512m -Dlog4j.configuration=atlas-log4j.xml"
CONF_FILE="application.properties"
HBASE_STORAGE_CONF_ENTRY="atlas.graph.storage.backend\s*=\s*hbase"
def main():
@@ -50,12 +52,21 @@ def main():
    web_app_dir = mc.webAppDir(metadata_home)
    mc.expandWebApp(metadata_home)
    #add hbase-site.xml to classpath
    hbase_conf_dir = mc.hbaseConfDir(confdir)
    p = os.pathsep
    metadata_classpath = confdir + p \
        + os.path.join(web_app_dir, "atlas", "WEB-INF", "classes" ) + p \
        + os.path.join(web_app_dir, "atlas", "WEB-INF", "lib", "*" ) + p \
        + os.path.join(metadata_home, "libext", "*")
    if os.path.exists(hbase_conf_dir):
        metadata_classpath = metadata_classpath + p \
            + hbase_conf_dir
    else:
        storage_backend = mc.grep(os.path.join(confdir, CONF_FILE), HBASE_STORAGE_CONF_ENTRY)
        if storage_backend is not None:
            raise Exception("Could not find hbase-site.xml in %s. Please set env var HBASE_CONF_DIR to the hbase client conf dir" % hbase_conf_dir)
    metadata_pid_file = mc.pidFile(metadata_home)
...
@@ -25,7 +25,9 @@ atlas.graph.storage.directory=${sys:atlas.home}/data/berkley
#hbase
#For standalone mode , specify localhost
#for distributed mode, specify zookeeper quorum here - For more information refer http://s3.thinkaurelius.com/docs/titan/current/hbase.html#_remote_server_mode_2
-#atlas.graph.storage.hostname=localhost
+atlas.graph.storage.hostname=localhost
atlas.graph.storage.hbase.regions-per-server=1
atlas.graph.storage.lock.wait-time=10000
#Solr
#atlas.graph.index.search.backend=solr
...
@@ -33,10 +33,19 @@ atlas.graph.storage.backend=hbase
atlas.graph.storage.hostname=<ZooKeeper Quorum>
</verbatim>
The HBASE_CONF_DIR environment variable needs to be set to point to the HBase client configuration directory, which is added to the classpath when Atlas starts up.
hbase-site.xml needs to have the following properties set according to the cluster setup:
<verbatim>
#Set below to /hbase-secure if the HBase server is set up in secure mode
zookeeper.znode.parent=/hbase-unsecure
</verbatim>
Advanced configuration
# If you are planning to use any of the configs mentioned below, they need to be prefixed with "atlas.graph." to take effect in Atlas. For example, Titan's storage.hbase.regions-per-server becomes atlas.graph.storage.hbase.regions-per-server.
Refer http://s3.thinkaurelius.com/docs/titan/0.5.4/titan-config-ref.html#_storage_hbase
---++++ Graph Search Index
This section sets up the graph db - titan - to use a search indexing system. The example
configuration below sets up an embedded Elastic search indexing system.
...
@@ -121,6 +121,16 @@ and change it to look as below
export METADATA_SERVER_OPTS="-Djava.awt.headless=true -Djava.security.krb5.realm= -Djava.security.krb5.kdc="
</verbatim>
* HBase as the Storage Backend for the Graph Repository
By default, Atlas uses Titan as the graph repository; it is currently the only graph repository implementation available.
The HBase versions currently supported are 0.98.x, 1.0.x and 1.1.x. For configuring Atlas graph persistence on HBase, please go through the "Configuration - Graph persistence engine - HBase" section
for more details.
Pre-requisites for running HBase as a distributed cluster
   * 3 or 5 ZooKeeper nodes
   * At least 3 RegionServer nodes. It would be ideal to run the DataNodes on the same hosts as the RegionServers for data locality.
* Configuring SOLR as the Indexing Backend for the Graph Repository
By default, Atlas uses Titan as the graph repository; it is currently the only graph repository implementation available.
@@ -152,6 +162,13 @@ For configuring Titan to work with Solr, please follow the instructions below
For more information on Titan solr configuration , please refer http://s3.thinkaurelius.com/docs/titan/0.5.4/solr.htm
Pre-requisites for running Solr in cloud mode
* Memory - Solr is both memory and CPU intensive. Make sure the server running Solr has adequate memory, CPU and disk.
Solr works well with 32GB RAM. Plan to provide as much memory as possible to the Solr process.
   * Disk - If the number of entities that need to be stored is large, plan to have at least 500 GB of free space in the volume where Solr is going to store the index data.
   * SolrCloud has support for replication and sharding. It is highly recommended to use SolrCloud with at least two Solr nodes running on different servers with replication enabled.
If using SolrCloud, then you also need ZooKeeper installed and configured with 3 or 5 ZooKeeper nodes.
*Starting Atlas Server*
<verbatim>
bin/atlas_start.py [-port <port>]
...
@@ -328,7 +328,7 @@
<tinkerpop.version>2.6.0</tinkerpop.version>
<titan.version>0.5.4</titan.version>
<hadoop.version>2.7.0</hadoop.version>
-<hbase.version>0.98.9-hadoop2</hbase.version>
+<hbase.version>1.1.2</hbase.version>
<solr.version>5.1.0</solr.version>
<kafka.version>0.8.2.0</kafka.version>
<!-- scala versions -->
@@ -1512,6 +1512,7 @@
<exclude>**/build.log</exclude>
<exclude>.bowerrc</exclude>
<exclude>*.json</exclude>
<exclude>**/overlays/**</exclude>
</excludes>
</configuration>
<executions>
...
@@ -9,6 +9,7 @@ ATLAS-54 Rename configs in hive hook (shwethags)
ATLAS-3 Mixed Index creation fails with Date types (sumasai via shwethags)
ALL CHANGES:
ATLAS-114 Upgrade hbase client to 1.1.2 (sumasai)
ATLAS-296 IllegalArgumentException during hive HiveHookIT integration tests (tbeerbower via shwethags)
ATLAS-158 Provide Atlas Entity Change Notification (tbeerbower via shwethags)
ATLAS-238 atlas_start.py - the Atlas server won’t restart after improper shutdown (ndjouri via sumasai)
...
@@ -114,10 +114,11 @@
<artifactId>titan-berkeleyje</artifactId>
</dependency>
-<dependency>
-<groupId>com.thinkaurelius.titan</groupId>
-<artifactId>titan-hbase</artifactId>
-</dependency>
+<!-- Commenting out since titan-hbase classes are shaded for 1.x support -->
+<!--<dependency>-->
+<!--<groupId>com.thinkaurelius.titan</groupId>-->
+<!--<artifactId>titan-hbase</artifactId>-->
+<!--</dependency>-->
<dependency>
<groupId>org.apache.hbase</groupId>
...
/*
* Copyright 2012-2013 Aurelius LLC
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.thinkaurelius.titan.diskstorage.hbase;
import java.io.Closeable;
import java.io.IOException;
import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
/**
* This interface hides ABI/API breaking changes that HBase has made to its Admin/HBaseAdmin over the course
* of development from 0.94 to 1.0 and beyond.
*/
public interface AdminMask extends Closeable
{
void clearTable(String tableName, long timestamp) throws IOException;
HTableDescriptor getTableDescriptor(String tableName) throws TableNotFoundException, IOException;
boolean tableExists(String tableName) throws IOException;
void createTable(HTableDescriptor desc) throws IOException;
void createTable(HTableDescriptor desc, byte[] startKey, byte[] endKey, int numRegions) throws IOException;
/**
* Estimate the number of regionservers in the HBase cluster.
*
* This is usually implemented by calling
* {@link HBaseAdmin#getClusterStatus()} and then
* {@link ClusterStatus#getServers()} and finally {@code size()} on the
* returned server list.
*
* @return the number of servers in the cluster or -1 if it could not be determined
*/
int getEstimatedRegionServerCount();
void disableTable(String tableName) throws IOException;
void enableTable(String tableName) throws IOException;
boolean isTableDisabled(String tableName) throws IOException;
void addColumn(String tableName, HColumnDescriptor columnDescriptor) throws IOException;
}
/*
* Copyright 2012-2013 Aurelius LLC
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.thinkaurelius.titan.diskstorage.hbase;
import java.io.Closeable;
import java.io.IOException;
/**
* This interface hides ABI/API breaking changes that HBase has made to its (H)Connection class over the course
* of development from 0.94 to 1.0 and beyond.
*/
public interface ConnectionMask extends Closeable
{
TableMask getTable(String name) throws IOException;
AdminMask getAdmin() throws IOException;
}
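The two masks above, together with the HBaseCompat factory, give Titan a single code path across HBase 0.98, 1.0 and 1.1. A minimal sketch of how they compose at runtime (the demo class and the "titan" table name are illustrative; the wiring mirrors what HBaseStoreManager does later in this commit):

package com.thinkaurelius.titan.diskstorage.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class CompatWiringDemo {
    public static void main(String[] args) throws Exception {
        // Reads hbase-default.xml/hbase-site.xml from the classpath, which is
        // why atlas_start.py adds HBASE_CONF_DIR to the classpath above.
        Configuration conf = HBaseConfiguration.create();

        // null => auto-detect the compat layer from the runtime HBase version
        HBaseCompat compat = HBaseCompatLoader.getCompat(null);

        // Version-appropriate wrappers; callers only ever see the Mask interfaces.
        ConnectionMask cnx = compat.createConnection(conf);
        AdminMask admin = cnx.getAdmin();
        try {
            System.out.println("titan table exists: " + admin.tableExists("titan"));
        } finally {
            admin.close();
            cnx.close();
        }
    }
}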
/*
* Copyright 2012-2013 Aurelius LLC
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.thinkaurelius.titan.diskstorage.hbase;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.thinkaurelius.titan.util.system.IOUtils;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
public class HBaseAdmin0_98 implements AdminMask
{
private static final Logger log = LoggerFactory.getLogger(HBaseAdmin0_98.class);
private final HBaseAdmin adm;
public HBaseAdmin0_98(HBaseAdmin adm)
{
this.adm = adm;
}
@Override
public void clearTable(String tableName, long timestamp) throws IOException
{
if (!adm.tableExists(tableName)) {
log.debug("clearStorage() called before table {} was created, skipping.", tableName);
return;
}
// Unfortunately, linear scanning and deleting tables is faster in HBase < 1 when running integration tests than
// disabling and deleting tables.
HTable table = null;
try {
table = new HTable(adm.getConfiguration(), tableName);
Scan scan = new Scan();
scan.setBatch(100);
scan.setCacheBlocks(false);
scan.setCaching(2000);
scan.setTimeRange(0, Long.MAX_VALUE);
scan.setMaxVersions(1);
ResultScanner scanner = null;
try {
scanner = table.getScanner(scan);
for (Result res : scanner) {
Delete d = new Delete(res.getRow());
d.setTimestamp(timestamp);
table.delete(d);
}
} finally {
IOUtils.closeQuietly(scanner);
}
} finally {
IOUtils.closeQuietly(table);
}
}
@Override
public HTableDescriptor getTableDescriptor(String tableName) throws TableNotFoundException, IOException
{
return adm.getTableDescriptor(tableName.getBytes());
}
@Override
public boolean tableExists(String tableName) throws IOException
{
return adm.tableExists(tableName);
}
@Override
public void createTable(HTableDescriptor desc) throws IOException
{
adm.createTable(desc);
}
@Override
public void createTable(HTableDescriptor desc, byte[] startKey, byte[] endKey, int numRegions) throws IOException
{
adm.createTable(desc, startKey, endKey, numRegions);
}
@Override
public int getEstimatedRegionServerCount()
{
int serverCount = -1;
try {
serverCount = adm.getClusterStatus().getServers().size();
log.debug("Read {} servers from HBase ClusterStatus", serverCount);
} catch (IOException e) {
log.debug("Unable to retrieve HBase cluster status", e);
}
return serverCount;
}
@Override
public void disableTable(String tableName) throws IOException
{
adm.disableTable(tableName);
}
@Override
public void enableTable(String tableName) throws IOException
{
adm.enableTable(tableName);
}
@Override
public boolean isTableDisabled(String tableName) throws IOException
{
return adm.isTableDisabled(tableName);
}
@Override
public void addColumn(String tableName, HColumnDescriptor columnDescriptor) throws IOException
{
adm.addColumn(tableName, columnDescriptor);
}
@Override
public void close() throws IOException
{
adm.close();
}
}
/*
* Copyright 2012-2013 Aurelius LLC
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.thinkaurelius.titan.diskstorage.hbase;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotDisabledException;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.HBaseAdmin;
public class HBaseAdmin1_0 implements AdminMask
{
private static final Logger log = LoggerFactory.getLogger(HBaseAdmin1_0.class);
private final Admin adm;
public HBaseAdmin1_0(HBaseAdmin adm)
{
this.adm = adm;
}
@Override
public void clearTable(String tableString, long timestamp) throws IOException
{
TableName tableName = TableName.valueOf(tableString);
if (!adm.tableExists(tableName)) {
log.debug("Attempted to clear table {} before it exists (noop)", tableString);
return;
}
if (!adm.isTableDisabled(tableName))
adm.disableTable(tableName);
if (!adm.isTableDisabled(tableName))
throw new RuntimeException("Unable to disable table " + tableName);
// This API call appears to both truncate and reenable the table.
log.info("Truncating table {}", tableName);
adm.truncateTable(tableName, true /* preserve splits */);
try {
adm.enableTable(tableName);
} catch (TableNotDisabledException e) {
// This triggers seemingly every time in testing with 1.0.2.
log.debug("Table automatically reenabled by truncation: {}", tableName, e);
}
}
@Override
public HTableDescriptor getTableDescriptor(String tableString) throws TableNotFoundException, IOException
{
return adm.getTableDescriptor(TableName.valueOf(tableString));
}
@Override
public boolean tableExists(String tableString) throws IOException
{
return adm.tableExists(TableName.valueOf(tableString));
}
@Override
public void createTable(HTableDescriptor desc) throws IOException
{
adm.createTable(desc);
}
@Override
public void createTable(HTableDescriptor desc, byte[] startKey, byte[] endKey, int numRegions) throws IOException
{
adm.createTable(desc, startKey, endKey, numRegions);
}
@Override
public int getEstimatedRegionServerCount()
{
int serverCount = -1;
try {
serverCount = adm.getClusterStatus().getServers().size();
log.debug("Read {} servers from HBase ClusterStatus", serverCount);
} catch (IOException e) {
log.debug("Unable to retrieve HBase cluster status", e);
}
return serverCount;
}
@Override
public void disableTable(String tableString) throws IOException
{
adm.disableTable(TableName.valueOf(tableString));
}
@Override
public void enableTable(String tableString) throws IOException
{
adm.enableTable(TableName.valueOf(tableString));
}
@Override
public boolean isTableDisabled(String tableString) throws IOException
{
return adm.isTableDisabled(TableName.valueOf(tableString));
}
@Override
public void addColumn(String tableString, HColumnDescriptor columnDescriptor) throws IOException
{
adm.addColumn(TableName.valueOf(tableString), columnDescriptor);
}
@Override
public void close() throws IOException
{
adm.close();
}
}
/*
* Copyright 2012-2013 Aurelius LLC
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.thinkaurelius.titan.diskstorage.hbase;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.Delete;
public interface HBaseCompat {
/**
* Configure the compression scheme {@code algo} on a column family
* descriptor {@code cd}. The {@code algo} parameter is a string value
* corresponding to one of the values of HBase's Compression enum. The
* Compression enum has moved between packages as HBase has evolved, which
* is why this method has a String argument in the signature instead of the
* enum itself.
*
* @param cd
* column family to configure
* @param algo
* compression type to use
*/
public void setCompression(HColumnDescriptor cd, String algo);
/**
* Create and return a HTableDescriptor instance with the given name. The
* constructors on this method have remained stable over HBase development
* so far, but the old HTableDescriptor(String) constructor & byte[] friends
* are now marked deprecated and may eventually be removed in favor of the
* HTableDescriptor(TableName) constructor. That constructor (and the
* TableName type) only exists in newer HBase versions. Hence this method.
*
* @param tableName
* HBase table name
* @return a new table descriptor instance
*/
public HTableDescriptor newTableDescriptor(String tableName);
ConnectionMask createConnection(Configuration conf) throws IOException;
void addColumnFamilyToTableDescriptor(HTableDescriptor tdesc, HColumnDescriptor cdesc);
void setTimestamp(Delete d, long timestamp);
}
/*
* Copyright 2012-2013 Aurelius LLC
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.thinkaurelius.titan.diskstorage.hbase;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.io.compress.Compression;
public class HBaseCompat0_98 implements HBaseCompat {
@Override
public void setCompression(HColumnDescriptor cd, String algo) {
cd.setCompressionType(Compression.Algorithm.valueOf(algo));
}
@Override
public HTableDescriptor newTableDescriptor(String tableName) {
TableName tn = TableName.valueOf(tableName);
return new HTableDescriptor(tn);
}
@Override
public ConnectionMask createConnection(Configuration conf) throws IOException
{
return new HConnection0_98(HConnectionManager.createConnection(conf));
}
@Override
public void addColumnFamilyToTableDescriptor(HTableDescriptor tdesc, HColumnDescriptor cdesc)
{
tdesc.addFamily(cdesc);
}
@Override
public void setTimestamp(Delete d, long timestamp)
{
d.setTimestamp(timestamp);
}
}
/*
* Copyright 2012-2013 Aurelius LLC
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.thinkaurelius.titan.diskstorage.hbase;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.io.compress.Compression;
public class HBaseCompat1_0 implements HBaseCompat {
@Override
public void setCompression(HColumnDescriptor cd, String algo) {
cd.setCompressionType(Compression.Algorithm.valueOf(algo));
}
@Override
public HTableDescriptor newTableDescriptor(String tableName) {
TableName tn = TableName.valueOf(tableName);
return new HTableDescriptor(tn);
}
@Override
public ConnectionMask createConnection(Configuration conf) throws IOException
{
return new HConnection1_0(ConnectionFactory.createConnection(conf));
}
@Override
public void addColumnFamilyToTableDescriptor(HTableDescriptor tdesc, HColumnDescriptor cdesc)
{
tdesc.addFamily(cdesc);
}
@Override
public void setTimestamp(Delete d, long timestamp)
{
d.setTimestamp(timestamp);
}
}
/*
* Copyright 2012-2013 Aurelius LLC
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.thinkaurelius.titan.diskstorage.hbase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.io.compress.Compression;
import java.io.IOException;
public class HBaseCompat1_1 implements HBaseCompat {
@Override
public void setCompression(HColumnDescriptor cd, String algo) {
cd.setCompressionType(Compression.Algorithm.valueOf(algo));
}
@Override
public HTableDescriptor newTableDescriptor(String tableName) {
TableName tn = TableName.valueOf(tableName);
return new HTableDescriptor(tn);
}
@Override
public ConnectionMask createConnection(Configuration conf) throws IOException
{
return new HConnection1_0(ConnectionFactory.createConnection(conf));
}
@Override
public void addColumnFamilyToTableDescriptor(HTableDescriptor tdesc, HColumnDescriptor cdesc)
{
tdesc.addFamily(cdesc);
}
@Override
public void setTimestamp(Delete d, long timestamp)
{
d.setTimestamp(timestamp);
}
}
/*
* Copyright 2012-2013 Aurelius LLC
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.thinkaurelius.titan.diskstorage.hbase;
import java.util.Arrays;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class HBaseCompatLoader {
private static final Logger log = LoggerFactory.getLogger(HBaseCompatLoader.class);
private static final String DEFAULT_HBASE_COMPAT_VERSION = "1.1";
private static final String DEFAULT_HBASE_CLASS_NAME = "com.thinkaurelius.titan.diskstorage.hbase.HBaseCompat1_1";
private static HBaseCompat cachedCompat;
public synchronized static HBaseCompat getCompat(String classOverride) {
if (null != cachedCompat) {
log.debug("Returning cached HBase compatibility layer: {}", cachedCompat);
return cachedCompat;
}
HBaseCompat compat;
String className = null;
String classNameSource = null;
if (null != classOverride) {
className = classOverride;
classNameSource = "from explicit configuration";
} else {
String hbaseVersion = VersionInfo.getVersion();
for (String supportedVersion : Arrays.asList("0.94", "0.96", "0.98", "1.0", "1.1")) {
if (hbaseVersion.startsWith(supportedVersion + ".")) {
className = "com.thinkaurelius.titan.diskstorage.hbase.HBaseCompat" + supportedVersion.replaceAll("\\.", "_");
classNameSource = "supporting runtime HBase version " + hbaseVersion;
break;
}
}
if (null == className) {
log.info("The HBase version {} is not explicitly supported by Titan. " +
"Loading Titan's compatibility layer for its most recent supported HBase version ({})",
hbaseVersion, DEFAULT_HBASE_COMPAT_VERSION);
className = DEFAULT_HBASE_CLASS_NAME;
classNameSource = " by default";
}
}
final String errTemplate = " when instantiating HBase compatibility class " + className;
try {
compat = (HBaseCompat)Class.forName(className).newInstance();
log.info("Instantiated HBase compatibility layer {}: {}", classNameSource, compat.getClass().getCanonicalName());
} catch (IllegalAccessException e) {
throw new RuntimeException(e.getClass().getSimpleName() + errTemplate, e);
} catch (InstantiationException e) {
throw new RuntimeException(e.getClass().getSimpleName() + errTemplate, e);
} catch (ClassNotFoundException e) {
throw new RuntimeException(e.getClass().getSimpleName() + errTemplate, e);
}
return cachedCompat = compat;
}
}
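A short usage sketch (the demo class below is illustrative, not part of the commit): the storage.hbase.compat-class option defined in HBaseStoreManager feeds its value into the classOverride parameter here, bypassing VersionInfo-based detection. Note that getCompat caches its result, so only the first call in a JVM decides which layer is used.

package com.thinkaurelius.titan.diskstorage.hbase;

public class CompatOverrideDemo {
    public static void main(String[] args) {
        // Force the 1.0 compat layer instead of relying on auto-detection.
        HBaseCompat compat = HBaseCompatLoader.getCompat(
                "com.thinkaurelius.titan.diskstorage.hbase.HBaseCompat1_0");
        System.out.println(compat.getClass().getCanonicalName());
    }
}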
/*
* Copyright 2012-2013 Aurelius LLC
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.thinkaurelius.titan.diskstorage.hbase;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import com.thinkaurelius.titan.diskstorage.*;
import com.thinkaurelius.titan.diskstorage.keycolumnvalue.*;
import com.thinkaurelius.titan.diskstorage.util.RecordIterator;
import com.thinkaurelius.titan.diskstorage.util.StaticArrayBuffer;
import com.thinkaurelius.titan.diskstorage.util.StaticArrayEntry;
import com.thinkaurelius.titan.diskstorage.util.StaticArrayEntryList;
import com.thinkaurelius.titan.util.system.IOUtils;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.ColumnPaginationFilter;
import org.apache.hadoop.hbase.filter.ColumnRangeFilter;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.io.Closeable;
import java.io.IOException;
import java.util.*;
/**
* Here are some areas that might need work:
* <p/>
* - batching? (consider HTable#batch, HTable#setAutoFlush(false))
* - tuning HTable#setWriteBufferSize (?)
* - writing a server-side filter to replace ColumnCountGetFilter, which drops
* all columns on the row where it reaches its limit. This requires getSlice,
* currently, to impose its limit on the client side. That obviously won't
* scale.
* - RowMutations for combining Puts+Deletes (need a newer HBase than 0.92 for this)
* - (maybe) fiddle with HTable#setRegionCachePrefetch and/or #prewarmRegionCache
* <p/>
* There may be other problem areas. These are just the ones of which I'm aware.
*/
public class HBaseKeyColumnValueStore implements KeyColumnValueStore {
private static final Logger logger = LoggerFactory.getLogger(HBaseKeyColumnValueStore.class);
private final String tableName;
private final HBaseStoreManager storeManager;
// When using shortened CF names, columnFamily is the shortname and storeName is the longname
// When not using shortened CF names, they are the same
//private final String columnFamily;
private final String storeName;
// This is columnFamily.getBytes()
private final byte[] columnFamilyBytes;
private final HBaseGetter entryGetter;
private final ConnectionMask cnx;
HBaseKeyColumnValueStore(HBaseStoreManager storeManager, ConnectionMask cnx, String tableName, String columnFamily, String storeName) {
this.storeManager = storeManager;
this.cnx = cnx;
this.tableName = tableName;
//this.columnFamily = columnFamily;
this.storeName = storeName;
this.columnFamilyBytes = columnFamily.getBytes();
this.entryGetter = new HBaseGetter(storeManager.getMetaDataSchema(storeName));
}
@Override
public void close() throws BackendException {
}
@Override
public EntryList getSlice(KeySliceQuery query, StoreTransaction txh) throws BackendException {
Map<StaticBuffer, EntryList> result = getHelper(Arrays.asList(query.getKey()), getFilter(query));
return Iterables.getOnlyElement(result.values(), EntryList.EMPTY_LIST);
}
@Override
public Map<StaticBuffer,EntryList> getSlice(List<StaticBuffer> keys, SliceQuery query, StoreTransaction txh) throws BackendException {
return getHelper(keys, getFilter(query));
}
@Override
public void mutate(StaticBuffer key, List<Entry> additions, List<StaticBuffer> deletions, StoreTransaction txh) throws BackendException {
Map<StaticBuffer, KCVMutation> mutations = ImmutableMap.of(key, new KCVMutation(additions, deletions));
mutateMany(mutations, txh);
}
@Override
public void acquireLock(StaticBuffer key,
StaticBuffer column,
StaticBuffer expectedValue,
StoreTransaction txh) throws BackendException {
throw new UnsupportedOperationException();
}
@Override
public KeyIterator getKeys(KeyRangeQuery query, StoreTransaction txh) throws BackendException {
return executeKeySliceQuery(query.getKeyStart().as(StaticBuffer.ARRAY_FACTORY),
query.getKeyEnd().as(StaticBuffer.ARRAY_FACTORY),
new FilterList(FilterList.Operator.MUST_PASS_ALL),
query);
}
@Override
public String getName() {
return storeName;
}
@Override
public KeyIterator getKeys(SliceQuery query, StoreTransaction txh) throws BackendException {
return executeKeySliceQuery(new FilterList(FilterList.Operator.MUST_PASS_ALL), query);
}
public static Filter getFilter(SliceQuery query) {
byte[] colStartBytes = query.getSliceEnd().length() > 0 ? query.getSliceStart().as(StaticBuffer.ARRAY_FACTORY) : null;
byte[] colEndBytes = query.getSliceEnd().length() > 0 ? query.getSliceEnd().as(StaticBuffer.ARRAY_FACTORY) : null;
Filter filter = new ColumnRangeFilter(colStartBytes, true, colEndBytes, false);
if (query.hasLimit()) {
filter = new FilterList(FilterList.Operator.MUST_PASS_ALL,
filter,
new ColumnPaginationFilter(query.getLimit(), 0));
}
logger.debug("Generated HBase Filter {}", filter);
return filter;
}
private Map<StaticBuffer,EntryList> getHelper(List<StaticBuffer> keys, Filter getFilter) throws BackendException {
List<Get> requests = new ArrayList<Get>(keys.size());
{
for (StaticBuffer key : keys) {
Get g = new Get(key.as(StaticBuffer.ARRAY_FACTORY)).addFamily(columnFamilyBytes).setFilter(getFilter);
try {
g.setTimeRange(0, Long.MAX_VALUE);
} catch (IOException e) {
throw new PermanentBackendException(e);
}
requests.add(g);
}
}
Map<StaticBuffer,EntryList> resultMap = new HashMap<StaticBuffer,EntryList>(keys.size());
try {
TableMask table = null;
Result[] results = null;
try {
table = cnx.getTable(tableName);
results = table.get(requests);
} finally {
IOUtils.closeQuietly(table);
}
if (results == null)
return KCVSUtil.emptyResults(keys);
assert results.length==keys.size();
for (int i = 0; i < results.length; i++) {
Result result = results[i];
NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> f = result.getMap();
if (f == null) { // no result for this key
resultMap.put(keys.get(i), EntryList.EMPTY_LIST);
continue;
}
// actual key with <timestamp, value>
NavigableMap<byte[], NavigableMap<Long, byte[]>> r = f.get(columnFamilyBytes);
resultMap.put(keys.get(i), (r == null)
? EntryList.EMPTY_LIST
: StaticArrayEntryList.ofBytes(r.entrySet(), entryGetter));
}
return resultMap;
} catch (IOException e) {
throw new TemporaryBackendException(e);
}
}
private void mutateMany(Map<StaticBuffer, KCVMutation> mutations, StoreTransaction txh) throws BackendException {
storeManager.mutateMany(ImmutableMap.of(storeName, mutations), txh);
}
private KeyIterator executeKeySliceQuery(FilterList filters, @Nullable SliceQuery columnSlice) throws BackendException {
return executeKeySliceQuery(null, null, filters, columnSlice);
}
private KeyIterator executeKeySliceQuery(@Nullable byte[] startKey,
@Nullable byte[] endKey,
FilterList filters,
@Nullable SliceQuery columnSlice) throws BackendException {
Scan scan = new Scan().addFamily(columnFamilyBytes);
try {
scan.setTimeRange(0, Long.MAX_VALUE);
} catch (IOException e) {
throw new PermanentBackendException(e);
}
if (startKey != null)
scan.setStartRow(startKey);
if (endKey != null)
scan.setStopRow(endKey);
if (columnSlice != null) {
filters.addFilter(getFilter(columnSlice));
}
TableMask table = null;
try {
table = cnx.getTable(tableName);
return new RowIterator(table, table.getScanner(scan.setFilter(filters)), columnFamilyBytes);
} catch (IOException e) {
IOUtils.closeQuietly(table);
throw new PermanentBackendException(e);
}
}
private class RowIterator implements KeyIterator {
private final Closeable table;
private final Iterator<Result> rows;
private final byte[] columnFamilyBytes;
private Result currentRow;
private boolean isClosed;
public RowIterator(Closeable table, ResultScanner rows, byte[] columnFamilyBytes) {
this.table = table;
this.columnFamilyBytes = Arrays.copyOf(columnFamilyBytes, columnFamilyBytes.length);
this.rows = Iterators.filter(rows.iterator(), new Predicate<Result>() {
@Override
public boolean apply(@Nullable Result result) {
if (result == null)
return false;
try {
StaticBuffer id = StaticArrayBuffer.of(result.getRow());
id.getLong(0);
} catch (NumberFormatException e) {
return false;
}
return true;
}
});
}
@Override
public RecordIterator<Entry> getEntries() {
ensureOpen();
return new RecordIterator<Entry>() {
private final Iterator<Map.Entry<byte[], NavigableMap<Long, byte[]>>> kv = currentRow.getMap().get(columnFamilyBytes).entrySet().iterator();
@Override
public boolean hasNext() {
ensureOpen();
return kv.hasNext();
}
@Override
public Entry next() {
ensureOpen();
return StaticArrayEntry.ofBytes(kv.next(), entryGetter);
}
@Override
public void close() {
isClosed = true;
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
}
@Override
public boolean hasNext() {
ensureOpen();
return rows.hasNext();
}
@Override
public StaticBuffer next() {
ensureOpen();
currentRow = rows.next();
return StaticArrayBuffer.of(currentRow.getRow());
}
@Override
public void close() {
IOUtils.closeQuietly(table);
isClosed = true;
logger.debug("RowIterator closed table {}", table);
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
private void ensureOpen() {
if (isClosed)
throw new IllegalStateException("Iterator has been closed.");
}
}
private static class HBaseGetter implements StaticArrayEntry.GetColVal<Map.Entry<byte[], NavigableMap<Long, byte[]>>, byte[]> {
private final EntryMetaData[] schema;
private HBaseGetter(EntryMetaData[] schema) {
this.schema = schema;
}
@Override
public byte[] getColumn(Map.Entry<byte[], NavigableMap<Long, byte[]>> element) {
return element.getKey();
}
@Override
public byte[] getValue(Map.Entry<byte[], NavigableMap<Long, byte[]>> element) {
return element.getValue().lastEntry().getValue();
}
@Override
public EntryMetaData[] getMetaSchema(Map.Entry<byte[], NavigableMap<Long, byte[]>> element) {
return schema;
}
@Override
public Object getMetaData(Map.Entry<byte[], NavigableMap<Long, byte[]>> element, EntryMetaData meta) {
switch(meta) {
case TIMESTAMP:
return element.getValue().lastEntry().getKey();
default:
throw new UnsupportedOperationException("Unsupported meta data: " + meta);
}
}
}
}
/*
* Copyright 2012-2013 Aurelius LLC
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.thinkaurelius.titan.diskstorage.hbase;
import static com.thinkaurelius.titan.diskstorage.Backend.EDGESTORE_NAME;
import static com.thinkaurelius.titan.diskstorage.Backend.ID_STORE_NAME;
import static com.thinkaurelius.titan.diskstorage.Backend.INDEXSTORE_NAME;
import static com.thinkaurelius.titan.diskstorage.Backend.LOCK_STORE_SUFFIX;
import static com.thinkaurelius.titan.diskstorage.Backend.SYSTEM_MGMT_LOG_NAME;
import static com.thinkaurelius.titan.diskstorage.Backend.SYSTEM_TX_LOG_NAME;
import static com.thinkaurelius.titan.graphdb.configuration.GraphDatabaseConfiguration.SYSTEM_PROPERTIES_STORE_NAME;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import com.thinkaurelius.titan.diskstorage.configuration.ConfigElement;
import com.thinkaurelius.titan.diskstorage.keycolumnvalue.CustomizeStoreKCVSManager;
import com.thinkaurelius.titan.diskstorage.util.time.Timestamps;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableNotEnabledException;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.BiMap;
import com.google.common.collect.ImmutableBiMap;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import com.thinkaurelius.titan.core.TitanException;
import com.thinkaurelius.titan.diskstorage.BackendException;
import com.thinkaurelius.titan.diskstorage.BaseTransactionConfig;
import com.thinkaurelius.titan.diskstorage.Entry;
import com.thinkaurelius.titan.diskstorage.PermanentBackendException;
import com.thinkaurelius.titan.diskstorage.StaticBuffer;
import com.thinkaurelius.titan.diskstorage.TemporaryBackendException;
import com.thinkaurelius.titan.diskstorage.common.DistributedStoreManager;
import com.thinkaurelius.titan.diskstorage.configuration.ConfigNamespace;
import com.thinkaurelius.titan.diskstorage.configuration.ConfigOption;
import com.thinkaurelius.titan.diskstorage.configuration.Configuration;
import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KCVMutation;
import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KeyColumnValueStore;
import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KeyColumnValueStoreManager;
import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KeyRange;
import com.thinkaurelius.titan.diskstorage.keycolumnvalue.StandardStoreFeatures;
import com.thinkaurelius.titan.diskstorage.keycolumnvalue.StoreFeatures;
import com.thinkaurelius.titan.diskstorage.keycolumnvalue.StoreTransaction;
import com.thinkaurelius.titan.diskstorage.util.BufferUtil;
import com.thinkaurelius.titan.diskstorage.util.StaticArrayBuffer;
import com.thinkaurelius.titan.graphdb.configuration.GraphDatabaseConfiguration;
import com.thinkaurelius.titan.graphdb.configuration.PreInitializeConfigOptions;
import com.thinkaurelius.titan.util.system.IOUtils;
import com.thinkaurelius.titan.util.system.NetworkUtil;
/**
* Storage Manager for HBase
*
* @author Dan LaRocque <dalaro@hopcount.org>
*/
@PreInitializeConfigOptions
public class HBaseStoreManager extends DistributedStoreManager implements KeyColumnValueStoreManager, CustomizeStoreKCVSManager {
private static final Logger logger = LoggerFactory.getLogger(HBaseStoreManager.class);
public static final ConfigNamespace HBASE_NS =
new ConfigNamespace(GraphDatabaseConfiguration.STORAGE_NS, "hbase", "HBase storage options");
public static final ConfigOption<Boolean> SHORT_CF_NAMES =
new ConfigOption<Boolean>(HBASE_NS, "short-cf-names",
"Whether to shorten the names of Titan's column families to one-character mnemonics " +
"to conserve storage space", ConfigOption.Type.FIXED, true);
public static final String COMPRESSION_DEFAULT = "-DEFAULT-";
public static final ConfigOption<String> COMPRESSION =
new ConfigOption<String>(HBASE_NS, "compression-algorithm",
"An HBase Compression.Algorithm enum string which will be applied to newly created column families. " +
"The compression algorithm must be installed and available on the HBase cluster. Titan cannot install " +
"and configure new compression algorithms on the HBase cluster by itself.",
ConfigOption.Type.MASKABLE, "GZ");
public static final ConfigOption<Boolean> SKIP_SCHEMA_CHECK =
new ConfigOption<Boolean>(HBASE_NS, "skip-schema-check",
"Assume that Titan's HBase table and column families already exist. " +
"When this is true, Titan will not check for the existence of its table/CFs, " +
"nor will it attempt to create them under any circumstances. This is useful " +
"when running Titan without HBase admin privileges.",
ConfigOption.Type.MASKABLE, false);
public static final ConfigOption<String> HBASE_TABLE =
new ConfigOption<String>(HBASE_NS, "table",
"The name of the table Titan will use. When " + ConfigElement.getPath(SKIP_SCHEMA_CHECK) +
" is false, Titan will automatically create this table if it does not already exist.",
ConfigOption.Type.LOCAL, "titan");
/**
* Related bug fixed in 0.98.0, 0.94.7, 0.95.0:
*
* https://issues.apache.org/jira/browse/HBASE-8170
*/
public static final int MIN_REGION_COUNT = 3;
/**
* The total number of HBase regions to create with Titan's table. This
* setting only affects table creation; this normally happens just once when
* Titan connects to an HBase backend for the first time.
*/
public static final ConfigOption<Integer> REGION_COUNT =
new ConfigOption<Integer>(HBASE_NS, "region-count",
"The number of initial regions set when creating Titan's HBase table",
ConfigOption.Type.MASKABLE, Integer.class, new Predicate<Integer>() {
@Override
public boolean apply(Integer input) {
return null != input && MIN_REGION_COUNT <= input;
}
}
);
/**
* This setting is used only when {@link #REGION_COUNT} is unset.
* <p/>
* If Titan's HBase table does not exist, then it will be created with total
* region count = (number of servers reported by ClusterStatus) * (this
* value).
* <p/>
* The Apache HBase manual suggests an order-of-magnitude range of potential
* values for this setting:
*
* <ul>
* <li>
* <a href="https://hbase.apache.org/book/important_configurations.html#disable.splitting">2.5.2.7. Managed Splitting</a>:
* <blockquote>
* What's the optimal number of pre-split regions to create? Mileage will
* vary depending upon your application. You could start low with 10
* pre-split regions / server and watch as data grows over time. It's
* better to err on the side of too little regions and rolling split later.
* </blockquote>
* </li>
* <li>
* <a href="https://hbase.apache.org/book/regions.arch.html">9.7 Regions</a>:
* <blockquote>
* In general, HBase is designed to run with a small (20-200) number of
* relatively large (5-20Gb) regions per server... Typically you want to
* keep your region count low on HBase for numerous reasons. Usually
* right around 100 regions per RegionServer has yielded the best results.
* </blockquote>
* </li>
* </ul>
*
* These considerations may differ for other HBase implementations (e.g. MapR).
*/
public static final ConfigOption<Integer> REGIONS_PER_SERVER =
new ConfigOption<Integer>(HBASE_NS, "regions-per-server",
"The number of regions per regionserver to set when creating Titan's HBase table",
ConfigOption.Type.MASKABLE, Integer.class);
/**
* If this key is present in either the JVM system properties or the process
* environment (checked in the listed order, first hit wins), then its value
* must be the full package and class name of an implementation of
* {@link HBaseCompat} that has a no-arg public constructor.
* <p>
* When this <b>is not</b> set, Titan attempts to automatically detect the
* HBase runtime version by calling {@link VersionInfo#getVersion()}. Titan
* then checks the returned version string against a hard-coded list of
* supported version prefixes and instantiates the associated compat layer
* if a match is found.
* <p>
* When this <b>is</b> set, Titan will not call
* {@code VersionInfo.getVersion()} or read its hard-coded list of supported
* version prefixes. Titan will instead attempt to instantiate the class
* specified (via the no-arg constructor which must exist) and then attempt
* to cast it to HBaseCompat and use it as such. Titan will assume the
* supplied implementation is compatible with the runtime HBase version and
* make no attempt to verify that assumption.
* <p>
* Setting this key incorrectly could cause runtime exceptions at best or
* silent data corruption at worst. This setting is intended for users
* running exotic HBase implementations that don't support VersionInfo or
* implementations which return values from {@code VersionInfo.getVersion()}
* that are inconsistent with Apache's versioning convention. It may also be
* useful to users who want to run against a new release of HBase that Titan
* doesn't yet officially support.
*
*/
public static final ConfigOption<String> COMPAT_CLASS =
new ConfigOption<String>(HBASE_NS, "compat-class",
"The package and class name of the HBaseCompat implementation. HBaseCompat masks version-specific HBase API differences. " +
"When this option is unset, Titan calls HBase's VersionInfo.getVersion() and loads the matching compat class " +
"at runtime. Setting this option forces Titan to instead reflectively load and instantiate the specified class.",
ConfigOption.Type.MASKABLE, String.class);
public static final int PORT_DEFAULT = 9160;
public static final Timestamps PREFERRED_TIMESTAMPS = Timestamps.MILLI;
public static final ConfigNamespace HBASE_CONFIGURATION_NAMESPACE =
new ConfigNamespace(HBASE_NS, "ext", "Overrides for hbase-{site,default}.xml options", true);
private static final BiMap<String, String> SHORT_CF_NAME_MAP =
ImmutableBiMap.<String, String>builder()
.put(INDEXSTORE_NAME, "g")
.put(INDEXSTORE_NAME + LOCK_STORE_SUFFIX, "h")
.put(ID_STORE_NAME, "i")
.put(EDGESTORE_NAME, "e")
.put(EDGESTORE_NAME + LOCK_STORE_SUFFIX, "f")
.put(SYSTEM_PROPERTIES_STORE_NAME, "s")
.put(SYSTEM_PROPERTIES_STORE_NAME + LOCK_STORE_SUFFIX, "t")
.put(SYSTEM_MGMT_LOG_NAME, "m")
.put(SYSTEM_TX_LOG_NAME, "l")
.build();
private static final StaticBuffer FOUR_ZERO_BYTES = BufferUtil.zeroBuffer(4);
static {
// Verify that shortCfNameMap is injective
// Should be guaranteed by Guava BiMap, but it doesn't hurt to check
Preconditions.checkArgument(null != SHORT_CF_NAME_MAP);
Collection<String> shorts = SHORT_CF_NAME_MAP.values();
Preconditions.checkArgument(Sets.newHashSet(shorts).size() == shorts.size());
}
// Immutable instance fields
private final String tableName;
private final String compression;
private final int regionCount;
private final int regionsPerServer;
private final ConnectionMask cnx;
private final org.apache.hadoop.conf.Configuration hconf;
private final boolean shortCfNames;
private final boolean skipSchemaCheck;
private final String compatClass;
private final HBaseCompat compat;
private static final ConcurrentHashMap<HBaseStoreManager, Throwable> openManagers =
new ConcurrentHashMap<HBaseStoreManager, Throwable>();
// Mutable instance state
private final ConcurrentMap<String, HBaseKeyColumnValueStore> openStores;
public HBaseStoreManager(com.thinkaurelius.titan.diskstorage.configuration.Configuration config) throws BackendException {
super(config, PORT_DEFAULT);
checkConfigDeprecation(config);
this.tableName = config.get(HBASE_TABLE);
this.compression = config.get(COMPRESSION);
this.regionCount = config.has(REGION_COUNT) ? config.get(REGION_COUNT) : -1;
this.regionsPerServer = config.has(REGIONS_PER_SERVER) ? config.get(REGIONS_PER_SERVER) : -1;
this.skipSchemaCheck = config.get(SKIP_SCHEMA_CHECK);
this.compatClass = config.has(COMPAT_CLASS) ? config.get(COMPAT_CLASS) : null;
this.compat = HBaseCompatLoader.getCompat(compatClass);
/*
* Specifying both region count options is permitted but may be
* indicative of a misunderstanding, so issue a warning.
*/
if (config.has(REGIONS_PER_SERVER) && config.has(REGION_COUNT)) {
logger.warn("Both {} and {} are set in Titan's configuration, but "
+ "the former takes precedence and the latter will be ignored.",
REGION_COUNT, REGIONS_PER_SERVER);
}
/* This static factory calls HBaseConfiguration.addHbaseResources(),
* which in turn applies the contents of hbase-default.xml and then
* applies the contents of hbase-site.xml.
*/
this.hconf = HBaseConfiguration.create();
// Copy a subset of our commons config into a Hadoop config
int keysLoaded=0;
Map<String,Object> configSub = config.getSubset(HBASE_CONFIGURATION_NAMESPACE);
for (Map.Entry<String,Object> entry : configSub.entrySet()) {
logger.info("HBase configuration: setting {}={}", entry.getKey(), entry.getValue());
if (entry.getValue()==null) continue;
hconf.set(entry.getKey(), entry.getValue().toString());
keysLoaded++;
}
// Special case for STORAGE_HOSTS
if (config.has(GraphDatabaseConfiguration.STORAGE_HOSTS)) {
String zkQuorumKey = "hbase.zookeeper.quorum";
String csHostList = Joiner.on(",").join(config.get(GraphDatabaseConfiguration.STORAGE_HOSTS));
hconf.set(zkQuorumKey, csHostList);
logger.info("Copied host list from {} to {}: {}", GraphDatabaseConfiguration.STORAGE_HOSTS, zkQuorumKey, csHostList);
}
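/*
 * Worked example (illustrative, hostnames hypothetical): given a Titan
 * config containing
 *
 *   storage.hostname=zk1.example.com,zk2.example.com
 *   storage.hbase.ext.hbase.zookeeper.property.clientPort=2181
 *
 * the loop above strips the ext namespace prefix and copies
 * hbase.zookeeper.property.clientPort=2181 into hconf, while this special
 * case joins the hosts and sets
 * hbase.zookeeper.quorum=zk1.example.com,zk2.example.com.
 */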
logger.debug("HBase configuration: set a total of {} configuration values", keysLoaded);
this.shortCfNames = config.get(SHORT_CF_NAMES);
try {
//this.cnx = HConnectionManager.createConnection(hconf);
this.cnx = compat.createConnection(hconf);
} catch (IOException e) {
throw new PermanentBackendException(e);
}
if (logger.isTraceEnabled()) {
openManagers.put(this, new Throwable("Manager Opened"));
dumpOpenManagers();
}
logger.debug("Dumping HBase config key=value pairs");
for (Map.Entry<String, String> entry : hconf) {
logger.debug("[HBaseConfig] " + entry.getKey() + "=" + entry.getValue());
}
logger.debug("End of HBase config key=value pairs");
openStores = new ConcurrentHashMap<String, HBaseKeyColumnValueStore>();
}
@Override
public Deployment getDeployment() {
List<KeyRange> local;
try {
local = getLocalKeyPartition();
return null != local && !local.isEmpty() ? Deployment.LOCAL : Deployment.REMOTE;
} catch (BackendException e) {
// propagating BackendException might be a better approach
throw new RuntimeException(e);
}
}
@Override
public String toString() {
return "hbase[" + tableName + "@" + super.toString() + "]";
}
public void dumpOpenManagers() {
int estimatedSize = openManagers.size();
logger.trace("---- Begin open HBase store manager list ({} managers) ----", estimatedSize);
for (HBaseStoreManager m : openManagers.keySet()) {
logger.trace("Manager {} opened at:", m, openManagers.get(m));
}
logger.trace("---- End open HBase store manager list ({} managers) ----", estimatedSize);
}
@Override
public void close() {
openStores.clear();
if (logger.isTraceEnabled())
openManagers.remove(this);
IOUtils.closeQuietly(cnx);
}
@Override
public StoreFeatures getFeatures() {
Configuration c = GraphDatabaseConfiguration.buildConfiguration();
StandardStoreFeatures.Builder fb = new StandardStoreFeatures.Builder()
.orderedScan(true).unorderedScan(true).batchMutation(true)
.multiQuery(true).distributed(true).keyOrdered(true).storeTTL(true)
.timestamps(true).preferredTimestamps(PREFERRED_TIMESTAMPS)
.keyConsistent(c);
try {
fb.localKeyPartition(getDeployment() == Deployment.LOCAL);
} catch (Exception e) {
logger.warn("Unexpected exception during getDeployment()", e);
}
return fb.build();
}
@Override
public void mutateMany(Map<String, Map<StaticBuffer, KCVMutation>> mutations, StoreTransaction txh) throws BackendException {
final MaskedTimestamp commitTime = new MaskedTimestamp(txh);
// In case of an addition and deletion with identical timestamps, the
// deletion tombstone wins.
// http://hbase.apache.org/book/versions.html#d244e4250
Map<StaticBuffer, Pair<Put, Delete>> commandsPerKey =
convertToCommands(
mutations,
commitTime.getAdditionTime(times.getUnit()),
commitTime.getDeletionTime(times.getUnit()));
List<Row> batch = new ArrayList<Row>(commandsPerKey.size()); // actual batch operation
// convert sorted commands into representation required for 'batch' operation
for (Pair<Put, Delete> commands : commandsPerKey.values()) {
if (commands.getFirst() != null)
batch.add(commands.getFirst());
if (commands.getSecond() != null)
batch.add(commands.getSecond());
}
try {
TableMask table = null;
try {
table = cnx.getTable(tableName);
table.batch(batch, new Object[batch.size()]);
} finally {
IOUtils.closeQuietly(table);
}
} catch (IOException e) {
throw new TemporaryBackendException(e);
} catch (InterruptedException e) {
throw new TemporaryBackendException(e);
}
sleepAfterWrite(txh, commitTime);
}
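/*
 * Illustrative sketch: for a single row key whose mutation contains both
 * additions and deletions, convertToCommands() below yields one Pair whose
 * first element is a Put carrying every added column at the addition
 * timestamp and whose second element is a Delete carrying every removed
 * column at the deletion timestamp; both commands land in the same batch()
 * call above, and HBase's delete-wins rule breaks any timestamp tie.
 */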
@Override
public KeyColumnValueStore openDatabase(String longName) throws BackendException {
return openDatabase(longName, -1);
}
@Override
public KeyColumnValueStore openDatabase(final String longName, int ttlInSeconds) throws BackendException {
HBaseKeyColumnValueStore store = openStores.get(longName);
if (store == null) {
final String cfName = shortCfNames ? shortenCfName(longName) : longName;
HBaseKeyColumnValueStore newStore = new HBaseKeyColumnValueStore(this, cnx, tableName, cfName, longName);
store = openStores.putIfAbsent(longName, newStore); // nothing bad happens if we lose to another thread
if (store == null) {
if (!skipSchemaCheck)
ensureColumnFamilyExists(tableName, cfName, ttlInSeconds);
store = newStore;
}
logger.info("Loaded 1.x Hbase Client Store Manager");
}
return store;
}
@Override
public StoreTransaction beginTransaction(final BaseTransactionConfig config) throws BackendException {
return new HBaseTransaction(config);
}
@Override
public String getName() {
return tableName;
}
/**
* Deletes the specified table with all its columns.
* ATTENTION: Invoking this method will delete the table if it exists and therefore causes data loss.
*/
@Override
public void clearStorage() throws BackendException {
try (AdminMask adm = getAdminInterface()) {
adm.clearTable(tableName, times.getTime().getNativeTimestamp());
} catch (IOException e) {
throw new TemporaryBackendException(e);
}
}
@Override
public List<KeyRange> getLocalKeyPartition() throws BackendException {
List<KeyRange> result = new LinkedList<KeyRange>();
HTable table = null;
try {
ensureTableExists(tableName, getCfNameForStoreName(GraphDatabaseConfiguration.SYSTEM_PROPERTIES_STORE_NAME), 0);
table = new HTable(hconf, tableName);
Map<KeyRange, ServerName> normed =
normalizeKeyBounds(table.getRegionLocations());
for (Map.Entry<KeyRange, ServerName> e : normed.entrySet()) {
if (NetworkUtil.isLocalConnection(e.getValue().getHostname())) {
result.add(e.getKey());
logger.debug("Found local key/row partition {} on host {}", e.getKey(), e.getValue());
} else {
logger.debug("Discarding remote {}", e.getValue());
}
}
} catch (MasterNotRunningException e) {
logger.warn("Unexpected MasterNotRunningException", e);
} catch (ZooKeeperConnectionException e) {
logger.warn("Unexpected ZooKeeperConnectionException", e);
} catch (IOException e) {
logger.warn("Unexpected IOException", e);
} finally {
IOUtils.closeQuietly(table);
}
return result;
}
/**
* Given a map produced by {@link HTable#getRegionLocations()}, transform
* each key from an {@link HRegionInfo} to a {@link KeyRange} expressing the
* region's start and end key bounds using Titan-partitioning-friendly
* conventions (start inclusive, end exclusive, zero bytes appended where
* necessary to make all keys at least 4 bytes long).
* <p/>
* This method iterates over the entries in its map parameter and performs
* the following conditional conversions on its keys. "Require" below means
* either a {@link Preconditions} invocation or an assertion. HRegionInfo
* sometimes returns start and end keys of zero length; this method replaces
* zero length keys with null before doing any of the checks described
* below. The parameter map and the values it contains are only read and
* never modified.
*
* <ul>
* <li>If an entry's HRegionInfo has null start and end keys, then first
* require that the parameter map is a singleton, and then return a
* single-entry map whose {@code KeyRange} has start and end buffers that
* are both four bytes of zeros.</li>
* <li>If the entry has a null end key (but non-null start key), put an
* equivalent entry in the result map with a start key identical to the
* input, except that zeros are appended to values less than 4 bytes long,
* and an end key that is four bytes of zeros.
* <li>If the entry has a null start key (but non-null end key), put an
* equivalent entry in the result map where the start key is four bytes of
* zeros, and the end key has zeros appended, if necessary, to make it at
* least 4 bytes long, after which one is added to the padded value in
* unsigned 32-bit arithmetic with overflow allowed.</li>
* <li>Any entry which matches none of the above criteria results in an
* equivalent entry in the returned map, except that zeros are appended to
* both keys to make each at least 4 bytes long, and the end key is then
* incremented as described in the last bullet point.</li>
* </ul>
*
 * After iterating over the parameter map, this method checks that it saw
 * either no entries with null keys, exactly one entry with a null start key
 * plus a different entry with a null end key, or a single entry with both
 * start and end keys null. Any other pattern of null keys causes the method
 * to fail with a precondition failure.
*
* @param raw
* A map of HRegionInfo and ServerName from HBase
* @return Titan-friendly expression of each region's rowkey boundaries
*/
private Map<KeyRange, ServerName> normalizeKeyBounds(NavigableMap<HRegionInfo, ServerName> raw) {
Map.Entry<HRegionInfo, ServerName> nullStart = null;
Map.Entry<HRegionInfo, ServerName> nullEnd = null;
ImmutableMap.Builder<KeyRange, ServerName> b = ImmutableMap.builder();
for (Map.Entry<HRegionInfo, ServerName> e : raw.entrySet()) {
HRegionInfo regionInfo = e.getKey();
byte startKey[] = regionInfo.getStartKey();
byte endKey[] = regionInfo.getEndKey();
if (0 == startKey.length) {
startKey = null;
logger.trace("Converted zero-length HBase startKey byte array to null");
}
if (0 == endKey.length) {
endKey = null;
logger.trace("Converted zero-length HBase endKey byte array to null");
}
if (null == startKey && null == endKey) {
Preconditions.checkState(1 == raw.size());
logger.debug("HBase table {} has a single region {}", tableName, regionInfo);
// Choose arbitrary shared value = startKey = endKey
return b.put(new KeyRange(FOUR_ZERO_BYTES, FOUR_ZERO_BYTES), e.getValue()).build();
} else if (null == startKey) {
logger.debug("Found HRegionInfo with null startKey on server {}: {}", e.getValue(), regionInfo);
Preconditions.checkState(null == nullStart);
nullStart = e;
// I thought endBuf would be inclusive from the HBase javadoc, but in practice it is exclusive
StaticBuffer endBuf = StaticArrayBuffer.of(zeroExtend(endKey));
// Replace null start key with zeroes
b.put(new KeyRange(FOUR_ZERO_BYTES, endBuf), e.getValue());
} else if (null == endKey) {
logger.debug("Found HRegionInfo with null endKey on server {}: {}", e.getValue(), regionInfo);
Preconditions.checkState(null == nullEnd);
nullEnd = e;
// Replace null end key with zeroes
b.put(new KeyRange(StaticArrayBuffer.of(zeroExtend(startKey)), FOUR_ZERO_BYTES), e.getValue());
} else {
Preconditions.checkState(null != startKey);
Preconditions.checkState(null != endKey);
// HBase region end keys are already exclusive, matching Titan's KeyRange convention; just zero-extend both bounds
StaticBuffer startBuf = StaticArrayBuffer.of(zeroExtend(startKey));
StaticBuffer endBuf = StaticArrayBuffer.of(zeroExtend(endKey));
KeyRange kr = new KeyRange(startBuf, endBuf);
b.put(kr, e.getValue());
logger.debug("Found HRegionInfo with non-null end and start keys on server {}: {}", e.getValue(), regionInfo);
}
}
// Require either no null key bounds or a pair of them
Preconditions.checkState(!(null == nullStart ^ null == nullEnd));
// Check that every key in the result is at least 4 bytes long
Map<KeyRange, ServerName> result = b.build();
for (KeyRange kr : result.keySet()) {
Preconditions.checkState(4 <= kr.getStart().length());
Preconditions.checkState(4 <= kr.getEnd().length());
}
return result;
}
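/*
 * Worked example (illustrative): a three-region table whose HBase
 * boundaries are
 *
 *   region 1: start = [] (empty),  end = 0x10
 *   region 2: start = 0x10,        end = 0xA0
 *   region 3: start = 0xA0,        end = [] (empty)
 *
 * normalizes to the Titan key ranges
 *
 *   [0x00000000, 0x10000000)   // empty start replaced by four zero bytes
 *   [0x10000000, 0xA0000000)   // both keys zero-extended to 4 bytes
 *   [0xA0000000, 0x00000000)   // empty end replaced by four zero bytes
 */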
/**
* If the parameter is shorter than 4 bytes, then create and return a new 4
* byte array with the input array's bytes followed by zero bytes. Otherwise
* return the parameter.
*
* @param dataToPad non-null but possibly zero-length byte array
* @return either the parameter or a new array
*/
private final byte[] zeroExtend(byte[] dataToPad) {
assert null != dataToPad;
final int targetLength = 4;
if (targetLength <= dataToPad.length)
return dataToPad;
byte padded[] = new byte[targetLength];
for (int i = 0; i < dataToPad.length; i++)
padded[i] = dataToPad[i];
for (int i = dataToPad.length; i < padded.length; i++)
padded[i] = (byte)0;
return padded;
}
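/*
 * Example (illustrative): zeroExtend(new byte[] {0x10}) returns
 * {0x10, 0x00, 0x00, 0x00}, and any input of length >= 4 comes back
 * unchanged. The explicit zero-fill loop is redundant since Java
 * zero-initializes new arrays; Arrays.copyOf(dataToPad, 4) would be an
 * equivalent one-liner.
 */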
public static String shortenCfName(String longName) throws PermanentBackendException {
final String s;
if (SHORT_CF_NAME_MAP.containsKey(longName)) {
s = SHORT_CF_NAME_MAP.get(longName);
Preconditions.checkNotNull(s);
logger.debug("Substituted default CF name \"{}\" with short form \"{}\" to reduce HBase KeyValue size", longName, s);
} else {
if (SHORT_CF_NAME_MAP.containsValue(longName)) {
String fmt = "Must use CF long-form name \"%s\" instead of the short-form name \"%s\" when configured with %s=true";
String msg = String.format(fmt, SHORT_CF_NAME_MAP.inverse().get(longName), longName, SHORT_CF_NAMES.getName());
throw new PermanentBackendException(msg);
}
s = longName;
logger.debug("Kept default CF name \"{}\" because it has no associated short form", s);
}
return s;
}
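/*
 * Example (illustrative, store names assumed from Titan's Backend
 * constants): with short CF names enabled, "edgestore" shortens to "e" per
 * SHORT_CF_NAME_MAP, an unmapped name such as "myCustomStore" (hypothetical)
 * passes through unchanged, and passing a short form like "e" directly
 * raises a PermanentBackendException pointing the caller at the long form.
 */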
private HTableDescriptor ensureTableExists(String tableName, String initialCFName, int ttlInSeconds) throws BackendException {
AdminMask adm = null;
HTableDescriptor desc;
try { // Create our table, if necessary
adm = getAdminInterface();
/*
* Some HBase versions/impls respond badly to attempts to create a
* table without at least one CF. See #661. Creating a CF along with
* the table avoids HBase carping.
*/
if (adm.tableExists(tableName)) {
desc = adm.getTableDescriptor(tableName);
} else {
desc = createTable(tableName, initialCFName, ttlInSeconds, adm);
}
} catch (IOException e) {
throw new TemporaryBackendException(e);
} finally {
IOUtils.closeQuietly(adm);
}
return desc;
}
private HTableDescriptor createTable(String tableName, String cfName, int ttlInSeconds, AdminMask adm) throws IOException {
HTableDescriptor desc = compat.newTableDescriptor(tableName);
HColumnDescriptor cdesc = new HColumnDescriptor(cfName);
setCFOptions(cdesc, ttlInSeconds);
compat.addColumnFamilyToTableDescriptor(desc, cdesc);
int count; // total regions to create
String src;
if (MIN_REGION_COUNT <= (count = regionCount)) {
src = "region count configuration";
} else if (0 < regionsPerServer &&
MIN_REGION_COUNT <= (count = regionsPerServer * adm.getEstimatedRegionServerCount())) {
src = "ClusterStatus server count";
} else {
count = -1;
src = "default";
}
if (MIN_REGION_COUNT <= count) {
adm.createTable(desc, getStartKey(count), getEndKey(count), count);
logger.debug("Created table {} with region count {} from {}", tableName, count, src);
} else {
adm.createTable(desc);
logger.debug("Created table {} with default start key, end key, and region count", tableName);
}
return desc;
}
/**
* This method generates the second argument to
* {@link HBaseAdmin#createTable(HTableDescriptor, byte[], byte[], int)}.
* <p/>
* From the {@code createTable} javadoc:
* "The start key specified will become the end key of the first region of
* the table, and the end key specified will become the start key of the
* last region of the table (the first region has a null start key and
* the last region has a null end key)"
* <p/>
* To summarize, the {@code createTable} argument called "startKey" is
* actually the end key of the first region.
*/
private byte[] getStartKey(int regionCount) {
ByteBuffer regionWidth = ByteBuffer.allocate(4);
regionWidth.putInt((int)(((1L << 32) - 1L) / regionCount)).flip();
return StaticArrayBuffer.of(regionWidth).getBytes(0, 4);
}
/**
* Companion to {@link #getStartKey(int)}. See its javadoc for details.
*/
private byte[] getEndKey(int regionCount) {
ByteBuffer regionWidth = ByteBuffer.allocate(4);
regionWidth.putInt((int)(((1L << 32) - 1L) / regionCount * (regionCount - 1))).flip();
return StaticArrayBuffer.of(regionWidth).getBytes(0, 4);
}
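/*
 * Worked arithmetic (illustrative): for regionCount = 4 the region width is
 * ((1L << 32) - 1) / 4 = 0x3FFFFFFF, so getStartKey() returns the bytes
 * 0x3FFFFFFF (the end of the first region) and getEndKey() returns
 * 0x3FFFFFFF * 3 = 0xBFFFFFFD (the start of the last region), splitting the
 * unsigned 32-bit keyspace into four roughly equal regions.
 */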
private void ensureColumnFamilyExists(String tableName, String columnFamily, int ttlInSeconds) throws BackendException {
AdminMask adm = null;
try {
adm = getAdminInterface();
HTableDescriptor desc = ensureTableExists(tableName, columnFamily, ttlInSeconds);
Preconditions.checkNotNull(desc);
HColumnDescriptor cf = desc.getFamily(columnFamily.getBytes());
// Create our column family, if necessary
if (cf == null) {
try {
if (!adm.isTableDisabled(tableName)) {
adm.disableTable(tableName);
}
} catch (TableNotEnabledException e) {
logger.debug("Table {} already disabled", tableName);
} catch (IOException e) {
throw new TemporaryBackendException(e);
}
try {
HColumnDescriptor cdesc = new HColumnDescriptor(columnFamily);
setCFOptions(cdesc, ttlInSeconds);
adm.addColumn(tableName, cdesc);
try {
logger.debug("Added HBase ColumnFamily {}, waiting for 1 sec. to propogate.", columnFamily);
Thread.sleep(1000L);
} catch (InterruptedException ie) {
throw new TemporaryBackendException(ie);
}
adm.enableTable(tableName);
} catch (TableNotFoundException ee) {
logger.error("TableNotFoundException", ee);
throw new PermanentBackendException(ee);
} catch (org.apache.hadoop.hbase.TableExistsException ee) {
logger.debug("Swallowing exception {}", ee);
} catch (IOException ee) {
throw new TemporaryBackendException(ee);
}
}
} finally {
IOUtils.closeQuietly(adm);
}
}
private void setCFOptions(HColumnDescriptor cdesc, int ttlInSeconds) {
if (null != compression && !compression.equals(COMPRESSION_DEFAULT))
compat.setCompression(cdesc, compression);
if (ttlInSeconds > 0)
cdesc.setTimeToLive(ttlInSeconds);
}
/**
* Convert Titan internal Mutation representation into HBase native commands.
*
* @param mutations Mutations to convert into HBase commands.
* @param putTimestamp The timestamp to use for Put commands.
* @param delTimestamp The timestamp to use for Delete commands.
* @return Commands sorted by key converted from Titan internal representation.
* @throws com.thinkaurelius.titan.diskstorage.PermanentBackendException
*/
private Map<StaticBuffer, Pair<Put, Delete>> convertToCommands(Map<String, Map<StaticBuffer, KCVMutation>> mutations,
final long putTimestamp,
final long delTimestamp) throws PermanentBackendException {
Map<StaticBuffer, Pair<Put, Delete>> commandsPerKey = new HashMap<StaticBuffer, Pair<Put, Delete>>();
for (Map.Entry<String, Map<StaticBuffer, KCVMutation>> entry : mutations.entrySet()) {
String cfString = getCfNameForStoreName(entry.getKey());
byte[] cfName = cfString.getBytes();
for (Map.Entry<StaticBuffer, KCVMutation> m : entry.getValue().entrySet()) {
byte[] key = m.getKey().as(StaticBuffer.ARRAY_FACTORY);
KCVMutation mutation = m.getValue();
Pair<Put, Delete> commands = commandsPerKey.get(m.getKey());
if (commands == null) {
commands = new Pair<Put, Delete>();
commandsPerKey.put(m.getKey(), commands);
}
if (mutation.hasDeletions()) {
if (commands.getSecond() == null) {
Delete d = new Delete(key);
compat.setTimestamp(d, delTimestamp);
commands.setSecond(d);
}
for (StaticBuffer b : mutation.getDeletions()) {
commands.getSecond().deleteColumns(cfName, b.as(StaticBuffer.ARRAY_FACTORY), delTimestamp);
}
}
if (mutation.hasAdditions()) {
if (commands.getFirst() == null) {
Put p = new Put(key, putTimestamp);
commands.setFirst(p);
}
for (Entry e : mutation.getAdditions()) {
commands.getFirst().add(cfName,
e.getColumnAs(StaticBuffer.ARRAY_FACTORY),
putTimestamp,
e.getValueAs(StaticBuffer.ARRAY_FACTORY));
}
}
}
}
return commandsPerKey;
}
private String getCfNameForStoreName(String storeName) throws PermanentBackendException {
return shortCfNames ? shortenCfName(storeName) : storeName;
}
private void checkConfigDeprecation(com.thinkaurelius.titan.diskstorage.configuration.Configuration config) {
if (config.has(GraphDatabaseConfiguration.STORAGE_PORT)) {
logger.warn("The configuration property {} is ignored for HBase. Set hbase.zookeeper.property.clientPort in hbase-site.xml or {}.hbase.zookeeper.property.clientPort in Titan's configuration file.",
ConfigElement.getPath(GraphDatabaseConfiguration.STORAGE_PORT), ConfigElement.getPath(HBASE_CONFIGURATION_NAMESPACE));
}
}
private AdminMask getAdminInterface() {
try {
return cnx.getAdmin();
} catch (IOException e) {
throw new TitanException(e);
}
}
/**
* Similar to {@link Function}, except that the {@code apply} method is allowed
* to throw {@link BackendException}.
*/
private static interface BackendFunction<F, T> {
T apply(F input) throws BackendException;
}
}
/*
* Copyright 2012-2013 Aurelius LLC
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.thinkaurelius.titan.diskstorage.hbase;
import com.thinkaurelius.titan.diskstorage.BaseTransactionConfig;
import com.thinkaurelius.titan.diskstorage.common.AbstractStoreTransaction;
/**
 * This class overrides and adds nothing compared with
 * {@link com.thinkaurelius.titan.diskstorage.common.AbstractStoreTransaction};
 * however, it creates a transaction type specific to HBase, which lets us
 * check for user errors like passing a Cassandra transaction into an HBase
 * method.
*
* @author Dan LaRocque <dalaro@hopcount.org>
*/
public class HBaseTransaction extends AbstractStoreTransaction {
public HBaseTransaction(final BaseTransactionConfig config) {
super(config);
}
}
/*
* Copyright 2012-2013 Aurelius LLC
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.thinkaurelius.titan.diskstorage.hbase;
import java.io.IOException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
public class HConnection0_98 implements ConnectionMask
{
private final HConnection cnx;
public HConnection0_98(HConnection cnx)
{
this.cnx = cnx;
}
@Override
public TableMask getTable(String name) throws IOException
{
return new HTable0_98(cnx.getTable(name));
}
@Override
public AdminMask getAdmin() throws IOException
{
return new HBaseAdmin0_98(new HBaseAdmin(cnx));
}
@Override
public void close() throws IOException
{
cnx.close();
}
}
/*
* Copyright 2012-2013 Aurelius LLC
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.thinkaurelius.titan.diskstorage.hbase;
import java.io.IOException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.HBaseAdmin;
public class HConnection1_0 implements ConnectionMask
{
private final Connection cnx;
public HConnection1_0(Connection cnx)
{
this.cnx = cnx;
}
@Override
public TableMask getTable(String name) throws IOException
{
return new HTable1_0(cnx.getTable(TableName.valueOf(name)));
}
@Override
public AdminMask getAdmin() throws IOException
{
return new HBaseAdmin1_0(new HBaseAdmin(cnx));
}
@Override
public void close() throws IOException
{
cnx.close();
}
}
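/*
 * Illustrative sketch (not part of the original source): wiring this wrapper
 * up against the HBase 1.x client, e.g. from a compat implementation. The
 * table name is hypothetical.
 *
 *   org.apache.hadoop.conf.Configuration hconf = HBaseConfiguration.create();
 *   ConnectionMask mask =
 *           new HConnection1_0(ConnectionFactory.createConnection(hconf));
 *   TableMask table = mask.getTable("titan");
 *   AdminMask admin = mask.getAdmin();
 */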
/*
* Copyright 2012-2013 Aurelius LLC
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.thinkaurelius.titan.diskstorage.hbase;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.Scan;
public class HTable0_98 implements TableMask
{
private final HTableInterface table;
public HTable0_98(HTableInterface table)
{
this.table = table;
}
@Override
public ResultScanner getScanner(Scan filter) throws IOException
{
return table.getScanner(filter);
}
@Override
public Result[] get(List<Get> gets) throws IOException
{
return table.get(gets);
}
@Override
public void batch(List<Row> writes, Object[] results) throws IOException, InterruptedException
{
table.batch(writes, results);
table.flushCommits();
}
@Override
public void close() throws IOException
{
table.close();
}
}
/*
* Copyright 2012-2013 Aurelius LLC
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.thinkaurelius.titan.diskstorage.hbase;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
public class HTable1_0 implements TableMask
{
private final Table table;
public HTable1_0(Table table)
{
this.table = table;
}
@Override
public ResultScanner getScanner(Scan filter) throws IOException
{
return table.getScanner(filter);
}
@Override
public Result[] get(List<Get> gets) throws IOException
{
return table.get(gets);
}
@Override
public void batch(List<Row> writes, Object[] results) throws IOException, InterruptedException
{
table.batch(writes, results);
// flushCommits() does not exist on the 1.x Table interface; batch() submits the operations directly
}
@Override
public void close() throws IOException
{
table.close();
}
}
/*
* Copyright 2012-2013 Aurelius LLC
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.thinkaurelius.titan.diskstorage.hbase;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.Scan;
/**
* This interface hides ABI/API breaking changes that HBase has made to its Table/HTableInterface over the course
* of development from 0.94 to 1.0 and beyond.
*/
public interface TableMask extends Closeable
{
ResultScanner getScanner(Scan filter) throws IOException;
Result[] get(List<Get> gets) throws IOException;
void batch(List<Row> writes, Object[] results) throws IOException, InterruptedException;
}
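/*
 * Illustrative sketch (not part of the original source): callers interact
 * with every HBase version through this interface, so only the concrete
 * HTable0_98/HTable1_0 classes touch version-specific API. A minimal write
 * path, assuming cnx is a ConnectionMask and rowKey a byte[]:
 *
 *   TableMask table = cnx.getTable("titan");
 *   try {
 *       List<Row> writes = new ArrayList<Row>();
 *       writes.add(new Put(rowKey));
 *       table.batch(writes, new Object[writes.size()]);
 *   } finally {
 *       table.close();
 *   }
 */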
/*
 * Copyright 2012-2013 Aurelius LLC
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.thinkaurelius.titan.diskstorage.solr;
import com.google.common.base.Joiner;
...
...@@ -29,7 +29,10 @@ atlas.graph.storage.directory=target/data/berkley
#hbase
#For standalone mode , specify localhost
#for distributed mode, specify zookeeper quorum here - For more information refer http://s3.thinkaurelius.com/docs/titan/current/hbase.html#_remote_server_mode_2
atlas.graph.storage.hostname=${titan.storage.hostname}
atlas.graph.storage.hbase.regions-per-server=1
atlas.graph.storage.lock.wait-time=10000
#ElasticSearch
atlas.graph.index.search.directory=target/data/es
...