Commit a9a095e1 by Madhan Neethiraj

ATLAS-3044: fixed import to handle entity-delete

parent 0e0f217a
......@@ -117,6 +117,15 @@ public class AtlasEntity extends AtlasStruct implements Serializable {
init();
}
public AtlasEntity(AtlasEntityHeader header) {
super(header.getTypeName(), header.getAttributes());
setGuid(header.getGuid());
setStatus(header.getStatus());
setClassifications(header.getClassifications());
setMeanings(header.getMeanings());
}
public AtlasEntity(Map map) {
super(map);
......
......@@ -104,6 +104,7 @@ public class AtlasEntityHeader extends AtlasStruct implements Serializable {
public AtlasEntityHeader(AtlasEntity entity) {
super(entity.getTypeName(), entity.getAttributes());
setGuid(entity.getGuid());
setStatus(entity.getStatus());
setClassifications(entity.getClassifications());
if (CollectionUtils.isNotEmpty(entity.getClassifications())) {
......
......@@ -216,31 +216,6 @@ public class EntityMutationResponse {
}
}
@JsonIgnore
public void addEntity(EntityOperation op, AtlasObjectId entity) {
if (mutatedEntities == null) {
mutatedEntities = new HashMap<>();
} else {
// if an entity is already included in CREATE, ignore subsequent UPDATE, PARTIAL_UPDATE
if (op == EntityOperation.UPDATE || op == EntityOperation.PARTIAL_UPDATE) {
if (entityHeaderExists(getCreatedEntities(), entity.getGuid())) {
return;
}
}
}
List<AtlasEntityHeader> opEntities = mutatedEntities.get(op);
if (opEntities == null) {
opEntities = new ArrayList<>();
mutatedEntities.put(op, opEntities);
}
if (!entityHeaderExists(opEntities, entity.getGuid())) {
opEntities.add(new AtlasEntityHeader(entity.getTypeName(), entity.getGuid(), entity.getUniqueAttributes()));
}
}
private boolean entityHeaderExists(List<AtlasEntityHeader> entityHeaders, String guid) {
boolean ret = false;
......
......@@ -28,6 +28,7 @@ import org.apache.atlas.RequestContext;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.Status;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.instance.AtlasRelationship;
import org.apache.atlas.model.typedef.AtlasRelationshipDef;
......@@ -1378,15 +1379,15 @@ public final class GraphHelper {
* Guid and AtlasVertex combo
*/
public static class VertexInfo {
private final AtlasObjectId entity;
private final AtlasEntityHeader entity;
private final AtlasVertex vertex;
public VertexInfo(AtlasObjectId entity, AtlasVertex vertex) {
public VertexInfo(AtlasEntityHeader entity, AtlasVertex vertex) {
this.entity = entity;
this.vertex = vertex;
}
public AtlasObjectId getEntity() { return entity; }
public AtlasEntityHeader getEntity() { return entity; }
public AtlasVertex getVertex() {
return vertex;
}
......
......@@ -26,6 +26,7 @@ import org.apache.atlas.model.TypeCategory;
import org.apache.atlas.model.instance.AtlasClassification;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.Status;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags;
import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
......@@ -191,7 +192,7 @@ public abstract class DeleteHandlerV1 {
continue;
}
AtlasObjectId entity = entityRetriever.toAtlasObjectId(vertex);
AtlasEntityHeader entity = entityRetriever.toAtlasEntityHeader(vertex);
String typeName = entity.getTypeName();
AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName);
......@@ -349,7 +350,7 @@ public abstract class DeleteHandlerV1 {
AtlasGraphUtilsV2.setEncodedProperty(referencedVertex, MODIFICATION_TIMESTAMP_PROPERTY_KEY, requestContext.getRequestTime());
AtlasGraphUtilsV2.setEncodedProperty(referencedVertex, MODIFIED_BY_KEY, requestContext.getUser());
requestContext.recordEntityUpdate(entityRetriever.toAtlasObjectId(referencedVertex));
requestContext.recordEntityUpdate(entityRetriever.toAtlasEntityHeader(referencedVertex));
}
}
} else {
......@@ -935,7 +936,7 @@ public abstract class DeleteHandlerV1 {
AtlasGraphUtilsV2.setEncodedProperty(outVertex, MODIFICATION_TIMESTAMP_PROPERTY_KEY, requestContext.getRequestTime());
AtlasGraphUtilsV2.setEncodedProperty(outVertex, MODIFIED_BY_KEY, requestContext.getUser());
requestContext.recordEntityUpdate(entityRetriever.toAtlasObjectId(outVertex));
requestContext.recordEntityUpdate(entityRetriever.toAtlasEntityHeader(outVertex));
}
}
}
......
......@@ -56,6 +56,7 @@ import javax.inject.Inject;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Set;
......@@ -95,6 +96,8 @@ public class AtlasEntityChangeNotifier {
return;
}
pruneResponse(entityMutationResponse);
List<AtlasEntityHeader> createdEntities = entityMutationResponse.getCreatedEntities();
List<AtlasEntityHeader> updatedEntities = entityMutationResponse.getUpdatedEntities();
List<AtlasEntityHeader> partiallyUpdatedEntities = entityMutationResponse.getPartialUpdatedEntities();
......@@ -442,7 +445,6 @@ public class AtlasEntityChangeNotifier {
if (CollectionUtils.isNotEmpty(entityHeaders)) {
for (AtlasEntityHeader entityHeader : entityHeaders) {
String entityGuid = entityHeader.getGuid();
String typeName = entityHeader.getTypeName();
AtlasEntityType entityType = atlasTypeRegistry.getEntityTypeByName(typeName);
......@@ -462,10 +464,10 @@ public class AtlasEntityChangeNotifier {
// delete notifications don't need all attributes. Hence the special handling for delete operation
if (operation == EntityOperation.DELETE) {
entity = new AtlasEntity(typeName, entityHeader.getAttributes());
entity.setGuid(entityGuid);
entity = new AtlasEntity(entityHeader);
} else {
String entityGuid = entityHeader.getGuid();
entity = instanceConverter.getAndCacheEntity(entityGuid);
}
......@@ -556,4 +558,81 @@ public class AtlasEntityChangeNotifier {
doFullTextMapping(Collections.singletonList(entityHeader));
}
private void pruneResponse(EntityMutationResponse resp) {
if (LOG.isDebugEnabled()) {
LOG.debug("==> pruneResponse()");
}
List<AtlasEntityHeader> createdEntities = resp.getCreatedEntities();
List<AtlasEntityHeader> updatedEntities = resp.getUpdatedEntities();
List<AtlasEntityHeader> partialUpdatedEntities = resp.getPartialUpdatedEntities();
List<AtlasEntityHeader> deletedEntities = resp.getDeletedEntities();
// remove entities with DELETED status from created & updated lists
purgeDeletedEntities(createdEntities);
purgeDeletedEntities(updatedEntities);
purgeDeletedEntities(partialUpdatedEntities);
// remove entities deleted in this mutation from created & updated lists
if (deletedEntities != null) {
for (AtlasEntityHeader entity : deletedEntities) {
purgeEntity(entity.getGuid(), createdEntities);
purgeEntity(entity.getGuid(), updatedEntities);
purgeEntity(entity.getGuid(), partialUpdatedEntities);
}
}
// remove entities created in this mutation from updated lists
if (createdEntities != null) {
for (AtlasEntityHeader entity : createdEntities) {
purgeEntity(entity.getGuid(),updatedEntities);
purgeEntity(entity.getGuid(), partialUpdatedEntities);
}
}
// remove entities updated in this mutation from partial-updated list
if (updatedEntities != null) {
for (AtlasEntityHeader entity : updatedEntities) {
purgeEntity(entity.getGuid(), partialUpdatedEntities);
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("<== pruneResponse()");
}
}
private void purgeDeletedEntities(List<AtlasEntityHeader> entities) {
if (entities != null) {
for (ListIterator<AtlasEntityHeader> iter = entities.listIterator(); iter.hasNext(); ) {
AtlasEntityHeader entity = iter.next();
if (entity.getStatus() == AtlasEntity.Status.DELETED) {
if (LOG.isDebugEnabled()) {
LOG.debug("purgeDeletedEntities(guid={}, status={}): REMOVED", entity.getGuid(), entity.getStatus());
}
iter.remove();
}
}
}
}
private void purgeEntity(String guid, List<AtlasEntityHeader> entities) {
if (guid != null && entities != null) {
for (ListIterator<AtlasEntityHeader> iter = entities.listIterator(); iter.hasNext(); ) {
AtlasEntityHeader entity = iter.next();
if (org.apache.commons.lang.StringUtils.equals(guid, entity.getGuid())) {
if (LOG.isDebugEnabled()) {
LOG.debug("purgeEntity(guid={}): REMOVED", entity.getGuid());
}
iter.remove();
}
}
}
}
}
......@@ -29,6 +29,7 @@ import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.instance.*;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.instance.AtlasEntity.Status;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.repository.store.graph.EntityGraphDiscovery;
......@@ -466,6 +467,10 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {
@Override
@GraphTransaction
public void addClassifications(final String guid, final List<AtlasClassification> classifications) throws AtlasBaseException {
if (LOG.isDebugEnabled()) {
LOG.debug("Adding classifications={} to entity={}", classifications, guid);
}
if (StringUtils.isEmpty(guid)) {
throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "Guid(s) not specified");
}
......@@ -474,17 +479,23 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {
throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "classifications(s) not specified");
}
if (LOG.isDebugEnabled()) {
LOG.debug("Adding classifications={} to entity={}", classifications, guid);
AtlasVertex entityVertex = AtlasGraphUtilsV2.findByGuid(guid);
if (entityVertex == null) {
throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, guid);
}
AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeaderWithClassifications(guid);
AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeaderWithClassifications(entityVertex);
for (AtlasClassification classification : classifications) {
AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_ADD_CLASSIFICATION, entityHeader, classification),
"add classification: guid=", guid, ", classification=", classification.getTypeName());
}
EntityMutationContext context = new EntityMutationContext();
context.cacheEntity(guid, entityVertex, typeRegistry.getEntityTypeByName(entityHeader.getTypeName()));
GraphTransactionInterceptor.lockObjectAndReleasePostCommit(guid);
for (AtlasClassification classification : classifications) {
......@@ -494,7 +505,7 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {
// validate if entity, not already associated with classifications
validateEntityAssociations(guid, classifications);
entityGraphMapper.addClassifications(new EntityMutationContext(), guid, classifications);
entityGraphMapper.addClassifications(context, guid, classifications);
}
@Override
......@@ -512,24 +523,38 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {
throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "classifications(s) not specified");
}
AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeaderWithClassifications(guid);
AtlasVertex entityVertex = AtlasGraphUtilsV2.findByGuid(guid);
if (entityVertex == null) {
throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, guid);
}
AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeaderWithClassifications(entityVertex);
for (AtlasClassification classification : classifications) {
AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_UPDATE_CLASSIFICATION, entityHeader, classification), "update classification: guid=", guid, ", classification=", classification.getTypeName());
}
EntityMutationContext context = new EntityMutationContext();
context.cacheEntity(guid, entityVertex, typeRegistry.getEntityTypeByName(entityHeader.getTypeName()));
GraphTransactionInterceptor.lockObjectAndReleasePostCommit(guid);
for (AtlasClassification classification : classifications) {
validateAndNormalize(classification);
}
entityGraphMapper.updateClassifications(new EntityMutationContext(), guid, classifications);
entityGraphMapper.updateClassifications(context, guid, classifications);
}
@Override
@GraphTransaction
public void addClassification(final List<String> guids, final AtlasClassification classification) throws AtlasBaseException {
if (LOG.isDebugEnabled()) {
LOG.debug("Adding classification={} to entities={}", classification, guids);
}
if (CollectionUtils.isEmpty(guids)) {
throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "Guid(s) not specified");
}
......@@ -537,15 +562,21 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {
throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "classification not specified");
}
EntityMutationContext context = new EntityMutationContext();
for (String guid : guids) {
AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeaderWithClassifications(guid);
AtlasVertex entityVertex = AtlasGraphUtilsV2.findByGuid(guid);
if (entityVertex == null) {
throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, guid);
}
AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeaderWithClassifications(entityVertex);
AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_ADD_CLASSIFICATION, entityHeader, classification),
"add classification: guid=", guid, ", classification=", classification.getTypeName());
}
if (LOG.isDebugEnabled()) {
LOG.debug("Adding classification={} to entities={}", classification, guids);
context.cacheEntity(guid, entityVertex, typeRegistry.getEntityTypeByName(entityHeader.getTypeName()));
}
GraphTransactionInterceptor.lockObjectAndReleasePostCommit(guids);
......@@ -557,7 +588,7 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {
for (String guid : guids) {
validateEntityAssociations(guid, classifications);
entityGraphMapper.addClassifications(new EntityMutationContext(), guid, classifications);
entityGraphMapper.addClassifications(context, guid, classifications);
}
}
......@@ -813,7 +844,6 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {
}
AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName());
String guidVertex = AtlasGraphUtilsV2.getIdFromVertex(vertex);
if (!StringUtils.equals(guidVertex, guid)) { // if entity was found by unique attribute
......@@ -828,7 +858,6 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {
AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName());
//Create vertices which do not exist in the repository
if (RequestContext.get().isImportInProgress() && AtlasTypeUtil.isAssignedGuid(entity.getGuid())) {
vertex = entityGraphMapper.createVertexWithGuid(entity, entity.getGuid());
......@@ -851,6 +880,24 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {
// during import, update the system attributes
if (RequestContext.get().isImportInProgress()) {
Status newStatus = entity.getStatus();
if (newStatus != null) {
Status currStatus = AtlasGraphUtilsV2.getState(vertex);
if (currStatus == Status.ACTIVE && newStatus == Status.DELETED) {
if (LOG.isDebugEnabled()) {
LOG.debug("entity-delete via import - guid={}", guid);
}
context.addEntityToDelete(vertex);
} else if (currStatus == Status.DELETED && newStatus == Status.ACTIVE) {
LOG.warn("attempt to activate deleted entity (guid={}). Ignored", guid);
entity.setStatus(currStatus);
}
}
entityGraphMapper.updateSystemAttributes(vertex, entity);
}
}
......@@ -895,11 +942,11 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {
deleteDelegate.getHandler().deleteEntities(deletionCandidates); // this will update req with list of deleted/updated entities
for (AtlasObjectId entity : req.getDeletedEntities()) {
for (AtlasEntityHeader entity : req.getDeletedEntities()) {
response.addEntity(DELETE, entity);
}
for (AtlasObjectId entity : req.getUpdatedEntities()) {
for (AtlasEntityHeader entity : req.getUpdatedEntities()) {
response.addEntity(UPDATE, entity);
}
......
......@@ -139,10 +139,6 @@ public class EntityGraphMapper {
}
public void updateSystemAttributes(AtlasVertex vertex, AtlasEntity entity) {
if (entity.getStatus() != null) {
AtlasGraphUtilsV2.setEncodedProperty(vertex, STATE_PROPERTY_KEY, entity.getStatus().name());
}
if (entity.getVersion() != null) {
AtlasGraphUtilsV2.setEncodedProperty(vertex, VERSION_PROPERTY_KEY, entity.getVersion());
}
......@@ -226,13 +222,17 @@ public class EntityGraphMapper {
}
}
if (CollectionUtils.isNotEmpty(context.getEntitiesToDelete())) {
deleteDelegate.getHandler().deleteEntities(context.getEntitiesToDelete());
}
RequestContext req = RequestContext.get();
for (AtlasObjectId entity : req.getDeletedEntities()) {
for (AtlasEntityHeader entity : req.getDeletedEntities()) {
resp.addEntity(DELETE, entity);
}
for (AtlasObjectId entity : req.getUpdatedEntities()) {
for (AtlasEntityHeader entity : req.getUpdatedEntities()) {
if (isPartialUpdate) {
resp.addEntity(PARTIAL_UPDATE, entity);
}
......@@ -395,7 +395,7 @@ public class EntityGraphMapper {
switch (ctx.getAttrType().getTypeCategory()) {
case PRIMITIVE:
case ENUM:
return mapPrimitiveValue(ctx);
return mapPrimitiveValue(ctx, context);
case STRUCT: {
String edgeLabel = AtlasGraphUtilsV2.getEdgeLabel(ctx.getVertexProperty());
......@@ -577,7 +577,7 @@ public class EntityGraphMapper {
if (!requestContext.isDeletedEntity(GraphHelper.getGuid(inverseVertex))) {
updateModificationMetadata(inverseVertex);
requestContext.recordEntityUpdate(entityRetriever.toAtlasObjectId(inverseVertex));
requestContext.recordEntityUpdate(entityRetriever.toAtlasEntityHeader(inverseVertex));
}
}
}
......@@ -664,7 +664,7 @@ public class EntityGraphMapper {
return ret;
}
private Object mapPrimitiveValue(AttributeMutationContext ctx) {
private Object mapPrimitiveValue(AttributeMutationContext ctx, EntityMutationContext context) {
boolean isIndexableStrAttr = ctx.getAttributeDef().getIsIndexable() && ctx.getAttrType() instanceof AtlasBuiltInTypes.AtlasStringType;
Object ret = ctx.getValue();
......@@ -708,7 +708,7 @@ public class EntityGraphMapper {
String uniqPropName = ctx.getAttribute() != null ? ctx.getAttribute().getVertexUniquePropertyName() : null;
if (uniqPropName != null) {
if (AtlasGraphUtilsV2.getState(ctx.getReferringVertex()) == DELETED) {
if (context.isDeletedEntity(ctx.getReferringVertex()) || AtlasGraphUtilsV2.getState(ctx.getReferringVertex()) == DELETED) {
ctx.getReferringVertex().removeProperty(uniqPropName);
} else {
AtlasGraphUtilsV2.setEncodedProperty(ctx.getReferringVertex(), uniqPropName, ret);
......@@ -1355,6 +1355,7 @@ public class EntityGraphMapper {
AtlasEntityHeader header = new AtlasEntityHeader(entity.getTypeName());
header.setGuid(getIdFromVertex(vertex));
header.setStatus(entity.getStatus());
for (AtlasAttribute attribute : type.getUniqAttributes().values()) {
header.setAttribute(attribute.getName(), entity.getAttribute(attribute.getName()));
......@@ -1363,10 +1364,6 @@ public class EntityGraphMapper {
return header;
}
public static AtlasEntityHeader constructHeader(AtlasObjectId id) {
return new AtlasEntityHeader(id.getTypeName(), id.getGuid(), id.getUniqueAttributes());
}
private void updateInConsistentOwnedMapVertices(AttributeMutationContext ctx, AtlasMapType mapType, Object val) {
if (mapType.getValueType().getTypeCategory() == TypeCategory.OBJECT_ID_TYPE && !ctx.getAttributeDef().isSoftReferenced()) {
AtlasEdge edge = (AtlasEdge) val;
......@@ -1382,14 +1379,8 @@ public class EntityGraphMapper {
public void addClassifications(final EntityMutationContext context, String guid, List<AtlasClassification> classifications) throws AtlasBaseException {
if (CollectionUtils.isNotEmpty(classifications)) {
AtlasVertex entityVertex = AtlasGraphUtilsV2.findByGuid(guid);
if (entityVertex == null) {
throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, guid);
}
final String entityTypeName = AtlasGraphUtilsV2.getTypeName(entityVertex);
final AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entityTypeName);
final AtlasVertex entityVertex = context.getVertex(guid);
final AtlasEntityType entityType = context.getType(guid);
List<AtlasVertex> entitiesToPropagateTo = null;
Map<AtlasVertex, List<AtlasClassification>> propagations = null;
List<AtlasClassification> addClassifications = new ArrayList<>(classifications.size());
......@@ -1434,7 +1425,7 @@ public class EntityGraphMapper {
// ignore propagated classifications
if (LOG.isDebugEnabled()) {
LOG.debug("Adding classification [{}] to [{}] using edge label: [{}]", classificationName, entityTypeName, getTraitLabel(classificationName));
LOG.debug("Adding classification [{}] to [{}] using edge label: [{}]", classificationName, entityType.getTypeName(), getTraitLabel(classificationName));
}
AtlasGraphUtilsV2.addEncodedProperty(entityVertex, TRAIT_NAMES_PROPERTY_KEY, classificationName);
......@@ -1466,7 +1457,7 @@ public class EntityGraphMapper {
}
if (LOG.isDebugEnabled()) {
LOG.debug("Propagating tag: [{}][{}] to {}", classificationName, entityTypeName, getTypeNames(entitiesToPropagateTo));
LOG.debug("Propagating tag: [{}][{}] to {}", classificationName, entityType.getTypeName(), getTypeNames(entitiesToPropagateTo));
}
List<AtlasVertex> entitiesPropagatedTo = deleteDelegate.getHandler().addTagPropagation(classificationVertex, entitiesToPropagateTo);
......@@ -1478,12 +1469,12 @@ public class EntityGraphMapper {
}
} else {
if (LOG.isDebugEnabled()) {
LOG.debug(" --> Not propagating classification: [{}][{}] - no entities found to propagate to.", getTypeName(classificationVertex), entityTypeName);
LOG.debug(" --> Not propagating classification: [{}][{}] - no entities found to propagate to.", getTypeName(classificationVertex), entityType.getTypeName());
}
}
} else {
if (LOG.isDebugEnabled()) {
LOG.debug(" --> Not propagating classification: [{}][{}] - propagation is disabled.", getTypeName(classificationVertex), entityTypeName);
LOG.debug(" --> Not propagating classification: [{}][{}] - propagation is disabled.", getTypeName(classificationVertex), entityType.getTypeName());
}
}
......@@ -1867,7 +1858,7 @@ public class EntityGraphMapper {
if (!req.isUpdatedEntity(GraphHelper.getGuid(vertex))) {
updateModificationMetadata(vertex);
req.recordEntityUpdate(entityRetriever.toAtlasObjectId(vertex));
req.recordEntityUpdate(entityRetriever.toAtlasEntityHeader(vertex));
}
}
......
......@@ -209,46 +209,6 @@ public class EntityGraphRetriever {
return ret;
}
public AtlasEntityHeader toAtlasEntityHeader(AtlasEntity entity) {
AtlasEntityHeader ret = null;
String typeName = entity.getTypeName();
AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName);
if (entityType != null) {
Map<String, Object> uniqueAttributes = new HashMap<>();
for (AtlasAttribute attribute : entityType.getUniqAttributes().values()) {
Object attrValue = entity.getAttribute(attribute.getName());
if (attrValue != null) {
uniqueAttributes.put(attribute.getName(), attrValue);
}
}
ret = new AtlasEntityHeader(entity.getTypeName(), entity.getGuid(), uniqueAttributes);
if (CollectionUtils.isNotEmpty(entity.getClassifications())) {
List<AtlasClassification> classifications = new ArrayList<>(entity.getClassifications().size());
List<String> classificationNames = new ArrayList<>(entity.getClassifications().size());
for (AtlasClassification classification : entity.getClassifications()) {
classifications.add(classification);
classificationNames.add(classification.getTypeName());
}
ret.setClassifications(classifications);
ret.setClassificationNames(classificationNames);
}
if (CollectionUtils.isNotEmpty(entity.getMeanings())) {
ret.setMeanings(entity.getMeanings());
ret.setMeaningNames(entity.getMeanings().stream().map(AtlasTermAssignmentHeader::getDisplayText).collect(Collectors.toList()));
}
}
return ret;
}
public AtlasObjectId toAtlasObjectId(AtlasVertex entityVertex) throws AtlasBaseException {
AtlasObjectId ret = null;
String typeName = entityVertex.getProperty(Constants.TYPE_NAME_PROPERTY_KEY, String.class);
......
......@@ -27,18 +27,20 @@ import org.apache.commons.lang.StringUtils;
import java.util.*;
public class EntityMutationContext {
private EntityGraphDiscoveryContext context = null;
private final EntityGraphDiscoveryContext context;
private final List<AtlasEntity> entitiesCreated = new ArrayList<>();
private final List<AtlasEntity> entitiesUpdated = new ArrayList<>();
private final Map<String, AtlasEntityType> entityVsType = new HashMap<>();
private final Map<String, AtlasVertex> entityVsVertex = new HashMap<>();
private final Map<String, String> guidAssignments = new HashMap<>();
private List<AtlasVertex> entitiesToDelete = null;
public EntityMutationContext(final EntityGraphDiscoveryContext context) {
this.context = context;
}
public EntityMutationContext() {
this.context = null;
}
public void addCreated(String internalGuid, AtlasEntity entity, AtlasEntityType type, AtlasVertex atlasVertex) {
......@@ -65,6 +67,19 @@ public class EntityMutationContext {
}
}
public void addEntityToDelete(AtlasVertex vertex) {
if (entitiesToDelete == null) {
entitiesToDelete = new ArrayList<>();
}
entitiesToDelete.add(vertex);
}
public void cacheEntity(String guid, AtlasVertex vertex, AtlasEntityType entityType) {
entityVsType.put(guid, entityType);
entityVsVertex.put(guid, vertex);
}
public EntityGraphDiscoveryContext getDiscoveryContext() {
return this.context;
}
......@@ -81,6 +96,10 @@ public class EntityMutationContext {
return guidAssignments;
}
public List<AtlasVertex> getEntitiesToDelete() {
return entitiesToDelete;
}
public AtlasEntityType getType(String guid) {
return entityVsType.get(guid);
}
......@@ -130,6 +149,10 @@ public class EntityMutationContext {
return getFromCollection(parentGuid, getUpdatedEntities());
}
public boolean isDeletedEntity(AtlasVertex vertex) {
return entitiesToDelete != null && entitiesToDelete.contains(vertex);
}
private AtlasEntity getFromCollection(String parentGuid, Collection<AtlasEntity> coll) {
for (AtlasEntity e : coll) {
if(e.getGuid().equalsIgnoreCase(parentGuid)) {
......
......@@ -21,7 +21,7 @@ package org.apache.atlas;
import org.apache.atlas.model.instance.AtlasClassification;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.store.DeleteType;
import org.apache.atlas.utils.AtlasPerfMetrics;
import org.apache.atlas.utils.AtlasPerfMetrics.MetricRecorder;
......@@ -39,8 +39,8 @@ public class RequestContext {
private static final boolean isMetricsEnabled = METRICS.isDebugEnabled();
private final long requestTime = System.currentTimeMillis();
private final Map<String, AtlasObjectId> updatedEntities = new HashMap<>();
private final Map<String, AtlasObjectId> deletedEntities = new HashMap<>();
private final Map<String, AtlasEntityHeader> updatedEntities = new HashMap<>();
private final Map<String, AtlasEntityHeader> deletedEntities = new HashMap<>();
private final Map<String, AtlasEntity> entityCache = new HashMap<>();
private final Map<String, AtlasEntityWithExtInfo> entityExtInfoCache = new HashMap<>();
private final Map<String, List<AtlasClassification>> addedPropagations = new HashMap<>();
......@@ -164,13 +164,13 @@ public class RequestContext {
isImportInProgress = importInProgress;
}
public void recordEntityUpdate(AtlasObjectId entity) {
public void recordEntityUpdate(AtlasEntityHeader entity) {
if (entity != null && entity.getGuid() != null) {
updatedEntities.put(entity.getGuid(), entity);
}
}
public void recordEntityDelete(AtlasObjectId entity) {
public void recordEntityDelete(AtlasEntityHeader entity) {
if (entity != null && entity.getGuid() != null) {
deletedEntities.put(entity.getGuid(), entity);
}
......@@ -248,11 +248,11 @@ public class RequestContext {
}
public Collection<AtlasObjectId> getUpdatedEntities() {
public Collection<AtlasEntityHeader> getUpdatedEntities() {
return updatedEntities.values();
}
public Collection<AtlasObjectId> getDeletedEntities() {
public Collection<AtlasEntityHeader> getDeletedEntities() {
return deletedEntities.values();
}
......
......@@ -60,7 +60,7 @@ public class AdminExportImportTestIT extends BaseResourceIT {
@Test(dependsOnMethods = "isActive")
public void importData() throws AtlasServiceException {
performImport(FILE_TO_IMPORT);
performImport(FILE_TO_IMPORT, 37);
assertReplicationData("cl1");
}
......@@ -84,21 +84,21 @@ public class AdminExportImportTestIT extends BaseResourceIT {
assertTrue(zs.getCreationOrder().size() > EXPECTED_CREATION_ORDER_SIZE);
}
private void performImport(String fileToImport) throws AtlasServiceException {
private void performImport(String fileToImport, int expectedProcessedEntitiesCount) throws AtlasServiceException {
AtlasImportRequest request = new AtlasImportRequest();
request.getOptions().put(AtlasImportRequest.OPTION_KEY_REPLICATED_FROM, SOURCE_SERVER_NAME);
request.getOptions().put(AtlasImportRequest.TRANSFORMS_KEY, IMPORT_TRANSFORM_CLEAR_ATTRS);
performImport(fileToImport, request);
performImport(fileToImport, request, expectedProcessedEntitiesCount);
}
private void performImport(String fileToImport, AtlasImportRequest request) throws AtlasServiceException {
private void performImport(String fileToImport, AtlasImportRequest request, int expectedProcessedEntitiesCount) throws AtlasServiceException {
AtlasImportResult result = performImportUsing(fileToImport, request);
assertNotNull(result);
assertEquals(result.getOperationStatus(), AtlasImportResult.OperationStatus.SUCCESS);
assertNotNull(result.getMetrics());
assertEquals(result.getProcessedEntities().size(), 37);
assertEquals(result.getProcessedEntities().size(), expectedProcessedEntitiesCount, "processedEntities: expected=" + expectedProcessedEntitiesCount + ", found=" + result.getProcessedEntities().size() + ". result=" + result);
}
private AtlasImportResult performImportUsing(String fileToImport, AtlasImportRequest request) throws AtlasServiceException {
......@@ -126,7 +126,7 @@ public class AdminExportImportTestIT extends BaseResourceIT {
request.getOptions().put(AtlasImportRequest.TRANSFORMS_KEY, IMPORT_TRANSFORM_SET_DELETED);
try {
performImport(FILE_TO_IMPORT, request);
performImport(FILE_TO_IMPORT, request, 32); // initial import has 5 entities already in deleted state, hence current import will have 32 processed-entities
} catch (AtlasServiceException e) {
throw new SkipException("performTeardown: failed! Subsequent tests results may be affected.");
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment