Commit 3d0c9877 by Ashutosh Mestry

ATLAS-3663: ZipFileMigrator: Automatic Resume During Migration.

parent 60f878e5
......@@ -44,12 +44,13 @@ public class AtlasImportRequest implements Serializable {
public static final String TRANSFORMS_KEY = "transforms";
public static final String TRANSFORMERS_KEY = "transformers";
public static final String OPTION_KEY_REPLICATED_FROM = "replicatedFrom";
public static final String OPTION_KEY_MIGRATION_FILE_NAME = "migrationFileName";
public static final String OPTION_KEY_MIGRATION = "migration";
public static final String OPTION_KEY_NUM_WORKERS = "numWorkers";
public static final String OPTION_KEY_BATCH_SIZE = "batchSize";
public static final String OPTION_KEY_FORMAT = "format";
public static final String OPTION_KEY_FORMAT_ZIP_DIRECT = "zipDirect";
private static final String START_POSITION_KEY = "startPosition";
public static final String START_POSITION_KEY = "startPosition";
private static final String START_GUID_KEY = "startGuid";
private static final String FILE_NAME_KEY = "fileName";
private static final String UPDATE_TYPE_DEFINITION_KEY = "updateTypeDefinition";
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.model.migration;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import org.apache.atlas.model.AtlasBaseModelObject;
import org.apache.atlas.model.impexp.MigrationStatus;
import org.apache.commons.lang.StringUtils;
import java.io.Serializable;
import java.util.Date;
import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE;
import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY;
/**
 * A {@link MigrationStatus} that additionally carries a name.
 *
 * Serialized through public getters/setters only; unknown JSON properties are
 * ignored and null values are omitted.
 */
@JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = PUBLIC_ONLY, fieldVisibility = NONE)
@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
@JsonIgnoreProperties(ignoreUnknown = true)
public class MigrationImportStatus extends MigrationStatus {
    private String name;

    /** Creates a status without a name. */
    public MigrationImportStatus() {
    }

    /**
     * Creates a status with the given name.
     *
     * @param name the name to associate with this status
     */
    public MigrationImportStatus(String name) {
        this.name = name;
    }

    /** @return the name of this status, possibly {@code null} */
    public String getName() {
        return name;
    }

    /** @param name the name to associate with this status */
    public void setName(String name) {
        this.name = name;
    }

    /**
     * @return {@code ", name=<name>"} followed by the parent's string form
     */
    @Override
    public String toString() {
        return ", name=" + name + super.toString();
    }
}
......@@ -18,6 +18,9 @@
package org.apache.atlas.pc;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
......@@ -25,8 +28,20 @@ import java.util.Map;
import java.util.Set;
public class StatusReporter<T, U> {
private static final Logger LOG = LoggerFactory.getLogger(StatusReporter.class);
private Map<T,U> producedItems = new LinkedHashMap<>();
private Set<T> processedSet = new HashSet<>();
private long timeoutDuration;
private long lastAck;
/**
 * Creates a reporter whose timeout duration is -1; the timeout check only
 * fires for durations greater than -1, so no timeout applies.
 */
public StatusReporter() {
    this(-1);
}

/**
 * Creates a reporter that uses the given timeout duration.
 *
 * @param timeoutDurationInMs timeout duration in milliseconds
 */
public StatusReporter(long timeoutDurationInMs) {
    this.timeoutDuration = timeoutDurationInMs;
}
public void produced(T item, U index) {
this.producedItems.put(item, index);
......@@ -44,7 +59,8 @@ public class StatusReporter<T, U> {
U ack = null;
U ret;
do {
ret = completionIndex(getFirstElement(this.producedItems));
Map.Entry<T, U> firstElement = getFirstElement(this.producedItems);
ret = completionIndex(firstElement);
if (ret != null) {
ack = ret;
}
......@@ -63,13 +79,32 @@ public class StatusReporter<T, U> {
private U completionIndex(Map.Entry<T, U> lookFor) {
U ack = null;
if (lookFor == null || !processedSet.contains(lookFor.getKey())) {
if (lookFor == null) {
return ack;
}
ack = lookFor.getValue();
if (hasTimeoutDurationReached(System.currentTimeMillis())) {
LOG.warn("Ack: Timeout: {} - {}", lookFor.getKey(), lookFor.getValue());
return acknowledged(lookFor);
}
if (!processedSet.contains(lookFor.getKey())) {
return ack;
}
return acknowledged(lookFor);
}
/**
 * Removes the given produced entry from the bookkeeping structures and
 * returns its index.
 *
 * @param lookFor the produced entry being acknowledged; must not be null
 * @return the value (index) of the acknowledged entry
 */
private U acknowledged(Map.Entry<T, U> lookFor) {
    // Read key and value before the entry is removed from the backing map.
    T key = lookFor.getKey();
    U ack = lookFor.getValue();

    producedItems.remove(key);
    // processedSet stores keys, not map entries: removing the entry object
    // never matched anything and left every processed key in the set.
    processedSet.remove(key);
    return ack;
}
/**
 * Reports whether the timeout duration has elapsed since the previous call
 * to this method, and records the current call as the new reference point.
 *
 * The result is true only when the timeout duration is greater than -1,
 * a previous call has been recorded (lastAck is non-zero) and at least
 * timeoutDuration milliseconds separate {@code now} from that call.
 *
 * @param now the current time in milliseconds
 * @return true when the timeout duration has been reached
 */
private boolean hasTimeoutDurationReached(long now) {
    boolean timedOut = (this.timeoutDuration > -1)
            && (this.lastAck != 0)
            && ((now - this.lastAck) >= this.timeoutDuration);

    // Record the same clock reading that was used for the comparison instead
    // of sampling the clock a second time.
    this.lastAck = now;
    return timedOut;
}
}
......@@ -23,6 +23,8 @@ import org.testng.annotations.Test;
import java.util.concurrent.BlockingQueue;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
public class StatusReporterTest {
private static class IntegerConsumer extends WorkItemConsumer<Integer> {
......@@ -91,4 +93,20 @@ public class StatusReporterTest {
statusReporter.processed((Integer) result);
}
}
/**
 * Checks acknowledgement with a 2000 ms timeout: while item 1 is unprocessed
 * nothing is acknowledged; after sleeping past the timeout the reporter
 * acknowledges up to item 2's index.
 */
@Test
public void reportWithTimeout() throws InterruptedException {
    final long timeoutInMs = 2000;
    StatusReporter<Integer, Integer> reporter = new StatusReporter<>(timeoutInMs);

    reporter.produced(1, 100);
    reporter.produced(2, 200);
    reporter.processed(2);

    // Item 1 was produced first and has not been processed yet.
    Integer ackBeforeTimeout = reporter.ack();
    assertNull(ackBeforeTimeout);

    Thread.sleep(3000);

    Integer ackAfterTimeout = reporter.ack();
    assertNotNull(ackAfterTimeout);
    assertEquals(ackAfterTimeout, Integer.valueOf(200));
}
}
......@@ -235,6 +235,10 @@ public class ImportService {
result.incrementMeticsCounter("duration", getDuration(this.endTimestamp, this.startTimestamp));
result.setOperationStatus(AtlasImportResult.OperationStatus.SUCCESS);
if (isMigrationMode(result.getRequest())) {
return;
}
auditsWriter.write(userName, result, startTimestamp, endTimestamp, importSource.getCreationOrder());
}
......@@ -250,7 +254,7 @@ public class ImportService {
private EntityImportStream createZipSource(AtlasImportRequest request, InputStream inputStream, String configuredTemporaryDirectory) throws AtlasBaseException {
try {
if (request.getOptions().containsKey(AtlasImportRequest.OPTION_KEY_MIGRATION) || (request.getOptions().containsKey(AtlasImportRequest.OPTION_KEY_FORMAT) &&
if (isMigrationMode(request) || (request.getOptions().containsKey(AtlasImportRequest.OPTION_KEY_FORMAT) &&
request.getOptions().get(AtlasImportRequest.OPTION_KEY_FORMAT).equals(AtlasImportRequest.OPTION_KEY_FORMAT_ZIP_DIRECT))) {
LOG.info("ZipSource Format: ZipDirect: Size: {}", request.getOptions().get("size"));
return getZipDirectEntityImportStream(request, inputStream);
......@@ -288,4 +292,8 @@ public class ImportService {
exportRequest.getFetchTypeOptionValue().equalsIgnoreCase(AtlasExportRequest.FETCH_TYPE_INCREMENTAL) &&
exportRequest.getSkipLineageOptionValue();
}
/**
 * Tells whether the request carries the migration option.
 *
 * @param request the import request whose options are inspected
 * @return true when the options contain {@code AtlasImportRequest.OPTION_KEY_MIGRATION}
 */
private boolean isMigrationMode(AtlasImportRequest request) {
    final String migrationOptionKey = AtlasImportRequest.OPTION_KEY_MIGRATION;

    return request.getOptions().containsKey(migrationOptionKey);
}
}
......@@ -19,35 +19,61 @@
package org.apache.atlas.repository.impexp;
import com.google.common.annotations.VisibleForTesting;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasException;
import org.apache.atlas.annotation.AtlasService;
import org.apache.atlas.model.impexp.MigrationStatus;
import org.apache.atlas.repository.graph.AtlasGraphProvider;
import org.apache.atlas.repository.graphdb.GraphDBMigrator;
import org.apache.atlas.repository.migration.DataMigrationStatusService;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.inject.Inject;
import javax.inject.Singleton;
import static org.apache.atlas.AtlasConstants.ATLAS_MIGRATION_MODE_FILENAME;
@AtlasService
@Singleton
public class MigrationProgressService {
private static final Logger LOG = LoggerFactory.getLogger(MigrationProgressService.class);
private static final String FILE_EXTENSION_ZIP = ".zip";
public static final String MIGRATION_QUERY_CACHE_TTL = "atlas.migration.query.cache.ttlInSecs";
@VisibleForTesting
static long DEFAULT_CACHE_TTL_IN_SECS = 30 * 1000; // 30 secs
static long DEFAULT_CACHE_TTL_IN_SECS = 120 * 1000; // 120 * 1000 - NOTE(review): presumably 120 secs expressed in milliseconds although the name says secs; confirm unit
private final long cacheValidity;
private final GraphDBMigrator migrator;
private MigrationStatus cachedStatus;
private long cacheExpirationTime = 0;
private DataMigrationStatusService dataMigrationStatusService;
private boolean zipFileBasedMigrationImport;
/**
 * Builds the service, resolving the cache validity from configuration and
 * setting up zip-file based status tracking when it is enabled.
 *
 * @param configuration source of {@code MIGRATION_QUERY_CACHE_TTL}; when null the default TTL is used
 * @param migrator      migrator queried for status when zip-file based migration is not enabled
 */
@Inject
public MigrationProgressService(Configuration configuration, GraphDBMigrator migrator) {
    this.migrator = migrator;

    if (configuration == null) {
        this.cacheValidity = DEFAULT_CACHE_TTL_IN_SECS;
    } else {
        this.cacheValidity = configuration.getLong(MIGRATION_QUERY_CACHE_TTL, DEFAULT_CACHE_TTL_IN_SECS);
    }

    this.zipFileBasedMigrationImport = isZipFileBasedMigrationEnabled();
    initConditionallyZipFileBasedMigrator();
}
/**
 * Creates and initializes the data migration status service, but only when
 * zip-file based migration import is enabled; otherwise does nothing.
 */
private void initConditionallyZipFileBasedMigrator() {
    if (zipFileBasedMigrationImport) {
        dataMigrationStatusService = new DataMigrationStatusService(AtlasGraphProvider.getGraphInstance());
        dataMigrationStatusService.init(getFileNameFromMigrationProperty());
    }
}
/**
 * @return true when the configured migration file name ends with ".zip",
 *         ignoring case
 */
private boolean isZipFileBasedMigrationEnabled() {
    String migrationFileName = getFileNameFromMigrationProperty();

    return StringUtils.endsWithIgnoreCase(migrationFileName, FILE_EXTENSION_ZIP);
}
public MigrationStatus getStatus() {
......@@ -58,7 +84,11 @@ public class MigrationProgressService {
long currentTime = System.currentTimeMillis();
if(resetCache(currentTime)) {
cachedStatus = migrator.getMigrationStatus();
if (this.zipFileBasedMigrationImport) {
cachedStatus = dataMigrationStatusService.getStatus();
} else {
cachedStatus = migrator.getMigrationStatus();
}
}
return cachedStatus;
......@@ -73,4 +103,12 @@ public class MigrationProgressService {
return ret;
}
/**
 * Reads the migration file name from the application properties.
 *
 * @return the value of {@code ATLAS_MIGRATION_MODE_FILENAME}, or an empty
 *         string when the property is absent or the application properties
 *         cannot be loaded
 */
public String getFileNameFromMigrationProperty() {
    try {
        return ApplicationProperties.get().getString(ATLAS_MIGRATION_MODE_FILENAME, StringUtils.EMPTY);
    } catch (AtlasException e) {
        // Keep the empty-string fallback, but leave a trace of the failure
        // instead of swallowing it silently.
        LOG.warn("Unable to read application property: {}. Assuming no migration file is configured.", ATLAS_MIGRATION_MODE_FILENAME, e);
        return StringUtils.EMPTY;
    }
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.migration;
import org.apache.atlas.model.migration.MigrationImportStatus;
import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.graph.AtlasGraphProvider;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasIndexQuery;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Date;
import java.util.Iterator;
import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.getEncodedProperty;
import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.setEncodedProperty;
import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.encodePropertyKey;
import static org.apache.atlas.type.Constants.INTERNAL_PROPERTY_KEY_PREFIX;
/**
 * Keeps the status of a migration import in a graph vertex so that it can be
 * read back later.
 *
 * The status name is stored under {@code Constants.GUID_PROPERTY_KEY}; start
 * time, size, position and operation status are stored under internal
 * "migration.*" property keys.
 */
public class DataMigrationStatusService {
    private static final Logger LOG = LoggerFactory.getLogger(DataMigrationStatusService.class);

    private final MigrationStatusVertexManagement migrationStatusVertexManagement;
    private MigrationImportStatus status;

    /** Creates a service backed by the graph returned by {@code AtlasGraphProvider.getGraphInstance()}. */
    public DataMigrationStatusService() {
        this(AtlasGraphProvider.getGraphInstance());
    }

    /**
     * Creates a service backed by the given graph.
     *
     * @param atlasGraph graph in which the status vertex is stored
     */
    public DataMigrationStatusService(AtlasGraph atlasGraph) {
        this.migrationStatusVertexManagement = new MigrationStatusVertexManagement(atlasGraph);
    }

    /**
     * Starts tracking the given file. A fresh in-memory status is always
     * created; when a vertex for the file already exists the stored status
     * replaces it.
     *
     * @param fileToImport name under which the status is stored
     */
    public void init(String fileToImport) {
        this.status = new MigrationImportStatus(fileToImport);

        if (this.migrationStatusVertexManagement.exists(fileToImport)) {
            getCreate(fileToImport);
        }
    }

    /**
     * Same as {@link #getCreate(MigrationImportStatus)} with a new status
     * that only carries the given name.
     *
     * @param fileName name under which the status is stored
     * @return the tracked status
     */
    public MigrationImportStatus getCreate(String fileName) {
        return getCreate(new MigrationImportStatus(fileName));
    }

    /**
     * Fetches the stored status with the same name or, when none is found,
     * stores the given one. The result becomes the tracked status.
     *
     * @param status status to look up by name, and to store when absent
     * @return the tracked status; when the lookup/store fails the error is
     *         logged and the previously tracked status (possibly null) is returned
     */
    public MigrationImportStatus getCreate(MigrationImportStatus status) {
        try {
            this.status = this.migrationStatusVertexManagement.createOrUpdate(status);
        } catch (Exception ex) {
            LOG.error("DataMigrationStatusService: Setting status: {}: Resulted in error!", status.getName(), ex);
        }

        return this.status;
    }

    /**
     * Returns the tracked status. When the tracked status has no operation
     * status yet and a vertex with its name exists, the status is re-read
     * through {@link #getCreate(MigrationImportStatus)} first.
     *
     * @return the tracked status, or null when nothing is tracked
     */
    public MigrationImportStatus getStatus() {
        boolean refreshFromStore = this.status != null
                && StringUtils.isEmpty(this.status.getOperationStatus())
                && this.migrationStatusVertexManagement.exists(this.status.getName());

        return refreshFromStore ? getCreate(this.status) : this.status;
    }

    /**
     * @param name name of the status to look up
     * @return the stored status, or null when no vertex is cached or found
     */
    public MigrationImportStatus getByName(String name) {
        return this.migrationStatusVertexManagement.findByName(name);
    }

    /**
     * Deletes the vertex of the tracked status and stops tracking it.
     * Does nothing when no status is tracked.
     */
    public void delete() {
        if (this.status == null) {
            return;
        }

        MigrationImportStatus stored = getByName(this.status.getName());

        // The lookup returns null when nothing is stored; there is nothing to remove then.
        if (stored != null) {
            this.migrationStatusVertexManagement.delete(stored.getName());
        }

        this.status = null;
    }

    /**
     * Records the given position on the tracked status and persists it.
     *
     * @param position position to save; must be parseable as a long
     */
    public void savePosition(String position) {
        if (this.status == null) {
            LOG.warn("DataMigrationStatusService: savePosition: {}: No status is being tracked. Position not saved.", position);
            return;
        }

        this.status.setCurrentIndex(Long.valueOf(position));
        this.migrationStatusVertexManagement.updateVertexPartialPosition(this.status);
    }

    /**
     * Records the given operation status on the tracked status and persists it.
     *
     * @param status operation status to save
     */
    public void setStatus(String status) {
        if (this.status == null) {
            LOG.warn("DataMigrationStatusService: setStatus: {}: No status is being tracked. Status not saved.", status);
            return;
        }

        this.status.setOperationStatus(status);
        // Persist the status property itself; writing only the position left
        // the stored operation status unchanged.
        this.migrationStatusVertexManagement.updateVertexPartialStatus(this.status);
    }

    /**
     * Reads and writes the vertex that holds a {@link MigrationImportStatus}.
     * The most recently created or found vertex is cached in {@code vertex}.
     */
    private static class MigrationStatusVertexManagement {
        public static final String PROPERTY_KEY_START_TIME = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "migration.startTime");
        public static final String PROPERTY_KEY_SIZE       = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "migration.size");
        public static final String PROPERTY_KEY_POSITION   = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "migration.position");
        public static final String PROPERTY_KEY_STATUS     = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "migration.status");

        private AtlasGraph atlasGraph;
        private AtlasVertex vertex;

        public MigrationStatusVertexManagement(AtlasGraph atlasGraph) {
            this.atlasGraph = atlasGraph;
        }

        /**
         * Looks up the vertex by the status name; when none is found a new
         * vertex is created and populated from the given status. An existing
         * vertex is returned as stored and is not modified.
         */
        public MigrationImportStatus createOrUpdate(MigrationImportStatus status) {
            this.vertex = findByNameInternal(status.getName());

            if (this.vertex == null) {
                this.vertex = atlasGraph.addVertex();
                LOG.info("MigrationStatusVertexManagement: Vertex created!");

                updateVertex(this.vertex, status);
            }

            return to(this.vertex);
        }

        /** @return true when a vertex with the given name is found */
        public boolean exists(String name) {
            return findByNameInternal(name) != null;
        }

        /**
         * Returns the status held by the cached vertex when one is cached
         * (the name is not consulted in that case); otherwise looks the
         * vertex up by name and caches it.
         *
         * @return the stored status, or null when no vertex is cached or found
         */
        public MigrationImportStatus findByName(String name) {
            if (this.vertex != null) {
                return to(this.vertex);
            }

            AtlasVertex found = findByNameInternal(name);
            if (found == null) {
                return null;
            }

            this.vertex = found;
            LOG.info("MigrationImportStatus: Vertex found!");
            return to(found);
        }

        /** Removes the vertex with the given name, if found, clears the cache and commits. */
        public void delete(String name) {
            try {
                AtlasVertex found = findByNameInternal(name);

                // The lookup returns null when nothing matches or when it fails.
                if (found != null) {
                    atlasGraph.removeVertex(found);
                }

                this.vertex = null;
            } finally {
                atlasGraph.commit();
            }
        }

        /**
         * Runs an index query on {@code Constants.GUID_PROPERTY_KEY} for the
         * given name and returns the first match. Failures are logged and
         * reported as "not found"; the graph is committed in all cases.
         */
        private AtlasVertex findByNameInternal(String name) {
            try {
                String idxQueryString = String.format("%s\"%s\":\"%s\"", AtlasGraphUtilsV2.getIndexSearchPrefix(), Constants.GUID_PROPERTY_KEY, name);
                AtlasIndexQuery idxQuery = atlasGraph.indexQuery(Constants.VERTEX_INDEX, idxQueryString);
                Iterator<AtlasIndexQuery.Result<Object, Object>> results = idxQuery.vertices();

                if (results.hasNext()) {
                    AtlasIndexQuery.Result<?, ?> qryResult = results.next();
                    if (qryResult != null) {
                        return qryResult.getVertex();
                    }
                }

                return null;
            } catch (Exception e) {
                LOG.error("MigrationStatusVertexManagement.findByNameInternal: Failed!", e);
            } finally {
                atlasGraph.commit();
            }

            return null;
        }

        /** Writes only the position of the given status to the cached vertex and commits. */
        public void updateVertexPartialPosition(MigrationImportStatus status) {
            try {
                setEncodedProperty(vertex, PROPERTY_KEY_POSITION, status.getCurrentIndex());
            } catch (Exception e) {
                LOG.warn("Error updating status. Please rely on log messages.", e);
            } finally {
                atlasGraph.commit();
            }
        }

        /** Writes only the operation status of the given status to the cached vertex and commits. */
        public void updateVertexPartialStatus(MigrationImportStatus status) {
            try {
                setEncodedProperty(vertex, PROPERTY_KEY_STATUS, status.getOperationStatus());
            } catch (Exception e) {
                LOG.warn("Error updating status. Please rely on log messages.", e);
            } finally {
                atlasGraph.commit();
            }
        }

        /**
         * Writes every status property to the given vertex and commits.
         * When the status has no start time, the current time is stored.
         */
        private void updateVertex(AtlasVertex vertex, MigrationImportStatus status) {
            try {
                long startTime = (status.getStartTime() != null)
                        ? status.getStartTime().getTime()
                        : System.currentTimeMillis();

                setEncodedProperty(vertex, Constants.GUID_PROPERTY_KEY, status.getName());
                setEncodedProperty(vertex, PROPERTY_KEY_START_TIME, startTime);
                setEncodedProperty(vertex, PROPERTY_KEY_SIZE, status.getTotalCount());
                setEncodedProperty(vertex, PROPERTY_KEY_POSITION, status.getCurrentIndex());
                setEncodedProperty(vertex, PROPERTY_KEY_STATUS, status.getOperationStatus());
            } catch (Exception ex) {
                LOG.error("Error updating MigrationImportStatus vertex. Status may not be persisted correctly.", ex);
            } finally {
                atlasGraph.commit();
            }
        }

        /**
         * Converts a vertex into a status. Properties that are absent keep
         * the defaults of a new {@link MigrationImportStatus}; a failure is
         * logged and the partially populated status is returned.
         */
        private static MigrationImportStatus to(AtlasVertex vertex) {
            MigrationImportStatus ret = new MigrationImportStatus();

            try {
                ret.setName(getEncodedProperty(vertex, Constants.GUID_PROPERTY_KEY, String.class));

                Long dateValue = getEncodedProperty(vertex, PROPERTY_KEY_START_TIME, Long.class);
                if (dateValue != null) {
                    ret.setStartTime(new Date(dateValue));
                }

                Long size = getEncodedProperty(vertex, PROPERTY_KEY_SIZE, Long.class);
                if (size != null) {
                    ret.setTotalCount(size);
                }

                Long position = getEncodedProperty(vertex, PROPERTY_KEY_POSITION, Long.class);
                if (position != null) {
                    ret.setCurrentIndex(position);
                }

                ret.setOperationStatus(getEncodedProperty(vertex, PROPERTY_KEY_STATUS, String.class));
            } catch (Exception ex) {
                LOG.error("Error converting to MigrationImportStatus. Will proceed with default values.", ex);
            }

            return ret;
        }
    }
}
......@@ -23,6 +23,8 @@ import org.apache.atlas.AtlasException;
import org.apache.atlas.RequestContext;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.impexp.AtlasImportRequest;
import org.apache.atlas.model.migration.MigrationImportStatus;
import org.apache.atlas.repository.graph.AtlasGraphProvider;
import org.apache.atlas.repository.impexp.ImportService;
import org.apache.atlas.type.AtlasType;
import org.apache.commons.lang.StringUtils;
......@@ -41,7 +43,7 @@ public class ZipFileMigrationImporter implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(ZipFileMigrationImporter.class);
private static final String APPLICATION_PROPERTY_MIGRATION_NUMER_OF_WORKERS = "atlas.migration.mode.workers";
private static final String APPLICATION_PROPERTY_MIGRATION_BATCH_SIZE = "atlas.migration.mode.batch.size";
private static final String APPLICATION_PROPERTY_MIGRATION_BATCH_SIZE = "atlas.migration.mode.batch.size";
private static final String DEFAULT_NUMBER_OF_WORKERS = "4";
private static final String DEFAULT_BATCH_SIZE = "100";
private static final String ZIP_FILE_COMMENT_ENTITIES_COUNT = "entitiesCount";
......@@ -51,20 +53,24 @@ public class ZipFileMigrationImporter implements Runnable {
private final ImportService importService;
private final String fileToImport;
private DataMigrationStatusService dataMigrationStatusService;
private MigrationImportStatus migrationImportStatus;
/**
 * Creates an importer for the given file.
 *
 * @param importService service used to run the import
 * @param fileName      name of the file to import
 */
public ZipFileMigrationImporter(ImportService importService, String fileName) {
this.importService = importService;
this.fileToImport = fileName;
// The status service is bound to the graph instance supplied by AtlasGraphProvider.
this.dataMigrationStatusService = new DataMigrationStatusService(AtlasGraphProvider.getGraphInstance());
}
@Override
public void run() {
try {
FileWatcher fileWatcher = new FileWatcher(fileToImport);
fileWatcher.start();
detectFileToImport();
int streamSize = getStreamSizeFromComment(fileToImport);
performImport(new FileInputStream(new File(fileToImport)), streamSize);
migrationImportStatus = getCreateMigrationStatus(fileToImport, streamSize);
performImport(fileToImport, streamSize, Long.toString(migrationImportStatus.getCurrentIndex()));
dataMigrationStatusService.setStatus("DONE");
} catch (IOException e) {
LOG.error("Migration Import: IO Error!", e);
} catch (AtlasBaseException e) {
......@@ -72,6 +78,22 @@ public class ZipFileMigrationImporter implements Runnable {
}
}
/**
 * Obtains the migration status for the file through the status service
 * (passing the stream size as total count), logs its current position and
 * then sets the operation status to "STARTED".
 *
 * @param fileName   name of the file being imported
 * @param streamSize total count to record on the requested status
 * @return the status returned by the status service
 */
private MigrationImportStatus getCreateMigrationStatus(String fileName, int streamSize) {
    MigrationImportStatus requested = new MigrationImportStatus(fileName);
    requested.setTotalCount(streamSize);

    MigrationImportStatus persisted = dataMigrationStatusService.getCreate(requested);
    LOG.info("DataMigrationStatusService: Position: {}", persisted.getCurrentIndex());

    dataMigrationStatusService.setStatus("STARTED");
    return persisted;
}
/**
 * Starts a {@code FileWatcher} on the file to import.
 *
 * @throws IOException propagated from the file watcher
 */
private void detectFileToImport() throws IOException {
    new FileWatcher(fileToImport).start();
}
private int getStreamSizeFromComment(String fileToImport) {
int ret = 1;
try {
......@@ -99,13 +121,13 @@ public class ZipFileMigrationImporter implements Runnable {
return entitiesCount;
}
private void performImport(InputStream fs, int streamSize) throws AtlasBaseException {
private void performImport(String fileToImport, int streamSize, String startPosition) throws AtlasBaseException {
try {
LOG.info("Migration Import: {}: Starting...", fileToImport);
LOG.info("Migration Import: {}: Starting at: {}...", fileToImport, startPosition);
InputStream fs = new FileInputStream(new File(fileToImport));
RequestContext.get().setUser(getUserNameFromEnvironment(), null);
importService.run(fs, getImportRequest(streamSize),
importService.run(fs, getImportRequest(fileToImport, streamSize, startPosition),
getUserNameFromEnvironment(),
InetAddress.getLocalHost().getHostName(),
InetAddress.getLocalHost().getHostAddress());
......@@ -122,16 +144,19 @@ public class ZipFileMigrationImporter implements Runnable {
return System.getProperty(ENV_USER_NAME);
}
private AtlasImportRequest getImportRequest(int streamSize) throws AtlasException {
private AtlasImportRequest getImportRequest(String fileToImport, int streamSize, String position) throws AtlasException {
AtlasImportRequest request = new AtlasImportRequest();
request.setOption(AtlasImportRequest.OPTION_KEY_MIGRATION_FILE_NAME, fileToImport);
request.setSizeOption(streamSize);
request.setOption(AtlasImportRequest.OPTION_KEY_MIGRATION, "true");
request.setOption(AtlasImportRequest.OPTION_KEY_NUM_WORKERS, getPropertyValue(APPLICATION_PROPERTY_MIGRATION_NUMER_OF_WORKERS, DEFAULT_NUMBER_OF_WORKERS));
request.setOption(AtlasImportRequest.OPTION_KEY_BATCH_SIZE, getPropertyValue(APPLICATION_PROPERTY_MIGRATION_BATCH_SIZE, DEFAULT_BATCH_SIZE));
request.setOption(AtlasImportRequest.START_POSITION_KEY, (StringUtils.isEmpty(position) ? "0" : position));
return request;
}
/**
 * Reads a string value from the application properties.
 *
 * @param property     key of the property to read
 * @param defaultValue value passed to the lookup as its default
 * @return the string obtained from the application properties
 * @throws AtlasException declared by this method; presumably raised when the application properties cannot be obtained - confirm
 */
private String getPropertyValue(String property, String defaultValue) throws AtlasException {
return ApplicationProperties.get().getString(property, defaultValue);
}
......
......@@ -20,12 +20,14 @@ package org.apache.atlas.repository.store.graph.v2.bulkimport;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.impexp.AtlasImportRequest;
import org.apache.atlas.model.impexp.AtlasImportResult;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.repository.converters.AtlasFormatConverters;
import org.apache.atlas.repository.converters.AtlasInstanceConverter;
import org.apache.atlas.repository.graph.AtlasGraphProvider;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.migration.DataMigrationStatusService;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.repository.store.graph.AtlasRelationshipStore;
import org.apache.atlas.repository.store.graph.v1.DeleteHandlerDelegate;
......@@ -65,10 +67,12 @@ public class MigrationImport extends ImportStrategy {
throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "importResult should contain request");
}
int index = 0;
DataMigrationStatusService dataMigrationStatusService = createMigrationStatusService(importResult);
long index = 0;
int streamSize = entityStream.size();
EntityMutationResponse ret = new EntityMutationResponse();
EntityCreationManager creationManager = createEntityCreationManager(atlasGraph, importResult);
EntityCreationManager creationManager = createEntityCreationManager(atlasGraph, importResult, dataMigrationStatusService);
try {
LOG.info("Migration Import: Size: {}: Starting...", streamSize);
......@@ -85,14 +89,23 @@ public class MigrationImport extends ImportStrategy {
return ret;
}
private EntityCreationManager createEntityCreationManager(AtlasGraph threadedAtlasGraph, AtlasImportResult importResult) {
/**
 * Creates a status service and initializes it with the migration file name
 * taken from the import request options.
 *
 * @param importResult import result whose request carries the options
 * @return the initialized status service
 */
private DataMigrationStatusService createMigrationStatusService(AtlasImportResult importResult) {
    DataMigrationStatusService statusService = new DataMigrationStatusService();

    String migrationFileName = importResult.getRequest().getOptions().get(AtlasImportRequest.OPTION_KEY_MIGRATION_FILE_NAME);
    statusService.init(migrationFileName);

    return statusService;
}
private EntityCreationManager createEntityCreationManager(AtlasGraph threadedAtlasGraph,
AtlasImportResult importResult,
DataMigrationStatusService dataMigrationStatusService) {
atlasGraph = threadedAtlasGraph;
int batchSize = importResult.getRequest().getOptionKeyBatchSize();
int numWorkers = getNumWorkers(importResult.getRequest().getOptionKeyNumWorkers());
EntityConsumerBuilder consumerBuilder =
new EntityConsumerBuilder(threadedAtlasGraph, entityStore, entityGraphRetriever, typeRegistry, batchSize);
return new EntityCreationManager(consumerBuilder, batchSize, numWorkers, importResult);
return new EntityCreationManager(consumerBuilder, batchSize, numWorkers, importResult, dataMigrationStatusService);
}
private static int getNumWorkers(int numWorkersFromOptions) {
......
......@@ -155,7 +155,10 @@ public class EntityConsumer extends WorkItemConsumer<AtlasEntity.AtlasEntityWith
LOG.error("Rollback: Done! Buffer: {}: Counter: {}: Retry count: {}", entityBuffer.size(), counter.get(), retryCount);
pause(retryCount);
LOG.warn("Commit error! Will pause and retry: Buffer: {}: Counter: {}: Retry count: {}", entityBuffer.size(), counter.get(), retryCount, ex);
String exceptionClass = ex.getClass().getSimpleName();
if (!exceptionClass.equals("JanusGraphException") && !exceptionClass.equals("PermanentLockingException")) {
LOG.warn("Commit error! Will pause and retry: Buffer: {}: Counter: {}: Retry count: {}", entityBuffer.size(), counter.get(), retryCount, ex);
}
retryProcessEntity(retryCount);
}
......
......@@ -23,6 +23,7 @@ import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.pc.StatusReporter;
import org.apache.atlas.pc.WorkItemBuilder;
import org.apache.atlas.pc.WorkItemManager;
import org.apache.atlas.repository.migration.DataMigrationStatusService;
import org.apache.atlas.repository.store.graph.v2.BulkImporterImpl;
import org.apache.atlas.repository.store.graph.v2.EntityImportStream;
import org.apache.commons.lang.StringUtils;
......@@ -32,24 +33,28 @@ import org.slf4j.LoggerFactory;
public class EntityCreationManager<AtlasEntityWithExtInfo> extends WorkItemManager {
private static final Logger LOG = LoggerFactory.getLogger(EntityCreationManager.class);
private static final String WORKER_PREFIX = "migration-import";
private static final long STATUS_REPORT_TIMEOUT_DURATION = 1 * 60 * 1000; // 1 min, in milliseconds (handed to StatusReporter as its timeout duration)
private final StatusReporter<String, String> statusReporter;
private final AtlasImportResult importResult;
private final DataMigrationStatusService dataMigrationStatusService;
private String currentTypeName;
private float currentPercent;
private EntityImportStream entityImportStream;
public EntityCreationManager(WorkItemBuilder builder, int batchSize, int numWorkers, AtlasImportResult importResult) {
public EntityCreationManager(WorkItemBuilder builder, int batchSize, int numWorkers, AtlasImportResult importResult, DataMigrationStatusService dataMigrationStatusService) {
super(builder, WORKER_PREFIX, batchSize, numWorkers, true);
this.importResult = importResult;
this.dataMigrationStatusService = dataMigrationStatusService;
this.statusReporter = new StatusReporter<>();
this.statusReporter = new StatusReporter<>(STATUS_REPORT_TIMEOUT_DURATION);
}
public int read(EntityImportStream entityStream) {
int currentIndex = 0;
public long read(EntityImportStream entityStream) {
long currentIndex = this.dataMigrationStatusService.getStatus().getCurrentIndex();
AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo;
this.entityImportStream = entityStream;
this.dataMigrationStatusService.setStatus("IN_PROGRESS");
while ((entityWithExtInfo = entityStream.getNextEntityWithExtInfo()) != null) {
AtlasEntity entity = entityWithExtInfo != null ? entityWithExtInfo.getEntity() : null;
if (entity == null) {
......@@ -66,7 +71,7 @@ public class EntityCreationManager<AtlasEntityWithExtInfo> extends WorkItemManag
return currentIndex;
}
private void produce(int currentIndex, String typeName, AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) {
private void produce(long currentIndex, String typeName, AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) {
String previousTypeName = getCurrentTypeName();
if (StringUtils.isNotEmpty(typeName)
......@@ -104,7 +109,9 @@ public class EntityCreationManager<AtlasEntityWithExtInfo> extends WorkItemManag
}
importResult.incrementMeticsCounter(split[0]);
this.currentPercent = updateImportMetrics(split[0], Integer.parseInt(split[1]), this.entityImportStream.size(), getCurrentPercent());
String currentPosition = split[1];
dataMigrationStatusService.savePosition(currentPosition);
this.currentPercent = updateImportMetrics(split[0], Integer.parseInt(currentPosition), this.entityImportStream.size(), getCurrentPercent());
}
private static float updateImportMetrics(String typeNameGuid, int currentIndex, int streamSize, float currentPercent) {
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.impexp;
import com.google.inject.Inject;
import org.apache.atlas.TestModules;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.migration.MigrationImportStatus;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.migration.DataMigrationStatusService;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
import java.util.Date;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
/**
 * Exercises create, position update and delete of a migration import status
 * against the injected test graph.
 */
@Guice(modules = TestModules.TestOnlyModule.class)
public class DataMigrationStatusServiceTest {
    @Inject
    AtlasGraph atlasGraph;

    @Test
    public void createUpdateDelete() throws AtlasBaseException {
        DataMigrationStatusService dataMigrationStatusService = new DataMigrationStatusService(atlasGraph);

        MigrationImportStatus expected = new MigrationImportStatus("/tmp/defg.zip");
        expected.setTotalCount(3333);
        expected.setCurrentIndex(20);
        expected.setStartTime(new Date());

        MigrationImportStatus ret = dataMigrationStatusService.getCreate(expected);

        assertNotNull(ret);
        assertEquals(ret.getName(), expected.getName());
        assertEquals(ret.getStartTime(), expected.getStartTime());
        assertEquals(ret.getTotalCount(), expected.getTotalCount());
        assertEquals(ret.getCurrentIndex(), expected.getCurrentIndex());

        dataMigrationStatusService.savePosition("100");
        assertNotNull(dataMigrationStatusService.getStatus());

        // Compare the value: assertNotNull(<primitive>, "100") could never
        // fail, since "100" was only the failure message.
        long savedPosition = dataMigrationStatusService.getStatus().getCurrentIndex();
        assertEquals(savedPosition, 100L);

        // Only the returned object is asserted here: the position it carries
        // depends on whether the index lookup finds the existing vertex
        // (otherwise a new vertex is created from 'expected').
        assertNotNull(dataMigrationStatusService.getCreate(expected));

        dataMigrationStatusService.delete();
        assertNull(dataMigrationStatusService.getStatus());
        assertNull(dataMigrationStatusService.getByName(ret.getName()));
    }
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment