Commit b445a1d2 by Sarath Subramanian

ATLAS-1984: Use AtlasRelatedObjectId to refer to relationship attributes during…

ATLAS-1984: Use AtlasRelatedObjectId to refer to relationship attributes during entity create/update
parent 2f6d7b24
......@@ -85,6 +85,12 @@ public class AtlasRelationship extends AtlasStruct implements Serializable {
init(nextInternalId(), end1, end2, null, null, null, null, null, null, 0L);
}
public AtlasRelationship(String typeName, AtlasObjectId end1, AtlasObjectId end2, Map<String, Object> attributes) {
super(typeName, attributes);
init(nextInternalId(), end1, end2, null, null, null, null, null, null, 0L);
}
public AtlasRelationship(String typeName, String attrName, Object attrValue) {
super(typeName, attrName, attrValue);
......
......@@ -126,6 +126,14 @@ public class AtlasStruct implements Serializable {
}
}
public void removeAttribute(String name) {
Map<String, Object> a = this.attributes;
if (a != null && a.containsKey(name)) {
a.remove(name);
}
}
public StringBuilder toString(StringBuilder sb) {
if (sb == null) {
sb = new StringBuilder();
......
......@@ -19,6 +19,7 @@ package org.apache.atlas.type;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasStruct;
import org.apache.atlas.model.typedef.AtlasStructDef;
import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
......@@ -330,13 +331,24 @@ public class AtlasStructType extends AtlasType {
if (value != null) {
ret = dataType.validateValue(value, fieldName, messages) && ret;
} else if (!attributeDef.getIsOptional()) {
ret = false;
messages.add(fieldName + ": mandatory attribute value missing in type " + getTypeName());
// if required attribute is null, check if attribute value specified in relationship
if (structObj instanceof AtlasEntity) {
AtlasEntity entityObj = (AtlasEntity) structObj;
if (entityObj.getRelationshipAttribute(attrName) == null) {
ret = false;
messages.add(fieldName + ": mandatory attribute value missing in type " + getTypeName());
}
} else {
ret = false;
messages.add(fieldName + ": mandatory attribute value missing in type " + getTypeName());
}
}
}
}
} else if (obj instanceof Map) {
Map attributes = AtlasTypeUtil.toStructAttributes((Map)obj);
Map attributes = AtlasTypeUtil.toStructAttributes((Map)obj);
Map relationshipAttributes = AtlasTypeUtil.toRelationshipAttributes((Map)obj);
for (AtlasAttributeDef attributeDef : structDef.getAttributeDefs()) {
String attrName = attributeDef.getName();
......@@ -350,8 +362,11 @@ public class AtlasStructType extends AtlasType {
if (value != null) {
ret = dataType.validateValue(value, fieldName, messages) && ret;
} else if (!attributeDef.getIsOptional()) {
ret = false;
messages.add(fieldName + ": mandatory attribute value missing in type " + getTypeName());
// if required attribute is null, check if attribute value specified in relationship
if (MapUtils.isEmpty(relationshipAttributes) || !relationshipAttributes.containsKey(attrName)) {
ret = false;
messages.add(fieldName + ": mandatory attribute value missing in type " + getTypeName());
}
}
}
}
......
......@@ -316,11 +316,13 @@ public class AtlasTypeUtil {
}
public static Map toRelationshipAttributes(Map map) {
Map ret = null;
if (map != null && map.containsKey("typeName") && map.containsKey("relationshipAttributes") && map.get("relationshipAttributes") instanceof Map) {
return (Map)map.get("relationshipAttributes");
ret = (Map) map.get("relationshipAttributes");
}
return map;
return ret;
}
public static AtlasObjectId getAtlasObjectId(AtlasEntity entity) {
......
......@@ -301,21 +301,38 @@ public class AtlasEntityGraphDiscoveryV1 implements EntityGraphDiscovery {
visitStruct(structType, struct);
}
void visitStruct(AtlasStructType structType, AtlasStruct struct) throws AtlasBaseException {
for (AtlasAttribute attribute : structType.getAllAttributes().values()) {
void visitEntity(AtlasEntityType entityType, AtlasEntity entity) throws AtlasBaseException {
List<String> visitedAttributes = new ArrayList<>();
// visit relationship attributes
for (AtlasAttribute attribute : entityType.getRelationshipAttributes().values()) {
AtlasType attrType = attribute.getAttributeType();
Object attrVal = struct.getAttribute(attribute.getName());
String attrName = attribute.getName();
Object attrVal = entity.getRelationshipAttribute(attrName);
visitAttribute(attrType, attrVal);
if (entity.hasRelationshipAttribute(attrName)) {
visitAttribute(attrType, attrVal);
visitedAttributes.add(attrName);
}
}
}
void visitEntity(AtlasEntityType entityType, AtlasEntity entity) throws AtlasBaseException {
visitStruct(entityType, entity);
// visit struct attributes
for (AtlasAttribute attribute : entityType.getAllAttributes().values()) {
AtlasType attrType = attribute.getAttributeType();
String attrName = attribute.getName();
Object attrVal = entity.getAttribute(attrName);
for (AtlasAttribute attribute : entityType.getRelationshipAttributes().values()) {
if (entity.hasAttribute(attrName) && !visitedAttributes.contains(attrName)) {
visitAttribute(attrType, attrVal);
}
}
}
void visitStruct(AtlasStructType structType, AtlasStruct struct) throws AtlasBaseException {
for (AtlasAttribute attribute : structType.getAllAttributes().values()) {
AtlasType attrType = attribute.getAttributeType();
Object attrVal = entity.getRelationshipAttribute(attribute.getName());
Object attrVal = struct.getAttribute(attribute.getName());
visitAttribute(attrType, attrVal);
}
......
......@@ -27,6 +27,7 @@ import org.apache.atlas.model.instance.AtlasClassification;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.instance.AtlasRelatedObjectId;
import org.apache.atlas.model.instance.AtlasRelationship;
import org.apache.atlas.model.instance.AtlasStruct;
import org.apache.atlas.model.instance.EntityMutationResponse;
......@@ -59,6 +60,7 @@ import org.springframework.stereotype.Component;
import javax.inject.Inject;
import java.util.*;
import static org.apache.atlas.model.instance.AtlasRelatedObjectId.KEY_RELATIONSHIP_ATTRIBUTES;
import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.CREATE;
import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.DELETE;
import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.PARTIAL_UPDATE;
......@@ -147,10 +149,12 @@ public class EntityGraphMapper {
AtlasVertex vertex = context.getVertex(guid);
AtlasEntityType entityType = context.getType(guid);
mapAttributes(createdEntity, vertex, CREATE, context);
compactAttributes(createdEntity);
mapRelationshipAttributes(createdEntity, vertex, CREATE, context);
mapAttributes(createdEntity, vertex, CREATE, context);
resp.addEntity(CREATE, constructHeader(createdEntity, entityType, vertex));
addClassifications(context, guid, createdEntity.getClassifications());
}
......@@ -162,10 +166,12 @@ public class EntityGraphMapper {
AtlasVertex vertex = context.getVertex(guid);
AtlasEntityType entityType = context.getType(guid);
mapAttributes(updatedEntity, vertex, UPDATE, context);
compactAttributes(updatedEntity);
mapRelationshipAttributes(updatedEntity, vertex, UPDATE, context);
mapAttributes(updatedEntity, vertex, UPDATE, context);
if (isPartialUpdate) {
resp.addEntity(PARTIAL_UPDATE, constructHeader(updatedEntity, entityType, vertex));
} else {
......@@ -354,16 +360,25 @@ public class EntityGraphMapper {
}
case OBJECT_ID_TYPE: {
String edgeLabel = ctx.getAttribute().getRelationshipEdgeLabel();
AtlasRelationshipEdgeDirection edgeDirection = ctx.getAttribute().getRelationshipEdgeDirection();
String edgeLabel = ctx.getAttribute().getRelationshipEdgeLabel();
// legacy case - if relationship attribute doesn't exist, use legacy edge label.
// if relationshipDefs doesn't exist, use legacy way of finding edge label.
if (StringUtils.isEmpty(edgeLabel)) {
edgeLabel = AtlasGraphUtilsV1.getEdgeLabel(ctx.getVertexProperty());
}
AtlasEdge currentEdge = graphHelper.getEdgeForLabel(ctx.getReferringVertex(), edgeLabel, edgeDirection);
AtlasEdge newEdge = null;
String relationshipGuid = getRelationshipGuid(ctx.getValue());
AtlasEdge currentEdge;
// if relationshipGuid is assigned in AtlasRelatedObjectId use it to fetch existing AtlasEdge
if (StringUtils.isNotEmpty(relationshipGuid)) {
currentEdge = graphHelper.getEdgeForGUID(relationshipGuid);
} else {
currentEdge = graphHelper.getEdgeForLabel(ctx.getReferringVertex(), edgeLabel, edgeDirection);
}
AtlasEdge newEdge = null;
if (ctx.getValue() != null) {
AtlasEntityType instanceType = getInstanceType(ctx.getValue());
......@@ -377,7 +392,7 @@ public class EntityGraphMapper {
// legacy case update inverse attribute
if (ctx.getAttribute().getInverseRefAttribute() != null) {
// Update the inverse reference using relationship on the target entity
addInverseReference(ctx.getAttribute().getInverseRefAttribute(), newEdge);
addInverseReference(ctx.getAttribute().getInverseRefAttribute(), newEdge, getRelationshipAttributes(ctx.getValue()));
}
}
......@@ -425,7 +440,7 @@ public class EntityGraphMapper {
}
}
private void addInverseReference(AtlasAttribute inverseAttribute, AtlasEdge edge) throws AtlasBaseException {
private void addInverseReference(AtlasAttribute inverseAttribute, AtlasEdge edge, Map<String, Object> relationshipAttributes) throws AtlasBaseException {
AtlasStructType inverseType = inverseAttribute.getDefinedInType();
AtlasVertex inverseVertex = edge.getInVertex();
String inverseEdgeLabel = inverseAttribute.getRelationshipEdgeLabel();
......@@ -433,7 +448,7 @@ public class EntityGraphMapper {
String propertyName = AtlasGraphUtilsV1.getQualifiedAttributePropertyKey(inverseType, inverseAttribute.getName());
// create new inverse reference
AtlasEdge newEdge = createInverseReferenceUsingRelationship(inverseAttribute, edge);
AtlasEdge newEdge = createInverseReferenceUsingRelationship(inverseAttribute, edge, relationshipAttributes);
boolean inverseUpdated = true;
switch (inverseAttribute.getAttributeType().getTypeCategory()) {
......@@ -480,7 +495,7 @@ public class EntityGraphMapper {
}
}
private AtlasEdge createInverseReferenceUsingRelationship(AtlasAttribute inverseAttribute, AtlasEdge edge) throws AtlasBaseException {
private AtlasEdge createInverseReferenceUsingRelationship(AtlasAttribute inverseAttribute, AtlasEdge edge, Map<String, Object> relationshipAttributes) throws AtlasBaseException {
if (LOG.isDebugEnabled()) {
LOG.debug("==> createInverseReferenceUsingRelationship()");
}
......@@ -497,7 +512,7 @@ public class EntityGraphMapper {
if (entityType.hasRelationshipAttribute(inverseAttributeName)) {
String relationshipName = graphHelper.getRelationshipDefName(inverseVertex, entityType, inverseAttributeName);
ret = getOrCreateRelationship(inverseVertex, vertex, relationshipName, inverseAttribute);
ret = getOrCreateRelationship(inverseVertex, vertex, relationshipName, relationshipAttributes);
} else {
if (LOG.isDebugEnabled()) {
......@@ -657,8 +672,10 @@ public class EntityGraphMapper {
// use relationship to create/update edges
if (entityType.hasRelationshipAttribute(attributeName)) {
Map<String, Object> relationshipAttributes = getRelationshipAttributes(ctx.getValue());
if (ctx.getCurrentEdge() != null) {
ret = updateRelationship(ctx.getCurrentEdge(), attributeVertex, edgeDirection, ctx.getAttribute());
ret = updateRelationship(ctx.getCurrentEdge(), attributeVertex, edgeDirection, relationshipAttributes);
recordEntityUpdate(attributeVertex);
......@@ -677,7 +694,7 @@ public class EntityGraphMapper {
}
boolean relationshipExists = isRelationshipExists(fromVertex, toVertex, edgeLabel);
ret = getOrCreateRelationship(fromVertex, toVertex, relationshipName, ctx.getAttribute());
ret = getOrCreateRelationship(fromVertex, toVertex, relationshipName, relationshipAttributes);
// if relationship did not exist before and new relationship was created
// record entity update on both relationship vertices
......@@ -751,7 +768,8 @@ public class EntityGraphMapper {
// update the inverse reference value.
if (isReference && newEntry instanceof AtlasEdge && inverseRefAttribute != null) {
AtlasEdge newEdge = (AtlasEdge) newEntry;
addInverseReference(inverseRefAttribute, newEdge);
addInverseReference(inverseRefAttribute, newEdge, getRelationshipAttributes(ctx.getValue()));
}
}
}
......@@ -802,7 +820,8 @@ public class EntityGraphMapper {
if (isReference && newEntry instanceof AtlasEdge && inverseRefAttribute != null) {
// Update the inverse reference value.
AtlasEdge newEdge = (AtlasEdge) newEntry;
addInverseReference(inverseRefAttribute, newEdge);
addInverseReference(inverseRefAttribute, newEdge, getRelationshipAttributes(ctx.getValue()));
}
newElementsCreated.add(newEntry);
......@@ -941,6 +960,34 @@ public class EntityGraphMapper {
return null;
}
private static Map<String, Object> getRelationshipAttributes(Object val) throws AtlasBaseException {
if (val instanceof AtlasRelatedObjectId) {
AtlasStruct relationshipStruct = ((AtlasRelatedObjectId) val).getRelationshipAttributes();
return (relationshipStruct != null) ? relationshipStruct.getAttributes() : null;
} else if (val instanceof Map) {
Object relationshipStruct = ((Map) val).get(KEY_RELATIONSHIP_ATTRIBUTES);
if (relationshipStruct instanceof Map) {
return AtlasTypeUtil.toStructAttributes(((Map) relationshipStruct));
}
}
return null;
}
private static String getRelationshipGuid(Object val) throws AtlasBaseException {
if (val instanceof AtlasRelatedObjectId) {
return ((AtlasRelatedObjectId) val).getRelationshipGuid();
} else if (val instanceof Map) {
Object relationshipGuidVal = ((Map) val).get(AtlasRelatedObjectId.KEY_RELATIONSHIP_GUID);
return relationshipGuidVal != null ? relationshipGuidVal.toString() : null;
}
return null;
}
private AtlasEntityType getInstanceType(Object val) throws AtlasBaseException {
AtlasEntityType ret = null;
......@@ -1057,9 +1104,8 @@ public class EntityGraphMapper {
return newEdge;
}
private AtlasEdge updateRelationship(AtlasEdge currentEdge, final AtlasVertex newEntityVertex,
AtlasRelationshipEdgeDirection edgeDirection, AtlasAttribute attribute)
throws AtlasBaseException {
private AtlasEdge updateRelationship(AtlasEdge currentEdge, final AtlasVertex newEntityVertex, AtlasRelationshipEdgeDirection edgeDirection,
Map<String, Object> relationshipAttributes) throws AtlasBaseException {
if (LOG.isDebugEnabled()) {
LOG.debug("Updating entity reference using relationship {} for reference attribute {}", getTypeName(newEntityVertex));
}
......@@ -1083,8 +1129,8 @@ public class EntityGraphMapper {
relationshipName = currentEdge.getLabel();
}
ret = (edgeDirection == IN) ? getOrCreateRelationship(newEntityVertex, currentEdge.getInVertex(), relationshipName, attribute) :
getOrCreateRelationship(currentEdge.getOutVertex(), newEntityVertex, relationshipName, attribute);
ret = (edgeDirection == IN) ? getOrCreateRelationship(newEntityVertex, currentEdge.getInVertex(), relationshipName, relationshipAttributes) :
getOrCreateRelationship(currentEdge.getOutVertex(), newEntityVertex, relationshipName, relationshipAttributes);
}
return ret;
......@@ -1318,12 +1364,13 @@ public class EntityGraphMapper {
}
}
private AtlasEdge getOrCreateRelationship(AtlasVertex end1Vertex, AtlasVertex end2Vertex, String relationshipName, AtlasAttribute attribute) throws AtlasBaseException {
private AtlasEdge getOrCreateRelationship(AtlasVertex end1Vertex, AtlasVertex end2Vertex, String relationshipName,
Map<String, Object> relationshipAttributes) throws AtlasBaseException {
AtlasEdge ret = null;
AtlasObjectId end1 = new AtlasObjectId(getIdFromVertex(end1Vertex), AtlasGraphUtilsV1.getTypeName(end1Vertex));
AtlasObjectId end2 = new AtlasObjectId(getIdFromVertex(end2Vertex), AtlasGraphUtilsV1.getTypeName(end2Vertex));
AtlasRelationship relationship = relationshipStore.getOrCreate(new AtlasRelationship(relationshipName, end1, end2));
AtlasRelationship relationship = relationshipStore.getOrCreate(new AtlasRelationship(relationshipName, end1, end2, relationshipAttributes));
// return newly created AtlasEdge
// if multiple edges are returned, compare using guid to pick the right one
Iterator<AtlasEdge> outEdges = graphHelper.getOutGoingEdgesByLabel(end1Vertex, relationship.getLabel());
......@@ -1382,4 +1429,19 @@ public class EntityGraphMapper {
return ret;
}
private static void compactAttributes(AtlasEntity entity) {
if (entity != null) {
Map<String, Object> relationshipAttributes = entity.getRelationshipAttributes();
Map<String, Object> attributes = entity.getAttributes();
if (MapUtils.isNotEmpty(relationshipAttributes) && MapUtils.isNotEmpty(attributes)) {
for (String attrName : relationshipAttributes.keySet()) {
if (attributes.containsKey(attrName)) {
entity.removeAttribute(attrName);
}
}
}
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment