From f5e0d528be7129c02e19cfdccf8afa6c6520a918 Mon Sep 17 00:00:00 2001 From: ashutoshm <amestry@hortonworks.com> Date: Mon, 5 Jun 2017 21:05:32 -0700 Subject: [PATCH] ATLAS-1851: update import API to support resume from a specific entity/position Signed-off-by: Madhan Neethiraj <madhan@apache.org> --- intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java | 21 +++++++++++++++++++++ repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java | 10 ++++++++++ repository/src/main/java/org/apache/atlas/repository/impexp/ZipSource.java | 38 ++++++++++++++++++++++++++++++++++++++ repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java | 54 +++++++++++++++++++++++++++++++++++++++++------------- repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStreamForImport.java | 22 ++++++++++++++++++++++ repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityImportStream.java | 6 ++++++ repository/src/test/java/org/apache/atlas/repository/impexp/ZipSourceTest.java | 31 ++++++++++++++++++++++++++++++- repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1BulkImportPercentTest.java | 168 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 8 files changed, 336 insertions(+), 14 deletions(-) create mode 100644 repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1BulkImportPercentTest.java diff --git a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java index 4f2c1fb..b19f709 100644 --- a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java +++ b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java @@ -19,6 +19,7 @@ package org.apache.atlas.model.impexp; import org.apache.atlas.model.typedef.AtlasBaseTypeDef; import org.codehaus.jackson.annotate.JsonAutoDetect; +import org.codehaus.jackson.annotate.JsonIgnore; import org.codehaus.jackson.annotate.JsonIgnoreProperties; import org.codehaus.jackson.map.annotate.JsonSerialize; @@ -41,6 +42,8 @@ import static org.codehaus.jackson.annotate.JsonAutoDetect.Visibility.PUBLIC_ONL public class AtlasImportRequest implements Serializable { private static final long serialVersionUID = 1L; public static final String TRANSFORMS_KEY = "transforms"; + private static final String START_POSITION_KEY = "startPosition"; + private static final String START_GUID_KEY = "startGuid"; private Map<String, String> options; @@ -70,4 +73,22 @@ public class AtlasImportRequest implements Serializable { public String toString() { return toString(new StringBuilder()).toString(); } + + @JsonIgnore + public String getStartGuid() { + if (this.options == null || !this.options.containsKey(START_GUID_KEY)) { + return null; + } + + return (String) this.options.get(START_GUID_KEY); + } + + @JsonIgnore + public String getStartPosition() { + if (this.options == null || !this.options.containsKey(START_POSITION_KEY)) { + return null; + } + + return (String) this.options.get(START_GUID_KEY); + } } diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java index 8a7e358..4ffbb88 100644 --- a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java +++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java @@ -71,8 +71,10 @@ public class ImportService { source.setImportTransform(ImportTransforms.fromJson(transforms)); startTimestamp = System.currentTimeMillis(); processTypes(source.getTypesDef(), result); + setStartPosition(request, source); processEntities(source, result); + result.setOperationStatus(AtlasImportResult.OperationStatus.SUCCESS); } catch (AtlasBaseException excp) { LOG.error("import(user={}, from={}): failed", userName, requestingIP, excp); @@ -90,6 +92,14 @@ public class ImportService { return result; } + private void setStartPosition(AtlasImportRequest request, ZipSource source) throws AtlasBaseException { + if(request.getStartGuid() != null) { + source.setPositionUsingEntityGuid(request.getStartGuid()); + } else if(request.getStartPosition() != null) { + source.setPosition(Integer.parseInt(request.getStartPosition())); + } + } + public AtlasImportResult run(AtlasImportRequest request, String userName, String hostName, String requestingIP) throws AtlasBaseException { String fileName = (String) request.getOptions().get("FILENAME"); diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSource.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSource.java index 76451c9..aa1477f 100644 --- a/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSource.java +++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSource.java @@ -23,6 +23,7 @@ import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo; import org.apache.atlas.model.typedef.AtlasTypesDef; import org.apache.atlas.repository.store.graph.v1.EntityImportStream; +import org.apache.commons.lang.StringUtils; import org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.type.TypeReference; import org.slf4j.Logger; @@ -49,6 +50,7 @@ public class ZipSource implements EntityImportStream { private Iterator<String> iterator; private Map<String, String> guidEntityJsonMap; private ImportTransforms importTransform; + private int currentPosition; public ZipSource(InputStream inputStream) throws IOException { this(inputStream, null); @@ -190,6 +192,7 @@ public class ZipSource implements EntityImportStream { @Override public AtlasEntityWithExtInfo getNextEntityWithExtInfo() { try { + currentPosition++; return getEntityWithExtInfo(this.iterator.next()); } catch (AtlasBaseException e) { e.printStackTrace(); @@ -227,8 +230,43 @@ public class ZipSource implements EntityImportStream { return null; } + public int size() { + return this.creationOrder.size(); + } + @Override public void onImportComplete(String guid) { guidEntityJsonMap.remove(guid); } + + + @Override + public void setPosition(int index) { + currentPosition = index; + reset(); + for (int i = 0; i < creationOrder.size() && i <= index; i++) { + iterator.next(); + } + } + + @Override + public void setPositionUsingEntityGuid(String guid) { + if(StringUtils.isBlank(guid)) { + return; + } + + int index = creationOrder.indexOf(guid); + if (index == -1) { + return; + } + + setPosition(index); + } + + @Override + public int getPosition() { + return currentPosition; + } + + } diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java index 27c0b5d..75e9132 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java @@ -160,7 +160,8 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore { ret.setGuidAssignments(new HashMap<String, String>()); Set<String> processedGuids = new HashSet<>(); - int progressReportedAtCount = 0; + int streamSize = entityStream.size(); + float currentPercent = 0f; while (entityStream.hasNext()) { AtlasEntityWithExtInfo entityWithExtInfo = entityStream.getNextEntityWithExtInfo(); @@ -173,16 +174,8 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore { AtlasEntityStreamForImport oneEntityStream = new AtlasEntityStreamForImport(entityWithExtInfo, entityStream); EntityMutationResponse resp = createOrUpdate(oneEntityStream, false, true); - - updateImportMetrics("entity:%s:created", resp.getCreatedEntities(), processedGuids, importResult); - updateImportMetrics("entity:%s:updated", resp.getUpdatedEntities(), processedGuids, importResult); - updateImportMetrics("entity:%s:deleted", resp.getDeletedEntities(), processedGuids, importResult); - - if ((processedGuids.size() - progressReportedAtCount) > 1000) { - progressReportedAtCount = processedGuids.size(); - - LOG.info("bulkImport(): in progress.. number of entities imported: {}", progressReportedAtCount); - } + currentPercent = updateImportMetrics(entityWithExtInfo, resp, importResult, processedGuids, + entityStream.getPosition(), streamSize, currentPercent); if (resp.getGuidAssignments() != null) { ret.getGuidAssignments().putAll(resp.getGuidAssignments()); @@ -192,12 +185,47 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore { } importResult.getProcessedEntities().addAll(processedGuids); - LOG.info("bulkImport(): done. Number of entities imported: {}", processedGuids.size()); + LOG.info("bulkImport(): done. Total number of entities (including referred entities) imported: {}", processedGuids.size()); return ret; } - private void updateImportMetrics(String prefix, List<AtlasEntityHeader> list, Set<String> processedGuids, AtlasImportResult importResult) { + private float updateImportMetrics(AtlasEntityWithExtInfo currentEntity, + EntityMutationResponse resp, + AtlasImportResult importResult, + Set<String> processedGuids, + int currentIndex, int streamSize, float currentPercent) { + + updateImportMetrics("entity:%s:created", resp.getCreatedEntities(), processedGuids, importResult); + updateImportMetrics("entity:%s:updated", resp.getUpdatedEntities(), processedGuids, importResult); + updateImportMetrics("entity:%s:deleted", resp.getDeletedEntities(), processedGuids, importResult); + + String lastEntityImported = String.format("entity:last-imported:%s:[%s]:(%s)", + currentEntity.getEntity().getTypeName(), + currentIndex, + currentEntity.getEntity().getGuid()); + + return updateImportProgress(LOG, currentIndex + 1, streamSize, currentPercent, lastEntityImported); + } + + private static float updateImportProgress(Logger log, int currentIndex, int streamSize, float currentPercent, + String additionalInfo) { + final double tolerance = 0.000001; + final int MAX_PERCENT = 100; + + float percent = (float) ((currentIndex * MAX_PERCENT)/streamSize); + boolean updateLog = Double.compare(percent, currentPercent) > tolerance; + float updatedPercent = (MAX_PERCENT < streamSize) ? percent : + ((updateLog) ? ++currentPercent : currentPercent); + + if (updateLog) { + log.info("bulkImport(): progress: {}% (of {}) - {}", (int) Math.ceil(percent), streamSize, additionalInfo); + } + + return updatedPercent; + } + + private static void updateImportMetrics(String prefix, List<AtlasEntityHeader> list, Set<String> processedGuids, AtlasImportResult importResult) { if (list == null) { return; } diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStreamForImport.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStreamForImport.java index 69140e6..90ae15d 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStreamForImport.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStreamForImport.java @@ -21,12 +21,15 @@ import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo; public class AtlasEntityStreamForImport extends AtlasEntityStream implements EntityImportStream { + private int currentPosition = 0; + public AtlasEntityStreamForImport(AtlasEntityWithExtInfo entityWithExtInfo, EntityStream entityStream) { super(entityWithExtInfo, entityStream); } @Override public AtlasEntityWithExtInfo getNextEntityWithExtInfo() { + currentPosition++; AtlasEntity entity = next(); return entity != null ? new AtlasEntityWithExtInfo(entity, super.entitiesWithExtInfo) : null; @@ -44,6 +47,25 @@ public class AtlasEntityStreamForImport extends AtlasEntityStream implements Ent } @Override + public int size() { + return 1; + } + + @Override + public void setPosition(int position) { + // not applicable for a single entity stream + } + + @Override + public int getPosition() { + return currentPosition; + } + + @Override + public void setPositionUsingEntityGuid(String guid) { + } + + @Override public void onImportComplete(String guid) { } diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityImportStream.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityImportStream.java index 0f711db..d4b6c55 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityImportStream.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityImportStream.java @@ -22,6 +22,12 @@ import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo; public interface EntityImportStream extends EntityStream { + int size(); + void setPosition(int position); + int getPosition(); + + void setPositionUsingEntityGuid(String guid); + AtlasEntityWithExtInfo getNextEntityWithExtInfo(); void onImportComplete(String guid); diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ZipSourceTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ZipSourceTest.java index be9c20b..8a57dcd 100644 --- a/repository/src/test/java/org/apache/atlas/repository/impexp/ZipSourceTest.java +++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ZipSourceTest.java @@ -21,6 +21,7 @@ import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.typedef.AtlasTypesDef; import org.testng.Assert; +import org.testng.ITestContext; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -29,7 +30,9 @@ import java.io.FileInputStream; import java.io.IOException; import java.util.List; +import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.getZipSource; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; import static org.testng.AssertJUnit.assertTrue; @@ -41,6 +44,11 @@ public class ZipSourceTest { return new Object[][] {{ new ZipSource(fs) }}; } + @DataProvider(name = "sales") + public static Object[][] getDataFromQuickStart_v1_Sales(ITestContext context) throws IOException { + return getZipSource("sales-v1-full.zip"); + } + @Test public void improperInit_ReturnsNullCreationOrder() throws IOException, AtlasBaseException { byte bytes[] = new byte[10]; @@ -108,7 +116,27 @@ public class ZipSourceTest { assertEquals(e.getGuid(), creationOrder.get(i)); } - Assert.assertFalse(zipSource.hasNext()); + assertFalse(zipSource.hasNext()); + } + + @Test(dataProvider = "sales") + public void iteratorSetPositionBehavor(ZipSource zipSource) throws IOException, AtlasBaseException { + Assert.assertTrue(zipSource.hasNext()); + + List<String> creationOrder = zipSource.getCreationOrder(); + int moveToPosition_2 = 2; + zipSource.setPosition(moveToPosition_2); + + assertEquals(zipSource.getPosition(), moveToPosition_2); + assertTrue(zipSource.getPosition() < creationOrder.size()); + + assertTrue(zipSource.hasNext()); + for (int i = 1; i < 4; i++) { + zipSource.next(); + assertEquals(zipSource.getPosition(), moveToPosition_2 + i); + } + + assertTrue(zipSource.hasNext()); } @Test(dataProvider = "zipFileStocks") @@ -123,6 +151,7 @@ public class ZipSourceTest { if(e.getTypeName().equals("hive_db")) { Object o = e.getAttribute("qualifiedName"); String s = (String) o; + assertNotNull(e); assertTrue(s.contains("@cl2")); break; diff --git a/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1BulkImportPercentTest.java b/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1BulkImportPercentTest.java new file mode 100644 index 0000000..10becc1 --- /dev/null +++ b/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1BulkImportPercentTest.java @@ -0,0 +1,168 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.repository.store.graph.v1; + +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.powermock.reflect.Whitebox; +import org.slf4j.Logger; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.util.ArrayList; +import java.util.List; + +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; + +public class AtlasEntityStoreV1BulkImportPercentTest { + + private final int MAX_PERCENT = 100; + private List<Integer> percentHolder; + private Logger log; + + public void setupPercentHolder(int max) { + percentHolder = new ArrayList<>(); + } + + @BeforeClass + void mockLog() { + log = mock(Logger.class); + + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocationOnMock) throws Throwable { + Object[] args = invocationOnMock.getArguments(); + Integer d = (Integer) args[1]; + percentHolder.add(d.intValue()); + return null; + } + }).when(log).info(anyString(), anyFloat(), anyInt(), anyString()); + } + + @Test + public void percentTest_Equal4() throws Exception { + runWithSize(4); + assertEqualsForPercentHolder(25.0, 50.0, 75.0, 100.0); + } + + @Test + public void percentTest_Equal10() throws Exception { + runWithSize(10); + + assertEqualsForPercentHolder(10.0, 20.0, 30.0, 40.0, 50, 60, 70, 80, 90, 100); + } + + private void assertEqualsForPercentHolder(double... expected) { + assertEquals(percentHolder.size(), expected.length); + Object actual[] = percentHolder.toArray(); + for (int i = 0; i < expected.length; i++) { + assertTrue((int) Double.compare((int) actual[i], expected[i]) == 0); + } + } + + @Test + public void bulkImportPercentageTestLessThan100() throws Exception { + int streamSize = 20; + + runWithSize(streamSize); + assertEqualsForPercentHolder(5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55, 60, 65, 70, 75, 80, 85, 90, 95, 100); + } + + @Test + public void percentTest_Equal101() throws Exception { + int streamSize = 101; + + double[] expected = fillPercentHolderWith100(); + + runWithSize(streamSize); + assertEqualsForPercentHolder(expected); + } + + @Test + public void percentTest_Equal200() throws Exception { + int streamSize = 200; + + double[] expected = fillPercentHolderWith100(); + + runWithSize(streamSize); + assertEqualsForPercentHolder(expected); + } + + @Test + public void percentTest_Equal202() throws Exception { + int streamSize = 202; + + double[] expected = fillPercentHolderWith100(); + + runWithSize(streamSize); + assertEqualsForPercentHolder(expected); + } + + @Test + public void percentTest_Equal1001() throws Exception { + int streamSize = 1001; + double[] expected = fillPercentHolderWith100(); + + runWithSize(streamSize); + assertEqualsForPercentHolder(expected); + } + + @Test + public void percentTest_Equal4323() throws Exception { + int streamSize = 4323; + + double[] expected = fillPercentHolderWith100(); + runWithSize(streamSize); + assertEqualsForPercentHolder(expected); + } + + @Test + public void percentTest_Equal269() throws Exception { + int streamSize = 269; + + double[] expected = fillPercentHolderWith100(); + runWithSize(streamSize); + assertEqualsForPercentHolder(expected); + } + + private void runWithSize(int streamSize) throws Exception { + float currentPercent = 0; + setupPercentHolder(streamSize); + for (int currentIndex = 0; currentIndex < streamSize; currentIndex++) { + currentPercent = invokeBulkImportProgress(currentIndex + 1, streamSize, currentPercent); + } + } + + private float invokeBulkImportProgress(int currentIndex, int streamSize, float currentPercent) throws Exception { + return Whitebox.invokeMethod(AtlasEntityStoreV1.class, "updateImportProgress", log, currentIndex, streamSize, currentPercent, "additional info"); + } + + private double[] fillPercentHolderWith100() { + double start = 1; + double expected[] = new double[MAX_PERCENT]; + for (int i = 0; i < expected.length; i++) { + expected[i] = start; + start ++; + } + return expected; + } +} -- libgit2 0.27.1