Commit f5e0d528 authored by ashutoshm, committed by Madhan Neethiraj

ATLAS-1851: update import API to support resume from a specific entity/position

parent 4eb8bf23
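
A minimal usage sketch (not part of this commit) of the new resume options: it assumes AtlasImportRequest exposes a setOptions(Map) setter alongside the getOptions() accessor used in ImportService, and the GUID value is a placeholder.

import java.util.HashMap;
import java.util.Map;

import org.apache.atlas.model.impexp.AtlasImportRequest;

public class ImportResumeExample {
    public static AtlasImportRequest buildResumeRequest() {
        // Resume either from a known entity GUID or from a zero-based position
        // in the zip's creation order; startGuid takes precedence when both are
        // present (see ImportService.setStartPosition() below).
        Map<String, String> options = new HashMap<>();
        options.put("startGuid", "replace-with-entity-guid"); // placeholder value
        // options.put("startPosition", "42");                // alternative: numeric position

        AtlasImportRequest request = new AtlasImportRequest();
        request.setOptions(options); // assumed setter paired with the getOptions() seen in ImportService
        return request;
    }
}

ImportService.run(...) then calls setStartPosition(request, source), which advances the ZipSource iterator before processEntities(source, result) is invoked.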
......@@ -19,6 +19,7 @@ package org.apache.atlas.model.impexp;
import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
import org.codehaus.jackson.annotate.JsonAutoDetect;
import org.codehaus.jackson.annotate.JsonIgnore;
import org.codehaus.jackson.annotate.JsonIgnoreProperties;
import org.codehaus.jackson.map.annotate.JsonSerialize;
......@@ -41,6 +42,8 @@ import static org.codehaus.jackson.annotate.JsonAutoDetect.Visibility.PUBLIC_ONL
public class AtlasImportRequest implements Serializable {
private static final long serialVersionUID = 1L;
public static final String TRANSFORMS_KEY = "transforms";
private static final String START_POSITION_KEY = "startPosition";
private static final String START_GUID_KEY = "startGuid";
private Map<String, String> options;
......@@ -70,4 +73,22 @@ public class AtlasImportRequest implements Serializable {
public String toString() {
return toString(new StringBuilder()).toString();
}
@JsonIgnore
public String getStartGuid() {
if (this.options == null || !this.options.containsKey(START_GUID_KEY)) {
return null;
}
return (String) this.options.get(START_GUID_KEY);
}
@JsonIgnore
public String getStartPosition() {
if (this.options == null || !this.options.containsKey(START_POSITION_KEY)) {
return null;
}
return (String) this.options.get(START_POSITION_KEY);
}
}
......@@ -71,8 +71,10 @@ public class ImportService {
source.setImportTransform(ImportTransforms.fromJson(transforms));
startTimestamp = System.currentTimeMillis();
processTypes(source.getTypesDef(), result);
setStartPosition(request, source);
processEntities(source, result);
result.setOperationStatus(AtlasImportResult.OperationStatus.SUCCESS);
} catch (AtlasBaseException excp) {
LOG.error("import(user={}, from={}): failed", userName, requestingIP, excp);
......@@ -90,6 +92,14 @@ public class ImportService {
return result;
}
private void setStartPosition(AtlasImportRequest request, ZipSource source) throws AtlasBaseException {
if(request.getStartGuid() != null) {
source.setPositionUsingEntityGuid(request.getStartGuid());
} else if(request.getStartPosition() != null) {
source.setPosition(Integer.parseInt(request.getStartPosition()));
}
}
public AtlasImportResult run(AtlasImportRequest request, String userName, String hostName, String requestingIP)
throws AtlasBaseException {
String fileName = (String) request.getOptions().get("FILENAME");
......
......@@ -23,6 +23,7 @@ import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.apache.atlas.repository.store.graph.v1.EntityImportStream;
import org.apache.commons.lang.StringUtils;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.type.TypeReference;
import org.slf4j.Logger;
......@@ -49,6 +50,7 @@ public class ZipSource implements EntityImportStream {
private Iterator<String> iterator;
private Map<String, String> guidEntityJsonMap;
private ImportTransforms importTransform;
private int currentPosition;
public ZipSource(InputStream inputStream) throws IOException {
this(inputStream, null);
......@@ -190,6 +192,7 @@ public class ZipSource implements EntityImportStream {
@Override
public AtlasEntityWithExtInfo getNextEntityWithExtInfo() {
try {
currentPosition++;
return getEntityWithExtInfo(this.iterator.next());
} catch (AtlasBaseException e) {
e.printStackTrace();
......@@ -227,8 +230,43 @@ public class ZipSource implements EntityImportStream {
return null;
}
public int size() {
return this.creationOrder.size();
}
@Override
public void onImportComplete(String guid) {
guidEntityJsonMap.remove(guid);
}
@Override
public void setPosition(int index) {
currentPosition = index;
reset();
for (int i = 0; i < creationOrder.size() && i <= index; i++) {
iterator.next();
}
}
@Override
public void setPositionUsingEntityGuid(String guid) {
if(StringUtils.isBlank(guid)) {
return;
}
int index = creationOrder.indexOf(guid);
if (index == -1) {
return;
}
setPosition(index);
}
@Override
public int getPosition() {
return currentPosition;
}
}
......@@ -160,7 +160,8 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
ret.setGuidAssignments(new HashMap<String, String>());
Set<String> processedGuids = new HashSet<>();
int progressReportedAtCount = 0;
int streamSize = entityStream.size();
float currentPercent = 0f;
while (entityStream.hasNext()) {
AtlasEntityWithExtInfo entityWithExtInfo = entityStream.getNextEntityWithExtInfo();
......@@ -173,16 +174,8 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
AtlasEntityStreamForImport oneEntityStream = new AtlasEntityStreamForImport(entityWithExtInfo, entityStream);
EntityMutationResponse resp = createOrUpdate(oneEntityStream, false, true);
updateImportMetrics("entity:%s:created", resp.getCreatedEntities(), processedGuids, importResult);
updateImportMetrics("entity:%s:updated", resp.getUpdatedEntities(), processedGuids, importResult);
updateImportMetrics("entity:%s:deleted", resp.getDeletedEntities(), processedGuids, importResult);
if ((processedGuids.size() - progressReportedAtCount) > 1000) {
progressReportedAtCount = processedGuids.size();
LOG.info("bulkImport(): in progress.. number of entities imported: {}", progressReportedAtCount);
}
currentPercent = updateImportMetrics(entityWithExtInfo, resp, importResult, processedGuids,
entityStream.getPosition(), streamSize, currentPercent);
if (resp.getGuidAssignments() != null) {
ret.getGuidAssignments().putAll(resp.getGuidAssignments());
......@@ -192,12 +185,47 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
}
importResult.getProcessedEntities().addAll(processedGuids);
LOG.info("bulkImport(): done. Number of entities imported: {}", processedGuids.size());
LOG.info("bulkImport(): done. Total number of entities (including referred entities) imported: {}", processedGuids.size());
return ret;
}
private void updateImportMetrics(String prefix, List<AtlasEntityHeader> list, Set<String> processedGuids, AtlasImportResult importResult) {
private float updateImportMetrics(AtlasEntityWithExtInfo currentEntity,
EntityMutationResponse resp,
AtlasImportResult importResult,
Set<String> processedGuids,
int currentIndex, int streamSize, float currentPercent) {
updateImportMetrics("entity:%s:created", resp.getCreatedEntities(), processedGuids, importResult);
updateImportMetrics("entity:%s:updated", resp.getUpdatedEntities(), processedGuids, importResult);
updateImportMetrics("entity:%s:deleted", resp.getDeletedEntities(), processedGuids, importResult);
String lastEntityImported = String.format("entity:last-imported:%s:[%s]:(%s)",
currentEntity.getEntity().getTypeName(),
currentIndex,
currentEntity.getEntity().getGuid());
return updateImportProgress(LOG, currentIndex + 1, streamSize, currentPercent, lastEntityImported);
}
private static float updateImportProgress(Logger log, int currentIndex, int streamSize, float currentPercent,
String additionalInfo) {
final double tolerance = 0.000001;
final int MAX_PERCENT = 100;
float percent = (float) ((currentIndex * MAX_PERCENT)/streamSize);
boolean updateLog = Double.compare(percent, currentPercent) > tolerance;
float updatedPercent = (MAX_PERCENT < streamSize) ? percent :
((updateLog) ? ++currentPercent : currentPercent);
if (updateLog) {
log.info("bulkImport(): progress: {}% (of {}) - {}", (int) Math.ceil(percent), streamSize, additionalInfo);
}
return updatedPercent;
}
private static void updateImportMetrics(String prefix, List<AtlasEntityHeader> list, Set<String> processedGuids, AtlasImportResult importResult) {
if (list == null) {
return;
}
......
......@@ -21,12 +21,15 @@ import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
public class AtlasEntityStreamForImport extends AtlasEntityStream implements EntityImportStream {
private int currentPosition = 0;
public AtlasEntityStreamForImport(AtlasEntityWithExtInfo entityWithExtInfo, EntityStream entityStream) {
super(entityWithExtInfo, entityStream);
}
@Override
public AtlasEntityWithExtInfo getNextEntityWithExtInfo() {
currentPosition++;
AtlasEntity entity = next();
return entity != null ? new AtlasEntityWithExtInfo(entity, super.entitiesWithExtInfo) : null;
......@@ -44,6 +47,25 @@ public class AtlasEntityStreamForImport extends AtlasEntityStream implements Ent
}
@Override
public int size() {
return 1;
}
@Override
public void setPosition(int position) {
// not applicable for a single entity stream
}
@Override
public int getPosition() {
return currentPosition;
}
@Override
public void setPositionUsingEntityGuid(String guid) {
}
@Override
public void onImportComplete(String guid) {
}
......
......@@ -22,6 +22,12 @@ import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
public interface EntityImportStream extends EntityStream {
int size();
void setPosition(int position);
int getPosition();
void setPositionUsingEntityGuid(String guid);
AtlasEntityWithExtInfo getNextEntityWithExtInfo();
void onImportComplete(String guid);
......
......@@ -21,6 +21,7 @@ import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.testng.Assert;
import org.testng.ITestContext;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
......@@ -29,7 +30,9 @@ import java.io.FileInputStream;
import java.io.IOException;
import java.util.List;
import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.getZipSource;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.AssertJUnit.assertTrue;
......@@ -41,6 +44,11 @@ public class ZipSourceTest {
return new Object[][] {{ new ZipSource(fs) }};
}
@DataProvider(name = "sales")
public static Object[][] getDataFromQuickStart_v1_Sales(ITestContext context) throws IOException {
return getZipSource("sales-v1-full.zip");
}
@Test
public void improperInit_ReturnsNullCreationOrder() throws IOException, AtlasBaseException {
byte bytes[] = new byte[10];
......@@ -108,7 +116,27 @@ public class ZipSourceTest {
assertEquals(e.getGuid(), creationOrder.get(i));
}
Assert.assertFalse(zipSource.hasNext());
assertFalse(zipSource.hasNext());
}
@Test(dataProvider = "sales")
public void iteratorSetPositionBehavior(ZipSource zipSource) throws IOException, AtlasBaseException {
Assert.assertTrue(zipSource.hasNext());
List<String> creationOrder = zipSource.getCreationOrder();
int moveToPosition_2 = 2;
zipSource.setPosition(moveToPosition_2);
assertEquals(zipSource.getPosition(), moveToPosition_2);
assertTrue(zipSource.getPosition() < creationOrder.size());
assertTrue(zipSource.hasNext());
for (int i = 1; i < 4; i++) {
zipSource.next();
assertEquals(zipSource.getPosition(), moveToPosition_2 + i);
}
assertTrue(zipSource.hasNext());
}
@Test(dataProvider = "zipFileStocks")
......@@ -123,6 +151,7 @@ public class ZipSourceTest {
if(e.getTypeName().equals("hive_db")) {
Object o = e.getAttribute("qualifiedName");
String s = (String) o;
assertNotNull(e);
assertTrue(s.contains("@cl2"));
break;
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.store.graph.v1;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.powermock.reflect.Whitebox;
import org.slf4j.Logger;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import java.util.ArrayList;
import java.util.List;
import static org.mockito.Matchers.*;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
public class AtlasEntityStoreV1BulkImportPercentTest {
private final int MAX_PERCENT = 100;
private List<Integer> percentHolder;
private Logger log;
public void setupPercentHolder(int max) {
percentHolder = new ArrayList<>();
}
@BeforeClass
void mockLog() {
log = mock(Logger.class);
doAnswer(new Answer() {
@Override
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
Object[] args = invocationOnMock.getArguments();
Integer d = (Integer) args[1];
percentHolder.add(d.intValue());
return null;
}
}).when(log).info(anyString(), anyFloat(), anyInt(), anyString());
}
@Test
public void percentTest_Equal4() throws Exception {
runWithSize(4);
assertEqualsForPercentHolder(25.0, 50.0, 75.0, 100.0);
}
@Test
public void percentTest_Equal10() throws Exception {
runWithSize(10);
assertEqualsForPercentHolder(10.0, 20.0, 30.0, 40.0, 50, 60, 70, 80, 90, 100);
}
private void assertEqualsForPercentHolder(double... expected) {
assertEquals(percentHolder.size(), expected.length);
Object actual[] = percentHolder.toArray();
for (int i = 0; i < expected.length; i++) {
assertTrue((int) Double.compare((int) actual[i], expected[i]) == 0);
}
}
@Test
public void bulkImportPercentageTestLessThan100() throws Exception {
int streamSize = 20;
runWithSize(streamSize);
assertEqualsForPercentHolder(5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55, 60, 65, 70, 75, 80, 85, 90, 95, 100);
}
@Test
public void percentTest_Equal101() throws Exception {
int streamSize = 101;
double[] expected = fillPercentHolderWith100();
runWithSize(streamSize);
assertEqualsForPercentHolder(expected);
}
@Test
public void percentTest_Equal200() throws Exception {
int streamSize = 200;
double[] expected = fillPercentHolderWith100();
runWithSize(streamSize);
assertEqualsForPercentHolder(expected);
}
@Test
public void percentTest_Equal202() throws Exception {
int streamSize = 202;
double[] expected = fillPercentHolderWith100();
runWithSize(streamSize);
assertEqualsForPercentHolder(expected);
}
@Test
public void percentTest_Equal1001() throws Exception {
int streamSize = 1001;
double[] expected = fillPercentHolderWith100();
runWithSize(streamSize);
assertEqualsForPercentHolder(expected);
}
@Test
public void percentTest_Equal4323() throws Exception {
int streamSize = 4323;
double[] expected = fillPercentHolderWith100();
runWithSize(streamSize);
assertEqualsForPercentHolder(expected);
}
@Test
public void percentTest_Equal269() throws Exception {
int streamSize = 269;
double[] expected = fillPercentHolderWith100();
runWithSize(streamSize);
assertEqualsForPercentHolder(expected);
}
private void runWithSize(int streamSize) throws Exception {
float currentPercent = 0;
setupPercentHolder(streamSize);
for (int currentIndex = 0; currentIndex < streamSize; currentIndex++) {
currentPercent = invokeBulkImportProgress(currentIndex + 1, streamSize, currentPercent);
}
}
private float invokeBulkImportProgress(int currentIndex, int streamSize, float currentPercent) throws Exception {
return Whitebox.invokeMethod(AtlasEntityStoreV1.class, "updateImportProgress", log, currentIndex, streamSize, currentPercent, "additional info");
}
private double[] fillPercentHolderWith100() {
double start = 1;
double expected[] = new double[MAX_PERCENT];
for (int i = 0; i < expected.length; i++) {
expected[i] = start;
start ++;
}
return expected;
}
}