Commit c623835f by Ashutosh Mestry Committed by Madhan Neethiraj

ATLAS-2129: import fix to handle shutdown while in the middle of import

Signed-off-by: Madhan Neethiraj <madhan@apache.org> (cherry picked from commit 81e5444f4ce9635465632b90ac9d97eec3a16a6b)
parent f1c46466
......@@ -22,7 +22,7 @@ import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.impexp.AtlasImportRequest;
import org.apache.atlas.model.impexp.AtlasImportResult;
import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.repository.store.graph.BulkImporter;
import org.apache.atlas.store.AtlasTypeDefStore;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.commons.collections.MapUtils;
......@@ -42,17 +42,17 @@ public class ImportService {
private static final Logger LOG = LoggerFactory.getLogger(ImportService.class);
private final AtlasTypeDefStore typeDefStore;
private final AtlasEntityStore entityStore;
private final AtlasTypeRegistry typeRegistry;
private final BulkImporter bulkImporter;
private long startTimestamp;
private long endTimestamp;
@Inject
public ImportService(final AtlasTypeDefStore typeDefStore, final AtlasEntityStore entityStore, AtlasTypeRegistry typeRegistry) {
public ImportService(AtlasTypeDefStore typeDefStore, AtlasTypeRegistry typeRegistry, BulkImporter bulkImporter) {
this.typeDefStore = typeDefStore;
this.entityStore = entityStore;
this.typeRegistry = typeRegistry;
this.bulkImporter = bulkImporter;
}
public AtlasImportResult run(ZipSource source, String userName,
......@@ -154,7 +154,7 @@ public class ImportService {
}
private void processEntities(ZipSource importSource, AtlasImportResult result) throws AtlasBaseException {
this.entityStore.bulkImport(importSource, result);
this.bulkImporter.bulkImport(importSource, result);
endTimestamp = System.currentTimeMillis();
result.incrementMeticsCounter("duration", (int) (this.endTimestamp - this.startTimestamp));
......
......@@ -18,12 +18,10 @@
package org.apache.atlas.repository.store.graph;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.impexp.AtlasImportResult;
import org.apache.atlas.model.instance.AtlasClassification;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.repository.store.graph.v1.EntityImportStream;
import org.apache.atlas.repository.store.graph.v1.EntityStream;
import org.apache.atlas.type.AtlasEntityType;
......@@ -69,12 +67,12 @@ public interface AtlasEntityStore {
EntityMutationResponse createOrUpdate(EntityStream entityStream, boolean isPartialUpdate) throws AtlasBaseException;
/**
* Create or update entities in the stream using repeated commits of connected entities
* Create or update entities with parameters necessary for import process
* @param entityStream AtlasEntityStream
* @return EntityMutationResponse Entity mutations operations with the corresponding set of entities on which these operations were performed
* @throws AtlasBaseException
*/
EntityMutationResponse bulkImport(EntityImportStream entityStream, AtlasImportResult importResult) throws AtlasBaseException;
EntityMutationResponse createOrUpdateForImport(EntityStream entityStream) throws AtlasBaseException;
/**
* Update a single entity
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.store.graph;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.impexp.AtlasImportResult;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.repository.store.graph.v1.EntityImportStream;
public interface BulkImporter {
/**
 * Creates or updates every entity in the import stream, using repeated commits of
 * connected entities rather than a single transaction, so a shutdown mid-import
 * leaves previously committed entities intact.
 *
 * @param entityStream stream of entities (with extended info) read from the import source
 * @param importResult accumulator updated in place with per-entity metrics and the set of processed GUIDs
 * @return EntityMutationResponse the mutation operations performed, with the corresponding
 *         set of entities on which these operations were performed
 * @throws AtlasBaseException if the stream is empty/null or an entity fails to import
 */
EntityMutationResponse bulkImport(EntityImportStream entityStream, AtlasImportResult importResult) throws AtlasBaseException;
}
......@@ -23,12 +23,10 @@ import org.apache.atlas.GraphTransactionInterceptor;
import org.apache.atlas.RequestContextV1;
import org.apache.atlas.annotation.GraphTransaction;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.impexp.AtlasImportResult;
import org.apache.atlas.model.instance.AtlasClassification;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.repository.graphdb.AtlasVertex;
......@@ -52,11 +50,8 @@ import javax.inject.Inject;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.DELETE;
import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.UPDATE;
......@@ -146,115 +141,6 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
return ret;
}
@Override
public EntityMutationResponse bulkImport(EntityImportStream entityStream, AtlasImportResult importResult) throws AtlasBaseException {
if (LOG.isDebugEnabled()) {
LOG.debug("==> bulkImport()");
}
if (entityStream == null || !entityStream.hasNext()) {
throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "no entities to create/update.");
}
EntityMutationResponse ret = new EntityMutationResponse();
ret.setGuidAssignments(new HashMap<String, String>());
Set<String> processedGuids = new HashSet<>();
float currentPercent = 0f;
List<String> residualList = new ArrayList<>();
EntityImportStreamWithResidualList entityImportStreamWithResidualList = new EntityImportStreamWithResidualList(entityStream, residualList);
while (entityImportStreamWithResidualList.hasNext()) {
AtlasEntityWithExtInfo entityWithExtInfo = entityImportStreamWithResidualList.getNextEntityWithExtInfo();
AtlasEntity entity = entityWithExtInfo != null ? entityWithExtInfo.getEntity() : null;
if (entity == null || processedGuids.contains(entity.getGuid())) {
continue;
}
AtlasEntityStreamForImport oneEntityStream = new AtlasEntityStreamForImport(entityWithExtInfo, entityStream);
try {
EntityMutationResponse resp = createOrUpdateForImport(oneEntityStream);
if (resp.getGuidAssignments() != null) {
ret.getGuidAssignments().putAll(resp.getGuidAssignments());
}
currentPercent = updateImportMetrics(entityWithExtInfo, resp, importResult, processedGuids, entityStream.getPosition(),
entityImportStreamWithResidualList.getStreamSize(), currentPercent);
entityStream.onImportComplete(entity.getGuid());
} catch (AtlasBaseException e) {
if (!updateResidualList(e, residualList, entityWithExtInfo.getEntity().getGuid())) {
throw e;
}
}
}
importResult.getProcessedEntities().addAll(processedGuids);
LOG.info("bulkImport(): done. Total number of entities (including referred entities) imported: {}", processedGuids.size());
return ret;
}
private boolean updateResidualList(AtlasBaseException e, List<String> lineageList, String guid) {
if (!e.getAtlasErrorCode().getErrorCode().equals(AtlasErrorCode.INVALID_OBJECT_ID.getErrorCode())) {
return false;
}
lineageList.add(guid);
return true;
}
private float updateImportMetrics(AtlasEntityWithExtInfo currentEntity,
EntityMutationResponse resp,
AtlasImportResult importResult,
Set<String> processedGuids,
int currentIndex, int streamSize, float currentPercent) {
updateImportMetrics("entity:%s:created", resp.getCreatedEntities(), processedGuids, importResult);
updateImportMetrics("entity:%s:updated", resp.getUpdatedEntities(), processedGuids, importResult);
updateImportMetrics("entity:%s:deleted", resp.getDeletedEntities(), processedGuids, importResult);
String lastEntityImported = String.format("entity:last-imported:%s:[%s]:(%s)",
currentEntity.getEntity().getTypeName(),
currentIndex,
currentEntity.getEntity().getGuid());
return updateImportProgress(LOG, currentIndex + 1, streamSize, currentPercent, lastEntityImported);
}
private static float updateImportProgress(Logger log, int currentIndex, int streamSize, float currentPercent,
String additionalInfo) {
final double tolerance = 0.000001;
final int MAX_PERCENT = 100;
float percent = (float) ((currentIndex * MAX_PERCENT) / streamSize);
boolean updateLog = Double.compare(percent, currentPercent) > tolerance;
float updatedPercent = (MAX_PERCENT < streamSize) ? percent :
((updateLog) ? ++currentPercent : currentPercent);
if (updateLog) {
log.info("bulkImport(): progress: {}% (of {}) - {}", (int) Math.ceil(percent), streamSize, additionalInfo);
}
return updatedPercent;
}
private static void updateImportMetrics(String prefix, List<AtlasEntityHeader> list, Set<String> processedGuids, AtlasImportResult importResult) {
if (list == null) {
return;
}
for (AtlasEntityHeader h : list) {
if (processedGuids.contains(h.getGuid())) {
continue;
}
processedGuids.add(h.getGuid());
importResult.incrementMeticsCounter(String.format(prefix, h.getTypeName()));
}
}
private EntityMutationResponse createOrUpdate(EntityStream entityStream, boolean isPartialUpdate, boolean replaceClassifications) throws AtlasBaseException {
if (LOG.isDebugEnabled()) {
LOG.debug("==> createOrUpdate()");
......@@ -287,8 +173,9 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
return createOrUpdate(entityStream, isPartialUpdate, false);
}
@Override
@GraphTransaction
private EntityMutationResponse createOrUpdateForImport(EntityStream entityStream) throws AtlasBaseException {
public EntityMutationResponse createOrUpdateForImport(EntityStream entityStream) throws AtlasBaseException {
return createOrUpdate(entityStream, false, true);
}
......@@ -763,43 +650,4 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
return ret;
}
private static class EntityImportStreamWithResidualList {
private final EntityImportStream stream;
private final List<String> residualList;
private boolean navigateResidualList;
private int currentResidualListIndex;
public EntityImportStreamWithResidualList(EntityImportStream stream, List<String> residualList) {
this.stream = stream;
this.residualList = residualList;
this.navigateResidualList = false;
this.currentResidualListIndex = 0;
}
public AtlasEntityWithExtInfo getNextEntityWithExtInfo() {
if (navigateResidualList == false) {
return stream.getNextEntityWithExtInfo();
} else {
stream.setPositionUsingEntityGuid(residualList.get(currentResidualListIndex++));
return stream.getNextEntityWithExtInfo();
}
}
public boolean hasNext() {
if (!navigateResidualList) {
boolean streamHasNext = stream.hasNext();
navigateResidualList = (streamHasNext == false);
return streamHasNext ? streamHasNext : (currentResidualListIndex < residualList.size());
} else {
return (currentResidualListIndex < residualList.size());
}
}
public int getStreamSize() {
return stream.size() + residualList.size();
}
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.store.graph.v1;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.impexp.AtlasImportResult;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.repository.store.graph.BulkImporter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.inject.Inject;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@Component
public class BulkImporterImpl implements BulkImporter {
    // Fixed: the original logged under AtlasEntityStoreV1.class (copy-paste bug),
    // mis-attributing every import log line to the wrong class.
    private static final Logger LOG = LoggerFactory.getLogger(BulkImporterImpl.class);

    private final AtlasEntityStore entityStore;

    @Inject
    public BulkImporterImpl(AtlasEntityStore entityStore) {
        this.entityStore = entityStore;
    }

    /**
     * Imports all entities from the stream, committing one connected entity at a time
     * (via {@link AtlasEntityStore#createOrUpdateForImport}) so that a shutdown in the
     * middle of an import does not roll back already-imported entities.
     *
     * Entities that fail with INVALID_OBJECT_ID (forward references to entities not yet
     * imported) are parked on a residual list and retried after the main stream drains.
     *
     * @param entityStream source of entities to import; must be non-null and non-empty
     * @param importResult updated in place with metrics and the processed GUIDs
     * @return accumulated GUID assignments for all imported entities
     * @throws AtlasBaseException on empty input, or on any failure other than a
     *         retryable INVALID_OBJECT_ID
     */
    @Override
    public EntityMutationResponse bulkImport(EntityImportStream entityStream, AtlasImportResult importResult) throws AtlasBaseException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("==> bulkImport()");
        }

        if (entityStream == null || !entityStream.hasNext()) {
            throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "no entities to create/update.");
        }

        EntityMutationResponse ret = new EntityMutationResponse();
        ret.setGuidAssignments(new HashMap<String, String>());

        Set<String>  processedGuids = new HashSet<>();
        float        currentPercent = 0f;
        List<String> residualList   = new ArrayList<>();

        EntityImportStreamWithResidualList entityImportStreamWithResidualList = new EntityImportStreamWithResidualList(entityStream, residualList);

        while (entityImportStreamWithResidualList.hasNext()) {
            AtlasEntityWithExtInfo entityWithExtInfo = entityImportStreamWithResidualList.getNextEntityWithExtInfo();
            AtlasEntity            entity            = entityWithExtInfo != null ? entityWithExtInfo.getEntity() : null;

            // Skip nulls and entities already imported as referred entities of an earlier item.
            if (entity == null || processedGuids.contains(entity.getGuid())) {
                continue;
            }

            AtlasEntityStreamForImport oneEntityStream = new AtlasEntityStreamForImport(entityWithExtInfo, entityStream);

            try {
                EntityMutationResponse resp = entityStore.createOrUpdateForImport(oneEntityStream);

                if (resp.getGuidAssignments() != null) {
                    ret.getGuidAssignments().putAll(resp.getGuidAssignments());
                }

                currentPercent = updateImportMetrics(entityWithExtInfo, resp, importResult, processedGuids, entityStream.getPosition(), entityImportStreamWithResidualList.getStreamSize(), currentPercent);

                entityStream.onImportComplete(entity.getGuid());
            } catch (AtlasBaseException e) {
                // Forward references are parked for retry; any other failure aborts the import.
                if (!updateResidualList(e, residualList, entityWithExtInfo.getEntity().getGuid())) {
                    throw e;
                }
            } catch (Throwable e) {
                // Wrap unexpected errors so the same residual-list policy applies.
                AtlasBaseException abe = new AtlasBaseException(e);

                if (!updateResidualList(abe, residualList, entityWithExtInfo.getEntity().getGuid())) {
                    throw abe;
                }
            }
        }

        importResult.getProcessedEntities().addAll(processedGuids);
        LOG.info("bulkImport(): done. Total number of entities (including referred entities) imported: {}", processedGuids.size());

        return ret;
    }

    /**
     * Parks {@code guid} on the residual list iff the failure is INVALID_OBJECT_ID
     * (i.e. a reference to an entity that has not been imported yet).
     *
     * @return true if the guid was parked for retry; false if the error is not retryable
     */
    private boolean updateResidualList(AtlasBaseException e, List<String> lineageList, String guid) {
        if (!e.getAtlasErrorCode().getErrorCode().equals(AtlasErrorCode.INVALID_OBJECT_ID.getErrorCode())) {
            return false;
        }

        lineageList.add(guid);

        return true;
    }

    /**
     * Records created/updated/deleted metrics for one imported entity and advances the
     * progress log.
     *
     * @return the updated progress percentage to carry into the next iteration
     */
    private float updateImportMetrics(AtlasEntity.AtlasEntityWithExtInfo currentEntity,
                                      EntityMutationResponse resp,
                                      AtlasImportResult importResult,
                                      Set<String> processedGuids,
                                      int currentIndex, int streamSize, float currentPercent) {
        updateImportMetrics("entity:%s:created", resp.getCreatedEntities(), processedGuids, importResult);
        updateImportMetrics("entity:%s:updated", resp.getUpdatedEntities(), processedGuids, importResult);
        updateImportMetrics("entity:%s:deleted", resp.getDeletedEntities(), processedGuids, importResult);

        String lastEntityImported = String.format("entity:last-imported:%s:[%s]:(%s)", currentEntity.getEntity().getTypeName(), currentIndex, currentEntity.getEntity().getGuid());

        return updateImportProgress(LOG, currentIndex + 1, streamSize, currentPercent, lastEntityImported);
    }

    /**
     * Emits a progress log line when the integer percentage advances.
     *
     * NOTE(review): {@code Double.compare(...) > tolerance} effectively reduces to
     * {@code percent > currentPercent} since Double.compare returns -1/0/1; and
     * {@code (currentIndex * MAX_PERCENT) / streamSize} is integer division by design,
     * so progress moves in whole-percent steps. Left as-is to preserve logging cadence.
     */
    private static float updateImportProgress(Logger log, int currentIndex, int streamSize, float currentPercent,
                                              String additionalInfo) {
        final double tolerance   = 0.000001;
        final int    MAX_PERCENT = 100;

        float   percent        = (float) ((currentIndex * MAX_PERCENT) / streamSize);
        boolean updateLog      = Double.compare(percent, currentPercent) > tolerance;
        float   updatedPercent = (MAX_PERCENT < streamSize) ? percent : ((updateLog) ? ++currentPercent : currentPercent);

        if (updateLog) {
            log.info("bulkImport(): progress: {}% (of {}) - {}", (int) Math.ceil(percent), streamSize, additionalInfo);
        }

        return updatedPercent;
    }

    /**
     * Increments the per-type metrics counter for each entity header not already counted,
     * and marks its GUID as processed.
     */
    private static void updateImportMetrics(String prefix, List<AtlasEntityHeader> list, Set<String> processedGuids, AtlasImportResult importResult) {
        if (list == null) {
            return;
        }

        for (AtlasEntityHeader h : list) {
            if (processedGuids.contains(h.getGuid())) {
                continue;
            }

            processedGuids.add(h.getGuid());
            importResult.incrementMeticsCounter(String.format(prefix, h.getTypeName()));
        }
    }

    /**
     * Wraps an {@link EntityImportStream} so that, once the primary stream is exhausted,
     * iteration continues over the GUIDs parked on the residual (retry) list.
     */
    private static class EntityImportStreamWithResidualList {
        private final EntityImportStream stream;
        private final List<String>       residualList;
        private       boolean            navigateResidualList;     // true once the primary stream is drained
        private       int                currentResidualListIndex; // next position in residualList

        public EntityImportStreamWithResidualList(EntityImportStream stream, List<String> residualList) {
            this.stream                   = stream;
            this.residualList             = residualList;
            this.navigateResidualList     = false;
            this.currentResidualListIndex = 0;
        }

        public AtlasEntity.AtlasEntityWithExtInfo getNextEntityWithExtInfo() {
            if (!navigateResidualList) {
                return stream.getNextEntityWithExtInfo();
            }

            // Residual phase: reposition the underlying stream at the parked GUID, then read it.
            stream.setPositionUsingEntityGuid(residualList.get(currentResidualListIndex++));
            return stream.getNextEntityWithExtInfo();
        }

        public boolean hasNext() {
            if (navigateResidualList) {
                return currentResidualListIndex < residualList.size();
            }

            boolean streamHasNext = stream.hasNext();

            // Flip to the residual phase the moment the primary stream is exhausted.
            navigateResidualList = !streamHasNext;

            return streamHasNext || (currentResidualListIndex < residualList.size());
        }

        public int getStreamSize() {
            return stream.size() + residualList.size();
        }
    }
}
......@@ -151,6 +151,7 @@ public class TestModules {
bind(LineageService.class).to(DataSetLineageService.class).asEagerSingleton();
bind(AtlasLineageService.class).to(EntityLineageService.class).asEagerSingleton();
bind(BulkImporter.class).to(BulkImporterImpl.class).asEagerSingleton();
bindTypeCache();
......
......@@ -24,7 +24,6 @@ import org.apache.atlas.TestModules;
import org.apache.atlas.TestUtilsV2;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.impexp.AtlasImportRequest;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.store.AtlasTypeDefStore;
import org.apache.atlas.type.AtlasClassificationType;
import org.apache.atlas.type.AtlasTypeRegistry;
......@@ -43,11 +42,11 @@ import java.util.Map;
import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.*;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
@Guice(modules = TestModules.TestOnlyModule.class)
public class ImportServiceTest {
private static final Logger LOG = LoggerFactory.getLogger(ImportServiceTest.class);
private final ImportService importService;
@Inject
AtlasTypeRegistry typeRegistry;
......@@ -56,7 +55,9 @@ public class ImportServiceTest {
private AtlasTypeDefStore typeDefStore;
@Inject
private AtlasEntityStore entityStore;
public ImportServiceTest(ImportService importService) {
this.importService = importService;
}
@BeforeTest
public void setupTest() {
......@@ -72,7 +73,7 @@ public class ImportServiceTest {
@Test(dataProvider = "sales")
public void importDB1(ZipSource zipSource) throws AtlasBaseException, IOException {
loadModelFromJson("0000-Area0/0010-base_model.json", typeDefStore, typeRegistry);
runAndVerifyQuickStart_v1_Import(new ImportService(typeDefStore, entityStore, typeRegistry), zipSource);
runAndVerifyQuickStart_v1_Import(importService, zipSource);
}
@DataProvider(name = "reporting")
......@@ -83,7 +84,7 @@ public class ImportServiceTest {
@Test(dataProvider = "reporting")
public void importDB2(ZipSource zipSource) throws AtlasBaseException, IOException {
loadModelFromJson("0000-Area0/0010-base_model.json", typeDefStore, typeRegistry);
runAndVerifyQuickStart_v1_Import(new ImportService(typeDefStore, entityStore, typeRegistry), zipSource);
runAndVerifyQuickStart_v1_Import(importService, zipSource);
}
@DataProvider(name = "logging")
......@@ -94,7 +95,7 @@ public class ImportServiceTest {
@Test(dataProvider = "logging")
public void importDB3(ZipSource zipSource) throws AtlasBaseException, IOException {
loadModelFromJson("0000-Area0/0010-base_model.json", typeDefStore, typeRegistry);
runAndVerifyQuickStart_v1_Import(new ImportService(typeDefStore, entityStore, typeRegistry), zipSource);
runAndVerifyQuickStart_v1_Import(importService, zipSource);
}
@DataProvider(name = "salesNewTypeAttrs")
......@@ -105,7 +106,7 @@ public class ImportServiceTest {
@Test(dataProvider = "salesNewTypeAttrs", dependsOnMethods = "importDB1")
public void importDB4(ZipSource zipSource) throws AtlasBaseException, IOException {
loadModelFromJson("0000-Area0/0010-base_model.json", typeDefStore, typeRegistry);
runImportWithParameters(new ImportService(typeDefStore, entityStore, typeRegistry), getDefaultImportRequest(), zipSource);
runImportWithParameters(importService, getDefaultImportRequest(), zipSource);
}
@DataProvider(name = "salesNewTypeAttrs-next")
......@@ -125,7 +126,7 @@ public class ImportServiceTest {
options.put("updateTypeDefinition", "false");
request.setOptions(options);
runImportWithParameters(new ImportService(typeDefStore, entityStore, typeRegistry), request, zipSource);
runImportWithParameters(importService, request, zipSource);
assertNotNull(typeDefStore.getEnumDefByName(newEnumDefName));
assertEquals(typeDefStore.getEnumDefByName(newEnumDefName).getElementDefs().size(), 4);
}
......@@ -141,7 +142,7 @@ public class ImportServiceTest {
options.put("updateTypeDefinition", "true");
request.setOptions(options);
runImportWithParameters(new ImportService(typeDefStore, entityStore, typeRegistry), request, zipSource);
runImportWithParameters(importService, request, zipSource);
assertNotNull(typeDefStore.getEnumDefByName(newEnumDefName));
assertEquals(typeDefStore.getEnumDefByName(newEnumDefName).getElementDefs().size(), 8);
}
......@@ -156,7 +157,7 @@ public class ImportServiceTest {
loadModelFromJson("0000-Area0/0010-base_model.json", typeDefStore, typeRegistry);
loadModelFromJson("1000-Hadoop/1030-hive_model.json", typeDefStore, typeRegistry);
runImportWithNoParameters(getImportService(), zipSource);
runImportWithNoParameters(importService, zipSource);
}
@DataProvider(name = "hdfs_path1")
......@@ -172,7 +173,7 @@ public class ImportServiceTest {
loadModelFromResourcesJson("tag1.json", typeDefStore, typeRegistry);
try {
runImportWithNoParameters(getImportService(), zipSource);
runImportWithNoParameters(importService, zipSource);
} catch (AtlasBaseException e) {
assertEquals(e.getAtlasErrorCode(), AtlasErrorCode.INVALID_IMPORT_ATTRIBUTE_TYPE_CHANGED);
AtlasClassificationType tag1 = typeRegistry.getClassificationTypeByName("tag1");
......@@ -181,8 +182,4 @@ public class ImportServiceTest {
throw e;
}
}
private ImportService getImportService() {
return new ImportService(typeDefStore, entityStore, typeRegistry);
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment