Commit 41997f64 by Ashutosh Mestry

ATLAS-2802: Atlas Client Update for Export and Import.

parent 06ff0752
......@@ -30,9 +30,14 @@ import com.sun.jersey.api.client.config.DefaultClientConfig;
import com.sun.jersey.api.client.filter.HTTPBasicAuthFilter;
import com.sun.jersey.api.json.JSONConfiguration;
import com.sun.jersey.client.urlconnection.URLConnectionClientHandler;
import com.sun.jersey.multipart.BodyPart;
import com.sun.jersey.multipart.FormDataBodyPart;
import com.sun.jersey.multipart.FormDataMultiPart;
import com.sun.jersey.multipart.MultiPart;
import com.sun.jersey.multipart.file.FileDataBodyPart;
import com.sun.jersey.multipart.file.StreamDataBodyPart;
import com.sun.jersey.multipart.impl.MultiPartWriter;
import org.apache.atlas.model.impexp.AtlasExportRequest;
import org.apache.atlas.model.impexp.AtlasImportRequest;
import org.apache.atlas.model.impexp.AtlasImportResult;
import org.apache.atlas.model.metrics.AtlasMetrics;
......@@ -41,6 +46,7 @@ import org.apache.atlas.type.AtlasType;
import org.apache.atlas.utils.AtlasJson;
import org.apache.atlas.utils.AuthenticationUtil;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
......@@ -52,8 +58,11 @@ import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriBuilder;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.ConnectException;
import java.net.URI;
import java.nio.file.Paths;
......@@ -69,6 +78,7 @@ public abstract class AtlasBaseClient {
public static final String ADMIN_STATUS = "admin/status";
public static final String ADMIN_METRICS = "admin/metrics";
public static final String ADMIN_IMPORT = "admin/import";
public static final String ADMIN_EXPORT = "admin/export";
public static final String HTTP_AUTHENTICATION_ENABLED = "atlas.http.authentication.enabled";
public static final String QUERY = "query";
......@@ -91,6 +101,10 @@ public abstract class AtlasBaseClient {
static final int DEFAULT_SLEEP_BETWEEN_RETRIES_MS = 5000;
private static final Logger LOG = LoggerFactory.getLogger(AtlasBaseClient.class);
private static final API IMPORT = new API(BASE_URI + ADMIN_IMPORT, HttpMethod.POST, Response.Status.OK, MediaType.MULTIPART_FORM_DATA, MediaType.APPLICATION_JSON);
private static final API EXPORT = new API(BASE_URI + ADMIN_EXPORT, HttpMethod.POST, Response.Status.OK, MediaType.APPLICATION_JSON, MediaType.APPLICATION_OCTET_STREAM);
private static final String IMPORT_REQUEST_PARAMTER = "request";
private static final String IMPORT_DATA_PARAMETER = "data";
protected WebResource service;
protected Configuration configuration;
private String basicAuthUser;
......@@ -251,6 +265,8 @@ public abstract class AtlasBaseClient {
// Enable POJO mapping feature
config.getFeatures().put(JSONConfiguration.FEATURE_POJO_MAPPING, Boolean.TRUE);
config.getClasses().add(JacksonJaxbJsonProvider.class);
config.getClasses().add(MultiPartWriter.class);
int readTimeout = configuration.getInt("atlas.client.readTimeoutMSecs", 60000);
int connectTimeout = configuration.getInt("atlas.client.connectTimeoutMSecs", 60000);
if (configuration.getBoolean(TLS_ENABLED, false)) {
......@@ -367,7 +383,9 @@ public abstract class AtlasBaseClient {
return null;
}
try {
if (responseType.getRawClass().equals(ObjectNode.class)) {
if(api.getProduces().equals(MediaType.APPLICATION_OCTET_STREAM)) {
return (T) IOUtils.toByteArray(clientResponse.getEntityInputStream());
} else if (responseType.getRawClass().equals(ObjectNode.class)) {
String stringEntity = clientResponse.getEntity(String.class);
try {
JsonNode jsonObject = AtlasJson.parseToV1JsonNode(stringEntity);
......@@ -385,6 +403,8 @@ public abstract class AtlasBaseClient {
}
} catch (ClientHandlerException e) {
throw new AtlasServiceException(api, e);
} catch (IOException e) {
throw new AtlasServiceException(api, e);
}
} else if (clientResponse.getStatus() != ClientResponse.Status.SERVICE_UNAVAILABLE.getStatusCode()) {
break;
......@@ -414,7 +434,7 @@ public abstract class AtlasBaseClient {
return getResource(service, api, queryParams);
}
protected abstract API formatPathParameters(API api, String ... params);
protected abstract API formatPathParameters(API api, String... params);
void initializeState(String[] baseUrls, UserGroupInformation ugi, String doAsUser) {
initializeState(getClientProperties(), baseUrls, ugi, doAsUser);
......@@ -446,15 +466,59 @@ public abstract class AtlasBaseClient {
return configuration.getInt(AtlasBaseClient.ATLAS_CLIENT_HA_RETRIES_KEY, AtlasBaseClient.DEFAULT_NUM_RETRIES);
}
/**
 * Exports entities matching the given request and returns the server's
 * response as raw bytes (the zip archive produced by admin/export).
 *
 * @param request export specification: items to export plus options
 * @return raw bytes of the exported zip stream
 * @throws AtlasServiceException if the export call fails
 */
public byte[] exportData(AtlasExportRequest request) throws AtlasServiceException {
    try {
        // EXPORT produces APPLICATION_OCTET_STREAM, so callAPI hands back the raw bytes.
        return (byte[]) callAPI(EXPORT, Object.class, request);
    } catch (AtlasServiceException e) {
        // Rethrow as-is: re-wrapping would discard the API/status context already captured.
        LOG.error("error performing export", e);
        throw e;
    } catch (Exception e) {
        LOG.error("error performing export", e);
        throw new AtlasServiceException(e);
    }
}
/**
 * Exports entities matching the given request and writes the resulting zip
 * archive to the given file path.
 *
 * @param request      export specification: items to export plus options
 * @param absolutePath destination file to write the exported zip bytes to
 * @throws AtlasServiceException if the export call or the file write fails
 */
public void exportData(AtlasExportRequest request, String absolutePath) throws AtlasServiceException {
    byte[] fileBytes = exportData(request);
    // try-with-resources guarantees the stream is closed without a close()
    // failure masking the original write failure (the old manual finally could).
    try (OutputStream fileOutputStream = new FileOutputStream(new File(absolutePath))) {
        IOUtils.write(fileBytes, fileOutputStream);
    } catch (IOException e) {
        LOG.error("error writing to file", e);
        throw new AtlasServiceException(e);
    }
}
/**
 * Imports entities from a zip file on the local filesystem.
 *
 * @param request          import options (serialized into the "request" form part)
 * @param absoluteFilePath path to the zip file sent as the "data" form part
 * @return the server-side result of the import operation
 * @throws AtlasServiceException if the import call fails
 */
public AtlasImportResult importData(AtlasImportRequest request, String absoluteFilePath) throws AtlasServiceException {
    // (A stale duplicate FileDataBodyPart line left over from the diff was removed here.)
    return performImportData(getImportRequestBodyPart(request),
            new FileDataBodyPart(IMPORT_DATA_PARAMETER, new File(absoluteFilePath)));
}
/**
 * Imports entities from an in-memory zip archive.
 *
 * @param request  import options (serialized into the "request" form part)
 * @param fileData zip archive bytes sent as the "data" form part
 * @return the server-side result of the import operation
 * @throws AtlasServiceException if the import call fails
 */
public AtlasImportResult importData(AtlasImportRequest request, byte[] fileData) throws AtlasServiceException {
    BodyPart requestPart = getImportRequestBodyPart(request);
    BodyPart dataPart    = new StreamDataBodyPart(IMPORT_DATA_PARAMETER, new ByteArrayInputStream(fileData));
    return performImportData(requestPart, dataPart);
}
/**
 * Posts the multipart import payload (JSON request part + zip data part)
 * to the admin/import endpoint.
 *
 * @param requestPart JSON form part carrying the serialized AtlasImportRequest
 * @param filePart    form part carrying the zip data (file- or stream-backed)
 * @return the server-side result of the import operation
 * @throws AtlasServiceException if the import call fails
 */
private AtlasImportResult performImportData(BodyPart requestPart, BodyPart filePart) throws AtlasServiceException {
    // The request JSON already arrives as requestPart; the stale
    // .field("request", AtlasType.toJson(request), ...) line referenced an
    // undefined variable and is removed.
    MultiPart multipartEntity = new FormDataMultiPart()
            .bodyPart(requestPart)
            .bodyPart(filePart);

    return callAPI(IMPORT, AtlasImportResult.class, multipartEntity);
}
/**
 * Builds the JSON-typed "request" form part carrying the serialized import request.
 * NOTE(review): the constant name IMPORT_REQUEST_PARAMTER has a typo ("PARAMTER");
 * kept as-is here since renaming it is a separate, file-wide change.
 */
private FormDataBodyPart getImportRequestBodyPart(AtlasImportRequest request) {
    String requestJson = AtlasType.toJson(request);
    return new FormDataBodyPart(IMPORT_REQUEST_PARAMTER, requestJson, MediaType.APPLICATION_JSON_TYPE);
}
boolean isRetryableException(ClientHandlerException che) {
return che.getCause().getClass().equals(IOException.class)
|| che.getCause().getClass().equals(ConnectException.class);
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.web.resources;
import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.clusterinfo.AtlasCluster;
import org.apache.atlas.model.impexp.AtlasExportRequest;
import org.apache.atlas.model.impexp.AtlasImportRequest;
import org.apache.atlas.model.impexp.AtlasImportResult;
import org.apache.atlas.repository.impexp.ZipSource;
import org.apache.atlas.utils.TestResourceFileUtils;
import org.apache.atlas.web.integration.BaseResourceIT;
import org.testng.annotations.Test;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
/**
 * Integration tests for the admin export/import client APIs.
 * Ordering: isActive -> importData (seeds data) -> exportData (round-trips it).
 */
public class AdminExportImportTestIT extends BaseResourceIT {
    // Zip fixture on the test classpath used to seed entities for the export test.
    private static final String FILE_TO_IMPORT = "stocks-base.zip";

    @Test
    public void isActive() throws AtlasServiceException {
        assertEquals(atlasClientV2.getAdminStatus(), "ACTIVE");
    }

    @Test(dependsOnMethods = "isActive")
    public void importData() throws AtlasServiceException, IOException {
        performImport(FILE_TO_IMPORT);
    }

    @Test(dependsOnMethods = "importData")
    public void exportData() throws AtlasServiceException, IOException, AtlasBaseException {
        // Expected entity count produced by importing stocks-base.zip with the
        // export-incremental request; update together with the fixture.
        final int EXPECTED_CREATION_ORDER_SIZE = 13;

        AtlasExportRequest request = TestResourceFileUtils.readObjectFromJson(".", "export-incremental", AtlasExportRequest.class);
        byte[] exportedBytes = atlasClientV2.exportData(request);
        assertNotNull(exportedBytes);

        // Parse the returned archive to verify it is a well-formed export zip.
        ZipSource zs = new ZipSource(new ByteArrayInputStream(exportedBytes));
        assertNotNull(zs.getExportResult());
        assertEquals(zs.getCreationOrder().size(), EXPECTED_CREATION_ORDER_SIZE);
    }

    // Reads the fixture zip and imports it via the byte[] client API.
    private void performImport(String fileToImport) throws AtlasServiceException {
        AtlasImportRequest request = new AtlasImportRequest();
        byte[] fileBytes = new byte[0];
        try {
            fileBytes = Files.readAllBytes(Paths.get(TestResourceFileUtils.getTestFilePath(fileToImport)));
        } catch (IOException e) {
            // fail() states intent directly; assertFalse(true, ...) was an anti-pattern.
            fail("Exception: " + e.getMessage());
        }

        AtlasImportResult result = atlasClientV2.importData(request, fileBytes);

        assertNotNull(result);
        assertEquals(result.getOperationStatus(), AtlasImportResult.OperationStatus.SUCCESS);
        assertNotNull(result.getMetrics());
        // 37 = total entities contained in stocks-base.zip.
        assertEquals(result.getProcessedEntities().size(), 37);
    }
}
{
"itemsToExport": [
{
"typeName": "hive_db", "uniqueAttributes": { "qualifiedName": "stocks@cl1" }
}
],
"options": {
"fetchType": "full"
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment