Commit 3ee4f253 by Madhan Neethiraj

ATLAS-2289: separate embedded kafka/zookeeper start/stop from KafkaNotification

parent 3205ca4a
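This commit separates the embedded Kafka/ZooKeeper lifecycle from KafkaNotification: a new EmbeddedKafkaServer service owns the embedded ZooKeeper and Kafka broker, while KafkaNotification remains only the Kafka client used to send and consume notifications. A minimal sketch of the resulting wiring, assuming only what the test changes below show (the class name EmbeddedKafkaWiringSketch is hypothetical):

import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.kafka.EmbeddedKafkaServer;
import org.apache.atlas.kafka.KafkaNotification;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.RandomStringUtils;

public class EmbeddedKafkaWiringSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = ApplicationProperties.get();

        // isolate the broker's data directory, as the tests in this commit do
        conf.setProperty("atlas.kafka.data", "target/" + RandomStringUtils.randomAlphanumeric(5));

        EmbeddedKafkaServer kafkaServer  = new EmbeddedKafkaServer(conf); // embedded ZooKeeper + broker
        KafkaNotification   notification = new KafkaNotification(conf);   // notification client only

        kafkaServer.start();
        notification.start();
        try {
            // ... send and consume notifications ...
        } finally {
            notification.close();
            notification.stop();
            kafkaServer.stop();
        }
    }
}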
......@@ -51,6 +51,7 @@ public class Services {
try {
for (Service service : services) {
LOG.info("Starting service {}", service.getClass().getName());
service.start();
}
} catch (Exception e) {
......@@ -61,12 +62,17 @@ public class Services {
@PreDestroy
public void stop() {
for (Service service : services) {
LOG.info("Stopping service {}", service.getClass().getName());
try {
service.stop();
} catch (Throwable e) {
LOG.warn("Error stopping service {}", service.getClass().getName(), e);
if (configuration.getBoolean("atlas.services.enabled", true)) {
for (int idx = services.size() - 1; idx >= 0; idx--) {
Service service = services.get(idx);
LOG.info("Stopping service {}", service.getClass().getName());
try {
service.stop();
} catch (Throwable e) {
LOG.warn("Error stopping service {}", service.getClass().getName(), e);
}
}
}
}
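With this change, Services.stop() walks the registered services in reverse registration order, so a dependent service (for example the notification client) is stopped before the service it relies on (the embedded broker), and the stop loop is guarded by the atlas.services.enabled flag. The contract these classes implement is small; a hedged sketch of the Service interface as implied by its usage in this diff (the actual declaration may differ):

package org.apache.atlas.service;

import org.apache.atlas.AtlasException;

// Sketch of the Service contract as implied by EmbeddedKafkaServer below:
// start() is invoked by Services.start() in registration order,
// stop() by Services.stop() in reverse order.
public interface Service {
    void start() throws AtlasException;
    void stop();
}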
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.kafka;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.Time;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasException;
import org.apache.atlas.service.Service;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationConverter;
import org.apache.kafka.clients.producer.*;
import org.apache.zookeeper.server.NIOServerCnxnFactory;
import org.apache.zookeeper.server.ServerCnxnFactory;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import scala.Option;
import javax.inject.Inject;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.*;
@Component
@Order(2)
public class EmbeddedKafkaServer implements Service {
public static final Logger LOG = LoggerFactory.getLogger(EmbeddedKafkaServer.class);
public static final String PROPERTY_PREFIX = "atlas.kafka";
private static final String ATLAS_KAFKA_DATA = "data";
public static final String PROPERTY_EMBEDDED = "atlas.notification.embedded";
private final boolean isEmbedded;
private final Properties properties;
private KafkaServer kafkaServer;
private ServerCnxnFactory factory;
@Inject
public EmbeddedKafkaServer(Configuration applicationProperties) throws AtlasException {
Configuration kafkaConf = ApplicationProperties.getSubsetConfiguration(applicationProperties, PROPERTY_PREFIX);
this.isEmbedded = applicationProperties.getBoolean(PROPERTY_EMBEDDED, false);
this.properties = ConfigurationConverter.getProperties(kafkaConf);
}
@Override
public void start() throws AtlasException {
LOG.info("==> EmbeddedKafkaServer.start(isEmbedded={})", isEmbedded);
if (isEmbedded) {
try {
startZk();
startKafka();
} catch (Exception e) {
throw new AtlasException("Failed to start embedded kafka", e);
}
} else {
LOG.info("==> EmbeddedKafkaServer.start(): not embedded..nothing todo");
}
LOG.info("<== EmbeddedKafkaServer.start(isEmbedded={})", isEmbedded);
}
@Override
public void stop() {
LOG.info("==> EmbeddedKafkaServer.stop(isEmbedded={})", isEmbedded);
if (kafkaServer != null) {
kafkaServer.shutdown();
}
if (factory != null) {
factory.shutdown();
}
LOG.info("<== EmbeddedKafka.stop(isEmbedded={})", isEmbedded);
}
private String startZk() throws IOException, InterruptedException, URISyntaxException {
String zkValue = properties.getProperty("zookeeper.connect");
LOG.info("Starting zookeeper at {}", zkValue);
URL zkAddress = getURL(zkValue);
File snapshotDir = constructDir("zk/txn");
File logDir = constructDir("zk/snap");
factory = NIOServerCnxnFactory.createFactory(new InetSocketAddress(zkAddress.getHost(), zkAddress.getPort()), 1024);
factory.startup(new ZooKeeperServer(snapshotDir, logDir, 500));
String ret = factory.getLocalAddress().getAddress().toString();
LOG.info("Embedded zookeeper for Kafka started at {}", ret);
return ret;
}
private void startKafka() throws IOException, URISyntaxException {
String kafkaValue = properties.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG);
LOG.info("Starting kafka at {}", kafkaValue);
URL kafkaAddress = getURL(kafkaValue);
Properties brokerConfig = properties;
brokerConfig.setProperty("broker.id", "1");
brokerConfig.setProperty("host.name", kafkaAddress.getHost());
brokerConfig.setProperty("port", String.valueOf(kafkaAddress.getPort()));
brokerConfig.setProperty("log.dirs", constructDir("kafka").getAbsolutePath());
brokerConfig.setProperty("log.flush.interval.messages", String.valueOf(1));
kafkaServer = new KafkaServer(KafkaConfig.fromProps(brokerConfig), new SystemTime(), Option.apply(this.getClass().getName()));
kafkaServer.startup();
LOG.info("Embedded kafka server started with broker config {}", brokerConfig);
}
private File constructDir(String dirPrefix) {
File file = new File(properties.getProperty(ATLAS_KAFKA_DATA), dirPrefix);
if (!file.exists() && !file.mkdirs()) {
throw new RuntimeException("could not create temp directory: " + file.getAbsolutePath());
}
return file;
}
private URL getURL(String url) throws MalformedURLException {
try {
return new URL(url);
} catch (MalformedURLException e) {
return new URL("http://" + url);
}
}
// ----- inner class : SystemTime ----------------------------------------
private static class SystemTime implements Time {
@Override
public long milliseconds() {
return System.currentTimeMillis();
}
@Override
public long nanoseconds() {
return System.nanoTime();
}
@Override
public void sleep(long arg0) {
try {
Thread.sleep(arg0);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}
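EmbeddedKafkaServer only starts processes when atlas.notification.embedded is true; it reads its settings from the atlas.kafka prefix, taking the ZooKeeper address from zookeeper.connect, the broker address from bootstrap.servers, and the ZooKeeper/Kafka data location from data. A hedged example of configuring and starting it directly (EmbeddedKafkaConfigSketch and the host/port values are illustrative, not from the commit):

import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.kafka.EmbeddedKafkaServer;
import org.apache.commons.configuration.Configuration;

public class EmbeddedKafkaConfigSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = ApplicationProperties.get();
        conf.setProperty("atlas.notification.embedded", "true");             // PROPERTY_EMBEDDED
        conf.setProperty("atlas.kafka.zookeeper.connect", "localhost:19026");
        conf.setProperty("atlas.kafka.bootstrap.servers", "localhost:19027");
        conf.setProperty("atlas.kafka.data", "target/kafka-data");           // ZooKeeper/Kafka dirs live here

        EmbeddedKafkaServer server = new EmbeddedKafkaServer(conf);
        server.start();   // no-op (apart from logging) when atlas.notification.embedded is false
        // ... run whatever needs a local broker ...
        server.stop();
    }
}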
......@@ -18,26 +18,18 @@
package org.apache.atlas.notification;
import com.google.common.annotations.VisibleForTesting;
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
import com.google.gson.JsonSerializationContext;
import com.google.gson.JsonSerializer;
import org.apache.atlas.AtlasException;
import org.apache.atlas.ha.HAConfiguration;
import org.apache.atlas.model.notification.AtlasNotificationBaseMessage;
import org.apache.atlas.model.notification.AtlasNotificationMessage;
import org.apache.atlas.model.notification.AtlasNotificationStringMessage;
import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.atlas.model.notification.AtlasNotificationBaseMessage.CompressionKind;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.model.notification.MessageVersion;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.StringUtils;
import org.codehaus.jettison.json.JSONArray;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.Type;
import java.net.Inet4Address;
import java.net.UnknownHostException;
import java.util.ArrayList;
......@@ -63,8 +55,6 @@ public abstract class AbstractNotification implements NotificationInterface {
*/
public static final MessageVersion CURRENT_MESSAGE_VERSION = new MessageVersion("1.0.0");
public static final String PROPERTY_EMBEDDED = PROPERTY_PREFIX + ".embedded";
public static final int MAX_BYTES_PER_CHAR = 4; // each char can encode up to 4 bytes in UTF-8
/**
......@@ -77,20 +67,13 @@ public abstract class AbstractNotification implements NotificationInterface {
*/
private static String currentUser = "";
private final boolean embedded;
private final boolean isHAEnabled;
// ----- Constructors ----------------------------------------------------
public AbstractNotification(Configuration applicationProperties) throws AtlasException {
this.embedded = applicationProperties.getBoolean(PROPERTY_EMBEDDED, false);
this.isHAEnabled = HAConfiguration.isHAEnabled(applicationProperties);
}
@VisibleForTesting
protected AbstractNotification() {
embedded = false;
isHAEnabled = false;
}
// ----- NotificationInterface -------------------------------------------
......@@ -117,25 +100,6 @@ public abstract class AbstractNotification implements NotificationInterface {
}
// ----- AbstractNotification --------------------------------------------
/**
* Determine whether or not the notification service is embedded in the Atlas server.
*
* @return true if the notification service is embedded in the Atlas server.
*/
protected final boolean isEmbedded() {
return embedded;
}
/**
* Determine whether or not the high availability feature is enabled.
*
* @return true if the high availability feature is enabled.
*/
protected final boolean isHAEnabled() {
return isHAEnabled;
}
/**
* Send the given messages.
*
......@@ -250,30 +214,6 @@ public abstract class AbstractNotification implements NotificationInterface {
}
}
// ----- serializers -----------------------------------------------------
/**
* Serializer for Referenceable.
*/
public static final class ReferenceableSerializer implements JsonSerializer<Referenceable> {
@Override
public JsonElement serialize(Referenceable src, Type typeOfSrc, JsonSerializationContext context) {
String instanceJson = AtlasType.toV1Json(src);
return new JsonParser().parse(instanceJson).getAsJsonObject();
}
}
/**
* Serializer for JSONArray.
*/
public static final class JSONArraySerializer implements JsonSerializer<JSONArray> {
@Override
public JsonElement serialize(JSONArray src, Type typeOfSrc, JsonSerializationContext context) {
return new JsonParser().parse(src.toString()).getAsJsonArray();
}
}
private static String getNextMessageId() {
String nextMsgIdPrefix = msgIdPrefix;
int nextMsgIdSuffix = msgIdSuffix.getAndIncrement();
......
......@@ -35,22 +35,17 @@ import java.util.List;
import static org.testng.Assert.assertEquals;
public class KafkaNotificationTest {
private EmbeddedKafkaServer kafkaServer;
private KafkaNotification kafkaNotification;
@BeforeClass
public void setup() throws Exception {
Configuration properties = ApplicationProperties.get();
properties.setProperty("atlas.kafka.data", "target/" + RandomStringUtils.randomAlphanumeric(5));
kafkaNotification = new KafkaNotification(properties);
kafkaNotification.start();
initNotificationService();
}
@AfterClass
public void shutdown() throws Exception {
kafkaNotification.close();
kafkaNotification.stop();
cleanUpNotificationService();
}
@Test
......@@ -81,4 +76,29 @@ public class KafkaNotificationTest {
consumer.close();
}
void initNotificationService() throws Exception {
Configuration applicationProperties = ApplicationProperties.get();
applicationProperties.setProperty("atlas.kafka.data", "target/" + RandomStringUtils.randomAlphanumeric(5));
kafkaServer = new EmbeddedKafkaServer(applicationProperties);
kafkaNotification = new KafkaNotification(applicationProperties);
kafkaServer.start();
kafkaNotification.start();
Thread.sleep(2000);
}
void cleanUpNotificationService() throws Exception {
if (kafkaNotification != null) {
kafkaNotification.close();
kafkaNotification.stop();
}
if (kafkaServer != null) {
kafkaServer.stop();
}
}
}
......@@ -30,6 +30,7 @@ import org.apache.atlas.v1.model.typedef.*;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.v1.typesystem.types.utils.TypesUtil;
import org.apache.atlas.web.integration.BaseResourceIT;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import java.util.*;
......@@ -44,7 +45,6 @@ import static org.testng.Assert.assertTrue;
public class EntityNotificationIT extends BaseResourceIT {
private final String DATABASE_NAME = "db" + randomString();
private final String TABLE_NAME = "table" + randomString();
private final NotificationInterface notificationInterface = NotificationProvider.get();
private Id tableId;
private Id dbId;
private String traitName;
......@@ -54,6 +54,8 @@ public class EntityNotificationIT extends BaseResourceIT {
public void setUp() throws Exception {
super.setUp();
initNotificationService();
createTypeDefinitionsV1();
Referenceable HiveDBInstance = createHiveDBInstanceBuiltIn(DATABASE_NAME);
......@@ -63,6 +65,11 @@ public class EntityNotificationIT extends BaseResourceIT {
notificationConsumer = notificationInterface.createConsumers(NotificationType.ENTITIES, 1).get(0);
}
@AfterClass
public void teardown() throws Exception {
cleanUpNotificationService();
}
public void testCreateEntity() throws Exception {
Referenceable tableInstance = createHiveTableInstanceBuiltIn(DATABASE_NAME, TABLE_NAME, dbId);
......
......@@ -19,7 +19,6 @@
package org.apache.atlas.notification;
import org.apache.atlas.EntityAuditEvent;
import org.apache.atlas.kafka.NotificationProvider;
import org.apache.atlas.model.notification.HookNotification;
import org.apache.atlas.v1.model.instance.Id;
import org.apache.atlas.v1.model.instance.Referenceable;
......@@ -46,18 +45,18 @@ public class NotificationHookConsumerIT extends BaseResourceIT {
public static final String QUALIFIED_NAME = "qualifiedName";
public static final String CLUSTER_NAME = "clusterName";
private final NotificationInterface notificationInterface = NotificationProvider.get();
@BeforeClass
public void setUp() throws Exception {
super.setUp();
initNotificationService();
createTypeDefinitionsV1();
}
@AfterClass
public void teardown() throws Exception {
notificationInterface.close();
cleanUpNotificationService();
}
private void sendHookMessage(HookNotification message) throws NotificationException, InterruptedException {
......
......@@ -22,9 +22,7 @@ import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasException;
import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.kafka.AtlasKafkaMessage;
import org.apache.atlas.kafka.KafkaNotification;
import org.apache.atlas.kafka.NotificationProvider;
import org.apache.atlas.kafka.*;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
import org.apache.atlas.model.notification.HookNotification;
import org.apache.atlas.v1.model.instance.Referenceable;
......@@ -45,7 +43,6 @@ import org.testng.annotations.Test;
import java.util.List;
import org.apache.atlas.kafka.AtlasKafkaConsumer;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyString;
......@@ -62,7 +59,9 @@ public class NotificationHookConsumerKafkaTest {
public static final String DESCRIPTION = "description";
public static final String QUALIFIED_NAME = "qualifiedName";
private final NotificationInterface notificationInterface = NotificationProvider.get();
private NotificationInterface notificationInterface = null;
private EmbeddedKafkaServer kafkaServer = null;
private KafkaNotification kafkaNotification = null;
@Mock
......@@ -77,8 +76,6 @@ public class NotificationHookConsumerKafkaTest {
@Mock
private AtlasTypeRegistry typeRegistry;
private KafkaNotification kafkaNotification;
@BeforeTest
public void setup() throws AtlasException, InterruptedException, AtlasBaseException {
MockitoAnnotations.initMocks(this);
......@@ -90,64 +87,53 @@ public class NotificationHookConsumerKafkaTest {
when(instanceConverter.toAtlasEntities(anyList())).thenReturn(mockEntity);
kafkaNotification = startKafkaServer();
initNotificationService();
}
@AfterTest
public void shutdown() {
kafkaNotification.close();
kafkaNotification.stop();
cleanUpNotificationService();
}
@Test
public void testConsumerConsumesNewMessageWithAutoCommitDisabled() throws AtlasException, InterruptedException, AtlasBaseException {
try {
produceMessage(new HookNotificationV1.EntityCreateRequest("test_user1", createEntity()));
produceMessage(new HookNotificationV1.EntityCreateRequest("test_user1", createEntity()));
NotificationConsumer<HookNotification> consumer = createNewConsumer(kafkaNotification, false);
NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer);
NotificationConsumer<HookNotification> consumer = createNewConsumer(kafkaNotification, false);
NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer);
consumeOneMessage(consumer, hookConsumer);
consumeOneMessage(consumer, hookConsumer);
verify(atlasEntityStore).createOrUpdate(any(EntityStream.class), anyBoolean());
verify(atlasEntityStore).createOrUpdate(any(EntityStream.class), anyBoolean());
// produce another message, and make sure it moves ahead. If commit succeeded, this would work.
produceMessage(new HookNotificationV1.EntityCreateRequest("test_user2", createEntity()));
consumeOneMessage(consumer, hookConsumer);
// produce another message, and make sure it moves ahead. If commit succeeded, this would work.
produceMessage(new HookNotificationV1.EntityCreateRequest("test_user2", createEntity()));
consumeOneMessage(consumer, hookConsumer);
verify(atlasEntityStore,times(2)).createOrUpdate(any(EntityStream.class), anyBoolean());
reset(atlasEntityStore);
}
finally {
kafkaNotification.close();
}
verify(atlasEntityStore,times(2)).createOrUpdate(any(EntityStream.class), anyBoolean());
reset(atlasEntityStore);
}
@Test(dependsOnMethods = "testConsumerConsumesNewMessageWithAutoCommitDisabled")
public void testConsumerRemainsAtSameMessageWithAutoCommitEnabled() throws Exception {
try {
produceMessage(new HookNotificationV1.EntityCreateRequest("test_user3", createEntity()));
produceMessage(new HookNotificationV1.EntityCreateRequest("test_user3", createEntity()));
NotificationConsumer<HookNotification> consumer = createNewConsumer(kafkaNotification, true);
NotificationConsumer<HookNotification> consumer = createNewConsumer(kafkaNotification, true);
assertNotNull (consumer);
assertNotNull (consumer);
NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer);
NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer);
consumeOneMessage(consumer, hookConsumer);
verify(atlasEntityStore).createOrUpdate(any(EntityStream.class), anyBoolean());
consumeOneMessage(consumer, hookConsumer);
verify(atlasEntityStore).createOrUpdate(any(EntityStream.class), anyBoolean());
// produce another message, but this will not be consumed, as commit code is not executed in hook consumer.
produceMessage(new HookNotificationV1.EntityCreateRequest("test_user4", createEntity()));
// produce another message, but this will not be consumed, as commit code is not executed in hook consumer.
produceMessage(new HookNotificationV1.EntityCreateRequest("test_user4", createEntity()));
consumeOneMessage(consumer, hookConsumer);
verify(atlasEntityStore,times(2)).createOrUpdate(any(EntityStream.class), anyBoolean());
}
finally {
kafkaNotification.close();
}
consumeOneMessage(consumer, hookConsumer);
verify(atlasEntityStore,times(2)).createOrUpdate(any(EntityStream.class), anyBoolean());
}
AtlasKafkaConsumer<HookNotification> createNewConsumer(KafkaNotification kafkaNotification, boolean autoCommitEnabled) {
......@@ -185,25 +171,38 @@ public class NotificationHookConsumerKafkaTest {
return entity;
}
KafkaNotification startKafkaServer() throws AtlasException, InterruptedException {
protected String randomString() {
return RandomStringUtils.randomAlphanumeric(10);
}
private void produceMessage(HookNotification message) throws NotificationException {
kafkaNotification.send(NotificationInterface.NotificationType.HOOK, message);
}
void initNotificationService() throws AtlasException, InterruptedException {
Configuration applicationProperties = ApplicationProperties.get();
applicationProperties.setProperty("atlas.kafka.data", "target/" + RandomStringUtils.randomAlphanumeric(5));
kafkaNotification = new KafkaNotification(applicationProperties);
kafkaServer = new EmbeddedKafkaServer(applicationProperties);
kafkaNotification = new KafkaNotification(applicationProperties);
notificationInterface = kafkaNotification;
kafkaServer.start();
kafkaNotification.start();
Thread.sleep(2000);
return kafkaNotification;
}
protected String randomString() {
return RandomStringUtils.randomAlphanumeric(10);
}
void cleanUpNotificationService() {
if (kafkaNotification != null) {
kafkaNotification.close();
kafkaNotification.stop();
}
private void produceMessage(HookNotification message) throws NotificationException {
kafkaNotification.send(NotificationInterface.NotificationType.HOOK, message);
if (kafkaServer != null) {
kafkaServer.stop();
}
}
}
......@@ -34,6 +34,7 @@ import org.apache.atlas.model.typedef.*;
import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef.Cardinality;
import org.apache.atlas.model.typedef.AtlasStructDef.AtlasConstraintDef;
import org.apache.atlas.notification.NotificationInterface;
import org.apache.atlas.v1.model.instance.Id;
import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.atlas.v1.model.instance.Struct;
......@@ -95,6 +96,11 @@ public abstract class BaseResourceIT {
protected AtlasClientV2 atlasClientV2;
protected String[] atlasUrls;
protected NotificationInterface notificationInterface = null;
protected EmbeddedKafkaServer kafkaServer = null;
protected KafkaNotification kafkaNotification = null;
@BeforeClass
public void setUp() throws Exception {
//set high timeouts so that tests do not fail due to read timeouts while you
......@@ -675,4 +681,30 @@ public abstract class BaseResourceIT {
protected JSONArray searchByDSL(String dslQuery) throws AtlasServiceException {
return atlasClientV1.searchByDSL(dslQuery, 10, 0);
}
protected void initNotificationService() throws Exception {
Configuration applicationProperties = ApplicationProperties.get();
applicationProperties.setProperty("atlas.kafka.data", "target/" + RandomStringUtils.randomAlphanumeric(5));
kafkaServer = new EmbeddedKafkaServer(applicationProperties);
kafkaNotification = new KafkaNotification(applicationProperties);
notificationInterface = kafkaNotification;
kafkaServer.start();
kafkaNotification.start();
Thread.sleep(2000);
}
protected void cleanUpNotificationService() {
if (kafkaNotification != null) {
kafkaNotification.close();
kafkaNotification.stop();
}
if (kafkaServer != null) {
kafkaServer.stop();
}
}
}
......@@ -63,8 +63,6 @@ public class EntityJerseyResourceIT extends BaseResourceIT {
private static final String TRAITS = "traits";
private NotificationInterface notificationInterface = NotificationProvider.get();
@BeforeClass
public void setUp() throws Exception {
super.setUp();
......