Commit 0a44790e by Hemanth Yamijala

ATLAS-515 Ability to initialize Kafka topics with more than 1 replica (yhemanth)

parent f47cea3f
......@@ -38,17 +38,20 @@ public final class AuthenticationUtil {
public static boolean isKerberosAuthenticationEnabled() {
boolean isKerberosAuthenticationEnabled = false;
try {
isKerberosAuthenticationEnabled = isKerberosAuthenticationEnabled(ApplicationProperties.get());
} catch (AtlasException e) {
LOG.error("Error while checking if Kerberos authentication is enabled", e);
}
return isKerberosAuthenticationEnabled;
}
public static boolean isKerberosAuthenticationEnabled(Configuration atlasConf) {
return "true".equalsIgnoreCase(atlasConf.getString("atlas.authentication.method.kerberos"));
}
......
......@@ -40,6 +40,16 @@ def get_atlas_classpath(confdir):
atlas_classpath = mc.convertCygwinPath(atlas_classpath, True)
return atlas_classpath
def get_atlas_hook_classpath(confdir):
atlas_home = mc.atlasDir()
kafka_topic_setup_dir = mc.kafkaTopicSetupDir(atlas_home)
p = os.pathsep
atlas_hook_classpath = confdir + p \
+ os.path.join(kafka_topic_setup_dir, "*")
if mc.isCygwin():
atlas_hook_classpath = mc.convertCygwinPath(atlas_hook_classpath, True)
return atlas_hook_classpath
def setup_jvm_opts_list(confdir, log_name):
atlas_home = mc.atlasDir()
mc.executeEnvSh(confdir)
......
......@@ -64,6 +64,7 @@ HBASE_STORAGE_LOCAL_CONF_ENTRY="atlas.graph.storage.hostname\s*=\s*localhost"
SOLR_INDEX_CONF_ENTRY="atlas.graph.index.search.backend\s*=\s*solr5"
SOLR_INDEX_LOCAL_CONF_ENTRY="atlas.graph.index.search.solr.zookeeper-url\s*=\s*localhost"
SOLR_INDEX_ZK_URL="atlas.graph.index.search.solr.zookeeper-url"
TOPICS_TO_CREATE="atlas.notification.topics"
DEBUG = False
......@@ -121,6 +122,9 @@ def webAppDir(dir):
webapp = os.path.join(dir, WEBAPP)
return os.environ.get(ATLAS_WEBAPP, webapp)
def kafkaTopicSetupDir(homeDir):
return os.path.join(homeDir, "hook", "kafka-topic-setup")
def expandWebApp(dir):
webappDir = webAppDir(dir)
webAppMetadataDir = os.path.join(webappDir, "atlas")
......@@ -429,6 +433,16 @@ def get_solr_zk_url(confdir):
confdir = os.path.join(confdir, CONF_FILE)
return getConfig(confdir, SOLR_INDEX_ZK_URL)
def get_topics_to_create(confdir):
confdir = os.path.join(confdir, CONF_FILE)
topic_list = getConfig(confdir, TOPICS_TO_CREATE)
if topic_list is not None:
topics = topic_list.split(",")
else:
topics = ["ATLAS_HOOK", "ATLAS_ENTITIES"]
return topics
def run_solr(dir, action, zk_url = None, port = None, logdir = None, wait=True):
solrScript = "solr"
......
#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
import atlas_client_cmdline as cmdline
import atlas_config as mc
def main():
conf_dir = cmdline.setup_conf_dir()
jvm_opts_list = cmdline.setup_jvm_opts_list(conf_dir, 'atlas_kafka_setup.log')
atlas_classpath = cmdline.get_atlas_classpath(conf_dir)
topics_array = mc.get_topics_to_create(conf_dir)
process = mc.java("org.apache.atlas.hook.AtlasTopicCreator", topics_array, atlas_classpath, jvm_opts_list)
return process.wait()
if __name__ == '__main__':
try:
returncode = main()
except Exception as e:
print "Exception in setting up Kafka topics for Atlas: %s" % str(e)
returncode = -1
sys.exit(returncode)
\ No newline at end of file
#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
import atlas_client_cmdline as cmdline
import atlas_config as mc
def main():
conf_dir = cmdline.setup_conf_dir()
jvm_opts_list = cmdline.setup_jvm_opts_list(conf_dir, 'atlas_kafka_setup_hook.log')
atlas_classpath = cmdline.get_atlas_hook_classpath(conf_dir)
topics_array = mc.get_topics_to_create(conf_dir)
process = mc.java("org.apache.atlas.hook.AtlasTopicCreator", topics_array, atlas_classpath, jvm_opts_list)
return process.wait()
if __name__ == '__main__':
try:
returncode = main()
except Exception as e:
print "Exception in setting up Kafka topics for Atlas: %s" % str(e)
returncode = -1
sys.exit(returncode)
\ No newline at end of file
......@@ -56,12 +56,18 @@ atlas.kafka.data=${sys:atlas.home}/data/kafka
atlas.kafka.zookeeper.connect=localhost:9026
atlas.kafka.bootstrap.servers=localhost:9027
atlas.kafka.zookeeper.session.timeout.ms=400
atlas.kafka.zookeeper.connection.timeout.ms=200
atlas.kafka.zookeeper.sync.time.ms=20
atlas.kafka.auto.commit.interval.ms=1000
atlas.kafka.auto.offset.reset=smallest
atlas.kafka.hook.group.id=atlas
atlas.kafka.auto.commit.enable=false
atlas.notification.create.topics=true
atlas.notification.replicas=1
atlas.notification.topics=ATLAS_HOOK,ATLAS_ENTITIES
# Enable for Kerberized Kafka clusters
#atlas.notification.kafka.service.principal=kafka/_HOST@EXAMPLE.COM
#atlas.notification.kafka.keytab.location=/etc/security/keytabs/kafka.service.keytab
######### Hive Lineage Configs #########
## Schema
......
......@@ -55,6 +55,18 @@
</fileSet>
<fileSet>
<directory>target/bin</directory>
<outputDirectory>hook-bin</outputDirectory>
<includes>
<include>atlas_client_cmdline.py</include>
<include>atlas_config.py</include>
<include>atlas_kafka_setup_hook.py</include>
</includes>
<fileMode>0755</fileMode>
<directoryMode>0755</directoryMode>
</fileSet>
<fileSet>
<directory>target/hbase</directory>
<outputDirectory>hbase</outputDirectory>
<fileMode>0755</fileMode>
......@@ -156,6 +168,12 @@
<directory>../addons/storm-bridge/target/models</directory>
<outputDirectory>models</outputDirectory>
</fileSet>
<!-- for kafka topic setup -->
<fileSet>
<directory>../notification/target/dependency/hook</directory>
<outputDirectory>hook</outputDirectory>
</fileSet>
</fileSets>
<files>
......
......@@ -154,6 +154,23 @@ Note that Kafka group ids are specified for a specific topic. The Kafka group i
atlas.kafka.entities.group.id=<consumer id>
</verbatim>
These configuration parameters are useful for setting up Kafka topics via the Atlas-provided scripts, described on the
[[InstallationSteps][Installation Steps]] page.
<verbatim>
# Whether to create the topics automatically, default is true.
atlas.notification.create.topics=true
# Comma separated list of topics to be created, default is "ATLAS_HOOK,ATLAS_ENTITIES"
atlas.notification.topics=ATLAS_HOOK,ATLAS_ENTITIES
# Number of replicas for the Atlas topics, default is 1. Increase for higher resilience to Kafka failures.
atlas.notification.replicas=1
# Enable the below two properties if Kafka is running in Kerberized mode.
# Set this to the service principal representing the Kafka service
atlas.notification.kafka.service.principal=kafka/_HOST@EXAMPLE.COM
# Set this to the location of the keytab file for Kafka
atlas.notification.kafka.keytab.location=/etc/security/keytabs/kafka.service.keytab
</verbatim>
---++ Client Configs
<verbatim>
......
......@@ -262,6 +262,16 @@ Pre-requisites for running Solr in cloud mode
* !SolrCloud has support for replication and sharding. It is highly recommended to use !SolrCloud with at least two Solr nodes running on different servers with replication enabled.
If using !SolrCloud, then you also need !ZooKeeper installed and configured with 3 or 5 !ZooKeeper nodes
*Configuring Kafka Topics*
Atlas uses Kafka to ingest metadata from other components at runtime. This is described in more detail on the
[[Architecture][Architecture page]]. Depending on how Kafka is configured, you might sometimes need to set up the
topics explicitly before using Atlas. To do so, Atlas provides the script =bin/atlas_kafka_setup.py=, which can be run
from the Atlas server. In some environments, the hooks might start being used before the Atlas server itself is set
up. In such cases, the topics can be created from the hosts where the hooks are installed, using a similar script
=hook-bin/atlas_kafka_setup_hook.py=. Both scripts use the configuration in =atlas-application.properties= for setting
up the topics. Please refer to the [[Configuration][Configuration page]] for these details.
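
For example, assuming a standard Atlas installation layout (adjust paths to your deployment), the scripts can be
invoked as follows; both read the topic list from =atlas-application.properties= and take no arguments:

<verbatim>
# On the Atlas server host:
bin/atlas_kafka_setup.py

# On a host where only the hooks are installed:
hook-bin/atlas_kafka_setup_hook.py
</verbatim>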
---++++ Setting up Atlas
There are a few steps that set up dependencies of Atlas. One such example is setting up the Titan schema
......
......@@ -90,5 +90,139 @@
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
</dependency>
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>${zkclient.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<id>copy-hook-dependencies</id>
<phase>package</phase>
<goals>
<goal>copy</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/dependency/hook/kafka-topic-setup</outputDirectory>
<overWriteReleases>false</overWriteReleases>
<overWriteSnapshots>false</overWriteSnapshots>
<overWriteIfNewer>true</overWriteIfNewer>
<artifactItems>
<artifactItem>
<groupId>${project.groupId}</groupId>
<artifactId>${project.artifactId}</artifactId>
<version>${project.version}</version>
</artifactItem>
<artifactItem>
<groupId>${project.groupId}</groupId>
<artifactId>atlas-common</artifactId>
<version>${project.version}</version>
</artifactItem>
<artifactItem>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
<version>${commons-logging.version}</version>
</artifactItem>
<artifactItem>
<groupId>commons-configuration</groupId>
<artifactId>commons-configuration</artifactId>
<version>${commons-conf.version}</version>
</artifactItem>
<artifactItem>
<groupId>commons-collections</groupId>
<artifactId>commons-collections</artifactId>
<version>${commons-collections.version}</version>
</artifactItem>
<artifactItem>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
<version>${commons-lang.version}</version>
</artifactItem>
<artifactItem>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</artifactItem>
<artifactItem>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
</artifactItem>
<artifactItem>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-auth</artifactId>
<version>${hadoop.version}</version>
</artifactItem>
<artifactItem>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</artifactItem>
<artifactItem>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
</artifactItem>
<artifactItem>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</artifactItem>
<artifactItem>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_${scala.binary.version}</artifactId>
<version>${kafka.version}</version>
</artifactItem>
<artifactItem>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</artifactItem>
<artifactItem>
<groupId>org.scala-lang</groupId>
<artifactId>scala-compiler</artifactId>
<version>${scala.version}</version>
</artifactItem>
<artifactItem>
<groupId>org.scala-lang</groupId>
<artifactId>scala-reflect</artifactId>
<version>${scala.version}</version>
</artifactItem>
<artifactItem>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</artifactItem>
<artifactItem>
<groupId>org.scala-lang</groupId>
<artifactId>scalap</artifactId>
<version>${scala.version}</version>
</artifactItem>
<artifactItem>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>${zkclient.version}</version>
</artifactItem>
<artifactItem>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.6</version>
</artifactItem>
</artifactItems>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.hook;
import com.google.common.annotations.VisibleForTesting;
import kafka.admin.AdminUtils;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasException;
import org.apache.atlas.utils.AuthenticationUtil;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;
import java.io.IOException;
import java.util.Properties;
/**
* A class to create Kafka topics used by Atlas components.
*
* Use this class to create a Kafka topic with specific configuration like number of partitions, replicas, etc.
*/
public class AtlasTopicCreator {
private static final Logger LOG = LoggerFactory.getLogger(AtlasTopicCreator.class);
public static final String ATLAS_NOTIFICATION_CREATE_TOPICS_KEY = "atlas.notification.create.topics";
/**
* Create the given Atlas topics.
*
* A topic is created only if the following conditions are met:
* {@link #ATLAS_NOTIFICATION_CREATE_TOPICS_KEY} is set to true.
* The topic does not already exist.
* Note that despite these checks, multiple topic creation calls could still happen in parallel, because hooks
* run in a distributed fashion. Exceptions are caught and logged by this method to prevent the startup of
* the hooks from failing.
* @param atlasProperties {@link Configuration} containing properties to be used for creating topics.
* @param topicNames list of topics to create
*/
public void createAtlasTopic(Configuration atlasProperties, String... topicNames) {
if (atlasProperties.getBoolean(ATLAS_NOTIFICATION_CREATE_TOPICS_KEY, true)) {
if (!handleSecurity(atlasProperties)) {
return;
}
ZkUtils zkUtils = createZkUtils(atlasProperties);
for (String topicName : topicNames) {
try {
LOG.warn("Attempting to create topic {}", topicName);
if (!ifTopicExists(topicName, zkUtils)) {
createTopic(atlasProperties, topicName, zkUtils);
} else {
LOG.warn("Ignoring call to create topic {}, as it already exists.", topicName);
}
} catch (Throwable t) {
LOG.error("Failed while creating topic {}", topicName, t);
}
}
zkUtils.close();
} else {
LOG.info("Not creating topics {} as {} is false", StringUtils.join(topicNames, ","),
ATLAS_NOTIFICATION_CREATE_TOPICS_KEY);
}
}
@VisibleForTesting
protected boolean handleSecurity(Configuration atlasProperties) {
if (AuthenticationUtil.isKerberosAuthenticationEnabled(atlasProperties)) {
String kafkaPrincipal = atlasProperties.getString("atlas.notification.kafka.service.principal");
String kafkaKeyTab = atlasProperties.getString("atlas.notification.kafka.keytab.location");
org.apache.hadoop.conf.Configuration hadoopConf = new org.apache.hadoop.conf.Configuration();
SecurityUtil.setAuthenticationMethod(UserGroupInformation.AuthenticationMethod.KERBEROS, hadoopConf);
try {
String serverPrincipal = SecurityUtil.getServerPrincipal(kafkaPrincipal, (String) null);
UserGroupInformation.setConfiguration(hadoopConf);
UserGroupInformation.loginUserFromKeytab(serverPrincipal, kafkaKeyTab);
} catch (IOException e) {
LOG.warn("Could not login as {} from keytab file {}", kafkaPrincipal, kafkaKeyTab, e);
return false;
}
}
return true;
}
@VisibleForTesting
protected boolean ifTopicExists(String topicName, ZkUtils zkUtils) {
return AdminUtils.topicExists(zkUtils, topicName);
}
@VisibleForTesting
protected void createTopic(Configuration atlasProperties, String topicName, ZkUtils zkUtils) {
int numPartitions = atlasProperties.getInt("atlas.notification.hook.numthreads", 1);
int numReplicas = atlasProperties.getInt("atlas.notification.replicas", 1);
AdminUtils.createTopic(zkUtils, topicName, numPartitions, numReplicas,
new Properties());
LOG.warn("Created topic {} with partitions {} and replicas {}", topicName, numPartitions, numReplicas);
}
@VisibleForTesting
protected ZkUtils createZkUtils(Configuration atlasProperties) {
String zkConnect = atlasProperties.getString("atlas.kafka.zookeeper.connect");
int sessionTimeout = atlasProperties.getInt("atlas.kafka.zookeeper.session.timeout.ms", 400);
int connectionTimeout = atlasProperties.getInt("atlas.kafka.zookeeper.connection.timeout.ms", 200);
Tuple2<ZkClient, ZkConnection> zkClientAndConnection = ZkUtils.createZkClientAndConnection(
zkConnect, sessionTimeout, connectionTimeout);
return new ZkUtils(zkClientAndConnection._1(), zkClientAndConnection._2(), false);
}
public static void main(String[] args) throws AtlasException {
Configuration configuration = ApplicationProperties.get();
AtlasTopicCreator atlasTopicCreator = new AtlasTopicCreator();
atlasTopicCreator.createAtlasTopic(configuration, args);
}
}
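
As a usage illustration, here is a minimal sketch of driving the class programmatically. It mirrors the main method
above; the class name TopicSetupExample and the hard-coded topic names are illustrative assumptions, not part of the
commit:

import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.hook.AtlasTopicCreator;
import org.apache.commons.configuration.Configuration;

public class TopicSetupExample {
    public static void main(String[] args) throws Exception {
        // Load atlas-application.properties from the configured location.
        Configuration conf = ApplicationProperties.get();
        // Create the default Atlas notification topics; createAtlasTopic skips
        // topics that already exist and logs (rather than throws) on failure.
        new AtlasTopicCreator().createAtlasTopic(conf, "ATLAS_HOOK", "ATLAS_ENTITIES");
    }
}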
......@@ -392,9 +392,11 @@
<commons-conf.version>1.10</commons-conf.version>
<commons-collections.version>3.2.2</commons-collections.version>
<commons-logging.version>1.1.3</commons-logging.version>
<commons-lang.version>2.6</commons-lang.version>
<javax-inject.version>1</javax-inject.version>
<jettison.version>1.3.7</jettison.version>
<paranamer.version>2.3</paranamer.version>
<zkclient.version>0.8</zkclient.version>
<PermGen>64m</PermGen>
<MaxPermGen>512m</MaxPermGen>
......@@ -1266,6 +1268,12 @@
<version>3.4</version>
</dependency>
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
<version>${commons-lang.version}</version>
</dependency>
<!-- kafka -->
<dependency>
<groupId>org.apache.kafka</groupId>
......
......@@ -22,6 +22,7 @@ ATLAS-409 Atlas will not import avro tables with schema read from a file (dosset
ATLAS-379 Create sqoop and falcon metadata addons (venkatnrangan,bvellanki,sowmyaramesh via shwethags)
ALL CHANGES:
ATLAS-515 Ability to initialize Kafka topics with more than 1 replica (yhemanth)
ATLAS-891 UI changes to implement Update term (Kalyanikashikar via yhemanth)
ATLAS-794 Business Catalog Update (jspeidel via yhemanth)
ATLAS-837 Enhance Sqoop addon to handle export operation (venkatnrangan via shwethags)
......