Commit 4b3c078c by Sarath Subramanian Committed by Ashutosh Mestry

ATLAS-2874: Include handling of Atlas Entity Transformers in current Import logic

parent 8746b306
...@@ -40,6 +40,7 @@ public class AtlasImportRequest implements Serializable { ...@@ -40,6 +40,7 @@ public class AtlasImportRequest implements Serializable {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
public static final String TRANSFORMS_KEY = "transforms"; public static final String TRANSFORMS_KEY = "transforms";
public static final String TRANSFORMERS_KEY = "transformers";
public static final String OPTION_KEY_REPLICATED_FROM = "replicatedFrom"; public static final String OPTION_KEY_REPLICATED_FROM = "replicatedFrom";
private static final String START_POSITION_KEY = "startPosition"; private static final String START_POSITION_KEY = "startPosition";
private static final String START_GUID_KEY = "startGuid"; private static final String START_GUID_KEY = "startGuid";
......
...@@ -19,15 +19,17 @@ package org.apache.atlas.repository.impexp; ...@@ -19,15 +19,17 @@ package org.apache.atlas.repository.impexp;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import org.apache.atlas.AtlasErrorCode; import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.entitytransform.BaseEntityHandler;
import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.impexp.AtlasImportRequest; import org.apache.atlas.model.impexp.AtlasImportRequest;
import org.apache.atlas.model.impexp.AtlasImportResult; import org.apache.atlas.model.impexp.AtlasImportResult;
import org.apache.atlas.model.impexp.AttributeTransform;
import org.apache.atlas.model.typedef.AtlasTypesDef; import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.apache.atlas.repository.store.graph.BulkImporter; import org.apache.atlas.repository.store.graph.BulkImporter;
import org.apache.atlas.store.AtlasTypeDefStore; import org.apache.atlas.store.AtlasTypeDefStore;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasType; import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils; import org.apache.commons.collections.MapUtils;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
...@@ -40,6 +42,11 @@ import java.io.ByteArrayInputStream; ...@@ -40,6 +42,11 @@ import java.io.ByteArrayInputStream;
import java.io.File; import java.io.File;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import static org.apache.atlas.model.impexp.AtlasImportRequest.TRANSFORMERS_KEY;
import static org.apache.atlas.model.impexp.AtlasImportRequest.TRANSFORMS_KEY;
@Component @Component
public class ImportService { public class ImportService {
...@@ -82,9 +89,12 @@ public class ImportService { ...@@ -82,9 +89,12 @@ public class ImportService {
try { try {
LOG.info("==> import(user={}, from={}, request={})", userName, requestingIP, request); LOG.info("==> import(user={}, from={}, request={})", userName, requestingIP, request);
String transforms = MapUtils.isNotEmpty(request.getOptions()) ? request.getOptions().get(AtlasImportRequest.TRANSFORMS_KEY) : null; String transforms = MapUtils.isNotEmpty(request.getOptions()) ? request.getOptions().get(TRANSFORMS_KEY) : null;
setImportTransform(source, transforms); setImportTransform(source, transforms);
String transformers = MapUtils.isNotEmpty(request.getOptions()) ? request.getOptions().get(TRANSFORMERS_KEY) : null;
setEntityTransformerHandlers(source, transformers);
startTimestamp = System.currentTimeMillis(); startTimestamp = System.currentTimeMillis();
processTypes(source.getTypesDef(), result); processTypes(source.getTypesDef(), result);
setStartPosition(request, source); setStartPosition(request, source);
...@@ -121,6 +131,38 @@ public class ImportService { ...@@ -121,6 +131,38 @@ public class ImportService {
} }
private void setEntityTransformerHandlers(ZipSource source, String transformersString) {
if (StringUtils.isEmpty(transformersString)) {
return;
}
Object transformersObj = AtlasType.fromJson(transformersString, Object.class);
List transformers = (transformersObj != null && transformersObj instanceof List) ? (List) transformersObj : null;
List<AttributeTransform> attributeTransforms = new ArrayList<>();
if (CollectionUtils.isNotEmpty(transformers)) {
for (Object transformer : transformers) {
String transformerStr = AtlasType.toJson(transformer);
AttributeTransform attributeTransform = AtlasType.fromJson(transformerStr, AttributeTransform.class);
if (attributeTransform == null) {
continue;
}
attributeTransforms.add(attributeTransform);
}
}
if (CollectionUtils.isNotEmpty(attributeTransforms)) {
List<BaseEntityHandler> entityHandlers = BaseEntityHandler.createEntityHandlers(attributeTransforms);
if (CollectionUtils.isNotEmpty(entityHandlers)) {
source.setEntityHandlers(entityHandlers);
}
}
}
private void debugLog(String s, Object... params) { private void debugLog(String s, Object... params) {
if(!LOG.isDebugEnabled()) return; if(!LOG.isDebugEnabled()) return;
...@@ -148,7 +190,7 @@ public class ImportService { ...@@ -148,7 +190,7 @@ public class ImportService {
try { try {
LOG.info("==> import(user={}, from={}, fileName={})", userName, requestingIP, fileName); LOG.info("==> import(user={}, from={}, fileName={})", userName, requestingIP, fileName);
String transforms = MapUtils.isNotEmpty(request.getOptions()) ? request.getOptions().get(AtlasImportRequest.TRANSFORMS_KEY) : null; String transforms = MapUtils.isNotEmpty(request.getOptions()) ? request.getOptions().get(TRANSFORMS_KEY) : null;
File file = new File(fileName); File file = new File(fileName);
ZipSource source = new ZipSource(new ByteArrayInputStream(FileUtils.readFileToByteArray(file)), ImportTransforms.fromJson(transforms)); ZipSource source = new ZipSource(new ByteArrayInputStream(FileUtils.readFileToByteArray(file)), ImportTransforms.fromJson(transforms));
result = run(source, request, userName, hostName, requestingIP); result = run(source, request, userName, hostName, requestingIP);
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
*/ */
package org.apache.atlas.repository.impexp; package org.apache.atlas.repository.impexp;
import org.apache.atlas.entitytransform.BaseEntityHandler;
import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.impexp.AtlasExportResult; import org.apache.atlas.model.impexp.AtlasExportResult;
import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.instance.AtlasEntity;
...@@ -24,6 +25,7 @@ import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo; ...@@ -24,6 +25,7 @@ import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.typedef.AtlasTypesDef; import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.apache.atlas.repository.store.graph.v2.EntityImportStream; import org.apache.atlas.repository.store.graph.v2.EntityImportStream;
import org.apache.atlas.type.AtlasType; import org.apache.atlas.type.AtlasType;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -42,12 +44,13 @@ import java.util.zip.ZipInputStream; ...@@ -42,12 +44,13 @@ import java.util.zip.ZipInputStream;
public class ZipSource implements EntityImportStream { public class ZipSource implements EntityImportStream {
private static final Logger LOG = LoggerFactory.getLogger(ZipSource.class); private static final Logger LOG = LoggerFactory.getLogger(ZipSource.class);
private final InputStream inputStream; private final InputStream inputStream;
private List<String> creationOrder; private List<String> creationOrder;
private Iterator<String> iterator; private Iterator<String> iterator;
private Map<String, String> guidEntityJsonMap; private Map<String, String> guidEntityJsonMap;
private ImportTransforms importTransform; private ImportTransforms importTransform;
private int currentPosition; private List<BaseEntityHandler> entityHandlers;
private int currentPosition;
public ZipSource(InputStream inputStream) throws IOException { public ZipSource(InputStream inputStream) throws IOException {
this(inputStream, null); this(inputStream, null);
...@@ -68,6 +71,14 @@ public class ZipSource implements EntityImportStream { ...@@ -68,6 +71,14 @@ public class ZipSource implements EntityImportStream {
this.importTransform = importTransform; this.importTransform = importTransform;
} }
public List<BaseEntityHandler> getEntityHandlers() {
return entityHandlers;
}
public void setEntityHandlers(List<BaseEntityHandler> entityHandlers) {
this.entityHandlers = entityHandlers;
}
public AtlasTypesDef getTypesDef() throws AtlasBaseException { public AtlasTypesDef getTypesDef() throws AtlasBaseException {
final String fileName = ZipExportFileNames.ATLAS_TYPESDEF_NAME.toString(); final String fileName = ZipExportFileNames.ATLAS_TYPESDEF_NAME.toString();
...@@ -123,17 +134,39 @@ public class ZipSource implements EntityImportStream { ...@@ -123,17 +134,39 @@ public class ZipSource implements EntityImportStream {
return this.creationOrder; return this.creationOrder;
} }
public AtlasEntity.AtlasEntityWithExtInfo getEntityWithExtInfo(String guid) throws AtlasBaseException { public AtlasEntityWithExtInfo getEntityWithExtInfo(String guid) throws AtlasBaseException {
String s = getFromCache(guid); String s = getFromCache(guid);
AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = convertFromJson(AtlasEntity.AtlasEntityWithExtInfo.class, s); AtlasEntityWithExtInfo entityWithExtInfo = convertFromJson(AtlasEntityWithExtInfo.class, s);
if (importTransform != null) { if (entityHandlers != null) {
applyTransformers(entityWithExtInfo);
} else if (importTransform != null) {
entityWithExtInfo = importTransform.apply(entityWithExtInfo); entityWithExtInfo = importTransform.apply(entityWithExtInfo);
} }
return entityWithExtInfo; return entityWithExtInfo;
} }
private void applyTransformers(AtlasEntityWithExtInfo entityWithExtInfo) {
if (entityWithExtInfo == null) {
return;
}
transform(entityWithExtInfo.getEntity());
if (MapUtils.isNotEmpty(entityWithExtInfo.getReferredEntities())) {
for (AtlasEntity e : entityWithExtInfo.getReferredEntities().values()) {
transform(e);
}
}
}
private void transform(AtlasEntity e) {
for (BaseEntityHandler handler : entityHandlers) {
handler.transform(e);
}
}
private <T> T convertFromJson(Class<T> clazz, String jsonData) throws AtlasBaseException { private <T> T convertFromJson(Class<T> clazz, String jsonData) throws AtlasBaseException {
T t; T t;
try { try {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment