Commit 854208c1 by Madhan Neethiraj

ATLAS-2961: updated Hive hook to recognize configuration 'hdfs_path.convert_to_lowercase'

(cherry picked from commit a37aeda48aa4239cd110e665c837c730edb4b800) (cherry picked from commit fea8acbc6790fa23fe5b3121b38a3c218c694c09)
parent 85280ddf
......@@ -205,7 +205,7 @@ public class HiveMetaStoreBridge {
* @param hiveConf {@link HiveConf} for Hive component in the cluster
*/
/**
 * Constructs the bridge from Atlas configuration, delegating to the main constructor.
 *
 * @param atlasProperties Atlas configuration; supplies the cluster name and the
 *                        HDFS-path lower-casing flag
 * @param hiveConf {@link HiveConf} for Hive component in the cluster
 * @param atlasClientV2 client used to communicate with Atlas
 * @throws Exception if the Hive client cannot be obtained from {@code hiveConf}
 */
public HiveMetaStoreBridge(Configuration atlasProperties, HiveConf hiveConf, AtlasClientV2 atlasClientV2) throws Exception {
    // Default is 'false': preserve the original case of HDFS paths unless
    // 'hdfs_path.convert_to_lowercase' is explicitly enabled (ATLAS-2961).
    // NOTE(review): the diff showed two delegating this(...) calls (old default 'true',
    // new default 'false'); only one this(...) call is legal in a Java constructor,
    // so the updated line is kept.
    this(atlasProperties.getString(HIVE_CLUSTER_NAME, DEFAULT_CLUSTER_NAME), Hive.get(hiveConf), atlasClientV2, atlasProperties.getBoolean(HDFS_PATH_CONVERT_TO_LOWER_CASE, false));
}
/**
......@@ -700,7 +700,7 @@ public class HiveMetaStoreBridge {
String nameServiceID = HdfsNameServiceResolver.getNameServiceIDForPath(pathUri);
Path path = new Path(pathUri);
ret.setAttribute(ATTRIBUTE_NAME, Path.getPathWithoutSchemeAndAuthority(path).toString().toLowerCase());
ret.setAttribute(ATTRIBUTE_NAME, Path.getPathWithoutSchemeAndAuthority(path).toString());
ret.setAttribute(ATTRIBUTE_CLUSTER_NAME, clusterName);
if (StringUtils.isNotEmpty(nameServiceID)) {
......
......@@ -85,6 +85,10 @@ public class AtlasHiveHookContext {
return hook.getClusterName();
}
/**
 * Returns whether HDFS path names should be converted to lower case,
 * delegating to the hook's setting (driven by the
 * 'atlas.hook.hive.hdfs_path.convert_to_lowercase' property).
 */
public boolean isConvertHdfsPathToLowerCase() {
return hook.isConvertHdfsPathToLowerCase();
}
/**
 * Delegates to the hook's flag for skipping Hive column-lineage capture.
 * NOTE(review): the name suggests a workaround for HIVE-20633 — confirm against
 * the hook's configuration before relying on the exact semantics.
 */
public boolean getSkipHiveColumnLineageHive20633() {
return hook.getSkipHiveColumnLineageHive20633();
}
......
......@@ -47,6 +47,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
public static final String CONF_PREFIX = "atlas.hook.hive.";
public static final String CONF_CLUSTER_NAME = "atlas.cluster.name";
public static final String HDFS_PATH_CONVERT_TO_LOWER_CASE = CONF_PREFIX + "hdfs_path.convert_to_lowercase";
public static final String HOOK_NAME_CACHE_ENABLED = CONF_PREFIX + "name.cache.enabled";
public static final String HOOK_NAME_CACHE_DATABASE_COUNT = CONF_PREFIX + "name.cache.database.count";
public static final String HOOK_NAME_CACHE_TABLE_COUNT = CONF_PREFIX + "name.cache.table.count";
......@@ -59,6 +60,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
private static final Map<String, HiveOperation> OPERATION_MAP = new HashMap<>();
private static final String clusterName;
private static final boolean convertHdfsPathToLowerCase;
private static final boolean nameCacheEnabled;
private static final int nameCacheDatabaseMaxCount;
private static final int nameCacheTableMaxCount;
......@@ -75,6 +77,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
}
clusterName = atlasProperties.getString(CONF_CLUSTER_NAME, DEFAULT_CLUSTER_NAME);
convertHdfsPathToLowerCase = atlasProperties.getBoolean(HDFS_PATH_CONVERT_TO_LOWER_CASE, false);
nameCacheEnabled = atlasProperties.getBoolean(HOOK_NAME_CACHE_ENABLED, true);
nameCacheDatabaseMaxCount = atlasProperties.getInt(HOOK_NAME_CACHE_DATABASE_COUNT, 10000);
nameCacheTableMaxCount = atlasProperties.getInt(HOOK_NAME_CACHE_TABLE_COUNT, 10000);
......@@ -189,6 +192,10 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
return clusterName;
}
/**
 * Returns whether HDFS path names should be converted to lower case.
 * Backed by the static field read once from the
 * 'atlas.hook.hive.hdfs_path.convert_to_lowercase' property (default: false).
 */
public boolean isConvertHdfsPathToLowerCase() {
return convertHdfsPathToLowerCase;
}
/**
 * Returns the flag for skipping Hive column-lineage capture.
 * NOTE(review): the backing field is initialized outside this excerpt; the name
 * suggests a HIVE-20633 workaround — confirm where it is configured.
 */
public boolean getSkipHiveColumnLineageHive20633() {
return skipHiveColumnLineageHive20633;
}
......
......@@ -145,6 +145,7 @@ public abstract class BaseHiveEvent {
public static final String HBASE_NAMESPACE_TABLE_DELIMITER = ":";
public static final String HBASE_PARAM_TABLE_NAME = "hbase.table.name";
public static final long MILLIS_CONVERT_FACTOR = 1000;
public static final String HDFS_PATH_PREFIX = "hdfs://";
public static final Map<Integer, String> OWNER_TYPE_TO_ENUM_VALUE = new HashMap<>();
......@@ -505,7 +506,11 @@ public abstract class BaseHiveEvent {
protected AtlasEntity getPathEntity(Path path, AtlasEntityExtInfo extInfo) {
AtlasEntity ret;
String strPath = path.toString().toLowerCase();
String strPath = path.toString();
if (strPath.startsWith(HDFS_PATH_PREFIX) && context.isConvertHdfsPathToLowerCase()) {
strPath = strPath.toLowerCase();
}
if (isS3Path(strPath)) {
String bucketName = path.toUri().getAuthority();
......@@ -550,9 +555,15 @@ public abstract class BaseHiveEvent {
ret.setAttribute(ATTRIBUTE_NAMESERVICE_ID, nameServiceID);
}
String name = Path.getPathWithoutSchemeAndAuthority(path).toString();
if (strPath.startsWith(HDFS_PATH_PREFIX) && context.isConvertHdfsPathToLowerCase()) {
name = name.toLowerCase();
}
ret.setAttribute(ATTRIBUTE_PATH, attrPath);
ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, pathQualifiedName);
ret.setAttribute(ATTRIBUTE_NAME, Path.getPathWithoutSchemeAndAuthority(path).toString().toLowerCase());
ret.setAttribute(ATTRIBUTE_NAME, name);
ret.setAttribute(ATTRIBUTE_CLUSTER_NAME, getClusterName());
context.putEntity(pathQualifiedName, ret);
......@@ -688,7 +699,12 @@ public abstract class BaseHiveEvent {
}
protected String getQualifiedName(URI location) {
String strPath = new Path(location).toString().toLowerCase();
String strPath = new Path(location).toString();
if (strPath.startsWith(HDFS_PATH_PREFIX) && context.isConvertHdfsPathToLowerCase()) {
strPath = strPath.toLowerCase();
}
String nameServiceID = HdfsNameServiceResolver.getNameServiceIDForPath(strPath);
String attrPath = StringUtils.isEmpty(nameServiceID) ? strPath : HdfsNameServiceResolver.getPathWithNameServiceID(strPath);
......@@ -697,7 +713,7 @@ public abstract class BaseHiveEvent {
protected String getQualifiedName(String path) {
if (path.startsWith(HdfsNameServiceResolver.HDFS_SCHEME)) {
return (path + QNAME_SEP_CLUSTER_NAME).toLowerCase() + getClusterName();
return path + QNAME_SEP_CLUSTER_NAME + getClusterName();
}
return path.toLowerCase();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment