Commit fcca791b by Sarath Subramanian

ATLAS-3188: Regression: Simple insert queries are being captured in ATLAS_HOOK topic

parent 8b58a326
......@@ -26,10 +26,7 @@ import org.apache.commons.lang.RandomStringUtils;
import org.apache.hadoop.hive.metastore.IHMSHandler;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.events.*;
import org.apache.hadoop.hive.ql.hooks.HookContext;
import org.apache.hadoop.hive.ql.hooks.LineageInfo;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.hooks.*;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.plan.HiveOperation;
......@@ -58,6 +55,9 @@ public class AtlasHiveHookContext {
private final ListenerEvent metastoreEvent;
private final IHMSHandler metastoreHandler;
private boolean isSkippedInputEntity;
private boolean isSkippedOutputEntity;
public AtlasHiveHookContext(HiveHook hook, HiveOperation hiveOperation, HookContext hiveContext,
HiveHookObjectNamesCache knownObjects) throws Exception {
this(hook, hiveOperation, hiveContext, knownObjects, null, null);
......@@ -102,6 +102,34 @@ public class AtlasHiveHookContext {
return hiveContext != null ? hiveContext.getOutputs() : Collections.emptySet();
}
public boolean isSkippedInputEntity() {
return isSkippedInputEntity;
}
public boolean isSkippedOutputEntity() {
return isSkippedOutputEntity;
}
public void registerSkippedEntity(Entity entity) {
if (entity instanceof ReadEntity) {
registerSkippedInputEntity();
} else if (entity instanceof WriteEntity) {
registerSkippedOutputEntity();
}
}
public void registerSkippedInputEntity() {
if (!isSkippedInputEntity) {
isSkippedInputEntity = true;
}
}
public void registerSkippedOutputEntity() {
if (!isSkippedOutputEntity) {
isSkippedOutputEntity = true;
}
}
public LineageInfo getLineageInfo() {
return hiveContext != null ? hiveContext.getLinfo() : null;
}
......
......@@ -18,7 +18,6 @@
package org.apache.atlas.hive.hook.events;
import com.google.common.collect.ImmutableMap;
import org.apache.atlas.hive.hook.AtlasHiveHookContext;
import org.apache.atlas.hive.hook.HiveHook.PreprocessAction;
import org.apache.atlas.model.instance.AtlasEntity;
......@@ -28,7 +27,6 @@ import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityExtInfo;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.instance.AtlasStruct;
import org.apache.atlas.model.notification.HookNotification;
import org.apache.atlas.repository.Constants;
import org.apache.atlas.type.AtlasTypeUtil;
import org.apache.atlas.utils.HdfsNameServiceResolver;
import org.apache.commons.collections.CollectionUtils;
......@@ -52,7 +50,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetAddress;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
......@@ -268,6 +265,8 @@ public abstract class BaseHiveEvent {
Table table = getHive().getTable(dbName, tableName);
ret = toTableEntity(table, entityExtInfo);
} else {
context.registerSkippedEntity(entity);
}
}
break;
......
......@@ -25,7 +25,6 @@ import org.apache.atlas.model.notification.HookNotification;
import org.apache.atlas.model.notification.HookNotification.EntityCreateRequestV2;
import org.apache.commons.collections.CollectionUtils;
import org.apache.hadoop.hive.ql.hooks.Entity;
import org.apache.hadoop.hive.ql.hooks.HookContext;
import org.apache.hadoop.hive.ql.hooks.LineageInfo;
import org.apache.hadoop.hive.ql.hooks.LineageInfo.BaseColumnInfo;
import org.apache.hadoop.hive.ql.hooks.LineageInfo.Dependency;
......@@ -112,7 +111,17 @@ public class CreateHiveProcess extends BaseHiveEvent {
}
}
if (!inputs.isEmpty() || !outputs.isEmpty()) {
boolean skipProcess = inputs.isEmpty() && outputs.isEmpty();
if (!skipProcess) {
if (inputs.isEmpty() && context.isSkippedInputEntity()) {
skipProcess = true;
} else if (outputs.isEmpty() && context.isSkippedOutputEntity()) {
skipProcess = true;
}
}
if (!skipProcess) {
AtlasEntity process = getHiveProcessEntity(inputs, outputs);
ret.addEntity(process);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment