Commit 15e5bedb by skoritala Committed by Sarath Subramanian

ATLAS-3413: HMS should create a dummy process linking HDFS path to Hive table…

ATLAS-3413: HMS should create a dummy process linking HDFS path to Hive table for external table lineage Signed-off-by: Sarath Subramanian <sarath@apache.org>
parent 94118657
...@@ -52,15 +52,7 @@ import org.slf4j.LoggerFactory; ...@@ -52,15 +52,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException; import java.io.IOException;
import java.net.URI; import java.net.URI;
import java.util.ArrayList; import java.util.*;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.apache.atlas.hive.bridge.HiveMetaStoreBridge.getDatabaseName; import static org.apache.atlas.hive.bridge.HiveMetaStoreBridge.getDatabaseName;
import static org.apache.atlas.hive.hook.AtlasHiveHookContext.QNAME_SEP_METADATA_NAMESPACE; import static org.apache.atlas.hive.hook.AtlasHiveHookContext.QNAME_SEP_METADATA_NAMESPACE;
...@@ -640,18 +632,29 @@ public abstract class BaseHiveEvent { ...@@ -640,18 +632,29 @@ public abstract class BaseHiveEvent {
} }
protected AtlasEntity getHiveProcessEntity(List<AtlasEntity> inputs, List<AtlasEntity> outputs) throws Exception { protected AtlasEntity getHiveProcessEntity(List<AtlasEntity> inputs, List<AtlasEntity> outputs) throws Exception {
AtlasEntity ret = new AtlasEntity(HIVE_TYPE_PROCESS); AtlasEntity ret = new AtlasEntity(HIVE_TYPE_PROCESS);
String queryStr = getQueryString(); String queryStr = getQueryString();
if (queryStr != null) { if (queryStr != null) {
queryStr = queryStr.toLowerCase().trim(); queryStr = queryStr.toLowerCase().trim();
} }
ret.setAttribute(ATTRIBUTE_NAME, queryStr);
ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, getQualifiedName(inputs, outputs)); ret.setAttribute(ATTRIBUTE_OPERATION_TYPE, getOperationName());
String qualifiedName = getQualifiedName(inputs, outputs);
if (context.isMetastoreHook()) {
HiveOperation operation = context.getHiveOperation();
if (operation == HiveOperation.CREATETABLE || operation == HiveOperation.CREATETABLE_AS_SELECT) {
AtlasEntity table = outputs.get(0);
long createTime = Long.valueOf((Long)table.getAttribute(ATTRIBUTE_CREATE_TIME));
qualifiedName = (String) table.getAttribute(ATTRIBUTE_QUALIFIED_NAME) + QNAME_SEP_PROCESS + createTime;
ret.setAttribute(ATTRIBUTE_NAME, "dummyProcess:" + UUID.randomUUID());
ret.setAttribute(ATTRIBUTE_OPERATION_TYPE, operation.getOperationName());
}
}
ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, qualifiedName);
ret.setRelationshipAttribute(ATTRIBUTE_INPUTS, AtlasTypeUtil.getAtlasRelatedObjectIds(inputs, RELATIONSHIP_DATASET_PROCESS_INPUTS)); ret.setRelationshipAttribute(ATTRIBUTE_INPUTS, AtlasTypeUtil.getAtlasRelatedObjectIds(inputs, RELATIONSHIP_DATASET_PROCESS_INPUTS));
ret.setRelationshipAttribute(ATTRIBUTE_OUTPUTS, AtlasTypeUtil.getAtlasRelatedObjectIds(outputs, RELATIONSHIP_PROCESS_DATASET_OUTPUTS)); ret.setRelationshipAttribute(ATTRIBUTE_OUTPUTS, AtlasTypeUtil.getAtlasRelatedObjectIds(outputs, RELATIONSHIP_PROCESS_DATASET_OUTPUTS));
ret.setAttribute(ATTRIBUTE_NAME, queryStr);
ret.setAttribute(ATTRIBUTE_OPERATION_TYPE, getOperationName());
// We are setting an empty value to these attributes, since now we have a new entity type called hive process // We are setting an empty value to these attributes, since now we have a new entity type called hive process
// execution which captures these values. We have to set empty values here because these attributes are // execution which captures these values. We have to set empty values here because these attributes are
......
...@@ -30,6 +30,8 @@ import org.apache.hadoop.hive.metastore.events.ListenerEvent; ...@@ -30,6 +30,8 @@ import org.apache.hadoop.hive.metastore.events.ListenerEvent;
import org.apache.hadoop.hive.ql.hooks.Entity; import org.apache.hadoop.hive.ql.hooks.Entity;
import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.plan.HiveOperation; import org.apache.hadoop.hive.ql.plan.HiveOperation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
...@@ -38,6 +40,7 @@ import static org.apache.hadoop.hive.metastore.TableType.EXTERNAL_TABLE; ...@@ -38,6 +40,7 @@ import static org.apache.hadoop.hive.metastore.TableType.EXTERNAL_TABLE;
import static org.apache.hadoop.hive.ql.plan.HiveOperation.*; import static org.apache.hadoop.hive.ql.plan.HiveOperation.*;
public class CreateTable extends BaseHiveEvent { public class CreateTable extends BaseHiveEvent {
private static final Logger LOG = LoggerFactory.getLogger(CreateTable.class);
private final boolean skipTempTables; private final boolean skipTempTables;
public CreateTable(AtlasHiveHookContext context, boolean skipTempTables) { public CreateTable(AtlasHiveHookContext context, boolean skipTempTables) {
...@@ -48,7 +51,7 @@ public class CreateTable extends BaseHiveEvent { ...@@ -48,7 +51,7 @@ public class CreateTable extends BaseHiveEvent {
@Override @Override
public List<HookNotification> getNotificationMessages() throws Exception { public List<HookNotification> getNotificationMessages() throws Exception {
List<HookNotification> ret = null; List<HookNotification> ret = null;
AtlasEntitiesWithExtInfo entities = context.isMetastoreHook() ? getHiveMetastoreEntities() : getHiveEntities(); AtlasEntitiesWithExtInfo entities = context.isMetastoreHook() ? getHiveMetastoreEntities() : getHiveEntities();
if (entities != null && CollectionUtils.isNotEmpty(entities.getEntities())) { if (entities != null && CollectionUtils.isNotEmpty(entities.getEntities())) {
...@@ -117,41 +120,70 @@ public class CreateTable extends BaseHiveEvent { ...@@ -117,41 +120,70 @@ public class CreateTable extends BaseHiveEvent {
if (table != null) { if (table != null) {
AtlasEntity tblEntity = toTableEntity(table, ret); AtlasEntity tblEntity = toTableEntity(table, ret);
if (tblEntity != null && !context.isMetastoreHook()) { if (tblEntity != null) {
if (isHBaseStore(table)) { if (isHBaseStore(table)) {
// This create lineage to HBase table in case of Hive on HBase if (context.isMetastoreHook()) {
AtlasEntity hbaseTableEntity = toReferencedHBaseTable(table, ret); //do nothing
} else {
if (hbaseTableEntity != null) { // This create lineage to HBase table in case of Hive on HBase
final AtlasEntity processEntity; AtlasEntity hbaseTableEntity = toReferencedHBaseTable(table, ret);
//not a hive metastore hook
//it is running in the context of Hbase.
if (hbaseTableEntity != null) {
final AtlasEntity processEntity;
if (EXTERNAL_TABLE.equals(table.getTableType())) {
processEntity = getHiveProcessEntity(Collections.singletonList(hbaseTableEntity), Collections.singletonList(tblEntity));
} else {
processEntity = getHiveProcessEntity(Collections.singletonList(tblEntity), Collections.singletonList(hbaseTableEntity));
}
ret.addEntity(processEntity);
if (EXTERNAL_TABLE.equals(table.getTableType())) { AtlasEntity processExecution = getHiveProcessExecutionEntity(processEntity);
processEntity = getHiveProcessEntity(Collections.singletonList(hbaseTableEntity), Collections.singletonList(tblEntity)); ret.addEntity(processExecution);
} else {
processEntity = getHiveProcessEntity(Collections.singletonList(tblEntity), Collections.singletonList(hbaseTableEntity));
} }
ret.addEntity(processEntity);
AtlasEntity processExecution = getHiveProcessExecutionEntity(processEntity);
ret.addEntity(processExecution);
} }
} else { } else {
if (EXTERNAL_TABLE.equals(table.getTableType())) { if (context.isMetastoreHook()) {
AtlasEntity hdfsPathEntity = getPathEntity(table.getDataLocation(), ret); //it is running in the context of HiveMetastore
AtlasEntity processEntity = getHiveProcessEntity(Collections.singletonList(hdfsPathEntity), Collections.singletonList(tblEntity)); //not a hive metastore hook
if (EXTERNAL_TABLE.equals(table.getTableType())) {
AtlasEntity hdfsPathEntity = getPathEntity(table.getDataLocation(), ret);
if(LOG.isDebugEnabled()) {
LOG.debug("Creating a dummy process with lineage from hdfs path to table");
}
AtlasEntity processEntity = getHiveProcessEntity(Collections.singletonList(hdfsPathEntity),
Collections.singletonList(tblEntity));
ret.addEntity(processEntity);
ret.addReferredEntity(hdfsPathEntity);
//hive process entity will be created by hiveserver hook.
}
} else {
//not a hive metastore hook
//it is running in the context of HiveServer2
if (EXTERNAL_TABLE.equals(table.getTableType())) {
AtlasEntity hdfsPathEntity = getPathEntity(table.getDataLocation(), ret);
AtlasEntity processEntity = getHiveProcessEntity(Collections.singletonList(hdfsPathEntity), Collections.singletonList(tblEntity));
ret.addEntity(processEntity); ret.addEntity(processEntity);
ret.addReferredEntity(hdfsPathEntity); ret.addReferredEntity(hdfsPathEntity);
AtlasEntity processExecution = getHiveProcessExecutionEntity(processEntity); AtlasEntity processExecution = getHiveProcessExecutionEntity(processEntity);
ret.addEntity(processExecution); ret.addEntity(processExecution);
}
} }
} }
AtlasEntity tableDDLEntity = createHiveDDLEntity(tblEntity); if (!context.isMetastoreHook()) {
AtlasEntity tableDDLEntity = createHiveDDLEntity(tblEntity);
if (tableDDLEntity != null) { if (tableDDLEntity != null) {
ret.addEntity(tableDDLEntity); ret.addEntity(tableDDLEntity);
}
} }
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment