Commit 9d4f9728 by Ashutosh Mestry

ATLAS-2882: AddClassification transform for new transforms

parent afa314cb
......@@ -17,16 +17,26 @@
*/
package org.apache.atlas.entitytransform;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.instance.AtlasClassification;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.typedef.AtlasClassificationDef;
import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.apache.commons.lang.StringUtils;
import org.apache.atlas.entitytransform.BaseEntityHandler.AtlasTransformableEntity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
public abstract class Action {
private static final Logger LOG = LoggerFactory.getLogger(Action.class);
private static final String ENTITY_KEY = "__entity";
private static final String ACTION_DELIMITER = ":";
private static final String ACTION_ADD_CLASSIFICATION = "ADDCLASSIFICATION";
private static final String ACTION_NAME_SET = "SET";
private static final String ACTION_NAME_REPLACE_PREFIX = "REPLACE_PREFIX";
private static final String ACTION_NAME_TO_LOWER = "TO_LOWER";
......@@ -65,6 +75,10 @@ public abstract class Action {
value = StringUtils.trim(value);
switch (actionName.toUpperCase()) {
case ACTION_ADD_CLASSIFICATION:
ret = new AddClassificationAction(actionValue);
break;
case ACTION_NAME_REPLACE_PREFIX:
ret = new PrefixReplaceAction(key, actionValue);
break;
......@@ -115,6 +129,60 @@ public abstract class Action {
}
}
public static class AddClassificationAction extends Action implements NeedsContext {
private final String classificationName;
private TransformerContext transformerContext;
public AddClassificationAction(String classificationName) {
super(ENTITY_KEY);
this.classificationName = classificationName;
}
@Override
public void apply(AtlasTransformableEntity transformableEntity) {
AtlasEntity entity = transformableEntity.entity;
if (entity.getClassifications() == null) {
entity.setClassifications(new ArrayList<AtlasClassification>());
}
for (AtlasClassification c : entity.getClassifications()) {
if (c.getTypeName().equals(classificationName)) {
return;
}
}
entity.getClassifications().add(new AtlasClassification(classificationName));
}
@Override
public void setContext(TransformerContext transformerContext) {
this.transformerContext = transformerContext;
getCreateTag(classificationName);
}
private void getCreateTag(String classificationName) {
if (transformerContext == null) {
return;
}
try {
AtlasClassificationDef classificationDef = transformerContext.getTypeRegistry().getClassificationDefByName(classificationName);
if (classificationDef != null) {
return;
}
classificationDef = new AtlasClassificationDef(classificationName);
AtlasTypesDef typesDef = new AtlasTypesDef();
typesDef.setClassificationDefs(Collections.singletonList(classificationDef));
transformerContext.getTypeDefStore().createTypesDef(typesDef);
LOG.info("created classification: {}", classificationName);
} catch (AtlasBaseException e) {
LOG.error("Error creating classification: {}", classificationName, e);
}
}
}
public static class PrefixReplaceAction extends Action {
private final String fromPrefix;
......
......@@ -19,14 +19,17 @@ package org.apache.atlas.entitytransform;
import org.apache.atlas.entitytransform.BaseEntityHandler.AtlasTransformableEntity;
import org.apache.atlas.model.impexp.AttributeTransform;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.type.AtlasType;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang.StringUtils;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
public class AtlasEntityTransformer {
private final List<Condition> conditions;
private final List<Action> actions;
......@@ -35,6 +38,10 @@ public class AtlasEntityTransformer {
this(attributeTransform.getConditions(), attributeTransform.getAction());
}
public AtlasEntityTransformer(AtlasObjectId objectId, Map<String, String> actions) {
this(Collections.singletonMap("__entity", AtlasType.toJson(objectId)), actions);
}
public AtlasEntityTransformer(Map<String, String> conditions, Map<String, String> actions) {
this.conditions = createConditions(conditions);
this.actions = createActions(actions);
......
......@@ -17,9 +17,14 @@
*/
package org.apache.atlas.entitytransform;
import org.apache.atlas.model.impexp.AtlasExportRequest;
import org.apache.atlas.model.impexp.AttributeTransform;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.store.AtlasTypeDefStore;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -33,6 +38,7 @@ public class BaseEntityHandler {
protected final List<AtlasEntityTransformer> transformers;
protected final boolean hasCustomAttributeTransformer;
private TransformerContext transformerContext;
public BaseEntityHandler(List<AtlasEntityTransformer> transformers) {
this(transformers, null);
......@@ -48,26 +54,45 @@ public class BaseEntityHandler {
}
public AtlasEntity transform(AtlasEntity entity) {
if (CollectionUtils.isNotEmpty(transformers)) {
AtlasTransformableEntity transformableEntity = getTransformableEntity(entity);
if (!CollectionUtils.isNotEmpty(transformers)) {
return entity;
}
if (transformableEntity != null) {
for (AtlasEntityTransformer transformer : transformers) {
transformer.transform(transformableEntity);
}
AtlasTransformableEntity transformableEntity = getTransformableEntity(entity);
if (transformableEntity == null) {
return entity;
}
transformableEntity.transformComplete();
}
for (AtlasEntityTransformer transformer : transformers) {
transformer.transform(transformableEntity);
}
transformableEntity.transformComplete();
return entity;
}
private void setContextForActions(List<Action> actions) {
for(Action action : actions) {
if (action instanceof NeedsContext) {
((NeedsContext) action).setContext(transformerContext);
}
}
}
private void setContextForConditions(List<Condition> conditions) {
for(Condition condition : conditions) {
if (condition instanceof NeedsContext) {
((NeedsContext) condition).setContext(transformerContext);
}
}
}
public AtlasTransformableEntity getTransformableEntity(AtlasEntity entity) {
return new AtlasTransformableEntity(entity);
}
public static List<BaseEntityHandler> createEntityHandlers(List<AttributeTransform> transforms) {
public static List<BaseEntityHandler> createEntityHandlers(List<AttributeTransform> transforms, TransformerContext context) {
if (LOG.isDebugEnabled()) {
LOG.debug("==> BaseEntityHandler.createEntityHandlers(transforms={})", transforms);
}
......@@ -92,10 +117,18 @@ public class BaseEntityHandler {
for (BaseEntityHandler handler : handlers) {
if (handler.hasCustomAttributeTransformer()) {
ret.add(handler);
handler.setContext(context);
}
}
if (CollectionUtils.isEmpty(ret)) {
BaseEntityHandler be = new BaseEntityHandler(transformers);
be.setContext(context);
ret.add(be);
}
if (CollectionUtils.isEmpty(ret)) {
ret.add(new BaseEntityHandler(transformers));
}
......@@ -119,7 +152,20 @@ public class BaseEntityHandler {
return false;
}
public void setContext(AtlasTypeRegistry typeRegistry, AtlasTypeDefStore typeDefStore, AtlasExportRequest request) {
setContext(new TransformerContext(typeRegistry, typeDefStore, request));
}
public void setContext(TransformerContext context) {
this.transformerContext = context;
for (AtlasEntityTransformer transformer : transformers) {
if (transformerContext != null) {
setContextForActions(transformer.getActions());
setContextForConditions(transformer.getConditions());
}
}
}
public static class AtlasTransformableEntity {
protected final AtlasEntity entity;
......@@ -170,4 +216,41 @@ public class BaseEntityHandler {
// implementations can override to set value of computed-attributes
}
}
public static List<BaseEntityHandler> fromJson(String transformersString, TransformerContext context) {
if (StringUtils.isEmpty(transformersString)) {
return null;
}
Object transformersObj = AtlasType.fromJson(transformersString, Object.class);
List transformers = (transformersObj != null && transformersObj instanceof List) ? (List) transformersObj : null;
List<AttributeTransform> attributeTransforms = new ArrayList<>();
if (CollectionUtils.isEmpty(transformers)) {
return null;
}
for (Object transformer : transformers) {
String transformerStr = AtlasType.toJson(transformer);
AttributeTransform attributeTransform = AtlasType.fromJson(transformerStr, AttributeTransform.class);
if (attributeTransform == null) {
continue;
}
attributeTransforms.add(attributeTransform);
}
if (CollectionUtils.isEmpty(attributeTransforms)) {
return null;
}
List<BaseEntityHandler> entityHandlers = createEntityHandlers(attributeTransforms, context);
if (CollectionUtils.isEmpty(entityHandlers)) {
return null;
}
return entityHandlers;
}
}
\ No newline at end of file
......@@ -18,15 +18,25 @@
package org.apache.atlas.entitytransform;
import org.apache.atlas.entitytransform.BaseEntityHandler.AtlasTransformableEntity;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
public abstract class Condition {
private static final Logger LOG = LoggerFactory.getLogger(Condition.class);
private static final String CONDITION_DELIMITER = ":";
private static final String CONDITION_ENTITY_OBJECT_ID = "OBJECTID";
private static final String CONDITION_ENTITY_TOP_LEVEL = "TOPLEVEL";
private static final String CONDITION_ENTITY_ALL = "ALL";
private static final String CONDITION_NAME_EQUALS = "EQUALS";
private static final String CONDITION_NAME_EQUALS_IGNORE_CASE = "EQUALS_IGNORE_CASE";
private static final String CONDITION_NAME_STARTS_WITH = "STARTS_WITH";
......@@ -60,6 +70,18 @@ public abstract class Condition {
value = StringUtils.trim(value);
switch (conditionName.toUpperCase()) {
case CONDITION_ENTITY_ALL:
ret = new ObjectIdEquals(key, CONDITION_ENTITY_ALL);
break;
case CONDITION_ENTITY_TOP_LEVEL:
ret = new ObjectIdEquals(key, CONDITION_ENTITY_TOP_LEVEL);
break;
case CONDITION_ENTITY_OBJECT_ID:
ret = new ObjectIdEquals(key, conditionValue);
break;
case CONDITION_NAME_EQUALS:
ret = new EqualsCondition(key, conditionValue);
break;
......@@ -164,6 +186,70 @@ public abstract class Condition {
}
}
static class ObjectIdEquals extends Condition implements NeedsContext {
private final List<AtlasObjectId> objectIds;
private String scope;
private TransformerContext transformerContext;
public ObjectIdEquals(String key, String conditionValue) {
super(key);
objectIds = new ArrayList<>();
this.scope = conditionValue;
}
@Override
public boolean matches(AtlasTransformableEntity entity) {
for (AtlasObjectId objectId : objectIds) {
return isMatch(objectId, entity.entity);
}
return objectIds.size() == 0;
}
public void add(AtlasObjectId objectId) {
this.objectIds.add(objectId);
}
private boolean isMatch(AtlasObjectId objectId, AtlasEntity entity) {
boolean ret = true;
if (!StringUtils.isEmpty(objectId.getGuid())) {
return Objects.equals(objectId.getGuid(), entity.getGuid());
}
ret = Objects.equals(objectId.getTypeName(), entity.getTypeName());
if (!ret) {
return ret;
}
for (Map.Entry<String, Object> entry : objectId.getUniqueAttributes().entrySet()) {
ret = ret && Objects.equals(entity.getAttribute(entry.getKey()), entry.getValue());
if (!ret) {
break;
}
}
return ret;
}
@Override
public void setContext(TransformerContext transformerContext) {
this.transformerContext = transformerContext;
if(StringUtils.isEmpty(scope) || scope.equals(CONDITION_ENTITY_ALL)) {
return;
}
addObjectIdsFromExportRequest();
}
private void addObjectIdsFromExportRequest() {
for(AtlasObjectId objectId : this.transformerContext.getExportRequest().getItemsToExport()) {
add(objectId);
}
}
}
public static class HasValueCondition extends Condition {
protected final String attributeValue;
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.entitytransform;
public interface NeedsContext {
void setContext(TransformerContext transformerContext);
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.entitytransform;
import org.apache.atlas.model.impexp.AtlasExportRequest;
import org.apache.atlas.store.AtlasTypeDefStore;
import org.apache.atlas.type.AtlasTypeRegistry;
public class TransformerContext {
private final AtlasTypeRegistry typeRegistry;
private final AtlasTypeDefStore typeDefStore;
private final AtlasExportRequest exportRequest;
public TransformerContext(AtlasTypeRegistry typeRegistry, AtlasTypeDefStore typeDefStore, AtlasExportRequest exportRequest) {
this.typeRegistry = typeRegistry;
this.typeDefStore = typeDefStore;
this.exportRequest = exportRequest;
}
public AtlasTypeRegistry getTypeRegistry() {
return this.typeRegistry;
}
public AtlasTypeDefStore getTypeDefStore() {
return typeDefStore;
}
public AtlasExportRequest getExportRequest() {
return exportRequest;
}
}
......@@ -20,10 +20,10 @@ package org.apache.atlas.repository.impexp;
import com.google.common.annotations.VisibleForTesting;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.entitytransform.BaseEntityHandler;
import org.apache.atlas.entitytransform.TransformerContext;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.impexp.AtlasImportRequest;
import org.apache.atlas.model.impexp.AtlasImportResult;
import org.apache.atlas.model.impexp.AttributeTransform;
import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.apache.atlas.repository.store.graph.BulkImporter;
import org.apache.atlas.store.AtlasTypeDefStore;
......@@ -42,7 +42,6 @@ import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import static org.apache.atlas.model.impexp.AtlasImportRequest.TRANSFORMERS_KEY;
......@@ -131,36 +130,19 @@ public class ImportService {
}
private void setEntityTransformerHandlers(ZipSource source, String transformersString) {
if (StringUtils.isEmpty(transformersString)) {
@VisibleForTesting
void setEntityTransformerHandlers(ZipSource source, String transformersJson) throws AtlasBaseException {
if (StringUtils.isEmpty(transformersJson)) {
return;
}
Object transformersObj = AtlasType.fromJson(transformersString, Object.class);
List transformers = (transformersObj != null && transformersObj instanceof List) ? (List) transformersObj : null;
List<AttributeTransform> attributeTransforms = new ArrayList<>();
if (CollectionUtils.isNotEmpty(transformers)) {
for (Object transformer : transformers) {
String transformerStr = AtlasType.toJson(transformer);
AttributeTransform attributeTransform = AtlasType.fromJson(transformerStr, AttributeTransform.class);
if (attributeTransform == null) {
continue;
}
attributeTransforms.add(attributeTransform);
}
TransformerContext context = new TransformerContext(typeRegistry, typeDefStore, source.getExportResult().getRequest());
List<BaseEntityHandler> entityHandlers = BaseEntityHandler.fromJson(transformersJson, context);
if (CollectionUtils.isEmpty(entityHandlers)) {
return;
}
if (CollectionUtils.isNotEmpty(attributeTransforms)) {
List<BaseEntityHandler> entityHandlers = BaseEntityHandler.createEntityHandlers(attributeTransforms);
if (CollectionUtils.isNotEmpty(entityHandlers)) {
source.setEntityHandlers(entityHandlers);
}
}
source.setEntityHandlers(entityHandlers);
}
private void debugLog(String s, Object... params) {
......
......@@ -138,10 +138,12 @@ public class ZipSource implements EntityImportStream {
String s = getFromCache(guid);
AtlasEntityWithExtInfo entityWithExtInfo = convertFromJson(AtlasEntityWithExtInfo.class, s);
if (importTransform != null) {
entityWithExtInfo = importTransform.apply(entityWithExtInfo);
}
if (entityHandlers != null) {
applyTransformers(entityWithExtInfo);
} else if (importTransform != null) {
entityWithExtInfo = importTransform.apply(entityWithExtInfo);
}
return entityWithExtInfo;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment