Commit 801aea9a by rmani, committed by Madhan Neethiraj

ATLAS-2586: added import-kafka bridge to import Kafka topics into Atlas

parent f11c1801
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>apache-atlas</artifactId>
<groupId>org.apache.atlas</groupId>
<version>1.0.0-SNAPSHOT</version>
<relativePath>../../</relativePath>
</parent>
<artifactId>kafka-bridge</artifactId>
<description>Apache Atlas Kafka Bridge Module</description>
<name>Apache Atlas Kafka Bridge</name>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.atlas</groupId>
<artifactId>atlas-client-v1</artifactId>
</dependency>
<dependency>
<groupId>org.apache.atlas</groupId>
<artifactId>atlas-client-v2</artifactId>
</dependency>
<!-- to bring up atlas server for integration tests -->
<dependency>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-bundle</artifactId>
<version>1.19</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.atlas</groupId>
<artifactId>atlas-webapp</artifactId>
<type>war</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-annotations</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-minicluster</artifactId>
<version>${hadoop.version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
<version>4.4.6</version>
</dependency>
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>${zkclient.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_${kafka.scala.binary.version}</artifactId>
<version>${kafka.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-webapp</artifactId>
<version>${jetty.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>12.0.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
<scope>compile</scope>
<exclusions>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-auth</artifactId>
<version>${hadoop.version}</version>
<scope>compile</scope>
</dependency>
</dependencies>
<profiles>
<profile>
<id>dist</id>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<id>copy-hook</id>
<phase>package</phase>
<goals>
<goal>copy</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/dependency/hook/kafka/atlas-kafka-plugin-impl</outputDirectory>
<overWriteReleases>false</overWriteReleases>
<overWriteSnapshots>false</overWriteSnapshots>
<overWriteIfNewer>true</overWriteIfNewer>
<artifactItems>
<artifactItem>
<groupId>${project.groupId}</groupId>
<artifactId>${project.artifactId}</artifactId>
<version>${project.version}</version>
</artifactItem>
<artifactItem>
<groupId>${project.groupId}</groupId>
<artifactId>atlas-client-v1</artifactId>
<version>${project.version}</version>
</artifactItem>
<artifactItem>
<groupId>${project.groupId}</groupId>
<artifactId>atlas-client-common</artifactId>
<version>${project.version}</version>
</artifactItem>
<artifactItem>
<groupId>${project.groupId}</groupId>
<artifactId>atlas-client-v2</artifactId>
<version>${project.version}</version>
</artifactItem>
<artifactItem>
<groupId>${project.groupId}</groupId>
<artifactId>atlas-intg</artifactId>
<version>${project.version}</version>
</artifactItem>
<artifactItem>
<groupId>${project.groupId}</groupId>
<artifactId>atlas-common</artifactId>
<version>${project.version}</version>
</artifactItem>
<artifactItem>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_${kafka.scala.binary.version}</artifactId>
<version>${kafka.version}</version>
</artifactItem>
<artifactItem>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</artifactItem>
<artifactItem>
<groupId>com.sun.jersey.contribs</groupId>
<artifactId>jersey-multipart</artifactId>
<version>${jersey.version}</version>
</artifactItem>
<artifactItem>
<groupId>commons-configuration</groupId>
<artifactId>commons-configuration</artifactId>
<version>${commons-conf.version}</version>
</artifactItem>
</artifactItems>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
</profiles>
<build>
<plugins>
<plugin>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-maven-plugin</artifactId>
<configuration>
<skip>${skipTests}</skip>
<!-- only skip integration tests -->
<httpConnector>
<port>31000</port>
<idleTimeout>60000</idleTimeout>
</httpConnector>
<war>../../webapp/target/atlas-webapp-${project.version}.war</war>
<daemon>true</daemon>
<webApp>
<contextPath>/</contextPath>
<descriptor>${project.basedir}/../../webapp/src/test/webapp/WEB-INF/web.xml</descriptor>
<extraClasspath>${project.basedir}/../../webapp/target/test-classes/</extraClasspath>
</webApp>
<useTestScope>true</useTestScope>
<systemProperties>
<force>true</force>
<systemProperty>
<name>atlas.home</name>
<value>${project.build.directory}</value>
</systemProperty>
<systemProperty>
<key>atlas.conf</key>
<value>${project.build.directory}/test-classes</value>
</systemProperty>
<systemProperty>
<name>atlas.data</name>
<value>${project.build.directory}/data</value>
</systemProperty>
<systemProperty>
<name>atlas.log.dir</name>
<value>${project.build.directory}/logs</value>
</systemProperty>
<systemProperty>
<name>atlas.log.file</name>
<value>application.log</value>
</systemProperty>
<systemProperty>
<name>log4j.configuration</name>
<value>file:///${project.build.directory}/test-classes/atlas-log4j.xml</value>
</systemProperty>
<systemProperty>
<name>atlas.graphdb.backend</name>
<value>${graphdb.backend.impl}</value>
</systemProperty>
<systemProperty>
<key>embedded.solr.directory</key>
<value>${project.build.directory}</value>
</systemProperty>
</systemProperties>
<stopKey>atlas-stop</stopKey>
<stopPort>31001</stopPort>
<stopWait>${jetty-maven-plugin.stopWait}</stopWait>
</configuration>
<executions>
<execution>
<id>start-jetty</id>
<phase>pre-integration-test</phase>
<goals>
<goal>deploy-war</goal>
</goals>
<configuration>
<daemon>true</daemon>
</configuration>
</execution>
<execution>
<id>stop-jetty</id>
<phase>post-integration-test</phase>
<goals>
<goal>stop</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-site-plugin</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.maven.doxia</groupId>
<artifactId>doxia-module-twiki</artifactId>
<version>${doxia.version}</version>
</dependency>
<dependency>
<groupId>org.apache.maven.doxia</groupId>
<artifactId>doxia-core</artifactId>
<version>${doxia.version}</version>
</dependency>
</dependencies>
<executions>
<execution>
<goals>
<goal>site</goal>
</goals>
<phase>prepare-package</phase>
</execution>
</executions>
<configuration>
<generateProjectInfo>false</generateProjectInfo>
<generateReports>false</generateReports>
</configuration>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>1.2.1</version>
<inherited>false</inherited>
<executions>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-resources-plugin</artifactId>
<executions>
<execution>
<id>copy-resources</id>
<phase>validate</phase>
<goals>
<goal>copy-resources</goal>
</goals>
<configuration>
<outputDirectory>${basedir}/target/models</outputDirectory>
<resources>
<resource>
<directory>${basedir}/../models</directory>
<filtering>true</filtering>
</resource>
</resources>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-resources-plugin</artifactId>
<executions>
<execution>
<id>copy-solr-resources</id>
<phase>validate</phase>
<goals>
<goal>copy-resources</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/solr</outputDirectory>
<resources>
<resource>
<directory>${basedir}/../../test-tools/src/main/resources/solr</directory>
</resource>
</resources>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
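With the dist profile above, the maven-dependency-plugin copies this module and its client dependencies into target/dependency/hook/kafka/atlas-kafka-plugin-impl, which the distro assembly later in this commit repackages. For orientation, typical build invocations might look like the following sketch (the flags are suggestions, not part of the commit):

# build only this module and the modules it depends on, skipping tests
mvn -pl addons/kafka-bridge -am -DskipTests clean package

# full build with the dist profile so the copy-hook execution runs
mvn -Pdist -DskipTests clean install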
#!/bin/bash
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License. See accompanying LICENSE file.
#
# resolve links - $0 may be a softlink
PRG="${0}"
[[ `uname -s` == *"CYGWIN"* ]] && CYGWIN=true
while [ -h "${PRG}" ]; do
ls=`ls -ld "${PRG}"`
link=`expr "$ls" : '.*-> \(.*\)$'`
if expr "$link" : '/.*' > /dev/null; then
PRG="$link"
else
PRG=`dirname "${PRG}"`/"$link"
fi
done
echo ">>>>> $PRG"
BASEDIR=`dirname ${PRG}`
BASEDIR=`cd ${BASEDIR}/..;pwd`
echo ">>>>> $BASEDIR"
allargs=$@
if test -z "${JAVA_HOME}"
then
JAVA_BIN=`which java`
JAR_BIN=`which jar`
else
JAVA_BIN="${JAVA_HOME}/bin/java"
JAR_BIN="${JAVA_HOME}/bin/jar"
fi
export JAVA_BIN
if [ ! -e "${JAVA_BIN}" ] || [ ! -e "${JAR_BIN}" ]; then
echo "$JAVA_BIN and/or $JAR_BIN not found on the system. Please make sure java and jar commands are available."
exit 1
fi
# Construct Atlas classpath using jars from hook/kafka/atlas-kafka-plugin-impl/ directory.
for i in "${BASEDIR}/hook/kafka/atlas-kafka-plugin-impl/"*.jar; do
ATLASCPPATH="${ATLASCPPATH}:$i"
done
ATLAS_CONF_DIR=/etc/atlas/conf
ATLASCPPATH=${ATLASCPPATH}:${ATLAS_CONF_DIR}
# log dir for applications
ATLAS_LOG_DIR="${ATLAS_LOG_DIR:-$BASEDIR/logs}"
export ATLAS_LOG_DIR
LOGFILE="$ATLAS_LOG_DIR/import-kafka.log"
TIME=`date +%Y%m%d%H%M%S`
#Add Kafka conf in classpath
if [ ! -z "$KAFKA_CONF_DIR" ]; then
KAFKA_CONF=$KAFKA_CONF_DIR
elif [ ! -z "$KAFKA_HOME" ]; then
KAFKA_CONF="$KAFKA_HOME/conf"
elif [ -e /etc/kafka/conf ]; then
KAFKA_CONF="/etc/kafka/conf"
else
echo "Could not find a valid KAFKA configuration"
exit 1
fi
echo Using Kafka configuration directory "[$KAFKA_CONF]"
if [ -f "${KAFKA_CONF}/kafka-env.sh" ]; then
. "${KAFKA_CONF}/kafka-env.sh"
fi
if [ -z "$KAFKA_HOME" ]; then
if [ -d "${BASEDIR}/../kafka" ]; then
KAFKA_HOME=${BASEDIR}/../kafka
else
echo "Please set KAFKA_HOME to the root of Kafka installation"
exit 1
fi
fi
KAFKA_CP="${KAFKA_CONF}"
for i in "${KAFKA_HOME}/libs/"*.jar; do
KAFKA_CP="${KAFKA_CP}:$i"
done
#Add hadoop conf in classpath
if [ ! -z "$HADOOP_CLASSPATH" ]; then
HADOOP_CP=$HADOOP_CLASSPATH
elif [ ! -z "$HADOOP_HOME" ]; then
HADOOP_CP=`$HADOOP_HOME/bin/hadoop classpath`
elif [ $(command -v hadoop) ]; then
HADOOP_CP=`hadoop classpath`
#echo $HADOOP_CP
else
echo "Environment variable HADOOP_CLASSPATH or HADOOP_HOME need to be set"
exit 1
fi
CP="${KAFKA_CP}:${ATLASCPPATH}:${HADOOP_CP}"
# If running in cygwin, convert pathnames and classpath to Windows format.
if [ "${CYGWIN}" == "true" ]
then
ATLAS_LOG_DIR=`cygpath -w ${ATLAS_LOG_DIR}`
LOGFILE=`cygpath -w ${LOGFILE}`
KAFKA_CP=`cygpath -w ${KAFKA_CP}`
HADOOP_CP=`cygpath -w ${HADOOP_CP}`
CP=`cygpath -w -p ${CP}`
fi
JAVA_PROPERTIES="$ATLAS_OPTS -Datlas.log.dir=$ATLAS_LOG_DIR -Datlas.log.file=import-kafka.log
-Dlog4j.configuration=atlas-kafka-import-log4j.xml"
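# Skip the first argument, then forward any leading -D arguments to the JVM as
# system properties (all original arguments still reach KafkaBridge via $allargs).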
shift
while [[ ${1} =~ ^\-D ]]; do
JAVA_PROPERTIES="${JAVA_PROPERTIES} ${1}"
shift
done
echo "Log file for import is $LOGFILE"
"${JAVA_BIN}" ${JAVA_PROPERTIES} -cp "${CP}" org.apache.atlas.kafka.bridge.KafkaBridge $allargs
RETVAL=$?
[ $RETVAL -eq 0 ] && echo Kafka Data Model imported successfully!!!
[ $RETVAL -ne 0 ] && echo Failed to import Kafka Data Model!!!
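The options accepted here mirror the KafkaBridge CLI defined below: -t/--topic selects topics by name prefix, -f/--filename reads one topic name per line, and no argument imports every topic found in ZooKeeper. Illustrative invocations (topic names and paths are assumptions, not part of the commit):

# import every topic whose name starts with "orders"
hook-bin/import-kafka.sh -t orders

# import the topics listed, one per line, in a file
hook-bin/import-kafka.sh -f /tmp/topics.txt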
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.kafka.bridge;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.apache.kafka.common.security.JaasUtils;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClientV2;
import org.apache.atlas.kafka.model.KafkaDataTypes;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.utils.AuthenticationUtil;
import org.apache.commons.cli.BasicParser;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
public class KafkaBridge {
private static final Logger LOG = LoggerFactory.getLogger(KafkaBridge.class);
private static final int EXIT_CODE_SUCCESS = 0;
private static final int EXIT_CODE_FAILED = 1;
private static final String ATLAS_ENDPOINT = "atlas.rest.address";
private static final String DEFAULT_ATLAS_URL = "http://localhost:21000/";
private static final String KAFKA_CLUSTER_NAME = "atlas.cluster.name";
private static final String DEFAULT_CLUSTER_NAME = "primary";
private static final String ATTRIBUTE_QUALIFIED_NAME = "qualifiedName";
private static final String DESCRIPTION_ATTR = "description";
private static final String PARTITION_COUNT = "partitionCount";
private static final String NAME = "name";
private static final String URI = "uri";
private static final String CLUSTERNAME = "clusterName";
private static final String TOPIC = "topic";
private static final String FORMAT_KAFKA_TOPIC_QUALIFIED_NAME = "%s@%s";
private static final String ZOOKEEPER_CONNECT = "atlas.kafka.zookeeper.connect";
private static final String ZOOKEEPER_CONNECTION_TIMEOUT_MS = "atlas.kafka.zookeeper.connection.timeout.ms";
private static final String ZOOKEEPER_SESSION_TIMEOUT_MS = "atlas.kafka.zookeeper.session.timeout.ms";
private static final String DEFAULT_ZOOKEEPER_CONNECT = "localhost:2181";
private static final int DEFAULT_ZOOKEEPER_SESSION_TIMEOUT_MS = 10 * 1000;
private static final int DEFAULT_ZOOKEEPER_CONNECTION_TIMEOUT_MS = 10 * 1000;
private final List<String> availableTopics;
private final String clusterName;
private final AtlasClientV2 atlasClientV2;
private final ZkUtils zkUtils;
public static void main(String[] args) {
int exitCode = EXIT_CODE_FAILED;
try {
Options options = new Options();
options.addOption("t","topic", true, "topic");
options.addOption("f", "filename", true, "filename");
CommandLineParser parser = new BasicParser();
CommandLine cmd = parser.parse(options, args);
String topicToImport = cmd.getOptionValue("t");
String fileToImport = cmd.getOptionValue("f");
Configuration atlasConf = ApplicationProperties.get();
String[] urls = atlasConf.getStringArray(ATLAS_ENDPOINT);
if (urls == null || urls.length == 0) {
urls = new String[] { DEFAULT_ATLAS_URL };
}
final AtlasClientV2 atlasClientV2;
if (!AuthenticationUtil.isKerberosAuthenticationEnabled()) {
String[] basicAuthUsernamePassword = AuthenticationUtil.getBasicAuthenticationInput();
atlasClientV2 = new AtlasClientV2(urls, basicAuthUsernamePassword);
} else {
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
atlasClientV2 = new AtlasClientV2(ugi, ugi.getShortUserName(), urls);
}
KafkaBridge importer = new KafkaBridge(atlasConf, atlasClientV2);
if (StringUtils.isNotEmpty(fileToImport)) {
File f = new File(fileToImport);
if (f.exists() && f.canRead()) {
try (BufferedReader br = new BufferedReader(new FileReader(f))) {
String line;
while ((line = br.readLine()) != null) {
topicToImport = line.trim();
importer.importTopic(topicToImport);
}
}
exitCode = EXIT_CODE_SUCCESS;
} else {
LOG.error("Failed to read file: {}", fileToImport);
}
} else {
importer.importTopic(topicToImport);
exitCode = EXIT_CODE_SUCCESS;
}
} catch (ParseException e) {
LOG.error("Failed to parse arguments: {}", e.getMessage());
printUsage();
} catch(Exception e) {
System.out.println("ImportKafkaEntities failed. Please check the log file for the detailed error message");
e.printStackTrace();
LOG.error("ImportKafkaEntities failed", e);
}
System.exit(exitCode);
}
public KafkaBridge(Configuration atlasConf, AtlasClientV2 atlasClientV2) throws Exception {
String zookeeperConnect = getZKConnection(atlasConf);
int sessionTimeOutMs = atlasConf.getInt(ZOOKEEPER_SESSION_TIMEOUT_MS, DEFAULT_ZOOKEEPER_SESSION_TIMEOUT_MS);
int connectionTimeOutMs = atlasConf.getInt(ZOOKEEPER_CONNECTION_TIMEOUT_MS, DEFAULT_ZOOKEEPER_CONNECTION_TIMEOUT_MS);
ZkClient zkClient = new ZkClient(zookeeperConnect, sessionTimeOutMs, connectionTimeOutMs, ZKStringSerializer$.MODULE$);
this.atlasClientV2 = atlasClientV2;
this.clusterName = atlasConf.getString(KAFKA_CLUSTER_NAME, DEFAULT_CLUSTER_NAME);
this.zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperConnect), JaasUtils.isZkSecurityEnabled());
this.availableTopics = scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllTopics());
}
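// Imports topics whose names start with topicToImport (prefix match), or every
// topic found in ZooKeeper when no name is given.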
public void importTopic(String topicToImport) throws Exception {
List<String> topics = availableTopics;
if (StringUtils.isNotEmpty(topicToImport)) {
List<String> topicsSubset = new ArrayList<>();
for (String topic : topics) {
if (topic.startsWith(topicToImport)) {
topicsSubset.add(topic);
}
}
topics = topicsSubset;
}
if (CollectionUtils.isNotEmpty(topics)) {
for(String topic : topics) {
createOrUpdateTopic(topic);
}
}
}
protected AtlasEntityWithExtInfo createOrUpdateTopic(String topic) throws Exception {
String topicQualifiedName = getTopicQualifiedName(clusterName, topic);
AtlasEntityWithExtInfo topicEntity = findTopicEntityInAtlas(topicQualifiedName);
if (topicEntity == null) {
System.out.println("Adding Kafka topic " + topic);
LOG.info("Importing Kafka topic: {}", topicQualifiedName);
AtlasEntity entity = getTopicEntity(topic, null);
topicEntity = createEntityInAtlas(new AtlasEntityWithExtInfo(entity));
} else {
System.out.println("Updating Kafka topic " + topic);
LOG.info("Kafka topic {} already exists in Atlas. Updating it..", topicQualifiedName);
AtlasEntity entity = getTopicEntity(topic, topicEntity.getEntity());
topicEntity.setEntity(entity);
topicEntity = updateEntityInAtlas(topicEntity);
}
return topicEntity;
}
protected AtlasEntity getTopicEntity(String topic, AtlasEntity topicEntity) {
final AtlasEntity ret;
if (topicEntity == null) {
ret = new AtlasEntity(KafkaDataTypes.KAFKA_TOPIC.getName());
} else {
ret = topicEntity;
}
String qualifiedName = getTopicQualifiedName(clusterName, topic);
ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, qualifiedName);
ret.setAttribute(CLUSTERNAME, clusterName);
ret.setAttribute(TOPIC, topic);
ret.setAttribute(NAME, topic);
ret.setAttribute(DESCRIPTION_ATTR, topic);
ret.setAttribute(URI, topic);
ret.setAttribute(PARTITION_COUNT, (Integer) zkUtils.getTopicPartitionCount(topic).get());
return ret;
}
protected static String getTopicQualifiedName(String clusterName, String topic) {
return String.format(FORMAT_KAFKA_TOPIC_QUALIFIED_NAME, topic.toLowerCase(), clusterName);
}
private AtlasEntityWithExtInfo findTopicEntityInAtlas(String topicQualifiedName) {
AtlasEntityWithExtInfo ret = null;
try {
ret = findEntityInAtlas(KafkaDataTypes.KAFKA_TOPIC.getName(), topicQualifiedName);
clearRelationshipAttributes(ret);
} catch (Exception e) {
ret = null; // entity doesn't exist in Atlas
}
return ret;
}
private AtlasEntityWithExtInfo findEntityInAtlas(String typeName, String qualifiedName) throws Exception {
Map<String, String> attributes = Collections.singletonMap(ATTRIBUTE_QUALIFIED_NAME, qualifiedName);
return atlasClientV2.getEntityByAttribute(typeName, attributes);
}
private AtlasEntityWithExtInfo createEntityInAtlas(AtlasEntityWithExtInfo entity) throws Exception {
AtlasEntityWithExtInfo ret = null;
EntityMutationResponse response = atlasClientV2.createEntity(entity);
List<AtlasEntityHeader> entities = response.getCreatedEntities();
if (CollectionUtils.isNotEmpty(entities)) {
AtlasEntityWithExtInfo getByGuidResponse = atlasClientV2.getEntityByGuid(entities.get(0).getGuid());
ret = getByGuidResponse;
LOG.info("Created {} entity: name={}, guid={}", ret.getEntity().getTypeName(), ret.getEntity().getAttribute(ATTRIBUTE_QUALIFIED_NAME), ret.getEntity().getGuid());
}
return ret;
}
private AtlasEntityWithExtInfo updateEntityInAtlas(AtlasEntityWithExtInfo entity) throws Exception {
AtlasEntityWithExtInfo ret = null;
EntityMutationResponse response = atlasClientV2.updateEntity(entity);
if (response != null) {
List<AtlasEntityHeader> entities = response.getUpdatedEntities();
if (CollectionUtils.isNotEmpty(entities)) {
AtlasEntityWithExtInfo getByGuidResponse = atlasClientV2.getEntityByGuid(entities.get(0).getGuid());
ret = getByGuidResponse;
LOG.info("Updated {} entity: name={}, guid={} ", ret.getEntity().getTypeName(), ret.getEntity().getAttribute(ATTRIBUTE_QUALIFIED_NAME), ret.getEntity().getGuid());
} else {
LOG.info("Entity: name={} ", entity.toString() + " not updated as it is unchanged from what is in Atlas" );
ret = entity;
}
} else {
LOG.info("Entity: name={} ", entity.toString() + " not updated as it is unchanged from what is in Atlas" );
ret = entity;
}
return ret;
}
private static void printUsage() {
System.out.println("Usage 1: import-kafka.sh");
System.out.println("Usage 2: import-kafka.sh [-t <topic prefix> OR --topic <topic prefix>]");
System.out.println("Usage 3: import-kafka.sh [-f <filename>]");
System.out.println("   File format:");
System.out.println("   topic1");
System.out.println("   topic2");
System.out.println("   topic3");
}
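// Relationship attributes of entities fetched from Atlas are managed through the
// relationship API; clear them before re-submitting an entity in a create/update call.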
private void clearRelationshipAttributes(AtlasEntityWithExtInfo entity) {
if (entity != null) {
clearRelationshipAttributes(entity.getEntity());
if (entity.getReferredEntities() != null) {
clearRelationshipAttributes(entity.getReferredEntities().values());
}
}
}
private void clearRelationshipAttributes(Collection<AtlasEntity> entities) {
if (entities != null) {
for (AtlasEntity entity : entities) {
clearRelationshipAttributes(entity);
}
}
}
private void clearRelationshipAttributes(AtlasEntity entity) {
if (entity != null && entity.getRelationshipAttributes() != null) {
entity.getRelationshipAttributes().clear();
}
}
private String getStringValue(String[] vals) {
String ret = null;
if (vals != null) {
for (String val : vals) {
ret = (ret == null) ? val : ret + "," + val;
}
}
return ret;
}
private String getZKConnection(Configuration atlasConf) {
String ret = getStringValue(atlasConf.getStringArray(ZOOKEEPER_CONNECT));
if (StringUtils.isEmpty(ret)) {
ret = DEFAULT_ZOOKEEPER_CONNECT;
}
return ret;
}
}
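A minimal TestNG-style check of the qualified-name scheme above (a sketch, not part of this commit; the class name is hypothetical, and it sits in the same package because getTopicQualifiedName is protected):

package org.apache.atlas.kafka.bridge;

import org.testng.Assert;
import org.testng.annotations.Test;

public class KafkaBridgeQualifiedNameSketch {
    @Test
    public void topicIsLowerCasedAndSuffixedWithClusterName() {
        // FORMAT_KAFKA_TOPIC_QUALIFIED_NAME is "%s@%s": topic first, cluster second
        Assert.assertEquals(KafkaBridge.getTopicQualifiedName("primary", "CustomerOrders"),
                "customerorders@primary");
    }
}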
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.kafka.model;
/**
* Kafka data types for model and bridge.
*/
public enum KafkaDataTypes {
// Classes
KAFKA_TOPIC;
public String getName() {
return name().toLowerCase();
}
}
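Note that the lower-cased enum name doubles as the Atlas type name:

// KafkaDataTypes.KAFKA_TOPIC.getName() returns "kafka_topic", the same
// typeName targeted by the typedef patch later in this commit.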
<?xml version="1.0" encoding="UTF-8" ?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
<appender name="console" class="org.apache.log4j.ConsoleAppender">
<param name="Target" value="System.out"/>
<layout class="org.apache.log4j.PatternLayout">
<param name="ConversionPattern" value="%d %-5p - [%t:%x] ~ %m (%C{1}:%L)%n"/>
</layout>
</appender>
<appender name="FILE" class="org.apache.log4j.DailyRollingFileAppender">
<param name="File" value="${atlas.log.dir}/${atlas.log.file}"/>
<param name="Append" value="true"/>
<layout class="org.apache.log4j.PatternLayout">
<param name="ConversionPattern" value="%d %-5p - [%t:%x] ~ %m (%C{1}:%L)%n"/>
</layout>
</appender>
<logger name="org.apache.atlas" additivity="false">
<level value="info"/>
<appender-ref ref="FILE"/>
</logger>
<!-- to avoid logs - The configuration log.flush.interval.messages = 1 was supplied but isn't a known config -->
<logger name="org.apache.kafka.common.config.AbstractConfig" additivity="false">
<level value="error"/>
<appender-ref ref="FILE"/>
</logger>
<root>
<priority value="info"/>
<appender-ref ref="FILE"/>
</root>
</log4j:configuration>
{
"patches": [
{
"action": "ADD_ATTRIBUTE",
"typeName": "kafka_topic",
"applyToVersion": "1.0",
"updateToVersion": "1.1",
"params": null,
"attributeDefs": [
{
"name": "partitionCount",
"typeName": "int",
"cardinality": "SINGLE",
"isIndexable": true,
"isOptional": true,
"isUnique": false
}
]
}
]
}
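Once Atlas applies this patch at startup, the upgraded typedef can be verified against a running server; the call below is a sketch (URL and credentials are assumptions, not part of the commit):

# fetch the kafka_topic typedef and confirm partitionCount is present
curl -u admin:admin http://localhost:21000/api/atlas/v2/types/typedef/name/kafka_topic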
@@ -116,6 +116,7 @@
 <descriptor>src/main/assemblies/atlas-falcon-hook-package.xml</descriptor>
 <descriptor>src/main/assemblies/atlas-sqoop-hook-package.xml</descriptor>
 <descriptor>src/main/assemblies/atlas-storm-hook-package.xml</descriptor>
+<descriptor>src/main/assemblies/atlas-kafka-hook-package.xml</descriptor>
 <descriptor>src/main/assemblies/standalone-package.xml</descriptor>
 <descriptor>src/main/assemblies/src-package.xml</descriptor>
 <descriptor>src/main/assemblies/migration-exporter.xml</descriptor>
......
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<assembly xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
<formats>
<format>tar.gz</format>
<format>dir</format>
</formats>
<id>kafka-hook</id>
<baseDirectory>apache-atlas-kafka-hook-${project.version}</baseDirectory>
<fileSets>
<fileSet>
<directory>target/bin</directory>
<outputDirectory>hook-bin</outputDirectory>
<includes>
<include>import-kafka.sh</include>
</includes>
<fileMode>0755</fileMode>
<directoryMode>0755</directoryMode>
</fileSet>
<!-- addons/kafka -->
<fileSet>
<directory>../addons/kafka-bridge/src/bin</directory>
<outputDirectory>hook-bin</outputDirectory>
<fileMode>0755</fileMode>
<directoryMode>0755</directoryMode>
</fileSet>
<!-- addons/kafka-bridge -->
<fileSet>
<directory>../addons/kafka-bridge/target/dependency/bridge</directory>
<outputDirectory>bridge</outputDirectory>
</fileSet>
<fileSet>
<directory>../addons/kafka-bridge/target/dependency/hook</directory>
<outputDirectory>hook</outputDirectory>
</fileSet>
</fileSets>
</assembly>
@@ -176,7 +176,18 @@
 <!-- for kafka topic setup -->
 <fileSet>
-<directory>../notification/target/dependency/hook</directory>
+<directory>../addons/kafka-bridge/src/bin</directory>
+<outputDirectory>hook-bin</outputDirectory>
+<fileMode>0755</fileMode>
+<directoryMode>0755</directoryMode>
+</fileSet>
+<fileSet>
+<directory>../addons/kafka-bridge/target/dependency/bridge</directory>
+<outputDirectory>bridge</outputDirectory>
+</fileSet>
+<fileSet>
+<directory>../addons/kafka-bridge/target/dependency/hook</directory>
 <outputDirectory>hook</outputDirectory>
 </fileSet>
 </fileSets>
......
@@ -767,6 +767,7 @@
 <module>addons/storm-bridge</module>
 <module>addons/hbase-bridge-shim</module>
 <module>addons/hbase-bridge</module>
+<module>addons/kafka-bridge</module>
 <module>tools/atlas-migration-exporter</module>
 <module>distro</module>
@@ -1492,6 +1493,11 @@
 <version>${project.version}</version>
 </dependency>
+<dependency>
+<groupId>org.apache.atlas</groupId>
+<artifactId>kafka-bridge</artifactId>
+<version>${project.version}</version>
+</dependency>
 <!-- API documentation -->
 <dependency>
......