Commit 9cdc31d5 by Ashutosh Mestry

ATLAS-4068: Export/Import: Conditionally Support Simpultaneous Operations.

parent ff55b0a1
......@@ -22,7 +22,6 @@ import com.sun.jersey.multipart.FormDataParam;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.RequestContext;
import org.apache.atlas.authorize.AtlasAdminAccessRequest;
import org.apache.atlas.authorize.AtlasAuthorizationUtils;
import org.apache.atlas.authorize.AtlasEntityAccessRequest;
......@@ -102,7 +101,6 @@ import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
......@@ -392,7 +390,12 @@ public class AdminResource {
AtlasAuthorizationUtils.verifyAccess(new AtlasAdminAccessRequest(AtlasPrivilege.ADMIN_EXPORT), "export");
acquireExportImportLock("export");
boolean preventMultipleRequests = request != null
&& !(request.getOptions().containsKey(AtlasExportRequest.OPTION_SKIP_LINEAGE)
|| request.getOptions().containsKey(AtlasExportRequest.OPTION_KEY_REPLICATED_TO));
if (preventMultipleRequests) {
acquireExportImportLock("export");
}
ZipSink exportSink = null;
boolean isSuccessful = false;
......@@ -419,22 +422,15 @@ public class AdminResource {
throw new AtlasBaseException(excp);
} finally {
releaseExportImportLock();
if (preventMultipleRequests) {
releaseExportImportLock();
}
if (exportSink != null) {
exportSink.close();
}
if (isSuccessful && CollectionUtils.isNotEmpty(result.getRequest().getItemsToExport())) {
Map<String, Object> optionMap = result.getRequest().getOptions();
optionMap.put(OPERATION_STATUS, result.getOperationStatus().name());
String params = AtlasJson.toJson(optionMap);
List<AtlasObjectId> objectIds = result.getRequest().getItemsToExport();
auditImportExportOperations(objectIds, AuditOperation.EXPORT, params);
}
addToExportOperationAudits(isSuccessful, result);
if (LOG.isDebugEnabled()) {
LOG.debug("<== AdminResource.export()");
......@@ -454,11 +450,15 @@ public class AdminResource {
AtlasAuthorizationUtils.verifyAccess(new AtlasAdminAccessRequest(AtlasPrivilege.ADMIN_IMPORT), "importData");
acquireExportImportLock("import");
AtlasImportResult result = null;
boolean preventMultipleRequests = true;
try {
AtlasImportRequest request = AtlasType.fromJson(jsonData, AtlasImportRequest.class);
preventMultipleRequests = request != null && !request.getOptions().containsKey(AtlasImportRequest.OPTION_KEY_REPLICATED_FROM);
if (preventMultipleRequests) {
acquireExportImportLock("import");
}
result = importService.run(inputStream, request, Servlets.getUserName(httpServletRequest),
Servlets.getHostName(httpServletRequest),
......@@ -477,20 +477,16 @@ public class AdminResource {
throw new AtlasBaseException(excp);
} finally {
releaseExportImportLock();
if (preventMultipleRequests) {
releaseExportImportLock();
}
if (LOG.isDebugEnabled()) {
LOG.debug("<== AdminResource.importData(binary)");
}
}
List<AtlasObjectId> objectIds = result.getExportResult().getRequest().getItemsToExport();
Map<String, Object> optionMap = new HashMap<>();
optionMap.put(OPERATION_STATUS, result.getOperationStatus().name());
String params = AtlasJson.toJson(optionMap);
auditImportExportOperations(objectIds, AuditOperation.IMPORT, params);
addToImportOperationAudits(result);
return result;
}
......@@ -536,13 +532,17 @@ public class AdminResource {
}
AtlasAuthorizationUtils.verifyAccess(new AtlasAdminAccessRequest(AtlasPrivilege.ADMIN_IMPORT), "importFile");
acquireExportImportLock("importFile");
boolean preventMultipleRequests = true;
AtlasImportResult result;
try {
AtlasImportRequest request = AtlasType.fromJson(jsonData, AtlasImportRequest.class);
preventMultipleRequests = request != null && request.getOptions().containsKey(AtlasImportRequest.OPTION_KEY_REPLICATED_FROM);
if (preventMultipleRequests) {
acquireExportImportLock("importFile");
}
result = importService.run(request, AtlasAuthorizationUtils.getCurrentUserName(),
Servlets.getHostName(httpServletRequest),
AtlasAuthorizationUtils.getRequestIpAddress(httpServletRequest));
......@@ -559,7 +559,9 @@ public class AdminResource {
throw new AtlasBaseException(excp);
} finally {
releaseExportImportLock();
if (preventMultipleRequests) {
releaseExportImportLock();
}
if (LOG.isDebugEnabled()) {
LOG.debug("<== AdminResource.importFile()");
......@@ -770,6 +772,29 @@ public class AdminResource {
importExportOperationLock.lock();
}
private void addToImportOperationAudits(AtlasImportResult result) throws AtlasBaseException {
List<AtlasObjectId> objectIds = result.getExportResult().getRequest().getItemsToExport();
Map<String, Object> optionMap = new HashMap<>();
optionMap.put(OPERATION_STATUS, result.getOperationStatus().name());
String params = AtlasJson.toJson(optionMap);
auditImportExportOperations(objectIds, AuditOperation.IMPORT, params);
}
private void addToExportOperationAudits(boolean isSuccessful, AtlasExportResult result) throws AtlasBaseException {
if (isSuccessful && CollectionUtils.isNotEmpty(result.getRequest().getItemsToExport())) {
Map<String, Object> optionMap = result.getRequest().getOptions();
optionMap.put(OPERATION_STATUS, result.getOperationStatus().name());
String params = AtlasJson.toJson(optionMap);
List<AtlasObjectId> objectIds = result.getRequest().getItemsToExport();
auditImportExportOperations(objectIds, AuditOperation.EXPORT, params);
}
}
private void auditImportExportOperations(List<AtlasObjectId> objectIds, AuditOperation auditOperation, String params) throws AtlasBaseException {
Map<String, Long> entityCountByType = objectIds.stream().collect(Collectors.groupingBy(AtlasObjectId::getTypeName, Collectors.counting()));
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment