Commit ed795dc4 by Madhan Neethiraj

ATLAS-2975: updated Hive hook to avoid duplicate column-lineage entities; also…

ATLAS-2975: updated Hive hook to avoid duplicate column-lineage entities; also updated Atlas server to skip duplicate column-lineage entities
parent 84371bcc
......@@ -136,8 +136,9 @@ public class CreateHiveProcess extends BaseHiveEvent {
return;
}
final List<AtlasEntity> columnLineages = new ArrayList<>();
int lineageInputsCount = 0;
final List<AtlasEntity> columnLineages = new ArrayList<>();
int lineageInputsCount = 0;
final Set<String> processedOutputCols = new HashSet<>();
for (Map.Entry<DependencyKey, Dependency> entry : lineageInfo.entrySet()) {
String outputColName = getQualifiedName(entry.getKey());
......@@ -153,6 +154,14 @@ public class CreateHiveProcess extends BaseHiveEvent {
continue;
}
if (processedOutputCols.contains(outputColName)) {
LOG.warn("column-lineage: duplicate for output-column {}", outputColName);
continue;
} else {
processedOutputCols.add(outputColName);
}
List<AtlasEntity> inputColumns = new ArrayList<>();
for (BaseColumnInfo baseColumn : getBaseCols(entry.getValue())) {
......
......@@ -69,8 +69,10 @@ import javax.inject.Inject;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.ListIterator;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
......@@ -91,6 +93,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
private static final int SC_BAD_REQUEST = 400;
private static final String TYPE_HIVE_COLUMN_LINEAGE = "hive_column_lineage";
private static final String ATTRIBUTE_INPUTS = "inputs";
private static final String ATTRIBUTE_QUALIFIED_NAME = "qualifiedName";
private static final String THREADNAME_PREFIX = NotificationHookConsumer.class.getSimpleName();
private static final String ATLAS_HOOK_TOPIC = AtlasConfiguration.NOTIFICATION_HOOK_TOPIC_NAME.getString();
......@@ -688,14 +691,34 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
}
if (entities != null && entities.getEntities() != null) {
int lineageCount = 0;
int lineageInputsCount = 0;
int lineageCount = 0;
int lineageInputsCount = 0;
int numRemovedEntities = 0;
Set<String> lineageQNames = new HashSet<>();
// find if all hive_column_lineage entities have same number of inputs, which is likely to be caused by HIVE-20633 that results in incorrect lineage in some cases
for (ListIterator<AtlasEntity> iter = entities.getEntities().listIterator(); iter.hasNext(); ) {
AtlasEntity entity = iter.next();
if (StringUtils.equals(entity.getTypeName(), TYPE_HIVE_COLUMN_LINEAGE)) {
// Drop hive_column_lineage entities whose qualifiedName was already seen earlier in this
// message; entities without a qualifiedName attribute are left in place and counted below.
final Object qName = entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME);
if (qName != null) {
    final String qualifiedName = qName.toString();
    // Set.add() returns false when the element is already present, i.e. this is a duplicate
    if (!lineageQNames.add(qualifiedName)) {
        iter.remove();
        // exactly one argument per placeholder: qualifiedName, topic-offset, partition
        LOG.warn("removed duplicate hive_column_lineage entity: qualifiedName={}. topic-offset={}, partition={}", qualifiedName, kafkaMessage.getOffset(), kafkaMessage.getPartition());
        numRemovedEntities++;
        // skip the lineageCount / inputs accounting for the removed entity
        continue;
    }
}
lineageCount++;
Object objInputs = entity.getAttribute(ATTRIBUTE_INPUTS);
......@@ -711,8 +734,6 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
float avgInputsCount = lineageCount > 0 ? (((float) lineageInputsCount) / lineageCount) : 0;
if (avgInputsCount > skipHiveColumnLineageHive20633InputsThreshold) {
int numRemovedEntities = 0;
for (ListIterator<AtlasEntity> iter = entities.getEntities().listIterator(); iter.hasNext(); ) {
AtlasEntity entity = iter.next();
......@@ -722,10 +743,10 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
numRemovedEntities++;
}
}
}
if (numRemovedEntities > 0) {
LOG.warn("removed {} hive_column_lineage entities. Average # of inputs={}, threshold={}, total # of inputs={}. topic-offset={}, partition={}", numRemovedEntities, avgInputsCount, skipHiveColumnLineageHive20633InputsThreshold, lineageInputsCount, kafkaMessage.getOffset(), kafkaMessage.getPartition());
}
if (numRemovedEntities > 0) {
LOG.warn("removed {} hive_column_lineage entities. Average # of inputs={}, threshold={}, total # of inputs={}. topic-offset={}, partition={}", numRemovedEntities, avgInputsCount, skipHiveColumnLineageHive20633InputsThreshold, lineageInputsCount, kafkaMessage.getOffset(), kafkaMessage.getPartition());
}
}
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment