Commit b2ae1371 by Shwetha GS

ATLAS-631 Introduce Versioning to Atlas Notification Payload (tbeerbower via shwethags)

parent 73640cc6
......@@ -21,6 +21,7 @@ import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.message.MessageAndMetadata;
import org.apache.atlas.notification.AbstractNotificationConsumer;
import org.apache.atlas.notification.MessageDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -41,13 +42,16 @@ public class KafkaConsumer<T> extends AbstractNotificationConsumer<T> {
/**
* Create a Kafka consumer.
*
* @param type the notification type returned by this consumer
* @param stream the underlying Kafka stream
* @param consumerId an id value for this consumer
* @param type the notification type returned by this consumer
* @param deserializer the message deserializer used for this consumer
* @param stream the underlying Kafka stream
* @param consumerId an id value for this consumer
*/
public KafkaConsumer(Class<T> type, KafkaStream<String, String> stream, int consumerId) {
super(type);
this.iterator = stream.iterator();
public KafkaConsumer(Class<T> type,
MessageDeserializer<T> deserializer, KafkaStream<String, String> stream, int consumerId) {
super(deserializer);
this.iterator = stream.iterator();
this.consumerId = consumerId;
}
......
......@@ -28,6 +28,7 @@ import kafka.utils.Time;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasException;
import org.apache.atlas.notification.AbstractNotification;
import org.apache.atlas.notification.MessageDeserializer;
import org.apache.atlas.notification.NotificationConsumer;
import org.apache.atlas.notification.NotificationException;
import org.apache.atlas.service.Service;
......@@ -172,7 +173,10 @@ public class KafkaNotification extends AbstractNotification implements Service {
List<NotificationConsumer<T>> consumers = new ArrayList<>(numConsumers);
int consumerId = 0;
for (KafkaStream stream : kafkaConsumers) {
consumers.add(createKafkaConsumer(notificationType.getClassType(), stream, consumerId++));
KafkaConsumer<T> kafkaConsumer =
createKafkaConsumer(notificationType.getClassType(), notificationType.getDeserializer(),
stream, consumerId++);
consumers.add(kafkaConsumer);
}
consumerConnectors.add(consumerConnector);
......@@ -180,6 +184,22 @@ public class KafkaNotification extends AbstractNotification implements Service {
}
@Override
public void close() {
if (producer != null) {
producer.close();
producer = null;
}
for (ConsumerConnector consumerConnector : consumerConnectors) {
consumerConnector.shutdown();
}
consumerConnectors.clear();
}
// ----- AbstractNotification --------------------------------------------
@Override
public void sendInternal(NotificationType type, String... messages) throws NotificationException {
if (producer == null) {
createProducer();
......@@ -197,27 +217,13 @@ public class KafkaNotification extends AbstractNotification implements Service {
try {
RecordMetadata response = future.get();
LOG.debug("Sent message for topic - {}, partition - {}, offset - {}", response.topic(),
response.partition(), response.offset());
response.partition(), response.offset());
} catch (Exception e) {
throw new NotificationException(e);
}
}
}
@Override
public void close() {
if (producer != null) {
producer.close();
producer = null;
}
for (ConsumerConnector consumerConnector : consumerConnectors) {
consumerConnector.shutdown();
}
consumerConnectors.clear();
}
// ----- helper methods --------------------------------------------------
/**
......@@ -234,14 +240,17 @@ public class KafkaNotification extends AbstractNotification implements Service {
/**
* Create a Kafka consumer from the given Kafka stream.
*
* @param stream the Kafka stream
* @param consumerId the id for the new consumer
* @param type the notification type to be returned by the consumer
* @param deserializer the deserializer for the created consumers
* @param stream the Kafka stream
* @param consumerId the id for the new consumer
*
* @return a new Kafka consumer
*/
protected <T> org.apache.atlas.kafka.KafkaConsumer<T> createKafkaConsumer(Class<T> type, KafkaStream stream,
int consumerId) {
return new org.apache.atlas.kafka.KafkaConsumer<T>(type, stream, consumerId);
protected <T> org.apache.atlas.kafka.KafkaConsumer<T> createKafkaConsumer(Class<T> type,
MessageDeserializer<T> deserializer, KafkaStream stream,
int consumerId) {
return new org.apache.atlas.kafka.KafkaConsumer<T>(type, deserializer, stream, consumerId);
}
// Get properties for consumer request
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.notification;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonDeserializationContext;
import com.google.gson.JsonDeserializer;
import com.google.gson.JsonElement;
import com.google.gson.JsonParseException;
import com.google.gson.reflect.TypeToken;
import org.apache.atlas.typesystem.IReferenceableInstance;
import org.apache.atlas.typesystem.IStruct;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.Struct;
import org.apache.atlas.typesystem.json.InstanceSerialization;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.slf4j.Logger;
import java.lang.reflect.Type;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* Base notification message deserializer.
*/
public abstract class AbstractMessageDeserializer<T> extends VersionedMessageDeserializer<T> {
private static final Map<Type, JsonDeserializer> DESERIALIZER_MAP = new HashMap<>();
static {
DESERIALIZER_MAP.put(ImmutableList.class, new ImmutableListDeserializer());
DESERIALIZER_MAP.put(ImmutableMap.class, new ImmutableMapDeserializer());
DESERIALIZER_MAP.put(JSONArray.class, new JSONArrayDeserializer());
DESERIALIZER_MAP.put(IStruct.class, new StructDeserializer());
DESERIALIZER_MAP.put(IReferenceableInstance.class, new ReferenceableDeserializer());
DESERIALIZER_MAP.put(Referenceable.class, new ReferenceableDeserializer());
}
// ----- Constructors ----------------------------------------------------
/**
* Create a deserializer.
*
* @param versionedMessageType the type of the versioned message
* @param expectedVersion the expected message version
* @param deserializerMap map of individual deserializers used to define this message deserializer
* @param notificationLogger logger for message version mismatch
*/
public AbstractMessageDeserializer(Type versionedMessageType,
MessageVersion expectedVersion,
Map<Type, JsonDeserializer> deserializerMap,
Logger notificationLogger) {
super(versionedMessageType, expectedVersion, getDeserializer(deserializerMap), notificationLogger);
}
// ----- helper methods --------------------------------------------------
private static Gson getDeserializer(Map<Type, JsonDeserializer> deserializerMap) {
GsonBuilder builder = new GsonBuilder();
for (Map.Entry<Type, JsonDeserializer> entry : DESERIALIZER_MAP.entrySet()) {
builder.registerTypeAdapter(entry.getKey(), entry.getValue());
}
for (Map.Entry<Type, JsonDeserializer> entry : deserializerMap.entrySet()) {
builder.registerTypeAdapter(entry.getKey(), entry.getValue());
}
return builder.create();
}
// ----- deserializer classes --------------------------------------------
/**
* Deserializer for ImmutableList.
*/
protected static class ImmutableListDeserializer implements JsonDeserializer<ImmutableList<?>> {
public static final Type LIST_TYPE = new TypeToken<List<?>>() {
}.getType();
@Override
public ImmutableList<?> deserialize(JsonElement json, Type type,
JsonDeserializationContext context) {
final List<?> list = context.deserialize(json, LIST_TYPE);
return ImmutableList.copyOf(list);
}
}
/**
* Deserializer for ImmutableMap.
*/
protected static class ImmutableMapDeserializer implements JsonDeserializer<ImmutableMap<?, ?>> {
public static final Type MAP_TYPE = new TypeToken<Map<?, ?>>() {
}.getType();
@Override
public ImmutableMap<?, ?> deserialize(JsonElement json, Type type,
JsonDeserializationContext context) {
final Map<?, ?> map = context.deserialize(json, MAP_TYPE);
return ImmutableMap.copyOf(map);
}
}
/**
* Deserializer for JSONArray.
*/
protected static final class JSONArrayDeserializer implements JsonDeserializer<JSONArray> {
@Override
public JSONArray deserialize(final JsonElement json, final Type type,
final JsonDeserializationContext context) {
try {
return new JSONArray(json.toString());
} catch (JSONException e) {
throw new JsonParseException(e.getMessage(), e);
}
}
}
/**
* Deserializer for Struct.
*/
protected static final class StructDeserializer implements JsonDeserializer<IStruct> {
@Override
public IStruct deserialize(final JsonElement json, final Type type,
final JsonDeserializationContext context) {
return context.deserialize(json, Struct.class);
}
}
/**
* Deserializer for Referenceable.
*/
protected static final class ReferenceableDeserializer implements JsonDeserializer<IReferenceableInstance> {
@Override
public IReferenceableInstance deserialize(final JsonElement json, final Type type,
final JsonDeserializationContext context) {
return InstanceSerialization.fromJsonReferenceable(json.toString(), true);
}
}
}
......@@ -17,10 +17,22 @@
*/
package org.apache.atlas.notification;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
import com.google.gson.JsonSerializationContext;
import com.google.gson.JsonSerializer;
import org.apache.atlas.AtlasException;
import org.apache.atlas.ha.HAConfiguration;
import org.apache.atlas.typesystem.IReferenceableInstance;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.json.InstanceSerialization;
import org.apache.commons.configuration.Configuration;
import org.codehaus.jettison.json.JSONArray;
import java.lang.reflect.Type;
import java.util.Arrays;
import java.util.List;
......@@ -29,12 +41,26 @@ import java.util.List;
*/
public abstract class AbstractNotification implements NotificationInterface {
/**
* The current expected version for notification messages.
*/
public static final MessageVersion CURRENT_MESSAGE_VERSION = new MessageVersion("1.0.0");
private static final String PROPERTY_EMBEDDED = PROPERTY_PREFIX + ".embedded";
private final boolean embedded;
private final boolean isHAEnabled;
/**
* Used for message serialization.
*/
public static final Gson GSON = new GsonBuilder().
registerTypeAdapter(IReferenceableInstance.class, new ReferenceableSerializer()).
registerTypeAdapter(Referenceable.class, new ReferenceableSerializer()).
registerTypeAdapter(JSONArray.class, new JSONArraySerializer()).
create();
// ----- Constructors ------------------------------------------------------
// ----- Constructors ----------------------------------------------------
public AbstractNotification(Configuration applicationProperties) throws AtlasException {
this.embedded = applicationProperties.getBoolean(PROPERTY_EMBEDDED, false);
......@@ -42,7 +68,23 @@ public abstract class AbstractNotification implements NotificationInterface {
}
// ----- AbstractNotificationInterface -------------------------------------
// ----- NotificationInterface -------------------------------------------
@Override
public <T> void send(NotificationType type, List<T> messages) throws NotificationException {
String[] strMessages = new String[messages.size()];
for (int index = 0; index < messages.size(); index++) {
strMessages[index] = getMessageJson(messages.get(index));
}
sendInternal(type, strMessages);
}
@Override
public <T> void send(NotificationType type, T... messages) throws NotificationException {
send(type, Arrays.asList(messages));
}
// ----- AbstractNotification --------------------------------------------
/**
* Determine whether or not the notification service embedded in Atlas server.
......@@ -53,23 +95,62 @@ public abstract class AbstractNotification implements NotificationInterface {
return embedded;
}
/**
* Determine whether or not the high availability feature is enabled.
*
* @return true if the high availability feature is enabled.
*/
protected final boolean isHAEnabled() {
return isHAEnabled;
}
@Override
public <T> void send(NotificationType type, List<T> messages) throws NotificationException {
String[] strMessages = new String[messages.size()];
for (int index = 0; index < messages.size(); index++) {
strMessages[index] = AbstractNotificationConsumer.GSON.toJson(messages.get(index));
}
sendInternal(type, strMessages);
/**
* Send the given messages.
*
* @param type the message type
* @param messages the array of messages to send
*
* @throws NotificationException if an error occurs while sending
*/
protected abstract void sendInternal(NotificationType type, String[] messages) throws NotificationException;
// ----- utility methods -------------------------------------------------
/**
* Get the notification message JSON from the given object.
*
* @param message the message in object form
*
* @return the message as a JSON string
*/
public static String getMessageJson(Object message) {
VersionedMessage<?> versionedMessage = new VersionedMessage<>(CURRENT_MESSAGE_VERSION, message);
return GSON.toJson(versionedMessage);
}
@Override
public <T> void send(NotificationType type, T... messages) throws NotificationException {
send(type, Arrays.asList(messages));
// ----- serializers -----------------------------------------------------
/**
* Serializer for Referenceable.
*/
public static final class ReferenceableSerializer implements JsonSerializer<IReferenceableInstance> {
@Override
public JsonElement serialize(IReferenceableInstance src, Type typeOfSrc, JsonSerializationContext context) {
String instanceJson = InstanceSerialization.toJson(src, true);
return new JsonParser().parse(instanceJson).getAsJsonObject();
}
}
protected abstract void sendInternal(NotificationType type, String[] messages) throws NotificationException;
/**
* Serializer for JSONArray.
*/
public static final class JSONArraySerializer implements JsonSerializer<JSONArray> {
@Override
public JsonElement serialize(JSONArray src, Type typeOfSrc, JsonSerializationContext context) {
return new JsonParser().parse(src.toString()).getAsJsonArray();
}
}
}
......@@ -17,50 +17,15 @@
*/
package org.apache.atlas.notification;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonDeserializationContext;
import com.google.gson.JsonDeserializer;
import com.google.gson.JsonElement;
import com.google.gson.JsonParseException;
import com.google.gson.JsonParser;
import com.google.gson.JsonSerializationContext;
import com.google.gson.JsonSerializer;
import com.google.gson.reflect.TypeToken;
import org.apache.atlas.notification.entity.EntityNotification;
import org.apache.atlas.notification.entity.EntityNotificationImpl;
import org.apache.atlas.notification.hook.HookNotification;
import org.apache.atlas.typesystem.IReferenceableInstance;
import org.apache.atlas.typesystem.IStruct;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.Struct;
import org.apache.atlas.typesystem.json.InstanceSerialization;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import java.lang.reflect.Type;
import java.util.List;
import java.util.Map;
/**
* Abstract notification consumer.
*/
public abstract class AbstractNotificationConsumer<T> implements NotificationConsumer<T> {
public static final Gson GSON = new GsonBuilder().
registerTypeAdapter(ImmutableList.class, new ImmutableListDeserializer()).
registerTypeAdapter(ImmutableMap.class, new ImmutableMapDeserializer()).
registerTypeAdapter(EntityNotification.class, new EntityNotificationDeserializer()).
registerTypeAdapter(IStruct.class, new StructDeserializer()).
registerTypeAdapter(IReferenceableInstance.class, new ReferenceableSerializerDeserializer()).
registerTypeAdapter(Referenceable.class, new ReferenceableSerializerDeserializer()).
registerTypeAdapter(JSONArray.class, new JSONArraySerializerDeserializer()).
registerTypeAdapter(HookNotification.HookNotificationMessage.class, new HookNotification()).
create();
private final Class<T> type;
/**
* Deserializer used to deserialize notification messages for this consumer.
*/
private final MessageDeserializer<T> deserializer;
// ----- Constructors ----------------------------------------------------
......@@ -68,10 +33,10 @@ public abstract class AbstractNotificationConsumer<T> implements NotificationCon
/**
* Construct an AbstractNotificationConsumer.
*
* @param type the notification type
* @param deserializer the message deserializer used by this consumer
*/
public AbstractNotificationConsumer(Class<T> type) {
this.type = type;
public AbstractNotificationConsumer(MessageDeserializer<T> deserializer) {
this.deserializer = deserializer;
}
......@@ -96,112 +61,11 @@ public abstract class AbstractNotificationConsumer<T> implements NotificationCon
@Override
public T next() {
return GSON.fromJson(getNext(), type);
return deserializer.deserialize(getNext());
}
@Override
public T peek() {
return GSON.fromJson(peekMessage(), type);
}
/**
* Deserializer for ImmutableList used by AbstractNotificationConsumer.GSON.
*/
public static class ImmutableListDeserializer implements JsonDeserializer<ImmutableList<?>> {
public static final Type LIST_TYPE = new TypeToken<List<?>>() {
}.getType();
@Override
public ImmutableList<?> deserialize(JsonElement json, Type type,
JsonDeserializationContext context) {
final List<?> list = context.deserialize(json, LIST_TYPE);
return ImmutableList.copyOf(list);
}
}
/**
* Deserializer for ImmutableMap used by AbstractNotificationConsumer.GSON.
*/
public static class ImmutableMapDeserializer implements JsonDeserializer<ImmutableMap<?, ?>> {
public static final Type MAP_TYPE = new TypeToken<Map<?, ?>>() {
}.getType();
@Override
public ImmutableMap<?, ?> deserialize(JsonElement json, Type type,
JsonDeserializationContext context) {
final Map<?, ?> map = context.deserialize(json, MAP_TYPE);
return ImmutableMap.copyOf(map);
}
}
/**
* Deserializer for EntityNotification used by AbstractNotificationConsumer.GSON.
*/
public static final class EntityNotificationDeserializer implements JsonDeserializer<EntityNotification> {
@Override
public EntityNotification deserialize(final JsonElement json, final Type type,
final JsonDeserializationContext context) {
return context.deserialize(json, EntityNotificationImpl.class);
}
}
/**
* Serde for Struct used by AbstractNotificationConsumer.GSON.
*/
public static final class StructDeserializer implements JsonDeserializer<IStruct>, JsonSerializer<IStruct> {
@Override
public IStruct deserialize(final JsonElement json, final Type type,
final JsonDeserializationContext context) {
return context.deserialize(json, Struct.class);
}
@Override
public JsonElement serialize(IStruct src, Type typeOfSrc, JsonSerializationContext context) {
String instanceJson = InstanceSerialization.toJson(src, true);
return new JsonParser().parse(instanceJson).getAsJsonObject();
}
}
/**
* Serde for Referenceable used by AbstractNotificationConsumer.GSON.
*/
public static final class ReferenceableSerializerDeserializer implements JsonDeserializer<IStruct>,
JsonSerializer<IReferenceableInstance> {
@Override
public IReferenceableInstance deserialize(final JsonElement json, final Type type,
final JsonDeserializationContext context) {
return InstanceSerialization.fromJsonReferenceable(json.toString(), true);
}
@Override
public JsonElement serialize(IReferenceableInstance src, Type typeOfSrc, JsonSerializationContext context) {
String instanceJson = InstanceSerialization.toJson(src, true);
return new JsonParser().parse(instanceJson).getAsJsonObject();
}
}
/**
* Serde for JSONArray used by AbstractNotificationConsumer.GSON.
*/
public static final class JSONArraySerializerDeserializer implements JsonDeserializer<JSONArray>,
JsonSerializer<JSONArray> {
@Override
public JSONArray deserialize(final JsonElement json, final Type type,
final JsonDeserializationContext context) {
try {
return new JSONArray(json.toString());
} catch (JSONException e) {
throw new JsonParseException(e.getMessage(), e);
}
}
@Override
public JsonElement serialize(JSONArray src, Type typeOfSrc, JsonSerializationContext context) {
return new JsonParser().parse(src.toString()).getAsJsonArray();
}
return deserializer.deserialize(peekMessage());
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.notification;
/**
* Exception thrown when notification message is consumed that has a version that is incompatable with
* the expected version.
*/
public class IncompatibleVersionException extends RuntimeException {
// ----- Constructors ----------------------------------------------------
public IncompatibleVersionException(String message) {
super(message);
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.notification;
/**
* Deserializer for JSON messages.
*/
public interface MessageDeserializer<T> {
/**
* Get a message of type T from the given JSON message string.
*
* @param json the JSON message
*
* @return the message deserialized from the given JSON
*/
T deserialize(String json);
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.notification;
import java.util.ArrayList;
import java.util.Arrays;
/**
* Represents the version of a notification message.
*/
public class MessageVersion implements Comparable<MessageVersion> {
/**
* Used for message with no version (old format).
*/
public static final MessageVersion NO_VERSION = new MessageVersion("0");
private final String version;
// ----- Constructors ----------------------------------------------------
/**
* Create a message version.
*
* @param version the version string
*/
public MessageVersion(String version) {
this.version = version;
try {
getVersionParts();
} catch (NumberFormatException e) {
throw new IllegalArgumentException(String.format("Invalid version string : %s.", version), e);
}
}
// ----- Comparable ------------------------------------------------------
@Override
public int compareTo(MessageVersion that) {
if (that == null) {
return 1;
}
Integer[] thisParts = getVersionParts();
Integer[] thatParts = that.getVersionParts();
int length = Math.max(thisParts.length, thatParts.length);
for (int i = 0; i < length; i++) {
int comp = getVersionPart(thisParts, i) - getVersionPart(thatParts, i);
if (comp != 0) {
return comp;
}
}
return 0;
}
// ----- Object overrides ------------------------------------------------
@Override
public boolean equals(Object that) {
if (this == that){
return true;
}
if (that == null || getClass() != that.getClass()) {
return false;
}
return compareTo((MessageVersion) that) == 0;
}
@Override
public int hashCode() {
return Arrays.hashCode(getVersionParts());
}
// ----- helper methods --------------------------------------------------
/**
* Get the version parts array by splitting the version string.
* Strip the trailing zeros (i.e. '1.0.0' equals '1').
*
* @return the version parts array
*/
protected Integer[] getVersionParts() {
String[] sParts = version.split("\\.");
ArrayList<Integer> iParts = new ArrayList<>();
int trailingZeros = 0;
for (String sPart : sParts) {
Integer iPart = new Integer(sPart);
if (iPart == 0) {
++trailingZeros;
} else {
for (int i = 0; i < trailingZeros; ++i) {
iParts.add(0);
}
trailingZeros = 0;
iParts.add(iPart);
}
}
return iParts.toArray(new Integer[iParts.size()]);
}
private Integer getVersionPart(Integer[] versionParts, int i) {
return i < versionParts.length ? versionParts[i] : 0;
}
}
......@@ -17,9 +17,13 @@
*/
package org.apache.atlas.notification;
import org.apache.atlas.notification.entity.EntityMessageDeserializer;
import org.apache.atlas.notification.entity.EntityNotification;
import org.apache.atlas.notification.hook.HookMessageDeserializer;
import org.apache.atlas.notification.hook.HookNotification;
import com.google.gson.reflect.TypeToken;
import java.lang.reflect.Type;
import java.util.List;
/**
......@@ -37,25 +41,59 @@ public interface NotificationInterface {
String PROPERTY_PREFIX = "atlas.notification";
/**
* Notification message class types.
*/
Class<HookNotification.HookNotificationMessage> HOOK_NOTIFICATION_CLASS =
HookNotification.HookNotificationMessage.class;
Class<EntityNotification> ENTITY_NOTIFICATION_CLASS = EntityNotification.class;
/**
* Versioned notification message class types.
*/
Type HOOK_VERSIONED_MESSAGE_TYPE =
new TypeToken<VersionedMessage<HookNotification.HookNotificationMessage>>(){}.getType();
Type ENTITY_VERSIONED_MESSAGE_TYPE = new TypeToken<VersionedMessage<EntityNotification>>(){}.getType();
/**
* Atlas notification types.
*/
enum NotificationType {
HOOK(HookNotification.HookNotificationMessage.class), // notifications from the Atlas integration hook producers
ENTITIES(EntityNotification.class); // notifications to entity change consumers
// Notifications from the Atlas integration hooks.
HOOK(HOOK_NOTIFICATION_CLASS, new HookMessageDeserializer()),
// Notifications to entity change consumers.
ENTITIES(ENTITY_NOTIFICATION_CLASS, new EntityMessageDeserializer());
/**
* The notification class associated with this type.
*/
private final Class classType;
NotificationType(Class classType) {
/**
* The message deserializer for this type.
*/
private final MessageDeserializer deserializer;
NotificationType(Class classType, MessageDeserializer<?> deserializer) {
this.classType = classType;
this.deserializer = deserializer;
}
// ----- accessors ---------------------------------------------------
public Class getClassType() {
return classType;
}
public MessageDeserializer getDeserializer() {
return deserializer;
}
}
/**
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.notification;
/**
* Represents a notification message that is associated with a version.
*/
public class VersionedMessage<T> {
/**
* The version of the message.
*/
private final MessageVersion version;
/**
* The actual message.
*/
private final T message;
// ----- Constructors ----------------------------------------------------
/**
* Create a versioned message.
*
* @param version the message version
* @param message the actual message
*/
public VersionedMessage(MessageVersion version, T message) {
this.version = version;
this.message = message;
}
// ----- VersionedMessage ------------------------------------------------
/**
* Compare the version of this message with the given version.
*
* @param compareToVersion the version to compare to
*
* @return a negative integer, zero, or a positive integer as this message's version is less than, equal to,
* or greater than the given version.
*/
public int compareVersion(MessageVersion compareToVersion) {
return version.compareTo(compareToVersion);
}
// ----- accessors -------------------------------------------------------
public MessageVersion getVersion() {
return version;
}
public T getMessage() {
return message;
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.notification;
import com.google.gson.Gson;
import org.slf4j.Logger;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
/**
* Deserializer that works with versioned messages. The version of each deserialized message is checked against an
* expected version.
*/
public abstract class VersionedMessageDeserializer<T> implements MessageDeserializer<T> {
public static final String VERSION_MISMATCH_MSG =
"Notification message version mismatch. Expected %s but recieved %s";
private final Type versionedMessageType;
private final MessageVersion expectedVersion;
private final Logger notificationLogger;
private final Gson gson;
// ----- Constructors ----------------------------------------------------
/**
* Create a versioned message deserializer.
*
* @param versionedMessageType the type of the versioned message
* @param expectedVersion the expected message version
* @param gson JSON serialization/deserialization
* @param notificationLogger logger for message version mismatch
*/
public VersionedMessageDeserializer(Type versionedMessageType, MessageVersion expectedVersion,
Gson gson, Logger notificationLogger) {
this.versionedMessageType = versionedMessageType;
this.expectedVersion = expectedVersion;
this.gson = gson;
this.notificationLogger = notificationLogger;
}
// ----- MessageDeserializer ---------------------------------------------
@Override
public T deserialize(String messageJson) {
VersionedMessage<T> versionedMessage = gson.fromJson(messageJson, versionedMessageType);
// older style messages not wrapped with VersionedMessage
if (versionedMessage.getVersion() == null) {
Type t = ((ParameterizedType) versionedMessageType).getActualTypeArguments()[0];
versionedMessage = new VersionedMessage<>(MessageVersion.NO_VERSION, gson.<T>fromJson(messageJson, t));
}
checkVersion(versionedMessage, messageJson);
return versionedMessage.getMessage();
}
// ----- helper methods --------------------------------------------------
/**
* Check the message version against the expected version.
*
* @param versionedMessage the versioned message
* @param messageJson the notification message json
*
* @throws IncompatibleVersionException if the message version is incompatable with the expected version
*/
protected void checkVersion(VersionedMessage<T> versionedMessage, String messageJson) {
int comp = versionedMessage.compareVersion(expectedVersion);
// message has newer version
if (comp > 0) {
String msg = String.format(VERSION_MISMATCH_MSG, expectedVersion, versionedMessage.getVersion());
notificationLogger.error(msg);
notificationLogger.info(messageJson);
throw new IncompatibleVersionException(msg);
}
// message has older version
if (comp < 0) {
notificationLogger.info(
String.format(VERSION_MISMATCH_MSG, expectedVersion, versionedMessage.getVersion()));
notificationLogger.info(messageJson);
}
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.notification.entity;
import com.google.gson.JsonDeserializationContext;
import com.google.gson.JsonDeserializer;
import com.google.gson.JsonElement;
import org.apache.atlas.notification.AbstractMessageDeserializer;
import org.apache.atlas.notification.AbstractNotification;
import org.apache.atlas.notification.NotificationInterface;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.Type;
import java.util.Collections;
import java.util.Map;
/**
* Entity notification message deserializer.
*/
public class EntityMessageDeserializer extends AbstractMessageDeserializer<EntityNotification> {
/**
* Logger for entity notification messages.
*/
private static final Logger NOTIFICATION_LOGGER = LoggerFactory.getLogger(EntityMessageDeserializer.class);
// ----- Constructors ----------------------------------------------------
/**
* Create an entity notification message deserializer.
*/
public EntityMessageDeserializer() {
super(NotificationInterface.ENTITY_VERSIONED_MESSAGE_TYPE,
AbstractNotification.CURRENT_MESSAGE_VERSION, getDeserializerMap(), NOTIFICATION_LOGGER);
}
// ----- helper methods --------------------------------------------------
private static Map<Type, JsonDeserializer> getDeserializerMap() {
return Collections.<Type, JsonDeserializer>singletonMap(
NotificationInterface.ENTITY_NOTIFICATION_CLASS, new EntityNotificationDeserializer());
}
// ----- deserializer classes --------------------------------------------
/**
* Deserializer for EntityNotification.
*/
protected static final class EntityNotificationDeserializer implements JsonDeserializer<EntityNotification> {
@Override
public EntityNotification deserialize(final JsonElement json, final Type type,
final JsonDeserializationContext context) {
return context.deserialize(json, EntityNotificationImpl.class);
}
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.notification.hook;
import com.google.gson.JsonDeserializer;
import org.apache.atlas.notification.AbstractMessageDeserializer;
import org.apache.atlas.notification.AbstractNotification;
import org.apache.atlas.notification.NotificationInterface;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.Type;
import java.util.Collections;
import java.util.Map;
/**
* Hook notification message deserializer.
*/
public class HookMessageDeserializer extends AbstractMessageDeserializer<HookNotification.HookNotificationMessage> {
/**
* Logger for hook notification messages.
*/
private static final Logger NOTIFICATION_LOGGER = LoggerFactory.getLogger(HookMessageDeserializer.class);
// ----- Constructors ----------------------------------------------------
/**
* Create a hook notification message deserializer.
*/
public HookMessageDeserializer() {
super(NotificationInterface.HOOK_VERSIONED_MESSAGE_TYPE,
AbstractNotification.CURRENT_MESSAGE_VERSION, getDeserializerMap(), NOTIFICATION_LOGGER);
}
// ----- helper methods --------------------------------------------------
private static Map<Type, JsonDeserializer> getDeserializerMap() {
return Collections.<Type, JsonDeserializer>singletonMap(
NotificationInterface.HOOK_NOTIFICATION_CLASS, new HookNotification());
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.kafka;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.message.MessageAndMetadata;
import org.apache.atlas.notification.AbstractNotification;
import org.apache.atlas.notification.MessageVersion;
import org.apache.atlas.notification.NotificationConsumer;
import org.apache.atlas.notification.NotificationInterface;
import org.apache.atlas.notification.IncompatibleVersionException;
import org.apache.atlas.notification.VersionedMessage;
import org.apache.atlas.notification.entity.EntityNotificationImplTest;
import org.apache.atlas.notification.hook.HookNotification;
import org.apache.atlas.typesystem.IStruct;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.Struct;
import org.codehaus.jettison.json.JSONException;
import org.testng.annotations.Test;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.NoSuchElementException;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.*;
/**
* KafkaConsumer tests.
*/
public class KafkaConsumerTest {
private static final String TRAIT_NAME = "MyTrait";
@Test
public void testNext() throws Exception {
KafkaStream<String, String> stream = mock(KafkaStream.class);
ConsumerIterator<String, String> iterator = mock(ConsumerIterator.class);
MessageAndMetadata<String, String> messageAndMetadata = mock(MessageAndMetadata.class);
Referenceable entity = getEntity(TRAIT_NAME);
HookNotification.EntityUpdateRequest message =
new HookNotification.EntityUpdateRequest("user1", entity);
String json = AbstractNotification.GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), message));
when(stream.iterator()).thenReturn(iterator);
when(iterator.hasNext()).thenReturn(true).thenReturn(false);
when(iterator.next()).thenReturn(messageAndMetadata).thenThrow(new NoSuchElementException());
when(messageAndMetadata.message()).thenReturn(json);
NotificationConsumer<HookNotification.HookNotificationMessage> consumer =
new KafkaConsumer<>(NotificationInterface.NotificationType.HOOK.getClassType(),
NotificationInterface.NotificationType.HOOK.getDeserializer(), stream, 99);
assertTrue(consumer.hasNext());
HookNotification.HookNotificationMessage consumedMessage = consumer.next();
assertMessagesEqual(message, consumedMessage, entity);
assertFalse(consumer.hasNext());
}
@Test
public void testNextVersionMismatch() throws Exception {
KafkaStream<String, String> stream = mock(KafkaStream.class);
ConsumerIterator<String, String> iterator = mock(ConsumerIterator.class);
MessageAndMetadata<String, String> messageAndMetadata = mock(MessageAndMetadata.class);
Referenceable entity = getEntity(TRAIT_NAME);
HookNotification.EntityUpdateRequest message =
new HookNotification.EntityUpdateRequest("user1", entity);
String json = AbstractNotification.GSON.toJson(new VersionedMessage<>(new MessageVersion("2.0.0"), message));
when(stream.iterator()).thenReturn(iterator);
when(iterator.hasNext()).thenReturn(true).thenReturn(false);
when(iterator.next()).thenReturn(messageAndMetadata).thenThrow(new NoSuchElementException());
when(messageAndMetadata.message()).thenReturn(json);
NotificationConsumer<HookNotification.HookNotificationMessage> consumer =
new KafkaConsumer<>(NotificationInterface.NotificationType.HOOK.getClassType(),
NotificationInterface.NotificationType.HOOK.getDeserializer(), stream, 99);
assertTrue(consumer.hasNext());
try {
consumer.next();
fail("Expected VersionMismatchException!");
} catch (IncompatibleVersionException e) {
e.printStackTrace();
}
assertFalse(consumer.hasNext());
}
@Test
public void testPeekMessage() throws Exception {
KafkaStream<String, String> stream = mock(KafkaStream.class);
ConsumerIterator<String, String> iterator = mock(ConsumerIterator.class);
MessageAndMetadata<String, String> messageAndMetadata = mock(MessageAndMetadata.class);
Referenceable entity = getEntity(TRAIT_NAME);
HookNotification.EntityUpdateRequest message =
new HookNotification.EntityUpdateRequest("user1", entity);
String json = AbstractNotification.GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), message));
when(stream.iterator()).thenReturn(iterator);
when(iterator.hasNext()).thenReturn(true);
when(iterator.peek()).thenReturn(messageAndMetadata);
when(messageAndMetadata.message()).thenReturn(json);
NotificationConsumer<HookNotification.HookNotificationMessage> consumer =
new KafkaConsumer<>(NotificationInterface.NotificationType.HOOK.getClassType(),
NotificationInterface.NotificationType.HOOK.getDeserializer(), stream, 99);
assertTrue(consumer.hasNext());
HookNotification.HookNotificationMessage consumedMessage = consumer.peek();
assertMessagesEqual(message, consumedMessage, entity);
assertTrue(consumer.hasNext());
}
private Referenceable getEntity(String traitName) {
Referenceable entity = EntityNotificationImplTest.getEntity("id");
List<IStruct> traitInfo = new LinkedList<>();
IStruct trait = new Struct(traitName, Collections.<String, Object>emptyMap());
traitInfo.add(trait);
return entity;
}
private void assertMessagesEqual(HookNotification.EntityUpdateRequest message,
HookNotification.HookNotificationMessage consumedMessage,
Referenceable entity) throws JSONException {
assertEquals(consumedMessage.getType(), message.getType());
assertEquals(consumedMessage.getUser(), message.getUser());
assertTrue(consumedMessage instanceof HookNotification.EntityUpdateRequest);
HookNotification.EntityUpdateRequest deserializedEntityUpdateRequest =
(HookNotification.EntityUpdateRequest) consumedMessage;
Referenceable deserializedEntity = deserializedEntityUpdateRequest.getEntities().get(0);
assertEquals(deserializedEntity.getId(), entity.getId());
assertEquals(deserializedEntity.getTypeName(), entity.getTypeName());
assertEquals(deserializedEntity.getTraits(), entity.getTraits());
assertEquals(deserializedEntity.getTrait(TRAIT_NAME), entity.getTrait(TRAIT_NAME));
}
}
......@@ -22,6 +22,7 @@ import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import org.apache.atlas.AtlasException;
import org.apache.atlas.notification.MessageDeserializer;
import org.apache.atlas.notification.NotificationConsumer;
import org.apache.atlas.notification.NotificationInterface;
import org.apache.atlas.notification.NotificationModule;
......@@ -130,10 +131,12 @@ public class KafkaNotificationTest {
}
@Override
protected <T> org.apache.atlas.kafka.KafkaConsumer<T> createKafkaConsumer(Class<T> type, KafkaStream stream,
protected <T> org.apache.atlas.kafka.KafkaConsumer<T> createKafkaConsumer(Class<T> type,
MessageDeserializer<T> deserializer,
KafkaStream stream,
int consumerId) {
kafkaStreams.add(stream);
return super.createKafkaConsumer(type, stream, consumerId);
return super.createKafkaConsumer(type, deserializer, stream, consumerId);
}
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.notification;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import org.slf4j.Logger;
import org.testng.annotations.Test;
import java.lang.reflect.Type;
import java.util.LinkedList;
import java.util.List;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.testng.Assert.*;
/**
* AbstractNotificationConsumer tests.
*/
public class AbstractNotificationConsumerTest {
private static final Gson GSON = new Gson();
@Test
public void testNext() throws Exception {
Logger logger = mock(Logger.class);
TestMessage testMessage1 = new TestMessage("sValue1", 99);
TestMessage testMessage2 = new TestMessage("sValue2", 98);
TestMessage testMessage3 = new TestMessage("sValue3", 97);
TestMessage testMessage4 = new TestMessage("sValue4", 96);
List<String> jsonList = new LinkedList<>();
jsonList.add(GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage1)));
jsonList.add(GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage2)));
jsonList.add(GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage3)));
jsonList.add(GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage4)));
Type versionedMessageType = new TypeToken<VersionedMessage<TestMessage>>(){}.getType();
NotificationConsumer<TestMessage> consumer =
new TestNotificationConsumer<>(versionedMessageType, jsonList, logger);
assertTrue(consumer.hasNext());
assertEquals(testMessage1, consumer.next());
assertTrue(consumer.hasNext());
assertEquals(testMessage2, consumer.next());
assertTrue(consumer.hasNext());
assertEquals(testMessage3, consumer.next());
assertTrue(consumer.hasNext());
assertEquals(testMessage4, consumer.next());
assertFalse(consumer.hasNext());
}
@Test
public void testNextBackVersion() throws Exception {
Logger logger = mock(Logger.class);
TestMessage testMessage1 = new TestMessage("sValue1", 99);
TestMessage testMessage2 = new TestMessage("sValue2", 98);
TestMessage testMessage3 = new TestMessage("sValue3", 97);
TestMessage testMessage4 = new TestMessage("sValue4", 96);
List<String> jsonList = new LinkedList<>();
String json1 = GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage1));
String json2 = GSON.toJson(new VersionedMessage<>(new MessageVersion("0.0.5"), testMessage2));
String json3 = GSON.toJson(new VersionedMessage<>(new MessageVersion("0.5.0"), testMessage3));
String json4 = GSON.toJson(testMessage4);
jsonList.add(json1);
jsonList.add(json2);
jsonList.add(json3);
jsonList.add(json4);
Type versionedMessageType = new TypeToken<VersionedMessage<TestMessage>>(){}.getType();
NotificationConsumer<TestMessage> consumer =
new TestNotificationConsumer<>(versionedMessageType, jsonList, logger);
assertTrue(consumer.hasNext());
assertEquals(new TestMessage("sValue1", 99), consumer.next());
assertTrue(consumer.hasNext());
assertEquals(new TestMessage("sValue2", 98), consumer.next());
verify(logger).info(json2);
assertTrue(consumer.hasNext());
assertEquals(new TestMessage("sValue3", 97), consumer.next());
verify(logger).info(json3);
assertTrue(consumer.hasNext());
assertEquals(new TestMessage("sValue4", 96), consumer.next());
verify(logger).info(json4);
assertFalse(consumer.hasNext());
}
@Test
public void testNextForwardVersion() throws Exception {
Logger logger = mock(Logger.class);
TestMessage testMessage1 = new TestMessage("sValue1", 99);
TestMessage testMessage2 = new TestMessage("sValue2", 98);
List<String> jsonList = new LinkedList<>();
String json1 = GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage1));
String json2 = GSON.toJson(new VersionedMessage<>(new MessageVersion("2.0.0"), testMessage2));
jsonList.add(json1);
jsonList.add(json2);
Type versionedMessageType = new TypeToken<VersionedMessage<TestMessage>>(){}.getType();
NotificationConsumer<TestMessage> consumer =
new TestNotificationConsumer<>(versionedMessageType, jsonList, logger);
assertTrue(consumer.hasNext());
assertEquals(testMessage1, consumer.next());
assertTrue(consumer.hasNext());
try {
consumer.next();
fail("Expected VersionMismatchException!");
} catch (IncompatibleVersionException e) {
verify(logger).info(json2);
}
assertFalse(consumer.hasNext());
}
@Test
public void testPeek() throws Exception {
Logger logger = mock(Logger.class);
TestMessage testMessage1 = new TestMessage("sValue1", 99);
TestMessage testMessage2 = new TestMessage("sValue2", 98);
TestMessage testMessage3 = new TestMessage("sValue3", 97);
TestMessage testMessage4 = new TestMessage("sValue4", 96);
List<String> jsonList = new LinkedList<>();
jsonList.add(GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage1)));
jsonList.add(GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage2)));
jsonList.add(GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage3)));
jsonList.add(GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage4)));
Type versionedMessageType = new TypeToken<VersionedMessage<TestMessage>>(){}.getType();
NotificationConsumer<TestMessage> consumer =
new TestNotificationConsumer<>(versionedMessageType, jsonList, logger);
assertTrue(consumer.hasNext());
assertEquals(testMessage1, consumer.peek());
assertTrue(consumer.hasNext());
assertEquals(testMessage1, consumer.peek());
assertTrue(consumer.hasNext());
}
private static class TestMessage {
private String s;
private int i;
public TestMessage(String s, int i) {
this.s = s;
this.i = i;
}
public String getS() {
return s;
}
public void setS(String s) {
this.s = s;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
TestMessage that = (TestMessage) o;
return i == that.i && (s != null ? s.equals(that.s) : that.s == null);
}
@Override
public int hashCode() {
int result = s != null ? s.hashCode() : 0;
result = 31 * result + i;
return result;
}
}
private static class TestNotificationConsumer<T> extends AbstractNotificationConsumer<T> {
private final List<String> messageList;
private int index = 0;
public TestNotificationConsumer(Type versionedMessageType, List<String> messages, Logger logger) {
super(new TestDeserializer<T>(versionedMessageType, logger));
this.messageList = messages;
}
@Override
protected String getNext() {
return messageList.get(index++);
}
@Override
protected String peekMessage() {
return messageList.get(index);
}
@Override
public boolean hasNext() {
return index < messageList.size();
}
}
private static final class TestDeserializer<T> extends VersionedMessageDeserializer<T> {
private TestDeserializer(Type versionedMessageType, Logger logger) {
super(versionedMessageType, AbstractNotification.CURRENT_MESSAGE_VERSION, GSON, logger);
}
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.notification;
import org.apache.atlas.AtlasException;
import org.apache.atlas.notification.hook.HookNotification;
import org.apache.commons.configuration.Configuration;
import org.testng.annotations.Test;
import java.util.LinkedList;
import java.util.List;
import static org.mockito.Mockito.mock;
import static org.testng.Assert.*;
/**
* AbstractNotification tests.
*/
public class AbstractNotificationTest {
@Test
public void testSend() throws Exception {
Configuration configuration = mock(Configuration.class);
TestNotification notification = new TestNotification(configuration);
TestMessage message1 = new TestMessage(HookNotification.HookNotificationType.ENTITY_CREATE, "user1");
TestMessage message2 = new TestMessage(HookNotification.HookNotificationType.TYPE_CREATE, "user1");
TestMessage message3 = new TestMessage(HookNotification.HookNotificationType.ENTITY_FULL_UPDATE, "user1");
String messageJson1 = AbstractNotification.getMessageJson(message1);
String messageJson2 = AbstractNotification.getMessageJson(message2);
String messageJson3 = AbstractNotification.getMessageJson(message3);
notification.send(NotificationInterface.NotificationType.HOOK, message1, message2, message3);
assertEquals(NotificationInterface.NotificationType.HOOK, notification.type);
assertEquals(3, notification.messages.length);
assertEquals(messageJson1, notification.messages[0]);
assertEquals(messageJson2, notification.messages[1]);
assertEquals(messageJson3, notification.messages[2]);
}
@Test
public void testSend2() throws Exception {
Configuration configuration = mock(Configuration.class);
TestNotification notification = new TestNotification(configuration);
TestMessage message1 = new TestMessage(HookNotification.HookNotificationType.ENTITY_CREATE, "user1");
TestMessage message2 = new TestMessage(HookNotification.HookNotificationType.TYPE_CREATE, "user1");
TestMessage message3 = new TestMessage(HookNotification.HookNotificationType.ENTITY_FULL_UPDATE, "user1");
List<TestMessage> messages = new LinkedList<>();
messages.add(message1);
messages.add(message2);
messages.add(message3);
String messageJson1 = AbstractNotification.getMessageJson(message1);
String messageJson2 = AbstractNotification.getMessageJson(message2);
String messageJson3 = AbstractNotification.getMessageJson(message3);
notification.send(NotificationInterface.NotificationType.HOOK, messages);
assertEquals(NotificationInterface.NotificationType.HOOK, notification.type);
assertEquals(3, notification.messages.length);
assertEquals(messageJson1, notification.messages[0]);
assertEquals(messageJson2, notification.messages[1]);
assertEquals(messageJson3, notification.messages[2]);
}
public static class TestMessage extends HookNotification.HookNotificationMessage {
public TestMessage(HookNotification.HookNotificationType type, String user) {
super(type, user);
}
}
public static class TestNotification extends AbstractNotification {
private NotificationType type;
private String[] messages;
public TestNotification(Configuration applicationProperties) throws AtlasException {
super(applicationProperties);
}
@Override
protected void sendInternal(NotificationType notificationType, String[] notificationMessages)
throws NotificationException {
type = notificationType;
messages = notificationMessages;
}
@Override
public <T> List<NotificationConsumer<T>> createConsumers(NotificationType notificationType, int numConsumers) {
return null;
}
@Override
public void close() {
}
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.notification;
import org.testng.annotations.Test;
import java.util.Arrays;
import static org.testng.Assert.*;
/**
* MessageVersion tests.
*/
public class MessageVersionTest {
@Test
public void testConstructor() throws Exception {
new MessageVersion("1.0.0");
try {
new MessageVersion("foo");
fail("Expected IllegalArgumentException");
} catch (IllegalArgumentException e) {
// expected
}
try {
new MessageVersion("A.0.0");
fail("Expected IllegalArgumentException");
} catch (IllegalArgumentException e) {
// expected
}
try {
new MessageVersion("1.0.0a");
fail("Expected IllegalArgumentException");
} catch (IllegalArgumentException e) {
// expected
}
}
@Test
public void testCompareTo() throws Exception {
MessageVersion version1 = new MessageVersion("1.0.0");
MessageVersion version2 = new MessageVersion("1.0.0");
MessageVersion version3 = new MessageVersion("2.0.0");
MessageVersion version4 = new MessageVersion("1");
MessageVersion version5 = new MessageVersion("1.5");
MessageVersion version6 = new MessageVersion("1.0.5");
assertTrue(version1.compareTo(version2) == 0);
assertTrue(version2.compareTo(version1) == 0);
assertTrue(version1.compareTo(version3) < 0);
assertTrue(version3.compareTo(version1) > 0);
assertTrue(version1.compareTo(version4) == 0);
assertTrue(version4.compareTo(version1) == 0);
assertTrue(version1.compareTo(version5) < 0);
assertTrue(version5.compareTo(version1) > 0);
assertTrue(version1.compareTo(version6) < 0);
assertTrue(version6.compareTo(version1) > 0);
}
@Test
public void testEquals() throws Exception {
MessageVersion version1 = new MessageVersion("1.0.0");
MessageVersion version2 = new MessageVersion("1.0.0");
MessageVersion version3 = new MessageVersion("2.0.0");
MessageVersion version4 = new MessageVersion("1");
MessageVersion version5 = new MessageVersion("1.5");
MessageVersion version6 = new MessageVersion("1.0.5");
assertTrue(version1.equals(version2));
assertTrue(version2.equals(version1));
assertFalse(version1.equals(version3));
assertFalse(version3.equals(version1));
assertTrue(version1.equals(version4));
assertTrue(version4.equals(version1));
assertFalse(version1.equals(version5));
assertFalse(version5.equals(version1));
assertFalse(version1.equals(version6));
assertFalse(version6.equals(version1));
}
@Test
public void testHashCode() throws Exception {
MessageVersion version1 = new MessageVersion("1.0.0");
MessageVersion version2 = new MessageVersion("1.0.0");
MessageVersion version3 = new MessageVersion("1");
assertEquals(version1.hashCode(), version2.hashCode());
assertEquals(version1.hashCode(), version3.hashCode());
}
@Test
public void testGetVersionParts() throws Exception {
MessageVersion version = new MessageVersion("1.0.0");
assertTrue(Arrays.equals(new Integer[]{1}, version.getVersionParts()));
version = new MessageVersion("1.0");
assertTrue(Arrays.equals(new Integer[]{1}, version.getVersionParts()));
version = new MessageVersion("1");
assertTrue(Arrays.equals(new Integer[]{1}, version.getVersionParts()));
version = new MessageVersion("1.0.2");
assertTrue(Arrays.equals(new Integer[]{1, 0, 2}, version.getVersionParts()));
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.notification;
import org.testng.annotations.Test;
import static org.testng.Assert.*;
/**
* VersionedMessage tests.
*/
public class VersionedMessageTest {
@Test
public void testGetVersion() throws Exception {
MessageVersion version = new MessageVersion("1.0.0");
VersionedMessage<String> versionedMessage = new VersionedMessage<>(version, "a");
assertEquals(versionedMessage.getVersion(), version);
}
@Test
public void testGetMessage() throws Exception {
String message = "a";
MessageVersion version = new MessageVersion("1.0.0");
VersionedMessage<String> versionedMessage = new VersionedMessage<>(version, message);
assertEquals(versionedMessage.getMessage(), message);
}
@Test
public void testCompareVersion() throws Exception {
MessageVersion version1 = new MessageVersion("1.0.0");
MessageVersion version2 = new MessageVersion("2.0.0");
MessageVersion version3 = new MessageVersion("0.5.0");
VersionedMessage<String> versionedMessage = new VersionedMessage<>(version1, "a");
assertTrue(versionedMessage.compareVersion(version1) == 0);
assertTrue(versionedMessage.compareVersion(version2) < 0);
assertTrue(versionedMessage.compareVersion(version3) > 0);
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.notification.entity;
import org.apache.atlas.notification.AbstractNotification;
import org.apache.atlas.typesystem.IStruct;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.Struct;
import org.testng.annotations.Test;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import static org.testng.Assert.assertEquals;
/**
* EntityMessageDeserializer tests.
*/
public class EntityMessageDeserializerTest {
@Test
public void testDeserialize() throws Exception {
EntityMessageDeserializer deserializer = new EntityMessageDeserializer();
Referenceable entity = EntityNotificationImplTest.getEntity("id");
String traitName = "MyTrait";
List<IStruct> traitInfo = new LinkedList<>();
IStruct trait = new Struct(traitName, Collections.<String, Object>emptyMap());
traitInfo.add(trait);
EntityNotificationImpl notification =
new EntityNotificationImpl(entity, EntityNotification.OperationType.TRAIT_ADD, traitInfo);
String json = AbstractNotification.getMessageJson(notification);
EntityNotification deserializedNotification = deserializer.deserialize(json);
assertEquals(deserializedNotification.getOperationType(), notification.getOperationType());
assertEquals(deserializedNotification.getEntity().getId(), notification.getEntity().getId());
assertEquals(deserializedNotification.getEntity().getTypeName(), notification.getEntity().getTypeName());
assertEquals(deserializedNotification.getEntity().getTraits(), notification.getEntity().getTraits());
assertEquals(deserializedNotification.getEntity().getTrait(traitName),
notification.getEntity().getTrait(traitName));
}
}
......@@ -131,7 +131,7 @@ public class EntityNotificationImplTest {
assertTrue(entityNotification2.equals(entityNotification));
}
private Referenceable getEntity(String id, IStruct... traits) {
public static Referenceable getEntity(String id, IStruct... traits) {
String typeName = "typeName";
Map<String, Object> values = new HashMap<>();
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.notification.hook;
import org.apache.atlas.notification.AbstractNotification;
import org.apache.atlas.notification.entity.EntityNotificationImplTest;
import org.apache.atlas.typesystem.IStruct;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.Struct;
import org.testng.annotations.Test;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
/**
* HookMessageDeserializer tests.
*/
public class HookMessageDeserializerTest {
@Test
public void testDeserialize() throws Exception {
HookMessageDeserializer deserializer = new HookMessageDeserializer();
Referenceable entity = EntityNotificationImplTest.getEntity("id");
String traitName = "MyTrait";
List<IStruct> traitInfo = new LinkedList<>();
IStruct trait = new Struct(traitName, Collections.<String, Object>emptyMap());
traitInfo.add(trait);
HookNotification.EntityUpdateRequest message =
new HookNotification.EntityUpdateRequest("user1", entity);
String json = AbstractNotification.getMessageJson(message);
HookNotification.HookNotificationMessage deserializedMessage = deserializer.deserialize(json);
assertEquals(deserializedMessage.getType(), message.getType());
assertEquals(deserializedMessage.getUser(), message.getUser());
assertTrue(deserializedMessage instanceof HookNotification.EntityUpdateRequest);
HookNotification.EntityUpdateRequest deserializedEntityUpdateRequest =
(HookNotification.EntityUpdateRequest) deserializedMessage;
Referenceable deserializedEntity = deserializedEntityUpdateRequest.getEntities().get(0);
assertEquals(deserializedEntity.getId(), entity.getId());
assertEquals(deserializedEntity.getTypeName(), entity.getTypeName());
assertEquals(deserializedEntity.getTraits(), entity.getTraits());
assertEquals(deserializedEntity.getTrait(traitName), entity.getTrait(traitName));
}
}
......@@ -17,7 +17,7 @@
*/
package org.apache.atlas.notification.hook;
import org.apache.atlas.notification.AbstractNotificationConsumer;
import org.apache.atlas.notification.AbstractNotification;
import org.apache.atlas.typesystem.Referenceable;
import org.testng.annotations.Test;
......@@ -25,6 +25,9 @@ import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;
public class HookNotificationTest {
public static final HookMessageDeserializer HOOK_MESSAGE_DESERIALIZER = new HookMessageDeserializer();
@Test
public void testNewMessageSerDe() throws Exception {
Referenceable entity1 = new Referenceable("sometype");
......@@ -34,9 +37,10 @@ public class HookNotificationTest {
String user = "user";
HookNotification.EntityCreateRequest request = new HookNotification.EntityCreateRequest(user, entity1, entity2);
String notificationJson = AbstractNotificationConsumer.GSON.toJson(request);
HookNotification.HookNotificationMessage actualNotification = AbstractNotificationConsumer.GSON.fromJson(
notificationJson, HookNotification.HookNotificationMessage.class);
String notificationJson = AbstractNotification.GSON.toJson(request);
HookNotification.HookNotificationMessage actualNotification =
HOOK_MESSAGE_DESERIALIZER.deserialize(notificationJson);
assertEquals(actualNotification.getType(), HookNotification.HookNotificationType.ENTITY_CREATE);
assertEquals(actualNotification.getUser(), user);
......@@ -56,7 +60,7 @@ public class HookNotificationTest {
entity.set("attr", "value");
HookNotification.EntityCreateRequest request = new HookNotification.EntityCreateRequest(null, entity);
String notificationJsonFromCode = AbstractNotificationConsumer.GSON.toJson(request);
String notificationJsonFromCode = AbstractNotification.GSON.toJson(request);
System.out.println(notificationJsonFromCode);
//Json without user and assert that the string can be deserialised
......@@ -82,8 +86,10 @@ public class HookNotificationTest {
+ " \"type\": \"ENTITY_CREATE\"\n"
+ "}";
HookNotification.HookNotificationMessage actualNotification = AbstractNotificationConsumer.GSON.fromJson(
notificationJson, HookNotification.HookNotificationMessage.class);
HookNotification.HookNotificationMessage actualNotification =
HOOK_MESSAGE_DESERIALIZER.deserialize(notificationJson);
assertEquals(actualNotification.getType(), HookNotification.HookNotificationType.ENTITY_CREATE);
assertNull(actualNotification.user);
assertEquals(actualNotification.getUser(), HookNotification.HookNotificationMessage.UNKNOW_USER);
......
......@@ -18,6 +18,7 @@ ATLAS-409 Atlas will not import avro tables with schema read from a file (dosset
ATLAS-379 Create sqoop and falcon metadata addons (venkatnrangan,bvellanki,sowmyaramesh via shwethags)
ALL CHANGES:
ATLAS-631 Introduce Versioning to Atlas Notification Payload (tbeerbower via shwethags)
ATLAS-723 JSON deserialization regression (guptaneeru via shwethags)
ATLAS-728 Fix few typos in committer email IDs (yhemanth)
ATLAS-435 Add ORDER BY and Limit to search DSL (neerugupta via sumasai)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment