Commit a2397c15 by Ashutosh Mestry

ATLAS-2738: Export Process: Support for incremental export.

parent b37154f8
......@@ -52,6 +52,8 @@ public class AtlasExportRequest implements Serializable {
public static final String OPTION_KEY_REPLICATED_TO = "replicatedTo";
public static final String FETCH_TYPE_FULL = "full";
public static final String FETCH_TYPE_CONNECTED = "connected";
public static final String FETCH_TYPE_INCREMENTAL = "incremental";
public static final String FETCH_TYPE_INCREMENTAL_FROM_TIME = "fromTime";
public static final String MATCH_TYPE_STARTS_WITH = "startsWith";
public static final String MATCH_TYPE_ENDS_WITH = "endsWith";
public static final String MATCH_TYPE_CONTAINS = "contains";
......
......@@ -62,7 +62,7 @@ public class AtlasExportResult implements Serializable {
private AtlasExportData data;
private OperationStatus operationStatus;
private String sourceClusterName;
private long lastModifiedTimestamp;
public AtlasExportResult() {
this(null, null, null, null, System.currentTimeMillis());
......@@ -136,6 +136,14 @@ public class AtlasExportResult implements Serializable {
this.data = data;
}
public void setLastModifiedTimestamp(long lastModifiedTimestamp) {
this.lastModifiedTimestamp = lastModifiedTimestamp;
}
public long getLastModifiedTimestamp() {
return this.lastModifiedTimestamp;
}
public OperationStatus getOperationStatus() {
return operationStatus;
}
......@@ -175,6 +183,7 @@ public class AtlasExportResult implements Serializable {
sb.append(", userName='").append(userName).append("'");
sb.append(", clientIpAddress='").append(clientIpAddress).append("'");
sb.append(", hostName='").append(hostName).append("'");
sb.append(", lastModifiedTimestamp='").append(lastModifiedTimestamp).append("'");
sb.append(", sourceCluster='").append(sourceClusterName).append("'");
sb.append(", timeStamp='").append(timeStamp).append("'");
sb.append(", metrics={");
......
......@@ -129,6 +129,7 @@ public class ExportService {
clearContextData(context);
context.result.setOperationStatus(getOverallOperationStatus(statuses));
context.result.incrementMeticsCounter("duration", duration);
context.result.setLastModifiedTimestamp(context.newestLastModifiedTimestamp);
context.sink.setResult(context.result);
}
......@@ -287,7 +288,8 @@ public class ExportService {
}
private void logInfoStartingEntitiesFound(AtlasObjectId item, ExportContext context, List<String> ret) {
LOG.info("export(item={}; matchType={}, fetchType={}): found {} entities", item, context.matchType, context.fetchType, ret.size());
LOG.info("export(item={}; matchType={}, fetchType={}): found {} entities: options: {}", item,
context.matchType, context.fetchType, ret.size(), AtlasType.toJson(context.result.getRequest()));
}
private void setupBindingsForTypeName(ExportContext context, String typeName) {
......@@ -336,9 +338,9 @@ public class ExportService {
TraversalDirection direction = context.guidDirection.get(guid);
AtlasEntityWithExtInfo entityWithExtInfo = entityGraphRetriever.toAtlasEntityWithExtInfo(guid);
if(!context.lineageProcessed.contains(guid)) {
context.result.getData().getEntityCreationOrder().add(entityWithExtInfo.getEntity().getGuid());
}
if (!context.lineageProcessed.contains(guid) && context.doesTimestampQualify(entityWithExtInfo.getEntity())) {
context.result.getData().getEntityCreationOrder().add(entityWithExtInfo.getEntity().getGuid());
}
addEntity(entityWithExtInfo, context);
exportTypeProcessor.addTypes(entityWithExtInfo.getEntity(), context);
......@@ -367,6 +369,7 @@ public class ExportService {
getEntityGuidsForConnectedFetch(entity, context, direction);
break;
case INCREMENTAL:
case FULL:
default:
getEntityGuidsForFullFetch(entity, context);
......@@ -479,21 +482,31 @@ public class ExportService {
}
}
private void addEntity(AtlasEntityWithExtInfo entity, ExportContext context) throws AtlasBaseException {
if(context.sink.hasEntity(entity.getEntity().getGuid())) {
private void addEntity(AtlasEntityWithExtInfo entityWithExtInfo, ExportContext context) throws AtlasBaseException {
if(context.sink.hasEntity(entityWithExtInfo.getEntity().getGuid())) {
return;
}
context.sink.add(entity);
if(context.doesTimestampQualify(entityWithExtInfo.getEntity())) {
context.addToSink(entityWithExtInfo);
context.result.incrementMeticsCounter(String.format("entity:%s", entity.getEntity().getTypeName()));
if(entity.getReferredEntities() != null) {
for (AtlasEntity e: entity.getReferredEntities().values()) {
context.result.incrementMeticsCounter(String.format("entity:%s", entityWithExtInfo.getEntity().getTypeName()));
if (entityWithExtInfo.getReferredEntities() != null) {
for (AtlasEntity e : entityWithExtInfo.getReferredEntities().values()) {
context.result.incrementMeticsCounter(String.format("entity:%s", e.getTypeName()));
}
}
context.result.incrementMeticsCounter("entity:withExtInfo");
} else {
List<AtlasEntity> entities = context.getEntitiesWithModifiedTimestamp(entityWithExtInfo);
for (AtlasEntity e : entities) {
context.result.getData().getEntityCreationOrder().add(e.getGuid());
context.addToSink(new AtlasEntityWithExtInfo(e));
context.result.incrementMeticsCounter(String.format("entity:%s", e.getTypeName()));
}
}
context.result.incrementMeticsCounter("entity:withExtInfo");
context.reportProgress();
}
......@@ -636,7 +649,8 @@ public class ExportService {
public enum ExportFetchType {
FULL(FETCH_TYPE_FULL),
CONNECTED(FETCH_TYPE_CONNECTED);
CONNECTED(FETCH_TYPE_CONNECTED),
INCREMENTAL(FETCH_TYPE_INCREMENTAL);
final String str;
ExportFetchType(String s) {
......@@ -655,6 +669,8 @@ public class ExportService {
}
static class ExportContext {
private static final int REPORTING_THREASHOLD = 1000;
final Set<String> guidsProcessed = new HashSet<>();
final UniqueList<String> guidsToProcess = new UniqueList<>();
final UniqueList<String> lineageToProcess = new UniqueList<>();
......@@ -665,13 +681,15 @@ public class ExportService {
final Set<String> structTypes = new HashSet<>();
final Set<String> enumTypes = new HashSet<>();
final AtlasExportResult result;
final ZipSink sink;
private final ZipSink sink;
private final ScriptEngine scriptEngine;
private final Map<String, Object> bindings;
private final ExportFetchType fetchType;
private final String matchType;
private final boolean skipLineage;
private final long lastModifiedTimestampRequested;
private long newestLastModifiedTimestamp;
private int progressReportCount = 0;
......@@ -684,6 +702,8 @@ public class ExportService {
fetchType = getFetchType(result.getRequest());
matchType = getMatchType(result.getRequest());
skipLineage = getOptionSkipLineage(result.getRequest());
this.lastModifiedTimestampRequested = getLastModifiedTimestamp(fetchType, result.getRequest());
this.newestLastModifiedTimestamp = 0;
}
private ExportFetchType getFetchType(AtlasExportRequest request) {
......@@ -715,6 +735,34 @@ public class ExportService {
(boolean) request.getOptions().get(AtlasExportRequest.OPTION_SKIP_LINEAGE);
}
private long getLastModifiedTimestamp(ExportFetchType fetchType, AtlasExportRequest request) {
if(fetchType == ExportFetchType.INCREMENTAL && request.getOptions().containsKey(AtlasExportRequest.FETCH_TYPE_INCREMENTAL_FROM_TIME)) {
return Long.parseLong(request.getOptions().get(AtlasExportRequest.FETCH_TYPE_INCREMENTAL_FROM_TIME).toString());
}
return 0L;
}
public List<AtlasEntity> getEntitiesWithModifiedTimestamp(AtlasEntityWithExtInfo entityWithExtInfo) {
if(fetchType != ExportFetchType.INCREMENTAL) {
return new ArrayList<>();
}
List<AtlasEntity> ret = new ArrayList<>();
if(doesTimestampQualify(entityWithExtInfo.getEntity())) {
ret.add(entityWithExtInfo.getEntity());
return ret;
}
for (AtlasEntity entity : entityWithExtInfo.getReferredEntities().values()) {
if((doesTimestampQualify(entity))) {
ret.add(entity);
}
}
return ret;
}
public void clear() {
guidsToProcess.clear();
guidsProcessed.clear();
......@@ -734,16 +782,40 @@ public class ExportService {
}
public void reportProgress() {
if ((guidsProcessed.size() - progressReportCount) > 1000) {
if ((guidsProcessed.size() - progressReportCount) > REPORTING_THREASHOLD) {
progressReportCount = guidsProcessed.size();
LOG.info("export(): in progress.. number of entities exported: {}", this.guidsProcessed.size());
}
}
public boolean doesTimestampQualify(AtlasEntity entity) {
if(fetchType != ExportFetchType.INCREMENTAL) {
return true;
}
long entityModificationTimestamp = entity.getUpdateTime().getTime();
updateNewestLastModifiedTimestamp(entityModificationTimestamp);
return doesTimestampQualify(entityModificationTimestamp);
}
private void updateNewestLastModifiedTimestamp(long entityModificationTimestamp) {
if(newestLastModifiedTimestamp < entityModificationTimestamp) {
newestLastModifiedTimestamp = entityModificationTimestamp;
}
}
private boolean doesTimestampQualify(long modificationTimestamp) {
return lastModifiedTimestampRequested < modificationTimestamp;
}
public boolean getSkipLineage() {
return skipLineage;
}
public void addToSink(AtlasEntityWithExtInfo entityWithExtInfo) throws AtlasBaseException {
updateNewestLastModifiedTimestamp(entityWithExtInfo.getEntity().getUpdateTime().getTime());
sink.add(entityWithExtInfo);
}
}
}
......@@ -27,12 +27,17 @@ import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.repository.store.graph.v1.DeleteHandlerV1;
import org.apache.atlas.repository.store.graph.v1.SoftDeleteHandlerV1;
import org.apache.atlas.repository.store.graph.v2.AtlasEntityStoreV2;
import org.apache.atlas.store.AtlasTypeDefStore;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.testng.SkipException;
import java.io.IOException;
import java.util.Arrays;
import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.createAtlasEntity;
import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.loadBaseModel;
import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.loadEntity;
import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.loadHiveModel;
import static org.mockito.Mockito.mock;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
......@@ -40,9 +45,20 @@ import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
public class ExportImportTestBase {
protected static final String ENTITIES_SUB_DIR = "stocksDB-Entities";
protected static final String DB_GUID = "1637a33e-6512-447b-ade7-249c8cb5344b";
protected static final String TABLE_GUID = "df122fc3-5555-40f8-a30f-3090b8a622f8";
protected static final String TABLE_TABLE_GUID = "6f3b305a-c459-4ae4-b651-aee0deb0685f";
protected static final String TABLE_VIEW_GUID = "56415119-7cb0-40dd-ace8-1e50efd54991";
protected static final String COLUMN_GUID_HIGH = "f87a5320-1529-4369-8d63-b637ebdf2c1c";
protected DeleteHandlerV1 deleteHandler = mock(SoftDeleteHandlerV1.class);
protected void basicSetup(AtlasTypeDefStore typeDefStore, AtlasTypeRegistry typeRegistry) throws IOException, AtlasBaseException {
loadBaseModel(typeDefStore, typeRegistry);
loadHiveModel(typeDefStore, typeRegistry);
}
protected int createEntities(AtlasEntityStoreV2 entityStore, String subDir, String entityFileNames[]) {
for (String fileName : entityFileNames) {
createAtlasEntity(entityStore, loadEntity(subDir, fileName));
......@@ -70,6 +86,7 @@ public class ExportImportTestBase {
}
assertNotNull(result);
assertNotNull(result.getEntities());
assertTrue(result.getEntities().size() > 0);
}
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.impexp;
import com.google.common.collect.ImmutableList;
import org.apache.atlas.RequestContext;
import org.apache.atlas.TestModules;
import org.apache.atlas.TestUtilsV2;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.impexp.AtlasExportRequest;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.repository.store.graph.v2.AtlasEntityStoreV2;
import org.apache.atlas.store.AtlasTypeDefStore;
import org.apache.atlas.type.AtlasClassificationType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.utils.TestResourceFileUtils;
import org.testng.SkipException;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
import javax.inject.Inject;
import java.io.IOException;
import java.util.Map;
import static org.apache.atlas.model.impexp.AtlasExportRequest.FETCH_TYPE_INCREMENTAL_FROM_TIME;
import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.createTypes;
import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.getEntities;
import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runExportWithParameters;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
@Guice(modules = TestModules.TestOnlyModule.class)
public class ExportIncrementalTest extends ExportImportTestBase {
@Inject
AtlasTypeRegistry typeRegistry;
@Inject
private AtlasTypeDefStore typeDefStore;
@Inject
ExportService exportService;
@Inject
ClusterService clusterService;
@Inject
private AtlasEntityStoreV2 entityStore;
private final String EXPORT_REQUEST_INCREMENTAL = "export-incremental";
private long nextTimestamp;
@BeforeClass
public void setup() throws IOException, AtlasBaseException {
basicSetup(typeDefStore, typeRegistry);
createEntities(entityStore, ENTITIES_SUB_DIR, new String[] { "db", "table-columns"});
final String[] entityGuids = {DB_GUID, TABLE_GUID};
verifyCreatedEntities(entityStore, entityGuids, 2);
}
@BeforeMethod
public void setupTest() {
RequestContext.clear();
RequestContext.get().setUser(TestUtilsV2.TEST_USER, null);
}
@Test
public void atT0_ReturnsAllEntities() throws AtlasBaseException {
final int expectedEntityCount = 2;
AtlasExportRequest request = getIncrementalRequest(0);
ZipSource source = runExportWithParameters(exportService, request);
AtlasEntity.AtlasEntityWithExtInfo entities = getEntities(source, expectedEntityCount);
int count = 0;
for (Map.Entry<String, AtlasEntity> entry : entities.getReferredEntities().entrySet()) {
assertNotNull(entry.getValue());
count++;
}
nextTimestamp = updateTimesampForNextIncrementalExport(source);
assertEquals(count, expectedEntityCount);
}
private long updateTimesampForNextIncrementalExport(ZipSource source) throws AtlasBaseException {
return source.getExportResult().getLastModifiedTimestamp();
}
@Test(dependsOnMethods = "atT0_ReturnsAllEntities")
public void atT1_NewClassificationAttachedToTable_ReturnsChangedTable() throws AtlasBaseException {
final int expectedEntityCount = 1;
AtlasClassificationType ct = createNewClassification();
entityStore.addClassifications(TABLE_GUID, ImmutableList.of(ct.createDefaultValue()));
AtlasExportRequest request = getIncrementalRequest(nextTimestamp);
ZipSource source = runExportWithParameters(exportService, request);
AtlasEntity.AtlasEntityWithExtInfo entities = getEntities(source, expectedEntityCount);
AtlasEntity entity = null;
for (Map.Entry<String, AtlasEntity> entry : entities.getReferredEntities().entrySet()) {
entity = entry.getValue();
assertNotNull(entity);
break;
}
nextTimestamp = updateTimesampForNextIncrementalExport(source);
assertEquals(entity.getGuid(),TABLE_GUID);
}
private AtlasClassificationType createNewClassification() {
createTypes(typeDefStore, ENTITIES_SUB_DIR,"typesDef-new-classification");
return typeRegistry.getClassificationTypeByName("T1");
}
@Test(dependsOnMethods = "atT1_NewClassificationAttachedToTable_ReturnsChangedTable")
public void atT2_NewClassificationAttachedToColumn_ReturnsChangedColumn() throws AtlasBaseException {
final int expectedEntityCount = 1;
AtlasEntity.AtlasEntityWithExtInfo tableEntity = entityStore.getById(TABLE_GUID);
long preExportTableEntityTimestamp = tableEntity.getEntity().getUpdateTime().getTime();
entityStore.addClassifications(COLUMN_GUID_HIGH, ImmutableList.of(typeRegistry.getClassificationTypeByName("T1").createDefaultValue()));
ZipSource source = runExportWithParameters(exportService, getIncrementalRequest(nextTimestamp));
AtlasEntity.AtlasEntityWithExtInfo entities = getEntities(source, expectedEntityCount);
for (Map.Entry<String, AtlasEntity> entry : entities.getReferredEntities().entrySet()) {
AtlasEntity entity = entry.getValue();
assertNotNull(entity.getGuid());
break;
}
long postUpdateTableEntityTimestamp = tableEntity.getEntity().getUpdateTime().getTime();
assertEquals(preExportTableEntityTimestamp, postUpdateTableEntityTimestamp);
nextTimestamp = updateTimesampForNextIncrementalExport(source);
}
@Test(dependsOnMethods = "atT2_NewClassificationAttachedToColumn_ReturnsChangedColumn")
public void exportingWithSameParameters_Succeeds() {
ZipSource source = runExportWithParameters(exportService, getIncrementalRequest(nextTimestamp));
assertNotNull(source);
}
private AtlasExportRequest getIncrementalRequest(long timestamp) {
try {
AtlasExportRequest request = TestResourceFileUtils.readObjectFromJson(ENTITIES_SUB_DIR, EXPORT_REQUEST_INCREMENTAL, AtlasExportRequest.class);
request.getOptions().put(FETCH_TYPE_INCREMENTAL_FROM_TIME, timestamp);
return request;
} catch (IOException e) {
throw new SkipException(String.format("getIncrementalRequest: '%s' could not be laoded.", EXPORT_REQUEST_INCREMENTAL));
}
}
}
......@@ -53,12 +53,6 @@ import static org.testng.AssertJUnit.fail;
@Guice(modules = TestModules.TestOnlyModule.class)
public class ExportSkipLineageTest extends ExportImportTestBase {
private final String ENTITIES_SUB_DIR = "stocksDB-Entities";
private final String DB_GUID = "1637a33e-6512-447b-ade7-249c8cb5344b";
private final String TABLE_GUID = "df122fc3-5555-40f8-a30f-3090b8a622f8";
private final String TABLE_TABLE_GUID = "6f3b305a-c459-4ae4-b651-aee0deb0685f";
private final String TABLE_VIEW_GUID = "56415119-7cb0-40dd-ace8-1e50efd54991";
@Inject
AtlasTypeRegistry typeRegistry;
......
......@@ -257,7 +257,7 @@ public class ZipFileResourceTestUtils {
AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = new AtlasEntity.AtlasEntityWithExtInfo();
try {
int count = 0;
for(String s : source.getCreationOrder()) {
for (String s : source.getCreationOrder()) {
AtlasEntity entity = source.getByGuid(s);
entityWithExtInfo.addReferredEntity(s, entity);
count++;
......
{
"itemsToExport": [
{
"typeName": "hive_db", "uniqueAttributes": { "qualifiedName": "stocks_base@cl1" }
}
],
"options": {
"fetchType": "incremental",
"fromTime": 0
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment