Commit 5c9a699a by Aadarsh Jajodia, committed by Sarath Subramanian

ATLAS-3133: Add support for Process Executions in Atlas

parent 0a81c250
......@@ -133,6 +133,8 @@ public class AtlasHiveHookContext {
return hook.getClusterName();
}
public String getHostName() { return hook.getHostName(); }
public boolean isConvertHdfsPathToLowerCase() {
    return hook.isConvertHdfsPathToLowerCase();
}
......
......@@ -32,6 +32,8 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
......@@ -46,7 +48,6 @@ import static org.apache.atlas.hive.hook.events.BaseHiveEvent.ATTRIBUTE_QUALIFIE
import static org.apache.atlas.hive.hook.events.BaseHiveEvent.HIVE_TYPE_DB;
import static org.apache.atlas.hive.hook.events.BaseHiveEvent.HIVE_TYPE_TABLE;
public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
private static final Logger LOG = LoggerFactory.getLogger(HiveHook.class);
......@@ -66,6 +67,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
public static final String HOOK_HIVE_TABLE_CACHE_SIZE = CONF_PREFIX + "hive_table.cache.size";
public static final String DEFAULT_CLUSTER_NAME = "primary";
public static final String DEFAULT_HOST_NAME = "localhost";
private static final Map<String, HiveOperation> OPERATION_MAP = new HashMap<>();
......@@ -83,6 +85,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
private static final Map<String, PreprocessAction> hiveTablesCache;
private static HiveHookObjectNamesCache knownObjects = null;
private static String hostName;
static {
for (HiveOperation hiveOperation : HiveOperation.values()) {
......@@ -134,6 +137,13 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
}
knownObjects = nameCacheEnabled ? new HiveHookObjectNamesCache(nameCacheDatabaseMaxCount, nameCacheTableMaxCount, nameCacheRebuildIntervalSeconds) : null;
try {
    hostName = InetAddress.getLocalHost().getHostName();
} catch (UnknownHostException e) {
    LOG.warn("No hostname found. Setting the hostname to default value {}", DEFAULT_HOST_NAME, e);
    hostName = DEFAULT_HOST_NAME;
}
}
......@@ -292,6 +302,10 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
return knownObjects;
}
public String getHostName() {
    return hostName;
}
public static class HiveHookObjectNamesCache {
private final int dbMaxCacheCount;
private final int tblMaxCacheCount;
......
......@@ -18,6 +18,7 @@
package org.apache.atlas.hive.hook.events;
import com.google.common.collect.ImmutableMap;
import org.apache.atlas.hive.hook.AtlasHiveHookContext;
import org.apache.atlas.hive.hook.HiveHook.PreprocessAction;
import org.apache.atlas.model.instance.AtlasEntity;
......@@ -27,6 +28,8 @@ import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityExtInfo;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.instance.AtlasStruct;
import org.apache.atlas.model.notification.HookNotification;
import org.apache.atlas.repository.Constants;
import org.apache.atlas.type.AtlasTypeUtil;
import org.apache.atlas.utils.HdfsNameServiceResolver;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
......@@ -49,6 +52,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetAddress;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
......@@ -67,24 +71,25 @@ import static org.apache.atlas.hive.hook.AtlasHiveHookContext.QNAME_SEP_PROCESS;
public abstract class BaseHiveEvent {
private static final Logger LOG = LoggerFactory.getLogger(BaseHiveEvent.class);
public static final String HIVE_TYPE_DB = "hive_db";
public static final String HIVE_TYPE_TABLE = "hive_table";
public static final String HIVE_TYPE_STORAGEDESC = "hive_storagedesc";
public static final String HIVE_TYPE_COLUMN = "hive_column";
public static final String HIVE_TYPE_PROCESS = "hive_process";
public static final String HIVE_TYPE_COLUMN_LINEAGE = "hive_column_lineage";
public static final String HIVE_TYPE_SERDE = "hive_serde";
public static final String HIVE_TYPE_ORDER = "hive_order";
public static final String HIVE_TYPE_PROCESS_EXECUTION = "hive_process_execution";
public static final String HDFS_TYPE_PATH = "hdfs_path";
public static final String HBASE_TYPE_TABLE = "hbase_table";
public static final String HBASE_TYPE_NAMESPACE = "hbase_namespace";
public static final String AWS_S3_BUCKET = "aws_s3_bucket";
public static final String AWS_S3_PSEUDO_DIR = "aws_s3_pseudo_dir";
public static final String AWS_S3_OBJECT = "aws_s3_object";
public static final String SCHEME_SEPARATOR = "://";
public static final String S3_SCHEME = "s3" + SCHEME_SEPARATOR;
public static final String S3A_SCHEME = "s3a" + SCHEME_SEPARATOR;
public static final String ATTRIBUTE_QUALIFIED_NAME = "qualifiedName";
public static final String ATTRIBUTE_NAME = "name";
......@@ -126,6 +131,8 @@ public abstract class BaseHiveEvent {
public static final String ATTRIBUTE_START_TIME = "startTime";
public static final String ATTRIBUTE_USER_NAME = "userName";
public static final String ATTRIBUTE_QUERY_TEXT = "queryText";
public static final String ATTRIBUTE_PROCESS = "process";
public static final String ATTRIBUTE_PROCESS_EXECUTIONS = "processExecutions";
public static final String ATTRIBUTE_QUERY_ID = "queryId";
public static final String ATTRIBUTE_QUERY_PLAN = "queryPlan";
public static final String ATTRIBUTE_END_TIME = "endTime";
......@@ -139,6 +146,7 @@ public abstract class BaseHiveEvent {
public static final String ATTRIBUTE_NAMESPACE = "namespace";
public static final String ATTRIBUTE_OBJECT_PREFIX = "objectPrefix";
public static final String ATTRIBUTE_BUCKET = "bucket";
public static final String ATTRIBUTE_HOSTNAME = "hostName";
public static final String HBASE_STORAGE_HANDLER_CLASS = "org.apache.hadoop.hive.hbase.HBaseStorageHandler";
public static final String HBASE_DEFAULT_NAMESPACE = "default";
......@@ -146,6 +154,7 @@ public abstract class BaseHiveEvent {
public static final String HBASE_PARAM_TABLE_NAME = "hbase.table.name";
public static final long MILLIS_CONVERT_FACTOR = 1000;
public static final String HDFS_PATH_PREFIX = "hdfs://";
public static final String EMPTY_ATTRIBUTE_VALUE = "";
public static final Map<Integer, String> OWNER_TYPE_TO_ENUM_VALUE = new HashMap<>();
......@@ -605,14 +614,41 @@ public abstract class BaseHiveEvent {
    ret.setAttribute(ATTRIBUTE_OUTPUTS, getObjectIds(outputs));
    ret.setAttribute(ATTRIBUTE_NAME, queryStr);
    ret.setAttribute(ATTRIBUTE_OPERATION_TYPE, getOperationName());

    // These values are now captured by the new hive_process_execution entity type, but the
    // attributes are still mandatory on the hive_process entity type, so they are set to
    // empty placeholder values here.
    ret.setAttribute(ATTRIBUTE_START_TIME, EMPTY_ATTRIBUTE_VALUE);
    ret.setAttribute(ATTRIBUTE_END_TIME, EMPTY_ATTRIBUTE_VALUE);
    ret.setAttribute(ATTRIBUTE_USER_NAME, EMPTY_ATTRIBUTE_VALUE);
    ret.setAttribute(ATTRIBUTE_QUERY_TEXT, EMPTY_ATTRIBUTE_VALUE);
    ret.setAttribute(ATTRIBUTE_QUERY_ID, EMPTY_ATTRIBUTE_VALUE);
    ret.setAttribute(ATTRIBUTE_QUERY_PLAN, "Not Supported");
    ret.setAttribute(ATTRIBUTE_RECENT_QUERIES, Collections.singletonList(queryStr));

    return ret;
}
protected AtlasEntity getHiveProcessExecutionEntity(AtlasEntity hiveProcess) throws Exception {
    AtlasEntity ret = new AtlasEntity(HIVE_TYPE_PROCESS_EXECUTION);
    String queryStr = getQueryString();

    if (queryStr != null) {
        queryStr = queryStr.toLowerCase().trim();
    }

    Long endTime = System.currentTimeMillis();

    ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, hiveProcess.getAttribute(ATTRIBUTE_QUALIFIED_NAME).toString() +
            QNAME_SEP_PROCESS + getQueryStartTime().toString() +
            QNAME_SEP_PROCESS + endTime.toString());
    ret.setAttribute(ATTRIBUTE_NAME, queryStr + QNAME_SEP_PROCESS + getQueryStartTime().toString());
    ret.setAttribute(ATTRIBUTE_START_TIME, getQueryStartTime());
    ret.setAttribute(ATTRIBUTE_END_TIME, endTime);
    ret.setAttribute(ATTRIBUTE_USER_NAME, getUserName());
    ret.setAttribute(ATTRIBUTE_QUERY_TEXT, queryStr);
    ret.setAttribute(ATTRIBUTE_QUERY_ID, getQueryId());
    ret.setAttribute(ATTRIBUTE_QUERY_PLAN, "Not Supported");
    ret.setAttribute(ATTRIBUTE_RECENT_QUERIES, Collections.singletonList(queryStr));
    ret.setAttribute(ATTRIBUTE_HOSTNAME, getContext().getHostName());

    ret.setRelationshipAttribute(ATTRIBUTE_PROCESS, AtlasTypeUtil.toAtlasRelatedObjectId(hiveProcess));

    return ret;
}
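As a rough sketch of the qualifiedName the method above produces (standalone Java with hypothetical values; QNAME_SEP_PROCESS is assumed to be the ':' separator defined in AtlasHiveHookContext):

// Mirrors the concatenation in getHiveProcessExecutionEntity(), using plain locals.
String processQualifiedName = "default.ctas_table@primary:1557500000000"; // hypothetical hive_process qualifiedName
char   qnameSepProcess      = ':';                                        // assumed value of QNAME_SEP_PROCESS
long   queryStartTime       = 1557500001000L;                             // hypothetical query start time
long   endTime              = 1557500002500L;                             // hypothetical query end time

String executionQualifiedName = processQualifiedName + qnameSepProcess + queryStartTime + qnameSepProcess + endTime;
// -> "default.ctas_table@primary:1557500000000:1557500001000:1557500002500"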
......
......@@ -117,6 +117,9 @@ public class CreateHiveProcess extends BaseHiveEvent {
ret.addEntity(process);
AtlasEntity processExecution = getHiveProcessExecutionEntity(process);
ret.addEntity(processExecution);
processColumnLineage(process, ret);
addProcessedEntities(ret);
......
......@@ -130,8 +130,11 @@ public class CreateTable extends BaseHiveEvent {
} else {
processEntity = getHiveProcessEntity(Collections.singletonList(tblEntity), Collections.singletonList(hbaseTableEntity));
}
ret.addEntity(processEntity);
AtlasEntity processExecution = getHiveProcessExecutionEntity(processEntity);
ret.addEntity(processExecution);
}
} else {
if (EXTERNAL_TABLE.equals(table.getTableType())) {
......@@ -140,6 +143,9 @@ public class CreateTable extends BaseHiveEvent {
ret.addEntity(processEntity);
ret.addReferredEntity(hdfsPathEntity);
AtlasEntity processExecution = getHiveProcessExecutionEntity(processEntity);
ret.addEntity(processExecution);
}
}
}
......
......@@ -43,7 +43,8 @@ public enum HiveDataTypes {
HIVE_ROLE,
HIVE_TYPE,
HIVE_PROCESS,
HIVE_COLUMN_LINEAGE,
HIVE_PROCESS_EXECUTION,
// HIVE_VIEW,
;
......
......@@ -218,6 +218,25 @@ public class HiveITBase {
return (String) entity.getGuid();
}
protected String assertEntityIsRegisteredViaGuid(String guid,
                                                 final HiveHookIT.AssertPredicate assertPredicate) throws Exception {
    waitFor(80000, new HiveHookIT.Predicate() {
        @Override
        public void evaluate() throws Exception {
            AtlasEntity.AtlasEntityWithExtInfo atlasEntityWithExtInfo = atlasClientV2.getEntityByGuid(guid);
            AtlasEntity entity = atlasEntityWithExtInfo.getEntity();

            assertNotNull(entity);

            if (assertPredicate != null) {
                assertPredicate.assertOnEntity(entity);
            }
        }
    });

    AtlasEntity.AtlasEntityWithExtInfo atlasEntityWithExtInfo = atlasClientV2.getEntityByGuid(guid);
    AtlasEntity entity = atlasEntityWithExtInfo.getEntity();

    return (String) entity.getGuid();
}
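A minimal usage sketch for the new helper (hypothetical test method and GUID, not part of this commit; assumes the AssertPredicate callback shape used above):

@Test
public void testProcessExecutionRegisteredViaGuid() throws Exception {
    // hypothetical GUID captured from an earlier lineage assertion
    String processExecutionGuid = "c7a6e8b0-1111-2222-3333-444455556666";

    assertEntityIsRegisteredViaGuid(processExecutionGuid, new HiveHookIT.AssertPredicate() {
        @Override
        public void assertOnEntity(AtlasEntity entity) throws Exception {
            assertNotNull(entity.getAttribute(BaseHiveEvent.ATTRIBUTE_QUERY_TEXT));
            assertNotNull(entity.getAttribute(BaseHiveEvent.ATTRIBUTE_HOSTNAME));
        }
    });
}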
protected AtlasEntity assertEntityIsRegistedViaEntity(final String typeName, final String property, final String value,
                                                      final HiveHookIT.AssertPredicate assertPredicate) throws Exception {
    waitFor(80000, new HiveHookIT.Predicate() {
......@@ -473,7 +492,6 @@ public class HiveITBase {
return buffer.toString();
}
protected static Entity getEntityByType(Set<? extends Entity> entities, Entity.Type entityType) {
for (Entity entity : entities) {
if (entity.getType() == entityType) {
......
......@@ -324,6 +324,15 @@
"isUnique": false
}
]
},
{
"name": "ProcessExecution",
"superTypes": [
"Asset"
],
"serviceType": "atlas_core",
"typeVersion": "1.0",
"attributeDefs": []
}
],
"relationshipDefs": [
......
......@@ -457,6 +457,80 @@
"isUnique": false
}
]
},
{
"name" : "hive_process_execution",
"superTypes" : [
"ProcessExecution"
],
"serviceType": "hive",
"typeVersion" : "1.0",
"attributeDefs" : [
{
"name": "startTime",
"typeName": "date",
"cardinality": "SINGLE",
"isIndexable": false,
"isOptional": false,
"isUnique": false
},
{
"name": "endTime",
"typeName": "date",
"cardinality": "SINGLE",
"isIndexable": false,
"isOptional": false,
"isUnique": false
},
{
"name": "userName",
"typeName": "string",
"cardinality": "SINGLE",
"isIndexable": true,
"isOptional": false,
"isUnique": false
},
{
"name": "queryText",
"typeName": "string",
"cardinality": "SINGLE",
"isIndexable": false,
"isOptional": false,
"isUnique": false
},
{
"name": "queryGraph",
"typeName": "string",
"cardinality": "SINGLE",
"isIndexable": false,
"isOptional": true,
"isUnique": false
},
{
"name": "queryId",
"typeName": "string",
"cardinality": "SINGLE",
"isIndexable": false,
"isOptional": false,
"isUnique": false
},
{
"name": "queryPlan",
"typeName": "string",
"cardinality": "SINGLE",
"isIndexable": false,
"isOptional": false,
"isUnique": false
},
{
"name": "hostName",
"typeName": "string",
"cardinality": "SINGLE",
"isIndexable": true,
"isOptional": false,
"isUnique": false
}
]
}
],
"relationshipDefs": [
......@@ -567,6 +641,24 @@
"cardinality": "SET"
},
"propagateTags": "NONE"
},
{
"name": "hive_process_process_executions",
"serviceType": "hive",
"typeVersion": "1.0",
"relationshipCategory": "COMPOSITION",
"endDef1": {
"type": "hive_process",
"name": "processExecutions",
"cardinality": "SET",
"isContainer": true
},
"endDef2": {
"type": "hive_process_execution",
"name": "process",
"cardinality": "SINGLE"
},
"propagateTags": "NONE"
}
]
}
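A hedged sketch of how a client might read the new relationship once entities are ingested (hypothetical processGuid; atlasClientV2 as used in the integration tests above):

AtlasEntity.AtlasEntityWithExtInfo info = atlasClientV2.getEntityByGuid(processGuid);
AtlasEntity hiveProcess = info.getEntity();

// The COMPOSITION relationship defined above surfaces as the "processExecutions"
// relationship attribute on hive_process; each element references one hive_process_execution.
Object processExecutions = hiveProcess.getRelationshipAttribute("processExecutions");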