Commit 1123f512 authored by Madhan Neethiraj, committed by Sarath Subramanian

ATLAS-3006: Option to ignore/prune metadata for temporary/staging Hive tables

parent 40038589
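The feature is driven by three new hook configuration properties (read via getStringArray, so comma-separated lists of regular expressions are accepted). A hypothetical atlas-application.properties snippet; the pattern values are invented examples, and only the property names and the 10000 cache default come from this commit:

# hypothetical example values
# ignore tables whose qualifiedName (db.table@cluster) matches; '[.]' avoids properties-file backslash escaping
atlas.hook.hive.hive_table.ignore.pattern=.*[.]tmp_.*@primary
# capture matching tables, but prune storage-descriptor/partition-key/column details
atlas.hook.hive.hive_table.prune.pattern=.*[.]staging_.*@primary
# size of the LRU cache of per-table ignore/prune decisions
atlas.hook.hive.hive_table.cache.size=10000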
......@@ -19,6 +19,7 @@
package org.apache.atlas.hive.hook;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.hive.hook.HiveHook.PreprocessAction;
import org.apache.atlas.hive.hook.HiveHook.HiveHookObjectNamesCache;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.RandomStringUtils;
......@@ -97,6 +98,10 @@ public class AtlasHiveHookContext {
return hook.getSkipHiveColumnLineageHive20633InputsThreshold();
}
public PreprocessAction getPreprocessActionForHiveTable(String qualifiedName) {
return hook.getPreprocessActionForHiveTable(qualifiedName);
}
public String getQualifiedName(Database db) {
return (db.getName() + QNAME_SEP_CLUSTER_NAME).toLowerCase() + getClusterName();
}
......
......@@ -21,6 +21,8 @@ package org.apache.atlas.hive.hook;
import org.apache.atlas.hive.hook.events.*;
import org.apache.atlas.hook.AtlasHook;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.utils.LruCache;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext;
import org.apache.hadoop.hive.ql.hooks.HookContext;
......@@ -30,12 +32,15 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
import static org.apache.atlas.hive.hook.events.BaseHiveEvent.ATTRIBUTE_QUALIFIED_NAME;
import static org.apache.atlas.hive.hook.events.BaseHiveEvent.HIVE_TYPE_DB;
......@@ -45,6 +50,8 @@ import static org.apache.atlas.hive.hook.events.BaseHiveEvent.HIVE_TYPE_TABLE;
public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
private static final Logger LOG = LoggerFactory.getLogger(HiveHook.class);
public enum PreprocessAction { NONE, IGNORE, PRUNE }
public static final String CONF_PREFIX = "atlas.hook.hive.";
public static final String CONF_CLUSTER_NAME = "atlas.cluster.name";
public static final String HDFS_PATH_CONVERT_TO_LOWER_CASE = CONF_PREFIX + "hdfs_path.convert_to_lowercase";
......@@ -54,6 +61,9 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
public static final String HOOK_NAME_CACHE_REBUID_INTERVAL_SEC = CONF_PREFIX + "name.cache.rebuild.interval.seconds";
public static final String HOOK_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633 = CONF_PREFIX + "skip.hive_column_lineage.hive-20633";
public static final String HOOK_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD = CONF_PREFIX + "skip.hive_column_lineage.hive-20633.inputs.threshold";
public static final String HOOK_HIVE_TABLE_IGNORE_PATTERN = CONF_PREFIX + "hive_table.ignore.pattern";
public static final String HOOK_HIVE_TABLE_PRUNE_PATTERN = CONF_PREFIX + "hive_table.prune.pattern";
public static final String HOOK_HIVE_TABLE_CACHE_SIZE = CONF_PREFIX + "hive_table.cache.size";
public static final String DEFAULT_CLUSTER_NAME = "primary";
......@@ -66,8 +76,11 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
private static final int nameCacheTableMaxCount;
private static final int nameCacheRebuildIntervalSeconds;
private static final boolean skipHiveColumnLineageHive20633;
private static final int skipHiveColumnLineageHive20633InputsThreshold;
private static final List<Pattern> hiveTablesToIgnore = new ArrayList<>();
private static final List<Pattern> hiveTablesToPrune = new ArrayList<>();
private static final Map<String, PreprocessAction> hiveTablesCache;
private static HiveHookObjectNamesCache knownObjects = null;
......@@ -85,6 +98,41 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
skipHiveColumnLineageHive20633 = atlasProperties.getBoolean(HOOK_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, false);
skipHiveColumnLineageHive20633InputsThreshold = atlasProperties.getInt(HOOK_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, 15); // skip if avg # of inputs is > 15
String[] patternHiveTablesToIgnore = atlasProperties.getStringArray(HOOK_HIVE_TABLE_IGNORE_PATTERN);
String[] patternHiveTablesToPrune = atlasProperties.getStringArray(HOOK_HIVE_TABLE_PRUNE_PATTERN);
if (patternHiveTablesToIgnore != null) {
for (String pattern : patternHiveTablesToIgnore) {
try {
hiveTablesToIgnore.add(Pattern.compile(pattern));
LOG.info("{}={}", HOOK_HIVE_TABLE_IGNORE_PATTERN, pattern);
} catch (Throwable t) {
LOG.warn("failed to compile pattern {}", pattern, t);
LOG.warn("Ignoring invalid pattern in configuration {}: {}", HOOK_HIVE_TABLE_IGNORE_PATTERN, pattern);
}
}
}
if (patternHiveTablesToPrune != null) {
for (String pattern : patternHiveTablesToPrune) {
try {
hiveTablesToPrune.add(Pattern.compile(pattern));
LOG.info("{}={}", HOOK_HIVE_TABLE_PRUNE_PATTERN, pattern);
} catch (Throwable t) {
LOG.warn("failed to compile pattern {}", pattern, t);
LOG.warn("Ignoring invalid pattern in configuration {}: {}", HOOK_HIVE_TABLE_PRUNE_PATTERN, pattern);
}
}
}
if (!hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty()) {
hiveTablesCache = new LruCache<>(atlasProperties.getInt(HOOK_HIVE_TABLE_CACHE_SIZE, 10000), 0);
} else {
hiveTablesCache = Collections.emptyMap();
}
knownObjects = nameCacheEnabled ? new HiveHookObjectNamesCache(nameCacheDatabaseMaxCount, nameCacheTableMaxCount, nameCacheRebuildIntervalSeconds) : null;
}
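Since hiveTablesCache is declared simply as a Map, the rest of the class interacts with it only through get/put. A minimal sketch of the decision-caching pattern, assuming nothing about LruCache beyond the (capacity, second-argument) constructor used above; the table name is invented:

// hypothetical illustration - not part of this commit
Map<String, PreprocessAction> cache = new LruCache<>(10000, 0); // same arguments as the static block above
PreprocessAction action = cache.get("default.tmp_0101@primary"); // null until the first decision is made
if (action == null) {
    action = PreprocessAction.IGNORE;              // computed by matching the configured patterns
    cache.put("default.tmp_0101@primary", action); // later lookups skip regex evaluation entirely
}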
......@@ -204,6 +252,42 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
return skipHiveColumnLineageHive20633InputsThreshold;
}
public PreprocessAction getPreprocessActionForHiveTable(String qualifiedName) {
PreprocessAction ret = PreprocessAction.NONE;
if (qualifiedName != null && (CollectionUtils.isNotEmpty(hiveTablesToIgnore) || CollectionUtils.isNotEmpty(hiveTablesToPrune))) {
ret = hiveTablesCache.get(qualifiedName);
if (ret == null) {
if (isMatch(qualifiedName, hiveTablesToIgnore)) {
ret = PreprocessAction.IGNORE;
} else if (isMatch(qualifiedName, hiveTablesToPrune)) {
ret = PreprocessAction.PRUNE;
} else {
ret = PreprocessAction.NONE;
}
hiveTablesCache.put(qualifiedName, ret);
}
}
return ret;
}
private boolean isMatch(String name, List<Pattern> patterns) {
boolean ret = false;
for (Pattern p : patterns) {
if (p.matcher(name).matches()) {
ret = true;
break;
}
}
return ret;
}
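To make the lookup concrete, a hypothetical walk-through of getPreprocessActionForHiveTable (pattern and table names invented; 'hook' stands for a HiveHook instance): the first call pays for a regex match and caches the result, the second falls through both pattern lists and caches NONE.

// hypothetical usage - assumes atlas.hook.hive.hive_table.ignore.pattern=.*[.]tmp_.* is configured
PreprocessAction a1 = hook.getPreprocessActionForHiveTable("web.tmp_clicks@primary"); // IGNORE - matched, then cached
PreprocessAction a2 = hook.getPreprocessActionForHiveTable("web.clicks@primary");     // NONE - no pattern matched; also cached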
public static class HiveHookObjectNamesCache {
private final int dbMaxCacheCount;
......
......@@ -85,13 +85,16 @@ public class AlterTableRename extends BaseHiveEvent {
return ret;
}
AtlasEntityWithExtInfo oldTableEntity     = toTableEntity(oldTable);
AtlasEntityWithExtInfo renamedTableEntity = toTableEntity(newTable);
if (oldTableEntity == null || renamedTableEntity == null) {
    return ret;
}
// first update with oldTable info, so that the table will be created if it is not present in Atlas
ret.add(new EntityUpdateRequestV2(getUserName(), new AtlasEntitiesWithExtInfo(oldTableEntity)));
// update qualifiedName for all columns, partitionKeys, storageDesc
String renamedTableQualifiedName = (String) renamedTableEntity.getEntity().getAttribute(ATTRIBUTE_QUALIFIED_NAME);
......
......@@ -19,6 +19,7 @@
package org.apache.atlas.hive.hook.events;
import org.apache.atlas.hive.hook.AtlasHiveHookContext;
import org.apache.atlas.hive.hook.HiveHook.PreprocessAction;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
......@@ -307,7 +308,11 @@ public abstract class BaseHiveEvent {
AtlasEntity entity = toTableEntity(table, ret);
if (entity != null) {
    ret.setEntity(entity);
} else {
    ret = null;
}
return ret;
}
......@@ -315,7 +320,9 @@ public abstract class BaseHiveEvent {
protected AtlasEntity toTableEntity(Table table, AtlasEntitiesWithExtInfo entities) throws Exception {
AtlasEntity ret = toTableEntity(table, (AtlasEntityExtInfo) entities);
if (ret != null) {
    entities.addEntity(ret);
}
return ret;
}
......@@ -341,64 +348,76 @@ public abstract class BaseHiveEvent {
AtlasEntity ret = context.getEntity(tblQualifiedName);
if (ret == null) {
    PreprocessAction action = context.getPreprocessActionForHiveTable(tblQualifiedName);
    if (action == PreprocessAction.IGNORE) {
        LOG.info("ignoring table {}", tblQualifiedName);
    } else {
        ret = new AtlasEntity(HIVE_TYPE_TABLE);
        // if this table was sent in an earlier notification, set 'guid' to null - which will:
        //  - result in this entity not being included in 'referredEntities'
        //  - cause Atlas server to resolve the entity by its qualifiedName
        if (isKnownTable && !isAlterTableOperation()) {
            ret.setGuid(null);
        }
        long createTime     = getTableCreateTime(table);
        long lastAccessTime = table.getLastAccessTime() > 0 ? (table.getLastAccessTime() * MILLIS_CONVERT_FACTOR) : createTime;
        ret.setAttribute(ATTRIBUTE_DB, dbId);
        ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, tblQualifiedName);
        ret.setAttribute(ATTRIBUTE_NAME, table.getTableName().toLowerCase());
        ret.setAttribute(ATTRIBUTE_OWNER, table.getOwner());
        ret.setAttribute(ATTRIBUTE_CREATE_TIME, createTime);
        ret.setAttribute(ATTRIBUTE_LAST_ACCESS_TIME, lastAccessTime);
        ret.setAttribute(ATTRIBUTE_RETENTION, table.getRetention());
        ret.setAttribute(ATTRIBUTE_PARAMETERS, table.getParameters());
        ret.setAttribute(ATTRIBUTE_COMMENT, table.getParameters().get(ATTRIBUTE_COMMENT));
        ret.setAttribute(ATTRIBUTE_TABLE_TYPE, table.getTableType().name());
        ret.setAttribute(ATTRIBUTE_TEMPORARY, table.isTemporary());
        if (table.getViewOriginalText() != null) {
            ret.setAttribute(ATTRIBUTE_VIEW_ORIGINAL_TEXT, table.getViewOriginalText());
        }
        if (table.getViewExpandedText() != null) {
            ret.setAttribute(ATTRIBUTE_VIEW_EXPANDED_TEXT, table.getViewExpandedText());
        }
        boolean pruneTable = table.isTemporary() || action == PreprocessAction.PRUNE;
        if (pruneTable) {
            LOG.info("ignoring details of table {}", tblQualifiedName);
        } else {
            AtlasObjectId     tableId       = getObjectId(ret);
            AtlasEntity       sd            = getStorageDescEntity(tableId, table);
            List<AtlasEntity> partitionKeys = getColumnEntities(tableId, table, table.getPartitionKeys());
            List<AtlasEntity> columns       = getColumnEntities(tableId, table, table.getCols());
            if (entityExtInfo != null) {
                entityExtInfo.addReferredEntity(sd);
                if (partitionKeys != null) {
                    for (AtlasEntity partitionKey : partitionKeys) {
                        entityExtInfo.addReferredEntity(partitionKey);
                    }
                }
                if (columns != null) {
                    for (AtlasEntity column : columns) {
                        entityExtInfo.addReferredEntity(column);
                    }
                }
            }
            ret.setAttribute(ATTRIBUTE_STORAGEDESC, getObjectId(sd));
            ret.setAttribute(ATTRIBUTE_PARTITION_KEYS, getObjectIds(partitionKeys));
            ret.setAttribute(ATTRIBUTE_COLUMNS, getObjectIds(columns));
        }
        context.putEntity(tblQualifiedName, ret);
    }
}
return ret;
......
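Net effect of the branches above: an ignored table yields no entity at all (callers now handle the resulting null), while a pruned or temporary table keeps its table-level attributes but never has sd/partitionKeys/columns set and contributes no referred entities. A hypothetical check, with the variable and attribute lookups invented for illustration:

// hypothetical illustration - not part of this commit
AtlasEntity pruned = toTableEntity(prunedTable, (AtlasEntityExtInfo) null); // table matching a prune pattern
assert pruned.getAttribute("qualifiedName") != null; // identity attributes are kept
assert pruned.getAttribute("columns") == null;       // detail attributes were never set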
......@@ -81,28 +81,30 @@ public class CreateTable extends BaseHiveEvent {
if (table != null) {
AtlasEntity tblEntity = toTableEntity(table, ret);
if (tblEntity != null) {
    if (isHBaseStore(table)) {
        // this creates lineage to the HBase table in case of Hive on HBase
        AtlasEntity hbaseTableEntity = toReferencedHBaseTable(table, ret);
        if (hbaseTableEntity != null) {
            final AtlasEntity processEntity;
            if (TableType.EXTERNAL_TABLE.equals(table.getTableType())) {
                processEntity = getHiveProcessEntity(Collections.singletonList(hbaseTableEntity), Collections.singletonList(tblEntity));
            } else {
                processEntity = getHiveProcessEntity(Collections.singletonList(tblEntity), Collections.singletonList(hbaseTableEntity));
            }
            ret.addEntity(processEntity);
        }
    } else {
        if (TableType.EXTERNAL_TABLE.equals(table.getTableType())) {
            AtlasEntity hdfsPathEntity = getPathEntity(table.getDataLocation(), ret);
            AtlasEntity processEntity  = getHiveProcessEntity(Collections.singletonList(hdfsPathEntity), Collections.singletonList(tblEntity));
            ret.addEntity(processEntity);
            ret.addReferredEntity(hdfsPathEntity);
        }
    }
}
}
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.notification.preprocessor;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.instance.AtlasObjectId;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
public abstract class EntityPreprocessor {
public static final String TYPE_HIVE_COLUMN = "hive_column";
public static final String TYPE_HIVE_COLUMN_LINEAGE = "hive_column_lineage";
public static final String TYPE_HIVE_PROCESS = "hive_process";
public static final String TYPE_HIVE_STORAGEDESC = "hive_storagedesc";
public static final String TYPE_HIVE_TABLE = "hive_table";
public static final String ATTRIBUTE_COLUMNS = "columns";
public static final String ATTRIBUTE_INPUTS = "inputs";
public static final String ATTRIBUTE_OUTPUTS = "outputs";
public static final String ATTRIBUTE_PARTITION_KEYS = "partitionKeys";
public static final String ATTRIBUTE_QUALIFIED_NAME = "qualifiedName";
public static final String ATTRIBUTE_SD = "sd";
public static final char QNAME_SEP_CLUSTER_NAME = '@';
public static final char QNAME_SEP_ENTITY_NAME = '.';
public static final String QNAME_SD_SUFFIX = "_storage";
private static final Map<String, EntityPreprocessor> PREPROCESSOR_MAP = new HashMap<>();
private final String typeName;
static {
EntityPreprocessor[] preprocessors = new EntityPreprocessor[] {
new HivePreprocessor.HiveTablePreprocessor(),
new HivePreprocessor.HiveColumnPreprocessor(),
new HivePreprocessor.HiveProcessPreprocessor(),
new HivePreprocessor.HiveColumnLineageProcessPreprocessor(),
new HivePreprocessor.HiveStorageDescPreprocessor()
};
for (EntityPreprocessor preprocessor : preprocessors) {
PREPROCESSOR_MAP.put(preprocessor.getTypeName(), preprocessor);
}
}
protected EntityPreprocessor(String typeName) {
this.typeName = typeName;
}
public String getTypeName() {
return typeName;
}
public abstract void preprocess(AtlasEntity entity, PreprocessorContext context);
public static EntityPreprocessor getPreprocessor(String typeName) {
return typeName != null ? PREPROCESSOR_MAP.get(typeName) : null;
}
public static String getQualifiedName(AtlasEntity entity) {
Object obj = entity != null ? entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME) : null;
return obj != null ? obj.toString() : null;
}
public String getTypeName(Object obj) {
Object ret = null;
if (obj instanceof AtlasObjectId) {
ret = ((AtlasObjectId) obj).getTypeName();
} else if (obj instanceof Map) {
ret = ((Map) obj).get(AtlasObjectId.KEY_TYPENAME);
} else if (obj instanceof AtlasEntity) {
ret = ((AtlasEntity) obj).getTypeName();
} else if (obj instanceof AtlasEntityWithExtInfo) {
ret = ((AtlasEntityWithExtInfo) obj).getEntity().getTypeName();
}
return ret != null ? ret.toString() : null;
}
public String getQualifiedName(Object obj) {
Map<String, Object> attributes = null;
if (obj instanceof AtlasObjectId) {
attributes = ((AtlasObjectId) obj).getUniqueAttributes();
} else if (obj instanceof Map) {
attributes = (Map) ((Map) obj).get(AtlasObjectId.KEY_UNIQUE_ATTRIBUTES);
} else if (obj instanceof AtlasEntity) {
attributes = ((AtlasEntity) obj).getAttributes();
} else if (obj instanceof AtlasEntityWithExtInfo) {
attributes = ((AtlasEntityWithExtInfo) obj).getEntity().getAttributes();
}
Object ret = attributes != null ? attributes.get(ATTRIBUTE_QUALIFIED_NAME) : null;
return ret != null ? ret.toString() : null;
}
protected boolean isEmpty(Object obj) {
return obj == null || ((obj instanceof Collection) && ((Collection) obj).isEmpty());
}
}
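For context, a minimal sketch of how this registry is consumed: look up a preprocessor by entity type and dispatch. The surrounding loop is hypothetical; only getPreprocessor/preprocess come from the class above, and getEntities from the PreprocessorContext below.

// hypothetical caller illustrating the PREPROCESSOR_MAP lookup-then-dispatch pattern
List<AtlasEntity> entities = context.getEntities(); // may be null for non create/full-update messages
if (entities != null) {
    for (AtlasEntity entity : entities) {
        EntityPreprocessor preprocessor = EntityPreprocessor.getPreprocessor(entity.getTypeName());
        if (preprocessor != null) {
            preprocessor.preprocess(entity, context); // e.g. HiveTablePreprocessor for "hive_table"
        }
    }
}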
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.notification.preprocessor;
import org.apache.atlas.kafka.AtlasKafkaMessage;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.notification.HookNotification;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
public class PreprocessorContext {
private static final Logger LOG = LoggerFactory.getLogger(PreprocessorContext.class);
public enum PreprocessAction { NONE, IGNORE, PRUNE }
private final AtlasKafkaMessage<HookNotification> kafkaMessage;
private final AtlasEntitiesWithExtInfo entitiesWithExtInfo;
private final List<Pattern> hiveTablesToIgnore;
private final List<Pattern> hiveTablesToPrune;
private final Map<String, PreprocessAction> hiveTablesCache;
private final Set<String> ignoredEntities = new HashSet<>();
private final Set<String> prunedEntities = new HashSet<>();
private final Set<String> referredEntitiesToMove = new HashSet<>();
public PreprocessorContext(AtlasKafkaMessage<HookNotification> kafkaMessage, List<Pattern> hiveTablesToIgnore, List<Pattern> hiveTablesToPrune, Map<String, PreprocessAction> hiveTablesCache) {
this.kafkaMessage = kafkaMessage;
this.hiveTablesToIgnore = hiveTablesToIgnore;
this.hiveTablesToPrune = hiveTablesToPrune;
this.hiveTablesCache = hiveTablesCache;
final HookNotification message = kafkaMessage.getMessage();
switch (message.getType()) {
case ENTITY_CREATE_V2:
entitiesWithExtInfo = ((HookNotification.EntityCreateRequestV2) message).getEntities();
break;
case ENTITY_FULL_UPDATE_V2:
entitiesWithExtInfo = ((HookNotification.EntityUpdateRequestV2) message).getEntities();
break;
default:
entitiesWithExtInfo = null;
break;
}
}
public AtlasKafkaMessage<HookNotification> getKafkaMessage() {
return kafkaMessage;
}
public long getKafkaMessageOffset() {
return kafkaMessage.getOffset();
}
public int getKafkaPartition() {
return kafkaMessage.getPartition();
}
public List<AtlasEntity> getEntities() {
return entitiesWithExtInfo != null ? entitiesWithExtInfo.getEntities() : null;
}
public Map<String, AtlasEntity> getReferredEntities() {
return entitiesWithExtInfo != null ? entitiesWithExtInfo.getReferredEntities() : null;
}
public AtlasEntity getEntity(String guid) {
return entitiesWithExtInfo != null && guid != null ? entitiesWithExtInfo.getEntity(guid) : null;
}
public Set<String> getIgnoredEntities() { return ignoredEntities; }
public Set<String> getPrunedEntities() { return prunedEntities; }
public Set<String> getReferredEntitiesToMove() { return referredEntitiesToMove; }
public PreprocessAction getPreprocessActionForHiveTable(String qualifiedName) {
PreprocessAction ret = PreprocessAction.NONE;
if (qualifiedName != null && (CollectionUtils.isNotEmpty(hiveTablesToIgnore) || CollectionUtils.isNotEmpty(hiveTablesToPrune))) {
ret = hiveTablesCache.get(qualifiedName);
if (ret == null) {
if (isMatch(qualifiedName, hiveTablesToIgnore)) {
ret = PreprocessAction.IGNORE;
} else if (isMatch(qualifiedName, hiveTablesToPrune)) {
ret = PreprocessAction.PRUNE;
} else {
ret = PreprocessAction.NONE;
}
hiveTablesCache.put(qualifiedName, ret);
}
}
return ret;
}
public boolean isIgnoredEntity(String guid) {
return guid != null ? ignoredEntities.contains(guid) : false;
}
public boolean isPrunedEntity(String guid) {
return guid != null ? prunedEntities.contains(guid) : false;
}
public void addToIgnoredEntities(AtlasEntity entity) {
if (!ignoredEntities.contains(entity.getGuid())) {
ignoredEntities.add(entity.getGuid());
LOG.info("ignored entity: typeName={}, qualifiedName={}. topic-offset={}, partition={}", entity.getTypeName(), EntityPreprocessor.getQualifiedName(entity), getKafkaMessageOffset(), getKafkaPartition());
}
}
public void addToPrunedEntities(AtlasEntity entity) {
if (!prunedEntities.contains(entity.getGuid())) {
prunedEntities.add(entity.getGuid());
LOG.info("pruned entity: typeName={}, qualifiedName={} topic-offset={}, partition={}", entity.getTypeName(), EntityPreprocessor.getQualifiedName(entity), getKafkaMessageOffset(), getKafkaPartition());
}
}
public void addToIgnoredEntities(String guid) {
if (guid != null) {
ignoredEntities.add(guid);
}
}
public void addToPrunedEntities(String guid) {
if (guid != null) {
prunedEntities.add(guid);
}
}
public void addToReferredEntitiesToMove(String guid) {
if (guid != null) {
referredEntitiesToMove.add(guid);
}
}
public void addToIgnoredEntities(Object obj) {
collectGuids(obj, ignoredEntities);
}
public void addToPrunedEntities(Object obj) {
collectGuids(obj, prunedEntities);
}
public String getGuid(Object obj) {
Object ret = null;
if (obj instanceof AtlasObjectId) {
ret = ((AtlasObjectId) obj).getGuid();
} else if (obj instanceof Map) {
ret = ((Map) obj).get(AtlasObjectId.KEY_GUID);
} else if (obj instanceof AtlasEntity) {
ret = ((AtlasEntity) obj).getGuid();
} else if (obj instanceof AtlasEntity.AtlasEntityWithExtInfo) {
ret = ((AtlasEntity.AtlasEntityWithExtInfo) obj).getEntity().getGuid();
}
return ret != null ? ret.toString() : null;
}
private boolean isMatch(String name, List<Pattern> patterns) {
boolean ret = false;
for (Pattern p : patterns) {
if (p.matcher(name).matches()) {
ret = true;
break;
}
}
return ret;
}
private void collectGuids(Object obj, Set<String> guids) {
if (obj != null) {
if (obj instanceof Collection) {
Collection objList = (Collection) obj;
for (Object objElem : objList) {
collectGuid(objElem, guids);
}
} else {
collectGuid(obj, guids);
}
}
}
private void collectGuid(Object obj, Set<String> guids) {
String guid = getGuid(obj);
if (guid != null) {
guids.add(guid);
}
}
}
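Finally, a hypothetical preprocess() body showing how the context API above fits together. This is not the actual HiveTablePreprocessor (its real logic lives in HivePreprocessor.java, which this page does not show); the "columns" attribute name comes from EntityPreprocessor.ATTRIBUTE_COLUMNS.

// hypothetical sketch - not the committed implementation
public void preprocess(AtlasEntity entity, PreprocessorContext context) {
    PreprocessAction action = context.getPreprocessActionForHiveTable(EntityPreprocessor.getQualifiedName(entity));
    if (action == PreprocessAction.IGNORE) {
        context.addToIgnoredEntities(entity);                        // drop the table, remember its guid
    } else if (action == PreprocessAction.PRUNE) {
        context.addToPrunedEntities(entity);                         // keep the table shell
        context.addToPrunedEntities(entity.getAttribute("columns")); // guids gathered via collectGuids()
    }
}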