Commit 4a57f92f by Ashutosh Mestry

ATLAS-3878: Notifications: Improve Memory Usage for HBase Audits Writing

parent ba1b40f0
......@@ -47,6 +47,7 @@ public enum AtlasConfiguration {
NOTIFICATION_MESSAGE_COMPRESSION_ENABLED("atlas.notification.message.compression.enabled", true),
NOTIFICATION_SPLIT_MESSAGE_SEGMENTS_WAIT_TIME_SECONDS("atlas.notification.split.message.segments.wait.time.seconds", 15 * 60),
NOTIFICATION_SPLIT_MESSAGE_BUFFER_PURGE_INTERVAL_SECONDS("atlas.notification.split.message.buffer.purge.interval.seconds", 5 * 60),
NOTIFICATION_FIXED_BUFFER_ITEMS_INCREMENT_COUNT("atlas.notification.fixed.buffer.items.increment.count", 10),
NOTIFICATION_CREATE_SHELL_ENTITY_FOR_NON_EXISTING_REF("atlas.notification.consumer.create.shell.entity.for.non-existing.ref", true),
REST_API_CREATE_SHELL_ENTITY_FOR_NON_EXISTING_REF("atlas.rest.create.shell.entity.for.non-existing.ref", false),
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.model;
public interface Clearable {
void clear();
}
......@@ -21,6 +21,7 @@ import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import org.apache.atlas.model.Clearable;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.type.AtlasType;
......@@ -44,7 +45,7 @@ import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditType.EN
@JsonIgnoreProperties(ignoreUnknown=true)
@XmlRootElement
@XmlAccessorType(XmlAccessType.PROPERTY)
public class EntityAuditEventV2 implements Serializable {
public class EntityAuditEventV2 implements Serializable, Clearable {
public enum EntityAuditType { ENTITY_AUDIT_V1, ENTITY_AUDIT_V2 }
public enum EntityAuditActionV2 {
......@@ -255,6 +256,19 @@ public class EntityAuditEventV2 implements Serializable {
return ret;
}
@JsonIgnore
@Override
public void clear() {
entityId = null;
timestamp = 0L;
user = null;
action = null;
details = null;
eventKey = null;
entity = null;
type = null;
}
private String getJsonPartFromDetails() {
String ret = null;
if(StringUtils.isNotEmpty(details)) {
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.utils;
import org.apache.atlas.model.Clearable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
public class FixedBufferList<T extends Clearable> {
private static final Logger LOG = LoggerFactory.getLogger(FixedBufferList.class);
private final Class<T> itemClass;
private final ArrayList<T> buffer;
private final int incrementCapacityBy;
private int length;
public FixedBufferList(Class<T> clazz) {
this(clazz, 1);
}
public FixedBufferList(Class<T> clazz, int incrementCapacityBy) {
this.incrementCapacityBy = (incrementCapacityBy <= 0 ? 1 : incrementCapacityBy);
this.itemClass = clazz;
this.buffer = new ArrayList<>();
}
public T next() {
request(length + 1);
return buffer.get(length++);
}
public List<T> toList() {
return this.buffer.subList(0, this.length);
}
public void reset() {
for (int i = 0; i < buffer.size(); i++) {
buffer.get(i).clear();
}
this.length = 0;
}
private void request(int requestedCapacity) {
if (requestedCapacity <= this.buffer.size()) {
return;
}
int oldCapacity = this.buffer.size();
int newCapacity = oldCapacity + this.incrementCapacityBy;
this.buffer.ensureCapacity(newCapacity);
instantiateItems(oldCapacity, newCapacity);
LOG.info("FixedBufferList: Requested: {} From: {} To:{}", requestedCapacity, oldCapacity, newCapacity);
}
private void instantiateItems(int startIndex, int maxSize) {
for (int i = startIndex; i < maxSize; i++) {
try {
this.buffer.add(itemClass.newInstance());
} catch (InstantiationException e) {
LOG.error("FixedBufferList: InstantiationException: Instantiation failed!", e);
} catch (IllegalAccessException e) {
LOG.error("FixedBufferList: IllegalAccessException: Instantiation failed!", e);
}
}
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.utils;
import org.apache.atlas.model.Clearable;
import org.apache.commons.lang.StringUtils;
import org.testng.Assert;
import org.testng.annotations.Test;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
public class FixedBufferListTest {
private String STR_PREFIX = "str:%s";
public static class Spying implements Clearable {
public static AtomicInteger callsToCtor = new AtomicInteger();
public static AtomicInteger callsToClear = new AtomicInteger();
private int anInt;
private String aString;
private long aLong;
public Spying() {
callsToCtor.incrementAndGet();
}
@Override
public void clear() {
callsToClear.incrementAndGet();
anInt = 0;
aString = StringUtils.EMPTY;
aLong = 0;
}
public static void resetCounters() {
Spying.callsToCtor.set(0);
Spying.callsToClear.set(0);
}
}
private static class SpyingFixedBufferList extends FixedBufferList<Spying> {
public SpyingFixedBufferList(int incrementCapacityFactor) {
super(Spying.class, incrementCapacityFactor);
}
}
@Test
public void instantiateListWithParameterizedClass() {
FixedBufferList<Spying> list = new FixedBufferList<>(Spying.class);
assertNotNull(list);
}
@Test
public void createdBasedOnInitialSize() {
Spying.resetCounters();
int incrementByFactor = 2;
SpyingFixedBufferList fixedBufferList = new SpyingFixedBufferList(incrementByFactor);
addElements(fixedBufferList, 0, 3);
List<Spying> list = fixedBufferList.toList();
assertSpyingList(list, 3);
assertEquals(Spying.callsToCtor.get(), incrementByFactor * 2);
}
@Test (dependsOnMethods = "createdBasedOnInitialSize")
public void bufferIncreasesIfNeeded() {
Spying.resetCounters();
int incrementSizeBy = 5;
SpyingFixedBufferList fixedBufferList = new SpyingFixedBufferList(incrementSizeBy);
addElements(fixedBufferList, 0, incrementSizeBy);
List<Spying> spyings = fixedBufferList.toList();
assertEquals(spyings.size(), incrementSizeBy);
assertEquals(Spying.callsToCtor.get(), incrementSizeBy);
fixedBufferList.reset();
addElements(fixedBufferList, 0, incrementSizeBy * 2);
spyings = fixedBufferList.toList();
assertEquals(Spying.callsToCtor.get(), incrementSizeBy * 2);
assertSpyingList(spyings, incrementSizeBy * 2);
assertEquals(Spying.callsToClear.get(), incrementSizeBy);
}
@Test
public void retrieveEmptyList() {
int size = 5;
SpyingFixedBufferList fixedBufferList = new SpyingFixedBufferList(size);
List<Spying> list = fixedBufferList.toList();
assertEquals(list.size(), 0);
addElements(fixedBufferList, 0, 3);
list = fixedBufferList.toList();
assertEquals(list.size(), 3);
}
private void assertSpyingList(List<Spying> list, int expectedSize) {
assertEquals(list.size(), expectedSize);
for (int i1 = 0; i1 < list.size(); i1++) {
Assert.assertNotNull(list.get(i1));
assertSpying(list.get(i1), i1);
}
}
private void assertSpying(Spying spying, int i) {
assertEquals(spying.aLong, i);
assertEquals(spying.anInt, i);
assertEquals(spying.aString, String.format(STR_PREFIX, i));
}
private Spying createSpyingClass(Spying spying, int i) {
spying.aLong = i;
spying.anInt = i;
spying.aString = String.format(STR_PREFIX, i);
return spying;
}
private void addElements(SpyingFixedBufferList fixedBufferList, int startIndex, int numElements) {
for (int i = startIndex; i < (startIndex + numElements); i++) {
Spying spyForUpdate = fixedBufferList.next();
createSpyingClass(spyForUpdate, i);
}
}
}
......@@ -17,6 +17,7 @@
*/
package org.apache.atlas.repository.audit;
import org.apache.atlas.AtlasConfiguration;
import org.apache.atlas.EntityAuditEvent.EntityAuditAction;
import org.apache.atlas.RequestContext;
import org.apache.atlas.model.audit.EntityAuditEventV2;
......@@ -36,6 +37,7 @@ import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.utils.AtlasJson;
import org.apache.atlas.utils.AtlasPerfMetrics.MetricRecorder;
import org.apache.atlas.utils.FixedBufferList;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
......@@ -45,8 +47,6 @@ import org.springframework.stereotype.Component;
import javax.inject.Inject;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
......@@ -74,6 +74,9 @@ import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV
@Component
public class EntityAuditListenerV2 implements EntityChangeListenerV2 {
private static final Logger LOG = LoggerFactory.getLogger(EntityAuditListenerV2.class);
private static final ThreadLocal<FixedBufferList<EntityAuditEventV2>> AUDIT_EVENTS_BUFFER =
ThreadLocal.withInitial(() -> new FixedBufferList<>(EntityAuditEventV2.class,
AtlasConfiguration.NOTIFICATION_FIXED_BUFFER_ITEMS_INCREMENT_COUNT.getInt()));
private final EntityAuditRepository auditRepository;
private final AtlasTypeRegistry typeRegistry;
......@@ -90,15 +93,12 @@ public class EntityAuditListenerV2 implements EntityChangeListenerV2 {
public void onEntitiesAdded(List<AtlasEntity> entities, boolean isImport) throws AtlasBaseException {
MetricRecorder metric = RequestContext.get().startMetricRecord("entityAudit");
List<EntityAuditEventV2> events = new ArrayList<>();
FixedBufferList<EntityAuditEventV2> entitiesAdded = getAuditEventsList();
for (AtlasEntity entity : entities) {
EntityAuditEventV2 event = createEvent(entity, isImport ? ENTITY_IMPORT_CREATE : ENTITY_CREATE);
events.add(event);
createEvent(entitiesAdded.next(), entity, isImport ? ENTITY_IMPORT_CREATE : ENTITY_CREATE);
}
auditRepository.putEventsV2(events);
auditRepository.putEventsV2(entitiesAdded.toList());
RequestContext.get().endMetricRecord(metric);
}
......@@ -107,15 +107,12 @@ public class EntityAuditListenerV2 implements EntityChangeListenerV2 {
public void onEntitiesUpdated(List<AtlasEntity> entities, boolean isImport) throws AtlasBaseException {
MetricRecorder metric = RequestContext.get().startMetricRecord("entityAudit");
List<EntityAuditEventV2> events = new ArrayList<>();
FixedBufferList<EntityAuditEventV2> updatedEvents = getAuditEventsList();
for (AtlasEntity entity : entities) {
EntityAuditEventV2 event = createEvent(entity, isImport ? ENTITY_IMPORT_UPDATE : ENTITY_UPDATE);
events.add(event);
createEvent(updatedEvents.next(), entity, isImport ? ENTITY_IMPORT_UPDATE : ENTITY_UPDATE);
}
auditRepository.putEventsV2(events);
auditRepository.putEventsV2(updatedEvents.toList());
RequestContext.get().endMetricRecord(metric);
}
......@@ -124,15 +121,12 @@ public class EntityAuditListenerV2 implements EntityChangeListenerV2 {
public void onEntitiesDeleted(List<AtlasEntity> entities, boolean isImport) throws AtlasBaseException {
MetricRecorder metric = RequestContext.get().startMetricRecord("entityAudit");
List<EntityAuditEventV2> events = new ArrayList<>();
FixedBufferList<EntityAuditEventV2> deletedEntities = getAuditEventsList();
for (AtlasEntity entity : entities) {
EntityAuditEventV2 event = createEvent(entity, isImport ? ENTITY_IMPORT_DELETE : ENTITY_DELETE, "Deleted entity");
events.add(event);
createEvent(deletedEntities.next(), entity, isImport ? ENTITY_IMPORT_DELETE : ENTITY_DELETE, "Deleted entity");
}
auditRepository.putEventsV2(events);
auditRepository.putEventsV2(deletedEntities.toList());
RequestContext.get().endMetricRecord(metric);
}
......@@ -141,14 +135,12 @@ public class EntityAuditListenerV2 implements EntityChangeListenerV2 {
public void onEntitiesPurged(List<AtlasEntity> entities) throws AtlasBaseException {
MetricRecorder metric = RequestContext.get().startMetricRecord("entityAudit");
List<EntityAuditEventV2> events = new ArrayList<>();
FixedBufferList<EntityAuditEventV2> eventsPurged = getAuditEventsList();
for (AtlasEntity entity : entities) {
EntityAuditEventV2 event = createEvent(entity, ENTITY_PURGE);
events.add(event);
createEvent(eventsPurged.next(), entity, ENTITY_PURGE);
}
auditRepository.putEventsV2(events);
auditRepository.putEventsV2(eventsPurged.toList());
RequestContext.get().endMetricRecord(metric);
}
......@@ -158,17 +150,16 @@ public class EntityAuditListenerV2 implements EntityChangeListenerV2 {
if (CollectionUtils.isNotEmpty(classifications)) {
MetricRecorder metric = RequestContext.get().startMetricRecord("entityAudit");
List<EntityAuditEventV2> events = new ArrayList<>();
FixedBufferList<EntityAuditEventV2> classificationsAdded = getAuditEventsList();
for (AtlasClassification classification : classifications) {
if (entity.getGuid().equals(classification.getEntityGuid())) {
events.add(createEvent(entity, CLASSIFICATION_ADD, "Added classification: " + AtlasType.toJson(classification)));
createEvent(classificationsAdded.next(), entity, CLASSIFICATION_ADD, "Added classification: " + AtlasType.toJson(classification));
} else {
events.add(createEvent(entity, PROPAGATED_CLASSIFICATION_ADD, "Added propagated classification: " + AtlasType.toJson(classification)));
createEvent(classificationsAdded.next(), entity, PROPAGATED_CLASSIFICATION_ADD, "Added propagated classification: " + AtlasType.toJson(classification));
}
}
auditRepository.putEventsV2(events);
auditRepository.putEventsV2(classificationsAdded.toList());
RequestContext.get().endMetricRecord(metric);
}
......@@ -177,20 +168,20 @@ public class EntityAuditListenerV2 implements EntityChangeListenerV2 {
@Override
public void onClassificationsAdded(List<AtlasEntity> entities, List<AtlasClassification> classifications) throws AtlasBaseException {
if (CollectionUtils.isNotEmpty(classifications)) {
MetricRecorder metric = RequestContext.get().startMetricRecord("entityAudit");
List<EntityAuditEventV2> events = Collections.synchronizedList(new ArrayList<>());
MetricRecorder metric = RequestContext.get().startMetricRecord("entityAudit");
FixedBufferList<EntityAuditEventV2> events = getAuditEventsList();
for (AtlasClassification classification : classifications) {
for (AtlasEntity entity : entities) {
if (entity.getGuid().equals(classification.getEntityGuid())) {
events.add(createEvent(entity, CLASSIFICATION_ADD, "Added classification: " + AtlasType.toJson(classification)));
createEvent(events.next(), entity, CLASSIFICATION_ADD, "Added classification: " + AtlasType.toJson(classification));
} else {
events.add(createEvent(entity, PROPAGATED_CLASSIFICATION_ADD, "Added propagated classification: " + AtlasType.toJson(classification)));
createEvent(events.next(), entity, PROPAGATED_CLASSIFICATION_ADD, "Added propagated classification: " + AtlasType.toJson(classification));
}
}
}
auditRepository.putEventsV2(events);
auditRepository.putEventsV2(events.toList());
RequestContext.get().endMetricRecord(metric);
}
......@@ -201,24 +192,24 @@ public class EntityAuditListenerV2 implements EntityChangeListenerV2 {
if (CollectionUtils.isNotEmpty(classifications)) {
MetricRecorder metric = RequestContext.get().startMetricRecord("entityAudit");
List<EntityAuditEventV2> events = new ArrayList<>();
String guid = entity.getGuid();
FixedBufferList<EntityAuditEventV2> events = getAuditEventsList();
String guid = entity.getGuid();
for (AtlasClassification classification : classifications) {
if (guid.equals(classification.getEntityGuid())) {
events.add(createEvent(entity, CLASSIFICATION_UPDATE, "Updated classification: " + AtlasType.toJson(classification)));
createEvent(events.next(), entity, CLASSIFICATION_UPDATE, "Updated classification: " + AtlasType.toJson(classification));
} else {
if (isPropagatedClassificationAdded(guid, classification)) {
events.add(createEvent(entity, PROPAGATED_CLASSIFICATION_ADD, "Added propagated classification: " + AtlasType.toJson(classification)));
createEvent(events.next(), entity, PROPAGATED_CLASSIFICATION_ADD, "Added propagated classification: " + AtlasType.toJson(classification));
} else if (isPropagatedClassificationDeleted(guid, classification)) {
events.add(createEvent(entity, PROPAGATED_CLASSIFICATION_DELETE, "Deleted propagated classification: " + classification.getTypeName()));
createEvent(events.next(), entity, PROPAGATED_CLASSIFICATION_DELETE, "Deleted propagated classification: " + classification.getTypeName());
} else {
events.add(createEvent(entity, PROPAGATED_CLASSIFICATION_UPDATE, "Updated propagated classification: " + AtlasType.toJson(classification)));
createEvent(events.next(), entity, PROPAGATED_CLASSIFICATION_UPDATE, "Updated propagated classification: " + AtlasType.toJson(classification));
}
}
}
auditRepository.putEventsV2(events);
auditRepository.putEventsV2(events.toList());
RequestContext.get().endMetricRecord(metric);
}
......@@ -227,19 +218,19 @@ public class EntityAuditListenerV2 implements EntityChangeListenerV2 {
@Override
public void onClassificationsDeleted(AtlasEntity entity, List<AtlasClassification> classifications) throws AtlasBaseException {
if (CollectionUtils.isNotEmpty(classifications)) {
MetricRecorder metric = RequestContext.get().startMetricRecord("entityAudit");
MetricRecorder metric = RequestContext.get().startMetricRecord("onClassificationsDeleted");
List<EntityAuditEventV2> events = new ArrayList<>();
FixedBufferList<EntityAuditEventV2> events = getAuditEventsList();
for (AtlasClassification classification : classifications) {
if (StringUtils.equals(entity.getGuid(), classification.getEntityGuid())) {
events.add(createEvent(entity, CLASSIFICATION_DELETE, "Deleted classification: " + classification.getTypeName()));
createEvent(events.next(), entity, CLASSIFICATION_DELETE, "Deleted classification: " + classification.getTypeName());
} else {
events.add(createEvent(entity, PROPAGATED_CLASSIFICATION_DELETE, "Deleted propagated classification: " + classification.getTypeName()));
createEvent(events.next(), entity, PROPAGATED_CLASSIFICATION_DELETE, "Deleted propagated classification: " + classification.getTypeName());
}
}
auditRepository.putEventsV2(events);
auditRepository.putEventsV2(events.toList());
RequestContext.get().endMetricRecord(metric);
}
......@@ -248,20 +239,20 @@ public class EntityAuditListenerV2 implements EntityChangeListenerV2 {
@Override
public void onClassificationsDeleted(List<AtlasEntity> entities, List<AtlasClassification> classifications) throws AtlasBaseException {
if (CollectionUtils.isNotEmpty(classifications) && CollectionUtils.isNotEmpty(entities)) {
MetricRecorder metric = RequestContext.get().startMetricRecord("entityAudit");
List<EntityAuditEventV2> events = Collections.synchronizedList(new ArrayList<>());
MetricRecorder metric = RequestContext.get().startMetricRecord("onClassificationsDeleted");
FixedBufferList<EntityAuditEventV2> events = getAuditEventsList();
for (AtlasClassification classification : classifications) {
for (AtlasEntity entity : entities) {
if (StringUtils.equals(entity.getGuid(), classification.getEntityGuid())) {
events.add(createEvent(entity, CLASSIFICATION_DELETE, "Deleted classification: " + classification.getTypeName()));
createEvent(events.next(), entity, CLASSIFICATION_DELETE, "Deleted classification: " + classification.getTypeName());
} else {
events.add(createEvent(entity, PROPAGATED_CLASSIFICATION_DELETE, "Deleted propagated classification: " + classification.getTypeName()));
createEvent(events.next(), entity, PROPAGATED_CLASSIFICATION_DELETE, "Deleted propagated classification: " + classification.getTypeName());
}
}
}
auditRepository.putEventsV2(events);
auditRepository.putEventsV2(events.toList());
RequestContext.get().endMetricRecord(metric);
}
......@@ -270,19 +261,19 @@ public class EntityAuditListenerV2 implements EntityChangeListenerV2 {
@Override
public void onTermAdded(AtlasGlossaryTerm term, List<AtlasRelatedObjectId> entities) throws AtlasBaseException {
if (term != null && CollectionUtils.isNotEmpty(entities)) {
MetricRecorder metric = RequestContext.get().startMetricRecord("entityAudit");
MetricRecorder metric = RequestContext.get().startMetricRecord("onTermAdded");
List<EntityAuditEventV2> events = new ArrayList<>();
FixedBufferList<EntityAuditEventV2> events = getAuditEventsList();
for (AtlasRelatedObjectId relatedObjectId : entities) {
AtlasEntity entity = instanceConverter.getAndCacheEntity(relatedObjectId.getGuid());
if (entity != null) {
events.add(createEvent(entity, TERM_ADD, "Added term: " + term.toAuditString()));
createEvent(events.next(), entity, TERM_ADD, "Added term: " + term.toAuditString());
}
}
auditRepository.putEventsV2(events);
auditRepository.putEventsV2(events.toList());
RequestContext.get().endMetricRecord(metric);
}
......@@ -291,19 +282,19 @@ public class EntityAuditListenerV2 implements EntityChangeListenerV2 {
@Override
public void onTermDeleted(AtlasGlossaryTerm term, List<AtlasRelatedObjectId> entities) throws AtlasBaseException {
if (term != null && CollectionUtils.isNotEmpty(entities)) {
MetricRecorder metric = RequestContext.get().startMetricRecord("entityAudit");
MetricRecorder metric = RequestContext.get().startMetricRecord("onTermDeleted");
List<EntityAuditEventV2> events = new ArrayList<>();
FixedBufferList<EntityAuditEventV2> events = getAuditEventsList();
for (AtlasRelatedObjectId relatedObjectId : entities) {
AtlasEntity entity = instanceConverter.getAndCacheEntity(relatedObjectId.getGuid());
if (entity != null) {
events.add(createEvent(entity, TERM_DELETE, "Deleted term: " + term.toAuditString()));
createEvent(events.next(), entity, TERM_DELETE, "Deleted term: " + term.toAuditString());
}
}
auditRepository.putEventsV2(events);
auditRepository.putEventsV2(events.toList());
RequestContext.get().endMetricRecord(metric);
}
......@@ -312,15 +303,15 @@ public class EntityAuditListenerV2 implements EntityChangeListenerV2 {
@Override
public void onLabelsAdded(AtlasEntity entity, Set<String> labels) throws AtlasBaseException {
if (CollectionUtils.isNotEmpty(labels)) {
MetricRecorder metric = RequestContext.get().startMetricRecord("entityAudit");
MetricRecorder metric = RequestContext.get().startMetricRecord("onLabelsAdded");
List<EntityAuditEventV2> events = new ArrayList<>();
FixedBufferList<EntityAuditEventV2> events = getAuditEventsList();
String addedLabels = StringUtils.join(labels, " ");
events.add(createEvent(entity, LABEL_ADD, "Added labels: " + addedLabels));
createEvent(events.next(), entity, LABEL_ADD, "Added labels: " + addedLabels);
auditRepository.putEventsV2(events);
auditRepository.putEventsV2(events.toList());
RequestContext.get().endMetricRecord(metric);
}
......@@ -329,15 +320,15 @@ public class EntityAuditListenerV2 implements EntityChangeListenerV2 {
@Override
public void onLabelsDeleted(AtlasEntity entity, Set<String> labels) throws AtlasBaseException {
if (CollectionUtils.isNotEmpty(labels)) {
MetricRecorder metric = RequestContext.get().startMetricRecord("entityAudit");
MetricRecorder metric = RequestContext.get().startMetricRecord("onLabelsDeleted");
List<EntityAuditEventV2> events = new ArrayList<>();
FixedBufferList<EntityAuditEventV2> events = getAuditEventsList();
String deletedLabels = StringUtils.join(labels, " ");
events.add(createEvent(entity, LABEL_DELETE, "Deleted labels: " + deletedLabels));
createEvent(events.next(), entity, LABEL_DELETE, "Deleted labels: " + deletedLabels);
auditRepository.putEventsV2(events);
auditRepository.putEventsV2(events.toList());
RequestContext.get().endMetricRecord(metric);
}
......@@ -376,33 +367,38 @@ public class EntityAuditListenerV2 implements EntityChangeListenerV2 {
if (MapUtils.isNotEmpty(updatedBusinessAttributes)) {
MetricRecorder metric = RequestContext.get().startMetricRecord("entityAudit");
List<EntityAuditEventV2> auditEvents = new ArrayList<>();
FixedBufferList<EntityAuditEventV2> events = getAuditEventsList();
for (Map.Entry<String, Map<String, Object>> entry : updatedBusinessAttributes.entrySet()) {
String bmName = entry.getKey();
Map<String, Object> attributes = entry.getValue();
String details = AtlasJson.toJson(new AtlasStruct(bmName, attributes));
EntityAuditEventV2 auditEvent = createEvent(entity, BUSINESS_ATTRIBUTE_UPDATE, "Updated business attributes: " + details);
auditEvents.add(auditEvent);
createEvent(events.next(), entity, BUSINESS_ATTRIBUTE_UPDATE, "Updated business attributes: " + details);
}
auditRepository.putEventsV2(auditEvents);
auditRepository.putEventsV2(events.toList());
RequestContext.get().endMetricRecord(metric);
}
}
private EntityAuditEventV2 createEvent(AtlasEntity entity, EntityAuditActionV2 action, String details) {
return new EntityAuditEventV2(entity.getGuid(), RequestContext.get().getRequestTime(),
RequestContext.get().getUser(), action, details, entity);
private EntityAuditEventV2 createEvent(EntityAuditEventV2 entityAuditEventV2, AtlasEntity entity, EntityAuditActionV2 action, String details) {
entityAuditEventV2.setEntityId(entity.getGuid());
entityAuditEventV2.setTimestamp(RequestContext.get().getRequestTime());
entityAuditEventV2.setUser(RequestContext.get().getUser());
entityAuditEventV2.setAction(action);
entityAuditEventV2.setDetails(details);
entityAuditEventV2.setEntity(entity);
return entityAuditEventV2;
}
private EntityAuditEventV2 createEvent(AtlasEntity entity, EntityAuditActionV2 action) {
private EntityAuditEventV2 createEvent(EntityAuditEventV2 event, AtlasEntity entity, EntityAuditActionV2 action) {
String detail = getAuditEventDetail(entity, action);
return createEvent(entity, action, detail);
return createEvent(event, entity, action, detail);
}
private String getAuditEventDetail(AtlasEntity entity, EntityAuditActionV2 action) {
......@@ -619,4 +615,11 @@ public class EntityAuditListenerV2 implements EntityChangeListenerV2 {
return ret;
}
private FixedBufferList<EntityAuditEventV2> getAuditEventsList() {
FixedBufferList<EntityAuditEventV2> ret = AUDIT_EVENTS_BUFFER.get();
ret.reset();
return ret;
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment