Commit 44763bf9 by Ashutosh Mestry

ATLAS-2813: SoftRef implementation.

parent ab2043a8
...@@ -257,6 +257,9 @@ public class AtlasStructDef extends AtlasBaseTypeDef implements Serializable { ...@@ -257,6 +257,9 @@ public class AtlasStructDef extends AtlasBaseTypeDef implements Serializable {
public static class AtlasAttributeDef implements Serializable { public static class AtlasAttributeDef implements Serializable {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
public static final String ATTRDEF_OPTION_SOFT_REFERENCE = "isSoftReference";
private final String STRING_TRUE = "true";
/** /**
* single-valued attribute or multi-valued attribute. * single-valued attribute or multi-valued attribute.
*/ */
...@@ -277,6 +280,7 @@ public class AtlasStructDef extends AtlasBaseTypeDef implements Serializable { ...@@ -277,6 +280,7 @@ public class AtlasStructDef extends AtlasBaseTypeDef implements Serializable {
private String description; private String description;
private List<AtlasConstraintDef> constraints; private List<AtlasConstraintDef> constraints;
private Map<String, String> options;
public AtlasAttributeDef() { this(null, null); } public AtlasAttributeDef() { this(null, null); }
...@@ -287,12 +291,12 @@ public class AtlasStructDef extends AtlasBaseTypeDef implements Serializable { ...@@ -287,12 +291,12 @@ public class AtlasStructDef extends AtlasBaseTypeDef implements Serializable {
public AtlasAttributeDef(String name, String typeName, boolean isOptional, Cardinality cardinality, public AtlasAttributeDef(String name, String typeName, boolean isOptional, Cardinality cardinality,
int valuesMinCount, int valuesMaxCount, boolean isUnique, boolean isIndexable, boolean includeInNotification, List<AtlasConstraintDef> constraints) { int valuesMinCount, int valuesMaxCount, boolean isUnique, boolean isIndexable, boolean includeInNotification, List<AtlasConstraintDef> constraints) {
this(name, typeName, isOptional, cardinality, valuesMinCount, valuesMaxCount, isUnique, isIndexable, includeInNotification, null, constraints, null); this(name, typeName, isOptional, cardinality, valuesMinCount, valuesMaxCount, isUnique, isIndexable, includeInNotification, null, constraints, null, null);
} }
public AtlasAttributeDef(String name, String typeName, boolean isOptional, Cardinality cardinality, public AtlasAttributeDef(String name, String typeName, boolean isOptional, Cardinality cardinality,
int valuesMinCount, int valuesMaxCount, boolean isUnique, boolean isIndexable, boolean includeInNotification, String defaultValue, int valuesMinCount, int valuesMaxCount, boolean isUnique, boolean isIndexable, boolean includeInNotification, String defaultValue,
List<AtlasConstraintDef> constraints, String description) { List<AtlasConstraintDef> constraints, Map<String,String> options, String description) {
setName(name); setName(name);
setTypeName(typeName); setTypeName(typeName);
setIsOptional(isOptional); setIsOptional(isOptional);
...@@ -304,6 +308,7 @@ public class AtlasStructDef extends AtlasBaseTypeDef implements Serializable { ...@@ -304,6 +308,7 @@ public class AtlasStructDef extends AtlasBaseTypeDef implements Serializable {
setIncludeInNotification(includeInNotification); setIncludeInNotification(includeInNotification);
setDefaultValue(defaultValue); setDefaultValue(defaultValue);
setConstraints(constraints); setConstraints(constraints);
setOptions(options);
setDescription(description); setDescription(description);
} }
...@@ -320,6 +325,7 @@ public class AtlasStructDef extends AtlasBaseTypeDef implements Serializable { ...@@ -320,6 +325,7 @@ public class AtlasStructDef extends AtlasBaseTypeDef implements Serializable {
setIncludeInNotification(other.getIncludeInNotification()); setIncludeInNotification(other.getIncludeInNotification());
setDefaultValue(other.getDefaultValue()); setDefaultValue(other.getDefaultValue());
setConstraints(other.getConstraints()); setConstraints(other.getConstraints());
setOptions(other.getOptions());
setDescription((other.getDescription())); setDescription((other.getDescription()));
} }
} }
...@@ -423,6 +429,23 @@ public class AtlasStructDef extends AtlasBaseTypeDef implements Serializable { ...@@ -423,6 +429,23 @@ public class AtlasStructDef extends AtlasBaseTypeDef implements Serializable {
cDefs.add(constraintDef); cDefs.add(constraintDef);
} }
public Map<String, String> getOptions() {
return options;
}
public void setOptions(Map<String, String> options) {
if (options != null) {
this.options = new HashMap<>(options);
} else {
this.options = null;
}
}
public boolean isSoftReferenced() {
return this.options != null &&
getOptions().containsKey(AtlasAttributeDef.ATTRDEF_OPTION_SOFT_REFERENCE) &&
getOptions().get(AtlasAttributeDef.ATTRDEF_OPTION_SOFT_REFERENCE).equals(STRING_TRUE);
}
public String getDescription() { public String getDescription() {
return description; return description;
...@@ -449,6 +472,7 @@ public class AtlasStructDef extends AtlasBaseTypeDef implements Serializable { ...@@ -449,6 +472,7 @@ public class AtlasStructDef extends AtlasBaseTypeDef implements Serializable {
sb.append(", isIndexable=").append(isIndexable); sb.append(", isIndexable=").append(isIndexable);
sb.append(", includeInNotification=").append(includeInNotification); sb.append(", includeInNotification=").append(includeInNotification);
sb.append(", defaultValue=").append(defaultValue); sb.append(", defaultValue=").append(defaultValue);
sb.append(", options='").append(options).append('\'');
sb.append(", constraints=["); sb.append(", constraints=[");
if (CollectionUtils.isNotEmpty(constraints)) { if (CollectionUtils.isNotEmpty(constraints)) {
int i = 0; int i = 0;
...@@ -482,12 +506,13 @@ public class AtlasStructDef extends AtlasBaseTypeDef implements Serializable { ...@@ -482,12 +506,13 @@ public class AtlasStructDef extends AtlasBaseTypeDef implements Serializable {
cardinality == that.cardinality && cardinality == that.cardinality &&
Objects.equals(defaultValue, that.defaultValue) && Objects.equals(defaultValue, that.defaultValue) &&
Objects.equals(description, that.description) && Objects.equals(description, that.description) &&
Objects.equals(constraints, that.constraints); Objects.equals(constraints, that.constraints) &&
Objects.equals(options, that.options);
} }
@Override @Override
public int hashCode() { public int hashCode() {
return Objects.hash(name, typeName, isOptional, cardinality, valuesMinCount, valuesMaxCount, isUnique, isIndexable, includeInNotification, defaultValue, constraints, description); return Objects.hash(name, typeName, isOptional, cardinality, valuesMinCount, valuesMaxCount, isUnique, isIndexable, includeInNotification, defaultValue, constraints, options, description);
} }
@Override @Override
......
...@@ -495,6 +495,10 @@ public class AtlasStructDefStoreV2 extends AtlasAbstractDefStoreV2<AtlasStructDe ...@@ -495,6 +495,10 @@ public class AtlasStructDefStoreV2 extends AtlasAbstractDefStoreV2<AtlasStructDe
attribInfo.put("defaultValue", attributeDef.getDefaultValue()); attribInfo.put("defaultValue", attributeDef.getDefaultValue());
attribInfo.put("description", attributeDef.getDescription()); attribInfo.put("description", attributeDef.getDescription());
if(attributeDef.getOptions() != null) {
attribInfo.put("options", AtlasType.toJson(attributeDef.getOptions()));
}
final int lower; final int lower;
final int upper; final int upper;
...@@ -536,6 +540,10 @@ public class AtlasStructDefStoreV2 extends AtlasAbstractDefStoreV2<AtlasStructDe ...@@ -536,6 +540,10 @@ public class AtlasStructDefStoreV2 extends AtlasAbstractDefStoreV2<AtlasStructDe
ret.setDefaultValue((String) attribInfo.get("defaultValue")); ret.setDefaultValue((String) attribInfo.get("defaultValue"));
ret.setDescription((String) attribInfo.get("description")); ret.setDescription((String) attribInfo.get("description"));
if(attribInfo.get("options") != null) {
ret.setOptions(AtlasType.fromJson((String) attribInfo.get("options"), Map.class));
}
if ((Boolean)attribInfo.get("isComposite")) { if ((Boolean)attribInfo.get("isComposite")) {
ret.addConstraint(new AtlasConstraintDef(AtlasConstraintDef.CONSTRAINT_TYPE_OWNED_REF)); ret.addConstraint(new AtlasConstraintDef(AtlasConstraintDef.CONSTRAINT_TYPE_OWNED_REF));
} }
......
...@@ -126,6 +126,10 @@ public class AttributeMutationContext { ...@@ -126,6 +126,10 @@ public class AttributeMutationContext {
return value; return value;
} }
public void setValue(Object value) {
this.value = value;
}
public String getVertexProperty() { return vertexProperty; } public String getVertexProperty() { return vertexProperty; }
public AtlasVertex getReferringVertex() { return referringVertex; } public AtlasVertex getReferringVertex() { return referringVertex; }
......
...@@ -88,6 +88,7 @@ import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelation ...@@ -88,6 +88,7 @@ import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelation
public class EntityGraphMapper { public class EntityGraphMapper {
private static final Logger LOG = LoggerFactory.getLogger(EntityGraphMapper.class); private static final Logger LOG = LoggerFactory.getLogger(EntityGraphMapper.class);
private static final String SOFT_REF_FORMAT = "%s:%s";
private static final int INDEXED_STR_SAFE_LEN = AtlasConfiguration.GRAPHSTORE_INDEXED_STRING_SAFE_LENGTH.getInt(); private static final int INDEXED_STR_SAFE_LEN = AtlasConfiguration.GRAPHSTORE_INDEXED_STRING_SAFE_LENGTH.getInt();
private final GraphHelper graphHelper = GraphHelper.getInstance(); private final GraphHelper graphHelper = GraphHelper.getInstance();
...@@ -392,6 +393,10 @@ public class EntityGraphMapper { ...@@ -392,6 +393,10 @@ public class EntityGraphMapper {
} }
case OBJECT_ID_TYPE: { case OBJECT_ID_TYPE: {
if (ctx.getAttributeDef().isSoftReferenced()) {
return mapSoftRefValueWithUpdate(ctx, context);
}
AtlasRelationshipEdgeDirection edgeDirection = ctx.getAttribute().getRelationshipEdgeDirection(); AtlasRelationshipEdgeDirection edgeDirection = ctx.getAttribute().getRelationshipEdgeDirection();
String edgeLabel = ctx.getAttribute().getRelationshipEdgeLabel(); String edgeLabel = ctx.getAttribute().getRelationshipEdgeLabel();
...@@ -471,6 +476,33 @@ public class EntityGraphMapper { ...@@ -471,6 +476,33 @@ public class EntityGraphMapper {
} }
} }
private Object mapSoftRefValue(AttributeMutationContext ctx, EntityMutationContext context) {
if(ctx.getValue() != null && !(ctx.getValue() instanceof AtlasObjectId)) {
LOG.warn("mapSoftRefValue: Was expecting AtlasObjectId, but found: {}", ctx.getValue().getClass());
return null;
}
String softRefValue = null;
if(ctx.getValue() != null) {
AtlasObjectId objectId = (AtlasObjectId) ctx.getValue();
String resolvedGuid = AtlasTypeUtil.isUnAssignedGuid(objectId.getGuid())
? context.getGuidAssignments().get(objectId.getGuid())
: objectId.getGuid();
softRefValue = String.format(SOFT_REF_FORMAT, objectId.getTypeName(), resolvedGuid);
}
return softRefValue;
}
private Object mapSoftRefValueWithUpdate(AttributeMutationContext ctx, EntityMutationContext context) {
String softRefValue = (String) mapSoftRefValue(ctx, context);
AtlasGraphUtilsV2.setProperty(ctx.getReferringVertex(), ctx.getVertexProperty(), softRefValue);
return softRefValue;
}
private void addInverseReference(EntityMutationContext context, AtlasAttribute inverseAttribute, AtlasEdge edge, Map<String, Object> relationshipAttributes) throws AtlasBaseException { private void addInverseReference(EntityMutationContext context, AtlasAttribute inverseAttribute, AtlasEdge edge, Map<String, Object> relationshipAttributes) throws AtlasBaseException {
AtlasStructType inverseType = inverseAttribute.getDefinedInType(); AtlasStructType inverseType = inverseAttribute.getDefinedInType();
AtlasVertex inverseVertex = edge.getInVertex(); AtlasVertex inverseVertex = edge.getInVertex();
...@@ -844,6 +876,7 @@ public class EntityGraphMapper { ...@@ -844,6 +876,7 @@ public class EntityGraphMapper {
AtlasAttribute attribute = ctx.getAttribute(); AtlasAttribute attribute = ctx.getAttribute();
Map<String, Object> currentMap = getMapElementsProperty(mapType, ctx.getReferringVertex(), ctx.getVertexProperty(), attribute); Map<String, Object> currentMap = getMapElementsProperty(mapType, ctx.getReferringVertex(), ctx.getVertexProperty(), attribute);
boolean isReference = isReference(mapType.getValueType()); boolean isReference = isReference(mapType.getValueType());
boolean isSoftReference = ctx.getAttribute().getAttributeDef().isSoftReferenced();
if (MapUtils.isNotEmpty(newVal)) { if (MapUtils.isNotEmpty(newVal)) {
String propertyName = ctx.getVertexProperty(); String propertyName = ctx.getVertexProperty();
...@@ -851,14 +884,14 @@ public class EntityGraphMapper { ...@@ -851,14 +884,14 @@ public class EntityGraphMapper {
if (isReference) { if (isReference) {
for (Map.Entry<Object, Object> entry : newVal.entrySet()) { for (Map.Entry<Object, Object> entry : newVal.entrySet()) {
String key = entry.getKey().toString(); String key = entry.getKey().toString();
AtlasEdge existingEdge = getEdgeIfExists(mapType, currentMap, key); AtlasEdge existingEdge = isSoftReference ? null : getEdgeIfExists(mapType, currentMap, key);
AttributeMutationContext mapCtx = new AttributeMutationContext(ctx.getOp(), ctx.getReferringVertex(), attribute, entry.getValue(), AttributeMutationContext mapCtx = new AttributeMutationContext(ctx.getOp(), ctx.getReferringVertex(), attribute, entry.getValue(),
propertyName, mapType.getValueType(), existingEdge); propertyName, mapType.getValueType(), existingEdge);
// Add/Update/Remove property value // Add/Update/Remove property value
Object newEntry = mapCollectionElementsToVertex(mapCtx, context); Object newEntry = mapCollectionElementsToVertex(mapCtx, context);
if (newEntry instanceof AtlasEdge) { if (!isSoftReference && newEntry instanceof AtlasEdge) {
AtlasEdge edge = (AtlasEdge) newEntry; AtlasEdge edge = (AtlasEdge) newEntry;
edge.setProperty(ATTRIBUTE_KEY_PROPERTY_KEY, key); edge.setProperty(ATTRIBUTE_KEY_PROPERTY_KEY, key);
...@@ -875,6 +908,10 @@ public class EntityGraphMapper { ...@@ -875,6 +908,10 @@ public class EntityGraphMapper {
newMap.put(key, newEntry); newMap.put(key, newEntry);
} }
if (isSoftReference) {
newMap.put(key, newEntry);
}
} }
Map<String, Object> finalMap = removeUnusedMapEntries(attribute, ctx.getReferringVertex(), currentMap, newMap); Map<String, Object> finalMap = removeUnusedMapEntries(attribute, ctx.getReferringVertex(), currentMap, newMap);
...@@ -885,6 +922,10 @@ public class EntityGraphMapper { ...@@ -885,6 +922,10 @@ public class EntityGraphMapper {
newVal.forEach((key, value) -> newMap.put(key.toString(), value)); newVal.forEach((key, value) -> newMap.put(key.toString(), value));
} }
if (isSoftReference) {
ctx.getReferringVertex().setProperty(propertyName, new HashMap<>(newMap));
}
} }
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
...@@ -904,15 +945,16 @@ public class EntityGraphMapper { ...@@ -904,15 +945,16 @@ public class EntityGraphMapper {
AtlasArrayType arrType = (AtlasArrayType) attribute.getAttributeType(); AtlasArrayType arrType = (AtlasArrayType) attribute.getAttributeType();
AtlasType elementType = arrType.getElementType(); AtlasType elementType = arrType.getElementType();
boolean isReference = isReference(elementType); boolean isReference = isReference(elementType);
boolean isSoftReference = ctx.getAttribute().getAttributeDef().isSoftReferenced();
AtlasAttribute inverseRefAttribute = attribute.getInverseRefAttribute(); AtlasAttribute inverseRefAttribute = attribute.getInverseRefAttribute();
Cardinality cardinality = attribute.getAttributeDef().getCardinality(); Cardinality cardinality = attribute.getAttributeDef().getCardinality();
List<Object> newElementsCreated = new ArrayList<>(); List<Object> newElementsCreated = new ArrayList<>();
List<Object> currentElements; List<Object> currentElements;
if (isReference) { if (isReference && !isSoftReference) {
currentElements = (List) getCollectionElementsUsingRelationship(ctx.getReferringVertex(), attribute); currentElements = (List) getCollectionElementsUsingRelationship(ctx.getReferringVertex(), attribute);
} else { } else {
currentElements = (List) getArrayElementsProperty(elementType, ctx.getReferringVertex(), ctx.getVertexProperty()); currentElements = (List) getArrayElementsProperty(elementType, isSoftReference, ctx.getReferringVertex(), ctx.getVertexProperty());
} }
if (CollectionUtils.isNotEmpty(newElements)) { if (CollectionUtils.isNotEmpty(newElements)) {
...@@ -921,7 +963,7 @@ public class EntityGraphMapper { ...@@ -921,7 +963,7 @@ public class EntityGraphMapper {
} }
for (int index = 0; index < newElements.size(); index++) { for (int index = 0; index < newElements.size(); index++) {
AtlasEdge existingEdge = getEdgeAt(currentElements, index, elementType); AtlasEdge existingEdge = (isSoftReference) ? null : getEdgeAt(currentElements, index, elementType);
AttributeMutationContext arrCtx = new AttributeMutationContext(ctx.getOp(), ctx.getReferringVertex(), ctx.getAttribute(), newElements.get(index), AttributeMutationContext arrCtx = new AttributeMutationContext(ctx.getOp(), ctx.getReferringVertex(), ctx.getAttribute(), newElements.get(index),
ctx.getVertexProperty(), elementType, existingEdge); ctx.getVertexProperty(), elementType, existingEdge);
...@@ -940,7 +982,7 @@ public class EntityGraphMapper { ...@@ -940,7 +982,7 @@ public class EntityGraphMapper {
} }
} }
if (isReference) { if (isReference && !isSoftReference) {
List<AtlasEdge> additionalEdges = removeUnusedArrayEntries(attribute, (List) currentElements, (List) newElementsCreated, ctx.getReferringVertex()); List<AtlasEdge> additionalEdges = removeUnusedArrayEntries(attribute, (List) currentElements, (List) newElementsCreated, ctx.getReferringVertex());
newElementsCreated.addAll(additionalEdges); newElementsCreated.addAll(additionalEdges);
} }
...@@ -955,7 +997,7 @@ public class EntityGraphMapper { ...@@ -955,7 +997,7 @@ public class EntityGraphMapper {
} }
// for dereference on way out // for dereference on way out
setArrayElementsProperty(elementType, ctx.getReferringVertex(), ctx.getVertexProperty(), newElementsCreated); setArrayElementsProperty(elementType, isSoftReference, ctx.getReferringVertex(), ctx.getVertexProperty(), newElementsCreated);
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("<== mapArrayValue({})", ctx); LOG.debug("<== mapArrayValue({})", ctx);
...@@ -1020,6 +1062,10 @@ public class EntityGraphMapper { ...@@ -1020,6 +1062,10 @@ public class EntityGraphMapper {
case OBJECT_ID_TYPE: case OBJECT_ID_TYPE:
AtlasEntityType instanceType = getInstanceType(ctx.getValue()); AtlasEntityType instanceType = getInstanceType(ctx.getValue());
ctx.setElementType(instanceType); ctx.setElementType(instanceType);
if (ctx.getAttributeDef().isSoftReferenced()) {
return mapSoftRefValue(ctx, context);
}
return mapObjectIdValueUsingRelationship(ctx, context); return mapObjectIdValueUsingRelationship(ctx, context);
default: default:
...@@ -1220,8 +1266,8 @@ public class EntityGraphMapper { ...@@ -1220,8 +1266,8 @@ public class EntityGraphMapper {
return ret; return ret;
} }
public static List<Object> getArrayElementsProperty(AtlasType elementType, AtlasVertex vertex, String vertexPropertyName) { public static List<Object> getArrayElementsProperty(AtlasType elementType, boolean isSoftReference, AtlasVertex vertex, String vertexPropertyName) {
if (isReference(elementType)) { if (!isSoftReference && isReference(elementType)) {
return (List)vertex.getListProperty(vertexPropertyName, AtlasEdge.class); return (List)vertex.getListProperty(vertexPropertyName, AtlasEdge.class);
} }
else { else {
...@@ -1269,8 +1315,8 @@ public class EntityGraphMapper { ...@@ -1269,8 +1315,8 @@ public class EntityGraphMapper {
return Collections.emptyList(); return Collections.emptyList();
} }
private void setArrayElementsProperty(AtlasType elementType, AtlasVertex vertex, String vertexPropertyName, List<Object> values) { private void setArrayElementsProperty(AtlasType elementType, boolean isSoftReference, AtlasVertex vertex, String vertexPropertyName, List<Object> values) {
if (!isReference(elementType)) { if (!isReference(elementType) || isSoftReference) {
AtlasGraphUtilsV2.setEncodedProperty(vertex, vertexPropertyName, values); AtlasGraphUtilsV2.setEncodedProperty(vertex, vertexPropertyName, values);
} }
} }
...@@ -1292,7 +1338,7 @@ public class EntityGraphMapper { ...@@ -1292,7 +1338,7 @@ public class EntityGraphMapper {
} }
private void updateInConsistentOwnedMapVertices(AttributeMutationContext ctx, AtlasMapType mapType, Object val) { private void updateInConsistentOwnedMapVertices(AttributeMutationContext ctx, AtlasMapType mapType, Object val) {
if (mapType.getValueType().getTypeCategory() == TypeCategory.OBJECT_ID_TYPE) { if (mapType.getValueType().getTypeCategory() == TypeCategory.OBJECT_ID_TYPE && !ctx.getAttributeDef().isSoftReferenced()) {
AtlasEdge edge = (AtlasEdge) val; AtlasEdge edge = (AtlasEdge) val;
if (ctx.getAttribute().isOwnedRef() && getStatus(edge) == DELETED && getStatus(edge.getInVertex()) == DELETED) { if (ctx.getAttribute().isOwnedRef() && getStatus(edge) == DELETED && getStatus(edge.getInVertex()) == DELETED) {
......
...@@ -79,39 +79,12 @@ import static org.apache.atlas.glossary.GlossaryUtils.TERM_ASSIGNMENT_ATTR_EXPRE ...@@ -79,39 +79,12 @@ import static org.apache.atlas.glossary.GlossaryUtils.TERM_ASSIGNMENT_ATTR_EXPRE
import static org.apache.atlas.glossary.GlossaryUtils.TERM_ASSIGNMENT_ATTR_SOURCE; import static org.apache.atlas.glossary.GlossaryUtils.TERM_ASSIGNMENT_ATTR_SOURCE;
import static org.apache.atlas.glossary.GlossaryUtils.TERM_ASSIGNMENT_ATTR_STATUS; import static org.apache.atlas.glossary.GlossaryUtils.TERM_ASSIGNMENT_ATTR_STATUS;
import static org.apache.atlas.glossary.GlossaryUtils.TERM_ASSIGNMENT_ATTR_STEWARD; import static org.apache.atlas.glossary.GlossaryUtils.TERM_ASSIGNMENT_ATTR_STEWARD;
import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_BIGDECIMAL; import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.*;
import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_BIGINTEGER;
import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_BOOLEAN;
import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_BYTE;
import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_DATE;
import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_DOUBLE;
import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_FLOAT;
import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_INT;
import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_LONG;
import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_SHORT;
import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_STRING;
import static org.apache.atlas.repository.Constants.CLASSIFICATION_ENTITY_GUID; import static org.apache.atlas.repository.Constants.CLASSIFICATION_ENTITY_GUID;
import static org.apache.atlas.repository.Constants.CLASSIFICATION_LABEL; import static org.apache.atlas.repository.Constants.CLASSIFICATION_LABEL;
import static org.apache.atlas.repository.Constants.CLASSIFICATION_VALIDITY_PERIODS_KEY; import static org.apache.atlas.repository.Constants.CLASSIFICATION_VALIDITY_PERIODS_KEY;
import static org.apache.atlas.repository.Constants.TERM_ASSIGNMENT_LABEL; import static org.apache.atlas.repository.Constants.TERM_ASSIGNMENT_LABEL;
import static org.apache.atlas.repository.graph.GraphHelper.EDGE_LABEL_PREFIX; import static org.apache.atlas.repository.graph.GraphHelper.*;
import static org.apache.atlas.repository.graph.GraphHelper.getAdjacentEdgesByLabel;
import static org.apache.atlas.repository.graph.GraphHelper.getAllClassificationEdges;
import static org.apache.atlas.repository.graph.GraphHelper.getAllTraitNames;
import static org.apache.atlas.repository.graph.GraphHelper.getBlockedClassificationIds;
import static org.apache.atlas.repository.graph.GraphHelper.getArrayElementsProperty;
import static org.apache.atlas.repository.graph.GraphHelper.getClassificationEntityStatus;
import static org.apache.atlas.repository.graph.GraphHelper.getClassificationVertices;
import static org.apache.atlas.repository.graph.GraphHelper.getGuid;
import static org.apache.atlas.repository.graph.GraphHelper.getIncomingEdgesByLabel;
import static org.apache.atlas.repository.graph.GraphHelper.getPrimitiveMap;
import static org.apache.atlas.repository.graph.GraphHelper.getReferenceMap;
import static org.apache.atlas.repository.graph.GraphHelper.getOutGoingEdgesByLabel;
import static org.apache.atlas.repository.graph.GraphHelper.getPropagateTags;
import static org.apache.atlas.repository.graph.GraphHelper.getRelationshipGuid;
import static org.apache.atlas.repository.graph.GraphHelper.getRemovePropagations;
import static org.apache.atlas.repository.graph.GraphHelper.getTypeName;
import static org.apache.atlas.repository.graph.GraphHelper.isPropagationEnabled;
import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.getIdFromVertex; import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.getIdFromVertex;
import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.isReference; import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.isReference;
import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection; import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection;
...@@ -131,6 +104,9 @@ public final class EntityGraphRetriever { ...@@ -131,6 +104,9 @@ public final class EntityGraphRetriever {
public static final String DESCRIPTION = "description"; public static final String DESCRIPTION = "description";
public static final String OWNER = "owner"; public static final String OWNER = "owner";
public static final String CREATE_TIME = "createTime"; public static final String CREATE_TIME = "createTime";
private static final String SOFT_REFERENCE_FORMAT_SEPERATOR = ":";
private static final int SOFT_REFERENCE_FORMAT_INDEX_TYPE_NAME = 0;
private static final int SOFT_REFERENCE_FORMAT_INDEX_GUID = 1;
public static final String QUALIFIED_NAME = "qualifiedName"; public static final String QUALIFIED_NAME = "qualifiedName";
private static final TypeReference<List<TimeBoundary>> TIME_BOUNDARIES_LIST_TYPE = new TypeReference<List<TimeBoundary>>() {}; private static final TypeReference<List<TimeBoundary>> TIME_BOUNDARIES_LIST_TYPE = new TypeReference<List<TimeBoundary>>() {};
...@@ -679,13 +655,25 @@ public final class EntityGraphRetriever { ...@@ -679,13 +655,25 @@ public final class EntityGraphRetriever {
ret = mapVertexToStruct(entityVertex, edgeLabel, null, entityExtInfo, isMinExtInfo); ret = mapVertexToStruct(entityVertex, edgeLabel, null, entityExtInfo, isMinExtInfo);
break; break;
case OBJECT_ID_TYPE: case OBJECT_ID_TYPE:
if(attribute.getAttributeDef().isSoftReferenced()) {
ret = mapVertexToObjectIdForSoftRef(entityVertex, attribute.getVertexPropertyName());
} else {
ret = mapVertexToObjectId(entityVertex, edgeLabel, null, entityExtInfo, isOwnedAttribute, edgeDirection, isMinExtInfo); ret = mapVertexToObjectId(entityVertex, edgeLabel, null, entityExtInfo, isOwnedAttribute, edgeDirection, isMinExtInfo);
}
break; break;
case ARRAY: case ARRAY:
if(attribute.getAttributeDef().isSoftReferenced()) {
ret = mapVertexToArrayForSoftRef(entityVertex, attribute.getVertexPropertyName());
} else {
ret = mapVertexToArray(entityVertex, entityExtInfo, isOwnedAttribute, attribute, isMinExtInfo); ret = mapVertexToArray(entityVertex, entityExtInfo, isOwnedAttribute, attribute, isMinExtInfo);
}
break; break;
case MAP: case MAP:
if(attribute.getAttributeDef().isSoftReferenced()) {
ret = mapVertexToMapForSoftRef(entityVertex, attribute.getVertexPropertyName());
} else {
ret = mapVertexToMap(entityVertex, entityExtInfo, isOwnedAttribute, attribute, isMinExtInfo); ret = mapVertexToMap(entityVertex, entityExtInfo, isOwnedAttribute, attribute, isMinExtInfo);
}
break; break;
case CLASSIFICATION: case CLASSIFICATION:
// do nothing // do nothing
...@@ -695,6 +683,76 @@ public final class EntityGraphRetriever { ...@@ -695,6 +683,76 @@ public final class EntityGraphRetriever {
return ret; return ret;
} }
private Map<String, AtlasObjectId> mapVertexToMapForSoftRef(AtlasVertex entityVertex, String propertyName) {
Map map = entityVertex.getProperty(propertyName, Map.class);
if (MapUtils.isEmpty(map)) {
return null;
}
if (LOG.isDebugEnabled()) {
LOG.debug("Mapping map attribute {} for vertex {}", propertyName, entityVertex);
}
Map ret = new HashMap();
for (Object mapKey : map.keySet()) {
String softRefRaw = (String) map.get(mapKey);
AtlasObjectId mapValue = getAtlasObjectIdFromSoftRefFormat(softRefRaw);
if (mapValue != null) {
ret.put(mapKey, mapValue);
}
}
return ret;
}
private List<AtlasObjectId> mapVertexToArrayForSoftRef(AtlasVertex entityVertex, String propertyName) {
List rawValue = entityVertex.getListProperty(propertyName, List.class);
if (CollectionUtils.isEmpty(rawValue)) {
return null;
}
List list = (List) rawValue;
List<AtlasObjectId> objectIds = new ArrayList<>();
for (Object o : list) {
if (!(o instanceof String)) {
continue;
}
AtlasObjectId objectId = getAtlasObjectIdFromSoftRefFormat((String) o);
if(objectId == null) {
continue;
}
objectIds.add(objectId);
}
return objectIds;
}
private AtlasObjectId mapVertexToObjectIdForSoftRef(AtlasVertex entityVertex, String vertexPropertyName) {
String rawValue = AtlasGraphUtilsV2.getEncodedProperty(entityVertex, vertexPropertyName, String.class);
if(StringUtils.isEmpty(rawValue)) {
return null;
}
return getAtlasObjectIdFromSoftRefFormat(rawValue);
}
private AtlasObjectId getAtlasObjectIdFromSoftRefFormat(String rawValue) {
if(StringUtils.isEmpty(rawValue)) {
return null;
}
String[] objectIdParts = StringUtils.split(rawValue, SOFT_REFERENCE_FORMAT_SEPERATOR);
if(objectIdParts.length < 2) {
LOG.warn("Expecting value to be formatted for softRef. Instead found: {}", rawValue);
return null;
}
return new AtlasObjectId(objectIdParts[SOFT_REFERENCE_FORMAT_INDEX_GUID],
objectIdParts[SOFT_REFERENCE_FORMAT_INDEX_TYPE_NAME]);
}
private Map<String, Object> mapVertexToMap(AtlasVertex entityVertex, AtlasEntityExtInfo entityExtInfo, private Map<String, Object> mapVertexToMap(AtlasVertex entityVertex, AtlasEntityExtInfo entityExtInfo,
boolean isOwnedAttribute, AtlasAttribute attribute, final boolean isMinExtInfo) throws AtlasBaseException { boolean isOwnedAttribute, AtlasAttribute attribute, final boolean isMinExtInfo) throws AtlasBaseException {
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.store.graph.v1;
import org.apache.atlas.TestModules;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.model.typedef.AtlasEntityDef;
import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.apache.atlas.repository.graphdb.AtlasEdge;
import org.apache.atlas.repository.graphdb.AtlasEdgeDirection;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.repository.store.graph.v2.AtlasEntityStream;
import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
import org.apache.atlas.store.AtlasTypeDefStore;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.utils.TestResourceFileUtils;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
import javax.inject.Inject;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
@Guice(modules = TestModules.SoftDeleteModule.class)
public class SoftReferenceTest {
private static final String TYPE_RDBMS_DB = "rdbms_db";
private static final String RDBMS_DB_FILE = "rdbms-db";
private static final String TYPE_RDBMS_STORAGE = "rdbms_storage";
private static final String TYPESDEF_FILE_NAME = "typesdef-soft-ref";
private static final String RDBMS_DB_STORAGE_PROPERTY = "sd";
private static final String RDBMS_DB_TABLES_PROPERTY = "tables";
private static final String RDBMS_DB_REGIONS_PROPERTY = "regions";
private static final String RDBMS_SD_PROPERTY = "rdbms_db.sd";
private static final String TYPE_RDBMS_TABLES = "rdbms_table";
@Inject
AtlasTypeRegistry typeRegistry;
@Inject
private AtlasTypeDefStore typeDefStore;
@Inject
private AtlasEntityStore entityStore;
private AtlasType dbType;
private String dbGuid;
private String storageGuid;
@Test
public void typeCreationFromFile() throws IOException, AtlasBaseException {
String typesDefJson = TestResourceFileUtils.getJson(TYPESDEF_FILE_NAME);
AtlasTypesDef typesDef = AtlasType.fromJson(typesDefJson, AtlasTypesDef.class);
assertNotNull(typesDef);
typeDefStore.createTypesDef(typesDef);
dbType = typeRegistry.getType(TYPE_RDBMS_DB);
assertNotNull(dbType);
AtlasEntityDef dbType = typeRegistry.getEntityDefByName(TYPE_RDBMS_DB);
assertNotNull(dbType);
assertTrue(dbType.getAttribute(RDBMS_DB_STORAGE_PROPERTY).isSoftReferenced());
assertTrue(dbType.getAttribute(RDBMS_DB_TABLES_PROPERTY).isSoftReferenced());
assertTrue(dbType.getAttribute(RDBMS_DB_REGIONS_PROPERTY).isSoftReferenced());
assertNotNull(typeRegistry.getEntityDefByName(TYPE_RDBMS_STORAGE));
assertNotNull(typeRegistry.getEntityDefByName(TYPE_RDBMS_TABLES));
}
@Test(dependsOnMethods = "typeCreationFromFile")
public void entityCreationUsingSoftRef() throws IOException, AtlasBaseException {
final int EXPECTED_ENTITY_COUNT = 6;
AtlasEntity.AtlasEntityWithExtInfo dbEntity = AtlasType.fromJson(
TestResourceFileUtils.getJson(RDBMS_DB_FILE), AtlasEntity.AtlasEntityWithExtInfo.class);
EntityMutationResponse response = entityStore.createOrUpdate(new AtlasEntityStream(dbEntity), false);
assertNotNull(response);
assertTrue(response.getCreatedEntities().size() == EXPECTED_ENTITY_COUNT);
assertGraphStructure(response.getCreatedEntities().get(0).getGuid(),
response.getCreatedEntities().get(1).getGuid(), RDBMS_SD_PROPERTY);
dbGuid = response.getCreatedEntities().get(0).getGuid();
storageGuid = response.getCreatedEntities().get(1).getGuid();
}
@Test(dependsOnMethods = "entityCreationUsingSoftRef")
public void deletetingCollections() throws AtlasBaseException {
AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = entityStore.getById(dbGuid);
assertNotNull(entityWithExtInfo);
List list = (List)entityWithExtInfo.getEntity().getAttribute(RDBMS_DB_TABLES_PROPERTY);
list.remove(1);
Map map = (Map) entityWithExtInfo.getEntity().getAttribute(RDBMS_DB_REGIONS_PROPERTY);
map.remove("east");
EntityMutationResponse response = entityStore.createOrUpdate(new AtlasEntityStream(entityWithExtInfo), true);
assertNotNull(response);
assertTrue(response.getPartialUpdatedEntities().size() > 0);
assertAttribute(dbGuid, storageGuid, 1, 1);
}
@Test(dependsOnMethods = "deletetingCollections")
public void addingCollections() throws AtlasBaseException {
AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = entityStore.getById(dbGuid);
assertNotNull(entityWithExtInfo);
addNewTables(entityWithExtInfo);
addNewRegions(entityWithExtInfo);
EntityMutationResponse response = entityStore.createOrUpdate(new AtlasEntityStream(entityWithExtInfo), true);
assertNotNull(response);
assertTrue(response.getPartialUpdatedEntities().size() > 0);
assertAttribute(dbGuid, storageGuid, 3, 3);
}
private void addNewRegions(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) throws AtlasBaseException {
Map map = (Map) entityWithExtInfo.getEntity().getAttribute(RDBMS_DB_REGIONS_PROPERTY);
AtlasEntity region1 = getDefaultTableEntity("r1");
AtlasEntity region2 = getDefaultTableEntity("r2");
map.put("north", new AtlasObjectId(region1.getGuid(), region1.getTypeName()));
map.put("south", new AtlasObjectId(region2.getGuid(), region2.getTypeName()));
entityWithExtInfo.addReferredEntity(region1);
entityWithExtInfo.addReferredEntity(region2);
}
private void addNewTables(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) throws AtlasBaseException {
List list = (List)entityWithExtInfo.getEntity().getAttribute(RDBMS_DB_TABLES_PROPERTY);
AtlasEntity table1 = getDefaultTableEntity("newTable-1");
AtlasEntity table2 = getDefaultTableEntity("newTable-2");
entityWithExtInfo.addReferredEntity(table1);
entityWithExtInfo.addReferredEntity(table2);
list.add(new AtlasObjectId(table1.getGuid(), table1.getTypeName()));
list.add(new AtlasObjectId(table2.getGuid(), table2.getTypeName()));
}
private AtlasEntity getDefaultTableEntity(String name) throws AtlasBaseException {
AtlasEntityType type = (AtlasEntityType) typeRegistry.getType(TYPE_RDBMS_TABLES);
AtlasEntity ret = type.createDefaultValue();
ret.setAttribute("name", name);
return ret;
}
private void assertGraphStructure(String dbGuid, String storageGuid, String propertyName) throws AtlasBaseException {
AtlasVertex vertex = AtlasGraphUtilsV2.findByGuid(dbGuid);
Iterator<AtlasEdge> edgesOut = vertex.getEdges(AtlasEdgeDirection.OUT).iterator();
Iterator<AtlasEdge> edgesIn = vertex.getEdges(AtlasEdgeDirection.IN).iterator();
String sd = AtlasGraphUtilsV2.getProperty(vertex, propertyName, String.class);
assertNotNull(sd);
assertAttribute(dbGuid, storageGuid, 2, 2);
assertFalse(edgesOut.hasNext());
assertFalse(edgesIn.hasNext());
assertNotNull(vertex);
}
private void assertAttribute(String dbGuid, String storageGuid, int expectedTableCount, int expectedRegionCount) throws AtlasBaseException {
AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = entityStore.getById(dbGuid);
AtlasEntity entity = entityWithExtInfo.getEntity();
Object val = entity.getAttribute(RDBMS_DB_STORAGE_PROPERTY);
assertTrue(val instanceof AtlasObjectId);
assertEquals(((AtlasObjectId) val).getTypeName(), TYPE_RDBMS_STORAGE);
assertEquals(((AtlasObjectId) val).getGuid(), storageGuid);
assertNotNull(entity.getAttribute(RDBMS_DB_TABLES_PROPERTY));
assertEquals(((List) entity.getAttribute(RDBMS_DB_TABLES_PROPERTY)).size(), expectedTableCount);
assertNotNull(entity.getAttribute(RDBMS_DB_REGIONS_PROPERTY));
assertEquals(((Map) entity.getAttribute(RDBMS_DB_REGIONS_PROPERTY)).size(), expectedRegionCount);
}
}
{
"entity": {
"attributes": {
"name": "employee",
"sd": {
"guid": "-99288075821829",
"typeName": "rdbms_storage"
},
"tables": [
{
"guid": "-99288075821830",
"typeName": "rdbms_table"
},
{
"guid": "-99288075821831",
"typeName": "rdbms_table"
}
],
"regions": {
"west": {
"guid": "-99288075821832",
"typeName": "rdbms_table"
},
"east": {
"guid": "-99288075821833",
"typeName": "rdbms_table"
}
}
},
"classifications": [],
"typeName": "rdbms_db"
},
"referredEntities": {
"-99288075821829": {
"guid": "-99288075821829",
"typeName": "rdbms_storage",
"attributes": {
"name": "binary"
}
},
"-99288075821830": {
"guid": "-99288075821830",
"typeName": "rdbms_table",
"attributes": {
"name": "open"
}
},
"-99288075821831": {
"guid": "-99288075821831",
"typeName": "rdbms_table",
"attributes": {
"name": "close"
}
},
"-99288075821832": {
"guid": "-99288075821832",
"typeName": "rdbms_table",
"attributes": {
"name": "west"
}
},
"-99288075821833": {
"guid": "-99288075821833",
"typeName": "rdbms_table",
"attributes": {
"name": "east"
}
}
}
}
{
"enumDefs": [],
"structDefs": [],
"classificationDefs": [],
"entityDefs": [
{
"category": "ENTITY",
"name": "rdbms_table",
"typeVersion": "1.0",
"attributeDefs": [
{
"name": "name",
"typeName": "string",
"isOptional": false,
"cardinality": "SINGLE",
"valuesMinCount": -1,
"valuesMaxCount": -1,
"isUnique": false,
"isIndexable": false
}
],
"superTypes": []
},
{
"category": "ENTITY",
"name": "rdbms_storage",
"typeVersion": "1.0",
"attributeDefs": [
{
"name": "name",
"typeName": "string",
"isOptional": false,
"cardinality": "SINGLE",
"valuesMinCount": -1,
"valuesMaxCount": -1,
"isUnique": false,
"isIndexable": false
}
],
"superTypes": []
},
{
"category": "ENTITY",
"name": "rdbms_db",
"typeVersion": "1.0",
"attributeDefs": [
{
"name": "name",
"typeName": "string",
"isOptional": false,
"cardinality": "SINGLE",
"valuesMinCount": -1,
"valuesMaxCount": -1,
"isUnique": true,
"isIndexable": true
},
{
"name": "sd",
"typeName": "rdbms_storage",
"isOptional": true,
"cardinality": "SINGLE",
"valuesMinCount": -1,
"valuesMaxCount": -1,
"isUnique": false,
"isIndexable": false,
"constraints": [
{
"type": "ownedRef"
}
],
"options": {
"isSoftReference": "true"
}
},
{
"name": "tables",
"typeName": "array<rdbms_table>",
"isOptional": true,
"cardinality": "SINGLE",
"valuesMinCount": -1,
"valuesMaxCount": -1,
"isUnique": false,
"isIndexable": false,
"constraints": [
{
"type": "ownedRef"
}
],
"options": {
"isSoftReference": "true"
}
},
{
"name": "regions",
"typeName": "map<string,rdbms_table>",
"isOptional": true,
"cardinality": "SINGLE",
"valuesMinCount": -1,
"valuesMaxCount": -1,
"isUnique": false,
"isIndexable": false,
"constraints": [
{
"type": "ownedRef"
}
],
"options": {
"isSoftReference": "true"
}
}
],
"superTypes": []
}
]
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment