Commit 15639ee9 by Jon Maron

ATLAS-32 create HTTP connection in context of invoking user in secure cluster

parent d6f5d9da
...@@ -39,6 +39,7 @@ import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; ...@@ -39,6 +39,7 @@ import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.security.UserGroupInformation;
import org.codehaus.jettison.json.JSONArray; import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException; import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject; import org.codehaus.jettison.json.JSONObject;
...@@ -66,14 +67,18 @@ public class HiveMetaStoreBridge { ...@@ -66,14 +67,18 @@ public class HiveMetaStoreBridge {
private final Hive hiveClient; private final Hive hiveClient;
private final AtlasClient atlasClient; private final AtlasClient atlasClient;
public HiveMetaStoreBridge(HiveConf hiveConf) throws Exception {
this(hiveConf, null, null);
}
/** /**
* Construct a HiveMetaStoreBridge. * Construct a HiveMetaStoreBridge.
* @param hiveConf hive conf * @param hiveConf hive conf
*/ */
public HiveMetaStoreBridge(HiveConf hiveConf) throws Exception { public HiveMetaStoreBridge(HiveConf hiveConf, String doAsUser, UserGroupInformation ugi) throws Exception {
clusterName = hiveConf.get(HIVE_CLUSTER_NAME, DEFAULT_CLUSTER_NAME); clusterName = hiveConf.get(HIVE_CLUSTER_NAME, DEFAULT_CLUSTER_NAME);
hiveClient = Hive.get(hiveConf); hiveClient = Hive.get(hiveConf);
atlasClient = new AtlasClient(hiveConf.get(DGI_URL_PROPERTY, DEFAULT_DGI_URL)); atlasClient = new AtlasClient(hiveConf.get(DGI_URL_PROPERTY, DEFAULT_DGI_URL), ugi, doAsUser);
} }
public AtlasClient getAtlasClient() { public AtlasClient getAtlasClient() {
......
...@@ -35,6 +35,7 @@ import org.apache.hadoop.hive.ql.hooks.ReadEntity; ...@@ -35,6 +35,7 @@ import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.hooks.WriteEntity; import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.plan.HiveOperation; import org.apache.hadoop.hive.ql.plan.HiveOperation;
import org.apache.hadoop.security.UserGroupInformation;
import org.json.JSONObject; import org.json.JSONObject;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -113,6 +114,7 @@ public class HiveHook implements ExecuteWithHookContext { ...@@ -113,6 +114,7 @@ public class HiveHook implements ExecuteWithHookContext {
public Set<WriteEntity> outputs; public Set<WriteEntity> outputs;
public String user; public String user;
public UserGroupInformation ugi;
public HiveOperation operation; public HiveOperation operation;
public QueryPlan queryPlan; public QueryPlan queryPlan;
public HookContext.HookType hookType; public HookContext.HookType hookType;
...@@ -136,6 +138,7 @@ public class HiveHook implements ExecuteWithHookContext { ...@@ -136,6 +138,7 @@ public class HiveHook implements ExecuteWithHookContext {
event.outputs = hookContext.getOutputs(); event.outputs = hookContext.getOutputs();
event.user = hookContext.getUserName() == null ? hookContext.getUgi().getUserName() : hookContext.getUserName(); event.user = hookContext.getUserName() == null ? hookContext.getUgi().getUserName() : hookContext.getUserName();
event.ugi = hookContext.getUgi();
event.operation = HiveOperation.valueOf(hookContext.getOperationName()); event.operation = HiveOperation.valueOf(hookContext.getOperationName());
event.queryPlan = hookContext.getQueryPlan(); event.queryPlan = hookContext.getQueryPlan();
event.hookType = hookContext.getHookType(); event.hookType = hookContext.getHookType();
...@@ -162,7 +165,7 @@ public class HiveHook implements ExecuteWithHookContext { ...@@ -162,7 +165,7 @@ public class HiveHook implements ExecuteWithHookContext {
assert event.hookType == HookContext.HookType.POST_EXEC_HOOK : "Non-POST_EXEC_HOOK not supported!"; assert event.hookType == HookContext.HookType.POST_EXEC_HOOK : "Non-POST_EXEC_HOOK not supported!";
LOG.info("Entered Atlas hook for hook type {} operation {}", event.hookType, event.operation); LOG.info("Entered Atlas hook for hook type {} operation {}", event.hookType, event.operation);
HiveMetaStoreBridge dgiBridge = new HiveMetaStoreBridge(event.conf); HiveMetaStoreBridge dgiBridge = new HiveMetaStoreBridge(event.conf, event.user, event.ugi);
if (!typesRegistered) { if (!typesRegistered) {
dgiBridge.registerHiveDataModel(); dgiBridge.registerHiveDataModel();
......
...@@ -29,6 +29,7 @@ import org.apache.hadoop.fs.Path; ...@@ -29,6 +29,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Driver; import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.alias.JavaKeyStoreProvider; import org.apache.hadoop.security.alias.JavaKeyStoreProvider;
import org.apache.hadoop.security.ssl.SSLFactory; import org.apache.hadoop.security.ssl.SSLFactory;
import org.apache.hadoop.security.ssl.SSLHostnameVerifier; import org.apache.hadoop.security.ssl.SSLHostnameVerifier;
...@@ -129,12 +130,23 @@ public class SSLAndKerberosHiveHookIT extends BaseSSLAndKerberosTest { ...@@ -129,12 +130,23 @@ public class SSLAndKerberosHiveHookIT extends BaseSSLAndKerberosTest {
configuration.save(new FileWriter(persistDir + File.separator + "application.properties")); configuration.save(new FileWriter(persistDir + File.separator + "application.properties"));
dgiCLient = new AtlasClient(DGI_URL) { subject = loginTestUser();
UserGroupInformation.loginUserFromSubject(subject);
UserGroupInformation proxyUser = UserGroupInformation.createProxyUser(
"testUser",
UserGroupInformation.getLoginUser());
dgiCLient = proxyUser.doAs(new PrivilegedExceptionAction<AtlasClient>() {
@Override @Override
protected PropertiesConfiguration getClientProperties() throws AtlasException { public AtlasClient run() throws Exception {
return configuration; return new AtlasClient(DGI_URL) {
@Override
protected PropertiesConfiguration getClientProperties() throws AtlasException {
return configuration;
}
};
} }
}; });
secureEmbeddedServer = new TestSecureEmbeddedServer(21443, "webapp/target/apache-atlas") { secureEmbeddedServer = new TestSecureEmbeddedServer(21443, "webapp/target/apache-atlas") {
@Override @Override
...@@ -152,7 +164,6 @@ public class SSLAndKerberosHiveHookIT extends BaseSSLAndKerberosTest { ...@@ -152,7 +164,6 @@ public class SSLAndKerberosHiveHookIT extends BaseSSLAndKerberosTest {
System.setProperty("atlas.conf", persistDir); System.setProperty("atlas.conf", persistDir);
secureEmbeddedServer.getServer().start(); secureEmbeddedServer.getServer().start();
subject = loginTestUser();
} }
@AfterClass @AfterClass
...@@ -194,7 +205,11 @@ public class SSLAndKerberosHiveHookIT extends BaseSSLAndKerberosTest { ...@@ -194,7 +205,11 @@ public class SSLAndKerberosHiveHookIT extends BaseSSLAndKerberosTest {
private void runCommand(final String cmd) throws Exception { private void runCommand(final String cmd) throws Exception {
ss.setCommandType(null); ss.setCommandType(null);
Subject.doAs(subject, new PrivilegedExceptionAction<Object>() { UserGroupInformation.loginUserFromSubject(subject);
UserGroupInformation proxyUser = UserGroupInformation.createProxyUser(
"testUser",
UserGroupInformation.getLoginUser());
proxyUser.doAs(new PrivilegedExceptionAction<Object>() {
@Override @Override
public Object run() throws Exception { public Object run() throws Exception {
driver.run(cmd); driver.run(cmd);
...@@ -218,7 +233,11 @@ public class SSLAndKerberosHiveHookIT extends BaseSSLAndKerberosTest { ...@@ -218,7 +233,11 @@ public class SSLAndKerberosHiveHookIT extends BaseSSLAndKerberosTest {
private void assertInstanceIsRegistered(final String typeName, final String colName, final String colValue) private void assertInstanceIsRegistered(final String typeName, final String colName, final String colValue)
throws Exception { throws Exception {
Subject.doAs(subject, new PrivilegedExceptionAction<Object>() { UserGroupInformation.loginUserFromSubject(subject);
UserGroupInformation proxyUser = UserGroupInformation.createProxyUser(
"testUser",
UserGroupInformation.getLoginUser());
proxyUser.doAs(new PrivilegedExceptionAction<Object>() {
@Override @Override
public Object run() throws Exception { public Object run() throws Exception {
JSONArray results = dgiCLient.rawSearch(typeName, colName, colValue); JSONArray results = dgiCLient.rawSearch(typeName, colName, colValue);
......
...@@ -27,6 +27,7 @@ import org.apache.atlas.security.SecureClientUtils; ...@@ -27,6 +27,7 @@ import org.apache.atlas.security.SecureClientUtils;
import org.apache.atlas.typesystem.Referenceable; import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.json.InstanceSerialization; import org.apache.atlas.typesystem.json.InstanceSerialization;
import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.hadoop.security.UserGroupInformation;
import org.codehaus.jettison.json.JSONArray; import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException; import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject; import org.codehaus.jettison.json.JSONObject;
...@@ -81,6 +82,10 @@ public class AtlasClient { ...@@ -81,6 +82,10 @@ public class AtlasClient {
private WebResource service; private WebResource service;
public AtlasClient(String baseUrl) { public AtlasClient(String baseUrl) {
this(baseUrl, null, null);
}
public AtlasClient(String baseUrl, UserGroupInformation ugi, String doAsUser) {
DefaultClientConfig config = new DefaultClientConfig(); DefaultClientConfig config = new DefaultClientConfig();
PropertiesConfiguration clientConfig = null; PropertiesConfiguration clientConfig = null;
try { try {
...@@ -95,7 +100,8 @@ public class AtlasClient { ...@@ -95,7 +100,8 @@ public class AtlasClient {
LOG.info("Error processing client configuration.", e); LOG.info("Error processing client configuration.", e);
} }
URLConnectionClientHandler handler = SecureClientUtils.getClientConnectionHandler(config, clientConfig); URLConnectionClientHandler handler =
SecureClientUtils.getClientConnectionHandler(config, clientConfig, doAsUser, ugi);
Client client = new Client(handler, config); Client client = new Client(handler, config);
client.resource(UriBuilder.fromUri(baseUrl).build()); client.resource(UriBuilder.fromUri(baseUrl).build());
......
...@@ -45,6 +45,7 @@ import java.net.HttpURLConnection; ...@@ -45,6 +45,7 @@ import java.net.HttpURLConnection;
import java.net.URL; import java.net.URL;
import java.net.URLConnection; import java.net.URLConnection;
import java.security.GeneralSecurityException; import java.security.GeneralSecurityException;
import java.security.PrivilegedExceptionAction;
import static org.apache.atlas.security.SecurityProperties.CERT_STORES_CREDENTIAL_PROVIDER_PATH; import static org.apache.atlas.security.SecurityProperties.CERT_STORES_CREDENTIAL_PROVIDER_PATH;
import static org.apache.atlas.security.SecurityProperties.CLIENT_AUTH_KEY; import static org.apache.atlas.security.SecurityProperties.CLIENT_AUTH_KEY;
...@@ -61,7 +62,7 @@ public class SecureClientUtils { ...@@ -61,7 +62,7 @@ public class SecureClientUtils {
public static URLConnectionClientHandler getClientConnectionHandler(DefaultClientConfig config, public static URLConnectionClientHandler getClientConnectionHandler(DefaultClientConfig config,
PropertiesConfiguration clientConfig) { PropertiesConfiguration clientConfig, final String doAsUser, final UserGroupInformation ugi) {
config.getProperties().put(URLConnectionClientHandler.PROPERTY_HTTP_URL_CONNECTION_SET_METHOD_WORKAROUND, true); config.getProperties().put(URLConnectionClientHandler.PROPERTY_HTTP_URL_CONNECTION_SET_METHOD_WORKAROUND, true);
Configuration conf = new Configuration(); Configuration conf = new Configuration();
conf.addResource(conf.get(SSLFactory.SSL_CLIENT_CONF_KEY, "ssl-client.xml")); conf.addResource(conf.get(SSLFactory.SSL_CLIENT_CONF_KEY, "ssl-client.xml"));
...@@ -78,17 +79,47 @@ public class SecureClientUtils { ...@@ -78,17 +79,47 @@ public class SecureClientUtils {
authenticator.setConnectionConfigurator(connConfigurator); authenticator.setConnectionConfigurator(connConfigurator);
final DelegationTokenAuthenticator finalAuthenticator = (DelegationTokenAuthenticator) authenticator; final DelegationTokenAuthenticator finalAuthenticator = (DelegationTokenAuthenticator) authenticator;
final DelegationTokenAuthenticatedURL.Token token = new DelegationTokenAuthenticatedURL.Token(); final DelegationTokenAuthenticatedURL.Token token = new DelegationTokenAuthenticatedURL.Token();
HttpURLConnectionFactory httpURLConnectionFactory = new HttpURLConnectionFactory() { HttpURLConnectionFactory httpURLConnectionFactory = null;
@Override try {
public HttpURLConnection getHttpURLConnection(final URL url) throws IOException { UserGroupInformation ugiToUse = ugi != null ?
try { ugi : UserGroupInformation.getCurrentUser();
return new DelegationTokenAuthenticatedURL(finalAuthenticator, connConfigurator) final UserGroupInformation actualUgi =
.openConnection(url, token, null); (ugiToUse.getAuthenticationMethod() ==
} catch (Exception e) { UserGroupInformation.AuthenticationMethod.PROXY)
throw new IOException(e); ? ugiToUse.getRealUser()
: ugiToUse;
LOG.info("Real User: {}, is from ticket cache? {}",
actualUgi,
actualUgi.isLoginTicketBased());
LOG.info("doAsUser: {}", doAsUser);
httpURLConnectionFactory = new HttpURLConnectionFactory() {
@Override
public HttpURLConnection getHttpURLConnection(final URL url) throws IOException {
try {
return actualUgi.doAs(new PrivilegedExceptionAction<HttpURLConnection>() {
@Override
public HttpURLConnection run() throws Exception {
try {
return new DelegationTokenAuthenticatedURL(
finalAuthenticator, connConfigurator)
.openConnection(url, token, doAsUser);
} catch (Exception e) {
throw new IOException(e);
}
}
});
} catch (Exception e) {
if (e instanceof IOException) {
throw (IOException) e;
} else {
throw new IOException(e);
}
}
} }
} };
}; } catch (IOException e) {
LOG.warn("Error obtaining user", e);
}
return new URLConnectionClientHandler(httpURLConnectionFactory); return new URLConnectionClientHandler(httpURLConnectionFactory);
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment