Commit bca454e1 by Shwetha GS

ATLAS-577 Integrate entity audit with DefaultMetadataService (shwethags)

parent 985465fc
No related merge requests found
......@@ -205,13 +205,18 @@
<daemon>true</daemon>
<webApp>
<contextPath>/</contextPath>
<descriptor>../../webapp/src/test/webapp/WEB-INF/web.xml</descriptor>
<descriptor>${project.basedir}/../../webapp/src/test/webapp/WEB-INF/web.xml</descriptor>
<extraClasspath>${project.basedir}/../../webapp/target/test-classes/</extraClasspath>
</webApp>
<useTestScope>true</useTestScope>
<systemProperties>
<systemProperty>
<name>log4j.configuration</name>
<value>atlas-log4j.xml</value>
<value>file://${project.basedir}/../../distro/src/conf/atlas-log4j.xml</value>
</systemProperty>
<systemProperty>
<name>atlas.log.file</name>
<value>application.log</value>
</systemProperty>
<systemProperty>
<name>atlas.log.dir</name>
......
......@@ -20,22 +20,17 @@ package org.apache.atlas.falcon.hook;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Guice;
import com.google.inject.Inject;
import com.google.inject.Injector;
import com.sun.jersey.api.client.ClientResponse;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.falcon.model.FalconDataModelGenerator;
import org.apache.atlas.falcon.model.FalconDataTypes;
import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
import org.apache.atlas.hive.model.HiveDataModelGenerator;
import org.apache.atlas.hive.model.HiveDataTypes;
import org.apache.atlas.hook.AtlasHook;
import org.apache.atlas.notification.NotificationInterface;
import org.apache.atlas.notification.NotificationModule;
import org.apache.atlas.notification.hook.HookNotification;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.StringUtils;
import org.apache.falcon.atlas.Util.EventUtil;
import org.apache.falcon.atlas.event.FalconEvent;
......@@ -50,8 +45,7 @@ import org.apache.falcon.entity.v0.process.Cluster;
import org.apache.falcon.entity.v0.process.Input;
import org.apache.falcon.entity.v0.process.Output;
import org.apache.falcon.entity.v0.process.Process;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.falcon.security.CurrentUser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -65,7 +59,7 @@ import java.util.concurrent.TimeUnit;
/**
* Falcon hook sends lineage information to the Atlas Service.
*/
public class FalconHook extends FalconEventPublisher {
public class FalconHook extends AtlasHook implements FalconEventPublisher {
private static final Logger LOG = LoggerFactory.getLogger(FalconHook.class);
public static final String CONF_PREFIX = "atlas.hook.falcon.";
......@@ -77,10 +71,6 @@ public class FalconHook extends FalconEventPublisher {
public static final String HOOK_NUM_RETRIES = CONF_PREFIX + "numRetries";
public static final String ATLAS_ENDPOINT = "atlas.rest.address";
private static AtlasClient atlasClient;
// wait time determines how long we wait before we exit the jvm on
// shutdown. Pending requests after that will not be sent.
private static final int WAIT_TIME = 3;
......@@ -91,20 +81,12 @@ public class FalconHook extends FalconEventPublisher {
private static final long keepAliveTimeDefault = 10;
private static final int queueSizeDefault = 10000;
private static Configuration atlasProperties;
@Inject
private static NotificationInterface notifInterface;
public static boolean typesRegistered = false;
private static boolean sync;
private static ConfigurationStore STORE;
static {
try {
atlasProperties = ApplicationProperties.get();
// initialize the async facility to process hook calls. We don't
// want to do this inline since it adds plenty of overhead for the query.
int minThreads = atlasProperties.getInt(MIN_THREADS, minThreadsDefault);
......@@ -130,8 +112,6 @@ public class FalconHook extends FalconEventPublisher {
// shutdown client
}
});
atlasClient = new AtlasClient(atlasProperties.getString(ATLAS_ENDPOINT),
EventUtil.getUgi(), EventUtil.getUgi().getShortUserName());
STORE = ConfigurationStore.get();
} catch (Exception e) {
......@@ -166,7 +146,17 @@ public class FalconHook extends FalconEventPublisher {
private void fireAndForget(FalconEvent event) throws Exception {
LOG.info("Entered Atlas hook for Falcon hook operation {}", event.getOperation());
notifyEntity(createEntities(event));
notifyEntities(getAuthenticatedUser(), createEntities(event));
}
private String getAuthenticatedUser() {
String user = null;
try {
user = CurrentUser.getAuthenticatedUser();
} catch (IllegalArgumentException e) {
LOG.warn("Failed to get user from CurrentUser.getAuthenticatedUser");
}
return getUser(user, null);
}
private List<Referenceable> createEntities(FalconEvent event) throws Exception {
......@@ -179,36 +169,6 @@ public class FalconHook extends FalconEventPublisher {
}
/**
* Notify atlas of the entity through message. The entity can be a complex entity with reference to other entities.
* De-duping of entities is done on server side depending on the unique attribute on the
*
* @param entities entitiies to add
*/
private void notifyEntity(List<Referenceable> entities) {
int maxRetries = atlasProperties.getInt(HOOK_NUM_RETRIES, 3);
String message = entities.toString();
int numRetries = 0;
while (true) {
try {
notifInterface.send(NotificationInterface.NotificationType.HOOK,
new HookNotification.EntityCreateRequest(entities));
return;
} catch (Exception e) {
numRetries++;
if (numRetries < maxRetries) {
LOG.debug("Failed to notify atlas for entity {}. Retrying", message, e);
} else {
LOG.error("Failed to notify atlas for entity {} after {} retries. Quitting", message,
maxRetries, e);
break;
}
}
}
}
/**
+ * Creates process entity
+ *
+ * @param event process entity event
......@@ -324,32 +284,9 @@ public class FalconHook extends FalconEventPublisher {
return entities;
}
public synchronized void registerFalconDataModel() throws Exception {
if (isDataModelAlreadyRegistered()) {
LOG.info("Falcon data model is already registered!");
return;
}
HiveMetaStoreBridge hiveMetaStoreBridge = new HiveMetaStoreBridge(new HiveConf(), atlasProperties,
UserGroupInformation.getCurrentUser().getShortUserName(), UserGroupInformation.getCurrentUser());
hiveMetaStoreBridge.registerHiveDataModel();
FalconDataModelGenerator dataModelGenerator = new FalconDataModelGenerator();
LOG.info("Registering Falcon data model");
atlasClient.createType(dataModelGenerator.getModelAsJson());
}
private boolean isDataModelAlreadyRegistered() throws Exception {
try {
atlasClient.getType(FalconDataTypes.FALCON_PROCESS_ENTITY.getName());
LOG.info("Hive data model is already registered!");
return true;
} catch(AtlasServiceException ase) {
if (ase.getStatus() == ClientResponse.Status.NOT_FOUND) {
return false;
}
throw ase;
}
@Override
protected String getNumberOfRetriesPropertyKey() {
return HOOK_NUM_RETRIES;
}
}
......@@ -24,8 +24,8 @@ import org.apache.falcon.atlas.event.FalconEvent;
/**
* Falcon publisher for Atlas
*/
public abstract class FalconEventPublisher {
public static class Data {
public interface FalconEventPublisher {
class Data {
private FalconEvent event;
public Data(FalconEvent event) {
......@@ -37,5 +37,5 @@ public abstract class FalconEventPublisher {
}
}
public abstract void publish(final Data data) throws Exception;
void publish(final Data data) throws Exception;
}
......@@ -18,12 +18,16 @@
package org.apache.atlas.falcon.hook;
import com.sun.jersey.api.client.ClientResponse;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.falcon.model.FalconDataModelGenerator;
import org.apache.atlas.falcon.model.FalconDataTypes;
import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.persistence.Id;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.falcon.atlas.service.AtlasService;
import org.apache.falcon.entity.store.ConfigurationStore;
......@@ -33,6 +37,8 @@ import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.process.Process;
import org.apache.falcon.security.CurrentUser;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.security.UserGroupInformation;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
......@@ -54,21 +60,51 @@ public class FalconHookIT {
public static final String FEED_HDFS_RESOURCE = "/feed-hdfs.xml";
public static final String PROCESS_RESOURCE = "/process.xml";
private AtlasClient dgiCLient;
private AtlasClient atlasClient;
private static final ConfigurationStore STORE = ConfigurationStore.get();
private Configuration atlasProperties;
@BeforeClass
public void setUp() throws Exception {
dgiCLient = new AtlasClient(ApplicationProperties.get().getString("atlas.rest.address"));
atlasProperties = ApplicationProperties.get();
atlasClient = new AtlasClient(atlasProperties.getString("atlas.rest.address"));
AtlasService service = new AtlasService();
service.init();
STORE.registerListener(service);
new FalconHook().registerFalconDataModel();
registerFalconDataModel();
CurrentUser.authenticate(System.getProperty("user.name"));
}
private void registerFalconDataModel() throws Exception {
if (isDataModelAlreadyRegistered()) {
LOG.info("Falcon data model is already registered!");
return;
}
HiveMetaStoreBridge hiveMetaStoreBridge = new HiveMetaStoreBridge(new HiveConf(), atlasProperties,
UserGroupInformation.getCurrentUser().getShortUserName(), UserGroupInformation.getCurrentUser());
hiveMetaStoreBridge.registerHiveDataModel();
FalconDataModelGenerator dataModelGenerator = new FalconDataModelGenerator();
LOG.info("Registering Falcon data model");
atlasClient.createType(dataModelGenerator.getModelAsJson());
}
private boolean isDataModelAlreadyRegistered() throws Exception {
try {
atlasClient.getType(FalconDataTypes.FALCON_PROCESS_ENTITY.getName());
LOG.info("Hive data model is already registered!");
return true;
} catch(AtlasServiceException ase) {
if (ase.getStatus() == ClientResponse.Status.NOT_FOUND) {
return false;
}
throw ase;
}
}
private <T extends Entity> T loadEntity(EntityType type, String resource, String name) throws JAXBException {
Entity entity = (Entity) type.getUnmarshaller().unmarshal(this.getClass().getResourceAsStream(resource));
switch (entity.getEntityType()) {
......@@ -115,17 +151,17 @@ public class FalconHookIT {
STORE.publish(EntityType.PROCESS, process);
String pid = assertProcessIsRegistered(cluster.getName(), process.getName());
Referenceable processEntity = dgiCLient.getEntity(pid);
Referenceable processEntity = atlasClient.getEntity(pid);
assertNotNull(processEntity);
assertEquals(processEntity.get("processName"), process.getName());
Id inId = (Id) ((List)processEntity.get("inputs")).get(0);
Referenceable inEntity = dgiCLient.getEntity(inId._getId());
Referenceable inEntity = atlasClient.getEntity(inId._getId());
assertEquals(inEntity.get("name"),
HiveMetaStoreBridge.getTableQualifiedName(cluster.getName(), inDbName, inTableName));
Id outId = (Id) ((List)processEntity.get("outputs")).get(0);
Referenceable outEntity = dgiCLient.getEntity(outId._getId());
Referenceable outEntity = atlasClient.getEntity(outId._getId());
assertEquals(outEntity.get("name"),
HiveMetaStoreBridge.getTableQualifiedName(cluster.getName(), outDbName, outTableName));
}
......@@ -173,12 +209,12 @@ public class FalconHookIT {
STORE.publish(EntityType.PROCESS, process);
String pid = assertProcessIsRegistered(cluster.getName(), process.getName());
Referenceable processEntity = dgiCLient.getEntity(pid);
Referenceable processEntity = atlasClient.getEntity(pid);
assertEquals(processEntity.get("processName"), process.getName());
assertNull(processEntity.get("inputs"));
Id outId = (Id) ((List)processEntity.get("outputs")).get(0);
Referenceable outEntity = dgiCLient.getEntity(outId._getId());
Referenceable outEntity = atlasClient.getEntity(outId._getId());
assertEquals(outEntity.get("name"),
HiveMetaStoreBridge.getTableQualifiedName(cluster.getName(), outDbName, outTableName));
}
......@@ -209,13 +245,13 @@ public class FalconHookIT {
waitFor(2000000, new Predicate() {
@Override
public boolean evaluate() throws Exception {
JSONArray results = dgiCLient.search(query);
JSONArray results = atlasClient.search(query);
System.out.println(results);
return results.length() == 1;
}
});
JSONArray results = dgiCLient.search(query);
JSONArray results = atlasClient.search(query);
JSONObject row = results.getJSONObject(0).getJSONObject("t");
return row.getString("id");
......
......@@ -283,13 +283,18 @@
<daemon>true</daemon>
<webApp>
<contextPath>/</contextPath>
<descriptor>../../webapp/src/test/webapp/WEB-INF/web.xml</descriptor>
<descriptor>${project.basedir}/../../webapp/src/test/webapp/WEB-INF/web.xml</descriptor>
<extraClasspath>${project.basedir}/../../webapp/target/test-classes/</extraClasspath>
</webApp>
<useTestScope>true</useTestScope>
<systemProperties>
<systemProperty>
<name>log4j.configuration</name>
<value>atlas-log4j.xml</value>
<value>file://${project.basedir}/../../distro/src/conf/atlas-log4j.xml</value>
</systemProperty>
<systemProperty>
<name>atlas.log.file</name>
<value>application.log</value>
</systemProperty>
<systemProperty>
<name>atlas.log.dir</name>
......
......@@ -20,14 +20,12 @@ package org.apache.atlas.hive.hook;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
import org.apache.atlas.hive.model.HiveDataModelGenerator;
import org.apache.atlas.hive.model.HiveDataTypes;
import org.apache.atlas.hook.AtlasHook;
import org.apache.atlas.notification.hook.HookNotification;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.Database;
......@@ -86,8 +84,6 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
private static final long keepAliveTimeDefault = 10;
private static final int queueSizeDefault = 10000;
private static Configuration atlasProperties;
class HiveEvent {
public Set<ReadEntity> inputs;
public Set<WriteEntity> outputs;
......@@ -108,8 +104,6 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
static {
try {
atlasProperties = ApplicationProperties.get();
// initialize the async facility to process hook calls. We don't
// want to do this inline since it adds plenty of overhead for the query.
int minThreads = atlasProperties.getInt(MIN_THREADS, minThreadsDefault);
......@@ -166,7 +160,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
event.inputs = hookContext.getInputs();
event.outputs = hookContext.getOutputs();
event.user = hookContext.getUserName() == null ? hookContext.getUgi().getUserName() : hookContext.getUserName();
event.user = getUser(hookContext.getUserName(), hookContext.getUgi());
event.ugi = hookContext.getUgi();
event.operation = OPERATION_MAP.get(hookContext.getOperationName());
event.hookType = hookContext.getHookType();
......@@ -258,7 +252,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
for (WriteEntity writeEntity : event.outputs) {
if (writeEntity.getType() == Type.DATABASE) {
//Create/update table entity
createOrUpdateEntities(dgiBridge, writeEntity);
createOrUpdateEntities(dgiBridge, event.user, writeEntity);
}
}
}
......@@ -271,7 +265,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
//Below check should filter out partition related
if (writeEntity.getType() == Entity.Type.TABLE) {
//Create/update table entity
createOrUpdateEntities(dgiBridge, writeEntity);
createOrUpdateEntities(dgiBridge, event.user, writeEntity);
}
}
}
......@@ -292,7 +286,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
.equals(oldTable.getTableName())) {
//Create/update old table entity - create new entity and replace id
Referenceable tableEntity = createOrUpdateEntities(dgiBridge, writeEntity);
Referenceable tableEntity = createOrUpdateEntities(dgiBridge, event.user, writeEntity);
String oldQualifiedName = dgiBridge.getTableQualifiedName(dgiBridge.getClusterName(),
oldTable.getDbName(), oldTable.getTableName());
tableEntity.set(HiveDataModelGenerator.NAME, oldQualifiedName);
......@@ -304,14 +298,15 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
Referenceable newEntity = new Referenceable(HiveDataTypes.HIVE_TABLE.getName());
newEntity.set(HiveDataModelGenerator.NAME, newQualifiedName);
newEntity.set(HiveDataModelGenerator.TABLE_NAME, newTable.getTableName().toLowerCase());
messages.add(new HookNotification.EntityPartialUpdateRequest(HiveDataTypes.HIVE_TABLE.getName(),
HiveDataModelGenerator.NAME, oldQualifiedName, newEntity));
messages.add(new HookNotification.EntityPartialUpdateRequest(event.user,
HiveDataTypes.HIVE_TABLE.getName(), HiveDataModelGenerator.NAME,
oldQualifiedName, newEntity));
}
}
}
}
private Referenceable createOrUpdateEntities(HiveMetaStoreBridge dgiBridge, Entity entity) throws Exception {
private Referenceable createOrUpdateEntities(HiveMetaStoreBridge dgiBridge, String user, Entity entity) throws Exception {
Database db = null;
Table table = null;
Partition partition = null;
......@@ -351,14 +346,14 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
entities.add(partitionEntity);
}
messages.add(new HookNotification.EntityUpdateRequest(entities));
messages.add(new HookNotification.EntityUpdateRequest(user, entities));
return tableEntity;
}
private void handleEventOutputs(HiveMetaStoreBridge dgiBridge, HiveEvent event, Type entityType) throws Exception {
for (WriteEntity entity : event.outputs) {
if (entity.getType() == entityType) {
createOrUpdateEntities(dgiBridge, entity);
createOrUpdateEntities(dgiBridge, event.user, entity);
}
}
}
......@@ -396,7 +391,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
List<Referenceable> source = new ArrayList<>();
for (ReadEntity readEntity : inputs) {
if (readEntity.getType() == Type.TABLE || readEntity.getType() == Type.PARTITION) {
Referenceable inTable = createOrUpdateEntities(dgiBridge, readEntity);
Referenceable inTable = createOrUpdateEntities(dgiBridge, event.user, readEntity);
source.add(inTable);
}
}
......@@ -405,7 +400,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
List<Referenceable> target = new ArrayList<>();
for (WriteEntity writeEntity : outputs) {
if (writeEntity.getType() == Type.TABLE || writeEntity.getType() == Type.PARTITION) {
Referenceable outTable = createOrUpdateEntities(dgiBridge, writeEntity);
Referenceable outTable = createOrUpdateEntities(dgiBridge, event.user, writeEntity);
target.add(outTable);
}
}
......@@ -417,7 +412,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
//TODO set
processReferenceable.set("queryGraph", "queryGraph");
messages.add(new HookNotification.EntityCreateRequest(processReferenceable));
messages.add(new HookNotification.EntityCreateRequest(event.user, processReferenceable));
}
......@@ -432,6 +427,4 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
return new JSONObject();
}
}
}
......@@ -288,13 +288,18 @@
<daemon>true</daemon>
<webApp>
<contextPath>/</contextPath>
<descriptor>../../webapp/src/test/webapp/WEB-INF/web.xml</descriptor>
<descriptor>${project.basedir}/../../webapp/src/test/webapp/WEB-INF/web.xml</descriptor>
<extraClasspath>${project.basedir}/../../webapp/target/test-classes/</extraClasspath>
</webApp>
<useTestScope>true</useTestScope>
<systemProperties>
<systemProperty>
<name>log4j.configuration</name>
<value>atlas-log4j.xml</value>
<value>file://${project.basedir}/../../distro/src/conf/atlas-log4j.xml</value>
</systemProperty>
<systemProperty>
<name>atlas.log.file</name>
<value>application.log</value>
</systemProperty>
<systemProperty>
<name>atlas.log.dir</name>
......
......@@ -19,31 +19,24 @@
package org.apache.atlas.sqoop.hook;
import com.google.inject.Guice;
import com.google.inject.Inject;
import com.google.inject.Injector;
import com.sun.jersey.api.client.ClientResponse;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
import org.apache.atlas.hive.model.HiveDataModelGenerator;
import org.apache.atlas.hive.model.HiveDataTypes;
import org.apache.atlas.notification.NotificationInterface;
import org.apache.atlas.notification.NotificationModule;
import org.apache.atlas.hook.AtlasHook;
import org.apache.atlas.notification.hook.HookNotification;
import org.apache.atlas.sqoop.model.SqoopDataModelGenerator;
import org.apache.atlas.sqoop.model.SqoopDataTypes;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.sqoop.SqoopJobDataPublisher;
import org.apache.sqoop.util.ImportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
......@@ -55,43 +48,16 @@ import java.util.Properties;
public class SqoopHook extends SqoopJobDataPublisher {
private static final Logger LOG = LoggerFactory.getLogger(SqoopHook.class);
private static final String DEFAULT_DGI_URL = "http://localhost:21000/";
public static final String CONF_PREFIX = "atlas.hook.sqoop.";
public static final String HOOK_NUM_RETRIES = CONF_PREFIX + "numRetries";
public static final String ATLAS_CLUSTER_NAME = "atlas.cluster.name";
public static final String DEFAULT_CLUSTER_NAME = "primary";
public static final String ATLAS_REST_ADDRESS = "atlas.rest.address";
@Inject
private static NotificationInterface notifInterface;
static {
org.apache.hadoop.conf.Configuration.addDefaultResource("sqoop-site.xml");
}
synchronized void registerDataModels(AtlasClient client, Configuration atlasConf) throws Exception {
// Make sure hive model exists
HiveMetaStoreBridge hiveMetaStoreBridge = new HiveMetaStoreBridge(new HiveConf(), atlasConf,
UserGroupInformation.getCurrentUser().getShortUserName(), UserGroupInformation.getCurrentUser());
hiveMetaStoreBridge.registerHiveDataModel();
SqoopDataModelGenerator dataModelGenerator = new SqoopDataModelGenerator();
//Register sqoop data model if its not already registered
try {
client.getType(SqoopDataTypes.SQOOP_PROCESS.getName());
LOG.info("Sqoop data model is already registered!");
} catch(AtlasServiceException ase) {
if (ase.getStatus() == ClientResponse.Status.NOT_FOUND) {
//Expected in case types do not exist
LOG.info("Registering Sqoop data model");
client.createType(dataModelGenerator.getModelAsJson());
} else {
throw ase;
}
}
}
public Referenceable createHiveDatabaseInstance(String clusterName, String dbName)
throws Exception {
Referenceable dbRef = new Referenceable(HiveDataTypes.HIVE_DB.getName());
......@@ -182,12 +148,7 @@ public class SqoopHook extends SqoopJobDataPublisher {
@Override
public void publish(SqoopJobDataPublisher.Data data) throws Exception {
Injector injector = Guice.createInjector(new NotificationModule());
notifInterface = injector.getInstance(NotificationInterface.class);
Configuration atlasProperties = ApplicationProperties.get();
AtlasClient atlasClient = new AtlasClient(atlasProperties.getString(ATLAS_REST_ADDRESS, DEFAULT_DGI_URL),
UserGroupInformation.getCurrentUser(), UserGroupInformation.getCurrentUser().getShortUserName());
org.apache.hadoop.conf.Configuration sqoopConf = new org.apache.hadoop.conf.Configuration();
String clusterName = sqoopConf.get(ATLAS_CLUSTER_NAME, DEFAULT_CLUSTER_NAME);
......@@ -197,33 +158,9 @@ public class SqoopHook extends SqoopJobDataPublisher {
data.getHiveTable(), data.getHiveDB());
Referenceable procRef = createSqoopProcessInstance(dbStoreRef, hiveTableRef, data, clusterName);
notifyEntity(atlasProperties, dbStoreRef, dbRef, hiveTableRef, procRef);
}
/**
* Notify atlas of the entity through message. The entity can be a complex entity with reference to other entities.
* De-duping of entities is done on server side depending on the unique attribute on the
* @param entities - Entity references to publish.
*/
private void notifyEntity(Configuration atlasProperties, Referenceable... entities) {
int maxRetries = atlasProperties.getInt(HOOK_NUM_RETRIES, 3);
int numRetries = 0;
while (true) {
try {
notifInterface.send(NotificationInterface.NotificationType.HOOK,
new HookNotification.EntityCreateRequest(entities));
return;
} catch(Exception e) {
numRetries++;
if(numRetries < maxRetries) {
LOG.debug("Failed to notify atlas for entity {}. Retrying", entities, e);
} else {
LOG.error("Failed to notify atlas for entity {} after {} retries. Quitting", entities,
maxRetries, e);
break;
}
}
}
HookNotification.HookNotificationMessage message =
new HookNotification.EntityCreateRequest(AtlasHook.getUser(), dbStoreRef, dbRef, hiveTableRef, procRef);
AtlasHook.notifyEntities(Arrays.asList(message), maxRetries);
}
}
......@@ -18,11 +18,17 @@
package org.apache.atlas.sqoop.hook;
import com.sun.jersey.api.client.ClientResponse;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
import org.apache.atlas.hive.model.HiveDataTypes;
import org.apache.atlas.sqoop.model.SqoopDataModelGenerator;
import org.apache.atlas.sqoop.model.SqoopDataTypes;
import org.apache.commons.configuration.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.sqoop.SqoopJobDataPublisher;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONObject;
......@@ -44,7 +50,29 @@ public class SqoopHookIT {
//Set-up sqoop session
Configuration configuration = ApplicationProperties.get();
dgiCLient = new AtlasClient(configuration.getString("atlas.rest.address"));
new SqoopHook().registerDataModels(dgiCLient, configuration);
registerDataModels(dgiCLient, configuration);
}
private void registerDataModels(AtlasClient client, Configuration atlasConf) throws Exception {
// Make sure hive model exists
HiveMetaStoreBridge hiveMetaStoreBridge = new HiveMetaStoreBridge(new HiveConf(), atlasConf,
UserGroupInformation.getCurrentUser().getShortUserName(), UserGroupInformation.getCurrentUser());
hiveMetaStoreBridge.registerHiveDataModel();
SqoopDataModelGenerator dataModelGenerator = new SqoopDataModelGenerator();
//Register sqoop data model if its not already registered
try {
client.getType(SqoopDataTypes.SQOOP_PROCESS.getName());
LOG.info("Sqoop data model is already registered!");
} catch(AtlasServiceException ase) {
if (ase.getStatus() == ClientResponse.Status.NOT_FOUND) {
//Expected in case types do not exist
LOG.info("Registering Sqoop data model");
client.createType(dataModelGenerator.getModelAsJson());
} else {
throw ase;
}
}
}
@Test
......
......@@ -318,13 +318,18 @@
<daemon>true</daemon>
<webApp>
<contextPath>/</contextPath>
<descriptor>../../webapp/src/test/webapp/WEB-INF/web.xml</descriptor>
<descriptor>${project.basedir}/../../webapp/src/test/webapp/WEB-INF/web.xml</descriptor>
<extraClasspath>${project.basedir}/../../webapp/target/test-classes/</extraClasspath>
</webApp>
<useTestScope>true</useTestScope>
<systemProperties>
<systemProperty>
<name>log4j.configuration</name>
<value>atlas-log4j.xml</value>
<value>file://${project.basedir}/../../distro/src/conf/atlas-log4j.xml</value>
</systemProperty>
<systemProperty>
<name>atlas.log.file</name>
<value>application.log</value>
</systemProperty>
<systemProperty>
<name>atlas.log.dir</name>
......
......@@ -24,20 +24,13 @@ import backtype.storm.generated.SpoutSpec;
import backtype.storm.generated.StormTopology;
import backtype.storm.generated.TopologyInfo;
import backtype.storm.utils.Utils;
import com.sun.jersey.api.client.ClientResponse;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasConstants;
import org.apache.atlas.AtlasException;
import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
import org.apache.atlas.hive.model.HiveDataModelGenerator;
import org.apache.atlas.hive.model.HiveDataTypes;
import org.apache.atlas.hook.AtlasHook;
import org.apache.atlas.storm.model.StormDataModel;
import org.apache.atlas.storm.model.StormDataTypes;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.TypesDef;
import org.apache.atlas.typesystem.json.TypesSerialization;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
......@@ -70,15 +63,6 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook {
public static final String HBASE_NAMESPACE_DEFAULT = "default";
private static volatile boolean typesRegistered = false;
public StormAtlasHook() {
super();
}
StormAtlasHook(AtlasClient atlasClient) {
super(atlasClient);
}
@Override
protected String getNumberOfRetriesPropertyKey() {
return HOOK_NUM_RETRIES;
......@@ -113,7 +97,8 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook {
entities.add(topologyReferenceable);
LOG.debug("notifying entities, size = {}", entities.size());
notifyEntities(entities);
String user = getUser(topologyInfo.get_owner(), null);
notifyEntities(user, entities);
} catch (Exception e) {
throw new RuntimeException("Atlas hook is unable to process the topology.", e);
}
......@@ -379,38 +364,6 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook {
return String.format("%s.%s@%s", nameSpace, tableName, clusterName);
}
public synchronized void registerDataModel(HiveDataModelGenerator dataModelGenerator) throws AtlasException,
AtlasServiceException {
try {
atlasClient.getType(HiveDataTypes.HIVE_PROCESS.getName());
LOG.info("Hive data model is already registered! Going ahead with registration of Storm Data model");
} catch(AtlasServiceException ase) {
if (ase.getStatus() == ClientResponse.Status.NOT_FOUND) {
//Expected in case types do not exist
LOG.info("Registering Hive data model");
atlasClient.createType(dataModelGenerator.getModelAsJson());
} else {
throw ase;
}
}
try {
atlasClient.getType(StormDataTypes.STORM_TOPOLOGY.getName());
} catch(AtlasServiceException ase) {
if (ase.getStatus() == ClientResponse.Status.NOT_FOUND) {
LOG.info("Registering Storm/Kafka data model");
StormDataModel.main(new String[]{});
TypesDef typesDef = StormDataModel.typesDef();
String stormTypesAsJSON = TypesSerialization.toJson(typesDef);
LOG.info("stormTypesAsJSON = {}", stormTypesAsJSON);
atlasClient.createType(stormTypesAsJSON);
}
}
typesRegistered = true;
}
private String getClusterName(Map stormConf) {
String clusterName = AtlasConstants.DEFAULT_CLUSTER_NAME;
if (stormConf.containsKey(AtlasConstants.CLUSTER_NAME_KEY)) {
......@@ -418,6 +371,4 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook {
}
return clusterName;
}
}
......@@ -20,9 +20,13 @@ package org.apache.atlas.storm.hook;
import backtype.storm.ILocalCluster;
import backtype.storm.generated.StormTopology;
import com.sun.jersey.api.client.ClientResponse;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasException;
import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.hive.model.HiveDataModelGenerator;
import org.apache.atlas.hive.model.HiveDataTypes;
import org.apache.atlas.storm.model.StormDataModel;
import org.apache.atlas.storm.model.StormDataTypes;
import org.apache.atlas.typesystem.Referenceable;
......@@ -57,9 +61,40 @@ public class StormAtlasHookIT {
Configuration configuration = ApplicationProperties.get();
atlasClient = new AtlasClient(configuration.getString("atlas.rest.address", ATLAS_URL));
new StormAtlasHook().registerDataModel(new HiveDataModelGenerator());
registerDataModel(new HiveDataModelGenerator());
}
private void registerDataModel(HiveDataModelGenerator dataModelGenerator) throws AtlasException,
AtlasServiceException {
try {
atlasClient.getType(HiveDataTypes.HIVE_PROCESS.getName());
LOG.info("Hive data model is already registered! Going ahead with registration of Storm Data model");
} catch(AtlasServiceException ase) {
if (ase.getStatus() == ClientResponse.Status.NOT_FOUND) {
//Expected in case types do not exist
LOG.info("Registering Hive data model");
atlasClient.createType(dataModelGenerator.getModelAsJson());
} else {
throw ase;
}
}
try {
atlasClient.getType(StormDataTypes.STORM_TOPOLOGY.getName());
} catch(AtlasServiceException ase) {
if (ase.getStatus() == ClientResponse.Status.NOT_FOUND) {
LOG.info("Registering Storm/Kafka data model");
StormDataModel.main(new String[]{});
TypesDef typesDef = StormDataModel.typesDef();
String stormTypesAsJSON = TypesSerialization.toJson(typesDef);
LOG.info("stormTypesAsJSON = {}", stormTypesAsJSON);
atlasClient.createType(stormTypesAsJSON);
}
}
}
@AfterClass
public void tearDown() throws Exception {
LOG.info("Shutting down storm local cluster");
......@@ -76,7 +111,7 @@ public class StormAtlasHookIT {
String stormTypesAsJSON = TypesSerialization.toJson(stormTypesDef);
LOG.info("stormTypesAsJSON = {}", stormTypesAsJSON);
new StormAtlasHook().registerDataModel(new HiveDataModelGenerator());
registerDataModel(new HiveDataModelGenerator());
// verify types are registered
for (StormDataTypes stormDataType : StormDataTypes.values()) {
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.storm.hook;
import com.sun.jersey.api.client.ClientResponse;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasException;
import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.hive.model.HiveDataModelGenerator;
import org.apache.atlas.hive.model.HiveDataTypes;
import org.apache.atlas.storm.model.StormDataTypes;
import org.testng.annotations.Test;
import static org.mockito.Matchers.contains;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@Test
public class StormAtlasHookTest {
@Test
public void testStormRegistersHiveDataModelIfNotPresent() throws AtlasException, AtlasServiceException {
AtlasClient atlasClient = mock(AtlasClient.class);
HiveDataModelGenerator dataModelGenerator = mock(HiveDataModelGenerator.class);
AtlasServiceException atlasServiceException = mock(AtlasServiceException.class);
when(atlasServiceException.getStatus()).thenReturn(ClientResponse.Status.NOT_FOUND);
when(atlasClient.getType(HiveDataTypes.HIVE_PROCESS.getName())).thenThrow(atlasServiceException);
String hiveModel = "{hive_model_as_json}";
when(dataModelGenerator.getModelAsJson()).thenReturn(hiveModel);
StormAtlasHook stormAtlasHook = new StormAtlasHook(atlasClient);
stormAtlasHook.registerDataModel(dataModelGenerator);
verify(atlasClient).createType(hiveModel);
}
@Test
public void testStormRegistersStormModelIfNotPresent() throws AtlasServiceException, AtlasException {
AtlasClient atlasClient = mock(AtlasClient.class);
HiveDataModelGenerator dataModelGenerator = mock(HiveDataModelGenerator.class);
when(atlasClient.getType(HiveDataTypes.HIVE_PROCESS.getName())).thenReturn("hive_process_definition");
AtlasServiceException atlasServiceException = mock(AtlasServiceException.class);
when(atlasServiceException.getStatus()).thenReturn(ClientResponse.Status.NOT_FOUND);
when(atlasClient.getType(StormDataTypes.STORM_TOPOLOGY.getName())).thenThrow(atlasServiceException);
StormAtlasHook stormAtlasHook = new StormAtlasHook(atlasClient);
stormAtlasHook.registerDataModel(dataModelGenerator);
verify(atlasClient).createType(contains("storm_topology"));
}
}
......@@ -35,12 +35,22 @@ public final class ApplicationProperties extends PropertiesConfiguration {
public static final String APPLICATION_PROPERTIES = "atlas-application.properties";
private static Configuration instance = null;
private static volatile Configuration instance = null;
private ApplicationProperties(URL url) throws ConfigurationException {
super(url);
}
public static void forceReload() {
if (instance != null) {
synchronized (ApplicationProperties.class) {
if (instance != null) {
instance = null;
}
}
}
}
public static Configuration get() throws AtlasException {
if (instance == null) {
synchronized (ApplicationProperties.class) {
......
......@@ -55,7 +55,7 @@
<appender-ref ref="FILE"/>
</logger>
<logger name="AUDIT">
<logger name="AUDIT" additivity="false">
<level value="info"/>
<appender-ref ref="AUDIT"/>
</logger>
......
......@@ -19,20 +19,21 @@
package org.apache.atlas.hook;
import com.google.inject.Guice;
import com.google.inject.Inject;
import com.google.inject.Injector;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.notification.NotificationInterface;
import org.apache.atlas.notification.NotificationModule;
import org.apache.atlas.notification.hook.HookNotification;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.json.InstanceSerialization;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.codehaus.jettison.json.JSONArray;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
......@@ -44,25 +45,19 @@ import java.util.List;
public abstract class AtlasHook {
private static final Logger LOG = LoggerFactory.getLogger(AtlasHook.class);
private static final String DEFAULT_ATLAS_URL = "http://localhost:21000/";
public static final String ATLAS_ENDPOINT = "atlas.rest.address";
protected final AtlasClient atlasClient;
/**
* Hadoop Cluster name for this instance, typically used for namespace.
*/
protected static Configuration atlasProperties;
@Inject
protected static NotificationInterface notifInterface;
static {
try {
atlasProperties = ApplicationProperties.get();
} catch (Exception e) {
LOG.info("Attempting to send msg while shutdown in progress.", e);
LOG.info("Failed to load application properties", e);
}
Injector injector = Guice.createInjector(new NotificationModule());
......@@ -71,18 +66,9 @@ public abstract class AtlasHook {
LOG.info("Created Atlas Hook");
}
public AtlasHook() {
this(new AtlasClient(atlasProperties.getString(ATLAS_ENDPOINT, DEFAULT_ATLAS_URL)));
}
public AtlasHook(AtlasClient atlasClient) {
this.atlasClient = atlasClient;
//TODO - take care of passing in - ugi, doAsUser for secure cluster
}
protected abstract String getNumberOfRetriesPropertyKey();
protected void notifyEntities(Collection<Referenceable> entities) {
protected void notifyEntities(String user, Collection<Referenceable> entities) {
JSONArray entitiesArray = new JSONArray();
for (Referenceable entity : entities) {
......@@ -92,27 +78,26 @@ public abstract class AtlasHook {
}
List<HookNotification.HookNotificationMessage> hookNotificationMessages = new ArrayList<>();
hookNotificationMessages.add(new HookNotification.EntityCreateRequest(entitiesArray));
hookNotificationMessages.add(new HookNotification.EntityCreateRequest(user, entitiesArray));
notifyEntities(hookNotificationMessages);
}
/**
* Notify atlas
* of the entity through message. The entity can be a
* Notify atlas of the entity through message. The entity can be a
* complex entity with reference to other entities.
* De-duping of entities is done on server side depending on the
* unique attribute on the entities.
*
* @param entities entities
* @param messages hook notification messages
* @param maxRetries maximum number of retries while sending message to messaging system
*/
protected void notifyEntities(List<HookNotification.HookNotificationMessage> entities) {
final int maxRetries = atlasProperties.getInt(getNumberOfRetriesPropertyKey(), 3);
final String message = entities.toString();
public static void notifyEntities(List<HookNotification.HookNotificationMessage> messages, int maxRetries) {
final String message = messages.toString();
int numRetries = 0;
while (true) {
try {
notifInterface.send(NotificationInterface.NotificationType.HOOK, entities);
notifInterface.send(NotificationInterface.NotificationType.HOOK, messages);
return;
} catch(Exception e) {
numRetries++;
......@@ -125,4 +110,50 @@ public abstract class AtlasHook {
}
}
}
/**
* Notify atlas of the entity through message. The entity can be a
* complex entity with reference to other entities.
* De-duping of entities is done on server side depending on the
* unique attribute on the entities.
*
* @param messages hook notification messages
*/
protected void notifyEntities(List<HookNotification.HookNotificationMessage> messages) {
final int maxRetries = atlasProperties.getInt(getNumberOfRetriesPropertyKey(), 3);
notifyEntities(messages, maxRetries);
}
/**
* Returns the logged in user.
* @return
*/
public static String getUser() {
return getUser(null, null);
}
/**
* Returns the user. Order of preference:
* 1. Given userName
* 2. ugi.getShortUserName()
* 3. UserGroupInformation.getCurrentUser().getShortUserName()
* 4. System.getProperty("user.name")
*/
public static String getUser(String userName, UserGroupInformation ugi) {
if (StringUtils.isNotEmpty(userName)) {
return userName;
}
if (ugi != null && StringUtils.isNotEmpty(ugi.getShortUserName())) {
return ugi.getShortUserName();
}
try {
return UserGroupInformation.getCurrentUser().getShortUserName();
} catch (IOException e) {
LOG.warn("Failed for UserGroupInformation.getCurrentUser()");
return System.getProperty("user.name");
}
}
}
......@@ -26,6 +26,7 @@ import org.apache.atlas.AtlasException;
import org.apache.atlas.notification.hook.HookNotification;
import org.apache.atlas.service.Service;
import org.apache.commons.configuration.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -48,14 +49,13 @@ public class NotificationHookConsumer implements Service {
@Inject
private NotificationInterface notificationInterface;
private ExecutorService executors;
private AtlasClient atlasClient;
private String atlasEndpoint;
@Override
public void start() throws AtlasException {
Configuration applicationProperties = ApplicationProperties.get();
String atlasEndpoint = applicationProperties.getString(ATLAS_ENDPOINT_PROPERTY, "http://localhost:21000");
atlasClient = new AtlasClient(atlasEndpoint);
atlasEndpoint = applicationProperties.getString(ATLAS_ENDPOINT_PROPERTY, "http://localhost:21000");
int numThreads = applicationProperties.getInt(CONSUMER_THREADS_PROPERTY, 1);
List<NotificationConsumer<HookNotification.HookNotificationMessage>> consumers =
notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, numThreads);
......@@ -87,15 +87,8 @@ public class NotificationHookConsumer implements Service {
class HookConsumer implements Runnable {
private final NotificationConsumer<HookNotification.HookNotificationMessage> consumer;
private final AtlasClient client;
public HookConsumer(NotificationConsumer<HookNotification.HookNotificationMessage> consumer) {
this(atlasClient, consumer);
}
public HookConsumer(AtlasClient client,
NotificationConsumer<HookNotification.HookNotificationMessage> consumer) {
this.client = client;
this.consumer = consumer;
}
......@@ -118,6 +111,9 @@ public class NotificationHookConsumer implements Service {
try {
if (hasNext()) {
HookNotification.HookNotificationMessage message = consumer.next();
UserGroupInformation ugi = UserGroupInformation.createRemoteUser(message.getUser());
AtlasClient atlasClient = getAtlasClient(ugi);
try {
switch (message.getType()) {
case ENTITY_CREATE:
......@@ -154,9 +150,14 @@ public class NotificationHookConsumer implements Service {
}
}
protected AtlasClient getAtlasClient(UserGroupInformation ugi) {
return new AtlasClient(atlasEndpoint, ugi, ugi.getShortUserName());
}
boolean serverAvailable(Timer timer) {
try {
while (!client.isServerReady()) {
AtlasClient atlasClient = getAtlasClient(UserGroupInformation.getCurrentUser());
while (!atlasClient.isServerReady()) {
try {
LOG.info("Atlas Server is not ready. Waiting for {} milliseconds to retry...",
SERVER_READY_WAIT_TIME_MS);
......
......@@ -22,6 +22,8 @@ import com.google.inject.Singleton;
import com.google.inject.multibindings.Multibinder;
import org.apache.atlas.kafka.KafkaNotification;
import org.apache.atlas.kafka.KafkaNotificationProvider;
import org.apache.atlas.listener.EntityChangeListener;
import org.apache.atlas.notification.entity.NotificationEntityChangeListener;
import org.apache.atlas.service.Service;
/**
......@@ -37,5 +39,10 @@ public class NotificationModule extends AbstractModule {
Multibinder<Service> serviceBinder = Multibinder.newSetBinder(binder(), Service.class);
serviceBinder.addBinding().to(KafkaNotification.class);
serviceBinder.addBinding().to(NotificationHookConsumer.class);
//Add NotificationEntityChangeListener as EntityChangeListener
Multibinder<EntityChangeListener> entityChangeListenerBinder =
Multibinder.newSetBinder(binder(), EntityChangeListener.class);
entityChangeListenerBinder.addBinding().to(NotificationEntityChangeListener.class);
}
}
......@@ -17,6 +17,7 @@
*/
package org.apache.atlas.notification.entity;
import com.google.inject.Inject;
import org.apache.atlas.AtlasException;
import org.apache.atlas.listener.EntityChangeListener;
import org.apache.atlas.notification.NotificationInterface;
......@@ -48,6 +49,7 @@ public class NotificationEntityChangeListener implements EntityChangeListener {
* @param notificationInterface the notification framework interface
* @param typeSystem the Atlas type system
*/
@Inject
public NotificationEntityChangeListener(NotificationInterface notificationInterface, TypeSystem typeSystem) {
this.notificationInterface = notificationInterface;
this.typeSystem = typeSystem;
......
......@@ -25,6 +25,7 @@ import com.google.gson.JsonParseException;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.TypesDef;
import org.apache.atlas.typesystem.json.InstanceSerialization;
import org.apache.commons.lang.StringUtils;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
......@@ -41,29 +42,24 @@ public class HookNotification implements JsonDeserializer<HookNotification.HookN
@Override
public HookNotificationMessage deserialize(JsonElement json, Type typeOfT,
JsonDeserializationContext context) {
if (json.isJsonArray()) {
JSONArray jsonArray = context.deserialize(json, JSONArray.class);
return new EntityCreateRequest(jsonArray);
} else {
HookNotificationType type =
context.deserialize(((JsonObject) json).get("type"), HookNotificationType.class);
switch (type) {
case ENTITY_CREATE:
return context.deserialize(json, EntityCreateRequest.class);
case ENTITY_FULL_UPDATE:
return context.deserialize(json, EntityUpdateRequest.class);
case ENTITY_PARTIAL_UPDATE:
return context.deserialize(json, EntityPartialUpdateRequest.class);
case TYPE_CREATE:
case TYPE_UPDATE:
return context.deserialize(json, TypeRequest.class);
default:
throw new IllegalStateException("Unhandled type " + type);
}
HookNotificationType type =
context.deserialize(((JsonObject) json).get("type"), HookNotificationType.class);
switch (type) {
case ENTITY_CREATE:
return context.deserialize(json, EntityCreateRequest.class);
case ENTITY_FULL_UPDATE:
return context.deserialize(json, EntityUpdateRequest.class);
case ENTITY_PARTIAL_UPDATE:
return context.deserialize(json, EntityPartialUpdateRequest.class);
case TYPE_CREATE:
case TYPE_UPDATE:
return context.deserialize(json, TypeRequest.class);
default:
throw new IllegalStateException("Unhandled type " + type);
}
}
......@@ -78,18 +74,30 @@ public class HookNotification implements JsonDeserializer<HookNotification.HookN
* Base type of hook message.
*/
public static class HookNotificationMessage {
public static final String UNKNOW_USER = "UNKNOWN";
protected HookNotificationType type;
protected String user;
private HookNotificationMessage() {
}
public HookNotificationMessage(HookNotificationType type) {
public HookNotificationMessage(HookNotificationType type, String user) {
this.type = type;
this.user = user;
}
public HookNotificationType getType() {
return type;
}
public String getUser() {
if (StringUtils.isEmpty(user)) {
return UNKNOW_USER;
}
return user;
}
}
/**
......@@ -101,8 +109,8 @@ public class HookNotification implements JsonDeserializer<HookNotification.HookN
private TypeRequest() {
}
public TypeRequest(HookNotificationType type, TypesDef typesDef) {
super(type);
public TypeRequest(HookNotificationType type, TypesDef typesDef, String user) {
super(type, user);
this.typesDef = typesDef;
}
......@@ -120,21 +128,21 @@ public class HookNotification implements JsonDeserializer<HookNotification.HookN
private EntityCreateRequest() {
}
public EntityCreateRequest(Referenceable... entities) {
this(HookNotificationType.ENTITY_CREATE, Arrays.asList(entities));
public EntityCreateRequest(String user, Referenceable... entities) {
this(HookNotificationType.ENTITY_CREATE, Arrays.asList(entities), user);
}
public EntityCreateRequest(List<Referenceable> entities) {
this(HookNotificationType.ENTITY_CREATE, entities);
public EntityCreateRequest(String user, List<Referenceable> entities) {
this(HookNotificationType.ENTITY_CREATE, entities, user);
}
protected EntityCreateRequest(HookNotificationType type, List<Referenceable> entities) {
super(type);
protected EntityCreateRequest(HookNotificationType type, List<Referenceable> entities, String user) {
super(type, user);
this.entities = entities;
}
public EntityCreateRequest(JSONArray jsonArray) {
super(HookNotificationType.ENTITY_CREATE);
public EntityCreateRequest(String user, JSONArray jsonArray) {
super(HookNotificationType.ENTITY_CREATE, user);
entities = new ArrayList<>();
for (int index = 0; index < jsonArray.length(); index++) {
try {
......@@ -154,12 +162,12 @@ public class HookNotification implements JsonDeserializer<HookNotification.HookN
* Hook message for updating entities(full update).
*/
public static class EntityUpdateRequest extends EntityCreateRequest {
public EntityUpdateRequest(Referenceable... entities) {
this(Arrays.asList(entities));
public EntityUpdateRequest(String user, Referenceable... entities) {
this(user, Arrays.asList(entities));
}
public EntityUpdateRequest(List<Referenceable> entities) {
super(HookNotificationType.ENTITY_FULL_UPDATE, entities);
public EntityUpdateRequest(String user, List<Referenceable> entities) {
super(HookNotificationType.ENTITY_FULL_UPDATE, entities, user);
}
}
......@@ -175,9 +183,9 @@ public class HookNotification implements JsonDeserializer<HookNotification.HookN
private EntityPartialUpdateRequest() {
}
public EntityPartialUpdateRequest(String typeName, String attribute, String attributeValue,
public EntityPartialUpdateRequest(String user, String typeName, String attribute, String attributeValue,
Referenceable entity) {
super(HookNotificationType.ENTITY_PARTIAL_UPDATE);
super(HookNotificationType.ENTITY_PARTIAL_UPDATE, user);
this.typeName = typeName;
this.attribute = attribute;
this.attributeValue = attributeValue;
......
......@@ -19,6 +19,7 @@ package org.apache.atlas.notification;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasServiceException;
import org.apache.hadoop.security.UserGroupInformation;
import org.testng.annotations.Test;
import static org.mockito.Mockito.*;
......@@ -29,10 +30,15 @@ public class NotificationHookConsumerTest {
@Test
public void testConsumerCanProceedIfServerIsReady() throws InterruptedException, AtlasServiceException {
AtlasClient atlasClient = mock(AtlasClient.class);
final AtlasClient atlasClient = mock(AtlasClient.class);
NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer();
NotificationHookConsumer.HookConsumer hookConsumer =
notificationHookConsumer.new HookConsumer(atlasClient, mock(NotificationConsumer.class));
notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class)) {
@Override
protected AtlasClient getAtlasClient(UserGroupInformation ugi) {
return atlasClient;
}
};
NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class);
when(atlasClient.isServerReady()).thenReturn(true);
......@@ -43,10 +49,15 @@ public class NotificationHookConsumerTest {
@Test
public void testConsumerWaitsNTimesIfServerIsNotReadyNTimes() throws AtlasServiceException, InterruptedException {
AtlasClient atlasClient = mock(AtlasClient.class);
final AtlasClient atlasClient = mock(AtlasClient.class);
NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer();
NotificationHookConsumer.HookConsumer hookConsumer =
notificationHookConsumer.new HookConsumer(atlasClient, mock(NotificationConsumer.class));
notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class)) {
@Override
protected AtlasClient getAtlasClient(UserGroupInformation ugi) {
return atlasClient;
}
};
NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class);
when(atlasClient.isServerReady()).thenReturn(false, false, false, true);
......@@ -57,10 +68,15 @@ public class NotificationHookConsumerTest {
@Test
public void testConsumerProceedsWithFalseIfInterrupted() throws AtlasServiceException, InterruptedException {
AtlasClient atlasClient = mock(AtlasClient.class);
final AtlasClient atlasClient = mock(AtlasClient.class);
NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer();
NotificationHookConsumer.HookConsumer hookConsumer =
notificationHookConsumer.new HookConsumer(atlasClient, mock(NotificationConsumer.class));
notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class)) {
@Override
protected AtlasClient getAtlasClient(UserGroupInformation ugi) {
return atlasClient;
}
};
NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class);
doThrow(new InterruptedException()).when(timer).sleep(NotificationHookConsumer.SERVER_READY_WAIT_TIME_MS);
when(atlasClient.isServerReady()).thenReturn(false);
......@@ -70,10 +86,15 @@ public class NotificationHookConsumerTest {
@Test
public void testConsumerProceedsWithFalseOnAtlasServiceException() throws AtlasServiceException {
AtlasClient atlasClient = mock(AtlasClient.class);
final AtlasClient atlasClient = mock(AtlasClient.class);
NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer();
NotificationHookConsumer.HookConsumer hookConsumer =
notificationHookConsumer.new HookConsumer(atlasClient, mock(NotificationConsumer.class));
notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class)) {
@Override
protected AtlasClient getAtlasClient(UserGroupInformation ugi) {
return atlasClient;
}
};
NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class);
when(atlasClient.isServerReady()).thenThrow(new AtlasServiceException(AtlasClient.API.VERSION,
new Exception()));
......
......@@ -19,49 +19,74 @@ package org.apache.atlas.notification.hook;
import org.apache.atlas.notification.AbstractNotificationConsumer;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.json.InstanceSerialization;
import org.codehaus.jettison.json.JSONArray;
import org.testng.annotations.Test;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
public class HookNotificationTest {
@Test
public void testMessageBackwardCompatibility() throws Exception {
JSONArray jsonArray = new JSONArray();
Referenceable entity = new Referenceable("sometype");
entity.set("name", "somename");
String entityJson = InstanceSerialization.toJson(entity, true);
jsonArray.put(entityJson);
HookNotification.HookNotificationMessage notification = AbstractNotificationConsumer.GSON.fromJson(
jsonArray.toString(), HookNotification.HookNotificationMessage.class);
assertNotNull(notification);
assertEquals(notification.getType(), HookNotification.HookNotificationType.ENTITY_CREATE);
HookNotification.EntityCreateRequest createRequest = (HookNotification.EntityCreateRequest) notification;
assertEquals(createRequest.getEntities().size(), 1);
assertEquals(createRequest.getEntities().get(0).getTypeName(), entity.getTypeName());
}
@Test
public void testNewMessageSerDe() throws Exception {
Referenceable entity1 = new Referenceable("sometype");
entity1.set("attr", "value");
entity1.set("complex", new Referenceable("othertype"));
Referenceable entity2 = new Referenceable("newtype");
HookNotification.EntityCreateRequest request = new HookNotification.EntityCreateRequest(entity1, entity2);
String user = "user";
HookNotification.EntityCreateRequest request = new HookNotification.EntityCreateRequest(user, entity1, entity2);
String notificationJson = AbstractNotificationConsumer.GSON.toJson(request);
HookNotification.HookNotificationMessage actualNotification = AbstractNotificationConsumer.GSON.fromJson(
notificationJson, HookNotification.HookNotificationMessage.class);
assertEquals(actualNotification.getType(), HookNotification.HookNotificationType.ENTITY_CREATE);
assertEquals(actualNotification.getUser(), user);
HookNotification.EntityCreateRequest createRequest = (HookNotification.EntityCreateRequest) actualNotification;
assertEquals(createRequest.getEntities().size(), 2);
Referenceable actualEntity1 = createRequest.getEntities().get(0);
assertEquals(actualEntity1.getTypeName(), "sometype");
assertEquals(((Referenceable)actualEntity1.get("complex")).getTypeName(), "othertype");
assertEquals(createRequest.getEntities().get(1).getTypeName(), "newtype");
}
@Test
public void testBackwardCompatibility() throws Exception {
/**
Referenceable entity = new Referenceable("sometype");
entity.set("attr", "value");
String user = "user";
HookNotification.EntityCreateRequest request = new HookNotification.EntityCreateRequest(null, entity);
String notificationJson = AbstractNotificationConsumer.GSON.toJson(request);
System.out.println(notificationJson);
**/
//Json without user and assert that the string can be deserialised
String notificationJson = "{\n"
+ " \"entities\": [\n"
+ " {\n"
+ " \"jsonClass\": \"org.apache.atlas.typesystem.json.InstanceSerialization$_Reference\",\n"
+ " \"id\": {\n"
+ " \"jsonClass\": \"org.apache.atlas.typesystem.json.InstanceSerialization$_Id\",\n"
+ " \"id\": \"-1457685864305243000\",\n"
+ " \"version\": 0,\n"
+ " \"typeName\": \"sometype\"\n"
+ " },\n"
+ " \"typeName\": \"sometype\",\n"
+ " \"values\": {\n"
+ " \"attr\": \"value\"\n"
+ " },\n"
+ " \"traitNames\": [],\n"
+ " \"traits\": {}\n"
+ " }\n"
+ " ],\n"
+ " \"type\": \"ENTITY_CREATE\"\n"
+ "}";
HookNotification.HookNotificationMessage actualNotification = AbstractNotificationConsumer.GSON.fromJson(
notificationJson, HookNotification.HookNotificationMessage.class);
assertEquals(actualNotification.getType(), HookNotification.HookNotificationType.ENTITY_CREATE);
assertNull(actualNotification.user);
assertEquals(actualNotification.getUser(), HookNotification.HookNotificationMessage.UNKNOW_USER);
}
}
......@@ -1474,6 +1474,8 @@
<user.dir>${project.basedir}</user.dir>
<atlas.data>${project.build.directory}/data</atlas.data>
<log4j.configuration>atlas-log4j.xml</log4j.configuration>
<zookeeper.client.secure>false</zookeeper.client.secure>
<zookeeper.sasl.client>false</zookeeper.sasl.client>
</systemProperties>
<skipTests>${skipTests}</skipTests>
<forkMode>always</forkMode>
......@@ -1483,9 +1485,6 @@
-Xmx1024m -XX:MaxPermSize=512m -Djava.net.preferIPv4Stack=true
</argLine>
<skip>${skipUTs}</skip>
<excludes>
<exclude>**/*Base*</exclude>
</excludes>
</configuration>
<dependencies>
<dependency>
......
......@@ -13,6 +13,7 @@ ATLAS-409 Atlas will not import avro tables with schema read from a file (dosset
ATLAS-379 Create sqoop and falcon metadata addons (venkatnrangan,bvellanki,sowmyaramesh via shwethags)
ALL CHANGES:
ATLAS-577 Integrate entity audit with DefaultMetadataService (shwethags)
ATLAS-588 import-hive.sh fails while importing partitions for a non-partitioned table (sumasai via shwethags)
ATLAS-575 jetty-maven-plugin fails with ShutdownMonitorThread already started (shwethags)
ATLAS-408 UI : Add a close link (x) on the top right when Tag is added (darshankumar89 via shwethags)
......
......@@ -149,6 +149,7 @@
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
......
......@@ -18,6 +18,7 @@
package org.apache.atlas;
import com.google.inject.Binder;
import com.google.inject.Singleton;
import com.google.inject.matcher.Matchers;
import com.google.inject.multibindings.Multibinder;
......@@ -27,21 +28,26 @@ import org.aopalliance.intercept.MethodInterceptor;
import org.apache.atlas.discovery.DiscoveryService;
import org.apache.atlas.discovery.HiveLineageService;
import org.apache.atlas.discovery.LineageService;
import org.apache.atlas.discovery.SearchIndexer;
import org.apache.atlas.discovery.graph.GraphBackedDiscoveryService;
import org.apache.atlas.listener.EntityChangeListener;
import org.apache.atlas.listener.TypesChangeListener;
import org.apache.atlas.repository.MetadataRepository;
import org.apache.atlas.repository.audit.EntityAuditListener;
import org.apache.atlas.repository.audit.EntityAuditRepository;
import org.apache.atlas.repository.audit.HBaseBasedAuditRepository;
import org.apache.atlas.repository.graph.GraphBackedMetadataRepository;
import org.apache.atlas.repository.graph.GraphBackedSearchIndexer;
import org.apache.atlas.repository.graph.GraphProvider;
import org.apache.atlas.repository.graph.TitanGraphProvider;
import org.apache.atlas.repository.typestore.GraphBackedTypeStore;
import org.apache.atlas.repository.typestore.ITypeStore;
import org.apache.atlas.service.Service;
import org.apache.atlas.services.DefaultMetadataService;
import org.apache.atlas.services.IBootstrapTypesRegistrar;
import org.apache.atlas.services.MetadataService;
import org.apache.atlas.services.ReservedTypesRegistrar;
import org.apache.atlas.typesystem.types.TypeSystem;
import org.apache.atlas.typesystem.types.TypeSystemProvider;
/**
* Guice module for Repository module.
......@@ -51,9 +57,6 @@ public class RepositoryMetadataModule extends com.google.inject.AbstractModule {
@Override
protected void configure() {
// special wiring for Titan Graph
ThrowingProviderBinder.create(binder()).bind(GraphProvider.class, TitanGraph.class).to(TitanGraphProvider.class)
.asEagerSingleton();
......@@ -61,7 +64,7 @@ public class RepositoryMetadataModule extends com.google.inject.AbstractModule {
// bind the MetadataRepositoryService interface to an implementation
bind(MetadataRepository.class).to(GraphBackedMetadataRepository.class).asEagerSingleton();
bind(TypeSystem.class).in(Singleton.class);
bind(TypeSystem.class).toProvider(TypeSystemProvider.class).in(Singleton.class);
// bind the ITypeStore interface to an implementation
bind(ITypeStore.class).to(GraphBackedTypeStore.class).asEagerSingleton();
......@@ -80,9 +83,24 @@ public class RepositoryMetadataModule extends com.google.inject.AbstractModule {
bind(LineageService.class).to(HiveLineageService.class).asEagerSingleton();
bindAuditRepository(binder());
//Add EntityAuditListener as EntityChangeListener
Multibinder<EntityChangeListener> entityChangeListenerBinder =
Multibinder.newSetBinder(binder(), EntityChangeListener.class);
entityChangeListenerBinder.addBinding().to(EntityAuditListener.class);
MethodInterceptor interceptor = new GraphTransactionInterceptor();
requestInjection(interceptor);
bindInterceptor(Matchers.any(), Matchers.annotatedWith(GraphTransaction.class), interceptor);
}
protected void bindAuditRepository(Binder binder) {
//Map EntityAuditRepository interface to hbase based implementation
binder.bind(EntityAuditRepository.class).to(HBaseBasedAuditRepository.class).asEagerSingleton();
//Add HBaseBasedAuditRepository to service so that connection is closed at shutdown
Multibinder<Service> serviceBinder = Multibinder.newSetBinder(binder(), Service.class);
serviceBinder.addBinding().to(HBaseBasedAuditRepository.class);
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.audit;
import org.apache.atlas.AtlasException;
import org.apache.atlas.RequestContext;
import org.apache.atlas.listener.EntityChangeListener;
import org.apache.atlas.typesystem.IStruct;
import org.apache.atlas.typesystem.ITypedReferenceableInstance;
import org.apache.atlas.typesystem.json.InstanceSerialization;
import javax.inject.Inject;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
/**
* Listener on entity create/update/delete, tag add/delete. Adds the corresponding audit event to the audit repository.
*/
public class EntityAuditListener implements EntityChangeListener {
private EntityAuditRepository auditRepository;
@Inject
public EntityAuditListener(EntityAuditRepository auditRepository) {
this.auditRepository = auditRepository;
}
@Override
public void onEntitiesAdded(Collection<ITypedReferenceableInstance> entities) throws AtlasException {
List<EntityAuditRepository.EntityAuditEvent> events = new ArrayList<>();
long currentTime = System.currentTimeMillis();
for (ITypedReferenceableInstance entity : entities) {
EntityAuditRepository.EntityAuditEvent event = createEvent(entity, currentTime,
EntityAuditRepository.EntityAuditAction.ENTITY_CREATE,
"Created: " + InstanceSerialization.toJson(entity, true));
events.add(event);
}
auditRepository.putEvents(events);
}
private EntityAuditRepository.EntityAuditEvent createEvent(ITypedReferenceableInstance entity, long ts,
EntityAuditRepository.EntityAuditAction action,
String details) {
return new EntityAuditRepository.EntityAuditEvent(entity.getId()._getId(), ts, RequestContext.get().getUser(),
action, details);
}
@Override
public void onEntitiesUpdated(Collection<ITypedReferenceableInstance> entities) throws AtlasException {
}
@Override
public void onTraitAdded(ITypedReferenceableInstance entity, IStruct trait) throws AtlasException {
EntityAuditRepository.EntityAuditEvent event = createEvent(entity, System.currentTimeMillis(),
EntityAuditRepository.EntityAuditAction.TAG_ADD,
"Added trait: " + InstanceSerialization.toJson(trait, true));
auditRepository.putEvents(event);
}
@Override
public void onTraitDeleted(ITypedReferenceableInstance entity, String traitName) throws AtlasException {
EntityAuditRepository.EntityAuditEvent event = createEvent(entity, System.currentTimeMillis(),
EntityAuditRepository.EntityAuditAction.TAG_DELETE, "Deleted trait: " + traitName);
auditRepository.putEvents(event);
}
@Override
public void onEntitiesDeleted(Collection<ITypedReferenceableInstance> entities) throws AtlasException {
List<EntityAuditRepository.EntityAuditEvent> events = new ArrayList<>();
long currentTime = System.currentTimeMillis();
for (ITypedReferenceableInstance entity : entities) {
EntityAuditRepository.EntityAuditEvent event = createEvent(entity, currentTime,
EntityAuditRepository.EntityAuditAction.ENTITY_DELETE, "Deleted entity");
events.add(event);
}
auditRepository.putEvents(events);
}
}
......@@ -27,6 +27,10 @@ import java.util.List;
* Interface for repository for storing entity audit events
*/
public interface EntityAuditRepository {
enum EntityAuditAction {
ENTITY_CREATE, ENTITY_UPDATE, ENTITY_DELETE, TAG_ADD, TAG_DELETE;
}
/**
* Structure of entity audit event
*/
......@@ -34,13 +38,13 @@ public interface EntityAuditRepository {
String entityId;
Long timestamp;
String user;
String action;
EntityAuditAction action;
String details;
public EntityAuditEvent() {
}
public EntityAuditEvent(String entityId, long ts, String user, String action, String details) {
public EntityAuditEvent(String entityId, Long ts, String user, EntityAuditAction action, String details) {
this.entityId = entityId;
this.timestamp = ts;
this.user = user;
......@@ -61,7 +65,7 @@ public interface EntityAuditRepository {
EntityAuditEvent otherEvent = (EntityAuditEvent) other;
return StringUtils.equals(entityId, otherEvent.entityId) &&
(timestamp.longValue() == otherEvent.timestamp.longValue()) &&
StringUtils.equals(user, otherEvent.user) && StringUtils.equals(action, otherEvent.action) &&
StringUtils.equals(user, otherEvent.user) && (action == otherEvent.action) &&
StringUtils.equals(details, otherEvent.details);
}
......@@ -77,6 +81,26 @@ public interface EntityAuditRepository {
.append(user).append(";Action=").append(action).append(";Details=").append(details);
return builder.toString();
}
public String getEntityId() {
return entityId;
}
public Long getTimestamp() {
return timestamp;
}
public String getUser() {
return user;
}
public EntityAuditAction getAction() {
return action;
}
public String getDetails() {
return details;
}
}
/**
......@@ -87,6 +111,13 @@ public interface EntityAuditRepository {
void putEvents(EntityAuditEvent... events) throws AtlasException;
/**
* Add events to the event repository
* @param events events to be added
* @throws AtlasException
*/
void putEvents(List<EntityAuditEvent> events) throws AtlasException;
/**
* List events for the given entity id in decreasing order of timestamp, from the given timestamp. Returns n results
* @param entityId entity id
* @param ts starting timestamp for events
......
......@@ -43,6 +43,7 @@ import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
......@@ -80,16 +81,29 @@ public class HBaseBasedAuditRepository implements Service, EntityAuditRepository
* @param events events to be added
* @throws AtlasException
*/
@Override
public void putEvents(EntityAuditRepository.EntityAuditEvent... events) throws AtlasException {
LOG.info("Putting {} events", events.length);
putEvents(Arrays.asList(events));
}
@Override
/**
* Add events to the event repository
* @param events events to be added
* @throws AtlasException
*/
public void putEvents(List<EntityAuditEvent> events) throws AtlasException {
LOG.info("Putting {} events", events.size());
Table table = null;
try {
table = connection.getTable(tableName);
List<Put> puts = new ArrayList<>(events.length);
List<Put> puts = new ArrayList<>(events.size());
for (EntityAuditRepository.EntityAuditEvent event : events) {
LOG.debug("Adding entity audit event {}", event);
Put put = new Put(getKey(event.entityId, event.timestamp));
addColumn(put, COLUMN_ACTION, event.action);
if (event.action != null) {
put.addColumn(COLUMN_FAMILY, COLUMN_ACTION, Bytes.toBytes((short)event.action.ordinal()));
}
addColumn(put, COLUMN_USER, event.user);
addColumn(put, COLUMN_DETAIL, event.details);
puts.add(put);
......@@ -145,7 +159,8 @@ public class HBaseBasedAuditRepository implements Service, EntityAuditRepository
String key = Bytes.toString(result.getRow());
EntityAuditRepository.EntityAuditEvent event = fromKey(key);
event.user = getResultString(result, COLUMN_USER);
event.action = getResultString(result, COLUMN_ACTION);
event.action =
EntityAuditAction.values()[(Bytes.toShort(result.getValue(COLUMN_FAMILY, COLUMN_ACTION)))];
event.details = getResultString(result, COLUMN_DETAIL);
events.add(event);
}
......@@ -189,7 +204,7 @@ public class HBaseBasedAuditRepository implements Service, EntityAuditRepository
* @throws AtlasException
* @param atlasConf
*/
public org.apache.hadoop.conf.Configuration getHBaseConfiguration(Configuration atlasConf) throws AtlasException {
public static org.apache.hadoop.conf.Configuration getHBaseConfiguration(Configuration atlasConf) throws AtlasException {
Configuration subsetAtlasConf =
ApplicationProperties.getSubsetConfiguration(atlasConf, CONFIG_PREFIX);
org.apache.hadoop.conf.Configuration hbaseConf = HBaseConfiguration.create();
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.audit;
import org.apache.atlas.AtlasException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
/**
* Entity audit repository where audit events are stored in-memory. Used only for integration tests
*/
public class InMemoryEntityAuditRepository implements EntityAuditRepository {
private TreeMap<String, EntityAuditEvent> auditEvents = new TreeMap<>();
@Override
public void putEvents(EntityAuditEvent... events) throws AtlasException {
putEvents(Arrays.asList(events));
}
@Override
public synchronized void putEvents(List<EntityAuditEvent> events) throws AtlasException {
for (EntityAuditEvent event : events) {
auditEvents.put(event.entityId + (Long.MAX_VALUE - event.timestamp), event);
}
}
@Override
public List<EntityAuditEvent> listEvents(String entityId, Long ts, short maxResults)
throws AtlasException {
List<EntityAuditEvent> events = new ArrayList<>();
SortedMap<String, EntityAuditEvent> subMap = auditEvents.tailMap(entityId + (Long.MAX_VALUE - ts));
for (EntityAuditEvent event : subMap.values()) {
if (events.size() < maxResults && event.entityId.equals(entityId)) {
events.add(event);
}
}
return events;
}
}
......@@ -22,13 +22,11 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.inject.Provider;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasException;
import org.apache.atlas.classification.InterfaceAudience;
import org.apache.atlas.listener.EntityChangeListener;
import org.apache.atlas.listener.TypesChangeListener;
import org.apache.atlas.repository.IndexCreationException;
import org.apache.atlas.repository.MetadataRepository;
import org.apache.atlas.repository.RepositoryException;
import org.apache.atlas.repository.typestore.ITypeStore;
......@@ -68,11 +66,8 @@ import org.slf4j.LoggerFactory;
import javax.inject.Inject;
import javax.inject.Singleton;
import java.io.File;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
......@@ -86,32 +81,44 @@ public class DefaultMetadataService implements MetadataService {
private static final Logger LOG = LoggerFactory.getLogger(DefaultMetadataService.class);
private final Collection<EntityChangeListener> entityChangeListeners = new LinkedHashSet<>();
private final TypeSystem typeSystem;
private final MetadataRepository repository;
private final ITypeStore typeStore;
private IBootstrapTypesRegistrar typesRegistrar;
private final Collection<Provider<TypesChangeListener>> typeChangeListeners;
private final Collection<TypesChangeListener> typeChangeListeners = new LinkedHashSet<>();
private final Collection<EntityChangeListener> entityChangeListeners = new LinkedHashSet<>();
@Inject
DefaultMetadataService(final MetadataRepository repository, final ITypeStore typeStore,
final IBootstrapTypesRegistrar typesRegistrar,
final Collection<Provider<TypesChangeListener>> typeChangeListeners) throws AtlasException {
this(repository, typeStore, typesRegistrar, typeChangeListeners, TypeSystem.getInstance());
final Collection<Provider<TypesChangeListener>> typeListenerProviders,
final Collection<Provider<EntityChangeListener>> entityListenerProviders)
throws AtlasException {
this(repository, typeStore, typesRegistrar, typeListenerProviders, entityListenerProviders,
TypeSystem.getInstance());
}
DefaultMetadataService(final MetadataRepository repository, final ITypeStore typeStore,
final IBootstrapTypesRegistrar typesRegistrar,
final Collection<Provider<TypesChangeListener>> typeChangeListeners,
final Collection<Provider<TypesChangeListener>> typeListenerProviders,
final Collection<Provider<EntityChangeListener>> entityListenerProviders,
final TypeSystem typeSystem) throws AtlasException {
this.typeStore = typeStore;
this.typesRegistrar = typesRegistrar;
this.typeSystem = typeSystem;
this.repository = repository;
this.typeChangeListeners = typeChangeListeners;
for (Provider<TypesChangeListener> provider : typeListenerProviders) {
typeChangeListeners.add(provider.get());
}
for (Provider<EntityChangeListener> provider : entityListenerProviders) {
entityChangeListeners.add(provider.get());
}
restoreTypeSystem();
typesRegistrar.registerTypes(ReservedTypesRegistrar.getTypesDir(), typeSystem, this);
}
......@@ -604,19 +611,8 @@ public class DefaultMetadataService implements MetadataService {
}
private void onTypesAdded(Map<String, IDataType> typesAdded) throws AtlasException {
Map<TypesChangeListener, Throwable> caughtExceptions = new HashMap<>();
for (Provider<TypesChangeListener> indexerProvider : typeChangeListeners) {
final TypesChangeListener listener = indexerProvider.get();
try {
listener.onAdd(typesAdded.values());
} catch (IndexCreationException ice) {
LOG.error("Index creation for listener {} failed ", indexerProvider, ice);
caughtExceptions.put(listener, ice);
}
}
if (caughtExceptions.size() > 0) {
throw new IndexCreationException("Index creation failed for types " + typesAdded.keySet() + ". Aborting");
for (TypesChangeListener listener : typeChangeListeners) {
listener.onAdd(typesAdded.values());
}
}
......@@ -637,19 +633,8 @@ public class DefaultMetadataService implements MetadataService {
}
private void onTypesUpdated(Map<String, IDataType> typesUpdated) throws AtlasException {
Map<TypesChangeListener, Throwable> caughtExceptions = new HashMap<>();
for (Provider<TypesChangeListener> indexerProvider : typeChangeListeners) {
final TypesChangeListener listener = indexerProvider.get();
try {
listener.onChange(typesUpdated.values());
} catch (IndexCreationException ice) {
LOG.error("Index creation for listener {} failed ", indexerProvider, ice);
caughtExceptions.put(listener, ice);
}
}
if (caughtExceptions.size() > 0) {
throw new IndexCreationException("Index creation failed for types " + typesUpdated.keySet() + ". Aborting");
for (TypesChangeListener listener : typeChangeListeners) {
listener.onChange(typesUpdated.values());
}
}
......
......@@ -19,15 +19,12 @@
package org.apache.atlas.discovery;
import com.google.common.collect.ImmutableSet;
import com.thinkaurelius.titan.core.TitanGraph;
import org.apache.atlas.BaseHiveRepositoryTest;
import org.apache.atlas.RepositoryMetadataModule;
import org.apache.atlas.TestUtils;
import org.apache.atlas.discovery.graph.GraphBackedDiscoveryService;
import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.MetadataRepository;
import org.apache.atlas.repository.graph.GraphProvider;
import org.apache.atlas.typesystem.ITypedReferenceableInstance;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.persistence.Id;
......@@ -46,7 +43,6 @@ import org.testng.annotations.Guice;
import org.testng.annotations.Test;
import javax.inject.Inject;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
......@@ -60,9 +56,6 @@ import static org.apache.atlas.typesystem.types.utils.TypesUtil.createRequiredAt
public class GraphBackedDiscoveryServiceTest extends BaseHiveRepositoryTest {
@Inject
private GraphProvider<TitanGraph> graphProvider;
@Inject
private MetadataRepository repositoryService;
@Inject
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.audit;
import org.apache.commons.lang.RandomStringUtils;
import org.testng.annotations.Test;
import java.util.ArrayList;
import java.util.List;
import static org.testng.Assert.assertEquals;
public class AuditRepositoryTestBase {
protected EntityAuditRepository eventRepository;
private String rand() {
return RandomStringUtils.randomAlphanumeric(10);
}
@Test
public void testAddEvents() throws Exception {
EntityAuditRepository.EntityAuditEvent event =
new EntityAuditRepository.EntityAuditEvent(rand(), System.currentTimeMillis(), "u1",
EntityAuditRepository.EntityAuditAction.ENTITY_CREATE, "d1");
eventRepository.putEvents(event);
List<EntityAuditRepository.EntityAuditEvent> events =
eventRepository.listEvents(event.entityId, System.currentTimeMillis(), (short) 10);
assertEquals(events.size(), 1);
assertEquals(events.get(0), event);
}
@Test
public void testListPagination() throws Exception {
String id1 = "id1" + rand();
String id2 = "id2" + rand();
String id3 = "id3" + rand();
long ts = System.currentTimeMillis();
List<EntityAuditRepository.EntityAuditEvent> expectedEvents = new ArrayList<>(3);
for (int i = 0; i < 3; i++) {
//Add events for both ids
EntityAuditRepository.EntityAuditEvent event =
new EntityAuditRepository.EntityAuditEvent(id2, ts - i, "user" + i,
EntityAuditRepository.EntityAuditAction.ENTITY_UPDATE, "details" + i);
eventRepository.putEvents(event);
expectedEvents.add(event);
eventRepository.putEvents(new EntityAuditRepository.EntityAuditEvent(id1, ts - i, "user" + i,
EntityAuditRepository.EntityAuditAction.TAG_DELETE, "details" + i));
eventRepository.putEvents(new EntityAuditRepository.EntityAuditEvent(id3, ts - i, "user" + i,
EntityAuditRepository.EntityAuditAction.TAG_ADD, "details" + i));
}
//Use ts for which there is no event - ts + 2
List<EntityAuditRepository.EntityAuditEvent> events = eventRepository.listEvents(id2, ts + 2, (short) 2);
assertEquals(events.size(), 2);
assertEquals(events.get(0), expectedEvents.get(0));
assertEquals(events.get(1), expectedEvents.get(1));
//Use last event's timestamp for next list(). Should give only 1 event and shouldn't include events from other id
events = eventRepository.listEvents(id2, events.get(1).timestamp - 1, (short) 3);
assertEquals(events.size(), 1);
assertEquals(events.get(0), expectedEvents.get(2));
}
}
......@@ -19,45 +19,24 @@
package org.apache.atlas.repository.audit;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasException;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.LocalHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import java.util.ArrayList;
import java.util.List;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
public class HBaseBasedAuditRepositoryTest {
private HBaseTestingUtility testUtility;
private HBaseBasedAuditRepository eventRepository;
private LocalHBaseCluster hbaseCluster;
public class HBaseBasedAuditRepositoryTest extends AuditRepositoryTestBase {
private TableName tableName;
@BeforeClass
public void setup() throws Exception {
testUtility = HBaseTestingUtility.createLocalHTU();
testUtility.startMiniZKCluster();
testUtility.getConfiguration().set("zookeeper.session.timeout.ms", "1000");
hbaseCluster = new LocalHBaseCluster(testUtility.getConfiguration());
hbaseCluster.startup();
eventRepository = new HBaseBasedAuditRepository() {
@Override
public org.apache.hadoop.conf.Configuration getHBaseConfiguration(Configuration atlasConf)
throws AtlasException {
return testUtility.getConfiguration();
}
};
eventRepository.start();
eventRepository = new HBaseBasedAuditRepository();
HBaseTestUtils.startCluster();
((HBaseBasedAuditRepository)eventRepository).start();
Configuration properties = ApplicationProperties.get();
String tableNameStr = properties.getString(HBaseBasedAuditRepository.CONFIG_TABLE_NAME,
......@@ -67,63 +46,14 @@ public class HBaseBasedAuditRepositoryTest {
@AfterClass
public void teardown() throws Exception {
eventRepository.stop();
testUtility.getConnection().close();
hbaseCluster.shutdown();
testUtility.shutdownMiniZKCluster();
}
private String rand() {
return RandomStringUtils.randomAlphanumeric(10);
((HBaseBasedAuditRepository)eventRepository).stop();
HBaseTestUtils.stopCluster();
}
@Test
public void testTableCreated() throws Exception {
Admin admin = testUtility.getConnection().getAdmin();
Connection connection = HBaseTestUtils.getConnection();
Admin admin = connection.getAdmin();
assertTrue(admin.tableExists(tableName));
}
@Test
public void testAddEvents() throws Exception {
EntityAuditRepository.EntityAuditEvent event =
new EntityAuditRepository.EntityAuditEvent(rand(), System.currentTimeMillis(), "u1", "a1", "d1");
eventRepository.putEvents(event);
List<EntityAuditRepository.EntityAuditEvent> events =
eventRepository.listEvents(event.entityId, System.currentTimeMillis(), (short) 10);
assertEquals(events.size(), 1);
assertEquals(events.get(0), event);
}
@Test
public void testListPagination() throws Exception {
String id1 = "id1" + rand();
String id2 = "id2" + rand();
String id3 = "id3" + rand();
long ts = System.nanoTime();
List<EntityAuditRepository.EntityAuditEvent> expectedEvents = new ArrayList<>(3);
for (int i = 0; i < 3; i++) {
//Add events for both ids
EntityAuditRepository.EntityAuditEvent event =
new EntityAuditRepository.EntityAuditEvent(id2, ts - i, "user" + i, "action" + i, "details" + i);
eventRepository.putEvents(event);
expectedEvents.add(event);
eventRepository.putEvents(new EntityAuditRepository.EntityAuditEvent(id1, ts - i, "user" + i,
"action" + i, "details" + i));
eventRepository.putEvents(new EntityAuditRepository.EntityAuditEvent(id3, ts - i, "user" + i,
"action" + i, "details" + i));
}
//Use ts for which there is no event - ts + 2
List<EntityAuditRepository.EntityAuditEvent> events = eventRepository.listEvents(id2, ts + 2, (short) 2);
assertEquals(events.size(), 2);
assertEquals(events.get(0), expectedEvents.get(0));
assertEquals(events.get(1), expectedEvents.get(1));
//Use last event's timestamp for next list(). Should give only 1 event and shouldn't include events from other id
events = eventRepository.listEvents(id2, events.get(1).timestamp - 1, (short) 3);
assertEquals(events.size(), 1);
assertEquals(events.get(0), expectedEvents.get(2));
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.audit;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.RequestContext;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.LocalHBaseCluster;
import org.apache.hadoop.hbase.client.Connection;
import java.io.IOException;
public class HBaseTestUtils {
private static HBaseTestingUtility hbaseTestUtility;
private static LocalHBaseCluster hbaseCluster;
public static void startCluster() throws Exception {
Configuration hbaseConf =
HBaseBasedAuditRepository.getHBaseConfiguration(ApplicationProperties.get());
hbaseTestUtility = new HBaseTestingUtility(hbaseConf);
int zkPort = hbaseConf.getInt("hbase.zookeeper.property.clientPort", 19026);
hbaseTestUtility.startMiniZKCluster(1, zkPort);
hbaseCluster = new LocalHBaseCluster(hbaseTestUtility.getConfiguration());
hbaseCluster.startup();
RequestContext.createContext();
RequestContext.get().setUser("testuser");
}
public static void stopCluster() throws Exception {
hbaseTestUtility.getConnection().close();
hbaseCluster.shutdown();
hbaseTestUtility.shutdownMiniZKCluster();
}
public static Connection getConnection() throws IOException {
return hbaseTestUtility.getConnection();
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.audit;
import org.testng.annotations.BeforeClass;
public class InMemoryAuditRepositoryTest extends AuditRepositoryTestBase {
@BeforeClass
public void setup() {
eventRepository = new InMemoryEntityAuditRepository();
}
}
......@@ -25,6 +25,9 @@ import com.thinkaurelius.titan.core.TitanGraph;
import com.thinkaurelius.titan.core.util.TitanCleanup;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.repository.audit.EntityAuditRepository;
import org.apache.atlas.repository.audit.HBaseBasedAuditRepository;
import org.apache.atlas.repository.audit.HBaseTestUtils;
import org.apache.atlas.typesystem.exception.TypeNotFoundException;
import org.apache.atlas.typesystem.exception.EntityNotFoundException;
import org.apache.atlas.typesystem.types.ClassType;
......@@ -71,14 +74,19 @@ import java.util.Map;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
@Guice(modules = RepositoryMetadataModule.class)
public class DefaultMetadataServiceTest {
@Inject
private MetadataService metadataService;
@Inject
private GraphProvider<TitanGraph> graphProvider;
@Inject
private EntityAuditRepository repository;
private Referenceable db = createDBEntity();
private Id dbId;
......@@ -90,6 +98,11 @@ public class DefaultMetadataServiceTest {
@BeforeTest
public void setUp() throws Exception {
if (repository instanceof HBaseBasedAuditRepository) {
HBaseTestUtils.startCluster();
((HBaseBasedAuditRepository) repository).start();
}
TypesDef typesDef = TestUtils.defineHiveTypes();
try {
metadataService.getTypeDefinition(TestUtils.TABLE_TYPE);
......@@ -109,7 +122,7 @@ public class DefaultMetadataServiceTest {
}
@AfterTest
public void shutdown() {
public void shutdown() throws Exception {
TypeSystem.getInstance().reset();
try {
//TODO - Fix failure during shutdown while using BDB
......@@ -122,6 +135,11 @@ public class DefaultMetadataServiceTest {
} catch(Exception e) {
e.printStackTrace();
}
if (repository instanceof HBaseBasedAuditRepository) {
((HBaseBasedAuditRepository) repository).stop();
HBaseTestUtils.stopCluster();
}
}
private String createInstance(Referenceable entity) throws Exception {
......@@ -172,6 +190,7 @@ public class DefaultMetadataServiceTest {
entity.set("type", "VARCHAR(32)");
return entity;
}
@Test(expectedExceptions = TypeNotFoundException.class)
public void testCreateEntityWithUnknownDatatype() throws Exception {
Referenceable entity = new Referenceable("Unknown datatype");
......@@ -179,7 +198,7 @@ public class DefaultMetadataServiceTest {
entity.set("name", dbName);
entity.set("description", "us db");
createInstance(entity);
Assert.fail(TypeNotFoundException.class.getSimpleName() +" was expected but none thrown.");
Assert.fail(TypeNotFoundException.class.getSimpleName() + " was expected but none thrown.");
}
@Test
......@@ -187,6 +206,7 @@ public class DefaultMetadataServiceTest {
//name is the unique attribute
Referenceable entity = createDBEntity();
String id = createInstance(entity);
assertAuditEvents(id, EntityAuditRepository.EntityAuditAction.ENTITY_CREATE);
//using the same name should succeed, but not create another entity
String newId = createInstance(entity);
......@@ -199,6 +219,35 @@ public class DefaultMetadataServiceTest {
}
@Test
public void testEntityAudit() throws Exception {
//create entity
Referenceable entity = createDBEntity();
String id = createInstance(entity);
assertAuditEvents(id, EntityAuditRepository.EntityAuditAction.ENTITY_CREATE);
Struct tag = new Struct(TestUtils.PII);
metadataService.addTrait(id, InstanceSerialization.toJson(tag, true));
assertAuditEvents(id, EntityAuditRepository.EntityAuditAction.TAG_ADD);
metadataService.deleteTrait(id, TestUtils.PII);
assertAuditEvents(id, EntityAuditRepository.EntityAuditAction.TAG_DELETE);
metadataService.deleteEntities(Arrays.asList(id));
assertAuditEvents(id, EntityAuditRepository.EntityAuditAction.ENTITY_DELETE);
}
private void assertAuditEvents(String id, EntityAuditRepository.EntityAuditAction action) throws Exception {
List<EntityAuditRepository.EntityAuditEvent> events =
repository.listEvents(id, System.currentTimeMillis(), (short) 10);
for (EntityAuditRepository.EntityAuditEvent event : events) {
if (event.getAction() == action) {
return;
}
}
fail("Didn't find " + action + " in audit events");
}
@Test
public void testCreateEntityWithUniqueAttributeWithReference() throws Exception {
Referenceable db = createDBEntity();
String dbId = createInstance(db);
......@@ -468,7 +517,7 @@ public class DefaultMetadataServiceTest {
tableDefinitionJson =
metadataService.getEntityDefinition(tableId._getId());
tableDefinition = InstanceSerialization.fromJsonReferenceable(tableDefinitionJson, true);
Assert.assertNull(((Struct)tableDefinition.get("serde1")).get("description"));
Assert.assertNull(((Struct) tableDefinition.get("serde1")).get("description"));
}
......@@ -718,8 +767,6 @@ public class DefaultMetadataServiceTest {
@Test
public void testDeleteEntities() throws Exception {
// Create 2 table entities, each with 3 composite column entities
Referenceable dbEntity = createDBEntity();
String dbGuid = createInstance(dbEntity);
......
......@@ -20,18 +20,16 @@ package org.apache.atlas.services;
import com.google.inject.Provider;
import org.apache.atlas.AtlasException;
import org.apache.atlas.listener.EntityChangeListener;
import org.apache.atlas.listener.TypesChangeListener;
import org.apache.atlas.repository.MetadataRepository;
import org.apache.atlas.repository.typestore.ITypeStore;
import org.apache.atlas.typesystem.TypesDef;
import org.apache.atlas.typesystem.types.TypeSystem;
import org.mockito.Matchers;
import org.testng.annotations.Test;
import java.util.ArrayList;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
......@@ -45,7 +43,8 @@ public class DefaultMetadataServiceMockTest {
when(typeSystem.isRegistered(any(String.class))).thenReturn(true);
DefaultMetadataService defaultMetadataService = new DefaultMetadataService(mock(MetadataRepository.class),
mock(ITypeStore.class),
typesRegistrar, new ArrayList<Provider<TypesChangeListener>>(), typeSystem);
typesRegistrar, new ArrayList<Provider<TypesChangeListener>>(),
new ArrayList<Provider<EntityChangeListener>>(), typeSystem);
verify(typesRegistrar).registerTypes(ReservedTypesRegistrar.getTypesDir(),
typeSystem, defaultMetadataService);
......
......@@ -47,7 +47,6 @@
<groupId>org.apache.atlas</groupId>
<artifactId>atlas-typesystem</artifactId>
</dependency>
</dependencies>
</project>
\ No newline at end of file
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class RequestContext {
private static final Logger LOG = LoggerFactory.getLogger(RequestContext.class);
private static final ThreadLocal<RequestContext> CURRENT_CONTEXT = new ThreadLocal<>();
private String user;
private RequestContext() {
}
public static RequestContext get() {
return CURRENT_CONTEXT.get();
}
public static RequestContext createContext() {
RequestContext context = new RequestContext();
CURRENT_CONTEXT.set(context);
return context;
}
public static void clear() {
CURRENT_CONTEXT.remove();
}
public String getUser() {
return user;
}
public void setUser(String user) {
this.user = user;
}
}
......@@ -22,7 +22,6 @@ import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimap;
import org.apache.atlas.AtlasException;
import org.apache.atlas.classification.InterfaceAudience;
import org.apache.atlas.typesystem.TypesDef;
......@@ -30,7 +29,6 @@ import org.apache.atlas.typesystem.exception.TypeExistsException;
import org.apache.atlas.typesystem.exception.TypeNotFoundException;
import javax.inject.Singleton;
import java.lang.reflect.Constructor;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.typesystem.types;
import com.google.inject.Provider;
public class TypeSystemProvider implements Provider<TypeSystem> {
@Override
public TypeSystem get() {
return TypeSystem.getInstance();
}
}
......@@ -71,6 +71,12 @@ atlas.kafka.auto.commit.interval.ms=100
atlas.kafka.hook.group.id=atlas
atlas.kafka.entities.group.id=atlas_entities
######### Entity Audit Configs #########
atlas.audit.hbase.tablename=ATLAS_ENTITY_AUDIT_EVENTS
atlas.audit.zookeeper.session.timeout.ms=1000
atlas.audit.hbase.zookeeper.quorum=localhost
atlas.audit.hbase.zookeeper.property.clientPort=19026
######### Security Properties #########
# SSL config
......@@ -80,3 +86,5 @@ atlas.server.https.port=31443
######### Security Properties #########
hbase.security.authentication=simple
atlas.hook.falcon.synchronous=true
\ No newline at end of file
......@@ -342,10 +342,10 @@
</httpConnector>
<war>${project.build.directory}/atlas-webapp-${project.version}.war</war>
<daemon>true</daemon>
<!--<webAppSourceDirectory>webapp/src/test/webapp</webAppSourceDirectory>-->
<webAppSourceDirectory>webapp/src/test/webapp</webAppSourceDirectory>
<webApp>
<contextPath>/</contextPath>
<descriptor>webapp/src/test/webapp/WEB-INF/web.xml</descriptor>
<descriptor>${project.basedir}/src/test/webapp/WEB-INF/web.xml</descriptor>
<!-- ${project.build.directory}/atlas-webapp-${project.version} -->
<extraClasspath>${project.build.directory}/../../webapp/target/test-classes/</extraClasspath>
</webApp>
......
......@@ -20,22 +20,32 @@ package org.apache.atlas.web.filters;
import com.google.inject.Singleton;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.RequestContext;
import org.apache.atlas.security.SecurityProperties;
import org.apache.atlas.web.util.Servlets;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationConverter;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.authentication.server.AuthenticationFilter;
import org.apache.hadoop.security.authentication.server.KerberosAuthenticationHandler;
import org.apache.log4j.NDC;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.servlet.FilterChain;
import javax.servlet.FilterConfig;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.core.Response;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Enumeration;
import java.util.Iterator;
import java.util.Properties;
/**
......@@ -47,6 +57,27 @@ public class AtlasAuthenticationFilter extends AuthenticationFilter {
private static final Logger LOG = LoggerFactory.getLogger(AtlasAuthenticationFilter.class);
static final String PREFIX = "atlas.http.authentication";
/**
* An options servlet is used to authenticate users. OPTIONS method is used for triggering authentication
* before invoking the actual resource.
*/
private HttpServlet optionsServlet;
/**
* Initialize the filter.
*
* @param filterConfig filter configuration.
* @throws ServletException thrown if the filter could not be initialized.
*/
@Override
public void init(FilterConfig filterConfig) throws ServletException {
LOG.info("AtlasAuthenticationFilter initialization started");
super.init(filterConfig);
optionsServlet = new HttpServlet() {};
optionsServlet.init();
}
@Override
protected Properties getConfiguration(String configPrefix, FilterConfig filterConfig) throws ServletException {
Configuration configuration;
......@@ -94,4 +125,50 @@ public class AtlasAuthenticationFilter extends AuthenticationFilter {
return config;
}
@Override
public void doFilter(final ServletRequest request, final ServletResponse response,
final FilterChain filterChain) throws IOException, ServletException {
FilterChain filterChainWrapper = new FilterChain() {
@Override
public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse)
throws IOException, ServletException {
HttpServletRequest httpRequest = (HttpServletRequest) servletRequest;
if (httpRequest.getMethod().equals("OPTIONS")) { // option request meant only for authentication
optionsServlet.service(request, response);
} else {
final String user = Servlets.getUserFromRequest(httpRequest);
if (StringUtils.isEmpty(user)) {
((HttpServletResponse) response).sendError(Response.Status.BAD_REQUEST.getStatusCode(),
"Param user.name can't be empty");
} else {
try {
NDC.push(user + ":" + httpRequest.getMethod() + httpRequest.getRequestURI());
RequestContext requestContext = RequestContext.get();
requestContext.setUser(user);
LOG.info("Request from authenticated user: {}, URL={}", user,
Servlets.getRequestURI(httpRequest));
filterChain.doFilter(servletRequest, servletResponse);
} finally {
NDC.pop();
}
}
}
}
};
super.doFilter(request, response, filterChainWrapper);
}
@Override
public void destroy() {
if (optionsServlet != null) {
optionsServlet.destroy();
}
super.destroy();
}
}
......@@ -20,6 +20,7 @@ package org.apache.atlas.web.filters;
import com.google.inject.Singleton;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.RequestContext;
import org.apache.atlas.web.util.DateTimeHelper;
import org.apache.atlas.web.util.Servlets;
import org.slf4j.Logger;
......@@ -60,15 +61,19 @@ public class AuditFilter implements Filter {
final String requestId = UUID.randomUUID().toString();
final Thread currentThread = Thread.currentThread();
final String oldName = currentThread.getName();
String user = getUserFromRequest(httpRequest);
try {
currentThread.setName(formatName(oldName, requestId));
recordAudit(httpRequest, requestTimeISO9601);
RequestContext requestContext = RequestContext.createContext();
requestContext.setUser(user);
recordAudit(httpRequest, requestTimeISO9601, user);
filterChain.doFilter(request, response);
} finally {
// put the request id into the response so users can trace logs for this request
((HttpServletResponse) response).setHeader(AtlasClient.REQUEST_ID, requestId);
currentThread.setName(oldName);
RequestContext.clear();;
}
}
......@@ -76,8 +81,7 @@ public class AuditFilter implements Filter {
return oldName + " - " + requestId;
}
private void recordAudit(HttpServletRequest httpRequest, String whenISO9601) {
final String who = getUserFromRequest(httpRequest);
private void recordAudit(HttpServletRequest httpRequest, String whenISO9601, String who) {
final String fromHost = httpRequest.getRemoteHost();
final String fromAddress = httpRequest.getRemoteAddr();
final String whatRequest = httpRequest.getMethod();
......
......@@ -21,6 +21,7 @@ package org.apache.atlas.web.listeners;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.Provider;
import com.google.inject.TypeLiteral;
import com.google.inject.servlet.GuiceServletContextListener;
......@@ -33,13 +34,9 @@ import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasException;
import org.apache.atlas.RepositoryMetadataModule;
import org.apache.atlas.notification.NotificationInterface;
import org.apache.atlas.notification.NotificationModule;
import org.apache.atlas.notification.entity.NotificationEntityChangeListener;
import org.apache.atlas.repository.graph.GraphProvider;
import org.apache.atlas.service.Services;
import org.apache.atlas.services.MetadataService;
import org.apache.atlas.typesystem.types.TypeSystem;
import org.apache.atlas.web.filters.AtlasAuthenticationFilter;
import org.apache.atlas.web.filters.AuditFilter;
import org.apache.commons.configuration.Configuration;
......@@ -75,7 +72,7 @@ public class GuiceServletConfig extends GuiceServletContextListener {
LoginProcessor loginProcessor = new LoginProcessor();
loginProcessor.login();
injector = Guice.createInjector(new RepositoryMetadataModule(), new NotificationModule(),
injector = Guice.createInjector(getRepositoryModule(), new NotificationModule(),
new JerseyServletModule() {
@Override
protected void configureServlets() {
......@@ -99,6 +96,7 @@ public class GuiceServletConfig extends GuiceServletContextListener {
try {
Configuration configuration = ApplicationProperties.get();
if (Boolean.valueOf(configuration.getString(HTTP_AUTHENTICATION_ENABLED))) {
LOG.info("Enabling AuthenticationFilter");
filter("/*").through(AtlasAuthenticationFilter.class);
}
} catch (AtlasException e) {
......@@ -113,13 +111,16 @@ public class GuiceServletConfig extends GuiceServletContextListener {
return injector;
}
protected Module getRepositoryModule() {
return new RepositoryMetadataModule();
}
@Override
public void contextInitialized(ServletContextEvent servletContextEvent) {
super.contextInitialized(servletContextEvent);
installLogBridge();
initMetadataService();
startServices();
}
......@@ -148,7 +149,12 @@ public class GuiceServletConfig extends GuiceServletContextListener {
TypeLiteral<GraphProvider<TitanGraph>> graphProviderType = new TypeLiteral<GraphProvider<TitanGraph>>() {};
Provider<GraphProvider<TitanGraph>> graphProvider = injector.getProvider(Key.get(graphProviderType));
final Graph graph = graphProvider.get().get();
graph.shutdown();
try {
graph.shutdown();
} catch(Throwable t) {
LOG.warn("Error while shutting down graph", t);
}
//stop services
stopServices();
......@@ -160,17 +166,4 @@ public class GuiceServletConfig extends GuiceServletContextListener {
Services services = injector.getInstance(Services.class);
services.stop();
}
// initialize the metadata service
private void initMetadataService() {
MetadataService metadataService = injector.getInstance(MetadataService.class);
// add a listener for entity changes
NotificationInterface notificationInterface = injector.getInstance(NotificationInterface.class);
NotificationEntityChangeListener listener =
new NotificationEntityChangeListener(notificationInterface, TypeSystem.getInstance());
metadataService.registerListener(listener);
}
}
\ No newline at end of file
......@@ -45,9 +45,14 @@ public class EmbeddedServer {
Connector connector = getConnector(port);
server.addConnector(connector);
WebAppContext application = getWebAppContext(path);
server.setHandler(application);
}
protected WebAppContext getWebAppContext(String path) {
WebAppContext application = new WebAppContext(path, "/");
application.setClassLoader(Thread.currentThread().getContextClassLoader());
server.setHandler(application);
return application;
}
public static EmbeddedServer newServer(int port, String path, boolean secure) throws IOException {
......
......@@ -33,6 +33,8 @@ import static org.testng.Assert.assertEquals;
@Guice(modules = NotificationModule.class)
public class NotificationHookConsumerIT extends BaseResourceIT {
private static final String TEST_USER = "testuser";
@Inject
private NotificationInterface kafka;
......@@ -57,7 +59,7 @@ public class NotificationHookConsumerIT extends BaseResourceIT {
entity.set("name", "db" + randomString());
entity.set("description", randomString());
sendHookMessage(new HookNotification.EntityCreateRequest(entity));
sendHookMessage(new HookNotification.EntityCreateRequest(TEST_USER, entity));
waitFor(MAX_WAIT_TIME, new Predicate() {
@Override
......@@ -79,7 +81,8 @@ public class NotificationHookConsumerIT extends BaseResourceIT {
final Referenceable newEntity = new Referenceable(DATABASE_TYPE);
newEntity.set("owner", randomString());
sendHookMessage(new HookNotification.EntityPartialUpdateRequest(DATABASE_TYPE, "name", dbName, newEntity));
sendHookMessage(
new HookNotification.EntityPartialUpdateRequest(TEST_USER, DATABASE_TYPE, "name", dbName, newEntity));
waitFor(MAX_WAIT_TIME, new Predicate() {
@Override
public boolean evaluate() throws Exception {
......@@ -105,7 +108,8 @@ public class NotificationHookConsumerIT extends BaseResourceIT {
final String newName = "db" + randomString();
newEntity.set("name", newName);
sendHookMessage(new HookNotification.EntityPartialUpdateRequest(DATABASE_TYPE, "name", dbName, newEntity));
sendHookMessage(
new HookNotification.EntityPartialUpdateRequest(TEST_USER, DATABASE_TYPE, "name", dbName, newEntity));
waitFor(MAX_WAIT_TIME, new Predicate() {
@Override
public boolean evaluate() throws Exception {
......@@ -135,7 +139,7 @@ public class NotificationHookConsumerIT extends BaseResourceIT {
newEntity.set("owner", randomString());
//updating unique attribute
sendHookMessage(new HookNotification.EntityUpdateRequest(newEntity));
sendHookMessage(new HookNotification.EntityUpdateRequest(TEST_USER, newEntity));
waitFor(MAX_WAIT_TIME, new Predicate() {
@Override
public boolean evaluate() throws Exception {
......
......@@ -16,13 +16,14 @@
*/
package org.apache.atlas.web.filters;
import org.apache.atlas.RequestContext;
import org.apache.atlas.web.security.BaseSecurityTest;
import org.apache.atlas.web.service.EmbeddedServer;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.hdfs.web.URLConnectionFactory;
import org.eclipse.jetty.server.Server;
import org.testng.Assert;
import org.eclipse.jetty.webapp.WebAppContext;
import org.testng.annotations.Test;
import javax.security.auth.Subject;
......@@ -40,10 +41,12 @@ import java.net.URL;
import java.security.PrivilegedExceptionAction;
import java.util.Properties;
import static org.testng.Assert.assertEquals;
/**
*
*/
public class MetadataAuthenticationKerberosFilterIT extends BaseSecurityTest {
public class AtlasAuthenticationKerberosFilterIT extends BaseSecurityTest {
public static final String TEST_USER_JAAS_SECTION = "TestUser";
public static final String TESTUSER = "testuser";
public static final String TESTPASS = "testpass";
......@@ -59,6 +62,14 @@ public class MetadataAuthenticationKerberosFilterIT extends BaseSecurityTest {
Server getServer() {
return server;
}
@Override
protected WebAppContext getWebAppContext(String path) {
WebAppContext application = new WebAppContext(path, "/");
application.setDescriptor(System.getProperty("projectBaseDir") + "/webapp/src/test/webapp/WEB-INF/web.xml");
application.setClassLoader(Thread.currentThread().getContextClassLoader());
return application;
}
}
@Test(enabled = false)
......@@ -86,7 +97,7 @@ public class MetadataAuthenticationKerberosFilterIT extends BaseSecurityTest {
connection.setRequestMethod("GET");
connection.connect();
Assert.assertEquals(connection.getResponseCode(), 401);
assertEquals(connection.getResponseCode(), 401);
// need to populate the ticket cache with a local user, so logging in...
Subject subject = loginTestUser();
......@@ -100,8 +111,8 @@ public class MetadataAuthenticationKerberosFilterIT extends BaseSecurityTest {
connection.setRequestMethod("GET");
connection.connect();
Assert.assertEquals(connection.getResponseCode(), 200);
assertEquals(connection.getResponseCode(), 200);
assertEquals(RequestContext.get().getUser(), TESTUSER);
return null;
}
});
......
......@@ -16,11 +16,11 @@
*/
package org.apache.atlas.web.filters;
import org.apache.atlas.RequestContext;
import org.apache.atlas.web.security.BaseSecurityTest;
import org.apache.atlas.web.service.EmbeddedServer;
import org.apache.commons.configuration.ConfigurationException;
import org.eclipse.jetty.server.Server;
import org.testng.Assert;
import org.testng.annotations.Test;
import java.io.IOException;
......@@ -28,10 +28,13 @@ import java.net.HttpURLConnection;
import java.net.URL;
import java.util.Properties;
import static org.testng.Assert.assertEquals;
/**
*
*/
public class MetadataAuthenticationSimpleFilterIT extends BaseSecurityTest {
public class AtlasAuthenticationSimpleFilterIT extends BaseSecurityTest {
public static final String TESTUSER = "testuser";
class TestEmbeddedServer extends EmbeddedServer {
public TestEmbeddedServer(int port, String path) throws IOException {
......@@ -60,7 +63,7 @@ public class MetadataAuthenticationSimpleFilterIT extends BaseSecurityTest {
connection.connect();
try {
Assert.assertEquals(connection.getResponseCode(), 403);
assertEquals(connection.getResponseCode(), 403);
} catch (Exception e) {
e.printStackTrace();
}
......@@ -70,7 +73,8 @@ public class MetadataAuthenticationSimpleFilterIT extends BaseSecurityTest {
connection.setRequestMethod("GET");
connection.connect();
Assert.assertEquals(connection.getResponseCode(), 200);
assertEquals(connection.getResponseCode(), 200);
assertEquals(RequestContext.get().getUser(), TESTUSER);
} finally {
server.getServer().stop();
if (originalConf != null) {
......
......@@ -17,6 +17,7 @@
package org.apache.atlas.web.listeners;
import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.Provider;
import com.google.inject.TypeLiteral;
import com.thinkaurelius.titan.core.TitanGraph;
......@@ -60,6 +61,11 @@ public class TestGuiceServletConfig extends GuiceServletConfig {
}
@Override
protected Module getRepositoryModule() {
return new TestModule();
}
@Override
protected void startServices() {
try {
Configuration conf = ApplicationProperties.get();
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.web.listeners;
import com.google.inject.Binder;
import org.apache.atlas.RepositoryMetadataModule;
import org.apache.atlas.repository.audit.EntityAuditRepository;
import org.apache.atlas.repository.audit.InMemoryEntityAuditRepository;
public class TestModule extends RepositoryMetadataModule {
@Override
protected void bindAuditRepository(Binder binder) {
//Map EntityAuditRepository interface to hbase based implementation
binder.bind(EntityAuditRepository.class).to(InMemoryEntityAuditRepository.class).asEagerSingleton();
}
}
......@@ -24,6 +24,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.alias.CredentialProvider;
import org.apache.hadoop.security.alias.CredentialProviderFactory;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.webapp.WebAppContext;
import java.io.File;
import java.io.IOException;
......@@ -51,8 +52,11 @@ public class BaseSSLAndKerberosTest extends BaseSecurityTest {
}
@Override
public org.apache.commons.configuration.Configuration getConfiguration() {
return super.getConfiguration();
protected WebAppContext getWebAppContext(String path) {
WebAppContext application = new WebAppContext(path, "/");
application.setDescriptor(System.getProperty("projectBaseDir") + "/webapp/src/test/webapp/WEB-INF/web.xml");
application.setClassLoader(Thread.currentThread().getContextClassLoader());
return application;
}
}
......
......@@ -32,11 +32,15 @@ import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.Writer;
import java.net.URL;
import java.nio.file.Files;
import java.util.Locale;
import java.util.Properties;
import static org.apache.atlas.security.SecurityProperties.*;
import static org.apache.atlas.security.SecurityProperties.CERT_STORES_CREDENTIAL_PROVIDER_PATH;
import static org.apache.atlas.security.SecurityProperties.KEYSTORE_FILE_KEY;
import static org.apache.atlas.security.SecurityProperties.TLS_ENABLED;
import static org.apache.atlas.security.SecurityProperties.TRUSTSTORE_FILE_KEY;
/**
*
......@@ -135,4 +139,23 @@ public class BaseSecurityTest {
return configuration;
}
public static String writeConfiguration(final PropertiesConfiguration configuration) throws Exception {
String persistDir = TestUtils.getTempDirectory();
TestUtils.writeConfiguration(configuration, persistDir + File.separator +
ApplicationProperties.APPLICATION_PROPERTIES);
String confLocation = System.getProperty("atlas.conf");
URL url;
if (confLocation == null) {
url = BaseSecurityTest.class.getResource("/" + ApplicationProperties.APPLICATION_PROPERTIES);
} else {
url = new File(confLocation, ApplicationProperties.APPLICATION_PROPERTIES).toURI().toURL();
}
PropertiesConfiguration configuredProperties = new PropertiesConfiguration();
configuredProperties.load(url);
TestUtils.writeConfiguration(configuredProperties, persistDir + File.separator +
ApplicationProperties.APPLICATION_PROPERTIES);
ApplicationProperties.forceReload();
return persistDir;
}
}
......@@ -22,6 +22,7 @@ import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasException;
import org.apache.atlas.web.TestUtils;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.alias.JavaKeyStoreProvider;
......@@ -95,7 +96,7 @@ public class NegativeSSLAndKerberosTest extends BaseSSLAndKerberosTest {
System.setProperty("atlas.conf", persistDir);
secureEmbeddedServer = new TestSecureEmbeddedServer(21443, getWarPath()) {
@Override
public PropertiesConfiguration getConfiguration() {
public Configuration getConfiguration() {
return configuration;
}
};
......
......@@ -18,10 +18,8 @@
package org.apache.atlas.web.security;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasException;
import org.apache.atlas.web.TestUtils;
import org.apache.atlas.web.service.SecureEmbeddedServer;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.hadoop.conf.Configuration;
......@@ -30,6 +28,7 @@ import org.apache.hadoop.security.alias.CredentialProvider;
import org.apache.hadoop.security.alias.CredentialProviderFactory;
import org.apache.hadoop.security.alias.JavaKeyStoreProvider;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.webapp.WebAppContext;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
......@@ -47,6 +46,7 @@ public class SSLTest extends BaseSSLAndKerberosTest {
private Path jksPath;
private String providerUrl;
private TestSecureEmbeddedServer secureEmbeddedServer;
private String originalConf;
class TestSecureEmbeddedServer extends SecureEmbeddedServer {
......@@ -59,8 +59,11 @@ public class SSLTest extends BaseSSLAndKerberosTest {
}
@Override
public org.apache.commons.configuration.Configuration getConfiguration() {
return super.getConfiguration();
protected WebAppContext getWebAppContext(String path) {
WebAppContext application = new WebAppContext(path, "/");
application.setDescriptor(System.getProperty("projectBaseDir") + "/webapp/src/test/webapp/WEB-INF/web.xml");
application.setClassLoader(Thread.currentThread().getContextClassLoader());
return application;
}
}
......@@ -69,13 +72,9 @@ public class SSLTest extends BaseSSLAndKerberosTest {
jksPath = new Path(Files.createTempDirectory("tempproviders").toString(), "test.jks");
providerUrl = JavaKeyStoreProvider.SCHEME_NAME + "://file/" + jksPath.toUri();
String persistDir = TestUtils.getTempDirectory();
setupCredentials();
final PropertiesConfiguration configuration = getSSLConfiguration(providerUrl);
TestUtils.writeConfiguration(configuration, persistDir + File.separator +
ApplicationProperties.APPLICATION_PROPERTIES);
String persistDir = writeConfiguration(configuration);
dgiCLient = new AtlasClient(DGI_URL) {
@Override
......@@ -84,6 +83,8 @@ public class SSLTest extends BaseSSLAndKerberosTest {
}
};
originalConf = System.getProperty("atlas.conf");
System.setProperty("atlas.conf", persistDir);
secureEmbeddedServer = new TestSecureEmbeddedServer(21443, getWarPath()) {
@Override
public PropertiesConfiguration getConfiguration() {
......@@ -98,6 +99,10 @@ public class SSLTest extends BaseSSLAndKerberosTest {
if (secureEmbeddedServer != null) {
secureEmbeddedServer.getServer().stop();
}
if (originalConf != null) {
System.setProperty("atlas.conf", originalConf);
}
}
protected void setupCredentials() throws Exception {
......
......@@ -18,8 +18,11 @@
package org.apache.atlas.web.service;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.web.TestUtils;
import org.apache.atlas.web.security.BaseSecurityTest;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.eclipse.jetty.webapp.WebAppContext;
import org.testng.Assert;
import org.testng.annotations.Test;
......@@ -34,10 +37,16 @@ public class SecureEmbeddedServerTest extends SecureEmbeddedServerTestBase {
// setup the configuration
final PropertiesConfiguration configuration = new PropertiesConfiguration();
configuration.setProperty(CERT_STORES_CREDENTIAL_PROVIDER_PATH, providerUrl);
configuration.setProperty("atlas.services.enabled", false);
configuration.setProperty("atlas.notification.embedded", "false");
// setup the credential provider
setupCredentials();
String persistDir = BaseSecurityTest.writeConfiguration(configuration);
String originalConf = System.getProperty("atlas.conf");
System.setProperty("atlas.conf", persistDir);
ApplicationProperties.forceReload();
SecureEmbeddedServer secureEmbeddedServer = null;
try {
secureEmbeddedServer = new SecureEmbeddedServer(21443, TestUtils.getWarPath()) {
......@@ -45,6 +54,16 @@ public class SecureEmbeddedServerTest extends SecureEmbeddedServerTestBase {
protected PropertiesConfiguration getConfiguration() {
return configuration;
}
@Override
protected WebAppContext getWebAppContext(String path) {
WebAppContext application = new WebAppContext(path, "/");
application.setDescriptor(
System.getProperty("projectBaseDir") + "/webapp/src/test/webapp/WEB-INF/web.xml");
application.setClassLoader(Thread.currentThread().getContextClassLoader());
return application;
}
};
secureEmbeddedServer.server.start();
......@@ -59,7 +78,12 @@ public class SecureEmbeddedServerTest extends SecureEmbeddedServerTestBase {
Assert.fail("War deploy failed", e);
} finally {
secureEmbeddedServer.server.stop();
if (originalConf == null) {
System.clearProperty("atlas.conf");
} else {
System.setProperty("atlas.conf", originalConf);
}
}
}
}
......@@ -103,8 +103,11 @@ public class SecureEmbeddedServerTestBase {
@Test
public void testNoConfiguredCredentialProvider() throws Exception {
String originalConf = null;
try {
originalConf = System.getProperty("atlas.conf");
System.clearProperty("atlas.conf");
ApplicationProperties.forceReload();
secureEmbeddedServer = new SecureEmbeddedServer(securePort, TestUtils.getWarPath());
secureEmbeddedServer.server.start();
......@@ -113,7 +116,15 @@ public class SecureEmbeddedServerTestBase {
Assert.assertEquals(e.getMessage(),
"No credential provider path configured for storage of certificate store passwords");
} finally {
secureEmbeddedServer.server.stop();
if (secureEmbeddedServer != null) {
secureEmbeddedServer.server.stop();
}
if (originalConf == null) {
System.clearProperty("atlas.conf");
} else {
System.setProperty("atlas.conf", originalConf);
}
}
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment