Commit 60789da3 by Madhan Neethiraj

ATLAS-2495: updated Kafka version to 1.0.0

parent ac2c1a77
......@@ -79,6 +79,7 @@ atlas.kafka.entities.group.id=atlas_entities
atlas.kafka.enable.auto.commit=false
atlas.kafka.auto.offset.reset=earliest
atlas.kafka.session.timeout.ms=30000
atlas.kafka.offsets.topic.replication.factor=1
......
......@@ -79,6 +79,7 @@ atlas.kafka.entities.group.id=atlas_entities
atlas.kafka.enable.auto.commit=false
atlas.kafka.auto.offset.reset=earliest
atlas.kafka.session.timeout.ms=30000
atlas.kafka.offsets.topic.replication.factor=1
......
......@@ -79,6 +79,7 @@ atlas.kafka.entities.group.id=atlas_entities
atlas.kafka.enable.auto.commit=false
atlas.kafka.auto.offset.reset=earliest
atlas.kafka.session.timeout.ms=30000
atlas.kafka.offsets.topic.replication.factor=1
......
......@@ -79,6 +79,7 @@ atlas.kafka.entities.group.id=atlas_entities
atlas.kafka.enable.auto.commit=false
atlas.kafka.auto.offset.reset=earliest
atlas.kafka.session.timeout.ms=30000
atlas.kafka.offsets.topic.replication.factor=1
......
......@@ -81,6 +81,7 @@ atlas.kafka.entities.group.id=atlas_entities
atlas.kafka.enable.auto.commit=false
atlas.kafka.auto.offset.reset=earliest
atlas.kafka.session.timeout.ms=30000
atlas.kafka.offsets.topic.replication.factor=1
......
......@@ -97,6 +97,7 @@ atlas.kafka.entities.group.id=atlas_entities
atlas.kafka.enable.auto.commit=false
atlas.kafka.auto.offset.reset=earliest
atlas.kafka.session.timeout.ms=30000
atlas.kafka.offsets.topic.replication.factor=1
......
......@@ -74,6 +74,7 @@ atlas.kafka.consumer.timeout.ms=100
atlas.kafka.auto.commit.interval.ms=100
atlas.kafka.hook.group.id=atlas
atlas.kafka.entities.group.id=atlas_entities
atlas.kafka.offsets.topic.replication.factor=1
######### Entity Audit Configs #########
atlas.audit.hbase.tablename=ATLAS_ENTITY_AUDIT_EVENTS
......
......@@ -75,6 +75,7 @@ atlas.kafka.consumer.timeout.ms=100
atlas.kafka.auto.commit.interval.ms=100
atlas.kafka.hook.group.id=atlas
atlas.kafka.entities.group.id=atlas_entities
atlas.kafka.offsets.topic.replication.factor=1
######### Entity Audit Configs #########
atlas.audit.hbase.tablename=ATLAS_ENTITY_AUDIT_EVENTS
......
......@@ -97,6 +97,7 @@ atlas.kafka.entities.group.id=atlas_entities
atlas.kafka.enable.auto.commit=false
atlas.kafka.auto.offset.reset=earliest
atlas.kafka.session.timeout.ms=30000
atlas.kafka.offsets.topic.replication.factor=1
......
......@@ -17,15 +17,16 @@
*/
package org.apache.atlas.kafka;
import kafka.metrics.KafkaMetricsReporter;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.Time;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasException;
import org.apache.atlas.service.Service;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationConverter;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.utils.Time;
import org.apache.zookeeper.server.NIOServerCnxnFactory;
import org.apache.zookeeper.server.ServerCnxnFactory;
import org.apache.zookeeper.server.ZooKeeperServer;
......@@ -34,6 +35,7 @@ import org.slf4j.LoggerFactory;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import scala.Option;
import scala.collection.mutable.Buffer;
import javax.inject.Inject;
import java.io.File;
......@@ -43,6 +45,7 @@ import java.net.MalformedURLException;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.*;
import java.util.concurrent.TimeUnit;
@Component
......@@ -135,7 +138,10 @@ public class EmbeddedKafkaServer implements Service {
brokerConfig.setProperty("log.dirs", constructDir("kafka").getAbsolutePath());
brokerConfig.setProperty("log.flush.interval.messages", String.valueOf(1));
kafkaServer = new KafkaServer(KafkaConfig.fromProps(brokerConfig), new SystemTime(), Option.apply(this.getClass().getName()));
List<KafkaMetricsReporter> metrics = new ArrayList<>();
Buffer<KafkaMetricsReporter> metricsReporters = scala.collection.JavaConversions.asScalaBuffer(metrics);
kafkaServer = new KafkaServer(KafkaConfig.fromProps(brokerConfig), new SystemTime(), Option.apply(this.getClass().getName()), metricsReporters);
kafkaServer.startup();
......@@ -174,6 +180,11 @@ public class EmbeddedKafkaServer implements Service {
}
@Override
public long hiResClockMs() {
return TimeUnit.NANOSECONDS.toMillis(nanoseconds());
}
@Override
public void sleep(long arg0) {
try {
Thread.sleep(arg0);
......
......@@ -77,7 +77,8 @@ public class KafkaNotificationMockTest {
String topicName = kafkaNotification.getTopicName(NotificationInterface.NotificationType.HOOK);
String message = "This is a test message";
Future returnValue = mock(Future.class);
when(returnValue.get()).thenReturn(new RecordMetadata(new TopicPartition(topicName, 0), 0, 0));
TopicPartition topicPartition = new TopicPartition(topicName, 0);
when(returnValue.get()).thenReturn(new RecordMetadata(topicPartition, 0, 0, 0, Long.valueOf(0), 0, 0));
ProducerRecord expectedRecord = new ProducerRecord(topicName, message);
when(producer.send(expectedRecord)).thenReturn(returnValue);
......
......@@ -546,7 +546,7 @@
<hadoop.version>2.7.1</hadoop.version>
<hbase.version>1.1.2</hbase.version>
<solr.version>5.5.1</solr.version>
<kafka.version>0.10.0.0</kafka.version>
<kafka.version>1.0.0</kafka.version>
<kafka.scala.binary.version>2.11</kafka.scala.binary.version>
<curator.version>2.11.0</curator.version>
<zookeeper.version>3.4.6</zookeeper.version>
......
......@@ -36,7 +36,6 @@ import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef.Cardinality;
import org.apache.atlas.model.typedef.AtlasStructDef.AtlasConstraintDef;
import org.apache.atlas.notification.NotificationInterface;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.v1.model.instance.Id;
import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.atlas.v1.model.instance.Struct;
......@@ -57,7 +56,6 @@ import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import javax.inject.Inject;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
......
......@@ -80,6 +80,7 @@ atlas.kafka.entities.group.id=atlas_entities
atlas.kafka.enable.auto.commit=false
atlas.kafka.auto.offset.reset=earliest
atlas.kafka.session.timeout.ms=30000
atlas.kafka.offsets.topic.replication.factor=1
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment