Commit d541a378 by Madhan Neethiraj

ATLAS-2075: notification enhancement to handle large messages, using compression…

ATLAS-2075: notification enhancement to handle large messages, using compression and multi-part messages (cherry picked from commit 99243ee8e18656acd72601468c99e7781a0b04f7)
parent 3af54364
...@@ -33,6 +33,9 @@ public enum AtlasConfiguration { ...@@ -33,6 +33,9 @@ public enum AtlasConfiguration {
QUERY_PARAM_MAX_LENGTH("atlas.query.param.max.length", 4*1024), QUERY_PARAM_MAX_LENGTH("atlas.query.param.max.length", 4*1024),
NOTIFICATION_MESSAGE_MAX_LENGTH_BYTES("atlas.notification.message.max.length.bytes", (1000 * 1000)),
NOTIFICATION_MESSAGE_COMPRESSION_ENABLED("atlas.notification.message.compression.enabled", true),
//search configuration //search configuration
SEARCH_MAX_LIMIT("atlas.search.maxlimit", 10000), SEARCH_MAX_LIMIT("atlas.search.maxlimit", 10000),
SEARCH_DEFAULT_LIMIT("atlas.search.defaultlimit", 100); SEARCH_DEFAULT_LIMIT("atlas.search.defaultlimit", 100);
...@@ -63,6 +66,10 @@ public enum AtlasConfiguration { ...@@ -63,6 +66,10 @@ public enum AtlasConfiguration {
return APPLICATION_PROPERTIES.getLong(propertyName, Long.valueOf(defaultValue.toString()).longValue()); return APPLICATION_PROPERTIES.getLong(propertyName, Long.valueOf(defaultValue.toString()).longValue());
} }
public boolean getBoolean() {
return APPLICATION_PROPERTIES.getBoolean(propertyName, Boolean.valueOf(defaultValue.toString()).booleanValue());
}
public String getString() { public String getString() {
return APPLICATION_PROPERTIES.getString(propertyName, defaultValue.toString()); return APPLICATION_PROPERTIES.getString(propertyName, defaultValue.toString());
} }
......
...@@ -71,6 +71,10 @@ public class AtlasKafkaConsumer<T> extends AbstractNotificationConsumer<T> { ...@@ -71,6 +71,10 @@ public class AtlasKafkaConsumer<T> extends AbstractNotificationConsumer<T> {
T message = deserializer.deserialize(record.value().toString()); T message = deserializer.deserialize(record.value().toString());
if (message == null) {
continue;
}
messages.add(new AtlasKafkaMessage(message, record.offset(), record.partition())); messages.add(new AtlasKafkaMessage(message, record.offset(), record.partition()));
} }
} }
......
...@@ -202,7 +202,7 @@ public class KafkaNotification extends AbstractNotification implements Service { ...@@ -202,7 +202,7 @@ public class KafkaNotification extends AbstractNotification implements Service {
// ----- AbstractNotification -------------------------------------------- // ----- AbstractNotification --------------------------------------------
@Override @Override
public void sendInternal(NotificationType type, String... messages) throws NotificationException { public void sendInternal(NotificationType type, List<String> messages) throws NotificationException {
if (producer == null) { if (producer == null) {
createProducer(); createProducer();
} }
...@@ -210,7 +210,7 @@ public class KafkaNotification extends AbstractNotification implements Service { ...@@ -210,7 +210,7 @@ public class KafkaNotification extends AbstractNotification implements Service {
} }
@VisibleForTesting @VisibleForTesting
void sendInternalToProducer(Producer p, NotificationType type, String[] messages) throws NotificationException { void sendInternalToProducer(Producer p, NotificationType type, List<String> messages) throws NotificationException {
String topic = TOPIC_MAP.get(type); String topic = TOPIC_MAP.get(type);
List<MessageContext> messageContexts = new ArrayList<>(); List<MessageContext> messageContexts = new ArrayList<>();
for (String message : messages) { for (String message : messages) {
......
...@@ -44,7 +44,7 @@ import java.util.Map; ...@@ -44,7 +44,7 @@ import java.util.Map;
/** /**
* Base notification message deserializer. * Base notification message deserializer.
*/ */
public abstract class AbstractMessageDeserializer<T> extends VersionedMessageDeserializer<T> { public abstract class AbstractMessageDeserializer<T> extends AtlasNotificationMessageDeserializer<T> {
private static final Map<Type, JsonDeserializer> DESERIALIZER_MAP = new HashMap<>(); private static final Map<Type, JsonDeserializer> DESERIALIZER_MAP = new HashMap<>();
...@@ -63,16 +63,16 @@ public abstract class AbstractMessageDeserializer<T> extends VersionedMessageDes ...@@ -63,16 +63,16 @@ public abstract class AbstractMessageDeserializer<T> extends VersionedMessageDes
/** /**
* Create a deserializer. * Create a deserializer.
* *
* @param versionedMessageType the type of the versioned message * @param notificationMessageType the type of the notification message
* @param expectedVersion the expected message version * @param expectedVersion the expected message version
* @param deserializerMap map of individual deserializers used to define this message deserializer * @param deserializerMap map of individual deserializers used to define this message deserializer
* @param notificationLogger logger for message version mismatch * @param notificationLogger logger for message version mismatch
*/ */
public AbstractMessageDeserializer(Type versionedMessageType, public AbstractMessageDeserializer(Type notificationMessageType,
MessageVersion expectedVersion, MessageVersion expectedVersion,
Map<Type, JsonDeserializer> deserializerMap, Map<Type, JsonDeserializer> deserializerMap,
Logger notificationLogger) { Logger notificationLogger) {
super(versionedMessageType, expectedVersion, getDeserializer(deserializerMap), notificationLogger); super(notificationMessageType, expectedVersion, getDeserializer(deserializerMap), notificationLogger);
} }
......
...@@ -26,21 +26,34 @@ import com.google.gson.JsonSerializationContext; ...@@ -26,21 +26,34 @@ import com.google.gson.JsonSerializationContext;
import com.google.gson.JsonSerializer; import com.google.gson.JsonSerializer;
import org.apache.atlas.AtlasException; import org.apache.atlas.AtlasException;
import org.apache.atlas.ha.HAConfiguration; import org.apache.atlas.ha.HAConfiguration;
import org.apache.atlas.notification.AtlasNotificationBaseMessage.CompressionKind;
import org.apache.atlas.typesystem.IReferenceableInstance; import org.apache.atlas.typesystem.IReferenceableInstance;
import org.apache.atlas.typesystem.Referenceable; import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.json.InstanceSerialization; import org.apache.atlas.typesystem.json.InstanceSerialization;
import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.Configuration;
import org.codehaus.jettison.json.JSONArray; import org.codehaus.jettison.json.JSONArray;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.Type; import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.atlas.notification.AtlasNotificationBaseMessage.MESSAGE_COMPRESSION_ENABLED;
import static org.apache.atlas.notification.AtlasNotificationBaseMessage.MESSAGE_MAX_LENGTH_BYTES;
/** /**
* Abstract notification interface implementation. * Abstract notification interface implementation.
*/ */
public abstract class AbstractNotification implements NotificationInterface { public abstract class AbstractNotification implements NotificationInterface {
private static final Logger LOG = LoggerFactory.getLogger(AbstractNotification.class);
private static String msgIdPrefix = UUID.randomUUID().toString();
private static AtomicInteger msgIdSuffix = new AtomicInteger(0);
/** /**
* The current expected version for notification messages. * The current expected version for notification messages.
...@@ -48,6 +61,9 @@ public abstract class AbstractNotification implements NotificationInterface { ...@@ -48,6 +61,9 @@ public abstract class AbstractNotification implements NotificationInterface {
public static final MessageVersion CURRENT_MESSAGE_VERSION = new MessageVersion("1.0.0"); public static final MessageVersion CURRENT_MESSAGE_VERSION = new MessageVersion("1.0.0");
public static final String PROPERTY_EMBEDDED = PROPERTY_PREFIX + ".embedded"; public static final String PROPERTY_EMBEDDED = PROPERTY_PREFIX + ".embedded";
public static final int MAX_BYTES_PER_CHAR = 4; // each char can encode upto 4 bytes in UTF-8
private final boolean embedded; private final boolean embedded;
private final boolean isHAEnabled; private final boolean isHAEnabled;
...@@ -77,10 +93,12 @@ public abstract class AbstractNotification implements NotificationInterface { ...@@ -77,10 +93,12 @@ public abstract class AbstractNotification implements NotificationInterface {
@Override @Override
public <T> void send(NotificationType type, List<T> messages) throws NotificationException { public <T> void send(NotificationType type, List<T> messages) throws NotificationException {
String[] strMessages = new String[messages.size()]; List<String> strMessages = new ArrayList<>(messages.size());
for (int index = 0; index < messages.size(); index++) { for (int index = 0; index < messages.size(); index++) {
strMessages[index] = getMessageJson(messages.get(index)); createNotificationMessages(messages.get(index), strMessages);
} }
sendInternal(type, strMessages); sendInternal(type, strMessages);
} }
...@@ -117,11 +135,17 @@ public abstract class AbstractNotification implements NotificationInterface { ...@@ -117,11 +135,17 @@ public abstract class AbstractNotification implements NotificationInterface {
* *
* @throws NotificationException if an error occurs while sending * @throws NotificationException if an error occurs while sending
*/ */
protected abstract void sendInternal(NotificationType type, String[] messages) throws NotificationException; protected abstract void sendInternal(NotificationType type, List<String> messages) throws NotificationException;
// ----- utility methods ------------------------------------------------- // ----- utility methods -------------------------------------------------
public static String getMessageJson(Object message) {
AtlasNotificationMessage<?> notificationMsg = new AtlasNotificationMessage<>(CURRENT_MESSAGE_VERSION, message);
return GSON.toJson(notificationMsg);
}
/** /**
* Get the notification message JSON from the given object. * Get the notification message JSON from the given object.
* *
...@@ -129,10 +153,75 @@ public abstract class AbstractNotification implements NotificationInterface { ...@@ -129,10 +153,75 @@ public abstract class AbstractNotification implements NotificationInterface {
* *
* @return the message as a JSON string * @return the message as a JSON string
*/ */
public static String getMessageJson(Object message) { public static void createNotificationMessages(Object message, List<String> msgJsonList) {
VersionedMessage<?> versionedMessage = new VersionedMessage<>(CURRENT_MESSAGE_VERSION, message); AtlasNotificationMessage<?> notificationMsg = new AtlasNotificationMessage<>(CURRENT_MESSAGE_VERSION, message);
String msgJson = GSON.toJson(notificationMsg);
boolean msgLengthExceedsLimit = (msgJson.length() * MAX_BYTES_PER_CHAR) > MESSAGE_MAX_LENGTH_BYTES;
if (msgLengthExceedsLimit) { // get utf-8 bytes for msgJson and check for length limit again
byte[] msgBytes = AtlasNotificationBaseMessage.getBytesUtf8(msgJson);
msgLengthExceedsLimit = msgBytes.length > MESSAGE_MAX_LENGTH_BYTES;
if (msgLengthExceedsLimit) {
String msgId = getNextMessageId();
CompressionKind compressionKind = CompressionKind.NONE;
if (MESSAGE_COMPRESSION_ENABLED) {
byte[] encodedBytes = AtlasNotificationBaseMessage.gzipCompressAndEncodeBase64(msgBytes);
compressionKind = CompressionKind.GZIP;
LOG.info("Compressed large message: msgID={}, uncompressed={} bytes, compressed={} bytes", msgId, msgBytes.length, encodedBytes.length);
msgLengthExceedsLimit = encodedBytes.length > MESSAGE_MAX_LENGTH_BYTES;
return GSON.toJson(versionedMessage); if (!msgLengthExceedsLimit) { // no need to split
AtlasNotificationStringMessage compressedMsg = new AtlasNotificationStringMessage(encodedBytes, msgId, compressionKind);
msgJson = GSON.toJson(compressedMsg); // msgJson will not have multi-byte characters here, due to use of encodeBase64() above
msgBytes = null; // not used after this point
} else { // encodedBytes will be split
msgJson = null; // not used after this point
msgBytes = encodedBytes;
}
}
if (msgLengthExceedsLimit) {
// compressed messages are already base64-encoded
byte[] encodedBytes = compressionKind != CompressionKind.NONE ? msgBytes : AtlasNotificationBaseMessage.encodeBase64(msgBytes);
int splitCount = encodedBytes.length / MESSAGE_MAX_LENGTH_BYTES;
if ((encodedBytes.length % MESSAGE_MAX_LENGTH_BYTES) != 0) {
splitCount++;
}
LOG.info("Splitting large message: msgID={}, length={} bytes, splitCount={}", msgId, encodedBytes.length, splitCount);
for (int i = 0, offset = 0; i < splitCount; i++) {
int length = MESSAGE_MAX_LENGTH_BYTES;
if ((offset + length) > encodedBytes.length) {
length = encodedBytes.length - offset;
}
AtlasNotificationStringMessage splitMsg = new AtlasNotificationStringMessage(encodedBytes, offset, length, msgId, compressionKind, i, splitCount);
String splitMsgJson = GSON.toJson(splitMsg);
msgJsonList.add(splitMsgJson);
offset += length;
}
}
}
}
if (!msgLengthExceedsLimit) {
msgJsonList.add(msgJson);
}
} }
...@@ -158,4 +247,16 @@ public abstract class AbstractNotification implements NotificationInterface { ...@@ -158,4 +247,16 @@ public abstract class AbstractNotification implements NotificationInterface {
return new JsonParser().parse(src.toString()).getAsJsonArray(); return new JsonParser().parse(src.toString()).getAsJsonArray();
} }
} }
private static String getNextMessageId() {
String nextMsgIdPrefix = msgIdPrefix;
int nextMsgIdSuffix = msgIdSuffix.getAndIncrement();
if (nextMsgIdSuffix == Short.MAX_VALUE) { // get a new UUID after 32,767 IDs
msgIdPrefix = UUID.randomUUID().toString();
msgIdSuffix = new AtomicInteger(0);
}
return nextMsgIdPrefix + "_" + Integer.toString(nextMsgIdSuffix);
}
} }
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.notification;
import org.apache.atlas.AtlasConfiguration;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.codec.binary.StringUtils;
import org.apache.commons.compress.utils.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
public class AtlasNotificationBaseMessage {
private static final Logger LOG = LoggerFactory.getLogger(AtlasNotificationBaseMessage.class);
public static final int MESSAGE_MAX_LENGTH_BYTES = AtlasConfiguration.NOTIFICATION_MESSAGE_MAX_LENGTH_BYTES.getInt() - 512; // 512 bytes for envelop;
public static final boolean MESSAGE_COMPRESSION_ENABLED = AtlasConfiguration.NOTIFICATION_MESSAGE_COMPRESSION_ENABLED.getBoolean();
public enum CompressionKind { NONE, GZIP };
private MessageVersion version = null;
private String msgId = null;
private CompressionKind msgCompressionKind = CompressionKind.NONE;
private int msgSplitIdx = 1;
private int msgSplitCount = 1;
public AtlasNotificationBaseMessage() {
}
public AtlasNotificationBaseMessage(MessageVersion version) {
this(version, null, CompressionKind.NONE);
}
public AtlasNotificationBaseMessage(MessageVersion version, String msgId, CompressionKind msgCompressionKind) {
this.version = version;
this.msgId = msgId;
this.msgCompressionKind = msgCompressionKind;
}
public AtlasNotificationBaseMessage(MessageVersion version, String msgId, CompressionKind msgCompressionKind, int msgSplitIdx, int msgSplitCount) {
this.version = version;
this.msgId = msgId;
this.msgCompressionKind = msgCompressionKind;
this.msgSplitIdx = msgSplitIdx;
this.msgSplitCount = msgSplitCount;
}
public void setVersion(MessageVersion version) {
this.version = version;
}
public MessageVersion getVersion() {
return version;
}
public String getMsgId() {
return msgId;
}
public void setMsgId(String msgId) {
this.msgId = msgId;
}
public CompressionKind getMsgCompressionKind() {
return msgCompressionKind;
}
public void setMsgCompressed(CompressionKind msgCompressionKind) {
this.msgCompressionKind = msgCompressionKind;
}
public int getMsgSplitIdx() {
return msgSplitIdx;
}
public void setMsgSplitIdx(int msgSplitIdx) {
this.msgSplitIdx = msgSplitIdx;
}
public int getMsgSplitCount() {
return msgSplitCount;
}
public void setMsgSplitCount(int msgSplitCount) {
this.msgSplitCount = msgSplitCount;
}
/**
* Compare the version of this message with the given version.
*
* @param compareToVersion the version to compare to
*
* @return a negative integer, zero, or a positive integer as this message's version is less than, equal to,
* or greater than the given version.
*/
public int compareVersion(MessageVersion compareToVersion) {
return version.compareTo(compareToVersion);
}
public static byte[] getBytesUtf8(String str) {
return StringUtils.getBytesUtf8(str);
}
public static String getStringUtf8(byte[] bytes) {
return StringUtils.newStringUtf8(bytes);
}
public static byte[] encodeBase64(byte[] bytes) {
return Base64.encodeBase64(bytes);
}
public static byte[] decodeBase64(byte[] bytes) {
return Base64.decodeBase64(bytes);
}
public static byte[] gzipCompressAndEncodeBase64(byte[] bytes) {
return encodeBase64(gzipCompress(bytes));
}
public static byte[] decodeBase64AndGzipUncompress(byte[] bytes) {
return gzipUncompress(decodeBase64(bytes));
}
public static String gzipCompress(String str) {
byte[] bytes = getBytesUtf8(str);
byte[] compressedBytes = gzipCompress(bytes);
byte[] encodedBytes = encodeBase64(compressedBytes);
return getStringUtf8(encodedBytes);
}
public static String gzipUncompress(String str) {
byte[] encodedBytes = getBytesUtf8(str);
byte[] compressedBytes = decodeBase64(encodedBytes);
byte[] bytes = gzipUncompress(compressedBytes);
return getStringUtf8(bytes);
}
public static byte[] gzipCompress(byte[] content) {
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
try {
GZIPOutputStream gzipOutputStream = new GZIPOutputStream(byteArrayOutputStream);
gzipOutputStream.write(content);
gzipOutputStream.close();
} catch (IOException e) {
LOG.error("gzipCompress(): error compressing {} bytes", content.length, e);
throw new RuntimeException(e);
}
return byteArrayOutputStream.toByteArray();
}
public static byte[] gzipUncompress(byte[] content) {
ByteArrayOutputStream out = new ByteArrayOutputStream();
try {
IOUtils.copy(new GZIPInputStream(new ByteArrayInputStream(content)), out);
} catch (IOException e) {
LOG.error("gzipUncompress(): error uncompressing {} bytes", content.length, e);
}
return out.toByteArray();
}
}
...@@ -21,12 +21,7 @@ package org.apache.atlas.notification; ...@@ -21,12 +21,7 @@ package org.apache.atlas.notification;
/** /**
* Represents a notification message that is associated with a version. * Represents a notification message that is associated with a version.
*/ */
public class VersionedMessage<T> { public class AtlasNotificationMessage<T> extends AtlasNotificationBaseMessage {
/**
* The version of the message.
*/
private final MessageVersion version;
/** /**
* The actual message. * The actual message.
...@@ -37,38 +32,18 @@ public class VersionedMessage<T> { ...@@ -37,38 +32,18 @@ public class VersionedMessage<T> {
// ----- Constructors ---------------------------------------------------- // ----- Constructors ----------------------------------------------------
/** /**
* Create a versioned message. * Create a notification message.
* *
* @param version the message version * @param version the message version
* @param message the actual message * @param message the actual message
*/ */
public VersionedMessage(MessageVersion version, T message) { public AtlasNotificationMessage(MessageVersion version, T message) {
this.version = version; super(version);
this.message = message;
}
// ----- VersionedMessage ------------------------------------------------ this.message = message;
/**
* Compare the version of this message with the given version.
*
* @param compareToVersion the version to compare to
*
* @return a negative integer, zero, or a positive integer as this message's version is less than, equal to,
* or greater than the given version.
*/
public int compareVersion(MessageVersion compareToVersion) {
return version.compareTo(compareToVersion);
} }
// ----- accessors -------------------------------------------------------
public MessageVersion getVersion() {
return version;
}
public T getMessage() { public T getMessage() {
return message; return message;
} }
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.notification;
import com.google.gson.Gson;
import org.apache.atlas.notification.AtlasNotificationBaseMessage.CompressionKind;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.HashMap;
import java.util.Map;
/**
* Deserializer that works with notification messages. The version of each deserialized message is checked against an
* expected version.
*/
public abstract class AtlasNotificationMessageDeserializer<T> implements MessageDeserializer<T> {
private static final Logger LOG = LoggerFactory.getLogger(AtlasNotificationMessageDeserializer.class);
public static final String VERSION_MISMATCH_MSG =
"Notification message version mismatch. Expected %s but recieved %s. Message %s";
private final Type notificationMessageType;
private final Type messageType;
private final MessageVersion expectedVersion;
private final Logger notificationLogger;
private final Gson gson;
private final Map<String, AtlasNotificationStringMessage[]> splitMsgBuffer = new HashMap<>();
// ----- Constructors ----------------------------------------------------
/**
* Create a notification message deserializer.
*
* @param notificationMessageType the type of the notification message
* @param expectedVersion the expected message version
* @param gson JSON serialization/deserialization
* @param notificationLogger logger for message version mismatch
*/
public AtlasNotificationMessageDeserializer(Type notificationMessageType, MessageVersion expectedVersion,
Gson gson, Logger notificationLogger) {
this.notificationMessageType = notificationMessageType;
this.messageType = ((ParameterizedType) notificationMessageType).getActualTypeArguments()[0];
this.expectedVersion = expectedVersion;
this.gson = gson;
this.notificationLogger = notificationLogger;
}
// ----- MessageDeserializer ---------------------------------------------
@Override
public T deserialize(String messageJson) {
final T ret;
AtlasNotificationBaseMessage msg = gson.fromJson(messageJson, AtlasNotificationBaseMessage.class);
if (msg.getVersion() == null) { // older style messages not wrapped with AtlasNotificationMessage
ret = gson.fromJson(messageJson, messageType);
} else {
String msgJson = messageJson;
if (msg.getMsgSplitCount() > 1) { // multi-part message
AtlasNotificationStringMessage splitMsg = gson.fromJson(msgJson, AtlasNotificationStringMessage.class);
checkVersion(splitMsg, msgJson);
String msgId = splitMsg.getMsgId();
if (StringUtils.isEmpty(msgId)) {
LOG.error("Received multi-part message with no message ID. Ignoring message");
msg = null;
} else {
final int splitIdx = splitMsg.getMsgSplitIdx();
final int splitCount = splitMsg.getMsgSplitCount();
final AtlasNotificationStringMessage[] splitMsgs;
if (splitIdx == 0) {
splitMsgs = new AtlasNotificationStringMessage[splitCount];
splitMsgBuffer.put(msgId, splitMsgs);
} else {
splitMsgs = splitMsgBuffer.get(msgId);
}
if (splitMsgs == null) {
LOG.error("Received multi-part message: msgID={}, {} of {}, but first message didn't arrive. Ignoring message", msgId, splitIdx + 1, splitCount);
msg = null;
} else if (splitMsgs.length <= splitIdx) {
LOG.error("Received multi-part message: msgID={}, {} of {} - out of bounds. Ignoring message", msgId, splitIdx + 1, splitCount);
msg = null;
} else {
LOG.info("Received multi-part message: msgID={}, {} of {}", msgId, splitIdx + 1, splitCount);
splitMsgs[splitIdx] = splitMsg;
if (splitIdx == (splitCount - 1)) { // last message
splitMsgBuffer.remove(msgId);
boolean isValidMessage = true;
StringBuilder sb = new StringBuilder();
for (int i = 0; i < splitMsgs.length; i++) {
splitMsg = splitMsgs[i];
if (splitMsg == null) {
LOG.warn("Multi-part message: msgID={}, message {} of {} is missing. Ignoring message", msgId, i + 1, splitCount);
isValidMessage = false;
break;
}
sb.append(splitMsg.getMessage());
}
if (isValidMessage) {
msgJson = sb.toString();
if (CompressionKind.GZIP.equals(splitMsg.getMsgCompressionKind())) {
byte[] encodedBytes = AtlasNotificationBaseMessage.getBytesUtf8(msgJson);
byte[] bytes = AtlasNotificationBaseMessage.decodeBase64AndGzipUncompress(encodedBytes);
msgJson = AtlasNotificationBaseMessage.getStringUtf8(bytes);
LOG.info("Received multi-part, compressed message: msgID={}, compressed={} bytes, uncompressed={} bytes", msgId, encodedBytes.length, bytes.length);
} else {
byte[] encodedBytes = AtlasNotificationBaseMessage.getBytesUtf8(msgJson);
byte[] bytes = AtlasNotificationBaseMessage.decodeBase64(encodedBytes);
msgJson = AtlasNotificationBaseMessage.getStringUtf8(bytes);
LOG.info("Received multi-part message: msgID={}, compressed={} bytes, uncompressed={} bytes", msgId, encodedBytes.length, bytes.length);
}
msg = gson.fromJson(msgJson, AtlasNotificationBaseMessage.class);
} else {
msg = null;
}
} else { // more messages to arrive
msg = null;
}
}
}
}
if (msg != null) {
if (CompressionKind.GZIP.equals(msg.getMsgCompressionKind())) {
AtlasNotificationStringMessage compressedMsg = gson.fromJson(msgJson, AtlasNotificationStringMessage.class);
byte[] encodedBytes = AtlasNotificationBaseMessage.getBytesUtf8(compressedMsg.getMessage());
byte[] bytes = AtlasNotificationBaseMessage.decodeBase64AndGzipUncompress(encodedBytes);
msgJson = AtlasNotificationBaseMessage.getStringUtf8(bytes);
LOG.info("Received compressed message: msgID={}, compressed={} bytes, uncompressed={} bytes", compressedMsg.getMsgId(), encodedBytes.length, bytes.length);
}
AtlasNotificationMessage<T> atlasNotificationMessage = gson.fromJson(msgJson, notificationMessageType);
checkVersion(atlasNotificationMessage, msgJson);
ret = atlasNotificationMessage.getMessage();
} else {
ret = null;
}
}
return ret;
}
// ----- helper methods --------------------------------------------------
/**
* Check the message version against the expected version.
*
* @param notificationMessage the notification message
* @param messageJson the notification message json
*
* @throws IncompatibleVersionException if the message version is incompatable with the expected version
*/
protected void checkVersion(AtlasNotificationBaseMessage notificationMessage, String messageJson) {
int comp = notificationMessage.compareVersion(expectedVersion);
// message has newer version
if (comp > 0) {
String msg = String.format(VERSION_MISMATCH_MSG, expectedVersion, notificationMessage.getVersion(), messageJson);
notificationLogger.error(msg);
throw new IncompatibleVersionException(msg);
}
// message has older version
if (comp < 0) {
notificationLogger.info(String.format(VERSION_MISMATCH_MSG, expectedVersion, notificationMessage.getVersion(), messageJson));
}
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.notification;
public class AtlasNotificationStringMessage extends AtlasNotificationBaseMessage {
private String message = null;
public AtlasNotificationStringMessage() {
super(AbstractNotification.CURRENT_MESSAGE_VERSION);
}
public AtlasNotificationStringMessage(String message) {
super(AbstractNotification.CURRENT_MESSAGE_VERSION);
this.message = message;
}
public AtlasNotificationStringMessage(String message, String msgId, CompressionKind compressionKind) {
super(AbstractNotification.CURRENT_MESSAGE_VERSION, msgId, compressionKind);
this.message = message;
}
public AtlasNotificationStringMessage(byte[] encodedBytes, String msgId, CompressionKind compressionKind) {
super(AbstractNotification.CURRENT_MESSAGE_VERSION, msgId, compressionKind);
this.message = AtlasNotificationBaseMessage.getStringUtf8(encodedBytes);
}
public AtlasNotificationStringMessage(byte[] encodedBytes, int offset, int length, String msgId, CompressionKind compressionKind, int msgSplitIdx, int msgSplitCount) {
super(AbstractNotification.CURRENT_MESSAGE_VERSION, msgId, compressionKind, msgSplitIdx, msgSplitCount);
this.message = new String(encodedBytes, offset, length);
}
public void setMessage(String message) {
this.message = message;
}
public String getMessage() {
return message;
}
}
...@@ -29,6 +29,9 @@ public class MessageVersion implements Comparable<MessageVersion> { ...@@ -29,6 +29,9 @@ public class MessageVersion implements Comparable<MessageVersion> {
* Used for message with no version (old format). * Used for message with no version (old format).
*/ */
public static final MessageVersion NO_VERSION = new MessageVersion("0"); public static final MessageVersion NO_VERSION = new MessageVersion("0");
public static final MessageVersion VERSION_1 = new MessageVersion("1.0.0");
public static final MessageVersion CURRENT_VERSION = VERSION_1;
private final String version; private final String version;
......
...@@ -54,9 +54,9 @@ public interface NotificationInterface { ...@@ -54,9 +54,9 @@ public interface NotificationInterface {
* Versioned notification message class types. * Versioned notification message class types.
*/ */
Type HOOK_VERSIONED_MESSAGE_TYPE = Type HOOK_VERSIONED_MESSAGE_TYPE =
new TypeToken<VersionedMessage<HookNotification.HookNotificationMessage>>(){}.getType(); new TypeToken<AtlasNotificationMessage<HookNotification.HookNotificationMessage>>(){}.getType();
Type ENTITY_VERSIONED_MESSAGE_TYPE = new TypeToken<VersionedMessage<EntityNotification>>(){}.getType(); Type ENTITY_VERSIONED_MESSAGE_TYPE = new TypeToken<AtlasNotificationMessage<EntityNotification>>(){}.getType();
/** /**
* Atlas notification types. * Atlas notification types.
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.notification;
import com.google.gson.Gson;
import org.slf4j.Logger;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
/**
* Deserializer that works with versioned messages. The version of each deserialized message is checked against an
* expected version.
*/
public abstract class VersionedMessageDeserializer<T> implements MessageDeserializer<T> {
public static final String VERSION_MISMATCH_MSG =
"Notification message version mismatch. Expected %s but recieved %s. Message %s";
private final Type versionedMessageType;
private final MessageVersion expectedVersion;
private final Logger notificationLogger;
private final Gson gson;
// ----- Constructors ----------------------------------------------------
/**
* Create a versioned message deserializer.
*
* @param versionedMessageType the type of the versioned message
* @param expectedVersion the expected message version
* @param gson JSON serialization/deserialization
* @param notificationLogger logger for message version mismatch
*/
public VersionedMessageDeserializer(Type versionedMessageType, MessageVersion expectedVersion,
Gson gson, Logger notificationLogger) {
this.versionedMessageType = versionedMessageType;
this.expectedVersion = expectedVersion;
this.gson = gson;
this.notificationLogger = notificationLogger;
}
// ----- MessageDeserializer ---------------------------------------------
@Override
public T deserialize(String messageJson) {
VersionedMessage<T> versionedMessage = gson.fromJson(messageJson, versionedMessageType);
// older style messages not wrapped with VersionedMessage
if (versionedMessage.getVersion() == null) {
Type t = ((ParameterizedType) versionedMessageType).getActualTypeArguments()[0];
versionedMessage = new VersionedMessage<>(MessageVersion.NO_VERSION, gson.<T>fromJson(messageJson, t));
}
checkVersion(versionedMessage, messageJson);
return versionedMessage.getMessage();
}
// ----- helper methods --------------------------------------------------
/**
* Check the message version against the expected version.
*
* @param versionedMessage the versioned message
* @param messageJson the notification message json
*
* @throws IncompatibleVersionException if the message version is incompatable with the expected version
*/
protected void checkVersion(VersionedMessage<T> versionedMessage, String messageJson) {
int comp = versionedMessage.compareVersion(expectedVersion);
// message has newer version
if (comp > 0) {
String msg =
String.format(VERSION_MISMATCH_MSG, expectedVersion, versionedMessage.getVersion(), messageJson);
notificationLogger.error(msg);
throw new IncompatibleVersionException(msg);
}
// message has older version
if (comp < 0) {
notificationLogger.info(String.format(VERSION_MISMATCH_MSG, expectedVersion, versionedMessage.getVersion(),
messageJson));
}
}
}
...@@ -19,11 +19,8 @@ ...@@ -19,11 +19,8 @@
package org.apache.atlas.kafka; package org.apache.atlas.kafka;
import kafka.message.MessageAndMetadata; import kafka.message.MessageAndMetadata;
import org.apache.atlas.notification.AbstractNotification; import org.apache.atlas.notification.*;
import org.apache.atlas.notification.MessageVersion; import org.apache.atlas.notification.AtlasNotificationMessage;
import org.apache.atlas.notification.NotificationInterface;
import org.apache.atlas.notification.IncompatibleVersionException;
import org.apache.atlas.notification.VersionedMessage;
import org.apache.atlas.notification.entity.EntityNotificationImplTest; import org.apache.atlas.notification.entity.EntityNotificationImplTest;
import org.apache.atlas.notification.hook.HookNotification; import org.apache.atlas.notification.hook.HookNotification;
import org.apache.atlas.typesystem.IStruct; import org.apache.atlas.typesystem.IStruct;
...@@ -82,7 +79,7 @@ public class KafkaConsumerTest { ...@@ -82,7 +79,7 @@ public class KafkaConsumerTest {
HookNotification.EntityUpdateRequest message = HookNotification.EntityUpdateRequest message =
new HookNotification.EntityUpdateRequest("user1", entity); new HookNotification.EntityUpdateRequest("user1", entity);
String json = AbstractNotification.GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), message)); String json = AbstractNotification.GSON.toJson(new AtlasNotificationMessage<>(new MessageVersion("1.0.0"), message));
kafkaConsumer.assign(Arrays.asList(new TopicPartition("ATLAS_HOOK", 0))); kafkaConsumer.assign(Arrays.asList(new TopicPartition("ATLAS_HOOK", 0)));
List<ConsumerRecord> klist = new ArrayList<>(); List<ConsumerRecord> klist = new ArrayList<>();
...@@ -119,7 +116,7 @@ public class KafkaConsumerTest { ...@@ -119,7 +116,7 @@ public class KafkaConsumerTest {
HookNotification.EntityUpdateRequest message = HookNotification.EntityUpdateRequest message =
new HookNotification.EntityUpdateRequest("user1", entity); new HookNotification.EntityUpdateRequest("user1", entity);
String json = AbstractNotification.GSON.toJson(new VersionedMessage<>(new MessageVersion("2.0.0"), message)); String json = AbstractNotification.GSON.toJson(new AtlasNotificationMessage<>(new MessageVersion("2.0.0"), message));
kafkaConsumer.assign(Arrays.asList(new TopicPartition("ATLAS_HOOK", 0))); kafkaConsumer.assign(Arrays.asList(new TopicPartition("ATLAS_HOOK", 0)));
List<ConsumerRecord> klist = new ArrayList<>(); List<ConsumerRecord> klist = new ArrayList<>();
......
...@@ -17,20 +17,14 @@ ...@@ -17,20 +17,14 @@
*/ */
package org.apache.atlas.kafka; package org.apache.atlas.kafka;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import org.apache.atlas.notification.MessageDeserializer;
import org.apache.atlas.notification.NotificationConsumer; import org.apache.atlas.notification.NotificationConsumer;
import org.apache.atlas.notification.NotificationException; import org.apache.atlas.notification.NotificationException;
import org.apache.atlas.notification.NotificationInterface; import org.apache.atlas.notification.NotificationInterface;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartition;
import org.testng.annotations.Test; import org.testng.annotations.Test;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
...@@ -38,11 +32,9 @@ import java.util.Map; ...@@ -38,11 +32,9 @@ import java.util.Map;
import java.util.Properties; import java.util.Properties;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import org.apache.atlas.kafka.AtlasKafkaConsumer; import scala.actors.threadpool.Arrays;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertEquals;
...@@ -90,7 +82,7 @@ public class KafkaNotificationMockTest { ...@@ -90,7 +82,7 @@ public class KafkaNotificationMockTest {
when(producer.send(expectedRecord)).thenReturn(returnValue); when(producer.send(expectedRecord)).thenReturn(returnValue);
kafkaNotification.sendInternalToProducer(producer, kafkaNotification.sendInternalToProducer(producer,
NotificationInterface.NotificationType.HOOK, new String[]{message}); NotificationInterface.NotificationType.HOOK, Arrays.asList(new String[]{message}));
verify(producer).send(expectedRecord); verify(producer).send(expectedRecord);
} }
...@@ -112,7 +104,7 @@ public class KafkaNotificationMockTest { ...@@ -112,7 +104,7 @@ public class KafkaNotificationMockTest {
try { try {
kafkaNotification.sendInternalToProducer(producer, kafkaNotification.sendInternalToProducer(producer,
NotificationInterface.NotificationType.HOOK, new String[]{message}); NotificationInterface.NotificationType.HOOK, Arrays.asList(new String[]{message}));
fail("Should have thrown NotificationException"); fail("Should have thrown NotificationException");
} catch (NotificationException e) { } catch (NotificationException e) {
assertEquals(e.getFailedMessages().size(), 1); assertEquals(e.getFailedMessages().size(), 1);
...@@ -142,7 +134,7 @@ public class KafkaNotificationMockTest { ...@@ -142,7 +134,7 @@ public class KafkaNotificationMockTest {
try { try {
kafkaNotification.sendInternalToProducer(producer, kafkaNotification.sendInternalToProducer(producer,
NotificationInterface.NotificationType.HOOK, new String[]{message1, message2}); NotificationInterface.NotificationType.HOOK, Arrays.asList(new String[]{message1, message2}));
fail("Should have thrown NotificationException"); fail("Should have thrown NotificationException");
} catch (NotificationException e) { } catch (NotificationException e) {
assertEquals(e.getFailedMessages().size(), 2); assertEquals(e.getFailedMessages().size(), 2);
......
...@@ -30,7 +30,6 @@ import java.util.LinkedList; ...@@ -30,7 +30,6 @@ import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Objects; import java.util.Objects;
import static org.mockito.Matchers.endsWith;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertEquals;
...@@ -57,15 +56,15 @@ public class AbstractNotificationConsumerTest { ...@@ -57,15 +56,15 @@ public class AbstractNotificationConsumerTest {
List jsonList = new LinkedList<>(); List jsonList = new LinkedList<>();
jsonList.add(GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage1))); jsonList.add(GSON.toJson(new AtlasNotificationMessage<>(new MessageVersion("1.0.0"), testMessage1)));
jsonList.add(GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage2))); jsonList.add(GSON.toJson(new AtlasNotificationMessage<>(new MessageVersion("1.0.0"), testMessage2)));
jsonList.add(GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage3))); jsonList.add(GSON.toJson(new AtlasNotificationMessage<>(new MessageVersion("1.0.0"), testMessage3)));
jsonList.add(GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage4))); jsonList.add(GSON.toJson(new AtlasNotificationMessage<>(new MessageVersion("1.0.0"), testMessage4)));
Type versionedMessageType = new TypeToken<VersionedMessage<TestMessage>>(){}.getType(); Type notificationMessageType = new TypeToken<AtlasNotificationMessage<TestMessage>>(){}.getType();
NotificationConsumer<TestMessage> consumer = NotificationConsumer<TestMessage> consumer =
new TestNotificationConsumer<>(versionedMessageType, jsonList, logger); new TestNotificationConsumer<>(notificationMessageType, jsonList, logger);
List<AtlasKafkaMessage<TestMessage>> messageList = consumer.receive(); List<AtlasKafkaMessage<TestMessage>> messageList = consumer.receive();
...@@ -91,9 +90,9 @@ public class AbstractNotificationConsumerTest { ...@@ -91,9 +90,9 @@ public class AbstractNotificationConsumerTest {
List jsonList = new LinkedList<>(); List jsonList = new LinkedList<>();
String json1 = GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage1)); String json1 = GSON.toJson(new AtlasNotificationMessage<>(new MessageVersion("1.0.0"), testMessage1));
String json2 = GSON.toJson(new VersionedMessage<>(new MessageVersion("0.0.5"), testMessage2)); String json2 = GSON.toJson(new AtlasNotificationMessage<>(new MessageVersion("0.0.5"), testMessage2));
String json3 = GSON.toJson(new VersionedMessage<>(new MessageVersion("0.5.0"), testMessage3)); String json3 = GSON.toJson(new AtlasNotificationMessage<>(new MessageVersion("0.5.0"), testMessage3));
String json4 = GSON.toJson(testMessage4); String json4 = GSON.toJson(testMessage4);
jsonList.add(json1); jsonList.add(json1);
...@@ -101,10 +100,10 @@ public class AbstractNotificationConsumerTest { ...@@ -101,10 +100,10 @@ public class AbstractNotificationConsumerTest {
jsonList.add(json3); jsonList.add(json3);
jsonList.add(json4); jsonList.add(json4);
Type versionedMessageType = new TypeToken<VersionedMessage<TestMessage>>(){}.getType(); Type notificationMessageType = new TypeToken<AtlasNotificationMessage<TestMessage>>(){}.getType();
NotificationConsumer<TestMessage> consumer = NotificationConsumer<TestMessage> consumer =
new TestNotificationConsumer<>(versionedMessageType, jsonList, logger); new TestNotificationConsumer<>(notificationMessageType, jsonList, logger);
List<AtlasKafkaMessage<TestMessage>> messageList = consumer.receive(); List<AtlasKafkaMessage<TestMessage>> messageList = consumer.receive();
...@@ -127,16 +126,16 @@ public class AbstractNotificationConsumerTest { ...@@ -127,16 +126,16 @@ public class AbstractNotificationConsumerTest {
List jsonList = new LinkedList<>(); List jsonList = new LinkedList<>();
String json1 = GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage1)); String json1 = GSON.toJson(new AtlasNotificationMessage<>(new MessageVersion("1.0.0"), testMessage1));
String json2 = GSON.toJson(new VersionedMessage<>(new MessageVersion("2.0.0"), testMessage2)); String json2 = GSON.toJson(new AtlasNotificationMessage<>(new MessageVersion("2.0.0"), testMessage2));
jsonList.add(json1); jsonList.add(json1);
jsonList.add(json2); jsonList.add(json2);
Type versionedMessageType = new TypeToken<VersionedMessage<TestMessage>>(){}.getType(); Type notificationMessageType = new TypeToken<AtlasNotificationMessage<TestMessage>>(){}.getType();
NotificationConsumer<TestMessage> consumer = NotificationConsumer<TestMessage> consumer =
new TestNotificationConsumer<>(versionedMessageType, jsonList, logger); new TestNotificationConsumer<>(notificationMessageType, jsonList, logger);
try { try {
List<AtlasKafkaMessage<TestMessage>> messageList = consumer.receive(); List<AtlasKafkaMessage<TestMessage>> messageList = consumer.receive();
...@@ -187,8 +186,8 @@ public class AbstractNotificationConsumerTest { ...@@ -187,8 +186,8 @@ public class AbstractNotificationConsumerTest {
private final List<T> messageList; private final List<T> messageList;
private int index = 0; private int index = 0;
public TestNotificationConsumer(Type versionedMessageType, List<T> messages, Logger logger) { public TestNotificationConsumer(Type notificationMessageType, List<T> messages, Logger logger) {
super(new TestDeserializer<T>(versionedMessageType, logger)); super(new TestDeserializer<T>(notificationMessageType, logger));
this.messageList = messages; this.messageList = messages;
} }
...@@ -222,10 +221,10 @@ public class AbstractNotificationConsumerTest { ...@@ -222,10 +221,10 @@ public class AbstractNotificationConsumerTest {
} }
} }
private static final class TestDeserializer<T> extends VersionedMessageDeserializer<T> { private static final class TestDeserializer<T> extends AtlasNotificationMessageDeserializer<T> {
private TestDeserializer(Type versionedMessageType, Logger logger) { private TestDeserializer(Type notificationMessageType, Logger logger) {
super(versionedMessageType, AbstractNotification.CURRENT_MESSAGE_VERSION, GSON, logger); super(notificationMessageType, AbstractNotification.CURRENT_MESSAGE_VERSION, GSON, logger);
} }
} }
} }
...@@ -23,6 +23,7 @@ import org.apache.atlas.notification.hook.HookNotification; ...@@ -23,6 +23,7 @@ import org.apache.atlas.notification.hook.HookNotification;
import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.Configuration;
import org.testng.annotations.Test; import org.testng.annotations.Test;
import java.util.ArrayList;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
...@@ -44,17 +45,18 @@ public class AbstractNotificationTest { ...@@ -44,17 +45,18 @@ public class AbstractNotificationTest {
TestMessage message2 = new TestMessage(HookNotification.HookNotificationType.TYPE_CREATE, "user1"); TestMessage message2 = new TestMessage(HookNotification.HookNotificationType.TYPE_CREATE, "user1");
TestMessage message3 = new TestMessage(HookNotification.HookNotificationType.ENTITY_FULL_UPDATE, "user1"); TestMessage message3 = new TestMessage(HookNotification.HookNotificationType.ENTITY_FULL_UPDATE, "user1");
String messageJson1 = AbstractNotification.getMessageJson(message1); List<String> messageJson = new ArrayList<>();
String messageJson2 = AbstractNotification.getMessageJson(message2); AbstractNotification.createNotificationMessages(message1, messageJson);
String messageJson3 = AbstractNotification.getMessageJson(message3); AbstractNotification.createNotificationMessages(message2, messageJson);
AbstractNotification.createNotificationMessages(message3, messageJson);
notification.send(NotificationInterface.NotificationType.HOOK, message1, message2, message3); notification.send(NotificationInterface.NotificationType.HOOK, message1, message2, message3);
assertEquals(NotificationInterface.NotificationType.HOOK, notification.type); assertEquals(NotificationInterface.NotificationType.HOOK, notification.type);
assertEquals(3, notification.messages.length); assertEquals(3, notification.messages.size());
assertEquals(messageJson1, notification.messages[0]); assertEquals(messageJson.get(0), notification.messages.get(0));
assertEquals(messageJson2, notification.messages[1]); assertEquals(messageJson.get(1), notification.messages.get(1));
assertEquals(messageJson3, notification.messages[2]); assertEquals(messageJson.get(2), notification.messages.get(2));
} }
@Test @Test
...@@ -72,17 +74,16 @@ public class AbstractNotificationTest { ...@@ -72,17 +74,16 @@ public class AbstractNotificationTest {
messages.add(message2); messages.add(message2);
messages.add(message3); messages.add(message3);
String messageJson1 = AbstractNotification.getMessageJson(message1); List<String> messageJson = new ArrayList<>();
String messageJson2 = AbstractNotification.getMessageJson(message2); AbstractNotification.createNotificationMessages(message1, messageJson);
String messageJson3 = AbstractNotification.getMessageJson(message3); AbstractNotification.createNotificationMessages(message2, messageJson);
AbstractNotification.createNotificationMessages(message3, messageJson);
notification.send(NotificationInterface.NotificationType.HOOK, messages); notification.send(NotificationInterface.NotificationType.HOOK, messages);
assertEquals(NotificationInterface.NotificationType.HOOK, notification.type); assertEquals(NotificationInterface.NotificationType.HOOK, notification.type);
assertEquals(3, notification.messages.length); assertEquals(messageJson.size(), notification.messages.size());
assertEquals(messageJson1, notification.messages[0]); assertEquals(messageJson, notification.messages);
assertEquals(messageJson2, notification.messages[1]);
assertEquals(messageJson3, notification.messages[2]);
} }
public static class TestMessage extends HookNotification.HookNotificationMessage { public static class TestMessage extends HookNotification.HookNotificationMessage {
...@@ -94,14 +95,14 @@ public class AbstractNotificationTest { ...@@ -94,14 +95,14 @@ public class AbstractNotificationTest {
public static class TestNotification extends AbstractNotification { public static class TestNotification extends AbstractNotification {
private NotificationType type; private NotificationType type;
private String[] messages; private List<String> messages;
public TestNotification(Configuration applicationProperties) throws AtlasException { public TestNotification(Configuration applicationProperties) throws AtlasException {
super(applicationProperties); super(applicationProperties);
} }
@Override @Override
protected void sendInternal(NotificationType notificationType, String[] notificationMessages) protected void sendInternal(NotificationType notificationType, List<String> notificationMessages)
throws NotificationException { throws NotificationException {
type = notificationType; type = notificationType;
......
...@@ -23,23 +23,23 @@ import org.testng.annotations.Test; ...@@ -23,23 +23,23 @@ import org.testng.annotations.Test;
import static org.testng.Assert.*; import static org.testng.Assert.*;
/** /**
* VersionedMessage tests. * AtlasNotificationMessage tests.
*/ */
public class VersionedMessageTest { public class AtlasNotificationMessageTest {
@Test @Test
public void testGetVersion() throws Exception { public void testGetVersion() throws Exception {
MessageVersion version = new MessageVersion("1.0.0"); MessageVersion version = new MessageVersion("1.0.0");
VersionedMessage<String> versionedMessage = new VersionedMessage<>(version, "a"); AtlasNotificationMessage<String> atlasNotificationMessage = new AtlasNotificationMessage<>(version, "a");
assertEquals(versionedMessage.getVersion(), version); assertEquals(atlasNotificationMessage.getVersion(), version);
} }
@Test @Test
public void testGetMessage() throws Exception { public void testGetMessage() throws Exception {
String message = "a"; String message = "a";
MessageVersion version = new MessageVersion("1.0.0"); MessageVersion version = new MessageVersion("1.0.0");
VersionedMessage<String> versionedMessage = new VersionedMessage<>(version, message); AtlasNotificationMessage<String> atlasNotificationMessage = new AtlasNotificationMessage<>(version, message);
assertEquals(versionedMessage.getMessage(), message); assertEquals(atlasNotificationMessage.getMessage(), message);
} }
@Test @Test
...@@ -48,10 +48,10 @@ public class VersionedMessageTest { ...@@ -48,10 +48,10 @@ public class VersionedMessageTest {
MessageVersion version2 = new MessageVersion("2.0.0"); MessageVersion version2 = new MessageVersion("2.0.0");
MessageVersion version3 = new MessageVersion("0.5.0"); MessageVersion version3 = new MessageVersion("0.5.0");
VersionedMessage<String> versionedMessage = new VersionedMessage<>(version1, "a"); AtlasNotificationMessage<String> atlasNotificationMessage = new AtlasNotificationMessage<>(version1, "a");
assertTrue(versionedMessage.compareVersion(version1) == 0); assertTrue(atlasNotificationMessage.compareVersion(version1) == 0);
assertTrue(versionedMessage.compareVersion(version2) < 0); assertTrue(atlasNotificationMessage.compareVersion(version2) < 0);
assertTrue(versionedMessage.compareVersion(version3) > 0); assertTrue(atlasNotificationMessage.compareVersion(version3) > 0);
} }
} }
...@@ -24,6 +24,7 @@ import org.apache.atlas.typesystem.Referenceable; ...@@ -24,6 +24,7 @@ import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.Struct; import org.apache.atlas.typesystem.Struct;
import org.testng.annotations.Test; import org.testng.annotations.Test;
import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
...@@ -48,9 +49,20 @@ public class EntityMessageDeserializerTest { ...@@ -48,9 +49,20 @@ public class EntityMessageDeserializerTest {
EntityNotificationImpl notification = EntityNotificationImpl notification =
new EntityNotificationImpl(entity, EntityNotification.OperationType.TRAIT_ADD, traitInfo); new EntityNotificationImpl(entity, EntityNotification.OperationType.TRAIT_ADD, traitInfo);
String json = AbstractNotification.getMessageJson(notification); List<String> jsonMsgList = new ArrayList<>();
AbstractNotification.createNotificationMessages(notification, jsonMsgList);
EntityNotification deserializedNotification = null;
for (String jsonMsg : jsonMsgList) {
deserializedNotification = deserializer.deserialize(jsonMsg);
if (deserializedNotification != null) {
break;
}
}
EntityNotification deserializedNotification = deserializer.deserialize(json);
assertEquals(deserializedNotification.getOperationType(), notification.getOperationType()); assertEquals(deserializedNotification.getOperationType(), notification.getOperationType());
assertEquals(deserializedNotification.getEntity().getId(), notification.getEntity().getId()); assertEquals(deserializedNotification.getEntity().getId(), notification.getEntity().getId());
assertEquals(deserializedNotification.getEntity().getTypeName(), notification.getEntity().getTypeName()); assertEquals(deserializedNotification.getEntity().getTypeName(), notification.getEntity().getTypeName());
......
...@@ -20,51 +20,151 @@ package org.apache.atlas.notification.hook; ...@@ -20,51 +20,151 @@ package org.apache.atlas.notification.hook;
import org.apache.atlas.notification.AbstractNotification; import org.apache.atlas.notification.AbstractNotification;
import org.apache.atlas.notification.entity.EntityNotificationImplTest; import org.apache.atlas.notification.entity.EntityNotificationImplTest;
import org.apache.atlas.typesystem.IStruct; import org.apache.atlas.notification.hook.HookNotification.EntityUpdateRequest;
import org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage;
import org.apache.atlas.typesystem.Referenceable; import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.Struct; import org.apache.atlas.typesystem.Struct;
import org.apache.commons.lang3.RandomStringUtils;
import org.testng.annotations.Test; import org.testng.annotations.Test;
import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.LinkedList;
import java.util.List; import java.util.List;
import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue; import static org.testng.Assert.assertTrue;
/** /**
* HookMessageDeserializer tests. * HookMessageDeserializer tests.
*/ */
public class HookMessageDeserializerTest { public class HookMessageDeserializerTest {
HookMessageDeserializer deserializer = new HookMessageDeserializer();
@Test @Test
public void testDeserialize() throws Exception { public void testDeserialize() throws Exception {
HookMessageDeserializer deserializer = new HookMessageDeserializer(); Referenceable entity = generateEntityWithTrait();
EntityUpdateRequest message = new EntityUpdateRequest("user1", entity);
List<String> jsonMsgList = new ArrayList<>();
AbstractNotification.createNotificationMessages(message, jsonMsgList);
HookNotificationMessage deserializedMessage = deserialize(jsonMsgList);
assertEqualMessage(deserializedMessage, message);
}
// validate deserialization of legacy message, which doesn't use MessageVersion
@Test
public void testDeserializeLegacyMessage() throws Exception {
Referenceable entity = generateEntityWithTrait();
EntityUpdateRequest message = new EntityUpdateRequest("user1", entity);
String jsonMsg = AbstractNotification.GSON.toJson(message);
HookNotificationMessage deserializedMessage = deserializer.deserialize(jsonMsg);
assertEqualMessage(deserializedMessage, message);
}
@Test
public void testDeserializeCompressedMessage() throws Exception {
Referenceable entity = generateLargeEntityWithTrait();
EntityUpdateRequest message = new EntityUpdateRequest("user1", entity);
List<String> jsonMsgList = new ArrayList<>();
AbstractNotification.createNotificationMessages(message, jsonMsgList);
assertTrue(jsonMsgList.size() == 1);
String compressedMsg = jsonMsgList.get(0);
String uncompressedMsg = AbstractNotification.GSON.toJson(message);
assertTrue(compressedMsg.length() < uncompressedMsg.length(), "Compressed message (" + compressedMsg.length() + ") should be shorter than uncompressed message (" + uncompressedMsg.length() + ")");
HookNotificationMessage deserializedMessage = deserialize(jsonMsgList);
assertEqualMessage(deserializedMessage, message);
}
Referenceable entity = EntityNotificationImplTest.getEntity("id"); @Test
String traitName = "MyTrait"; public void testDeserializeSplitMessage() throws Exception {
List<IStruct> traitInfo = new LinkedList<>(); Referenceable entity = generateVeryLargeEntityWithTrait();
IStruct trait = new Struct(traitName, Collections.<String, Object>emptyMap()); EntityUpdateRequest message = new EntityUpdateRequest("user1", entity);
traitInfo.add(trait);
List<String> jsonMsgList = new ArrayList<>();
AbstractNotification.createNotificationMessages(message, jsonMsgList);
HookNotification.EntityUpdateRequest message = assertTrue(jsonMsgList.size() > 1);
new HookNotification.EntityUpdateRequest("user1", entity);
String json = AbstractNotification.getMessageJson(message); HookNotificationMessage deserializedMessage = deserialize(jsonMsgList);
HookNotification.HookNotificationMessage deserializedMessage = deserializer.deserialize(json); assertEqualMessage(deserializedMessage, message);
}
private Referenceable generateEntityWithTrait() {
Referenceable ret = EntityNotificationImplTest.getEntity("id", new Struct("MyTrait", Collections.<String, Object>emptyMap()));
return ret;
}
private HookNotificationMessage deserialize(List<String> jsonMsgList) {
HookNotificationMessage deserializedMessage = null;
for (String jsonMsg : jsonMsgList) {
deserializedMessage = deserializer.deserialize(jsonMsg);
if (deserializedMessage != null) {
break;
}
}
return deserializedMessage;
}
private void assertEqualMessage(HookNotificationMessage deserializedMessage, EntityUpdateRequest message) throws Exception {
assertNotNull(deserializedMessage);
assertEquals(deserializedMessage.getType(), message.getType()); assertEquals(deserializedMessage.getType(), message.getType());
assertEquals(deserializedMessage.getUser(), message.getUser()); assertEquals(deserializedMessage.getUser(), message.getUser());
assertTrue(deserializedMessage instanceof HookNotification.EntityUpdateRequest); assertTrue(deserializedMessage instanceof EntityUpdateRequest);
HookNotification.EntityUpdateRequest deserializedEntityUpdateRequest = EntityUpdateRequest deserializedEntityUpdateRequest = (EntityUpdateRequest) deserializedMessage;
(HookNotification.EntityUpdateRequest) deserializedMessage; Referenceable deserializedEntity = deserializedEntityUpdateRequest.getEntities().get(0);
Referenceable entity = message.getEntities().get(0);
String traitName = entity.getTraits().get(0);
Referenceable deserializedEntity = deserializedEntityUpdateRequest.getEntities().get(0);
assertEquals(deserializedEntity.getId(), entity.getId()); assertEquals(deserializedEntity.getId(), entity.getId());
assertEquals(deserializedEntity.getTypeName(), entity.getTypeName()); assertEquals(deserializedEntity.getTypeName(), entity.getTypeName());
assertEquals(deserializedEntity.getTraits(), entity.getTraits()); assertEquals(deserializedEntity.getTraits(), entity.getTraits());
assertEquals(deserializedEntity.getTrait(traitName), entity.getTrait(traitName)); assertEquals(deserializedEntity.getTrait(traitName).hashCode(), entity.getTrait(traitName).hashCode());
}
private Referenceable generateLargeEntityWithTrait() {
Referenceable ret = EntityNotificationImplTest.getEntity("id", new Struct("MyTrait", Collections.<String, Object>emptyMap()));
// add 100 attributes, each with value of size 10k
// Json Size=1,027,984; GZipped Size=16,387 ==> will compress, but not split
String attrValue = RandomStringUtils.randomAlphanumeric(10 * 1024); // use the same value for all attributes - to aid better compression
for (int i = 0; i < 100; i++) {
ret.set("attr_" + i, attrValue);
}
return ret;
}
private Referenceable generateVeryLargeEntityWithTrait() {
Referenceable ret = EntityNotificationImplTest.getEntity("id", new Struct("MyTrait", Collections.<String, Object>emptyMap()));
// add 300 attributes, each with value of size 10k
// Json Size=3,082,384; GZipped Size=2,313,357 ==> will compress & split
for (int i = 0; i < 300; i++) {
ret.set("attr_" + i, RandomStringUtils.randomAlphanumeric(10 * 1024));
}
return ret;
} }
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment