Commit 009330de by Suma Shivaprasad

ATLAS-572 Handle secure instance of Zookeeper for leader election.(yhemanth via sumasai)

parent 47619ee6
......@@ -20,8 +20,6 @@ package org.apache.atlas.ha;
import org.apache.atlas.security.SecurityProperties;
import org.apache.commons.configuration.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
......@@ -31,24 +29,29 @@ import java.util.List;
*/
public final class HAConfiguration {
public static final String ATLAS_SERVER_ZK_ROOT_DEFAULT = "/apache_atlas";
private HAConfiguration() {
}
private static final Logger LOG = LoggerFactory.getLogger(HAConfiguration.class);
public static final String ATLAS_SERVER_HA_PREFIX = "atlas.server.ha.";
public static final String ZOOKEEPER_PREFIX = "zookeeper.";
public static final String ATLAS_SERVER_HA_ZK_ROOT_KEY = ATLAS_SERVER_HA_PREFIX + ZOOKEEPER_PREFIX + "zkroot";
public static final String ATLAS_SERVER_HA_ENABLED_KEY = ATLAS_SERVER_HA_PREFIX + "enabled";
public static final String ATLAS_SERVER_ADDRESS_PREFIX = "atlas.server.address.";
public static final String ATLAS_SERVER_IDS = "atlas.server.ids";
public static final String HA_ZOOKEEPER_CONNECT = ATLAS_SERVER_HA_PREFIX + "zookeeper.connect";
public static final String HA_ZOOKEEPER_CONNECT = ATLAS_SERVER_HA_PREFIX + ZOOKEEPER_PREFIX + "connect";
public static final int DEFAULT_ZOOKEEPER_CONNECT_SLEEPTIME_MILLIS = 1000;
public static final String HA_ZOOKEEPER_RETRY_SLEEPTIME_MILLIS =
ATLAS_SERVER_HA_PREFIX + "zookeeper.retry.sleeptime.ms";
public static final String HA_ZOOKEEPER_NUM_RETRIES = ATLAS_SERVER_HA_PREFIX + "zookeeper.num.retries";
ATLAS_SERVER_HA_PREFIX + ZOOKEEPER_PREFIX + "retry.sleeptime.ms";
public static final String HA_ZOOKEEPER_NUM_RETRIES = ATLAS_SERVER_HA_PREFIX + ZOOKEEPER_PREFIX + "num.retries";
public static final int DEFAULT_ZOOKEEPER_CONNECT_NUM_RETRIES = 3;
public static final String HA_ZOOKEEPER_SESSION_TIMEOUT_MS =
ATLAS_SERVER_HA_PREFIX + "zookeeper.session.timeout.ms";
ATLAS_SERVER_HA_PREFIX + ZOOKEEPER_PREFIX + "session.timeout.ms";
public static final int DEFAULT_ZOOKEEPER_SESSION_TIMEOUT_MILLIS = 20000;
public static final String HA_ZOOKEEPER_ACL = ATLAS_SERVER_HA_PREFIX + ZOOKEEPER_PREFIX + "acl";
public static final String HA_ZOOKEEPER_AUTH = ATLAS_SERVER_HA_PREFIX + ZOOKEEPER_PREFIX + "auth";
/**
* Return whether HA is enabled or not.
......@@ -90,16 +93,22 @@ public final class HAConfiguration {
*/
public static class ZookeeperProperties {
private String connectString;
private String zkRoot;
private int retriesSleepTimeMillis;
private int numRetries;
private int sessionTimeout;
private String acl;
private String auth;
public ZookeeperProperties(String connectString, int retriesSleepTimeMillis, int numRetries,
int sessionTimeout) {
public ZookeeperProperties(String connectString, String zkRoot, int retriesSleepTimeMillis, int numRetries,
int sessionTimeout, String acl, String auth) {
this.connectString = connectString;
this.zkRoot = zkRoot;
this.retriesSleepTimeMillis = retriesSleepTimeMillis;
this.numRetries = numRetries;
this.sessionTimeout = sessionTimeout;
this.acl = acl;
this.auth = auth;
}
public String getConnectString() {
......@@ -118,6 +127,18 @@ public final class HAConfiguration {
return sessionTimeout;
}
public String getAcl() {
return acl;
}
public String getAuth() {
return auth;
}
public String getZkRoot() {
return zkRoot;
}
@Override
public boolean equals(Object o) {
if (this == o) {
......@@ -132,35 +153,53 @@ public final class HAConfiguration {
if (retriesSleepTimeMillis != that.retriesSleepTimeMillis) {
return false;
}
if (numRetries != that.numRetries) {
return false;
}
if (sessionTimeout != that.sessionTimeout) {
return false;
}
return !(connectString != null ? !connectString.equals(that.connectString) : that.connectString != null);
if (!connectString.equals(that.connectString)) {
return false;
}
if (!zkRoot.equals(that.zkRoot)) {
return false;
}
if (acl != null ? !acl.equals(that.acl) : that.acl != null) {
return false;
}
return !(auth != null ? !auth.equals(that.auth) : that.auth != null);
}
@Override
public int hashCode() {
int result = connectString != null ? connectString.hashCode() : 0;
int result = connectString.hashCode();
result = 31 * result + zkRoot.hashCode();
result = 31 * result + retriesSleepTimeMillis;
result = 31 * result + numRetries;
result = 31 * result + sessionTimeout;
result = 31 * result + (acl != null ? acl.hashCode() : 0);
result = 31 * result + (auth != null ? auth.hashCode() : 0);
return result;
}
public boolean hasAcl() {
return getAcl()!=null;
}
public boolean hasAuth() {
return getAuth()!=null;
}
}
public static ZookeeperProperties getZookeeperProperties(Configuration configuration) {
String zookeeperConnectString = configuration.getString("atlas.kafka.zookeeper.connect");
String zookeeperConnectString = configuration.getString("atlas.kafka." + ZOOKEEPER_PREFIX + "connect");
if (configuration.containsKey(HA_ZOOKEEPER_CONNECT)) {
zookeeperConnectString = configuration.getString(HA_ZOOKEEPER_CONNECT);
}
String zkRoot = configuration.getString(ATLAS_SERVER_HA_ZK_ROOT_KEY, ATLAS_SERVER_ZK_ROOT_DEFAULT);
int retriesSleepTimeMillis = configuration.getInt(HA_ZOOKEEPER_RETRY_SLEEPTIME_MILLIS,
DEFAULT_ZOOKEEPER_CONNECT_SLEEPTIME_MILLIS);
......@@ -168,6 +207,10 @@ public final class HAConfiguration {
int sessionTimeout = configuration.getInt(HA_ZOOKEEPER_SESSION_TIMEOUT_MS,
DEFAULT_ZOOKEEPER_SESSION_TIMEOUT_MILLIS);
return new ZookeeperProperties(zookeeperConnectString, retriesSleepTimeMillis, numRetries, sessionTimeout);
String acl = configuration.getString(HA_ZOOKEEPER_ACL);
String auth = configuration.getString(HA_ZOOKEEPER_AUTH);
return new ZookeeperProperties(zookeeperConnectString, zkRoot, retriesSleepTimeMillis, numRetries,
sessionTimeout, acl, auth);
}
}
......@@ -64,4 +64,22 @@ public class HAConfigurationTest {
assertTrue(serverInstances.contains("http://127.0.0.1:21000"));
assertTrue(serverInstances.contains("http://127.0.0.1:31000"));
}
@Test
public void testShouldGetZookeeperAcl() {
when(configuration.getString(HAConfiguration.HA_ZOOKEEPER_ACL)).thenReturn("sasl:myclient@EXAMPLE.COM");
HAConfiguration.ZookeeperProperties zookeeperProperties =
HAConfiguration.getZookeeperProperties(configuration);
assertTrue(zookeeperProperties.hasAcl());
}
@Test
public void testShouldGetZookeeperAuth() {
when(configuration.getString(HAConfiguration.HA_ZOOKEEPER_AUTH)).thenReturn("sasl:myclient@EXAMPLE.COM");
HAConfiguration.ZookeeperProperties zookeeperProperties =
HAConfiguration.getZookeeperProperties(configuration);
assertTrue(zookeeperProperties.hasAuth());
}
}
......@@ -106,3 +106,6 @@ atlas.server.ha.enabled=false
#atlas.server.ha.zookeeper.retry.sleeptime.ms=1000
#atlas.server.ha.zookeeper.num.retries=3
#atlas.server.ha.zookeeper.session.timeout.ms=20000
## if ACLs need to be set on the created nodes, uncomment these lines and set the values ##
#atlas.server.ha.zookeeper.acl=<scheme>:<id>
#atlas.server.ha.zookeeper.auth=<scheme>:<authinfo>
\ No newline at end of file
......@@ -13,6 +13,7 @@ ATLAS-409 Atlas will not import avro tables with schema read from a file (dosset
ATLAS-379 Create sqoop and falcon metadata addons (venkatnrangan,bvellanki,sowmyaramesh via shwethags)
ALL CHANGES:
ATLAS-572 Handle secure instance of Zookeeper for leader election.(yhemanth via sumasai)
ATLAS-605 Hook Notifications for DELETE entity needs to be supported (sumasai)
ATLAS-607 Add Support for delete entity through a qualifiedName (sumasai via yhemanth)
ATLAS-571 Modify Atlas client for necessary changes in context of HA (yhemanth via sumasai)
......
......@@ -108,7 +108,8 @@ public class ActiveInstanceElectorService implements Service, LeaderLatchListene
private void joinElection() {
LOG.info("Starting leader election for {}", serverId);
leaderLatch = curatorFactory.leaderLatchInstance(serverId);
String zkRoot = HAConfiguration.getZookeeperProperties(configuration).getZkRoot();
leaderLatch = curatorFactory.leaderLatchInstance(serverId, zkRoot);
leaderLatch.addListener(this);
try {
leaderLatch.start();
......
......@@ -27,11 +27,17 @@ import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.actors.threadpool.Arrays;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
/**
* An object that encapsulates storing and retrieving state related to an Active Atlas server.
......@@ -45,7 +51,8 @@ public class ActiveInstanceState {
private final Configuration configuration;
private final CuratorFactory curatorFactory;
public static final String APACHE_ATLAS_ACTIVE_SERVER_INFO = "/apache_atlas_active_server_info";
public static final String APACHE_ATLAS_ACTIVE_SERVER_INFO = "/active_server_info";
private static final Logger LOG = LoggerFactory.getLogger(ActiveInstanceState.class);
/**
......@@ -80,14 +87,29 @@ public class ActiveInstanceState {
public void update(String serverId) throws Exception {
CuratorFramework client = curatorFactory.clientInstance();
String atlasServerAddress = HAConfiguration.getBoundAddressForId(configuration, serverId);
Stat serverInfo = client.checkExists().forPath(APACHE_ATLAS_ACTIVE_SERVER_INFO);
HAConfiguration.ZookeeperProperties zookeeperProperties =
HAConfiguration.getZookeeperProperties(configuration);
List<ACL> acls = ZooDefs.Ids.OPEN_ACL_UNSAFE;
if (zookeeperProperties.hasAcl()) {
acls = Arrays.asList(new ACL[]{AtlasZookeeperSecurityProperties.parseAcl(zookeeperProperties.getAcl())});
}
Stat serverInfo = client.checkExists().forPath(getZnodePath());
if (serverInfo == null) {
client.create().withMode(CreateMode.EPHEMERAL).forPath(APACHE_ATLAS_ACTIVE_SERVER_INFO);
client.create().
withMode(CreateMode.EPHEMERAL).
withACL(acls).
forPath(getZnodePath());
}
client.setData().forPath(APACHE_ATLAS_ACTIVE_SERVER_INFO,
client.setData().forPath(getZnodePath(),
atlasServerAddress.getBytes(Charset.forName("UTF-8")));
}
private String getZnodePath() {
String zkRoot = configuration.getString(HAConfiguration.ATLAS_SERVER_HA_ZK_ROOT_KEY,
HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT);
return zkRoot+APACHE_ATLAS_ACTIVE_SERVER_INFO;
}
/**
* Retrieve state of the active server instance.
*
......@@ -98,7 +120,7 @@ public class ActiveInstanceState {
CuratorFramework client = curatorFactory.clientInstance();
String serverAddress = null;
try {
byte[] bytes = client.getData().forPath(APACHE_ATLAS_ACTIVE_SERVER_INFO);
byte[] bytes = client.getData().forPath(getZnodePath());
serverAddress = new String(bytes, Charset.forName("UTF-8"));
} catch (Exception e) {
LOG.error("Error getting active server address", e);
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.web.service;
import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import org.apache.commons.lang.StringUtils;
import org.apache.curator.framework.AuthInfo;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
/**
* A class that parses configuration strings into Zookeeper ACL and Auth values.
*/
public class AtlasZookeeperSecurityProperties {
/**
* Get an {@link ACL} by parsing input string.
* @param aclString A string of the form scheme:id
* @return {@link ACL} with the perms set to {@link org.apache.zookeeper.ZooDefs.Perms#ALL} and scheme and id
* taken from configuration values.
*/
public static ACL parseAcl(String aclString) {
String[] aclComponents = getComponents(aclString, "acl", "scheme:id");
return new ACL(ZooDefs.Perms.ALL, new Id(aclComponents[0], aclComponents[1]));
}
private static String[] getComponents(String securityString, String variableName, String formatExample) {
Preconditions.checkArgument(!StringUtils.isEmpty(securityString),
String.format("%s cannot be null or empty. " +
"Needs to be of form %s", variableName, formatExample));
String[] aclComponents = securityString.split(":", 2);
if (aclComponents.length != 2) {
throw new IllegalArgumentException(
String.format("Invalid %s string. " +
"Needs to be of form %s", variableName, formatExample));
}
return aclComponents;
}
/**
* Get an {@link AuthInfo} by parsing input string.
* @param authString A string of the form scheme:authString
* @return {@link AuthInfo} with the scheme and auth taken from configuration values.
*/
public static AuthInfo parseAuth(String authString) {
String[] authComponents = getComponents(authString, "authString", "scheme:authString");
return new AuthInfo(authComponents[0], authComponents[1].getBytes(Charsets.UTF_8));
}
}
......@@ -18,16 +18,28 @@
package org.apache.atlas.web.service;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;
import com.google.inject.Singleton;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasException;
import org.apache.atlas.ha.HAConfiguration;
import org.apache.commons.configuration.Configuration;
import org.apache.curator.framework.AuthInfo;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.ACLProvider;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
/**
* A factory to create objects related to Curator.
......@@ -36,7 +48,16 @@ import org.apache.curator.retry.ExponentialBackoffRetry;
*/
@Singleton
public class CuratorFactory {
public static final String APACHE_ATLAS_LEADER_ELECTOR_PATH = "/apache_atlas_leader_elector_path";
private static final Logger LOG = LoggerFactory.getLogger(CuratorFactory.class);
public static final String APACHE_ATLAS_LEADER_ELECTOR_PATH = "/leader_elector_path";
public static final String SASL_SCHEME = "sasl";
public static final String WORLD_SCHEME = "world";
public static final String ANYONE_ID = "anyone";
public static final String AUTH_SCHEME = "auth";
public static final String DIGEST_SCHEME = "digest";
public static final String IP_SCHEME = "ip";
private final Configuration configuration;
private CuratorFramework curatorFramework;
......@@ -46,19 +67,98 @@ public class CuratorFactory {
* @throws AtlasException
*/
public CuratorFactory() throws AtlasException {
configuration = ApplicationProperties.get();
this(ApplicationProperties.get());
}
public CuratorFactory(Configuration configuration) {
this.configuration = configuration;
initializeCuratorFramework();
}
private void initializeCuratorFramework() {
@VisibleForTesting
protected void initializeCuratorFramework() {
HAConfiguration.ZookeeperProperties zookeeperProperties =
HAConfiguration.getZookeeperProperties(configuration);
curatorFramework = CuratorFrameworkFactory.builder().
CuratorFrameworkFactory.Builder builder = getBuilder(zookeeperProperties);
enhanceBuilderWithSecurityParameters(zookeeperProperties, builder);
curatorFramework = builder.build();
curatorFramework.start();
}
@VisibleForTesting
void enhanceBuilderWithSecurityParameters(HAConfiguration.ZookeeperProperties zookeeperProperties,
CuratorFrameworkFactory.Builder builder) {
ACLProvider aclProvider = getAclProvider(zookeeperProperties);
AuthInfo authInfo = null;
if (zookeeperProperties.hasAuth()) {
authInfo = AtlasZookeeperSecurityProperties.parseAuth(zookeeperProperties.getAuth());
}
if (aclProvider != null) {
LOG.info("Setting up acl provider.");
builder.aclProvider(aclProvider);
if (authInfo != null) {
byte[] auth = authInfo.getAuth();
LOG.info("Setting up auth provider with scheme: {} and id: {}", authInfo.getScheme(),
getIdForLogging(authInfo.getScheme(), new String(auth, Charsets.UTF_8)));
builder.authorization(authInfo.getScheme(), auth);
}
}
}
private String getCurrentUser() {
try {
return UserGroupInformation.getCurrentUser().getUserName();
} catch (IOException ioe) {
return "unknown";
}
}
private ACLProvider getAclProvider(HAConfiguration.ZookeeperProperties zookeeperProperties) {
ACLProvider aclProvider = null;
if (zookeeperProperties.hasAcl()) {
final ACL acl = AtlasZookeeperSecurityProperties.parseAcl(zookeeperProperties.getAcl());
LOG.info("Setting ACL for id {} with scheme {} and perms {}.",
getIdForLogging(acl.getId().getScheme(), acl.getId().getId()),
acl.getId().getScheme(), acl.getPerms());
LOG.info("Current logged in user: {}", getCurrentUser());
final List<ACL> acls = Arrays.asList(new ACL[]{acl});
aclProvider = new ACLProvider() {
@Override
public List<ACL> getDefaultAcl() {
return acls;
}
@Override
public List<ACL> getAclForPath(String path) {
return acls;
}
};
}
return aclProvider;
}
private String getIdForLogging(String scheme, String id) {
if (scheme.equalsIgnoreCase(SASL_SCHEME) ||
scheme.equalsIgnoreCase(IP_SCHEME)) {
return id;
} else if (scheme.equalsIgnoreCase(WORLD_SCHEME)) {
return ANYONE_ID;
} else if (scheme.equalsIgnoreCase(AUTH_SCHEME) ||
scheme.equalsIgnoreCase(DIGEST_SCHEME)) {
return id.split(":")[0];
}
return "unknown";
}
private CuratorFrameworkFactory.Builder getBuilder(HAConfiguration.ZookeeperProperties zookeeperProperties) {
return CuratorFrameworkFactory.builder().
connectString(zookeeperProperties.getConnectString()).
sessionTimeoutMs(zookeeperProperties.getSessionTimeout()).
retryPolicy(new ExponentialBackoffRetry(
zookeeperProperties.getRetriesSleepTimeMillis(), zookeeperProperties.getNumRetries())).build();
curatorFramework.start();
zookeeperProperties.getRetriesSleepTimeMillis(), zookeeperProperties.getNumRetries()));
}
/**
......@@ -86,9 +186,10 @@ public class CuratorFactory {
* @param serverId the ID used to register this instance with curator.
* This ID should typically be obtained using
* {@link org.apache.atlas.ha.AtlasServerIdSelector#selectServerId(Configuration)}
* @param zkRoot the root znode under which the leader latch node is added.
* @return
*/
public LeaderLatch leaderLatchInstance(String serverId) {
return new LeaderLatch(curatorFramework, APACHE_ATLAS_LEADER_ELECTOR_PATH, serverId);
public LeaderLatch leaderLatchInstance(String serverId, String zkRoot) {
return new LeaderLatch(curatorFramework, zkRoot+APACHE_ATLAS_LEADER_ELECTOR_PATH, serverId);
}
}
......@@ -28,12 +28,16 @@ import org.apache.curator.framework.api.SetDataBuilder;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.Stat;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;
import scala.actors.threadpool.Arrays;
import java.nio.charset.Charset;
......@@ -67,16 +71,20 @@ public class ActiveInstanceStateTest {
public void testSharedPathIsCreatedIfNotExists() throws Exception {
when(configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +"id1")).thenReturn(HOST_PORT);
when(configuration.getString(
HAConfiguration.ATLAS_SERVER_HA_ZK_ROOT_KEY, HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT)).
thenReturn(HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT);
when(curatorFactory.clientInstance()).thenReturn(curatorFramework);
ExistsBuilder existsBuilder = mock(ExistsBuilder.class);
when(curatorFramework.checkExists()).thenReturn(existsBuilder);
when(existsBuilder.forPath(ActiveInstanceState.APACHE_ATLAS_ACTIVE_SERVER_INFO)).thenReturn(null);
when(existsBuilder.forPath(getPath())).thenReturn(null);
CreateBuilder createBuilder = mock(CreateBuilder.class);
when(curatorFramework.create()).thenReturn(createBuilder);
when(createBuilder.withMode(CreateMode.EPHEMERAL)).thenReturn(createBuilder);
when(createBuilder.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)).thenReturn(createBuilder);
SetDataBuilder setDataBuilder = mock(SetDataBuilder.class);
when(curatorFramework.setData()).thenReturn(setDataBuilder);
......@@ -84,17 +92,58 @@ public class ActiveInstanceStateTest {
ActiveInstanceState activeInstanceState = new ActiveInstanceState(configuration, curatorFactory);
activeInstanceState.update("id1");
verify(createBuilder).forPath(ActiveInstanceState.APACHE_ATLAS_ACTIVE_SERVER_INFO);
verify(createBuilder).forPath(getPath());
}
private String getPath() {
return HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT
+ ActiveInstanceState.APACHE_ATLAS_ACTIVE_SERVER_INFO;
}
@Test
public void testSharedPathIsCreatedWithRightACLIfNotExists() throws Exception {
when(configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +"id1")).thenReturn(HOST_PORT);
when(configuration.getString(HAConfiguration.HA_ZOOKEEPER_ACL)).thenReturn("sasl:myclient@EXAMPLE.COM");
when(configuration.getString(
HAConfiguration.ATLAS_SERVER_HA_ZK_ROOT_KEY, HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT)).
thenReturn(HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT);
when(curatorFactory.clientInstance()).thenReturn(curatorFramework);
ExistsBuilder existsBuilder = mock(ExistsBuilder.class);
when(curatorFramework.checkExists()).thenReturn(existsBuilder);
when(existsBuilder.forPath(getPath())).thenReturn(null);
CreateBuilder createBuilder = mock(CreateBuilder.class);
when(curatorFramework.create()).thenReturn(createBuilder);
when(createBuilder.withMode(CreateMode.EPHEMERAL)).thenReturn(createBuilder);
ACL expectedAcl = new ACL(ZooDefs.Perms.ALL, new Id("sasl", "myclient@EXAMPLE.COM"));
when(createBuilder.
withACL(Arrays.asList(new ACL[]{expectedAcl}))).thenReturn(createBuilder);
SetDataBuilder setDataBuilder = mock(SetDataBuilder.class);
when(curatorFramework.setData()).thenReturn(setDataBuilder);
ActiveInstanceState activeInstanceState = new ActiveInstanceState(configuration, curatorFactory);
activeInstanceState.update("id1");
verify(createBuilder).forPath(getPath());
}
@Test
public void testDataIsUpdatedWithAtlasServerAddress() throws Exception {
when(configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +"id1")).thenReturn(HOST_PORT);
when(configuration.getString(
HAConfiguration.ATLAS_SERVER_HA_ZK_ROOT_KEY, HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT)).
thenReturn(HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT);
when(curatorFactory.clientInstance()).thenReturn(curatorFramework);
ExistsBuilder existsBuilder = mock(ExistsBuilder.class);
when(curatorFramework.checkExists()).thenReturn(existsBuilder);
when(existsBuilder.forPath(ActiveInstanceState.APACHE_ATLAS_ACTIVE_SERVER_INFO)).thenReturn(new Stat());
when(existsBuilder.forPath(getPath())).thenReturn(new Stat());
SetDataBuilder setDataBuilder = mock(SetDataBuilder.class);
when(curatorFramework.setData()).thenReturn(setDataBuilder);
......@@ -103,17 +152,21 @@ public class ActiveInstanceStateTest {
activeInstanceState.update("id1");
verify(setDataBuilder).forPath(
ActiveInstanceState.APACHE_ATLAS_ACTIVE_SERVER_INFO,
getPath(),
SERVER_ADDRESS.getBytes(Charset.forName("UTF-8")));
}
@Test
public void testShouldReturnActiveServerAddress() throws Exception {
when(curatorFactory.clientInstance()).thenReturn(curatorFramework);
when(configuration.getString(
HAConfiguration.ATLAS_SERVER_HA_ZK_ROOT_KEY, HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT)).
thenReturn(HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT);
GetDataBuilder getDataBuilder = mock(GetDataBuilder.class);
when(curatorFramework.getData()).thenReturn(getDataBuilder);
when(getDataBuilder.forPath(ActiveInstanceState.APACHE_ATLAS_ACTIVE_SERVER_INFO)).
when(getDataBuilder.forPath(getPath())).
thenReturn(SERVER_ADDRESS.getBytes(Charset.forName("UTF-8")));
ActiveInstanceState activeInstanceState = new ActiveInstanceState(configuration, curatorFactory);
......@@ -125,10 +178,14 @@ public class ActiveInstanceStateTest {
@Test
public void testShouldHandleExceptionsInFetchingServerAddress() throws Exception {
when(curatorFactory.clientInstance()).thenReturn(curatorFramework);
when(configuration.getString(
HAConfiguration.ATLAS_SERVER_HA_ZK_ROOT_KEY, HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT)).
thenReturn(HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT);
GetDataBuilder getDataBuilder = mock(GetDataBuilder.class);
when(curatorFramework.getData()).thenReturn(getDataBuilder);
when(getDataBuilder.forPath(ActiveInstanceState.APACHE_ATLAS_ACTIVE_SERVER_INFO)).
when(getDataBuilder.forPath(getPath())).
thenThrow(new Exception());
ActiveInstanceState activeInstanceState = new ActiveInstanceState(configuration, curatorFactory);
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.web.service;
import com.google.common.base.Charsets;
import org.apache.curator.framework.AuthInfo;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import org.testng.annotations.Test;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.fail;
public class AtlasZookeeperSecurityPropertiesTest {
@Test
public void shouldGetAcl() {
ACL acl = AtlasZookeeperSecurityProperties.parseAcl("sasl:myclient@EXAMPLE.COM");
assertEquals(acl.getId().getScheme(), "sasl");
assertEquals(acl.getId().getId(), "myclient@EXAMPLE.COM");
assertEquals(acl.getPerms(), ZooDefs.Perms.ALL);
}
@Test(expectedExceptions = IllegalArgumentException.class)
public void shouldThrowExceptionForNullAcl() {
ACL acl = AtlasZookeeperSecurityProperties.parseAcl(null);
fail("Should have thrown exception for null ACL string");
}
@Test(expectedExceptions = IllegalArgumentException.class)
public void shouldThrowExceptionForInvalidAclString() {
ACL acl = AtlasZookeeperSecurityProperties.parseAcl("randomAcl");
fail("Should have thrown exception for null ACL string");
}
@Test
public void idsWithColonsAreValid() {
ACL acl = AtlasZookeeperSecurityProperties.parseAcl("auth:user:password");
assertEquals(acl.getId().getScheme(), "auth");
assertEquals(acl.getId().getId(), "user:password");
}
@Test
public void shouldGetAuth() {
AuthInfo authInfo = AtlasZookeeperSecurityProperties.parseAuth("digest:user:password");
assertEquals(authInfo.getScheme(), "digest");
assertEquals(authInfo.getAuth(), "user:password".getBytes(Charsets.UTF_8));
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.web.service;
import com.google.common.base.Charsets;
import org.apache.atlas.ha.HAConfiguration;
import org.apache.commons.configuration.Configuration;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.ACLProvider;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import org.mockito.ArgumentMatcher;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import static junit.framework.TestCase.assertEquals;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.argThat;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
public class CuratorFactoryTest {
@Mock
private Configuration configuration;
@Mock
private HAConfiguration.ZookeeperProperties zookeeperProperties;
@Mock
private CuratorFrameworkFactory.Builder builder;
@BeforeMethod
public void setup() {
MockitoAnnotations.initMocks(this);
}
@Test
public void shouldAddAuthorization() {
when(zookeeperProperties.hasAcl()).thenReturn(true);
when(zookeeperProperties.getAcl()).thenReturn("sasl:myclient@EXAMPLE.COM");
when(zookeeperProperties.hasAuth()).thenReturn(true);
when(zookeeperProperties.getAuth()).thenReturn("sasl:myclient@EXAMPLE.COM");
CuratorFactory curatorFactory = new CuratorFactory(configuration) {
@Override
protected void initializeCuratorFramework() {
}
};
curatorFactory.enhanceBuilderWithSecurityParameters(zookeeperProperties, builder);
verify(builder).aclProvider(any(ACLProvider.class));
verify(builder).authorization(eq("sasl"), eq("myclient@EXAMPLE.COM".getBytes(Charsets.UTF_8)));
}
@Test
public void shouldAddAclProviderWithRightACL() {
when(zookeeperProperties.hasAcl()).thenReturn(true);
when(zookeeperProperties.getAcl()).thenReturn("sasl:myclient@EXAMPLE.COM");
when(zookeeperProperties.hasAuth()).thenReturn(false);
CuratorFactory curatorFactory = new CuratorFactory(configuration) {
@Override
protected void initializeCuratorFramework() {
}
};
curatorFactory.enhanceBuilderWithSecurityParameters(zookeeperProperties, builder);
verify(builder).aclProvider(argThat(new ArgumentMatcher<ACLProvider>() {
@Override
public boolean matches(Object o) {
ACLProvider aclProvider = (ACLProvider) o;
ACL acl = aclProvider.getDefaultAcl().get(0);
return acl.getId().getId().equals("myclient@EXAMPLE.COM")
&& acl.getId().getScheme().equals("sasl");
}
}));
}
@Test
public void shouldNotAddAnySecureParameters() {
when(zookeeperProperties.hasAcl()).thenReturn(false);
when(zookeeperProperties.hasAuth()).thenReturn(false);
CuratorFactory curatorFactory = new CuratorFactory(configuration) {
@Override
protected void initializeCuratorFramework() {
}
};
verifyZeroInteractions(builder);
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment