Commit 3962057c by ashutoshm Committed by Madhan Neethiraj

ATLAS-1919: Refactored ZipSink to record committed guids

parent bcabde9b
...@@ -433,6 +433,10 @@ public class ExportService { ...@@ -433,6 +433,10 @@ public class ExportService {
} }
private void addEntity(AtlasEntityWithExtInfo entity, ExportContext context) throws AtlasBaseException { private void addEntity(AtlasEntityWithExtInfo entity, ExportContext context) throws AtlasBaseException {
if(context.sink.hasEntity(entity.getEntity().getGuid())) {
return;
}
context.sink.add(entity); context.sink.add(entity);
context.result.incrementMeticsCounter(String.format("entity:%s", entity.getEntity().getTypeName())); context.result.incrementMeticsCounter(String.format("entity:%s", entity.getEntity().getTypeName()));
......
...@@ -27,7 +27,9 @@ import org.slf4j.LoggerFactory; ...@@ -27,7 +27,9 @@ import org.slf4j.LoggerFactory;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set;
import java.util.zip.ZipEntry; import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream; import java.util.zip.ZipOutputStream;
...@@ -35,6 +37,8 @@ public class ZipSink { ...@@ -35,6 +37,8 @@ public class ZipSink {
private static final Logger LOG = LoggerFactory.getLogger(ZipSink.class); private static final Logger LOG = LoggerFactory.getLogger(ZipSink.class);
private ZipOutputStream zipOutputStream; private ZipOutputStream zipOutputStream;
final Set<String> guids = new HashSet<>();
public ZipSink(OutputStream outputStream) { public ZipSink(OutputStream outputStream) {
zipOutputStream = new ZipOutputStream(outputStream); zipOutputStream = new ZipOutputStream(outputStream);
...@@ -43,11 +47,13 @@ public class ZipSink { ...@@ -43,11 +47,13 @@ public class ZipSink {
public void add(AtlasEntity entity) throws AtlasBaseException { public void add(AtlasEntity entity) throws AtlasBaseException {
String jsonData = convertToJSON(entity); String jsonData = convertToJSON(entity);
saveToZip(entity.getGuid(), jsonData); saveToZip(entity.getGuid(), jsonData);
recordAddedEntityGuids(entity);
} }
public void add(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) throws AtlasBaseException { public void add(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) throws AtlasBaseException {
String jsonData = convertToJSON(entityWithExtInfo); String jsonData = convertToJSON(entityWithExtInfo);
saveToZip(entityWithExtInfo.getEntity().getGuid(), jsonData); saveToZip(entityWithExtInfo.getEntity().getGuid(), jsonData);
recordAddedEntityGuids(entityWithExtInfo);
} }
public void setResult(AtlasExportResult result) throws AtlasBaseException { public void setResult(AtlasExportResult result) throws AtlasBaseException {
...@@ -100,4 +106,19 @@ public class ZipSink { ...@@ -100,4 +106,19 @@ public class ZipSink {
zipOutputStream.write(payload.getBytes()); zipOutputStream.write(payload.getBytes());
zipOutputStream.closeEntry(); zipOutputStream.closeEntry();
} }
public boolean hasEntity(String guid) {
return guids.contains(guid);
}
private void recordAddedEntityGuids(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) {
guids.add(entityWithExtInfo.getEntity().getGuid());
if(entityWithExtInfo.getReferredEntities() != null) {
guids.addAll(entityWithExtInfo.getReferredEntities().keySet());
}
}
private void recordAddedEntityGuids(AtlasEntity entity) {
guids.add(entity.getGuid());
}
} }
...@@ -21,6 +21,7 @@ package org.apache.atlas.repository.impexp; ...@@ -21,6 +21,7 @@ package org.apache.atlas.repository.impexp;
import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.impexp.AtlasExportRequest; import org.apache.atlas.model.impexp.AtlasExportRequest;
import org.apache.atlas.model.impexp.AtlasExportResult; import org.apache.atlas.model.impexp.AtlasExportResult;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasObjectId; import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.type.AtlasType; import org.apache.atlas.type.AtlasType;
import org.testng.Assert; import org.testng.Assert;
...@@ -35,11 +36,15 @@ import java.util.List; ...@@ -35,11 +36,15 @@ import java.util.List;
import java.util.zip.ZipEntry; import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream; import java.util.zip.ZipInputStream;
import static org.testng.Assert.*;
public class ZipSinkTest { public class ZipSinkTest {
private ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); private ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
private ZipSink zipSink; private ZipSink zipSink;
private List<String> defaultExportOrder = new ArrayList<>(Arrays.asList("a", "b", "c", "d")); private List<String> defaultExportOrder = new ArrayList<>(Arrays.asList("a", "b", "c", "d"));
private AtlasExportResult defaultExportResult; private AtlasExportResult defaultExportResult;
private String knownEntityGuidFormat = "111-222-333-%s";
private void initZipSinkWithExportOrder() throws AtlasBaseException { private void initZipSinkWithExportOrder() throws AtlasBaseException {
zipSink = new ZipSink(byteArrayOutputStream); zipSink = new ZipSink(byteArrayOutputStream);
...@@ -80,7 +85,7 @@ public class ZipSinkTest { ...@@ -80,7 +85,7 @@ public class ZipSinkTest {
@Test @Test
public void correctInit_succeeds() throws AtlasBaseException { public void correctInit_succeeds() throws AtlasBaseException {
initZipSinkWithExportOrder(); initZipSinkWithExportOrder();
Assert.assertTrue(true); assertTrue(true);
Assert.assertNotNull(zipSink); Assert.assertNotNull(zipSink);
} }
...@@ -95,11 +100,11 @@ public class ZipSinkTest { ...@@ -95,11 +100,11 @@ public class ZipSinkTest {
Assert.assertNull(zis.getNextEntry()); Assert.assertNull(zis.getNextEntry());
} catch (IOException e) { } catch (IOException e) {
Assert.assertTrue(false); assertTrue(false);
} }
} catch (AtlasBaseException e) { } catch (AtlasBaseException e) {
Assert.assertTrue(false, "No exception should be thrown."); assertTrue(false, "No exception should be thrown.");
} }
} }
...@@ -109,7 +114,7 @@ public class ZipSinkTest { ...@@ -109,7 +114,7 @@ public class ZipSinkTest {
ZipInputStream zis = getZipInputStreamForDefaultExportOrder(); ZipInputStream zis = getZipInputStreamForDefaultExportOrder();
ZipEntry ze = zis.getNextEntry(); ZipEntry ze = zis.getNextEntry();
Assert.assertEquals(ze.getName().replace(".json", ""), ZipExportFileNames.ATLAS_EXPORT_ORDER_NAME.toString()); assertEquals(ze.getName().replace(".json", ""), ZipExportFileNames.ATLAS_EXPORT_ORDER_NAME.toString());
} }
@Test @Test
...@@ -118,28 +123,80 @@ public class ZipSinkTest { ...@@ -118,28 +123,80 @@ public class ZipSinkTest {
ZipInputStream zis = getZipInputStreamForDefaultExportOrder(); ZipInputStream zis = getZipInputStreamForDefaultExportOrder();
zis.getNextEntry(); zis.getNextEntry();
Assert.assertEquals(getZipEntryAsStream(zis).replace("\"", "'"), "['a','b','c','d']"); assertEquals(getZipEntryAsStream(zis).replace("\"", "'"), "['a','b','c','d']");
} }
@Test @Test
public void zipWithExactlyTwoEntries_ContentsVerified() throws AtlasBaseException, IOException { public void zipWithExactlyTwoEntries_ContentsVerified() throws AtlasBaseException, IOException {
ByteArrayOutputStream byteOutputStream = new ByteArrayOutputStream(); ByteArrayOutputStream byteOutputStream = new ByteArrayOutputStream();
useZipSinkToCreateZipWithTwoEntries(byteOutputStream); useZipSinkToCreateEntries(byteOutputStream);
ByteArrayInputStream bis = new ByteArrayInputStream(byteOutputStream.toByteArray()); ByteArrayInputStream bis = new ByteArrayInputStream(byteOutputStream.toByteArray());
ZipInputStream zipStream = new ZipInputStream(bis); ZipInputStream zipStream = new ZipInputStream(bis);
ZipEntry entry = zipStream.getNextEntry(); ZipEntry entry = zipStream.getNextEntry();
Assert.assertEquals(getZipEntryAsStream(zipStream), "[\"a\",\"b\",\"c\",\"d\"]"); assertEquals(getZipEntryAsStream(zipStream), "[\"a\",\"b\",\"c\",\"d\"]");
Assert.assertEquals(entry.getName().replace(".json", ""), ZipExportFileNames.ATLAS_EXPORT_ORDER_NAME.toString()); assertEquals(entry.getName().replace(".json", ""), ZipExportFileNames.ATLAS_EXPORT_ORDER_NAME.toString());
entry = zipStream.getNextEntry(); entry = zipStream.getNextEntry();
Assert.assertEquals(entry.getName().replace(".json", ""), ZipExportFileNames.ATLAS_EXPORT_INFO_NAME.toString()); assertEquals(entry.getName().replace(".json", ""), ZipExportFileNames.ATLAS_EXPORT_INFO_NAME.toString());
Assert.assertTrue(compareJsonWithObject(getZipEntryAsStream(zipStream), defaultExportResult)); assertTrue(compareJsonWithObject(getZipEntryAsStream(zipStream), defaultExportResult));
}
@Test
public void recordsEntityEntries() throws AtlasBaseException {
ByteArrayOutputStream byteOutputStream = new ByteArrayOutputStream();
ZipSink zs = new ZipSink(byteOutputStream);
AtlasEntity entity = new AtlasEntity();
entity.setGuid(String.format(knownEntityGuidFormat, 0));
zs.add(entity);
assertTrue(zs.hasEntity(String.format(knownEntityGuidFormat, 0)));
zs.close();
}
@Test
public void recordsEntityWithExtInfoEntries() throws AtlasBaseException {
final int max_entries = 3;
ByteArrayOutputStream byteOutputStream = new ByteArrayOutputStream();
ZipSink zs = new ZipSink(byteOutputStream);
AtlasEntity entity = new AtlasEntity();
entity.setGuid(String.format(knownEntityGuidFormat, 0));
AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = new AtlasEntity.AtlasEntityWithExtInfo(entity);
addReferredEntities(entityWithExtInfo, max_entries);
zs.add(entityWithExtInfo);
for (int i = 0; i <= max_entries; i++) {
String g = String.format(knownEntityGuidFormat, i);
assertTrue(zs.hasEntity(g));
}
zs.close();
}
private void addReferredEntities(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo, int maxEntries) {
for (int i = 1; i <= maxEntries; i++) {
AtlasEntity entity1 = new AtlasEntity();
entity1.setGuid(String.format(knownEntityGuidFormat, i));
entityWithExtInfo.addReferredEntity(entity1);
}
}
@Test
public void recordsDoesNotRecordEntityEntries() throws AtlasBaseException {
initZipSinkWithExportOrder();
assertNotNull(zipSink);
assertFalse(zipSink.hasEntity(ZipExportFileNames.ATLAS_EXPORT_ORDER_NAME.toString()));
} }
private void useZipSinkToCreateZipWithTwoEntries(ByteArrayOutputStream byteOutputStream) throws AtlasBaseException { private void useZipSinkToCreateEntries(ByteArrayOutputStream byteOutputStream) throws AtlasBaseException {
ZipSink zs = new ZipSink(byteOutputStream); ZipSink zs = new ZipSink(byteOutputStream);
zs.setExportOrder(defaultExportOrder); zs.setExportOrder(defaultExportOrder);
zs.setResult(getDefaultExportResult()); zs.setResult(getDefaultExportResult());
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment