Commit 54c31d5c authored by rdsolani, committed by Madhan Neethiraj

ATLAS-2545: updated Storm hook to use V2 notifications

parent 75415862
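For orientation, the gist of the change: the hook now builds AtlasEntity objects and sends them as a V2 EntityCreateRequestV2 hook notification instead of V1 Referenceable instances. A minimal, hypothetical sketch of that pattern (the entity type, attribute values and user are made up; the classes and the notifyEntities call are the ones appearing in the diff below):

import java.util.Collections;
import java.util.List;

import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
import org.apache.atlas.model.notification.HookNotification;
import org.apache.atlas.model.notification.HookNotification.EntityCreateRequestV2;

// inside a hook class that extends org.apache.atlas.hook.AtlasHook
AtlasEntity topology = new AtlasEntity("storm_topology");           // entity to register
topology.setAttribute("qualifiedName", "myTopology@cl1");           // hypothetical values
topology.setAttribute("name", "myTopology");

AtlasEntitiesWithExtInfo entities = new AtlasEntitiesWithExtInfo(topology);
List<HookNotification> notifications = Collections.singletonList(new EntityCreateRequestV2("storm_user", entities));

notifyEntities(notifications);                                       // inherited from AtlasHook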
@@ -254,6 +254,11 @@
             <artifactId>commons-configuration</artifactId>
             <version>${commons-conf.version}</version>
         </artifactItem>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-configuration2</artifactId>
+            <version>${commons-conf2.version}</version>
+        </dependency>
         <artifactItem>
             <groupId>commons-logging</groupId>
             <artifactId>commons-logging</artifactId>
@@ -295,6 +300,40 @@
             <version>${hadoop.version}</version>
         </artifactItem>
         <artifactItem>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-auth</artifactId>
+            <version>${hadoop.version}</version>
+        </artifactItem>
+        <artifactItem>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+            <version>${jackson.version}</version>
+        </artifactItem>
+        <artifactItem>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-core</artifactId>
+            <version>${jackson.version}</version>
+        </artifactItem>
+        <artifactItem>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-annotations</artifactId>
+            <version>${jackson.version}</version>
+        </artifactItem>
+        <dependency>
+            <groupId>org.codehaus.woodstox</groupId>
+            <artifactId>stax2-api</artifactId>
+            <version>${codehaus.woodstox.stax2-api.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-hdfs-client</artifactId>
+            <version>${hadoop.hdfs-client.version}</version>
+        </dependency>
+        <artifactItem>
             <groupId>log4j</groupId>
             <artifactId>log4j</artifactId>
             <version>${log4j.version}</version>
@@ -309,6 +348,11 @@
             <artifactId>jsr311-api</artifactId>
             <version>${jsr.version}</version>
         </artifactItem>
+        <artifactItem>
+            <groupId>com.fasterxml.woodstox</groupId>
+            <artifactId>woodstox-core</artifactId>
+            <version>${woodstox-core.version}</version>
+        </artifactItem>
     </artifactItems>
 </configuration>
 </execution>
...
@@ -18,8 +18,14 @@
package org.apache.atlas.storm.hook;

+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityExtInfo;
+import org.apache.atlas.model.notification.HookNotification;
+import org.apache.atlas.model.notification.HookNotification.EntityCreateRequestV2;
+import org.apache.atlas.type.AtlasTypeUtil;
import org.apache.atlas.utils.HdfsNameServiceResolver;
-import org.apache.atlas.v1.model.instance.Referenceable;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.storm.ISubmitterHook;
import org.apache.storm.generated.Bolt;
import org.apache.storm.generated.SpoutSpec;
@@ -40,6 +46,7 @@ import org.slf4j.Logger;
import java.io.Serializable;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -54,17 +61,13 @@ import java.util.Date;
 * for the various lifecycle stages.
 */
public class StormAtlasHook extends AtlasHook implements ISubmitterHook {
    public static final Logger LOG = org.slf4j.LoggerFactory.getLogger(StormAtlasHook.class);

    private static final String CONF_PREFIX = "atlas.hook.storm.";
    private static final String HOOK_NUM_RETRIES = CONF_PREFIX + "numRetries";

-    // will be used for owner if Storm topology does not contain the owner instance
-    // possible if Storm is running in unsecure mode.
-    public static final String ANONYMOUS_OWNER = "anonymous";
-    public static final String HBASE_NAMESPACE_DEFAULT = "default";
-    public static final String ATTRIBUTE_DB = "db";
+    public static final String ANONYMOUS_OWNER = "anonymous"; // if Storm topology does not contain the owner instance; possible if Storm is running in unsecure mode.
+    public static final String HBASE_NAMESPACE_DEFAULT = "default";
+    public static final String ATTRIBUTE_DB = "db";

    private final HdfsNameServiceResolver hdfsNameServiceResolver = HdfsNameServiceResolver.getInstance();
@@ -81,112 +84,103 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook {
     * @param stormTopology a storm topology
     */
    @Override
-    public void notify(TopologyInfo topologyInfo, Map stormConf,
-                       StormTopology stormTopology) {
+    public void notify(TopologyInfo topologyInfo, Map stormConf, StormTopology stormTopology) {
        LOG.info("Collecting metadata for a new storm topology: {}", topologyInfo.get_name());
        try {
-            ArrayList<Referenceable> entities = new ArrayList<>();
-            Referenceable topologyReferenceable = createTopologyInstance(topologyInfo, stormConf);
-            List<Referenceable> dependentEntities = addTopologyDataSets(stormTopology, topologyReferenceable,
-                    topologyInfo.get_owner(), stormConf);
-            if (dependentEntities.size()>0) {
-                entities.addAll(dependentEntities);
-            }
+            String user = getUser(topologyInfo.get_owner(), null);
+            AtlasEntity topology = createTopologyInstance(topologyInfo, stormConf);
+            AtlasEntitiesWithExtInfo entity = new AtlasEntitiesWithExtInfo(topology);
+
+            addTopologyDataSets(stormTopology, topologyInfo.get_owner(), stormConf, topology, entity);
            // create the graph for the topology
-            ArrayList<Referenceable> graphNodes = createTopologyGraph(
-                    stormTopology, stormTopology.get_spouts(), stormTopology.get_bolts());
-            // add the connection from topology to the graph
-            topologyReferenceable.set("nodes", graphNodes);
-            entities.add(topologyReferenceable);
-            LOG.debug("notifying entities, size = {}", entities.size());
-            String user = getUser(topologyInfo.get_owner(), null);
-            notifyEntities(user, entities);
+            List<AtlasEntity> graphNodes = createTopologyGraph(stormTopology, stormTopology.get_spouts(), stormTopology.get_bolts());
+
+            if (CollectionUtils.isNotEmpty(graphNodes)) {
+                // add the connection from topology to the graph
+                topology.setAttribute("nodes", AtlasTypeUtil.getAtlasObjectIds(graphNodes));
+
+                for (AtlasEntity graphNode : graphNodes) {
+                    entity.addReferredEntity(graphNode);
+                }
+            }
+
+            List<HookNotification> hookNotifications = Collections.singletonList(new EntityCreateRequestV2(user, entity));
+
+            notifyEntities(hookNotifications);
        } catch (Exception e) {
            throw new RuntimeException("Atlas hook is unable to process the topology.", e);
        }
    }
-    private Referenceable createTopologyInstance(TopologyInfo topologyInfo, Map stormConf) {
-        Referenceable topologyReferenceable = new Referenceable(
-                StormDataTypes.STORM_TOPOLOGY.getName());
-        topologyReferenceable.set("id", topologyInfo.get_id());
-        topologyReferenceable.set(AtlasClient.NAME, topologyInfo.get_name());
-        topologyReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, topologyInfo.get_name());
-        String owner = topologyInfo.get_owner();
+    private AtlasEntity createTopologyInstance(TopologyInfo topologyInfo, Map stormConf) {
+        AtlasEntity topology = new AtlasEntity(StormDataTypes.STORM_TOPOLOGY.getName());
+        String owner = topologyInfo.get_owner();
        if (StringUtils.isEmpty(owner)) {
            owner = ANONYMOUS_OWNER;
        }
-        topologyReferenceable.set(AtlasClient.OWNER, owner);
-        topologyReferenceable.set("startTime", new Date(System.currentTimeMillis()));
-        topologyReferenceable.set(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, getClusterName(stormConf));
-        return topologyReferenceable;
+        topology.setAttribute("id", topologyInfo.get_id());
+        topology.setAttribute(AtlasClient.NAME, topologyInfo.get_name());
+        topology.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, topologyInfo.get_name());
+        topology.setAttribute(AtlasClient.OWNER, owner);
+        topology.setAttribute("startTime", new Date(System.currentTimeMillis()));
+        topology.setAttribute(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, getClusterName(stormConf));
+        return topology;
    }
-    private List<Referenceable> addTopologyDataSets(StormTopology stormTopology,
-                                                    Referenceable topologyReferenceable,
-                                                    String topologyOwner,
-                                                    Map stormConf) {
-        List<Referenceable> dependentEntities = new ArrayList<>();
+    private void addTopologyDataSets(StormTopology stormTopology, String topologyOwner, Map stormConf, AtlasEntity topology, AtlasEntityExtInfo entityExtInfo) {
        // add each spout as an input data set
-        addTopologyInputs(topologyReferenceable,
-                stormTopology.get_spouts(), stormConf, topologyOwner, dependentEntities);
+        addTopologyInputs(stormTopology.get_spouts(), stormConf, topologyOwner, topology, entityExtInfo);
        // add the appropriate bolts as output data sets
-        addTopologyOutputs(topologyReferenceable, stormTopology, topologyOwner, stormConf, dependentEntities);
-        return dependentEntities;
+        addTopologyOutputs(stormTopology, topologyOwner, stormConf, topology, entityExtInfo);
    }
-    private void addTopologyInputs(Referenceable topologyReferenceable,
-                                   Map<String, SpoutSpec> spouts,
-                                   Map stormConf,
-                                   String topologyOwner, List<Referenceable> dependentEntities) {
-        final ArrayList<Referenceable> inputDataSets = new ArrayList<>();
+    private void addTopologyInputs(Map<String, SpoutSpec> spouts, Map stormConf, String topologyOwner, AtlasEntity topology, AtlasEntityExtInfo entityExtInfo) {
+        List<AtlasEntity> inputs = new ArrayList<>();
        for (Map.Entry<String, SpoutSpec> entry : spouts.entrySet()) {
-            Serializable instance = Utils.javaDeserialize(
-                    entry.getValue().get_spout_object().get_serialized_java(), Serializable.class);
-            String simpleName = instance.getClass().getSimpleName();
-            final Referenceable datasetRef = createDataSet(simpleName, topologyOwner, instance, stormConf, dependentEntities);
-            if (datasetRef != null) {
-                inputDataSets.add(datasetRef);
+            Serializable instance = Utils.javaDeserialize(entry.getValue().get_spout_object().get_serialized_java(), Serializable.class);
+            String dsType = instance.getClass().getSimpleName();
+            AtlasEntity dsEntity = addDataSet(dsType, topologyOwner, instance, stormConf, entityExtInfo);
+            if (dsEntity != null) {
+                inputs.add(dsEntity);
            }
        }
-        topologyReferenceable.set("inputs", inputDataSets);
+        topology.setAttribute("inputs", AtlasTypeUtil.getAtlasObjectIds(inputs));
    }
-    private void addTopologyOutputs(Referenceable topologyReferenceable,
-                                    StormTopology stormTopology, String topologyOwner,
-                                    Map stormConf, List<Referenceable> dependentEntities) {
-        final ArrayList<Referenceable> outputDataSets = new ArrayList<>();
-        Map<String, Bolt> bolts = stormTopology.get_bolts();
-        Set<String> terminalBoltNames = StormTopologyUtil.getTerminalUserBoltNames(stormTopology);
-        for (String terminalBoltName : terminalBoltNames) {
-            Serializable instance = Utils.javaDeserialize(bolts.get(terminalBoltName)
-                    .get_bolt_object().get_serialized_java(), Serializable.class);
-            String dataSetType = instance.getClass().getSimpleName();
-            final Referenceable datasetRef = createDataSet(dataSetType, topologyOwner, instance, stormConf, dependentEntities);
-            if (datasetRef != null) {
-                outputDataSets.add(datasetRef);
+    private void addTopologyOutputs(StormTopology stormTopology, String topologyOwner, Map stormConf, AtlasEntity topology, AtlasEntityExtInfo entityExtInfo) {
+        List<AtlasEntity> outputs = new ArrayList<>();
+        Map<String, Bolt> bolts = stormTopology.get_bolts();
+        Set<String> boltNames = StormTopologyUtil.getTerminalUserBoltNames(stormTopology);
+        for (String boltName : boltNames) {
+            Serializable instance = Utils.javaDeserialize(bolts.get(boltName).get_bolt_object().get_serialized_java(), Serializable.class);
+            String dsType = instance.getClass().getSimpleName();
+            AtlasEntity dsEntity = addDataSet(dsType, topologyOwner, instance, stormConf, entityExtInfo);
+            if (dsEntity != null) {
+                outputs.add(dsEntity);
            }
        }
-        topologyReferenceable.set("outputs", outputDataSets);
+        topology.setAttribute("outputs", AtlasTypeUtil.getAtlasObjectIds(outputs));
    }
-    private Referenceable createDataSet(String name, String topologyOwner,
-                                        Serializable instance,
-                                        Map stormConf, List<Referenceable> dependentEntities) {
-        Map<String, String> config = StormTopologyUtil.getFieldValues(instance, true, null);
-        String clusterName = null;
-        Referenceable dataSetReferenceable;
+    private AtlasEntity addDataSet(String dataSetType, String topologyOwner, Serializable instance, Map stormConf, AtlasEntityExtInfo entityExtInfo) {
+        Map<String, String> config = StormTopologyUtil.getFieldValues(instance, true, null);
+        String clusterName = null;
+        AtlasEntity ret = null;
        // todo: need to redo this with a config driven approach
-        switch (name) {
+        switch (dataSetType) {
            case "KafkaSpout": {
                String topicName = config.get("KafkaSpout.kafkaSpoutConfig.translator.topic");
                String uri = config.get("KafkaSpout.kafkaSpoutConfig.kafkaProps.bootstrap.servers");
@@ -199,21 +193,23 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook {
                    uri = config.get("KafkaSpout._spoutConfig.hosts.brokerZkStr");
                }
-                dataSetReferenceable = new Referenceable(StormDataTypes.KAFKA_TOPIC.getName());
-                dataSetReferenceable.set("topic", topicName);
-                dataSetReferenceable.set("uri", uri);
                if (StringUtils.isEmpty(topologyOwner)) {
                    topologyOwner = ANONYMOUS_OWNER;
                }
-                dataSetReferenceable.set(AtlasClient.OWNER, topologyOwner);
-                dataSetReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getKafkaTopicQualifiedName(getClusterName(stormConf), topicName));
-                dataSetReferenceable.set(AtlasClient.NAME, topicName);
+                clusterName = getClusterName(stormConf);
+
+                ret = new AtlasEntity(StormDataTypes.KAFKA_TOPIC.getName());
+                ret.setAttribute("topic", topicName);
+                ret.setAttribute("uri", uri);
+                ret.setAttribute(AtlasClient.OWNER, topologyOwner);
+                ret.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getKafkaTopicQualifiedName(clusterName, topicName));
+                ret.setAttribute(AtlasClient.NAME, topicName);
            }
            break;
            case "HBaseBolt": {
-                dataSetReferenceable = new Referenceable(StormDataTypes.HBASE_TABLE.getName());
                final String hbaseTableName = config.get("HBaseBolt.tableName");
                String uri = config.get("hbase.rootdir");
@@ -221,191 +217,195 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook {
                    uri = hbaseTableName;
                }
-                dataSetReferenceable.set("uri", hbaseTableName);
-                dataSetReferenceable.set(AtlasClient.NAME, uri);
-                dataSetReferenceable.set(AtlasClient.OWNER, stormConf.get("storm.kerberos.principal"));
                clusterName = extractComponentClusterName(HBaseConfiguration.create(), stormConf);
+
+                ret = new AtlasEntity(StormDataTypes.HBASE_TABLE.getName());
+                ret.setAttribute("uri", hbaseTableName);
+                ret.setAttribute(AtlasClient.NAME, uri);
+                ret.setAttribute(AtlasClient.OWNER, stormConf.get("storm.kerberos.principal"));
                //TODO - Hbase Namespace is hardcoded to 'default'. need to check how to get this or is it already part of tableName
-                dataSetReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getHbaseTableQualifiedName(clusterName, HBASE_NAMESPACE_DEFAULT,
-                        hbaseTableName));
+                ret.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getHbaseTableQualifiedName(clusterName, HBASE_NAMESPACE_DEFAULT, hbaseTableName));
            }
            break;
case "HdfsBolt": case "HdfsBolt": {
dataSetReferenceable = new Referenceable(HiveMetaStoreBridge.HDFS_PATH); final String hdfsUri = config.get("HdfsBolt.rotationActions") == null ? config.get("HdfsBolt.fileNameFormat.path") : config.get("HdfsBolt.rotationActions");
String hdfsUri = config.get("HdfsBolt.rotationActions") == null
? config.get("HdfsBolt.fileNameFormat.path")
: config.get("HdfsBolt.rotationActions");
final String hdfsPathStr = config.get("HdfsBolt.fsUrl") + hdfsUri; final String hdfsPathStr = config.get("HdfsBolt.fsUrl") + hdfsUri;
final Path hdfsPath = new Path(hdfsPathStr);
final String nameServiceID = hdfsNameServiceResolver.getNameServiceIDForPath(hdfsPathStr); final String nameServiceID = hdfsNameServiceResolver.getNameServiceIDForPath(hdfsPathStr);
clusterName = getClusterName(stormConf); clusterName = getClusterName(stormConf);
dataSetReferenceable.set(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, getClusterName(stormConf)); ret = new AtlasEntity(HiveMetaStoreBridge.HDFS_PATH);
ret.setAttribute(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, getClusterName(stormConf));
ret.setAttribute(AtlasClient.OWNER, stormConf.get("hdfs.kerberos.principal"));
ret.setAttribute(AtlasClient.NAME, Path.getPathWithoutSchemeAndAuthority(hdfsPath).toString().toLowerCase());
if (StringUtils.isNotEmpty(nameServiceID)) { if (StringUtils.isNotEmpty(nameServiceID)) {
String updatedPath = hdfsNameServiceResolver.getPathWithNameServiceID(hdfsPathStr); String updatedPath = hdfsNameServiceResolver.getPathWithNameServiceID(hdfsPathStr);
dataSetReferenceable.set("path", updatedPath);
dataSetReferenceable.set("nameServiceId", nameServiceID); ret.setAttribute("path", updatedPath);
dataSetReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getHdfsPathQualifiedName(clusterName, updatedPath)); ret.setAttribute("nameServiceId", nameServiceID);
ret.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getHdfsPathQualifiedName(clusterName, updatedPath));
} else { } else {
dataSetReferenceable.set("path", hdfsPathStr); ret.setAttribute("path", hdfsPathStr);
dataSetReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getHdfsPathQualifiedName(clusterName, hdfsPathStr)); ret.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getHdfsPathQualifiedName(clusterName, hdfsPathStr));
} }
dataSetReferenceable.set(AtlasClient.OWNER, stormConf.get("hdfs.kerberos.principal")); }
final Path hdfsPath = new Path(hdfsPathStr); break;
dataSetReferenceable.set(AtlasClient.NAME, Path.getPathWithoutSchemeAndAuthority(hdfsPath).toString().toLowerCase());
break;
case "HiveBolt": case "HiveBolt": {
// todo: verify if hive table has everything needed to retrieve existing table
Referenceable dbReferenceable = new Referenceable("hive_db");
String databaseName = config.get("HiveBolt.options.databaseName");
dbReferenceable.set(AtlasClient.NAME, databaseName);
dbReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
HiveMetaStoreBridge.getDBQualifiedName(getClusterName(stormConf), databaseName));
dbReferenceable.set(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, getClusterName(stormConf));
dependentEntities.add(dbReferenceable);
clusterName = extractComponentClusterName(new HiveConf(), stormConf); clusterName = extractComponentClusterName(new HiveConf(), stormConf);
final String hiveTableName = config.get("HiveBolt.options.tableName");
dataSetReferenceable = new Referenceable("hive_table"); final String dbName = config.get("HiveBolt.options.databaseName");
final String tableQualifiedName = HiveMetaStoreBridge.getTableQualifiedName(clusterName, final String tblName = config.get("HiveBolt.options.tableName");
databaseName, hiveTableName); final String tblQualifiedName = HiveMetaStoreBridge.getTableQualifiedName(clusterName, dbName, tblName);
dataSetReferenceable.set(AtlasClient.NAME, hiveTableName);
dataSetReferenceable.set(ATTRIBUTE_DB, dbReferenceable); AtlasEntity dbEntity = new AtlasEntity("hive_db");
dataSetReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, tableQualifiedName);
break; dbEntity.setAttribute(AtlasClient.NAME, dbName);
dbEntity.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, HiveMetaStoreBridge.getDBQualifiedName(getClusterName(stormConf), dbName));
dbEntity.setAttribute(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, getClusterName(stormConf));
entityExtInfo.addReferredEntity(dbEntity);
// todo: verify if hive table has everything needed to retrieve existing table
ret = new AtlasEntity("hive_table");
ret.setAttribute(AtlasClient.NAME, tblName);
ret.setAttribute(ATTRIBUTE_DB, AtlasTypeUtil.getAtlasObjectId(dbEntity));
ret.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, tblQualifiedName);
}
break;
            default:
                // custom node - create a base dataset class with name attribute
                //TODO - What should we do for custom data sets. Not sure what name we can set here?
                return null;
        }
-        dependentEntities.add(dataSetReferenceable);
-        return dataSetReferenceable;
-    }
-    private String extractComponentClusterName(Configuration configuration, Map stormConf) {
-        String clusterName = configuration.get(AtlasConstants.CLUSTER_NAME_KEY, null);
-        if (clusterName == null) {
-            clusterName = getClusterName(stormConf);
+        if (ret != null) {
+            entityExtInfo.addReferredEntity(ret);
        }
-        return clusterName;
-    }
+        return ret;
+    }
-    private ArrayList<Referenceable> createTopologyGraph(StormTopology stormTopology,
-                                                         Map<String, SpoutSpec> spouts,
-                                                         Map<String, Bolt> bolts) {
+    private List<AtlasEntity> createTopologyGraph(StormTopology stormTopology, Map<String, SpoutSpec> spouts, Map<String, Bolt> bolts) {
        // Add graph of nodes in the topology
-        final Map<String, Referenceable> nodeEntities = new HashMap<>();
+        Map<String, AtlasEntity> nodeEntities = new HashMap<>();
        addSpouts(spouts, nodeEntities);
        addBolts(bolts, nodeEntities);
        addGraphConnections(stormTopology, nodeEntities);
-        ArrayList<Referenceable> nodes = new ArrayList<>();
-        nodes.addAll(nodeEntities.values());
-        return nodes;
+        return new ArrayList<>(nodeEntities.values());
    }
-    private void addSpouts(Map<String, SpoutSpec> spouts,
-                           Map<String, Referenceable> nodeEntities) {
+    private void addSpouts(Map<String, SpoutSpec> spouts, Map<String, AtlasEntity> nodeEntities) {
        for (Map.Entry<String, SpoutSpec> entry : spouts.entrySet()) {
-            final String spoutName = entry.getKey();
-            Referenceable spoutReferenceable = createSpoutInstance(
-                    spoutName, entry.getValue());
-            nodeEntities.put(spoutName, spoutReferenceable);
-        }
-    }
-    private Referenceable createSpoutInstance(String spoutName,
-                                              SpoutSpec stormSpout) {
-        Referenceable spoutReferenceable = new Referenceable(StormDataTypes.STORM_SPOUT.getName());
-        spoutReferenceable.set(AtlasClient.NAME, spoutName);
-        Serializable instance = Utils.javaDeserialize(
-                stormSpout.get_spout_object().get_serialized_java(), Serializable.class);
-        spoutReferenceable.set("driverClass", instance.getClass().getName());
-        Map<String, String> flatConfigMap = StormTopologyUtil.getFieldValues(instance, true, null);
-        spoutReferenceable.set("conf", flatConfigMap);
-        return spoutReferenceable;
+            String spoutName = entry.getKey();
+            AtlasEntity spout = createSpoutInstance(spoutName, entry.getValue());
+            nodeEntities.put(spoutName, spout);
+        }
    }
-    private void addBolts(Map<String, Bolt> bolts,
-                          Map<String, Referenceable> nodeEntities) {
+    private void addBolts(Map<String, Bolt> bolts, Map<String, AtlasEntity> nodeEntities) {
        for (Map.Entry<String, Bolt> entry : bolts.entrySet()) {
-            Referenceable boltInstance = createBoltInstance(entry.getKey(), entry.getValue());
-            nodeEntities.put(entry.getKey(), boltInstance);
+            String boltName = entry.getKey();
+            AtlasEntity boltInstance = createBoltInstance(boltName, entry.getValue());
+            nodeEntities.put(boltName, boltInstance);
        }
    }
-    private Referenceable createBoltInstance(String boltName,
-                                             Bolt stormBolt) {
-        Referenceable boltReferenceable = new Referenceable(StormDataTypes.STORM_BOLT.getName());
-        boltReferenceable.set(AtlasClient.NAME, boltName);
-        Serializable instance = Utils.javaDeserialize(
-                stormBolt.get_bolt_object().get_serialized_java(), Serializable.class);
-        boltReferenceable.set("driverClass", instance.getClass().getName());
+    private AtlasEntity createSpoutInstance(String spoutName, SpoutSpec stormSpout) {
+        AtlasEntity spout = new AtlasEntity(StormDataTypes.STORM_SPOUT.getName());
+        Serializable instance = Utils.javaDeserialize(stormSpout.get_spout_object().get_serialized_java(), Serializable.class);
+        Map<String, String> flatConfigMap = StormTopologyUtil.getFieldValues(instance, true, null);
+        spout.setAttribute(AtlasClient.NAME, spoutName);
+        spout.setAttribute("driverClass", instance.getClass().getName());
+        spout.setAttribute("conf", flatConfigMap);
+        return spout;
+    }
+    private AtlasEntity createBoltInstance(String boltName, Bolt stormBolt) {
+        AtlasEntity bolt = new AtlasEntity(StormDataTypes.STORM_BOLT.getName());
+        Serializable instance = Utils.javaDeserialize(stormBolt.get_bolt_object().get_serialized_java(), Serializable.class);
        Map<String, String> flatConfigMap = StormTopologyUtil.getFieldValues(instance, true, null);
-        boltReferenceable.set("conf", flatConfigMap);
-        return boltReferenceable;
+        bolt.setAttribute(AtlasClient.NAME, boltName);
+        bolt.setAttribute("driverClass", instance.getClass().getName());
+        bolt.setAttribute("conf", flatConfigMap);
+        return bolt;
    }
-    private void addGraphConnections(StormTopology stormTopology,
-                                     Map<String, Referenceable> nodeEntities) {
+    private void addGraphConnections(StormTopology stormTopology, Map<String, AtlasEntity> nodeEntities) {
        // adds connections between spouts and bolts
-        Map<String, Set<String>> adjacencyMap =
-                StormTopologyUtil.getAdjacencyMap(stormTopology, true);
+        Map<String, Set<String>> adjacencyMap = StormTopologyUtil.getAdjacencyMap(stormTopology, true);
        for (Map.Entry<String, Set<String>> entry : adjacencyMap.entrySet()) {
            String nodeName = entry.getKey();
            Set<String> adjacencyList = adjacencyMap.get(nodeName);
-            if (adjacencyList == null || adjacencyList.isEmpty()) {
+            if (CollectionUtils.isEmpty(adjacencyList)) {
                continue;
            }
            // add outgoing links
-            Referenceable node = nodeEntities.get(nodeName);
-            ArrayList<String> outputs = new ArrayList<>(adjacencyList.size());
+            AtlasEntity node = nodeEntities.get(nodeName);
+            List<String> outputs = new ArrayList<>(adjacencyList.size());
            outputs.addAll(adjacencyList);
-            node.set("outputs", outputs);
+            node.setAttribute("outputs", outputs);
            // add incoming links
            for (String adjacentNodeName : adjacencyList) {
-                Referenceable adjacentNode = nodeEntities.get(adjacentNodeName);
+                AtlasEntity adjacentNode = nodeEntities.get(adjacentNodeName);
                @SuppressWarnings("unchecked")
-                ArrayList<String> inputs = (ArrayList<String>) adjacentNode.get("inputs");
+                List<String> inputs = (List<String>) adjacentNode.getAttribute("inputs");
                if (inputs == null) {
                    inputs = new ArrayList<>();
                }
                inputs.add(nodeName);
-                adjacentNode.set("inputs", inputs);
+                adjacentNode.setAttribute("inputs", inputs);
            }
        }
    }
    public static String getKafkaTopicQualifiedName(String clusterName, String topicName) {
-        return String.format("%s@%s", topicName, clusterName);
+        return String.format("%s@%s", topicName.toLowerCase(), clusterName);
    }
    public static String getHbaseTableQualifiedName(String clusterName, String nameSpace, String tableName) {
-        return String.format("%s.%s@%s", nameSpace, tableName, clusterName);
+        return String.format("%s.%s@%s", nameSpace.toLowerCase(), tableName.toLowerCase(), clusterName);
    }
    public static String getHdfsPathQualifiedName(String clusterName, String hdfsPath) {
-        return String.format("%s@%s", hdfsPath, clusterName);
+        return String.format("%s@%s", hdfsPath.toLowerCase(), clusterName);
    }
    private String getClusterName(Map stormConf) {
        return atlasProperties.getString(AtlasConstants.CLUSTER_NAME_KEY, AtlasConstants.DEFAULT_CLUSTER_NAME);
    }
+    private String extractComponentClusterName(Configuration configuration, Map stormConf) {
+        String clusterName = configuration.get(AtlasConstants.CLUSTER_NAME_KEY, null);
+        if (clusterName == null) {
+            clusterName = getClusterName(stormConf);
+        }
+        return clusterName;
+    }
}
@@ -570,6 +570,8 @@
        <aopalliance.version>1.0</aopalliance.version>
        <jackson.version>2.9.2</jackson.version>
        <commons-conf.version>1.10</commons-conf.version>
+       <commons-conf2.version>2.2</commons-conf2.version>
        <commons-collections.version>3.2.2</commons-collections.version>
        <commons-logging.version>1.1.3</commons-logging.version>
        <commons-lang.version>2.6</commons-lang.version>
@@ -582,6 +584,10 @@
        <maven-site-plugin.version>3.7</maven-site-plugin.version>
        <doxia.version>1.8</doxia.version>
        <dropwizard-metrics>3.2.2</dropwizard-metrics>
+       <!-- hadoop.hdfs-client.version should same as hadoop version -->
+       <hadoop.hdfs-client.version>2.8.1</hadoop.hdfs-client.version>
+       <codehaus.woodstox.stax2-api.version>3.1.4</codehaus.woodstox.stax2-api.version>
+       <woodstox-core.version>5.0.3</woodstox-core.version>
        <PermGen>64m</PermGen>
        <MaxPermGen>512m</MaxPermGen>
...