diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java index 6d2517c..2ccfff4 100644 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java @@ -136,8 +136,9 @@ public class CreateHiveProcess extends BaseHiveEvent { return; } - final List<AtlasEntity> columnLineages = new ArrayList<>(); - int lineageInputsCount = 0; + final List<AtlasEntity> columnLineages = new ArrayList<>(); + int lineageInputsCount = 0; + final Set<String> processedOutputCols = new HashSet<>(); for (Map.Entry<DependencyKey, Dependency> entry : lineageInfo.entrySet()) { String outputColName = getQualifiedName(entry.getKey()); @@ -153,6 +154,14 @@ public class CreateHiveProcess extends BaseHiveEvent { continue; } + if (processedOutputCols.contains(outputColName)) { + LOG.warn("column-lineage: duplicate for output-column {}", outputColName); + + continue; + } else { + processedOutputCols.add(outputColName); + } + List<AtlasEntity> inputColumns = new ArrayList<>(); for (BaseColumnInfo baseColumn : getBaseCols(entry.getValue())) { diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java index da1be9e..1cde3d0 100644 --- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java +++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java @@ -69,8 +69,10 @@ import javax.inject.Inject; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.ListIterator; +import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import 
java.util.concurrent.TimeUnit; @@ -91,6 +93,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl private static final int SC_BAD_REQUEST = 400; private static final String TYPE_HIVE_COLUMN_LINEAGE = "hive_column_lineage"; private static final String ATTRIBUTE_INPUTS = "inputs"; + private static final String ATTRIBUTE_QUALIFIED_NAME = "qualifiedName"; private static final String THREADNAME_PREFIX = NotificationHookConsumer.class.getSimpleName(); private static final String ATLAS_HOOK_TOPIC = AtlasConfiguration.NOTIFICATION_HOOK_TOPIC_NAME.getString(); @@ -688,14 +691,34 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl } if (entities != null && entities.getEntities() != null) { - int lineageCount = 0; - int lineageInputsCount = 0; + int lineageCount = 0; + int lineageInputsCount = 0; + int numRemovedEntities = 0; + Set<String> lineageQNames = new HashSet<>(); // find if all hive_column_lineage entities have same number of inputs, which is likely to be caused by HIVE-20633 that results in incorrect lineage in some cases for (ListIterator<AtlasEntity> iter = entities.getEntities().listIterator(); iter.hasNext(); ) { AtlasEntity entity = iter.next(); if (StringUtils.equals(entity.getTypeName(), TYPE_HIVE_COLUMN_LINEAGE)) { + final Object qName = entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME); + + if (qName != null) { + final String qualifiedName = qName.toString(); + + if (lineageQNames.contains(qualifiedName)) { + iter.remove(); + + LOG.warn("removed duplicate hive_column_lineage entity: qualifiedName={}. topic-offset={}, partition={}", qualifiedName, kafkaMessage.getOffset(), kafkaMessage.getPartition()); + + numRemovedEntities++; + + continue; + } else { + lineageQNames.add(qualifiedName); + } + } + lineageCount++; Object objInputs = entity.getAttribute(ATTRIBUTE_INPUTS); @@ -711,8 +734,6 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl float avgInputsCount = lineageCount > 0 ? (((float) lineageInputsCount) / lineageCount) : 0; if (avgInputsCount > skipHiveColumnLineageHive20633InputsThreshold) { - int numRemovedEntities = 0; - for (ListIterator<AtlasEntity> iter = entities.getEntities().listIterator(); iter.hasNext(); ) { AtlasEntity entity = iter.next(); @@ -722,10 +743,10 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl numRemovedEntities++; } } + } - if (numRemovedEntities > 0) { - LOG.warn("removed {} hive_column_lineage entities. Average # of inputs={}, threshold={}, total # of inputs={}. topic-offset={}, partition={}", numRemovedEntities, avgInputsCount, skipHiveColumnLineageHive20633InputsThreshold, lineageInputsCount, kafkaMessage.getOffset(), kafkaMessage.getPartition()); - } + if (numRemovedEntities > 0) { + LOG.warn("removed {} hive_column_lineage entities. Average # of inputs={}, threshold={}, total # of inputs={}. topic-offset={}, partition={}", numRemovedEntities, avgInputsCount, skipHiveColumnLineageHive20633InputsThreshold, lineageInputsCount, kafkaMessage.getOffset(), kafkaMessage.getPartition()); } } }