From 1c781deb40c23c79c2cf201b70b79a34b0b2acbe Mon Sep 17 00:00:00 2001 From: nikhilbonte <nikhil.bonte@freestoneinfotech.com> Date: Thu, 29 Aug 2019 12:47:20 +0530 Subject: [PATCH] ATLAS-3416 Import API: delete non-exported hive_table for table level replication Signed-off-by: nixonrodrigues <nixon@apache.org> --- intg/src/main/java/org/apache/atlas/model/impexp/ExportImportAuditEntry.java | 1 + repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java | 22 ++++++++++++++++++++++ repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java | 40 ++++++++++++++++++++++++++++++++++++---- repository/src/main/java/org/apache/atlas/repository/impexp/TableReplicationRequestProcessor.java | 181 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java | 83 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-------- repository/src/test/java/org/apache/atlas/repository/impexp/TableReplicationRequestProcessorTest.java | 146 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 461 insertions(+), 12 deletions(-) create mode 100644 repository/src/main/java/org/apache/atlas/repository/impexp/TableReplicationRequestProcessor.java create mode 100644 repository/src/test/java/org/apache/atlas/repository/impexp/TableReplicationRequestProcessorTest.java diff --git a/intg/src/main/java/org/apache/atlas/model/impexp/ExportImportAuditEntry.java b/intg/src/main/java/org/apache/atlas/model/impexp/ExportImportAuditEntry.java index a199c6e..90f4296 100644 --- a/intg/src/main/java/org/apache/atlas/model/impexp/ExportImportAuditEntry.java +++ b/intg/src/main/java/org/apache/atlas/model/impexp/ExportImportAuditEntry.java @@ -35,6 +35,7 @@ public class ExportImportAuditEntry extends AtlasBaseModelObject implements Seri private static final long serialVersionUID = 1L; public static final String OPERATION_EXPORT = "EXPORT"; public static final String OPERATION_IMPORT = "IMPORT"; + public static final String OPERATION_IMPORT_DELETE_REPL = "IMPORT_DELETE_REPL"; private String userName; private String operation; diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java b/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java index f2d36ed..55990f7 100644 --- a/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java +++ b/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java @@ -45,6 +45,7 @@ import org.springframework.util.CollectionUtils; import javax.inject.Inject; import java.util.Collections; import java.util.List; +import java.util.Set; @Component public class AuditsWriter { @@ -68,6 +69,10 @@ public class AuditsWriter { this.auditService = auditService; } + public AtlasServerService getAtlasServerService() { + return atlasServerService; + } + public void write(String userName, AtlasExportResult result, long startTime, long endTime, List<String> entityCreationOrder) throws AtlasBaseException { @@ -80,6 +85,12 @@ public class AuditsWriter { auditForImport.add(userName, result, startTime, endTime, entityCreationOrder); } + public void write(String userName, String sourceCluster, + long startTime, long endTime, + Set<String> entityCreationOrder) throws AtlasBaseException { + auditForImport.add(userName, sourceCluster, startTime, endTime, entityCreationOrder); + } + private void updateReplicationAttribute(boolean isReplicationSet, String serverName, String serverFullName, List<String> exportedGuids, @@ -238,5 +249,16 @@ public class AuditsWriter { updateReplicationAttribute(replicationOptionState, sourceServerName, sourceServerFullName, entityGuids, Constants.ATTR_NAME_REPLICATED_FROM, result.getExportResult().getChangeMarker()); } + + public void add(String userName, String sourceCluster, long startTime, + long endTime, Set<String> entityGuids) throws AtlasBaseException { + + sourceServerName = getServerNameFromFullName(sourceCluster); + auditService.add(userName, + sourceServerName, getCurrentClusterName(), + ExportImportAuditEntry.OPERATION_IMPORT_DELETE_REPL, + AtlasType.toJson(entityGuids), startTime, endTime, !entityGuids.isEmpty()); + + } } } diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java index df49ae1..27001e3 100644 --- a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java +++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java @@ -24,8 +24,10 @@ import org.apache.atlas.RequestContext; import org.apache.atlas.entitytransform.BaseEntityHandler; import org.apache.atlas.entitytransform.TransformerContext; import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.impexp.AtlasExportRequest; import org.apache.atlas.model.impexp.AtlasImportRequest; import org.apache.atlas.model.impexp.AtlasImportResult; +import org.apache.atlas.model.instance.AtlasObjectId; import org.apache.atlas.model.typedef.AtlasTypesDef; import org.apache.atlas.repository.store.graph.BulkImporter; import org.apache.atlas.repository.store.graph.v2.EntityImportStream; @@ -54,24 +56,28 @@ import static org.apache.atlas.model.impexp.AtlasImportRequest.TRANSFORMS_KEY; public class ImportService { private static final Logger LOG = LoggerFactory.getLogger(ImportService.class); + private static final String ATLAS_TYPE_HIVE_TABLE = "hive_table"; private final AtlasTypeDefStore typeDefStore; private final AtlasTypeRegistry typeRegistry; private final BulkImporter bulkImporter; private final AuditsWriter auditsWriter; private final ImportTransformsShaper importTransformsShaper; + private TableReplicationRequestProcessor tableReplicationRequestProcessor; + private long startTimestamp; private long endTimestamp; @Inject public ImportService(AtlasTypeDefStore typeDefStore, AtlasTypeRegistry typeRegistry, BulkImporter bulkImporter, - AuditsWriter auditsWriter, - ImportTransformsShaper importTransformsShaper) { + AuditsWriter auditsWriter, ImportTransformsShaper importTransformsShaper, + TableReplicationRequestProcessor tableReplicationRequestProcessor) { this.typeDefStore = typeDefStore; this.typeRegistry = typeRegistry; this.bulkImporter = bulkImporter; this.auditsWriter = auditsWriter; this.importTransformsShaper = importTransformsShaper; + this.tableReplicationRequestProcessor = tableReplicationRequestProcessor; } public AtlasImportResult run(InputStream inputStream, String userName, @@ -109,7 +115,11 @@ public class ImportService { startTimestamp = System.currentTimeMillis(); processTypes(source.getTypesDef(), result); setStartPosition(request, source); + processEntities(userName, source, result); + + processReplicationDeletion(source.getExportResult().getRequest(), request); + } catch (AtlasBaseException excp) { LOG.error("import(user={}, from={}): failed", userName, requestingIP, excp); @@ -228,6 +238,12 @@ public class ImportService { auditsWriter.write(userName, result, startTimestamp, endTimestamp, importSource.getCreationOrder()); } + private void processReplicationDeletion(AtlasExportRequest exportRequest, AtlasImportRequest importRequest) throws AtlasBaseException { + if (checkHiveTableIncrementalSkipLineage(importRequest, exportRequest)) { + tableReplicationRequestProcessor.process(exportRequest, importRequest); + } + } + private int getDuration(long endTime, long startTime) { return (int) (endTime - startTime); } @@ -239,9 +255,25 @@ public class ImportService { } return new ZipSourceWithBackingDirectory(inputStream, configuredTemporaryDirectory); - } - catch (IOException ex) { + } catch (IOException ex) { throw new AtlasBaseException(ex); } } + + @VisibleForTesting + boolean checkHiveTableIncrementalSkipLineage(AtlasImportRequest importRequest, AtlasExportRequest exportRequest) { + if (CollectionUtils.isEmpty(exportRequest.getItemsToExport())) { + return false; + } + + for (AtlasObjectId itemToExport : exportRequest.getItemsToExport()) { + if (!itemToExport.getTypeName().equalsIgnoreCase(ATLAS_TYPE_HIVE_TABLE)){ + return false; + } + } + + return importRequest.isReplicationOptionSet() && exportRequest.isReplicationOptionSet() && + exportRequest.getFetchTypeOptionValue().equalsIgnoreCase(AtlasExportRequest.FETCH_TYPE_INCREMENTAL) && + exportRequest.getSkipLineageOptionValue(); + } } diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/TableReplicationRequestProcessor.java b/repository/src/main/java/org/apache/atlas/repository/impexp/TableReplicationRequestProcessor.java new file mode 100644 index 0000000..d5807a5 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/impexp/TableReplicationRequestProcessor.java @@ -0,0 +1,181 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.repository.impexp; + +import org.apache.atlas.authorize.AtlasAuthorizationUtils; +import org.apache.atlas.discovery.AtlasDiscoveryService; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.discovery.AtlasSearchResult; +import org.apache.atlas.model.discovery.SearchParameters; +import org.apache.atlas.model.impexp.AtlasExportRequest; +import org.apache.atlas.model.impexp.AtlasImportRequest; +import org.apache.atlas.model.instance.AtlasEntityHeader; +import org.apache.atlas.model.instance.AtlasObjectId; +import org.apache.atlas.repository.store.graph.AtlasEntityStore; +import org.apache.atlas.type.AtlasTypeRegistry; +import org.apache.commons.collections.CollectionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +import javax.inject.Inject; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.Map; +import java.util.ArrayList; +import java.util.HashSet; + +@Component +public class TableReplicationRequestProcessor { + private static final Logger LOG = LoggerFactory.getLogger(TableReplicationRequestProcessor.class); + + private static final String QUERY_DB_NAME_EQUALS= "where db.name='%s'"; + private static final String ATTR_NAME_KEY = "name"; + private static final String TYPE_HIVE_TABLE = "hive_table"; + private static final String ATTR_QUALIFIED_NAME_KEY = "qualifiedName"; + private static final String REPLICATED_TAG_NAME = "%s_replicated"; + + private long startTstamp; + private long endTstamp; + private AuditsWriter auditsWriter; + private AtlasEntityStore entityStore; + private AtlasTypeRegistry typeRegistry; + private AtlasDiscoveryService discoveryService; + + @Inject + public TableReplicationRequestProcessor(AuditsWriter auditsWriter, AtlasEntityStore entityStore, + AtlasDiscoveryService atlasDiscoveryService, AtlasTypeRegistry typeRegistry) { + this.auditsWriter = auditsWriter; + this.entityStore = entityStore; + this.typeRegistry = typeRegistry; + this.discoveryService = atlasDiscoveryService; + } + + public void process(AtlasExportRequest exportRequest, AtlasImportRequest importRequest) throws AtlasBaseException { + startTstamp = System.currentTimeMillis(); + LOG.info("process: deleting entities with type hive_table which are not imported."); + String sourceCluster = importRequest.getOptionKeyReplicatedFrom(); + + List<String> qualifiedNames = getQualifiedNamesFromRequest(exportRequest); + + List<String> safeGUIDs = getEntitiesFromQualifiedNames(qualifiedNames); + + String dbName = getDbName(safeGUIDs.get(0)); + + Set<String> guidsToDelete = getGuidsToDelete(dbName, safeGUIDs, sourceCluster); + + deleteTables(sourceCluster, guidsToDelete); + } + + private List<String> getQualifiedNamesFromRequest(AtlasExportRequest exportRequest){ + List<String> qualifiedNames = new ArrayList<>(); + + for (AtlasObjectId objectId : exportRequest.getItemsToExport()) { + qualifiedNames.add(objectId.getUniqueAttributes().get(ATTR_QUALIFIED_NAME_KEY).toString()); + } + return qualifiedNames; + } + + private List<String> getEntitiesFromQualifiedNames(List<String> qualifiedNames) throws AtlasBaseException { + + List<String> safeGUIDs = new ArrayList<>(); + for(String qualifiedName : qualifiedNames) { + String guid = getGuidByUniqueAttributes(Collections.singletonMap(ATTR_QUALIFIED_NAME_KEY, qualifiedName)); + safeGUIDs.add(guid); + } + return safeGUIDs; + } + + private String getGuidByUniqueAttributes(Map<String, Object> uniqueAttributes) throws AtlasBaseException { + return entityStore.getGuidByUniqueAttributes(typeRegistry.getEntityTypeByName(TYPE_HIVE_TABLE), uniqueAttributes); + } + + private String getDbName(String tableGuid) throws AtlasBaseException { + String dbGuid = AuditsWriter.ReplKeyGuidFinder.get(typeRegistry, entityStore, tableGuid); + return (String) entityStore.getById(dbGuid).getEntity().getAttribute(ATTR_NAME_KEY); + } + + private Set<String> getGuidsToDelete(String dbName, List<String> excludeGUIDs, String sourceCluster) throws AtlasBaseException { + + SearchParameters parameters = getSearchParameters(dbName, sourceCluster); + Set<String> unsafeGUIDs = new HashSet<>(); + + final int max = 10000; + int fetchedSize = 0; + int i = 0; + parameters.setLimit(max); + + while (fetchedSize == (max * i)) { + if (LOG.isDebugEnabled()) { + LOG.debug("i={}, fetchedSize={}, unsafeGUIDs.size()={}", i, fetchedSize, unsafeGUIDs.size()); + } + + int offset = max * i; + parameters.setOffset(offset); + + AtlasSearchResult searchResult = discoveryService.searchWithParameters(parameters); + + if (CollectionUtils.isEmpty(searchResult.getEntities())) { + break; + } + + for (AtlasEntityHeader entityHeader : searchResult.getEntities()) { + String guid = entityHeader.getGuid(); + if (!excludeGUIDs.contains(guid)) { + unsafeGUIDs.add(guid); + } + } + fetchedSize = searchResult.getEntities().size(); + i++; + } + return unsafeGUIDs; + } + + private SearchParameters getSearchParameters(String dbName, String sourceCluster) { + String query = String.format(QUERY_DB_NAME_EQUALS, dbName); + + SearchParameters parameters = new SearchParameters(); + parameters.setExcludeDeletedEntities(false); + parameters.setTypeName(TYPE_HIVE_TABLE); + parameters.setExcludeDeletedEntities(true); + + parameters.setClassification(String.format(REPLICATED_TAG_NAME, sourceCluster)); + parameters.setAttributes(new HashSet<String>(){{ add(AtlasImportRequest.OPTION_KEY_REPLICATED_FROM); }}); + parameters.setQuery(query); + + return parameters; + } + + private void deleteTables(String sourceCluster, Set<String> guidsToDelete) throws AtlasBaseException { + if (!CollectionUtils.isEmpty(guidsToDelete)) { + entityStore.deleteByIds(new ArrayList<>(guidsToDelete)); + + endTstamp = System.currentTimeMillis(); + createAuditEntry(sourceCluster, guidsToDelete); + } + } + + private void createAuditEntry(String sourceCluster, Set<String> guidsToDelete) throws AtlasBaseException { + auditsWriter.write(AtlasAuthorizationUtils.getCurrentUserName(), sourceCluster, startTstamp, endTstamp, guidsToDelete); + + if (LOG.isDebugEnabled()) { + LOG.debug("Deleted entities => {}", guidsToDelete); + } + } +} diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java index 1bfe62b..95f6ec3 100644 --- a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java +++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java @@ -24,9 +24,11 @@ import org.apache.atlas.TestModules; import org.apache.atlas.TestUtilsV2; import org.apache.atlas.discovery.EntityDiscoveryService; import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.impexp.AtlasExportRequest; import org.apache.atlas.model.impexp.AtlasImportRequest; import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.instance.AtlasEntityHeader; +import org.apache.atlas.model.instance.AtlasObjectId; import org.apache.atlas.model.instance.AtlasRelatedObjectId; import org.apache.atlas.model.instance.EntityMutationResponse; import org.apache.atlas.repository.Constants; @@ -40,10 +42,7 @@ import org.apache.atlas.store.AtlasTypeDefStore; import org.apache.atlas.type.AtlasClassificationType; import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.commons.lang.StringUtils; -import org.apache.tinkerpop.shaded.kryo.io.Input; import org.mockito.stubbing.Answer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.testng.ITestContext; import org.testng.annotations.AfterClass; import org.testng.annotations.AfterTest; @@ -54,6 +53,7 @@ import org.testng.annotations.Test; import java.io.IOException; import java.io.InputStream; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -61,11 +61,17 @@ import java.util.Map; import static org.apache.atlas.graph.GraphSandboxUtil.useLocalSolr; import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.getDefaultImportRequest; import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.getZipSource; +import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.getInputStreamFrom; import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.loadModelFromJson; import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.loadModelFromResourcesJson; import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runAndVerifyQuickStart_v1_Import; import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runImportWithNoParameters; import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runImportWithParameters; +import static org.apache.atlas.model.impexp.AtlasExportRequest.OPTION_FETCH_TYPE; +import static org.apache.atlas.model.impexp.AtlasExportRequest.OPTION_KEY_REPLICATED_TO; +import static org.apache.atlas.model.impexp.AtlasExportRequest.OPTION_SKIP_LINEAGE; +import static org.apache.atlas.model.impexp.AtlasExportRequest.FETCH_TYPE_FULL; +import static org.apache.atlas.model.impexp.AtlasExportRequest.FETCH_TYPE_INCREMENTAL; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; @@ -392,7 +398,7 @@ public class ImportServiceTest extends ExportImportTestBase { @Test public void importServiceProcessesIOException() { - ImportService importService = new ImportService(typeDefStore, typeRegistry, null, null,null); + ImportService importService = new ImportService(typeDefStore, typeRegistry, null,null, null,null); AtlasImportRequest req = mock(AtlasImportRequest.class); Answer<Map> answer = invocationOnMock -> { @@ -447,8 +453,8 @@ public class ImportServiceTest extends ExportImportTestBase { @Test(dataProvider = "salesNewTypeAttrs-next") public void transformUpdatesForSubTypesAddsToExistingTransforms(InputStream inputStream) throws IOException, AtlasBaseException { - loadBaseModel(); - loadHiveModel(); + loadBaseModel(); + loadHiveModel(); String transformJSON = "{ \"Asset\": { \"qualifiedName\":[ \"replace:@cl1:@cl2\" ] }, \"hive_table\": { \"qualifiedName\":[ \"lowercase\" ] } }"; ZipSource zipSource = new ZipSource(inputStream); @@ -461,9 +467,70 @@ public class ImportServiceTest extends ExportImportTestBase { assertEquals(importTransforms.getTransforms().get("hive_table").get("qualifiedName").size(), 2); } - @Test(expectedExceptions = AtlasBaseException.class) public void importEmptyZip() throws IOException, AtlasBaseException { - new ZipSource((InputStream) getZipSource("empty.zip")[0][0]); + new ZipSource(getInputStreamFrom("empty.zip")); + } + + @Test + public void testCheckHiveTableIncrementalSkipLineage() { + AtlasImportRequest importRequest; + AtlasExportRequest exportRequest; + + importRequest = getImportRequest("cl1"); + exportRequest = getExportRequest(FETCH_TYPE_INCREMENTAL, "cl2", true, getItemsToExport("hive_table", "hive_table")); + assertTrue(importService.checkHiveTableIncrementalSkipLineage(importRequest, exportRequest)); + + exportRequest = getExportRequest(FETCH_TYPE_INCREMENTAL, "cl2", true, getItemsToExport("hive_table", "hive_db", "hive_table")); + assertFalse(importService.checkHiveTableIncrementalSkipLineage(importRequest, exportRequest)); + + exportRequest = getExportRequest(FETCH_TYPE_FULL, "cl2", true, getItemsToExport("hive_table", "hive_table")); + assertFalse(importService.checkHiveTableIncrementalSkipLineage(importRequest, exportRequest)); + + exportRequest = getExportRequest(FETCH_TYPE_FULL, "", true, getItemsToExport("hive_table", "hive_table")); + assertFalse(importService.checkHiveTableIncrementalSkipLineage(importRequest, exportRequest)); + + importRequest = getImportRequest(""); + exportRequest = getExportRequest(FETCH_TYPE_INCREMENTAL, "cl2", true, getItemsToExport("hive_table", "hive_table")); + assertFalse(importService.checkHiveTableIncrementalSkipLineage(importRequest, exportRequest)); + } + + private AtlasImportRequest getImportRequest(String replicatedFrom){ + AtlasImportRequest importRequest = getDefaultImportRequest(); + + if (!StringUtils.isEmpty(replicatedFrom)) { + importRequest.setOption(AtlasImportRequest.OPTION_KEY_REPLICATED_FROM, replicatedFrom); + } + return importRequest; + } + + private AtlasExportRequest getExportRequest(String fetchType, String replicatedTo, boolean skipLineage, List<AtlasObjectId> itemsToExport){ + AtlasExportRequest request = new AtlasExportRequest(); + + request.setOptions(getOptionsMap(fetchType, replicatedTo, skipLineage)); + request.setItemsToExport(itemsToExport); + return request; + } + + private List<AtlasObjectId> getItemsToExport(String... typeNames){ + List<AtlasObjectId> itemsToExport = new ArrayList<>(); + for (String typeName : typeNames) { + itemsToExport.add(new AtlasObjectId(typeName, "qualifiedName", "db.table@cluster")); + } + return itemsToExport; + } + + private Map<String, Object> getOptionsMap(String fetchType, String replicatedTo, boolean skipLineage){ + Map<String, Object> options = new HashMap<>(); + + if (!StringUtils.isEmpty(fetchType)) { + options.put(OPTION_FETCH_TYPE, fetchType); + } + if (!StringUtils.isEmpty(replicatedTo)) { + options.put(OPTION_KEY_REPLICATED_TO, replicatedTo); + } + options.put(OPTION_SKIP_LINEAGE, skipLineage); + + return options; } } diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/TableReplicationRequestProcessorTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/TableReplicationRequestProcessorTest.java new file mode 100644 index 0000000..c9bb11c --- /dev/null +++ b/repository/src/test/java/org/apache/atlas/repository/impexp/TableReplicationRequestProcessorTest.java @@ -0,0 +1,146 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.repository.impexp; + +import com.google.inject.Inject; +import org.apache.atlas.RequestContext; +import org.apache.atlas.TestModules; +import org.apache.atlas.TestUtilsV2; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.impexp.AtlasImportRequest; +import org.apache.atlas.model.impexp.ExportImportAuditEntry; +import org.apache.atlas.repository.graph.AtlasGraphProvider; +import org.apache.atlas.repository.store.graph.AtlasEntityStore; +import org.apache.atlas.runner.LocalSolrRunner; +import org.apache.atlas.store.AtlasTypeDefStore; +import org.apache.atlas.type.AtlasType; +import org.apache.atlas.type.AtlasTypeRegistry; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.ITestContext; +import org.testng.SkipException; +import org.testng.annotations.Guice; +import org.testng.annotations.Test; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeTest; +import org.testng.annotations.DataProvider; + +import java.io.IOException; +import java.io.InputStream; +import java.util.List; + +import static org.apache.atlas.graph.GraphSandboxUtil.useLocalSolr; +import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.getDefaultImportRequest; +import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.getZipSource; +import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.getInputStreamFrom; +import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runImportWithParameters; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotNull; + + +@Guice(modules = TestModules.TestOnlyModule.class) +public class TableReplicationRequestProcessorTest extends ExportImportTestBase { + private static final Logger LOG = LoggerFactory.getLogger(TableReplicationRequestProcessorTest.class); + + private static final String ENTITY_GUID_REPLICATED = "718a6d12-35a8-4731-aff8-3a64637a43a3"; + private static final String ENTITY_GUID_NOT_REPLICATED_1 = "e19e5683-d9ae-436a-af1e-0873582d0f1e"; + private static final String ENTITY_GUID_NOT_REPLICATED_2 = "2e28ae34-576e-4a8b-be48-cf5f925d7b15"; + private static final String REPL_FROM = "cl1"; + private static final String REPL_TRANSFORMER = "[{\"conditions\":{\"__entity\":\"topLevel: \"}," + + "\"action\":{\"__entity\":\"ADD_CLASSIFICATION: cl1_replicated\"}}," + + "{\"action\":{\"__entity.replicatedTo\":\"CLEAR:\",\"__entity.replicatedFrom\":\"CLEAR:\"}}," + + "{\"conditions\":{\"hive_db.clusterName\":\"EQUALS: cl1\"},\"action\":{\"hive_db.clusterName\":\"SET: cl2\"}}," + + "{\"conditions\":{\"hive_db.location\":\"STARTS_WITH_IGNORE_CASE: file:///\"}," + + "\"action\":{\"hive_db.location\":\"REPLACE_PREFIX: = :file:///=file:///\"}}," + + "{\"conditions\":{\"hive_storagedesc.location\":\"STARTS_WITH_IGNORE_CASE: file:///\"}," + + "\"action\":{\"hive_storagedesc.location\":\"REPLACE_PREFIX: = :file:///=file:///\"}}]"; + + @Inject + private ImportService importService; + + @Inject + private AtlasTypeRegistry typeRegistry; + + @Inject + private AtlasEntityStore entityStore; + + @Inject + private ExportImportAuditService auditService; + + @Inject + private AtlasTypeDefStore typeDefStore; + + @BeforeTest + public void setupTest() throws IOException, AtlasBaseException { + RequestContext.clear(); + RequestContext.get().setUser(TestUtilsV2.TEST_USER, null); + basicSetup(typeDefStore, typeRegistry); + } + + @AfterClass + public void clear() throws Exception { + AtlasGraphProvider.cleanup(); + + if (useLocalSolr()) { + LocalSolrRunner.stop(); + } + } + + @DataProvider(name = "source1") + public static Object[][] getData1(ITestContext context) throws IOException, AtlasBaseException { + return getZipSource("repl_exp_1.zip"); + } + + public static InputStream getData2() { + return getInputStreamFrom("repl_exp_2.zip"); + } + + @Test(dataProvider = "source1") + public void importWithIsReplTrue(InputStream zipSource) throws AtlasBaseException, IOException { + AtlasImportRequest atlasImportRequest = getDefaultImportRequest(); + + atlasImportRequest.setOption("replicatedFrom", REPL_FROM); + atlasImportRequest.setOption("transformers", REPL_TRANSFORMER); + + runImportWithParameters(importService, atlasImportRequest, zipSource); + + runImportWithParameters(importService, atlasImportRequest, getData2()); + + assertAuditEntry(); + } + + private void assertAuditEntry() { + pauseForIndexCreation(); + List<ExportImportAuditEntry> result; + try { + result = auditService.get("", "IMPORT_DELETE_REPL", "", "", "", 10, 0); + } catch (Exception e) { + throw new SkipException("audit entries not retrieved."); + } + + assertNotNull(result); + assertTrue(result.size() > 0); + + List<String> deletedGuids = AtlasType.fromJson(result.get(0).getResultSummary(), List.class); + assertNotNull(deletedGuids); + assertFalse(deletedGuids.contains(ENTITY_GUID_REPLICATED)); + assertTrue(deletedGuids.contains(ENTITY_GUID_NOT_REPLICATED_1)); + assertTrue(deletedGuids.contains(ENTITY_GUID_NOT_REPLICATED_2)); + } +} -- libgit2 0.27.1