Commit 8ada5d40 authored by nikhilbonte, committed by nixonrodrigues

ATLAS-3324 Incremental export with hive_table for table-level replication.

parent d52369ef
@@ -55,6 +55,9 @@ public class EntitiesExtractor {
if (context.isHiveDBIncrementalSkipLineage()) {
extractors.get(INCREMENTAL_EXTRACT).fullFetch(entity, context);
break;
} else if (context.isHiveTableIncrementalSkipLineage()) {
extractors.get(INCREMENTAL_EXTRACT).connectedFetch(entity, context);
break;
}
case FULL:
...
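The dispatch above now has two incremental skip-lineage branches: the existing hive_db one and the hive_table one added here. A short annotated sketch of that decision follows; the behaviour summaries in the comments are inferred from the rest of this commit, not stated in this hunk:

```java
// Annotated sketch only; the surrounding switch and the break statements are omitted.
if (context.isHiveDBIncrementalSkipLineage()) {
    // Existing DB-level replication: the starting entity is a hive_db.
    extractors.get(INCREMENTAL_EXTRACT).fullFetch(entity, context);
} else if (context.isHiveTableIncrementalSkipLineage()) {
    // New in ATLAS-3324, table-level replication: the starting entity is a hive_table;
    // connectedFetch collects only the table's db, sd and columns changed after the change marker.
    extractors.get(INCREMENTAL_EXTRACT).connectedFetch(entity, context);
}
```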
@@ -36,6 +36,7 @@ import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
import org.apache.atlas.repository.util.UniqueList;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.util.AtlasGremlinQueryProvider;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
@@ -332,6 +333,7 @@ public class ExportService {
static class ExportContext {
private static final int REPORTING_THREASHOLD = 1000;
private static final String ATLAS_TYPE_HIVE_DB = "hive_db";
private static final String ATLAS_TYPE_HIVE_TABLE = "hive_table";
final UniqueList<String> entityCreationOrder = new UniqueList<>();
@@ -353,6 +355,7 @@ public class ExportService {
final long changeMarker;
boolean isSkipConnectedFetch;
private final boolean isHiveDBIncremental;
private final boolean isHiveTableIncremental;
private int progressReportCount = 0;
@@ -364,11 +367,12 @@ public class ExportService {
skipLineage = result.getRequest().getSkipLineageOptionValue();
this.changeMarker = result.getRequest().getChangeTokenFromOptions();
this.isHiveDBIncremental = checkHiveDBIncrementalSkipLineage(result.getRequest());
this.isHiveTableIncremental = checkHiveTableIncrementalSkipLineage(result.getRequest());
this.isSkipConnectedFetch = false;
}
private boolean checkHiveDBIncrementalSkipLineage(AtlasExportRequest request) {
- if(request.getItemsToExport().size() == 0) {
+ if(CollectionUtils.isEmpty(request.getItemsToExport())) {
return false;
}
@@ -377,6 +381,16 @@ public class ExportService {
request.getSkipLineageOptionValue();
}
private boolean checkHiveTableIncrementalSkipLineage(AtlasExportRequest request) {
if(CollectionUtils.isEmpty(request.getItemsToExport())) {
return false;
}
return request.getItemsToExport().get(0).getTypeName().equalsIgnoreCase(ATLAS_TYPE_HIVE_TABLE) &&
request.getFetchTypeOptionValue().equalsIgnoreCase(AtlasExportRequest.FETCH_TYPE_INCREMENTAL) &&
request.getSkipLineageOptionValue();
}
public List<AtlasEntity> getEntitiesWithModifiedTimestamp(AtlasEntityWithExtInfo entityWithExtInfo) {
if(fetchType != ExportFetchType.INCREMENTAL) {
return new ArrayList<>();
@@ -442,6 +456,10 @@ public class ExportService {
return isHiveDBIncremental;
}
public boolean isHiveTableIncrementalSkipLineage() {
return isHiveTableIncremental;
}
public void addToEntityCreationOrder(String guid) {
entityCreationOrder.add(guid);
}
...
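checkHiveTableIncrementalSkipLineage marks the export as table-level incremental only when the first item to export is a hive_table, the fetch type is incremental, and lineage is skipped. Below is a minimal sketch of a request that satisfies all three conditions, modelled on the getExportRequestForHiveTable/getOptionsMap helpers added in the test further down; the helper name and the qualifiedName value are illustrative, not part of the commit:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.atlas.model.impexp.AtlasExportRequest;
import org.apache.atlas.model.instance.AtlasObjectId;

public class TableLevelExportRequestSketch {
    // Hypothetical helper, not part of the commit.
    static AtlasExportRequest tableLevelIncrementalRequest(String tableQualifiedName, long changeMarker) {
        AtlasExportRequest request = new AtlasExportRequest();
        request.setItemsToExport(Collections.singletonList(
                new AtlasObjectId("hive_table", "qualifiedName", tableQualifiedName)));

        Map<String, Object> options = new HashMap<>();
        options.put("fetchType", "incremental");   // AtlasExportRequest.FETCH_TYPE_INCREMENTAL
        options.put("changeMarker", changeMarker); // 0 for the baseline export, then the marker from the previous run
        options.put("skipLineage", true);          // the table-level incremental path requires skipLineage
        request.setOptions(options);
        return request;
    }
}
```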
@@ -48,6 +48,10 @@ public class IncrementalExportEntityProvider implements ExtractStrategy {
private static final String TRANSFORM_CLAUSE = ".project('__guid').by('__guid').dedup().toList()";
private static final String TIMESTAMP_CLAUSE = ".has('__modificationTimestamp', gt(modificationTimestamp))";
private static final String QUERY_TABLE_DB = QUERY_DB + ".out('__hive_table.db')";
private static final String QUERY_TABLE_SD = QUERY_DB + ".out('__hive_table.sd')";
private static final String QUERY_TABLE_COLUMNS = QUERY_DB + ".out('__hive_table.columns')";
private ScriptEngine scriptEngine;
@Inject
@@ -67,7 +71,10 @@ public class IncrementalExportEntityProvider implements ExtractStrategy {
@Override
public void connectedFetch(AtlasEntity entity, ExportService.ExportContext context) {
//starting entity is hive_table
context.guidsToProcess.addAll(fetchGuids(entity.getGuid(), QUERY_TABLE_DB, context.changeMarker));
context.guidsToProcess.addAll(fetchGuids(entity.getGuid(), QUERY_TABLE_SD, context.changeMarker));
context.guidsToProcess.addAll(fetchGuids(entity.getGuid(), QUERY_TABLE_COLUMNS, context.changeMarker));
}
public void populate(String dbEntityGuid, long timeStamp, UniqueList<String> guidsToProcess) {
...
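connectedFetch now starts from a hive_table entity and gathers the GUIDs of its database, storage descriptor and columns that were modified after the change marker. Below is a hedged sketch of how the final Gremlin string is presumably assembled; QUERY_DB is not shown in this hunk, and the clause composition inside fetchGuids is an assumption, not something the diff confirms:

```java
// Assumption: QUERY_DB anchors on the starting vertex, roughly g.V().has('__guid', startGuid).
// Assumption: fetchGuids(startGuid, query, changeMarker) appends the shared clauses before executing.
String columnsQuery = QUERY_TABLE_COLUMNS   // QUERY_DB + ".out('__hive_table.columns')"
                    + TIMESTAMP_CLAUSE      // ".has('__modificationTimestamp', gt(modificationTimestamp))"
                    + TRANSFORM_CLAUSE;     // ".project('__guid').by('__guid').dedup().toList()"

// i.e. something like:
//   g.V().has('__guid', startGuid)
//        .out('__hive_table.columns')
//        .has('__modificationTimestamp', gt(changeMarker))
//        .project('__guid').by('__guid').dedup().toList()
// with the same pattern repeated for '__hive_table.db' and '__hive_table.sd'.
```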
@@ -26,29 +26,37 @@ import org.apache.atlas.TestUtilsV2;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.impexp.AtlasExportRequest;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.repository.store.graph.v2.AtlasEntityStoreV2;
import org.apache.atlas.repository.util.UniqueList;
import org.apache.atlas.store.AtlasTypeDefStore;
import org.apache.atlas.type.AtlasClassificationType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.utils.TestResourceFileUtils;
import org.testng.ITestContext;
import org.testng.SkipException;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
import org.testng.annotations.DataProvider;
import javax.inject.Inject;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.apache.atlas.model.impexp.AtlasExportRequest.FETCH_TYPE_INCREMENTAL_CHANGE_MARKER;
import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.createTypes;
import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.getEntities;
import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.getZipSource;
import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runImportWithNoParameters;
import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runExportWithParameters;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
@Guice(modules = TestModules.TestOnlyModule.class)
public class ExportIncrementalTest extends ExportImportTestBase {
@@ -62,6 +70,9 @@ public class ExportIncrementalTest extends ExportImportTestBase {
ExportService exportService;
@Inject
private ImportService importService;
@Inject
private AtlasEntityStoreV2 entityStore;
private final String EXPORT_REQUEST_INCREMENTAL = "export-incremental";
@@ -69,6 +80,15 @@ public class ExportIncrementalTest extends ExportImportTestBase {
private AtlasClassificationType classificationTypeT1;
private long nextTimestamp;
private static final String EXPORT_INCREMENTAL = "incremental";
private static final String QUALIFIED_NAME_DB = "db_test_1@02052019";
private static final String QUALIFIED_NAME_TABLE_LINEAGE = "db_test_1.test_tbl_ctas_2@02052019";
private static final String GUID_DB = "f0b72ab4-7452-4e42-ac74-2aee7728cce4";
private static final String GUID_TABLE_2 = "8d0b834c-61ce-42d8-8f66-6fa51c36bccb";
private static final String GUID_TABLE_CTAS_2 = "eaec545b-3ac7-4e1b-a497-bd4a2b6434a2";
@BeforeClass
public void setup() throws IOException, AtlasBaseException {
basicSetup(typeDefStore, typeRegistry);
@@ -174,6 +194,36 @@ public class ExportIncrementalTest extends ExportImportTestBase {
assertEquals(creationOrder.size(), zipCreationOrder.size());
}
@DataProvider(name = "hiveDb")
public static Object[][] getData(ITestContext context) throws IOException, AtlasBaseException {
return getZipSource("hive_db_lineage.zip");
}
@Test(dataProvider = "hiveDb")
public void importHiveDb(ZipSource zipSource) throws AtlasBaseException, IOException {
runImportWithNoParameters(importService, zipSource);
}
@Test(dependsOnMethods = "importHiveDb")
public void exportTableInrementalConnected() throws AtlasBaseException {
ZipSource source = runExportWithParameters(exportService, getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_LINEAGE, EXPORT_INCREMENTAL, 0, true));
verifyExpectedEntities(getFileNames(source), GUID_DB, GUID_TABLE_CTAS_2);
nextTimestamp = updateTimesampForNextIncrementalExport(source);
try {
source = runExportWithParameters(exportService, getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_LINEAGE, EXPORT_INCREMENTAL, nextTimestamp, true));
}catch (SkipException e){
}
entityStore.addClassifications(GUID_TABLE_CTAS_2, ImmutableList.of(classificationTypeT1.createDefaultValue()));
source = runExportWithParameters(exportService, getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_LINEAGE, EXPORT_INCREMENTAL, nextTimestamp, true));
verifyExpectedEntities(getFileNames(source), GUID_TABLE_CTAS_2);
}
private AtlasExportRequest getIncrementalRequest(long timestamp) {
try {
AtlasExportRequest request = TestResourceFileUtils.readObjectFromJson(ENTITIES_SUB_DIR, EXPORT_REQUEST_INCREMENTAL, AtlasExportRequest.class);
@@ -193,4 +243,42 @@ public class ExportIncrementalTest extends ExportImportTestBase {
}
}
private AtlasExportRequest getExportRequestForHiveTable(String name, String fetchType, long changeMarker, boolean skipLineage) {
AtlasExportRequest request = new AtlasExportRequest();
List<AtlasObjectId> itemsToExport = new ArrayList<>();
itemsToExport.add(new AtlasObjectId("hive_table", "qualifiedName", name));
request.setItemsToExport(itemsToExport);
request.setOptions(getOptionsMap(fetchType, changeMarker, skipLineage));
return request;
}
private Map<String, Object> getOptionsMap(String fetchType, long changeMarker, boolean skipLineage){
Map<String, Object> optionsMap = new HashMap<>();
optionsMap.put("fetchType", fetchType.isEmpty() ? "full" : fetchType );
optionsMap.put( "changeMarker", changeMarker);
optionsMap.put("skipLineage", skipLineage);
return optionsMap;
}
private void verifyExpectedEntities(List<String> fileNames, String... guids){
assertEquals(fileNames.size(), guids.length);
for (String guid : guids) {
assertTrue(fileNames.contains(guid.toLowerCase()));
}
}
private List<String> getFileNames(ZipSource zipSource){
List<String> ret = new ArrayList<>();
assertTrue(zipSource.hasNext());
while (zipSource.hasNext()){
AtlasEntity atlasEntity = zipSource.next();
assertNotNull(atlasEntity);
ret.add(atlasEntity.getGuid());
}
return ret;
}
}