Commit 7a8ca51f by Jayendra Parab Committed by Madhan Neethiraj

ATLAS-3779: fallback to KafkaClient jaas configiration when…

ATLAS-3779: fallback to KafkaClient jaas configiration when ticket-basedKafkaClient is not specified Signed-off-by: 's avatarMadhan Neethiraj <madhan@apache.org>
parent ac0cd87a
......@@ -29,8 +29,6 @@ import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationConverter;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.alias.CredentialProvider;
import org.apache.hadoop.security.alias.CredentialProviderFactory;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
......@@ -431,12 +429,22 @@ public class KafkaNotification extends AbstractNotification implements Service {
// Required for backward compatability for Hive CLI
if (!isLoginKeytabBased() && isLoginTicketBased()) {
LOG.debug("Using ticketBased-KafkaClient JAAS configuration");
jaasClientName = JAAS_TICKET_BASED_CLIENT_NAME;
LOG.debug("Checking if ticketBased-KafkaClient is set");
// if ticketBased-KafkaClient property is not specified then use the default client name
String ticketBasedConfigPrefix = JAAS_CONFIG_PREFIX_PARAM + "." + JAAS_TICKET_BASED_CLIENT_NAME;
Configuration ticketBasedConfig = configuration.subset(ticketBasedConfigPrefix);
if(ticketBasedConfig != null && !ticketBasedConfig.isEmpty()) {
LOG.debug("ticketBased-KafkaClient JAAS configuration is set, using it");
jaasClientName = JAAS_TICKET_BASED_CLIENT_NAME;
} else {
LOG.info("UserGroupInformation.isLoginTicketBased is true, but no JAAS configuration found for client {}. Will use JAAS configuration of client {}", JAAS_TICKET_BASED_CLIENT_NAME, jaasClientName);
}
}
String keyPrefix = jaasClientName + ".";
String keyParam = keyPrefix + JAAS_CONFIG_LOGIN_MODULE_NAME_PARAM;
String keyPrefix = jaasClientName + ".";
String keyParam = keyPrefix + JAAS_CONFIG_LOGIN_MODULE_NAME_PARAM;
String loginModuleName = jaasConfig.getProperty(keyParam);
if (loginModuleName == null) {
......@@ -483,7 +491,8 @@ public class KafkaNotification extends AbstractNotification implements Service {
LOG.debug("<== KafkaNotification.setKafkaJAASProperties()");
}
private static boolean isLoginKeytabBased() {
@VisibleForTesting
boolean isLoginKeytabBased() {
boolean ret = false;
try {
......@@ -495,7 +504,8 @@ public class KafkaNotification extends AbstractNotification implements Service {
return ret;
}
private static boolean isLoginTicketBased() {
@VisibleForTesting
boolean isLoginTicketBased() {
boolean ret = false;
try {
......
......@@ -27,6 +27,7 @@ import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.mockito.Mockito;
import org.testng.annotations.Test;
import java.io.IOException;
......@@ -276,6 +277,76 @@ public class KafkaNotificationMockTest {
}
@Test
public void testSetKafkaJAASPropertiesForTicketBasedLoginConfig() {
Properties properties = new Properties();
Configuration configuration = new PropertiesConfiguration();
final String loginModuleName = "com.sun.security.auth.module.Krb5LoginModule";
final String loginModuleControlFlag = "required";
final String optionUseKeyTab = "false";
final String optionStoreKey = "true";
final String optionServiceName = "kafka";
configuration.setProperty("atlas.jaas.ticketBased-KafkaClient.loginModuleName",loginModuleName);
configuration.setProperty("atlas.jaas.ticketBased-KafkaClient.loginModuleControlFlag", loginModuleControlFlag);
configuration.setProperty("atlas.jaas.ticketBased-KafkaClient.option.useKeyTab", optionUseKeyTab);
configuration.setProperty("atlas.jaas.ticketBased-KafkaClient.option.storeKey", optionStoreKey);
configuration.setProperty("atlas.jaas.ticketBased-KafkaClient.option.serviceName",optionServiceName);
try {
KafkaNotification kafkaNotification = new KafkaNotification(configuration);
KafkaNotification spyKafkaNotification = Mockito.spy(kafkaNotification);
when(spyKafkaNotification.isLoginKeytabBased()).thenReturn(false);
when(spyKafkaNotification.isLoginTicketBased()).thenReturn(true);
spyKafkaNotification.setKafkaJAASProperties(configuration, properties);
String newPropertyValue = properties.getProperty(KafkaNotification.KAFKA_SASL_JAAS_CONFIG_PROPERTY);
assertTrue(newPropertyValue.contains(loginModuleName), "loginModuleName not present in new property");
assertTrue(newPropertyValue.contains(loginModuleControlFlag),"loginModuleControlFlag not present in new property");
assertTrue(newPropertyValue.contains("useKeyTab=" + optionUseKeyTab), "useKeyTab not present in new property or value doesn't match");
assertTrue(newPropertyValue.contains("storeKey="+ optionStoreKey), "storeKey not present in new property or value doesn't match");
assertTrue(newPropertyValue.contains("serviceName=" + optionServiceName), "serviceName not present in new property or value doesn't match");
} catch (AtlasException e) {
fail("Failed while creating KafkaNotification object with exception : " + e.getMessage());
}
}
@Test
public void testSetKafkaJAASPropertiesForTicketBasedLoginFallback() {
Properties properties = new Properties();
Configuration configuration = new PropertiesConfiguration();
final String loginModuleName = "com.sun.security.auth.module.Krb5LoginModule";
final String loginModuleControlFlag = "required";
final String optionUseKeyTab = "false";
final String optionStoreKey = "true";
final String optionServiceName = "kafka";
configuration.setProperty("atlas.jaas.KafkaClient.loginModuleName",loginModuleName);
configuration.setProperty("atlas.jaas.KafkaClient.loginModuleControlFlag", loginModuleControlFlag);
configuration.setProperty("atlas.jaas.KafkaClient.option.useKeyTab", optionUseKeyTab);
configuration.setProperty("atlas.jaas.KafkaClient.option.storeKey", optionStoreKey);
configuration.setProperty("atlas.jaas.KafkaClient.option.serviceName",optionServiceName);
try {
KafkaNotification kafkaNotification = new KafkaNotification(configuration);
KafkaNotification spyKafkaNotification = Mockito.spy(kafkaNotification);
when(spyKafkaNotification.isLoginKeytabBased()).thenReturn(false);
when(spyKafkaNotification.isLoginTicketBased()).thenReturn(true);
spyKafkaNotification.setKafkaJAASProperties(configuration, properties);
String newPropertyValue = properties.getProperty(KafkaNotification.KAFKA_SASL_JAAS_CONFIG_PROPERTY);
assertTrue(newPropertyValue.contains(loginModuleName), "loginModuleName not present in new property");
assertTrue(newPropertyValue.contains(loginModuleControlFlag),"loginModuleControlFlag not present in new property");
assertTrue(newPropertyValue.contains("useKeyTab=" + optionUseKeyTab), "useKeyTab not present in new property or value doesn't match");
assertTrue(newPropertyValue.contains("storeKey="+ optionStoreKey), "storeKey not present in new property or value doesn't match");
assertTrue(newPropertyValue.contains("serviceName=" + optionServiceName), "serviceName not present in new property or value doesn't match");
} catch (AtlasException e) {
fail("Failed while creating KafkaNotification object with exception : " + e.getMessage());
}
}
class TestKafkaNotification extends KafkaNotification {
private final AtlasKafkaConsumer consumer1;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment