Commit d5f46e3f by Madhan Neethiraj

ATLAS-2891: updated hook notification processing with option to ignore potentially incorrect hive_column_lineage

ATLAS-2891: updated hook notification processing with option to ignore potentially incorrect hive_column_lineage (cherry picked from commit 20215f3dd74b16fe4a7a9c8eb21b17925256f4f9)
parent 4128f5d2
......@@ -85,6 +85,14 @@ public class AtlasHiveHookContext {
return hook.getClusterName();
}
/**
 * Whether potentially-incorrect hive_column_lineage entities (HIVE-20633) should be
 * skipped; delegates to the owning hook's configured flag.
 */
public boolean getSkipHiveColumnLineageHive20633() {
return hook.getSkipHiveColumnLineageHive20633();
}
/**
 * Input-count threshold above which hive_column_lineage entities are considered
 * suspect under HIVE-20633; delegates to the owning hook's configured value.
 */
public int getSkipHiveColumnLineageHive20633InputsThreshold() {
return hook.getSkipHiveColumnLineageHive20633InputsThreshold();
}
/**
 * Builds the qualified name of a Hive database as
 * dbName + QNAME_SEP_CLUSTER_NAME + clusterName.
 * Note: only the database-name-plus-separator portion is lower-cased; the cluster
 * name's case is preserved as returned by getClusterName().
 */
public String getQualifiedName(Database db) {
return (db.getName() + QNAME_SEP_CLUSTER_NAME).toLowerCase() + getClusterName();
}
......
......@@ -51,6 +51,8 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
public static final String HOOK_NAME_CACHE_DATABASE_COUNT = CONF_PREFIX + "name.cache.database.count";
public static final String HOOK_NAME_CACHE_TABLE_COUNT = CONF_PREFIX + "name.cache.table.count";
public static final String HOOK_NAME_CACHE_REBUID_INTERVAL_SEC = CONF_PREFIX + "name.cache.rebuild.interval.seconds";
public static final String HOOK_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633 = CONF_PREFIX + "skip.hive_column_lineage.hive-20633";
public static final String HOOK_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD = CONF_PREFIX + "skip.hive_column_lineage.hive-20633.inputs.threshold";
public static final String DEFAULT_CLUSTER_NAME = "primary";
......@@ -62,6 +64,9 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
private static final int nameCacheTableMaxCount;
private static final int nameCacheRebuildIntervalSeconds;
private static final boolean skipHiveColumnLineageHive20633;
private static final int skipHiveColumnLineageHive20633InputsThreshold;
private static HiveHookObjectNamesCache knownObjects = null;
static {
......@@ -74,6 +79,8 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
nameCacheDatabaseMaxCount = atlasProperties.getInt(HOOK_NAME_CACHE_DATABASE_COUNT, 10000);
nameCacheTableMaxCount = atlasProperties.getInt(HOOK_NAME_CACHE_TABLE_COUNT, 10000);
nameCacheRebuildIntervalSeconds = atlasProperties.getInt(HOOK_NAME_CACHE_REBUID_INTERVAL_SEC, 60 * 60); // 60 minutes default
skipHiveColumnLineageHive20633 = atlasProperties.getBoolean(HOOK_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, false);
skipHiveColumnLineageHive20633InputsThreshold = atlasProperties.getInt(HOOK_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, 5); // skip greater-than 5 inputs by default
knownObjects = nameCacheEnabled ? new HiveHookObjectNamesCache(nameCacheDatabaseMaxCount, nameCacheTableMaxCount, nameCacheRebuildIntervalSeconds) : null;
}
......@@ -182,6 +189,14 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
return clusterName;
}
/**
 * Returns the value of atlas.hook.hive.skip.hive_column_lineage.hive-20633
 * (default false), read once in the static initializer.
 */
public boolean getSkipHiveColumnLineageHive20633() {
return skipHiveColumnLineageHive20633;
}
/**
 * Returns the value of atlas.hook.hive.skip.hive_column_lineage.hive-20633.inputs.threshold
 * (default 5), read once in the static initializer.
 */
public int getSkipHiveColumnLineageHive20633InputsThreshold() {
return skipHiveColumnLineageHive20633InputsThreshold;
}
public static class HiveHookObjectNamesCache {
private final int dbMaxCacheCount;
......
......@@ -136,10 +136,18 @@ public class CreateHiveProcess extends BaseHiveEvent {
return;
}
final List<AtlasEntity> columnLineages = new ArrayList<>();
boolean isSameInputsSize = true;
int lineageInputsSize = -1;
for (Map.Entry<DependencyKey, Dependency> entry : lineageInfo.entrySet()) {
String outputColName = getQualifiedName(entry.getKey());
AtlasEntity outputColumn = context.getEntity(outputColName);
if (LOG.isDebugEnabled()) {
LOG.debug("processColumnLineage(): DependencyKey={}; Dependency={}", entry.getKey(), entry.getValue());
}
if (outputColumn == null) {
LOG.warn("column-lineage: non-existing output-column {}", outputColName);
......@@ -165,6 +173,12 @@ public class CreateHiveProcess extends BaseHiveEvent {
continue;
}
if (lineageInputsSize == -1) {
lineageInputsSize = inputColumns.size();
} else if (lineageInputsSize != inputColumns.size()) {
isSameInputsSize = false;
}
AtlasEntity columnLineageProcess = new AtlasEntity(HIVE_TYPE_COLUMN_LINEAGE);
columnLineageProcess.setAttribute(ATTRIBUTE_NAME, hiveProcess.getAttribute(ATTRIBUTE_NAME) + ":" + outputColumn.getAttribute(ATTRIBUTE_NAME));
......@@ -175,7 +189,17 @@ public class CreateHiveProcess extends BaseHiveEvent {
columnLineageProcess.setAttribute(ATTRIBUTE_DEPENDENCY_TYPE, entry.getValue().getType());
columnLineageProcess.setAttribute(ATTRIBUTE_EXPRESSION, entry.getValue().getExpr());
entities.addEntity(columnLineageProcess);
columnLineages.add(columnLineageProcess);
}
boolean skipColumnLineage = context.getSkipHiveColumnLineageHive20633() && isSameInputsSize && lineageInputsSize > context.getSkipHiveColumnLineageHive20633InputsThreshold();
if (!skipColumnLineage) {
for (AtlasEntity columnLineage : columnLineages) {
entities.addEntity(columnLineage);
}
} else {
LOG.warn("skipping {} hive_column_lineage entities, each having {} inputs", columnLineages.size(), lineageInputsSize);
}
}
......
......@@ -75,7 +75,14 @@ public class AtlasKafkaConsumer<T> extends AbstractNotificationConsumer<T> {
record.topic(), record.partition(), record.offset(), record.key(), record.value());
}
T message = deserializer.deserialize(record.value().toString());
T message = null;
try {
message = deserializer.deserialize(record.value().toString());
} catch (OutOfMemoryError excp) {
LOG.error("Ignoring message that failed to deserialize: topic={}, partition={}, offset={}, key={}, value={}",
record.topic(), record.partition(), record.offset(), record.key(), record.value(), excp);
}
if (message == null) {
continue;
......
......@@ -29,8 +29,9 @@ import org.apache.atlas.RequestContext;
import org.apache.atlas.ha.HAConfiguration;
import org.apache.atlas.kafka.AtlasKafkaMessage;
import org.apache.atlas.listener.ActiveStateChangeHandler;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.notification.HookNotification;
import org.apache.atlas.model.notification.HookNotification.EntityCreateRequestV2;
......@@ -55,6 +56,7 @@ import org.apache.atlas.web.filters.AuditFilter;
import org.apache.atlas.web.filters.AuditFilter.AuditLog;
import org.apache.atlas.web.service.ServiceState;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.StringUtils;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -63,7 +65,11 @@ import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import javax.inject.Inject;
import java.util.*;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.ListIterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
......@@ -82,6 +88,9 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
private static final int SC_OK = 200;
private static final int SC_BAD_REQUEST = 400;
private static final String TYPE_HIVE_COLUMN_LINEAGE = "hive_column_lineage";
private static final String ATTRIBUTE_INPUTS = "inputs";
private static final String THREADNAME_PREFIX = NotificationHookConsumer.class.getSimpleName();
public static final String CONSUMER_THREADS_PROPERTY = "atlas.notification.hook.numthreads";
......@@ -91,6 +100,9 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
public static final String CONSUMER_MIN_RETRY_INTERVAL = "atlas.notification.consumer.min.retry.interval";
public static final String CONSUMER_MAX_RETRY_INTERVAL = "atlas.notification.consumer.max.retry.interval";
public static final String CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633 = "atlas.notification.consumer.skip.hive_column_lineage.hive-20633";
public static final String CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD = "atlas.notification.consumer.skip.hive_column_lineage.hive-20633.inputs.threshold";
public static final int SERVER_READY_WAIT_TIME_MS = 1000;
private final AtlasEntityStore atlasEntityStore;
......@@ -101,6 +113,8 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
private final int failedMsgCacheSize;
private final int minWaitDuration;
private final int maxWaitDuration;
private final boolean skipHiveColumnLineageHive20633;
private final int skipHiveColumnLineageHive20633InputsThreshold;
private NotificationInterface notificationInterface;
private ExecutorService executors;
......@@ -124,10 +138,16 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
this.applicationProperties = ApplicationProperties.get();
maxRetries = applicationProperties.getInt(CONSUMER_RETRIES_PROPERTY, 3);
failedMsgCacheSize = applicationProperties.getInt(CONSUMER_FAILEDCACHESIZE_PROPERTY, 20);
failedMsgCacheSize = applicationProperties.getInt(CONSUMER_FAILEDCACHESIZE_PROPERTY, 1);
consumerRetryInterval = applicationProperties.getInt(CONSUMER_RETRY_INTERVAL, 500);
minWaitDuration = applicationProperties.getInt(CONSUMER_MIN_RETRY_INTERVAL, consumerRetryInterval); // 500 ms by default
maxWaitDuration = applicationProperties.getInt(CONSUMER_MAX_RETRY_INTERVAL, minWaitDuration * 60); // 30 sec by default
skipHiveColumnLineageHive20633 = applicationProperties.getBoolean(CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, false);
skipHiveColumnLineageHive20633InputsThreshold = applicationProperties.getInt(CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, 5); // skip greater-than 5 inputs by default
LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, skipHiveColumnLineageHive20633);
LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, skipHiveColumnLineageHive20633InputsThreshold);
}
@Override
......@@ -367,6 +387,8 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
return;
}
preProcessNotificationMessage(kafkaMsg);
// Used for intermediate conversions during create and update
for (int numRetries = 0; numRetries < maxRetries; numRetries++) {
if (LOG.isDebugEnabled()) {
......@@ -636,6 +658,80 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
}
}
/**
 * Pre-processes a hook notification message before it is handed to the entity store.
 * Currently the only pre-processing step is the HIVE-20633 column-lineage pruning.
 */
private void preProcessNotificationMessage(AtlasKafkaMessage<HookNotification> kafkaMsg) {
skipHiveColumnLineage(kafkaMsg);
}
/**
 * Workaround for HIVE-20633: in some cases Hive reports incorrect column lineage in
 * which every hive_column_lineage entity in a message carries the same (large) set of
 * inputs. When that signature is detected — more than one lineage entity, all with the
 * same input count, and that count above the configured threshold — all
 * hive_column_lineage entities are removed from the message in place.
 *
 * No-op unless atlas.notification.consumer.skip.hive_column_lineage.hive-20633 is true,
 * and only ENTITY_CREATE_V2 / ENTITY_FULL_UPDATE_V2 messages are examined.
 *
 * @param kafkaMessage the hook notification message to (possibly) prune in place
 */
private void skipHiveColumnLineage(AtlasKafkaMessage<HookNotification> kafkaMessage) {
    if (!skipHiveColumnLineageHive20633) {
        return;
    }

    final AtlasEntitiesWithExtInfo entities = getEntitiesFromMessage(kafkaMessage.getMessage());

    if (entities == null || entities.getEntities() == null) {
        return;
    }

    // find if all hive_column_lineage entities have the same number of inputs, which is
    // likely to be caused by HIVE-20633 that results in incorrect lineage in some cases
    boolean isSameInputsSize  = true;
    int     lineageInputsSize = -1;
    int     lineageCount      = 0;

    for (AtlasEntity entity : entities.getEntities()) {
        if (!StringUtils.equals(entity.getTypeName(), TYPE_HIVE_COLUMN_LINEAGE)) {
            continue;
        }

        Object objInputs = entity.getAttribute(ATTRIBUTE_INPUTS);

        if (objInputs instanceof Collection) {
            Collection<?> inputs = (Collection<?>) objInputs; // wildcard: only size() is needed

            lineageCount++;

            if (lineageInputsSize == -1) { // first entry
                lineageInputsSize = inputs.size();
            } else if (inputs.size() != lineageInputsSize) {
                isSameInputsSize = false;

                break; // differing input counts: message is not affected by HIVE-20633
            }
        }
    }

    if (lineageCount > 1 && isSameInputsSize && lineageInputsSize > skipHiveColumnLineageHive20633InputsThreshold) {
        int numRemovedEntities = 0;

        // ListIterator.remove() is required here: the list is mutated while iterating
        for (ListIterator<AtlasEntity> iter = entities.getEntities().listIterator(); iter.hasNext(); ) {
            if (StringUtils.equals(iter.next().getTypeName(), TYPE_HIVE_COLUMN_LINEAGE)) {
                iter.remove();

                numRemovedEntities++;
            }
        }

        if (numRemovedEntities > 0) {
            LOG.warn("removed {} hive_column_lineage entities, each having {} inputs. offset={}, partition={}", numRemovedEntities, lineageInputsSize, kafkaMessage.getOffset(), kafkaMessage.getPartition());
        }
    }
}

/**
 * Returns the entities payload of a V2 create/full-update notification;
 * null for all other message types (which carry no prunable entity list).
 */
private AtlasEntitiesWithExtInfo getEntitiesFromMessage(HookNotification message) {
    switch (message.getType()) {
        case ENTITY_CREATE_V2:
            return ((EntityCreateRequestV2) message).getEntities();

        case ENTITY_FULL_UPDATE_V2:
            return ((EntityUpdateRequestV2) message).getEntities();

        default:
            return null;
    }
}
static class FailedCommitOffsetRecorder {
private Long currentOffset;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment