Commit 2237a895 by Sarath Subramanian

ATLAS-2609: Update audit and notification listener to handle propagated…

ATLAS-2609: Update audit and notification listener to handle propagated classification add/delete/update
parent 30dd2f5b
......@@ -47,6 +47,7 @@ import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_
public class EntityAuditEvent implements Serializable {
public enum EntityAuditAction {
ENTITY_CREATE, ENTITY_UPDATE, ENTITY_DELETE, TAG_ADD, TAG_DELETE, TAG_UPDATE,
PROPAGATED_TAG_ADD, PROPAGATED_TAG_DELETE, PROPAGATED_TAG_UPDATE,
ENTITY_IMPORT_CREATE, ENTITY_IMPORT_UPDATE, ENTITY_IMPORT_DELETE;
public static EntityAuditAction fromString(String strValue) {
......@@ -72,6 +73,12 @@ public class EntityAuditEvent implements Serializable {
case "CLASSIFICATION_UPDATE":
case "TAG_UPDATE":
return TAG_UPDATE;
case "PROPAGATED_TAG_ADD":
return PROPAGATED_TAG_ADD;
case "PROPAGATED_TAG_DELETE":
return PROPAGATED_TAG_DELETE;
case "PROPAGATED_TAG_UPDATE":
return PROPAGATED_TAG_UPDATE;
}
throw new IllegalArgumentException("No enum constant " + EntityAuditAction.class.getCanonicalName() + "." + strValue);
......
......@@ -28,6 +28,9 @@ define(['require'], function(require) {
TAG_ADD: "Classification Added",
TAG_DELETE: "Classification Deleted",
TAG_UPDATE: "Classification Updated",
PROPAGATED_TAG_ADD: "Propagated Classification Added",
PROPAGATED_TAG_DELETE: "Propagated Classification Deleted",
PROPAGATED_TAG_UPDATE: "Propagated Classification Updated",
ENTITY_IMPORT_CREATE: "Entity Created by import",
ENTITY_IMPORT_UPDATE: "Entity Updated by import",
ENTITY_IMPORT_DELETE: "Entity Deleted by import"
......
......@@ -74,8 +74,8 @@ public interface EntityChangeListenerV2 {
* This is upon deleting classifications from an entity.
*
* @param entity the entity
* @param classificationNames classifications names for the instance that needs to be deleted from entity
* @param classifications classifications that needs to be updated for an entity
* @throws AtlasBaseException if the listener notification fails
*/
void onClassificationsDeleted(AtlasEntity entity, List<String> classificationNames) throws AtlasBaseException;
void onClassificationsDeleted(AtlasEntity entity, List<AtlasClassification> classifications) throws AtlasBaseException;
}
\ No newline at end of file
......@@ -45,7 +45,8 @@ public class EntityAuditEventV2 implements Serializable {
public enum EntityAuditAction {
ENTITY_CREATE, ENTITY_UPDATE, ENTITY_DELETE,
ENTITY_IMPORT_CREATE, ENTITY_IMPORT_UPDATE, ENTITY_IMPORT_DELETE,
CLASSIFICATION_ADD, CLASSIFICATION_DELETE, CLASSIFICATION_UPDATE;
CLASSIFICATION_ADD, CLASSIFICATION_DELETE, CLASSIFICATION_UPDATE,
PROPAGATED_CLASSIFICATION_ADD, PROPAGATED_CLASSIFICATION_DELETE, PROPAGATED_CLASSIFICATION_UPDATE;
public static EntityAuditAction fromString(String strValue) {
switch (strValue) {
......@@ -70,6 +71,12 @@ public class EntityAuditEventV2 implements Serializable {
case "CLASSIFICATION_UPDATE":
case "TAG_UPDATE":
return CLASSIFICATION_UPDATE;
case "PROPAGATED_CLASSIFICATION_ADD":
return PROPAGATED_CLASSIFICATION_ADD;
case "PROPAGATED_CLASSIFICATION_DELETE":
return PROPAGATED_CLASSIFICATION_DELETE;
case "PROPAGATED_CLASSIFICATION_UPDATE":
return PROPAGATED_CLASSIFICATION_UPDATE;
}
throw new IllegalArgumentException("No enum constant " + EntityAuditAction.class.getCanonicalName() + "." + strValue);
......
......@@ -156,10 +156,10 @@ public abstract class AbstractStorageBasedAuditRepository implements Service, En
APPLICATION_PROPERTIES = config;
}
protected byte[] getKey(String id, Long ts) {
protected byte[] getKey(String id, Long ts, int index) {
assert id != null : "entity id can't be null";
assert ts != null : "timestamp can't be null";
String keyStr = id + FIELD_SEPARATOR + ts;
String keyStr = id + FIELD_SEPARATOR + ts + FIELD_SEPARATOR + index;
return Bytes.toBytes(keyStr);
}
......
......@@ -94,10 +94,10 @@ public class EntityAuditListener implements EntityChangeListener {
}
@Override
public void onTraitsDeleted(Referenceable entity, Collection<String> traitNames) throws AtlasException {
if (traitNames != null) {
for (String traitName : traitNames) {
EntityAuditEvent event = createEvent(entity, EntityAuditAction.TAG_DELETE, "Deleted trait: " + traitName);
public void onTraitsDeleted(Referenceable entity, Collection<? extends Struct> traits) throws AtlasException {
if (traits != null) {
for (Struct trait : traits) {
EntityAuditEvent event = createEvent(entity, EntityAuditAction.TAG_DELETE, "Deleted trait: " + trait.getTypeName());
auditRepository.putEventsV1(event);
}
......
......@@ -30,6 +30,7 @@ import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
......@@ -50,6 +51,9 @@ import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditAction.
import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditAction.ENTITY_IMPORT_DELETE;
import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditAction.ENTITY_IMPORT_UPDATE;
import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditAction.ENTITY_UPDATE;
import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditAction.PROPAGATED_CLASSIFICATION_ADD;
import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditAction.PROPAGATED_CLASSIFICATION_DELETE;
import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditAction.PROPAGATED_CLASSIFICATION_UPDATE;
@Component
public class EntityAuditListenerV2 implements EntityChangeListenerV2 {
......@@ -109,7 +113,11 @@ public class EntityAuditListenerV2 implements EntityChangeListenerV2 {
List<EntityAuditEventV2> events = new ArrayList<>();
for (AtlasClassification classification : classifications) {
if (entity.getGuid().equals(classification.getEntityGuid())) {
events.add(createEvent(entity, CLASSIFICATION_ADD, "Added classification: " + AtlasType.toJson(classification)));
} else {
events.add(createEvent(entity, PROPAGATED_CLASSIFICATION_ADD, "Added propagated classification: " + AtlasType.toJson(classification)));
}
}
auditRepository.putEventsV2(events);
......@@ -120,9 +128,20 @@ public class EntityAuditListenerV2 implements EntityChangeListenerV2 {
public void onClassificationsUpdated(AtlasEntity entity, List<AtlasClassification> classifications) throws AtlasBaseException {
if (CollectionUtils.isNotEmpty(classifications)) {
List<EntityAuditEventV2> events = new ArrayList<>();
String guid = entity.getGuid();
for (AtlasClassification classification : classifications) {
if (guid.equals(classification.getEntityGuid())) {
events.add(createEvent(entity, CLASSIFICATION_UPDATE, "Updated classification: " + AtlasType.toJson(classification)));
} else {
if (isPropagatedClassificationAdded(guid, classification)) {
events.add(createEvent(entity, PROPAGATED_CLASSIFICATION_ADD, "Added propagated classification: " + AtlasType.toJson(classification)));
} else if (isPropagatedClassificationDeleted(guid, classification)) {
events.add(createEvent(entity, PROPAGATED_CLASSIFICATION_DELETE, "Deleted propagated classification: " + classification.getTypeName()));
} else {
events.add(createEvent(entity, PROPAGATED_CLASSIFICATION_UPDATE, "Updated propagated classification: " + AtlasType.toJson(classification)));
}
}
}
auditRepository.putEventsV2(events);
......@@ -130,12 +149,16 @@ public class EntityAuditListenerV2 implements EntityChangeListenerV2 {
}
@Override
public void onClassificationsDeleted(AtlasEntity entity, List<String> classificationNames) throws AtlasBaseException {
if (CollectionUtils.isNotEmpty(classificationNames)) {
public void onClassificationsDeleted(AtlasEntity entity, List<AtlasClassification> classifications) throws AtlasBaseException {
if (CollectionUtils.isNotEmpty(classifications)) {
List<EntityAuditEventV2> events = new ArrayList<>();
for (String classificationName : classificationNames) {
events.add(createEvent(entity, CLASSIFICATION_DELETE, "Deleted classification: " + classificationName));
for (AtlasClassification classification : classifications) {
if (StringUtils.equals(entity.getGuid(), classification.getEntityGuid())) {
events.add(createEvent(entity, CLASSIFICATION_DELETE, "Deleted classification: " + classification.getTypeName()));
} else {
events.add(createEvent(entity, PROPAGATED_CLASSIFICATION_DELETE, "Deleted propagated classification: " + classification.getTypeName()));
}
}
auditRepository.putEventsV2(events);
......@@ -180,6 +203,37 @@ public class EntityAuditListenerV2 implements EntityChangeListenerV2 {
return auditString;
}
private boolean isPropagatedClassificationAdded(String guid, AtlasClassification classification) {
Map<String, List<AtlasClassification>> addedPropagations = RequestContextV1.get().getAddedPropagations();
return hasPropagatedEntry(addedPropagations, guid, classification);
}
private boolean isPropagatedClassificationDeleted(String guid, AtlasClassification classification) {
Map<String, List<AtlasClassification>> removedPropagations = RequestContextV1.get().getRemovedPropagations();
return hasPropagatedEntry(removedPropagations, guid, classification);
}
private boolean hasPropagatedEntry(Map<String, List<AtlasClassification>> propagationsMap, String guid, AtlasClassification classification) {
boolean ret = false;
if (MapUtils.isNotEmpty(propagationsMap) && propagationsMap.containsKey(guid) && CollectionUtils.isNotEmpty(propagationsMap.get(guid))) {
List<AtlasClassification> classifications = propagationsMap.get(guid);
String classificationName = classification.getTypeName();
String entityGuid = classification.getEntityGuid();
for (AtlasClassification c : classifications) {
if (StringUtils.equals(c.getTypeName(), classificationName) && StringUtils.equals(c.getEntityGuid(), entityGuid)) {
ret = true;
break;
}
}
}
return ret;
}
private Map<String, Object> pruneEntityAttributesForAudit(AtlasEntity entity) {
Map<String, Object> ret = null;
Map<String, Object> entityAttributes = entity.getAttributes();
......
......@@ -27,9 +27,6 @@ import org.apache.atlas.model.audit.EntityAuditEventV2;
import org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditAction;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.ha.HAConfiguration;
import org.apache.atlas.listener.ActiveStateChangeHandler;
import org.apache.atlas.service.Service;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hbase.HBaseConfiguration;
......@@ -57,11 +54,8 @@ import javax.inject.Singleton;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
/**
* HBase based repository for entity audit events
......@@ -110,9 +104,12 @@ public class HBaseBasedAuditRepository extends AbstractStorageBasedAuditReposito
try {
table = connection.getTable(tableName);
List<Put> puts = new ArrayList<>(events.size());
for (EntityAuditEvent event : events) {
for (int index = 0; index < events.size(); index++) {
EntityAuditEvent event = events.get(index);
LOG.debug("Adding entity audit event {}", event);
Put put = new Put(getKey(event.getEntityId(), event.getTimestamp()));
Put put = new Put(getKey(event.getEntityId(), event.getTimestamp(), index));
addColumn(put, COLUMN_ACTION, event.getAction());
addColumn(put, COLUMN_USER, event.getUser());
addColumn(put, COLUMN_DETAIL, event.getDetails());
......@@ -141,12 +138,14 @@ public class HBaseBasedAuditRepository extends AbstractStorageBasedAuditReposito
table = connection.getTable(tableName);
List<Put> puts = new ArrayList<>(events.size());
for (EntityAuditEventV2 event : events) {
for (int index = 0; index < events.size(); index++) {
EntityAuditEventV2 event = events.get(index);
if (LOG.isDebugEnabled()) {
LOG.debug("Adding entity audit event {}", event);
}
Put put = new Put(getKey(event.getEntityId(), event.getTimestamp()));
Put put = new Put(getKey(event.getEntityId(), event.getTimestamp(), index));
addColumn(put, COLUMN_ACTION, event.getAction());
addColumn(put, COLUMN_USER, event.getUser());
......@@ -197,7 +196,7 @@ public class HBaseBasedAuditRepository extends AbstractStorageBasedAuditReposito
if (StringUtils.isEmpty(startKey)) {
//Set start row to entity id + max long value
byte[] entityBytes = getKey(entityId, Long.MAX_VALUE);
byte[] entityBytes = getKey(entityId, Long.MAX_VALUE, Integer.MAX_VALUE);
scan = scan.setStartRow(entityBytes);
} else {
scan = scan.setStartRow(Bytes.toBytes(startKey));
......@@ -287,7 +286,7 @@ public class HBaseBasedAuditRepository extends AbstractStorageBasedAuditReposito
.setSmall(true);
if (StringUtils.isEmpty(startKey)) {
//Set start row to entity id + max long value
byte[] entityBytes = getKey(entityId, Long.MAX_VALUE);
byte[] entityBytes = getKey(entityId, Long.MAX_VALUE, Integer.MAX_VALUE);
scan = scan.setStartRow(entityBytes);
} else {
scan = scan.setStartRow(Bytes.toBytes(startKey));
......
......@@ -375,6 +375,12 @@ public class AtlasInstanceConverter {
return EntityAuditEvent.EntityAuditAction.TAG_DELETE;
case CLASSIFICATION_UPDATE:
return EntityAuditEvent.EntityAuditAction.TAG_UPDATE;
case PROPAGATED_CLASSIFICATION_ADD:
return EntityAuditEvent.EntityAuditAction.PROPAGATED_TAG_ADD;
case PROPAGATED_CLASSIFICATION_DELETE:
return EntityAuditEvent.EntityAuditAction.PROPAGATED_TAG_DELETE;
case PROPAGATED_CLASSIFICATION_UPDATE:
return EntityAuditEvent.EntityAuditAction.PROPAGATED_TAG_UPDATE;
}
return null;
......@@ -400,6 +406,12 @@ public class AtlasInstanceConverter {
return EntityAuditEventV2.EntityAuditAction.CLASSIFICATION_DELETE;
case TAG_UPDATE:
return EntityAuditEventV2.EntityAuditAction.CLASSIFICATION_UPDATE;
case PROPAGATED_TAG_ADD:
return EntityAuditEventV2.EntityAuditAction.PROPAGATED_CLASSIFICATION_ADD;
case PROPAGATED_TAG_DELETE:
return EntityAuditEventV2.EntityAuditAction.PROPAGATED_CLASSIFICATION_DELETE;
case PROPAGATED_TAG_UPDATE:
return EntityAuditEventV2.EntityAuditAction.PROPAGATED_CLASSIFICATION_UPDATE;
}
return null;
......
......@@ -20,9 +20,11 @@ package org.apache.atlas.repository.store.graph.v1;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.AtlasException;
import org.apache.atlas.RequestContextV1;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.listener.EntityChangeListener;
import org.apache.atlas.listener.EntityChangeListenerV2;
import org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditAction;
import org.apache.atlas.model.instance.AtlasClassification;
import org.apache.atlas.model.instance.AtlasEntity;
......@@ -41,6 +43,7 @@ import org.apache.atlas.repository.graph.GraphHelper;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.util.AtlasRepositoryConfiguration;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -50,9 +53,12 @@ import javax.inject.Inject;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditAction.PROPAGATED_CLASSIFICATION_ADD;
import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditAction.PROPAGATED_CLASSIFICATION_DELETE;
import static org.apache.atlas.util.AtlasRepositoryConfiguration.isV2EntityNotificationEnabled;
......@@ -100,6 +106,8 @@ public class AtlasEntityChangeNotifier {
notifyListeners(updatedEntities, EntityOperation.UPDATE, isImport);
notifyListeners(partiallyUpdatedEntities, EntityOperation.PARTIAL_UPDATE, isImport);
notifyListeners(deletedEntities, EntityOperation.DELETE, isImport);
notifyPropagatedEntities();
}
public void onClassificationAddedToEntity(AtlasEntity entity, List<AtlasClassification> addedClassifications) throws AtlasBaseException {
......@@ -156,25 +164,26 @@ public class AtlasEntityChangeNotifier {
}
}
public void onClassificationDeletedFromEntity(AtlasEntity entity, List<String> deletedClassificationNames) throws AtlasBaseException {
public void onClassificationDeletedFromEntity(AtlasEntity entity, List<AtlasClassification> deletedClassifications) throws AtlasBaseException {
if (isV2EntityNotificationEnabled()) {
doFullTextMapping(entity.getGuid());
for (EntityChangeListenerV2 listener : entityChangeListenersV2) {
listener.onClassificationsDeleted(entity, deletedClassificationNames);
listener.onClassificationsDeleted(entity, deletedClassifications);
}
} else {
doFullTextMapping(entity.getGuid());
Referenceable entityRef = toReferenceable(entity.getGuid());
List<Struct> traits = toStruct(deletedClassifications);
if (entityRef == null || CollectionUtils.isEmpty(deletedClassificationNames)) {
if (entityRef == null || CollectionUtils.isEmpty(deletedClassifications)) {
return;
}
for (EntityChangeListener listener : entityChangeListeners) {
try {
listener.onTraitsDeleted(entityRef, deletedClassificationNames);
listener.onTraitsDeleted(entityRef, traits);
} catch (AtlasException e) {
throw new AtlasBaseException(AtlasErrorCode.NOTIFICATION_FAILED, e, getListenerName(listener), "TraitDelete");
}
......@@ -183,6 +192,36 @@ public class AtlasEntityChangeNotifier {
}
}
public void notifyPropagatedEntities() throws AtlasBaseException {
RequestContextV1 context = RequestContextV1.get();
Map<String, List<AtlasClassification>> addedPropagations = context.getAddedPropagations();
Map<String, List<AtlasClassification>> removedPropagations = context.getRemovedPropagations();
notifyPropagatedEntities(addedPropagations, PROPAGATED_CLASSIFICATION_ADD);
notifyPropagatedEntities(removedPropagations, PROPAGATED_CLASSIFICATION_DELETE);
}
private void notifyPropagatedEntities(Map<String, List<AtlasClassification>> entityPropagationMap, EntityAuditAction action) throws AtlasBaseException {
if (MapUtils.isEmpty(entityPropagationMap) || action == null) {
return;
}
for (String guid : entityPropagationMap.keySet()) {
AtlasEntityWithExtInfo entityWithExtInfo = instanceConverter.getAndCacheEntity(guid);
AtlasEntity entity = entityWithExtInfo != null ? entityWithExtInfo.getEntity() : null;
if (entity == null) {
continue;
}
if (action == PROPAGATED_CLASSIFICATION_ADD) {
onClassificationAddedToEntity(entity, entityPropagationMap.get(guid));
} else if (action == PROPAGATED_CLASSIFICATION_DELETE) {
onClassificationDeletedFromEntity(entity, entityPropagationMap.get(guid));
}
}
}
private String getListenerName(EntityChangeListener listener) {
return listener.getClass().getSimpleName();
}
......
......@@ -19,6 +19,7 @@
package org.apache.atlas.repository.store.graph.v1;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.RequestContextV1;
import org.apache.atlas.annotation.GraphTransaction;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.TypeCategory;
......@@ -84,12 +85,14 @@ public class AtlasRelationshipStoreV1 implements AtlasRelationshipStore {
private final EntityGraphRetriever entityRetriever;
private final DeleteHandlerV1 deleteHandler;
private final GraphHelper graphHelper = GraphHelper.getInstance();
private final AtlasEntityChangeNotifier entityChangeNotifier;
@Inject
public AtlasRelationshipStoreV1(AtlasTypeRegistry typeRegistry, DeleteHandlerV1 deleteHandler) {
public AtlasRelationshipStoreV1(AtlasTypeRegistry typeRegistry, DeleteHandlerV1 deleteHandler, AtlasEntityChangeNotifier entityChangeNotifier) {
this.typeRegistry = typeRegistry;
this.entityRetriever = new EntityGraphRetriever(typeRegistry);
this.deleteHandler = deleteHandler;
this.entityChangeNotifier = entityChangeNotifier;
}
@Override
......@@ -112,6 +115,9 @@ public class AtlasRelationshipStoreV1 implements AtlasRelationshipStore {
LOG.debug("<== create({}): {}", relationship, ret);
}
// notify entities for added/removed classification propagation
entityChangeNotifier.notifyPropagatedEntities();
return ret;
}
......@@ -181,6 +187,9 @@ public class AtlasRelationshipStoreV1 implements AtlasRelationshipStore {
AtlasRelationship ret = updateRelationship(edge, relationship);
// notify entities for added/removed classification propagation
entityChangeNotifier.notifyPropagatedEntities();
if (LOG.isDebugEnabled()) {
LOG.debug("<== update({}): {}", relationship, ret);
}
......@@ -229,8 +238,20 @@ public class AtlasRelationshipStoreV1 implements AtlasRelationshipStore {
throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIP_ALREADY_DELETED, guid);
}
// remove tag propagations
List<AtlasVertex> propagatedClassificationVertices = getClassificationVertices(edge);
for (AtlasVertex classificationVertex : propagatedClassificationVertices) {
List<AtlasVertex> removePropagationFromVertices = graphHelper.getPropagatedEntityVertices(classificationVertex);
deleteHandler.removeTagPropagation(classificationVertex, removePropagationFromVertices);
}
deleteHandler.deleteRelationships(Collections.singleton(edge));
// notify entities for added/removed classification propagation
entityChangeNotifier.notifyPropagatedEntities();
if (LOG.isDebugEnabled()) {
LOG.debug("<== deleteById({}): {}", guid);
}
......@@ -698,7 +719,7 @@ public class AtlasRelationshipStoreV1 implements AtlasRelationshipStore {
handleBlockedClassifications(ret, relationship.getBlockedPropagatedClassifications());
// propagate tags
entityRetriever.addTagPropagation(ret, tagPropagation);
deleteHandler.addTagPropagation(ret, tagPropagation);
}
return ret;
......
......@@ -1462,30 +1462,43 @@ public class EntityGraphMapper {
validateClassificationExists(traitNames, classificationNames);
Map<AtlasVertex, List<String>> removedClassifications = new HashMap<>();
Map<AtlasVertex, List<AtlasClassification>> removedClassifications = new HashMap<>();
for (String classificationName : classificationNames) {
AtlasVertex classificationVertex = getClassificationVertex(entityVertex, classificationName);
AtlasClassification classification = entityRetriever.toAtlasClassification(classificationVertex);
// remove classification from propagated entities if propagation is turned on
if (isPropagationEnabled(classificationVertex)) {
List<AtlasVertex> impactedVertices = deleteHandler.removeTagPropagation(classificationVertex);
List<AtlasVertex> propagatedEntityVertices = deleteHandler.removeTagPropagation(classificationVertex);
if (CollectionUtils.isNotEmpty(impactedVertices)) {
for (AtlasVertex impactedVertex : impactedVertices) {
List<String> classifications = removedClassifications.get(impactedVertex);
// add propagated entities and deleted classification details to removeClassifications map
if (CollectionUtils.isNotEmpty(propagatedEntityVertices)) {
for (AtlasVertex propagatedEntityVertex : propagatedEntityVertices) {
List<AtlasClassification> classifications = removedClassifications.get(propagatedEntityVertex);
if (classifications == null) {
classifications = new ArrayList<>();
removedClassifications.put(impactedVertex, classifications);
removedClassifications.put(propagatedEntityVertex, classifications);
}
classifications.add(classificationName);
classifications.add(classification);
}
}
}
// add associated entity and deleted classification details to removeClassifications map
List<AtlasClassification> classifications = removedClassifications.get(entityVertex);
if (classifications == null) {
classifications = new ArrayList<>();
removedClassifications.put(entityVertex, classifications);
}
classifications.add(classification);
// remove classifications from associated entity
if (LOG.isDebugEnabled()) {
LOG.debug("Removing classification: [{}] from: [{}][{}] with edge label: [{}]", classificationName,
......@@ -1499,15 +1512,13 @@ public class EntityGraphMapper {
traitNames.remove(classificationName);
}
removedClassifications.put(entityVertex, classificationNames);
updateTraitNamesProperty(entityVertex, traitNames);
updateModificationMetadata(entityVertex);
for (Map.Entry<AtlasVertex, List<String>> entry : removedClassifications.entrySet()) {
for (Map.Entry<AtlasVertex, List<AtlasClassification>> entry : removedClassifications.entrySet()) {
String guid = GraphHelper.getGuid(entry.getKey());
List<String> deletedClassificationNames = entry.getValue();
List<AtlasClassification> deletedClassificationNames = entry.getValue();
AtlasEntityWithExtInfo entityWithExtInfo = instanceConverter.getAndCacheEntity(guid);
AtlasEntity entity = (entityWithExtInfo != null) ? entityWithExtInfo.getEntity() : null;
......@@ -1532,12 +1543,16 @@ public class EntityGraphMapper {
List<AtlasVertex> entitiesToPropagateTo = new ArrayList<>();
Map<AtlasVertex, List<AtlasClassification>> addedPropagations = null;
Map<AtlasVertex, List<String>> removedPropagations = null;
Map<AtlasVertex, List<AtlasClassification>> removedPropagations = null;
for (AtlasClassification classification : classifications) {
String classificationName = classification.getTypeName();
String classificationEntityGuid = classification.getEntityGuid();
if (StringUtils.isEmpty(classificationEntityGuid)) {
classification.setEntityGuid(guid);
}
if (StringUtils.isNotEmpty(classificationEntityGuid) && !StringUtils.equalsIgnoreCase(guid, classificationEntityGuid)) {
throw new AtlasBaseException(AtlasErrorCode.CLASSIFICATION_UPDATE_FROM_PROPAGATED_ENTITY, classificationName);
}
......@@ -1597,7 +1612,7 @@ public class EntityGraphMapper {
if (updatedTagPropagation != null && currentTagPropagation != updatedTagPropagation) {
if (updatedTagPropagation) {
if (CollectionUtils.isEmpty(entitiesToPropagateTo)) {
entitiesToPropagateTo = graphHelper.getImpactedVertices(guid);
entitiesToPropagateTo = graphHelper.getImpactedVerticesWithRestrictions(guid, classificationVertex.getIdForDisplay());
}
if (CollectionUtils.isNotEmpty(entitiesToPropagateTo)) {
......@@ -1625,7 +1640,7 @@ public class EntityGraphMapper {
removedPropagations = new HashMap<>();
for (AtlasVertex impactedVertex : impactedVertices) {
List<String> removedClassifications = removedPropagations.get(impactedVertex);
List<AtlasClassification> removedClassifications = removedPropagations.get(impactedVertex);
if (removedClassifications == null) {
removedClassifications = new ArrayList<>();
......@@ -1633,7 +1648,7 @@ public class EntityGraphMapper {
removedPropagations.put(impactedVertex, removedClassifications);
}
removedClassifications.add(classification.getTypeName());
removedClassifications.add(classification);
}
}
}
......@@ -1654,15 +1669,14 @@ public class EntityGraphMapper {
String entityGuid = GraphHelper.getGuid(vertex);
AtlasEntityWithExtInfo entityWithExtInfo = instanceConverter.getAndCacheEntity(entityGuid);
AtlasEntity entity = (entityWithExtInfo != null) ? entityWithExtInfo.getEntity() : null;
List<AtlasClassification> updatedClassificationList = StringUtils.equals(entityGuid, guid) ? updatedClassifications : Collections.emptyList();
entityChangeNotifier.onClassificationUpdatedToEntity(entity, updatedClassificationList);
entityChangeNotifier.onClassificationUpdatedToEntity(entity, updatedClassifications);
}
if (removedPropagations != null) {
for (Map.Entry<AtlasVertex, List<String>> entry : removedPropagations.entrySet()) {
for (Map.Entry<AtlasVertex, List<AtlasClassification>> entry : removedPropagations.entrySet()) {
AtlasVertex vertex = entry.getKey();
List<String> removedClassifications = entry.getValue();
List<AtlasClassification> removedClassifications = entry.getValue();
String entityGuid = GraphHelper.getGuid(vertex);
AtlasEntityWithExtInfo entityWithExtInfo = instanceConverter.getAndCacheEntity(entityGuid);
AtlasEntity entity = (entityWithExtInfo != null) ? entityWithExtInfo.getEntity() : null;
......@@ -1701,11 +1715,14 @@ public class EntityGraphMapper {
AtlasEntityWithExtInfo entityWithExtInfo = instanceConverter.getAndCacheEntity(entityGuid);
AtlasEntity entity = (entityWithExtInfo != null) ? entityWithExtInfo.getEntity() : null;
AtlasClassification classification;
if (updatedState == PropagationState.DELETED) {
entityChangeNotifier.onClassificationDeletedFromEntity(entity, Collections.singletonList(classificationName));
classification = entityRetriever.toAtlasClassification(getClassificationVertex(entityVertex, classificationName));
entityChangeNotifier.onClassificationDeletedFromEntity(entity, Collections.singletonList(classification));
} else {
AtlasClassification classification = entityRetriever.toAtlasClassification(propagatedEdge.getInVertex());
classification = entityRetriever.toAtlasClassification(propagatedEdge.getInVertex());
entityChangeNotifier.onClassificationAddedToEntity(entity, Collections.singletonList(classification));
}
......
......@@ -1049,154 +1049,4 @@ public final class EntityGraphRetriever {
relationship.setAttribute(attribute.getName(), attrValue);
}
}
public void addTagPropagation(AtlasEdge edge, PropagateTags propagateTags) throws AtlasBaseException {
if (edge == null) {
return;
}
AtlasVertex outVertex = edge.getOutVertex();
AtlasVertex inVertex = edge.getInVertex();
if (propagateTags == ONE_TO_TWO || propagateTags == PropagateTags.BOTH) {
addTagPropagation(outVertex, inVertex, edge);
}
if (propagateTags == PropagateTags.TWO_TO_ONE || propagateTags == PropagateTags.BOTH) {
addTagPropagation(inVertex, outVertex, edge);
}
}
public void removeTagPropagation(AtlasEdge edge, PropagateTags propagateTags) throws AtlasBaseException {
if (edge == null) {
return;
}
AtlasVertex outVertex = edge.getOutVertex();
AtlasVertex inVertex = edge.getInVertex();
if (propagateTags == ONE_TO_TWO || propagateTags == PropagateTags.BOTH) {
removeTagPropagation(outVertex, inVertex, edge);
}
if (propagateTags == PropagateTags.TWO_TO_ONE || propagateTags == PropagateTags.BOTH) {
removeTagPropagation(inVertex, outVertex, edge);
}
}
private void addTagPropagation(AtlasVertex fromVertex, AtlasVertex toVertex, AtlasEdge edge) throws AtlasBaseException {
final List<AtlasVertex> classificationVertices = getPropagationEnabledClassificationVertices(fromVertex);
final List<AtlasVertex> impactedEntityVertices = CollectionUtils.isNotEmpty(classificationVertices) ? graphHelper.getIncludedImpactedVerticesWithReferences(toVertex, getRelationshipGuid(edge)) : null;
if (CollectionUtils.isNotEmpty(impactedEntityVertices)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Propagate {} tags: from {} entity to {} entities", classificationVertices.size(), getTypeName(fromVertex), impactedEntityVertices.size());
}
for (AtlasVertex classificationVertex : classificationVertices) {
String classificationName = getTypeName(classificationVertex);
AtlasVertex associatedEntityVertex = getAssociatedEntityVertex(classificationVertex);
AtlasClassificationType classificationType = typeRegistry.getClassificationTypeByName(classificationName);
for (AtlasVertex impactedEntityVertex : impactedEntityVertices) {
if (getClassificationEdge(impactedEntityVertex, classificationVertex) != null) {
if (LOG.isDebugEnabled()) {
LOG.debug(" --> Classification edge already exists from [{}] --> [{}][{}] using edge label: [{}]",
getTypeName(impactedEntityVertex), getTypeName(classificationVertex), getTypeName(associatedEntityVertex), classificationName);
}
continue;
} else if (getPropagatedClassificationEdge(impactedEntityVertex, classificationVertex) != null) {
if (LOG.isDebugEnabled()) {
LOG.debug(" --> Propagated classification edge already exists from [{}] --> [{}][{}] using edge label: [{}]",
getTypeName(impactedEntityVertex), getTypeName(classificationVertex), getTypeName(associatedEntityVertex), CLASSIFICATION_LABEL);
}
continue;
}
String entityTypeName = getTypeName(impactedEntityVertex);
AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entityTypeName);
if (!classificationType.canApplyToEntityType(entityType)) {
if (LOG.isDebugEnabled()) {
LOG.debug(" --> Not creating propagated classification edge from [{}] --> [{}][{}], classification is not applicable for entity type",
getTypeName(impactedEntityVertex), getTypeName(classificationVertex), getTypeName(associatedEntityVertex));
}
continue;
}
if (LOG.isDebugEnabled()) {
LOG.debug(" --> Creating propagated classification edge from [{}] --> [{}][{}] using edge label: [{}]",
getTypeName(impactedEntityVertex), getTypeName(classificationVertex), getTypeName(associatedEntityVertex), CLASSIFICATION_LABEL);
}
AtlasEdge existingEdge = getPropagatedClassificationEdge(impactedEntityVertex, classificationVertex);
if (existingEdge != null) {
continue;
}
graphHelper.addClassificationEdge(impactedEntityVertex, classificationVertex, true);
addToPropagatedTraitNames(impactedEntityVertex, classificationName);
}
}
}
}
private void removeTagPropagation(AtlasVertex fromVertex, AtlasVertex toVertex, AtlasEdge edge) throws AtlasBaseException {
final List<AtlasVertex> classificationVertices = getPropagationEnabledClassificationVertices(fromVertex);
final List<AtlasVertex> impactedEntityVertices = CollectionUtils.isNotEmpty(classificationVertices) ? graphHelper.getIncludedImpactedVerticesWithReferences(toVertex, getRelationshipGuid(edge)) : null;
if (CollectionUtils.isNotEmpty(impactedEntityVertices)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Removing {} propagated tags: for {} from {} entities", classificationVertices.size(), getTypeName(fromVertex), impactedEntityVertices.size());
}
for (AtlasVertex classificationVertex : classificationVertices) {
String classificationName = getTypeName(classificationVertex);
AtlasVertex associatedEntityVertex = getAssociatedEntityVertex(classificationVertex);
List<AtlasVertex> referrals = graphHelper.getIncludedImpactedVerticesWithReferences(associatedEntityVertex, getRelationshipGuid(edge));
for (AtlasVertex impactedEntityVertex : impactedEntityVertices) {
if (referrals.contains(impactedEntityVertex)) {
if (LOG.isDebugEnabled()) {
if (StringUtils.equals(getGuid(impactedEntityVertex), getGuid(associatedEntityVertex))) {
LOG.debug(" --> Not removing propagated classification edge from [{}] --> [{}][{}] with edge label: [{}], since [{}] is associated with [{}]",
getTypeName(impactedEntityVertex), getTypeName(classificationVertex), getTypeName(associatedEntityVertex), CLASSIFICATION_LABEL, classificationName, getTypeName(associatedEntityVertex));
} else {
LOG.debug(" --> Not removing propagated classification edge from [{}] --> [{}][{}] with edge label: [{}], since [{}] is propagated through other path",
getTypeName(impactedEntityVertex), getTypeName(classificationVertex), getTypeName(associatedEntityVertex), CLASSIFICATION_LABEL, classificationName);
}
}
continue;
}
// remove propagated classification edge and classificationName from propagatedTraitNames vertex property
AtlasEdge propagatedEdge = getPropagatedClassificationEdge(impactedEntityVertex, classificationVertex);
if (propagatedEdge != null) {
if (LOG.isDebugEnabled()) {
LOG.debug(" --> Removing propagated classification edge from [{}] --> [{}][{}] with edge label: [{}]",
getTypeName(impactedEntityVertex), getTypeName(classificationVertex), getTypeName(associatedEntityVertex), CLASSIFICATION_LABEL);
}
graphHelper.removeEdge(propagatedEdge);
if (getClassificationEdgeState(propagatedEdge) == ACTIVE) {
removeFromPropagatedTraitNames(impactedEntityVertex, classificationName);
}
} else {
if (LOG.isDebugEnabled()) {
LOG.debug(" --> Not removing propagated classification edge from [{}] --> [{}][{}] using edge label: [{}], since edge doesn't exist",
getTypeName(impactedEntityVertex), getTypeName(classificationVertex), getTypeName(associatedEntityVertex), CLASSIFICATION_LABEL);
}
}
}
}
}
}
}
\ No newline at end of file
......@@ -82,6 +82,9 @@ public abstract class AtlasRelationshipStoreV1Test {
@Inject
EntityGraphMapper graphMapper;
@Inject
AtlasEntityChangeNotifier entityNotifier;
AtlasEntityStore entityStore;
AtlasRelationshipStore relationshipStore;
AtlasEntityChangeNotifier mockChangeNotifier = mock(AtlasEntityChangeNotifier.class);
......@@ -116,7 +119,7 @@ public abstract class AtlasRelationshipStoreV1Test {
@BeforeTest
public void init() throws Exception {
entityStore = new AtlasEntityStoreV1(deleteHandler, typeRegistry, mockChangeNotifier, graphMapper);
relationshipStore = new AtlasRelationshipStoreV1(typeRegistry, deleteHandler);
relationshipStore = new AtlasRelationshipStoreV1(typeRegistry, deleteHandler, entityNotifier);
RequestContextV1.clear();
RequestContextV1.get().setUser(TestUtilsV2.TEST_USER, null);
......
......@@ -18,8 +18,10 @@
package org.apache.atlas;
import org.apache.atlas.model.instance.AtlasClassification;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -33,6 +35,8 @@ public class RequestContextV1 {
private final Map<String, AtlasObjectId> updatedEntities = new HashMap<>();
private final Map<String, AtlasObjectId> deletedEntities = new HashMap<>();
private final Map<String, AtlasEntityWithExtInfo> entityCacheV2 = new HashMap<>();
private final Map<String, List<AtlasClassification>> addedPropagations = new HashMap<>();
private final Map<String, List<AtlasClassification>> removedPropagations = new HashMap<>();
private final long requestTime = System.currentTimeMillis();
private String user;
......@@ -63,6 +67,8 @@ public class RequestContextV1 {
instance.updatedEntities.clear();
instance.deletedEntities.clear();
instance.entityCacheV2.clear();
instance.addedPropagations.clear();
instance.removedPropagations.clear();
}
CURRENT_CONTEXT.remove();
......@@ -101,6 +107,42 @@ public class RequestContextV1 {
}
}
public void recordAddedPropagation(String guid, AtlasClassification classification) {
if (StringUtils.isNotEmpty(guid) && classification != null) {
List<AtlasClassification> classifications = addedPropagations.get(guid);
if (classifications == null) {
classifications = new ArrayList<>();
}
classifications.add(classification);
addedPropagations.put(guid, classifications);
}
}
public void recordRemovedPropagation(String guid, AtlasClassification classification) {
if (StringUtils.isNotEmpty(guid) && classification != null) {
List<AtlasClassification> classifications = removedPropagations.get(guid);
if (classifications == null) {
classifications = new ArrayList<>();
}
classifications.add(classification);
removedPropagations.put(guid, classifications);
}
}
public Map<String, List<AtlasClassification>> getAddedPropagations() {
return addedPropagations;
}
public Map<String, List<AtlasClassification>> getRemovedPropagations() {
return removedPropagations;
}
/**
* Adds the specified instance to the cache
*
......
......@@ -62,11 +62,11 @@ public interface EntityChangeListener {
* This is upon deleting a trait from a typed instance.
*
* @param entity the entity
* @param traitNames trait name for the instance that needs to be deleted from entity
* @param traits trait that needs to be added to entity
*
* @throws AtlasException if the listener notification fails
*/
void onTraitsDeleted(Referenceable entity, Collection<String> traitNames) throws AtlasException;
void onTraitsDeleted(Referenceable entity, Collection<? extends Struct> traits) throws AtlasException;
/**
* This is upon updating a trait from a typed instance.
......
......@@ -44,7 +44,7 @@ public class NoOpNotificationChangeListener implements EntityChangeListener {
}
@Override
public void onTraitsDeleted(Referenceable entity, Collection<String> traitNames) throws AtlasException {
public void onTraitsDeleted(Referenceable entity, Collection<? extends Struct> traits) throws AtlasException {
}
......
......@@ -89,7 +89,7 @@ public class EntityNotificationListenerV2 implements EntityChangeListenerV2 {
}
@Override
public void onClassificationsDeleted(AtlasEntity entity, List<String> classificationNames) throws AtlasBaseException {
public void onClassificationsDeleted(AtlasEntity entity, List<AtlasClassification> classifications) throws AtlasBaseException {
notifyEntityEvents(Collections.singletonList(entity), CLASSIFICATION_DELETE);
}
......
......@@ -85,7 +85,7 @@ public class NotificationEntityChangeListener implements EntityChangeListener {
}
@Override
public void onTraitsDeleted(Referenceable entity, Collection<String> traitNames) throws AtlasException {
public void onTraitsDeleted(Referenceable entity, Collection<? extends Struct> traits) throws AtlasException {
notifyOfEntityEvent(Collections.singleton(entity), OperationType.TRAIT_DELETE);
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment