Commit c7540b38 by Madhan Neethiraj

ATLAS-1557: IDBased resolver should attempt unique-attribute match as well

parent 2a93a6aa
......@@ -90,6 +90,7 @@ public class AtlasClassificationType extends AtlasStructType {
this.superTypes = Collections.unmodifiableList(s);
this.allSuperTypes = Collections.unmodifiableSet(allS);
this.allAttributes = Collections.unmodifiableMap(allA);
this.uniqAttributes = getUniqueAttributes(this.allAttributes);
this.allSubTypes = new HashSet<>(); // this will be populated in resolveReferencesPhase2()
}
......
......@@ -90,6 +90,7 @@ public class AtlasEntityType extends AtlasStructType {
this.superTypes = Collections.unmodifiableList(s);
this.allSuperTypes = Collections.unmodifiableSet(allS);
this.allAttributes = Collections.unmodifiableMap(allA);
this.uniqAttributes = getUniqueAttributes(this.allAttributes);
this.allSubTypes = new HashSet<>(); // this will be populated in resolveReferencesPhase2()
}
......
......@@ -29,6 +29,7 @@ import org.apache.atlas.model.typedef.AtlasStructDef.AtlasConstraintDef;
import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef.Cardinality;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -47,6 +48,7 @@ public class AtlasStructType extends AtlasType {
private final AtlasStructDef structDef;
protected Map<String, AtlasAttribute> allAttributes = Collections.emptyMap();
protected Map<String, AtlasAttribute> uniqAttributes = Collections.emptyMap();
public AtlasStructType(AtlasStructDef structDef) {
super(structDef);
......@@ -104,6 +106,7 @@ public class AtlasStructType extends AtlasType {
resolveConstraints(typeRegistry);
this.allAttributes = Collections.unmodifiableMap(a);
this.uniqAttributes = getUniqueAttributes(this.allAttributes);
}
private void resolveConstraints(AtlasTypeRegistry typeRegistry) throws AtlasBaseException {
......@@ -176,6 +179,10 @@ public class AtlasStructType extends AtlasType {
return allAttributes;
}
public Map<String, AtlasAttribute> getUniqAttributes() {
return uniqAttributes;
}
public AtlasAttribute getAttribute(String attributeName) {
return allAttributes.get(attributeName);
}
......@@ -403,6 +410,20 @@ public class AtlasStructType extends AtlasType {
return type instanceof AtlasEntityType ? (AtlasEntityType)type : null;
}
protected Map<String, AtlasAttribute> getUniqueAttributes(Map<String, AtlasAttribute> attributes) {
Map<String, AtlasAttribute> ret = new HashMap<>();
if (MapUtils.isNotEmpty(attributes)) {
for (AtlasAttribute attribute : attributes.values()) {
if (attribute.getAttributeDef().getIsUnique()) {
ret.put(attribute.getName(), attribute);
}
}
}
return Collections.unmodifiableMap(ret);
}
public static class AtlasAttribute {
private final AtlasStructType definedInType;
private final AtlasType attributeType;
......@@ -466,12 +487,7 @@ public class AtlasStructType extends AtlasType {
public String getInverseRefAttribute() { return inverseRefAttribute; }
private static String getQualifiedAttributeName(AtlasStructDef structDef, String attrName) {
final String typeName = structDef.getName();
return attrName.contains(".") ? attrName : String.format("%s.%s", typeName, attrName);
}
private static String encodePropertyKey(String key) {
public static String encodePropertyKey(String key) {
if (StringUtils.isBlank(key)) {
return key;
}
......@@ -483,7 +499,7 @@ public class AtlasStructType extends AtlasType {
return key;
}
private static String decodePropertyKey(String key) {
public static String decodePropertyKey(String key) {
if (StringUtils.isBlank(key)) {
return key;
}
......@@ -495,6 +511,11 @@ public class AtlasStructType extends AtlasType {
return key;
}
private static String getQualifiedAttributeName(AtlasStructDef structDef, String attrName) {
final String typeName = structDef.getName();
return attrName.contains(".") ? attrName : String.format("%s.%s", typeName, attrName);
}
private static String[][] RESERVED_CHAR_ENCODE_MAP = new String[][] {
new String[] { "{", "_o" },
new String[] { "}", "_c" },
......
......@@ -114,7 +114,7 @@ public class AtlasEntityGraphDiscoveryV1 implements EntityGraphDiscovery {
}
protected void resolveReferences() throws AtlasBaseException {
EntityResolver[] entityResolvers = new EntityResolver[] { new IDBasedEntityResolver(),
EntityResolver[] entityResolvers = new EntityResolver[] { new IDBasedEntityResolver(typeRegistry),
new UniqAttrBasedEntityResolver(typeRegistry)
};
......
......@@ -35,6 +35,7 @@ import org.apache.atlas.repository.store.graph.EntityGraphDiscovery;
import org.apache.atlas.repository.store.graph.EntityGraphDiscoveryContext;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -208,7 +209,13 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
if (entity != null) {
AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName());
context.addUpdated(entity, entityType, vertex);
String guidVertex = AtlasGraphUtilsV1.getIdFromVertex(vertex);
if (!StringUtils.equals(guidVertex, guid)) { // if entity was found by unique attribute
entity.setGuid(guidVertex);
}
context.addUpdated(guid, entity, entityType, vertex);
RequestContextV1.get().recordEntityUpdate(entity.getAtlasObjectId());
}
......
......@@ -17,8 +17,6 @@
*/
package org.apache.atlas.repository.store.graph.v1;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.exception.AtlasBaseException;
......@@ -34,8 +32,9 @@ import org.apache.atlas.repository.graphdb.AtlasGraphQuery;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasStructType;
import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
import org.apache.atlas.type.AtlasType;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.collections.MapUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -51,15 +50,6 @@ public class AtlasGraphUtilsV1 {
public static final String SUPERTYPE_EDGE_LABEL = PROPERTY_PREFIX + ".supertype";
public static final String VERTEX_TYPE = "typeSystem";
public static final BiMap<String, String> RESERVED_CHARS_ENCODE_MAP =
HashBiMap.create(new HashMap<String, String>() {{
put("{", "_o");
put("}", "_c");
put("\"", "_q");
put("$", "_d");
put("%", "_p");
}});
public static String getTypeDefPropertyKey(AtlasBaseTypeDef typeDef) {
return getTypeDefPropertyKey(typeDef.getName());
......@@ -120,25 +110,13 @@ public class AtlasGraphUtilsV1 {
}
public static String encodePropertyKey(String key) {
String ret = key;
if (StringUtils.isNotBlank(key)) {
for (String str : RESERVED_CHARS_ENCODE_MAP.keySet()) {
ret = ret.replace(str, RESERVED_CHARS_ENCODE_MAP.get(str));
}
}
String ret = AtlasStructType.AtlasAttribute.encodePropertyKey(key);
return ret;
}
public static String decodePropertyKey(String key) {
String ret = key;
if (StringUtils.isNotBlank(key)) {
for (String encodedStr : RESERVED_CHARS_ENCODE_MAP.values()) {
ret = ret.replace(encodedStr, RESERVED_CHARS_ENCODE_MAP.inverse().get(encodedStr));
}
}
String ret = AtlasStructType.AtlasAttribute.decodePropertyKey(key);
return ret;
}
......@@ -201,27 +179,81 @@ public class AtlasGraphUtilsV1 {
return returnType.cast(property);
}
public static AtlasVertex getVertexByUniqueAttributes(AtlasEntityType entityType, Map<String, Object> uniqAttributes) throws AtlasBaseException {
AtlasGraphQuery query = AtlasGraphProvider.getGraphInstance().query();
public static AtlasVertex getVertexByUniqueAttributes(AtlasEntityType entityType, Map<String, Object> attrValues) throws AtlasBaseException {
AtlasVertex vertex = findByUniqueAttributes(entityType, attrValues);
if (vertex == null) {
throw new AtlasBaseException(AtlasErrorCode.INSTANCE_BY_UNIQUE_ATTRIBUTE_NOT_FOUND, entityType.getTypeName(),
attrValues.toString());
}
return vertex;
}
public static AtlasVertex findByUniqueAttributes(AtlasEntityType entityType, Map<String, Object> attrValues) {
AtlasVertex vertex = null;
final Map<String, AtlasAttribute> uniqueAttributes = entityType.getUniqAttributes();
if (MapUtils.isNotEmpty(uniqueAttributes) && MapUtils.isNotEmpty(attrValues)) {
for (AtlasAttribute attribute : uniqueAttributes.values()) {
Object attrValue = attrValues.get(attribute.getName());
if (attrValue == null) {
continue;
}
vertex = AtlasGraphUtilsV1.findByTypeAndPropertyName(entityType.getTypeName(), attribute.getVertexPropertyName(), attrValue);
if (vertex == null) {
vertex = AtlasGraphUtilsV1.findBySuperTypeAndPropertyName(entityType.getTypeName(), attribute.getVertexPropertyName(), attrValue);
}
if (vertex != null) {
break;
}
}
}
return vertex;
}
public static AtlasVertex findByGuid(String guid) {
AtlasGraphQuery query = AtlasGraphProvider.getGraphInstance().query()
.has(Constants.GUID_PROPERTY_KEY, guid)
.has(Constants.STATE_PROPERTY_KEY, AtlasEntity.Status.ACTIVE.name());
Iterator<AtlasVertex> results = query.vertices().iterator();
for (Map.Entry<String, Object> e : uniqAttributes.entrySet()) {
String attrName = e.getKey();
Object attrValue = e.getValue();
AtlasVertex vertex = results.hasNext() ? results.next() : null;
query = query.has(entityType.getQualifiedAttributeName(attrName), attrValue);
return vertex;
}
Iterator<AtlasVertex> result = query.has(Constants.ENTITY_TYPE_PROPERTY_KEY, entityType.getTypeName())
public static AtlasVertex findByTypeAndPropertyName(String typeName, String propertyName, Object attrVal) {
AtlasGraphQuery query = AtlasGraphProvider.getGraphInstance().query()
.has(Constants.ENTITY_TYPE_PROPERTY_KEY, typeName)
.has(Constants.STATE_PROPERTY_KEY, AtlasEntity.Status.ACTIVE.name())
.vertices().iterator();
AtlasVertex entityVertex = result.hasNext() ? result.next() : null;
.has(propertyName, attrVal);
if (entityVertex == null) {
throw new AtlasBaseException(AtlasErrorCode.INSTANCE_BY_UNIQUE_ATTRIBUTE_NOT_FOUND, entityType.getTypeName(),
uniqAttributes.toString());
Iterator<AtlasVertex> results = query.vertices().iterator();
AtlasVertex vertex = results.hasNext() ? results.next() : null;
return vertex;
}
return entityVertex;
public static AtlasVertex findBySuperTypeAndPropertyName(String typeName, String propertyName, Object attrVal) {
AtlasGraphQuery query = AtlasGraphProvider.getGraphInstance().query()
.has(Constants.SUPER_TYPES_PROPERTY_KEY, typeName)
.has(Constants.STATE_PROPERTY_KEY, AtlasEntity.Status.ACTIVE.name())
.has(propertyName, attrVal);
Iterator<AtlasVertex> results = query.vertices().iterator();
AtlasVertex vertex = results.hasNext() ? results.next() : null;
return vertex;
}
private static String toString(AtlasElement element) {
......
......@@ -286,18 +286,30 @@ public class EntityGraphMapper {
AtlasEdge ret = null;
if (ctx.getCurrentEdge() != null) {
updateVertex((AtlasStruct) ctx.getValue(), ctx.getCurrentEdge().getInVertex(), context);
AtlasStruct structVal = null;
if (ctx.getValue() instanceof AtlasStruct) {
structVal = (AtlasStruct)ctx.getValue();
} else if (ctx.getValue() instanceof Map) {
structVal = new AtlasStruct(ctx.getAttrType().getTypeName(), (Map) AtlasTypeUtil.toStructAttributes((Map)ctx.getValue()));
}
if (structVal != null) {
updateVertex(structVal, ctx.getCurrentEdge().getInVertex(), context);
}
ret = ctx.getCurrentEdge();
} else if (ctx.getValue() != null) {
String edgeLabel = AtlasGraphUtilsV1.getEdgeLabel(ctx.getVertexProperty());
AtlasStruct structVal = null;
if (ctx.getValue() instanceof AtlasStruct) {
ret = createVertex((AtlasStruct) ctx.getValue(), ctx.getReferringVertex(), edgeLabel, context);
structVal = (AtlasStruct) ctx.getValue();
} else if (ctx.getValue() instanceof Map) {
AtlasStruct stuct = new AtlasStruct(ctx.getAttrType().getTypeName(), (Map) AtlasTypeUtil.toStructAttributes((Map)ctx.getValue()));
structVal = new AtlasStruct(ctx.getAttrType().getTypeName(), (Map) AtlasTypeUtil.toStructAttributes((Map)ctx.getValue()));
}
ret = createVertex(stuct, ctx.getReferringVertex(), edgeLabel, context);
if (structVal != null) {
ret = createVertex(structVal, ctx.getReferringVertex(), edgeLabel, context);
}
}
......
......@@ -51,10 +51,15 @@ public class EntityMutationContext {
}
}
public void addUpdated(AtlasEntity entity, AtlasEntityType type, AtlasVertex atlasVertex) {
public void addUpdated(String internalGuid, AtlasEntity entity, AtlasEntityType type, AtlasVertex atlasVertex) {
entitiesUpdated.add(entity);
entityVsType.put(entity.getGuid(), type);
entityVsVertex.put(entity.getGuid(), atlasVertex);
if (!StringUtils.equals(internalGuid, entity.getGuid())) {
guidAssignments.put(internalGuid, entity.getGuid());
entityVsVertex.put(internalGuid, atlasVertex);
}
}
public EntityGraphDiscoveryContext getDiscoveryContext() {
......
......@@ -19,14 +19,14 @@ package org.apache.atlas.repository.store.graph.v1;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.TypeCategory;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.graph.GraphHelper;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.store.graph.EntityGraphDiscoveryContext;
import org.apache.atlas.repository.store.graph.EntityResolver;
import org.apache.atlas.typesystem.exception.EntityNotFoundException;
import org.apache.atlas.typesystem.persistence.Id;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -34,9 +34,12 @@ import org.slf4j.LoggerFactory;
public class IDBasedEntityResolver implements EntityResolver {
private static final Logger LOG = LoggerFactory.getLogger(IDBasedEntityResolver.class);
private final GraphHelper graphHelper = GraphHelper.getInstance();
private final AtlasTypeRegistry typeRegistry;
public IDBasedEntityResolver(AtlasTypeRegistry typeRegistry) {
this.typeRegistry = typeRegistry;
}
public EntityGraphDiscoveryContext resolveEntityReferences(EntityGraphDiscoveryContext context) throws AtlasBaseException {
if (context == null) {
......@@ -46,34 +49,36 @@ public class IDBasedEntityResolver implements EntityResolver {
EntityStream entityStream = context.getEntityStream();
for (String guid : context.getReferencedGuids()) {
if (AtlasEntity.isAssigned(guid)) { // validate in graph repo that given guid exists
AtlasVertex vertex = resolveGuid(guid);
boolean isAssignedGuid = AtlasEntity.isAssigned(guid);
AtlasVertex vertex = isAssignedGuid ? AtlasGraphUtilsV1.findByGuid(guid) : null;
context.addResolvedGuid(guid, vertex);
} else if (entityStream.getByGuid(guid) != null) { //check if entity stream have this reference id
context.addLocalGuidReference(guid);
} else {
throw new AtlasBaseException(AtlasErrorCode.REFERENCED_ENTITY_NOT_FOUND, guid);
}
}
if (vertex == null) { // if not found in the store, look if the entity is present in the stream
AtlasEntity entity = entityStream.getByGuid(guid);
return context;
if (entity != null) { // look for the entity in the store using unique-attributes
AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName());
if (entityType == null) {
throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_INVALID, TypeCategory.ENTITY.name(), entity.getTypeName());
}
private AtlasVertex resolveGuid(String guid) throws AtlasBaseException {
//validate in graph repo that given guid, typename exists
AtlasVertex vertex = null;
try {
vertex = graphHelper.findVertex(Constants.GUID_PROPERTY_KEY, guid,
Constants.STATE_PROPERTY_KEY, Id.EntityState.ACTIVE.name());
} catch (EntityNotFoundException e) {
//Ignore
vertex = AtlasGraphUtilsV1.findByUniqueAttributes(entityType, entity.getAttributes());
} else if (!isAssignedGuid) { // for local-guids, entity must be in the stream
throw new AtlasBaseException(AtlasErrorCode.REFERENCED_ENTITY_NOT_FOUND, guid);
}
}
if (vertex != null) {
return vertex;
context.addResolvedGuid(guid, vertex);
} else {
if (isAssignedGuid) {
throw new AtlasBaseException(AtlasErrorCode.REFERENCED_ENTITY_NOT_FOUND, guid);
} else {
context.addLocalGuidReference(guid);
}
}
}
return context;
}
}
......@@ -43,7 +43,8 @@ import java.util.Map;
public class UniqAttrBasedEntityResolver implements EntityResolver {
private static final Logger LOG = LoggerFactory.getLogger(UniqAttrBasedEntityResolver.class);
private final GraphHelper graphHelper = GraphHelper.getInstance();
private final static GraphHelper graphHelper = GraphHelper.getInstance();
private final AtlasTypeRegistry typeRegistry;
public UniqAttrBasedEntityResolver(AtlasTypeRegistry typeRegistry) {
......@@ -61,7 +62,13 @@ public class UniqAttrBasedEntityResolver implements EntityResolver {
for (AtlasObjectId objId : context.getReferencedByUniqAttribs()) {
//query in graph repo that given unique attribute - check for deleted also?
AtlasVertex vertex = resolveByUniqueAttribute(objId);
AtlasEntityType entityType = typeRegistry.getEntityTypeByName(objId.getTypeName());
if (entityType == null) {
throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_INVALID, TypeCategory.ENTITY.name(), objId.getTypeName());
}
AtlasVertex vertex = AtlasGraphUtilsV1.findByUniqueAttributes(entityType, objId.getUniqueAttributes());
if (vertex != null) {
context.addResolvedIdByUniqAttribs(objId, vertex);
......@@ -73,75 +80,5 @@ public class UniqAttrBasedEntityResolver implements EntityResolver {
return context;
}
private AtlasVertex resolveByUniqueAttribute(AtlasObjectId entityId) throws AtlasBaseException {
AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entityId.getTypeName());
if (entityType == null) {
throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_INVALID, TypeCategory.ENTITY.name(), entityId.getTypeName());
}
final Map<String, Object> uniqueAttributes = entityId.getUniqueAttributes();
if (MapUtils.isNotEmpty(uniqueAttributes)) {
for (Map.Entry<String, Object> e : uniqueAttributes.entrySet()) {
String attrName = e.getKey();
Object attrValue = e.getValue();
AtlasAttribute attr = entityType.getAttribute(attrName);
if (attr == null || !attr.getAttributeDef().getIsUnique() || attrValue == null) {
continue;
}
AtlasVertex vertex = findByTypeAndQualifiedName(entityId.getTypeName(), attr.getQualifiedName(), attrValue);
if (vertex == null) {
vertex = findBySuperTypeAndQualifiedName(entityId.getTypeName(), attr.getQualifiedName(), attrValue);
}
if (vertex != null) {
return vertex;
}
}
}
throw new AtlasBaseException(AtlasErrorCode.REFERENCED_ENTITY_NOT_FOUND, entityId.toString());
}
private AtlasVertex findByTypeAndQualifiedName(String typeName, String qualifiedAttrName, Object attrVal) {
AtlasVertex vertex = null;
try {
vertex = graphHelper.findVertex(qualifiedAttrName, attrVal,
Constants.ENTITY_TYPE_PROPERTY_KEY, typeName,
Constants.STATE_PROPERTY_KEY, AtlasEntity.Status.ACTIVE.name());
if (LOG.isDebugEnabled()) {
LOG.debug("Found vertex by unique attribute and type ({}={}), {} ", qualifiedAttrName, attrVal, typeName);
}
} catch (EntityNotFoundException e) {
//Ignore if not found
}
return vertex;
}
private AtlasVertex findBySuperTypeAndQualifiedName(String typeName, String qualifiedAttrName, Object attrVal) {
AtlasVertex vertex = null;
try {
vertex = graphHelper.findVertex(qualifiedAttrName, attrVal,
Constants.SUPER_TYPES_PROPERTY_KEY, typeName,
Constants.STATE_PROPERTY_KEY, AtlasEntity.Status.ACTIVE.name());
if (LOG.isDebugEnabled()) {
LOG.debug("Found vertex by unique attribute and supertype ({}={}), {} ", qualifiedAttrName, attrVal, typeName);
}
} catch (EntityNotFoundException e) {
//Ignore if not found
}
return vertex;
}
}
......@@ -209,6 +209,7 @@ public class AtlasEntityStoreV1Test {
columns.add(col3.getAtlasObjectId());
columns.add(col4.getAtlasObjectId());
tableEntity.setAttribute(TestUtilsV2.COLUMNS_ATTR_NAME, columns);
entitiesInfo.addReferredEntity(col3);
entitiesInfo.addReferredEntity(col4);
......@@ -223,6 +224,7 @@ public class AtlasEntityStoreV1Test {
columns.clear();
columns.add(col4.getAtlasObjectId());
columns.add(col3.getAtlasObjectId());
tableEntity.setAttribute(TestUtilsV2.COLUMNS_ATTR_NAME, columns);
init();
response = entityStore.createOrUpdate(new AtlasEntityStream(entitiesInfo));
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment