Commit 8cbb2c13 by Shwetha GS

ATLAS-342 Atlas is sending an ENTITY_CREATE event to the ATLAS_ENTITIES topic…

ATLAS-342 Atlas is sending an ENTITY_CREATE event to the ATLAS_ENTITIES topic even if the entity exists already (shwethags)
parent dce31ab8
......@@ -14,6 +14,7 @@ ATLAS-54 Rename configs in hive hook (shwethags)
ATLAS-3 Mixed Index creation fails with Date types (sumasai via shwethags)
ALL CHANGES:
ATLAS-342 Atlas is sending an ENTITY_CREATE event to the ATLAS_ENTITIES topic even if the entity exists already (shwethags)
ATLAS-386 Handle hive rename Table (shwethags)
ATLAS-374 Doc: Create a wiki for documenting fault tolerance and HA options for Atlas data (yhemanth via sumasai)
ATLAS-346 Atlas server loses messages sent from Hive hook if restarted after unclean shutdown (yhemanth via sumasai)
......
......@@ -19,12 +19,13 @@
package org.apache.atlas.repository;
import org.apache.atlas.AtlasException;
import org.apache.atlas.typesystem.exception.EntityExistsException;
import org.apache.atlas.typesystem.exception.EntityNotFoundException;
import org.apache.atlas.typesystem.ITypedReferenceableInstance;
import org.apache.atlas.typesystem.ITypedStruct;
import org.apache.atlas.typesystem.exception.EntityExistsException;
import org.apache.atlas.typesystem.exception.EntityNotFoundException;
import org.apache.atlas.typesystem.types.AttributeInfo;
import org.apache.atlas.typesystem.types.IDataType;
import org.apache.atlas.typesystem.types.TypeUtils;
import java.util.List;
......@@ -82,7 +83,7 @@ public interface MetadataRepository {
* @throws RepositoryException
* @throws EntityExistsException
*/
String[] createEntities(ITypedReferenceableInstance... entities) throws RepositoryException, EntityExistsException;
List<String> createEntities(ITypedReferenceableInstance... entities) throws RepositoryException, EntityExistsException;
/**
* Fetch the complete definition of an entity given its GUID.
......@@ -143,13 +144,13 @@ public interface MetadataRepository {
* Adds/Updates the property to the entity that corresponds to the GUID
* Supports only primitive attribute/Class Id updations.
*/
void updatePartial(ITypedReferenceableInstance entity) throws RepositoryException;
TypeUtils.Pair<List<String>, List<String>> updatePartial(ITypedReferenceableInstance entity) throws RepositoryException;
/**
* Adds the property to the entity that corresponds to the GUID
* @param entitiesToBeUpdated The entities to be updated
*/
String[] updateEntities(ITypedReferenceableInstance... entitiesToBeUpdated) throws RepositoryException;
TypeUtils.Pair<List<String>, List<String>> updateEntities(ITypedReferenceableInstance... entitiesToBeUpdated) throws RepositoryException;
/**
* Returns the entity for the given type and qualified name
......
......@@ -37,6 +37,7 @@ import org.apache.atlas.typesystem.types.AttributeInfo;
import org.apache.atlas.typesystem.types.ClassType;
import org.apache.atlas.typesystem.types.IDataType;
import org.apache.atlas.typesystem.types.TypeSystem;
import org.apache.atlas.typesystem.types.TypeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -114,12 +115,14 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
@Override
@GraphTransaction
public String[] createEntities(ITypedReferenceableInstance... entities) throws RepositoryException,
public List<String> createEntities(ITypedReferenceableInstance... entities) throws RepositoryException,
EntityExistsException {
LOG.info("adding entities={}", entities);
try {
TypedInstanceToGraphMapper instanceToGraphMapper = new TypedInstanceToGraphMapper(graphToInstanceMapper);
return instanceToGraphMapper.mapTypedInstanceToGraph(TypedInstanceToGraphMapper.Operation.CREATE, entities);
TypeUtils.Pair<List<String>, List<String>> idPair =
instanceToGraphMapper.mapTypedInstanceToGraph(TypedInstanceToGraphMapper.Operation.CREATE, entities);
return idPair.left;
} catch (EntityExistsException e) {
throw e;
} catch (AtlasException e) {
......@@ -279,7 +282,7 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
@Override
@GraphTransaction
public String[] updateEntities(ITypedReferenceableInstance... entitiesUpdated) throws RepositoryException {
public TypeUtils.Pair<List<String>, List<String>> updateEntities(ITypedReferenceableInstance... entitiesUpdated) throws RepositoryException {
LOG.info("updating entity {}", entitiesUpdated);
try {
TypedInstanceToGraphMapper instanceToGraphMapper = new TypedInstanceToGraphMapper(graphToInstanceMapper);
......@@ -292,11 +295,11 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
@Override
@GraphTransaction
public void updatePartial(ITypedReferenceableInstance entity) throws RepositoryException {
public TypeUtils.Pair<List<String>, List<String>> updatePartial(ITypedReferenceableInstance entity) throws RepositoryException {
LOG.info("updating entity {}", entity);
try {
TypedInstanceToGraphMapper instanceToGraphMapper = new TypedInstanceToGraphMapper(graphToInstanceMapper);
instanceToGraphMapper.mapTypedInstanceToGraph(TypedInstanceToGraphMapper.Operation.UPDATE_PARTIAL, entity);
return instanceToGraphMapper.mapTypedInstanceToGraph(TypedInstanceToGraphMapper.Operation.UPDATE_PARTIAL, entity);
} catch (AtlasException e) {
throw new RepositoryException(e);
}
......
......@@ -71,13 +71,13 @@ public final class GraphHelper {
return INSTANCE;
}
public Vertex createVertexWithIdentity(ITypedReferenceableInstance typedInstance,
Set<String> superTypeNames) {
public Vertex createVertexWithIdentity(ITypedReferenceableInstance typedInstance, Set<String> superTypeNames) {
final String guid = UUID.randomUUID().toString();
final Vertex vertexWithIdentity = createVertexWithoutIdentity(typedInstance.getTypeName(),
typedInstance.getId(), superTypeNames);
new Id(guid, 0 , typedInstance.getTypeName()), superTypeNames);
// add identity
final String guid = UUID.randomUUID().toString();
setProperty(vertexWithIdentity, Constants.GUID_PROPERTY_KEY, guid);
// add version information
......
......@@ -41,8 +41,8 @@ import org.apache.atlas.typesystem.types.Multiplicity;
import org.apache.atlas.typesystem.types.ObjectGraphWalker;
import org.apache.atlas.typesystem.types.TraitType;
import org.apache.atlas.typesystem.types.TypeSystem;
import org.apache.atlas.typesystem.types.TypeUtils;
import org.apache.atlas.utils.MD5Utils;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -79,36 +79,40 @@ public final class TypedInstanceToGraphMapper {
this.graphToTypedInstanceMapper = graphToTypedInstanceMapper;
}
String[] mapTypedInstanceToGraph(Operation operation, ITypedReferenceableInstance... typedInstances)
TypeUtils.Pair<List<String>, List<String>> mapTypedInstanceToGraph(Operation operation, ITypedReferenceableInstance... typedInstances)
throws AtlasException {
List<String> guids = new ArrayList<>();
List<String> createdIds = new ArrayList<>();
List<String> updatedIds = new ArrayList<>();
for (ITypedReferenceableInstance typedInstance : typedInstances) {
Collection<IReferenceableInstance> newInstances = walkClassInstances(typedInstance);
Pair<List<ITypedReferenceableInstance>, List<ITypedReferenceableInstance>> instancesPair =
TypeUtils.Pair<List<ITypedReferenceableInstance>, List<ITypedReferenceableInstance>> instancesPair =
createVerticesAndDiscoverInstances(newInstances);
switch (operation) {
case CREATE:
addOrUpdateAttributesAndTraits(operation, instancesPair.getLeft());
addFullTextProperty(instancesPair.getLeft());
List<String> ids = addOrUpdateAttributesAndTraits(operation, instancesPair.left);
createdIds.addAll(ids);
addFullTextProperty(instancesPair.left);
break;
case UPDATE_FULL:
case UPDATE_PARTIAL:
List<ITypedReferenceableInstance> instancesForUpdate = instancesPair.getLeft();
instancesForUpdate.addAll(instancesPair.getRight());
addOrUpdateAttributesAndTraits(operation, instancesForUpdate);
addFullTextProperty(instancesForUpdate);
ids = addOrUpdateAttributesAndTraits(Operation.CREATE, instancesPair.left);
createdIds.addAll(ids);
ids = addOrUpdateAttributesAndTraits(operation, instancesPair.right);
updatedIds.addAll(ids);
addFullTextProperty(instancesPair.left);
addFullTextProperty(instancesPair.right);
break;
case DELETE:
throw new UnsupportedOperationException("Not handled - " + operation);
}
//Return guid for
addToGuids(typedInstance, guids);
}
return guids.toArray(new String[guids.size()]);
return TypeUtils.Pair.of(createdIds, updatedIds);
}
private Collection<IReferenceableInstance> walkClassInstances(ITypedReferenceableInstance typedInstance)
......@@ -126,18 +130,21 @@ public final class TypedInstanceToGraphMapper {
return entityProcessor.getInstances();
}
private void addOrUpdateAttributesAndTraits(Operation operation, List<ITypedReferenceableInstance> instances) throws AtlasException {
private List<String> addOrUpdateAttributesAndTraits(Operation operation, List<ITypedReferenceableInstance> instances) throws AtlasException {
List<String> guids = new ArrayList<>();
for (ITypedReferenceableInstance instance : instances) {
try {
//new vertex, set all the properties
addOrUpdateAttributesAndTraits(operation, instance);
String guid = addOrUpdateAttributesAndTraits(operation, instance);
guids.add(guid);
} catch (SchemaViolationException e) {
throw new EntityExistsException(instance, e);
}
}
return guids;
}
private void addOrUpdateAttributesAndTraits(Operation operation, ITypedReferenceableInstance typedInstance)
private String addOrUpdateAttributesAndTraits(Operation operation, ITypedReferenceableInstance typedInstance)
throws AtlasException {
LOG.debug("Adding/Updating typed instance {}", typedInstance.getTypeName());
......@@ -158,6 +165,8 @@ public final class TypedInstanceToGraphMapper {
//TODO - Handle Trait updates
addTraits(typedInstance, instanceVertex, classType);
}
return getId(typedInstance)._getId();
}
private void mapInstanceToVertex(ITypedInstance typedInstance, Vertex instanceVertex,
......@@ -215,14 +224,16 @@ public final class TypedInstanceToGraphMapper {
}
}
private Pair<List<ITypedReferenceableInstance>, List<ITypedReferenceableInstance>> createVerticesAndDiscoverInstances(
private TypeUtils.Pair<List<ITypedReferenceableInstance>, List<ITypedReferenceableInstance>> createVerticesAndDiscoverInstances(
Collection<IReferenceableInstance> instances) throws AtlasException {
List<ITypedReferenceableInstance> instancesToCreate = new ArrayList<>();
List<ITypedReferenceableInstance> instancesToUpdate = new ArrayList<>();
for (IReferenceableInstance instance : instances) {
ITypedReferenceableInstance newInstance;
Id id = instance.getId();
if (!idToVertexMap.containsKey(id)) {
Vertex instanceVertex;
if (id.isAssigned()) { // has a GUID
......@@ -231,7 +242,9 @@ public final class TypedInstanceToGraphMapper {
throw new IllegalStateException(
String.format("%s is not of type ITypedReferenceableInstance", instance));
}
instancesToUpdate.add((ITypedReferenceableInstance) instance);
newInstance = (ITypedReferenceableInstance) instance;
instancesToUpdate.add(newInstance);
} else {
//Check if there is already an instance with the same unique attribute value
ClassType classType = typeSystem.getDataType(ClassType.class, instance.getTypeName());
......@@ -239,31 +252,28 @@ public final class TypedInstanceToGraphMapper {
//no entity with the given unique attribute, create new
if (instanceVertex == null) {
ITypedReferenceableInstance newInstance = classType.convert(instance, Multiplicity.REQUIRED);
newInstance = classType.convert(instance, Multiplicity.REQUIRED);
instanceVertex = graphHelper.createVertexWithIdentity(newInstance, classType.getAllSuperTypeNames());
instancesToCreate.add(newInstance);
//Map only unique attributes for cases of circular references
mapInstanceToVertex(newInstance, instanceVertex, classType.fieldMapping().fields, true, Operation.CREATE);
} else {
if (!(instance instanceof ReferenceableInstance)) {
throw new IllegalStateException(
String.format("%s is not of type ITypedReferenceableInstance", instance));
}
instancesToUpdate.add((ITypedReferenceableInstance) instance);
newInstance = (ITypedReferenceableInstance) instance;
instancesToUpdate.add(newInstance);
}
}
//Set the id in the new instance
idToVertexMap.put(id, instanceVertex);
}
}
return Pair.of(instancesToCreate, instancesToUpdate);
}
private void addToGuids(ITypedReferenceableInstance typedInstance, List<String> guids) {
Vertex instanceVertex = idToVertexMap.get(typedInstance.getId());
String guid = instanceVertex.getProperty(Constants.GUID_PROPERTY_KEY);
guids.add(guid);
return TypeUtils.Pair.of(instancesToCreate, instancesToUpdate);
}
private void addFullTextProperty(List<ITypedReferenceableInstance> instances) throws AtlasException {
......@@ -275,7 +285,8 @@ public final class TypedInstanceToGraphMapper {
}
}
private void addTraits(ITypedReferenceableInstance typedInstance, Vertex instanceVertex, ClassType classType) throws AtlasException {
private void addTraits(ITypedReferenceableInstance typedInstance, Vertex instanceVertex, ClassType classType)
throws AtlasException {
for (String traitName : typedInstance.getTraits()) {
LOG.debug("mapping trait {}", traitName);
GraphHelper.addProperty(instanceVertex, Constants.TRAIT_NAMES_PROPERTY_KEY, traitName);
......@@ -288,7 +299,8 @@ public final class TypedInstanceToGraphMapper {
/******************************************** STRUCT **************************************************/
private Pair<Vertex, Edge> updateStructVertex(ITypedStruct structInstance, Edge relEdge, Operation operation) throws AtlasException {
private TypeUtils.Pair<Vertex, Edge> updateStructVertex(ITypedStruct structInstance, Edge relEdge,
Operation operation) throws AtlasException {
//Already existing vertex. Update
Vertex structInstanceVertex = relEdge.getVertex(Direction.IN);
......@@ -303,10 +315,11 @@ public final class TypedInstanceToGraphMapper {
mapInstanceToVertex(structInstance, structInstanceVertex, structInstance.fieldMapping().fields, false, operation);
GraphHelper.setProperty(structInstanceVertex, SIGNATURE_HASH_PROPERTY_KEY, String.valueOf(newSignature));
}
return Pair.of(structInstanceVertex, relEdge);
return TypeUtils.Pair.of(structInstanceVertex, relEdge);
}
private Pair<Vertex, Edge> addStructVertex(ITypedStruct structInstance, Vertex instanceVertex, AttributeInfo attributeInfo, String edgeLabel) throws AtlasException {
private TypeUtils.Pair<Vertex, Edge> addStructVertex(ITypedStruct structInstance, Vertex instanceVertex,
AttributeInfo attributeInfo, String edgeLabel) throws AtlasException {
// add a new vertex for the struct or trait instance
Vertex structInstanceVertex = graphHelper.createVertexWithoutIdentity(structInstance.getTypeName(), null,
Collections.<String>emptySet()); // no super types for struct type
......@@ -317,7 +330,7 @@ public final class TypedInstanceToGraphMapper {
// add an edge to the newly created vertex from the parent
Edge relEdge = graphHelper.addEdge(instanceVertex, structInstanceVertex, edgeLabel);
return Pair.of(structInstanceVertex, relEdge);
return TypeUtils.Pair.of(structInstanceVertex, relEdge);
}
/******************************************** ARRAY **************************************************/
......@@ -443,7 +456,7 @@ public final class TypedInstanceToGraphMapper {
private String addOrUpdateStruct(Vertex instanceVertex, AttributeInfo attributeInfo, IDataType elementType,
ITypedStruct structAttr, String curVal,
String edgeLabel, Operation operation) throws AtlasException {
Pair<Vertex, Edge> vertexEdgePair = null;
TypeUtils.Pair<Vertex, Edge> vertexEdgePair = null;
if (curVal != null && structAttr == null) {
//remove edge
removeUnusedReference(curVal, attributeInfo, elementType);
......@@ -456,7 +469,7 @@ public final class TypedInstanceToGraphMapper {
vertexEdgePair = addStructVertex(structAttr, instanceVertex, attributeInfo, edgeLabel);
}
return (vertexEdgePair != null) ? vertexEdgePair.getRight().getId().toString() : null;
return (vertexEdgePair != null) ? vertexEdgePair.right.getId().toString() : null;
}
private String addOrUpdateClassVertex(Vertex instanceVertex, AttributeInfo attributeInfo, IDataType elementType,
......@@ -468,27 +481,28 @@ public final class TypedInstanceToGraphMapper {
throw new EntityNotFoundException("Could not find vertex for Class Reference " + newVal);
}
Pair<Vertex, Edge> vertexEdgePair = null;
TypeUtils.Pair<Vertex, Edge> vertexEdgePair = null;
if (curVal != null && newVal == null) {
//remove edge
removeUnusedReference(curVal, attributeInfo, elementType);
} else if (curVal != null && newVal != null) {
Edge edge = graphHelper.getOutGoingEdgeById(curVal);
Id classRefId = getId(newVal);
vertexEdgePair = updateClassEdge(classRefId, newVal, instanceVertex, edge, toVertex, attributeInfo, elementType, edgeLabel, operation);
vertexEdgePair = updateClassEdge(classRefId, newVal, instanceVertex, edge, toVertex, attributeInfo,
elementType, edgeLabel, operation);
} else if (newVal != null){
vertexEdgePair = addClassEdge(instanceVertex, toVertex, edgeLabel);
}
return (vertexEdgePair != null) ? vertexEdgePair.getRight().getId().toString() : null;
return (vertexEdgePair != null) ? vertexEdgePair.right.getId().toString() : null;
}
/******************************************** CLASS **************************************************/
private Pair<Vertex, Edge> addClassEdge(Vertex instanceVertex, Vertex toVertex, String edgeLabel) throws AtlasException {
private TypeUtils.Pair<Vertex, Edge> addClassEdge(Vertex instanceVertex, Vertex toVertex, String edgeLabel) throws AtlasException {
// add an edge to the class vertex from the instance
Edge edge = graphHelper.addEdge(instanceVertex, toVertex, edgeLabel);
return Pair.of(toVertex, edge);
return TypeUtils.Pair.of(toVertex, edge);
}
private Vertex getClassVertex(ITypedReferenceableInstance typedReference) throws EntityNotFoundException {
......@@ -521,11 +535,11 @@ public final class TypedInstanceToGraphMapper {
}
private Pair<Vertex, Edge> updateClassEdge(Id id, final ITypedReferenceableInstance typedInstance,
private TypeUtils.Pair<Vertex, Edge> updateClassEdge(Id id, final ITypedReferenceableInstance typedInstance,
Vertex instanceVertex, Edge edge, Vertex toVertex,
AttributeInfo attributeInfo, IDataType dataType,
String edgeLabel, Operation operation) throws AtlasException {
Pair<Vertex, Edge> result = Pair.of(toVertex, edge);
TypeUtils.Pair<Vertex, Edge> result = TypeUtils.Pair.of(toVertex, edge);
Edge newEdge = edge;
// Update edge if it exists
Vertex invertex = edge.getVertex(Direction.IN);
......@@ -535,7 +549,7 @@ public final class TypedInstanceToGraphMapper {
// add an edge to the class vertex from the instance
if(toVertex != null) {
newEdge = graphHelper.addEdge(instanceVertex, toVertex, edgeLabel);
result = Pair.of(toVertex, newEdge);
result = TypeUtils.Pair.of(toVertex, newEdge);
}
removeUnusedReference(edge.getId().toString(), attributeInfo, dataType);
}
......
......@@ -23,16 +23,12 @@ import com.google.common.collect.ImmutableList;
import com.google.inject.Provider;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasException;
import org.apache.atlas.repository.RepositoryException;
import org.apache.atlas.typesystem.exception.EntityNotFoundException;
import org.apache.atlas.typesystem.exception.TypeNotFoundException;
import org.apache.atlas.typesystem.persistence.ReferenceableInstance;
import org.apache.atlas.utils.ParamChecker;
import org.apache.atlas.classification.InterfaceAudience;
import org.apache.atlas.listener.EntityChangeListener;
import org.apache.atlas.listener.TypesChangeListener;
import org.apache.atlas.repository.IndexCreationException;
import org.apache.atlas.repository.MetadataRepository;
import org.apache.atlas.repository.RepositoryException;
import org.apache.atlas.repository.typestore.ITypeStore;
import org.apache.atlas.typesystem.IStruct;
import org.apache.atlas.typesystem.ITypedReferenceableInstance;
......@@ -40,9 +36,12 @@ import org.apache.atlas.typesystem.ITypedStruct;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.Struct;
import org.apache.atlas.typesystem.TypesDef;
import org.apache.atlas.typesystem.exception.EntityNotFoundException;
import org.apache.atlas.typesystem.exception.TypeNotFoundException;
import org.apache.atlas.typesystem.json.InstanceSerialization;
import org.apache.atlas.typesystem.json.TypesSerialization;
import org.apache.atlas.typesystem.persistence.Id;
import org.apache.atlas.typesystem.persistence.ReferenceableInstance;
import org.apache.atlas.typesystem.types.AttributeDefinition;
import org.apache.atlas.typesystem.types.AttributeInfo;
import org.apache.atlas.typesystem.types.ClassType;
......@@ -54,25 +53,24 @@ import org.apache.atlas.typesystem.types.Multiplicity;
import org.apache.atlas.typesystem.types.StructTypeDefinition;
import org.apache.atlas.typesystem.types.TraitType;
import org.apache.atlas.typesystem.types.TypeSystem;
import org.apache.atlas.typesystem.types.TypeUtils;
import org.apache.atlas.typesystem.types.ValueConversionException;
import org.apache.atlas.typesystem.types.utils.TypesUtil;
import org.apache.atlas.utils.ParamChecker;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.actors.threadpool.Arrays;
import javax.inject.Inject;
import javax.inject.Singleton;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* Simple wrapper over TypeSystem and MetadataRepository services with hooks
......@@ -279,17 +277,10 @@ public class DefaultMetadataService implements MetadataService {
ITypedReferenceableInstance[] typedInstances = deserializeClassInstances(entityInstanceDefinition);
final String[] guids = repository.createEntities(typedInstances);
Set<ITypedReferenceableInstance> entitites = new HashSet<>();
final List<String> guids = repository.createEntities(typedInstances);
for (String guid : guids) {
entitites.add(repository.getEntityDefinition(guid));
}
onEntitiesAdded(entitites);
return new JSONArray(Arrays.asList(guids)).toString();
onEntitiesAdded(guids);
return new JSONArray(guids).toString();
}
private ITypedReferenceableInstance[] deserializeClassInstances(String entityInstanceDefinition)
......@@ -390,14 +381,20 @@ public class DefaultMetadataService implements MetadataService {
ParamChecker.notEmpty(entityInstanceDefinition, "Entity instance definition cannot be empty");
ITypedReferenceableInstance[] typedInstances = deserializeClassInstances(entityInstanceDefinition);
String[] guids = repository.updateEntities(typedInstances);
onEntitiesAdded(Arrays.asList(typedInstances));
TypeUtils.Pair<List<String>, List<String>> guids = repository.updateEntities(typedInstances);
return onEntitiesAddedUpdated(guids);
}
private String onEntitiesAddedUpdated(TypeUtils.Pair<List<String>, List<String>> guids) throws AtlasException {
onEntitiesAdded(guids.left);
onEntitiesUpdated(guids.right);
return new JSONArray(Arrays.asList(guids)).toString();
guids.left.addAll(guids.right);
return new JSONArray(guids.left).toString();
}
@Override
public void updateEntityAttributeByGuid(final String guid, String attributeName, String value) throws AtlasException {
public String updateEntityAttributeByGuid(final String guid, String attributeName, String value) throws AtlasException {
ParamChecker.notEmpty(guid, "guid cannot be null");
ParamChecker.notEmpty(attributeName, "property cannot be null");
ParamChecker.notEmpty(value, "property value cannot be null");
......@@ -426,10 +423,8 @@ public class DefaultMetadataService implements MetadataService {
}
((ReferenceableInstance)newInstance).replaceWithNewId(new Id(guid, 0, newInstance.getTypeName()));
repository.updatePartial(newInstance);
onEntitiesUpdated(new ArrayList<ITypedReferenceableInstance>() {{
add(repository.getEntityDefinition(guid));
}});
TypeUtils.Pair<List<String>, List<String>> guids = repository.updatePartial(newInstance);
return onEntitiesAddedUpdated(guids);
}
private ITypedReferenceableInstance validateEntityExists(String guid)
......@@ -442,7 +437,7 @@ public class DefaultMetadataService implements MetadataService {
}
@Override
public void updateEntityPartialByGuid(final String guid, Referenceable newEntity) throws AtlasException {
public String updateEntityPartialByGuid(final String guid, Referenceable newEntity) throws AtlasException {
ParamChecker.notEmpty(guid, "guid cannot be null");
ParamChecker.notNull(newEntity, "updatedEntity cannot be null");
ITypedReferenceableInstance existInstance = validateEntityExists(guid);
......@@ -450,10 +445,8 @@ public class DefaultMetadataService implements MetadataService {
ITypedReferenceableInstance newInstance = convertToTypedInstance(newEntity, existInstance.getTypeName());
((ReferenceableInstance)newInstance).replaceWithNewId(new Id(guid, 0, newInstance.getTypeName()));
repository.updatePartial(newInstance);
onEntitiesUpdated(new ArrayList<ITypedReferenceableInstance>() {{
add(repository.getEntityDefinition(guid));
}});
TypeUtils.Pair<List<String>, List<String>> guids = repository.updatePartial(newInstance);
return onEntitiesAddedUpdated(guids);
}
private ITypedReferenceableInstance convertToTypedInstance(Referenceable updatedEntity, String typeName) throws AtlasException {
......@@ -511,13 +504,8 @@ public class DefaultMetadataService implements MetadataService {
final ITypedReferenceableInstance newInstance = convertToTypedInstance(updatedEntity, typeName);
((ReferenceableInstance)newInstance).replaceWithNewId(oldInstance.getId());
repository.updatePartial(newInstance);
onEntitiesUpdated(new ArrayList<ITypedReferenceableInstance>() {{
add(newInstance);
}});
return newInstance.getId()._getId();
TypeUtils.Pair<List<String>, List<String>> guids = repository.updatePartial(newInstance);
return onEntitiesAddedUpdated(guids);
}
private void validateTypeExists(String entityType) throws AtlasException {
......@@ -633,12 +621,22 @@ public class DefaultMetadataService implements MetadataService {
}
}
private void onEntitiesAdded(Collection<ITypedReferenceableInstance> entities) throws AtlasException {
private void onEntitiesAdded(List<String> guids) throws AtlasException {
List<ITypedReferenceableInstance> entities = loadEntities(guids);
for (EntityChangeListener listener : entityChangeListeners) {
listener.onEntitiesAdded(entities);
}
}
private List<ITypedReferenceableInstance> loadEntities(List<String> guids) throws EntityNotFoundException,
RepositoryException {
List<ITypedReferenceableInstance> entities = new ArrayList<>();
for (String guid : guids) {
entities.add(repository.getEntityDefinition(guid));
}
return entities;
}
private void onTypesUpdated(Map<String, IDataType> typesUpdated) throws AtlasException {
Map<TypesChangeListener, Throwable> caughtExceptions = new HashMap<>();
for (Provider<TypesChangeListener> indexerProvider : typeChangeListeners) {
......@@ -656,8 +654,8 @@ public class DefaultMetadataService implements MetadataService {
}
}
private void onEntitiesUpdated(Collection<ITypedReferenceableInstance> entities)
throws AtlasException {
private void onEntitiesUpdated(List<String> guids) throws AtlasException {
List<ITypedReferenceableInstance> entities = loadEntities(guids);
for (EntityChangeListener listener : entityChangeListeners) {
listener.onEntitiesUpdated(entities);
}
......
......@@ -346,13 +346,10 @@ public class BaseHiveRepositoryTest {
return createInstance(referenceable, clsType);
}
private Id createInstance(Referenceable referenceable, ClassType clsType) throws Exception {
// String entityJSON = InstanceSerialization.toJson(referenceable, true);
ITypedReferenceableInstance typedInstance = clsType.convert(referenceable, Multiplicity.REQUIRED);
String guid = repository.createEntities(typedInstance)[0];
List<String> guids = repository.createEntities(typedInstance);
// return the reference to created instance with guid
return new Id(guid, 0, referenceable.getTypeName());
return new Id(guids.get(guids.size() - 1), 0, referenceable.getTypeName());
}
}
......@@ -118,10 +118,10 @@ public class GraphBackedMetadataRepositoryTest {
ClassType deptType = typeSystem.getDataType(ClassType.class, "Department");
ITypedReferenceableInstance hrDept2 = deptType.convert(hrDept, Multiplicity.REQUIRED);
String[] guids = repositoryService.createEntities(hrDept2);
List<String> guids = repositoryService.createEntities(hrDept2);
Assert.assertNotNull(guids);
Assert.assertEquals(guids.length, 1);
guid = guids[0];
Assert.assertEquals(guids.size(), 5);
guid = guids.get(4);
Assert.assertNotNull(guid);
}
......@@ -173,14 +173,12 @@ public class GraphBackedMetadataRepositoryTest {
ITypedReferenceableInstance db = dbType.convert(databaseInstance, Multiplicity.REQUIRED);
System.out.println("db = " + db);
String dbGUID = repositoryService.createEntities(db)[0];
System.out.println("added db = " + dbGUID);
Referenceable dbInstance = new Referenceable(dbGUID, TestUtils.DATABASE_TYPE, databaseInstance.getValuesMap());
ITypedReferenceableInstance table = createHiveTableInstance(dbInstance);
String tableGUID = repositoryService.createEntities(table)[0];
System.out.println("added table = " + tableGUID);
//Reuse the same database instance without id, with the same unique attribute
ITypedReferenceableInstance table = createHiveTableInstance(databaseInstance);
List<String> guids = repositoryService.createEntities(db, table);
Assert.assertEquals(guids.size(), 7); //1 db + 5 columns + 1 table. Shouldn't create db again
System.out.println("added db = " + guids.get(0));
System.out.println("added table = " + guids.get(6));
}
@Test(dependsOnMethods = "testCreateEntity")
......@@ -600,9 +598,10 @@ public class GraphBackedMetadataRepositoryTest {
ClassType deptType = typeSystem.getDataType(ClassType.class, "Department");
ITypedReferenceableInstance hrDept2 = deptType.convert(hrDept, Multiplicity.REQUIRED);
String[] guids = repositoryService.createEntities(hrDept2);
List<String> guids = repositoryService.createEntities(hrDept2);
Assert.assertNotNull(guids);
Assert.assertEquals(guids.length, 1);
Assert.assertNotNull(guids[0]);
Assert.assertEquals(guids.size(), 2);
Assert.assertNotNull(guids.get(0));
Assert.assertNotNull(guids.get(1));
}
}
......@@ -101,7 +101,7 @@ public class GraphRepoMapperScaleTest {
ClassType dbType = typeSystem.getDataType(ClassType.class, TestUtils.DATABASE_TYPE);
ITypedReferenceableInstance db = dbType.convert(databaseInstance, Multiplicity.REQUIRED);
dbGUID = repositoryService.createEntities(db)[0];
dbGUID = repositoryService.createEntities(db).get(0);
Referenceable dbInstance = new Referenceable(dbGUID, TestUtils.DATABASE_TYPE, databaseInstance.getValuesMap());
......
......@@ -54,6 +54,7 @@ import java.util.List;
import java.util.Map;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
@Guice(modules = RepositoryMetadataModule.class)
......@@ -113,7 +114,11 @@ public class DefaultMetadataServiceTest {
JSONArray entitiesJson = new JSONArray();
entitiesJson.put(entityjson);
String response = metadataService.createEntities(entitiesJson.toString());
return new JSONArray(response).getString(0);
JSONArray guids = new JSONArray(response);
if (guids != null && guids.length() > 0) {
return guids.getString(0);
}
return null;
}
private String updateInstance(Referenceable entity) throws Exception {
......@@ -154,7 +159,7 @@ public class DefaultMetadataServiceTest {
//using the same name should succeed, but not create another entity
String newId = createInstance(entity);
Assert.assertEquals(newId, id);
assertNull(newId);
//Same entity, but different qualified name should succeed
entity.set("name", TestUtils.randomString());
......
......@@ -74,7 +74,7 @@ public interface MetadataService {
* Creates an entity, instance of the type.
*
* @param entityDefinition definition
* @return guid
* @return json array of guids of entities created
*/
String createEntities(String entityDefinition) throws AtlasException;
......@@ -107,25 +107,28 @@ public interface MetadataService {
/**
* Adds the property to the given entity id(guid).
* Currently supports updates only on PRIMITIVE, CLASS attribute types
*
* @param guid entity id
* @param guid entity id
* @param attribute property name
* @param value property value
* @return json array of guids of entities created/updated
*/
void updateEntityAttributeByGuid(String guid, String attribute, String value) throws AtlasException;
String updateEntityAttributeByGuid(String guid, String attribute, String value) throws AtlasException;
/**
* Supports Partial updates of an entity. Users can update a subset of attributes for an entity identified by its guid
* Note however that it cannot be used to set attribute values to null or delete attrbute values
*
* @param guid entity id
* @param entity
* @return json array of guids of entities created/updated
* @throws AtlasException
*/
void updateEntityPartialByGuid(String guid, Referenceable entity) throws AtlasException;
String updateEntityPartialByGuid(String guid, Referenceable entity) throws AtlasException;
/**
* Batch API - Adds/Updates the given entity id(guid).
*
* @param entityJson entity json
* @return List of guids which were updated and ones which were newly created as part of the updated entity
* @return json array of guids of entities created/updated
*/
String updateEntities(String entityJson) throws AtlasException;
......
......@@ -70,14 +70,18 @@ public class TypeUtils {
return b;
}
protected static class Pair<L, R> {
protected L left;
protected R right;
public static class Pair<L, R> {
public L left;
public R right;
public Pair(L left, R right) {
this.left = left;
this.right = right;
}
public static <L, R> Pair<L, R> of(L left, R right) {
return new Pair<>(left, right);
}
}
/**
......
......@@ -118,8 +118,12 @@ public class EntityResource {
JSONObject response = new JSONObject();
response.put(AtlasClient.REQUEST_ID, Servlets.getRequestId());
response.put(AtlasClient.GUID, new JSONArray(guids));
response.put(AtlasClient.DEFINITION, new JSONObject(metadataService.getEntityDefinition(new JSONArray(guids).getString(0))));
JSONArray guidArray = new JSONArray(guids);
response.put(AtlasClient.GUID, guidArray);
if (guidArray.length() > 0) {
response.put(AtlasClient.DEFINITION,
new JSONObject(metadataService.getEntityDefinition(new JSONArray(guids).getString(0))));
}
return Response.created(locationURI).entity(response).build();
......
......@@ -58,7 +58,6 @@ public class EntityNotificationIT extends BaseResourceIT {
private static final String ENTITIES = "api/atlas/entities";
private static final String TRAITS = "traits";
private static final int MAX_WAIT_TIME = 10000;
private final String DATABASE_NAME = "db" + randomString();
private final String TABLE_NAME = "table" + randomString();
@Inject
......@@ -98,7 +97,7 @@ public class EntityNotificationIT extends BaseResourceIT {
final String guid = tableId._getId();
waitForNotification(MAX_WAIT_TIME);
waitForNotification(notificationConsumer, MAX_WAIT_TIME);
EntityNotification entityNotification = notificationConsumer.getLastEntityNotification();
......@@ -120,7 +119,7 @@ public class EntityNotificationIT extends BaseResourceIT {
serviceClient.updateEntityAttribute(guid, property, newValue);
waitForNotification(MAX_WAIT_TIME);
waitForNotification(notificationConsumer, MAX_WAIT_TIME);
EntityNotification entityNotification = notificationConsumer.getLastEntityNotification();
......@@ -155,7 +154,7 @@ public class EntityNotificationIT extends BaseResourceIT {
ClientResponse clientResponse = addTrait(guid, traitInstanceJSON);
assertEquals(clientResponse.getStatus(), Response.Status.CREATED.getStatusCode());
waitForNotification(MAX_WAIT_TIME);
waitForNotification(notificationConsumer, MAX_WAIT_TIME);
EntityNotification entityNotification = notificationConsumer.getLastEntityNotification();
......@@ -192,7 +191,7 @@ public class EntityNotificationIT extends BaseResourceIT {
clientResponse = addTrait(guid, traitInstanceJSON);
assertEquals(clientResponse.getStatus(), Response.Status.CREATED.getStatusCode());
waitForNotification(MAX_WAIT_TIME);
waitForNotification(notificationConsumer, MAX_WAIT_TIME);
entityNotification = notificationConsumer.getLastEntityNotification();
......@@ -218,7 +217,7 @@ public class EntityNotificationIT extends BaseResourceIT {
ClientResponse clientResponse = deleteTrait(guid, traitName);
Assert.assertEquals(clientResponse.getStatus(), Response.Status.OK.getStatusCode());
waitForNotification(MAX_WAIT_TIME);
waitForNotification(notificationConsumer, MAX_WAIT_TIME);
EntityNotification entityNotification = notificationConsumer.getLastEntityNotification();
......@@ -260,51 +259,4 @@ public class EntityNotificationIT extends BaseResourceIT {
return resource.accept(Servlets.JSON_MEDIA_TYPE).type(Servlets.JSON_MEDIA_TYPE)
.method(HttpMethod.DELETE, ClientResponse.class);
}
private void waitForNotification(int maxWait) throws Exception {
waitFor(maxWait, new Predicate() {
@Override
public boolean evaluate() throws Exception {
return notificationConsumer.getLastEntityNotification() != null;
}
});
}
// ----- inner class : EntityNotificationConsumer --------------------------
private static class EntityNotificationConsumer implements Runnable {
private final NotificationConsumer<EntityNotification> consumerIterator;
private EntityNotification entityNotification = null;
private boolean run;
public EntityNotificationConsumer(NotificationConsumer<EntityNotification> consumerIterator) {
this.consumerIterator = consumerIterator;
}
@Override
public void run() {
while (run && consumerIterator.hasNext()) {
entityNotification = consumerIterator.next();
}
}
public void reset() {
entityNotification = null;
}
public void start() {
Thread thread = new Thread(this);
run = true;
thread.start();
}
public void stop() {
run = false;
}
public EntityNotification getLastEntityNotification() {
return entityNotification;
}
}
}
......@@ -25,6 +25,8 @@ import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.WebResource;
import com.sun.jersey.api.client.config.DefaultClientConfig;
import org.apache.atlas.*;
import org.apache.atlas.notification.NotificationConsumer;
import org.apache.atlas.notification.entity.EntityNotification;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.Struct;
import org.apache.atlas.typesystem.TypesDef;
......@@ -68,6 +70,7 @@ public abstract class BaseResourceIT {
protected WebResource service;
protected AtlasClient serviceClient;
public static final Logger LOG = LoggerFactory.getLogger(BaseResourceIT.class);
protected static final int MAX_WAIT_TIME = 1000;
@BeforeClass
public void setUp() throws Exception {
......@@ -119,7 +122,10 @@ public abstract class BaseResourceIT {
System.out.println("created instance for type " + typeName + ", guid: " + guids);
// return the reference to created instance with guid
return new Id(guids.getString(0), 0, referenceable.getTypeName());
if (guids.length() > 0) {
return new Id(guids.getString(guids.length() - 1), 0, referenceable.getTypeName());
}
return null;
}
protected static final String DATABASE_TYPE = "hive_db";
......@@ -285,4 +291,50 @@ public abstract class BaseResourceIT {
throw new Exception("Waiting timed out after " + timeout + " msec");
}
}
// ----- inner class : EntityNotificationConsumer --------------------------
protected static class EntityNotificationConsumer implements Runnable {
private final NotificationConsumer<EntityNotification> consumerIterator;
private EntityNotification entityNotification = null;
private boolean run;
public EntityNotificationConsumer(NotificationConsumer<EntityNotification> consumerIterator) {
this.consumerIterator = consumerIterator;
}
@Override
public void run() {
while (run && consumerIterator.hasNext()) {
entityNotification = consumerIterator.next();
}
}
public void reset() {
entityNotification = null;
}
public void start() {
Thread thread = new Thread(this);
run = true;
thread.start();
}
public void stop() {
run = false;
}
public EntityNotification getLastEntityNotification() {
return entityNotification;
}
}
protected void waitForNotification(final EntityNotificationConsumer notificationConsumer, int maxWait) throws Exception {
waitFor(maxWait, new Predicate() {
@Override
public boolean evaluate() throws Exception {
return notificationConsumer.getLastEntityNotification() != null;
}
});
}
}
......@@ -19,10 +19,15 @@
package org.apache.atlas.web.resources;
import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.WebResource;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.notification.NotificationConsumer;
import org.apache.atlas.notification.NotificationInterface;
import org.apache.atlas.notification.NotificationModule;
import org.apache.atlas.notification.entity.EntityNotification;
import org.apache.atlas.typesystem.IStruct;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.Struct;
......@@ -43,11 +48,14 @@ import org.apache.atlas.web.util.Servlets;
import org.apache.commons.lang.RandomStringUtils;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONObject;
import org.junit.AfterClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
import javax.ws.rs.HttpMethod;
......@@ -59,10 +67,13 @@ import java.util.Map;
import java.util.UUID;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.fail;
/**
* Integration tests for Entity Jersey Resource.
*/
@Guice(modules = NotificationModule.class)
public class EntityJerseyResourceIT extends BaseResourceIT {
private static final Logger LOG = LoggerFactory.getLogger(EntityJerseyResourceIT.class);
......@@ -76,11 +87,32 @@ public class EntityJerseyResourceIT extends BaseResourceIT {
private Id tableId;
private String traitName;
@Inject
private NotificationInterface notificationInterface;
private EntityNotificationConsumer notificationConsumer;
@BeforeClass
public void setUp() throws Exception {
super.setUp();
createTypeDefinitions();
List<NotificationConsumer<EntityNotification>> consumers =
notificationInterface.createConsumers(NotificationInterface.NotificationType.ENTITIES, 1);
NotificationConsumer<EntityNotification> consumer = consumers.iterator().next();
notificationConsumer = new EntityNotificationConsumer(consumer);
notificationConsumer.start();
}
@AfterClass
public void tearDown() {
notificationConsumer.stop();
}
@BeforeMethod
public void setupTest() {
notificationConsumer.reset();
}
@Test
......@@ -119,18 +151,32 @@ public class EntityJerseyResourceIT extends BaseResourceIT {
@Test
public void testEntityDeduping() throws Exception {
Referenceable db = new Referenceable(DATABASE_TYPE);
String dbName = "db" + randomString();
final Referenceable db = new Referenceable(DATABASE_TYPE);
final String dbName = "db" + randomString();
db.set("name", dbName);
db.set("description", randomString());
serviceClient.createEntity(db);
serviceClient.createEntity(db).getString(0);
waitForNotification(notificationConsumer, MAX_WAIT_TIME);
EntityNotification notification = notificationConsumer.getLastEntityNotification();
assertNotNull(notification);
assertEquals(notification.getEntity().get("name"), dbName);
JSONArray results =
serviceClient.searchByDSL(String.format("%s where name='%s'", DATABASE_TYPE, dbName));
assertEquals(results.length(), 1);
//create entity again shouldn't create another instance with same unique attribute value
notificationConsumer.reset();
serviceClient.createEntity(db);
try {
waitForNotification(notificationConsumer, MAX_WAIT_TIME);
fail("Expected time out exception");
} catch (Exception e) {
//expected timeout
}
results = serviceClient.searchByDSL(String.format("%s where name='%s'", DATABASE_TYPE, dbName));
assertEquals(results.length(), 1);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment