Commit cbc34271 by Deep Singh Committed by Sarath Subramanian

ATLAS-4043: Added option to list internal kafka topic as well through Kafka AdminClient API

parent 5037871a
...@@ -28,6 +28,7 @@ import org.apache.kafka.clients.admin.DescribeTopicsResult; ...@@ -28,6 +28,7 @@ import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.ListTopicsResult; import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription; import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartitionInfo; import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.errors.TopicExistsException; import org.apache.kafka.common.errors.TopicExistsException;
...@@ -57,6 +58,7 @@ public class KafkaUtils implements AutoCloseable { ...@@ -57,6 +58,7 @@ public class KafkaUtils implements AutoCloseable {
private static final String JAAS_PRINCIPAL_PROP = "principal"; private static final String JAAS_PRINCIPAL_PROP = "principal";
private static final String JAAS_DEFAULT_CLIENT_NAME = "KafkaClient"; private static final String JAAS_DEFAULT_CLIENT_NAME = "KafkaClient";
private static final String JAAS_TICKET_BASED_CLIENT_NAME = "ticketBased-KafkaClient"; private static final String JAAS_TICKET_BASED_CLIENT_NAME = "ticketBased-KafkaClient";
private static final String IMPORT_INTERNAL_TOPICS = "atlas.hook.kafka.import.internal.topics";
public static final String ATLAS_KAFKA_PROPERTY_PREFIX = "atlas.kafka"; public static final String ATLAS_KAFKA_PROPERTY_PREFIX = "atlas.kafka";
...@@ -64,6 +66,8 @@ public class KafkaUtils implements AutoCloseable { ...@@ -64,6 +66,8 @@ public class KafkaUtils implements AutoCloseable {
final protected AdminClient adminClient; final protected AdminClient adminClient;
final protected boolean importInternalTopics;
public KafkaUtils(Configuration atlasConfiguration) { public KafkaUtils(Configuration atlasConfiguration) {
if(LOG.isDebugEnabled()) { if(LOG.isDebugEnabled()) {
LOG.debug("==> KafkaUtils() "); LOG.debug("==> KafkaUtils() ");
...@@ -72,6 +76,7 @@ public class KafkaUtils implements AutoCloseable { ...@@ -72,6 +76,7 @@ public class KafkaUtils implements AutoCloseable {
setKafkaJAASProperties(atlasConfiguration, kafkaConfiguration); setKafkaJAASProperties(atlasConfiguration, kafkaConfiguration);
adminClient = AdminClient.create(this.kafkaConfiguration); adminClient = AdminClient.create(this.kafkaConfiguration);
importInternalTopics = atlasConfiguration.getBoolean(IMPORT_INTERNAL_TOPICS, false);
if(LOG.isDebugEnabled()) { if(LOG.isDebugEnabled()) {
LOG.debug("<== KafkaUtils() "); LOG.debug("<== KafkaUtils() ");
...@@ -106,7 +111,7 @@ public class KafkaUtils implements AutoCloseable { ...@@ -106,7 +111,7 @@ public class KafkaUtils implements AutoCloseable {
if(LOG.isDebugEnabled()) { if(LOG.isDebugEnabled()) {
LOG.debug("==> KafkaUtils.listAllTopics() "); LOG.debug("==> KafkaUtils.listAllTopics() ");
} }
ListTopicsResult listTopicsResult = adminClient.listTopics(); ListTopicsResult listTopicsResult = adminClient.listTopics((new ListTopicsOptions()).listInternal(importInternalTopics));
List<String> topicNameList = new ArrayList<>(listTopicsResult.names().get()); List<String> topicNameList = new ArrayList<>(listTopicsResult.names().get());
if(LOG.isDebugEnabled()) { if(LOG.isDebugEnabled()) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment