Commit b08b7b49 by Ashutosh Mestry

ATLAS-2870: Improvement to AddClassification transform to use filters.

parent 8903c9a6
......@@ -112,7 +112,7 @@ public class ImportService {
return;
}
importTransformsShaper.shape(importTransform);
importTransformsShaper.shape(importTransform, source.getExportResult().getRequest());
source.setImportTransform(importTransform);
if(LOG.isDebugEnabled()) {
......
......@@ -21,10 +21,14 @@ import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.instance.AtlasClassification;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.commons.lang.StringUtils;
import scala.Tuple3;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
public abstract class ImportTransformer {
......@@ -71,8 +75,8 @@ public abstract class ImportTransformer {
} else if (key.equals(TRANSFORMER_SET_DELETED)) {
ret = new SetDeleted();
} else if (key.equals(TRANSFORMER_NAME_ADD_CLASSIFICATION)) {
String name = (params == null || params.length < 1) ? "" : StringUtils.join(params, ":", 1, params.length);
ret = new AddClassification(name);
String name = (params == null || params.length < 1) ? "" : params[1];
ret = new AddClassification(name, (params != null && params.length == 3) ? params[2] : "");
} else {
throw new AtlasBaseException(AtlasErrorCode.INVALID_VALUE, "Error creating ImportTransformer. Unknown transformer: {}.", transformerSpec);
}
......@@ -151,12 +155,22 @@ public abstract class ImportTransformer {
}
static class AddClassification extends ImportTransformer {
private static final String FILTER_SCOPE_TOP_LEVEL = "topLevel";
private final String scope;
private final String classificationName;
private List<AtlasObjectId> filters;
public AddClassification(String name) {
public AddClassification(String name, String scope) {
super(TRANSFORMER_NAME_REMOVE_CLASSIFICATION);
this.classificationName = name;
this.scope = scope;
filters = new ArrayList<>();
}
public void addFilter(AtlasObjectId objectId) {
filters.add(objectId);
}
@Override
......@@ -166,6 +180,10 @@ public abstract class ImportTransformer {
}
AtlasEntity entity = (AtlasEntity) o;
if(!passThruFilters(entity)) {
return o;
}
if(entity.getClassifications() == null) {
entity.setClassifications(new ArrayList<AtlasClassification>());
}
......@@ -180,6 +198,40 @@ public abstract class ImportTransformer {
return entity;
}
private boolean passThruFilters(AtlasEntity entity) {
if(StringUtils.isEmpty(scope) || !scope.equals(FILTER_SCOPE_TOP_LEVEL)) {
return true;
}
for (AtlasObjectId filter : filters) {
if(isMatch(filter, entity)) {
return true;
}
}
return false;
}
private boolean isMatch(AtlasObjectId objectId, AtlasEntity entity) {
boolean ret = true;
if (StringUtils.isEmpty(objectId.getGuid())) {
ret = Objects.equals(objectId.getTypeName(), entity.getTypeName());
if (ret) {
for (Map.Entry<String, Object> entry : objectId.getUniqueAttributes().entrySet()) {
ret = ret && Objects.equals(entity.getAttribute(entry.getKey()), entry.getValue());
if (!ret) {
break;
}
}
}
return ret;
} else {
return Objects.equals(objectId.getGuid(), entity.getGuid());
}
}
@Override
public String toString() {
return String.format("%s=%s", "AddClassification", classificationName);
......
......@@ -19,6 +19,8 @@
package org.apache.atlas.repository.impexp;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.impexp.AtlasExportRequest;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.typedef.AtlasClassificationDef;
import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.apache.atlas.store.AtlasTypeDefStore;
......@@ -32,6 +34,7 @@ import javax.inject.Inject;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
@Component
public class ImportTransformsShaper {
......@@ -46,12 +49,12 @@ public class ImportTransformsShaper {
this.typeDefStore = typeDefStore;
}
public void shape(ImportTransforms importTransform) throws AtlasBaseException {
getCreateClassifications(importTransform);
public void shape(ImportTransforms importTransform, AtlasExportRequest request) throws AtlasBaseException {
getCreateClassifications(importTransform, request);
updateTransformsWithSubTypes(importTransform);
}
private void getCreateClassifications(ImportTransforms importTransform) throws AtlasBaseException {
private void getCreateClassifications(ImportTransforms importTransform, AtlasExportRequest request) throws AtlasBaseException {
Map<String, Map<String, List<ImportTransformer>>> mapMapList = importTransform.getTransforms();
for (Map<String, List<ImportTransformer>> mapList : mapMapList.values()) {
for (List<ImportTransformer> list : mapList.values()) {
......@@ -59,6 +62,7 @@ public class ImportTransformsShaper {
if((importTransformer instanceof ImportTransformer.AddClassification)) {
ImportTransformer.AddClassification addClassification = (ImportTransformer.AddClassification) importTransformer;
addFilters(request, addClassification);
getCreateTag(addClassification.getClassificationName());
}
}
......@@ -66,6 +70,12 @@ public class ImportTransformsShaper {
}
}
private void addFilters(AtlasExportRequest request, ImportTransformer.AddClassification transformer) {
for(AtlasObjectId objectId : request.getItemsToExport()) {
transformer.addFilter(objectId);
}
}
private void updateTransformsWithSubTypes(ImportTransforms importTransforms) {
String[] transformTypes = importTransforms.getTypes().toArray(new String[importTransforms.getTypes().size()]);
for (int i = 0; i < transformTypes.length; i++) {
......
......@@ -21,6 +21,7 @@ import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.instance.AtlasClassification;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;
......@@ -37,6 +38,8 @@ import static org.testng.Assert.assertTrue;
public class ImportTransformsTest {
private final String ATTR_NAME_QUALIFIED_NAME = "qualifiedName";
private final String COLUMN_QUALIFIED_NAME_FORMAT = "col%s.TABLE1.default@cl1";
private final String lowerCaseCL1 = "@cl1";
private final String lowerCaseCL2 = "@cl2";
private final String jsonLowerCaseReplace = "{ \"hive_table\": { \"qualifiedName\":[ \"lowercase\", \"replace:@cl1:@cl2\" ] } }";
......@@ -48,6 +51,7 @@ public class ImportTransformsTest {
private final String jsonSetDeleted = "{ \"hive_table\": { \"*\":[ \"setDeleted\" ] } }";
private final String jsonAddClasification = "{ \"hive_table\": { \"*\":[ \"addClassification:REPLICATED\" ] } }";
private final String jsonAddClasification2 = "{ \"hive_table\": { \"*\":[ \"addClassification:REPLICATED_2\" ] } }";
private final String jsonAddClasificationScoped = "{ \"hive_column\": { \"*\":[ \"addClassification:REPLICATED_2:topLevel\" ] } }";
private ImportTransforms transform;
private String HIVE_TABLE_ATTR_SYNC_INFO = "hive_table.syncInfo";
......@@ -210,6 +214,29 @@ public class ImportTransformsTest {
addClassification_MultipleClassificationsAreAdded(entity);
}
@Test
public void addScopedClassification() throws AtlasBaseException {
AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = getAtlasEntityWithExtInfo();
AtlasEntity entity = entityWithExtInfo.getReferredEntities().get("2");
int existingClassificationsCount = entityWithExtInfo.getEntity().getClassifications() != null ? entity.getClassifications().size() : 0;
ImportTransforms t = ImportTransforms.fromJson(jsonAddClasificationScoped);
assertTrue(t.getTransforms().size() > 0);
ImportTransformer.AddClassification classification = (ImportTransformer.AddClassification) t.getTransforms().get("hive_column").get("*").get(0);
AtlasObjectId objectId = new AtlasObjectId("hive_column", ATTR_NAME_QUALIFIED_NAME, String.format(COLUMN_QUALIFIED_NAME_FORMAT, 2));
classification.addFilter(objectId);
t.apply(entityWithExtInfo);
assertNotNull(t);
assertNull(entityWithExtInfo.getEntity().getClassifications());
assertNull(entityWithExtInfo.getReferredEntities().get("0").getClassifications());
assertEquals(entityWithExtInfo.getReferredEntities().get("1").getClassifications().size(), existingClassificationsCount + 1);
assertNull(entityWithExtInfo.getReferredEntities().get("2").getClassifications());
}
private void addClassification_ExistingClassificationsAreHandled(AtlasEntity entity) throws AtlasBaseException {
int existingClassificationsCount = entity.getClassifications() != null ? entity.getClassifications().size() : 0;
assertTrue(existingClassificationsCount > 0);
......@@ -270,7 +297,7 @@ public class ImportTransformsTest {
AtlasEntity entity = new AtlasEntity("hive_column");
Map<String, Object> attributes = new HashMap<>();
attributes.put(ATTR_NAME_QUALIFIED_NAME, String.format("col%s.TABLE1.default@cl1", index));
attributes.put(ATTR_NAME_QUALIFIED_NAME, String.format(COLUMN_QUALIFIED_NAME_FORMAT, index));
attributes.put("name", "col" + index);
entity.setAttributes(attributes);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment