Commit fb9f1e96 by Saqeeb Shaikh Committed by Sarath Subramanian

ATLAS-3568: Performance improvements in writing audit logs

parent 343b7832
......@@ -74,6 +74,15 @@ public interface EntityChangeListenerV2 {
void onClassificationsAdded(AtlasEntity entity, List<AtlasClassification> classifications) throws AtlasBaseException;
/**
* This is upon adding new classifications to entities.
*
* @param entities list of entities
* @param classifications classifications that are to be added to entities
* @throws AtlasBaseException if the listener notification fails
*/
void onClassificationsAdded(List<AtlasEntity> entities, List<AtlasClassification> classifications) throws AtlasBaseException;
/**
* This is upon updating classifications to an entity.
*
* @param entity the entity
......@@ -92,6 +101,15 @@ public interface EntityChangeListenerV2 {
void onClassificationsDeleted(AtlasEntity entity, List<AtlasClassification> classifications) throws AtlasBaseException;
/**
* This is upon deleting classifications from entities.
*
* @param entities list of entities
* @param classifications classifications that needs to be deleted from entities
* @throws AtlasBaseException if the listener notification fails
*/
void onClassificationsDeleted(List<AtlasEntity> entities, List<AtlasClassification> classifications) throws AtlasBaseException;
/**
* This is upon adding a new term to an entity.
*
* @param term the term
......
......@@ -44,6 +44,7 @@ import org.springframework.stereotype.Component;
import javax.inject.Inject;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
......@@ -172,6 +173,28 @@ public class EntityAuditListenerV2 implements EntityChangeListenerV2 {
}
@Override
public void onClassificationsAdded(List<AtlasEntity> entities, List<AtlasClassification> classifications) throws AtlasBaseException {
if (CollectionUtils.isNotEmpty(classifications)) {
MetricRecorder metric = RequestContext.get().startMetricRecord("entityAudit");
List<EntityAuditEventV2> events = Collections.synchronizedList(new ArrayList<>());
for (AtlasClassification classification : classifications) {
for (AtlasEntity entity : entities) {
if (entity.getGuid().equals(classification.getEntityGuid())) {
events.add(createEvent(entity, CLASSIFICATION_ADD, "Added classification: " + AtlasType.toJson(classification)));
} else {
events.add(createEvent(entity, PROPAGATED_CLASSIFICATION_ADD, "Added propagated classification: " + AtlasType.toJson(classification)));
}
}
}
auditRepository.putEventsV2(events);
RequestContext.get().endMetricRecord(metric);
}
}
@Override
public void onClassificationsUpdated(AtlasEntity entity, List<AtlasClassification> classifications) throws AtlasBaseException {
if (CollectionUtils.isNotEmpty(classifications)) {
MetricRecorder metric = RequestContext.get().startMetricRecord("entityAudit");
......@@ -221,6 +244,28 @@ public class EntityAuditListenerV2 implements EntityChangeListenerV2 {
}
@Override
public void onClassificationsDeleted(List<AtlasEntity> entities, List<AtlasClassification> classifications) throws AtlasBaseException {
if (CollectionUtils.isNotEmpty(classifications) && CollectionUtils.isNotEmpty(entities)) {
MetricRecorder metric = RequestContext.get().startMetricRecord("entityAudit");
List<EntityAuditEventV2> events = Collections.synchronizedList(new ArrayList<>());
for (AtlasClassification classification : classifications) {
for (AtlasEntity entity : entities) {
if (StringUtils.equals(entity.getGuid(), classification.getEntityGuid())) {
events.add(createEvent(entity, CLASSIFICATION_DELETE, "Deleted classification: " + classification.getTypeName()));
} else {
events.add(createEvent(entity, PROPAGATED_CLASSIFICATION_DELETE, "Deleted propagated classification: " + classification.getTypeName()));
}
}
}
auditRepository.putEventsV2(events);
RequestContext.get().endMetricRecord(metric);
}
}
@Override
public void onTermAdded(AtlasGlossaryTerm term, List<AtlasRelatedObjectId> entities) throws AtlasBaseException {
if (term != null && CollectionUtils.isNotEmpty(entities)) {
MetricRecorder metric = RequestContext.get().startMetricRecord("entityAudit");
......
......@@ -151,7 +151,7 @@ public class AtlasEntityChangeNotifier {
Referenceable entityRef = toReferenceable(entity.getGuid());
List<Struct> traits = toStruct(addedClassifications);
if (entity == null || CollectionUtils.isEmpty(traits)) {
if (entityRef == null || CollectionUtils.isEmpty(traits)) {
return;
}
......@@ -166,6 +166,41 @@ public class AtlasEntityChangeNotifier {
}
}
public void onClassificationsAddedToEntities(List<AtlasEntity> entities, List<AtlasClassification> addedClassifications) throws AtlasBaseException {
if (isV2EntityNotificationEnabled) {
doFullTextMappingHelper(entities);
for (EntityChangeListenerV2 listener : entityChangeListenersV2) {
listener.onClassificationsAdded(entities, addedClassifications);
}
} else {
updateFullTextMapping(entities, addedClassifications);
if (instanceConverter != null) {
List<Struct> traits = toStruct(addedClassifications);
if (!CollectionUtils.isEmpty(traits)) {
for(AtlasEntity entity : entities) {
Referenceable entityRef = toReferenceable(entity.getGuid());
if (entityRef == null) {
LOG.warn("EntityRef with guid {} not found while adding classifications {} ", entity.getGuid(), addedClassifications);
continue;
}
for (EntityChangeListener listener : entityChangeListeners) {
try {
listener.onTraitsAdded(entityRef, traits);
} catch (AtlasException e) {
throw new AtlasBaseException(AtlasErrorCode.NOTIFICATION_FAILED, e, getListenerName(listener), "TraitAdd");
}
}
}
}
}
}
}
public void onClassificationUpdatedToEntity(AtlasEntity entity, List<AtlasClassification> updatedClassifications) throws AtlasBaseException {
doFullTextMapping(entity.getGuid());
......@@ -220,6 +255,39 @@ public class AtlasEntityChangeNotifier {
}
}
public void onClassificationsDeletedFromEntities(List<AtlasEntity> entities, List<AtlasClassification> deletedClassifications) throws AtlasBaseException {
doFullTextMappingHelper(entities);
if (isV2EntityNotificationEnabled) {
for (EntityChangeListenerV2 listener : entityChangeListenersV2) {
listener.onClassificationsDeleted(entities, deletedClassifications);
}
} else {
if (instanceConverter != null) {
List<Struct> traits = toStruct(deletedClassifications);
if(!CollectionUtils.isEmpty(deletedClassifications)) {
for(AtlasEntity entity : entities) {
Referenceable entityRef = toReferenceable(entity.getGuid());
if (entityRef == null) {
LOG.warn("EntityRef with guid {} not found while deleting classifications {} ", entity.getGuid(), deletedClassifications);
continue;
}
for (EntityChangeListener listener : entityChangeListeners) {
try {
listener.onTraitsDeleted(entityRef, traits);
} catch (AtlasException e) {
throw new AtlasBaseException(AtlasErrorCode.NOTIFICATION_FAILED, e, getListenerName(listener), "TraitDelete");
}
}
}
}
}
}
}
public void onTermAddedToEntities(AtlasGlossaryTerm term, List<AtlasRelatedObjectId> entityIds) throws AtlasBaseException {
// listeners notified on term-entity association only if v2 notifications are enabled
if (isV2EntityNotificationEnabled) {
......@@ -575,6 +643,12 @@ public class AtlasEntityChangeNotifier {
RequestContext.get().endMetricRecord(metric);
}
private void updateFullTextMapping(List<AtlasEntity> entities, List<AtlasClassification> classifications) {
for (AtlasEntity entity : entities) {
updateFullTextMapping(entity.getGuid(), classifications);
}
}
private void doFullTextMapping(String guid) {
if(AtlasRepositoryConfiguration.isFreeTextSearchEnabled() || !AtlasRepositoryConfiguration.isFullTextSearchEnabled()) {
return;
......@@ -586,6 +660,12 @@ public class AtlasEntityChangeNotifier {
doFullTextMapping(Collections.singletonList(entityHeader));
}
private void doFullTextMappingHelper(List<AtlasEntity> entities) {
for (AtlasEntity entity : entities) {
doFullTextMapping(entity.getGuid());
}
}
private void pruneResponse(EntityMutationResponse resp) {
if (LOG.isDebugEnabled()) {
LOG.debug("==> pruneResponse()");
......
......@@ -631,6 +631,12 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {
LOG.debug("Updating classifications={} for entity={}", classifications, guid);
}
AtlasPerfTracer perf = null;
if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
AtlasPerfTracer.getPerfTracer(PERF_LOG, "AtlasEntityStoreV2.updateClassification()");
}
if (StringUtils.isEmpty(guid)) {
throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "Guid not specified");
}
......@@ -663,6 +669,8 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {
}
entityGraphMapper.updateClassifications(context, guid, classifications);
AtlasPerfTracer.log(perf);
}
@Override
......
......@@ -62,6 +62,7 @@ import org.apache.atlas.type.AtlasTypeUtil;
import org.apache.atlas.utils.AtlasEntityUtil;
import org.apache.atlas.utils.AtlasJson;
import org.apache.atlas.utils.AtlasPerfMetrics.MetricRecorder;
import org.apache.atlas.utils.AtlasPerfTracer;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
......@@ -112,6 +113,7 @@ import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelation
@Component
public class EntityGraphMapper {
private static final Logger LOG = LoggerFactory.getLogger(EntityGraphMapper.class);
private static final Logger PERF_LOG = AtlasPerfTracer.getPerfLogger("entityGraphMapper");
private static final String SOFT_REF_FORMAT = "%s:%s";
private static final int INDEXED_STR_SAFE_LEN = AtlasConfiguration.GRAPHSTORE_INDEXED_STRING_SAFE_LENGTH.getInt();
......@@ -1693,7 +1695,7 @@ public class EntityGraphMapper {
final AtlasVertex entityVertex = context.getVertex(guid);
final AtlasEntityType entityType = context.getType(guid);
List<AtlasVertex> entitiesToPropagateTo = null;
Map<AtlasVertex, List<AtlasClassification>> propagations = null;
Map<AtlasClassification, HashSet<AtlasVertex>> addedClassifications = new HashMap<>();
List<AtlasClassification> addClassifications = new ArrayList<>(classifications.size());
for (AtlasClassification c : classifications) {
......@@ -1761,23 +1763,17 @@ public class EntityGraphMapper {
}
if (CollectionUtils.isNotEmpty(entitiesToPropagateTo)) {
if (propagations == null) {
propagations = new HashMap<>(entitiesToPropagateTo.size());
for (AtlasVertex entityToPropagateTo : entitiesToPropagateTo) {
propagations.put(entityToPropagateTo, new ArrayList<>());
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("Propagating tag: [{}][{}] to {}", classificationName, entityType.getTypeName(), getTypeNames(entitiesToPropagateTo));
}
List<AtlasVertex> entitiesPropagatedTo = deleteDelegate.getHandler().addTagPropagation(classificationVertex, entitiesToPropagateTo);
if (entitiesPropagatedTo != null) {
for (AtlasVertex entityPropagatedTo : entitiesPropagatedTo) {
propagations.get(entityPropagatedTo).add(classification);
if (CollectionUtils.isNotEmpty(entitiesPropagatedTo)) {
if(addedClassifications.get(classification) == null) {
addedClassifications.put(classification, new HashSet<>(entitiesPropagatedTo));
} else {
addedClassifications.get(classification).addAll(entitiesPropagatedTo);
}
}
} else {
......@@ -1801,15 +1797,11 @@ public class EntityGraphMapper {
notificationVertices.addAll(entitiesToPropagateTo);
}
for (AtlasVertex vertex : notificationVertices) {
String entityGuid = GraphHelper.getGuid(vertex);
AtlasEntity entity = instanceConverter.getAndCacheEntity(entityGuid);
List<AtlasClassification> addedClassifications = StringUtils.equals(entityGuid, guid) ? addClassifications : propagations.get(vertex);
for (AtlasClassification classification : addedClassifications.keySet()) {
Set<AtlasVertex> vertices = addedClassifications.get(classification);
List<AtlasEntity> propagatedEntities = updateClassificationText(classification, vertices);
vertex.setProperty(CLASSIFICATION_TEXT_KEY, fullTextMapperV2.getClassificationTextForEntity(entity));
if (CollectionUtils.isNotEmpty(addedClassifications)) {
entityChangeNotifier.onClassificationAddedToEntity(entity, addedClassifications);
}
entityChangeNotifier.onClassificationsAddedToEntities(propagatedEntities, classifications);
}
RequestContext.get().endMetricRecord(metric);
......@@ -1999,6 +1991,12 @@ public class EntityGraphMapper {
throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, guid);
}
AtlasPerfTracer perf = null;
if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "EntityGraphMapper.updateClassifications");
}
String entityTypeName = AtlasGraphUtilsV2.getTypeName(entityVertex);
AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entityTypeName);
List<AtlasClassification> updatedClassifications = new ArrayList<>();
......@@ -2006,7 +2004,7 @@ public class EntityGraphMapper {
Set<AtlasVertex> notificationVertices = new HashSet<AtlasVertex>() {{ add(entityVertex); }};
Map<AtlasVertex, List<AtlasClassification>> addedPropagations = null;
Map<AtlasVertex, List<AtlasClassification>> removedPropagations = null;
Map<AtlasClassification, List<AtlasVertex>> removedPropagations = new HashMap<>();
for (AtlasClassification classification : classifications) {
String classificationName = classification.getTypeName();
......@@ -2116,21 +2114,17 @@ public class EntityGraphMapper {
List<AtlasVertex> impactedVertices = deleteDelegate.getHandler().removeTagPropagation(classificationVertex);
if (CollectionUtils.isNotEmpty(impactedVertices)) {
if (removedPropagations == null) {
removedPropagations = new HashMap<>();
for (AtlasVertex impactedVertex : impactedVertices) {
List<AtlasClassification> removedClassifications = removedPropagations.get(impactedVertex);
if (removedClassifications == null) {
removedClassifications = new ArrayList<>();
removedPropagations.put(impactedVertex, removedClassifications);
}
removedClassifications.add(classification);
}
}
/*
removedPropagations is a HashMap of entity against list of classifications i.e. for each entity 1 entry in the map.
Maintaining classification wise entity list lets us send the audit request in bulk,
since 1 classification is applied to many entities (including the child entities).
Eg. If a classification is being propagated to 1000 entities, its edge count would be 2000, as per removedPropagations map
we would have 2000 entries and value would always be 1 classification wrapped in a list.
By this rearrangement we maintain an entity list against each classification, as of now its entry size would be 1 (as per request from UI)
instead of 2000. Moreover this allows us to send audit request classification wise instead of separate requests for each entities.
This reduces audit calls from 2000 to 1.
*/
removedPropagations.put(classification, impactedVertices);
}
}
}
......@@ -2152,19 +2146,17 @@ public class EntityGraphMapper {
}
}
if (removedPropagations != null) {
for (Map.Entry<AtlasVertex, List<AtlasClassification>> entry : removedPropagations.entrySet()) {
AtlasVertex vertex = entry.getKey();
List<AtlasClassification> removedClassifications = entry.getValue();
String entityGuid = GraphHelper.getGuid(vertex);
AtlasEntity entity = instanceConverter.getAndCacheEntity(entityGuid);
if (MapUtils.isNotEmpty(removedPropagations)) {
for (AtlasClassification classification : removedPropagations.keySet()) {
List<AtlasVertex> propagatedVertices = removedPropagations.get(classification);
List<AtlasEntity> propagatedEntities = updateClassificationText(classification, propagatedVertices);
if (isActive(entity)) {
vertex.setProperty(CLASSIFICATION_TEXT_KEY, fullTextMapperV2.getClassificationTextForEntity(entity));
entityChangeNotifier.onClassificationDeletedFromEntity(entity, removedClassifications);
}
//Sending audit request for all entities at once
entityChangeNotifier.onClassificationsDeletedFromEntities(propagatedEntities, Collections.singletonList(classification));
}
}
AtlasPerfTracer.log(perf);
}
private AtlasEdge mapClassification(EntityOperation operation, final EntityMutationContext context, AtlasClassification classification,
......@@ -2383,4 +2375,21 @@ public class EntityGraphMapper {
}
}
}
private List<AtlasEntity> updateClassificationText(AtlasClassification classification, Collection<AtlasVertex> propagatedVertices) throws AtlasBaseException {
List<AtlasEntity> propagatedEntities = new ArrayList<>();
if(CollectionUtils.isNotEmpty(propagatedVertices)) {
for(AtlasVertex vertex : propagatedVertices) {
AtlasEntity entity = instanceConverter.getAndCacheEntity(GraphHelper.getGuid(vertex));
if (isActive(entity)) {
vertex.setProperty(CLASSIFICATION_TEXT_KEY, fullTextMapperV2.getClassificationTextForEntity(entity));
propagatedEntities.add(entity);
}
}
}
return propagatedEntities;
}
}
......@@ -107,6 +107,11 @@ public class EntityNotificationListenerV2 implements EntityChangeListenerV2 {
}
@Override
public void onClassificationsAdded(List<AtlasEntity> entities, List<AtlasClassification> classifications) throws AtlasBaseException {
notifyEntityEvents(entities, CLASSIFICATION_ADD);
}
@Override
public void onClassificationsUpdated(AtlasEntity entity, List<AtlasClassification> classifications) throws AtlasBaseException {
Map<String, List<AtlasClassification>> addedPropagations = RequestContext.get().getAddedPropagations();
Map<String, List<AtlasClassification>> removedPropagations = RequestContext.get().getRemovedPropagations();
......@@ -124,6 +129,11 @@ public class EntityNotificationListenerV2 implements EntityChangeListenerV2 {
}
@Override
public void onClassificationsDeleted(List<AtlasEntity> entities, List<AtlasClassification> classifications) throws AtlasBaseException {
notifyEntityEvents(entities, CLASSIFICATION_DELETE);
}
@Override
public void onTermAdded(AtlasGlossaryTerm term, List<AtlasRelatedObjectId> entities) {
// do nothing -> notification not sent out for term assignment to entities
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment