Commit ad0d764d by Shwetha GS

ATLAS-539 Store for entity audit events (shwethags)

parent ad7604fc
@@ -26,5 +26,5 @@ public interface Service {
void start() throws AtlasException;
-    void stop();
+    void stop() throws AtlasException;
}
@@ -52,7 +52,11 @@ public class Services {
public void stop() {
for (Service service : services) {
LOG.debug("Stopping service {}", service.getClass().getName());
-            service.stop();
+            try {
+                service.stop();
+            } catch (Throwable e) {
+                LOG.warn("Error stopping service {}", service.getClass().getName(), e);
+            }
}
}
}
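Since stop() can now throw AtlasException, Services.stop() catches Throwable per service so that one failing service cannot prevent the remaining services from shutting down. A minimal sketch of a service honoring the new contract (the class name and the simulated failure are illustrative, not part of this commit):

import org.apache.atlas.AtlasException;
import org.apache.atlas.service.Service;

public class IllustrativeService implements Service {
    @Override
    public void start() throws AtlasException {
        //acquire resources, e.g. open connections
    }

    @Override
    public void stop() throws AtlasException {
        //release resources; if this throws, Services.stop() logs a warning and moves on
        throw new AtlasException("simulated shutdown failure");
    }
}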
@@ -880,6 +880,30 @@
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>${hbase.version}</version>
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>${hbase.version}</version>
<exclusions>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>tomcat</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.thinkaurelius.titan</groupId>
<artifactId>titan-es</artifactId>
<version>${titan.version}</version>
@@ -1447,14 +1471,14 @@
<systemProperties>
<user.dir>${project.basedir}</user.dir>
<atlas.data>${project.build.directory}/data</atlas.data>
<log4j.configuration>atlas-log4j.xml</log4j.configuration>
</systemProperties>
<skipTests>${skipTests}</skipTests>
<forkMode>always</forkMode>
<redirectTestOutputToFile>true</redirectTestOutputToFile>
<argLine>-Djava.awt.headless=true -Dproject.version=${project.version}
-Dhadoop.tmp.dir="${project.build.directory}/tmp-hadoop-${user.name}"
-                    -Xmx1024m -XX:MaxPermSize=512m -Dlog4j.configuration=atlas-log4j.xml
-                    -Djava.net.preferIPv4Stack=true
+                    -Xmx1024m -XX:MaxPermSize=512m -Djava.net.preferIPv4Stack=true
</argLine>
<skip>${skipUTs}</skip>
<excludes>
@@ -1473,16 +1497,17 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
-                    <version>2.18.1</version>
+                    <version>2.19.1</version>
<configuration>
<systemPropertyVariables>
<projectBaseDir>${projectBaseDir}</projectBaseDir>
<atlas.data>${project.build.directory}/data</atlas.data>
<log4j.configuration>atlas-log4j.xml</log4j.configuration>
</systemPropertyVariables>
<redirectTestOutputToFile>true</redirectTestOutputToFile>
<argLine>-Djava.awt.headless=true -Dproject.version=${project.version}
-Dhadoop.tmp.dir="${project.build.directory}/tmp-hadoop-${user.name}"
-                    -Xmx1024m -XX:MaxPermSize=512m -Dlog4j.configuration=atlas-log4j.xml
+                    -Xmx1024m -XX:MaxPermSize=512m
</argLine>
<skip>${skipITs}</skip>
<parallel>none</parallel>
......
@@ -13,6 +13,7 @@ ATLAS-409 Atlas will not import avro tables with schema read from a file (dosset
ATLAS-379 Create sqoop and falcon metadata addons (venkatnrangan,bvellanki,sowmyaramesh via shwethags)
ALL CHANGES:
ATLAS-539 Store for entity audit events (shwethags)
ATLAS-523 Support alter view (sumasai via shwethags)
ATLAS-555 Tag creation from UI fails due to missing description attribute (guptaneeru via shwethags)
ATLAS-522 Support Alter table commands (sumasai via shwethags)
......
@@ -138,6 +138,18 @@
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
</dependency>
</dependencies>
<build>
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.audit;
import org.apache.atlas.AtlasException;
import org.apache.commons.lang.StringUtils;
import java.util.List;
/**
 * Repository interface for storing entity audit events.
 */
public interface EntityAuditRepository {
/**
* Structure of entity audit event
*/
class EntityAuditEvent {
String entityId;
Long timestamp;
String user;
String action;
String details;
public EntityAuditEvent() {
}
public EntityAuditEvent(String entityId, long ts, String user, String action, String details) {
this.entityId = entityId;
this.timestamp = ts;
this.user = user;
this.action = action;
this.details = details;
}
@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}
if (!(other instanceof EntityAuditEvent)) {
return false;
}
EntityAuditEvent otherEvent = (EntityAuditEvent) other;
return StringUtils.equals(entityId, otherEvent.entityId) &&
(timestamp.longValue() == otherEvent.timestamp.longValue()) &&
StringUtils.equals(user, otherEvent.user) && StringUtils.equals(action, otherEvent.action) &&
StringUtils.equals(details, otherEvent.details);
}
@Override
public int hashCode() {
return toString().hashCode();
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append("EntityId=").append(entityId).append(";Timestamp=").append(timestamp).append(";User=")
.append(user).append(";Action=").append(action).append(";Details=").append(details);
return builder.toString();
}
}
/**
* Add events to the event repository
* @param events events to be added
* @throws AtlasException
*/
void putEvents(EntityAuditEvent... events) throws AtlasException;
    /**
     * List events for the given entity id in decreasing order of timestamp, from the given timestamp.
     * Returns at most n results.
     * @param entityId entity id
     * @param ts starting timestamp for events (inclusive)
     * @param n maximum number of events to be returned
     * @return list of events
     * @throws AtlasException
     */
List<EntityAuditRepository.EntityAuditEvent> listEvents(String entityId, Long ts, short n) throws AtlasException;
}
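A minimal usage sketch of this interface, assuming an already-started repository implementation (the class and method names below are illustrative):

import org.apache.atlas.AtlasException;
import org.apache.atlas.repository.audit.EntityAuditRepository;

import java.util.List;

public class AuditUsageSketch {
    static List<EntityAuditRepository.EntityAuditEvent> recordAndList(EntityAuditRepository repository,
                                                                      String entityId) throws AtlasException {
        long ts = System.currentTimeMillis();
        //store one event for the entity
        repository.putEvents(new EntityAuditRepository.EntityAuditEvent(
                entityId, ts, "admin", "ENTITY_CREATE", "created"));
        //list newest-first events for the entity, starting from ts; at most 10 results
        return repository.listEvents(entityId, ts, (short) 10);
    }
}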
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.audit;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasException;
import org.apache.atlas.service.Service;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.PageFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
/**
 * HBase based repository for entity audit events.
 *
 * Table         -> 1, ATLAS_ENTITY_AUDIT_EVENTS
 * Key           -> entity id + timestamp
 * Column Family -> 1, dt
 * Columns       -> action, user, detail
 * Versions      -> 1
 *
 * Note: The timestamp in the key is assumed to be in nanoseconds. Since the key is entity id + timestamp and
 * only 1 version is kept, there can be only 1 audit event per entity id + timestamp. This is fine for a single
 * Atlas server, but with more than one Atlas server, a server id should be included in the key.
 */
public class HBaseBasedAuditRepository implements Service, EntityAuditRepository {
private static final Logger LOG = LoggerFactory.getLogger(HBaseBasedAuditRepository.class);
public static final String CONFIG_PREFIX = "atlas.audit";
public static final String CONFIG_TABLE_NAME = CONFIG_PREFIX + ".hbase.tablename";
public static final String DEFAULT_TABLE_NAME = "ATLAS_ENTITY_AUDIT_EVENTS";
private static final String FIELD_SEPARATOR = ":";
public static final byte[] COLUMN_FAMILY = Bytes.toBytes("dt");
public static final byte[] COLUMN_ACTION = Bytes.toBytes("action");
public static final byte[] COLUMN_DETAIL = Bytes.toBytes("detail");
public static final byte[] COLUMN_USER = Bytes.toBytes("user");
private TableName tableName;
private Connection connection;
/**
* Add events to the event repository
* @param events events to be added
* @throws AtlasException
*/
public void putEvents(EntityAuditRepository.EntityAuditEvent... events) throws AtlasException {
LOG.info("Putting {} events", events.length);
Table table = null;
try {
table = connection.getTable(tableName);
List<Put> puts = new ArrayList<>(events.length);
for (EntityAuditRepository.EntityAuditEvent event : events) {
LOG.debug("Adding entity audit event {}", event);
Put put = new Put(getKey(event.entityId, event.timestamp));
addColumn(put, COLUMN_ACTION, event.action);
addColumn(put, COLUMN_USER, event.user);
addColumn(put, COLUMN_DETAIL, event.details);
puts.add(put);
}
table.put(puts);
} catch (IOException e) {
throw new AtlasException(e);
} finally {
close(table);
}
}
private void addColumn(Put put, byte[] columnName, String columnValue) {
if (StringUtils.isNotEmpty(columnValue)) {
put.addColumn(COLUMN_FAMILY, columnName, Bytes.toBytes(columnValue));
}
}
private byte[] getKey(String id, Long ts) {
assert id != null : "entity id can't be null";
assert ts != null : "timestamp can't be null";
String keyStr = id + FIELD_SEPARATOR + ts;
return Bytes.toBytes(keyStr);
}
    /**
     * List events for the given entity id in decreasing order of timestamp, from the given timestamp.
     * Returns at most n results.
     * @param entityId entity id
     * @param ts starting timestamp for events (inclusive)
     * @param n maximum number of events to be returned
     * @return list of events
     * @throws AtlasException
     */
public List<EntityAuditRepository.EntityAuditEvent> listEvents(String entityId, Long ts, short n)
throws AtlasException {
LOG.info("Listing events for entity id {}, starting timestamp {}, #records {}", entityId, ts, n);
Table table = null;
ResultScanner scanner = null;
try {
table = connection.getTable(tableName);
Scan scan = new Scan().setReversed(true).setFilter(new PageFilter(n))
.setStartRow(getKey(entityId, ts))
.setStopRow(Bytes.toBytes(entityId))
.setCaching(n)
.setSmall(true);
scanner = table.getScanner(scan);
Result result;
List<EntityAuditRepository.EntityAuditEvent> events = new ArrayList<>();
            //PageFilter limits results per region server, so the scan can return more than n rows
            //in total. Hence the extra check on n here.
while ((result = scanner.next()) != null && events.size() < n) {
String key = Bytes.toString(result.getRow());
EntityAuditRepository.EntityAuditEvent event = fromKey(key);
event.user = getResultString(result, COLUMN_USER);
event.action = getResultString(result, COLUMN_ACTION);
event.details = getResultString(result, COLUMN_DETAIL);
events.add(event);
}
LOG.info("Got events for entity id {}, starting timestamp {}, #records {}", entityId, ts, events.size());
return events;
} catch (IOException e) {
throw new AtlasException(e);
} finally {
close(scanner);
close(table);
}
}
private String getResultString(Result result, byte[] columnName) {
return Bytes.toString(result.getValue(COLUMN_FAMILY, columnName));
}
private EntityAuditEvent fromKey(String key) {
EntityAuditEvent event = new EntityAuditEvent();
if (StringUtils.isNotEmpty(key)) {
String[] parts = key.split(FIELD_SEPARATOR);
event.entityId = parts[0];
event.timestamp = Long.valueOf(parts[1]);
}
return event;
}
private void close(Closeable closeable) throws AtlasException {
if (closeable != null) {
try {
closeable.close();
} catch (IOException e) {
throw new AtlasException(e);
}
}
}
    /**
     * Converts Atlas' application properties to a Hadoop configuration.
     * @param atlasConf atlas application configuration
     * @return hadoop configuration with the properties under prefix "atlas.audit" applied
     * @throws AtlasException
     */
public org.apache.hadoop.conf.Configuration getHBaseConfiguration(Configuration atlasConf) throws AtlasException {
Configuration subsetAtlasConf =
ApplicationProperties.getSubsetConfiguration(atlasConf, CONFIG_PREFIX);
org.apache.hadoop.conf.Configuration hbaseConf = HBaseConfiguration.create();
Iterator<String> keys = subsetAtlasConf.getKeys();
while (keys.hasNext()) {
String key = keys.next();
hbaseConf.set(key, subsetAtlasConf.getString(key));
}
return hbaseConf;
}
    private void createTableIfNotExists() throws AtlasException {
        Admin admin = null;
        try {
            admin = connection.getAdmin();
LOG.info("Checking if table {} exists", tableName.getNameAsString());
if (!admin.tableExists(tableName)) {
LOG.info("Creating table {}", tableName.getNameAsString());
HTableDescriptor tableDescriptor = new HTableDescriptor(tableName);
HColumnDescriptor columnFamily = new HColumnDescriptor(COLUMN_FAMILY);
columnFamily.setMaxVersions(1);
tableDescriptor.addFamily(columnFamily);
admin.createTable(tableDescriptor);
} else {
LOG.info("Table {} exists", tableName.getNameAsString());
}
        } catch (IOException e) {
            throw new AtlasException(e);
        } finally {
            //release the Admin handle so it doesn't leak
            close(admin);
        }
    }
@Override
public void start() throws AtlasException {
Configuration atlasConf = ApplicationProperties.get();
String tableNameStr = atlasConf.getString(CONFIG_TABLE_NAME, DEFAULT_TABLE_NAME);
tableName = TableName.valueOf(tableNameStr);
try {
org.apache.hadoop.conf.Configuration hbaseConf = getHBaseConfiguration(atlasConf);
connection = ConnectionFactory.createConnection(hbaseConf);
} catch (IOException e) {
throw new AtlasException(e);
}
createTableIfNotExists();
}
@Override
public void stop() throws AtlasException {
close(connection);
}
}
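getHBaseConfiguration() relies on ApplicationProperties.getSubsetConfiguration() returning the properties under atlas.audit with the prefix stripped, so a setting such as atlas.audit.hbase.zookeeper.quorum reaches HBase as hbase.zookeeper.quorum. A standalone sketch of that mapping, assuming commons-configuration subset() semantics (prefix plus delimiter removed):

import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.hadoop.hbase.HBaseConfiguration;

import java.util.Iterator;

public class AuditConfigSketch {
    public static void main(String[] args) {
        //illustrative atlas properties; every key under atlas.audit.* is forwarded to HBase
        PropertiesConfiguration atlasConf = new PropertiesConfiguration();
        atlasConf.setProperty("atlas.audit.hbase.zookeeper.quorum", "localhost");

        //subset("atlas.audit") strips the prefix, leaving hbase.zookeeper.quorum
        Configuration subset = atlasConf.subset("atlas.audit");
        org.apache.hadoop.conf.Configuration hbaseConf = HBaseConfiguration.create();
        Iterator<String> keys = subset.getKeys();
        while (keys.hasNext()) {
            String key = keys.next();
            hbaseConf.set(key, subset.getString(key));
        }
        System.out.println(hbaseConf.get("hbase.zookeeper.quorum")); //prints localhost
    }
}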
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.audit;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasException;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.LocalHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import java.util.ArrayList;
import java.util.List;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
public class HBaseBasedAuditRepositoryTest {
private HBaseTestingUtility testUtility;
private HBaseBasedAuditRepository eventRepository;
private LocalHBaseCluster hbaseCluster;
private TableName tableName;
@BeforeClass
public void setup() throws Exception {
testUtility = HBaseTestingUtility.createLocalHTU();
testUtility.startMiniZKCluster();
testUtility.getConfiguration().set("zookeeper.session.timeout.ms", "1000");
hbaseCluster = new LocalHBaseCluster(testUtility.getConfiguration());
hbaseCluster.startup();
eventRepository = new HBaseBasedAuditRepository() {
@Override
public org.apache.hadoop.conf.Configuration getHBaseConfiguration(Configuration atlasConf)
throws AtlasException {
return testUtility.getConfiguration();
}
};
eventRepository.start();
Configuration properties = ApplicationProperties.get();
String tableNameStr = properties.getString(HBaseBasedAuditRepository.CONFIG_TABLE_NAME,
HBaseBasedAuditRepository.DEFAULT_TABLE_NAME);
tableName = TableName.valueOf(tableNameStr);
}
@AfterClass
public void teardown() throws Exception {
eventRepository.stop();
testUtility.getConnection().close();
hbaseCluster.shutdown();
testUtility.shutdownMiniZKCluster();
}
private String rand() {
return RandomStringUtils.randomAlphanumeric(10);
}
@Test
public void testTableCreated() throws Exception {
Admin admin = testUtility.getConnection().getAdmin();
assertTrue(admin.tableExists(tableName));
}
@Test
public void testAddEvents() throws Exception {
EntityAuditRepository.EntityAuditEvent event =
new EntityAuditRepository.EntityAuditEvent(rand(), System.currentTimeMillis(), "u1", "a1", "d1");
eventRepository.putEvents(event);
List<EntityAuditRepository.EntityAuditEvent> events =
eventRepository.listEvents(event.entityId, System.currentTimeMillis(), (short) 10);
assertEquals(events.size(), 1);
assertEquals(events.get(0), event);
}
@Test
public void testListPagination() throws Exception {
String id1 = "id1" + rand();
String id2 = "id2" + rand();
String id3 = "id3" + rand();
long ts = System.nanoTime();
List<EntityAuditRepository.EntityAuditEvent> expectedEvents = new ArrayList<>(3);
for (int i = 0; i < 3; i++) {
            //Add events for all three ids
EntityAuditRepository.EntityAuditEvent event =
new EntityAuditRepository.EntityAuditEvent(id2, ts - i, "user" + i, "action" + i, "details" + i);
eventRepository.putEvents(event);
expectedEvents.add(event);
eventRepository.putEvents(new EntityAuditRepository.EntityAuditEvent(id1, ts - i, "user" + i,
"action" + i, "details" + i));
eventRepository.putEvents(new EntityAuditRepository.EntityAuditEvent(id3, ts - i, "user" + i,
"action" + i, "details" + i));
}
//Use ts for which there is no event - ts + 2
List<EntityAuditRepository.EntityAuditEvent> events = eventRepository.listEvents(id2, ts + 2, (short) 2);
assertEquals(events.size(), 2);
assertEquals(events.get(0), expectedEvents.get(0));
assertEquals(events.get(1), expectedEvents.get(1));
        //Use (last event's timestamp - 1) for the next list(). Should return only 1 event and shouldn't include events from the other ids
events = eventRepository.listEvents(id2, events.get(1).timestamp - 1, (short) 3);
assertEquals(events.size(), 1);
assertEquals(events.get(0), expectedEvents.get(2));
}
}
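Because PageFilter is applied per region server, a caller that wants a complete history pages the way testListPagination does: re-issue listEvents() starting just below the oldest timestamp of the previous batch. A hedged paging sketch (placed in the same package because it reads the package-private timestamp field, and assuming nano-timestamps of equal digit length so lexicographic row ordering matches numeric ordering; all names are illustrative):

package org.apache.atlas.repository.audit;

import org.apache.atlas.AtlasException;

import java.util.ArrayList;
import java.util.List;

public class AuditPagingSketch {
    //fetch the full newest-first event history for an entity, one page at a time
    static List<EntityAuditRepository.EntityAuditEvent> fetchAll(EntityAuditRepository repository,
                                                                 String entityId) throws AtlasException {
        final short pageSize = 100;
        List<EntityAuditRepository.EntityAuditEvent> all = new ArrayList<>();
        long from = Long.MAX_VALUE;   //start from the latest representable timestamp
        while (true) {
            List<EntityAuditRepository.EntityAuditEvent> page =
                    repository.listEvents(entityId, from, pageSize);
            all.addAll(page);
            if (page.size() < pageSize) {
                break;   //a short page means there are no more events
            }
            //next call starts just below the oldest timestamp seen so far
            from = page.get(page.size() - 1).timestamp - 1;
        }
        return all;
    }
}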
@@ -78,3 +78,5 @@ atlas.enableTLS=false
atlas.server.https.port=31443
######### Security Properties #########
hbase.security.authentication=simple
@@ -353,7 +353,11 @@
<systemProperties>
<systemProperty>
<name>log4j.configuration</name>
-                        <value>atlas-log4j.xml</value>
+                        <value>file://${project.build.directory}/../../distro/src/conf/atlas-log4j.xml</value>
</systemProperty>
<systemProperty>
<name>atlas.log.file</name>
<value>application.log</value>
</systemProperty>
<systemProperty>
<name>atlas.log.dir</name>
......
<?xml version="1.0" encoding="UTF-8" ?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
<appender name="console" class="org.apache.log4j.ConsoleAppender">
<param name="Target" value="System.out"/>
<layout class="org.apache.log4j.PatternLayout">
<param name="ConversionPattern" value="%d %-5p - [%t:%x] ~ %m (%c{1}:%L)%n"/>
</layout>
</appender>
<appender name="FILE" class="org.apache.log4j.DailyRollingFileAppender">
<param name="File" value="${atlas.log.dir}/application.log"/>
<param name="Append" value="true"/>
<param name="Threshold" value="debug"/>
<layout class="org.apache.log4j.PatternLayout">
<param name="ConversionPattern" value="%d %-5p - [%t:%x] ~ %m (%c{1}:%L)%n"/>
</layout>
</appender>
<appender name="AUDIT" class="org.apache.log4j.DailyRollingFileAppender">
<param name="File" value="${atlas.log.dir}/audit.log"/>
<param name="Append" value="true"/>
<param name="Threshold" value="debug"/>
<layout class="org.apache.log4j.PatternLayout">
<param name="ConversionPattern" value="%d %x %m%n"/>
</layout>
</appender>
<logger name="org.apache.atlas" additivity="false">
<level value="debug"/>
<appender-ref ref="FILE"/>
</logger>
<logger name="com.thinkaurelius.titan" additivity="false">
<level value="info"/>
<appender-ref ref="FILE"/>
</logger>
<logger name="AUDIT">
<level value="info"/>
<appender-ref ref="AUDIT"/>
</logger>
<root>
<priority value="warn"/>
<appender-ref ref="FILE"/>
</root>
</log4j:configuration>
@@ -48,7 +48,7 @@ public class SecureEmbeddedServerTest extends SecureEmbeddedServerTestBase {
};
secureEmbeddedServer.server.start();
URL url = new URL("https://localhost:21443/");
URL url = new URL("https://localhost:21443/api/atlas/admin/version");
HttpURLConnection connection = (HttpURLConnection) url.openConnection();
connection.setRequestMethod("GET");
connection.connect();
......