Commit bc093ca8 by Sarath Subramanian

ATLAS-2822: Provide option whether to delete propagated classification on entity…

ATLAS-2822: Provide option whether to delete propagated classification on entity delete during add classification
parent f0229909
......@@ -137,6 +137,7 @@ public final class Constants {
public static final String CLASSIFICATION_ENTITY_STATUS = INTERNAL_PROPERTY_KEY_PREFIX + "entityStatus";
public static final String CLASSIFICATION_VALIDITY_PERIODS_KEY = INTERNAL_PROPERTY_KEY_PREFIX + "validityPeriods";
public static final String CLASSIFICATION_VERTEX_PROPAGATE_KEY = INTERNAL_PROPERTY_KEY_PREFIX + "propagate";
public static final String CLASSIFICATION_VERTEX_REMOVE_PROPAGATIONS_KEY = INTERNAL_PROPERTY_KEY_PREFIX + "removePropagations";
public static final String CLASSIFICATION_VERTEX_NAME_KEY = TYPE_NAME_PROPERTY_KEY;
public static final String CLASSIFICATION_EDGE_NAME_PROPERTY_KEY = INTERNAL_PROPERTY_KEY_PREFIX + "name";
public static final String CLASSIFICATION_EDGE_IS_PROPAGATED_PROPERTY_KEY = INTERNAL_PROPERTY_KEY_PREFIX + "isPropagated";
......
......@@ -58,6 +58,7 @@ public class AtlasClassification extends AtlasStruct implements Serializable {
private Status entityStatus = Status.ACTIVE;
private Boolean propagate = null;
private List<TimeBoundary> validityPeriods = null;
private Boolean removePropagationsOnEntityDelete = null;
public AtlasClassification() {
this(null, null);
......@@ -87,6 +88,7 @@ public class AtlasClassification extends AtlasStruct implements Serializable {
setEntityStatus(other.getEntityStatus());
setPropagate(other.isPropagate());
setValidityPeriods(other.getValidityPeriods());
setRemovePropagationsOnEntityDelete(other.getRemovePropagationsOnEntityDelete());
}
}
......@@ -122,6 +124,14 @@ public class AtlasClassification extends AtlasStruct implements Serializable {
this.entityStatus = entityStatus;
}
public Boolean getRemovePropagationsOnEntityDelete() {
return removePropagationsOnEntityDelete;
}
public void setRemovePropagationsOnEntityDelete(Boolean removePropagationsOnEntityDelete) {
this.removePropagationsOnEntityDelete = removePropagationsOnEntityDelete;
}
@JsonIgnore
public void addValityPeriod(TimeBoundary validityPeriod) {
List<TimeBoundary> vpList = this.validityPeriods;
......@@ -142,6 +152,7 @@ public class AtlasClassification extends AtlasStruct implements Serializable {
if (!super.equals(o)) { return false; }
AtlasClassification that = (AtlasClassification) o;
return Objects.equals(propagate, that.propagate) &&
Objects.equals(removePropagationsOnEntityDelete, that.removePropagationsOnEntityDelete) &&
Objects.equals(entityGuid, that.entityGuid) &&
entityStatus == that.entityStatus &&
Objects.equals(validityPeriods, that.validityPeriods);
......@@ -149,7 +160,7 @@ public class AtlasClassification extends AtlasStruct implements Serializable {
@Override
public int hashCode() {
return Objects.hash(super.hashCode(), entityGuid, entityStatus, propagate);
return Objects.hash(super.hashCode(), entityGuid, entityStatus, propagate, removePropagationsOnEntityDelete);
}
@Override
......@@ -159,6 +170,8 @@ public class AtlasClassification extends AtlasStruct implements Serializable {
sb.append("entityGuid='").append(entityGuid).append('\'');
sb.append(", entityStatus=").append(entityStatus);
sb.append(", propagate=").append(propagate);
sb.append(", removePropagationsOnEntityDelete=").append(removePropagationsOnEntityDelete);
sb.append(", validityPeriods=").append(validityPeriods);
sb.append(", validityPeriods=").append(validityPeriods);
sb.append('}');
return sb.toString();
......
......@@ -85,6 +85,7 @@ import static org.apache.atlas.repository.Constants.CLASSIFICATION_LABEL;
import static org.apache.atlas.repository.Constants.CLASSIFICATION_EDGE_NAME_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.CLASSIFICATION_VERTEX_NAME_KEY;
import static org.apache.atlas.repository.Constants.CLASSIFICATION_VERTEX_PROPAGATE_KEY;
import static org.apache.atlas.repository.Constants.CLASSIFICATION_VERTEX_REMOVE_PROPAGATIONS_KEY;
import static org.apache.atlas.repository.Constants.PROPAGATED_TRAIT_NAMES_PROPERTY_KEY;
import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.isReference;
import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.BOTH;
......@@ -105,14 +106,17 @@ public final class GraphHelper {
public static final String RETRY_COUNT = "atlas.graph.storage.num.retries";
public static final String RETRY_DELAY = "atlas.graph.storage.retry.sleeptime.ms";
public static final String DEFAULT_REMOVE_PROPAGATIONS_ON_ENTITY_DELETE = "atlas.graph.remove.propagations.default";
private final AtlasGremlinQueryProvider queryProvider = AtlasGremlinQueryProvider.INSTANCE;
private static volatile GraphHelper INSTANCE;
private AtlasGraph graph;
private static int maxRetries;
public static long retrySleepTimeMillis;
private static long retrySleepTimeMillis;
private static boolean removePropagations;
@VisibleForTesting
GraphHelper(AtlasGraph graph) {
......@@ -120,6 +124,7 @@ public final class GraphHelper {
try {
maxRetries = ApplicationProperties.get().getInt(RETRY_COUNT, 3);
retrySleepTimeMillis = ApplicationProperties.get().getLong(RETRY_DELAY, 1000);
removePropagations = ApplicationProperties.get().getBoolean(DEFAULT_REMOVE_PROPAGATIONS_ON_ENTITY_DELETE, true);
} catch (AtlasException e) {
LOG.error("Could not load configuration. Setting to default value for " + RETRY_COUNT, e);
}
......@@ -384,6 +389,18 @@ public final class GraphHelper {
return ret;
}
public static boolean getRemovePropagations(AtlasVertex classificationVertex) {
boolean ret = false;
if (classificationVertex != null) {
Boolean enabled = AtlasGraphUtilsV2.getProperty(classificationVertex, CLASSIFICATION_VERTEX_REMOVE_PROPAGATIONS_KEY, Boolean.class);
ret = (enabled == null) ? true : enabled;
}
return ret;
}
public static AtlasVertex getClassificationVertex(AtlasVertex entityVertex, String classificationName) {
AtlasVertex ret = null;
Iterable edges = entityVertex.query().direction(AtlasEdgeDirection.OUT).label(CLASSIFICATION_LABEL)
......@@ -1939,4 +1956,8 @@ public final class GraphHelper {
private static boolean verticesEquals(AtlasVertex vertexA, AtlasVertex vertexB) {
return StringUtils.equals(getGuid(vertexB), getGuid(vertexA));
}
public static boolean getDefaultRemovePropagations() {
return removePropagations;
}
}
\ No newline at end of file
......@@ -952,6 +952,14 @@ public abstract class DeleteHandlerV1 {
List<AtlasEdge> classificationEdges = getAllClassificationEdges(instanceVertex);
for (AtlasEdge edge : classificationEdges) {
AtlasVertex classificationVertex = edge.getInVertex();
boolean isClassificationEdge = isClassificationEdge(edge);
boolean removePropagations = getRemovePropagations(classificationVertex);
if (isClassificationEdge && removePropagations) {
removeTagPropagation(classificationVertex);
}
deleteEdgeReference(edge, CLASSIFICATION, false, false, instanceVertex);
}
}
......
......@@ -77,12 +77,14 @@ import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.UP
import static org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef.Cardinality.SET;
import static org.apache.atlas.repository.Constants.ATTRIBUTE_KEY_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.CLASSIFICATION_LABEL;
import static org.apache.atlas.repository.Constants.CLASSIFICATION_VERTEX_REMOVE_PROPAGATIONS_KEY;
import static org.apache.atlas.repository.Constants.STATE_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.TRAIT_NAMES_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.ATTRIBUTE_INDEX_PROPERTY_KEY;
import static org.apache.atlas.repository.graph.GraphHelper.getCollectionElementsUsingRelationship;
import static org.apache.atlas.repository.graph.GraphHelper.getClassificationEdge;
import static org.apache.atlas.repository.graph.GraphHelper.getClassificationVertex;
import static org.apache.atlas.repository.graph.GraphHelper.getDefaultRemovePropagations;
import static org.apache.atlas.repository.graph.GraphHelper.getMapElementsProperty;
import static org.apache.atlas.repository.graph.GraphHelper.getPropagatedTraitNames;
import static org.apache.atlas.repository.graph.GraphHelper.getStatus;
......@@ -1294,6 +1296,7 @@ public class EntityGraphMapper {
AtlasClassification classification = new AtlasClassification(c);
String classificationName = classification.getTypeName();
Boolean propagateTags = classification.isPropagate();
Boolean removePropagations = classification.getRemovePropagationsOnEntityDelete();
if (propagateTags != null && propagateTags &&
classification.getEntityGuid() != null &&
......@@ -1310,6 +1313,12 @@ public class EntityGraphMapper {
}
}
if (removePropagations == null) {
removePropagations = getDefaultRemovePropagations();
classification.setRemovePropagationsOnEntityDelete(removePropagations);
}
// set associated entity id to classification
if (classification.getEntityGuid() == null) {
classification.setEntityGuid(guid);
......@@ -1630,6 +1639,14 @@ public class EntityGraphMapper {
}
}
// handle update of 'removePropagationsOnEntityDelete' flag
Boolean currentRemovePropagations = currentClassification.getRemovePropagationsOnEntityDelete();
Boolean updatedRemovePropagations = classification.getRemovePropagationsOnEntityDelete();
if (updatedRemovePropagations != null && (updatedRemovePropagations != currentRemovePropagations)) {
AtlasGraphUtilsV2.setProperty(classificationVertex, CLASSIFICATION_VERTEX_REMOVE_PROPAGATIONS_KEY, updatedRemovePropagations);
}
updatedClassifications.add(currentClassification);
}
......@@ -1676,6 +1693,11 @@ public class EntityGraphMapper {
AtlasGraphUtilsV2.setProperty(traitInstanceVertex, Constants.CLASSIFICATION_VERTEX_PROPAGATE_KEY, classification.isPropagate());
}
if (classification.getRemovePropagationsOnEntityDelete() != null) {
AtlasGraphUtilsV2.setProperty(traitInstanceVertex, CLASSIFICATION_VERTEX_REMOVE_PROPAGATIONS_KEY,
classification.getRemovePropagationsOnEntityDelete());
}
// map all the attributes to this newly created AtlasVertex
mapAttributes(classification, traitInstanceVertex, operation, context);
......
......@@ -109,6 +109,7 @@ import static org.apache.atlas.repository.graph.GraphHelper.getReferenceMap;
import static org.apache.atlas.repository.graph.GraphHelper.getOutGoingEdgesByLabel;
import static org.apache.atlas.repository.graph.GraphHelper.getPropagateTags;
import static org.apache.atlas.repository.graph.GraphHelper.getRelationshipGuid;
import static org.apache.atlas.repository.graph.GraphHelper.getRemovePropagations;
import static org.apache.atlas.repository.graph.GraphHelper.getTypeName;
import static org.apache.atlas.repository.graph.GraphHelper.isPropagationEnabled;
import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.getIdFromVertex;
......@@ -279,6 +280,7 @@ public final class EntityGraphRetriever {
ret.setEntityGuid(AtlasGraphUtilsV2.getProperty(classificationVertex, CLASSIFICATION_ENTITY_GUID, String.class));
ret.setEntityStatus(getClassificationEntityStatus(classificationVertex));
ret.setPropagate(isPropagationEnabled(classificationVertex));
ret.setRemovePropagationsOnEntityDelete(getRemovePropagations(classificationVertex));
String strValidityPeriods = AtlasGraphUtilsV2.getProperty(classificationVertex, CLASSIFICATION_VALIDITY_PERIODS_KEY, String.class);
......
......@@ -82,10 +82,13 @@ public class ClassificationPropagationTest {
public static final String EMPLOYEES1_PROCESS = "EMPLOYEES1_PROCESS";
public static final String EMPLOYEES2_PROCESS = "EMPLOYEES2_PROCESS";
public static final String EMPLOYEES_UNION_PROCESS = "EMPLOYEES_UNION_PROCESS";
public static final String IMPORT_FILE = "tag-propagation-data-1.zip";
public static final String EMPLOYEES_TABLE = "EMPLOYEES_TABLE";
public static final String US_EMPLOYEES_TABLE = "US_EMPLOYEES2_TABLE";
public static final String EMPLOYEES_PROCESS = "EMPLOYEES_PROCESS";
public static final String ORDERS_TABLE = "ORDERS_TABLE";
public static final String US_ORDERS_TABLE = "US_ORDERS_TABLE";
public static final String ORDERS_PROCESS = "ORDERS_PROCESS";
public static final String IMPORT_FILE = "tag-propagation-data.zip";
@Inject
private AtlasTypeDefStore typeDefStore;
......@@ -144,6 +147,12 @@ public class ClassificationPropagationTest {
[Employees] ----> [Process] ----> [ US_Employees ]
Lineage - 3
-----------
[Orders] ----> [Process] ----> [ US_Orders ]
*/
@Test
......@@ -459,12 +468,39 @@ public class ClassificationPropagationTest {
}
@Test(dependsOnMethods = {"removeBlockedPropagatedClassifications"})
public void addClassification_DeleteCase() throws AtlasBaseException {
public void addClassification_removePropagationsTrue_DeleteCase() throws AtlasBaseException {
AtlasEntity orders = getEntity(ORDERS_TABLE);
AtlasClassification tag2 = new AtlasClassification("tag2");
tag2.setEntityGuid(orders.getGuid());
tag2.setPropagate(true);
tag2.setRemovePropagationsOnEntityDelete(true);
addClassification(orders, tag2);
List<String> propagatedEntities = Arrays.asList(EMPLOYEES_PROCESS, US_EMPLOYEES_TABLE);
assertClassificationExistInEntities(propagatedEntities, tag2);
AtlasEntity orders_process = getEntity(ORDERS_PROCESS);
AtlasEntity us_orders = getEntity(US_ORDERS_TABLE);
deletePropagatedClassificationExpectFail(orders_process, tag2);
deletePropagatedClassificationExpectFail(us_orders, tag2);
deleteEntity(ORDERS_TABLE);
assertClassificationNotExistInEntity(ORDERS_PROCESS, tag2);
assertClassificationNotExistInEntity(US_ORDERS_TABLE, tag2);
}
@Test(dependsOnMethods = {"addClassification_removePropagationsTrue_DeleteCase"})
public void addClassification_removePropagationsFalse_DeleteCase() throws AtlasBaseException {
AtlasEntity employees = getEntity(EMPLOYEES_TABLE);
AtlasClassification tag1 = new AtlasClassification("tag1");
tag1.setEntityGuid(employees.getGuid());
tag1.setPropagate(true);
tag1.setRemovePropagationsOnEntityDelete(false);
addClassification(employees, tag1);
......@@ -604,6 +640,10 @@ public class ClassificationPropagationTest {
entitiesMap.put(US_EMPLOYEES_TABLE, "44acef8e-fefe-491c-87d9-e2ea6a9ad3b0");
entitiesMap.put(EMPLOYEES_PROCESS, "a1c9a281-d30b-419c-8199-7434b245d7fe");
entitiesMap.put(ORDERS_TABLE, "ab995a8d-1f87-4908-91e4-d4e8e376ba22");
entitiesMap.put(US_ORDERS_TABLE, "70268a81-f145-4a37-ae39-b09daa85a928");
entitiesMap.put(ORDERS_PROCESS, "da016ad9-456a-4c99-895a-fa00f2de49ba");
lineageInfo = lineageService.getAtlasLineageInfo(entitiesMap.get(HDFS_PATH_EMPLOYEES), LineageDirection.BOTH, 3);
}
......
......@@ -465,6 +465,7 @@ public class EntityV2JerseyResourceIT extends BaseResourceIT {
classification.setEntityGuid(tableGuid);
classification.addValityPeriod(validityPeriod);
classification.setPropagate(true);
classification.setRemovePropagationsOnEntityDelete(true);
atlasClientV2.addClassifications(tableGuid, Collections.singletonList(classification));
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment