Commit 9a8c7125 by ashutoshm Committed by Madhan Neethiraj

ATLAS-2192: notification consumer updates to handle stale split messages

parent 4fe70de8
......@@ -35,6 +35,8 @@ public enum AtlasConfiguration {
NOTIFICATION_MESSAGE_MAX_LENGTH_BYTES("atlas.notification.message.max.length.bytes", (1000 * 1000)),
NOTIFICATION_MESSAGE_COMPRESSION_ENABLED("atlas.notification.message.compression.enabled", true),
NOTIFICATION_SPLIT_MESSAGE_SEGMENTS_WAIT_TIME_SECONDS("atlas.notification.split.message.segments.wait.time.seconds", 15 * 60),
NOTIFICATION_SPLIT_MESSAGE_BUFFER_PURGE_INTERVAL_SECONDS("atlas.notification.split.message.buffer.purge.interval.seconds", 5 * 60),
//search configuration
SEARCH_MAX_LIMIT("atlas.search.maxlimit", 10000),
......
......@@ -85,6 +85,7 @@ public abstract class AtlasHook {
notificationRetryInterval = atlasProperties.getInt(ATLAS_NOTIFICATION_RETRY_INTERVAL, 1000);
notificationInterface = NotificationProvider.get();
notificationInterface.setCurrentUser(getUser());
LOG.info("Created Atlas Hook");
}
......
......@@ -31,12 +31,14 @@ import org.apache.atlas.typesystem.IReferenceableInstance;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.json.InstanceSerialization;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.StringUtils;
import org.codehaus.jettison.json.JSONArray;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.Type;
import java.net.Inet4Address;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
......@@ -64,6 +66,16 @@ public abstract class AbstractNotification implements NotificationInterface {
public static final int MAX_BYTES_PER_CHAR = 4; // each char can encode upto 4 bytes in UTF-8
/**
* IP address of the host in which this process has started
*/
private static String localHostAddress = "";
/**
*
*/
private static String currentUser = "";
private final boolean embedded;
private final boolean isHAEnabled;
......@@ -107,6 +119,11 @@ public abstract class AbstractNotification implements NotificationInterface {
send(type, Arrays.asList(messages));
}
@Override
public void setCurrentUser(String user) {
currentUser = user;
}
// ----- AbstractNotification --------------------------------------------
/**
......@@ -146,6 +163,24 @@ public abstract class AbstractNotification implements NotificationInterface {
return GSON.toJson(notificationMsg);
}
private static String getHostAddress() {
if (StringUtils.isEmpty(localHostAddress)) {
try {
localHostAddress = Inet4Address.getLocalHost().getHostAddress();
} catch (UnknownHostException e) {
LOG.warn("failed to get local host address", e);
localHostAddress = "";
}
}
return localHostAddress;
}
private static String getCurrentUser() {
return currentUser;
}
/**
* Get the notification message JSON from the given object.
*
......@@ -154,7 +189,7 @@ public abstract class AbstractNotification implements NotificationInterface {
* @return the message as a JSON string
*/
public static void createNotificationMessages(Object message, List<String> msgJsonList) {
AtlasNotificationMessage<?> notificationMsg = new AtlasNotificationMessage<>(CURRENT_MESSAGE_VERSION, message);
AtlasNotificationMessage<?> notificationMsg = new AtlasNotificationMessage<>(CURRENT_MESSAGE_VERSION, message, getHostAddress(), getCurrentUser());
String msgJson = GSON.toJson(notificationMsg);
boolean msgLengthExceedsLimit = (msgJson.length() * MAX_BYTES_PER_CHAR) > MESSAGE_MAX_LENGTH_BYTES;
......
......@@ -18,10 +18,16 @@
package org.apache.atlas.notification;
import org.joda.time.DateTimeZone;
import org.joda.time.Instant;
/**
* Represents a notification message that is associated with a version.
*/
public class AtlasNotificationMessage<T> extends AtlasNotificationBaseMessage {
private String msgSourceIP;
private String msgCreatedBy;
private long msgCreationTime;
/**
* The actual message.
......@@ -38,12 +44,43 @@ public class AtlasNotificationMessage<T> extends AtlasNotificationBaseMessage {
* @param message the actual message
*/
public AtlasNotificationMessage(MessageVersion version, T message) {
this(version, message, null, null);
}
public AtlasNotificationMessage(MessageVersion version, T message, String msgSourceIP, String createdBy) {
super(version);
this.msgSourceIP = msgSourceIP;
this.msgCreatedBy = createdBy;
this.msgCreationTime = Instant.now().toDateTime(DateTimeZone.UTC).getMillis();
this.message = message;
}
public String getMsgSourceIP() {
return msgSourceIP;
}
public void setMsgSourceIP(String msgSourceIP) {
this.msgSourceIP = msgSourceIP;
}
public String getMsgCreatedBy() {
return msgCreatedBy;
}
public void setMsgCreatedBy(String msgCreatedBy) {
this.msgCreatedBy = msgCreatedBy;
}
public long getMsgCreationTime() {
return msgCreationTime;
}
public void setMsgCreationTime(long msgCreationTime) {
this.msgCreationTime = msgCreationTime;
}
public T getMessage() {
return message;
}
......
......@@ -18,6 +18,7 @@
package org.apache.atlas.notification;
import com.google.common.annotations.VisibleForTesting;
import com.google.gson.Gson;
import org.apache.atlas.notification.AtlasNotificationBaseMessage.CompressionKind;
import org.apache.commons.lang3.StringUtils;
......@@ -26,8 +27,14 @@ import org.slf4j.LoggerFactory;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import static org.apache.atlas.AtlasConfiguration.NOTIFICATION_SPLIT_MESSAGE_BUFFER_PURGE_INTERVAL_SECONDS;
import static org.apache.atlas.AtlasConfiguration.NOTIFICATION_SPLIT_MESSAGE_SEGMENTS_WAIT_TIME_SECONDS;
/**
* Deserializer that works with notification messages. The version of each deserialized message is checked against an
......@@ -47,8 +54,12 @@ public abstract class AtlasNotificationMessageDeserializer<T> implements Message
private final Gson gson;
private final Map<String, AtlasNotificationStringMessage[]> splitMsgBuffer = new HashMap<>();
private final Map<String, SplitMessageAggregator> splitMsgBuffer = new HashMap<>();
private final long splitMessageBufferPurgeIntervalMs;
private final long splitMessageSegmentsWaitTimeMs;
private long splitMessagesLastPurgeTime = System.currentTimeMillis();
private final AtomicLong messageCountTotal = new AtomicLong(0);
private final AtomicLong messageCountSinceLastInterval = new AtomicLong(0);
// ----- Constructors ----------------------------------------------------
/**
......@@ -61,11 +72,22 @@ public abstract class AtlasNotificationMessageDeserializer<T> implements Message
*/
public AtlasNotificationMessageDeserializer(Type notificationMessageType, MessageVersion expectedVersion,
Gson gson, Logger notificationLogger) {
this(notificationMessageType, expectedVersion, gson, notificationLogger,
NOTIFICATION_SPLIT_MESSAGE_SEGMENTS_WAIT_TIME_SECONDS.getLong() * 1000,
NOTIFICATION_SPLIT_MESSAGE_BUFFER_PURGE_INTERVAL_SECONDS.getLong() * 1000);
}
public AtlasNotificationMessageDeserializer(Type notificationMessageType, MessageVersion expectedVersion,
Gson gson, Logger notificationLogger,
long splitMessageSegmentsWaitTimeMs,
long splitMessageBufferPurgeIntervalMs) {
this.notificationMessageType = notificationMessageType;
this.messageType = ((ParameterizedType) notificationMessageType).getActualTypeArguments()[0];
this.expectedVersion = expectedVersion;
this.gson = gson;
this.notificationLogger = notificationLogger;
this.splitMessageSegmentsWaitTimeMs = splitMessageSegmentsWaitTimeMs;
this.splitMessageBufferPurgeIntervalMs = splitMessageBufferPurgeIntervalMs;
}
// ----- MessageDeserializer ---------------------------------------------
......@@ -74,6 +96,9 @@ public abstract class AtlasNotificationMessageDeserializer<T> implements Message
public T deserialize(String messageJson) {
final T ret;
messageCountTotal.incrementAndGet();
messageCountSinceLastInterval.incrementAndGet();
AtlasNotificationBaseMessage msg = gson.fromJson(messageJson, AtlasNotificationBaseMessage.class);
if (msg.getVersion() == null) { // older style messages not wrapped with AtlasNotificationMessage
......@@ -96,12 +121,12 @@ public abstract class AtlasNotificationMessageDeserializer<T> implements Message
final int splitIdx = splitMsg.getMsgSplitIdx();
final int splitCount = splitMsg.getMsgSplitCount();
final AtlasNotificationStringMessage[] splitMsgs;
final SplitMessageAggregator splitMsgs;
if (splitIdx == 0) {
splitMsgs = new AtlasNotificationStringMessage[splitCount];
splitMsgs = new SplitMessageAggregator(splitMsg);
splitMsgBuffer.put(msgId, splitMsgs);
splitMsgBuffer.put(splitMsgs.getMsgId(), splitMsgs);
} else {
splitMsgs = splitMsgBuffer.get(msgId);
}
......@@ -110,24 +135,24 @@ public abstract class AtlasNotificationMessageDeserializer<T> implements Message
LOG.error("Received msgID={}: {} of {}, but first message didn't arrive. Ignoring message", msgId, splitIdx + 1, splitCount);
msg = null;
} else if (splitMsgs.length <= splitIdx) {
} else if (splitMsgs.getTotalSplitCount() <= splitIdx) {
LOG.error("Received msgID={}: {} of {} - out of bounds. Ignoring message", msgId, splitIdx + 1, splitCount);
msg = null;
} else {
LOG.info("Received msgID={}: {} of {}", msgId, splitIdx + 1, splitCount);
splitMsgs[splitIdx] = splitMsg;
boolean isReady = splitMsgs.add(splitMsg);
if (splitIdx == (splitCount - 1)) { // last message
if (isReady) { // last message
splitMsgBuffer.remove(msgId);
boolean isValidMessage = true;
StringBuilder sb = new StringBuilder();
for (int i = 0; i < splitMsgs.length; i++) {
splitMsg = splitMsgs[i];
for (int i = 0; i < splitMsgs.getTotalSplitCount(); i++) {
splitMsg = splitMsgs.get(i);
if (splitMsg == null) {
LOG.warn("MsgID={}: message {} of {} is missing. Ignoring message", msgId, i + 1, splitCount);
......@@ -192,9 +217,55 @@ public abstract class AtlasNotificationMessageDeserializer<T> implements Message
}
}
long now = System.currentTimeMillis();
long timeSinceLastPurge = now - splitMessagesLastPurgeTime;
if(timeSinceLastPurge >= splitMessageBufferPurgeIntervalMs) {
purgeStaleMessages(splitMsgBuffer, now, splitMessageSegmentsWaitTimeMs);
LOG.info("Notification processing stats: total={}, sinceLastStatsReport={}", messageCountTotal.get(), messageCountSinceLastInterval.getAndSet(0));
splitMessagesLastPurgeTime = now;
}
return ret;
}
@VisibleForTesting
static void purgeStaleMessages(Map<String, SplitMessageAggregator> splitMsgBuffer, long now, long maxWaitTime) {
if (LOG.isDebugEnabled()) {
LOG.debug("==> purgeStaleMessages(bufferedMessageCount=" + splitMsgBuffer.size() + ")");
}
List<SplitMessageAggregator> evictionList = null;
for (SplitMessageAggregator aggregrator : splitMsgBuffer.values()) {
long waitTime = now - aggregrator.getFirstSplitTimestamp();
if (waitTime < maxWaitTime) {
continue;
}
if(evictionList == null) {
evictionList = new ArrayList<>();
}
evictionList.add(aggregrator);
}
if(evictionList != null) {
for (SplitMessageAggregator aggregrator : evictionList) {
LOG.error("evicting notification msgID={}, totalSplitCount={}, receivedSplitCount={}", aggregrator.getMsgId(), aggregrator.getTotalSplitCount(), aggregrator.getReceivedSplitCount());
splitMsgBuffer.remove(aggregrator.getMsgId());
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("<== purgeStaleMessages(bufferedMessageCount=" + splitMsgBuffer.size() + ")");
}
}
// ----- helper methods --------------------------------------------------
/**
......
......@@ -38,6 +38,12 @@ public class AtlasNotificationStringMessage extends AtlasNotificationBaseMessage
this.message = message;
}
public AtlasNotificationStringMessage(String message, String msgId, CompressionKind compressionKind, int msgSplitIdx, int msgSplitCount) {
super(AbstractNotification.CURRENT_MESSAGE_VERSION, msgId, compressionKind, msgSplitIdx, msgSplitCount);
this.message = message;
}
public AtlasNotificationStringMessage(byte[] encodedBytes, String msgId, CompressionKind compressionKind) {
super(AbstractNotification.CURRENT_MESSAGE_VERSION, msgId, compressionKind);
......
......@@ -99,6 +99,12 @@ public interface NotificationInterface {
}
/**
*
* @param user Name of the user under which the processes is running
*/
void setCurrentUser(String user);
/**
* Create notification consumers for the given notification type.
*
* @param notificationType the notification type (i.e. HOOK, ENTITIES)
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.notification;
public class SplitMessageAggregator {
private final String msgId;
private final AtlasNotificationStringMessage[] splitMessagesBuffer;
private final long firstSplitTimestamp;
public SplitMessageAggregator(AtlasNotificationStringMessage message) {
msgId = message.getMsgId();
splitMessagesBuffer = new AtlasNotificationStringMessage[message.getMsgSplitCount()];
firstSplitTimestamp = System.currentTimeMillis();
add(message);
}
public String getMsgId() {
return msgId;
}
public long getTotalSplitCount() {
return splitMessagesBuffer.length;
}
public long getReceivedSplitCount() {
long ret = 0;
for (AtlasNotificationStringMessage split : splitMessagesBuffer) {
if (split != null) {
ret++;
}
}
return ret;
}
public long getFirstSplitTimestamp() {
return firstSplitTimestamp;
}
public boolean add(AtlasNotificationStringMessage message) {
if (message.getMsgSplitIdx() < splitMessagesBuffer.length) {
splitMessagesBuffer[message.getMsgSplitIdx()] = message;
}
return message.getMsgSplitIdx() == (message.getMsgSplitCount() - 1);
}
public AtlasNotificationStringMessage get(int i) {
return splitMessagesBuffer[i];
}
}
......@@ -31,10 +31,8 @@ import java.util.List;
import java.util.Objects;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import org.apache.kafka.common.TopicPartition;
......
......@@ -26,6 +26,7 @@ import org.testng.annotations.Test;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import static org.mockito.Mockito.mock;
import static org.testng.Assert.*;
......@@ -54,9 +55,9 @@ public class AbstractNotificationTest {
assertEquals(NotificationInterface.NotificationType.HOOK, notification.type);
assertEquals(3, notification.messages.size());
assertEquals(messageJson.get(0), notification.messages.get(0));
assertEquals(messageJson.get(1), notification.messages.get(1));
assertEquals(messageJson.get(2), notification.messages.get(2));
for (int i = 0; i < notification.messages.size(); i++) {
assertEqualsMessageJson(notification.messages.get(i), messageJson.get(i));
}
}
@Test
......@@ -81,9 +82,11 @@ public class AbstractNotificationTest {
notification.send(NotificationInterface.NotificationType.HOOK, messages);
assertEquals(NotificationInterface.NotificationType.HOOK, notification.type);
assertEquals(messageJson.size(), notification.messages.size());
assertEquals(messageJson, notification.messages);
assertEquals(notification.type, NotificationInterface.NotificationType.HOOK);
assertEquals(notification.messages.size(), messageJson.size());
for (int i = 0; i < notification.messages.size(); i++) {
assertEqualsMessageJson(notification.messages.get(i), messageJson.get(i));
}
}
public static class TestMessage extends HookNotification.HookNotificationMessage {
......@@ -93,6 +96,17 @@ public class AbstractNotificationTest {
}
}
// ignore msgCreationTime in Json
private void assertEqualsMessageJson(String msgJsonActual, String msgJsonExpected) {
Map<Object, Object> msgActual = AbstractNotification.GSON.fromJson(msgJsonActual, Map.class);
Map<Object, Object> msgExpected = AbstractNotification.GSON.fromJson(msgJsonExpected, Map.class);
msgActual.remove("msgCreationTime");
msgExpected.remove("msgCreationTime");
assertEquals(msgActual, msgExpected);
}
public static class TestNotification extends AbstractNotification {
private NotificationType type;
private List<String> messages;
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.notification;
import org.apache.atlas.notification.AtlasNotificationBaseMessage.CompressionKind;
import org.testng.Assert;
import org.testng.annotations.Test;
import java.util.HashMap;
import java.util.Map;
public class SplitMessageAggregatorTest {
@Test
public void verifyEviction() throws InterruptedException {
Map<String, SplitMessageAggregator> map = getStringSplitMessageAggregatorMap();
Thread.currentThread().sleep(500);
AtlasNotificationMessageDeserializer.purgeStaleMessages(map, System.currentTimeMillis(), 250);
Assert.assertEquals(map.size(), 0);
}
@Test
public void verifyEvictionDoesNotOccur() throws InterruptedException {
Map<String, SplitMessageAggregator> map = getStringSplitMessageAggregatorMap();
int expectedSize = map.size();
Thread.currentThread().sleep(500);
AtlasNotificationMessageDeserializer.purgeStaleMessages(map, System.currentTimeMillis(), Long.MAX_VALUE);
Assert.assertEquals(map.size(), expectedSize);
}
private Map<String, SplitMessageAggregator> getStringSplitMessageAggregatorMap() {
Map<String, SplitMessageAggregator> map = new HashMap<>();
map.put("1", getSplitMessageAggregator("1", 5));
map.put("2", getSplitMessageAggregator("2", 10));
return map;
}
private SplitMessageAggregator getSplitMessageAggregator(String id, int splitCount) {
SplitMessageAggregator sma = null;
for (int i = 0; i < splitCount; i++) {
AtlasNotificationStringMessage sm = new AtlasNotificationStringMessage("aaaaa", id, CompressionKind.NONE, i, splitCount);
if(sma == null) {
sma = new SplitMessageAggregator(sm);
} else {
sma.add(sm);
}
}
return sma;
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment