Commit 024f5d52 by nixonrodrigues

Revert "ATLAS-3320: Import Service. Support concurrent ingest."

This reverts commit a2ccfb9f.
parent b02443ec
@@ -116,7 +116,7 @@ public class AtlasJanusGraph implements AtlasGraph<AtlasJanusVertex, AtlasJanusEdge> {
             }
         }

-        janusGraph = (StandardJanusGraph) graphInstance;
+        janusGraph = (StandardJanusGraph) AtlasJanusGraphDatabase.getGraphInstance();
     }

     @Override
...
@@ -64,7 +64,6 @@ public enum AtlasConfiguration {
     CUSTOM_ATTRIBUTE_VALUE_MAX_LENGTH("atlas.custom.attribute.value.max.length", 500),
     LABEL_MAX_LENGTH("atlas.entity.label.max.length", 50),
     IMPORT_TEMP_DIRECTORY("atlas.import.temp.directory", ""),
-    MIGRATION_IMPORT_START_POSITION("atlas.migration.import.start.position", 0),
     LINEAGE_USING_GREMLIN("atlas.lineage.query.use.gremlin", false);

     private static final Configuration APPLICATION_PROPERTIES;
...
@@ -44,16 +44,10 @@ public class AtlasImportRequest implements Serializable {
     public static final String TRANSFORMS_KEY = "transforms";
     public static final String TRANSFORMERS_KEY = "transformers";
     public static final String OPTION_KEY_REPLICATED_FROM = "replicatedFrom";
-    public static final String OPTION_KEY_MIGRATION = "migration";
-    public static final String OPTION_KEY_NUM_WORKERS = "numWorkers";
-    public static final String OPTION_KEY_BATCH_SIZE = "batchSize";
-    public static final String OPTION_KEY_FORMAT = "format";
-    public static final String OPTION_KEY_FORMAT_ZIP_DIRECT = "zipDirect";
-    public static final String START_POSITION_KEY = "startPosition";
+    private static final String START_POSITION_KEY = "startPosition";
     private static final String START_GUID_KEY = "startGuid";
     private static final String FILE_NAME_KEY = "fileName";
     private static final String UPDATE_TYPE_DEFINITION_KEY = "updateTypeDefinition";
-    private static final String OPTION_KEY_STREAM_SIZE = "size";

     private Map<String, String> options;
@@ -114,7 +108,7 @@ public class AtlasImportRequest implements Serializable {
             return null;
         }

-        return this.options.get(key);
+        return (String) this.options.get(key);
     }

     @JsonIgnore
@@ -127,41 +121,10 @@ public class AtlasImportRequest implements Serializable {
         return isReplicationOptionSet() ? options.get(OPTION_KEY_REPLICATED_FROM) : StringUtils.EMPTY;
     }

-    @JsonIgnore
-    public int getOptionKeyNumWorkers() {
-        return getOptionsValue(OPTION_KEY_NUM_WORKERS, 1);
-    }
-
-    @JsonIgnore
-    public int getOptionKeyBatchSize() {
-        return getOptionsValue(OPTION_KEY_BATCH_SIZE, 1);
-    }
-
-    private int getOptionsValue(String optionKeyBatchSize, int defaultValue) {
-        String optionsValue = getOptionForKey(optionKeyBatchSize);
-        return StringUtils.isEmpty(optionsValue) ?
-                defaultValue :
-                Integer.valueOf(optionsValue);
-    }
-
     @JsonAnySetter
     public void setOption(String key, String value) {
         if (null == options) {
             options = new HashMap<>();
         }
         options.put(key, value);
     }
-
-    public void setSizeOption(int size) {
-        setOption(OPTION_KEY_STREAM_SIZE, Integer.toString(size));
-    }
-
-    public int getSizeOption() {
-        if (!this.options.containsKey(OPTION_KEY_STREAM_SIZE)) {
-            return 1;
-        }
-
-        return Integer.valueOf(this.options.get(OPTION_KEY_STREAM_SIZE));
-    }
 }
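For orientation, a minimal sketch of how a caller could populate and read the option accessors removed above. This compiles only against the pre-revert API (setOption, setSizeOption, getOptionKeyNumWorkers, getOptionKeyBatchSize, getSizeOption, all shown in the diff); the values are hypothetical:

    import org.apache.atlas.model.impexp.AtlasImportRequest;

    public class ImportRequestOptionsDemo {
        public static void main(String[] args) {
            AtlasImportRequest request = new AtlasImportRequest();

            // Options are stored as strings; the int accessors parse lazily with a default of 1.
            request.setOption(AtlasImportRequest.OPTION_KEY_NUM_WORKERS, "4");
            request.setOption(AtlasImportRequest.OPTION_KEY_BATCH_SIZE, "100");
            request.setSizeOption(50000); // total entity count, stored under the "size" option

            System.out.println(request.getOptionKeyNumWorkers()); // 4
            System.out.println(request.getOptionKeyBatchSize());  // 100
            System.out.println(request.getSizeOption());          // 50000
        }
    }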
@@ -21,7 +21,6 @@ package org.apache.atlas.pc;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

-import java.util.Queue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -38,7 +37,7 @@ public abstract class WorkItemConsumer<T> implements Runnable {
     private final AtomicBoolean isDirty = new AtomicBoolean(false);
     private final AtomicLong maxCommitTimeInMs = new AtomicLong(DEFAULT_COMMIT_TIME_IN_MS);
     private CountDownLatch countdownLatch;
-    private Queue<Object> results;
+    private BlockingQueue<Object> results;

     public WorkItemConsumer(BlockingQueue<T> queue) {
         this.queue = queue;
@@ -102,7 +101,11 @@ public abstract class WorkItemConsumer<T> implements Runnable {
     protected abstract void processItem(T item);

     protected void addResult(Object value) {
-        results.add(value);
+        try {
+            results.put(value);
+        } catch (InterruptedException e) {
+            LOG.error("Interrupted while adding result: {}", value);
+        }
     }

     protected void updateCommitTime(long commitTime) {
@@ -115,7 +118,7 @@ public abstract class WorkItemConsumer<T> implements Runnable {
         this.countdownLatch = countdownLatch;
     }

-    public <V> void setResults(Queue<Object> queue) {
+    public <V> void setResults(BlockingQueue<Object> queue) {
         this.results = queue;
     }
 }
@@ -23,7 +23,6 @@ import org.slf4j.LoggerFactory;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Queue;
 import java.util.concurrent.*;

 public class WorkItemManager<T, U extends WorkItemConsumer> {
@@ -34,7 +33,7 @@ public class WorkItemManager<T, U extends WorkItemConsumer> {
     private final ExecutorService service;
     private final List<U> consumers = new ArrayList<>();
     private CountDownLatch countdownLatch;
-    private Queue<Object> resultsQueue;
+    private BlockingQueue<Object> resultsQueue;

     public WorkItemManager(WorkItemBuilder builder, String namePrefix, int batchSize, int numWorkers, boolean collectResults) {
         this.numWorkers = numWorkers;
@@ -50,13 +49,13 @@ public class WorkItemManager<T, U extends WorkItemConsumer> {
         this(builder, "workItemConsumer", batchSize, numWorkers, false);
     }

-    public void setResultsCollection(Queue<Object> resultsQueue) {
+    public void setResultsCollection(BlockingQueue<Object> resultsQueue) {
         this.resultsQueue = resultsQueue;
     }

     private void createConsumers(WorkItemBuilder builder, int numWorkers, boolean collectResults) {
         if (collectResults) {
-            setResultsCollection(new ConcurrentLinkedQueue<>());
+            setResultsCollection(new LinkedBlockingQueue<>());
         }

         for (int i = 0; i < numWorkers; i++) {
@@ -125,7 +124,7 @@ public class WorkItemManager<T, U extends WorkItemConsumer> {
         LOG.info("WorkItemManager: Shutdown done!");
     }

-    public Queue getResults() {
+    public BlockingQueue getResults() {
         return this.resultsQueue;
     }
...
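Aside: the Queue-to-BlockingQueue swap in these two files changes backpressure semantics, which is the crux of the change being reverted. A minimal, self-contained sketch of the difference. Note that the reverted code restores an unbounded LinkedBlockingQueue, where put() never actually blocks; a bounded queue such as ArrayBlockingQueue is used here only to make the blocking visible:

    import java.util.Queue;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ConcurrentLinkedQueue;

    public class QueueBehaviorDemo {
        public static void main(String[] args) throws InterruptedException {
            // Unbounded, lock-free: add() always succeeds immediately, memory permitting.
            Queue<String> unbounded = new ConcurrentLinkedQueue<>();
            unbounded.add("result-1");

            // Bounded: put() blocks until space frees up, giving natural backpressure.
            BlockingQueue<String> bounded = new ArrayBlockingQueue<>(1);
            bounded.put("result-1");

            Thread consumer = new Thread(() -> {
                try {
                    Thread.sleep(100);
                    bounded.take(); // frees a slot, unblocking the producer below
                } catch (InterruptedException ignored) {
                    Thread.currentThread().interrupt();
                }
            });
            consumer.start();

            bounded.put("result-2"); // blocks ~100 ms until the consumer takes
            consumer.join();
            System.out.println("done: " + bounded.size() + " item(s) left");
        }
    }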
@@ -199,10 +199,6 @@ public class GraphTransactionInterceptor implements MethodInterceptor {
             return cache.get(guid);
         }

-        public static void clearCache() {
-            guidVertexCache.get().clear();
-        }
-
         boolean logException(Throwable t) {
             if (t instanceof AtlasBaseException) {
                 Response.Status httpCode = ((AtlasBaseException) t).getAtlasErrorCode().getHttpCode();
...
@@ -247,8 +247,7 @@ public class AuditsWriter {
         }

         updateReplicationAttribute(replicationOptionState, sourceServerName, sourceServerFullName, entityGuids,
-                Constants.ATTR_NAME_REPLICATED_FROM,
-                (result.getExportResult() != null) ? result.getExportResult().getChangeMarker() : 0);
+                Constants.ATTR_NAME_REPLICATED_FROM, result.getExportResult().getChangeMarker());
     }

     public void add(String userName, String sourceCluster, long startTime,
...
@@ -92,7 +92,7 @@ public class ImportService {
             request = new AtlasImportRequest();
         }

-        EntityImportStream source = createZipSource(request, inputStream, AtlasConfiguration.IMPORT_TEMP_DIRECTORY.getString());
+        EntityImportStream source = createZipSource(inputStream, AtlasConfiguration.IMPORT_TEMP_DIRECTORY.getString());

         return run(source, request, userName, hostName, requestingIP);
     }
@@ -248,18 +248,8 @@ public class ImportService {
         return (int) (endTime - startTime);
     }

-    private EntityImportStream createZipSource(AtlasImportRequest request, InputStream inputStream, String configuredTemporaryDirectory) throws AtlasBaseException {
+    private EntityImportStream createZipSource(InputStream inputStream, String configuredTemporaryDirectory) throws AtlasBaseException {
         try {
-            if (request.getOptions().containsKey(AtlasImportRequest.OPTION_KEY_MIGRATION)) {
-                LOG.info("Migration mode: Detected...", request.getOptions().get("size"));
-                return getZipDirectEntityImportStream(request, inputStream);
-            }
-
-            if (request.getOptions().containsKey(AtlasImportRequest.OPTION_KEY_FORMAT) &&
-                    request.getOptions().get(AtlasImportRequest.OPTION_KEY_FORMAT).equals(AtlasImportRequest.OPTION_KEY_FORMAT_ZIP_DIRECT)) {
-                return getZipDirectEntityImportStream(request, inputStream);
-            }
-
             if (StringUtils.isEmpty(configuredTemporaryDirectory)) {
                 return new ZipSource(inputStream);
             }
@@ -270,15 +260,9 @@ public class ImportService {
         }
     }

-    private EntityImportStream getZipDirectEntityImportStream(AtlasImportRequest request, InputStream inputStream) throws IOException, AtlasBaseException {
-        ZipSourceDirect zipSourceDirect = new ZipSourceDirect(inputStream, request.getSizeOption());
-
-        LOG.info("Using ZipSourceDirect: Size: {} entities", zipSourceDirect.size());
-
-        return zipSourceDirect;
-    }
-
     @VisibleForTesting
     boolean checkHiveTableIncrementalSkipLineage(AtlasImportRequest importRequest, AtlasExportRequest exportRequest) {
-        if (exportRequest == null || CollectionUtils.isEmpty(exportRequest.getItemsToExport())) {
+        if (CollectionUtils.isEmpty(exportRequest.getItemsToExport())) {
             return false;
         }
...
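For reference, the removed branches routed a request to the streaming ZipSourceDirect reader when it carried either the migration flag or format=zipDirect. A hypothetical client-side sketch of triggering that path, valid only against the pre-revert option keys shown above in AtlasImportRequest:

    import org.apache.atlas.model.impexp.AtlasImportRequest;

    public class ZipDirectRequestDemo {
        public static void main(String[] args) {
            AtlasImportRequest request = new AtlasImportRequest();

            // Either option routed createZipSource() to ZipSourceDirect before the revert:
            request.setOption(AtlasImportRequest.OPTION_KEY_FORMAT, AtlasImportRequest.OPTION_KEY_FORMAT_ZIP_DIRECT);
            // ...or, for server-side migration:
            // request.setOption(AtlasImportRequest.OPTION_KEY_MIGRATION, "true");

            System.out.println(request.getOptions());
        }
    }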
@@ -31,8 +31,4 @@ public enum ZipExportFileNames {
     public String toString() {
         return this.name;
     }
-
-    public String toEntryFileName() {
-        return this.name + ".json";
-    }
 }
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.impexp;
import org.apache.atlas.entitytransform.BaseEntityHandler;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.impexp.AtlasExportResult;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.apache.atlas.repository.store.graph.v2.EntityImportStream;
import org.apache.atlas.type.AtlasType;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import static org.apache.atlas.AtlasErrorCode.IMPORT_ATTEMPTING_EMPTY_ZIP;
public class ZipSourceDirect implements EntityImportStream {
private static final Logger LOG = LoggerFactory.getLogger(ZipSourceDirect.class);
private final ZipInputStream zipInputStream;
private int currentPosition;
private ImportTransforms importTransform;
private List<BaseEntityHandler> entityHandlers;
private AtlasTypesDef typesDef;
private ZipEntry zipEntryNext;
private int streamSize = 1;
public ZipSourceDirect(InputStream inputStream, int streamSize) throws IOException, AtlasBaseException {
this.zipInputStream = new ZipInputStream(inputStream);
this.streamSize = streamSize;
prepareStreamForFetch();
}
@Override
public ImportTransforms getImportTransform() { return this.importTransform; }
@Override
public void setImportTransform(ImportTransforms importTransform) {
this.importTransform = importTransform;
}
@Override
public List<BaseEntityHandler> getEntityHandlers() {
return entityHandlers;
}
@Override
public void setEntityHandlers(List<BaseEntityHandler> entityHandlers) {
this.entityHandlers = entityHandlers;
}
@Override
public AtlasTypesDef getTypesDef() throws AtlasBaseException {
return this.typesDef;
}
@Override
public AtlasExportResult getExportResult() throws AtlasBaseException {
return new AtlasExportResult();
}
@Override
public List<String> getCreationOrder() {
return new ArrayList<>();
}
@Override
public int getPosition() {
return currentPosition;
}
@Override
public AtlasEntity.AtlasEntityWithExtInfo getEntityWithExtInfo(String json) throws AtlasBaseException {
if (StringUtils.isEmpty(json)) {
return null;
}
AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = convertFromJson(AtlasEntity.AtlasEntityWithExtInfo.class, json);
if (importTransform != null) {
entityWithExtInfo = importTransform.apply(entityWithExtInfo);
}
if (entityHandlers != null) {
applyTransformers(entityWithExtInfo);
}
return entityWithExtInfo;
}
@Override
public boolean hasNext() {
return (this.zipEntryNext != null
&& !zipEntryNext.getName().equals(ZipExportFileNames.ATLAS_EXPORT_ORDER_NAME.toEntryFileName())
&& !zipEntryNext.getName().equals(ZipExportFileNames.ATLAS_EXPORT_INFO_NAME.toEntryFileName()));
}
@Override
public AtlasEntity next() {
AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = getNextEntityWithExtInfo();
return entityWithExtInfo != null ? entityWithExtInfo.getEntity() : null;
}
@Override
public AtlasEntity.AtlasEntityWithExtInfo getNextEntityWithExtInfo() {
try {
if (hasNext()) {
String json = moveNext();
return getEntityWithExtInfo(json);
}
} catch (AtlasBaseException e) {
LOG.error("getNextEntityWithExtInfo", e);
}
return null;
}
@Override
public void reset() {
currentPosition = 0;
}
@Override
public AtlasEntity getByGuid(String guid) {
try {
return getEntity(guid);
} catch (AtlasBaseException e) {
LOG.error("getByGuid: {} failed!", guid, e);
return null;
}
}
@Override
public void onImportComplete(String guid) {
}
@Override
public void setPosition(int index) {
try {
for (int i = 0; i < index; i++) {
moveNextEntry();
}
}
catch (IOException e) {
LOG.error("Error setting position: {}. Position may be beyond the stream size.", index);
}
}
@Override
public void setPositionUsingEntityGuid(String guid) {
}
@Override
public void close() {
}
private void applyTransformers(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) {
if (entityWithExtInfo == null) {
return;
}
transform(entityWithExtInfo.getEntity());
if (MapUtils.isNotEmpty(entityWithExtInfo.getReferredEntities())) {
for (AtlasEntity e : entityWithExtInfo.getReferredEntities().values()) {
transform(e);
}
}
}
private void transform(AtlasEntity e) {
for (BaseEntityHandler handler : entityHandlers) {
handler.transform(e);
}
}
private <T> T convertFromJson(Class<T> clazz, String jsonData) throws AtlasBaseException {
try {
return AtlasType.fromJson(jsonData, clazz);
} catch (Exception e) {
throw new AtlasBaseException("Error converting file to JSON.", e);
}
}
private AtlasEntity getEntity(String guid) throws AtlasBaseException {
AtlasEntity.AtlasEntityWithExtInfo extInfo = getEntityWithExtInfo(guid);
return (extInfo != null) ? extInfo.getEntity() : null;
}
public int size() {
return this.streamSize;
}
private String moveNext() {
try {
moveNextEntry();
return getJsonPayloadFromZipEntryStream(this.zipInputStream);
} catch (IOException e) {
LOG.error("moveNext failed!", e);
}
return null;
}
private void moveNextEntry() throws IOException {
this.zipEntryNext = this.zipInputStream.getNextEntry();
this.currentPosition++;
}
private void prepareStreamForFetch() throws AtlasBaseException, IOException {
moveNextEntry();
if (this.zipEntryNext == null) {
throw new AtlasBaseException(IMPORT_ATTEMPTING_EMPTY_ZIP, "Attempting to import empty ZIP.");
}
if (this.zipEntryNext.getName().equals(ZipExportFileNames.ATLAS_TYPESDEF_NAME.toEntryFileName())) {
String json = getJsonPayloadFromZipEntryStream(this.zipInputStream);
this.typesDef = AtlasType.fromJson(json, AtlasTypesDef.class);
}
}
private String getJsonPayloadFromZipEntryStream(ZipInputStream zipInputStream) {
try {
final int BUFFER_LENGTH = 4096;
byte[] buf = new byte[BUFFER_LENGTH];
int n = 0;
ByteArrayOutputStream bos = new ByteArrayOutputStream();
while ((n = zipInputStream.read(buf, 0, BUFFER_LENGTH)) > -1) {
bos.write(buf, 0, n);
}
return bos.toString();
} catch (IOException ex) {
LOG.error("Error fetching string from entry!", ex);
}
return null;
}
}
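The reader above walks a ZipInputStream forward-only, one entry per entity, which is what lets it ingest an archive without staging it on disk. A standalone sketch of the same read pattern using only java.util.zip (the file name is hypothetical):

    import java.io.ByteArrayOutputStream;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.zip.ZipEntry;
    import java.util.zip.ZipInputStream;

    public class ZipStreamReadDemo {
        public static void main(String[] args) throws IOException {
            // Hypothetical archive; each entry holds one JSON document.
            try (ZipInputStream zis = new ZipInputStream(new FileInputStream("export.zip"))) {
                ZipEntry entry;
                while ((entry = zis.getNextEntry()) != null) {
                    // getNextEntry() positions the stream; read the current entry fully.
                    ByteArrayOutputStream bos = new ByteArrayOutputStream();
                    byte[] buf = new byte[4096];
                    int n;
                    while ((n = zis.read(buf, 0, buf.length)) > -1) {
                        bos.write(buf, 0, n);
                    }
                    String json = bos.toString(StandardCharsets.UTF_8.name());
                    System.out.println(entry.getName() + ": " + json.length() + " chars");
                }
            }
        }
    }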
@@ -24,7 +24,6 @@ import org.apache.atlas.RequestContext;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.model.impexp.AtlasImportRequest;
 import org.apache.atlas.repository.impexp.ImportService;
-import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -33,20 +32,11 @@ import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.InetAddress;
-import java.util.zip.ZipFile;
-
-import static org.apache.atlas.AtlasConfiguration.MIGRATION_IMPORT_START_POSITION;

 public class ZipFileMigrationImporter implements Runnable {
     private static final Logger LOG = LoggerFactory.getLogger(ZipFileMigrationImporter.class);

-    private static final String APPLICATION_PROPERTY_MIGRATION_NUMER_OF_WORKERS = "atlas.migration.mode.workers";
-    private static final String APPLICATION_PROPERTY_MIGRATION_BATCH_SIZE = "atlas.migration.mode.batch.size";
-    private static final String DEFAULT_NUMBER_OF_WORKDERS = "4";
-    private static final String DEFAULT_BATCH_SIZE = "100";
-    private static final String ZIP_FILE_COMMENT = "streamSize";
-    private final static String ENV_USER_NAME = "user.name";
+    private static String ENV_USER_NAME = "user.name";

     private final ImportService importService;
     private final String fileToImport;
@@ -62,8 +52,7 @@ public class ZipFileMigrationImporter implements Runnable {
             FileWatcher fileWatcher = new FileWatcher(fileToImport);
             fileWatcher.start();

-            int streamSize = getStreamSizeFromComment(fileToImport);
-            performImport(new FileInputStream(new File(fileToImport)), streamSize);
+            performImport(new FileInputStream(new File(fileToImport)));
         } catch (IOException e) {
             LOG.error("Migration Import: IO Error!", e);
         } catch (AtlasBaseException e) {
@@ -71,44 +60,19 @@ public class ZipFileMigrationImporter implements Runnable {
         }
     }

-    private int getStreamSizeFromComment(String fileToImport) {
-        int ret = 1;
-        try {
-            ZipFile zipFile = new ZipFile(fileToImport);
-            String streamSizeComment = zipFile.getComment();
-            ret = processZipFileStreamSizeComment(streamSizeComment);
-            zipFile.close();
-        } catch (IOException e) {
-            LOG.error("Error opening ZIP file: {}", fileToImport, e);
-        }
-
-        return ret;
-    }
-
-    private int processZipFileStreamSizeComment(String streamSizeComment) {
-        if (!StringUtils.isNotEmpty(streamSizeComment) || !StringUtils.startsWith(streamSizeComment, ZIP_FILE_COMMENT)) {
-            return 1;
-        }
-
-        String s = StringUtils.substringAfter(streamSizeComment, ":");
-        LOG.debug("ZipFileMigrationImporter: streamSize: {}", streamSizeComment);
-        return Integer.valueOf(s);
-    }
-
-    private void performImport(InputStream fs, int streamSize) throws AtlasBaseException {
+    private void performImport(InputStream fs) throws AtlasBaseException {
         try {
             LOG.info("Migration Import: {}: Starting...", fileToImport);

             RequestContext.get().setUser(getUserNameFromEnvironment(), null);

-            importService.run(fs, getImportRequest(streamSize),
+            importService.run(fs, getImportRequest(),
                     getUserNameFromEnvironment(),
                     InetAddress.getLocalHost().getHostName(),
                     InetAddress.getLocalHost().getHostAddress());
         } catch (Exception ex) {
-            LOG.error("Migration Import: Error loading zip for migration!", ex);
+            LOG.error("Error loading zip for migration", ex);
             throw new AtlasBaseException(ex);
         } finally {
             LOG.info("Migration Import: {}: Done!", fileToImport);
@@ -119,16 +83,8 @@ public class ZipFileMigrationImporter implements Runnable {
         return System.getProperty(ENV_USER_NAME);
     }

-    private AtlasImportRequest getImportRequest(int streamSize) throws AtlasException {
-        AtlasImportRequest request = new AtlasImportRequest();
-
-        request.setSizeOption(streamSize);
-        request.setOption(AtlasImportRequest.OPTION_KEY_MIGRATION, "true");
-        request.setOption(AtlasImportRequest.OPTION_KEY_NUM_WORKERS, getPropertyValue(APPLICATION_PROPERTY_MIGRATION_NUMER_OF_WORKERS, DEFAULT_NUMBER_OF_WORKDERS));
-        request.setOption(AtlasImportRequest.OPTION_KEY_BATCH_SIZE, getPropertyValue(APPLICATION_PROPERTY_MIGRATION_BATCH_SIZE, DEFAULT_BATCH_SIZE));
-        request.setOption(AtlasImportRequest.START_POSITION_KEY, Integer.toString(MIGRATION_IMPORT_START_POSITION.getInt()));
-
-        return request;
+    private AtlasImportRequest getImportRequest() throws AtlasException {
+        return new AtlasImportRequest();
     }

     private String getPropertyValue(String property, String defaultValue) throws AtlasException {
...
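The removed getStreamSizeFromComment read the entity count that the exporter had stashed in the archive's comment field (a string of the form "streamSize:<n>"), so the importer could report progress without scanning the whole file. A standalone sketch of writing and reading such a comment with java.util.zip (file name and count are hypothetical):

    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.util.zip.ZipEntry;
    import java.util.zip.ZipFile;
    import java.util.zip.ZipOutputStream;

    public class ZipCommentDemo {
        public static void main(String[] args) throws IOException {
            // Write an archive whose comment carries the stream size.
            try (ZipOutputStream zos = new ZipOutputStream(new FileOutputStream("demo.zip"))) {
                zos.putNextEntry(new ZipEntry("entity-1.json"));
                zos.write("{}".getBytes());
                zos.closeEntry();
                zos.setComment("streamSize:50000");
            }

            // Read it back without decompressing any entries.
            try (ZipFile zipFile = new ZipFile("demo.zip")) {
                String comment = zipFile.getComment(); // "streamSize:50000"
                int streamSize = Integer.parseInt(comment.substring(comment.indexOf(':') + 1));
                System.out.println("streamSize = " + streamSize);
            }
        }
    }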
@@ -81,9 +81,9 @@ public class UniqueAttributePatch extends AtlasPatchHandler {
         AtlasGraph graph = getGraph();

         for (AtlasEntityType entityType : typeRegistry.getAllEntityTypes()) {
-            LOG.info("finding entities of type: {}", entityType.getTypeName());
+            LOG.info("finding entities of type {}", entityType.getTypeName());

             Iterable<Object> iterable = graph.query().has(Constants.ENTITY_TYPE_PROPERTY_KEY, entityType.getTypeName()).vertexIds();
-            LOG.info("found entities of type: {}", entityType.getTypeName());

             int count = 0;
             for (Iterator<Object> iter = iterable.iterator(); iter.hasNext(); ) {
...
@@ -150,14 +150,6 @@ public interface AtlasEntityStore {
     EntityMutationResponse createOrUpdateForImport(EntityStream entityStream) throws AtlasBaseException;

     /**
-     * Create or update entities with parameters necessary for import process without commit. Caller will have to do take care of commit.
-     * @param entityStream AtlasEntityStream
-     * @return EntityMutationResponse Entity mutations operations with the corresponding set of entities on which these operations were performed
-     * @throws AtlasBaseException
-     */
-    EntityMutationResponse createOrUpdateForImportNoCommit(EntityStream entityStream) throws AtlasBaseException;
-
-    /**
      * Update a single entity
      * @param objectId ID of the entity
      * @param updatedEntityInfo updated entity information
...
@@ -332,11 +332,6 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {
     }

     @Override
-    public EntityMutationResponse createOrUpdateForImportNoCommit(EntityStream entityStream) throws AtlasBaseException {
-        return createOrUpdate(entityStream, false, true, true);
-    }
-
-    @Override
     @GraphTransaction
     public EntityMutationResponse updateEntity(AtlasObjectId objectId, AtlasEntityWithExtInfo updatedEntityInfo, boolean isPartialUpdate) throws AtlasBaseException {
         if (LOG.isDebugEnabled()) {
@@ -1215,10 +1210,8 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {
         ret.setGuidAssignments(context.getGuidAssignments());

-        if (!RequestContext.get().isImportInProgress()) {
             // Notify the change listeners
             entityChangeNotifier.onEntitiesMutated(ret, RequestContext.get().isImportInProgress());
-        }

         if (LOG.isDebugEnabled()) {
             LOG.debug("<== createOrUpdate()");
...
@@ -929,10 +929,6 @@ public class AtlasRelationshipStoreV2 implements AtlasRelationshipStore {
     }

     private void sendNotifications(AtlasRelationship ret, OperationType relationshipUpdate) throws AtlasBaseException {
-        if (entityChangeNotifier == null) {
-            return;
-        }
-
         entityChangeNotifier.notifyPropagatedEntities();
         if (notificationsEnabled){
             entityChangeNotifier.notifyRelationshipMutation(ret, relationshipUpdate);
...
@@ -18,30 +18,33 @@
 package org.apache.atlas.repository.store.graph.v2;

 import com.google.common.annotations.VisibleForTesting;
+import org.apache.atlas.AtlasConfiguration;
+import org.apache.atlas.AtlasErrorCode;
+import org.apache.atlas.RequestContext;
+import org.apache.atlas.annotation.GraphTransaction;
 import org.apache.atlas.exception.AtlasBaseException;
-import org.apache.atlas.model.impexp.AtlasImportRequest;
 import org.apache.atlas.model.impexp.AtlasImportResult;
 import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
 import org.apache.atlas.model.instance.AtlasEntityHeader;
 import org.apache.atlas.model.instance.AtlasObjectId;
 import org.apache.atlas.model.instance.EntityMutationResponse;
-import org.apache.atlas.repository.graph.AtlasGraphProvider;
-import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.Constants;
+import org.apache.atlas.repository.graphdb.AtlasSchemaViolationException;
 import org.apache.atlas.repository.graphdb.AtlasVertex;
 import org.apache.atlas.repository.store.graph.AtlasEntityStore;
 import org.apache.atlas.repository.store.graph.BulkImporter;
-import org.apache.atlas.repository.store.graph.v2.bulkimport.ImportStrategy;
-import org.apache.atlas.repository.store.graph.v2.bulkimport.MigrationImport;
-import org.apache.atlas.repository.store.graph.v2.bulkimport.RegularImport;
 import org.apache.atlas.type.AtlasEntityType;
 import org.apache.atlas.type.AtlasTypeRegistry;
-import org.apache.atlas.type.Constants;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;

 import javax.inject.Inject;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -52,24 +55,131 @@ public class BulkImporterImpl implements BulkImporter {
     private static final Logger LOG = LoggerFactory.getLogger(AtlasEntityStoreV2.class);

     private final AtlasEntityStore entityStore;
+    private final EntityGraphRetriever entityGraphRetriever;
     private AtlasTypeRegistry typeRegistry;
+    private final int MAX_ATTEMPTS = 2;
+    private boolean directoryBasedImportConfigured;

     @Inject
     public BulkImporterImpl(AtlasEntityStore entityStore, AtlasTypeRegistry typeRegistry) {
         this.entityStore = entityStore;
+        this.entityGraphRetriever = new EntityGraphRetriever(typeRegistry);
         this.typeRegistry = typeRegistry;
+        this.directoryBasedImportConfigured = StringUtils.isNotEmpty(AtlasConfiguration.IMPORT_TEMP_DIRECTORY.getString());
     }

     @Override
     public EntityMutationResponse bulkImport(EntityImportStream entityStream, AtlasImportResult importResult) throws AtlasBaseException {
-        ImportStrategy importStrategy =
-                (importResult.getRequest().getOptions() != null &&
-                        importResult.getRequest().getOptions().containsKey(AtlasImportRequest.OPTION_KEY_MIGRATION))
-                        ? new MigrationImport(new AtlasGraphProvider(), this.typeRegistry)
-                        : new RegularImport(this.entityStore, this.typeRegistry);
-
-        LOG.info("BulkImportImpl: {}", importStrategy.getClass().getSimpleName());
-        return importStrategy.run(entityStream, importResult);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> bulkImport()");
+        }
+
+        if (entityStream == null || !entityStream.hasNext()) {
+            throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "no entities to create/update.");
+        }
+
+        EntityMutationResponse ret = new EntityMutationResponse();
+        ret.setGuidAssignments(new HashMap<>());
+
+        Set<String> processedGuids = new HashSet<>();
+        float currentPercent = 0f;
+        List<String> residualList = new ArrayList<>();
+
+        EntityImportStreamWithResidualList entityImportStreamWithResidualList = new EntityImportStreamWithResidualList(entityStream, residualList);
+        while (entityImportStreamWithResidualList.hasNext()) {
+            AtlasEntityWithExtInfo entityWithExtInfo = entityImportStreamWithResidualList.getNextEntityWithExtInfo();
+            AtlasEntity entity = entityWithExtInfo != null ? entityWithExtInfo.getEntity() : null;
+            if (entity == null) {
+                continue;
+            }
+
+            for (int attempt = 0; attempt < MAX_ATTEMPTS; attempt++) {
+                try {
+                    AtlasEntityStreamForImport oneEntityStream = new AtlasEntityStreamForImport(entityWithExtInfo, null);
+                    EntityMutationResponse resp = entityStore.createOrUpdateForImport(oneEntityStream);
+
+                    if (resp.getGuidAssignments() != null) {
+                        ret.getGuidAssignments().putAll(resp.getGuidAssignments());
+                    }
+
+                    currentPercent = updateImportMetrics(entityWithExtInfo, resp, importResult, processedGuids,
+                                                         entityStream.getPosition(),
+                                                         entityImportStreamWithResidualList.getStreamSize(),
+                                                         currentPercent);
+
+                    entityStream.onImportComplete(entity.getGuid());
+                    break;
+                } catch (AtlasBaseException e) {
+                    if (!updateResidualList(e, residualList, entityWithExtInfo.getEntity().getGuid())) {
+                        throw e;
+                    }
+                    break;
+                } catch (AtlasSchemaViolationException e) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Entity: {}", entity.getGuid(), e);
+                    }
+
+                    if (attempt == 0) {
+                        updateVertexGuid(entity);
+                    } else {
+                        LOG.error("Guid update failed: {}", entityWithExtInfo.getEntity().getGuid());
+                        throw e;
+                    }
+                } catch (Throwable e) {
+                    AtlasBaseException abe = new AtlasBaseException(e);
+                    if (!updateResidualList(abe, residualList, entityWithExtInfo.getEntity().getGuid())) {
+                        throw abe;
+                    }
+
+                    LOG.warn("Exception: {}", entity.getGuid(), e);
+                    break;
+                } finally {
+                    RequestContext.get().clearCache();
+                }
+            }
+        }
+
+        importResult.getProcessedEntities().addAll(processedGuids);
+        LOG.info("bulkImport(): done. Total number of entities (including referred entities) imported: {}", processedGuids.size());
+
+        return ret;
+    }
+
+    @GraphTransaction
+    public void updateVertexGuid(AtlasEntity entity) {
+        String entityGuid = entity.getGuid();
+        AtlasObjectId objectId = entityGraphRetriever.toAtlasObjectIdWithoutGuid(entity);
+
+        AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName());
+        String vertexGuid = null;
+        try {
+            vertexGuid = AtlasGraphUtilsV2.getGuidByUniqueAttributes(entityType, objectId.getUniqueAttributes());
+        } catch (AtlasBaseException e) {
+            LOG.warn("Entity: {}: Does not exist!", objectId);
+            return;
+        }
+
+        if (StringUtils.isEmpty(vertexGuid) || vertexGuid.equals(entityGuid)) {
+            return;
+        }
+
+        AtlasVertex v = AtlasGraphUtilsV2.findByGuid(vertexGuid);
+        if (v == null) {
+            return;
+        }
+
+        addHistoricalGuid(v, vertexGuid);
+        AtlasGraphUtilsV2.setProperty(v, Constants.GUID_PROPERTY_KEY, entityGuid);
+
+        LOG.warn("GUID Updated: Entity: {}: from: {}: to: {}", objectId, vertexGuid, entity.getGuid());
+    }
+
+    private void addHistoricalGuid(AtlasVertex v, String vertexGuid) {
+        String existingJson = AtlasGraphUtilsV2.getProperty(v, HISTORICAL_GUID_PROPERTY_KEY, String.class);
+
+        AtlasGraphUtilsV2.setProperty(v, HISTORICAL_GUID_PROPERTY_KEY, getJsonArray(existingJson, vertexGuid));
     }
     @VisibleForTesting
@@ -83,16 +193,38 @@ public class BulkImporterImpl implements BulkImporter {
         return json;
     }

+    private boolean updateResidualList(AtlasBaseException e, List<String> lineageList, String guid) {
+        if (!e.getAtlasErrorCode().getErrorCode().equals(AtlasErrorCode.INVALID_OBJECT_ID.getErrorCode())) {
+            return false;
+        }
+
+        lineageList.add(guid);
+        return true;
+    }
+
+    private float updateImportMetrics(AtlasEntity.AtlasEntityWithExtInfo currentEntity,
+                                      EntityMutationResponse resp,
+                                      AtlasImportResult importResult,
+                                      Set<String> processedGuids,
+                                      int currentIndex, int streamSize, float currentPercent) {
+        if (!directoryBasedImportConfigured) {
+            updateImportMetrics("entity:%s:created", resp.getCreatedEntities(), processedGuids, importResult);
+            updateImportMetrics("entity:%s:updated", resp.getUpdatedEntities(), processedGuids, importResult);
+            updateImportMetrics("entity:%s:deleted", resp.getDeletedEntities(), processedGuids, importResult);
+        }
+
+        String lastEntityImported = String.format("entity:last-imported:%s:[%s]:(%s)", currentEntity.getEntity().getTypeName(), currentIndex, currentEntity.getEntity().getGuid());
+
+        return updateImportProgress(LOG, currentIndex, streamSize, currentPercent, lastEntityImported);
+    }
+
     @VisibleForTesting
-    public static float updateImportProgress(Logger log, int currentIndex, int streamSize, float currentPercent, String additionalInfo) {
+    static float updateImportProgress(Logger log, int currentIndex, int streamSize, float currentPercent, String additionalInfo) {
         final double tolerance = 0.000001;
         final int MAX_PERCENT = 100;

         int maxSize = (currentIndex <= streamSize) ? streamSize : currentIndex;
+        if (maxSize <= 0) {
+            return currentPercent;
+        }
+
         float percent = (float) ((currentIndex * MAX_PERCENT) / maxSize);
         boolean updateLog = Double.compare(percent, currentPercent) > tolerance;
         float updatedPercent = (MAX_PERCENT < maxSize) ? percent : ((updateLog) ? ++currentPercent : currentPercent);
@@ -104,7 +236,7 @@ public class BulkImporterImpl implements BulkImporter {
         return updatedPercent;
     }

-    public static void updateImportMetrics(String prefix, List<AtlasEntityHeader> list, Set<String> processedGuids, AtlasImportResult importResult) {
+    private static void updateImportMetrics(String prefix, List<AtlasEntityHeader> list, Set<String> processedGuids, AtlasImportResult importResult) {
         if (list == null) {
             return;
         }
@@ -119,37 +251,41 @@ public class BulkImporterImpl implements BulkImporter {
         }
     }

-    public static void updateVertexGuid(AtlasTypeRegistry typeRegistry, EntityGraphRetriever entityGraphRetriever, AtlasEntity entity) {
-        String entityGuid = entity.getGuid();
-        AtlasObjectId objectId = entityGraphRetriever.toAtlasObjectIdWithoutGuid(entity);
-
-        AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName());
-        String vertexGuid = null;
-        try {
-            vertexGuid = AtlasGraphUtilsV2.getGuidByUniqueAttributes(entityType, objectId.getUniqueAttributes());
-        } catch (AtlasBaseException e) {
-            LOG.warn("Entity: {}: Does not exist!", objectId);
-            return;
-        }
-
-        if (StringUtils.isEmpty(vertexGuid) || vertexGuid.equals(entityGuid)) {
-            return;
-        }
-
-        AtlasVertex v = AtlasGraphUtilsV2.findByGuid(vertexGuid);
-        if (v == null) {
-            return;
-        }
-
-        addHistoricalGuid(v, vertexGuid);
-        AtlasGraphUtilsV2.setProperty(v, Constants.GUID_PROPERTY_KEY, entityGuid);
-
-        LOG.warn("GUID Updated: Entity: {}: from: {}: to: {}", objectId, vertexGuid, entity.getGuid());
-    }
-
-    public static void addHistoricalGuid(AtlasVertex v, String vertexGuid) {
-        String existingJson = AtlasGraphUtilsV2.getProperty(v, HISTORICAL_GUID_PROPERTY_KEY, String.class);
-
-        AtlasGraphUtilsV2.setProperty(v, HISTORICAL_GUID_PROPERTY_KEY, getJsonArray(existingJson, vertexGuid));
+    private static class EntityImportStreamWithResidualList {
+        private final EntityImportStream stream;
+        private final List<String> residualList;
+        private boolean navigateResidualList;
+        private int currentResidualListIndex;
+
+        public EntityImportStreamWithResidualList(EntityImportStream stream, List<String> residualList) {
+            this.stream = stream;
+            this.residualList = residualList;
+            this.navigateResidualList = false;
+            this.currentResidualListIndex = 0;
+        }
+
+        public AtlasEntity.AtlasEntityWithExtInfo getNextEntityWithExtInfo() {
+            if (navigateResidualList == false) {
+                return stream.getNextEntityWithExtInfo();
+            } else {
+                stream.setPositionUsingEntityGuid(residualList.get(currentResidualListIndex++));
+                return stream.getNextEntityWithExtInfo();
+            }
+        }
+
+        public boolean hasNext() {
+            if (!navigateResidualList) {
+                boolean streamHasNext = stream.hasNext();
+                navigateResidualList = (streamHasNext == false);
+                return streamHasNext ? streamHasNext : (currentResidualListIndex < residualList.size());
+            } else {
+                return (currentResidualListIndex < residualList.size());
+            }
+        }
+
+        public int getStreamSize() {
+            return stream.size() + residualList.size();
+        }
     }
 }
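A note on the progress math restored in updateImportProgress above: the multiplication and division happen in int before the cast to float, so the reported percentage advances in whole-number steps. A minimal standalone sketch, simplified from the method shown:

    public class ProgressMathDemo {
        public static void main(String[] args) {
            final int MAX_PERCENT = 100;
            int streamSize = 7;

            for (int currentIndex = 1; currentIndex <= streamSize; currentIndex++) {
                int maxSize = (currentIndex <= streamSize) ? streamSize : currentIndex;
                // Integer division first, then the cast: yields whole-number steps.
                float percent = (float) ((currentIndex * MAX_PERCENT) / maxSize);
                System.out.printf("%d/%d -> %.0f%%%n", currentIndex, streamSize, percent);
            }
        }
    }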
@@ -361,10 +361,8 @@ public class EntityGraphMapper {
         updateLabels(vertex, labels);

-        if (entityChangeNotifier != null) {
-            entityChangeNotifier.onLabelsUpdatedFromEntity(GraphHelper.getGuid(vertex), addedLabels, removedLabels);
-        }
+        entityChangeNotifier.onLabelsUpdatedFromEntity(GraphHelper.getGuid(vertex), addedLabels, removedLabels);
     }

     public void addLabels(AtlasVertex vertex, Set<String> labels) throws AtlasBaseException {
         if (CollectionUtils.isNotEmpty(labels)) {
@@ -380,13 +378,10 @@ public class EntityGraphMapper {
             if (!updatedLabels.equals(existingLabels)) {
                 updateLabels(vertex, updatedLabels);
                 updatedLabels.removeAll(existingLabels);
-                if (entityChangeNotifier != null) {
-                    entityChangeNotifier.onLabelsUpdatedFromEntity(GraphHelper.getGuid(vertex), updatedLabels, null);
-                }
+                entityChangeNotifier.onLabelsUpdatedFromEntity(GraphHelper.getGuid(vertex), updatedLabels, null);
             }
         }
     }

     public void removeLabels(AtlasVertex vertex, Set<String> labels) throws AtlasBaseException {
         if (CollectionUtils.isNotEmpty(labels)) {
@@ -400,14 +395,11 @@ public class EntityGraphMapper {
             if (!updatedLabels.equals(existingLabels)) {
                 updateLabels(vertex, updatedLabels);
                 existingLabels.removeAll(updatedLabels);
-                if (entityChangeNotifier != null) {
-                    entityChangeNotifier.onLabelsUpdatedFromEntity(GraphHelper.getGuid(vertex), null, existingLabels);
-                }
+                entityChangeNotifier.onLabelsUpdatedFromEntity(GraphHelper.getGuid(vertex), null, existingLabels);
             }
         }
     }

     private void updateLabels(AtlasVertex vertex, Set<String> labels) {
         if (CollectionUtils.isNotEmpty(labels)) {
@@ -1956,10 +1948,8 @@ public class EntityGraphMapper {
             Set<AtlasVertex> vertices = addedClassifications.get(classification);
             List<AtlasEntity> propagatedEntities = updateClassificationText(classification, vertices);

-            if (entityChangeNotifier != null) {
-                entityChangeNotifier.onClassificationsAddedToEntities(propagatedEntities, Collections.singletonList(classification));
-            }
+            entityChangeNotifier.onClassificationsAddedToEntities(propagatedEntities, Collections.singletonList(classification));
         }

         RequestContext.get().endMetricRecord(metric);
     }
@@ -2066,12 +2056,9 @@ public class EntityGraphMapper {
             AtlasEntity entity = updateClassificationText(entry.getKey());
             List<AtlasClassification> deletedClassificationNames = entry.getValue();

-            if (entityChangeNotifier != null) {
-                entityChangeNotifier.onClassificationDeletedFromEntity(entity, deletedClassificationNames);
-            }
+            entityChangeNotifier.onClassificationDeletedFromEntity(entity, deletedClassificationNames);
         }
     }

     private AtlasEntity updateClassificationText(AtlasVertex vertex) throws AtlasBaseException {
         String guid = GraphHelper.getGuid(vertex);
@@ -2296,7 +2283,6 @@ public class EntityGraphMapper {
             notificationVertices.addAll(entitiesToPropagateTo);
         }

-        if (entityChangeNotifier != null) {
         for (AtlasVertex vertex : notificationVertices) {
             String entityGuid = GraphHelper.getGuid(vertex);
             AtlasEntity entity = instanceConverter.getAndCacheEntity(entityGuid, ENTITY_CHANGE_NOTIFY_IGNORE_RELATIONSHIP_ATTRIBUTES);
@@ -2306,9 +2292,8 @@ public class EntityGraphMapper {
                 entityChangeNotifier.onClassificationUpdatedToEntity(entity, updatedClassifications);
             }
         }
-        }

-        if (entityChangeNotifier != null && MapUtils.isNotEmpty(removedPropagations)) {
+        if (MapUtils.isNotEmpty(removedPropagations)) {
             for (AtlasClassification classification : removedPropagations.keySet()) {
                 List<AtlasVertex> propagatedVertices = removedPropagations.get(classification);
                 List<AtlasEntity> propagatedEntities = updateClassificationText(classification, propagatedVertices);
@@ -2541,7 +2526,7 @@ public class EntityGraphMapper {
     private List<AtlasEntity> updateClassificationText(AtlasClassification classification, Collection<AtlasVertex> propagatedVertices) throws AtlasBaseException {
         List<AtlasEntity> propagatedEntities = new ArrayList<>();

-        if (fullTextMapperV2 != null && CollectionUtils.isNotEmpty(propagatedVertices)) {
+        if(CollectionUtils.isNotEmpty(propagatedVertices)) {
             for(AtlasVertex vertex : propagatedVertices) {
                 AtlasEntity entity = instanceConverter.getAndCacheEntity(GraphHelper.getGuid(vertex), ENTITY_CHANGE_NOTIFY_IGNORE_RELATIONSHIP_ATTRIBUTES);
...
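The guards removed throughout EntityGraphMapper above made the change notifier optional, so the migration path could construct the mapper with null collaborators; the revert makes the notifier mandatory again. A generic sketch of that style of null-guarded collaborator, with a hypothetical Notifier type that is not part of the Atlas API:

    public class GuardedNotifierDemo {
        interface Notifier {
            void onUpdated(String guid);
        }

        private final Notifier notifier; // may be null in bulk/migration wiring

        GuardedNotifierDemo(Notifier notifier) {
            this.notifier = notifier;
        }

        void update(String guid) {
            // ... mutate the graph ...
            if (notifier != null) { // the guard the revert removes: notifier is mandatory again
                notifier.onUpdated(guid);
            }
        }

        public static void main(String[] args) {
            new GuardedNotifierDemo(null).update("guid-1"); // no NPE when notifications are disabled
            new GuardedNotifierDemo(g -> System.out.println("notified: " + g)).update("guid-2");
        }
    }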
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.store.graph.v2.bulkimport;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.impexp.AtlasImportResult;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.repository.store.graph.v2.EntityImportStream;
public abstract class ImportStrategy {
public abstract EntityMutationResponse run(EntityImportStream entityStream, AtlasImportResult importResult) throws AtlasBaseException;
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.store.graph.v2.bulkimport;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.impexp.AtlasImportResult;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.repository.converters.AtlasFormatConverters;
import org.apache.atlas.repository.converters.AtlasInstanceConverter;
import org.apache.atlas.repository.graph.AtlasGraphProvider;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.repository.store.graph.AtlasRelationshipStore;
import org.apache.atlas.repository.store.graph.v1.DeleteHandlerDelegate;
import org.apache.atlas.repository.store.graph.v2.AtlasEntityStoreV2;
import org.apache.atlas.repository.store.graph.v2.AtlasRelationshipStoreV2;
import org.apache.atlas.repository.store.graph.v2.EntityGraphMapper;
import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
import org.apache.atlas.repository.store.graph.v2.EntityImportStream;
import org.apache.atlas.repository.store.graph.v2.bulkimport.pc.EntityConsumerBuilder;
import org.apache.atlas.repository.store.graph.v2.bulkimport.pc.EntityCreationManager;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MigrationImport extends ImportStrategy {
private static final Logger LOG = LoggerFactory.getLogger(MigrationImport.class);
private final AtlasTypeRegistry typeRegistry;
private AtlasGraph atlasGraph;
private EntityGraphRetriever entityGraphRetriever;
private EntityGraphMapper entityGraphMapper;
private AtlasEntityStore entityStore;
public MigrationImport(AtlasGraphProvider atlasGraphProvider, AtlasTypeRegistry typeRegistry) {
this.typeRegistry = typeRegistry;
setupEntityStore(atlasGraphProvider, typeRegistry);
LOG.info("MigrationImport: Using bulkLoading...");
}
public EntityMutationResponse run(EntityImportStream entityStream, AtlasImportResult importResult) throws AtlasBaseException {
if (entityStream == null || !entityStream.hasNext()) {
throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "no entities to create/update.");
}
if (importResult.getRequest() == null) {
throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "importResult should contain request");
}
int index = 0;
int streamSize = entityStream.size();
EntityMutationResponse ret = new EntityMutationResponse();
EntityCreationManager creationManager = createEntityCreationManager(atlasGraph, importResult, streamSize);
try {
LOG.info("Migration Import: Size: {}: Starting...", streamSize);
index = creationManager.read(entityStream);
creationManager.drain();
creationManager.extractResults();
} catch (Exception ex) {
LOG.error("Migration Import: Error: Current position: {}", index, ex);
} finally {
shutdownEntityCreationManager(creationManager);
}
LOG.info("Migration Import: Size: {}: Done!", streamSize);
return ret;
}
private EntityCreationManager createEntityCreationManager(AtlasGraph threadedAtlasGraph, AtlasImportResult importResult, int streamSize) {
int batchSize = importResult.getRequest().getOptionKeyBatchSize();
int numWorkers = getNumWorkers(importResult.getRequest().getOptionKeyNumWorkers());
EntityConsumerBuilder consumerBuilder =
new EntityConsumerBuilder(threadedAtlasGraph, entityStore, entityGraphRetriever, typeRegistry, batchSize);
return new EntityCreationManager(consumerBuilder, batchSize, numWorkers, importResult, streamSize);
}
private static int getNumWorkers(int numWorkersFromOptions) {
int ret = (numWorkersFromOptions > 0) ? numWorkersFromOptions : 1;
LOG.info("Migration Import: Setting numWorkers: {}", ret);
return ret;
}
private void setupEntityStore(AtlasGraphProvider atlasGraphProvider, AtlasTypeRegistry typeRegistry) {
this.entityGraphRetriever = new EntityGraphRetriever(typeRegistry);
this.atlasGraph = atlasGraphProvider.getBulkLoading();
DeleteHandlerDelegate deleteDelegate = new DeleteHandlerDelegate(typeRegistry);
AtlasRelationshipStore relationshipStore = new AtlasRelationshipStoreV2(typeRegistry, deleteDelegate, null);
AtlasFormatConverters formatConverters = new AtlasFormatConverters(typeRegistry);
AtlasInstanceConverter instanceConverter = new AtlasInstanceConverter(typeRegistry, formatConverters);
this.entityGraphMapper = new EntityGraphMapper(deleteDelegate, typeRegistry, atlasGraph, relationshipStore, null, instanceConverter, null);
this.entityStore = new AtlasEntityStoreV2(deleteDelegate, typeRegistry, null, entityGraphMapper);
}
private void shutdownEntityCreationManager(EntityCreationManager creationManager) {
try {
creationManager.shutdown();
} catch (InterruptedException e) {
LOG.error("Migration Import: Shutdown: Interrupted!", e);
}
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.store.graph.v2.bulkimport;
import com.google.common.annotations.VisibleForTesting;
import org.apache.atlas.AtlasConfiguration;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.RequestContext;
import org.apache.atlas.annotation.GraphTransaction;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.impexp.AtlasImportResult;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.graphdb.AtlasSchemaViolationException;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.repository.store.graph.v2.AtlasEntityStreamForImport;
import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
import org.apache.atlas.repository.store.graph.v2.BulkImporterImpl;
import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
import org.apache.atlas.repository.store.graph.v2.EntityImportStream;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import static org.apache.atlas.repository.Constants.HISTORICAL_GUID_PROPERTY_KEY;
import static org.apache.atlas.repository.store.graph.v2.BulkImporterImpl.updateImportProgress;
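/**
 * Default import strategy: entities are created or updated one at a time. Entities that
 * fail with INVALID_OBJECT_ID are parked in a residual list and replayed once the main
 * stream is exhausted; a schema violation triggers a one-time vertex GUID update.
 */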
public class RegularImport extends ImportStrategy {
private static final Logger LOG = LoggerFactory.getLogger(RegularImport.class);
private static final int MAX_ATTEMPTS = 3;
private final AtlasEntityStore entityStore;
private final AtlasTypeRegistry typeRegistry;
private final EntityGraphRetriever entityGraphRetriever;
private final boolean directoryBasedImportConfigured;
public RegularImport(AtlasEntityStore entityStore, AtlasTypeRegistry typeRegistry) {
this.entityStore = entityStore;
this.typeRegistry = typeRegistry;
this.entityGraphRetriever = new EntityGraphRetriever(typeRegistry);
this.directoryBasedImportConfigured = StringUtils.isNotEmpty(AtlasConfiguration.IMPORT_TEMP_DIRECTORY.getString());
}
@Override
public EntityMutationResponse run(EntityImportStream entityStream, AtlasImportResult importResult) throws AtlasBaseException {
if (LOG.isDebugEnabled()) {
LOG.debug("==> bulkImport()");
}
if (entityStream == null || !entityStream.hasNext()) {
throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "no entities to create/update.");
}
EntityMutationResponse ret = new EntityMutationResponse();
ret.setGuidAssignments(new HashMap<>());
Set<String> processedGuids = new HashSet<>();
float currentPercent = 0f;
List<String> residualList = new ArrayList<>();
EntityImportStreamWithResidualList entityImportStreamWithResidualList = new EntityImportStreamWithResidualList(entityStream, residualList);
while (entityImportStreamWithResidualList.hasNext()) {
AtlasEntityWithExtInfo entityWithExtInfo = entityImportStreamWithResidualList.getNextEntityWithExtInfo();
AtlasEntity entity = entityWithExtInfo != null ? entityWithExtInfo.getEntity() : null;
if (entity == null) {
continue;
}
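// Up to MAX_ATTEMPTS tries per entity: a schema violation on the first attempt
// triggers a vertex GUID update and a retry; INVALID_OBJECT_ID failures are parked
// in the residual list and replayed later.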
for (int attempt = 0; attempt < MAX_ATTEMPTS; attempt++) {
try {
AtlasEntityStreamForImport oneEntityStream = new AtlasEntityStreamForImport(entityWithExtInfo, null);
EntityMutationResponse resp = entityStore.createOrUpdateForImport(oneEntityStream);
if (resp.getGuidAssignments() != null) {
ret.getGuidAssignments().putAll(resp.getGuidAssignments());
}
currentPercent = updateImportMetrics(entityWithExtInfo, resp, importResult, processedGuids,
entityStream.getPosition(),
entityImportStreamWithResidualList.getStreamSize(),
currentPercent);
entityStream.onImportComplete(entity.getGuid());
break;
} catch (AtlasBaseException e) {
if (!updateResidualList(e, residualList, entityWithExtInfo.getEntity().getGuid())) {
throw e;
}
break;
} catch (AtlasSchemaViolationException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Entity: {}", entity.getGuid(), e);
}
if (attempt == 0) {
updateVertexGuid(entity);
} else {
LOG.error("Guid update failed: {}", entityWithExtInfo.getEntity().getGuid());
throw e;
}
} catch (Throwable e) {
AtlasBaseException abe = new AtlasBaseException(e);
if (!updateResidualList(abe, residualList, entityWithExtInfo.getEntity().getGuid())) {
throw abe;
}
LOG.warn("Exception: {}", entity.getGuid(), e);
break;
} finally {
RequestContext.get().clearCache();
}
}
}
importResult.getProcessedEntities().addAll(processedGuids);
LOG.info("bulkImport(): done. Total number of entities (including referred entities) imported: {}", processedGuids.size());
return ret;
}
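/**
 * Handles GUID collisions: when a vertex with the same unique attributes already exists
 * under a different GUID, its GUID is rewritten to the incoming entity's GUID and the
 * old value is archived under HISTORICAL_GUID_PROPERTY_KEY.
 */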
@GraphTransaction
public void updateVertexGuid(AtlasEntity entity) {
String entityGuid = entity.getGuid();
AtlasObjectId objectId = entityGraphRetriever.toAtlasObjectIdWithoutGuid(entity);
AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName());
String vertexGuid = null;
try {
vertexGuid = AtlasGraphUtilsV2.getGuidByUniqueAttributes(entityType, objectId.getUniqueAttributes());
} catch (AtlasBaseException e) {
LOG.warn("Entity: {}: Does not exist!", objectId);
return;
}
if (StringUtils.isEmpty(vertexGuid) || vertexGuid.equals(entityGuid)) {
return;
}
AtlasVertex v = AtlasGraphUtilsV2.findByGuid(vertexGuid);
if (v == null) {
return;
}
addHistoricalGuid(v, vertexGuid);
AtlasGraphUtilsV2.setProperty(v, Constants.GUID_PROPERTY_KEY, entityGuid);
LOG.warn("GUID Updated: Entity: {}: from: {}: to: {}", objectId, vertexGuid, entity.getGuid());
}
private void addHistoricalGuid(AtlasVertex v, String vertexGuid) {
String existingJson = AtlasGraphUtilsV2.getProperty(v, HISTORICAL_GUID_PROPERTY_KEY, String.class);
AtlasGraphUtilsV2.setProperty(v, HISTORICAL_GUID_PROPERTY_KEY, getJsonArray(existingJson, vertexGuid));
}
@VisibleForTesting
static String getJsonArray(String json, String vertexGuid) {
String quotedGuid = String.format("\"%s\"", vertexGuid);
if (StringUtils.isEmpty(json)) {
json = String.format("[%s]", quotedGuid);
} else {
json = json.replace("]", "").concat(",").concat(quotedGuid).concat("]");
}
return json;
}
private boolean updateResidualList(AtlasBaseException e, List<String> residualList, String guid) {
if (!e.getAtlasErrorCode().getErrorCode().equals(AtlasErrorCode.INVALID_OBJECT_ID.getErrorCode())) {
return false;
}
residualList.add(guid);
return true;
}
private float updateImportMetrics(AtlasEntity.AtlasEntityWithExtInfo currentEntity,
EntityMutationResponse resp,
AtlasImportResult importResult,
Set<String> processedGuids,
int currentIndex, int streamSize, float currentPercent) {
if (!directoryBasedImportConfigured) {
BulkImporterImpl.updateImportMetrics("entity:%s:created", resp.getCreatedEntities(), processedGuids, importResult);
BulkImporterImpl.updateImportMetrics("entity:%s:updated", resp.getUpdatedEntities(), processedGuids, importResult);
BulkImporterImpl.updateImportMetrics("entity:%s:deleted", resp.getDeletedEntities(), processedGuids, importResult);
}
String lastEntityImported = String.format("entity:last-imported:%s:[%s]:(%s)", currentEntity.getEntity().getTypeName(), currentIndex, currentEntity.getEntity().getGuid());
return updateImportProgress(LOG, currentIndex, streamSize, currentPercent, lastEntityImported);
}
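/**
 * Wraps the import stream so that entities parked in the residual list are revisited
 * (by repositioning the stream on their GUIDs) once the primary stream is exhausted.
 */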
private static class EntityImportStreamWithResidualList {
private final EntityImportStream stream;
private final List<String> residualList;
private boolean navigateResidualList;
private int currentResidualListIndex;
public EntityImportStreamWithResidualList(EntityImportStream stream, List<String> residualList) {
this.stream = stream;
this.residualList = residualList;
this.navigateResidualList = false;
this.currentResidualListIndex = 0;
}
public AtlasEntity.AtlasEntityWithExtInfo getNextEntityWithExtInfo() {
if (!navigateResidualList) {
return stream.getNextEntityWithExtInfo();
}
stream.setPositionUsingEntityGuid(residualList.get(currentResidualListIndex++));
return stream.getNextEntityWithExtInfo();
}
public boolean hasNext() {
if (!navigateResidualList) {
boolean streamHasNext = stream.hasNext();
navigateResidualList = !streamHasNext;
return streamHasNext || currentResidualListIndex < residualList.size();
}
return currentResidualListIndex < residualList.size();
}
public int getStreamSize() {
return stream.size() + residualList.size();
}
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.store.graph.v2.bulkimport.pc;
import org.apache.atlas.GraphTransactionInterceptor;
import org.apache.atlas.RequestContext;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.pc.WorkItemConsumer;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasSchemaViolationException;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.repository.store.graph.v2.AtlasEntityStreamForImport;
import org.apache.atlas.repository.store.graph.v2.BulkImporterImpl;
import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
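/**
 * Work-item consumer for the concurrent import: buffers incoming entities, writes them
 * without committing, and commits once the running batch count reaches batchSize. A
 * failed commit is rolled back, paused, and the buffered entities are replayed, up to
 * MAX_COMMIT_RETRY_COUNT times.
 */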
public class EntityConsumer extends WorkItemConsumer<AtlasEntity.AtlasEntityWithExtInfo> {
private static final Logger LOG = LoggerFactory.getLogger(EntityConsumer.class);
private static final int MAX_COMMIT_RETRY_COUNT = 3;
private final int batchSize;
private AtomicLong counter = new AtomicLong(1);
private AtomicLong currentBatch = new AtomicLong(1);
private final AtlasGraph atlasGraph;
private final AtlasEntityStore entityStoreV2;
private final AtlasTypeRegistry typeRegistry;
private final EntityGraphRetriever entityGraphRetriever;
private List<AtlasEntity.AtlasEntityWithExtInfo> entityBuffer = new ArrayList<>();
private List<EntityMutationResponse> localResults = new ArrayList<>();
public EntityConsumer(AtlasGraph atlasGraph, AtlasEntityStore entityStore,
EntityGraphRetriever entityGraphRetriever, AtlasTypeRegistry typeRegistry,
BlockingQueue queue, int batchSize) {
super(queue);
this.atlasGraph = atlasGraph;
this.entityStoreV2 = entityStore;
this.entityGraphRetriever = entityGraphRetriever;
this.typeRegistry = typeRegistry;
this.batchSize = batchSize;
}
@Override
protected void processItem(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) {
int delta = (MapUtils.isEmpty(entityWithExtInfo.getReferredEntities())
? 1
: entityWithExtInfo.getReferredEntities().size()) + 1;
long currentCount = counter.addAndGet(delta);
currentBatch.addAndGet(delta);
entityBuffer.add(entityWithExtInfo);
try {
processEntity(entityWithExtInfo, currentCount);
attemptCommit();
} catch (Exception e) {
LOG.info("Data loss: Please re-submit!", e);
}
}
private void processEntity(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo, long currentCount) {
try {
RequestContext.get().setImportInProgress(true);
AtlasEntityStreamForImport oneEntityStream = new AtlasEntityStreamForImport(entityWithExtInfo, null);
LOG.debug("Processing: {}", currentCount);
EntityMutationResponse result = entityStoreV2.createOrUpdateForImportNoCommit(oneEntityStream);
localResults.add(result);
} catch (AtlasBaseException e) {
addResult(entityWithExtInfo.getEntity().getGuid());
LOG.warn("Exception: {}", entityWithExtInfo.getEntity().getGuid(), e);
} catch (AtlasSchemaViolationException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Entity: {}", entityWithExtInfo.getEntity().getGuid(), e);
}
BulkImporterImpl.updateVertexGuid(typeRegistry, entityGraphRetriever, entityWithExtInfo.getEntity());
}
}
private void attemptCommit() {
if (currentBatch.get() < batchSize) {
return;
}
doCommit();
}
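// Commits the buffered batch, retrying up to MAX_COMMIT_RETRY_COUNT times; if all
// retries fail, the buffer is cleared and the potential data loss is logged.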
@Override
protected void doCommit() {
for (int retryCount = 1; retryCount <= MAX_COMMIT_RETRY_COUNT; retryCount++) {
if (commitWithRetry(retryCount)) {
return;
}
}
LOG.error("Retries exceeded! Potential data loss! Please correct data and re-attempt. Buffer: {}: Counter: {}", entityBuffer.size(), counter.get());
clear();
}
@Override
protected void commitDirty() {
super.commitDirty();
LOG.info("Total: Commit: {}", counter.get());
counter.set(0);
}
private boolean commitWithRetry(int retryCount) {
try {
atlasGraph.commit();
if (LOG.isDebugEnabled()) {
LOG.debug("Commit: Done!: Buffer: {}: Batch: {}: Counter: {}", entityBuffer.size(), currentBatch.get(), counter.get());
}
dispatchResults();
return true;
} catch (Exception ex) {
rollbackPauseRetry(retryCount, ex);
return false;
}
}
private void rollbackPauseRetry(int retryCount, Exception ex) {
atlasGraph.rollback();
clearCache();
LOG.error("Rollback: Done! Buffer: {}: Counter: {}: Retry count: {}", entityBuffer.size(), counter.get(), retryCount);
pause(retryCount);
if (ex.getClass().getName().endsWith("JanusGraphException") && retryCount >= MAX_COMMIT_RETRY_COUNT) {
LOG.warn("Commit error! Retry limit reached: Buffer: {}: Counter: {}: Retry count: {}", entityBuffer.size(), counter.get(), retryCount, ex);
} else {
LOG.info("Will pause and retry: Buffer: {}: Counter: {}: Retry count: {}", entityBuffer.size(), counter.get(), retryCount);
}
retryProcessEntity(retryCount);
}
private void retryProcessEntity(int retryCount) {
LOG.info("Replaying: Starting!: Buffer: {}: Retry count: {}", entityBuffer.size(), retryCount);
for (AtlasEntity.AtlasEntityWithExtInfo e : entityBuffer) {
processEntity(e, counter.get());
}
LOG.info("Replaying: Done!: Buffer: {}: Retry count: {}", entityBuffer.size(), retryCount);
}
private void dispatchResults() {
localResults.forEach(x -> {
addResultsFromResponse(x.getCreatedEntities());
addResultsFromResponse(x.getUpdatedEntities());
addResultsFromResponse(x.getDeletedEntities());
});
clear();
}
private void pause(int retryCount) {
try {
Thread.sleep(1000 * retryCount);
} catch (InterruptedException e) {
LOG.error("pause: Interrupted!", e);
}
}
private void addResultsFromResponse(List<AtlasEntityHeader> entities) {
if (CollectionUtils.isEmpty(entities)) {
return;
}
for (AtlasEntityHeader eh : entities) {
addResult(eh.getGuid());
}
}
private void clear() {
localResults.clear();
entityBuffer.clear();
clearCache();
currentBatch.set(0);
}
private void clearCache() {
GraphTransactionInterceptor.clearCache();
RequestContext.get().clearCache();
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.store.graph.v2.bulkimport.pc;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.pc.WorkItemBuilder;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
import org.apache.atlas.type.AtlasTypeRegistry;
import java.util.concurrent.BlockingQueue;
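/**
 * Builds an {@link EntityConsumer} per worker queue, sharing the graph, entity store,
 * graph retriever, and type registry across all consumers.
 */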
public class EntityConsumerBuilder implements WorkItemBuilder<EntityConsumer, AtlasEntity.AtlasEntityWithExtInfo> {
private final AtlasGraph atlasGraph;
private final AtlasEntityStore entityStore;
private final EntityGraphRetriever entityGraphRetriever;
private final AtlasTypeRegistry typeRegistry;
private final int batchSize;
public EntityConsumerBuilder(AtlasGraph atlasGraph, AtlasEntityStore entityStore,
EntityGraphRetriever entityGraphRetriever, AtlasTypeRegistry typeRegistry, int batchSize) {
this.atlasGraph = atlasGraph;
this.entityStore = entityStore;
this.entityGraphRetriever = entityGraphRetriever;
this.typeRegistry = typeRegistry;
this.batchSize = batchSize;
}
@Override
public EntityConsumer build(BlockingQueue<AtlasEntity.AtlasEntityWithExtInfo> queue) {
return new EntityConsumer(atlasGraph, entityStore, entityGraphRetriever, typeRegistry, queue, this.batchSize);
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.store.graph.v2.bulkimport.pc;
import org.apache.atlas.model.impexp.AtlasImportResult;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.pc.WorkItemBuilder;
import org.apache.atlas.pc.WorkItemManager;
import org.apache.atlas.repository.store.graph.v2.BulkImporterImpl;
import org.apache.atlas.repository.store.graph.v2.EntityImportStream;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
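/**
 * Producer side of the concurrent import: reads entities from the import stream, drains
 * in-flight work whenever the entity type changes (so types are ingested one at a time),
 * and reports progress through a {@link StatusReporter}. The type parameter is nominal;
 * the underlying WorkItemManager is used in raw form.
 */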
public class EntityCreationManager<AtlasEntityWithExtInfo> extends WorkItemManager {
private static final Logger LOG = LoggerFactory.getLogger(EntityCreationManager.class);
private static final String WORKER_PREFIX = "migration-import";
private final StatusReporter<String, String> statusReporter;
private final AtlasImportResult importResult;
private final int streamSize;
private static final long STATUS_REPORT_TIMEOUT_DURATION = 5 * 60 * 1000; // 5 minutes
private String currentTypeName;
private float currentPercent;
public EntityCreationManager(WorkItemBuilder builder, int batchSize, int numWorkers, AtlasImportResult importResult, int streamSize) {
super(builder, WORKER_PREFIX, batchSize, numWorkers, true);
this.importResult = importResult;
this.streamSize = streamSize;
this.statusReporter = new StatusReporter<>(STATUS_REPORT_TIMEOUT_DURATION);
}
public int read(EntityImportStream entityStream) {
int currentIndex = 0;
AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo;
while ((entityWithExtInfo = entityStream.getNextEntityWithExtInfo()) != null) {
AtlasEntity entity = entityWithExtInfo.getEntity();
if (entity == null) {
continue;
}
try {
produce(currentIndex++, entity.getTypeName(), entityWithExtInfo);
} catch (Throwable e) {
LOG.warn("Exception: {}", entity.getGuid(), e);
break;
}
}
return currentIndex;
}
private void produce(int currentIndex, String typeName, AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) {
String previousTypeName = getCurrentTypeName();
if (StringUtils.isNotEmpty(typeName)
&& StringUtils.isNotEmpty(previousTypeName)
&& !StringUtils.equals(previousTypeName, typeName)) {
LOG.info("Waiting: '{}' to complete...", previousTypeName);
super.drain();
LOG.info("Switching entity type processing: From: '{}' To: '{}'...", previousTypeName, typeName);
}
setCurrentTypeName(typeName);
statusReporter.produced(entityWithExtInfo.getEntity().getGuid(), String.format("%s:%s", entityWithExtInfo.getEntity().getTypeName(), currentIndex));
super.checkProduce(entityWithExtInfo);
extractResults();
}
public void extractResults() {
Object result;
while ((result = getResults().poll()) != null) {
statusReporter.processed((String) result);
}
logStatus();
}
private void logStatus() {
String ack = statusReporter.ack();
if (StringUtils.isEmpty(ack)) {
return;
}
String[] split = ack.split(":");
if (split.length < 2) {
return;
}
importResult.incrementMeticsCounter(split[0]);
this.currentPercent = updateImportMetrics(split[0], Integer.parseInt(split[1]), getStreamSize(), getCurrentPercent());
}
private static float updateImportMetrics(String typeNameGuid, int currentIndex, int streamSize, float currentPercent) {
String lastEntityImported = String.format("entity:last-imported:%s:(%s)", typeNameGuid, currentIndex);
return BulkImporterImpl.updateImportProgress(LOG, currentIndex, streamSize, currentPercent, lastEntityImported);
}
private String getCurrentTypeName() {
return this.currentTypeName;
}
private void setCurrentTypeName(String typeName) {
this.currentTypeName = typeName;
}
private float getCurrentPercent() {
return this.currentPercent;
}
private int getStreamSize() {
return this.streamSize;
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.store.graph.v2.bulkimport.pc;
import org.apache.atlas.v1.typesystem.types.utils.TypesUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
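/**
 * Tracks produced work items against processed acknowledgements. ack() returns the index
 * of the most recent item whose predecessors (in production order) have all been
 * processed; if the head item stays unprocessed past the timeout, it is logged and its
 * index is reported anyway.
 */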
public class StatusReporter<T, U> {
private static final Logger LOG = LoggerFactory.getLogger(StatusReporter.class);
private final Map<T, U> producedItems = new LinkedHashMap<>();
private final Set<T> processedSet = new HashSet<>();
private TypesUtil.Pair<T, Long> watchedItem;
private final long timeOut;
public StatusReporter(long timeOut) {
this.timeOut = timeOut;
}
public void produced(T item, U index) {
this.producedItems.put(item, index);
}
public void processed(T item) {
this.processedSet.add(item);
}
public void processed(T[] items) {
this.processedSet.addAll(Arrays.asList(items));
}
public U ack() {
U ack = null;
U ret;
Map.Entry<T, U> firstElement;
do {
firstElement = getFirstElement(this.producedItems);
ret = completionIndex(firstElement);
if (ret != null) {
ack = ret;
}
} while(ret != null);
return addToWatchIfNeeded(ack, firstElement);
}
private U addToWatchIfNeeded(U ack, Map.Entry<T, U> firstElement) {
if (ack == null && firstElement != null) {
ack = addToWatch(firstElement.getKey());
} else {
resetWatchItem();
}
return ack;
}
private void resetWatchItem() {
this.watchedItem = null;
}
private U addToWatch(T key) {
createNewWatchItem(key);
if (!hasTimedOut(this.watchedItem)) {
return null;
}
T producedItemKey = this.watchedItem.left;
resetWatchItem();
LOG.warn("Item: {}: Was produced but not successfully processed!", producedItemKey);
return this.producedItems.get(producedItemKey);
}
private void createNewWatchItem(T key) {
if (this.watchedItem != null) {
return;
}
this.watchedItem = new TypesUtil.Pair<>(key, System.currentTimeMillis());
}
private boolean hasTimedOut(TypesUtil.Pair<T, Long> watchedItem) {
if (watchedItem == null) {
return false;
}
return (System.currentTimeMillis() - watchedItem.right) >= timeOut;
}
private Map.Entry<T, U> getFirstElement(Map<T, U> map) {
if (map.isEmpty()) {
return null;
}
return map.entrySet().iterator().next();
}
private U completionIndex(Map.Entry<T, U> lookFor) {
U ack = null;
if (lookFor == null || !processedSet.contains(lookFor.getKey())) {
return ack;
}
ack = lookFor.getValue();
producedItems.remove(lookFor.getKey());
processedSet.remove(lookFor.getKey());
return ack;
}
}
...@@ -136,11 +136,6 @@ public class ImportServiceTest extends ExportImportTestBase {
return getZipSource("dup_col_deleted.zip");
}
@DataProvider(name = "zipDirect1")
public static Object[][] getZipDirect(ITestContext context) throws IOException, AtlasBaseException {
return getZipSource("dup_col_deleted.zip");
}
@Test(dataProvider = "sales") @Test(dataProvider = "sales")
public void importDB1(InputStream inputStream) throws AtlasBaseException, IOException { public void importDB1(InputStream inputStream) throws AtlasBaseException, IOException {
loadBaseModel(); loadBaseModel();
...@@ -535,17 +530,6 @@ public class ImportServiceTest extends ExportImportTestBase { ...@@ -535,17 +530,6 @@ public class ImportServiceTest extends ExportImportTestBase {
} }
} }
@Test(dataProvider = "zipDirect1")
public void zipSourceDirect(InputStream inputStream) throws IOException, AtlasBaseException {
loadBaseModel();
loadFsModel();
loadHiveModel();
runImportWithNoParameters(importService, inputStream);
}
private AtlasImportRequest getImportRequest(String replicatedFrom){
AtlasImportRequest importRequest = getDefaultImportRequest();
...
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.impexp;
import com.google.inject.Inject;
import org.apache.atlas.TestModules;
import org.apache.atlas.discovery.EntityDiscoveryService;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.impexp.AtlasImportRequest;
import org.apache.atlas.model.impexp.AtlasImportResult;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.store.AtlasTypeDefStore;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
import java.io.IOException;
import java.io.InputStream;
import static org.testng.Assert.assertNotNull;
@Guice(modules = TestModules.TestOnlyModule.class)
public class MigrationImportTest extends ExportImportTestBase {
private final ImportService importService;
@Inject
AtlasTypeRegistry typeRegistry;
@Inject
private AtlasTypeDefStore typeDefStore;
@Inject
private EntityDiscoveryService discoveryService;
@Inject
AtlasEntityStore entityStore;
@Inject
AtlasGraph atlasGraph;
@Inject
public MigrationImportTest(ImportService importService) {
this.importService = importService;
}
@Test
public void simpleImport() throws IOException, AtlasBaseException {
InputStream inputStream = ZipFileResourceTestUtils.getFileInputStream("zip-direct-2.zip");
AtlasImportRequest importRequest = new AtlasImportRequest();
importRequest.setOption("migration", "true");
AtlasImportResult result = importService.run(inputStream, importRequest, null, null, null);
assertNotNull(result);
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.impexp;
import org.apache.atlas.repository.store.graph.v2.bulkimport.pc.StatusReporter;
import org.testng.annotations.Test;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;
public class StatusReporterTest {
@Test
public void noneProducedNoneReported() {
StatusReporter<Integer, Integer> statusReporter = new StatusReporter<>(100);
assertNull(statusReporter.ack());
}
@Test
public void producedButNotAcknowledged() {
StatusReporter<Integer, Integer> statusReporter = createStatusReportWithItems();
assertNull(statusReporter.ack());
}
@Test
public void producedAcknowledged() {
StatusReporter<Integer, Integer> statusReporter = createStatusReportWithItems();
statusReporter.processed(1);
assertEquals(statusReporter.ack(), Integer.valueOf(100));
}
@Test
public void producedAcknowledgeMaxAvailableInSequence() {
StatusReporter<Integer, Integer> statusReporter = createStatusReportWithItems();
statusReporter.processed(new Integer[]{1, 3, 5});
assertEquals(statusReporter.ack(), Integer.valueOf(100));
}
@Test
public void producedAcknowledgeMaxAvailableInSequence2() {
StatusReporter<Integer, Integer> statusReporter = createStatusReportWithItems();
statusReporter.processed(new Integer[]{1, 2, 3, 6, 5});
assertEquals(statusReporter.ack(), Integer.valueOf(300));
}
@Test
public void producedSetDisjointWithAckSet() {
StatusReporter<Integer, Integer> statusReporter = new StatusReporter<>(100);
statusReporter.produced(11, 1000);
statusReporter.produced(12, 2000);
statusReporter.produced(13, 3000);
statusReporter.processed(new Integer[]{1, 11, 12, 13});
assertEquals(statusReporter.ack(), Integer.valueOf(3000));
}
@Test
public void missingAck() throws InterruptedException {
StatusReporter<Integer, Integer> statusReporter = createStatusReportWithItems(2, 3, 4);
assertNull(statusReporter.ack());
Thread.sleep(1002);
assertEquals(statusReporter.ack(), Integer.valueOf(100));
}
private StatusReporter<Integer, Integer> createStatusReportWithItems(Integer... processed) {
StatusReporter<Integer, Integer> statusReporter = new StatusReporter<>(1000);
statusReporter.produced(1, 100);
statusReporter.produced(2, 200);
statusReporter.produced(3, 300);
statusReporter.produced(4, 400);
statusReporter.produced(5, 500);
statusReporter.produced(6, 600);
statusReporter.processed(processed);
return statusReporter;
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.impexp;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.impexp.AtlasExportResult;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.testng.annotations.Test;
import java.io.IOException;
import java.io.InputStream;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
public class ZipDirectTest {
@Test(expectedExceptions = AtlasBaseException.class)
public void loadFileEmpty() throws IOException, AtlasBaseException {
InputStream inputStream = ZipFileResourceTestUtils.getFileInputStream("zip-direct-1.zip");
new ZipSourceDirect(inputStream, 1);
}
@Test
public void loadFile() throws IOException, AtlasBaseException {
final int EXPECTED_ENTITY_COUNT = 3434;
InputStream inputStream = ZipFileResourceTestUtils.getFileInputStream("zip-direct-2.zip");
ZipSourceDirect zipSourceDirect = new ZipSourceDirect(inputStream, EXPECTED_ENTITY_COUNT);
assertNotNull(zipSourceDirect);
assertNotNull(zipSourceDirect.getTypesDef());
assertTrue(zipSourceDirect.getTypesDef().getEntityDefs().size() > 0);
assertNotNull(zipSourceDirect.getExportResult());
int count = 0;
AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo;
while((entityWithExtInfo = zipSourceDirect.getNextEntityWithExtInfo()) != null) {
assertNotNull(entityWithExtInfo);
count++;
}
assertEquals(count, EXPECTED_ENTITY_COUNT);
}
}
...@@ -317,9 +317,7 @@ public class ZipFileResourceTestUtils {
}
public static AtlasImportRequest getDefaultImportRequest() {
AtlasImportRequest atlasImportRequest = new AtlasImportRequest(); return new AtlasImportRequest();
atlasImportRequest.setOption("migration", "true");
return atlasImportRequest;
}
...@@ -338,8 +336,7 @@ public class ZipFileResourceTestUtils {
final String hostName = "localhost";
final String userName = "admin";
AtlasImportRequest request = getDefaultImportRequest(); AtlasImportResult result = importService.run(inputStream, userName, hostName, requestingIP);
AtlasImportResult result = runImportWithParameters(importService, request, inputStream);
assertEquals(result.getOperationStatus(), AtlasImportResult.OperationStatus.SUCCESS);
return result;
}
...