Commit 758b3d4d by apoorvnaik Committed by Madhan Neethiraj

ATLAS-1267: Client for V2 APIs and TypesREST integration tests

parent f6388234
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas;
import com.google.common.annotations.VisibleForTesting;
import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientHandlerException;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.WebResource;
import com.sun.jersey.api.client.config.DefaultClientConfig;
import com.sun.jersey.api.client.filter.HTTPBasicAuthFilter;
import com.sun.jersey.api.json.JSONConfiguration;
import com.sun.jersey.client.urlconnection.URLConnectionClientHandler;
import org.apache.atlas.security.SecureClientUtils;
import org.apache.atlas.utils.AuthenticationUtil;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.ws.rs.HttpMethod;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriBuilder;
import java.io.IOException;
import java.net.ConnectException;
import java.util.List;
import java.util.Map;
import static org.apache.atlas.security.SecurityProperties.TLS_ENABLED;
public abstract class AtlasBaseClient {
public static final String BASE_URI = "api/atlas/";
public static final String TYPES = "types";
public static final String ADMIN_VERSION = "admin/version";
public static final String ADMIN_STATUS = "admin/status";
public static final String HTTP_AUTHENTICATION_ENABLED = "atlas.http.authentication.enabled";
//Admin operations
public static final APIInfo VERSION = new APIInfo(BASE_URI + ADMIN_VERSION, HttpMethod.GET, Response.Status.OK);
public static final APIInfo STATUS = new APIInfo(BASE_URI + ADMIN_STATUS, HttpMethod.GET, Response.Status.OK);
static final String JSON_MEDIA_TYPE = MediaType.APPLICATION_JSON + "; charset=UTF-8";
static final String UNKNOWN_STATUS = "Unknown status";
static final String ATLAS_CLIENT_HA_RETRIES_KEY = "atlas.client.ha.retries";
// Setting the default value based on testing failovers while client code like quickstart is running.
static final int DEFAULT_NUM_RETRIES = 4;
static final String ATLAS_CLIENT_HA_SLEEP_INTERVAL_MS_KEY = "atlas.client.ha.sleep.interval.ms";
// Setting the default value based on testing failovers while client code like quickstart is running.
// With number of retries, this gives a total time of about 20s for the server to start.
static final int DEFAULT_SLEEP_BETWEEN_RETRIES_MS = 5000;
private static final Logger LOG = LoggerFactory.getLogger(AtlasBaseClient.class);
protected WebResource service;
protected Configuration configuration;
private String basicAuthUser;
private String basicAuthPassword;
private AtlasClientContext atlasClientContext;
private boolean retryEnabled = false;
protected AtlasBaseClient() {}
protected AtlasBaseClient(String[] baseUrl, String[] basicAuthUserNamePassword) {
if (basicAuthUserNamePassword != null) {
if (basicAuthUserNamePassword.length > 0) {
this.basicAuthUser = basicAuthUserNamePassword[0];
}
if (basicAuthUserNamePassword.length > 1) {
this.basicAuthPassword = basicAuthUserNamePassword[1];
}
}
initializeState(baseUrl, null, null);
}
protected AtlasBaseClient(String... baseUrls) throws AtlasException {
this(getCurrentUGI(), baseUrls);
}
protected AtlasBaseClient(UserGroupInformation ugi, String[] baseUrls) {
this(ugi, ugi.getShortUserName(), baseUrls);
}
protected AtlasBaseClient(UserGroupInformation ugi, String doAsUser, String[] baseUrls) {
initializeState(baseUrls, ugi, doAsUser);
}
@VisibleForTesting
protected AtlasBaseClient(WebResource service, Configuration configuration) {
this.service = service;
this.configuration = configuration;
}
@VisibleForTesting
protected AtlasBaseClient(Configuration configuration, String[] baseUrl, String[] basicAuthUserNamePassword) {
if (basicAuthUserNamePassword != null) {
if (basicAuthUserNamePassword.length > 0) {
this.basicAuthUser = basicAuthUserNamePassword[0];
}
if (basicAuthUserNamePassword.length > 1) {
this.basicAuthPassword = basicAuthUserNamePassword[1];
}
}
initializeState(configuration, baseUrl, null, null);
}
protected static UserGroupInformation getCurrentUGI() throws AtlasException {
try {
return UserGroupInformation.getCurrentUser();
} catch (IOException e) {
throw new AtlasException(e);
}
}
void initializeState(String[] baseUrls, UserGroupInformation ugi, String doAsUser) {
initializeState(getClientProperties(), baseUrls, ugi, doAsUser);
}
void initializeState(Configuration configuration, String[] baseUrls, UserGroupInformation ugi, String doAsUser) {
this.configuration = configuration;
Client client = getClient(configuration, ugi, doAsUser);
if ((!AuthenticationUtil.isKerberosAuthenticationEnabled()) && basicAuthUser != null && basicAuthPassword != null) {
final HTTPBasicAuthFilter authFilter = new HTTPBasicAuthFilter(basicAuthUser, basicAuthPassword);
client.addFilter(authFilter);
}
String activeServiceUrl = determineActiveServiceURL(baseUrls, client);
atlasClientContext = new AtlasClientContext(baseUrls, client, ugi, doAsUser);
service = client.resource(UriBuilder.fromUri(activeServiceUrl).build());
}
@VisibleForTesting
protected Client getClient(Configuration configuration, UserGroupInformation ugi, String doAsUser) {
DefaultClientConfig config = new DefaultClientConfig();
// Enable POJO mapping feature
config.getFeatures().put(JSONConfiguration.FEATURE_POJO_MAPPING, Boolean.TRUE);
int readTimeout = configuration.getInt("atlas.client.readTimeoutMSecs", 60000);;
int connectTimeout = configuration.getInt("atlas.client.connectTimeoutMSecs", 60000);
if (configuration.getBoolean(TLS_ENABLED, false)) {
// create an SSL properties configuration if one doesn't exist. SSLFactory expects a file, so forced
// to create a
// configuration object, persist it, then subsequently pass in an empty configuration to SSLFactory
try {
SecureClientUtils.persistSSLClientConfiguration(configuration);
} catch (Exception e) {
LOG.info("Error processing client configuration.", e);
}
}
final URLConnectionClientHandler handler;
if ((!AuthenticationUtil.isKerberosAuthenticationEnabled()) && basicAuthUser != null && basicAuthPassword != null) {
if (configuration.getBoolean(TLS_ENABLED, false)) {
handler = SecureClientUtils.getUrlConnectionClientHandler();
} else {
handler = new URLConnectionClientHandler();
}
} else {
handler = SecureClientUtils.getClientConnectionHandler(config, configuration, doAsUser, ugi);
}
Client client = new Client(handler, config);
client.setReadTimeout(readTimeout);
client.setConnectTimeout(connectTimeout);
return client;
}
@VisibleForTesting
protected String determineActiveServiceURL(String[] baseUrls, Client client) {
if (baseUrls.length == 0) {
throw new IllegalArgumentException("Base URLs cannot be null or empty");
}
final String baseUrl;
AtlasServerEnsemble atlasServerEnsemble = new AtlasServerEnsemble(baseUrls);
if (atlasServerEnsemble.hasSingleInstance()) {
baseUrl = atlasServerEnsemble.firstURL();
LOG.info("Client has only one service URL, will use that for all actions: {}", baseUrl);
} else {
try {
baseUrl = selectActiveServerAddress(client, atlasServerEnsemble);
} catch (AtlasServiceException e) {
LOG.error("None of the passed URLs are active: {}", atlasServerEnsemble, e);
throw new IllegalArgumentException("None of the passed URLs are active " + atlasServerEnsemble, e);
}
}
return baseUrl;
}
private String selectActiveServerAddress(Client client, AtlasServerEnsemble serverEnsemble)
throws AtlasServiceException {
List<String> serverInstances = serverEnsemble.getMembers();
String activeServerAddress = null;
for (String serverInstance : serverInstances) {
LOG.info("Trying with address {}", serverInstance);
activeServerAddress = getAddressIfActive(client, serverInstance);
if (activeServerAddress != null) {
LOG.info("Found service {} as active service.", serverInstance);
break;
}
}
if (activeServerAddress != null)
return activeServerAddress;
else
throw new AtlasServiceException(STATUS, new RuntimeException("Could not find any active instance"));
}
private String getAddressIfActive(Client client, String serverInstance) {
String activeServerAddress = null;
for (int i = 0; i < getNumberOfRetries(); i++) {
try {
WebResource service = client.resource(UriBuilder.fromUri(serverInstance).build());
String adminStatus = getAdminStatus(service);
if (StringUtils.equals(adminStatus, "ACTIVE")) {
activeServerAddress = serverInstance;
break;
} else {
LOG.info("attempt #{}: Service {} - is not active. status={}", (i+1), serverInstance, adminStatus);
}
} catch (Exception e) {
LOG.error("attempt #{}: Service {} - could not get status", (i+1), serverInstance, e);
}
sleepBetweenRetries();
}
return activeServerAddress;
}
protected Configuration getClientProperties() {
try {
if (configuration == null) {
configuration = ApplicationProperties.get();
}
} catch (AtlasException e) {
LOG.error("Exception while loading configuration.", e);
}
return configuration;
}
public boolean isServerReady() throws AtlasServiceException {
WebResource resource = getResource(VERSION.getPath());
try {
callAPIWithResource(VERSION, resource, null, JSONObject.class);
return true;
} catch (ClientHandlerException che) {
return false;
} catch (AtlasServiceException ase) {
if (ase.getStatus() != null && ase.getStatus().equals(ClientResponse.Status.SERVICE_UNAVAILABLE)) {
LOG.warn("Received SERVICE_UNAVAILABLE, server is not yet ready");
return false;
}
throw ase;
}
}
protected WebResource getResource(String path, String... pathParams) {
return getResource(service, path, pathParams);
}
protected <T> T callAPIWithResource(APIInfo api, WebResource resource, Object requestObject, Class<T> responseType) throws AtlasServiceException {
ClientResponse clientResponse = null;
int i = 0;
do {
clientResponse = resource
.accept(JSON_MEDIA_TYPE)
.type(JSON_MEDIA_TYPE)
.method(api.getMethod(), ClientResponse.class, requestObject);
LOG.debug("API {} returned status {}", resource.getURI(), clientResponse.getStatus());
if (clientResponse.getStatus() == api.getExpectedStatus().getStatusCode()) {
if (null == responseType) {
LOG.warn("No response type specified, returning null");
return null;
}
try {
if (responseType == JSONObject.class) {
String stringEntity = clientResponse.getEntity(String.class);
try {
return (T) new JSONObject(stringEntity);
} catch (JSONException e) {
throw new AtlasServiceException(api, e);
}
} else {
return clientResponse.getEntity(responseType);
}
} catch (ClientHandlerException e) {
throw new AtlasServiceException(api, e);
}
} else if (clientResponse.getStatus() != ClientResponse.Status.SERVICE_UNAVAILABLE.getStatusCode()) {
break;
} else {
LOG.error("Got a service unavailable when calling: {}, will retry..", resource);
sleepBetweenRetries();
}
i++;
} while (i < getNumberOfRetries());
throw new AtlasServiceException(api, clientResponse);
}
private WebResource getResource(WebResource service, String path, String... pathParams) {
WebResource resource = service.path(path);
if (pathParams != null) {
for (String pathParam : pathParams) {
resource = resource.path(pathParam);
}
}
return resource;
}
void sleepBetweenRetries() {
try {
Thread.sleep(getSleepBetweenRetriesMs());
} catch (InterruptedException e) {
LOG.error("Interrupted from sleeping between retries.", e);
}
}
int getNumberOfRetries() {
return configuration.getInt(AtlasBaseClient.ATLAS_CLIENT_HA_RETRIES_KEY, AtlasBaseClient.DEFAULT_NUM_RETRIES);
}
private int getSleepBetweenRetriesMs() {
return configuration.getInt(AtlasBaseClient.ATLAS_CLIENT_HA_SLEEP_INTERVAL_MS_KEY, AtlasBaseClient.DEFAULT_SLEEP_BETWEEN_RETRIES_MS);
}
/**
* Return status of the service instance the client is pointing to.
*
* @return One of the values in ServiceState.ServiceStateValue or {@link #UNKNOWN_STATUS} if
* there is a JSON parse exception
* @throws AtlasServiceException if there is a HTTP error.
*/
public String getAdminStatus() throws AtlasServiceException {
return getAdminStatus(service);
}
private String getAdminStatus(WebResource service) throws AtlasServiceException {
String result = AtlasBaseClient.UNKNOWN_STATUS;
WebResource resource = getResource(service, STATUS.getPath());
JSONObject response = callAPIWithResource(STATUS, resource, null, JSONObject.class);
try {
result = response.getString("Status");
} catch (JSONException e) {
LOG.error("Exception while parsing admin status response. Returned response {}", response.toString(), e);
}
return result;
}
boolean isRetryableException(ClientHandlerException che) {
return che.getCause().getClass().equals(IOException.class)
|| che.getCause().getClass().equals(ConnectException.class);
}
void handleClientHandlerException(ClientHandlerException che) {
if (isRetryableException(che)) {
atlasClientContext.getClient().destroy();
LOG.warn("Destroyed current context while handling ClientHandlerEception.");
LOG.warn("Will retry and create new context.");
sleepBetweenRetries();
initializeState(atlasClientContext.getBaseUrls(), atlasClientContext.getUgi(),
atlasClientContext.getDoAsUser());
return;
}
throw che;
}
public boolean isRetryEnabled() {
return retryEnabled;
}
public void setRetryEnabled(boolean retryEnabled) {
this.retryEnabled = retryEnabled;
}
@VisibleForTesting
JSONObject callAPIWithRetries(APIInfo api, Object requestObject, ResourceCreator resourceCreator)
throws AtlasServiceException {
for (int i = 0; i < getNumberOfRetries(); i++) {
WebResource resource = resourceCreator.createResource();
try {
LOG.debug("Using resource {} for {} times", resource.getURI(), i);
JSONObject result = callAPIWithResource(api, resource, requestObject);
return result;
} catch (ClientHandlerException che) {
if (i == (getNumberOfRetries() - 1)) {
throw che;
}
LOG.warn("Handled exception in calling api {}", api.getPath(), che);
LOG.warn("Exception's cause: {}", che.getCause().getClass());
handleClientHandlerException(che);
}
}
throw new AtlasServiceException(api, new RuntimeException("Could not get response after retries."));
}
protected JSONObject callAPIWithResource(APIInfo api, WebResource resource, Object requestObject)
throws AtlasServiceException {
return callAPIWithResource(api, resource, requestObject, JSONObject.class);
}
protected JSONObject callAPI(final APIInfo api, Object requestObject, final String... pathParams)
throws AtlasServiceException {
return callAPIWithRetries(api, requestObject, new ResourceCreator() {
@Override
public WebResource createResource() {
return getResource(api, pathParams);
}
});
}
protected <T> T callAPI(APIInfo api, Object requestObject, Class<T> responseType, String... params)
throws AtlasServiceException {
return callAPIWithResource(api, getResource(api, params), requestObject, responseType);
}
protected WebResource getResource(APIInfo api, String... pathParams) {
return getResource(service, api, pathParams);
}
// Modify URL to include the path params
private WebResource getResource(WebResource service, APIInfo api, String... pathParams) {
WebResource resource = service.path(api.getPath());
if (pathParams != null) {
for (String pathParam : pathParams) {
resource = resource.path(pathParam);
}
}
return resource;
}
protected <T> T callAPI(APIInfo api, Object requestObject, Class<T> responseType, Map<String, String> queryParams)
throws AtlasServiceException {
return callAPIWithResource(api, getResource(api, queryParams), requestObject, responseType);
}
protected WebResource getResource(APIInfo api, Map<String, String> queryParams) {
return getResource(service, api, queryParams);
}
// Modify URL to include the query params
private WebResource getResource(WebResource service, APIInfo api, Map<String, String> queryParams) {
WebResource resource = service.path(api.getPath());
if (null != queryParams && !queryParams.isEmpty()) {
for (Map.Entry<String, String> entry : queryParams.entrySet()) {
resource = resource.queryParam(entry.getKey(), entry.getValue());
}
}
return resource;
}
protected APIInfo formatPath(APIInfo apiInfo, String ... params) {
return new APIInfo(String.format(apiInfo.getPath(), params), apiInfo.getMethod(), apiInfo.getExpectedStatus());
}
@VisibleForTesting
void setConfiguration(Configuration configuration) {
this.configuration = configuration;
}
@VisibleForTesting
void setService(WebResource resource) {
this.service = resource;
}
public static class APIInfo {
private final String method;
private final String path;
private final Response.Status status;
APIInfo(String path, String method, Response.Status status) {
this.path = path;
this.method = method;
this.status = status;
}
public String getMethod() {
return method;
}
public String getPath() {
return path;
}
public Response.Status getExpectedStatus() {
return status;
}
}
/**
* A class to capture input state while creating the client.
*
* The information here will be reused when the client is re-initialized on switch-over
* in case of High Availability.
*/
private class AtlasClientContext {
private String[] baseUrls;
private Client client;
private String doAsUser;
private UserGroupInformation ugi;
public AtlasClientContext(String[] baseUrls, Client client, UserGroupInformation ugi, String doAsUser) {
this.baseUrls = baseUrls;
this.client = client;
this.ugi = ugi;
this.doAsUser = doAsUser;
}
public Client getClient() {
return client;
}
public String[] getBaseUrls() {
return baseUrls;
}
public String getDoAsUser() {
return doAsUser;
}
public UserGroupInformation getUgi() {
return ugi;
}
}
}
......@@ -20,16 +20,10 @@ package org.apache.atlas;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientHandlerException;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.WebResource;
import com.sun.jersey.api.client.config.DefaultClientConfig;
import com.sun.jersey.api.client.filter.HTTPBasicAuthFilter;
import com.sun.jersey.client.urlconnection.URLConnectionClientHandler;
import org.apache.atlas.security.SecureClientUtils;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.Struct;
import org.apache.atlas.typesystem.TypesDef;
......@@ -40,7 +34,6 @@ import org.apache.atlas.typesystem.types.DataTypes;
import org.apache.atlas.typesystem.types.HierarchicalTypeDefinition;
import org.apache.atlas.typesystem.types.TraitType;
import org.apache.atlas.typesystem.types.utils.TypesUtil;
import org.apache.atlas.utils.AuthenticationUtil;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.security.UserGroupInformation;
......@@ -50,12 +43,6 @@ import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.ws.rs.HttpMethod;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriBuilder;
import java.io.IOException;
import java.net.ConnectException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
......@@ -63,12 +50,13 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.apache.atlas.security.SecurityProperties.TLS_ENABLED;
import javax.ws.rs.HttpMethod;
import javax.ws.rs.core.Response;
/**
* Client for metadata.
*/
public class AtlasClient {
public class AtlasClient extends AtlasBaseClient {
private static final Logger LOG = LoggerFactory.getLogger(AtlasClient.class);
public static final String TYPE = "type";
......@@ -90,10 +78,6 @@ public class AtlasClient {
public static final String START_KEY = "startKey";
public static final String NUM_RESULTS = "count";
public static final String BASE_URI = "api/atlas/";
public static final String ADMIN_VERSION = "admin/version";
public static final String ADMIN_STATUS = "admin/status";
public static final String TYPES = "types";
public static final String URI_ENTITY = "entities";
public static final String URI_ENTITY_AUDIT = "audit";
public static final String URI_SEARCH = "discovery/search";
......@@ -128,39 +112,11 @@ public class AtlasClient {
public static final String REFERENCEABLE_SUPER_TYPE = "Referenceable";
public static final String REFERENCEABLE_ATTRIBUTE_NAME = "qualifiedName";
public static final String JSON_MEDIA_TYPE = MediaType.APPLICATION_JSON + "; charset=UTF-8";
public static final String UNKNOWN_STATUS = "Unknown status";
public static final String ATLAS_CLIENT_HA_RETRIES_KEY = "atlas.client.ha.retries";
// Setting the default value based on testing failovers while client code like quickstart is running.
public static final int DEFAULT_NUM_RETRIES = 4;
public static final String ATLAS_CLIENT_HA_SLEEP_INTERVAL_MS_KEY = "atlas.client.ha.sleep.interval.ms";
public static final String HTTP_AUTHENTICATION_ENABLED = "atlas.http.authentication.enabled";
// Setting the default value based on testing failovers while client code like quickstart is running.
// With number of retries, this gives a total time of about 20s for the server to start.
public static final int DEFAULT_SLEEP_BETWEEN_RETRIES_MS = 5000;
private WebResource service;
private AtlasClientContext atlasClientContext;
private Configuration configuration;
private String basicAuthUser;
private String basicAuthPassword;
// New constuctor for Basic auth
public AtlasClient(String[] baseUrl, String[] basicAuthUserNamepassword) {
if (basicAuthUserNamepassword != null) {
if (basicAuthUserNamepassword.length > 0) {
this.basicAuthUser = basicAuthUserNamepassword[0];
}
if (basicAuthUserNamepassword.length > 1) {
this.basicAuthPassword = basicAuthUserNamepassword[1];
}
}
initializeState(baseUrl, null, null);
super(baseUrl, basicAuthUserNamepassword);
}
/**
......@@ -187,14 +143,6 @@ public class AtlasClient {
initializeState(baseUrls, ugi, doAsUser);
}
private static UserGroupInformation getCurrentUGI() throws AtlasException {
try {
return UserGroupInformation.getCurrentUser();
} catch (IOException e) {
throw new AtlasException(e);
}
}
private AtlasClient(UserGroupInformation ugi, String[] baseUrls) {
this(ugi, ugi.getShortUserName(), baseUrls);
}
......@@ -204,268 +152,25 @@ public class AtlasClient {
//Do nothing
}
private void initializeState(String[] baseUrls, UserGroupInformation ugi, String doAsUser) {
configuration = getClientProperties();
Client client = getClient(configuration, ugi, doAsUser);
if ((!AuthenticationUtil.isKerberosAuthenticationEnabled()) && basicAuthUser!=null && basicAuthPassword!=null) {
final HTTPBasicAuthFilter authFilter = new HTTPBasicAuthFilter(basicAuthUser, basicAuthPassword);
client.addFilter(authFilter);
}
String activeServiceUrl = determineActiveServiceURL(baseUrls, client);
atlasClientContext = new AtlasClientContext(baseUrls, client, ugi, doAsUser);
service = client.resource(UriBuilder.fromUri(activeServiceUrl).build());
}
@VisibleForTesting
protected Client getClient(Configuration configuration, UserGroupInformation ugi, String doAsUser) {
DefaultClientConfig config = new DefaultClientConfig();
Configuration clientConfig = null;
int readTimeout = 60000;
int connectTimeout = 60000;
try {
clientConfig = configuration;
if (clientConfig.getBoolean(TLS_ENABLED, false)) {
// create an SSL properties configuration if one doesn't exist. SSLFactory expects a file, so forced
// to create a
// configuration object, persist it, then subsequently pass in an empty configuration to SSLFactory
SecureClientUtils.persistSSLClientConfiguration(clientConfig);
}
readTimeout = clientConfig.getInt("atlas.client.readTimeoutMSecs", readTimeout);
connectTimeout = clientConfig.getInt("atlas.client.connectTimeoutMSecs", connectTimeout);
} catch (Exception e) {
LOG.info("Error processing client configuration.", e);
}
URLConnectionClientHandler handler = null;
if ((!AuthenticationUtil.isKerberosAuthenticationEnabled()) && basicAuthUser != null && basicAuthPassword != null) {
if (clientConfig.getBoolean(TLS_ENABLED, false)) {
handler = SecureClientUtils.getUrlConnectionClientHandler();
} else {
handler = new URLConnectionClientHandler();
}
} else {
handler =
SecureClientUtils.getClientConnectionHandler(config, clientConfig, doAsUser, ugi);
}
Client client = new Client(handler, config);
client.setReadTimeout(readTimeout);
client.setConnectTimeout(connectTimeout);
return client;
public AtlasClient(Configuration configuration, String[] baseUrl, String[] basicAuthUserNamepassword) {
super(configuration, baseUrl, basicAuthUserNamepassword);
}
@VisibleForTesting
protected String determineActiveServiceURL(String[] baseUrls, Client client) {
if (baseUrls.length == 0) {
throw new IllegalArgumentException("Base URLs cannot be null or empty");
}
String baseUrl;
AtlasServerEnsemble atlasServerEnsemble = new AtlasServerEnsemble(baseUrls);
if (atlasServerEnsemble.hasSingleInstance()) {
baseUrl = atlasServerEnsemble.firstURL();
LOG.info("Client has only one service URL, will use that for all actions: {}", baseUrl);
return baseUrl;
} else {
try {
baseUrl = selectActiveServerAddress(client, atlasServerEnsemble);
} catch (AtlasServiceException e) {
LOG.error("None of the passed URLs are active: {}", atlasServerEnsemble, e);
throw new IllegalArgumentException("None of the passed URLs are active " + atlasServerEnsemble, e);
}
}
return baseUrl;
}
private String selectActiveServerAddress(Client client, AtlasServerEnsemble serverEnsemble)
throws AtlasServiceException {
List<String> serverInstances = serverEnsemble.getMembers();
String activeServerAddress = null;
for (String serverInstance : serverInstances) {
LOG.info("Trying with address {}", serverInstance);
activeServerAddress = getAddressIfActive(client, serverInstance);
if (activeServerAddress != null) {
LOG.info("Found service {} as active service.", serverInstance);
break;
}
}
if (activeServerAddress != null)
return activeServerAddress;
else
throw new AtlasServiceException(API.STATUS, new RuntimeException("Could not find any active instance"));
}
private String getAddressIfActive(Client client, String serverInstance) {
String activeServerAddress = null;
for (int i = 0; i < getNumberOfRetries(); i++) {
try {
WebResource service = client.resource(UriBuilder.fromUri(serverInstance).build());
String adminStatus = getAdminStatus(service);
if (adminStatus.equals("ACTIVE")) {
activeServerAddress = serverInstance;
break;
} else {
LOG.info("Service {} is not active.. will retry.", serverInstance);
}
} catch (Exception e) {
LOG.error("Could not get status from service {} after {} tries.", serverInstance, i, e);
}
sleepBetweenRetries();
LOG.warn("Service {} is not active.", serverInstance);
}
return activeServerAddress;
}
private void sleepBetweenRetries(){
try {
Thread.sleep(getSleepBetweenRetriesMs());
} catch (InterruptedException e) {
LOG.error("Interrupted from sleeping between retries.", e);
}
}
private int getSleepBetweenRetriesMs() {
return configuration.getInt(ATLAS_CLIENT_HA_SLEEP_INTERVAL_MS_KEY, DEFAULT_SLEEP_BETWEEN_RETRIES_MS);
}
private int getNumberOfRetries() {
return configuration.getInt(ATLAS_CLIENT_HA_RETRIES_KEY, DEFAULT_NUM_RETRIES);
public AtlasClient(Configuration configuration, String... baseUrls) throws AtlasException {
initializeState(configuration, baseUrls, getCurrentUGI(), getCurrentUGI().getShortUserName());
}
@VisibleForTesting
AtlasClient(WebResource service, Configuration configuration) {
this.service = service;
this.configuration = configuration;
}
protected Configuration getClientProperties() {
try {
if (configuration == null) {
configuration = ApplicationProperties.get();
}
} catch (AtlasException e) {
LOG.error("Exception while loading configuration.", e);
}
return configuration;
}
public boolean isServerReady() throws AtlasServiceException {
WebResource resource = getResource(API.VERSION);
try {
callAPIWithResource(API.VERSION, resource, null);
return true;
} catch (ClientHandlerException che) {
return false;
} catch (AtlasServiceException ase) {
if (ase.getStatus().equals(ClientResponse.Status.SERVICE_UNAVAILABLE)) {
LOG.warn("Received SERVICE_UNAVAILABLE, server is not yet ready");
return false;
}
throw ase;
}
super(service, configuration);
}
public WebResource getResource() {
return service;
}
public static class EntityResult {
private static final Gson gson = new GsonBuilder().setPrettyPrinting().create();
public static final String OP_CREATED = "created";
public static final String OP_UPDATED = "updated";
public static final String OP_DELETED = "deleted";
Map<String, List<String>> entities = new HashMap<>();
public EntityResult() {
//For gson
}
public EntityResult(List<String> created, List<String> updated, List<String> deleted) {
add(OP_CREATED, created);
add(OP_UPDATED, updated);
add(OP_DELETED, deleted);
}
private void add(String type, List<String> list) {
if (list != null && list.size() > 0) {
entities.put(type, list);
}
}
private List<String> get(String type) {
List<String> list = entities.get(type);
if (list == null) {
list = new ArrayList<>();
}
return list;
}
public List<String> getCreatedEntities() {
return get(OP_CREATED);
}
public List<String> getUpdateEntities() {
return get(OP_UPDATED);
}
public List<String> getDeletedEntities() {
return get(OP_DELETED);
}
@Override
public String toString() {
return gson.toJson(this);
}
public static EntityResult fromString(String json) throws AtlasServiceException {
return gson.fromJson(json, EntityResult.class);
}
}
/**
* Return status of the service instance the client is pointing to.
*
* @return One of the values in ServiceState.ServiceStateValue or {@link #UNKNOWN_STATUS} if there is a JSON parse
* exception
* @throws AtlasServiceException if there is a HTTP error.
*/
public String getAdminStatus() throws AtlasServiceException {
return getAdminStatus(service);
}
private void handleClientHandlerException(ClientHandlerException che) {
if (isRetryableException(che)) {
atlasClientContext.getClient().destroy();
LOG.warn("Destroyed current context while handling ClientHandlerEception.");
LOG.warn("Will retry and create new context.");
sleepBetweenRetries();
initializeState(atlasClientContext.getBaseUrls(), atlasClientContext.getUgi(),
atlasClientContext.getDoAsUser());
return;
}
throw che;
}
private boolean isRetryableException(ClientHandlerException che) {
return che.getCause().getClass().equals(IOException.class)
|| che.getCause().getClass().equals(ConnectException.class);
}
private String getAdminStatus(WebResource service) throws AtlasServiceException {
String result = UNKNOWN_STATUS;
WebResource resource = getResource(service, API.STATUS);
JSONObject response = callAPIWithResource(API.STATUS, resource, null);
try {
result = response.getString(STATUS);
} catch (JSONException e) {
LOG.error("Exception while parsing admin status response. Returned response {}", response.toString(), e);
}
return result;
}
public enum API {
//Admin operations
......@@ -530,8 +235,63 @@ public class AtlasClient {
public String getPath() {
return path;
}
public Response.Status getExpectedStatus() { return status; }
public Response.Status getExpectedStatus() {
return status;
}
}
public static class EntityResult {
public static final String OP_CREATED = "created";
public static final String OP_UPDATED = "updated";
public static final String OP_DELETED = "deleted";
Map<String, List<String>> entities = new HashMap<>();
public EntityResult() {
//For gson
}
public EntityResult(List<String> created, List<String> updated, List<String> deleted) {
add(OP_CREATED, created);
add(OP_UPDATED, updated);
add(OP_DELETED, deleted);
}
private void add(String type, List<String> list) {
if (list != null && list.size() > 0) {
entities.put(type, list);
}
}
private List<String> get(String type) {
List<String> list = entities.get(type);
if (list == null) {
list = new ArrayList<>();
}
return list;
}
public List<String> getCreatedEntities() {
return get(OP_CREATED);
}
public List<String> getUpdateEntities() {
return get(OP_UPDATED);
}
public List<String> getDeletedEntities() {
return get(OP_DELETED);
}
@Override
public String toString() {
return AtlasType.toJson(this);
}
public static EntityResult fromString(String json) throws AtlasServiceException {
return AtlasType.fromJson(json, EntityResult.class);
}
}
/**
......@@ -639,7 +399,7 @@ public class AtlasClient {
JSONObject response = callAPIWithRetries(API.LIST_TYPES, null, new ResourceCreator() {
@Override
public WebResource createResource() {
WebResource resource = getResource(API.LIST_TYPES);
WebResource resource = getResource(API.LIST_TYPES.getPath());
resource = resource.queryParam(TYPE, category.name());
return resource;
}
......@@ -780,27 +540,6 @@ public class AtlasClient {
return extractEntityResult(response);
}
@VisibleForTesting
JSONObject callAPIWithRetries(API api, Object requestObject, ResourceCreator resourceCreator)
throws AtlasServiceException {
for (int i = 0; i < getNumberOfRetries(); i++) {
WebResource resource = resourceCreator.createResource();
try {
LOG.debug("Using resource {} for {} times", resource.getURI(), i);
JSONObject result = callAPIWithResource(api, resource, requestObject);
return result;
} catch (ClientHandlerException che) {
if (i==(getNumberOfRetries()-1)) {
throw che;
}
LOG.warn("Handled exception in calling api {}", api.getPath(), che);
LOG.warn("Exception's cause: {}", che.getCause().getClass());
handleClientHandlerException(che);
}
}
throw new AtlasServiceException(api, new RuntimeException("Could not get response after retries."));
}
/**
* Supports Partial updates
* Updates properties set in the definition for the entity corresponding to guid
......@@ -1205,93 +944,26 @@ public class AtlasClient {
}
}
private WebResource getResource(API api, String... pathParams) {
return getResource(service, api, pathParams);
}
// Wrapper methods for compatibility
private WebResource getResource(WebResource service, API api, String... pathParams) {
WebResource resource = service.path(api.getPath());
if (pathParams != null) {
for (String pathParam : pathParams) {
resource = resource.path(pathParam);
}
}
return resource;
JSONObject callAPIWithResource(API api, WebResource resource, Object requestObject) throws AtlasServiceException {
return callAPIWithResource(toAPIInfo(api), resource, requestObject);
}
private JSONObject callAPIWithResource(API api, WebResource resource, Object requestObject)
throws AtlasServiceException {
ClientResponse clientResponse = null;
int i = 0;
do {
clientResponse = resource.accept(JSON_MEDIA_TYPE).type(JSON_MEDIA_TYPE)
.method(api.getMethod(), ClientResponse.class, requestObject);
LOG.debug("API {} returned status {}", resource.getURI(), clientResponse.getStatus());
if (clientResponse.getStatus() == api.getExpectedStatus().getStatusCode()) {
String responseAsString = clientResponse.getEntity(String.class);
try {
return new JSONObject(responseAsString);
} catch (JSONException e) {
throw new AtlasServiceException(api, e);
}
} else if (clientResponse.getStatus() != ClientResponse.Status.SERVICE_UNAVAILABLE.getStatusCode()) {
break;
} else {
LOG.error("Got a service unavailable when calling: {}, will retry..", resource);
sleepBetweenRetries();
}
i++;
} while (i < getNumberOfRetries());
throw new AtlasServiceException(api, clientResponse);
WebResource getResource(API api, String ... params) {
return getResource(toAPIInfo(api), params);
}
private JSONObject callAPI(final API api, Object requestObject, final String... pathParams)
throws AtlasServiceException {
return callAPIWithRetries(api, requestObject, new ResourceCreator() {
@Override
public WebResource createResource() {
return getResource(api, pathParams);
}
});
JSONObject callAPI(API api, Object requestObject, String ... params) throws AtlasServiceException {
return callAPI(toAPIInfo(api), requestObject, params);
}
/**
* A class to capture input state while creating the client.
*
* The information here will be reused when the client is re-initialized on switch-over
* in case of High Availability.
*/
private class AtlasClientContext {
private String[] baseUrls;
private Client client;
private String doAsUser;
private UserGroupInformation ugi;
public AtlasClientContext(String[] baseUrls, Client client, UserGroupInformation ugi, String doAsUser) {
this.baseUrls = baseUrls;
this.client = client;
this.ugi = ugi;
this.doAsUser = doAsUser;
}
public Client getClient() {
return client;
}
public String[] getBaseUrls() {
return baseUrls;
}
public String getDoAsUser() {
return doAsUser;
}
JSONObject callAPIWithRetries(API api, Object requestObject, ResourceCreator resourceCreator) throws AtlasServiceException {
return super.callAPIWithRetries(toAPIInfo(api), requestObject, resourceCreator);
}
public UserGroupInformation getUgi() {
return ugi;
}
private APIInfo toAPIInfo(API api){
return new APIInfo(api.getPath(), api.getMethod(), api.getExpectedStatus());
}
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas;
import com.google.common.annotations.VisibleForTesting;
import com.sun.jersey.api.client.WebResource;
import org.apache.atlas.model.SearchFilter;
import org.apache.atlas.model.instance.AtlasClassification;
import org.apache.atlas.model.instance.AtlasClassification.AtlasClassifications;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.commons.configuration.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import java.util.List;
import javax.ws.rs.HttpMethod;
import javax.ws.rs.core.Response;
public class AtlasEntitiesClientV2 extends AtlasBaseClient {
public static final String ENTITY_API = BASE_URI + "v2/entity/";
public static final String ENTITIES_API = BASE_URI + "v2/entities/";
private static final APIInfo GET_ENTITY_BY_GUID = new APIInfo(ENTITY_API + "guid/", HttpMethod.GET, Response.Status.OK);
private static final APIInfo CREATE_ENTITY = new APIInfo(ENTITY_API, HttpMethod.POST, Response.Status.OK);
private static final APIInfo UPDATE_ENTITY = CREATE_ENTITY;
private static final APIInfo UPDATE_ENTITY_BY_GUID = new APIInfo(ENTITY_API + "guid/", HttpMethod.PUT, Response.Status.OK);
private static final APIInfo DELETE_ENTITY_BY_GUID = new APIInfo(ENTITY_API + "guid/", HttpMethod.DELETE, Response.Status.OK);
private static final APIInfo GET_CLASSIFICATIONS = new APIInfo(ENTITY_API + "guid/%s/classifications", HttpMethod.GET, Response.Status.OK);
private static final APIInfo ADD_CLASSIFICATIONS = new APIInfo(ENTITY_API + "guid/%s/classifications", HttpMethod.POST, Response.Status.OK);
private static final APIInfo UPDATE_CLASSIFICATIONS = new APIInfo(ENTITY_API + "guid/%s/classifications", HttpMethod.PUT, Response.Status.OK);
private static final APIInfo DELETE_CLASSIFICATION = new APIInfo(ENTITY_API + "guid/%s/classification/%s", HttpMethod.DELETE, Response.Status.OK);
private static final APIInfo GET_ENTITIES = new APIInfo(ENTITIES_API + "guids/", HttpMethod.GET, Response.Status.OK);
private static final APIInfo CREATE_ENTITIES = new APIInfo(ENTITIES_API, HttpMethod.POST, Response.Status.OK);
private static final APIInfo UPDATE_ENTITIES = CREATE_ENTITIES;
private static final APIInfo DELETE_ENTITIES = new APIInfo(ENTITIES_API + "guids/", HttpMethod.GET, Response.Status.OK);
private static final APIInfo SEARCH_ENTITIES = new APIInfo(ENTITIES_API, HttpMethod.GET, Response.Status.OK);
public AtlasEntitiesClientV2(String[] baseUrl, String[] basicAuthUserNamePassword) {
super(baseUrl, basicAuthUserNamePassword);
}
public AtlasEntitiesClientV2(String... baseUrls) throws AtlasException {
super(baseUrls);
}
public AtlasEntitiesClientV2(UserGroupInformation ugi, String doAsUser, String... baseUrls) {
super(ugi, doAsUser, baseUrls);
}
protected AtlasEntitiesClientV2() {
super();
}
@VisibleForTesting
AtlasEntitiesClientV2(WebResource service, Configuration configuration) {
super(service, configuration);
}
public AtlasEntity getEntityByGuid(String guid) throws AtlasServiceException {
return callAPI(GET_ENTITY_BY_GUID, null, AtlasEntity.class, guid);
}
public EntityMutationResponse createEntity(AtlasEntity atlasEntity) throws AtlasServiceException {
return callAPI(CREATE_ENTITY, atlasEntity, EntityMutationResponse.class);
}
public EntityMutationResponse updateEntity(AtlasEntity atlasEntity) throws AtlasServiceException {
return callAPI(UPDATE_ENTITY, atlasEntity, EntityMutationResponse.class);
}
public EntityMutationResponse updateEntity(String guid, AtlasEntity atlasEntity) throws AtlasServiceException {
return callAPI(UPDATE_ENTITY, atlasEntity, EntityMutationResponse.class, guid);
}
public AtlasEntity deleteEntityByGuid(String guid) throws AtlasServiceException {
return callAPI(DELETE_ENTITY_BY_GUID, null, AtlasEntity.class, guid);
}
public AtlasClassifications getClassifications(String guid) throws AtlasServiceException {
return callAPI(formatPath(GET_CLASSIFICATIONS, guid), null, AtlasClassifications.class);
}
public void addClassifications(String guid, List<AtlasClassification> classifications) throws AtlasServiceException {
callAPI(formatPath(ADD_CLASSIFICATIONS, guid), classifications, AtlasClassifications.class);
}
public void updateClassifications(String guid, List<AtlasClassification> classifications) throws AtlasServiceException {
callAPI(formatPath(UPDATE_CLASSIFICATIONS, guid), classifications, AtlasClassifications.class);
}
public void deleteClassifications(String guid, List<AtlasClassification> classifications) throws AtlasServiceException {
callAPI(formatPath(GET_CLASSIFICATIONS, guid), classifications, AtlasClassifications.class);
}
public void deleteClassification(String guid, String classificationName) throws AtlasServiceException {
callAPI(formatPath(DELETE_CLASSIFICATION, guid, classificationName), null, AtlasClassifications.class);
}
// Entities operations
public List<AtlasEntity> getEntities(List<String> entityIds) {
// TODO Map the query params correctly
return null;
}
public List<AtlasEntity> createEntities(List<AtlasEntity> atlasEntities) throws AtlasServiceException {
return (List<AtlasEntity>)callAPI(CREATE_ENTITIES, atlasEntities, List.class);
}
public List<AtlasEntity> updateEntities(List<AtlasEntity> atlasEntities) throws AtlasServiceException {
return (List<AtlasEntity>)callAPI(UPDATE_ENTITIES, atlasEntities, List.class);
}
public AtlasEntity.AtlasEntities searchEntities(SearchFilter searchFilter) throws AtlasServiceException {
return callAPI(GET_ENTITIES, null, AtlasEntity.AtlasEntities.class, searchFilter.getParams());
}
}
......@@ -31,21 +31,40 @@ public class AtlasServiceException extends Exception {
super("Metadata service API " + api + " failed", e);
}
public AtlasServiceException(AtlasBaseClient.APIInfo api, Exception e) {
super("Metadata service API " + api + " failed", e);
}
public AtlasServiceException(AtlasClient.API api, WebApplicationException e) throws JSONException {
this(api, ClientResponse.Status.fromStatusCode(e.getResponse().getStatus()),
((JSONObject) e.getResponse().getEntity()).getString("stackTrace"));
}
public AtlasServiceException(AtlasBaseClient.APIInfo api, WebApplicationException e) throws JSONException {
this(api, ClientResponse.Status.fromStatusCode(e.getResponse().getStatus()),
((JSONObject) e.getResponse().getEntity()).getString("stackTrace"));
}
private AtlasServiceException(AtlasClient.API api, ClientResponse.Status status, String response) {
super("Metadata service API " + api + " failed with status " + (status != null ? status.getStatusCode() : -1)
+ " (" + status + ") Response Body (" + response + ")");
this.status = status;
}
private AtlasServiceException(AtlasBaseClient.APIInfo api, ClientResponse.Status status, String response) {
super("Metadata service API " + api + " failed with status " + (status != null ? status.getStatusCode() : -1)
+ " (" + status + ") Response Body (" + response + ")");
this.status = status;
}
public AtlasServiceException(AtlasClient.API api, ClientResponse response) {
this(api, ClientResponse.Status.fromStatusCode(response.getStatus()), response.getEntity(String.class));
}
public AtlasServiceException(AtlasBaseClient.APIInfo api, ClientResponse response) {
this(api, ClientResponse.Status.fromStatusCode(response.getStatus()), response.getEntity(String.class));
}
public AtlasServiceException(Exception e) {
super(e);
}
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas;
import com.google.common.annotations.VisibleForTesting;
import com.sun.jersey.api.client.WebResource;
import org.apache.atlas.model.SearchFilter;
import org.apache.atlas.model.typedef.AtlasClassificationDef;
import org.apache.atlas.model.typedef.AtlasEntityDef;
import org.apache.atlas.model.typedef.AtlasEnumDef;
import org.apache.atlas.model.typedef.AtlasStructDef;
import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.apache.atlas.type.AtlasType;
import org.apache.commons.configuration.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import javax.ws.rs.HttpMethod;
import javax.ws.rs.core.Response;
public class AtlasTypedefClientV2 extends AtlasBaseClient {
private static final String BASE_URI = "api/atlas/v2/types/";
private static final String TYPEDEFS_PATH = BASE_URI + "typedefs/";
private static final String GET_BY_NAME_TEMPLATE = BASE_URI + "%s/name/%s";
private static final String GET_BY_GUID_TEMPLATE = BASE_URI + "%s/guid/%s";
private static final APIInfo GET_ALL_TYPE_DEFS = new APIInfo(TYPEDEFS_PATH, HttpMethod.GET, Response.Status.OK);
private static final APIInfo CREATE_ALL_TYPE_DEFS = new APIInfo(TYPEDEFS_PATH, HttpMethod.POST, Response.Status.OK);
private static final APIInfo UPDATE_ALL_TYPE_DEFS = new APIInfo(TYPEDEFS_PATH, HttpMethod.PUT, Response.Status.OK);
private static final APIInfo DELETE_ALL_TYPE_DEFS = new APIInfo(TYPEDEFS_PATH, HttpMethod.DELETE, Response.Status.OK);
public AtlasTypedefClientV2(String[] baseUrl, String[] basicAuthUserNamePassword) {
super(baseUrl, basicAuthUserNamePassword);
}
public AtlasTypedefClientV2(String... baseUrls) throws AtlasException {
super(baseUrls);
}
public AtlasTypedefClientV2(UserGroupInformation ugi, String doAsUser, String... baseUrls) {
super(ugi, doAsUser, baseUrls);
}
protected AtlasTypedefClientV2() {
super();
}
@VisibleForTesting
AtlasTypedefClientV2(WebResource service, Configuration configuration) {
super(service, configuration);
}
/**
* Bulk retrieval API for retrieving all type definitions in Atlas
*
* @return A composite wrapper object with lists of all type definitions
*/
public AtlasTypesDef getAllTypeDefs(SearchFilter searchFilter) throws AtlasServiceException {
return callAPI(GET_ALL_TYPE_DEFS, null, AtlasTypesDef.class, searchFilter.getParams());
}
public AtlasEnumDef getEnumByName(final String name) throws AtlasServiceException {
return getTypeDefByName(name, AtlasEnumDef.class);
}
public AtlasEnumDef getEnumByGuid(final String guid) throws AtlasServiceException {
return getTypeDefByGuid(guid, AtlasEnumDef.class);
}
public AtlasStructDef getStructByName(final String name) throws AtlasServiceException {
return getTypeDefByName(name, AtlasStructDef.class);
}
public AtlasStructDef getStructByGuid(final String guid) throws AtlasServiceException {
return getTypeDefByGuid(guid, AtlasStructDef.class);
}
public AtlasClassificationDef getClassificationByName(final String name) throws AtlasServiceException {
return getTypeDefByName(name, AtlasClassificationDef.class);
}
public AtlasClassificationDef getClassificationByGuid(final String guid) throws AtlasServiceException {
return getTypeDefByGuid(guid, AtlasClassificationDef.class);
}
public AtlasEntityDef getEntityByName(final String name) throws AtlasServiceException {
return getTypeDefByName(name, AtlasEntityDef.class);
}
public AtlasEntityDef getEntityByGuid(final String guid) throws AtlasServiceException {
return getTypeDefByGuid(guid, AtlasEntityDef.class);
}
/**
* Bulk create APIs for all atlas type definitions, only new definitions will be created.
* Any changes to the existing definitions will be discarded
*
* @param typesDef A composite wrapper object with corresponding lists of the type definition
* @return A composite wrapper object with lists of type definitions that were successfully
* created
*/
public AtlasTypesDef createAtlasTypeDefs(final AtlasTypesDef typesDef) throws AtlasServiceException {
return callAPI(CREATE_ALL_TYPE_DEFS, AtlasType.toJson(typesDef), AtlasTypesDef.class);
}
/**
* Bulk update API for all types, changes detected in the type definitions would be persisted
*
* @param typesDef A composite object that captures all type definition changes
* @return A composite object with lists of type definitions that were updated
*/
public AtlasTypesDef updateAtlasTypeDefs(final AtlasTypesDef typesDef) throws AtlasServiceException {
return callAPI(UPDATE_ALL_TYPE_DEFS, AtlasType.toJson(typesDef), AtlasTypesDef.class);
}
/**
* Bulk delete API for all types
*
* @param typesDef A composite object that captures all types to be deleted
*/
public void deleteAtlasTypeDefs(final AtlasTypesDef typesDef) throws AtlasServiceException {
callAPI(DELETE_ALL_TYPE_DEFS, AtlasType.toJson(typesDef), AtlasTypesDef.class);
}
private <T> T getTypeDefByName(final String name, Class<T> typeDefClass) throws AtlasServiceException {
String atlasPath = getAtlasPath(typeDefClass);
APIInfo apiInfo = new APIInfo(String.format(GET_BY_NAME_TEMPLATE, atlasPath, name), HttpMethod.GET, Response.Status.OK);
return callAPI(apiInfo, null, typeDefClass);
}
private <T> T getTypeDefByGuid(final String guid, Class<T> typeDefClass) throws AtlasServiceException {
String atlasPath = getAtlasPath(typeDefClass);
APIInfo apiInfo = new APIInfo(String.format(GET_BY_GUID_TEMPLATE, atlasPath, guid), HttpMethod.GET, Response.Status.OK);
return callAPI(apiInfo, null, typeDefClass);
}
private <T> String getAtlasPath(Class<T> typeDefClass) {
if (typeDefClass.isAssignableFrom(AtlasEnumDef.class)) {
return "enumdef";
} else if (typeDefClass.isAssignableFrom(AtlasEntityDef.class)) {
return "entitydef";
} else if (typeDefClass.isAssignableFrom(AtlasClassificationDef.class)) {
return "classificationdef";
} else if (typeDefClass.isAssignableFrom(AtlasStructDef.class)) {
return "structdef";
}
// Code should never reach this pion
return "";
}
}
......@@ -21,6 +21,7 @@ import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientHandlerException;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.WebResource;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.json.InstanceSerialization;
import org.apache.commons.configuration.Configuration;
......@@ -32,14 +33,15 @@ import org.mockito.MockitoAnnotations;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriBuilder;
import java.net.ConnectException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.List;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriBuilder;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.anyString;
......@@ -94,6 +96,7 @@ public class AtlasClientTest {
JSONObject jsonResponse = new JSONObject(new AtlasClient.EntityResult(Arrays.asList("id"), null, null).toString());
when(response.getEntity(String.class)).thenReturn(jsonResponse.toString());
when(response.getLength()).thenReturn(jsonResponse.length());
String entityJson = InstanceSerialization.toJson(new Referenceable("type"), true);
when(builder.method(anyString(), Matchers.<Class>any(), anyString())).thenReturn(response);
......@@ -157,9 +160,15 @@ public class AtlasClientTest {
WebResource.Builder builder = setupBuilder(AtlasClient.API.STATUS, service);
ClientResponse response = mock(ClientResponse.class);
when(response.getStatus()).thenReturn(Response.Status.OK.getStatusCode());
when(response.getEntity(String.class)).thenReturn("{\"Status\":\"Active\"}");
String activeStatus = "{\"Status\":\"Active\"}";
when(response.getEntity(String.class)).thenReturn(activeStatus);
when(response.getLength()).thenReturn(activeStatus.length());
when(builder.method(AtlasClient.API.STATUS.getMethod(), ClientResponse.class, null)).thenReturn(response);
// Fix after AtlasBaseClient
// atlasClient.setService();
String status = atlasClient.getAdminStatus();
assertEquals(status, "Active");
}
......@@ -212,10 +221,14 @@ public class AtlasClientTest {
WebResource.Builder builder = setupBuilder(AtlasClient.API.STATUS, service);
ClientResponse firstResponse = mock(ClientResponse.class);
when(firstResponse.getStatus()).thenReturn(Response.Status.OK.getStatusCode());
when(firstResponse.getEntity(String.class)).thenReturn("{\"Status\":\"PASSIVE\"}");
String passiveStatus = "{\"Status\":\"PASSIVE\"}";
when(firstResponse.getEntity(String.class)).thenReturn(passiveStatus);
when(firstResponse.getLength()).thenReturn(passiveStatus.length());
ClientResponse secondResponse = mock(ClientResponse.class);
when(secondResponse.getStatus()).thenReturn(Response.Status.OK.getStatusCode());
when(secondResponse.getEntity(String.class)).thenReturn("{\"Status\":\"ACTIVE\"}");
String activeStatus = "{\"Status\":\"ACTIVE\"}";
when(secondResponse.getEntity(String.class)).thenReturn(activeStatus);
when(secondResponse.getLength()).thenReturn(activeStatus.length());
when(builder.method(AtlasClient.API.STATUS.getMethod(), ClientResponse.class, null)).
thenReturn(firstResponse).thenReturn(firstResponse).thenReturn(firstResponse).
thenReturn(secondResponse);
......@@ -239,7 +252,9 @@ public class AtlasClientTest {
when(response.getEntity(String.class)).thenReturn("{\"Status\":\"BECOMING_ACTIVE\"}");
ClientResponse nextResponse = mock(ClientResponse.class);
when(nextResponse.getStatus()).thenReturn(Response.Status.OK.getStatusCode());
when(nextResponse.getEntity(String.class)).thenReturn("{\"Status\":\"ACTIVE\"}");
String activeStatus = "{\"Status\":\"ACTIVE\"}";
when(response.getEntity(String.class)).thenReturn(activeStatus);
when(response.getLength()).thenReturn(activeStatus.length());
when(builder.method(AtlasClient.API.STATUS.getMethod(), ClientResponse.class, null)).
thenReturn(response).thenReturn(response).thenReturn(nextResponse);
......@@ -262,13 +277,17 @@ public class AtlasClientTest {
when(response.getEntity(String.class)).thenReturn("{\"Status\":\"BECOMING_ACTIVE\"}");
ClientResponse nextResponse = mock(ClientResponse.class);
when(nextResponse.getStatus()).thenReturn(Response.Status.OK.getStatusCode());
when(nextResponse.getEntity(String.class)).thenReturn("{\"Status\":\"ACTIVE\"}");
String activeStatus = "{\"Status\":\"ACTIVE\"}";
when(response.getEntity(String.class)).thenReturn(activeStatus);
when(response.getLength()).thenReturn(activeStatus.length());
when(builder.method(AtlasClient.API.STATUS.getMethod(), ClientResponse.class, null)).
thenThrow(new ClientHandlerException("Simulating connection exception")).
thenReturn(response).
thenReturn(nextResponse);
AtlasClient atlasClient = new AtlasClient(service, configuration);
atlasClient.setService(service);
atlasClient.setConfiguration(configuration);
String serviceURL = atlasClient.determineActiveServiceURL(
new String[] {"http://localhost:31000","http://localhost:41000"},
......@@ -313,7 +332,9 @@ public class AtlasClientTest {
ClientResponse response = mock(ClientResponse.class);
when(response.getStatus()).thenReturn(Response.Status.OK.getStatusCode());
when(response.getEntity(String.class)).thenReturn("{\"Status\":\"ACTIVE\"}");
String activeStatus = "{\"Status\":\"ACTIVE\"}";
when(response.getEntity(String.class)).thenReturn(activeStatus);
when(response.getLength()).thenReturn(activeStatus.length());
when(builder.method(AtlasClient.API.LIST_TYPES.getMethod(), ClientResponse.class, null)).
thenThrow(new ClientHandlerException("simulating exception in calling API", new ConnectException())).
......@@ -323,6 +344,9 @@ public class AtlasClientTest {
AtlasClient atlasClient = getClientForTest("http://localhost:31000","http://localhost:41000");
atlasClient.setService(service);
atlasClient.setConfiguration(configuration);
atlasClient.callAPIWithRetries(AtlasClient.API.LIST_TYPES, null, resourceCreator);
verify(client).destroy();
......@@ -343,7 +367,9 @@ public class AtlasClientTest {
ClientResponse response = mock(ClientResponse.class);
when(response.getStatus()).thenReturn(Response.Status.OK.getStatusCode());
when(response.getEntity(String.class)).thenReturn("{\"Status\":\"ACTIVE\"}");
String activeStatus = "{\"Status\":\"ACTIVE\"}";
when(response.getEntity(String.class)).thenReturn(activeStatus);
when(response.getLength()).thenReturn(activeStatus.length());
when(builder.method(AtlasClient.API.LIST_TYPES.getMethod(), ClientResponse.class, null)).
thenThrow(new ClientHandlerException("simulating exception in calling API", new ConnectException())).
......@@ -354,6 +380,9 @@ public class AtlasClientTest {
AtlasClient atlasClient = getClientForTest("http://localhost:31000");
atlasClient.setService(resourceObject);
atlasClient.setConfiguration(configuration);
atlasClient.callAPIWithRetries(AtlasClient.API.LIST_TYPES, null, resourceCreator);
verify(client).destroy();
......@@ -379,7 +408,9 @@ public class AtlasClientTest {
ClientResponse response = mock(ClientResponse.class);
when(response.getStatus()).thenReturn(Response.Status.OK.getStatusCode());
when(response.getEntity(String.class)).thenReturn("{\"Status\":\"ACTIVE\"}");
String activeStatus = "{\"Status\":\"ACTIVE\"}";
when(response.getEntity(String.class)).thenReturn(activeStatus);
when(response.getLength()).thenReturn(activeStatus.length());
when(builder.method(AtlasClient.API.LIST_TYPES.getMethod(), ClientResponse.class, null)).
thenThrow(new ClientHandlerException("simulating exception in calling API", new ConnectException())).
......@@ -389,9 +420,12 @@ public class AtlasClientTest {
when(resourceCreator.createResource()).thenReturn(resourceObject);
AtlasClient atlasClient = getClientForTest("http://localhost:31000","http://localhost:41000");
atlasClient.setService(resourceObject);
atlasClient.setConfiguration(configuration);
atlasClient.callAPIWithRetries(AtlasClient.API.LIST_TYPES, null, resourceCreator);
verify(client).destroy();
verify(client).resource(UriBuilder.fromUri("http://localhost:31000").build());
verify(client).resource(UriBuilder.fromUri("http://localhost:41000").build());
......@@ -399,8 +433,9 @@ public class AtlasClientTest {
private WebResource.Builder getBuilder(WebResource resourceObject) {
WebResource.Builder builder = mock(WebResource.Builder.class);
when(resourceObject.accept(AtlasClient.JSON_MEDIA_TYPE)).thenReturn(builder);
when(builder.type(AtlasClient.JSON_MEDIA_TYPE)).thenReturn(builder);
when(resourceObject.path(anyString())).thenReturn(resourceObject);
when(resourceObject.accept(AtlasBaseClient.JSON_MEDIA_TYPE)).thenReturn(builder);
when(builder.type(AtlasBaseClient.JSON_MEDIA_TYPE)).thenReturn(builder);
return builder;
}
......@@ -413,7 +448,7 @@ public class AtlasClientTest {
}
private AtlasClient getClientForTest(final String... baseUrls) {
return new AtlasClient(null, null, baseUrls) {
return new AtlasClient((UserGroupInformation)null, (String)null, baseUrls) {
boolean firstCall = true;
@Override
protected String determineActiveServiceURL(String[] baseUrls, Client client) {
......
......@@ -19,6 +19,7 @@
package org.apache.atlas.web.resources;
import com.sun.jersey.api.client.ClientResponse;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasException;
import org.apache.atlas.services.MetadataService;
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.web.resources;
import com.google.common.collect.ImmutableSet;
import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.AtlasTypedefClientV2;
import org.apache.atlas.model.SearchFilter;
import org.apache.atlas.model.TypeCategory;
import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
import org.apache.atlas.model.typedef.AtlasClassificationDef;
import org.apache.atlas.model.typedef.AtlasEntityDef;
import org.apache.atlas.model.typedef.AtlasEnumDef;
import org.apache.atlas.model.typedef.AtlasStructDef;
import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.apache.atlas.type.AtlasTypeUtil;
import org.apache.atlas.typesystem.types.DataTypes;
import org.apache.atlas.utils.AuthenticationUtil;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import javax.ws.rs.core.Response;
import static org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef.Cardinality;
import static org.apache.atlas.type.AtlasTypeUtil.createClassTypeDef;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.fail;
/**
* Integration test for types jersey resource.
*/
public class TypedefsJerseyResourceIT extends BaseResourceIT {
private AtlasTypesDef typeDefinitions;
private AtlasTypedefClientV2 clientV2;
@BeforeClass
public void setUp() throws Exception {
super.setUp();
typeDefinitions = createHiveTypes();
if (!AuthenticationUtil.isKerberosAuthenticationEnabled()) {
clientV2 = new AtlasTypedefClientV2(atlasUrls, new String[]{"admin", "admin"});
} else {
clientV2 = new AtlasTypedefClientV2(atlasUrls);
}
}
@AfterClass
public void tearDown() throws Exception {
emptyTypeDefs(typeDefinitions);
}
@Test
public void testCreate() throws Exception {
AtlasTypesDef atlasTypeDefs = clientV2.createAtlasTypeDefs(typeDefinitions);
Assert.assertNotNull(atlasTypeDefs);
assertFalse(atlasTypeDefs.isEmpty());
}
@Test
public void testDuplicateCreate() throws Exception {
AtlasEntityDef type = createClassTypeDef(randomString(),
ImmutableSet.<String>of(), AtlasTypeUtil.createUniqueRequiredAttrDef("name", "string"));
AtlasTypesDef typesDef = new AtlasTypesDef();
typesDef.getEntityDefs().add(type);
AtlasTypesDef created = clientV2.createAtlasTypeDefs(typesDef);
assertNotNull(created);
try {
created = clientV2.createAtlasTypeDefs(typesDef);
fail("Expected 409");
} catch (AtlasServiceException e) {
assertEquals(e.getStatus().getStatusCode(), Response.Status.CONFLICT.getStatusCode());
}
}
@Test
public void testUpdate() throws Exception {
AtlasEntityDef typeDefinition =
createClassTypeDef(randomString(), ImmutableSet.<String>of(),
AtlasTypeUtil.createUniqueRequiredAttrDef("name", "string"));
AtlasTypesDef atlasTypesDef = new AtlasTypesDef();
atlasTypesDef.getEntityDefs().add(typeDefinition);
AtlasTypesDef createdTypeDefs = clientV2.createAtlasTypeDefs(atlasTypesDef);
assertNotNull(createdTypeDefs);
assertEquals(createdTypeDefs.getEntityDefs().size(), atlasTypesDef.getEntityDefs().size());
//Add attribute description
typeDefinition = createClassTypeDef(typeDefinition.getName(),
ImmutableSet.<String>of(),
AtlasTypeUtil.createUniqueRequiredAttrDef("name", "string"),
AtlasTypeUtil.createOptionalAttrDef("description", "string"));
emptyTypeDefs(atlasTypesDef);
atlasTypesDef.getEntityDefs().add(typeDefinition);
AtlasTypesDef updatedTypeDefs = clientV2.updateAtlasTypeDefs(atlasTypesDef);
assertNotNull(updatedTypeDefs);
assertEquals(updatedTypeDefs.getEntityDefs().size(), atlasTypesDef.getEntityDefs().size());
assertEquals(updatedTypeDefs.getEntityDefs().get(0).getName(), atlasTypesDef.getEntityDefs().get(0).getName());
Map<String, String> filterParams = new HashMap<>();
filterParams.put(SearchFilter.PARAM_TYPE, "entity");
AtlasTypesDef allTypeDefs = clientV2.getAllTypeDefs(new SearchFilter(filterParams));
assertNotNull(allTypeDefs);
assertEquals(allTypeDefs.getEntityDefs().get(0).getAttributeDefs().size(), 2);
}
@Test(dependsOnMethods = "testCreate")
public void testGetDefinition() throws Exception {
if (CollectionUtils.isNotEmpty(typeDefinitions.getEnumDefs())) {
for (AtlasEnumDef atlasEnumDef : typeDefinitions.getEnumDefs()) {
verifyByNameAndGUID(atlasEnumDef);
}
}
if (CollectionUtils.isNotEmpty(typeDefinitions.getStructDefs())) {
for (AtlasStructDef structDef : typeDefinitions.getStructDefs()) {
verifyByNameAndGUID(structDef);
}
}
if (CollectionUtils.isNotEmpty(typeDefinitions.getClassificationDefs())) {
for (AtlasClassificationDef classificationDef : typeDefinitions.getClassificationDefs()) {
verifyByNameAndGUID(classificationDef);
}
}
if (CollectionUtils.isNotEmpty(typeDefinitions.getEntityDefs())) {
for (AtlasEntityDef entityDef : typeDefinitions.getEntityDefs()) {
verifyByNameAndGUID(entityDef);
}
}
}
@Test
public void testInvalidGets() throws Exception {
try {
AtlasEnumDef byName = clientV2.getEnumByName("blah");
fail("Get for invalid name should have reported a failure");
} catch (AtlasServiceException e) {
assertEquals(e.getStatus().getStatusCode(), Response.Status.NOT_FOUND.getStatusCode(),
"Should've returned a 404");
}
try {
AtlasEnumDef byGuid = clientV2.getEnumByGuid("blah");
fail("Get for invalid name should have reported a failure");
} catch (AtlasServiceException e) {
assertEquals(e.getStatus().getStatusCode(), Response.Status.NOT_FOUND.getStatusCode(),
"Should've returned a 404");
}
try {
AtlasStructDef byName = clientV2.getStructByName("blah");
fail("Get for invalid name should have reported a failure");
} catch (AtlasServiceException e) {
assertEquals(e.getStatus().getStatusCode(), Response.Status.NOT_FOUND.getStatusCode(),
"Should've returned a 404");
}
try {
AtlasStructDef byGuid = clientV2.getStructByGuid("blah");
fail("Get for invalid name should have reported a failure");
} catch (AtlasServiceException e) {
assertEquals(e.getStatus().getStatusCode(), Response.Status.NOT_FOUND.getStatusCode(),
"Should've returned a 404");
}
try {
AtlasClassificationDef byName = clientV2.getClassificationByName("blah");
fail("Get for invalid name should have reported a failure");
} catch (AtlasServiceException e) {
assertEquals(e.getStatus().getStatusCode(), Response.Status.NOT_FOUND.getStatusCode(),
"Should've returned a 404");
}
try {
AtlasClassificationDef byGuid = clientV2.getClassificationByGuid("blah");
fail("Get for invalid name should have reported a failure");
} catch (AtlasServiceException e) {
assertEquals(e.getStatus().getStatusCode(), Response.Status.NOT_FOUND.getStatusCode(),
"Should've returned a 404");
}
try {
AtlasEntityDef byName = clientV2.getEntityByName("blah");
fail("Get for invalid name should have reported a failure");
} catch (AtlasServiceException e) {
assertEquals(e.getStatus().getStatusCode(), Response.Status.NOT_FOUND.getStatusCode(),
"Should've returned a 404");
}
try {
AtlasEntityDef byGuid = clientV2.getEntityByGuid("blah");
fail("Get for invalid name should have reported a failure");
} catch (AtlasServiceException e) {
assertEquals(e.getStatus().getStatusCode(), Response.Status.NOT_FOUND.getStatusCode(),
"Should've returned a 404");
}
}
@Test
public void testListTypesByFilter() throws Exception {
AtlasStructDef.AtlasAttributeDef attr = AtlasTypeUtil.createOptionalAttrDef("attr", "string");
AtlasEntityDef classDefA = AtlasTypeUtil.createClassTypeDef("A" + randomString(), ImmutableSet.<String>of(), attr);
AtlasEntityDef classDefA1 = AtlasTypeUtil.createClassTypeDef("A1" + randomString(), ImmutableSet.of(classDefA.getName()), attr);
AtlasEntityDef classDefB = AtlasTypeUtil.createClassTypeDef("B" + randomString(), ImmutableSet.<String>of(), attr);
AtlasEntityDef classDefC = AtlasTypeUtil.createClassTypeDef("C" + randomString(), ImmutableSet.of(classDefB.getName(), classDefA.getName()), attr);
AtlasTypesDef atlasTypesDef = new AtlasTypesDef();
atlasTypesDef.getEntityDefs().add(classDefA);
atlasTypesDef.getEntityDefs().add(classDefA1);
atlasTypesDef.getEntityDefs().add(classDefB);
atlasTypesDef.getEntityDefs().add(classDefC);
AtlasTypesDef created = clientV2.createAtlasTypeDefs(atlasTypesDef);
assertNotNull(created);
assertEquals(created.getEntityDefs().size(), atlasTypesDef.getEntityDefs().size());
Map<String, String> searchParams = new HashMap<>();
searchParams.put(SearchFilter.PARAM_TYPE, "CLASS");
searchParams.put(SearchFilter.PARAM_SUPERTYPE, classDefA.getName());
SearchFilter searchFilter = new SearchFilter(searchParams);
AtlasTypesDef searchDefs = clientV2.getAllTypeDefs(searchFilter);
assertNotNull(searchDefs);
assertEquals(searchDefs.getEntityDefs().size(), 2);
searchParams.put(SearchFilter.PARAM_NOT_SUPERTYPE, classDefB.getName());
searchFilter = new SearchFilter(searchParams);
searchDefs = clientV2.getAllTypeDefs(searchFilter);
assertNotNull(searchDefs);
assertEquals(searchDefs.getEntityDefs().size(), 1);
}
private AtlasTypesDef createHiveTypes() throws Exception {
AtlasTypesDef atlasTypesDef = new AtlasTypesDef();
AtlasEntityDef databaseTypeDefinition =
createClassTypeDef("database", ImmutableSet.<String>of(),
AtlasTypeUtil.createUniqueRequiredAttrDef("name", "string"),
AtlasTypeUtil.createRequiredAttrDef("description", "string"));
atlasTypesDef.getEntityDefs().add(databaseTypeDefinition);
AtlasEntityDef tableTypeDefinition =
createClassTypeDef("table", ImmutableSet.<String>of(),
AtlasTypeUtil.createUniqueRequiredAttrDef("name", "string"),
AtlasTypeUtil.createRequiredAttrDef("description", "string"),
AtlasTypeUtil.createOptionalAttrDef("columnNames", DataTypes.arrayTypeName("string")),
AtlasTypeUtil.createOptionalAttrDef("created", "date"),
AtlasTypeUtil.createOptionalAttrDef("parameters",
DataTypes.mapTypeName("string", "string")),
AtlasTypeUtil.createRequiredAttrDef("type", "string"),
new AtlasAttributeDef("database", "database",
false,
Cardinality.SINGLE, 1, 1,
true, true,
Collections.<AtlasStructDef.AtlasConstraintDef>emptyList()));
atlasTypesDef.getEntityDefs().add(tableTypeDefinition);
AtlasClassificationDef fetlTypeDefinition = AtlasTypeUtil
.createTraitTypeDef("fetl", ImmutableSet.<String>of(),
AtlasTypeUtil.createRequiredAttrDef("level", "int"));
atlasTypesDef.getClassificationDefs().add(fetlTypeDefinition);
return atlasTypesDef;
}
private void verifyByNameAndGUID(AtlasBaseTypeDef typeDef) {
try {
AtlasBaseTypeDef byName = null;
if (typeDef.getCategory() == TypeCategory.ENUM) {
byName = clientV2.getEnumByName(typeDef.getName());
} else if (typeDef.getCategory() == TypeCategory.ENTITY) {
byName = clientV2.getEntityByName(typeDef.getName());
} else if (typeDef.getCategory() == TypeCategory.CLASSIFICATION) {
byName = clientV2.getClassificationByName(typeDef.getName());
} else if (typeDef.getCategory() == TypeCategory.STRUCT) {
byName = clientV2.getStructByName(typeDef.getName());
}
assertNotNull(byName);
} catch (AtlasServiceException e) {
fail("Get byName should've succeeded", e);
}
if (StringUtils.isNotBlank(typeDef.getGuid())) {
try {
AtlasBaseTypeDef byGuid = null;
if (typeDef.getCategory() == TypeCategory.ENUM) {
byGuid = clientV2.getEnumByGuid(typeDef.getGuid());
} else if (typeDef.getCategory() == TypeCategory.ENTITY) {
byGuid = clientV2.getEntityByGuid(typeDef.getGuid());
} else if (typeDef.getCategory() == TypeCategory.CLASSIFICATION) {
byGuid = clientV2.getClassificationByGuid(typeDef.getName());
} else if (typeDef.getCategory() == TypeCategory.STRUCT) {
byGuid = clientV2.getStructByGuid(typeDef.getGuid());
}
assertNotNull(byGuid);
} catch (AtlasServiceException e) {
fail("Get byGuid should've succeeded", e);
}
}
}
private void emptyTypeDefs(AtlasTypesDef def) {
def.getEnumDefs().clear();
def.getStructDefs().clear();
def.getClassificationDefs().clear();
def.getEntityDefs().clear();
}
}
......@@ -22,6 +22,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.WebResource;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.typesystem.TypesDef;
......
......@@ -20,7 +20,6 @@ package org.apache.atlas.web.security;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasException;
import org.apache.atlas.web.TestUtils;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.PropertiesConfiguration;
......@@ -61,7 +60,7 @@ public class NegativeSSLAndKerberosTest extends BaseSSLAndKerberosTest {
// client will actually only leverage subset of these properties
final PropertiesConfiguration configuration = getSSLConfiguration(providerUrl);
persistSSLClientConfiguration((org.apache.commons.configuration.Configuration) configuration);
persistSSLClientConfiguration(configuration);
TestUtils.writeConfiguration(configuration, persistDir + File.separator +
ApplicationProperties.APPLICATION_PROPERTIES);
......@@ -103,12 +102,7 @@ public class NegativeSSLAndKerberosTest extends BaseSSLAndKerberosTest {
originalConf = System.getProperty("atlas.conf");
System.setProperty("atlas.conf", persistDir);
dgiClient = new AtlasClient(DGI_URL) {
@Override
protected PropertiesConfiguration getClientProperties() {
return configuration;
}
};
dgiClient = new AtlasClient(configuration, DGI_URL);
secureEmbeddedServer = new TestSecureEmbeddedServer(21443, getWarPath()) {
......
......@@ -20,8 +20,8 @@ package org.apache.atlas.web.security;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasException;
import org.apache.atlas.web.TestUtils;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
......@@ -30,6 +30,12 @@ import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.nio.file.Files;
import java.security.PrivilegedExceptionAction;
import javax.security.auth.Subject;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.CallbackHandler;
......@@ -38,11 +44,6 @@ import javax.security.auth.callback.PasswordCallback;
import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.auth.login.LoginContext;
import javax.security.auth.login.LoginException;
import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.nio.file.Files;
import java.security.PrivilegedExceptionAction;
import static org.apache.atlas.security.SecurityProperties.TLS_ENABLED;
......@@ -120,12 +121,7 @@ public class SSLAndKerberosTest extends BaseSSLAndKerberosTest {
dgiCLient = proxyUser.doAs(new PrivilegedExceptionAction<AtlasClient>() {
@Override
public AtlasClient run() throws Exception {
return new AtlasClient(DGI_URL) {
@Override
protected PropertiesConfiguration getClientProperties() {
return configuration;
}
};
return new AtlasClient(configuration, DGI_URL);
}
});
......
......@@ -74,21 +74,16 @@ public class SSLTest extends BaseSSLAndKerberosTest {
setupCredentials();
final PropertiesConfiguration configuration = getSSLConfiguration(providerUrl);
String persistDir = writeConfiguration(configuration);
persistSSLClientConfiguration((org.apache.commons.configuration.Configuration) configuration);
persistSSLClientConfiguration(configuration);
originalConf = System.getProperty("atlas.conf");
System.setProperty("atlas.conf", persistDir);
atlasClient = new AtlasClient(new String[]{DGI_URL},new String[]{"admin","admin"}) {
@Override
protected PropertiesConfiguration getClientProperties() {
return configuration;
}
};
atlasClient = new AtlasClient(configuration, new String[]{DGI_URL},new String[]{"admin","admin"});
secureEmbeddedServer = new TestSecureEmbeddedServer(21443, getWarPath()) {
@Override
public PropertiesConfiguration getConfiguration() {
public org.apache.commons.configuration.Configuration getConfiguration() {
return configuration;
}
};
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment