Commit 09425abc by ashutoshm Committed by Madhan Neethiraj

ATLAS-1724, ATLAS-1722: fix export to report error while exporting a non-existing entity

parent f6ea040a
...@@ -33,6 +33,12 @@ ...@@ -33,6 +33,12 @@
<dependencies> <dependencies>
<dependency> <dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-reflect</artifactId>
<version>1.6.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.atlas</groupId> <groupId>org.apache.atlas</groupId>
<artifactId>atlas-intg</artifactId> <artifactId>atlas-intg</artifactId>
</dependency> </dependency>
......
...@@ -92,12 +92,41 @@ public class ExportService { ...@@ -92,12 +92,41 @@ public class ExportService {
try { try {
LOG.info("==> export(user={}, from={})", userName, requestingIP); LOG.info("==> export(user={}, from={})", userName, requestingIP);
for (AtlasObjectId item : request.getItemsToExport()) { AtlasExportResult.OperationStatus[] statuses = processItems(request, context);
processObjectId(item, context);
processTypesDef(context);
updateSinkWithOperationMetrics(context, statuses, getOperationDuration(startTime));
} catch(Exception ex) {
LOG.error("Operation failed: ", ex);
} finally {
atlasGraph.releaseGremlinScriptEngine(context.scriptEngine);
LOG.info("<== export(user={}, from={}): status {}", userName, requestingIP, context.result.getOperationStatus());
context.clear();
result.clear();
} }
long endTime = System.currentTimeMillis(); return context.result;
}
private void updateSinkWithOperationMetrics(ExportContext context, AtlasExportResult.OperationStatus[] statuses, int duration) throws AtlasBaseException {
context.sink.setExportOrder(context.result.getData().getEntityCreationOrder());
context.sink.setTypesDef(context.result.getData().getTypesDef());
clearContextData(context);
context.result.setOperationStatus(getOverallOperationStatus(statuses));
context.result.incrementMeticsCounter("duration", duration);
context.sink.setResult(context.result);
}
private void clearContextData(ExportContext context) {
context.result.setData(null);
}
private int getOperationDuration(long startTime) {
return (int) (System.currentTimeMillis() - startTime);
}
private void processTypesDef(ExportContext context) {
AtlasTypesDef typesDef = context.result.getData().getTypesDef(); AtlasTypesDef typesDef = context.result.getData().getTypesDef();
for (String entityType : context.entityTypes) { for (String entityType : context.entityTypes) {
...@@ -123,33 +152,41 @@ public class ExportService { ...@@ -123,33 +152,41 @@ public class ExportService {
typesDef.getEnumDefs().add(enumDef); typesDef.getEnumDefs().add(enumDef);
} }
}
context.sink.setExportOrder(context.result.getData().getEntityCreationOrder()); private AtlasExportResult.OperationStatus[] processItems(AtlasExportRequest request, ExportContext context) throws AtlasServiceException, AtlasException, AtlasBaseException {
context.sink.setTypesDef(context.result.getData().getTypesDef()); AtlasExportResult.OperationStatus statuses[] = new AtlasExportResult.OperationStatus[request.getItemsToExport().size()];
context.result.setData(null); List<AtlasObjectId> itemsToExport = request.getItemsToExport();
context.result.setOperationStatus(AtlasExportResult.OperationStatus.SUCCESS); for (int i = 0; i < itemsToExport.size(); i++) {
context.result.incrementMeticsCounter("duration", (int) (endTime - startTime)); AtlasObjectId item = itemsToExport.get(i);
statuses[i] = processObjectId(item, context);
}
return statuses;
}
context.sink.setResult(context.result); private AtlasExportResult.OperationStatus getOverallOperationStatus(AtlasExportResult.OperationStatus... statuses) {
} catch(Exception ex) { AtlasExportResult.OperationStatus overall = (statuses.length == 0) ?
LOG.error("Operation failed: ", ex); AtlasExportResult.OperationStatus.FAIL : statuses[0];
} finally {
atlasGraph.releaseGremlinScriptEngine(context.scriptEngine); for (AtlasExportResult.OperationStatus s : statuses) {
LOG.info("<== export(user={}, from={}): status {}", userName, requestingIP, context.result.getOperationStatus()); if (overall != s) {
context.clear(); overall = AtlasExportResult.OperationStatus.PARTIAL_SUCCESS;
result.clear(); }
} }
return context.result; return overall;
} }
private void processObjectId(AtlasObjectId item, ExportContext context) throws AtlasServiceException, AtlasException, AtlasBaseException { private AtlasExportResult.OperationStatus processObjectId(AtlasObjectId item, ExportContext context) throws AtlasServiceException, AtlasException, AtlasBaseException {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("==> processObjectId({})", item); LOG.debug("==> processObjectId({})", item);
} }
try { try {
List<AtlasEntityWithExtInfo> entities = getStartingEntity(item, context); List<AtlasEntityWithExtInfo> entities = getStartingEntity(item, context);
if(entities.size() == 0) {
return AtlasExportResult.OperationStatus.FAIL;
}
for (AtlasEntityWithExtInfo entityWithExtInfo : entities) { for (AtlasEntityWithExtInfo entityWithExtInfo : entities) {
processEntity(entityWithExtInfo.getEntity().getGuid(), context); processEntity(entityWithExtInfo.getEntity().getGuid(), context);
...@@ -167,14 +204,15 @@ public class ExportService { ...@@ -167,14 +204,15 @@ public class ExportService {
} }
} }
} catch (AtlasBaseException excp) { } catch (AtlasBaseException excp) {
context.result.setOperationStatus(AtlasExportResult.OperationStatus.PARTIAL_SUCCESS);
LOG.error("Fetching entity failed for: {}", item, excp); LOG.error("Fetching entity failed for: {}", item, excp);
return AtlasExportResult.OperationStatus.FAIL;
} }
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("<== processObjectId({})", item); LOG.debug("<== processObjectId({})", item);
} }
return AtlasExportResult.OperationStatus.SUCCESS;
} }
private List<AtlasEntityWithExtInfo> getStartingEntity(AtlasObjectId item, ExportContext context) throws AtlasBaseException { private List<AtlasEntityWithExtInfo> getStartingEntity(AtlasObjectId item, ExportContext context) throws AtlasBaseException {
......
...@@ -35,7 +35,8 @@ import org.apache.atlas.repository.store.graph.v1.DeleteHandlerV1; ...@@ -35,7 +35,8 @@ import org.apache.atlas.repository.store.graph.v1.DeleteHandlerV1;
import org.apache.atlas.repository.store.graph.v1.SoftDeleteHandlerV1; import org.apache.atlas.repository.store.graph.v1.SoftDeleteHandlerV1;
import org.apache.atlas.store.AtlasTypeDefStore; import org.apache.atlas.store.AtlasTypeDefStore;
import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.type.AtlasTypeRegistry;
import org.junit.Assert; import org.testng.Assert;
import org.powermock.reflect.Whitebox;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.testng.annotations.BeforeClass; import org.testng.annotations.BeforeClass;
...@@ -53,6 +54,7 @@ import java.util.HashMap; ...@@ -53,6 +54,7 @@ import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import static org.testng.Assert.*;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
@Guice(modules = RepositoryMetadataModule.class) @Guice(modules = RepositoryMetadataModule.class)
...@@ -158,7 +160,6 @@ public class ExportServiceTest { ...@@ -158,7 +160,6 @@ public class ExportServiceTest {
ByteArrayOutputStream baos = new ByteArrayOutputStream(); ByteArrayOutputStream baos = new ByteArrayOutputStream();
ZipSink zipSink = new ZipSink(baos); ZipSink zipSink = new ZipSink(baos);
AtlasExportResult result = exportService.run(zipSink, request, userName, hostName, requestingIP); AtlasExportResult result = exportService.run(zipSink, request, userName, hostName, requestingIP);
Assert.assertEquals(result.getOperationStatus(), AtlasExportResult.OperationStatus.SUCCESS);
zipSink.close(); zipSink.close();
...@@ -177,10 +178,10 @@ public class ExportServiceTest { ...@@ -177,10 +178,10 @@ public class ExportServiceTest {
ZipSink zipSink = new ZipSink(baos); ZipSink zipSink = new ZipSink(baos);
AtlasExportResult result = exportService.run(zipSink, request, "admin", hostName, requestingIP); AtlasExportResult result = exportService.run(zipSink, request, "admin", hostName, requestingIP);
Assert.assertNotNull(exportService); assertNotNull(exportService);
Assert.assertEquals(result.getHostName(), hostName); assertEquals(result.getHostName(), hostName);
Assert.assertEquals(result.getClientIpAddress(), requestingIP); assertEquals(result.getClientIpAddress(), requestingIP);
Assert.assertEquals(request, result.getRequest()); assertEquals(request, result.getRequest());
} }
@Test @Test
...@@ -198,8 +199,8 @@ public class ExportServiceTest { ...@@ -198,8 +199,8 @@ public class ExportServiceTest {
ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray()); ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
ZipSource zipSource = new ZipSource(bais); ZipSource zipSource = new ZipSource(bais);
Assert.assertNotNull(exportService); assertNotNull(exportService);
Assert.assertNotNull(zipSource.getCreationOrder()); assertNotNull(zipSource.getCreationOrder());
Assert.assertFalse(zipSource.hasNext()); Assert.assertFalse(zipSource.hasNext());
} }
...@@ -244,54 +245,109 @@ public class ExportServiceTest { ...@@ -244,54 +245,109 @@ public class ExportServiceTest {
verifyExportForEmployeeData(zipSource); verifyExportForEmployeeData(zipSource);
} }
@Test
public void verifyOverallStatus() throws Exception {
ExportService service = new ExportService(typeRegistry);
assertEquals(AtlasExportResult.OperationStatus.FAIL, Whitebox.invokeMethod(service,
"getOverallOperationStatus"));
assertEquals(AtlasExportResult.OperationStatus.SUCCESS, Whitebox.invokeMethod(service,
"getOverallOperationStatus",
AtlasExportResult.OperationStatus.SUCCESS));
assertEquals(AtlasExportResult.OperationStatus.SUCCESS, Whitebox.invokeMethod(service,
"getOverallOperationStatus",
AtlasExportResult.OperationStatus.SUCCESS,
AtlasExportResult.OperationStatus.SUCCESS,
AtlasExportResult.OperationStatus.SUCCESS));
assertEquals(AtlasExportResult.OperationStatus.PARTIAL_SUCCESS, Whitebox.invokeMethod(service,
"getOverallOperationStatus",
AtlasExportResult.OperationStatus.FAIL,
AtlasExportResult.OperationStatus.PARTIAL_SUCCESS,
AtlasExportResult.OperationStatus.SUCCESS));
assertEquals(AtlasExportResult.OperationStatus.PARTIAL_SUCCESS, Whitebox.invokeMethod(service,
"getOverallOperationStatus",
AtlasExportResult.OperationStatus.FAIL,
AtlasExportResult.OperationStatus.FAIL,
AtlasExportResult.OperationStatus.PARTIAL_SUCCESS));
assertEquals(AtlasExportResult.OperationStatus.FAIL, Whitebox.invokeMethod(service,
"getOverallOperationStatus",
AtlasExportResult.OperationStatus.FAIL,
AtlasExportResult.OperationStatus.FAIL,
AtlasExportResult.OperationStatus.FAIL));
}
@Test
public void requestingExportOfNonExistentEntity_ReturnsFailure() throws Exception {
AtlasExportRequest request = getRequestForEmployee();
tamperEmployeeRequest(request);
ZipSource zipSource = runExportWithParameters(request);
assertNotNull(zipSource.getCreationOrder());
assertEquals(zipSource.getCreationOrder().size(), 0);
assertEquals(AtlasExportResult.OperationStatus.FAIL, zipSource.getExportResult().getOperationStatus());
}
private void tamperEmployeeRequest(AtlasExportRequest request) {
AtlasObjectId objectId = request.getItemsToExport().get(0);
objectId.getUniqueAttributes().remove("name");
objectId.getUniqueAttributes().put("qualifiedName", "XXX@121");
}
private void verifyExportForEmployeeData(ZipSource zipSource) throws AtlasBaseException { private void verifyExportForEmployeeData(ZipSource zipSource) throws AtlasBaseException {
final List<String> expectedEntityTypes = Arrays.asList(new String[]{"Manager", "Employee", "Department"}); final List<String> expectedEntityTypes = Arrays.asList(new String[]{"Manager", "Employee", "Department"});
Assert.assertNotNull(zipSource.getCreationOrder()); assertNotNull(zipSource.getCreationOrder());
Assert.assertEquals(zipSource.getCreationOrder().size(), 2); assertEquals(zipSource.getCreationOrder().size(), 2);
Assert.assertTrue(zipSource.hasNext()); assertTrue(zipSource.hasNext());
while (zipSource.hasNext()) { while (zipSource.hasNext()) {
AtlasEntity entity = zipSource.next(); AtlasEntity entity = zipSource.next();
Assert.assertNotNull(entity); assertNotNull(entity);
Assert.assertEquals(entity.getStatus(), AtlasEntity.Status.ACTIVE); assertEquals(AtlasEntity.Status.ACTIVE, entity.getStatus());
Assert.assertTrue(expectedEntityTypes.contains(entity.getTypeName())); assertTrue(expectedEntityTypes.contains(entity.getTypeName()));
} }
verifyTypeDefs(zipSource); verifyTypeDefs(zipSource);
} }
private void verifyExportForHrData(ZipSource zipSource) throws IOException, AtlasBaseException { private void verifyExportForHrData(ZipSource zipSource) throws IOException, AtlasBaseException {
Assert.assertNotNull(zipSource.getCreationOrder()); assertNotNull(zipSource.getCreationOrder());
Assert.assertTrue(zipSource.getCreationOrder().size() == 1); assertTrue(zipSource.getCreationOrder().size() == 1);
Assert.assertTrue(zipSource.hasNext()); assertTrue(zipSource.hasNext());
AtlasEntity entity = zipSource.next(); AtlasEntity entity = zipSource.next();
Assert.assertNotNull(entity); assertNotNull(entity);
Assert.assertTrue(entity.getTypeName().equals("Department")); assertTrue(entity.getTypeName().equals("Department"));
Assert.assertEquals(entity.getStatus(), AtlasEntity.Status.ACTIVE); assertEquals(entity.getStatus(), AtlasEntity.Status.ACTIVE);
verifyTypeDefs(zipSource); verifyTypeDefs(zipSource);
} }
private void verifyExportForHrDataForConnected(ZipSource zipSource) throws IOException, AtlasBaseException { private void verifyExportForHrDataForConnected(ZipSource zipSource) throws IOException, AtlasBaseException {
Assert.assertNotNull(zipSource.getCreationOrder()); assertNotNull(zipSource.getCreationOrder());
Assert.assertTrue(zipSource.getCreationOrder().size() == 2); assertTrue(zipSource.getCreationOrder().size() == 2);
Assert.assertTrue(zipSource.hasNext()); assertTrue(zipSource.hasNext());
AtlasEntity entity = zipSource.next(); AtlasEntity entity = zipSource.next();
Assert.assertNotNull(entity); assertNotNull(entity);
Assert.assertTrue(entity.getTypeName().equals("Department")); assertTrue(entity.getTypeName().equals("Department"));
Assert.assertEquals(entity.getStatus(), AtlasEntity.Status.ACTIVE); assertEquals(entity.getStatus(), AtlasEntity.Status.ACTIVE);
verifyTypeDefs(zipSource); verifyTypeDefs(zipSource);
} }
private void verifyTypeDefs(ZipSource zipSource) throws AtlasBaseException { private void verifyTypeDefs(ZipSource zipSource) throws AtlasBaseException {
Assert.assertEquals(zipSource.getTypesDef().getEnumDefs().size(), 1); assertEquals(zipSource.getTypesDef().getEnumDefs().size(), 1);
Assert.assertEquals(zipSource.getTypesDef().getClassificationDefs().size(), 0); assertEquals(zipSource.getTypesDef().getClassificationDefs().size(), 0);
Assert.assertEquals(zipSource.getTypesDef().getStructDefs().size(), 1); assertEquals(zipSource.getTypesDef().getStructDefs().size(), 1);
Assert.assertEquals(zipSource.getTypesDef().getEntityDefs().size(), 4); assertEquals(zipSource.getTypesDef().getEntityDefs().size(), 4);
} }
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment