Commit 9735643c by Ashutosh Mestry

ATLAS-3046: Classification Updater tool. Unique name used.

parent b7dcfc60
...@@ -28,6 +28,8 @@ import org.apache.atlas.model.instance.AtlasClassification; ...@@ -28,6 +28,8 @@ import org.apache.atlas.model.instance.AtlasClassification;
import org.apache.atlas.model.instance.AtlasEntityHeader; import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.AtlasEntityHeaders; import org.apache.atlas.model.instance.AtlasEntityHeaders;
import org.apache.atlas.model.typedef.AtlasClassificationDef; import org.apache.atlas.model.typedef.AtlasClassificationDef;
import org.apache.atlas.model.typedef.AtlasEntityDef;
import org.apache.atlas.model.typedef.AtlasStructDef;
import org.apache.atlas.model.typedef.AtlasTypesDef; import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.apache.atlas.type.AtlasType; import org.apache.atlas.type.AtlasType;
import org.apache.atlas.utils.AtlasJson; import org.apache.atlas.utils.AtlasJson;
...@@ -300,6 +302,7 @@ public class BulkFetchAndUpdate { ...@@ -300,6 +302,7 @@ public class BulkFetchAndUpdate {
private static final String ATTR_NAME_QUALIFIED_NAME = "qualifiedName"; private static final String ATTR_NAME_QUALIFIED_NAME = "qualifiedName";
private AtlasClientV2 atlasClientV2; private AtlasClientV2 atlasClientV2;
private Map<String, String> typeNameUniqueAttributeNameMap = new HashMap<>();
public Preparer(AtlasClientV2 atlasClientV2) { public Preparer(AtlasClientV2 atlasClientV2) {
this.atlasClientV2 = atlasClientV2; this.atlasClientV2 = atlasClientV2;
...@@ -322,7 +325,7 @@ public class BulkFetchAndUpdate { ...@@ -322,7 +325,7 @@ public class BulkFetchAndUpdate {
try { try {
classificationDef.setGuid(null); classificationDef.setGuid(null);
String json = AtlasType.toJson(classificationDef); String json = AtlasType.toJson(classificationDef);
fileWriter.write(json + "\n"); fileWriter.write(json);
} catch (Exception e) { } catch (Exception e) {
LOG.error("Error writing classifications: {}", e); LOG.error("Error writing classifications: {}", e);
displayCrLf("Error writing classifications."); displayCrLf("Error writing classifications.");
...@@ -348,11 +351,11 @@ public class BulkFetchAndUpdate { ...@@ -348,11 +351,11 @@ public class BulkFetchAndUpdate {
} }
try { try {
AtlasEntityHeaders response = atlasClientV2.getEntityHeaders(fromTimestamp); AtlasEntityHeaders entityHeaders = atlasClientV2.getEntityHeaders(fromTimestamp);
int guidHeaderMapSize = response.getGuidHeaderMap().size(); int guidHeaderMapSize = entityHeaders.getGuidHeaderMap().size();
try { try {
displayCrLf("Read entities: " + guidHeaderMapSize); displayCrLf("Read entities: " + guidHeaderMapSize);
AtlasEntityHeaders updatedHeaders = removeEntityGuids(response); AtlasEntityHeaders updatedHeaders = removeEntityGuids(entityHeaders);
fileWriter.write(AtlasType.toJson(updatedHeaders)); fileWriter.write(AtlasType.toJson(updatedHeaders));
displayCrLf("Writing entities: " + updatedHeaders.getGuidHeaderMap().size()); displayCrLf("Writing entities: " + updatedHeaders.getGuidHeaderMap().size());
...@@ -368,12 +371,17 @@ public class BulkFetchAndUpdate { ...@@ -368,12 +371,17 @@ public class BulkFetchAndUpdate {
} }
private AtlasEntityHeaders removeEntityGuids(AtlasEntityHeaders headers) { private AtlasEntityHeaders removeEntityGuids(AtlasEntityHeaders headers) {
Map<String, AtlasEntityHeader> qualifiedNameHeaderMap = new HashMap<>(); Map<String, AtlasEntityHeader> uniqueNameEntityHeaderMap = new HashMap<>();
for (AtlasEntityHeader header : headers.getGuidHeaderMap().values()) { for (AtlasEntityHeader header : headers.getGuidHeaderMap().values()) {
String qualifiedName = getQualifiedName(header); String uniqueName = getUniqueName(header);
displayCrLf("Processing: " + qualifiedName); if (StringUtils.isEmpty(uniqueName)) {
displayCrLf("UniqueName is empty. Ignoring: " + header.getGuid());
LOG.warn("UniqueName is empty. Ignoring: {}", AtlasJson.toJson(header));
continue;
}
displayCrLf("Processing: " + uniqueName);
if (header.getStatus() == DELETED) { if (header.getStatus() == DELETED) {
continue; continue;
} }
...@@ -383,37 +391,72 @@ public class BulkFetchAndUpdate { ...@@ -383,37 +391,72 @@ public class BulkFetchAndUpdate {
continue; continue;
} }
boolean keyFound = qualifiedNameHeaderMap.containsKey(qualifiedName); String key = String.format("%s:%s", header.getTypeName(), uniqueName);
boolean keyFound = uniqueNameEntityHeaderMap.containsKey(key);
if (!keyFound) { if (!keyFound) {
qualifiedNameHeaderMap.put(qualifiedName, header); uniqueNameEntityHeaderMap.put(key, header);
} }
AtlasEntityHeader currentHeader = qualifiedNameHeaderMap.get(qualifiedName); updateClassificationsForHeader(header, uniqueNameEntityHeaderMap.get(key), keyFound);
for (AtlasClassification c : header.getClassifications()) { displayCrLf("Processing: " + uniqueName);
c.setEntityGuid(null); }
if (keyFound) { displayCrLf("Processed: " + uniqueNameEntityHeaderMap.size());
boolean found = headers.setGuidHeaderMap(uniqueNameEntityHeaderMap);
currentHeader.getClassifications().stream().anyMatch(ox -> ox.getTypeName().equals(c.getTypeName())); return headers;
if (!found) { }
currentHeader.getClassifications().add(c);
} else { private void updateClassificationsForHeader(AtlasEntityHeader header, AtlasEntityHeader currentHeader, boolean keyFound) {
displayCrLf("Ignoring: " + c.toString()); for (AtlasClassification c : header.getClassifications()) {
LOG.warn("Ignoring: {}", AtlasJson.toJson(c)); c.setEntityGuid(null);
}
if (keyFound) {
boolean found =
currentHeader.getClassifications().stream().anyMatch(ox -> ox.getTypeName().equals(c.getTypeName()));
if (!found) {
currentHeader.getClassifications().add(c);
} else {
displayCrLf("Ignoring: " + c.toString());
LOG.warn("Ignoring: {}", AtlasJson.toJson(c));
} }
} }
}
}
displayCrLf("Processing: " + qualifiedName); private String getUniqueName(AtlasEntityHeader header) {
String uniqueAttributeName = ATTR_NAME_QUALIFIED_NAME;
if (!header.getAttributes().containsKey(ATTR_NAME_QUALIFIED_NAME)) {
uniqueAttributeName = getUniqueAttribute(header.getTypeName());
} }
displayCrLf("Processed: " + qualifiedNameHeaderMap.size()); Object attrValue = header.getAttribute(uniqueAttributeName);
headers.setGuidHeaderMap(qualifiedNameHeaderMap); if (attrValue == null) {
return headers; LOG.warn("Unique Attribute Value: empty: {}", AtlasJson.toJson(header));
return StringUtils.EMPTY;
}
return attrValue.toString();
} }
private String getQualifiedName(AtlasEntityHeader header) { private String getUniqueAttribute(String typeName) {
return (String) header.getAttribute(ATTR_NAME_QUALIFIED_NAME); try {
if (typeNameUniqueAttributeNameMap.containsKey(typeName)) {
return typeNameUniqueAttributeNameMap.get(typeName);
}
AtlasEntityDef entityDef = atlasClientV2.getEntityDefByName(typeName);
for (AtlasStructDef.AtlasAttributeDef ad : entityDef.getAttributeDefs()) {
if (ad.getIsUnique()) {
typeNameUniqueAttributeNameMap.put(typeName, ad.getName());
return ad.getName();
}
}
} catch (AtlasServiceException e) {
LOG.error("Error fetching type: {}", typeName, e);
return null;
}
return null;
} }
private List<AtlasClassificationDef> getAllClassificationsDefs() throws Exception { private List<AtlasClassificationDef> getAllClassificationsDefs() throws Exception {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment