Commit 74bfe947 by Mandar Ambawane Committed by Sarath Subramanian

ATLAS-3335 Update Sqoop/Storm hook to use relationship attributes

parent 709db096
...@@ -70,6 +70,10 @@ public class SqoopHook extends SqoopJobDataPublisher { ...@@ -70,6 +70,10 @@ public class SqoopHook extends SqoopJobDataPublisher {
public static final String OUTPUTS = "outputs"; public static final String OUTPUTS = "outputs";
public static final String ATTRIBUTE_DB = "db"; public static final String ATTRIBUTE_DB = "db";
public static final String RELATIONSHIP_HIVE_TABLE_DB = "hive_table_db";
public static final String RELATIONSHIP_DATASET_PROCESS_INPUTS = "dataset_process_inputs";
public static final String RELATIONSHIP_PROCESS_DATASET_OUTPUTS = "process_dataset_outputs";
private static final AtlasHookImpl atlasHook; private static final AtlasHookImpl atlasHook;
static { static {
...@@ -129,7 +133,7 @@ public class SqoopHook extends SqoopJobDataPublisher { ...@@ -129,7 +133,7 @@ public class SqoopHook extends SqoopJobDataPublisher {
entHiveTable.setAttribute(AtlasClient.NAME, tableName.toLowerCase()); entHiveTable.setAttribute(AtlasClient.NAME, tableName.toLowerCase());
entHiveTable.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, qualifiedName); entHiveTable.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, qualifiedName);
entHiveTable.setAttribute(ATTRIBUTE_DB, AtlasTypeUtil.getAtlasObjectId(entHiveDb)); entHiveTable.setRelationshipAttribute(ATTRIBUTE_DB, AtlasTypeUtil.getAtlasRelatedObjectId(entHiveDb, RELATIONSHIP_HIVE_TABLE_DB));
return entHiveTable; return entHiveTable;
} }
...@@ -179,11 +183,11 @@ public class SqoopHook extends SqoopJobDataPublisher { ...@@ -179,11 +183,11 @@ public class SqoopHook extends SqoopJobDataPublisher {
List<AtlasObjectId> hiveObjects = Collections.singletonList(AtlasTypeUtil.getAtlasObjectId(entHiveTable != null ? entHiveTable : entHiveDb)); List<AtlasObjectId> hiveObjects = Collections.singletonList(AtlasTypeUtil.getAtlasObjectId(entHiveTable != null ? entHiveTable : entHiveDb));
if (isImportOperation(data)) { if (isImportOperation(data)) {
entProcess.setAttribute(SqoopHook.INPUTS, sqoopObjects); entProcess.setRelationshipAttribute(SqoopHook.INPUTS, AtlasTypeUtil.getAtlasRelatedObjectIdList(sqoopObjects, RELATIONSHIP_DATASET_PROCESS_INPUTS));
entProcess.setAttribute(SqoopHook.OUTPUTS, hiveObjects); entProcess.setRelationshipAttribute(SqoopHook.OUTPUTS, AtlasTypeUtil.getAtlasRelatedObjectIdList(hiveObjects, RELATIONSHIP_PROCESS_DATASET_OUTPUTS));
} else { } else {
entProcess.setAttribute(SqoopHook.INPUTS, hiveObjects); entProcess.setRelationshipAttribute(SqoopHook.INPUTS, AtlasTypeUtil.getAtlasRelatedObjectIdList(hiveObjects, RELATIONSHIP_DATASET_PROCESS_INPUTS));
entProcess.setAttribute(SqoopHook.OUTPUTS, sqoopObjects); entProcess.setRelationshipAttribute(SqoopHook.OUTPUTS, AtlasTypeUtil.getAtlasRelatedObjectIdList(sqoopObjects, RELATIONSHIP_PROCESS_DATASET_OUTPUTS));
} }
entProcess.setAttribute(SqoopHook.USER, data.getUser()); entProcess.setAttribute(SqoopHook.USER, data.getUser());
......
...@@ -67,6 +67,10 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook { ...@@ -67,6 +67,10 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook {
public static final String HBASE_NAMESPACE_DEFAULT = "default"; public static final String HBASE_NAMESPACE_DEFAULT = "default";
public static final String ATTRIBUTE_DB = "db"; public static final String ATTRIBUTE_DB = "db";
public static final String RELATIONSHIP_STORM_TOPOLOGY_NODES = "storm_topology_nodes";
public static final String RELATIONSHIP_DATASET_PROCESS_INPUTS = "dataset_process_inputs";
public static final String RELATIONSHIP_PROCESS_DATASET_OUTPUTS = "process_dataset_outputs";
/** /**
* This is the client-side hook that storm fires when a topology is added. * This is the client-side hook that storm fires when a topology is added.
* *
...@@ -90,7 +94,7 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook { ...@@ -90,7 +94,7 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook {
if (CollectionUtils.isNotEmpty(graphNodes)) { if (CollectionUtils.isNotEmpty(graphNodes)) {
// add the connection from topology to the graph // add the connection from topology to the graph
topology.setAttribute("nodes", AtlasTypeUtil.getAtlasObjectIds(graphNodes)); topology.setRelationshipAttribute("nodes", AtlasTypeUtil.getAtlasRelatedObjectIds(graphNodes, RELATIONSHIP_STORM_TOPOLOGY_NODES));
for (AtlasEntity graphNode : graphNodes) { for (AtlasEntity graphNode : graphNodes) {
entity.addReferredEntity(graphNode); entity.addReferredEntity(graphNode);
...@@ -144,7 +148,7 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook { ...@@ -144,7 +148,7 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook {
} }
} }
topology.setAttribute("inputs", AtlasTypeUtil.getAtlasObjectIds(inputs)); topology.setRelationshipAttribute("inputs", AtlasTypeUtil.getAtlasRelatedObjectIds(inputs, RELATIONSHIP_DATASET_PROCESS_INPUTS));
} }
private void addTopologyOutputs(StormTopology stormTopology, String topologyOwner, Map stormConf, AtlasEntity topology, AtlasEntityExtInfo entityExtInfo) { private void addTopologyOutputs(StormTopology stormTopology, String topologyOwner, Map stormConf, AtlasEntity topology, AtlasEntityExtInfo entityExtInfo) {
...@@ -162,7 +166,7 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook { ...@@ -162,7 +166,7 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook {
} }
} }
topology.setAttribute("outputs", AtlasTypeUtil.getAtlasObjectIds(outputs)); topology.setRelationshipAttribute("outputs", AtlasTypeUtil.getAtlasRelatedObjectIds(outputs, RELATIONSHIP_PROCESS_DATASET_OUTPUTS));
} }
private AtlasEntity addDataSet(String dataSetType, String topologyOwner, Serializable instance, Map stormConf, AtlasEntityExtInfo entityExtInfo) { private AtlasEntity addDataSet(String dataSetType, String topologyOwner, Serializable instance, Map stormConf, AtlasEntityExtInfo entityExtInfo) {
...@@ -272,7 +276,7 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook { ...@@ -272,7 +276,7 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook {
ret = new AtlasEntity("hive_table"); ret = new AtlasEntity("hive_table");
ret.setAttribute(AtlasClient.NAME, tblName); ret.setAttribute(AtlasClient.NAME, tblName);
ret.setAttribute(ATTRIBUTE_DB, AtlasTypeUtil.getAtlasObjectId(dbEntity)); ret.setRelationshipAttribute(ATTRIBUTE_DB, AtlasTypeUtil.getAtlasRelatedObjectId(dbEntity, "hive_table_db"));
ret.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, HiveMetaStoreBridge.getTableQualifiedName(metadataNamespace, dbName, tblName)); ret.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, HiveMetaStoreBridge.getTableQualifiedName(metadataNamespace, dbName, tblName));
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment