Commit f103e143 by Madhan Neethiraj

ATLAS-3056: updated rdbms types to remove use of ownedRef/inverseRef

parent 18019733
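This change teaches the notification consumer to strip ownedRef/inverseRef attributes (databases, tables, columns, indexes, foreign_keys) from incoming RDBMS entities before they are processed. The behaviour is controlled by a new consumer property, enabled by default in the code below; a minimal sketch of how it could be disabled, assuming the usual atlas-application.properties configuration file:

    atlas.notification.consumer.preprocess.rdbms_types.remove.ownedref.attrs=false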
@@ -123,6 +123,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandler
public static final String CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_PATTERN = "atlas.notification.consumer.preprocess.hive_table.ignore.pattern";
public static final String CONSUMER_PREPROCESS_HIVE_TABLE_PRUNE_PATTERN = "atlas.notification.consumer.preprocess.hive_table.prune.pattern";
public static final String CONSUMER_PREPROCESS_HIVE_TABLE_CACHE_SIZE = "atlas.notification.consumer.preprocess.hive_table.cache.size";
public static final String CONSUMER_PREPROCESS_RDBMS_TYPES_REMOVE_OWNEDREF_ATTRS = "atlas.notification.consumer.preprocess.rdbms_types.remove.ownedref.attrs";
public static final int SERVER_READY_WAIT_TIME_MS = 1000;
@@ -142,6 +143,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandler
private final List<Pattern> hiveTablesToIgnore = new ArrayList<>();
private final List<Pattern> hiveTablesToPrune = new ArrayList<>();
private final Map<String, PreprocessAction> hiveTablesCache;
private final boolean rdbmsTypesRemoveOwnedRefAttrs;
private final boolean preprocessEnabled;
private NotificationInterface notificationInterface;
@@ -212,7 +214,8 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandler
hiveTablesCache = Collections.emptyMap();
}
preprocessEnabled = !hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty() || skipHiveColumnLineageHive20633;
rdbmsTypesRemoveOwnedRefAttrs = applicationProperties.getBoolean(CONSUMER_PREPROCESS_RDBMS_TYPES_REMOVE_OWNEDREF_ATTRS, true);
preprocessEnabled = !hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty() || skipHiveColumnLineageHive20633 || rdbmsTypesRemoveOwnedRefAttrs;
LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, skipHiveColumnLineageHive20633);
LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, skipHiveColumnLineageHive20633InputsThreshold);
@@ -799,16 +802,35 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandler
skipHiveColumnLineage(context);
}
if (rdbmsTypesRemoveOwnedRefAttrs) {
rdbmsTypeRemoveOwnedRefAttrs(context);
}
context.moveRegisteredReferredEntities();
}
private void rdbmsTypeRemoveOwnedRefAttrs(PreprocessorContext context) {
List<AtlasEntity> entities = context.getEntities();
if (entities != null) {
for (ListIterator<AtlasEntity> iter = entities.listIterator(); iter.hasNext(); ) {
AtlasEntity entity = iter.next();
EntityPreprocessor preprocessor = EntityPreprocessor.getRdbmsPreprocessor(entity.getTypeName());
if (preprocessor != null) {
preprocessor.preprocess(entity, context);
}
}
}
}
private void ignoreOrPruneHiveTables(PreprocessorContext context) {
List<AtlasEntity> entities = context.getEntities();
if (entities != null) {
for (ListIterator<AtlasEntity> iter = entities.listIterator(); iter.hasNext(); ) {
AtlasEntity entity = iter.next();
EntityPreprocessor preprocessor = EntityPreprocessor.getPreprocessor(entity.getTypeName());
EntityPreprocessor preprocessor = EntityPreprocessor.getHivePreprocessor(entity.getTypeName());
if (preprocessor != null) {
preprocessor.preprocess(entity, context);
@@ -824,7 +846,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandler
if (referredEntities != null) {
for (Iterator<Map.Entry<String, AtlasEntity>> iter = referredEntities.entrySet().iterator(); iter.hasNext(); ) {
AtlasEntity entity = iter.next().getValue();
EntityPreprocessor preprocessor = EntityPreprocessor.getPreprocessor(entity.getTypeName());
EntityPreprocessor preprocessor = EntityPreprocessor.getHivePreprocessor(entity.getTypeName());
if (preprocessor != null) {
preprocessor.preprocess(entity, context);
......
@@ -32,25 +32,38 @@ public abstract class EntityPreprocessor {
public static final String TYPE_HIVE_PROCESS = "hive_process";
public static final String TYPE_HIVE_STORAGEDESC = "hive_storagedesc";
public static final String TYPE_HIVE_TABLE = "hive_table";
public static final String TYPE_RDBMS_INSTANCE = "rdbms_instance";
public static final String TYPE_RDBMS_DB = "rdbms_db";
public static final String TYPE_RDBMS_TABLE = "rdbms_table";
public static final String TYPE_RDBMS_COLUMN = "rdbms_column";
public static final String TYPE_RDBMS_INDEX = "rdbms_index";
public static final String TYPE_RDBMS_FOREIGN_KEY = "rdbms_foreign_key";
public static final String ATTRIBUTE_COLUMNS = "columns";
public static final String ATTRIBUTE_INPUTS = "inputs";
public static final String ATTRIBUTE_OUTPUTS = "outputs";
public static final String ATTRIBUTE_PARTITION_KEYS = "partitionKeys";
public static final String ATTRIBUTE_QUALIFIED_NAME = "qualifiedName";
public static final String ATTRIBUTE_NAME = "name";
public static final String ATTRIBUTE_SD = "sd";
public static final String ATTRIBUTE_DB = "db";
public static final String ATTRIBUTE_DATABASES = "databases";
public static final String ATTRIBUTE_TABLES = "tables";
public static final String ATTRIBUTE_INDEXES = "indexes";
public static final String ATTRIBUTE_FOREIGN_KEYS = "foreign_keys";
public static final char QNAME_SEP_CLUSTER_NAME = '@';
public static final char QNAME_SEP_ENTITY_NAME = '.';
public static final String QNAME_SD_SUFFIX = "_storage";
private static final Map<String, EntityPreprocessor> PREPROCESSOR_MAP = new HashMap<>();
private static final Map<String, EntityPreprocessor> HIVE_PREPROCESSOR_MAP = new HashMap<>();
private static final Map<String, EntityPreprocessor> RDBMS_PREPROCESSOR_MAP = new HashMap<>();
private final String typeName;
static {
EntityPreprocessor[] preprocessors = new EntityPreprocessor[] {
EntityPreprocessor[] hivePreprocessors = new EntityPreprocessor[] {
new HivePreprocessor.HiveTablePreprocessor(),
new HivePreprocessor.HiveColumnPreprocessor(),
new HivePreprocessor.HiveProcessPreprocessor(),
@@ -58,8 +71,18 @@ public abstract class EntityPreprocessor {
new HivePreprocessor.HiveStorageDescPreprocessor()
};
for (EntityPreprocessor preprocessor : preprocessors) {
PREPROCESSOR_MAP.put(preprocessor.getTypeName(), preprocessor);
EntityPreprocessor[] rdbmsPreprocessors = new EntityPreprocessor[] {
new RdbmsPreprocessor.RdbmsInstancePreprocessor(),
new RdbmsPreprocessor.RdbmsDbPreprocessor(),
new RdbmsPreprocessor.RdbmsTablePreprocessor()
};
for (EntityPreprocessor preprocessor : hivePreprocessors) {
HIVE_PREPROCESSOR_MAP.put(preprocessor.getTypeName(), preprocessor);
}
for (EntityPreprocessor preprocessor : rdbmsPreprocessors) {
RDBMS_PREPROCESSOR_MAP.put(preprocessor.getTypeName(), preprocessor);
}
}
@@ -74,8 +97,12 @@ public abstract class EntityPreprocessor {
public abstract void preprocess(AtlasEntity entity, PreprocessorContext context);
public static EntityPreprocessor getPreprocessor(String typeName) {
return typeName != null ? PREPROCESSOR_MAP.get(typeName) : null;
public static EntityPreprocessor getHivePreprocessor(String typeName) {
return typeName != null ? HIVE_PREPROCESSOR_MAP.get(typeName) : null;
}
public static EntityPreprocessor getRdbmsPreprocessor(String typeName) {
return typeName != null ? RDBMS_PREPROCESSOR_MAP.get(typeName) : null;
}
public static String getQualifiedName(AtlasEntity entity) {
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.notification.preprocessor;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.commons.collections.MapUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
public class RdbmsPreprocessor {
private static final Logger LOG = LoggerFactory.getLogger(RdbmsPreprocessor.class);
static class RdbmsInstancePreprocessor extends RdbmsTypePreprocessor {
public RdbmsInstancePreprocessor() {
super(TYPE_RDBMS_INSTANCE);
}
}
static class RdbmsDbPreprocessor extends RdbmsTypePreprocessor {
public RdbmsDbPreprocessor() {
super(TYPE_RDBMS_DB);
}
}
static class RdbmsTablePreprocessor extends RdbmsTypePreprocessor {
public RdbmsTablePreprocessor() {
super(TYPE_RDBMS_TABLE);
}
@Override
public void preprocess(AtlasEntity entity, PreprocessorContext context) {
super.preprocess(entity, context);
// try auto-fix when the 'db' attribute is present neither in relationshipAttributes nor in attributes
Object db = entity.getRelationshipAttribute(ATTRIBUTE_DB);
if (db == null) {
db = entity.getAttribute(ATTRIBUTE_DB);
}
if (db == null) {
String dbQualifiedName = getDbQualifiedName(entity);
if (dbQualifiedName != null) {
AtlasObjectId dbId = new AtlasObjectId(TYPE_RDBMS_DB, Collections.singletonMap(ATTRIBUTE_QUALIFIED_NAME, dbQualifiedName));
LOG.info("missing attribute {}.{} is set to {}", TYPE_RDBMS_TABLE, ATTRIBUTE_DB, dbId);
entity.setRelationshipAttribute(ATTRIBUTE_DB, dbId);
}
}
}
private String getDbQualifiedName(AtlasEntity tableEntity) {
String ret = null;
Object tblQualifiedName = tableEntity.getAttribute(ATTRIBUTE_QUALIFIED_NAME); // dbName.tblName@clusterName
Object tblName = tableEntity.getAttribute(ATTRIBUTE_NAME); // tblName
if (tblQualifiedName != null && tblName != null) {
ret = tblQualifiedName.toString().replace("." + tblName.toString() + "@", "@"); // dbName@clusterName
}
return ret;
}
}
static class RdbmsTypePreprocessor extends EntityPreprocessor {
private static final Set<String> entityTypesToMove = new HashSet<>();
static {
entityTypesToMove.add(TYPE_RDBMS_DB);
entityTypesToMove.add(TYPE_RDBMS_TABLE);
entityTypesToMove.add(TYPE_RDBMS_COLUMN);
entityTypesToMove.add(TYPE_RDBMS_INDEX);
entityTypesToMove.add(TYPE_RDBMS_FOREIGN_KEY);
}
protected RdbmsTypePreprocessor(String typeName) {
super(typeName);
}
@Override
public void preprocess(AtlasEntity entity, PreprocessorContext context) {
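// clear ownedRef attributes on this entity, then on referred RDBMS entities, marking the latter to be moved out of referredEntities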
clearRefAttributes(entity, context);
Map<String, AtlasEntity> referredEntities = context.getReferredEntities();
if (MapUtils.isNotEmpty(referredEntities)) {
for (AtlasEntity referredEntity : referredEntities.values()) {
if (entityTypesToMove.contains(referredEntity.getTypeName())) {
clearRefAttributes(referredEntity, context);
context.addToReferredEntitiesToMove(referredEntity.getGuid());
}
}
}
}
private void clearRefAttributes(AtlasEntity entity, PreprocessorContext context) {
switch (entity.getTypeName()) {
case TYPE_RDBMS_INSTANCE:
entity.removeAttribute(ATTRIBUTE_DATABASES);
break;
case TYPE_RDBMS_DB:
entity.removeAttribute(ATTRIBUTE_TABLES);
break;
case TYPE_RDBMS_TABLE:
entity.removeAttribute(ATTRIBUTE_COLUMNS);
entity.removeAttribute(ATTRIBUTE_INDEXES);
entity.removeAttribute(ATTRIBUTE_FOREIGN_KEYS);
break;
}
}
}
}
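For illustration, a minimal sketch of the net effect of RdbmsTablePreprocessor on a single rdbms_table entity (hypothetical attribute values; the PreprocessorContext and Kafka consumer wiring are omitted, and only API calls already used above appear):

import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasObjectId;

import java.util.Collections;

public class RdbmsPreprocessorSketch {
    public static void main(String[] args) {
        // hypothetical rdbms_table entity, as it might arrive in a hook notification
        AtlasEntity table = new AtlasEntity("rdbms_table");

        table.setAttribute("qualifiedName", "salesdb.customers@cl1"); // dbName.tblName@clusterName
        table.setAttribute("name", "customers");
        table.setAttribute("columns", Collections.emptyList());       // ownedRef attributes
        table.setAttribute("indexes", Collections.emptyList());       // (empty here for brevity)
        table.setAttribute("foreign_keys", Collections.emptyList());

        // effect of preprocess(): the ownedRef attributes are removed ...
        table.removeAttribute("columns");
        table.removeAttribute("indexes");
        table.removeAttribute("foreign_keys");

        // ... and, since 'db' is missing, it is derived from the qualified name:
        // "salesdb.customers@cl1" -> "salesdb@cl1"
        String dbQualifiedName = "salesdb.customers@cl1".replace(".customers@", "@");
        AtlasObjectId dbId = new AtlasObjectId("rdbms_db", Collections.singletonMap("qualifiedName", dbQualifiedName));

        table.setRelationshipAttribute("db", dbId);

        System.out.println(table); // rdbms_table left with qualifiedName, name and the db relationship
    }
}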