Commit 56eefb2a by Madhan Neethiraj

ATLAS-2853: updated to send entity-notifications after successful graph transaction commit

parent 01b195a9
...@@ -38,7 +38,6 @@ import org.apache.atlas.type.AtlasEntityType; ...@@ -38,7 +38,6 @@ import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.v1.model.instance.Referenceable; import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.atlas.v1.model.instance.Struct; import org.apache.atlas.v1.model.instance.Struct;
import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.converters.AtlasInstanceConverter; import org.apache.atlas.repository.converters.AtlasInstanceConverter;
import org.apache.atlas.repository.graph.FullTextMapperV2; import org.apache.atlas.repository.graph.FullTextMapperV2;
import org.apache.atlas.repository.graph.GraphHelper; import org.apache.atlas.repository.graph.GraphHelper;
...@@ -56,13 +55,11 @@ import java.util.ArrayList; ...@@ -56,13 +55,11 @@ import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects;
import java.util.Set; import java.util.Set;
import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.PROPAGATED_CLASSIFICATION_ADD; import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.PROPAGATED_CLASSIFICATION_ADD;
import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.PROPAGATED_CLASSIFICATION_DELETE; import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.PROPAGATED_CLASSIFICATION_DELETE;
import static org.apache.atlas.repository.Constants.ENTITY_TEXT_PROPERTY_KEY; import static org.apache.atlas.repository.Constants.ENTITY_TEXT_PROPERTY_KEY;
import static org.apache.atlas.util.AtlasRepositoryConfiguration.isV2EntityNotificationEnabled;
@Component @Component
...@@ -74,6 +71,7 @@ public class AtlasEntityChangeNotifier { ...@@ -74,6 +71,7 @@ public class AtlasEntityChangeNotifier {
private final AtlasInstanceConverter instanceConverter; private final AtlasInstanceConverter instanceConverter;
private final FullTextMapperV2 fullTextMapperV2; private final FullTextMapperV2 fullTextMapperV2;
private final AtlasTypeRegistry atlasTypeRegistry; private final AtlasTypeRegistry atlasTypeRegistry;
private final boolean isV2EntityNotificationEnabled;
@Inject @Inject
...@@ -82,11 +80,12 @@ public class AtlasEntityChangeNotifier { ...@@ -82,11 +80,12 @@ public class AtlasEntityChangeNotifier {
AtlasInstanceConverter instanceConverter, AtlasInstanceConverter instanceConverter,
FullTextMapperV2 fullTextMapperV2, FullTextMapperV2 fullTextMapperV2,
AtlasTypeRegistry atlasTypeRegistry) { AtlasTypeRegistry atlasTypeRegistry) {
this.entityChangeListeners = entityChangeListeners; this.entityChangeListeners = entityChangeListeners;
this.entityChangeListenersV2 = entityChangeListenersV2; this.entityChangeListenersV2 = entityChangeListenersV2;
this.instanceConverter = instanceConverter; this.instanceConverter = instanceConverter;
this.fullTextMapperV2 = fullTextMapperV2; this.fullTextMapperV2 = fullTextMapperV2;
this.atlasTypeRegistry = atlasTypeRegistry; this.atlasTypeRegistry = atlasTypeRegistry;
this.isV2EntityNotificationEnabled = AtlasRepositoryConfiguration.isV2EntityNotificationEnabled();
} }
public void onEntitiesMutated(EntityMutationResponse entityMutationResponse, boolean isImport) throws AtlasBaseException { public void onEntitiesMutated(EntityMutationResponse entityMutationResponse, boolean isImport) throws AtlasBaseException {
...@@ -114,7 +113,7 @@ public class AtlasEntityChangeNotifier { ...@@ -114,7 +113,7 @@ public class AtlasEntityChangeNotifier {
} }
public void onClassificationAddedToEntity(AtlasEntity entity, List<AtlasClassification> addedClassifications) throws AtlasBaseException { public void onClassificationAddedToEntity(AtlasEntity entity, List<AtlasClassification> addedClassifications) throws AtlasBaseException {
if (isV2EntityNotificationEnabled()) { if (isV2EntityNotificationEnabled) {
doFullTextMapping(entity.getGuid()); doFullTextMapping(entity.getGuid());
for (EntityChangeListenerV2 listener : entityChangeListenersV2) { for (EntityChangeListenerV2 listener : entityChangeListenersV2) {
...@@ -141,7 +140,7 @@ public class AtlasEntityChangeNotifier { ...@@ -141,7 +140,7 @@ public class AtlasEntityChangeNotifier {
} }
public void onClassificationUpdatedToEntity(AtlasEntity entity, List<AtlasClassification> updatedClassifications) throws AtlasBaseException { public void onClassificationUpdatedToEntity(AtlasEntity entity, List<AtlasClassification> updatedClassifications) throws AtlasBaseException {
if (isV2EntityNotificationEnabled()) { if (isV2EntityNotificationEnabled) {
doFullTextMapping(entity.getGuid()); doFullTextMapping(entity.getGuid());
for (EntityChangeListenerV2 listener : entityChangeListenersV2) { for (EntityChangeListenerV2 listener : entityChangeListenersV2) {
...@@ -168,7 +167,7 @@ public class AtlasEntityChangeNotifier { ...@@ -168,7 +167,7 @@ public class AtlasEntityChangeNotifier {
} }
public void onClassificationDeletedFromEntity(AtlasEntity entity, List<AtlasClassification> deletedClassifications) throws AtlasBaseException { public void onClassificationDeletedFromEntity(AtlasEntity entity, List<AtlasClassification> deletedClassifications) throws AtlasBaseException {
if (isV2EntityNotificationEnabled()) { if (isV2EntityNotificationEnabled) {
doFullTextMapping(entity.getGuid()); doFullTextMapping(entity.getGuid());
for (EntityChangeListenerV2 listener : entityChangeListenersV2) { for (EntityChangeListenerV2 listener : entityChangeListenersV2) {
...@@ -197,7 +196,7 @@ public class AtlasEntityChangeNotifier { ...@@ -197,7 +196,7 @@ public class AtlasEntityChangeNotifier {
public void onTermAddedToEntities(AtlasGlossaryTerm term, List<AtlasRelatedObjectId> entityIds) throws AtlasBaseException { public void onTermAddedToEntities(AtlasGlossaryTerm term, List<AtlasRelatedObjectId> entityIds) throws AtlasBaseException {
// listeners notified on term-entity association only if v2 notifications are enabled // listeners notified on term-entity association only if v2 notifications are enabled
if (isV2EntityNotificationEnabled()) { if (isV2EntityNotificationEnabled) {
for (EntityChangeListenerV2 listener : entityChangeListenersV2) { for (EntityChangeListenerV2 listener : entityChangeListenersV2) {
listener.onTermAdded(term, entityIds); listener.onTermAdded(term, entityIds);
} }
...@@ -216,7 +215,7 @@ public class AtlasEntityChangeNotifier { ...@@ -216,7 +215,7 @@ public class AtlasEntityChangeNotifier {
public void onTermDeletedFromEntities(AtlasGlossaryTerm term, List<AtlasRelatedObjectId> entityIds) throws AtlasBaseException { public void onTermDeletedFromEntities(AtlasGlossaryTerm term, List<AtlasRelatedObjectId> entityIds) throws AtlasBaseException {
// listeners notified on term-entity disassociation only if v2 notifications are enabled // listeners notified on term-entity disassociation only if v2 notifications are enabled
if (isV2EntityNotificationEnabled()) { if (isV2EntityNotificationEnabled) {
for (EntityChangeListenerV2 listener : entityChangeListenersV2) { for (EntityChangeListenerV2 listener : entityChangeListenersV2) {
listener.onTermDeleted(term, entityIds); listener.onTermDeleted(term, entityIds);
} }
...@@ -279,7 +278,7 @@ public class AtlasEntityChangeNotifier { ...@@ -279,7 +278,7 @@ public class AtlasEntityChangeNotifier {
return; return;
} }
if (isV2EntityNotificationEnabled()) { if (isV2EntityNotificationEnabled) {
notifyV2Listeners(entityHeaders, operation, isImport); notifyV2Listeners(entityHeaders, operation, isImport);
} else { } else {
notifyV1Listeners(entityHeaders, operation, isImport); notifyV1Listeners(entityHeaders, operation, isImport);
......
...@@ -35,6 +35,8 @@ import org.apache.atlas.type.AtlasTypeRegistry; ...@@ -35,6 +35,8 @@ import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils; import org.apache.commons.collections.MapUtils;
import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.inject.Inject; import javax.inject.Inject;
...@@ -45,7 +47,6 @@ import java.util.List; ...@@ -45,7 +47,6 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import static org.apache.atlas.notification.NotificationInterface.NotificationType.ENTITIES;
import static org.apache.atlas.repository.graph.GraphHelper.isInternalType; import static org.apache.atlas.repository.graph.GraphHelper.isInternalType;
import static org.apache.atlas.model.notification.EntityNotification.EntityNotificationV2.OperationType.*; import static org.apache.atlas.model.notification.EntityNotification.EntityNotificationV2.OperationType.*;
import static org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever.CREATE_TIME; import static org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever.CREATE_TIME;
...@@ -56,15 +57,17 @@ import static org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever.QU ...@@ -56,15 +57,17 @@ import static org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever.QU
@Component @Component
public class EntityNotificationListenerV2 implements EntityChangeListenerV2 { public class EntityNotificationListenerV2 implements EntityChangeListenerV2 {
private final AtlasTypeRegistry typeRegistry; private static final Logger LOG = LoggerFactory.getLogger(EntityNotificationListenerV2.class);
private final NotificationInterface notificationInterface;
private final AtlasTypeRegistry typeRegistry;
private final EntityNotificationSender<EntityNotificationV2> notificationSender;
@Inject @Inject
public EntityNotificationListenerV2(AtlasTypeRegistry typeRegistry, public EntityNotificationListenerV2(AtlasTypeRegistry typeRegistry,
NotificationInterface notificationInterface, NotificationInterface notificationInterface,
Configuration configuration) { Configuration configuration) {
this.typeRegistry = typeRegistry; this.typeRegistry = typeRegistry;
this.notificationInterface = notificationInterface; this.notificationSender = new EntityNotificationSender<>(notificationInterface, configuration);
} }
@Override @Override
...@@ -127,7 +130,7 @@ public class EntityNotificationListenerV2 implements EntityChangeListenerV2 { ...@@ -127,7 +130,7 @@ public class EntityNotificationListenerV2 implements EntityChangeListenerV2 {
if (!messages.isEmpty()) { if (!messages.isEmpty()) {
try { try {
notificationInterface.send(ENTITIES, messages); notificationSender.send(messages);
} catch (NotificationException e) { } catch (NotificationException e) {
throw new AtlasBaseException(AtlasErrorCode.ENTITY_NOTIFICATION_FAILED, e, operationType.name()); throw new AtlasBaseException(AtlasErrorCode.ENTITY_NOTIFICATION_FAILED, e, operationType.name());
} }
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.notification;
import org.apache.atlas.GraphTransactionInterceptor;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.configuration.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import static org.apache.atlas.notification.NotificationInterface.NotificationType.ENTITIES;
public class EntityNotificationSender<T> {
private static final Logger LOG = LoggerFactory.getLogger(EntityNotificationSender.class);
private final static boolean NOTIFY_POST_COMMIT_DEFAULT = true;
private final NotificationSender<T> notificationSender;
public EntityNotificationSender(NotificationInterface notificationInterface, Configuration configuration) {
this(notificationInterface, configuration != null ? configuration.getBoolean("atlas.notification.send.postcommit", NOTIFY_POST_COMMIT_DEFAULT) : NOTIFY_POST_COMMIT_DEFAULT);
}
public EntityNotificationSender(NotificationInterface notificationInterface, boolean sendPostCommit) {
if (sendPostCommit) {
LOG.info("EntityNotificationSender: notifications will be sent after transaction commit");
this.notificationSender = new PostCommitNotificationSender(notificationInterface);
} else {
LOG.info("EntityNotificationSender: notifications will be sent inline (i.e. not waiting for transaction to commit)");
this.notificationSender = new InlineNotificationSender(notificationInterface);
}
}
public void send(List<T> notifications) throws NotificationException {
this.notificationSender.send(notifications);
}
private interface NotificationSender<T> {
void send(List<T> notifications) throws NotificationException;
}
private class InlineNotificationSender<T> implements NotificationSender<T> {
private final NotificationInterface notificationInterface;
public InlineNotificationSender(NotificationInterface notificationInterface) {
this.notificationInterface = notificationInterface;
}
@Override
public void send(List<T> notifications) throws NotificationException {
notificationInterface.send(ENTITIES, notifications);
}
}
private class PostCommitNotificationSender<T> implements NotificationSender<T> {
private final NotificationInterface notificationInterface;
private final ThreadLocal<PostCommitNotificationHook> postCommitNotificationHooks = new ThreadLocal<>();
public PostCommitNotificationSender(NotificationInterface notificationInterface) {
this.notificationInterface = notificationInterface;
}
@Override
public void send(List<T> notifications) throws NotificationException {
PostCommitNotificationHook notificationHook = postCommitNotificationHooks.get();
if (notificationHook == null) {
notificationHook = new PostCommitNotificationHook(notifications);
postCommitNotificationHooks.set(notificationHook);
} else {
notificationHook.addNotifications(notifications);
}
}
class PostCommitNotificationHook<T> extends GraphTransactionInterceptor.PostTransactionHook {
private final List<T> notifications = new ArrayList<>();
public PostCommitNotificationHook(List<T> notifications) {
this.addNotifications(notifications);
}
public void addNotifications(List<T> notifications) {
if (notifications != null) {
this.notifications.addAll(notifications);
}
}
@Override
public void onComplete(boolean isSuccess) {
postCommitNotificationHooks.remove();
if (CollectionUtils.isNotEmpty(notifications)) {
if (isSuccess) {
try {
notificationInterface.send(ENTITIES, notifications);
} catch (NotificationException excp) {
LOG.error("failed to send entity notifications", excp);
}
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Transaction not committed. Not sending {} notifications: {}", notifications.size(), notifications);
}
}
}
}
}
}
}
\ No newline at end of file
...@@ -18,11 +18,9 @@ ...@@ -18,11 +18,9 @@
package org.apache.atlas.notification; package org.apache.atlas.notification;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasException; import org.apache.atlas.AtlasException;
import org.apache.atlas.listener.EntityChangeListener; import org.apache.atlas.listener.EntityChangeListener;
import org.apache.atlas.model.glossary.AtlasGlossaryTerm; import org.apache.atlas.model.glossary.AtlasGlossaryTerm;
import org.apache.atlas.notification.NotificationInterface.NotificationType;
import org.apache.atlas.v1.model.instance.Referenceable; import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.atlas.v1.model.instance.Struct; import org.apache.atlas.v1.model.instance.Struct;
import org.apache.atlas.v1.model.notification.EntityNotificationV1; import org.apache.atlas.v1.model.notification.EntityNotificationV1;
...@@ -45,11 +43,11 @@ import java.util.*; ...@@ -45,11 +43,11 @@ import java.util.*;
public class NotificationEntityChangeListener implements EntityChangeListener { public class NotificationEntityChangeListener implements EntityChangeListener {
protected static final String ATLAS_ENTITY_NOTIFICATION_PROPERTY = "atlas.notification.entity"; protected static final String ATLAS_ENTITY_NOTIFICATION_PROPERTY = "atlas.notification.entity";
private final NotificationInterface notificationInterface; private final AtlasTypeRegistry typeRegistry;
private final AtlasTypeRegistry typeRegistry; private final Configuration configuration;
private final Map<String, List<String>> notificationAttributesCache = new HashMap<>(); private final EntityNotificationSender<EntityNotificationV1> notificationSender;
private final Map<String, List<String>> notificationAttributesCache = new HashMap<>();
private static Configuration APPLICATION_PROPERTIES = null;
...@@ -62,9 +60,11 @@ public class NotificationEntityChangeListener implements EntityChangeListener { ...@@ -62,9 +60,11 @@ public class NotificationEntityChangeListener implements EntityChangeListener {
* @param typeRegistry the Atlas type system * @param typeRegistry the Atlas type system
*/ */
@Inject @Inject
public NotificationEntityChangeListener(NotificationInterface notificationInterface, AtlasTypeRegistry typeRegistry) { public NotificationEntityChangeListener(NotificationInterface notificationInterface, AtlasTypeRegistry typeRegistry, Configuration configuration) {
this.notificationInterface = notificationInterface; this.typeRegistry = typeRegistry;
this.typeRegistry = typeRegistry; this.configuration = configuration;
this.notificationSender = new EntityNotificationSender<>(notificationInterface, configuration);
} }
...@@ -184,20 +184,17 @@ public class NotificationEntityChangeListener implements EntityChangeListener { ...@@ -184,20 +184,17 @@ public class NotificationEntityChangeListener implements EntityChangeListener {
} }
if (!messages.isEmpty()) { if (!messages.isEmpty()) {
notificationInterface.send(NotificationType.ENTITIES, messages); notificationSender.send(messages);
} }
} }
private List<String> getNotificationAttributes(String entityType) { private List<String> getNotificationAttributes(String entityType) {
List<String> ret = null; List<String> ret = null;
initApplicationProperties();
if (notificationAttributesCache.containsKey(entityType)) { if (notificationAttributesCache.containsKey(entityType)) {
ret = notificationAttributesCache.get(entityType); ret = notificationAttributesCache.get(entityType);
} else if (APPLICATION_PROPERTIES != null) { } else if (configuration != null) {
String[] notificationAttributes = APPLICATION_PROPERTIES.getStringArray(ATLAS_ENTITY_NOTIFICATION_PROPERTY + "." + String[] notificationAttributes = configuration.getStringArray(ATLAS_ENTITY_NOTIFICATION_PROPERTY + "." + entityType + "." + "attributes.include");
entityType + "." + "attributes.include");
if (notificationAttributes != null) { if (notificationAttributes != null) {
ret = Arrays.asList(notificationAttributes); ret = Arrays.asList(notificationAttributes);
...@@ -208,14 +205,4 @@ public class NotificationEntityChangeListener implements EntityChangeListener { ...@@ -208,14 +205,4 @@ public class NotificationEntityChangeListener implements EntityChangeListener {
return ret; return ret;
} }
private void initApplicationProperties() {
if (APPLICATION_PROPERTIES == null) {
try {
APPLICATION_PROPERTIES = ApplicationProperties.get();
} catch (AtlasException ex) {
// ignore
}
}
}
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment