Commit 705014eb by Shwetha GS

ATLAS-716 Entity update/delete notifications (shwethags)

parent 153fc362
......@@ -188,7 +188,7 @@ public class HiveMetaStoreBridge {
List<String> guids = getAtlasClient().createEntity(entityJSON);
LOG.debug("created instance for type " + typeName + ", guid: " + guids);
return new Referenceable(guids.get(0), referenceable.getTypeName(), null);
return new Referenceable(guids.get(guids.size() - 1), referenceable.getTypeName(), null);
}
/**
......
......@@ -18,16 +18,14 @@
package org.apache.atlas;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.apache.atlas.typesystem.IReferenceableInstance;
import org.apache.atlas.typesystem.json.InstanceSerialization;
import org.apache.commons.lang.StringUtils;
/**
* Structure of entity audit event
*/
public class EntityAuditEvent {
public static final Gson GSON = new GsonBuilder().create();
public enum EntityAuditAction {
ENTITY_CREATE, ENTITY_UPDATE, ENTITY_DELETE, TAG_ADD, TAG_DELETE
}
......@@ -38,16 +36,19 @@ public class EntityAuditEvent {
private EntityAuditAction action;
private String details;
private String eventKey;
private IReferenceableInstance entityDefinition;
public EntityAuditEvent() {
}
public EntityAuditEvent(String entityId, Long ts, String user, EntityAuditAction action, String details) {
public EntityAuditEvent(String entityId, Long ts, String user, EntityAuditAction action, String details,
IReferenceableInstance entityDefinition) throws AtlasException {
this.entityId = entityId;
this.timestamp = ts;
this.user = user;
this.action = action;
this.details = details;
this.entityDefinition = entityDefinition;
}
@Override
......@@ -63,9 +64,11 @@ public class EntityAuditEvent {
EntityAuditEvent otherEvent = (EntityAuditEvent) other;
return StringUtils.equals(entityId, otherEvent.entityId) &&
(timestamp == otherEvent.timestamp) &&
StringUtils.equals(user, otherEvent.user) && (action == otherEvent.action) &&
StringUtils.equals(user, otherEvent.user) &&
(action == otherEvent.action) &&
StringUtils.equals(details, otherEvent.details) &&
StringUtils.equals(eventKey, otherEvent.eventKey);
StringUtils.equals(eventKey, otherEvent.eventKey) &&
StringUtils.equals(getEntityDefinitionString(), otherEvent.getEntityDefinitionString());
}
@Override
......@@ -75,11 +78,11 @@ public class EntityAuditEvent {
@Override
public String toString() {
return GSON.toJson(this);
return SerDe.GSON.toJson(this);
}
public static EntityAuditEvent fromString(String eventString) {
return GSON.fromJson(eventString, EntityAuditEvent.class);
return SerDe.GSON.fromJson(eventString, EntityAuditEvent.class);
}
public String getEntityId() {
......@@ -129,4 +132,19 @@ public class EntityAuditEvent {
public void setEventKey(String eventKey) {
this.eventKey = eventKey;
}
public IReferenceableInstance getEntityDefinition() {
return entityDefinition;
}
public String getEntityDefinitionString() {
if (entityDefinition != null) {
return InstanceSerialization.toJson(entityDefinition, true);
}
return null;
}
public void setEntityDefinition(String entityDefinition) {
this.entityDefinition = InstanceSerialization.fromJsonReferenceable(entityDefinition, true);
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonDeserializationContext;
import com.google.gson.JsonDeserializer;
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
import com.google.gson.JsonSerializationContext;
import com.google.gson.JsonSerializer;
import org.apache.atlas.typesystem.IReferenceableInstance;
import org.apache.atlas.typesystem.IStruct;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.Struct;
import org.apache.atlas.typesystem.json.InstanceSerialization;
import java.lang.reflect.Type;
public class SerDe {
public static final Gson GSON = new GsonBuilder().
registerTypeAdapter(IStruct.class, new StructDeserializer()).
registerTypeAdapter(IReferenceableInstance.class, new ReferenceableSerializerDeserializer()).
registerTypeAdapter(Referenceable.class, new ReferenceableSerializerDeserializer()).
create();
/**
* Serde for Struct used by AbstractNotificationConsumer.GSON.
*/
public static final class StructDeserializer implements JsonDeserializer<IStruct>, JsonSerializer<IStruct> {
@Override
public IStruct deserialize(final JsonElement json, final Type type,
final JsonDeserializationContext context) {
return context.deserialize(json, Struct.class);
}
@Override
public JsonElement serialize(IStruct src, Type typeOfSrc, JsonSerializationContext context) {
String instanceJson = InstanceSerialization.toJson(src, true);
return new JsonParser().parse(instanceJson).getAsJsonObject();
}
}
/**
* Serde for Referenceable used by AbstractNotificationConsumer.GSON.
*/
public static final class ReferenceableSerializerDeserializer implements JsonDeserializer<IStruct>,
JsonSerializer<IReferenceableInstance> {
@Override
public IReferenceableInstance deserialize(final JsonElement json, final Type type,
final JsonDeserializationContext context) {
return InstanceSerialization.fromJsonReferenceable(json.toString(), true);
}
@Override
public JsonElement serialize(IReferenceableInstance src, Type typeOfSrc, JsonSerializationContext context) {
String instanceJson = InstanceSerialization.toJson(src, true);
return new JsonParser().parse(instanceJson).getAsJsonObject();
}
}
}
......@@ -21,8 +21,12 @@ import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientHandlerException;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.WebResource;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.json.InstanceSerialization;
import org.apache.commons.configuration.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.codehaus.jettison.json.JSONObject;
import org.mockito.Matchers;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.testng.annotations.BeforeMethod;
......@@ -33,9 +37,12 @@ import javax.ws.rs.core.UriBuilder;
import java.net.ConnectException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.List;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
......@@ -76,6 +83,25 @@ public class AtlasClientTest {
assertTrue(atlasClient.isServerReady());
}
@Test
public void testCreateEntity() throws Exception {
setupRetryParams();
AtlasClient atlasClient = new AtlasClient(service, configuration);
WebResource.Builder builder = setupBuilder(AtlasClient.API.CREATE_ENTITY, service);
ClientResponse response = mock(ClientResponse.class);
when(response.getStatus()).thenReturn(Response.Status.CREATED.getStatusCode());
JSONObject jsonResponse = new JSONObject(new AtlasClient.EntityResult(Arrays.asList("id"), null, null).toString());
when(response.getEntity(String.class)).thenReturn(jsonResponse.toString());
String entityJson = InstanceSerialization.toJson(new Referenceable("type"), true);
when(builder.method(anyString(), Matchers.<Class>any(), anyString())).thenReturn(response);
List<String> ids = atlasClient.createEntity(entityJson);
assertEquals(ids.size(), 1);
assertEquals(ids.get(0), "id");
}
private WebResource.Builder setupBuilder(AtlasClient.API api, WebResource webResource) {
when(webResource.path(api.getPath())).thenReturn(service);
WebResource.Builder builder = getBuilder(service);
......
......@@ -97,6 +97,11 @@ public class MessageVersion implements Comparable<MessageVersion> {
}
@Override
public String toString() {
return "MessageVersion[version=" + version + "]";
}
// ----- helper methods --------------------------------------------------
/**
......
......@@ -17,11 +17,11 @@
*/
package org.apache.atlas.notification;
import com.google.gson.reflect.TypeToken;
import org.apache.atlas.notification.entity.EntityMessageDeserializer;
import org.apache.atlas.notification.entity.EntityNotification;
import org.apache.atlas.notification.hook.HookMessageDeserializer;
import org.apache.atlas.notification.hook.HookNotification;
import com.google.gson.reflect.TypeToken;
import java.lang.reflect.Type;
import java.util.List;
......
......@@ -31,7 +31,7 @@ import java.lang.reflect.Type;
public abstract class VersionedMessageDeserializer<T> implements MessageDeserializer<T> {
public static final String VERSION_MISMATCH_MSG =
"Notification message version mismatch. Expected %s but recieved %s";
"Notification message version mismatch. Expected %s but recieved %s. Message %s";
private final Type versionedMessageType;
private final MessageVersion expectedVersion;
......@@ -90,18 +90,16 @@ public abstract class VersionedMessageDeserializer<T> implements MessageDeserial
// message has newer version
if (comp > 0) {
String msg = String.format(VERSION_MISMATCH_MSG, expectedVersion, versionedMessage.getVersion());
String msg =
String.format(VERSION_MISMATCH_MSG, expectedVersion, versionedMessage.getVersion(), messageJson);
notificationLogger.error(msg);
notificationLogger.info(messageJson);
throw new IncompatibleVersionException(msg);
}
// message has older version
if (comp < 0) {
notificationLogger.info(
String.format(VERSION_MISMATCH_MSG, expectedVersion, versionedMessage.getVersion()));
notificationLogger.info(messageJson);
notificationLogger.info(String.format(VERSION_MISMATCH_MSG, expectedVersion, versionedMessage.getVersion(),
messageJson));
}
}
}
......@@ -27,9 +27,13 @@ import java.lang.reflect.Type;
import java.util.LinkedList;
import java.util.List;
import static org.mockito.Matchers.endsWith;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.testng.Assert.*;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
/**
* AbstractNotificationConsumer tests.
......@@ -110,17 +114,17 @@ public class AbstractNotificationConsumerTest {
assertTrue(consumer.hasNext());
assertEquals(new TestMessage("sValue2", 98), consumer.next());
verify(logger).info(json2);
verify(logger).info(endsWith(json2));
assertTrue(consumer.hasNext());
assertEquals(new TestMessage("sValue3", 97), consumer.next());
verify(logger).info(json3);
verify(logger).info(endsWith(json3));
assertTrue(consumer.hasNext());
assertEquals(new TestMessage("sValue4", 96), consumer.next());
verify(logger).info(json4);
verify(logger).info(endsWith(json4));
assertFalse(consumer.hasNext());
}
......@@ -154,7 +158,7 @@ public class AbstractNotificationConsumerTest {
consumer.next();
fail("Expected VersionMismatchException!");
} catch (IncompatibleVersionException e) {
verify(logger).info(json2);
verify(logger).error(endsWith(json2));
}
assertFalse(consumer.hasNext());
......
......@@ -3,6 +3,7 @@ Apache Atlas Release Notes
--trunk - unreleased
INCOMPATIBLE CHANGES:
ATLAS-716 Entity update/delete notifications (shwethags)
ATLAS-619 Canonicalize hive queries (sumasai)
ATLAS-497 Simple Authorization (saqeeb.s via yhemanth)
ATLAS-661 REST API Authentication (nixonrodrigues via yhemanth)
......
......@@ -18,6 +18,7 @@
package org.apache.atlas.repository;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasException;
import org.apache.atlas.typesystem.ITypedReferenceableInstance;
import org.apache.atlas.typesystem.ITypedStruct;
......@@ -26,7 +27,6 @@ import org.apache.atlas.typesystem.exception.EntityNotFoundException;
import org.apache.atlas.typesystem.exception.TraitNotFoundException;
import org.apache.atlas.typesystem.types.AttributeInfo;
import org.apache.atlas.typesystem.types.IDataType;
import org.apache.atlas.typesystem.types.TypeUtils;
import java.util.List;
......@@ -111,7 +111,7 @@ public interface MetadataRepository {
* @return guids of deleted entities
* @throws RepositoryException
*/
TypeUtils.Pair<List<String>, List<ITypedReferenceableInstance>> deleteEntities(List<String> guids) throws RepositoryException;
AtlasClient.EntityResult deleteEntities(List<String> guids) throws RepositoryException;
// Trait management functions
......@@ -147,13 +147,13 @@ public interface MetadataRepository {
* Adds/Updates the property to the entity that corresponds to the GUID
* Supports only primitive attribute/Class Id updations.
*/
TypeUtils.Pair<List<String>, List<String>> updatePartial(ITypedReferenceableInstance entity) throws RepositoryException;
AtlasClient.EntityResult updatePartial(ITypedReferenceableInstance entity) throws RepositoryException;
/**
* Adds the property to the entity that corresponds to the GUID
* @param entitiesToBeUpdated The entities to be updated
*/
TypeUtils.Pair<List<String>, List<String>> updateEntities(ITypedReferenceableInstance... entitiesToBeUpdated) throws RepositoryException;
AtlasClient.EntityResult updateEntities(ITypedReferenceableInstance... entitiesToBeUpdated) throws RepositoryException;
/**
* Returns the entity for the given type and qualified name
......
......@@ -55,8 +55,9 @@ public class EntityAuditListener implements EntityChangeListener {
}
private EntityAuditEvent createEvent(ITypedReferenceableInstance entity, long ts,
EntityAuditEvent.EntityAuditAction action, String details) {
return new EntityAuditEvent(entity.getId()._getId(), ts, RequestContext.get().getUser(), action, details);
EntityAuditEvent.EntityAuditAction action, String details)
throws AtlasException {
return new EntityAuditEvent(entity.getId()._getId(), ts, RequestContext.get().getUser(), action, details, entity);
}
@Override
......
......@@ -78,6 +78,7 @@ public class HBaseBasedAuditRepository implements Service, EntityAuditRepository
public static final byte[] COLUMN_ACTION = Bytes.toBytes("action");
public static final byte[] COLUMN_DETAIL = Bytes.toBytes("detail");
public static final byte[] COLUMN_USER = Bytes.toBytes("user");
public static final byte[] COLUMN_DEFINITION = Bytes.toBytes("def");
private TableName tableName;
private Connection connection;
......@@ -110,6 +111,7 @@ public class HBaseBasedAuditRepository implements Service, EntityAuditRepository
addColumn(put, COLUMN_ACTION, event.getAction());
addColumn(put, COLUMN_USER, event.getUser());
addColumn(put, COLUMN_DETAIL, event.getDetails());
addColumn(put, COLUMN_DEFINITION, event.getEntityDefinitionString());
puts.add(put);
}
table.put(puts);
......@@ -183,6 +185,7 @@ public class HBaseBasedAuditRepository implements Service, EntityAuditRepository
event.setUser(getResultString(result, COLUMN_USER));
event.setAction(EntityAuditEvent.EntityAuditAction.valueOf(getResultString(result, COLUMN_ACTION)));
event.setDetails(getResultString(result, COLUMN_DETAIL));
event.setEntityDefinition(getResultString(result, COLUMN_DEFINITION));
events.add(event);
}
LOG.info("Got events for entity id {}, starting timestamp {}, #records {}", entityId, startKey, events.size());
......
......@@ -22,8 +22,10 @@ import com.google.common.base.Preconditions;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.thinkaurelius.titan.core.TitanGraph;
import com.tinkerpop.blueprints.Edge;
import com.tinkerpop.blueprints.GraphQuery;
import com.tinkerpop.blueprints.Vertex;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasException;
import org.apache.atlas.GraphTransaction;
import org.apache.atlas.RequestContext;
......@@ -40,7 +42,6 @@ import org.apache.atlas.typesystem.types.ClassType;
import org.apache.atlas.typesystem.types.DataTypes;
import org.apache.atlas.typesystem.types.IDataType;
import org.apache.atlas.typesystem.types.TypeSystem;
import org.apache.atlas.typesystem.types.TypeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -258,8 +259,8 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
try {
final String entityTypeName = GraphHelper.getTypeName(instanceVertex);
String relationshipLabel = GraphHelper.getTraitLabel(entityTypeName, traitNameToBeDeleted);
deleteHandler.deleteReference(instanceVertex, relationshipLabel, DataTypes.TypeCategory.TRAIT);
Edge edge = GraphHelper.getEdgeForLabel(instanceVertex, relationshipLabel);
deleteHandler.deleteEdgeReference(edge, DataTypes.TypeCategory.TRAIT, false, true);
// update the traits in entity once trait removal is successful
traitNames.remove(traitNameToBeDeleted);
......@@ -284,14 +285,15 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
@Override
@GraphTransaction
public TypeUtils.Pair<List<String>, List<String>> updateEntities(ITypedReferenceableInstance... entitiesUpdated) throws RepositoryException {
public AtlasClient.EntityResult updateEntities(ITypedReferenceableInstance... entitiesUpdated) throws RepositoryException {
LOG.info("updating entity {}", entitiesUpdated);
try {
TypedInstanceToGraphMapper instanceToGraphMapper = new TypedInstanceToGraphMapper(graphToInstanceMapper, deleteHandler);
instanceToGraphMapper.mapTypedInstanceToGraph(TypedInstanceToGraphMapper.Operation.UPDATE_FULL,
entitiesUpdated);
RequestContext requestContext = RequestContext.get();
return TypeUtils.Pair.of(requestContext.getCreatedEntityIds(), requestContext.getUpdatedEntityIds());
return new AtlasClient.EntityResult(requestContext.getCreatedEntityIds(),
requestContext.getUpdatedEntityIds(), requestContext.getDeletedEntityIds());
} catch (AtlasException e) {
throw new RepositoryException(e);
}
......@@ -299,13 +301,14 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
@Override
@GraphTransaction
public TypeUtils.Pair<List<String>, List<String>> updatePartial(ITypedReferenceableInstance entity) throws RepositoryException {
public AtlasClient.EntityResult updatePartial(ITypedReferenceableInstance entity) throws RepositoryException {
LOG.info("updating entity {}", entity);
try {
TypedInstanceToGraphMapper instanceToGraphMapper = new TypedInstanceToGraphMapper(graphToInstanceMapper, deleteHandler);
instanceToGraphMapper.mapTypedInstanceToGraph(TypedInstanceToGraphMapper.Operation.UPDATE_PARTIAL, entity);
RequestContext requestContext = RequestContext.get();
return TypeUtils.Pair.of(requestContext.getCreatedEntityIds(), requestContext.getUpdatedEntityIds());
return new AtlasClient.EntityResult(requestContext.getCreatedEntityIds(),
requestContext.getUpdatedEntityIds(), requestContext.getDeletedEntityIds());
} catch (AtlasException e) {
throw new RepositoryException(e);
}
......@@ -313,7 +316,7 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
@Override
@GraphTransaction
public TypeUtils.Pair<List<String>, List<ITypedReferenceableInstance>> deleteEntities(List<String> guids) throws RepositoryException {
public AtlasClient.EntityResult deleteEntities(List<String> guids) throws RepositoryException {
if (guids == null || guids.size() == 0) {
throw new IllegalArgumentException("guids must be non-null and non-empty");
......@@ -337,6 +340,7 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
}
}
RequestContext requestContext = RequestContext.get();
return new TypeUtils.Pair<>(requestContext.getDeletedEntityIds(), requestContext.getDeletedEntities());
return new AtlasClient.EntityResult(requestContext.getCreatedEntityIds(),
requestContext.getUpdatedEntityIds(), requestContext.getDeletedEntityIds());
}
}
......@@ -107,11 +107,13 @@ public final class GraphHelper {
// add timestamp information
setProperty(vertexWithoutIdentity, Constants.TIMESTAMP_PROPERTY_KEY, RequestContext.get().getRequestTime());
setProperty(vertexWithoutIdentity, Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY,
RequestContext.get().getRequestTime());
return vertexWithoutIdentity;
}
public Edge addEdge(Vertex fromVertex, Vertex toVertex, String edgeLabel) {
private Edge addEdge(Vertex fromVertex, Vertex toVertex, String edgeLabel) {
LOG.debug("Adding edge for {} -> label {} -> {}", string(fromVertex), edgeLabel, string(toVertex));
Edge edge = titanGraph.addEdge(null, fromVertex, toVertex, edgeLabel);
......@@ -127,12 +129,34 @@ public final class GraphHelper {
Iterable<Edge> edges = inVertex.getEdges(Direction.IN, edgeLabel);
for (Edge edge : edges) {
if (edge.getVertex(Direction.OUT).getId().toString().equals(outVertex.getId().toString())) {
Id.EntityState edgeState = getState(edge);
if (edgeState == null || edgeState == Id.EntityState.ACTIVE) {
return edge;
}
}
}
return addEdge(outVertex, inVertex, edgeLabel);
}
public Edge getEdgeByEdgeId(Vertex outVertex, String edgeLabel, String edgeId) {
if (edgeId == null) {
return null;
}
return titanGraph.getEdge(edgeId);
//TODO get edge id is expensive. Use this logic. But doesn't work for now
/**
Iterable<Edge> edges = outVertex.getEdges(Direction.OUT, edgeLabel);
for (Edge edge : edges) {
if (edge.getId().toString().equals(edgeId)) {
return edge;
}
}
return null;
**/
}
/**
* Args of the format prop1, key1, prop2, key2...
* Searches for a vertex with prop1=key1 && prop2=key2
......@@ -180,15 +204,14 @@ public final class GraphHelper {
* @return
*/
public static Edge getEdgeForLabel(Vertex vertex, String edgeLabel) {
String vertexState = vertex.getProperty(Constants.STATE_PROPERTY_KEY);
Iterator<Edge> iterator = GraphHelper.getOutGoingEdgesByLabel(vertex, edgeLabel);
Edge latestDeletedEdge = null;
long latestDeletedEdgeTime = Long.MIN_VALUE;
while (iterator != null && iterator.hasNext()) {
Edge edge = iterator.next();
String edgeState = edge.getProperty(Constants.STATE_PROPERTY_KEY);
if (edgeState == null || Id.EntityState.ACTIVE.name().equals(edgeState)) {
Id.EntityState edgeState = getState(edge);
if (edgeState == null || edgeState == Id.EntityState.ACTIVE) {
LOG.debug("Found {}", string(edge));
return edge;
} else {
......@@ -201,21 +224,10 @@ public final class GraphHelper {
}
//If the vertex is deleted, return latest deleted edge
if (Id.EntityState.DELETED.equals(vertexState)) {
LOG.debug("Found {}", string(latestDeletedEdge));
LOG.debug("Found {}", latestDeletedEdge == null ? "null" : string(latestDeletedEdge));
return latestDeletedEdge;
}
return null;
}
public Edge getEdgeById(String edgeId) {
if(edgeId != null) {
return titanGraph.getEdge(edgeId);
}
return null;
}
public static String vertexString(final Vertex vertex) {
StringBuilder properties = new StringBuilder();
for (String propertyKey : vertex.getPropertyKeys()) {
......@@ -343,6 +355,15 @@ public final class GraphHelper {
return instanceVertex.getProperty(Constants.ENTITY_TYPE_PROPERTY_KEY);
}
public static Id.EntityState getState(Element element) {
String state = getStateAsString(element);
return state == null ? null : Id.EntityState.valueOf(state);
}
public static String getStateAsString(Element element) {
return element.getProperty(Constants.STATE_PROPERTY_KEY);
}
/**
* For the given type, finds an unique attribute and checks if there is an existing instance with the same
* unique value
......
......@@ -68,9 +68,9 @@ public final class GraphToTypedInstanceMapper {
LOG.debug("Mapping graph root vertex {} to typed instance for guid {}", instanceVertex, guid);
String typeName = instanceVertex.getProperty(Constants.ENTITY_TYPE_PROPERTY_KEY);
List<String> traits = GraphHelper.getTraitNames(instanceVertex);
String state = GraphHelper.getStateAsString(instanceVertex);
Id id = new Id(guid, instanceVertex.<Integer>getProperty(Constants.VERSION_PROPERTY_KEY), typeName,
instanceVertex.<String>getProperty(Constants.STATE_PROPERTY_KEY));
Id id = new Id(guid, instanceVertex.<Integer>getProperty(Constants.VERSION_PROPERTY_KEY), typeName, state);
LOG.debug("Created id {} for instance type {}", id, typeName);
ClassType classType = typeSystem.getDataType(ClassType.class, typeName);
......@@ -161,9 +161,9 @@ public final class GraphToTypedInstanceMapper {
Edge edge;
if (edgeId == null) {
edge = GraphHelper.getEdgeForLabel(instanceVertex, relationshipLabel);;
edge = GraphHelper.getEdgeForLabel(instanceVertex, relationshipLabel);
} else {
edge = graphHelper.getEdgeById(edgeId);
edge = graphHelper.getEdgeByEdgeId(instanceVertex, relationshipLabel, edgeId);
}
if (edge != null) {
......@@ -175,9 +175,10 @@ public final class GraphToTypedInstanceMapper {
LOG.debug("Found composite, mapping vertex to instance");
return mapGraphToTypedInstance(guid, referenceVertex);
} else {
String state = GraphHelper.getStateAsString(referenceVertex);
Id referenceId =
new Id(guid, referenceVertex.<Integer>getProperty(Constants.VERSION_PROPERTY_KEY),
dataType.getName());
dataType.getName(), state);
LOG.debug("Found non-composite, adding id {} ", referenceId);
return referenceId;
}
......@@ -271,7 +272,7 @@ public final class GraphToTypedInstanceMapper {
if (edgeId == null) {
edge = GraphHelper.getEdgeForLabel(instanceVertex, relationshipLabel);
} else {
edge = graphHelper.getEdgeById(edgeId);
edge = graphHelper.getEdgeByEdgeId(instanceVertex, relationshipLabel, edgeId);
}
if (edge != null) {
......
......@@ -26,20 +26,18 @@ import org.apache.atlas.typesystem.types.TypeSystem;
public class HardDeleteHandler extends DeleteHandler {
private static final GraphHelper graphHelper = GraphHelper.getInstance();
@Inject
public HardDeleteHandler(TypeSystem typeSystem) {
super(typeSystem, true);
super(typeSystem, true, false);
}
@Override
protected void _deleteVertex(Vertex instanceVertex) {
protected void _deleteVertex(Vertex instanceVertex, boolean force) {
graphHelper.removeVertex(instanceVertex);
}
@Override
protected void deleteEdge(Edge edge) throws AtlasException {
protected void deleteEdge(Edge edge, boolean force) throws AtlasException {
graphHelper.removeEdge(edge);
}
}
......@@ -32,24 +32,34 @@ import static org.apache.atlas.repository.Constants.STATE_PROPERTY_KEY;
public class SoftDeleteHandler extends DeleteHandler {
@Inject
public SoftDeleteHandler(TypeSystem typeSystem) {
super(typeSystem, false);
super(typeSystem, false, true);
}
@Override
protected void _deleteVertex(Vertex instanceVertex) {
Id.EntityState state = Id.EntityState.valueOf((String) instanceVertex.getProperty(STATE_PROPERTY_KEY));
protected void _deleteVertex(Vertex instanceVertex, boolean force) {
if (force) {
graphHelper.removeVertex(instanceVertex);
} else {
Id.EntityState state = GraphHelper.getState(instanceVertex);
if (state != Id.EntityState.DELETED) {
GraphHelper.setProperty(instanceVertex, STATE_PROPERTY_KEY, Id.EntityState.DELETED.name());
GraphHelper.setProperty(instanceVertex, MODIFICATION_TIMESTAMP_PROPERTY_KEY, RequestContext.get().getRequestTime());
GraphHelper.setProperty(instanceVertex, MODIFICATION_TIMESTAMP_PROPERTY_KEY,
RequestContext.get().getRequestTime());
}
}
}
@Override
protected void deleteEdge(Edge edge) throws AtlasException {
Id.EntityState state = Id.EntityState.valueOf((String) edge.getProperty(STATE_PROPERTY_KEY));
protected void deleteEdge(Edge edge, boolean force) throws AtlasException {
if (force) {
graphHelper.removeEdge(edge);
} else {
Id.EntityState state = GraphHelper.getState(edge);
if (state != Id.EntityState.DELETED) {
GraphHelper.setProperty(edge, STATE_PROPERTY_KEY, Id.EntityState.DELETED.name());
GraphHelper.setProperty(edge, MODIFICATION_TIMESTAMP_PROPERTY_KEY, RequestContext.get().getRequestTime());
GraphHelper
.setProperty(edge, MODIFICATION_TIMESTAMP_PROPERTY_KEY, RequestContext.get().getRequestTime());
}
}
}
}
......@@ -69,6 +69,8 @@ public class GraphBackedTypeStore implements ITypeStore {
private final TitanGraph titanGraph;
private GraphHelper graphHelper = GraphHelper.getInstance();
@Inject
public GraphBackedTypeStore(GraphProvider<TitanGraph> graphProvider) {
titanGraph = graphProvider.get();
......@@ -155,7 +157,7 @@ public class GraphBackedTypeStore implements ITypeStore {
for (String superTypeName : superTypes) {
HierarchicalType superType = typeSystem.getDataType(HierarchicalType.class, superTypeName);
Vertex superVertex = createVertex(superType.getTypeCategory(), superTypeName, superType.getDescription());
addEdge(vertex, superVertex, SUPERTYPE_EDGE_LABEL);
graphHelper.getOrCreateEdge(vertex, superVertex, SUPERTYPE_EDGE_LABEL);
}
}
}
......@@ -200,24 +202,9 @@ public class GraphBackedTypeStore implements ITypeStore {
if (!coreTypes.contains(attrType.getName())) {
Vertex attrVertex = createVertex(attrType.getTypeCategory(), attrType.getName(), attrType.getDescription());
String label = getEdgeLabel(vertexTypeName, attribute.name);
addEdge(vertex, attrVertex, label);
}
}
}
private void addEdge(Vertex fromVertex, Vertex toVertex, String label) {
Iterator<Edge> edges = GraphHelper.getOutGoingEdgesByLabel(fromVertex, label);
// ATLAS-474: Check if this type system edge already exists, to avoid duplicates.
while (edges.hasNext()) {
Edge edge = edges.next();
if (edge.getVertex(Direction.IN).equals(toVertex)) {
LOG.debug("Edge from {} to {} with label {} already exists",
toString(fromVertex), toString(toVertex), label);
return;
graphHelper.getOrCreateEdge(vertex, attrVertex, label);
}
}
LOG.debug("Adding edge from {} to {} with label {}", toString(fromVertex), toString(toVertex), label);
titanGraph.addEdge(null, fromVertex, toVertex, label);
}
@Override
......
......@@ -26,6 +26,7 @@ import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasException;
import org.apache.atlas.EntityAuditEvent;
import org.apache.atlas.RequestContext;
import org.apache.atlas.classification.InterfaceAudience;
import org.apache.atlas.ha.HAConfiguration;
import org.apache.atlas.listener.ActiveStateChangeHandler;
......@@ -58,8 +59,6 @@ import org.apache.atlas.typesystem.types.Multiplicity;
import org.apache.atlas.typesystem.types.StructTypeDefinition;
import org.apache.atlas.typesystem.types.TraitType;
import org.apache.atlas.typesystem.types.TypeSystem;
import org.apache.atlas.typesystem.types.TypeUtils;
import org.apache.atlas.typesystem.types.TypeUtils.Pair;
import org.apache.atlas.typesystem.types.ValueConversionException;
import org.apache.atlas.typesystem.types.utils.TypesUtil;
import org.apache.atlas.utils.ParamChecker;
......@@ -305,16 +304,15 @@ public class DefaultMetadataService implements MetadataService, ActiveStateChang
* Creates an entity, instance of the type.
*
* @param entityInstanceDefinition json array of entity definitions
* @return guids - json array of guids
* @return guids - list of guids
*/
@Override
public String createEntities(String entityInstanceDefinition) throws AtlasException {
public List<String> createEntities(String entityInstanceDefinition) throws AtlasException {
ParamChecker.notEmpty(entityInstanceDefinition, "Entity instance definition");
ITypedReferenceableInstance[] typedInstances = deserializeClassInstances(entityInstanceDefinition);
List<String> guids = createEntities(typedInstances);
return new JSONArray(guids).toString();
return createEntities(typedInstances);
}
public List<String> createEntities(ITypedReferenceableInstance[] typedInstances) throws AtlasException {
......@@ -422,25 +420,26 @@ public class DefaultMetadataService implements MetadataService, ActiveStateChang
* @return guids - json array of guids
*/
@Override
public String updateEntities(String entityInstanceDefinition) throws AtlasException {
public AtlasClient.EntityResult updateEntities(String entityInstanceDefinition) throws AtlasException {
ParamChecker.notEmpty(entityInstanceDefinition, "Entity instance definition");
ITypedReferenceableInstance[] typedInstances = deserializeClassInstances(entityInstanceDefinition);
TypeUtils.Pair<List<String>, List<String>> guids = repository.updateEntities(typedInstances);
return onEntitiesAddedUpdated(guids);
AtlasClient.EntityResult entityResult = repository.updateEntities(typedInstances);
onEntitiesAddedUpdated(entityResult);
return entityResult;
}
private String onEntitiesAddedUpdated(TypeUtils.Pair<List<String>, List<String>> guids) throws AtlasException {
onEntitiesAdded(guids.left);
onEntitiesUpdated(guids.right);
guids.left.addAll(guids.right);
return new JSONArray(guids.left).toString();
private void onEntitiesAddedUpdated(AtlasClient.EntityResult entityResult) throws AtlasException {
onEntitiesAdded(entityResult.getCreatedEntities());
onEntitiesUpdated(entityResult.getUpdateEntities());
//Note: doesn't access deletedEntities from entityResult
onEntitiesDeleted(RequestContext.get().getDeletedEntities());
}
@Override
public String updateEntityAttributeByGuid(final String guid, String attributeName, String value) throws AtlasException {
public AtlasClient.EntityResult updateEntityAttributeByGuid(final String guid, String attributeName,
String value) throws AtlasException {
ParamChecker.notEmpty(guid, "entity id");
ParamChecker.notEmpty(attributeName, "attribute name");
ParamChecker.notEmpty(value, "attribute value");
......@@ -469,8 +468,9 @@ public class DefaultMetadataService implements MetadataService, ActiveStateChang
}
((ReferenceableInstance)newInstance).replaceWithNewId(new Id(guid, 0, newInstance.getTypeName()));
TypeUtils.Pair<List<String>, List<String>> guids = repository.updatePartial(newInstance);
return onEntitiesAddedUpdated(guids);
AtlasClient.EntityResult entityResult = repository.updatePartial(newInstance);
onEntitiesAddedUpdated(entityResult);
return entityResult;
}
private ITypedReferenceableInstance validateEntityExists(String guid)
......@@ -483,7 +483,8 @@ public class DefaultMetadataService implements MetadataService, ActiveStateChang
}
@Override
public String updateEntityPartialByGuid(final String guid, Referenceable newEntity) throws AtlasException {
public AtlasClient.EntityResult updateEntityPartialByGuid(final String guid, Referenceable newEntity)
throws AtlasException {
ParamChecker.notEmpty(guid, "guid cannot be null");
ParamChecker.notNull(newEntity, "updatedEntity cannot be null");
ITypedReferenceableInstance existInstance = validateEntityExists(guid);
......@@ -491,11 +492,13 @@ public class DefaultMetadataService implements MetadataService, ActiveStateChang
ITypedReferenceableInstance newInstance = convertToTypedInstance(newEntity, existInstance.getTypeName());
((ReferenceableInstance)newInstance).replaceWithNewId(new Id(guid, 0, newInstance.getTypeName()));
TypeUtils.Pair<List<String>, List<String>> guids = repository.updatePartial(newInstance);
return onEntitiesAddedUpdated(guids);
AtlasClient.EntityResult entityResult = repository.updatePartial(newInstance);
onEntitiesAddedUpdated(entityResult);
return entityResult;
}
private ITypedReferenceableInstance convertToTypedInstance(Referenceable updatedEntity, String typeName) throws AtlasException {
private ITypedReferenceableInstance convertToTypedInstance(Referenceable updatedEntity, String typeName)
throws AtlasException {
ClassType type = typeSystem.getDataType(ClassType.class, typeName);
ITypedReferenceableInstance newInstance = type.createInstance();
......@@ -538,7 +541,8 @@ public class DefaultMetadataService implements MetadataService, ActiveStateChang
}
@Override
public String updateEntityByUniqueAttribute(String typeName, String uniqueAttributeName, String attrValue,
public AtlasClient.EntityResult updateEntityByUniqueAttribute(String typeName, String uniqueAttributeName,
String attrValue,
Referenceable updatedEntity) throws AtlasException {
ParamChecker.notEmpty(typeName, "typeName");
ParamChecker.notEmpty(uniqueAttributeName, "uniqueAttributeName");
......@@ -550,8 +554,9 @@ public class DefaultMetadataService implements MetadataService, ActiveStateChang
final ITypedReferenceableInstance newInstance = convertToTypedInstance(updatedEntity, typeName);
((ReferenceableInstance)newInstance).replaceWithNewId(oldInstance.getId());
TypeUtils.Pair<List<String>, List<String>> guids = repository.updatePartial(newInstance);
return onEntitiesAddedUpdated(guids);
AtlasClient.EntityResult entityResult = repository.updatePartial(newInstance);
onEntitiesAddedUpdated(entityResult);
return entityResult;
}
private void validateTypeExists(String entityType) throws AtlasException {
......@@ -726,13 +731,14 @@ public class DefaultMetadataService implements MetadataService, ActiveStateChang
* @see org.apache.atlas.services.MetadataService#deleteEntities(java.lang.String)
*/
@Override
public List<String> deleteEntities(List<String> deleteCandidateGuids) throws AtlasException {
public AtlasClient.EntityResult deleteEntities(List<String> deleteCandidateGuids) throws AtlasException {
ParamChecker.notEmpty(deleteCandidateGuids, "delete candidate guids");
return deleteGuids(deleteCandidateGuids);
}
@Override
public List<String> deleteEntityByUniqueAttribute(String typeName, String uniqueAttributeName, String attrValue) throws AtlasException {
public AtlasClient.EntityResult deleteEntityByUniqueAttribute(String typeName, String uniqueAttributeName,
String attrValue) throws AtlasException {
ParamChecker.notEmpty(typeName, "delete candidate typeName");
ParamChecker.notEmpty(uniqueAttributeName, "delete candidate unique attribute name");
ParamChecker.notEmpty(attrValue, "delete candidate unique attribute value");
......@@ -745,12 +751,10 @@ public class DefaultMetadataService implements MetadataService, ActiveStateChang
return deleteGuids(deleteCandidateGuids);
}
private List<String> deleteGuids(List<String> deleteCandidateGuids) throws AtlasException {
Pair<List<String>, List<ITypedReferenceableInstance>> deleteEntitiesResult = repository.deleteEntities(deleteCandidateGuids);
if (deleteEntitiesResult.right.size() > 0) {
onEntitiesDeleted(deleteEntitiesResult.right);
}
return deleteEntitiesResult.left;
private AtlasClient.EntityResult deleteGuids(List<String> deleteCandidateGuids) throws AtlasException {
AtlasClient.EntityResult entityResult = repository.deleteEntities(deleteCandidateGuids);
onEntitiesAddedUpdated(entityResult);
return entityResult;
}
private void onEntitiesDeleted(List<ITypedReferenceableInstance> entities) throws AtlasException {
......
......@@ -19,6 +19,7 @@
package org.apache.atlas.repository.audit;
import org.apache.atlas.EntityAuditEvent;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.commons.lang.RandomStringUtils;
import org.testng.annotations.Test;
......@@ -38,7 +39,7 @@ public class AuditRepositoryTestBase {
@Test
public void testAddEvents() throws Exception {
EntityAuditEvent event = new EntityAuditEvent(rand(), System.currentTimeMillis(), "u1",
EntityAuditEvent.EntityAuditAction.ENTITY_CREATE, "d1");
EntityAuditEvent.EntityAuditAction.ENTITY_CREATE, "d1", new Referenceable(rand()));
eventRepository.putEvents(event);
......@@ -54,17 +55,18 @@ public class AuditRepositoryTestBase {
String id2 = "id2" + rand();
String id3 = "id3" + rand();
long ts = System.currentTimeMillis();
Referenceable entity = new Referenceable(rand());
List<EntityAuditEvent> expectedEvents = new ArrayList<>(3);
for (int i = 0; i < 3; i++) {
//Add events for both ids
EntityAuditEvent event = new EntityAuditEvent(id2, ts - i, "user" + i,
EntityAuditEvent.EntityAuditAction.ENTITY_UPDATE, "details" + i);
EntityAuditEvent.EntityAuditAction.ENTITY_UPDATE, "details" + i, entity);
eventRepository.putEvents(event);
expectedEvents.add(event);
eventRepository.putEvents(new EntityAuditEvent(id1, ts - i, "user" + i,
EntityAuditEvent.EntityAuditAction.TAG_DELETE, "details" + i));
EntityAuditEvent.EntityAuditAction.TAG_DELETE, "details" + i, entity));
eventRepository.putEvents(new EntityAuditEvent(id3, ts - i, "user" + i,
EntityAuditEvent.EntityAuditAction.TAG_ADD, "details" + i));
EntityAuditEvent.EntityAuditAction.TAG_ADD, "details" + i, entity));
}
//Use ts for which there is no event - ts + 2
......
......@@ -21,8 +21,10 @@ package org.apache.atlas.repository.graph;
import com.tinkerpop.blueprints.Vertex;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasException;
import org.apache.atlas.TestUtils;
import org.apache.atlas.repository.Constants;
import org.apache.atlas.typesystem.IReferenceableInstance;
import org.apache.atlas.typesystem.IStruct;
import org.apache.atlas.typesystem.ITypedReferenceableInstance;
import org.apache.atlas.typesystem.ITypedStruct;
......@@ -32,8 +34,11 @@ import org.apache.atlas.typesystem.types.TypeSystem;
import org.testng.Assert;
import java.util.List;
import java.util.Map;
import static org.apache.atlas.TestUtils.COLUMNS_ATTR_NAME;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.fail;
import static org.testng.AssertJUnit.assertNotNull;
......@@ -45,7 +50,24 @@ public class GraphBackedRepositoryHardDeleteTest extends GraphBackedMetadataRepo
}
@Override
protected void assertTestDeleteReference(ITypedReferenceableInstance processInstance) throws Exception {
protected void assertTestDeleteEntityWithTraits(String guid) {
//entity is deleted. So, no assertions
}
@Override
protected void assertTableForTestDeleteReference(String tableId) {
//entity is deleted. So, no assertions
}
@Override
protected void assertColumnForTestDeleteReference(ITypedReferenceableInstance tableInstance) throws AtlasException {
List<ITypedReferenceableInstance> columns =
(List<ITypedReferenceableInstance>) tableInstance.get(COLUMNS_ATTR_NAME);
assertNull(columns);
}
@Override
protected void assertProcessForTestDeleteReference(ITypedReferenceableInstance processInstance) throws Exception {
//assert that outputs is empty
ITypedReferenceableInstance newProcess =
repositoryService.getEntityDefinition(processInstance.getId()._getId());
......@@ -63,6 +85,11 @@ public class GraphBackedRepositoryHardDeleteTest extends GraphBackedMetadataRepo
}
@Override
protected void assertDeletedColumn(ITypedReferenceableInstance tableInstance) throws AtlasException {
assertEquals(((List<IReferenceableInstance>) tableInstance.get(COLUMNS_ATTR_NAME)).size(), 2);
}
@Override
protected void assertTestDeleteEntities(ITypedReferenceableInstance tableInstance) {
int vertexCount = getVertices(Constants.ENTITY_TYPE_PROPERTY_KEY, TestUtils.TABLE_TYPE).size();
assertEquals(vertexCount, 0);
......@@ -85,12 +112,42 @@ public class GraphBackedRepositoryHardDeleteTest extends GraphBackedMetadataRepo
}
@Override
protected void assertTestDisconnectBidirectionalReferences(String janeGuid) throws Exception {
protected void assertJohnForTestDisconnectBidirectionalReferences(ITypedReferenceableInstance john,
String janeGuid) throws Exception {
assertNull(john.get("manager"));
}
@Override
protected void assertMaxForTestDisconnectBidirectionalReferences(Map<String, String> nameGuidMap)
throws Exception {
// Verify that the Department.employees reference to the deleted employee
// was disconnected.
ITypedReferenceableInstance hrDept = repositoryService.getEntityDefinition(nameGuidMap.get("hr"));
List<ITypedReferenceableInstance> employees = (List<ITypedReferenceableInstance>) hrDept.get("employees");
Assert.assertEquals(employees.size(), 3);
String maxGuid = nameGuidMap.get("Max");
for (ITypedReferenceableInstance employee : employees) {
Assert.assertNotEquals(employee.getId()._getId(), maxGuid);
}
// Verify that the Manager.subordinates reference to the deleted employee
// Max was disconnected.
ITypedReferenceableInstance jane = repositoryService.getEntityDefinition(janeGuid);
ITypedReferenceableInstance jane = repositoryService.getEntityDefinition(nameGuidMap.get("Jane"));
List<ITypedReferenceableInstance> subordinates = (List<ITypedReferenceableInstance>) jane.get("subordinates");
assertEquals(subordinates.size(), 1);
// Verify that max's Person.mentor unidirectional reference to john was disconnected.
ITypedReferenceableInstance john = repositoryService.getEntityDefinition(nameGuidMap.get("John"));
assertNull(john.get("mentor"));
}
@Override
protected void assertTestDisconnectUnidirectionalArrayReferenceFromClassType(
List<ITypedReferenceableInstance> columns, String columnGuid) {
assertEquals(columns.size(), 4);
for (ITypedReferenceableInstance column : columns) {
assertFalse(column.getId()._getId().equals(columnGuid));
}
}
@Override
......
......@@ -19,10 +19,11 @@
package org.apache.atlas.repository.graph;
import com.tinkerpop.blueprints.Vertex;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasException;
import org.apache.atlas.TestUtils;
import org.apache.atlas.repository.Constants;
import org.apache.atlas.typesystem.IReferenceableInstance;
import org.apache.atlas.typesystem.IStruct;
import org.apache.atlas.typesystem.ITypedReferenceableInstance;
import org.apache.atlas.typesystem.ITypedStruct;
......@@ -33,8 +34,12 @@ import org.testng.Assert;
import java.util.List;
import java.util.Map;
import static org.apache.atlas.TestUtils.COLUMNS_ATTR_NAME;
import static org.apache.atlas.TestUtils.NAME;
import static org.apache.atlas.TestUtils.PII;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
public class GraphBackedRepositorySoftDeleteTest extends GraphBackedMetadataRepositoryDeleteTestBase {
@Override
......@@ -43,7 +48,38 @@ public class GraphBackedRepositorySoftDeleteTest extends GraphBackedMetadataRepo
}
@Override
protected void assertTestDeleteReference(ITypedReferenceableInstance expected) throws Exception {
protected void assertTestDeleteEntityWithTraits(String guid) throws Exception {
ITypedReferenceableInstance instance = repositoryService.getEntityDefinition(guid);
assertTrue(instance.getTraits().contains(PII));
}
@Override
protected void assertTableForTestDeleteReference(String tableId) throws Exception {
ITypedReferenceableInstance table = repositoryService.getEntityDefinition(tableId);
assertNotNull(table.get(NAME));
assertNotNull(table.get("description"));
assertNotNull(table.get("type"));
assertNotNull(table.get("tableType"));
assertNotNull(table.get("created"));
Id dbId = (Id) table.get("database");
assertNotNull(dbId);
ITypedReferenceableInstance db = repositoryService.getEntityDefinition(dbId.getId()._getId());
assertNotNull(db);
assertEquals(db.getId().getState(), Id.EntityState.ACTIVE);
}
@Override
protected void assertColumnForTestDeleteReference(ITypedReferenceableInstance tableInstance) throws AtlasException {
List<ITypedReferenceableInstance> columns =
(List<ITypedReferenceableInstance>) tableInstance.get(COLUMNS_ATTR_NAME);
assertEquals(columns.size(), 1);
assertEquals(columns.get(0).getId().getState(), Id.EntityState.DELETED);
}
@Override
protected void assertProcessForTestDeleteReference(ITypedReferenceableInstance expected) throws Exception {
ITypedReferenceableInstance process = repositoryService.getEntityDefinition(expected.getId()._getId());
List<ITypedReferenceableInstance> outputs =
(List<ITypedReferenceableInstance>) process.get(AtlasClient.PROCESS_ATTRIBUTE_OUTPUTS);
......@@ -59,6 +95,13 @@ public class GraphBackedRepositorySoftDeleteTest extends GraphBackedMetadataRepo
}
@Override
protected void assertDeletedColumn(ITypedReferenceableInstance tableInstance) throws AtlasException {
List<IReferenceableInstance> columns = (List<IReferenceableInstance>) tableInstance.get(COLUMNS_ATTR_NAME);
assertEquals(columns.size(), 3);
assertEquals(columns.get(0).getId().getState(), Id.EntityState.DELETED);
}
@Override
protected void assertTestDeleteEntities(ITypedReferenceableInstance expected) throws Exception {
//Assert that the deleted table can be fully constructed back
ITypedReferenceableInstance table = repositoryService.getEntityDefinition(expected.getId()._getId());
......@@ -67,6 +110,7 @@ public class GraphBackedRepositorySoftDeleteTest extends GraphBackedMetadataRepo
List<ITypedReferenceableInstance> expectedColumns =
(List<ITypedReferenceableInstance>) table.get(TestUtils.COLUMNS_ATTR_NAME);
assertEquals(columns.size(), expectedColumns.size());
assertNotNull(table.get("database"));
}
@Override
......@@ -85,11 +129,57 @@ public class GraphBackedRepositorySoftDeleteTest extends GraphBackedMetadataRepo
}
@Override
protected void assertTestDisconnectBidirectionalReferences(String janeGuid) throws Exception {
protected void assertJohnForTestDisconnectBidirectionalReferences(ITypedReferenceableInstance john, String janeGuid)
throws Exception {
Id mgr = (Id) john.get("manager");
assertNotNull(mgr);
assertEquals(mgr._getId(), janeGuid);
assertEquals(mgr.getState(), Id.EntityState.DELETED);
}
@Override
protected void assertMaxForTestDisconnectBidirectionalReferences(Map<String, String> nameGuidMap) throws Exception {
// Verify that the Department.employees reference to the deleted employee
// was disconnected.
ITypedReferenceableInstance hrDept = repositoryService.getEntityDefinition(nameGuidMap.get("hr"));
List<ITypedReferenceableInstance> employees = (List<ITypedReferenceableInstance>) hrDept.get("employees");
Assert.assertEquals(employees.size(), 4);
String maxGuid = nameGuidMap.get("Max");
for (ITypedReferenceableInstance employee : employees) {
if (employee.getId()._getId().equals(maxGuid)) {
assertEquals(employee.getId().getState(), Id.EntityState.DELETED);
}
}
// Verify that the Manager.subordinates still references deleted employee
ITypedReferenceableInstance jane = repositoryService.getEntityDefinition(janeGuid);
ITypedReferenceableInstance jane = repositoryService.getEntityDefinition(nameGuidMap.get("Jane"));
List<ITypedReferenceableInstance> subordinates = (List<ITypedReferenceableInstance>) jane.get("subordinates");
assertEquals(subordinates.size(), 2);
for (ITypedReferenceableInstance subordinate : subordinates) {
if (subordinate.getId()._getId().equals(maxGuid)) {
assertEquals(subordinate.getId().getState(), Id.EntityState.DELETED);
}
}
// Verify that max's Person.mentor unidirectional reference to john was disconnected.
ITypedReferenceableInstance john = repositoryService.getEntityDefinition(nameGuidMap.get("John"));
Id mentor = (Id) john.get("mentor");
assertEquals(mentor._getId(), maxGuid);
assertEquals(mentor.getState(), Id.EntityState.DELETED);
}
@Override
protected void assertTestDisconnectUnidirectionalArrayReferenceFromClassType(
List<ITypedReferenceableInstance> columns, String columnGuid) {
Assert.assertEquals(columns.size(), 5);
for (ITypedReferenceableInstance column : columns) {
if (column.getId()._getId().equals(columnGuid)) {
assertEquals(column.getId().getState(), Id.EntityState.DELETED);
} else {
assertEquals(column.getId().getState(), Id.EntityState.ACTIVE);
}
}
}
@Override
......@@ -122,7 +212,6 @@ public class GraphBackedRepositorySoftDeleteTest extends GraphBackedMetadataRepo
@Override
protected void assertTestDeleteTargetOfMultiplicityRequiredReference() throws Exception {
// No-op - it's ok that no exception was thrown if soft deletes are enabled.
}
}
......@@ -49,7 +49,16 @@ public class RequestContext {
private RequestContext() {
}
//To handle gets from background threads where createContext() is not called
//createContext called for every request in the filter
public static RequestContext get() {
if (CURRENT_CONTEXT.get() == null) {
synchronized (RequestContext.class) {
if (CURRENT_CONTEXT.get() == null) {
createContext();
}
}
}
return CURRENT_CONTEXT.get();
}
......@@ -72,15 +81,19 @@ public class RequestContext {
this.user = user;
}
public void recordCreatedEntities(Collection<String> createdEntityIds) {
public void recordEntityCreate(Collection<String> createdEntityIds) {
this.createdEntityIds.addAll(createdEntityIds);
}
public void recordUpdatedEntities(Collection<String> updatedEntityIds) {
public void recordEntityUpdate(Collection<String> updatedEntityIds) {
this.updatedEntityIds.addAll(updatedEntityIds);
}
public void recordDeletedEntity(String entityId, String typeName) throws AtlasException {
public void recordEntityUpdate(String entityId) {
this.updatedEntityIds.add(entityId);
}
public void recordEntityDelete(String entityId, String typeName) throws AtlasException {
ClassType type = typeSystem.getDataType(ClassType.class, typeName);
ITypedReferenceableInstance entity = type.createInstance(new Id(entityId, 0, typeName));
if (deletedEntityIds.add(entityId)) {
......
......@@ -18,6 +18,7 @@
package org.apache.atlas.services;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasException;
import org.apache.atlas.EntityAuditEvent;
import org.apache.atlas.listener.EntityChangeListener;
......@@ -80,7 +81,7 @@ public interface MetadataService {
* @param entityDefinition definition
* @return json array of guids of entities created
*/
String createEntities(String entityDefinition) throws AtlasException;
List<String> createEntities(String entityDefinition) throws AtlasException;
/**
* Get a typed entity instance.
......@@ -136,7 +137,7 @@ public interface MetadataService {
* @param value property value
* @return json array of guids of entities created/updated
*/
String updateEntityAttributeByGuid(String guid, String attribute, String value) throws AtlasException;
AtlasClient.EntityResult updateEntityAttributeByGuid(String guid, String attribute, String value) throws AtlasException;
/**
* Supports Partial updates of an entity. Users can update a subset of attributes for an entity identified by its guid
......@@ -146,7 +147,7 @@ public interface MetadataService {
* @return json array of guids of entities created/updated
* @throws AtlasException
*/
String updateEntityPartialByGuid(String guid, Referenceable entity) throws AtlasException;
AtlasClient.EntityResult updateEntityPartialByGuid(String guid, Referenceable entity) throws AtlasException;
/**
* Batch API - Adds/Updates the given entity id(guid).
......@@ -154,7 +155,7 @@ public interface MetadataService {
* @param entityJson entity json
* @return json array of guids of entities created/updated
*/
String updateEntities(String entityJson) throws AtlasException;
AtlasClient.EntityResult updateEntities(String entityJson) throws AtlasException;
// Trait management functions
......@@ -168,7 +169,8 @@ public interface MetadataService {
* @return Guid of updated entity
* @throws AtlasException
*/
String updateEntityByUniqueAttribute(String typeName, String uniqueAttributeName, String attrValue,
AtlasClient.EntityResult updateEntityByUniqueAttribute(String typeName, String uniqueAttributeName,
String attrValue,
Referenceable updatedEntity) throws AtlasException;
/**
......@@ -210,7 +212,7 @@ public interface MetadataService {
* @return List of guids for deleted entities
* @throws AtlasException
*/
List<String> deleteEntities(List<String> guids) throws AtlasException;
AtlasClient.EntityResult deleteEntities(List<String> guids) throws AtlasException;
/**
* Register a listener for entity change.
......@@ -235,7 +237,8 @@ public interface MetadataService {
* @return List of guids for deleted entities (including their composite references)
* @throws AtlasException
*/
List<String> deleteEntityByUniqueAttribute(String typeName, String uniqueAttributeName, String attrValue) throws AtlasException;
AtlasClient.EntityResult deleteEntityByUniqueAttribute(String typeName, String uniqueAttributeName,
String attrValue) throws AtlasException;
/**
* Returns entity audit events for entity id in the decreasing order of timestamp
......
......@@ -100,7 +100,7 @@ public class Referenceable extends Struct implements IReferenceableInstance {
* @throws AtlasException if the referenceable can not be created
*/
public Referenceable(IReferenceableInstance instance) throws AtlasException {
this(instance.getId()._getId(), instance.getTypeName(), instance.getValuesMap(), instance.getTraits(),
this(instance.getId(), instance.getTypeName(), instance.getValuesMap(), instance.getTraits(),
getTraits(instance));
}
......
......@@ -83,13 +83,13 @@ public class LocalAtlasClient extends AtlasClient {
}
};
JSONObject response = entityOperation.run();
List<String> results = extractResults(response, GUID, new ExtractOperation<String, String>());
EntityResult results = extractEntityResult(response);
LOG.debug("Create entities returned results: {}", results);
return results;
return results.getCreatedEntities();
}
@Override
protected List<String> updateEntities(final JSONArray entities) throws AtlasServiceException {
protected EntityResult updateEntities(final JSONArray entities) throws AtlasServiceException {
LOG.debug("Updating entities: {}", entities);
EntityOperation entityOperation = new EntityOperation(API.UPDATE_ENTITY) {
@Override
......@@ -98,7 +98,7 @@ public class LocalAtlasClient extends AtlasClient {
}
};
JSONObject response = entityOperation.run();
List<String> results = extractResults(response, GUID, new ExtractOperation<String, String>());
EntityResult results = extractEntityResult(response);
LOG.debug("Update entities returned results: {}", results);
return results;
}
......@@ -130,7 +130,7 @@ public class LocalAtlasClient extends AtlasClient {
}
@Override
public String updateEntity(final String entityType, final String uniqueAttributeName,
public EntityResult updateEntity(final String entityType, final String uniqueAttributeName,
final String uniqueAttributeValue, Referenceable entity) throws AtlasServiceException {
final String entityJson = InstanceSerialization.toJson(entity, true);
LOG.debug("Updating entity type: {}, attributeName: {}, attributeValue: {}, entity: {}", entityType,
......@@ -143,13 +143,13 @@ public class LocalAtlasClient extends AtlasClient {
}
};
JSONObject response = entityOperation.run();
String result = getString(response, GUID);
EntityResult result = extractEntityResult(response);
LOG.debug("Update entity returned result: {}", result);
return result;
}
@Override
public List<String> deleteEntity(final String entityType, final String uniqueAttributeName,
public EntityResult deleteEntity(final String entityType, final String uniqueAttributeName,
final String uniqueAttributeValue) throws AtlasServiceException {
LOG.debug("Deleting entity type: {}, attributeName: {}, attributeValue: {}", entityType, uniqueAttributeName,
uniqueAttributeValue);
......@@ -160,7 +160,7 @@ public class LocalAtlasClient extends AtlasClient {
}
};
JSONObject response = entityOperation.run();
List<String> results = extractResults(response, GUID, new ExtractOperation<String, String>());
EntityResult results = extractEntityResult(response);
LOG.debug("Delete entities returned results: {}", results);
return results;
}
......@@ -191,18 +191,18 @@ public class LocalAtlasClient extends AtlasClient {
}
@Override
public void updateEntityAttribute(final String guid, final String attribute, String value) throws AtlasServiceException {
public EntityResult updateEntityAttribute(final String guid, final String attribute, String value) throws AtlasServiceException {
throw new IllegalStateException("Not supported in LocalAtlasClient");
}
@Override
public void updateEntity(String guid, Referenceable entity) throws AtlasServiceException {
public EntityResult updateEntity(String guid, Referenceable entity) throws AtlasServiceException {
throw new IllegalStateException("Not supported in LocalAtlasClient");
}
@Override
public List<String> deleteEntities(final String ... guids) throws AtlasServiceException {
public EntityResult deleteEntities(final String ... guids) throws AtlasServiceException {
throw new IllegalStateException("Not supported in LocalAtlasClient");
}
......
......@@ -15,22 +15,30 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.notification.entity;
package org.apache.atlas.notification;
import com.google.common.annotations.VisibleForTesting;
import com.google.inject.Inject;
import org.apache.atlas.AtlasException;
import org.apache.atlas.listener.EntityChangeListener;
import org.apache.atlas.notification.NotificationInterface;
import org.apache.atlas.notification.entity.EntityNotification;
import org.apache.atlas.notification.entity.EntityNotificationImpl;
import org.apache.atlas.typesystem.IReferenceableInstance;
import org.apache.atlas.typesystem.IStruct;
import org.apache.atlas.typesystem.ITypedReferenceableInstance;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.Struct;
import org.apache.atlas.typesystem.types.FieldMapping;
import org.apache.atlas.typesystem.types.TraitType;
import org.apache.atlas.typesystem.types.TypeSystem;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* Listen to the repository for entity changes and produce entity change notifications.
......@@ -87,6 +95,53 @@ public class NotificationEntityChangeListener implements EntityChangeListener {
// ----- helper methods -------------------------------------------------
// ----- helper methods ----------------------------------------------------
@VisibleForTesting
public static List<IStruct> getAllTraits(IReferenceableInstance entityDefinition,
TypeSystem typeSystem) throws AtlasException {
List<IStruct> traitInfo = new LinkedList<>();
for (String traitName : entityDefinition.getTraits()) {
IStruct trait = entityDefinition.getTrait(traitName);
String typeName = trait.getTypeName();
Map<String, Object> valuesMap = trait.getValuesMap();
traitInfo.add(new Struct(typeName, valuesMap));
traitInfo.addAll(getSuperTraits(typeName, valuesMap, typeSystem));
}
return traitInfo;
}
private static List<IStruct> getSuperTraits(
String typeName, Map<String, Object> values, TypeSystem typeSystem) throws AtlasException {
List<IStruct> superTypes = new LinkedList<>();
TraitType traitDef = typeSystem.getDataType(TraitType.class, typeName);
Set<String> superTypeNames = traitDef.getAllSuperTypeNames();
for (String superTypeName : superTypeNames) {
TraitType superTraitDef = typeSystem.getDataType(TraitType.class, superTypeName);
Map<String, Object> superTypeValues = new HashMap<>();
FieldMapping fieldMapping = superTraitDef.fieldMapping();
if (fieldMapping != null) {
Set<String> superTypeAttributeNames = fieldMapping.fields.keySet();
for (String superTypeAttributeName : superTypeAttributeNames) {
if (values.containsKey(superTypeAttributeName)) {
superTypeValues.put(superTypeAttributeName, values.get(superTypeAttributeName));
}
}
}
IStruct superTrait = new Struct(superTypeName, superTypeValues);
superTypes.add(superTrait);
superTypes.addAll(getSuperTraits(superTypeName, values, typeSystem));
}
return superTypes;
}
// send notification of entity change
private void notifyOfEntityEvent(Collection<ITypedReferenceableInstance> entityDefinitions,
EntityNotification.OperationType operationType) throws AtlasException {
......@@ -96,7 +151,7 @@ public class NotificationEntityChangeListener implements EntityChangeListener {
Referenceable entity = new Referenceable(entityDefinition);
EntityNotificationImpl notification =
new EntityNotificationImpl(entity, operationType, typeSystem);
new EntityNotificationImpl(entity, operationType, getAllTraits(entity, typeSystem));
messages.add(notification);
}
......
......@@ -19,7 +19,6 @@
package org.apache.atlas.web.resources;
import com.google.common.base.Preconditions;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasException;
import org.apache.atlas.EntityAuditEvent;
......@@ -59,9 +58,7 @@ import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriBuilder;
import javax.ws.rs.core.UriInfo;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
......@@ -119,19 +116,11 @@ public class EntityResource {
LOG.debug("submitting entities {} ", AtlasClient.toString(new JSONArray(entities)));
final String guids = metadataService.createEntities(entities);
final List<String> guids = metadataService.createEntities(entities);
JSONObject response = getResponse(new AtlasClient.EntityResult(guids, null, null));
UriBuilder ub = uriInfo.getAbsolutePathBuilder();
URI locationURI = ub.path(guids).build();
JSONObject response = new JSONObject();
response.put(AtlasClient.REQUEST_ID, Servlets.getRequestId());
JSONArray guidArray = new JSONArray(guids);
response.put(AtlasClient.GUID, guidArray);
if (guidArray.length() > 0) {
response.put(AtlasClient.DEFINITION,
new JSONObject(metadataService.getEntityDefinition(new JSONArray(guids).getString(0))));
}
URI locationURI = guids.isEmpty() ? null : ub.path(guids.get(0)).build();
return Response.created(locationURI).entity(response).build();
......@@ -150,6 +139,18 @@ public class EntityResource {
}
}
private JSONObject getResponse(AtlasClient.EntityResult entityResult) throws AtlasException, JSONException {
JSONObject response = new JSONObject();
response.put(AtlasClient.REQUEST_ID, Servlets.getRequestId());
response.put(AtlasClient.ENTITIES, new JSONObject(entityResult.toString()).get(AtlasClient.ENTITIES));
String sampleEntityId = getSample(entityResult);
if (sampleEntityId != null) {
String entityDefinition = metadataService.getEntityDefinition(sampleEntityId);
response.put(AtlasClient.DEFINITION, new JSONObject(entityDefinition));
}
return response;
}
/**
* Complete update of a set of entities - the values not specified will be replaced with null/removed
* Adds/Updates given entities identified by its GUID or unique attribute
......@@ -163,14 +164,8 @@ public class EntityResource {
final String entities = Servlets.getRequestPayload(request);
LOG.debug("updating entities {} ", AtlasClient.toString(new JSONArray(entities)));
final String guids = metadataService.updateEntities(entities);
JSONObject response = new JSONObject();
response.put(AtlasClient.REQUEST_ID, Servlets.getRequestId());
JSONArray guidsArray = new JSONArray(guids);
response.put(AtlasClient.GUID, guidsArray);
String entityDefinition = metadataService.getEntityDefinition(guidsArray.getString(0));
response.put(AtlasClient.DEFINITION, new JSONObject(entityDefinition));
AtlasClient.EntityResult entityResult = metadataService.updateEntities(entities);
JSONObject response = getResponse(entityResult);
return Response.ok(response).build();
} catch(EntityExistsException e) {
LOG.error("Unique constraint violation", e);
......@@ -187,6 +182,25 @@ public class EntityResource {
}
}
private String getSample(AtlasClient.EntityResult entityResult) {
String sample = getSample(entityResult.getCreatedEntities());
if (sample == null) {
sample = getSample(entityResult.getUpdateEntities());
}
if (sample == null) {
sample = getSample(entityResult.getDeletedEntities());
}
return sample;
}
private String getSample(List<String> list) {
if (list != null && list.size() > 0) {
return list.get(0);
}
return null;
}
/**
* Adds/Updates given entity identified by its unique attribute( entityType, attributeName and value)
* Updates support only partial update of an entity - Adds/updates any new values specified
......@@ -214,11 +228,10 @@ public class EntityResource {
Referenceable updatedEntity =
InstanceSerialization.fromJsonReferenceable(entities, true);
final String guid = metadataService.updateEntityByUniqueAttribute(entityType, attribute, value, updatedEntity);
AtlasClient.EntityResult entityResult =
metadataService.updateEntityByUniqueAttribute(entityType, attribute, value, updatedEntity);
JSONObject response = new JSONObject();
response.put(AtlasClient.REQUEST_ID, Thread.currentThread().getName());
response.put(AtlasClient.GUID, guid);
JSONObject response = getResponse(entityResult);
return Response.ok(response).build();
} catch (ValueConversionException ve) {
LOG.error("Unable to persist entity instance due to a desrialization error ", ve);
......@@ -268,10 +281,8 @@ public class EntityResource {
Referenceable updatedEntity =
InstanceSerialization.fromJsonReferenceable(entityJson, true);
metadataService.updateEntityPartialByGuid(guid, updatedEntity);
JSONObject response = new JSONObject();
response.put(AtlasClient.REQUEST_ID, Thread.currentThread().getName());
AtlasClient.EntityResult entityResult = metadataService.updateEntityPartialByGuid(guid, updatedEntity);
JSONObject response = getResponse(entityResult);
return Response.ok(response).build();
} catch (EntityNotFoundException e) {
LOG.error("An entity with GUID={} does not exist", guid, e);
......@@ -301,12 +312,8 @@ public class EntityResource {
String value = Servlets.getRequestPayload(request);
Preconditions.checkNotNull(value, "Entity value cannot be null");
metadataService.updateEntityAttributeByGuid(guid, property, value);
JSONObject response = new JSONObject();
response.put(AtlasClient.REQUEST_ID, Thread.currentThread().getName());
response.put(AtlasClient.GUID, guid);
AtlasClient.EntityResult entityResult = metadataService.updateEntityAttributeByGuid(guid, property, value);
JSONObject response = getResponse(entityResult);
return Response.ok(response).build();
} catch (EntityNotFoundException e) {
LOG.error("An entity with GUID={} does not exist", guid, e);
......@@ -340,19 +347,13 @@ public class EntityResource {
@QueryParam("value") String value) {
try {
List<String> deletedGuids = new ArrayList<>();
AtlasClient.EntityResult entityResult;
if (guids != null && !guids.isEmpty()) {
deletedGuids = metadataService.deleteEntities(guids);
entityResult = metadataService.deleteEntities(guids);
} else {
deletedGuids = metadataService.deleteEntityByUniqueAttribute(entityType, attribute, value);
}
JSONObject response = new JSONObject();
response.put(AtlasClient.REQUEST_ID, Servlets.getRequestId());
JSONArray guidArray = new JSONArray(deletedGuids.size());
for (String guid : deletedGuids) {
guidArray.put(guid);
entityResult = metadataService.deleteEntityByUniqueAttribute(entityType, attribute, value);
}
response.put(AtlasClient.GUID, guidArray);
JSONObject response = getResponse(entityResult);
return Response.ok(response).build();
} catch (EntityNotFoundException e) {
if(guids != null || !guids.isEmpty()) {
......@@ -386,7 +387,6 @@ public class EntityResource {
JSONObject response = new JSONObject();
response.put(AtlasClient.REQUEST_ID, Servlets.getRequestId());
response.put(AtlasClient.GUID, guid);
Response.Status status = Response.Status.NOT_FOUND;
if (entityDefinition != null) {
......@@ -518,7 +518,6 @@ public class EntityResource {
JSONObject response = new JSONObject();
response.put(AtlasClient.REQUEST_ID, Servlets.getRequestId());
response.put(AtlasClient.GUID, guid);
response.put(AtlasClient.RESULTS, new JSONArray(traitNames));
response.put(AtlasClient.COUNT, traitNames.size());
......@@ -555,7 +554,6 @@ public class EntityResource {
JSONObject response = new JSONObject();
response.put(AtlasClient.REQUEST_ID, Servlets.getRequestId());
response.put(AtlasClient.GUID, guid);
return Response.created(locationURI).entity(response).build();
} catch (EntityNotFoundException | TypeNotFoundException e) {
......@@ -588,7 +586,6 @@ public class EntityResource {
JSONObject response = new JSONObject();
response.put(AtlasClient.REQUEST_ID, Servlets.getRequestId());
response.put(AtlasClient.GUID, guid);
response.put(TRAIT_NAME, traitName);
return Response.ok(response).build();
......
......@@ -23,7 +23,7 @@ import com.google.inject.multibindings.Multibinder;
import org.apache.atlas.kafka.KafkaNotification;
import org.apache.atlas.listener.EntityChangeListener;
import org.apache.atlas.notification.NotificationHookConsumer;
import org.apache.atlas.notification.entity.NotificationEntityChangeListener;
import org.apache.atlas.notification.NotificationEntityChangeListener;
import org.apache.atlas.service.Service;
public class ServiceModule extends AbstractModule {
......
......@@ -23,7 +23,6 @@ import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.web.resources.EntityResource;
import org.apache.atlas.web.service.ServiceState;
import org.apache.commons.lang.RandomStringUtils;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONObject;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
......@@ -36,6 +35,7 @@ import javax.ws.rs.core.Response;
import java.util.Arrays;
import java.util.List;
import static org.apache.atlas.AtlasClient.ENTITIES;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyListOf;
import static org.mockito.Matchers.anyString;
......@@ -64,7 +64,8 @@ public class LocalAtlasClientTest {
when(entityResource.submit(any(HttpServletRequest.class))).thenReturn(response);
final String guid = random();
when(response.getEntity()).thenReturn(new JSONObject() {{
put(AtlasClient.GUID, new JSONArray(Arrays.asList(guid)));
put(ENTITIES, new JSONObject(
new AtlasClient.EntityResult(Arrays.asList(guid), null, null).toString()).get(ENTITIES));
}});
LocalAtlasClient atlasClient = new LocalAtlasClient(serviceState, entityResource);
......@@ -119,12 +120,14 @@ public class LocalAtlasClientTest {
when(entityResource.updateByUniqueAttribute(anyString(), anyString(), anyString(),
any(HttpServletRequest.class))).thenReturn(response);
when(response.getEntity()).thenReturn(new JSONObject() {{
put(AtlasClient.GUID, guid);
put(ENTITIES, new JSONObject(
new AtlasClient.EntityResult(null, Arrays.asList(guid), null).toString()).get(ENTITIES));
}});
LocalAtlasClient atlasClient = new LocalAtlasClient(serviceState, entityResource);
String actualId = atlasClient.updateEntity(random(), random(), random(), new Referenceable(random()));
assertEquals(actualId, guid);
AtlasClient.EntityResult
entityResult = atlasClient.updateEntity(random(), random(), random(), new Referenceable(random()));
assertEquals(entityResult.getUpdateEntities(), Arrays.asList(guid));
}
@Test
......@@ -132,14 +135,14 @@ public class LocalAtlasClientTest {
final String guid = random();
Response response = mock(Response.class);
when(response.getEntity()).thenReturn(new JSONObject() {{
put(AtlasClient.GUID, new JSONArray(Arrays.asList(guid)));
put(ENTITIES, new JSONObject(
new AtlasClient.EntityResult(null, null, Arrays.asList(guid)).toString()).get(ENTITIES));
}});
when(entityResource.deleteEntities(anyListOf(String.class), anyString(), anyString(), anyString())).thenReturn(response);
LocalAtlasClient atlasClient = new LocalAtlasClient(serviceState, entityResource);
List<String> results = atlasClient.deleteEntity(random(), random(), random());
assertEquals(results.size(), 1);
assertEquals(results.get(0), guid);
AtlasClient.EntityResult entityResult = atlasClient.deleteEntity(random(), random(), random());
assertEquals(entityResult.getDeletedEntities(), Arrays.asList(guid));
}
private String random() {
......
......@@ -22,7 +22,6 @@ import com.google.common.collect.ImmutableSet;
import com.google.inject.Inject;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.WebResource;
import org.apache.atlas.notification.entity.EntityNotification;
import org.apache.atlas.typesystem.IReferenceableInstance;
import org.apache.atlas.typesystem.IStruct;
......@@ -43,7 +42,6 @@ import org.testng.annotations.Test;
import javax.ws.rs.HttpMethod;
import javax.ws.rs.core.Response;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.notification;
import org.apache.atlas.typesystem.IStruct;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.Struct;
import org.apache.atlas.typesystem.types.TraitType;
import org.apache.atlas.typesystem.types.TypeSystem;
import org.testng.annotations.Test;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
public class NotificationEntityChangeListenerTest {
@Test
public void testGetAllTraitsSuperTraits() throws Exception {
TypeSystem typeSystem = mock(TypeSystem.class);
String traitName = "MyTrait";
IStruct myTrait = new Struct(traitName);
String superTraitName = "MySuperTrait";
TraitType traitDef = mock(TraitType.class);
Set<String> superTypeNames = Collections.singleton(superTraitName);
TraitType superTraitDef = mock(TraitType.class);
Set<String> superSuperTypeNames = Collections.emptySet();
Referenceable entity = getEntity("id", myTrait);
when(typeSystem.getDataType(TraitType.class, traitName)).thenReturn(traitDef);
when(typeSystem.getDataType(TraitType.class, superTraitName)).thenReturn(superTraitDef);
when(traitDef.getAllSuperTypeNames()).thenReturn(superTypeNames);
when(superTraitDef.getAllSuperTypeNames()).thenReturn(superSuperTypeNames);
List<IStruct> allTraits = NotificationEntityChangeListener.getAllTraits(entity, typeSystem);
assertEquals(2, allTraits.size());
for (IStruct trait : allTraits) {
String typeName = trait.getTypeName();
assertTrue(typeName.equals(traitName) || typeName.equals(superTraitName));
}
}
private Referenceable getEntity(String id, IStruct... traits) {
String typeName = "typeName";
Map<String, Object> values = new HashMap<>();
List<String> traitNames = new LinkedList<>();
Map<String, IStruct> traitMap = new HashMap<>();
for (IStruct trait : traits) {
String traitName = trait.getTypeName();
traitNames.add(traitName);
traitMap.put(traitName, trait);
}
return new Referenceable(id, typeName, values, traitNames, traitMap);
}
}
......@@ -71,6 +71,7 @@ import java.util.Map;
import java.util.UUID;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.fail;
......@@ -154,7 +155,10 @@ public class EntityJerseyResourceIT extends BaseResourceIT {
JSONObject response = new JSONObject(responseAsString);
Assert.assertNotNull(response.get(AtlasClient.REQUEST_ID));
Assert.assertNotNull(response.get(AtlasClient.GUID));
AtlasClient.EntityResult entityResult = AtlasClient.EntityResult.fromString(response.toString());
assertEquals(entityResult.getCreatedEntities().size(), 1);
assertNotNull(entityResult.getCreatedEntities().get(0));
}
@Test
......@@ -376,7 +380,9 @@ public class EntityJerseyResourceIT extends BaseResourceIT {
}
private void addProperty(String guid, String property, String value) throws AtlasServiceException {
serviceClient.updateEntityAttribute(guid, property, value);
AtlasClient.EntityResult entityResult = serviceClient.updateEntityAttribute(guid, property, value);
assertEquals(entityResult.getUpdateEntities().size(), 1);
assertEquals(entityResult.getUpdateEntities().get(0), guid);
}
private ClientResponse getEntityDefinition(String guid) {
......@@ -482,7 +488,6 @@ public class EntityJerseyResourceIT extends BaseResourceIT {
JSONObject response = new JSONObject(responseAsString);
Assert.assertNotNull(response.get(AtlasClient.REQUEST_ID));
Assert.assertNotNull(response.get("GUID"));
final JSONArray list = response.getJSONArray(AtlasClient.RESULTS);
Assert.assertEquals(list.length(), 7);
......@@ -513,7 +518,6 @@ public class EntityJerseyResourceIT extends BaseResourceIT {
JSONObject response = new JSONObject(responseAsString);
Assert.assertNotNull(response.get(AtlasClient.REQUEST_ID));
Assert.assertNotNull(response.get(AtlasClient.GUID));
assertEntityAudit(guid, EntityAuditEvent.EntityAuditAction.TAG_ADD);
}
......@@ -561,7 +565,6 @@ public class EntityJerseyResourceIT extends BaseResourceIT {
JSONObject response = new JSONObject(responseAsString);
Assert.assertNotNull(response.get(AtlasClient.REQUEST_ID));
Assert.assertNotNull(response.get(AtlasClient.GUID));
// verify the response
clientResponse = getEntityDefinition(guid);
......@@ -612,7 +615,6 @@ public class EntityJerseyResourceIT extends BaseResourceIT {
JSONObject response = new JSONObject(responseAsString);
Assert.assertNotNull(response.get(AtlasClient.REQUEST_ID));
Assert.assertNotNull(response.get("GUID"));
Assert.assertNotNull(response.get("traitName"));
assertEntityAudit(guid, EntityAuditEvent.EntityAuditAction.TAG_DELETE);
}
......@@ -635,7 +637,8 @@ public class EntityJerseyResourceIT extends BaseResourceIT {
"trait=" + traitName + " should be defined in type system before it can be deleted");
Assert.assertNotNull(response.get(AtlasClient.STACKTRACE));
}
@Test(dependsOnMethods = "testSubmitEntity()")
@Test(dependsOnMethods = "testSubmitEntity()")
public void testDeleteExistentTraitNonExistentForEntity() throws Exception {
final String guid = tableId._getId();
......@@ -704,7 +707,9 @@ public class EntityJerseyResourceIT extends BaseResourceIT {
}});
LOG.debug("Updating entity= " + tableUpdated);
serviceClient.updateEntity(tableId._getId(), tableUpdated);
AtlasClient.EntityResult entityResult = serviceClient.updateEntity(tableId._getId(), tableUpdated);
assertEquals(entityResult.getUpdateEntities().size(), 1);
assertEquals(entityResult.getUpdateEntities().get(0), tableId._getId());
ClientResponse response = getEntityDefinition(tableId._getId());
String definition = getEntityDefinition(response);
......@@ -722,8 +727,10 @@ public class EntityJerseyResourceIT extends BaseResourceIT {
}});
LOG.debug("Updating entity= " + tableUpdated);
serviceClient.updateEntity(BaseResourceIT.HIVE_TABLE_TYPE, "name", (String) tableInstance.get("name"),
tableUpdated);
entityResult = serviceClient.updateEntity(BaseResourceIT.HIVE_TABLE_TYPE, "name",
(String) tableInstance.get("name"), tableUpdated);
assertEquals(entityResult.getUpdateEntities().size(), 1);
assertEquals(entityResult.getUpdateEntities().get(0), tableId._getId());
response = getEntityDefinition(tableId._getId());
definition = getEntityDefinition(response);
......@@ -732,7 +739,6 @@ public class EntityJerseyResourceIT extends BaseResourceIT {
Assert.assertTrue(refs.get(0).equalsContents(columns.get(0)));
Assert.assertEquals(refs.get(0).get("dataType"), "int");
}
@Test(dependsOnMethods = "testSubmitEntity")
......@@ -765,9 +771,8 @@ public class EntityJerseyResourceIT extends BaseResourceIT {
// ATLAS-586: verify response entity can be parsed by GSON.
String entity = clientResponse.getEntity(String.class);
Gson gson = new Gson();
UpdateEntitiesResponse updateEntitiesResponse = null;
try {
updateEntitiesResponse = gson.fromJson(entity, UpdateEntitiesResponse.class);
UpdateEntitiesResponse updateEntitiesResponse = gson.fromJson(entity, UpdateEntitiesResponse.class);
}
catch (JsonSyntaxException e) {
Assert.fail("Response entity from " + service.path(ENTITIES).getURI() + " not parseable by GSON", e);
......@@ -785,7 +790,7 @@ public class EntityJerseyResourceIT extends BaseResourceIT {
private static class UpdateEntitiesResponse {
String requestId;
String[] GUID;
AtlasClient.EntityResult entities;
AtlasEntity definition;
}
......@@ -811,15 +816,9 @@ public class EntityJerseyResourceIT extends BaseResourceIT {
queryParam(AtlasClient.GUID.toLowerCase(), db1Id._getId()).
queryParam(AtlasClient.GUID.toLowerCase(), db2Id._getId()).
accept(Servlets.JSON_MEDIA_TYPE).type(Servlets.JSON_MEDIA_TYPE).method(HttpMethod.DELETE, ClientResponse.class);
JSONObject response = getEntity(clientResponse);
final String deletedGuidsJson = response.getString(AtlasClient.GUID);
Assert.assertNotNull(deletedGuidsJson);
JSONArray guidsArray = new JSONArray(deletedGuidsJson);
Assert.assertEquals(guidsArray.length(), 2);
List<String> deletedGuidsList = new ArrayList<>(2);
for (int index = 0; index < guidsArray.length(); index++) {
deletedGuidsList.add(guidsArray.getString(index));
}
List<String> deletedGuidsList = AtlasClient.EntityResult.fromString(response.toString()).getDeletedEntities();
Assert.assertTrue(deletedGuidsList.contains(db1Id._getId()));
Assert.assertTrue(deletedGuidsList.contains(db2Id._getId()));
......@@ -843,7 +842,8 @@ public class EntityJerseyResourceIT extends BaseResourceIT {
Id db2Id = createInstance(db2);
// Delete the database entities
List<String> deletedGuidsList = serviceClient.deleteEntities(db1Id._getId(), db2Id._getId());
List<String> deletedGuidsList =
serviceClient.deleteEntities(db1Id._getId(), db2Id._getId()).getDeletedEntities();
// Verify that deleteEntities() response has database entity guids
Assert.assertEquals(deletedGuidsList.size(), 2);
......@@ -867,7 +867,7 @@ public class EntityJerseyResourceIT extends BaseResourceIT {
Id db1Id = createInstance(db1);
// Delete the database entity
List<String> deletedGuidsList = serviceClient.deleteEntity(DATABASE_TYPE, "name", dbName);
List<String> deletedGuidsList = serviceClient.deleteEntity(DATABASE_TYPE, "name", dbName).getDeletedEntities();
// Verify that deleteEntities() response has database entity guids
Assert.assertEquals(deletedGuidsList.size(), 1);
......
......@@ -23,21 +23,16 @@ import org.apache.atlas.ha.HAConfiguration;
import org.apache.commons.configuration.Configuration;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.ACLProvider;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import org.mockito.ArgumentMatcher;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import static junit.framework.TestCase.assertEquals;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.argThat;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment