diff --git a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportRequest.java b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportRequest.java index 93be953..fc34847 100644 --- a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportRequest.java +++ b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportRequest.java @@ -52,6 +52,8 @@ public class AtlasExportRequest implements Serializable { public static final String OPTION_KEY_REPLICATED_TO = "replicatedTo"; public static final String FETCH_TYPE_FULL = "full"; public static final String FETCH_TYPE_CONNECTED = "connected"; + public static final String FETCH_TYPE_INCREMENTAL = "incremental"; + public static final String FETCH_TYPE_INCREMENTAL_FROM_TIME = "fromTime"; public static final String MATCH_TYPE_STARTS_WITH = "startsWith"; public static final String MATCH_TYPE_ENDS_WITH = "endsWith"; public static final String MATCH_TYPE_CONTAINS = "contains"; diff --git a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportResult.java b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportResult.java index d12c20f..14a1f65 100644 --- a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportResult.java +++ b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportResult.java @@ -62,7 +62,7 @@ public class AtlasExportResult implements Serializable { private AtlasExportData data; private OperationStatus operationStatus; private String sourceClusterName; - + private long lastModifiedTimestamp; public AtlasExportResult() { this(null, null, null, null, System.currentTimeMillis()); @@ -136,6 +136,14 @@ public class AtlasExportResult implements Serializable { this.data = data; } + public void setLastModifiedTimestamp(long lastModifiedTimestamp) { + this.lastModifiedTimestamp = lastModifiedTimestamp; + } + + public long getLastModifiedTimestamp() { + return this.lastModifiedTimestamp; + } + public OperationStatus getOperationStatus() { return operationStatus; } @@ -175,6 +183,7 @@ public class AtlasExportResult implements Serializable { sb.append(", userName='").append(userName).append("'"); sb.append(", clientIpAddress='").append(clientIpAddress).append("'"); sb.append(", hostName='").append(hostName).append("'"); + sb.append(", lastModifiedTimestamp='").append(lastModifiedTimestamp).append("'"); sb.append(", sourceCluster='").append(sourceClusterName).append("'"); sb.append(", timeStamp='").append(timeStamp).append("'"); sb.append(", metrics={"); diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java index f74b3cd..30dd8c1 100644 --- a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java +++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java @@ -129,6 +129,7 @@ public class ExportService { clearContextData(context); context.result.setOperationStatus(getOverallOperationStatus(statuses)); context.result.incrementMeticsCounter("duration", duration); + context.result.setLastModifiedTimestamp(context.newestLastModifiedTimestamp); context.sink.setResult(context.result); } @@ -287,7 +288,8 @@ public class ExportService { } private void logInfoStartingEntitiesFound(AtlasObjectId item, ExportContext context, List<String> ret) { - LOG.info("export(item={}; matchType={}, fetchType={}): found {} entities", item, context.matchType, context.fetchType, ret.size()); + LOG.info("export(item={}; matchType={}, fetchType={}): found {} entities: options: {}", item, + context.matchType, context.fetchType, ret.size(), AtlasType.toJson(context.result.getRequest())); } private void setupBindingsForTypeName(ExportContext context, String typeName) { @@ -336,9 +338,9 @@ public class ExportService { TraversalDirection direction = context.guidDirection.get(guid); AtlasEntityWithExtInfo entityWithExtInfo = entityGraphRetriever.toAtlasEntityWithExtInfo(guid); - if(!context.lineageProcessed.contains(guid)) { - context.result.getData().getEntityCreationOrder().add(entityWithExtInfo.getEntity().getGuid()); - } + if (!context.lineageProcessed.contains(guid) && context.doesTimestampQualify(entityWithExtInfo.getEntity())) { + context.result.getData().getEntityCreationOrder().add(entityWithExtInfo.getEntity().getGuid()); + } addEntity(entityWithExtInfo, context); exportTypeProcessor.addTypes(entityWithExtInfo.getEntity(), context); @@ -367,6 +369,7 @@ public class ExportService { getEntityGuidsForConnectedFetch(entity, context, direction); break; + case INCREMENTAL: case FULL: default: getEntityGuidsForFullFetch(entity, context); @@ -479,21 +482,31 @@ public class ExportService { } } - private void addEntity(AtlasEntityWithExtInfo entity, ExportContext context) throws AtlasBaseException { - if(context.sink.hasEntity(entity.getEntity().getGuid())) { + private void addEntity(AtlasEntityWithExtInfo entityWithExtInfo, ExportContext context) throws AtlasBaseException { + if(context.sink.hasEntity(entityWithExtInfo.getEntity().getGuid())) { return; } - context.sink.add(entity); + if(context.doesTimestampQualify(entityWithExtInfo.getEntity())) { + context.addToSink(entityWithExtInfo); - context.result.incrementMeticsCounter(String.format("entity:%s", entity.getEntity().getTypeName())); - if(entity.getReferredEntities() != null) { - for (AtlasEntity e: entity.getReferredEntities().values()) { + context.result.incrementMeticsCounter(String.format("entity:%s", entityWithExtInfo.getEntity().getTypeName())); + if (entityWithExtInfo.getReferredEntities() != null) { + for (AtlasEntity e : entityWithExtInfo.getReferredEntities().values()) { + context.result.incrementMeticsCounter(String.format("entity:%s", e.getTypeName())); + } + } + + context.result.incrementMeticsCounter("entity:withExtInfo"); + } else { + List<AtlasEntity> entities = context.getEntitiesWithModifiedTimestamp(entityWithExtInfo); + for (AtlasEntity e : entities) { + context.result.getData().getEntityCreationOrder().add(e.getGuid()); + context.addToSink(new AtlasEntityWithExtInfo(e)); context.result.incrementMeticsCounter(String.format("entity:%s", e.getTypeName())); } } - context.result.incrementMeticsCounter("entity:withExtInfo"); context.reportProgress(); } @@ -636,7 +649,8 @@ public class ExportService { public enum ExportFetchType { FULL(FETCH_TYPE_FULL), - CONNECTED(FETCH_TYPE_CONNECTED); + CONNECTED(FETCH_TYPE_CONNECTED), + INCREMENTAL(FETCH_TYPE_INCREMENTAL); final String str; ExportFetchType(String s) { @@ -655,6 +669,8 @@ public class ExportService { } static class ExportContext { + private static final int REPORTING_THREASHOLD = 1000; + final Set<String> guidsProcessed = new HashSet<>(); final UniqueList<String> guidsToProcess = new UniqueList<>(); final UniqueList<String> lineageToProcess = new UniqueList<>(); @@ -665,13 +681,15 @@ public class ExportService { final Set<String> structTypes = new HashSet<>(); final Set<String> enumTypes = new HashSet<>(); final AtlasExportResult result; - final ZipSink sink; + private final ZipSink sink; private final ScriptEngine scriptEngine; private final Map<String, Object> bindings; private final ExportFetchType fetchType; private final String matchType; private final boolean skipLineage; + private final long lastModifiedTimestampRequested; + private long newestLastModifiedTimestamp; private int progressReportCount = 0; @@ -684,6 +702,8 @@ public class ExportService { fetchType = getFetchType(result.getRequest()); matchType = getMatchType(result.getRequest()); skipLineage = getOptionSkipLineage(result.getRequest()); + this.lastModifiedTimestampRequested = getLastModifiedTimestamp(fetchType, result.getRequest()); + this.newestLastModifiedTimestamp = 0; } private ExportFetchType getFetchType(AtlasExportRequest request) { @@ -715,6 +735,34 @@ public class ExportService { (boolean) request.getOptions().get(AtlasExportRequest.OPTION_SKIP_LINEAGE); } + private long getLastModifiedTimestamp(ExportFetchType fetchType, AtlasExportRequest request) { + if(fetchType == ExportFetchType.INCREMENTAL && request.getOptions().containsKey(AtlasExportRequest.FETCH_TYPE_INCREMENTAL_FROM_TIME)) { + return Long.parseLong(request.getOptions().get(AtlasExportRequest.FETCH_TYPE_INCREMENTAL_FROM_TIME).toString()); + } + + return 0L; + } + + public List<AtlasEntity> getEntitiesWithModifiedTimestamp(AtlasEntityWithExtInfo entityWithExtInfo) { + if(fetchType != ExportFetchType.INCREMENTAL) { + return new ArrayList<>(); + } + + List<AtlasEntity> ret = new ArrayList<>(); + if(doesTimestampQualify(entityWithExtInfo.getEntity())) { + ret.add(entityWithExtInfo.getEntity()); + return ret; + } + + for (AtlasEntity entity : entityWithExtInfo.getReferredEntities().values()) { + if((doesTimestampQualify(entity))) { + ret.add(entity); + } + } + + return ret; + } + public void clear() { guidsToProcess.clear(); guidsProcessed.clear(); @@ -734,16 +782,40 @@ public class ExportService { } public void reportProgress() { - - if ((guidsProcessed.size() - progressReportCount) > 1000) { + if ((guidsProcessed.size() - progressReportCount) > REPORTING_THREASHOLD) { progressReportCount = guidsProcessed.size(); LOG.info("export(): in progress.. number of entities exported: {}", this.guidsProcessed.size()); } } + public boolean doesTimestampQualify(AtlasEntity entity) { + if(fetchType != ExportFetchType.INCREMENTAL) { + return true; + } + + long entityModificationTimestamp = entity.getUpdateTime().getTime(); + updateNewestLastModifiedTimestamp(entityModificationTimestamp); + return doesTimestampQualify(entityModificationTimestamp); + } + + private void updateNewestLastModifiedTimestamp(long entityModificationTimestamp) { + if(newestLastModifiedTimestamp < entityModificationTimestamp) { + newestLastModifiedTimestamp = entityModificationTimestamp; + } + } + + private boolean doesTimestampQualify(long modificationTimestamp) { + return lastModifiedTimestampRequested < modificationTimestamp; + } + public boolean getSkipLineage() { return skipLineage; } + + public void addToSink(AtlasEntityWithExtInfo entityWithExtInfo) throws AtlasBaseException { + updateNewestLastModifiedTimestamp(entityWithExtInfo.getEntity().getUpdateTime().getTime()); + sink.add(entityWithExtInfo); + } } } diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ExportImportTestBase.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ExportImportTestBase.java index 36966f0..79fd308 100644 --- a/repository/src/test/java/org/apache/atlas/repository/impexp/ExportImportTestBase.java +++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ExportImportTestBase.java @@ -27,12 +27,17 @@ import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.repository.store.graph.v1.DeleteHandlerV1; import org.apache.atlas.repository.store.graph.v1.SoftDeleteHandlerV1; import org.apache.atlas.repository.store.graph.v2.AtlasEntityStoreV2; +import org.apache.atlas.store.AtlasTypeDefStore; +import org.apache.atlas.type.AtlasTypeRegistry; import org.testng.SkipException; +import java.io.IOException; import java.util.Arrays; import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.createAtlasEntity; +import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.loadBaseModel; import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.loadEntity; +import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.loadHiveModel; import static org.mockito.Mockito.mock; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; @@ -40,9 +45,20 @@ import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; public class ExportImportTestBase { + protected static final String ENTITIES_SUB_DIR = "stocksDB-Entities"; + protected static final String DB_GUID = "1637a33e-6512-447b-ade7-249c8cb5344b"; + protected static final String TABLE_GUID = "df122fc3-5555-40f8-a30f-3090b8a622f8"; + protected static final String TABLE_TABLE_GUID = "6f3b305a-c459-4ae4-b651-aee0deb0685f"; + protected static final String TABLE_VIEW_GUID = "56415119-7cb0-40dd-ace8-1e50efd54991"; + protected static final String COLUMN_GUID_HIGH = "f87a5320-1529-4369-8d63-b637ebdf2c1c"; protected DeleteHandlerV1 deleteHandler = mock(SoftDeleteHandlerV1.class); + protected void basicSetup(AtlasTypeDefStore typeDefStore, AtlasTypeRegistry typeRegistry) throws IOException, AtlasBaseException { + loadBaseModel(typeDefStore, typeRegistry); + loadHiveModel(typeDefStore, typeRegistry); + } + protected int createEntities(AtlasEntityStoreV2 entityStore, String subDir, String entityFileNames[]) { for (String fileName : entityFileNames) { createAtlasEntity(entityStore, loadEntity(subDir, fileName)); @@ -70,6 +86,7 @@ public class ExportImportTestBase { } assertNotNull(result); + assertNotNull(result.getEntities()); assertTrue(result.getEntities().size() > 0); } diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ExportIncrementalTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ExportIncrementalTest.java new file mode 100644 index 0000000..f86a463 --- /dev/null +++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ExportIncrementalTest.java @@ -0,0 +1,174 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.repository.impexp; + + +import com.google.common.collect.ImmutableList; +import org.apache.atlas.RequestContext; +import org.apache.atlas.TestModules; +import org.apache.atlas.TestUtilsV2; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.impexp.AtlasExportRequest; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.repository.store.graph.v2.AtlasEntityStoreV2; +import org.apache.atlas.store.AtlasTypeDefStore; +import org.apache.atlas.type.AtlasClassificationType; +import org.apache.atlas.type.AtlasTypeRegistry; +import org.apache.atlas.utils.TestResourceFileUtils; +import org.testng.SkipException; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Guice; +import org.testng.annotations.Test; + +import javax.inject.Inject; +import java.io.IOException; +import java.util.Map; + +import static org.apache.atlas.model.impexp.AtlasExportRequest.FETCH_TYPE_INCREMENTAL_FROM_TIME; +import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.createTypes; +import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.getEntities; +import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runExportWithParameters; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; + +@Guice(modules = TestModules.TestOnlyModule.class) +public class ExportIncrementalTest extends ExportImportTestBase { + @Inject + AtlasTypeRegistry typeRegistry; + + @Inject + private AtlasTypeDefStore typeDefStore; + + @Inject + ExportService exportService; + + @Inject + ClusterService clusterService; + + @Inject + private AtlasEntityStoreV2 entityStore; + + private final String EXPORT_REQUEST_INCREMENTAL = "export-incremental"; + private long nextTimestamp; + + @BeforeClass + public void setup() throws IOException, AtlasBaseException { + basicSetup(typeDefStore, typeRegistry); + createEntities(entityStore, ENTITIES_SUB_DIR, new String[] { "db", "table-columns"}); + final String[] entityGuids = {DB_GUID, TABLE_GUID}; + verifyCreatedEntities(entityStore, entityGuids, 2); + } + + @BeforeMethod + public void setupTest() { + RequestContext.clear(); + RequestContext.get().setUser(TestUtilsV2.TEST_USER, null); + } + + @Test + public void atT0_ReturnsAllEntities() throws AtlasBaseException { + final int expectedEntityCount = 2; + + AtlasExportRequest request = getIncrementalRequest(0); + ZipSource source = runExportWithParameters(exportService, request); + AtlasEntity.AtlasEntityWithExtInfo entities = getEntities(source, expectedEntityCount); + + int count = 0; + for (Map.Entry<String, AtlasEntity> entry : entities.getReferredEntities().entrySet()) { + assertNotNull(entry.getValue()); + count++; + } + + nextTimestamp = updateTimesampForNextIncrementalExport(source); + assertEquals(count, expectedEntityCount); + } + + private long updateTimesampForNextIncrementalExport(ZipSource source) throws AtlasBaseException { + return source.getExportResult().getLastModifiedTimestamp(); + } + + @Test(dependsOnMethods = "atT0_ReturnsAllEntities") + public void atT1_NewClassificationAttachedToTable_ReturnsChangedTable() throws AtlasBaseException { + final int expectedEntityCount = 1; + + AtlasClassificationType ct = createNewClassification(); + entityStore.addClassifications(TABLE_GUID, ImmutableList.of(ct.createDefaultValue())); + + AtlasExportRequest request = getIncrementalRequest(nextTimestamp); + ZipSource source = runExportWithParameters(exportService, request); + AtlasEntity.AtlasEntityWithExtInfo entities = getEntities(source, expectedEntityCount); + + AtlasEntity entity = null; + for (Map.Entry<String, AtlasEntity> entry : entities.getReferredEntities().entrySet()) { + entity = entry.getValue(); + assertNotNull(entity); + break; + } + + nextTimestamp = updateTimesampForNextIncrementalExport(source); + assertEquals(entity.getGuid(),TABLE_GUID); + } + + private AtlasClassificationType createNewClassification() { + createTypes(typeDefStore, ENTITIES_SUB_DIR,"typesDef-new-classification"); + return typeRegistry.getClassificationTypeByName("T1"); + } + + @Test(dependsOnMethods = "atT1_NewClassificationAttachedToTable_ReturnsChangedTable") + public void atT2_NewClassificationAttachedToColumn_ReturnsChangedColumn() throws AtlasBaseException { + final int expectedEntityCount = 1; + + AtlasEntity.AtlasEntityWithExtInfo tableEntity = entityStore.getById(TABLE_GUID); + long preExportTableEntityTimestamp = tableEntity.getEntity().getUpdateTime().getTime(); + + entityStore.addClassifications(COLUMN_GUID_HIGH, ImmutableList.of(typeRegistry.getClassificationTypeByName("T1").createDefaultValue())); + + ZipSource source = runExportWithParameters(exportService, getIncrementalRequest(nextTimestamp)); + AtlasEntity.AtlasEntityWithExtInfo entities = getEntities(source, expectedEntityCount); + + for (Map.Entry<String, AtlasEntity> entry : entities.getReferredEntities().entrySet()) { + AtlasEntity entity = entry.getValue(); + assertNotNull(entity.getGuid()); + break; + } + + long postUpdateTableEntityTimestamp = tableEntity.getEntity().getUpdateTime().getTime(); + assertEquals(preExportTableEntityTimestamp, postUpdateTableEntityTimestamp); + nextTimestamp = updateTimesampForNextIncrementalExport(source); + } + + @Test(dependsOnMethods = "atT2_NewClassificationAttachedToColumn_ReturnsChangedColumn") + public void exportingWithSameParameters_Succeeds() { + ZipSource source = runExportWithParameters(exportService, getIncrementalRequest(nextTimestamp)); + + assertNotNull(source); + } + + private AtlasExportRequest getIncrementalRequest(long timestamp) { + try { + AtlasExportRequest request = TestResourceFileUtils.readObjectFromJson(ENTITIES_SUB_DIR, EXPORT_REQUEST_INCREMENTAL, AtlasExportRequest.class); + request.getOptions().put(FETCH_TYPE_INCREMENTAL_FROM_TIME, timestamp); + + return request; + } catch (IOException e) { + throw new SkipException(String.format("getIncrementalRequest: '%s' could not be laoded.", EXPORT_REQUEST_INCREMENTAL)); + } + } +} diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ExportSkipLineageTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ExportSkipLineageTest.java index 96dd64e..3393b82 100644 --- a/repository/src/test/java/org/apache/atlas/repository/impexp/ExportSkipLineageTest.java +++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ExportSkipLineageTest.java @@ -53,12 +53,6 @@ import static org.testng.AssertJUnit.fail; @Guice(modules = TestModules.TestOnlyModule.class) public class ExportSkipLineageTest extends ExportImportTestBase { - private final String ENTITIES_SUB_DIR = "stocksDB-Entities"; - private final String DB_GUID = "1637a33e-6512-447b-ade7-249c8cb5344b"; - private final String TABLE_GUID = "df122fc3-5555-40f8-a30f-3090b8a622f8"; - private final String TABLE_TABLE_GUID = "6f3b305a-c459-4ae4-b651-aee0deb0685f"; - private final String TABLE_VIEW_GUID = "56415119-7cb0-40dd-ace8-1e50efd54991"; - @Inject AtlasTypeRegistry typeRegistry; diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ZipFileResourceTestUtils.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ZipFileResourceTestUtils.java index 38877f7..faf68fe 100644 --- a/repository/src/test/java/org/apache/atlas/repository/impexp/ZipFileResourceTestUtils.java +++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ZipFileResourceTestUtils.java @@ -257,7 +257,7 @@ public class ZipFileResourceTestUtils { AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = new AtlasEntity.AtlasEntityWithExtInfo(); try { int count = 0; - for(String s : source.getCreationOrder()) { + for (String s : source.getCreationOrder()) { AtlasEntity entity = source.getByGuid(s); entityWithExtInfo.addReferredEntity(s, entity); count++; diff --git a/repository/src/test/resources/json/stocksDB-Entities/export-incremental.json b/repository/src/test/resources/json/stocksDB-Entities/export-incremental.json new file mode 100644 index 0000000..c2bc867 --- /dev/null +++ b/repository/src/test/resources/json/stocksDB-Entities/export-incremental.json @@ -0,0 +1,11 @@ +{ + "itemsToExport": [ + { + "typeName": "hive_db", "uniqueAttributes": { "qualifiedName": "stocks_base@cl1" } + } + ], + "options": { + "fetchType": "incremental", + "fromTime": 0 + } +}