Commit 7154e12d by ashutoshm Committed by Madhan Neethiraj

ATLAS-1503: optimization of export implementation

parent 89a38727
...@@ -180,6 +180,16 @@ public class AtlasExportResult implements Serializable { ...@@ -180,6 +180,16 @@ public class AtlasExportResult implements Serializable {
return toString(new StringBuilder()).toString(); return toString(new StringBuilder()).toString();
} }
public void clear() {
if(this.data != null) {
this.data.clear();
}
if(this.metrics != null) {
this.metrics.clear();
}
}
@JsonAutoDetect(getterVisibility=PUBLIC_ONLY, setterVisibility=PUBLIC_ONLY, fieldVisibility=NONE) @JsonAutoDetect(getterVisibility=PUBLIC_ONLY, setterVisibility=PUBLIC_ONLY, fieldVisibility=NONE)
@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL) @JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
@JsonIgnoreProperties(ignoreUnknown=true) @JsonIgnoreProperties(ignoreUnknown=true)
...@@ -234,5 +244,15 @@ public class AtlasExportResult implements Serializable { ...@@ -234,5 +244,15 @@ public class AtlasExportResult implements Serializable {
public String toString() { public String toString() {
return toString(new StringBuilder()).toString(); return toString(new StringBuilder()).toString();
} }
public void clear() {
if(this.typesDef!= null) {
this.typesDef.clear();
}
if(this.entityCreationOrder != null) {
this.entityCreationOrder.clear();
}
}
} }
} }
\ No newline at end of file
...@@ -51,7 +51,6 @@ import org.springframework.security.core.GrantedAuthority; ...@@ -51,7 +51,6 @@ import org.springframework.security.core.GrantedAuthority;
import org.springframework.security.core.context.SecurityContextHolder; import org.springframework.security.core.context.SecurityContextHolder;
import javax.inject.Singleton; import javax.inject.Singleton;
import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse; import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.*; import javax.ws.rs.*;
...@@ -309,7 +308,7 @@ public class AdminResource { ...@@ -309,7 +308,7 @@ public class AdminResource {
ZipSink exportSink = null; ZipSink exportSink = null;
try { try {
exportSink = new ZipSink(); exportSink = new ZipSink(httpServletResponse.getOutputStream());
ExportService exportService = new ExportService(this.typeRegistry); ExportService exportService = new ExportService(this.typeRegistry);
AtlasExportResult result = exportService.run(exportSink, request, Servlets.getUserName(httpServletRequest), AtlasExportResult result = exportService.run(exportSink, request, Servlets.getUserName(httpServletRequest),
...@@ -318,14 +317,13 @@ public class AdminResource { ...@@ -318,14 +317,13 @@ public class AdminResource {
exportSink.close(); exportSink.close();
ServletOutputStream outStream = httpServletResponse.getOutputStream(); httpServletResponse.addHeader("Content-Encoding","gzip");
exportSink.writeTo(outStream);
httpServletResponse.setContentType("application/zip"); httpServletResponse.setContentType("application/zip");
httpServletResponse.setHeader("Content-Disposition", httpServletResponse.setHeader("Content-Disposition",
"attachment; filename=" + result.getClass().getSimpleName()); "attachment; filename=" + result.getClass().getSimpleName());
httpServletResponse.setHeader("Transfer-Encoding", "chunked");
outStream.flush(); httpServletResponse.getOutputStream().flush();
return Response.ok().build(); return Response.ok().build();
} catch (IOException excp) { } catch (IOException excp) {
LOG.error("export() failed", excp); LOG.error("export() failed", excp);
......
...@@ -105,8 +105,9 @@ public class ExportService { ...@@ -105,8 +105,9 @@ public class ExportService {
LOG.error("Operation failed: ", ex); LOG.error("Operation failed: ", ex);
} finally { } finally {
atlasGraph.releaseGremlinScriptEngine(context.scriptEngine); atlasGraph.releaseGremlinScriptEngine(context.scriptEngine);
LOG.info("<== export(user={}, from={}): status {}", userName, requestingIP, context.result.getOperationStatus()); LOG.info("<== export(user={}, from={}): status {}", userName, requestingIP, context.result.getOperationStatus());
context.clear();
result.clear();
} }
return context.result; return context.result;
...@@ -124,8 +125,8 @@ public class ExportService { ...@@ -124,8 +125,8 @@ public class ExportService {
processEntity(entity, context, TraversalDirection.UNKNOWN); processEntity(entity, context, TraversalDirection.UNKNOWN);
} }
while (!context.guidsToProcess.isEmpty()) { while (!context.guidsToProcessIsEmpty()) {
String guid = context.guidsToProcess.remove(0); String guid = context.guidsToProcessRemove(0);
TraversalDirection direction = context.guidDirection.get(guid); TraversalDirection direction = context.guidDirection.get(guid);
AtlasEntity entity = entityGraphRetriever.toAtlasEntity(guid); AtlasEntity entity = entityGraphRetriever.toAtlasEntity(guid);
...@@ -245,7 +246,7 @@ public class ExportService { ...@@ -245,7 +246,7 @@ public class ExportService {
private void getEntityGuidsForConnectedFetch(AtlasEntity entity, ExportContext context, TraversalDirection direction) throws AtlasBaseException { private void getEntityGuidsForConnectedFetch(AtlasEntity entity, ExportContext context, TraversalDirection direction) throws AtlasBaseException {
if (direction == TraversalDirection.UNKNOWN) { if (direction == TraversalDirection.UNKNOWN) {
getConnectedEntityGuids(entity, context, TraversalDirection.OUTWARD, TraversalDirection.OUTWARD); getConnectedEntityGuids(entity, context, TraversalDirection.OUTWARD, TraversalDirection.INWARD);
} else { } else {
if (isProcessEntity(entity)) { if (isProcessEntity(entity)) {
direction = TraversalDirection.OUTWARD; direction = TraversalDirection.OUTWARD;
...@@ -271,7 +272,7 @@ public class ExportService { ...@@ -271,7 +272,7 @@ public class ExportService {
String query = getQueryForTraversalDirection(direction); String query = getQueryForTraversalDirection(direction);
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("==> getConnectedEntityGuids({}): guidsToProcess {} query {}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcess.size(), query); LOG.debug("==> getConnectedEntityGuids({}): guidsToProcess {} query {}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcessSize(), query);
} }
context.bindings.clear(); context.bindings.clear();
...@@ -289,8 +290,8 @@ public class ExportService { ...@@ -289,8 +290,8 @@ public class ExportService {
if (currentDirection == null) { if (currentDirection == null) {
context.guidDirection.put(guid, direction); context.guidDirection.put(guid, direction);
if (!context.guidsToProcess.contains(guid)) { if (!context.guidsToProcessContains(guid)) {
context.guidsToProcess.add(guid); context.guidsToProcessAdd(guid);
} }
} else if (currentDirection == TraversalDirection.OUTWARD && direction == TraversalDirection.INWARD) { } else if (currentDirection == TraversalDirection.OUTWARD && direction == TraversalDirection.INWARD) {
context.guidDirection.put(guid, direction); context.guidDirection.put(guid, direction);
...@@ -298,14 +299,14 @@ public class ExportService { ...@@ -298,14 +299,14 @@ public class ExportService {
// the entity should be reprocessed to get inward entities // the entity should be reprocessed to get inward entities
context.guidsProcessed.remove(guid); context.guidsProcessed.remove(guid);
if (!context.guidsToProcess.contains(guid)) { if (!context.guidsToProcessContains(guid)) {
context.guidsToProcess.add(guid); context.guidsToProcessAdd(guid);
} }
} }
} }
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("<== getConnectedEntityGuids({}): found {} guids; guidsToProcess {}", entity.getGuid(), guids.size(), context.guidsToProcess.size()); LOG.debug("<== getConnectedEntityGuids({}): found {} guids; guidsToProcess {}", entity.getGuid(), guids.size(), context.guidsToProcessSize());
} }
} }
} }
...@@ -323,7 +324,7 @@ public class ExportService { ...@@ -323,7 +324,7 @@ public class ExportService {
private void getEntityGuidsForFullFetch(AtlasEntity entity, ExportContext context) { private void getEntityGuidsForFullFetch(AtlasEntity entity, ExportContext context) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("==> getEntityGuidsForFullFetch({}): guidsToProcess {}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcess.size()); LOG.debug("==> getEntityGuidsForFullFetch({}): guidsToProcess {}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcessSize());
} }
String query = this.gremlinQueryProvider.getQuery(AtlasGremlinQuery.EXPORT_BY_GUID_FULL); String query = this.gremlinQueryProvider.getQuery(AtlasGremlinQuery.EXPORT_BY_GUID_FULL);
...@@ -339,8 +340,8 @@ public class ExportService { ...@@ -339,8 +340,8 @@ public class ExportService {
for (String guid : result) { for (String guid : result) {
if (!context.guidsProcessed.contains(guid)) { if (!context.guidsProcessed.contains(guid)) {
if (!context.guidsToProcess.contains(guid)) { if (!context.guidsToProcessContains(guid)) {
context.guidsToProcess.add(guid); context.guidsToProcessAdd(guid);
} }
context.guidDirection.put(guid, TraversalDirection.BOTH); context.guidDirection.put(guid, TraversalDirection.BOTH);
...@@ -348,7 +349,7 @@ public class ExportService { ...@@ -348,7 +349,7 @@ public class ExportService {
} }
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("<== getEntityGuidsForFullFetch({}): found {} guids; guidsToProcess {}", entity.getGuid(), result.size(), context.guidsToProcess.size()); LOG.debug("<== getEntityGuidsForFullFetch({}): found {} guids; guidsToProcess {}", entity.getGuid(), result.size(), context.guidsToProcessSize());
} }
} }
...@@ -434,7 +435,8 @@ public class ExportService { ...@@ -434,7 +435,8 @@ public class ExportService {
private class ExportContext { private class ExportContext {
final Set<String> guidsProcessed = new HashSet<>(); final Set<String> guidsProcessed = new HashSet<>();
final List<String> guidsToProcess = new ArrayList<>(); private final List<String> guidsToProcessList = new ArrayList<>();
private final Set<String> guidsToProcessSet = new HashSet<>();
final Map<String, TraversalDirection> guidDirection = new HashMap<>(); final Map<String, TraversalDirection> guidDirection = new HashMap<>();
final AtlasExportResult result; final AtlasExportResult result;
final ZipSink sink; final ZipSink sink;
...@@ -477,5 +479,35 @@ public class ExportService { ...@@ -477,5 +479,35 @@ public class ExportService {
return matchType; return matchType;
} }
public void clear() {
guidsToProcessList.clear();
guidsToProcessSet.clear();
guidsProcessed.clear();
guidDirection.clear();
}
public boolean guidsToProcessIsEmpty() {
return this.guidsToProcessList.isEmpty();
}
public String guidsToProcessRemove(int i) {
String s = this.guidsToProcessList.remove(i);
guidsToProcessSet.remove(s);
return s;
}
public int guidsToProcessSize() {
return this.guidsToProcessList.size();
}
public boolean guidsToProcessContains(String guid) {
return guidsToProcessSet.contains(guid);
}
public void guidsToProcessAdd(String guid) {
this.guidsToProcessList.add(guid);
guidsToProcessSet.add(guid);
}
} }
} }
...@@ -18,18 +18,16 @@ ...@@ -18,18 +18,16 @@
package org.apache.atlas.web.resources; package org.apache.atlas.web.resources;
import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.impexp.AtlasExportResult; import org.apache.atlas.model.impexp.AtlasExportResult;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.typedef.AtlasTypesDef; import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.apache.atlas.type.AtlasType; import org.apache.atlas.type.AtlasType;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.ByteArrayOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.zip.ZipEntry; import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream; import java.util.zip.ZipOutputStream;
...@@ -37,15 +35,9 @@ public class ZipSink { ...@@ -37,15 +35,9 @@ public class ZipSink {
private static final Logger LOG = LoggerFactory.getLogger(ZipSink.class); private static final Logger LOG = LoggerFactory.getLogger(ZipSink.class);
private ZipOutputStream zipOutputStream; private ZipOutputStream zipOutputStream;
private ByteArrayOutputStream byteArrayOutputStream;
public ZipSink() {
init();
}
private void init() { public ZipSink(OutputStream outputStream) {
byteArrayOutputStream = new ByteArrayOutputStream(); zipOutputStream = new ZipOutputStream(outputStream);
zipOutputStream = new ZipOutputStream(byteArrayOutputStream);
} }
public void add(AtlasEntity entity) throws AtlasBaseException { public void add(AtlasEntity entity) throws AtlasBaseException {
...@@ -68,10 +60,6 @@ public class ZipSink { ...@@ -68,10 +60,6 @@ public class ZipSink {
saveToZip(ZipExportFileNames.ATLAS_EXPORT_ORDER_NAME, jsonData); saveToZip(ZipExportFileNames.ATLAS_EXPORT_ORDER_NAME, jsonData);
} }
public void writeTo(OutputStream stream) throws IOException {
byteArrayOutputStream.writeTo(stream);
}
public void close() { public void close() {
try { try {
if(zipOutputStream != null) { if(zipOutputStream != null) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment