Commit c2356f8e by Shwetha GS

ATLAS-540 API to retrieve entity version events (shwethags)

parent 85afbefc
@@ -205,8 +205,7 @@
             <groupId>org.eclipse.jetty</groupId>
             <artifactId>jetty-maven-plugin</artifactId>
             <configuration>
-                <!--<skip>${skipTests}</skip>-->
-                <!--only skip int tests -->
+                <skip>${skipTests}</skip>
                 <httpConnector>
                     <port>31000</port>
                     <idleTimeout>60000</idleTimeout>
......
@@ -38,7 +38,6 @@ import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.process.Process;
 import org.apache.falcon.security.CurrentUser;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.codehaus.jettison.json.JSONArray;
 import org.codehaus.jettison.json.JSONObject;
 import org.slf4j.Logger;
@@ -63,11 +62,10 @@ public class FalconHookIT {
     private AtlasClient atlasClient;
     private static final ConfigurationStore STORE = ConfigurationStore.get();
-    private Configuration atlasProperties;

     @BeforeClass
     public void setUp() throws Exception {
-        atlasProperties = ApplicationProperties.get();
+        Configuration atlasProperties = ApplicationProperties.get();
         atlasClient = new AtlasClient(atlasProperties.getString("atlas.rest.address"));

         AtlasService service = new AtlasService();
@@ -83,8 +81,7 @@ public class FalconHookIT {
             return;
         }
-        HiveMetaStoreBridge hiveMetaStoreBridge = new HiveMetaStoreBridge(new HiveConf(), atlasProperties,
-                UserGroupInformation.getCurrentUser().getShortUserName(), UserGroupInformation.getCurrentUser());
+        HiveMetaStoreBridge hiveMetaStoreBridge = new HiveMetaStoreBridge(new HiveConf(), atlasClient);
         hiveMetaStoreBridge.registerHiveDataModel();

         FalconDataModelGenerator dataModelGenerator = new FalconDataModelGenerator();
......
@@ -18,7 +18,6 @@
 package org.apache.atlas.hive.bridge;

-import com.google.common.annotations.VisibleForTesting;
 import com.sun.jersey.api.client.ClientResponse;
 import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasClient;
@@ -33,7 +32,6 @@ import org.apache.atlas.typesystem.Struct;
 import org.apache.atlas.typesystem.json.InstanceSerialization;
 import org.apache.atlas.typesystem.json.TypesSerialization;
 import org.apache.commons.configuration.Configuration;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
@@ -66,32 +64,19 @@ public class HiveMetaStoreBridge {
     public static final String TABLE_TYPE_ATTR = "tableType";
     public static final String SEARCH_ENTRY_GUID_ATTR = "__guid";
     public static final String LAST_ACCESS_TIME_ATTR = "lastAccessTime";
     private final String clusterName;
     public static final String ATLAS_ENDPOINT = "atlas.rest.address";
-    private final String doAsUser;
-    private final UserGroupInformation ugi;
     private static final Logger LOG = LoggerFactory.getLogger(HiveMetaStoreBridge.class);
     public final Hive hiveClient;
-    private final AtlasClient atlasClient;
+    private AtlasClient atlasClient = null;

-    /**
-     * Construct a HiveMetaStoreBridge.
-     * @param hiveConf {@link HiveConf} for Hive component in the cluster
-     * @param atlasConf {@link Configuration} for Atlas component in the cluster
-     * @throws Exception
-     */
-    public HiveMetaStoreBridge(HiveConf hiveConf, Configuration atlasConf) throws Exception {
-        this(hiveConf, atlasConf, null, null);
-    }
-
-    @VisibleForTesting
     HiveMetaStoreBridge(String clusterName, Hive hiveClient, AtlasClient atlasClient) {
-        this(clusterName, hiveClient, atlasClient, null, null);
+        this.clusterName = clusterName;
+        this.hiveClient = hiveClient;
+        this.atlasClient = atlasClient;
     }

     public String getClusterName() {
@@ -101,26 +86,20 @@ public class HiveMetaStoreBridge {
     /**
      * Construct a HiveMetaStoreBridge.
      * @param hiveConf {@link HiveConf} for Hive component in the cluster
-     * @param doAsUser The user accessing Atlas service
-     * @param ugi {@link UserGroupInformation} representing the Atlas service
      */
-    public HiveMetaStoreBridge(HiveConf hiveConf, Configuration atlasConf, String doAsUser,
-                               UserGroupInformation ugi) throws Exception {
-        this(hiveConf.get(HIVE_CLUSTER_NAME, DEFAULT_CLUSTER_NAME),
-                Hive.get(hiveConf),
-                new AtlasClient(atlasConf.getString(ATLAS_ENDPOINT, DEFAULT_DGI_URL), ugi, doAsUser), doAsUser, ugi);
+    public HiveMetaStoreBridge(HiveConf hiveConf) throws Exception {
+        this(hiveConf.get(HIVE_CLUSTER_NAME, DEFAULT_CLUSTER_NAME), Hive.get(hiveConf), null);
     }

-    @VisibleForTesting
-    HiveMetaStoreBridge(String clusterName, Hive hiveClient, AtlasClient atlasClient, String user, UserGroupInformation ugi) {
-        this.clusterName = clusterName;
-        this.hiveClient = hiveClient;
-        this.atlasClient = atlasClient;
-        this.doAsUser = user;
-        this.ugi = ugi;
+    /**
+     * Construct a HiveMetaStoreBridge.
+     * @param hiveConf {@link HiveConf} for Hive component in the cluster
+     */
+    public HiveMetaStoreBridge(HiveConf hiveConf, AtlasClient atlasClient) throws Exception {
+        this(hiveConf.get(HIVE_CLUSTER_NAME, DEFAULT_CLUSTER_NAME), Hive.get(hiveConf), atlasClient);
     }

-    private AtlasClient getAtlasClient() {
+    AtlasClient getAtlasClient() {
         return atlasClient;
     }
@@ -200,7 +179,7 @@ public class HiveMetaStoreBridge {
         String entityJSON = InstanceSerialization.toJson(referenceable, true);
         LOG.debug("Submitting new entity {} = {}", referenceable.getTypeName(), entityJSON);
-        JSONArray guids = atlasClient.createEntity(entityJSON);
+        JSONArray guids = getAtlasClient().createEntity(entityJSON);
         LOG.debug("created instance for type " + typeName + ", guid: " + guids);
         return new Referenceable(guids.getString(0), referenceable.getTypeName(), null);
@@ -539,7 +518,11 @@ public class HiveMetaStoreBridge {
     public static void main(String[] argv) throws Exception {
         Configuration atlasConf = ApplicationProperties.get();
-        HiveMetaStoreBridge hiveMetaStoreBridge = new HiveMetaStoreBridge(new HiveConf(), atlasConf);
+        String atlasEndpoint = atlasConf.getString(ATLAS_ENDPOINT, DEFAULT_DGI_URL);
+        UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+        AtlasClient atlasClient = new AtlasClient(atlasEndpoint, ugi, ugi.getShortUserName());
+        HiveMetaStoreBridge hiveMetaStoreBridge = new HiveMetaStoreBridge(new HiveConf(), atlasClient);
         hiveMetaStoreBridge.registerHiveDataModel();
         hiveMetaStoreBridge.importHiveMetadata();
     }
......
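The net effect of the HiveMetaStoreBridge changes: the bridge no longer builds its own AtlasClient from Configuration plus doAsUser/UGI arguments; callers construct the client themselves and pass it in, and the hook path (which only publishes notifications) skips the client entirely. A minimal sketch of the resulting call sites, using only the constructors visible in this diff; the endpoint value is an illustrative placeholder:

import org.apache.atlas.AtlasClient;
import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.security.UserGroupInformation;

public class BridgeConstructionSketch {
    public static void main(String[] args) throws Exception {
        // Import/registration path: the caller owns the client and its identity
        UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
        AtlasClient atlasClient = new AtlasClient("http://localhost:21000", ugi, ugi.getShortUserName());
        HiveMetaStoreBridge importBridge = new HiveMetaStoreBridge(new HiveConf(), atlasClient);
        importBridge.registerHiveDataModel();

        // Hook path: no REST client needed, so none is created
        HiveMetaStoreBridge hookBridge = new HiveMetaStoreBridge(new HiveConf());
    }
}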
@@ -46,7 +46,6 @@ import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.plan.HiveOperation;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.json.JSONObject;
 import org.slf4j.Logger;
@@ -290,7 +289,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
         LOG.info("Entered Atlas hook for hook type {} operation {}", event.getHookType(), event.getOperation());
-        HiveMetaStoreBridge dgiBridge = new HiveMetaStoreBridge(hiveConf, atlasProperties, event.getUser(), event.getUgi());
+        HiveMetaStoreBridge dgiBridge = new HiveMetaStoreBridge(hiveConf);
         switch (event.getOperation()) {
         case CREATEDATABASE:
......
@@ -89,7 +89,7 @@ public class HiveHookIT {
         Configuration configuration = ApplicationProperties.get();
         dgiCLient = new AtlasClient(configuration.getString(HiveMetaStoreBridge.ATLAS_ENDPOINT, DGI_URL));
-        HiveMetaStoreBridge hiveMetaStoreBridge = new HiveMetaStoreBridge(conf, configuration);
+        HiveMetaStoreBridge hiveMetaStoreBridge = new HiveMetaStoreBridge(conf, dgiCLient);
         hiveMetaStoreBridge.registerHiveDataModel();
     }
......
@@ -28,7 +28,6 @@ import org.apache.atlas.sqoop.model.SqoopDataModelGenerator;
 import org.apache.atlas.sqoop.model.SqoopDataTypes;
 import org.apache.commons.configuration.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.sqoop.SqoopJobDataPublisher;
 import org.codehaus.jettison.json.JSONArray;
 import org.codehaus.jettison.json.JSONObject;
@@ -43,20 +42,19 @@ public class SqoopHookIT {
     private static final String CLUSTER_NAME = "primary";
     public static final String DEFAULT_DB = "default";
     private static final int MAX_WAIT_TIME = 2000;
-    private AtlasClient dgiCLient;
+    private AtlasClient atlasClient;

     @BeforeClass
     public void setUp() throws Exception {
         //Set-up sqoop session
         Configuration configuration = ApplicationProperties.get();
-        dgiCLient = new AtlasClient(configuration.getString("atlas.rest.address"));
-        registerDataModels(dgiCLient, configuration);
+        atlasClient = new AtlasClient(configuration.getString("atlas.rest.address"));
+        registerDataModels(atlasClient);
     }

-    private void registerDataModels(AtlasClient client, Configuration atlasConf) throws Exception {
+    private void registerDataModels(AtlasClient client) throws Exception {
         // Make sure hive model exists
-        HiveMetaStoreBridge hiveMetaStoreBridge = new HiveMetaStoreBridge(new HiveConf(), atlasConf,
-                UserGroupInformation.getCurrentUser().getShortUserName(), UserGroupInformation.getCurrentUser());
+        HiveMetaStoreBridge hiveMetaStoreBridge = new HiveMetaStoreBridge(new HiveConf(), atlasClient);
         hiveMetaStoreBridge.registerHiveDataModel();

         SqoopDataModelGenerator dataModelGenerator = new SqoopDataModelGenerator();
@@ -118,12 +116,12 @@ public class SqoopHookIT {
         waitFor(MAX_WAIT_TIME, new Predicate() {
             @Override
             public boolean evaluate() throws Exception {
-                JSONArray results = dgiCLient.search(query);
+                JSONArray results = atlasClient.search(query);
                 return results.length() > 0;
             }
         });
-        JSONArray results = dgiCLient.search(query);
+        JSONArray results = atlasClient.search(query);
         JSONObject row = results.getJSONObject(0).getJSONObject("t");
         return row.getString("id");
......
@@ -71,11 +71,16 @@ public class AtlasClient {
     public static final String ROWS = "rows";
     public static final String DATATYPE = "dataType";
+    public static final String EVENTS = "events";
+    public static final String START_KEY = "startKey";
+    public static final String NUM_RESULTS = "count";

     public static final String BASE_URI = "api/atlas/";
     public static final String ADMIN_VERSION = "admin/version";
     public static final String ADMIN_STATUS = "admin/status";
     public static final String TYPES = "types";
     public static final String URI_ENTITY = "entities";
+    public static final String URI_ENTITY_AUDIT = "audit";
     public static final String URI_SEARCH = "discovery/search";
     public static final String URI_LINEAGE = "lineage/hive/table";
@@ -351,6 +356,9 @@
         DELETE_ENTITIES(BASE_URI + URI_ENTITY, HttpMethod.DELETE, Response.Status.OK),
         DELETE_ENTITY(BASE_URI + URI_ENTITY, HttpMethod.DELETE, Response.Status.OK),

+        //audit operation
+        LIST_ENTITY_AUDIT(BASE_URI + URI_ENTITY, HttpMethod.GET, Response.Status.OK),

         //Trait operations
         ADD_TRAITS(BASE_URI + URI_ENTITY, HttpMethod.POST, Response.Status.CREATED),
         DELETE_TRAITS(BASE_URI + URI_ENTITY, HttpMethod.DELETE, Response.Status.OK),
@@ -396,7 +404,12 @@
      */
     public List<String> createType(String typeAsJson) throws AtlasServiceException {
         JSONObject response = callAPI(API.CREATE_TYPE, typeAsJson);
-        return extractResults(response, AtlasClient.TYPES);
+        return extractResults(response, AtlasClient.TYPES, new ExtractOperation<String, JSONObject>() {
+            @Override
+            String extractElement(JSONObject element) throws JSONException {
+                return element.getString(AtlasClient.NAME);
+            }
+        });
     }

     /**
@@ -417,7 +430,12 @@
      */
     public List<String> updateType(String typeAsJson) throws AtlasServiceException {
         JSONObject response = callAPI(API.UPDATE_TYPE, typeAsJson);
-        return extractResults(response, AtlasClient.TYPES);
+        return extractResults(response, AtlasClient.TYPES, new ExtractOperation<String, JSONObject>() {
+            @Override
+            String extractElement(JSONObject element) throws JSONException {
+                return element.getString(AtlasClient.NAME);
+            }
+        });
     }

     /**
@@ -432,7 +450,7 @@
     public List<String> listTypes() throws AtlasServiceException {
         final JSONObject jsonObject = callAPI(API.LIST_TYPES, null);
-        return extractResults(jsonObject, AtlasClient.RESULTS);
+        return extractResults(jsonObject, AtlasClient.RESULTS, new ExtractOperation<String, String>());
     }

     public String getType(String typeName) throws AtlasServiceException {
@@ -611,7 +629,7 @@
                 return resource;
             }
         });
-        return extractResults(jsonResponse, GUID);
+        return extractResults(jsonResponse, GUID, new ExtractOperation<String, String>());
     }

     /**
@@ -621,14 +639,15 @@
      * @param uniqueAttributeValue Attribute Value that uniquely identifies the entity
      * @return List of deleted entity guids(including composite references from that entity)
      */
-    public List<String> deleteEntity(String entityType, String uniqueAttributeName, String uniqueAttributeValue) throws AtlasServiceException {
+    public List<String> deleteEntity(String entityType, String uniqueAttributeName, String uniqueAttributeValue)
+            throws AtlasServiceException {
         API api = API.DELETE_ENTITY;
         WebResource resource = getResource(api);
         resource = resource.queryParam(TYPE, entityType);
         resource = resource.queryParam(ATTRIBUTE_NAME, uniqueAttributeName);
         resource = resource.queryParam(ATTRIBUTE_VALUE, uniqueAttributeValue);
         JSONObject jsonResponse = callAPIWithResource(API.DELETE_ENTITIES, resource, null);
-        return extractResults(jsonResponse, GUID);
+        return extractResults(jsonResponse, GUID, new ExtractOperation<String, String>());
     }

     /**
@@ -698,20 +717,23 @@
                 return resource;
             }
         });
-        return extractResults(jsonResponse, AtlasClient.RESULTS);
+        return extractResults(jsonResponse, AtlasClient.RESULTS, new ExtractOperation<String, String>());
+    }
+
+    private class ExtractOperation<T, U> {
+        T extractElement(U element) throws JSONException {
+            return (T) element;
+        }
     }

-    private List<String> extractResults(JSONObject jsonResponse, String key) throws AtlasServiceException {
+    private <T, U> List<T> extractResults(JSONObject jsonResponse, String key, ExtractOperation<T, U> extractOperation)
+            throws AtlasServiceException {
         try {
             JSONArray results = jsonResponse.getJSONArray(key);
-            ArrayList<String> resultsList = new ArrayList<>();
+            ArrayList<T> resultsList = new ArrayList<>();
             for (int index = 0; index < results.length(); index++) {
                 Object element = results.get(index);
-                if (element instanceof String) {
-                    resultsList.add((String) element);
-                } else if (element instanceof JSONObject) {
-                    resultsList.add(((JSONObject) element).getString(AtlasClient.NAME));
-                }
+                resultsList.add(extractOperation.extractElement((U) element));
             }
             return resultsList;
         } catch (JSONException e) {
@@ -720,6 +742,44 @@
     }

     /**
+     * Get the latest numResults entity audit events in decreasing order of timestamp for the given entity id
+     * @param entityId entity id
+     * @param numResults number of results to be returned
+     * @return list of audit events for the entity id
+     * @throws AtlasServiceException
+     */
+    public List<EntityAuditEvent> getEntityAuditEvents(String entityId, short numResults)
+            throws AtlasServiceException {
+        return getEntityAuditEvents(entityId, null, numResults);
+    }
+
+    /**
+     * Get the entity audit events in decreasing order of timestamp for the given entity id
+     * @param entityId entity id
+     * @param startKey key for the first event to be returned, used for pagination
+     * @param numResults number of results to be returned
+     * @return list of audit events for the entity id
+     * @throws AtlasServiceException
+     */
+    public List<EntityAuditEvent> getEntityAuditEvents(String entityId, String startKey, short numResults)
+            throws AtlasServiceException {
+        WebResource resource = getResource(API.LIST_ENTITY_AUDIT, entityId, URI_ENTITY_AUDIT);
+        if (StringUtils.isNotEmpty(startKey)) {
+            resource = resource.queryParam(START_KEY, startKey);
+        }
+        resource = resource.queryParam(NUM_RESULTS, String.valueOf(numResults));
+
+        JSONObject jsonResponse = callAPIWithResource(API.LIST_ENTITY_AUDIT, resource, null);
+        return extractResults(jsonResponse, AtlasClient.EVENTS, new ExtractOperation<EntityAuditEvent, JSONObject>() {
+            @Override
+            EntityAuditEvent extractElement(JSONObject element) throws JSONException {
+                return EntityAuditEvent.GSON.fromJson(element.toString(), EntityAuditEvent.class);
+            }
+        });
+    }
+
+    /**
      * Search using gremlin/dsl/full text
      * @param searchQuery
      * @return
......
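Put together, the new constants (EVENTS, START_KEY, NUM_RESULTS, URI_ENTITY_AUDIT), the LIST_ENTITY_AUDIT operation, and the generic ExtractOperation plumbing give the client a typed audit call. A hypothetical usage sketch (server address and guid are placeholders); pagination follows the startKey javadoc, under which the event named by startKey is the first one returned, so the caller drops the overlapping element:

AtlasClient client = new AtlasClient("http://localhost:21000");
String guid = "...";  // guid of some entity, e.g. returned by createEntity() or search

// First page: the latest 10 events, newest first
List<EntityAuditEvent> page = client.getEntityAuditEvents(guid, (short) 10);

// Next page: resume from the row key of the last event seen
if (!page.isEmpty()) {
    String startKey = page.get(page.size() - 1).getEventKey();
    List<EntityAuditEvent> next = client.getEntityAuditEvents(guid, startKey, (short) 10);
}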
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.apache.commons.lang.StringUtils;
/**
* Structure of entity audit event
*/
public class EntityAuditEvent {
public static final Gson GSON = new GsonBuilder().create();
public enum EntityAuditAction {
ENTITY_CREATE, ENTITY_UPDATE, ENTITY_DELETE, TAG_ADD, TAG_DELETE
}
private String entityId;
private long timestamp;
private String user;
private EntityAuditAction action;
private String details;
private String eventKey;
public EntityAuditEvent() {
}
public EntityAuditEvent(String entityId, Long ts, String user, EntityAuditAction action, String details) {
this.entityId = entityId;
this.timestamp = ts;
this.user = user;
this.action = action;
this.details = details;
}
@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}
if (!(other instanceof EntityAuditEvent)) {
return false;
}
EntityAuditEvent otherEvent = (EntityAuditEvent) other;
return StringUtils.equals(entityId, otherEvent.entityId) &&
(timestamp == otherEvent.timestamp) &&
StringUtils.equals(user, otherEvent.user) && (action == otherEvent.action) &&
StringUtils.equals(details, otherEvent.details) &&
StringUtils.equals(eventKey, otherEvent.eventKey);
}
@Override
public int hashCode() {
return toString().hashCode();
}
@Override
public String toString() {
return GSON.toJson(this);
}
public static EntityAuditEvent fromString(String eventString) {
return GSON.fromJson(eventString, EntityAuditEvent.class);
}
public String getEntityId() {
return entityId;
}
public long getTimestamp() {
return timestamp;
}
public String getUser() {
return user;
}
public EntityAuditAction getAction() {
return action;
}
public String getDetails() {
return details;
}
public void setEntityId(String entityId) {
this.entityId = entityId;
}
public void setTimestamp(long timestamp) {
this.timestamp = timestamp;
}
public void setUser(String user) {
this.user = user;
}
public void setAction(EntityAuditAction action) {
this.action = action;
}
public void setDetails(String details) {
this.details = details;
}
public String getEventKey() {
return eventKey;
}
public void setEventKey(String eventKey) {
this.eventKey = eventKey;
}
}
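Because toString() delegates to the shared GSON instance and fromString() parses the same form back, an event round-trips cleanly through its JSON representation; a small sketch (values illustrative):

EntityAuditEvent event = new EntityAuditEvent("guid-1", System.currentTimeMillis(), "admin",
        EntityAuditEvent.EntityAuditAction.ENTITY_CREATE, "Created: {...}");
String json = event.toString();                 // e.g. {"entityId":"guid-1","timestamp":...}
EntityAuditEvent copy = EntityAuditEvent.fromString(json);
System.out.println(event.equals(copy));         // true: equals() compares every field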
@@ -151,4 +151,19 @@ public final class ParamChecker {
         }
         return list;
     }

+    /**
+     * Checks that the given value is > 0 and <= maxValue.
+     * @param value value to check
+     * @param maxValue maximum allowed value
+     * @param name name of the parameter, used in the error message
+     */
+    public static void lessThan(short value, short maxValue, String name) {
+        if (value <= 0) {
+            throw new IllegalArgumentException(name + " should be > 0, current value " + value);
+        }
+        if (value > maxValue) {
+            throw new IllegalArgumentException(name + " should be <= " + maxValue + ", current value " + value);
+        }
+    }
 }
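Note that despite its name, lessThan enforces the full range 0 < value <= maxValue. An illustrative check of a client-supplied page size:

short numResults = 10;
ParamChecker.lessThan(numResults, (short) 1000, "numResults");  // passes: 0 < 10 <= 1000
ParamChecker.lessThan((short) 0, (short) 1000, "numResults");   // throws IllegalArgumentException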
@@ -88,6 +88,7 @@
                             src="${hbase.tar}"
                             dest="${project.build.directory}/hbase.tar.gz"
                             usetimestamp="true"
                             verbose="true"
+                            skipexisting="true"
                     />
                     <untar
                             src="${project.build.directory}/hbase.tar.gz"
@@ -118,6 +119,7 @@
                         <descriptor>src/main/assemblies/src-package.xml</descriptor>
                     </descriptors>
                     <finalName>apache-atlas-${project.version}</finalName>
+                    <tarLongFileMode>gnu</tarLongFileMode>
                 </configuration>
             </execution>
         </executions>
......
@@ -97,6 +97,11 @@ atlas.http.authentication.type=simple
 ######### Server Properties #########
 atlas.rest.address=http://localhost:21000

+######### Entity Audit Configs #########
+atlas.audit.hbase.tablename=ATLAS_ENTITY_AUDIT_EVENTS
+atlas.audit.zookeeper.session.timeout.ms=1000
+atlas.audit.hbase.zookeeper.quorum=localhost:2181
+
 ######### High Availability Configuration ########
 atlas.server.ha.enabled=false
 #### Enabled the configs below as per need if HA is enabled #####
......
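These keys are read back through commons-configuration, the same way other code in this commit reads atlas.rest.address; a sketch using the defaults above:

Configuration atlasConf = ApplicationProperties.get();
String auditTable = atlasConf.getString("atlas.audit.hbase.tablename", "ATLAS_ENTITY_AUDIT_EVENTS");
String quorum = atlasConf.getString("atlas.audit.hbase.zookeeper.quorum", "localhost:2181");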
@@ -46,9 +46,6 @@ public abstract class AtlasHook {
     private static final Logger LOG = LoggerFactory.getLogger(AtlasHook.class);

-    /**
-     * Hadoop Cluster name for this instance, typically used for namespace.
-     */
     protected static Configuration atlasProperties;

     protected static NotificationInterface notifInterface;
......
@@ -19,15 +19,13 @@ package org.apache.atlas.notification;

 import com.google.inject.AbstractModule;
 import com.google.inject.Singleton;
-import com.google.inject.multibindings.Multibinder;
 import org.apache.atlas.kafka.KafkaNotification;
 import org.apache.atlas.kafka.KafkaNotificationProvider;
-import org.apache.atlas.listener.EntityChangeListener;
-import org.apache.atlas.notification.entity.NotificationEntityChangeListener;
-import org.apache.atlas.service.Service;

 /**
  * Notification module for Guice.
+ *
+ * NOTE: This module is loaded by hook clients like hive hook etc. Don't add any server specific bindings here.
  */
 public class NotificationModule extends AbstractModule {
@@ -35,14 +33,5 @@ public class NotificationModule extends AbstractModule {
     protected void configure() {
         bind(NotificationInterface.class).to(KafkaNotification.class).in(Singleton.class);
         bind(KafkaNotification.class).toProvider(KafkaNotificationProvider.class).in(Singleton.class);
-
-        Multibinder<Service> serviceBinder = Multibinder.newSetBinder(binder(), Service.class);
-        serviceBinder.addBinding().to(KafkaNotification.class);
-        serviceBinder.addBinding().to(NotificationHookConsumer.class);
-
-        //Add NotificationEntityChangeListener as EntityChangeListener
-        Multibinder<EntityChangeListener> entityChangeListenerBinder =
-                Multibinder.newSetBinder(binder(), EntityChangeListener.class);
-        entityChangeListenerBinder.addBinding().to(NotificationEntityChangeListener.class);
     }
 }
@@ -894,6 +894,7 @@
                 <groupId>org.apache.hbase</groupId>
                 <artifactId>hbase-server</artifactId>
                 <version>${hbase.version}</version>
+                <scope>test</scope>
                 <exclusions>
                     <exclusion>
                         <groupId>org.mortbay.jetty</groupId>
@@ -1481,8 +1482,6 @@
                         <user.dir>${project.basedir}</user.dir>
                         <atlas.data>${project.build.directory}/data</atlas.data>
                         <log4j.configuration>atlas-log4j.xml</log4j.configuration>
-                        <zookeeper.client.secure>false</zookeeper.client.secure>
-                        <zookeeper.sasl.client>false</zookeeper.sasl.client>
                     </systemProperties>
                     <skipTests>${skipTests}</skipTests>
                     <forkMode>always</forkMode>
......
@@ -14,6 +14,7 @@ ATLAS-409 Atlas will not import avro tables with schema read from a file (dosset
 ATLAS-379 Create sqoop and falcon metadata addons (venkatnrangan,bvellanki,sowmyaramesh via shwethags)

 ALL CHANGES:
+ATLAS-540 API to retrieve entity version events (shwethags)
 ATLAS-529 support drop database (sumasai)
 ATLAS-528 Support drop table,view (sumasai)
 ATLAS-603 Document High Availability of Atlas (yhemanth via sumasai)
......
@@ -141,6 +141,11 @@
         <dependency>
             <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-client</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hbase</groupId>
             <artifactId>hbase-server</artifactId>
             <classifier>tests</classifier>
             <scope>test</scope>
......
@@ -34,13 +34,14 @@ import org.apache.atlas.listener.TypesChangeListener;
 import org.apache.atlas.repository.MetadataRepository;
 import org.apache.atlas.repository.audit.EntityAuditListener;
 import org.apache.atlas.repository.audit.EntityAuditRepository;
-import org.apache.atlas.repository.audit.InMemoryEntityAuditRepository;
+import org.apache.atlas.repository.audit.HBaseBasedAuditRepository;
 import org.apache.atlas.repository.graph.GraphBackedMetadataRepository;
 import org.apache.atlas.repository.graph.GraphBackedSearchIndexer;
 import org.apache.atlas.repository.graph.GraphProvider;
 import org.apache.atlas.repository.graph.TitanGraphProvider;
 import org.apache.atlas.repository.typestore.GraphBackedTypeStore;
 import org.apache.atlas.repository.typestore.ITypeStore;
+import org.apache.atlas.service.Service;
 import org.apache.atlas.services.DefaultMetadataService;
 import org.apache.atlas.services.IBootstrapTypesRegistrar;
 import org.apache.atlas.services.MetadataService;
@@ -95,15 +96,11 @@ public class RepositoryMetadataModule extends com.google.inject.AbstractModule {
     }

     protected void bindAuditRepository(Binder binder) {
-        /** Enable this after ATLAS-498 is committed
         //Map EntityAuditRepository interface to hbase based implementation
         binder.bind(EntityAuditRepository.class).to(HBaseBasedAuditRepository.class).asEagerSingleton();

         //Add HBaseBasedAuditRepository to service so that connection is closed at shutdown
-        Multibinder<Service> serviceBinder = Multibinder.newSetBinder(binder(), Service.class);
+        Multibinder<Service> serviceBinder = Multibinder.newSetBinder(binder, Service.class);
         serviceBinder.addBinding().to(HBaseBasedAuditRepository.class);
-        **/
-
-        //Map EntityAuditRepository interface to hbase based implementation
-        binder.bind(EntityAuditRepository.class).to(InMemoryEntityAuditRepository.class).asEagerSingleton();
     }
 }
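The Multibinder line is what makes the shutdown comment work: every binding added to the Service set binder is injected collectively as a Set<Service>, so a single lifecycle manager can start and stop the HBase audit repository alongside the other services. A hypothetical consumer (class name is illustrative, and it assumes the Service interface exposes start() throwing AtlasException):

import com.google.inject.Inject;
import java.util.Set;
import org.apache.atlas.AtlasException;
import org.apache.atlas.service.Service;

public class ServiceManagerSketch {
    private final Set<Service> services;

    @Inject
    public ServiceManagerSketch(Set<Service> services) {  // populated from all Multibinder bindings
        this.services = services;
    }

    public void startAll() throws AtlasException {
        for (Service service : services) {
            service.start();
        }
    }
}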
@@ -18,14 +18,15 @@
 package org.apache.atlas.repository.audit;

-import com.google.inject.Inject;
 import org.apache.atlas.AtlasException;
+import org.apache.atlas.EntityAuditEvent;
 import org.apache.atlas.RequestContext;
 import org.apache.atlas.listener.EntityChangeListener;
 import org.apache.atlas.typesystem.IStruct;
 import org.apache.atlas.typesystem.ITypedReferenceableInstance;
 import org.apache.atlas.typesystem.json.InstanceSerialization;

+import javax.inject.Inject;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
@@ -43,51 +44,55 @@ public class EntityAuditListener implements EntityChangeListener {
     @Override
     public void onEntitiesAdded(Collection<ITypedReferenceableInstance> entities) throws AtlasException {
-        List<EntityAuditRepository.EntityAuditEvent> events = new ArrayList<>();
+        List<EntityAuditEvent> events = new ArrayList<>();
         long currentTime = System.currentTimeMillis();
         for (ITypedReferenceableInstance entity : entities) {
-            EntityAuditRepository.EntityAuditEvent event = createEvent(entity, currentTime,
-                    EntityAuditRepository.EntityAuditAction.ENTITY_CREATE,
+            EntityAuditEvent event = createEvent(entity, currentTime, EntityAuditEvent.EntityAuditAction.ENTITY_CREATE,
                     "Created: " + InstanceSerialization.toJson(entity, true));
             events.add(event);
         }
         auditRepository.putEvents(events);
     }

-    private EntityAuditRepository.EntityAuditEvent createEvent(ITypedReferenceableInstance entity, long ts,
-                                                               EntityAuditRepository.EntityAuditAction action,
-                                                               String details) {
-        return new EntityAuditRepository.EntityAuditEvent(entity.getId()._getId(), ts, RequestContext.get().getUser(),
-                action, details);
+    private EntityAuditEvent createEvent(ITypedReferenceableInstance entity, long ts,
+                                         EntityAuditEvent.EntityAuditAction action, String details) {
+        return new EntityAuditEvent(entity.getId()._getId(), ts, RequestContext.get().getUser(), action, details);
     }

     @Override
     public void onEntitiesUpdated(Collection<ITypedReferenceableInstance> entities) throws AtlasException {
+        List<EntityAuditEvent> events = new ArrayList<>();
+        long currentTime = System.currentTimeMillis();
+        for (ITypedReferenceableInstance entity : entities) {
+            EntityAuditEvent event = createEvent(entity, currentTime, EntityAuditEvent.EntityAuditAction.ENTITY_UPDATE,
+                    "Updated: " + InstanceSerialization.toJson(entity, true));
+            events.add(event);
+        }
+        auditRepository.putEvents(events);
     }

     @Override
     public void onTraitAdded(ITypedReferenceableInstance entity, IStruct trait) throws AtlasException {
-        EntityAuditRepository.EntityAuditEvent event = createEvent(entity, System.currentTimeMillis(),
-                EntityAuditRepository.EntityAuditAction.TAG_ADD,
+        EntityAuditEvent event = createEvent(entity, System.currentTimeMillis(),
+                EntityAuditEvent.EntityAuditAction.TAG_ADD,
                 "Added trait: " + InstanceSerialization.toJson(trait, true));
         auditRepository.putEvents(event);
     }

     @Override
     public void onTraitDeleted(ITypedReferenceableInstance entity, String traitName) throws AtlasException {
-        EntityAuditRepository.EntityAuditEvent event = createEvent(entity, System.currentTimeMillis(),
-                EntityAuditRepository.EntityAuditAction.TAG_DELETE, "Deleted trait: " + traitName);
+        EntityAuditEvent event = createEvent(entity, System.currentTimeMillis(),
+                EntityAuditEvent.EntityAuditAction.TAG_DELETE, "Deleted trait: " + traitName);
         auditRepository.putEvents(event);
     }

     @Override
     public void onEntitiesDeleted(Collection<ITypedReferenceableInstance> entities) throws AtlasException {
-        List<EntityAuditRepository.EntityAuditEvent> events = new ArrayList<>();
+        List<EntityAuditEvent> events = new ArrayList<>();
         long currentTime = System.currentTimeMillis();
         for (ITypedReferenceableInstance entity : entities) {
-            EntityAuditRepository.EntityAuditEvent event = createEvent(entity, currentTime,
-                    EntityAuditRepository.EntityAuditAction.ENTITY_DELETE, "Deleted entity");
+            EntityAuditEvent event = createEvent(entity, currentTime,
+                    EntityAuditEvent.EntityAuditAction.ENTITY_DELETE, "Deleted entity");
             events.add(event);
         }
         auditRepository.putEvents(events);
......
@@ -19,7 +19,7 @@
 package org.apache.atlas.repository.audit;

 import org.apache.atlas.AtlasException;
-import org.apache.commons.lang.StringUtils;
+import org.apache.atlas.EntityAuditEvent;

 import java.util.List;
@@ -27,82 +27,6 @@ import java.util.List;
  * Interface for repository for storing entity audit events
  */
 public interface EntityAuditRepository {
-    enum EntityAuditAction {
-        ENTITY_CREATE, ENTITY_UPDATE, ENTITY_DELETE, TAG_ADD, TAG_DELETE;
-    }
-
-    /**
-     * Structure of entity audit event
-     */
-    class EntityAuditEvent {
-        String entityId;
-        Long timestamp;
-        String user;
-        EntityAuditAction action;
-        String details;
-
-        public EntityAuditEvent() {
-        }
-
-        public EntityAuditEvent(String entityId, Long ts, String user, EntityAuditAction action, String details) {
-            this.entityId = entityId;
-            this.timestamp = ts;
-            this.user = user;
-            this.action = action;
-            this.details = details;
-        }
-
-        @Override
-        public boolean equals(Object other) {
-            if (this == other) {
-                return true;
-            }
-
-            if (!(other instanceof EntityAuditEvent)) {
-                return false;
-            }
-
-            EntityAuditEvent otherEvent = (EntityAuditEvent) other;
-            return StringUtils.equals(entityId, otherEvent.entityId) &&
-                    (timestamp.longValue() == otherEvent.timestamp.longValue()) &&
-                    StringUtils.equals(user, otherEvent.user) && (action == otherEvent.action) &&
-                    StringUtils.equals(details, otherEvent.details);
-        }
-
-        @Override
-        public int hashCode() {
-            return toString().hashCode();
-        }
-
-        @Override
-        public String toString() {
-            StringBuilder builder = new StringBuilder();
-            builder.append("EntityId=").append(entityId).append(";Timestamp=").append(timestamp).append(";User=")
-                   .append(user).append(";Action=").append(action).append(";Details=").append(details);
-            return builder.toString();
-        }
-
-        public String getEntityId() {
-            return entityId;
-        }
-
-        public Long getTimestamp() {
-            return timestamp;
-        }
-
-        public String getUser() {
-            return user;
-        }
-
-        public EntityAuditAction getAction() {
-            return action;
-        }
-
-        public String getDetails() {
-            return details;
-        }
-    }
-
     /**
      * Add events to the event repository
      * @param events events to be added
@@ -120,10 +44,10 @@ public interface EntityAuditRepository {
     /**
      * List events for the given entity id in decreasing order of timestamp, from the given timestamp. Returns n results
      * @param entityId entity id
-     * @param ts starting timestamp for events
+     * @param startKey key for the first event to be returned, used for pagination
     * @param n number of events to be returned
     * @return list of events
     * @throws AtlasException
     */
-    List<EntityAuditRepository.EntityAuditEvent> listEvents(String entityId, Long ts, short n) throws AtlasException;
+    List<EntityAuditEvent> listEvents(String entityId, String startKey, short n) throws AtlasException;
 }
@@ -22,6 +22,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.inject.Singleton;
 import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasException;
+import org.apache.atlas.EntityAuditEvent;
 import org.apache.atlas.ha.HAConfiguration;
 import org.apache.atlas.listener.ActiveStateChangeHandler;
 import org.apache.atlas.service.Service;
@@ -59,7 +60,7 @@ import java.util.List;
 *   Columns -> action, user, detail
 *   versions -> 1
 *
- * Note: The timestamp in the key is assumed to be timestamp in nano seconds. Since the key is entity id + timestamp,
+ * Note: The timestamp in the key is assumed to be timestamp in milliseconds. Since the key is entity id + timestamp,
 * and only 1 version is kept, there can be just 1 audit event per entity id + timestamp. This is ok for one atlas server.
 * But if there are more than one atlas servers, we should use server id in the key
 */
@@ -87,7 +88,7 @@
     * @throws AtlasException
     */
    @Override
-    public void putEvents(EntityAuditRepository.EntityAuditEvent... events) throws AtlasException {
+    public void putEvents(EntityAuditEvent... events) throws AtlasException {
        putEvents(Arrays.asList(events));
    }
@@ -103,14 +104,12 @@
        try {
            table = connection.getTable(tableName);
            List<Put> puts = new ArrayList<>(events.size());
-            for (EntityAuditRepository.EntityAuditEvent event : events) {
+            for (EntityAuditEvent event : events) {
                LOG.debug("Adding entity audit event {}", event);
-                Put put = new Put(getKey(event.entityId, event.timestamp));
-                if (event.action != null) {
-                    put.addColumn(COLUMN_FAMILY, COLUMN_ACTION, Bytes.toBytes((short)event.action.ordinal()));
-                }
-                addColumn(put, COLUMN_USER, event.user);
-                addColumn(put, COLUMN_DETAIL, event.details);
+                Put put = new Put(getKey(event.getEntityId(), event.getTimestamp()));
+                addColumn(put, COLUMN_ACTION, event.getAction());
+                addColumn(put, COLUMN_USER, event.getUser());
+                addColumn(put, COLUMN_DETAIL, event.getDetails());
                puts.add(put);
            }
            table.put(puts);
@@ -121,9 +120,9 @@
        }
    }

-    private void addColumn(Put put, byte[] columnName, String columnValue) {
-        if (StringUtils.isNotEmpty(columnValue)) {
-            put.addColumn(COLUMN_FAMILY, columnName, Bytes.toBytes(columnValue));
+    private <T> void addColumn(Put put, byte[] columnName, T columnValue) {
+        if (columnValue != null && !columnValue.toString().isEmpty()) {
+            put.addColumn(COLUMN_FAMILY, columnName, Bytes.toBytes(columnValue.toString()));
        }
    }
@@ -135,41 +134,58 @@
    }

    /**
-     * List events for the given entity id in decreasing order of timestamp, from the given timestamp. Returns n results
+     * List events for the given entity id in decreasing order of timestamp, from the given startKey. Returns n results
     * @param entityId entity id
-     * @param ts starting timestamp for events
+     * @param startKey key for the first event to be returned, used for pagination
     * @param n number of events to be returned
     * @return list of events
     * @throws AtlasException
     */
-    public List<EntityAuditRepository.EntityAuditEvent> listEvents(String entityId, Long ts, short n)
+    public List<EntityAuditEvent> listEvents(String entityId, String startKey, short n)
            throws AtlasException {
-        LOG.info("Listing events for entity id {}, starting timestamp {}, #records {}", entityId, ts, n);
+        LOG.info("Listing events for entity id {}, starting timestamp {}, #records {}", entityId, startKey, n);
        Table table = null;
        ResultScanner scanner = null;
        try {
            table = connection.getTable(tableName);

+            /**
+             * Scan Details:
+             * In hbase, the events are stored in increasing order of timestamp. So, doing reverse scan to get the latest event first
+             * Page filter is set to limit the number of results returned.
+             * Stop row is set to the entity id to avoid going past the current entity while scanning
+             * small is set to true to optimise RPC calls as the scanner is created per request
+             */
            Scan scan = new Scan().setReversed(true).setFilter(new PageFilter(n))
-                    .setStartRow(getKey(entityId, ts))
                    .setStopRow(Bytes.toBytes(entityId))
                    .setCaching(n)
                    .setSmall(true);
+            if (StringUtils.isEmpty(startKey)) {
+                //Set start row to entity id + max long value
+                byte[] entityBytes = getKey(entityId, Long.MAX_VALUE);
+                scan = scan.setStartRow(entityBytes);
+            } else {
+                scan = scan.setStartRow(Bytes.toBytes(startKey));
+            }
            scanner = table.getScanner(scan);
            Result result;
-            List<EntityAuditRepository.EntityAuditEvent> events = new ArrayList<>();
+            List<EntityAuditEvent> events = new ArrayList<>();

            //PageFilter doesn't ensure n results are returned. The filter is per region server.
            //So, adding extra check on n here
            while ((result = scanner.next()) != null && events.size() < n) {
-                String key = Bytes.toString(result.getRow());
-                EntityAuditRepository.EntityAuditEvent event = fromKey(key);
-                event.user = getResultString(result, COLUMN_USER);
-                event.action =
-                        EntityAuditAction.values()[(Bytes.toShort(result.getValue(COLUMN_FAMILY, COLUMN_ACTION)))];
-                event.details = getResultString(result, COLUMN_DETAIL);
+                EntityAuditEvent event = fromKey(result.getRow());
+
+                //In case the user sets random start key, guarding against random events
+                if (!event.getEntityId().equals(entityId)) {
+                    continue;
+                }
+                event.setUser(getResultString(result, COLUMN_USER));
+                event.setAction(EntityAuditEvent.EntityAuditAction.valueOf(getResultString(result, COLUMN_ACTION)));
+                event.setDetails(getResultString(result, COLUMN_DETAIL));
                events.add(event);
            }
-            LOG.info("Got events for entity id {}, starting timestamp {}, #records {}", entityId, ts, events.size());
+            LOG.info("Got events for entity id {}, starting timestamp {}, #records {}", entityId, startKey, events.size());
            return events;
        } catch (IOException e) {
            throw new AtlasException(e);
@@ -183,12 +199,14 @@
        return Bytes.toString(result.getValue(COLUMN_FAMILY, columnName));
    }

-    private EntityAuditEvent fromKey(String key) {
+    private EntityAuditEvent fromKey(byte[] keyBytes) {
+        String key = Bytes.toString(keyBytes);
        EntityAuditEvent event = new EntityAuditEvent();
        if (StringUtils.isNotEmpty(key)) {
            String[] parts = key.split(FIELD_SEPARATOR);
-            event.entityId = parts[0];
-            event.timestamp = Long.valueOf(parts[1]);
+            event.setEntityId(parts[0]);
+            event.setTimestamp(Long.valueOf(parts[1]));
+            event.setEventKey(key);
        }
        return event;
    }
@@ -222,8 +240,9 @@
    }

    private void createTableIfNotExists() throws AtlasException {
+        Admin admin = null;
        try {
-            Admin admin = connection.getAdmin();
+            admin = connection.getAdmin();
            LOG.info("Checking if table {} exists", tableName.getNameAsString());
            if (!admin.tableExists(tableName)) {
                LOG.info("Creating table {}", tableName.getNameAsString());
@@ -237,6 +256,8 @@
            }
        } catch (IOException e) {
            throw new AtlasException(e);
+        } finally {
+            close(admin);
        }
    }
......
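The reversed scan relies on the row-key layout produced by getKey, which is not shown in this hunk. A sketch of the scheme, assuming FIELD_SEPARATOR is ':': epoch-millisecond timestamps currently all have the same digit width, so HBase's lexicographic row order tracks time order within an entity, and starting at entityId + ':' + Long.MAX_VALUE walks newest-first down to the stop row entityId:

import org.apache.hadoop.hbase.util.Bytes;

public class AuditKeySketch {
    static byte[] key(String entityId, long ts) {
        return Bytes.toBytes(entityId + ":" + ts);   // assumed layout: <entityId>:<timestamp>
    }

    public static void main(String[] args) {
        byte[] older = key("e1", 1460000000100L);
        byte[] newer = key("e1", 1460000000200L);
        // newer sorts after older, so setReversed(true) returns it first
        System.out.println(Bytes.compareTo(newer, older) > 0);  // true
    }
}

The eventKey that fromKey() stores on each returned event is exactly this row key, which is what a client later passes back as startKey.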
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
package org.apache.atlas.repository.audit; package org.apache.atlas.repository.audit;
import org.apache.atlas.AtlasException; import org.apache.atlas.AtlasException;
import org.apache.atlas.EntityAuditEvent;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
...@@ -40,17 +41,23 @@ public class InMemoryEntityAuditRepository implements EntityAuditRepository { ...@@ -40,17 +41,23 @@ public class InMemoryEntityAuditRepository implements EntityAuditRepository {
@Override @Override
public synchronized void putEvents(List<EntityAuditEvent> events) throws AtlasException { public synchronized void putEvents(List<EntityAuditEvent> events) throws AtlasException {
for (EntityAuditEvent event : events) { for (EntityAuditEvent event : events) {
auditEvents.put(event.entityId + (Long.MAX_VALUE - event.timestamp), event); String rowKey = event.getEntityId() + (Long.MAX_VALUE - event.getTimestamp());
event.setEventKey(rowKey);
auditEvents.put(rowKey, event);
}
}
@Override
public List<EntityAuditEvent> listEvents(String entityId, String startKey, short maxResults)
throws AtlasException {
List<EntityAuditEvent> events = new ArrayList<>();
String myStartKey = startKey;
if (myStartKey == null) {
myStartKey = entityId;
}
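// SortedMap.tailMap() is inclusive of myStartKey, so a paginated caller gets the startKey event
// back again; events belonging to other entity ids are filtered out in the loop below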
SortedMap<String, EntityAuditEvent> subMap = auditEvents.tailMap(myStartKey);
for (EntityAuditEvent event : subMap.values()) {
if (events.size() < maxResults && event.getEntityId().equals(entityId)) {
events.add(event);
}
}
...
@@ -22,10 +22,10 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.inject.Provider;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasException;
import org.apache.atlas.EntityAuditEvent;
import org.apache.atlas.classification.InterfaceAudience;
import org.apache.atlas.ha.HAConfiguration;
import org.apache.atlas.listener.ActiveStateChangeHandler;
@@ -33,6 +33,7 @@ import org.apache.atlas.listener.EntityChangeListener;
import org.apache.atlas.listener.TypesChangeListener;
import org.apache.atlas.repository.MetadataRepository;
import org.apache.atlas.repository.RepositoryException;
import org.apache.atlas.repository.audit.EntityAuditRepository;
import org.apache.atlas.repository.typestore.ITypeStore;
import org.apache.atlas.typesystem.IStruct;
import org.apache.atlas.typesystem.ITypedReferenceableInstance;
@@ -85,6 +86,9 @@ import java.util.Map;
public class DefaultMetadataService implements MetadataService, ActiveStateChangeHandler {
private static final Logger LOG = LoggerFactory.getLogger(DefaultMetadataService.class);
private final short maxAuditResults;
private static final String CONFIG_MAX_AUDIT_RESULTS = "atlas.audit.maxResults";
private static final short DEFAULT_MAX_AUDIT_RESULTS = 1000;
private final TypeSystem typeSystem;
private final MetadataRepository repository;
@@ -97,6 +101,9 @@ public class DefaultMetadataService implements MetadataService, ActiveStateChangeHandler {
private boolean wasInitialized = false;
@Inject
private EntityAuditRepository auditRepository;
@Inject
DefaultMetadataService(final MetadataRepository repository, final ITypeStore typeStore,
final IBootstrapTypesRegistrar typesRegistrar,
final Collection<Provider<TypesChangeListener>> typeListenerProviders,
@@ -128,6 +135,8 @@ public class DefaultMetadataService implements MetadataService, ActiveStateChangeHandler {
if (!HAConfiguration.isHAEnabled(configuration)) {
restoreTypeSystem();
}
maxAuditResults = configuration.getShort(CONFIG_MAX_AUDIT_RESULTS, DEFAULT_MAX_AUDIT_RESULTS);
}
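As a side note on the new knob above: the number of audit events returned per call is capped by atlas.audit.maxResults (default 1000). A deployment could tighten the cap via the server's application properties, e.g. (assuming the standard atlas-application.properties file):

    atlas.audit.maxResults=500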
private void restoreTypeSystem() throws AtlasException {
@@ -211,7 +220,7 @@ public class DefaultMetadataService implements MetadataService, ActiveStateChangeHandler {
}
private JSONObject createOrUpdateTypes(String typeDefinition, boolean isUpdate) throws AtlasException {
ParamChecker.notEmpty(typeDefinition, "type definition");
TypesDef typesDef = validateTypeDefinition(typeDefinition);
try {
@@ -299,7 +308,7 @@ public class DefaultMetadataService implements MetadataService, ActiveStateChangeHandler {
*/
@Override
public String createEntities(String entityInstanceDefinition) throws AtlasException {
ParamChecker.notEmpty(entityInstanceDefinition, "Entity instance definition");
ITypedReferenceableInstance[] typedInstances = deserializeClassInstances(entityInstanceDefinition);
@@ -348,7 +357,7 @@ public class DefaultMetadataService implements MetadataService, ActiveStateChangeHandler {
*/
@Override
public String getEntityDefinition(String guid) throws AtlasException {
ParamChecker.notEmpty(guid, "entity id");
final ITypedReferenceableInstance instance = repository.getEntityDefinition(guid);
return InstanceSerialization.toJson(instance, true);
@@ -404,7 +413,7 @@ public class DefaultMetadataService implements MetadataService, ActiveStateChangeHandler {
@Override
public String updateEntities(String entityInstanceDefinition) throws AtlasException {
ParamChecker.notEmpty(entityInstanceDefinition, "Entity instance definition");
ITypedReferenceableInstance[] typedInstances = deserializeClassInstances(entityInstanceDefinition);
TypeUtils.Pair<List<String>, List<String>> guids = repository.updateEntities(typedInstances);
@@ -421,9 +430,9 @@ public class DefaultMetadataService implements MetadataService, ActiveStateChangeHandler {
@Override
public String updateEntityAttributeByGuid(final String guid, String attributeName, String value) throws AtlasException {
ParamChecker.notEmpty(guid, "entity id");
ParamChecker.notEmpty(attributeName, "attribute name");
ParamChecker.notEmpty(value, "attribute value");
ITypedReferenceableInstance existInstance = validateEntityExists(guid);
ClassType type = typeSystem.getDataType(ClassType.class, existInstance.getTypeName());
@@ -520,10 +529,10 @@ public class DefaultMetadataService implements MetadataService, ActiveStateChangeHandler {
@Override
public String updateEntityByUniqueAttribute(String typeName, String uniqueAttributeName, String attrValue,
Referenceable updatedEntity) throws AtlasException {
ParamChecker.notEmpty(typeName, "typeName");
ParamChecker.notEmpty(uniqueAttributeName, "uniqueAttributeName");
ParamChecker.notNull(attrValue, "unique attribute value");
ParamChecker.notNull(updatedEntity, "updatedEntity");
ITypedReferenceableInstance oldInstance = getEntityDefinitionReference(typeName, uniqueAttributeName, attrValue);
@@ -535,7 +544,7 @@ public class DefaultMetadataService implements MetadataService, ActiveStateChangeHandler {
}
private void validateTypeExists(String entityType) throws AtlasException {
ParamChecker.notEmpty(entityType, "entity type");
IDataType type = typeSystem.getDataType(IDataType.class, entityType);
if (type.getTypeCategory() != DataTypes.TypeCategory.CLASS) {
@@ -552,7 +561,7 @@ public class DefaultMetadataService implements MetadataService, ActiveStateChangeHandler {
*/
@Override
public List<String> getTraitNames(String guid) throws AtlasException {
ParamChecker.notEmpty(guid, "entity id");
return repository.getTraitNames(guid);
}
@@ -565,8 +574,8 @@ public class DefaultMetadataService implements MetadataService, ActiveStateChangeHandler {
*/
@Override
public void addTrait(String guid, String traitInstanceDefinition) throws AtlasException {
ParamChecker.notEmpty(guid, "entity id");
ParamChecker.notEmpty(traitInstanceDefinition, "trait instance definition");
ITypedStruct traitInstance = deserializeTraitInstance(traitInstanceDefinition);
final String traitName = traitInstance.getTypeName();
@@ -594,7 +603,7 @@ public class DefaultMetadataService implements MetadataService, ActiveStateChangeHandler {
try {
Struct traitInstance = InstanceSerialization.fromJsonStruct(traitInstanceDefinition, true);
final String entityTypeName = traitInstance.getTypeName();
ParamChecker.notEmpty(entityTypeName, "entity type");
TraitType traitType = typeSystem.getDataType(TraitType.class, entityTypeName);
return traitType.convert(traitInstance, Multiplicity.REQUIRED);
@@ -614,8 +623,8 @@ public class DefaultMetadataService implements MetadataService, ActiveStateChangeHandler {
*/
@Override
public void deleteTrait(String guid, String traitNameToBeDeleted) throws AtlasException {
ParamChecker.notEmpty(guid, "entity id");
ParamChecker.notEmpty(traitNameToBeDeleted, "trait name");
// ensure trait type is already registered with the TS
if (!typeSystem.isRegistered(traitNameToBeDeleted)) {
@@ -685,6 +694,15 @@ public class DefaultMetadataService implements MetadataService, ActiveStateChangeHandler {
entityChangeListeners.remove(listener);
}
@Override
public List<EntityAuditEvent> getAuditEvents(String guid, String startKey, short count) throws AtlasException {
ParamChecker.notEmpty(guid, "entity id");
ParamChecker.notEmptyIfNotNull(startKey, "start key");
ParamChecker.lessThan(count, maxAuditResults, "count");
return auditRepository.listEvents(guid, startKey, count);
}
/* (non-Javadoc)
* @see org.apache.atlas.services.MetadataService#deleteEntities(java.lang.String)
*/
...
@@ -18,6 +18,7 @@
package org.apache.atlas.repository.audit;
import org.apache.atlas.EntityAuditEvent;
import org.apache.commons.lang.RandomStringUtils;
import org.testng.annotations.Test;
@@ -25,6 +26,7 @@ import java.util.ArrayList;
import java.util.List;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
public class AuditRepositoryTestBase {
protected EntityAuditRepository eventRepository;
@@ -35,16 +37,15 @@ public class AuditRepositoryTestBase {
@Test
public void testAddEvents() throws Exception {
EntityAuditEvent event = new EntityAuditEvent(rand(), System.currentTimeMillis(), "u1",
EntityAuditEvent.EntityAuditAction.ENTITY_CREATE, "d1");
eventRepository.putEvents(event);
List<EntityAuditEvent> events =
eventRepository.listEvents(event.getEntityId(), null, (short) 10);
assertEquals(events.size(), 1);
assertEventEquals(events.get(0), event);
}
@Test
@@ -53,29 +54,46 @@ public class AuditRepositoryTestBase {
String id2 = "id2" + rand();
String id3 = "id3" + rand();
long ts = System.currentTimeMillis();
List<EntityAuditEvent> expectedEvents = new ArrayList<>(3);
for (int i = 0; i < 3; i++) {
//Add events for all three ids
EntityAuditEvent event = new EntityAuditEvent(id2, ts - i, "user" + i,
EntityAuditEvent.EntityAuditAction.ENTITY_UPDATE, "details" + i);
eventRepository.putEvents(event);
expectedEvents.add(event);
eventRepository.putEvents(new EntityAuditEvent(id1, ts - i, "user" + i,
EntityAuditEvent.EntityAuditAction.TAG_DELETE, "details" + i));
eventRepository.putEvents(new EntityAuditEvent(id3, ts - i, "user" + i,
EntityAuditEvent.EntityAuditAction.TAG_ADD, "details" + i));
}
//List with startKey = null - returns the latest events for id2
List<EntityAuditEvent> events = eventRepository.listEvents(id2, null, (short) 3);
assertEquals(events.size(), 3);
assertEventEquals(events.get(0), expectedEvents.get(0));
assertEventEquals(events.get(1), expectedEvents.get(1));
assertEventEquals(events.get(2), expectedEvents.get(2));
//Use the last event's key as startKey for the next list(). startKey is inclusive, so this should return
//just that event and shouldn't include events from the other ids
events = eventRepository.listEvents(id2, events.get(2).getEventKey(), (short) 3);
assertEquals(events.size(), 1);
assertEventEquals(events.get(0), expectedEvents.get(2));
}
@Test
public void testInvalidEntityId() throws Exception {
List<EntityAuditEvent> events = eventRepository.listEvents(rand(), null, (short) 3);
assertEquals(events.size(), 0);
}
private void assertEventEquals(EntityAuditEvent actual, EntityAuditEvent expected) {
if (expected != null) {
assertNotNull(actual);
}
assertEquals(actual.getEntityId(), expected.getEntityId());
assertEquals(actual.getAction(), expected.getAction());
assertEquals(actual.getTimestamp(), expected.getTimestamp());
assertEquals(actual.getDetails(), expected.getDetails());
}
}
@@ -35,6 +35,7 @@ import org.apache.atlas.typesystem.types.HierarchicalTypeDefinition;
import org.apache.atlas.typesystem.types.utils.TypesUtil;
import org.apache.atlas.utils.ParamChecker;
import org.apache.atlas.AtlasException;
import org.apache.atlas.EntityAuditEvent;
import org.apache.atlas.RepositoryMetadataModule;
import org.apache.atlas.RequestContext;
import org.apache.atlas.TestUtils;
@@ -72,6 +73,7 @@ import java.util.List;
import java.util.Map;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
@@ -211,7 +213,7 @@ public class DefaultMetadataServiceTest {
//name is the unique attribute
Referenceable entity = createDBEntity();
String id = createInstance(entity);
assertAuditEvents(id, EntityAuditEvent.EntityAuditAction.ENTITY_CREATE);
//using the same name should succeed, but not create another entity
String newId = createInstance(entity);
@@ -228,28 +230,36 @@ public class DefaultMetadataServiceTest {
//create entity
Referenceable entity = createDBEntity();
String id = createInstance(entity);
assertAuditEvents(id, EntityAuditEvent.EntityAuditAction.ENTITY_CREATE);
Struct tag = new Struct(TestUtils.PII);
metadataService.addTrait(id, InstanceSerialization.toJson(tag, true));
assertAuditEvents(id, EntityAuditEvent.EntityAuditAction.TAG_ADD);
metadataService.deleteTrait(id, TestUtils.PII);
assertAuditEvents(id, EntityAuditEvent.EntityAuditAction.TAG_DELETE);
metadataService.updateEntityAttributeByGuid(id, "description", "new description");
assertAuditEvents(id, EntityAuditEvent.EntityAuditAction.ENTITY_UPDATE);
metadataService.deleteEntities(Arrays.asList(id));
assertAuditEvents(id, EntityAuditEvent.EntityAuditAction.ENTITY_DELETE);
}
private void assertAuditEvents(String id, EntityAuditEvent.EntityAuditAction expectedAction) throws Exception {
List<EntityAuditEvent> events = repository.listEvents(id, null, (short) 10);
for (EntityAuditEvent event : events) {
if (event.getAction() == expectedAction) {
return;
}
}
fail("Expected audit action " + expectedAction);
}
private void assertAuditEvents(String entityId, int numEvents) throws Exception {
List<EntityAuditEvent> events = repository.listEvents(entityId, null, (short)numEvents);
assertNotNull(events);
assertEquals(events.size(), numEvents);
} }
@Test
@@ -257,6 +267,10 @@ public class DefaultMetadataServiceTest {
Referenceable db = createDBEntity();
String dbId = createInstance(db);
//Assert that there is just 1 audit event and that's for entity create
assertAuditEvents(dbId, 1);
assertAuditEvents(dbId, EntityAuditEvent.EntityAuditAction.ENTITY_CREATE);
Referenceable table = new Referenceable(TestUtils.TABLE_TYPE);
table.set(NAME, TestUtils.randomString());
table.set("description", "random table");
@@ -272,6 +286,9 @@ public class DefaultMetadataServiceTest {
Referenceable tableDefinition = InstanceSerialization.fromJsonReferenceable(tableDefinitionJson, true);
Referenceable actualDb = (Referenceable) tableDefinition.get("databaseComposite");
Assert.assertEquals(actualDb.getId().id, dbId);
//Assert that as part of table create, the db is not created again and no audit event is added to the db
assertAuditEvents(dbId, 1);
} }
@Test
@@ -280,7 +297,8 @@ public class DefaultMetadataServiceTest {
Referenceable tableUpdated = new Referenceable(TestUtils.TABLE_TYPE, new HashMap<String, Object>() {{
put("columnNames", colNameList);
}});
metadataService.updateEntityByUniqueAttribute(table.getTypeName(), NAME, (String) table.get(NAME),
tableUpdated);
String tableDefinitionJson =
metadataService.getEntityDefinition(TestUtils.TABLE_TYPE, NAME, (String) table.get(NAME));
@@ -291,7 +309,6 @@ public class DefaultMetadataServiceTest {
@Test
public void testUpdateEntityWithMap() throws Exception {
final Map<String, Struct> partsMap = new HashMap<>();
partsMap.put("part0", new Struct(TestUtils.PARTITION_STRUCT_TYPE,
new HashMap<String, Object>() {{
@@ -599,8 +616,8 @@ public class DefaultMetadataServiceTest {
//ATLAS-383 Test
Referenceable sdReferenceable = new Referenceable(TestUtils.STORAGE_DESC_TYPE);
sdReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, TestUtils.randomString());
sdReferenceable.set("compressed", "false");
sdReferenceable.set("location", "hdfs://tmp/hive-user");
String sdGuid = createInstance(sdReferenceable);
Referenceable sdRef2 = new Referenceable(sdGuid, TestUtils.STORAGE_DESC_TYPE, null);
@@ -631,7 +648,7 @@ public class DefaultMetadataServiceTest {
metadataService.getEntityDefinition(tableId._getId());
Referenceable tableDefinition = InstanceSerialization.fromJsonReferenceable(tableDefinitionJson, true);
Assert.assertEquals(dbId, (((Id) tableDefinition.get("database"))._getId()));
/* Update with referenceable - TODO - Fails. Need to fix this */
/*final String dbName = TestUtils.randomString();
@@ -786,7 +803,7 @@ public class DefaultMetadataServiceTest {
//Update optional Attribute
Assert.assertNotNull(tableDefinition.get("created"));
//Update optional attribute
table.setNull("created");
String newtableId = updateInstance(table);
Assert.assertEquals(newtableId, tableId._getId());
@@ -798,7 +815,7 @@ public class DefaultMetadataServiceTest {
}
@Test
public void testCreateEntityWithEnum() throws Exception {
String tableDefinitionJson =
metadataService.getEntityDefinition(TestUtils.TABLE_TYPE, NAME, (String) table.get(NAME));
Referenceable tableDefinition = InstanceSerialization.fromJsonReferenceable(tableDefinitionJson, true);
@@ -1006,6 +1023,57 @@ public class DefaultMetadataServiceTest {
Assert.assertNotNull(typeDefinition);
}
@Test
public void testAuditEventsInvalidParams() throws Exception {
//entity id can't be null
try {
metadataService.getAuditEvents(null, "key", (short) 10);
fail("expected IllegalArgumentException");
} catch(IllegalArgumentException e) {
//expected IllegalArgumentException
assertEquals(e.getMessage(), "entity id cannot be null");
}
//entity id can't be empty
try {
metadataService.getAuditEvents("", "key", (short) 10);
fail("expected IllegalArgumentException");
} catch(IllegalArgumentException e) {
//expected IllegalArgumentException
assertEquals(e.getMessage(), "entity id cannot be empty");
}
//start key can be null
metadataService.getAuditEvents("id", null, (short) 10);
//start key can't be empty
try {
metadataService.getAuditEvents("id", "", (short) 10);
fail("expected IllegalArgumentException");
} catch(IllegalArgumentException e) {
//expected IllegalArgumentException
assertEquals(e.getMessage(), "start key cannot be empty");
}
//number of results can't be > max value
try {
metadataService.getAuditEvents("id", "key", (short) 10000);
fail("expected IllegalArgumentException");
} catch(IllegalArgumentException e) {
//expected IllegalArgumentException
assertEquals(e.getMessage(), "count should be <= 1000, current value 10000");
}
//number of results can't be <= 0
try {
metadataService.getAuditEvents("id", "key", (short) -1);
fail("expected IllegalArgumentException");
} catch(IllegalArgumentException e) {
//expected IllegalArgumentException
assertEquals(e.getMessage(), "count should be > 0, current value -1");
}
}
private static class DeleteEntitiesChangeListener implements EntityChangeListener {
private Collection<ITypedReferenceableInstance> deletedEntities_;
...
@@ -52,10 +52,12 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
</dependency>
<dependency>
<groupId>org.apache.atlas</groupId>
<artifactId>atlas-client</artifactId>
...
@@ -19,6 +19,7 @@
package org.apache.atlas.services;
import org.apache.atlas.AtlasException;
import org.apache.atlas.EntityAuditEvent;
import org.apache.atlas.listener.EntityChangeListener;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.types.DataTypes;
@@ -207,4 +208,13 @@ public interface MetadataService {
* @throws AtlasException
*/
List<String> deleteEntityByUniqueAttribute(String typeName, String uniqueAttributeName, String attrValue) throws AtlasException;
/**
* Returns entity audit events for entity id in the decreasing order of timestamp
* @param guid entity id
* @param startKey key for the first event, used for pagination
* @param count number of events to be returned
* @return list of audit events for the entity id in decreasing order of timestamp
*/
List<EntityAuditEvent> getAuditEvents(String guid, String startKey, short count) throws AtlasException;
}
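A usage sketch for the new contract (illustrative only; pageSize, guid and process() are assumed names): request one event more than needed, and feed the extra event's key back as the inclusive startKey of the next call, as the javadoc above prescribes.

    short pageSize = 25;
    String startKey = null;  // null fetches the latest events first
    while (true) {
        // count = pageSize + 1: the extra event, if present, seeds the next page
        List<EntityAuditEvent> batch = metadataService.getAuditEvents(guid, startKey, (short) (pageSize + 1));
        boolean hasMore = batch.size() > pageSize;
        List<EntityAuditEvent> page = hasMore ? batch.subList(0, pageSize) : batch;
        process(page);  // application-specific handling (assumed)
        if (!hasMore) {
            break;
        }
        // startKey is inclusive, so the extra event is returned again as the head of the next batch
        startKey = batch.get(pageSize).getEventKey();
    }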
@@ -42,6 +42,7 @@ import org.apache.atlas.web.filters.ActiveServerFilter;
import org.apache.atlas.web.filters.AtlasAuthenticationFilter;
import org.apache.atlas.web.filters.AuditFilter;
import org.apache.atlas.web.service.ActiveInstanceElectorModule;
import org.apache.atlas.web.service.ServiceModule;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.slf4j.Logger;
@@ -76,7 +77,7 @@ public class GuiceServletConfig extends GuiceServletContextListener {
loginProcessor.login();
injector = Guice.createInjector(getRepositoryModule(), new ActiveInstanceElectorModule(),
new NotificationModule(), new ServiceModule(), new JerseyServletModule() {
private Configuration appConfiguration = null;
...
@@ -21,12 +21,13 @@ package org.apache.atlas.web.resources;
import com.google.common.base.Preconditions;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasException;
import org.apache.atlas.EntityAuditEvent;
import org.apache.atlas.services.MetadataService;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.exception.EntityExistsException;
import org.apache.atlas.typesystem.exception.EntityNotFoundException;
import org.apache.atlas.typesystem.exception.TraitNotFoundException;
import org.apache.atlas.typesystem.exception.TypeNotFoundException;
import org.apache.atlas.typesystem.json.InstanceSerialization;
import org.apache.atlas.typesystem.types.ValueConversionException;
import org.apache.atlas.utils.ParamChecker;
@@ -43,6 +44,7 @@ import javax.inject.Singleton;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
@@ -52,11 +54,13 @@ import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriBuilder;
import javax.ws.rs.core.UriInfo;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@@ -95,7 +99,7 @@ public class EntityResource {
* unique attribute for the given type.
*/
@POST
@Consumes({Servlets.JSON_MEDIA_TYPE, MediaType.APPLICATION_JSON})
@Produces(Servlets.JSON_MEDIA_TYPE)
public Response submit(@Context HttpServletRequest request) {
try {
@@ -150,7 +154,7 @@ public class EntityResource {
* @return response payload as json
*/
@PUT
@Consumes({Servlets.JSON_MEDIA_TYPE, MediaType.APPLICATION_JSON})
@Produces(Servlets.JSON_MEDIA_TYPE)
public Response updateEntities(@Context HttpServletRequest request) {
try {
@@ -195,7 +199,7 @@ public class EntityResource {
*/
@POST
@Path("qualifiedName")
@Consumes({Servlets.JSON_MEDIA_TYPE, MediaType.APPLICATION_JSON})
@Produces(Servlets.JSON_MEDIA_TYPE)
public Response updateByUniqueAttribute(@QueryParam("type") String entityType,
@QueryParam("property") String attribute,
@@ -242,7 +246,7 @@ public class EntityResource {
*/
@POST
@Path("{guid}")
@Consumes({Servlets.JSON_MEDIA_TYPE, MediaType.APPLICATION_JSON})
@Produces(Servlets.JSON_MEDIA_TYPE)
public Response updateEntityByGuid(@PathParam("guid") String guid, @QueryParam("property") String attribute,
@Context HttpServletRequest request) {
@@ -327,7 +331,6 @@ public class EntityResource {
* @return response payload as json - including guids of entities (including composite references from that entity) that were deleted
*/
@DELETE
@Produces(Servlets.JSON_MEDIA_TYPE)
public Response deleteEntities(@QueryParam("guid") List<String> guids,
@QueryParam("type") String entityType,
@@ -439,6 +442,7 @@ public class EntityResource {
}
@GET
@Consumes({Servlets.JSON_MEDIA_TYPE, MediaType.APPLICATION_JSON})
@Produces(Servlets.JSON_MEDIA_TYPE)
public Response getEntity(@QueryParam("type") String entityType,
@QueryParam("property") String attribute,
@@ -537,7 +541,7 @@ public class EntityResource {
*/
@POST
@Path("{guid}/traits")
@Consumes({Servlets.JSON_MEDIA_TYPE, MediaType.APPLICATION_JSON})
@Produces(Servlets.JSON_MEDIA_TYPE)
public Response addTrait(@Context HttpServletRequest request, @PathParam("guid") String guid) {
try {
@@ -573,7 +577,7 @@ public class EntityResource {
*/
@DELETE
@Path("{guid}/traits/{traitName}")
@Consumes({Servlets.JSON_MEDIA_TYPE, MediaType.APPLICATION_JSON})
@Produces(Servlets.JSON_MEDIA_TYPE)
public Response deleteTrait(@Context HttpServletRequest request, @PathParam("guid") String guid,
@PathParam(TRAIT_NAME) String traitName) {
@@ -601,4 +605,45 @@ public class EntityResource {
throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.INTERNAL_SERVER_ERROR));
}
}
/**
* Returns the entity audit events for a given entity id. The events are returned in the decreasing order of timestamp.
* @param guid entity id
* @param startKey used for pagination. startKey is inclusive: the returned results contain the event with the given startKey.
* First time getAuditEvents() is called for an entity, startKey should be null,
* with count = (number of events required + 1). Next time getAuditEvents() is called for the same entity,
* startKey should be equal to the entityKey of the last event returned in the previous call.
* @param count number of events required
* @return response payload as json - list of entity audit events
*/
@GET
@Path("{guid}/audit")
@Produces(Servlets.JSON_MEDIA_TYPE)
public Response getAuditEvents(@PathParam("guid") String guid, @QueryParam("startKey") String startKey,
@QueryParam("count") @DefaultValue("100") short count) {
LOG.debug("Audit events request for entity {}, start key {}, number of results required {}", guid, startKey,
count);
try {
List<EntityAuditEvent> events = metadataService.getAuditEvents(guid, startKey, count);
JSONObject response = new JSONObject();
response.put(AtlasClient.REQUEST_ID, Servlets.getRequestId());
response.put(AtlasClient.EVENTS, getJSONArray(events));
return Response.ok(response).build();
} catch (AtlasException | IllegalArgumentException e) {
LOG.error("Unable to get audit events for entity {}", guid, e);
throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.BAD_REQUEST));
} catch (Throwable e) {
LOG.error("Unable to get audit events for entity {}", guid, e);
throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.INTERNAL_SERVER_ERROR));
}
}
private <T> JSONArray getJSONArray(Collection<T> elements) throws JSONException {
JSONArray jsonArray = new JSONArray();
for(T element : elements) {
jsonArray.put(new JSONObject(element.toString()));
}
return jsonArray;
}
}
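For a quick smoke test of the endpoint from Java, the client API exercised by the integration test below can be used directly; a minimal sketch, assuming a server at the default REST address (http://localhost:21000) and an existing entity guid:

    // Fetches up to 25 of the latest audit events; resolves to GET .../entities/{guid}/audit on the wire
    AtlasClient client = new AtlasClient("http://localhost:21000");
    List<EntityAuditEvent> events = client.getEntityAuditEvents(guid, (short) 25);
    for (EntityAuditEvent event : events) {
        System.out.println(event.getTimestamp() + " " + event.getAction() + " " + event.getDetails());
    }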
@@ -45,6 +45,7 @@ import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.util.List;
@@ -76,7 +77,7 @@ public class TypesResource {
* domain. Could represent things like Hive Database, Hive Table, etc.
*/
@POST
@Consumes({Servlets.JSON_MEDIA_TYPE, MediaType.APPLICATION_JSON})
@Produces(Servlets.JSON_MEDIA_TYPE)
public Response submit(@Context HttpServletRequest request) {
try {
@@ -120,7 +121,7 @@ public class TypesResource {
* @return
*/
@PUT
@Consumes({Servlets.JSON_MEDIA_TYPE, MediaType.APPLICATION_JSON})
@Produces(Servlets.JSON_MEDIA_TYPE)
public Response update(@Context HttpServletRequest request) {
try {
...
@@ -22,6 +22,7 @@ import com.google.inject.AbstractModule;
import com.google.inject.multibindings.Multibinder;
import org.apache.atlas.listener.ActiveStateChangeHandler;
import org.apache.atlas.notification.NotificationHookConsumer;
import org.apache.atlas.repository.audit.HBaseBasedAuditRepository;
import org.apache.atlas.repository.graph.GraphBackedSearchIndexer;
import org.apache.atlas.service.Service;
import org.apache.atlas.services.DefaultMetadataService;
@@ -39,8 +40,7 @@ public class ActiveInstanceElectorModule extends AbstractModule {
activeStateChangeHandlerBinder.addBinding().to(GraphBackedSearchIndexer.class);
activeStateChangeHandlerBinder.addBinding().to(DefaultMetadataService.class);
activeStateChangeHandlerBinder.addBinding().to(NotificationHookConsumer.class);
activeStateChangeHandlerBinder.addBinding().to(HBaseBasedAuditRepository.class);
Multibinder<Service> serviceBinder = Multibinder.newSetBinder(binder(), Service.class);
serviceBinder.addBinding().to(ActiveInstanceElectorService.class);
...
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.web.service;
import com.google.inject.AbstractModule;
import com.google.inject.multibindings.Multibinder;
import org.apache.atlas.kafka.KafkaNotification;
import org.apache.atlas.listener.EntityChangeListener;
import org.apache.atlas.notification.NotificationHookConsumer;
import org.apache.atlas.notification.entity.NotificationEntityChangeListener;
import org.apache.atlas.service.Service;
public class ServiceModule extends AbstractModule {
@Override
protected void configure() {
Multibinder<Service> serviceBinder = Multibinder.newSetBinder(binder(), Service.class);
serviceBinder.addBinding().to(KafkaNotification.class);
serviceBinder.addBinding().to(NotificationHookConsumer.class);
//Add NotificationEntityChangeListener as EntityChangeListener
Multibinder<EntityChangeListener> entityChangeListenerBinder =
Multibinder.newSetBinder(binder(), EntityChangeListener.class);
entityChangeListenerBinder.addBinding().to(NotificationEntityChangeListener.class);
}
}
@@ -26,6 +26,7 @@ import com.sun.jersey.api.client.WebResource;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.EntityAuditEvent;
import org.apache.atlas.notification.NotificationConsumer;
import org.apache.atlas.notification.NotificationInterface;
import org.apache.atlas.notification.NotificationModule;
@@ -146,6 +147,7 @@ public class EntityJerseyResourceIT extends BaseResourceIT {
db.set("description", randomString());
final String dbid = serviceClient.createEntity(db).getString(0);
assertEntityAudit(dbid, EntityAuditEvent.EntityAuditAction.ENTITY_CREATE);
waitForNotification(notificationConsumer, MAX_WAIT_TIME, new NotificationPredicate() {
@Override
@@ -187,6 +189,17 @@ public class EntityJerseyResourceIT extends BaseResourceIT {
assertEquals(results.length(), 1);
}
private void assertEntityAudit(String dbid, EntityAuditEvent.EntityAuditAction auditAction)
throws Exception {
List<EntityAuditEvent> events = serviceClient.getEntityAuditEvents(dbid, (short) 100);
for (EntityAuditEvent event : events) {
if (event.getAction() == auditAction) {
return;
}
}
fail("Expected audit event with action = " + auditAction);
}
@Test
public void testEntityDefinitionAcrossTypeUpdate() throws Exception {
//create type
@@ -478,6 +491,8 @@ public class EntityJerseyResourceIT extends BaseResourceIT {
JSONObject response = new JSONObject(responseAsString);
Assert.assertNotNull(response.get(AtlasClient.REQUEST_ID));
Assert.assertNotNull(response.get(AtlasClient.GUID));
assertEntityAudit(guid, EntityAuditEvent.EntityAuditAction.TAG_ADD);
}
@Test(dependsOnMethods = "testAddTrait")
@@ -576,6 +591,7 @@ public class EntityJerseyResourceIT extends BaseResourceIT {
Assert.assertNotNull(response.get(AtlasClient.REQUEST_ID));
Assert.assertNotNull(response.get("GUID"));
Assert.assertNotNull(response.get("traitName"));
assertEntityAudit(guid, EntityAuditEvent.EntityAuditAction.TAG_DELETE);
}
@Test
...