Commit 23eacbaf by Madhan Neethiraj

ATLAS-3333: updated notification pre-processing with an option to ignore dummy Hive databases/tables

parent c1f95d17
......@@ -71,7 +71,6 @@ import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.StringUtils;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.DependsOn;
......@@ -116,6 +115,11 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandler
private static final String ATTRIBUTE_INPUTS = "inputs";
private static final String ATTRIBUTE_QUALIFIED_NAME = "qualifiedName";
// from org.apache.hadoop.hive.ql.parse.SemanticAnalyzer
public static final String DUMMY_DATABASE = "_dummy_database";
public static final String DUMMY_TABLE = "_dummy_table";
public static final String VALUES_TMP_TABLE_NAME_PREFIX = "Values__Tmp__Table__";
private static final String THREADNAME_PREFIX = NotificationHookConsumer.class.getSimpleName();
public static final String CONSUMER_THREADS_PROPERTY = "atlas.notification.hook.numthreads";
......@@ -133,6 +137,12 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandler
public static final String CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_PATTERN = "atlas.notification.consumer.preprocess.hive_table.ignore.pattern";
public static final String CONSUMER_PREPROCESS_HIVE_TABLE_PRUNE_PATTERN = "atlas.notification.consumer.preprocess.hive_table.prune.pattern";
public static final String CONSUMER_PREPROCESS_HIVE_TABLE_CACHE_SIZE = "atlas.notification.consumer.preprocess.hive_table.cache.size";
public static final String CONSUMER_PREPROCESS_HIVE_DB_IGNORE_DUMMY_ENABLED = "atlas.notification.consumer.preprocess.hive_db.ignore.dummy.enabled";
public static final String CONSUMER_PREPROCESS_HIVE_DB_IGNORE_DUMMY_NAMES = "atlas.notification.consumer.preprocess.hive_db.ignore.dummy.names";
public static final String CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_DUMMY_ENABLED = "atlas.notification.consumer.preprocess.hive_table.ignore.dummy.enabled";
public static final String CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_DUMMY_NAMES = "atlas.notification.consumer.preprocess.hive_table.ignore.dummy.names";
public static final String CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_NAME_PREFIXES_ENABLED = "atlas.notification.consumer.preprocess.hive_table.ignore.name.prefixes.enabled";
public static final String CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_NAME_PREFIXES = "atlas.notification.consumer.preprocess.hive_table.ignore.name.prefixes";
public static final String CONSUMER_PREPROCESS_HIVE_TYPES_REMOVE_OWNEDREF_ATTRS = "atlas.notification.consumer.preprocess.hive_types.remove.ownedref.attrs";
public static final String CONSUMER_PREPROCESS_RDBMS_TYPES_REMOVE_OWNEDREF_ATTRS = "atlas.notification.consumer.preprocess.rdbms_types.remove.ownedref.attrs";
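Note: each of the three new toggles defaults to true and has a companion list property; when the list is left unset, the SemanticAnalyzer constants above serve as defaults. A minimal sketch of how these properties resolve, assuming Atlas's standard ApplicationProperties accessor and the commons-configuration API this class already imports (the class name and main method are illustrative only):

import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasException;
import org.apache.commons.configuration.Configuration;

public class PreprocessConfigSketch {
    public static void main(String[] args) throws AtlasException {
        Configuration conf = ApplicationProperties.get();

        // each ignore feature is enabled by default
        boolean ignoreDummyDb = conf.getBoolean(
                "atlas.notification.consumer.preprocess.hive_db.ignore.dummy.enabled", true);

        // the name list is optional; an empty array here means the built-in
        // default (_dummy_database) is applied downstream by trimAndPurge()
        String[] dummyDbNames = conf.getStringArray(
                "atlas.notification.consumer.preprocess.hive_db.ignore.dummy.names");

        System.out.println(ignoreDummyDb + " / " + dummyDbNames.length);
    }
}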
......@@ -154,6 +164,9 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandler
private final boolean consumerDisabled;
private final List<Pattern> hiveTablesToIgnore = new ArrayList<>();
private final List<Pattern> hiveTablesToPrune = new ArrayList<>();
private final List<String> hiveDummyDatabasesToIgnore;
private final List<String> hiveDummyTablesToIgnore;
private final List<String> hiveTablePrefixesToIgnore;
private final Map<String, PreprocessAction> hiveTablesCache;
private final boolean hiveTypesRemoveOwnedRefAttrs;
private final boolean rdbmsTypesRemoveOwnedRefAttrs;
......@@ -228,9 +241,47 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandler
hiveTablesCache = Collections.emptyMap();
}
boolean hiveDbIgnoreDummyEnabled = applicationProperties.getBoolean(CONSUMER_PREPROCESS_HIVE_DB_IGNORE_DUMMY_ENABLED, true);
boolean hiveTableIgnoreDummyEnabled = applicationProperties.getBoolean(CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_DUMMY_ENABLED, true);
boolean hiveTableIgnoreNamePrefixEnabled = applicationProperties.getBoolean(CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_NAME_PREFIXES_ENABLED, true);
LOG.info("{}={}", CONSUMER_PREPROCESS_HIVE_DB_IGNORE_DUMMY_ENABLED, hiveDbIgnoreDummyEnabled);
LOG.info("{}={}", CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_DUMMY_ENABLED, hiveTableIgnoreDummyEnabled);
LOG.info("{}={}", CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_NAME_PREFIXES_ENABLED, hiveTableIgnoreNamePrefixEnabled);
if (hiveDbIgnoreDummyEnabled) {
String[] dummyDatabaseNames = applicationProperties.getStringArray(CONSUMER_PREPROCESS_HIVE_DB_IGNORE_DUMMY_NAMES);
hiveDummyDatabasesToIgnore = trimAndPurge(dummyDatabaseNames, DUMMY_DATABASE);
LOG.info("{}={}", CONSUMER_PREPROCESS_HIVE_DB_IGNORE_DUMMY_NAMES, StringUtils.join(hiveDummyDatabasesToIgnore, ','));
} else {
hiveDummyDatabasesToIgnore = Collections.emptyList();
}
if (hiveTableIgnoreDummyEnabled) {
String[] dummyTableNames = applicationProperties.getStringArray(CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_DUMMY_NAMES);
hiveDummyTablesToIgnore = trimAndPurge(dummyTableNames, DUMMY_TABLE);
LOG.info("{}={}", CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_DUMMY_NAMES, StringUtils.join(hiveDummyTablesToIgnore, ','));
} else {
hiveDummyTablesToIgnore = Collections.emptyList();
}
if (hiveTableIgnoreNamePrefixEnabled) {
String[] ignoreNamePrefixes = applicationProperties.getStringArray(CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_NAME_PREFIXES);
hiveTablePrefixesToIgnore = trimAndPurge(ignoreNamePrefixes, VALUES_TMP_TABLE_NAME_PREFIX);
LOG.info("{}={}", CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_NAME_PREFIXES, StringUtils.join(hiveTablePrefixesToIgnore, ','));
} else {
hiveTablePrefixesToIgnore = Collections.emptyList();
}
hiveTypesRemoveOwnedRefAttrs = applicationProperties.getBoolean(CONSUMER_PREPROCESS_HIVE_TYPES_REMOVE_OWNEDREF_ATTRS, true);
rdbmsTypesRemoveOwnedRefAttrs = applicationProperties.getBoolean(CONSUMER_PREPROCESS_RDBMS_TYPES_REMOVE_OWNEDREF_ATTRS, true);
preprocessEnabled = !hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty() || skipHiveColumnLineageHive20633 || hiveTypesRemoveOwnedRefAttrs || rdbmsTypesRemoveOwnedRefAttrs;
preprocessEnabled = skipHiveColumnLineageHive20633 || hiveTypesRemoveOwnedRefAttrs || rdbmsTypesRemoveOwnedRefAttrs || !hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty() || !hiveDummyDatabasesToIgnore.isEmpty() || !hiveDummyTablesToIgnore.isEmpty() || !hiveTablePrefixesToIgnore.isEmpty();
LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, skipHiveColumnLineageHive20633);
LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, skipHiveColumnLineageHive20633InputsThreshold);
......@@ -366,6 +417,26 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandler
}
}
    private List<String> trimAndPurge(String[] values, String defaultValue) {
        final List<String> ret;

        if (values != null && values.length > 0) {
            ret = new ArrayList<>(values.length);

            for (String val : values) {
                if (StringUtils.isNotBlank(val)) {
                    ret.add(val.trim());
                }
            }
        } else if (StringUtils.isNotBlank(defaultValue)) {
            ret = Collections.singletonList(defaultValue.trim());
        } else {
            ret = Collections.emptyList();
        }

        return ret;
    }
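A standalone restatement of the helper's contract with sample inputs (class name and main are illustrative; the StringUtils checks are replaced with plain-Java equivalents to keep it dependency-free):

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class TrimAndPurgeDemo {
    // mirrors trimAndPurge() above: trim entries and drop blanks; if nothing
    // is configured at all, fall back to the single built-in default
    static List<String> trimAndPurge(String[] values, String defaultValue) {
        final List<String> ret;

        if (values != null && values.length > 0) {
            ret = new ArrayList<>(values.length);

            for (String val : values) {
                if (val != null && !val.trim().isEmpty()) {
                    ret.add(val.trim());
                }
            }
        } else if (defaultValue != null && !defaultValue.trim().isEmpty()) {
            ret = Collections.singletonList(defaultValue.trim());
        } else {
            ret = Collections.emptyList();
        }

        return ret;
    }

    public static void main(String[] args) {
        System.out.println(trimAndPurge(null, "_dummy_database"));                          // [_dummy_database]
        System.out.println(trimAndPurge(new String[] {" tmp ", " ", "x"}, "_dummy_table")); // [tmp, x]
    }
}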
static class AdaptiveWaiter {
private final long increment;
private final long maxDuration;
......@@ -843,7 +914,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandler
PreprocessorContext context = null;
if (preprocessEnabled) {
context = new PreprocessorContext(kafkaMsg, typeRegistry, hiveTablesToIgnore, hiveTablesToPrune, hiveTablesCache, hiveTypesRemoveOwnedRefAttrs, rdbmsTypesRemoveOwnedRefAttrs);
context = new PreprocessorContext(kafkaMsg, typeRegistry, hiveTablesToIgnore, hiveTablesToPrune, hiveTablesCache, hiveDummyDatabasesToIgnore, hiveDummyTablesToIgnore, hiveTablePrefixesToIgnore, hiveTypesRemoveOwnedRefAttrs, rdbmsTypesRemoveOwnedRefAttrs);
if (context.isHivePreprocessEnabled()) {
preprocessHiveTypes(context);
......@@ -878,7 +949,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandler
}
if (entities.size() - count > 0) {
LOG.info("moved {} hive_process/hive_column_lineage entities to end of list (listSize={})", entities.size() - count, entities.size());
LOG.info("preprocess: moved {} hive_process/hive_column_lineage entities to end of list (listSize={}). topic-offset={}, partition={}", entities.size() - count, entities.size(), kafkaMsg.getOffset(), kafkaMsg.getPartition());
}
}
}
......
......@@ -31,6 +31,7 @@ public abstract class EntityPreprocessor {
public static final String TYPE_HIVE_COLUMN_LINEAGE = "hive_column_lineage";
public static final String TYPE_HIVE_PROCESS = "hive_process";
public static final String TYPE_HIVE_STORAGEDESC = "hive_storagedesc";
public static final String TYPE_HIVE_DB = "hive_db";
public static final String TYPE_HIVE_TABLE = "hive_table";
public static final String TYPE_RDBMS_INSTANCE = "rdbms_instance";
public static final String TYPE_RDBMS_DB = "rdbms_db";
......@@ -67,6 +68,7 @@ public abstract class EntityPreprocessor {
static {
EntityPreprocessor[] hivePreprocessors = new EntityPreprocessor[] {
new HivePreprocessor.HiveDbPreprocessor(),
new HivePreprocessor.HiveTablePreprocessor(),
new HivePreprocessor.HiveColumnPreprocessor(),
new HivePreprocessor.HiveProcessPreprocessor(),
......@@ -114,6 +116,12 @@ public abstract class EntityPreprocessor {
        return obj != null ? obj.toString() : null;
    }

    public static String getName(AtlasEntity entity) {
        Object obj = entity != null ? entity.getAttribute(ATTRIBUTE_NAME) : null;

        return obj != null ? obj.toString() : null;
    }
public String getTypeName(Object obj) {
Object ret = null;
......
......@@ -34,6 +34,23 @@ public class HivePreprocessor {
private static final String RELATIONSHIP_TYPE_HIVE_TABLE_PARTITION_KEYS = "hive_table_partitionkeys";
private static final String RELATIONSHIP_TYPE_HIVE_TABLE_STORAGEDESC = "hive_table_storagedesc";
    static class HiveDbPreprocessor extends EntityPreprocessor {
        public HiveDbPreprocessor() {
            super(TYPE_HIVE_DB);
        }

        @Override
        public void preprocess(AtlasEntity entity, PreprocessorContext context) {
            if (!context.isIgnoredEntity(entity.getGuid())) {
                PreprocessAction action = context.getPreprocessActionForHiveDb(getName(entity));

                if (action == PreprocessAction.IGNORE) {
                    context.addToIgnoredEntities(entity);
                }
            }
        }
    }

    static class HiveTablePreprocessor extends EntityPreprocessor {
        public HiveTablePreprocessor() {
            super(TYPE_HIVE_TABLE);
......@@ -147,20 +164,19 @@ public class HivePreprocessor {
if (context.isIgnoredEntity(entity.getGuid())) {
context.addToIgnoredEntities(entity); // so that this will be logged with typeName and qualifiedName
} else {
Object inputs = entity.getAttribute(ATTRIBUTE_INPUTS);
Object outputs = entity.getAttribute(ATTRIBUTE_OUTPUTS);
int inputsCount = (inputs instanceof Collection) ? ((Collection) inputs).size() : 0;
int outputsCount = (outputs instanceof Collection) ? ((Collection) outputs).size() : 0;
Object inputs = entity.getAttribute(ATTRIBUTE_INPUTS);
Object outputs = entity.getAttribute(ATTRIBUTE_OUTPUTS);
int inputsCount = getCollectionSize(inputs);
int outputsCount = getCollectionSize(outputs);
removeIgnoredObjectIds(inputs, context);
removeIgnoredObjectIds(outputs, context);
boolean isInputsEmpty = isEmpty(inputs);
boolean isOutputsEmpty = isEmpty(outputs);
boolean isAnyRemoved = inputsCount > getCollectionSize(inputs) || outputsCount > getCollectionSize(outputs);
// if inputs/outputs became empty due to removal of ignored entities, ignore the process entity as well
if ((inputsCount > 0 && isInputsEmpty) || (outputsCount > 0 && isOutputsEmpty)) {
if (isAnyRemoved && (isInputsEmpty || isOutputsEmpty)) {
context.addToIgnoredEntities(entity);
// since the process entity is ignored, entities referenced by inputs/outputs of this process entity
......@@ -186,6 +202,10 @@ public class HivePreprocessor {
}
}
    private int getCollectionSize(Object obj) {
        return (obj instanceof Collection) ? ((Collection) obj).size() : 0;
    }
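The reworked condition above changes behavior in one subtle way: previously a process was ignored only if a non-empty inputs (or outputs) list was emptied by the removals; now it is also ignored when one side was empty to begin with and the pre-processor removed anything from the other. A distilled illustration, using element counts in place of the actual collections (all names hypothetical):

public class ProcessIgnoreRuleDemo {
    // new rule from the diff: isAnyRemoved && (isInputsEmpty || isOutputsEmpty)
    static boolean shouldIgnore(int inBefore, int inAfter, int outBefore, int outAfter) {
        boolean anyRemoved = inBefore > inAfter || outBefore > outAfter;

        return anyRemoved && (inAfter == 0 || outAfter == 0);
    }

    public static void main(String[] args) {
        System.out.println(shouldIgnore(2, 0, 1, 1)); // true:  inputs emptied by removals
        System.out.println(shouldIgnore(0, 0, 2, 1)); // true:  inputs always empty, an output removed (the old rule kept this entity)
        System.out.println(shouldIgnore(2, 1, 2, 1)); // false: nothing ended up empty
    }
}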
private void removeIgnoredObjectIds(Object obj, PreprocessorContext context) {
if (obj == null || !(obj instanceof Collection)) {
return;
......
......@@ -41,6 +41,8 @@ import java.util.Set;
import java.util.regex.Pattern;
import static org.apache.atlas.model.instance.AtlasObjectId.KEY_GUID;
import static org.apache.atlas.notification.preprocessor.EntityPreprocessor.QNAME_SEP_CLUSTER_NAME;
import static org.apache.atlas.notification.preprocessor.EntityPreprocessor.QNAME_SEP_ENTITY_NAME;
public class PreprocessorContext {
......@@ -54,8 +56,12 @@ public class PreprocessorContext {
private final List<Pattern> hiveTablesToIgnore;
private final List<Pattern> hiveTablesToPrune;
private final Map<String, PreprocessAction> hiveTablesCache;
private final List<String> hiveDummyDatabasesToIgnore;
private final List<String> hiveDummyTablesToIgnore;
private final List<String> hiveTablePrefixesToIgnore;
private final boolean hiveTypesRemoveOwnedRefAttrs;
private final boolean rdbmsTypesRemoveOwnedRefAttrs;
private final boolean isHivePreProcessEnabled;
private final Set<String> ignoredEntities = new HashSet<>();
private final Set<String> prunedEntities = new HashSet<>();
private final Set<String> referredEntitiesToMove = new HashSet<>();
......@@ -64,12 +70,15 @@ public class PreprocessorContext {
private final Map<String, String> guidAssignments = new HashMap<>();
private List<AtlasEntity> postUpdateEntities = null;
public PreprocessorContext(AtlasKafkaMessage<HookNotification> kafkaMessage, AtlasTypeRegistry typeRegistry, List<Pattern> hiveTablesToIgnore, List<Pattern> hiveTablesToPrune, Map<String, PreprocessAction> hiveTablesCache, boolean hiveTypesRemoveOwnedRefAttrs, boolean rdbmsTypesRemoveOwnedRefAttrs) {
public PreprocessorContext(AtlasKafkaMessage<HookNotification> kafkaMessage, AtlasTypeRegistry typeRegistry, List<Pattern> hiveTablesToIgnore, List<Pattern> hiveTablesToPrune, Map<String, PreprocessAction> hiveTablesCache, List<String> hiveDummyDatabasesToIgnore, List<String> hiveDummyTablesToIgnore, List<String> hiveTablePrefixesToIgnore, boolean hiveTypesRemoveOwnedRefAttrs, boolean rdbmsTypesRemoveOwnedRefAttrs) {
this.kafkaMessage = kafkaMessage;
this.typeRegistry = typeRegistry;
this.hiveTablesToIgnore = hiveTablesToIgnore;
this.hiveTablesToPrune = hiveTablesToPrune;
this.hiveTablesCache = hiveTablesCache;
this.hiveDummyDatabasesToIgnore = hiveDummyDatabasesToIgnore;
this.hiveDummyTablesToIgnore = hiveDummyTablesToIgnore;
this.hiveTablePrefixesToIgnore = hiveTablePrefixesToIgnore;
this.hiveTypesRemoveOwnedRefAttrs = hiveTypesRemoveOwnedRefAttrs;
this.rdbmsTypesRemoveOwnedRefAttrs = rdbmsTypesRemoveOwnedRefAttrs;
......@@ -88,6 +97,8 @@ public class PreprocessorContext {
entitiesWithExtInfo = null;
break;
}
this.isHivePreProcessEnabled = hiveTypesRemoveOwnedRefAttrs || !hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty() || !hiveDummyDatabasesToIgnore.isEmpty() || !hiveDummyTablesToIgnore.isEmpty() || !hiveTablePrefixesToIgnore.isEmpty();
}
public AtlasKafkaMessage<HookNotification> getKafkaMessage() {
......@@ -107,7 +118,7 @@ public class PreprocessorContext {
public boolean getRdbmsTypesRemoveOwnedRefAttrs() { return rdbmsTypesRemoveOwnedRefAttrs; }
public boolean isHivePreprocessEnabled() {
return !hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty() || hiveTypesRemoveOwnedRefAttrs;
return isHivePreProcessEnabled;
}
public List<AtlasEntity> getEntities() {
......@@ -142,22 +153,78 @@ public class PreprocessorContext {
public List<AtlasEntity> getPostUpdateEntities() { return postUpdateEntities; }
    public PreprocessAction getPreprocessActionForHiveDb(String dbName) {
        PreprocessAction ret = PreprocessAction.NONE;

        if (dbName != null) {
            for (String dummyDbName : hiveDummyDatabasesToIgnore) {
                if (StringUtils.equalsIgnoreCase(dbName, dummyDbName)) {
                    ret = PreprocessAction.IGNORE;

                    break;
                }
            }
        }

        return ret;
    }
public PreprocessAction getPreprocessActionForHiveTable(String qualifiedName) {
PreprocessAction ret = PreprocessAction.NONE;
if (qualifiedName != null && (CollectionUtils.isNotEmpty(hiveTablesToIgnore) || CollectionUtils.isNotEmpty(hiveTablesToPrune))) {
ret = hiveTablesCache.get(qualifiedName);
if (qualifiedName != null) {
if (CollectionUtils.isNotEmpty(hiveTablesToIgnore) || CollectionUtils.isNotEmpty(hiveTablesToPrune)) {
ret = hiveTablesCache.get(qualifiedName);
if (ret == null) {
if (isMatch(qualifiedName, hiveTablesToIgnore)) {
ret = PreprocessAction.IGNORE;
} else if (isMatch(qualifiedName, hiveTablesToPrune)) {
ret = PreprocessAction.PRUNE;
} else {
ret = PreprocessAction.NONE;
if (ret == null) {
if (isMatch(qualifiedName, hiveTablesToIgnore)) {
ret = PreprocessAction.IGNORE;
} else if (isMatch(qualifiedName, hiveTablesToPrune)) {
ret = PreprocessAction.PRUNE;
} else {
ret = PreprocessAction.NONE;
}
hiveTablesCache.put(qualifiedName, ret);
}
}
if (ret != PreprocessAction.IGNORE && (CollectionUtils.isNotEmpty(hiveDummyTablesToIgnore) || CollectionUtils.isNotEmpty(hiveTablePrefixesToIgnore))) {
String tblName = getHiveTableNameFromQualifiedName(qualifiedName);
hiveTablesCache.put(qualifiedName, ret);
if (tblName != null) {
for (String dummyTblName : hiveDummyTablesToIgnore) {
if (StringUtils.equalsIgnoreCase(tblName, dummyTblName)) {
ret = PreprocessAction.IGNORE;
break;
}
}
if (ret != PreprocessAction.IGNORE) {
for (String tableNamePrefix : hiveTablePrefixesToIgnore) {
if (StringUtils.startsWithIgnoreCase(tblName, tableNamePrefix)) {
ret = PreprocessAction.IGNORE;
break;
}
}
}
}
if (ret != PreprocessAction.IGNORE && CollectionUtils.isNotEmpty(hiveDummyDatabasesToIgnore)) {
String dbName = getHiveDbNameFromQualifiedName(qualifiedName);
if (dbName != null) {
for (String dummyDbName : hiveDummyDatabasesToIgnore) {
if (StringUtils.equalsIgnoreCase(dbName, dummyDbName)) {
ret = PreprocessAction.IGNORE;
break;
}
}
}
}
}
}
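Note the layering in the rewritten method: the regex ignore/prune result is computed once and cached per qualifiedName, while the dummy-name and prefix checks are cheap case-insensitive string tests applied on every call, and a cached PRUNE can still be upgraded to IGNORE by them. A dependency-free sketch of those string tests, seeded with the SemanticAnalyzer defaults (class and method names are illustrative):

public class HiveTableIgnoreDemo {
    static final String[] DUMMY_TABLES  = { "_dummy_table" };
    static final String[] NAME_PREFIXES = { "Values__Tmp__Table__" };

    // distilled from the dummy-name and prefix checks above
    static boolean isIgnorableTable(String tblName) {
        for (String dummy : DUMMY_TABLES) {
            if (tblName.equalsIgnoreCase(dummy)) {
                return true;
            }
        }

        for (String prefix : NAME_PREFIXES) {
            if (tblName.regionMatches(true, 0, prefix, 0, prefix.length())) { // case-insensitive startsWith
                return true;
            }
        }

        return false;
    }

    public static void main(String[] args) {
        System.out.println(isIgnorableTable("_DUMMY_TABLE"));           // true
        System.out.println(isIgnorableTable("values__tmp__table__42")); // true
        System.out.println(isIgnorableTable("orders"));                 // false
    }
}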
......@@ -321,6 +388,36 @@ public class PreprocessorContext {
}
}
    public String getHiveTableNameFromQualifiedName(String qualifiedName) {
        String ret      = null;
        int    idxStart = qualifiedName.indexOf(QNAME_SEP_ENTITY_NAME) + 1;

        if (idxStart != 0 && qualifiedName.length() > idxStart) {
            int idxEnd = qualifiedName.indexOf(QNAME_SEP_CLUSTER_NAME, idxStart);

            if (idxEnd != -1) {
                ret = qualifiedName.substring(idxStart, idxEnd);
            }
        }

        return ret;
    }

    public String getHiveDbNameFromQualifiedName(String qualifiedName) {
        String ret    = null;
        int    idxEnd = qualifiedName.indexOf(QNAME_SEP_ENTITY_NAME); // db.table@cluster, db.table.column@cluster

        if (idxEnd == -1) {
            idxEnd = qualifiedName.indexOf(QNAME_SEP_CLUSTER_NAME); // db@cluster
        }

        if (idxEnd != -1) {
            ret = qualifiedName.substring(0, idxEnd);
        }

        return ret;
    }
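Worked examples of the two parsers, assuming QNAME_SEP_ENTITY_NAME is '.' and QNAME_SEP_CLUSTER_NAME is '@' as the inline comments indicate (standalone restatement; class, method, and sample names are hypothetical):

public class HiveQualifiedNameDemo {
    // qualified names: "db@cluster", "db.table@cluster", "db.table.column@cluster"
    static String dbName(String qualifiedName) {
        int idxEnd = qualifiedName.indexOf('.');

        if (idxEnd == -1) {
            idxEnd = qualifiedName.indexOf('@');
        }

        return idxEnd != -1 ? qualifiedName.substring(0, idxEnd) : null;
    }

    static String tableName(String qualifiedName) {
        int idxStart = qualifiedName.indexOf('.') + 1;

        if (idxStart == 0 || idxStart >= qualifiedName.length()) {
            return null;
        }

        int idxEnd = qualifiedName.indexOf('@', idxStart);

        return idxEnd != -1 ? qualifiedName.substring(idxStart, idxEnd) : null;
    }

    public static void main(String[] args) {
        System.out.println(dbName("web.clicks@prod"));    // web
        System.out.println(tableName("web.clicks@prod")); // clicks
        System.out.println(dbName("web@prod"));           // web
        System.out.println(tableName("web@prod"));        // null: no table part
    }
}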
public String getTypeName(Object obj) {
Object ret = null;
......