Commit 61abecac by Jayendra Parab Committed by nixonrodrigues

ATLAS-3779 : Refactoring Kafka in-memory JAASConfig in Atlas.

parent 6248e361
......@@ -17,7 +17,6 @@
*/
package org.apache.atlas;
import org.apache.atlas.security.InMemoryJAASConfiguration;
import org.apache.atlas.security.SecurityUtil;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationConverter;
......@@ -109,10 +108,7 @@ public final class ApplicationProperties extends PropertiesConfiguration {
public static Configuration set(Configuration configuration) throws AtlasException {
synchronized (ApplicationProperties.class) {
instance = configuration;
InMemoryJAASConfiguration.init(instance);
}
return instance;
}
......
......@@ -93,12 +93,6 @@ public abstract class AtlasHook {
failedMessagesLogger = null;
}
if (!isLoginKeytabBased()) {
if (isLoginTicketBased()) {
InMemoryJAASConfiguration.setConfigSectionRedirect("KafkaClient", "ticketBased-KafkaClient");
}
}
metadataNamespace = getMetadataNamespace(atlasProperties);
notificationMaxRetries = atlasProperties.getInt(ATLAS_NOTIFICATION_MAX_RETRIES, 3);
notificationRetryInterval = atlasProperties.getInt(ATLAS_NOTIFICATION_RETRY_INTERVAL, 1000);
......@@ -287,30 +281,6 @@ public abstract class AtlasHook {
}
}
private static boolean isLoginKeytabBased() {
boolean ret = false;
try {
ret = UserGroupInformation.isLoginKeytabBased();
} catch (Exception excp) {
LOG.warn("Error in determining keytab for KafkaClient-JAAS config", excp);
}
return ret;
}
private static boolean isLoginTicketBased() {
boolean ret = false;
try {
ret = UserGroupInformation.isLoginTicketBased();
} catch (Exception excp) {
LOG.warn("Error in determining ticket-cache for KafkaClient-JAAS config", excp);
}
return ret;
}
private static String getMetadataNamespace(Configuration config) {
return config.getString(CONF_METADATA_NAMESPACE, getClusterName(config));
}
......
......@@ -28,6 +28,7 @@ import org.apache.atlas.service.Service;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationConverter;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.alias.CredentialProvider;
import org.apache.hadoop.security.alias.CredentialProviderFactory;
import org.apache.kafka.clients.consumer.ConsumerConfig;
......@@ -64,6 +65,17 @@ public class KafkaNotification extends AbstractNotification implements Service {
public static final String ATLAS_ENTITIES_TOPIC = AtlasConfiguration.NOTIFICATION_ENTITIES_TOPIC_NAME.getString();
protected static final String CONSUMER_GROUP_ID_PROPERTY = "group.id";
static final String KAFKA_SASL_JAAS_CONFIG_PROPERTY = "sasl.jaas.config";
private static final String JAAS_CONFIG_PREFIX_PARAM = "atlas.jaas";
private static final String JAAS_CONFIG_LOGIN_MODULE_NAME_PARAM = "loginModuleName";
private static final String JAAS_CONFIG_LOGIN_MODULE_CONTROL_FLAG_PARAM = "loginModuleControlFlag";
private static final String JAAS_DEFAULT_LOGIN_MODULE_CONTROL_FLAG = "required";
private static final String JAAS_VALID_LOGIN_MODULE_CONTROL_FLAG_OPTIONS = "optional|requisite|sufficient|required";
private static final String JAAS_CONFIG_LOGIN_OPTIONS_PREFIX = "option";
private static final String JAAS_PRINCIPAL_PROP = "principal";
private static final String JAAS_DEFAULT_CLIENT_NAME = "KafkaClient";
private static final String JAAS_TICKET_BASED_CLIENT_NAME = "ticketBased-KafkaClient";
private static final String[] ATLAS_HOOK_CONSUMER_TOPICS = AtlasConfiguration.NOTIFICATION_HOOK_CONSUMER_TOPIC_NAMES.getStringArray(ATLAS_HOOK_TOPIC);
private static final String[] ATLAS_ENTITIES_CONSUMER_TOPICS = AtlasConfiguration.NOTIFICATION_ENTITIES_CONSUMER_TOPIC_NAMES.getStringArray(ATLAS_ENTITIES_TOPIC);
......@@ -134,6 +146,8 @@ public class KafkaNotification extends AbstractNotification implements Service {
// if no value is specified for max.poll.records, set to 1
properties.put("max.poll.records", kafkaConf.getInt("max.poll.records", 1));
setKafkaJAASProperties(applicationProperties, properties);
LOG.info("<== KafkaNotification()");
}
......@@ -401,4 +415,116 @@ public class KafkaNotification extends AbstractNotification implements Service {
return ret;
}
void setKafkaJAASProperties(Configuration configuration, Properties kafkaProperties) {
LOG.debug("==> KafkaNotification.setKafkaJAASProperties()");
if(kafkaProperties.containsKey(KAFKA_SASL_JAAS_CONFIG_PROPERTY)) {
LOG.debug("JAAS config is already set, returning");
return;
}
Properties jaasConfig = ApplicationProperties.getSubsetAsProperties(configuration, JAAS_CONFIG_PREFIX_PARAM);
// JAAS Configuration is present then update set those properties in sasl.jaas.config
if(jaasConfig != null && !jaasConfig.isEmpty()) {
String jaasClientName = JAAS_DEFAULT_CLIENT_NAME;
// Required for backward compatability for Hive CLI
if (!isLoginKeytabBased() && isLoginTicketBased()) {
LOG.debug("Using ticketBased-KafkaClient JAAS configuration");
jaasClientName = JAAS_TICKET_BASED_CLIENT_NAME;
}
String keyPrefix = jaasClientName + ".";
String keyParam = keyPrefix + JAAS_CONFIG_LOGIN_MODULE_NAME_PARAM;
String loginModuleName = jaasConfig.getProperty(keyParam);
if (loginModuleName == null) {
LOG.error("Unable to add JAAS configuration for client [{}] as it is missing param [{}]. Skipping JAAS config for [{}]", jaasClientName, keyParam, jaasClientName);
return;
}
keyParam = keyPrefix + JAAS_CONFIG_LOGIN_MODULE_CONTROL_FLAG_PARAM;
String controlFlag = jaasConfig.getProperty(keyParam);
if(StringUtils.isEmpty(controlFlag)) {
String validValues = JAAS_VALID_LOGIN_MODULE_CONTROL_FLAG_OPTIONS;
controlFlag = JAAS_DEFAULT_LOGIN_MODULE_CONTROL_FLAG;
LOG.warn("Unknown JAAS configuration value for ({}) = [{}], valid value are [{}] using the default value, REQUIRED", keyParam, controlFlag, validValues);
}
String optionPrefix = keyPrefix + JAAS_CONFIG_LOGIN_OPTIONS_PREFIX + ".";
String principalOptionKey = optionPrefix + JAAS_PRINCIPAL_PROP;
int optionPrefixLen = optionPrefix.length();
StringBuffer optionStringBuffer = new StringBuffer();
for (String key : jaasConfig.stringPropertyNames()) {
if (key.startsWith(optionPrefix)) {
String optionVal = jaasConfig.getProperty(key);
if (optionVal != null) {
optionVal = optionVal.trim();
try {
if (key.equalsIgnoreCase(principalOptionKey)) {
optionVal = org.apache.hadoop.security.SecurityUtil.getServerPrincipal(optionVal, (String) null);
}
} catch (IOException e) {
LOG.warn("Failed to build serverPrincipal. Using provided value:[{}]", optionVal);
}
optionVal = surroundWithQuotes(optionVal);
optionStringBuffer.append(String.format(" %s=%s", key.substring(optionPrefixLen), optionVal));
}
}
}
String newJaasProperty = String.format("%s %s %s ;", loginModuleName.trim(), controlFlag, optionStringBuffer.toString());
kafkaProperties.put(KAFKA_SASL_JAAS_CONFIG_PROPERTY, newJaasProperty);
}
LOG.debug("<== KafkaNotification.setKafkaJAASProperties()");
}
private static boolean isLoginKeytabBased() {
boolean ret = false;
try {
ret = UserGroupInformation.isLoginKeytabBased();
} catch (Exception excp) {
LOG.warn("Error in determining keytab for KafkaClient-JAAS config", excp);
}
return ret;
}
private static boolean isLoginTicketBased() {
boolean ret = false;
try {
ret = UserGroupInformation.isLoginTicketBased();
} catch (Exception excp) {
LOG.warn("Error in determining ticket-cache for KafkaClient-JAAS config", excp);
}
return ret;
}
private static String surroundWithQuotes(String optionVal) {
if(StringUtils.isEmpty(optionVal)) {
return optionVal;
}
String ret = optionVal;
// For property values which have special chars like "@" or "/", we need to enclose it in
// double quotes, so that Kafka can parse it
// If the property is already enclosed in double quotes, then do nothing.
if(optionVal.indexOf(0) != '"' && optionVal.indexOf(optionVal.length() - 1) != '"') {
// If the string as special characters like except _,-
final String SPECIAL_CHAR_LIST = "/!@#%^&*";
if (StringUtils.containsAny(optionVal, SPECIAL_CHAR_LIST)) {
ret = String.format("\"%s\"", optionVal);
}
}
return ret;
}
}
......@@ -17,14 +17,19 @@
*/
package org.apache.atlas.kafka;
import org.apache.atlas.AtlasException;
import org.apache.atlas.notification.NotificationConsumer;
import org.apache.atlas.notification.NotificationException;
import org.apache.atlas.notification.NotificationInterface;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.testng.annotations.Test;
import java.io.IOException;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.HashMap;
......@@ -38,6 +43,7 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
......@@ -144,6 +150,132 @@ public class KafkaNotificationMockTest {
}
}
@Test
public void testSetKafkaJAASPropertiesForAllProperValues() {
Properties properties = new Properties();
Configuration configuration = new PropertiesConfiguration();
final String loginModuleName = "com.sun.security.auth.module.Krb5LoginModule";
final String loginModuleControlFlag = "required";
final String optionUseKeyTab = "false";
final String optionStoreKey = "true";
final String optionServiceName = "kafka";
configuration.setProperty("atlas.jaas.KafkaClient.loginModuleName",loginModuleName);
configuration.setProperty("atlas.jaas.KafkaClient.loginModuleControlFlag", loginModuleControlFlag);
configuration.setProperty("atlas.jaas.KafkaClient.option.useKeyTab", optionUseKeyTab);
configuration.setProperty("atlas.jaas.KafkaClient.option.storeKey", optionStoreKey);
configuration.setProperty("atlas.jaas.KafkaClient.option.serviceName",optionServiceName);
try {
KafkaNotification kafkaNotification = new KafkaNotification(configuration);
kafkaNotification.setKafkaJAASProperties(configuration, properties);
String newPropertyValue = properties.getProperty(KafkaNotification.KAFKA_SASL_JAAS_CONFIG_PROPERTY);
assertTrue(newPropertyValue.contains(loginModuleName), "loginModuleName not present in new property");
assertTrue(newPropertyValue.contains(loginModuleControlFlag),"loginModuleControlFlag not present in new property");
assertTrue(newPropertyValue.contains("useKeyTab=" + optionUseKeyTab), "useKeyTab not present in new property or value doesn't match");
assertTrue(newPropertyValue.contains("storeKey="+ optionStoreKey), "storeKey not present in new property or value doesn't match");
assertTrue(newPropertyValue.contains("serviceName=" + optionServiceName), "serviceName not present in new property or value doesn't match");
} catch (AtlasException e) {
fail("Failed while creating KafkaNotification object with exception : " + e.getMessage());
}
}
@Test
public void testSetKafkaJAASPropertiesForMissingControlFlag() {
Properties properties = new Properties();
Configuration configuration = new PropertiesConfiguration();
final String loginModuleName = "com.sun.security.auth.module.Krb5LoginModule";
final String loginModuleControlFlag = "required";
final String optionUseKeyTab = "false";
final String optionStoreKey = "true";
final String optionServiceName = "kafka";
configuration.setProperty("atlas.jaas.KafkaClient.loginModuleName",loginModuleName);
configuration.setProperty("atlas.jaas.KafkaClient.option.useKeyTab", optionUseKeyTab);
configuration.setProperty("atlas.jaas.KafkaClient.option.storeKey", optionStoreKey);
configuration.setProperty("atlas.jaas.KafkaClient.option.serviceName",optionServiceName);
try {
KafkaNotification kafkaNotification = new KafkaNotification(configuration);
kafkaNotification.setKafkaJAASProperties(configuration, properties);
String newPropertyValue = properties.getProperty(KafkaNotification.KAFKA_SASL_JAAS_CONFIG_PROPERTY);
assertTrue(newPropertyValue.contains(loginModuleName), "loginModuleName not present in new property");
assertTrue(newPropertyValue.contains(loginModuleControlFlag),"loginModuleControlFlag not present in new property");
assertTrue(newPropertyValue.contains("useKeyTab=" + optionUseKeyTab), "useKeyTab not present in new property or value doesn't match");
assertTrue(newPropertyValue.contains("storeKey="+ optionStoreKey), "storeKey not present in new property or value doesn't match");
assertTrue(newPropertyValue.contains("serviceName=" + optionServiceName), "serviceName not present in new property or value doesn't match");
} catch (AtlasException e) {
fail("Failed while creating KafkaNotification object with exception : " + e.getMessage());
}
}
@Test
public void testSetKafkaJAASPropertiesForMissingLoginModuleName() {
Properties properties = new Properties();
Configuration configuration = new PropertiesConfiguration();
final String loginModuleControlFlag = "required";
final String optionUseKeyTab = "false";
final String optionStoreKey = "true";
final String optionServiceName = "kafka";
configuration.setProperty("atlas.jaas.KafkaClient.loginModuleControlFlag", loginModuleControlFlag);
configuration.setProperty("atlas.jaas.KafkaClient.option.useKeyTab", optionUseKeyTab);
configuration.setProperty("atlas.jaas.KafkaClient.option.storeKey", optionStoreKey);
configuration.setProperty("atlas.jaas.KafkaClient.option.serviceName",optionServiceName);
try {
KafkaNotification kafkaNotification = new KafkaNotification(configuration);
kafkaNotification.setKafkaJAASProperties(configuration, properties);
String newPropertyValue = properties.getProperty(KafkaNotification.KAFKA_SASL_JAAS_CONFIG_PROPERTY);
assertNull(newPropertyValue);
} catch (AtlasException e) {
fail("Failed while creating KafkaNotification object with exception : " + e.getMessage());
}
}
@Test
public void testSetKafkaJAASPropertiesWithSpecialCharacters() {
Properties properties = new Properties();
Configuration configuration = new PropertiesConfiguration();
final String loginModuleName = "com.sun.security.auth.module.Krb5LoginModule";
final String loginModuleControlFlag = "required";
final String optionKeyTabPath = "/path/to/file.keytab";
final String optionPrincipal = "test/_HOST@EXAMPLE.COM";
configuration.setProperty("atlas.jaas.KafkaClient.loginModuleName",loginModuleName);
configuration.setProperty("atlas.jaas.KafkaClient.loginModuleControlFlag", loginModuleControlFlag);
configuration.setProperty("atlas.jaas.KafkaClient.option.keyTabPath", optionKeyTabPath);
configuration.setProperty("atlas.jaas.KafkaClient.option.principal", optionPrincipal);
try {
KafkaNotification kafkaNotification = new KafkaNotification(configuration);
kafkaNotification.setKafkaJAASProperties(configuration, properties);
String newPropertyValue = properties.getProperty(KafkaNotification.KAFKA_SASL_JAAS_CONFIG_PROPERTY);
String updatedPrincipalValue = org.apache.hadoop.security.SecurityUtil.getServerPrincipal(optionPrincipal, (String) null);
assertTrue(newPropertyValue.contains(loginModuleName), "loginModuleName not present in new property");
assertTrue(newPropertyValue.contains(loginModuleControlFlag),"loginModuleControlFlag not present in new property");
assertTrue(newPropertyValue.contains("keyTabPath=\"" + optionKeyTabPath + "\""));
assertTrue(newPropertyValue.contains("principal=\""+ updatedPrincipalValue + "\""));
} catch (AtlasException e) {
fail("Failed while creating KafkaNotification object with exception : " + e.getMessage());
} catch (IOException e) {
fail("Failed while getting updated principal value with exception : " + e.getMessage());
}
}
class TestKafkaNotification extends KafkaNotification {
private final AtlasKafkaConsumer consumer1;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment