Commit 8ae61e0f by Sarath Subramanian

ATLAS-2552: Add propagated classifications and blocked propagated…

ATLAS-2552: Add propagated classifications and blocked propagated classifications in GET releationship REST API
parent 71965e31
......@@ -65,6 +65,8 @@ public final class Constants {
public static final String RELATIONSHIPTYPE_END2_KEY = "endDef2";
public static final String RELATIONSHIPTYPE_CATEGORY_KEY = "relationshipCategory";
public static final String RELATIONSHIPTYPE_TAG_PROPAGATION_KEY = "tagPropagation";
public static final String RELATIONSHIPTYPE_BLOCKED_PROPAGATED_CLASSIFICATIONS_KEY = "blockedPropagatedClassifications";
/**
* Trait names property key and index name.
*/
......@@ -119,6 +121,7 @@ public final class Constants {
public static final String CLASSIFICATION_ENTITY_GUID = INTERNAL_PROPERTY_KEY_PREFIX + "entityGuid";
public static final String CLASSIFICATION_VALIDITY_PERIODS_KEY = INTERNAL_PROPERTY_KEY_PREFIX + "validityPeriods";
public static final String CLASSIFICATION_VERTEX_PROPAGATE_KEY = INTERNAL_PROPERTY_KEY_PREFIX + "propagate";
public static final String CLASSIFICATION_VERTEX_NAME_KEY = TYPE_NAME_PROPERTY_KEY;
public static final String CLASSIFICATION_EDGE_NAME_PROPERTY_KEY = INTERNAL_PROPERTY_KEY_PREFIX + "name";
public static final String CLASSIFICATION_EDGE_IS_PROPAGATED_PROPERTY_KEY = INTERNAL_PROPERTY_KEY_PREFIX + "isPropagated";
public static final String CLASSIFICATION_EDGE_STATE_PROPERTY_KEY = STATE_PROPERTY_KEY;
......
......@@ -128,9 +128,9 @@ public enum AtlasErrorCode {
NO_CLASSIFICATIONS_FOUND_FOR_ENTITY(400, "ATLAS-400-00-06E", "No classifications associated with entity: {0}"),
INVALID_CLASSIFICATION_PARAMS(400, "ATLAS-400-00-06F", "Invalid classification parameters passed for {0} operation for entity: {1}"),
PROPAGATED_CLASSIFICATION_NOT_ASSOCIATED_WITH_ENTITY(400, "ATLAS-400-00-070", "Propagated classification {0} is not associated with entity"),
// Glossary Related errors
INVALID_PARTIAL_UPDATE_ATTR_VAL(400, "ATLAS-400-00-071", "Invalid attrVal for partial update of {0}, expected = {1} found {2}"),
MISSING_MANDATORY_ANCHOR(400, "ATLAS-400-00-072", "Mandatory anchor attribute is missing"),
INVALID_BLOCKED_PROPAGATED_CLASSIFICATION(400, "ATLAS-400-00-073", "Invalid propagated classification: {0} with entityGuid: {1} added to blocked propagated classifications."),
UNAUTHORIZED_ACCESS(403, "ATLAS-403-00-001", "{0} is not authorized to perform {1}"),
......
......@@ -24,12 +24,15 @@ import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import org.apache.atlas.model.typedef.AtlasRelationshipDef;
import org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags;
import org.apache.commons.collections.CollectionUtils;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlRootElement;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
......@@ -61,6 +64,9 @@ public class AtlasRelationship extends AtlasStruct implements Serializable {
public static final String KEY_LABEL = "label";
public static final String KEY_PROPAGATE_TAGS = "propagateTags";
public static final String KEY_BLOCKED_PROPAGATED_CLASSIFICATIONS = "blockedPropagatedClassifications";
public static final String KEY_PROPAGATED_CLASSIFICATIONS = "propagatedClassifications";
private String guid = null;
private AtlasObjectId end1 = null;
private AtlasObjectId end2 = null;
......@@ -75,6 +81,9 @@ public class AtlasRelationship extends AtlasStruct implements Serializable {
public enum Status { ACTIVE, DELETED }
private List<AtlasClassification> propagatedClassifications;
private List<AtlasClassification> blockedPropagatedClassifications;
@JsonIgnore
private static AtomicLong s_nextId = new AtomicLong(System.nanoTime());
......@@ -97,13 +106,13 @@ public class AtlasRelationship extends AtlasStruct implements Serializable {
public AtlasRelationship(String typeName, AtlasObjectId end1, AtlasObjectId end2) {
super(typeName);
init(nextInternalId(), end1, end2, null, null, null, null, null, null, null, 0L);
init(nextInternalId(), end1, end2, null, null, null, null, null, null, null, 0L, null, null);
}
public AtlasRelationship(String typeName, AtlasObjectId end1, AtlasObjectId end2, Map<String, Object> attributes) {
super(typeName, attributes);
init(nextInternalId(), end1, end2, null, null, null, null, null, null, null, 0L);
init(nextInternalId(), end1, end2, null, null, null, null, null, null, null, 0L, null, null);
}
public AtlasRelationship(String typeName, String attrName, Object attrValue) {
......@@ -132,6 +141,9 @@ public class AtlasRelationship extends AtlasStruct implements Serializable {
Object updateTime = map.get(KEY_UPDATE_TIME);
Object version = map.get(KEY_VERSION);
Object propagatedClassifications = map.get(KEY_PROPAGATED_CLASSIFICATIONS);
Object blockedPropagatedClassifications = map.get(KEY_BLOCKED_PROPAGATED_CLASSIFICATIONS);
if (oGuid != null) {
setGuid(oGuid.toString());
}
......@@ -183,6 +195,30 @@ public class AtlasRelationship extends AtlasStruct implements Serializable {
if (version instanceof Number) {
setVersion(((Number) version).longValue());
}
if (CollectionUtils.isNotEmpty((List) propagatedClassifications)) {
this.propagatedClassifications = new ArrayList<>();
for (Object elem : (List) propagatedClassifications) {
if (elem instanceof AtlasClassification) {
this.propagatedClassifications.add((AtlasClassification) elem);
} else if (elem instanceof Map) {
this.propagatedClassifications.add(new AtlasClassification((Map) elem));
}
}
}
if (CollectionUtils.isNotEmpty((List) blockedPropagatedClassifications)) {
this.blockedPropagatedClassifications = new ArrayList<>();
for (Object elem : (List) blockedPropagatedClassifications) {
if (elem instanceof AtlasClassification) {
this.blockedPropagatedClassifications.add((AtlasClassification) elem);
} else if (elem instanceof Map) {
this.blockedPropagatedClassifications.add(new AtlasClassification((Map) elem));
}
}
}
}
}
......@@ -190,8 +226,8 @@ public class AtlasRelationship extends AtlasStruct implements Serializable {
super(other);
if (other != null) {
init(other.guid, other.end1, other.end2, other.label, other.propagateTags, other.status,
other.createdBy, other.updatedBy, other.createTime, other.updateTime, other.version);
init(other.guid, other.end1, other.end2, other.label, other.propagateTags, other.status, other.createdBy, other.updatedBy,
other.createTime, other.updateTime, other.version, other.propagatedClassifications, other.blockedPropagatedClassifications);
}
}
......@@ -271,12 +307,29 @@ public class AtlasRelationship extends AtlasStruct implements Serializable {
return "-" + Long.toString(s_nextId.getAndIncrement());
}
public List<AtlasClassification> getPropagatedClassifications() {
return propagatedClassifications;
}
public void setPropagatedClassifications(List<AtlasClassification> propagatedClassifications) {
this.propagatedClassifications = propagatedClassifications;
}
public List<AtlasClassification> getBlockedPropagatedClassifications() {
return blockedPropagatedClassifications;
}
public void setBlockedPropagatedClassifications(List<AtlasClassification> blockedPropagatedClassifications) {
this.blockedPropagatedClassifications = blockedPropagatedClassifications;
}
private void init() {
init(nextInternalId(), null, null, null, null, null, null, null, null, null, 0L);
init(nextInternalId(), null, null, null, null, null, null, null, null, null, 0L, null, null);
}
private void init(String guid, AtlasObjectId end1, AtlasObjectId end2, String label, PropagateTags propagateTags,
Status status, String createdBy, String updatedBy, Date createTime, Date updateTime, Long version) {
Status status, String createdBy, String updatedBy, Date createTime, Date updateTime, Long version,
List<AtlasClassification> propagatedClassifications, List<AtlasClassification> blockedPropagatedClassifications) {
setGuid(guid);
setEnd1(end1);
setEnd2(end2);
......@@ -288,6 +341,8 @@ public class AtlasRelationship extends AtlasStruct implements Serializable {
setCreateTime(createTime);
setUpdateTime(updateTime);
setVersion(version);
setPropagatedClassifications(propagatedClassifications);
setBlockedPropagatedClassifications(blockedPropagatedClassifications);
}
@Override
......@@ -309,6 +364,12 @@ public class AtlasRelationship extends AtlasStruct implements Serializable {
dumpDateField(", createTime=", createTime, sb);
dumpDateField(", updateTime=", updateTime, sb);
sb.append(", version=").append(version);
sb.append(", propagatedClassifications=[");
dumpObjects(propagatedClassifications, sb);
sb.append("]");
sb.append(", blockedPropagatedClassifications=[");
dumpObjects(blockedPropagatedClassifications, sb);
sb.append("]");
sb.append('}');
return sb;
......@@ -331,13 +392,15 @@ public class AtlasRelationship extends AtlasStruct implements Serializable {
Objects.equals(updatedBy, that.updatedBy) &&
Objects.equals(createTime, that.createTime) &&
Objects.equals(updateTime, that.updateTime) &&
Objects.equals(version, that.version);
Objects.equals(version, that.version) &&
Objects.equals(propagatedClassifications, that.propagatedClassifications) &&
Objects.equals(blockedPropagatedClassifications, that.blockedPropagatedClassifications);
}
@Override
public int hashCode() {
return Objects.hash(super.hashCode(), guid, end1, end2, label, propagateTags,
status, createdBy, updatedBy, createTime, updateTime, version);
return Objects.hash(super.hashCode(), guid, end1, end2, label, propagateTags, status, createdBy, updatedBy,
createTime, updateTime, version, propagatedClassifications, blockedPropagatedClassifications);
}
@Override
......
......@@ -1060,6 +1060,18 @@ public final class GraphHelper {
return ret;
}
public static List<String> getBlockedClassificationIds(AtlasEdge edge) {
List<String> ret = null;
if (edge != null) {
List<String> classificationIds = AtlasGraphUtilsV1.getProperty(edge, Constants.RELATIONSHIPTYPE_BLOCKED_PROPAGATED_CLASSIFICATIONS_KEY, List.class);
ret = CollectionUtils.isNotEmpty(classificationIds) ? classificationIds : Collections.emptyList();
}
return ret;
}
public static PropagateTags getPropagateTags(AtlasElement element) {
String propagateTags = element.getProperty(Constants.RELATIONSHIPTYPE_TAG_PROPAGATION_KEY, String.class);
......
......@@ -21,6 +21,7 @@ import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.annotation.GraphTransaction;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.TypeCategory;
import org.apache.atlas.model.instance.AtlasClassification;
import org.apache.atlas.model.instance.AtlasEntity.Status;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.instance.AtlasRelationship;
......@@ -62,7 +63,8 @@ import static org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags.
import static org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags.BOTH;
import static org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags.ONE_TO_TWO;
import static org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags.TWO_TO_ONE;
import static org.apache.atlas.repository.graph.GraphHelper.getGuid;
import static org.apache.atlas.repository.Constants.CLASSIFICATION_ENTITY_GUID;
import static org.apache.atlas.repository.Constants.CLASSIFICATION_VERTEX_NAME_KEY;
import static org.apache.atlas.repository.graph.GraphHelper.getOutGoingEdgesByLabel;
import static org.apache.atlas.repository.graph.GraphHelper.getPropagateTags;
import static org.apache.atlas.repository.store.graph.v1.AtlasGraphUtilsV1.getIdFromVertex;
......@@ -312,7 +314,8 @@ public class AtlasRelationshipStoreV1 implements AtlasRelationshipStore {
updateTagPropagations(relationshipEdge, relationship.getPropagateTags());
AtlasGraphUtilsV1.setProperty(relationshipEdge, Constants.RELATIONSHIPTYPE_TAG_PROPAGATION_KEY, relationship.getPropagateTags().name());
// update blocked propagated classifications
handleBlockedClassifications(relationshipEdge, relationship.getBlockedPropagatedClassifications());
if (MapUtils.isNotEmpty(relationType.getAllAttributes())) {
for (AtlasAttribute attr : relationType.getAllAttributes().values()) {
......@@ -328,6 +331,53 @@ public class AtlasRelationshipStoreV1 implements AtlasRelationshipStore {
return entityRetriever.mapEdgeToAtlasRelationship(relationshipEdge);
}
private void handleBlockedClassifications(AtlasEdge edge, List<AtlasClassification> blockedPropagatedClassifications) throws AtlasBaseException {
if (blockedPropagatedClassifications != null) {
List<AtlasVertex> propagatedClassificationVertices = blockedPropagatedClassifications.isEmpty() ? null : entityRetriever.getClassificationVertices(edge);
List<String> classificationIds = new ArrayList<>();
for (AtlasClassification classification : blockedPropagatedClassifications) {
String classificationId = validateBlockedPropagatedClassification(propagatedClassificationVertices, classification);
// ignore invalid blocked propagated classification
if (classificationId == null) {
continue;
}
classificationIds.add(classificationId);
}
addToBlockedClassificationIds(edge, classificationIds);
}
}
// propagated classifications should contain blocked propagated classification
private String validateBlockedPropagatedClassification(List<AtlasVertex> classificationVertices, AtlasClassification classification) throws AtlasBaseException {
String ret = null;
for (AtlasVertex vertex : classificationVertices) {
String classificationName = AtlasGraphUtilsV1.getProperty(vertex, CLASSIFICATION_VERTEX_NAME_KEY, String.class);
String entityGuid = AtlasGraphUtilsV1.getProperty(vertex, CLASSIFICATION_ENTITY_GUID, String.class);
if (classificationName.equals(classification.getTypeName()) && entityGuid.equals(classification.getEntityGuid())) {
ret = vertex.getIdForDisplay();
break;
}
}
return ret;
}
private void addToBlockedClassificationIds(AtlasEdge edge, List<String> classificationIds) {
if (edge != null) {
if (classificationIds.isEmpty()) {
edge.removeProperty(Constants.RELATIONSHIPTYPE_BLOCKED_PROPAGATED_CLASSIFICATIONS_KEY);
} else {
edge.setListProperty(Constants.RELATIONSHIPTYPE_BLOCKED_PROPAGATED_CLASSIFICATIONS_KEY, classificationIds);
}
}
}
private void updateTagPropagations(AtlasEdge relationshipEdge, PropagateTags tagPropagation) throws AtlasBaseException {
PropagateTags oldTagPropagation = getPropagateTags(relationshipEdge);
PropagateTags newTagPropagation = tagPropagation;
......@@ -365,6 +415,8 @@ public class AtlasRelationshipStoreV1 implements AtlasRelationshipStore {
entityRetriever.removeTagPropagation(relationshipEdge, ONE_TO_TWO);
}
}
AtlasGraphUtilsV1.setProperty(relationshipEdge, Constants.RELATIONSHIPTYPE_TAG_PROPAGATION_KEY, newTagPropagation.name());
}
}
......@@ -571,6 +623,9 @@ public class AtlasRelationshipStoreV1 implements AtlasRelationshipStore {
AtlasGraphUtilsV1.setProperty(ret, Constants.VERSION_PROPERTY_KEY, getRelationshipVersion(relationship));
AtlasGraphUtilsV1.setProperty(ret, Constants.RELATIONSHIPTYPE_TAG_PROPAGATION_KEY, tagPropagation.name());
// blocked propagated classifications
handleBlockedClassifications(ret, relationship.getBlockedPropagatedClassifications());
// propagate tags
entityRetriever.addTagPropagation(ret, tagPropagation);
}
......
......@@ -74,6 +74,7 @@ import java.util.stream.Collectors;
import static org.apache.atlas.model.instance.AtlasClassification.PropagationState.ACTIVE;
import static org.apache.atlas.model.instance.AtlasClassification.PropagationState.DELETED;
import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.*;
import static org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags.ONE_TO_TWO;
import static org.apache.atlas.repository.Constants.*;
import static org.apache.atlas.repository.graph.GraphHelper.EDGE_LABEL_PREFIX;
import static org.apache.atlas.repository.graph.GraphHelper.addToPropagatedTraitNames;
......@@ -81,6 +82,7 @@ import static org.apache.atlas.repository.graph.GraphHelper.getAdjacentEdgesByLa
import static org.apache.atlas.repository.graph.GraphHelper.getAllClassificationEdges;
import static org.apache.atlas.repository.graph.GraphHelper.getAllTraitNames;
import static org.apache.atlas.repository.graph.GraphHelper.getAssociatedEntityVertex;
import static org.apache.atlas.repository.graph.GraphHelper.getBlockedClassificationIds;
import static org.apache.atlas.repository.graph.GraphHelper.getClassificationEdge;
import static org.apache.atlas.repository.graph.GraphHelper.getClassificationEdgeState;
import static org.apache.atlas.repository.graph.GraphHelper.getGuid;
......@@ -459,10 +461,9 @@ public final class EntityGraphRetriever {
}
}
public List<AtlasClassification> getAllClassifications(AtlasVertex entityVertex) throws AtlasBaseException {
List<AtlasClassification> ret = new ArrayList<>();
Iterable edges = entityVertex.query().direction(AtlasEdgeDirection.OUT).label(CLASSIFICATION_LABEL)
.has(CLASSIFICATION_EDGE_STATE_PROPERTY_KEY, ACTIVE.name()).edges();
public List<AtlasVertex> getPropagationEnabledClassificationVertices(AtlasVertex entityVertex) {
List<AtlasVertex> ret = new ArrayList<>();
Iterable edges = entityVertex.query().direction(AtlasEdgeDirection.OUT).label(CLASSIFICATION_LABEL).edges();
if (edges != null) {
Iterator<AtlasEdge> iterator = edges.iterator();
......@@ -471,7 +472,11 @@ public final class EntityGraphRetriever {
AtlasEdge edge = iterator.next();
if (edge != null) {
ret.add(toAtlasClassification(edge.getInVertex()));
AtlasVertex classificationVertex = edge.getInVertex();
if (isPropagationEnabled(classificationVertex)) {
ret.add(classificationVertex);
}
}
}
}
......@@ -479,9 +484,9 @@ public final class EntityGraphRetriever {
return ret;
}
protected List<AtlasVertex> getPropagationEnabledClassificationVertices(AtlasVertex entityVertex) {
List<AtlasVertex> ret = new ArrayList<>();
Iterable edges = entityVertex.query().direction(AtlasEdgeDirection.OUT).label(CLASSIFICATION_LABEL).edges();
public List<AtlasClassification> getAllClassifications(AtlasVertex entityVertex) throws AtlasBaseException {
List<AtlasClassification> ret = new ArrayList<>();
Iterable edges = entityVertex.query().direction(AtlasEdgeDirection.OUT).label(CLASSIFICATION_LABEL).edges();
if (edges != null) {
Iterator<AtlasEdge> iterator = edges.iterator();
......@@ -490,11 +495,7 @@ public final class EntityGraphRetriever {
AtlasEdge edge = iterator.next();
if (edge != null) {
AtlasVertex classificationVertex = edge.getInVertex();
if (isPropagationEnabled(classificationVertex)) {
ret.add(classificationVertex);
}
ret.add(toAtlasClassification(edge.getInVertex()));
}
}
}
......@@ -897,7 +898,7 @@ public final class EntityGraphRetriever {
return ret;
}
private AtlasRelationship mapSystemAttributes(AtlasEdge edge, AtlasRelationship relationship) {
private AtlasRelationship mapSystemAttributes(AtlasEdge edge, AtlasRelationship relationship) throws AtlasBaseException {
if (LOG.isDebugEnabled()) {
LOG.debug("Mapping system attributes for relationship");
}
......@@ -929,9 +930,50 @@ public final class EntityGraphRetriever {
relationship.setLabel(edge.getLabel());
relationship.setPropagateTags(getPropagateTags(edge));
// set propagated and blocked propagated classifications
readClassificationsFromEdge(edge, relationship);
return relationship;
}
private void readClassificationsFromEdge(AtlasEdge edge, AtlasRelationship relationship) throws AtlasBaseException {
List<AtlasVertex> classificationVertices = getClassificationVertices(edge);
List<String> blockedClassificationIds = getBlockedClassificationIds(edge);
List<AtlasClassification> propagatedClassifications = new ArrayList<>();
List<AtlasClassification> blockedClassifications = new ArrayList<>();
for (AtlasVertex classificationVertex : classificationVertices) {
String classificationId = classificationVertex.getIdForDisplay();
if (blockedClassificationIds.contains(classificationId)) {
blockedClassifications.add(toAtlasClassification(classificationVertex));
} else {
propagatedClassifications.add(toAtlasClassification(classificationVertex));
}
}
relationship.setPropagatedClassifications(propagatedClassifications);
relationship.setBlockedPropagatedClassifications(blockedClassifications);
}
public List<AtlasVertex> getClassificationVertices(AtlasEdge edge) {
List<AtlasVertex> ret = new ArrayList<>();
if (edge != null) {
PropagateTags propagateTags = getPropagateTags(edge);
if (propagateTags == PropagateTags.ONE_TO_TWO || propagateTags == PropagateTags.BOTH) {
ret.addAll(getPropagationEnabledClassificationVertices(edge.getOutVertex()));
}
if (propagateTags == PropagateTags.TWO_TO_ONE || propagateTags == PropagateTags.BOTH) {
ret.addAll(getPropagationEnabledClassificationVertices(edge.getInVertex()));
}
}
return ret;
}
private void mapAttributes(AtlasEdge edge, AtlasRelationship relationship) throws AtlasBaseException {
AtlasType objType = typeRegistry.getType(relationship.getTypeName());
......@@ -957,7 +999,7 @@ public final class EntityGraphRetriever {
AtlasVertex outVertex = edge.getOutVertex();
AtlasVertex inVertex = edge.getInVertex();
if (propagateTags == PropagateTags.ONE_TO_TWO || propagateTags == PropagateTags.BOTH) {
if (propagateTags == ONE_TO_TWO || propagateTags == PropagateTags.BOTH) {
addTagPropagation(outVertex, inVertex, edge);
}
......@@ -974,7 +1016,7 @@ public final class EntityGraphRetriever {
AtlasVertex outVertex = edge.getOutVertex();
AtlasVertex inVertex = edge.getInVertex();
if (propagateTags == PropagateTags.ONE_TO_TWO || propagateTags == PropagateTags.BOTH) {
if (propagateTags == ONE_TO_TWO || propagateTags == PropagateTags.BOTH) {
removeTagPropagation(outVertex, inVertex, edge);
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment