Commit bde6316c by Venkatesh Seetharam

BUG-38478 Hive lineage must work across process types and address namespace

parent a94aa21a
@@ -67,7 +67,7 @@ public class HiveMetaStoreBridge {
     /**
      * Construct a HiveMetaStoreBridge.
-     * @param hiveConf
+     * @param hiveConf hive conf
      */
     public HiveMetaStoreBridge(HiveConf hiveConf) throws Exception {
        clusterName = hiveConf.get(HIVE_CLUSTER_NAME, DEFAULT_CLUSTER_NAME);
@@ -149,7 +149,7 @@ public class HiveMetaStoreBridge {
      * Gets reference for the database
      *
      *
-     * @param databaseName
+     * @param databaseName database name
      * @param clusterName cluster name
      * @return Reference for database if exists, else null
      * @throws Exception
@@ -183,7 +183,7 @@ public class HiveMetaStoreBridge {
     /**
      * Gets reference for the table
      *
-     * @param dbName
+     * @param dbName database name
      * @param tableName table name
      * @return table reference if exists, else null
      * @throws Exception
@@ -222,16 +222,18 @@ public class HiveMetaStoreBridge {
         LOG.debug("Getting reference for partition for {}.{} with values {}", dbName, tableName, valuesStr);
         String typeName = HiveDataTypes.HIVE_PARTITION.getName();

+        //todo replace gremlin with DSL
         // String dslQuery = String.format("%s as p where values = %s, tableName where name = '%s', "
         //         + "dbName where name = '%s' and clusterName = '%s' select p", typeName, valuesStr, tableName,
         //         dbName, clusterName);
         String dbType = HiveDataTypes.HIVE_DB.getName();
         String tableType = HiveDataTypes.HIVE_TABLE.getName();
+        String datasetType = MetadataServiceClient.DATA_SET_SUPER_TYPE;

         String gremlinQuery = String.format("g.V.has('__typeName', '%s').has('%s.values', %s).as('p')."
                 + "out('__%s.tableName').has('%s.name', '%s').out('__%s.dbName').has('%s.name', '%s')"
                 + ".has('%s.clusterName', '%s').back('p').toList()", typeName, typeName, valuesStr, typeName,
-                tableType, tableName.toLowerCase(), tableType, dbType, dbName.toLowerCase(), dbType, clusterName);
+                datasetType, tableName.toLowerCase(), tableType, dbType, dbName.toLowerCase(), dbType, clusterName);
         return getEntityReferenceFromGremlin(typeName, gremlinQuery);
     }
@@ -283,7 +285,7 @@ public class HiveMetaStoreBridge {
         tableRef.set("sd", sdReferenceable);

         // add reference to the Partition Keys
-        List<Referenceable> partKeys = getColumns(hiveTable.getPartitionKeys());;
+        List<Referenceable> partKeys = getColumns(hiveTable.getPartitionKeys());
         tableRef.set("partitionKeys", partKeys);

         tableRef.set("parameters", hiveTable.getParameters());
...
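The substantive change in the partition-lookup hunk above is that the table hop now filters on the inherited `DataSet.name` attribute instead of `hive_table.name`. A minimal, standalone sketch of the substitution; the literal type names (`hive_partition`, `hive_table`, `hive_db`) and the entity values (`sales_fact`, `reporting`, `cluster1`) are illustrative assumptions standing in for `HiveDataTypes.*.getName()` and real metadata:

```java
public class GremlinQuerySketch {
    public static void main(String[] args) {
        String typeName = "hive_partition";   // assumed HiveDataTypes.HIVE_PARTITION.getName()
        String tableType = "hive_table";      // assumed HiveDataTypes.HIVE_TABLE.getName()
        String dbType = "hive_db";            // assumed HiveDataTypes.HIVE_DB.getName()
        String datasetType = "DataSet";       // MetadataServiceClient.DATA_SET_SUPER_TYPE
        String valuesStr = "['2015-01-01']";

        // Same format string and argument order as the hunk above.
        String gremlinQuery = String.format("g.V.has('__typeName', '%s').has('%s.values', %s).as('p')."
                + "out('__%s.tableName').has('%s.name', '%s').out('__%s.dbName').has('%s.name', '%s')"
                + ".has('%s.clusterName', '%s').back('p').toList()", typeName, typeName, valuesStr, typeName,
                datasetType, "sales_fact", tableType, dbType, "reporting", dbType, "cluster1");

        // The table hop now matches on the inherited attribute:
        //   ...out('__hive_partition.tableName').has('DataSet.name', 'sales_fact')...
        System.out.println(gremlinQuery);
    }
}
```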
@@ -293,6 +293,7 @@ public class HiveHook implements ExecuteWithHookContext {
         processReferenceable.set("name", event.operation.getOperationName());
         processReferenceable.set("startTime", queryStartTime);
         processReferenceable.set("userName", event.user);
+
         List<Referenceable> source = new ArrayList<>();
         for (ReadEntity readEntity : inputs) {
             if (readEntity.getType() == Entity.Type.TABLE) {
@@ -304,7 +305,8 @@ public class HiveHook implements ExecuteWithHookContext {
                 dgiBridge.registerPartition(readEntity.getPartition());
             }
         }
-        processReferenceable.set("inputTables", source);
+        processReferenceable.set("inputs", source);
+
         List<Referenceable> target = new ArrayList<>();
         for (WriteEntity writeEntity : outputs) {
             if (writeEntity.getType() == Entity.Type.TABLE || writeEntity.getType() == Entity.Type.PARTITION) {
@@ -316,7 +318,7 @@ public class HiveHook implements ExecuteWithHookContext {
                 dgiBridge.registerPartition(writeEntity.getPartition());
             }
         }
-        processReferenceable.set("outputTables", target);
+        processReferenceable.set("outputs", target);
         processReferenceable.set("queryText", queryStr);
         processReferenceable.set("queryId", queryId);
         processReferenceable.set("queryPlan", event.jsonPlan.toString());
...
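With `inputs`/`outputs` now declared on the `Process` super type as arrays of `DataSet`, the hook can record tables and partitions in the same lineage lists. A hedged sketch of that contract using the `Referenceable` API visible in this diff; the entity names and values are illustrative, not from the commit:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.metadata.typesystem.Referenceable;

public class ProcessLineageSketch {
    public static void main(String[] args) {
        // Two different DataSet subtypes can now sit in the same input list.
        Referenceable table = new Referenceable("hive_table");
        table.set("name", "sales_fact");

        Referenceable partition = new Referenceable("hive_partition");
        partition.set("values", Arrays.asList("2015-01-01"));

        Referenceable process = new Referenceable("hive_process");
        List<Referenceable> source = new ArrayList<>();
        source.add(table);
        source.add(partition);
        process.set("inputs", source);                           // was "inputTables"
        process.set("outputs", new ArrayList<Referenceable>());  // was "outputTables"
    }
}
```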
@@ -20,6 +20,7 @@ package org.apache.hadoop.metadata.hive.model;

 import com.google.common.collect.ImmutableList;
 import org.apache.hadoop.metadata.MetadataException;
+import org.apache.hadoop.metadata.MetadataServiceClient;
 import org.apache.hadoop.metadata.typesystem.TypesDef;
 import org.apache.hadoop.metadata.typesystem.json.TypesSerialization;
 import org.apache.hadoop.metadata.typesystem.types.AttributeDefinition;
@@ -371,8 +372,6 @@ public class HiveDataModelGenerator {
     private void createTableClass() throws MetadataException {
         AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{
-                new AttributeDefinition("name", DataTypes.STRING_TYPE.getName(),
-                        Multiplicity.REQUIRED, false, null),
                 new AttributeDefinition("dbName", HiveDataTypes.HIVE_DB.getName(),
                         Multiplicity.REQUIRED, false, null),
                 new AttributeDefinition("owner", DataTypes.STRING_TYPE.getName(),
@@ -406,7 +405,7 @@ public class HiveDataModelGenerator {
         };

         HierarchicalTypeDefinition<ClassType> definition =
                 new HierarchicalTypeDefinition<>(ClassType.class, HiveDataTypes.HIVE_TABLE.getName(),
-                        null, attributeDefinitions);
+                        ImmutableList.of("DataSet"), attributeDefinitions);
         classTypeDefinitions.put(HiveDataTypes.HIVE_TABLE.getName(), definition);
         LOG.debug("Created definition for " + HiveDataTypes.HIVE_TABLE.getName());
     }
@@ -437,7 +436,7 @@ public class HiveDataModelGenerator {
         HierarchicalTypeDefinition<ClassType> definition =
                 new HierarchicalTypeDefinition<>(ClassType.class, HiveDataTypes.HIVE_INDEX.getName(),
-                        null, attributeDefinitions);
+                        ImmutableList.of(MetadataServiceClient.DATA_SET_SUPER_TYPE), attributeDefinitions);
         classTypeDefinitions.put(HiveDataTypes.HIVE_INDEX.getName(), definition);
         LOG.debug("Created definition for " + HiveDataTypes.HIVE_INDEX.getName());
     }
@@ -487,20 +486,12 @@ public class HiveDataModelGenerator {
     private void createProcessClass() throws MetadataException {
         AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{
-                new AttributeDefinition("name", DataTypes.STRING_TYPE.getName(),
-                        Multiplicity.REQUIRED, false, null),
                 new AttributeDefinition("startTime", DataTypes.INT_TYPE.getName(),
                         Multiplicity.REQUIRED, false, null),
                 new AttributeDefinition("endTime", DataTypes.INT_TYPE.getName(),
                         Multiplicity.REQUIRED, false, null),
                 new AttributeDefinition("userName", DataTypes.STRING_TYPE.getName(),
                         Multiplicity.REQUIRED, false, null),
-                new AttributeDefinition("inputTables",
-                        DataTypes.arrayTypeName(HiveDataTypes.HIVE_TABLE.getName()),
-                        Multiplicity.OPTIONAL, false, null),
-                new AttributeDefinition("outputTables",
-                        DataTypes.arrayTypeName(HiveDataTypes.HIVE_TABLE.getName()),
-                        Multiplicity.OPTIONAL, false, null),
                 new AttributeDefinition("queryText", DataTypes.STRING_TYPE.getName(),
                         Multiplicity.REQUIRED, false, null),
                 new AttributeDefinition("queryPlan", DataTypes.STRING_TYPE.getName(),
@@ -512,7 +503,8 @@ public class HiveDataModelGenerator {
         };

         HierarchicalTypeDefinition<ClassType> definition = new HierarchicalTypeDefinition<>(
-                ClassType.class, HiveDataTypes.HIVE_PROCESS.getName(), null, attributeDefinitions);
+                ClassType.class, HiveDataTypes.HIVE_PROCESS.getName(),
+                ImmutableList.of(MetadataServiceClient.PROCESS_SUPER_TYPE), attributeDefinitions);
         classTypeDefinitions.put(HiveDataTypes.HIVE_PROCESS.getName(), definition);
         LOG.debug("Created definition for " + HiveDataTypes.HIVE_PROCESS.getName());
     }
...
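After this refactoring, `name` (and, for process types, `inputs`/`outputs`) is inherited rather than redeclared, so a new model class only lists its own attributes. A sketch under that assumption, defining a hypothetical `etl_job` type against the same typesystem API used above; the import paths follow the ones visible in this diff:

```java
import com.google.common.collect.ImmutableList;
import org.apache.hadoop.metadata.MetadataServiceClient;
import org.apache.hadoop.metadata.typesystem.types.AttributeDefinition;
import org.apache.hadoop.metadata.typesystem.types.ClassType;
import org.apache.hadoop.metadata.typesystem.types.DataTypes;
import org.apache.hadoop.metadata.typesystem.types.HierarchicalTypeDefinition;
import org.apache.hadoop.metadata.typesystem.types.Multiplicity;

public class EtlJobTypeSketch {
    // "etl_job" is hypothetical; name, description, inputs and outputs all come
    // from the Process super type, so only local attributes are declared here.
    static HierarchicalTypeDefinition<ClassType> createEtlJobDef() {
        return new HierarchicalTypeDefinition<>(ClassType.class, "etl_job",
                ImmutableList.of(MetadataServiceClient.PROCESS_SUPER_TYPE),
                new AttributeDefinition[]{
                        new AttributeDefinition("queryText", DataTypes.STRING_TYPE.getName(),
                                Multiplicity.REQUIRED, false, null)
                });
    }
}
```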
@@ -21,7 +21,6 @@ package org.apache.hadoop.metadata.hive.hook;

 import org.apache.commons.lang.RandomStringUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.TableType;
-import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.ql.Driver;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.metadata.MetadataServiceClient;
@@ -30,7 +29,6 @@ import org.apache.hadoop.metadata.hive.model.HiveDataModelGenerator;
 import org.apache.hadoop.metadata.hive.model.HiveDataTypes;
 import org.apache.hadoop.metadata.typesystem.Referenceable;
 import org.apache.hadoop.metadata.typesystem.persistence.Id;
-import org.apache.log4j.spi.LoggerFactory;
 import org.codehaus.jettison.json.JSONArray;
 import org.codehaus.jettison.json.JSONObject;
 import org.slf4j.Logger;
@@ -301,12 +299,14 @@ public class HiveHookIT {
         String typeName = HiveDataTypes.HIVE_PARTITION.getName();
         String dbType = HiveDataTypes.HIVE_DB.getName();
         String tableType = HiveDataTypes.HIVE_TABLE.getName();
+        String datasetType = MetadataServiceClient.DATA_SET_SUPER_TYPE;

         LOG.debug("Searching for partition of {}.{} with values {}", dbName, tableName, value);
+        //todo replace with DSL
         String gremlinQuery = String.format("g.V.has('__typeName', '%s').has('%s.values', ['%s']).as('p')."
                 + "out('__%s.tableName').has('%s.name', '%s').out('__%s.dbName').has('%s.name', '%s')"
                 + ".has('%s.clusterName', '%s').back('p').toList()", typeName, typeName, value, typeName,
-                tableType, tableName.toLowerCase(), tableType, dbType, dbName.toLowerCase(), dbType, CLUSTER_NAME);
+                datasetType, tableName.toLowerCase(), tableType, dbType, dbName.toLowerCase(), dbType, CLUSTER_NAME);
         JSONObject response = dgiCLient.searchByGremlin(gremlinQuery);
         JSONArray results = response.getJSONArray(MetadataServiceClient.RESULTS);
         Assert.assertEquals(results.length(), 1);
...
@@ -26,11 +26,14 @@ metadata.graph.index.search.directory=target/data/lucene

 ######### Hive Lineage Configs #########
-metadata.lineage.hive.table.type.name=hive_table
-metadata.lineage.hive.table.column.name=columns
-metadata.lineage.hive.process.type.name=hive_process
-metadata.lineage.hive.process.inputs.name=inputTables
-metadata.lineage.hive.process.outputs.name=outputTables
+# This model reflects the base super types for Data and Process
+#metadata.lineage.hive.table.type.name=DataSet
+#metadata.lineage.hive.process.type.name=Process
+#metadata.lineage.hive.process.inputs.name=inputs
+#metadata.lineage.hive.process.outputs.name=outputs
+
+## Schema
+#metadata.lineage.hive.table.schema.query=hive_table where name=?, columns

 ######### Security Properties #########
...
@@ -72,6 +72,10 @@ public class MetadataServiceClient {
     public static final String ATTRIBUTE_VALUE = "value";

+    public static final String INFRASTRUCTURE_SUPER_TYPE = "Infrastructure";
+    public static final String DATA_SET_SUPER_TYPE = "DataSet";
+    public static final String PROCESS_SUPER_TYPE = "Process";
+
     private WebResource service;

     public MetadataServiceClient(String baseUrl) {
@@ -100,7 +104,7 @@ public class MetadataServiceClient {
         return PropertiesUtil.getClientProperties();
     }

-    static enum API {
+    enum API {
         //Type operations
         CREATE_TYPE(BASE_URI + TYPES, HttpMethod.POST),
...
@@ -38,15 +38,15 @@ metadata.graph.index.search.elasticsearch.create.sleep=2000
 The higher layer services like hive lineage, schema, etc. are driven by the type system and this
 section encodes the specific types for the hive data model.

-# This model follows the quick-start guide
+# This model reflects the base super types for Data and Process
 <verbatim>
-metadata.lineage.hive.table.type.name=hive_table
-metadata.lineage.hive.table.column.name=columns
-metadata.lineage.hive.process.type.name=hive_process
-metadata.lineage.hive.process.inputs.name=inputTables
-metadata.lineage.hive.process.outputs.name=outputTables
+metadata.lineage.hive.table.type.name=DataSet
+metadata.lineage.hive.process.type.name=Process
+metadata.lineage.hive.process.inputs.name=inputs
+metadata.lineage.hive.process.outputs.name=outputs

-#Currently unused
-#metadata.lineage.hive.column.type.name=Column
+## Schema
+metadata.lineage.hive.table.schema.query=hive_table where name=?, columns
 </verbatim>

 ---+++ Security Properties
...
@@ -51,26 +51,29 @@ public class HiveLineageService implements LineageService {
             Some.<List<String>>apply(List.<String>fromArray(new String[]{"name"}));

     private static final String HIVE_TABLE_TYPE_NAME;
-    private static final String HIVE_TABLE_COLUMNS_ATTRIBUTE_NAME;
     private static final String HIVE_PROCESS_TYPE_NAME;
     private static final String HIVE_PROCESS_INPUT_ATTRIBUTE_NAME;
     private static final String HIVE_PROCESS_OUTPUT_ATTRIBUTE_NAME;
+    private static final String HIVE_TABLE_SCHEMA_QUERY;

     static {
         // todo - externalize this using type system - dog food
         try {
             PropertiesConfiguration conf = PropertiesUtil.getApplicationProperties();
             HIVE_TABLE_TYPE_NAME =
-                    conf.getString("metadata.lineage.hive.table.type.name", "hive_table");
-            HIVE_TABLE_COLUMNS_ATTRIBUTE_NAME =
-                    conf.getString("metadata.lineage.hive.table.column.name", "columns");
+                    conf.getString("metadata.lineage.hive.table.type.name", "DataSet");
             HIVE_PROCESS_TYPE_NAME =
-                    conf.getString("metadata.lineage.hive.process.type.name", "hive_process");
+                    conf.getString("metadata.lineage.hive.process.type.name", "Process");
             HIVE_PROCESS_INPUT_ATTRIBUTE_NAME =
-                    conf.getString("metadata.lineage.hive.process.inputs.name", "inputTables");
+                    conf.getString("metadata.lineage.hive.process.inputs.name", "inputs");
             HIVE_PROCESS_OUTPUT_ATTRIBUTE_NAME =
-                    conf.getString("metadata.lineage.hive.process.outputs.name", "outputTables");
+                    conf.getString("metadata.lineage.hive.process.outputs.name", "outputs");
+            HIVE_TABLE_SCHEMA_QUERY = conf.getString(
+                    "metadata.lineage.hive.table.schema.query",
+                    "hive_table where name=\"?\", columns");
         } catch (MetadataException e) {
             throw new RuntimeException(e);
         }
@@ -190,11 +193,7 @@ public class HiveLineageService implements LineageService {
     @GraphTransaction
     public String getSchema(String tableName) throws DiscoveryException {
         // todo - validate if indeed this is a table type and exists
-        String schemaQuery = HIVE_TABLE_TYPE_NAME
-                + " where name=\"" + tableName + "\""
-                + ", " + HIVE_TABLE_COLUMNS_ATTRIBUTE_NAME
-                // + " as column select column.name, column.dataType, column.comment"
-                ;
+        String schemaQuery = HIVE_TABLE_SCHEMA_QUERY.replace("?", tableName);
         return discoveryService.searchByDSL(schemaQuery);
     }
 }
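The schema query is now an externalized template with a `?` placeholder instead of string concatenation against a fixed type name. A small, runnable illustration of how `getSchema` resolves the default template; the table name `sales_fact` is hypothetical:

```java
public class SchemaQuerySketch {
    public static void main(String[] args) {
        // Default template from the static block above.
        String template = "hive_table where name=\"?\", columns";
        String schemaQuery = template.replace("?", "sales_fact");
        System.out.println(schemaQuery); // hive_table where name="sales_fact", columns
    }
}
```

Note that the commented samples in the properties files use an unquoted `?` (`...where name=?, columns`), in which case the substituted table name would land unquoted in the DSL.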
@@ -103,26 +103,38 @@ public class DefaultMetadataService implements MetadataService {
     private static final AttributeDefinition NAME_ATTRIBUTE =
             TypesUtil.createRequiredAttrDef("name", DataTypes.STRING_TYPE);
     private static final AttributeDefinition DESCRIPTION_ATTRIBUTE =
-            TypesUtil.createRequiredAttrDef("description", DataTypes.STRING_TYPE);
-
-    private static final String[] SUPER_TYPES = {
-            "DataSet",
-            "Process",
-            "Infrastructure",
-    };
+            TypesUtil.createOptionalAttrDef("description", DataTypes.STRING_TYPE);

     @InterfaceAudience.Private
     public void createSuperTypes() throws MetadataException {
-        if (typeSystem.isRegistered(SUPER_TYPES[0])) {
+        if (typeSystem.isRegistered(MetadataServiceClient.DATA_SET_SUPER_TYPE)) {
             return; // this is already registered
         }

-        for (String superTypeName : SUPER_TYPES) {
-            HierarchicalTypeDefinition<ClassType> superTypeDefinition =
-                    TypesUtil.createClassTypeDef(superTypeName,
-                            ImmutableList.<String>of(),
-                            NAME_ATTRIBUTE, DESCRIPTION_ATTRIBUTE);
-            typeSystem.defineClassType(superTypeDefinition);
-        }
+        HierarchicalTypeDefinition<ClassType> superTypeDefinition =
+                TypesUtil.createClassTypeDef(MetadataServiceClient.INFRASTRUCTURE_SUPER_TYPE,
+                        ImmutableList.<String>of(),
+                        NAME_ATTRIBUTE, DESCRIPTION_ATTRIBUTE);
+        typeSystem.defineClassType(superTypeDefinition);
+
+        superTypeDefinition =
+                TypesUtil.createClassTypeDef(MetadataServiceClient.DATA_SET_SUPER_TYPE,
+                        ImmutableList.<String>of(),
+                        NAME_ATTRIBUTE, DESCRIPTION_ATTRIBUTE);
+        typeSystem.defineClassType(superTypeDefinition);
+
+        superTypeDefinition =
+                TypesUtil.createClassTypeDef(MetadataServiceClient.PROCESS_SUPER_TYPE,
+                        ImmutableList.<String>of(),
+                        NAME_ATTRIBUTE, DESCRIPTION_ATTRIBUTE,
+                        new AttributeDefinition("inputs",
+                                DataTypes.arrayTypeName(MetadataServiceClient.DATA_SET_SUPER_TYPE),
+                                new Multiplicity(0, Integer.MAX_VALUE, false), false, null),
+                        new AttributeDefinition("outputs",
+                                DataTypes.arrayTypeName(MetadataServiceClient.DATA_SET_SUPER_TYPE),
+                                new Multiplicity(0, Integer.MAX_VALUE, false), false, null)
+                );
+        typeSystem.defineClassType(superTypeDefinition);
     }

     /**
...
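Because `inputs` and `outputs` on the `Process` super type are arrays of `DataSet` with open multiplicity (`new Multiplicity(0, Integer.MAX_VALUE, false)` reads as zero-or-more, unordered), any type registered under `DataSet` can participate in lineage alongside Hive tables, which is what lets lineage work across process types. A hedged sketch registering a hypothetical non-Hive dataset type with the same `TypesUtil` helpers used above:

```java
import com.google.common.collect.ImmutableList;
import org.apache.hadoop.metadata.MetadataServiceClient;
import org.apache.hadoop.metadata.typesystem.types.ClassType;
import org.apache.hadoop.metadata.typesystem.types.DataTypes;
import org.apache.hadoop.metadata.typesystem.types.HierarchicalTypeDefinition;
import org.apache.hadoop.metadata.typesystem.types.utils.TypesUtil;

public class FalconFeedTypeSketch {
    // "falcon_feed" is illustrative; it inherits name/description from DataSet
    // and can therefore appear in any Process's inputs or outputs.
    static HierarchicalTypeDefinition<ClassType> createFeedDef() {
        return TypesUtil.createClassTypeDef("falcon_feed",
                ImmutableList.of(MetadataServiceClient.DATA_SET_SUPER_TYPE),
                TypesUtil.createOptionalAttrDef("frequency", DataTypes.STRING_TYPE));
    }
}
```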
@@ -37,7 +37,6 @@ import org.apache.hadoop.metadata.typesystem.types.IDataType;
 import org.apache.hadoop.metadata.typesystem.types.Multiplicity;
 import org.apache.hadoop.metadata.typesystem.types.StructTypeDefinition;
 import org.apache.hadoop.metadata.typesystem.types.TraitType;
-import org.apache.hadoop.metadata.typesystem.types.TypeSystem;
 import org.apache.hadoop.metadata.typesystem.types.TypeUtils;
 import org.apache.hadoop.metadata.typesystem.types.utils.TypesUtil;
 import org.codehaus.jettison.json.JSONArray;
@@ -298,12 +297,6 @@ public class HiveLineageServiceTest {
             attrDef("userName", DataTypes.STRING_TYPE),
             attrDef("startTime", DataTypes.INT_TYPE),
             attrDef("endTime", DataTypes.INT_TYPE),
-            new AttributeDefinition("inputTables",
-                    DataTypes.arrayTypeName(HIVE_TABLE_TYPE),
-                    Multiplicity.COLLECTION, false, null),
-            new AttributeDefinition("outputTables",
-                    DataTypes.arrayTypeName(HIVE_TABLE_TYPE),
-                    Multiplicity.COLLECTION, false, null),
             attrDef("queryText", DataTypes.STRING_TYPE, Multiplicity.REQUIRED),
             attrDef("queryPlan", DataTypes.STRING_TYPE, Multiplicity.REQUIRED),
             attrDef("queryId", DataTypes.STRING_TYPE, Multiplicity.REQUIRED),
@@ -504,8 +497,8 @@ public class HiveLineageServiceTest {
         referenceable.set("startTime", System.currentTimeMillis());
         referenceable.set("endTime", System.currentTimeMillis() + 10000);

-        referenceable.set("inputTables", inputTables);
-        referenceable.set("outputTables", outputTables);
+        referenceable.set("inputs", inputTables);
+        referenceable.set("outputs", outputTables);
         referenceable.set("queryText", queryText);
         referenceable.set("queryPlan", queryPlan);
...
@@ -30,11 +30,13 @@ metadata.graph.index.search.elasticsearch.local-mode=true

 ######### Hive Lineage Configs #########
-metadata.lineage.hive.table.type.name=hive_table
-metadata.lineage.hive.table.column.name=columns
-metadata.lineage.hive.process.type.name=hive_process
-metadata.lineage.hive.process.inputs.name=inputTables
-metadata.lineage.hive.process.outputs.name=outputTables
+#metadata.lineage.hive.table.type.name=DataSet
+#metadata.lineage.hive.process.type.name=Process
+#metadata.lineage.hive.process.inputs.name=inputs
+#metadata.lineage.hive.process.outputs.name=outputs
+
+## Schema
+#metadata.lineage.hive.table.schema.query=hive_table where name=?, columns

 ######### Security Properties #########
...
@@ -29,11 +29,14 @@ metadata.graph.index.search.elasticsearch.local-mode=true
 metadata.graph.index.search.elasticsearch.create.sleep=2000

 ######### Hive Lineage Configs #########
-metadata.lineage.hive.table.type.name=hive_table
-metadata.lineage.hive.table.column.name=columns
-metadata.lineage.hive.process.type.name=hive_process
-metadata.lineage.hive.process.inputs.name=inputTables
-metadata.lineage.hive.process.outputs.name=outputTables
+# This model reflects the base super types for Data and Process
+#metadata.lineage.hive.table.type.name=DataSet
+#metadata.lineage.hive.process.type.name=Process
+#metadata.lineage.hive.process.inputs.name=inputs
+#metadata.lineage.hive.process.outputs.name=outputs
+
+## Schema
+#metadata.lineage.hive.table.schema.query=hive_table where name=?, columns

 ######### Security Properties #########
...
@@ -151,12 +151,6 @@ public class QuickStart {
             attrDef("userName", DataTypes.STRING_TYPE),
             attrDef("startTime", DataTypes.INT_TYPE),
             attrDef("endTime", DataTypes.INT_TYPE),
-            new AttributeDefinition("inputTables",
-                    DataTypes.arrayTypeName(TABLE_TYPE),
-                    Multiplicity.COLLECTION, false, null),
-            new AttributeDefinition("outputTables",
-                    DataTypes.arrayTypeName(TABLE_TYPE),
-                    Multiplicity.COLLECTION, false, null),
             attrDef("queryText", DataTypes.STRING_TYPE, Multiplicity.REQUIRED),
             attrDef("queryPlan", DataTypes.STRING_TYPE, Multiplicity.REQUIRED),
             attrDef("queryId", DataTypes.STRING_TYPE, Multiplicity.REQUIRED),
@@ -366,15 +360,16 @@ public class QuickStart {
                                       String queryId, String queryGraph,
                                       String... traitNames) throws Exception {
         Referenceable referenceable = new Referenceable(LOAD_PROCESS_TYPE, traitNames);
+        // super type attributes
         referenceable.set("name", name);
         referenceable.set("description", description);
+        referenceable.set("inputs", inputTables);
+        referenceable.set("outputs", outputTables);

         referenceable.set("user", user);
         referenceable.set("startTime", System.currentTimeMillis());
         referenceable.set("endTime", System.currentTimeMillis() + 10000);

-        referenceable.set("inputTables", inputTables);
-        referenceable.set("outputTables", outputTables);
-
         referenceable.set("queryText", queryText);
         referenceable.set("queryPlan", queryPlan);
         referenceable.set("queryId", queryId);
...
@@ -29,11 +29,14 @@ metadata.graph.index.search.elasticsearch.local-mode=true
 metadata.graph.index.search.elasticsearch.create.sleep=2000

 ######### Hive Lineage Configs #########
-metadata.lineage.hive.table.type.name=hive_table
-metadata.lineage.hive.table.column.name=columns
-metadata.lineage.hive.process.type.name=hive_process
-metadata.lineage.hive.process.inputs.name=inputTables
-metadata.lineage.hive.process.outputs.name=outputTables
+# This model reflects the base super types for Data and Process
+#metadata.lineage.hive.table.type.name=DataSet
+#metadata.lineage.hive.process.type.name=Process
+#metadata.lineage.hive.process.inputs.name=inputs
+#metadata.lineage.hive.process.outputs.name=outputs
+
+## Schema
+#metadata.lineage.hive.table.schema.query=hive_table where name=?, columns

 ######### Security Properties #########
...
@@ -256,9 +256,7 @@ public class HiveLineageJerseyResourceIT extends BaseResourceIT {
         );

         HierarchicalTypeDefinition<ClassType> tblClsDef =
-                TypesUtil.createClassTypeDef(HIVE_TABLE_TYPE, null,
-                        attrDef("name", DataTypes.STRING_TYPE),
-                        attrDef("description", DataTypes.STRING_TYPE),
+                TypesUtil.createClassTypeDef(HIVE_TABLE_TYPE, ImmutableList.of("DataSet"),
                         attrDef("owner", DataTypes.STRING_TYPE),
                         attrDef("createTime", DataTypes.INT_TYPE),
                         attrDef("lastAccessTime", DataTypes.INT_TYPE),
@@ -272,17 +270,10 @@ public class HiveLineageJerseyResourceIT extends BaseResourceIT {
         );

         HierarchicalTypeDefinition<ClassType> loadProcessClsDef =
-                TypesUtil.createClassTypeDef(HIVE_PROCESS_TYPE, null,
-                        attrDef("name", DataTypes.STRING_TYPE),
+                TypesUtil.createClassTypeDef(HIVE_PROCESS_TYPE, ImmutableList.of("Process"),
                         attrDef("userName", DataTypes.STRING_TYPE),
                         attrDef("startTime", DataTypes.INT_TYPE),
                         attrDef("endTime", DataTypes.INT_TYPE),
-                        new AttributeDefinition("inputTables",
-                                DataTypes.arrayTypeName(HIVE_TABLE_TYPE),
-                                Multiplicity.COLLECTION, false, null),
-                        new AttributeDefinition("outputTables",
-                                DataTypes.arrayTypeName(HIVE_TABLE_TYPE),
-                                Multiplicity.COLLECTION, false, null),
                         attrDef("queryText", DataTypes.STRING_TYPE, Multiplicity.REQUIRED),
                         attrDef("queryPlan", DataTypes.STRING_TYPE, Multiplicity.REQUIRED),
                         attrDef("queryId", DataTypes.STRING_TYPE, Multiplicity.REQUIRED),
@@ -427,8 +418,8 @@ public class HiveLineageJerseyResourceIT extends BaseResourceIT {
         referenceable.set("startTime", System.currentTimeMillis());
         referenceable.set("endTime", System.currentTimeMillis() + 10000);

-        referenceable.set("inputTables", inputTables);
-        referenceable.set("outputTables", outputTables);
+        referenceable.set("inputs", inputTables);
+        referenceable.set("outputs", outputTables);
         referenceable.set("queryText", queryText);
         referenceable.set("queryPlan", queryPlan);
...