Commit 0feb60a2 by Sarath Subramanian Committed by Madhan Neethiraj

ATLAS 1607: notify listeners on classification addition/deletion

parent 48c10133
......@@ -77,18 +77,26 @@ public class EntityAuditListener implements EntityChangeListener {
}
@Override
public void onTraitAdded(ITypedReferenceableInstance entity, IStruct trait) throws AtlasException {
EntityAuditEvent event = createEvent(entity, EntityAuditAction.TAG_ADD,
"Added trait: " + InstanceSerialization.toJson(trait, true));
public void onTraitsAdded(ITypedReferenceableInstance entity, Collection<? extends IStruct> traits) throws AtlasException {
if (traits != null) {
for (IStruct trait : traits) {
EntityAuditEvent event = createEvent(entity, EntityAuditAction.TAG_ADD,
"Added trait: " + InstanceSerialization.toJson(trait, true));
auditRepository.putEvents(event);
auditRepository.putEvents(event);
}
}
}
@Override
public void onTraitDeleted(ITypedReferenceableInstance entity, String traitName) throws AtlasException {
EntityAuditEvent event = createEvent(entity, EntityAuditAction.TAG_DELETE, "Deleted trait: " + traitName);
public void onTraitsDeleted(ITypedReferenceableInstance entity, Collection<String> traitNames) throws AtlasException {
if (traitNames != null) {
for (String traitName : traitNames) {
EntityAuditEvent event = createEvent(entity, EntityAuditAction.TAG_DELETE, "Deleted trait: " + traitName);
auditRepository.putEvents(event);
auditRepository.putEvents(event);
}
}
}
@Override
......
......@@ -24,6 +24,7 @@ import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.AtlasException;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.listener.EntityChangeListener;
import org.apache.atlas.model.instance.AtlasClassification;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.model.instance.EntityMutations.EntityOperation;
......@@ -32,8 +33,10 @@ import org.apache.atlas.repository.converters.AtlasInstanceConverter;
import org.apache.atlas.repository.graph.*;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.typesystem.ITypedReferenceableInstance;
import org.apache.atlas.typesystem.ITypedStruct;
import org.apache.atlas.util.AtlasRepositoryConfiguration;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -105,6 +108,39 @@ public class AtlasEntityChangeNotifier {
}
}
public void onClassificationAddedToEntity(String entityId, List<AtlasClassification> classifications) throws AtlasBaseException {
ITypedReferenceableInstance entity = toITypedReferenceable(entityId);
List<ITypedStruct> traits = toITypedStructs(classifications);
if (entity == null || CollectionUtils.isEmpty(traits)) {
return;
}
for (EntityChangeListener listener : entityChangeListeners) {
try {
listener.onTraitsAdded(entity, traits);
} catch (AtlasException e) {
throw new AtlasBaseException(AtlasErrorCode.NOTIFICATION_FAILED, e);
}
}
}
public void onClassificationDeletedFromEntity(String entityId, List<String> traitNames) throws AtlasBaseException {
ITypedReferenceableInstance entity = toITypedReferenceable(entityId);
if (entity == null || CollectionUtils.isEmpty(traitNames)) {
return;
}
for (EntityChangeListener listener : entityChangeListeners) {
try {
listener.onTraitsDeleted(entity, traitNames);
} catch (AtlasException e) {
throw new AtlasBaseException(AtlasErrorCode.NOTIFICATION_FAILED, e);
}
}
}
private void notifyListeners(List<ITypedReferenceableInstance> typedRefInsts, EntityOperation operation) throws AtlasBaseException {
for (EntityChangeListener listener : entityChangeListeners) {
try {
......@@ -136,6 +172,32 @@ public class AtlasEntityChangeNotifier {
return ret;
}
private ITypedReferenceableInstance toITypedReferenceable(String entityId) throws AtlasBaseException {
ITypedReferenceableInstance ret = null;
if (StringUtils.isNotEmpty(entityId)) {
ret = instanceConverter.getITypedReferenceable(entityId);
}
return ret;
}
private List<ITypedStruct> toITypedStructs(List<AtlasClassification> classifications) throws AtlasBaseException {
List<ITypedStruct> ret = null;
if (classifications != null) {
ret = new ArrayList<>(classifications.size());
for (AtlasClassification classification : classifications) {
if (classification != null) {
ret.add(instanceConverter.getTrait(classification));
}
}
}
return ret;
}
private void doFullTextMapping(List<AtlasEntityHeader> atlasEntityHeaders) {
try {
if(!AtlasRepositoryConfiguration.isFullTextSearchEnabled()) {
......
......@@ -430,6 +430,8 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
EntityGraphMapper graphMapper = new EntityGraphMapper(deleteHandler, typeRegistry);
graphMapper.addClassifications(new EntityMutationContext(), guid, classifications);
// notify listeners on classification addition
entityChangeNotifier.onClassificationAddedToEntity(guid, classifications);
}
@Override
......@@ -448,8 +450,13 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
EntityGraphMapper graphMapper = new EntityGraphMapper(deleteHandler, typeRegistry);
List<AtlasClassification> classifications = Collections.singletonList(classification);
for (String guid : guids) {
graphMapper.addClassifications(new EntityMutationContext(), guid, Collections.singletonList(classification));
graphMapper.addClassifications(new EntityMutationContext(), guid, classifications);
// notify listeners on classification addition
entityChangeNotifier.onClassificationAddedToEntity(guid, classifications);
}
}
......@@ -470,6 +477,9 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
EntityGraphMapper entityGraphMapper = new EntityGraphMapper(deleteHandler, typeRegistry);
entityGraphMapper.deleteClassifications(guid, classificationNames);
// notify listeners on classification deletion
entityChangeNotifier.onClassificationDeletedFromEntity(guid, classificationNames);
}
@Override
......
......@@ -72,11 +72,7 @@ import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.*;
import javax.inject.Inject;
import javax.inject.Singleton;
......@@ -724,14 +720,18 @@ public class DefaultMetadataService implements MetadataService, ActiveStateChang
}
private void onTraitAddedToEntity(ITypedReferenceableInstance entity, IStruct trait) throws AtlasException {
Collection<IStruct> traits = Collections.singletonList(trait);
for (EntityChangeListener listener : entityChangeListeners) {
listener.onTraitAdded(entity, trait);
listener.onTraitsAdded(entity, traits);
}
}
private void onTraitDeletedFromEntity(ITypedReferenceableInstance entity, String traitName) throws AtlasException {
Collection<String> traitNames = Collections.singletonList(traitName);
for (EntityChangeListener listener : entityChangeListeners) {
listener.onTraitDeleted(entity, traitName);
listener.onTraitsDeleted(entity, traitNames);
}
}
......
......@@ -1258,12 +1258,12 @@ public class DefaultMetadataServiceTest {
}
@Override
public void onTraitAdded(ITypedReferenceableInstance entity, IStruct trait)
public void onTraitsAdded(ITypedReferenceableInstance entity, Collection<? extends IStruct> traits)
throws AtlasException {
}
@Override
public void onTraitDeleted(ITypedReferenceableInstance entity, String traitName)
public void onTraitsDeleted(ITypedReferenceableInstance entity, Collection<String> traitNames)
throws AtlasException {
}
......
......@@ -52,21 +52,21 @@ public interface EntityChangeListener {
* This is upon adding a new trait to a typed instance.
*
* @param entity the entity
* @param trait trait that needs to be added to entity
* @param traits trait that needs to be added to entity
*
* @throws AtlasException if the listener notification fails
*/
void onTraitAdded(ITypedReferenceableInstance entity, IStruct trait) throws AtlasException;
void onTraitsAdded(ITypedReferenceableInstance entity, Collection<? extends IStruct> traits) throws AtlasException;
/**
* This is upon deleting a trait from a typed instance.
*
* @param entity the entity
* @param traitName trait name for the instance that needs to be deleted from entity
* @param traitNames trait name for the instance that needs to be deleted from entity
*
* @throws AtlasException if the listener notification fails
*/
void onTraitDeleted(ITypedReferenceableInstance entity, String traitName) throws AtlasException;
void onTraitsDeleted(ITypedReferenceableInstance entity, Collection<String> traitNames) throws AtlasException;
/**
* This is upon deleting entities from the repository.
......
......@@ -87,12 +87,12 @@ public class NotificationEntityChangeListener implements EntityChangeListener {
}
@Override
public void onTraitAdded(ITypedReferenceableInstance entity, IStruct trait) throws AtlasException {
public void onTraitsAdded(ITypedReferenceableInstance entity, Collection<? extends IStruct> traits) throws AtlasException {
notifyOfEntityEvent(Collections.singleton(entity), EntityNotification.OperationType.TRAIT_ADD);
}
@Override
public void onTraitDeleted(ITypedReferenceableInstance entity, String traitName) throws AtlasException {
public void onTraitsDeleted(ITypedReferenceableInstance entity, Collection<String> traitNames) throws AtlasException {
notifyOfEntityEvent(Collections.singleton(entity), EntityNotification.OperationType.TRAIT_DELETE);
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment