Commit 78e41ac2 by Ashutosh Mestry

ATLAS-2926: ZipSink OOM

parent 7217f678
...@@ -22,6 +22,7 @@ import org.apache.atlas.model.impexp.AtlasExportResult; ...@@ -22,6 +22,7 @@ import org.apache.atlas.model.impexp.AtlasExportResult;
import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.typedef.AtlasTypesDef; import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.apache.atlas.type.AtlasType; import org.apache.atlas.type.AtlasType;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -103,11 +104,31 @@ public class ZipSink { ...@@ -103,11 +104,31 @@ public class ZipSink {
ZipEntry e = new ZipEntry(entryName); ZipEntry e = new ZipEntry(entryName);
zipOutputStream.putNextEntry(e); zipOutputStream.putNextEntry(e);
writeBytes(payload);
zipOutputStream.write(payload.getBytes());
zipOutputStream.closeEntry(); zipOutputStream.closeEntry();
} }
private void writeBytes(String payload) throws IOException {
splitAndWriteBytes(payload, 10 * 1024 * 1024, zipOutputStream);
}
static void splitAndWriteBytes(String msg, int bufferSize, OutputStream os) throws IOException {
int numberOfSplits = (int) Math.ceil(((float) msg.length()) / bufferSize);
if (numberOfSplits == 0) {
numberOfSplits = 1;
} else {
if (LOG.isDebugEnabled()) {
LOG.info("ZipSink: number of splits: {}", numberOfSplits);
}
}
for (int i = 0, start = 0; i < numberOfSplits; i++, start += bufferSize) {
int end = bufferSize + start;
String s = StringUtils.substring(msg, start, end);
os.write(s.getBytes());
}
}
public boolean hasEntity(String guid) { public boolean hasEntity(String guid) {
return guids.contains(guid); return guids.contains(guid);
} }
......
...@@ -30,12 +30,14 @@ import org.testng.annotations.Test; ...@@ -30,12 +30,14 @@ import org.testng.annotations.Test;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.zip.ZipEntry; import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream; import java.util.zip.ZipInputStream;
import static org.mockito.Mockito.mock;
import static org.testng.Assert.*; import static org.testng.Assert.*;
public class ZipSinkTest { public class ZipSinkTest {
...@@ -45,6 +47,22 @@ public class ZipSinkTest { ...@@ -45,6 +47,22 @@ public class ZipSinkTest {
private AtlasExportResult defaultExportResult; private AtlasExportResult defaultExportResult;
private String knownEntityGuidFormat = "111-222-333-%s"; private String knownEntityGuidFormat = "111-222-333-%s";
private class MockOutputStream extends OutputStream {
List<byte[]> collected = new ArrayList<>();
@Override
public void write(int b) throws IOException {
}
@Override
public void write(byte[] bytes) {
collected.add(bytes);
}
public List<byte[]> getCollected() {
return collected;
}
};
private void initZipSinkWithExportOrder() throws AtlasBaseException { private void initZipSinkWithExportOrder() throws AtlasBaseException {
zipSink = new ZipSink(byteArrayOutputStream); zipSink = new ZipSink(byteArrayOutputStream);
...@@ -207,4 +225,26 @@ public class ZipSinkTest { ...@@ -207,4 +225,26 @@ public class ZipSinkTest {
String json = AtlasType.toJson(defaultExportResult); String json = AtlasType.toJson(defaultExportResult);
return json.equals(s); return json.equals(s);
} }
@Test
public void splitTest() throws IOException {
assertSplit("ABCDEFGHIJKLMNOPQRSTUVWXYZ01", 7, new String[] {"ABCDEFG", "HIJKLMN", "OPQRSTU", "VWXYZ01"});
assertSplit("ABCDEFGHIJKLMNOPQRSTUVWXYZ", 7, new String[] {"ABCDEFG", "HIJKLMN", "OPQRSTU", "VWXYZ"});
}
private void assertSplit(String msg, int bufferSize, String[] splits) throws IOException {
MockOutputStream os = getOutputStream();
ZipSink.splitAndWriteBytes(msg, bufferSize, os);
assertEquals(os.getCollected().size(), splits.length);
for (int i = 0; i < os.collected.size(); i++) {
byte[] bytes = os.getCollected().get(i);
String s = new String(bytes);
assertEquals(s, splits[i]);
}
}
private MockOutputStream getOutputStream() {
return new MockOutputStream();
}
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment