Commit bcd5bb60 by apoorvnaik

Updates to HDFSNameServiceResolver

Change-Id: I410fcc6b5bb7ba121a4206e0f150546fe12789a1
parent c950c9bd
......@@ -95,7 +95,6 @@ public class HiveMetaStoreBridge {
private static final int EXIT_CODE_FAILED = 1;
private static final String DEFAULT_ATLAS_URL = "http://localhost:21000/";
private final HdfsNameServiceResolver hdfsNameServiceResolver = HdfsNameServiceResolver.getInstance();
private final String clusterName;
private final Hive hiveClient;
private final AtlasClientV2 atlasClientV2;
......@@ -531,7 +530,7 @@ public class HiveMetaStoreBridge {
dbEntity.setAttribute(ATTRIBUTE_OWNER, hiveDB.getOwnerName());
dbEntity.setAttribute(ATTRIBUTE_CLUSTER_NAME, clusterName);
dbEntity.setAttribute(ATTRIBUTE_LOCATION, hdfsNameServiceResolver.getPathWithNameServiceID(hiveDB.getLocationUri()));
dbEntity.setAttribute(ATTRIBUTE_LOCATION, HdfsNameServiceResolver.getPathWithNameServiceID(hiveDB.getLocationUri()));
dbEntity.setAttribute(ATTRIBUTE_PARAMETERS, hiveDB.getParameters());
if (hiveDB.getOwnerType() != null) {
......@@ -616,7 +615,7 @@ public class HiveMetaStoreBridge {
ret.setAttribute(ATTRIBUTE_TABLE, tableId);
ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, sdQualifiedName);
ret.setAttribute(ATTRIBUTE_PARAMETERS, storageDesc.getParameters());
ret.setAttribute(ATTRIBUTE_LOCATION, hdfsNameServiceResolver.getPathWithNameServiceID(storageDesc.getLocation()));
ret.setAttribute(ATTRIBUTE_LOCATION, HdfsNameServiceResolver.getPathWithNameServiceID(storageDesc.getLocation()));
ret.setAttribute(ATTRIBUTE_INPUT_FORMAT, storageDesc.getInputFormat());
ret.setAttribute(ATTRIBUTE_OUTPUT_FORMAT, storageDesc.getOutputFormat());
ret.setAttribute(ATTRIBUTE_COMPRESSED, storageDesc.isCompressed());
......@@ -684,7 +683,7 @@ public class HiveMetaStoreBridge {
private AtlasEntity toHdfsPathEntity(String pathUri) {
AtlasEntity ret = new AtlasEntity(HDFS_PATH);
String nameServiceID = hdfsNameServiceResolver.getNameServiceIDForPath(pathUri);
String nameServiceID = HdfsNameServiceResolver.getNameServiceIDForPath(pathUri);
Path path = new Path(pathUri);
ret.setAttribute(ATTRIBUTE_NAME, Path.getPathWithoutSchemeAndAuthority(path).toString().toLowerCase());
......@@ -692,7 +691,7 @@ public class HiveMetaStoreBridge {
if (StringUtils.isNotEmpty(nameServiceID)) {
// Name service resolution is successful, now get updated HDFS path where the host port info is replaced by resolved name service
String updatedHdfsPath = hdfsNameServiceResolver.getPathWithNameServiceID(pathUri);
String updatedHdfsPath = HdfsNameServiceResolver.getPathWithNameServiceID(pathUri);
ret.setAttribute(ATTRIBUTE_PATH, updatedHdfsPath);
ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, getHdfsPathQualifiedName(updatedHdfsPath));
......
......@@ -279,7 +279,7 @@ public abstract class BaseHiveEvent {
ret.setAttribute(ATTRIBUTE_OWNER, db.getOwnerName());
ret.setAttribute(ATTRIBUTE_CLUSTER_NAME, getClusterName());
ret.setAttribute(ATTRIBUTE_LOCATION, HdfsNameServiceResolver.getInstance().getPathWithNameServiceID(db.getLocationUri()));
ret.setAttribute(ATTRIBUTE_LOCATION, HdfsNameServiceResolver.getPathWithNameServiceID(db.getLocationUri()));
ret.setAttribute(ATTRIBUTE_PARAMETERS, db.getParameters());
if (db.getOwnerType() != null) {
......@@ -415,7 +415,7 @@ public abstract class BaseHiveEvent {
ret.setAttribute(ATTRIBUTE_TABLE, tableId);
ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, sdQualifiedName);
ret.setAttribute(ATTRIBUTE_PARAMETERS, sd.getParameters());
ret.setAttribute(ATTRIBUTE_LOCATION, HdfsNameServiceResolver.getInstance().getPathWithNameServiceID(sd.getLocation()));
ret.setAttribute(ATTRIBUTE_LOCATION, HdfsNameServiceResolver.getPathWithNameServiceID(sd.getLocation()));
ret.setAttribute(ATTRIBUTE_INPUT_FORMAT, sd.getInputFormat());
ret.setAttribute(ATTRIBUTE_OUTPUT_FORMAT, sd.getOutputFormat());
ret.setAttribute(ATTRIBUTE_COMPRESSED, sd.isCompressed());
......@@ -496,8 +496,8 @@ public abstract class BaseHiveEvent {
protected AtlasEntity getHDFSPathEntity(Path path) {
String strPath = path.toString().toLowerCase();
String nameServiceID = HdfsNameServiceResolver.getInstance().getNameServiceIDForPath(strPath);
String attrPath = StringUtils.isEmpty(nameServiceID) ? strPath : HdfsNameServiceResolver.getInstance().getPathWithNameServiceID(strPath);
String nameServiceID = HdfsNameServiceResolver.getNameServiceIDForPath(strPath);
String attrPath = StringUtils.isEmpty(nameServiceID) ? strPath : HdfsNameServiceResolver.getPathWithNameServiceID(strPath);
String pathQualifiedName = getQualifiedName(attrPath);
AtlasEntity ret = context.getEntity(pathQualifiedName);
......@@ -646,8 +646,8 @@ public abstract class BaseHiveEvent {
protected String getQualifiedName(URI location) {
String strPath = new Path(location).toString().toLowerCase();
String nameServiceID = HdfsNameServiceResolver.getInstance().getNameServiceIDForPath(strPath);
String attrPath = StringUtils.isEmpty(nameServiceID) ? strPath : HdfsNameServiceResolver.getInstance().getPathWithNameServiceID(strPath);
String nameServiceID = HdfsNameServiceResolver.getNameServiceIDForPath(strPath);
String attrPath = StringUtils.isEmpty(nameServiceID) ? strPath : HdfsNameServiceResolver.getPathWithNameServiceID(strPath);
return getQualifiedName(attrPath);
}
......
......@@ -69,8 +69,6 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook {
public static final String HBASE_NAMESPACE_DEFAULT = "default";
public static final String ATTRIBUTE_DB = "db";
private final HdfsNameServiceResolver hdfsNameServiceResolver = HdfsNameServiceResolver.getInstance();
@Override
protected String getNumberOfRetriesPropertyKey() {
return HOOK_NUM_RETRIES;
......@@ -241,7 +239,7 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook {
final String hdfsUri = config.get("HdfsBolt.rotationActions") == null ? config.get("HdfsBolt.fileNameFormat.path") : config.get("HdfsBolt.rotationActions");
final String hdfsPathStr = config.get("HdfsBolt.fsUrl") + hdfsUri;
final Path hdfsPath = new Path(hdfsPathStr);
final String nameServiceID = hdfsNameServiceResolver.getNameServiceIDForPath(hdfsPathStr);
final String nameServiceID = HdfsNameServiceResolver.getNameServiceIDForPath(hdfsPathStr);
clusterName = getClusterName(stormConf);
......@@ -252,7 +250,7 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook {
ret.setAttribute(AtlasClient.NAME, Path.getPathWithoutSchemeAndAuthority(hdfsPath).toString().toLowerCase());
if (StringUtils.isNotEmpty(nameServiceID)) {
String updatedPath = hdfsNameServiceResolver.getPathWithNameServiceID(hdfsPathStr);
String updatedPath = HdfsNameServiceResolver.getPathWithNameServiceID(hdfsPathStr);
ret.setAttribute("path", updatedPath);
ret.setAttribute("nameServiceId", nameServiceID);
......
......@@ -38,45 +38,16 @@ public class HdfsNameServiceResolver {
private static final String HDFS_NAMESERVICE_PROPERTY_KEY = "dfs.nameservices";
private static final String HDFS_INTERNAL_NAMESERVICE_PROPERTY_KEY = "dfs.internal.nameservices";
private static final String HDFS_NAMENODES_HA_NODES_PREFIX = "dfs.ha.namenodes.";
private static final String HDFS_NAMENODE_ADDRESS_TEMPLATE = "dfs.namenode.rpc-address.%s.%s";
private static final String HDFS_NAMENODE_HA_ADDRESS_TEMPLATE = "dfs.namenode.rpc-address.%s.%s";
private static final String HDFS_NAMENODE_ADDRESS_TEMPLATE = "dfs.namenode.rpc-address.%s";
// Need a non-final instance in order to initialize the logger first
private static HdfsNameServiceResolver INSTANCE;
private static final Map<String, String> hostToNameServiceMap = new HashMap<>();
private final Map<String, String> hostToNameServiceMap = new HashMap<>();
private HdfsNameServiceResolver() {
init(new HdfsConfiguration(true));
}
public static HdfsNameServiceResolver getInstance() {
if (INSTANCE == null) {
INSTANCE = new HdfsNameServiceResolver();
}
return INSTANCE;
static {
init();
}
/**
 * Looks up the name service ID registered for a {@code host:port} address.
 *
 * @param host host name part of the lookup key
 * @param port port part of the lookup key
 * @return the mapped name service ID, or an empty string when the key has no mapping
 */
public String getNameServiceID(String host, int port) {
    if (LOG.isDebugEnabled()) {
        LOG.debug("==> HdfsNameServiceResolver.getNameServiceID({}, {})", host, port);
    }

    // The map is keyed by "host:port".
    final String addressKey = host + ":" + port;
    final String nameServiceId = hostToNameServiceMap.getOrDefault(addressKey, "");

    if (LOG.isDebugEnabled()) {
        LOG.debug("<== HdfsNameServiceResolver.getNameServiceID({}, {}) = {}", host, port, nameServiceId);
    }

    return nameServiceId;
}
/**
 * Looks up the name service ID for a host using {@code DEFAULT_PORT}.
 *
 * @param host host name part of the lookup key
 * @return the result of the two-argument lookup for {@code host} and {@code DEFAULT_PORT}
 */
public String getNameServiceID(String host) {
    final String nameServiceId = getNameServiceID(host, DEFAULT_PORT);

    return nameServiceId;
}
public String getPathWithNameServiceID(String path) {
public static String getPathWithNameServiceID(String path) {
if (LOG.isDebugEnabled()) {
LOG.debug("==> HdfsNameServiceResolver.getPathWithNameServiceID({})", path);
}
......@@ -85,7 +56,7 @@ public class HdfsNameServiceResolver {
// Only handle URLs that begin with hdfs://
if (path != null && path.indexOf(HDFS_SCHEME) == 0) {
URI uri = new Path(path).toUri();
URI uri = new Path(path).toUri();
String nsId;
if (uri.getPort() != -1) {
......@@ -107,7 +78,7 @@ public class HdfsNameServiceResolver {
return ret;
}
public String getNameServiceIDForPath(String path) {
public static String getNameServiceIDForPath(String path) {
if (LOG.isDebugEnabled()) {
LOG.debug("==> HdfsNameServiceResolver.getNameServiceIDForPath({})", path);
}
......@@ -147,19 +118,34 @@ public class HdfsNameServiceResolver {
return ret;
}
private void init(final HdfsConfiguration hdfsConfiguration) {
/**
 * Looks up the name service ID registered for a {@code host:port} address.
 *
 * @param host host name part of the lookup key
 * @param port port part of the lookup key
 * @return the mapped name service ID, or an empty string when the key has no mapping
 */
private static String getNameServiceID(String host, int port) {
    if (LOG.isDebugEnabled()) {
        LOG.debug("==> HdfsNameServiceResolver.getNameServiceID({}, {})", host, port);
    }

    // The map is keyed by "host:port".
    final String addressKey = host + ":" + port;
    final String nameServiceId = hostToNameServiceMap.getOrDefault(addressKey, "");

    if (LOG.isDebugEnabled()) {
        LOG.debug("<== HdfsNameServiceResolver.getNameServiceID({}, {}) = {}", host, port, nameServiceId);
    }

    return nameServiceId;
}
private static void init() {
if (LOG.isDebugEnabled()) {
LOG.debug("==> HdfsNameServiceResolver.init()");
}
HdfsConfiguration hdfsConfiguration = new HdfsConfiguration(true);
// Determine all available nameServiceIDs
String[] nameServiceIDs = hdfsConfiguration.getTrimmedStrings(HDFS_NAMESERVICE_PROPERTY_KEY);
String[] nameServiceIDs = hdfsConfiguration.getTrimmedStrings(HDFS_INTERNAL_NAMESERVICE_PROPERTY_KEY);
if (Objects.isNull(nameServiceIDs) || nameServiceIDs.length == 0) {
if (LOG.isDebugEnabled()) {
LOG.debug("NSID not found for {}, looking under {}", HDFS_NAMESERVICE_PROPERTY_KEY, HDFS_INTERNAL_NAMESERVICE_PROPERTY_KEY);
LOG.debug("NSID not found for {}, looking under {}", HDFS_INTERNAL_NAMESERVICE_PROPERTY_KEY, HDFS_NAMESERVICE_PROPERTY_KEY);
}
// Attempt another lookup using the fallback name service IDs key (this change swaps the order: the internal key is tried first, then the regular one)
nameServiceIDs = hdfsConfiguration.getTrimmedStrings(HDFS_INTERNAL_NAMESERVICE_PROPERTY_KEY);
nameServiceIDs = hdfsConfiguration.getTrimmedStrings(HDFS_NAMESERVICE_PROPERTY_KEY);
}
if (Objects.nonNull(nameServiceIDs) && nameServiceIDs.length > 0) {
......@@ -167,13 +153,30 @@ public class HdfsNameServiceResolver {
LOG.debug("NSIDs = {}", nameServiceIDs);
}
boolean isHA;
for (String nameServiceID : nameServiceIDs) {
// Find NameNode addresses and map to the NameServiceID
String[] nameNodes = hdfsConfiguration.getTrimmedStrings(HDFS_NAMENODES_HA_NODES_PREFIX + nameServiceID);
isHA = nameNodes != null && nameNodes.length > 0;
for (String nameNode : nameNodes) {
String nameNodeMappingKey = String.format(HDFS_NAMENODE_ADDRESS_TEMPLATE, nameServiceID, nameNode);
String nameNodeAddress = hdfsConfiguration.get(nameNodeMappingKey, "");
String nameNodeMappingKey, nameNodeAddress;
if (isHA) {
if (LOG.isDebugEnabled()) {
LOG.debug("Processing HA node info");
}
for (String nameNode : nameNodes) {
nameNodeMappingKey = String.format(HDFS_NAMENODE_HA_ADDRESS_TEMPLATE, nameServiceID, nameNode);
nameNodeAddress = hdfsConfiguration.get(nameNodeMappingKey, "");
// Add a mapping only if found
if (StringUtils.isNotEmpty(nameNodeAddress)) {
hostToNameServiceMap.put(nameNodeAddress, nameServiceID);
}
}
} else {
nameNodeMappingKey = String.format(HDFS_NAMENODE_ADDRESS_TEMPLATE, nameServiceID);
nameNodeAddress = hdfsConfiguration.get(nameNodeMappingKey, "");
// Add a mapping only if found
if (StringUtils.isNotEmpty(nameNodeAddress)) {
......
......@@ -22,36 +22,34 @@ import org.testng.annotations.Test;
import static org.testng.Assert.assertEquals;
public class HdfsNameServiceResolverTest {
private HdfsNameServiceResolver hdfsNameServiceResolver = HdfsNameServiceResolver.getInstance();
@Test
public void testResolution() {
assertEquals(hdfsNameServiceResolver.getNameServiceID("test"), "");
assertEquals(hdfsNameServiceResolver.getNameServiceID("test1"), "");
assertEquals(hdfsNameServiceResolver.getNameServiceID("test", 8020), "");
assertEquals(hdfsNameServiceResolver.getNameServiceID("test1", 8020), "");
assertEquals(hdfsNameServiceResolver.getNameServiceID("ctr-e137-1514896590304-41888-01-000003"), "mycluster");
assertEquals(hdfsNameServiceResolver.getNameServiceID("ctr-e137-1514896590304-41888-01-000003", 8020), "mycluster");
assertEquals(hdfsNameServiceResolver.getNameServiceID("ctr-e137-1514896590304-41888-01-000004"), "mycluster");
assertEquals(hdfsNameServiceResolver.getNameServiceID("ctr-e137-1514896590304-41888-01-000004", 8020), "mycluster");
assertEquals(hdfsNameServiceResolver.getPathWithNameServiceID("hdfs://ctr-e137-1514896590304-41888-01-000004:8020/tmp/xyz"), "hdfs://mycluster/tmp/xyz");
assertEquals(hdfsNameServiceResolver.getPathWithNameServiceID("hdfs://ctr-e137-1514896590304-41888-01-000004:8020/tmp/xyz/ctr-e137-1514896590304-41888-01-000004:8020"), "hdfs://mycluster/tmp/xyz/ctr-e137-1514896590304-41888-01-000004:8020");
assertEquals(hdfsNameServiceResolver.getNameServiceIDForPath("hdfs://ctr-e137-1514896590304-41888-01-000004:8020/tmp/xyz"), "mycluster");
assertEquals(hdfsNameServiceResolver.getPathWithNameServiceID("hdfs://ctr-e137-1514896590304-41888-01-000003:8020/tmp/xyz"), "hdfs://mycluster/tmp/xyz");
assertEquals(hdfsNameServiceResolver.getNameServiceIDForPath("hdfs://ctr-e137-1514896590304-41888-01-000003:8020/tmp/xyz"), "mycluster");
assertEquals(hdfsNameServiceResolver.getPathWithNameServiceID("hdfs://ctr-e137-1514896590304-41888-01-000003/tmp/xyz"), "hdfs://mycluster/tmp/xyz");
assertEquals(hdfsNameServiceResolver.getNameServiceIDForPath("hdfs://ctr-e137-1514896590304-41888-01-000003/tmp/xyz"), "mycluster");
assertEquals(hdfsNameServiceResolver.getPathWithNameServiceID("hdfs://ctr-e137-1514896590304-41888-01-000003/tmp/xyz/ctr-e137-1514896590304-41888-01-000003"), "hdfs://mycluster/tmp/xyz/ctr-e137-1514896590304-41888-01-000003");
assertEquals(hdfsNameServiceResolver.getNameServiceIDForPath("hdfs://ctr-e137-1514896590304-41888-01-000003/tmp/xyz/ctr-e137-1514896590304-41888-01-000003"), "mycluster");
assertEquals(hdfsNameServiceResolver.getPathWithNameServiceID("hdfs://mycluster/tmp/xyz"), "hdfs://mycluster/tmp/xyz");
assertEquals(hdfsNameServiceResolver.getNameServiceIDForPath("hdfs://mycluster/tmp/xyz"), "mycluster");
// assertEquals(HdfsNameServiceResolver.getNameServiceID("test"), "");
// assertEquals(HdfsNameServiceResolver.getNameServiceID("test1"), "");
// assertEquals(HdfsNameServiceResolver.getNameServiceID("test", 8020), "");
// assertEquals(HdfsNameServiceResolver.getNameServiceID("test1", 8020), "");
//
// assertEquals(HdfsNameServiceResolver.getNameServiceID("ctr-e137-1514896590304-41888-01-000003"), "mycluster");
// assertEquals(HdfsNameServiceResolver.getNameServiceID("ctr-e137-1514896590304-41888-01-000003", 8020), "mycluster");
// assertEquals(HdfsNameServiceResolver.getNameServiceID("ctr-e137-1514896590304-41888-01-000004"), "mycluster");
// assertEquals(HdfsNameServiceResolver.getNameServiceID("ctr-e137-1514896590304-41888-01-000004", 8020), "mycluster");
assertEquals(HdfsNameServiceResolver.getPathWithNameServiceID("hdfs://ctr-e137-1514896590304-41888-01-000004:8020/tmp/xyz"), "hdfs://mycluster/tmp/xyz");
assertEquals(HdfsNameServiceResolver.getPathWithNameServiceID("hdfs://ctr-e137-1514896590304-41888-01-000004:8020/tmp/xyz/ctr-e137-1514896590304-41888-01-000004:8020"), "hdfs://mycluster/tmp/xyz/ctr-e137-1514896590304-41888-01-000004:8020");
assertEquals(HdfsNameServiceResolver.getNameServiceIDForPath("hdfs://ctr-e137-1514896590304-41888-01-000004:8020/tmp/xyz"), "mycluster");
assertEquals(HdfsNameServiceResolver.getPathWithNameServiceID("hdfs://ctr-e137-1514896590304-41888-01-000003:8020/tmp/xyz"), "hdfs://mycluster/tmp/xyz");
assertEquals(HdfsNameServiceResolver.getNameServiceIDForPath("hdfs://ctr-e137-1514896590304-41888-01-000003:8020/tmp/xyz"), "mycluster");
assertEquals(HdfsNameServiceResolver.getPathWithNameServiceID("hdfs://ctr-e137-1514896590304-41888-01-000003/tmp/xyz"), "hdfs://mycluster/tmp/xyz");
assertEquals(HdfsNameServiceResolver.getNameServiceIDForPath("hdfs://ctr-e137-1514896590304-41888-01-000003/tmp/xyz"), "mycluster");
assertEquals(HdfsNameServiceResolver.getPathWithNameServiceID("hdfs://ctr-e137-1514896590304-41888-01-000003/tmp/xyz/ctr-e137-1514896590304-41888-01-000003"), "hdfs://mycluster/tmp/xyz/ctr-e137-1514896590304-41888-01-000003");
assertEquals(HdfsNameServiceResolver.getNameServiceIDForPath("hdfs://ctr-e137-1514896590304-41888-01-000003/tmp/xyz/ctr-e137-1514896590304-41888-01-000003"), "mycluster");
assertEquals(HdfsNameServiceResolver.getPathWithNameServiceID("hdfs://mycluster/tmp/xyz"), "hdfs://mycluster/tmp/xyz");
assertEquals(HdfsNameServiceResolver.getNameServiceIDForPath("hdfs://mycluster/tmp/xyz"), "mycluster");
}
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment