Commit ce54e8a4 by apoorvnaik Committed by Madhan Neethiraj

ATLAS-1499: Notification processing using V2 Store (#2) - fixes in handling of…

ATLAS-1499: Notification processing using V2 Store (#2) - fixes in handling of partial-update notifications Signed-off-by: 's avatarMadhan Neethiraj <madhan@apache.org>
parent 1d85e95f
...@@ -125,7 +125,7 @@ public class AtlasInstanceConverter { ...@@ -125,7 +125,7 @@ public class AtlasInstanceConverter {
return ret; return ret;
} }
public AtlasEntity.AtlasEntitiesWithExtInfo getAtlasEntity(IReferenceableInstance referenceable) throws AtlasBaseException { public AtlasEntity.AtlasEntitiesWithExtInfo toAtlasEntity(IReferenceableInstance referenceable) throws AtlasBaseException {
AtlasEntityFormatConverter converter = (AtlasEntityFormatConverter) instanceFormatters.getConverter(TypeCategory.ENTITY); AtlasEntityFormatConverter converter = (AtlasEntityFormatConverter) instanceFormatters.getConverter(TypeCategory.ENTITY);
AtlasEntityType entityType = typeRegistry.getEntityTypeByName(referenceable.getTypeName()); AtlasEntityType entityType = typeRegistry.getEntityTypeByName(referenceable.getTypeName());
...@@ -187,9 +187,9 @@ public class AtlasInstanceConverter { ...@@ -187,9 +187,9 @@ public class AtlasInstanceConverter {
return new AtlasBaseException(e); return new AtlasBaseException(e);
} }
public AtlasEntity.AtlasEntitiesWithExtInfo getEntities(List<Referenceable> referenceables) throws AtlasBaseException { public AtlasEntity.AtlasEntitiesWithExtInfo toAtlasEntities(List<Referenceable> referenceables) throws AtlasBaseException {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("==> getEntities"); LOG.debug("==> toAtlasEntities");
} }
AtlasFormatConverter.ConverterContext context = new AtlasFormatConverter.ConverterContext(); AtlasFormatConverter.ConverterContext context = new AtlasFormatConverter.ConverterContext();
...@@ -199,7 +199,7 @@ public class AtlasInstanceConverter { ...@@ -199,7 +199,7 @@ public class AtlasInstanceConverter {
context.addEntity(entity); context.addEntity(entity);
} }
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("<== getEntities"); LOG.debug("<== toAtlasEntities");
} }
return context.getEntities(); return context.getEntities();
......
...@@ -224,9 +224,9 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore { ...@@ -224,9 +224,9 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "no entity to update."); throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "no entity to update.");
} }
AtlasVertex entityVertex = AtlasGraphUtilsV1.getVertexByUniqueAttributes(entityType, uniqAttributes); String guid = AtlasGraphUtilsV1.getGuidByUniqueAttributes(entityType, uniqAttributes);
updatedEntity.setGuid(AtlasGraphUtilsV1.getIdFromVertex(entityVertex)); updatedEntity.setGuid(guid);
return createOrUpdate(new AtlasEntityStream(updatedEntity), true); return createOrUpdate(new AtlasEntityStream(updatedEntity), true);
} }
...@@ -249,7 +249,7 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore { ...@@ -249,7 +249,7 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
} }
} }
Collection<AtlasVertex> deletionCandidates = new ArrayList<AtlasVertex>(); Collection<AtlasVertex> deletionCandidates = new ArrayList<>();
deletionCandidates.add(vertex); deletionCandidates.add(vertex);
return deleteVertices(deletionCandidates); return deleteVertices(deletionCandidates);
......
...@@ -190,6 +190,11 @@ public class AtlasGraphUtilsV1 { ...@@ -190,6 +190,11 @@ public class AtlasGraphUtilsV1 {
return vertex; return vertex;
} }
public static String getGuidByUniqueAttributes(AtlasEntityType entityType, Map<String, Object> attrValues) throws AtlasBaseException {
AtlasVertex vertexByUniqueAttributes = getVertexByUniqueAttributes(entityType, attrValues);
return getIdFromVertex(vertexByUniqueAttributes);
}
public static AtlasVertex findByUniqueAttributes(AtlasEntityType entityType, Map<String, Object> attrValues) { public static AtlasVertex findByUniqueAttributes(AtlasEntityType entityType, Map<String, Object> attrValues) {
AtlasVertex vertex = null; AtlasVertex vertex = null;
......
...@@ -29,9 +29,11 @@ import org.apache.atlas.ha.HAConfiguration; ...@@ -29,9 +29,11 @@ import org.apache.atlas.ha.HAConfiguration;
import org.apache.atlas.listener.ActiveStateChangeHandler; import org.apache.atlas.listener.ActiveStateChangeHandler;
import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.notification.hook.HookNotification; import org.apache.atlas.notification.hook.HookNotification;
import org.apache.atlas.notification.hook.HookNotification.EntityPartialUpdateRequest;
import org.apache.atlas.repository.converters.AtlasInstanceConverter; import org.apache.atlas.repository.converters.AtlasInstanceConverter;
import org.apache.atlas.repository.store.graph.AtlasEntityStore; import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.repository.store.graph.v1.AtlasEntityStream; import org.apache.atlas.repository.store.graph.v1.AtlasEntityStream;
import org.apache.atlas.repository.store.graph.v1.AtlasGraphUtilsV1;
import org.apache.atlas.service.Service; import org.apache.atlas.service.Service;
import org.apache.atlas.type.AtlasEntityType; import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.type.AtlasTypeRegistry;
...@@ -45,7 +47,6 @@ import org.slf4j.LoggerFactory; ...@@ -45,7 +47,6 @@ import org.slf4j.LoggerFactory;
import javax.inject.Inject; import javax.inject.Inject;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.Date; import java.util.Date;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
...@@ -54,7 +55,10 @@ import java.util.concurrent.Executors; ...@@ -54,7 +55,10 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.atlas.notification.hook.HookNotification.*; import static org.apache.atlas.notification.hook.HookNotification.EntityCreateRequest;
import static org.apache.atlas.notification.hook.HookNotification.EntityDeleteRequest;
import static org.apache.atlas.notification.hook.HookNotification.EntityUpdateRequest;
import static org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage;
/** /**
* Consumer of notifications from hooks e.g., hive hook etc. * Consumer of notifications from hooks e.g., hive hook etc.
...@@ -249,7 +253,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl ...@@ -249,7 +253,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
EntityCreateRequest createRequest = (EntityCreateRequest) message; EntityCreateRequest createRequest = (EntityCreateRequest) message;
audit(messageUser, AtlasClient.API.CREATE_ENTITY); audit(messageUser, AtlasClient.API.CREATE_ENTITY);
entities = instanceConverter.getEntities(createRequest.getEntities()); entities = instanceConverter.toAtlasEntities(createRequest.getEntities());
atlasEntityStore.createOrUpdate(new AtlasEntityStream(entities), false); atlasEntityStore.createOrUpdate(new AtlasEntityStream(entities), false);
break; break;
...@@ -262,11 +266,16 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl ...@@ -262,11 +266,16 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
audit(messageUser, AtlasClient.API.UPDATE_ENTITY_PARTIAL); audit(messageUser, AtlasClient.API.UPDATE_ENTITY_PARTIAL);
Referenceable referenceable = partialUpdateRequest.getEntity(); Referenceable referenceable = partialUpdateRequest.getEntity();
entities = instanceConverter.getEntities(Collections.singletonList(referenceable)); entities = instanceConverter.toAtlasEntity(referenceable);
// There should only be one root entity after the conversion
AtlasEntity entity = entities.getEntities().get(0); AtlasEntityType entityType = typeRegistry.getEntityTypeByName(partialUpdateRequest.getTypeName());
// Need to set the attributes explicitly here as the qualified name might have changed during update String guid = AtlasGraphUtilsV1.getGuidByUniqueAttributes(entityType, new HashMap<String, Object>(){
entity.setAttribute(partialUpdateRequest.getAttribute(), partialUpdateRequest.getAttributeValue()); { put(partialUpdateRequest.getAttribute(), partialUpdateRequest.getAttributeValue()); }
});
// There should only be one root entity
entities.getEntities().get(0).setGuid(guid);
atlasEntityStore.createOrUpdate(new AtlasEntityStream(entities), true); atlasEntityStore.createOrUpdate(new AtlasEntityStream(entities), true);
break; break;
...@@ -293,7 +302,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl ...@@ -293,7 +302,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
EntityUpdateRequest updateRequest = (EntityUpdateRequest) message; EntityUpdateRequest updateRequest = (EntityUpdateRequest) message;
audit(messageUser, AtlasClient.API.UPDATE_ENTITY); audit(messageUser, AtlasClient.API.UPDATE_ENTITY);
entities = instanceConverter.getEntities(updateRequest.getEntities()); entities = instanceConverter.toAtlasEntities(updateRequest.getEntities());
atlasEntityStore.createOrUpdate(new AtlasEntityStream(entities), false); atlasEntityStore.createOrUpdate(new AtlasEntityStream(entities), false);
break; break;
......
...@@ -77,7 +77,7 @@ public class NotificationHookConsumerKafkaTest { ...@@ -77,7 +77,7 @@ public class NotificationHookConsumerKafkaTest {
AtlasType mockType = mock(AtlasType.class); AtlasType mockType = mock(AtlasType.class);
when(typeRegistry.getType(anyString())).thenReturn(mockType); when(typeRegistry.getType(anyString())).thenReturn(mockType);
AtlasEntity.AtlasEntitiesWithExtInfo mockEntity = mock(AtlasEntity.AtlasEntitiesWithExtInfo.class); AtlasEntity.AtlasEntitiesWithExtInfo mockEntity = mock(AtlasEntity.AtlasEntitiesWithExtInfo.class);
when(instanceConverter.getEntities(anyList())).thenReturn(mockEntity); when(instanceConverter.toAtlasEntities(anyList())).thenReturn(mockEntity);
kafkaNotification = startKafkaServer(); kafkaNotification = startKafkaServer();
} }
......
...@@ -75,7 +75,7 @@ public class NotificationHookConsumerTest { ...@@ -75,7 +75,7 @@ public class NotificationHookConsumerTest {
AtlasType mockType = mock(AtlasType.class); AtlasType mockType = mock(AtlasType.class);
when(typeRegistry.getType(anyString())).thenReturn(mockType); when(typeRegistry.getType(anyString())).thenReturn(mockType);
AtlasEntity.AtlasEntitiesWithExtInfo mockEntity = mock(AtlasEntity.AtlasEntitiesWithExtInfo.class); AtlasEntity.AtlasEntitiesWithExtInfo mockEntity = mock(AtlasEntity.AtlasEntitiesWithExtInfo.class);
when(instanceConverter.getEntities(anyList())).thenReturn(mockEntity); when(instanceConverter.toAtlasEntities(anyList())).thenReturn(mockEntity);
EntityMutationResponse mutationResponse = mock(EntityMutationResponse.class); EntityMutationResponse mutationResponse = mock(EntityMutationResponse.class);
when(atlasEntityStore.createOrUpdate(any(EntityStream.class), anyBoolean())).thenReturn(mutationResponse); when(atlasEntityStore.createOrUpdate(any(EntityStream.class), anyBoolean())).thenReturn(mutationResponse);
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment