Commit 1907f640 by Andras Katona Committed by nixonrodrigues

ATLAS-4063: EmbeddedKafkaServer simplification

parent 321c6b83
...@@ -17,7 +17,6 @@ ...@@ -17,7 +17,6 @@
*/ */
package org.apache.atlas.kafka; package org.apache.atlas.kafka;
import kafka.metrics.KafkaMetricsReporter;
import kafka.server.KafkaConfig; import kafka.server.KafkaConfig;
import kafka.server.KafkaServer; import kafka.server.KafkaServer;
import org.apache.atlas.ApplicationProperties; import org.apache.atlas.ApplicationProperties;
...@@ -35,7 +34,7 @@ import org.slf4j.LoggerFactory; ...@@ -35,7 +34,7 @@ import org.slf4j.LoggerFactory;
import org.springframework.core.annotation.Order; import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import scala.Option; import scala.Option;
import scala.collection.mutable.Buffer; import scala.collection.mutable.ArrayBuffer;
import javax.inject.Inject; import javax.inject.Inject;
import java.io.File; import java.io.File;
...@@ -45,7 +44,6 @@ import java.net.MalformedURLException; ...@@ -45,7 +44,6 @@ import java.net.MalformedURLException;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.net.URL; import java.net.URL;
import java.util.*; import java.util.*;
import java.util.concurrent.TimeUnit;
@Component @Component
...@@ -138,10 +136,7 @@ public class EmbeddedKafkaServer implements Service { ...@@ -138,10 +136,7 @@ public class EmbeddedKafkaServer implements Service {
brokerConfig.setProperty("log.dirs", constructDir("kafka").getAbsolutePath()); brokerConfig.setProperty("log.dirs", constructDir("kafka").getAbsolutePath());
brokerConfig.setProperty("log.flush.interval.messages", String.valueOf(1)); brokerConfig.setProperty("log.flush.interval.messages", String.valueOf(1));
List<KafkaMetricsReporter> metrics = new ArrayList<>(); kafkaServer = new KafkaServer(KafkaConfig.fromProps(brokerConfig), Time.SYSTEM, Option.apply(this.getClass().getName()), new ArrayBuffer<>());
Buffer<KafkaMetricsReporter> metricsReporters = scala.collection.JavaConversions.asScalaBuffer(metrics);
kafkaServer = new KafkaServer(KafkaConfig.fromProps(brokerConfig), new SystemTime(), Option.apply(this.getClass().getName()), metricsReporters);
kafkaServer.startup(); kafkaServer.startup();
...@@ -165,32 +160,4 @@ public class EmbeddedKafkaServer implements Service { ...@@ -165,32 +160,4 @@ public class EmbeddedKafkaServer implements Service {
return new URL("http://" + url); return new URL("http://" + url);
} }
} }
// ----- inner class : SystemTime ----------------------------------------
private static class SystemTime implements Time {
@Override
public long milliseconds() {
return System.currentTimeMillis();
}
@Override
public long nanoseconds() {
return System.nanoTime();
}
@Override
public long hiResClockMs() {
return TimeUnit.NANOSECONDS.toMillis(nanoseconds());
}
@Override
public void sleep(long arg0) {
try {
Thread.sleep(arg0);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment