Commit 18019733 by Madhan Neethiraj

ATLAS-3055: updated entity create/update to handle relationship attributes consistently

parent 3841a1bf
......@@ -709,10 +709,10 @@ public class AtlasEntityType extends AtlasStructType {
String attributeName = attribute.getName();
AtlasAttributeDef attributeDef = attribute.getAttributeDef();
if (((AtlasEntity) obj).hasRelationshipAttribute(attributeName)) {
Object attributeValue = getNormalizedValue(entityObj.getAttribute(attributeName), attributeDef);
if (entityObj.hasRelationshipAttribute(attributeName)) {
Object attributeValue = getNormalizedValue(entityObj.getRelationshipAttribute(attributeName), attributeDef);
obj.setAttribute(attributeName, attributeValue);
entityObj.setRelationshipAttribute(attributeName, attributeValue);
}
}
}
......
......@@ -23,6 +23,7 @@ import org.aopalliance.intercept.MethodInvocation;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.exception.NotFoundException;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.utils.AtlasPerfMetrics.MetricRecorder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -33,6 +34,7 @@ import javax.ws.rs.core.Response;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
......@@ -47,6 +49,7 @@ public class GraphTransactionInterceptor implements MethodInterceptor {
private static final ThreadLocal<List<PostTransactionHook>> postTransactionHooks = new ThreadLocal<>();
private static final ThreadLocal<Boolean> isTxnOpen = ThreadLocal.withInitial(() -> Boolean.FALSE);
private static final ThreadLocal<Boolean> innerFailure = ThreadLocal.withInitial(() -> Boolean.FALSE);
private static final ThreadLocal<Map<String, AtlasVertex>> guidVertexCache = ThreadLocal.withInitial(() -> new HashMap<>());
private final AtlasGraph graph;
......@@ -112,6 +115,7 @@ public class GraphTransactionInterceptor implements MethodInterceptor {
// Reset the boolean flags
isTxnOpen.set(Boolean.FALSE);
innerFailure.set(Boolean.FALSE);
guidVertexCache.get().clear();
List<PostTransactionHook> trxHooks = postTransactionHooks.get();
......@@ -172,6 +176,24 @@ public class GraphTransactionInterceptor implements MethodInterceptor {
OBJECT_UPDATE_SYNCHRONIZER.lockObject(guids);
}
public static void addToVertexCache(String guid, AtlasVertex vertex) {
Map<String, AtlasVertex> cache = guidVertexCache.get();
cache.put(guid, vertex);
}
public static void removeFromVertexCache(String guid) {
Map<String, AtlasVertex> cache = guidVertexCache.get();
cache.remove(guid);
}
public static AtlasVertex getVertexFromCache(String guid) {
Map<String, AtlasVertex> cache = guidVertexCache.get();
return cache.get(guid);
}
boolean logException(Throwable t) {
if (t instanceof AtlasBaseException) {
Response.Status httpCode = ((AtlasBaseException) t).getAtlasErrorCode().getHttpCode();
......
......@@ -88,13 +88,7 @@ public abstract class GlossaryUtils {
protected void createRelationship(AtlasRelationship relationship) throws AtlasBaseException {
try {
relationshipStore.create(relationship);
} catch (AtlasBaseException e) {
if (!e.getAtlasErrorCode().equals(AtlasErrorCode.RELATIONSHIP_ALREADY_EXISTS)) {
throw e;
}
}
relationshipStore.getOrCreate(relationship);
}
protected void updateRelationshipAttributes(AtlasRelationship relationship, AtlasRelatedTermHeader relatedTermHeader) {
......
......@@ -331,12 +331,15 @@ public class AtlasEntityGraphDiscoveryV2 implements EntityGraphDiscovery {
private void visitRelationships(AtlasEntityType entityType, AtlasEntity entity, List<String> visitedAttributes) throws AtlasBaseException {
for (AtlasAttribute attribute : entityType.getRelationshipAttributes().values()) {
AtlasType attrType = attribute.getAttributeType();
String attrName = attribute.getName();
Object attrVal = entity.getRelationshipAttribute(attrName);
// if attribute is not in 'relationshipAttributes', try 'attributes'
if (entity.hasRelationshipAttribute(attrName)) {
visitAttribute(attrType, attrVal);
visitAttribute(attribute.getAttributeType(), entity.getRelationshipAttribute(attrName));
visitedAttributes.add(attrName);
} else if (entity.hasAttribute(attrName)) {
visitAttribute(attribute.getAttributeType(), entity.getAttribute(attrName));
visitedAttributes.add(attrName);
}
......
......@@ -734,15 +734,11 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {
AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName());
boolean hasUpdates = false;
if (MapUtils.isNotEmpty(entity.getRelationshipAttributes())) {
hasUpdates = true; // if relationship attributes are provided, assume there is an update
}
if (!hasUpdates) {
hasUpdates = entity.getStatus() == AtlasEntity.Status.DELETED; // entity status could be updated during import
}
if (!hasUpdates) {
if (!hasUpdates && MapUtils.isNotEmpty(entity.getAttributes())) { // check for attribute value change
for (AtlasAttribute attribute : entityType.getAllAttributes().values()) {
if (!entity.getAttributes().containsKey(attribute.getName())) { // if value is not provided, current value will not be updated
continue;
......@@ -763,6 +759,27 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {
}
}
if (!hasUpdates && MapUtils.isNotEmpty(entity.getRelationshipAttributes())) { // check of relationsship-attribute value change
for (AtlasAttribute attribute : entityType.getRelationshipAttributes().values()) {
if (!entity.getRelationshipAttributes().containsKey(attribute.getName())) { // if value is not provided, current value will not be updated
continue;
}
Object newVal = entity.getRelationshipAttribute(attribute.getName());
Object currVal = entityRetriever.getEntityAttribute(vertex, attribute);
if (!attribute.getAttributeType().areEqualValues(currVal, newVal, context.getGuidAssignments())) {
hasUpdates = true;
if (LOG.isDebugEnabled()) {
LOG.debug("found relationship attribute update: entity(guid={}, typeName={}), attrName={}, currValue={}, newValue={}", guid, entity.getTypeName(), attribute.getName(), currVal, newVal);
}
break;
}
}
}
// if classifications are to be replaced, then skip updates only when no change in classifications
if (!hasUpdates && replaceClassifications) {
List<AtlasClassification> newVal = entity.getClassifications();
......@@ -782,7 +799,9 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {
entitiesToSkipUpdate = new ArrayList<>();
}
LOG.info("skipping unchanged entity: {}", entity);
if (LOG.isDebugEnabled()) {
LOG.debug("skipping unchanged entity: {}", entity);
}
entitiesToSkipUpdate.add(entity);
}
......
......@@ -20,6 +20,7 @@ package org.apache.atlas.repository.store.graph.v2;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.GraphTransactionInterceptor;
import org.apache.atlas.RequestContext;
import org.apache.atlas.SortOrder;
import org.apache.atlas.discovery.SearchProcessor;
......@@ -324,14 +325,22 @@ public class AtlasGraphUtilsV2 {
}
public static AtlasVertex findByGuid(String guid) {
AtlasVertex ret = GraphTransactionInterceptor.getVertexFromCache(guid);
if (ret == null) {
AtlasGraphQuery query = AtlasGraphProvider.getGraphInstance().query()
.has(Constants.GUID_PROPERTY_KEY, guid);
Iterator<AtlasVertex> results = query.vertices().iterator();
AtlasVertex vertex = results.hasNext() ? results.next() : null;
ret = results.hasNext() ? results.next() : null;
return vertex;
if (ret != null) {
GraphTransactionInterceptor.addToVertexCache(guid, ret);
}
}
return ret;
}
public static String getTypeNameFromGuid(String guid) {
......
......@@ -20,6 +20,7 @@ package org.apache.atlas.repository.store.graph.v2;
import org.apache.atlas.AtlasConfiguration;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.GraphTransactionInterceptor;
import org.apache.atlas.RequestContext;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.TimeBoundary;
......@@ -135,6 +136,8 @@ public class EntityGraphMapper {
AtlasGraphUtilsV2.setEncodedProperty(ret, GUID_PROPERTY_KEY, guid);
AtlasGraphUtilsV2.setEncodedProperty(ret, VERSION_PROPERTY_KEY, getEntityVersion(entity));
GraphTransactionInterceptor.addToVertexCache(guid, ret);
return ret;
}
......@@ -186,11 +189,11 @@ public class EntityGraphMapper {
AtlasVertex vertex = context.getVertex(guid);
AtlasEntityType entityType = context.getType(guid);
compactAttributes(createdEntity);
compactAttributes(createdEntity, entityType);
mapRelationshipAttributes(createdEntity, vertex, CREATE, context);
mapRelationshipAttributes(createdEntity, entityType, vertex, CREATE, context);
mapAttributes(createdEntity, vertex, CREATE, context);
mapAttributes(createdEntity, entityType, vertex, CREATE, context);
resp.addEntity(CREATE, constructHeader(createdEntity, entityType, vertex));
addClassifications(context, guid, createdEntity.getClassifications());
......@@ -203,11 +206,11 @@ public class EntityGraphMapper {
AtlasVertex vertex = context.getVertex(guid);
AtlasEntityType entityType = context.getType(guid);
compactAttributes(updatedEntity);
compactAttributes(updatedEntity, entityType);
mapRelationshipAttributes(updatedEntity, vertex, UPDATE, context);
mapRelationshipAttributes(updatedEntity, entityType, vertex, UPDATE, context);
mapAttributes(updatedEntity, vertex, UPDATE, context);
mapAttributes(updatedEntity, entityType, vertex, UPDATE, context);
if (isPartialUpdate) {
resp.addEntity(PARTIAL_UPDATE, constructHeader(updatedEntity, entityType, vertex));
......@@ -283,8 +286,11 @@ public class EntityGraphMapper {
return ret;
}
private void mapAttributes(AtlasStruct struct, AtlasVertex vertex, EntityOperation op, EntityMutationContext context) throws AtlasBaseException {
mapAttributes(struct, getStructType(struct.getTypeName()), vertex, op, context);
}
private void mapAttributes(AtlasStruct struct, AtlasStructType structType, AtlasVertex vertex, EntityOperation op, EntityMutationContext context) throws AtlasBaseException {
if (LOG.isDebugEnabled()) {
LOG.debug("==> mapAttributes({}, {})", op, struct.getTypeName());
}
......@@ -292,8 +298,6 @@ public class EntityGraphMapper {
if (MapUtils.isNotEmpty(struct.getAttributes())) {
MetricRecorder metric = RequestContext.get().startMetricRecord("mapAttributes");
AtlasStructType structType = getStructType(struct.getTypeName());
if (op.equals(CREATE)) {
for (AtlasAttribute attribute : structType.getAllAttributes().values()) {
Object attrValue = struct.getAttribute(attribute.getName());
......@@ -325,7 +329,7 @@ public class EntityGraphMapper {
}
}
private void mapRelationshipAttributes(AtlasEntity entity, AtlasVertex vertex, EntityOperation op,
private void mapRelationshipAttributes(AtlasEntity entity, AtlasEntityType entityType, AtlasVertex vertex, EntityOperation op,
EntityMutationContext context) throws AtlasBaseException {
if (LOG.isDebugEnabled()) {
LOG.debug("==> mapRelationshipAttributes({}, {})", op, entity.getTypeName());
......@@ -334,8 +338,6 @@ public class EntityGraphMapper {
if (MapUtils.isNotEmpty(entity.getRelationshipAttributes())) {
MetricRecorder metric = RequestContext.get().startMetricRecord("mapRelationshipAttributes");
AtlasEntityType entityType = getEntityType(entity.getTypeName());
if (op.equals(CREATE)) {
for (AtlasAttribute attribute : entityType.getRelationshipAttributes().values()) {
Object attrValue = entity.getRelationshipAttribute(attribute.getName());
......@@ -439,7 +441,7 @@ public class EntityGraphMapper {
AtlasEdge newEdge = null;
if (ctx.getValue() != null) {
AtlasEntityType instanceType = getInstanceType(ctx.getValue());
AtlasEntityType instanceType = getInstanceType(ctx.getValue(), context);
AtlasEdge edge = currentEdge != null ? currentEdge : null;
ctx.setElementType(instanceType);
......@@ -1090,7 +1092,7 @@ public class EntityGraphMapper {
return mapStructValue(ctx, context);
case OBJECT_ID_TYPE:
AtlasEntityType instanceType = getInstanceType(ctx.getValue());
AtlasEntityType instanceType = getInstanceType(ctx.getValue(), context);
ctx.setElementType(instanceType);
if (ctx.getAttributeDef().isSoftReferenced()) {
return mapSoftRefValue(ctx, context);
......@@ -1163,23 +1165,50 @@ public class EntityGraphMapper {
return null;
}
private AtlasEntityType getInstanceType(Object val) throws AtlasBaseException {
private AtlasEntityType getInstanceType(Object val, EntityMutationContext context) throws AtlasBaseException {
AtlasEntityType ret = null;
if (val != null) {
String typeName = null;
String guid = null;
if (val instanceof AtlasObjectId) {
typeName = ((AtlasObjectId)val).getTypeName();
AtlasObjectId objId = (AtlasObjectId) val;
typeName = objId.getTypeName();
guid = objId.getGuid();
} else if (val instanceof Map) {
Object typeNameVal = ((Map)val).get(AtlasObjectId.KEY_TYPENAME);
Map map = (Map) val;
Object typeNameVal = map.get(AtlasObjectId.KEY_TYPENAME);
Object guidVal = map.get(AtlasObjectId.KEY_GUID);
if (typeNameVal != null) {
typeName = typeNameVal.toString();
}
if (guidVal != null) {
guid = guidVal.toString();
}
}
ret = typeName != null ? typeRegistry.getEntityTypeByName(typeName) : null;
if (typeName == null) {
if (guid != null) {
ret = context.getType(guid);
if (ret == null) {
AtlasVertex vertex = context.getDiscoveryContext().getResolvedEntityVertex(guid);
if (vertex != null) {
typeName = AtlasGraphUtilsV2.getTypeName(vertex);
}
}
}
}
if (ret == null && typeName != null) {
ret = typeRegistry.getEntityTypeByName(typeName);
}
if (ret == null) {
throw new AtlasBaseException(AtlasErrorCode.INVALID_OBJECT_ID, val.toString());
......@@ -1868,15 +1897,28 @@ public class EntityGraphMapper {
}
}
private static void compactAttributes(AtlasEntity entity) {
// move/remove relationship-attributes present in 'attributes'
private static void compactAttributes(AtlasEntity entity, AtlasEntityType entityType) {
if (entity != null) {
Map<String, Object> relationshipAttributes = entity.getRelationshipAttributes();
Map<String, Object> attributes = entity.getAttributes();
for (AtlasAttribute attribute : entityType.getRelationshipAttributes().values()) {
String attrName = attribute.getName();
if (entity.hasAttribute(attrName)) {
Object attrValue = entity.getAttribute(attrName);
if (LOG.isDebugEnabled()) {
LOG.debug("relationship attribute {}.{} is present in entity, removing it", entityType.getTypeName(), attrName);
}
if (MapUtils.isNotEmpty(relationshipAttributes) && MapUtils.isNotEmpty(attributes)) {
for (String attrName : relationshipAttributes.keySet()) {
if (attributes.containsKey(attrName)) {
entity.removeAttribute(attrName);
if (attrValue != null) { // relationship attribute is present in 'attributes'
// if the attribute doesn't exist in relationshipAttributes, add it
Object relationshipAttrValue = entity.getRelationshipAttribute(attrName);
if (relationshipAttrValue == null) {
entity.setRelationshipAttribute(attrName, attrValue);
}
}
}
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment