Commit 4b2324f3 by Sarath Subramanian

ATLAS-2830: Tag Propagation, Entity Delete : Classification update notification…

ATLAS-2830: Tag Propagation, Entity Delete : Classification update notification not sent to entities down the lineage present after the deleted entity
parent 27e8b2a4
...@@ -26,6 +26,7 @@ import org.apache.atlas.AtlasErrorCode; ...@@ -26,6 +26,7 @@ import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.AtlasException; import org.apache.atlas.AtlasException;
import org.apache.atlas.RequestContext; import org.apache.atlas.RequestContext;
import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.Status; import org.apache.atlas.model.instance.AtlasEntity.Status;
import org.apache.atlas.model.instance.AtlasObjectId; import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.instance.AtlasRelationship; import org.apache.atlas.model.instance.AtlasRelationship;
...@@ -76,6 +77,7 @@ import java.util.Objects; ...@@ -76,6 +77,7 @@ import java.util.Objects;
import java.util.Set; import java.util.Set;
import java.util.UUID; import java.util.UUID;
import static org.apache.atlas.model.instance.AtlasEntity.Status.ACTIVE;
import static org.apache.atlas.model.instance.AtlasEntity.Status.DELETED; import static org.apache.atlas.model.instance.AtlasEntity.Status.DELETED;
import static org.apache.atlas.repository.Constants.ATTRIBUTE_INDEX_PROPERTY_KEY; import static org.apache.atlas.repository.Constants.ATTRIBUTE_INDEX_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.ATTRIBUTE_KEY_PROPERTY_KEY; import static org.apache.atlas.repository.Constants.ATTRIBUTE_KEY_PROPERTY_KEY;
...@@ -124,7 +126,7 @@ public final class GraphHelper { ...@@ -124,7 +126,7 @@ public final class GraphHelper {
try { try {
maxRetries = ApplicationProperties.get().getInt(RETRY_COUNT, 3); maxRetries = ApplicationProperties.get().getInt(RETRY_COUNT, 3);
retrySleepTimeMillis = ApplicationProperties.get().getLong(RETRY_DELAY, 1000); retrySleepTimeMillis = ApplicationProperties.get().getLong(RETRY_DELAY, 1000);
removePropagations = ApplicationProperties.get().getBoolean(DEFAULT_REMOVE_PROPAGATIONS_ON_ENTITY_DELETE, true); removePropagations = ApplicationProperties.get().getBoolean(DEFAULT_REMOVE_PROPAGATIONS_ON_ENTITY_DELETE, false);
} catch (AtlasException e) { } catch (AtlasException e) {
LOG.error("Could not load configuration. Setting to default value for " + RETRY_COUNT, e); LOG.error("Could not load configuration. Setting to default value for " + RETRY_COUNT, e);
} }
...@@ -1297,6 +1299,10 @@ public final class GraphHelper { ...@@ -1297,6 +1299,10 @@ public final class GraphHelper {
return element.getProperty(Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY, Long.class); return element.getProperty(Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY, Long.class);
} }
public static boolean isActive(AtlasEntity entity) {
return entity != null ? entity.getStatus() == ACTIVE : false;
}
/** /**
* For the given type, finds an unique attribute and checks if there is an existing instance with the same * For the given type, finds an unique attribute and checks if there is an existing instance with the same
* unique value * unique value
......
...@@ -1530,6 +1530,7 @@ public class EntityGraphMapper { ...@@ -1530,6 +1530,7 @@ public class EntityGraphMapper {
AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entityTypeName); AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entityTypeName);
List<AtlasClassification> updatedClassifications = new ArrayList<>(); List<AtlasClassification> updatedClassifications = new ArrayList<>();
List<AtlasVertex> entitiesToPropagateTo = new ArrayList<>(); List<AtlasVertex> entitiesToPropagateTo = new ArrayList<>();
Set<AtlasVertex> notificationVertices = new HashSet<AtlasVertex>() {{ add(entityVertex); }};
Map<AtlasVertex, List<AtlasClassification>> addedPropagations = null; Map<AtlasVertex, List<AtlasClassification>> addedPropagations = null;
Map<AtlasVertex, List<AtlasClassification>> removedPropagations = null; Map<AtlasVertex, List<AtlasClassification>> removedPropagations = null;
...@@ -1583,8 +1584,20 @@ public class EntityGraphMapper { ...@@ -1583,8 +1584,20 @@ public class EntityGraphMapper {
isClassificationUpdated = true; isClassificationUpdated = true;
} }
if (isClassificationUpdated && CollectionUtils.isEmpty(entitiesToPropagateTo)) { // check for removePropagationsOnEntityDelete update
entitiesToPropagateTo = graphHelper.getImpactedVerticesWithRestrictions(guid, classificationVertex.getIdForDisplay()); Boolean currentRemovePropagations = currentClassification.getRemovePropagationsOnEntityDelete();
Boolean updatedRemovePropagations = classification.getRemovePropagationsOnEntityDelete();
if (updatedRemovePropagations != null && (updatedRemovePropagations != currentRemovePropagations)) {
AtlasGraphUtilsV2.setProperty(classificationVertex, CLASSIFICATION_VERTEX_REMOVE_PROPAGATIONS_KEY, updatedRemovePropagations);
isClassificationUpdated = true;
}
if (isClassificationUpdated) {
List<AtlasVertex> propagatedEntityVertices = graphHelper.getAllPropagatedEntityVertices(classificationVertex);
notificationVertices.addAll(propagatedEntityVertices);
} }
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
...@@ -1644,20 +1657,9 @@ public class EntityGraphMapper { ...@@ -1644,20 +1657,9 @@ public class EntityGraphMapper {
} }
} }
// handle update of 'removePropagationsOnEntityDelete' flag
Boolean currentRemovePropagations = currentClassification.getRemovePropagationsOnEntityDelete();
Boolean updatedRemovePropagations = classification.getRemovePropagationsOnEntityDelete();
if (updatedRemovePropagations != null && (updatedRemovePropagations != currentRemovePropagations)) {
AtlasGraphUtilsV2.setProperty(classificationVertex, CLASSIFICATION_VERTEX_REMOVE_PROPAGATIONS_KEY, updatedRemovePropagations);
}
updatedClassifications.add(currentClassification); updatedClassifications.add(currentClassification);
} }
// notify listeners on classification update
List<AtlasVertex> notificationVertices = new ArrayList<AtlasVertex>() {{ add(entityVertex); }};
if (CollectionUtils.isNotEmpty(entitiesToPropagateTo)) { if (CollectionUtils.isNotEmpty(entitiesToPropagateTo)) {
notificationVertices.addAll(entitiesToPropagateTo); notificationVertices.addAll(entitiesToPropagateTo);
} }
...@@ -1667,7 +1669,9 @@ public class EntityGraphMapper { ...@@ -1667,7 +1669,9 @@ public class EntityGraphMapper {
AtlasEntityWithExtInfo entityWithExtInfo = instanceConverter.getAndCacheEntity(entityGuid); AtlasEntityWithExtInfo entityWithExtInfo = instanceConverter.getAndCacheEntity(entityGuid);
AtlasEntity entity = (entityWithExtInfo != null) ? entityWithExtInfo.getEntity() : null; AtlasEntity entity = (entityWithExtInfo != null) ? entityWithExtInfo.getEntity() : null;
entityChangeNotifier.onClassificationUpdatedToEntity(entity, updatedClassifications); if (isActive(entity)) {
entityChangeNotifier.onClassificationUpdatedToEntity(entity, updatedClassifications);
}
} }
if (removedPropagations != null) { if (removedPropagations != null) {
...@@ -1678,7 +1682,9 @@ public class EntityGraphMapper { ...@@ -1678,7 +1682,9 @@ public class EntityGraphMapper {
AtlasEntityWithExtInfo entityWithExtInfo = instanceConverter.getAndCacheEntity(entityGuid); AtlasEntityWithExtInfo entityWithExtInfo = instanceConverter.getAndCacheEntity(entityGuid);
AtlasEntity entity = (entityWithExtInfo != null) ? entityWithExtInfo.getEntity() : null; AtlasEntity entity = (entityWithExtInfo != null) ? entityWithExtInfo.getEntity() : null;
entityChangeNotifier.onClassificationDeletedFromEntity(entity, removedClassifications); if (isActive(entity)) {
entityChangeNotifier.onClassificationDeletedFromEntity(entity, removedClassifications);
}
} }
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment