Commit fa2e5b49 by mayanknj Committed by Sarath Subramanian

ATLAS-3679 : Bulk import Business Metadata attribute.

parent 1e8fa7e5
...@@ -202,9 +202,9 @@ public final class Constants { ...@@ -202,9 +202,9 @@ public final class Constants {
public static final Integer INCOMPLETE_ENTITY_VALUE = Integer.valueOf(1); public static final Integer INCOMPLETE_ENTITY_VALUE = Integer.valueOf(1);
/* /*
* All supported file-format extensions for AtlasGlossaryTerms file upload * All supported file-format extensions for Bulk Imports through file upload
*/ */
public enum GlossaryImportSupportedFileExtensions { XLSX, XLS, CSV } public enum SupportedFileExtensions { XLSX, XLS, CSV }
private Constants() { private Constants() {
} }
......
...@@ -168,6 +168,7 @@ public enum AtlasErrorCode { ...@@ -168,6 +168,7 @@ public enum AtlasErrorCode {
BUSINESS_METADATA_ATTRIBUTE_DOES_NOT_EXIST(400, "ATLAS-400-00-096", "Business-metadata attribute does not exist in entity: {0}"), BUSINESS_METADATA_ATTRIBUTE_DOES_NOT_EXIST(400, "ATLAS-400-00-096", "Business-metadata attribute does not exist in entity: {0}"),
BUSINESS_METADATA_ATTRIBUTE_ALREADY_EXISTS(400, "ATLAS-400-00-097", "Business-metadata attribute already exists in entity: {0}"), BUSINESS_METADATA_ATTRIBUTE_ALREADY_EXISTS(400, "ATLAS-400-00-097", "Business-metadata attribute already exists in entity: {0}"),
INVALID_FILE_TYPE(400, "ATLAS-400-00-98", "The provided file type {0} is not supported."), INVALID_FILE_TYPE(400, "ATLAS-400-00-98", "The provided file type {0} is not supported."),
INVALID_BUSINESS_ATTRIBUTES_IMPORT_DATA(400, "ATLAS-400-00-99","The uploaded file was not processed due to following errors : {0}"),
UNAUTHORIZED_ACCESS(403, "ATLAS-403-00-001", "{0} is not authorized to perform {1}"), UNAUTHORIZED_ACCESS(403, "ATLAS-403-00-001", "{0} is not authorized to perform {1}"),
...@@ -192,6 +193,7 @@ public enum AtlasErrorCode { ...@@ -192,6 +193,7 @@ public enum AtlasErrorCode {
INSTANCE_GUID_DELETED(404, "ATLAS-404-00-012", "Given instance guid {0} has been deleted"), INSTANCE_GUID_DELETED(404, "ATLAS-404-00-012", "Given instance guid {0} has been deleted"),
NO_PROPAGATED_CLASSIFICATIONS_FOUND_FOR_ENTITY(404, "ATLAS-404-00-013", "No propagated classifications associated with entity: {0}"), NO_PROPAGATED_CLASSIFICATIONS_FOUND_FOR_ENTITY(404, "ATLAS-404-00-013", "No propagated classifications associated with entity: {0}"),
NO_DATA_FOUND(404, "ATLAS-404-00-014", "No data found in the uploaded file"), NO_DATA_FOUND(404, "ATLAS-404-00-014", "No data found in the uploaded file"),
FILE_NAME_NOT_FOUND(404, "ATLAS-404-00-015", "File name should not be blank"),
// All data conflict errors go here // All data conflict errors go here
TYPE_ALREADY_EXISTS(409, "ATLAS-409-00-001", "Given type {0} already exists"), TYPE_ALREADY_EXISTS(409, "ATLAS-409-00-001", "Given type {0} already exists"),
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.bulkimport;
import java.util.ArrayList;
import java.util.List;
public class BulkImportResponse {
private List<ImportInfo> failedImportInfoList = new ArrayList<ImportInfo>();
private List<ImportInfo> successImportInfoList = new ArrayList<ImportInfo>();
public BulkImportResponse() {}
public List<ImportInfo> getFailedImportInfoList() {
return failedImportInfoList;
}
public void setFailedImportInfoList(List<ImportInfo> failedImportInfoList){
this.failedImportInfoList = failedImportInfoList;
}
public void setFailedImportInfoList(ImportInfo importInfo){
List<ImportInfo> failedImportInfoList = this.failedImportInfoList;
if (failedImportInfoList == null) {
failedImportInfoList = new ArrayList<>();
}
failedImportInfoList.add(importInfo);
setFailedImportInfoList(failedImportInfoList);
}
public List<ImportInfo> getSuccessImportInfoList() {
return successImportInfoList;
}
public void setSuccessImportInfoList(List<ImportInfo> successImportInfoList){
this.successImportInfoList = successImportInfoList;
}
public void setSuccessImportInfoList(ImportInfo importInfo){
List<ImportInfo> successImportInfoList = this.successImportInfoList;
if (successImportInfoList == null) {
successImportInfoList = new ArrayList<>();
}
successImportInfoList.add(importInfo);
setSuccessImportInfoList(successImportInfoList);
}
public enum ImportStatus {
SUCCESS, FAILED
}
@Override
public String toString() {
return "BulkImportResponse{" +
"failedImportInfoList=" + failedImportInfoList +
", successImportInfoList=" + successImportInfoList +
'}';
}
static public class ImportInfo {
private String parentObjectName;
private String childObjectName;
private ImportStatus importStatus;
private String remarks;
private Integer rowNumber;
public ImportInfo(String parentObjectName, String childObjectName, ImportStatus importStatus, String remarks, Integer rowNumber) {
this.parentObjectName = parentObjectName;
this.childObjectName = childObjectName;
this.importStatus = importStatus;
this.remarks = remarks;
this.rowNumber = rowNumber;
}
public ImportInfo(String parentObjectName, String childObjectName, ImportStatus importStatus) {
this(parentObjectName, childObjectName, importStatus, "",-1);
}
public ImportInfo( ImportStatus importStatus, String remarks, Integer rowNumber) {
this("","", importStatus, remarks, rowNumber);
}
public ImportInfo(String parentObjectName, String childObjectName) {
this(parentObjectName,childObjectName, ImportStatus.SUCCESS, "", -1);
}
public ImportInfo(String parentObjectName, String childObjectName, ImportStatus importStatus, String remarks) {
this(parentObjectName, childObjectName, importStatus, remarks, -1);
}
public String getParentObjectName() {
return parentObjectName;
}
public void setParentObjectName(String parentObjectName) {
this.parentObjectName = parentObjectName;
}
public String getChildObjectName() {
return childObjectName;
}
public void setChildObjectName(String childObjectName) {
this.childObjectName = childObjectName;
}
public String getRemarks() {
return remarks;
}
public void setRemarks(String remarks) {
this.remarks = remarks;
}
public ImportStatus getImportStatus() {
return importStatus;
}
public void setImportStatus(ImportStatus importStatus) {
this.importStatus = importStatus;
}
@Override
public String toString() {
return "ImportInfo{" +
"parentObjectName='" + parentObjectName + '\'' +
", childObjectName='" + childObjectName + '\'' +
", remarks='" + remarks + '\'' +
", importStatus=" + importStatus +
", rowNumber=" + rowNumber +
'}';
}
}
}
\ No newline at end of file
...@@ -35,8 +35,17 @@ import org.apache.atlas.model.typedef.AtlasStructDef.AtlasConstraintDef; ...@@ -35,8 +35,17 @@ import org.apache.atlas.model.typedef.AtlasStructDef.AtlasConstraintDef;
import org.apache.atlas.model.typedef.AtlasTypesDef; import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.apache.atlas.type.AtlasTypeUtil; import org.apache.atlas.type.AtlasTypeUtil;
import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.RandomStringUtils; import org.apache.commons.lang.RandomStringUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.math.BigDecimal; import java.math.BigDecimal;
import java.math.BigInteger; import java.math.BigInteger;
import java.util.ArrayList; import java.util.ArrayList;
...@@ -66,7 +75,7 @@ import static org.apache.atlas.type.AtlasTypeUtil.createBusinessMetadataDef; ...@@ -66,7 +75,7 @@ import static org.apache.atlas.type.AtlasTypeUtil.createBusinessMetadataDef;
* Test utility class. * Test utility class.
*/ */
public final class TestUtilsV2 { public final class TestUtilsV2 {
private static final Logger LOG = LoggerFactory.getLogger(TestUtilsV2.class);
public static final long TEST_DATE_IN_LONG = 1418265358440L; public static final long TEST_DATE_IN_LONG = 1418265358440L;
public static final String TEST_USER = "testUser"; public static final String TEST_USER = "testUser";
...@@ -1545,4 +1554,35 @@ public final class TestUtilsV2 { ...@@ -1545,4 +1554,35 @@ public final class TestUtilsV2 {
typeDef.setCreatedBy(TestUtilsV2.TEST_USER); typeDef.setCreatedBy(TestUtilsV2.TEST_USER);
typeDef.setUpdatedBy(TestUtilsV2.TEST_USER); typeDef.setUpdatedBy(TestUtilsV2.TEST_USER);
} }
public static InputStream getFile(String subDir, String fileName){
final String userDir = System.getProperty("user.dir");
String filePath = getTestFilePath(userDir, subDir, fileName);
File file = new File(filePath);
InputStream fs = null;
try {
fs = new FileInputStream(file);
} catch (FileNotFoundException e) {
LOG.error("File could not be found at: {}", filePath, e);
}
return fs;
}
public static String getFileData(String subDir, String fileName)throws IOException {
final String userDir = System.getProperty("user.dir");
String filePath = getTestFilePath(userDir, subDir, fileName);
File f = new File(filePath);
String ret = FileUtils.readFileToString(f, "UTF-8");
return ret;
}
private static String getTestFilePath(String startPath, String subDir, String fileName) {
if (StringUtils.isNotEmpty(subDir)) {
return startPath + "/src/test/resources/" + subDir + "/" + fileName;
} else {
return startPath + "/src/test/resources/" + fileName;
}
}
} }
...@@ -29,7 +29,9 @@ import org.apache.atlas.model.instance.AtlasObjectId; ...@@ -29,7 +29,9 @@ import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.instance.EntityMutationResponse; import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.repository.store.graph.v2.EntityStream; import org.apache.atlas.repository.store.graph.v2.EntityStream;
import org.apache.atlas.type.AtlasEntityType; import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.bulkimport.BulkImportResponse;
import java.io.InputStream;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
...@@ -282,4 +284,13 @@ public interface AtlasEntityStore { ...@@ -282,4 +284,13 @@ public interface AtlasEntityStore {
* Add given labels to the given entity, if labels is null/empty, no labels will be added. * Add given labels to the given entity, if labels is null/empty, no labels will be added.
*/ */
void addLabels(String guid, Set<String> labels) throws AtlasBaseException; void addLabels(String guid, Set<String> labels) throws AtlasBaseException;
/**
*
* @param inputStream
* @param fileName
* @throws AtlasBaseException
*
*/
BulkImportResponse bulkCreateOrUpdateBusinessAttributes(InputStream inputStream, String fileName) throws AtlasBaseException;
} }
...@@ -40,19 +40,26 @@ import org.apache.atlas.model.instance.AtlasEntityHeader; ...@@ -40,19 +40,26 @@ import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.AtlasEntityHeaders; import org.apache.atlas.model.instance.AtlasEntityHeaders;
import org.apache.atlas.model.instance.AtlasObjectId; import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.instance.EntityMutationResponse; import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.graph.GraphHelper;
import org.apache.atlas.repository.graphdb.AtlasVertex; import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.store.graph.AtlasEntityStore; import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.repository.store.graph.EntityGraphDiscovery; import org.apache.atlas.repository.store.graph.EntityGraphDiscovery;
import org.apache.atlas.repository.store.graph.EntityGraphDiscoveryContext; import org.apache.atlas.repository.store.graph.EntityGraphDiscoveryContext;
import org.apache.atlas.repository.store.graph.v1.DeleteHandlerDelegate; import org.apache.atlas.repository.store.graph.v1.DeleteHandlerDelegate;
import org.apache.atlas.store.DeleteType; import org.apache.atlas.store.DeleteType;
import org.apache.atlas.type.AtlasArrayType;
import org.apache.atlas.type.AtlasBusinessMetadataType.AtlasBusinessAttribute;
import org.apache.atlas.type.AtlasClassificationType; import org.apache.atlas.type.AtlasClassificationType;
import org.apache.atlas.type.AtlasEntityType; import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasBusinessMetadataType.AtlasBusinessAttribute; import org.apache.atlas.type.AtlasEnumType;
import org.apache.atlas.type.AtlasStructType.AtlasAttribute; import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
import org.apache.atlas.type.AtlasType; import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.type.AtlasTypeUtil; import org.apache.atlas.type.AtlasTypeUtil;
import org.apache.atlas.bulkimport.BulkImportResponse;
import org.apache.atlas.util.FileUtils;
import org.apache.atlas.utils.AtlasEntityUtil; import org.apache.atlas.utils.AtlasEntityUtil;
import org.apache.atlas.utils.AtlasPerfMetrics.MetricRecorder; import org.apache.atlas.utils.AtlasPerfMetrics.MetricRecorder;
import org.apache.atlas.utils.AtlasPerfTracer; import org.apache.atlas.utils.AtlasPerfTracer;
...@@ -64,9 +71,13 @@ import org.slf4j.LoggerFactory; ...@@ -64,9 +71,13 @@ import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.inject.Inject; import javax.inject.Inject;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
...@@ -74,7 +85,9 @@ import java.util.Objects; ...@@ -74,7 +85,9 @@ import java.util.Objects;
import java.util.Set; import java.util.Set;
import static java.lang.Boolean.FALSE; import static java.lang.Boolean.FALSE;
import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.*; import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.DELETE;
import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.PURGE;
import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.UPDATE;
import static org.apache.atlas.repository.Constants.IS_INCOMPLETE_PROPERTY_KEY; import static org.apache.atlas.repository.Constants.IS_INCOMPLETE_PROPERTY_KEY;
import static org.apache.atlas.repository.graph.GraphHelper.getCustomAttributes; import static org.apache.atlas.repository.graph.GraphHelper.getCustomAttributes;
import static org.apache.atlas.repository.graph.GraphHelper.getTypeName; import static org.apache.atlas.repository.graph.GraphHelper.getTypeName;
...@@ -94,6 +107,7 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore { ...@@ -94,6 +107,7 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {
private final EntityGraphMapper entityGraphMapper; private final EntityGraphMapper entityGraphMapper;
private final EntityGraphRetriever entityRetriever; private final EntityGraphRetriever entityRetriever;
@Inject @Inject
public AtlasEntityStoreV2(DeleteHandlerDelegate deleteDelegate, AtlasTypeRegistry typeRegistry, public AtlasEntityStoreV2(DeleteHandlerDelegate deleteDelegate, AtlasTypeRegistry typeRegistry,
IAtlasEntityChangeNotifier entityChangeNotifier, EntityGraphMapper entityGraphMapper) { IAtlasEntityChangeNotifier entityChangeNotifier, EntityGraphMapper entityGraphMapper) {
...@@ -1531,4 +1545,229 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore { ...@@ -1531,4 +1545,229 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {
throw new AtlasBaseException(AtlasErrorCode.INSTANCE_CRUD_INVALID_PARAMS, messages); throw new AtlasBaseException(AtlasErrorCode.INSTANCE_CRUD_INVALID_PARAMS, messages);
} }
} }
@Override
@GraphTransaction
public BulkImportResponse bulkCreateOrUpdateBusinessAttributes(InputStream inputStream, String fileName) throws AtlasBaseException {
BulkImportResponse ret = new BulkImportResponse();
try {
if (StringUtils.isBlank(fileName)) {
throw new AtlasBaseException(AtlasErrorCode.FILE_NAME_NOT_FOUND, fileName);
}
List<String[]> fileData = FileUtils.readFileData(fileName, inputStream);
Map<String, AtlasEntity> attributesToAssociate = getBusinessMetadataDefList(fileData, ret);
for (Map.Entry<String, AtlasEntity> entry : attributesToAssociate.entrySet()) {
AtlasEntity entity = entry.getValue();
try{
addOrUpdateBusinessAttributes(entity.getGuid(), entity.getBusinessAttributes(), true);
BulkImportResponse.ImportInfo successImportInfo = new BulkImportResponse.ImportInfo(entity.getAttribute(Constants.QUALIFIED_NAME).toString(), entity.getBusinessAttributes().toString());
ret.setSuccessImportInfoList(successImportInfo);
}catch(Exception e){
LOG.error("Error occurred while updating BusinessMetadata Attributes for Entity "+entity.getAttribute(Constants.QUALIFIED_NAME).toString());
BulkImportResponse.ImportInfo failedImportInfo = new BulkImportResponse.ImportInfo(entity.getAttribute(Constants.QUALIFIED_NAME).toString(), entity.getBusinessAttributes().toString(), BulkImportResponse.ImportStatus.FAILED, e.getMessage());
ret.setFailedImportInfoList(failedImportInfo);
}
}
} catch (IOException e) {
LOG.error("An Exception occurred while uploading the file : "+e.getMessage());
throw new AtlasBaseException(AtlasErrorCode.FAILED_TO_UPLOAD, e);
}
return ret;
}
private Map<String, AtlasEntity> getBusinessMetadataDefList(List<String[]> fileData, BulkImportResponse bulkImportResponse) throws AtlasBaseException {
Map<String, AtlasEntity> ret = new HashMap<>();
Map<String, Map<String, Object>> newBMAttributes = new HashMap<>();
Map<String, AtlasVertex> vertexCache = new HashMap<>();
Map<String, Object> attribute = new HashMap<>();
for (int lineIndex = 0; lineIndex < fileData.size(); lineIndex++) {
List<String> failedTermMsgList = new ArrayList<>();
AtlasEntity atlasEntity = new AtlasEntity();
String[] record = fileData.get(lineIndex);
if (missingFieldsCheck(record, bulkImportResponse, lineIndex+1)) {
continue;
}
String typeName = record[FileUtils.TYPENAME_COLUMN_INDEX];
String uniqueAttrValue = record[FileUtils.UNIQUE_ATTR_VALUE_COLUMN_INDEX];
String bmAttribute = record[FileUtils.BM_ATTR_NAME_COLUMN_INDEX];
String bmAttributeValue = record[FileUtils.BM_ATTR_VALUE_COLUMN_INDEX];
String uniqueAttrName = Constants.QUALIFIED_NAME;
if (record.length > FileUtils.UNIQUE_ATTR_NAME_COLUMN_INDEX) {
uniqueAttrName = typeName+"."+record[FileUtils.UNIQUE_ATTR_NAME_COLUMN_INDEX];
}
if (validateTypeName(typeName, bulkImportResponse, lineIndex+1)) {
continue;
}
String vertexKey = typeName + "_" + uniqueAttrName + "_" + uniqueAttrValue;
AtlasVertex atlasVertex = vertexCache.get(vertexKey);
if (atlasVertex == null) {
atlasVertex = AtlasGraphUtilsV2.findByTypeAndUniquePropertyName(typeName, uniqueAttrName, uniqueAttrValue);
}
if (atlasVertex == null) {
LOG.error("Provided UniqueAttributeValue is not valid : " + uniqueAttrValue + " at line #" + (lineIndex + 1));
failedTermMsgList.add("Provided UniqueAttributeValue is not valid : " + uniqueAttrValue + " at line #" + (lineIndex + 1));
}
vertexCache.put(vertexKey, atlasVertex);
String[] businessAttributeName = bmAttribute.split(FileUtils.ESCAPE_CHARACTER + ".");
if (validateBMAttributeName(uniqueAttrValue,bmAttribute,bulkImportResponse,lineIndex+1)) {
continue;
}
String bMName = businessAttributeName[0];
String bMAttributeName = businessAttributeName[1];
AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName);
if (validateBMAttribute(uniqueAttrValue, businessAttributeName, entityType, bulkImportResponse,lineIndex+1)) {
continue;
}
AtlasBusinessAttribute atlasBusinessAttribute = entityType.getBusinessAttributes().get(bMName).get(bMAttributeName);
if (atlasBusinessAttribute.getAttributeType().getTypeCategory() == TypeCategory.ARRAY) {
AtlasArrayType arrayType = (AtlasArrayType) atlasBusinessAttribute.getAttributeType();
List attributeValueData;
if(arrayType.getElementType() instanceof AtlasEnumType){
attributeValueData = AtlasGraphUtilsV2.assignEnumValues(bmAttributeValue, (AtlasEnumType) arrayType.getElementType(), failedTermMsgList, lineIndex+1);
}else{
attributeValueData = assignMultipleValues(bmAttributeValue, arrayType.getElementTypeName(), failedTermMsgList, lineIndex+1);
}
attribute.put(bmAttribute, attributeValueData);
} else {
attribute.put(bmAttribute, bmAttributeValue);
}
if(failedMsgCheck(uniqueAttrValue,bmAttribute, failedTermMsgList, bulkImportResponse, lineIndex+1)) {
continue;
}
if(ret.containsKey(vertexKey)) {
atlasEntity = ret.get(vertexKey);
atlasEntity.setBusinessAttribute(bMName, bMAttributeName, attribute.get(bmAttribute));
ret.put(vertexKey, atlasEntity);
} else {
String guid = GraphHelper.getGuid(atlasVertex);
atlasEntity.setGuid(guid);
atlasEntity.setTypeName(typeName);
atlasEntity.setAttribute(Constants.QUALIFIED_NAME,uniqueAttrValue);
newBMAttributes = entityRetriever.getBusinessMetadata(atlasVertex) != null ? entityRetriever.getBusinessMetadata(atlasVertex) : newBMAttributes;
atlasEntity.setBusinessAttributes(newBMAttributes);
atlasEntity.setBusinessAttribute(bMName, bMAttributeName, attribute.get(bmAttribute));
ret.put(vertexKey, atlasEntity);
}
}
return ret;
}
private boolean validateTypeName(String typeName, BulkImportResponse bulkImportResponse, int lineIndex) {
boolean ret = false;
if(!typeRegistry.getAllEntityDefNames().contains(typeName)){
ret = true;
LOG.error("Invalid entity-type: " + typeName + " at line #" + lineIndex);
String failedTermMsgs = "Invalid entity-type: " + typeName + " at line #" + lineIndex;
BulkImportResponse.ImportInfo importInfo = new BulkImportResponse.ImportInfo(BulkImportResponse.ImportStatus.FAILED, failedTermMsgs, lineIndex);
bulkImportResponse.getFailedImportInfoList().add(importInfo);
}
return ret;
}
private List assignMultipleValues(String bmAttributeValues, String elementTypeName, List failedTermMsgList, int lineIndex) {
String[] arr = bmAttributeValues.split(FileUtils.ESCAPE_CHARACTER + FileUtils.PIPE_CHARACTER);
try {
switch (elementTypeName) {
case AtlasBaseTypeDef.ATLAS_TYPE_FLOAT:
return AtlasGraphUtilsV2.floatParser(arr, failedTermMsgList, lineIndex);
case AtlasBaseTypeDef.ATLAS_TYPE_INT:
return AtlasGraphUtilsV2.intParser(arr, failedTermMsgList, lineIndex);
case AtlasBaseTypeDef.ATLAS_TYPE_LONG:
return AtlasGraphUtilsV2.longParser(arr, failedTermMsgList, lineIndex);
case AtlasBaseTypeDef.ATLAS_TYPE_SHORT:
return AtlasGraphUtilsV2.shortParser(arr, failedTermMsgList, lineIndex);
case AtlasBaseTypeDef.ATLAS_TYPE_DOUBLE:
return AtlasGraphUtilsV2.doubleParser(arr, failedTermMsgList, lineIndex);
case AtlasBaseTypeDef.ATLAS_TYPE_DATE:
return AtlasGraphUtilsV2.dateParser(arr, failedTermMsgList, lineIndex);
case AtlasBaseTypeDef.ATLAS_TYPE_BOOLEAN:
return AtlasGraphUtilsV2.booleanParser(arr, failedTermMsgList, lineIndex);
default:
return Arrays.asList(arr);
}
} catch (Exception e) {
LOG.error("On line index " + lineIndex + "the provided BusinessMetadata AttributeValue " + bmAttributeValues + " are not of type - " + elementTypeName);
failedTermMsgList.add("On line index " + lineIndex + "the provided BusinessMetadata AttributeValue " + bmAttributeValues + " are not of type - " + elementTypeName);
}
return null;
}
private boolean missingFieldsCheck(String[] record, BulkImportResponse bulkImportResponse, int lineIndex){
boolean missingFieldsCheck = (record.length < FileUtils.UNIQUE_ATTR_NAME_COLUMN_INDEX) ||
StringUtils.isBlank(record[FileUtils.TYPENAME_COLUMN_INDEX]) ||
StringUtils.isBlank(record[FileUtils.UNIQUE_ATTR_VALUE_COLUMN_INDEX]) ||
StringUtils.isBlank(record[FileUtils.BM_ATTR_NAME_COLUMN_INDEX]) ||
StringUtils.isBlank(record[FileUtils.BM_ATTR_VALUE_COLUMN_INDEX]);
if(missingFieldsCheck){
LOG.error("Missing fields: " + Arrays.toString(record) + " at line #" + lineIndex);
String failedTermMsgs = "Missing fields: " + Arrays.toString(record) + " at line #" + lineIndex;
BulkImportResponse.ImportInfo importInfo = new BulkImportResponse.ImportInfo(BulkImportResponse.ImportStatus.FAILED, failedTermMsgs, lineIndex);
bulkImportResponse.getFailedImportInfoList().add(importInfo);
}
return missingFieldsCheck;
}
private boolean validateBMAttributeName(String uniqueAttrValue, String bmAttribute, BulkImportResponse bulkImportResponse, int lineIndex) {
boolean ret = false;
String[] businessAttributeName = bmAttribute.split(FileUtils.ESCAPE_CHARACTER + ".");
if(businessAttributeName.length < 2){
LOG.error("Provided businessAttributeName is not in proper format : " + bmAttribute + " at line #" + lineIndex);
String failedTermMsgs = "Provided businessAttributeName is not in proper format : " + bmAttribute + " at line #" + lineIndex;
BulkImportResponse.ImportInfo importInfo = new BulkImportResponse.ImportInfo(uniqueAttrValue, bmAttribute, BulkImportResponse.ImportStatus.FAILED, failedTermMsgs, lineIndex);
bulkImportResponse.getFailedImportInfoList().add(importInfo);
ret = true;
}
return ret;
}
private boolean validateBMAttribute(String uniqueAttrValue,String[] businessAttributeName, AtlasEntityType entityType, BulkImportResponse bulkImportResponse, int lineIndex) {
boolean ret = false;
String bMName = businessAttributeName[0];
String bMAttributeName = businessAttributeName[1];
if(entityType.getBusinessAttributes(bMName) == null ||
entityType.getBusinessAttributes(bMName).get(bMAttributeName) == null){
ret = true;
LOG.error("Provided businessAttributeName is not valid : " + bMName+"."+bMAttributeName + " at line #" + lineIndex);
String failedTermMsgs = "Provided businessAttributeName is not valid : " + bMName+"."+bMAttributeName + " at line #" + lineIndex;
BulkImportResponse.ImportInfo importInfo = new BulkImportResponse.ImportInfo(uniqueAttrValue, bMName+"."+bMAttributeName, BulkImportResponse.ImportStatus.FAILED, failedTermMsgs, lineIndex);
bulkImportResponse.getFailedImportInfoList().add(importInfo);
}
return ret;
}
private boolean failedMsgCheck(String uniqueAttrValue, String bmAttribute, List<String> failedTermMsgList,BulkImportResponse bulkImportResponse,int lineIndex) {
boolean ret = false;
if(!failedTermMsgList.isEmpty()){
ret = true;
String failedTermMsg = StringUtils.join(failedTermMsgList, "\n");
BulkImportResponse.ImportInfo importInfo = new BulkImportResponse.ImportInfo(uniqueAttrValue, bmAttribute, BulkImportResponse.ImportStatus.FAILED, failedTermMsg, lineIndex);
bulkImportResponse.getFailedImportInfoList().add(importInfo);
}
return ret;
}
} }
...@@ -29,6 +29,7 @@ import org.apache.atlas.model.TypeCategory; ...@@ -29,6 +29,7 @@ import org.apache.atlas.model.TypeCategory;
import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.Status; import org.apache.atlas.model.instance.AtlasEntity.Status;
import org.apache.atlas.model.typedef.AtlasBaseTypeDef; import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
import org.apache.atlas.model.typedef.AtlasEnumDef;
import org.apache.atlas.repository.Constants; import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.graph.GraphHelper; import org.apache.atlas.repository.graph.GraphHelper;
import org.apache.atlas.repository.graphdb.AtlasEdge; import org.apache.atlas.repository.graphdb.AtlasEdge;
...@@ -39,9 +40,11 @@ import org.apache.atlas.repository.graphdb.AtlasIndexQuery; ...@@ -39,9 +40,11 @@ import org.apache.atlas.repository.graphdb.AtlasIndexQuery;
import org.apache.atlas.repository.graphdb.AtlasIndexQuery.Result; import org.apache.atlas.repository.graphdb.AtlasIndexQuery.Result;
import org.apache.atlas.repository.graphdb.AtlasVertex; import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.type.AtlasEntityType; import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasEnumType;
import org.apache.atlas.type.AtlasStructType; import org.apache.atlas.type.AtlasStructType;
import org.apache.atlas.type.AtlasStructType.AtlasAttribute; import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
import org.apache.atlas.type.AtlasType; import org.apache.atlas.type.AtlasType;
import org.apache.atlas.util.FileUtils;
import org.apache.atlas.utils.AtlasPerfMetrics.MetricRecorder; import org.apache.atlas.utils.AtlasPerfMetrics.MetricRecorder;
import org.apache.commons.collections.MapUtils; import org.apache.commons.collections.MapUtils;
import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.Configuration;
...@@ -49,6 +52,7 @@ import org.apache.commons.lang.StringUtils; ...@@ -49,6 +52,7 @@ import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.text.SimpleDateFormat;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
...@@ -62,11 +66,10 @@ import static org.apache.atlas.repository.Constants.CLASSIFICATION_NAMES_KEY; ...@@ -62,11 +66,10 @@ import static org.apache.atlas.repository.Constants.CLASSIFICATION_NAMES_KEY;
import static org.apache.atlas.repository.Constants.ENTITY_TYPE_PROPERTY_KEY; import static org.apache.atlas.repository.Constants.ENTITY_TYPE_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.INDEX_SEARCH_VERTEX_PREFIX_DEFAULT; import static org.apache.atlas.repository.Constants.INDEX_SEARCH_VERTEX_PREFIX_DEFAULT;
import static org.apache.atlas.repository.Constants.INDEX_SEARCH_VERTEX_PREFIX_PROPERTY; import static org.apache.atlas.repository.Constants.INDEX_SEARCH_VERTEX_PREFIX_PROPERTY;
import static org.apache.atlas.repository.Constants.LABELS_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.PROPAGATED_CLASSIFICATION_NAMES_KEY; import static org.apache.atlas.repository.Constants.PROPAGATED_CLASSIFICATION_NAMES_KEY;
import static org.apache.atlas.repository.Constants.STATE_PROPERTY_KEY; import static org.apache.atlas.repository.Constants.STATE_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.TYPE_NAME_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.TYPENAME_PROPERTY_KEY; import static org.apache.atlas.repository.Constants.TYPENAME_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.TYPE_NAME_PROPERTY_KEY;
import static org.apache.atlas.repository.graph.AtlasGraphProvider.getGraphInstance; import static org.apache.atlas.repository.graph.AtlasGraphProvider.getGraphInstance;
import static org.apache.atlas.repository.graphdb.AtlasGraphQuery.SortOrder.ASC; import static org.apache.atlas.repository.graphdb.AtlasGraphQuery.SortOrder.ASC;
import static org.apache.atlas.repository.graphdb.AtlasGraphQuery.SortOrder.DESC; import static org.apache.atlas.repository.graphdb.AtlasGraphQuery.SortOrder.DESC;
...@@ -674,4 +677,128 @@ public class AtlasGraphUtilsV2 { ...@@ -674,4 +677,128 @@ public class AtlasGraphUtilsV2 {
} }
return classificationNames; return classificationNames;
} }
public static List<Date> dateParser(String[] arr, List failedTermMsgList, int lineIndex) {
List<Date> ret = new ArrayList();
for (String s : arr) {
try{
SimpleDateFormat formatter = new SimpleDateFormat("MM/dd/yyyy");
Date date = formatter.parse(s);
ret.add(date);
}
catch(Exception e){
LOG.error("Provided value "+s+" is not of Date type at line #"+lineIndex);
failedTermMsgList.add("Provided value "+s+" is not of Date type at line #"+lineIndex);
}
}
return ret;
}
public static List<Boolean> booleanParser(String[] arr, List failedTermMsgList, int lineIndex) {
List<Boolean> ret = new ArrayList();
for (String s : arr) {
try{
ret.add(Boolean.parseBoolean(s));
}
catch(Exception e){
LOG.error("Provided value "+s+" is not of Boolean type at line #"+lineIndex);
failedTermMsgList.add("Provided value "+s+" is not of Boolean type at line #"+lineIndex);
}
}
return ret;
}
public static List<Double> doubleParser(String[] arr, List failedTermMsgList, int lineIndex) {
List<Double> ret = new ArrayList();
for (String s : arr) {
try{
ret.add(Double.parseDouble(s));
}
catch(Exception e){
LOG.error("Provided value "+s+" is not of Double type at line #"+lineIndex);
failedTermMsgList.add("Provided value "+s+" is not of Double type at line #"+lineIndex);
}
}
return ret;
}
public static List<Short> shortParser(String[] arr, List failedTermMsgList, int lineIndex) {
List<Short> ret = new ArrayList();
for (String s : arr) {
try{
ret.add(Short.parseShort(s));
}
catch(Exception e){
LOG.error("Provided value "+s+" is not of Short type at line #"+lineIndex);
failedTermMsgList.add("Provided value "+s+" is not of Short type at line #"+lineIndex);
}
}
return ret;
}
public static List<Long> longParser(String[] arr, List failedTermMsgList, int lineIndex) {
List<Long> ret = new ArrayList();
for (String s : arr) {
try{
ret.add(Long.parseLong(s));
}
catch(Exception e){
LOG.error("Provided value "+s+" is not of Long type at line #"+lineIndex);
failedTermMsgList.add("Provided value "+s+" is not of Long type at line #"+lineIndex);
}
}
return ret;
}
public static List<Integer> intParser(String[] arr, List failedTermMsgList, int lineIndex) {
List<Integer> ret = new ArrayList();
for (String s : arr) {
try{
ret.add(Integer.parseInt(s));
}
catch(Exception e){
LOG.error("Provided value "+s+" is not of Integer type at line #"+lineIndex);
failedTermMsgList.add("Provided value "+s+" is Integer of Long type at line #"+lineIndex);
}
}
return ret;
}
public static List<Float> floatParser(String[] arr, List failedTermMsgList, int lineIndex) {
List<Float> ret = new ArrayList();
for (String s : arr) {
try{
ret.add(Float.parseFloat(s));
}
catch(Exception e){
LOG.error("Provided value "+s+" is Float of Long type at line #"+lineIndex);
failedTermMsgList.add("Provided value "+s+" is Float of Long type at line #"+lineIndex);
}
}
return ret;
}
public static List assignEnumValues(String bmAttributeValues, AtlasEnumType enumType, List<String> failedTermMsgList, int lineIndex) {
List<String> ret = new ArrayList<>();
String[] arr = bmAttributeValues.split(FileUtils.ESCAPE_CHARACTER + FileUtils.PIPE_CHARACTER);
AtlasEnumDef.AtlasEnumElementDef atlasEnumDef;
for(String s : arr){
atlasEnumDef = enumType.getEnumElementDef(s);
if(atlasEnumDef==null){
LOG.error("Provided value "+s+" is Enumeration of Long type at line #"+lineIndex);
failedTermMsgList.add("Provided value "+s+" is Enumeration of Long type at line #"+lineIndex);
}else{
ret.add(s);
}
}
return ret;
}
} }
\ No newline at end of file
...@@ -22,6 +22,7 @@ import org.apache.atlas.AtlasErrorCode; ...@@ -22,6 +22,7 @@ import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.exception.AtlasBaseException;
import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.io.FilenameUtils; import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.poi.hssf.usermodel.HSSFWorkbook; import org.apache.poi.hssf.usermodel.HSSFWorkbook;
import org.apache.poi.ss.usermodel.Cell; import org.apache.poi.ss.usermodel.Cell;
import org.apache.poi.ss.usermodel.Row; import org.apache.poi.ss.usermodel.Row;
...@@ -36,13 +37,20 @@ import java.util.ArrayList; ...@@ -36,13 +37,20 @@ import java.util.ArrayList;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import static org.apache.atlas.repository.Constants.GlossaryImportSupportedFileExtensions.*; import static org.apache.atlas.repository.Constants.SupportedFileExtensions.*;
public class FileUtils { public class FileUtils {
public static final String PIPE_CHARACTER = "|"; public static final String PIPE_CHARACTER = "|";
public static final String COLON_CHARACTER = ":"; public static final String COLON_CHARACTER = ":";
public static final String ESCAPE_CHARACTER = "\\"; public static final String ESCAPE_CHARACTER = "\\";
//BusinessMetadata attributes association uploads
public static final int TYPENAME_COLUMN_INDEX = 0;
public static final int UNIQUE_ATTR_VALUE_COLUMN_INDEX = 1;
public static final int BM_ATTR_NAME_COLUMN_INDEX = 2;
public static final int BM_ATTR_VALUE_COLUMN_INDEX = 3;
public static final int UNIQUE_ATTR_NAME_COLUMN_INDEX = 4;
public static List<String[]> readFileData(String fileName, InputStream inputStream) throws IOException, AtlasBaseException { public static List<String[]> readFileData(String fileName, InputStream inputStream) throws IOException, AtlasBaseException {
List<String[]> ret; List<String[]> ret;
String extension = FilenameUtils.getExtension(fileName); String extension = FilenameUtils.getExtension(fileName);
...@@ -123,4 +131,16 @@ public class FileUtils { ...@@ -123,4 +131,16 @@ public class FileUtils {
return true; return true;
} }
public static String getBusinessMetadataHeaders() {
List<String> bMHeader = new ArrayList<>();
bMHeader.add("EntityType");
bMHeader.add("EntityUniqueAttributeValue");
bMHeader.add("BusinessAttributeName");
bMHeader.add("BusinessAttributeValue");
bMHeader.add("EntityUniqueAttributeName[optional]");
return StringUtils.join(bMHeader, ",");
}
} }
\ No newline at end of file
...@@ -22,6 +22,7 @@ import org.apache.atlas.AtlasErrorCode; ...@@ -22,6 +22,7 @@ import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.RequestContext; import org.apache.atlas.RequestContext;
import org.apache.atlas.TestModules; import org.apache.atlas.TestModules;
import org.apache.atlas.TestUtilsV2; import org.apache.atlas.TestUtilsV2;
import org.apache.atlas.bulkimport.BulkImportResponse;
import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.instance.AtlasClassification; import org.apache.atlas.model.instance.AtlasClassification;
import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.instance.AtlasEntity;
...@@ -37,6 +38,7 @@ import org.apache.atlas.model.typedef.AtlasClassificationDef; ...@@ -37,6 +38,7 @@ import org.apache.atlas.model.typedef.AtlasClassificationDef;
import org.apache.atlas.model.typedef.AtlasEntityDef; import org.apache.atlas.model.typedef.AtlasEntityDef;
import org.apache.atlas.model.typedef.AtlasTypesDef; import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.apache.atlas.type.AtlasTypeUtil; import org.apache.atlas.type.AtlasTypeUtil;
import org.apache.atlas.util.FileUtils;
import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -47,6 +49,9 @@ import org.testng.annotations.Guice; ...@@ -47,6 +49,9 @@ import org.testng.annotations.Guice;
import org.testng.annotations.Test; import org.testng.annotations.Test;
import javax.inject.Inject; import javax.inject.Inject;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashMap; import java.util.HashMap;
...@@ -55,20 +60,27 @@ import java.util.List; ...@@ -55,20 +60,27 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import static org.apache.atlas.AtlasErrorCode.*; import static org.apache.atlas.AtlasErrorCode.INVALID_CUSTOM_ATTRIBUTE_KEY_CHARACTERS;
import static org.apache.atlas.AtlasErrorCode.INVALID_CUSTOM_ATTRIBUTE_KEY_LENGTH;
import static org.apache.atlas.AtlasErrorCode.INVALID_CUSTOM_ATTRIBUTE_VALUE;
import static org.apache.atlas.AtlasErrorCode.INVALID_LABEL_CHARACTERS;
import static org.apache.atlas.AtlasErrorCode.INVALID_LABEL_LENGTH;
import static org.apache.atlas.TestUtilsV2.COLUMNS_ATTR_NAME; import static org.apache.atlas.TestUtilsV2.COLUMNS_ATTR_NAME;
import static org.apache.atlas.TestUtilsV2.COLUMN_TYPE; import static org.apache.atlas.TestUtilsV2.COLUMN_TYPE;
import static org.apache.atlas.TestUtilsV2.NAME; import static org.apache.atlas.TestUtilsV2.NAME;
import static org.apache.atlas.TestUtilsV2.TABLE_TYPE; import static org.apache.atlas.TestUtilsV2.TABLE_TYPE;
import static org.apache.atlas.TestUtilsV2.getFile;
import static org.apache.commons.lang.RandomStringUtils.randomAlphanumeric; import static org.apache.commons.lang.RandomStringUtils.randomAlphanumeric;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue; import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail; import static org.testng.Assert.fail;
@Guice(modules = TestModules.TestOnlyModule.class) @Guice(modules = TestModules.TestOnlyModule.class)
public class AtlasEntityStoreV2Test extends AtlasEntityTestBase { public class AtlasEntityStoreV2Test extends AtlasEntityTestBase {
private static final Logger LOG = LoggerFactory.getLogger(AtlasEntityStoreV2Test.class); private static final Logger LOG = LoggerFactory.getLogger(AtlasEntityStoreV2Test.class);
public static final String CSV_FILES = "/csvFiles/";
private AtlasEntitiesWithExtInfo deptEntity; private AtlasEntitiesWithExtInfo deptEntity;
private AtlasEntityWithExtInfo dbEntity; private AtlasEntityWithExtInfo dbEntity;
...@@ -208,15 +220,15 @@ public class AtlasEntityStoreV2Test extends AtlasEntityTestBase { ...@@ -208,15 +220,15 @@ public class AtlasEntityStoreV2Test extends AtlasEntityTestBase {
AtlasEntitiesWithExtInfo entitiesInfo = new AtlasEntitiesWithExtInfo(tableEntity); AtlasEntitiesWithExtInfo entitiesInfo = new AtlasEntitiesWithExtInfo(tableEntity);
AtlasEntity col1 = TestUtilsV2.createColumnEntity(tableEntity); AtlasEntity col1 = TestUtilsV2.createColumnEntity(tableEntity);
col1.setAttribute(TestUtilsV2.NAME, "col1"); col1.setAttribute(NAME, "col1");
AtlasEntity col2 = TestUtilsV2.createColumnEntity(tableEntity); AtlasEntity col2 = TestUtilsV2.createColumnEntity(tableEntity);
col2.setAttribute(TestUtilsV2.NAME, "col2"); col2.setAttribute(NAME, "col2");
columns.add(AtlasTypeUtil.getAtlasObjectId(col1)); columns.add(AtlasTypeUtil.getAtlasObjectId(col1));
columns.add(AtlasTypeUtil.getAtlasObjectId(col2)); columns.add(AtlasTypeUtil.getAtlasObjectId(col2));
tableEntity.setAttribute(TestUtilsV2.COLUMNS_ATTR_NAME, columns); tableEntity.setAttribute(COLUMNS_ATTR_NAME, columns);
entitiesInfo.addReferredEntity(dbEntity.getEntity()); entitiesInfo.addReferredEntity(dbEntity.getEntity());
entitiesInfo.addReferredEntity(col1); entitiesInfo.addReferredEntity(col1);
...@@ -230,14 +242,14 @@ public class AtlasEntityStoreV2Test extends AtlasEntityTestBase { ...@@ -230,14 +242,14 @@ public class AtlasEntityStoreV2Test extends AtlasEntityTestBase {
//Complete update. Add array elements - col3,col4 //Complete update. Add array elements - col3,col4
AtlasEntity col3 = TestUtilsV2.createColumnEntity(tableEntity); AtlasEntity col3 = TestUtilsV2.createColumnEntity(tableEntity);
col3.setAttribute(TestUtilsV2.NAME, "col3"); col3.setAttribute(NAME, "col3");
AtlasEntity col4 = TestUtilsV2.createColumnEntity(tableEntity); AtlasEntity col4 = TestUtilsV2.createColumnEntity(tableEntity);
col4.setAttribute(TestUtilsV2.NAME, "col4"); col4.setAttribute(NAME, "col4");
columns.add(AtlasTypeUtil.getAtlasObjectId(col3)); columns.add(AtlasTypeUtil.getAtlasObjectId(col3));
columns.add(AtlasTypeUtil.getAtlasObjectId(col4)); columns.add(AtlasTypeUtil.getAtlasObjectId(col4));
tableEntity.setAttribute(TestUtilsV2.COLUMNS_ATTR_NAME, columns); tableEntity.setAttribute(COLUMNS_ATTR_NAME, columns);
entitiesInfo.addReferredEntity(col3); entitiesInfo.addReferredEntity(col3);
entitiesInfo.addReferredEntity(col4); entitiesInfo.addReferredEntity(col4);
...@@ -252,7 +264,7 @@ public class AtlasEntityStoreV2Test extends AtlasEntityTestBase { ...@@ -252,7 +264,7 @@ public class AtlasEntityStoreV2Test extends AtlasEntityTestBase {
columns.clear(); columns.clear();
columns.add(AtlasTypeUtil.getAtlasObjectId(col4)); columns.add(AtlasTypeUtil.getAtlasObjectId(col4));
columns.add(AtlasTypeUtil.getAtlasObjectId(col3)); columns.add(AtlasTypeUtil.getAtlasObjectId(col3));
tableEntity.setAttribute(TestUtilsV2.COLUMNS_ATTR_NAME, columns); tableEntity.setAttribute(COLUMNS_ATTR_NAME, columns);
init(); init();
response = entityStore.createOrUpdate(new AtlasEntityStream(entitiesInfo), false); response = entityStore.createOrUpdate(new AtlasEntityStream(entitiesInfo), false);
...@@ -280,7 +292,7 @@ public class AtlasEntityStoreV2Test extends AtlasEntityTestBase { ...@@ -280,7 +292,7 @@ public class AtlasEntityStoreV2Test extends AtlasEntityTestBase {
AtlasEntity tableEntity = new AtlasEntity(tblEntity.getEntity()); AtlasEntity tableEntity = new AtlasEntity(tblEntity.getEntity());
AtlasEntitiesWithExtInfo entitiesInfo = new AtlasEntitiesWithExtInfo(tableEntity); AtlasEntitiesWithExtInfo entitiesInfo = new AtlasEntitiesWithExtInfo(tableEntity);
Map<String, AtlasStruct> partsMap = new HashMap<>(); Map<String, AtlasStruct> partsMap = new HashMap<>();
partsMap.put("part0", new AtlasStruct(TestUtilsV2.PARTITION_STRUCT_TYPE, TestUtilsV2.NAME, "test")); partsMap.put("part0", new AtlasStruct(TestUtilsV2.PARTITION_STRUCT_TYPE, NAME, "test"));
tableEntity.setAttribute("partitionsMap", partsMap); tableEntity.setAttribute("partitionsMap", partsMap);
...@@ -294,7 +306,7 @@ public class AtlasEntityStoreV2Test extends AtlasEntityTestBase { ...@@ -294,7 +306,7 @@ public class AtlasEntityStoreV2Test extends AtlasEntityTestBase {
Assert.assertTrue(partsMap.get("part0").equals(((Map<String, AtlasStruct>) updatedTableDef1.getAttribute("partitionsMap")).get("part0"))); Assert.assertTrue(partsMap.get("part0").equals(((Map<String, AtlasStruct>) updatedTableDef1.getAttribute("partitionsMap")).get("part0")));
//update map - add a map key //update map - add a map key
partsMap.put("part1", new AtlasStruct(TestUtilsV2.PARTITION_STRUCT_TYPE, TestUtilsV2.NAME, "test1")); partsMap.put("part1", new AtlasStruct(TestUtilsV2.PARTITION_STRUCT_TYPE, NAME, "test1"));
tableEntity.setAttribute("partitionsMap", partsMap); tableEntity.setAttribute("partitionsMap", partsMap);
init(); init();
...@@ -309,7 +321,7 @@ public class AtlasEntityStoreV2Test extends AtlasEntityTestBase { ...@@ -309,7 +321,7 @@ public class AtlasEntityStoreV2Test extends AtlasEntityTestBase {
//update map - remove a key and add another key //update map - remove a key and add another key
partsMap.remove("part0"); partsMap.remove("part0");
partsMap.put("part2", new AtlasStruct(TestUtilsV2.PARTITION_STRUCT_TYPE, TestUtilsV2.NAME, "test2")); partsMap.put("part2", new AtlasStruct(TestUtilsV2.PARTITION_STRUCT_TYPE, NAME, "test2"));
tableEntity.setAttribute("partitionsMap", partsMap); tableEntity.setAttribute("partitionsMap", partsMap);
init(); init();
...@@ -325,7 +337,7 @@ public class AtlasEntityStoreV2Test extends AtlasEntityTestBase { ...@@ -325,7 +337,7 @@ public class AtlasEntityStoreV2Test extends AtlasEntityTestBase {
//update struct value for existing map key //update struct value for existing map key
AtlasStruct partition2 = partsMap.get("part2"); AtlasStruct partition2 = partsMap.get("part2");
partition2.setAttribute(TestUtilsV2.NAME, "test2Updated"); partition2.setAttribute(NAME, "test2Updated");
init(); init();
response = entityStore.createOrUpdate(new AtlasEntityStream(entitiesInfo), false); response = entityStore.createOrUpdate(new AtlasEntityStream(entitiesInfo), false);
...@@ -340,7 +352,7 @@ public class AtlasEntityStoreV2Test extends AtlasEntityTestBase { ...@@ -340,7 +352,7 @@ public class AtlasEntityStoreV2Test extends AtlasEntityTestBase {
//Test map pointing to a class //Test map pointing to a class
AtlasEntity col0 = new AtlasEntity(TestUtilsV2.COLUMN_TYPE, TestUtilsV2.NAME, "test1"); AtlasEntity col0 = new AtlasEntity(COLUMN_TYPE, NAME, "test1");
col0.setAttribute("type", "string"); col0.setAttribute("type", "string");
col0.setAttribute("table", AtlasTypeUtil.getAtlasObjectId(tableEntity)); col0.setAttribute("table", AtlasTypeUtil.getAtlasObjectId(tableEntity));
...@@ -351,7 +363,7 @@ public class AtlasEntityStoreV2Test extends AtlasEntityTestBase { ...@@ -351,7 +363,7 @@ public class AtlasEntityStoreV2Test extends AtlasEntityTestBase {
init(); init();
entityStore.createOrUpdate(new AtlasEntityStream(col0WithExtendedInfo), false); entityStore.createOrUpdate(new AtlasEntityStream(col0WithExtendedInfo), false);
AtlasEntity col1 = new AtlasEntity(TestUtilsV2.COLUMN_TYPE, TestUtilsV2.NAME, "test2"); AtlasEntity col1 = new AtlasEntity(COLUMN_TYPE, NAME, "test2");
col1.setAttribute("type", "string"); col1.setAttribute("type", "string");
col1.setAttribute("table", AtlasTypeUtil.getAtlasObjectId(tableEntity)); col1.setAttribute("table", AtlasTypeUtil.getAtlasObjectId(tableEntity));
...@@ -440,8 +452,8 @@ public class AtlasEntityStoreV2Test extends AtlasEntityTestBase { ...@@ -440,8 +452,8 @@ public class AtlasEntityStoreV2Test extends AtlasEntityTestBase {
AtlasEntitiesWithExtInfo entitiesInfo = new AtlasEntitiesWithExtInfo(tableEntity); AtlasEntitiesWithExtInfo entitiesInfo = new AtlasEntitiesWithExtInfo(tableEntity);
List<AtlasStruct> partitions = new ArrayList<AtlasStruct>(){{ List<AtlasStruct> partitions = new ArrayList<AtlasStruct>(){{
add(new AtlasStruct(TestUtilsV2.PARTITION_STRUCT_TYPE, TestUtilsV2.NAME, "part1")); add(new AtlasStruct(TestUtilsV2.PARTITION_STRUCT_TYPE, NAME, "part1"));
add(new AtlasStruct(TestUtilsV2.PARTITION_STRUCT_TYPE, TestUtilsV2.NAME, "part2")); add(new AtlasStruct(TestUtilsV2.PARTITION_STRUCT_TYPE, NAME, "part2"));
}}; }};
tableEntity.setAttribute("partitions", partitions); tableEntity.setAttribute("partitions", partitions);
...@@ -451,7 +463,7 @@ public class AtlasEntityStoreV2Test extends AtlasEntityTestBase { ...@@ -451,7 +463,7 @@ public class AtlasEntityStoreV2Test extends AtlasEntityTestBase {
validateEntity(entitiesInfo, getEntityFromStore(updatedTable)); validateEntity(entitiesInfo, getEntityFromStore(updatedTable));
//add a new element to array of struct //add a new element to array of struct
partitions.add(new AtlasStruct(TestUtilsV2.PARTITION_STRUCT_TYPE, TestUtilsV2.NAME, "part3")); partitions.add(new AtlasStruct(TestUtilsV2.PARTITION_STRUCT_TYPE, NAME, "part3"));
tableEntity.setAttribute("partitions", partitions); tableEntity.setAttribute("partitions", partitions);
init(); init();
response = entityStore.createOrUpdate(new AtlasEntityStream(entitiesInfo), false); response = entityStore.createOrUpdate(new AtlasEntityStream(entitiesInfo), false);
...@@ -467,7 +479,7 @@ public class AtlasEntityStoreV2Test extends AtlasEntityTestBase { ...@@ -467,7 +479,7 @@ public class AtlasEntityStoreV2Test extends AtlasEntityTestBase {
validateEntity(entitiesInfo, getEntityFromStore(updatedTable)); validateEntity(entitiesInfo, getEntityFromStore(updatedTable));
//Update struct value within array of struct //Update struct value within array of struct
partitions.get(0).setAttribute(TestUtilsV2.NAME, "part4"); partitions.get(0).setAttribute(NAME, "part4");
tableEntity.setAttribute("partitions", partitions); tableEntity.setAttribute("partitions", partitions);
init(); init();
response = entityStore.createOrUpdate(new AtlasEntityStream(entitiesInfo), false); response = entityStore.createOrUpdate(new AtlasEntityStream(entitiesInfo), false);
...@@ -476,7 +488,7 @@ public class AtlasEntityStoreV2Test extends AtlasEntityTestBase { ...@@ -476,7 +488,7 @@ public class AtlasEntityStoreV2Test extends AtlasEntityTestBase {
//add a repeated element to array of struct //add a repeated element to array of struct
partitions.add(new AtlasStruct(TestUtilsV2.PARTITION_STRUCT_TYPE, TestUtilsV2.NAME, "part4")); partitions.add(new AtlasStruct(TestUtilsV2.PARTITION_STRUCT_TYPE, NAME, "part4"));
tableEntity.setAttribute("partitions", partitions); tableEntity.setAttribute("partitions", partitions);
init(); init();
response = entityStore.createOrUpdate(new AtlasEntityStream(entitiesInfo), false); response = entityStore.createOrUpdate(new AtlasEntityStream(entitiesInfo), false);
...@@ -499,11 +511,11 @@ public class AtlasEntityStoreV2Test extends AtlasEntityTestBase { ...@@ -499,11 +511,11 @@ public class AtlasEntityStoreV2Test extends AtlasEntityTestBase {
AtlasEntity tableEntity = new AtlasEntity(tblEntity.getEntity()); AtlasEntity tableEntity = new AtlasEntity(tblEntity.getEntity());
AtlasEntitiesWithExtInfo entitiesInfo = new AtlasEntitiesWithExtInfo(tableEntity); AtlasEntitiesWithExtInfo entitiesInfo = new AtlasEntitiesWithExtInfo(tableEntity);
AtlasStruct serdeInstance = new AtlasStruct(TestUtilsV2.SERDE_TYPE, TestUtilsV2.NAME, "serde1Name"); AtlasStruct serdeInstance = new AtlasStruct(TestUtilsV2.SERDE_TYPE, NAME, "serde1Name");
serdeInstance.setAttribute("serde", "test"); serdeInstance.setAttribute("serde", "test");
serdeInstance.setAttribute("description", "testDesc"); serdeInstance.setAttribute("description", "testDesc");
tableEntity.setAttribute("serde1", serdeInstance); tableEntity.setAttribute("serde1", serdeInstance);
tableEntity.setAttribute("database", new AtlasObjectId(databaseEntity.getTypeName(), TestUtilsV2.NAME, databaseEntity.getAttribute(TestUtilsV2.NAME))); tableEntity.setAttribute("database", new AtlasObjectId(databaseEntity.getTypeName(), NAME, databaseEntity.getAttribute(NAME)));
init(); init();
EntityMutationResponse response = entityStore.createOrUpdate(new AtlasEntityStream(entitiesInfo), false); EntityMutationResponse response = entityStore.createOrUpdate(new AtlasEntityStream(entitiesInfo), false);
...@@ -547,7 +559,7 @@ public class AtlasEntityStoreV2Test extends AtlasEntityTestBase { ...@@ -547,7 +559,7 @@ public class AtlasEntityStoreV2Test extends AtlasEntityTestBase {
response = entityStore.createOrUpdate(new InMemoryMapEntityStream(tableCloneMap), false); response = entityStore.createOrUpdate(new InMemoryMapEntityStream(tableCloneMap), false);
final AtlasEntityHeader tableDefinition = response.getFirstUpdatedEntityByTypeName(TABLE_TYPE); final AtlasEntityHeader tableDefinition = response.getFirstUpdatedEntityByTypeName(TABLE_TYPE);
AtlasEntity updatedTableDefinition = getEntityFromStore(tableDefinition); AtlasEntity updatedTableDefinition = getEntityFromStore(tableDefinition);
Assert.assertNotNull(updatedTableDefinition.getAttribute("database")); assertNotNull(updatedTableDefinition.getAttribute("database"));
Assert.assertEquals(((AtlasObjectId) updatedTableDefinition.getAttribute("database")).getGuid(), dbCreated.getGuid()); Assert.assertEquals(((AtlasObjectId) updatedTableDefinition.getAttribute("database")).getGuid(), dbCreated.getGuid());
} }
...@@ -563,7 +575,7 @@ public class AtlasEntityStoreV2Test extends AtlasEntityTestBase { ...@@ -563,7 +575,7 @@ public class AtlasEntityStoreV2Test extends AtlasEntityTestBase {
//The optional boolean attribute should have a non-null value //The optional boolean attribute should have a non-null value
final String isReplicatedAttr = "isReplicated"; final String isReplicatedAttr = "isReplicated";
final String paramsAttr = "parameters"; final String paramsAttr = "parameters";
Assert.assertNotNull(firstEntityCreated.getAttribute(isReplicatedAttr)); assertNotNull(firstEntityCreated.getAttribute(isReplicatedAttr));
Assert.assertEquals(firstEntityCreated.getAttribute(isReplicatedAttr), Boolean.FALSE); Assert.assertEquals(firstEntityCreated.getAttribute(isReplicatedAttr), Boolean.FALSE);
Assert.assertNull(firstEntityCreated.getAttribute(paramsAttr)); Assert.assertNull(firstEntityCreated.getAttribute(paramsAttr));
...@@ -578,8 +590,8 @@ public class AtlasEntityStoreV2Test extends AtlasEntityTestBase { ...@@ -578,8 +590,8 @@ public class AtlasEntityStoreV2Test extends AtlasEntityTestBase {
response = entityStore.createOrUpdate(new AtlasEntityStream(dbEntity), false); response = entityStore.createOrUpdate(new AtlasEntityStream(dbEntity), false);
AtlasEntity firstEntityUpdated = getEntityFromStore(response.getFirstUpdatedEntityByTypeName(TestUtilsV2.DATABASE_TYPE)); AtlasEntity firstEntityUpdated = getEntityFromStore(response.getFirstUpdatedEntityByTypeName(TestUtilsV2.DATABASE_TYPE));
Assert.assertNotNull(firstEntityUpdated); assertNotNull(firstEntityUpdated);
Assert.assertNotNull(firstEntityUpdated.getAttribute(isReplicatedAttr)); assertNotNull(firstEntityUpdated.getAttribute(isReplicatedAttr));
Assert.assertEquals(firstEntityUpdated.getAttribute(isReplicatedAttr), Boolean.TRUE); Assert.assertEquals(firstEntityUpdated.getAttribute(isReplicatedAttr), Boolean.TRUE);
Assert.assertEquals(firstEntityUpdated.getAttribute(paramsAttr), params); Assert.assertEquals(firstEntityUpdated.getAttribute(paramsAttr), params);
...@@ -645,7 +657,7 @@ public class AtlasEntityStoreV2Test extends AtlasEntityTestBase { ...@@ -645,7 +657,7 @@ public class AtlasEntityStoreV2Test extends AtlasEntityTestBase {
//Update required attribute //Update required attribute
Map<String, AtlasEntity> tableCloneMap = new HashMap<>(); Map<String, AtlasEntity> tableCloneMap = new HashMap<>();
AtlasEntity tableEntity = new AtlasEntity(TABLE_TYPE); AtlasEntity tableEntity = new AtlasEntity(TABLE_TYPE);
tableEntity.setAttribute(TestUtilsV2.NAME, "table_" + TestUtilsV2.randomString()); tableEntity.setAttribute(NAME, "table_" + TestUtilsV2.randomString());
tableCloneMap.put(tableEntity.getGuid(), tableEntity); tableCloneMap.put(tableEntity.getGuid(), tableEntity);
entityStore.createOrUpdate(new InMemoryMapEntityStream(tableCloneMap), false); entityStore.createOrUpdate(new InMemoryMapEntityStream(tableCloneMap), false);
...@@ -713,14 +725,14 @@ public class AtlasEntityStoreV2Test extends AtlasEntityTestBase { ...@@ -713,14 +725,14 @@ public class AtlasEntityStoreV2Test extends AtlasEntityTestBase {
// create new column entity // create new column entity
AtlasEntity col1 = TestUtilsV2.createColumnEntity(tblEntity); AtlasEntity col1 = TestUtilsV2.createColumnEntity(tblEntity);
AtlasEntity col2 = TestUtilsV2.createColumnEntity(tblEntity); AtlasEntity col2 = TestUtilsV2.createColumnEntity(tblEntity);
col1.setAttribute(TestUtilsV2.NAME, "col1"); col1.setAttribute(NAME, "col1");
col2.setAttribute(TestUtilsV2.NAME, "col2"); col2.setAttribute(NAME, "col2");
List<AtlasObjectId> columns = new ArrayList<>(); List<AtlasObjectId> columns = new ArrayList<>();
columns.add(AtlasTypeUtil.getAtlasObjectId(col1)); columns.add(AtlasTypeUtil.getAtlasObjectId(col1));
columns.add(AtlasTypeUtil.getAtlasObjectId(col2)); columns.add(AtlasTypeUtil.getAtlasObjectId(col2));
tblEntity.setAttribute(TestUtilsV2.COLUMNS_ATTR_NAME, columns); tblEntity.setAttribute(COLUMNS_ATTR_NAME, columns);
AtlasEntitiesWithExtInfo tableEntityInfo = new AtlasEntitiesWithExtInfo(tblEntity); AtlasEntitiesWithExtInfo tableEntityInfo = new AtlasEntitiesWithExtInfo(tblEntity);
tableEntityInfo.addReferredEntity(col1.getGuid(), col1); tableEntityInfo.addReferredEntity(col1.getGuid(), col1);
...@@ -731,16 +743,16 @@ public class AtlasEntityStoreV2Test extends AtlasEntityTestBase { ...@@ -731,16 +743,16 @@ public class AtlasEntityStoreV2Test extends AtlasEntityTestBase {
AtlasEntityHeader tblHeader = response.getFirstEntityCreated(); AtlasEntityHeader tblHeader = response.getFirstEntityCreated();
AtlasEntity createdTblEntity = getEntityFromStore(tblHeader); AtlasEntity createdTblEntity = getEntityFromStore(tblHeader);
columns = (List<AtlasObjectId>) createdTblEntity.getAttribute(TestUtilsV2.COLUMNS_ATTR_NAME); columns = (List<AtlasObjectId>) createdTblEntity.getAttribute(COLUMNS_ATTR_NAME);
assertEquals(columns.size(), 2); assertEquals(columns.size(), 2);
// update - add 2 more columns to table // update - add 2 more columns to table
AtlasEntity col3 = TestUtilsV2.createColumnEntity(createdTblEntity); AtlasEntity col3 = TestUtilsV2.createColumnEntity(createdTblEntity);
col3.setAttribute(TestUtilsV2.NAME, "col3"); col3.setAttribute(NAME, "col3");
col3.setAttribute("description", "description col3"); col3.setAttribute("description", "description col3");
AtlasEntity col4 = TestUtilsV2.createColumnEntity(createdTblEntity); AtlasEntity col4 = TestUtilsV2.createColumnEntity(createdTblEntity);
col4.setAttribute(TestUtilsV2.NAME, "col4"); col4.setAttribute(NAME, "col4");
col4.setAttribute("description", "description col4"); col4.setAttribute("description", "description col4");
columns.clear(); columns.clear();
...@@ -749,7 +761,7 @@ public class AtlasEntityStoreV2Test extends AtlasEntityTestBase { ...@@ -749,7 +761,7 @@ public class AtlasEntityStoreV2Test extends AtlasEntityTestBase {
tblEntity = new AtlasEntity(TABLE_TYPE); tblEntity = new AtlasEntity(TABLE_TYPE);
tblEntity.setGuid(createdTblEntity.getGuid()); tblEntity.setGuid(createdTblEntity.getGuid());
tblEntity.setAttribute(TestUtilsV2.COLUMNS_ATTR_NAME, columns); tblEntity.setAttribute(COLUMNS_ATTR_NAME, columns);
tableEntityInfo = new AtlasEntitiesWithExtInfo(tblEntity); tableEntityInfo = new AtlasEntitiesWithExtInfo(tblEntity);
tableEntityInfo.addReferredEntity(col3.getGuid(), col3); tableEntityInfo.addReferredEntity(col3.getGuid(), col3);
...@@ -760,7 +772,7 @@ public class AtlasEntityStoreV2Test extends AtlasEntityTestBase { ...@@ -760,7 +772,7 @@ public class AtlasEntityStoreV2Test extends AtlasEntityTestBase {
tblHeader = response.getFirstEntityPartialUpdated(); tblHeader = response.getFirstEntityPartialUpdated();
AtlasEntity updatedTblEntity = getEntityFromStore(tblHeader); AtlasEntity updatedTblEntity = getEntityFromStore(tblHeader);
columns = (List<AtlasObjectId>) updatedTblEntity.getAttribute(TestUtilsV2.COLUMNS_ATTR_NAME); columns = (List<AtlasObjectId>) updatedTblEntity.getAttribute(COLUMNS_ATTR_NAME);
// deleted columns are included in the attribute; hence use >= // deleted columns are included in the attribute; hence use >=
assertTrue(columns.size() >= 2); assertTrue(columns.size() >= 2);
} }
...@@ -1215,14 +1227,14 @@ public class AtlasEntityStoreV2Test extends AtlasEntityTestBase { ...@@ -1215,14 +1227,14 @@ public class AtlasEntityStoreV2Test extends AtlasEntityTestBase {
entityStore.removeLabels(tblEntityGuid, labels); entityStore.removeLabels(tblEntityGuid, labels);
AtlasEntity tblEntity = getEntityFromStore(tblEntityGuid); AtlasEntity tblEntity = getEntityFromStore(tblEntityGuid);
Assert.assertNotNull(tblEntity.getLabels()); assertNotNull(tblEntity.getLabels());
Assert.assertEquals(tblEntity.getLabels().size(), 1); Assert.assertEquals(tblEntity.getLabels().size(), 1);
labels.clear(); labels.clear();
labels.add("label_4_add"); labels.add("label_4_add");
entityStore.removeLabels(tblEntityGuid, labels); entityStore.removeLabels(tblEntityGuid, labels);
tblEntity = getEntityFromStore(tblEntityGuid); tblEntity = getEntityFromStore(tblEntityGuid);
Assert.assertNotNull(tblEntity.getLabels()); assertNotNull(tblEntity.getLabels());
Assert.assertEquals(tblEntity.getLabels().size(), 1); Assert.assertEquals(tblEntity.getLabels().size(), 1);
labels.clear(); labels.clear();
...@@ -1321,4 +1333,51 @@ public class AtlasEntityStoreV2Test extends AtlasEntityTestBase { ...@@ -1321,4 +1333,51 @@ public class AtlasEntityStoreV2Test extends AtlasEntityTestBase {
Assert.fail(); Assert.fail();
} }
@Test
public void testGetTemplate() {
try {
String bMHeaderListAsString = FileUtils.getBusinessMetadataHeaders();
assertNotNull(bMHeaderListAsString);
assertEquals(bMHeaderListAsString, "EntityType,EntityUniqueAttributeValue,BusinessAttributeName,BusinessAttributeValue,EntityUniqueAttributeName[optional]");
} catch (Exception e) {
fail("The Template for BussinessMetadata Attributes should've been a success : ", e);
}
}
@Test
public void testEmptyFileException() {
InputStream inputStream = getFile(CSV_FILES, "empty.csv");
try {
entityStore.bulkCreateOrUpdateBusinessAttributes(inputStream, "empty.csv");
fail("Error occurred : Failed to recognize the empty file.");
} catch (AtlasBaseException e) {
assertEquals(e.getMessage(), "No data found in the uploaded file");
} finally {
if (inputStream != null) {
try {
inputStream.close();
} catch (Exception e) {
}
}
}
}
@Test(dependsOnMethods = "testCreate")
public void testBulkAddOrUpdateBusinessAttributes() {
try {
AtlasEntity hive_db_1 = getEntityFromStore(dbEntityGuid);
String dbName = (String) hive_db_1.getAttribute("name");
String data = TestUtilsV2.getFileData(CSV_FILES, "template_2.csv");
data = data.replaceAll("hive_db_1", dbName);
InputStream inputStream1 = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
BulkImportResponse bulkImportResponse = entityStore.bulkCreateOrUpdateBusinessAttributes(inputStream1, "template_2.csv");
assertEquals(CollectionUtils.isEmpty(bulkImportResponse.getSuccessImportInfoList()), false);
assertEquals(CollectionUtils.isEmpty(bulkImportResponse.getFailedImportInfoList()), true);
} catch (Exception e) {
fail("The BusinessMetadata Attribute should have been assigned " +e);
}
}
} }
\ No newline at end of file
TypeName,UniqueAttributeValue,BusinessAttributeName,BusinessAttributeValue,UniqueAttributeName[optional]
incorrectEntityType,hive_db_1,bmWithAllTypes.attr8,"Awesome Attribute 1",name
TypeName,UniqueAttributeValue,BusinessAttributeName,BusinessAttributeValue,UniqueAttributeName[optional]
hive_database,hive_db_1,bmWithAllTypes.attr8,"Awesome Attribute 1",name
...@@ -17,8 +17,11 @@ ...@@ -17,8 +17,11 @@
*/ */
package org.apache.atlas.web.rest; package org.apache.atlas.web.rest;
import com.sun.jersey.core.header.FormDataContentDisposition;
import com.sun.jersey.multipart.FormDataParam;
import org.apache.atlas.AtlasErrorCode; import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.EntityAuditEvent; import org.apache.atlas.EntityAuditEvent;
import org.apache.atlas.bulkimport.BulkImportResponse;
import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.TypeCategory; import org.apache.atlas.model.TypeCategory;
import org.apache.atlas.model.audit.EntityAuditEventV2; import org.apache.atlas.model.audit.EntityAuditEventV2;
...@@ -31,14 +34,15 @@ import org.apache.atlas.model.instance.ClassificationAssociateRequest; ...@@ -31,14 +34,15 @@ import org.apache.atlas.model.instance.ClassificationAssociateRequest;
import org.apache.atlas.model.instance.EntityMutationResponse; import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef; import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
import org.apache.atlas.repository.audit.EntityAuditRepository; import org.apache.atlas.repository.audit.EntityAuditRepository;
import org.apache.atlas.repository.store.graph.v2.ClassificationAssociator;
import org.apache.atlas.repository.converters.AtlasInstanceConverter; import org.apache.atlas.repository.converters.AtlasInstanceConverter;
import org.apache.atlas.repository.store.graph.AtlasEntityStore; import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.repository.store.graph.v2.AtlasEntityStream; import org.apache.atlas.repository.store.graph.v2.AtlasEntityStream;
import org.apache.atlas.repository.store.graph.v2.ClassificationAssociator;
import org.apache.atlas.repository.store.graph.v2.EntityStream; import org.apache.atlas.repository.store.graph.v2.EntityStream;
import org.apache.atlas.type.AtlasClassificationType; import org.apache.atlas.type.AtlasClassificationType;
import org.apache.atlas.type.AtlasEntityType; import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.util.FileUtils;
import org.apache.atlas.utils.AtlasPerfTracer; import org.apache.atlas.utils.AtlasPerfTracer;
import org.apache.atlas.web.util.Servlets; import org.apache.atlas.web.util.Servlets;
import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.CollectionUtils;
...@@ -61,8 +65,14 @@ import javax.ws.rs.Path; ...@@ -61,8 +65,14 @@ import javax.ws.rs.Path;
import javax.ws.rs.PathParam; import javax.ws.rs.PathParam;
import javax.ws.rs.Produces; import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam; import javax.ws.rs.QueryParam;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Context; import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType; import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.StreamingOutput;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
...@@ -1173,4 +1183,43 @@ public class EntityREST { ...@@ -1173,4 +1183,43 @@ public class EntityREST {
} }
} }
} }
/**
* Get the sample Template for uploading/creating bulk BusinessMetaData
*
* @return Template File
* @throws AtlasBaseException
* @HTTP 400 If the provided fileType is not supported
*/
@GET
@Path("/businessmetadata/import/template")
@Produces(MediaType.APPLICATION_OCTET_STREAM)
public Response produceTemplate() {
return Response.ok(new StreamingOutput() {
@Override
public void write(OutputStream outputStream) throws IOException, WebApplicationException {
outputStream.write(FileUtils.getBusinessMetadataHeaders().getBytes());
}
}).header("Content-Disposition", "attachment; filename=\"template_business_metadata\"").build();
}
/**
* Upload the file for creating Business Metadata in BULK
*
* @param uploadedInputStream InputStream of file
* @param fileDetail FormDataContentDisposition metadata of file
* @return
* @throws AtlasBaseException
* @HTTP 200 If Business Metadata creation was successful
* @HTTP 400 If Business Metadata definition has invalid or missing information
* @HTTP 409 If Business Metadata already exists (duplicate qualifiedName)
*/
@POST
@Path("/businessmetadata/import")
@Consumes(MediaType.MULTIPART_FORM_DATA)
public BulkImportResponse importBMAttributes(@FormDataParam("file") InputStream uploadedInputStream,
@FormDataParam("file") FormDataContentDisposition fileDetail) throws AtlasBaseException {
return entitiesStore.bulkCreateOrUpdateBusinessAttributes(uploadedInputStream, fileDetail.getFileName());
}
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment