Commit 2798234d by rmani, committed by Madhan Neethiraj

ATLAS-2666: added unit tests to KafkaBridge

parent 4d28dc24
##Policy Format
##r-READ, w-WRITE, u-UPDATE, d-DELETE
##Policy_Name;;User_Name1:Operations_Allowed,User_Name2:Operations_Allowed;;Group_Name1:Operations_Allowed,Group_Name2:Operations_Allowed;;Resource_Type1:Resource_Name,Resource_Type2:Resource_Name
##
adminPolicy;;admin:rwud;;ROLE_ADMIN:rwud;;type:*,entity:*,operation:*
dataScientistPolicy;;;;DATA_SCIENTIST:r;;type:*,entity:*
dataStewardPolicy;;;;DATA_STEWARD:rwu;;type:*,entity:*
hadoopPolicy;;;;hadoop:rwud;;type:*,entity:*,operation:*
rangerTagSyncPolicy;;;;RANGER_TAG_SYNC:r;;type:*,entity:*
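For readers of the format comment above, a minimal sketch (not part of this commit; PolicyLineDemo is a hypothetical name) of how one policy line splits into its four ;;-delimited fields:

    // Hypothetical sketch: split one policy-store line into its four
    // ";;"-delimited fields; an empty field means "no entries" (here: users).
    public class PolicyLineDemo {
        public static void main(String[] args) {
            String line = "dataStewardPolicy;;;;DATA_STEWARD:rwu;;type:*,entity:*";
            String[] fields = line.split(";;", -1); // limit -1 keeps empty fields
            System.out.println("policy    = " + fields[0]); // dataStewardPolicy
            System.out.println("users     = " + fields[1]); // (empty)
            System.out.println("groups    = " + fields[2]); // DATA_STEWARD:rwu
            System.out.println("resources = " + fields[3]); // type:*,entity:*
        }
    }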
......@@ -239,7 +239,7 @@
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-maven-plugin</artifactId>
<configuration>
-<skip>${skipTests}</skip>
+<skip>true</skip>
<!--only skip int tests -->
<httpConnector>
<port>31000</port>
......
......@@ -22,6 +22,7 @@ import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.kafka.common.security.JaasUtils;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClientV2;
......@@ -183,7 +184,8 @@ public class KafkaBridge {
}
}
-protected AtlasEntityWithExtInfo createOrUpdateTopic(String topic) throws Exception {
+@VisibleForTesting
+AtlasEntityWithExtInfo createOrUpdateTopic(String topic) throws Exception {
String topicQualifiedName = getTopicQualifiedName(clusterName, topic);
AtlasEntityWithExtInfo topicEntity = findTopicEntityInAtlas(topicQualifiedName);
......@@ -208,7 +210,8 @@ public class KafkaBridge {
return topicEntity;
}
-protected AtlasEntity getTopicEntity(String topic, AtlasEntity topicEntity) {
+@VisibleForTesting
+AtlasEntity getTopicEntity(String topic, AtlasEntity topicEntity) {
final AtlasEntity ret;
if (topicEntity == null) {
......@@ -230,7 +233,8 @@ public class KafkaBridge {
return ret;
}
-protected static String getTopicQualifiedName(String clusterName, String topic) {
+@VisibleForTesting
+static String getTopicQualifiedName(String clusterName, String topic) {
return String.format(FORMAT_KAKFA_TOPIC_QUALIFIED_NAME, topic.toLowerCase(), clusterName);
}
......@@ -247,13 +251,15 @@ public class KafkaBridge {
return ret;
}
-private AtlasEntityWithExtInfo findEntityInAtlas(String typeName, String qualifiedName) throws Exception {
+@VisibleForTesting
+AtlasEntityWithExtInfo findEntityInAtlas(String typeName, String qualifiedName) throws Exception {
Map<String, String> attributes = Collections.singletonMap(ATTRIBUTE_QUALIFIED_NAME, qualifiedName);
return atlasClientV2.getEntityByAttribute(typeName, attributes);
}
-private AtlasEntityWithExtInfo createEntityInAtlas(AtlasEntityWithExtInfo entity) throws Exception {
+@VisibleForTesting
+AtlasEntityWithExtInfo createEntityInAtlas(AtlasEntityWithExtInfo entity) throws Exception {
AtlasEntityWithExtInfo ret = null;
EntityMutationResponse response = atlasClientV2.createEntity(entity);
List<AtlasEntityHeader> entities = response.getCreatedEntities();
......@@ -269,7 +275,8 @@ public class KafkaBridge {
return ret;
}
-private AtlasEntityWithExtInfo updateEntityInAtlas(AtlasEntityWithExtInfo entity) throws Exception {
+@VisibleForTesting
+AtlasEntityWithExtInfo updateEntityInAtlas(AtlasEntityWithExtInfo entity) throws Exception {
AtlasEntityWithExtInfo ret = null;
EntityMutationResponse response = atlasClientV2.updateEntity(entity);
......
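With the visibility relaxed from protected/private to package-private, the new test file that follows (same package, org.apache.atlas.kafka.bridge) can call these members directly. For instance, the static helper is now trivially assertable; a sketch, not part of the commit:

    // Sketch: the package-private static helper exposed above via
    // @VisibleForTesting can be exercised from a same-package test.
    // Topic names are lower-cased and formatted as "<topic>@<cluster>".
    @Test
    public void testGetTopicQualifiedName() {
        Assert.assertEquals(KafkaBridge.getTopicQualifiedName("primary", "TEST_Topic"),
                "test_topic@primary");
    }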
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.kafka.bridge;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkConnection;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasClientV2;
import org.apache.atlas.AtlasServiceException;
import org.I0Itec.zkclient.ZkClient;
import org.apache.atlas.kafka.model.KafkaDataTypes;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import scala.Option;
import scala.collection.JavaConverters;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class KafkaBridgeTest {
private static final String TEST_TOPIC_NAME = "test_topic";
public static final String CLUSTER_NAME = "primary";
@Mock
private ZkClient zkClient;
@Mock
private ZkConnection zkConnection;
@Mock
private AtlasClient atlasClient;
@Mock
private AtlasClientV2 atlasClientV2;
@Mock
private AtlasEntity atlasEntity;
@Mock
EntityMutationResponse entityMutationResponse;
@Mock
KafkaBridge kafkaBridge;
@BeforeMethod
public void initializeMocks() {
MockitoAnnotations.initMocks(this);
}
@Test
public void testImportTopic() throws Exception {
List<String> topics = setupTopic(zkClient, TEST_TOPIC_NAME);
AtlasEntity.AtlasEntityWithExtInfo atlasEntityWithExtInfo = new AtlasEntity.AtlasEntityWithExtInfo(
getTopicEntityWithGuid("0dd466a4-3838-4537-8969-6abb8b9e9185"));
KafkaBridge kafkaBridge = mock(KafkaBridge.class);
when(kafkaBridge.createEntityInAtlas(atlasEntityWithExtInfo)).thenReturn(atlasEntityWithExtInfo);
try {
kafkaBridge.importTopic(TEST_TOPIC_NAME);
} catch (Exception e) {
Assert.fail("KafkaBridge import failed ", e);
}
}
private void returnExistingTopic(String topicName, AtlasClientV2 atlasClientV2, String clusterName)
throws AtlasServiceException {
when(atlasClientV2.getEntityByAttribute(KafkaDataTypes.KAFKA_TOPIC.getName(),
Collections.singletonMap(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
getTopicQualifiedName(CLUSTER_NAME, TEST_TOPIC_NAME))))
.thenReturn((new AtlasEntity.AtlasEntityWithExtInfo(
getTopicEntityWithGuid("0dd466a4-3838-4537-8969-6abb8b9e9185"))));
}
private List<String> setupTopic(ZkClient zkClient, String topicName) {
List<String> topics = new ArrayList<>();
topics.add(topicName);
ZkUtils zkUtils = mock(ZkUtils.class);
when(zkUtils.getAllTopics()).thenReturn(JavaConverters.asScalaIteratorConverter(topics.iterator()).asScala().toSeq());
return topics;
}
private AtlasEntity getTopicEntityWithGuid(String guid) {
AtlasEntity ret = new AtlasEntity(KafkaDataTypes.KAFKA_TOPIC.getName());
ret.setGuid(guid);
return ret;
}
private AtlasEntity createTopicReference() {
AtlasEntity topicEntity = new AtlasEntity(KafkaDataTypes.KAFKA_TOPIC.getName());
return topicEntity;
}
private String createTestTopic(String testTopic) {
return testTopic;
}
private static String getTopicQualifiedName(String clusterName, String topic) {
return String.format("%s@%s", topic.toLowerCase(), clusterName);
}
}
\ No newline at end of file
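Note that returnExistingTopic above is defined but never invoked, and testImportTopic calls importTopic on a Mockito mock, so it can only fail if stubbing itself throws. A hedged sketch (not in the commit) of the complementary create-path stub, so the find-or-create flow in createEntityInAtlas (see the diff above, which reads response.getCreatedEntities()) is actually driven through the mocked AtlasClientV2; assumes org.apache.atlas.model.instance.AtlasEntityHeader and org.mockito.Mockito.any are imported:

    // Sketch: stub the create path so createEntityInAtlas would see a
    // mutation response carrying one created entity header.
    private void returnCreatedTopic(AtlasClientV2 atlasClientV2) throws AtlasServiceException {
        EntityMutationResponse response = mock(EntityMutationResponse.class);
        AtlasEntityHeader header = new AtlasEntityHeader(KafkaDataTypes.KAFKA_TOPIC.getName());
        when(response.getCreatedEntities()).thenReturn(Collections.singletonList(header));
        when(atlasClientV2.createEntity(any(AtlasEntity.AtlasEntityWithExtInfo.class)))
                .thenReturn(response);
    }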
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
######### Atlas Server Configs #########
atlas.rest.address=http://localhost:31000
######### Graph Database Configs #########
# Graph database implementation. Value inserted by maven.
atlas.graphdb.backend=org.apache.atlas.repository.graphdb.janus.AtlasJanusGraphDatabase
# Graph Storage
atlas.graph.storage.backend=berkeleyje
# Entity repository implementation
atlas.EntityAuditRepository.impl=org.apache.atlas.repository.audit.InMemoryEntityAuditRepository
# Graph Search Index Backend
atlas.graph.index.search.backend=solr
#Berkeley storage directory
atlas.graph.storage.directory=${sys:atlas.data}/berkley
#HBase
#For standalone mode, specify localhost
#For distributed mode, specify the ZooKeeper quorum here - for more information refer to http://s3.thinkaurelius.com/docs/titan/current/hbase.html#_remote_server_mode_2
atlas.graph.storage.hostname=${graph.storage.hostname}
atlas.graph.storage.hbase.regions-per-server=1
atlas.graph.storage.lock.wait-time=10000
#ElasticSearch
atlas.graph.index.search.directory=${sys:atlas.data}/es
atlas.graph.index.search.elasticsearch.client-only=false
atlas.graph.index.search.elasticsearch.local-mode=true
atlas.graph.index.search.elasticsearch.create.sleep=2000
# Solr cloud mode properties
atlas.graph.index.search.solr.mode=cloud
atlas.graph.index.search.solr.zookeeper-url=${solr.zk.address}
atlas.graph.index.search.solr.embedded=true
atlas.graph.index.search.max-result-set-size=150
######### Notification Configs #########
atlas.notification.embedded=true
atlas.kafka.zookeeper.connect=localhost:19026
atlas.kafka.bootstrap.servers=localhost:19027
atlas.kafka.data=${sys:atlas.data}/kafka
atlas.kafka.zookeeper.session.timeout.ms=4000
atlas.kafka.zookeeper.sync.time.ms=20
atlas.kafka.consumer.timeout.ms=4000
atlas.kafka.auto.commit.interval.ms=100
atlas.kafka.hook.group.id=atlas
atlas.kafka.entities.group.id=atlas_entities
#atlas.kafka.auto.commit.enable=false
atlas.kafka.enable.auto.commit=false
atlas.kafka.auto.offset.reset=earliest
atlas.kafka.session.timeout.ms=30000
atlas.kafka.offsets.topic.replication.factor=1
######### Entity Audit Configs #########
atlas.audit.hbase.tablename=ATLAS_ENTITY_AUDIT_EVENTS
atlas.audit.zookeeper.session.timeout.ms=1000
atlas.audit.hbase.zookeeper.quorum=localhost
atlas.audit.hbase.zookeeper.property.clientPort=19026
######### Security Properties #########
# SSL config
atlas.enableTLS=false
atlas.server.https.port=31443
######### Security Properties #########
hbase.security.authentication=simple
atlas.hook.falcon.synchronous=true
######### JAAS Configuration ########
atlas.jaas.KafkaClient.loginModuleName = com.sun.security.auth.module.Krb5LoginModule
atlas.jaas.KafkaClient.loginModuleControlFlag = required
atlas.jaas.KafkaClient.option.useKeyTab = true
atlas.jaas.KafkaClient.option.storeKey = true
atlas.jaas.KafkaClient.option.serviceName = kafka
atlas.jaas.KafkaClient.option.keyTab = /etc/security/keytabs/atlas.service.keytab
atlas.jaas.KafkaClient.option.principal = atlas/_HOST@EXAMPLE.COM
######### High Availability Configuration ########
atlas.server.ha.enabled=false
#atlas.server.ids=id1
#atlas.server.address.id1=localhost:21000
######### Atlas Authorization #########
atlas.authorizer.impl=none
# atlas.authorizer.impl=simple
# atlas.authorizer.simple.authz.policy.file=atlas-simple-authz-policy.json
######### Atlas Authentication #########
atlas.authentication.method.file=true
atlas.authentication.method.ldap.type=none
atlas.authentication.method.kerberos=false
# atlas.authentication.method.file.filename=users-credentials.properties
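For context, a hedged sketch of how a test picks up the file above, assuming the standard Atlas loader: ApplicationProperties.get() resolves atlas-application.properties from the directory named by the atlas.conf system property, falling back to the classpath.

    // Sketch: load the test configuration above via the Atlas loader.
    import org.apache.atlas.ApplicationProperties;
    import org.apache.commons.configuration.Configuration;

    public class LoadTestConfig {
        public static void main(String[] args) throws Exception {
            Configuration atlasConf = ApplicationProperties.get();
            System.out.println(atlasConf.getString("atlas.rest.address")); // http://localhost:31000
        }
    }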
<?xml version="1.0" encoding="UTF-8" ?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
<appender name="console" class="org.apache.log4j.ConsoleAppender">
<param name="Target" value="System.out"/>
<layout class="org.apache.log4j.PatternLayout">
<param name="ConversionPattern" value="%d %-5p - [%t:%x] ~ %m (%C{1}:%L)%n"/>
</layout>
</appender>
<appender name="FILE" class="org.apache.log4j.RollingFileAppender">
<param name="File" value="${atlas.log.dir}/${atlas.log.file}"/>
<param name="Append" value="true"/>
<layout class="org.apache.log4j.PatternLayout">
<param name="ConversionPattern" value="%d %-5p - [%t:%x] ~ %m (%C{1}:%L)%n"/>
<param name="maxFileSize" value="100MB" />
<param name="maxBackupIndex" value="20" />
</layout>
</appender>
<appender name="AUDIT" class="org.apache.log4j.RollingFileAppender">
<param name="File" value="${atlas.log.dir}/audit.log"/>
<param name="Append" value="true"/>
<layout class="org.apache.log4j.PatternLayout">
<param name="ConversionPattern" value="%d %x %m%n"/>
<param name="maxFileSize" value="100MB" />
<param name="maxBackupIndex" value="20" />
</layout>
</appender>
<appender name="METRICS" class="org.apache.log4j.RollingFileAppender">
<param name="File" value="${atlas.log.dir}/metric.log"/>
<param name="Append" value="true"/>
<layout class="org.apache.log4j.PatternLayout">
<param name="ConversionPattern" value="%d %x %m%n"/>
<param name="maxFileSize" value="100MB" />
</layout>
</appender>
<appender name="FAILED" class="org.apache.log4j.RollingFileAppender">
<param name="File" value="${atlas.log.dir}/failed.log"/>
<param name="Append" value="true"/>
<layout class="org.apache.log4j.PatternLayout">
<param name="ConversionPattern" value="%d %m"/>
<param name="maxFileSize" value="100MB" />
<param name="maxBackupIndex" value="20" />
</layout>
</appender>
<!-- Uncomment the following for perf logs -->
<!--
<appender name="perf_appender" class="org.apache.log4j.DailyRollingFileAppender">
<param name="file" value="${atlas.log.dir}/atlas_perf.log" />
<param name="datePattern" value="'.'yyyy-MM-dd" />
<param name="append" value="true" />
<layout class="org.apache.log4j.PatternLayout">
<param name="ConversionPattern" value="%d|%t|%m%n" />
</layout>
</appender>
<logger name="org.apache.atlas.perf" additivity="false">
<level value="debug" />
<appender-ref ref="perf_appender" />
</logger>
-->
<logger name="org.apache.atlas" additivity="false">
<level value="info"/>
<appender-ref ref="FILE"/>
</logger>
<logger name="com.thinkaurelius.titan" additivity="false">
<level value="warn"/>
<appender-ref ref="FILE"/>
</logger>
<logger name="org.springframework" additivity="false">
<level value="warn"/>
<appender-ref ref="console"/>
</logger>
<logger name="org.eclipse" additivity="false">
<level value="warn"/>
<appender-ref ref="console"/>
</logger>
<logger name="com.sun.jersey" additivity="false">
<level value="warn"/>
<appender-ref ref="console"/>
</logger>
<!-- to avoid logs - The configuration log.flush.interval.messages = 1 was supplied but isn't a known config -->
<logger name="org.apache.kafka.common.config.AbstractConfig" additivity="false">
<level value="error"/>
<appender-ref ref="FILE"/>
</logger>
<logger name="AUDIT" additivity="false">
<level value="info"/>
<appender-ref ref="AUDIT"/>
</logger>
<logger name="METRICS" additivity="false">
<level value="debug"/>
<appender-ref ref="METRICS"/>
</logger>
<logger name="FAILED" additivity="false">
<level value="info"/>
<appender-ref ref="AUDIT"/>
</logger>
<root>
<priority value="warn"/>
<appender-ref ref="FILE"/>
</root>
</log4j:configuration>
#username=group::sha256-password
admin=ADMIN::8c6976e5b5410415bde908bd4dee15dfb167a9c873fc4bb8a81f6f2ab448a918
rangertagsync=RANGER_TAG_SYNC::e3f67240f5117d1753c940dae9eea772d36ed5fe9bd9c94a300e40413f1afb9d
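Per the format comment above, the last column is the SHA-256 hex digest of the plain-text password (the admin hash above is sha256 of "admin"). A minimal sketch, using only the JDK, for producing a new entry; CredentialLineDemo is a hypothetical name:

    import java.nio.charset.StandardCharsets;
    import java.security.MessageDigest;

    // Sketch: emit a users-credentials line in username=GROUP::sha256hex form.
    public class CredentialLineDemo {
        public static void main(String[] args) throws Exception {
            byte[] digest = MessageDigest.getInstance("SHA-256")
                    .digest("admin".getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder();
            for (byte b : digest) {
                hex.append(String.format("%02x", b));
            }
            System.out.println("admin=ADMIN::" + hex); // matches the admin hash above
        }
    }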
##Policy Format
##r-READ, w-WRITE, u-UPDATE, d-DELETE
##Policy_Name;;User_Name1:Operations_Allowed,User_Name2:Operations_Allowed;;Group_Name1:Operations_Allowed,Group_Name2:Operations_Allowed;;Resource_Type1:Resource_Name,Resource_Type2:Resource_Name
##
adminPolicy;;admin:rwud;;ROLE_ADMIN:rwud;;type:*,entity:*,operation:*
dataScientistPolicy;;;;DATA_SCIENTIST:r;;type:*,entity:*
dataStewardPolicy;;;;DATA_STEWARD:rwu;;type:*,entity:*
hadoopPolicy;;;;hadoop:rwud;;type:*,entity:*,operation:*
rangerTagSyncPolicy;;;;RANGER_TAG_SYNC:r;;type:*,entity:*