Commit 98f4d40a by Suma Shivaprasad

ATLAS-571 Modify Atlas client for necessary changes in context of HA (yhemanth via sumasai)

parent c1d4e7c9
...@@ -96,7 +96,15 @@ public class HiveMetaStoreBridge { ...@@ -96,7 +96,15 @@ public class HiveMetaStoreBridge {
UserGroupInformation ugi) throws Exception { UserGroupInformation ugi) throws Exception {
this(hiveConf.get(HIVE_CLUSTER_NAME, DEFAULT_CLUSTER_NAME), this(hiveConf.get(HIVE_CLUSTER_NAME, DEFAULT_CLUSTER_NAME),
Hive.get(hiveConf), Hive.get(hiveConf),
new AtlasClient(atlasConf.getString(ATLAS_ENDPOINT, DEFAULT_DGI_URL), ugi, doAsUser)); atlasConf, doAsUser, ugi);
}
HiveMetaStoreBridge(String clusterName, Hive hiveClient,
Configuration atlasConf, String doAsUser, UserGroupInformation ugi) {
this.clusterName = clusterName;
this.hiveClient = hiveClient;
String baseUrls = atlasConf.getString(ATLAS_ENDPOINT, DEFAULT_DGI_URL);
this.atlasClient = new AtlasClient(ugi, doAsUser, baseUrls.split(","));
} }
HiveMetaStoreBridge(String clusterName, Hive hiveClient, AtlasClient atlasClient) { HiveMetaStoreBridge(String clusterName, Hive hiveClient, AtlasClient atlasClient) {
......
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
package org.apache.atlas; package org.apache.atlas;
import com.google.common.annotations.VisibleForTesting;
import com.sun.jersey.api.client.Client; import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientHandlerException; import com.sun.jersey.api.client.ClientHandlerException;
import com.sun.jersey.api.client.ClientResponse; import com.sun.jersey.api.client.ClientResponse;
...@@ -42,6 +43,8 @@ import javax.ws.rs.HttpMethod; ...@@ -42,6 +43,8 @@ import javax.ws.rs.HttpMethod;
import javax.ws.rs.core.MediaType; import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response; import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriBuilder; import javax.ws.rs.core.UriBuilder;
import java.io.IOException;
import java.net.ConnectException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
...@@ -91,23 +94,66 @@ public class AtlasClient { ...@@ -91,23 +94,66 @@ public class AtlasClient {
public static final String JSON_MEDIA_TYPE = MediaType.APPLICATION_JSON + "; charset=UTF-8"; public static final String JSON_MEDIA_TYPE = MediaType.APPLICATION_JSON + "; charset=UTF-8";
public static final String UNKNOWN_STATUS = "Unknown status"; public static final String UNKNOWN_STATUS = "Unknown status";
private WebResource service; public static final String ATLAS_CLIENT_HA_RETRIES_KEY = "atlas.client.ha.retries";
// Setting the default value based on testing failovers while client code like quickstart is running.
public static final int DEFAULT_NUM_RETRIES = 4;
public static final String ATLAS_CLIENT_HA_SLEEP_INTERVAL_MS_KEY = "atlas.client.ha.sleep.interval.ms";
// Setting the default value based on testing failovers while client code like quickstart is running.
// With number of retries, this gives a total time of about 20s for the server to start.
public static final int DEFAULT_SLEEP_BETWEEN_RETRIES_MS = 5000;
protected AtlasClient() { private WebResource service;
//do nothing. For LocalAtlasClient private AtlasClientContext atlasClientContext;
} private Configuration configuration;
/**
* Create a new AtlasClient.
*
* @param baseUrl The URL of the Atlas server to connect to.
*/
public AtlasClient(String baseUrl) { public AtlasClient(String baseUrl) {
this(baseUrl, null, null); this(baseUrl, null, null);
} }
/**
* Create a new Atlas Client.
* @param baseUrl The URL of the Atlas server to connect to.
* @param ugi The {@link UserGroupInformation} of logged in user.
* @param doAsUser The user on whose behalf queries will be executed.
*/
public AtlasClient(String baseUrl, UserGroupInformation ugi, String doAsUser) { public AtlasClient(String baseUrl, UserGroupInformation ugi, String doAsUser) {
initializeState(new String[] {baseUrl}, ugi, doAsUser);
}
/**
* Create a new Atlas client.
* @param ugi The {@link UserGroupInformation} of logged in user, can be null in unsecure mode.
* @param doAsUser The user on whose behalf queries will be executed, can be null in unsecure mode.
* @param baseUrls A list of URLs that point to an ensemble of Atlas servers working in
* High Availability mode. The client will automatically determine the
* active instance on startup and also when there is a scenario of
* failover.
*/
public AtlasClient(UserGroupInformation ugi, String doAsUser, String... baseUrls) {
initializeState(baseUrls, ugi, doAsUser);
}
private void initializeState(String[] baseUrls, UserGroupInformation ugi, String doAsUser) {
configuration = getClientProperties();
Client client = getClient(configuration, ugi, doAsUser);
String activeServiceUrl = determineActiveServiceURL(baseUrls, client);
atlasClientContext = new AtlasClientContext(baseUrls, client, ugi, doAsUser);
service = client.resource(UriBuilder.fromUri(activeServiceUrl).build());
}
@VisibleForTesting
protected Client getClient(Configuration configuration, UserGroupInformation ugi, String doAsUser) {
DefaultClientConfig config = new DefaultClientConfig(); DefaultClientConfig config = new DefaultClientConfig();
Configuration clientConfig = null; Configuration clientConfig = null;
int readTimeout = 60000; int readTimeout = 60000;
int connectTimeout = 60000; int connectTimeout = 60000;
try { try {
clientConfig = getClientProperties(); clientConfig = configuration;
if (clientConfig.getBoolean(TLS_ENABLED, false)) { if (clientConfig.getBoolean(TLS_ENABLED, false)) {
// create an SSL properties configuration if one doesn't exist. SSLFactory expects a file, so forced // create an SSL properties configuration if one doesn't exist. SSLFactory expects a file, so forced
// to create a // to create a
...@@ -124,26 +170,109 @@ public class AtlasClient { ...@@ -124,26 +170,109 @@ public class AtlasClient {
SecureClientUtils.getClientConnectionHandler(config, clientConfig, doAsUser, ugi); SecureClientUtils.getClientConnectionHandler(config, clientConfig, doAsUser, ugi);
Client client = new Client(handler, config); Client client = new Client(handler, config);
client.resource(UriBuilder.fromUri(baseUrl).build());
client.setReadTimeout(readTimeout); client.setReadTimeout(readTimeout);
client.setConnectTimeout(connectTimeout); client.setConnectTimeout(connectTimeout);
return client;
}
service = client.resource(UriBuilder.fromUri(baseUrl).build()); @VisibleForTesting
protected String determineActiveServiceURL(String[] baseUrls, Client client) {
if (baseUrls.length == 0) {
throw new IllegalArgumentException("Base URLs cannot be null or empty");
}
String baseUrl;
AtlasServerEnsemble atlasServerEnsemble = new AtlasServerEnsemble(baseUrls);
if (atlasServerEnsemble.hasSingleInstance()) {
baseUrl = atlasServerEnsemble.firstURL();
LOG.info("Client has only one service URL, will use that for all actions: {}", baseUrl);
return baseUrl;
} else {
try {
baseUrl = selectActiveServerAddress(client, atlasServerEnsemble);
} catch (AtlasServiceException e) {
LOG.error("None of the passed URLs are active: {}", atlasServerEnsemble, e);
throw new IllegalArgumentException("None of the passed URLs are active " + atlasServerEnsemble, e);
}
}
return baseUrl;
}
private String selectActiveServerAddress(Client client, AtlasServerEnsemble serverEnsemble)
throws AtlasServiceException {
List<String> serverInstances = serverEnsemble.getMembers();
String activeServerAddress = null;
for (String serverInstance : serverInstances) {
LOG.info("Trying with address {}", serverInstance);
activeServerAddress = getAddressIfActive(client, serverInstance);
if (activeServerAddress != null) {
LOG.info("Found service {} as active service.", serverInstance);
break;
}
}
if (activeServerAddress != null)
return activeServerAddress;
else
throw new AtlasServiceException(API.STATUS, new RuntimeException("Could not find any active instance"));
} }
// for testing private String getAddressIfActive(Client client, String serverInstance) {
AtlasClient(WebResource service) { String activeServerAddress = null;
for (int i = 0; i < getNumberOfRetries(); i++) {
try {
WebResource service = client.resource(UriBuilder.fromUri(serverInstance).build());
String adminStatus = getAdminStatus(service);
if (adminStatus.equals("ACTIVE")) {
activeServerAddress = serverInstance;
break;
} else {
LOG.info("Service {} is not active.. will retry.", serverInstance);
}
} catch (Exception e) {
LOG.error("Could not get status from service {} after {} tries.", serverInstance, i, e);
}
sleepBetweenRetries();
LOG.warn("Service {} is not active.", serverInstance);
}
return activeServerAddress;
}
private void sleepBetweenRetries(){
try {
Thread.sleep(getSleepBetweenRetriesMs());
} catch (InterruptedException e) {
LOG.error("Interrupted from sleeping between retries.", e);
}
}
private int getSleepBetweenRetriesMs() {
return configuration.getInt(ATLAS_CLIENT_HA_SLEEP_INTERVAL_MS_KEY, DEFAULT_SLEEP_BETWEEN_RETRIES_MS);
}
private int getNumberOfRetries() {
return configuration.getInt(ATLAS_CLIENT_HA_RETRIES_KEY, DEFAULT_NUM_RETRIES);
}
@VisibleForTesting
AtlasClient(WebResource service, Configuration configuration) {
this.service = service; this.service = service;
this.configuration = configuration;
} }
protected Configuration getClientProperties() throws AtlasException { protected Configuration getClientProperties() {
return ApplicationProperties.get(); try {
if (configuration == null) {
configuration = ApplicationProperties.get();
}
} catch (AtlasException e) {
LOG.error("Exception while loading configuration.", e);
}
return configuration;
} }
public boolean isServerReady() throws AtlasServiceException { public boolean isServerReady() throws AtlasServiceException {
WebResource resource = getResource(API.VERSION); WebResource resource = getResource(API.VERSION);
try { try {
callAPIWithResource(API.VERSION, resource); callAPIWithResource(API.VERSION, resource, null);
return true; return true;
} catch (ClientHandlerException che) { } catch (ClientHandlerException che) {
return false; return false;
...@@ -164,9 +293,31 @@ public class AtlasClient { ...@@ -164,9 +293,31 @@ public class AtlasClient {
* @throws AtlasServiceException if there is a HTTP error. * @throws AtlasServiceException if there is a HTTP error.
*/ */
public String getAdminStatus() throws AtlasServiceException { public String getAdminStatus() throws AtlasServiceException {
return getAdminStatus(service);
}
private void handleClientHandlerException(ClientHandlerException che) {
if (isRetryableException(che)) {
atlasClientContext.getClient().destroy();
LOG.warn("Destroyed current context while handling ClientHandlerEception.");
LOG.warn("Will retry and create new context.");
sleepBetweenRetries();
initializeState(atlasClientContext.getBaseUrls(), atlasClientContext.getUgi(),
atlasClientContext.getDoAsUser());
return;
}
throw che;
}
private boolean isRetryableException(ClientHandlerException che) {
return che.getCause().getClass().equals(IOException.class)
|| che.getCause().getClass().equals(ConnectException.class);
}
private String getAdminStatus(WebResource service) throws AtlasServiceException {
String result = UNKNOWN_STATUS; String result = UNKNOWN_STATUS;
WebResource resource = getResource(API.STATUS); WebResource resource = getResource(service, API.STATUS);
JSONObject response = callAPIWithResource(API.STATUS, resource); JSONObject response = callAPIWithResource(API.STATUS, resource, null);
try { try {
result = response.getString("Status"); result = response.getString("Status");
} catch (JSONException e) { } catch (JSONException e) {
...@@ -282,9 +433,8 @@ public class AtlasClient { ...@@ -282,9 +433,8 @@ public class AtlasClient {
} }
public String getType(String typeName) throws AtlasServiceException { public String getType(String typeName) throws AtlasServiceException {
WebResource resource = getResource(API.GET_TYPE, typeName);
try { try {
JSONObject response = callAPIWithResource(API.GET_TYPE, resource); JSONObject response = callAPI(API.GET_TYPE, null, typeName);;
return response.getString(DEFINITION); return response.getString(DEFINITION);
} catch (AtlasServiceException e) { } catch (AtlasServiceException e) {
if (Response.Status.NOT_FOUND.equals(e.getStatus())) { if (Response.Status.NOT_FOUND.equals(e.getStatus())) {
...@@ -366,11 +516,37 @@ public class AtlasClient { ...@@ -366,11 +516,37 @@ public class AtlasClient {
* @param attribute property key * @param attribute property key
* @param value property value * @param value property value
*/ */
public void updateEntityAttribute(String guid, String attribute, String value) throws AtlasServiceException { public void updateEntityAttribute(final String guid, final String attribute, String value) throws AtlasServiceException {
API api = API.UPDATE_ENTITY_PARTIAL; callAPIWithRetries(API.UPDATE_ENTITY_PARTIAL, value, new ResourceCreator() {
WebResource resource = getResource(api, guid); @Override
resource = resource.queryParam(ATTRIBUTE_NAME, attribute); public WebResource createResource() {
callAPIWithResource(api, resource, value); API api = API.UPDATE_ENTITY_PARTIAL;
WebResource resource = getResource(api, guid);
resource = resource.queryParam(ATTRIBUTE_NAME, attribute);
return resource;
}
});
}
@VisibleForTesting
JSONObject callAPIWithRetries(API api, Object requestObject, ResourceCreator resourceCreator)
throws AtlasServiceException {
for (int i = 0; i < getNumberOfRetries(); i++) {
WebResource resource = resourceCreator.createResource();
try {
LOG.info("using resource {} for {} times", resource.getURI(), i);
JSONObject result = callAPIWithResource(api, resource, requestObject);
return result;
} catch (ClientHandlerException che) {
if (i==(getNumberOfRetries()-1)) {
throw che;
}
LOG.warn("Handled exception in calling api {}", api.getPath(), che);
LOG.warn("Exception's cause: {}", che.getCause().getClass());
handleClientHandlerException(che);
}
}
throw new AtlasServiceException(api, new RuntimeException("Could not get response after retries."));
} }
/** /**
...@@ -392,15 +568,20 @@ public class AtlasClient { ...@@ -392,15 +568,20 @@ public class AtlasClient {
* @param uniqueAttributeValue Attribute Value that uniquely identifies the entity * @param uniqueAttributeValue Attribute Value that uniquely identifies the entity
* @param entity entity definition * @param entity entity definition
*/ */
public String updateEntity(String entityType, String uniqueAttributeName, String uniqueAttributeValue, public String updateEntity(final String entityType, final String uniqueAttributeName, final String uniqueAttributeValue,
Referenceable entity) throws AtlasServiceException { Referenceable entity) throws AtlasServiceException {
API api = API.UPDATE_ENTITY_PARTIAL; final API api = API.UPDATE_ENTITY_PARTIAL;
WebResource resource = getResource(api, "qualifiedName");
resource = resource.queryParam(TYPE, entityType);
resource = resource.queryParam(ATTRIBUTE_NAME, uniqueAttributeName);
resource = resource.queryParam(ATTRIBUTE_VALUE, uniqueAttributeValue);
String entityJson = InstanceSerialization.toJson(entity, true); String entityJson = InstanceSerialization.toJson(entity, true);
JSONObject response = callAPIWithResource(api, resource, entityJson); JSONObject response = callAPIWithRetries(api, entityJson, new ResourceCreator() {
@Override
public WebResource createResource() {
WebResource resource = getResource(api, "qualifiedName");
resource = resource.queryParam(TYPE, entityType);
resource = resource.queryParam(ATTRIBUTE_NAME, uniqueAttributeName);
resource = resource.queryParam(ATTRIBUTE_VALUE, uniqueAttributeValue);
return resource;
}
});
try { try {
return response.getString(GUID); return response.getString(GUID);
} catch (JSONException e) { } catch (JSONException e) {
...@@ -415,13 +596,18 @@ public class AtlasClient { ...@@ -415,13 +596,18 @@ public class AtlasClient {
* @return List of deleted entity guids * @return List of deleted entity guids
* @throws AtlasServiceException * @throws AtlasServiceException
*/ */
public List<String> deleteEntities(String ... guids) throws AtlasServiceException { public List<String> deleteEntities(final String ... guids) throws AtlasServiceException {
API api = API.DELETE_ENTITIES; JSONObject jsonResponse = callAPIWithRetries(API.DELETE_ENTITIES, null, new ResourceCreator() {
WebResource resource = getResource(api); @Override
for (String guid : guids) { public WebResource createResource() {
resource = resource.queryParam(GUID.toLowerCase(), guid); API api = API.DELETE_ENTITIES;
} WebResource resource = getResource(api);
JSONObject jsonResponse = callAPIWithResource(API.DELETE_ENTITIES, resource); for (String guid : guids) {
resource = resource.queryParam(GUID.toLowerCase(), guid);
}
return resource;
}
});
return extractResults(jsonResponse, GUID); return extractResults(jsonResponse, GUID);
} }
...@@ -457,12 +643,18 @@ public class AtlasClient { ...@@ -457,12 +643,18 @@ public class AtlasClient {
* @return result object * @return result object
* @throws AtlasServiceException * @throws AtlasServiceException
*/ */
public Referenceable getEntity(String entityType, String attribute, String value) throws AtlasServiceException { public Referenceable getEntity(final String entityType, final String attribute, final String value)
WebResource resource = getResource(API.GET_ENTITY); throws AtlasServiceException {
resource = resource.queryParam(TYPE, entityType); JSONObject jsonResponse = callAPIWithRetries(API.GET_ENTITY, null, new ResourceCreator() {
resource = resource.queryParam(ATTRIBUTE_NAME, attribute); @Override
resource = resource.queryParam(ATTRIBUTE_VALUE, value); public WebResource createResource() {
JSONObject jsonResponse = callAPIWithResource(API.GET_ENTITY, resource); WebResource resource = getResource(API.GET_ENTITY);
resource = resource.queryParam(TYPE, entityType);
resource = resource.queryParam(ATTRIBUTE_NAME, attribute);
resource = resource.queryParam(ATTRIBUTE_VALUE, value);
return resource;
}
});
try { try {
String entityInstanceDefinition = jsonResponse.getString(AtlasClient.DEFINITION); String entityInstanceDefinition = jsonResponse.getString(AtlasClient.DEFINITION);
return InstanceSerialization.fromJsonReferenceable(entityInstanceDefinition, true); return InstanceSerialization.fromJsonReferenceable(entityInstanceDefinition, true);
...@@ -477,10 +669,15 @@ public class AtlasClient { ...@@ -477,10 +669,15 @@ public class AtlasClient {
* @return * @return
* @throws AtlasServiceException * @throws AtlasServiceException
*/ */
public List<String> listEntities(String entityType) throws AtlasServiceException { public List<String> listEntities(final String entityType) throws AtlasServiceException {
WebResource resource = getResource(API.LIST_ENTITIES); JSONObject jsonResponse = callAPIWithRetries(API.LIST_ENTITIES, null, new ResourceCreator() {
resource = resource.queryParam(TYPE, entityType); @Override
JSONObject jsonResponse = callAPIWithResource(API.LIST_ENTITIES, resource); public WebResource createResource() {
WebResource resource = getResource(API.LIST_ENTITIES);
resource = resource.queryParam(TYPE, entityType);
return resource;
}
});
return extractResults(jsonResponse, AtlasClient.RESULTS); return extractResults(jsonResponse, AtlasClient.RESULTS);
} }
...@@ -508,10 +705,15 @@ public class AtlasClient { ...@@ -508,10 +705,15 @@ public class AtlasClient {
* @return * @return
* @throws AtlasServiceException * @throws AtlasServiceException
*/ */
public JSONArray search(String searchQuery) throws AtlasServiceException { public JSONArray search(final String searchQuery) throws AtlasServiceException {
WebResource resource = getResource(API.SEARCH); JSONObject result = callAPIWithRetries(API.SEARCH, null, new ResourceCreator() {
resource = resource.queryParam(QUERY, searchQuery); @Override
JSONObject result = callAPIWithResource(API.SEARCH, resource); public WebResource createResource() {
WebResource resource = getResource(API.SEARCH);
resource = resource.queryParam(QUERY, searchQuery);
return resource;
}
});
try { try {
return result.getJSONArray(RESULTS); return result.getJSONArray(RESULTS);
} catch (JSONException e) { } catch (JSONException e) {
...@@ -521,34 +723,21 @@ public class AtlasClient { ...@@ -521,34 +723,21 @@ public class AtlasClient {
} }
/** /**
* Search given type name, an attribute and its value. Uses search dsl
* @param typeName name of the entity type
* @param attributeName attribute name
* @param attributeValue attribute value
* @return result json object
* @throws AtlasServiceException
*/
public JSONArray rawSearch(String typeName, String attributeName, Object attributeValue)
throws AtlasServiceException {
// String gremlinQuery = String.format(
// "g.V.has(\"typeName\",\"%s\").and(_().has(\"%s.%s\", T.eq, \"%s\")).toList()",
// typeName, typeName, attributeName, attributeValue);
// return searchByGremlin(gremlinQuery);
String dslQuery = String.format("%s where %s = \"%s\"", typeName, attributeName, attributeValue);
return searchByDSL(dslQuery);
}
/**
* Search given query DSL * Search given query DSL
* @param query DSL query * @param query DSL query
* @return result json object * @return result json object
* @throws AtlasServiceException * @throws AtlasServiceException
*/ */
public JSONArray searchByDSL(String query) throws AtlasServiceException { public JSONArray searchByDSL(final String query) throws AtlasServiceException {
LOG.debug("DSL query: {}", query); LOG.debug("DSL query: {}", query);
WebResource resource = getResource(API.SEARCH_DSL); JSONObject result = callAPIWithRetries(API.SEARCH_DSL, null, new ResourceCreator() {
resource = resource.queryParam(QUERY, query); @Override
JSONObject result = callAPIWithResource(API.SEARCH_DSL, resource); public WebResource createResource() {
WebResource resource = getResource(API.SEARCH_DSL);
resource = resource.queryParam(QUERY, query);
return resource;
}
});
try { try {
return result.getJSONArray(RESULTS); return result.getJSONArray(RESULTS);
} catch (JSONException e) { } catch (JSONException e) {
...@@ -562,11 +751,16 @@ public class AtlasClient { ...@@ -562,11 +751,16 @@ public class AtlasClient {
* @return result json object * @return result json object
* @throws AtlasServiceException * @throws AtlasServiceException
*/ */
public JSONArray searchByGremlin(String gremlinQuery) throws AtlasServiceException { public JSONArray searchByGremlin(final String gremlinQuery) throws AtlasServiceException {
LOG.debug("Gremlin query: " + gremlinQuery); LOG.debug("Gremlin query: " + gremlinQuery);
WebResource resource = getResource(API.SEARCH_GREMLIN); JSONObject result = callAPIWithRetries(API.SEARCH_GREMLIN, null, new ResourceCreator() {
resource = resource.queryParam(QUERY, gremlinQuery); @Override
JSONObject result = callAPIWithResource(API.SEARCH_GREMLIN, resource); public WebResource createResource() {
WebResource resource = getResource(API.SEARCH_GREMLIN);
resource = resource.queryParam(QUERY, gremlinQuery);
return resource;
}
});
try { try {
return result.getJSONArray(RESULTS); return result.getJSONArray(RESULTS);
} catch (JSONException e) { } catch (JSONException e) {
...@@ -580,10 +774,15 @@ public class AtlasClient { ...@@ -580,10 +774,15 @@ public class AtlasClient {
* @return result json object * @return result json object
* @throws AtlasServiceException * @throws AtlasServiceException
*/ */
public JSONObject searchByFullText(String query) throws AtlasServiceException { public JSONObject searchByFullText(final String query) throws AtlasServiceException {
WebResource resource = getResource(API.SEARCH_FULL_TEXT); return callAPIWithRetries(API.SEARCH_FULL_TEXT, null, new ResourceCreator() {
resource = resource.queryParam(QUERY, query); @Override
return callAPIWithResource(API.SEARCH_FULL_TEXT, resource); public WebResource createResource() {
WebResource resource = getResource(API.SEARCH_FULL_TEXT);
resource = resource.queryParam(QUERY, query);
return resource;
}
});
} }
public JSONObject getInputGraph(String datasetName) throws AtlasServiceException { public JSONObject getInputGraph(String datasetName) throws AtlasServiceException {
...@@ -604,15 +803,11 @@ public class AtlasClient { ...@@ -604,15 +803,11 @@ public class AtlasClient {
} }
} }
public String getRequestId(JSONObject json) throws AtlasServiceException { private WebResource getResource(API api, String... pathParams) {
try { return getResource(service, api, pathParams);
return json.getString(REQUEST_ID);
} catch (JSONException e) {
throw new AtlasServiceException(e);
}
} }
private WebResource getResource(API api, String... pathParams) { private WebResource getResource(WebResource service, API api, String... pathParams) {
WebResource resource = service.path(api.getPath()); WebResource resource = service.path(api.getPath());
if (pathParams != null) { if (pathParams != null) {
for (String pathParam : pathParams) { for (String pathParam : pathParams) {
...@@ -622,29 +817,75 @@ public class AtlasClient { ...@@ -622,29 +817,75 @@ public class AtlasClient {
return resource; return resource;
} }
private JSONObject callAPIWithResource(API api, WebResource resource) throws AtlasServiceException {
return callAPIWithResource(api, resource, null);
}
private JSONObject callAPIWithResource(API api, WebResource resource, Object requestObject) private JSONObject callAPIWithResource(API api, WebResource resource, Object requestObject)
throws AtlasServiceException { throws AtlasServiceException {
ClientResponse clientResponse = resource.accept(JSON_MEDIA_TYPE).type(JSON_MEDIA_TYPE) ClientResponse clientResponse = null;
.method(api.getMethod(), ClientResponse.class, requestObject); for (int i = 0; i < getNumberOfRetries(); i++) {
clientResponse = resource.accept(JSON_MEDIA_TYPE).type(JSON_MEDIA_TYPE)
if (clientResponse.getStatus() == api.getExpectedStatus().getStatusCode()) { .method(api.getMethod(), ClientResponse.class, requestObject);
String responseAsString = clientResponse.getEntity(String.class);
try { if (clientResponse.getStatus() == api.getExpectedStatus().getStatusCode()) {
return new JSONObject(responseAsString); String responseAsString = clientResponse.getEntity(String.class);
} catch (JSONException e) { try {
throw new AtlasServiceException(api, e); return new JSONObject(responseAsString);
} catch (JSONException e) {
throw new AtlasServiceException(api, e);
}
} else if (clientResponse.getStatus() != ClientResponse.Status.SERVICE_UNAVAILABLE.getStatusCode()) {
break;
} else {
LOG.error("Got a service unavailable when calling: {}, will retry..", resource);
sleepBetweenRetries();
} }
} }
throw new AtlasServiceException(api, clientResponse); throw new AtlasServiceException(api, clientResponse);
} }
private JSONObject callAPI(API api, Object requestObject, String... pathParams) throws AtlasServiceException { private JSONObject callAPI(final API api, Object requestObject, final String... pathParams)
WebResource resource = getResource(api, pathParams); throws AtlasServiceException {
return callAPIWithResource(api, resource, requestObject); return callAPIWithRetries(api, requestObject, new ResourceCreator() {
@Override
public WebResource createResource() {
return getResource(api, pathParams);
}
});
} }
/**
* A class to capture input state while creating the client.
*
* The information here will be reused when the client is re-initialized on switch-over
* in case of High Availability.
*/
private class AtlasClientContext {
private String[] baseUrls;
private Client client;
private final UserGroupInformation ugi;
private final String doAsUser;
public AtlasClientContext(String[] baseUrls, Client client, UserGroupInformation ugi, String doAsUser) {
this.baseUrls = baseUrls;
this.client = client;
this.ugi = ugi;
this.doAsUser = doAsUser;
}
public UserGroupInformation getUgi() {
return ugi;
}
public String getDoAsUser() {
return doAsUser;
}
public Client getClient() {
return client;
}
public String[] getBaseUrls() {
return baseUrls;
}
}
} }
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas;
import com.google.common.base.Preconditions;
import org.apache.commons.lang.StringUtils;
import scala.actors.threadpool.Arrays;
import java.util.List;
public class AtlasServerEnsemble {
private final String[] urls;
public AtlasServerEnsemble(String[] baseUrls) {
Preconditions.checkArgument((baseUrls!=null && baseUrls.length>0),
"List of baseURLs cannot be null or empty.");
for (String baseUrl : baseUrls) {
Preconditions.checkArgument(StringUtils.isNotEmpty(baseUrl),
"Base URL cannot be null or empty.");
}
urls = baseUrls;
}
public boolean hasSingleInstance() {
return urls.length==1;
}
public String firstURL() {
return urls[0];
}
public List<String> getMembers() {
return Arrays.asList(urls);
}
}
/* /**
* Licensed to the Apache Software Foundation (ASF) under one * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file * or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information * distributed with this work for additional information
...@@ -15,28 +15,14 @@ ...@@ -15,28 +15,14 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.atlas.security;
import java.util.Arrays; package org.apache.atlas;
import java.util.List;
import com.sun.jersey.api.client.WebResource;
/** /**
* * An interface to capture the closure of how a WebResource is created.
*/ */
public interface SecurityProperties { public interface ResourceCreator {
String TLS_ENABLED = "atlas.enableTLS"; WebResource createResource();
String KEYSTORE_FILE_KEY = "keystore.file";
String DEFAULT_KEYSTORE_FILE_LOCATION = "target/atlas.keystore";
String KEYSTORE_PASSWORD_KEY = "keystore.password";
String TRUSTSTORE_FILE_KEY = "truststore.file";
String DEFATULT_TRUSTORE_FILE_LOCATION = "target/atlas.keystore";
String TRUSTSTORE_PASSWORD_KEY = "truststore.password";
String SERVER_CERT_PASSWORD_KEY = "password";
String CLIENT_AUTH_KEY = "client.auth.enabled";
String CERT_STORES_CREDENTIAL_PROVIDER_PATH = "cert.stores.credential.provider.path";
String SSL_CLIENT_PROPERTIES = "ssl-client.xml";
String BIND_ADDRESS = "atlas.server.bind.address";
String ATLAS_SSL_EXCLUDE_CIPHER_SUITES = "atlas.ssl.exclude.cipher.suites";
List<String> DEFAULT_CIPHER_SUITES = Arrays.asList(".*NULL.*", ".*RC4.*", ".*MD5.*",".*DES.*",".*DSS.*");
} }
...@@ -17,28 +17,58 @@ ...@@ -17,28 +17,58 @@
package org.apache.atlas; package org.apache.atlas;
import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientHandlerException; import com.sun.jersey.api.client.ClientHandlerException;
import com.sun.jersey.api.client.ClientResponse; import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.WebResource; import com.sun.jersey.api.client.WebResource;
import org.apache.commons.configuration.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test; import org.testng.annotations.Test;
import javax.ws.rs.core.Response; import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriBuilder;
import java.net.ConnectException;
import java.net.URI;
import java.net.URISyntaxException;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.fail; import static org.testng.Assert.fail;
public class AtlasClientTest { public class AtlasClientTest {
@Mock
private WebResource service;
@Mock
private Configuration configuration;
@Mock
private Client client;
@BeforeMethod
public void setup() {
MockitoAnnotations.initMocks(this);
}
@Test @Test
public void shouldVerifyServerIsReady() throws AtlasServiceException { public void shouldVerifyServerIsReady() throws AtlasServiceException {
WebResource webResource = mock(WebResource.class); setupRetryParams();
AtlasClient atlasClient = new AtlasClient(webResource);
WebResource.Builder builder = setupBuilder(AtlasClient.API.VERSION, webResource); AtlasClient atlasClient = new AtlasClient(service, configuration);
WebResource.Builder builder = setupBuilder(AtlasClient.API.VERSION, service);
ClientResponse response = mock(ClientResponse.class); ClientResponse response = mock(ClientResponse.class);
when(response.getStatus()).thenReturn(Response.Status.OK.getStatusCode()); when(response.getStatus()).thenReturn(Response.Status.OK.getStatusCode());
when(response.getEntity(String.class)).thenReturn("{\"Version\":\"version-rrelease\",\"Name\":\"apache-atlas\"," + when(response.getEntity(String.class)).thenReturn("{\"Version\":\"version-rrelease\",\"Name\":\"apache-atlas\"," +
...@@ -49,19 +79,16 @@ public class AtlasClientTest { ...@@ -49,19 +79,16 @@ public class AtlasClientTest {
} }
private WebResource.Builder setupBuilder(AtlasClient.API api, WebResource webResource) { private WebResource.Builder setupBuilder(AtlasClient.API api, WebResource webResource) {
WebResource adminVersionResource = mock(WebResource.class); when(webResource.path(api.getPath())).thenReturn(service);
when(webResource.path(api.getPath())).thenReturn(adminVersionResource); WebResource.Builder builder = getBuilder(service);
WebResource.Builder builder = mock(WebResource.Builder.class);
when(adminVersionResource.accept(AtlasClient.JSON_MEDIA_TYPE)).thenReturn(builder);
when(builder.type(AtlasClient.JSON_MEDIA_TYPE)).thenReturn(builder);
return builder; return builder;
} }
@Test @Test
public void shouldReturnFalseIfServerIsNotReady() throws AtlasServiceException { public void shouldReturnFalseIfServerIsNotReady() throws AtlasServiceException {
WebResource webResource = mock(WebResource.class); setupRetryParams();
AtlasClient atlasClient = new AtlasClient(webResource); AtlasClient atlasClient = new AtlasClient(service, configuration);
WebResource.Builder builder = setupBuilder(AtlasClient.API.VERSION, webResource); WebResource.Builder builder = setupBuilder(AtlasClient.API.VERSION, service);
when(builder.method(AtlasClient.API.VERSION.getMethod(), ClientResponse.class, null)).thenThrow( when(builder.method(AtlasClient.API.VERSION.getMethod(), ClientResponse.class, null)).thenThrow(
new ClientHandlerException()); new ClientHandlerException());
assertFalse(atlasClient.isServerReady()); assertFalse(atlasClient.isServerReady());
...@@ -69,9 +96,9 @@ public class AtlasClientTest { ...@@ -69,9 +96,9 @@ public class AtlasClientTest {
@Test @Test
public void shouldReturnFalseIfServiceIsUnavailable() throws AtlasServiceException { public void shouldReturnFalseIfServiceIsUnavailable() throws AtlasServiceException {
WebResource webResource = mock(WebResource.class); setupRetryParams();
AtlasClient atlasClient = new AtlasClient(webResource); AtlasClient atlasClient = new AtlasClient(service, configuration);
WebResource.Builder builder = setupBuilder(AtlasClient.API.VERSION, webResource); WebResource.Builder builder = setupBuilder(AtlasClient.API.VERSION, service);
ClientResponse response = mock(ClientResponse.class); ClientResponse response = mock(ClientResponse.class);
when(response.getStatus()).thenReturn(Response.Status.SERVICE_UNAVAILABLE.getStatusCode()); when(response.getStatus()).thenReturn(Response.Status.SERVICE_UNAVAILABLE.getStatusCode());
when(response.getClientResponseStatus()).thenReturn(ClientResponse.Status.SERVICE_UNAVAILABLE); when(response.getClientResponseStatus()).thenReturn(ClientResponse.Status.SERVICE_UNAVAILABLE);
...@@ -83,9 +110,10 @@ public class AtlasClientTest { ...@@ -83,9 +110,10 @@ public class AtlasClientTest {
@Test(expectedExceptions = AtlasServiceException.class) @Test(expectedExceptions = AtlasServiceException.class)
public void shouldThrowErrorIfAnyResponseOtherThanServiceUnavailable() throws AtlasServiceException { public void shouldThrowErrorIfAnyResponseOtherThanServiceUnavailable() throws AtlasServiceException {
WebResource webResource = mock(WebResource.class); setupRetryParams();
AtlasClient atlasClient = new AtlasClient(webResource);
WebResource.Builder builder = setupBuilder(AtlasClient.API.VERSION, webResource); AtlasClient atlasClient = new AtlasClient(service, configuration);
WebResource.Builder builder = setupBuilder(AtlasClient.API.VERSION, service);
ClientResponse response = mock(ClientResponse.class); ClientResponse response = mock(ClientResponse.class);
when(response.getStatus()).thenReturn(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode()); when(response.getStatus()).thenReturn(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode());
when(response.getClientResponseStatus()).thenReturn(ClientResponse.Status.INTERNAL_SERVER_ERROR); when(response.getClientResponseStatus()).thenReturn(ClientResponse.Status.INTERNAL_SERVER_ERROR);
...@@ -98,10 +126,11 @@ public class AtlasClientTest { ...@@ -98,10 +126,11 @@ public class AtlasClientTest {
@Test @Test
public void shouldGetAdminStatus() throws AtlasServiceException { public void shouldGetAdminStatus() throws AtlasServiceException {
WebResource webResource = mock(WebResource.class); setupRetryParams();
AtlasClient atlasClient = new AtlasClient(webResource);
AtlasClient atlasClient = new AtlasClient(service, configuration);
WebResource.Builder builder = setupBuilder(AtlasClient.API.STATUS, webResource); WebResource.Builder builder = setupBuilder(AtlasClient.API.STATUS, service);
ClientResponse response = mock(ClientResponse.class); ClientResponse response = mock(ClientResponse.class);
when(response.getStatus()).thenReturn(Response.Status.OK.getStatusCode()); when(response.getStatus()).thenReturn(Response.Status.OK.getStatusCode());
when(response.getEntity(String.class)).thenReturn("{\"Status\":\"Active\"}"); when(response.getEntity(String.class)).thenReturn("{\"Status\":\"Active\"}");
...@@ -113,10 +142,11 @@ public class AtlasClientTest { ...@@ -113,10 +142,11 @@ public class AtlasClientTest {
@Test(expectedExceptions = AtlasServiceException.class) @Test(expectedExceptions = AtlasServiceException.class)
public void shouldReturnStatusAsUnknownOnException() throws AtlasServiceException { public void shouldReturnStatusAsUnknownOnException() throws AtlasServiceException {
WebResource webResource = mock(WebResource.class); setupRetryParams();
AtlasClient atlasClient = new AtlasClient(webResource);
AtlasClient atlasClient = new AtlasClient(service, configuration);
WebResource.Builder builder = setupBuilder(AtlasClient.API.STATUS, webResource); WebResource.Builder builder = setupBuilder(AtlasClient.API.STATUS, service);
ClientResponse response = mock(ClientResponse.class); ClientResponse response = mock(ClientResponse.class);
when(response.getStatus()).thenReturn(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode()); when(response.getStatus()).thenReturn(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode());
when(response.getClientResponseStatus()).thenReturn(ClientResponse.Status.INTERNAL_SERVER_ERROR); when(response.getClientResponseStatus()).thenReturn(ClientResponse.Status.INTERNAL_SERVER_ERROR);
...@@ -128,10 +158,10 @@ public class AtlasClientTest { ...@@ -128,10 +158,10 @@ public class AtlasClientTest {
@Test @Test
public void shouldReturnStatusAsUnknownIfJSONIsInvalid() throws AtlasServiceException { public void shouldReturnStatusAsUnknownIfJSONIsInvalid() throws AtlasServiceException {
WebResource webResource = mock(WebResource.class); setupRetryParams();
AtlasClient atlasClient = new AtlasClient(webResource); AtlasClient atlasClient = new AtlasClient(service, configuration);
WebResource.Builder builder = setupBuilder(AtlasClient.API.STATUS, webResource); WebResource.Builder builder = setupBuilder(AtlasClient.API.STATUS, service);
ClientResponse response = mock(ClientResponse.class); ClientResponse response = mock(ClientResponse.class);
when(response.getStatus()).thenReturn(Response.Status.OK.getStatusCode()); when(response.getStatus()).thenReturn(Response.Status.OK.getStatusCode());
when(response.getEntity(String.class)).thenReturn("{\"status\":\"Active\"}"); when(response.getEntity(String.class)).thenReturn("{\"status\":\"Active\"}");
...@@ -140,4 +170,245 @@ public class AtlasClientTest { ...@@ -140,4 +170,245 @@ public class AtlasClientTest {
String status = atlasClient.getAdminStatus(); String status = atlasClient.getAdminStatus();
assertEquals(status, AtlasClient.UNKNOWN_STATUS); assertEquals(status, AtlasClient.UNKNOWN_STATUS);
} }
@Test
public void shouldReturnBaseURLAsPassedInURL() {
AtlasClient atlasClient = new AtlasClient(service, configuration);
String serviceURL = atlasClient.determineActiveServiceURL(new String[]{"http://localhost:21000"}, client);
assertEquals(serviceURL, "http://localhost:21000");
}
@Test
public void shouldSelectActiveAmongMultipleServersIfHAIsEnabled() {
setupRetryParams();
when(client.resource(UriBuilder.fromUri("http://localhost:31000").build())).thenReturn(service);
when(client.resource(UriBuilder.fromUri("http://localhost:41000").build())).thenReturn(service);
WebResource.Builder builder = setupBuilder(AtlasClient.API.STATUS, service);
ClientResponse firstResponse = mock(ClientResponse.class);
when(firstResponse.getStatus()).thenReturn(Response.Status.OK.getStatusCode());
when(firstResponse.getEntity(String.class)).thenReturn("{\"Status\":\"PASSIVE\"}");
ClientResponse secondResponse = mock(ClientResponse.class);
when(secondResponse.getStatus()).thenReturn(Response.Status.OK.getStatusCode());
when(secondResponse.getEntity(String.class)).thenReturn("{\"Status\":\"ACTIVE\"}");
when(builder.method(AtlasClient.API.STATUS.getMethod(), ClientResponse.class, null)).
thenReturn(firstResponse).thenReturn(firstResponse).thenReturn(firstResponse).
thenReturn(secondResponse);
AtlasClient atlasClient = new AtlasClient(service, configuration);
String serviceURL = atlasClient.determineActiveServiceURL(
new String[]{"http://localhost:31000", "http://localhost:41000"},
client);
assertEquals(serviceURL, "http://localhost:41000");
}
@Test
public void shouldRetryUntilServiceBecomesActive() {
setupRetryParams();
when(client.resource(UriBuilder.fromUri("http://localhost:31000").build())).thenReturn(service);
WebResource.Builder builder = setupBuilder(AtlasClient.API.STATUS, service);
ClientResponse response = mock(ClientResponse.class);
when(response.getStatus()).thenReturn(Response.Status.OK.getStatusCode());
when(response.getEntity(String.class)).thenReturn("{\"Status\":\"BECOMING_ACTIVE\"}");
ClientResponse nextResponse = mock(ClientResponse.class);
when(nextResponse.getStatus()).thenReturn(Response.Status.OK.getStatusCode());
when(nextResponse.getEntity(String.class)).thenReturn("{\"Status\":\"ACTIVE\"}");
when(builder.method(AtlasClient.API.STATUS.getMethod(), ClientResponse.class, null)).
thenReturn(response).thenReturn(response).thenReturn(nextResponse);
AtlasClient atlasClient = new AtlasClient(service, configuration);
String serviceURL = atlasClient.determineActiveServiceURL(
new String[] {"http://localhost:31000","http://localhost:41000"},
client);
assertEquals(serviceURL, "http://localhost:31000");
}
@Test
public void shouldRetryIfCannotConnectToServiceInitially() {
setupRetryParams();
when(client.resource(UriBuilder.fromUri("http://localhost:31000").build())).thenReturn(service);
WebResource.Builder builder = setupBuilder(AtlasClient.API.STATUS, service);
ClientResponse response = mock(ClientResponse.class);
when(response.getStatus()).thenReturn(Response.Status.OK.getStatusCode());
when(response.getEntity(String.class)).thenReturn("{\"Status\":\"BECOMING_ACTIVE\"}");
ClientResponse nextResponse = mock(ClientResponse.class);
when(nextResponse.getStatus()).thenReturn(Response.Status.OK.getStatusCode());
when(nextResponse.getEntity(String.class)).thenReturn("{\"Status\":\"ACTIVE\"}");
when(builder.method(AtlasClient.API.STATUS.getMethod(), ClientResponse.class, null)).
thenThrow(new ClientHandlerException("Simulating connection exception")).
thenReturn(response).
thenReturn(nextResponse);
AtlasClient atlasClient = new AtlasClient(service, configuration);
String serviceURL = atlasClient.determineActiveServiceURL(
new String[] {"http://localhost:31000","http://localhost:41000"},
client);
assertEquals(serviceURL, "http://localhost:31000");
}
@Test(expectedExceptions = IllegalArgumentException.class)
public void shouldThrowExceptionIfActiveServerIsNotFound() {
setupRetryParams();
when(client.resource(UriBuilder.fromUri("http://localhost:31000").build())).thenReturn(service);
WebResource.Builder builder = setupBuilder(AtlasClient.API.STATUS, service);
ClientResponse response = mock(ClientResponse.class);
when(response.getStatus()).thenReturn(Response.Status.OK.getStatusCode());
when(response.getEntity(String.class)).thenReturn("{\"Status\":\"BECOMING_ACTIVE\"}");
when(builder.method(AtlasClient.API.STATUS.getMethod(), ClientResponse.class, null)).
thenThrow(new ClientHandlerException("Simulating connection exception")).
thenReturn(response).
thenReturn(response);
AtlasClient atlasClient = new AtlasClient(service, configuration);
String serviceURL = atlasClient.determineActiveServiceURL(
new String[] {"http://localhost:31000","http://localhost:41000"},
client);
assertNull(serviceURL);
}
@Test
public void shouldRetryAPICallsOnClientHandlerException() throws AtlasServiceException, URISyntaxException {
setupRetryParams();
ResourceCreator resourceCreator = mock(ResourceCreator.class);
WebResource resourceObject = mock(WebResource.class);
when(resourceObject.getURI()).
thenReturn(new URI("http://localhost:31000/api/atlas/types")).
thenReturn(new URI("http://localhost:41000/api/atlas/types")).
thenReturn(new URI("http://localhost:41000/api/atlas/types"));
WebResource.Builder builder = getBuilder(resourceObject);
ClientResponse response = mock(ClientResponse.class);
when(response.getStatus()).thenReturn(Response.Status.OK.getStatusCode());
when(response.getEntity(String.class)).thenReturn("{\"Status\":\"ACTIVE\"}");
when(builder.method(AtlasClient.API.LIST_TYPES.getMethod(), ClientResponse.class, null)).
thenThrow(new ClientHandlerException("simulating exception in calling API", new ConnectException())).
thenReturn(response);
when(resourceCreator.createResource()).thenReturn(resourceObject);
AtlasClient atlasClient = getClientForTest("http://localhost:31000","http://localhost:41000");
atlasClient.callAPIWithRetries(AtlasClient.API.LIST_TYPES, null, resourceCreator);
verify(client).destroy();
verify(client).resource(UriBuilder.fromUri("http://localhost:31000").build());
verify(client).resource(UriBuilder.fromUri("http://localhost:41000").build());
}
@Test
public void shouldRetryWithSameClientIfSingleAddressIsUsed() throws URISyntaxException, AtlasServiceException {
setupRetryParams();
ResourceCreator resourceCreator = mock(ResourceCreator.class);
WebResource resourceObject = mock(WebResource.class);
when(resourceObject.getURI()).
thenReturn(new URI("http://localhost:31000/api/atlas/types"));
WebResource.Builder builder = getBuilder(resourceObject);
ClientResponse response = mock(ClientResponse.class);
when(response.getStatus()).thenReturn(Response.Status.OK.getStatusCode());
when(response.getEntity(String.class)).thenReturn("{\"Status\":\"ACTIVE\"}");
when(builder.method(AtlasClient.API.LIST_TYPES.getMethod(), ClientResponse.class, null)).
thenThrow(new ClientHandlerException("simulating exception in calling API", new ConnectException())).
thenReturn(response);
when(resourceCreator.createResource()).thenReturn(resourceObject);
AtlasClient atlasClient = getClientForTest("http://localhost:31000");
atlasClient.callAPIWithRetries(AtlasClient.API.LIST_TYPES, null, resourceCreator);
verify(client).destroy();
verify(client, times(2)).resource(UriBuilder.fromUri("http://localhost:31000").build());
}
@Test
public void shouldRetryAPICallsOnServiceUnavailable() throws AtlasServiceException, URISyntaxException {
setupRetryParams();
ResourceCreator resourceCreator = mock(ResourceCreator.class);
WebResource resourceObject = mock(WebResource.class);
when(resourceObject.getURI()).
thenReturn(new URI("http://localhost:31000/api/atlas/types")).
thenReturn(new URI("http://localhost:41000/api/atlas/types")).
thenReturn(new URI("http://localhost:41000/api/atlas/types"));
WebResource.Builder builder = getBuilder(resourceObject);
ClientResponse firstResponse = mock(ClientResponse.class);
when(firstResponse.getStatus()).thenReturn(Response.Status.SERVICE_UNAVAILABLE.getStatusCode());
when(firstResponse.getClientResponseStatus()).thenReturn(ClientResponse.Status.SERVICE_UNAVAILABLE);
ClientResponse response = mock(ClientResponse.class);
when(response.getStatus()).thenReturn(Response.Status.OK.getStatusCode());
when(response.getEntity(String.class)).thenReturn("{\"Status\":\"ACTIVE\"}");
when(builder.method(AtlasClient.API.LIST_TYPES.getMethod(), ClientResponse.class, null)).
thenThrow(new ClientHandlerException("simulating exception in calling API", new ConnectException())).
thenReturn(firstResponse).
thenReturn(response);
when(resourceCreator.createResource()).thenReturn(resourceObject);
AtlasClient atlasClient = getClientForTest("http://localhost:31000","http://localhost:41000");
atlasClient.callAPIWithRetries(AtlasClient.API.LIST_TYPES, null, resourceCreator);
verify(client).destroy();
verify(client).resource(UriBuilder.fromUri("http://localhost:31000").build());
verify(client).resource(UriBuilder.fromUri("http://localhost:41000").build());
}
private WebResource.Builder getBuilder(WebResource resourceObject) {
WebResource.Builder builder = mock(WebResource.Builder.class);
when(resourceObject.accept(AtlasClient.JSON_MEDIA_TYPE)).thenReturn(builder);
when(builder.type(AtlasClient.JSON_MEDIA_TYPE)).thenReturn(builder);
return builder;
}
private void setupRetryParams() {
when(configuration.getInt(AtlasClient.ATLAS_CLIENT_HA_RETRIES_KEY, AtlasClient.DEFAULT_NUM_RETRIES)).
thenReturn(3);
when(configuration.getInt(AtlasClient.ATLAS_CLIENT_HA_SLEEP_INTERVAL_MS_KEY,
AtlasClient.DEFAULT_SLEEP_BETWEEN_RETRIES_MS)).
thenReturn(1);
}
private AtlasClient getClientForTest(final String... baseUrls) {
return new AtlasClient(null, null, baseUrls) {
boolean firstCall = true;
@Override
protected String determineActiveServiceURL(String[] baseUrls, Client client) {
String returnUrl = baseUrls[0];
if (baseUrls.length > 1 && !firstCall) {
returnUrl = baseUrls[1];
}
firstCall = false;
return returnUrl;
}
@Override
protected Configuration getClientProperties() {
return configuration;
}
@Override
protected Client getClient(Configuration configuration, UserGroupInformation ugi, String doAsUser) {
return client;
}
};
}
} }
...@@ -51,5 +51,10 @@ ...@@ -51,5 +51,10 @@
<artifactId>commons-configuration</artifactId> <artifactId>commons-configuration</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
</dependency>
</dependencies> </dependencies>
</project> </project>
...@@ -18,34 +18,36 @@ ...@@ -18,34 +18,36 @@
package org.apache.atlas.ha; package org.apache.atlas.ha;
import org.apache.atlas.AtlasConstants;
import org.apache.atlas.AtlasException;
import org.apache.atlas.security.SecurityProperties; import org.apache.atlas.security.SecurityProperties;
import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.net.NetUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress; import java.util.ArrayList;
import java.util.List;
/** /**
* A wrapper for getting configuration entries related to HighAvailability. * A wrapper for getting configuration entries related to HighAvailability.
*/ */
public class HAConfiguration { public final class HAConfiguration {
private HAConfiguration() {
}
private static final Logger LOG = LoggerFactory.getLogger(HAConfiguration.class); private static final Logger LOG = LoggerFactory.getLogger(HAConfiguration.class);
public static final String ATLAS_SERVER_HA_PREFIX = "atlas.server.ha"; public static final String ATLAS_SERVER_HA_PREFIX = "atlas.server.ha.";
public static final String ATLAS_SERVER_HA_ENABLED_KEY = ATLAS_SERVER_HA_PREFIX + ".enabled"; public static final String ATLAS_SERVER_HA_ENABLED_KEY = ATLAS_SERVER_HA_PREFIX + "enabled";
public static final String ATLAS_SERVER_ADDRESS_PREFIX = "atlas.server.address."; public static final String ATLAS_SERVER_ADDRESS_PREFIX = "atlas.server.address.";
public static final String ATLAS_SERVER_IDS = "atlas.server.ids"; public static final String ATLAS_SERVER_IDS = "atlas.server.ids";
public static final String HA_ZOOKEEPER_CONNECT = ATLAS_SERVER_HA_PREFIX + ".zookeeper.connect"; public static final String HA_ZOOKEEPER_CONNECT = ATLAS_SERVER_HA_PREFIX + "zookeeper.connect";
public static final int DEFAULT_ZOOKEEPER_CONNECT_SLEEPTIME_MILLIS = 1000; public static final int DEFAULT_ZOOKEEPER_CONNECT_SLEEPTIME_MILLIS = 1000;
public static final String HA_ZOOKEEPER_RETRY_SLEEPTIME_MILLIS = ATLAS_SERVER_HA_PREFIX + ".zookeeper.retry.sleeptime.ms"; public static final String HA_ZOOKEEPER_RETRY_SLEEPTIME_MILLIS =
public static final String HA_ZOOKEEPER_NUM_RETRIES = ATLAS_SERVER_HA_PREFIX + ".zookeeper.num.retries"; ATLAS_SERVER_HA_PREFIX + "zookeeper.retry.sleeptime.ms";
public static final String HA_ZOOKEEPER_NUM_RETRIES = ATLAS_SERVER_HA_PREFIX + "zookeeper.num.retries";
public static final int DEFAULT_ZOOKEEPER_CONNECT_NUM_RETRIES = 3; public static final int DEFAULT_ZOOKEEPER_CONNECT_NUM_RETRIES = 3;
public static final String HA_ZOOKEEPER_SESSION_TIMEOUT_MS = ATLAS_SERVER_HA_PREFIX + ".zookeeper.session.timeout.ms"; public static final String HA_ZOOKEEPER_SESSION_TIMEOUT_MS =
ATLAS_SERVER_HA_PREFIX + "zookeeper.session.timeout.ms";
public static final int DEFAULT_ZOOKEEPER_SESSION_TIMEOUT_MILLIS = 20000; public static final int DEFAULT_ZOOKEEPER_SESSION_TIMEOUT_MILLIS = 20000;
/** /**
...@@ -58,53 +60,6 @@ public class HAConfiguration { ...@@ -58,53 +60,6 @@ public class HAConfiguration {
} }
/** /**
* Return the ID corresponding to this Atlas instance.
*
* The match is done by looking for an ID configured in {@link HAConfiguration#ATLAS_SERVER_IDS} key
* that has a host:port entry for the key {@link HAConfiguration#ATLAS_SERVER_ADDRESS_PREFIX}+ID where
* the host is a local IP address and port is set in the system property
* {@link AtlasConstants#SYSTEM_PROPERTY_APP_PORT}.
*
* @param configuration
* @return
* @throws AtlasException if no ID is found that maps to a local IP Address or port
*/
public static String getAtlasServerId(Configuration configuration) throws AtlasException {
// ids are already trimmed by this method
String[] ids = configuration.getStringArray(ATLAS_SERVER_IDS);
String matchingServerId = null;
int appPort = Integer.parseInt(System.getProperty(AtlasConstants.SYSTEM_PROPERTY_APP_PORT));
for (String id : ids) {
String hostPort = configuration.getString(ATLAS_SERVER_ADDRESS_PREFIX +id);
if (!StringUtils.isEmpty(hostPort)) {
InetSocketAddress socketAddress;
try {
socketAddress = NetUtils.createSocketAddr(hostPort);
} catch (Exception e) {
LOG.warn("Exception while trying to get socket address for " + hostPort, e);
continue;
}
if (!socketAddress.isUnresolved()
&& NetUtils.isLocalAddress(socketAddress.getAddress())
&& appPort == socketAddress.getPort()) {
LOG.info("Found matched server id " + id + " with host port: " + hostPort);
matchingServerId = id;
break;
}
} else {
LOG.info("Could not find matching address entry for id: " + id);
}
}
if (matchingServerId == null) {
String msg = String.format("Could not find server id for this instance. " +
"Unable to find IDs matching any local host and port binding among %s",
StringUtils.join(ids, ","));
throw new AtlasException(msg);
}
return matchingServerId;
}
/**
* Get the web server address that a server instance with the passed ID is bound to. * Get the web server address that a server instance with the passed ID is bound to.
* *
* This method uses the property {@link SecurityProperties#TLS_ENABLED} to determine whether * This method uses the property {@link SecurityProperties#TLS_ENABLED} to determine whether
...@@ -121,8 +76,17 @@ public class HAConfiguration { ...@@ -121,8 +76,17 @@ public class HAConfiguration {
return protocol + hostPort; return protocol + hostPort;
} }
public static List<String> getServerInstances(Configuration configuration) {
String[] serverIds = configuration.getStringArray(ATLAS_SERVER_IDS);
List<String> serverInstances = new ArrayList<>(serverIds.length);
for (String serverId : serverIds) {
serverInstances.add(getBoundAddressForId(configuration, serverId));
}
return serverInstances;
}
/** /**
* A collection of Zookeeper specific configuration that is used by High Availability code * A collection of Zookeeper specific configuration that is used by High Availability code.
*/ */
public static class ZookeeperProperties { public static class ZookeeperProperties {
private String connectString; private String connectString;
...@@ -156,14 +120,27 @@ public class HAConfiguration { ...@@ -156,14 +120,27 @@ public class HAConfiguration {
@Override @Override
public boolean equals(Object o) { public boolean equals(Object o) {
if (this == o) return true; if (this == o) {
if (o == null || getClass() != o.getClass()) return false; return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ZookeeperProperties that = (ZookeeperProperties) o; ZookeeperProperties that = (ZookeeperProperties) o;
if (retriesSleepTimeMillis != that.retriesSleepTimeMillis) return false; if (retriesSleepTimeMillis != that.retriesSleepTimeMillis) {
if (numRetries != that.numRetries) return false; return false;
if (sessionTimeout != that.sessionTimeout) return false; }
if (numRetries != that.numRetries) {
return false;
}
if (sessionTimeout != that.sessionTimeout) {
return false;
}
return !(connectString != null ? !connectString.equals(that.connectString) : that.connectString != null); return !(connectString != null ? !connectString.equals(that.connectString) : that.connectString != null);
} }
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.security;
import java.util.Arrays;
import java.util.List;
/**
*
*/
public final class SecurityProperties {
private SecurityProperties() {
}
public static final String TLS_ENABLED = "atlas.enableTLS";
public static final String KEYSTORE_FILE_KEY = "keystore.file";
public static final String DEFAULT_KEYSTORE_FILE_LOCATION = "target/atlas.keystore";
public static final String KEYSTORE_PASSWORD_KEY = "keystore.password";
public static final String TRUSTSTORE_FILE_KEY = "truststore.file";
public static final String DEFATULT_TRUSTORE_FILE_LOCATION = "target/atlas.keystore";
public static final String TRUSTSTORE_PASSWORD_KEY = "truststore.password";
public static final String SERVER_CERT_PASSWORD_KEY = "password";
public static final String CLIENT_AUTH_KEY = "client.auth.enabled";
public static final String CERT_STORES_CREDENTIAL_PROVIDER_PATH = "cert.stores.credential.provider.path";
public static final String SSL_CLIENT_PROPERTIES = "ssl-client.xml";
public static final String BIND_ADDRESS = "atlas.server.bind.address";
public static final String ATLAS_SSL_EXCLUDE_CIPHER_SUITES = "atlas.ssl.exclude.cipher.suites";
public static final List<String> DEFAULT_CIPHER_SUITES = Arrays.asList(
".*NULL.*", ".*RC4.*", ".*MD5.*", ".*DES.*", ".*DSS.*");
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.ha;
import org.apache.atlas.AtlasConstants;
import org.apache.atlas.security.SecurityProperties;
import org.apache.commons.configuration.Configuration;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import java.util.List;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
public class HAConfigurationTest {
@Mock
private Configuration configuration;
@BeforeMethod
public void setup() {
MockitoAnnotations.initMocks(this);
System.setProperty(AtlasConstants.SYSTEM_PROPERTY_APP_PORT, AtlasConstants.DEFAULT_APP_PORT_STR);
}
@Test
public void testShouldReturnHTTPSBoundAddress() {
when(configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +"id1")).thenReturn("127.0.0.1:21443");
when(configuration.getBoolean(SecurityProperties.TLS_ENABLED)).thenReturn(true);
String address = HAConfiguration.getBoundAddressForId(configuration, "id1");
assertEquals(address, "https://127.0.0.1:21443");
}
@Test
public void testShouldReturnListOfAddressesInConfig() {
when(configuration.getStringArray(HAConfiguration.ATLAS_SERVER_IDS)).thenReturn(new String[] {"id1", "id2"});
when(configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +"id1")).thenReturn("127.0.0.1:21000");
when(configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +"id2")).thenReturn("127.0.0.1:31000");
List<String> serverInstances = HAConfiguration.getServerInstances(configuration);
assertEquals(serverInstances.size(), 2);
assertTrue(serverInstances.contains("http://127.0.0.1:21000"));
assertTrue(serverInstances.contains("http://127.0.0.1:31000"));
}
}
...@@ -13,6 +13,7 @@ ATLAS-409 Atlas will not import avro tables with schema read from a file (dosset ...@@ -13,6 +13,7 @@ ATLAS-409 Atlas will not import avro tables with schema read from a file (dosset
ATLAS-379 Create sqoop and falcon metadata addons (venkatnrangan,bvellanki,sowmyaramesh via shwethags) ATLAS-379 Create sqoop and falcon metadata addons (venkatnrangan,bvellanki,sowmyaramesh via shwethags)
ALL CHANGES: ALL CHANGES:
ATLAS-571 Modify Atlas client for necessary changes in context of HA (yhemanth via sumasai)
ATLAS-620 Disable hbase based entity audit (shwethags) ATLAS-620 Disable hbase based entity audit (shwethags)
ATLAS-618 Fix assembly for hdfs-module (sumasai via yhemanth) ATLAS-618 Fix assembly for hdfs-module (sumasai via yhemanth)
ATLAS-573 Inherited attributes disappear from entities after server restart (dkantor via sumasai) ATLAS-573 Inherited attributes disappear from entities after server restart (dkantor via sumasai)
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.ha;
import org.apache.atlas.AtlasConstants;
import org.apache.atlas.AtlasException;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.net.NetUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
public class AtlasServerIdSelector {
private static final Logger LOG = LoggerFactory.getLogger(AtlasServerIdSelector.class);
/**
* Return the ID corresponding to this Atlas instance.
*
* The match is done by looking for an ID configured in {@link HAConfiguration#ATLAS_SERVER_IDS} key
* that has a host:port entry for the key {@link HAConfiguration#ATLAS_SERVER_ADDRESS_PREFIX}+ID where
* the host is a local IP address and port is set in the system property
* {@link AtlasConstants#SYSTEM_PROPERTY_APP_PORT}.
*
* @param configuration
* @return
* @throws AtlasException if no ID is found that maps to a local IP Address or port
*/
public static String selectServerId(Configuration configuration) throws AtlasException {
// ids are already trimmed by this method
String[] ids = configuration.getStringArray(HAConfiguration.ATLAS_SERVER_IDS);
String matchingServerId = null;
int appPort = Integer.parseInt(System.getProperty(AtlasConstants.SYSTEM_PROPERTY_APP_PORT));
for (String id : ids) {
String hostPort = configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +id);
if (!StringUtils.isEmpty(hostPort)) {
InetSocketAddress socketAddress;
try {
socketAddress = NetUtils.createSocketAddr(hostPort);
} catch (Exception e) {
LOG.warn("Exception while trying to get socket address for " + hostPort, e);
continue;
}
if (!socketAddress.isUnresolved()
&& NetUtils.isLocalAddress(socketAddress.getAddress())
&& appPort == socketAddress.getPort()) {
LOG.info("Found matched server id " + id + " with host port: " + hostPort);
matchingServerId = id;
break;
}
} else {
LOG.info("Could not find matching address entry for id: " + id);
}
}
if (matchingServerId == null) {
String msg = String.format("Could not find server id for this instance. " +
"Unable to find IDs matching any local host and port binding among %s",
StringUtils.join(ids, ","));
throw new AtlasException(msg);
}
return matchingServerId;
}
}
...@@ -20,7 +20,6 @@ package org.apache.atlas.ha; ...@@ -20,7 +20,6 @@ package org.apache.atlas.ha;
import org.apache.atlas.AtlasConstants; import org.apache.atlas.AtlasConstants;
import org.apache.atlas.AtlasException; import org.apache.atlas.AtlasException;
import org.apache.atlas.security.SecurityProperties;
import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.Configuration;
import org.mockito.Mock; import org.mockito.Mock;
import org.mockito.MockitoAnnotations; import org.mockito.MockitoAnnotations;
...@@ -31,8 +30,7 @@ import static org.mockito.Mockito.when; ...@@ -31,8 +30,7 @@ import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertEquals;
import static org.testng.Assert.fail; import static org.testng.Assert.fail;
public class HAConfigurationTest { public class AtlasServerIdSelectorTest {
@Mock @Mock
private Configuration configuration; private Configuration configuration;
...@@ -48,14 +46,14 @@ public class HAConfigurationTest { ...@@ -48,14 +46,14 @@ public class HAConfigurationTest {
when(configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +"id1")).thenReturn("127.0.0.1:31000"); when(configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +"id1")).thenReturn("127.0.0.1:31000");
when(configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +"id2")).thenReturn("127.0.0.1:21000"); when(configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +"id2")).thenReturn("127.0.0.1:21000");
String atlasServerId = HAConfiguration.getAtlasServerId(configuration); String atlasServerId = AtlasServerIdSelector.selectServerId(configuration);
assertEquals(atlasServerId, "id2"); assertEquals(atlasServerId, "id2");
} }
@Test(expectedExceptions = AtlasException.class) @Test(expectedExceptions = AtlasException.class)
public void testShouldFailIfNoIDsConfiguration() throws AtlasException { public void testShouldFailIfNoIDsConfiguration() throws AtlasException {
when(configuration.getStringArray(HAConfiguration.ATLAS_SERVER_IDS)).thenReturn(new String[] {}); when(configuration.getStringArray(HAConfiguration.ATLAS_SERVER_IDS)).thenReturn(new String[] {});
HAConfiguration.getAtlasServerId(configuration); AtlasServerIdSelector.selectServerId(configuration);
fail("Should not return any server id if IDs not found in configuration"); fail("Should not return any server id if IDs not found in configuration");
} }
...@@ -64,27 +62,7 @@ public class HAConfigurationTest { ...@@ -64,27 +62,7 @@ public class HAConfigurationTest {
when(configuration.getStringArray(HAConfiguration.ATLAS_SERVER_IDS)).thenReturn(new String[] {"id1", "id2"}); when(configuration.getStringArray(HAConfiguration.ATLAS_SERVER_IDS)).thenReturn(new String[] {"id1", "id2"});
when(configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +"id1")).thenReturn("127.0.0.1:31000"); when(configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +"id1")).thenReturn("127.0.0.1:31000");
HAConfiguration.getAtlasServerId(configuration); AtlasServerIdSelector.selectServerId(configuration);
fail("Should not return any server id if no matching address found for any ID"); fail("Should not return any server id if no matching address found for any ID");
} }
@Test
public void testShouldReturnHTTPBoundAddress() {
when(configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +"id1")).thenReturn("127.0.0.1:21000");
when(configuration.getBoolean(SecurityProperties.TLS_ENABLED)).thenReturn(false);
String address = HAConfiguration.getBoundAddressForId(configuration, "id1");
assertEquals(address, "http://127.0.0.1:21000");
}
@Test
public void testShouldReturnHTTPSBoundAddress() {
when(configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +"id1")).thenReturn("127.0.0.1:21443");
when(configuration.getBoolean(SecurityProperties.TLS_ENABLED)).thenReturn(true);
String address = HAConfiguration.getBoundAddressForId(configuration, "id1");
assertEquals(address, "https://127.0.0.1:21443");
}
} }
...@@ -113,7 +113,8 @@ public class QuickStart { ...@@ -113,7 +113,8 @@ public class QuickStart {
private final AtlasClient metadataServiceClient; private final AtlasClient metadataServiceClient;
QuickStart(String baseUrl) { QuickStart(String baseUrl) {
metadataServiceClient = new AtlasClient(baseUrl); String[] urls = baseUrl.split(",");
metadataServiceClient = new AtlasClient(null, null, urls);
} }
void createTypes() throws Exception { void createTypes() throws Exception {
......
...@@ -23,6 +23,7 @@ import com.google.inject.Provider; ...@@ -23,6 +23,7 @@ import com.google.inject.Provider;
import com.google.inject.Singleton; import com.google.inject.Singleton;
import org.apache.atlas.ApplicationProperties; import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasException; import org.apache.atlas.AtlasException;
import org.apache.atlas.ha.AtlasServerIdSelector;
import org.apache.atlas.ha.HAConfiguration; import org.apache.atlas.ha.HAConfiguration;
import org.apache.atlas.listener.ActiveStateChangeHandler; import org.apache.atlas.listener.ActiveStateChangeHandler;
import org.apache.atlas.service.Service; import org.apache.atlas.service.Service;
...@@ -101,7 +102,7 @@ public class ActiveInstanceElectorService implements Service, LeaderLatchListene ...@@ -101,7 +102,7 @@ public class ActiveInstanceElectorService implements Service, LeaderLatchListene
return; return;
} }
cacheActiveStateChangeHandlers(); cacheActiveStateChangeHandlers();
serverId = HAConfiguration.getAtlasServerId(configuration); serverId = AtlasServerIdSelector.selectServerId(configuration);
joinElection(); joinElection();
} }
......
...@@ -85,7 +85,7 @@ public class CuratorFactory { ...@@ -85,7 +85,7 @@ public class CuratorFactory {
* Create a new instance {@link LeaderLatch} * Create a new instance {@link LeaderLatch}
* @param serverId the ID used to register this instance with curator. * @param serverId the ID used to register this instance with curator.
* This ID should typically be obtained using * This ID should typically be obtained using
* {@link HAConfiguration#getAtlasServerId(Configuration)} * {@link org.apache.atlas.ha.AtlasServerIdSelector#selectServerId(Configuration)}
* @return * @return
*/ */
public LeaderLatch leaderLatchInstance(String serverId) { public LeaderLatch leaderLatchInstance(String serverId) {
......
...@@ -86,7 +86,7 @@ public class NegativeSSLAndKerberosTest extends BaseSSLAndKerberosTest { ...@@ -86,7 +86,7 @@ public class NegativeSSLAndKerberosTest extends BaseSSLAndKerberosTest {
dgiClient = new AtlasClient(DGI_URL) { dgiClient = new AtlasClient(DGI_URL) {
@Override @Override
protected PropertiesConfiguration getClientProperties() throws AtlasException { protected PropertiesConfiguration getClientProperties() {
return configuration; return configuration;
} }
}; };
......
...@@ -102,7 +102,7 @@ public class SSLAndKerberosTest extends BaseSSLAndKerberosTest { ...@@ -102,7 +102,7 @@ public class SSLAndKerberosTest extends BaseSSLAndKerberosTest {
public AtlasClient run() throws Exception { public AtlasClient run() throws Exception {
return new AtlasClient(DGI_URL) { return new AtlasClient(DGI_URL) {
@Override @Override
protected PropertiesConfiguration getClientProperties() throws AtlasException { protected PropertiesConfiguration getClientProperties() {
return configuration; return configuration;
} }
}; };
......
...@@ -78,7 +78,7 @@ public class SSLTest extends BaseSSLAndKerberosTest { ...@@ -78,7 +78,7 @@ public class SSLTest extends BaseSSLAndKerberosTest {
dgiCLient = new AtlasClient(DGI_URL) { dgiCLient = new AtlasClient(DGI_URL) {
@Override @Override
protected PropertiesConfiguration getClientProperties() throws AtlasException { protected PropertiesConfiguration getClientProperties() {
return configuration; return configuration;
} }
}; };
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment