Commit ab92cf1e by Andras Katona Committed by nixonrodrigues

ATLAS-3864: Follow up change, removed remaining usage of zkclient even from poms

Also removing kafka core dependency from kafka-bridge since it's not used any more Signed-off-by: 's avatarnixonrodrigues <nixon@apache.org>
parent 1907f640
......@@ -103,20 +103,6 @@
</dependency>
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>${zkclient.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_${kafka.scala.binary.version}</artifactId>
<version>${kafka.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-webapp</artifactId>
<version>${jetty.version}</version>
......@@ -206,11 +192,6 @@
</artifactItem>
<artifactItem>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_${kafka.scala.binary.version}</artifactId>
<version>${kafka.version}</version>
</artifactItem>
<artifactItem>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</artifactItem>
......
......@@ -82,7 +82,7 @@ public class KafkaBridge {
public static void main(String[] args) {
int exitCode = EXIT_CODE_FAILED;
AtlasClientV2 atlasClientV2 = null;
KafkaBridge importer = null;
KafkaUtils kafkaUtils = null;
try {
Options options = new Options();
......@@ -111,8 +111,9 @@ public class KafkaBridge {
atlasClientV2 = new AtlasClientV2(ugi, ugi.getShortUserName(), urls);
}
importer = new KafkaBridge(atlasConf, atlasClientV2);
kafkaUtils = new KafkaUtils(atlasConf);
KafkaBridge importer = new KafkaBridge(atlasConf, atlasClientV2, kafkaUtils);
if (StringUtils.isNotEmpty(fileToImport)) {
File f = new File(fileToImport);
......@@ -146,25 +147,19 @@ public class KafkaBridge {
if (atlasClientV2 != null) {
atlasClientV2.close();
}
if (importer != null) {
importer.close();
if (kafkaUtils != null) {
kafkaUtils.close();
}
}
System.exit(exitCode);
}
public KafkaBridge(Configuration atlasConf, AtlasClientV2 atlasClientV2) throws Exception {
public KafkaBridge(Configuration atlasConf, AtlasClientV2 atlasClientV2, KafkaUtils kafkaUtils) throws Exception {
this.atlasClientV2 = atlasClientV2;
this.metadataNamespace = getMetadataNamespace(atlasConf);
this.kafkaUtils = new KafkaUtils(atlasConf);
this.availableTopics = kafkaUtils.listAllTopics();
}
public void close() {
if (this.kafkaUtils != null) {
this.kafkaUtils.close();
}
this.kafkaUtils = kafkaUtils;
this.availableTopics = this.kafkaUtils.listAllTopics();
}
private String getMetadataNamespace(Configuration config) {
......
......@@ -18,115 +18,127 @@
package org.apache.atlas.kafka.bridge;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkConnection;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClientV2;
import org.apache.atlas.AtlasServiceException;
import org.I0Itec.zkclient.ZkClient;
import org.apache.atlas.kafka.bridge.KafkaBridge;
import org.apache.atlas.kafka.model.KafkaDataTypes;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.mockito.Mock;
import org.apache.atlas.utils.KafkaUtils;
import org.mockito.ArgumentCaptor;
import org.mockito.MockitoAnnotations;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import scala.Option;
import scala.collection.JavaConverters;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
public class KafkaBridgeTest {
private static final String TEST_TOPIC_NAME = "test_topic";
public static final String CLUSTER_NAME = "primary";
@Mock
private ZkClient zkClient;
@Mock
private ZkConnection zkConnection;
@Mock
private AtlasClient atlasClient;
@Mock
private AtlasClientV2 atlasClientV2;
@Mock
private AtlasEntity atlasEntity;
@Mock
EntityMutationResponse entityMutationResponse;
@Mock
KafkaBridge kafkaBridge;
public static final AtlasEntity.AtlasEntityWithExtInfo TOPIC_WITH_EXT_INFO = new AtlasEntity.AtlasEntityWithExtInfo(
getTopicEntityWithGuid("0dd466a4-3838-4537-8969-6abb8b9e9185"));
private static final String CLUSTER_NAME = "primary";
private static final String TOPIC_QUALIFIED_NAME = KafkaBridge.getTopicQualifiedName(CLUSTER_NAME, TEST_TOPIC_NAME);
@BeforeMethod
public void initializeMocks() {
MockitoAnnotations.initMocks(this);
}
@Test
public void testImportTopic() throws Exception {
List<String> topics = setupTopic(zkClient, TEST_TOPIC_NAME);
AtlasEntity.AtlasEntityWithExtInfo atlasEntityWithExtInfo = new AtlasEntity.AtlasEntityWithExtInfo(
getTopicEntityWithGuid("0dd466a4-3838-4537-8969-6abb8b9e9185"));
KafkaBridge kafkaBridge = mock(KafkaBridge.class);
when(kafkaBridge.createEntityInAtlas(atlasEntityWithExtInfo)).thenReturn(atlasEntityWithExtInfo);
try {
kafkaBridge.importTopic(TEST_TOPIC_NAME);
} catch (Exception e) {
Assert.fail("KafkaBridge import failed ", e);
}
}
private void returnExistingTopic(String topicName, AtlasClientV2 atlasClientV2, String clusterName)
throws AtlasServiceException {
when(atlasClientV2.getEntityByAttribute(KafkaDataTypes.KAFKA_TOPIC.getName(),
Collections.singletonMap(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
getTopicQualifiedName(TEST_TOPIC_NAME,CLUSTER_NAME))))
.thenReturn((new AtlasEntity.AtlasEntityWithExtInfo(
getTopicEntityWithGuid("0dd466a4-3838-4537-8969-6abb8b9e9185"))));
}
private List<String> setupTopic(ZkClient zkClient, String topicName) {
List<String> topics = new ArrayList<>();
topics.add(topicName);
ZkUtils zkUtils = mock(ZkUtils.class);
when(zkUtils.getAllTopics()).thenReturn(JavaConverters.asScalaIteratorConverter(topics.iterator()).asScala().toSeq());
return topics;
}
private AtlasEntity getTopicEntityWithGuid(String guid) {
private static AtlasEntity getTopicEntityWithGuid(String guid) {
AtlasEntity ret = new AtlasEntity(KafkaDataTypes.KAFKA_TOPIC.getName());
ret.setGuid(guid);
return ret;
}
private AtlasEntity createTopicReference() {
AtlasEntity topicEntity = new AtlasEntity(KafkaDataTypes.KAFKA_TOPIC.getName());
return topicEntity;
@Test
public void testImportTopic() throws Exception {
KafkaUtils mockKafkaUtils = mock(KafkaUtils.class);
when(mockKafkaUtils.listAllTopics())
.thenReturn(Collections.singletonList(TEST_TOPIC_NAME));
when(mockKafkaUtils.getPartitionCount(TEST_TOPIC_NAME))
.thenReturn(3);
EntityMutationResponse mockCreateResponse = mock(EntityMutationResponse.class);
AtlasEntityHeader mockAtlasEntityHeader = mock(AtlasEntityHeader.class);
when(mockAtlasEntityHeader.getGuid()).thenReturn(TOPIC_WITH_EXT_INFO.getEntity().getGuid());
when(mockCreateResponse.getCreatedEntities())
.thenReturn(Collections.singletonList(mockAtlasEntityHeader));
AtlasClientV2 mockAtlasClientV2 = mock(AtlasClientV2.class);
when(mockAtlasClientV2.createEntity(any()))
.thenReturn(mockCreateResponse);
when(mockAtlasClientV2.getEntityByGuid(TOPIC_WITH_EXT_INFO.getEntity().getGuid()))
.thenReturn(TOPIC_WITH_EXT_INFO);
KafkaBridge bridge = new KafkaBridge(ApplicationProperties.get(), mockAtlasClientV2, mockKafkaUtils);
bridge.importTopic(TEST_TOPIC_NAME);
ArgumentCaptor<AtlasEntity.AtlasEntityWithExtInfo> argumentCaptor = ArgumentCaptor.forClass(AtlasEntity.AtlasEntityWithExtInfo.class);
verify(mockAtlasClientV2).createEntity(argumentCaptor.capture());
AtlasEntity.AtlasEntityWithExtInfo entity = argumentCaptor.getValue();
assertEquals(entity.getEntity().getAttribute("qualifiedName"), TOPIC_QUALIFIED_NAME);
}
private String createTestTopic(String testTopic) {
return new String(testTopic);
@Test
public void testCreateTopic() throws Exception {
KafkaUtils mockKafkaUtils = mock(KafkaUtils.class);
when(mockKafkaUtils.listAllTopics())
.thenReturn(Collections.singletonList(TEST_TOPIC_NAME));
when(mockKafkaUtils.getPartitionCount(TEST_TOPIC_NAME))
.thenReturn(3);
EntityMutationResponse mockCreateResponse = mock(EntityMutationResponse.class);
AtlasEntityHeader mockAtlasEntityHeader = mock(AtlasEntityHeader.class);
when(mockAtlasEntityHeader.getGuid()).thenReturn(TOPIC_WITH_EXT_INFO.getEntity().getGuid());
when(mockCreateResponse.getCreatedEntities())
.thenReturn(Collections.singletonList(mockAtlasEntityHeader));
AtlasClientV2 mockAtlasClientV2 = mock(AtlasClientV2.class);
when(mockAtlasClientV2.createEntity(any()))
.thenReturn(mockCreateResponse);
when(mockAtlasClientV2.getEntityByGuid(TOPIC_WITH_EXT_INFO.getEntity().getGuid()))
.thenReturn(TOPIC_WITH_EXT_INFO);
KafkaBridge bridge = new KafkaBridge(ApplicationProperties.get(), mockAtlasClientV2, mockKafkaUtils);
AtlasEntity.AtlasEntityWithExtInfo ret = bridge.createOrUpdateTopic(TEST_TOPIC_NAME);
assertEquals(TOPIC_WITH_EXT_INFO, ret);
}
private static String getTopicQualifiedName(String clusterName, String topic) {
return String.format("%s@%s", topic.toLowerCase(), clusterName);
@Test
public void testUpdateTopic() throws Exception {
KafkaUtils mockKafkaUtils = mock(KafkaUtils.class);
when(mockKafkaUtils.listAllTopics())
.thenReturn(Collections.singletonList(TEST_TOPIC_NAME));
when(mockKafkaUtils.getPartitionCount(TEST_TOPIC_NAME))
.thenReturn(3);
EntityMutationResponse mockUpdateResponse = mock(EntityMutationResponse.class);
AtlasEntityHeader mockAtlasEntityHeader = mock(AtlasEntityHeader.class);
when(mockAtlasEntityHeader.getGuid()).thenReturn(TOPIC_WITH_EXT_INFO.getEntity().getGuid());
when(mockUpdateResponse.getUpdatedEntities())
.thenReturn(Collections.singletonList(mockAtlasEntityHeader));
AtlasClientV2 mockAtlasClientV2 = mock(AtlasClientV2.class);
when(mockAtlasClientV2.getEntityByAttribute(eq(KafkaDataTypes.KAFKA_TOPIC.getName()), any()))
.thenReturn(TOPIC_WITH_EXT_INFO);
when(mockAtlasClientV2.updateEntity(any()))
.thenReturn(mockUpdateResponse);
when(mockAtlasClientV2.getEntityByGuid(TOPIC_WITH_EXT_INFO.getEntity().getGuid()))
.thenReturn(TOPIC_WITH_EXT_INFO);
KafkaBridge bridge = new KafkaBridge(ApplicationProperties.get(), mockAtlasClientV2, mockKafkaUtils);
AtlasEntity.AtlasEntityWithExtInfo ret = bridge.createOrUpdateTopic(TEST_TOPIC_NAME);
assertEquals(TOPIC_WITH_EXT_INFO, ret);
}
}
\ No newline at end of file
......@@ -83,11 +83,6 @@
</dependency>
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
</dependency>
<dependency>
<groupId>org.apache.atlas</groupId>
<artifactId>atlas-intg</artifactId>
<classifier>tests</classifier>
......@@ -179,11 +174,6 @@
<version>${kafka.version}</version>
</artifactItem>
<artifactItem>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>${zkclient.version}</version>
</artifactItem>
<artifactItem>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.6</version>
......
......@@ -747,7 +747,6 @@
<testng.version>6.9.4</testng.version>
<tinkerpop.version>3.4.6</tinkerpop.version>
<woodstox-core.version>5.0.3</woodstox-core.version>
<zkclient.version>0.8</zkclient.version>
<zookeeper.version>3.4.6</zookeeper.version>
</properties>
......@@ -1657,12 +1656,6 @@
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>${zkclient.version}</version>
</dependency>
<!-- Fix for cassandra-all tranitive dependency CVE-2017-18640 : https://nvd.nist.gov/vuln/detail/CVE-2017-18640 -->
<dependency>
<groupId>org.yaml</groupId>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment