Commit ca905722 by Aaron Niskode-Dossett

Merge pull request #78 from hortonworks/lineage

DGI-1 and DGI-2 Fixes for HiveLineageService
parents 4543c837 71fc73fc
@@ -192,7 +192,7 @@ public class HiveMetaStoreBridge {
         //todo DSL support for reference doesn't work. is the usage right?
         // String query = String.format("%s where dbName = \"%s\" and tableName = \"%s\"", typeName, dbRef.getId().id,
         // tableName);
-        String query = String.format("%s where tableName = \"%s\"", typeName, tableName);
+        String query = String.format("%s where name = \"%s\"", typeName, tableName);
         JSONArray results = dgiClient.searchByDSL(query);
         if (results.length() == 0) {
             return null;
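The DSL predicate must use the attribute name the type actually declares; the model change in HiveDataModelGenerator below renames the hive_table attribute from tableName to name, so the query string changes with it. A minimal sketch of the lookup pattern, assuming only the searchByDSL(String) -> JSONArray call shown above (entityExists is an illustrative helper name, not part of this commit):

    // Illustrative helper: does any instance of typeName with the given "name" exist?
    private boolean entityExists(String typeName, String name) throws Exception {
        String query = String.format("%s where name = \"%s\"", typeName, name);
        JSONArray results = dgiClient.searchByDSL(query); // same client call as in getTableReference()
        return results.length() > 0;
    }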
@@ -223,6 +223,7 @@ public class HiveMetaStoreBridge {
     }

     public Referenceable registerTable(Referenceable dbReference, String dbName, String tableName) throws Exception {
+        LOG.info("Attempting to register table [" + tableName + "]");
         Referenceable tableRef = getTableReference(dbReference, tableName);
         if (tableRef == null) {
             LOG.info("Importing objects from " + dbName + "." + tableName);
@@ -230,7 +231,7 @@ public class HiveMetaStoreBridge {
             Table hiveTable = hiveClient.getTable(dbName, tableName);

             tableRef = new Referenceable(HiveDataTypes.HIVE_TABLE.getName());
-            tableRef.set("tableName", hiveTable.getTableName());
+            tableRef.set("name", hiveTable.getTableName());
             tableRef.set("owner", hiveTable.getOwner());
             //todo fix
             tableRef.set("createTime", hiveTable.getLastAccessTime());
@@ -274,8 +275,8 @@ public class HiveMetaStoreBridge {
             tableRef.set("tableType", hiveTable.getTableType());
             tableRef.set("temporary", hiveTable.isTemporary());

-            // List<Referenceable> fieldsList = getColumns(storageDesc);
-            // tableRef.set("columns", fieldsList);
+            List<Referenceable> colList = getColumns(hiveTable.getAllCols());
+            tableRef.set("columns", colList);

             tableRef = createInstance(tableRef);
         } else {
@@ -397,7 +398,7 @@ public class HiveMetaStoreBridge {
         }
         */

-        List<Referenceable> fieldsList = getColumns(storageDesc);
+        List<Referenceable> fieldsList = getColumns(storageDesc.getCols());
         sdReferenceable.set("cols", fieldsList);

         List<Struct> sortColsStruct = new ArrayList<>();
@@ -428,19 +429,19 @@ public class HiveMetaStoreBridge {
         return createInstance(sdReferenceable);
     }

-    private List<Referenceable> getColumns(StorageDescriptor storageDesc) throws Exception {
-        List<Referenceable> fieldsList = new ArrayList<>();
-        Referenceable colReferenceable;
-        for (FieldSchema fs : storageDesc.getCols()) {
+    private List<Referenceable> getColumns(List<FieldSchema> schemaList) throws Exception
+    {
+        List<Referenceable> colList = new ArrayList<>();
+        for (FieldSchema fs : schemaList) {
             LOG.debug("Processing field " + fs);
-            colReferenceable = new Referenceable(HiveDataTypes.HIVE_COLUMN.getName());
+            Referenceable colReferenceable = new Referenceable(HiveDataTypes.HIVE_COLUMN.getName());
             colReferenceable.set("name", fs.getName());
             colReferenceable.set("type", fs.getType());
             colReferenceable.set("comment", fs.getComment());
-            fieldsList.add(createInstance(colReferenceable));
+            colList.add(createInstance(colReferenceable));
         }
-        return fieldsList;
+        return colList;
     }

     public synchronized void registerHiveDataModel() throws Exception {
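With column registration re-enabled, the refactored getColumns(List<FieldSchema>) serves both call sites in this file; condensed, with the variable names used in the hunks above:

    // Table-level columns, taken from the Hive table itself (hunk @@ -274,8 +275,8 @@):
    List<Referenceable> colList = getColumns(hiveTable.getAllCols());
    tableRef.set("columns", colList);

    // Storage-descriptor columns (hunk @@ -397,7 +398,7 @@):
    List<Referenceable> fieldsList = getColumns(storageDesc.getCols());
    sdReferenceable.set("cols", fieldsList);

Passing the List<FieldSchema> rather than the whole StorageDescriptor is what lets one method register columns from either source.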
...
@@ -246,7 +246,7 @@ public class HiveHook implements ExecuteWithHookContext, HiveSemanticAnalyzerHoo
         LOG.debug("Registering CTAS query: {}", queryStr);
         Referenceable processReferenceable = new Referenceable(HiveDataTypes.HIVE_PROCESS.getName());
-        processReferenceable.set("processName", operation.getOperationName());
+        processReferenceable.set("name", operation.getOperationName());
         processReferenceable.set("startTime", queryStartTime);
         processReferenceable.set("userName", user);
         List<Referenceable> source = new ArrayList<>();
...
@@ -367,7 +367,7 @@ public class HiveDataModelGenerator {
     private void createTableClass() throws MetadataException {
         AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{
-                new AttributeDefinition("tableName", DataTypes.STRING_TYPE.getName(),
+                new AttributeDefinition("name", DataTypes.STRING_TYPE.getName(),
                         Multiplicity.REQUIRED, false, null),
                 new AttributeDefinition("dbName", HiveDataTypes.HIVE_DB.getName(),
                         Multiplicity.REQUIRED, false, null),
...@@ -384,9 +384,9 @@ public class HiveDataModelGenerator { ...@@ -384,9 +384,9 @@ public class HiveDataModelGenerator {
new AttributeDefinition("partitionKeys", new AttributeDefinition("partitionKeys",
DataTypes.arrayTypeName(HiveDataTypes.HIVE_COLUMN.getName()), DataTypes.arrayTypeName(HiveDataTypes.HIVE_COLUMN.getName()),
Multiplicity.OPTIONAL, false, null), Multiplicity.OPTIONAL, false, null),
// new AttributeDefinition("columns", new AttributeDefinition("columns",
// DataTypes.arrayTypeName(HiveDataTypes.HIVE_COLUMN.getName()), DataTypes.arrayTypeName(HiveDataTypes.HIVE_COLUMN.getName()),
// Multiplicity.COLLECTION, true, null), Multiplicity.OPTIONAL, true, null),
new AttributeDefinition("parameters", STRING_MAP_TYPE.getName(), new AttributeDefinition("parameters", STRING_MAP_TYPE.getName(),
Multiplicity.OPTIONAL, false, null), Multiplicity.OPTIONAL, false, null),
new AttributeDefinition("viewOriginalText", DataTypes.STRING_TYPE.getName(), new AttributeDefinition("viewOriginalText", DataTypes.STRING_TYPE.getName(),
@@ -480,7 +480,7 @@ public class HiveDataModelGenerator {
     private void createProcessClass() throws MetadataException {
         AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{
-                new AttributeDefinition("processName", DataTypes.STRING_TYPE.getName(),
+                new AttributeDefinition("name", DataTypes.STRING_TYPE.getName(),
                         Multiplicity.REQUIRED, false, null),
                 new AttributeDefinition("startTime", DataTypes.INT_TYPE.getName(),
                         Multiplicity.REQUIRED, false, null),
...
@@ -7,7 +7,7 @@ Hive metadata can be modelled in DGI using its Type System. The default modellin
    * hive_order(StructType) - [col, order]
    * hive_resourceuri(StructType) - [resourceType, uri]
    * hive_serde(StructType) - [name, serializationLib, parameters]
-   * hive_process(ClassType) - [processName, startTime, endTime, userName, sourceTableNames, targetTableNames, queryText, queryPlan, queryId, queryGraph]
+   * hive_process(ClassType) - [name, startTime, endTime, userName, sourceTableNames, targetTableNames, queryText, queryPlan, queryId, queryGraph]
    * hive_function(ClassType) - [functionName, dbName, className, ownerName, ownerType, createTime, functionType, resourceUris]
    * hive_type(ClassType) - [name, type1, type2, fields]
    * hive_partition(ClassType) - [values, dbName, tableName, createTime, lastAccessTime, sd, parameters]
@@ -16,7 +16,7 @@ Hive metadata can be modelled in DGI using its Type System. The default modellin
    * hive_role(ClassType) - [roleName, createTime, ownerName]
    * hive_column(ClassType) - [name, type, comment]
    * hive_db(ClassType) - [name, description, locationUri, parameters, ownerName, ownerType]
-   * hive_table(ClassType) - [tableName, dbName, owner, createTime, lastAccessTime, retention, sd, partitionKeys, parameters, viewOriginalText, viewExpandedText, tableType, temporary]
+   * hive_table(ClassType) - [name, dbName, owner, createTime, lastAccessTime, retention, sd, partitionKeys, columns, parameters, viewOriginalText, viewExpandedText, tableType, temporary]

 ---++ Importing Hive Metadata
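Together the renames make table and process lookups uniform on name. For an illustrative table called sales_fact (a hypothetical name, not from this commit), queries now take this shape:

    // Lookup as built by HiveMetaStoreBridge.getTableReference():
    String tableLookup = "hive_table where name = \"sales_fact\"";

    // Schema query as built by HiveLineageService.getSchema() (hunk further down),
    // selecting the newly modelled columns attribute:
    String schemaQuery = "hive_table where name = \"sales_fact\", columns";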
...
@@ -106,7 +106,7 @@ public class HiveHookIT {
     }

     private void assertTableIsRegistered(String tableName) throws Exception {
-        assertInstanceIsRegistered(HiveDataTypes.HIVE_TABLE.getName(), "tableName", tableName);
+        assertInstanceIsRegistered(HiveDataTypes.HIVE_TABLE.getName(), "name", tableName);
     }

     private void assertDatabaseIsRegistered(String dbName) throws Exception {
...
@@ -236,7 +236,7 @@ public class SSLAndKerberosHiveHookIT extends BaseSSLAndKerberosTest {
     }

     private void assertTableIsRegistered(String tableName) throws Exception {
-        assertInstanceIsRegistered(HiveDataTypes.HIVE_TABLE.getName(), "tableName", tableName);
+        assertInstanceIsRegistered(HiveDataTypes.HIVE_TABLE.getName(), "name", tableName);
     }

     private void assertDatabaseIsRegistered(String dbName) throws Exception {
...
@@ -239,7 +239,7 @@ public class SSLHiveHookIT {
     }

     private void assertTableIsRegistered(String tableName) throws Exception {
-        assertInstanceIsRegistered(HiveDataTypes.HIVE_TABLE.getName(), "tableName", tableName);
+        assertInstanceIsRegistered(HiveDataTypes.HIVE_TABLE.getName(), "name", tableName);
     }

     private void assertDatabaseIsRegistered(String dbName) throws Exception {
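All three suites assert registration the same way. A plausible shape for the shared helper — the actual implementation is not part of this diff, and the client field name is assumed:

    private void assertInstanceIsRegistered(String typeName, String colName, String colValue) throws Exception {
        // Mirror the bridge's DSL lookup and expect exactly one registered instance.
        JSONArray results = dgiClient.searchByDSL(
                String.format("%s where %s = \"%s\"", typeName, colName, colValue));
        Assert.assertEquals(results.length(), 1);
    }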
...
@@ -100,17 +100,18 @@ public class HiveLineageService implements LineageService {
     public String getOutputs(String tableName) throws DiscoveryException {
         LOG.info("Fetching lineage outputs for tableName={}", tableName);

+        HiveWhereUsedQuery outputsQuery = new HiveWhereUsedQuery(
+                HIVE_TABLE_TYPE_NAME, tableName, HIVE_PROCESS_TYPE_NAME,
+                HIVE_PROCESS_INPUT_ATTRIBUTE_NAME, HIVE_PROCESS_OUTPUT_ATTRIBUTE_NAME,
+                Option.empty(), SELECT_ATTRIBUTES, true,
+                graphPersistenceStrategy, titanGraph);
+
+        Expressions.Expression expression = outputsQuery.expr();
+        LOG.debug("Expression is [" + expression.toString() +"]");
         try {
-            HiveWhereUsedQuery outputsQuery = new HiveWhereUsedQuery(
-                    HIVE_TABLE_TYPE_NAME, tableName, HIVE_PROCESS_TYPE_NAME,
-                    HIVE_PROCESS_INPUT_ATTRIBUTE_NAME, HIVE_PROCESS_OUTPUT_ATTRIBUTE_NAME,
-                    Option.empty(), SELECT_ATTRIBUTES, true,
-                    graphPersistenceStrategy, titanGraph);
-            Expressions.Expression expression = outputsQuery.expr();
             return discoveryService.evaluate(expression).toJson();
         } catch (Exception e) { // unable to catch ExpressionException
-            throw new DiscoveryException("Invalid expression", e);
+            throw new DiscoveryException("Invalid expression [" + expression.toString() + "]", e);
         }
     }
@@ -124,17 +125,18 @@ public class HiveLineageService implements LineageService {
     public String getInputs(String tableName) throws DiscoveryException {
         LOG.info("Fetching lineage inputs for tableName={}", tableName);

-        try {
-            HiveLineageQuery inputsQuery = new HiveLineageQuery(
-                    HIVE_TABLE_TYPE_NAME, tableName, HIVE_PROCESS_TYPE_NAME,
-                    HIVE_PROCESS_INPUT_ATTRIBUTE_NAME, HIVE_PROCESS_OUTPUT_ATTRIBUTE_NAME,
-                    Option.empty(), SELECT_ATTRIBUTES, true,
-                    graphPersistenceStrategy, titanGraph);
-            Expressions.Expression expression = inputsQuery.expr();
+        HiveLineageQuery inputsQuery = new HiveLineageQuery(
+                HIVE_TABLE_TYPE_NAME, tableName, HIVE_PROCESS_TYPE_NAME,
+                HIVE_PROCESS_INPUT_ATTRIBUTE_NAME, HIVE_PROCESS_OUTPUT_ATTRIBUTE_NAME,
+                Option.empty(), SELECT_ATTRIBUTES, true,
+                graphPersistenceStrategy, titanGraph);
+
+        Expressions.Expression expression = inputsQuery.expr();
+        LOG.debug("Expression is [" + expression.toString() +"]");
+        try {
             return discoveryService.evaluate(expression).toJson();
         } catch (Exception e) { // unable to catch ExpressionException
-            throw new DiscoveryException("Invalid expression", e);
+            throw new DiscoveryException("Invalid expression [" + expression.toString() + "]", e);
        }
     }
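In both getOutputs() and getInputs(), constructing the query and deriving its expression is hoisted out of the try block: the expression can be logged up front and embedded in the DiscoveryException message, and the catch now covers only the evaluation itself, so a failure while building the query is no longer misreported as an invalid expression.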
@@ -148,9 +150,10 @@ public class HiveLineageService implements LineageService {
     public String getSchema(String tableName) throws DiscoveryException {
         // todo - validate if indeed this is a table type and exists
         String schemaQuery = HIVE_TABLE_TYPE_NAME
-                + " where name=\"" + tableName + "\", "
-                + HIVE_TABLE_COLUMNS_ATTRIBUTE_NAME;
-        // + " as column select column.name, column.dataType, column.comment";
+                + " where name=\"" + tableName + "\""
+                + ", " + HIVE_TABLE_COLUMNS_ATTRIBUTE_NAME
+                // + " as column select column.name, column.dataType, column.comment"
+                ;
         return discoveryService.searchByDSL(schemaQuery);
     }
 }
@@ -30,12 +30,13 @@ metadata.graph.index.search.elasticsearch.create.sleep=2000
 ######### Hive Lineage Configs #########
 # This model follows the quick-start guide
-metadata.lineage.hive.table.type.name=Table
-metadata.lineage.hive.column.type.name=Column
+metadata.lineage.hive.table.type.name=hive_table
 metadata.lineage.hive.table.column.name=columns
-metadata.lineage.hive.process.type.name=LoadProcess
+metadata.lineage.hive.process.type.name=hive_process
 metadata.lineage.hive.process.inputs.name=inputTables
 metadata.lineage.hive.process.outputs.name=outputTables
+
+#Currently unused
+#metadata.lineage.hive.column.type.name=Column

 ######### Security Properties #########
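The type names in these properties must match the types the Hive bridge actually registers (hive_table and hive_process, per HiveDataTypes); the previous values Table and LoadProcess came from the quick-start example model, so lineage queries would have targeted the wrong types for metadata imported by the bridge. The column type property is commented out rather than updated because, as the new comment notes, it is currently unused.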