Commit fe6e696d by Suma Shivaprasad

ATLAS-498 Support Embedded HBase (tbeerbower via sumasai)

parent 46365f8c
......@@ -29,6 +29,12 @@
<description>Apache Atlas Distribution</description>
<name>Apache Atlas Distribution</name>
<properties>
<hbase.dir>${project.build.directory}/hbase</hbase.dir>
<hbase.tar>http://apache.mirrors.pair.com/hbase/stable/hbase-1.1.4-bin.tar.gz</hbase.tar>
<hbase.folder>hbase-1.1.4</hbase.folder>
</properties>
<profiles>
<profile>
<id>Windows</id>
......@@ -66,6 +72,39 @@
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-antrun-plugin</artifactId>
<version>1.7</version>
<executions>
<execution>
<phase>generate-resources</phase>
<goals>
<goal>run</goal>
</goals>
<configuration>
<target name="Download HBase">
<mkdir dir="${hbase.dir}"/>
<get
src="${hbase.tar}"
dest="${project.build.directory}/hbase.tar.gz"
usetimestamp="true"
/>
<untar
src="${project.build.directory}/hbase.tar.gz"
dest="${project.build.directory}/hbase.temp"
compression="gzip"
/>
<copy todir="${hbase.dir}">
<fileset dir="${project.build.directory}/hbase.temp/${hbase.folder}">
<include name="**/*"/>
</fileset>
</copy>
</target>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<executions>
<execution>
......
......@@ -20,18 +20,18 @@ import os
import re
import platform
import subprocess
from threading import Thread
from signal import SIGTERM
import sys
import time
import errno
from re import split
from time import sleep
# Standard Atlas directory-layout names.
BIN = "bin"
LIB = "lib"
CONF = "conf"
LOG = "logs"
WEBAPP = "server" + os.sep + "webapp"
DATA = "data"
# Environment variables propagated to / consulted by the launcher scripts.
ENV_KEYS = ["JAVA_HOME", "ATLAS_OPTS", "ATLAS_LOG_DIR", "ATLAS_PID_DIR", "ATLAS_CONF", "ATLASCPPATH", "ATLAS_DATA_DIR", "ATLAS_HOME_DIR", "ATLAS_EXPANDED_WEBAPP_DIR", "HBASE_CONF_DIR"]
ATLAS_CONF = "ATLAS_CONF"
ATLAS_LOG = "ATLAS_LOG_DIR"
......@@ -43,6 +43,9 @@ ATLAS_HOME = "ATLAS_HOME_DIR"
HBASE_CONF_DIR = "HBASE_CONF_DIR"
IS_WINDOWS = platform.system() == "Windows"
ON_POSIX = 'posix' in sys.builtin_module_names
CONF_FILE = "atlas-application.properties"
# Regexes matched (via grep) against atlas-application.properties to detect
# the hbase storage backend and whether it points at a local standalone HBase.
# Raw strings so the \s regex escapes are not interpreted as string escapes.
HBASE_STORAGE_CONF_ENTRY = r"atlas.graph.storage.backend\s*=\s*hbase"
HBASE_STORAGE_LOCAL_CONF_ENTRY = r"atlas.graph.storage.hostname\s*=\s*localhost"
DEBUG = False
def scriptDir():
......@@ -62,9 +65,11 @@ def confDir(dir):
localconf = os.path.join(dir, CONF)
return os.environ.get(ATLAS_CONF, localconf)
def hbaseBinDir(dir):
    """Return the bin directory of the embedded HBase under the given Atlas home."""
    return os.path.join(dir, "hbase", BIN)

def hbaseConfDir(dir):
    """Return the HBase conf dir: HBASE_CONF_DIR from the environment when set,
    otherwise the embedded HBase conf directory under the given Atlas home."""
    return os.environ.get(HBASE_CONF_DIR, os.path.join(dir, "hbase", CONF))
def logDir(dir):
localLog = os.path.join(dir, LOG)
......@@ -171,21 +176,27 @@ def which(program):
return None
def runProcess(commandline, logdir=None, shell=False, wait=False):
    """
    Run a process.
    :param commandline: command line (a list of arguments, or a string when shell=True)
    :param logdir: when set, redirect stdout/stderr to timestamped .out/.err files there;
                   otherwise the child inherits this process's streams
    :param shell: passed through to subprocess.Popen
    :param wait: when True, block until the process terminates
    :return: the subprocess.Popen object for the started process
    """
    debug ("Executing : %s" % str(commandline))
    timestr = time.strftime("atlas.%Y%m%d-%H%M%S")
    stdoutFile = None
    stderrFile = None
    if logdir:
        # NOTE: the handles are intentionally left open for the child's lifetime
        stdoutFile = open(os.path.join(logdir, timestr + ".out"), "w")
        stderrFile = open(os.path.join(logdir, timestr + ".err"), "w")
    p = subprocess.Popen(commandline, stdout=stdoutFile, stderr=stderrFile, shell=shell)
    if wait:
        p.communicate()
    return p
def print_output(name, src, toStdErr):
"""
......@@ -298,35 +309,84 @@ def writePid(atlas_pid_file, process):
f.write(str(process.pid))
f.close()
def unix_exist_pid(pid):
    """Return True when a process with the given pid exists in the process table.

    See man 2 kill: signal 0 performs error checking only, sending nothing.
    """
    try:
        os.kill(pid, 0)
    except OSError as e :
        # EPERM means the process exists but we lack permission to signal it;
        # ESRCH (or anything else) means no such process.
        return e.errno == errno.EPERM
    else:
        # kill succeeded, so the process exists.  (Without this branch the
        # function fell through and returned None for a live process.)
        return True
def exist_pid(pid):
    """Return True when a process with the given (integer) pid appears to exist.

    POSIX: probed with kill(pid, 0) (see man 2 kill).
    Windows: os.kill does not work with python 2.7, so the output of the
    `tasklist` command is searched for the process id instead.
    Other platforms: unsupported; assume the process exists (the pid file must
    be deleted manually to restart the server if the pid is stale).
    """
    if ON_POSIX:
        #check if process id exist in the current process table
        #See man 2 kill - Linux man page for info about the kill(pid,0) system function
        try:
            os.kill(pid, 0)
        except OSError as e :
            return e.errno == errno.EPERM
        else:
            return True

    elif IS_WINDOWS:
        #The os.kill approach does not work on Windows with python 2.7
        #the output from tasklist command is searched for the process id
        # str(pid): callers pass an int; the original string concatenation
        # raised TypeError here.
        command = 'tasklist /fi "pid eq ' + str(pid) + '"'
        # single tasklist invocation (the extra Popen/communicate round was redundant)
        output = subprocess.check_output(command)
        for line in split(" *", output):
            if str(pid) in line:
                return True
        return False

    #os other than nt or posix - not supported - need to delete the file to restart server if pid no longer exist
    return True
def wait_for_shutdown(pid, msg, wait):
    """Print msg, then poll once a second until the process with the given pid
    disappears or roughly `wait` seconds have elapsed, printing a dot per poll."""
    sys.stdout.write(msg)
    ticks = 0
    while exist_pid(pid):
        sys.stdout.write('.')
        sys.stdout.flush()
        sleep(1)
        if ticks > wait:
            break
        ticks += 1
    sys.stdout.write('\n')
def is_hbase(confdir):
    """Return True when atlas-application.properties in confdir selects the
    hbase graph storage backend."""
    conf_file = os.path.join(confdir, CONF_FILE)
    return grep(conf_file, HBASE_STORAGE_CONF_ENTRY) is not None
def is_hbase_local(confdir):
    """Return True when the config selects the hbase backend AND binds it to
    localhost, i.e. the embedded standalone HBase instance should be managed."""
    conf_file = os.path.join(confdir, CONF_FILE)
    if grep(conf_file, HBASE_STORAGE_CONF_ENTRY) is None:
        return False
    return grep(conf_file, HBASE_STORAGE_LOCAL_CONF_ENTRY) is not None
def run_hbase(dir, action, hbase_conf_dir = None, logdir = None, wait=True):
    """Invoke hbase-daemon.sh for the embedded HBase master.

    :param dir: the HBase bin directory containing hbase-daemon.sh
    :param action: daemon action, e.g. 'start' or 'stop'
    :param hbase_conf_dir: when set, passed via --config
    :param logdir: directory for the launcher's .out/.err logs (see runProcess)
    :param wait: when True, block until hbase-daemon.sh returns
    :return: the subprocess.Popen object from runProcess
    """
    # (A stray diff-residue `return True` previously sat inside this if/else.)
    if hbase_conf_dir is not None:
        cmd = [os.path.join(dir, "hbase-daemon.sh"), '--config', hbase_conf_dir, action, 'master']
    else:
        cmd = [os.path.join(dir, "hbase-daemon.sh"), action, 'master']
    return runProcess(cmd, logdir, False, wait)
def win_exist_pid(pid):
    """Return True when a process with the given pid (a string) exists on Windows.

    The os.kill approach does not work on Windows with python 2.7, so the
    output from the tasklist command is searched for the process id.
    """
    command = 'tasklist /fi "pid eq ' + pid + '"'
    # single tasklist invocation; the previous extra Popen/communicate call
    # ran the same command twice and discarded the first result
    output = subprocess.check_output(command)
    for line in split(" *", output):
        if pid in line:
            return True
    return False
def configure_hbase(dir):
    """Expand conf/hbase/hbase-site.xml.template into hbase/conf/hbase-site.xml
    for the embedded standalone HBase instance.

    Substitutes ${hbase_home} with the Atlas home directory, then removes the
    template so the expansion happens only once.  No-op when HBASE_CONF_DIR
    points at an external HBase configuration, or when the template is absent.

    :param dir: the Atlas home directory
    """
    env_conf_dir = os.environ.get(HBASE_CONF_DIR)
    conf_dir = os.path.join(dir, "hbase", CONF)
    tmpl_dir = os.path.join(dir, CONF, "hbase")
    # only configure the embedded instance; an external HBASE_CONF_DIR wins
    if env_conf_dir is None or env_conf_dir == conf_dir:
        hbase_conf_file = "hbase-site.xml"
        tmpl_file = os.path.join(tmpl_dir, hbase_conf_file + ".template")
        conf_file = os.path.join(conf_dir, hbase_conf_file)
        if os.path.exists(tmpl_file):
            debug ("Configuring " + tmpl_file + " to " + conf_file)
            # with-blocks close the files even if read/write raises
            with open(tmpl_file, 'r') as f:
                template = f.read()
            config = template.replace("${hbase_home}", dir)
            with open(conf_file, 'w') as f:
                f.write(config)
            # remove the template so a restart does not re-expand it
            os.remove(tmpl_file)
def server_already_running(pid):
print "Atlas server is already running under process %s" % pid
......
......@@ -25,8 +25,6 @@ ATLAS_LOG_OPTS="-Datlas.log.dir=%s -Datlas.log.file=application.log"
ATLAS_COMMAND_OPTS="-Datlas.home=%s"
ATLAS_CONFIG_OPTS="-Datlas.conf=%s"
DEFAULT_JVM_OPTS="-Xmx1024m -XX:MaxPermSize=512m -Dlog4j.configuration=atlas-log4j.xml -Djava.net.preferIPv4Stack=true"
CONF_FILE="atlas-application.properties"
HBASE_STORAGE_CONF_ENTRY="atlas.graph.storage.backend\s*=\s*hbase"
def main():
......@@ -62,7 +60,13 @@ def main():
mc.expandWebApp(atlas_home)
#add hbase-site.xml to classpath
hbase_conf_dir = mc.hbaseConfDir(confdir)
hbase_conf_dir = mc.hbaseConfDir(atlas_home)
if mc.is_hbase_local(confdir):
print "configured for local hbase."
mc.configure_hbase(atlas_home)
mc.run_hbase(mc.hbaseBinDir(atlas_home), "start", hbase_conf_dir, logdir)
print "hbase started."
p = os.pathsep
atlas_classpath = confdir + p \
......@@ -74,9 +78,8 @@ def main():
atlas_classpath = atlas_classpath + p \
+ hbase_conf_dir
else:
storage_backend = mc.grep(os.path.join(confdir, CONF_FILE), HBASE_STORAGE_CONF_ENTRY)
if storage_backend != None:
raise Exception("Could not find hbase-site.xml in %s. Please set env var HBASE_CONF_DIR to the hbase client conf dir", hbase_conf_dir)
if mc.is_hbase(confdir):
raise Exception("Could not find hbase-site.xml in %s. Please set env var HBASE_CONF_DIR to the hbase client conf dir", hbase_conf_dir)
if mc.isCygwin():
atlas_classpath = mc.convertCygwinPath(atlas_classpath, True)
......@@ -88,27 +91,11 @@ def main():
pf = file(atlas_pid_file, 'r')
pid = pf.read().strip()
pf.close()
if mc.ON_POSIX:
if mc.unix_exist_pid((int)(pid)):
mc.server_already_running(pid)
else:
mc.server_pid_not_running(pid)
if mc.exist_pid((int)(pid)):
mc.server_already_running(pid)
else:
if mc.IS_WINDOWS:
if mc.win_exist_pid(pid):
mc.server_already_running(pid)
else:
mc.server_pid_not_running(pid)
else:
#os other than nt or posix - not supported - need to delete the file to restart server if pid no longer exist
mc.server_already_running(pid)
mc.server_pid_not_running(pid)
web_app_path = os.path.join(web_app_dir, "atlas")
if (mc.isCygwin()):
......
......@@ -27,7 +27,7 @@ def main():
atlas_home = mc.atlasDir()
confdir = mc.dirMustExist(mc.confDir(atlas_home))
mc.executeEnvSh(confdir)
piddir = mc.dirMustExist(mc.logDir(atlas_home))
mc.dirMustExist(mc.logDir(atlas_home))
atlas_pid_file = mc.pidFile(atlas_home)
......@@ -40,25 +40,24 @@ def main():
if not pid:
sys.stderr.write("No process ID file found. Server not running?\n")
return
if mc.ON_POSIX:
if not mc.unix_exist_pid(pid):
sys.stderr.write("Server no longer running with pid %s\nImproper shutdown?\npid file deleted.\n" %pid)
os.remove(atlas_pid_file)
return
else:
if mc.IS_WINDOWS:
if not mc.win_exist_pid((str)(pid)):
sys.stderr.write("Server no longer running with pid %s\nImproper shutdown?\npid file deleted.\n" %pid)
os.remove(atlas_pid_file)
return
if not mc.exist_pid(pid):
sys.stderr.write("Server no longer running with pid %s\nImproper shutdown?\npid file deleted.\n" %pid)
os.remove(atlas_pid_file)
return
os.kill(pid, SIGTERM)
mc.wait_for_shutdown(pid, "stopping atlas", 30)
# assuming kill worked since process check on windows is more involved...
if os.path.exists(atlas_pid_file):
os.remove(atlas_pid_file)
# stop hbase
if mc.is_hbase_local(confdir):
mc.run_hbase(mc.hbaseBinDir(atlas_home), "stop", None, None, True)
if __name__ == '__main__':
try:
returncode = main()
......
......@@ -18,11 +18,11 @@
######### Graph Database Configs #########
# Graph Storage
atlas.graph.storage.backend=berkeleyje
atlas.graph.storage.directory=${sys:atlas.home}/data/berkley
#atlas.graph.storage.backend=berkeleyje
#atlas.graph.storage.directory=${sys:atlas.home}/data/berkley
#HBase as storage backend
#hbase
atlas.graph.storage.backend=hbase
#For standalone mode, specify localhost
#for distributed mode, specify zookeeper quorum here - For more information refer http://s3.thinkaurelius.com/docs/titan/current/hbase.html#_remote_server_mode_2
atlas.graph.storage.hostname=localhost
......@@ -108,4 +108,4 @@ atlas.server.ha.enabled=false
#atlas.server.ha.zookeeper.session.timeout.ms=20000
## if ACLs need to be set on the created nodes, uncomment these lines and set the values ##
#atlas.server.ha.zookeeper.acl=<scheme>:<id>
#atlas.server.ha.zookeeper.auth=<scheme>:<authinfo>
\ No newline at end of file
#atlas.server.ha.zookeeper.auth=<scheme>:<authinfo>
......@@ -55,6 +55,13 @@
</fileSet>
<fileSet>
<directory>target/hbase</directory>
<outputDirectory>hbase</outputDirectory>
<fileMode>0755</fileMode>
<directoryMode>0755</directoryMode>
</fileSet>
<fileSet>
<directory>../logs</directory>
<outputDirectory>logs</outputDirectory>
<directoryMode>0777</directoryMode>
......
......@@ -30,8 +30,10 @@ IS_WINDOWS = platform.system() == "Windows"
logger = logging.getLogger()
class TestMetadata(unittest.TestCase):
@patch.object(mc,"win_exist_pid")
@patch.object(mc,"unix_exist_pid")
@patch.object(mc,"runProcess")
@patch.object(mc,"configure_hbase")
@patch.object(mc,"grep")
@patch.object(mc,"exist_pid")
@patch.object(mc,"writePid")
@patch.object(mc, "executeEnvSh")
@patch.object(mc,"atlasDir")
......@@ -39,17 +41,19 @@ class TestMetadata(unittest.TestCase):
@patch("os.path.exists")
@patch.object(mc, "java")
def test_main(self, java_mock, exists_mock, expandWebApp_mock, atlasDir_mock, executeEnvSh_mock, writePid_mock, unix_exist_pid_mock, win_exist_pid_mock):
def test_main(self, java_mock, exists_mock, expandWebApp_mock, atlasDir_mock, executeEnvSh_mock, writePid_mock, exist_pid_mock, grep_mock, configure_hbase_mock, runProcess_mock):
sys.argv = []
exists_mock.return_value = True
expandWebApp_mock.return_value = "webapp"
atlasDir_mock.return_value = "atlas_home"
win_exist_pid_mock("789")
win_exist_pid_mock.assert_called_with((str)(789))
unix_exist_pid_mock(789)
unix_exist_pid_mock.assert_called_with(789)
exist_pid_mock(789)
exist_pid_mock.assert_called_with(789)
grep_mock.return_value = "hbase"
atlas.main()
self.assertTrue(configure_hbase_mock.called)
runProcess_mock.assert_called_with(['atlas_home/hbase/bin/hbase-daemon.sh', '--config', 'atlas_home/hbase/conf', 'start', 'master'], 'atlas_home/logs', False, True)
self.assertTrue(java_mock.called)
if IS_WINDOWS:
......
......@@ -53,6 +53,9 @@ Without Ranger, HBase shell can be used to set the permissions.
echo "grant 'atlas', 'RWXCA', 'titan'" | hbase shell
</verbatim>
Note that HBase is included in the distribution so that a standalone instance of HBase can be started as the default
storage backend for the graph repository.
---+++ Graph Search Index
This section sets up the graph db - titan - to use a search indexing system. The example
configuration below sets up to use an embedded Elastic search indexing system.
......
......@@ -33,11 +33,10 @@ enabling hot failover. To minimize service loss, we recommend the following:
---++ Metadata Store
As described above, Atlas uses Titan to store the metadata it manages. By default, Titan uses BerkeleyDB as an embedded
backing store. However, this option would result in loss of data if the node running the Atlas server fails. In order
to provide HA for the metadata store, we recommend that Atlas be configured to use HBase as the backing store for Titan.
Doing this implies that you could benefit from the HA guarantees HBase provides. In order to configure Atlas to use
HBase in HA mode, do the following:
As described above, Atlas uses Titan to store the metadata it manages. By default, Atlas uses a standalone HBase
instance as the backing store for Titan. In order to provide HA for the metadata store, we recommend that Atlas be
configured to use distributed HBase as the backing store for Titan. Doing this implies that you could benefit from the
HA guarantees HBase provides. In order to configure Atlas to use HBase in HA mode, do the following:
* Choose an existing HBase cluster that is set up in HA mode to configure in Atlas (OR) Set up a new HBase cluster in [[http://hbase.apache.org/book.html#quickstart_fully_distributed][HA mode]].
* If setting up HBase for Atlas, please follow the instructions listed for setting up HBase in the [[InstallationSteps][Installation Steps]].
......@@ -86,4 +85,4 @@ to configure Atlas to use Kafka in HA mode, do the following:
---++ Known Issues
* [[https://issues.apache.org/jira/browse/ATLAS-338][ATLAS-338]]: ATLAS-338: Metadata events generated from a Hive CLI (as opposed to Beeline or any client going !HiveServer2) would be lost if Atlas server is down.
* If the HBase region servers hosting the Atlas ‘titan’ HTable are down, Atlas would not be able to store or retrieve metadata from HBase until they are brought back online.
\ No newline at end of file
* If the HBase region servers hosting the Atlas ‘titan’ HTable are down, Atlas would not be able to store or retrieve metadata from HBase until they are brought back online.
......@@ -33,6 +33,8 @@ Tar is structured as follows
|- conf
|- atlas-application.properties
|- atlas-env.sh
|- hbase
|- hbase-site.xml.template
|- log4j.xml
|- solr
|- currency.xml
......@@ -44,6 +46,10 @@ Tar is structured as follows
|- stopwords.txt
|- synonyms.txt
|- docs
|- hbase
|- bin
|- conf
...
|- server
|- webapp
|- atlas.war
......@@ -55,6 +61,12 @@ Tar is structured as follows
</verbatim>
Note that HBase is included in the distribution so that a standalone instance of HBase can be started as the default
storage backend for the graph repository. During Atlas installation the conf/hbase/hbase-site.xml.template gets
expanded and moved to hbase/conf/hbase-site.xml for the initial standalone HBase configuration. To configure ATLAS
graph persistence for a different HBase instance, please see "Graph persistence engine - HBase" in the
[[Configuration][Configuration]] section.
---+++ Installing & Running Atlas
*Installing Atlas*
......
......@@ -13,6 +13,7 @@ ATLAS-409 Atlas will not import avro tables with schema read from a file (dosset
ATLAS-379 Create sqoop and falcon metadata addons (venkatnrangan,bvellanki,sowmyaramesh via shwethags)
ALL CHANGES:
ATLAS-498 Support Embedded HBase (tbeerbower via sumasai)
ATLAS-527 Support lineage for load table, import, export (sumasai via shwethags)
ATLAS-572 Handle secure instance of Zookeeper for leader election.(yhemanth via sumasai)
ATLAS-605 Hook Notifications for DELETE entity needs to be supported (sumasai)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment