Commit 8746b306 by Sarath Subramanian Committed by Ashutosh Mestry

ATLAS-2873: Atlas Import Transform Handler Implementation

parent dc6be8e2
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.entitytransform;
import org.apache.commons.lang.StringUtils;
import org.apache.atlas.entitytransform.BaseEntityHandler.AtlasTransformableEntity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public abstract class Action {
private static final Logger LOG = LoggerFactory.getLogger(Action.class);
private static final String ACTION_DELIMITER = ":";
private static final String ACTION_NAME_SET = "SET";
private static final String ACTION_NAME_REPLACE_PREFIX = "REPLACE_PREFIX";
private static final String ACTION_NAME_TO_LOWER = "TO_LOWER";
private static final String ACTION_NAME_TO_UPPER = "TO_UPPER";
protected final String attributeName;
protected Action(String attributeName) {
this.attributeName = attributeName;
}
public String getAttributeName() { return attributeName; }
public boolean isValid() {
return StringUtils.isNotEmpty(attributeName);
}
public abstract void apply(AtlasTransformableEntity entity);
public static Action createAction(String key, String value) {
if (LOG.isDebugEnabled()) {
LOG.debug("==> Action.createAction(key={}, value={})", key, value);
}
final Action ret;
int idxActionDelim = value == null ? -1 : value.indexOf(ACTION_DELIMITER);
String actionName = idxActionDelim == -1 ? ACTION_NAME_SET : value.substring(0, idxActionDelim);
String actionValue = idxActionDelim == -1 ? value : value.substring(idxActionDelim + ACTION_DELIMITER.length());
actionName = StringUtils.trim(actionName);
actionValue = StringUtils.trim(actionValue);
value = StringUtils.trim(value);
switch (actionName.toUpperCase()) {
case ACTION_NAME_REPLACE_PREFIX:
ret = new PrefixReplaceAction(key, actionValue);
break;
case ACTION_NAME_TO_LOWER:
ret = new ToLowerCaseAction(key);
break;
case ACTION_NAME_TO_UPPER:
ret = new ToUpperCaseAction(key);
break;
case ACTION_NAME_SET:
ret = new SetAction(key, actionValue);
break;
default:
ret = new SetAction(key, value); // treat unspecified/unknown action as 'SET'
break;
}
if (LOG.isDebugEnabled()) {
LOG.debug("<== Action.createAction(key={}, value={}): actionName={}, actionValue={}, ret={}", key, value, actionName, actionValue, ret);
}
return ret;
}
public static class SetAction extends Action {
private final String attributeValue;
public SetAction(String attributeName, String attributeValue) {
super(attributeName);
this.attributeValue = attributeValue;
}
@Override
public void apply(AtlasTransformableEntity entity) {
if (isValid()) {
entity.setAttribute(attributeName, attributeValue);
}
}
}
public static class PrefixReplaceAction extends Action {
private final String fromPrefix;
private final String toPrefix;
public PrefixReplaceAction(String attributeName, String actionValue) {
super(attributeName);
// actionValue => =:prefixToReplace=replacedValue
if (actionValue != null) {
int idxSepDelimiter = actionValue.indexOf(ACTION_DELIMITER);
if (idxSepDelimiter == -1) { // no separator specified i.e. no value specified to replace; remove the prefix
fromPrefix = actionValue;
toPrefix = "";
} else {
String prefixSep = StringUtils.trim(actionValue.substring(0, idxSepDelimiter));
int idxPrefixSep = actionValue.indexOf(prefixSep, idxSepDelimiter + ACTION_DELIMITER.length());
if (idxPrefixSep == -1) { // separator not found i.e. no value specified to replace; remove the prefix
fromPrefix = actionValue.substring(idxSepDelimiter + ACTION_DELIMITER.length());
toPrefix = "";
} else {
fromPrefix = actionValue.substring(idxSepDelimiter + ACTION_DELIMITER.length(), idxPrefixSep);
toPrefix = actionValue.substring(idxPrefixSep + prefixSep.length());
}
}
} else {
fromPrefix = null;
toPrefix = "";
}
}
@Override
public boolean isValid() {
return super.isValid() && StringUtils.isNotEmpty(fromPrefix);
}
@Override
public void apply(AtlasTransformableEntity entity) {
if (isValid()) {
Object currValue = entity.getAttribute(attributeName);
String strValue = currValue != null ? currValue.toString() : null;
if (strValue != null && strValue.startsWith(fromPrefix)) {
entity.setAttribute(attributeName, StringUtils.replace(strValue, fromPrefix, toPrefix, 1));
}
}
}
}
public static class ToLowerCaseAction extends Action {
public ToLowerCaseAction(String attributeName) {
super(attributeName);
}
@Override
public void apply(AtlasTransformableEntity entity) {
if (isValid()) {
Object currValue = entity.getAttribute(attributeName);
String strValue = currValue instanceof String ? (String) currValue : null;
if (strValue != null) {
entity.setAttribute(attributeName, strValue.toLowerCase());
}
}
}
}
public static class ToUpperCaseAction extends Action {
public ToUpperCaseAction(String attributeName) {
super(attributeName);
}
@Override
public void apply(AtlasTransformableEntity entity) {
if (isValid()) {
Object currValue = entity.getAttribute(attributeName);
String strValue = currValue instanceof String ? (String) currValue : null;
if (strValue != null) {
entity.setAttribute(attributeName, strValue.toUpperCase());
}
}
}
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.entitytransform;
import org.apache.atlas.entitytransform.BaseEntityHandler.AtlasTransformableEntity;
import org.apache.atlas.model.impexp.AttributeTransform;
import org.apache.commons.collections.MapUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public class AtlasEntityTransformer {
private final List<Condition> conditions;
private final List<Action> actions;
public AtlasEntityTransformer(AttributeTransform attributeTransform) {
this(attributeTransform.getConditions(), attributeTransform.getAction());
}
public AtlasEntityTransformer(Map<String, String> conditions, Map<String, String> actions) {
this.conditions = createConditions(conditions);
this.actions = createActions(actions);
}
public List<Condition> getConditions() {
return conditions;
}
public List<Action> getActions() {
return actions;
}
public void transform(AtlasTransformableEntity entity) {
if (entity != null) {
boolean matches = true;
for (Condition condition : conditions) {
matches = matches && condition.matches(entity);
}
if (matches) {
for (Action action : actions) {
action.apply(entity);
}
}
}
}
private List<Condition> createConditions(Map<String, String> conditions) {
List<Condition> ret = new ArrayList<>();
if (MapUtils.isNotEmpty(conditions)) {
for (Map.Entry<String, String> entry : conditions.entrySet()) {
Condition condition = Condition.createCondition(entry.getKey(), entry.getValue());
ret.add(condition);
}
}
return ret;
}
private List<Action> createActions(Map<String, String> actions) {
List<Action> ret = new ArrayList<>();
if (MapUtils.isNotEmpty(actions)) {
for (Map.Entry<String, String> entry : actions.entrySet()) {
Action action = Action.createAction(entry.getKey(), entry.getValue());
ret.add(action);
}
}
return ret;
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.entitytransform;
import org.apache.atlas.model.impexp.AttributeTransform;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import static org.apache.atlas.entitytransform.TransformationConstants.TYPE_NAME_ATTRIBUTE_NAME_SEP;
public class BaseEntityHandler {
private static final Logger LOG = LoggerFactory.getLogger(BaseEntityHandler.class);
protected final List<AtlasEntityTransformer> transformers;
protected final boolean hasCustomAttributeTransformer;
public BaseEntityHandler(List<AtlasEntityTransformer> transformers) {
this(transformers, null);
}
public BaseEntityHandler(List<AtlasEntityTransformer> transformers, List<String> customTransformAttributes) {
this.transformers = transformers;
this.hasCustomAttributeTransformer = hasTransformerForAnyAttribute(customTransformAttributes);
}
public boolean hasCustomAttributeTransformer() {
return hasCustomAttributeTransformer;
}
public AtlasEntity transform(AtlasEntity entity) {
if (CollectionUtils.isNotEmpty(transformers)) {
AtlasTransformableEntity transformableEntity = getTransformableEntity(entity);
if (transformableEntity != null) {
for (AtlasEntityTransformer transformer : transformers) {
transformer.transform(transformableEntity);
}
transformableEntity.transformComplete();
}
}
return entity;
}
public AtlasTransformableEntity getTransformableEntity(AtlasEntity entity) {
return new AtlasTransformableEntity(entity);
}
public static List<BaseEntityHandler> createEntityHandlers(List<AttributeTransform> transforms) {
if (LOG.isDebugEnabled()) {
LOG.debug("==> BaseEntityHandler.createEntityHandlers(transforms={})", transforms);
}
List<AtlasEntityTransformer> transformers = new ArrayList<>();
for (AttributeTransform transform : transforms) {
transformers.add(new AtlasEntityTransformer(transform));
}
BaseEntityHandler[] handlers = new BaseEntityHandler[] {
new HdfsPathEntityHandler(transformers),
new HiveDatabaseEntityHandler(transformers),
new HiveTableEntityHandler(transformers),
new HiveColumnEntityHandler(transformers),
new HiveStorageDescriptorEntityHandler(transformers)
};
List<BaseEntityHandler> ret = new ArrayList<>();
// include customer handlers, only if its customer attribute is transformed
for (BaseEntityHandler handler : handlers) {
if (handler.hasCustomAttributeTransformer()) {
ret.add(handler);
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("<== BaseEntityHandler.createEntityHandlers(transforms={}): ret.size={}", transforms, ret.size());
}
return ret;
}
private boolean hasTransformerForAnyAttribute(List<String> attributes) {
if (CollectionUtils.isNotEmpty(transformers) && CollectionUtils.isNotEmpty(attributes)) {
for (AtlasEntityTransformer transformer : transformers) {
for (Action action : transformer.getActions()) {
if (attributes.contains(action.getAttributeName())) {
return true;
}
}
}
}
return false;
}
public static class AtlasTransformableEntity {
protected final AtlasEntity entity;
protected AtlasTransformableEntity(AtlasEntity entity) {
this.entity = entity;
}
public AtlasEntity getEntity() {
return entity;
}
public Object getAttribute(String attributeName) {
Object ret = null;
if (entity != null && attributeName != null) {
ret = entity.getAttribute(attributeName);
if (ret == null) { // try after dropping typeName prefix, if attributeName contains it
int idxSep = attributeName.indexOf(TYPE_NAME_ATTRIBUTE_NAME_SEP);
if (idxSep != -1) {
ret = entity.getAttribute(attributeName.substring(idxSep + 1));
}
}
}
return ret;
}
public void setAttribute(String attributeName, String attributeValue) {
if (entity != null && attributeName != null) {
int idxSep = attributeName.indexOf(TYPE_NAME_ATTRIBUTE_NAME_SEP); // drop typeName prefix, if attributeName contains it
if (idxSep != -1) {
entity.setAttribute(attributeName.substring(idxSep + 1), attributeValue);
} else {
entity.setAttribute(attributeName, attributeValue);
}
}
}
public void transformComplete() {
// implementations can override to set value of computed-attributes
}
}
}
\ No newline at end of file
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.entitytransform;
import org.apache.atlas.entitytransform.BaseEntityHandler.AtlasTransformableEntity;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public abstract class Condition {
private static final Logger LOG = LoggerFactory.getLogger(Condition.class);
private static final String CONDITION_DELIMITER = ":";
private static final String CONDITION_NAME_EQUALS = "EQUALS";
private static final String CONDITION_NAME_EQUALS_IGNORE_CASE = "EQUALS_IGNORE_CASE";
private static final String CONDITION_NAME_STARTS_WITH = "STARTS_WITH";
private static final String CONDITION_NAME_STARTS_WITH_IGNORE_CASE = "STARTS_WITH_IGNORE_CASE";
protected final String attributeName;
protected Condition(String attributeName) {
this.attributeName = attributeName;
}
public String getAttributeName() { return attributeName; }
public abstract boolean matches(AtlasTransformableEntity entity);
public static Condition createCondition(String key, String value) {
if (LOG.isDebugEnabled()) {
LOG.debug("==> Condition.createCondition(key={}, value={})", key, value);
}
final Condition ret;
int idxConditionDelim = value == null ? -1 : value.indexOf(CONDITION_DELIMITER);
String conditionName = idxConditionDelim == -1 ? CONDITION_NAME_EQUALS : value.substring(0, idxConditionDelim);
String conditionValue = idxConditionDelim == -1 ? value : value.substring(idxConditionDelim + CONDITION_DELIMITER.length());
conditionName = StringUtils.trim(conditionName);
conditionValue = StringUtils.trim(conditionValue);
value = StringUtils.trim(value);
switch (conditionName.toUpperCase()) {
case CONDITION_NAME_EQUALS:
ret = new EqualsCondition(key, conditionValue);
break;
case CONDITION_NAME_EQUALS_IGNORE_CASE:
ret = new EqualsIgnoreCaseCondition(key, conditionValue);
break;
case CONDITION_NAME_STARTS_WITH:
ret = new StartsWithCondition(key, conditionValue);
break;
case CONDITION_NAME_STARTS_WITH_IGNORE_CASE:
ret = new StartsWithIgnoreCaseCondition(key, conditionValue);
break;
default:
ret = new EqualsCondition(key, value); // treat unspecified/unknown condition as 'EQUALS'
break;
}
if (LOG.isDebugEnabled()) {
LOG.debug("<== Condition.createCondition(key={}, value={}): actionName={}, actionValue={}, ret={}", key, value, conditionName, conditionValue, ret);
}
return ret;
}
public static class EqualsCondition extends Condition {
protected final String attributeValue;
public EqualsCondition(String attributeName, String attributeValue) {
super(attributeName);
this.attributeValue = attributeValue;
}
@Override
public boolean matches(AtlasTransformableEntity entity) {
Object attributeValue = entity != null ? entity.getAttribute(attributeName) : null;
return attributeValue != null && StringUtils.equals(attributeValue.toString(), this.attributeValue);
}
}
public static class EqualsIgnoreCaseCondition extends Condition {
protected final String attributeValue;
public EqualsIgnoreCaseCondition(String attributeName, String attributeValue) {
super(attributeName);
this.attributeValue = attributeValue;
}
@Override
public boolean matches(AtlasTransformableEntity entity) {
Object attributeValue = entity != null ? entity.getAttribute(attributeName) : null;
return attributeValue != null && StringUtils.equalsIgnoreCase(attributeValue.toString(), this.attributeValue);
}
}
public static class StartsWithCondition extends Condition {
protected final String prefix;
public StartsWithCondition(String attributeName, String prefix) {
super(attributeName);
this.prefix = prefix;
}
@Override
public boolean matches(AtlasTransformableEntity entity) {
Object attributeValue = entity != null ? entity.getAttribute(attributeName) : null;
return attributeValue != null && StringUtils.startsWith(attributeValue.toString(), this.prefix);
}
}
public static class StartsWithIgnoreCaseCondition extends Condition {
protected final String prefix;
public StartsWithIgnoreCaseCondition(String attributeName, String prefix) {
super(attributeName);
this.prefix = prefix;
}
@Override
public boolean matches(AtlasTransformableEntity entity) {
Object attributeValue = entity != null ? entity.getAttribute(attributeName) : null;
return attributeValue != null && StringUtils.startsWithIgnoreCase(attributeValue.toString(), this.prefix);
}
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.entitytransform;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.commons.lang.StringUtils;
import java.util.Arrays;
import java.util.List;
import static org.apache.atlas.entitytransform.TransformationConstants.CLUSTER_DELIMITER;
import static org.apache.atlas.entitytransform.TransformationConstants.CLUSTER_NAME_ATTRIBUTE;
import static org.apache.atlas.entitytransform.TransformationConstants.HDFS_CLUSTER_NAME_ATTRIBUTE;
import static org.apache.atlas.entitytransform.TransformationConstants.HDFS_PATH;
import static org.apache.atlas.entitytransform.TransformationConstants.HDFS_PATH_PATH_ATTRIBUTE;
import static org.apache.atlas.entitytransform.TransformationConstants.HDFS_PATH_NAME_ATTRIBUTE;
import static org.apache.atlas.entitytransform.TransformationConstants.NAME_ATTRIBUTE;
import static org.apache.atlas.entitytransform.TransformationConstants.PATH_ATTRIBUTE;
import static org.apache.atlas.entitytransform.TransformationConstants.QUALIFIED_NAME_ATTRIBUTE;
public class HdfsPathEntityHandler extends BaseEntityHandler {
private static final List<String> CUSTOM_TRANSFORM_ATTRIBUTES = Arrays.asList(HDFS_PATH_NAME_ATTRIBUTE, HDFS_PATH_PATH_ATTRIBUTE, HDFS_CLUSTER_NAME_ATTRIBUTE);
public HdfsPathEntityHandler(List<AtlasEntityTransformer> transformers) {
super(transformers, CUSTOM_TRANSFORM_ATTRIBUTES);
}
@Override
public AtlasTransformableEntity getTransformableEntity(AtlasEntity entity) {
return isHdfsPathEntity(entity) ? new HdfsPathEntity(entity) : null;
}
private boolean isHdfsPathEntity(AtlasEntity entity) {
return StringUtils.equals(entity.getTypeName(), HDFS_PATH);
}
public static class HdfsPathEntity extends AtlasTransformableEntity {
private String clusterName;
private String path;
private String name;
private String pathPrefix;
private boolean isPathUpdated = false;
private boolean isCustomerAttributeUpdated = false;
public HdfsPathEntity(AtlasEntity entity) {
super(entity);
this.path = (String) entity.getAttribute(PATH_ATTRIBUTE);
this.name = (String) entity.getAttribute(NAME_ATTRIBUTE);
String qualifiedName = (String) entity.getAttribute(QUALIFIED_NAME_ATTRIBUTE);
if (qualifiedName != null) {
int clusterSeparatorIdx = qualifiedName.lastIndexOf(CLUSTER_DELIMITER);
if (clusterSeparatorIdx != -1) {
this.clusterName = qualifiedName.substring(clusterSeparatorIdx + 1);
} else {
this.clusterName = "";
}
if (StringUtils.isNotEmpty(path) && StringUtils.isNotEmpty(name)) {
int idx = path.indexOf(name);
if (idx != -1) {
this.pathPrefix = path.substring(0, idx);
} else {
this.pathPrefix = "";
}
}
} else {
this.clusterName = "";
this.pathPrefix = "";
}
}
@Override
public Object getAttribute(String attributeName) {
switch (attributeName) {
case HDFS_CLUSTER_NAME_ATTRIBUTE:
return clusterName;
case HDFS_PATH_NAME_ATTRIBUTE:
return name;
case HDFS_PATH_PATH_ATTRIBUTE:
return path;
}
return super.getAttribute(attributeName);
}
@Override
public void setAttribute(String attributeName, String attributeValue) {
switch (attributeName) {
case HDFS_CLUSTER_NAME_ATTRIBUTE:
clusterName = attributeValue;
isCustomerAttributeUpdated = true;
break;
case HDFS_PATH_NAME_ATTRIBUTE:
name = attributeValue;
isCustomerAttributeUpdated = true;
break;
case HDFS_PATH_PATH_ATTRIBUTE:
path = attributeValue;
isPathUpdated = true;
isCustomerAttributeUpdated = true;
break;
default:
super.setAttribute(attributeName, attributeValue);
break;
}
}
@Override
public void transformComplete() {
if (isCustomerAttributeUpdated) {
entity.setAttribute(CLUSTER_NAME_ATTRIBUTE, clusterName);
entity.setAttribute(NAME_ATTRIBUTE, name);
entity.setAttribute(PATH_ATTRIBUTE, toPath());
entity.setAttribute(QUALIFIED_NAME_ATTRIBUTE, toQualifiedName());
}
}
private String toQualifiedName() {
return StringUtils.isEmpty(clusterName) ? toPath() : String.format("%s@%s", toPath(), clusterName);
}
private String toPath() {
final String ret;
if (isPathUpdated) {
ret = path;
} else {
if (StringUtils.isNotEmpty(pathPrefix)) {
ret = pathPrefix + name;
} else {
ret = name;
}
}
return ret;
}
}
}
\ No newline at end of file
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.entitytransform;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.commons.lang.StringUtils;
import java.util.Arrays;
import java.util.List;
import static org.apache.atlas.entitytransform.TransformationConstants.*;
public class HiveColumnEntityHandler extends BaseEntityHandler {
private static final List<String> CUSTOM_TRANSFORM_ATTRIBUTES = Arrays.asList(HIVE_DB_NAME_ATTRIBUTE, HIVE_TABLE_NAME_ATTRIBUTE, HIVE_COLUMN_NAME_ATTRIBUTE, HIVE_DB_CLUSTER_NAME_ATTRIBUTE);
public HiveColumnEntityHandler(List<AtlasEntityTransformer> transformers) {
super(transformers, CUSTOM_TRANSFORM_ATTRIBUTES);
}
@Override
public AtlasTransformableEntity getTransformableEntity(AtlasEntity entity) {
return isHiveColumnEntity(entity) ? new HiveColumnEntity(entity) : null;
}
private boolean isHiveColumnEntity(AtlasEntity entity) {
return StringUtils.equals(entity.getTypeName(), HIVE_COLUMN);
}
public static class HiveColumnEntity extends AtlasTransformableEntity {
private String databaseName;
private String tableName;
private String columnName;
private String clusterName;
private boolean isCustomerAttributeUpdated = false;
public HiveColumnEntity(AtlasEntity entity) {
super(entity);
this.columnName = (String) entity.getAttribute(NAME_ATTRIBUTE);
String qualifiedName = (String) entity.getAttribute(QUALIFIED_NAME_ATTRIBUTE);
if (qualifiedName != null) {
int databaseSeparatorIdx = qualifiedName.indexOf(DATABASE_DELIMITER);
int tableSeparatorIdx = databaseSeparatorIdx != -1 ? qualifiedName.indexOf(DATABASE_DELIMITER, databaseSeparatorIdx + 1) : - 1;
int clusterSeparatorIdx = qualifiedName.lastIndexOf(CLUSTER_DELIMITER);
this.databaseName = (databaseSeparatorIdx != -1) ? qualifiedName.substring(0, databaseSeparatorIdx).trim() : "";
this.tableName = (tableSeparatorIdx != -1) ? qualifiedName.substring(databaseSeparatorIdx + 1, tableSeparatorIdx).trim() : "";
this.clusterName = (clusterSeparatorIdx != -1) ? qualifiedName.substring(clusterSeparatorIdx + 1).trim() : "";
} else {
this.databaseName = "";
this.tableName = "";
this.clusterName = "";
}
}
@Override
public Object getAttribute(String attributeName) {
switch (attributeName) {
case HIVE_DB_NAME_ATTRIBUTE:
return databaseName;
case HIVE_TABLE_NAME_ATTRIBUTE:
return tableName;
case HIVE_COLUMN_NAME_ATTRIBUTE:
return columnName;
case HIVE_DB_CLUSTER_NAME_ATTRIBUTE:
return clusterName;
}
return super.getAttribute(attributeName);
}
@Override
public void setAttribute(String attributeName, String attributeValue) {
switch (attributeName) {
case HIVE_DB_NAME_ATTRIBUTE:
databaseName = attributeValue;
isCustomerAttributeUpdated = true;
break;
case HIVE_TABLE_NAME_ATTRIBUTE:
tableName = attributeValue;
isCustomerAttributeUpdated = true;
break;
case HIVE_COLUMN_NAME_ATTRIBUTE:
columnName = attributeValue;
isCustomerAttributeUpdated = true;
break;
case HIVE_DB_CLUSTER_NAME_ATTRIBUTE:
clusterName = attributeValue;
isCustomerAttributeUpdated = true;
break;
default:
super.setAttribute(attributeName, attributeValue);
break;
}
}
@Override
public void transformComplete() {
if (isCustomerAttributeUpdated) {
entity.setAttribute(NAME_ATTRIBUTE, columnName);
entity.setAttribute(QUALIFIED_NAME_ATTRIBUTE, toQualifiedName());
}
}
private String toQualifiedName() {
return String.format("%s.%s.%s@%s", databaseName, tableName, columnName, clusterName);
}
}
}
\ No newline at end of file
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.entitytransform;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.commons.lang.StringUtils;
import java.util.Arrays;
import java.util.List;
import static org.apache.atlas.entitytransform.TransformationConstants.*;
public class HiveDatabaseEntityHandler extends BaseEntityHandler {
private static final List<String> CUSTOM_TRANSFORM_ATTRIBUTES = Arrays.asList(HIVE_DB_NAME_ATTRIBUTE, HIVE_DB_CLUSTER_NAME_ATTRIBUTE);
public HiveDatabaseEntityHandler(List<AtlasEntityTransformer> transformers) {
super(transformers, CUSTOM_TRANSFORM_ATTRIBUTES);
}
@Override
public AtlasTransformableEntity getTransformableEntity(AtlasEntity entity) {
return isHiveDatabaseEntity(entity) ? new HiveDatabaseEntity(entity) : null;
}
private boolean isHiveDatabaseEntity(AtlasEntity entity) {
return StringUtils.equals(entity.getTypeName(), HIVE_DATABASE);
}
private static class HiveDatabaseEntity extends AtlasTransformableEntity {
private String databaseName;
private String clusterName;
private boolean isCustomerAttributeUpdated = false;
public HiveDatabaseEntity(AtlasEntity entity) {
super(entity);
this.databaseName = (String) entity.getAttribute(NAME_ATTRIBUTE);
String qualifiedName = (String) entity.getAttribute(QUALIFIED_NAME_ATTRIBUTE);
if (qualifiedName != null) {
int clusterSeparatorIdx = qualifiedName.lastIndexOf(CLUSTER_DELIMITER);
this.clusterName = clusterSeparatorIdx != -1 ? qualifiedName.substring(clusterSeparatorIdx + 1) : "";
} else {
this.clusterName = "";
}
}
@Override
public Object getAttribute(String attributeName) {
switch (attributeName) {
case HIVE_DB_NAME_ATTRIBUTE:
return databaseName;
case HIVE_DB_CLUSTER_NAME_ATTRIBUTE:
return clusterName;
}
return super.getAttribute(attributeName);
}
@Override
public void setAttribute(String attributeName, String attributeValue) {
switch (attributeName) {
case HIVE_DB_NAME_ATTRIBUTE:
databaseName = attributeValue;
isCustomerAttributeUpdated = true;
break;
case HIVE_DB_CLUSTER_NAME_ATTRIBUTE:
clusterName = attributeValue;
isCustomerAttributeUpdated = true;
break;
default:
super.setAttribute(attributeName, attributeValue);
break;
}
}
@Override
public void transformComplete() {
if (isCustomerAttributeUpdated) {
entity.setAttribute(NAME_ATTRIBUTE, databaseName);
entity.setAttribute(CLUSTER_NAME_ATTRIBUTE, clusterName);
entity.setAttribute(QUALIFIED_NAME_ATTRIBUTE, toQualifiedName());
}
}
private String toQualifiedName() {
return String.format("%s@%s", databaseName, clusterName);
}
}
}
\ No newline at end of file
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.entitytransform;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.commons.lang.StringUtils;
import java.util.Arrays;
import java.util.List;
import static org.apache.atlas.entitytransform.TransformationConstants.*;
public class HiveStorageDescriptorEntityHandler extends BaseEntityHandler {
private static final List<String> CUSTOM_TRANSFORM_ATTRIBUTES = Arrays.asList(HIVE_DB_NAME_ATTRIBUTE, HIVE_TABLE_NAME_ATTRIBUTE, HIVE_DB_CLUSTER_NAME_ATTRIBUTE);
public HiveStorageDescriptorEntityHandler(List<AtlasEntityTransformer> transformers) {
super(transformers, CUSTOM_TRANSFORM_ATTRIBUTES);
}
@Override
public AtlasTransformableEntity getTransformableEntity(AtlasEntity entity) {
return isHiveStorageDescEntity(entity) ? new HiveStorageDescriptorEntity(entity) : null;
}
private boolean isHiveStorageDescEntity(AtlasEntity entity) {
return StringUtils.equals(entity.getTypeName(), HIVE_STORAGE_DESCRIPTOR);
}
public static class HiveStorageDescriptorEntity extends AtlasTransformableEntity {
private String databaseName;
private String tableName;
private String clusterName;
private String location;
private boolean isCustomerAttributeUpdated = false;
public HiveStorageDescriptorEntity(AtlasEntity entity) {
super(entity);
this.location = (String) entity.getAttribute(LOCATION_ATTRIBUTE);
String qualifiedName = (String) entity.getAttribute(QUALIFIED_NAME_ATTRIBUTE);
if (qualifiedName != null) {
int databaseSeparatorIdx = qualifiedName.indexOf(DATABASE_DELIMITER);
int clusterSeparatorIdx = qualifiedName.lastIndexOf(CLUSTER_DELIMITER);
String clusterNameWithSuffix = clusterSeparatorIdx != -1 ? qualifiedName.substring(clusterSeparatorIdx + 1) : "";
this.databaseName = (databaseSeparatorIdx != -1) ? qualifiedName.substring(0, databaseSeparatorIdx) : "";
this.tableName = (databaseSeparatorIdx != -1 && clusterSeparatorIdx != -1) ? qualifiedName.substring(databaseSeparatorIdx + 1, clusterSeparatorIdx) : "";
if (StringUtils.isNotEmpty(clusterNameWithSuffix)) {
int idx = clusterNameWithSuffix.lastIndexOf(HIVE_STORAGEDESC_SUFFIX);
this.clusterName = (idx != -1) ? clusterNameWithSuffix.substring(0, idx) : "";
} else {
this.clusterName = "";
}
} else {
this.databaseName = "";
this.tableName = "";
this.clusterName = "";
}
}
@Override
public Object getAttribute(String attributeName) {
switch (attributeName) {
case HIVE_DB_NAME_ATTRIBUTE:
return databaseName;
case HIVE_TABLE_NAME_ATTRIBUTE:
return tableName;
case HIVE_DB_CLUSTER_NAME_ATTRIBUTE:
return clusterName;
}
return super.getAttribute(attributeName);
}
@Override
public void setAttribute(String attributeName, String attributeValue) {
switch (attributeName) {
case HIVE_DB_NAME_ATTRIBUTE:
databaseName = attributeValue;
isCustomerAttributeUpdated = true;
break;
case HIVE_TABLE_NAME_ATTRIBUTE:
tableName = attributeValue;
isCustomerAttributeUpdated = true;
break;
case HIVE_DB_CLUSTER_NAME_ATTRIBUTE:
clusterName = attributeValue;
isCustomerAttributeUpdated = true;
break;
default:
super.setAttribute(attributeName, attributeValue);
break;
}
}
@Override
public void transformComplete() {
if (isCustomerAttributeUpdated) {
entity.setAttribute(LOCATION_ATTRIBUTE, toLocation());
entity.setAttribute(QUALIFIED_NAME_ATTRIBUTE, toQualifiedName());
}
}
private String toLocation() {
int lastPathIndex = location != null ? location.lastIndexOf(PATH_DELIMITER) : -1;
return lastPathIndex != -1 ? location.substring(0, lastPathIndex) + PATH_DELIMITER + tableName : location;
}
private String toQualifiedName() {
return String.format("%s.%s@%s_storage", databaseName, tableName, clusterName);
}
}
}
\ No newline at end of file
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.entitytransform;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.commons.lang.StringUtils;
import java.util.Arrays;
import java.util.List;
import static org.apache.atlas.entitytransform.TransformationConstants.*;
public class HiveTableEntityHandler extends BaseEntityHandler {
private static final List<String> CUSTOM_TRANSFORM_ATTRIBUTES = Arrays.asList(HIVE_DB_NAME_ATTRIBUTE, HIVE_TABLE_NAME_ATTRIBUTE, HIVE_DB_CLUSTER_NAME_ATTRIBUTE);
public HiveTableEntityHandler(List<AtlasEntityTransformer> transformers) {
super(transformers, CUSTOM_TRANSFORM_ATTRIBUTES);
}
@Override
public AtlasTransformableEntity getTransformableEntity(AtlasEntity entity) {
return isHiveTableEntity(entity) ? new HiveTableEntity(entity) : null;
}
private boolean isHiveTableEntity(AtlasEntity entity) {
return StringUtils.equals(entity.getTypeName(), HIVE_TABLE);
}
private static class HiveTableEntity extends AtlasTransformableEntity {
private String databaseName;
private String tableName;
private String clusterName;
private boolean isCustomerAttributeUpdated = false;
public HiveTableEntity(AtlasEntity entity) {
super(entity);
this.tableName = (String) entity.getAttribute(NAME_ATTRIBUTE);
String qualifiedName = (String) entity.getAttribute(QUALIFIED_NAME_ATTRIBUTE);
if (qualifiedName != null) {
int databaseSeparatorIdx = qualifiedName.indexOf(DATABASE_DELIMITER);
int clusterSeparatorIdx = qualifiedName.lastIndexOf(CLUSTER_DELIMITER);
this.databaseName = databaseSeparatorIdx != -1 ? qualifiedName.substring(0, databaseSeparatorIdx) : "";
this.clusterName = clusterSeparatorIdx != -1 ? qualifiedName.substring(clusterSeparatorIdx + 1) : "";
} else {
this.databaseName = "";
this.clusterName = "";
}
}
@Override
public Object getAttribute(String attributeName) {
switch (attributeName) {
case HIVE_TABLE_NAME_ATTRIBUTE:
return tableName;
case HIVE_DB_NAME_ATTRIBUTE:
return databaseName;
case HIVE_DB_CLUSTER_NAME_ATTRIBUTE:
return clusterName;
}
return super.getAttribute(attributeName);
}
@Override
public void setAttribute(String attributeName, String attributeValue) {
switch (attributeName) {
case HIVE_TABLE_NAME_ATTRIBUTE:
tableName = attributeValue;
isCustomerAttributeUpdated = true;
break;
case HIVE_DB_NAME_ATTRIBUTE:
databaseName = attributeValue;
isCustomerAttributeUpdated = true;
break;
case HIVE_DB_CLUSTER_NAME_ATTRIBUTE:
clusterName = attributeValue;
isCustomerAttributeUpdated = true;
break;
default:
super.setAttribute(attributeName, attributeValue);
break;
}
}
@Override
public void transformComplete() {
if (isCustomerAttributeUpdated) {
entity.setAttribute(NAME_ATTRIBUTE, tableName);
entity.setAttribute(QUALIFIED_NAME_ATTRIBUTE, toQualifiedName());
}
}
private String toQualifiedName() {
return String.format("%s.%s@%s", databaseName, tableName, clusterName);
}
}
}
\ No newline at end of file
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.entitytransform;
public final class TransformationConstants {
public static final String HDFS_PATH = "hdfs_path";
public static final String HIVE_DATABASE = "hive_db";
public static final String HIVE_TABLE = "hive_table";
public static final String HIVE_COLUMN = "hive_column";
public static final String HIVE_STORAGE_DESCRIPTOR = "hive_storagedesc";
public static final String NAME_ATTRIBUTE = "name";
public static final String QUALIFIED_NAME_ATTRIBUTE = "qualifiedName";
public static final String CLUSTER_NAME_ATTRIBUTE = "clusterName";
public static final String LOCATION_ATTRIBUTE = "location";
public static final String PATH_ATTRIBUTE = "path";
public static final String HIVE_DB_NAME_ATTRIBUTE = "hive_db.name";
public static final String HIVE_DB_CLUSTER_NAME_ATTRIBUTE = "hive_db.clusterName";
public static final String HIVE_TABLE_NAME_ATTRIBUTE = "hive_table.name";
public static final String HIVE_COLUMN_NAME_ATTRIBUTE = "hive_column.name";
public static final String HDFS_PATH_NAME_ATTRIBUTE = "hdfs_path.name";
public static final String HDFS_PATH_PATH_ATTRIBUTE = "hdfs_path.path";
public static final String HDFS_CLUSTER_NAME_ATTRIBUTE = "hdfs_path.clusterName";
public static final char TYPE_NAME_ATTRIBUTE_NAME_SEP = '.';
public static final char CLUSTER_DELIMITER = '@';
public static final char DATABASE_DELIMITER = '.';
public static final char PATH_DELIMITER = '/';
public static final String HIVE_STORAGEDESC_SUFFIX = "_storage";
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.model.impexp;
import org.apache.commons.lang.StringUtils;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlRootElement;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE;
import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY;
@JsonAutoDetect(getterVisibility=PUBLIC_ONLY, setterVisibility=PUBLIC_ONLY, fieldVisibility=NONE)
@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
@JsonIgnoreProperties(ignoreUnknown=true)
@XmlRootElement
@XmlAccessorType(XmlAccessType.PROPERTY)
public class AttributeTransform implements Serializable {
private Map<String, String> conditions;
private Map<String, String> action;
public AttributeTransform() { }
public AttributeTransform(Map<String, String> conditions, Map<String, String> action) {
this.conditions = conditions;
this.action = action;
}
public Map<String, String> getConditions() {
return conditions;
}
public void setConditions(Map<String, String> conditions) {
this.conditions = conditions;
}
public Map<String, String> getAction() {
return action;
}
public void setAction(Map<String, String> action) {
this.action = action;
}
public void addCondition(String attributeName, String conditionValue) {
if (conditions == null) {
conditions = new HashMap<>();
}
if (StringUtils.isNotEmpty(attributeName) && StringUtils.isNotEmpty(conditionValue)) {
conditions.put(attributeName, conditionValue);
}
}
public void addAction(String attributeName, String actionValue) {
if (action == null) {
action = new HashMap<>();
}
if (StringUtils.isNotEmpty(attributeName) && StringUtils.isNotEmpty(actionValue)) {
action.put(attributeName, actionValue);
}
}
}
\ No newline at end of file
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.entitytransform;
import org.apache.atlas.model.impexp.AttributeTransform;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.commons.lang.StringUtils;
import org.testng.Assert;
import org.testng.annotations.Test;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import static org.apache.atlas.entitytransform.TransformationConstants.HDFS_PATH;
public class TransformationHandlerTest {
@Test
public void testHdfsClusterRenameHandler() {
// Rename clusterName from cl1 to cl2
AttributeTransform p1 = new AttributeTransform(Collections.singletonMap("hdfs_path.clusterName", "EQUALS: cl1"),
Collections.singletonMap("hdfs_path.clusterName", "SET: cl2"));
List<BaseEntityHandler> handlers = initializeHandlers(Collections.singletonList(p1));
for (AtlasEntity hdfsPath : getHdfsPathEntities()) {
String qualifiedName = (String) hdfsPath.getAttribute("qualifiedName");
boolean endsWithCl1 = qualifiedName.endsWith("@cl1");
applyTransforms(hdfsPath, handlers);
String transformedValue = (String) hdfsPath.getAttribute("qualifiedName");
if (endsWithCl1) {
Assert.assertTrue(transformedValue.endsWith("@cl2"), transformedValue + ": expected to end with @cl2");
} else {
Assert.assertEquals(qualifiedName, transformedValue, "not expected to change");
}
}
}
@Test
public void testHdfsClusterNameToggleCaseHandler() {
// Change clusterName to Upper case
AttributeTransform p1 = new AttributeTransform(Collections.singletonMap("hdfs_path.clusterName", "EQUALS: cl1"),
Collections.singletonMap("hdfs_path.clusterName", "TO_UPPER:"));
List<BaseEntityHandler> handlers = initializeHandlers(Collections.singletonList(p1));
List<AtlasEntity> hdfsPaths = getHdfsPathEntities();
for (AtlasEntity hdfsPath : hdfsPaths) {
String qualifiedName = (String) hdfsPath.getAttribute("qualifiedName");
boolean endsWithCl1 = qualifiedName.endsWith("@cl1");
applyTransforms(hdfsPath, handlers);
String transformedValue = (String) hdfsPath.getAttribute("qualifiedName");
if (endsWithCl1) {
Assert.assertTrue(transformedValue.endsWith("@CL1"), transformedValue + ": expected to end with @CL1");
} else {
Assert.assertEquals(qualifiedName, transformedValue, "not expected to change");
}
}
// Change clusterName back to lower case
AttributeTransform p2 = new AttributeTransform(Collections.singletonMap("hdfs_path.clusterName", "EQUALS: CL1"),
Collections.singletonMap("hdfs_path.clusterName", "TO_LOWER:"));
handlers = initializeHandlers(Collections.singletonList(p2));
for (AtlasEntity hdfsPath : hdfsPaths) {
String qualifiedName = (String) hdfsPath.getAttribute("qualifiedName");
boolean endsWithCL1 = qualifiedName.endsWith("@CL1");
applyTransforms(hdfsPath, handlers);
String transformedValue = (String) hdfsPath.getAttribute("qualifiedName");
if (endsWithCL1) {
Assert.assertTrue(transformedValue.endsWith("@cl1"), transformedValue + ": expected to end with @cl1");
} else {
Assert.assertEquals(qualifiedName, transformedValue, "not expected to change");
}
}
}
@Test
public void testHdfsPathNameReplacePrefixHandler() {
// Prefix replace hdfs_path name from /aa/bb/ to /xx/yy/
AttributeTransform p1 = new AttributeTransform(Collections.singletonMap("hdfs_path.name", "STARTS_WITH: /aa/bb/"),
Collections.singletonMap("hdfs_path.name", "REPLACE_PREFIX: = :/aa/bb/=/xx/yy/"));
List<BaseEntityHandler> handlers = initializeHandlers(Collections.singletonList(p1));
for (AtlasEntity hdfsPath : getHdfsPathEntities()) {
String name = (String) hdfsPath.getAttribute("name");
boolean startsWith_aa_bb_ = name.startsWith("/aa/bb/");
applyTransforms(hdfsPath, handlers);
String transformedValue = (String) hdfsPath.getAttribute("name");
if (startsWith_aa_bb_) {
Assert.assertTrue(transformedValue.startsWith("/xx/yy/"), transformedValue + ": expected to start with /xx/yy/");
} else {
Assert.assertEquals(name, transformedValue, "not expected to change");
}
}
}
@Test
public void testHiveDatabaseClusterRenameHandler() {
// replace clusterName: from cl1 to cl1_backup
AttributeTransform p1 = new AttributeTransform(Collections.singletonMap("hive_db.clusterName", "EQUALS: cl1"),
Collections.singletonMap("hive_db.clusterName", "SET: cl1_backup"));
List<BaseEntityHandler> handlers = initializeHandlers(Collections.singletonList(p1));
for (AtlasEntity entity : getAllEntities()) {
String qualifiedName = (String) entity.getAttribute("qualifiedName");
boolean isHdfsPath = StringUtils.equals(entity.getTypeName(), HDFS_PATH);
boolean endsWithCl1 = qualifiedName.endsWith("@cl1");
boolean containsCl1 = qualifiedName.contains("@cl1"); // for stroage_desc
applyTransforms(entity, handlers);
String transformedValue = (String) entity.getAttribute("qualifiedName");
if (!isHdfsPath && endsWithCl1) {
Assert.assertTrue(transformedValue.endsWith("@cl1_backup"), transformedValue + ": expected to end with @cl1_backup");
} else if (!isHdfsPath && containsCl1) {
Assert.assertTrue(transformedValue.contains("@cl1_backup"), transformedValue + ": expected to contains @cl1_backup");
} else {
Assert.assertEquals(qualifiedName, transformedValue, "not expected to change");
}
}
}
@Test
public void testHiveDatabaseNameRenameHandler() {
// replace dbName: from hr to hr_backup
AttributeTransform p = new AttributeTransform(Collections.singletonMap("hive_db.name", "EQUALS: hr"),
Collections.singletonMap("hive_db.name", "SET: hr_backup"));
List<BaseEntityHandler> handlers = initializeHandlers(Collections.singletonList(p));
for (AtlasEntity entity : getAllEntities()) {
String qualifiedName = (String) entity.getAttribute("qualifiedName");
boolean startsWithHrDot = qualifiedName.startsWith("hr."); // for tables, columns
boolean startsWithHrAt = qualifiedName.startsWith("hr@"); // for databases
applyTransforms(entity, handlers);
if (startsWithHrDot) {
Assert.assertTrue(((String) entity.getAttribute("qualifiedName")).startsWith("hr_backup."));
} else if (startsWithHrAt) {
Assert.assertTrue(((String) entity.getAttribute("qualifiedName")).startsWith("hr_backup@"));
} else {
Assert.assertEquals(qualifiedName, (String) entity.getAttribute("qualifiedName"), "not expected to change");
}
}
}
@Test
public void testHiveTableNameRenameHandler() {
// replace tableName: from hr.employees to hr.employees_backup
AttributeTransform p = new AttributeTransform();
p.addCondition("hive_db.name", "EQUALS: hr");
p.addCondition("hive_table.name", "EQUALS: employees");
p.addAction("hive_table.name", "SET: employees_backup");
List<BaseEntityHandler> handlers = initializeHandlers(Collections.singletonList(p));
for (AtlasEntity entity : getAllEntities()) {
String qualifiedName = (String) entity.getAttribute("qualifiedName");
boolean startsWithHrEmployeesDot = qualifiedName.startsWith("hr.employees."); // for columns
boolean startsWithHrEmployeesAt = qualifiedName.startsWith("hr.employees@"); // for tables
applyTransforms(entity, handlers);
if (startsWithHrEmployeesDot) {
Assert.assertTrue(((String) entity.getAttribute("qualifiedName")).startsWith("hr.employees_backup."));
} else if (startsWithHrEmployeesAt) {
Assert.assertTrue(((String) entity.getAttribute("qualifiedName")).startsWith("hr.employees_backup@"));
} else {
Assert.assertEquals(qualifiedName, (String) entity.getAttribute("qualifiedName"), "not expected to change");
}
}
}
@Test
public void testHiveColumnNameRenameHandler() {
// replace columnName: from hr.employees.age to hr.employees.age_backup
AttributeTransform p = new AttributeTransform();
p.addCondition("hive_db.name", "EQUALS: hr");
p.addCondition("hive_table.name", "EQUALS: employees");
p.addCondition("hive_column.name", "EQUALS: age");
p.addAction("hive_column.name", "SET: age_backup");
List<BaseEntityHandler> handlers = initializeHandlers(Collections.singletonList(p));
for (AtlasEntity entity : getAllEntities()) {
String qualifiedName = (String) entity.getAttribute("qualifiedName");
boolean startsWithHrEmployeesAgeAt = qualifiedName.startsWith("hr.employees.age@");
applyTransforms(entity, handlers);
if (startsWithHrEmployeesAgeAt) {
Assert.assertTrue(((String) entity.getAttribute("qualifiedName")).startsWith("hr.employees.age_backup@"));
} else {
Assert.assertEquals(qualifiedName, (String) entity.getAttribute("qualifiedName"), "not expected to change");
}
}
}
private List<BaseEntityHandler> initializeHandlers(List<AttributeTransform> params) {
return BaseEntityHandler.createEntityHandlers(params);
}
private void applyTransforms(AtlasEntity entity, List<BaseEntityHandler> handlers) {
for (BaseEntityHandler handler : handlers) {
handler.transform(entity);
}
}
final String[] clusterNames = new String[] { "cl1", "prod" };
final String[] databaseNames = new String[] { "hr", "sales", "engg" };
final String[] tableNames = new String[] { "employees", "products", "invoice" };
final String[] columnNames = new String[] { "name", "age", "dob" };
private List<AtlasEntity> getHdfsPathEntities() {
List<AtlasEntity> ret = new ArrayList<>();
for (String clusterName : clusterNames) {
ret.add(getHdfsPathEntity1(clusterName));
ret.add(getHdfsPathEntity2(clusterName));
}
return ret;
}
private List<AtlasEntity> getAllEntities() {
List<AtlasEntity> ret = new ArrayList<>();
for (String clusterName : clusterNames) {
ret.add(getHdfsPathEntity1(clusterName));
ret.add(getHdfsPathEntity2(clusterName));
for (String databaseName : databaseNames) {
ret.add(getHiveDbEntity(clusterName, databaseName));
for (String tableName : tableNames) {
ret.add(getHiveTableEntity(clusterName, databaseName, tableName));
ret.add(getHiveStorageDescriptorEntity(clusterName, databaseName, tableName));
for (String columnName : columnNames) {
ret.add(getHiveColumnEntity(clusterName, databaseName, tableName, columnName));
}
}
}
}
return ret;
}
private AtlasEntity getHdfsPathEntity1(String clusterName) {
AtlasEntity entity = new AtlasEntity(HDFS_PATH);
entity.setAttribute("name", "/aa/bb/employee");
entity.setAttribute("path", "hdfs://localhost.localdomain:8020/aa/bb/employee");
entity.setAttribute("qualifiedName", "hdfs://localhost.localdomain:8020/aa/bb/employee@" + clusterName);
entity.setAttribute("clusterName", clusterName);
entity.setAttribute("isSymlink", false);
entity.setAttribute("modifiedTime", 0);
entity.setAttribute("isFile", false);
entity.setAttribute("numberOfReplicas", 0);
entity.setAttribute("createTime", 0);
entity.setAttribute("fileSize", 0);
return entity;
}
private AtlasEntity getHdfsPathEntity2(String clusterName) {
AtlasEntity entity = new AtlasEntity(HDFS_PATH);
entity.setAttribute("name", "/cc/dd/employee");
entity.setAttribute("path", "hdfs://localhost.localdomain:8020/cc/dd/employee");
entity.setAttribute("qualifiedName", "hdfs://localhost.localdomain:8020/cc/dd/employee@" + clusterName);
entity.setAttribute("clusterName", clusterName);
entity.setAttribute("isSymlink", false);
entity.setAttribute("modifiedTime", 0);
entity.setAttribute("isFile", false);
entity.setAttribute("numberOfReplicas", 0);
entity.setAttribute("createTime", 0);
entity.setAttribute("fileSize", 0);
return entity;
}
private AtlasEntity getHiveDbEntity(String clusterName, String dbName) {
AtlasEntity entity = new AtlasEntity(TransformationConstants.HIVE_DATABASE);
entity.setAttribute("name", dbName);
entity.setAttribute("qualifiedName", dbName + "@" + clusterName);
entity.setAttribute("location", "hdfs://localhost.localdomain:8020/warehouse/tablespace/managed/hive/" + dbName + ".db");
entity.setAttribute("clusterName", clusterName);
entity.setAttribute("owner", "hive");
entity.setAttribute("ownerType", "USER");
return entity;
}
private AtlasEntity getHiveTableEntity(String clusterName, String dbName, String tableName) {
AtlasEntity entity = new AtlasEntity(TransformationConstants.HIVE_TABLE);
entity.setAttribute("name", tableName);
entity.setAttribute("qualifiedName", dbName + "." + tableName + "@" + clusterName);
entity.setAttribute("owner", "hive");
entity.setAttribute("temporary", false);
entity.setAttribute("lastAccessTime", "1535656355000");
entity.setAttribute("tableType", "EXTERNAL_TABLE");
entity.setAttribute("createTime", "1535656355000");
entity.setAttribute("retention", 0);
return entity;
}
private AtlasEntity getHiveStorageDescriptorEntity(String clusterName, String dbName, String tableName) {
AtlasEntity entity = new AtlasEntity(TransformationConstants.HIVE_STORAGE_DESCRIPTOR);
entity.setAttribute("qualifiedName", dbName + "." + tableName + "@" + clusterName + "_storage");
entity.setAttribute("storedAsSubDirectories", false);
entity.setAttribute("location", "hdfs://localhost.localdomain:8020/warehouse/tablespace/managed/hive/" + dbName + ".db" + "/" + tableName);
entity.setAttribute("compressed", false);
entity.setAttribute("inputFormat", "org.apache.hadoop.mapred.TextInputFormat");
entity.setAttribute("outputFormat", "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat");
entity.setAttribute("numBuckets", -1);
return entity;
}
private AtlasEntity getHiveColumnEntity(String clusterName, String dbName, String tableName, String columnName) {
AtlasEntity entity = new AtlasEntity(TransformationConstants.HIVE_COLUMN);
entity.setAttribute("owner", "hive");
entity.setAttribute("qualifiedName", dbName + "." + tableName + "." + columnName +"@" + clusterName);
entity.setAttribute("name", columnName);
entity.setAttribute("position", 1);
entity.setAttribute("type", "string");
return entity;
}
}
\ No newline at end of file
......@@ -23,7 +23,6 @@ import org.apache.atlas.model.instance.AtlasClassification;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.commons.lang.StringUtils;
import scala.Tuple3;
import java.util.ArrayList;
import java.util.List;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment