Commit c0a91c7e by Ashutosh Mestry

ATLAS-2843: AtlasClient updates for exportData and importData.

parent 0c308015
......@@ -59,10 +59,10 @@ import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriBuilder;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ConnectException;
import java.net.URI;
......@@ -385,7 +385,7 @@ public abstract class AtlasBaseClient {
}
try {
if(api.getProduces().equals(MediaType.APPLICATION_OCTET_STREAM)) {
return (T) IOUtils.toByteArray(clientResponse.getEntityInputStream());
return (T) clientResponse.getEntityInputStream();
} else if (responseType.getRawClass().equals(ObjectNode.class)) {
String stringEntity = clientResponse.getEntity(String.class);
try {
......@@ -404,8 +404,6 @@ public abstract class AtlasBaseClient {
}
} catch (ClientHandlerException e) {
throw new AtlasServiceException(api, e);
} catch (IOException e) {
throw new AtlasServiceException(api, e);
}
} else if (clientResponse.getStatus() != ClientResponse.Status.SERVICE_UNAVAILABLE.getStatusCode()) {
break;
......@@ -467,9 +465,9 @@ public abstract class AtlasBaseClient {
return configuration.getInt(AtlasBaseClient.ATLAS_CLIENT_HA_RETRIES_KEY, AtlasBaseClient.DEFAULT_NUM_RETRIES);
}
public byte[] exportData(AtlasExportRequest request) throws AtlasServiceException {
public InputStream exportData(AtlasExportRequest request) throws AtlasServiceException {
try {
return (byte[]) callAPI(EXPORT, Object.class, request);
return (InputStream) callAPI(EXPORT, Object.class, request);
} catch (Exception e) {
LOG.error("error writing to file", e);
throw new AtlasServiceException(e);
......@@ -479,14 +477,22 @@ public abstract class AtlasBaseClient {
public void exportData(AtlasExportRequest request, String absolutePath) throws AtlasServiceException {
OutputStream fileOutputStream = null;
try {
byte[] fileBytes = exportData(request);
InputStream inputStream = exportData(request);
fileOutputStream = new FileOutputStream(new File(absolutePath));
IOUtils.write(fileBytes, fileOutputStream);
byte[] buffer = new byte[8 * 1024];
int bytesRead;
while ((bytesRead = inputStream.read(buffer)) != -1) {
fileOutputStream.write(buffer, 0, bytesRead);
}
IOUtils.closeQuietly(inputStream);
IOUtils.closeQuietly(fileOutputStream);
} catch (Exception e) {
LOG.error("error writing to file", e);
throw new AtlasServiceException(e);
} finally {
if(fileOutputStream != null) {
if (fileOutputStream != null) {
try {
fileOutputStream.close();
} catch (IOException e) {
......@@ -502,9 +508,9 @@ public abstract class AtlasBaseClient {
new FileDataBodyPart(IMPORT_DATA_PARAMETER, new File(absoluteFilePath)));
}
public AtlasImportResult importData(AtlasImportRequest request, byte[] fileData) throws AtlasServiceException {
public AtlasImportResult importData(AtlasImportRequest request, InputStream stream) throws AtlasServiceException {
return performImportData(getImportRequestBodyPart(request),
new StreamDataBodyPart(IMPORT_DATA_PARAMETER, new ByteArrayInputStream(fileData)));
new StreamDataBodyPart(IMPORT_DATA_PARAMETER, stream));
}
private AtlasImportResult performImportData(BodyPart requestPart, BodyPart filePart) throws AtlasServiceException {
......
......@@ -35,17 +35,16 @@ import java.util.Set;
public class ImportTransforms {
private static final Logger LOG = LoggerFactory.getLogger(ImportTransforms.class);
private Map<String, Map<String, List<ImportTransformer>>> transforms;
private static final String ALL_ATTRIBUTES = "*";
private Map<String, Map<String, List<ImportTransformer>>> transforms;
public static ImportTransforms fromJson(String jsonString) {
ImportTransforms ret = null;
if (StringUtils.isNotBlank(jsonString)) {
ret = new ImportTransforms(jsonString);
if (StringUtils.isEmpty(jsonString)) {
return null;
}
return ret;
return new ImportTransforms(jsonString);
}
public Map<String, Map<String, List<ImportTransformer>>> getTransforms() {
......@@ -72,13 +71,15 @@ public class ImportTransforms {
}
public AtlasEntity.AtlasEntityWithExtInfo apply(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) throws AtlasBaseException {
if (entityWithExtInfo != null) {
apply(entityWithExtInfo.getEntity());
if (entityWithExtInfo == null) {
return entityWithExtInfo;
}
if(MapUtils.isNotEmpty(entityWithExtInfo.getReferredEntities())) {
for (AtlasEntity e : entityWithExtInfo.getReferredEntities().values()) {
apply(e);
}
apply(entityWithExtInfo.getEntity());
if(MapUtils.isNotEmpty(entityWithExtInfo.getReferredEntities())) {
for (AtlasEntity e : entityWithExtInfo.getReferredEntities().values()) {
apply(e);
}
}
......@@ -86,30 +87,46 @@ public class ImportTransforms {
}
public AtlasEntity apply(AtlasEntity entity) throws AtlasBaseException {
if(entity != null) {
Map<String, List<ImportTransformer>> entityTransforms = getTransforms(entity.getTypeName());
if (entity == null) {
return entity;
}
Map<String, List<ImportTransformer>> entityTransforms = getTransforms(entity.getTypeName());
if (MapUtils.isEmpty(entityTransforms)) {
return entity;
}
applyEntitySpecific(entity, entityTransforms);
if (MapUtils.isNotEmpty(entityTransforms)) {
for (Map.Entry<String, List<ImportTransformer>> entry : entityTransforms.entrySet()) {
String attributeName = entry.getKey();
List<ImportTransformer> attrTransforms = entry.getValue();
applyAttributeSpecific(entity, entityTransforms);
if (!entity.hasAttribute(attributeName)) {
continue;
}
return entity;
}
Object transformedValue = entity.getAttribute(attributeName);
private void applyAttributeSpecific(AtlasEntity entity, Map<String, List<ImportTransformer>> entityTransforms) throws AtlasBaseException {
for (Map.Entry<String, List<ImportTransformer>> entry : entityTransforms.entrySet()) {
String attributeName = entry.getKey();
List<ImportTransformer> attrTransforms = entry.getValue();
for (ImportTransformer attrTransform : attrTransforms) {
transformedValue = attrTransform.apply(transformedValue);
}
if (!entity.hasAttribute(attributeName)) {
continue;
}
entity.setAttribute(attributeName, transformedValue);
}
Object attributeValue = entity.getAttribute(attributeName);
for (ImportTransformer attrTransform : attrTransforms) {
attributeValue = attrTransform.apply(attributeValue);
}
entity.setAttribute(attributeName, attributeValue);
}
}
return entity;
private void applyEntitySpecific(AtlasEntity entity, Map<String, List<ImportTransformer>> entityTransforms) throws AtlasBaseException {
if(entityTransforms.containsKey(ALL_ATTRIBUTES)) {
for (ImportTransformer attrTransform : entityTransforms.get(ALL_ATTRIBUTES)) {
attrTransform.apply(entity);
}
}
}
private ImportTransforms() {
......@@ -119,38 +136,58 @@ public class ImportTransforms {
private ImportTransforms(String jsonString) {
this();
if(jsonString != null) {
Map typeTransforms = AtlasType.fromJson(jsonString, Map.class);
if (MapUtils.isNotEmpty(typeTransforms)) {
for (Object key : typeTransforms.keySet()) {
Object value = typeTransforms.get(key);
String entityType = (String) key;
Map<String, Object> attributeTransforms = (Map<String, Object>)value;
if (MapUtils.isNotEmpty(attributeTransforms)) {
for (Map.Entry<String, Object> e : attributeTransforms.entrySet()) {
String attributeName = e.getKey();
List<String> transforms = (List<String>)e.getValue();
if (CollectionUtils.isNotEmpty(transforms)) {
for (String transform : transforms) {
ImportTransformer transformers = null;
try {
transformers = ImportTransformer.getTransformer(transform);
} catch (AtlasBaseException ex) {
LOG.error("Error converting string to ImportTransformer: {}", transform, ex);
}
if (transformers != null) {
add(entityType, attributeName, transformers);
}
}
}
}
}
if (StringUtils.isEmpty(jsonString)) {
return;
}
Map typeTransforms = AtlasType.fromJson(jsonString, Map.class);
if (MapUtils.isEmpty(typeTransforms)) {
return;
}
addOuterMap(typeTransforms);
}
private void addOuterMap(Map typeTransforms) {
for (Object key : typeTransforms.keySet()) {
Object value = typeTransforms.get(key);
String entityType = (String) key;
Map<String, Object> attributeTransforms = (Map<String, Object>)value;
if (MapUtils.isEmpty(attributeTransforms)) {
continue;
}
addInnerMap(entityType, attributeTransforms);
}
}
private void addInnerMap(String entityType, Map<String, Object> attributeTransforms) {
for (Map.Entry<String, Object> e : attributeTransforms.entrySet()) {
String attributeName = e.getKey();
List<String> transforms = (List<String>)e.getValue();
if (CollectionUtils.isEmpty(transforms)) {
continue;
}
addTransforms(entityType, attributeName, transforms);
}
}
private void addTransforms(String entityType, String attributeName, List<String> transforms) {
for (String transform : transforms) {
ImportTransformer transformers = null;
try {
transformers = ImportTransformer.getTransformer(transform);
if (transformers == null) {
continue;
}
add(entityType, attributeName, transformers);
} catch (AtlasBaseException ex) {
LOG.error("Error converting string to ImportTransformer: {}", transform, ex);
}
}
}
......@@ -158,21 +195,16 @@ public class ImportTransforms {
private void add(String typeName, String attributeName, ImportTransformer transformer) {
Map<String, List<ImportTransformer>> attrMap;
if(transforms.containsKey(typeName)) {
attrMap = transforms.get(typeName);
} else {
if(!transforms.containsKey(typeName)) {
attrMap = new HashMap<>();
transforms.put(typeName, attrMap);
}
List<ImportTransformer> list;
if(attrMap.containsKey(attributeName)) {
list = attrMap.get(attributeName);
} else {
list = new ArrayList<>();
attrMap.put(attributeName, list);
attrMap = transforms.get(typeName);
if(!attrMap.containsKey(attributeName)) {
attrMap.put(attributeName, new ArrayList<ImportTransformer>());
}
list.add(transformer);
attrMap.get(attributeName).add(transformer);
}
}
......@@ -128,6 +128,7 @@ public class ImportTransformsTest {
t.apply(entity);
assertEquals(entity.getClassifications().size(), 0);
assertNotNull(t);
assertEquals(entity.getAttribute(ATTR_NAME_QUALIFIED_NAME), expected_qualifiedName);
}
......@@ -145,6 +146,7 @@ public class ImportTransformsTest {
assertNotNull(t);
assertEquals(entity.getAttribute(ATTR_NAME_QUALIFIED_NAME), expected_qualifiedName);
assertEquals(entity.getAttribute(HIVE_TABLE_ATTR_SYNC_INFO), new ArrayList<String>() {{ add(expected_syncInfo); }});
}
......@@ -161,6 +163,8 @@ public class ImportTransformsTest {
t.apply(entity);
assertNotNull(t);
assertNull(entity.getAttribute(HIVE_TABLE_ATTR_REPLICATED_FROM));
assertNull(entity.getAttribute(HIVE_TABLE_ATTR_REPLICATED_TO));
}
@Test
......@@ -173,6 +177,8 @@ public class ImportTransformsTest {
t.apply(entity);
assertNotNull(t);
assertNull(entity.getAttribute(HIVE_TABLE_ATTR_REPLICATED_FROM));
assertNull(entity.getAttribute(HIVE_TABLE_ATTR_REPLICATED_TO));
}
@Test
......@@ -185,6 +191,7 @@ public class ImportTransformsTest {
t.apply(entity);
assertNotNull(t);
assertEquals(entity.getStatus(), AtlasEntity.Status.DELETED);
}
@Test
......
......@@ -32,15 +32,13 @@ import org.testng.SkipException;
import org.testng.annotations.AfterClass;
import org.testng.annotations.Test;
import java.io.ByteArrayInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.io.InputStream;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
public class AdminExportImportTestIT extends BaseResourceIT {
......@@ -69,10 +67,10 @@ public class AdminExportImportTestIT extends BaseResourceIT {
final int EXPECTED_CREATION_ORDER_SIZE = 10;
AtlasExportRequest request = TestResourceFileUtils.readObjectFromJson(".", EXPORT_REQUEST_FILE, AtlasExportRequest.class);
byte[] exportedBytes = atlasClientV2.exportData(request);
assertNotNull(exportedBytes);
InputStream exportedStream = atlasClientV2.exportData(request);
assertNotNull(exportedStream);
ZipSource zs = new ZipSource(new ByteArrayInputStream(exportedBytes));
ZipSource zs = new ZipSource(exportedStream);
assertNotNull(zs.getExportResult());
assertTrue(zs.getCreationOrder().size() > EXPECTED_CREATION_ORDER_SIZE);
}
......@@ -87,14 +85,15 @@ public class AdminExportImportTestIT extends BaseResourceIT {
private void performImport(String fileToImport, AtlasImportRequest request) throws AtlasServiceException {
byte[] fileBytes = new byte[0];
FileInputStream fileInputStream = null;
try {
fileBytes = Files.readAllBytes(Paths.get(TestResourceFileUtils.getTestFilePath(fileToImport)));
fileInputStream = new FileInputStream(TestResourceFileUtils.getTestFilePath(fileToImport));
} catch (IOException e) {
assertFalse(true, "Exception: " + e.getMessage());
}
AtlasImportResult result = atlasClientV2.importData(request, fileBytes);
AtlasImportResult result = atlasClientV2.importData(request, fileInputStream);
assertNotNull(result);
assertEquals(result.getOperationStatus(), AtlasImportResult.OperationStatus.SUCCESS);
assertNotNull(result.getMetrics());
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment