Commit 6050a8e6 by Le Ma Committed by Sarath Subramanian

ATLAS-3497 Add audit entry for adding/removing labels

parent 10a11053
...@@ -49,7 +49,7 @@ public class EntityAuditEvent implements Serializable { ...@@ -49,7 +49,7 @@ public class EntityAuditEvent implements Serializable {
ENTITY_CREATE, ENTITY_UPDATE, ENTITY_DELETE, TAG_ADD, TAG_DELETE, TAG_UPDATE, ENTITY_CREATE, ENTITY_UPDATE, ENTITY_DELETE, TAG_ADD, TAG_DELETE, TAG_UPDATE,
PROPAGATED_TAG_ADD, PROPAGATED_TAG_DELETE, PROPAGATED_TAG_UPDATE, PROPAGATED_TAG_ADD, PROPAGATED_TAG_DELETE, PROPAGATED_TAG_UPDATE,
ENTITY_IMPORT_CREATE, ENTITY_IMPORT_UPDATE, ENTITY_IMPORT_DELETE, ENTITY_IMPORT_CREATE, ENTITY_IMPORT_UPDATE, ENTITY_IMPORT_DELETE,
TERM_ADD, TERM_DELETE; TERM_ADD, TERM_DELETE, LABEL_ADD, LABEL_DELETE;
public static EntityAuditAction fromString(String strValue) { public static EntityAuditAction fromString(String strValue) {
switch (strValue) { switch (strValue) {
...@@ -84,6 +84,10 @@ public class EntityAuditEvent implements Serializable { ...@@ -84,6 +84,10 @@ public class EntityAuditEvent implements Serializable {
return TERM_ADD; return TERM_ADD;
case "TERM_DELETE": case "TERM_DELETE":
return TERM_DELETE; return TERM_DELETE;
case "LABEL_ADD":
return LABEL_ADD;
case "LABEL_DELETE":
return LABEL_DELETE;
} }
throw new IllegalArgumentException("No enum constant " + EntityAuditAction.class.getCanonicalName() + "." + strValue); throw new IllegalArgumentException("No enum constant " + EntityAuditAction.class.getCanonicalName() + "." + strValue);
......
...@@ -35,7 +35,9 @@ define(['require'], function(require) { ...@@ -35,7 +35,9 @@ define(['require'], function(require) {
ENTITY_IMPORT_UPDATE: "Entity Updated by import", ENTITY_IMPORT_UPDATE: "Entity Updated by import",
ENTITY_IMPORT_DELETE: "Entity Deleted by import", ENTITY_IMPORT_DELETE: "Entity Deleted by import",
TERM_ADD: "Term Added", TERM_ADD: "Term Added",
TERM_DELETE: "Term Deleted" TERM_DELETE: "Term Deleted",
LABEL_ADD: "Label Added",
LABEL_DELETE: "Label Deleted"
} }
Enums.entityStateReadOnly = { Enums.entityStateReadOnly = {
......
...@@ -35,7 +35,9 @@ define(['require'], function(require) { ...@@ -35,7 +35,9 @@ define(['require'], function(require) {
ENTITY_IMPORT_UPDATE: "Entity Updated by import", ENTITY_IMPORT_UPDATE: "Entity Updated by import",
ENTITY_IMPORT_DELETE: "Entity Deleted by import", ENTITY_IMPORT_DELETE: "Entity Deleted by import",
TERM_ADD: "Term Added", TERM_ADD: "Term Added",
TERM_DELETE: "Term Deleted" TERM_DELETE: "Term Deleted",
LABEL_ADD: "Label Added",
LABEL_DELETE: "Label Deleted"
} }
Enums.entityStateReadOnly = { Enums.entityStateReadOnly = {
......
...@@ -26,6 +26,7 @@ import org.apache.atlas.model.instance.AtlasRelatedObjectId; ...@@ -26,6 +26,7 @@ import org.apache.atlas.model.instance.AtlasRelatedObjectId;
import org.apache.atlas.model.instance.AtlasRelationship; import org.apache.atlas.model.instance.AtlasRelationship;
import java.util.List; import java.util.List;
import java.util.Set;
/** /**
* Entity change notification listener V2. * Entity change notification listener V2.
...@@ -121,4 +122,22 @@ public interface EntityChangeListenerV2 { ...@@ -121,4 +122,22 @@ public interface EntityChangeListenerV2 {
* @param isImport * @param isImport
*/ */
void onRelationshipsDeleted(List<AtlasRelationship> relationships, boolean isImport) throws AtlasBaseException; void onRelationshipsDeleted(List<AtlasRelationship> relationships, boolean isImport) throws AtlasBaseException;
/**
* This is upon add new labels to an entity.
*
* @param entity the entity
* @param labels labels that needs to be added to an entity
* @throws AtlasBaseException if the listener notification fails
*/
void onLabelsAdded(AtlasEntity entity, Set<String> labels) throws AtlasBaseException;
/**
* This is upon deleting labels from an entity.
*
* @param entity the entity
* @param labels labels that needs to be deleted for an entity
* @throws AtlasBaseException if the listener notification fails
*/
void onLabelsDeleted(AtlasEntity entity, Set<String> labels) throws AtlasBaseException;
} }
\ No newline at end of file
...@@ -50,7 +50,7 @@ public class EntityAuditEventV2 implements Serializable { ...@@ -50,7 +50,7 @@ public class EntityAuditEventV2 implements Serializable {
ENTITY_IMPORT_CREATE, ENTITY_IMPORT_UPDATE, ENTITY_IMPORT_DELETE, ENTITY_IMPORT_CREATE, ENTITY_IMPORT_UPDATE, ENTITY_IMPORT_DELETE,
CLASSIFICATION_ADD, CLASSIFICATION_DELETE, CLASSIFICATION_UPDATE, CLASSIFICATION_ADD, CLASSIFICATION_DELETE, CLASSIFICATION_UPDATE,
PROPAGATED_CLASSIFICATION_ADD, PROPAGATED_CLASSIFICATION_DELETE, PROPAGATED_CLASSIFICATION_UPDATE, PROPAGATED_CLASSIFICATION_ADD, PROPAGATED_CLASSIFICATION_DELETE, PROPAGATED_CLASSIFICATION_UPDATE,
TERM_ADD, TERM_DELETE; TERM_ADD, TERM_DELETE, LABEL_ADD, LABEL_DELETE;
public static EntityAuditActionV2 fromString(String strValue) { public static EntityAuditActionV2 fromString(String strValue) {
switch (strValue) { switch (strValue) {
...@@ -85,6 +85,10 @@ public class EntityAuditEventV2 implements Serializable { ...@@ -85,6 +85,10 @@ public class EntityAuditEventV2 implements Serializable {
return TERM_ADD; return TERM_ADD;
case "TERM_DELETE": case "TERM_DELETE":
return TERM_DELETE; return TERM_DELETE;
case "LABEL_ADD":
return LABEL_ADD;
case "LABEL_DELETE":
return LABEL_DELETE;
} }
throw new IllegalArgumentException("No enum constant " + EntityAuditActionV2.class.getCanonicalName() + "." + strValue); throw new IllegalArgumentException("No enum constant " + EntityAuditActionV2.class.getCanonicalName() + "." + strValue);
......
...@@ -47,6 +47,7 @@ import java.util.ArrayList; ...@@ -47,6 +47,7 @@ import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.CLASSIFICATION_ADD; import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.CLASSIFICATION_ADD;
import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.CLASSIFICATION_DELETE; import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.CLASSIFICATION_DELETE;
...@@ -57,6 +58,8 @@ import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV ...@@ -57,6 +58,8 @@ import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV
import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.ENTITY_IMPORT_DELETE; import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.ENTITY_IMPORT_DELETE;
import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.ENTITY_IMPORT_UPDATE; import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.ENTITY_IMPORT_UPDATE;
import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.ENTITY_UPDATE; import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.ENTITY_UPDATE;
import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.LABEL_ADD;
import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.LABEL_DELETE;
import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.PROPAGATED_CLASSIFICATION_ADD; import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.PROPAGATED_CLASSIFICATION_ADD;
import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.PROPAGATED_CLASSIFICATION_DELETE; import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.PROPAGATED_CLASSIFICATION_DELETE;
import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.PROPAGATED_CLASSIFICATION_UPDATE; import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.PROPAGATED_CLASSIFICATION_UPDATE;
...@@ -241,6 +244,40 @@ public class EntityAuditListenerV2 implements EntityChangeListenerV2 { ...@@ -241,6 +244,40 @@ public class EntityAuditListenerV2 implements EntityChangeListenerV2 {
} }
} }
@Override
public void onLabelsAdded(AtlasEntity entity, Set<String> labels) throws AtlasBaseException {
if (CollectionUtils.isNotEmpty(labels)) {
MetricRecorder metric = RequestContext.get().startMetricRecord("entityAudit");
List<EntityAuditEventV2> events = new ArrayList<>();
String addedLabels = StringUtils.join(labels, " ");
events.add(createEvent(entity, LABEL_ADD, "Added labels: " + addedLabels));
auditRepository.putEventsV2(events);
RequestContext.get().endMetricRecord(metric);
}
}
@Override
public void onLabelsDeleted(AtlasEntity entity, Set<String> labels) throws AtlasBaseException {
if (CollectionUtils.isNotEmpty(labels)) {
MetricRecorder metric = RequestContext.get().startMetricRecord("entityAudit");
List<EntityAuditEventV2> events = new ArrayList<>();
String deletedLabels = StringUtils.join(labels, " ");
events.add(createEvent(entity, LABEL_DELETE, "Deleted labels: " + deletedLabels));
auditRepository.putEventsV2(events);
RequestContext.get().endMetricRecord(metric);
}
}
private EntityAuditEventV2 createEvent(AtlasEntity entity, EntityAuditActionV2 action, String details) { private EntityAuditEventV2 createEvent(AtlasEntity entity, EntityAuditActionV2 action, String details) {
return new EntityAuditEventV2(entity.getGuid(), RequestContext.get().getRequestTime(), return new EntityAuditEventV2(entity.getGuid(), RequestContext.get().getRequestTime(),
RequestContext.get().getUser(), action, details, entity); RequestContext.get().getUser(), action, details, entity);
......
...@@ -394,6 +394,10 @@ public class AtlasInstanceConverter { ...@@ -394,6 +394,10 @@ public class AtlasInstanceConverter {
return EntityAuditEvent.EntityAuditAction.PROPAGATED_TAG_DELETE; return EntityAuditEvent.EntityAuditAction.PROPAGATED_TAG_DELETE;
case PROPAGATED_CLASSIFICATION_UPDATE: case PROPAGATED_CLASSIFICATION_UPDATE:
return EntityAuditEvent.EntityAuditAction.PROPAGATED_TAG_UPDATE; return EntityAuditEvent.EntityAuditAction.PROPAGATED_TAG_UPDATE;
case LABEL_ADD:
return EntityAuditEvent.EntityAuditAction.LABEL_ADD;
case LABEL_DELETE:
return EntityAuditEvent.EntityAuditAction.LABEL_DELETE;
} }
return null; return null;
......
...@@ -1851,16 +1851,10 @@ public final class GraphHelper { ...@@ -1851,16 +1851,10 @@ public final class GraphHelper {
} }
private static Set<String> parseLabelsString(String labels) { private static Set<String> parseLabelsString(String labels) {
Set<String> ret = null; Set<String> ret = new HashSet<>();
if (StringUtils.isNotEmpty(labels)) { if (StringUtils.isNotEmpty(labels)) {
ret = new HashSet<>(); ret.addAll(Arrays.asList(StringUtils.split(labels, "\\" + LABEL_NAME_DELIMITER)));
for (String label : labels.split("\\" + LABEL_NAME_DELIMITER)) {
if (StringUtils.isNotEmpty(label)) {
ret.add(label);
}
}
} }
return ret; return ret;
......
...@@ -256,6 +256,19 @@ public class AtlasEntityChangeNotifier { ...@@ -256,6 +256,19 @@ public class AtlasEntityChangeNotifier {
} }
} }
public void onLabelsUpdatedFromEntity(String entityGuid, Set<String> addedLabels, Set<String> deletedLabels) throws AtlasBaseException {
doFullTextMapping(entityGuid);
if (isV2EntityNotificationEnabled) {
AtlasEntity entity = instanceConverter.getAndCacheEntity(entityGuid);
for (EntityChangeListenerV2 listener : entityChangeListenersV2) {
listener.onLabelsDeleted(entity, deletedLabels);
listener.onLabelsAdded(entity, addedLabels);
}
}
}
public void notifyPropagatedEntities() throws AtlasBaseException { public void notifyPropagatedEntities() throws AtlasBaseException {
RequestContext context = RequestContext.get(); RequestContext context = RequestContext.get();
Map<String, List<AtlasClassification>> addedPropagations = context.getAddedPropagations(); Map<String, List<AtlasClassification>> addedPropagations = context.getAddedPropagations();
......
...@@ -92,6 +92,7 @@ import static org.apache.atlas.repository.graph.GraphHelper.getClassificationEdg ...@@ -92,6 +92,7 @@ import static org.apache.atlas.repository.graph.GraphHelper.getClassificationEdg
import static org.apache.atlas.repository.graph.GraphHelper.getClassificationVertex; import static org.apache.atlas.repository.graph.GraphHelper.getClassificationVertex;
import static org.apache.atlas.repository.graph.GraphHelper.getDefaultRemovePropagations; import static org.apache.atlas.repository.graph.GraphHelper.getDefaultRemovePropagations;
import static org.apache.atlas.repository.graph.GraphHelper.getDelimitedClassificationNames; import static org.apache.atlas.repository.graph.GraphHelper.getDelimitedClassificationNames;
import static org.apache.atlas.repository.graph.GraphHelper.getLabels;
import static org.apache.atlas.repository.graph.GraphHelper.getMapElementsProperty; import static org.apache.atlas.repository.graph.GraphHelper.getMapElementsProperty;
import static org.apache.atlas.repository.graph.GraphHelper.getStatus; import static org.apache.atlas.repository.graph.GraphHelper.getStatus;
import static org.apache.atlas.repository.graph.GraphHelper.getTraitLabel; import static org.apache.atlas.repository.graph.GraphHelper.getTraitLabel;
...@@ -331,15 +332,28 @@ public class EntityGraphMapper { ...@@ -331,15 +332,28 @@ public class EntityGraphMapper {
} }
} }
public void setLabels(AtlasVertex vertex, Set<String> labels) { public void setLabels(AtlasVertex vertex, Set<String> labels) throws AtlasBaseException {
if (CollectionUtils.isNotEmpty(labels)) { final Set<String> currentLabels = getLabels(vertex);
AtlasGraphUtilsV2.setEncodedProperty(vertex, LABELS_PROPERTY_KEY, getLabelString(labels)); final Set<String> addedLabels;
final Set<String> removedLabels;
if (CollectionUtils.isEmpty(currentLabels)) {
addedLabels = labels;
removedLabels = null;
} else if (CollectionUtils.isEmpty(labels)) {
addedLabels = null;
removedLabels = labels;
} else { } else {
vertex.removeProperty(LABELS_PROPERTY_KEY); addedLabels = new HashSet<String>(CollectionUtils.subtract(labels, currentLabels));
removedLabels = new HashSet<String>(CollectionUtils.subtract(currentLabels, labels));
} }
updateLabels(vertex, labels);
entityChangeNotifier.onLabelsUpdatedFromEntity(GraphHelper.getGuid(vertex), addedLabels, removedLabels);
} }
public void addLabels(AtlasVertex vertex, Set<String> labels) { public void addLabels(AtlasVertex vertex, Set<String> labels) throws AtlasBaseException {
if (CollectionUtils.isNotEmpty(labels)) { if (CollectionUtils.isNotEmpty(labels)) {
final Set<String> existingLabels = GraphHelper.getLabels(vertex); final Set<String> existingLabels = GraphHelper.getLabels(vertex);
final Set<String> updatedLabels; final Set<String> updatedLabels;
...@@ -347,25 +361,40 @@ public class EntityGraphMapper { ...@@ -347,25 +361,40 @@ public class EntityGraphMapper {
if (CollectionUtils.isEmpty(existingLabels)) { if (CollectionUtils.isEmpty(existingLabels)) {
updatedLabels = labels; updatedLabels = labels;
} else { } else {
updatedLabels = existingLabels; updatedLabels = new HashSet<>(existingLabels);
updatedLabels.addAll(labels); updatedLabels.addAll(labels);
} }
if (!updatedLabels.equals(existingLabels)) {
setLabels(vertex, updatedLabels); updateLabels(vertex, updatedLabels);
updatedLabels.removeAll(existingLabels);
entityChangeNotifier.onLabelsUpdatedFromEntity(GraphHelper.getGuid(vertex), updatedLabels, null);
}
} }
} }
public void removeLabels(AtlasVertex vertex, Set<String> labels) { public void removeLabels(AtlasVertex vertex, Set<String> labels) throws AtlasBaseException {
if (CollectionUtils.isNotEmpty(labels)) { if (CollectionUtils.isNotEmpty(labels)) {
final Set<String> existingLabels = GraphHelper.getLabels(vertex); final Set<String> existingLabels = GraphHelper.getLabels(vertex);
Set<String> updatedLabels = null; Set<String> updatedLabels;
if (CollectionUtils.isNotEmpty(existingLabels)) { if (CollectionUtils.isNotEmpty(existingLabels)) {
updatedLabels = existingLabels; updatedLabels = new HashSet<>(existingLabels);
updatedLabels.removeAll(labels); updatedLabels.removeAll(labels);
if (!updatedLabels.equals(existingLabels)) {
updateLabels(vertex, updatedLabels);
existingLabels.removeAll(updatedLabels);
entityChangeNotifier.onLabelsUpdatedFromEntity(GraphHelper.getGuid(vertex), null, existingLabels);
}
}
}
} }
setLabels(vertex, updatedLabels); private void updateLabels(AtlasVertex vertex, Set<String> labels) {
if (CollectionUtils.isNotEmpty(labels)) {
AtlasGraphUtilsV2.setEncodedProperty(vertex, LABELS_PROPERTY_KEY, getLabelString(labels));
} else {
vertex.removeProperty(LABELS_PROPERTY_KEY);
} }
} }
......
...@@ -1147,19 +1147,19 @@ public class AtlasEntityStoreV2Test extends AtlasEntityTestBase { ...@@ -1147,19 +1147,19 @@ public class AtlasEntityStoreV2Test extends AtlasEntityTestBase {
AtlasEntity tblEntity = getEntityFromStore(tblEntityGuid); AtlasEntity tblEntity = getEntityFromStore(tblEntityGuid);
Assert.assertNull(tblEntity.getLabels()); Assert.assertTrue(tblEntity.getLabels().isEmpty());
} }
@Test (dependsOnMethods = "clearLabelsToEntity") @Test (dependsOnMethods = "clearLabelsToEntity")
public void nullLabelsToEntity() throws AtlasBaseException { public void emptyLabelsToEntity() throws AtlasBaseException {
entityStore.setLabels(tblEntityGuid, null); entityStore.setLabels(tblEntityGuid, null);
AtlasEntity tblEntity = getEntityFromStore(tblEntityGuid); AtlasEntity tblEntity = getEntityFromStore(tblEntityGuid);
Assert.assertNull(tblEntity.getLabels()); Assert.assertTrue(tblEntity.getLabels().isEmpty());
} }
@Test (dependsOnMethods = "nullLabelsToEntity") @Test (dependsOnMethods = "emptyLabelsToEntity")
public void invalidLabelLengthToEntity() throws AtlasBaseException { public void invalidLabelLengthToEntity() throws AtlasBaseException {
Set<String> labels = new HashSet<>(); Set<String> labels = new HashSet<>();
labels.add(randomAlphanumeric(50)); labels.add(randomAlphanumeric(50));
...@@ -1173,7 +1173,7 @@ public class AtlasEntityStoreV2Test extends AtlasEntityTestBase { ...@@ -1173,7 +1173,7 @@ public class AtlasEntityStoreV2Test extends AtlasEntityTestBase {
} }
@Test (dependsOnMethods = "invalidLabelLengthToEntity") @Test (dependsOnMethods = "invalidLabelLengthToEntity")
public void invalidLabelCharactersToEntity() throws AtlasBaseException { public void invalidLabelCharactersToEntity() {
Set<String> labels = new HashSet<>(); Set<String> labels = new HashSet<>();
labels.add("label-1_100_45"); labels.add("label-1_100_45");
labels.add("LABEL-1_200-55"); labels.add("LABEL-1_200-55");
...@@ -1227,6 +1227,6 @@ public class AtlasEntityStoreV2Test extends AtlasEntityTestBase { ...@@ -1227,6 +1227,6 @@ public class AtlasEntityStoreV2Test extends AtlasEntityTestBase {
labels.add("label_3_add"); labels.add("label_3_add");
entityStore.removeLabels(tblEntityGuid, labels); entityStore.removeLabels(tblEntityGuid, labels);
tblEntity = getEntityFromStore(tblEntityGuid); tblEntity = getEntityFromStore(tblEntityGuid);
Assert.assertNull(tblEntity.getLabels()); Assert.assertTrue(tblEntity.getLabels().isEmpty());
} }
} }
\ No newline at end of file
...@@ -128,6 +128,16 @@ public class EntityNotificationListenerV2 implements EntityChangeListenerV2 { ...@@ -128,6 +128,16 @@ public class EntityNotificationListenerV2 implements EntityChangeListenerV2 {
// do nothing -> notification not sent out for term removal from entities // do nothing -> notification not sent out for term removal from entities
} }
@Override
public void onLabelsDeleted(AtlasEntity entity, Set<String> labels) throws AtlasBaseException {
// do nothing -> notification not sent out for label removal to entities
}
@Override
public void onLabelsAdded(AtlasEntity entity, Set<String> labels) throws AtlasBaseException {
// do nothing -> notification not sent out for label assignment to entities
}
private void notifyEntityEvents(List<AtlasEntity> entities, OperationType operationType) throws AtlasBaseException { private void notifyEntityEvents(List<AtlasEntity> entities, OperationType operationType) throws AtlasBaseException {
MetricRecorder metric = RequestContext.get().startMetricRecord("entityNotification"); MetricRecorder metric = RequestContext.get().startMetricRecord("entityNotification");
List<EntityNotificationV2> messages = new ArrayList<>(); List<EntityNotificationV2> messages = new ArrayList<>();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment