Commit ba2b1449 by Madhan Neethiraj

ATLAS-2947: name of Kafka topics used by Atlas made configurable

parent fb7e9eaf
......@@ -468,7 +468,7 @@ def get_topics_to_create(confdir):
if topic_list is not None:
topics = topic_list.split(",")
else:
topics = ["ATLAS_HOOK", "ATLAS_ENTITIES"]
topics = [getConfigWithDefault("atlas.notification.hook.topic.name", "ATLAS_HOOK"), getConfigWithDefault("atlas.notification.entities.topic.name", "ATLAS_ENTITIES")]
return topics
def get_atlas_url_port(confdir):
......
......@@ -33,6 +33,9 @@ public enum AtlasConfiguration {
QUERY_PARAM_MAX_LENGTH("atlas.query.param.max.length", 4*1024),
NOTIFICATION_HOOK_TOPIC_NAME("atlas.notification.hook.topic.name", "ATLAS_HOOK"),
NOTIFICATION_ENTITIES_TOPIC_NAME("atlas.notification.entities.topic.name", "ATLAS_ENTITIES"),
NOTIFICATION_MESSAGE_MAX_LENGTH_BYTES("atlas.notification.message.max.length.bytes", (1000 * 1000)),
NOTIFICATION_MESSAGE_COMPRESSION_ENABLED("atlas.notification.message.compression.enabled", true),
NOTIFICATION_SPLIT_MESSAGE_SEGMENTS_WAIT_TIME_SECONDS("atlas.notification.split.message.segments.wait.time.seconds", 15 * 60),
......
......@@ -19,6 +19,7 @@ package org.apache.atlas.kafka;
import com.google.common.annotations.VisibleForTesting;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasConfiguration;
import org.apache.atlas.AtlasException;
import org.apache.atlas.notification.AbstractNotification;
import org.apache.atlas.notification.NotificationConsumer;
......@@ -52,8 +53,8 @@ public class KafkaNotification extends AbstractNotification implements Service {
public static final Logger LOG = LoggerFactory.getLogger(KafkaNotification.class);
public static final String PROPERTY_PREFIX = "atlas.kafka";
public static final String ATLAS_HOOK_TOPIC = "ATLAS_HOOK";
public static final String ATLAS_ENTITIES_TOPIC = "ATLAS_ENTITIES";
public static final String ATLAS_HOOK_TOPIC = AtlasConfiguration.NOTIFICATION_HOOK_TOPIC_NAME.getString();
public static final String ATLAS_ENTITIES_TOPIC = AtlasConfiguration.NOTIFICATION_ENTITIES_TOPIC_NAME.getString();
protected static final String CONSUMER_GROUP_ID_PROPERTY = "group.id";
private static final String DEFAULT_CONSUMER_CLOSED_ERROR_MESSAGE = "This consumer has already been closed.";
......
......@@ -19,6 +19,7 @@
package org.apache.atlas.hook;
import kafka.utils.ZkUtils;
import org.apache.atlas.AtlasConfiguration;
import org.apache.commons.configuration.Configuration;
import org.testng.annotations.Test;
......@@ -34,6 +35,9 @@ import static org.testng.Assert.assertTrue;
public class AtlasTopicCreatorTest {
private final String ATLAS_HOOK_TOPIC = AtlasConfiguration.NOTIFICATION_HOOK_TOPIC_NAME.getString();
private final String ATLAS_ENTITIES_TOPIC = AtlasConfiguration.NOTIFICATION_ENTITIES_TOPIC_NAME.getString();
@Test
public void shouldNotCreateAtlasTopicIfNotConfiguredToDoSo() {
......@@ -49,7 +53,7 @@ public class AtlasTopicCreatorTest {
return false;
}
};
atlasTopicCreator.createAtlasTopic(configuration, "ATLAS_HOOK");
atlasTopicCreator.createAtlasTopic(configuration, ATLAS_HOOK_TOPIC);
assertFalse(topicExistsCalled[0]);
}
......@@ -80,7 +84,7 @@ public class AtlasTopicCreatorTest {
createTopicCalled[0] = true;
}
};
atlasTopicCreator.createAtlasTopic(configuration, "ATLAS_HOOK");
atlasTopicCreator.createAtlasTopic(configuration, ATLAS_HOOK_TOPIC);
assertTrue(topicExistsCalled[0]);
assertFalse(createTopicCalled[0]);
}
......@@ -111,7 +115,7 @@ public class AtlasTopicCreatorTest {
createdTopic[0] = true;
}
};
atlasTopicCreator.createAtlasTopic(configuration, "ATLAS_HOOK");
atlasTopicCreator.createAtlasTopic(configuration, ATLAS_HOOK_TOPIC);
assertTrue(createdTopic[0]);
}
......@@ -141,7 +145,7 @@ public class AtlasTopicCreatorTest {
throw new RuntimeException("Simulating failure during creating topic");
}
};
atlasTopicCreator.createAtlasTopic(configuration, "ATLAS_HOOK");
atlasTopicCreator.createAtlasTopic(configuration, ATLAS_HOOK_TOPIC);
assertTrue(createTopicCalled[0]);
}
......@@ -154,8 +158,8 @@ public class AtlasTopicCreatorTest {
final ZkUtils zookeeperUtils = mock(ZkUtils.class);
final Map<String, Boolean> createdTopics = new HashMap<>();
createdTopics.put("ATLAS_HOOK", false);
createdTopics.put("ATLAS_ENTITIES", false);
createdTopics.put(ATLAS_HOOK_TOPIC, false);
createdTopics.put(ATLAS_ENTITIES_TOPIC, false);
AtlasTopicCreator atlasTopicCreator = new AtlasTopicCreator() {
......@@ -174,9 +178,9 @@ public class AtlasTopicCreatorTest {
createdTopics.put(topicName, true);
}
};
atlasTopicCreator.createAtlasTopic(configuration, "ATLAS_HOOK", "ATLAS_ENTITIES");
assertTrue(createdTopics.get("ATLAS_HOOK"));
assertTrue(createdTopics.get("ATLAS_ENTITIES"));
atlasTopicCreator.createAtlasTopic(configuration, ATLAS_HOOK_TOPIC, ATLAS_ENTITIES_TOPIC);
assertTrue(createdTopics.get(ATLAS_HOOK_TOPIC));
assertTrue(createdTopics.get(ATLAS_ENTITIES_TOPIC));
}
@Test
......@@ -188,7 +192,7 @@ public class AtlasTopicCreatorTest {
final ZkUtils zookeeperUtils = mock(ZkUtils.class);
final Map<String, Boolean> createdTopics = new HashMap<>();
createdTopics.put("ATLAS_ENTITIES", false);
createdTopics.put(ATLAS_ENTITIES_TOPIC, false);
AtlasTopicCreator atlasTopicCreator = new AtlasTopicCreator() {
......@@ -204,15 +208,15 @@ public class AtlasTopicCreatorTest {
@Override
protected void createTopic(Configuration atlasProperties, String topicName, ZkUtils zkUtils) {
if (topicName.equals("ATLAS_HOOK")) {
if (topicName.equals(ATLAS_HOOK_TOPIC)) {
throw new RuntimeException("Simulating failure when creating ATLAS_HOOK topic");
} else {
createdTopics.put(topicName, true);
}
}
};
atlasTopicCreator.createAtlasTopic(configuration, "ATLAS_HOOK", "ATLAS_ENTITIES");
assertTrue(createdTopics.get("ATLAS_ENTITIES"));
atlasTopicCreator.createAtlasTopic(configuration, ATLAS_HOOK_TOPIC, ATLAS_ENTITIES_TOPIC);
assertTrue(createdTopics.get(ATLAS_ENTITIES_TOPIC));
}
@Test
......@@ -238,7 +242,7 @@ public class AtlasTopicCreatorTest {
protected void createTopic(Configuration atlasProperties, String topicName, ZkUtils zkUtils) {
}
};
atlasTopicCreator.createAtlasTopic(configuration, "ATLAS_HOOK", "ATLAS_ENTITIES");
atlasTopicCreator.createAtlasTopic(configuration, ATLAS_HOOK_TOPIC, ATLAS_ENTITIES_TOPIC);
verify(zookeeperUtils, times(1)).close();
}
......@@ -250,8 +254,8 @@ public class AtlasTopicCreatorTest {
thenReturn(true);
final ZkUtils zookeeperUtils = mock(ZkUtils.class);
final Map<String, Boolean> createdTopics = new HashMap<>();
createdTopics.put("ATLAS_HOOK", false);
createdTopics.put("ATLAS_ENTITIES", false);
createdTopics.put(ATLAS_HOOK_TOPIC, false);
createdTopics.put(ATLAS_ENTITIES_TOPIC, false);
AtlasTopicCreator atlasTopicCreator = new AtlasTopicCreator() {
@Override
......@@ -274,8 +278,8 @@ public class AtlasTopicCreatorTest {
return false;
}
};
atlasTopicCreator.createAtlasTopic(configuration, "ATLAS_HOOK", "ATLAS_ENTITIES");
assertFalse(createdTopics.get("ATLAS_HOOK"));
assertFalse(createdTopics.get("ATLAS_ENTITIES"));
atlasTopicCreator.createAtlasTopic(configuration, ATLAS_HOOK_TOPIC, ATLAS_ENTITIES_TOPIC);
assertFalse(createdTopics.get(ATLAS_HOOK_TOPIC));
assertFalse(createdTopics.get(ATLAS_ENTITIES_TOPIC));
}
}
......@@ -18,6 +18,7 @@
package org.apache.atlas.kafka;
import org.apache.atlas.AtlasConfiguration;
import org.apache.atlas.model.notification.HookNotification;
import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.atlas.v1.model.instance.Struct;
......@@ -53,11 +54,12 @@ import static org.testng.Assert.*;
public class KafkaConsumerTest {
private static final String TRAIT_NAME = "MyTrait";
private final String ATLAS_HOOK_TOPIC = AtlasConfiguration.NOTIFICATION_HOOK_TOPIC_NAME.getString();
@Mock
private KafkaConsumer kafkaConsumer;
@BeforeMethod
public void setup() {
MockitoAnnotations.initMocks(this);
......@@ -68,8 +70,8 @@ public class KafkaConsumerTest {
Referenceable entity = getEntity(TRAIT_NAME);
EntityUpdateRequest message = new EntityUpdateRequest("user1", entity);
String json = AtlasType.toV1Json(new AtlasNotificationMessage<>(new MessageVersion("1.0.0"), message));
TopicPartition tp = new TopicPartition("ATLAS_HOOK", 0);
List<ConsumerRecord<String, String>> klist = Collections.singletonList(new ConsumerRecord<>("ATLAS_HOOK", 0, 0L, "mykey", json));
TopicPartition tp = new TopicPartition(ATLAS_HOOK_TOPIC, 0);
List<ConsumerRecord<String, String>> klist = Collections.singletonList(new ConsumerRecord<>(ATLAS_HOOK_TOPIC, 0, 0L, "mykey", json));
Map mp = Collections.singletonMap(tp, klist);
ConsumerRecords records = new ConsumerRecords(mp);
......@@ -92,8 +94,8 @@ public class KafkaConsumerTest {
Referenceable entity = getEntity(TRAIT_NAME);
EntityUpdateRequest message = new EntityUpdateRequest("user1", entity);
String json = AtlasType.toV1Json(new AtlasNotificationMessage<>(new MessageVersion("2.0.0"), message));
TopicPartition tp = new TopicPartition("ATLAS_HOOK",0);
List<ConsumerRecord<String, String>> klist = Collections.singletonList(new ConsumerRecord<>("ATLAS_HOOK", 0, 0L, "mykey", json));
TopicPartition tp = new TopicPartition(ATLAS_HOOK_TOPIC,0);
List<ConsumerRecord<String, String>> klist = Collections.singletonList(new ConsumerRecord<>(ATLAS_HOOK_TOPIC, 0, 0L, "mykey", json));
Map mp = Collections.singletonMap(tp,klist);
ConsumerRecords records = new ConsumerRecords(mp);
......@@ -119,7 +121,7 @@ public class KafkaConsumerTest {
@Test
public void testCommitIsCalledIfAutoCommitDisabled() {
TopicPartition tp = new TopicPartition("ATLAS_HOOK",0);
TopicPartition tp = new TopicPartition(ATLAS_HOOK_TOPIC,0);
AtlasKafkaConsumer consumer = new AtlasKafkaConsumer(NotificationType.HOOK, kafkaConsumer, false, 100L);
consumer.commit(tp, 1);
......@@ -129,7 +131,7 @@ public class KafkaConsumerTest {
@Test
public void testCommitIsNotCalledIfAutoCommitEnabled() {
TopicPartition tp = new TopicPartition("ATLAS_HOOK",0);
TopicPartition tp = new TopicPartition(ATLAS_HOOK_TOPIC,0);
AtlasKafkaConsumer consumer = new AtlasKafkaConsumer(NotificationType.HOOK, kafkaConsumer, true , 100L);
consumer.commit(tp, 1);
......
......@@ -23,6 +23,7 @@ import kafka.utils.ShutdownableThread;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasClientV2;
import org.apache.atlas.AtlasConfiguration;
import org.apache.atlas.AtlasException;
import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.RequestContext;
......@@ -92,6 +93,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
private static final String ATTRIBUTE_INPUTS = "inputs";
private static final String THREADNAME_PREFIX = NotificationHookConsumer.class.getSimpleName();
private static final String ATLAS_HOOK_TOPIC = AtlasConfiguration.NOTIFICATION_HOOK_TOPIC_NAME.getString();
public static final String CONSUMER_THREADS_PROPERTY = "atlas.notification.hook.numthreads";
public static final String CONSUMER_RETRIES_PROPERTY = "atlas.notification.hook.maxretries";
......@@ -105,6 +107,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
public static final int SERVER_READY_WAIT_TIME_MS = 1000;
private final AtlasEntityStore atlasEntityStore;
private final ServiceState serviceState;
private final AtlasInstanceConverter instanceConverter;
......@@ -601,7 +604,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
try {
recordFailedMessages();
TopicPartition partition = new TopicPartition("ATLAS_HOOK", kafkaMessage.getPartition());
TopicPartition partition = new TopicPartition(ATLAS_HOOK_TOPIC, kafkaMessage.getPartition());
consumer.commit(partition, kafkaMessage.getOffset() + 1);
commitSucceessStatus = true;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment