Commit 81a0c6ff by Suma Shivaprasad

ATLAS-628 Starting two Atlas instances at the same time causes exceptions in HA…

ATLAS-628 Starting two Atlas instances at the same time causes exceptions in HA mode (yhemanth via sumasai)
parent 922a83c9
......@@ -21,13 +21,15 @@ import traceback
import atlas_config as mc
ATLAS_LOG_OPTS="-Datlas.log.dir=%s -Datlas.log.file=application.log"
ATLAS_LOG_OPTS="-Datlas.log.dir=%s -Datlas.log.file=%s.log"
ATLAS_COMMAND_OPTS="-Datlas.home=%s"
ATLAS_CONFIG_OPTS="-Datlas.conf=%s"
DEFAULT_JVM_OPTS="-Xmx1024m -XX:MaxPermSize=512m -Dlog4j.configuration=atlas-log4j.xml -Djava.net.preferIPv4Stack=true"
def main():
is_setup = (len(sys.argv)>1) and sys.argv[1] is not None and sys.argv[1] == '-setup'
atlas_home = mc.atlasDir()
confdir = mc.dirMustExist(mc.confDir(atlas_home))
mc.executeEnvSh(confdir)
......@@ -43,7 +45,10 @@ def main():
jvm_logdir = logdir
#create sys property for conf dirs
jvm_opts_list = (ATLAS_LOG_OPTS % jvm_logdir).split()
if not is_setup:
jvm_opts_list = (ATLAS_LOG_OPTS % (jvm_logdir, "application")).split()
else:
jvm_opts_list = (ATLAS_LOG_OPTS % (jvm_logdir, "atlas_setup")).split()
cmd_opts = (ATLAS_COMMAND_OPTS % jvm_atlas_home)
jvm_opts_list.extend(cmd_opts.split())
......@@ -93,6 +98,8 @@ def main():
pf.close()
if mc.exist_pid((int)(pid)):
if is_setup:
print "Cannot run setup when server is running."
mc.server_already_running(pid)
else:
mc.server_pid_not_running(pid)
......@@ -100,14 +107,21 @@ def main():
web_app_path = os.path.join(web_app_dir, "atlas")
if (mc.isCygwin()):
web_app_path = mc.convertCygwinPath(web_app_path)
if not is_setup:
start_atlas_server(atlas_classpath, atlas_pid_file, jvm_logdir, jvm_opts_list, web_app_path)
else:
process = mc.java("org.apache.atlas.web.setup.AtlasSetup", [], atlas_classpath, jvm_opts_list, jvm_logdir)
return process.wait()
def start_atlas_server(atlas_classpath, atlas_pid_file, jvm_logdir, jvm_opts_list, web_app_path):
args = ["-app", web_app_path]
args.extend(sys.argv[1:])
process = mc.java("org.apache.atlas.Atlas", args, atlas_classpath, jvm_opts_list, jvm_logdir)
mc.writePid(atlas_pid_file, process)
print "Apache Atlas Server started!!!\n"
if __name__ == '__main__':
try:
returncode = main()
......
......@@ -96,6 +96,8 @@ atlas.http.authentication.type=simple
######### Server Properties #########
atlas.rest.address=http://localhost:21000
# If enabled and set to true, this will run setup steps when the server starts
#atlas.server.run.setup.on.start=false
######### Entity Audit Configs #########
atlas.audit.hbase.tablename=ATLAS_ENTITY_AUDIT_EVENTS
......
......@@ -204,3 +204,10 @@ atlas.client.ha.retries=4
# Specify interval between retries for a client.
atlas.client.ha.sleep.interval.ms=5000
</verbatim>
---++ Server Properties
<verbatim>
# Set the following property to true, to enable the setup steps to run on each server start. Default = false.
atlas.server.run.setup.on.start=false
</verbatim>
......@@ -69,14 +69,14 @@ graph persistence for a different HBase instance, please see "Graph persistence
---+++ Installing & Running Atlas
*Installing Atlas*
---++++ Installing Atlas
<verbatim>
tar -xzvf apache-atlas-${project.version}-bin.tar.gz
cd atlas-${project.version}
</verbatim>
*Configuring Atlas*
---++++ Configuring Atlas
By default config directory used by Atlas is {package dir}/conf. To override this set environment
variable ATLAS_CONF to the path of the conf dir.
......@@ -196,7 +196,25 @@ Pre-requisites for running Solr in cloud mode
* !SolrCloud has support for replication and sharding. It is highly recommended to use !SolrCloud with at least two Solr nodes running on different servers with replication enabled.
If using !SolrCloud, then you also need !ZooKeeper installed and configured with 3 or 5 !ZooKeeper nodes
*Starting Atlas Server*
---++++ Setting up Atlas
There are a few steps that setup dependencies of Atlas. One such example is setting up the Titan schema
in the storage backend of choice. In a simple single server setup, these are automatically setup with default
configuration when the server first accesses these dependencies.
However, there are scenarios when we may want to run setup steps explicitly as one time operations. For example, in a
multiple server scenario using [[HighAvailability][High Availability]], it is preferable to run setup steps from one
of the server instances the first time, and then start the services.
To run these steps one time, execute the command =bin/atlas_start.py -setup= from a single Atlas server instance.
However, the Atlas server does take care of parallel executions of the setup steps. Also, running the setup steps multiple
times is idempotent. Therefore, if one chooses to run the setup steps as part of server startup, for convenience,
then they should enable the configuration option =atlas.server.run.setup.on.start= by defining it with the value =true=
in the =atlas-application.properties= file.
---++++ Starting Atlas Server
<verbatim>
bin/atlas_start.py [-port <port>]
</verbatim>
......@@ -205,7 +223,8 @@ By default,
* To change the port, use -port option.
* atlas server starts with conf from {package dir}/conf. To override this (to use the same conf with multiple atlas upgrades), set environment variable ATLAS_CONF to the path of conf dir
*Using Atlas*
---+++ Using Atlas
* Quick start model - sample model and data
<verbatim>
bin/quick_start.py [<atlas endpoint>]
......@@ -242,7 +261,21 @@ By default,
Once atlas is started, you can view the status of atlas entities using the Web-based dashboard. You can open your browser at the corresponding port to use the web UI.
*Stopping Atlas Server*
---+++ Stopping Atlas Server
<verbatim>
bin/atlas_stop.py
</verbatim>
---+++ Known Issues
---++++ Setup
If the setup of Atlas service fails due to any reason, the next run of setup (either by an explicit invocation of
=atlas_start.py -setup= or by enabling the configuration option =atlas.server.run.setup.on.start=) will fail with
a message such as =A previous setup run may not have completed cleanly.=. In such cases, you would need to manually
ensure the setup can run and delete the Zookeeper node at =/apache_atlas/setup_in_progress= before attempting to
run setup again.
If the setup failed due to HBase Titan schema setup errors, it may be necessary to repair the HBase schema. If no
data has been stored, one can also disable and drop the 'titan' schema in HBase to let setup run again.
\ No newline at end of file
......@@ -17,6 +17,7 @@ ATLAS-409 Atlas will not import avro tables with schema read from a file (dosset
ATLAS-379 Create sqoop and falcon metadata addons (venkatnrangan,bvellanki,sowmyaramesh via shwethags)
ALL CHANGES:
ATLAS-628 Starting two Atlas instances at the same time causes exceptions in HA mode (yhemanth via sumasai)
ATLAS-594 alter table rename doesnt work across databases (sumasai via shwethags)
ATLAS-586 While updating the multiple attributes, Atlas returns the response with escape characters (dkantor via shwethags)
ATLAS-582 Move Atlas UI to use backboneJS (kevalbhatt18 via shwethags)
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.graph;
import com.thinkaurelius.titan.core.TitanGraph;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.setup.SetupException;
import org.apache.atlas.setup.SetupStep;
import org.apache.commons.configuration.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A {@link SetupStep} that initializes the Graph backend for Atlas.
*
* This class will initialize the specific backend implementation specified in
* the Atlas configuration for the key atlas.graph.storage.backend.
*/
public class GraphSchemaInitializer implements SetupStep {
private static final Logger LOG = LoggerFactory.getLogger(GraphSchemaInitializer.class);
@Override
public void run() throws SetupException {
LOG.info("Initializing graph schema backend.");
try {
// The implementation of this method internally creates the schema.
TitanGraphProvider.getGraphInstance();
LOG.info("Completed initializing graph schema backend.");
} catch (Exception e) {
LOG.error("Could not initialize graph schema backend due to exception, {}", e.getMessage(), e);
throw new SetupException("Could not initialize graph schema due to exception", e);
}
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.setup;
public class SetupException extends Exception {
public SetupException(String message) {
super(message);
}
public SetupException(String message, Throwable cause) {
super(message, cause);
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.setup;
/**
* Represents a step that initializes some dependency of Atlas.
*
* Implementations of this should be ideally written in an idempotent way.
* In particular, if already setup, they should result in a Noop.
*/
public interface SetupStep {
void run() throws SetupException;
}
......@@ -19,7 +19,9 @@
package org.apache.atlas;
import org.apache.atlas.security.SecurityProperties;
import org.apache.atlas.setup.SetupException;
import org.apache.atlas.web.service.EmbeddedServer;
import org.apache.atlas.web.setup.AtlasSetup;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.Option;
......@@ -45,6 +47,7 @@ public final class Atlas {
private static final String ATLAS_LOG_DIR = "atlas.log.dir";
public static final String ATLAS_SERVER_HTTPS_PORT = "atlas.server.https.port";
public static final String ATLAS_SERVER_HTTP_PORT = "atlas.server.http.port";
public static final String ATLAS_SERVER_RUN_SETUP_KEY = "atlas.server.run.setup.on.start";
private static EmbeddedServer server;
......@@ -103,12 +106,30 @@ public final class Atlas {
final boolean enableTLS = isTLSEnabled(enableTLSFlag, appPort);
configuration.setProperty(SecurityProperties.TLS_ENABLED, String.valueOf(enableTLS));
runSetupIfRequired(configuration);
showStartupInfo(buildConfiguration, enableTLS, appPort);
server = EmbeddedServer.newServer(appPort, appPath, enableTLS);
server.start();
}
private static void runSetupIfRequired(Configuration configuration) throws SetupException {
boolean shouldRunSetup = configuration.getBoolean(ATLAS_SERVER_RUN_SETUP_KEY, false);
if (shouldRunSetup) {
LOG.warn("Running setup per configuration {}.", ATLAS_SERVER_RUN_SETUP_KEY);
AtlasSetup atlasSetup = new AtlasSetup();
try {
atlasSetup.run();
} catch (SetupException se) {
LOG.error("Failed running setup. Will not start the server.");
throw se;
}
LOG.warn("Finished running setup.");
} else {
LOG.info("Not running setup per configuration {}.", ATLAS_SERVER_RUN_SETUP_KEY);
}
}
private static void setApplicationHome() {
if (System.getProperty(ATLAS_HOME) == null) {
System.setProperty(ATLAS_HOME, "target");
......
......@@ -24,19 +24,16 @@ import org.apache.atlas.AtlasException;
import org.apache.atlas.ha.HAConfiguration;
import org.apache.commons.configuration.Configuration;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.actors.threadpool.Arrays;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
/**
......@@ -89,25 +86,22 @@ public class ActiveInstanceState {
String atlasServerAddress = HAConfiguration.getBoundAddressForId(configuration, serverId);
HAConfiguration.ZookeeperProperties zookeeperProperties =
HAConfiguration.getZookeeperProperties(configuration);
List<ACL> acls = ZooDefs.Ids.OPEN_ACL_UNSAFE;
if (zookeeperProperties.hasAcl()) {
acls = Arrays.asList(new ACL[]{AtlasZookeeperSecurityProperties.parseAcl(zookeeperProperties.getAcl())});
}
Stat serverInfo = client.checkExists().forPath(getZnodePath());
List<ACL> acls = Arrays.asList(
new ACL[]{AtlasZookeeperSecurityProperties.parseAcl(zookeeperProperties.getAcl(),
ZooDefs.Ids.OPEN_ACL_UNSAFE.get(0))});
Stat serverInfo = client.checkExists().forPath(getZnodePath(zookeeperProperties));
if (serverInfo == null) {
client.create().
withMode(CreateMode.EPHEMERAL).
withACL(acls).
forPath(getZnodePath());
forPath(getZnodePath(zookeeperProperties));
}
client.setData().forPath(getZnodePath(),
client.setData().forPath(getZnodePath(zookeeperProperties),
atlasServerAddress.getBytes(Charset.forName("UTF-8")));
}
private String getZnodePath() {
String zkRoot = configuration.getString(HAConfiguration.ATLAS_SERVER_HA_ZK_ROOT_KEY,
HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT);
return zkRoot+APACHE_ATLAS_ACTIVE_SERVER_INFO;
private String getZnodePath(HAConfiguration.ZookeeperProperties zookeeperProperties) {
return zookeeperProperties.getZkRoot()+APACHE_ATLAS_ACTIVE_SERVER_INFO;
}
/**
......@@ -120,7 +114,9 @@ public class ActiveInstanceState {
CuratorFramework client = curatorFactory.clientInstance();
String serverAddress = null;
try {
byte[] bytes = client.getData().forPath(getZnodePath());
HAConfiguration.ZookeeperProperties zookeeperProperties =
HAConfiguration.getZookeeperProperties(configuration);
byte[] bytes = client.getData().forPath(getZnodePath(zookeeperProperties));
serverAddress = new String(bytes, Charset.forName("UTF-8"));
} catch (Exception e) {
LOG.error("Error getting active server address", e);
......
......@@ -31,6 +31,13 @@ import org.apache.zookeeper.data.Id;
*/
public class AtlasZookeeperSecurityProperties {
public static ACL parseAcl(String aclString, ACL defaultAcl) {
if (StringUtils.isEmpty(aclString)) {
return defaultAcl;
}
return parseAcl(aclString);
}
/**
* Get an {@link ACL} by parsing input string.
* @param aclString A string of the form scheme:id
......
......@@ -30,10 +30,10 @@ import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.ACLProvider;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -58,6 +58,7 @@ public class CuratorFactory {
public static final String AUTH_SCHEME = "auth";
public static final String DIGEST_SCHEME = "digest";
public static final String IP_SCHEME = "ip";
public static final String SETUP_LOCK = "/setup_lock";
private final Configuration configuration;
private CuratorFramework curatorFramework;
......@@ -192,4 +193,8 @@ public class CuratorFactory {
public LeaderLatch leaderLatchInstance(String serverId, String zkRoot) {
return new LeaderLatch(curatorFramework, zkRoot+APACHE_ATLAS_LEADER_ELECTOR_PATH, serverId);
}
public InterProcessMutex lockInstance(String zkRoot) {
return new InterProcessMutex(curatorFramework, zkRoot+ SETUP_LOCK);
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.web.setup;
import com.google.inject.Guice;
import com.google.inject.Injector;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasException;
import org.apache.atlas.setup.SetupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* An application that is used to setup dependencies for the Atlas web service.
*
* This should be executed immediately after installation with the same configuration
* as the Atlas web service itself. The application runs all steps registered with {@link SetupSteps}.
*/
public class AtlasSetup {
private static final Logger LOG = LoggerFactory.getLogger(AtlasSetup.class);
private final Injector injector;
public AtlasSetup() {
injector = Guice.createInjector(new AtlasSetupModule());
LOG.info("Got injector: " + injector);
}
public static void main(String[] args) {
try {
AtlasSetup atlasSetup = new AtlasSetup();
atlasSetup.run();
LOG.info("Finished running all setup steps.");
} catch (SetupException e) {
LOG.error("Could not run setup step.", e);
}
}
public void run() throws SetupException {
SetupSteps setupSteps = injector.getInstance(SetupSteps.class);
LOG.info("Got setup steps.");
try {
setupSteps.runSetup(ApplicationProperties.get());
} catch (AtlasException e) {
throw new SetupException("Cannot get application properties.", e);
}
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.web.setup;
import com.google.inject.AbstractModule;
import com.google.inject.multibindings.Multibinder;
import org.apache.atlas.repository.graph.GraphSchemaInitializer;
import org.apache.atlas.setup.SetupStep;
public class AtlasSetupModule extends AbstractModule {
@Override
protected void configure() {
Multibinder<SetupStep> setupStepMultibinder = Multibinder.newSetBinder(binder(), SetupStep.class);
setupStepMultibinder.addBinding().to(GraphSchemaInitializer.class);
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.web.setup;
import com.google.common.base.Charsets;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import org.apache.atlas.AtlasConstants;
import org.apache.atlas.AtlasException;
import org.apache.atlas.ha.AtlasServerIdSelector;
import org.apache.atlas.ha.HAConfiguration;
import org.apache.atlas.setup.SetupException;
import org.apache.atlas.setup.SetupStep;
import org.apache.atlas.web.service.AtlasZookeeperSecurityProperties;
import org.apache.atlas.web.service.CuratorFactory;
import org.apache.commons.configuration.Configuration;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
@Singleton
public class SetupSteps {
private static final Logger LOG = LoggerFactory.getLogger(SetupSteps.class);
public static final String SETUP_IN_PROGRESS_NODE = "/setup_in_progress";
private final Set<SetupStep> setupSteps;
private CuratorFactory curatorFactory;
@Inject
public SetupSteps(Set<SetupStep> steps, CuratorFactory curatorFactory) {
setupSteps = steps;
this.curatorFactory = curatorFactory;
}
/**
* Call each registered {@link SetupStep} one after the other.
* @throws SetupException Thrown with any error during running setup, including Zookeeper interactions, and
* individual failures in the {@link SetupStep}.
* @param configuration Configuration for Atlas server.
*/
public void runSetup(Configuration configuration) throws SetupException {
HAConfiguration.ZookeeperProperties zookeeperProperties = HAConfiguration.getZookeeperProperties(configuration);
InterProcessMutex lock = curatorFactory.lockInstance(zookeeperProperties.getZkRoot());
try {
LOG.info("Trying to acquire lock for running setup.");
lock.acquire();
LOG.info("Acquired lock for running setup.");
handleSetupInProgress(configuration, zookeeperProperties);
for (SetupStep step : setupSteps) {
LOG.info("Running setup step: " + step);
step.run();
}
clearSetupInProgress(zookeeperProperties);
} catch (SetupException se) {
LOG.error("Got setup exception while trying to setup", se);
throw se;
} catch (Exception e) {
LOG.error("Error running setup steps", e);
throw new SetupException("Error running setup steps", e);
} finally {
releaseLock(lock);
curatorFactory.close();
}
}
private void handleSetupInProgress(Configuration configuration, HAConfiguration.ZookeeperProperties zookeeperProperties) throws SetupException {
if (setupInProgress(zookeeperProperties)) {
throw new SetupException("A previous setup run may not have completed cleanly. " +
"Ensure setup can run and retry after clearing the zookeeper node at " +
lockPath(zookeeperProperties));
}
createSetupInProgressNode(configuration, zookeeperProperties);
}
private void releaseLock(InterProcessMutex lock) {
try {
lock.release();
LOG.info("Released lock after running setup.");
} catch (Exception e) {
LOG.error("Error releasing acquired lock.", e);
}
}
private boolean setupInProgress(HAConfiguration.ZookeeperProperties zookeeperProperties) {
CuratorFramework client = curatorFactory.clientInstance();
Stat lockInProgressStat;
String path = lockPath(zookeeperProperties);
try {
lockInProgressStat = client.checkExists().forPath(path);
return lockInProgressStat != null;
} catch (Exception e) {
LOG.error("Error checking if path {} exists.", path, e);
return true;
}
}
private void clearSetupInProgress(HAConfiguration.ZookeeperProperties zookeeperProperties)
throws Exception {
CuratorFramework client = curatorFactory.clientInstance();
String path = lockPath(zookeeperProperties);
client.delete().forPath(path);
LOG.info("Deleted lock path after completing setup {}", path);
}
private String lockPath(HAConfiguration.ZookeeperProperties zookeeperProperties) {
return zookeeperProperties.getZkRoot()+ SETUP_IN_PROGRESS_NODE;
}
private String getServerId(Configuration configuration) {
String serverId = configuration.getString(AtlasConstants.ATLAS_REST_ADDRESS_KEY,
AtlasConstants.DEFAULT_ATLAS_REST_ADDRESS);
try {
serverId = AtlasServerIdSelector.selectServerId(configuration);
} catch (AtlasException e) {
LOG.error("Could not select server id, defaulting to {}", serverId, e);
}
return serverId;
}
private void createSetupInProgressNode(Configuration configuration,
HAConfiguration.ZookeeperProperties zookeeperProperties)
throws SetupException {
String serverId = getServerId(configuration);
ACL acl = AtlasZookeeperSecurityProperties.parseAcl(zookeeperProperties.getAcl(),
ZooDefs.Ids.OPEN_ACL_UNSAFE.get(0));
List<ACL> acls = Arrays.asList(new ACL[]{acl});
CuratorFramework client = curatorFactory.clientInstance();
try {
String path = lockPath(zookeeperProperties);
client.create().withACL(acls).forPath(path, serverId.getBytes(Charsets.UTF_8));
LOG.info("Created lock node {}", path);
} catch (Exception e) {
throw new SetupException("Could not create lock node before running setup.", e);
}
}
}
......@@ -62,4 +62,10 @@ public class AtlasZookeeperSecurityPropertiesTest {
assertEquals(authInfo.getScheme(), "digest");
assertEquals(authInfo.getAuth(), "user:password".getBytes(Charsets.UTF_8));
}
@Test
public void shouldReturnDefaultAclIfNullOrEmpty() {
ACL acl = AtlasZookeeperSecurityProperties.parseAcl(null, ZooDefs.Ids.OPEN_ACL_UNSAFE.get(0));
assertEquals(acl, ZooDefs.Ids.OPEN_ACL_UNSAFE.get(0));
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment