Commit dff690a0 by Ashutosh Mestry

ATLAS-3674: ZipFileMigrationImporter: Set Shell Entity Creation.

parent 784b606d
...@@ -175,12 +175,8 @@ public class ZipSourceDirect implements EntityImportStream { ...@@ -175,12 +175,8 @@ public class ZipSourceDirect implements EntityImportStream {
@Override @Override
public void setPosition(int index) { public void setPosition(int index) {
try {
for (int i = 0; i < index; i++) { for (int i = 0; i < index; i++) {
moveNextEntry(); moveNext();
}
} catch (IOException e) {
LOG.error("Error setting position: {}. Position may be beyond the stream size.", index);
} }
} }
......
...@@ -99,8 +99,8 @@ public class DataMigrationStatusService { ...@@ -99,8 +99,8 @@ public class DataMigrationStatusService {
this.status = null; this.status = null;
} }
public void savePosition(String position) { public void savePosition(Long position) {
this.status.setCurrentIndex(Long.valueOf(position)); this.status.setCurrentIndex(position);
this.migrationStatusVertexManagement.updateVertexPartial(this.status); this.migrationStatusVertexManagement.updateVertexPartial(this.status);
} }
......
...@@ -90,6 +90,8 @@ public class EntityConsumer extends WorkItemConsumer<AtlasEntity.AtlasEntityWith ...@@ -90,6 +90,8 @@ public class EntityConsumer extends WorkItemConsumer<AtlasEntity.AtlasEntityWith
private void processEntity(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo, long currentCount) { private void processEntity(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo, long currentCount) {
try { try {
RequestContext.get().setImportInProgress(true); RequestContext.get().setImportInProgress(true);
RequestContext.get().setCreateShellEntityForNonExistingReference(true);
AtlasEntityStreamForImport oneEntityStream = new AtlasEntityStreamForImport(entityWithExtInfo, null); AtlasEntityStreamForImport oneEntityStream = new AtlasEntityStreamForImport(entityWithExtInfo, null);
LOG.debug("Processing: {}", currentCount); LOG.debug("Processing: {}", currentCount);
...@@ -163,7 +165,10 @@ public class EntityConsumer extends WorkItemConsumer<AtlasEntity.AtlasEntityWith ...@@ -163,7 +165,10 @@ public class EntityConsumer extends WorkItemConsumer<AtlasEntity.AtlasEntityWith
} }
private void retryProcessEntity(int retryCount) { private void retryProcessEntity(int retryCount) {
if (LOG.isDebugEnabled() || retryCount > 1) {
LOG.info("Replaying: Starting!: Buffer: {}: Retry count: {}", entityBuffer.size(), retryCount); LOG.info("Replaying: Starting!: Buffer: {}: Retry count: {}", entityBuffer.size(), retryCount);
}
for (AtlasEntity.AtlasEntityWithExtInfo e : entityBuffer) { for (AtlasEntity.AtlasEntityWithExtInfo e : entityBuffer) {
processEntity(e, counter.get()); processEntity(e, counter.get());
} }
......
...@@ -35,7 +35,7 @@ public class EntityCreationManager<AtlasEntityWithExtInfo> extends WorkItemManag ...@@ -35,7 +35,7 @@ public class EntityCreationManager<AtlasEntityWithExtInfo> extends WorkItemManag
private static final String WORKER_PREFIX = "migration-import"; private static final String WORKER_PREFIX = "migration-import";
private static final long STATUS_REPORT_TIMEOUT_DURATION = 1 * 60 * 1000; // 5 min private static final long STATUS_REPORT_TIMEOUT_DURATION = 1 * 60 * 1000; // 5 min
private final StatusReporter<String, String> statusReporter; private final StatusReporter<String, Long> statusReporter;
private final AtlasImportResult importResult; private final AtlasImportResult importResult;
private final DataMigrationStatusService dataMigrationStatusService; private final DataMigrationStatusService dataMigrationStatusService;
private String currentTypeName; private String currentTypeName;
...@@ -51,7 +51,7 @@ public class EntityCreationManager<AtlasEntityWithExtInfo> extends WorkItemManag ...@@ -51,7 +51,7 @@ public class EntityCreationManager<AtlasEntityWithExtInfo> extends WorkItemManag
} }
public long read(EntityImportStream entityStream) { public long read(EntityImportStream entityStream) {
long currentIndex = this.dataMigrationStatusService.getStatus().getCurrentIndex(); long currentIndex = entityStream.getPosition();
AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo; AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo;
this.entityImportStream = entityStream; this.entityImportStream = entityStream;
this.dataMigrationStatusService.setStatus("IN_PROGRESS"); this.dataMigrationStatusService.setStatus("IN_PROGRESS");
...@@ -68,6 +68,8 @@ public class EntityCreationManager<AtlasEntityWithExtInfo> extends WorkItemManag ...@@ -68,6 +68,8 @@ public class EntityCreationManager<AtlasEntityWithExtInfo> extends WorkItemManag
break; break;
} }
} }
this.dataMigrationStatusService.setStatus("DONE");
return currentIndex; return currentIndex;
} }
...@@ -83,7 +85,7 @@ public class EntityCreationManager<AtlasEntityWithExtInfo> extends WorkItemManag ...@@ -83,7 +85,7 @@ public class EntityCreationManager<AtlasEntityWithExtInfo> extends WorkItemManag
} }
setCurrentTypeName(typeName); setCurrentTypeName(typeName);
statusReporter.produced(entityWithExtInfo.getEntity().getGuid(), String.format("%s:%s", entityWithExtInfo.getEntity().getTypeName(), currentIndex)); statusReporter.produced(entityWithExtInfo.getEntity().getGuid(), currentIndex);
super.checkProduce(entityWithExtInfo); super.checkProduce(entityWithExtInfo);
extractResults(); extractResults();
} }
...@@ -98,25 +100,19 @@ public class EntityCreationManager<AtlasEntityWithExtInfo> extends WorkItemManag ...@@ -98,25 +100,19 @@ public class EntityCreationManager<AtlasEntityWithExtInfo> extends WorkItemManag
} }
private void logStatus() { private void logStatus() {
String ack = statusReporter.ack(); Long ack = statusReporter.ack();
if (StringUtils.isEmpty(ack)) { if (ack == null) {
return;
}
String[] split = ack.split(":");
if (split.length == 0 || split.length < 2) {
return; return;
} }
importResult.incrementMeticsCounter(split[0]); importResult.incrementMeticsCounter(getCurrentTypeName());
String currentPosition = split[1]; dataMigrationStatusService.savePosition(ack);
dataMigrationStatusService.savePosition(currentPosition); this.currentPercent = updateImportMetrics(getCurrentTypeName(), ack, this.entityImportStream.size(), getCurrentPercent());
this.currentPercent = updateImportMetrics(split[0], Integer.parseInt(currentPosition), this.entityImportStream.size(), getCurrentPercent());
} }
private static float updateImportMetrics(String typeNameGuid, int currentIndex, int streamSize, float currentPercent) { private static float updateImportMetrics(String typeNameGuid, long currentIndex, int streamSize, float currentPercent) {
String lastEntityImported = String.format("entity:last-imported:%s:(%s)", typeNameGuid, currentIndex); String lastEntityImported = String.format("entity:last-imported:%s:(%s)", typeNameGuid, currentIndex);
return BulkImporterImpl.updateImportProgress(LOG, currentIndex, streamSize, currentPercent, lastEntityImported); return BulkImporterImpl.updateImportProgress(LOG, (int) currentIndex, streamSize, currentPercent, lastEntityImported);
} }
private String getCurrentTypeName() { private String getCurrentTypeName() {
......
...@@ -54,7 +54,7 @@ public class DataMigrationStatusServiceTest { ...@@ -54,7 +54,7 @@ public class DataMigrationStatusServiceTest {
assertEquals(ret.getTotalCount(), expected.getTotalCount()); assertEquals(ret.getTotalCount(), expected.getTotalCount());
assertEquals(ret.getCurrentIndex(), expected.getCurrentIndex()); assertEquals(ret.getCurrentIndex(), expected.getCurrentIndex());
dataMigrationStatusService.savePosition("100"); dataMigrationStatusService.savePosition(100l);
assertNotNull(dataMigrationStatusService.getStatus()); assertNotNull(dataMigrationStatusService.getStatus());
assertNotNull(dataMigrationStatusService.getStatus().getCurrentIndex(), "100"); assertNotNull(dataMigrationStatusService.getStatus().getCurrentIndex(), "100");
assertNotNull(dataMigrationStatusService.getCreate(expected).getCurrentIndex(), "100"); assertNotNull(dataMigrationStatusService.getCreate(expected).getCurrentIndex(), "100");
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment