Commit d4858609 by Sarath Subramanian

ATLAS-3708: Update Hive hook to create ADLS-Gen2 entities for ABFS path references

parent 64b24351
......@@ -81,10 +81,16 @@ public abstract class BaseHiveEvent {
// AWS S3 entity type names (v1 and v2 models).
public static final String AWS_S3_OBJECT = "aws_s3_object";
public static final String AWS_S3_V2_BUCKET = "aws_s3_v2_bucket";
public static final String AWS_S3_V2_PSEUDO_DIR = "aws_s3_v2_directory";
// Azure ADLS-Gen2 entity type names: account -> container -> directory hierarchy.
public static final String ADLS_GEN2_ACCOUNT = "adls_gen2_account";
public static final String ADLS_GEN2_CONTAINER = "adls_gen2_container";
public static final String ADLS_GEN2_DIRECTORY = "adls_gen2_directory";
// ADLS-Gen2 URI hosts look like "<account_name>.dfs.core.windows.net"; this suffix
// is stripped from the host to recover the storage-account name.
public static final String ADLS_GEN2_ACCOUNT_HOST_SUFFIX = ".dfs.core.windows.net";
// URI scheme prefixes used to classify paths (e.g. "s3://", "abfs://").
public static final String SCHEME_SEPARATOR = "://";
public static final String S3_SCHEME = "s3" + SCHEME_SEPARATOR;
public static final String S3A_SCHEME = "s3a" + SCHEME_SEPARATOR;
public static final String ABFS_SCHEME = "abfs" + SCHEME_SEPARATOR;
public static final String ABFSS_SCHEME = "abfss" + SCHEME_SEPARATOR;
// Common Atlas entity attribute names.
public static final String ATTRIBUTE_QUALIFIED_NAME = "qualifiedName";
public static final String ATTRIBUTE_NAME = "name";
......@@ -146,6 +152,8 @@ public abstract class BaseHiveEvent {
// Hive process / DDL entity attribute names.
public static final String ATTRIBUTE_EXEC_TIME = "execTime";
public static final String ATTRIBUTE_DDL_QUERIES = "ddlQueries";
public static final String ATTRIBUTE_SERVICE_TYPE = "serviceType";
// ADLS-Gen2 relationship-attribute names: a container points at its "account",
// a directory points at its "parent" (container or another directory).
public static final String ATTRIBUTE_ACCOUNT = "account";
public static final String ATTRIBUTE_PARENT = "parent";
// HBase storage-handler detection for Hive tables backed by HBase.
public static final String HBASE_STORAGE_HANDLER_CLASS = "org.apache.hadoop.hive.hbase.HBaseStorageHandler";
public static final String HBASE_DEFAULT_NAMESPACE = "default";
......@@ -168,6 +176,8 @@ public abstract class BaseHiveEvent {
// Atlas relationship type names used when wiring related entities together.
public static final String RELATIONSHIP_HIVE_DB_DDL_QUERIES = "hive_db_ddl_queries";
public static final String RELATIONSHIP_HIVE_TABLE_DDL_QUERIES = "hive_table_ddl_queries";
public static final String RELATIONSHIP_HBASE_TABLE_NAMESPACE = "hbase_table_namespace";
// ADLS-Gen2 relationships: account owns containers; parent/children links
// container->directory and directory->sub-directory.
public static final String RELATIONSHIP_ADLS_GEN2_ACCOUNT_CONTAINERS = "adls_gen2_account_containers";
public static final String RELATIONSHIP_ADLS_GEN2_PARENT_CHILDREN = "adls_gen2_parent_children";
// Maps Hive owner-type ordinals to Atlas enum values (populated in a static initializer).
public static final Map<Integer, String> OWNER_TYPE_TO_ENUM_VALUE = new HashMap<>();
......@@ -588,6 +598,8 @@ public abstract class BaseHiveEvent {
} else {
ret = addS3PathEntityV1(path, strPath, extInfo);
}
} else if (isAbfsPath(strPath)) {
ret = addAbfsPathEntity(path, strPath, extInfo);
} else {
String nameServiceID = HdfsNameServiceResolver.getNameServiceIDForPath(strPath);
String attrPath = StringUtils.isEmpty(nameServiceID) ? strPath : HdfsNameServiceResolver.getPathWithNameServiceID(strPath);
......@@ -623,23 +635,27 @@ public abstract class BaseHiveEvent {
protected AtlasEntity getHiveProcessEntity(List<AtlasEntity> inputs, List<AtlasEntity> outputs) throws Exception {
AtlasEntity ret = new AtlasEntity(HIVE_TYPE_PROCESS);
String queryStr = getQueryString();
String qualifiedName = getQualifiedName(inputs, outputs);
if (queryStr != null) {
queryStr = queryStr.toLowerCase().trim();
}
ret.setAttribute(ATTRIBUTE_OPERATION_TYPE, getOperationName());
String qualifiedName = getQualifiedName(inputs, outputs);
if (context.isMetastoreHook()) {
HiveOperation operation = context.getHiveOperation();
if (operation == HiveOperation.CREATETABLE || operation == HiveOperation.CREATETABLE_AS_SELECT) {
AtlasEntity table = outputs.get(0);
long createTime = Long.valueOf((Long)table.getAttribute(ATTRIBUTE_CREATE_TIME));
qualifiedName = (String) table.getAttribute(ATTRIBUTE_QUALIFIED_NAME) + QNAME_SEP_PROCESS + createTime;
ret.setAttribute(ATTRIBUTE_NAME, "dummyProcess:" + UUID.randomUUID());
ret.setAttribute(ATTRIBUTE_OPERATION_TYPE, operation.getOperationName());
}
}
ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, qualifiedName);
ret.setAttribute(ATTRIBUTE_NAME, qualifiedName);
ret.setRelationshipAttribute(ATTRIBUTE_INPUTS, AtlasTypeUtil.getAtlasRelatedObjectIds(inputs, RELATIONSHIP_DATASET_PROCESS_INPUTS));
......@@ -650,6 +666,7 @@ public abstract class BaseHiveEvent {
// mandatory attributes for hive process entity type.
ret.setAttribute(ATTRIBUTE_START_TIME, System.currentTimeMillis());
ret.setAttribute(ATTRIBUTE_END_TIME, System.currentTimeMillis());
if (context.isHiveProcessPopulateDeprecatedAttributes()) {
ret.setAttribute(ATTRIBUTE_USER_NAME, getUserName());
ret.setAttribute(ATTRIBUTE_QUERY_TEXT, queryStr);
......@@ -659,6 +676,7 @@ public abstract class BaseHiveEvent {
ret.setAttribute(ATTRIBUTE_QUERY_TEXT, EMPTY_ATTRIBUTE_VALUE);
ret.setAttribute(ATTRIBUTE_QUERY_ID, EMPTY_ATTRIBUTE_VALUE);
}
ret.setAttribute(ATTRIBUTE_QUERY_PLAN, "Not Supported");
ret.setAttribute(ATTRIBUTE_RECENT_QUERIES, Collections.singletonList(queryStr));
ret.setAttribute(ATTRIBUTE_CLUSTER_NAME, getMetadataNamespace());
......@@ -1131,6 +1149,10 @@ public abstract class BaseHiveEvent {
return strPath != null && (strPath.startsWith(S3_SCHEME) || strPath.startsWith(S3A_SCHEME));
}
/**
 * Returns true when the given path string refers to ADLS-Gen2 storage,
 * i.e. it starts with the "abfs://" or "abfss://" scheme.
 */
private boolean isAbfsPath(String strPath) {
    if (strPath == null) {
        return false;
    }

    return strPath.startsWith(ABFS_SCHEME) || strPath.startsWith(ABFSS_SCHEME);
}
private AtlasEntity addS3PathEntityV1(Path path, String strPath, AtlasEntityExtInfo extInfo) {
String metadataNamespace = getMetadataNamespace();
String bucketName = path.toUri().getAuthority();
......@@ -1239,6 +1261,119 @@ public abstract class BaseHiveEvent {
return ret;
}
/**
 * Creates (or fetches from context) the ADLS-Gen2 entity hierarchy for an ABFS path:
 * storage account -> container -> directory chain, and returns the entity that
 * represents the path itself (the deepest directory, or the container for a
 * container-root path). The account and container are registered as referred
 * entities in {@code extInfo}.
 *
 * ABFS URIs have the form: abfs[s]://&lt;container&gt;@&lt;account&gt;.dfs.core.windows.net/&lt;path&gt;
 *
 * @param path    the Hadoop path being referenced
 * @param strPath string form of the path, used as the cache/qualified-name key
 * @param extInfo receives the account and container as referred entities
 * @return the Atlas entity representing the path
 */
private AtlasEntity addAbfsPathEntity(Path path, String strPath, AtlasEntityExtInfo extInfo) {
    if (LOG.isDebugEnabled()) {
        LOG.debug("==> addAbfsPathEntity(strPath={})", strPath);
    }

    String      metadataNamespace = getMetadataNamespace();
    String      pathQualifiedName = strPath + QNAME_SEP_METADATA_NAMESPACE + metadataNamespace;
    AtlasEntity ret               = context.getEntity(pathQualifiedName);

    if (ret == null) {
        String abfsScheme                = path.toUri().getScheme();
        String storageAcctName           = getAbfsStorageAccountName(path.toUri());
        String schemeAndStorageAcctName  = (abfsScheme + SCHEME_SEPARATOR + storageAcctName).toLowerCase();
        String storageAcctQualifiedName  = schemeAndStorageAcctName + QNAME_SEP_METADATA_NAMESPACE + metadataNamespace;

        // create adls-gen2 storage-account entity
        AtlasEntity storageAcctEntity = context.getEntity(storageAcctQualifiedName);

        if (storageAcctEntity == null) {
            storageAcctEntity = new AtlasEntity(ADLS_GEN2_ACCOUNT);

            storageAcctEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, storageAcctQualifiedName);
            storageAcctEntity.setAttribute(ATTRIBUTE_NAME, storageAcctName);

            if (LOG.isDebugEnabled()) {
                LOG.debug("adding entity: typeName={}, qualifiedName={}", storageAcctEntity.getTypeName(), storageAcctEntity.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
            }

            context.putEntity(storageAcctQualifiedName, storageAcctEntity);
        }

        extInfo.addReferredEntity(storageAcctEntity);

        AtlasRelatedObjectId storageAcctObjId = AtlasTypeUtil.getAtlasRelatedObjectId(storageAcctEntity, RELATIONSHIP_ADLS_GEN2_ACCOUNT_CONTAINERS);

        // create adls-gen2 container entity linking to storage account
        // In an ABFS URI the container occupies the user-info position ("<container>@<host>").
        String containerName          = path.toUri().getUserInfo();
        String schemeAndContainerName = (abfsScheme + SCHEME_SEPARATOR + containerName + QNAME_SEP_METADATA_NAMESPACE + storageAcctName).toLowerCase();
        String containerQualifiedName = schemeAndContainerName + QNAME_SEP_METADATA_NAMESPACE + metadataNamespace;

        AtlasEntity containerEntity = context.getEntity(containerQualifiedName);

        if (containerEntity == null) {
            containerEntity = new AtlasEntity(ADLS_GEN2_CONTAINER);

            containerEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, containerQualifiedName);
            containerEntity.setAttribute(ATTRIBUTE_NAME, containerName);
            containerEntity.setRelationshipAttribute(ATTRIBUTE_ACCOUNT, storageAcctObjId);

            if (LOG.isDebugEnabled()) {
                LOG.debug("adding entity: typeName={}, qualifiedName={}", containerEntity.getTypeName(), containerEntity.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
            }

            context.putEntity(containerQualifiedName, containerEntity);
        }

        extInfo.addReferredEntity(containerEntity);

        // create adls-gen2 directory entity linking to container: walk the path one
        // segment at a time, chaining each directory to its parent via the
        // parent/children relationship.
        AtlasRelatedObjectId parentObjId = AtlasTypeUtil.getAtlasRelatedObjectId(containerEntity, RELATIONSHIP_ADLS_GEN2_PARENT_CHILDREN);
        String               parentPath  = Path.SEPARATOR;
        String               dirPath     = path.toUri().getPath();

        if (StringUtils.isEmpty(dirPath)) {
            dirPath = Path.SEPARATOR;
        }

        for (String subDirName : dirPath.split(Path.SEPARATOR)) {
            if (StringUtils.isEmpty(subDirName)) {
                continue;
            }

            String subDirPath          = parentPath + subDirName + Path.SEPARATOR;
            String subDirQualifiedName = schemeAndContainerName + subDirPath + QNAME_SEP_METADATA_NAMESPACE + metadataNamespace;

            ret = new AtlasEntity(ADLS_GEN2_DIRECTORY);

            ret.setRelationshipAttribute(ATTRIBUTE_PARENT, parentObjId);
            ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, subDirQualifiedName);
            ret.setAttribute(ATTRIBUTE_NAME, subDirName);

            if (LOG.isDebugEnabled()) {
                LOG.debug("adding entity: typeName={}, qualifiedName={}", ret.getTypeName(), ret.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
            }

            // NOTE(review): intermediate directory entities are registered in context
            // only; confirm they reach the notification via the returned entity's
            // parent chain rather than needing extInfo.addReferredEntity() here.
            context.putEntity(subDirQualifiedName, ret);

            parentObjId = AtlasTypeUtil.getAtlasRelatedObjectId(ret, RELATIONSHIP_ADLS_GEN2_PARENT_CHILDREN);
            parentPath  = subDirPath;
        }

        if (ret == null) {
            // The URI had no directory component (e.g. "abfs://container@acct.../"):
            // the path refers to the container root, so the container entity — not
            // the storage account — is the correct entity for this path.
            ret = containerEntity;
        }
    }

    if (LOG.isDebugEnabled()) {
        LOG.debug("<== addAbfsPathEntity(strPath={})", strPath);
    }

    return ret;
}
/**
 * Extracts the ADLS-Gen2 storage-account name from an ABFS URI host.
 * Host is expected to be "&lt;account_name&gt;.dfs.core.windows.net".
 *
 * @param uri the ABFS URI
 * @return the account name, or null when the host is empty or does not end
 *         with the ADLS-Gen2 host suffix
 */
private String getAbfsStorageAccountName(URI uri) {
    String ret  = null;
    String host = uri.getHost();

    // Require the suffix at the END of the host (endsWith) rather than anywhere
    // within it (contains): a host like "acct.dfs.core.windows.net.example.com"
    // must not be mis-parsed as account "acct".
    if (StringUtils.isNotEmpty(host) && host.endsWith(ADLS_GEN2_ACCOUNT_HOST_SUFFIX)) {
        ret = host.substring(0, host.length() - ADLS_GEN2_ACCOUNT_HOST_SUFFIX.length());
    }

    return ret;
}
static final class EntityComparator implements Comparator<Entity> {
@Override
public int compare(Entity entity1, Entity entity2) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment