Commit 4ff57281 by Ashutosh Mestry

ATLAS-2811: Skip Lineage Export option.

parent 831ad014
...@@ -48,6 +48,7 @@ public class AtlasExportRequest implements Serializable { ...@@ -48,6 +48,7 @@ public class AtlasExportRequest implements Serializable {
public static final String OPTION_FETCH_TYPE = "fetchType"; public static final String OPTION_FETCH_TYPE = "fetchType";
public static final String OPTION_ATTR_MATCH_TYPE = "matchType"; public static final String OPTION_ATTR_MATCH_TYPE = "matchType";
public static final String OPTION_SKIP_LINEAGE = "skipLineage";
public static final String OPTION_KEY_REPLICATED_TO = "replicatedTo"; public static final String OPTION_KEY_REPLICATED_TO = "replicatedTo";
public static final String FETCH_TYPE_FULL = "full"; public static final String FETCH_TYPE_FULL = "full";
public static final String FETCH_TYPE_CONNECTED = "connected"; public static final String FETCH_TYPE_CONNECTED = "connected";
......
...@@ -38,6 +38,7 @@ import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef; ...@@ -38,6 +38,7 @@ import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
import org.apache.atlas.model.typedef.AtlasTypesDef; import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.apache.atlas.repository.graphdb.AtlasGraph; import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever; import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
import org.apache.atlas.repository.util.UniqueList;
import org.apache.atlas.type.AtlasArrayType; import org.apache.atlas.type.AtlasArrayType;
import org.apache.atlas.type.AtlasClassificationType; import org.apache.atlas.type.AtlasClassificationType;
import org.apache.atlas.type.AtlasEntityType; import org.apache.atlas.type.AtlasEntityType;
...@@ -68,11 +69,17 @@ import static org.apache.atlas.model.impexp.AtlasExportRequest.*; ...@@ -68,11 +69,17 @@ import static org.apache.atlas.model.impexp.AtlasExportRequest.*;
public class ExportService { public class ExportService {
private static final Logger LOG = LoggerFactory.getLogger(ExportService.class); private static final Logger LOG = LoggerFactory.getLogger(ExportService.class);
private static final String PROPERTY_GUID = "__guid";
private static final String PROPERTY_IS_PROCESS = "isProcess";
private final AtlasTypeRegistry typeRegistry; private final AtlasTypeRegistry typeRegistry;
private AuditsWriter auditsWriter; private final String QUERY_BINDING_START_GUID = "startGuid";
private AuditsWriter auditsWriter;
private final AtlasGraph atlasGraph; private final AtlasGraph atlasGraph;
private final EntityGraphRetriever entityGraphRetriever; private final EntityGraphRetriever entityGraphRetriever;
private final AtlasGremlinQueryProvider gremlinQueryProvider; private final AtlasGremlinQueryProvider gremlinQueryProvider;
private ExportTypeProcessor exportTypeProcessor;
@Inject @Inject
public ExportService(final AtlasTypeRegistry typeRegistry, AtlasGraph atlasGraph, AuditsWriter auditsWriter) { public ExportService(final AtlasTypeRegistry typeRegistry, AtlasGraph atlasGraph, AuditsWriter auditsWriter) {
...@@ -87,7 +94,8 @@ public class ExportService { ...@@ -87,7 +94,8 @@ public class ExportService {
String requestingIP) throws AtlasBaseException { String requestingIP) throws AtlasBaseException {
long startTime = System.currentTimeMillis(); long startTime = System.currentTimeMillis();
AtlasExportResult result = new AtlasExportResult(request, userName, requestingIP, hostName, startTime); AtlasExportResult result = new AtlasExportResult(request, userName, requestingIP, hostName, startTime);
ExportContext context = new ExportContext(result, exportSink); ExportContext context = new ExportContext(atlasGraph, result, exportSink);
exportTypeProcessor = new ExportTypeProcessor(typeRegistry, context);
try { try {
LOG.info("==> export(user={}, from={})", userName, requestingIP); LOG.info("==> export(user={}, from={})", userName, requestingIP);
...@@ -333,14 +341,14 @@ public class ExportService { ...@@ -333,14 +341,14 @@ public class ExportService {
} }
addEntity(entityWithExtInfo, context); addEntity(entityWithExtInfo, context);
addTypes(entityWithExtInfo.getEntity(), context); exportTypeProcessor.addTypes(entityWithExtInfo.getEntity(), context);
context.guidsProcessed.add(entityWithExtInfo.getEntity().getGuid()); context.guidsProcessed.add(entityWithExtInfo.getEntity().getGuid());
getConntedEntitiesBasedOnOption(entityWithExtInfo.getEntity(), context, direction); getConntedEntitiesBasedOnOption(entityWithExtInfo.getEntity(), context, direction);
if(entityWithExtInfo.getReferredEntities() != null) { if(entityWithExtInfo.getReferredEntities() != null) {
for (AtlasEntity e : entityWithExtInfo.getReferredEntities().values()) { for (AtlasEntity e : entityWithExtInfo.getReferredEntities().values()) {
addTypes(e, context); exportTypeProcessor.addTypes(e, context);
getConntedEntitiesBasedOnOption(e, context, direction); getConntedEntitiesBasedOnOption(e, context, direction);
} }
...@@ -377,7 +385,7 @@ public class ExportService { ...@@ -377,7 +385,7 @@ public class ExportService {
} }
} }
private boolean isProcessEntity(AtlasEntity entity) throws AtlasBaseException { private boolean isProcessEntity(AtlasEntity entity) {
String typeName = entity.getTypeName(); String typeName = entity.getTypeName();
AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName); AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName);
...@@ -397,7 +405,7 @@ public class ExportService { ...@@ -397,7 +405,7 @@ public class ExportService {
} }
context.bindings.clear(); context.bindings.clear();
context.bindings.put("startGuid", entity.getGuid()); context.bindings.put(QUERY_BINDING_START_GUID, entity.getGuid());
List<Map<String, Object>> result = executeGremlinQuery(query, context); List<Map<String, Object>> result = executeGremlinQuery(query, context);
...@@ -405,10 +413,12 @@ public class ExportService { ...@@ -405,10 +413,12 @@ public class ExportService {
continue; continue;
} }
for (Map<String, Object> map : result) { for (Map<String, Object> hashMap : result) {
String guid = (String) map.get("__guid"); String guid = (String) hashMap.get(PROPERTY_GUID);
TraversalDirection currentDirection = context.guidDirection.get(guid); TraversalDirection currentDirection = context.guidDirection.get(guid);
boolean isLineage = (boolean) map.get("isProcess"); boolean isLineage = (boolean) hashMap.get(PROPERTY_IS_PROCESS);
if(context.skipLineage && isLineage) continue;
if (currentDirection == null) { if (currentDirection == null) {
context.addToBeProcessed(isLineage, guid, direction); context.addToBeProcessed(isLineage, guid, direction);
...@@ -445,7 +455,7 @@ public class ExportService { ...@@ -445,7 +455,7 @@ public class ExportService {
String query = this.gremlinQueryProvider.getQuery(AtlasGremlinQuery.EXPORT_BY_GUID_FULL); String query = this.gremlinQueryProvider.getQuery(AtlasGremlinQuery.EXPORT_BY_GUID_FULL);
context.bindings.clear(); context.bindings.clear();
context.bindings.put("startGuid", entity.getGuid()); context.bindings.put(QUERY_BINDING_START_GUID, entity.getGuid());
List<Map<String, Object>> result = executeGremlinQuery(query, context); List<Map<String, Object>> result = executeGremlinQuery(query, context);
...@@ -453,9 +463,11 @@ public class ExportService { ...@@ -453,9 +463,11 @@ public class ExportService {
return; return;
} }
for (Map<String, Object> map : result) { for (Map<String, Object> hashMap : result) {
String guid = (String) map.get("__guid"); String guid = (String) hashMap.get(PROPERTY_GUID);
boolean isLineage = (boolean) map.get("isProcess"); boolean isLineage = (boolean) hashMap.get(PROPERTY_IS_PROCESS);
if(context.getSkipLineage() && isLineage) continue;
if (!context.guidsProcessed.contains(guid)) { if (!context.guidsProcessed.contains(guid)) {
context.addToBeProcessed(isLineage, guid, TraversalDirection.BOTH); context.addToBeProcessed(isLineage, guid, TraversalDirection.BOTH);
...@@ -642,58 +654,7 @@ public class ExportService { ...@@ -642,58 +654,7 @@ public class ExportService {
} }
} }
public static class UniqueList<T> { static class ExportContext {
private final List<T> list = new ArrayList<>();
private final Set<T> set = new HashSet<>();
public void add(T e) {
if(set.contains(e)) {
return;
}
list.add(e);
set.add(e);
}
public void addAll(UniqueList<T> uniqueList) {
for (T item : uniqueList.list) {
if(set.contains(item)) continue;
set.add(item);
list.add(item);
}
}
public T remove(int index) {
T e = list.remove(index);
set.remove(e);
return e;
}
public boolean contains(T e) {
return set.contains(e);
}
public int size() {
return list.size();
}
public boolean isEmpty() {
return list.isEmpty();
}
public void clear() {
list.clear();
set.clear();
}
public List<T> getList() {
return list;
}
}
private class ExportContext {
final Set<String> guidsProcessed = new HashSet<>(); final Set<String> guidsProcessed = new HashSet<>();
final UniqueList<String> guidsToProcess = new UniqueList<>(); final UniqueList<String> guidsToProcess = new UniqueList<>();
final UniqueList<String> lineageToProcess = new UniqueList<>(); final UniqueList<String> lineageToProcess = new UniqueList<>();
...@@ -710,10 +671,11 @@ public class ExportService { ...@@ -710,10 +671,11 @@ public class ExportService {
private final Map<String, Object> bindings; private final Map<String, Object> bindings;
private final ExportFetchType fetchType; private final ExportFetchType fetchType;
private final String matchType; private final String matchType;
private final boolean skipLineage;
private int progressReportCount = 0; private int progressReportCount = 0;
ExportContext(AtlasExportResult result, ZipSink sink) throws AtlasBaseException { ExportContext(AtlasGraph atlasGraph, AtlasExportResult result, ZipSink sink) throws AtlasBaseException {
this.result = result; this.result = result;
this.sink = sink; this.sink = sink;
...@@ -721,6 +683,7 @@ public class ExportService { ...@@ -721,6 +683,7 @@ public class ExportService {
bindings = new HashMap<>(); bindings = new HashMap<>();
fetchType = getFetchType(result.getRequest()); fetchType = getFetchType(result.getRequest());
matchType = getMatchType(result.getRequest()); matchType = getMatchType(result.getRequest());
skipLineage = getOptionSkipLineage(result.getRequest());
} }
private ExportFetchType getFetchType(AtlasExportRequest request) { private ExportFetchType getFetchType(AtlasExportRequest request) {
...@@ -747,6 +710,11 @@ public class ExportService { ...@@ -747,6 +710,11 @@ public class ExportService {
return matchType; return matchType;
} }
private boolean getOptionSkipLineage(AtlasExportRequest request) {
return request.getOptions().containsKey(AtlasExportRequest.OPTION_SKIP_LINEAGE) &&
(boolean) request.getOptions().get(AtlasExportRequest.OPTION_SKIP_LINEAGE);
}
public void clear() { public void clear() {
guidsToProcess.clear(); guidsToProcess.clear();
guidsProcessed.clear(); guidsProcessed.clear();
...@@ -773,5 +741,9 @@ public class ExportService { ...@@ -773,5 +741,9 @@ public class ExportService {
LOG.info("export(): in progress.. number of entities exported: {}", this.guidsProcessed.size()); LOG.info("export(): in progress.. number of entities exported: {}", this.guidsProcessed.size());
} }
} }
public boolean getSkipLineage() {
return skipLineage;
}
} }
} }
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.impexp;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.TypeCategory;
import org.apache.atlas.model.instance.AtlasClassification;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.typedef.AtlasStructDef;
import org.apache.atlas.type.AtlasArrayType;
import org.apache.atlas.type.AtlasClassificationType;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasEnumType;
import org.apache.atlas.type.AtlasMapType;
import org.apache.atlas.type.AtlasStructType;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
class ExportTypeProcessor {
private static final Logger LOG = LoggerFactory.getLogger(ExportTypeProcessor.class);
private AtlasTypeRegistry typeRegistry;
private final ExportService.ExportContext context;
ExportTypeProcessor(AtlasTypeRegistry typeRegistry, ExportService.ExportContext context) {
this.typeRegistry = typeRegistry;
this.context = context;
}
public void addTypes(AtlasEntity entity, ExportService.ExportContext context) {
addEntityType(entity.getTypeName(), context);
if(CollectionUtils.isNotEmpty(entity.getClassifications())) {
for (AtlasClassification c : entity.getClassifications()) {
addClassificationType(c.getTypeName(), context);
}
}
}
private void addType(String typeName, ExportService.ExportContext context) {
AtlasType type = null;
try {
type = typeRegistry.getType(typeName);
addType(type, context);
} catch (AtlasBaseException excp) {
LOG.error("unknown type {}", typeName);
}
}
private void addEntityType(String typeName, ExportService.ExportContext context) {
if (!context.entityTypes.contains(typeName)) {
AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName);
addEntityType(entityType, context);
}
}
private void addClassificationType(String typeName, ExportService.ExportContext context) {
if (!context.classificationTypes.contains(typeName)) {
AtlasClassificationType classificationType = typeRegistry.getClassificationTypeByName(typeName);
addClassificationType(classificationType, context);
}
}
private void addType(AtlasType type, ExportService.ExportContext context) {
if (type.getTypeCategory() == TypeCategory.PRIMITIVE) {
return;
}
if (type instanceof AtlasArrayType) {
AtlasArrayType arrayType = (AtlasArrayType)type;
addType(arrayType.getElementType(), context);
} else if (type instanceof AtlasMapType) {
AtlasMapType mapType = (AtlasMapType)type;
addType(mapType.getKeyType(), context);
addType(mapType.getValueType(), context);
} else if (type instanceof AtlasEntityType) {
addEntityType((AtlasEntityType)type, context);
} else if (type instanceof AtlasClassificationType) {
addClassificationType((AtlasClassificationType)type, context);
} else if (type instanceof AtlasStructType) {
addStructType((AtlasStructType)type, context);
} else if (type instanceof AtlasEnumType) {
addEnumType((AtlasEnumType)type, context);
}
}
private void addEntityType(AtlasEntityType entityType, ExportService.ExportContext context) {
if (!context.entityTypes.contains(entityType.getTypeName())) {
context.entityTypes.add(entityType.getTypeName());
addAttributeTypes(entityType, context);
if (CollectionUtils.isNotEmpty(entityType.getAllSuperTypes())) {
for (String superType : entityType.getAllSuperTypes()) {
addEntityType(superType, context);
}
}
}
}
private void addClassificationType(AtlasClassificationType classificationType, ExportService.ExportContext context) {
if (!context.classificationTypes.contains(classificationType.getTypeName())) {
context.classificationTypes.add(classificationType.getTypeName());
addAttributeTypes(classificationType, context);
if (CollectionUtils.isNotEmpty(classificationType.getAllSuperTypes())) {
for (String superType : classificationType.getAllSuperTypes()) {
addClassificationType(superType, context);
}
}
}
}
private void addStructType(AtlasStructType structType, ExportService.ExportContext context) {
if (!context.structTypes.contains(structType.getTypeName())) {
context.structTypes.add(structType.getTypeName());
addAttributeTypes(structType, context);
}
}
private void addEnumType(AtlasEnumType enumType, ExportService.ExportContext context) {
if (!context.enumTypes.contains(enumType.getTypeName())) {
context.enumTypes.add(enumType.getTypeName());
}
}
private void addAttributeTypes(AtlasStructType structType, ExportService.ExportContext context) {
for (AtlasStructDef.AtlasAttributeDef attributeDef : structType.getStructDef().getAttributeDefs()) {
addType(attributeDef.getTypeName(), context);
}
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.util;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
public class UniqueList<T> {
private final List<T> list = new ArrayList<>();
private final Set<T> set = new HashSet<>();
public void add(T e) {
if(set.contains(e)) {
return;
}
list.add(e);
set.add(e);
}
public void addAll(UniqueList<T> uniqueList) {
for (T item : uniqueList.list) {
if(set.contains(item)) continue;
set.add(item);
list.add(item);
}
}
public T remove(int index) {
T e = list.remove(index);
set.remove(e);
return e;
}
public boolean contains(T e) {
return set.contains(e);
}
public int size() {
return list.size();
}
public boolean isEmpty() {
return list.isEmpty();
}
public void clear() {
list.clear();
set.clear();
}
public List<T> getList() {
return list;
}
}
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.atlas.repository.clusterinfo; package org.apache.atlas.repository.impexp;
import org.apache.atlas.TestModules; import org.apache.atlas.TestModules;
import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.exception.AtlasBaseException;
......
...@@ -18,16 +18,23 @@ ...@@ -18,16 +18,23 @@
package org.apache.atlas.repository.impexp; package org.apache.atlas.repository.impexp;
import com.google.inject.Inject;
import org.apache.atlas.ApplicationProperties; import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasConstants; import org.apache.atlas.AtlasConstants;
import org.apache.atlas.AtlasException; import org.apache.atlas.AtlasException;
import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.discovery.AtlasSearchResult; import org.apache.atlas.model.discovery.AtlasSearchResult;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.repository.store.graph.v1.DeleteHandlerV1; import org.apache.atlas.repository.store.graph.v1.DeleteHandlerV1;
import org.apache.atlas.repository.store.graph.v1.SoftDeleteHandlerV1; import org.apache.atlas.repository.store.graph.v1.SoftDeleteHandlerV1;
import org.apache.atlas.repository.store.graph.v2.AtlasEntityStoreV2;
import org.testng.SkipException;
import java.util.Arrays;
import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.createAtlasEntity;
import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.loadEntity;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue; import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail; import static org.testng.Assert.fail;
...@@ -36,6 +43,23 @@ public class ExportImportTestBase { ...@@ -36,6 +43,23 @@ public class ExportImportTestBase {
protected DeleteHandlerV1 deleteHandler = mock(SoftDeleteHandlerV1.class); protected DeleteHandlerV1 deleteHandler = mock(SoftDeleteHandlerV1.class);
protected int createEntities(AtlasEntityStoreV2 entityStore, String subDir, String entityFileNames[]) {
for (String fileName : entityFileNames) {
createAtlasEntity(entityStore, loadEntity(subDir, fileName));
}
return entityFileNames.length;
}
protected void verifyCreatedEntities(AtlasEntityStoreV2 entityStore, Object[] entityGuids, int expectedNumberOfEntitiesCreated) {
try {
AtlasEntity.AtlasEntitiesWithExtInfo entities = entityStore.getByIds(Arrays.asList((String[]) entityGuids));
assertEquals(entities.getEntities().size(), expectedNumberOfEntitiesCreated);
} catch (AtlasBaseException e) {
throw new SkipException(String.format("getByIds: could not load '%s'", entityGuids.toString()));
}
}
protected void assertAuditEntry(ExportImportAuditService auditService) { protected void assertAuditEntry(ExportImportAuditService auditService) {
AtlasSearchResult result = null; AtlasSearchResult result = null;
try { try {
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.impexp;
import org.apache.atlas.RequestContext;
import org.apache.atlas.TestModules;
import org.apache.atlas.TestUtilsV2;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.impexp.AtlasExportRequest;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.repository.store.graph.v1.DeleteHandlerV1;
import org.apache.atlas.repository.store.graph.v1.SoftDeleteHandlerV1;
import org.apache.atlas.repository.store.graph.v2.AtlasEntityChangeNotifier;
import org.apache.atlas.repository.store.graph.v2.AtlasEntityStoreV2;
import org.apache.atlas.repository.store.graph.v2.EntityGraphMapper;
import org.apache.atlas.store.AtlasTypeDefStore;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.utils.TestResourceFileUtils;
import org.testng.SkipException;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
import javax.inject.Inject;
import java.io.IOException;
import java.util.Map;
import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.loadBaseModel;
import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.loadHiveModel;
import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runExportWithParameters;
import static org.mockito.Mockito.mock;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.AssertJUnit.fail;
@Guice(modules = TestModules.TestOnlyModule.class)
public class ExportSkipLineageTest extends ExportImportTestBase {
private final String ENTITIES_SUB_DIR = "stocksDB-Entities";
private final String DB_GUID = "1637a33e-6512-447b-ade7-249c8cb5344b";
private final String TABLE_GUID = "df122fc3-5555-40f8-a30f-3090b8a622f8";
private final String TABLE_TABLE_GUID = "6f3b305a-c459-4ae4-b651-aee0deb0685f";
private final String TABLE_VIEW_GUID = "56415119-7cb0-40dd-ace8-1e50efd54991";
@Inject
AtlasTypeRegistry typeRegistry;
@Inject
private AtlasTypeDefStore typeDefStore;
@Inject
private EntityGraphMapper graphMapper;
@Inject
ExportService exportService;
private DeleteHandlerV1 deleteHandler = mock(SoftDeleteHandlerV1.class);
private AtlasEntityChangeNotifier mockChangeNotifier = mock(AtlasEntityChangeNotifier.class);
private AtlasEntityStoreV2 entityStore;
@BeforeClass
public void setup() throws IOException, AtlasBaseException {
loadBaseModel(typeDefStore, typeRegistry);
loadHiveModel(typeDefStore, typeRegistry);
entityStore = new AtlasEntityStoreV2(deleteHandler, typeRegistry, mockChangeNotifier, graphMapper);
createEntities(entityStore, ENTITIES_SUB_DIR, new String[]{"db", "table-columns", "table-view", "table-table-lineage"});
final Object[] entityGuids = new Object[]{DB_GUID, TABLE_GUID, TABLE_TABLE_GUID, TABLE_VIEW_GUID};
verifyCreatedEntities(entityStore, entityGuids, 4);
}
@BeforeMethod
public void setupTest() {
RequestContext.clear();
RequestContext.get().setUser(TestUtilsV2.TEST_USER, null);
}
@Test
public void exportWithoutLineage() {
final int expectedEntityCount = 3;
AtlasExportRequest request = getRequest();
ZipSource source = runExportWithParameters(exportService, request);
AtlasEntity.AtlasEntityWithExtInfo entities = ZipFileResourceTestUtils.getEntities(source, expectedEntityCount);
int count = 0;
for (Map.Entry<String, AtlasEntity> entry : entities.getReferredEntities().entrySet()) {
assertNotNull(entry.getValue());
if(entry.getValue().getTypeName().equals("hive_process")) {
fail("Process entities should not be part of export!");
}
count++;
}
assertEquals(count, expectedEntityCount);
}
private AtlasExportRequest getRequest() {
final String filename = "export-skip-lineage";
try {
AtlasExportRequest request = TestResourceFileUtils.readObjectFromJson(ENTITIES_SUB_DIR, filename, AtlasExportRequest.class);
return request;
} catch (IOException e) {
throw new SkipException(String.format("getRequest: '%s' could not be laoded.", filename));
}
}
}
...@@ -166,23 +166,6 @@ public class ReplicationEntityAttributeTest extends ExportImportTestBase { ...@@ -166,23 +166,6 @@ public class ReplicationEntityAttributeTest extends ExportImportTestBase {
return request; return request;
} }
private AtlasEntity.AtlasEntityWithExtInfo getEntities(ZipSource source, int expectedCount) {
AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = new AtlasEntity.AtlasEntityWithExtInfo();
try {
int count = 0;
for(String s : source.getCreationOrder()) {
AtlasEntity entity = source.getByGuid(s);
entityWithExtInfo.addReferredEntity(s, entity);
count++;
}
assertEquals(count, expectedCount);
return entityWithExtInfo;
} catch (AtlasBaseException e) {
throw new SkipException("getEntities: failed!");
}
}
private AtlasExportRequest getExportRequestWithReplicationOption() { private AtlasExportRequest getExportRequestWithReplicationOption() {
try { try {
AtlasExportRequest request = TestResourceFileUtils.readObjectFromJson(ENTITIES_SUB_DIR, EXPORT_REQUEST_FILE, AtlasExportRequest.class); AtlasExportRequest request = TestResourceFileUtils.readObjectFromJson(ENTITIES_SUB_DIR, EXPORT_REQUEST_FILE, AtlasExportRequest.class);
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
*/ */
package org.apache.atlas.repository.impexp; package org.apache.atlas.repository.impexp;
import org.apache.atlas.repository.util.UniqueList;
import org.testng.annotations.BeforeClass; import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test; import org.testng.annotations.Test;
...@@ -24,11 +25,11 @@ import static org.testng.Assert.assertEquals; ...@@ -24,11 +25,11 @@ import static org.testng.Assert.assertEquals;
public class UniqueListTest { public class UniqueListTest {
private final String firstElement = "firstElement"; private final String firstElement = "firstElement";
private ExportService.UniqueList<String> uniqueList; private UniqueList<String> uniqueList;
@BeforeClass @BeforeClass
public void setup() { public void setup() {
uniqueList = new ExportService.UniqueList(); uniqueList = new UniqueList();
uniqueList.add(firstElement); uniqueList.add(firstElement);
uniqueList.add("def"); uniqueList.add("def");
uniqueList.add("firstElement"); uniqueList.add("firstElement");
...@@ -42,7 +43,7 @@ public class UniqueListTest { ...@@ -42,7 +43,7 @@ public class UniqueListTest {
@Test @Test
public void addAllList_ListHas2() { public void addAllList_ListHas2() {
ExportService.UniqueList<String> uniqueList2 = new ExportService.UniqueList<>(); UniqueList<String> uniqueList2 = new UniqueList<>();
uniqueList2.addAll(uniqueList); uniqueList2.addAll(uniqueList);
assertEquals(3, uniqueList2.size()); assertEquals(3, uniqueList2.size());
...@@ -50,7 +51,7 @@ public class UniqueListTest { ...@@ -50,7 +51,7 @@ public class UniqueListTest {
@Test @Test
public void attemptClear_SizeIsZero() { public void attemptClear_SizeIsZero() {
ExportService.UniqueList<String> uniqueList2 = new ExportService.UniqueList<>(); UniqueList<String> uniqueList2 = new UniqueList<>();
uniqueList2.addAll(uniqueList); uniqueList2.addAll(uniqueList);
uniqueList2.clear(); uniqueList2.clear();
...@@ -59,7 +60,7 @@ public class UniqueListTest { ...@@ -59,7 +60,7 @@ public class UniqueListTest {
@Test @Test
public void attemptOneRemove_SizeIsReduced() { public void attemptOneRemove_SizeIsReduced() {
ExportService.UniqueList<String> uniqueList2 = new ExportService.UniqueList<>(); UniqueList<String> uniqueList2 = new UniqueList<>();
uniqueList2.addAll(uniqueList); uniqueList2.addAll(uniqueList);
String removedElement = uniqueList2.remove(0); String removedElement = uniqueList2.remove(0);
......
...@@ -253,6 +253,23 @@ public class ZipFileResourceTestUtils { ...@@ -253,6 +253,23 @@ public class ZipFileResourceTestUtils {
return r; return r;
} }
public static AtlasEntity.AtlasEntityWithExtInfo getEntities(ZipSource source, int expectedCount) {
AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = new AtlasEntity.AtlasEntityWithExtInfo();
try {
int count = 0;
for(String s : source.getCreationOrder()) {
AtlasEntity entity = source.getByGuid(s);
entityWithExtInfo.addReferredEntity(s, entity);
count++;
}
assertEquals(count, expectedCount);
return entityWithExtInfo;
} catch (AtlasBaseException e) {
throw new SkipException("getEntities: failed!");
}
}
public static void loadModelFromJson(String fileName, AtlasTypeDefStore typeDefStore, AtlasTypeRegistry typeRegistry) throws IOException, AtlasBaseException { public static void loadModelFromJson(String fileName, AtlasTypeDefStore typeDefStore, AtlasTypeRegistry typeRegistry) throws IOException, AtlasBaseException {
AtlasTypesDef typesFromJson = getAtlasTypesDefFromFile(fileName); AtlasTypesDef typesFromJson = getAtlasTypesDefFromFile(fileName);
addReplicationAttributes(typesFromJson); addReplicationAttributes(typesFromJson);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment