Commit 1deb8c16 by Sarath Subramanian

ATLAS-2715: Create audit events for term-entity association and disassociation

parent bcd5bb60
...@@ -48,7 +48,8 @@ public class EntityAuditEvent implements Serializable { ...@@ -48,7 +48,8 @@ public class EntityAuditEvent implements Serializable {
public enum EntityAuditAction { public enum EntityAuditAction {
ENTITY_CREATE, ENTITY_UPDATE, ENTITY_DELETE, TAG_ADD, TAG_DELETE, TAG_UPDATE, ENTITY_CREATE, ENTITY_UPDATE, ENTITY_DELETE, TAG_ADD, TAG_DELETE, TAG_UPDATE,
PROPAGATED_TAG_ADD, PROPAGATED_TAG_DELETE, PROPAGATED_TAG_UPDATE, PROPAGATED_TAG_ADD, PROPAGATED_TAG_DELETE, PROPAGATED_TAG_UPDATE,
ENTITY_IMPORT_CREATE, ENTITY_IMPORT_UPDATE, ENTITY_IMPORT_DELETE; ENTITY_IMPORT_CREATE, ENTITY_IMPORT_UPDATE, ENTITY_IMPORT_DELETE,
TERM_ADD, TERM_DELETE;
public static EntityAuditAction fromString(String strValue) { public static EntityAuditAction fromString(String strValue) {
switch (strValue) { switch (strValue) {
...@@ -79,6 +80,10 @@ public class EntityAuditEvent implements Serializable { ...@@ -79,6 +80,10 @@ public class EntityAuditEvent implements Serializable {
return PROPAGATED_TAG_DELETE; return PROPAGATED_TAG_DELETE;
case "PROPAGATED_TAG_UPDATE": case "PROPAGATED_TAG_UPDATE":
return PROPAGATED_TAG_UPDATE; return PROPAGATED_TAG_UPDATE;
case "TERM_ADD":
return TERM_ADD;
case "TERM_DELETE":
return TERM_DELETE;
} }
throw new IllegalArgumentException("No enum constant " + EntityAuditAction.class.getCanonicalName() + "." + strValue); throw new IllegalArgumentException("No enum constant " + EntityAuditAction.class.getCanonicalName() + "." + strValue);
......
...@@ -33,7 +33,9 @@ define(['require'], function(require) { ...@@ -33,7 +33,9 @@ define(['require'], function(require) {
PROPAGATED_CLASSIFICATION_UPDATE: "Propagated Classification Updated", PROPAGATED_CLASSIFICATION_UPDATE: "Propagated Classification Updated",
ENTITY_IMPORT_CREATE: "Entity Created by import", ENTITY_IMPORT_CREATE: "Entity Created by import",
ENTITY_IMPORT_UPDATE: "Entity Updated by import", ENTITY_IMPORT_UPDATE: "Entity Updated by import",
ENTITY_IMPORT_DELETE: "Entity Deleted by import" ENTITY_IMPORT_DELETE: "Entity Deleted by import",
TERM_ADD: "Term Added",
TERM_DELETE: "Term Deleted"
} }
Enums.entityStateReadOnly = { Enums.entityStateReadOnly = {
......
...@@ -19,8 +19,10 @@ ...@@ -19,8 +19,10 @@
package org.apache.atlas.listener; package org.apache.atlas.listener;
import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.glossary.AtlasGlossaryTerm;
import org.apache.atlas.model.instance.AtlasClassification; import org.apache.atlas.model.instance.AtlasClassification;
import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasRelatedObjectId;
import java.util.List; import java.util.List;
...@@ -78,4 +80,20 @@ public interface EntityChangeListenerV2 { ...@@ -78,4 +80,20 @@ public interface EntityChangeListenerV2 {
* @throws AtlasBaseException if the listener notification fails * @throws AtlasBaseException if the listener notification fails
*/ */
void onClassificationsDeleted(AtlasEntity entity, List<AtlasClassification> classifications) throws AtlasBaseException; void onClassificationsDeleted(AtlasEntity entity, List<AtlasClassification> classifications) throws AtlasBaseException;
/**
* This is upon adding a new term to an entity.
*
* @param term the term
* @param entities list of entities to which the term is assigned
*/
void onTermAdded(AtlasGlossaryTerm term, List<AtlasRelatedObjectId> entities) throws AtlasBaseException;
/**
* This is upon removing a term from an entity.
*
* @param term the term
* @param entities list of entities to which the term is assigned
*/
void onTermDeleted(AtlasGlossaryTerm term, List<AtlasRelatedObjectId> entities) throws AtlasBaseException;
} }
\ No newline at end of file
...@@ -49,7 +49,8 @@ public class EntityAuditEventV2 implements Serializable { ...@@ -49,7 +49,8 @@ public class EntityAuditEventV2 implements Serializable {
ENTITY_CREATE, ENTITY_UPDATE, ENTITY_DELETE, ENTITY_CREATE, ENTITY_UPDATE, ENTITY_DELETE,
ENTITY_IMPORT_CREATE, ENTITY_IMPORT_UPDATE, ENTITY_IMPORT_DELETE, ENTITY_IMPORT_CREATE, ENTITY_IMPORT_UPDATE, ENTITY_IMPORT_DELETE,
CLASSIFICATION_ADD, CLASSIFICATION_DELETE, CLASSIFICATION_UPDATE, CLASSIFICATION_ADD, CLASSIFICATION_DELETE, CLASSIFICATION_UPDATE,
PROPAGATED_CLASSIFICATION_ADD, PROPAGATED_CLASSIFICATION_DELETE, PROPAGATED_CLASSIFICATION_UPDATE; PROPAGATED_CLASSIFICATION_ADD, PROPAGATED_CLASSIFICATION_DELETE, PROPAGATED_CLASSIFICATION_UPDATE,
TERM_ADD, TERM_DELETE;
public static EntityAuditActionV2 fromString(String strValue) { public static EntityAuditActionV2 fromString(String strValue) {
switch (strValue) { switch (strValue) {
...@@ -80,6 +81,10 @@ public class EntityAuditEventV2 implements Serializable { ...@@ -80,6 +81,10 @@ public class EntityAuditEventV2 implements Serializable {
return PROPAGATED_CLASSIFICATION_DELETE; return PROPAGATED_CLASSIFICATION_DELETE;
case "PROPAGATED_CLASSIFICATION_UPDATE": case "PROPAGATED_CLASSIFICATION_UPDATE":
return PROPAGATED_CLASSIFICATION_UPDATE; return PROPAGATED_CLASSIFICATION_UPDATE;
case "TERM_ADD":
return TERM_ADD;
case "TERM_DELETE":
return TERM_DELETE;
} }
throw new IllegalArgumentException("No enum constant " + EntityAuditActionV2.class.getCanonicalName() + "." + strValue); throw new IllegalArgumentException("No enum constant " + EntityAuditActionV2.class.getCanonicalName() + "." + strValue);
......
...@@ -24,6 +24,7 @@ import org.apache.atlas.model.glossary.relations.AtlasGlossaryHeader; ...@@ -24,6 +24,7 @@ import org.apache.atlas.model.glossary.relations.AtlasGlossaryHeader;
import org.apache.atlas.model.glossary.relations.AtlasRelatedTermHeader; import org.apache.atlas.model.glossary.relations.AtlasRelatedTermHeader;
import org.apache.atlas.model.glossary.relations.AtlasTermCategorizationHeader; import org.apache.atlas.model.glossary.relations.AtlasTermCategorizationHeader;
import org.apache.atlas.model.instance.AtlasRelatedObjectId; import org.apache.atlas.model.instance.AtlasRelatedObjectId;
import org.apache.atlas.type.AtlasType;
import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.CollectionUtils;
import java.util.HashMap; import java.util.HashMap;
...@@ -286,6 +287,16 @@ public class AtlasGlossaryTerm extends AtlasGlossaryBaseObject { ...@@ -286,6 +287,16 @@ public class AtlasGlossaryTerm extends AtlasGlossaryBaseObject {
} }
@JsonIgnore @JsonIgnore
public String toAuditString() {
AtlasGlossaryTerm t = new AtlasGlossaryTerm();
t.setGuid(this.getGuid());
t.setDisplayName(this.getDisplayName());
t.setQualifiedName(this.getQualifiedName());
return AtlasType.toJson(t);
}
@JsonIgnore
public boolean hasTerms() { public boolean hasTerms() {
return hasTerms; return hasTerms;
} }
......
...@@ -31,6 +31,7 @@ import org.apache.atlas.model.instance.AtlasRelatedObjectId; ...@@ -31,6 +31,7 @@ import org.apache.atlas.model.instance.AtlasRelatedObjectId;
import org.apache.atlas.repository.graphdb.AtlasVertex; import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.ogm.DataAccess; import org.apache.atlas.repository.ogm.DataAccess;
import org.apache.atlas.repository.store.graph.AtlasRelationshipStore; import org.apache.atlas.repository.store.graph.AtlasRelationshipStore;
import org.apache.atlas.repository.store.graph.v1.AtlasEntityChangeNotifier;
import org.apache.atlas.repository.store.graph.v1.AtlasGraphUtilsV1; import org.apache.atlas.repository.store.graph.v1.AtlasGraphUtilsV1;
import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.CollectionUtils;
...@@ -56,24 +57,26 @@ import static org.apache.atlas.glossary.GlossaryUtils.getGlossarySkeleton; ...@@ -56,24 +57,26 @@ import static org.apache.atlas.glossary.GlossaryUtils.getGlossarySkeleton;
@Service @Service
public class GlossaryService { public class GlossaryService {
private static final Logger LOG = LoggerFactory.getLogger(GlossaryService.class); private static final Logger LOG = LoggerFactory.getLogger(GlossaryService.class);
private static final boolean DEBUG_ENABLED = LOG.isDebugEnabled(); private static final boolean DEBUG_ENABLED = LOG.isDebugEnabled();
private static final String QUALIFIED_NAME_ATTR = "qualifiedName";
private static final String QUALIFIED_NAME_ATTR = "qualifiedName"; private final DataAccess dataAccess;
private final GlossaryTermUtils glossaryTermUtils;
private final DataAccess dataAccess; private final GlossaryCategoryUtils glossaryCategoryUtils;
private final GlossaryTermUtils glossaryTermUtils; private final AtlasTypeRegistry atlasTypeRegistry;
private final GlossaryCategoryUtils glossaryCategoryUtils; private final AtlasEntityChangeNotifier entityChangeNotifier;
private final AtlasTypeRegistry atlasTypeRegistry;
private final char[] invalidNameChars = {'@', '.'}; private final char[] invalidNameChars = {'@', '.'};
@Inject @Inject
public GlossaryService(DataAccess dataAccess, final AtlasRelationshipStore relationshipStore, final AtlasTypeRegistry typeRegistry) { public GlossaryService(DataAccess dataAccess, final AtlasRelationshipStore relationshipStore,
this.dataAccess = dataAccess; final AtlasTypeRegistry typeRegistry, AtlasEntityChangeNotifier entityChangeNotifier) {
this.atlasTypeRegistry = typeRegistry; this.dataAccess = dataAccess;
glossaryTermUtils = new GlossaryTermUtils(relationshipStore, typeRegistry, dataAccess); atlasTypeRegistry = typeRegistry;
glossaryCategoryUtils = new GlossaryCategoryUtils(relationshipStore, typeRegistry, dataAccess); glossaryTermUtils = new GlossaryTermUtils(relationshipStore, typeRegistry, dataAccess);
glossaryCategoryUtils = new GlossaryCategoryUtils(relationshipStore, typeRegistry, dataAccess);
this.entityChangeNotifier = entityChangeNotifier;
} }
/** /**
...@@ -477,24 +480,32 @@ public class GlossaryService { ...@@ -477,24 +480,32 @@ public class GlossaryService {
if (DEBUG_ENABLED) { if (DEBUG_ENABLED) {
LOG.debug("==> GlossaryService.assignTermToEntities({}, {})", termGuid, relatedObjectIds); LOG.debug("==> GlossaryService.assignTermToEntities({}, {})", termGuid, relatedObjectIds);
} }
AtlasGlossaryTerm glossaryTerm = dataAccess.load(getAtlasGlossaryTermSkeleton(termGuid)); AtlasGlossaryTerm glossaryTerm = dataAccess.load(getAtlasGlossaryTermSkeleton(termGuid));
glossaryTermUtils.processTermAssignments(glossaryTerm, relatedObjectIds); glossaryTermUtils.processTermAssignments(glossaryTerm, relatedObjectIds);
entityChangeNotifier.onTermAddedToEntities(glossaryTerm, relatedObjectIds);
if (DEBUG_ENABLED) { if (DEBUG_ENABLED) {
LOG.debug("<== GlossaryService.assignTermToEntities()"); LOG.debug("<== GlossaryService.assignTermToEntities()");
} }
} }
@GraphTransaction @GraphTransaction
public void removeTermFromEntities(String termGuid, List<AtlasRelatedObjectId> relatedObjectIds) throws AtlasBaseException { public void removeTermFromEntities(String termGuid, List<AtlasRelatedObjectId> relatedObjectIds) throws AtlasBaseException {
if (LOG.isDebugEnabled()) { if (DEBUG_ENABLED) {
LOG.debug("==> GlossaryService.removeTermFromEntities({}, {})", termGuid, relatedObjectIds); LOG.debug("==> GlossaryService.removeTermFromEntities({}, {})", termGuid, relatedObjectIds);
} }
AtlasGlossaryTerm glossaryTerm = dataAccess.load(getAtlasGlossaryTermSkeleton(termGuid)); AtlasGlossaryTerm glossaryTerm = dataAccess.load(getAtlasGlossaryTermSkeleton(termGuid));
glossaryTermUtils.processTermDissociation(glossaryTerm, relatedObjectIds); glossaryTermUtils.processTermDissociation(glossaryTerm, relatedObjectIds);
if (LOG.isDebugEnabled()) { entityChangeNotifier.onTermDeletedFromEntities(glossaryTerm, relatedObjectIds);
if (DEBUG_ENABLED) {
LOG.debug("<== GlossaryService.removeTermFromEntities()"); LOG.debug("<== GlossaryService.removeTermFromEntities()");
} }
} }
......
...@@ -23,6 +23,7 @@ import org.apache.atlas.EntityAuditEvent; ...@@ -23,6 +23,7 @@ import org.apache.atlas.EntityAuditEvent;
import org.apache.atlas.EntityAuditEvent.EntityAuditAction; import org.apache.atlas.EntityAuditEvent.EntityAuditAction;
import org.apache.atlas.RequestContextV1; import org.apache.atlas.RequestContextV1;
import org.apache.atlas.listener.EntityChangeListener; import org.apache.atlas.listener.EntityChangeListener;
import org.apache.atlas.model.glossary.AtlasGlossaryTerm;
import org.apache.atlas.v1.model.instance.Referenceable; import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.atlas.v1.model.instance.Struct; import org.apache.atlas.v1.model.instance.Struct;
import org.apache.atlas.type.AtlasEntityType; import org.apache.atlas.type.AtlasEntityType;
...@@ -43,6 +44,9 @@ import java.util.HashMap; ...@@ -43,6 +44,9 @@ import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import static org.apache.atlas.EntityAuditEvent.EntityAuditAction.TERM_ADD;
import static org.apache.atlas.EntityAuditEvent.EntityAuditAction.TERM_DELETE;
/** /**
* Listener on entity create/update/delete, tag add/delete. Adds the corresponding audit event to the audit repository. * Listener on entity create/update/delete, tag add/delete. Adds the corresponding audit event to the audit repository.
*/ */
...@@ -127,6 +131,28 @@ public class EntityAuditListener implements EntityChangeListener { ...@@ -127,6 +131,28 @@ public class EntityAuditListener implements EntityChangeListener {
auditRepository.putEventsV1(events); auditRepository.putEventsV1(events);
} }
@Override
public void onTermAdded(Collection<Referenceable> entities, AtlasGlossaryTerm term) throws AtlasException {
List<EntityAuditEvent> events = new ArrayList<>();
for (Referenceable entity : entities) {
events.add(createEvent(entity, TERM_ADD, "Added term: " + term.toAuditString()));
}
auditRepository.putEventsV1(events);
}
@Override
public void onTermDeleted(Collection<Referenceable> entities, AtlasGlossaryTerm term) throws AtlasException {
List<EntityAuditEvent> events = new ArrayList<>();
for (Referenceable entity : entities) {
events.add(createEvent(entity, TERM_DELETE, "Deleted term: " + term.toAuditString()));
}
auditRepository.putEventsV1(events);
}
public List<EntityAuditEvent> getAuditEvents(String guid) throws AtlasException{ public List<EntityAuditEvent> getAuditEvents(String guid) throws AtlasException{
return auditRepository.listEventsV1(guid, null, (short) 10); return auditRepository.listEventsV1(guid, null, (short) 10);
} }
...@@ -290,6 +316,12 @@ public class EntityAuditListener implements EntityChangeListener { ...@@ -290,6 +316,12 @@ public class EntityAuditListener implements EntityChangeListener {
case ENTITY_IMPORT_DELETE: case ENTITY_IMPORT_DELETE:
ret = "Deleted by import: "; ret = "Deleted by import: ";
break; break;
case TERM_ADD:
ret = "Added term: ";
break;
case TERM_DELETE:
ret = "Deleted term: ";
break;
default: default:
ret = "Unknown: "; ret = "Unknown: ";
} }
...@@ -328,6 +360,12 @@ public class EntityAuditListener implements EntityChangeListener { ...@@ -328,6 +360,12 @@ public class EntityAuditListener implements EntityChangeListener {
case ENTITY_IMPORT_DELETE: case ENTITY_IMPORT_DELETE:
ret = "Deleted by import: "; ret = "Deleted by import: ";
break; break;
case TERM_ADD:
ret = "Added term: ";
break;
case TERM_DELETE:
ret = "Deleted term: ";
break;
default: default:
ret = "Unknown: "; ret = "Unknown: ";
} }
......
...@@ -17,15 +17,18 @@ ...@@ -17,15 +17,18 @@
*/ */
package org.apache.atlas.repository.audit; package org.apache.atlas.repository.audit;
import org.apache.atlas.EntityAuditEvent;
import org.apache.atlas.EntityAuditEvent.EntityAuditAction; import org.apache.atlas.EntityAuditEvent.EntityAuditAction;
import org.apache.atlas.RequestContextV1; import org.apache.atlas.RequestContextV1;
import org.apache.atlas.model.audit.EntityAuditEventV2; import org.apache.atlas.model.audit.EntityAuditEventV2;
import org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2; import org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2;
import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.listener.EntityChangeListenerV2; import org.apache.atlas.listener.EntityChangeListenerV2;
import org.apache.atlas.model.glossary.AtlasGlossaryTerm;
import org.apache.atlas.model.instance.AtlasClassification; import org.apache.atlas.model.instance.AtlasClassification;
import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.instance.AtlasRelatedObjectId;
import org.apache.atlas.repository.converters.AtlasInstanceConverter;
import org.apache.atlas.type.AtlasEntityType; import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasStructType.AtlasAttribute; import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
import org.apache.atlas.type.AtlasType; import org.apache.atlas.type.AtlasType;
...@@ -56,18 +59,22 @@ import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV ...@@ -56,18 +59,22 @@ import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV
import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.PROPAGATED_CLASSIFICATION_ADD; import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.PROPAGATED_CLASSIFICATION_ADD;
import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.PROPAGATED_CLASSIFICATION_DELETE; import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.PROPAGATED_CLASSIFICATION_DELETE;
import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.PROPAGATED_CLASSIFICATION_UPDATE; import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.PROPAGATED_CLASSIFICATION_UPDATE;
import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.TERM_ADD;
import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.TERM_DELETE;
@Component @Component
public class EntityAuditListenerV2 implements EntityChangeListenerV2 { public class EntityAuditListenerV2 implements EntityChangeListenerV2 {
private static final Logger LOG = LoggerFactory.getLogger(EntityAuditListenerV2.class); private static final Logger LOG = LoggerFactory.getLogger(EntityAuditListenerV2.class);
private final EntityAuditRepository auditRepository; private final EntityAuditRepository auditRepository;
private final AtlasTypeRegistry typeRegistry; private final AtlasTypeRegistry typeRegistry;
private final AtlasInstanceConverter instanceConverter;
@Inject @Inject
public EntityAuditListenerV2(EntityAuditRepository auditRepository, AtlasTypeRegistry typeRegistry) { public EntityAuditListenerV2(EntityAuditRepository auditRepository, AtlasTypeRegistry typeRegistry, AtlasInstanceConverter instanceConverter) {
this.auditRepository = auditRepository; this.auditRepository = auditRepository;
this.typeRegistry = typeRegistry; this.typeRegistry = typeRegistry;
this.instanceConverter = instanceConverter;
} }
@Override @Override
...@@ -167,6 +174,42 @@ public class EntityAuditListenerV2 implements EntityChangeListenerV2 { ...@@ -167,6 +174,42 @@ public class EntityAuditListenerV2 implements EntityChangeListenerV2 {
} }
} }
@Override
public void onTermAdded(AtlasGlossaryTerm term, List<AtlasRelatedObjectId> entities) throws AtlasBaseException {
if (term != null && CollectionUtils.isNotEmpty(entities)) {
List<EntityAuditEventV2> events = new ArrayList<>();
for (AtlasRelatedObjectId relatedObjectId : entities) {
AtlasEntityWithExtInfo entityWithExtInfo = instanceConverter.getAndCacheEntity(relatedObjectId.getGuid());
AtlasEntity entity = (entityWithExtInfo != null) ? entityWithExtInfo.getEntity() : null;
if (entity != null) {
events.add(createEvent(entity, TERM_ADD, "Added term: " + term.toAuditString()));
}
}
auditRepository.putEventsV2(events);
}
}
@Override
public void onTermDeleted(AtlasGlossaryTerm term, List<AtlasRelatedObjectId> entities) throws AtlasBaseException {
if (term != null && CollectionUtils.isNotEmpty(entities)) {
List<EntityAuditEventV2> events = new ArrayList<>();
for (AtlasRelatedObjectId relatedObjectId : entities) {
AtlasEntityWithExtInfo entityWithExtInfo = instanceConverter.getAndCacheEntity(relatedObjectId.getGuid());
AtlasEntity entity = (entityWithExtInfo != null) ? entityWithExtInfo.getEntity() : null;
if (entity != null) {
events.add(createEvent(entity, TERM_DELETE, "Deleted term: " + term.toAuditString()));
}
}
auditRepository.putEventsV2(events);
}
}
private EntityAuditEventV2 createEvent(AtlasEntity entity, EntityAuditActionV2 action, String details) { private EntityAuditEventV2 createEvent(AtlasEntity entity, EntityAuditActionV2 action, String details) {
return new EntityAuditEventV2(entity.getGuid(), RequestContextV1.get().getRequestTime(), return new EntityAuditEventV2(entity.getGuid(), RequestContextV1.get().getRequestTime(),
RequestContextV1.get().getUser(), action, details, entity); RequestContextV1.get().getUser(), action, details, entity);
...@@ -310,6 +353,12 @@ public class EntityAuditListenerV2 implements EntityChangeListenerV2 { ...@@ -310,6 +353,12 @@ public class EntityAuditListenerV2 implements EntityChangeListenerV2 {
case ENTITY_IMPORT_DELETE: case ENTITY_IMPORT_DELETE:
ret = "Deleted by import: "; ret = "Deleted by import: ";
break; break;
case TERM_ADD:
ret = "Added term: ";
break;
case TERM_DELETE:
ret = "Deleted term: ";
break;
default: default:
ret = "Unknown: "; ret = "Unknown: ";
} }
...@@ -348,6 +397,12 @@ public class EntityAuditListenerV2 implements EntityChangeListenerV2 { ...@@ -348,6 +397,12 @@ public class EntityAuditListenerV2 implements EntityChangeListenerV2 {
case ENTITY_IMPORT_DELETE: case ENTITY_IMPORT_DELETE:
ret = "Deleted by import: "; ret = "Deleted by import: ";
break; break;
case TERM_ADD:
ret = "Added term: ";
break;
case TERM_DELETE:
ret = "Deleted term: ";
break;
default: default:
ret = "Unknown: "; ret = "Unknown: ";
} }
......
...@@ -70,6 +70,8 @@ import java.util.List; ...@@ -70,6 +70,8 @@ import java.util.List;
import static org.apache.atlas.EntityAuditEvent.EntityAuditAction.TAG_ADD; import static org.apache.atlas.EntityAuditEvent.EntityAuditAction.TAG_ADD;
import static org.apache.atlas.EntityAuditEvent.EntityAuditAction.TAG_DELETE; import static org.apache.atlas.EntityAuditEvent.EntityAuditAction.TAG_DELETE;
import static org.apache.atlas.EntityAuditEvent.EntityAuditAction.TAG_UPDATE; import static org.apache.atlas.EntityAuditEvent.EntityAuditAction.TAG_UPDATE;
import static org.apache.atlas.EntityAuditEvent.EntityAuditAction.TERM_ADD;
import static org.apache.atlas.EntityAuditEvent.EntityAuditAction.TERM_DELETE;
import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditType; import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditType;
import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditType.ENTITY_AUDIT_V1; import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditType.ENTITY_AUDIT_V1;
import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditType.ENTITY_AUDIT_V2; import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditType.ENTITY_AUDIT_V2;
...@@ -319,19 +321,25 @@ public class HBaseBasedAuditRepository extends AbstractStorageBasedAuditReposito ...@@ -319,19 +321,25 @@ public class HBaseBasedAuditRepository extends AbstractStorageBasedAuditReposito
if (StringUtils.isNotEmpty(v1DetailsWithPrefix)) { if (StringUtils.isNotEmpty(v1DetailsWithPrefix)) {
EntityAuditAction v1AuditAction = EntityAuditAction.fromString(getResultString(result, COLUMN_ACTION)); EntityAuditAction v1AuditAction = EntityAuditAction.fromString(getResultString(result, COLUMN_ACTION));
String v1AuditPrefix = EntityAuditListener.getV1AuditPrefix(v1AuditAction);
String[] split = v1DetailsWithPrefix.split(v1AuditPrefix);
if (ArrayUtils.isNotEmpty(split) && split.length == 2) { if (v1AuditAction == TERM_ADD || v1AuditAction == TERM_DELETE) {
String v1AuditDetails = split[1]; // for terms audit v1 and v2 structure is same
Referenceable referenceable = AtlasType.fromV1Json(v1AuditDetails, Referenceable.class); ret = v1DetailsWithPrefix;
String v2Json = (referenceable != null) ? toV2Json(referenceable, v1AuditAction) : v1AuditDetails; } else {
String v1AuditPrefix = EntityAuditListener.getV1AuditPrefix(v1AuditAction);
String[] split = v1DetailsWithPrefix.split(v1AuditPrefix);
if (ArrayUtils.isNotEmpty(split) && split.length == 2) {
String v1AuditDetails = split[1];
Referenceable referenceable = AtlasType.fromV1Json(v1AuditDetails, Referenceable.class);
String v2Json = (referenceable != null) ? toV2Json(referenceable, v1AuditAction) : v1AuditDetails;
if (v2Json != null) { if (v2Json != null) {
ret = getV2AuditPrefix(v1AuditAction) + v2Json; ret = getV2AuditPrefix(v1AuditAction) + v2Json;
}
} else {
ret = v1DetailsWithPrefix;
} }
} else {
ret = v1DetailsWithPrefix;
} }
} }
......
...@@ -25,11 +25,13 @@ import org.apache.atlas.exception.AtlasBaseException; ...@@ -25,11 +25,13 @@ import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.listener.EntityChangeListener; import org.apache.atlas.listener.EntityChangeListener;
import org.apache.atlas.listener.EntityChangeListenerV2; import org.apache.atlas.listener.EntityChangeListenerV2;
import org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2; import org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2;
import org.apache.atlas.model.glossary.AtlasGlossaryTerm;
import org.apache.atlas.model.instance.AtlasClassification; import org.apache.atlas.model.instance.AtlasClassification;
import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo; import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.instance.AtlasEntityHeader; import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.AtlasRelatedObjectId;
import org.apache.atlas.model.instance.EntityMutationResponse; import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.model.instance.EntityMutations.EntityOperation; import org.apache.atlas.model.instance.EntityMutations.EntityOperation;
import org.apache.atlas.type.AtlasEntityType; import org.apache.atlas.type.AtlasEntityType;
...@@ -192,6 +194,44 @@ public class AtlasEntityChangeNotifier { ...@@ -192,6 +194,44 @@ public class AtlasEntityChangeNotifier {
} }
} }
public void onTermAddedToEntities(AtlasGlossaryTerm term, List<AtlasRelatedObjectId> entityIds) throws AtlasBaseException {
// listeners notified on term-entity association only if v2 notifications are enabled
if (isV2EntityNotificationEnabled()) {
for (EntityChangeListenerV2 listener : entityChangeListenersV2) {
listener.onTermAdded(term, entityIds);
}
} else {
List<Referenceable> entityRefs = toReferenceables(entityIds);
for (EntityChangeListener listener : entityChangeListeners) {
try {
listener.onTermAdded(entityRefs, term);
} catch (AtlasException e) {
throw new AtlasBaseException(AtlasErrorCode.NOTIFICATION_FAILED, e, getListenerName(listener), "TermAdd");
}
}
}
}
public void onTermDeletedFromEntities(AtlasGlossaryTerm term, List<AtlasRelatedObjectId> entityIds) throws AtlasBaseException {
// listeners notified on term-entity disassociation only if v2 notifications are enabled
if (isV2EntityNotificationEnabled()) {
for (EntityChangeListenerV2 listener : entityChangeListenersV2) {
listener.onTermDeleted(term, entityIds);
}
} else {
List<Referenceable> entityRefs = toReferenceables(entityIds);
for (EntityChangeListener listener : entityChangeListeners) {
try {
listener.onTermDeleted(entityRefs, term);
} catch (AtlasException e) {
throw new AtlasBaseException(AtlasErrorCode.NOTIFICATION_FAILED, e, getListenerName(listener), "TermDelete");
}
}
}
}
public void notifyPropagatedEntities() throws AtlasBaseException { public void notifyPropagatedEntities() throws AtlasBaseException {
RequestContextV1 context = RequestContextV1.get(); RequestContextV1 context = RequestContextV1.get();
Map<String, List<AtlasClassification>> addedPropagations = context.getAddedPropagations(); Map<String, List<AtlasClassification>> addedPropagations = context.getAddedPropagations();
...@@ -297,6 +337,20 @@ public class AtlasEntityChangeNotifier { ...@@ -297,6 +337,20 @@ public class AtlasEntityChangeNotifier {
return ret; return ret;
} }
private List<Referenceable> toReferenceables(List<AtlasRelatedObjectId> entityIds) throws AtlasBaseException {
List<Referenceable> ret = new ArrayList<>();
if (CollectionUtils.isNotEmpty(entityIds)) {
for (AtlasRelatedObjectId relatedObjectId : entityIds) {
String entityGuid = relatedObjectId.getGuid();
ret.add(toReferenceable(entityGuid));
}
}
return ret;
}
private Referenceable toReferenceable(String entityId) throws AtlasBaseException { private Referenceable toReferenceable(String entityId) throws AtlasBaseException {
Referenceable ret = null; Referenceable ret = null;
......
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
package org.apache.atlas.listener; package org.apache.atlas.listener;
import org.apache.atlas.AtlasException; import org.apache.atlas.AtlasException;
import org.apache.atlas.model.glossary.AtlasGlossaryTerm;
import org.apache.atlas.v1.model.instance.Referenceable; import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.atlas.v1.model.instance.Struct; import org.apache.atlas.v1.model.instance.Struct;
...@@ -86,4 +87,22 @@ public interface EntityChangeListener { ...@@ -86,4 +87,22 @@ public interface EntityChangeListener {
* @throws AtlasException * @throws AtlasException
*/ */
void onEntitiesDeleted(Collection<Referenceable> entities, boolean isImport) throws AtlasException; void onEntitiesDeleted(Collection<Referenceable> entities, boolean isImport) throws AtlasException;
/**
* This is upon adding a new term to a list of typed instance.
*
* @param entities entity list
* @param term term that needs to be added to entity
* @throws AtlasException if the listener notification fails
*/
void onTermAdded(Collection<Referenceable> entities, AtlasGlossaryTerm term) throws AtlasException;
/**
* This is upon adding a new trait to a typed instance.
*
* @param entities entity list
* @param term term that needs to be added to entity
* @throws AtlasException if the listener notification fails
*/
void onTermDeleted(Collection<Referenceable> entities, AtlasGlossaryTerm term) throws AtlasException;
} }
...@@ -20,6 +20,7 @@ package org.apache.atlas.migration; ...@@ -20,6 +20,7 @@ package org.apache.atlas.migration;
import org.apache.atlas.AtlasException; import org.apache.atlas.AtlasException;
import org.apache.atlas.listener.EntityChangeListener; import org.apache.atlas.listener.EntityChangeListener;
import org.apache.atlas.model.glossary.AtlasGlossaryTerm;
import org.apache.atlas.v1.model.instance.Referenceable; import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.atlas.v1.model.instance.Struct; import org.apache.atlas.v1.model.instance.Struct;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
...@@ -57,4 +58,14 @@ public class NoOpNotificationChangeListener implements EntityChangeListener { ...@@ -57,4 +58,14 @@ public class NoOpNotificationChangeListener implements EntityChangeListener {
public void onEntitiesDeleted(Collection<Referenceable> entities, boolean isImport) throws AtlasException { public void onEntitiesDeleted(Collection<Referenceable> entities, boolean isImport) throws AtlasException {
} }
@Override
public void onTermAdded(Collection<Referenceable> entities, AtlasGlossaryTerm term) throws AtlasException {
}
@Override
public void onTermDeleted(Collection<Referenceable> entities, AtlasGlossaryTerm term) throws AtlasException {
}
} }
...@@ -21,9 +21,11 @@ import org.apache.atlas.AtlasErrorCode; ...@@ -21,9 +21,11 @@ import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.RequestContextV1; import org.apache.atlas.RequestContextV1;
import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.listener.EntityChangeListenerV2; import org.apache.atlas.listener.EntityChangeListenerV2;
import org.apache.atlas.model.glossary.AtlasGlossaryTerm;
import org.apache.atlas.model.instance.AtlasClassification; import org.apache.atlas.model.instance.AtlasClassification;
import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntityHeader; import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.AtlasRelatedObjectId;
import org.apache.atlas.model.notification.EntityNotification.EntityNotificationV2; import org.apache.atlas.model.notification.EntityNotification.EntityNotificationV2;
import org.apache.atlas.model.notification.EntityNotification.EntityNotificationV2.OperationType; import org.apache.atlas.model.notification.EntityNotification.EntityNotificationV2.OperationType;
import org.apache.atlas.type.AtlasClassificationType; import org.apache.atlas.type.AtlasClassificationType;
...@@ -102,6 +104,16 @@ public class EntityNotificationListenerV2 implements EntityChangeListenerV2 { ...@@ -102,6 +104,16 @@ public class EntityNotificationListenerV2 implements EntityChangeListenerV2 {
notifyEntityEvents(Collections.singletonList(entity), CLASSIFICATION_DELETE); notifyEntityEvents(Collections.singletonList(entity), CLASSIFICATION_DELETE);
} }
@Override
public void onTermAdded(AtlasGlossaryTerm term, List<AtlasRelatedObjectId> entities) {
// do nothing -> notification not sent out for term assignment to entities
}
@Override
public void onTermDeleted(AtlasGlossaryTerm term, List<AtlasRelatedObjectId> entities) {
// do nothing -> notification not sent out for term removal from entities
}
private void notifyEntityEvents(List<AtlasEntity> entities, OperationType operationType) throws AtlasBaseException { private void notifyEntityEvents(List<AtlasEntity> entities, OperationType operationType) throws AtlasBaseException {
List<EntityNotificationV2> messages = new ArrayList<>(); List<EntityNotificationV2> messages = new ArrayList<>();
......
...@@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting; ...@@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting;
import org.apache.atlas.ApplicationProperties; import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasException; import org.apache.atlas.AtlasException;
import org.apache.atlas.listener.EntityChangeListener; import org.apache.atlas.listener.EntityChangeListener;
import org.apache.atlas.model.glossary.AtlasGlossaryTerm;
import org.apache.atlas.notification.NotificationInterface.NotificationType; import org.apache.atlas.notification.NotificationInterface.NotificationType;
import org.apache.atlas.v1.model.instance.Referenceable; import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.atlas.v1.model.instance.Struct; import org.apache.atlas.v1.model.instance.Struct;
...@@ -99,6 +100,16 @@ public class NotificationEntityChangeListener implements EntityChangeListener { ...@@ -99,6 +100,16 @@ public class NotificationEntityChangeListener implements EntityChangeListener {
notifyOfEntityEvent(entities, OperationType.ENTITY_DELETE); notifyOfEntityEvent(entities, OperationType.ENTITY_DELETE);
} }
@Override
public void onTermAdded(Collection<Referenceable> entities, AtlasGlossaryTerm term) throws AtlasException {
// do nothing
}
@Override
public void onTermDeleted(Collection<Referenceable> entities, AtlasGlossaryTerm term) throws AtlasException {
// do nothing
}
// ----- helper methods ------------------------------------------------- // ----- helper methods -------------------------------------------------
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment