Commit 0fc20719 by Suma Shivaprasad

ATLAS-513 Admin support for HA (yhemanth via sumasai)

parent 8bde666b
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.configuration.Configuration;
/**
* An application that allows users to run admin commands against an Atlas server.
*
* The application uses {@link AtlasClient} to send REST requests to the Atlas server. The details of connections
* and other configuration is specified in the Atlas properties file.
* Exit status of the application will be as follows:
* <li>0: successful execution</li>
* <li>1: error in options used for the application</li>
* <li>-1/255: application error</li>
*/
public class AtlasAdminClient {
private static final Option STATUS = new Option("status", false, "Get the status of an atlas instance");
private static final Options OPTIONS = new Options();
private static final int INVALID_OPTIONS_STATUS = 1;
private static final int PROGRAM_ERROR_STATUS = -1;
static {
OPTIONS.addOption(STATUS);
}
public static void main(String[] args) throws AtlasException, ParseException {
AtlasAdminClient atlasAdminClient = new AtlasAdminClient();
int result = atlasAdminClient.run(args);
System.exit(result);
}
private int run(String[] args) throws AtlasException {
CommandLine commandLine = parseCommandLineOptions(args);
Configuration configuration = ApplicationProperties.get();
String atlasServerUri = configuration.getString(
AtlasConstants.ATLAS_REST_ADDRESS_KEY, AtlasConstants.DEFAULT_ATLAS_REST_ADDRESS);
AtlasClient atlasClient = new AtlasClient(atlasServerUri, null, null);
return handleCommand(commandLine, atlasServerUri, atlasClient);
}
private int handleCommand(CommandLine commandLine, String atlasServerUri, AtlasClient atlasClient) {
int cmdStatus = PROGRAM_ERROR_STATUS;
if (commandLine.hasOption(STATUS.getOpt())) {
try {
System.out.println(atlasClient.getAdminStatus());
cmdStatus = 0;
} catch (AtlasServiceException e) {
System.err.println("Could not retrieve status of the server at " + atlasServerUri);
printStandardHttpErrorDetails(e);
}
} else {
System.err.println("Unsupported option. Refer to usage for valid options.");
printUsage(INVALID_OPTIONS_STATUS);
}
return cmdStatus;
}
private void printStandardHttpErrorDetails(AtlasServiceException e) {
System.err.println("Error details: ");
System.err.println("HTTP Status: " + e.getStatus().getStatusCode() + ","
+ e.getStatus().getReasonPhrase());
System.err.println("Exception message: " + e.getMessage());
}
private CommandLine parseCommandLineOptions(String[] args) {
if (args.length == 0) {
printUsage(INVALID_OPTIONS_STATUS);
}
CommandLineParser parser = new GnuParser();
CommandLine commandLine = null;
try {
commandLine = parser.parse(OPTIONS, args);
} catch (ParseException e) {
System.err.println("Could not parse command line options.");
printUsage(INVALID_OPTIONS_STATUS);
}
return commandLine;
}
private void printUsage(int statusCode) {
HelpFormatter helpFormatter = new HelpFormatter();
helpFormatter.printHelp("atlas_admin.py", OPTIONS);
System.exit(statusCode);
}
}
...@@ -70,6 +70,7 @@ public class AtlasClient { ...@@ -70,6 +70,7 @@ public class AtlasClient {
public static final String BASE_URI = "api/atlas/"; public static final String BASE_URI = "api/atlas/";
public static final String ADMIN_VERSION = "admin/version"; public static final String ADMIN_VERSION = "admin/version";
public static final String ADMIN_STATUS = "admin/status";
public static final String TYPES = "types"; public static final String TYPES = "types";
public static final String URI_ENTITY = "entities"; public static final String URI_ENTITY = "entities";
public static final String URI_SEARCH = "discovery/search"; public static final String URI_SEARCH = "discovery/search";
...@@ -88,6 +89,7 @@ public class AtlasClient { ...@@ -88,6 +89,7 @@ public class AtlasClient {
public static final String REFERENCEABLE_ATTRIBUTE_NAME = "qualifiedName"; public static final String REFERENCEABLE_ATTRIBUTE_NAME = "qualifiedName";
public static final String JSON_MEDIA_TYPE = MediaType.APPLICATION_JSON + "; charset=UTF-8"; public static final String JSON_MEDIA_TYPE = MediaType.APPLICATION_JSON + "; charset=UTF-8";
public static final String UNKNOWN_STATUS = "Unknown status";
private WebResource service; private WebResource service;
...@@ -154,10 +156,30 @@ public class AtlasClient { ...@@ -154,10 +156,30 @@ public class AtlasClient {
} }
} }
/**
* Return status of the service instance the client is pointing to.
*
* @return One of the values in ServiceState.ServiceStateValue or {@link #UNKNOWN_STATUS} if there is a JSON parse
* exception
* @throws AtlasServiceException if there is a HTTP error.
*/
public String getAdminStatus() throws AtlasServiceException {
String result = UNKNOWN_STATUS;
WebResource resource = getResource(API.STATUS);
JSONObject response = callAPIWithResource(API.STATUS, resource);
try {
result = response.getString("Status");
} catch (JSONException e) {
LOG.error("Exception while parsing admin status response. Returned response {}", response.toString(), e);
}
return result;
}
public enum API { public enum API {
//Admin operations //Admin operations
VERSION(BASE_URI + ADMIN_VERSION, HttpMethod.GET, Response.Status.OK), VERSION(BASE_URI + ADMIN_VERSION, HttpMethod.GET, Response.Status.OK),
STATUS(BASE_URI + ADMIN_STATUS, HttpMethod.GET, Response.Status.OK),
//Type operations //Type operations
CREATE_TYPE(BASE_URI + TYPES, HttpMethod.POST, Response.Status.CREATED), CREATE_TYPE(BASE_URI + TYPES, HttpMethod.POST, Response.Status.CREATED),
......
...@@ -28,6 +28,7 @@ import static org.junit.Assert.assertFalse; ...@@ -28,6 +28,7 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.fail; import static org.testng.Assert.fail;
public class AtlasClientTest { public class AtlasClientTest {
...@@ -37,7 +38,7 @@ public class AtlasClientTest { ...@@ -37,7 +38,7 @@ public class AtlasClientTest {
WebResource webResource = mock(WebResource.class); WebResource webResource = mock(WebResource.class);
AtlasClient atlasClient = new AtlasClient(webResource); AtlasClient atlasClient = new AtlasClient(webResource);
WebResource.Builder builder = setupBuilder(webResource); WebResource.Builder builder = setupBuilder(AtlasClient.API.VERSION, webResource);
ClientResponse response = mock(ClientResponse.class); ClientResponse response = mock(ClientResponse.class);
when(response.getStatus()).thenReturn(Response.Status.OK.getStatusCode()); when(response.getStatus()).thenReturn(Response.Status.OK.getStatusCode());
when(response.getEntity(String.class)).thenReturn("{\"Version\":\"version-rrelease\",\"Name\":\"apache-atlas\"," + when(response.getEntity(String.class)).thenReturn("{\"Version\":\"version-rrelease\",\"Name\":\"apache-atlas\"," +
...@@ -47,9 +48,9 @@ public class AtlasClientTest { ...@@ -47,9 +48,9 @@ public class AtlasClientTest {
assertTrue(atlasClient.isServerReady()); assertTrue(atlasClient.isServerReady());
} }
private WebResource.Builder setupBuilder(WebResource webResource) { private WebResource.Builder setupBuilder(AtlasClient.API api, WebResource webResource) {
WebResource adminVersionResource = mock(WebResource.class); WebResource adminVersionResource = mock(WebResource.class);
when(webResource.path(AtlasClient.API.VERSION.getPath())).thenReturn(adminVersionResource); when(webResource.path(api.getPath())).thenReturn(adminVersionResource);
WebResource.Builder builder = mock(WebResource.Builder.class); WebResource.Builder builder = mock(WebResource.Builder.class);
when(adminVersionResource.accept(AtlasClient.JSON_MEDIA_TYPE)).thenReturn(builder); when(adminVersionResource.accept(AtlasClient.JSON_MEDIA_TYPE)).thenReturn(builder);
when(builder.type(AtlasClient.JSON_MEDIA_TYPE)).thenReturn(builder); when(builder.type(AtlasClient.JSON_MEDIA_TYPE)).thenReturn(builder);
...@@ -60,7 +61,7 @@ public class AtlasClientTest { ...@@ -60,7 +61,7 @@ public class AtlasClientTest {
public void shouldReturnFalseIfServerIsNotReady() throws AtlasServiceException { public void shouldReturnFalseIfServerIsNotReady() throws AtlasServiceException {
WebResource webResource = mock(WebResource.class); WebResource webResource = mock(WebResource.class);
AtlasClient atlasClient = new AtlasClient(webResource); AtlasClient atlasClient = new AtlasClient(webResource);
WebResource.Builder builder = setupBuilder(webResource); WebResource.Builder builder = setupBuilder(AtlasClient.API.VERSION, webResource);
when(builder.method(AtlasClient.API.VERSION.getMethod(), ClientResponse.class, null)).thenThrow( when(builder.method(AtlasClient.API.VERSION.getMethod(), ClientResponse.class, null)).thenThrow(
new ClientHandlerException()); new ClientHandlerException());
assertFalse(atlasClient.isServerReady()); assertFalse(atlasClient.isServerReady());
...@@ -70,7 +71,7 @@ public class AtlasClientTest { ...@@ -70,7 +71,7 @@ public class AtlasClientTest {
public void shouldReturnFalseIfServiceIsUnavailable() throws AtlasServiceException { public void shouldReturnFalseIfServiceIsUnavailable() throws AtlasServiceException {
WebResource webResource = mock(WebResource.class); WebResource webResource = mock(WebResource.class);
AtlasClient atlasClient = new AtlasClient(webResource); AtlasClient atlasClient = new AtlasClient(webResource);
WebResource.Builder builder = setupBuilder(webResource); WebResource.Builder builder = setupBuilder(AtlasClient.API.VERSION, webResource);
ClientResponse response = mock(ClientResponse.class); ClientResponse response = mock(ClientResponse.class);
when(response.getStatus()).thenReturn(Response.Status.SERVICE_UNAVAILABLE.getStatusCode()); when(response.getStatus()).thenReturn(Response.Status.SERVICE_UNAVAILABLE.getStatusCode());
when(response.getClientResponseStatus()).thenReturn(ClientResponse.Status.SERVICE_UNAVAILABLE); when(response.getClientResponseStatus()).thenReturn(ClientResponse.Status.SERVICE_UNAVAILABLE);
...@@ -84,7 +85,7 @@ public class AtlasClientTest { ...@@ -84,7 +85,7 @@ public class AtlasClientTest {
public void shouldThrowErrorIfAnyResponseOtherThanServiceUnavailable() throws AtlasServiceException { public void shouldThrowErrorIfAnyResponseOtherThanServiceUnavailable() throws AtlasServiceException {
WebResource webResource = mock(WebResource.class); WebResource webResource = mock(WebResource.class);
AtlasClient atlasClient = new AtlasClient(webResource); AtlasClient atlasClient = new AtlasClient(webResource);
WebResource.Builder builder = setupBuilder(webResource); WebResource.Builder builder = setupBuilder(AtlasClient.API.VERSION, webResource);
ClientResponse response = mock(ClientResponse.class); ClientResponse response = mock(ClientResponse.class);
when(response.getStatus()).thenReturn(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode()); when(response.getStatus()).thenReturn(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode());
when(response.getClientResponseStatus()).thenReturn(ClientResponse.Status.INTERNAL_SERVER_ERROR); when(response.getClientResponseStatus()).thenReturn(ClientResponse.Status.INTERNAL_SERVER_ERROR);
...@@ -94,4 +95,49 @@ public class AtlasClientTest { ...@@ -94,4 +95,49 @@ public class AtlasClientTest {
atlasClient.isServerReady(); atlasClient.isServerReady();
fail("Should throw exception"); fail("Should throw exception");
} }
@Test
public void shouldGetAdminStatus() throws AtlasServiceException {
WebResource webResource = mock(WebResource.class);
AtlasClient atlasClient = new AtlasClient(webResource);
WebResource.Builder builder = setupBuilder(AtlasClient.API.STATUS, webResource);
ClientResponse response = mock(ClientResponse.class);
when(response.getStatus()).thenReturn(Response.Status.OK.getStatusCode());
when(response.getEntity(String.class)).thenReturn("{\"Status\":\"Active\"}");
when(builder.method(AtlasClient.API.STATUS.getMethod(), ClientResponse.class, null)).thenReturn(response);
String status = atlasClient.getAdminStatus();
assertEquals(status, "Active");
}
@Test(expectedExceptions = AtlasServiceException.class)
public void shouldReturnStatusAsUnknownOnException() throws AtlasServiceException {
WebResource webResource = mock(WebResource.class);
AtlasClient atlasClient = new AtlasClient(webResource);
WebResource.Builder builder = setupBuilder(AtlasClient.API.STATUS, webResource);
ClientResponse response = mock(ClientResponse.class);
when(response.getStatus()).thenReturn(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode());
when(response.getClientResponseStatus()).thenReturn(ClientResponse.Status.INTERNAL_SERVER_ERROR);
when(builder.method(AtlasClient.API.STATUS.getMethod(), ClientResponse.class, null)).thenReturn(response);
String status = atlasClient.getAdminStatus();
fail("Should fail with AtlasServiceException");
}
@Test
public void shouldReturnStatusAsUnknownIfJSONIsInvalid() throws AtlasServiceException {
WebResource webResource = mock(WebResource.class);
AtlasClient atlasClient = new AtlasClient(webResource);
WebResource.Builder builder = setupBuilder(AtlasClient.API.STATUS, webResource);
ClientResponse response = mock(ClientResponse.class);
when(response.getStatus()).thenReturn(Response.Status.OK.getStatusCode());
when(response.getEntity(String.class)).thenReturn("{\"status\":\"Active\"}");
when(builder.method(AtlasClient.API.STATUS.getMethod(), ClientResponse.class, null)).thenReturn(response);
String status = atlasClient.getAdminStatus();
assertEquals(status, AtlasClient.UNKNOWN_STATUS);
}
} }
...@@ -30,4 +30,6 @@ public final class AtlasConstants { ...@@ -30,4 +30,6 @@ public final class AtlasConstants {
public static final String CLUSTER_NAME_ATTRIBUTE = "clusterName"; public static final String CLUSTER_NAME_ATTRIBUTE = "clusterName";
public static final String SYSTEM_PROPERTY_APP_PORT = "atlas.app.port"; public static final String SYSTEM_PROPERTY_APP_PORT = "atlas.app.port";
public static final String DEFAULT_APP_PORT_STR = "21000"; public static final String DEFAULT_APP_PORT_STR = "21000";
public static final String ATLAS_REST_ADDRESS_KEY = "atlas.rest.address";
public static final String DEFAULT_ATLAS_REST_ADDRESS = "http://localhost:21000";
} }
#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import sys
import atlas_config as mc
import atlas_client_cmdline as cmdline
def main():
conf_dir = cmdline.setup_conf_dir()
jvm_opts_list = cmdline.setup_jvm_opts_list(conf_dir, 'atlas_admin.log')
atlas_classpath = cmdline.get_atlas_classpath(conf_dir)
process = mc.java("org.apache.atlas.AtlasAdminClient", sys.argv[1:], atlas_classpath, jvm_opts_list)
return process.wait()
if __name__ == '__main__':
try:
returncode = main()
except Exception as e:
print "Exception: %s " % str(e)
returncode = -1
sys.exit(returncode)
#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import atlas_config as mc
ATLAS_COMMAND_OPTS="-Datlas.home=%s"
ATLAS_LOG_OPTS="-Datlas.log.dir=%s -Datlas.log.file=%s"
DEFAULT_JVM_OPTS="-Xmx1024m -Dlog4j.configuration=atlas-log4j.xml"
def setup_conf_dir():
atlas_home = mc.atlasDir()
return mc.dirMustExist(mc.confDir(atlas_home))
def get_atlas_classpath(confdir):
atlas_home = mc.atlasDir()
web_app_dir = mc.webAppDir(atlas_home)
mc.expandWebApp(atlas_home)
p = os.pathsep
atlas_classpath = confdir + p \
+ os.path.join(web_app_dir, "atlas", "WEB-INF", "classes") + p \
+ os.path.join(web_app_dir, "atlas", "WEB-INF", "lib", "*") + p \
+ os.path.join(atlas_home, "libext", "*")
if mc.isCygwin():
atlas_classpath = mc.convertCygwinPath(atlas_classpath, True)
return atlas_classpath
def setup_jvm_opts_list(confdir, log_name):
atlas_home = mc.atlasDir()
mc.executeEnvSh(confdir)
logdir = mc.dirMustExist(mc.logDir(atlas_home))
if mc.isCygwin():
# Pathnames that are passed to JVM must be converted to Windows format.
jvm_atlas_home = mc.convertCygwinPath(atlas_home)
jvm_logdir = mc.convertCygwinPath(logdir)
else:
jvm_atlas_home = atlas_home
jvm_logdir = logdir
# create sys property for conf dirs
jvm_opts_list = (ATLAS_LOG_OPTS % (jvm_logdir, log_name)).split()
cmd_opts = (ATLAS_COMMAND_OPTS % jvm_atlas_home)
jvm_opts_list.extend(cmd_opts.split())
atlas_jvm_opts = os.environ.get(mc.ATLAS_OPTS, DEFAULT_JVM_OPTS)
jvm_opts_list.extend(atlas_jvm_opts.split())
return jvm_opts_list
...@@ -19,46 +19,13 @@ import os ...@@ -19,46 +19,13 @@ import os
import sys import sys
import atlas_config as mc import atlas_config as mc
import atlas_client_cmdline as cmdline
ATLAS_LOG_OPTS="-Datlas.log.dir=%s -Datlas.log.file=quick_start.log"
ATLAS_COMMAND_OPTS="-Datlas.home=%s"
DEFAULT_JVM_OPTS="-Xmx1024m -Dlog4j.configuration=atlas-log4j.xml"
def main(): def main():
atlas_home = mc.atlasDir() conf_dir = cmdline.setup_conf_dir()
confdir = mc.dirMustExist(mc.confDir(atlas_home)) jvm_opts_list = cmdline.setup_jvm_opts_list(conf_dir, 'quick_start.log')
mc.executeEnvSh(confdir) atlas_classpath = cmdline.get_atlas_classpath(conf_dir)
logdir = mc.dirMustExist(mc.logDir(atlas_home))
if mc.isCygwin():
# Pathnames that are passed to JVM must be converted to Windows format.
jvm_atlas_home = mc.convertCygwinPath(atlas_home)
jvm_logdir = mc.convertCygwinPath(logdir)
else:
jvm_atlas_home = atlas_home
jvm_logdir = logdir
#create sys property for conf dirs
jvm_opts_list = (ATLAS_LOG_OPTS % jvm_logdir).split()
cmd_opts = (ATLAS_COMMAND_OPTS % jvm_atlas_home)
jvm_opts_list.extend(cmd_opts.split())
default_jvm_opts = DEFAULT_JVM_OPTS
atlas_jvm_opts = os.environ.get(mc.ATLAS_OPTS, default_jvm_opts)
jvm_opts_list.extend(atlas_jvm_opts.split())
#expand web app dir
web_app_dir = mc.webAppDir(atlas_home)
mc.expandWebApp(atlas_home)
p = os.pathsep
atlas_classpath = confdir + p \
+ os.path.join(web_app_dir, "atlas", "WEB-INF", "classes" ) + p \
+ os.path.join(web_app_dir, "atlas", "WEB-INF", "lib", "*" ) + p \
+ os.path.join(atlas_home, "libext", "*")
if mc.isCygwin():
atlas_classpath = mc.convertCygwinPath(atlas_classpath, True)
process = mc.java("org.apache.atlas.examples.QuickStart", sys.argv[1:], atlas_classpath, jvm_opts_list) process = mc.java("org.apache.atlas.examples.QuickStart", sys.argv[1:], atlas_classpath, jvm_opts_list)
return process.wait() return process.wait()
......
...@@ -99,9 +99,10 @@ atlas.rest.address=http://localhost:21000 ...@@ -99,9 +99,10 @@ atlas.rest.address=http://localhost:21000
######### High Availability Configuration ######## ######### High Availability Configuration ########
atlas.server.ha.enabled=false atlas.server.ha.enabled=false
atlas.server.ids=id1 #### Enabled the configs below as per need if HA is enabled #####
atlas.server.address.id1=localhost:21000 #atlas.server.ids=id1
atlas.server.ha.zookeeper.connect=localhost:2181 #atlas.server.address.id1=localhost:21000
atlas.server.ha.zookeeper.retry.sleeptime.ms=1000 #atlas.server.ha.zookeeper.connect=localhost:2181
atlas.server.ha.zookeeper.num.retries=3 #atlas.server.ha.zookeeper.retry.sleeptime.ms=1000
atlas.server.ha.zookeeper.session.timeout.ms=20000 #atlas.server.ha.zookeeper.num.retries=3
\ No newline at end of file #atlas.server.ha.zookeeper.session.timeout.ms=20000
...@@ -13,6 +13,7 @@ ATLAS-409 Atlas will not import avro tables with schema read from a file (dosset ...@@ -13,6 +13,7 @@ ATLAS-409 Atlas will not import avro tables with schema read from a file (dosset
ATLAS-379 Create sqoop and falcon metadata addons (venkatnrangan,bvellanki,sowmyaramesh via shwethags) ATLAS-379 Create sqoop and falcon metadata addons (venkatnrangan,bvellanki,sowmyaramesh via shwethags)
ALL CHANGES: ALL CHANGES:
ATLAS-513 Admin support for HA (yhemanth via sumasai)
ATLAS-511 Ability to run multiple instances of Atlas Server with automatic failover to one active server (yhemanth via shwethags) ATLAS-511 Ability to run multiple instances of Atlas Server with automatic failover to one active server (yhemanth via shwethags)
ATLAS-577 Integrate entity audit with DefaultMetadataService (shwethags) ATLAS-577 Integrate entity audit with DefaultMetadataService (shwethags)
ATLAS-588 import-hive.sh fails while importing partitions for a non-partitioned table (sumasai via shwethags) ATLAS-588 import-hive.sh fails while importing partitions for a non-partitioned table (sumasai via shwethags)
......
...@@ -90,5 +90,5 @@ hbase.security.authentication=simple ...@@ -90,5 +90,5 @@ hbase.security.authentication=simple
atlas.hook.falcon.synchronous=true atlas.hook.falcon.synchronous=true
######### High Availability Configuration ######## ######### High Availability Configuration ########
atlas.server.ha.enabled=false atlas.server.ha.enabled=false
atlas.server.ids=id1 #atlas.server.ids=id1
atlas.server.address.id1=localhost:21000 #atlas.server.address.id1=localhost:21000
...@@ -77,7 +77,11 @@ public class ActiveServerFilter implements Filter { ...@@ -77,7 +77,11 @@ public class ActiveServerFilter implements Filter {
@Override @Override
public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse,
FilterChain filterChain) throws IOException, ServletException { FilterChain filterChain) throws IOException, ServletException {
if (isInstanceActive()) { if (isFilteredURI(servletRequest)) {
LOG.debug("Is a filtered URI: {}. Passing request downstream.",
((HttpServletRequest)servletRequest).getRequestURI());
filterChain.doFilter(servletRequest, servletResponse);
} else if (isInstanceActive()) {
LOG.debug("Active. Passing request downstream"); LOG.debug("Active. Passing request downstream");
filterChain.doFilter(servletRequest, servletResponse); filterChain.doFilter(servletRequest, servletResponse);
} else if (serviceState.isInstanceInTransition()) { } else if (serviceState.isInstanceInTransition()) {
...@@ -97,6 +101,12 @@ public class ActiveServerFilter implements Filter { ...@@ -97,6 +101,12 @@ public class ActiveServerFilter implements Filter {
} }
} }
private boolean isFilteredURI(ServletRequest servletRequest) {
HttpServletRequest httpServletRequest = (HttpServletRequest) servletRequest;
String requestURI = httpServletRequest.getRequestURI();
return requestURI.contains("/admin/");
}
boolean isInstanceActive() { boolean isInstanceActive() {
return serviceState.getState() == ServiceState.ServiceStateValue.ACTIVE; return serviceState.getState() == ServiceState.ServiceStateValue.ACTIVE;
} }
......
...@@ -18,6 +18,8 @@ ...@@ -18,6 +18,8 @@
package org.apache.atlas.web.resources; package org.apache.atlas.web.resources;
import com.google.inject.Inject;
import org.apache.atlas.web.service.ServiceState;
import org.apache.atlas.web.util.Servlets; import org.apache.atlas.web.util.Servlets;
import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.commons.configuration.PropertiesConfiguration;
...@@ -41,6 +43,12 @@ import javax.ws.rs.core.Response; ...@@ -41,6 +43,12 @@ import javax.ws.rs.core.Response;
public class AdminResource { public class AdminResource {
private Response version; private Response version;
private ServiceState serviceState;
@Inject
public AdminResource(ServiceState serviceState) {
this.serviceState = serviceState;
}
/** /**
* Fetches the thread stack dump for this application. * Fetches the thread stack dump for this application.
...@@ -98,4 +106,18 @@ public class AdminResource { ...@@ -98,4 +106,18 @@ public class AdminResource {
return version; return version;
} }
@GET
@Path("status")
@Produces(Servlets.JSON_MEDIA_TYPE)
public Response getStatus() {
JSONObject responseData = new JSONObject();
try {
responseData.put("Status", serviceState.getState().toString());
Response response = Response.ok(responseData).build();
return response;
} catch (JSONException e) {
throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.INTERNAL_SERVER_ERROR));
}
}
} }
...@@ -34,6 +34,7 @@ import javax.ws.rs.HttpMethod; ...@@ -34,6 +34,7 @@ import javax.ws.rs.HttpMethod;
import java.io.IOException; import java.io.IOException;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue; import static org.testng.Assert.assertTrue;
...@@ -64,6 +65,8 @@ public class ActiveServerFilterTest { ...@@ -64,6 +65,8 @@ public class ActiveServerFilterTest {
@Test @Test
public void testShouldPassThroughRequestsIfActive() throws IOException, ServletException { public void testShouldPassThroughRequestsIfActive() throws IOException, ServletException {
when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.ACTIVE); when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.ACTIVE);
when(servletRequest.getRequestURI()).thenReturn("api/atlas/types");
ActiveServerFilter activeServerFilter = new ActiveServerFilter(activeInstanceState, serviceState); ActiveServerFilter activeServerFilter = new ActiveServerFilter(activeInstanceState, serviceState);
activeServerFilter.doFilter(servletRequest, servletResponse, filterChain); activeServerFilter.doFilter(servletRequest, servletResponse, filterChain);
...@@ -74,6 +77,8 @@ public class ActiveServerFilterTest { ...@@ -74,6 +77,8 @@ public class ActiveServerFilterTest {
@Test @Test
public void testShouldFailIfCannotRetrieveActiveServerAddress() throws IOException, ServletException { public void testShouldFailIfCannotRetrieveActiveServerAddress() throws IOException, ServletException {
when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.PASSIVE); when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.PASSIVE);
when(servletRequest.getRequestURI()).thenReturn("api/atlas/types");
ActiveServerFilter activeServerFilter = new ActiveServerFilter(activeInstanceState, serviceState); ActiveServerFilter activeServerFilter = new ActiveServerFilter(activeInstanceState, serviceState);
when(activeInstanceState.getActiveServerAddress()).thenReturn(null); when(activeInstanceState.getActiveServerAddress()).thenReturn(null);
...@@ -86,6 +91,8 @@ public class ActiveServerFilterTest { ...@@ -86,6 +91,8 @@ public class ActiveServerFilterTest {
@Test @Test
public void testShouldRedirectRequestToActiveServerAddress() throws IOException, ServletException { public void testShouldRedirectRequestToActiveServerAddress() throws IOException, ServletException {
when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.PASSIVE); when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.PASSIVE);
when(servletRequest.getRequestURI()).thenReturn("api/atlas/types");
ActiveServerFilter activeServerFilter = new ActiveServerFilter(activeInstanceState, serviceState); ActiveServerFilter activeServerFilter = new ActiveServerFilter(activeInstanceState, serviceState);
when(activeInstanceState.getActiveServerAddress()).thenReturn(ACTIVE_SERVER_ADDRESS); when(activeInstanceState.getActiveServerAddress()).thenReturn(ACTIVE_SERVER_ADDRESS);
...@@ -100,6 +107,8 @@ public class ActiveServerFilterTest { ...@@ -100,6 +107,8 @@ public class ActiveServerFilterTest {
@Test @Test
public void testRedirectedRequestShouldContainQueryParameters() throws IOException, ServletException { public void testRedirectedRequestShouldContainQueryParameters() throws IOException, ServletException {
when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.PASSIVE); when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.PASSIVE);
when(servletRequest.getRequestURI()).thenReturn("api/atlas/types");
ActiveServerFilter activeServerFilter = new ActiveServerFilter(activeInstanceState, serviceState); ActiveServerFilter activeServerFilter = new ActiveServerFilter(activeInstanceState, serviceState);
when(activeInstanceState.getActiveServerAddress()).thenReturn(ACTIVE_SERVER_ADDRESS); when(activeInstanceState.getActiveServerAddress()).thenReturn(ACTIVE_SERVER_ADDRESS);
...@@ -116,6 +125,8 @@ public class ActiveServerFilterTest { ...@@ -116,6 +125,8 @@ public class ActiveServerFilterTest {
@Test @Test
public void testShouldRedirectPOSTRequest() throws IOException, ServletException { public void testShouldRedirectPOSTRequest() throws IOException, ServletException {
when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.PASSIVE); when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.PASSIVE);
when(servletRequest.getRequestURI()).thenReturn("api/atlas/types");
ActiveServerFilter activeServerFilter = new ActiveServerFilter(activeInstanceState, serviceState); ActiveServerFilter activeServerFilter = new ActiveServerFilter(activeInstanceState, serviceState);
when(activeInstanceState.getActiveServerAddress()).thenReturn(ACTIVE_SERVER_ADDRESS); when(activeInstanceState.getActiveServerAddress()).thenReturn(ACTIVE_SERVER_ADDRESS);
...@@ -131,6 +142,8 @@ public class ActiveServerFilterTest { ...@@ -131,6 +142,8 @@ public class ActiveServerFilterTest {
@Test @Test
public void testShouldRedirectPUTRequest() throws IOException, ServletException { public void testShouldRedirectPUTRequest() throws IOException, ServletException {
when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.PASSIVE); when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.PASSIVE);
when(servletRequest.getRequestURI()).thenReturn("api/atlas/types");
ActiveServerFilter activeServerFilter = new ActiveServerFilter(activeInstanceState, serviceState); ActiveServerFilter activeServerFilter = new ActiveServerFilter(activeInstanceState, serviceState);
when(activeInstanceState.getActiveServerAddress()).thenReturn(ACTIVE_SERVER_ADDRESS); when(activeInstanceState.getActiveServerAddress()).thenReturn(ACTIVE_SERVER_ADDRESS);
...@@ -146,6 +159,8 @@ public class ActiveServerFilterTest { ...@@ -146,6 +159,8 @@ public class ActiveServerFilterTest {
@Test @Test
public void testShouldRedirectDELETERequest() throws IOException, ServletException { public void testShouldRedirectDELETERequest() throws IOException, ServletException {
when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.PASSIVE); when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.PASSIVE);
when(servletRequest.getRequestURI()).thenReturn("api/atlas/types");
ActiveServerFilter activeServerFilter = new ActiveServerFilter(activeInstanceState, serviceState); ActiveServerFilter activeServerFilter = new ActiveServerFilter(activeInstanceState, serviceState);
when(activeInstanceState.getActiveServerAddress()).thenReturn(ACTIVE_SERVER_ADDRESS); when(activeInstanceState.getActiveServerAddress()).thenReturn(ACTIVE_SERVER_ADDRESS);
...@@ -163,10 +178,26 @@ public class ActiveServerFilterTest { ...@@ -163,10 +178,26 @@ public class ActiveServerFilterTest {
@Test @Test
public void testShouldReturnServiceUnavailableIfStateBecomingActive() throws IOException, ServletException { public void testShouldReturnServiceUnavailableIfStateBecomingActive() throws IOException, ServletException {
when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.BECOMING_ACTIVE); when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.BECOMING_ACTIVE);
when(servletRequest.getRequestURI()).thenReturn("api/atlas/types");
ActiveServerFilter activeServerFilter = new ActiveServerFilter(activeInstanceState, serviceState); ActiveServerFilter activeServerFilter = new ActiveServerFilter(activeInstanceState, serviceState);
activeServerFilter.doFilter(servletRequest, servletResponse, filterChain); activeServerFilter.doFilter(servletRequest, servletResponse, filterChain);
verify(servletResponse).sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE); verify(servletResponse).sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
} }
@Test
public void testShouldNotRedirectAdminAPIs() throws IOException, ServletException {
when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.PASSIVE);
when(servletRequest.getMethod()).thenReturn(HttpMethod.GET);
when(servletRequest.getRequestURI()).
thenReturn("api/atlas/admin/asmasn"); // any Admin URI is fine.
ActiveServerFilter activeServerFilter = new ActiveServerFilter(activeInstanceState, serviceState);
activeServerFilter.doFilter(servletRequest, servletResponse, filterChain);
verify(filterChain).doFilter(servletRequest, servletResponse);
verifyZeroInteractions(activeInstanceState);
}
} }
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.web.resources;
import org.apache.atlas.web.service.ServiceState;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.core.Response;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
public class AdminResourceTest {
@Mock
private ServiceState serviceState;
@BeforeMethod
public void setup() {
MockitoAnnotations.initMocks(this);
}
@Test
public void testStatusOfActiveServerIsReturned() throws JSONException {
when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.ACTIVE);
AdminResource adminResource = new AdminResource(serviceState);
Response response = adminResource.getStatus();
assertEquals(response.getStatus(), HttpServletResponse.SC_OK);
JSONObject entity = (JSONObject) response.getEntity();
assertEquals(entity.get("Status"), "ACTIVE");
}
@Test
public void testResourceGetsValueFromServiceState() throws JSONException {
when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.PASSIVE);
AdminResource adminResource = new AdminResource(serviceState);
Response response = adminResource.getStatus();
verify(serviceState).getState();
JSONObject entity = (JSONObject) response.getEntity();
assertEquals(entity.get("Status"), "PASSIVE");
}
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment