Commit 88ac0fa6 by Madhan Neethiraj

ATLAS-2760: Hive hook updates to handle references to s3 paths

parent f787bcc2
......@@ -78,6 +78,13 @@ public abstract class BaseHiveEvent {
public static final String HDFS_TYPE_PATH = "hdfs_path";
public static final String HBASE_TYPE_TABLE = "hbase_table";
public static final String HBASE_TYPE_NAMESPACE = "hbase_namespace";
public static final String AWS_S3_BUCKET = "aws_s3_bucket";
public static final String AWS_S3_PSEUDO_DIR = "aws_s3_pseudo_dir";
public static final String AWS_S3_OBJECT = "aws_s3_object";
public static final String SCHEME_SEPARATOR = "://";
public static final String S3_SCHEME = "s3" + SCHEME_SEPARATOR;
public static final String S3A_SCHEME = "s3a" + SCHEME_SEPARATOR;
public static final String ATTRIBUTE_QUALIFIED_NAME = "qualifiedName";
public static final String ATTRIBUTE_NAME = "name";
......@@ -130,6 +137,8 @@ public abstract class BaseHiveEvent {
public static final String ATTRIBUTE_URI = "uri";
public static final String ATTRIBUTE_STORAGE_HANDLER = "storage_handler";
public static final String ATTRIBUTE_NAMESPACE = "namespace";
public static final String ATTRIBUTE_OBJECT_PREFIX = "objectPrefix";
public static final String ATTRIBUTE_BUCKET = "bucket";
public static final String HBASE_STORAGE_HANDLER_CLASS = "org.apache.hadoop.hive.hbase.HBaseStorageHandler";
public static final String HBASE_DEFAULT_NAMESPACE = "default";
......@@ -245,7 +254,7 @@ public abstract class BaseHiveEvent {
URI location = entity.getLocation();
if (location != null) {
ret = getHDFSPathEntity(new Path(entity.getLocation()));
ret = getPathEntity(new Path(entity.getLocation()), entityExtInfo);
}
}
break;
......@@ -494,26 +503,60 @@ public abstract class BaseHiveEvent {
return ret;
}
protected AtlasEntity getHDFSPathEntity(Path path) {
String strPath = path.toString().toLowerCase();
String nameServiceID = HdfsNameServiceResolver.getNameServiceIDForPath(strPath);
String attrPath = StringUtils.isEmpty(nameServiceID) ? strPath : HdfsNameServiceResolver.getPathWithNameServiceID(strPath);
String pathQualifiedName = getQualifiedName(attrPath);
AtlasEntity ret = context.getEntity(pathQualifiedName);
protected AtlasEntity getPathEntity(Path path, AtlasEntityExtInfo extInfo) {
AtlasEntity ret;
String strPath = path.toString().toLowerCase();
if (ret == null) {
ret = new AtlasEntity(HDFS_TYPE_PATH);
if (isS3Path(strPath)) {
String bucketName = path.toUri().getAuthority();
String bucketQualifiedName = (path.toUri().getScheme() + SCHEME_SEPARATOR + path.toUri().getAuthority() + QNAME_SEP_CLUSTER_NAME).toLowerCase() + getClusterName();
String pathQualifiedName = (strPath + QNAME_SEP_CLUSTER_NAME).toLowerCase() + getClusterName();
AtlasEntity bucketEntity = context.getEntity(bucketQualifiedName);
ret = context.getEntity(pathQualifiedName);
if (ret == null) {
if (bucketEntity == null) {
bucketEntity = new AtlasEntity(AWS_S3_BUCKET);
bucketEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, bucketQualifiedName);
bucketEntity.setAttribute(ATTRIBUTE_NAME, bucketName);
context.putEntity(bucketQualifiedName, bucketEntity);
}
extInfo.addReferredEntity(bucketEntity);
ret = new AtlasEntity(AWS_S3_PSEUDO_DIR);
ret.setAttribute(ATTRIBUTE_BUCKET, getObjectId(bucketEntity));
ret.setAttribute(ATTRIBUTE_OBJECT_PREFIX, Path.getPathWithoutSchemeAndAuthority(path).toString().toLowerCase());
ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, pathQualifiedName);
ret.setAttribute(ATTRIBUTE_NAME, Path.getPathWithoutSchemeAndAuthority(path).toString().toLowerCase());
if (StringUtils.isNotEmpty(nameServiceID)) {
ret.setAttribute(ATTRIBUTE_NAMESERVICE_ID, nameServiceID);
context.putEntity(pathQualifiedName, ret);
}
} else {
String nameServiceID = HdfsNameServiceResolver.getNameServiceIDForPath(strPath);
String attrPath = StringUtils.isEmpty(nameServiceID) ? strPath : HdfsNameServiceResolver.getPathWithNameServiceID(strPath);
String pathQualifiedName = getQualifiedName(attrPath);
ret.setAttribute(ATTRIBUTE_PATH, attrPath);
ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, pathQualifiedName);
ret.setAttribute(ATTRIBUTE_NAME, Path.getPathWithoutSchemeAndAuthority(path).toString().toLowerCase());
ret.setAttribute(ATTRIBUTE_CLUSTER_NAME, getClusterName());
ret = context.getEntity(pathQualifiedName);
if (ret == null) {
ret = new AtlasEntity(HDFS_TYPE_PATH);
if (StringUtils.isNotEmpty(nameServiceID)) {
ret.setAttribute(ATTRIBUTE_NAMESERVICE_ID, nameServiceID);
}
context.putEntity(pathQualifiedName, ret);
ret.setAttribute(ATTRIBUTE_PATH, attrPath);
ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, pathQualifiedName);
ret.setAttribute(ATTRIBUTE_NAME, Path.getPathWithoutSchemeAndAuthority(path).toString().toLowerCase());
ret.setAttribute(ATTRIBUTE_CLUSTER_NAME, getClusterName());
context.putEntity(pathQualifiedName, ret);
}
}
return ret;
......@@ -874,6 +917,10 @@ public abstract class BaseHiveEvent {
return false;
}
private boolean isS3Path(String strPath) {
return strPath != null && (strPath.startsWith(S3_SCHEME) || strPath.startsWith(S3A_SCHEME));
}
static final class EntityComparator implements Comparator<Entity> {
@Override
......
......@@ -98,7 +98,7 @@ public class CreateTable extends BaseHiveEvent {
}
} else {
if (TableType.EXTERNAL_TABLE.equals(table.getTableType())) {
AtlasEntity hdfsPathEntity = getHDFSPathEntity(table.getDataLocation());
AtlasEntity hdfsPathEntity = getPathEntity(table.getDataLocation(), ret);
AtlasEntity processEntity = getHiveProcessEntity(Collections.singletonList(hdfsPathEntity), Collections.singletonList(tblEntity));
ret.addEntity(processEntity);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment