Commit 8b50ac0c by Nikhil Bonte Committed by Sarath Subramanian

ATLAS-3836 Add Apache Ozone support in hive hook

parent ce95c629
......@@ -157,6 +157,7 @@ public class AtlasHiveHookContext {
/**
 * Returns the entities currently registered in this context.
 *
 * @return the values of {@code qNameEntityMap}
 */
public Collection<AtlasEntity> getEntities() {
    return qNameEntityMap.values();
}
/**
 * Exposes the map that backs this context's entity lookups.
 *
 * @return {@code qNameEntityMap} itself (no copy is made)
 */
public Map<String, AtlasEntity> getQNameToEntityMap() {
    return qNameEntityMap;
}
public String getMetadataNamespace() {
return hook.getMetadataNamespace();
......@@ -168,8 +169,8 @@ public class AtlasHiveHookContext {
return hook.isConvertHdfsPathToLowerCase();
}
public boolean isAwsS3AtlasModelVersionV2() {
return hook.isAwsS3AtlasModelVersionV2();
/**
 * Returns the AWS S3 Atlas model version, as reported by the owning hook.
 *
 * @return the result of {@code hook.getAwsS3AtlasModelVersion()}
 */
public String getAwsS3AtlasModelVersion() {
    return hook.getAwsS3AtlasModelVersion();
}
public boolean getSkipHiveColumnLineageHive20633() {
......
......@@ -77,7 +77,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
private static final int nameCacheDatabaseMaxCount;
private static final int nameCacheTableMaxCount;
private static final int nameCacheRebuildIntervalSeconds;
private static final boolean isAwsS3AtlasModelVersionV2;
private static final String awsS3AtlasModelVersion;
private static final boolean skipHiveColumnLineageHive20633;
private static final int skipHiveColumnLineageHive20633InputsThreshold;
......@@ -101,7 +101,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
nameCacheDatabaseMaxCount = atlasProperties.getInt(HOOK_NAME_CACHE_DATABASE_COUNT, 10000);
nameCacheTableMaxCount = atlasProperties.getInt(HOOK_NAME_CACHE_TABLE_COUNT, 10000);
nameCacheRebuildIntervalSeconds = atlasProperties.getInt(HOOK_NAME_CACHE_REBUID_INTERVAL_SEC, 60 * 60); // 60 minutes default
isAwsS3AtlasModelVersionV2 = StringUtils.equalsIgnoreCase(atlasProperties.getString(HOOK_AWS_S3_ATLAS_MODEL_VERSION, HOOK_AWS_S3_ATLAS_MODEL_VERSION_V2), HOOK_AWS_S3_ATLAS_MODEL_VERSION_V2);
awsS3AtlasModelVersion = atlasProperties.getString(HOOK_AWS_S3_ATLAS_MODEL_VERSION, HOOK_AWS_S3_ATLAS_MODEL_VERSION_V2);
skipHiveColumnLineageHive20633 = atlasProperties.getBoolean(HOOK_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, false);
skipHiveColumnLineageHive20633InputsThreshold = atlasProperties.getInt(HOOK_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, 15); // skip if avg # of inputs is > 15
hiveProcessPopulateDeprecatedAttributes = atlasProperties.getBoolean(HOOK_HIVE_PROCESS_POPULATE_DEPRECATED_ATTRIBUTES, false);
......@@ -257,7 +257,9 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
return convertHdfsPathToLowerCase;
}
/**
 * Tells whether the hook was configured for the v2 AWS S3 Atlas model.
 *
 * @return the value of the static {@code isAwsS3AtlasModelVersionV2} flag
 */
public boolean isAwsS3AtlasModelVersionV2() {
    return isAwsS3AtlasModelVersionV2;
}
/**
 * Returns the AWS S3 Atlas model version held by this hook.
 *
 * @return the value of the static {@code awsS3AtlasModelVersion} field
 */
public String getAwsS3AtlasModelVersion() {
    return awsS3AtlasModelVersion;
}
public boolean getSkipHiveColumnLineageHive20633() {
return skipHiveColumnLineageHive20633;
......
......@@ -20,6 +20,7 @@ package org.apache.atlas.hive.hook.events;
import org.apache.atlas.hive.hook.AtlasHiveHookContext;
import org.apache.atlas.hive.hook.HiveHook.PreprocessAction;
import org.apache.atlas.utils.PathExtractorContext;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
......@@ -29,6 +30,7 @@ import org.apache.atlas.model.instance.AtlasRelatedObjectId;
import org.apache.atlas.model.instance.AtlasStruct;
import org.apache.atlas.model.notification.HookNotification;
import org.apache.atlas.type.AtlasTypeUtil;
import org.apache.atlas.utils.AtlasPathExtractorUtil;
import org.apache.atlas.utils.HdfsNameServiceResolver;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
......@@ -73,25 +75,8 @@ public abstract class BaseHiveEvent {
public static final String HIVE_TYPE_PROCESS_EXECUTION = "hive_process_execution";
public static final String HIVE_DB_DDL = "hive_db_ddl";
public static final String HIVE_TABLE_DDL = "hive_table_ddl";
public static final String HDFS_TYPE_PATH = "hdfs_path";
public static final String HBASE_TYPE_TABLE = "hbase_table";
public static final String HBASE_TYPE_NAMESPACE = "hbase_namespace";
public static final String AWS_S3_BUCKET = "aws_s3_bucket";
public static final String AWS_S3_PSEUDO_DIR = "aws_s3_pseudo_dir";
public static final String AWS_S3_OBJECT = "aws_s3_object";
public static final String AWS_S3_V2_BUCKET = "aws_s3_v2_bucket";
public static final String AWS_S3_V2_PSEUDO_DIR = "aws_s3_v2_directory";
public static final String ADLS_GEN2_ACCOUNT = "adls_gen2_account";
public static final String ADLS_GEN2_CONTAINER = "adls_gen2_container";
public static final String ADLS_GEN2_DIRECTORY = "adls_gen2_directory";
public static final String ADLS_GEN2_ACCOUNT_HOST_SUFFIX = ".dfs.core.windows.net";
public static final String SCHEME_SEPARATOR = "://";
public static final String S3_SCHEME = "s3" + SCHEME_SEPARATOR;
public static final String S3A_SCHEME = "s3a" + SCHEME_SEPARATOR;
public static final String ABFS_SCHEME = "abfs" + SCHEME_SEPARATOR;
public static final String ABFSS_SCHEME = "abfss" + SCHEME_SEPARATOR;
public static final String ATTRIBUTE_QUALIFIED_NAME = "qualifiedName";
public static final String ATTRIBUTE_NAME = "name";
public static final String ATTRIBUTE_DESCRIPTION = "description";
......@@ -145,16 +130,10 @@ public abstract class BaseHiveEvent {
public static final String ATTRIBUTE_URI = "uri";
public static final String ATTRIBUTE_STORAGE_HANDLER = "storage_handler";
public static final String ATTRIBUTE_NAMESPACE = "namespace";
public static final String ATTRIBUTE_OBJECT_PREFIX = "objectPrefix";
public static final String ATTRIBUTE_BUCKET = "bucket";
public static final String ATTRIBUTE_CONTAINER = "container";
public static final String ATTRIBUTE_HOSTNAME = "hostName";
public static final String ATTRIBUTE_EXEC_TIME = "execTime";
public static final String ATTRIBUTE_DDL_QUERIES = "ddlQueries";
public static final String ATTRIBUTE_SERVICE_TYPE = "serviceType";
public static final String ATTRIBUTE_ACCOUNT = "account";
public static final String ATTRIBUTE_PARENT = "parent";
public static final String HBASE_STORAGE_HANDLER_CLASS = "org.apache.hadoop.hive.hbase.HBaseStorageHandler";
public static final String HBASE_DEFAULT_NAMESPACE = "default";
public static final String HBASE_NAMESPACE_TABLE_DELIMITER = ":";
......@@ -170,14 +149,10 @@ public abstract class BaseHiveEvent {
public static final String RELATIONSHIP_HIVE_TABLE_PART_KEYS = "hive_table_partitionkeys";
public static final String RELATIONSHIP_HIVE_TABLE_COLUMNS = "hive_table_columns";
public static final String RELATIONSHIP_HIVE_TABLE_STORAGE_DESC = "hive_table_storagedesc";
public static final String RELATIONSHIP_AWS_S3_BUCKET_S3_PSEUDO_DIRS = "aws_s3_bucket_aws_s3_pseudo_dirs";
public static final String RELATIONSHIP_AWS_S3_V2_CONTAINER_CONTAINED = "aws_s3_v2_container_contained";
public static final String RELATIONSHIP_HIVE_PROCESS_PROCESS_EXE = "hive_process_process_executions";
public static final String RELATIONSHIP_HIVE_DB_DDL_QUERIES = "hive_db_ddl_queries";
public static final String RELATIONSHIP_HIVE_TABLE_DDL_QUERIES = "hive_table_ddl_queries";
public static final String RELATIONSHIP_HBASE_TABLE_NAMESPACE = "hbase_table_namespace";
public static final String RELATIONSHIP_ADLS_GEN2_ACCOUNT_CONTAINERS = "adls_gen2_account_containers";
public static final String RELATIONSHIP_ADLS_GEN2_PARENT_CHILDREN = "adls_gen2_parent_children";
public static final Map<Integer, String> OWNER_TYPE_TO_ENUM_VALUE = new HashMap<>();
......@@ -584,52 +559,21 @@ public abstract class BaseHiveEvent {
}
protected AtlasEntity getPathEntity(Path path, AtlasEntityExtInfo extInfo) {
AtlasEntity ret;
String strPath = path.toString();
String metadataNamespace = getMetadataNamespace();
if (strPath.startsWith(HDFS_PATH_PREFIX) && context.isConvertHdfsPathToLowerCase()) {
strPath = strPath.toLowerCase();
}
if (isS3Path(strPath)) {
if (context.isAwsS3AtlasModelVersionV2()) {
ret = addS3PathEntityV2(path, strPath, extInfo);
} else {
ret = addS3PathEntityV1(path, strPath, extInfo);
}
} else if (isAbfsPath(strPath)) {
ret = addAbfsPathEntity(path, strPath, extInfo);
} else {
String nameServiceID = HdfsNameServiceResolver.getNameServiceIDForPath(strPath);
String attrPath = StringUtils.isEmpty(nameServiceID) ? strPath : HdfsNameServiceResolver.getPathWithNameServiceID(strPath);
String pathQualifiedName = getQualifiedName(attrPath);
ret = context.getEntity(pathQualifiedName);
if (ret == null) {
ret = new AtlasEntity(HDFS_TYPE_PATH);
if (StringUtils.isNotEmpty(nameServiceID)) {
ret.setAttribute(ATTRIBUTE_NAMESERVICE_ID, nameServiceID);
}
String name = Path.getPathWithoutSchemeAndAuthority(path).toString();
String strPath = path.toString();
String metadataNamespace = getMetadataNamespace();
boolean isConvertPathToLowerCase = strPath.startsWith(HDFS_PATH_PREFIX) && context.isConvertHdfsPathToLowerCase();
PathExtractorContext pathExtractorContext = new PathExtractorContext(metadataNamespace, context.getQNameToEntityMap(),
isConvertPathToLowerCase, context.getAwsS3AtlasModelVersion());
if (strPath.startsWith(HDFS_PATH_PREFIX) && context.isConvertHdfsPathToLowerCase()) {
name = name.toLowerCase();
}
ret.setAttribute(ATTRIBUTE_PATH, attrPath);
ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, pathQualifiedName);
ret.setAttribute(ATTRIBUTE_NAME, name);
ret.setAttribute(ATTRIBUTE_CLUSTER_NAME, metadataNamespace);
AtlasEntityWithExtInfo entityWithExtInfo = AtlasPathExtractorUtil.getPathEntity(path, pathExtractorContext);
context.putEntity(pathQualifiedName, ret);
if (entityWithExtInfo.getReferredEntities() != null){
for (AtlasEntity entity : entityWithExtInfo.getReferredEntities().values()) {
extInfo.addReferredEntity(entity);
}
}
return ret;
return entityWithExtInfo.getEntity();
}
protected AtlasEntity getHiveProcessEntity(List<AtlasEntity> inputs, List<AtlasEntity> outputs) throws Exception {
......@@ -1145,235 +1089,6 @@ public abstract class BaseHiveEvent {
return false;
}
/**
 * Checks whether a path string uses one of the S3 schemes.
 *
 * @param strPath path in string form; may be {@code null}
 * @return {@code true} when {@code strPath} starts with {@code S3_SCHEME} or {@code S3A_SCHEME}
 */
private boolean isS3Path(String strPath) {
    if (strPath == null) {
        return false;
    }

    return strPath.startsWith(S3_SCHEME) || strPath.startsWith(S3A_SCHEME);
}
/**
 * Checks whether a path string uses one of the ABFS schemes.
 *
 * @param strPath path in string form; may be {@code null}
 * @return {@code true} when {@code strPath} starts with {@code ABFS_SCHEME} or {@code ABFSS_SCHEME}
 */
private boolean isAbfsPath(String strPath) {
    if (strPath == null) {
        return false;
    }

    return strPath.startsWith(ABFS_SCHEME) || strPath.startsWith(ABFSS_SCHEME);
}
/**
 * Returns the {@code aws_s3_pseudo_dir} entity for an S3 path, creating it (and its
 * {@code aws_s3_bucket}) when the context does not already hold one.
 *
 * Both qualified names are lower-cased up to and including the namespace separator; the
 * metadata namespace itself is appended unchanged.
 *
 * @param path    the S3 path
 * @param strPath string form of the path, used to build the directory's qualified name
 * @param extInfo receives the bucket entity as a referred entity when a directory is created
 * @return the cached or newly created pseudo-directory entity
 */
private AtlasEntity addS3PathEntityV1(Path path, String strPath, AtlasEntityExtInfo extInfo) {
    final String namespace   = getMetadataNamespace();
    final URI    uri         = path.toUri();
    final String bucketName  = uri.getAuthority();
    final String bucketQName = (uri.getScheme() + SCHEME_SEPARATOR + bucketName + QNAME_SEP_METADATA_NAMESPACE).toLowerCase() + namespace;
    final String dirQName    = (strPath + QNAME_SEP_METADATA_NAMESPACE).toLowerCase() + namespace;

    AtlasEntity       bucket    = context.getEntity(bucketQName);
    final AtlasEntity cachedDir = context.getEntity(dirQName);

    // a directory already known to the context is returned untouched
    if (cachedDir != null) {
        return cachedDir;
    }

    if (bucket == null) {
        bucket = new AtlasEntity(AWS_S3_BUCKET);

        bucket.setAttribute(ATTRIBUTE_QUALIFIED_NAME, bucketQName);
        bucket.setAttribute(ATTRIBUTE_NAME, bucketName);

        context.putEntity(bucketQName, bucket);
    }

    extInfo.addReferredEntity(bucket);

    final AtlasEntity dir          = new AtlasEntity(AWS_S3_PSEUDO_DIR);
    final String      objectPrefix = Path.getPathWithoutSchemeAndAuthority(path).toString().toLowerCase();

    dir.setRelationshipAttribute(ATTRIBUTE_BUCKET, AtlasTypeUtil.getAtlasRelatedObjectId(bucket, RELATIONSHIP_AWS_S3_BUCKET_S3_PSEUDO_DIRS));
    dir.setAttribute(ATTRIBUTE_OBJECT_PREFIX, objectPrefix);
    dir.setAttribute(ATTRIBUTE_QUALIFIED_NAME, dirQName);
    dir.setAttribute(ATTRIBUTE_NAME, objectPrefix);

    context.putEntity(dirQName, dir);

    return dir;
}
/**
 * Builds the v2-model S3 entities for a path: one {@code aws_s3_v2_bucket} plus one
 * {@code aws_s3_v2_directory} per non-empty path segment, each directory related to its
 * parent (the bucket for the first segment).
 *
 * Every directory created here is stored in the context under its own qualified name; only
 * the bucket is added to {@code extInfo}. When the path has no segments the bucket entity is
 * returned.
 *
 * NOTE(review): the initial lookup key is {@code strPath + separator + namespace}, while
 * directories are stored under keys whose path part ends with {@code Path.SEPARATOR};
 * confirm whether that lookup can ever hit.
 *
 * @param path    the S3 path
 * @param strPath string form of the path, used for the initial context lookup and logging
 * @param extInfo receives the bucket entity as a referred entity
 * @return the entity for the deepest directory, the bucket when there are no segments, or a
 *         previously cached entity
 */
private AtlasEntity addS3PathEntityV2(Path path, String strPath, AtlasEntityExtInfo extInfo) {
    if (LOG.isDebugEnabled()) {
        LOG.debug("==> addS3PathEntityV2(strPath={})", strPath);
    }

    final String namespace = getMetadataNamespace();
    final String pathQName = strPath + QNAME_SEP_METADATA_NAMESPACE + namespace;
    AtlasEntity  result    = context.getEntity(pathQName);

    if (result == null) {
        final URI    uri             = path.toUri();
        final String bucketName      = uri.getAuthority();
        final String schemeAndBucket = (uri.getScheme() + SCHEME_SEPARATOR + bucketName).toLowerCase();
        final String bucketQName     = schemeAndBucket + QNAME_SEP_METADATA_NAMESPACE + namespace;
        AtlasEntity  bucket          = context.getEntity(bucketQName);

        if (bucket == null) {
            bucket = new AtlasEntity(AWS_S3_V2_BUCKET);

            bucket.setAttribute(ATTRIBUTE_QUALIFIED_NAME, bucketQName);
            bucket.setAttribute(ATTRIBUTE_NAME, bucketName);

            if (LOG.isDebugEnabled()) {
                LOG.debug("adding entity: typeName={}, qualifiedName={}", bucket.getTypeName(), bucket.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
            }

            context.putEntity(bucketQName, bucket);
        }

        extInfo.addReferredEntity(bucket);

        AtlasRelatedObjectId parentId   = AtlasTypeUtil.getAtlasRelatedObjectId(bucket, RELATIONSHIP_AWS_S3_V2_CONTAINER_CONTAINED);
        String               parentPath = Path.SEPARATOR;
        final String         rawDirPath = uri.getPath();
        final String[]       segments   = (StringUtils.isEmpty(rawDirPath) ? Path.SEPARATOR : rawDirPath).split(Path.SEPARATOR);

        // walk the path top-down; each segment becomes a directory chained to the previous one
        for (int i = 0; i < segments.length; i++) {
            final String segment = segments[i];

            if (StringUtils.isNotEmpty(segment)) {
                final String segmentPath  = parentPath + segment + Path.SEPARATOR;
                final String segmentQName = schemeAndBucket + segmentPath + QNAME_SEP_METADATA_NAMESPACE + namespace;

                result = new AtlasEntity(AWS_S3_V2_PSEUDO_DIR);

                result.setRelationshipAttribute(ATTRIBUTE_CONTAINER, parentId);
                result.setAttribute(ATTRIBUTE_OBJECT_PREFIX, segmentPath);
                result.setAttribute(ATTRIBUTE_QUALIFIED_NAME, segmentQName);
                result.setAttribute(ATTRIBUTE_NAME, segment);

                if (LOG.isDebugEnabled()) {
                    LOG.debug("adding entity: typeName={}, qualifiedName={}", result.getTypeName(), result.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
                }

                context.putEntity(segmentQName, result);

                parentId   = AtlasTypeUtil.getAtlasRelatedObjectId(result, RELATIONSHIP_AWS_S3_V2_CONTAINER_CONTAINED);
                parentPath = segmentPath;
            }
        }

        // no directory segments: the path denotes the bucket itself
        if (result == null) {
            result = bucket;
        }
    }

    if (LOG.isDebugEnabled()) {
        LOG.debug("<== addS3PathEntityV2(strPath={})", strPath);
    }

    return result;
}
/**
 * Builds the ADLS Gen2 entities for an ABFS path: an {@code adls_gen2_account}, an
 * {@code adls_gen2_container} related to it, and one {@code adls_gen2_directory} per
 * non-empty path segment, each directory related to its parent (the container for the first
 * segment).
 *
 * The container name is taken from the URI's user-info part and the account name from
 * {@code getAbfsStorageAccountName}. Account and container are added to {@code extInfo};
 * directories are only stored in the context.
 *
 * NOTE(review): the initial lookup key is {@code strPath + separator + namespace}, while
 * directories are stored under keys whose path part ends with {@code Path.SEPARATOR};
 * confirm whether that lookup can ever hit.
 * NOTE(review): when the path has no directory segments the storage-account entity (not the
 * container) is returned; confirm this is intended.
 *
 * @param path    the ABFS path
 * @param strPath string form of the path, used for the initial context lookup and logging
 * @param extInfo receives the account and container entities as referred entities
 * @return the entity for the deepest directory, the storage account when there are no
 *         segments, or a previously cached entity
 */
private AtlasEntity addAbfsPathEntity(Path path, String strPath, AtlasEntityExtInfo extInfo) {
    if (LOG.isDebugEnabled()) {
        LOG.debug("==> addAbfsPathEntity(strPath={})", strPath);
    }

    final String namespace = getMetadataNamespace();
    final String pathQName = strPath + QNAME_SEP_METADATA_NAMESPACE + namespace;
    AtlasEntity  result    = context.getEntity(pathQName);

    if (result == null) {
        final URI    uri              = path.toUri();
        final String scheme           = uri.getScheme();
        final String accountName      = getAbfsStorageAccountName(uri);
        final String schemeAndAccount = (scheme + SCHEME_SEPARATOR + accountName).toLowerCase();
        final String accountQName     = schemeAndAccount + QNAME_SEP_METADATA_NAMESPACE + namespace;
        AtlasEntity  account          = context.getEntity(accountQName);

        // storage account: created once per qualified name, then reused from the context
        if (account == null) {
            account = new AtlasEntity(ADLS_GEN2_ACCOUNT);

            account.setAttribute(ATTRIBUTE_QUALIFIED_NAME, accountQName);
            account.setAttribute(ATTRIBUTE_NAME, accountName);

            if (LOG.isDebugEnabled()) {
                LOG.debug("adding entity: typeName={}, qualifiedName={}", account.getTypeName(), account.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
            }

            context.putEntity(accountQName, account);
        }

        extInfo.addReferredEntity(account);

        final AtlasRelatedObjectId accountId = AtlasTypeUtil.getAtlasRelatedObjectId(account, RELATIONSHIP_ADLS_GEN2_ACCOUNT_CONTAINERS);

        // container: linked to the storage account
        final String containerName      = uri.getUserInfo();
        final String schemeAndContainer = (scheme + SCHEME_SEPARATOR + containerName + QNAME_SEP_METADATA_NAMESPACE + accountName).toLowerCase();
        final String containerQName     = schemeAndContainer + QNAME_SEP_METADATA_NAMESPACE + namespace;
        AtlasEntity  container          = context.getEntity(containerQName);

        if (container == null) {
            container = new AtlasEntity(ADLS_GEN2_CONTAINER);

            container.setAttribute(ATTRIBUTE_QUALIFIED_NAME, containerQName);
            container.setAttribute(ATTRIBUTE_NAME, containerName);
            container.setRelationshipAttribute(ATTRIBUTE_ACCOUNT, accountId);

            if (LOG.isDebugEnabled()) {
                LOG.debug("adding entity: typeName={}, qualifiedName={}", container.getTypeName(), container.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
            }

            context.putEntity(containerQName, container);
        }

        extInfo.addReferredEntity(container);

        // directories: chained top-down, the first one hanging off the container
        AtlasRelatedObjectId parentId   = AtlasTypeUtil.getAtlasRelatedObjectId(container, RELATIONSHIP_ADLS_GEN2_PARENT_CHILDREN);
        String               parentPath = Path.SEPARATOR;
        final String         rawDirPath = uri.getPath();
        final String[]       segments   = (StringUtils.isEmpty(rawDirPath) ? Path.SEPARATOR : rawDirPath).split(Path.SEPARATOR);

        for (int i = 0; i < segments.length; i++) {
            final String segment = segments[i];

            if (StringUtils.isNotEmpty(segment)) {
                final String segmentPath  = parentPath + segment + Path.SEPARATOR;
                final String segmentQName = schemeAndContainer + segmentPath + QNAME_SEP_METADATA_NAMESPACE + namespace;

                result = new AtlasEntity(ADLS_GEN2_DIRECTORY);

                result.setRelationshipAttribute(ATTRIBUTE_PARENT, parentId);
                result.setAttribute(ATTRIBUTE_QUALIFIED_NAME, segmentQName);
                result.setAttribute(ATTRIBUTE_NAME, segment);

                if (LOG.isDebugEnabled()) {
                    LOG.debug("adding entity: typeName={}, qualifiedName={}", result.getTypeName(), result.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
                }

                context.putEntity(segmentQName, result);

                parentId   = AtlasTypeUtil.getAtlasRelatedObjectId(result, RELATIONSHIP_ADLS_GEN2_PARENT_CHILDREN);
                parentPath = segmentPath;
            }
        }

        if (result == null) {
            result = account;
        }
    }

    if (LOG.isDebugEnabled()) {
        LOG.debug("<== addAbfsPathEntity(strPath={})", strPath);
    }

    return result;
}
/**
 * Extracts the storage-account name from an ABFS URI whose host has the form
 * {@code "<account_name>.dfs.core.windows.net"}.
 *
 * @param uri the ABFS URI
 * @return the part of the host preceding the first occurrence of
 *         {@code ADLS_GEN2_ACCOUNT_HOST_SUFFIX}, or {@code null} when the host is empty or
 *         does not contain that suffix
 */
private String getAbfsStorageAccountName(URI uri) {
    final String host = uri.getHost();

    if (StringUtils.isEmpty(host)) {
        return null;
    }

    final int suffixStart = host.indexOf(ADLS_GEN2_ACCOUNT_HOST_SUFFIX);

    return suffixStart < 0 ? null : host.substring(0, suffixStart);
}
static final class EntityComparator implements Comparator<Entity> {
@Override
public int compare(Entity entity1, Entity entity2) {
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.utils;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityExtInfo;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.instance.AtlasRelatedObjectId;
import org.apache.atlas.type.AtlasTypeUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.URI;
public class AtlasPathExtractorUtil {
// Class logger; the path helpers below use it for debug-level entry/exit and "adding entity" traces.
private static final Logger LOG = LoggerFactory.getLogger(AtlasPathExtractorUtil.class);
// Common: separators and attribute names shared by the storage models below
// Placed between a qualified-name prefix and the metadata namespace ("<prefix>@<namespace>").
public static final char QNAME_SEP_METADATA_NAMESPACE = '@';
// Joins the volume name and the bucket name inside an Ozone bucket's qualified name.
public static final char QNAME_SEP_ENTITY_NAME = '.';
// Separates a URI scheme from the rest of the path; also used to build the *_SCHEME prefixes below.
public static final String SCHEME_SEPARATOR = "://";
public static final String ATTRIBUTE_QUALIFIED_NAME = "qualifiedName";
public static final String ATTRIBUTE_NAME = "name";
// Relationship attribute set on an aws_s3_pseudo_dir to point at its bucket.
public static final String ATTRIBUTE_BUCKET = "bucket";
// HDFS: entity type and attribute names
public static final String HDFS_TYPE_PATH = "hdfs_path";
public static final String ATTRIBUTE_PATH = "path";
public static final String ATTRIBUTE_CLUSTER_NAME = "clusterName";
public static final String ATTRIBUTE_NAMESERVICE_ID = "nameServiceId";
// AWS S3: entity types for the v1 (bucket / pseudo_dir) and v2 (bucket / directory) models
// Model-version value that selects the v2 entity types; compared case-insensitively.
public static final String AWS_S3_ATLAS_MODEL_VERSION_V2 = "v2";
public static final String AWS_S3_BUCKET = "aws_s3_bucket";
public static final String AWS_S3_PSEUDO_DIR = "aws_s3_pseudo_dir";
public static final String AWS_S3_V2_BUCKET = "aws_s3_v2_bucket";
public static final String AWS_S3_V2_PSEUDO_DIR = "aws_s3_v2_directory";
// Path prefixes that identify an S3 path.
public static final String S3_SCHEME = "s3" + SCHEME_SEPARATOR;
public static final String S3A_SCHEME = "s3a" + SCHEME_SEPARATOR;
// Relationship attribute set on an aws_s3_v2_directory to point at its parent (bucket or directory).
public static final String ATTRIBUTE_CONTAINER = "container";
public static final String ATTRIBUTE_OBJECT_PREFIX = "objectPrefix";
public static final String RELATIONSHIP_AWS_S3_BUCKET_S3_PSEUDO_DIRS = "aws_s3_bucket_aws_s3_pseudo_dirs";
public static final String RELATIONSHIP_AWS_S3_V2_CONTAINER_CONTAINED = "aws_s3_v2_container_contained";
// ADLS Gen2: account -> container -> directory hierarchy
public static final String ADLS_GEN2_ACCOUNT = "adls_gen2_account";
public static final String ADLS_GEN2_CONTAINER = "adls_gen2_container";
public static final String ADLS_GEN2_DIRECTORY = "adls_gen2_directory";
// NOTE(review): presumably the host suffix that follows the storage-account name in ABFS URIs; the code using it is outside this view.
public static final String ADLS_GEN2_ACCOUNT_HOST_SUFFIX = ".dfs.core.windows.net";
// Path prefixes that identify an ABFS path.
public static final String ABFS_SCHEME = "abfs" + SCHEME_SEPARATOR;
public static final String ABFSS_SCHEME = "abfss" + SCHEME_SEPARATOR;
// Relationship attribute set on a container to point at its storage account.
public static final String ATTRIBUTE_ACCOUNT = "account";
// Relationship attribute set on a directory to point at its parent (container or directory).
public static final String ATTRIBUTE_PARENT = "parent";
public static final String RELATIONSHIP_ADLS_GEN2_ACCOUNT_CONTAINERS = "adls_gen2_account_containers";
public static final String RELATIONSHIP_ADLS_GEN2_PARENT_CHILDREN = "adls_gen2_parent_children";
// Ozone: volume -> bucket -> key hierarchy
public static final String OZONE_VOLUME = "ozone_volume";
public static final String OZONE_BUCKET = "ozone_bucket";
public static final String OZONE_KEY = "ozone_key";
// Path prefixes that identify an Ozone path.
public static final String OZONE_SCHEME = "ofs" + SCHEME_SEPARATOR;
public static final String OZONE_3_SCHEME = "o3fs" + SCHEME_SEPARATOR;
// Relationship attribute set on an ozone_bucket to point at its volume.
public static final String ATTRIBUTE_VOLUME = "volume";
public static final String RELATIONSHIP_OZONE_VOLUME_BUCKET = "ozone_volume_buckets";
public static final String RELATIONSHIP_OZONE_BUCKET_KEY = "ozone_bucket_keys";
/**
 * Maps a filesystem path to its Atlas entity, choosing the storage model from the path's
 * scheme: S3 (v1 or v2 model, per the context's model version), ABFS, Ozone, or HDFS for
 * everything else.
 *
 * The scheme check runs on the lower-cased path string when the context requests lower-case
 * conversion; the model-specific helpers always receive the original {@code path}.
 *
 * @param path    the path to convert
 * @param context supplies the conversion settings and is passed on to the model-specific helpers
 * @return a wrapper whose entity is the one produced for {@code path}; the S3, ABFS and Ozone
 *         helpers also receive the wrapper so they can register referred entities on it
 */
public static AtlasEntityWithExtInfo getPathEntity(Path path, PathExtractorContext context) {
    final AtlasEntityWithExtInfo result     = new AtlasEntityWithExtInfo();
    final String                 rawPath    = path.toString();
    final String                 schemePath = context.isConvertPathToLowerCase() ? rawPath.toLowerCase() : rawPath;
    final AtlasEntity            pathEntity;

    if (isS3Path(schemePath)) {
        if (isAwsS3AtlasModelVersionV2(context)) {
            pathEntity = addS3PathEntityV2(path, result, context);
        } else {
            pathEntity = addS3PathEntityV1(path, result, context);
        }
    } else if (isAbfsPath(schemePath)) {
        pathEntity = addAbfsPathEntity(path, result, context);
    } else if (isOzonePath(schemePath)) {
        pathEntity = addOzonePathEntity(path, result, context);
    } else {
        pathEntity = addHDFSPathEntity(path, context);
    }

    result.setEntity(pathEntity);

    return result;
}
/**
 * Tells whether the context asks for the v2 AWS S3 model.
 *
 * @param context supplies the configured model version
 * @return {@code true} when the model version is non-empty and equals
 *         {@code AWS_S3_ATLAS_MODEL_VERSION_V2}, ignoring case
 */
private static boolean isAwsS3AtlasModelVersionV2(PathExtractorContext context) {
    final String modelVersion = context.getAwsS3AtlasModelVersion();

    if (StringUtils.isEmpty(modelVersion)) {
        return false;
    }

    return StringUtils.equalsIgnoreCase(modelVersion, AWS_S3_ATLAS_MODEL_VERSION_V2);
}
/**
 * Checks whether a path string uses one of the S3 schemes.
 *
 * @param strPath path in string form; may be {@code null}
 * @return {@code true} when {@code strPath} starts with {@code S3_SCHEME} or {@code S3A_SCHEME}
 */
private static boolean isS3Path(String strPath) {
    if (strPath == null) {
        return false;
    }

    return strPath.startsWith(S3_SCHEME) || strPath.startsWith(S3A_SCHEME);
}
/**
 * Checks whether a path string uses one of the ABFS schemes.
 *
 * @param strPath path in string form; may be {@code null}
 * @return {@code true} when {@code strPath} starts with {@code ABFS_SCHEME} or {@code ABFSS_SCHEME}
 */
private static boolean isAbfsPath(String strPath) {
    if (strPath == null) {
        return false;
    }

    return strPath.startsWith(ABFS_SCHEME) || strPath.startsWith(ABFSS_SCHEME);
}
/**
 * Checks whether a path string uses one of the Ozone schemes.
 *
 * @param strPath path in string form; may be {@code null}
 * @return {@code true} when {@code strPath} starts with {@code OZONE_SCHEME} or {@code OZONE_3_SCHEME}
 */
private static boolean isOzonePath(String strPath) {
    if (strPath == null) {
        return false;
    }

    return strPath.startsWith(OZONE_SCHEME) || strPath.startsWith(OZONE_3_SCHEME);
}
/**
 * Returns the {@code aws_s3_pseudo_dir} entity for an S3 path, creating it (and its
 * {@code aws_s3_bucket}) when the context does not already hold one.
 *
 * Both qualified names are lower-cased up to and including the namespace separator; the
 * metadata namespace itself is appended unchanged.
 *
 * @param path    the S3 path
 * @param extInfo receives the bucket entity as a referred entity when a directory is created
 * @param context supplies the metadata namespace and the entity cache
 * @return the cached or newly created pseudo-directory entity
 */
private static AtlasEntity addS3PathEntityV1(Path path, AtlasEntityExtInfo extInfo, PathExtractorContext context) {
    final String strPath = path.toString();

    if (LOG.isDebugEnabled()) {
        LOG.debug("==> addS3PathEntityV1(strPath={})", strPath);
    }

    final String namespace   = context.getMetadataNamespace();
    final URI    uri         = path.toUri();
    final String bucketName  = uri.getAuthority();
    final String bucketQName = (uri.getScheme() + SCHEME_SEPARATOR + bucketName + QNAME_SEP_METADATA_NAMESPACE).toLowerCase() + namespace;
    final String dirQName    = (strPath + QNAME_SEP_METADATA_NAMESPACE).toLowerCase() + namespace;

    AtlasEntity bucket = context.getEntity(bucketQName);
    AtlasEntity dir    = context.getEntity(dirQName);

    if (dir == null) {
        if (bucket == null) {
            bucket = new AtlasEntity(AWS_S3_BUCKET);

            bucket.setAttribute(ATTRIBUTE_QUALIFIED_NAME, bucketQName);
            bucket.setAttribute(ATTRIBUTE_NAME, bucketName);

            context.putEntity(bucketQName, bucket);
        }

        extInfo.addReferredEntity(bucket);

        dir = new AtlasEntity(AWS_S3_PSEUDO_DIR);

        // object prefix and name are both the lower-cased path without scheme and authority
        final String objectPrefix = Path.getPathWithoutSchemeAndAuthority(path).toString().toLowerCase();

        dir.setRelationshipAttribute(ATTRIBUTE_BUCKET, AtlasTypeUtil.getAtlasRelatedObjectId(bucket, RELATIONSHIP_AWS_S3_BUCKET_S3_PSEUDO_DIRS));
        dir.setAttribute(ATTRIBUTE_OBJECT_PREFIX, objectPrefix);
        dir.setAttribute(ATTRIBUTE_QUALIFIED_NAME, dirQName);
        dir.setAttribute(ATTRIBUTE_NAME, objectPrefix);

        context.putEntity(dirQName, dir);
    }

    if (LOG.isDebugEnabled()) {
        LOG.debug("<== addS3PathEntityV1(strPath={})", strPath);
    }

    return dir;
}
/**
 * Builds the v2-model S3 entities for a path: one {@code aws_s3_v2_bucket} plus one
 * {@code aws_s3_v2_directory} per non-empty path segment, each directory related to its
 * parent (the bucket for the first segment).
 *
 * Every directory created here is stored in the context under its own qualified name; only
 * the bucket is added to {@code extInfo}. When the path has no segments the bucket entity is
 * returned.
 *
 * NOTE(review): the initial lookup key is {@code path.toString() + separator + namespace},
 * while directories are stored under keys whose path part ends with {@code Path.SEPARATOR};
 * confirm whether that lookup can ever hit.
 *
 * @param path    the S3 path
 * @param extInfo receives the bucket entity as a referred entity
 * @param context supplies the metadata namespace and the entity cache
 * @return the entity for the deepest directory, the bucket when there are no segments, or a
 *         previously cached entity
 */
private static AtlasEntity addS3PathEntityV2(Path path, AtlasEntityExtInfo extInfo, PathExtractorContext context) {
    final String strPath = path.toString();

    if (LOG.isDebugEnabled()) {
        LOG.debug("==> addS3PathEntityV2(strPath={})", strPath);
    }

    final String namespace = context.getMetadataNamespace();
    final String pathQName = strPath + QNAME_SEP_METADATA_NAMESPACE + namespace;
    AtlasEntity  result    = context.getEntity(pathQName);

    if (result == null) {
        final URI    uri             = path.toUri();
        final String bucketName      = uri.getAuthority();
        final String schemeAndBucket = (uri.getScheme() + SCHEME_SEPARATOR + bucketName).toLowerCase();
        final String bucketQName     = schemeAndBucket + QNAME_SEP_METADATA_NAMESPACE + namespace;
        AtlasEntity  bucket          = context.getEntity(bucketQName);

        if (bucket == null) {
            bucket = new AtlasEntity(AWS_S3_V2_BUCKET);

            bucket.setAttribute(ATTRIBUTE_QUALIFIED_NAME, bucketQName);
            bucket.setAttribute(ATTRIBUTE_NAME, bucketName);

            if (LOG.isDebugEnabled()) {
                LOG.debug("adding entity: typeName={}, qualifiedName={}", bucket.getTypeName(), bucket.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
            }

            context.putEntity(bucketQName, bucket);
        }

        extInfo.addReferredEntity(bucket);

        AtlasRelatedObjectId parentId   = AtlasTypeUtil.getAtlasRelatedObjectId(bucket, RELATIONSHIP_AWS_S3_V2_CONTAINER_CONTAINED);
        String               parentPath = Path.SEPARATOR;
        final String         rawDirPath = uri.getPath();
        final String[]       segments   = (StringUtils.isEmpty(rawDirPath) ? Path.SEPARATOR : rawDirPath).split(Path.SEPARATOR);

        // walk the path top-down; each segment becomes a directory chained to the previous one
        for (int i = 0; i < segments.length; i++) {
            final String segment = segments[i];

            if (StringUtils.isNotEmpty(segment)) {
                final String segmentPath  = parentPath + segment + Path.SEPARATOR;
                final String segmentQName = schemeAndBucket + segmentPath + QNAME_SEP_METADATA_NAMESPACE + namespace;

                result = new AtlasEntity(AWS_S3_V2_PSEUDO_DIR);

                result.setRelationshipAttribute(ATTRIBUTE_CONTAINER, parentId);
                result.setAttribute(ATTRIBUTE_OBJECT_PREFIX, segmentPath);
                result.setAttribute(ATTRIBUTE_QUALIFIED_NAME, segmentQName);
                result.setAttribute(ATTRIBUTE_NAME, segment);

                if (LOG.isDebugEnabled()) {
                    LOG.debug("adding entity: typeName={}, qualifiedName={}", result.getTypeName(), result.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
                }

                context.putEntity(segmentQName, result);

                parentId   = AtlasTypeUtil.getAtlasRelatedObjectId(result, RELATIONSHIP_AWS_S3_V2_CONTAINER_CONTAINED);
                parentPath = segmentPath;
            }
        }

        // no directory segments: the path denotes the bucket itself
        if (result == null) {
            result = bucket;
        }
    }

    if (LOG.isDebugEnabled()) {
        LOG.debug("<== addS3PathEntityV2(strPath={})", strPath);
    }

    return result;
}
/**
 * Builds the ADLS Gen2 entities for an ABFS path: an {@code adls_gen2_account}, an
 * {@code adls_gen2_container} related to it, and one {@code adls_gen2_directory} per
 * non-empty path segment, each directory related to its parent (the container for the first
 * segment).
 *
 * The container name is taken from the URI's user-info part and the account name from
 * {@code getAbfsStorageAccountName}. Account and container are added to {@code extInfo};
 * directories are only stored in the context.
 *
 * NOTE(review): the initial lookup key is {@code path.toString() + separator + namespace},
 * while directories are stored under keys whose path part ends with {@code Path.SEPARATOR};
 * confirm whether that lookup can ever hit.
 * NOTE(review): when the path has no directory segments the storage-account entity (not the
 * container) is returned; confirm this is intended.
 *
 * @param path    the ABFS path
 * @param extInfo receives the account and container entities as referred entities
 * @param context supplies the metadata namespace and the entity cache
 * @return the entity for the deepest directory, the storage account when there are no
 *         segments, or a previously cached entity
 */
private static AtlasEntity addAbfsPathEntity(Path path, AtlasEntityExtInfo extInfo, PathExtractorContext context) {
    final String strPath = path.toString();

    if (LOG.isDebugEnabled()) {
        LOG.debug("==> addAbfsPathEntity(strPath={})", strPath);
    }

    final String namespace = context.getMetadataNamespace();
    final String pathQName = strPath + QNAME_SEP_METADATA_NAMESPACE + namespace;
    AtlasEntity  result    = context.getEntity(pathQName);

    if (result == null) {
        final URI    uri              = path.toUri();
        final String scheme           = uri.getScheme();
        final String accountName      = getAbfsStorageAccountName(uri);
        final String schemeAndAccount = (scheme + SCHEME_SEPARATOR + accountName).toLowerCase();
        final String accountQName     = schemeAndAccount + QNAME_SEP_METADATA_NAMESPACE + namespace;
        AtlasEntity  account          = context.getEntity(accountQName);

        // storage account: created once per qualified name, then reused from the context
        if (account == null) {
            account = new AtlasEntity(ADLS_GEN2_ACCOUNT);

            account.setAttribute(ATTRIBUTE_QUALIFIED_NAME, accountQName);
            account.setAttribute(ATTRIBUTE_NAME, accountName);

            if (LOG.isDebugEnabled()) {
                LOG.debug("adding entity: typeName={}, qualifiedName={}", account.getTypeName(), account.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
            }

            context.putEntity(accountQName, account);
        }

        extInfo.addReferredEntity(account);

        final AtlasRelatedObjectId accountId = AtlasTypeUtil.getAtlasRelatedObjectId(account, RELATIONSHIP_ADLS_GEN2_ACCOUNT_CONTAINERS);

        // container: linked to the storage account
        final String containerName      = uri.getUserInfo();
        final String schemeAndContainer = (scheme + SCHEME_SEPARATOR + containerName + QNAME_SEP_METADATA_NAMESPACE + accountName).toLowerCase();
        final String containerQName     = schemeAndContainer + QNAME_SEP_METADATA_NAMESPACE + namespace;
        AtlasEntity  container          = context.getEntity(containerQName);

        if (container == null) {
            container = new AtlasEntity(ADLS_GEN2_CONTAINER);

            container.setAttribute(ATTRIBUTE_QUALIFIED_NAME, containerQName);
            container.setAttribute(ATTRIBUTE_NAME, containerName);
            container.setRelationshipAttribute(ATTRIBUTE_ACCOUNT, accountId);

            if (LOG.isDebugEnabled()) {
                LOG.debug("adding entity: typeName={}, qualifiedName={}", container.getTypeName(), container.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
            }

            context.putEntity(containerQName, container);
        }

        extInfo.addReferredEntity(container);

        // directories: chained top-down, the first one hanging off the container
        AtlasRelatedObjectId parentId   = AtlasTypeUtil.getAtlasRelatedObjectId(container, RELATIONSHIP_ADLS_GEN2_PARENT_CHILDREN);
        String               parentPath = Path.SEPARATOR;
        final String         rawDirPath = uri.getPath();
        final String[]       segments   = (StringUtils.isEmpty(rawDirPath) ? Path.SEPARATOR : rawDirPath).split(Path.SEPARATOR);

        for (int i = 0; i < segments.length; i++) {
            final String segment = segments[i];

            if (StringUtils.isNotEmpty(segment)) {
                final String segmentPath  = parentPath + segment + Path.SEPARATOR;
                final String segmentQName = schemeAndContainer + segmentPath + QNAME_SEP_METADATA_NAMESPACE + namespace;

                result = new AtlasEntity(ADLS_GEN2_DIRECTORY);

                result.setRelationshipAttribute(ATTRIBUTE_PARENT, parentId);
                result.setAttribute(ATTRIBUTE_QUALIFIED_NAME, segmentQName);
                result.setAttribute(ATTRIBUTE_NAME, segment);

                if (LOG.isDebugEnabled()) {
                    LOG.debug("adding entity: typeName={}, qualifiedName={}", result.getTypeName(), result.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
                }

                context.putEntity(segmentQName, result);

                parentId   = AtlasTypeUtil.getAtlasRelatedObjectId(result, RELATIONSHIP_ADLS_GEN2_PARENT_CHILDREN);
                parentPath = segmentPath;
            }
        }

        if (result == null) {
            result = account;
        }
    }

    if (LOG.isDebugEnabled()) {
        LOG.debug("<== addAbfsPathEntity(strPath={})", strPath);
    }

    return result;
}
/**
 * Returns the entity registered in {@code context} for the given Ozone path, creating it when absent.
 *
 * When the key entity is not yet known, an {@code OZONE_KEY} entity is created together with its
 * volume and bucket entities (each reused from {@code context} if already registered there); the
 * volume and bucket are added to {@code extInfo} as referred entities. When the key entity is
 * already known, it is returned as-is and {@code extInfo} is not touched.
 *
 * @param path    Ozone path whose authority carries the bucket and volume names
 * @param extInfo receives the volume and bucket entities as referred entities
 * @param context supplies the metadata namespace and the qualifiedName-to-entity cache
 * @return the key entity for {@code path}
 */
private static AtlasEntity addOzonePathEntity(Path path, AtlasEntityExtInfo extInfo, PathExtractorContext context) {
    final String strPath = path.toString();

    if (LOG.isDebugEnabled()) {
        LOG.debug("==> addOzonePathEntity(strPath={})", strPath);
    }

    // every qualified name built here ends with the same namespace suffix
    final String namespaceSuffix  = QNAME_SEP_METADATA_NAMESPACE + context.getMetadataNamespace();
    final String scheme           = path.toUri().getScheme();
    final String keyQualifiedName = strPath + namespaceSuffix;

    AtlasEntity keyEntity = context.getEntity(keyQualifiedName);

    if (keyEntity == null) {
        // volume: reuse the cached entity when present, otherwise create and cache it
        final String volumeName          = getOzoneVolumeName(path);
        final String volumeQualifiedName = scheme + SCHEME_SEPARATOR + volumeName + namespaceSuffix;

        AtlasEntity volume = context.getEntity(volumeQualifiedName);

        if (volume == null) {
            volume = new AtlasEntity(OZONE_VOLUME);

            volume.setAttribute(ATTRIBUTE_QUALIFIED_NAME, volumeQualifiedName);
            volume.setAttribute(ATTRIBUTE_NAME, volumeName);

            if (LOG.isDebugEnabled()) {
                LOG.debug("adding entity: typeName={}, qualifiedName={}", volume.getTypeName(), volume.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
            }

            context.putEntity(volumeQualifiedName, volume);
        }

        extInfo.addReferredEntity(volume);

        // bucket: same reuse-or-create handling, linked back to its volume
        final String bucketName          = getOzoneBucketName(path);
        final String bucketQualifiedName = scheme + SCHEME_SEPARATOR + volumeName + QNAME_SEP_ENTITY_NAME + bucketName + namespaceSuffix;

        AtlasEntity bucket = context.getEntity(bucketQualifiedName);

        if (bucket == null) {
            bucket = new AtlasEntity(OZONE_BUCKET);

            bucket.setAttribute(ATTRIBUTE_QUALIFIED_NAME, bucketQualifiedName);
            bucket.setAttribute(ATTRIBUTE_NAME, bucketName);
            bucket.setRelationshipAttribute(ATTRIBUTE_VOLUME, AtlasTypeUtil.getAtlasRelatedObjectId(volume, RELATIONSHIP_OZONE_VOLUME_BUCKET));

            if (LOG.isDebugEnabled()) {
                LOG.debug("adding entity: typeName={}, qualifiedName={}", bucket.getTypeName(), bucket.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
            }

            context.putEntity(bucketQualifiedName, bucket);
        }

        extInfo.addReferredEntity(bucket);

        // key: named after the URI path component and linked to its bucket
        keyEntity = new AtlasEntity(OZONE_KEY);

        keyEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, keyQualifiedName);
        keyEntity.setAttribute(ATTRIBUTE_NAME, path.toUri().getPath());
        keyEntity.setRelationshipAttribute(ATTRIBUTE_BUCKET, AtlasTypeUtil.getAtlasRelatedObjectId(bucket, RELATIONSHIP_OZONE_BUCKET_KEY));

        context.putEntity(keyQualifiedName, keyEntity);
    }

    if (LOG.isDebugEnabled()) {
        LOG.debug("<== addOzonePathEntity(strPath={})", strPath);
    }

    return keyEntity;
}
/**
 * Returns the entity registered in {@code context} for the given path, creating an
 * {@code HDFS_TYPE_PATH} entity (and registering it) when none is known yet.
 *
 * When the context requests lower-case conversion, both the path string used for the
 * qualified name and the entity's name attribute are lower-cased.
 *
 * @param path    path to describe
 * @param context supplies the metadata namespace, the lower-case flag and the entity cache
 * @return the entity for {@code path}
 */
private static AtlasEntity addHDFSPathEntity(Path path, PathExtractorContext context) {
    final boolean toLowerCase = context.isConvertPathToLowerCase();
    final String  strPath     = toLowerCase ? path.toString().toLowerCase() : path.toString();

    if (LOG.isDebugEnabled()) {
        LOG.debug("==> addHDFSPathEntity(strPath={})", strPath);
    }

    final String  nameServiceID  = HdfsNameServiceResolver.getNameServiceIDForPath(strPath);
    final boolean hasNameService = StringUtils.isNotEmpty(nameServiceID);
    final String  attrPath;

    // only rewrite the path when a name-service id was resolved for it
    if (hasNameService) {
        attrPath = HdfsNameServiceResolver.getPathWithNameServiceID(strPath);
    } else {
        attrPath = strPath;
    }

    final String qualifiedName = getQualifiedName(attrPath, context.getMetadataNamespace());

    AtlasEntity entity = context.getEntity(qualifiedName);

    if (entity == null) {
        entity = new AtlasEntity(HDFS_TYPE_PATH);

        if (hasNameService) {
            entity.setAttribute(ATTRIBUTE_NAMESERVICE_ID, nameServiceID);
        }

        final String rawName = Path.getPathWithoutSchemeAndAuthority(path).toString();
        final String name    = toLowerCase ? rawName.toLowerCase() : rawName;

        entity.setAttribute(ATTRIBUTE_PATH, attrPath);
        entity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, qualifiedName);
        entity.setAttribute(ATTRIBUTE_NAME, name);
        entity.setAttribute(ATTRIBUTE_CLUSTER_NAME, context.getMetadataNamespace());

        context.putEntity(qualifiedName, entity);
    }

    if (LOG.isDebugEnabled()) {
        LOG.debug("<== addHDFSPathEntity(strPath={})", strPath);
    }

    return entity;
}
/**
 * Extracts the storage-account name from the URI host, i.e. the part of the host that
 * precedes the first occurrence of {@code ADLS_GEN2_ACCOUNT_HOST_SUFFIX}.
 *
 * @param uri URI whose host is expected to look like "<account_name>.dfs.core.windows.net"
 * @return the account name, or {@code null} when the host is empty or lacks the suffix
 */
private static String getAbfsStorageAccountName(URI uri) {
    final String host = uri.getHost();

    if (StringUtils.isEmpty(host)) {
        return null;
    }

    final int suffixStart = host.indexOf(ADLS_GEN2_ACCOUNT_HOST_SUFFIX);

    return suffixStart >= 0 ? host.substring(0, suffixStart) : null;
}
/**
 * Returns the second dot-separated segment of the path's URI authority.
 *
 * @param path Ozone path; authority is expected to be "<bucket_name>.<volume_name>.<ozone.service.id>"
 * @return the volume-name segment
 */
private static String getOzoneVolumeName(Path path) {
    final String[] authoritySegments = path.toUri().getAuthority().split("\\.");

    return authoritySegments[1];
}
/**
 * Returns the first dot-separated segment of the path's URI authority.
 *
 * @param path Ozone path whose authority starts with the bucket name
 * @return the bucket-name segment
 */
private static String getOzoneBucketName(Path path) {
    final String[] authoritySegments = path.toUri().getAuthority().split("\\.");

    return authoritySegments[0];
}
/**
 * Builds the qualified name for a path: paths starting with the HDFS scheme get the
 * metadata-namespace suffix appended; any other path is returned lower-cased, without a suffix.
 *
 * @param path              path string
 * @param metadataNamespace namespace appended to HDFS paths
 * @return the qualified name
 */
private static String getQualifiedName(String path, String metadataNamespace) {
    final boolean isHdfsPath = path.startsWith(HdfsNameServiceResolver.HDFS_SCHEME);

    return isHdfsPath
            ? path + QNAME_SEP_METADATA_NAMESPACE + metadataNamespace
            : path.toLowerCase();
}
}
\ No newline at end of file
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.utils;
import org.apache.atlas.model.instance.AtlasEntity;
import java.util.HashMap;
import java.util.Map;
/**
 * Holds the settings and the qualifiedName-to-entity map consulted while path entities are extracted.
 *
 * The map handed to the full constructor is stored and returned by reference, so entries
 * added through {@link #putEntity(String, AtlasEntity)} are visible to whoever supplied it.
 */
public class PathExtractorContext {
    private final String                   metadataNamespace;
    private final boolean                  isConvertPathToLowerCase;
    private final String                   awsS3AtlasModelVersion;
    private final Map<String, AtlasEntity> knownEntities;

    /** Creates a context with an empty entity map, no lower-casing and no S3 model version. */
    public PathExtractorContext(String metadataNamespace) {
        this(metadataNamespace, new HashMap<>(), false, null);
    }

    /** Creates a context with an empty entity map and no lower-casing. */
    public PathExtractorContext(String metadataNamespace, String awsS3AtlasModelVersion) {
        this(metadataNamespace, new HashMap<>(), false, awsS3AtlasModelVersion);
    }

    /** Creates a context with an empty entity map. */
    public PathExtractorContext(String metadataNamespace, boolean isConvertPathToLowerCase, String awsS3AtlasModelVersion) {
        this(metadataNamespace, new HashMap<>(), isConvertPathToLowerCase, awsS3AtlasModelVersion);
    }

    /**
     * Creates a context backed by the supplied entity map.
     *
     * @param metadataNamespace        namespace reported by {@link #getMetadataNamespace()}
     * @param knownEntities            map used as-is (not copied) for entity lookups and registrations
     * @param isConvertPathToLowerCase flag reported by {@link #isConvertPathToLowerCase()}
     * @param awsS3AtlasModelVersion   value reported by {@link #getAwsS3AtlasModelVersion()}
     */
    public PathExtractorContext(String metadataNamespace, Map<String, AtlasEntity> knownEntities, boolean isConvertPathToLowerCase, String awsS3AtlasModelVersion) {
        this.metadataNamespace        = metadataNamespace;
        this.knownEntities            = knownEntities;
        this.isConvertPathToLowerCase = isConvertPathToLowerCase;
        this.awsS3AtlasModelVersion   = awsS3AtlasModelVersion;
    }

    /** @return the metadata namespace given at construction */
    public String getMetadataNamespace() {
        return metadataNamespace;
    }

    /** @return the lower-case flag given at construction */
    public boolean isConvertPathToLowerCase() {
        return isConvertPathToLowerCase;
    }

    /** @return the S3 model version given at construction; may be {@code null} */
    public String getAwsS3AtlasModelVersion() {
        return awsS3AtlasModelVersion;
    }

    /** @return the backing entity map itself, not a copy */
    public Map<String, AtlasEntity> getKnownEntities() {
        return knownEntities;
    }

    /** @return the entity stored under {@code qualifiedName}, or {@code null} if there is none */
    public AtlasEntity getEntity(String qualifiedName) {
        return knownEntities.get(qualifiedName);
    }

    /** Stores {@code entity} under {@code qualifiedName}, replacing any previous entry. */
    public void putEntity(String qualifiedName, AtlasEntity entity) {
        knownEntities.put(qualifiedName, entity);
    }
}
\ No newline at end of file
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.utils;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.Test;
import org.apache.hadoop.fs.Path;
import java.util.HashMap;
import java.util.Map;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
/**
 * Tests for {@code AtlasPathExtractorUtil.getPathEntity} covering Ozone (o3fs/ofs) and HDFS paths.
 */
public class AtlasPathExtractorUtilTest {
    private static final Logger LOG = LoggerFactory.getLogger(AtlasPathExtractorUtilTest.class);

    // Common
    private static final String METADATA_NAMESPACE       = "metaspace";
    private static final String QNAME_METADATA_NAMESPACE = '@' + METADATA_NAMESPACE;
    private static final String SCHEME_SEPARATOR         = "://";
    private static final String ATTRIBUTE_NAME           = "name";
    private static final String ATTRIBUTE_QUALIFIED_NAME = "qualifiedName";

    // HDFS
    private static final String HDFS_PATH_TYPE         = "hdfs_path";
    private static final String ATTRIBUTE_PATH         = "path";
    private static final String ATTRIBUTE_CLUSTER_NAME = "clusterName";

    // Ozone
    private static final String OZONE_VOLUME   = "ozone_volume";
    private static final String OZONE_BUCKET   = "ozone_bucket";
    private static final String OZONE_KEY      = "ozone_key";
    private static final String OZONE_SCHEME   = "ofs" + SCHEME_SEPARATOR;
    private static final String OZONE_3_SCHEME = "o3fs" + SCHEME_SEPARATOR;
    private static final String OZONE_PATH     = OZONE_SCHEME + "bucket1.volume1.ozone1/files/file.txt";
    private static final String OZONE_3_PATH   = OZONE_3_SCHEME + "bucket1.volume1.ozone1/files/file.txt";

    // HDFS
    private static final String HDFS_SCHEME = "hdfs" + SCHEME_SEPARATOR;
    private static final String HDFS_PATH   = HDFS_SCHEME + "host_name:8020/warehouse/tablespace/external/hive/taBlE_306";

    /** An o3fs path yields a key entity plus referred volume and bucket entities. */
    @Test
    public void testGetPathEntityOzone3Path() {
        PathExtractorContext extractorContext = new PathExtractorContext(METADATA_NAMESPACE);

        Path path = new Path(OZONE_3_PATH);
        AtlasEntityWithExtInfo entityWithExtInfo = AtlasPathExtractorUtil.getPathEntity(path, extractorContext);
        AtlasEntity entity = entityWithExtInfo.getEntity();

        assertNotNull(entity);
        assertEquals(entity.getTypeName(), OZONE_KEY);
        verifyOzoneKeyEntity(OZONE_3_PATH, entity);

        // volume + bucket are referred; key, volume and bucket are all cached in the context
        assertEquals(entityWithExtInfo.getReferredEntities().size(), 2);
        assertEquals(extractorContext.getKnownEntities().size(), 3);
        verifyOzoneEntities(OZONE_3_SCHEME, OZONE_3_PATH, extractorContext.getKnownEntities());
    }

    /** An ofs path yields a key entity plus referred volume and bucket entities. */
    @Test
    public void testGetPathEntityOzonePath() {
        PathExtractorContext extractorContext = new PathExtractorContext(METADATA_NAMESPACE);

        Path path = new Path(OZONE_PATH);
        AtlasEntityWithExtInfo entityWithExtInfo = AtlasPathExtractorUtil.getPathEntity(path, extractorContext);
        AtlasEntity entity = entityWithExtInfo.getEntity();

        assertNotNull(entity);
        assertEquals(entity.getTypeName(), OZONE_KEY);
        verifyOzoneKeyEntity(OZONE_PATH, entity);

        // volume + bucket are referred; key, volume and bucket are all cached in the context
        assertEquals(entityWithExtInfo.getReferredEntities().size(), 2);
        assertEquals(extractorContext.getKnownEntities().size(), 3);
        verifyOzoneEntities(OZONE_SCHEME, OZONE_PATH, extractorContext.getKnownEntities());
    }

    /** An HDFS path keeps its case by default and produces no referred entities. */
    @Test
    public void testGetPathEntityHdfsPath() {
        PathExtractorContext extractorContext = new PathExtractorContext(METADATA_NAMESPACE);

        Path path = new Path(HDFS_PATH);
        AtlasEntityWithExtInfo entityWithExtInfo = AtlasPathExtractorUtil.getPathEntity(path, extractorContext);
        AtlasEntity entity = entityWithExtInfo.getEntity();

        assertNotNull(entity);
        assertEquals(entity.getTypeName(), HDFS_PATH_TYPE);
        verifyHDFSEntity(entity, false);

        // check the result under test, not a freshly constructed AtlasEntityWithExtInfo
        assertNull(entityWithExtInfo.getReferredEntities());

        assertEquals(extractorContext.getKnownEntities().size(), 1);
        extractorContext.getKnownEntities().values().forEach(x -> verifyHDFSEntity(x, false));
    }

    /** With lower-case conversion enabled, qualified name, name and path are lower-cased. */
    @Test
    public void testGetPathEntityHdfsPathLowerCase() {
        PathExtractorContext extractorContext = new PathExtractorContext(METADATA_NAMESPACE, true, null);

        Path path = new Path(HDFS_PATH);
        AtlasEntityWithExtInfo entityWithExtInfo = AtlasPathExtractorUtil.getPathEntity(path, extractorContext);
        AtlasEntity entity = entityWithExtInfo.getEntity();

        assertNotNull(entity);
        assertEquals(entity.getTypeName(), HDFS_PATH_TYPE);
        verifyHDFSEntity(entity, true);

        assertNull(entityWithExtInfo.getReferredEntities());

        assertEquals(extractorContext.getKnownEntities().size(), 1);
        extractorContext.getKnownEntities().values().forEach(x -> verifyHDFSEntity(x, true));
    }

    /** Checks name and qualified name of every Ozone entity in {@code knownEntities}, by type. */
    private void verifyOzoneEntities(String scheme, String path, Map<String, AtlasEntity> knownEntities) {
        for (AtlasEntity knownEntity : knownEntities.values()) {
            switch (knownEntity.getTypeName()) {
                case OZONE_KEY:
                    verifyOzoneKeyEntity(path, knownEntity);
                    break;

                case OZONE_VOLUME:
                    assertEquals(knownEntity.getAttribute(ATTRIBUTE_QUALIFIED_NAME), scheme + "volume1" + QNAME_METADATA_NAMESPACE);
                    assertEquals(knownEntity.getAttribute(ATTRIBUTE_NAME), "volume1");
                    break;

                case OZONE_BUCKET:
                    assertEquals(knownEntity.getAttribute(ATTRIBUTE_QUALIFIED_NAME), scheme + "volume1.bucket1" + QNAME_METADATA_NAMESPACE);
                    assertEquals(knownEntity.getAttribute(ATTRIBUTE_NAME), "bucket1");
                    break;
            }
        }
    }

    /** Checks that the key entity is qualified by the full path and named by the URI path component. */
    private void verifyOzoneKeyEntity(String path, AtlasEntity entity) {
        assertEquals(entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME), path + QNAME_METADATA_NAMESPACE);
        assertEquals(entity.getAttribute(ATTRIBUTE_NAME), "/files/file.txt");
    }

    /** Checks the HDFS entity attributes, expecting lower-cased values when {@code toLowerCase} is set. */
    private void verifyHDFSEntity(AtlasEntity entity, boolean toLowerCase) {
        if (toLowerCase) {
            assertEquals(entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME), HDFS_PATH.toLowerCase() + QNAME_METADATA_NAMESPACE);
            assertEquals(entity.getAttribute(ATTRIBUTE_NAME), "/warehouse/tablespace/external/hive/table_306");
            assertEquals(entity.getAttribute(ATTRIBUTE_PATH), HDFS_PATH.toLowerCase());
            assertEquals(entity.getAttribute(ATTRIBUTE_CLUSTER_NAME), METADATA_NAMESPACE);
        } else {
            assertEquals(entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME), HDFS_PATH + QNAME_METADATA_NAMESPACE);
            assertEquals(entity.getAttribute(ATTRIBUTE_NAME), "/warehouse/tablespace/external/hive/taBlE_306");
            assertEquals(entity.getAttribute(ATTRIBUTE_PATH), HDFS_PATH);
            assertEquals(entity.getAttribute(ATTRIBUTE_CLUSTER_NAME), METADATA_NAMESPACE);
        }
    }
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment