Commit 80135a8d by Mandar Ambawane Committed by Madhan Neethiraj

ATLAS-3689: added entity-audit entries on business attributes add/update/delete to an entity

parent c135962b
...@@ -39,8 +39,7 @@ define(['require'], function(require) { ...@@ -39,8 +39,7 @@ define(['require'], function(require) {
LABEL_ADD: "Label(s) Added", LABEL_ADD: "Label(s) Added",
LABEL_DELETE: "Label(s) Deleted", LABEL_DELETE: "Label(s) Deleted",
ENTITY_PURGE: "Entity Purged", ENTITY_PURGE: "Entity Purged",
BUSINESS_ATTRIBUTE_ADD: "Business Attribute(s) Added", BUSINESS_ATTRIBUTE_UPDATE: "Business Attribute(s) Updated"
BUSINESS_ATTRIBUTE_DELETE: "Business Attribute(s) Deleted"
} }
Enums.entityStateReadOnly = { Enums.entityStateReadOnly = {
...@@ -214,4 +213,4 @@ define(['require'], function(require) { ...@@ -214,4 +213,4 @@ define(['require'], function(require) {
1: "true" 1: "true"
}; };
return Enums; return Enums;
}); });
\ No newline at end of file
...@@ -124,7 +124,11 @@ define(['require', ...@@ -124,7 +124,11 @@ define(['require',
relationshipAttributes = parseDetailsObject.relationshipAttributes; relationshipAttributes = parseDetailsObject.relationshipAttributes;
if (attributesDetails) { if (attributesDetails) {
that.ui.attributeDetails.removeClass('hide'); that.ui.attributeDetails.removeClass('hide');
that.action.indexOf("Classification") === -1 ? that.ui.panelAttrHeading.html("Technical properties ") : that.ui.panelAttrHeading.html("Properties "); if (that.action.includes("Classification") || that.action.includes("Business Attribute") != -1) {
that.ui.panelAttrHeading.html("Properties ");
} else {
that.ui.panelAttrHeading.html("Technical properties ");
}
var attrTable = that.createTableWithValues(attributesDetails); var attrTable = that.createTableWithValues(attributesDetails);
that.ui.attributeCard.html( that.ui.attributeCard.html(
attrTable); attrTable);
...@@ -174,4 +178,4 @@ define(['require', ...@@ -174,4 +178,4 @@ define(['require',
} }
}); });
return CreateAuditTableLayoutView; return CreateAuditTableLayoutView;
}); });
\ No newline at end of file
...@@ -26,6 +26,7 @@ import org.apache.atlas.model.instance.AtlasRelatedObjectId; ...@@ -26,6 +26,7 @@ import org.apache.atlas.model.instance.AtlasRelatedObjectId;
import org.apache.atlas.model.instance.AtlasRelationship; import org.apache.atlas.model.instance.AtlasRelationship;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Set; import java.util.Set;
/** /**
...@@ -173,4 +174,12 @@ public interface EntityChangeListenerV2 { ...@@ -173,4 +174,12 @@ public interface EntityChangeListenerV2 {
* @throws AtlasBaseException if the listener notification fails * @throws AtlasBaseException if the listener notification fails
*/ */
void onLabelsDeleted(AtlasEntity entity, Set<String> labels) throws AtlasBaseException; void onLabelsDeleted(AtlasEntity entity, Set<String> labels) throws AtlasBaseException;
/**
*
* @param entity the entity
* @param updatedBusinessAttributes business metadata attribute
* @throws AtlasBaseException if the listener notification fails
*/
void onBusinessAttributesUpdated(AtlasEntity entity, Map<String, Map<String, Object>> updatedBusinessAttributes) throws AtlasBaseException;
} }
\ No newline at end of file
...@@ -50,7 +50,8 @@ public class EntityAuditEventV2 implements Serializable { ...@@ -50,7 +50,8 @@ public class EntityAuditEventV2 implements Serializable {
ENTITY_IMPORT_CREATE, ENTITY_IMPORT_UPDATE, ENTITY_IMPORT_DELETE, ENTITY_IMPORT_CREATE, ENTITY_IMPORT_UPDATE, ENTITY_IMPORT_DELETE,
CLASSIFICATION_ADD, CLASSIFICATION_DELETE, CLASSIFICATION_UPDATE, CLASSIFICATION_ADD, CLASSIFICATION_DELETE, CLASSIFICATION_UPDATE,
PROPAGATED_CLASSIFICATION_ADD, PROPAGATED_CLASSIFICATION_DELETE, PROPAGATED_CLASSIFICATION_UPDATE, PROPAGATED_CLASSIFICATION_ADD, PROPAGATED_CLASSIFICATION_DELETE, PROPAGATED_CLASSIFICATION_UPDATE,
TERM_ADD, TERM_DELETE, LABEL_ADD, LABEL_DELETE, ENTITY_PURGE; TERM_ADD, TERM_DELETE, LABEL_ADD, LABEL_DELETE, ENTITY_PURGE,
BUSINESS_ATTRIBUTE_UPDATE;
public static EntityAuditActionV2 fromString(String strValue) { public static EntityAuditActionV2 fromString(String strValue) {
switch (strValue) { switch (strValue) {
...@@ -91,6 +92,8 @@ public class EntityAuditEventV2 implements Serializable { ...@@ -91,6 +92,8 @@ public class EntityAuditEventV2 implements Serializable {
return LABEL_ADD; return LABEL_ADD;
case "LABEL_DELETE": case "LABEL_DELETE":
return LABEL_DELETE; return LABEL_DELETE;
case "BUSINESS_ATTRIBUTE_UPDATE":
return BUSINESS_ATTRIBUTE_UPDATE;
} }
throw new IllegalArgumentException("No enum constant " + EntityAuditActionV2.class.getCanonicalName() + "." + strValue); throw new IllegalArgumentException("No enum constant " + EntityAuditActionV2.class.getCanonicalName() + "." + strValue);
......
...@@ -28,11 +28,13 @@ import org.apache.atlas.model.instance.AtlasClassification; ...@@ -28,11 +28,13 @@ import org.apache.atlas.model.instance.AtlasClassification;
import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasRelatedObjectId; import org.apache.atlas.model.instance.AtlasRelatedObjectId;
import org.apache.atlas.model.instance.AtlasRelationship; import org.apache.atlas.model.instance.AtlasRelationship;
import org.apache.atlas.model.instance.AtlasStruct;
import org.apache.atlas.repository.converters.AtlasInstanceConverter; import org.apache.atlas.repository.converters.AtlasInstanceConverter;
import org.apache.atlas.type.AtlasEntityType; import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasStructType.AtlasAttribute; import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
import org.apache.atlas.type.AtlasType; import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.utils.AtlasJson;
import org.apache.atlas.utils.AtlasPerfMetrics.MetricRecorder; import org.apache.atlas.utils.AtlasPerfMetrics.MetricRecorder;
import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils; import org.apache.commons.collections.MapUtils;
...@@ -50,6 +52,7 @@ import java.util.List; ...@@ -50,6 +52,7 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.BUSINESS_ATTRIBUTE_UPDATE;
import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.CLASSIFICATION_ADD; import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.CLASSIFICATION_ADD;
import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.CLASSIFICATION_DELETE; import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.CLASSIFICATION_DELETE;
import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.CLASSIFICATION_UPDATE; import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.CLASSIFICATION_UPDATE;
...@@ -341,6 +344,57 @@ public class EntityAuditListenerV2 implements EntityChangeListenerV2 { ...@@ -341,6 +344,57 @@ public class EntityAuditListenerV2 implements EntityChangeListenerV2 {
} }
} }
@Override
public void onRelationshipsAdded(List<AtlasRelationship> relationships, boolean isImport) throws AtlasBaseException {
if (LOG.isDebugEnabled()) {
LOG.debug("New relationship(s) added to repository(" + relationships.size() + ")");
}
}
@Override
public void onRelationshipsUpdated(List<AtlasRelationship> relationships, boolean isImport) throws AtlasBaseException {
if (LOG.isDebugEnabled()) {
LOG.debug("Relationship(s) updated(" + relationships.size() + ")");
}
}
@Override
public void onRelationshipsDeleted(List<AtlasRelationship> relationships, boolean isImport) throws AtlasBaseException {
if (LOG.isDebugEnabled()) {
LOG.debug("Relationship(s) deleted from repository(" + relationships.size() + ")");
}
}
@Override
public void onRelationshipsPurged(List<AtlasRelationship> relationships) throws AtlasBaseException {
if (LOG.isDebugEnabled()) {
LOG.debug("Relationship(s) purged from repository(" + relationships.size() + ")");
}
}
@Override
public void onBusinessAttributesUpdated(AtlasEntity entity, Map<String, Map<String, Object>> updatedBusinessAttributes) throws AtlasBaseException {
if (MapUtils.isNotEmpty(updatedBusinessAttributes)) {
MetricRecorder metric = RequestContext.get().startMetricRecord("entityAudit");
List<EntityAuditEventV2> auditEvents = new ArrayList<>();
for (Map.Entry<String, Map<String, Object>> entry : updatedBusinessAttributes.entrySet()) {
String bmName = entry.getKey();
Map<String, Object> attributes = entry.getValue();
String details = AtlasJson.toJson(new AtlasStruct(bmName, attributes));
EntityAuditEventV2 auditEvent = createEvent(entity, BUSINESS_ATTRIBUTE_UPDATE, "Updated business attributes: " + details);
auditEvents.add(auditEvent);
}
auditRepository.putEventsV2(auditEvents);
RequestContext.get().endMetricRecord(metric);
}
}
private EntityAuditEventV2 createEvent(AtlasEntity entity, EntityAuditActionV2 action, String details) { private EntityAuditEventV2 createEvent(AtlasEntity entity, EntityAuditActionV2 action, String details) {
return new EntityAuditEventV2(entity.getGuid(), RequestContext.get().getRequestTime(), return new EntityAuditEventV2(entity.getGuid(), RequestContext.get().getRequestTime(),
RequestContext.get().getUser(), action, details, entity); RequestContext.get().getUser(), action, details, entity);
...@@ -566,32 +620,4 @@ public class EntityAuditListenerV2 implements EntityChangeListenerV2 { ...@@ -566,32 +620,4 @@ public class EntityAuditListenerV2 implements EntityChangeListenerV2 {
return ret; return ret;
} }
@Override
public void onRelationshipsAdded(List<AtlasRelationship> relationships, boolean isImport) throws AtlasBaseException {
if (LOG.isDebugEnabled()) {
LOG.debug("New relationship(s) added to repository(" + relationships.size() + ")");
}
}
@Override
public void onRelationshipsUpdated(List<AtlasRelationship> relationships, boolean isImport) throws AtlasBaseException {
if (LOG.isDebugEnabled()) {
LOG.debug("Relationship(s) updated(" + relationships.size() + ")");
}
}
@Override
public void onRelationshipsDeleted(List<AtlasRelationship> relationships, boolean isImport) throws AtlasBaseException {
if (LOG.isDebugEnabled()) {
LOG.debug("Relationship(s) deleted from repository(" + relationships.size() + ")");
}
}
@Override
public void onRelationshipsPurged(List<AtlasRelationship> relationships) throws AtlasBaseException {
if (LOG.isDebugEnabled()) {
LOG.debug("Relationship(s) purged from repository(" + relationships.size() + ")");
}
}
} }
...@@ -359,6 +359,18 @@ public class AtlasEntityChangeNotifier implements IAtlasEntityChangeNotifier { ...@@ -359,6 +359,18 @@ public class AtlasEntityChangeNotifier implements IAtlasEntityChangeNotifier {
notifyPropagatedEntities(removedPropagations, PROPAGATED_CLASSIFICATION_DELETE); notifyPropagatedEntities(removedPropagations, PROPAGATED_CLASSIFICATION_DELETE);
} }
@Override
public void onBusinessAttributesUpdated(String entityGuid, Map<String, Map<String, Object>> updatedBusinessAttributes) throws AtlasBaseException{
if (isV2EntityNotificationEnabled) {
AtlasEntity entity = instanceConverter.getAndCacheEntity(entityGuid);
for (EntityChangeListenerV2 listener : entityChangeListenersV2) {
listener.onBusinessAttributesUpdated(entity, updatedBusinessAttributes);
}
}
}
private void notifyPropagatedEntities(Map<String, List<AtlasClassification>> entityPropagationMap, EntityAuditActionV2 action) throws AtlasBaseException { private void notifyPropagatedEntities(Map<String, List<AtlasClassification>> entityPropagationMap, EntityAuditActionV2 action) throws AtlasBaseException {
if (MapUtils.isEmpty(entityPropagationMap) || action == null) { if (MapUtils.isEmpty(entityPropagationMap) || action == null) {
return; return;
...@@ -763,5 +775,4 @@ public class AtlasEntityChangeNotifier implements IAtlasEntityChangeNotifier { ...@@ -763,5 +775,4 @@ public class AtlasEntityChangeNotifier implements IAtlasEntityChangeNotifier {
} }
} }
} }
} }
...@@ -399,24 +399,6 @@ public class EntityGraphMapper { ...@@ -399,24 +399,6 @@ public class EntityGraphMapper {
} }
} }
private void updateLabels(AtlasVertex vertex, Set<String> labels) {
if (CollectionUtils.isNotEmpty(labels)) {
AtlasGraphUtilsV2.setEncodedProperty(vertex, LABELS_PROPERTY_KEY, getLabelString(labels));
} else {
vertex.removeProperty(LABELS_PROPERTY_KEY);
}
}
private String getLabelString(Collection<String> labels) {
String ret = null;
if (!labels.isEmpty()) {
ret = LABEL_NAME_DELIMITER + String.join(LABEL_NAME_DELIMITER, labels) + LABEL_NAME_DELIMITER;
}
return ret;
}
/* /*
* reset/overwrite business attributes of the entity with given values * reset/overwrite business attributes of the entity with given values
*/ */
...@@ -426,6 +408,7 @@ public class EntityGraphMapper { ...@@ -426,6 +408,7 @@ public class EntityGraphMapper {
} }
Map<String, Map<String, AtlasBusinessAttribute>> entityTypeBusinessAttributes = entityType.getBusinessAttributes(); Map<String, Map<String, AtlasBusinessAttribute>> entityTypeBusinessAttributes = entityType.getBusinessAttributes();
Map<String, Map<String, Object>> updatedBusinessAttributes = new HashMap<>();
for (Map.Entry<String, Map<String, AtlasBusinessAttribute>> entry : entityTypeBusinessAttributes.entrySet()) { for (Map.Entry<String, Map<String, AtlasBusinessAttribute>> entry : entityTypeBusinessAttributes.entrySet()) {
String bmName = entry.getKey(); String bmName = entry.getKey();
...@@ -444,6 +427,8 @@ public class EntityGraphMapper { ...@@ -444,6 +427,8 @@ public class EntityGraphMapper {
} }
mapAttribute(bmAttribute, bmAttrNewValue, entityVertex, CREATE, new EntityMutationContext()); mapAttribute(bmAttribute, bmAttrNewValue, entityVertex, CREATE, new EntityMutationContext());
addToUpdatedBusinessAttributes(updatedBusinessAttributes, bmAttribute, bmAttrNewValue);
} }
} else { } else {
if (bmAttrNewValue != null) { if (bmAttrNewValue != null) {
...@@ -453,6 +438,8 @@ public class EntityGraphMapper { ...@@ -453,6 +438,8 @@ public class EntityGraphMapper {
} }
mapAttribute(bmAttribute, bmAttrNewValue, entityVertex, UPDATE, new EntityMutationContext()); mapAttribute(bmAttribute, bmAttrNewValue, entityVertex, UPDATE, new EntityMutationContext());
addToUpdatedBusinessAttributes(updatedBusinessAttributes, bmAttribute, bmAttrNewValue);
} }
} else { } else {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
...@@ -460,11 +447,17 @@ public class EntityGraphMapper { ...@@ -460,11 +447,17 @@ public class EntityGraphMapper {
} }
entityVertex.removeProperty(bmAttribute.getVertexPropertyName()); entityVertex.removeProperty(bmAttribute.getVertexPropertyName());
addToUpdatedBusinessAttributes(updatedBusinessAttributes, bmAttribute, bmAttrNewValue);
} }
} }
} }
} }
if (MapUtils.isNotEmpty(updatedBusinessAttributes)) {
entityChangeNotifier.onBusinessAttributesUpdated(AtlasGraphUtilsV2.getIdFromVertex(entityVertex), updatedBusinessAttributes);
}
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("<== setBusinessAttributes(entityVertex={}, entityType={}, businessAttributes={}", entityVertex, entityType.getTypeName(), businessAttributes); LOG.debug("<== setBusinessAttributes(entityVertex={}, entityType={}, businessAttributes={}", entityVertex, entityType.getTypeName(), businessAttributes);
} }
...@@ -479,6 +472,7 @@ public class EntityGraphMapper { ...@@ -479,6 +472,7 @@ public class EntityGraphMapper {
} }
Map<String, Map<String, AtlasBusinessAttribute>> entityTypeBusinessAttributes = entityType.getBusinessAttributes(); Map<String, Map<String, AtlasBusinessAttribute>> entityTypeBusinessAttributes = entityType.getBusinessAttributes();
Map<String, Map<String, Object>> updatedBusinessAttributes = new HashMap<>();
if (MapUtils.isNotEmpty(entityTypeBusinessAttributes) && MapUtils.isNotEmpty(businessAttributes)) { if (MapUtils.isNotEmpty(entityTypeBusinessAttributes) && MapUtils.isNotEmpty(businessAttributes)) {
for (Map.Entry<String, Map<String, AtlasBusinessAttribute>> entry : entityTypeBusinessAttributes.entrySet()) { for (Map.Entry<String, Map<String, AtlasBusinessAttribute>> entry : entityTypeBusinessAttributes.entrySet()) {
...@@ -503,16 +497,24 @@ public class EntityGraphMapper { ...@@ -503,16 +497,24 @@ public class EntityGraphMapper {
if (existingValue == null) { if (existingValue == null) {
if (bmAttrValue != null) { if (bmAttrValue != null) {
mapAttribute(bmAttribute, bmAttrValue, entityVertex, CREATE, new EntityMutationContext()); mapAttribute(bmAttribute, bmAttrValue, entityVertex, CREATE, new EntityMutationContext());
addToUpdatedBusinessAttributes(updatedBusinessAttributes, bmAttribute, bmAttrValue);
} }
} else { } else {
if (!Objects.equals(existingValue, bmAttrValue)) { if (!Objects.equals(existingValue, bmAttrValue)) {
mapAttribute(bmAttribute, bmAttrValue, entityVertex, UPDATE, new EntityMutationContext()); mapAttribute(bmAttribute, bmAttrValue, entityVertex, UPDATE, new EntityMutationContext());
addToUpdatedBusinessAttributes(updatedBusinessAttributes, bmAttribute, bmAttrValue);
} }
} }
} }
} }
} }
if (MapUtils.isNotEmpty(updatedBusinessAttributes)) {
entityChangeNotifier.onBusinessAttributesUpdated(AtlasGraphUtilsV2.getIdFromVertex(entityVertex), updatedBusinessAttributes);
}
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("<== addOrUpdateBusinessAttributes(entityVertex={}, entityType={}, businessAttributes={}", entityVertex, entityType.getTypeName(), businessAttributes); LOG.debug("<== addOrUpdateBusinessAttributes(entityVertex={}, entityType={}, businessAttributes={}", entityVertex, entityType.getTypeName(), businessAttributes);
} }
...@@ -521,12 +523,13 @@ public class EntityGraphMapper { ...@@ -521,12 +523,13 @@ public class EntityGraphMapper {
/* /*
* remove the given business attributes from the entity * remove the given business attributes from the entity
*/ */
public void removeBusinessAttributes(AtlasVertex entityVertex, AtlasEntityType entityType, Map<String, Map<String, Object>> businessAttributes) { public void removeBusinessAttributes(AtlasVertex entityVertex, AtlasEntityType entityType, Map<String, Map<String, Object>> businessAttributes) throws AtlasBaseException {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("==> removeBusinessAttributes(entityVertex={}, entityType={}, businessAttributes={}", entityVertex, entityType.getTypeName(), businessAttributes); LOG.debug("==> removeBusinessAttributes(entityVertex={}, entityType={}, businessAttributes={}", entityVertex, entityType.getTypeName(), businessAttributes);
} }
Map<String, Map<String, AtlasBusinessAttribute>> entityTypeBusinessAttributes = entityType.getBusinessAttributes(); Map<String, Map<String, AtlasBusinessAttribute>> entityTypeBusinessAttributes = entityType.getBusinessAttributes();
Map<String, Map<String, Object>> updatedBusinessAttributes = new HashMap<>();
if (MapUtils.isNotEmpty(entityTypeBusinessAttributes) && MapUtils.isNotEmpty(businessAttributes)) { if (MapUtils.isNotEmpty(entityTypeBusinessAttributes) && MapUtils.isNotEmpty(businessAttributes)) {
for (Map.Entry<String, Map<String, AtlasBusinessAttribute>> entry : entityTypeBusinessAttributes.entrySet()) { for (Map.Entry<String, Map<String, AtlasBusinessAttribute>> entry : entityTypeBusinessAttributes.entrySet()) {
...@@ -539,16 +542,22 @@ public class EntityGraphMapper { ...@@ -539,16 +542,22 @@ public class EntityGraphMapper {
Map<String, Object> entityBmAttributes = businessAttributes.get(bmName); Map<String, Object> entityBmAttributes = businessAttributes.get(bmName);
for (AtlasBusinessAttribute bmttribute : bmAttributes.values()) { for (AtlasBusinessAttribute bmAttribute : bmAttributes.values()) {
// if (entityBmAttributes is empty) remove all attributes in this business-metadata // if (entityBmAttributes is empty) remove all attributes in this business-metadata
// else remove the attribute only if its given in entityBmAttributes // else remove the attribute only if its given in entityBmAttributes
if (MapUtils.isEmpty(entityBmAttributes) || entityBmAttributes.containsKey(bmttribute.getName())) { if (MapUtils.isEmpty(entityBmAttributes) || entityBmAttributes.containsKey(bmAttribute.getName())) {
entityVertex.removeProperty(bmttribute.getVertexPropertyName()); entityVertex.removeProperty(bmAttribute.getVertexPropertyName());
addToUpdatedBusinessAttributes(updatedBusinessAttributes, bmAttribute, null);
} }
} }
} }
} }
if (MapUtils.isNotEmpty(updatedBusinessAttributes)) {
entityChangeNotifier.onBusinessAttributesUpdated(AtlasGraphUtilsV2.getIdFromVertex(entityVertex), updatedBusinessAttributes);
}
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("<== removeBusinessAttributes(entityVertex={}, entityType={}, businessAttributes={}", entityVertex, entityType.getTypeName(), businessAttributes); LOG.debug("<== removeBusinessAttributes(entityVertex={}, entityType={}, businessAttributes={}", entityVertex, entityType.getTypeName(), businessAttributes);
} }
...@@ -2559,4 +2568,35 @@ public class EntityGraphMapper { ...@@ -2559,4 +2568,35 @@ public class EntityGraphMapper {
return propagatedEntities; return propagatedEntities;
} }
private void updateLabels(AtlasVertex vertex, Set<String> labels) {
if (CollectionUtils.isNotEmpty(labels)) {
AtlasGraphUtilsV2.setEncodedProperty(vertex, LABELS_PROPERTY_KEY, getLabelString(labels));
} else {
vertex.removeProperty(LABELS_PROPERTY_KEY);
}
}
private String getLabelString(Collection<String> labels) {
String ret = null;
if (!labels.isEmpty()) {
ret = LABEL_NAME_DELIMITER + String.join(LABEL_NAME_DELIMITER, labels) + LABEL_NAME_DELIMITER;
}
return ret;
}
private void addToUpdatedBusinessAttributes(Map<String, Map<String, Object>> updatedBusinessAttributes, AtlasBusinessAttribute bmAttribute, Object attrValue) {
String bmName = bmAttribute.getDefinedInType().getTypeName();
Map<String, Object> attributes = updatedBusinessAttributes.get(bmName);
if(attributes == null){
attributes = new HashMap<>();
updatedBusinessAttributes.put(bmName, attributes);
}
attributes.put(bmAttribute.getName(), attrValue);
}
} }
...@@ -27,6 +27,7 @@ import org.apache.atlas.model.instance.EntityMutationResponse; ...@@ -27,6 +27,7 @@ import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.model.notification.EntityNotification; import org.apache.atlas.model.notification.EntityNotification;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Set; import java.util.Set;
public interface IAtlasEntityChangeNotifier { public interface IAtlasEntityChangeNotifier {
...@@ -51,4 +52,6 @@ public interface IAtlasEntityChangeNotifier { ...@@ -51,4 +52,6 @@ public interface IAtlasEntityChangeNotifier {
void notifyPropagatedEntities() throws AtlasBaseException; void notifyPropagatedEntities() throws AtlasBaseException;
void onClassificationUpdatedToEntity(AtlasEntity entity, List<AtlasClassification> updatedClassifications) throws AtlasBaseException; void onClassificationUpdatedToEntity(AtlasEntity entity, List<AtlasClassification> updatedClassifications) throws AtlasBaseException;
void onBusinessAttributesUpdated(String entityGuid, Map<String, Map<String, Object>> updatedBusinessAttributes) throws AtlasBaseException;
} }
...@@ -28,6 +28,7 @@ import org.apache.atlas.model.notification.EntityNotification; ...@@ -28,6 +28,7 @@ import org.apache.atlas.model.notification.EntityNotification;
import org.apache.atlas.repository.store.graph.v2.IAtlasEntityChangeNotifier; import org.apache.atlas.repository.store.graph.v2.IAtlasEntityChangeNotifier;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Set; import java.util.Set;
public class EntityChangeNotifierNop implements IAtlasEntityChangeNotifier { public class EntityChangeNotifierNop implements IAtlasEntityChangeNotifier {
...@@ -85,4 +86,9 @@ public class EntityChangeNotifierNop implements IAtlasEntityChangeNotifier { ...@@ -85,4 +86,9 @@ public class EntityChangeNotifierNop implements IAtlasEntityChangeNotifier {
public void onClassificationUpdatedToEntity(AtlasEntity entity, List<AtlasClassification> updatedClassifications) throws AtlasBaseException { public void onClassificationUpdatedToEntity(AtlasEntity entity, List<AtlasClassification> updatedClassifications) throws AtlasBaseException {
} }
@Override
public void onBusinessAttributesUpdated(String entityGuid, Map<String, Map<String, Object>> updatedBusinessAttributes) throws AtlasBaseException {
}
} }
...@@ -316,4 +316,9 @@ public class EntityNotificationListenerV2 implements EntityChangeListenerV2 { ...@@ -316,4 +316,9 @@ public class EntityNotificationListenerV2 implements EntityChangeListenerV2 {
public void onRelationshipsPurged(List<AtlasRelationship> relationships) throws AtlasBaseException { public void onRelationshipsPurged(List<AtlasRelationship> relationships) throws AtlasBaseException {
// do nothing -> notification not sent out for term purged from entities as its been sent in case of delete // do nothing -> notification not sent out for term purged from entities as its been sent in case of delete
} }
@Override
public void onBusinessAttributesUpdated(AtlasEntity entity, Map<String, Map<String, Object>> updatedBusinessAttributes) throws AtlasBaseException{
// do nothing -> notification not sent out for business metadata attribute updation from entities
}
} }
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment