Commit 0f689faa by Sarath Subramanian

ATLAS-2510: Add support to disable/enable propagated classification in entity

parent 99318030
......@@ -121,6 +121,7 @@ public final class Constants {
public static final String CLASSIFICATION_VERTEX_PROPAGATE_KEY = INTERNAL_PROPERTY_KEY_PREFIX + "propagate";
public static final String CLASSIFICATION_EDGE_NAME_PROPERTY_KEY = INTERNAL_PROPERTY_KEY_PREFIX + "name";
public static final String CLASSIFICATION_EDGE_IS_PROPAGATED_PROPERTY_KEY = INTERNAL_PROPERTY_KEY_PREFIX + "isPropagated";
public static final String CLASSIFICATION_EDGE_STATE_PROPERTY_KEY = STATE_PROPERTY_KEY;
public static final String CLASSIFICATION_LABEL = "classifiedAs";
private Constants() {
......
......@@ -127,6 +127,7 @@ public enum AtlasErrorCode {
CLASSIFICATION_NOT_ASSOCIATED_WITH_ENTITY(400, "ATLAS-400-00-06D", "Classification {0} is not associated with entity"),
NO_CLASSIFICATIONS_FOUND_FOR_ENTITY(400, "ATLAS-400-00-06E", "No classifications associated with entity: {0}"),
INVALID_CLASSIFICATION_PARAMS(400, "ATLAS-400-00-06F", "Invalid classification parameters passed for {0} operation for entity: {1}"),
PROPAGATED_CLASSIFICATION_NOT_ASSOCIATED_WITH_ENTITY(400, "ATLAS-400-00-070", "Propagated classification {0} is not associated with entity"),
UNAUTHORIZED_ACCESS(403, "ATLAS-403-00-001", "{0} is not authorized to perform {1}"),
......
......@@ -57,6 +57,7 @@ public class AtlasClassification extends AtlasStruct implements Serializable {
private boolean propagate = true;
private List<TimeBoundary> validityPeriods = null;
public enum PropagationState { ACTIVE, DELETED }
public AtlasClassification() {
this(null, null);
......
......@@ -67,7 +67,6 @@ public class AtlasEntity extends AtlasStruct implements Serializable {
public static final String KEY_UPDATE_TIME = "updateTime";
public static final String KEY_VERSION = "version";
/**
* Status of the entity - can be active or deleted. Deleted entities are not removed from Atlas store.
*/
......@@ -83,6 +82,7 @@ public class AtlasEntity extends AtlasStruct implements Serializable {
private Map<String, Object> relationshipAttributes;
private List<AtlasClassification> classifications;
private List<AtlasClassification> propagationDisabledClassifications;
@JsonIgnore
private static AtomicLong s_nextId = new AtomicLong(System.nanoTime());
......@@ -165,6 +165,7 @@ public class AtlasEntity extends AtlasStruct implements Serializable {
setUpdateTime(other.getUpdateTime());
setVersion(other.getVersion());
setClassifications(other.getClassifications());
setPropagationDisabledClassifications(other.getPropagationDisabledClassifications());
}
}
......@@ -259,6 +260,14 @@ public class AtlasEntity extends AtlasStruct implements Serializable {
public void setClassifications(List<AtlasClassification> classifications) { this.classifications = classifications; }
public List<AtlasClassification> getPropagationDisabledClassifications() {
return propagationDisabledClassifications;
}
public void setPropagationDisabledClassifications(List<AtlasClassification> propagationDisabledClassifications) {
this.propagationDisabledClassifications = propagationDisabledClassifications;
}
public void addClassifications(List<AtlasClassification> classifications) {
List<AtlasClassification> c = this.classifications;
......@@ -279,6 +288,7 @@ public class AtlasEntity extends AtlasStruct implements Serializable {
setCreateTime(null);
setUpdateTime(null);
setClassifications(null);
setPropagationDisabledClassifications(null);
}
private static String nextInternalId() {
......@@ -306,7 +316,9 @@ public class AtlasEntity extends AtlasStruct implements Serializable {
sb.append(", classifications=[");
AtlasBaseTypeDef.dumpObjects(classifications, sb);
sb.append(']');
sb.append(", ");
sb.append(", propagationDisabledClassifications=[");
AtlasBaseTypeDef.dumpObjects(propagationDisabledClassifications, sb);
sb.append(']');
sb.append('}');
return sb;
......@@ -327,13 +339,14 @@ public class AtlasEntity extends AtlasStruct implements Serializable {
Objects.equals(updateTime, that.updateTime) &&
Objects.equals(version, that.version) &&
Objects.equals(relationshipAttributes, that.relationshipAttributes) &&
Objects.equals(classifications, that.classifications);
Objects.equals(classifications, that.classifications) &&
Objects.equals(propagationDisabledClassifications, that.propagationDisabledClassifications);
}
@Override
public int hashCode() {
return Objects.hash(super.hashCode(), guid, status, createdBy, updatedBy, createTime, updateTime, version,
relationshipAttributes, classifications);
relationshipAttributes, classifications, propagationDisabledClassifications);
}
@Override
......@@ -715,4 +728,4 @@ public class AtlasEntity extends AtlasStruct implements Serializable {
super(list, startIndex, pageSize, totalCount, sortType, sortBy);
}
}
}
}
\ No newline at end of file
......@@ -46,7 +46,7 @@ enum GremlinClause {
TEXT_CONTAINS("has('%s', org.janusgraph.core.attribute.Text.textRegex(%s))"),
TEXT_PREFIX("has('%s', org.janusgraph.core.attribute.Text.textPrefix(%s))"),
TEXT_SUFFIX("has('%s', org.janusgraph.core.attribute.Text.textRegex(\".*\" + %s))"),
TRAIT("outE('classifiedAs').has('__name', within('%s')).outV()"),
TRAIT("outE('classifiedAs').has('__name', within('%s')).has('__state', 'ACTIVE').outV()"),
SELECT_NOOP_FN("def f(r){ r }; "),
SELECT_FN("def f(r){ t=[[%s]]; %s r.each({t.add([%s])}); t.unique(); }; "),
SELECT_ONLY_AGG_FN("def f(r){ t=[[%s]]; %s t.add([%s]); t;}; "),
......
......@@ -157,4 +157,6 @@ public interface AtlasEntityStore {
List<AtlasClassification> getClassifications(String guid) throws AtlasBaseException;
AtlasClassification getClassification(String guid, String classificationName) throws AtlasBaseException;
void setPropagatedClassificationState(String guid, String classificationName, String sourceEntityGuid, boolean disablePropagation) throws AtlasBaseException;
}
......@@ -468,6 +468,23 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
@Override
@GraphTransaction
public void setPropagatedClassificationState(String entityGuid, String classificationName, String sourceEntityGuid, boolean disablePropagation) throws AtlasBaseException {
AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeaderWithClassifications(entityGuid);
AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_UPDATE_CLASSIFICATION, entityHeader, new AtlasClassification(classificationName)),
"change propagated classification state: guid=", entityGuid, ", classification=", classificationName);
if (LOG.isDebugEnabled()) {
LOG.debug("Toggle propagated classification={}, sourceEntityGuid={} for entity={}, disablePropagation={}", classificationName, sourceEntityGuid, entityGuid, disablePropagation);
}
GraphTransactionInterceptor.lockObjectAndReleasePostCommit(entityGuid);
entityGraphMapper.setPropagatedClassificationState(entityGuid, classificationName, sourceEntityGuid, disablePropagation);
}
@Override
@GraphTransaction
public void deleteClassifications(final String guid, final List<String> classificationNames) throws AtlasBaseException {
if (StringUtils.isEmpty(guid)) {
throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "Guid(s) not specified");
......
......@@ -41,18 +41,19 @@ import org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdg
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
import static org.apache.atlas.model.instance.AtlasClassification.PropagationState.ACTIVE;
import static org.apache.atlas.model.instance.AtlasEntity.Status.DELETED;
import static org.apache.atlas.repository.Constants.CLASSIFICATION_LABEL;
import static org.apache.atlas.repository.Constants.PROPAGATED_TRAIT_NAMES_PROPERTY_KEY;
import static org.apache.atlas.repository.graph.GraphHelper.EDGE_LABEL_PREFIX;
import static org.apache.atlas.repository.graph.GraphHelper.addListProperty;
import static org.apache.atlas.repository.graph.GraphHelper.getClassificationEdge;
import static org.apache.atlas.repository.graph.GraphHelper.addToPropagatedTraitNames;
import static org.apache.atlas.repository.graph.GraphHelper.getClassificationEdgeState;
import static org.apache.atlas.repository.graph.GraphHelper.getClassificationEdges;
import static org.apache.atlas.repository.graph.GraphHelper.getPropagatedEdges;
import static org.apache.atlas.repository.graph.GraphHelper.getTraitNames;
......@@ -367,7 +368,9 @@ public abstract class DeleteHandlerV1 {
getTypeName(propagatedEntityVertex), GraphHelper.getGuid(propagatedEntityVertex), CLASSIFICATION_LABEL);
}
removeFromPropagatedTraitNames(propagatedEntityVertex, classificationName);
if (getClassificationEdgeState(propagatedEdge) == ACTIVE) {
removeFromPropagatedTraitNames(propagatedEntityVertex, classificationName);
}
deleteEdge(propagatedEdge, true);
......@@ -390,7 +393,7 @@ public abstract class DeleteHandlerV1 {
entityVertex.removeProperty(PROPAGATED_TRAIT_NAMES_PROPERTY_KEY);
for (String propagatedTraitName : propagatedTraitNames) {
addListProperty(entityVertex, PROPAGATED_TRAIT_NAMES_PROPERTY_KEY, propagatedTraitName);
addToPropagatedTraitNames(entityVertex, propagatedTraitName);
}
}
}
......
......@@ -25,6 +25,7 @@ import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.TimeBoundary;
import org.apache.atlas.model.TypeCategory;
import org.apache.atlas.model.instance.AtlasClassification;
import org.apache.atlas.model.instance.AtlasClassification.PropagationState;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.instance.AtlasEntityHeader;
......@@ -74,20 +75,23 @@ import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.DE
import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.PARTIAL_UPDATE;
import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.UPDATE;
import static org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef.Cardinality.SET;
import static org.apache.atlas.repository.Constants.CLASSIFICATION_EDGE_STATE_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.CLASSIFICATION_ENTITY_GUID;
import static org.apache.atlas.repository.Constants.CLASSIFICATION_LABEL;
import static org.apache.atlas.repository.Constants.PROPAGATED_TRAIT_NAMES_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.STATE_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.TRAIT_NAMES_PROPERTY_KEY;
import static org.apache.atlas.repository.graph.GraphHelper.addListProperty;
import static org.apache.atlas.repository.graph.GraphHelper.addToPropagatedTraitNames;
import static org.apache.atlas.repository.graph.GraphHelper.getClassificationEdge;
import static org.apache.atlas.repository.graph.GraphHelper.getClassificationEdgeState;
import static org.apache.atlas.repository.graph.GraphHelper.getClassificationVertex;
import static org.apache.atlas.repository.graph.GraphHelper.getPropagatedEntities;
import static org.apache.atlas.repository.graph.GraphHelper.getPropagatedClassificationEdge;
import static org.apache.atlas.repository.graph.GraphHelper.getTraitLabel;
import static org.apache.atlas.repository.graph.GraphHelper.getTraitNames;
import static org.apache.atlas.repository.graph.GraphHelper.getTypeName;
import static org.apache.atlas.repository.graph.GraphHelper.getTypeNames;
import static org.apache.atlas.repository.graph.GraphHelper.isPropagationEnabled;
import static org.apache.atlas.repository.graph.GraphHelper.isRelationshipEdge;
import static org.apache.atlas.repository.graph.GraphHelper.removeFromPropagatedTraitNames;
import static org.apache.atlas.repository.graph.GraphHelper.string;
import static org.apache.atlas.repository.graph.GraphHelper.updateModificationMetadata;
import static org.apache.atlas.repository.store.graph.v1.AtlasGraphUtilsV1.getIdFromVertex;
......@@ -1443,10 +1447,10 @@ public class EntityGraphMapper {
// remove classifications from associated entity
if (LOG.isDebugEnabled()) {
LOG.debug("Removing classification: [{}] from: [{}][{}] with edge label: [{}]", classificationName,
getTypeName(entityVertex), entityGuid, CLASSIFICATION_LABEL);
getTypeName(entityVertex), entityGuid, CLASSIFICATION_LABEL);
}
AtlasEdge edge = getClassificationEdge(entityVertex, classificationName);
AtlasEdge edge = getClassificationEdge(entityVertex, classificationVertex);
deleteHandler.deleteEdgeReference(edge, CLASSIFICATION, false, true, entityVertex);
......@@ -1626,6 +1630,46 @@ public class EntityGraphMapper {
}
}
public void setPropagatedClassificationState(String entityGuid, String classificationName, String sourceEntityGuid, boolean disablePropagation) throws AtlasBaseException {
AtlasVertex entityVertex = AtlasGraphUtilsV1.findByGuid(entityGuid);
if (entityVertex == null) {
throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, entityGuid);
}
AtlasEdge propagatedEdge = getPropagatedClassificationEdge(entityVertex, classificationName, sourceEntityGuid);
if (propagatedEdge == null) {
throw new AtlasBaseException(AtlasErrorCode.PROPAGATED_CLASSIFICATION_NOT_ASSOCIATED_WITH_ENTITY, classificationName);
}
PropagationState currentState = getClassificationEdgeState(propagatedEdge);
PropagationState updatedState = (disablePropagation) ? PropagationState.DELETED : PropagationState.ACTIVE;
if (currentState != updatedState) {
AtlasGraphUtilsV1.setProperty(propagatedEdge, CLASSIFICATION_EDGE_STATE_PROPERTY_KEY, updatedState);
if (disablePropagation) {
removeFromPropagatedTraitNames(entityVertex, classificationName);
} else {
addToPropagatedTraitNames(entityVertex, classificationName);
}
updateModificationMetadata(entityVertex);
AtlasEntityWithExtInfo entityWithExtInfo = instanceConverter.getAndCacheEntity(entityGuid);
AtlasEntity entity = (entityWithExtInfo != null) ? entityWithExtInfo.getEntity() : null;
if (updatedState == PropagationState.DELETED) {
entityChangeNotifier.onClassificationDeletedFromEntity(entity, Collections.singletonList(classificationName));
} else {
AtlasClassification classification = entityRetriever.toAtlasClassification(propagatedEdge.getInVertex());
entityChangeNotifier.onClassificationAddedToEntity(entity, Collections.singletonList(classification));
}
}
}
private List<AtlasVertex> addTagPropagation(AtlasVertex classificationVertex, List<AtlasVertex> propagatedEntityVertices) {
List<AtlasVertex> ret = null;
......@@ -1634,6 +1678,12 @@ public class EntityGraphMapper {
AtlasClassificationType classificationType = typeRegistry.getClassificationTypeByName(classificationName);
for (AtlasVertex propagatedEntityVertex : propagatedEntityVertices) {
AtlasEdge existingEdge = getPropagatedClassificationEdge(propagatedEntityVertex, classificationVertex);
if (existingEdge != null) {
continue;
}
String entityTypeName = getTypeName(propagatedEntityVertex);
AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entityTypeName);
......@@ -1651,7 +1701,7 @@ public class EntityGraphMapper {
graphHelper.addClassificationEdge(propagatedEntityVertex, classificationVertex, true);
addListProperty(propagatedEntityVertex, PROPAGATED_TRAIT_NAMES_PROPERTY_KEY, classificationName);
addToPropagatedTraitNames(propagatedEntityVertex, classificationName);
}
}
}
......@@ -1679,7 +1729,7 @@ public class EntityGraphMapper {
// map all the attributes to this newly created AtlasVertex
mapAttributes(classification, traitInstanceVertex, operation, context);
AtlasEdge ret = getClassificationEdge(parentInstanceVertex, getTypeName(traitInstanceVertex));
AtlasEdge ret = getClassificationEdge(parentInstanceVertex, traitInstanceVertex);
if (ret == null) {
ret = graphHelper.addClassificationEdge(parentInstanceVertex, traitInstanceVertex, false);
......
......@@ -36,13 +36,13 @@ import static org.testng.Assert.fail;
public class GremlinQueryComposerTest {
@Test
public void classification() {
String expected = "g.V().outE('classifiedAs').has('__name', within('PII')).outV().dedup().limit(25).toList()";
String expected = "g.V().outE('classifiedAs').has('__name', within('PII')).has('__state', 'ACTIVE').outV().dedup().limit(25).toList()";
verify("PII", expected);
}
@Test()
public void dimension() {
String expected = "g.V().has('__typeName', 'Table').outE('classifiedAs').has('__name', within('Dimension')).outV().dedup().limit(25).toList()";
String expected = "g.V().has('__typeName', 'Table').outE('classifiedAs').has('__name', within('Dimension')).has('__state', 'ACTIVE').outV().dedup().limit(25).toList()";
verify("Table isa Dimension", expected);
verify("Table is Dimension", expected);
verify("Table where Table is Dimension", expected);
......@@ -295,14 +295,14 @@ public class GremlinQueryComposerTest {
@Test
public void keywordsInWhereClause() {
verify("Table as t where t has name and t isa Dimension",
"g.V().has('__typeName', 'Table').as('t').and(__.has('Table.name'),__.outE('classifiedAs').has('__name', within('Dimension')).outV()).dedup().limit(25).toList()");
"g.V().has('__typeName', 'Table').as('t').and(__.has('Table.name'),__.outE('classifiedAs').has('__name', within('Dimension')).has('__state', 'ACTIVE').outV()).dedup().limit(25).toList()");
verify("Table as t where t has name and t.name = 'sales_fact'",
"g.V().has('__typeName', 'Table').as('t').and(__.has('Table.name'),__.has('Table.name', eq('sales_fact'))).dedup().limit(25).toList()");
verify("Table as t where t is Dimension and t.name = 'sales_fact'",
"g.V().has('__typeName', 'Table').as('t').and(__.outE('classifiedAs').has('__name', within('Dimension')).outV(),__.has('Table.name', eq('sales_fact'))).dedup().limit(25).toList()");
verify("Table isa 'Dimension' and name = 'sales_fact'", "g.V().has('__typeName', 'Table').and(__.outE('classifiedAs').has('__name', within('Dimension')).outV(),__.has('Table.name', eq('sales_fact'))).dedup().limit(25).toList()");
"g.V().has('__typeName', 'Table').as('t').and(__.outE('classifiedAs').has('__name', within('Dimension')).has('__state', 'ACTIVE').outV(),__.has('Table.name', eq('sales_fact'))).dedup().limit(25).toList()");
verify("Table isa 'Dimension' and name = 'sales_fact'", "g.V().has('__typeName', 'Table').and(__.outE('classifiedAs').has('__name', within('Dimension')).has('__state', 'ACTIVE').outV(),__.has('Table.name', eq('sales_fact'))).dedup().limit(25).toList()");
verify("Table has name and name = 'sales_fact'", "g.V().has('__typeName', 'Table').and(__.has('Table.name'),__.has('Table.name', eq('sales_fact'))).dedup().limit(25).toList()");
verify("Table is 'Dimension' and Table has owner and name = 'sales_fact'", "g.V().has('__typeName', 'Table').and(__.outE('classifiedAs').has('__name', within('Dimension')).outV(),__.has('Table.owner'),__.has('Table.name', eq('sales_fact'))).dedup().limit(25).toList()");
verify("Table is 'Dimension' and Table has owner and name = 'sales_fact'", "g.V().has('__typeName', 'Table').and(__.outE('classifiedAs').has('__name', within('Dimension')).has('__state', 'ACTIVE').outV(),__.has('Table.owner'),__.has('Table.name', eq('sales_fact'))).dedup().limit(25).toList()");
verify("Table has name and Table has owner and name = 'sales_fact'", "g.V().has('__typeName', 'Table').and(__.has('Table.name'),__.has('Table.owner'),__.has('Table.name', eq('sales_fact'))).dedup().limit(25).toList()");
}
......
......@@ -475,6 +475,52 @@ public class EntityREST {
}
}
/**
* Disable/Enable propagated classification for an existing entity represented by its guid.
* @param guid globally unique identifier for the entity
* @param classificationName name of the propagated classification
* @param sourceEntityGuid source entity guid of the propagated classification
* @param disablePropagation disable/enable propagation
*/
@PUT
@Path("/guid/{guid}/propagatedClassifications/{classificationName}")
@Produces(Servlets.JSON_MEDIA_TYPE)
public void setPropagatedClassificationState(@PathParam("guid") String guid,
@PathParam("classificationName") final String classificationName,
@QueryParam("sourceEntityGuid") String sourceEntityGuid,
@QueryParam("disablePropagation") boolean disablePropagation) throws AtlasBaseException {
Servlets.validateQueryParamLength("guid", guid);
Servlets.validateQueryParamLength("classificationName", classificationName);
Servlets.validateQueryParamLength("sourceEntityGuid", sourceEntityGuid);
AtlasPerfTracer perf = null;
try {
if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "EntityREST.setPropagatedClassificationState(" + guid + "," + classificationName + "," + sourceEntityGuid + "," + disablePropagation + ")");
}
if (StringUtils.isEmpty(guid)) {
throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, guid);
}
if (StringUtils.isEmpty(classificationName)) {
throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "propagated classification not specified");
}
if (StringUtils.isEmpty(sourceEntityGuid)) {
throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, sourceEntityGuid);
}
ensureClassificationType(classificationName);
entitiesStore.setPropagatedClassificationState(guid, classificationName, sourceEntityGuid, disablePropagation);
} finally {
AtlasPerfTracer.log(perf);
}
}
/******************************************************************/
/** Bulk API operations **/
/******************************************************************/
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment