Commit 1e3029bc by Shwetha GS

ATLAS-585 NotificationHookConsumer creates new AtlasClient for every message (shwethags)

parent 334429a8
......@@ -46,5 +46,9 @@ test-output
.DS_Store
*.swp
#atlas data directory creates when tests are run from IDE
**/atlas.data/**
**/${sys:atlas.data}/**
#hbase package downloaded
distro/hbase/*.tar.gz
\ No newline at end of file
distro/hbase/*.tar.gz
......@@ -152,11 +152,6 @@
</artifactItem>
<artifactItem>
<groupId>${project.groupId}</groupId>
<artifactId>atlas-server-api</artifactId>
<version>${project.version}</version>
</artifactItem>
<artifactItem>
<groupId>${project.groupId}</groupId>
<artifactId>hdfs-model</artifactId>
<version>${project.version}</version>
</artifactItem>
......
......@@ -239,11 +239,6 @@
<version>${project.version}</version>
</artifactItem>
<artifactItem>
<groupId>${project.groupId}</groupId>
<artifactId>atlas-server-api</artifactId>
<version>${project.version}</version>
</artifactItem>
<artifactItem>
<groupId>org.scala-lang</groupId>
<artifactId>scala-compiler</artifactId>
<version>${scala.version}</version>
......
......@@ -43,7 +43,6 @@ import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.security.UserGroupInformation;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
......@@ -181,10 +180,10 @@ public class HiveMetaStoreBridge {
String entityJSON = InstanceSerialization.toJson(referenceable, true);
LOG.debug("Submitting new entity {} = {}", referenceable.getTypeName(), entityJSON);
JSONArray guids = getAtlasClient().createEntity(entityJSON);
List<String> guids = getAtlasClient().createEntity(entityJSON);
LOG.debug("created instance for type " + typeName + ", guid: " + guids);
return new Referenceable(guids.getString(0), referenceable.getTypeName(), null);
return new Referenceable(guids.get(0), referenceable.getTypeName(), null);
}
/**
......@@ -536,8 +535,7 @@ public class HiveMetaStoreBridge {
public static void main(String[] argv) throws Exception {
Configuration atlasConf = ApplicationProperties.get();
String atlasEndpoint = atlasConf.getString(ATLAS_ENDPOINT, DEFAULT_DGI_URL);
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
AtlasClient atlasClient = new AtlasClient(atlasEndpoint, ugi, ugi.getShortUserName());
AtlasClient atlasClient = new AtlasClient(atlasEndpoint);
HiveMetaStoreBridge hiveMetaStoreBridge = new HiveMetaStoreBridge(new HiveConf(), atlasClient);
hiveMetaStoreBridge.registerHiveDataModel();
......
......@@ -20,7 +20,6 @@ package org.apache.atlas.hive.hook;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.sun.jersey.api.client.ClientResponse;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient;
......@@ -31,12 +30,7 @@ import org.apache.atlas.hive.model.HiveDataModelGenerator;
import org.apache.atlas.hive.model.HiveDataTypes;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.Struct;
import org.apache.atlas.typesystem.json.InstanceSerialization;
import org.apache.atlas.typesystem.json.TypesSerialization$;
import org.apache.atlas.typesystem.persistence.Id;
import org.apache.atlas.typesystem.types.HierarchicalTypeDefinition;
import org.apache.atlas.typesystem.types.TraitType;
import org.apache.atlas.typesystem.types.utils.TypesUtil;
import org.apache.atlas.typesystem.types.TypeSystem;
import org.apache.atlas.utils.ParamChecker;
import org.apache.commons.configuration.Configuration;
......@@ -51,7 +45,6 @@ import org.apache.hadoop.hive.ql.hooks.Entity;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
......@@ -59,9 +52,6 @@ import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import javax.ws.rs.HttpMethod;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.io.File;
import java.text.ParseException;
import java.util.Date;
......@@ -737,8 +727,6 @@ public class HiveHookIT {
columns = getColumns(DEFAULT_DB, tableName);
Assert.assertEquals(columns.size(), 2);
assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(
HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), oldColName));
String newColQualifiedName = HiveMetaStoreBridge.getColumnQualifiedName(
HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), newColName);
......@@ -749,6 +737,9 @@ public class HiveHookIT {
}
});
assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(
HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), oldColName));
//Change name and add comment
oldColName = "name2";
newColName = "name3";
......
......@@ -234,11 +234,6 @@
<version>${project.version}</version>
</artifactItem>
<artifactItem>
<groupId>${project.groupId}</groupId>
<artifactId>atlas-server-api</artifactId>
<version>${project.version}</version>
</artifactItem>
<artifactItem>
<groupId>org.scala-lang</groupId>
<artifactId>scala-compiler</artifactId>
<version>${scala.version}</version>
......
......@@ -190,11 +190,6 @@
</artifactItem>
<artifactItem>
<groupId>${project.groupId}</groupId>
<artifactId>atlas-server-api</artifactId>
<version>${project.version}</version>
</artifactItem>
<artifactItem>
<groupId>${project.groupId}</groupId>
<artifactId>hdfs-model</artifactId>
<version>${project.version}</version>
</artifactItem>
......
......@@ -60,7 +60,7 @@ public class AtlasAdminClient {
Configuration configuration = ApplicationProperties.get();
String atlasServerUri = configuration.getString(
AtlasConstants.ATLAS_REST_ADDRESS_KEY, AtlasConstants.DEFAULT_ATLAS_REST_ADDRESS);
AtlasClient atlasClient = new AtlasClient(atlasServerUri, null, null);
AtlasClient atlasClient = new AtlasClient(atlasServerUri);
return handleCommand(commandLine, atlasServerUri, atlasClient);
}
......
......@@ -32,7 +32,6 @@ import org.apache.atlas.typesystem.Struct;
import org.apache.atlas.typesystem.TypesDef;
import org.apache.atlas.typesystem.json.InstanceSerialization;
import org.apache.atlas.typesystem.json.TypesSerialization;
import org.apache.atlas.typesystem.json.TypesSerialization$;
import org.apache.atlas.typesystem.types.AttributeDefinition;
import org.apache.atlas.typesystem.types.HierarchicalTypeDefinition;
import org.apache.atlas.typesystem.types.TraitType;
......@@ -78,6 +77,7 @@ public class AtlasClient {
public static final String COUNT = "count";
public static final String ROWS = "rows";
public static final String DATATYPE = "dataType";
public static final String STATUS = "Status";
public static final String EVENTS = "events";
public static final String START_KEY = "startKey";
......@@ -115,6 +115,9 @@ public class AtlasClient {
// Setting the default value based on testing failovers while client code like quickstart is running.
public static final int DEFAULT_NUM_RETRIES = 4;
public static final String ATLAS_CLIENT_HA_SLEEP_INTERVAL_MS_KEY = "atlas.client.ha.sleep.interval.ms";
public static final String HTTP_AUTHENTICATION_ENABLED = "atlas.http.authentication.enabled";
// Setting the default value based on testing failovers while client code like quickstart is running.
// With number of retries, this gives a total time of about 20s for the server to start.
public static final int DEFAULT_SLEEP_BETWEEN_RETRIES_MS = 5000;
......@@ -124,28 +127,20 @@ public class AtlasClient {
private Configuration configuration;
/**
* Create a new AtlasClient.
*
* @param baseUrl The URL of the Atlas server to connect to.
*/
public AtlasClient(String baseUrl) {
this(baseUrl, null, null);
}
/**
* Create a new Atlas Client.
* @param baseUrl The URL of the Atlas server to connect to.
* @param ugi The {@link UserGroupInformation} of logged in user.
* @param doAsUser The user on whose behalf queries will be executed.
* Create a new Atlas client.
* @param baseUrls A list of URLs that point to an ensemble of Atlas servers working in
* High Availability mode. The client will automatically determine the
* active instance on startup and also when there is a scenario of
* failover.
*/
public AtlasClient(String baseUrl, UserGroupInformation ugi, String doAsUser) {
initializeState(new String[] {baseUrl}, ugi, doAsUser);
public AtlasClient(String... baseUrls) throws AtlasException {
this(getCurrentUGI(), baseUrls);
}
/**
* Create a new Atlas client.
* @param ugi The {@link UserGroupInformation} of logged in user, can be null in unsecure mode.
* @param doAsUser The user on whose behalf queries will be executed, can be null in unsecure mode.
* @param ugi UserGroupInformation
* @param doAsUser
* @param baseUrls A list of URLs that point to an ensemble of Atlas servers working in
* High Availability mode. The client will automatically determine the
* active instance on startup and also when there is a scenario of
......@@ -155,6 +150,23 @@ public class AtlasClient {
initializeState(baseUrls, ugi, doAsUser);
}
private static UserGroupInformation getCurrentUGI() throws AtlasException {
try {
return UserGroupInformation.getCurrentUser();
} catch (IOException e) {
throw new AtlasException(e);
}
}
private AtlasClient(UserGroupInformation ugi, String[] baseUrls) {
this(ugi, ugi.getShortUserName(), baseUrls);
}
//Used by LocalAtlasClient
protected AtlasClient() {
//Do nothing
}
private void initializeState(String[] baseUrls, UserGroupInformation ugi, String doAsUser) {
configuration = getClientProperties();
Client client = getClient(configuration, ugi, doAsUser);
......@@ -340,7 +352,7 @@ public class AtlasClient {
WebResource resource = getResource(service, API.STATUS);
JSONObject response = callAPIWithResource(API.STATUS, resource, null);
try {
result = response.getString("Status");
result = response.getString(STATUS);
} catch (JSONException e) {
LOG.error("Exception while parsing admin status response. Returned response {}", response.toString(), e);
}
......@@ -418,12 +430,14 @@ public class AtlasClient {
public List<String> createType(String typeAsJson) throws AtlasServiceException {
LOG.debug("Creating type definition: {}", typeAsJson);
JSONObject response = callAPI(API.CREATE_TYPE, typeAsJson);
return extractResults(response, AtlasClient.TYPES, new ExtractOperation<String, JSONObject>() {
List<String> results = extractResults(response, AtlasClient.TYPES, new ExtractOperation<String, JSONObject>() {
@Override
String extractElement(JSONObject element) throws JSONException {
return element.getString(AtlasClient.NAME);
}
});
LOG.debug("Create type definition returned results: {}", results);
return results;
}
/**
......@@ -470,14 +484,16 @@ public class AtlasClient {
* @throws AtlasServiceException
*/
public List<String> updateType(String typeAsJson) throws AtlasServiceException {
LOG.debug("Updating tyep definition: {}", typeAsJson);
LOG.debug("Updating type definition: {}", typeAsJson);
JSONObject response = callAPI(API.UPDATE_TYPE, typeAsJson);
return extractResults(response, AtlasClient.TYPES, new ExtractOperation<String, JSONObject>() {
List<String> results = extractResults(response, AtlasClient.TYPES, new ExtractOperation<String, JSONObject>() {
@Override
String extractElement(JSONObject element) throws JSONException {
return element.getString(AtlasClient.NAME);
}
});
LOG.debug("Update type definition returned results: {}", results);
return results;
}
/**
......@@ -495,10 +511,11 @@ public class AtlasClient {
return extractResults(jsonObject, AtlasClient.RESULTS, new ExtractOperation<String, String>());
}
public String getType(String typeName) throws AtlasServiceException {
public TypesDef getType(String typeName) throws AtlasServiceException {
try {
JSONObject response = callAPI(API.GET_TYPE, null, typeName);;
return response.getString(DEFINITION);
String typeJson = response.getString(DEFINITION);
return TypesSerialization.fromJson(typeJson);
} catch (AtlasServiceException e) {
if (Response.Status.NOT_FOUND.equals(e.getStatus())) {
return null;
......@@ -515,14 +532,12 @@ public class AtlasClient {
* @return json array of guids
* @throws AtlasServiceException
*/
public JSONArray createEntity(JSONArray entities) throws AtlasServiceException {
protected List<String> createEntity(JSONArray entities) throws AtlasServiceException {
LOG.debug("Creating entities: {}", entities);
JSONObject response = callAPI(API.CREATE_ENTITY, entities.toString());
try {
return response.getJSONArray(GUID);
} catch (JSONException e) {
throw new AtlasServiceException(API.GET_ENTITY, e);
}
List<String> results = extractResults(response, GUID, new ExtractOperation<String, String>());
LOG.debug("Create entities returned results: {}", results);
return results;
}
/**
......@@ -531,15 +546,15 @@ public class AtlasClient {
* @return json array of guids
* @throws AtlasServiceException
*/
public JSONArray createEntity(String... entitiesAsJson) throws AtlasServiceException {
public List<String> createEntity(String... entitiesAsJson) throws AtlasServiceException {
return createEntity(new JSONArray(Arrays.asList(entitiesAsJson)));
}
public JSONArray createEntity(Referenceable... entities) throws AtlasServiceException {
public List<String> createEntity(Referenceable... entities) throws AtlasServiceException {
return createEntity(Arrays.asList(entities));
}
public JSONArray createEntity(Collection<Referenceable> entities) throws AtlasServiceException {
public List<String> createEntity(Collection<Referenceable> entities) throws AtlasServiceException {
JSONArray entityArray = getEntitiesArray(entities);
return createEntity(entityArray);
}
......@@ -559,19 +574,21 @@ public class AtlasClient {
* @return json array of guids which were updated/created
* @throws AtlasServiceException
*/
public JSONArray updateEntities(Referenceable... entities) throws AtlasServiceException {
public List<String> updateEntities(Referenceable... entities) throws AtlasServiceException {
return updateEntities(Arrays.asList(entities));
}
public JSONArray updateEntities(Collection<Referenceable> entities) throws AtlasServiceException {
protected List<String> updateEntities(JSONArray entities) throws AtlasServiceException {
LOG.debug("Updating entities: {}", entities);
JSONObject response = callAPI(API.UPDATE_ENTITY, entities.toString());
List<String> results = extractResults(response, GUID, new ExtractOperation<String, String>());
LOG.debug("Update entities returned results: {}", results);
return results;
}
public List<String> updateEntities(Collection<Referenceable> entities) throws AtlasServiceException {
JSONArray entitiesArray = getEntitiesArray(entities);
LOG.debug("Updating entities: {}", entitiesArray);
JSONObject response = callAPI(API.UPDATE_ENTITY, entitiesArray.toString());
try {
return response.getJSONArray(GUID);
} catch (JSONException e) {
throw new AtlasServiceException(API.UPDATE_ENTITY, e);
}
return updateEntities(entitiesArray);
}
/**
......@@ -651,6 +668,8 @@ public class AtlasClient {
Referenceable entity) throws AtlasServiceException {
final API api = API.UPDATE_ENTITY_PARTIAL;
String entityJson = InstanceSerialization.toJson(entity, true);
LOG.debug("Updating entity type: {}, attributeName: {}, attributeValue: {}, entity: {}", entityType,
uniqueAttributeName, uniqueAttributeValue, entityJson);
JSONObject response = callAPIWithRetries(api, entityJson, new ResourceCreator() {
@Override
public WebResource createResource() {
......@@ -661,10 +680,16 @@ public class AtlasClient {
return resource;
}
});
String result = getString(response, GUID);
LOG.debug("Update entity returned result: {}", result);
return result;
}
protected String getString(JSONObject jsonObject, String parameter) throws AtlasServiceException {
try {
return response.getString(GUID);
return jsonObject.getString(parameter);
} catch (JSONException e) {
throw new AtlasServiceException(api, e);
throw new AtlasServiceException(e);
}
}
......@@ -676,6 +701,7 @@ public class AtlasClient {
* @throws AtlasServiceException
*/
public List<String> deleteEntities(final String ... guids) throws AtlasServiceException {
LOG.debug("Deleting entities: {}", guids);
JSONObject jsonResponse = callAPIWithRetries(API.DELETE_ENTITIES, null, new ResourceCreator() {
@Override
public WebResource createResource() {
......@@ -687,7 +713,9 @@ public class AtlasClient {
return resource;
}
});
return extractResults(jsonResponse, GUID, new ExtractOperation<String, String>());
List<String> results = extractResults(jsonResponse, GUID, new ExtractOperation<String, String>());
LOG.debug("Delete entities returned results: {}", results);
return results;
}
/**
......@@ -699,13 +727,17 @@ public class AtlasClient {
*/
public List<String> deleteEntity(String entityType, String uniqueAttributeName, String uniqueAttributeValue)
throws AtlasServiceException {
LOG.debug("Deleting entity type: {}, attributeName: {}, attributeValue: {}", entityType, uniqueAttributeName,
uniqueAttributeValue);
API api = API.DELETE_ENTITY;
WebResource resource = getResource(api);
resource = resource.queryParam(TYPE, entityType);
resource = resource.queryParam(ATTRIBUTE_NAME, uniqueAttributeName);
resource = resource.queryParam(ATTRIBUTE_VALUE, uniqueAttributeValue);
JSONObject jsonResponse = callAPIWithResource(API.DELETE_ENTITIES, resource, null);
return extractResults(jsonResponse, GUID, new ExtractOperation<String, String>());
List<String> results = extractResults(jsonResponse, GUID, new ExtractOperation<String, String>());
LOG.debug("Delete entities returned results: {}", results);
return results;
}
/**
......@@ -789,13 +821,13 @@ public class AtlasClient {
return extractResults(jsonResponse, AtlasClient.RESULTS, new ExtractOperation<String, String>());
}
private class ExtractOperation<T, U> {
protected class ExtractOperation<T, U> {
T extractElement(U element) throws JSONException {
return (T) element;
}
}
private <T, U> List<T> extractResults(JSONObject jsonResponse, String key, ExtractOperation<T, U> extractInterafce)
protected <T, U> List<T> extractResults(JSONObject jsonResponse, String key, ExtractOperation<T, U> extractInterafce)
throws AtlasServiceException {
try {
JSONArray results = jsonResponse.getJSONArray(key);
......@@ -1011,22 +1043,12 @@ public class AtlasClient {
private class AtlasClientContext {
private String[] baseUrls;
private Client client;
private final UserGroupInformation ugi;
private final String doAsUser;
private String doAsUser;
private UserGroupInformation ugi;
public AtlasClientContext(String[] baseUrls, Client client, UserGroupInformation ugi, String doAsUser) {
this.baseUrls = baseUrls;
this.client = client;
this.ugi = ugi;
this.doAsUser = doAsUser;
}
public UserGroupInformation getUgi() {
return ugi;
}
public String getDoAsUser() {
return doAsUser;
}
public Client getClient() {
......@@ -1036,6 +1058,14 @@ public class AtlasClient {
public String[] getBaseUrls() {
return baseUrls;
}
public String getDoAsUser() {
return doAsUser;
}
public UserGroupInformation getUgi() {
return ugi;
}
}
}
......@@ -19,6 +19,10 @@
package org.apache.atlas;
import com.sun.jersey.api.client.ClientResponse;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import javax.ws.rs.WebApplicationException;
public class AtlasServiceException extends Exception {
private ClientResponse.Status status;
......@@ -27,12 +31,19 @@ public class AtlasServiceException extends Exception {
super("Metadata service API " + api + " failed", e);
}
public AtlasServiceException(AtlasClient.API api, WebApplicationException e) throws JSONException {
this(api, ClientResponse.Status.fromStatusCode(e.getResponse().getStatus()),
((JSONObject) e.getResponse().getEntity()).getString("stackTrace"));
}
private AtlasServiceException(AtlasClient.API api, ClientResponse.Status status, String response) {
super("Metadata service API " + api + " failed with status " + status.getStatusCode() + "(" +
status.getReasonPhrase() + ") Response Body (" + response + ")");
this.status = status;
}
public AtlasServiceException(AtlasClient.API api, ClientResponse response) {
super("Metadata service API " + api + " failed with status " +
response.getClientResponseStatus().getStatusCode() + "(" +
response.getClientResponseStatus().getReasonPhrase() + ") Response Body (" +
response.getEntity(String.class) + ")");
this.status = response.getClientResponseStatus();
this(api, ClientResponse.Status.fromStatusCode(response.getStatus()), response.getEntity(String.class));
}
public AtlasServiceException(Exception e) {
......
......@@ -20,6 +20,7 @@ import com.sun.jersey.api.client.config.DefaultClientConfig;
import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory;
import com.sun.jersey.client.urlconnection.URLConnectionClientHandler;
import org.apache.atlas.AtlasException;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.alias.CredentialProviderFactory;
......@@ -60,7 +61,7 @@ public class SecureClientUtils {
public static URLConnectionClientHandler getClientConnectionHandler(DefaultClientConfig config,
org.apache.commons.configuration.Configuration clientConfig, final String doAsUser,
org.apache.commons.configuration.Configuration clientConfig, String doAsUser,
final UserGroupInformation ugi) {
config.getProperties().put(URLConnectionClientHandler.PROPERTY_HTTP_URL_CONNECTION_SET_METHOD_WORKAROUND, true);
Configuration conf = new Configuration();
......@@ -80,17 +81,16 @@ public class SecureClientUtils {
final DelegationTokenAuthenticatedURL.Token token = new DelegationTokenAuthenticatedURL.Token();
HttpURLConnectionFactory httpURLConnectionFactory = null;
try {
UserGroupInformation ugiToUse = ugi != null ?
ugi : UserGroupInformation.getCurrentUser();
UserGroupInformation ugiToUse = ugi != null ? ugi : UserGroupInformation.getCurrentUser();
final UserGroupInformation actualUgi =
(ugiToUse.getAuthenticationMethod() ==
UserGroupInformation.AuthenticationMethod.PROXY)
? ugiToUse.getRealUser()
: ugiToUse;
LOG.info("Real User: {}, is from ticket cache? {}",
actualUgi,
actualUgi.isLoginTicketBased());
(ugiToUse.getAuthenticationMethod() == UserGroupInformation.AuthenticationMethod.PROXY)
? ugiToUse.getRealUser() : ugiToUse;
LOG.info("Real User: {}, is from ticket cache? {}", actualUgi, actualUgi.isLoginTicketBased());
if (StringUtils.isEmpty(doAsUser)) {
doAsUser = actualUgi.getShortUserName();
}
LOG.info("doAsUser: {}", doAsUser);
final String finalDoAsUser = doAsUser;
httpURLConnectionFactory = new HttpURLConnectionFactory() {
@Override
public HttpURLConnection getHttpURLConnection(final URL url) throws IOException {
......@@ -99,9 +99,8 @@ public class SecureClientUtils {
@Override
public HttpURLConnection run() throws Exception {
try {
return new DelegationTokenAuthenticatedURL(
finalAuthenticator, connConfigurator)
.openConnection(url, token, doAsUser);
return new DelegationTokenAuthenticatedURL(finalAuthenticator, connConfigurator)
.openConnection(url, token, finalDoAsUser);
} catch (Exception e) {
throw new IOException(e);
}
......
......@@ -30,14 +30,12 @@ import org.testng.annotations.Test;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriBuilder;
import java.net.ConnectException;
import java.net.URI;
import java.net.URISyntaxException;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
......@@ -326,6 +324,7 @@ public class AtlasClientTest {
thenReturn(response);
when(resourceCreator.createResource()).thenReturn(resourceObject);
when(configuration.getString("atlas.http.authentication.type", "simple")).thenReturn("simple");
AtlasClient atlasClient = getClientForTest("http://localhost:31000");
......
......@@ -61,7 +61,7 @@
</logger>
<root>
<priority value="info"/>
<priority value="warn"/>
<appender-ref ref="FILE"/>
</root>
......
......@@ -1371,7 +1371,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-war-plugin</artifactId>
<version>2.4</version>
<version>2.6</version>
</plugin>
<plugin>
......@@ -1657,15 +1657,17 @@
<exclude>**/overlays/**</exclude>
<exclude>dev-support/**</exclude>
<exclude>**/users-credentials.properties</exclude>
<exclude>**/public/css/animate.min.css</exclude>
<exclude>**/public/css/fonts/**</exclude>
<exclude>**/public/css/font-awesome.min.css</exclude>
<exclude>**/public/js/require-handlebars-plugin/**</exclude>
<exclude>**/node_modules/**</exclude>
<!-- All the npm plugins are copied here, so exclude it -->
<exclude>**/public/js/libs/**</exclude>
<exclude>**/public/css/animate.min.css</exclude>
<exclude>**/public/css/fonts/**</exclude>
<exclude>**/public/css/font-awesome.min.css</exclude>
<exclude>**/public/js/require-handlebars-plugin/**</exclude>
<exclude>**/node_modules/**</exclude>
<!-- All the npm plugins are copied here, so exclude it -->
<exclude>**/public/js/libs/**</exclude>
<!-- atlas data directory creates when tests are run from IDE -->
<exclude>**/atlas.data/**</exclude>
<exclude>**/${sys:atlas.data}/**</exclude>
</excludes>
</configuration>
<executions>
......
......@@ -18,6 +18,7 @@ ATLAS-409 Atlas will not import avro tables with schema read from a file (dosset
ATLAS-379 Create sqoop and falcon metadata addons (venkatnrangan,bvellanki,sowmyaramesh via shwethags)
ALL CHANGES:
ATLAS-585 NotificationHookConsumer creates new AtlasClient for every message (shwethags)
ATLAS-682 Set HBase root dir to be relative to test target directory for HBaseBasedAuditRepositoryTest (shwethags via yhemanth)
ATLAS-742 Avoid downloading hbase multiple times (shwethags via yhemanth)
ATLAS-659 atlas_start fails on Windows (dkantor via shwethags)
......
......@@ -106,6 +106,12 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang
// create a composite index for entity state
createCompositeAndMixedIndex(management, Constants.STATE_PROPERTY_KEY, String.class, false, Cardinality.SINGLE, true);
// create a composite index for entity state
createCompositeAndMixedIndex(management, Constants.TIMESTAMP_PROPERTY_KEY, Long.class, false, Cardinality.SINGLE, true);
// create a composite index for entity state
createCompositeAndMixedIndex(management, Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY, Long.class, false, Cardinality.SINGLE, true);
// create a composite and mixed index for type since it can be combined with other keys
createCompositeAndMixedIndex(management, Constants.ENTITY_TYPE_PROPERTY_KEY, String.class, false, Cardinality.SINGLE,
true);
......
......@@ -301,6 +301,7 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-war-plugin</artifactId>
<configuration>
<archiveClasses>true</archiveClasses>
<attachClasses>true</attachClasses>
<overlays>
<!-- <overlay>
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas;
import com.google.inject.Inject;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.TypesDef;
import org.apache.atlas.typesystem.json.InstanceSerialization;
import org.apache.atlas.web.filters.AuditFilter;
import org.apache.atlas.web.resources.EntityResource;
import org.apache.atlas.web.service.ServiceState;
import org.apache.atlas.web.util.DateTimeHelper;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Response;
import java.util.Date;
import java.util.List;
/**
* Local atlas client which calls the resource methods directly. Used by NotificationHookConsumer.
*/
public class LocalAtlasClient extends AtlasClient {
private static final String LOCALHOST = "localhost";
private static final String CLASS = LocalAtlasClient.class.getSimpleName();
public static final Logger LOG = LoggerFactory.getLogger(LocalAtlasClient.class);
private final EntityResource entityResource;
private final ServiceState serviceState;
@Inject
public LocalAtlasClient(ServiceState serviceState, EntityResource entityResource) {
super();
this.serviceState = serviceState;
this.entityResource = entityResource;
}
private String user;
public void setUser(String user) {
this.user = user;
}
private void setRequestContext() {
RequestContext requestContext = RequestContext.createContext();
requestContext.setUser(user);
}
@Override
public boolean isServerReady() throws AtlasServiceException {
return serviceState.getState() == ServiceState.ServiceStateValue.ACTIVE;
}
@Override
protected List<String> createEntity(final JSONArray entities) throws AtlasServiceException {
LOG.debug("Creating entities: {}", entities);
EntityOperation entityOperation = new EntityOperation(API.CREATE_ENTITY) {
@Override
Response invoke() {
return entityResource.submit(new LocalServletRequest(entities.toString()));
}
};
JSONObject response = entityOperation.run();
List<String> results = extractResults(response, GUID, new ExtractOperation<String, String>());
LOG.debug("Create entities returned results: {}", results);
return results;
}
@Override
protected List<String> updateEntities(final JSONArray entities) throws AtlasServiceException {
LOG.debug("Updating entities: {}", entities);
EntityOperation entityOperation = new EntityOperation(API.UPDATE_ENTITY) {
@Override
Response invoke() {
return entityResource.updateEntities(new LocalServletRequest(entities.toString()));
}
};
JSONObject response = entityOperation.run();
List<String> results = extractResults(response, GUID, new ExtractOperation<String, String>());
LOG.debug("Update entities returned results: {}", results);
return results;
}
private abstract class EntityOperation {
private final API api;
public EntityOperation(API api) {
this.api = api;
}
public JSONObject run() throws AtlasServiceException {
setRequestContext();
AuditFilter.audit(user, CLASS, api.getMethod(), LOCALHOST, api.getPath(), LOCALHOST, DateTimeHelper.formatDateUTC(new Date()));
try {
Response response = invoke();
return (JSONObject) response.getEntity();
} catch(WebApplicationException e) {
try {
throw new AtlasServiceException(api, e);
} catch (JSONException e1) {
throw new AtlasServiceException(e);
}
}
}
abstract Response invoke();
}
@Override
public String updateEntity(final String entityType, final String uniqueAttributeName,
final String uniqueAttributeValue, Referenceable entity) throws AtlasServiceException {
final String entityJson = InstanceSerialization.toJson(entity, true);
LOG.debug("Updating entity type: {}, attributeName: {}, attributeValue: {}, entity: {}", entityType,
uniqueAttributeName, uniqueAttributeValue, entityJson);
EntityOperation entityOperation = new EntityOperation(API.UPDATE_ENTITY_PARTIAL) {
@Override
Response invoke() {
return entityResource.updateByUniqueAttribute(entityType, uniqueAttributeName, uniqueAttributeValue,
new LocalServletRequest(entityJson));
}
};
JSONObject response = entityOperation.run();
String result = getString(response, GUID);
LOG.debug("Update entity returned result: {}", result);
return result;
}
@Override
public List<String> deleteEntity(final String entityType, final String uniqueAttributeName,
final String uniqueAttributeValue) throws AtlasServiceException {
LOG.debug("Deleting entity type: {}, attributeName: {}, attributeValue: {}", entityType, uniqueAttributeName,
uniqueAttributeValue);
EntityOperation entityOperation = new EntityOperation(API.DELETE_ENTITY) {
@Override
Response invoke() {
return entityResource.deleteEntities(null, entityType, uniqueAttributeName, uniqueAttributeValue);
}
};
JSONObject response = entityOperation.run();
List<String> results = extractResults(response, GUID, new ExtractOperation<String, String>());
LOG.debug("Delete entities returned results: {}", results);
return results;
}
@Override
public String getAdminStatus() throws AtlasServiceException {
throw new IllegalStateException("Not supported in LocalAtlasClient");
}
@Override
public List<String> createType(String typeAsJson) throws AtlasServiceException {
throw new IllegalStateException("Not supported in LocalAtlasClient");
}
@Override
public List<String> updateType(String typeAsJson) throws AtlasServiceException {
throw new IllegalStateException("Not supported in LocalAtlasClient");
}
@Override
public List<String> listTypes() throws AtlasServiceException {
throw new IllegalStateException("Not supported in LocalAtlasClient");
}
@Override
public TypesDef getType(String typeName) throws AtlasServiceException {
throw new IllegalStateException("Not supported in LocalAtlasClient");
}
@Override
public void updateEntityAttribute(final String guid, final String attribute, String value) throws AtlasServiceException {
throw new IllegalStateException("Not supported in LocalAtlasClient");
}
@Override
public void updateEntity(String guid, Referenceable entity) throws AtlasServiceException {
throw new IllegalStateException("Not supported in LocalAtlasClient");
}
@Override
public List<String> deleteEntities(final String ... guids) throws AtlasServiceException {
throw new IllegalStateException("Not supported in LocalAtlasClient");
}
@Override
public Referenceable getEntity(String guid) throws AtlasServiceException {
throw new IllegalStateException("Not supported in LocalAtlasClient");
}
@Override
public Referenceable getEntity(final String entityType, final String attribute, final String value)
throws AtlasServiceException {
throw new IllegalStateException("Not supported in LocalAtlasClient");
}
@Override
public List<String> listEntities(final String entityType) throws AtlasServiceException {
throw new IllegalStateException("Not supported in LocalAtlasClient");
}
@Override
public List<EntityAuditEvent> getEntityAuditEvents(String entityId, String startKey, short numResults)
throws AtlasServiceException {
throw new IllegalStateException("Not supported in LocalAtlasClient");
}
@Override
public JSONArray search(final String searchQuery) throws AtlasServiceException {
throw new IllegalStateException("Not supported in LocalAtlasClient");
}
@Override
public JSONArray searchByDSL(final String query) throws AtlasServiceException {
throw new IllegalStateException("Not supported in LocalAtlasClient");
}
@Override
public JSONArray searchByGremlin(final String gremlinQuery) throws AtlasServiceException {
throw new IllegalStateException("Not supported in LocalAtlasClient");
}
@Override
public JSONObject searchByFullText(final String query) throws AtlasServiceException {
throw new IllegalStateException("Not supported in LocalAtlasClient");
}
@Override
public JSONObject getInputGraph(String datasetName) throws AtlasServiceException {
throw new IllegalStateException("Not supported in LocalAtlasClient");
}
@Override
public JSONObject getOutputGraph(String datasetName) throws AtlasServiceException {
throw new IllegalStateException("Not supported in LocalAtlasClient");
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas;
import javax.servlet.AsyncContext;
import javax.servlet.DispatcherType;
import javax.servlet.RequestDispatcher;
import javax.servlet.ServletContext;
import javax.servlet.ServletException;
import javax.servlet.ServletInputStream;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.http.Cookie;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.servlet.http.HttpSession;
import javax.servlet.http.HttpUpgradeHandler;
import javax.servlet.http.Part;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.security.Principal;
import java.util.Collection;
import java.util.Enumeration;
import java.util.Locale;
import java.util.Map;
public class LocalServletRequest implements HttpServletRequest {
private final String payload;
LocalServletRequest(String payload) {
this.payload = payload;
}
public String getPayload() {
return payload;
}
@Override
public String getAuthType() {
throw new IllegalStateException("Not supported");
}
@Override
public Cookie[] getCookies() {
throw new IllegalStateException("Not supported");
}
@Override
public long getDateHeader(String name) {
throw new IllegalStateException("Not supported");
}
@Override
public String getHeader(String name) {
throw new IllegalStateException("Not supported");
}
@Override
public Enumeration<String> getHeaders(String name) {
throw new IllegalStateException("Not supported");
}
@Override
public Enumeration<String> getHeaderNames() {
throw new IllegalStateException("Not supported");
}
@Override
public int getIntHeader(String name) {
throw new IllegalStateException("Not supported");
}
@Override
public String getMethod() {
throw new IllegalStateException("Not supported");
}
@Override
public String getPathInfo() {
throw new IllegalStateException("Not supported");
}
@Override
public String getPathTranslated() {
throw new IllegalStateException("Not supported");
}
@Override
public String getContextPath() {
throw new IllegalStateException("Not supported");
}
@Override
public String getQueryString() {
throw new IllegalStateException("Not supported");
}
@Override
public String getRemoteUser() {
throw new IllegalStateException("Not supported");
}
@Override
public boolean isUserInRole(String role) {
throw new IllegalStateException("Not supported");
}
@Override
public Principal getUserPrincipal() {
throw new IllegalStateException("Not supported");
}
@Override
public String getRequestedSessionId() {
throw new IllegalStateException("Not supported");
}
@Override
public String getRequestURI() {
throw new IllegalStateException("Not supported");
}
@Override
public StringBuffer getRequestURL() {
throw new IllegalStateException("Not supported");
}
@Override
public String getServletPath() {
throw new IllegalStateException("Not supported");
}
@Override
public HttpSession getSession(boolean create) {
throw new IllegalStateException("Not supported");
}
@Override
public HttpSession getSession() {
throw new IllegalStateException("Not supported");
}
@Override
public String changeSessionId() {
throw new IllegalStateException("Not supported");
}
@Override
public boolean isRequestedSessionIdValid() {
throw new IllegalStateException("Not supported");
}
@Override
public boolean isRequestedSessionIdFromCookie() {
throw new IllegalStateException("Not supported");
}
@Override
public boolean isRequestedSessionIdFromURL() {
throw new IllegalStateException("Not supported");
}
@Override
public boolean isRequestedSessionIdFromUrl() {
throw new IllegalStateException("Not supported");
}
@Override
public boolean authenticate(HttpServletResponse response) throws IOException, ServletException {
throw new IllegalStateException("Not supported");
}
@Override
public void login(String username, String password) throws ServletException {
throw new IllegalStateException("Not supported");
}
@Override
public void logout() throws ServletException {
throw new IllegalStateException("Not supported");
}
@Override
public Collection<Part> getParts() throws IOException, ServletException {
throw new IllegalStateException("Not supported");
}
@Override
public Part getPart(String name) throws IOException, ServletException {
throw new IllegalStateException("Not supported");
}
@Override
public <T extends HttpUpgradeHandler> T upgrade(Class<T> handlerClass) throws IOException, ServletException {
throw new IllegalStateException("Not supported");
}
@Override
public Object getAttribute(String name) {
throw new IllegalStateException("Not supported");
}
@Override
public Enumeration<String> getAttributeNames() {
throw new IllegalStateException("Not supported");
}
@Override
public String getCharacterEncoding() {
throw new IllegalStateException("Not supported");
}
@Override
public void setCharacterEncoding(String env) throws UnsupportedEncodingException {
throw new IllegalStateException("Not supported");
}
@Override
public int getContentLength() {
throw new IllegalStateException("Not supported");
}
@Override
public long getContentLengthLong() {
throw new IllegalStateException("Not supported");
}
@Override
public String getContentType() {
throw new IllegalStateException("Not supported");
}
@Override
public ServletInputStream getInputStream() throws IOException {
throw new IllegalStateException("Not supported");
}
@Override
public String getParameter(String name) {
throw new IllegalStateException("Not supported");
}
@Override
public Enumeration<String> getParameterNames() {
throw new IllegalStateException("Not supported");
}
@Override
public String[] getParameterValues(String name) {
throw new IllegalStateException("Not supported");
}
@Override
public Map<String, String[]> getParameterMap() {
throw new IllegalStateException("Not supported");
}
@Override
public String getProtocol() {
throw new IllegalStateException("Not supported");
}
@Override
public String getScheme() {
throw new IllegalStateException("Not supported");
}
@Override
public String getServerName() {
throw new IllegalStateException("Not supported");
}
@Override
public int getServerPort() {
throw new IllegalStateException("Not supported");
}
@Override
public BufferedReader getReader() throws IOException {
throw new IllegalStateException("Not supported");
}
@Override
public String getRemoteAddr() {
throw new IllegalStateException("Not supported");
}
@Override
public String getRemoteHost() {
throw new IllegalStateException("Not supported");
}
@Override
public void setAttribute(String name, Object o) {
throw new IllegalStateException("Not supported");
}
@Override
public void removeAttribute(String name) {
throw new IllegalStateException("Not supported");
}
@Override
public Locale getLocale() {
throw new IllegalStateException("Not supported");
}
@Override
public Enumeration<Locale> getLocales() {
throw new IllegalStateException("Not supported");
}
@Override
public boolean isSecure() {
throw new IllegalStateException("Not supported");
}
@Override
public RequestDispatcher getRequestDispatcher(String path) {
throw new IllegalStateException("Not supported");
}
@Override
public String getRealPath(String path) {
throw new IllegalStateException("Not supported");
}
@Override
public int getRemotePort() {
throw new IllegalStateException("Not supported");
}
@Override
public String getLocalName() {
throw new IllegalStateException("Not supported");
}
@Override
public String getLocalAddr() {
throw new IllegalStateException("Not supported");
}
@Override
public int getLocalPort() {
throw new IllegalStateException("Not supported");
}
@Override
public ServletContext getServletContext() {
throw new IllegalStateException("Not supported");
}
@Override
public AsyncContext startAsync() throws IllegalStateException {
throw new IllegalStateException("Not supported");
}
@Override
public AsyncContext startAsync(ServletRequest servletRequest, ServletResponse servletResponse)
throws IllegalStateException {
throw new IllegalStateException("Not supported");
}
@Override
public boolean isAsyncStarted() {
throw new IllegalStateException("Not supported");
}
@Override
public boolean isAsyncSupported() {
throw new IllegalStateException("Not supported");
}
@Override
public AsyncContext getAsyncContext() {
throw new IllegalStateException("Not supported");
}
@Override
public DispatcherType getDispatcherType() {
throw new IllegalStateException("Not supported");
}
}
......@@ -21,7 +21,6 @@ package org.apache.atlas.examples;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasException;
......@@ -112,9 +111,9 @@ public class QuickStart {
private final AtlasClient metadataServiceClient;
QuickStart(String baseUrl) {
QuickStart(String baseUrl) throws AtlasException {
String[] urls = baseUrl.split(",");
metadataServiceClient = new AtlasClient(null, null, urls);
metadataServiceClient = new AtlasClient(urls);
}
void createTypes() throws Exception {
......@@ -292,11 +291,11 @@ public class QuickStart {
String entityJSON = InstanceSerialization.toJson(referenceable, true);
System.out.println("Submitting new entity= " + entityJSON);
JSONArray guids = metadataServiceClient.createEntity(entityJSON);
List<String> guids = metadataServiceClient.createEntity(entityJSON);
System.out.println("created instance for type " + typeName + ", guid: " + guids);
// return the Id for created instance with guid
return new Id(guids.getString(guids.length()-1), referenceable.getId().getVersion(),
return new Id(guids.get(guids.size() - 1), referenceable.getId().getVersion(),
referenceable.getTypeName());
}
......
......@@ -17,18 +17,18 @@
*/
package org.apache.atlas.notification;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import kafka.consumer.ConsumerTimeoutException;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasException;
import org.apache.atlas.LocalAtlasClient;
import org.apache.atlas.ha.HAConfiguration;
import org.apache.atlas.listener.ActiveStateChangeHandler;
import org.apache.atlas.notification.hook.HookNotification;
import org.apache.atlas.service.Service;
import org.apache.commons.configuration.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -45,20 +45,21 @@ import java.util.concurrent.atomic.AtomicBoolean;
@Singleton
public class NotificationHookConsumer implements Service, ActiveStateChangeHandler {
private static final Logger LOG = LoggerFactory.getLogger(NotificationHookConsumer.class);
private static final String THREADNAME_PREFIX = NotificationHookConsumer.class.getSimpleName();
public static final String CONSUMER_THREADS_PROPERTY = "atlas.notification.hook.numthreads";
public static final String ATLAS_ENDPOINT_PROPERTY = "atlas.rest.address";
public static final int SERVER_READY_WAIT_TIME_MS = 1000;
private final LocalAtlasClient atlasClient;
private NotificationInterface notificationInterface;
private ExecutorService executors;
private String atlasEndpoint;
private Configuration applicationProperties;
private List<HookConsumer> consumers;
@Inject
public NotificationHookConsumer(NotificationInterface notificationInterface) {
public NotificationHookConsumer(NotificationInterface notificationInterface, LocalAtlasClient atlasClient) {
this.notificationInterface = notificationInterface;
this.atlasClient = atlasClient;
}
@Override
......@@ -70,7 +71,6 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
void startInternal(Configuration configuration,
ExecutorService executorService) {
this.applicationProperties = configuration;
this.atlasEndpoint = applicationProperties.getString(ATLAS_ENDPOINT_PROPERTY, "http://localhost:21000");
if (consumers == null) {
consumers = new ArrayList<>();
}
......@@ -88,7 +88,8 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
List<NotificationConsumer<HookNotification.HookNotificationMessage>> notificationConsumers =
notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, numThreads);
if (executorService == null) {
executorService = Executors.newFixedThreadPool(notificationConsumers.size());
executorService = Executors.newFixedThreadPool(notificationConsumers.size(),
new ThreadFactoryBuilder().setNameFormat(THREADNAME_PREFIX + " thread-%d").build());
}
executors = executorService;
for (final NotificationConsumer<HookNotification.HookNotificationMessage> consumer : notificationConsumers) {
......@@ -183,9 +184,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
try {
if (hasNext()) {
HookNotification.HookNotificationMessage message = consumer.next();
UserGroupInformation ugi = UserGroupInformation.createRemoteUser(message.getUser());
AtlasClient atlasClient = getAtlasClient(ugi);
atlasClient.setUser(message.getUser());
try {
switch (message.getType()) {
case ENTITY_CREATE:
......@@ -230,13 +229,8 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
}
}
protected AtlasClient getAtlasClient(UserGroupInformation ugi) {
return new AtlasClient(atlasEndpoint, ugi, ugi.getShortUserName());
}
boolean serverAvailable(Timer timer) {
try {
AtlasClient atlasClient = getAtlasClient(UserGroupInformation.getCurrentUser());
while (!atlasClient.isServerReady()) {
try {
LOG.info("Atlas Server is not ready. Waiting for {} milliseconds to retry...",
......
......@@ -25,7 +25,6 @@ import org.apache.atlas.security.SecurityProperties;
import org.apache.atlas.web.util.Servlets;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationConverter;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.authentication.server.AuthenticationFilter;
import org.apache.hadoop.security.authentication.server.KerberosAuthenticationHandler;
......@@ -57,10 +56,6 @@ public class AtlasAuthenticationFilter extends AuthenticationFilter {
private static final Logger LOG = LoggerFactory.getLogger(AtlasAuthenticationFilter.class);
static final String PREFIX = "atlas.http.authentication";
/**
* An options servlet is used to authenticate users. OPTIONS method is used for triggering authentication
* before invoking the actual resource.
*/
private HttpServlet optionsServlet;
/**
......@@ -128,47 +123,45 @@ public class AtlasAuthenticationFilter extends AuthenticationFilter {
@Override
public void doFilter(final ServletRequest request, final ServletResponse response,
final FilterChain filterChain) throws IOException, ServletException {
FilterChain filterChainWrapper = new FilterChain() {
@Override
public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse)
throws IOException, ServletException {
HttpServletRequest httpRequest = (HttpServletRequest) servletRequest;
final HttpServletRequest httpRequest = (HttpServletRequest) servletRequest;
if (httpRequest.getMethod().equals("OPTIONS")) { // option request meant only for authentication
if (httpRequest.getMethod().equals("OPTIONS")) {
optionsServlet.service(request, response);
} else {
final String user = Servlets.getUserFromRequest(httpRequest);
if (StringUtils.isEmpty(user)) {
((HttpServletResponse) response).sendError(Response.Status.BAD_REQUEST.getStatusCode(),
"Param user.name can't be empty");
} else {
try {
NDC.push(user + ":" + httpRequest.getMethod() + httpRequest.getRequestURI());
RequestContext requestContext = RequestContext.get();
requestContext.setUser(user);
LOG.info("Request from authenticated user: {}, URL={}", user,
Servlets.getRequestURI(httpRequest));
filterChain.doFilter(servletRequest, servletResponse);
} finally {
NDC.pop();
}
try {
String requestUser = httpRequest.getRemoteUser();
NDC.push(requestUser + ":" + httpRequest.getMethod() + httpRequest.getRequestURI());
RequestContext requestContext = RequestContext.get();
requestContext.setUser(requestUser);
LOG.info("Request from authenticated user: {}, URL={}", requestUser,
Servlets.getRequestURI(httpRequest));
filterChain.doFilter(servletRequest, servletResponse);
} finally {
NDC.pop();
}
}
}
};
super.doFilter(request, response, filterChainWrapper);
try {
super.doFilter(request, response, filterChainWrapper);
} catch (NullPointerException e) {
//PseudoAuthenticationHandler.getUserName() from hadoop-auth throws NPE if user name is not specified
((HttpServletResponse) response).sendError(Response.Status.BAD_REQUEST.getStatusCode(),
"Authentication is enabled and user is not specified. Specify user.name parameter");
}
}
@Override
public void destroy() {
if (optionsServlet != null) {
optionsServlet.destroy();
}
optionsServlet.destroy();
super.destroy();
}
}
......@@ -99,7 +99,7 @@ public class AuditFilter implements Filter {
return userFromRequest == null ? "UNKNOWN" : userFromRequest;
}
private void audit(String who, String fromAddress, String whatRequest, String fromHost, String whatURL, String whatAddrs,
public static void audit(String who, String fromAddress, String whatRequest, String fromHost, String whatURL, String whatAddrs,
String whenISO9601) {
AUDIT_LOG.info("Audit: {}/{}-{} performed request {} {} ({}) at time {}", who, fromAddress, fromHost, whatRequest, whatURL,
whatAddrs, whenISO9601);
......
......@@ -58,7 +58,6 @@ public class GuiceServletConfig extends GuiceServletContextListener {
private static final Logger LOG = LoggerFactory.getLogger(GuiceServletConfig.class);
private static final String GUICE_CTX_PARAM = "guice.packages";
static final String HTTP_AUTHENTICATION_ENABLED = "atlas.http.authentication.enabled";
protected volatile Injector injector;
@Override
......@@ -126,7 +125,7 @@ public class GuiceServletConfig extends GuiceServletContextListener {
if (configuration == null) {
throw new ConfigurationException("Could not load application configuration");
}
if (Boolean.valueOf(configuration.getString(HTTP_AUTHENTICATION_ENABLED))) {
if (Boolean.valueOf(configuration.getString(AtlasClient.HTTP_AUTHENTICATION_ENABLED))) {
LOG.info("Enabling AuthenticationFilter");
filter("/*").through(AtlasAuthenticationFilter.class);
}
......
......@@ -19,6 +19,7 @@
package org.apache.atlas.web.resources;
import com.google.inject.Inject;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.web.service.ServiceState;
import org.apache.atlas.web.util.Servlets;
import org.apache.commons.configuration.ConfigurationException;
......@@ -113,7 +114,7 @@ public class AdminResource {
public Response getStatus() {
JSONObject responseData = new JSONObject();
try {
responseData.put("Status", serviceState.getState().toString());
responseData.put(AtlasClient.STATUS, serviceState.getState().toString());
Response response = Response.ok(responseData).build();
return response;
} catch (JSONException e) {
......
......@@ -330,7 +330,6 @@ public class EntityResource {
* @param entityType the entity type
* @param attribute the unique attribute used to identify the entity
* @param value the unique attribute value used to identify the entity
* @param request - Ignored
* @return response payload as json - including guids of entities(including composite references from that entity) that were deleted
*/
@DELETE
......@@ -338,8 +337,7 @@ public class EntityResource {
public Response deleteEntities(@QueryParam("guid") List<String> guids,
@QueryParam("type") String entityType,
@QueryParam("property") String attribute,
@QueryParam("value") String value,
@Context HttpServletRequest request) {
@QueryParam("value") String value) {
try {
List<String> deletedGuids = new ArrayList<>();
......
......@@ -19,10 +19,13 @@
package org.apache.atlas.web.util;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.LocalServletRequest;
import org.apache.atlas.utils.ParamChecker;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringEscapeUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.NameValuePair;
import org.apache.http.client.utils.URLEncodedUtils;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
......@@ -34,6 +37,8 @@ import javax.ws.rs.core.Response;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.nio.charset.Charset;
import java.util.List;
/**
* Utility functions for dealing with servlets.
......@@ -70,6 +75,28 @@ public final class Servlets {
return user;
}
user = getDoAsUser(httpRequest);
if (!StringUtils.isEmpty(user)) {
return user;
}
return null;
}
private static final Charset UTF8_CHARSET = Charset.forName("UTF-8");
private static final String DO_AS = "doAs";
public static String getDoAsUser(HttpServletRequest request) {
if (StringUtils.isNoneEmpty(request.getQueryString())) {
List<NameValuePair> list = URLEncodedUtils.parse(request.getQueryString(), UTF8_CHARSET);
if (list != null) {
for (NameValuePair nv : list) {
if (DO_AS.equals(nv.getName())) {
return nv.getValue();
}
}
}
}
return null;
}
......@@ -134,6 +161,11 @@ public final class Servlets {
}
public static String getRequestPayload(HttpServletRequest request) throws IOException {
//request is an instance of LocalServletRequest for calls from LocalAtlasClient
if (request instanceof LocalServletRequest) {
return ((LocalServletRequest) request).getPayload();
}
StringWriter writer = new StringWriter();
IOUtils.copy(request.getInputStream(), writer);
return writer.toString();
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas;
import com.sun.jersey.api.client.ClientResponse;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.web.resources.EntityResource;
import org.apache.atlas.web.service.ServiceState;
import org.apache.commons.lang.RandomStringUtils;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONObject;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Response;
import java.util.Arrays;
import java.util.List;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyListOf;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
public class LocalAtlasClientTest {
@Mock
private EntityResource entityResource;
@Mock
private ServiceState serviceState;
@BeforeMethod
public void setup() {
MockitoAnnotations.initMocks(this);
}
@Test
public void testCreateEntity() throws Exception {
Response response = mock(Response.class);
when(entityResource.submit(any(HttpServletRequest.class))).thenReturn(response);
final String guid = random();
when(response.getEntity()).thenReturn(new JSONObject() {{
put(AtlasClient.GUID, new JSONArray(Arrays.asList(guid)));
}});
LocalAtlasClient atlasClient = new LocalAtlasClient(serviceState, entityResource);
List<String> results = atlasClient.createEntity(new Referenceable(random()));
assertEquals(results.size(), 1);
assertEquals(results.get(0), guid);
}
@Test
public void testException() throws Exception {
LocalAtlasClient atlasClient = new LocalAtlasClient(serviceState, entityResource);
Response response = mock(Response.class);
when(entityResource.submit(any(HttpServletRequest.class))).thenThrow(new WebApplicationException(response));
when(response.getEntity()).thenReturn(new JSONObject() {{
put("stackTrace", "stackTrace");
}});
when(response.getStatus()).thenReturn(Response.Status.BAD_REQUEST.getStatusCode());
try {
atlasClient.createEntity(new Referenceable(random()));
fail("Expected AtlasServiceException");
} catch(AtlasServiceException e) {
assertEquals(e.getStatus(), ClientResponse.Status.BAD_REQUEST);
}
when(entityResource.updateByUniqueAttribute(anyString(), anyString(), anyString(),
any(HttpServletRequest.class))).thenThrow(new WebApplicationException(response));
when(response.getStatus()).thenReturn(Response.Status.NOT_FOUND.getStatusCode());
try {
atlasClient.updateEntity(random(), random(), random(), new Referenceable(random()));
fail("Expected AtlasServiceException");
} catch(AtlasServiceException e) {
assertEquals(e.getStatus(), ClientResponse.Status.NOT_FOUND);
}
}
@Test
public void testIsServerReady() throws Exception {
when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.ACTIVE);
LocalAtlasClient atlasClient = new LocalAtlasClient(serviceState, entityResource);
assertTrue(atlasClient.isServerReady());
when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.BECOMING_ACTIVE);
assertFalse(atlasClient.isServerReady());
}
@Test
public void testUpdateEntity() throws Exception {
final String guid = random();
Response response = mock(Response.class);
when(entityResource.updateByUniqueAttribute(anyString(), anyString(), anyString(),
any(HttpServletRequest.class))).thenReturn(response);
when(response.getEntity()).thenReturn(new JSONObject() {{
put(AtlasClient.GUID, guid);
}});
LocalAtlasClient atlasClient = new LocalAtlasClient(serviceState, entityResource);
String actualId = atlasClient.updateEntity(random(), random(), random(), new Referenceable(random()));
assertEquals(actualId, guid);
}
@Test
public void testDeleteEntity() throws Exception {
final String guid = random();
Response response = mock(Response.class);
when(response.getEntity()).thenReturn(new JSONObject() {{
put(AtlasClient.GUID, new JSONArray(Arrays.asList(guid)));
}});
when(entityResource.deleteEntities(anyListOf(String.class), anyString(), anyString(), anyString())).thenReturn(response);
LocalAtlasClient atlasClient = new LocalAtlasClient(serviceState, entityResource);
List<String> results = atlasClient.deleteEntity(random(), random(), random());
assertEquals(results.size(), 1);
assertEquals(results.get(0), guid);
}
private String random() {
return RandomStringUtils.randomAlphanumeric(10);
}
}
......@@ -19,6 +19,7 @@
package org.apache.atlas.notification;
import com.google.inject.Inject;
import org.apache.atlas.EntityAuditEvent;
import org.apache.atlas.notification.hook.HookNotification;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.persistence.Id;
......@@ -29,6 +30,8 @@ import org.testng.annotations.BeforeClass;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
import java.util.List;
import static org.testng.Assert.assertEquals;
@Guice(modules = NotificationModule.class)
......@@ -55,6 +58,28 @@ public class NotificationHookConsumerIT extends BaseResourceIT {
}
@Test
public void testMessageHandleFailureConsumerContinues() throws Exception {
//send invalid message - update with invalid type
sendHookMessage(new HookNotification.EntityPartialUpdateRequest(TEST_USER, randomString(), null, null,
new Referenceable(randomString())));
//send valid message
final Referenceable entity = new Referenceable(DATABASE_TYPE);
entity.set("name", "db" + randomString());
entity.set("description", randomString());
sendHookMessage(new HookNotification.EntityCreateRequest(TEST_USER, entity));
waitFor(MAX_WAIT_TIME, new Predicate() {
@Override
public boolean evaluate() throws Exception {
JSONArray results = serviceClient.searchByDSL(String.format("%s where name='%s'", DATABASE_TYPE,
entity.get("name")));
return results.length() == 1;
}
});
}
@Test
public void testCreateEntity() throws Exception {
final Referenceable entity = new Referenceable(DATABASE_TYPE);
entity.set("name", "db" + randomString());
......@@ -70,6 +95,13 @@ public class NotificationHookConsumerIT extends BaseResourceIT {
return results.length() == 1;
}
});
//Assert that user passed in hook message is used in audit
Referenceable instance = serviceClient.getEntity(DATABASE_TYPE, "name", (String) entity.get("name"));
List<EntityAuditEvent> events =
serviceClient.getEntityAuditEvents(instance.getId()._getId(), (short) 1);
assertEquals(events.size(), 1);
assertEquals(events.get(0).getUser(), TEST_USER);
}
@Test
......@@ -132,7 +164,7 @@ public class NotificationHookConsumerIT extends BaseResourceIT {
final String dbName = "db" + randomString();
entity.set("name", dbName);
entity.set("description", randomString());
final String dbId = serviceClient.createEntity(entity).getString(0);
final String dbId = serviceClient.createEntity(entity).get(0);
sendHookMessage(
new HookNotification.EntityDeleteRequest(TEST_USER, DATABASE_TYPE, "name", dbName));
......
......@@ -19,7 +19,7 @@ package org.apache.atlas.notification;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasServiceException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.atlas.LocalAtlasClient;
import org.apache.atlas.ha.HAConfiguration;
import org.apache.commons.configuration.Configuration;
import org.mockito.Mock;
......@@ -31,7 +31,13 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import static org.mockito.Mockito.*;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
import static org.testng.AssertJUnit.assertFalse;
import static org.testng.AssertJUnit.assertTrue;
......@@ -41,7 +47,7 @@ public class NotificationHookConsumerTest {
private NotificationInterface notificationInterface;
@Mock
private AtlasClient atlasClient;
private LocalAtlasClient atlasClient;
@Mock
private Configuration configuration;
......@@ -56,14 +62,9 @@ public class NotificationHookConsumerTest {
@Test
public void testConsumerCanProceedIfServerIsReady() throws InterruptedException, AtlasServiceException {
NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface);
NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasClient);
NotificationHookConsumer.HookConsumer hookConsumer =
notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class)) {
@Override
protected AtlasClient getAtlasClient(UserGroupInformation ugi) {
return atlasClient;
}
};
notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class));
NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class);
when(atlasClient.isServerReady()).thenReturn(true);
......@@ -74,14 +75,9 @@ public class NotificationHookConsumerTest {
@Test
public void testConsumerWaitsNTimesIfServerIsNotReadyNTimes() throws AtlasServiceException, InterruptedException {
NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface);
NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasClient);
NotificationHookConsumer.HookConsumer hookConsumer =
notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class)) {
@Override
protected AtlasClient getAtlasClient(UserGroupInformation ugi) {
return atlasClient;
}
};
notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class));
NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class);
when(atlasClient.isServerReady()).thenReturn(false, false, false, true);
......@@ -92,14 +88,9 @@ public class NotificationHookConsumerTest {
@Test
public void testConsumerProceedsWithFalseIfInterrupted() throws AtlasServiceException, InterruptedException {
NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface);
NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasClient);
NotificationHookConsumer.HookConsumer hookConsumer =
notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class)) {
@Override
protected AtlasClient getAtlasClient(UserGroupInformation ugi) {
return atlasClient;
}
};
notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class));
NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class);
doThrow(new InterruptedException()).when(timer).sleep(NotificationHookConsumer.SERVER_READY_WAIT_TIME_MS);
when(atlasClient.isServerReady()).thenReturn(false);
......@@ -109,14 +100,9 @@ public class NotificationHookConsumerTest {
@Test
public void testConsumerProceedsWithFalseOnAtlasServiceException() throws AtlasServiceException {
NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface);
NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasClient);
NotificationHookConsumer.HookConsumer hookConsumer =
notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class)) {
@Override
protected AtlasClient getAtlasClient(UserGroupInformation ugi) {
return atlasClient;
}
};
notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class));
NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class);
when(atlasClient.isServerReady()).thenThrow(new AtlasServiceException(AtlasClient.API.VERSION,
new Exception()));
......@@ -132,7 +118,7 @@ public class NotificationHookConsumerTest {
consumers.add(mock(NotificationConsumer.class));
when(notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, 1)).
thenReturn(consumers);
NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface);
NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasClient);
notificationHookConsumer.startInternal(configuration, executorService);
verify(notificationInterface).createConsumers(NotificationInterface.NotificationType.HOOK, 1);
verify(executorService).submit(any(NotificationHookConsumer.HookConsumer.class));
......@@ -146,7 +132,7 @@ public class NotificationHookConsumerTest {
consumers.add(mock(NotificationConsumer.class));
when(notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, 1)).
thenReturn(consumers);
NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface);
NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasClient);
notificationHookConsumer.startInternal(configuration, executorService);
verifyZeroInteractions(notificationInterface);
}
......@@ -159,7 +145,7 @@ public class NotificationHookConsumerTest {
consumers.add(mock(NotificationConsumer.class));
when(notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, 1)).
thenReturn(consumers);
NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface);
NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasClient);
notificationHookConsumer.startInternal(configuration, executorService);
notificationHookConsumer.instanceIsActive();
verify(notificationInterface).createConsumers(NotificationInterface.NotificationType.HOOK, 1);
......@@ -174,7 +160,7 @@ public class NotificationHookConsumerTest {
consumers.add(mock(NotificationConsumer.class));
when(notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, 1)).
thenReturn(consumers);
NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface);
NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasClient);
notificationHookConsumer.startInternal(configuration, executorService);
notificationHookConsumer.instanceIsPassive();
verify(notificationInterface).close();
......
......@@ -19,7 +19,7 @@ package org.apache.atlas.web.filters;
import org.apache.atlas.RequestContext;
import org.apache.atlas.web.security.BaseSecurityTest;
import org.apache.atlas.web.service.EmbeddedServer;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.hdfs.web.URLConnectionFactory;
import org.eclipse.jetty.server.Server;
......@@ -39,14 +39,13 @@ import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.security.PrivilegedExceptionAction;
import java.util.Properties;
import static org.testng.Assert.assertEquals;
/**
*
*/
public class AtlasAuthenticationKerberosFilterIT extends BaseSecurityTest {
public class AtlasAuthenticationKerberosFilterTest extends BaseSecurityTest {
public static final String TEST_USER_JAAS_SECTION = "TestUser";
public static final String TESTUSER = "testuser";
public static final String TESTPASS = "testpass";
......@@ -75,14 +74,14 @@ public class AtlasAuthenticationKerberosFilterIT extends BaseSecurityTest {
@Test(enabled = false)
public void testKerberosBasedLogin() throws Exception {
String originalConf = System.getProperty("atlas.conf");
System.setProperty("atlas.conf", System.getProperty("user.dir"));
setupKDCAndPrincipals();
TestEmbeddedServer server = null;
try {
// setup the atlas-application.properties file
generateKerberosTestProperties();
String confDirectory = generateKerberosTestProperties();
System.setProperty("atlas.conf", confDirectory);
// need to create the web application programmatically in order to control the injection of the test
// application properties
......@@ -127,8 +126,6 @@ public class AtlasAuthenticationKerberosFilterIT extends BaseSecurityTest {
}
}
}
protected Subject loginTestUser() throws LoginException, IOException {
......@@ -153,8 +150,8 @@ public class AtlasAuthenticationKerberosFilterIT extends BaseSecurityTest {
return lc.getSubject();
}
protected void generateKerberosTestProperties() throws IOException, ConfigurationException {
Properties props = new Properties();
protected String generateKerberosTestProperties() throws Exception {
PropertiesConfiguration props = new PropertiesConfiguration();
props.setProperty("atlas.http.authentication.enabled", "true");
props.setProperty("atlas.http.authentication.type", "kerberos");
props.setProperty("atlas.http.authentication.kerberos.principal", "HTTP/localhost@" + kdc.getRealm());
......@@ -162,7 +159,7 @@ public class AtlasAuthenticationKerberosFilterIT extends BaseSecurityTest {
props.setProperty("atlas.http.authentication.kerberos.name.rules",
"RULE:[1:$1@$0](.*@EXAMPLE.COM)s/@.*//\nDEFAULT");
generateTestProperties(props);
return writeConfiguration(props);
}
public void setupKDCAndPrincipals() throws Exception {
......
......@@ -19,21 +19,21 @@ package org.apache.atlas.web.filters;
import org.apache.atlas.RequestContext;
import org.apache.atlas.web.security.BaseSecurityTest;
import org.apache.atlas.web.service.EmbeddedServer;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.eclipse.jetty.server.Server;
import org.testng.annotations.Test;
import javax.ws.rs.core.Response;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.Properties;
import static org.testng.Assert.assertEquals;
/**
*
*/
public class AtlasAuthenticationSimpleFilterIT extends BaseSecurityTest {
public class AtlasAuthenticationSimpleFilterTest extends BaseSecurityTest {
public static final String TESTUSER = "testuser";
class TestEmbeddedServer extends EmbeddedServer {
......@@ -61,19 +61,14 @@ public class AtlasAuthenticationSimpleFilterIT extends BaseSecurityTest {
HttpURLConnection connection = (HttpURLConnection) url.openConnection();
connection.setRequestMethod("GET");
connection.connect();
try {
assertEquals(connection.getResponseCode(), 403);
} catch (Exception e) {
e.printStackTrace();
}
assertEquals(connection.getResponseCode(), Response.Status.BAD_REQUEST.getStatusCode());
url = new URL("http://localhost:23001/?user.name=testuser");
connection = (HttpURLConnection) url.openConnection();
connection.setRequestMethod("GET");
connection.connect();
assertEquals(connection.getResponseCode(), 200);
assertEquals(connection.getResponseCode(), Response.Status.OK.getStatusCode());
assertEquals(RequestContext.get().getUser(), TESTUSER);
} finally {
server.getServer().stop();
......@@ -83,16 +78,12 @@ public class AtlasAuthenticationSimpleFilterIT extends BaseSecurityTest {
System.clearProperty("atlas.conf");
}
}
}
protected void generateSimpleLoginConfiguration() throws IOException, ConfigurationException {
Properties config = new Properties();
protected String generateSimpleLoginConfiguration() throws Exception {
PropertiesConfiguration config = new PropertiesConfiguration();
config.setProperty("atlas.http.authentication.enabled", "true");
config.setProperty("atlas.http.authentication.type", "simple");
generateTestProperties(config);
return writeConfiguration(config);
}
}
......@@ -25,9 +25,7 @@ import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.WebResource;
import com.sun.jersey.api.client.config.DefaultClientConfig;
import kafka.consumer.ConsumerTimeoutException;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasServiceException;
......@@ -55,7 +53,6 @@ import org.apache.atlas.utils.ParamChecker;
import org.apache.atlas.web.util.Servlets;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.RandomStringUtils;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -65,7 +62,6 @@ import org.testng.annotations.BeforeClass;
import javax.ws.rs.HttpMethod;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriBuilder;
import java.util.List;
/**
......@@ -78,7 +74,8 @@ public abstract class BaseResourceIT {
protected WebResource service;
protected AtlasClient serviceClient;
public static final Logger LOG = LoggerFactory.getLogger(BaseResourceIT.class);
protected static final int MAX_WAIT_TIME = 1000;
protected static final int MAX_WAIT_TIME = 60000;
protected String baseUrl;
@BeforeClass
public void setUp() throws Exception {
......@@ -86,7 +83,7 @@ public abstract class BaseResourceIT {
DefaultClientConfig config = new DefaultClientConfig();
Client client = Client.create(config);
Configuration configuration = ApplicationProperties.get();
String baseUrl = configuration.getString(ATLAS_REST_ADDRESS, "http://localhost:21000/");
baseUrl = configuration.getString(ATLAS_REST_ADDRESS, "http://localhost:21000/");
client.resource(UriBuilder.fromUri(baseUrl).build());
service = client.resource(UriBuilder.fromUri(baseUrl).build());
......@@ -126,12 +123,12 @@ public abstract class BaseResourceIT {
String entityJSON = InstanceSerialization.toJson(referenceable, true);
System.out.println("Submitting new entity= " + entityJSON);
JSONArray guids = serviceClient.createEntity(entityJSON);
List<String> guids = serviceClient.createEntity(entityJSON);
System.out.println("created instance for type " + typeName + ", guid: " + guids);
// return the reference to created instance with guid
if (guids.length() > 0) {
return new Id(guids.getString(guids.length() - 1), 0, referenceable.getTypeName());
if (guids.size() > 0) {
return new Id(guids.get(guids.size() - 1), 0, referenceable.getTypeName());
}
return null;
}
......
......@@ -25,7 +25,6 @@ import com.google.gson.JsonSyntaxException;
import com.google.inject.Inject;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.WebResource;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.EntityAuditEvent;
......@@ -51,6 +50,7 @@ import org.apache.atlas.typesystem.types.TraitType;
import org.apache.atlas.typesystem.types.utils.TypesUtil;
import org.apache.atlas.web.util.Servlets;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
......@@ -64,7 +64,6 @@ import org.testng.annotations.Test;
import javax.ws.rs.HttpMethod;
import javax.ws.rs.core.Response;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
......@@ -122,6 +121,22 @@ public class EntityJerseyResourceIT extends BaseResourceIT {
}
@Test
public void testRequestUser() throws Exception {
Referenceable entity = new Referenceable(DATABASE_TYPE);
entity.set("name", randomString());
entity.set("description", randomString());
String user = "testuser";
UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
AtlasClient localClient = new AtlasClient(ugi, null, baseUrl);
String entityId = localClient.createEntity(entity).get(0);
List<EntityAuditEvent> events = serviceClient.getEntityAuditEvents(entityId, (short) 10);
assertEquals(events.size(), 1);
assertEquals(events.get(0).getUser(), user);
}
@Test
//API should accept single entity (or jsonarray of entities)
public void testSubmitSingleEntity() throws Exception {
Referenceable databaseInstance = new Referenceable(DATABASE_TYPE);
......@@ -149,7 +164,7 @@ public class EntityJerseyResourceIT extends BaseResourceIT {
db.set("name", dbName);
db.set("description", randomString());
final String dbid = serviceClient.createEntity(db).getString(0);
final String dbid = serviceClient.createEntity(db).get(0);
assertEntityAudit(dbid, EntityAuditEvent.EntityAuditAction.ENTITY_CREATE);
waitForNotification(notificationConsumer, MAX_WAIT_TIME, new NotificationPredicate() {
......@@ -164,8 +179,8 @@ public class EntityJerseyResourceIT extends BaseResourceIT {
assertEquals(results.length(), 1);
//create entity again shouldn't create another instance with same unique attribute value
results = serviceClient.createEntity(db);
assertEquals(results.length(), 0);
List<String> entityResults = serviceClient.createEntity(db);
assertEquals(entityResults.size(), 0);
try {
waitForNotification(notificationConsumer, MAX_WAIT_TIME, new NotificationPredicate() {
@Override
......@@ -214,7 +229,7 @@ public class EntityJerseyResourceIT extends BaseResourceIT {
//create entity for the type
Referenceable instance = new Referenceable(typeDefinition.typeName);
instance.set("name", randomString());
String guid = serviceClient.createEntity(instance).getString(0);
String guid = serviceClient.createEntity(instance).get(0);
//update type - add attribute
typeDefinition = TypesUtil.createClassTypeDef(typeDefinition.typeName, ImmutableSet.<String>of(),
......
......@@ -33,6 +33,7 @@ import java.io.IOException;
*
*/
public class BaseSSLAndKerberosTest extends BaseSecurityTest {
public static final String TEST_USER_JAAS_SECTION = "TestUser";
public static final String TESTUSER = "testuser";
public static final String TESTPASS = "testpass";
protected static final String DGI_URL = "https://localhost:21443/";
......@@ -104,7 +105,7 @@ public class BaseSSLAndKerberosTest extends BaseSecurityTest {
kdc.createPrincipal(TESTUSER, TESTPASS);
StringBuilder jaas = new StringBuilder(1024);
jaas.append("TestUser {\n" +
jaas.append(TEST_USER_JAAS_SECTION + " {\n" +
" com.sun.security.auth.module.Krb5LoginModule required\nuseTicketCache=true;\n" +
"};\n");
jaas.append(createJAASEntry("Client", "dgi", userKeytabFile));
......
......@@ -140,10 +140,6 @@ public class BaseSecurityTest {
}
public static String writeConfiguration(final PropertiesConfiguration configuration) throws Exception {
String persistDir = TestUtils.getTempDirectory();
TestUtils.writeConfiguration(configuration, persistDir + File.separator +
ApplicationProperties.APPLICATION_PROPERTIES);
String confLocation = System.getProperty("atlas.conf");
URL url;
if (confLocation == null) {
......@@ -153,6 +149,10 @@ public class BaseSecurityTest {
}
PropertiesConfiguration configuredProperties = new PropertiesConfiguration();
configuredProperties.load(url);
configuredProperties.copy(configuration);
String persistDir = TestUtils.getTempDirectory();
TestUtils.writeConfiguration(configuredProperties, persistDir + File.separator +
ApplicationProperties.APPLICATION_PROPERTIES);
ApplicationProperties.forceReload();
......
......@@ -19,7 +19,6 @@
package org.apache.atlas.web.security;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasException;
import org.apache.atlas.web.service.SecureEmbeddedServer;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.hadoop.conf.Configuration;
......@@ -42,7 +41,7 @@ import static org.apache.atlas.security.SecurityProperties.SERVER_CERT_PASSWORD_
import static org.apache.atlas.security.SecurityProperties.TRUSTSTORE_PASSWORD_KEY;
public class SSLTest extends BaseSSLAndKerberosTest {
private AtlasClient dgiCLient;
private AtlasClient atlasClient;
private Path jksPath;
private String providerUrl;
private TestSecureEmbeddedServer secureEmbeddedServer;
......@@ -76,7 +75,7 @@ public class SSLTest extends BaseSSLAndKerberosTest {
final PropertiesConfiguration configuration = getSSLConfiguration(providerUrl);
String persistDir = writeConfiguration(configuration);
dgiCLient = new AtlasClient(DGI_URL) {
atlasClient = new AtlasClient(DGI_URL) {
@Override
protected PropertiesConfiguration getClientProperties() {
return configuration;
......@@ -139,6 +138,6 @@ public class SSLTest extends BaseSSLAndKerberosTest {
@Test
public void testService() throws Exception {
dgiCLient.listTypes();
atlasClient.listTypes();
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment