Commit fe1c7a3b authored by rdsolani, committed by Madhan Neethiraj

ATLAS-2592 Storm atlas hook fails with NPE

parent 57c7e85e
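All three hunks below apply the same fix: when the topology configuration is missing the expected Kafka topic, HBase table, or Hive database/table name, the hook now logs an error and skips entity creation instead of dereferencing the null value later (the NPE reported in ATLAS-2592). A minimal, self-contained sketch of that guard pattern follows; the Entity class and the "topic" config key are simplified stand-ins for illustration, not the actual Atlas/Storm APIs.

    import java.util.HashMap;
    import java.util.Map;

    public class NullGuardSketch {
        // Simplified stand-in for org.apache.atlas.model.instance.AtlasEntity.
        static class Entity {
            private final Map<String, Object> attributes = new HashMap<>();

            Entity setAttribute(String name, Object value) {
                attributes.put(name, value);
                return this;
            }

            @Override
            public String toString() {
                return attributes.toString();
            }
        }

        // Before the fix: a missing "topic" entry meant topicName was null, and the
        // later attribute/qualified-name calls failed with a NullPointerException.
        // After the fix: log and return null so the caller simply skips this entity.
        static Entity createKafkaTopicEntity(Map<String, String> config, String clusterName) {
            String topicName = config.get("topic");   // hypothetical config key

            if (topicName == null) {
                System.err.println("Kafka topic name not found");   // the hook uses LOG.error(...)
                return null;
            }

            return new Entity()
                    .setAttribute("topic", topicName)
                    .setAttribute("qualifiedName", topicName + "@" + clusterName);
        }

        public static void main(String[] args) {
            Map<String, String> good = new HashMap<>();
            good.put("topic", "orders");

            System.out.println(createKafkaTopicEntity(good, "prod"));              // entity with attributes
            System.out.println(createKafkaTopicEntity(new HashMap<>(), "prod"));   // null, no NPE
        }
    }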
@@ -199,13 +199,17 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook {
         clusterName = getClusterName(stormConf);
-        ret = new AtlasEntity(StormDataTypes.KAFKA_TOPIC.getName());
+        if (topicName == null) {
+            LOG.error("Kafka topic name not found");
+        } else {
+            ret = new AtlasEntity(StormDataTypes.KAFKA_TOPIC.getName());
         ret.setAttribute("topic", topicName);
         ret.setAttribute("uri", uri);
         ret.setAttribute(AtlasClient.OWNER, topologyOwner);
         ret.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getKafkaTopicQualifiedName(clusterName, topicName));
         ret.setAttribute(AtlasClient.NAME, topicName);
+        }
     }
     break;
@@ -219,13 +223,17 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook {
         clusterName = extractComponentClusterName(HBaseConfiguration.create(), stormConf);
-        ret = new AtlasEntity(StormDataTypes.HBASE_TABLE.getName());
+        if (hbaseTableName == null) {
+            LOG.error("HBase table name not found");
+        } else {
+            ret = new AtlasEntity(StormDataTypes.HBASE_TABLE.getName());
         ret.setAttribute("uri", hbaseTableName);
         ret.setAttribute(AtlasClient.NAME, uri);
         ret.setAttribute(AtlasClient.OWNER, stormConf.get("storm.kerberos.principal"));
         //TODO - Hbase Namespace is hardcoded to 'default'. need to check how to get this or is it already part of tableName
         ret.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getHbaseTableQualifiedName(clusterName, HBASE_NAMESPACE_DEFAULT, hbaseTableName));
+        }
     }
     break;
@@ -259,24 +267,27 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook {
     case "HiveBolt": {
         clusterName = extractComponentClusterName(new HiveConf(), stormConf);
         final String dbName = config.get("HiveBolt.options.databaseName");
         final String tblName = config.get("HiveBolt.options.tableName");
-        final String tblQualifiedName = HiveMetaStoreBridge.getTableQualifiedName(clusterName, dbName, tblName);
-        AtlasEntity dbEntity = new AtlasEntity("hive_db");
+        if (dbName == null || tblName ==null) {
+            LOG.error("Hive database or table name not found");
+        } else {
+            AtlasEntity dbEntity = new AtlasEntity("hive_db");
         dbEntity.setAttribute(AtlasClient.NAME, dbName);
         dbEntity.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, HiveMetaStoreBridge.getDBQualifiedName(getClusterName(stormConf), dbName));
         dbEntity.setAttribute(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, getClusterName(stormConf));
         entityExtInfo.addReferredEntity(dbEntity);
         // todo: verify if hive table has everything needed to retrieve existing table
         ret = new AtlasEntity("hive_table");
         ret.setAttribute(AtlasClient.NAME, tblName);
         ret.setAttribute(ATTRIBUTE_DB, AtlasTypeUtil.getAtlasObjectId(dbEntity));
-        ret.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, tblQualifiedName);
+        ret.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, HiveMetaStoreBridge.getTableQualifiedName(clusterName, dbName, tblName));
+        }
     }
     break;
...