Commit 86ba342f by Madhan Neethiraj

ATLAS-3825: updated import business-metadata to set business attribute values on…

ATLAS-3825: updated import business-metadata to set business attribute values on appropriate entities
parent 2ed6a48d
...@@ -95,6 +95,10 @@ public class BulkImportResponse { ...@@ -95,6 +95,10 @@ public class BulkImportResponse {
this(parentObjectName, childObjectName, importStatus, "",-1); this(parentObjectName, childObjectName, importStatus, "",-1);
} }
public ImportInfo( ImportStatus importStatus, String remarks) {
this("","", importStatus, remarks, -1);
}
public ImportInfo( ImportStatus importStatus, String remarks, Integer rowNumber) { public ImportInfo( ImportStatus importStatus, String remarks, Integer rowNumber) {
this("","", importStatus, remarks, rowNumber); this("","", importStatus, remarks, rowNumber);
} }
......
...@@ -41,7 +41,6 @@ import org.apache.atlas.model.instance.AtlasEntityHeaders; ...@@ -41,7 +41,6 @@ import org.apache.atlas.model.instance.AtlasEntityHeaders;
import org.apache.atlas.model.instance.AtlasObjectId; import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.instance.EntityMutationResponse; import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.model.typedef.AtlasBaseTypeDef; import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.graph.GraphHelper; import org.apache.atlas.repository.graph.GraphHelper;
import org.apache.atlas.repository.graphdb.AtlasGraph; import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasVertex; import org.apache.atlas.repository.graphdb.AtlasVertex;
...@@ -1551,28 +1550,33 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore { ...@@ -1551,28 +1550,33 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {
@GraphTransaction @GraphTransaction
public BulkImportResponse bulkCreateOrUpdateBusinessAttributes(InputStream inputStream, String fileName) throws AtlasBaseException { public BulkImportResponse bulkCreateOrUpdateBusinessAttributes(InputStream inputStream, String fileName) throws AtlasBaseException {
BulkImportResponse ret = new BulkImportResponse(); BulkImportResponse ret = new BulkImportResponse();
try { try {
if (StringUtils.isBlank(fileName)) { if (StringUtils.isBlank(fileName)) {
throw new AtlasBaseException(AtlasErrorCode.FILE_NAME_NOT_FOUND, fileName); throw new AtlasBaseException(AtlasErrorCode.FILE_NAME_NOT_FOUND, fileName);
} }
List<String[]> fileData = FileUtils.readFileData(fileName, inputStream);
List<String[]> fileData = FileUtils.readFileData(fileName, inputStream);
Map<String, AtlasEntity> attributesToAssociate = getBusinessMetadataDefList(fileData, ret); Map<String, AtlasEntity> attributesToAssociate = getBusinessMetadataDefList(fileData, ret);
for (Map.Entry<String, AtlasEntity> entry : attributesToAssociate.entrySet()) { for (AtlasEntity entity : attributesToAssociate.values()) {
AtlasEntity entity = entry.getValue(); try {
try{
addOrUpdateBusinessAttributes(entity.getGuid(), entity.getBusinessAttributes(), true); addOrUpdateBusinessAttributes(entity.getGuid(), entity.getBusinessAttributes(), true);
BulkImportResponse.ImportInfo successImportInfo = new BulkImportResponse.ImportInfo(entity.getAttribute(Constants.QUALIFIED_NAME).toString(), entity.getBusinessAttributes().toString());
BulkImportResponse.ImportInfo successImportInfo = new BulkImportResponse.ImportInfo(entity.getGuid(), entity.getBusinessAttributes().toString());
ret.setSuccessImportInfoList(successImportInfo); ret.setSuccessImportInfoList(successImportInfo);
}catch(Exception e){ }catch (Exception e) {
LOG.error("Error occurred while updating BusinessMetadata Attributes for Entity "+entity.getAttribute(Constants.QUALIFIED_NAME).toString()); LOG.error("Error occurred while updating BusinessMetadata Attributes for Entity " + entity.getGuid());
BulkImportResponse.ImportInfo failedImportInfo = new BulkImportResponse.ImportInfo(entity.getAttribute(Constants.QUALIFIED_NAME).toString(), entity.getBusinessAttributes().toString(), BulkImportResponse.ImportStatus.FAILED, e.getMessage());
ret.setFailedImportInfoList(failedImportInfo); BulkImportResponse.ImportInfo failedImportInfo = new BulkImportResponse.ImportInfo(entity.getGuid(), entity.getBusinessAttributes().toString(), BulkImportResponse.ImportStatus.FAILED, e.getMessage());
ret.getFailedImportInfoList().add(failedImportInfo);
} }
} }
} catch (IOException e) { } catch (IOException e) {
LOG.error("An Exception occurred while uploading the file : "+e.getMessage()); LOG.error("An Exception occurred while uploading the file {}", fileName, e);
throw new AtlasBaseException(AtlasErrorCode.FAILED_TO_UPLOAD, e); throw new AtlasBaseException(AtlasErrorCode.FAILED_TO_UPLOAD, e);
} }
...@@ -1581,104 +1585,132 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore { ...@@ -1581,104 +1585,132 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {
private Map<String, AtlasEntity> getBusinessMetadataDefList(List<String[]> fileData, BulkImportResponse bulkImportResponse) throws AtlasBaseException { private Map<String, AtlasEntity> getBusinessMetadataDefList(List<String[]> fileData, BulkImportResponse bulkImportResponse) throws AtlasBaseException {
Map<String, AtlasEntity> ret = new HashMap<>(); Map<String, AtlasEntity> ret = new HashMap<>();
Map<String, Map<String, Object>> newBMAttributes = new HashMap<>();
Map<String, AtlasVertex> vertexCache = new HashMap<>(); Map<String, AtlasVertex> vertexCache = new HashMap<>();
Map<String, Object> attribute = new HashMap<>(); List<String> failedMsgList = new ArrayList<>();
for (int lineIndex = 0; lineIndex < fileData.size(); lineIndex++) { for (int lineIndex = 0; lineIndex < fileData.size(); lineIndex++) {
List<String> failedTermMsgList = new ArrayList<>();
AtlasEntity atlasEntity = new AtlasEntity();
String[] record = fileData.get(lineIndex); String[] record = fileData.get(lineIndex);
if (missingFieldsCheck(record, bulkImportResponse, lineIndex+1)) {
boolean missingFields = record.length < FileUtils.UNIQUE_ATTR_NAME_COLUMN_INDEX ||
StringUtils.isBlank(record[FileUtils.TYPENAME_COLUMN_INDEX]) ||
StringUtils.isBlank(record[FileUtils.UNIQUE_ATTR_VALUE_COLUMN_INDEX]) ||
StringUtils.isBlank(record[FileUtils.BM_ATTR_NAME_COLUMN_INDEX]) ||
StringUtils.isBlank(record[FileUtils.BM_ATTR_VALUE_COLUMN_INDEX]);
if (missingFields){
failedMsgList.add("Line #" + (lineIndex + 1) + ": missing fields. " + Arrays.toString(record));
continue; continue;
} }
String typeName = record[FileUtils.TYPENAME_COLUMN_INDEX]; String typeName = record[FileUtils.TYPENAME_COLUMN_INDEX];
AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName);
if (entityType == null) {
failedMsgList.add("Line #" + (lineIndex + 1) + ": invalid entity-type '" + typeName + "'");
continue;
}
String uniqueAttrValue = record[FileUtils.UNIQUE_ATTR_VALUE_COLUMN_INDEX]; String uniqueAttrValue = record[FileUtils.UNIQUE_ATTR_VALUE_COLUMN_INDEX];
String bmAttribute = record[FileUtils.BM_ATTR_NAME_COLUMN_INDEX]; String bmAttribute = record[FileUtils.BM_ATTR_NAME_COLUMN_INDEX];
String bmAttributeValue = record[FileUtils.BM_ATTR_VALUE_COLUMN_INDEX]; String bmAttributeValue = record[FileUtils.BM_ATTR_VALUE_COLUMN_INDEX];
String uniqueAttrName = Constants.QUALIFIED_NAME; String uniqueAttrName = AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME;
if (record.length > FileUtils.UNIQUE_ATTR_NAME_COLUMN_INDEX) { if (record.length > FileUtils.UNIQUE_ATTR_NAME_COLUMN_INDEX) {
uniqueAttrName = typeName+"."+record[FileUtils.UNIQUE_ATTR_NAME_COLUMN_INDEX]; uniqueAttrName = record[FileUtils.UNIQUE_ATTR_NAME_COLUMN_INDEX];
} }
if (validateTypeName(typeName, bulkImportResponse, lineIndex+1)) { AtlasAttribute uniqueAttribute = entityType.getAttribute(uniqueAttrName);
if (uniqueAttribute == null) {
failedMsgList.add("Line #" + (lineIndex + 1) + ": attribute '" + uniqueAttrName + "' not found in entity-type '" + typeName + "'");
continue; continue;
} }
String vertexKey = typeName + "_" + uniqueAttrName + "_" + uniqueAttrValue; if (!uniqueAttribute.getAttributeDef().getIsUnique()) {
AtlasVertex atlasVertex = vertexCache.get(vertexKey); failedMsgList.add("Line #" + (lineIndex + 1) + ": attribute '" + uniqueAttrName + "' is not an unique attribute in entity-type '" + typeName + "'");
if (atlasVertex == null) {
atlasVertex = AtlasGraphUtilsV2.findByTypeAndUniquePropertyName(graph, typeName, uniqueAttrName, uniqueAttrValue);
}
if (atlasVertex == null) { continue;
LOG.error("Provided UniqueAttributeValue is not valid : " + uniqueAttrValue + " at line #" + (lineIndex + 1));
failedTermMsgList.add("Provided UniqueAttributeValue is not valid : " + uniqueAttrValue + " at line #" + (lineIndex + 1));
} }
vertexCache.put(vertexKey, atlasVertex); String vertexKey = uniqueAttribute.getVertexPropertyName() + "_" + uniqueAttrValue;
String[] businessAttributeName = bmAttribute.split(FileUtils.ESCAPE_CHARACTER + "."); AtlasVertex vertex = vertexCache.get(vertexKey);
if (validateBMAttributeName(uniqueAttrValue,bmAttribute,bulkImportResponse,lineIndex+1)) {
if (vertex == null) {
vertex = AtlasGraphUtilsV2.findByTypeAndUniquePropertyName(graph, typeName, uniqueAttribute.getVertexUniquePropertyName(), uniqueAttrValue);
if (vertex == null) {
failedMsgList.add("Line #" + (lineIndex + 1) + ": no " + typeName + " entity found with " + uniqueAttrName + "=" + uniqueAttrValue);
continue; continue;
} }
String bMName = businessAttributeName[0]; vertexCache.put(vertexKey, vertex);
String bMAttributeName = businessAttributeName[1];
AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName);
if (validateBMAttribute(uniqueAttrValue, businessAttributeName, entityType, bulkImportResponse,lineIndex+1)) {
continue;
} }
AtlasBusinessAttribute atlasBusinessAttribute = entityType.getBusinessAttributes().get(bMName).get(bMAttributeName); AtlasBusinessAttribute businessAttribute = entityType.getBusinesAAttribute(bmAttribute);
if (atlasBusinessAttribute.getAttributeType().getTypeCategory() == TypeCategory.ARRAY) {
AtlasArrayType arrayType = (AtlasArrayType) atlasBusinessAttribute.getAttributeType(); if (businessAttribute == null) {
List attributeValueData; failedMsgList.add("Line #" + (lineIndex + 1) + ": invalid business attribute name '" + bmAttribute + "'");
if(arrayType.getElementType() instanceof AtlasEnumType){ continue;
attributeValueData = AtlasGraphUtilsV2.assignEnumValues(bmAttributeValue, (AtlasEnumType) arrayType.getElementType(), failedTermMsgList, lineIndex+1);
}else{
attributeValueData = assignMultipleValues(bmAttributeValue, arrayType.getElementTypeName(), failedTermMsgList, lineIndex+1);
} }
attribute.put(bmAttribute, attributeValueData);
final Object attrValue;
if (businessAttribute.getAttributeType().getTypeCategory() == TypeCategory.ARRAY) {
AtlasArrayType arrayType = (AtlasArrayType) businessAttribute.getAttributeType();
List arrayValue;
if (arrayType.getElementType() instanceof AtlasEnumType) {
arrayValue = AtlasGraphUtilsV2.assignEnumValues(bmAttributeValue, (AtlasEnumType) arrayType.getElementType(), failedMsgList, lineIndex+1);
} else { } else {
attribute.put(bmAttribute, bmAttributeValue); arrayValue = assignMultipleValues(bmAttributeValue, arrayType.getElementTypeName(), failedMsgList, lineIndex+1);
} }
if(failedMsgCheck(uniqueAttrValue,bmAttribute, failedTermMsgList, bulkImportResponse, lineIndex+1)) { attrValue = arrayValue;
continue; } else {
attrValue = bmAttributeValue;
} }
if(ret.containsKey(vertexKey)) { if (ret.containsKey(vertexKey)) {
atlasEntity = ret.get(vertexKey); AtlasEntity entity = ret.get(vertexKey);
atlasEntity.setBusinessAttribute(bMName, bMAttributeName, attribute.get(bmAttribute));
ret.put(vertexKey, atlasEntity); entity.setBusinessAttribute(businessAttribute.getDefinedInType().getTypeName(), businessAttribute.getName(), attrValue);
} else { } else {
String guid = GraphHelper.getGuid(atlasVertex); AtlasEntity entity = new AtlasEntity();
atlasEntity.setGuid(guid); String guid = GraphHelper.getGuid(vertex);
atlasEntity.setTypeName(typeName); Map<String, Map<String, Object>> businessAttributes = entityRetriever.getBusinessMetadata(vertex);
atlasEntity.setAttribute(Constants.QUALIFIED_NAME,uniqueAttrValue);
newBMAttributes = entityRetriever.getBusinessMetadata(atlasVertex) != null ? entityRetriever.getBusinessMetadata(atlasVertex) : newBMAttributes; entity.setGuid(guid);
atlasEntity.setBusinessAttributes(newBMAttributes); entity.setTypeName(typeName);
atlasEntity.setBusinessAttribute(bMName, bMAttributeName, attribute.get(bmAttribute)); entity.setAttribute(uniqueAttribute.getName(), uniqueAttrValue);
ret.put(vertexKey, atlasEntity);
if (businessAttributes == null) {
businessAttributes = new HashMap<>();
} }
entity.setBusinessAttributes(businessAttributes);
entity.setBusinessAttribute(businessAttribute.getDefinedInType().getTypeName(), businessAttribute.getName(), attrValue);
ret.put(vertexKey, entity);
} }
return ret;
} }
private boolean validateTypeName(String typeName, BulkImportResponse bulkImportResponse, int lineIndex) { for (String failedMsg : failedMsgList) {
boolean ret = false; LOG.error(failedMsg);
BulkImportResponse.ImportInfo importInfo = new BulkImportResponse.ImportInfo(BulkImportResponse.ImportStatus.FAILED, failedMsg);
if(!typeRegistry.getAllEntityDefNames().contains(typeName)){
ret = true;
LOG.error("Invalid entity-type: " + typeName + " at line #" + lineIndex);
String failedTermMsgs = "Invalid entity-type: " + typeName + " at line #" + lineIndex;
BulkImportResponse.ImportInfo importInfo = new BulkImportResponse.ImportInfo(BulkImportResponse.ImportStatus.FAILED, failedTermMsgs, lineIndex);
bulkImportResponse.getFailedImportInfoList().add(importInfo); bulkImportResponse.getFailedImportInfoList().add(importInfo);
} }
return ret; return ret;
} }
private List assignMultipleValues(String bmAttributeValues, String elementTypeName, List failedTermMsgList, int lineIndex) { private List assignMultipleValues(String bmAttributeValues, String elementTypeName, List failedTermMsgList, int lineIndex) {
String[] arr = bmAttributeValues.split(FileUtils.ESCAPE_CHARACTER + FileUtils.PIPE_CHARACTER); String[] arr = bmAttributeValues.split(FileUtils.ESCAPE_CHARACTER + FileUtils.PIPE_CHARACTER);
...@@ -1731,44 +1763,4 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore { ...@@ -1731,44 +1763,4 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {
} }
return missingFieldsCheck; return missingFieldsCheck;
} }
private boolean validateBMAttributeName(String uniqueAttrValue, String bmAttribute, BulkImportResponse bulkImportResponse, int lineIndex) {
boolean ret = false;
String[] businessAttributeName = bmAttribute.split(FileUtils.ESCAPE_CHARACTER + ".");
if(businessAttributeName.length < 2){
LOG.error("Provided businessAttributeName is not in proper format : " + bmAttribute + " at line #" + lineIndex);
String failedTermMsgs = "Provided businessAttributeName is not in proper format : " + bmAttribute + " at line #" + lineIndex;
BulkImportResponse.ImportInfo importInfo = new BulkImportResponse.ImportInfo(uniqueAttrValue, bmAttribute, BulkImportResponse.ImportStatus.FAILED, failedTermMsgs, lineIndex);
bulkImportResponse.getFailedImportInfoList().add(importInfo);
ret = true;
}
return ret;
}
private boolean validateBMAttribute(String uniqueAttrValue,String[] businessAttributeName, AtlasEntityType entityType, BulkImportResponse bulkImportResponse, int lineIndex) {
boolean ret = false;
String bMName = businessAttributeName[0];
String bMAttributeName = businessAttributeName[1];
if(entityType.getBusinessAttributes(bMName) == null ||
entityType.getBusinessAttributes(bMName).get(bMAttributeName) == null){
ret = true;
LOG.error("Provided businessAttributeName is not valid : " + bMName+"."+bMAttributeName + " at line #" + lineIndex);
String failedTermMsgs = "Provided businessAttributeName is not valid : " + bMName+"."+bMAttributeName + " at line #" + lineIndex;
BulkImportResponse.ImportInfo importInfo = new BulkImportResponse.ImportInfo(uniqueAttrValue, bMName+"."+bMAttributeName, BulkImportResponse.ImportStatus.FAILED, failedTermMsgs, lineIndex);
bulkImportResponse.getFailedImportInfoList().add(importInfo);
}
return ret;
}
private boolean failedMsgCheck(String uniqueAttrValue, String bmAttribute, List<String> failedTermMsgList,BulkImportResponse bulkImportResponse,int lineIndex) {
boolean ret = false;
if(!failedTermMsgList.isEmpty()){
ret = true;
String failedTermMsg = StringUtils.join(failedTermMsgList, "\n");
BulkImportResponse.ImportInfo importInfo = new BulkImportResponse.ImportInfo(uniqueAttrValue, bmAttribute, BulkImportResponse.ImportStatus.FAILED, failedTermMsg, lineIndex);
bulkImportResponse.getFailedImportInfoList().add(importInfo);
}
return ret;
}
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment