Commit b77d7c7b by Shwetha GS

ATLAS-183 Add a Hook in Storm to post the topology metadata (svenkat,yhemanth via shwethags)

parent a46711c5
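Usage note: the hook is activated by registering it as Storm's topology submission notifier plugin, exactly as StormTestUtil does further below. A minimal client-side sketch, assuming a Storm 0.10+ client with the Atlas hook jars and client configuration on the submitter's classpath (the class name SubmitWithAtlasHook is hypothetical):

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.testing.TestWordCounter;
import backtype.storm.testing.TestWordSpout;
import backtype.storm.topology.TopologyBuilder;

public class SubmitWithAtlasHook {
    public static void main(String[] args) throws Exception {
        // build a trivial word-count topology
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("words", new TestWordSpout(), 2);
        builder.setBolt("count", new TestWordCounter(), 2).shuffleGrouping("words");

        Config conf = new Config();
        // Register StormAtlasHook as the submission notifier plugin (Storm 0.10+),
        // so topology metadata is posted to Atlas when the topology is submitted.
        conf.put(Config.STORM_TOPOLOGY_SUBMISSION_NOTIFIER_PLUGIN,
                "org.apache.atlas.storm.hook.StormAtlasHook");

        StormSubmitter.submitTopology("word-count", conf, builder.createTopology());
    }
}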
......@@ -20,15 +20,11 @@ package org.apache.atlas.hive.hook;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Guice;
import com.google.inject.Inject;
import com.google.inject.Injector;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
import org.apache.atlas.hive.model.HiveDataModelGenerator;
import org.apache.atlas.hive.model.HiveDataTypes;
import org.apache.atlas.notification.NotificationInterface;
import org.apache.atlas.notification.NotificationModule;
import org.apache.atlas.hook.AtlasHook;
import org.apache.atlas.notification.hook.HookNotification;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.commons.configuration.Configuration;
......@@ -63,7 +59,7 @@ import java.util.concurrent.TimeUnit;
/**
* AtlasHook sends lineage information to the Atlas server.
*/
public class HiveHook implements ExecuteWithHookContext {
public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
private static final Logger LOG = LoggerFactory.getLogger(HiveHook.class);
......@@ -103,16 +99,13 @@ public class HiveHook implements ExecuteWithHookContext {
public Long queryStartTime;
}
@Inject
private static NotificationInterface notifInterface;
private List<HookNotification.HookNotificationMessage> messages = new ArrayList<>();
private static final HiveConf hiveConf;
static {
try {
atlasProperties = ApplicationProperties.get(ApplicationProperties.CLIENT_PROPERTIES);
atlasProperties = ApplicationProperties.get();
// initialize the async facility to process hook calls. We don't
// want to do this inline since it adds plenty of overhead for the query.
......@@ -142,15 +135,17 @@ public class HiveHook implements ExecuteWithHookContext {
LOG.info("Attempting to send msg while shutdown in progress.", e);
}
Injector injector = Guice.createInjector(new NotificationModule());
notifInterface = injector.getInstance(NotificationInterface.class);
hiveConf = new HiveConf();
LOG.info("Created Atlas Hook");
}
@Override
protected String getNumberOfRetriesPropertyKey() {
return HOOK_NUM_RETRIES;
}
@Override
public void run(final HookContext hookContext) throws Exception {
// clone to avoid concurrent access
final HiveEvent event = new HiveEvent();
......@@ -233,7 +228,7 @@ public class HiveHook implements ExecuteWithHookContext {
default:
}
notifyAtlas();
notifyEntities(messages);
}
private void renameTable(HiveMetaStoreBridge dgiBridge, HiveEvent event) throws Exception {
......@@ -324,31 +319,6 @@ public class HiveHook implements ExecuteWithHookContext {
}
}
/**
* Notify atlas of the entity through message. The entity can be a complex entity with reference to other entities.
* De-duping of entities is done on the server side based on the unique attribute on the entities.
*/
private void notifyAtlas() {
int maxRetries = atlasProperties.getInt(HOOK_NUM_RETRIES, 3);
LOG.debug("Notifying atlas with messages {}", messages);
int numRetries = 0;
while (true) {
try {
notifInterface.send(NotificationInterface.NotificationType.HOOK, messages);
break;
} catch(Exception e) {
numRetries++;
if(numRetries < maxRetries) {
LOG.debug("Failed to notify atlas. Retrying", e);
} else {
LOG.error("Failed to notify atlas after {} retries. Quitting", maxRetries, e);
break;
}
}
}
}
private String normalize(String str) {
if (StringUtils.isEmpty(str)) {
return null;
......
......@@ -31,6 +31,7 @@
<properties>
<storm.version>0.10.0.2.3.99.0-195</storm.version>
<hive.version>1.2.1</hive.version>
</properties>
<dependencies>
......@@ -66,6 +67,24 @@
<artifactId>hive-bridge</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>${hive.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-common</artifactId>
<version>${hbase.version}</version>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
</dependency>
<!-- apache storm core dependencies -->
<dependency>
<groupId>org.apache.storm</groupId>
......@@ -269,7 +288,16 @@
<artifactId>paranamer</artifactId>
<version>${paranamer.version}</version>
</artifactItem>
<artifactItem>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>${hive.version}</version>
</artifactItem>
<artifactItem>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-common</artifactId>
<version>${hbase.version}</version>
</artifactItem>
</artifactItems>
</configuration>
</execution>
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.storm.hook;
import backtype.storm.ISubmitterHook;
import backtype.storm.generated.Bolt;
import backtype.storm.generated.SpoutSpec;
import backtype.storm.generated.StormTopology;
import backtype.storm.generated.TopologyInfo;
import backtype.storm.utils.Utils;
import com.sun.jersey.api.client.ClientResponse;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasConstants;
import org.apache.atlas.AtlasException;
import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
import org.apache.atlas.hive.model.HiveDataModelGenerator;
import org.apache.atlas.hive.model.HiveDataTypes;
import org.apache.atlas.hook.AtlasHook;
import org.apache.atlas.storm.model.StormDataModel;
import org.apache.atlas.storm.model.StormDataTypes;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.TypesDef;
import org.apache.atlas.typesystem.json.TypesSerialization;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.slf4j.Logger;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* StormAtlasHook sends Storm topology metadata to Atlas
* via a Kafka broker for durability.
* <p/>
* This is based on the assumption that the same topology name is used
* for the various lifecycle stages.
*/
public class StormAtlasHook extends AtlasHook implements ISubmitterHook {
public static final Logger LOG = org.slf4j.LoggerFactory.getLogger(StormAtlasHook.class);
private static final String CONF_PREFIX = "atlas.hook.storm.";
private static final String HOOK_NUM_RETRIES = CONF_PREFIX + "numRetries";
// will be used as the owner if the Storm topology does not contain one;
// possible if Storm is running in unsecured mode.
public static final String ANONYMOUS_OWNER = "anonymous";
public static final String HBASE_NAMESPACE_DEFAULT = "default";
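// ensures the Hive and Storm type definitions are registered only once per JVM;
// registerDataModel() below is synchronized to make the registration itself safe.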
private static volatile boolean typesRegistered = false;
public StormAtlasHook() {
super();
}
StormAtlasHook(AtlasClient atlasClient) {
super(atlasClient);
}
@Override
protected String getNumberOfRetriesPropertyKey() {
return HOOK_NUM_RETRIES;
}
/**
* This is the client-side hook that Storm fires when a topology is submitted.
*
* @param topologyInfo topology info
* @param stormConf configuration
* @param stormTopology a storm topology
* @throws IllegalAccessException
*/
@Override
public void notify(TopologyInfo topologyInfo, Map stormConf,
StormTopology stormTopology) throws IllegalAccessException {
LOG.info("Collecting metadata for a new storm topology: {}", topologyInfo.get_name());
try {
if (!typesRegistered) {
registerDataModel(new HiveDataModelGenerator());
}
ArrayList<Referenceable> entities = new ArrayList<>();
Referenceable topologyReferenceable = createTopologyInstance(topologyInfo, stormConf);
List<Referenceable> dependentEntities = addTopologyDataSets(stormTopology, topologyReferenceable,
topologyInfo.get_owner(), stormConf);
if (dependentEntities.size() > 0) {
entities.addAll(dependentEntities);
}
// create the graph for the topology
ArrayList<Referenceable> graphNodes = createTopologyGraph(
stormTopology, stormTopology.get_spouts(), stormTopology.get_bolts());
// add the connection from topology to the graph
topologyReferenceable.set("nodes", graphNodes);
entities.add(topologyReferenceable);
LOG.debug("notifying entities, size = {}", entities.size());
notifyEntities(entities);
} catch (Exception e) {
throw new RuntimeException("Atlas hook is unable to process the topology.", e);
}
}
private Referenceable createTopologyInstance(TopologyInfo topologyInfo, Map stormConf) throws Exception {
Referenceable topologyReferenceable = new Referenceable(
StormDataTypes.STORM_TOPOLOGY.getName());
topologyReferenceable.set("id", topologyInfo.get_id());
topologyReferenceable.set("name", topologyInfo.get_name());
String owner = topologyInfo.get_owner();
if (StringUtils.isEmpty(owner)) {
owner = ANONYMOUS_OWNER;
}
topologyReferenceable.set("owner", owner);
topologyReferenceable.set("startTime", System.currentTimeMillis());
topologyReferenceable.set(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, getClusterName(stormConf));
return topologyReferenceable;
}
private List<Referenceable> addTopologyDataSets(StormTopology stormTopology,
Referenceable topologyReferenceable,
String topologyOwner,
Map stormConf) throws Exception {
List<Referenceable> dependentEntities = new ArrayList<>();
// add each spout as an input data set
addTopologyInputs(topologyReferenceable,
stormTopology.get_spouts(), stormConf, topologyOwner, dependentEntities);
// add the appropriate bolts as output data sets
addTopologyOutputs(topologyReferenceable, stormTopology, topologyOwner, stormConf, dependentEntities);
return dependentEntities;
}
private void addTopologyInputs(Referenceable topologyReferenceable,
Map<String, SpoutSpec> spouts,
Map stormConf,
String topologyOwner, List<Referenceable> dependentEntities) throws IllegalAccessException {
final ArrayList<Referenceable> inputDataSets = new ArrayList<>();
for (Map.Entry<String, SpoutSpec> entry : spouts.entrySet()) {
Serializable instance = Utils.javaDeserialize(
entry.getValue().get_spout_object().get_serialized_java(), Serializable.class);
String simpleName = instance.getClass().getSimpleName();
final Referenceable datasetRef = createDataSet(simpleName, topologyOwner, instance, stormConf, dependentEntities);
if (datasetRef != null) {
inputDataSets.add(datasetRef);
}
}
topologyReferenceable.set("inputs", inputDataSets);
}
private void addTopologyOutputs(Referenceable topologyReferenceable,
StormTopology stormTopology, String topologyOwner,
Map stormConf, List<Referenceable> dependentEntities) throws Exception {
final ArrayList<Referenceable> outputDataSets = new ArrayList<>();
Map<String, Bolt> bolts = stormTopology.get_bolts();
Set<String> terminalBoltNames = StormTopologyUtil.getTerminalUserBoltNames(stormTopology);
for (String terminalBoltName : terminalBoltNames) {
Serializable instance = Utils.javaDeserialize(bolts.get(terminalBoltName)
.get_bolt_object().get_serialized_java(), Serializable.class);
String dataSetType = instance.getClass().getSimpleName();
final Referenceable datasetRef = createDataSet(dataSetType, topologyOwner, instance, stormConf, dependentEntities);
if (datasetRef != null) {
outputDataSets.add(datasetRef);
}
}
topologyReferenceable.set("outputs", outputDataSets);
}
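// Maps well-known spout/bolt implementations (KafkaSpout, HBaseBolt, HdfsBolt,
// HiveBolt) to the matching Atlas dataset type using the component's reflected
// field values; unrecognized components return null and are skipped.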
private Referenceable createDataSet(String name, String topologyOwner,
Serializable instance,
Map stormConf, List<Referenceable> dependentEntities) throws IllegalAccessException {
Map<String, String> config = StormTopologyUtil.getFieldValues(instance, true);
String clusterName = null;
Referenceable dataSetReferenceable;
// todo: need to redo this with a config driven approach
switch (name) {
case "KafkaSpout":
dataSetReferenceable = new Referenceable(StormDataTypes.KAFKA_TOPIC.getName());
final String topicName = config.get("KafkaSpout._spoutConfig.topic");
dataSetReferenceable.set("topic", topicName);
dataSetReferenceable.set("uri",
config.get("KafkaSpout._spoutConfig.hosts.brokerZkStr"));
if (StringUtils.isEmpty(topologyOwner)) {
topologyOwner = ANONYMOUS_OWNER;
}
dataSetReferenceable.set("owner", topologyOwner);
dataSetReferenceable.set("name", getKafkaTopicQualifiedName(getClusterName(stormConf), topicName));
break;
case "HBaseBolt":
dataSetReferenceable = new Referenceable(StormDataTypes.HBASE_TABLE.getName());
final String hbaseTableName = config.get("HBaseBolt.tableName");
dataSetReferenceable.set("uri", stormConf.get("hbase.rootdir"));
dataSetReferenceable.set("tableName", hbaseTableName);
dataSetReferenceable.set("owner", stormConf.get("storm.kerberos.principal"));
clusterName = extractComponentClusterName(HBaseConfiguration.create(), stormConf);
//TODO - HBase namespace is hardcoded to 'default'; need to check how to get it, or whether it is already part of tableName
dataSetReferenceable.set("name", getHbaseTableQualifiedName(clusterName, HBASE_NAMESPACE_DEFAULT,
hbaseTableName));
break;
case "HdfsBolt":
dataSetReferenceable = new Referenceable(StormDataTypes.HDFS_DATA_SET.getName());
String hdfsUri = config.get("HdfsBolt.rotationActions") == null
? config.get("HdfsBolt.fileNameFormat.path")
: config.get("HdfsBolt.rotationActions");
final String hdfsPath = config.get("HdfsBolt.fsUrl") + hdfsUri;
dataSetReferenceable.set("pathURI", hdfsPath);
dataSetReferenceable.set("owner", stormConf.get("hdfs.kerberos.principal"));
dataSetReferenceable.set("name", hdfsPath);
break;
case "HiveBolt":
// todo: verify if hive table has everything needed to retrieve existing table
Referenceable dbReferenceable = new Referenceable("hive_db");
String databaseName = config.get("HiveBolt.options.databaseName");
dbReferenceable.set(HiveDataModelGenerator.NAME, databaseName);
dbReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
HiveMetaStoreBridge.getDBQualifiedName(getClusterName(stormConf), databaseName));
dbReferenceable.set(HiveDataModelGenerator.CLUSTER_NAME, getClusterName(stormConf));
dependentEntities.add(dbReferenceable);
clusterName = extractComponentClusterName(new HiveConf(), stormConf);
final String hiveTableName = config.get("HiveBolt.options.tableName");
dataSetReferenceable = new Referenceable("hive_table");
final String tableQualifiedName = HiveMetaStoreBridge.getTableQualifiedName(clusterName,
databaseName, hiveTableName);
dataSetReferenceable.set(HiveDataModelGenerator.NAME, tableQualifiedName);
dataSetReferenceable.set(HiveDataModelGenerator.DB, dbReferenceable);
dataSetReferenceable.set(HiveDataModelGenerator.TABLE_NAME, hiveTableName);
break;
default:
// custom node - create a base dataset class with name attribute
//TODO - What should we do for custom data sets? Not sure what name we can set here.
return null;
}
dependentEntities.add(dataSetReferenceable);
return dataSetReferenceable;
}
private String extractComponentClusterName(Configuration configuration, Map stormConf) {
String clusterName = configuration.get(AtlasConstants.CLUSTER_NAME_KEY, null);
if (clusterName == null) {
clusterName = getClusterName(stormConf);
}
return clusterName;
}
private ArrayList<Referenceable> createTopologyGraph(StormTopology stormTopology,
Map<String, SpoutSpec> spouts,
Map<String, Bolt> bolts) throws Exception {
// Add graph of nodes in the topology
final Map<String, Referenceable> nodeEntities = new HashMap<>();
addSpouts(spouts, nodeEntities);
addBolts(bolts, nodeEntities);
addGraphConnections(stormTopology, nodeEntities);
ArrayList<Referenceable> nodes = new ArrayList<>();
nodes.addAll(nodeEntities.values());
return nodes;
}
private void addSpouts(Map<String, SpoutSpec> spouts,
Map<String, Referenceable> nodeEntities) throws IllegalAccessException {
for (Map.Entry<String, SpoutSpec> entry : spouts.entrySet()) {
final String spoutName = entry.getKey();
Referenceable spoutReferenceable = createSpoutInstance(
spoutName, entry.getValue());
nodeEntities.put(spoutName, spoutReferenceable);
}
}
private Referenceable createSpoutInstance(String spoutName,
SpoutSpec stormSpout) throws IllegalAccessException {
Referenceable spoutReferenceable = new Referenceable(
StormDataTypes.STORM_SPOUT.getName(), "DataProducer");
spoutReferenceable.set("name", spoutName);
Serializable instance = Utils.javaDeserialize(
stormSpout.get_spout_object().get_serialized_java(), Serializable.class);
spoutReferenceable.set("driverClass", instance.getClass().getName());
Map<String, String> flatConfigMap = StormTopologyUtil.getFieldValues(instance, true);
spoutReferenceable.set("conf", flatConfigMap);
return spoutReferenceable;
}
private void addBolts(Map<String, Bolt> bolts,
Map<String, Referenceable> nodeEntities) throws IllegalAccessException {
for (Map.Entry<String, Bolt> entry : bolts.entrySet()) {
Referenceable boltInstance = createBoltInstance(entry.getKey(), entry.getValue());
nodeEntities.put(entry.getKey(), boltInstance);
}
}
private Referenceable createBoltInstance(String boltName,
Bolt stormBolt) throws IllegalAccessException {
Referenceable boltReferenceable = new Referenceable(
StormDataTypes.STORM_BOLT.getName(), "DataProcessor");
boltReferenceable.set("name", boltName);
Serializable instance = Utils.javaDeserialize(
stormBolt.get_bolt_object().get_serialized_java(), Serializable.class);
boltReferenceable.set("driverClass", instance.getClass().getName());
Map<String, String> flatConfigMap = StormTopologyUtil.getFieldValues(instance, true);
boltReferenceable.set("conf", flatConfigMap);
return boltReferenceable;
}
private void addGraphConnections(StormTopology stormTopology,
Map<String, Referenceable> nodeEntities) throws Exception {
// adds connections between spouts and bolts
Map<String, Set<String>> adjacencyMap =
StormTopologyUtil.getAdjacencyMap(stormTopology, true);
for (Map.Entry<String, Set<String>> entry : adjacencyMap.entrySet()) {
String nodeName = entry.getKey();
Set<String> adjacencyList = adjacencyMap.get(nodeName);
if (adjacencyList == null || adjacencyList.isEmpty()) {
continue;
}
// add outgoing links
Referenceable node = nodeEntities.get(nodeName);
ArrayList<String> outputs = new ArrayList<>(adjacencyList.size());
outputs.addAll(adjacencyList);
node.set("outputs", outputs);
// add incoming links
for (String adjacentNodeName : adjacencyList) {
Referenceable adjacentNode = nodeEntities.get(adjacentNodeName);
@SuppressWarnings("unchecked")
ArrayList<String> inputs = (ArrayList<String>) adjacentNode.get("inputs");
if (inputs == null) {
inputs = new ArrayList<>();
}
inputs.add(nodeName);
adjacentNode.set("inputs", inputs);
}
}
}
public static String getKafkaTopicQualifiedName(String clusterName, String topicName) {
return String.format("%s@%s", topicName, clusterName);
}
public static String getHbaseTableQualifiedName(String clusterName, String nameSpace, String tableName) {
return String.format("%s.%s@%s", nameSpace, tableName, clusterName);
}
public synchronized void registerDataModel(HiveDataModelGenerator dataModelGenerator) throws AtlasException,
AtlasServiceException {
try {
atlasClient.getType(HiveDataTypes.HIVE_PROCESS.getName());
LOG.info("Hive data model is already registered! Going ahead with registration of Storm Data model");
} catch(AtlasServiceException ase) {
if (ase.getStatus() == ClientResponse.Status.NOT_FOUND) {
//Expected in case types do not exist
LOG.info("Registering Hive data model");
atlasClient.createType(dataModelGenerator.getModelAsJson());
} else {
throw ase;
}
}
try {
atlasClient.getType(StormDataTypes.STORM_TOPOLOGY.getName());
} catch(AtlasServiceException ase) {
if (ase.getStatus() == ClientResponse.Status.NOT_FOUND) {
LOG.info("Registering Storm/Kafka data model");
StormDataModel.main(new String[]{});
TypesDef typesDef = StormDataModel.typesDef();
String stormTypesAsJSON = TypesSerialization.toJson(typesDef);
LOG.info("stormTypesAsJSON = {}", stormTypesAsJSON);
atlasClient.createType(stormTypesAsJSON);
}
}
typesRegistered = true;
}
private String getClusterName(Map stormConf) {
String clusterName = AtlasConstants.DEFAULT_CLUSTER_NAME;
if (stormConf.containsKey(AtlasConstants.CLUSTER_NAME_KEY)) {
clusterName = (String)stormConf.get(AtlasConstants.CLUSTER_NAME_KEY);
}
return clusterName;
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.storm.hook;
import backtype.storm.generated.Bolt;
import backtype.storm.generated.GlobalStreamId;
import backtype.storm.generated.Grouping;
import backtype.storm.generated.StormTopology;
import com.google.common.base.Joiner;
import org.slf4j.Logger;
import java.lang.reflect.Field;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
/**
* A storm topology utility class.
*/
public final class StormTopologyUtil {
private StormTopologyUtil() {
}
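// A terminal user bolt is one whose output feeds no other user component:
// collect every component named as an input of some user bolt, then return
// the user bolts that never appear in that input set.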
public static Set<String> getTerminalUserBoltNames(StormTopology topology) throws Exception {
Set<String> terminalBolts = new HashSet<>();
Set<String> inputs = new HashSet<>();
for (Map.Entry<String, Bolt> entry : topology.get_bolts().entrySet()) {
String name = entry.getKey();
Set<GlobalStreamId> inputsForBolt = entry.getValue().get_common().get_inputs().keySet();
if (!isSystemComponent(name)) {
for (GlobalStreamId streamId : inputsForBolt) {
inputs.add(streamId.get_componentId());
}
}
}
for (String boltName : topology.get_bolts().keySet()) {
if (!isSystemComponent(boltName) && !inputs.contains(boltName)) {
terminalBolts.add(boltName);
}
}
return terminalBolts;
}
public static boolean isSystemComponent(String componentName) {
return componentName.startsWith("__");
}
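// Builds a map from each component to the set of components it feeds, derived
// from every bolt's declared inputs; when removeSystemComponent is true,
// Storm's internal "__" components are dropped from both keys and values.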
public static Map<String, Set<String>> getAdjacencyMap(StormTopology topology,
boolean removeSystemComponent)
throws Exception {
Map<String, Set<String>> adjacencyMap = new HashMap<>();
for (Map.Entry<String, Bolt> entry : topology.get_bolts().entrySet()) {
String boltName = entry.getKey();
Map<GlobalStreamId, Grouping> inputs = entry.getValue().get_common().get_inputs();
for (Map.Entry<GlobalStreamId, Grouping> input : inputs.entrySet()) {
String inputComponentId = input.getKey().get_componentId();
Set<String> components = adjacencyMap.containsKey(inputComponentId)
? adjacencyMap.get(inputComponentId) : new HashSet<String>();
components.add(boltName);
components = removeSystemComponent ? removeSystemComponents(components)
: components;
if ((removeSystemComponent && !isSystemComponent(inputComponentId)) ||
!removeSystemComponent) {
adjacencyMap.put(inputComponentId, components);
}
}
}
return adjacencyMap;
}
public static Set<String> removeSystemComponents(Set<String> components) {
Set<String> userComponents = new HashSet<>();
for (String component : components) {
if (!isSystemComponent(component)) {
userComponents.add(component);
}
}
return userComponents;
}
private static final Set<Class> WRAPPER_TYPES = new HashSet<Class>() {{
add(Boolean.class);
add(Character.class);
add(Byte.class);
add(Short.class);
add(Integer.class);
add(Long.class);
add(Float.class);
add(Double.class);
add(Void.class);
add(String.class);
}};
public static boolean isWrapperType(Class clazz) {
return WRAPPER_TYPES.contains(clazz);
}
public static boolean isCollectionType(Class clazz) {
return Collection.class.isAssignableFrom(clazz);
}
public static boolean isMapType(Class clazz) {
return Map.class.isAssignableFrom(clazz);
}
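// Reflectively flattens an instance's non-static fields, walking up the class
// hierarchy: primitives/wrappers become strings, maps and nested objects are
// flattened into dotted keys, and collections are joined with commas.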
public static Map<String, String> getFieldValues(Object instance,
boolean prependClassName)
throws IllegalAccessException {
Class clazz = instance.getClass();
Map<String, String> output = new HashMap<>();
for (Class<?> c = clazz; c != null; c = c.getSuperclass()) {
Field[] fields = c.getDeclaredFields();
for (Field field : fields) {
if (java.lang.reflect.Modifier.isStatic(field.getModifiers())) {
continue;
}
String key;
if (prependClassName) {
key = String.format("%s.%s", clazz.getSimpleName(), field.getName());
} else {
key = field.getName();
}
boolean accessible = field.isAccessible();
if (!accessible) {
field.setAccessible(true);
}
Object fieldVal = field.get(instance);
if (fieldVal == null) {
continue;
} else if (fieldVal.getClass().isPrimitive() ||
isWrapperType(fieldVal.getClass())) {
String fieldStr = toString(fieldVal, false);
if (fieldStr.isEmpty()) continue;
output.put(key, fieldStr);
} else if (isMapType(fieldVal.getClass())) {
//TODO: check if it makes more sense to just stick to json
// like structure instead of a flatten output.
Map map = (Map) fieldVal;
for (Object entry : map.entrySet()) {
Object mapKey = ((Map.Entry) entry).getKey();
Object mapVal = ((Map.Entry) entry).getValue();
String keyStr = getString(mapKey, false);
String valStr = getString(mapVal, false);
if ((valStr == null) || (valStr.isEmpty())) {
continue;
} else {
output.put(String.format("%s.%s", key, keyStr), valStr);
}
}
} else if (isCollectionType(fieldVal.getClass())) {
//TODO check if it makes more sense to just stick to
// json like structure instead of a flatten output.
Collection collection = (Collection) fieldVal;
if (collection.isEmpty()) continue;
String outStr = "";
for (Object o : collection) {
outStr += getString(o, false) + ",";
}
if (outStr.length() > 0) {
outStr = outStr.substring(0, outStr.length() - 1);
}
output.put(key, String.format("%s", outStr));
} else {
Map<String, String> nestedFieldValues = getFieldValues(fieldVal, false);
for (Map.Entry<String, String> entry : nestedFieldValues.entrySet()) {
output.put(String.format("%s.%s", key, entry.getKey()), entry.getValue());
}
}
if (!accessible) {
field.setAccessible(false);
}
}
}
return output;
}
private static String getString(Object instance,
boolean wrapWithQuote) throws IllegalAccessException {
if (instance == null) {
return null;
} else if (instance.getClass().isPrimitive() || isWrapperType(instance.getClass())) {
return toString(instance, wrapWithQuote);
} else {
return getString(getFieldValues(instance, false), wrapWithQuote);
}
}
private static String getString(Map<String, String> flattenFields, boolean wrapWithQuote) {
String outStr = "";
if (flattenFields != null && !flattenFields.isEmpty()) {
if (wrapWithQuote) {
outStr += "\"" + Joiner.on(",").join(flattenFields.entrySet()) + "\",";
} else {
outStr += Joiner.on(",").join(flattenFields.entrySet()) + ",";
}
}
if (outStr.length() > 0) {
outStr = outStr.substring(0, outStr.length() - 1);
}
return outStr;
}
private static String toString(Object instance, boolean wrapWithQuote) {
if (instance instanceof String && wrapWithQuote) {
return "\"" + instance + "\"";
}
return instance.toString();
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.storm.hook;
import backtype.storm.ILocalCluster;
import backtype.storm.generated.StormTopology;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.hive.model.HiveDataModelGenerator;
import org.apache.atlas.storm.model.StormDataModel;
import org.apache.atlas.storm.model.StormDataTypes;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.TypesDef;
import org.apache.atlas.typesystem.json.TypesSerialization;
import org.apache.commons.configuration.Configuration;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@Test
public class StormAtlasHookIT {
public static final Logger LOG = LoggerFactory.getLogger(StormAtlasHookIT.class);
private static final String ATLAS_URL = "http://localhost:21000/";
private static final String TOPOLOGY_NAME = "word-count";
private ILocalCluster stormCluster;
private AtlasClient atlasClient;
@BeforeClass
public void setUp() throws Exception {
// start a local storm cluster
stormCluster = StormTestUtil.createLocalStormCluster();
LOG.info("Created a storm local cluster");
Configuration configuration = ApplicationProperties.get();
atlasClient = new AtlasClient(configuration.getString("atlas.rest.address", ATLAS_URL));
}
@AfterClass
public void tearDown() throws Exception {
LOG.info("Shutting down storm local cluster");
stormCluster.shutdown();
atlasClient = null;
}
@Test
public void testCreateDataModel() throws Exception {
StormDataModel.main(new String[]{});
TypesDef stormTypesDef = StormDataModel.typesDef();
String stormTypesAsJSON = TypesSerialization.toJson(stormTypesDef);
LOG.info("stormTypesAsJSON = {}", stormTypesAsJSON);
new StormAtlasHook().registerDataModel(new HiveDataModelGenerator());
// verify types are registered
for (StormDataTypes stormDataType : StormDataTypes.values()) {
Assert.assertNotNull(atlasClient.getType(stormDataType.getName()));
}
}
@Test (dependsOnMethods = "testCreateDataModel")
public void testAddEntities() throws Exception {
StormTopology stormTopology = StormTestUtil.createTestTopology();
StormTestUtil.submitTopology(stormCluster, TOPOLOGY_NAME, stormTopology);
LOG.info("Submitted topology {}", TOPOLOGY_NAME);
// todo: test if topology metadata is registered in atlas
String guid = getTopologyGUID();
Assert.assertNotNull(guid);
LOG.info("GUID is {}", guid);
Referenceable topologyReferenceable = atlasClient.getEntity(guid);
Assert.assertNotNull(topologyReferenceable);
}
private String getTopologyGUID() throws Exception {
LOG.debug("Searching for topology {}", TOPOLOGY_NAME);
String query = String.format("from %s where name = \"%s\"",
StormDataTypes.STORM_TOPOLOGY.getName(), TOPOLOGY_NAME);
JSONArray results = atlasClient.search(query);
JSONObject row = results.getJSONObject(0);
return row.has("$id$") ? row.getJSONObject("$id$").getString("id") : null;
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.storm.hook;
import com.sun.jersey.api.client.ClientResponse;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasException;
import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.hive.model.HiveDataModelGenerator;
import org.apache.atlas.hive.model.HiveDataTypes;
import org.apache.atlas.storm.model.StormDataTypes;
import org.testng.annotations.Test;
import static org.mockito.Matchers.contains;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@Test
public class StormAtlasHookTest {
@Test
public void testStormRegistersHiveDataModelIfNotPresent() throws AtlasException, AtlasServiceException {
AtlasClient atlasClient = mock(AtlasClient.class);
HiveDataModelGenerator dataModelGenerator = mock(HiveDataModelGenerator.class);
AtlasServiceException atlasServiceException = mock(AtlasServiceException.class);
when(atlasServiceException.getStatus()).thenReturn(ClientResponse.Status.NOT_FOUND);
when(atlasClient.getType(HiveDataTypes.HIVE_PROCESS.getName())).thenThrow(atlasServiceException);
String hiveModel = "{hive_model_as_json}";
when(dataModelGenerator.getModelAsJson()).thenReturn(hiveModel);
StormAtlasHook stormAtlasHook = new StormAtlasHook(atlasClient);
stormAtlasHook.registerDataModel(dataModelGenerator);
verify(atlasClient).createType(hiveModel);
}
@Test
public void testStormRegistersStormModelIfNotPresent() throws AtlasServiceException, AtlasException {
AtlasClient atlasClient = mock(AtlasClient.class);
HiveDataModelGenerator dataModelGenerator = mock(HiveDataModelGenerator.class);
when(atlasClient.getType(HiveDataTypes.HIVE_PROCESS.getName())).thenReturn("hive_process_definition");
AtlasServiceException atlasServiceException = mock(AtlasServiceException.class);
when(atlasServiceException.getStatus()).thenReturn(ClientResponse.Status.NOT_FOUND);
when(atlasClient.getType(StormDataTypes.STORM_TOPOLOGY.getName())).thenThrow(atlasServiceException);
StormAtlasHook stormAtlasHook = new StormAtlasHook(atlasClient);
stormAtlasHook.registerDataModel(dataModelGenerator);
verify(atlasClient).createType(contains("storm_topology"));
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.storm.hook;
import backtype.storm.Config;
import backtype.storm.ILocalCluster;
import backtype.storm.Testing;
import backtype.storm.generated.StormTopology;
import backtype.storm.testing.TestGlobalCount;
import backtype.storm.testing.TestWordCounter;
import backtype.storm.testing.TestWordSpout;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.utils.Utils;
import java.util.HashMap;
/**
* A utility to create a test topology.
*/
final class StormTestUtil {
private StormTestUtil() {
}
public static ILocalCluster createLocalStormCluster() {
// start a local storm cluster
HashMap<String,Object> localClusterConf = new HashMap<>();
localClusterConf.put("nimbus-daemon", true);
return Testing.getLocalCluster(localClusterConf);
}
public static StormTopology createTestTopology() {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("words", new TestWordSpout(), 10);
builder.setBolt("count", new TestWordCounter(), 3).shuffleGrouping("words");
builder.setBolt("globalCount", new TestGlobalCount(), 2).shuffleGrouping("count");
return builder.createTopology();
}
public static Config submitTopology(ILocalCluster stormCluster, String topologyName,
StormTopology stormTopology) throws Exception {
Config stormConf = new Config();
stormConf.putAll(Utils.readDefaultConfig());
stormConf.setDebug(true);
stormConf.setMaxTaskParallelism(3);
stormConf.put(Config.STORM_TOPOLOGY_SUBMISSION_NOTIFIER_PLUGIN,
org.apache.atlas.storm.hook.StormAtlasHook.class.getName());
stormCluster.submitTopology(topologyName, stormConf, stormTopology);
Thread.sleep(10000);
return stormConf;
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas;
public interface AtlasConstants {
String CLUSTER_NAME_KEY = "atlas.cluster.name";
String DEFAULT_CLUSTER_NAME = "primary";
String CLUSTER_NAME_ATTRIBUTE = "clusterName";
}
......@@ -110,6 +110,12 @@
<directory>../addons/sqoop-bridge/target/dependency/hook</directory>
<outputDirectory>hook</outputDirectory>
</fileSet>
<!-- addons/storm -->
<fileSet>
<directory>../addons/storm-bridge/target/dependency/hook</directory>
<outputDirectory>hook</outputDirectory>
</fileSet>
</fileSets>
<files>
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.hook;
import com.google.inject.Guice;
import com.google.inject.Inject;
import com.google.inject.Injector;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.notification.NotificationInterface;
import org.apache.atlas.notification.NotificationModule;
import org.apache.atlas.notification.hook.HookNotification;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.json.InstanceSerialization;
import org.apache.commons.configuration.Configuration;
import org.codehaus.jettison.json.JSONArray;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
/**
* A base class for atlas hooks.
*/
public abstract class AtlasHook {
private static final Logger LOG = LoggerFactory.getLogger(AtlasHook.class);
private static final String DEFAULT_ATLAS_URL = "http://localhost:21000/";
public static final String ATLAS_ENDPOINT = "atlas.rest.address";
protected final AtlasClient atlasClient;
/**
* Atlas client-side configuration properties.
*/
protected static Configuration atlasProperties;
@Inject
protected static NotificationInterface notifInterface;
static {
try {
atlasProperties = ApplicationProperties.get(ApplicationProperties.CLIENT_PROPERTIES);
} catch (Exception e) {
LOG.info("Attempting to send msg while shutdown in progress.", e);
}
Injector injector = Guice.createInjector(new NotificationModule());
notifInterface = injector.getInstance(NotificationInterface.class);
LOG.info("Created Atlas Hook");
}
public AtlasHook() {
this(new AtlasClient(atlasProperties.getString(ATLAS_ENDPOINT, DEFAULT_ATLAS_URL)));
}
public AtlasHook(AtlasClient atlasClient) {
this.atlasClient = atlasClient;
//TODO - take care of passing in - ugi, doAsUser for secure cluster
}
protected abstract String getNumberOfRetriesPropertyKey();
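// Serializes the entities to JSON, wraps them in a single EntityCreateRequest
// message and hands it to the retrying notification path below.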
protected void notifyEntities(Collection<Referenceable> entities) {
JSONArray entitiesArray = new JSONArray();
for (Referenceable entity : entities) {
LOG.info("Adding entity for type: {}", entity.getTypeName());
final String entityJson = InstanceSerialization.toJson(entity, true);
entitiesArray.put(entityJson);
}
List<HookNotification.HookNotificationMessage> hookNotificationMessages = new ArrayList<>();
hookNotificationMessages.add(new HookNotification.EntityCreateRequest(entitiesArray));
notifyEntities(hookNotificationMessages);
}
/**
* Notifies Atlas of the entities through a message. An entity can be a
* complex entity with references to other entities.
* De-duping of entities is done on the server side based on the
* unique attributes of the entities.
*
* @param entities entities
*/
protected void notifyEntities(List<HookNotification.HookNotificationMessage> entities) {
final int maxRetries = atlasProperties.getInt(getNumberOfRetriesPropertyKey(), 3);
final String message = entities.toString();
int numRetries = 0;
while (true) {
try {
notifInterface.send(NotificationInterface.NotificationType.HOOK, entities);
return;
} catch(Exception e) {
numRetries++;
if(numRetries < maxRetries) {
LOG.debug("Failed to notify atlas for entity {}. Retrying", message, e);
} else {
LOG.error("Failed to notify atlas for entity {} after {} retries. Quitting",
message, maxRetries, e);
return;
}
}
}
}
}
......@@ -32,8 +32,7 @@ public class KafkaNotificationProvider implements Provider<KafkaNotification> {
public KafkaNotification get() {
try {
Configuration applicationProperties = ApplicationProperties.get();
KafkaNotification instance = new KafkaNotification(applicationProperties);
return instance;
return new KafkaNotification(applicationProperties);
} catch(AtlasException e) {
throw new RuntimeException(e);
}
......
......@@ -6,6 +6,7 @@ INCOMPATIBLE CHANGES:
ATLAS-379 Create sqoop and falcon metadata addons (venkatnrangan,bvellanki,sowmyaramesh via shwethags)
ALL CHANGES:
ATLAS-183 Add a Hook in Storm to post the topology metadata (svenkat,yhemanth via shwethags)
ATLAS-370 Implement deleteEntities at repository level (dkantor via shwethags)
ATLAS-406 Resizing lineage window – should be an anchor on a corner – like ppt for graphic (sanjayp via shwethags)
ATLAS-432 QuickStart lineage is broken (yhemanth via shwethags)
......