diff --git a/notification/src/main/java/org/apache/atlas/kafka/EmbeddedKafkaServer.java b/notification/src/main/java/org/apache/atlas/kafka/EmbeddedKafkaServer.java index 235b7ce..b793b9a 100644 --- a/notification/src/main/java/org/apache/atlas/kafka/EmbeddedKafkaServer.java +++ b/notification/src/main/java/org/apache/atlas/kafka/EmbeddedKafkaServer.java @@ -17,7 +17,6 @@ */ package org.apache.atlas.kafka; -import kafka.metrics.KafkaMetricsReporter; import kafka.server.KafkaConfig; import kafka.server.KafkaServer; import org.apache.atlas.ApplicationProperties; @@ -35,7 +34,7 @@ import org.slf4j.LoggerFactory; import org.springframework.core.annotation.Order; import org.springframework.stereotype.Component; import scala.Option; -import scala.collection.mutable.Buffer; +import scala.collection.mutable.ArrayBuffer; import javax.inject.Inject; import java.io.File; @@ -45,7 +44,6 @@ import java.net.MalformedURLException; import java.net.URISyntaxException; import java.net.URL; import java.util.*; -import java.util.concurrent.TimeUnit; @Component @@ -138,10 +136,7 @@ public class EmbeddedKafkaServer implements Service { brokerConfig.setProperty("log.dirs", constructDir("kafka").getAbsolutePath()); brokerConfig.setProperty("log.flush.interval.messages", String.valueOf(1)); - List<KafkaMetricsReporter> metrics = new ArrayList<>(); - Buffer<KafkaMetricsReporter> metricsReporters = scala.collection.JavaConversions.asScalaBuffer(metrics); - - kafkaServer = new KafkaServer(KafkaConfig.fromProps(brokerConfig), new SystemTime(), Option.apply(this.getClass().getName()), metricsReporters); + kafkaServer = new KafkaServer(KafkaConfig.fromProps(brokerConfig), Time.SYSTEM, Option.apply(this.getClass().getName()), new ArrayBuffer<>()); kafkaServer.startup(); @@ -165,32 +160,4 @@ public class EmbeddedKafkaServer implements Service { return new URL("http://" + url); } } - - - // ----- inner class : SystemTime ---------------------------------------- - private static class SystemTime implements Time { - @Override - public long milliseconds() { - return System.currentTimeMillis(); - } - - @Override - public long nanoseconds() { - return System.nanoTime(); - } - - @Override - public long hiResClockMs() { - return TimeUnit.NANOSECONDS.toMillis(nanoseconds()); - } - - @Override - public void sleep(long arg0) { - try { - Thread.sleep(arg0); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } - } }