Commit 3ec37cae by apoorvnaik, committed by Madhan Neethiraj

ATLAS-2444: Atlas changes for HDFS NameNode federation

Adds an HdfsNameServiceResolver utility that maps NameNode host:port addresses, read from dfs.nameservices (falling back to dfs.internal.nameservices) and the corresponding dfs.namenode.rpc-address.* properties, to their HDFS nameservice ID. The Hive and Storm hooks use it to rewrite hdfs:// locations into their federated nameservice form and to populate a new optional nameServiceId attribute on the hdfs_path type (typeVersion bumped from 1.0 to 1.1).

parent 4119f431
......@@ -27,17 +27,19 @@ import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.hive.hook.HiveHook;
import org.apache.atlas.hive.model.HiveDataTypes;
import org.apache.atlas.hook.AtlasHookException;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.utils.AuthenticationUtil;
import org.apache.atlas.utils.HdfsNameServiceResolver;
import org.apache.atlas.v1.model.instance.Id;
import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.atlas.v1.model.instance.Struct;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.utils.AuthenticationUtil;
import org.apache.commons.cli.BasicParser;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.Options;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.TableType;
......@@ -57,6 +59,7 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import static org.apache.atlas.hive.hook.HiveHook.CONF_PREFIX;
/**
......@@ -102,6 +105,8 @@ public class HiveMetaStoreBridge {
private final AtlasClient atlasClient;
private final boolean convertHdfsPathToLowerCase;
private final HdfsNameServiceResolver hdfsNameServiceResolver = HdfsNameServiceResolver.getInstance();
HiveMetaStoreBridge(String clusterName, Hive hiveClient, AtlasClient atlasClient) {
this(clusterName, hiveClient, atlasClient, true);
}
......@@ -201,7 +206,8 @@ public class HiveMetaStoreBridge {
dbRef.set(AtlasClient.NAME, dbName);
dbRef.set(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, clusterName);
dbRef.set(DESCRIPTION_ATTR, hiveDB.getDescription());
dbRef.set(LOCATION, hiveDB.getLocationUri());
dbRef.set(LOCATION, hdfsNameServiceResolver.getPathWithNameServiceID(hiveDB.getLocationUri()));
dbRef.set(PARAMETERS, hiveDB.getParameters());
dbRef.set(AtlasClient.OWNER, hiveDB.getOwnerName());
if (hiveDB.getOwnerType() != null) {
......@@ -574,7 +580,7 @@ public class HiveMetaStoreBridge {
sdReferenceable.set("sortCols", sortColsStruct);
}
sdReferenceable.set(LOCATION, storageDesc.getLocation());
sdReferenceable.set(LOCATION, hdfsNameServiceResolver.getPathWithNameServiceID(storageDesc.getLocation()));
sdReferenceable.set("inputFormat", storageDesc.getInputFormat());
sdReferenceable.set("outputFormat", storageDesc.getOutputFormat());
sdReferenceable.set("compressed", storageDesc.isCompressed());
......@@ -592,10 +598,25 @@ public class HiveMetaStoreBridge {
public Referenceable fillHDFSDataSet(String pathUri) {
Referenceable ref = new Referenceable(HDFS_PATH);
ref.set("path", pathUri);
// Get the name service ID for the given HDFS path
String nameServiceID = hdfsNameServiceResolver.getNameServiceIDForPath(pathUri);
Path path = new Path(pathUri);
ref.set(AtlasClient.NAME, Path.getPathWithoutSchemeAndAuthority(path).toString().toLowerCase());
ref.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, pathUri);
if (StringUtils.isNotEmpty(nameServiceID)) {
// Name service resolution succeeded: use the updated HDFS path, with the
// host:port authority replaced by the resolved name service ID
String updatedHdfsPath = hdfsNameServiceResolver.getPathWithNameServiceID(pathUri);
ref.set("path", updatedHdfsPath);
ref.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getHdfsPathQualifiedName(clusterName, updatedHdfsPath));
// Only set name service if it was resolved
ref.set("nameServiceId", nameServiceID);
} else {
ref.set("path", pathUri);
ref.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getHdfsPathQualifiedName(clusterName, pathUri));
}
ref.set(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, clusterName);
return ref;
}
......@@ -628,6 +649,10 @@ public class HiveMetaStoreBridge {
return colList;
}
public static String getHdfsPathQualifiedName(String clusterName, String hdfsPath) {
return String.format("%s@%s", hdfsPath, clusterName);
}
public static void main(String[] args) throws AtlasHookException {
try {
......
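For illustration only (host, nameservice and cluster names hypothetical): assuming hdfs-site.xml maps nn1.example.com:8020 to nameservice mycluster and clusterName is cl1, fillHDFSDataSet("hdfs://nn1.example.com:8020/tmp/xyz") would now set:

    path          = "hdfs://mycluster/tmp/xyz"        // authority replaced by the nameservice ID
    qualifiedName = "hdfs://mycluster/tmp/xyz@cl1"    // via getHdfsPathQualifiedName()
    nameServiceId = "mycluster"                       // set only when resolution succeeds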
......@@ -159,7 +159,7 @@
"superTypes": [
"fs_path"
],
"typeVersion": "1.0",
"typeVersion": "1.1",
"attributeDefs": [
{
"name": "clusterName",
......@@ -184,6 +184,14 @@
"isIndexable": false,
"isOptional": true,
"isUnique": false
},
{
"name": "nameServiceId",
"typeName": "string",
"cardinality": "SINGLE",
"isIndexable": false,
"isOptional": true,
"isUnique": false
}
]
}
......
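The new attribute is optional and non-indexed, so existing hdfs_path entities remain valid; hooks set it only when a path's authority resolves to a nameservice. A minimal client-side sketch (getValues() assumed from the v1 Struct API used elsewhere in this commit):

    // null when the hook could not resolve a nameservice for the path
    Object nsId = ref.getValues().get("nameServiceId");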
......@@ -18,6 +18,7 @@
package org.apache.atlas.storm.hook;
import org.apache.atlas.utils.HdfsNameServiceResolver;
import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.storm.ISubmitterHook;
import org.apache.storm.generated.Bolt;
......@@ -64,6 +65,8 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook {
public static final String HBASE_NAMESPACE_DEFAULT = "default";
private final HdfsNameServiceResolver hdfsNameServiceResolver = HdfsNameServiceResolver.getInstance();
@Override
protected String getNumberOfRetriesPropertyKey() {
return HOOK_NUM_RETRIES;
......@@ -214,10 +217,22 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook {
String hdfsUri = config.get("HdfsBolt.rotationActions") == null
? config.get("HdfsBolt.fileNameFormat.path")
: config.get("HdfsBolt.rotationActions");
final String hdfsPathStr = config.get("HdfsBolt.fsUrl") + hdfsUri;
final String hdfsPathStr = config.get("HdfsBolt.fsUrl") + hdfsUri;
final String nameServiceID = hdfsNameServiceResolver.getNameServiceIDForPath(hdfsPathStr);
clusterName = getClusterName(stormConf);
dataSetReferenceable.set(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, getClusterName(stormConf));
dataSetReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, hdfsPathStr);
dataSetReferenceable.set("path", hdfsPathStr);
if (StringUtils.isNotEmpty(nameServiceID)) {
String updatedPath = hdfsNameServiceResolver.getPathWithNameServiceID(hdfsPathStr);
dataSetReferenceable.set("path", updatedPath);
dataSetReferenceable.set("nameServiceId", nameServiceID);
dataSetReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getHdfsPathQualifiedName(clusterName, updatedPath));
} else {
dataSetReferenceable.set("path", hdfsPathStr);
dataSetReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getHdfsPathQualifiedName(clusterName, hdfsPathStr));
}
dataSetReferenceable.set(AtlasClient.OWNER, stormConf.get("hdfs.kerberos.principal"));
final Path hdfsPath = new Path(hdfsPathStr);
dataSetReferenceable.set(AtlasClient.NAME, Path.getPathWithoutSchemeAndAuthority(hdfsPath).toString().toLowerCase());
......@@ -367,6 +382,10 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook {
return String.format("%s.%s@%s", nameSpace, tableName, clusterName);
}
public static String getHdfsPathQualifiedName(String clusterName, String hdfsPath) {
return String.format("%s@%s", hdfsPath, clusterName);
}
private String getClusterName(Map stormConf) {
return atlasProperties.getString(AtlasConstants.CLUSTER_NAME_KEY, AtlasConstants.DEFAULT_CLUSTER_NAME);
}
......
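Both bridges now derive the same qualified-name format; a hypothetical example:

    // getHdfsPathQualifiedName("cl1", "hdfs://mycluster/topology-out")
    // returns "hdfs://mycluster/topology-out@cl1"

Note that the helper is duplicated verbatim in HiveMetaStoreBridge and StormAtlasHook rather than being shared through a common utility.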
......@@ -57,6 +57,19 @@
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<exclusions>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
</exclusion>
</exclusions>
<!-- This is only used for certain helper classes and should not be packaged -->
<scope>compile</scope>
</dependency>
<dependency>
<groupId>commons-collections</groupId>
<artifactId>commons-collections</artifactId>
</dependency>
......@@ -89,7 +102,6 @@
<groupId>org.apache.atlas</groupId>
<artifactId>atlas-intg</artifactId>
</dependency>
</dependencies>
<build>
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.utils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
public class HdfsNameServiceResolver {
private static final Logger LOG = LoggerFactory.getLogger(HdfsNameServiceResolver.class);
private static final int DEFAULT_PORT = 8020;
private static final String HDFS_SCHEME = "hdfs://";
private static final String HDFS_NAMESERVICE_PROPERTY_KEY = "dfs.nameservices";
private static final String HDFS_INTERNAL_NAMESERVICE_PROPERTY_KEY = "dfs.internal.nameservices";
private static final String HDFS_NAMENODES_HA_NODES_PREFIX = "dfs.ha.namenodes.";
private static final String HDFS_NAMENODE_ADDRESS_TEMPLATE = "dfs.namenode.rpc-address.%s.%s";
// Need a non-final instance in order to initialize the logger first
private static HdfsNameServiceResolver INSTANCE;
private final Map<String, String> hostToNameServiceMap = new HashMap<>();
private HdfsNameServiceResolver() {
init(new HdfsConfiguration(true));
}
public static HdfsNameServiceResolver getInstance() {
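// Note: this lazy initialization is not synchronized; if the first calls race,
// more than one instance may be created (each simply re-reads the HDFS configuration)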
if (INSTANCE == null) {
INSTANCE = new HdfsNameServiceResolver();
}
return INSTANCE;
}
public String getNameServiceID(String host, int port) {
if (LOG.isDebugEnabled()) {
LOG.debug("==> HdfsNameServiceResolver.getNameServiceID({}, {})", host, port);
}
String ret = hostToNameServiceMap.getOrDefault(host + ":" + port, "");
if (LOG.isDebugEnabled()) {
LOG.debug("<== HdfsNameServiceResolver.getNameServiceID({}, {}) = {}", host, port, ret);
}
return ret;
}
public String getNameServiceID(String host) {
return getNameServiceID(host, DEFAULT_PORT);
}
public String getPathWithNameServiceID(String path) {
if (LOG.isDebugEnabled()) {
LOG.debug("==> HdfsNameServiceResolver.getPathWithNameServiceID({})", path);
}
String ret = path;
// Only handle URLs that begin with hdfs://
if (path.startsWith(HDFS_SCHEME)) {
try {
URI uri = new Path(path).toUri();
String nsId;
if (uri.getPort() != -1) {
nsId = hostToNameServiceMap.get(uri.getAuthority());
} else {
nsId = hostToNameServiceMap.get(uri.getHost() + ":" + DEFAULT_PORT);
}
if (nsId != null) {
// Replace the host[:port] authority with the resolved nameservice ID
ret = StringUtils.replaceOnce(path, uri.getAuthority(), nsId);
}
} catch (IllegalArgumentException ignored) {
// Malformed path: return it unchanged, consistent with getNameServiceIDForPath()
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("<== HdfsNameServiceResolver.getPathWithNameServiceID({}) = {}", path, ret);
}
return ret;
}
public String getNameServiceIDForPath(String path) {
if (LOG.isDebugEnabled()) {
LOG.debug("==> HdfsNameServiceResolver.getNameServiceIDForPath({})", path);
}
String ret = "";
// Only handle path URLs that begin with hdfs://
if (path.startsWith(HDFS_SCHEME)) {
try {
URI uri = new Path(path).toUri();
if (uri != null) {
// URI can contain host and port
if (uri.getPort() != -1) {
ret = getNameServiceID(uri.getHost(), uri.getPort());
} else {
// No port information present: the path may contain only a host name, or the nameservice ID itself
// Try resolving using the default port first
ret = getNameServiceID(uri.getHost(), DEFAULT_PORT);
// If still not resolved, the authority itself may be a nameservice ID
if (StringUtils.isEmpty(ret) && hostToNameServiceMap.containsValue(uri.getHost())) {
ret = uri.getHost();
}
}
}
} catch (IllegalArgumentException ignored) {
// No need to do anything
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("<== HdfsNameServiceResolver.getNameServiceIDForPath({}) = {}", path, ret);
}
return ret;
}
private void init(final HdfsConfiguration hdfsConfiguration) {
if (LOG.isDebugEnabled()) {
LOG.debug("==> HdfsNameServiceResolver.init()");
}
// Determine all available nameServiceIDs
String[] nameServiceIDs = hdfsConfiguration.getTrimmedStrings(HDFS_NAMESERVICE_PROPERTY_KEY);
if (Objects.isNull(nameServiceIDs) || nameServiceIDs.length == 0) {
if (LOG.isDebugEnabled()) {
LOG.debug("NSID not found for {}, looking under {}", HDFS_NAMESERVICE_PROPERTY_KEY, HDFS_INTERNAL_NAMESERVICE_PROPERTY_KEY);
}
// Attempt another lookup using internal name service IDs key
nameServiceIDs = hdfsConfiguration.getTrimmedStrings(HDFS_INTERNAL_NAMESERVICE_PROPERTY_KEY);
}
if (Objects.nonNull(nameServiceIDs) && nameServiceIDs.length > 0) {
if (LOG.isDebugEnabled()) {
LOG.debug("NSIDs = {}", nameServiceIDs);
}
for (String nameServiceID : nameServiceIDs) {
// Find NameNode addresses and map to the NameServiceID
String[] nameNodes = hdfsConfiguration.getTrimmedStrings(HDFS_NAMENODES_HA_NODES_PREFIX + nameServiceID);
for (String nameNode : nameNodes) {
String nameNodeMappingKey = String.format(HDFS_NAMENODE_ADDRESS_TEMPLATE, nameServiceID, nameNode);
String nameNodeAddress = hdfsConfiguration.get(nameNodeMappingKey, "");
// Add a mapping only if found
if (StringUtils.isNotEmpty(nameNodeAddress)) {
hostToNameServiceMap.put(nameNodeAddress, nameServiceID);
}
}
}
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("No NSID could be resolved");
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("<== HdfsNameServiceResolver.init()");
}
}
}
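A minimal usage sketch (host name hypothetical; the host-to-nameservice mappings come from whatever hdfs-site.xml the new HdfsConfiguration(true) finds on the hook's classpath):

    HdfsNameServiceResolver resolver = HdfsNameServiceResolver.getInstance();

    // "" when the authority does not map to any configured nameservice
    String nsId = resolver.getNameServiceIDForPath("hdfs://nn1.example.com:8020/tmp/xyz");

    // returns the input unchanged when no mapping exists
    String withNsId = resolver.getPathWithNameServiceID("hdfs://nn1.example.com:8020/tmp/xyz");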
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.utils;
import org.testng.annotations.Test;
import static org.testng.Assert.assertEquals;
public class HdfsNameServiceResolverTest {
private HdfsNameServiceResolver hdfsNameServiceResolver = HdfsNameServiceResolver.getInstance();
@Test
public void testResolution() {
assertEquals(hdfsNameServiceResolver.getNameServiceID("test"), "");
assertEquals(hdfsNameServiceResolver.getNameServiceID("test1"), "");
assertEquals(hdfsNameServiceResolver.getNameServiceID("test", 8020), "");
assertEquals(hdfsNameServiceResolver.getNameServiceID("test1", 8020), "");
assertEquals(hdfsNameServiceResolver.getNameServiceID("ctr-e137-1514896590304-41888-01-000003"), "mycluster");
assertEquals(hdfsNameServiceResolver.getNameServiceID("ctr-e137-1514896590304-41888-01-000003", 8020), "mycluster");
assertEquals(hdfsNameServiceResolver.getNameServiceID("ctr-e137-1514896590304-41888-01-000004"), "mycluster");
assertEquals(hdfsNameServiceResolver.getNameServiceID("ctr-e137-1514896590304-41888-01-000004", 8020), "mycluster");
assertEquals(hdfsNameServiceResolver.getPathWithNameServiceID("hdfs://ctr-e137-1514896590304-41888-01-000004:8020/tmp/xyz"), "hdfs://mycluster/tmp/xyz");
assertEquals(hdfsNameServiceResolver.getPathWithNameServiceID("hdfs://ctr-e137-1514896590304-41888-01-000004:8020/tmp/xyz/ctr-e137-1514896590304-41888-01-000004:8020"), "hdfs://mycluster/tmp/xyz/ctr-e137-1514896590304-41888-01-000004:8020");
assertEquals(hdfsNameServiceResolver.getNameServiceIDForPath("hdfs://ctr-e137-1514896590304-41888-01-000004:8020/tmp/xyz"), "mycluster");
assertEquals(hdfsNameServiceResolver.getPathWithNameServiceID("hdfs://ctr-e137-1514896590304-41888-01-000003:8020/tmp/xyz"), "hdfs://mycluster/tmp/xyz");
assertEquals(hdfsNameServiceResolver.getNameServiceIDForPath("hdfs://ctr-e137-1514896590304-41888-01-000003:8020/tmp/xyz"), "mycluster");
assertEquals(hdfsNameServiceResolver.getPathWithNameServiceID("hdfs://ctr-e137-1514896590304-41888-01-000003/tmp/xyz"), "hdfs://mycluster/tmp/xyz");
assertEquals(hdfsNameServiceResolver.getNameServiceIDForPath("hdfs://ctr-e137-1514896590304-41888-01-000003/tmp/xyz"), "mycluster");
assertEquals(hdfsNameServiceResolver.getPathWithNameServiceID("hdfs://ctr-e137-1514896590304-41888-01-000003/tmp/xyz/ctr-e137-1514896590304-41888-01-000003"), "hdfs://mycluster/tmp/xyz/ctr-e137-1514896590304-41888-01-000003");
assertEquals(hdfsNameServiceResolver.getNameServiceIDForPath("hdfs://ctr-e137-1514896590304-41888-01-000003/tmp/xyz/ctr-e137-1514896590304-41888-01-000003"), "mycluster");
assertEquals(hdfsNameServiceResolver.getPathWithNameServiceID("hdfs://mycluster/tmp/xyz"), "hdfs://mycluster/tmp/xyz");
assertEquals(hdfsNameServiceResolver.getNameServiceIDForPath("hdfs://mycluster/tmp/xyz"), "mycluster");
}
}
\ No newline at end of file
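These assertions evidently rely on a test-scope hdfs-site.xml (under test/resources, which the pom change below excludes from the license check) that defines nameservice mycluster with the two ctr-e137-1514896590304-41888-01-00000{3,4} hosts as its NameNodes; that resource itself is not part of this diff view.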
<?xml version="1.0" encoding="UTF-8" ?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
<appender name="console" class="org.apache.log4j.ConsoleAppender">
<param name="Target" value="System.out"/>
<layout class="org.apache.log4j.PatternLayout">
<param name="ConversionPattern" value="%d %-5p - [%t:%x] ~ %m (%C{1}:%L)%n"/>
</layout>
</appender>
<logger name="org.apache.atlas" additivity="false">
<level value="debug"/>
<appender-ref ref="console"/>
</logger>
<!-- uncomment this block to generate performance traces
<appender name="perf_appender" class="org.apache.log4j.DailyRollingFileAppender">
<param name="file" value="${atlas.log.dir}/atlas_perf.log" />
<param name="datePattern" value="'.'yyyy-MM-dd" />
<param name="append" value="true" />
<layout class="org.apache.log4j.PatternLayout">
<param name="ConversionPattern" value="%d|%t|%m%n" />
</layout>
</appender>
<logger name="org.apache.atlas.perf" additivity="false">
<level value="debug" />
<appender-ref ref="perf_appender" />
</logger>
-->
<logger name="com.thinkaurelius.titan" additivity="false">
<level value="warn"/>
<appender-ref ref="console"/>
</logger>
<logger name="org.springframework" additivity="false">
<level value="warn"/>
<appender-ref ref="console"/>
</logger>
<logger name="org.eclipse" additivity="false">
<level value="warn"/>
<appender-ref ref="console"/>
</logger>
<logger name="com.sun.jersey" additivity="false">
<level value="warn"/>
<appender-ref ref="console"/>
</logger>
<root>
<priority value="warn"/>
<appender-ref ref="console"/>
</root>
</log4j:configuration>
......@@ -2010,6 +2010,7 @@
<exclude>**/policy-store.txt</exclude>
<exclude>**/*rebel*.xml</exclude>
<exclude>**/*rebel*.xml.bak</exclude>
<exclude>**/test/resources/**</exclude>
</excludes>
</configuration>
<executions>
......