Commit bcabde9b by nixonrodrigues Committed by Madhan Neethiraj

ATLAS-1908: updated NotificationConsumer with a method removed earlier to avoid…

ATLAS-1908: updated NotificationConsumer with a method removed earlier to avoid breaking existing usage (like in Ranger) Change-Id: Ib8a7f338da7fd0f710fc683da87871e3d9c32035 Signed-off-by: 's avatarMadhan Neethiraj <madhan@apache.org>
parent eddab3b1
...@@ -52,9 +52,15 @@ public class AtlasKafkaConsumer<T> extends AbstractNotificationConsumer<T> { ...@@ -52,9 +52,15 @@ public class AtlasKafkaConsumer<T> extends AbstractNotificationConsumer<T> {
} }
public List<AtlasKafkaMessage<T>> receive() { public List<AtlasKafkaMessage<T>> receive() {
return this.receive(this.pollTimeoutMilliSeconds);
}
@Override
public List<AtlasKafkaMessage<T>> receive(long timeoutMilliSeconds) {
List<AtlasKafkaMessage<T>> messages = new ArrayList(); List<AtlasKafkaMessage<T>> messages = new ArrayList();
ConsumerRecords<?, ?> records = kafkaConsumer.poll(pollTimeoutMilliSeconds); ConsumerRecords<?, ?> records = kafkaConsumer.poll(timeoutMilliSeconds);
if (records != null) { if (records != null) {
for (ConsumerRecord<?, ?> record : records) { for (ConsumerRecord<?, ?> record : records) {
...@@ -70,6 +76,7 @@ public class AtlasKafkaConsumer<T> extends AbstractNotificationConsumer<T> { ...@@ -70,6 +76,7 @@ public class AtlasKafkaConsumer<T> extends AbstractNotificationConsumer<T> {
} }
return messages; return messages;
} }
......
...@@ -44,4 +44,13 @@ public interface NotificationConsumer<T> { ...@@ -44,4 +44,13 @@ public interface NotificationConsumer<T> {
* @return List containing kafka message and partionId and offset. * @return List containing kafka message and partionId and offset.
*/ */
List<AtlasKafkaMessage<T>> receive(); List<AtlasKafkaMessage<T>> receive();
/**
* Fetch data for the topics from Kafka
* @param timeoutMilliSeconds poll timeout
* @return List containing kafka message and partionId and offset.
*/
List<AtlasKafkaMessage<T>> receive(long timeoutMilliSeconds);
} }
...@@ -204,6 +204,11 @@ public class AbstractNotificationConsumerTest { ...@@ -204,6 +204,11 @@ public class AbstractNotificationConsumerTest {
@Override @Override
public List<AtlasKafkaMessage<T>> receive() { public List<AtlasKafkaMessage<T>> receive() {
return receive(1000L);
}
@Override
public List<AtlasKafkaMessage<T>> receive(long timeoutMilliSeconds) {
List<AtlasKafkaMessage<T>> tempMessageList = new ArrayList(); List<AtlasKafkaMessage<T>> tempMessageList = new ArrayList();
for(Object json : messageList) { for(Object json : messageList) {
tempMessageList.add(new AtlasKafkaMessage(deserializer.deserialize((String)json), -1, -1)); tempMessageList.add(new AtlasKafkaMessage(deserializer.deserialize((String)json), -1, -1));
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment