Commit 0d8f9f8d by nixonrodrigues, committed by Madhan Neethiraj

ATLAS-1908: updated to use existing Kafka consumer properties when equivalent new Kafka consumer properties are not present

Signed-off-by: Madhan Neethiraj <madhan@apache.org>
parent de916084
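In short: when the new-consumer property names (enable.auto.commit, session.timeout.ms) are missing from the atlas.kafka.* configuration, the old-API names (e.g. auto.commit.enable) are used instead, and the poll timeout becomes a configurable atlas.kafka.poll.timeout.ms setting. The sketch below is illustrative only, not the Atlas code: the class and method names are made up, and it uses plain java.util.Properties where the actual patch reads a Commons Configuration subset (see the KafkaNotification hunks further down). Property names and defaults match the diff.

import java.util.Properties;

public class ConsumerPropertyFallback {
    // Mirrors the fallback added in KafkaNotification below, but against plain
    // java.util.Properties instead of the Commons Configuration subset Atlas uses.
    static Properties resolve(Properties atlasKafkaConf) {
        Properties consumerProps = new Properties();

        // Prefer the new consumer property name; fall back to the old-API name, then to a default.
        String oldAutoCommit = atlasKafkaConf.getProperty("auto.commit.enable", "false");
        consumerProps.put("enable.auto.commit", atlasKafkaConf.getProperty("enable.auto.commit", oldAutoCommit));

        consumerProps.put("session.timeout.ms", atlasKafkaConf.getProperty("session.timeout.ms", "30000"));

        return consumerProps;
    }

    public static void main(String[] args) {
        Properties conf = new Properties();
        conf.put("auto.commit.enable", "true");  // only the old-API name is set
        System.out.println(resolve(conf).getProperty("enable.auto.commit"));  // prints: true
    }
}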
@@ -79,7 +79,7 @@ atlas.kafka.hook.group.id=atlas
atlas.kafka.enable.auto.commit=false
atlas.kafka.auto.offset.reset=earliest
atlas.kafka.session.timeout.ms=30000
+atlas.kafka.poll.timeout.ms=1000
atlas.notification.create.topics=true
atlas.notification.replicas=1
...
@@ -41,19 +41,20 @@ public class AtlasKafkaConsumer<T> extends AbstractNotificationConsumer<T> {
    private static final Logger LOG = LoggerFactory.getLogger(AtlasKafkaConsumer.class);
    private final KafkaConsumer kafkaConsumer;
    private final boolean autoCommitEnabled;
+   private long pollTimeoutMilliSeconds = 1000L;
-   public AtlasKafkaConsumer(MessageDeserializer<T> deserializer, KafkaConsumer kafkaConsumer, boolean autoCommitEnabled) {
+   public AtlasKafkaConsumer(MessageDeserializer<T> deserializer, KafkaConsumer kafkaConsumer, boolean autoCommitEnabled, long pollTimeoutMilliSeconds) {
        super(deserializer);
        this.kafkaConsumer = kafkaConsumer;
        this.autoCommitEnabled = autoCommitEnabled;
+       this.pollTimeoutMilliSeconds = pollTimeoutMilliSeconds;
    }
-   public List<AtlasKafkaMessage<T>> receive(long timeoutMilliSeconds) {
+   public List<AtlasKafkaMessage<T>> receive() {
        List<AtlasKafkaMessage<T>> messages = new ArrayList();
-       ConsumerRecords<?, ?> records = kafkaConsumer.poll(timeoutMilliSeconds);
+       ConsumerRecords<?, ?> records = kafkaConsumer.poll(pollTimeoutMilliSeconds);
        if (records != null) {
            for (ConsumerRecord<?, ?> record : records) {
...
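The hunk above changes the consumer's contract: the poll timeout is fixed at construction time and receive() takes no argument. A minimal sketch of that shape against the stock kafka-clients API (the TimedReceiver class, topic name, and timeout value are illustrative assumptions, not Atlas code):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// Sketch of the pattern adopted above: the consumer owns its poll timeout.
public class TimedReceiver {
    private final KafkaConsumer<String, String> kafkaConsumer;
    private final long pollTimeoutMs;  // e.g. read from atlas.kafka.poll.timeout.ms

    public TimedReceiver(Properties consumerProps, String topic, long pollTimeoutMs) {
        this.kafkaConsumer = new KafkaConsumer<>(consumerProps);
        this.pollTimeoutMs = pollTimeoutMs;
        this.kafkaConsumer.subscribe(Collections.singletonList(topic));
    }

    // Callers no longer pass a timeout per call, matching the new receive() signature.
    public ConsumerRecords<String, String> receive() {
        return kafkaConsumer.poll(pollTimeoutMs);
    }
}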
@@ -83,6 +83,7 @@ public class KafkaNotification extends AbstractNotification implements Service {
    private Properties properties;
    private KafkaConsumer consumer = null;
    private KafkaProducer producer = null;
+   private Long pollTimeOutMs = 1000L;
    private static final Map<NotificationType, String> TOPIC_MAP = new HashMap<NotificationType, String>() {
        {
@@ -124,6 +125,13 @@ public class KafkaNotification extends AbstractNotification implements Service {
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+       pollTimeOutMs = subsetConfiguration.getLong("poll.timeout.ms", 1000);
+       boolean oldApiCommitEnbleFlag = subsetConfiguration.getBoolean("auto.commit.enable",false);
+       //set old autocommit value if new autoCommit property is not set.
+       properties.put("enable.auto.commit", subsetConfiguration.getBoolean("enable.auto.commit", oldApiCommitEnbleFlag));
+       properties.put("session.timeout.ms", subsetConfiguration.getString("session.timeout.ms", "30000"));
    }
    @VisibleForTesting
@@ -167,7 +175,7 @@ public class KafkaNotification extends AbstractNotification implements Service {
    public <T> List<NotificationConsumer<T>> createConsumers(NotificationType notificationType,
                                                             int numConsumers) {
        return createConsumers(notificationType, numConsumers,
-               Boolean.valueOf(properties.getProperty("enable.auto.commit", "true")));
+               Boolean.valueOf(properties.getProperty("enable.auto.commit", properties.getProperty("auto.commit.enable","false"))));
    }
    @VisibleForTesting
@@ -177,7 +185,7 @@ public class KafkaNotification extends AbstractNotification implements Service {
        Properties consumerProperties = getConsumerProperties(notificationType);
        List<NotificationConsumer<T>> consumers = new ArrayList<>();
-       AtlasKafkaConsumer kafkaConsumer = new AtlasKafkaConsumer(notificationType.getDeserializer(), getKafkaConsumer(consumerProperties,notificationType, autoCommitEnabled), autoCommitEnabled);
+       AtlasKafkaConsumer kafkaConsumer = new AtlasKafkaConsumer(notificationType.getDeserializer(), getKafkaConsumer(consumerProperties,notificationType, autoCommitEnabled), autoCommitEnabled, pollTimeOutMs );
        consumers.add(kafkaConsumer);
        return consumers;
    }
...
@@ -41,8 +41,7 @@ public interface NotificationConsumer<T> {
    /**
     * Fetch data for the topics from Kafka
-    * @param timeoutMilliSeconds poll timeout
     * @return List containing kafka message and partionId and offset.
     */
-   List<AtlasKafkaMessage<T>> receive(long timeoutMilliSeconds);
+   List<AtlasKafkaMessage<T>> receive();
}
@@ -95,12 +95,12 @@ public class KafkaConsumerTest {
        ConsumerRecords records = new ConsumerRecords(mp);
-       when(kafkaConsumer.poll(1000)).thenReturn(records);
+       when(kafkaConsumer.poll(100)).thenReturn(records);
        when(messageAndMetadata.message()).thenReturn(json);
-       AtlasKafkaConsumer consumer = new AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK.getDeserializer(), kafkaConsumer,false);
+       AtlasKafkaConsumer consumer = new AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK.getDeserializer(), kafkaConsumer, false, 100L);
-       List<AtlasKafkaMessage<HookNotification.HookNotificationMessage>> messageList = consumer.receive(1000);
+       List<AtlasKafkaMessage<HookNotification.HookNotificationMessage>> messageList = consumer.receive();
        assertTrue(messageList.size() > 0);
        HookNotification.HookNotificationMessage consumedMessage = messageList.get(0).getMessage();
@@ -131,12 +131,12 @@ public class KafkaConsumerTest {
        mp.put(tp,klist);
        ConsumerRecords records = new ConsumerRecords(mp);
-       when(kafkaConsumer.poll(1000)).thenReturn(records);
+       when(kafkaConsumer.poll(100L)).thenReturn(records);
        when(messageAndMetadata.message()).thenReturn(json);
-       AtlasKafkaConsumer consumer =new AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK.getDeserializer(), kafkaConsumer ,false);
+       AtlasKafkaConsumer consumer =new AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK.getDeserializer(), kafkaConsumer ,false, 100L);
        try {
-           List<AtlasKafkaMessage<HookNotification.HookNotificationMessage>> messageList = consumer.receive(1000);
+           List<AtlasKafkaMessage<HookNotification.HookNotificationMessage>> messageList = consumer.receive();
            assertTrue(messageList.size() > 0);
            HookNotification.HookNotificationMessage consumedMessage = messageList.get(0).getMessage();
@@ -154,7 +154,7 @@ public class KafkaConsumerTest {
        TopicPartition tp = new TopicPartition("ATLAS_HOOK",0);
-       AtlasKafkaConsumer consumer =new AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK.getDeserializer(), kafkaConsumer, false);
+       AtlasKafkaConsumer consumer =new AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK.getDeserializer(), kafkaConsumer, false, 100L);
        consumer.commit(tp, 1);
@@ -166,7 +166,7 @@ public class KafkaConsumerTest {
        TopicPartition tp = new TopicPartition("ATLAS_HOOK",0);
-       AtlasKafkaConsumer consumer =new AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK.getDeserializer(), kafkaConsumer, true);
+       AtlasKafkaConsumer consumer =new AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK.getDeserializer(), kafkaConsumer, true , 100L);
        consumer.commit(tp, 1);
...
@@ -70,7 +70,7 @@ public class KafkaNotificationTest {
        List<AtlasKafkaMessage<Object>> messages = null ;
        long startTime = System.currentTimeMillis(); //fetch starting time
        while ((System.currentTimeMillis() - startTime) < 10000) {
-           messages = consumer.receive(1000L);
+           messages = consumer.receive();
            if (messages.size() > 0) {
                break;
            }
...
@@ -67,7 +67,7 @@ public class AbstractNotificationConsumerTest {
        NotificationConsumer<TestMessage> consumer =
                new TestNotificationConsumer<>(versionedMessageType, jsonList, logger);
-       List<AtlasKafkaMessage<TestMessage>> messageList = consumer.receive(1000L);
+       List<AtlasKafkaMessage<TestMessage>> messageList = consumer.receive();
        assertFalse(messageList.isEmpty());
@@ -106,7 +106,7 @@ public class AbstractNotificationConsumerTest {
        NotificationConsumer<TestMessage> consumer =
                new TestNotificationConsumer<>(versionedMessageType, jsonList, logger);
-       List<AtlasKafkaMessage<TestMessage>> messageList = consumer.receive(1000L);
+       List<AtlasKafkaMessage<TestMessage>> messageList = consumer.receive();
        assertEquals(new TestMessage("sValue1", 99), messageList.get(0).getMessage());
@@ -138,7 +138,7 @@ public class AbstractNotificationConsumerTest {
        NotificationConsumer<TestMessage> consumer =
                new TestNotificationConsumer<>(versionedMessageType, jsonList, logger);
        try {
-           List<AtlasKafkaMessage<TestMessage>> messageList = consumer.receive(1000L);
+           List<AtlasKafkaMessage<TestMessage>> messageList = consumer.receive();
            messageList.get(1).getMessage();
@@ -203,7 +203,7 @@ public class AbstractNotificationConsumerTest {
        }
        @Override
-       public List<AtlasKafkaMessage<T>> receive(long timeoutMilliSeconds) {
+       public List<AtlasKafkaMessage<T>> receive() {
            List<AtlasKafkaMessage<T>> tempMessageList = new ArrayList();
            for(Object json : messageList) {
                tempMessageList.add(new AtlasKafkaMessage(deserializer.deserialize((String)json), -1, -1));
...
@@ -224,7 +224,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
        while (shouldRun.get()) {
            try {
-               List<AtlasKafkaMessage<HookNotificationMessage>> messages = consumer.receive(1000L);
+               List<AtlasKafkaMessage<HookNotificationMessage>> messages = consumer.receive();
                for (AtlasKafkaMessage<HookNotificationMessage> msg : messages) {
                    handleMessage(msg);
                }
...
@@ -155,7 +155,7 @@ public class NotificationHookConsumerKafkaTest {
        try {
            long startTime = System.currentTimeMillis(); //fetch starting time
            while ((System.currentTimeMillis() - startTime) < 10000) {
-               List<AtlasKafkaMessage<HookNotificationMessage>> messages = consumer.receive(1000L);
+               List<AtlasKafkaMessage<HookNotificationMessage>> messages = consumer.receive();
                for (AtlasKafkaMessage<HookNotificationMessage> msg : messages) {
                    hookConsumer.handleMessage(msg);
...
@@ -637,7 +637,7 @@ public abstract class BaseResourceIT {
        try {
            while (System.currentTimeMillis() < maxCurrentTime) {
-               List<AtlasKafkaMessage<EntityNotification>> messageList = consumer.receive(1000);
+               List<AtlasKafkaMessage<EntityNotification>> messageList = consumer.receive();
                if(messageList.size() > 0) {
                    EntityNotification notification = messageList.get(0).getMessage();
                    if (predicate.evaluate(notification)) {
...