Commit d4b15c7a by Jayendra Parab Committed by nixonrodrigues

ATLAS-3864 : Break the dependency between Atlas and Kafka's Zookeeper

parent 34709f22
...@@ -310,7 +310,7 @@ public class HiveMetaStoreBridgeTest { ...@@ -310,7 +310,7 @@ public class HiveMetaStoreBridgeTest {
return table; return table;
} }
private class MatchesReferenceableProperty extends ArgumentMatcher<Object> { private class MatchesReferenceableProperty implements ArgumentMatcher<Object> {
private final String attrName; private final String attrName;
private final Object attrValue; private final Object attrValue;
......
...@@ -18,12 +18,7 @@ ...@@ -18,12 +18,7 @@
package org.apache.atlas.kafka.bridge; package org.apache.atlas.kafka.bridge;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import org.apache.kafka.common.security.JaasUtils;
import org.apache.atlas.ApplicationProperties; import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClientV2; import org.apache.atlas.AtlasClientV2;
import org.apache.atlas.kafka.model.KafkaDataTypes; import org.apache.atlas.kafka.model.KafkaDataTypes;
...@@ -32,6 +27,7 @@ import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo; ...@@ -32,6 +27,7 @@ import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.instance.AtlasEntityHeader; import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.EntityMutationResponse; import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.utils.AuthenticationUtil; import org.apache.atlas.utils.AuthenticationUtil;
import org.apache.atlas.utils.KafkaUtils;
import org.apache.commons.cli.BasicParser; import org.apache.commons.cli.BasicParser;
import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.CommandLineParser;
...@@ -53,6 +49,7 @@ import java.util.Collection; ...@@ -53,6 +49,7 @@ import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.regex.Pattern; import java.util.regex.Pattern;
public class KafkaBridge { public class KafkaBridge {
...@@ -74,22 +71,17 @@ public class KafkaBridge { ...@@ -74,22 +71,17 @@ public class KafkaBridge {
private static final String TOPIC = "topic"; private static final String TOPIC = "topic";
private static final String FORMAT_KAKFA_TOPIC_QUALIFIED_NAME = "%s@%s"; private static final String FORMAT_KAKFA_TOPIC_QUALIFIED_NAME = "%s@%s";
private static final String ZOOKEEPER_CONNECT = "atlas.kafka.zookeeper.connect";
private static final String ZOOKEEPER_CONNECTION_TIMEOUT_MS = "atlas.kafka.zookeeper.connection.timeout.ms";
private static final String ZOOKEEPER_SESSION_TIMEOUT_MS = "atlas.kafka.zookeeper.session.timeout.ms";
private static final String DEFAULT_ZOOKEEPER_CONNECT = "localhost:2181";
private static final int DEFAULT_ZOOKEEPER_SESSION_TIMEOUT_MS = 10 * 1000;
private static final int DEFAULT_ZOOKEEPER_CONNECTION_TIMEOUT_MS = 10 * 1000;
private final List<String> availableTopics; private final List<String> availableTopics;
private final String metadataNamespace; private final String metadataNamespace;
private final AtlasClientV2 atlasClientV2; private final AtlasClientV2 atlasClientV2;
private final ZkUtils zkUtils; private final KafkaUtils kafkaUtils;
public static void main(String[] args) { public static void main(String[] args) {
int exitCode = EXIT_CODE_FAILED; int exitCode = EXIT_CODE_FAILED;
AtlasClientV2 atlasClientV2 = null; AtlasClientV2 atlasClientV2 = null;
KafkaBridge importer = null;
try { try {
Options options = new Options(); Options options = new Options();
...@@ -118,7 +110,7 @@ public class KafkaBridge { ...@@ -118,7 +110,7 @@ public class KafkaBridge {
atlasClientV2 = new AtlasClientV2(ugi, ugi.getShortUserName(), urls); atlasClientV2 = new AtlasClientV2(ugi, ugi.getShortUserName(), urls);
} }
KafkaBridge importer = new KafkaBridge(atlasConf, atlasClientV2); importer = new KafkaBridge(atlasConf, atlasClientV2);
if (StringUtils.isNotEmpty(fileToImport)) { if (StringUtils.isNotEmpty(fileToImport)) {
File f = new File(fileToImport); File f = new File(fileToImport);
...@@ -153,21 +145,25 @@ public class KafkaBridge { ...@@ -153,21 +145,25 @@ public class KafkaBridge {
if (atlasClientV2 != null) { if (atlasClientV2 != null) {
atlasClientV2.close(); atlasClientV2.close();
} }
if (importer != null) {
importer.close();
}
} }
System.exit(exitCode); System.exit(exitCode);
} }
public KafkaBridge(Configuration atlasConf, AtlasClientV2 atlasClientV2) throws Exception { public KafkaBridge(Configuration atlasConf, AtlasClientV2 atlasClientV2) throws Exception {
String zookeeperConnect = getZKConnection(atlasConf);
int sessionTimeOutMs = atlasConf.getInt(ZOOKEEPER_SESSION_TIMEOUT_MS, DEFAULT_ZOOKEEPER_SESSION_TIMEOUT_MS) ;
int connectionTimeOutMs = atlasConf.getInt(ZOOKEEPER_CONNECTION_TIMEOUT_MS, DEFAULT_ZOOKEEPER_CONNECTION_TIMEOUT_MS);
ZkClient zkClient = new ZkClient(zookeeperConnect, sessionTimeOutMs, connectionTimeOutMs, ZKStringSerializer$.MODULE$);
this.atlasClientV2 = atlasClientV2; this.atlasClientV2 = atlasClientV2;
this.metadataNamespace = getMetadataNamespace(atlasConf); this.metadataNamespace = getMetadataNamespace(atlasConf);
this.zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperConnect), JaasUtils.isZkSecurityEnabled()); this.kafkaUtils = new KafkaUtils(atlasConf);
this.availableTopics = scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllTopics()); this.availableTopics = kafkaUtils.listAllTopics();
}
public void close() {
if (this.kafkaUtils != null) {
this.kafkaUtils.close();
}
} }
private String getMetadataNamespace(Configuration config) { private String getMetadataNamespace(Configuration config) {
...@@ -225,7 +221,7 @@ public class KafkaBridge { ...@@ -225,7 +221,7 @@ public class KafkaBridge {
} }
@VisibleForTesting @VisibleForTesting
AtlasEntity getTopicEntity(String topic, AtlasEntity topicEntity) { AtlasEntity getTopicEntity(String topic, AtlasEntity topicEntity) throws Exception {
final AtlasEntity ret; final AtlasEntity ret;
if (topicEntity == null) { if (topicEntity == null) {
...@@ -242,7 +238,12 @@ public class KafkaBridge { ...@@ -242,7 +238,12 @@ public class KafkaBridge {
ret.setAttribute(NAME,topic); ret.setAttribute(NAME,topic);
ret.setAttribute(DESCRIPTION_ATTR, topic); ret.setAttribute(DESCRIPTION_ATTR, topic);
ret.setAttribute(URI, topic); ret.setAttribute(URI, topic);
ret.setAttribute(PARTITION_COUNT, (Integer) zkUtils.getTopicPartitionCount(topic).get()); try {
ret.setAttribute(PARTITION_COUNT, kafkaUtils.getPartitionCount(topic));
} catch (ExecutionException | InterruptedException e) {
LOG.error("Error while getting partition count for topic :" + topic, e);
throw new Exception("Error while getting partition count for topic :" + topic, e);
}
return ret; return ret;
} }
...@@ -351,21 +352,4 @@ public class KafkaBridge { ...@@ -351,21 +352,4 @@ public class KafkaBridge {
entity.getRelationshipAttributes().clear(); entity.getRelationshipAttributes().clear();
} }
} }
private String getStringValue(String[] vals) {
String ret = null;
for(String val:vals) {
ret = (ret == null) ? val : ret + "," + val;
}
return ret;
}
private String getZKConnection(Configuration atlasConf) {
String ret = null;
ret = getStringValue(atlasConf.getStringArray(ZOOKEEPER_CONNECT));
if (StringUtils.isEmpty(ret) ) {
ret = DEFAULT_ZOOKEEPER_CONNECT;
}
return ret;
}
} }
...@@ -81,7 +81,14 @@ ...@@ -81,7 +81,14 @@
<dependency> <dependency>
<groupId>org.mockito</groupId> <groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId> <artifactId>mockito-core</artifactId>
<version>3.5.10</version>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-inline</artifactId>
<version>3.5.10</version>
</dependency> </dependency>
<dependency> <dependency>
...@@ -115,6 +122,12 @@ ...@@ -115,6 +122,12 @@
<version>1.3.2</version> <version>1.3.2</version>
</dependency> </dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
</dependencies> </dependencies>
<build> <build>
......
...@@ -19,24 +19,19 @@ ...@@ -19,24 +19,19 @@
package org.apache.atlas.hook; package org.apache.atlas.hook;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.apache.atlas.ApplicationProperties; import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasException; import org.apache.atlas.AtlasException;
import org.apache.atlas.utils.AuthenticationUtil; import org.apache.atlas.utils.AuthenticationUtil;
import org.apache.atlas.utils.KafkaUtils;
import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import scala.Tuple2;
import java.io.IOException; import java.io.IOException;
import java.util.Properties; import java.util.Arrays;
/** /**
* A class to create Kafka topics used by Atlas components. * A class to create Kafka topics used by Atlas components.
...@@ -66,20 +61,13 @@ public class AtlasTopicCreator { ...@@ -66,20 +61,13 @@ public class AtlasTopicCreator {
if (!handleSecurity(atlasProperties)) { if (!handleSecurity(atlasProperties)) {
return; return;
} }
ZkUtils zkUtils = createZkUtils(atlasProperties); try(KafkaUtils kafkaUtils = getKafkaUtils(atlasProperties)) {
for (String topicName : topicNames) { int numPartitions = atlasProperties.getInt("atlas.notification.partitions", 1);
try { int numReplicas = atlasProperties.getInt("atlas.notification.replicas", 1);
LOG.warn("Attempting to create topic {}", topicName); kafkaUtils.createTopics(Arrays.asList(topicNames), numPartitions, numReplicas);
if (!ifTopicExists(topicName, zkUtils)) { } catch (Exception e) {
createTopic(atlasProperties, topicName, zkUtils); LOG.error("Error while creating topics e :" + e.getMessage(), e);
} else {
LOG.warn("Ignoring call to create topic {}, as it already exists.", topicName);
}
} catch (Throwable t) {
LOG.error("Failed while creating topic {}", topicName, t);
}
} }
zkUtils.close();
} else { } else {
LOG.info("Not creating topics {} as {} is false", StringUtils.join(topicNames, ","), LOG.info("Not creating topics {} as {} is false", StringUtils.join(topicNames, ","),
ATLAS_NOTIFICATION_CREATE_TOPICS_KEY); ATLAS_NOTIFICATION_CREATE_TOPICS_KEY);
...@@ -105,28 +93,9 @@ public class AtlasTopicCreator { ...@@ -105,28 +93,9 @@ public class AtlasTopicCreator {
return true; return true;
} }
@VisibleForTesting // This method is added to mock the creation of kafkaUtils object while writing the test cases
protected boolean ifTopicExists(String topicName, ZkUtils zkUtils) { KafkaUtils getKafkaUtils(Configuration configuration) {
return AdminUtils.topicExists(zkUtils, topicName); return new KafkaUtils(configuration);
}
@VisibleForTesting
protected void createTopic(Configuration atlasProperties, String topicName, ZkUtils zkUtils) {
int numPartitions = atlasProperties.getInt("atlas.notification.hook.numthreads", 1);
int numReplicas = atlasProperties.getInt("atlas.notification.replicas", 1);
AdminUtils.createTopic(zkUtils, topicName, numPartitions, numReplicas,
new Properties(), RackAwareMode.Enforced$.MODULE$);
LOG.warn("Created topic {} with partitions {} and replicas {}", topicName, numPartitions, numReplicas);
}
@VisibleForTesting
protected ZkUtils createZkUtils(Configuration atlasProperties) {
String zkConnect = atlasProperties.getString("atlas.kafka.zookeeper.connect");
int sessionTimeout = atlasProperties.getInt("atlas.kafka.zookeeper.session.timeout.ms", 400);
int connectionTimeout = atlasProperties.getInt("atlas.kafka.zookeeper.connection.timeout.ms", 200);
Tuple2<ZkClient, ZkConnection> zkClientAndConnection = ZkUtils.createZkClientAndConnection(
zkConnect, sessionTimeout, connectionTimeout);
return new ZkUtils(zkClientAndConnection._1(), zkClientAndConnection._2(), false);
} }
public static void main(String[] args) throws AtlasException { public static void main(String[] args) throws AtlasException {
......
...@@ -25,10 +25,10 @@ import org.apache.atlas.notification.AbstractNotification; ...@@ -25,10 +25,10 @@ import org.apache.atlas.notification.AbstractNotification;
import org.apache.atlas.notification.NotificationConsumer; import org.apache.atlas.notification.NotificationConsumer;
import org.apache.atlas.notification.NotificationException; import org.apache.atlas.notification.NotificationException;
import org.apache.atlas.service.Service; import org.apache.atlas.service.Service;
import org.apache.atlas.utils.KafkaUtils;
import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationConverter; import org.apache.commons.configuration.ConfigurationConverter;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.Producer;
...@@ -42,7 +42,6 @@ import org.springframework.core.annotation.Order; ...@@ -42,7 +42,6 @@ import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.inject.Inject; import javax.inject.Inject;
import java.io.IOException;
import java.util.*; import java.util.*;
import java.util.concurrent.Future; import java.util.concurrent.Future;
...@@ -63,17 +62,6 @@ public class KafkaNotification extends AbstractNotification implements Service { ...@@ -63,17 +62,6 @@ public class KafkaNotification extends AbstractNotification implements Service {
public static final String ATLAS_ENTITIES_TOPIC = AtlasConfiguration.NOTIFICATION_ENTITIES_TOPIC_NAME.getString(); public static final String ATLAS_ENTITIES_TOPIC = AtlasConfiguration.NOTIFICATION_ENTITIES_TOPIC_NAME.getString();
protected static final String CONSUMER_GROUP_ID_PROPERTY = "group.id"; protected static final String CONSUMER_GROUP_ID_PROPERTY = "group.id";
static final String KAFKA_SASL_JAAS_CONFIG_PROPERTY = "sasl.jaas.config";
private static final String JAAS_CONFIG_PREFIX_PARAM = "atlas.jaas";
private static final String JAAS_CONFIG_LOGIN_MODULE_NAME_PARAM = "loginModuleName";
private static final String JAAS_CONFIG_LOGIN_MODULE_CONTROL_FLAG_PARAM = "loginModuleControlFlag";
private static final String JAAS_DEFAULT_LOGIN_MODULE_CONTROL_FLAG = "required";
private static final String JAAS_VALID_LOGIN_MODULE_CONTROL_FLAG_OPTIONS = "optional|requisite|sufficient|required";
private static final String JAAS_CONFIG_LOGIN_OPTIONS_PREFIX = "option";
private static final String JAAS_PRINCIPAL_PROP = "principal";
private static final String JAAS_DEFAULT_CLIENT_NAME = "KafkaClient";
private static final String JAAS_TICKET_BASED_CLIENT_NAME = "ticketBased-KafkaClient";
private static final String[] ATLAS_HOOK_CONSUMER_TOPICS = AtlasConfiguration.NOTIFICATION_HOOK_CONSUMER_TOPIC_NAMES.getStringArray(ATLAS_HOOK_TOPIC); private static final String[] ATLAS_HOOK_CONSUMER_TOPICS = AtlasConfiguration.NOTIFICATION_HOOK_CONSUMER_TOPIC_NAMES.getStringArray(ATLAS_HOOK_TOPIC);
private static final String[] ATLAS_ENTITIES_CONSUMER_TOPICS = AtlasConfiguration.NOTIFICATION_ENTITIES_CONSUMER_TOPIC_NAMES.getStringArray(ATLAS_ENTITIES_TOPIC); private static final String[] ATLAS_ENTITIES_CONSUMER_TOPICS = AtlasConfiguration.NOTIFICATION_ENTITIES_CONSUMER_TOPIC_NAMES.getStringArray(ATLAS_ENTITIES_TOPIC);
...@@ -144,7 +132,7 @@ public class KafkaNotification extends AbstractNotification implements Service { ...@@ -144,7 +132,7 @@ public class KafkaNotification extends AbstractNotification implements Service {
// if no value is specified for max.poll.records, set to 1 // if no value is specified for max.poll.records, set to 1
properties.put("max.poll.records", kafkaConf.getInt("max.poll.records", 1)); properties.put("max.poll.records", kafkaConf.getInt("max.poll.records", 1));
setKafkaJAASProperties(applicationProperties, properties); KafkaUtils.setKafkaJAASProperties(applicationProperties, properties);
LOG.info("<== KafkaNotification()"); LOG.info("<== KafkaNotification()");
} }
...@@ -414,127 +402,4 @@ public class KafkaNotification extends AbstractNotification implements Service { ...@@ -414,127 +402,4 @@ public class KafkaNotification extends AbstractNotification implements Service {
return ret; return ret;
} }
void setKafkaJAASProperties(Configuration configuration, Properties kafkaProperties) {
LOG.debug("==> KafkaNotification.setKafkaJAASProperties()");
if(kafkaProperties.containsKey(KAFKA_SASL_JAAS_CONFIG_PROPERTY)) {
LOG.debug("JAAS config is already set, returning");
return;
}
Properties jaasConfig = ApplicationProperties.getSubsetAsProperties(configuration, JAAS_CONFIG_PREFIX_PARAM);
// JAAS Configuration is present then update set those properties in sasl.jaas.config
if(jaasConfig != null && !jaasConfig.isEmpty()) {
String jaasClientName = JAAS_DEFAULT_CLIENT_NAME;
// Required for backward compatability for Hive CLI
if (!isLoginKeytabBased() && isLoginTicketBased()) {
LOG.debug("Checking if ticketBased-KafkaClient is set");
// if ticketBased-KafkaClient property is not specified then use the default client name
String ticketBasedConfigPrefix = JAAS_CONFIG_PREFIX_PARAM + "." + JAAS_TICKET_BASED_CLIENT_NAME;
Configuration ticketBasedConfig = configuration.subset(ticketBasedConfigPrefix);
if(ticketBasedConfig != null && !ticketBasedConfig.isEmpty()) {
LOG.debug("ticketBased-KafkaClient JAAS configuration is set, using it");
jaasClientName = JAAS_TICKET_BASED_CLIENT_NAME;
} else {
LOG.info("UserGroupInformation.isLoginTicketBased is true, but no JAAS configuration found for client {}. Will use JAAS configuration of client {}", JAAS_TICKET_BASED_CLIENT_NAME, jaasClientName);
}
}
String keyPrefix = jaasClientName + ".";
String keyParam = keyPrefix + JAAS_CONFIG_LOGIN_MODULE_NAME_PARAM;
String loginModuleName = jaasConfig.getProperty(keyParam);
if (loginModuleName == null) {
LOG.error("Unable to add JAAS configuration for client [{}] as it is missing param [{}]. Skipping JAAS config for [{}]", jaasClientName, keyParam, jaasClientName);
return;
}
keyParam = keyPrefix + JAAS_CONFIG_LOGIN_MODULE_CONTROL_FLAG_PARAM;
String controlFlag = jaasConfig.getProperty(keyParam);
if(StringUtils.isEmpty(controlFlag)) {
String validValues = JAAS_VALID_LOGIN_MODULE_CONTROL_FLAG_OPTIONS;
controlFlag = JAAS_DEFAULT_LOGIN_MODULE_CONTROL_FLAG;
LOG.warn("Unknown JAAS configuration value for ({}) = [{}], valid value are [{}] using the default value, REQUIRED", keyParam, controlFlag, validValues);
}
String optionPrefix = keyPrefix + JAAS_CONFIG_LOGIN_OPTIONS_PREFIX + ".";
String principalOptionKey = optionPrefix + JAAS_PRINCIPAL_PROP;
int optionPrefixLen = optionPrefix.length();
StringBuffer optionStringBuffer = new StringBuffer();
for (String key : jaasConfig.stringPropertyNames()) {
if (key.startsWith(optionPrefix)) {
String optionVal = jaasConfig.getProperty(key);
if (optionVal != null) {
optionVal = optionVal.trim();
try {
if (key.equalsIgnoreCase(principalOptionKey)) {
optionVal = org.apache.hadoop.security.SecurityUtil.getServerPrincipal(optionVal, (String) null);
}
} catch (IOException e) {
LOG.warn("Failed to build serverPrincipal. Using provided value:[{}]", optionVal);
}
optionVal = surroundWithQuotes(optionVal);
optionStringBuffer.append(String.format(" %s=%s", key.substring(optionPrefixLen), optionVal));
}
}
}
String newJaasProperty = String.format("%s %s %s ;", loginModuleName.trim(), controlFlag, optionStringBuffer.toString());
kafkaProperties.put(KAFKA_SASL_JAAS_CONFIG_PROPERTY, newJaasProperty);
}
LOG.debug("<== KafkaNotification.setKafkaJAASProperties()");
}
@VisibleForTesting
boolean isLoginKeytabBased() {
boolean ret = false;
try {
ret = UserGroupInformation.isLoginKeytabBased();
} catch (Exception excp) {
LOG.warn("Error in determining keytab for KafkaClient-JAAS config", excp);
}
return ret;
}
@VisibleForTesting
boolean isLoginTicketBased() {
boolean ret = false;
try {
ret = UserGroupInformation.isLoginTicketBased();
} catch (Exception excp) {
LOG.warn("Error in determining ticket-cache for KafkaClient-JAAS config", excp);
}
return ret;
}
private static String surroundWithQuotes(String optionVal) {
if(StringUtils.isEmpty(optionVal)) {
return optionVal;
}
String ret = optionVal;
// For property values which have special chars like "@" or "/", we need to enclose it in
// double quotes, so that Kafka can parse it
// If the property is already enclosed in double quotes, then do nothing.
if(optionVal.indexOf(0) != '"' && optionVal.indexOf(optionVal.length() - 1) != '"') {
// If the string as special characters like except _,-
final String SPECIAL_CHAR_LIST = "/!@#%^&*";
if (StringUtils.containsAny(optionVal, SPECIAL_CHAR_LIST)) {
ret = String.format("\"%s\"", optionVal);
}
}
return ret;
}
} }
...@@ -55,7 +55,7 @@ public class BulkImportPercentTest { ...@@ -55,7 +55,7 @@ public class BulkImportPercentTest {
percentHolder.add(d.intValue()); percentHolder.add(d.intValue());
return null; return null;
} }
}).when(log).info(anyString(), anyFloat(), anyInt(), anyString()); }).when(log).info(anyString(), anyInt(), anyLong(), anyString());
} }
@Test @Test
......
...@@ -140,7 +140,7 @@ public class NotificationHookConsumerTest { ...@@ -140,7 +140,7 @@ public class NotificationHookConsumerTest {
hookConsumer.handleMessage(new AtlasKafkaMessage(message, -1, KafkaNotification.ATLAS_HOOK_TOPIC, -1)); hookConsumer.handleMessage(new AtlasKafkaMessage(message, -1, KafkaNotification.ATLAS_HOOK_TOPIC, -1));
verify(consumer).commit(any(TopicPartition.class), anyInt()); verify(consumer).commit(any(TopicPartition.class), anyLong());
} }
@Test @Test
......
...@@ -82,8 +82,7 @@ public class CuratorFactoryTest { ...@@ -82,8 +82,7 @@ public class CuratorFactoryTest {
curatorFactory.enhanceBuilderWithSecurityParameters(zookeeperProperties, builder); curatorFactory.enhanceBuilderWithSecurityParameters(zookeeperProperties, builder);
verify(builder).aclProvider(argThat(new ArgumentMatcher<ACLProvider>() { verify(builder).aclProvider(argThat(new ArgumentMatcher<ACLProvider>() {
@Override @Override
public boolean matches(Object o) { public boolean matches(ACLProvider aclProvider) {
ACLProvider aclProvider = (ACLProvider) o;
ACL acl = aclProvider.getDefaultAcl().get(0); ACL acl = aclProvider.getDefaultAcl().get(0);
return acl.getId().getId().equals("myclient@EXAMPLE.COM") return acl.getId().getId().equals("myclient@EXAMPLE.COM")
&& acl.getId().getScheme().equals("sasl"); && acl.getId().getScheme().equals("sasl");
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment