Commit 8bad6b0b by Ashutosh Mestry

ATLAS-3396: ZipSourceWithBackingDirectory: Implementation. Port to master.

parent c9446eec
@@ -82,6 +82,9 @@ ${graph.index.properties}
 # Solr-specific configuration property
 atlas.graph.index.search.max-result-set-size=150
 
+######### Import Configs #########
+#atlas.import.temp.directory=/temp/import
+
 ######### Notification Configs #########
 atlas.notification.embedded=true
 atlas.kafka.data=${sys:atlas.home}/data/kafka
...
@@ -26,6 +26,7 @@ Following options are supported for Import process:
    * Specify transforms during import operation.
    * Resume import by specifying starting entity guid.
    * Optionally import type definition.
+   * Handling large imports.
 
 ---++++ Transforms
 
@@ -133,3 +134,13 @@ curl -g -X POST -u adminuser:password -H "Content-Type: application/json"
 -d r@importOptions.json
 "http://localhost:21000/api/atlas/admin/importfile"
 </verbatim>
+
+---++++ Handling Large Imports
+
+By default, the Import Service stores all of the data in memory. This may be limiting for ZIPs containing a large amount of data.
+
+To configure a temporary directory, use the application property _atlas.import.temp.directory_. If this property is left blank, the default in-memory implementation is used.
+
+Please ensure that sufficient disk space is available for the operation.
+
+The contents of the directory created as the backing store for the import operation are erased after the operation is over.
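
For example, to stage imports on a dedicated scratch volume, the property can be set as below (the path shown is illustrative, not part of this commit):

<verbatim>
atlas.import.temp.directory=/var/lib/atlas/import-scratch
</verbatim>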
@@ -55,7 +55,9 @@ public enum AtlasConfiguration {
     //search configuration
     SEARCH_MAX_LIMIT("atlas.search.maxlimit", 10000),
-    SEARCH_DEFAULT_LIMIT("atlas.search.defaultlimit", 100);
+    SEARCH_DEFAULT_LIMIT("atlas.search.defaultlimit", 100),
+
+    IMPORT_TEMP_DIRECTORY("atlas.import.temp.directory", "");
 
     private static final Configuration APPLICATION_PROPERTIES;
...
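The new enum entry defaults to an empty string, which the import path treats as "no backing directory configured". A minimal sketch of how the property is read, mirroring the `AtlasConfiguration.IMPORT_TEMP_DIRECTORY.getString()` calls made later in this commit:

```java
// Sketch only: an empty value (the default) keeps the in-memory ZipSource behavior.
String tempDir = AtlasConfiguration.IMPORT_TEMP_DIRECTORY.getString();

// commons-lang3 StringUtils, as used elsewhere in this commit.
boolean directoryBacked = StringUtils.isNotEmpty(tempDir);
```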
@@ -18,6 +18,7 @@
 package org.apache.atlas.repository.impexp;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.atlas.AtlasConfiguration;
 import org.apache.atlas.AtlasErrorCode;
 import org.apache.atlas.RequestContext;
 import org.apache.atlas.entitytransform.BaseEntityHandler;
@@ -27,22 +28,23 @@ import org.apache.atlas.model.impexp.AtlasImportRequest;
 import org.apache.atlas.model.impexp.AtlasImportResult;
 import org.apache.atlas.model.typedef.AtlasTypesDef;
 import org.apache.atlas.repository.store.graph.BulkImporter;
+import org.apache.atlas.repository.store.graph.v2.EntityImportStream;
 import org.apache.atlas.store.AtlasTypeDefStore;
 import org.apache.atlas.type.AtlasType;
 import org.apache.atlas.type.AtlasTypeRegistry;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.MapUtils;
-import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
 
 import javax.inject.Inject;
-import java.io.ByteArrayInputStream;
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.List;
 
 import static org.apache.atlas.model.impexp.AtlasImportRequest.TRANSFORMERS_KEY;
@@ -72,18 +74,25 @@ public class ImportService {
         this.importTransformsShaper = importTransformsShaper;
     }
 
-    public AtlasImportResult run(ZipSource source, String userName,
+    public AtlasImportResult run(InputStream inputStream, String userName,
                                  String hostName, String requestingIP) throws AtlasBaseException {
-        return run(source, null, userName, hostName, requestingIP);
+        return run(inputStream, null, userName, hostName, requestingIP);
     }
 
-    public AtlasImportResult run(ZipSource source, AtlasImportRequest request, String userName,
+    public AtlasImportResult run(InputStream inputStream, AtlasImportRequest request, String userName,
                                  String hostName, String requestingIP) throws AtlasBaseException {
         if (request == null) {
             request = new AtlasImportRequest();
         }
 
+        EntityImportStream source = createZipSource(inputStream, AtlasConfiguration.IMPORT_TEMP_DIRECTORY.getString());
+
+        return run(source, request, userName, hostName, requestingIP);
+    }
+
+    @VisibleForTesting
+    AtlasImportResult run(EntityImportStream source, AtlasImportRequest request, String userName,
+                          String hostName, String requestingIP) throws AtlasBaseException {
         AtlasImportResult result = new AtlasImportResult(request, userName, requestingIP, hostName, System.currentTimeMillis());
 
         try {
@@ -112,7 +121,10 @@ public class ImportService {
         } finally {
             RequestContext.get().setImportInProgress(false);
-            source.close();
+
+            if (source != null) {
+                source.close();
+            }
 
             LOG.info("<== import(user={}, from={}): status={}", userName, requestingIP, result.getOperationStatus());
         }
@@ -120,7 +132,7 @@ public class ImportService {
     }
 
     @VisibleForTesting
-    void setImportTransform(ZipSource source, String transforms) throws AtlasBaseException {
+    void setImportTransform(EntityImportStream source, String transforms) throws AtlasBaseException {
         ImportTransforms importTransform = ImportTransforms.fromJson(transforms);
 
         if (importTransform == null) {
             return;
@@ -132,11 +144,10 @@ public class ImportService {
         if (LOG.isDebugEnabled()) {
             debugLog(" => transforms: {}", AtlasType.toJson(importTransform));
         }
     }
 
     @VisibleForTesting
-    void setEntityTransformerHandlers(ZipSource source, String transformersJson) throws AtlasBaseException {
+    void setEntityTransformerHandlers(EntityImportStream source, String transformersJson) throws AtlasBaseException {
         if (StringUtils.isEmpty(transformersJson)) {
             return;
         }
@@ -156,7 +167,7 @@ public class ImportService {
         LOG.debug(s, params);
     }
 
-    private void setStartPosition(AtlasImportRequest request, ZipSource source) throws AtlasBaseException {
+    private void setStartPosition(AtlasImportRequest request, EntityImportStream source) throws AtlasBaseException {
         if (request.getStartGuid() != null) {
             source.setPositionUsingEntityGuid(request.getStartGuid());
         } else if (request.getStartPosition() != null) {
@@ -164,8 +175,7 @@ public class ImportService {
         }
     }
 
-    public AtlasImportResult run(AtlasImportRequest request, String userName, String hostName, String requestingIP)
-            throws AtlasBaseException {
+    public AtlasImportResult run(AtlasImportRequest request, String userName, String hostName, String requestingIP) throws AtlasBaseException {
         String fileName = request.getFileName();
 
         if (StringUtils.isBlank(fileName)) {
@@ -173,14 +183,11 @@ public class ImportService {
         }
 
         AtlasImportResult result = null;
 
         try {
             LOG.info("==> import(user={}, from={}, fileName={})", userName, requestingIP, fileName);
 
-            String transforms = MapUtils.isNotEmpty(request.getOptions()) ? request.getOptions().get(TRANSFORMS_KEY) : null;
             File file = new File(fileName);
-            ZipSource source = new ZipSource(new ByteArrayInputStream(FileUtils.readFileToByteArray(file)), ImportTransforms.fromJson(transforms));
-            result = run(source, request, userName, hostName, requestingIP);
+
+            result = run(new FileInputStream(file), request, userName, hostName, requestingIP);
         } catch (AtlasBaseException excp) {
             LOG.error("import(user={}, from={}, fileName={}): failed", userName, requestingIP, excp);
@@ -189,10 +196,6 @@ public class ImportService {
             LOG.error("import(user={}, from={}, fileName={}): file not found", userName, requestingIP, excp);
 
             throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, fileName + ": file not found");
-        } catch (IOException excp) {
-            LOG.error("import(user={}, from={}, fileName={}): cannot read file", userName, requestingIP, excp);
-
-            throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, fileName + ": cannot read file");
         } catch (Exception excp) {
             LOG.error("import(user={}, from={}, fileName={}): failed", userName, requestingIP, excp);
@@ -214,7 +217,7 @@ public class ImportService {
         importTypeDefProcessor.processTypes(typeDefinitionMap, result);
     }
 
-    private void processEntities(String userName, ZipSource importSource, AtlasImportResult result) throws AtlasBaseException {
+    private void processEntities(String userName, EntityImportStream importSource, AtlasImportResult result) throws AtlasBaseException {
         result.setExportResult(importSource.getExportResult());
         this.bulkImporter.bulkImport(importSource, result);
@@ -228,4 +231,17 @@ public class ImportService {
     private int getDuration(long endTime, long startTime) {
         return (int) (endTime - startTime);
     }
+
+    private EntityImportStream createZipSource(InputStream inputStream, String configuredTemporaryDirectory) throws AtlasBaseException {
+        try {
+            if (StringUtils.isEmpty(configuredTemporaryDirectory)) {
+                return new ZipSource(inputStream);
+            }
+
+            return new ZipSourceWithBackingDirectory(inputStream, configuredTemporaryDirectory);
+        } catch (IOException ex) {
+            throw new AtlasBaseException(ex);
+        }
+    }
 }
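With this change, callers hand ImportService a raw InputStream and the service selects the EntityImportStream implementation itself via createZipSource(). A minimal caller sketch (the file path, credentials, and the injected importService/LOG references are illustrative, not part of this commit):

```java
// Sketch: importing an export ZIP through the new InputStream-based API.
// ImportService decides internally between ZipSource (in-memory) and
// ZipSourceWithBackingDirectory, based on atlas.import.temp.directory.
try (InputStream is = new FileInputStream("/tmp/atlas-export.zip")) {
    AtlasImportResult result = importService.run(is, new AtlasImportRequest(), "admin", "localhost", "127.0.0.1");

    LOG.info("import status: {}", result.getOperationStatus());
}
```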
@@ -82,20 +82,25 @@ public class ZipSource implements EntityImportStream {
                 guidEntityJsonMap.get(key).equals("[]"));
     }
 
+    @Override
     public ImportTransforms getImportTransform() { return this.importTransform; }
 
+    @Override
     public void setImportTransform(ImportTransforms importTransform) {
         this.importTransform = importTransform;
     }
 
+    @Override
     public List<BaseEntityHandler> getEntityHandlers() {
         return entityHandlers;
     }
 
+    @Override
     public void setEntityHandlers(List<BaseEntityHandler> entityHandlers) {
         this.entityHandlers = entityHandlers;
     }
 
+    @Override
     public AtlasTypesDef getTypesDef() throws AtlasBaseException {
         final String fileName = ZipExportFileNames.ATLAS_TYPESDEF_NAME.toString();
@@ -103,6 +108,7 @@ public class ZipSource implements EntityImportStream {
         return convertFromJson(AtlasTypesDef.class, s);
     }
 
+    @Override
     public AtlasExportResult getExportResult() throws AtlasBaseException {
         final String fileName = ZipExportFileNames.ATLAS_EXPORT_INFO_NAME.toString();
@@ -147,6 +153,7 @@ public class ZipSource implements EntityImportStream {
         zipInputStream.close();
     }
 
+    @Override
     public List<String> getCreationOrder() {
         return this.creationOrder;
     }
@@ -210,6 +217,7 @@ public class ZipSource implements EntityImportStream {
         return s;
     }
 
+    @Override
     public void close() {
         try {
             inputStream.close();
@@ -284,7 +292,7 @@ public class ZipSource implements EntityImportStream {
         currentPosition = index;
         reset();
         for (int i = 0; i < creationOrder.size() && i <= index; i++) {
-            iterator.next();
+            onImportComplete(iterator.next());
         }
     }
...
@@ -17,8 +17,15 @@
  */
 package org.apache.atlas.repository.store.graph.v2;
 
+import org.apache.atlas.entitytransform.BaseEntityHandler;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.impexp.AtlasExportResult;
 import org.apache.atlas.model.instance.AtlasEntity;
 import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
+import org.apache.atlas.model.typedef.AtlasTypesDef;
+import org.apache.atlas.repository.impexp.ImportTransforms;
+
+import java.util.List;
 
 public class AtlasEntityStreamForImport extends AtlasEntityStream implements EntityImportStream {
     private int currentPosition = 0;
@@ -36,6 +43,11 @@ public class AtlasEntityStreamForImport extends AtlasEntityStream implements Ent
     }
 
     @Override
+    public AtlasEntityWithExtInfo getEntityWithExtInfo(String guid) throws AtlasBaseException {
+        return null;
+    }
+
+    @Override
     public AtlasEntity getByGuid(String guid) {
         AtlasEntity ent = super.entitiesWithExtInfo.getEntity(guid);
@@ -69,4 +81,44 @@ public class AtlasEntityStreamForImport extends AtlasEntityStream implements Ent
     public void onImportComplete(String guid) {
     }
 
+    @Override
+    public void setImportTransform(ImportTransforms importTransform) {
+    }
+
+    @Override
+    public ImportTransforms getImportTransform() {
+        return null;
+    }
+
+    @Override
+    public void setEntityHandlers(List<BaseEntityHandler> entityHandlers) {
+    }
+
+    @Override
+    public List<BaseEntityHandler> getEntityHandlers() {
+        return null;
+    }
+
+    @Override
+    public AtlasTypesDef getTypesDef() throws AtlasBaseException {
+        return null;
+    }
+
+    @Override
+    public AtlasExportResult getExportResult() throws AtlasBaseException {
+        return null;
+    }
+
+    @Override
+    public List<String> getCreationOrder() {
+        return null;
+    }
+
+    @Override
+    public void close() {
+    }
 }
@@ -18,6 +18,7 @@
 package org.apache.atlas.repository.store.graph.v2;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.atlas.AtlasConfiguration;
 import org.apache.atlas.AtlasErrorCode;
 import org.apache.atlas.RequestContext;
 import org.apache.atlas.annotation.GraphTransaction;
@@ -57,12 +58,14 @@ public class BulkImporterImpl implements BulkImporter {
     private final EntityGraphRetriever entityGraphRetriever;
     private AtlasTypeRegistry typeRegistry;
     private final int MAX_ATTEMPTS = 2;
+    private boolean directoryBasedImportConfigured;
 
     @Inject
     public BulkImporterImpl(AtlasEntityStore entityStore, AtlasTypeRegistry typeRegistry) {
         this.entityStore = entityStore;
         this.entityGraphRetriever = new EntityGraphRetriever(typeRegistry);
         this.typeRegistry = typeRegistry;
+        this.directoryBasedImportConfigured = StringUtils.isNotEmpty(AtlasConfiguration.IMPORT_TEMP_DIRECTORY.getString());
     }
 
     @Override
@@ -205,9 +208,11 @@ public class BulkImporterImpl implements BulkImporter {
                                             AtlasImportResult importResult,
                                             Set<String> processedGuids,
                                             int currentIndex, int streamSize, float currentPercent) {
-        updateImportMetrics("entity:%s:created", resp.getCreatedEntities(), processedGuids, importResult);
-        updateImportMetrics("entity:%s:updated", resp.getUpdatedEntities(), processedGuids, importResult);
-        updateImportMetrics("entity:%s:deleted", resp.getDeletedEntities(), processedGuids, importResult);
+        if (!directoryBasedImportConfigured) {
+            updateImportMetrics("entity:%s:created", resp.getCreatedEntities(), processedGuids, importResult);
+            updateImportMetrics("entity:%s:updated", resp.getUpdatedEntities(), processedGuids, importResult);
+            updateImportMetrics("entity:%s:deleted", resp.getDeletedEntities(), processedGuids, importResult);
+        }
 
         String lastEntityImported = String.format("entity:last-imported:%s:[%s]:(%s)", currentEntity.getEntity().getTypeName(), currentIndex, currentEntity.getEntity().getGuid());
...
@@ -18,17 +18,45 @@
 package org.apache.atlas.repository.store.graph.v2;
 
+import org.apache.atlas.entitytransform.BaseEntityHandler;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.impexp.AtlasExportResult;
+import org.apache.atlas.model.instance.AtlasEntity;
 import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
+import org.apache.atlas.model.typedef.AtlasTypesDef;
+import org.apache.atlas.repository.impexp.ImportTransforms;
+
+import java.util.List;
 
 public interface EntityImportStream extends EntityStream {
     int size();
 
     void setPosition(int position);
 
     int getPosition();
 
     void setPositionUsingEntityGuid(String guid);
 
     AtlasEntityWithExtInfo getNextEntityWithExtInfo();
 
+    AtlasEntity.AtlasEntityWithExtInfo getEntityWithExtInfo(String guid) throws AtlasBaseException;
+
     void onImportComplete(String guid);
 
+    void setImportTransform(ImportTransforms importTransform);
+
+    ImportTransforms getImportTransform();
+
+    void setEntityHandlers(List<BaseEntityHandler> entityHandlers);
+
+    List<BaseEntityHandler> getEntityHandlers();
+
+    AtlasTypesDef getTypesDef() throws AtlasBaseException;
+
+    AtlasExportResult getExportResult() throws AtlasBaseException;
+
+    List<String> getCreationOrder();
+
+    void close();
 }
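The widened interface lets import-side code drive any source — the in-memory ZipSource, the new ZipSourceWithBackingDirectory, or AtlasEntityStreamForImport — through one contract. An illustrative consumption loop (not part of this commit; hasNext() comes from the parent EntityStream interface):

```java
// Sketch: generic consumption of an EntityImportStream in creation order.
// onImportComplete() gives directory-backed sources a chance to release
// per-entity state as entities are persisted.
while (source.hasNext()) {
    AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = source.getNextEntityWithExtInfo();

    // ... persist the entity via the entity store ...

    source.onImportComplete(entityWithExtInfo.getEntity().getGuid());
}

source.close();
```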
@@ -33,16 +33,20 @@ import org.apache.atlas.store.AtlasTypeDefStore;
 import org.apache.atlas.type.AtlasClassificationType;
 import org.apache.atlas.type.AtlasTypeRegistry;
 import org.apache.atlas.utils.TestResourceFileUtils;
+import org.apache.commons.io.IOUtils;
 import org.testng.ITestContext;
 import org.testng.SkipException;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Guice;
 import org.testng.annotations.Test;
-import org.testng.annotations.DataProvider;
 
 import javax.inject.Inject;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -52,8 +56,8 @@ import static org.apache.atlas.model.impexp.AtlasExportRequest.FETCH_TYPE_INCREM
 import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.createTypes;
 import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.getEntities;
 import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.getZipSource;
-import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runImportWithNoParameters;
 import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runExportWithParameters;
+import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runImportWithNoParameters;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
@@ -107,11 +111,13 @@ public class ExportIncrementalTest extends ExportImportTestBase {
     }
 
     @Test
-    public void atT0_ReturnsAllEntities() throws AtlasBaseException {
+    public void atT0_ReturnsAllEntities() throws AtlasBaseException, IOException {
         final int expectedEntityCount = 2;
 
         AtlasExportRequest request = getIncrementalRequest(0);
-        ZipSource source = runExportWithParameters(exportService, request);
+        InputStream inputStream = runExportWithParameters(exportService, request);
+        ZipSource source = getZipSourceFromInputStream(inputStream);
 
         AtlasEntity.AtlasEntityWithExtInfo entities = getEntities(source, expectedEntityCount);
 
         int count = 0;
@@ -129,13 +135,15 @@ public class ExportIncrementalTest extends ExportImportTestBase {
     }
 
     @Test(dependsOnMethods = "atT0_ReturnsAllEntities")
-    public void atT1_NewClassificationAttachedToTable_ReturnsChangedTable() throws AtlasBaseException {
+    public void atT1_NewClassificationAttachedToTable_ReturnsChangedTable() throws AtlasBaseException, IOException {
         final int expectedEntityCount = 1;
 
         entityStore.addClassifications(TABLE_GUID, ImmutableList.of(classificationTypeT1.createDefaultValue()));
 
         AtlasExportRequest request = getIncrementalRequest(nextTimestamp);
-        ZipSource source = runExportWithParameters(exportService, request);
+        InputStream inputStream = runExportWithParameters(exportService, request);
+        ZipSource source = getZipSourceFromInputStream(inputStream);
 
         AtlasEntity.AtlasEntityWithExtInfo entities = getEntities(source, expectedEntityCount);
 
         AtlasEntity entity = null;
@@ -155,7 +163,7 @@ public class ExportIncrementalTest extends ExportImportTestBase {
     }
 
     @Test(dependsOnMethods = "atT1_NewClassificationAttachedToTable_ReturnsChangedTable")
-    public void atT2_NewClassificationAttachedToColumn_ReturnsChangedColumn() throws AtlasBaseException {
+    public void atT2_NewClassificationAttachedToColumn_ReturnsChangedColumn() throws AtlasBaseException, IOException {
         final int expectedEntityCount = 1;
 
         AtlasEntity.AtlasEntityWithExtInfo tableEntity = entityStore.getById(TABLE_GUID);
@@ -163,7 +171,9 @@ public class ExportIncrementalTest extends ExportImportTestBase {
         entityStore.addClassifications(COLUMN_GUID_HIGH, ImmutableList.of(typeRegistry.getClassificationTypeByName("T1").createDefaultValue()));
 
-        ZipSource source = runExportWithParameters(exportService, getIncrementalRequest(nextTimestamp));
+        InputStream inputStream = runExportWithParameters(exportService, getIncrementalRequest(nextTimestamp));
+        ZipSource source = getZipSourceFromInputStream(inputStream);
 
         AtlasEntity.AtlasEntityWithExtInfo entities = getEntities(source, expectedEntityCount);
 
         for (Map.Entry<String, AtlasEntity> entry : entities.getReferredEntities().entrySet()) {
@@ -176,17 +186,26 @@ public class ExportIncrementalTest extends ExportImportTestBase {
         assertEquals(preExportTableEntityTimestamp, postUpdateTableEntityTimestamp);
     }
 
+    private ZipSource getZipSourceFromInputStream(InputStream inputStream) {
+        try {
+            return new ZipSource(inputStream);
+        } catch (IOException | AtlasBaseException e) {
+            return null;
+        }
+    }
+
     @Test(dependsOnMethods = "atT2_NewClassificationAttachedToColumn_ReturnsChangedColumn")
     public void exportingWithSameParameters_Succeeds() {
-        ZipSource source = runExportWithParameters(exportService, getIncrementalRequest(nextTimestamp));
-
-        assertNotNull(source);
+        InputStream inputStream = runExportWithParameters(exportService, getIncrementalRequest(nextTimestamp));
+
+        assertNotNull(getZipSourceFromInputStream(inputStream));
     }
 
     @Test
     public void connectedExport() {
-        ZipSource source = runExportWithParameters(exportService, getConnected());
+        InputStream inputStream = runExportWithParameters(exportService, getConnected());
+        ZipSource source = getZipSourceFromInputStream(inputStream);
 
         UniqueList<String> creationOrder = new UniqueList<>();
         List<String> zipCreationOrder = source.getCreationOrder();
         creationOrder.addAll(zipCreationOrder);
@@ -200,27 +219,29 @@ public class ExportIncrementalTest extends ExportImportTestBase {
     }
 
     @Test(dataProvider = "hiveDb")
-    public void importHiveDb(ZipSource zipSource) throws AtlasBaseException, IOException {
-        runImportWithNoParameters(importService, zipSource);
+    public void importHiveDb(InputStream stream) throws AtlasBaseException, IOException {
+        runImportWithNoParameters(importService, stream);
     }
 
     @Test(dependsOnMethods = "importHiveDb")
-    public void exportTableInrementalConnected() throws AtlasBaseException {
-        ZipSource source = runExportWithParameters(exportService, getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_LINEAGE, EXPORT_INCREMENTAL, 0, true));
-        verifyExpectedEntities(getFileNames(source), GUID_DB, GUID_TABLE_CTAS_2);
-
-        nextTimestamp = updateTimesampForNextIncrementalExport(source);
+    public void exportTableInrementalConnected() throws AtlasBaseException, IOException {
+        InputStream source = runExportWithParameters(exportService, getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_LINEAGE, EXPORT_INCREMENTAL, 0, true));
+
+        ZipSource sourceCopy = getZipSourceCopy(source);
+        verifyExpectedEntities(getFileNames(sourceCopy), GUID_DB, GUID_TABLE_CTAS_2);
+
+        nextTimestamp = updateTimesampForNextIncrementalExport(sourceCopy);
 
         try {
             source = runExportWithParameters(exportService, getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_LINEAGE, EXPORT_INCREMENTAL, nextTimestamp, true));
-        }catch (SkipException e){
+        } catch (SkipException e) {
+            throw e;
         }
 
         entityStore.addClassifications(GUID_TABLE_CTAS_2, ImmutableList.of(classificationTypeT1.createDefaultValue()));
 
         source = runExportWithParameters(exportService, getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_LINEAGE, EXPORT_INCREMENTAL, nextTimestamp, true));
-        verifyExpectedEntities(getFileNames(source), GUID_TABLE_CTAS_2);
+        verifyExpectedEntities(getFileNames(getZipSourceCopy(source)), GUID_TABLE_CTAS_2);
     }
 
@@ -281,4 +302,11 @@ public class ExportIncrementalTest extends ExportImportTestBase {
         }
         return ret;
     }
+
+    private ZipSource getZipSourceCopy(InputStream is) throws IOException, AtlasBaseException {
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        IOUtils.copy(is, baos);
+
+        return new ZipSource(new ByteArrayInputStream(baos.toByteArray()));
+    }
 }
@@ -40,6 +40,7 @@ import org.testng.annotations.Test;
 
 import javax.inject.Inject;
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.Map;
 
 import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.loadBaseModel;
@@ -87,11 +88,13 @@ public class ExportSkipLineageTest extends ExportImportTestBase {
     }
 
     @Test
-    public void exportWithoutLineage() {
+    public void exportWithoutLineage() throws IOException, AtlasBaseException {
         final int expectedEntityCount = 3;
 
         AtlasExportRequest request = getRequest();
-        ZipSource source = runExportWithParameters(exportService, request);
+        InputStream inputStream = runExportWithParameters(exportService, request);
+        ZipSource source = new ZipSource(inputStream);
 
         AtlasEntity.AtlasEntityWithExtInfo entities = ZipFileResourceTestUtils.getEntities(source, expectedEntityCount);
 
         int count = 0;
...
@@ -66,7 +66,7 @@ public class ImportTransformsShaperTest extends ExportImportTestBase {
     public void newTagIsCreatedAndEntitiesAreTagged() throws AtlasBaseException, IOException {
         AtlasImportResult result = ZipFileResourceTestUtils.runImportWithParameters(importService,
                 getImporRequest(),
-                ZipFileResourceTestUtils.getZipSourceFrom("stocks.zip"));
+                ZipFileResourceTestUtils.getInputStreamFrom("stocks.zip"));
 
         AtlasClassificationType classification = typeRegistry.getClassificationTypeByName(TAG_NAME);
         assertNotNull(classification);
...
@@ -41,6 +41,7 @@ import javax.inject.Inject;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.List;
 import java.util.Map;
 import java.util.ArrayList;
@@ -106,8 +107,8 @@ public class RelationshipAttributesExtractorTest {
     }
 
     @Test(dataProvider = "hiveDb")
-    public void importHiveDb(ZipSource zipSource) throws AtlasBaseException, IOException {
-        runImportWithNoParameters(importService, zipSource);
+    public void importHiveDb(InputStream inputStream) throws AtlasBaseException, IOException {
+        runImportWithNoParameters(importService, inputStream);
     }
 
     @Test(dependsOnMethods = "importHiveDb")
...
@@ -24,10 +24,10 @@ import org.apache.atlas.RequestContext;
 import org.apache.atlas.TestModules;
 import org.apache.atlas.TestUtilsV2;
 import org.apache.atlas.exception.AtlasBaseException;
-import org.apache.atlas.model.impexp.AtlasServer;
 import org.apache.atlas.model.impexp.AtlasExportRequest;
 import org.apache.atlas.model.impexp.AtlasImportRequest;
 import org.apache.atlas.model.impexp.AtlasImportResult;
+import org.apache.atlas.model.impexp.AtlasServer;
 import org.apache.atlas.model.instance.AtlasEntity;
 import org.apache.atlas.model.typedef.AtlasEntityDef;
 import org.apache.atlas.repository.Constants;
@@ -40,6 +40,7 @@ import org.apache.atlas.type.AtlasEntityType;
 import org.apache.atlas.type.AtlasType;
 import org.apache.atlas.type.AtlasTypeRegistry;
 import org.apache.atlas.utils.TestResourceFileUtils;
+import org.apache.commons.io.IOUtils;
 import org.testng.SkipException;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
@@ -47,7 +48,10 @@ import org.testng.annotations.Guice;
 import org.testng.annotations.Test;
 
 import javax.inject.Inject;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.List;
 
 import static org.apache.atlas.model.impexp.AtlasExportRequest.OPTION_KEY_REPLICATED_TO;
@@ -88,7 +92,7 @@ public class ReplicationEntityAttributeTest extends ExportImportTestBase {
     @Inject
     private AtlasEntityStoreV2 entityStore;
 
-    private ZipSource zipSource;
+    private InputStream inputStream;
 
     @BeforeClass
     public void setup() throws IOException, AtlasBaseException {
@@ -107,13 +111,19 @@ public class ReplicationEntityAttributeTest extends ExportImportTestBase {
     }
 
     @Test
-    public void exportWithReplicationToOption_AddsClusterObjectIdToReplicatedFromAttribute() throws AtlasBaseException {
+    public void exportWithReplicationToOption_AddsClusterObjectIdToReplicatedFromAttribute() throws AtlasBaseException, IOException {
         final int expectedEntityCount = 2;
 
         AtlasExportRequest request = getUpdateMetaInfoUpdateRequest();
-        zipSource = runExportWithParameters(exportService, request);
+        InputStream inputStream = runExportWithParameters(exportService, request);
+        assertNotNull(inputStream);
+
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        IOUtils.copy(inputStream, baos);
+        this.inputStream = new ByteArrayInputStream(baos.toByteArray());
 
-        assertNotNull(zipSource);
+        ZipSource zipSource = new ZipSource(new ByteArrayInputStream(baos.toByteArray()));
         assertNotNull(zipSource.getCreationOrder());
         assertEquals(zipSource.getCreationOrder().size(), expectedEntityCount);
@@ -139,7 +149,7 @@ public class ReplicationEntityAttributeTest extends ExportImportTestBase {
     @Test(dependsOnMethods = "exportWithReplicationToOption_AddsClusterObjectIdToReplicatedFromAttribute")
     public void importWithReplicationFromOption_AddsClusterObjectIdToReplicatedFromAttribute() throws AtlasBaseException, IOException {
         AtlasImportRequest request = getImportRequestWithReplicationOption();
-        AtlasImportResult importResult = runImportWithParameters(importService, request, zipSource);
+        AtlasImportResult importResult = runImportWithParameters(importService, request, inputStream);
 
         assertCluster(
                 AuditsWriter.getServerNameFromFullName(REPLICATED_FROM_CLUSTER_NAME),
...
@@ -33,12 +33,14 @@ import org.apache.atlas.model.typedef.AtlasTypesDef;
 import org.apache.atlas.repository.store.bootstrap.AtlasTypeDefStoreInitializer;
 import org.apache.atlas.repository.store.graph.v2.AtlasEntityStoreV2;
 import org.apache.atlas.repository.store.graph.v2.AtlasEntityStreamForImport;
+import org.apache.atlas.repository.store.graph.v2.EntityImportStream;
 import org.apache.atlas.store.AtlasTypeDefStore;
 import org.apache.atlas.type.AtlasType;
 import org.apache.atlas.type.AtlasTypeRegistry;
 import org.apache.atlas.utils.AtlasJson;
 import org.apache.atlas.utils.TestResourceFileUtils;
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -49,6 +51,8 @@ import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -151,19 +155,11 @@ public class ZipFileResourceTestUtils {
     }
 
     public static Object[][] getZipSource(String fileName) throws IOException, AtlasBaseException {
-        return new Object[][]{{getZipSourceFrom(fileName)}};
+        return new Object[][]{{getInputStreamFrom(fileName)}};
     }
 
-    public static ZipSource getZipSourceFrom(String fileName) throws IOException, AtlasBaseException {
-        FileInputStream fs = ZipFileResourceTestUtils.getFileInputStream(fileName);
-
-        return new ZipSource(fs);
-    }
-
-    private static ZipSource getZipSourceFrom(ByteArrayOutputStream baos) throws IOException, AtlasBaseException {
-        ByteArrayInputStream bis = new ByteArrayInputStream(baos.toByteArray());
-        ZipSource zipSource = new ZipSource(bis);
-
-        return zipSource;
+    public static InputStream getInputStreamFrom(String fileName) {
+        return ZipFileResourceTestUtils.getFileInputStream(fileName);
     }
 
     public static void verifyImportedEntities(List<String> creationOrder, List<String> processedEntities) {
@@ -224,7 +220,7 @@ public class ZipFileResourceTestUtils {
         }
     }
 
-    public static ZipSource runExportWithParameters(ExportService exportService, AtlasExportRequest request) {
+    public static InputStream runExportWithParameters(ExportService exportService, AtlasExportRequest request) {
         final String requestingIP = "1.0.0.0";
         final String hostName = "localhost";
         final String userName = "admin";
@@ -237,7 +233,7 @@ public class ZipFileResourceTestUtils {
             assertEquals(result.getOperationStatus(), AtlasExportResult.OperationStatus.SUCCESS);
             zipSink.close();
 
-            return getZipSourceFrom(baos);
+            return new ByteArrayInputStream(baos.toByteArray());
         }
         catch(Exception ex) {
             throw new SkipException(String.format("runExportWithParameters: %s: failed!", request.toString()));
@@ -325,27 +321,42 @@ public class ZipFileResourceTestUtils {
     }
 
-    public static AtlasImportResult runImportWithParameters(ImportService importService, AtlasImportRequest request, ZipSource source) throws AtlasBaseException, IOException {
+    public static AtlasImportResult runImportWithParameters(ImportService importService, AtlasImportRequest request, InputStream inputStream) throws AtlasBaseException, IOException {
+        final String requestingIP = "1.0.0.0";
+        final String hostName = "localhost";
+        final String userName = "admin";
+
+        AtlasImportResult result = importService.run(inputStream, request, userName, hostName, requestingIP);
+        assertEquals(result.getOperationStatus(), AtlasImportResult.OperationStatus.SUCCESS);
+        return result;
+    }
+
+    public static AtlasImportResult runImportWithNoParameters(ImportService importService, InputStream inputStream) throws AtlasBaseException, IOException {
         final String requestingIP = "1.0.0.0";
         final String hostName = "localhost";
         final String userName = "admin";
 
-        AtlasImportResult result = importService.run(source, request, userName, hostName, requestingIP);
+        AtlasImportResult result = importService.run(inputStream, userName, hostName, requestingIP);
         assertEquals(result.getOperationStatus(), AtlasImportResult.OperationStatus.SUCCESS);
         return result;
     }
 
-    public static AtlasImportResult runImportWithNoParameters(ImportService importService, ZipSource source) throws AtlasBaseException, IOException {
+    public static AtlasImportResult runImportWithNoParametersUsingBackingDirectory(ImportService importService, InputStream inputStream) throws AtlasBaseException, IOException {
         final String requestingIP = "1.0.0.0";
         final String hostName = "localhost";
         final String userName = "admin";
 
-        AtlasImportResult result = importService.run(source, userName, hostName, requestingIP);
+        EntityImportStream sourceWithBackingDirectory = new ZipSourceWithBackingDirectory(inputStream, Files.createTempDirectory("temp").toString());
+        AtlasImportResult result = importService.run(sourceWithBackingDirectory, new AtlasImportRequest(), userName, hostName, requestingIP);
         assertEquals(result.getOperationStatus(), AtlasImportResult.OperationStatus.SUCCESS);
         return result;
     }
 
-    public static void runAndVerifyQuickStart_v1_Import(ImportService importService, ZipSource zipSource) throws AtlasBaseException, IOException {
+    public static void runAndVerifyQuickStart_v1_Import(ImportService importService, InputStream is) throws AtlasBaseException, IOException {
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        IOUtils.copy(is, baos);
+
+        ZipSource zipSource = new ZipSource(new ByteArrayInputStream(baos.toByteArray()));
         AtlasExportResult exportResult = zipSource.getExportResult();
         List<String> creationOrder = zipSource.getCreationOrder();
 
@@ -353,7 +364,7 @@ public class ZipFileResourceTestUtils {
         RequestContext.get().setUser(TestUtilsV2.TEST_USER, null);
 
         AtlasImportRequest request = getDefaultImportRequest();
-        AtlasImportResult result = runImportWithParameters(importService, request, zipSource);
+        AtlasImportResult result = runImportWithParameters(importService, request, new ByteArrayInputStream(baos.toByteArray()));
 
         assertNotNull(result);
         verifyImportedMetrics(exportResult, result);
...
@@ -28,6 +28,7 @@ import org.testng.annotations.Test;
 import java.io.ByteArrayInputStream;
 import java.io.FileInputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.List;
 
 import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.getZipSource;
@@ -127,7 +128,8 @@ public class ZipSourceTest {
     }
 
     @Test(dataProvider = "sales")
-    public void iteratorSetPositionBehavor(ZipSource zipSource) throws IOException, AtlasBaseException {
+    public void iteratorSetPositionBehavor(InputStream inputStream) throws IOException, AtlasBaseException {
+        ZipSource zipSource = new ZipSource(inputStream);
         Assert.assertTrue(zipSource.hasNext());
 
         List<String> creationOrder = zipSource.getCreationOrder();
...
@@ -51,6 +51,7 @@ import org.testng.annotations.Test;
 
 import javax.inject.Inject;
 import java.io.FileInputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -601,9 +602,8 @@ public class ClassificationPropagationTest {
         }
     }
 
-    public static ZipSource getZipSource(String fileName) throws IOException, AtlasBaseException {
-        FileInputStream fs = ZipFileResourceTestUtils.getFileInputStream(fileName);
-
-        return new ZipSource(fs);
+    public static InputStream getZipSource(String fileName) throws IOException {
+        return ZipFileResourceTestUtils.getFileInputStream(fileName);
     }
 
     private void loadSampleClassificationDefs() throws AtlasBaseException {
...
@@ -39,6 +39,7 @@ import org.testng.annotations.Test;
 
 import javax.inject.Inject;
 import java.io.FileInputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.time.Clock;
 import java.time.Instant;
 import java.time.ZoneId;
@@ -246,9 +247,8 @@ public class MetricsServiceTest {
         }
     }
 
-    public static ZipSource getZipSource(String fileName) throws IOException, AtlasBaseException {
-        FileInputStream fs = ZipFileResourceTestUtils.getFileInputStream(fileName);
-
-        return new ZipSource(fs);
+    public static InputStream getZipSource(String fileName) throws AtlasBaseException {
+        return ZipFileResourceTestUtils.getFileInputStream(fileName);
     }
 
     private static class TestClock extends Clock {
...
@@ -45,7 +45,6 @@ import org.apache.atlas.repository.impexp.ExportService;
 import org.apache.atlas.repository.impexp.ImportService;
 import org.apache.atlas.repository.impexp.MigrationProgressService;
 import org.apache.atlas.repository.impexp.ZipSink;
-import org.apache.atlas.repository.impexp.ZipSource;
 import org.apache.atlas.repository.patches.AtlasPatchManager;
 import org.apache.atlas.repository.store.graph.AtlasEntityStore;
 import org.apache.atlas.services.MetricsService;
@@ -404,9 +403,8 @@ public class AdminResource {
         try {
             AtlasImportRequest request = AtlasType.fromJson(jsonData, AtlasImportRequest.class);
-            ZipSource zipSource = new ZipSource(inputStream);
 
-            result = importService.run(zipSource, request, AtlasAuthorizationUtils.getCurrentUserName(),
+            result = importService.run(inputStream, request, Servlets.getUserName(httpServletRequest),
                     Servlets.getHostName(httpServletRequest),
                     AtlasAuthorizationUtils.getRequestIpAddress(httpServletRequest));
         } catch (AtlasBaseException excp) {
...