Commit 30a275d4 by Madhan Neethiraj

ATLAS-3659: updated Hive hook to create aws_s3_v2 entities

parent 7de35793
......@@ -168,6 +168,10 @@ public class AtlasHiveHookContext {
return hook.isConvertHdfsPathToLowerCase();
}
public boolean isAwsS3AtlasModelVersionV2() {
return hook.isAwsS3AtlasModelVersionV2();
}
public boolean getSkipHiveColumnLineageHive20633() {
return hook.getSkipHiveColumnLineageHive20633();
}
......
......@@ -60,6 +60,8 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
public static final String HOOK_NAME_CACHE_DATABASE_COUNT = CONF_PREFIX + "name.cache.database.count";
public static final String HOOK_NAME_CACHE_TABLE_COUNT = CONF_PREFIX + "name.cache.table.count";
public static final String HOOK_NAME_CACHE_REBUID_INTERVAL_SEC = CONF_PREFIX + "name.cache.rebuild.interval.seconds";
public static final String HOOK_AWS_S3_ATLAS_MODEL_VERSION = CONF_PREFIX + "aws_s3.atlas.model.version";
public static final String HOOK_AWS_S3_ATLAS_MODEL_VERSION_V2 = "v2";
public static final String HOOK_HIVE_PROCESS_POPULATE_DEPRECATED_ATTRIBUTES = CONF_PREFIX + "hive_process.populate.deprecated.attributes";
public static final String HOOK_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633 = CONF_PREFIX + "skip.hive_column_lineage.hive-20633";
public static final String HOOK_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD = CONF_PREFIX + "skip.hive_column_lineage.hive-20633.inputs.threshold";
......@@ -75,6 +77,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
private static final int nameCacheDatabaseMaxCount;
private static final int nameCacheTableMaxCount;
private static final int nameCacheRebuildIntervalSeconds;
private static final boolean isAwsS3AtlasModelVersionV2;
private static final boolean skipHiveColumnLineageHive20633;
private static final int skipHiveColumnLineageHive20633InputsThreshold;
......@@ -98,6 +101,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
nameCacheDatabaseMaxCount = atlasProperties.getInt(HOOK_NAME_CACHE_DATABASE_COUNT, 10000);
nameCacheTableMaxCount = atlasProperties.getInt(HOOK_NAME_CACHE_TABLE_COUNT, 10000);
nameCacheRebuildIntervalSeconds = atlasProperties.getInt(HOOK_NAME_CACHE_REBUID_INTERVAL_SEC, 60 * 60); // 60 minutes default
isAwsS3AtlasModelVersionV2 = StringUtils.equalsIgnoreCase(atlasProperties.getString(HOOK_AWS_S3_ATLAS_MODEL_VERSION, HOOK_AWS_S3_ATLAS_MODEL_VERSION_V2), HOOK_AWS_S3_ATLAS_MODEL_VERSION_V2);
skipHiveColumnLineageHive20633 = atlasProperties.getBoolean(HOOK_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, false);
skipHiveColumnLineageHive20633InputsThreshold = atlasProperties.getInt(HOOK_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, 15); // skip if avg # of inputs is > 15
hiveProcessPopulateDeprecatedAttributes = atlasProperties.getBoolean(HOOK_HIVE_PROCESS_POPULATE_DEPRECATED_ATTRIBUTES, false);
......@@ -253,6 +257,8 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
return convertHdfsPathToLowerCase;
}
public boolean isAwsS3AtlasModelVersionV2() { return isAwsS3AtlasModelVersionV2; }
public boolean getSkipHiveColumnLineageHive20633() {
return skipHiveColumnLineageHive20633;
}
......
......@@ -79,6 +79,8 @@ public abstract class BaseHiveEvent {
public static final String AWS_S3_BUCKET = "aws_s3_bucket";
public static final String AWS_S3_PSEUDO_DIR = "aws_s3_pseudo_dir";
public static final String AWS_S3_OBJECT = "aws_s3_object";
public static final String AWS_S3_V2_BUCKET = "aws_s3_v2_bucket";
public static final String AWS_S3_V2_PSEUDO_DIR = "aws_s3_v2_directory";
public static final String SCHEME_SEPARATOR = "://";
public static final String S3_SCHEME = "s3" + SCHEME_SEPARATOR;
......@@ -139,6 +141,7 @@ public abstract class BaseHiveEvent {
public static final String ATTRIBUTE_NAMESPACE = "namespace";
public static final String ATTRIBUTE_OBJECT_PREFIX = "objectPrefix";
public static final String ATTRIBUTE_BUCKET = "bucket";
public static final String ATTRIBUTE_CONTAINER = "container";
public static final String ATTRIBUTE_HOSTNAME = "hostName";
public static final String ATTRIBUTE_EXEC_TIME = "execTime";
public static final String ATTRIBUTE_DDL_QUERIES = "ddlQueries";
......@@ -160,6 +163,7 @@ public abstract class BaseHiveEvent {
public static final String RELATIONSHIP_HIVE_TABLE_COLUMNS = "hive_table_columns";
public static final String RELATIONSHIP_HIVE_TABLE_STORAGE_DESC = "hive_table_storagedesc";
public static final String RELATIONSHIP_AWS_S3_BUCKET_S3_PSEUDO_DIRS = "aws_s3_bucket_aws_s3_pseudo_dirs";
public static final String RELATIONSHIP_AWS_S3_V2_CONTAINER_CONTAINED = "aws_s3_v2_container_contained";
public static final String RELATIONSHIP_HIVE_PROCESS_PROCESS_EXE = "hive_process_process_executions";
public static final String RELATIONSHIP_HIVE_DB_DDL_QUERIES = "hive_db_ddl_queries";
public static final String RELATIONSHIP_HIVE_TABLE_DDL_QUERIES = "hive_table_ddl_queries";
......@@ -579,33 +583,10 @@ public abstract class BaseHiveEvent {
}
if (isS3Path(strPath)) {
String bucketName = path.toUri().getAuthority();
String bucketQualifiedName = (path.toUri().getScheme() + SCHEME_SEPARATOR + path.toUri().getAuthority() + QNAME_SEP_METADATA_NAMESPACE).toLowerCase() + metadataNamespace;
String pathQualifiedName = (strPath + QNAME_SEP_METADATA_NAMESPACE).toLowerCase() + metadataNamespace;
AtlasEntity bucketEntity = context.getEntity(bucketQualifiedName);
ret = context.getEntity(pathQualifiedName);
if (ret == null) {
if (bucketEntity == null) {
bucketEntity = new AtlasEntity(AWS_S3_BUCKET);
bucketEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, bucketQualifiedName);
bucketEntity.setAttribute(ATTRIBUTE_NAME, bucketName);
context.putEntity(bucketQualifiedName, bucketEntity);
}
extInfo.addReferredEntity(bucketEntity);
ret = new AtlasEntity(AWS_S3_PSEUDO_DIR);
ret.setRelationshipAttribute(ATTRIBUTE_BUCKET, AtlasTypeUtil.getAtlasRelatedObjectId(bucketEntity, RELATIONSHIP_AWS_S3_BUCKET_S3_PSEUDO_DIRS));
ret.setAttribute(ATTRIBUTE_OBJECT_PREFIX, Path.getPathWithoutSchemeAndAuthority(path).toString().toLowerCase());
ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, pathQualifiedName);
ret.setAttribute(ATTRIBUTE_NAME, Path.getPathWithoutSchemeAndAuthority(path).toString().toLowerCase());
context.putEntity(pathQualifiedName, ret);
if (context.isAwsS3AtlasModelVersionV2()) {
ret = addS3PathEntityV2(path, strPath, extInfo);
} else {
ret = addS3PathEntityV1(path, strPath, extInfo);
}
} else {
String nameServiceID = HdfsNameServiceResolver.getNameServiceIDForPath(strPath);
......@@ -1150,6 +1131,113 @@ public abstract class BaseHiveEvent {
return strPath != null && (strPath.startsWith(S3_SCHEME) || strPath.startsWith(S3A_SCHEME));
}
private AtlasEntity addS3PathEntityV1(Path path, String strPath, AtlasEntityExtInfo extInfo) {
String metadataNamespace = getMetadataNamespace();
String bucketName = path.toUri().getAuthority();
String bucketQualifiedName = (path.toUri().getScheme() + SCHEME_SEPARATOR + path.toUri().getAuthority() + QNAME_SEP_METADATA_NAMESPACE).toLowerCase() + metadataNamespace;
String pathQualifiedName = (strPath + QNAME_SEP_METADATA_NAMESPACE).toLowerCase() + metadataNamespace;
AtlasEntity bucketEntity = context.getEntity(bucketQualifiedName);
AtlasEntity ret = context.getEntity(pathQualifiedName);
if (ret == null) {
if (bucketEntity == null) {
bucketEntity = new AtlasEntity(AWS_S3_BUCKET);
bucketEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, bucketQualifiedName);
bucketEntity.setAttribute(ATTRIBUTE_NAME, bucketName);
context.putEntity(bucketQualifiedName, bucketEntity);
}
extInfo.addReferredEntity(bucketEntity);
ret = new AtlasEntity(AWS_S3_PSEUDO_DIR);
ret.setRelationshipAttribute(ATTRIBUTE_BUCKET, AtlasTypeUtil.getAtlasRelatedObjectId(bucketEntity, RELATIONSHIP_AWS_S3_BUCKET_S3_PSEUDO_DIRS));
ret.setAttribute(ATTRIBUTE_OBJECT_PREFIX, Path.getPathWithoutSchemeAndAuthority(path).toString().toLowerCase());
ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, pathQualifiedName);
ret.setAttribute(ATTRIBUTE_NAME, Path.getPathWithoutSchemeAndAuthority(path).toString().toLowerCase());
context.putEntity(pathQualifiedName, ret);
}
return ret;
}
private AtlasEntity addS3PathEntityV2(Path path, String strPath, AtlasEntityExtInfo extInfo) {
if (LOG.isDebugEnabled()) {
LOG.debug("==> addS3PathEntityV2(strPath={})", strPath);
}
String metadataNamespace = getMetadataNamespace();
String pathQualifiedName = strPath + QNAME_SEP_METADATA_NAMESPACE + metadataNamespace;
AtlasEntity ret = context.getEntity(pathQualifiedName);
if (ret == null) {
String bucketName = path.toUri().getAuthority();
String schemeAndBucketName = (path.toUri().getScheme() + SCHEME_SEPARATOR + bucketName).toLowerCase();
String bucketQualifiedName = schemeAndBucketName + QNAME_SEP_METADATA_NAMESPACE + metadataNamespace;
AtlasEntity bucketEntity = context.getEntity(bucketQualifiedName);
if (bucketEntity == null) {
bucketEntity = new AtlasEntity(AWS_S3_V2_BUCKET);
bucketEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, bucketQualifiedName);
bucketEntity.setAttribute(ATTRIBUTE_NAME, bucketName);
if (LOG.isDebugEnabled()) {
LOG.debug("adding entity: typeName={}, qualifiedName={}", bucketEntity.getTypeName(), bucketEntity.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
}
context.putEntity(bucketQualifiedName, bucketEntity);
}
extInfo.addReferredEntity(bucketEntity);
AtlasRelatedObjectId parentObjId = AtlasTypeUtil.getAtlasRelatedObjectId(bucketEntity, RELATIONSHIP_AWS_S3_V2_CONTAINER_CONTAINED);
String parentPath = Path.SEPARATOR;
String dirPath = path.toUri().getPath();
if (StringUtils.isEmpty(dirPath)) {
dirPath = Path.SEPARATOR;
}
for (String subDirName : dirPath.split(Path.SEPARATOR)) {
if (StringUtils.isEmpty(subDirName)) {
continue;
}
String subDirPath = parentPath + subDirName + Path.SEPARATOR;
String subDirQualifiedName = schemeAndBucketName + subDirPath + QNAME_SEP_METADATA_NAMESPACE + metadataNamespace;
ret = new AtlasEntity(AWS_S3_V2_PSEUDO_DIR);
ret.setRelationshipAttribute(ATTRIBUTE_CONTAINER, parentObjId);
ret.setAttribute(ATTRIBUTE_OBJECT_PREFIX, subDirPath);
ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, subDirQualifiedName);
ret.setAttribute(ATTRIBUTE_NAME, subDirName);
if (LOG.isDebugEnabled()) {
LOG.debug("adding entity: typeName={}, qualifiedName={}", ret.getTypeName(), ret.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
}
context.putEntity(subDirQualifiedName, ret);
parentObjId = AtlasTypeUtil.getAtlasRelatedObjectId(ret, RELATIONSHIP_AWS_S3_V2_CONTAINER_CONTAINED);
parentPath = subDirPath;
}
if (ret == null) {
ret = bucketEntity;
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("<== addS3PathEntityV2(strPath={})", strPath);
}
return ret;
}
static final class EntityComparator implements Comparator<Entity> {
@Override
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment