Commit 5ebd7070 by Madhan Neethiraj

ATLAS-2492: updated Storm version to 1.2.0

parent ec00aed1
@@ -31,7 +31,7 @@
     <packaging>jar</packaging>
     <properties>
-        <storm.version>1.0.0</storm.version>
+        <storm.version>1.2.0</storm.version>
     </properties>
     <dependencies>
@@ -30,7 +30,7 @@
     <packaging>jar</packaging>
     <properties>
-        <storm.version>1.0.0</storm.version>
+        <storm.version>1.2.0</storm.version>
         <hive.version>1.2.1</hive.version>
     </properties>
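Both pom changes only touch the storm.version property; the Storm artifacts in each module pick the new release up through Maven property interpolation. An illustrative dependency entry (assumed for illustration, not shown in this diff) that would now resolve to Storm 1.2.0:

<!-- Illustrative only: a Storm dependency resolved through the storm.version property bumped above -->
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>${storm.version}</version>
</dependency>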
@@ -186,30 +186,48 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook {
         Referenceable dataSetReferenceable;
         // todo: need to redo this with a config driven approach
         switch (name) {
-            case "KafkaSpout":
+            case "KafkaSpout": {
+                String topicName = config.get("KafkaSpout.kafkaSpoutConfig.translator.topic");
+                String uri = config.get("KafkaSpout.kafkaSpoutConfig.kafkaProps.bootstrap.servers");
+                if (StringUtils.isEmpty(topicName)) {
+                    topicName = config.get("KafkaSpout._spoutConfig.topic");
+                }
+                if (StringUtils.isEmpty(uri)) {
+                    uri = config.get("KafkaSpout._spoutConfig.hosts.brokerZkStr");
+                }
                 dataSetReferenceable = new Referenceable(StormDataTypes.KAFKA_TOPIC.getName());
-                final String topicName = config.get("KafkaSpout._spoutConfig.topic");
                 dataSetReferenceable.set("topic", topicName);
-                dataSetReferenceable.set("uri",
-                        config.get("KafkaSpout._spoutConfig.hosts.brokerZkStr"));
+                dataSetReferenceable.set("uri", uri);
                 if (StringUtils.isEmpty(topologyOwner)) {
                     topologyOwner = ANONYMOUS_OWNER;
                 }
                 dataSetReferenceable.set(AtlasClient.OWNER, topologyOwner);
                 dataSetReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getKafkaTopicQualifiedName(getClusterName(stormConf), topicName));
                 dataSetReferenceable.set(AtlasClient.NAME, topicName);
+            }
                 break;
-            case "HBaseBolt":
+            case "HBaseBolt": {
                 dataSetReferenceable = new Referenceable(StormDataTypes.HBASE_TABLE.getName());
                 final String hbaseTableName = config.get("HBaseBolt.tableName");
-                dataSetReferenceable.set("uri", stormConf.get("hbase.rootdir"));
-                dataSetReferenceable.set(AtlasClient.NAME, hbaseTableName);
+                String uri = config.get("hbase.rootdir");
+                if (StringUtils.isEmpty(uri)) {
+                    uri = hbaseTableName;
+                }
+                dataSetReferenceable.set("uri", hbaseTableName);
+                dataSetReferenceable.set(AtlasClient.NAME, uri);
                 dataSetReferenceable.set(AtlasClient.OWNER, stormConf.get("storm.kerberos.principal"));
                 clusterName = extractComponentClusterName(HBaseConfiguration.create(), stormConf);
                 //TODO - Hbase Namespace is hardcoded to 'default'. need to check how to get this or is it already part of tableName
                 dataSetReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getHbaseTableQualifiedName(clusterName, HBASE_NAMESPACE_DEFAULT,
                         hbaseTableName));
+            }
                 break;
             case "HdfsBolt":
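The KafkaSpout change above reads the configuration keys used by the storm-kafka-client spout in Storm 1.2 (KafkaSpout.kafkaSpoutConfig.translator.topic and KafkaSpout.kafkaSpoutConfig.kafkaProps.bootstrap.servers) and falls back to the pre-1.2 storm-kafka keys (KafkaSpout._spoutConfig.topic and KafkaSpout._spoutConfig.hosts.brokerZkStr) when they are absent, so topologies built against either spout keep reporting lineage. A minimal, self-contained sketch of that lookup pattern follows; getWithFallback and the sample topic value are hypothetical and are not part of this commit.

import java.util.HashMap;
import java.util.Map;

public class SpoutConfigLookup {
    // Hypothetical helper illustrating the "new key first, then legacy key" lookup
    // pattern used by the hook above; not part of the commit itself.
    static String getWithFallback(Map<String, String> config, String newKey, String legacyKey) {
        String value = config.get(newKey);

        return (value == null || value.isEmpty()) ? config.get(legacyKey) : value;
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();

        config.put("KafkaSpout._spoutConfig.topic", "orders"); // only the pre-1.2 key is present

        // Prints "orders": the Storm 1.2 key is missing, so the legacy key is used.
        System.out.println(getWithFallback(config,
                "KafkaSpout.kafkaSpoutConfig.translator.topic",
                "KafkaSpout._spoutConfig.topic"));
    }
}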