Commit b627a681 by Shwetha GS

ATLAS-74 Create notification framework (shwethags)

parent 751b4c87
......@@ -256,10 +256,6 @@
<name>atlas.log.dir</name>
<value>${project.build.directory}/logs</value>
</systemProperty>
<systemProperty>
<name>atlas.conf</name>
<value>${project.build.directory}/test-classes</value>
</systemProperty>
</systemProperties>
<stopKey>atlas-stop</stopKey>
<stopPort>41001</stopPort>
......
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
######### Graph Database Configs #########
#Refer http://s3.thinkaurelius.com/docs/titan/0.5.1/titan-config-ref.html
# Graph Storage
atlas.graph.storage.backend=${titan.storage.backend}
#Berkeley storage directory
atlas.graph.storage.directory=target/data/berkley
#hbase
#For standalone mode , specify localhost
#for distributed mode, specify zookeeper quorum here - For more information refer http://s3.thinkaurelius.com/docs/titan/current/hbase.html#_remote_server_mode_2
atlas.graph.storage.hostname=${titan.storage.hostname}
# Graph Search Index Backend
atlas.graph.index.search.backend=${titan.index.backend}
#lucene
#atlas.graph.index.search.directory=target/data/lucene
#elasticsearch
atlas.graph.index.search.directory=./target/data/es
atlas.graph.index.search.elasticsearch.client-only=false
atlas.graph.index.search.elasticsearch.local-mode=true
atlas.graph.index.search.elasticsearch.create.sleep=2000
#solr in cloud mode
atlas.graph.index.search.solr.mode=cloud
atlas.graph.index.search.solr.zookeeper-url=${solr.zk.address}
#solr in http mode
atlas.graph.index.search.solr.http-urls=http://localhost:8983/solr
######### Hive Lineage Configs #########
#atlas.lineage.hive.table.type.name=DataSet
#atlas.lineage.hive.process.type.name=Process
#atlas.lineage.hive.process.inputs.name=inputs
#atlas.lineage.hive.process.outputs.name=outputs
## Schema
#atlas.lineage.hive.table.schema.query.hive_table=hive_table where name='%s'\, columns
######### Security Properties #########
# SSL config
atlas.enableTLS=false
......@@ -17,19 +17,17 @@
package org.apache.atlas;
import org.apache.commons.configuration.AbstractConfiguration;
import org.apache.commons.configuration.CompositeConfiguration;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.ConfigurationUtils;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.net.URL;
import java.util.Arrays;
import java.util.Iterator;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class ApplicationProperties extends PropertiesConfiguration {
private static final Logger LOG = LoggerFactory.getLogger(ApplicationProperties.class);
......@@ -47,7 +45,9 @@ public class ApplicationProperties extends PropertiesConfiguration {
if (INSTANCE == null) {
synchronized (ApplicationProperties.class) {
if (INSTANCE == null) {
INSTANCE = get(APPLICATION_PROPERTIES);
Configuration applicationProperties = get(APPLICATION_PROPERTIES);
Configuration clientProperties = get(CLIENT_PROPERTIES);
INSTANCE = new CompositeConfiguration(Arrays.asList(applicationProperties, clientProperties));
}
}
}
......
......@@ -16,45 +16,22 @@
# limitations under the License.
#
######### Graph Database Configs #########
# Graph Storage
atlas.graph.storage.backend=${titan.storage.backend}
# Graph Search Index Backend
atlas.graph.index.search.backend=${titan.index.backend}
#Berkeley storage directory
atlas.graph.storage.directory=target/data/berkley
#hbase
#For standalone mode , specify localhost
#for distributed mode, specify zookeeper quorum here - For more information refer http://s3.thinkaurelius.com/docs/titan/current/hbase.html#_remote_server_mode_2
atlas.graph.storage.hostname=${titan.storage.hostname}
#ElasticSearch
atlas.graph.index.search.directory=target/data/es
atlas.graph.index.search.elasticsearch.client-only=false
atlas.graph.index.search.elasticsearch.local-mode=true
atlas.graph.index.search.elasticsearch.create.sleep=2000
# Solr cloud mode properties
atlas.graph.index.search.solr.mode=cloud
atlas.graph.index.search.solr.zookeeper-url=${solr.zk.address}
######### Security Properties #########
######### Hive Lineage Configs #########
# This models reflects the base super types for Data and Process
#atlas.lineage.hive.table.type.name=DataSet
#atlas.lineage.hive.process.type.name=Process
#atlas.lineage.hive.process.inputs.name=inputs
#atlas.lineage.hive.process.outputs.name=outputs
# SSL config
## Schema
atlas.lineage.hive.table.schema.query.hive_table=hive_table where name='%s'\, columns
atlas.enableTLS=false
#truststore.file=/path/to/truststore.jks
#cert.stores.credential.provider.path=jceks://file/path/to/credentialstore.jceks
#following only required for 2-way SSL
#keystore.file=/path/to/keystore.jks
######### Security Properties #########
# Authentication config
# SSL config
atlas.enableTLS=false
# enabled: true or false
atlas.http.authentication.enabled=false
# type: simple or kerberos
atlas.http.authentication.type=simple
######### Security Properties #########
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>apache-atlas</artifactId>
<groupId>org.apache.atlas</groupId>
<version>0.6-incubating-SNAPSHOT</version>
</parent>
<artifactId>atlas-notification</artifactId>
<description>Apache Atlas Client</description>
<name>Apache Atlas Notification</name>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.atlas</groupId>
<artifactId>atlas-client</artifactId>
</dependency>
<dependency>
<groupId>org.apache.atlas</groupId>
<artifactId>atlas-typesystem</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>commons-configuration</groupId>
<artifactId>commons-configuration</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
</dependency>
</dependencies>
</project>
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.kafka;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.message.MessageAndMetadata;
import org.apache.atlas.notification.NotificationConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class KafkaConsumer implements NotificationConsumer {
private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumer.class);
private final int consumerId;
private final ConsumerIterator iterator;
public KafkaConsumer(KafkaStream<String, String> stream, int consumerId) {
this.iterator = stream.iterator();
this.consumerId = consumerId;
}
@Override
public boolean hasNext() {
return iterator.hasNext();
}
@Override
public String next() {
MessageAndMetadata message = iterator.next();
LOG.debug("Read message: conumerId: {}, topic - {}, partition - {}, offset - {}, message - {}",
consumerId, message.topic(), message.partition(), message.offset(), message.message());
return (String) message.message();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.kafka;
import com.google.inject.Singleton;
import kafka.consumer.Consumer;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.Time;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasException;
import org.apache.atlas.notification.NotificationConsumer;
import org.apache.atlas.notification.NotificationException;
import org.apache.atlas.notification.NotificationInterface;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationConverter;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.zookeeper.server.NIOServerCnxnFactory;
import org.apache.zookeeper.server.ServerCnxnFactory;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Future;
@Singleton
public class KafkaNotification extends NotificationInterface {
public static final Logger LOG = LoggerFactory.getLogger(KafkaNotification.class);
public static final String PROPERTY_PREFIX = NotificationInterface.PROPERTY_PREFIX + ".kafka";
private static final int ATLAS_ZK_PORT = 9026;
private static final int ATLAS_KAFKA_PORT = 9027;
private static final String ATLAS_KAFKA_DATA = "data";
public static final String ATLAS_HOOK_TOPIC = "ATLAS_HOOK";
public static final String ATLAS_ENTITIES_TOPIC = "ATLAS_ENTITIES";
public static final String ATLAS_TYPES_TOPIC = "ATLAS_TYPES";
private static final String ATLAS_GROUP = "atlas";
private KafkaServer kafkaServer;
private ServerCnxnFactory factory;
private Properties properties;
private KafkaProducer producer = null;
private List<ConsumerConnector> consumerConnectors = new ArrayList<>();
private KafkaConsumer consumer;
private static final Map<NotificationType, String> topicMap = new HashMap<NotificationType, String>() {{
put(NotificationType.HOOK, ATLAS_HOOK_TOPIC);
put(NotificationType.ENTITIES, ATLAS_ENTITIES_TOPIC);
put(NotificationType.TYPES, ATLAS_TYPES_TOPIC);
}};
private synchronized void createProducer() {
if (producer == null) {
producer = new KafkaProducer(properties);
}
}
@Override
public void initialize(Configuration applicationProperties) throws AtlasException {
super.initialize(applicationProperties);
Configuration subsetConfiguration =
ApplicationProperties.getSubsetConfiguration(applicationProperties, PROPERTY_PREFIX);
properties = ConfigurationConverter.getProperties(subsetConfiguration);
//override to store offset in kafka
//todo do we need ability to replay?
//Override default configs
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
//todo take group id as argument to allow multiple consumers??
properties.put(ConsumerConfig.GROUP_ID_CONFIG, ATLAS_GROUP);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY, "roundrobin");
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "smallest");
if (isEmbedded()) {
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + ATLAS_KAFKA_PORT);
properties.setProperty("zookeeper.connect", "localhost:" + ATLAS_ZK_PORT);
}
//todo new APIs not available yet
// consumer = new KafkaConsumer(properties);
// consumer.subscribe(ATLAS_HOOK_TOPIC);
}
@Override
protected void _startService() throws IOException {
startZk();
startKafka();
}
private String startZk() throws IOException {
//todo read zk endpoint from config
this.factory = NIOServerCnxnFactory.createFactory(new InetSocketAddress("0.0.0.0", ATLAS_ZK_PORT), 1024);
File snapshotDir = constructDir("zk/txn");
File logDir = constructDir("zk/snap");
try {
factory.startup(new ZooKeeperServer(snapshotDir, logDir, 500));
} catch (InterruptedException e) {
throw new IOException(e);
}
return factory.getLocalAddress().getAddress().toString();
}
private void startKafka() {
Properties brokerConfig = properties;
brokerConfig.setProperty("broker.id", "1");
//todo read kafka endpoint from config
brokerConfig.setProperty("host.name", "0.0.0.0");
brokerConfig.setProperty("port", String.valueOf(ATLAS_KAFKA_PORT));
brokerConfig.setProperty("log.dirs", constructDir("kafka").getAbsolutePath());
brokerConfig.setProperty("log.flush.interval.messages", String.valueOf(1));
kafkaServer = new KafkaServer(new KafkaConfig(brokerConfig), new SystemTime());
kafkaServer.startup();
LOG.debug("Embedded kafka server started with broker config {}", brokerConfig);
}
private static class SystemTime implements Time {
@Override
public long milliseconds() {
return System.currentTimeMillis();
}
@Override
public long nanoseconds() {
return System.nanoTime();
}
@Override
public void sleep(long arg0) {
try {
Thread.sleep(arg0);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
private File constructDir(String dirPrefix) {
File file = new File(properties.getProperty(ATLAS_KAFKA_DATA), dirPrefix);
if (!file.exists() && !file.mkdirs()) {
throw new RuntimeException("could not create temp directory: " + file.getAbsolutePath());
}
return file;
}
@Override
public void _shutdown() {
if (producer != null) {
producer.close();
}
if (consumer != null) {
consumer.close();
}
for (ConsumerConnector consumerConnector : consumerConnectors) {
consumerConnector.shutdown();
}
if (kafkaServer != null) {
kafkaServer.shutdown();
}
if (factory != null) {
factory.shutdown();
}
}
@Override
public List<NotificationConsumer> createConsumers(NotificationType type, int numConsumers) {
String topic = topicMap.get(type);
ConsumerConnector consumerConnector =
Consumer.createJavaConsumerConnector(new kafka.consumer.ConsumerConfig(properties));
Map<String, Integer> topicCountMap = new HashMap<>();
topicCountMap.put(topic, numConsumers);
StringDecoder decoder = new StringDecoder(null);
Map<String, List<KafkaStream<String, String>>> streamsMap =
consumerConnector.createMessageStreams(topicCountMap, decoder, decoder);
List<KafkaStream<String, String>> kafkaConsumers = streamsMap.get(topic);
List<NotificationConsumer> consumers = new ArrayList<>(numConsumers);
int consumerId = 0;
for (KafkaStream stream : kafkaConsumers) {
consumers.add(new org.apache.atlas.kafka.KafkaConsumer(stream, consumerId++));
}
consumerConnectors.add(consumerConnector);
return consumers;
}
@Override
public void send(NotificationType type, String... messages) throws NotificationException {
if (producer == null) {
createProducer();
}
String topic = topicMap.get(type);
List<Future<RecordMetadata>> futures = new ArrayList<>();
for (String message : messages) {
ProducerRecord record = new ProducerRecord(topic, message);
LOG.debug("Sending message for topic {}: {}", topic, message);
futures.add(producer.send(record));
}
for (Future<RecordMetadata> future : futures) {
try {
RecordMetadata response = future.get();
LOG.debug("Sent message for topic - {}, partition - {}, offset - {}", response.topic(),
response.partition(), response.offset());
} catch (Exception e) {
throw new NotificationException(e);
}
}
}
//New API, not used now
private List<String> receive(long timeout) throws NotificationException {
Map<String, ConsumerRecords> recordsMap = consumer.poll(timeout);
List<String> messages = new ArrayList<>();
if (recordsMap != null) {
for (ConsumerRecords records : recordsMap.values()) {
List<ConsumerRecord> recordList = records.records();
for (ConsumerRecord record : recordList) {
try {
String message = (String) record.value();
LOG.debug("Received message from topic {}: {}", ATLAS_HOOK_TOPIC, message);
messages.add(message);
} catch (Exception e) {
throw new NotificationException(e);
}
}
}
}
return messages;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.notification;
public interface NotificationConsumer {
/**
* If there are more messages
* @return
*/
boolean hasNext();
/**
* Next message - blocking call
* @return
*/
String next();
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.notification;
import org.apache.atlas.AtlasException;
public class NotificationException extends AtlasException {
public NotificationException(Exception e) {
super(e);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.notification;
import com.google.inject.Inject;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasException;
import org.apache.atlas.AtlasServiceException;
import org.apache.commons.configuration.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class NotificationHookConsumer {
private static final Logger LOG = LoggerFactory.getLogger(NotificationHookConsumer.class);
public static final String CONSUMER_THREADS_PROPERTY = "atlas.notification.hook.numthreads";
public static final String ATLAS_ENDPOINT_PROPERTY = "atlas.rest.address";
@Inject
private static NotificationInterface notificationInterface;
private static ExecutorService executors;
private static AtlasClient atlasClient;
public static void start() throws AtlasException {
Configuration applicationProperties = ApplicationProperties.get();
notificationInterface.initialize(applicationProperties);
String atlasEndpoint = applicationProperties.getString(ATLAS_ENDPOINT_PROPERTY, "http://localhost:21000");
atlasClient = new AtlasClient(atlasEndpoint);
int numThreads = applicationProperties.getInt(CONSUMER_THREADS_PROPERTY, 2);
List<NotificationConsumer> consumers =
notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, numThreads);
executors = Executors.newFixedThreadPool(consumers.size());
for (final NotificationConsumer consumer : consumers) {
executors.submit(new HookConsumer(consumer));
}
}
public static void stop() {
notificationInterface.shutdown();
executors.shutdown();
}
static class HookConsumer implements Runnable {
private final NotificationConsumer consumer;
public HookConsumer(NotificationConsumer consumerInterface) {
this.consumer = consumerInterface;
}
@Override
public void run() {
while(consumer.hasNext()) {
String entityJson = consumer.next();
LOG.debug("Processing message {}", entityJson);
try {
atlasClient.createEntity(entityJson);
} catch (AtlasServiceException e) {
//todo handle failures
LOG.warn("Error handling message {}", entityJson);
}
}
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.notification;
import org.apache.atlas.AtlasException;
import org.apache.commons.configuration.Configuration;
import java.io.IOException;
import java.util.List;
public abstract class NotificationInterface {
public static final String PROPERTY_PREFIX = "atlas.notification";
private static final String PROPERTY_EMBEDDED = PROPERTY_PREFIX + ".embedded";
private boolean embedded;
public enum NotificationType {
HOOK, ENTITIES, TYPES
}
/**
* Initialise
* @param applicationProperties
* @throws AtlasException
*/
public void initialize(Configuration applicationProperties) throws AtlasException {
this.embedded = applicationProperties.getBoolean(PROPERTY_EMBEDDED, false);
}
/**
* Start embedded notification service on atlast server
* @throws IOException
*/
public final void startService() throws IOException {
if (embedded) {
_startService();
}
}
/**
* Is the notification service embedded in atlas server
* @return
*/
protected final boolean isEmbedded() {
return embedded;
}
protected abstract void _startService() throws IOException;
/**
* Shutdown - close all the connections
*/
public final void shutdown() {
_shutdown();
}
protected abstract void _shutdown();
public abstract List<NotificationConsumer> createConsumers(NotificationType type, int numConsumers);
public abstract void send(NotificationType type, String... messages) throws NotificationException;
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.notification;
import com.google.inject.AbstractModule;
import org.apache.atlas.kafka.KafkaNotification;
public class NotificationModule extends AbstractModule {
@Override
protected void configure() {
bind(NotificationInterface.class).to(KafkaNotification.class).asEagerSingleton();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.kafka;
import com.google.inject.Inject;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasException;
import org.apache.atlas.notification.NotificationConsumer;
import org.apache.atlas.notification.NotificationInterface;
import org.apache.atlas.notification.NotificationModule;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.RandomStringUtils;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
@Guice(modules = NotificationModule.class)
public class KafkaNotificationTest {
@Inject
private NotificationInterface kafka;
@BeforeClass
public void setUp() throws Exception {
Configuration conf = ApplicationProperties.get();
conf.setProperty(KafkaNotification.PROPERTY_PREFIX + ".data", "target/data/kafka" + random());
kafka.initialize(conf);
kafka.startService();
}
@Test
public void testSendMessage() throws AtlasException {
String msg1 = "message" + random();
String msg2 = "message" + random();
kafka.send(NotificationInterface.NotificationType.HOOK, msg1, msg2);
NotificationConsumer consumer = kafka.createConsumers(NotificationInterface.NotificationType.HOOK, 1).get(0);
Assert.assertTrue(consumer.hasNext());
Assert.assertEquals(msg1, consumer.next());
Assert.assertTrue(consumer.hasNext());
Assert.assertEquals(msg2, consumer.next());
}
private String random() {
return RandomStringUtils.randomAlphanumeric(5);
}
@AfterClass
public void teardown() throws Exception {
kafka.shutdown();
}
}
......@@ -329,6 +329,7 @@
<titan.version>0.5.4</titan.version>
<hadoop.version>2.7.0</hadoop.version>
<hbase.version>0.98.9-hadoop2</hbase.version>
<kafka.version>0.8.2.0</kafka.version>
<!-- scala versions -->
<scala.version>2.10.4</scala.version>
......@@ -420,6 +421,7 @@
</profiles>
<modules>
<module>typesystem</module>
<module>notification</module>
<module>client</module>
<module>repository</module>
<module>webapp</module>
......@@ -933,6 +935,12 @@
<dependency>
<groupId>org.apache.atlas</groupId>
<artifactId>atlas-notification</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.atlas</groupId>
<artifactId>atlas-client</artifactId>
<version>${project.version}</version>
</dependency>
......@@ -1114,6 +1122,25 @@
<artifactId>commons-lang3</artifactId>
<version>3.4</version>
</dependency>
<!-- kafka -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_${scala.binary.version}</artifactId>
<version>${kafka.version}</version>
<exclusions>
<exclusion>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</dependencyManagement>
......
......@@ -8,6 +8,7 @@ ATLAS-54 Rename configs in hive hook (shwethags)
ATLAS-3 Mixed Index creation fails with Date types (suma.shivaprasad via shwethags)
ALL CHANGES:
ATLAS-74 Create notification framework (shwethags)
ATLAS-93 import-hive.sh reports FileNotFoundException (shwethags)
ATLAS-92 import-hive.sh failed to find HiveMetaStoreBridge (airbots via shwethags)
ATLAS-16 jersey jaxb exception (shwethags)
......
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
######### Graph Database Configs #########
#Refer http://s3.thinkaurelius.com/docs/titan/0.5.1/titan-config-ref.html
# Graph Storage
atlas.graph.storage.backend=${titan.storage.backend}
#Berkeley storage directory
atlas.graph.storage.directory=target/data/berkley
#hbase
#For standalone mode , specify localhost
#for distributed mode, specify zookeeper quorum here - For more information refer http://s3.thinkaurelius.com/docs/titan/current/hbase.html#_remote_server_mode_2
atlas.graph.storage.hostname=${titan.storage.hostname}
# Graph Search Index Backend
atlas.graph.index.search.backend=${titan.index.backend}
#lucene
#atlas.graph.index.search.directory=target/data/lucene
#elasticsearch
atlas.graph.index.search.directory=./target/data/es
atlas.graph.index.search.elasticsearch.client-only=false
atlas.graph.index.search.elasticsearch.local-mode=true
atlas.graph.index.search.elasticsearch.create.sleep=2000
#solr in cloud mode
atlas.graph.index.search.solr.mode=cloud
atlas.graph.index.search.solr.zookeeper-url=${solr.zk.address}
#solr in http mode
atlas.graph.index.search.solr.http-urls=http://localhost:8983/solr
######### Hive Lineage Configs #########
#atlas.lineage.hive.table.type.name=DataSet
#atlas.lineage.hive.process.type.name=Process
#atlas.lineage.hive.process.inputs.name=inputs
#atlas.lineage.hive.process.outputs.name=outputs
## Schema
atlas.lineage.hive.table.schema.query.hive_table=hive_table where name='%s'\, columns
######### Security Properties #########
# SSL config
atlas.enableTLS=false
......@@ -45,6 +45,10 @@ atlas.graph.index.search.elasticsearch.client-only=false
atlas.graph.index.search.elasticsearch.local-mode=true
atlas.graph.index.search.elasticsearch.create.sleep=2000
######### Notification Configs #########
atlas.notification.embedded=true
atlas.notification.kafka.data=${sys:atlas.home}/data/kafka
######### Hive Lineage Configs #########
# This models reflects the base super types for Data and Process
#atlas.lineage.hive.table.type.name=DataSet
......
......@@ -21,10 +21,12 @@
# SSL config
atlas.enableTLS=false
truststore.file=/path/to/truststore.jks
cert.stores.credential.provider.path=jceks://file/path/to/credentialstore.jceks
# following only required for 2-way SSL
keystore.file=/path/to/keystore.jks
#truststore.file=/path/to/truststore.jks
#cert.stores.credential.provider.path=jceks://file/path/to/credentialstore.jceks
#following only required for 2-way SSL
#keystore.file=/path/to/keystore.jks
# Authentication config
......
......@@ -18,8 +18,47 @@
######### Graph Database Configs #########
# Graph Storage
atlas.graph.storage.backend=inmemory
atlas.graph.storage.backend=${titan.storage.backend}
# Graph Search Index
atlas.graph.index.search.backend=lucene
atlas.graph.index.search.directory=target/data/lucene
# Graph Search Index Backend
atlas.graph.index.search.backend=${titan.index.backend}
#Berkeley storage directory
atlas.graph.storage.directory=target/data/berkley
#hbase
#For standalone mode , specify localhost
#for distributed mode, specify zookeeper quorum here - For more information refer http://s3.thinkaurelius.com/docs/titan/current/hbase.html#_remote_server_mode_2
atlas.graph.storage.hostname=${titan.storage.hostname}
#ElasticSearch
atlas.graph.index.search.directory=target/data/es
atlas.graph.index.search.elasticsearch.client-only=false
atlas.graph.index.search.elasticsearch.local-mode=true
atlas.graph.index.search.elasticsearch.create.sleep=2000
# Solr cloud mode properties
atlas.graph.index.search.solr.mode=cloud
atlas.graph.index.search.solr.zookeeper-url=${solr.zk.address}
######### Hive Lineage Configs #########
# This models reflects the base super types for Data and Process
#atlas.lineage.hive.table.type.name=DataSet
#atlas.lineage.hive.process.type.name=Process
#atlas.lineage.hive.process.inputs.name=inputs
#atlas.lineage.hive.process.outputs.name=outputs
## Schema
atlas.lineage.hive.table.schema.query.hive_table=hive_table where name='%s'\, columns
######### Notification Configs #########
atlas.notification.embedded=true
atlas.notification.implementation=org.apache.atlas.kafka.KafkaNotification
atlas.notification.kafka.data=target/data/kafka
######### Security Properties #########
# SSL config
atlas.enableTLS=false
######### Security Properties #########
......@@ -27,15 +27,6 @@
</layout>
</appender>
<appender name="FILE" class="org.apache.log4j.DailyRollingFileAppender">
<param name="File" value="${user.dir}/target/logs/application.log"/>
<param name="Append" value="true"/>
<param name="Threshold" value="debug"/>
<layout class="org.apache.log4j.PatternLayout">
<param name="ConversionPattern" value="%d %-5p - [%t:%x] ~ %m (%c{1}:%L)%n"/>
</layout>
</appender>
<appender name="AUDIT" class="org.apache.log4j.DailyRollingFileAppender">
<param name="File" value="${user.dir}/target/logs/audit.log"/>
<param name="Append" value="true"/>
......@@ -55,23 +46,8 @@
<appender-ref ref="console"/>
</logger>
<logger name="com.thinkaurelius.titan" additivity="false">
<level value="warn"/>
<appender-ref ref="console"/>
</logger>
<logger name="org.elasticsearch" additivity="false">
<level value="warn"/>
<appender-ref ref="console"/>
</logger>
<logger name="org.apache.lucene" additivity="false">
<level value="warn"/>
<appender-ref ref="console"/>
</logger>
<root>
<priority value="info"/>
<priority value="warn"/>
<appender-ref ref="console"/>
</root>
......
......@@ -41,10 +41,27 @@ public final class Main {
private static final String APP_PORT = "port";
private static final String ATLAS_HOME = "atlas.home";
private static final String ATLAS_LOG_DIR = "atlas.log.dir";
public static final String ATLAS_SERVER_HTTPS_PORT =
"atlas.server.https.port";
public static final String ATLAS_SERVER_HTTP_PORT =
"atlas.server.http.port";
public static final String ATLAS_SERVER_HTTPS_PORT = "atlas.server.https.port";
public static final String ATLAS_SERVER_HTTP_PORT = "atlas.server.http.port";
private static EmbeddedServer server;
static {
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
try {
shutdown();
} catch (Exception e) {
LOG.debug("Failed to shutdown", e);
}
}
});
}
private static void shutdown() {
server.stop();
}
/**
* Prevent users from constructing this.
......@@ -84,7 +101,7 @@ public final class Main {
configuration.setProperty("atlas.enableTLS", String.valueOf(enableTLS));
showStartupInfo(buildConfiguration, enableTLS, appPort);
EmbeddedServer server = EmbeddedServer.newServer(appPort, appPath, enableTLS);
server = EmbeddedServer.newServer(appPort, appPath, enableTLS);
server.start();
}
......
......@@ -18,14 +18,16 @@
package org.apache.atlas.web.service;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.atlas.ApplicationProperties;
import org.apache.commons.configuration.Configuration;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.webapp.WebAppContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
......@@ -33,6 +35,8 @@ import java.io.IOException;
* This class embeds a Jetty server and a connector.
*/
public class EmbeddedServer {
public static final Logger LOG = LoggerFactory.getLogger(EmbeddedServer.class);
private static final int DEFAULT_BUFFER_SIZE = 16192;
protected final Server server = new Server();
......@@ -71,9 +75,9 @@ public class EmbeddedServer {
protected Integer getBufferSize() {
try {
PropertiesConfiguration configuration = new PropertiesConfiguration("application.properties");
Configuration configuration = ApplicationProperties.get();
return configuration.getInt("atlas.jetty.request.buffer.size", DEFAULT_BUFFER_SIZE);
} catch (ConfigurationException e) {
} catch (Exception e) {
// do nothing
}
......@@ -85,7 +89,11 @@ public class EmbeddedServer {
server.join();
}
public void stop() throws Exception {
public void stop() {
try {
server.stop();
} catch (Exception e) {
LOG.warn("Error during shutdown", e);
}
}
}
......@@ -57,7 +57,7 @@
</logger>
<root>
<priority value="info"/>
<priority value="warn"/>
<appender-ref ref="FILE"/>
</root>
......
......@@ -46,4 +46,9 @@ public class TestUtils {
public static String getTempDirectory() {
return System.getProperty("projectBaseDir") + "/webapp/target/" + random();
}
public static String getWarPath() {
return System.getProperty("projectBaseDir") + String.format("/webapp/target/atlas-webapp-%s",
System.getProperty("project.version"));
}
}
......@@ -16,9 +16,9 @@
*/
package org.apache.atlas.web.security;
import org.apache.atlas.web.TestUtils;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.hadoop.minikdc.MiniKdc;
import org.apache.hadoop.security.ssl.SSLFactory;
import org.apache.hadoop.security.ssl.SSLHostnameVerifier;
......@@ -35,10 +35,7 @@ import java.nio.file.Files;
import java.util.Locale;
import java.util.Properties;
import static org.apache.atlas.security.SecurityProperties.CERT_STORES_CREDENTIAL_PROVIDER_PATH;
import static org.apache.atlas.security.SecurityProperties.KEYSTORE_FILE_KEY;
import static org.apache.atlas.security.SecurityProperties.TLS_ENABLED;
import static org.apache.atlas.security.SecurityProperties.TRUSTSTORE_FILE_KEY;
import static org.apache.atlas.security.SecurityProperties.*;
/**
*
......@@ -110,8 +107,7 @@ public class BaseSecurityTest {
}
protected String getWarPath() {
return System.getProperty("projectBaseDir") + String.format("/webapp/target/atlas-webapp-%s",
System.getProperty("project.version"));
return TestUtils.getWarPath();
}
protected PropertiesConfiguration getSSLConfiguration(String providerUrl) {
......
......@@ -17,6 +17,7 @@
package org.apache.atlas.web.service;
import org.apache.atlas.web.TestUtils;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.testng.Assert;
import org.testng.annotations.Test;
......@@ -37,9 +38,7 @@ public class SecureEmbeddedServerIT extends SecureEmbeddedServerITBase {
SecureEmbeddedServer secureEmbeddedServer = null;
try {
String appPath = System.getProperty("user.dir") + getWarPath();
secureEmbeddedServer = new SecureEmbeddedServer(21443, appPath) {
secureEmbeddedServer = new SecureEmbeddedServer(21443, TestUtils.getWarPath()) {
@Override
protected PropertiesConfiguration getConfiguration() {
return configuration;
......
......@@ -19,6 +19,7 @@ package org.apache.atlas.web.service;
import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.WebResource;
import com.sun.jersey.api.client.config.DefaultClientConfig;
import org.apache.atlas.web.TestUtils;
import org.apache.atlas.web.resources.AdminJerseyResourceIT;
import org.apache.atlas.web.resources.BaseResourceIT;
import org.apache.atlas.web.resources.EntityJerseyResourceIT;
......@@ -31,7 +32,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.alias.CredentialProvider;
import org.apache.hadoop.security.alias.CredentialProviderFactory;
import org.apache.hadoop.security.alias.JavaKeyStoreProvider;
import org.eclipse.jetty.webapp.WebAppContext;
import org.testng.Assert;
import org.testng.TestListenerAdapter;
import org.testng.TestNG;
......@@ -45,11 +45,7 @@ import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import static org.apache.atlas.security.SecurityProperties.CERT_STORES_CREDENTIAL_PROVIDER_PATH;
import static org.apache.atlas.security.SecurityProperties.DEFAULT_KEYSTORE_FILE_LOCATION;
import static org.apache.atlas.security.SecurityProperties.KEYSTORE_PASSWORD_KEY;
import static org.apache.atlas.security.SecurityProperties.SERVER_CERT_PASSWORD_KEY;
import static org.apache.atlas.security.SecurityProperties.TRUSTSTORE_PASSWORD_KEY;
import static org.apache.atlas.security.SecurityProperties.*;
/**
* Secure Test class for jersey resources.
......@@ -106,18 +102,13 @@ public class SecureEmbeddedServerITBase {
public void testNoConfiguredCredentialProvider() throws Exception {
try {
secureEmbeddedServer = new SecureEmbeddedServer(21443, "webapp/target/apache-atlas");
WebAppContext webapp = new WebAppContext();
webapp.setContextPath("/");
webapp.setWar(System.getProperty("user.dir") + getWarPath());
secureEmbeddedServer.server.setHandler(webapp);
secureEmbeddedServer = new SecureEmbeddedServer(21443, TestUtils.getWarPath());
secureEmbeddedServer.server.start();
Assert.fail("Should have thrown an exception");
} catch (IOException e) {
Assert.assertEquals("No credential provider path configured for storage of certificate store passwords",
e.getMessage());
Assert.assertEquals(e.getMessage(),
"No credential provider path configured for storage of certificate store passwords");
} finally {
secureEmbeddedServer.server.stop();
}
......@@ -130,7 +121,7 @@ public class SecureEmbeddedServerITBase {
configuration.setProperty(CERT_STORES_CREDENTIAL_PROVIDER_PATH, providerUrl);
try {
secureEmbeddedServer = new SecureEmbeddedServer(21443, "webapp/target/apache-atlas") {
secureEmbeddedServer = new SecureEmbeddedServer(21443, TestUtils.getWarPath()) {
@Override
protected PropertiesConfiguration getConfiguration() {
return configuration;
......@@ -157,17 +148,12 @@ public class SecureEmbeddedServerITBase {
setupCredentials();
try {
secureEmbeddedServer = new SecureEmbeddedServer(21443, "webapp/target/apache-atlas") {
secureEmbeddedServer = new SecureEmbeddedServer(21443, TestUtils.getWarPath()) {
@Override
protected PropertiesConfiguration getConfiguration() {
return configuration;
}
};
WebAppContext webapp = new WebAppContext();
webapp.setContextPath("/");
webapp.setWar(System.getProperty("user.dir") + getWarPath());
secureEmbeddedServer.server.setHandler(webapp);
secureEmbeddedServer.server.start();
TestListenerAdapter tla = new TestListenerAdapter();
......@@ -184,11 +170,6 @@ public class SecureEmbeddedServerITBase {
}
protected String getWarPath() {
return String
.format("/target/atlas-webapp-%s", System.getProperty("project.version"));
}
protected void setupCredentials() throws Exception {
Configuration conf = new Configuration(false);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment