Commit b02443ec by nixonrodrigues

Revert "DataMigration: Automatic resume."

This reverts commit 54042d35.
parent c99e15b3
......@@ -256,56 +256,6 @@
]
},
{
"name": "__MigrationImportStatus",
"superTypes": [
"__internal"
],
"serviceType": "atlas_core",
"typeVersion": "1.0",
"attributeDefs": [
{
"name": "name",
"typeName": "string",
"cardinality": "SINGLE",
"isIndexable": true,
"isOptional": false,
"isUnique": true
},
{
"name": "size",
"typeName": "int",
"cardinality": "SINGLE",
"isIndexable": true,
"isOptional": true,
"isUnique": false
},
{
"name": "position",
"typeName": "string",
"cardinality": "SINGLE",
"isIndexable": true,
"isOptional": true,
"isUnique": false
},
{
"name": "startTime",
"typeName": "long",
"cardinality": "SINGLE",
"isIndexable": true,
"isOptional": true,
"isUnique": false
},
{
"name": "endTime",
"typeName": "long",
"cardinality": "SINGLE",
"isIndexable": true,
"isOptional": true,
"isUnique": false
}
]
},
{
"name": "__AtlasUserSavedSearch",
"superTypes": [
"__internal"
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.model.migration;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import org.apache.atlas.model.AtlasBaseModelObject;
import java.io.Serializable;
import java.util.Date;
import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE;
import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY;
@JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = PUBLIC_ONLY, fieldVisibility = NONE)
@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
@JsonIgnoreProperties(ignoreUnknown = true)
public class MigrationImportStatus extends AtlasBaseModelObject implements Serializable {
private String name;
private int size;
private long startTime;
private long endTime;
private String position;
public MigrationImportStatus() {
}
public MigrationImportStatus(String name) {
this.name = name;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getSize() {
return size;
}
public void setSize(int size) {
this.size = size;
}
public long getStartTime() {
return startTime;
}
public void setStartTime(long startTime) {
this.startTime = startTime;
}
public long getEndTime() {
return endTime;
}
public void setEndTime(long endTime) {
this.endTime = endTime;
}
public void setPosition(String position) {
this.position = position;
}
public String getPosition() {
return this.position;
}
@Override
protected StringBuilder toString(StringBuilder sb) {
sb.append(", name=").append(name);
sb.append(", size=").append(size);
sb.append(", startTime=").append(startTime);
sb.append(", endTime=").append(endTime);
return sb;
}
}
......@@ -60,14 +60,14 @@ public class DataMigrationService implements Service {
@Inject
public DataMigrationService(GraphDBMigrator migrator, AtlasTypeDefStore typeDefStore, Configuration configuration,
GraphBackedSearchIndexer indexer, AtlasTypeDefStoreInitializer storeInitializer,
AtlasTypeRegistry typeRegistry, ImportService importService, DataMigrationStatusService dataMigrationStatusService) {
AtlasTypeRegistry typeRegistry, ImportService importService) {
this.configuration = configuration;
String fileName = getFileName();
boolean zipFileBasedMigrationImport = StringUtils.endsWithIgnoreCase(fileName, FILE_EXTENSION_ZIP);
this.thread = (zipFileBasedMigrationImport)
? new Thread(new ZipFileMigrationImporter(importService, fileName, dataMigrationStatusService), "zipFileBasedMigrationImporter")
? new Thread(new ZipFileMigrationImporter(importService, fileName), "zipFileBasedMigrationImporter")
: new Thread(new FileImporter(migrator, typeDefStore, typeRegistry, storeInitializer, fileName, indexer));
}
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.migration;
import org.apache.atlas.annotation.AtlasService;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.migration.MigrationImportStatus;
import org.apache.atlas.repository.ogm.DataAccess;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.inject.Inject;
@AtlasService
public class DataMigrationStatusService {
private static final Logger LOG = LoggerFactory.getLogger(DataMigrationStatusService.class);
private final DataAccess dataAccess;
private MigrationImportStatus status;
@Inject
public DataMigrationStatusService(DataAccess dataAccess) {
this.dataAccess = dataAccess;
}
public MigrationImportStatus getCreate(MigrationImportStatus status) {
try {
this.status = this.dataAccess.load(status);
this.status.setSize(status.getSize());
this.status.setStartTime(status.getStartTime());
this.status = dataAccess.save(this.status);
} catch (Exception ex) {
LOG.info("DataMigrationStatusService: Setting status: {}...", status.getName());
try {
this.status = dataAccess.save(status);
} catch (AtlasBaseException e) {
LOG.info("DataMigrationStatusService: Error saving status: {}...", status.getName());
}
}
return this.status;
}
public MigrationImportStatus get() {
return this.status;
}
public MigrationImportStatus getByName(String name) throws AtlasBaseException {
MigrationImportStatus status = new MigrationImportStatus(name);
return dataAccess.load(status);
}
public void deleteStatus() throws AtlasBaseException {
if (this.status == null) {
return;
}
MigrationImportStatus status = getByName(this.status.getName());
dataAccess.delete(status.getGuid());
}
public void savePosition(String position) {
this.status.setPosition(position);
try {
this.dataAccess.saveNoLoad(this.status);
} catch (AtlasBaseException e) {
LOG.error("Error saving status: {}", position, e);
}
}
public void setEndTime() {
this.status.setEndTime(System.currentTimeMillis());
try {
this.dataAccess.saveNoLoad(this.status);
} catch (AtlasBaseException e) {
LOG.error("Error saving status: endTime", e);
}
}
public MigrationImportStatus createGet(String fileToImport, int streamSize) {
MigrationImportStatus status = new MigrationImportStatus(fileToImport);
status.setSize(streamSize);
return getCreate(status);
}
}
......@@ -23,7 +23,6 @@ import org.apache.atlas.AtlasException;
import org.apache.atlas.RequestContext;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.impexp.AtlasImportRequest;
import org.apache.atlas.model.migration.MigrationImportStatus;
import org.apache.atlas.repository.impexp.ImportService;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
......@@ -51,23 +50,20 @@ public class ZipFileMigrationImporter implements Runnable {
private final ImportService importService;
private final String fileToImport;
private DataMigrationStatusService dataMigrationStatusService;
public ZipFileMigrationImporter(ImportService importService, String fileName, DataMigrationStatusService dataMigrationStatusService) {
public ZipFileMigrationImporter(ImportService importService, String fileName) {
this.importService = importService;
this.fileToImport = fileName;
this.dataMigrationStatusService = dataMigrationStatusService;
}
@Override
public void run() {
try {
detectFileToImport();
FileWatcher fileWatcher = new FileWatcher(fileToImport);
fileWatcher.start();
int streamSize = getStreamSizeFromComment(fileToImport);
MigrationImportStatus status = dataMigrationStatusService.createGet(fileToImport, streamSize);
performImport(new FileInputStream(new File(fileToImport)), status.getPosition(), streamSize);
dataMigrationStatusService.setEndTime();
performImport(new FileInputStream(new File(fileToImport)), streamSize);
} catch (IOException e) {
LOG.error("Migration Import: IO Error!", e);
} catch (AtlasBaseException e) {
......@@ -75,11 +71,6 @@ public class ZipFileMigrationImporter implements Runnable {
}
}
private void detectFileToImport() throws IOException {
FileWatcher fileWatcher = new FileWatcher(fileToImport);
fileWatcher.start();
}
private int getStreamSizeFromComment(String fileToImport) {
int ret = 1;
try {
......@@ -105,13 +96,13 @@ public class ZipFileMigrationImporter implements Runnable {
return Integer.valueOf(s);
}
private void performImport(InputStream fs, String position, int streamSize) throws AtlasBaseException {
private void performImport(InputStream fs, int streamSize) throws AtlasBaseException {
try {
LOG.info("Migration Import: {}: Position: {}: Starting...", fileToImport, position);
LOG.info("Migration Import: {}: Starting...", fileToImport);
RequestContext.get().setUser(getUserNameFromEnvironment(), null);
importService.run(fs, getImportRequest(streamSize, position),
importService.run(fs, getImportRequest(streamSize),
getUserNameFromEnvironment(),
InetAddress.getLocalHost().getHostName(),
InetAddress.getLocalHost().getHostAddress());
......@@ -121,7 +112,6 @@ public class ZipFileMigrationImporter implements Runnable {
throw new AtlasBaseException(ex);
} finally {
LOG.info("Migration Import: {}: Done!", fileToImport);
dataMigrationStatusService.deleteStatus();
}
}
......@@ -129,19 +119,14 @@ public class ZipFileMigrationImporter implements Runnable {
return System.getProperty(ENV_USER_NAME);
}
private AtlasImportRequest getImportRequest(int streamSize, String position) throws AtlasException {
private AtlasImportRequest getImportRequest(int streamSize) throws AtlasException {
AtlasImportRequest request = new AtlasImportRequest();
request.setSizeOption(streamSize);
request.setOption(AtlasImportRequest.OPTION_KEY_MIGRATION, "true");
request.setOption(AtlasImportRequest.OPTION_KEY_NUM_WORKERS, getPropertyValue(APPLICATION_PROPERTY_MIGRATION_NUMER_OF_WORKERS, DEFAULT_NUMBER_OF_WORKDERS));
request.setOption(AtlasImportRequest.OPTION_KEY_BATCH_SIZE, getPropertyValue(APPLICATION_PROPERTY_MIGRATION_BATCH_SIZE, DEFAULT_BATCH_SIZE));
request.setOption(AtlasImportRequest.START_POSITION_KEY,
(StringUtils.isEmpty(position)
? Integer.toString(MIGRATION_IMPORT_START_POSITION.getInt())
: position)
);
request.setOption(AtlasImportRequest.START_POSITION_KEY, Integer.toString(MIGRATION_IMPORT_START_POSITION.getInt()));
return request;
}
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.ogm;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.migration.MigrationImportStatus;
import org.apache.atlas.repository.Constants;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.springframework.stereotype.Component;
import javax.inject.Inject;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
@Component
public class MigrationImportStatusDTO extends AbstractDataTransferObject<MigrationImportStatus> {
public static final String PROPERTY_NAME = "name";
public static final String PROPERTY_SIZE = "size";
public static final String PROPERTY_POSITION = "position";
public static final String PROPERTY_START_TIME = "startTime";
public static final String PROPERTY_END_TIME = "endTime";
public static final String PROPERTY_ADDITIONAL_INFO = "additionalInfo";
private static final Set<String> ATTRIBUTE_NAMES = new HashSet<>(Arrays.asList(PROPERTY_NAME,
PROPERTY_SIZE, PROPERTY_POSITION,
PROPERTY_START_TIME, PROPERTY_END_TIME,
PROPERTY_ADDITIONAL_INFO));
@Inject
public MigrationImportStatusDTO(AtlasTypeRegistry typeRegistry) {
super(typeRegistry, MigrationImportStatus.class, Constants.INTERNAL_PROPERTY_KEY_PREFIX + MigrationImportStatus.class.getSimpleName());
}
public static Set<String> getAttributes() {
return ATTRIBUTE_NAMES;
}
public static MigrationImportStatus from(String guid, Map<String,Object> attributes) {
MigrationImportStatus entry = new MigrationImportStatus();
entry.setGuid(guid);
entry.setName((String) attributes.get(PROPERTY_NAME));
entry.setSize((int) attributes.get(PROPERTY_SIZE));
entry.setPosition((String) attributes.get(PROPERTY_POSITION));
entry.setStartTime((long) attributes.get(PROPERTY_START_TIME));
entry.setEndTime((long) attributes.get(PROPERTY_END_TIME));
return entry;
}
@Override
public MigrationImportStatus from(AtlasEntity entity) {
return from(entity.getGuid(), entity.getAttributes());
}
@Override
public MigrationImportStatus from(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) {
return from(entityWithExtInfo.getEntity());
}
@Override
public AtlasEntity toEntity(MigrationImportStatus obj) {
AtlasEntity entity = getDefaultAtlasEntity(obj);
entity.setAttribute(PROPERTY_NAME, obj.getName());
entity.setAttribute(PROPERTY_SIZE, obj.getSize());
entity.setAttribute(PROPERTY_POSITION, obj.getPosition());
entity.setAttribute(PROPERTY_START_TIME, obj.getStartTime());
entity.setAttribute(PROPERTY_END_TIME, obj.getEndTime());
return entity;
}
@Override
public AtlasEntity.AtlasEntityWithExtInfo toEntityWithExtInfo(MigrationImportStatus obj) throws AtlasBaseException {
return new AtlasEntity.AtlasEntityWithExtInfo(toEntity(obj));
}
@Override
public Map<String, Object> getUniqueAttributes(final MigrationImportStatus obj) {
return Collections.singletonMap(PROPERTY_NAME, obj.getName());
}
}
......@@ -26,8 +26,8 @@ import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.repository.graph.AtlasGraphProvider;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.migration.DataMigrationStatusService;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.repository.store.graph.BulkImporter;
import org.apache.atlas.repository.store.graph.v2.bulkimport.ImportStrategy;
......@@ -53,13 +53,11 @@ public class BulkImporterImpl implements BulkImporter {
private final AtlasEntityStore entityStore;
private AtlasTypeRegistry typeRegistry;
private DataMigrationStatusService dataMigrationStatusService;
@Inject
public BulkImporterImpl(AtlasEntityStore entityStore, AtlasTypeRegistry typeRegistry, DataMigrationStatusService dataMigrationStatusService) {
public BulkImporterImpl(AtlasEntityStore entityStore, AtlasTypeRegistry typeRegistry) {
this.entityStore = entityStore;
this.typeRegistry = typeRegistry;
this.dataMigrationStatusService = dataMigrationStatusService;
}
@Override
......@@ -67,7 +65,7 @@ public class BulkImporterImpl implements BulkImporter {
ImportStrategy importStrategy =
(importResult.getRequest().getOptions() != null &&
importResult.getRequest().getOptions().containsKey(AtlasImportRequest.OPTION_KEY_MIGRATION))
? new MigrationImport(new AtlasGraphProvider(), this.typeRegistry, dataMigrationStatusService)
? new MigrationImport(new AtlasGraphProvider(), this.typeRegistry)
: new RegularImport(this.entityStore, this.typeRegistry);
LOG.info("BulkImportImpl: {}", importStrategy.getClass().getSimpleName());
......
......@@ -26,7 +26,6 @@ import org.apache.atlas.repository.converters.AtlasFormatConverters;
import org.apache.atlas.repository.converters.AtlasInstanceConverter;
import org.apache.atlas.repository.graph.AtlasGraphProvider;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.migration.DataMigrationStatusService;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.repository.store.graph.AtlasRelationshipStore;
import org.apache.atlas.repository.store.graph.v1.DeleteHandlerDelegate;
......@@ -45,16 +44,14 @@ public class MigrationImport extends ImportStrategy {
private static final Logger LOG = LoggerFactory.getLogger(MigrationImport.class);
private final AtlasTypeRegistry typeRegistry;
private final DataMigrationStatusService dataMigrationStatusService;
private AtlasGraph atlasGraph;
private EntityGraphRetriever entityGraphRetriever;
private EntityGraphMapper entityGraphMapper;
private AtlasEntityStore entityStore;
public MigrationImport(AtlasGraphProvider atlasGraphProvider, AtlasTypeRegistry typeRegistry, DataMigrationStatusService dataMigrationStatusService) {
public MigrationImport(AtlasGraphProvider atlasGraphProvider, AtlasTypeRegistry typeRegistry) {
this.typeRegistry = typeRegistry;
setupEntityStore(atlasGraphProvider, typeRegistry);
this.dataMigrationStatusService = dataMigrationStatusService;
LOG.info("MigrationImport: Using bulkLoading...");
}
......@@ -70,11 +67,11 @@ public class MigrationImport extends ImportStrategy {
int index = 0;
int streamSize = entityStream.size();
EntityMutationResponse ret = new EntityMutationResponse();
EntityCreationManager creationManager = createEntityCreationManager(atlasGraph, dataMigrationStatusService, importResult, streamSize);
EntityCreationManager creationManager = createEntityCreationManager(atlasGraph, importResult, streamSize);
try {
LOG.info("Migration Import: Size: {}: Starting...", streamSize);
index = creationManager.read(entityStream, importResult.getRequest().getStartPosition());
index = creationManager.read(entityStream);
creationManager.drain();
creationManager.extractResults();
} catch (Exception ex) {
......@@ -87,14 +84,14 @@ public class MigrationImport extends ImportStrategy {
return ret;
}
private EntityCreationManager createEntityCreationManager(AtlasGraph threadedAtlasGraph, DataMigrationStatusService dataMigrationStatusService, AtlasImportResult importResult, int streamSize) {
private EntityCreationManager createEntityCreationManager(AtlasGraph threadedAtlasGraph, AtlasImportResult importResult, int streamSize) {
int batchSize = importResult.getRequest().getOptionKeyBatchSize();
int numWorkers = getNumWorkers(importResult.getRequest().getOptionKeyNumWorkers());
EntityConsumerBuilder consumerBuilder =
new EntityConsumerBuilder(threadedAtlasGraph, entityStore, entityGraphRetriever, typeRegistry, batchSize);
return new EntityCreationManager(consumerBuilder, batchSize, numWorkers, dataMigrationStatusService, importResult, streamSize);
return new EntityCreationManager(consumerBuilder, batchSize, numWorkers, importResult, streamSize);
}
private static int getNumWorkers(int numWorkersFromOptions) {
......
......@@ -22,7 +22,6 @@ import org.apache.atlas.model.impexp.AtlasImportResult;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.pc.WorkItemBuilder;
import org.apache.atlas.pc.WorkItemManager;
import org.apache.atlas.repository.migration.DataMigrationStatusService;
import org.apache.atlas.repository.store.graph.v2.BulkImporterImpl;
import org.apache.atlas.repository.store.graph.v2.EntityImportStream;
import org.apache.commons.lang.StringUtils;
......@@ -31,27 +30,25 @@ import org.slf4j.LoggerFactory;
public class EntityCreationManager<AtlasEntityWithExtInfo> extends WorkItemManager {
private static final Logger LOG = LoggerFactory.getLogger(EntityCreationManager.class);
private static final long STATUS_REPORT_TIMEOUT_DURATION = 5 * 60 * 1000; // 5 min
private static final String WORKER_PREFIX = "migration-import";
private final StatusReporter<String, String> statusReporter;
private final DataMigrationStatusService dataMigrationStatusService;
private final AtlasImportResult importResult;
private final int streamSize;
private final long STATUS_REPORT_TIMEOUT_DURATION = 5 * 60 * 1000; // 5 min
private String currentTypeName;
private float currentPercent;
public EntityCreationManager(WorkItemBuilder builder, int batchSize, int numWorkers, DataMigrationStatusService dataMigrationStatusService, AtlasImportResult importResult, int streamSize) {
public EntityCreationManager(WorkItemBuilder builder, int batchSize, int numWorkers, AtlasImportResult importResult, int streamSize) {
super(builder, WORKER_PREFIX, batchSize, numWorkers, true);
this.dataMigrationStatusService = dataMigrationStatusService;
this.importResult = importResult;
this.streamSize = streamSize;
this.statusReporter = new StatusReporter<>(STATUS_REPORT_TIMEOUT_DURATION);
}
public int read(EntityImportStream entityStream, String startPosition) {
int currentIndex = StringUtils.isEmpty(startPosition) ? 0 : Integer.valueOf(startPosition);
public int read(EntityImportStream entityStream) {
int currentIndex = 0;
AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo;
while ((entityWithExtInfo = entityStream.getNextEntityWithExtInfo()) != null) {
AtlasEntity entity = entityWithExtInfo != null ? entityWithExtInfo.getEntity() : null;
......@@ -106,10 +103,8 @@ public class EntityCreationManager<AtlasEntityWithExtInfo> extends WorkItemManag
return;
}
String currentPosition = split[1];
dataMigrationStatusService.savePosition(currentPosition);
importResult.incrementMeticsCounter(split[0]);
this.currentPercent = updateImportMetrics(split[0], Integer.parseInt(currentPosition), getStreamSize(), getCurrentPercent());
this.currentPercent = updateImportMetrics(split[0], Integer.parseInt(split[1]), getStreamSize(), getCurrentPercent());
}
private static float updateImportMetrics(String typeNameGuid, int currentIndex, int streamSize, float currentPercent) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment