Commit a2e7738a by Hemanth Yamijala

ATLAS-901 Log messages that cannot be sent to Kafka to a specific log configuration (yhemanth)

parent 86dd72af
......@@ -168,9 +168,17 @@ atlas.notification.replicas=1
atlas.notification.kafka.service.principal=kafka/_HOST@EXAMPLE.COM
# Set this to the location of the keytab file for Kafka
#atlas.notification.kafka.keytab.location=/etc/security/keytabs/kafka.service.keytab
</verbatim>
These configuration parameters are useful for saving messages in case there are issues in reaching Kafka for
sending messages.
<verbatim>
# Whether to save messages that failed to be sent to Kafka, default is true
atlas.notification.log.failed.messages=true
# If saving messages is enabled, the file name to save them to. This file will be created under the log directory of the hook's host component - like HiveServer2
atlas.notification.failed.messages.filename=atlas_hook_failed_messages.log
</verbatim>
---++ Client Configs
<verbatim>
......
......@@ -18,9 +18,11 @@
package org.apache.atlas.hook;
import com.google.common.annotations.VisibleForTesting;
import com.google.inject.Guice;
import com.google.inject.Injector;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.notification.NotificationException;
import org.apache.atlas.notification.NotificationInterface;
import org.apache.atlas.notification.NotificationModule;
import org.apache.atlas.notification.hook.HookNotification;
......@@ -50,6 +52,15 @@ public abstract class AtlasHook {
protected static NotificationInterface notifInterface;
private static boolean logFailedMessages;
private static FailedMessagesLogger failedMessagesLogger;
public static final String ATLAS_NOTIFICATION_FAILED_MESSAGES_FILENAME_KEY =
"atlas.notification.failed.messages.filename";
public static final String ATLAS_HOOK_FAILED_MESSAGES_LOG_DEFAULT_NAME = "atlas_hook_failed_messages.log";
public static final String ATLAS_NOTIFICATION_LOG_FAILED_MESSAGES_ENABLED_KEY =
"atlas.notification.log.failed.messages";
static {
try {
atlasProperties = ApplicationProperties.get();
......@@ -57,6 +68,14 @@ public abstract class AtlasHook {
LOG.info("Failed to load application properties", e);
}
String failedMessageFile = atlasProperties.getString(ATLAS_NOTIFICATION_FAILED_MESSAGES_FILENAME_KEY,
ATLAS_HOOK_FAILED_MESSAGES_LOG_DEFAULT_NAME);
logFailedMessages = atlasProperties.getBoolean(ATLAS_NOTIFICATION_LOG_FAILED_MESSAGES_ENABLED_KEY, true);
if (logFailedMessages) {
failedMessagesLogger = new FailedMessagesLogger(failedMessageFile);
failedMessagesLogger.init();
}
Injector injector = Guice.createInjector(new NotificationModule());
notifInterface = injector.getInstance(NotificationInterface.class);
......@@ -89,18 +108,31 @@ public abstract class AtlasHook {
* @param maxRetries maximum number of retries while sending message to messaging system
*/
public static void notifyEntities(List<HookNotification.HookNotificationMessage> messages, int maxRetries) {
notifyEntitiesInternal(messages, maxRetries, notifInterface, logFailedMessages, failedMessagesLogger);
}
@VisibleForTesting
static void notifyEntitiesInternal(List<HookNotification.HookNotificationMessage> messages, int maxRetries,
NotificationInterface notificationInterface,
boolean shouldLogFailedMessages, FailedMessagesLogger logger) {
final String message = messages.toString();
int numRetries = 0;
while (true) {
try {
notifInterface.send(NotificationInterface.NotificationType.HOOK, messages);
notificationInterface.send(NotificationInterface.NotificationType.HOOK, messages);
return;
} catch(Exception e) {
} catch (Exception e) {
numRetries++;
if (numRetries < maxRetries) {
LOG.debug("Failed to notify atlas for entity {}. Retrying", message, e);
} else {
if (shouldLogFailedMessages && e instanceof NotificationException) {
List<String> failedMessages = ((NotificationException) e).getFailedMessages();
for (String msg : failedMessages) {
logger.log(msg);
}
}
LOG.error("Failed to notify atlas for entity {} after {} retries. Quitting",
message, maxRetries, e);
return;
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.hook;
import org.apache.log4j.Appender;
import org.apache.log4j.DailyRollingFileAppender;
import org.apache.log4j.FileAppender;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.log4j.PatternLayout;
import java.io.File;
import java.io.IOException;
import java.util.Enumeration;
/**
* A logger wrapper that can be used to write messages that failed to be sent to a log file.
*/
public class FailedMessagesLogger {
public static final String PATTERN_SPEC_TIMESTAMP_MESSAGE_NEWLINE = "%d{ISO8601} %m%n";
public static final String DATE_PATTERN = ".yyyy-MM-dd";
private final Logger logger = Logger.getLogger("org.apache.atlas.hook.FailedMessagesLogger");
private String failedMessageFile;
public FailedMessagesLogger(String failedMessageFile) {
this.failedMessageFile = failedMessageFile;
}
void init() {
String rootLoggerDirectory = getRootLoggerDirectory();
if (rootLoggerDirectory == null) {
return;
}
String absolutePath = new File(rootLoggerDirectory, failedMessageFile).getAbsolutePath();
try {
DailyRollingFileAppender failedLogFilesAppender = new DailyRollingFileAppender(
new PatternLayout(PATTERN_SPEC_TIMESTAMP_MESSAGE_NEWLINE), absolutePath, DATE_PATTERN);
logger.addAppender(failedLogFilesAppender);
logger.setLevel(Level.ERROR);
logger.setAdditivity(false);
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* Get the root logger file location under which the failed log messages will be written.
*
* Since this class is used in Hooks which run within JVMs of other components like Hive,
* we want to write the failed messages file under the same location as where logs from
* the host component are saved. This method attempts to get such a location from the
* root logger's appenders. It will work only if at least one of the appenders is a {@link FileAppender}
*
* @return directory under which host component's logs are stored.
*/
private String getRootLoggerDirectory() {
String rootLoggerDirectory = null;
org.apache.log4j.Logger rootLogger = org.apache.log4j.Logger.getRootLogger();
Enumeration allAppenders = rootLogger.getAllAppenders();
while (allAppenders.hasMoreElements()) {
Appender appender = (Appender) allAppenders.nextElement();
if (appender instanceof FileAppender) {
FileAppender fileAppender = (FileAppender) appender;
String rootLoggerFile = fileAppender.getFile();
rootLoggerDirectory = new File(rootLoggerFile).getParent();
break;
}
}
return rootLoggerDirectory;
}
void log(String message) {
logger.error(message);
}
}
......@@ -37,6 +37,7 @@ import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationConverter;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
......@@ -90,6 +91,10 @@ public class KafkaNotification extends AbstractNotification implements Service {
}
};
@VisibleForTesting
String getTopicName(NotificationType notificationType) {
return TOPIC_MAP.get(notificationType);
}
// ----- Constructors ----------------------------------------------------
......@@ -214,24 +219,36 @@ public class KafkaNotification extends AbstractNotification implements Service {
if (producer == null) {
createProducer();
}
sendInternalToProducer(producer, type, messages);
}
@VisibleForTesting
void sendInternalToProducer(Producer p, NotificationType type, String[] messages) throws NotificationException {
String topic = TOPIC_MAP.get(type);
List<Future<RecordMetadata>> futures = new ArrayList<>();
List<MessageContext> messageContexts = new ArrayList<>();
for (String message : messages) {
ProducerRecord record = new ProducerRecord(topic, message);
LOG.debug("Sending message for topic {}: {}", topic, message);
futures.add(producer.send(record));
Future future = p.send(record);
messageContexts.add(new MessageContext(future, message));
}
for (Future<RecordMetadata> future : futures) {
List<String> failedMessages = new ArrayList<>();
Exception lastFailureException = null;
for (MessageContext context : messageContexts) {
try {
RecordMetadata response = future.get();
RecordMetadata response = context.getFuture().get();
LOG.debug("Sent message for topic - {}, partition - {}, offset - {}", response.topic(),
response.partition(), response.offset());
} catch (Exception e) {
throw new NotificationException(e);
LOG.warn("Could not send message - {}", context.getMessage(), e);
lastFailureException = e;
failedMessages.add(context.getMessage());
}
}
if (lastFailureException != null) {
throw new NotificationException(lastFailureException, failedMessages);
}
}
// ----- helper methods --------------------------------------------------
......@@ -359,4 +376,23 @@ public class KafkaNotification extends AbstractNotification implements Service {
}
}
}
private class MessageContext {
private final Future<RecordMetadata> future;
private final String message;
public MessageContext(Future<RecordMetadata> future, String message) {
this.future = future;
this.message = message;
}
public Future<RecordMetadata> getFuture() {
return future;
}
public String getMessage() {
return message;
}
}
}
......@@ -19,11 +19,24 @@ package org.apache.atlas.notification;
import org.apache.atlas.AtlasException;
import java.util.List;
/**
* Exception from notification.
*/
public class NotificationException extends AtlasException {
private List<String> failedMessages;
public NotificationException(Exception e) {
super(e);
}
public NotificationException(Exception e, List<String> failedMessages) {
super(e);
this.failedMessages = failedMessages;
}
public List<String> getFailedMessages() {
return failedMessages;
}
}
......@@ -21,24 +21,101 @@ package org.apache.atlas.hook;
import org.apache.atlas.notification.NotificationException;
import org.apache.atlas.notification.NotificationInterface;
import org.apache.atlas.notification.hook.HookNotification;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
public class AtlasHookTest {
@Mock
private NotificationInterface notificationInterface;
@Mock
private FailedMessagesLogger failedMessagesLogger;
@BeforeMethod
public void setup() {
MockitoAnnotations.initMocks(this);
}
@Test (timeOut = 10000)
public void testNotifyEntitiesDoesNotHangOnException() throws Exception {
List<HookNotification.HookNotificationMessage> hookNotificationMessages = new ArrayList<>();
doThrow(new NotificationException(new Exception())).when(notificationInterface)
.send(NotificationInterface.NotificationType.HOOK, hookNotificationMessages);
AtlasHook.notifyEntitiesInternal(hookNotificationMessages, 0, notificationInterface, false,
failedMessagesLogger);
// if we've reached here, the method finished OK.
}
@Test
public void testNotifyEntitiesRetriesOnException() throws NotificationException {
List<HookNotification.HookNotificationMessage> hookNotificationMessages = new ArrayList<>();
doThrow(new NotificationException(new Exception())).when(notificationInterface)
.send(NotificationInterface.NotificationType.HOOK, hookNotificationMessages);
AtlasHook.notifyEntitiesInternal(hookNotificationMessages, 2, notificationInterface, false,
failedMessagesLogger);
verify(notificationInterface, times(2)).
send(NotificationInterface.NotificationType.HOOK, hookNotificationMessages);
}
@Test
public void testFailedMessageIsLoggedIfRequired() throws NotificationException {
List<HookNotification.HookNotificationMessage> hookNotificationMessages = new ArrayList<>();
doThrow(new NotificationException(new Exception(), Arrays.asList("test message")))
.when(notificationInterface)
.send(NotificationInterface.NotificationType.HOOK, hookNotificationMessages);
AtlasHook.notifyEntitiesInternal(hookNotificationMessages, 2, notificationInterface, true,
failedMessagesLogger);
verify(failedMessagesLogger, times(1)).log("test message");
}
@Test
public void testnotifyEntities() throws Exception{
public void testFailedMessageIsNotLoggedIfNotRequired() throws NotificationException {
List<HookNotification.HookNotificationMessage> hookNotificationMessages = new ArrayList<>();
NotificationInterface notifInterface = mock(NotificationInterface.class);
doThrow(new NotificationException(new Exception())).when(notifInterface)
doThrow(new NotificationException(new Exception(), Arrays.asList("test message")))
.when(notificationInterface)
.send(NotificationInterface.NotificationType.HOOK, hookNotificationMessages);
AtlasHook.notifInterface = notifInterface;
AtlasHook.notifyEntities(hookNotificationMessages, 2);
System.out.println("AtlasHook.notifyEntities() returns successfully");
AtlasHook.notifyEntitiesInternal(hookNotificationMessages, 2, notificationInterface, false,
failedMessagesLogger);
verifyZeroInteractions(failedMessagesLogger);
}
@Test
public void testAllFailedMessagesAreLogged() throws NotificationException {
List<HookNotification.HookNotificationMessage> hookNotificationMessages = new ArrayList<>();
doThrow(new NotificationException(new Exception(), Arrays.asList("test message1", "test message2")))
.when(notificationInterface)
.send(NotificationInterface.NotificationType.HOOK, hookNotificationMessages);
AtlasHook.notifyEntitiesInternal(hookNotificationMessages, 2, notificationInterface, true,
failedMessagesLogger);
verify(failedMessagesLogger, times(1)).log("test message1");
verify(failedMessagesLogger, times(1)).log("test message2");
}
@Test
public void testFailedMessageIsNotLoggedIfNotANotificationException() throws Exception {
List<HookNotification.HookNotificationMessage> hookNotificationMessages = new ArrayList<>();
doThrow(new RuntimeException("test message")).when(notificationInterface)
.send(NotificationInterface.NotificationType.HOOK, hookNotificationMessages);
AtlasHook.notifyEntitiesInternal(hookNotificationMessages, 2, notificationInterface, true,
failedMessagesLogger);
verifyZeroInteractions(failedMessagesLogger);
}
}
......@@ -22,7 +22,12 @@ import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import org.apache.atlas.notification.MessageDeserializer;
import org.apache.atlas.notification.NotificationConsumer;
import org.apache.atlas.notification.NotificationException;
import org.apache.atlas.notification.NotificationInterface;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.testng.annotations.Test;
import java.util.ArrayList;
......@@ -30,6 +35,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
......@@ -39,6 +46,7 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
public class KafkaNotificationTest {
......@@ -77,6 +85,83 @@ public class KafkaNotificationTest {
assertTrue(consumers.contains(consumer2));
}
@Test
@SuppressWarnings("unchecked")
public void shouldSendMessagesSuccessfully() throws NotificationException,
ExecutionException, InterruptedException {
Properties configProperties = mock(Properties.class);
KafkaNotification kafkaNotification = new KafkaNotification(configProperties);
Producer producer = mock(Producer.class);
String topicName = kafkaNotification.getTopicName(NotificationInterface.NotificationType.HOOK);
String message = "This is a test message";
Future returnValue = mock(Future.class);
when(returnValue.get()).thenReturn(new RecordMetadata(new TopicPartition(topicName, 0), 0, 0));
ProducerRecord expectedRecord = new ProducerRecord(topicName, message);
when(producer.send(expectedRecord)).thenReturn(returnValue);
kafkaNotification.sendInternalToProducer(producer,
NotificationInterface.NotificationType.HOOK, new String[]{message});
verify(producer).send(expectedRecord);
}
@Test
@SuppressWarnings("unchecked")
public void shouldThrowExceptionIfProducerFails() throws NotificationException,
ExecutionException, InterruptedException {
Properties configProperties = mock(Properties.class);
KafkaNotification kafkaNotification = new KafkaNotification(configProperties);
Producer producer = mock(Producer.class);
String topicName = kafkaNotification.getTopicName(NotificationInterface.NotificationType.HOOK);
String message = "This is a test message";
Future returnValue = mock(Future.class);
when(returnValue.get()).thenThrow(new RuntimeException("Simulating exception"));
ProducerRecord expectedRecord = new ProducerRecord(topicName, message);
when(producer.send(expectedRecord)).thenReturn(returnValue);
try {
kafkaNotification.sendInternalToProducer(producer,
NotificationInterface.NotificationType.HOOK, new String[]{message});
fail("Should have thrown NotificationException");
} catch (NotificationException e) {
assertEquals(e.getFailedMessages().size(), 1);
assertEquals(e.getFailedMessages().get(0), "This is a test message");
}
}
@Test
@SuppressWarnings("unchecked")
public void shouldCollectAllFailedMessagesIfProducerFails() throws NotificationException,
ExecutionException, InterruptedException {
Properties configProperties = mock(Properties.class);
KafkaNotification kafkaNotification = new KafkaNotification(configProperties);
Producer producer = mock(Producer.class);
String topicName = kafkaNotification.getTopicName(NotificationInterface.NotificationType.HOOK);
String message1 = "This is a test message1";
String message2 = "This is a test message2";
Future returnValue1 = mock(Future.class);
when(returnValue1.get()).thenThrow(new RuntimeException("Simulating exception"));
Future returnValue2 = mock(Future.class);
when(returnValue2.get()).thenThrow(new RuntimeException("Simulating exception"));
ProducerRecord expectedRecord1 = new ProducerRecord(topicName, message1);
when(producer.send(expectedRecord1)).thenReturn(returnValue1);
ProducerRecord expectedRecord2 = new ProducerRecord(topicName, message2);
when(producer.send(expectedRecord2)).thenReturn(returnValue1);
try {
kafkaNotification.sendInternalToProducer(producer,
NotificationInterface.NotificationType.HOOK, new String[]{message1, message2});
fail("Should have thrown NotificationException");
} catch (NotificationException e) {
assertEquals(e.getFailedMessages().size(), 2);
assertEquals(e.getFailedMessages().get(0), "This is a test message1");
assertEquals(e.getFailedMessages().get(1), "This is a test message2");
}
}
class TestKafkaNotification extends KafkaNotification {
private final ConsumerConnector consumerConnector;
......
......@@ -23,6 +23,7 @@ ATLAS-409 Atlas will not import avro tables with schema read from a file (dosset
ATLAS-379 Create sqoop and falcon metadata addons (venkatnrangan,bvellanki,sowmyaramesh via shwethags)
ALL CHANGES:
ATLAS-901 Log messages that cannot be sent to Kafka to a specific log configuration (yhemanth)
ATLAS-911 Get entity by unique attribute doesn't enforce type (shwethags)
ATLAS-899 Fix Hive Hook documentation (sumasai via yhemanth)
ATLAS-890 Log received messages in case of error (sumasai via yhemanth)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment