Commit 2cae42c0 by Suma Shivaprasad

ATLAS-394 Fix BaseResourceIT.waitForNotification (shwethags via sumasai)

parent fa502b21
......@@ -57,6 +57,7 @@ atlas.kafka.bootstrap.servers=localhost:9027
atlas.kafka.zookeeper.session.timeout.ms=400
atlas.kafka.zookeeper.sync.time.ms=20
atlas.kafka.auto.commit.interval.ms=1000
atlas.kafka.auto.offset.reset=smallest
atlas.kafka.hook.group.id=atlas
......
......@@ -69,4 +69,10 @@ public class KafkaConsumer<T> extends AbstractNotificationConsumer<T> {
consumerId, message.topic(), message.partition(), message.offset(), message.message());
return (String) message.message();
}
@Override
protected String peekMessage() {
MessageAndMetadata message = (MessageAndMetadata) iterator.peek();
return (String) message.message();
}
}
......@@ -94,10 +94,11 @@ public abstract class AbstractNotificationConsumer<T> implements NotificationCon
}
@Override
public void remove() {
throw new UnsupportedOperationException("The remove method is not supported.");
public T peek() {
return GSON.fromJson(peekMessage(), type);
}
protected abstract String peekMessage();
// ----- inner class : ImmutableListDeserializer ---------------------------
......
......@@ -17,8 +17,11 @@
package org.apache.atlas.notification;
import java.util.Iterator;
// TODO : docs!
public interface NotificationConsumer<T> extends Iterator<T>{
public interface NotificationConsumer<T>{
boolean hasNext();
T next();
T peek();
}
......@@ -19,10 +19,10 @@ package org.apache.atlas.notification;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import kafka.consumer.ConsumerTimeoutException;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasException;
import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.notification.hook.HookNotification;
import org.apache.atlas.service.Service;
import org.apache.commons.configuration.Configuration;
......@@ -98,6 +98,14 @@ public class NotificationHookConsumer implements Service {
this.consumer = consumer;
}
private boolean hasNext() {
try {
return consumer.hasNext();
} catch(ConsumerTimeoutException e) {
return false;
}
}
@Override
public void run() {
......@@ -105,9 +113,10 @@ public class NotificationHookConsumer implements Service {
return;
}
while(consumer.hasNext()) {
while(true) {
try {
if (hasNext()) {
HookNotification.HookNotificationMessage message = consumer.next();
try {
switch (message.getType()) {
case ENTITY_CREATE:
......@@ -132,7 +141,11 @@ public class NotificationHookConsumer implements Service {
}
} catch (Exception e) {
//todo handle failures
LOG.debug("Error handling message {}", message, e);
LOG.warn("Error handling message {}", message, e);
}
}
} catch(Throwable t) {
LOG.warn("Failure in NotificationHookConsumer", t);
}
}
}
......@@ -150,7 +163,7 @@ public class NotificationHookConsumer implements Service {
return false;
}
}
} catch (AtlasServiceException e) {
} catch (Throwable e) {
LOG.info(
"Handled AtlasServiceException while waiting for Atlas Server to become ready, " +
"exiting consumer thread.", e);
......
......@@ -14,6 +14,7 @@ ATLAS-54 Rename configs in hive hook (shwethags)
ATLAS-3 Mixed Index creation fails with Date types (sumasai via shwethags)
ALL CHANGES:
ATLAS-394 Fix BaseResourceIT.waitForNotification (shwethags via sumasai)
ATLAS-385 Support for Lineage for entities with SuperType as DataSet (anilsg via sumasai)
ATLAS-342 Atlas is sending an ENTITY_CREATE event to the ATLAS_ENTITIES topic even if the entity exists already (shwethags)
ATLAS-386 Handle hive rename Table (shwethags)
......
......@@ -252,7 +252,7 @@ public final class GraphHelper {
*/
public Vertex getVertexForInstanceByUniqueAttribute(ClassType classType, IReferenceableInstance instance)
throws AtlasException {
LOG.debug("Checking if there is an instance with the same unique attributes for instance {}", instance);
Vertex result = null;
for (AttributeInfo attributeInfo : classType.fieldMapping().fields.values()) {
if (attributeInfo.isUnique) {
......
......@@ -231,12 +231,14 @@ public final class TypedInstanceToGraphMapper {
List<ITypedReferenceableInstance> instancesToUpdate = new ArrayList<>();
for (IReferenceableInstance instance : instances) {
LOG.debug("Discovering instance to create/update for {}", instance);
ITypedReferenceableInstance newInstance;
Id id = instance.getId();
if (!idToVertexMap.containsKey(id)) {
Vertex instanceVertex;
if (id.isAssigned()) { // has a GUID
LOG.debug("Instance {} has an assigned id", instance.getId()._getId());
instanceVertex = graphHelper.getVertexForGUID(id.id);
if (!(instance instanceof ReferenceableInstance)) {
throw new IllegalStateException(
......@@ -252,6 +254,7 @@ public final class TypedInstanceToGraphMapper {
//no entity with the given unique attribute, create new
if (instanceVertex == null) {
LOG.debug("Creating new vertex for instance {}", instance);
newInstance = classType.convert(instance, Multiplicity.REQUIRED);
instanceVertex = graphHelper.createVertexWithIdentity(newInstance, classType.getAllSuperTypeNames());
instancesToCreate.add(newInstance);
......@@ -260,6 +263,7 @@ public final class TypedInstanceToGraphMapper {
mapInstanceToVertex(newInstance, instanceVertex, classType.fieldMapping().fields, true, Operation.CREATE);
} else {
LOG.debug("Re-using existing vertex {} for instance {}", instanceVertex.getId(), instance);
if (!(instance instanceof ReferenceableInstance)) {
throw new IllegalStateException(
String.format("%s is not of type ITypedReferenceableInstance", instance));
......
......@@ -66,6 +66,7 @@ atlas.kafka.bootstrap.servers=localhost:19027
atlas.kafka.data=${sys:atlas.data}/kafka
atlas.kafka.zookeeper.session.timeout.ms=400
atlas.kafka.zookeeper.sync.time.ms=20
atlas.kafka.consumer.timeout.ms=100
atlas.kafka.auto.commit.interval.ms=100
atlas.kafka.hook.group.id=atlas
atlas.kafka.entities.group.id=atlas_entities
......
......@@ -35,10 +35,8 @@ import org.apache.atlas.typesystem.types.TraitType;
import org.apache.atlas.typesystem.types.utils.TypesUtil;
import org.apache.atlas.web.resources.BaseResourceIT;
import org.apache.atlas.web.util.Servlets;
import org.junit.AfterClass;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
......@@ -48,7 +46,9 @@ import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import static org.testng.Assert.*;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
/**
* Entity Notification Integration Tests.
......@@ -62,9 +62,9 @@ public class EntityNotificationIT extends BaseResourceIT {
private final String TABLE_NAME = "table" + randomString();
@Inject
private NotificationInterface notificationInterface;
private EntityNotificationConsumer notificationConsumer;
private Id tableId;
private String traitName;
private NotificationConsumer<EntityNotification> notificationConsumer;
@BeforeClass
public void setUp() throws Exception {
......@@ -74,19 +74,7 @@ public class EntityNotificationIT extends BaseResourceIT {
List<NotificationConsumer<EntityNotification>> consumers =
notificationInterface.createConsumers(NotificationInterface.NotificationType.ENTITIES, 1);
NotificationConsumer<EntityNotification> consumer = consumers.iterator().next();
notificationConsumer = new EntityNotificationConsumer(consumer);
notificationConsumer.start();
}
@AfterClass
public void tearDown() {
notificationConsumer.stop();
}
@BeforeMethod
public void setupTest() {
notificationConsumer.reset();
notificationConsumer = consumers.iterator().next();
}
@Test
......@@ -97,17 +85,8 @@ public class EntityNotificationIT extends BaseResourceIT {
final String guid = tableId._getId();
waitForNotification(notificationConsumer, MAX_WAIT_TIME);
EntityNotification entityNotification = notificationConsumer.getLastEntityNotification();
assertNotNull(entityNotification);
assertEquals(EntityNotification.OperationType.ENTITY_CREATE, entityNotification.getOperationType());
IReferenceableInstance entity = entityNotification.getEntity();
assertEquals(HIVE_TABLE_TYPE, entity.getTypeName());
assertEquals(guid, entity.getId()._getId());
waitForNotification(notificationConsumer, MAX_WAIT_TIME,
newNotificationPredicate(EntityNotification.OperationType.ENTITY_CREATE, HIVE_TABLE_TYPE, guid));
}
@Test(dependsOnMethods = "testCreateEntity")
......@@ -119,19 +98,8 @@ public class EntityNotificationIT extends BaseResourceIT {
serviceClient.updateEntityAttribute(guid, property, newValue);
waitForNotification(notificationConsumer, MAX_WAIT_TIME);
EntityNotification entityNotification = notificationConsumer.getLastEntityNotification();
assertNotNull(entityNotification);
assertEquals(EntityNotification.OperationType.ENTITY_UPDATE, entityNotification.getOperationType());
IReferenceableInstance entity = entityNotification.getEntity();
assertEquals(HIVE_TABLE_TYPE, entity.getTypeName());
assertEquals(guid, entity.getId()._getId());
assertEquals(newValue, entity.getValuesMap().get(property));
waitForNotification(notificationConsumer, MAX_WAIT_TIME,
newNotificationPredicate(EntityNotification.OperationType.ENTITY_UPDATE, HIVE_TABLE_TYPE, guid));
}
@Test(dependsOnMethods = "testCreateEntity")
......@@ -154,18 +122,10 @@ public class EntityNotificationIT extends BaseResourceIT {
ClientResponse clientResponse = addTrait(guid, traitInstanceJSON);
assertEquals(clientResponse.getStatus(), Response.Status.CREATED.getStatusCode());
waitForNotification(notificationConsumer, MAX_WAIT_TIME);
EntityNotification entityNotification = notificationConsumer.getLastEntityNotification();
assertNotNull(entityNotification);
assertEquals(EntityNotification.OperationType.TRAIT_ADD, entityNotification.getOperationType());
EntityNotification entityNotification = waitForNotification(notificationConsumer, MAX_WAIT_TIME,
newNotificationPredicate(EntityNotification.OperationType.TRAIT_ADD, HIVE_TABLE_TYPE, guid));
IReferenceableInstance entity = entityNotification.getEntity();
assertEquals(HIVE_TABLE_TYPE, entity.getTypeName());
assertEquals(guid, entity.getId()._getId());
assertTrue(entity.getTraits().contains(traitName));
List<IStruct> allTraits = entityNotification.getAllTraits();
......@@ -178,9 +138,6 @@ public class EntityNotificationIT extends BaseResourceIT {
assertTrue(allTraitNames.contains(superTraitName));
assertTrue(allTraitNames.contains(superSuperTraitName));
// add another trait with the same super type to the entity
notificationConsumer.reset();
String anotherTraitName = "Trait" + randomString();
createTrait(anotherTraitName, superTraitName);
......@@ -191,12 +148,8 @@ public class EntityNotificationIT extends BaseResourceIT {
clientResponse = addTrait(guid, traitInstanceJSON);
assertEquals(clientResponse.getStatus(), Response.Status.CREATED.getStatusCode());
waitForNotification(notificationConsumer, MAX_WAIT_TIME);
entityNotification = notificationConsumer.getLastEntityNotification();
assertNotNull(entityNotification);
assertEquals(EntityNotification.OperationType.TRAIT_ADD, entityNotification.getOperationType());
entityNotification = waitForNotification(notificationConsumer, MAX_WAIT_TIME,
newNotificationPredicate(EntityNotification.OperationType.TRAIT_ADD, HIVE_TABLE_TYPE, guid));
allTraits = entityNotification.getAllTraits();
allTraitNames = new LinkedList<>();
......@@ -217,20 +170,10 @@ public class EntityNotificationIT extends BaseResourceIT {
ClientResponse clientResponse = deleteTrait(guid, traitName);
Assert.assertEquals(clientResponse.getStatus(), Response.Status.OK.getStatusCode());
waitForNotification(notificationConsumer, MAX_WAIT_TIME);
EntityNotification entityNotification = notificationConsumer.getLastEntityNotification();
assertNotNull(entityNotification);
assertEquals(EntityNotification.OperationType.TRAIT_DELETE,
entityNotification.getOperationType());
IReferenceableInstance entity = entityNotification.getEntity();
assertEquals(HIVE_TABLE_TYPE, entity.getTypeName());
assertEquals(guid, entity.getId()._getId());
EntityNotification entityNotification = waitForNotification(notificationConsumer, MAX_WAIT_TIME,
newNotificationPredicate(EntityNotification.OperationType.TRAIT_DELETE, HIVE_TABLE_TYPE, guid));
assertFalse(entity.getTraits().contains(traitName));
assertFalse(entityNotification.getEntity().getTraits().contains(traitName));
}
......
......@@ -59,7 +59,7 @@ public class NotificationHookConsumerIT extends BaseResourceIT {
sendHookMessage(new HookNotification.EntityCreateRequest(entity));
waitFor(1000, new Predicate() {
waitFor(MAX_WAIT_TIME, new Predicate() {
@Override
public boolean evaluate() throws Exception {
JSONArray results = serviceClient.searchByDSL(String.format("%s where name='%s'", DATABASE_TYPE,
......@@ -80,7 +80,7 @@ public class NotificationHookConsumerIT extends BaseResourceIT {
final Referenceable newEntity = new Referenceable(DATABASE_TYPE);
newEntity.set("owner", randomString());
sendHookMessage(new HookNotification.EntityPartialUpdateRequest(DATABASE_TYPE, "name", dbName, newEntity));
waitFor(1000, new Predicate() {
waitFor(MAX_WAIT_TIME, new Predicate() {
@Override
public boolean evaluate() throws Exception {
Referenceable localEntity = serviceClient.getEntity(DATABASE_TYPE, "name", dbName);
......@@ -106,7 +106,7 @@ public class NotificationHookConsumerIT extends BaseResourceIT {
newEntity.set("name", newName);
sendHookMessage(new HookNotification.EntityPartialUpdateRequest(DATABASE_TYPE, "name", dbName, newEntity));
waitFor(1000, new Predicate() {
waitFor(MAX_WAIT_TIME, new Predicate() {
@Override
public boolean evaluate() throws Exception {
JSONArray results = serviceClient.searchByDSL(String.format("%s where name='%s'", DATABASE_TYPE,
......@@ -136,7 +136,7 @@ public class NotificationHookConsumerIT extends BaseResourceIT {
//updating unique attribute
sendHookMessage(new HookNotification.EntityUpdateRequest(newEntity));
waitFor(1000, new Predicate() {
waitFor(MAX_WAIT_TIME, new Predicate() {
@Override
public boolean evaluate() throws Exception {
JSONArray results = serviceClient.searchByDSL(String.format("%s where name='%s'", DATABASE_TYPE,
......
......@@ -24,7 +24,10 @@ import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.WebResource;
import com.sun.jersey.api.client.config.DefaultClientConfig;
import org.apache.atlas.*;
import kafka.consumer.ConsumerTimeoutException;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.notification.NotificationConsumer;
import org.apache.atlas.notification.entity.EntityNotification;
import org.apache.atlas.typesystem.Referenceable;
......@@ -43,6 +46,7 @@ import org.apache.atlas.typesystem.types.IDataType;
import org.apache.atlas.typesystem.types.Multiplicity;
import org.apache.atlas.typesystem.types.StructTypeDefinition;
import org.apache.atlas.typesystem.types.TraitType;
import org.apache.atlas.typesystem.types.TypeUtils;
import org.apache.atlas.typesystem.types.utils.TypesUtil;
import org.apache.atlas.utils.ParamChecker;
import org.apache.atlas.web.util.Servlets;
......@@ -272,6 +276,17 @@ public abstract class BaseResourceIT {
boolean evaluate() throws Exception;
}
public interface NotificationPredicate {
/**
* Perform a predicate evaluation.
*
* @return the boolean result of the evaluation.
* @throws Exception thrown if the predicate evaluation could not evaluate.
*/
boolean evaluate(EntityNotification notification) throws Exception;
}
/**
* Wait for a condition, expressed via a {@link Predicate} to become true.
*
......@@ -292,49 +307,40 @@ public abstract class BaseResourceIT {
}
}
// ----- inner class : EntityNotificationConsumer --------------------------
protected static class EntityNotificationConsumer implements Runnable {
private final NotificationConsumer<EntityNotification> consumerIterator;
private EntityNotification entityNotification = null;
private boolean run;
public EntityNotificationConsumer(NotificationConsumer<EntityNotification> consumerIterator) {
this.consumerIterator = consumerIterator;
}
protected EntityNotification waitForNotification(final NotificationConsumer<EntityNotification> consumer, int maxWait,
final NotificationPredicate predicate) throws Exception {
final TypeUtils.Pair<EntityNotification, String> pair = TypeUtils.Pair.of(null, null);
final long maxCurrentTime = System.currentTimeMillis() + maxWait;
waitFor(maxWait, new Predicate() {
@Override
public void run() {
while (run && consumerIterator.hasNext()) {
entityNotification = consumerIterator.next();
public boolean evaluate() throws Exception {
try {
while (consumer.hasNext() && System.currentTimeMillis() < maxCurrentTime) {
EntityNotification notification = consumer.next();
if (predicate.evaluate(notification)) {
pair.left = notification;
return true;
}
}
public void reset() {
entityNotification = null;
} catch(ConsumerTimeoutException e) {
//ignore
}
public void start() {
Thread thread = new Thread(this);
run = true;
thread.start();
}
public void stop() {
run = false;
}
public EntityNotification getLastEntityNotification() {
return entityNotification;
return false;
}
});
return pair.left;
}
protected void waitForNotification(final EntityNotificationConsumer notificationConsumer, int maxWait) throws Exception {
waitFor(maxWait, new Predicate() {
protected NotificationPredicate newNotificationPredicate(final EntityNotification.OperationType operationType,
final String typeName, final String guid) {
return new NotificationPredicate() {
@Override
public boolean evaluate() throws Exception {
return notificationConsumer.getLastEntityNotification() != null;
public boolean evaluate(EntityNotification notification) throws Exception {
return notification != null &&
notification.getOperationType() == operationType &&
notification.getEntity().getTypeName().equals(typeName) &&
notification.getEntity().getId()._getId().equals(guid);
}
});
};
}
}
......@@ -48,12 +48,10 @@ import org.apache.atlas.web.util.Servlets;
import org.apache.commons.lang.RandomStringUtils;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONObject;
import org.junit.AfterClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
......@@ -67,7 +65,6 @@ import java.util.Map;
import java.util.UUID;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.fail;
/**
......@@ -89,7 +86,7 @@ public class EntityJerseyResourceIT extends BaseResourceIT {
@Inject
private NotificationInterface notificationInterface;
private EntityNotificationConsumer notificationConsumer;
private NotificationConsumer<EntityNotification> notificationConsumer;
@BeforeClass
public void setUp() throws Exception {
......@@ -100,19 +97,7 @@ public class EntityJerseyResourceIT extends BaseResourceIT {
List<NotificationConsumer<EntityNotification>> consumers =
notificationInterface.createConsumers(NotificationInterface.NotificationType.ENTITIES, 1);
NotificationConsumer<EntityNotification> consumer = consumers.iterator().next();
notificationConsumer = new EntityNotificationConsumer(consumer);
notificationConsumer.start();
}
@AfterClass
public void tearDown() {
notificationConsumer.stop();
}
@BeforeMethod
public void setupTest() {
notificationConsumer.reset();
notificationConsumer = consumers.iterator().next();
}
@Test
......@@ -158,20 +143,26 @@ public class EntityJerseyResourceIT extends BaseResourceIT {
serviceClient.createEntity(db).getString(0);
waitForNotification(notificationConsumer, MAX_WAIT_TIME);
EntityNotification notification = notificationConsumer.getLastEntityNotification();
assertNotNull(notification);
assertEquals(notification.getEntity().get("name"), dbName);
waitForNotification(notificationConsumer, MAX_WAIT_TIME, new NotificationPredicate() {
@Override
public boolean evaluate(EntityNotification notification) throws Exception {
return notification != null && notification.getEntity().get("name").equals(dbName);
}
});
JSONArray results =
serviceClient.searchByDSL(String.format("%s where name='%s'", DATABASE_TYPE, dbName));
assertEquals(results.length(), 1);
//create entity again shouldn't create another instance with same unique attribute value
notificationConsumer.reset();
serviceClient.createEntity(db);
try {
waitForNotification(notificationConsumer, MAX_WAIT_TIME);
waitForNotification(notificationConsumer, MAX_WAIT_TIME, new NotificationPredicate() {
@Override
public boolean evaluate(EntityNotification notification) throws Exception {
return notification != null && notification.getEntity().get("name").equals(dbName);
}
});
fail("Expected time out exception");
} catch (Exception e) {
//expected timeout
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment