Commit 4c9c3bb5 by ashutoshm Committed by Madhan Neethiraj

ATLAS-1825: updated import to support optional transformation of attribute values

parent d2198bb4
......@@ -39,21 +39,18 @@ import static org.codehaus.jackson.annotate.JsonAutoDetect.Visibility.PUBLIC_ONL
@XmlRootElement
@XmlAccessorType(XmlAccessType.PROPERTY)
public class AtlasImportRequest implements Serializable {
private static final long serialVersionUID = 1L;
private static final long serialVersionUID = 1L;
public static final String TRANSFORMS_KEY = "transforms";
private Map<String, Object> options;
private Map<String, String> options;
public AtlasImportRequest() {
this.options = new HashMap<>();
}
public AtlasImportRequest(Map<String, Object> options) {
this.options = options;
}
public Map<String, Object> getOptions() { return options; }
public Map<String, String> getOptions() { return options; }
public void setOptions(Map<String, Object> options) { this.options = options; }
public void setOptions(Map<String, String> options) { this.options = options; }
public StringBuilder toString(StringBuilder sb) {
if (sb == null) {
......
......@@ -960,6 +960,12 @@
</dependency>
<dependency>
<groupId>com.sun.jersey.contribs</groupId>
<artifactId>jersey-multipart</artifactId>
<version>${jersey.version}</version>
</dependency>
<dependency>
<groupId>javax.servlet.jsp</groupId>
<artifactId>jsp-api</artifactId>
<version>2.0</version>
......
......@@ -30,6 +30,7 @@ import org.apache.atlas.repository.store.bootstrap.AtlasTypeDefStoreInitializer;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.store.AtlasTypeDefStore;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
......@@ -44,7 +45,7 @@ public class ImportService {
private static final Logger LOG = LoggerFactory.getLogger(ImportService.class);
private final AtlasTypeDefStore typeDefStore;
private final AtlasEntityStore entityStore;
private final AtlasEntityStore entityStore;
private final AtlasTypeRegistry typeRegistry;
private long startTimestamp;
......@@ -53,7 +54,7 @@ public class ImportService {
public ImportService(final AtlasTypeDefStore typeDefStore, final AtlasEntityStore entityStore, AtlasTypeRegistry typeRegistry) {
this.typeDefStore = typeDefStore;
this.entityStore = entityStore;
this.entityStore = entityStore;
this.typeRegistry = typeRegistry;
}
......@@ -62,8 +63,12 @@ public class ImportService {
AtlasImportResult result = new AtlasImportResult(request, userName, requestingIP, hostName, System.currentTimeMillis());
try {
LOG.info("==> import(user={}, from={})", userName, requestingIP);
String transforms = MapUtils.isNotEmpty(request.getOptions()) ? request.getOptions().get(AtlasImportRequest.TRANSFORMS_KEY) : null;
source.setImportTransform(ImportTransforms.fromJson(transforms));
startTimestamp = System.currentTimeMillis();
processTypes(source.getTypesDef(), result);
processEntities(source, result);
......@@ -86,8 +91,8 @@ public class ImportService {
}
public AtlasImportResult run(AtlasImportRequest request, String userName, String hostName, String requestingIP)
throws AtlasBaseException {
String fileName = (String)request.getOptions().get("FILENAME");
throws AtlasBaseException {
String fileName = (String) request.getOptions().get("FILENAME");
if (StringUtils.isBlank(fileName)) {
throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "FILENAME parameter not found");
......@@ -98,8 +103,9 @@ public class ImportService {
try {
LOG.info("==> import(user={}, from={}, fileName={})", userName, requestingIP, fileName);
File file = new File(fileName);
ZipSource source = new ZipSource(new ByteArrayInputStream(FileUtils.readFileToByteArray(file)));
String transforms = MapUtils.isNotEmpty(request.getOptions()) ? request.getOptions().get(AtlasImportRequest.TRANSFORMS_KEY) : null;
File file = new File(fileName);
ZipSource source = new ZipSource(new ByteArrayInputStream(FileUtils.readFileToByteArray(file)), ImportTransforms.fromJson(transforms));
result = run(source, request, userName, hostName, requestingIP);
} catch (AtlasBaseException excp) {
......@@ -116,7 +122,7 @@ public class ImportService {
throw new AtlasBaseException(excp);
} finally {
LOG.info("<== import(user={}, from={}, fileName={}): status={}", userName, requestingIP, fileName,
(result == null ? AtlasImportResult.OperationStatus.FAIL : result.getOperationStatus()));
(result == null ? AtlasImportResult.OperationStatus.FAIL : result.getOperationStatus()));
}
return result;
......@@ -142,19 +148,19 @@ public class ImportService {
}
private void setGuidToEmpty(AtlasTypesDef typesDef) {
for (AtlasEntityDef def: typesDef.getEntityDefs()) {
for (AtlasEntityDef def : typesDef.getEntityDefs()) {
def.setGuid(null);
}
for (AtlasClassificationDef def: typesDef.getClassificationDefs()) {
for (AtlasClassificationDef def : typesDef.getClassificationDefs()) {
def.setGuid(null);
}
for (AtlasEnumDef def: typesDef.getEnumDefs()) {
for (AtlasEnumDef def : typesDef.getEnumDefs()) {
def.setGuid(null);
}
for (AtlasStructDef def: typesDef.getStructDefs()) {
for (AtlasStructDef def : typesDef.getStructDefs()) {
def.setGuid(null);
}
}
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.impexp;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.commons.lang.StringUtils;
public abstract class ImportTransformer {
    // Separator between the transformer name and its parameters in a spec
    // string such as "replace:@cl1:@cl2".
    //
    // NOTE(review): org.apache.commons.lang.StringUtils.split(String, String)
    // treats its second argument as a *set of separator characters*, not a
    // regex; the previous value "\\:" therefore also split specs on '\'.
    // Using ":" preserves behavior for well-formed specs and stops splitting
    // on stray back-slashes.
    private static final String TRANSFORMER_PARAMETER_SEPARATOR = ":";

    private final String transformType;

    /**
     * Parses a transformer specification of the form {@code name[:param1[:param2]]}
     * and returns the corresponding transformer.
     *
     * Supported names: "replace" (params: toFind, replaceWith — both default to ""),
     * "lowercase" and "uppercase" (no params).
     *
     * @param transformerSpec colon-separated specification string
     * @return the transformer described by the specification
     * @throws AtlasBaseException if the specification is empty or names an unknown transformer
     */
    public static ImportTransformer getTransformer(String transformerSpec) throws AtlasBaseException {
        String[] params = StringUtils.split(transformerSpec, TRANSFORMER_PARAMETER_SEPARATOR);
        String   key    = (params == null || params.length < 1) ? transformerSpec : params[0];

        final ImportTransformer ret;

        if (StringUtils.isEmpty(key)) {
            throw new AtlasBaseException(AtlasErrorCode.INVALID_VALUE, "Error creating ImportTransformer. Invalid transformer-specification: {}.", transformerSpec);
        } else if (key.equals("replace")) {
            String toFindStr  = (params == null || params.length < 2) ? "" : params[1];
            String replaceStr = (params == null || params.length < 3) ? "" : params[2];

            ret = new Replace(toFindStr, replaceStr);
        } else if (key.equals("lowercase")) {
            ret = new Lowercase();
        } else if (key.equals("uppercase")) {
            ret = new Uppercase();
        } else {
            throw new AtlasBaseException(AtlasErrorCode.INVALID_VALUE, "Error creating ImportTransformer. Unknown transformer: {}.", transformerSpec);
        }

        return ret;
    }

    /** @return the name of this transform, e.g. "replace" */
    public String getTransformType() { return transformType; }

    /**
     * Applies this transform to the given attribute value.
     *
     * @param o attribute value; non-String values are returned unchanged by all built-in transformers
     * @return the transformed value
     * @throws AtlasBaseException if the transformation fails
     */
    public abstract Object apply(Object o) throws AtlasBaseException;

    protected ImportTransformer(String transformType) {
        this.transformType = transformType;
    }

    /** Replaces every occurrence of {@code toFindStr} with {@code replaceStr} in String values. */
    static class Replace extends ImportTransformer {
        private final String toFindStr;
        private final String replaceStr;

        public Replace(String toFindStr, String replaceStr) {
            super("replace");

            this.toFindStr  = toFindStr;
            this.replaceStr = replaceStr;
        }

        public String getToFindStr() { return toFindStr; }

        public String getReplaceStr() { return replaceStr; }

        @Override
        public Object apply(Object o) throws AtlasBaseException {
            // non-String values pass through unchanged
            return (o instanceof String) ? StringUtils.replace((String) o, toFindStr, replaceStr) : o;
        }
    }

    /** Lower-cases String values; other values pass through unchanged. */
    static class Lowercase extends ImportTransformer {
        public Lowercase() {
            super("lowercase");
        }

        @Override
        public Object apply(Object o) {
            return (o instanceof String) ? StringUtils.lowerCase((String) o) : o;
        }
    }

    /** Upper-cases String values; other values pass through unchanged. */
    static class Uppercase extends ImportTransformer {
        public Uppercase() {
            super("uppercase");
        }

        @Override
        public Object apply(Object o) {
            return (o instanceof String) ? StringUtils.upperCase((String) o) : o;
        }
    }
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.impexp;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.type.AtlasType;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * Holds per-type, per-attribute transformer chains parsed from a JSON spec of
 * the form: { "typeName": { "attributeName": [ "transformerSpec", ... ] } }.
 */
public class ImportTransforms {
    private static final Logger LOG = LoggerFactory.getLogger(ImportTransforms.class);

    // entity-type name -> (attribute name -> ordered list of transformers to apply)
    private final Map<String, Map<String, List<ImportTransformer>>> transforms;

    /**
     * Builds an ImportTransforms from its JSON representation.
     *
     * @param jsonString JSON specification; may be null/blank
     * @return parsed transforms, or null when jsonString is blank
     */
    public static ImportTransforms fromJson(String jsonString) {
        return StringUtils.isNotBlank(jsonString) ? new ImportTransforms(jsonString) : null;
    }

    public Map<String, Map<String, List<ImportTransformer>>> getTransforms() {
        return transforms;
    }

    /** @return attribute-level transforms registered for the given type, or null if none */
    public Map<String, List<ImportTransformer>> getTransforms(String typeName) { return transforms.get(typeName); }

    /**
     * Applies the registered transforms to the entity and to all of its referred entities.
     *
     * @param entityWithExtInfo entity (plus referred entities) to transform in place; may be null
     * @return the same instance, transformed
     * @throws AtlasBaseException if a transformer fails
     */
    public AtlasEntity.AtlasEntityWithExtInfo apply(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) throws AtlasBaseException {
        if (entityWithExtInfo != null) {
            apply(entityWithExtInfo.getEntity());

            if (MapUtils.isNotEmpty(entityWithExtInfo.getReferredEntities())) {
                for (AtlasEntity e : entityWithExtInfo.getReferredEntities().values()) {
                    apply(e);
                }
            }
        }

        return entityWithExtInfo;
    }

    /**
     * Applies the transforms registered for the entity's type to its attributes.
     * Each attribute's transformers run in registration order; attributes the
     * entity does not have are skipped.
     *
     * @param entity entity to transform in place; may be null
     * @return the same entity, transformed
     * @throws AtlasBaseException if a transformer fails
     */
    public AtlasEntity apply(AtlasEntity entity) throws AtlasBaseException {
        if (entity != null) {
            Map<String, List<ImportTransformer>> entityTransforms = getTransforms(entity.getTypeName());

            if (MapUtils.isNotEmpty(entityTransforms)) {
                for (Map.Entry<String, List<ImportTransformer>> entry : entityTransforms.entrySet()) {
                    String                  attributeName  = entry.getKey();
                    List<ImportTransformer> attrTransforms = entry.getValue();

                    if (!entity.hasAttribute(attributeName)) {
                        continue;
                    }

                    Object transformedValue = entity.getAttribute(attributeName);

                    for (ImportTransformer attrTransform : attrTransforms) {
                        transformedValue = attrTransform.apply(transformedValue);
                    }

                    entity.setAttribute(attributeName, transformedValue);
                }
            }
        }

        return entity;
    }

    private ImportTransforms() {
        transforms = new HashMap<>();
    }

    @SuppressWarnings("unchecked") // JSON is parsed into raw maps/lists; shape is imposed by the casts below
    private ImportTransforms(String jsonString) {
        this();

        if (jsonString == null) {
            return;
        }

        Map typeTransforms = AtlasType.fromJson(jsonString, Map.class);

        if (MapUtils.isEmpty(typeTransforms)) {
            return;
        }

        // iterate entrySet() instead of keySet()+get() to avoid a second lookup per key
        for (Object entryObj : typeTransforms.entrySet()) {
            Map.Entry           entry               = (Map.Entry) entryObj;
            String              entityType          = (String) entry.getKey();
            Map<String, Object> attributeTransforms = (Map<String, Object>) entry.getValue();

            if (MapUtils.isEmpty(attributeTransforms)) {
                continue;
            }

            for (Map.Entry<String, Object> e : attributeTransforms.entrySet()) {
                String       attributeName  = e.getKey();
                List<String> transformSpecs = (List<String>) e.getValue(); // renamed: previously shadowed the 'transforms' field

                if (CollectionUtils.isEmpty(transformSpecs)) {
                    continue;
                }

                for (String transformSpec : transformSpecs) {
                    ImportTransformer transformer = null;

                    try {
                        transformer = ImportTransformer.getTransformer(transformSpec);
                    } catch (AtlasBaseException ex) {
                        // best-effort: log and skip an unparsable spec instead of failing the whole import
                        LOG.error("Error converting string to ImportTransformer: {}", transformSpec, ex);
                    }

                    if (transformer != null) {
                        add(entityType, attributeName, transformer);
                    }
                }
            }
        }
    }

    /** Registers a transformer for typeName.attributeName, creating intermediate maps/lists as needed. */
    private void add(String typeName, String attributeName, ImportTransformer transformer) {
        Map<String, List<ImportTransformer>> attrMap;

        if (transforms.containsKey(typeName)) {
            attrMap = transforms.get(typeName);
        } else {
            attrMap = new HashMap<>();

            transforms.put(typeName, attrMap);
        }

        List<ImportTransformer> list;

        if (attrMap.containsKey(attributeName)) {
            list = attrMap.get(attributeName);
        } else {
            list = new ArrayList<>();

            attrMap.put(attributeName, list);
        }

        list.add(transformer);
    }
}
......@@ -44,17 +44,29 @@ import static org.apache.atlas.AtlasErrorCode.JSON_ERROR_OBJECT_MAPPER_NULL_RETU
public class ZipSource implements EntityImportStream {
private static final Logger LOG = LoggerFactory.getLogger(ZipSource.class);
private final InputStream inputStream;
private List<String> creationOrder;
private Iterator<String> iterator;
private Map<String, String> guidEntityJsonMap;
private final InputStream inputStream;
private List<String> creationOrder;
private Iterator<String> iterator;
private Map<String, String> guidEntityJsonMap;
private ImportTransforms importTransform;
public ZipSource(InputStream inputStream) throws IOException {
this.inputStream = inputStream;
guidEntityJsonMap = new HashMap<>();
this(inputStream, null);
}
public ZipSource(InputStream inputStream, ImportTransforms importTransform) throws IOException {
this.inputStream = inputStream;
this.guidEntityJsonMap = new HashMap<>();
this.importTransform = importTransform;
updateGuidZipEntryMap();
this.setCreationOrder();
setCreationOrder();
}
public ImportTransforms getImportTransform() { return this.importTransform; }
public void setImportTransform(ImportTransforms importTransform) {
this.importTransform = importTransform;
}
public AtlasTypesDef getTypesDef() throws AtlasBaseException {
......@@ -113,8 +125,13 @@ public class ZipSource implements EntityImportStream {
}
public AtlasEntity.AtlasEntityWithExtInfo getEntityWithExtInfo(String guid) throws AtlasBaseException {
String s = (String) getFromCache(guid);
String s = getFromCache(guid);
AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = convertFromJson(AtlasEntity.AtlasEntityWithExtInfo.class, s);
if (importTransform != null) {
entityWithExtInfo = importTransform.apply(entityWithExtInfo);
}
return entityWithExtInfo;
}
......@@ -193,7 +210,8 @@ public class ZipSource implements EntityImportStream {
@Override
public AtlasEntity getByGuid(String guid) {
try {
return getEntity(guid);
AtlasEntity entity = getEntity(guid);
return entity;
} catch (AtlasBaseException e) {
e.printStackTrace();
return null;
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.impexp;
import org.apache.atlas.model.impexp.AtlasImportRequest;
import org.apache.atlas.type.AtlasType;
import org.testng.annotations.Test;
import static org.testng.Assert.*;
/**
 * Validates JSON deserialization of AtlasImportRequest and parsing of the
 * optional "transforms" entry in its options map.
 */
public class AtlasImportRequestTest {
    @Test
    public void serializeAtlasImportRequstFromJsonWithEmptyOptions() {
        String             jsonData = "{ \"options\": {} }";
        AtlasImportRequest request  = AtlasType.fromJson(jsonData, AtlasImportRequest.class);

        assertNotNull(request);
        assertNotNull(request.getOptions());
        assertNull(request.getOptions().get(AtlasImportRequest.TRANSFORMS_KEY));

        ImportTransforms tr = ImportTransforms.fromJson(request.getOptions().get(AtlasImportRequest.TRANSFORMS_KEY));

        assertNull(tr);
    }

    @Test
    public void serializeAtlasImportRequstFromJsonWithEmptyTransforms() {
        String             jsonData = "{ \"options\": { \"transforms\": \"{ }\" } }";
        AtlasImportRequest request  = AtlasType.fromJson(jsonData, AtlasImportRequest.class);

        assertNotNull(request);
        assertNotNull(request.getOptions());
        assertNotNull(request.getOptions().get(AtlasImportRequest.TRANSFORMS_KEY));

        ImportTransforms tr = ImportTransforms.fromJson(request.getOptions().get(AtlasImportRequest.TRANSFORMS_KEY));

        assertNotNull(tr);
        assertNotNull(tr.getTransforms());
        assertEquals(tr.getTransforms().size(), 0);
    }

    @Test
    public void serializeAtlasImportRequstFromJsonWith1Transform() {
        String             jsonData = "{ \"options\": { \"transforms\": \"{ \\\"hive_db\\\": { \\\"qualifiedName\\\": [ \\\"replace:@cl1:@cl2\\\" ] } }\" } }";
        AtlasImportRequest request  = AtlasType.fromJson(jsonData, AtlasImportRequest.class);

        assertNotNull(request);
        assertNotNull(request.getOptions());
        assertNotNull(request.getOptions().get(AtlasImportRequest.TRANSFORMS_KEY));

        ImportTransforms tr = ImportTransforms.fromJson(request.getOptions().get(AtlasImportRequest.TRANSFORMS_KEY));

        assertNotNull(tr);
        assertNotNull(tr.getTransforms());
        assertEquals(tr.getTransforms().size(), 1);
        assertTrue(tr.getTransforms().containsKey("hive_db"));
        assertEquals(tr.getTransforms("hive_db").entrySet().size(), 1);
        assertTrue(tr.getTransforms("hive_db").containsKey("qualifiedName"));
        assertEquals(tr.getTransforms("hive_db").get("qualifiedName").size(), 1);
    }

    @Test
    public void serializeAtlasImportRequstFromJson() {
        // NOTE: the original payload ended with an extra '}' making the JSON
        // malformed; it only parsed because trailing tokens were ignored.
        // The stray brace has been removed.
        String             jsonData = "{ \"options\": { \"transforms\": \"{ \\\"hive_db\\\": { \\\"qualifiedName\\\": [ \\\"replace:@cl1:@cl2\\\" ] }, \\\"hive_table\\\": { \\\"qualifiedName\\\": [ \\\"lowercase\\\", \\\"replace:@cl1:@cl2\\\" ] } }\" } }";
        AtlasImportRequest request  = AtlasType.fromJson(jsonData, AtlasImportRequest.class);

        assertNotNull(request);
        assertNotNull(request.getOptions());
        assertNotNull(request.getOptions().get(AtlasImportRequest.TRANSFORMS_KEY));

        ImportTransforms tr = ImportTransforms.fromJson(request.getOptions().get(AtlasImportRequest.TRANSFORMS_KEY));

        assertNotNull(tr);
        assertNotNull(tr.getTransforms());
        assertEquals(tr.getTransforms().size(), 2);
        assertTrue(tr.getTransforms().containsKey("hive_db"));
        assertEquals(tr.getTransforms("hive_db").entrySet().size(), 1);
        assertTrue(tr.getTransforms("hive_db").containsKey("qualifiedName"));
        assertEquals(tr.getTransforms("hive_db").get("qualifiedName").size(), 1);
        assertTrue(tr.getTransforms().containsKey("hive_table"));
        assertEquals(tr.getTransforms("hive_table").entrySet().size(), 1);
        assertTrue(tr.getTransforms("hive_table").containsKey("qualifiedName"));
        assertEquals(tr.getTransforms("hive_table").get("qualifiedName").size(), 2);
    }
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.impexp;
import com.google.inject.Inject;
import org.apache.atlas.TestModules;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.store.AtlasTypeDefStore;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.Guice;
/**
 * Placeholder TestNG suite for import-service reporting scenarios.
 *
 * NOTE(review): this class declares no @Test methods; the injected
 * collaborators are wired but unused — presumably scaffolding for tests to
 * be added. Confirm whether it should be filled in or removed.
 */
@Guice(modules = TestModules.TestOnlyModule.class)
public class ImportServiceReportingTest {
    private static final Logger LOG = LoggerFactory.getLogger(ImportServiceReportingTest.class);

    @Inject
    AtlasTypeRegistry typeRegistry;          // type registry provided by the test module

    @Inject
    private AtlasTypeDefStore typeDefStore;  // type-definition store

    @Inject
    private AtlasEntityStore entityStore;    // entity store
}
......@@ -81,5 +81,4 @@ public class ImportServiceTest {
loadModelFromJson("0010-base_model.json", typeDefStore, typeRegistry);
runAndVerifyQuickStart_v1_Import(new ImportService(typeDefStore, entityStore, typeRegistry), zipSource);
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.impexp;
import org.testng.Assert;
import org.testng.annotations.Test;
import static org.testng.Assert.*;
/**
 * Verifies that ImportTransforms.fromJson builds the expected transformer
 * chain ("lowercase" followed by "replace:@cl1:@cl2") for hive_table.qualifiedName.
 */
public class ImportTransformerJSONTest {
    @Test
    public void createAtlasImportTransformFromJson() throws Exception {
        String hiveTableType  = "hive_table";
        String qualifiedName  = "qualifiedName";
        String jsonTransforms = "{ \"hive_table\": { \"qualifiedName\":[ \"lowercase\", \"replace:@cl1:@cl2\" ] } }";

        ImportTransforms transforms = ImportTransforms.fromJson(jsonTransforms);

        assertNotNull(transforms);
        assertEquals(transforms.getTransforms().entrySet().size(), 1);
        assertEquals(transforms.getTransforms().get(hiveTableType).entrySet().size(), 1);
        assertEquals(transforms.getTransforms().get(hiveTableType).get(qualifiedName).size(), 2);

        // use the static import consistently (was a mix of Assert.assertEquals and assertEquals)
        assertEquals(transforms.getTransforms().get(hiveTableType).get(qualifiedName).get(0).getTransformType(), "lowercase");
        assertEquals(transforms.getTransforms().get(hiveTableType).get(qualifiedName).get(1).getTransformType(), "replace");
        assertTrue(transforms.getTransforms().get(hiveTableType).get(qualifiedName).get(1) instanceof ImportTransformer.Replace);
        assertEquals(((ImportTransformer.Replace) transforms.getTransforms().get(hiveTableType).get(qualifiedName).get(1)).getToFindStr(), "@cl1");
        assertEquals(((ImportTransformer.Replace) transforms.getTransforms().get(hiveTableType).get(qualifiedName).get(1)).getReplaceStr(), "@cl2");
    }
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.impexp;
import org.apache.atlas.exception.AtlasBaseException;
import org.testng.annotations.Test;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
/**
 * Tests ImportTransformer spec parsing ("replace", "lowercase", "uppercase")
 * and the application of each transformer to String, empty, null and
 * non-String values.
 */
public class ImportTransformerTest {
    @Test
    public void createWithCorrectParameters() throws AtlasBaseException { // dropped undeclared-in-practice IllegalAccessException
        String param1 = "@cl1";
        String param2 = "@cl2";

        ImportTransformer e = ImportTransformer.getTransformer(String.format("%s:%s:%s", "replace", param1, param2));

        assertTrue(e instanceof ImportTransformer.Replace);
        assertEquals(((ImportTransformer.Replace) e).getToFindStr(), param1);
        assertEquals(((ImportTransformer.Replace) e).getReplaceStr(), param2);
    }

    @Test
    public void createSeveralWithCorrectParameters() throws AtlasBaseException {
        String param1 = "@cl1";
        String param2 = "@cl2";

        ImportTransformer e1 = ImportTransformer.getTransformer(String.format("%s:%s:%s", "replace", param1, param2));
        ImportTransformer e2 = ImportTransformer.getTransformer("replace:tt1:tt2"); // was a pointless no-arg String.format()

        assertTrue(e1 instanceof ImportTransformer.Replace);
        assertEquals(((ImportTransformer.Replace) e1).getToFindStr(), param1);
        assertEquals(((ImportTransformer.Replace) e1).getReplaceStr(), param2);
        assertTrue(e2 instanceof ImportTransformer.Replace);
        assertEquals(((ImportTransformer.Replace) e2).getToFindStr(), "tt1");
        assertEquals(((ImportTransformer.Replace) e2).getReplaceStr(), "tt2");
    }

    @Test
    public void createWithDefaultParameters() throws AtlasBaseException {
        // missing params default to the empty string
        ImportTransformer e1 = ImportTransformer.getTransformer("replace:@cl1");
        ImportTransformer e2 = ImportTransformer.getTransformer("replace");

        assertTrue(e1 instanceof ImportTransformer.Replace);
        assertEquals(((ImportTransformer.Replace) e1).getToFindStr(), "@cl1");
        assertEquals(((ImportTransformer.Replace) e1).getReplaceStr(), "");
        assertTrue(e2 instanceof ImportTransformer.Replace);
        assertEquals(((ImportTransformer.Replace) e2).getToFindStr(), "");
        assertEquals(((ImportTransformer.Replace) e2).getReplaceStr(), "");
    }

    @Test
    public void applyLowercaseTransformer() throws AtlasBaseException {
        ImportTransformer e = ImportTransformer.getTransformer("lowercase");

        assertEquals(e.apply("@CL1"), "@cl1");
        assertEquals(e.apply("@cl1"), "@cl1");
        assertEquals(e.apply(""), "");                                 // empty string
        assertEquals(e.apply(null), null);                             // null value: no change
        assertEquals(e.apply(Integer.valueOf(5)), Integer.valueOf(5)); // non-string value: no change
    }

    @Test
    public void applyUppercaseTransformer() throws AtlasBaseException {
        ImportTransformer e = ImportTransformer.getTransformer("uppercase");

        assertEquals(e.apply("@CL1"), "@CL1");
        assertEquals(e.apply("@cl1"), "@CL1");
        assertEquals(e.apply(""), "");                                 // empty string
        assertEquals(e.apply(null), null);                             // null value: no change
        assertEquals(e.apply(Integer.valueOf(5)), Integer.valueOf(5)); // non-string value: no change
    }

    @Test
    public void applyReplaceTransformer1() throws AtlasBaseException {
        // both find and replace strings supplied
        ImportTransformer e = ImportTransformer.getTransformer("replace:@cl1:@cl2");

        assertEquals(e.apply("@cl1"), "@cl2");
        assertEquals(e.apply("default@cl1"), "default@cl2");
        assertEquals(e.apply("@cl11"), "@cl21");
        assertEquals(e.apply("@cl2"), "@cl2");
        assertEquals(e.apply(""), "");                                 // empty string
        assertEquals(e.apply(null), null);                             // null value
        assertEquals(e.apply(Integer.valueOf(5)), Integer.valueOf(5)); // non-string value: no change
    }

    @Test
    public void applyReplaceTransformer2() throws AtlasBaseException {
        // replace string omitted: occurrences are deleted
        ImportTransformer e = ImportTransformer.getTransformer("replace:@cl1");

        assertEquals(e.apply("@cl1"), "");
        assertEquals(e.apply("default@cl1"), "default");
        assertEquals(e.apply("@cl11"), "1");
        assertEquals(e.apply("@cl2"), "@cl2");
        assertEquals(e.apply(""), "");                                 // empty string
        assertEquals(e.apply(null), null);                             // null value
        assertEquals(e.apply(Integer.valueOf(5)), Integer.valueOf(5)); // non-string value: no change
    }

    @Test
    public void applyReplaceTransformer3() throws AtlasBaseException {
        // both params omitted: values pass through unchanged
        ImportTransformer e = ImportTransformer.getTransformer("replace");

        assertEquals(e.apply("@cl1"), "@cl1");
        assertEquals(e.apply("default@cl1"), "default@cl1");
        assertEquals(e.apply("@cl11"), "@cl11");
        assertEquals(e.apply("@cl2"), "@cl2");
        assertEquals(e.apply(""), "");                                 // empty string
        assertEquals(e.apply(null), null);                             // null value
        assertEquals(e.apply(Integer.valueOf(5)), Integer.valueOf(5)); // non-string value: no change
    }
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.impexp;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
public class ImportTransformsTest {
private final String qualifiedName = "qualifiedName";
private final String lowerCaseCL1 = "@cl1";
private final String lowerCaseCL2 = "@cl2";
private final String jsonTransforms = "{ \"hive_table\": { \"qualifiedName\":[ \"lowercase\", \"replace:@cl1:@cl2\" ] } }";
private ImportTransforms transform;
@BeforeTest
public void setup() throws AtlasBaseException {
    // parse the shared transform spec once for all tests in this class
    transform = ImportTransforms.fromJson(jsonTransforms);
}
@Test
public void transformEntityWith2Transforms() throws AtlasBaseException {
    // qualifiedName should be lower-cased and have @cl1 replaced by @cl2
    AtlasEntity tableEntity   = getHiveTableAtlasEntity();
    String      originalValue = (String) tableEntity.getAttribute(qualifiedName);

    transform.apply(tableEntity);

    assertEquals(tableEntity.getAttribute(qualifiedName), applyDefaultTransform(originalValue));
}
@Test
public void transformEntityWithExtInfo() throws AtlasBaseException {
    // register an extra replace-transform for hive_column, then verify both
    // the root entity and every referred entity are transformed
    addColumnTransform(transform);

    AtlasEntityWithExtInfo withExtInfo   = getAtlasEntityWithExtInfo();
    String                 rootAttrValue = (String) withExtInfo.getEntity().getAttribute(qualifiedName);
    String[]               expected      = getExtEntityExpectedValues(withExtInfo);

    transform.apply(withExtInfo);

    assertEquals(withExtInfo.getEntity().getAttribute(qualifiedName), applyDefaultTransform(rootAttrValue));

    for (int i = 0; i < expected.length; i++) {
        assertEquals(withExtInfo.getReferredEntities().get(Integer.toString(i)).getAttribute(qualifiedName), expected[i]);
    }
}
@Test
public void transformEntityWithExtInfoNullCheck() throws AtlasBaseException {
    // apply() must tolerate a null referred-entities map
    addColumnTransform(transform);

    AtlasEntityWithExtInfo input = getAtlasEntityWithExtInfo();

    input.setReferredEntities(null);

    AtlasEntityWithExtInfo output = transform.apply(input);

    assertNotNull(output);
    assertEquals(input.getEntity().getGuid(), output.getEntity().getGuid());
}
private String[] getExtEntityExpectedValues(AtlasEntityWithExtInfo entityWithExtInfo) {
String[] ret = new String[entityWithExtInfo.getReferredEntities().size()];
for (int i = 0; i < ret.length; i++) {
String attrValue = (String) entityWithExtInfo.getReferredEntities().get(Integer.toString(i)).getAttribute(qualifiedName);
ret[i] = attrValue.replace(lowerCaseCL1, lowerCaseCL2);
}
return ret;
}
private void addColumnTransform(ImportTransforms transform) throws AtlasBaseException {
Map<String, List<ImportTransformer>> tr = new HashMap<>();
List<ImportTransformer> trList = new ArrayList<>();
trList.add(ImportTransformer.getTransformer(String.format("replace:%s:%s", lowerCaseCL1, lowerCaseCL2)));
tr.put(qualifiedName, trList);
transform.getTransforms().put("hive_column", tr);
}
private String applyDefaultTransform(String attrValue) {
return attrValue.toLowerCase().replace(lowerCaseCL1, lowerCaseCL2);
}
private AtlasEntity getHiveTableAtlasEntity() {
AtlasEntity entity = new AtlasEntity("hive_table");
Map<String, Object> attributes = new HashMap<>();
attributes.put(qualifiedName, "TABLE1.default" + lowerCaseCL1);
attributes.put("dbname", "someDB");
attributes.put("name", "somename");
entity.setAttributes(attributes);
return entity;
}
private AtlasEntity getHiveColumnAtlasEntity(int index) {
AtlasEntity entity = new AtlasEntity("hive_column");
Map<String, Object> attributes = new HashMap<>();
attributes.put(qualifiedName, String.format("col%s.TABLE1.default@cl1", index));
attributes.put("name", "col" + index);
entity.setAttributes(attributes);
return entity;
}
private AtlasEntityWithExtInfo getAtlasEntityWithExtInfo() {
AtlasEntityWithExtInfo ret = new AtlasEntityWithExtInfo(getHiveTableAtlasEntity());
Map<String, AtlasEntity> referredEntities = new HashMap<>();
referredEntities.put("0", getHiveColumnAtlasEntity(1));
referredEntities.put("1", getHiveColumnAtlasEntity(2));
referredEntities.put("2", getHiveColumnAtlasEntity(3));
ret.setReferredEntities(referredEntities);
return ret;
}
}
......@@ -29,6 +29,10 @@ import java.io.FileInputStream;
import java.io.IOException;
import java.util.List;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.AssertJUnit.assertTrue;
public class ZipSourceTest {
@DataProvider(name = "zipFileStocks")
public static Object[][] getDataFromZipFile() throws IOException {
......@@ -50,12 +54,12 @@ public class ZipSourceTest {
public void examineContents_BehavesAsExpected(ZipSource zipSource) throws IOException, AtlasBaseException {
List<String> creationOrder = zipSource.getCreationOrder();
Assert.assertNotNull(creationOrder);
Assert.assertEquals(creationOrder.size(), 4);
assertNotNull(creationOrder);
assertEquals(creationOrder.size(), 4);
AtlasTypesDef typesDef = zipSource.getTypesDef();
Assert.assertNotNull(typesDef);
Assert.assertEquals(typesDef.getEntityDefs().size(), 6);
assertNotNull(typesDef);
assertEquals(typesDef.getEntityDefs().size(), 6);
useCreationOrderToFetchEntitiesWithExtInfo(zipSource, creationOrder);
useCreationOrderToFetchEntities(zipSource, creationOrder);
......@@ -66,13 +70,13 @@ public class ZipSourceTest {
private void useCreationOrderToFetchEntities(ZipSource zipSource, List<String> creationOrder) {
for (String guid : creationOrder) {
AtlasEntity e = zipSource.getByGuid(guid);
Assert.assertNotNull(e);
assertNotNull(e);
}
}
private void verifyGuidRemovalOnImportComplete(ZipSource zipSource, String guid) {
AtlasEntity e = zipSource.getByGuid(guid);
Assert.assertNotNull(e);
assertNotNull(e);
zipSource.onImportComplete(guid);
......@@ -88,7 +92,7 @@ public class ZipSourceTest {
private void useCreationOrderToFetchEntitiesWithExtInfo(ZipSource zipSource, List<String> creationOrder) throws AtlasBaseException {
for (String guid : creationOrder) {
AtlasEntity.AtlasEntityExtInfo e = zipSource.getEntityWithExtInfo(guid);
Assert.assertNotNull(e);
assertNotNull(e);
}
}
......@@ -100,10 +104,35 @@ public class ZipSourceTest {
for (int i = 0; i < creationOrder.size(); i++) {
AtlasEntity e = zipSource.next();
Assert.assertNotNull(e);
Assert.assertEquals(e.getGuid(), creationOrder.get(i));
assertNotNull(e);
assertEquals(e.getGuid(), creationOrder.get(i));
}
Assert.assertFalse(zipSource.hasNext());
}
@Test(dataProvider = "zipFileStocks")
public void applyTransformation(ZipSource zipSource) throws IOException, AtlasBaseException {
ImportTransforms transforms = getTransformForHiveDB();
zipSource.setImportTransform(transforms);
Assert.assertTrue(zipSource.hasNext());
List<String> creationOrder = zipSource.getCreationOrder();
for (int i = 0; i < creationOrder.size(); i++) {
AtlasEntity e = zipSource.next();
if(e.getTypeName().equals("hive_db")) {
Object o = e.getAttribute("qualifiedName");
String s = (String) o;
assertNotNull(e);
assertTrue(s.contains("@cl2"));
break;
}
}
}
private ImportTransforms getTransformForHiveDB() {
ImportTransforms tr = ImportTransforms.fromJson("{ \"hive_db\": { \"qualifiedName\": [ \"replace:@cl1:@cl2\" ] } }");
return tr;
}
}
......@@ -261,6 +261,11 @@
</dependency>
<dependency>
<groupId>com.sun.jersey.contribs</groupId>
<artifactId>jersey-multipart</artifactId>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
</dependency>
......
......@@ -18,6 +18,7 @@
package org.apache.atlas.web.resources;
import com.sun.jersey.multipart.FormDataParam;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasErrorCode;
......@@ -37,6 +38,7 @@ import org.apache.atlas.repository.impexp.ZipSource;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.services.MetricsService;
import org.apache.atlas.store.AtlasTypeDefStore;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.web.filters.AtlasCSRFPreventionFilter;
import org.apache.atlas.web.service.ServiceState;
......@@ -69,8 +71,8 @@ import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
......@@ -362,10 +364,11 @@ public class AdminResource {
@POST
@Path("/import")
@Produces(Servlets.JSON_MEDIA_TYPE)
@Consumes(Servlets.BINARY)
public AtlasImportResult importData(byte[] bytes) throws AtlasBaseException {
@Consumes(MediaType.MULTIPART_FORM_DATA)
public AtlasImportResult importData(@FormDataParam("request") String jsonData,
@FormDataParam("data") InputStream inputStream) throws AtlasBaseException {
if (LOG.isDebugEnabled()) {
LOG.debug("==> AdminResource.importData(bytes.length={})", bytes.length);
LOG.debug("==> AdminResource.importData(jsonData={}, inputStream={})", jsonData, (inputStream != null));
}
acquireExportImportLock("import");
......@@ -373,15 +376,13 @@ public class AdminResource {
AtlasImportResult result;
try {
AtlasImportRequest request = new AtlasImportRequest(Servlets.getParameterMap(httpServletRequest));
ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes);
AtlasImportRequest request = AtlasType.fromJson(jsonData, AtlasImportRequest.class);
ImportService importService = new ImportService(this.typesDefStore, this.entityStore, this.typeRegistry);
ZipSource zipSource = new ZipSource(inputStream);
result = importService.run(zipSource, request, Servlets.getUserName(httpServletRequest),
Servlets.getHostName(httpServletRequest),
AtlasAuthorizationUtils.getRequestIpAddress(httpServletRequest));
Servlets.getHostName(httpServletRequest),
AtlasAuthorizationUtils.getRequestIpAddress(httpServletRequest));
} catch (Exception excp) {
LOG.error("importData(binary) failed", excp);
......@@ -400,7 +401,7 @@ public class AdminResource {
@POST
@Path("/importfile")
@Produces(Servlets.JSON_MEDIA_TYPE)
public AtlasImportResult importFile() throws AtlasBaseException {
public AtlasImportResult importFile(String jsonData) throws AtlasBaseException {
if (LOG.isDebugEnabled()) {
LOG.debug("==> AdminResource.importFile()");
}
......@@ -410,9 +411,8 @@ public class AdminResource {
AtlasImportResult result;
try {
AtlasImportRequest request = new AtlasImportRequest(Servlets.getParameterMap(httpServletRequest));
ImportService importService = new ImportService(this.typesDefStore, this.entityStore, this.typeRegistry);
AtlasImportRequest request = AtlasType.fromJson(jsonData, AtlasImportRequest.class);
ImportService importService = new ImportService(this.typesDefStore, this.entityStore, this.typeRegistry);
result = importService.run(request, Servlets.getUserName(httpServletRequest),
Servlets.getHostName(httpServletRequest),
AtlasAuthorizationUtils.getRequestIpAddress(httpServletRequest));
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment