Commit fc2a926c by Madhan Neethiraj

ATLAS-3054: improved batch processing in notificaiton handler to avoid…

ATLAS-3054: improved batch processing in notificaiton handler to avoid processing of an entity multiple times - #2
parent 50ff27c7
......@@ -508,7 +508,7 @@
"serviceType": "hive",
"typeVersion": "1.2",
"relationshipCategory": "COMPOSITION",
"relationshipLabel": "__hive_table.partitionkeys",
"relationshipLabel": "__hive_table.partitionKeys",
"endDef1": {
"type": "hive_table",
"name": "partitionKeys",
......
......@@ -20,6 +20,7 @@ package org.apache.atlas.type;
import org.apache.atlas.model.TypeCategory;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.instance.AtlasRelatedObjectId;
import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang.StringUtils;
......@@ -672,19 +673,25 @@ public class AtlasBuiltInTypes {
@Override
public AtlasObjectId getNormalizedValue(Object obj) {
AtlasObjectId ret = null;
if (obj != null) {
if (obj instanceof AtlasObjectId) {
return (AtlasObjectId) obj;
ret = (AtlasObjectId) obj;
} else if (obj instanceof Map) {
Map map = (Map) obj;
if (isValidMap(map)) {
return new AtlasObjectId(map);
if (map.containsKey(AtlasRelatedObjectId.KEY_RELATIONSHIP_TYPE)) {
ret = new AtlasRelatedObjectId(map);
} else {
ret = new AtlasObjectId(map);
}
}
}
}
return null;
return ret;
}
private boolean isValidMap(Map map) {
......
......@@ -799,7 +799,7 @@ public class AtlasEntityType extends AtlasStructType {
AtlasEntity entityObj = (AtlasEntity) obj;
for (String attributeName : relationshipAttributes.keySet()) {
Object value = entityObj.getAttribute(attributeName);
Object value = entityObj.getRelationshipAttribute(attributeName);
String relationshipType = AtlasEntityUtil.getRelationshipType(value);
AtlasAttribute attribute = getRelationshipAttribute(attributeName, relationshipType);
......@@ -824,7 +824,7 @@ public class AtlasEntityType extends AtlasStructType {
}
}
} else if (obj instanceof Map) {
Map attributes = AtlasTypeUtil.toStructAttributes((Map) obj);
Map attributes = AtlasTypeUtil.toRelationshipAttributes((Map) obj);
for (String attributeName : relationshipAttributes.keySet()) {
Object value = attributes.get(attributeName);
......
......@@ -159,7 +159,9 @@ public class AtlasRelationshipType extends AtlasStructType {
AtlasRelationshipEdgeDirection end2Direction = IN;
if (endDef1.getIsLegacyAttribute() && endDef2.getIsLegacyAttribute()) {
if (relationshipDef.getRelationshipLabel() == null) { // only if label hasn't been overridden
end2Direction = OUT;
}
} else if (!endDef1.getIsLegacyAttribute() && endDef2.getIsLegacyAttribute()) {
end1Direction = IN;
end2Direction = OUT;
......@@ -345,11 +347,12 @@ public class AtlasRelationshipType extends AtlasStructType {
}
attribute = new AtlasAttribute(entityType, attributeDef,
typeRegistry.getType(attrTypeName), relationshipLabel);
typeRegistry.getType(attrTypeName), getTypeName(), relationshipLabel);
} else {
// attribute already exists (legacy attribute which is also a relationship attribute)
// add relationshipLabel information to existing attribute
attribute.setRelationshipName(getTypeName());
attribute.setRelationshipEdgeLabel(relationshipLabel);
}
......
......@@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory;
import java.util.*;
import static org.apache.atlas.model.TypeCategory.*;
import static org.apache.atlas.model.typedef.AtlasStructDef.AtlasConstraintDef.CONSTRAINT_PARAM_ATTRIBUTE;
import static org.apache.atlas.model.typedef.AtlasStructDef.AtlasConstraintDef.CONSTRAINT_TYPE_INVERSE_REF;
import static org.apache.atlas.model.typedef.AtlasStructDef.AtlasConstraintDef.CONSTRAINT_TYPE_OWNED_REF;
......@@ -701,19 +702,23 @@ public class AtlasStructType extends AtlasType {
private final String vertexPropertyName;
private final String vertexUniquePropertyName;
private final boolean isOwnedRef;
private final boolean isObjectRef;
private final String inverseRefAttributeName;
private AtlasAttribute inverseRefAttribute;
private String relationshipName;
private String relationshipEdgeLabel;
private AtlasRelationshipEdgeDirection relationshipEdgeDirection;
public AtlasAttribute(AtlasStructType definedInType, AtlasAttributeDef attrDef, AtlasType attributeType, String relationshipLabel) {
public AtlasAttribute(AtlasStructType definedInType, AtlasAttributeDef attrDef, AtlasType attributeType, String relationshipName, String relationshipLabel) {
this.definedInType = definedInType;
this.attributeDef = attrDef;
this.attributeType = attributeType.getTypeForAttribute();
this.qualifiedName = getQualifiedAttributeName(definedInType.getStructDef(), attributeDef.getName());
this.vertexPropertyName = encodePropertyKey(this.qualifiedName);
this.vertexUniquePropertyName = attrDef.getIsUnique() ? encodePropertyKey(getQualifiedAttributeName(definedInType.getStructDef(), UNIQUE_ATTRIBUTE_SHADE_PROPERTY_PREFIX + attributeDef.getName())) : null;
this.relationshipName = relationshipName;
this.relationshipEdgeLabel = getRelationshipEdgeLabel(relationshipLabel);
boolean isOwnedRef = false;
String inverseRefAttribute = null;
......@@ -736,10 +741,32 @@ public class AtlasStructType extends AtlasType {
this.isOwnedRef = isOwnedRef;
this.inverseRefAttributeName = inverseRefAttribute;
this.relationshipEdgeDirection = AtlasRelationshipEdgeDirection.OUT;
switch (attributeType.getTypeCategory()) {
case OBJECT_ID_TYPE:
isObjectRef = true;
break;
case MAP:
AtlasMapType mapType = (AtlasMapType) attributeType;
isObjectRef = mapType.getValueType().getTypeCategory() == OBJECT_ID_TYPE;
break;
case ARRAY:
AtlasArrayType arrayType = (AtlasArrayType) attributeType;
isObjectRef = arrayType.getElementType().getTypeCategory() == OBJECT_ID_TYPE;
break;
default:
isObjectRef = false;
break;
}
}
public AtlasAttribute(AtlasStructType definedInType, AtlasAttributeDef attrDef, AtlasType attributeType) {
this(definedInType, attrDef, attributeType, null);
this(definedInType, attrDef, attributeType, null, null);
}
public AtlasStructType getDefinedInType() { return definedInType; }
......@@ -766,12 +793,18 @@ public class AtlasStructType extends AtlasType {
public boolean isOwnedRef() { return isOwnedRef; }
public boolean isObjectRef() { return isObjectRef; }
public String getInverseRefAttributeName() { return inverseRefAttributeName; }
public AtlasAttribute getInverseRefAttribute() { return inverseRefAttribute; }
public void setInverseRefAttribute(AtlasAttribute inverseAttr) { inverseRefAttribute = inverseAttr; }
public String getRelationshipName() { return relationshipName; }
public void setRelationshipName(String relationshipName) { this.relationshipName = relationshipName; }
public String getRelationshipEdgeLabel() { return relationshipEdgeLabel; }
public void setRelationshipEdgeLabel(String relationshipEdgeLabel) { this.relationshipEdgeLabel = relationshipEdgeLabel; }
......
......@@ -27,6 +27,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
......@@ -126,11 +127,39 @@ public class AtlasEntityUtil {
if (val instanceof AtlasRelatedObjectId) {
ret = ((AtlasRelatedObjectId) val).getRelationshipType();
} else if (val instanceof Collection) {
String elemRelationshipType = null;
for (Object elem : (Collection) val) {
elemRelationshipType = getRelationshipType(elem);
if (elemRelationshipType != null) {
break;
}
}
ret = elemRelationshipType;
} else if (val instanceof Map) {
Map mapValue = (Map) val;
if (mapValue.containsKey(AtlasRelatedObjectId.KEY_RELATIONSHIP_TYPE)) {
Object relTypeName = ((Map) val).get(AtlasRelatedObjectId.KEY_RELATIONSHIP_TYPE);
ret = relTypeName != null ? relTypeName.toString() : null;
} else {
String entryRelationshipType = null;
for (Object entryVal : mapValue.values()) {
entryRelationshipType = getRelationshipType(entryVal);
if (entryRelationshipType != null) {
break;
}
}
ret = entryRelationshipType;
}
} else {
ret = null;
}
......
......@@ -537,7 +537,7 @@ public class EntityDiscoveryService implements AtlasDiscoveryService {
AtlasAttribute attribute = entityType.getAttribute(relation);
if (attribute != null) {
if (isRelationshipAttribute(attribute)) {
if (attribute.isObjectRef()) {
relation = attribute.getRelationshipEdgeLabel();
} else {
throw new AtlasBaseException(AtlasErrorCode.INVALID_RELATIONSHIP_ATTRIBUTE, relation, attribute.getTypeName());
......@@ -790,23 +790,6 @@ public class EntityDiscoveryService implements AtlasDiscoveryService {
return "";
}
private boolean isRelationshipAttribute(AtlasAttribute attribute) throws AtlasBaseException {
boolean ret = true;
AtlasType attrType = attribute.getAttributeType();
if (attrType.getTypeCategory() == ARRAY) {
attrType = ((AtlasArrayType) attrType).getElementType();
} else if (attrType.getTypeCategory() == MAP) {
attrType = ((AtlasMapType) attrType).getValueType();
}
if (attrType.getTypeCategory() != OBJECT_ID_TYPE) {
ret = false;
}
return ret;
}
private Set<String> getEntityStates() {
return new HashSet<>(Arrays.asList(ACTIVE.toString(), DELETED.toString()));
}
......
......@@ -23,6 +23,7 @@ import org.apache.atlas.model.TypeCategory;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.instance.AtlasStruct;
import org.apache.atlas.utils.AtlasEntityUtil;
import org.apache.atlas.v1.model.instance.Struct;
import org.apache.atlas.type.*;
import org.apache.atlas.type.AtlasBuiltInTypes.AtlasObjectIdType;
......@@ -134,11 +135,12 @@ public class AtlasStructFormatConverter extends AtlasAbstractFormatConverter {
// Only process the requested/set attributes
for (String attrName : attributes.keySet()) {
Object v2Value = attributes.get(attrName);
AtlasAttribute attr = structType.getAttribute(attrName);
if (attr == null) {
if (isEntityType) {
attr = ((AtlasEntityType) structType).getRelationshipAttribute(attrName, null);
attr = ((AtlasEntityType) structType).getRelationshipAttribute(attrName, AtlasEntityUtil.getRelationshipType(v2Value));
}
if (attr == null) {
......@@ -149,7 +151,6 @@ public class AtlasStructFormatConverter extends AtlasAbstractFormatConverter {
AtlasType attrType = attr.getAttributeType();
AtlasFormatConverter attrConverter = converterRegistry.getConverter(attrType.getTypeCategory());
Object v2Value = attributes.get(attr.getName());
if (v2Value != null && isEntityType && attr.isOwnedRef()) {
if (LOG.isDebugEnabled()) {
......@@ -256,6 +257,7 @@ public class AtlasStructFormatConverter extends AtlasAbstractFormatConverter {
// Only process the requested/set attributes
for (Object attribKey : attributes.keySet()) {
String attrName = attribKey.toString();
Object v1Value = attributes.get(attrName);
AtlasAttribute attr = structType.getAttribute(attrName);
if (attr == null) {
......@@ -271,7 +273,6 @@ public class AtlasStructFormatConverter extends AtlasAbstractFormatConverter {
AtlasType attrType = attr.getAttributeType();
AtlasFormatConverter attrConverter = converterRegistry.getConverter(attrType.getTypeCategory());
Object v1Value = attributes.get(attrName);
if (attrConverter.isValidValueV1(v1Value, attrType)) {
Object v2Value = attrConverter.fromV1ToV2(v1Value, attrType, context);
......
......@@ -323,7 +323,7 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {
AtlasAttribute attr = entityType.getAttribute(attrName);
if (attr == null) {
attr = entityType.getRelationshipAttribute(attrName, null);
attr = entityType.getRelationshipAttribute(attrName, AtlasEntityUtil.getRelationshipType(attrValue));
if (attr == null) {
throw new AtlasBaseException(AtlasErrorCode.UNKNOWN_ATTRIBUTE, attrName, entity.getTypeName());
......
......@@ -79,8 +79,43 @@ import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.DE
import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.PARTIAL_UPDATE;
import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.UPDATE;
import static org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef.Cardinality.SET;
import static org.apache.atlas.repository.Constants.*;
import static org.apache.atlas.repository.graph.GraphHelper.*;
import static org.apache.atlas.repository.Constants.ATTRIBUTE_KEY_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.CLASSIFICATION_ENTITY_GUID;
import static org.apache.atlas.repository.Constants.CLASSIFICATION_ENTITY_STATUS;
import static org.apache.atlas.repository.Constants.CLASSIFICATION_LABEL;
import static org.apache.atlas.repository.Constants.CLASSIFICATION_VALIDITY_PERIODS_KEY;
import static org.apache.atlas.repository.Constants.CLASSIFICATION_VERTEX_PROPAGATE_KEY;
import static org.apache.atlas.repository.Constants.CLASSIFICATION_VERTEX_REMOVE_PROPAGATIONS_KEY;
import static org.apache.atlas.repository.Constants.CREATED_BY_KEY;
import static org.apache.atlas.repository.Constants.ENTITY_TYPE_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.GUID_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.HOME_ID_KEY;
import static org.apache.atlas.repository.Constants.IS_PROXY_KEY;
import static org.apache.atlas.repository.Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.MODIFIED_BY_KEY;
import static org.apache.atlas.repository.Constants.PROVENANCE_TYPE_KEY;
import static org.apache.atlas.repository.Constants.RELATIONSHIP_GUID_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.STATE_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.SUPER_TYPES_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.TIMESTAMP_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.TRAIT_NAMES_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.ATTRIBUTE_INDEX_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.VERSION_PROPERTY_KEY;
import static org.apache.atlas.repository.graph.GraphHelper.getCollectionElementsUsingRelationship;
import static org.apache.atlas.repository.graph.GraphHelper.getClassificationEdge;
import static org.apache.atlas.repository.graph.GraphHelper.getClassificationVertex;
import static org.apache.atlas.repository.graph.GraphHelper.getDefaultRemovePropagations;
import static org.apache.atlas.repository.graph.GraphHelper.getMapElementsProperty;
import static org.apache.atlas.repository.graph.GraphHelper.getStatus;
import static org.apache.atlas.repository.graph.GraphHelper.getTraitLabel;
import static org.apache.atlas.repository.graph.GraphHelper.getTraitNames;
import static org.apache.atlas.repository.graph.GraphHelper.getTypeName;
import static org.apache.atlas.repository.graph.GraphHelper.getTypeNames;
import static org.apache.atlas.repository.graph.GraphHelper.isActive;
import static org.apache.atlas.repository.graph.GraphHelper.isPropagationEnabled;
import static org.apache.atlas.repository.graph.GraphHelper.isRelationshipEdge;
import static org.apache.atlas.repository.graph.GraphHelper.string;
import static org.apache.atlas.repository.graph.GraphHelper.updateModificationMetadata;
import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.getIdFromVertex;
import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.isReference;
import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.IN;
......@@ -827,27 +862,29 @@ public class EntityGraphMapper {
throw new AtlasBaseException(AtlasErrorCode.INVALID_OBJECT_ID, (ctx.getValue() == null ? null : ctx.getValue().toString()));
}
String attributeName = ctx.getAttribute().getName();
AtlasType type = typeRegistry.getType(AtlasGraphUtilsV2.getTypeName(entityVertex));
AtlasRelationshipEdgeDirection edgeDirection = ctx.getAttribute().getRelationshipEdgeDirection();
if (type instanceof AtlasEntityType) {
AtlasEntityType entityType = (AtlasEntityType) type;
AtlasAttribute attribute = ctx.getAttribute();
String attributeName = attribute.getName();
// use relationship to create/update edges
if (entityType.hasRelationshipAttribute(attributeName)) {
Map<String, Object> relationshipAttributes = getRelationshipAttributes(ctx.getValue());
if (ctx.getCurrentEdge() != null) {
ret = updateRelationship(ctx.getCurrentEdge(), entityVertex, attributeVertex, edgeDirection, relationshipAttributes);
ret = updateRelationship(ctx.getCurrentEdge(), entityVertex, attributeVertex, attribute.getRelationshipEdgeDirection(), relationshipAttributes);
} else {
String relationshipName = graphHelper.getRelationshipTypeName(entityVertex, entityType, attributeName);
String relationshipName = attribute.getRelationshipName();
AtlasVertex fromVertex;
AtlasVertex toVertex;
if (edgeDirection == IN) {
if (StringUtils.isEmpty(relationshipName)) {
relationshipName = graphHelper.getRelationshipTypeName(entityVertex, entityType, attributeName);
}
if (attribute.getRelationshipEdgeDirection() == IN) {
fromVertex = attributeVertex;
toVertex = entityVertex;
......@@ -1106,21 +1143,29 @@ public class EntityGraphMapper {
}
private static AtlasObjectId getObjectId(Object val) throws AtlasBaseException {
AtlasObjectId ret = null;
if (val != null) {
if ( val instanceof AtlasObjectId) {
return ((AtlasObjectId) val);
ret = ((AtlasObjectId) val);
} else if (val instanceof Map) {
AtlasObjectId ret = new AtlasObjectId((Map)val);
Map map = (Map) val;
if (AtlasTypeUtil.isValid(ret)) {
return ret;
}
if (map.containsKey(AtlasRelatedObjectId.KEY_RELATIONSHIP_TYPE)) {
ret = new AtlasRelatedObjectId(map);
} else {
ret = new AtlasObjectId((Map) val);
}
if (!AtlasTypeUtil.isValid(ret)) {
throw new AtlasBaseException(AtlasErrorCode.INVALID_OBJECT_ID, val.toString());
}
} else {
throw new AtlasBaseException(AtlasErrorCode.INVALID_OBJECT_ID, val.toString());
}
}
return null;
return ret;
}
private static String getGuid(Object val) throws AtlasBaseException {
......
......@@ -36,7 +36,7 @@ public class HivePreprocessor {
private static final Logger LOG = LoggerFactory.getLogger(HivePreprocessor.class);
private static final String RELATIONSHIP_TYPE_HIVE_TABLE_COLUMNS = "hive_table_columns";
private static final String RELATIONSHIP_TYPE_HIVE_TABLE_PARTITION_KEYS = "hive_table_partitionKeys";
private static final String RELATIONSHIP_TYPE_HIVE_TABLE_PARTITION_KEYS = "hive_table_partitionkeys";
static class HiveTablePreprocessor extends EntityPreprocessor {
public HiveTablePreprocessor() {
......@@ -76,7 +76,7 @@ public class HivePreprocessor {
}
private void removeColumnsAttributeAndRegisterToMove(AtlasEntity entity, String attrName, String relationshipType, PreprocessorContext context) {
Object attrVal = entity.getAttribute(attrName);
Object attrVal = entity.removeAttribute(attrName);
if (attrVal != null) {
Set<String> guids = new HashSet<>();
......
......@@ -91,6 +91,10 @@ public class PreprocessorContext {
public boolean getRdbmsTypesRemoveOwnedRefAttrs() { return rdbmsTypesRemoveOwnedRefAttrs; }
public boolean isHivePreprocessEnabled() {
return !hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty() || hiveTypesRemoveOwnedRefAttrs;
}
public List<AtlasEntity> getEntities() {
return entitiesWithExtInfo != null ? entitiesWithExtInfo.getEntities() : null;
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment