Commit da8581e9 authored by Pierre Padovani, committed by David Radley

ATLAS-2470 - JanusGraph Cassandra support

parent 9368c8a0
......@@ -41,6 +41,7 @@
<artifactId>atlas-plugin-classloader</artifactId>
</dependency>
<!-- exclude the dropwizard metrics, as storm-core brings in an older version that conflicts -->
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
......@@ -55,7 +56,37 @@
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
</exclusion>
<exclusion>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
</exclusion>
<exclusion>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-graphite</artifactId>
</exclusion>
<exclusion>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-ganglia</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- Update metrics to the same version used in Atlas to avoid conflicts -->
<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
<version>${dropwizard-metrics}</version>
</dependency>
<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-graphite</artifactId>
<version>${dropwizard-metrics}</version>
</dependency>
<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-ganglia</artifactId>
<version>${dropwizard-metrics}</version>
</dependency>
</dependencies>
</project>
......@@ -86,6 +86,7 @@
</dependency>
<!-- apache storm core dependencies -->
<!-- exclude the dropwizard metrics, as storm-core brings in an older version that conflicts -->
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
......@@ -99,9 +100,38 @@
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
</exclusion>
<exclusion>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
</exclusion>
<exclusion>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-graphite</artifactId>
</exclusion>
<exclusion>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-ganglia</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- Update metrics to the same version used in Atlas to avoid conflicts -->
<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
<version>${dropwizard-metrics}</version>
</dependency>
<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-graphite</artifactId>
<version>${dropwizard-metrics}</version>
</dependency>
<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-ganglia</artifactId>
<version>${dropwizard-metrics}</version>
</dependency>
<!-- Testing dependencies -->
<dependency>
<groupId>org.testng</groupId>
......
......@@ -39,6 +39,12 @@
atlas.graph.storage.hostname=
atlas.graph.storage.hbase.regions-per-server=1
atlas.graph.storage.lock.wait-time=10000
#In order to use Cassandra as a backend, comment out the HBase-specific properties above and uncomment
#the following properties
#atlas.graph.storage.clustername=
#atlas.graph.storage.port=
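#Example values for a standalone Cassandra instance (illustrative only; these match the defaults used
#by the embedded Cassandra test elsewhere in this commit):
#atlas.graph.storage.clustername=Test Cluster
#atlas.graph.storage.port=9042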
</graph.storage.properties>
<graph.index.backend>solr</graph.index.backend>
<graph.index.properties>#Solr
......@@ -184,6 +190,7 @@ atlas.graph.index.search.solr.wait-searcher=true
#atlas.graph.index.search.solr.http-urls=http://localhost:8983/solr
</graph.index.properties>
<cassandra.embedded>false</cassandra.embedded>
<hbase.embedded>true</hbase.embedded>
<solr.embedded>true</solr.embedded>
......@@ -249,6 +256,97 @@ atlas.graph.index.search.solr.wait-searcher=true
</plugins>
</build>
</profile>
<!-- profile to package and configure embedded cassandra and solr with the distribution -->
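<!-- The profile is not active by default; activate it explicitly, e.g.: mvn clean package -Pdist,embedded-cassandra-solr -->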
<profile>
<id>embedded-cassandra-solr</id>
<activation>
<activeByDefault>false</activeByDefault>
</activation>
<properties>
<graph.storage.backend>embeddedcassandra</graph.storage.backend>
<entity.repository.properties>atlas.EntityAuditRepository.impl=org.apache.atlas.repository.audit.CassandraBasedAuditRepository</entity.repository.properties>
<graph.storage.properties>#Cassandra
atlas.graph.storage.conf-file=${sys:atlas.home}/conf/cassandra.yml
</graph.storage.properties>
<graph.index.properties>#Solr
#Solr cloud mode properties
atlas.graph.index.search.solr.mode=cloud
atlas.graph.index.search.solr.zookeeper-url=localhost:2181
atlas.graph.index.search.solr.zookeeper-connect-timeout=60000
atlas.graph.index.search.solr.zookeeper-session-timeout=60000
atlas.graph.index.search.solr.wait-searcher=true
#Solr http mode properties
#atlas.graph.index.search.solr.mode=http
#atlas.graph.index.search.solr.http-urls=http://localhost:8983/solr
</graph.index.properties>
<cassandra.embedded>true</cassandra.embedded>
<hbase.embedded>false</hbase.embedded>
<solr.embedded>true</solr.embedded>
<solr.dir>${project.build.directory}/solr</solr.dir>
<solr.tar>http://archive.apache.org/dist/lucene/solr/${solr.version}/solr-${solr.version}.tgz</solr.tar>
<solr.folder>solr-${solr.version}</solr.folder>
<zk.dir>${project.build.directory}/zk</zk.dir>
<zk.tar>http://archive.apache.org/dist/zookeeper/zookeeper-${zookeeper.version}/zookeeper-${zookeeper.version}.tar.gz</zk.tar>
<zk.folder>zookeeper-${zookeeper.version}</zk.folder>
</properties>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-antrun-plugin</artifactId>
<version>1.7</version>
<executions>
<!-- package solr -->
<execution>
<id>solr</id>
<phase>generate-resources</phase>
<goals>
<goal>run</goal>
</goals>
<configuration>
<target name="Download SOLR">
<mkdir dir="${solr.dir}" />
<mkdir dir="${project.basedir}/solr" />
<get src="${solr.tar}" dest="${project.basedir}/solr/${solr.folder}.tgz" usetimestamp="true" verbose="true" skipexisting="true" />
<untar src="${project.basedir}/solr/${solr.folder}.tgz" dest="${project.build.directory}/solr.temp" compression="gzip" />
<copy todir="${solr.dir}">
<fileset dir="${project.build.directory}/solr.temp/${solr.folder}">
<include name="**/*" />
</fileset>
</copy>
</target>
</configuration>
</execution>
<!-- package zookeeper -->
<execution>
<id>zk</id>
<phase>generate-resources</phase>
<goals>
<goal>run</goal>
</goals>
<configuration>
<target name="Download zookeeper">
<mkdir dir="${zk.dir}" />
<mkdir dir="${project.basedir}/zk" />
<get src="${zk.tar}" dest="${project.basedir}/zk/${zk.folder}.tgz" usetimestamp="true" verbose="true" skipexisting="true" />
<untar src="${project.basedir}/zk/${zk.folder}.tgz" dest="${project.build.directory}/zk.temp" compression="gzip" />
<copy todir="${zk.dir}">
<fileset dir="${project.build.directory}/zk.temp/${zk.folder}">
<include name="**/*" />
</fileset>
</copy>
</target>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
</profiles>
<build>
......
......@@ -46,6 +46,7 @@ ATLAS_HOME = "ATLAS_HOME_DIR"
HBASE_CONF_DIR = "HBASE_CONF_DIR"
MANAGE_LOCAL_HBASE = "MANAGE_LOCAL_HBASE"
MANAGE_LOCAL_SOLR = "MANAGE_LOCAL_SOLR"
MANAGE_EMBEDDED_CASSANDRA = "MANAGE_EMBEDDED_CASSANDRA"
SOLR_BIN = "SOLR_BIN"
SOLR_CONF = "SOLR_CONF"
SOLR_PORT = "SOLR_PORT"
......@@ -56,7 +57,8 @@ SOLR_REPLICATION_FACTOR = "SOLR_REPLICATION_FACTOR"
DEFAULT_SOLR_REPLICATION_FACTOR = "1"
ENV_KEYS = ["JAVA_HOME", ATLAS_OPTS, ATLAS_SERVER_OPTS, ATLAS_SERVER_HEAP, ATLAS_LOG, ATLAS_PID, ATLAS_CONF,
"ATLASCPPATH", ATLAS_DATA, ATLAS_HOME, ATLAS_WEBAPP, HBASE_CONF_DIR, SOLR_PORT]
"ATLASCPPATH", ATLAS_DATA, ATLAS_HOME, ATLAS_WEBAPP, HBASE_CONF_DIR, SOLR_PORT, MANAGE_LOCAL_HBASE,
MANAGE_LOCAL_SOLR, MANAGE_EMBEDDED_CASSANDRA]
IS_WINDOWS = platform.system() == "Windows"
ON_POSIX = 'posix' in sys.builtin_module_names
CONF_FILE="atlas-application.properties"
......@@ -99,6 +101,9 @@ def hbaseBinDir(dir):
def hbaseConfDir(dir):
    return os.environ.get(HBASE_CONF_DIR, os.path.join(dir, "hbase", CONF))

def zookeeperBinDir(dir):
    return os.environ.get(SOLR_BIN, os.path.join(dir, "zk", BIN))

def solrBinDir(dir):
    return os.environ.get(SOLR_BIN, os.path.join(dir, "solr", BIN))
......@@ -430,6 +435,12 @@ def is_solr(confdir):
    confdir = os.path.join(confdir, CONF_FILE)
    return grep(confdir, SOLR_INDEX_CONF_ENTRY) is not None

def is_cassandra_local(configdir):
    if os.environ.get(MANAGE_EMBEDDED_CASSANDRA, "False").lower() == 'false':
        return False

    return True

def is_solr_local(confdir):
    if os.environ.get(MANAGE_LOCAL_SOLR, "False").lower() == 'false':
        return False
......@@ -499,6 +510,16 @@ def wait_for_startup(confdir, wait):
    sys.stdout.write('\n')

def run_zookeeper(dir, action, logdir = None, wait=True):
    zookeeperScript = "zkServer.sh"

    if IS_WINDOWS:
        zookeeperScript = "zkServer.cmd"

    cmd = [os.path.join(dir, zookeeperScript), action, os.path.join(dir, '../../conf/zookeeper/zoo.cfg')]

    return runProcess(cmd, logdir, False, wait)

def run_solr(dir, action, zk_url = None, port = None, logdir = None, wait=True):
    solrScript = "solr"
......@@ -561,6 +582,48 @@ def configure_hbase(dir):
        f.close()
        os.remove(tmpl_file)
def configure_zookeeper(dir):
    conf_dir = os.path.join(dir, CONF, "zookeeper")
    zk_conf_file = "zoo.cfg"
    tmpl_file = os.path.join(conf_dir, zk_conf_file + ".template")
    conf_file = os.path.join(conf_dir, zk_conf_file)

    if os.path.exists(tmpl_file):
        debug("Configuring " + tmpl_file + " to " + conf_file)
        f = open(tmpl_file, 'r')
        template = f.read()
        f.close()

        config = template.replace("${atlas_home}", dir)

        f = open(conf_file, 'w')
        f.write(config)
        f.close()

        os.remove(tmpl_file)

def configure_cassandra(dir):
    conf_dir = os.path.join(dir, CONF)
    cassandra_conf_file = "cassandra.yml"
    tmpl_file = os.path.join(conf_dir, cassandra_conf_file + ".template")
    conf_file = os.path.join(conf_dir, cassandra_conf_file)

    if os.path.exists(tmpl_file):
        debug("Configuring " + tmpl_file + " to " + conf_file)
        f = open(tmpl_file, 'r')
        template = f.read()
        f.close()

        config = template.replace("${atlas_home}", dir)

        f = open(conf_file, 'w')
        f.write(config)
        f.close()

        os.remove(tmpl_file)
def server_already_running(pid):
    print "Atlas server is already running under process %s" % pid
    sys.exit()
......
......@@ -118,6 +118,14 @@ def main():
    #solr setup
    if mc.is_solr_local(confdir):
        print "configured for local solr."

        if mc.is_cassandra_local(confdir):
            print "Cassandra embedded configured."
            mc.configure_cassandra(atlas_home)
            mc.configure_zookeeper(atlas_home)
            mc.run_zookeeper(mc.zookeeperBinDir(atlas_home), "start", logdir)
            print "zookeeper started."

        mc.run_solr(mc.solrBinDir(atlas_home), "start", mc.get_solr_zk_url(confdir), mc.solrPort(), logdir)
        print "solr started."
......
......@@ -68,8 +68,12 @@ def main():
    # stop solr
    if mc.is_solr_local(confdir):
        mc.run_solr(mc.solrBinDir(atlas_home), "stop", None, mc.solrPort(), None, True)

    if mc.is_cassandra_local(confdir):
        mc.run_zookeeper(mc.zookeeperBinDir(atlas_home), "stop")

    # stop hbase
    if mc.is_hbase_local(confdir):
        mc.run_hbase_action(mc.hbaseBinDir(atlas_home), "stop", None, None, True)
......
......@@ -24,6 +24,16 @@
#atlas.graphdb.backend=org.apache.atlas.repository.graphdb.janus.AtlasJanusGraphDatabase
# Graph Storage
# Set atlas.graph.storage.backend to the correct value for your desired storage
# backend. Possible values:
#
# hbase
# cassandra
# embeddedcassandra - should only be used when building Atlas with -Pdist,embedded-cassandra-solr
# berkeleyje
#
# See the configuration documentation for more information about configuring the various storage backends.
#
atlas.graph.storage.backend=${graph.storage.backend}
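# For example, to point Atlas at a standalone Cassandra cluster (illustrative; the value above is
# normally filled in at build time):
#atlas.graph.storage.backend=cassandra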
atlas.graph.storage.hbase.table=apache_atlas_janus
......@@ -52,10 +62,17 @@ ${graph.storage.properties}
#
# Allowed Values:
# org.apache.atlas.repository.audit.HBaseBasedAuditRepository - log entity changes to hbase
# org.apache.atlas.repository.audit.CassandraBasedAuditRepository - log entity changes to cassandra
# org.apache.atlas.repository.audit.NoopEntityAuditRepository - disable the audit repository
#
${entity.repository.properties}
# If Cassandra is selected as the audit backend in the property above, uncomment and set the following
# properties appropriately. If using the embedded Cassandra profile, these properties can remain
# commented out.
# atlas.EntityAuditRepository.keyspace=atlas_audit
# atlas.EntityAuditRepository.replicationFactor=1
# Graph Search Index
atlas.graph.index.search.backend=${graph.index.backend}
......
......@@ -60,3 +60,6 @@ export MANAGE_LOCAL_HBASE=${hbase.embedded}
# indicates whether or not a local instance of Solr should be started for Atlas
export MANAGE_LOCAL_SOLR=${solr.embedded}
# indicates whether or not Cassandra is the embedded storage backend for Atlas
export MANAGE_EMBEDDED_CASSANDRA=${cassandra.embedded}
\ No newline at end of file
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Define some default values that can be overridden by system properties
zookeeper.root.logger=INFO, CONSOLE
zookeeper.console.threshold=INFO
zookeeper.log.dir=/CHANGE_ME/logs
zookeeper.log.file=zookeeper.log
zookeeper.log.threshold=DEBUG
zookeeper.tracelog.dir=.
zookeeper.tracelog.file=zookeeper_trace.log
#
# ZooKeeper Logging Configuration
#
# Format is "<default threshold> (, <appender>)+"
# DEFAULT: console appender only
log4j.rootLogger=${zookeeper.root.logger}
# Example with rolling log file
#log4j.rootLogger=DEBUG, CONSOLE, ROLLINGFILE
# Example with rolling log file and tracing
#log4j.rootLogger=TRACE, CONSOLE, ROLLINGFILE, TRACEFILE
#
# Log INFO level and above messages to the console
#
log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
log4j.appender.CONSOLE.Threshold=${zookeeper.console.threshold}
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [myid:%X{myid}] - %-5p [%t:%C{1}@%L] - %m%n
#
# Add ROLLINGFILE to rootLogger to get log file output
# Log DEBUG level and above messages to a log file
log4j.appender.ROLLINGFILE=org.apache.log4j.RollingFileAppender
log4j.appender.ROLLINGFILE.Threshold=${zookeeper.log.threshold}
log4j.appender.ROLLINGFILE.File=${zookeeper.log.dir}/${zookeeper.log.file}
# Max log file size of 10MB
log4j.appender.ROLLINGFILE.MaxFileSize=10MB
# uncomment the next line to limit number of backup files
#log4j.appender.ROLLINGFILE.MaxBackupIndex=10
log4j.appender.ROLLINGFILE.layout=org.apache.log4j.PatternLayout
log4j.appender.ROLLINGFILE.layout.ConversionPattern=%d{ISO8601} [myid:%X{myid}] - %-5p [%t:%C{1}@%L] - %m%n
#
# Add TRACEFILE to rootLogger to get log file output
# Log DEBUG level and above messages to a log file
log4j.appender.TRACEFILE=org.apache.log4j.FileAppender
log4j.appender.TRACEFILE.Threshold=TRACE
log4j.appender.TRACEFILE.File=${zookeeper.tracelog.dir}/${zookeeper.tracelog.file}
log4j.appender.TRACEFILE.layout=org.apache.log4j.PatternLayout
### Notice we are including log4j's NDC here (%x)
log4j.appender.TRACEFILE.layout.ConversionPattern=%d{ISO8601} [myid:%X{myid}] - %-5p [%t:%C{1}@%L][%x] - %m%n
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage; /tmp here is
# just an example.
dataDir=${atlas_home}/data/zookeeper/data
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
......@@ -80,6 +80,13 @@
</fileSet>
<fileSet>
<directory>target/zk</directory>
<outputDirectory>zk</outputDirectory>
<fileMode>0755</fileMode>
<directoryMode>0755</directoryMode>
</fileSet>
<fileSet>
<directory>../logs</directory>
<outputDirectory>logs</outputDirectory>
<directoryMode>0777</directoryMode>
......
......@@ -34,6 +34,15 @@ mvn clean -DskipTests package -Pdist,embedded-hbase-solr</verbatim>
Using the embedded-hbase-solr profile configures Atlas so that an HBase instance and a Solr instance are started and stopped along with the Atlas server by default.
---+++ Packaging Atlas with Embedded Cassandra & Solr
To create an Apache Atlas package that includes embedded Cassandra and Solr, build with the embedded-cassandra-solr profile as shown below:
<verbatim>
mvn clean package -Pdist,embedded-cassandra-solr</verbatim>
Using the embedded-cassandra-solr profile configures Atlas so that an embedded Cassandra instance and a Solr instance are started and stopped along with the Atlas server by default.

NOTE: This distribution profile is intended only for single-node development, not for production use.
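Once the resulting package is expanded, the embedded ZooKeeper, Cassandra and Solr instances are managed by the usual Atlas scripts; for example (paths relative to the expanded package, shown for illustration):

<verbatim>
bin/atlas_start.py
bin/atlas_stop.py</verbatim>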
---+++ Apache Atlas Package
The build will create the following files, which are used to install Apache Atlas.
......
......@@ -81,6 +81,40 @@
<groupId>org.janusgraph</groupId>
<artifactId>janusgraph-berkeleyje</artifactId>
<version>${janus.version}</version>
<exclusions>
<exclusion>
<groupId>ch.qos.logback</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.janusgraph</groupId>
<artifactId>janusgraph-cassandra</artifactId>
<version>${janus.version}</version>
<exclusions>
<exclusion>
<groupId>ch.qos.logback</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>com.codahale.metrics</groupId>
<artifactId>metrics-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.janusgraph</groupId>
<artifactId>janusgraph-cql</artifactId>
<version>${janus.version}</version>
<exclusions>
<exclusion>
<groupId>ch.qos.logback</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
......
......@@ -581,6 +581,7 @@
<antlr4.plugin.version>4.5</antlr4.plugin.version>
<maven-site-plugin.version>3.7</maven-site-plugin.version>
<doxia.version>1.8</doxia.version>
<dropwizard-metrics>3.2.2</dropwizard-metrics>
<PermGen>64m</PermGen>
<MaxPermGen>512m</MaxPermGen>
......@@ -671,7 +672,7 @@
<graphdb.backend.impl>org.apache.atlas.repository.graphdb.janus.AtlasJanusGraphDatabase</graphdb.backend.impl>
<graph.index.backend>solr</graph.index.backend>
<tests.solr.embedded>true</tests.solr.embedded>
<distro.exclude.packages>WEB-INF/lib/je-*.jar,WEB-INF/lib/elasticsearch-*.jar,WEB-INF/lib/solr-test-framework-*.jar, WEB-INF/lib/jts-*.jar</distro.exclude.packages>
<distro.exclude.packages>WEB-INF/lib/je-*.jar,WEB-INF/lib/elasticsearch-*.jar,WEB-INF/lib/solr-test-framework-*.jar, WEB-INF/lib/jts-*.jar,WEB-INF/lib/logback-*.jar</distro.exclude.packages>
</properties>
</profile>
......@@ -691,7 +692,7 @@
<graphdb.backend.impl>org.apache.atlas.repository.graphdb.janus.AtlasJanusGraphDatabase</graphdb.backend.impl>
<graph.index.backend>solr</graph.index.backend>
<tests.solr.embedded>true</tests.solr.embedded>
<distro.exclude.packages>WEB-INF/lib/je-*.jar,WEB-INF/lib/elasticsearch-*.jar,WEB-INF/lib/solr-test-framework-*.jar, WEB-INF/lib/jts-*.jar</distro.exclude.packages>
<distro.exclude.packages>WEB-INF/lib/je-*.jar,WEB-INF/lib/elasticsearch-*.jar,WEB-INF/lib/solr-test-framework-*.jar, WEB-INF/lib/jts-*.jar,WEB-INF/lib/logback-*.jar</distro.exclude.packages>
</properties>
</profile>
......
......@@ -89,6 +89,19 @@
<artifactId>mockito-all</artifactId>
</dependency>
<dependency>
<groupId>org.apache.cassandra</groupId>
<artifactId>cassandra-all</artifactId>
<exclusions>
<exclusion>
<groupId>ch.qos.logback</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
<version>2.1.8</version>
</dependency>
<!-- Test dependencies -->
<dependency>
......@@ -156,6 +169,39 @@
<artifactId>guice-multibindings</artifactId>
<version>4.1.0</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>ch.qos.logback</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-core</artifactId>
<version>3.1.4</version>
<exclusions>
<exclusion>
<groupId>ch.qos.logback</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.cassandraunit</groupId>
<artifactId>cassandra-unit</artifactId>
<version>2.0.2.2</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>ch.qos.logback</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.audit;
import com.google.common.annotations.VisibleForTesting;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasException;
import org.apache.atlas.EntityAuditEvent;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.listener.ActiveStateChangeHandler;
import org.apache.atlas.model.audit.EntityAuditEventV2;
import org.apache.atlas.service.Service;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.configuration.Configuration;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * This abstract base class should be used when adding support for an audit storage backend.
 */
public abstract class AbstractStorageBasedAuditRepository implements Service, EntityAuditRepository, ActiveStateChangeHandler {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractStorageBasedAuditRepository.class);

    private static final String AUDIT_REPOSITORY_MAX_SIZE_PROPERTY = "atlas.hbase.client.keyvalue.maxsize";
    private static final String AUDIT_EXCLUDE_ATTRIBUTE_PROPERTY = "atlas.audit.hbase.entity";
    protected static final String FIELD_SEPARATOR = ":";
    private static final long ATLAS_HBASE_KEYVALUE_DEFAULT_SIZE = 1024 * 1024;
    protected static Configuration APPLICATION_PROPERTIES = null;

    public static final String CONFIG_PREFIX = "atlas.audit";
    public static final String CONFIG_PERSIST_ENTITY_DEFINITION = CONFIG_PREFIX + ".persistEntityDefinition";

    protected Map<String, List<String>> auditExcludedAttributesCache = new HashMap<>();
    protected static boolean persistEntityDefinition;

    static {
        try {
            persistEntityDefinition = ApplicationProperties.get().getBoolean(CONFIG_PERSIST_ENTITY_DEFINITION, false);
        } catch (AtlasException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void instanceIsActive() throws AtlasException {
        LOG.info("Reacting to active: No action for now.");
    }

    @Override
    public void instanceIsPassive() {
        LOG.info("Reacting to passive: No action for now.");
    }

    @Override
    public int getHandlerOrder() {
        return HandlerOrder.AUDIT_REPOSITORY.getOrder();
    }

    @Override
    public void putEventsV1(EntityAuditEvent... events) throws AtlasException {
        putEventsV1(Arrays.asList(events));
    }

    @Override
    public void putEventsV2(EntityAuditEventV2... events) throws AtlasBaseException {
        putEventsV2(Arrays.asList(events));
    }

    @Override
    public List<Object> listEvents(String entityId, String startKey, short maxResults) throws AtlasBaseException {
        List ret = listEventsV2(entityId, startKey, maxResults);

        try {
            if (CollectionUtils.isEmpty(ret)) {
                ret = listEventsV1(entityId, startKey, maxResults);
            }
        } catch (AtlasException e) {
            throw new AtlasBaseException(e);
        }

        return ret;
    }

    @Override
    public long repositoryMaxSize() {
        long ret;

        initApplicationProperties();

        if (APPLICATION_PROPERTIES == null) {
            ret = ATLAS_HBASE_KEYVALUE_DEFAULT_SIZE;
        } else {
            ret = APPLICATION_PROPERTIES.getLong(AUDIT_REPOSITORY_MAX_SIZE_PROPERTY, ATLAS_HBASE_KEYVALUE_DEFAULT_SIZE);
        }

        return ret;
    }

    @Override
    public List<String> getAuditExcludeAttributes(String entityType) {
        List<String> ret = null;

        initApplicationProperties();

        if (auditExcludedAttributesCache.containsKey(entityType)) {
            ret = auditExcludedAttributesCache.get(entityType);
        } else if (APPLICATION_PROPERTIES != null) {
            String[] excludeAttributes = APPLICATION_PROPERTIES.getStringArray(AUDIT_EXCLUDE_ATTRIBUTE_PROPERTY + "." +
                    entityType + "." + "attributes.exclude");

            if (excludeAttributes != null) {
                ret = Arrays.asList(excludeAttributes);
            }

            auditExcludedAttributesCache.put(entityType, ret);
        }

        return ret;
    }

    protected void initApplicationProperties() {
        if (APPLICATION_PROPERTIES == null) {
            try {
                APPLICATION_PROPERTIES = ApplicationProperties.get();
            } catch (AtlasException ex) {
                // ignore
            }
        }
    }

    /**
     * Should only be used to initialize application properties for testing.
     *
     * @param config configuration to install
     */
    @VisibleForTesting
    protected void setApplicationProperties(Configuration config) {
        APPLICATION_PROPERTIES = config;
    }

    protected byte[] getKey(String id, Long ts) {
        assert id != null : "entity id can't be null";
        assert ts != null : "timestamp can't be null";

        String keyStr = id + FIELD_SEPARATOR + ts;

        return Bytes.toBytes(keyStr);
    }
}
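To illustrate how this extension point is meant to be used, here is a minimal hypothetical subclass (an in-memory sketch for illustration only, not part of this commit; it assumes the remaining abstract methods are the list-based putEventsV1/putEventsV2 and the listEventsV1/listEventsV2 pairs, plus start/stop from the Service interface, matching what HBaseBasedAuditRepository implements below):

// Hypothetical, illustration-only subclass of the abstract base class above.
public class InMemoryAuditRepository extends AbstractStorageBasedAuditRepository {
    // Events per entity id, in insertion order; V1 events are ignored in this sketch.
    private final Map<String, List<EntityAuditEventV2>> eventsByEntity = new HashMap<>();

    @Override
    public void start() throws AtlasException {
        // nothing to open for an in-memory store
    }

    @Override
    public void stop() throws AtlasException {
        eventsByEntity.clear();
    }

    @Override
    public void putEventsV1(List<EntityAuditEvent> events) throws AtlasException {
        // V1 persistence intentionally omitted in this sketch
    }

    @Override
    public List<EntityAuditEvent> listEventsV1(String entityId, String startKey, short maxResults) throws AtlasException {
        return new ArrayList<>();
    }

    @Override
    public void putEventsV2(List<EntityAuditEventV2> events) throws AtlasBaseException {
        for (EntityAuditEventV2 event : events) {
            eventsByEntity.computeIfAbsent(event.getEntityId(), k -> new ArrayList<EntityAuditEventV2>()).add(event);
        }
    }

    @Override
    public List<EntityAuditEventV2> listEventsV2(String entityId, String startKey, short maxResults) throws AtlasBaseException {
        // Return at most maxResults events for the entity; pagination via startKey is omitted here.
        List<EntityAuditEventV2> all = eventsByEntity.getOrDefault(entityId, new ArrayList<EntityAuditEventV2>());
        return new ArrayList<>(all.subList(0, Math.min(maxResults, all.size())));
    }
}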
......@@ -80,13 +80,11 @@ import java.util.Map;
@Singleton
@Component
@ConditionalOnAtlasProperty(property = "atlas.EntityAuditRepository.impl", isDefault = true)
public class HBaseBasedAuditRepository implements Service, EntityAuditRepository, ActiveStateChangeHandler {
public class HBaseBasedAuditRepository extends AbstractStorageBasedAuditRepository {
private static final Logger LOG = LoggerFactory.getLogger(HBaseBasedAuditRepository.class);
public static final String CONFIG_PREFIX = "atlas.audit";
public static final String CONFIG_TABLE_NAME = CONFIG_PREFIX + ".hbase.tablename";
public static final String DEFAULT_TABLE_NAME = "ATLAS_ENTITY_AUDIT_EVENTS";
public static final String CONFIG_PERSIST_ENTITY_DEFINITION = CONFIG_PREFIX + ".persistEntityDefinition";
public static final byte[] COLUMN_FAMILY = Bytes.toBytes("dt");
public static final byte[] COLUMN_ACTION = Bytes.toBytes("a");
......@@ -94,23 +92,6 @@ public class HBaseBasedAuditRepository implements Service, EntityAuditRepository
public static final byte[] COLUMN_USER = Bytes.toBytes("u");
public static final byte[] COLUMN_DEFINITION = Bytes.toBytes("f");
private static final String AUDIT_REPOSITORY_MAX_SIZE_PROPERTY = "atlas.hbase.client.keyvalue.maxsize";
private static final String AUDIT_EXCLUDE_ATTRIBUTE_PROPERTY = "atlas.audit.hbase.entity";
private static final String FIELD_SEPARATOR = ":";
private static final long ATLAS_HBASE_KEYVALUE_DEFAULT_SIZE = 1024 * 1024;
private static Configuration APPLICATION_PROPERTIES = null;
private static boolean persistEntityDefinition;
private Map<String, List<String>> auditExcludedAttributesCache = new HashMap<>();
static {
try {
persistEntityDefinition = ApplicationProperties.get().getBoolean(CONFIG_PERSIST_ENTITY_DEFINITION, false);
} catch (AtlasException e) {
throw new RuntimeException(e);
}
}
private TableName tableName;
private Connection connection;
......@@ -120,16 +101,6 @@ public class HBaseBasedAuditRepository implements Service, EntityAuditRepository
* @throws AtlasException
*/
@Override
public void putEventsV1(EntityAuditEvent... events) throws AtlasException {
putEventsV1(Arrays.asList(events));
}
/**
* Add events to the event repository
* @param events events to be added
* @throws AtlasException
*/
@Override
public void putEventsV1(List<EntityAuditEvent> events) throws AtlasException {
if (LOG.isDebugEnabled()) {
LOG.debug("Putting {} events", events.size());
......@@ -159,11 +130,6 @@ public class HBaseBasedAuditRepository implements Service, EntityAuditRepository
}
@Override
public void putEventsV2(EntityAuditEventV2... events) throws AtlasBaseException {
putEventsV2(Arrays.asList(events));
}
@Override
public void putEventsV2(List<EntityAuditEventV2> events) throws AtlasBaseException {
if (LOG.isDebugEnabled()) {
LOG.debug("Putting {} events", events.size());
......@@ -283,34 +249,12 @@ public class HBaseBasedAuditRepository implements Service, EntityAuditRepository
}
}
@Override
public List<Object> listEvents(String entityId, String startKey, short maxResults) throws AtlasBaseException {
List ret = listEventsV2(entityId, startKey, maxResults);
try {
if (CollectionUtils.isEmpty(ret)) {
ret = listEventsV1(entityId, startKey, maxResults);
}
} catch (AtlasException e) {
throw new AtlasBaseException(e);
}
return ret;
}
private <T> void addColumn(Put put, byte[] columnName, T columnValue) {
if (columnValue != null && !columnValue.toString().isEmpty()) {
put.addColumn(COLUMN_FAMILY, columnName, Bytes.toBytes(columnValue.toString()));
}
}
private byte[] getKey(String id, Long ts) {
assert id != null : "entity id can't be null";
assert ts != null : "timestamp can't be null";
String keyStr = id + FIELD_SEPARATOR + ts;
return Bytes.toBytes(keyStr);
}
/**
* List events for the given entity id in decreasing order of timestamp, from the given startKey. Returns n results
* @param entityId entity id
......@@ -386,52 +330,6 @@ public class HBaseBasedAuditRepository implements Service, EntityAuditRepository
}
}
@Override
public long repositoryMaxSize() {
long ret;
initApplicationProperties();
if (APPLICATION_PROPERTIES == null) {
ret = ATLAS_HBASE_KEYVALUE_DEFAULT_SIZE;
} else {
ret = APPLICATION_PROPERTIES.getLong(AUDIT_REPOSITORY_MAX_SIZE_PROPERTY, ATLAS_HBASE_KEYVALUE_DEFAULT_SIZE);
}
return ret;
}
@Override
public List<String> getAuditExcludeAttributes(String entityType) {
List<String> ret = null;
initApplicationProperties();
if (auditExcludedAttributesCache.containsKey(entityType)) {
ret = auditExcludedAttributesCache.get(entityType);
} else if (APPLICATION_PROPERTIES != null) {
String[] excludeAttributes = APPLICATION_PROPERTIES.getStringArray(AUDIT_EXCLUDE_ATTRIBUTE_PROPERTY + "." +
entityType + "." + "attributes.exclude");
if (excludeAttributes != null) {
ret = Arrays.asList(excludeAttributes);
}
auditExcludedAttributesCache.put(entityType, ret);
}
return ret;
}
private void initApplicationProperties() {
if (APPLICATION_PROPERTIES == null) {
try {
APPLICATION_PROPERTIES = ApplicationProperties.get();
} catch (AtlasException ex) {
// ignore
}
}
}
private String getResultString(Result result, byte[] columnName) {
byte[] rawValue = result.getValue(COLUMN_FAMILY, columnName);
if ( rawValue != null) {
......@@ -560,13 +458,4 @@ public class HBaseBasedAuditRepository implements Service, EntityAuditRepository
createTableIfNotExists();
}
@Override
public void instanceIsPassive() {
LOG.info("Reacting to passive: No action for now.");
}
@Override
public int getHandlerOrder() {
return HandlerOrder.HBASE_AUDIT_REPOSITORY.getOrder();
}
}
......@@ -20,6 +20,8 @@ package org.apache.atlas.repository.audit;
import org.apache.atlas.EntityAuditEvent;
import org.apache.atlas.TestUtilsV2;
import org.apache.atlas.model.audit.EntityAuditEventV2;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.v1.model.instance.Referenceable;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;
......@@ -42,7 +44,7 @@ public class AuditRepositoryTestBase {
}
@Test
public void testAddEvents() throws Exception {
public void testAddEventsV1() throws Exception {
EntityAuditEvent event = new EntityAuditEvent(rand(), System.currentTimeMillis(), "u1",
EntityAuditEvent.EntityAuditAction.ENTITY_CREATE, "d1", new Referenceable(rand()));
......@@ -51,11 +53,11 @@ public class AuditRepositoryTestBase {
List<EntityAuditEvent> events = eventRepository.listEventsV1(event.getEntityId(), null, (short) 10);
assertEquals(events.size(), 1);
assertEventEquals(events.get(0), event);
assertEventV1Equals(events.get(0), event);
}
@Test
public void testListPagination() throws Exception {
public void testListPaginationV1() throws Exception {
String id1 = "id1" + rand();
String id2 = "id2" + rand();
String id3 = "id3" + rand();
......@@ -76,24 +78,24 @@ public class AuditRepositoryTestBase {
//Use ts for which there is no event - ts + 2
List<EntityAuditEvent> events = eventRepository.listEventsV1(id2, null, (short) 3);
assertEquals(events.size(), 3);
assertEventEquals(events.get(0), expectedEvents.get(0));
assertEventEquals(events.get(1), expectedEvents.get(1));
assertEventEquals(events.get(2), expectedEvents.get(2));
assertEventV1Equals(events.get(0), expectedEvents.get(0));
assertEventV1Equals(events.get(1), expectedEvents.get(1));
assertEventV1Equals(events.get(2), expectedEvents.get(2));
//Use last event's timestamp for next list(). Should give only 1 event and shouldn't include events from other id
events = eventRepository.listEventsV1(id2, events.get(2).getEventKey(), (short) 3);
assertEquals(events.size(), 1);
assertEventEquals(events.get(0), expectedEvents.get(2));
assertEventV1Equals(events.get(0), expectedEvents.get(2));
}
@Test
public void testInvalidEntityId() throws Exception {
public void testInvalidEntityIdV1() throws Exception {
List<EntityAuditEvent> events = eventRepository.listEventsV1(rand(), null, (short) 3);
assertEquals(events.size(), 0);
}
protected void assertEventEquals(EntityAuditEvent actual, EntityAuditEvent expected) {
protected void assertEventV1Equals(EntityAuditEvent actual, EntityAuditEvent expected) {
if (expected != null) {
assertNotNull(actual);
}
......@@ -103,4 +105,69 @@ public class AuditRepositoryTestBase {
        assertEquals(actual.getTimestamp(), expected.getTimestamp());
        assertEquals(actual.getDetails(), expected.getDetails());
    }

    @Test
    public void testAddEventsV2() throws Exception {
        EntityAuditEventV2 event = new EntityAuditEventV2(rand(), System.currentTimeMillis(), "u1",
                EntityAuditEventV2.EntityAuditAction.ENTITY_CREATE, "d1", new AtlasEntity(rand()));

        eventRepository.putEventsV2(event);

        List<EntityAuditEventV2> events = eventRepository.listEventsV2(event.getEntityId(), null, (short) 10);
        assertEquals(events.size(), 1);
        assertEventV2Equals(events.get(0), event);
    }

    @Test
    public void testListPaginationV2() throws Exception {
        String id1 = "id1" + rand();
        String id2 = "id2" + rand();
        String id3 = "id3" + rand();
        long ts = System.currentTimeMillis();
        AtlasEntity entity = new AtlasEntity(rand());
        List<EntityAuditEventV2> expectedEvents = new ArrayList<>(3);

        for (int i = 0; i < 3; i++) {
            //Add events for both ids
            EntityAuditEventV2 event = new EntityAuditEventV2(id2, ts - i, "user" + i, EntityAuditEventV2.EntityAuditAction.ENTITY_UPDATE, "details" + i, entity);

            eventRepository.putEventsV2(event);
            expectedEvents.add(event);
            eventRepository.putEventsV2(new EntityAuditEventV2(id1, ts - i, "user" + i, EntityAuditEventV2.EntityAuditAction.ENTITY_DELETE, "details" + i, entity));
            eventRepository.putEventsV2(new EntityAuditEventV2(id3, ts - i, "user" + i, EntityAuditEventV2.EntityAuditAction.ENTITY_CREATE, "details" + i, entity));
        }

        //Use ts for which there is no event - ts + 2
        List<EntityAuditEventV2> events = eventRepository.listEventsV2(id2, null, (short) 3);
        assertEquals(events.size(), 3);
        assertEventV2Equals(events.get(0), expectedEvents.get(0));
        assertEventV2Equals(events.get(1), expectedEvents.get(1));
        assertEventV2Equals(events.get(2), expectedEvents.get(2));

        //Use last event's timestamp for next list(). Should give only 1 event and shouldn't include events from other id
        events = eventRepository.listEventsV2(id2, events.get(2).getEventKey(), (short) 3);
        assertEquals(events.size(), 1);
        assertEventV2Equals(events.get(0), expectedEvents.get(2));
    }

    @Test
    public void testInvalidEntityIdV2() throws Exception {
        List<EntityAuditEventV2> events = eventRepository.listEventsV2(rand(), null, (short) 3);

        assertEquals(events.size(), 0);
    }

    protected void assertEventV2Equals(EntityAuditEventV2 actual, EntityAuditEventV2 expected) {
        if (expected != null) {
            assertNotNull(actual);
        }

        assertEquals(actual.getEntityId(), expected.getEntityId());
        assertEquals(actual.getAction(), expected.getAction());
        assertEquals(actual.getTimestamp(), expected.getTimestamp());
        assertEquals(actual.getDetails(), expected.getDetails());
    }
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.audit;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasException;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.MapConfiguration;
import org.apache.thrift.transport.TTransportException;
import org.cassandraunit.utils.EmbeddedCassandraServerHelper;
import org.testng.annotations.BeforeClass;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
public class CassandraAuditRepositoryTest extends AuditRepositoryTestBase {
    @BeforeClass
    public void setup() throws InterruptedException, TTransportException, ConfigurationException, IOException,
            AtlasException {
        EmbeddedCassandraServerHelper.startEmbeddedCassandra("cassandra_test.yml");

        eventRepository = new CassandraBasedAuditRepository();

        Map<String, Object> props = new HashMap<>();
        props.put(CassandraBasedAuditRepository.MANAGE_EMBEDDED_CASSANDRA, Boolean.TRUE);
        props.put(CassandraBasedAuditRepository.CASSANDRA_CLUSTERNAME_PROPERTY, "Test Cluster");
        props.put(CassandraBasedAuditRepository.CASSANDRA_HOSTNAME_PROPERTY, "localhost");
        props.put(CassandraBasedAuditRepository.CASSANDRA_PORT_PROPERTY, 9042);

        Configuration atlasConf = new MapConfiguration(props);

        ((CassandraBasedAuditRepository) eventRepository).setApplicationProperties(atlasConf);
        ((CassandraBasedAuditRepository) eventRepository).start();

        // Pause for a second to ensure that the embedded cluster has started
        Thread.sleep(1000);
    }
}
......@@ -27,7 +27,7 @@ import org.apache.atlas.AtlasException;
*/
public interface ActiveStateChangeHandler {
public enum HandlerOrder {
HBASE_AUDIT_REPOSITORY(0),
AUDIT_REPOSITORY(0),
GRAPH_BACKED_SEARCH_INDEXER(1),
TYPEDEF_STORE_INITIALIZER(2),
DEFAULT_METADATA_SERVICE(3),
......
......@@ -44,4 +44,8 @@
<int name="connTimeout">${connTimeout:15000}</int>
</shardHandlerFactory>
<metrics>
</metrics>
</solr>