Commit 3e4fb5cd by Madhan Neethiraj

ATLAS-3054: updated notification pre-process to handle updates to ownedRef attributes - #3

parent d234de2d
...@@ -21,12 +21,14 @@ package org.apache.atlas.type; ...@@ -21,12 +21,14 @@ package org.apache.atlas.type;
import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.TypeCategory; import org.apache.atlas.model.TypeCategory;
import org.apache.atlas.model.typedef.AtlasBaseTypeDef; import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef.Cardinality;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.lang.reflect.Array; import java.lang.reflect.Array;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
...@@ -43,33 +45,36 @@ public class AtlasArrayType extends AtlasType { ...@@ -43,33 +45,36 @@ public class AtlasArrayType extends AtlasType {
private final String elementTypeName; private final String elementTypeName;
private int minCount; private int minCount;
private int maxCount; private int maxCount;
private Cardinality cardinality;
private AtlasType elementType; private AtlasType elementType;
public AtlasArrayType(AtlasType elementType) { public AtlasArrayType(AtlasType elementType) {
this(elementType, COUNT_NOT_SET, COUNT_NOT_SET); this(elementType, COUNT_NOT_SET, COUNT_NOT_SET, Cardinality.LIST);
} }
public AtlasArrayType(AtlasType elementType, int minCount, int maxCount) { public AtlasArrayType(AtlasType elementType, int minCount, int maxCount, Cardinality cardinality) {
super(AtlasBaseTypeDef.getArrayTypeName(elementType.getTypeName()), TypeCategory.ARRAY, SERVICE_TYPE_ATLAS_CORE); super(AtlasBaseTypeDef.getArrayTypeName(elementType.getTypeName()), TypeCategory.ARRAY, SERVICE_TYPE_ATLAS_CORE);
this.elementTypeName = elementType.getTypeName(); this.elementTypeName = elementType.getTypeName();
this.minCount = minCount; this.minCount = minCount;
this.maxCount = maxCount; this.maxCount = maxCount;
this.cardinality = cardinality;
this.elementType = elementType; this.elementType = elementType;
} }
public AtlasArrayType(String elementTypeName, AtlasTypeRegistry typeRegistry) throws AtlasBaseException { public AtlasArrayType(String elementTypeName, AtlasTypeRegistry typeRegistry) throws AtlasBaseException {
this(elementTypeName, COUNT_NOT_SET, COUNT_NOT_SET, typeRegistry); this(elementTypeName, COUNT_NOT_SET, COUNT_NOT_SET, Cardinality.LIST, typeRegistry);
} }
public AtlasArrayType(String elementTypeName, int minCount, int maxCount, AtlasTypeRegistry typeRegistry) public AtlasArrayType(String elementTypeName, int minCount, int maxCount, Cardinality cardinality, AtlasTypeRegistry typeRegistry)
throws AtlasBaseException { throws AtlasBaseException {
super(AtlasBaseTypeDef.getArrayTypeName(elementTypeName), TypeCategory.ARRAY, SERVICE_TYPE_ATLAS_CORE); super(AtlasBaseTypeDef.getArrayTypeName(elementTypeName), TypeCategory.ARRAY, SERVICE_TYPE_ATLAS_CORE);
this.elementTypeName = elementTypeName; this.elementTypeName = elementTypeName;
this.minCount = minCount; this.minCount = minCount;
this.maxCount = maxCount; this.maxCount = maxCount;
this.cardinality = cardinality;
this.resolveReferences(typeRegistry); this.resolveReferences(typeRegistry);
} }
...@@ -90,6 +95,10 @@ public class AtlasArrayType extends AtlasType { ...@@ -90,6 +95,10 @@ public class AtlasArrayType extends AtlasType {
return maxCount; return maxCount;
} }
public void setCardinality(Cardinality cardinality) { this.cardinality = cardinality; }
public Cardinality getCardinality() { return cardinality; }
public AtlasType getElementType() { public AtlasType getElementType() {
return elementType; return elementType;
} }
...@@ -151,79 +160,12 @@ public class AtlasArrayType extends AtlasType { ...@@ -151,79 +160,12 @@ public class AtlasArrayType extends AtlasType {
@Override @Override
public boolean areEqualValues(Object val1, Object val2, Map<String, String> guidAssignments) { public boolean areEqualValues(Object val1, Object val2, Map<String, String> guidAssignments) {
boolean ret = true; final boolean ret;
if (val1 == null) { if (cardinality == Cardinality.SET) {
ret = val2 == null; ret = areEqualSets(val1, val2, guidAssignments);
} else if (val2 == null) {
ret = false;
} else { } else {
if (val1.getClass().isArray() && val2.getClass().isArray()) { ret = areEqualLists(val1, val2, guidAssignments);
int len = Array.getLength(val1);
if (len != Array.getLength(val2)) {
ret = false;
} else {
for (int i = 0; i < len; i++) {
if (!elementType.areEqualValues(Array.get(val1, i), Array.get(val2, i), guidAssignments)) {
ret = false;
break;
}
}
}
} else if ((val1 instanceof Set) && (val2 instanceof Set)) {
Set set1 = (Set) val1;
Set set2 = (Set) val2;
if (set1.size() != set2.size()) {
ret = false;
} else {
for (Object elem1 : set1) {
boolean foundInSet2 = false;
for (Object elem2 : set2) {
if (elementType.areEqualValues(elem1, elem2, guidAssignments)) {
foundInSet2 = true;
break;
}
}
if (!foundInSet2) {
ret = false;
break;
}
}
}
} else {
List list1 = getListFromValue(val1);
if (list1 == null) {
ret = false;
} else {
List list2 = getListFromValue(val2);
if (list2 == null) {
ret = false;
} else {
int len = list1.size();
if (len != list2.size()) {
ret = false;
} else {
for (int i = 0; i < len; i++) {
if (!elementType.areEqualValues(list1.get(i), list2.get(i), guidAssignments)) {
ret = false;
break;
}
}
}
}
}
}
} }
return ret; return ret;
...@@ -266,128 +208,120 @@ public class AtlasArrayType extends AtlasType { ...@@ -266,128 +208,120 @@ public class AtlasArrayType extends AtlasType {
@Override @Override
public Collection<?> getNormalizedValue(Object obj) { public Collection<?> getNormalizedValue(Object obj) {
if (obj == null) { Collection<Object> ret = null;
return null;
}
if (obj instanceof String){ if (obj instanceof String) {
obj = AtlasType.fromJson(obj.toString(), List.class); obj = AtlasType.fromJson(obj.toString(), List.class);
} }
if (obj instanceof List || obj instanceof Set) { if (obj instanceof List || obj instanceof Set) {
List<Object> ret = new ArrayList<>(); Collection collObj = (Collection) obj;
Collection objList = (Collection) obj; if (isValidElementCount(collObj.size())) {
ret = new ArrayList<>(collObj.size());
if (!isValidElementCount(objList.size())) { for (Object element : collObj) {
return null; if (element != null) {
} Object normalizedValue = elementType.getNormalizedValue(element);
for (Object element : objList) { if (normalizedValue != null) {
if (element != null) { ret.add(normalizedValue);
Object normalizedValue = elementType.getNormalizedValue(element); } else {
ret = null; // invalid element value
if (normalizedValue != null) { break;
ret.add(normalizedValue); }
} else { } else {
return null; // invalid element value ret.add(element);
} }
} else {
ret.add(element);
} }
} }
} else if (obj != null && obj.getClass().isArray()) {
return ret;
} else if (obj.getClass().isArray()) {
List<Object> ret = new ArrayList<>();
int arrayLen = Array.getLength(obj); int arrayLen = Array.getLength(obj);
if (!isValidElementCount(arrayLen)) { if (isValidElementCount(arrayLen)) {
return null; ret = new ArrayList<>(arrayLen);
}
for (int i = 0; i < arrayLen; i++) { for (int i = 0; i < arrayLen; i++) {
Object element = Array.get(obj, i); Object element = Array.get(obj, i);
if (element != null) { if (element != null) {
Object normalizedValue = elementType.getNormalizedValue(element); Object normalizedValue = elementType.getNormalizedValue(element);
if (normalizedValue != null) {
ret.add(normalizedValue);
} else {
ret = null; // invalid element value
if (normalizedValue != null) { break;
ret.add(normalizedValue); }
} else { } else {
return null; // invalid element value ret.add(element);
} }
} else {
ret.add(element);
} }
} }
return ret;
} }
return null; return ret;
} }
@Override @Override
public Collection<?> getNormalizedValueForUpdate(Object obj) { public Collection<?> getNormalizedValueForUpdate(Object obj) {
if (obj == null) { Collection<Object> ret = null;
return null;
if (obj instanceof String) {
obj = AtlasType.fromJson(obj.toString(), List.class);
} }
if (obj instanceof List || obj instanceof Set) { if (obj instanceof List || obj instanceof Set) {
List<Object> ret = new ArrayList<>();
Collection objList = (Collection) obj; Collection objList = (Collection) obj;
if (!isValidElementCount(objList.size())) { if (isValidElementCount(objList.size())) {
return null; ret = new ArrayList<>(objList.size());
}
for (Object element : objList) { for (Object element : objList) {
if (element != null) { if (element != null) {
Object normalizedValue = elementType.getNormalizedValueForUpdate(element); Object normalizedValue = elementType.getNormalizedValueForUpdate(element);
if (normalizedValue != null) { if (normalizedValue != null) {
ret.add(normalizedValue); ret.add(normalizedValue);
} else {
ret = null; // invalid element value
break;
}
} else { } else {
return null; // invalid element value ret.add(element);
} }
} else {
ret.add(element);
} }
} }
} else if (obj != null && obj.getClass().isArray()) {
return ret;
} else if (obj.getClass().isArray()) {
List<Object> ret = new ArrayList<>();
int arrayLen = Array.getLength(obj); int arrayLen = Array.getLength(obj);
if (!isValidElementCount(arrayLen)) { if (isValidElementCount(arrayLen)) {
return null; ret = new ArrayList<>(arrayLen);
}
for (int i = 0; i < arrayLen; i++) { for (int i = 0; i < arrayLen; i++) {
Object element = Array.get(obj, i); Object element = Array.get(obj, i);
if (element != null) { if (element != null) {
Object normalizedValue = elementType.getNormalizedValueForUpdate(element); Object normalizedValue = elementType.getNormalizedValueForUpdate(element);
if (normalizedValue != null) {
ret.add(normalizedValue);
} else {
ret = null; // invalid element value
if (normalizedValue != null) { break;
ret.add(normalizedValue); }
} else { } else {
return null; // invalid element value ret.add(element);
} }
} else {
ret.add(element);
} }
} }
return ret;
} }
return null; return ret;
} }
@Override @Override
...@@ -483,7 +417,7 @@ public class AtlasArrayType extends AtlasType { ...@@ -483,7 +417,7 @@ public class AtlasArrayType extends AtlasType {
if (elementAttributeType == elementType) { if (elementAttributeType == elementType) {
return this; return this;
} else { } else {
AtlasType attributeType = new AtlasArrayType(elementAttributeType, minCount, maxCount); AtlasType attributeType = new AtlasArrayType(elementAttributeType, minCount, maxCount, cardinality);
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("getTypeForAttribute(): {} ==> {}", getTypeName(), attributeType.getTypeName()); LOG.debug("getTypeForAttribute(): {} ==> {}", getTypeName(), attributeType.getTypeName());
...@@ -523,29 +457,131 @@ public class AtlasArrayType extends AtlasType { ...@@ -523,29 +457,131 @@ public class AtlasArrayType extends AtlasType {
return false; return false;
} }
private boolean areEqualSets(Object val1, Object val2, Map<String, String> guidAssignments) {
boolean ret = true;
if (val1 == null) {
ret = val2 == null;
} else if (val2 == null) {
ret = false;
} else if (val1 == val2) {
ret = true;
} else {
Set set1 = getSetFromValue(val1);
Set set2 = getSetFromValue(val2);
if (set1.size() != set2.size()) {
ret = false;
} else {
for (Object elem1 : set1) {
boolean foundInSet2 = false;
for (Object elem2 : set2) {
if (elementType.areEqualValues(elem1, elem2, guidAssignments)) {
foundInSet2 = true;
break;
}
}
if (!foundInSet2) {
ret = false;
break;
}
}
}
}
return ret;
}
private boolean areEqualLists(Object val1, Object val2, Map<String, String> guidAssignments) {
boolean ret = true;
if (val1 == null) {
ret = val2 == null;
} else if (val2 == null) {
ret = false;
} else if (val1 == val2) {
ret = true;
} else if (val1.getClass().isArray() && val2.getClass().isArray()) {
int len = Array.getLength(val1);
if (len != Array.getLength(val2)) {
ret = false;
} else {
for (int i = 0; i < len; i++) {
if (!elementType.areEqualValues(Array.get(val1, i), Array.get(val2, i), guidAssignments)) {
ret = false;
break;
}
}
}
} else {
List list1 = getListFromValue(val1);
List list2 = getListFromValue(val2);
if (list1.size() != list2.size()) {
ret = false;
} else {
int len = list1.size();
for (int i = 0; i < len; i++) {
if (!elementType.areEqualValues(list1.get(i), list2.get(i), guidAssignments)) {
ret = false;
break;
}
}
}
}
return ret;
}
private List getListFromValue(Object val) { private List getListFromValue(Object val) {
final List ret; final List ret;
if (val instanceof List) { if (val instanceof List) {
ret = (List) val; ret = (List) val;
} else if (val instanceof Collection) { } else if (val instanceof Collection) {
int len = ((Collection) val).size(); ret = new ArrayList<>((Collection) val);
} else if (val.getClass().isArray()) {
int len = Array.getLength(val);
ret = new ArrayList<>(len); ret = new ArrayList<>(len);
for (Object elem : ((Collection) val)) { for (int i = 0; i < len; i++) {
ret.add(elem); ret.add(Array.get(val, i));
} }
} else if (val instanceof String){
ret = AtlasType.fromJson(val.toString(), List.class);
} else {
ret = null;
}
return ret;
}
private Set getSetFromValue(Object val) {
final Set ret;
if (val instanceof Set) {
ret = (Set) val;
} else if (val instanceof Collection) {
ret = new HashSet<>((Collection) val);
} else if (val.getClass().isArray()) { } else if (val.getClass().isArray()) {
int len = Array.getLength(val); int len = Array.getLength(val);
ret = new ArrayList<>(len); ret = new HashSet<>(len);
for (int i = 0; i < len; i++) { for (int i = 0; i < len; i++) {
ret.add(Array.get(val, i)); ret.add(Array.get(val, i));
} }
} else if (val instanceof String){ } else if (val instanceof String){
ret = AtlasType.fromJson(val.toString(), List.class); ret = AtlasType.fromJson(val.toString(), Set.class);
} else { } else {
ret = null; ret = null;
} }
......
...@@ -22,7 +22,6 @@ import org.apache.atlas.AtlasErrorCode; ...@@ -22,7 +22,6 @@ import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasObjectId; import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.instance.AtlasStruct;
import org.apache.atlas.model.typedef.AtlasEntityDef; import org.apache.atlas.model.typedef.AtlasEntityDef;
import org.apache.atlas.model.typedef.AtlasEntityDef.AtlasRelationshipAttributeDef; import org.apache.atlas.model.typedef.AtlasEntityDef.AtlasRelationshipAttributeDef;
import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef; import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
...@@ -544,7 +543,9 @@ public class AtlasEntityType extends AtlasStructType { ...@@ -544,7 +543,9 @@ public class AtlasEntityType extends AtlasStructType {
superType.normalizeAttributeValues(ent); superType.normalizeAttributeValues(ent);
} }
normalizeValues(ent); super.normalizeAttributeValues(ent);
normalizeRelationshipAttributeValues(ent, false);
} }
} }
...@@ -555,6 +556,8 @@ public class AtlasEntityType extends AtlasStructType { ...@@ -555,6 +556,8 @@ public class AtlasEntityType extends AtlasStructType {
} }
super.normalizeAttributeValuesForUpdate(ent); super.normalizeAttributeValuesForUpdate(ent);
normalizeRelationshipAttributeValues(ent, true);
} }
} }
...@@ -565,7 +568,9 @@ public class AtlasEntityType extends AtlasStructType { ...@@ -565,7 +568,9 @@ public class AtlasEntityType extends AtlasStructType {
superType.normalizeAttributeValues(obj); superType.normalizeAttributeValues(obj);
} }
normalizeValues(obj); super.normalizeAttributeValues(obj);
normalizeRelationshipAttributeValues(obj, false);
} }
} }
...@@ -576,6 +581,8 @@ public class AtlasEntityType extends AtlasStructType { ...@@ -576,6 +581,8 @@ public class AtlasEntityType extends AtlasStructType {
} }
super.normalizeAttributeValuesForUpdate(obj); super.normalizeAttributeValuesForUpdate(obj);
normalizeRelationshipAttributeValues(obj, true);
} }
} }
...@@ -743,67 +750,56 @@ public class AtlasEntityType extends AtlasStructType { ...@@ -743,67 +750,56 @@ public class AtlasEntityType extends AtlasStructType {
return ret; return ret;
} }
private void normalizeRelationshipAttributeValues(AtlasStruct obj) { private void normalizeRelationshipAttributeValues(AtlasEntity entity, boolean isUpdate) {
if (obj != null && obj instanceof AtlasEntity) { if (entity != null) {
AtlasEntity entityObj = (AtlasEntity) obj;
for (String attributeName : relationshipAttributes.keySet()) { for (String attributeName : relationshipAttributes.keySet()) {
if (entityObj.hasRelationshipAttribute(attributeName)) { if (entity.hasRelationshipAttribute(attributeName)) {
Object attributeValue = entityObj.getRelationshipAttribute(attributeName); Object attributeValue = entity.getRelationshipAttribute(attributeName);
String relationshipType = AtlasEntityUtil.getRelationshipType(attributeValue); String relationshipType = AtlasEntityUtil.getRelationshipType(attributeValue);
AtlasAttribute attribute = getRelationshipAttribute(attributeName, relationshipType); AtlasAttribute attribute = getRelationshipAttribute(attributeName, relationshipType);
AtlasAttributeDef attributeDef = attribute.getAttributeDef();
attributeValue = getNormalizedValue(attributeValue, attributeDef); if (attribute != null) {
AtlasType attrType = attribute.getAttributeType();
if (isValidRelationshipType(attrType)) {
if (isUpdate) {
attributeValue = attrType.getNormalizedValueForUpdate(attributeValue);
} else {
attributeValue = attrType.getNormalizedValue(attributeValue);
}
entityObj.setRelationshipAttribute(attributeName, attributeValue); entity.setRelationshipAttribute(attributeName, attributeValue);
}
}
} }
} }
} }
} }
public void normalizeRelationshipAttributeValues(Map<String, Object> obj) { public void normalizeRelationshipAttributeValues(Map<String, Object> obj, boolean isUpdate) {
if (obj != null) { if (obj != null) {
for (String attributeName : relationshipAttributes.keySet()) { for (String attributeName : relationshipAttributes.keySet()) {
if (obj.containsKey(attributeName)) { if (obj.containsKey(attributeName)) {
Object attributeValue = obj.get(attributeName); Object attributeValue = obj.get(attributeName);
String relationshipType = AtlasEntityUtil.getRelationshipType(attributeValue); String relationshipType = AtlasEntityUtil.getRelationshipType(attributeValue);
AtlasAttribute attribute = getRelationshipAttribute(attributeName, relationshipType); AtlasAttribute attribute = getRelationshipAttribute(attributeName, relationshipType);
AtlasAttributeDef attributeDef = attribute.getAttributeDef();
attributeValue = getNormalizedValue(attributeValue, attributeDef);
obj.put(attributeName, attributeValue);
}
}
}
}
private Object getNormalizedValue(Object value, AtlasAttributeDef attributeDef) { if (attribute != null) {
String relationshipType = AtlasEntityUtil.getRelationshipType(value); AtlasType attrType = attribute.getAttributeType();
AtlasAttribute attribute = getRelationshipAttribute(attributeDef.getName(), relationshipType);
if (attribute != null) { if (isValidRelationshipType(attrType)) {
AtlasType attrType = attribute.getAttributeType(); if (isUpdate) {
attributeValue = attrType.getNormalizedValueForUpdate(attributeValue);
} else {
attributeValue = attrType.getNormalizedValue(attributeValue);
}
if (isValidRelationshipType(attrType) && value != null) { obj.put(attributeName, attributeValue);
return attrType.getNormalizedValue(value); }
}
}
} }
} }
return null;
}
private void normalizeValues(AtlasEntity ent) {
super.normalizeAttributeValues(ent);
normalizeRelationshipAttributeValues(ent);
}
private void normalizeValues(Map<String, Object> obj) {
super.normalizeAttributeValues(obj);
normalizeRelationshipAttributeValues(obj);
} }
private boolean validateRelationshipAttributes(Object obj, String objName, List<String> messages) { private boolean validateRelationshipAttributes(Object obj, String objName, List<String> messages) {
......
...@@ -346,8 +346,15 @@ public class AtlasRelationshipType extends AtlasStructType { ...@@ -346,8 +346,15 @@ public class AtlasRelationshipType extends AtlasStructType {
attributeDef.addConstraint(constraint); attributeDef.addConstraint(constraint);
} }
attribute = new AtlasAttribute(entityType, attributeDef, AtlasType attrType = typeRegistry.getType(attrTypeName);
typeRegistry.getType(attrTypeName), getTypeName(), relationshipLabel);
if (attrType instanceof AtlasArrayType) {
AtlasArrayType arrayType = (AtlasArrayType) attrType;
arrayType.setCardinality(attributeDef.getCardinality());
}
attribute = new AtlasAttribute(entityType, attributeDef, attrType, getTypeName(), relationshipLabel);
attribute.setLegacyAttribute(endDef.getIsLegacyAttribute()); attribute.setLegacyAttribute(endDef.getIsLegacyAttribute());
} else { } else {
......
...@@ -99,6 +99,7 @@ public class AtlasStructType extends AtlasType { ...@@ -99,6 +99,7 @@ public class AtlasStructType extends AtlasType {
arrayType.setMinCount(attributeDef.getValuesMinCount()); arrayType.setMinCount(attributeDef.getValuesMinCount());
arrayType.setMaxCount(attributeDef.getValuesMaxCount()); arrayType.setMaxCount(attributeDef.getValuesMaxCount());
arrayType.setCardinality(cardinality);
} }
//check if attribute type is not classification //check if attribute type is not classification
......
...@@ -38,6 +38,7 @@ import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef.Cardinali ...@@ -38,6 +38,7 @@ import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef.Cardinali
import org.apache.atlas.model.typedef.AtlasStructDef.AtlasConstraintDef; import org.apache.atlas.model.typedef.AtlasStructDef.AtlasConstraintDef;
import org.apache.atlas.model.typedef.AtlasTypeDefHeader; import org.apache.atlas.model.typedef.AtlasTypeDefHeader;
import org.apache.atlas.model.typedef.AtlasTypesDef; import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
import org.apache.atlas.v1.model.typedef.AttributeDefinition; import org.apache.atlas.v1.model.typedef.AttributeDefinition;
import org.apache.atlas.v1.model.typedef.ClassTypeDefinition; import org.apache.atlas.v1.model.typedef.ClassTypeDefinition;
import org.apache.atlas.v1.model.typedef.Multiplicity; import org.apache.atlas.v1.model.typedef.Multiplicity;
...@@ -413,10 +414,36 @@ public class AtlasTypeUtil { ...@@ -413,10 +414,36 @@ public class AtlasTypeUtil {
return new AtlasRelatedObjectId(getAtlasObjectId(entity)); return new AtlasRelatedObjectId(getAtlasObjectId(entity));
} }
public static AtlasRelatedObjectId toAtlasRelatedObjectId(AtlasEntity entity, AtlasTypeRegistry typeRegistry) {
return new AtlasRelatedObjectId(getAtlasObjectId(entity, typeRegistry));
}
public static AtlasObjectId getAtlasObjectId(AtlasEntity entity) { public static AtlasObjectId getAtlasObjectId(AtlasEntity entity) {
return new AtlasObjectId(entity.getGuid(), entity.getTypeName()); return new AtlasObjectId(entity.getGuid(), entity.getTypeName());
} }
public static AtlasObjectId getAtlasObjectId(AtlasEntity entity, AtlasTypeRegistry typeRegistry) {
String typeName = entity.getTypeName();
AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName);
Map<String, Object> uniqAttributes = null;
if (entityType != null && MapUtils.isNotEmpty(entityType.getUniqAttributes())) {
for (AtlasAttribute attribute : entityType.getUniqAttributes().values()) {
Object attrValue = entity.getAttribute(attribute.getName());
if (attrValue != null) {
if (uniqAttributes == null) {
uniqAttributes = new HashMap<>();
}
uniqAttributes.put(attribute.getName(), attrValue);
}
}
}
return new AtlasObjectId(entity.getGuid(), typeName, uniqAttributes);
}
public static AtlasObjectId getAtlasObjectId(AtlasEntityHeader header) { public static AtlasObjectId getAtlasObjectId(AtlasEntityHeader header) {
return new AtlasObjectId(header.getGuid(), header.getTypeName()); return new AtlasObjectId(header.getGuid(), header.getTypeName());
} }
......
...@@ -63,12 +63,12 @@ public class TestAtlasStructType { ...@@ -63,12 +63,12 @@ public class TestAtlasStructType {
multiValuedAttribMin.setName(MULTI_VAL_ATTR_NAME_MIN); multiValuedAttribMin.setName(MULTI_VAL_ATTR_NAME_MIN);
multiValuedAttribMin.setTypeName(AtlasBaseTypeDef.getArrayTypeName(ATLAS_TYPE_INT)); multiValuedAttribMin.setTypeName(AtlasBaseTypeDef.getArrayTypeName(ATLAS_TYPE_INT));
multiValuedAttribMin.setCardinality(Cardinality.SET); multiValuedAttribMin.setCardinality(Cardinality.LIST);
multiValuedAttribMin.setValuesMinCount(MULTI_VAL_ATTR_MIN_COUNT); multiValuedAttribMin.setValuesMinCount(MULTI_VAL_ATTR_MIN_COUNT);
multiValuedAttribMax.setName(MULTI_VAL_ATTR_NAME_MAX); multiValuedAttribMax.setName(MULTI_VAL_ATTR_NAME_MAX);
multiValuedAttribMax.setTypeName(AtlasBaseTypeDef.getArrayTypeName(ATLAS_TYPE_INT)); multiValuedAttribMax.setTypeName(AtlasBaseTypeDef.getArrayTypeName(ATLAS_TYPE_INT));
multiValuedAttribMax.setCardinality(Cardinality.LIST); multiValuedAttribMax.setCardinality(Cardinality.SET);
multiValuedAttribMax.setValuesMaxCount(MULTI_VAL_ATTR_MAX_COUNT); multiValuedAttribMax.setValuesMaxCount(MULTI_VAL_ATTR_MAX_COUNT);
AtlasStructDef structDef = ModelTestUtil.newStructDef(); AtlasStructDef structDef = ModelTestUtil.newStructDef();
......
...@@ -190,7 +190,7 @@ public class AtlasStructFormatConverter extends AtlasAbstractFormatConverter { ...@@ -190,7 +190,7 @@ public class AtlasStructFormatConverter extends AtlasAbstractFormatConverter {
if (entities != null) { if (entities != null) {
v2Value = entities; v2Value = entities;
attrType = new AtlasArrayType(entityType); attrType = new AtlasArrayType(entityType, arrayType.getMinCount(), arrayType.getMaxCount(), arrayType.getCardinality());
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("{}: replaced objIdList with entityList", attr.getQualifiedName()); LOG.debug("{}: replaced objIdList with entityList", attr.getQualifiedName());
......
...@@ -22,6 +22,7 @@ import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo; ...@@ -22,6 +22,7 @@ import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo; import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import java.util.Iterator; import java.util.Iterator;
import java.util.List;
public class AtlasEntityStream implements EntityStream { public class AtlasEntityStream implements EntityStream {
protected final AtlasEntitiesWithExtInfo entitiesWithExtInfo; protected final AtlasEntitiesWithExtInfo entitiesWithExtInfo;
...@@ -33,6 +34,10 @@ public class AtlasEntityStream implements EntityStream { ...@@ -33,6 +34,10 @@ public class AtlasEntityStream implements EntityStream {
this(new AtlasEntitiesWithExtInfo(entity), null); this(new AtlasEntitiesWithExtInfo(entity), null);
} }
public AtlasEntityStream(List<AtlasEntity> entities) {
this(new AtlasEntitiesWithExtInfo(entities), null);
}
public AtlasEntityStream(AtlasEntityWithExtInfo entityWithExtInfo) { public AtlasEntityStream(AtlasEntityWithExtInfo entityWithExtInfo) {
this(new AtlasEntitiesWithExtInfo(entityWithExtInfo), null); this(new AtlasEntitiesWithExtInfo(entityWithExtInfo), null);
} }
......
...@@ -34,6 +34,7 @@ import org.apache.atlas.listener.ActiveStateChangeHandler; ...@@ -34,6 +34,7 @@ import org.apache.atlas.listener.ActiveStateChangeHandler;
import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo; import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo; import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.AtlasObjectId; import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.instance.EntityMutationResponse; import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.model.notification.HookNotification; import org.apache.atlas.model.notification.HookNotification;
...@@ -79,7 +80,6 @@ import javax.inject.Inject; ...@@ -79,7 +80,6 @@ import javax.inject.Inject;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
...@@ -492,7 +492,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl ...@@ -492,7 +492,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
return; return;
} }
preProcessNotificationMessage(kafkaMsg); PreprocessorContext context = preProcessNotificationMessage(kafkaMsg);
if (isEmptyMessage(kafkaMsg)) { if (isEmptyMessage(kafkaMsg)) {
commit(kafkaMsg); commit(kafkaMsg);
...@@ -525,7 +525,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl ...@@ -525,7 +525,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
AtlasClient.API_V1.CREATE_ENTITY.getNormalizedPath()); AtlasClient.API_V1.CREATE_ENTITY.getNormalizedPath());
} }
createOrUpdate(entities, false); createOrUpdate(entities, false, context);
} }
break; break;
...@@ -546,7 +546,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl ...@@ -546,7 +546,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
// There should only be one root entity // There should only be one root entity
entities.getEntities().get(0).setGuid(guid); entities.getEntities().get(0).setGuid(guid);
createOrUpdate(entities, true); createOrUpdate(entities, true, context);
} }
break; break;
...@@ -579,7 +579,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl ...@@ -579,7 +579,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
AtlasClientV2.API_V2.UPDATE_ENTITY.getNormalizedPath()); AtlasClientV2.API_V2.UPDATE_ENTITY.getNormalizedPath());
} }
createOrUpdate(entities, false); createOrUpdate(entities, false, context);
} }
break; break;
...@@ -593,7 +593,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl ...@@ -593,7 +593,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
AtlasClientV2.API_V2.CREATE_ENTITY.getNormalizedPath()); AtlasClientV2.API_V2.CREATE_ENTITY.getNormalizedPath());
} }
createOrUpdate(entities, false); createOrUpdate(entities, false, context);
} }
break; break;
...@@ -622,7 +622,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl ...@@ -622,7 +622,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
AtlasClientV2.API_V2.UPDATE_ENTITY.getNormalizedPath()); AtlasClientV2.API_V2.UPDATE_ENTITY.getNormalizedPath());
} }
createOrUpdate(entities, false); createOrUpdate(entities, false, context);
} }
break; break;
...@@ -708,15 +708,15 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl ...@@ -708,15 +708,15 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
} }
} }
private void createOrUpdate(AtlasEntitiesWithExtInfo entities, boolean isPartialUpdate) throws AtlasBaseException { private void createOrUpdate(AtlasEntitiesWithExtInfo entities, boolean isPartialUpdate, PreprocessorContext context) throws AtlasBaseException {
List<AtlasEntity> entitiesList = entities.getEntities(); List<AtlasEntity> entitiesList = entities.getEntities();
AtlasEntityStream entityStream = new AtlasEntityStream(entities); AtlasEntityStream entityStream = new AtlasEntityStream(entities);
if (commitBatchSize <= 0 || entitiesList.size() <= commitBatchSize) { if (commitBatchSize <= 0 || entitiesList.size() <= commitBatchSize) {
atlasEntityStore.createOrUpdate(entityStream, isPartialUpdate); EntityMutationResponse response = atlasEntityStore.createOrUpdate(entityStream, isPartialUpdate);
} else {
Map<String, String> guidAssignments = new HashMap<>();
recordProcessedEntities(response, context);
} else {
for (int fromIdx = 0; fromIdx < entitiesList.size(); fromIdx += commitBatchSize) { for (int fromIdx = 0; fromIdx < entitiesList.size(); fromIdx += commitBatchSize) {
int toIndex = fromIdx + commitBatchSize; int toIndex = fromIdx + commitBatchSize;
...@@ -726,20 +726,30 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl ...@@ -726,20 +726,30 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
List<AtlasEntity> entitiesBatch = new ArrayList<>(entitiesList.subList(fromIdx, toIndex)); List<AtlasEntity> entitiesBatch = new ArrayList<>(entitiesList.subList(fromIdx, toIndex));
updateProcessedEntityReferences(entitiesBatch, guidAssignments); updateProcessedEntityReferences(entitiesBatch, context.getGuidAssignments());
AtlasEntitiesWithExtInfo batch = new AtlasEntitiesWithExtInfo(entitiesBatch); AtlasEntitiesWithExtInfo batch = new AtlasEntitiesWithExtInfo(entitiesBatch);
AtlasEntityStream batchStream = new AtlasEntityStream(batch, entityStream); AtlasEntityStream batchStream = new AtlasEntityStream(batch, entityStream);
EntityMutationResponse response = atlasEntityStore.createOrUpdate(batchStream, isPartialUpdate); EntityMutationResponse response = atlasEntityStore.createOrUpdate(batchStream, isPartialUpdate);
recordProcessedEntities(response, guidAssignments); recordProcessedEntities(response, context);
RequestContext.get().resetEntityGuidUpdates(); RequestContext.get().resetEntityGuidUpdates();
RequestContext.get().clearCache(); RequestContext.get().clearCache();
} }
} }
if (context != null) {
context.prepareForPostUpdate();
List<AtlasEntity> postUpdateEntities = context.getPostUpdateEntities();
if (CollectionUtils.isNotEmpty(postUpdateEntities)) {
atlasEntityStore.createOrUpdate(new AtlasEntityStream(postUpdateEntities), true);
}
}
} }
private void recordFailedMessages() { private void recordFailedMessages() {
...@@ -815,49 +825,51 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl ...@@ -815,49 +825,51 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
} }
} }
private void preProcessNotificationMessage(AtlasKafkaMessage<HookNotification> kafkaMsg) { private PreprocessorContext preProcessNotificationMessage(AtlasKafkaMessage<HookNotification> kafkaMsg) {
if (!preprocessEnabled) { PreprocessorContext context = null;
return;
}
PreprocessorContext context = new PreprocessorContext(kafkaMsg, hiveTablesToIgnore, hiveTablesToPrune, hiveTablesCache, hiveTypesRemoveOwnedRefAttrs, rdbmsTypesRemoveOwnedRefAttrs); if (preprocessEnabled) {
context = new PreprocessorContext(kafkaMsg, typeRegistry, hiveTablesToIgnore, hiveTablesToPrune, hiveTablesCache, hiveTypesRemoveOwnedRefAttrs, rdbmsTypesRemoveOwnedRefAttrs);
if (context.isHivePreprocessEnabled()) { if (context.isHivePreprocessEnabled()) {
preprocessHiveTypes(context); preprocessHiveTypes(context);
} }
if (skipHiveColumnLineageHive20633) { if (skipHiveColumnLineageHive20633) {
skipHiveColumnLineage(context); skipHiveColumnLineage(context);
} }
if (rdbmsTypesRemoveOwnedRefAttrs) { if (rdbmsTypesRemoveOwnedRefAttrs) {
rdbmsTypeRemoveOwnedRefAttrs(context); rdbmsTypeRemoveOwnedRefAttrs(context);
} }
context.moveRegisteredReferredEntities(); context.moveRegisteredReferredEntities();
if (context.isHivePreprocessEnabled() && CollectionUtils.isNotEmpty(context.getEntities())) { if (context.isHivePreprocessEnabled() && CollectionUtils.isNotEmpty(context.getEntities())) {
// move hive_process and hive_column_lineage entities to end of the list // move hive_process and hive_column_lineage entities to end of the list
List<AtlasEntity> entities = context.getEntities(); List<AtlasEntity> entities = context.getEntities();
int count = entities.size(); int count = entities.size();
for (int i = 0; i < count; i++) { for (int i = 0; i < count; i++) {
AtlasEntity entity = entities.get(i); AtlasEntity entity = entities.get(i);
switch (entity.getTypeName()) { switch (entity.getTypeName()) {
case TYPE_HIVE_PROCESS: case TYPE_HIVE_PROCESS:
case TYPE_HIVE_COLUMN_LINEAGE: case TYPE_HIVE_COLUMN_LINEAGE:
entities.remove(i--); entities.remove(i--);
entities.add(entity); entities.add(entity);
count--; count--;
break; break;
}
} }
}
if (entities.size() - count > 0) { if (entities.size() - count > 0) {
LOG.info("moved {} hive_process/hive_column_lineage entities to end of list (listSize={})", entities.size() - count, entities.size()); LOG.info("moved {} hive_process/hive_column_lineage entities to end of list (listSize={})", entities.size() - count, entities.size());
}
} }
} }
return context;
} }
private void rdbmsTypeRemoveOwnedRefAttrs(PreprocessorContext context) { private void rdbmsTypeRemoveOwnedRefAttrs(PreprocessorContext context) {
...@@ -1009,12 +1021,26 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl ...@@ -1009,12 +1021,26 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
return ret; return ret;
} }
private void recordProcessedEntities(EntityMutationResponse mutationResponse, Map<String, String> guidAssignments) { private void recordProcessedEntities(EntityMutationResponse mutationResponse, PreprocessorContext context) {
if (mutationResponse != null && MapUtils.isNotEmpty(mutationResponse.getGuidAssignments())) { if (mutationResponse != null && context != null) {
guidAssignments.putAll(mutationResponse.getGuidAssignments()); if (MapUtils.isNotEmpty(mutationResponse.getGuidAssignments())) {
context.getGuidAssignments().putAll(mutationResponse.getGuidAssignments());
}
if (LOG.isDebugEnabled()) { if (CollectionUtils.isNotEmpty(mutationResponse.getCreatedEntities())) {
LOG.debug("recordProcessedEntities: added {} guidAssignments. updatedSize={}", mutationResponse.getGuidAssignments().size(), guidAssignments.size()); for (AtlasEntityHeader entity : mutationResponse.getCreatedEntities()) {
if (entity != null && entity.getGuid() != null) {
context.getCreatedEntities().add(entity.getGuid());
}
}
}
if (CollectionUtils.isNotEmpty(mutationResponse.getDeletedEntities())) {
for (AtlasEntityHeader entity : mutationResponse.getDeletedEntities()) {
if (entity != null && entity.getGuid() != null) {
context.getDeletedEntities().add(entity.getGuid());
}
}
} }
} }
} }
......
...@@ -53,6 +53,7 @@ public abstract class EntityPreprocessor { ...@@ -53,6 +53,7 @@ public abstract class EntityPreprocessor {
public static final String ATTRIBUTE_TABLES = "tables"; public static final String ATTRIBUTE_TABLES = "tables";
public static final String ATTRIBUTE_INDEXES = "indexes"; public static final String ATTRIBUTE_INDEXES = "indexes";
public static final String ATTRIBUTE_FOREIGN_KEYS = "foreign_keys"; public static final String ATTRIBUTE_FOREIGN_KEYS = "foreign_keys";
public static final String ATTRIBUTE_INSTANCE = "instance";
public static final char QNAME_SEP_CLUSTER_NAME = '@'; public static final char QNAME_SEP_CLUSTER_NAME = '@';
public static final char QNAME_SEP_ENTITY_NAME = '.'; public static final char QNAME_SEP_ENTITY_NAME = '.';
......
...@@ -18,8 +18,6 @@ ...@@ -18,8 +18,6 @@
package org.apache.atlas.notification.preprocessor; package org.apache.atlas.notification.preprocessor;
import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.instance.AtlasRelatedObjectId;
import org.apache.atlas.notification.preprocessor.PreprocessorContext.PreprocessAction; import org.apache.atlas.notification.preprocessor.PreprocessorContext.PreprocessAction;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
...@@ -27,16 +25,14 @@ import org.slf4j.LoggerFactory; ...@@ -27,16 +25,14 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Set;
public class HivePreprocessor { public class HivePreprocessor {
private static final Logger LOG = LoggerFactory.getLogger(HivePreprocessor.class); private static final Logger LOG = LoggerFactory.getLogger(HivePreprocessor.class);
private static final String RELATIONSHIP_TYPE_HIVE_TABLE_COLUMNS = "hive_table_columns"; private static final String RELATIONSHIP_TYPE_HIVE_TABLE_COLUMNS = "hive_table_columns";
private static final String RELATIONSHIP_TYPE_HIVE_TABLE_PARTITION_KEYS = "hive_table_partitionkeys"; private static final String RELATIONSHIP_TYPE_HIVE_TABLE_PARTITION_KEYS = "hive_table_partitionkeys";
private static final String RELATIONSHIP_TYPE_HIVE_TABLE_STORAGEDESC = "hive_table_storagedesc";
static class HiveTablePreprocessor extends EntityPreprocessor { static class HiveTablePreprocessor extends EntityPreprocessor {
public HiveTablePreprocessor() { public HiveTablePreprocessor() {
...@@ -67,63 +63,12 @@ public class HivePreprocessor { ...@@ -67,63 +63,12 @@ public class HivePreprocessor {
entity.setAttribute(ATTRIBUTE_COLUMNS, null); entity.setAttribute(ATTRIBUTE_COLUMNS, null);
entity.setAttribute(ATTRIBUTE_PARTITION_KEYS, null); entity.setAttribute(ATTRIBUTE_PARTITION_KEYS, null);
} else if (context.getHiveTypesRemoveOwnedRefAttrs()) { } else if (context.getHiveTypesRemoveOwnedRefAttrs()) {
context.removeRefAttributeAndRegisterToMove(entity, ATTRIBUTE_SD); context.removeRefAttributeAndRegisterToMove(entity, ATTRIBUTE_SD, RELATIONSHIP_TYPE_HIVE_TABLE_STORAGEDESC, ATTRIBUTE_TABLE);
context.removeRefAttributeAndRegisterToMove(entity, ATTRIBUTE_COLUMNS, RELATIONSHIP_TYPE_HIVE_TABLE_COLUMNS, ATTRIBUTE_TABLE);
removeColumnsAttributeAndRegisterToMove(entity, ATTRIBUTE_COLUMNS, RELATIONSHIP_TYPE_HIVE_TABLE_COLUMNS, context); context.removeRefAttributeAndRegisterToMove(entity, ATTRIBUTE_PARTITION_KEYS, RELATIONSHIP_TYPE_HIVE_TABLE_PARTITION_KEYS, ATTRIBUTE_TABLE);
removeColumnsAttributeAndRegisterToMove(entity, ATTRIBUTE_PARTITION_KEYS, RELATIONSHIP_TYPE_HIVE_TABLE_PARTITION_KEYS, context);
} }
} }
} }
private void removeColumnsAttributeAndRegisterToMove(AtlasEntity entity, String attrName, String relationshipType, PreprocessorContext context) {
Object attrVal = entity.removeAttribute(attrName);
if (attrVal != null) {
Set<String> guids = new HashSet<>();
context.collectGuids(attrVal, guids);
for (String guid : guids) {
AtlasEntity colEntity = context.getEntity(guid);
if (colEntity != null) {
Object attrTable = null;
if (colEntity.hasRelationshipAttribute(ATTRIBUTE_TABLE)) {
attrTable = colEntity.getRelationshipAttribute(ATTRIBUTE_TABLE);
} else if (colEntity.hasAttribute(ATTRIBUTE_TABLE)) {
attrTable = colEntity.getAttribute(ATTRIBUTE_TABLE);
}
attrTable = setRelationshipType(attrTable, relationshipType);
if (attrTable != null) {
colEntity.setRelationshipAttribute(ATTRIBUTE_TABLE, attrTable);
}
context.addToReferredEntitiesToMove(guid);
}
}
}
}
private AtlasRelatedObjectId setRelationshipType(Object attr, String relationshipType) {
AtlasRelatedObjectId ret = null;
if (attr instanceof AtlasRelatedObjectId) {
ret = (AtlasRelatedObjectId) attr;
} else if (attr instanceof AtlasObjectId) {
ret = new AtlasRelatedObjectId((AtlasObjectId) attr);
} else if (attr instanceof Map) {
ret = new AtlasRelatedObjectId((Map) attr);
}
if (ret != null) {
ret.setRelationshipType(relationshipType);
}
return ret;
}
} }
......
...@@ -21,18 +21,27 @@ import org.apache.atlas.kafka.AtlasKafkaMessage; ...@@ -21,18 +21,27 @@ import org.apache.atlas.kafka.AtlasKafkaMessage;
import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo; import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
import org.apache.atlas.model.instance.AtlasObjectId; import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.instance.AtlasRelatedObjectId;
import org.apache.atlas.model.notification.HookNotification; import org.apache.atlas.model.notification.HookNotification;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.type.AtlasTypeUtil;
import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.ListIterator;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import static org.apache.atlas.model.instance.AtlasObjectId.KEY_GUID;
public class PreprocessorContext { public class PreprocessorContext {
private static final Logger LOG = LoggerFactory.getLogger(PreprocessorContext.class); private static final Logger LOG = LoggerFactory.getLogger(PreprocessorContext.class);
...@@ -40,6 +49,7 @@ public class PreprocessorContext { ...@@ -40,6 +49,7 @@ public class PreprocessorContext {
public enum PreprocessAction { NONE, IGNORE, PRUNE } public enum PreprocessAction { NONE, IGNORE, PRUNE }
private final AtlasKafkaMessage<HookNotification> kafkaMessage; private final AtlasKafkaMessage<HookNotification> kafkaMessage;
private final AtlasTypeRegistry typeRegistry;
private final AtlasEntitiesWithExtInfo entitiesWithExtInfo; private final AtlasEntitiesWithExtInfo entitiesWithExtInfo;
private final List<Pattern> hiveTablesToIgnore; private final List<Pattern> hiveTablesToIgnore;
private final List<Pattern> hiveTablesToPrune; private final List<Pattern> hiveTablesToPrune;
...@@ -49,9 +59,14 @@ public class PreprocessorContext { ...@@ -49,9 +59,14 @@ public class PreprocessorContext {
private final Set<String> ignoredEntities = new HashSet<>(); private final Set<String> ignoredEntities = new HashSet<>();
private final Set<String> prunedEntities = new HashSet<>(); private final Set<String> prunedEntities = new HashSet<>();
private final Set<String> referredEntitiesToMove = new HashSet<>(); private final Set<String> referredEntitiesToMove = new HashSet<>();
private final Set<String> createdEntities = new HashSet<>();
private final Set<String> deletedEntities = new HashSet<>();
private final Map<String, String> guidAssignments = new HashMap<>();
private List<AtlasEntity> postUpdateEntities = null;
public PreprocessorContext(AtlasKafkaMessage<HookNotification> kafkaMessage, List<Pattern> hiveTablesToIgnore, List<Pattern> hiveTablesToPrune, Map<String, PreprocessAction> hiveTablesCache, boolean hiveTypesRemoveOwnedRefAttrs, boolean rdbmsTypesRemoveOwnedRefAttrs) { public PreprocessorContext(AtlasKafkaMessage<HookNotification> kafkaMessage, AtlasTypeRegistry typeRegistry, List<Pattern> hiveTablesToIgnore, List<Pattern> hiveTablesToPrune, Map<String, PreprocessAction> hiveTablesCache, boolean hiveTypesRemoveOwnedRefAttrs, boolean rdbmsTypesRemoveOwnedRefAttrs) {
this.kafkaMessage = kafkaMessage; this.kafkaMessage = kafkaMessage;
this.typeRegistry = typeRegistry;
this.hiveTablesToIgnore = hiveTablesToIgnore; this.hiveTablesToIgnore = hiveTablesToIgnore;
this.hiveTablesToPrune = hiveTablesToPrune; this.hiveTablesToPrune = hiveTablesToPrune;
this.hiveTablesCache = hiveTablesCache; this.hiveTablesCache = hiveTablesCache;
...@@ -119,6 +134,14 @@ public class PreprocessorContext { ...@@ -119,6 +134,14 @@ public class PreprocessorContext {
public Set<String> getReferredEntitiesToMove() { return referredEntitiesToMove; } public Set<String> getReferredEntitiesToMove() { return referredEntitiesToMove; }
public Set<String> getCreatedEntities() { return createdEntities; }
public Set<String> getDeletedEntities() { return deletedEntities; }
public Map<String, String> getGuidAssignments() { return guidAssignments; }
public List<AtlasEntity> getPostUpdateEntities() { return postUpdateEntities; }
public PreprocessAction getPreprocessActionForHiveTable(String qualifiedName) { public PreprocessAction getPreprocessActionForHiveTable(String qualifiedName) {
PreprocessAction ret = PreprocessAction.NONE; PreprocessAction ret = PreprocessAction.NONE;
...@@ -199,12 +222,48 @@ public class PreprocessorContext { ...@@ -199,12 +222,48 @@ public class PreprocessorContext {
collectGuids(obj, prunedEntities); collectGuids(obj, prunedEntities);
} }
public void removeRefAttributeAndRegisterToMove(AtlasEntity entity, String attrName) { public void removeRefAttributeAndRegisterToMove(AtlasEntity entity, String attrName, String relationshipType, String refAttrName) {
Set<String> guidsToMove = new HashSet<>(); Object attrVal = entity.removeAttribute(attrName);
if (attrVal != null) {
AtlasRelatedObjectId entityId = null;
Set<String> guids = new HashSet<>();
collectGuids(attrVal, guids);
// removed attrVal might have elements removed (e.g. removed column); to handle this case register the entity for partial update
addToPostUpdate(entity, attrName, attrVal);
for (String guid : guids) {
AtlasEntity refEntity = getEntity(guid);
if (refEntity != null) {
Object refAttr = null;
if (refEntity.hasRelationshipAttribute(refAttrName)) {
refAttr = refEntity.getRelationshipAttribute(refAttrName);
} else if (refEntity.hasAttribute(refAttrName)) {
refAttr = refEntity.getAttribute(refAttrName);
} else {
if (entityId == null) {
entityId = AtlasTypeUtil.toAtlasRelatedObjectId(entity, typeRegistry);
}
refAttr = entityId;
}
if (refAttr != null) {
refAttr = setRelationshipType(refAttr, relationshipType);
}
collectGuids(entity.removeAttribute(attrName), guidsToMove); if (refAttr != null) {
refEntity.setRelationshipAttribute(refAttrName, refAttr);
}
addToReferredEntitiesToMove(guidsToMove); addToReferredEntitiesToMove(guid);
}
}
}
} }
public void moveRegisteredReferredEntities() { public void moveRegisteredReferredEntities() {
...@@ -236,6 +295,32 @@ public class PreprocessorContext { ...@@ -236,6 +295,32 @@ public class PreprocessorContext {
} }
} }
public void prepareForPostUpdate() {
if (postUpdateEntities != null) {
ListIterator<AtlasEntity> iter = postUpdateEntities.listIterator();
while (iter.hasNext()) {
AtlasEntity entity = iter.next();
String assignedGuid = getAssignedGuid(entity.getGuid());
// no need to perform partial-update for entities that are created/deleted while processing this message
if (createdEntities.contains(assignedGuid) || deletedEntities.contains(assignedGuid)) {
iter.remove();
} else {
entity.setGuid(assignedGuid);
if (entity.getAttributes() != null) {
setAssignedGuids(entity.getAttributes().values());
}
if (entity.getRelationshipAttributes() != null) {
setAssignedGuids(entity.getRelationshipAttributes().values());
}
}
}
}
}
public String getTypeName(Object obj) { public String getTypeName(Object obj) {
Object ret = null; Object ret = null;
...@@ -258,7 +343,7 @@ public class PreprocessorContext { ...@@ -258,7 +343,7 @@ public class PreprocessorContext {
if (obj instanceof AtlasObjectId) { if (obj instanceof AtlasObjectId) {
ret = ((AtlasObjectId) obj).getGuid(); ret = ((AtlasObjectId) obj).getGuid();
} else if (obj instanceof Map) { } else if (obj instanceof Map) {
ret = ((Map) obj).get(AtlasObjectId.KEY_GUID); ret = ((Map) obj).get(KEY_GUID);
} else if (obj instanceof AtlasEntity) { } else if (obj instanceof AtlasEntity) {
ret = ((AtlasEntity) obj).getGuid(); ret = ((AtlasEntity) obj).getGuid();
} else if (obj instanceof AtlasEntity.AtlasEntityWithExtInfo) { } else if (obj instanceof AtlasEntity.AtlasEntityWithExtInfo) {
...@@ -274,7 +359,7 @@ public class PreprocessorContext { ...@@ -274,7 +359,7 @@ public class PreprocessorContext {
Collection objList = (Collection) obj; Collection objList = (Collection) obj;
for (Object objElem : objList) { for (Object objElem : objList) {
collectGuid(objElem, guids); collectGuids(objElem, guids);
} }
} else { } else {
collectGuid(obj, guids); collectGuid(obj, guids);
...@@ -304,4 +389,91 @@ public class PreprocessorContext { ...@@ -304,4 +389,91 @@ public class PreprocessorContext {
return ret; return ret;
} }
private AtlasRelatedObjectId setRelationshipType(Object attr, String relationshipType) {
AtlasRelatedObjectId ret = null;
if (attr instanceof AtlasRelatedObjectId) {
ret = (AtlasRelatedObjectId) attr;
} else if (attr instanceof AtlasObjectId) {
ret = new AtlasRelatedObjectId((AtlasObjectId) attr);
} else if (attr instanceof Map) {
ret = new AtlasRelatedObjectId((Map) attr);
}
if (ret != null) {
ret.setRelationshipType(relationshipType);
}
return ret;
}
private String getAssignedGuid(String guid) {
String ret = guidAssignments.get(guid);
return ret != null ? ret : guid;
}
private void setAssignedGuids(Object obj) {
if (obj != null) {
if (obj instanceof Collection) {
Collection objList = (Collection) obj;
for (Object objElem : objList) {
setAssignedGuids(objElem);
}
} else {
setAssignedGuid(obj);
}
}
}
private void setAssignedGuid(Object obj) {
if (obj instanceof AtlasRelatedObjectId) {
AtlasRelatedObjectId objId = (AtlasRelatedObjectId) obj;
objId.setGuid(getAssignedGuid(objId.getGuid()));
} else if (obj instanceof AtlasObjectId) {
AtlasObjectId objId = (AtlasObjectId) obj;
objId.setGuid(getAssignedGuid(objId.getGuid()));
} else if (obj instanceof Map) {
Map objId = (Map) obj;
Object guid = objId.get(KEY_GUID);
if (guid != null) {
objId.put(KEY_GUID, getAssignedGuid(guid.toString()));
}
}
}
private void addToPostUpdate(AtlasEntity entity, String attrName, Object attrVal) {
if (LOG.isDebugEnabled()) {
LOG.debug("addToPostUpdate(guid={}, entityType={}, attrName={}", entity.getGuid(), entity.getTypeName(), attrName);
}
AtlasEntity partialEntity = null;
if (postUpdateEntities == null) {
postUpdateEntities = new ArrayList<>();
}
for (AtlasEntity existing : postUpdateEntities) {
if (StringUtils.equals(entity.getGuid(), existing.getGuid())) {
partialEntity = existing;
break;
}
}
if (partialEntity == null) {
partialEntity = new AtlasEntity(entity.getTypeName(), attrName, attrVal);
partialEntity.setGuid(entity.getGuid());
postUpdateEntities.add(partialEntity);
} else {
partialEntity.setAttribute(attrName, attrVal);
}
}
} }
...@@ -32,6 +32,12 @@ import java.util.Set; ...@@ -32,6 +32,12 @@ import java.util.Set;
public class RdbmsPreprocessor { public class RdbmsPreprocessor {
private static final Logger LOG = LoggerFactory.getLogger(RdbmsPreprocessor.class); private static final Logger LOG = LoggerFactory.getLogger(RdbmsPreprocessor.class);
private static final String RELATIONSHIP_TYPE_RDBMS_INSTANCE_DATABASES = "rdbms_instance_databases";
private static final String RELATIONSHIP_TYPE_RDBMS_DB_TABLES = "rdbms_db_tables";
private static final String RELATIONSHIP_TYPE_RDBMS_TABLE_COLUMNS = "rdbms_table_columns";
private static final String RELATIONSHIP_TYPE_RDBMS_TABLE_INDEXES = "rdbms_table_indexes";
private static final String RELATIONSHIP_TYPE_RDBMS_TABLE_FOREIGN_KEYS = "rdbms_table_foreign_key";
static class RdbmsInstancePreprocessor extends RdbmsTypePreprocessor { static class RdbmsInstancePreprocessor extends RdbmsTypePreprocessor {
public RdbmsInstancePreprocessor() { public RdbmsInstancePreprocessor() {
super(TYPE_RDBMS_INSTANCE); super(TYPE_RDBMS_INSTANCE);
...@@ -121,17 +127,17 @@ public class RdbmsPreprocessor { ...@@ -121,17 +127,17 @@ public class RdbmsPreprocessor {
private void clearRefAttributesAndMove(AtlasEntity entity, PreprocessorContext context) { private void clearRefAttributesAndMove(AtlasEntity entity, PreprocessorContext context) {
switch (entity.getTypeName()) { switch (entity.getTypeName()) {
case TYPE_RDBMS_INSTANCE: case TYPE_RDBMS_INSTANCE:
context.removeRefAttributeAndRegisterToMove(entity, ATTRIBUTE_DATABASES); context.removeRefAttributeAndRegisterToMove(entity, ATTRIBUTE_DATABASES, RELATIONSHIP_TYPE_RDBMS_INSTANCE_DATABASES, ATTRIBUTE_INSTANCE);
break; break;
case TYPE_RDBMS_DB: case TYPE_RDBMS_DB:
context.removeRefAttributeAndRegisterToMove(entity, ATTRIBUTE_TABLES); context.removeRefAttributeAndRegisterToMove(entity, ATTRIBUTE_TABLES, RELATIONSHIP_TYPE_RDBMS_DB_TABLES, ATTRIBUTE_DB);
break; break;
case TYPE_RDBMS_TABLE: case TYPE_RDBMS_TABLE:
context.removeRefAttributeAndRegisterToMove(entity, ATTRIBUTE_COLUMNS); context.removeRefAttributeAndRegisterToMove(entity, ATTRIBUTE_COLUMNS, RELATIONSHIP_TYPE_RDBMS_TABLE_COLUMNS, ATTRIBUTE_TABLE);
context.removeRefAttributeAndRegisterToMove(entity, ATTRIBUTE_INDEXES); context.removeRefAttributeAndRegisterToMove(entity, ATTRIBUTE_INDEXES, RELATIONSHIP_TYPE_RDBMS_TABLE_INDEXES, ATTRIBUTE_TABLE);
context.removeRefAttributeAndRegisterToMove(entity, ATTRIBUTE_FOREIGN_KEYS); context.removeRefAttributeAndRegisterToMove(entity, ATTRIBUTE_FOREIGN_KEYS, RELATIONSHIP_TYPE_RDBMS_TABLE_FOREIGN_KEYS, ATTRIBUTE_TABLE);
break; break;
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment