Commit 05514255 by Madhan Neethiraj

ATLAS-2075: notification enhancement to handle large messages, using compression…

ATLAS-2075: notification enhancement to handle large messages, using compression and multi-part messages (#2) (cherry picked from commit 28941bfe7220fc0cf09d138e79e18d5e0a82c367)
parent d541a378
......@@ -190,16 +190,13 @@ public abstract class AbstractNotification implements NotificationInterface {
if (msgLengthExceedsLimit) {
// compressed messages are already base64-encoded
byte[] encodedBytes = compressionKind != CompressionKind.NONE ? msgBytes : AtlasNotificationBaseMessage.encodeBase64(msgBytes);
int splitCount = encodedBytes.length / MESSAGE_MAX_LENGTH_BYTES;
byte[] encodedBytes = MESSAGE_COMPRESSION_ENABLED ? msgBytes : AtlasNotificationBaseMessage.encodeBase64(msgBytes);
int splitCount = encodedBytes.length / MESSAGE_MAX_LENGTH_BYTES;
if ((encodedBytes.length % MESSAGE_MAX_LENGTH_BYTES) != 0) {
splitCount++;
}
LOG.info("Splitting large message: msgID={}, length={} bytes, splitCount={}", msgId, encodedBytes.length, splitCount);
for (int i = 0, offset = 0; i < splitCount; i++) {
int length = MESSAGE_MAX_LENGTH_BYTES;
......@@ -215,6 +212,8 @@ public abstract class AbstractNotification implements NotificationInterface {
offset += length;
}
LOG.info("Split large message: msgID={}, splitCount={}, length={} bytes", msgId, splitCount, encodedBytes.length);
}
}
}
......
......@@ -107,15 +107,15 @@ public abstract class AtlasNotificationMessageDeserializer<T> implements Message
}
if (splitMsgs == null) {
LOG.error("Received multi-part message: msgID={}, {} of {}, but first message didn't arrive. Ignoring message", msgId, splitIdx + 1, splitCount);
LOG.error("Received msgID={}: {} of {}, but first message didn't arrive. Ignoring message", msgId, splitIdx + 1, splitCount);
msg = null;
} else if (splitMsgs.length <= splitIdx) {
LOG.error("Received multi-part message: msgID={}, {} of {} - out of bounds. Ignoring message", msgId, splitIdx + 1, splitCount);
LOG.error("Received msgID={}: {} of {} - out of bounds. Ignoring message", msgId, splitIdx + 1, splitCount);
msg = null;
} else {
LOG.info("Received multi-part message: msgID={}, {} of {}", msgId, splitIdx + 1, splitCount);
LOG.info("Received msgID={}: {} of {}", msgId, splitIdx + 1, splitCount);
splitMsgs[splitIdx] = splitMsg;
......@@ -130,7 +130,7 @@ public abstract class AtlasNotificationMessageDeserializer<T> implements Message
splitMsg = splitMsgs[i];
if (splitMsg == null) {
LOG.warn("Multi-part message: msgID={}, message {} of {} is missing. Ignoring message", msgId, i + 1, splitCount);
LOG.warn("MsgID={}: message {} of {} is missing. Ignoring message", msgId, i + 1, splitCount);
isValidMessage = false;
......@@ -149,14 +149,14 @@ public abstract class AtlasNotificationMessageDeserializer<T> implements Message
msgJson = AtlasNotificationBaseMessage.getStringUtf8(bytes);
LOG.info("Received multi-part, compressed message: msgID={}, compressed={} bytes, uncompressed={} bytes", msgId, encodedBytes.length, bytes.length);
LOG.info("Received msgID={}: splitCount={}, compressed={} bytes, uncompressed={} bytes", msgId, splitCount, encodedBytes.length, bytes.length);
} else {
byte[] encodedBytes = AtlasNotificationBaseMessage.getBytesUtf8(msgJson);
byte[] bytes = AtlasNotificationBaseMessage.decodeBase64(encodedBytes);
msgJson = AtlasNotificationBaseMessage.getStringUtf8(bytes);
LOG.info("Received multi-part message: msgID={}, compressed={} bytes, uncompressed={} bytes", msgId, encodedBytes.length, bytes.length);
LOG.info("Received msgID={}: splitCount={}, length={} bytes", msgId, splitCount, bytes.length);
}
msg = gson.fromJson(msgJson, AtlasNotificationBaseMessage.class);
......@@ -179,7 +179,7 @@ public abstract class AtlasNotificationMessageDeserializer<T> implements Message
msgJson = AtlasNotificationBaseMessage.getStringUtf8(bytes);
LOG.info("Received compressed message: msgID={}, compressed={} bytes, uncompressed={} bytes", compressedMsg.getMsgId(), encodedBytes.length, bytes.length);
LOG.info("Received msgID={}: compressed={} bytes, uncompressed={} bytes", compressedMsg.getMsgId(), encodedBytes.length, bytes.length);
}
AtlasNotificationMessage<T> atlasNotificationMessage = gson.fromJson(msgJson, notificationMessageType);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment