Commit 82e04037 by Madhan Neethiraj

ATLAS-2891: updated hook notification processing with option to ignore…

ATLAS-2891: updated hook notification processing with option to ignore potentially incorrect hive_column_lineage - #3
parent 105d6b7f
...@@ -80,7 +80,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { ...@@ -80,7 +80,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
nameCacheTableMaxCount = atlasProperties.getInt(HOOK_NAME_CACHE_TABLE_COUNT, 10000); nameCacheTableMaxCount = atlasProperties.getInt(HOOK_NAME_CACHE_TABLE_COUNT, 10000);
nameCacheRebuildIntervalSeconds = atlasProperties.getInt(HOOK_NAME_CACHE_REBUID_INTERVAL_SEC, 60 * 60); // 60 minutes default nameCacheRebuildIntervalSeconds = atlasProperties.getInt(HOOK_NAME_CACHE_REBUID_INTERVAL_SEC, 60 * 60); // 60 minutes default
skipHiveColumnLineageHive20633 = atlasProperties.getBoolean(HOOK_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, false); skipHiveColumnLineageHive20633 = atlasProperties.getBoolean(HOOK_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, false);
skipHiveColumnLineageHive20633InputsThreshold = atlasProperties.getInt(HOOK_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, 5); // skip greater-than 5 inputs by default skipHiveColumnLineageHive20633InputsThreshold = atlasProperties.getInt(HOOK_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, 15); // skip if avg # of inputs is > 15
knownObjects = nameCacheEnabled ? new HiveHookObjectNamesCache(nameCacheDatabaseMaxCount, nameCacheTableMaxCount, nameCacheRebuildIntervalSeconds) : null; knownObjects = nameCacheEnabled ? new HiveHookObjectNamesCache(nameCacheDatabaseMaxCount, nameCacheTableMaxCount, nameCacheRebuildIntervalSeconds) : null;
} }
......
...@@ -136,9 +136,8 @@ public class CreateHiveProcess extends BaseHiveEvent { ...@@ -136,9 +136,8 @@ public class CreateHiveProcess extends BaseHiveEvent {
return; return;
} }
final List<AtlasEntity> columnLineages = new ArrayList<>(); final List<AtlasEntity> columnLineages = new ArrayList<>();
boolean isSameInputsSize = true; int lineageInputsCount = 0;
int lineageInputsSize = -1;
for (Map.Entry<DependencyKey, Dependency> entry : lineageInfo.entrySet()) { for (Map.Entry<DependencyKey, Dependency> entry : lineageInfo.entrySet()) {
String outputColName = getQualifiedName(entry.getKey()); String outputColName = getQualifiedName(entry.getKey());
...@@ -173,11 +172,7 @@ public class CreateHiveProcess extends BaseHiveEvent { ...@@ -173,11 +172,7 @@ public class CreateHiveProcess extends BaseHiveEvent {
continue; continue;
} }
if (lineageInputsSize == -1) { lineageInputsCount += inputColumns.size();
lineageInputsSize = inputColumns.size();
} else if (lineageInputsSize != inputColumns.size()) {
isSameInputsSize = false;
}
AtlasEntity columnLineageProcess = new AtlasEntity(HIVE_TYPE_COLUMN_LINEAGE); AtlasEntity columnLineageProcess = new AtlasEntity(HIVE_TYPE_COLUMN_LINEAGE);
...@@ -192,14 +187,15 @@ public class CreateHiveProcess extends BaseHiveEvent { ...@@ -192,14 +187,15 @@ public class CreateHiveProcess extends BaseHiveEvent {
columnLineages.add(columnLineageProcess); columnLineages.add(columnLineageProcess);
} }
boolean skipColumnLineage = context.getSkipHiveColumnLineageHive20633() && columnLineages.size() > 1 && isSameInputsSize && lineageInputsSize > context.getSkipHiveColumnLineageHive20633InputsThreshold(); float avgInputsCount = columnLineages.size() > 0 ? (((float) lineageInputsCount) / columnLineages.size()) : 0;
boolean skipColumnLineage = context.getSkipHiveColumnLineageHive20633() && avgInputsCount > context.getSkipHiveColumnLineageHive20633InputsThreshold();
if (!skipColumnLineage) { if (!skipColumnLineage) {
for (AtlasEntity columnLineage : columnLineages) { for (AtlasEntity columnLineage : columnLineages) {
entities.addEntity(columnLineage); entities.addEntity(columnLineage);
} }
} else { } else {
LOG.warn("skipping {} hive_column_lineage entities, each having {} inputs", columnLineages.size(), lineageInputsSize); LOG.warn("skipped {} hive_column_lineage entities. Average # of inputs={}, threshold={}, total # of inputs={}", columnLineages.size(), avgInputsCount, context.getSkipHiveColumnLineageHive20633InputsThreshold(), lineageInputsCount);
} }
} }
......
...@@ -144,7 +144,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl ...@@ -144,7 +144,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
maxWaitDuration = applicationProperties.getInt(CONSUMER_MAX_RETRY_INTERVAL, minWaitDuration * 60); // 30 sec by default maxWaitDuration = applicationProperties.getInt(CONSUMER_MAX_RETRY_INTERVAL, minWaitDuration * 60); // 30 sec by default
skipHiveColumnLineageHive20633 = applicationProperties.getBoolean(CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, false); skipHiveColumnLineageHive20633 = applicationProperties.getBoolean(CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, false);
skipHiveColumnLineageHive20633InputsThreshold = applicationProperties.getInt(CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, 5); // skip greater-than 5 inputs by default skipHiveColumnLineageHive20633InputsThreshold = applicationProperties.getInt(CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, 15); // skip if avg # of inputs is > 15
LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, skipHiveColumnLineageHive20633); LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, skipHiveColumnLineageHive20633);
LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, skipHiveColumnLineageHive20633InputsThreshold); LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, skipHiveColumnLineageHive20633InputsThreshold);
...@@ -685,34 +685,29 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl ...@@ -685,34 +685,29 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
} }
if (entities != null && entities.getEntities() != null) { if (entities != null && entities.getEntities() != null) {
boolean isSameInputsSize = true; int lineageCount = 0;
int lineageInputsSize = -1; int lineageInputsCount = 0;
int lineageCount = 0;
// find if all hive_column_lineage entities have same number of inputs, which is likely to be caused by HIVE-20633 that results in incorrect lineage in some cases // find if all hive_column_lineage entities have same number of inputs, which is likely to be caused by HIVE-20633 that results in incorrect lineage in some cases
for (ListIterator<AtlasEntity> iter = entities.getEntities().listIterator(); iter.hasNext(); ) { for (ListIterator<AtlasEntity> iter = entities.getEntities().listIterator(); iter.hasNext(); ) {
AtlasEntity entity = iter.next(); AtlasEntity entity = iter.next();
if (StringUtils.equals(entity.getTypeName(), TYPE_HIVE_COLUMN_LINEAGE)) { if (StringUtils.equals(entity.getTypeName(), TYPE_HIVE_COLUMN_LINEAGE)) {
lineageCount++;
Object objInputs = entity.getAttribute(ATTRIBUTE_INPUTS); Object objInputs = entity.getAttribute(ATTRIBUTE_INPUTS);
if (objInputs instanceof Collection) { if (objInputs instanceof Collection) {
Collection inputs = (Collection) objInputs; Collection inputs = (Collection) objInputs;
lineageCount++; lineageInputsCount += inputs.size();
if (lineageInputsSize == -1) { // first entry
lineageInputsSize = inputs.size();
} else if (inputs.size() != lineageInputsSize) {
isSameInputsSize = false;
break;
}
} }
} }
} }
if (lineageCount > 1 && isSameInputsSize && lineageInputsSize > skipHiveColumnLineageHive20633InputsThreshold) { float avgInputsCount = lineageCount > 0 ? (((float) lineageInputsCount) / lineageCount) : 0;
if (avgInputsCount > skipHiveColumnLineageHive20633InputsThreshold) {
int numRemovedEntities = 0; int numRemovedEntities = 0;
for (ListIterator<AtlasEntity> iter = entities.getEntities().listIterator(); iter.hasNext(); ) { for (ListIterator<AtlasEntity> iter = entities.getEntities().listIterator(); iter.hasNext(); ) {
...@@ -726,7 +721,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl ...@@ -726,7 +721,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
} }
if (numRemovedEntities > 0) { if (numRemovedEntities > 0) {
LOG.warn("removed {} hive_column_lineage entities, each having {} inputs. offset={}, partition={}", numRemovedEntities, lineageInputsSize, kafkaMessage.getOffset(), kafkaMessage.getPartition()); LOG.warn("removed {} hive_column_lineage entities. Average # of inputs={}, threshold={}, total # of inputs={}. topic-offset={}, partition={}", numRemovedEntities, avgInputsCount, skipHiveColumnLineageHive20633InputsThreshold, lineageInputsCount, kafkaMessage.getOffset(), kafkaMessage.getPartition());
} }
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment