Commit 4feee3bf by Sarath Subramanian

ATLAS-1751: Implement REST endpoint to support update of classification attribute

parent 2615b308
...@@ -28,8 +28,8 @@ import java.util.Objects; ...@@ -28,8 +28,8 @@ import java.util.Objects;
*/ */
public class EntityAuditEvent { public class EntityAuditEvent {
public enum EntityAuditAction { public enum EntityAuditAction {
ENTITY_CREATE, ENTITY_UPDATE, ENTITY_DELETE, TAG_ADD, TAG_DELETE, ENTITY_CREATE, ENTITY_UPDATE, ENTITY_DELETE, TAG_ADD, TAG_DELETE, TAG_UPDATE,
ENTITY_IMPORT_CREATE, ENTITY_IMPORT_UPDATE, ENTITY_IMPORT_DELETE ENTITY_IMPORT_CREATE, ENTITY_IMPORT_UPDATE, ENTITY_IMPORT_DELETE,
} }
private String entityId; private String entityId;
......
...@@ -27,6 +27,7 @@ define(['require'], function(require) { ...@@ -27,6 +27,7 @@ define(['require'], function(require) {
ENTITY_DELETE: "Entity Deleted", ENTITY_DELETE: "Entity Deleted",
TAG_ADD: "Tag Added", TAG_ADD: "Tag Added",
TAG_DELETE: "Tag Deleted", TAG_DELETE: "Tag Deleted",
TAG_UPDATE: "Tag Updated",
ENTITY_IMPORT_CREATE: "Entity Created by import", ENTITY_IMPORT_CREATE: "Entity Created by import",
ENTITY_IMPORT_UPDATE: "Entity Updated by import", ENTITY_IMPORT_UPDATE: "Entity Updated by import",
ENTITY_IMPORT_DELETE: "Entity Deleted by import" ENTITY_IMPORT_DELETE: "Entity Deleted by import"
......
...@@ -516,6 +516,7 @@ public final class TestUtilsV2 { ...@@ -516,6 +516,7 @@ public final class TestUtilsV2 {
public static final String TABLE_NAME = "bar"; public static final String TABLE_NAME = "bar";
public static final String CLASSIFICATION = "classification"; public static final String CLASSIFICATION = "classification";
public static final String PII = "PII"; public static final String PII = "PII";
public static final String PHI = "PHI";
public static final String SUPER_TYPE_NAME = "Base"; public static final String SUPER_TYPE_NAME = "Base";
public static final String STORAGE_DESC_TYPE = "hive_storagedesc"; public static final String STORAGE_DESC_TYPE = "hive_storagedesc";
public static final String PARTITION_STRUCT_TYPE = "partition_struct_type"; public static final String PARTITION_STRUCT_TYPE = "partition_struct_type";
...@@ -787,9 +788,14 @@ public final class TestUtilsV2 { ...@@ -787,9 +788,14 @@ public final class TestUtilsV2 {
AtlasTypeUtil.createTraitTypeDef("fetl" + CLASSIFICATION, "fetl" + CLASSIFICATION + _description, ImmutableSet.of(CLASSIFICATION), AtlasTypeUtil.createTraitTypeDef("fetl" + CLASSIFICATION, "fetl" + CLASSIFICATION + _description, ImmutableSet.of(CLASSIFICATION),
AtlasTypeUtil.createRequiredAttrDef("tag", "string")); AtlasTypeUtil.createRequiredAttrDef("tag", "string"));
AtlasClassificationDef phiTypeDefinition = AtlasTypeUtil.createTraitTypeDef(PHI, PHI + _description, ImmutableSet.<String>of(),
AtlasTypeUtil.createRequiredAttrDef("stringAttr", "string"),
AtlasTypeUtil.createRequiredAttrDef("booleanAttr", "boolean"),
AtlasTypeUtil.createRequiredAttrDef("integerAttr", "int"));
return AtlasTypeUtil.getTypesDef(ImmutableList.of(enumTypeDefinition), return AtlasTypeUtil.getTypesDef(ImmutableList.of(enumTypeDefinition),
ImmutableList.of(structTypeDefinition, partitionDefinition), ImmutableList.of(structTypeDefinition, partitionDefinition),
ImmutableList.of(classificationTypeDefinition, fetlClassificationTypeDefinition, piiTypeDefinition), ImmutableList.of(classificationTypeDefinition, fetlClassificationTypeDefinition, piiTypeDefinition, phiTypeDefinition),
ImmutableList.of(superTypeDefinition, databaseTypeDefinition, columnsDefinition, tableTypeDefinition, ImmutableList.of(superTypeDefinition, databaseTypeDefinition, columnsDefinition, tableTypeDefinition,
storageDescClsDef, partClsDef, processClsType)); storageDescClsDef, partClsDef, processClsType));
} }
......
...@@ -35,7 +35,8 @@ public interface EntityNotification { ...@@ -35,7 +35,8 @@ public interface EntityNotification {
ENTITY_UPDATE, ENTITY_UPDATE,
ENTITY_DELETE, ENTITY_DELETE,
TRAIT_ADD, TRAIT_ADD,
TRAIT_DELETE TRAIT_DELETE,
TRAIT_UPDATE
} }
......
...@@ -100,6 +100,18 @@ public class EntityAuditListener implements EntityChangeListener { ...@@ -100,6 +100,18 @@ public class EntityAuditListener implements EntityChangeListener {
} }
@Override @Override
public void onTraitsUpdated(ITypedReferenceableInstance entity, Collection<? extends IStruct> traits) throws AtlasException {
if (traits != null) {
for (IStruct trait : traits) {
EntityAuditEvent event = createEvent(entity, EntityAuditAction.TAG_UPDATE,
"Updated trait: " + InstanceSerialization.toJson(trait, true));
auditRepository.putEvents(event);
}
}
}
@Override
public void onEntitiesDeleted(Collection<ITypedReferenceableInstance> entities, boolean isImport) throws AtlasException { public void onEntitiesDeleted(Collection<ITypedReferenceableInstance> entities, boolean isImport) throws AtlasException {
List<EntityAuditEvent> events = new ArrayList<>(); List<EntityAuditEvent> events = new ArrayList<>();
for (ITypedReferenceableInstance entity : entities) { for (ITypedReferenceableInstance entity : entities) {
...@@ -279,6 +291,9 @@ public class EntityAuditListener implements EntityChangeListener { ...@@ -279,6 +291,9 @@ public class EntityAuditListener implements EntityChangeListener {
case TAG_DELETE: case TAG_DELETE:
ret = "Deleted trait: "; ret = "Deleted trait: ";
break; break;
case TAG_UPDATE:
ret = "Updated trait: ";
break;
case ENTITY_IMPORT_CREATE: case ENTITY_IMPORT_CREATE:
ret = "Created by import: "; ret = "Created by import: ";
break; break;
......
...@@ -127,6 +127,11 @@ public interface AtlasEntityStore { ...@@ -127,6 +127,11 @@ public interface AtlasEntityStore {
*/ */
void addClassifications(String guid, List<AtlasClassification> classification) throws AtlasBaseException; void addClassifications(String guid, List<AtlasClassification> classification) throws AtlasBaseException;
/**
* Update classification(s)
*/
void updateClassifications(String guid, List<AtlasClassification> classifications) throws AtlasBaseException;
@GraphTransaction @GraphTransaction
void addClassification(List<String> guids, AtlasClassification classification) throws AtlasBaseException; void addClassification(List<String> guids, AtlasClassification classification) throws AtlasBaseException;
......
...@@ -125,6 +125,26 @@ public class AtlasEntityChangeNotifier { ...@@ -125,6 +125,26 @@ public class AtlasEntityChangeNotifier {
} }
} }
public void onClassificationUpdatedToEntity(String entityId, List<AtlasClassification> classifications) throws AtlasBaseException {
// Since the classification attributes are updated in the graph, we need to recursively remap the entityText
doFullTextMapping(entityId);
ITypedReferenceableInstance entity = toITypedReferenceable(entityId);
List<ITypedStruct> traits = toITypedStructs(classifications);
if (entity == null || CollectionUtils.isEmpty(traits)) {
return;
}
for (EntityChangeListener listener : entityChangeListeners) {
try {
listener.onTraitsUpdated(entity, traits);
} catch (AtlasException e) {
throw new AtlasBaseException(AtlasErrorCode.NOTIFICATION_FAILED, e);
}
}
}
private void notifyListeners(List<AtlasEntityHeader> entityHeaders, EntityOperation operation, boolean isImport) throws AtlasBaseException { private void notifyListeners(List<AtlasEntityHeader> entityHeaders, EntityOperation operation, boolean isImport) throws AtlasBaseException {
if (CollectionUtils.isEmpty(entityHeaders)) { if (CollectionUtils.isEmpty(entityHeaders)) {
return; return;
......
...@@ -441,6 +441,51 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore { ...@@ -441,6 +441,51 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
@Override @Override
@GraphTransaction @GraphTransaction
public void updateClassifications(String guid, List<AtlasClassification> newClassifications) throws AtlasBaseException {
if (LOG.isDebugEnabled()) {
LOG.debug("Updating classifications={} for entity={}", newClassifications, guid);
}
if (StringUtils.isEmpty(guid)) {
throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "Guid not specified");
}
if (CollectionUtils.isEmpty(newClassifications)) {
throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "classifications(s) not specified");
}
EntityGraphMapper graphMapper = new EntityGraphMapper(deleteHandler, typeRegistry);
List<AtlasClassification> updatedClassifications = new ArrayList<>();
for (AtlasClassification newClassification : newClassifications) {
String classificationName = newClassification.getTypeName();
AtlasClassification oldClassification = getClassification(guid, classificationName);
if (oldClassification == null) {
throw new AtlasBaseException(AtlasErrorCode.CLASSIFICATION_NOT_FOUND, classificationName);
}
validateAndNormalizeForUpdate(newClassification);
Map<String, Object> newAttrs = newClassification.getAttributes();
if (MapUtils.isNotEmpty(newAttrs)) {
for (String attrName : newAttrs.keySet()) {
oldClassification.setAttribute(attrName, newAttrs.get(attrName));
}
}
graphMapper.updateClassification(new EntityMutationContext(), guid, oldClassification);
updatedClassifications.add(oldClassification);
}
// notify listeners on update to classifications
entityChangeNotifier.onClassificationUpdatedToEntity(guid, updatedClassifications);
}
@Override
@GraphTransaction
public void addClassification(final List<String> guids, final AtlasClassification classification) throws AtlasBaseException { public void addClassification(final List<String> guids, final AtlasClassification classification) throws AtlasBaseException {
if (CollectionUtils.isEmpty(guids)) { if (CollectionUtils.isEmpty(guids)) {
throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "Guid(s) not specified"); throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "Guid(s) not specified");
...@@ -514,7 +559,6 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore { ...@@ -514,7 +559,6 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
return graphRetriever.getClassification(guid, classificationName); return graphRetriever.getClassification(guid, classificationName);
} }
private EntityMutationContext preCreateOrUpdate(EntityStream entityStream, EntityGraphMapper entityGraphMapper, boolean isPartialUpdate) throws AtlasBaseException { private EntityMutationContext preCreateOrUpdate(EntityStream entityStream, EntityGraphMapper entityGraphMapper, boolean isPartialUpdate) throws AtlasBaseException {
EntityGraphDiscovery graphDiscoverer = new AtlasEntityGraphDiscoveryV1(typeRegistry, entityStream); EntityGraphDiscovery graphDiscoverer = new AtlasEntityGraphDiscoveryV1(typeRegistry, entityStream);
EntityGraphDiscoveryContext discoveryContext = graphDiscoverer.discoverEntities(); EntityGraphDiscoveryContext discoveryContext = graphDiscoverer.discoverEntities();
...@@ -607,6 +651,24 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore { ...@@ -607,6 +651,24 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
type.getNormalizedValue(classification); type.getNormalizedValue(classification);
} }
private void validateAndNormalizeForUpdate(AtlasClassification classification) throws AtlasBaseException {
AtlasClassificationType type = typeRegistry.getClassificationTypeByName(classification.getTypeName());
if (type == null) {
throw new AtlasBaseException(AtlasErrorCode.CLASSIFICATION_NOT_FOUND, classification.getTypeName());
}
List<String> messages = new ArrayList<>();
type.validateValueForUpdate(classification, classification.getTypeName(), messages);
if (!messages.isEmpty()) {
throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, messages);
}
type.getNormalizedValueForUpdate(classification);
}
/** /**
* Validate if classification is not already associated with the entities * Validate if classification is not already associated with the entities
* @param guid unique entity id * @param guid unique entity id
......
...@@ -941,6 +941,35 @@ public class EntityGraphMapper { ...@@ -941,6 +941,35 @@ public class EntityGraphMapper {
} }
} }
public void updateClassification(final EntityMutationContext context, String guid, AtlasClassification classification)
throws AtlasBaseException {
AtlasVertex instanceVertex = AtlasGraphUtilsV1.findByGuid(guid);
if (instanceVertex == null) {
throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, guid);
}
String entityTypeName = AtlasGraphUtilsV1.getTypeName(instanceVertex);
final AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entityTypeName);
if (LOG.isDebugEnabled()) {
LOG.debug("Updating classification {} for entity {}", classification, guid);
}
// get the classification vertex from entity
String relationshipLabel = GraphHelper.getTraitLabel(entityTypeName, classification.getTypeName());
AtlasEdge classificationEdge = graphHelper.getEdgeForLabel(instanceVertex, relationshipLabel);
AtlasVertex classificationVertex = classificationEdge.getInVertex();
if (LOG.isDebugEnabled()) {
LOG.debug("updating vertex {} for trait {}", string(classificationVertex), classification.getTypeName());
}
mapClassification(EntityOperation.UPDATE, context, classification, entityType, instanceVertex, classificationVertex);
}
private AtlasEdge mapClassification(EntityOperation operation, final EntityMutationContext context, AtlasClassification classification, AtlasEntityType entityType, AtlasVertex parentInstanceVertex, AtlasVertex traitInstanceVertex) private AtlasEdge mapClassification(EntityOperation operation, final EntityMutationContext context, AtlasClassification classification, AtlasEntityType entityType, AtlasVertex parentInstanceVertex, AtlasVertex traitInstanceVertex)
throws AtlasBaseException { throws AtlasBaseException {
......
...@@ -1268,6 +1268,11 @@ public class DefaultMetadataServiceTest { ...@@ -1268,6 +1268,11 @@ public class DefaultMetadataServiceTest {
} }
@Override @Override
public void onTraitsUpdated(ITypedReferenceableInstance entity, Collection<? extends IStruct> traits)
throws AtlasException {
}
@Override
public void onEntitiesDeleted(Collection<ITypedReferenceableInstance> entities, boolean isImport) public void onEntitiesDeleted(Collection<ITypedReferenceableInstance> entities, boolean isImport)
throws AtlasException { throws AtlasException {
deletedEntities.clear(); deletedEntities.clear();
......
...@@ -67,6 +67,16 @@ public interface EntityChangeListener { ...@@ -67,6 +67,16 @@ public interface EntityChangeListener {
* @throws AtlasException if the listener notification fails * @throws AtlasException if the listener notification fails
*/ */
void onTraitsDeleted(ITypedReferenceableInstance entity, Collection<String> traitNames) throws AtlasException; void onTraitsDeleted(ITypedReferenceableInstance entity, Collection<String> traitNames) throws AtlasException;
/**
* This is upon updating a trait from a typed instance.
*
* @param entity the entity
* @param traits trait that needs to be added to entity
*
* @throws AtlasException if the listener notification fails
*/
void onTraitsUpdated(ITypedReferenceableInstance entity, Collection<? extends IStruct> traits) throws AtlasException;
/** /**
* This is upon deleting entities from the repository. * This is upon deleting entities from the repository.
......
...@@ -97,6 +97,11 @@ public class NotificationEntityChangeListener implements EntityChangeListener { ...@@ -97,6 +97,11 @@ public class NotificationEntityChangeListener implements EntityChangeListener {
} }
@Override @Override
public void onTraitsUpdated(ITypedReferenceableInstance entity, Collection<? extends IStruct> traits) throws AtlasException {
notifyOfEntityEvent(Collections.singleton(entity), EntityNotification.OperationType.TRAIT_UPDATE);
}
@Override
public void onEntitiesDeleted(Collection<ITypedReferenceableInstance> entities, boolean isImport) throws AtlasException { public void onEntitiesDeleted(Collection<ITypedReferenceableInstance> entities, boolean isImport) throws AtlasException {
notifyOfEntityEvent(entities, EntityNotification.OperationType.ENTITY_DELETE); notifyOfEntityEvent(entities, EntityNotification.OperationType.ENTITY_DELETE);
} }
......
...@@ -353,6 +353,33 @@ public class EntityREST { ...@@ -353,6 +353,33 @@ public class EntityREST {
} }
/** /**
* Updates classifications to an existing entity represented by a guid.
* @param guid globally unique identifier for the entity
* @return classification for the given entity guid
*/
@PUT
@Path("/guid/{guid}/classifications")
@Produces(Servlets.JSON_MEDIA_TYPE)
public void updateClassification(@PathParam("guid") final String guid, List<AtlasClassification> classifications) throws AtlasBaseException {
AtlasPerfTracer perf = null;
try {
if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "EntityREST.updateClassification(" + guid + ")");
}
if (StringUtils.isEmpty(guid)) {
throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, guid);
}
entitiesStore.updateClassifications(guid, classifications);
} finally {
AtlasPerfTracer.log(perf);
}
}
/**
* Deletes a given classification from an existing entity represented by a guid. * Deletes a given classification from an existing entity represented by a guid.
* @param guid globally unique identifier for the entity * @param guid globally unique identifier for the entity
* @param classificationName name of the classifcation * @param classificationName name of the classifcation
......
...@@ -22,6 +22,7 @@ import org.apache.atlas.RequestContext; ...@@ -22,6 +22,7 @@ import org.apache.atlas.RequestContext;
import org.apache.atlas.RequestContextV1; import org.apache.atlas.RequestContextV1;
import org.apache.atlas.TestUtilsV2; import org.apache.atlas.TestUtilsV2;
import org.apache.atlas.model.instance.AtlasClassification; import org.apache.atlas.model.instance.AtlasClassification;
import org.apache.atlas.model.instance.AtlasClassification.AtlasClassifications;
import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo; import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo; import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
...@@ -44,6 +45,7 @@ import org.testng.annotations.Test; ...@@ -44,6 +45,7 @@ import org.testng.annotations.Test;
import javax.inject.Inject; import javax.inject.Inject;
import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRequest;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
...@@ -60,6 +62,8 @@ public class TestEntityREST { ...@@ -60,6 +62,8 @@ public class TestEntityREST {
private AtlasEntity dbEntity; private AtlasEntity dbEntity;
private AtlasClassification testClassification; private AtlasClassification testClassification;
private AtlasClassification phiClassification;
@BeforeClass @BeforeClass
public void setUp() throws Exception { public void setUp() throws Exception {
...@@ -123,7 +127,59 @@ public class TestEntityREST { ...@@ -123,7 +127,59 @@ public class TestEntityREST {
Assert.assertNotNull(retrievedClassification); Assert.assertNotNull(retrievedClassification);
Assert.assertEquals(retrievedClassification, testClassification); Assert.assertEquals(retrievedClassification, testClassification);
}
@Test(dependsOnMethods = "testGetEntityById")
public void testAddAndUpdateClassificationWithAttributes() throws Exception {
phiClassification = new AtlasClassification(TestUtilsV2.PHI, new HashMap<String, Object>() {{
put("stringAttr", "sample_string");
put("booleanAttr", true);
put("integerAttr", 100);
}});
testClassification = new AtlasClassification(TestUtilsV2.CLASSIFICATION, new HashMap<String, Object>() {{
put("tag", "tagName");
}});
entityREST.addClassifications(dbEntity.getGuid(), new ArrayList<>(Arrays.asList(phiClassification)));
final AtlasClassifications retrievedClassifications = entityREST.getClassifications(dbEntity.getGuid());
Assert.assertNotNull(retrievedClassifications);
final List<AtlasClassification> retrievedClassificationsList = retrievedClassifications.getList();
Assert.assertNotNull(retrievedClassificationsList);
final AtlasClassification retrievedClassification = entityREST.getClassification(dbEntity.getGuid(), TestUtilsV2.PHI);
Assert.assertNotNull(retrievedClassification);
Assert.assertEquals(retrievedClassification, phiClassification);
for (String attrName : retrievedClassification.getAttributes().keySet()) {
Assert.assertEquals(retrievedClassification.getAttribute(attrName), phiClassification.getAttribute(attrName));
}
// update multiple tags attributes
phiClassification = new AtlasClassification(TestUtilsV2.PHI, new HashMap<String, Object>() {{
put("stringAttr", "sample_string_v2");
put("integerAttr", 200);
}});
testClassification = new AtlasClassification(TestUtilsV2.CLASSIFICATION, new HashMap<String, Object>() {{
put("tag", "tagName_updated");
}});
entityREST.updateClassification(dbEntity.getGuid(), new ArrayList<>(Arrays.asList(phiClassification, testClassification)));
AtlasClassification updatedClassification = entityREST.getClassification(dbEntity.getGuid(), TestUtilsV2.PHI);
Assert.assertNotNull(updatedClassification);
Assert.assertEquals(updatedClassification.getAttribute("stringAttr"), "sample_string_v2");
Assert.assertEquals(updatedClassification.getAttribute("integerAttr"), 200);
Assert.assertEquals(updatedClassification.getAttribute("booleanAttr"), true);
updatedClassification = entityREST.getClassification(dbEntity.getGuid(), TestUtilsV2.CLASSIFICATION);
Assert.assertNotNull(updatedClassification);
Assert.assertEquals(updatedClassification.getAttribute("tag"), testClassification.getAttribute("tag"));
entityREST.deleteClassification(dbEntity.getGuid(), TestUtilsV2.PHI);
} }
@Test(dependsOnMethods = "testAddAndGetClassification") @Test(dependsOnMethods = "testAddAndGetClassification")
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment