Commit 6f421e99 by Shwetha GS

ATLAS-158 Provide Atlas Entity Change Notification (tbeerbower via shwethags)

parent c93e0972
......@@ -19,6 +19,7 @@
package org.apache.atlas.listener;
import org.apache.atlas.AtlasException;
import org.apache.atlas.typesystem.IStruct;
import org.apache.atlas.typesystem.ITypedReferenceableInstance;
import java.util.Collection;
......@@ -29,28 +30,40 @@ import java.util.Collection;
public interface EntityChangeListener {
/**
* This is upon adding a new typed instance to the repository.
* This is upon adding new entities to the repository.
*
* @param typedInstances a typed instance
* @throws AtlasException
* @param entities the created entities
*
* @throws AtlasException if the listener notification fails
*/
void onEntitiesAdded(Collection<ITypedReferenceableInstance> entities) throws AtlasException;
/**
* This is upon updating an entity.
*
* @param entity the updated entity
*
* @throws AtlasException if the listener notification fails
*/
void onEntityAdded(Collection<ITypedReferenceableInstance> typedInstances) throws AtlasException;
void onEntityUpdated(ITypedReferenceableInstance entity) throws AtlasException;
/**
* This is upon adding a new trait to a typed instance.
*
* @param guid globally unique identifier for the entity
* @param traitName trait name for the instance that needs to be added to entity
* @throws AtlasException
* @param entity the entity
* @param trait trait that needs to be added to entity
*
* @throws AtlasException if the listener notification fails
*/
void onTraitAdded(String guid, String traitName) throws AtlasException;
void onTraitAdded(ITypedReferenceableInstance entity, IStruct trait) throws AtlasException;
/**
* This is upon deleting a trait from a typed instance.
*
* @param guid globally unique identifier for the entity
* @param entity the entity
* @param traitName trait name for the instance that needs to be deleted from entity
* @throws AtlasException
*
* @throws AtlasException if the listener notification fails
*/
void onTraitDeleted(String guid, String traitName) throws AtlasException;
void onTraitDeleted(ITypedReferenceableInstance entity, String traitName) throws AtlasException;
}
......@@ -54,6 +54,7 @@ atlas.kafka.bootstrap.servers=localhost:9027
atlas.kafka.zookeeper.session.timeout.ms=400
atlas.kafka.zookeeper.sync.time.ms=20
atlas.kafka.auto.commit.interval.ms=1000
atlas.kafka.hook.group.id=atlas
######### Hive Lineage Configs #########
......
......@@ -18,7 +18,7 @@ Available bridges are:
---++ Notification
Notification is used for reliable entity registration from hooks and for entity/type change notifications. Atlas, by default, provides kafka integration, but its possible to provide other implementations as well. Atlas service starts embedded kafka server by default.
Notification is used for reliable entity registration from hooks and for entity/type change notifications. Atlas, by default, provides Kafka integration, but its possible to provide other implementations as well. Atlas service starts embedded Kafka server by default.
Atlas also provides NotificationHookConsumer that runs in Atlas Service and listens to messages from hook and registers the entities in Atlas.
<img src="images/twiki/notification.png" height="10" width="20" />
......
......@@ -73,7 +73,7 @@ atlas.lineage.hive.table.schema.query=hive_table where name=?, columns
</verbatim>
---+++ Notification Configs
Refer http://kafka.apache.org/documentation.html#configuration for kafka configuration. All kafka configs should be prefixed with 'atlas.kafka.'
Refer http://kafka.apache.org/documentation.html#configuration for Kafka configuration. All Kafka configs should be prefixed with 'atlas.kafka.'
<verbatim>
atlas.notification.embedded=true
......@@ -83,8 +83,16 @@ atlas.kafka.bootstrap.servers=localhost:9027
atlas.kafka.zookeeper.session.timeout.ms=400
atlas.kafka.zookeeper.sync.time.ms=20
atlas.kafka.auto.commit.interval.ms=1000
atlas.kafka.hook.group.id=atlas
</verbatim>
Note that Kafka group ids are specified for a specific topic. The Kafka group id configuration for entity notifications is 'atlas.kafka.entities.group.id'
<verbatim>
atlas.kafka.entities.group.id=<consumer id>
</verbatim>
---+++ Client Configs
<verbatim>
atlas.client.readTimeoutMSecs=60000
......
---+ Entity Change Notifications
To receive Atlas entity notifications a consumer should be obtained through the notification interface. Entity change notifications are sent every time a change is made to an entity. Operations that result in an entity change notification are:
* <code>ENTITY_CREATE</code> - Create a new entity.
* <code>ENTITY_UPDATE</code> - Update an attribute of an existing entity.
* <code>TRAIT_ADD</code> - Add a trait to an entity.
* <code>TRAIT_DELETE</code> - Delete a trait from an entity.
<verbatim>
// Obtain provider through injection…
Provider<NotificationInterface> provider;
// Get the notification interface
NotificationInterface notification = provider.get();
// Create consumers
List<NotificationConsumer<EntityNotification>> consumers =
notification.createConsumers(NotificationInterface.NotificationType.ENTITIES, 1);
</verbatim>
The consumer exposes the Iterator interface that should be used to get the entity notifications as they are posted. The hasNext() method blocks until a notification is available.
<verbatim>
while(consumer.hasNext()) {
EntityNotification notification = consumer.next();
IReferenceableInstance entity = notification.getEntity();
}
</verbatim>
......@@ -43,6 +43,8 @@ allows integration with the whole enterprise data ecosystem.
* [[Search][Search]]
* [[security][security]]
* [[Configuration][Configuration]]
* Notification
* [[Notification-Entity][Entity Notification]]
* Bridges
* [[Bridge-Hive][Hive Bridge]]
......
......@@ -76,5 +76,10 @@
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
</dependency>
</dependencies>
</project>
......@@ -20,28 +20,50 @@ package org.apache.atlas.kafka;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.message.MessageAndMetadata;
import org.apache.atlas.notification.NotificationConsumer;
import org.apache.atlas.notification.AbstractNotificationConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class KafkaConsumer implements NotificationConsumer {
/**
* Kafka specific notification consumer.
*
* @param <T> the notification type returned by this consumer
*/
public class KafkaConsumer<T> extends AbstractNotificationConsumer<T> {
private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumer.class);
private final int consumerId;
private final ConsumerIterator iterator;
public KafkaConsumer(KafkaStream<String, String> stream, int consumerId) {
// ----- Constructors ----------------------------------------------------
/**
* Create a Kafka consumer.
*
* @param type the notification type returned by this consumer
* @param stream the underlying Kafka stream
* @param consumerId an id value for this consumer
*/
public KafkaConsumer(Class<T> type, KafkaStream<String, String> stream, int consumerId) {
super(type);
this.iterator = stream.iterator();
this.consumerId = consumerId;
}
// ----- Iterator --------------------------------------------------------
@Override
public boolean hasNext() {
return iterator.hasNext();
}
// ----- AbstractNotificationConsumer ------------------------------------
@Override
public String next() {
public String getNext() {
MessageAndMetadata message = iterator.next();
LOG.debug("Read message: conumerId: {}, topic - {}, partition - {}, offset - {}, message - {}",
consumerId, message.topic(), message.partition(), message.offset(), message.message());
......
......@@ -27,16 +27,13 @@ import kafka.server.KafkaServer;
import kafka.utils.Time;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasException;
import org.apache.atlas.notification.AbstractNotification;
import org.apache.atlas.notification.NotificationConsumer;
import org.apache.atlas.notification.NotificationException;
import org.apache.atlas.notification.NotificationInterface;
import org.apache.atlas.service.Service;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationConverter;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
......@@ -61,7 +58,10 @@ import java.util.Properties;
import java.util.concurrent.Future;
@Singleton
public class KafkaNotification extends NotificationInterface implements Service {
/**
* Kafka specific access point to the Atlas notification framework.
*/
public class KafkaNotification extends AbstractNotification implements Service {
public static final Logger LOG = LoggerFactory.getLogger(KafkaNotification.class);
public static final String PROPERTY_PREFIX = "atlas.kafka";
......@@ -72,7 +72,8 @@ public class KafkaNotification extends NotificationInterface implements Service
public static final String ATLAS_ENTITIES_TOPIC = "ATLAS_ENTITIES";
public static final String ATLAS_TYPES_TOPIC = "ATLAS_TYPES";
private static final String ATLAS_GROUP = "atlas";
protected static final String CONSUMER_GROUP_ID_PROPERTY = "group.id";
private KafkaServer kafkaServer;
private ServerCnxnFactory factory;
private Properties properties;
......@@ -80,20 +81,21 @@ public class KafkaNotification extends NotificationInterface implements Service
private KafkaProducer producer = null;
private List<ConsumerConnector> consumerConnectors = new ArrayList<>();
private KafkaConsumer consumer;
private static final Map<NotificationType, String> topicMap = new HashMap<NotificationType, String>() {{
put(NotificationType.HOOK, ATLAS_HOOK_TOPIC);
put(NotificationType.ENTITIES, ATLAS_ENTITIES_TOPIC);
put(NotificationType.TYPES, ATLAS_TYPES_TOPIC);
}};
private synchronized void createProducer() {
if (producer == null) {
producer = new KafkaProducer(properties);
}
}
// ----- Constructors ----------------------------------------------------
/**
* Construct a KafkaNotification.
*
* @param applicationProperties the application properties used to configure Kafka
*
* @throws AtlasException if the notification interface can not be created
*/
public KafkaNotification(Configuration applicationProperties) throws AtlasException {
super(applicationProperties);
Configuration subsetConfiguration =
......@@ -110,58 +112,16 @@ public class KafkaNotification extends NotificationInterface implements Service
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
//todo take group id as argument to allow multiple consumers??
properties.put(ConsumerConfig.GROUP_ID_CONFIG, ATLAS_GROUP);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY, "roundrobin");
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "smallest");
//todo new APIs not available yet
// consumer = new KafkaConsumer(properties);
// consumer.subscribe(ATLAS_HOOK_TOPIC);
}
private URL getURL(String url) throws MalformedURLException {
try {
return new URL(url);
} catch(MalformedURLException e) {
return new URL("http://" + url);
}
}
private String startZk() throws IOException, InterruptedException, URISyntaxException {
String zkValue = properties.getProperty("zookeeper.connect");
LOG.debug("Starting zookeeper at {}", zkValue);
URL zkAddress = getURL(zkValue);
this.factory = NIOServerCnxnFactory.createFactory(
new InetSocketAddress(zkAddress.getHost(), zkAddress.getPort()), 1024);
File snapshotDir = constructDir("zk/txn");
File logDir = constructDir("zk/snap");
factory.startup(new ZooKeeperServer(snapshotDir, logDir, 500));
return factory.getLocalAddress().getAddress().toString();
}
private void startKafka() throws IOException, URISyntaxException {
String kafkaValue = properties.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG);
LOG.debug("Starting kafka at {}", kafkaValue);
URL kafkaAddress = getURL(kafkaValue);
Properties brokerConfig = properties;
brokerConfig.setProperty("broker.id", "1");
brokerConfig.setProperty("host.name", kafkaAddress.getHost());
brokerConfig.setProperty("port", String.valueOf(kafkaAddress.getPort()));
brokerConfig.setProperty("log.dirs", constructDir("kafka").getAbsolutePath());
brokerConfig.setProperty("log.flush.interval.messages", String.valueOf(1));
kafkaServer = new KafkaServer(new KafkaConfig(brokerConfig), new SystemTime());
kafkaServer.startup();
LOG.debug("Embedded kafka server started with broker config {}", brokerConfig);
}
// ----- Service ---------------------------------------------------------
@Override
public void start() throws AtlasException {
......@@ -186,51 +146,27 @@ public class KafkaNotification extends NotificationInterface implements Service
}
}
private static class SystemTime implements Time {
@Override
public long milliseconds() {
return System.currentTimeMillis();
}
@Override
public long nanoseconds() {
return System.nanoTime();
}
// ----- NotificationInterface -------------------------------------------
@Override
public void sleep(long arg0) {
try {
Thread.sleep(arg0);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
public <T> List<NotificationConsumer<T>> createConsumers(NotificationType notificationType,
int numConsumers) {
String topic = topicMap.get(notificationType);
private File constructDir(String dirPrefix) {
File file = new File(properties.getProperty(ATLAS_KAFKA_DATA), dirPrefix);
if (!file.exists() && !file.mkdirs()) {
throw new RuntimeException("could not create temp directory: " + file.getAbsolutePath());
}
return file;
}
Properties consumerProperties = getConsumerProperties(notificationType);
@Override
public List<NotificationConsumer> createConsumers(NotificationType type, int numConsumers) {
String topic = topicMap.get(type);
ConsumerConnector consumerConnector =
Consumer.createJavaConsumerConnector(new kafka.consumer.ConsumerConfig(properties));
ConsumerConnector consumerConnector = createConsumerConnector(consumerProperties);
Map<String, Integer> topicCountMap = new HashMap<>();
topicCountMap.put(topic, numConsumers);
StringDecoder decoder = new StringDecoder(null);
Map<String, List<KafkaStream<String, String>>> streamsMap =
consumerConnector.createMessageStreams(topicCountMap, decoder, decoder);
List<KafkaStream<String, String>> kafkaConsumers = streamsMap.get(topic);
List<NotificationConsumer> consumers = new ArrayList<>(numConsumers);
List<NotificationConsumer<T>> consumers = new ArrayList<>(numConsumers);
int consumerId = 0;
for (KafkaStream stream : kafkaConsumers) {
consumers.add(new org.apache.atlas.kafka.KafkaConsumer(stream, consumerId++));
consumers.add(createKafkaConsumer(notificationType.getClassType(), stream, consumerId++));
}
consumerConnectors.add(consumerConnector);
......@@ -269,35 +205,129 @@ public class KafkaNotification extends NotificationInterface implements Service
producer = null;
}
if (consumer != null) {
consumer.close();
consumer = null;
}
for (ConsumerConnector consumerConnector : consumerConnectors) {
consumerConnector.shutdown();
}
consumerConnectors.clear();
}
//New API, not used now
private List<String> receive(long timeout) throws NotificationException {
Map<String, ConsumerRecords> recordsMap = consumer.poll(timeout);
List<String> messages = new ArrayList<>();
if (recordsMap != null) {
for (ConsumerRecords records : recordsMap.values()) {
List<ConsumerRecord> recordList = records.records();
for (ConsumerRecord record : recordList) {
// ----- helper methods --------------------------------------------------
/**
* Create a Kafka consumer connector from the given properties.
*
* @param properties the properties for creating the consumer connector
*
* @return a new Kafka consumer connector
*/
protected ConsumerConnector createConsumerConnector(Properties properties) {
return Consumer.createJavaConsumerConnector(new kafka.consumer.ConsumerConfig(properties));
}
/**
* Create a Kafka consumer from the given Kafka stream.
*
* @param stream the Kafka stream
* @param consumerId the id for the new consumer
*
* @return a new Kafka consumer
*/
protected <T> org.apache.atlas.kafka.KafkaConsumer<T> createKafkaConsumer(Class<T> type, KafkaStream stream,
int consumerId) {
return new org.apache.atlas.kafka.KafkaConsumer<T>(type, stream, consumerId);
}
// Get properties for consumer request
private Properties getConsumerProperties(NotificationType type) {
// find the configured group id for the given notification type
String groupId = properties.getProperty(type.toString().toLowerCase() + "." + CONSUMER_GROUP_ID_PROPERTY);
if (groupId == null) {
throw new IllegalStateException("No configuration group id set for the notification type " + type);
}
Properties consumerProperties = new Properties();
consumerProperties.putAll(properties);
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
return consumerProperties;
}
private File constructDir(String dirPrefix) {
File file = new File(properties.getProperty(ATLAS_KAFKA_DATA), dirPrefix);
if (!file.exists() && !file.mkdirs()) {
throw new RuntimeException("could not create temp directory: " + file.getAbsolutePath());
}
return file;
}
private synchronized void createProducer() {
if (producer == null) {
producer = new KafkaProducer(properties);
}
}
private URL getURL(String url) throws MalformedURLException {
try {
String message = (String) record.value();
LOG.debug("Received message from topic {}: {}", ATLAS_HOOK_TOPIC, message);
messages.add(message);
} catch (Exception e) {
throw new NotificationException(e);
return new URL(url);
} catch(MalformedURLException e) {
return new URL("http://" + url);
}
}
private String startZk() throws IOException, InterruptedException, URISyntaxException {
String zkValue = properties.getProperty("zookeeper.connect");
LOG.debug("Starting zookeeper at {}", zkValue);
URL zkAddress = getURL(zkValue);
this.factory = NIOServerCnxnFactory.createFactory(
new InetSocketAddress(zkAddress.getHost(), zkAddress.getPort()), 1024);
File snapshotDir = constructDir("zk/txn");
File logDir = constructDir("zk/snap");
factory.startup(new ZooKeeperServer(snapshotDir, logDir, 500));
return factory.getLocalAddress().getAddress().toString();
}
private void startKafka() throws IOException, URISyntaxException {
String kafkaValue = properties.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG);
LOG.debug("Starting kafka at {}", kafkaValue);
URL kafkaAddress = getURL(kafkaValue);
Properties brokerConfig = properties;
brokerConfig.setProperty("broker.id", "1");
brokerConfig.setProperty("host.name", kafkaAddress.getHost());
brokerConfig.setProperty("port", String.valueOf(kafkaAddress.getPort()));
brokerConfig.setProperty("log.dirs", constructDir("kafka").getAbsolutePath());
brokerConfig.setProperty("log.flush.interval.messages", String.valueOf(1));
kafkaServer = new KafkaServer(new KafkaConfig(brokerConfig), new SystemTime());
kafkaServer.startup();
LOG.debug("Embedded kafka server started with broker config {}", brokerConfig);
}
// ----- inner class : SystemTime ----------------------------------------
private static class SystemTime implements Time {
@Override
public long milliseconds() {
return System.currentTimeMillis();
}
@Override
public long nanoseconds() {
return System.nanoTime();
}
@Override
public void sleep(long arg0) {
try {
Thread.sleep(arg0);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
return messages;
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.notification;
import org.apache.atlas.AtlasException;
import org.apache.commons.configuration.Configuration;
/**
* Abstract notification interface implementation.
*/
public abstract class AbstractNotification implements NotificationInterface {
private static final String PROPERTY_EMBEDDED = PROPERTY_PREFIX + ".embedded";
private final boolean embedded;
// ----- Constructors ------------------------------------------------------
public AbstractNotification(Configuration applicationProperties) throws AtlasException {
this.embedded = applicationProperties.getBoolean(PROPERTY_EMBEDDED, false);
}
// ----- AbstractNotificationInterface -------------------------------------
/**
* Determine whether or not the notification service embedded in Atlas server.
*
* @return true if the the notification service embedded in Atlas server.
*/
protected final boolean isEmbedded() {
return embedded;
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.notification;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonDeserializationContext;
import com.google.gson.JsonDeserializer;
import com.google.gson.JsonElement;
import com.google.gson.JsonParseException;
import com.google.gson.reflect.TypeToken;
import org.apache.atlas.notification.entity.EntityNotification;
import org.apache.atlas.notification.entity.EntityNotificationImpl;
import org.apache.atlas.typesystem.IReferenceableInstance;
import org.apache.atlas.typesystem.IStruct;
import org.apache.atlas.typesystem.Struct;
import org.apache.atlas.typesystem.json.InstanceSerialization;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import java.lang.reflect.Type;
import java.util.List;
import java.util.Map;
/**
* Abstract notification consumer.
*/
public abstract class AbstractNotificationConsumer<T> implements NotificationConsumer<T> {
private static final Gson GSON = new GsonBuilder().
registerTypeAdapter(ImmutableList.class, new ImmutableListDeserializer()).
registerTypeAdapter(ImmutableMap.class, new ImmutableMapDeserializer()).
registerTypeAdapter(EntityNotification.class, new EntityNotificationDeserializer()).
registerTypeAdapter(IStruct.class, new StructDeserializer()).
registerTypeAdapter(IReferenceableInstance.class, new ReferenceableDeserializer()).
registerTypeAdapter(JSONArray.class, new JSONArrayDeserializer()).
create();
private final Class<T> type;
// ----- Constructors ------------------------------------------------------
/**
* Construct an AbstractNotificationConsumer.
*
* @param type the notification type
*/
public AbstractNotificationConsumer(Class<T> type) {
this.type = type;
}
// ----- AbstractNotificationConsumer -------------------------------------
/**
* Get the next notification as a string.
*
* @return the next notification in string form
*/
protected abstract String getNext();
// ----- Iterator ---------------------------------------------------------
@Override
public T next() {
return GSON.fromJson(getNext(), type);
}
@Override
public void remove() {
throw new UnsupportedOperationException("The remove method is not supported.");
}
// ----- inner class : ImmutableListDeserializer ---------------------------
private static class ImmutableListDeserializer implements JsonDeserializer<ImmutableList<?>> {
public static final Type LIST_TYPE = new TypeToken<List<?>>() {}.getType();
@Override
public ImmutableList<?> deserialize(JsonElement json, Type type,
JsonDeserializationContext context) throws JsonParseException {
final List<?> list = context.deserialize(json, LIST_TYPE);
return ImmutableList.copyOf(list);
}
}
// ----- inner class : ImmutableMapDeserializer ----------------------------
public static class ImmutableMapDeserializer implements JsonDeserializer<ImmutableMap<?, ?>> {
public static final Type MAP_TYPE = new TypeToken<Map<?, ?>>() {}.getType();
@Override
public ImmutableMap<?, ?> deserialize(JsonElement json, Type type,
JsonDeserializationContext context) throws JsonParseException {
final Map<?, ?> map = context.deserialize(json, MAP_TYPE);
return ImmutableMap.copyOf(map);
}
}
// ----- inner class : EntityNotificationDeserializer ----------------------
public final static class EntityNotificationDeserializer implements JsonDeserializer<EntityNotification> {
@Override
public EntityNotification deserialize(final JsonElement json, final Type type,
final JsonDeserializationContext context) throws JsonParseException {
return context.deserialize(json, EntityNotificationImpl.class);
}
}
// ----- inner class : StructDeserializer -------------------------------
public final static class StructDeserializer implements JsonDeserializer<IStruct> {
@Override
public IStruct deserialize(final JsonElement json, final Type type,
final JsonDeserializationContext context) throws JsonParseException {
return context.deserialize(json, Struct.class);
}
}
// ----- inner class : ReferenceableDeserializer ------------------------
public final static class ReferenceableDeserializer implements JsonDeserializer<IStruct> {
@Override
public IReferenceableInstance deserialize(final JsonElement json, final Type type,
final JsonDeserializationContext context) throws JsonParseException {
return InstanceSerialization.fromJsonReferenceable(json.toString(), true);
}
}
// ----- inner class : JSONArrayDeserializer ----------------------------
public final static class JSONArrayDeserializer implements JsonDeserializer<JSONArray> {
@Override
public JSONArray deserialize(final JsonElement json, final Type type,
final JsonDeserializationContext context) throws JsonParseException {
try {
return new JSONArray(json.toString());
} catch (JSONException e) {
throw new JsonParseException(e.getMessage(), e);
}
}
}
}
......@@ -17,16 +17,8 @@
package org.apache.atlas.notification;
public interface NotificationConsumer {
/**
* If there are more messages
* @return
*/
boolean hasNext();
import java.util.Iterator;
/**
* Next message - blocking call
* @return
*/
String next();
// TODO : docs!
public interface NotificationConsumer<T> extends Iterator<T>{
}
......@@ -55,11 +55,11 @@ public class NotificationHookConsumer implements Service {
String atlasEndpoint = applicationProperties.getString(ATLAS_ENDPOINT_PROPERTY, "http://localhost:21000");
atlasClient = new AtlasClient(atlasEndpoint);
int numThreads = applicationProperties.getInt(CONSUMER_THREADS_PROPERTY, 1);
List<NotificationConsumer> consumers =
List<NotificationConsumer<JSONArray>> consumers =
notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, numThreads);
executors = Executors.newFixedThreadPool(consumers.size());
for (final NotificationConsumer consumer : consumers) {
for (final NotificationConsumer<JSONArray> consumer : consumers) {
executors.submit(new HookConsumer(consumer));
}
}
......@@ -78,19 +78,19 @@ public class NotificationHookConsumer implements Service {
}
class HookConsumer implements Runnable {
private final NotificationConsumer consumer;
private final NotificationConsumer<JSONArray> consumer;
public HookConsumer(NotificationConsumer consumerInterface) {
this.consumer = consumerInterface;
public HookConsumer(NotificationConsumer<JSONArray> consumer) {
this.consumer = consumer;
}
@Override
public void run() {
while(consumer.hasNext()) {
String entityJson = consumer.next();
JSONArray entityJson = consumer.next();
LOG.info("Processing message {}", entityJson);
try {
JSONArray guids = atlasClient.createEntity(new JSONArray(entityJson));
JSONArray guids = atlasClient.createEntity(entityJson);
LOG.info("Create entities with guid {}", guids);
} catch (Exception e) {
//todo handle failures
......
......@@ -17,36 +17,42 @@
package org.apache.atlas.notification;
import org.apache.atlas.AtlasException;
import org.apache.commons.configuration.Configuration;
import org.apache.atlas.notification.entity.EntityNotification;
import org.codehaus.jettison.json.JSONArray;
import java.util.List;
public abstract class NotificationInterface {
public static final String PROPERTY_PREFIX = "atlas.notification";
private static final String PROPERTY_EMBEDDED = PROPERTY_PREFIX + ".embedded";
private boolean embedded;
// TODO : docs!
public interface NotificationInterface {
String PROPERTY_PREFIX = "atlas.notification";
public enum NotificationType {
HOOK, ENTITIES, TYPES
enum NotificationType {
HOOK(JSONArray.class), ENTITIES(EntityNotification.class);
private final Class classType;
NotificationType(Class classType) {
this.classType = classType;
}
public NotificationInterface(Configuration applicationProperties) throws AtlasException {
this.embedded = applicationProperties.getBoolean(PROPERTY_EMBEDDED, false);
public Class getClassType() {
return classType;
}
}
/**
* Is the notification service embedded in atlas server
* @return
* Create notification consumers for the given notification type.
*
* @param notificationType the notification type (i.e. HOOK, ENTITIES)
* @param numConsumers the number of consumers to create
* @param <T> the type of the notifications
*
* @return the list of created consumers
*/
protected final boolean isEmbedded() {
return embedded;
}
public abstract List<NotificationConsumer> createConsumers(NotificationType type, int numConsumers);
<T> List<NotificationConsumer<T>> createConsumers(NotificationType notificationType, int numConsumers);
public abstract void send(NotificationType type, String... messages) throws NotificationException;
void send(NotificationType type, String... messages) throws NotificationException;
public abstract void close();
void close();
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.notification.entity;
import org.apache.atlas.typesystem.IReferenceableInstance;
import org.apache.atlas.typesystem.IStruct;
import java.util.List;
/**
* Notification of entity changes.
*/
public interface EntityNotification {
/**
* Operations that result in an entity notification.
*/
enum OperationType {
ENTITY_CREATE,
ENTITY_UPDATE,
TRAIT_ADD,
TRAIT_DELETE
}
// ----- EntityNotification ------------------------------------------------
/**
* Get the entity that is associated with this notification.
*
* @return the associated entity
*/
IReferenceableInstance getEntity();
/**
* Get flattened list of traits that are associated with this entity (includes super traits).
*
* @return the list of all traits
*/
List<IStruct> getAllTraits();
/**
* Get the type of operation that triggered this notification.
*
* @return the operation type
*/
OperationType getOperationType();
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.notification.entity;
import org.apache.atlas.AtlasException;
import org.apache.atlas.typesystem.IReferenceableInstance;
import org.apache.atlas.typesystem.IStruct;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.Struct;
import org.apache.atlas.typesystem.types.FieldMapping;
import org.apache.atlas.typesystem.types.TraitType;
import org.apache.atlas.typesystem.types.TypeSystem;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* Entity notification implementation.
*/
public class EntityNotificationImpl implements EntityNotification {
private final Referenceable entity;
private final OperationType operationType;
private final List<IStruct> traits;
// ----- Constructors ------------------------------------------------------
/**
* No-arg constructor for serialization.
*/
@SuppressWarnings("unused")
private EntityNotificationImpl() throws AtlasException {
this(null, OperationType.ENTITY_CREATE, Collections.<IStruct>emptyList());
}
/**
* Construct an EntityNotification.
*
* @param entity the entity subject of the notification
* @param operationType the type of operation that caused the notification
* @param traits the traits for the given entity
*
* @throws AtlasException if the entity notification can not be created
*/
public EntityNotificationImpl(Referenceable entity, OperationType operationType, List<IStruct> traits)
throws AtlasException {
this.entity = entity;
this.operationType = operationType;
this.traits = traits;
}
/**
* Construct an EntityNotification.
*
* @param entity the entity subject of the notification
* @param operationType the type of operation that caused the notification
* @param typeSystem the Atlas type system
*
* @throws AtlasException if the entity notification can not be created
*/
public EntityNotificationImpl(Referenceable entity, OperationType operationType, TypeSystem typeSystem)
throws AtlasException {
this(entity, operationType, getAllTraits(entity, typeSystem));
}
// ----- EntityNotification ------------------------------------------------
@Override
public IReferenceableInstance getEntity() {
return entity;
}
@Override
public List<IStruct> getAllTraits() {
return traits;
}
@Override
public OperationType getOperationType() {
return operationType;
}
// ----- Object overrides --------------------------------------------------
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
EntityNotificationImpl that = (EntityNotificationImpl) o;
return !(entity != null ? !entity.equals(that.entity) : that.entity != null) &&
operationType == that.operationType &&
traits.equals(that.traits);
}
@Override
public int hashCode() {
int result = entity != null ? entity.hashCode() : 0;
result = 31 * result + operationType.hashCode();
result = 31 * result + traits.hashCode();
return result;
}
// ----- helper methods ----------------------------------------------------
private static List<IStruct> getAllTraits(IReferenceableInstance entityDefinition,
TypeSystem typeSystem) throws AtlasException {
List<IStruct> traitInfo = new LinkedList<>();
for (String traitName : entityDefinition.getTraits()) {
IStruct trait = entityDefinition.getTrait(traitName);
String typeName = trait.getTypeName();
Map<String, Object> valuesMap = trait.getValuesMap();
traitInfo.add(new Struct(typeName, valuesMap));
traitInfo.addAll(getSuperTraits(typeName, valuesMap, typeSystem));
}
return traitInfo;
}
private static List<IStruct> getSuperTraits(
String typeName, Map<String, Object> values, TypeSystem typeSystem) throws AtlasException {
List<IStruct> superTypes = new LinkedList<>();
TraitType traitDef = typeSystem.getDataType(TraitType.class, typeName);
Set<String> superTypeNames = traitDef.getAllSuperTypeNames();
for (String superTypeName : superTypeNames) {
TraitType superTraitDef = typeSystem.getDataType(TraitType.class, superTypeName);
Map<String, Object> superTypeValues = new HashMap<>();
FieldMapping fieldMapping = superTraitDef.fieldMapping();
if (fieldMapping != null) {
Set<String> superTypeAttributeNames = fieldMapping.fields.keySet();
for (String superTypeAttributeName : superTypeAttributeNames) {
if (values.containsKey(superTypeAttributeName)) {
superTypeValues.put(superTypeAttributeName, values.get(superTypeAttributeName));
}
}
}
IStruct superTrait = new Struct(superTypeName, superTypeValues);
superTypes.add(superTrait);
superTypes.addAll(getSuperTraits(superTypeName, values, typeSystem));
}
return superTypes;
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.notification.entity;
import com.google.gson.Gson;
import org.apache.atlas.AtlasException;
import org.apache.atlas.listener.EntityChangeListener;
import org.apache.atlas.notification.NotificationInterface;
import org.apache.atlas.typesystem.IStruct;
import org.apache.atlas.typesystem.ITypedReferenceableInstance;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.types.TypeSystem;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
/**
* Listen to the repository for entity changes and produce entity change notifications.
*/
public class NotificationEntityChangeListener implements EntityChangeListener {
private final NotificationInterface notificationInterface;
private final TypeSystem typeSystem;
private final Gson gson = new Gson();
// ----- Constructors ------------------------------------------------------
/**
* Construct a NotificationEntityChangeListener.
*
* @param notificationInterface the notification framework interface
* @param typeSystem the Atlas type system
*/
public NotificationEntityChangeListener(NotificationInterface notificationInterface, TypeSystem typeSystem) {
this.notificationInterface = notificationInterface;
this.typeSystem = typeSystem;
}
// ----- EntityChangeListener ----------------------------------------------
@Override
public void onEntitiesAdded(Collection<ITypedReferenceableInstance> entities) throws AtlasException {
notifyOfEntityEvent(entities, EntityNotification.OperationType.ENTITY_CREATE);
}
@Override
public void onEntityUpdated(ITypedReferenceableInstance entity) throws AtlasException {
notifyOfEntityEvent(Collections.singleton(entity), EntityNotification.OperationType.ENTITY_UPDATE);
}
@Override
public void onTraitAdded(ITypedReferenceableInstance entity, IStruct trait) throws AtlasException {
notifyOfEntityEvent(Collections.singleton(entity), EntityNotification.OperationType.TRAIT_ADD);
}
@Override
public void onTraitDeleted(ITypedReferenceableInstance entity, String traitName) throws AtlasException {
notifyOfEntityEvent(Collections.singleton(entity), EntityNotification.OperationType.TRAIT_DELETE);
}
// ----- helper methods ----------------------------------------------------
// send notification of entity change
private void notifyOfEntityEvent(Collection<ITypedReferenceableInstance> entityDefinitions,
EntityNotification.OperationType operationType) throws AtlasException {
List<String> messages = new LinkedList<>();
for (ITypedReferenceableInstance entityDefinition : entityDefinitions) {
Referenceable entity = new Referenceable(entityDefinition);
EntityNotificationImpl notification =
new EntityNotificationImpl(entity, operationType, typeSystem);
messages.add(gson.toJson(notification));
}
notificationInterface.send(NotificationInterface.NotificationType.ENTITIES,
messages.toArray(new String[messages.size()]));
}
}
......@@ -18,17 +18,37 @@
package org.apache.atlas.kafka;
import com.google.inject.Inject;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import org.apache.atlas.AtlasException;
import org.apache.atlas.notification.NotificationConsumer;
import org.apache.atlas.notification.NotificationInterface;
import org.apache.atlas.notification.NotificationModule;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.RandomStringUtils;
import org.testng.Assert;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.codehaus.jettison.json.JSONArray;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
@Guice(modules = NotificationModule.class)
public class KafkaNotificationTest {
......@@ -41,15 +61,62 @@ public class KafkaNotificationTest {
}
@Test
public void testSendReceiveMessage() throws AtlasException {
String msg1 = "message" + random();
String msg2 = "message" + random();
public void testSendReceiveMessage() throws Exception {
String msg1 = "[{\"message\": " + 123 + "}]";
String msg2 = "[{\"message\": " + 456 + "}]";
kafka.send(NotificationInterface.NotificationType.HOOK, msg1, msg2);
NotificationConsumer consumer = kafka.createConsumers(NotificationInterface.NotificationType.HOOK, 1).get(0);
Assert.assertTrue(consumer.hasNext());
Assert.assertEquals(msg1, consumer.next());
Assert.assertTrue(consumer.hasNext());
Assert.assertEquals(msg2, consumer.next());
List<NotificationConsumer<JSONArray>> consumers =
kafka.createConsumers(NotificationInterface.NotificationType.HOOK, 1);
NotificationConsumer<JSONArray> consumer = consumers.get(0);
assertTrue(consumer.hasNext());
assertEquals(new JSONArray(msg1), consumer.next());
assertTrue(consumer.hasNext());
assertEquals(new JSONArray(msg2), consumer.next());
}
@Test
@SuppressWarnings("unchecked")
public void testCreateConsumers() throws Exception {
Configuration configuration = mock(Configuration.class);
Iterator iterator = mock(Iterator.class);
ConsumerConnector consumerConnector = mock(ConsumerConnector.class);
KafkaStream kafkaStream1 = mock(KafkaStream.class);
KafkaStream kafkaStream2 = mock(KafkaStream.class);
String groupId = "groupId9999";
when(configuration.subset(KafkaNotification.PROPERTY_PREFIX)).thenReturn(configuration);
when(configuration.getKeys()).thenReturn(iterator);
when(iterator.hasNext()).thenReturn(true).thenReturn(false);
when(iterator.next()).thenReturn("entities." + KafkaNotification.CONSUMER_GROUP_ID_PROPERTY);
when(configuration.getList("entities." + KafkaNotification.CONSUMER_GROUP_ID_PROPERTY))
.thenReturn(Collections.<Object>singletonList(groupId));
Map<String, List<KafkaStream<String, String>>> streamsMap = new HashMap<>();
List<KafkaStream<String, String>> kafkaStreamList = new LinkedList<>();
kafkaStreamList.add(kafkaStream1);
kafkaStreamList.add(kafkaStream2);
streamsMap.put(KafkaNotification.ATLAS_ENTITIES_TOPIC, kafkaStreamList);
Map<String, Integer> topicCountMap = new HashMap<>();
topicCountMap.put(KafkaNotification.ATLAS_ENTITIES_TOPIC, 2);
when(consumerConnector.createMessageStreams(
eq(topicCountMap), any(StringDecoder.class), any(StringDecoder.class))).thenReturn(streamsMap);
TestKafkaNotification kafkaNotification = new TestKafkaNotification(configuration, consumerConnector);
List<NotificationConsumer<String>> consumers =
kafkaNotification.createConsumers(NotificationInterface.NotificationType.ENTITIES, 2);
assertEquals(2, consumers.size());
// assert that all of the given kafka streams were used to create kafka consumers
List<KafkaStream> streams = kafkaNotification.kafkaStreams;
assertTrue(streams.contains(kafkaStream1));
assertTrue(streams.contains(kafkaStream2));
// assert that the given consumer group id was added to the properties used to create the consumer connector
Properties properties = kafkaNotification.consumerProperties;
assertEquals(groupId, properties.getProperty(ConsumerConfig.GROUP_ID_CONFIG));
}
private String random() {
......@@ -60,4 +127,33 @@ public class KafkaNotificationTest {
public void teardown() throws Exception {
kafka.stop();
}
// Extended kafka notification class for testing.
private static class TestKafkaNotification extends KafkaNotification {
private final ConsumerConnector consumerConnector;
private Properties consumerProperties;
private List<KafkaStream> kafkaStreams = new LinkedList<>();
public TestKafkaNotification(Configuration applicationProperties,
ConsumerConnector consumerConnector) throws AtlasException {
super(applicationProperties);
this.consumerConnector = consumerConnector;
}
@Override
protected ConsumerConnector createConsumerConnector(Properties properties) {
this.consumerProperties = properties;
kafkaStreams.clear();
return consumerConnector;
}
@Override
protected <T> org.apache.atlas.kafka.KafkaConsumer<T> createKafkaConsumer(Class<T> type, KafkaStream stream,
int consumerId) {
kafkaStreams.add(stream);
return super.createKafkaConsumer(type, stream, consumerId);
}
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.notification.entity;
import org.apache.atlas.typesystem.IStruct;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.Struct;
import org.apache.atlas.typesystem.types.TraitType;
import org.apache.atlas.typesystem.types.TypeSystem;
import org.testng.annotations.Test;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
/**
* EntityNotificationImpl tests.
*/
public class EntityNotificationImplTest {
@Test
public void testGetEntity() throws Exception {
Referenceable entity = getEntity("id");
EntityNotificationImpl entityNotification =
new EntityNotificationImpl(entity, EntityNotification.OperationType.ENTITY_CREATE,
Collections.<IStruct>emptyList());
assertEquals(entity, entityNotification.getEntity());
}
@Test
public void testGetOperationType() throws Exception {
Referenceable entity = getEntity("id");
EntityNotificationImpl entityNotification =
new EntityNotificationImpl(entity, EntityNotification.OperationType.ENTITY_CREATE,
Collections.<IStruct>emptyList());
assertEquals(EntityNotification.OperationType.ENTITY_CREATE, entityNotification.getOperationType());
}
@Test
public void testGetAllTraits() throws Exception {
Referenceable entity = getEntity("id");
String traitName = "MyTrait";
List<IStruct> traitInfo = new LinkedList<>();
IStruct trait = new Struct(traitName, Collections.<String, Object>emptyMap());
traitInfo.add(trait);
EntityNotificationImpl entityNotification =
new EntityNotificationImpl(entity, EntityNotification.OperationType.TRAIT_ADD, traitInfo);
assertEquals(traitInfo, entityNotification.getAllTraits());
}
@Test
public void testGetAllTraits_superTraits() throws Exception {
TypeSystem typeSystem = mock(TypeSystem.class);
String traitName = "MyTrait";
IStruct myTrait = new Struct(traitName);
String superTraitName = "MySuperTrait";
TraitType traitDef = mock(TraitType.class);
Set<String> superTypeNames = Collections.singleton(superTraitName);
TraitType superTraitDef = mock(TraitType.class);
Set<String> superSuperTypeNames = Collections.emptySet();
Referenceable entity = getEntity("id", myTrait);
when(typeSystem.getDataType(TraitType.class, traitName)).thenReturn(traitDef);
when(typeSystem.getDataType(TraitType.class, superTraitName)).thenReturn(superTraitDef);
when(traitDef.getAllSuperTypeNames()).thenReturn(superTypeNames);
when(superTraitDef.getAllSuperTypeNames()).thenReturn(superSuperTypeNames);
EntityNotificationImpl entityNotification =
new EntityNotificationImpl(entity, EntityNotification.OperationType.TRAIT_ADD, typeSystem);
List<IStruct> allTraits = entityNotification.getAllTraits();
assertEquals(2, allTraits.size());
for (IStruct trait : allTraits) {
String typeName = trait.getTypeName();
assertTrue(typeName.equals(traitName) || typeName.equals(superTraitName));
}
}
@Test
public void testEquals() throws Exception {
Referenceable entity = getEntity("id");
EntityNotificationImpl entityNotification2 =
new EntityNotificationImpl(entity, EntityNotification.OperationType.ENTITY_CREATE,
Collections.<IStruct>emptyList());
EntityNotificationImpl entityNotification =
new EntityNotificationImpl(entity, EntityNotification.OperationType.ENTITY_CREATE,
Collections.<IStruct>emptyList());
assertTrue(entityNotification.equals(entityNotification2));
assertTrue(entityNotification2.equals(entityNotification));
}
private Referenceable getEntity(String id, IStruct ... traits) {
String typeName = "typeName";
Map<String, Object> values = new HashMap<>();
List<String> traitNames = new LinkedList<>();
Map<String, IStruct> traitMap = new HashMap<>();
for (IStruct trait : traits) {
String traitName = trait.getTypeName();
traitNames.add(traitName);
traitMap.put(traitName, trait);
}
return new Referenceable(id, typeName, values, traitNames, traitMap);
}
}
\ No newline at end of file
......@@ -9,6 +9,7 @@ ATLAS-54 Rename configs in hive hook (shwethags)
ATLAS-3 Mixed Index creation fails with Date types (sumasai via shwethags)
ALL CHANGES:
ATLAS-158 Provide Atlas Entity Change Notification (tbeerbower via shwethags)
ATALS-238 atlas_start.py- the Atlas server won’t restart after improper shutdown(ndjouri via sumasai)
ATLAS-293 UI Requires Internet Access For UI Facelift (darshankumar89 via shwethags)
ATLAS-292 The artifactId 'dashboard' should be 'atlas-dashboard' in the webapp/pom.xml (ltfxyz via shwethags)
......
......@@ -41,6 +41,11 @@
<dependency>
<groupId>org.apache.atlas</groupId>
<artifactId>atlas-common</artifactId>
</dependency>
<dependency>
<groupId>org.apache.atlas</groupId>
<artifactId>atlas-client</artifactId>
</dependency>
......
......@@ -31,6 +31,7 @@ import org.apache.atlas.listener.TypesChangeListener;
import org.apache.atlas.repository.IndexCreationException;
import org.apache.atlas.repository.MetadataRepository;
import org.apache.atlas.repository.typestore.ITypeStore;
import org.apache.atlas.typesystem.IStruct;
import org.apache.atlas.typesystem.ITypedReferenceableInstance;
import org.apache.atlas.typesystem.ITypedStruct;
import org.apache.atlas.typesystem.Referenceable;
......@@ -63,9 +64,11 @@ import javax.inject.Inject;
import javax.inject.Singleton;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* Simple wrapper over TypeSystem and MetadataRepository services with hooks
......@@ -247,7 +250,14 @@ public class DefaultMetadataService implements MetadataService {
final String[] guids = repository.createEntities(typedInstances);
onEntityAddedToRepo(Arrays.asList(typedInstances));
Set<ITypedReferenceableInstance> entitites = new HashSet<>();
for (String guid : guids) {
entitites.add(repository.getEntityDefinition(guid));
}
onEntitiesAddedToRepo(entitites);
return new JSONArray(Arrays.asList(guids)).toString();
}
......@@ -300,8 +310,8 @@ public class DefaultMetadataService implements MetadataService {
/**
* Validate that attribute is unique attribute
* @param entityType
* @param attributeName
* @param entityType the entity type
* @param attributeName the name of the attribute
*/
private void validateUniqueAttribute(String entityType, String attributeName) throws AtlasException {
ClassType type = typeSystem.getDataType(ClassType.class, entityType);
......@@ -332,6 +342,8 @@ public class DefaultMetadataService implements MetadataService {
ParamChecker.notEmpty(value, "property value cannot be null");
repository.updateEntity(guid, property, value);
onEntityUpdated(repository.getEntityDefinition(guid), property, value);
}
private void validateTypeExists(String entityType) throws AtlasException {
......@@ -385,7 +397,7 @@ public class DefaultMetadataService implements MetadataService {
repository.addTrait(guid, traitInstance);
onTraitAddedToEntity(guid, traitName);
onTraitAddedToEntity(repository.getEntityDefinition(guid), traitInstance);
}
private ITypedStruct deserializeTraitInstance(String traitInstanceDefinition)
......@@ -427,7 +439,7 @@ public class DefaultMetadataService implements MetadataService {
repository.deleteTrait(guid, traitNameToBeDeleted);
onTraitDeletedFromEntity(guid, traitNameToBeDeleted);
onTraitDeletedFromEntity(repository.getEntityDefinition(guid), traitNameToBeDeleted);
}
private void onTypesAdded(Map<String, IDataType> typesAdded) throws AtlasException {
......@@ -447,23 +459,29 @@ public class DefaultMetadataService implements MetadataService {
}
}
private void onEntityAddedToRepo(Collection<ITypedReferenceableInstance> typedInstances)
throws AtlasException {
private void onEntitiesAddedToRepo(Collection<ITypedReferenceableInstance> entities) throws AtlasException {
for (EntityChangeListener listener : entityChangeListeners) {
listener.onEntityAdded(typedInstances);
listener.onEntitiesAdded(entities);
}
}
private void onEntityUpdated(ITypedReferenceableInstance entity, String property, String value)
throws AtlasException {
for (EntityChangeListener listener : entityChangeListeners) {
listener.onEntityUpdated(entity);
}
}
private void onTraitAddedToEntity(String typeName, String traitName) throws AtlasException {
private void onTraitAddedToEntity(ITypedReferenceableInstance entity, IStruct trait) throws AtlasException {
for (EntityChangeListener listener : entityChangeListeners) {
listener.onTraitAdded(typeName, traitName);
listener.onTraitAdded(entity, trait);
}
}
private void onTraitDeletedFromEntity(String typeName, String traitName) throws AtlasException {
private void onTraitDeletedFromEntity(ITypedReferenceableInstance entity, String traitName) throws AtlasException {
for (EntityChangeListener listener : entityChangeListeners) {
listener.onTraitDeleted(typeName, traitName);
listener.onTraitDeleted(entity, traitName);
}
}
......
......@@ -19,6 +19,7 @@
package org.apache.atlas.services;
import org.apache.atlas.AtlasException;
import org.apache.atlas.listener.EntityChangeListener;
import org.apache.atlas.typesystem.types.DataTypes;
import org.codehaus.jettison.json.JSONObject;
......@@ -131,4 +132,18 @@ public interface MetadataService {
* @throws AtlasException
*/
void deleteTrait(String guid, String traitNameToBeDeleted) throws AtlasException;
/**
* Register a listener for entity change.
*
* @param listener the listener to register
*/
void registerListener(EntityChangeListener listener);
/**
* Unregister an entity change listener.
*
* @param listener the listener to unregister
*/
void unregisterListener(EntityChangeListener listener);
}
......@@ -20,9 +20,12 @@ package org.apache.atlas.typesystem;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.atlas.AtlasException;
import org.apache.atlas.classification.InterfaceAudience;
import org.apache.atlas.typesystem.persistence.Id;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
......@@ -75,6 +78,27 @@ public class Referenceable extends Struct implements IReferenceableInstance {
traits = ImmutableMap.copyOf(_traits);
}
/**
* Construct a Referenceable from the given ITypedReferenceableInstance.
*
* @param instance the typed referenceable instance to copy
*
* @throws AtlasException if the referenceable can not be created
*/
public Referenceable(ITypedReferenceableInstance instance) throws AtlasException {
this(instance.getId()._getId(), instance.getTypeName(), instance.getValuesMap(), instance.getTraits(),
getTraits(instance));
}
/**
* No-arg constructor for serialization.
*/
@SuppressWarnings("unused")
private Referenceable() {
this("", "", Collections.<String, Object>emptyMap(), Collections.<String>emptyList(),
Collections.<String, IStruct>emptyMap());
}
@Override
public ImmutableList<String> getTraits() {
return traitNames;
......@@ -89,4 +113,13 @@ public class Referenceable extends Struct implements IReferenceableInstance {
public IStruct getTrait(String typeName) {
return traits.get(typeName);
}
private static Map<String, IStruct> getTraits(ITypedReferenceableInstance instance) {
Map<String, IStruct> traits = new HashMap<>();
for (String traitName : instance.getTraits() ) {
traits.put(traitName, instance.getTrait(traitName));
}
return traits;
}
}
......@@ -20,6 +20,7 @@ package org.apache.atlas.typesystem;
import org.apache.atlas.classification.InterfaceAudience;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
......@@ -41,6 +42,15 @@ public class Struct implements IStruct {
}
}
/**
* No-arg constructor for serialization.
*/
@SuppressWarnings("unused")
private Struct() {
this("", Collections.<String, Object>emptyMap());
}
@Override
public String getTypeName() {
return typeName;
......
......@@ -61,6 +61,8 @@ atlas.kafka.data=target/data/kafka
atlas.kafka.zookeeper.session.timeout.ms=400
atlas.kafka.zookeeper.sync.time.ms=20
atlas.kafka.auto.commit.interval.ms=100
atlas.kafka.hook.group.id=atlas
atlas.kafka.entities.group.id=atlas_entities
######### Security Properties #########
......
......@@ -33,9 +33,13 @@ import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasException;
import org.apache.atlas.RepositoryMetadataModule;
import org.apache.atlas.notification.NotificationInterface;
import org.apache.atlas.notification.NotificationModule;
import org.apache.atlas.notification.entity.NotificationEntityChangeListener;
import org.apache.atlas.repository.graph.GraphProvider;
import org.apache.atlas.service.Services;
import org.apache.atlas.services.MetadataService;
import org.apache.atlas.typesystem.types.TypeSystem;
import org.apache.atlas.web.filters.AtlasAuthenticationFilter;
import org.apache.atlas.web.filters.AuditFilter;
import org.apache.commons.configuration.Configuration;
......@@ -114,6 +118,7 @@ public class GuiceServletConfig extends GuiceServletContextListener {
LoginProcessor loginProcessor = new LoginProcessor();
loginProcessor.login();
initMetadataService();
startServices();
}
......@@ -154,4 +159,17 @@ public class GuiceServletConfig extends GuiceServletContextListener {
Services services = injector.getInstance(Services.class);
services.stop();
}
// initialize the metadata service
private void initMetadataService() {
MetadataService metadataService = injector.getInstance(MetadataService.class);
// add a listener for entity changes
NotificationInterface notificationInterface = injector.getInstance(NotificationInterface.class);
NotificationEntityChangeListener listener =
new NotificationEntityChangeListener(notificationInterface, TypeSystem.getInstance());
metadataService.registerListener(listener);
}
}
\ No newline at end of file
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.notification;
import com.google.inject.Inject;
import org.apache.atlas.notification.entity.EntityNotification;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.web.resources.BaseResourceIT;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
import java.util.List;
/**
* Entity Notification Integration Tests.
*/
@Guice(modules = NotificationModule.class)
public class EntityNotificationIT extends BaseResourceIT {
@Inject
private NotificationInterface notificationInterface;
@BeforeClass
public void setUp() throws Exception {
super.setUp();
createTypeDefinitions();
}
@Test
public void testEntityNotification() throws Exception {
List<NotificationConsumer<EntityNotification>> consumers =
notificationInterface.createConsumers(NotificationInterface.NotificationType.ENTITIES, 1);
NotificationConsumer<EntityNotification> consumer = consumers.iterator().next();
final EntityNotificationConsumer notificationConsumer = new EntityNotificationConsumer(consumer);
Thread thread = new Thread(notificationConsumer);
thread.start();
createEntity("Sales", "Sales Database", "John ETL", "hdfs://host:8000/apps/warehouse/sales");
waitFor(10000, new Predicate() {
@Override
public boolean evaluate() throws Exception {
return notificationConsumer.entityNotification != null;
}
});
Assert.assertNotNull(notificationConsumer.entityNotification);
Assert.assertEquals(EntityNotification.OperationType.ENTITY_CREATE, notificationConsumer.entityNotification.getOperationType());
Assert.assertEquals(DATABASE_TYPE, notificationConsumer.entityNotification.getEntity().getTypeName());
Assert.assertEquals("Sales", notificationConsumer.entityNotification.getEntity().get("name"));
}
private void createEntity(String name, String description, String owner, String locationUri, String... traitNames)
throws Exception {
Referenceable referenceable = new Referenceable(DATABASE_TYPE, traitNames);
referenceable.set("name", name);
referenceable.set("description", description);
referenceable.set("owner", owner);
referenceable.set("locationUri", locationUri);
referenceable.set("createTime", System.currentTimeMillis());
createInstance(referenceable);
}
private static class EntityNotificationConsumer implements Runnable {
private final NotificationConsumer<EntityNotification> consumerIterator;
private EntityNotification entityNotification = null;
public EntityNotificationConsumer(NotificationConsumer<EntityNotification> consumerIterator) {
this.consumerIterator = consumerIterator;
}
@Override
public void run() {
while(consumerIterator.hasNext()) {
entityNotification = consumerIterator.next();
}
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment