Commit 59648d28 by Madhan Neethiraj

ATLAS-3279: avoid unncessary retrieval of entity-extended info while sending notifications

Change-Id: I82e0bba27010709c74cd98a93f8a9c617577535e
parent 9062e2c8
...@@ -271,11 +271,11 @@ public class FullTextMapperV2 { ...@@ -271,11 +271,11 @@ public class FullTextMapperV2 {
} }
} }
private AtlasEntity getAndCacheEntity(String guid) throws AtlasBaseException { public AtlasEntity getAndCacheEntity(String guid) throws AtlasBaseException {
return getAndCacheEntity(guid, true); return getAndCacheEntity(guid, true);
} }
private AtlasEntity getAndCacheEntity(String guid, boolean includeReferences) throws AtlasBaseException { public AtlasEntity getAndCacheEntity(String guid, boolean includeReferences) throws AtlasBaseException {
RequestContext context = RequestContext.get(); RequestContext context = RequestContext.get();
AtlasEntity entity = context.getEntity(guid); AtlasEntity entity = context.getEntity(guid);
...@@ -294,7 +294,7 @@ public class FullTextMapperV2 { ...@@ -294,7 +294,7 @@ public class FullTextMapperV2 {
return entity; return entity;
} }
private AtlasEntityWithExtInfo getAndCacheEntityWithExtInfo(String guid) throws AtlasBaseException { public AtlasEntityWithExtInfo getAndCacheEntityWithExtInfo(String guid) throws AtlasBaseException {
RequestContext context = RequestContext.get(); RequestContext context = RequestContext.get();
AtlasEntityWithExtInfo entityWithExtInfo = context.getEntityWithExtInfo(guid); AtlasEntityWithExtInfo entityWithExtInfo = context.getEntityWithExtInfo(guid);
......
...@@ -92,7 +92,7 @@ public class AtlasEntityChangeNotifier { ...@@ -92,7 +92,7 @@ public class AtlasEntityChangeNotifier {
} }
public void onEntitiesMutated(EntityMutationResponse entityMutationResponse, boolean isImport) throws AtlasBaseException { public void onEntitiesMutated(EntityMutationResponse entityMutationResponse, boolean isImport) throws AtlasBaseException {
if (CollectionUtils.isEmpty(entityChangeListeners) || instanceConverter == null) { if (CollectionUtils.isEmpty(entityChangeListeners)) {
return; return;
} }
...@@ -118,7 +118,7 @@ public class AtlasEntityChangeNotifier { ...@@ -118,7 +118,7 @@ public class AtlasEntityChangeNotifier {
} }
public void notifyRelationshipMutation(AtlasRelationship relationship, EntityNotification.EntityNotificationV2.OperationType operationType) throws AtlasBaseException { public void notifyRelationshipMutation(AtlasRelationship relationship, EntityNotification.EntityNotificationV2.OperationType operationType) throws AtlasBaseException {
if (CollectionUtils.isEmpty(entityChangeListeners) || instanceConverter == null) { if (CollectionUtils.isEmpty(entityChangeListeners)) {
return; return;
} }
...@@ -145,75 +145,76 @@ public class AtlasEntityChangeNotifier { ...@@ -145,75 +145,76 @@ public class AtlasEntityChangeNotifier {
} else { } else {
updateFullTextMapping(entity.getGuid(), addedClassifications); updateFullTextMapping(entity.getGuid(), addedClassifications);
Referenceable entityRef = toReferenceable(entity.getGuid()); if (instanceConverter != null) {
List<Struct> traits = toStruct(addedClassifications); Referenceable entityRef = toReferenceable(entity.getGuid());
List<Struct> traits = toStruct(addedClassifications);
if (entity == null || CollectionUtils.isEmpty(traits)) { if (entity == null || CollectionUtils.isEmpty(traits)) {
return; return;
} }
for (EntityChangeListener listener : entityChangeListeners) { for (EntityChangeListener listener : entityChangeListeners) {
try { try {
listener.onTraitsAdded(entityRef, traits); listener.onTraitsAdded(entityRef, traits);
} catch (AtlasException e) { } catch (AtlasException e) {
throw new AtlasBaseException(AtlasErrorCode.NOTIFICATION_FAILED, e, getListenerName(listener), "TraitAdd"); throw new AtlasBaseException(AtlasErrorCode.NOTIFICATION_FAILED, e, getListenerName(listener), "TraitAdd");
}
} }
} }
} }
} }
public void onClassificationUpdatedToEntity(AtlasEntity entity, List<AtlasClassification> updatedClassifications) throws AtlasBaseException { public void onClassificationUpdatedToEntity(AtlasEntity entity, List<AtlasClassification> updatedClassifications) throws AtlasBaseException {
if (isV2EntityNotificationEnabled) { doFullTextMapping(entity.getGuid());
doFullTextMapping(entity.getGuid());
if (isV2EntityNotificationEnabled) {
for (EntityChangeListenerV2 listener : entityChangeListenersV2) { for (EntityChangeListenerV2 listener : entityChangeListenersV2) {
listener.onClassificationsUpdated(entity, updatedClassifications); listener.onClassificationsUpdated(entity, updatedClassifications);
} }
} else { } else {
doFullTextMapping(entity.getGuid()); if (instanceConverter != null) {
Referenceable entityRef = toReferenceable(entity.getGuid());
List<Struct> traits = toStruct(updatedClassifications);
Referenceable entityRef = toReferenceable(entity.getGuid()); if (entityRef == null || CollectionUtils.isEmpty(traits)) {
List<Struct> traits = toStruct(updatedClassifications); return;
}
if (entityRef == null || CollectionUtils.isEmpty(traits)) {
return;
}
for (EntityChangeListener listener : entityChangeListeners) { for (EntityChangeListener listener : entityChangeListeners) {
try { try {
listener.onTraitsUpdated(entityRef, traits); listener.onTraitsUpdated(entityRef, traits);
} catch (AtlasException e) { } catch (AtlasException e) {
throw new AtlasBaseException(AtlasErrorCode.NOTIFICATION_FAILED, e, getListenerName(listener), "TraitUpdate"); throw new AtlasBaseException(AtlasErrorCode.NOTIFICATION_FAILED, e, getListenerName(listener), "TraitUpdate");
}
} }
} }
} }
} }
public void onClassificationDeletedFromEntity(AtlasEntity entity, List<AtlasClassification> deletedClassifications) throws AtlasBaseException { public void onClassificationDeletedFromEntity(AtlasEntity entity, List<AtlasClassification> deletedClassifications) throws AtlasBaseException {
if (isV2EntityNotificationEnabled) { doFullTextMapping(entity.getGuid());
doFullTextMapping(entity.getGuid());
if (isV2EntityNotificationEnabled) {
for (EntityChangeListenerV2 listener : entityChangeListenersV2) { for (EntityChangeListenerV2 listener : entityChangeListenersV2) {
listener.onClassificationsDeleted(entity, deletedClassifications); listener.onClassificationsDeleted(entity, deletedClassifications);
} }
} else { } else {
doFullTextMapping(entity.getGuid()); if (instanceConverter != null) {
Referenceable entityRef = toReferenceable(entity.getGuid());
Referenceable entityRef = toReferenceable(entity.getGuid()); List<Struct> traits = toStruct(deletedClassifications);
List<Struct> traits = toStruct(deletedClassifications);
if (entityRef == null || CollectionUtils.isEmpty(deletedClassifications)) { if (entityRef == null || CollectionUtils.isEmpty(deletedClassifications)) {
return; return;
} }
for (EntityChangeListener listener : entityChangeListeners) { for (EntityChangeListener listener : entityChangeListeners) {
try { try {
listener.onTraitsDeleted(entityRef, traits); listener.onTraitsDeleted(entityRef, traits);
} catch (AtlasException e) { } catch (AtlasException e) {
throw new AtlasBaseException(AtlasErrorCode.NOTIFICATION_FAILED, e, getListenerName(listener), "TraitDelete"); throw new AtlasBaseException(AtlasErrorCode.NOTIFICATION_FAILED, e, getListenerName(listener), "TraitDelete");
}
} }
} }
} }
} }
...@@ -223,7 +224,7 @@ public class AtlasEntityChangeNotifier { ...@@ -223,7 +224,7 @@ public class AtlasEntityChangeNotifier {
for (EntityChangeListenerV2 listener : entityChangeListenersV2) { for (EntityChangeListenerV2 listener : entityChangeListenersV2) {
listener.onTermAdded(term, entityIds); listener.onTermAdded(term, entityIds);
} }
} else { } else if (instanceConverter != null) {
List<Referenceable> entityRefs = toReferenceables(entityIds); List<Referenceable> entityRefs = toReferenceables(entityIds);
for (EntityChangeListener listener : entityChangeListeners) { for (EntityChangeListener listener : entityChangeListeners) {
...@@ -242,7 +243,7 @@ public class AtlasEntityChangeNotifier { ...@@ -242,7 +243,7 @@ public class AtlasEntityChangeNotifier {
for (EntityChangeListenerV2 listener : entityChangeListenersV2) { for (EntityChangeListenerV2 listener : entityChangeListenersV2) {
listener.onTermDeleted(term, entityIds); listener.onTermDeleted(term, entityIds);
} }
} else { } else if (instanceConverter != null) {
List<Referenceable> entityRefs = toReferenceables(entityIds); List<Referenceable> entityRefs = toReferenceables(entityIds);
for (EntityChangeListener listener : entityChangeListeners) { for (EntityChangeListener listener : entityChangeListeners) {
...@@ -277,7 +278,7 @@ public class AtlasEntityChangeNotifier { ...@@ -277,7 +278,7 @@ public class AtlasEntityChangeNotifier {
continue; continue;
} }
AtlasEntity entity = instanceConverter.getAndCacheEntity(guid); AtlasEntity entity = fullTextMapperV2.getAndCacheEntity(guid);
if (entity == null) { if (entity == null) {
continue; continue;
...@@ -300,11 +301,15 @@ public class AtlasEntityChangeNotifier { ...@@ -300,11 +301,15 @@ public class AtlasEntityChangeNotifier {
return; return;
} }
MetricRecorder metric = RequestContext.get().startMetricRecord("notifyListeners");
if (isV2EntityNotificationEnabled) { if (isV2EntityNotificationEnabled) {
notifyV2Listeners(entityHeaders, operation, isImport); notifyV2Listeners(entityHeaders, operation, isImport);
} else { } else {
notifyV1Listeners(entityHeaders, operation, isImport); notifyV1Listeners(entityHeaders, operation, isImport);
} }
RequestContext.get().endMetricRecord(metric);
} }
private void notifyRelationshipListeners(List<AtlasRelationship> relationships, EntityOperation operation, boolean isImport) throws AtlasBaseException { private void notifyRelationshipListeners(List<AtlasRelationship> relationships, EntityOperation operation, boolean isImport) throws AtlasBaseException {
...@@ -322,24 +327,26 @@ public class AtlasEntityChangeNotifier { ...@@ -322,24 +327,26 @@ public class AtlasEntityChangeNotifier {
private void notifyV1Listeners(List<AtlasEntityHeader> entityHeaders, EntityOperation operation, boolean isImport) throws AtlasBaseException { private void notifyV1Listeners(List<AtlasEntityHeader> entityHeaders, EntityOperation operation, boolean isImport) throws AtlasBaseException {
List<Referenceable> typedRefInsts = toReferenceables(entityHeaders, operation); if (instanceConverter != null) {
List<Referenceable> typedRefInsts = toReferenceables(entityHeaders, operation);
for (EntityChangeListener listener : entityChangeListeners) { for (EntityChangeListener listener : entityChangeListeners) {
try { try {
switch (operation) { switch (operation) {
case CREATE: case CREATE:
listener.onEntitiesAdded(typedRefInsts, isImport); listener.onEntitiesAdded(typedRefInsts, isImport);
break; break;
case UPDATE: case UPDATE:
case PARTIAL_UPDATE: case PARTIAL_UPDATE:
listener.onEntitiesUpdated(typedRefInsts, isImport); listener.onEntitiesUpdated(typedRefInsts, isImport);
break; break;
case DELETE: case DELETE:
listener.onEntitiesDeleted(typedRefInsts, isImport); listener.onEntitiesDeleted(typedRefInsts, isImport);
break; break;
}
} catch (AtlasException e) {
throw new AtlasBaseException(AtlasErrorCode.NOTIFICATION_FAILED, e, getListenerName(listener), operation.toString());
} }
} catch (AtlasException e) {
throw new AtlasBaseException(AtlasErrorCode.NOTIFICATION_FAILED, e, getListenerName(listener), operation.toString());
} }
} }
} }
...@@ -383,17 +390,19 @@ public class AtlasEntityChangeNotifier { ...@@ -383,17 +390,19 @@ public class AtlasEntityChangeNotifier {
} }
} }
private List<Referenceable> toReferenceables(List<AtlasEntityHeader> entityHeaders, EntityOperation operation) throws AtlasBaseException { private List<Referenceable> toReferenceables(List<AtlasEntityHeader> entityHeaders, EntityOperation operation) throws AtlasBaseException {
List<Referenceable> ret = new ArrayList<>(entityHeaders.size()); List<Referenceable> ret = new ArrayList<>(entityHeaders.size());
// delete notifications don't need all attributes. Hence the special handling for delete operation if (instanceConverter != null) {
if (operation == EntityOperation.DELETE) { // delete notifications don't need all attributes. Hence the special handling for delete operation
for (AtlasEntityHeader entityHeader : entityHeaders) { if (operation == EntityOperation.DELETE) {
ret.add(new Referenceable(entityHeader.getGuid(), entityHeader.getTypeName(), entityHeader.getAttributes())); for (AtlasEntityHeader entityHeader : entityHeaders) {
} ret.add(new Referenceable(entityHeader.getGuid(), entityHeader.getTypeName(), entityHeader.getAttributes()));
} else { }
for (AtlasEntityHeader entityHeader : entityHeaders) { } else {
ret.add(toReferenceable(entityHeader.getGuid())); for (AtlasEntityHeader entityHeader : entityHeaders) {
ret.add(toReferenceable(entityHeader.getGuid()));
}
} }
} }
...@@ -403,7 +412,7 @@ public class AtlasEntityChangeNotifier { ...@@ -403,7 +412,7 @@ public class AtlasEntityChangeNotifier {
private List<Referenceable> toReferenceables(List<AtlasRelatedObjectId> entityIds) throws AtlasBaseException { private List<Referenceable> toReferenceables(List<AtlasRelatedObjectId> entityIds) throws AtlasBaseException {
List<Referenceable> ret = new ArrayList<>(); List<Referenceable> ret = new ArrayList<>();
if (CollectionUtils.isNotEmpty(entityIds)) { if (instanceConverter != null && CollectionUtils.isNotEmpty(entityIds)) {
for (AtlasRelatedObjectId relatedObjectId : entityIds) { for (AtlasRelatedObjectId relatedObjectId : entityIds) {
String entityGuid = relatedObjectId.getGuid(); String entityGuid = relatedObjectId.getGuid();
...@@ -417,17 +426,17 @@ public class AtlasEntityChangeNotifier { ...@@ -417,17 +426,17 @@ public class AtlasEntityChangeNotifier {
private Referenceable toReferenceable(String entityId) throws AtlasBaseException { private Referenceable toReferenceable(String entityId) throws AtlasBaseException {
Referenceable ret = null; Referenceable ret = null;
if (StringUtils.isNotEmpty(entityId)) { if (instanceConverter != null && StringUtils.isNotEmpty(entityId)) {
ret = instanceConverter.getReferenceable(entityId); ret = instanceConverter.getReferenceable(entityId);
} }
return ret; return ret;
} }
private List<Struct> toStruct(List<AtlasClassification> classifications) throws AtlasBaseException { private List<Struct> toStruct(List<AtlasClassification> classifications) throws AtlasBaseException {
List<Struct> ret = null; List<Struct> ret = null;
if (classifications != null) { if (instanceConverter != null && classifications != null) {
ret = new ArrayList<>(classifications.size()); ret = new ArrayList<>(classifications.size());
for (AtlasClassification classification : classifications) { for (AtlasClassification classification : classifications) {
...@@ -468,7 +477,7 @@ public class AtlasEntityChangeNotifier { ...@@ -468,7 +477,7 @@ public class AtlasEntityChangeNotifier {
} else { } else {
String entityGuid = entityHeader.getGuid(); String entityGuid = entityHeader.getGuid();
entity = instanceConverter.getAndCacheEntity(entityGuid); entity = fullTextMapperV2.getAndCacheEntity(entityGuid);
} }
if (entity != null) { if (entity != null) {
...@@ -545,6 +554,10 @@ public class AtlasEntityChangeNotifier { ...@@ -545,6 +554,10 @@ public class AtlasEntityChangeNotifier {
} }
private void doFullTextMapping(String guid) { private void doFullTextMapping(String guid) {
if(AtlasRepositoryConfiguration.isFreeTextSearchEnabled() || !AtlasRepositoryConfiguration.isFullTextSearchEnabled()) {
return;
}
AtlasEntityHeader entityHeader = new AtlasEntityHeader(); AtlasEntityHeader entityHeader = new AtlasEntityHeader();
entityHeader.setGuid(guid); entityHeader.setGuid(guid);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment