Commit ed6b9bf4 by Suma Shivaprasad

Merge branch 'master' of https://github.com/hortonworks/metadata into BUG_37105

parents 8305460a 9c96922e
......@@ -67,7 +67,7 @@ public class HiveMetaStoreBridge {
/**
* Construct a HiveMetaStoreBridge.
* @param hiveConf
* @param hiveConf hive conf
*/
public HiveMetaStoreBridge(HiveConf hiveConf) throws Exception {
clusterName = hiveConf.get(HIVE_CLUSTER_NAME, DEFAULT_CLUSTER_NAME);
......@@ -149,7 +149,7 @@ public class HiveMetaStoreBridge {
* Gets reference for the database
*
*
* @param databaseName
* @param databaseName database name
* @param clusterName cluster name
* @return Reference for database if exists, else null
* @throws Exception
......@@ -183,7 +183,7 @@ public class HiveMetaStoreBridge {
/**
* Gets reference for the table
*
* @param dbName
* @param dbName database name
* @param tableName table name
* @return table reference if exists, else null
* @throws Exception
......@@ -222,16 +222,18 @@ public class HiveMetaStoreBridge {
LOG.debug("Getting reference for partition for {}.{} with values {}", dbName, tableName, valuesStr);
String typeName = HiveDataTypes.HIVE_PARTITION.getName();
//todo replace gremlin with DSL
// String dslQuery = String.format("%s as p where values = %s, tableName where name = '%s', "
// + "dbName where name = '%s' and clusterName = '%s' select p", typeName, valuesStr, tableName,
// dbName, clusterName);
String dbType = HiveDataTypes.HIVE_DB.getName();
String tableType = HiveDataTypes.HIVE_TABLE.getName();
String datasetType = MetadataServiceClient.DATA_SET_SUPER_TYPE;
String gremlinQuery = String.format("g.V.has('__typeName', '%s').has('%s.values', %s).as('p')."
+ "out('__%s.tableName').has('%s.name', '%s').out('__%s.dbName').has('%s.name', '%s')"
+ ".has('%s.clusterName', '%s').back('p').toList()", typeName, typeName, valuesStr, typeName,
tableType, tableName.toLowerCase(), tableType, dbType, dbName.toLowerCase(), dbType, clusterName);
datasetType, tableName.toLowerCase(), tableType, dbType, dbName.toLowerCase(), dbType, clusterName);
return getEntityReferenceFromGremlin(typeName, gremlinQuery);
}
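For reference, a sketch of the query this format string produces. The concrete values below (database default, table t1, partition value 2015-01-01, cluster cluster1) are hypothetical, and the Hive type names are assumed to resolve to hive_partition, hive_table and hive_db. The table-name step now keys on DataSet.name because hive_table extends the DataSet super type elsewhere in this change:

    g.V.has('__typeName', 'hive_partition').has('hive_partition.values', ['2015-01-01']).as('p')
        .out('__hive_partition.tableName').has('DataSet.name', 't1')
        .out('__hive_table.dbName').has('hive_db.name', 'default')
        .has('hive_db.clusterName', 'cluster1').back('p').toList()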
......@@ -283,7 +285,7 @@ public class HiveMetaStoreBridge {
tableRef.set("sd", sdReferenceable);
// add reference to the Partition Keys
List<Referenceable> partKeys = getColumns(hiveTable.getPartitionKeys());;
List<Referenceable> partKeys = getColumns(hiveTable.getPartitionKeys());
tableRef.set("partitionKeys", partKeys);
tableRef.set("parameters", hiveTable.getParameters());
......
......@@ -293,6 +293,7 @@ public class HiveHook implements ExecuteWithHookContext {
processReferenceable.set("name", event.operation.getOperationName());
processReferenceable.set("startTime", queryStartTime);
processReferenceable.set("userName", event.user);
List<Referenceable> source = new ArrayList<>();
for (ReadEntity readEntity : inputs) {
if (readEntity.getType() == Entity.Type.TABLE) {
......@@ -304,7 +305,8 @@ public class HiveHook implements ExecuteWithHookContext {
dgiBridge.registerPartition(readEntity.getPartition());
}
}
processReferenceable.set("inputTables", source);
processReferenceable.set("inputs", source);
List<Referenceable> target = new ArrayList<>();
for (WriteEntity writeEntity : outputs) {
if (writeEntity.getType() == Entity.Type.TABLE || writeEntity.getType() == Entity.Type.PARTITION) {
......@@ -316,7 +318,7 @@ public class HiveHook implements ExecuteWithHookContext {
dgiBridge.registerPartition(writeEntity.getPartition());
}
}
processReferenceable.set("outputTables", target);
processReferenceable.set("outputs", target);
processReferenceable.set("queryText", queryStr);
processReferenceable.set("queryId", queryId);
processReferenceable.set("queryPlan", event.jsonPlan.toString());
......
......@@ -20,6 +20,7 @@ package org.apache.hadoop.metadata.hive.model;
import com.google.common.collect.ImmutableList;
import org.apache.hadoop.metadata.MetadataException;
import org.apache.hadoop.metadata.MetadataServiceClient;
import org.apache.hadoop.metadata.typesystem.TypesDef;
import org.apache.hadoop.metadata.typesystem.json.TypesSerialization;
import org.apache.hadoop.metadata.typesystem.types.AttributeDefinition;
......@@ -350,9 +351,9 @@ public class HiveDataModelGenerator {
Multiplicity.REQUIRED, false, null),
new AttributeDefinition("tableName", HiveDataTypes.HIVE_TABLE.getName(),
Multiplicity.REQUIRED, false, null),
new AttributeDefinition("createTime", DataTypes.INT_TYPE.getName(),
new AttributeDefinition("createTime", DataTypes.LONG_TYPE.getName(),
Multiplicity.OPTIONAL, false, null),
new AttributeDefinition("lastAccessTime", DataTypes.INT_TYPE.getName(),
new AttributeDefinition("lastAccessTime", DataTypes.LONG_TYPE.getName(),
Multiplicity.OPTIONAL, false, null),
new AttributeDefinition("sd", HiveDataTypes.HIVE_STORAGEDESC.getName(),
Multiplicity.REQUIRED, false, null),
......@@ -371,15 +372,13 @@ public class HiveDataModelGenerator {
private void createTableClass() throws MetadataException {
AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{
new AttributeDefinition("name", DataTypes.STRING_TYPE.getName(),
Multiplicity.REQUIRED, false, null),
new AttributeDefinition("dbName", HiveDataTypes.HIVE_DB.getName(),
Multiplicity.REQUIRED, false, null),
new AttributeDefinition("owner", DataTypes.STRING_TYPE.getName(),
Multiplicity.OPTIONAL, false, null),
new AttributeDefinition("createTime", DataTypes.INT_TYPE.getName(),
new AttributeDefinition("createTime", DataTypes.LONG_TYPE.getName(),
Multiplicity.OPTIONAL, false, null),
new AttributeDefinition("lastAccessTime", DataTypes.INT_TYPE.getName(),
new AttributeDefinition("lastAccessTime", DataTypes.LONG_TYPE.getName(),
Multiplicity.OPTIONAL, false, null),
new AttributeDefinition(COMMENT, DataTypes.STRING_TYPE.getName(),
Multiplicity.OPTIONAL, false, null),
......@@ -406,7 +405,7 @@ public class HiveDataModelGenerator {
};
HierarchicalTypeDefinition<ClassType> definition =
new HierarchicalTypeDefinition<>(ClassType.class, HiveDataTypes.HIVE_TABLE.getName(),
null, attributeDefinitions);
ImmutableList.of("DataSet"), attributeDefinitions);
classTypeDefinitions.put(HiveDataTypes.HIVE_TABLE.getName(), definition);
LOG.debug("Created definition for " + HiveDataTypes.HIVE_TABLE.getName());
}
......@@ -419,9 +418,9 @@ public class HiveDataModelGenerator {
Multiplicity.REQUIRED, false, null),
new AttributeDefinition("dbName", HiveDataTypes.HIVE_DB.getName(),
Multiplicity.REQUIRED, false, null),
new AttributeDefinition("createTime", DataTypes.INT_TYPE.getName(),
new AttributeDefinition("createTime", DataTypes.LONG_TYPE.getName(),
Multiplicity.OPTIONAL, false, null),
new AttributeDefinition("lastAccessTime", DataTypes.INT_TYPE.getName(),
new AttributeDefinition("lastAccessTime", DataTypes.LONG_TYPE.getName(),
Multiplicity.OPTIONAL, false, null),
new AttributeDefinition("origTableName", HiveDataTypes.HIVE_TABLE.getName(),
Multiplicity.REQUIRED, false, null),
......@@ -437,7 +436,7 @@ public class HiveDataModelGenerator {
HierarchicalTypeDefinition<ClassType> definition =
new HierarchicalTypeDefinition<>(ClassType.class, HiveDataTypes.HIVE_INDEX.getName(),
null, attributeDefinitions);
ImmutableList.of(MetadataServiceClient.DATA_SET_SUPER_TYPE), attributeDefinitions);
classTypeDefinitions.put(HiveDataTypes.HIVE_INDEX.getName(), definition);
LOG.debug("Created definition for " + HiveDataTypes.HIVE_INDEX.getName());
}
......@@ -454,7 +453,7 @@ public class HiveDataModelGenerator {
Multiplicity.OPTIONAL, false, null),
new AttributeDefinition("ownerType", HiveDataTypes.HIVE_PRINCIPAL_TYPE.getName(),
Multiplicity.REQUIRED, false, null),
new AttributeDefinition("createTime", DataTypes.INT_TYPE.getName(),
new AttributeDefinition("createTime", DataTypes.LONG_TYPE.getName(),
Multiplicity.REQUIRED, false, null),
new AttributeDefinition("functionType", HiveDataTypes.HIVE_FUNCTION_TYPE.getName(),
Multiplicity.REQUIRED, false, null),
......@@ -473,7 +472,7 @@ public class HiveDataModelGenerator {
AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{
new AttributeDefinition("roleName", DataTypes.STRING_TYPE.getName(),
Multiplicity.REQUIRED, false, null),
new AttributeDefinition("createTime", DataTypes.INT_TYPE.getName(),
new AttributeDefinition("createTime", DataTypes.LONG_TYPE.getName(),
Multiplicity.REQUIRED, false, null),
new AttributeDefinition("ownerName", DataTypes.STRING_TYPE.getName(),
Multiplicity.REQUIRED, false, null),
......@@ -487,20 +486,12 @@ public class HiveDataModelGenerator {
private void createProcessClass() throws MetadataException {
AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{
new AttributeDefinition("name", DataTypes.STRING_TYPE.getName(),
Multiplicity.REQUIRED, false, null),
new AttributeDefinition("startTime", DataTypes.INT_TYPE.getName(),
new AttributeDefinition("startTime", DataTypes.LONG_TYPE.getName(),
Multiplicity.REQUIRED, false, null),
new AttributeDefinition("endTime", DataTypes.INT_TYPE.getName(),
new AttributeDefinition("endTime", DataTypes.LONG_TYPE.getName(),
Multiplicity.REQUIRED, false, null),
new AttributeDefinition("userName", DataTypes.STRING_TYPE.getName(),
Multiplicity.REQUIRED, false, null),
new AttributeDefinition("inputTables",
DataTypes.arrayTypeName(HiveDataTypes.HIVE_TABLE.getName()),
Multiplicity.OPTIONAL, false, null),
new AttributeDefinition("outputTables",
DataTypes.arrayTypeName(HiveDataTypes.HIVE_TABLE.getName()),
Multiplicity.OPTIONAL, false, null),
new AttributeDefinition("queryText", DataTypes.STRING_TYPE.getName(),
Multiplicity.REQUIRED, false, null),
new AttributeDefinition("queryPlan", DataTypes.STRING_TYPE.getName(),
......@@ -512,7 +503,8 @@ public class HiveDataModelGenerator {
};
HierarchicalTypeDefinition<ClassType> definition = new HierarchicalTypeDefinition<>(
ClassType.class, HiveDataTypes.HIVE_PROCESS.getName(), null, attributeDefinitions);
ClassType.class, HiveDataTypes.HIVE_PROCESS.getName(),
ImmutableList.of(MetadataServiceClient.PROCESS_SUPER_TYPE), attributeDefinitions);
classTypeDefinitions.put(HiveDataTypes.HIVE_PROCESS.getName(), definition);
LOG.debug("Created definition for " + HiveDataTypes.HIVE_PROCESS.getName());
}
......
......@@ -21,7 +21,6 @@ package org.apache.hadoop.metadata.hive.hook;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.metadata.MetadataServiceClient;
......@@ -30,7 +29,6 @@ import org.apache.hadoop.metadata.hive.model.HiveDataModelGenerator;
import org.apache.hadoop.metadata.hive.model.HiveDataTypes;
import org.apache.hadoop.metadata.typesystem.Referenceable;
import org.apache.hadoop.metadata.typesystem.persistence.Id;
import org.apache.log4j.spi.LoggerFactory;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
......@@ -301,12 +299,14 @@ public class HiveHookIT {
String typeName = HiveDataTypes.HIVE_PARTITION.getName();
String dbType = HiveDataTypes.HIVE_DB.getName();
String tableType = HiveDataTypes.HIVE_TABLE.getName();
String datasetType = MetadataServiceClient.DATA_SET_SUPER_TYPE;
LOG.debug("Searching for partition of {}.{} with values {}", dbName, tableName, value);
//todo replace with DSL
String gremlinQuery = String.format("g.V.has('__typeName', '%s').has('%s.values', ['%s']).as('p')."
+ "out('__%s.tableName').has('%s.name', '%s').out('__%s.dbName').has('%s.name', '%s')"
+ ".has('%s.clusterName', '%s').back('p').toList()", typeName, typeName, value, typeName,
tableType, tableName.toLowerCase(), tableType, dbType, dbName.toLowerCase(), dbType, CLUSTER_NAME);
datasetType, tableName.toLowerCase(), tableType, dbType, dbName.toLowerCase(), dbType, CLUSTER_NAME);
JSONObject response = dgiCLient.searchByGremlin(gremlinQuery);
JSONArray results = response.getJSONArray(MetadataServiceClient.RESULTS);
Assert.assertEquals(results.length(), 1);
......
......@@ -26,12 +26,14 @@ metadata.graph.index.search.directory=target/data/lucene
######### Hive Lineage Configs #########
metadata.lineage.hive.table.type.name=hive_table
metadata.lineage.hive.column.type.name=hive_column
metadata.lineage.hive.table.column.name=columns
metadata.lineage.hive.process.type.name=hive_process
metadata.lineage.hive.process.inputs.name=inputTables
metadata.lineage.hive.process.outputs.name=outputTables
# This model reflects the base super types for Data and Process
#metadata.lineage.hive.table.type.name=DataSet
#metadata.lineage.hive.process.type.name=Process
#metadata.lineage.hive.process.inputs.name=inputs
#metadata.lineage.hive.process.outputs.name=outputs
## Schema
#metadata.lineage.hive.table.schema.query=hive_table where name=?, columns
######### Security Properties #########
......
......@@ -50,7 +50,6 @@ public class MetadataServiceClient {
public static final String NAME = "name";
public static final String GUID = "GUID";
public static final String TYPENAME = "typeName";
public static final String TYPE = "type";
public static final String DEFINITION = "definition";
public static final String ERROR = "error";
......@@ -72,6 +71,10 @@ public class MetadataServiceClient {
public static final String ATTRIBUTE_VALUE = "value";
public static final String INFRASTRUCTURE_SUPER_TYPE = "Infrastructure";
public static final String DATA_SET_SUPER_TYPE = "DataSet";
public static final String PROCESS_SUPER_TYPE = "Process";
private WebResource service;
public MetadataServiceClient(String baseUrl) {
......@@ -100,7 +103,7 @@ public class MetadataServiceClient {
return PropertiesUtil.getClientProperties();
}
static enum API {
enum API {
//Type operations
CREATE_TYPE(BASE_URI + TYPES, HttpMethod.POST),
......
......@@ -38,15 +38,15 @@ metadata.graph.index.search.elasticsearch.create.sleep=2000
The higher-level services, such as hive lineage and schema, are driven by the type system, and this
section encodes the specific types for the hive data model.
# This model follows the quick-start guide
# This model reflects the base super types for Data and Process
<verbatim>
metadata.lineage.hive.table.type.name=hive_table
metadata.lineage.hive.table.column.name=columns
metadata.lineage.hive.process.type.name=hive_process
metadata.lineage.hive.process.inputs.name=inputTables
metadata.lineage.hive.process.outputs.name=outputTables
#Currently unused
#metadata.lineage.hive.column.type.name=Column
metadata.lineage.hive.table.type.name=DataSet
metadata.lineage.hive.process.type.name=Process
metadata.lineage.hive.process.inputs.name=inputs
metadata.lineage.hive.process.outputs.name=outputs
## Schema
metadata.lineage.hive.table.schema.query=hive_table where name=?, columns
</verbatim>
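For example, with a hypothetical table name of sales_fact, the schema query template is expanded at request time by substituting the table name for the ? placeholder (shown here with the quoted default used in HiveLineageService):

<verbatim>
hive_table where name="sales_fact", columns
</verbatim>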
---+++ Security Properties
......
......@@ -22,10 +22,12 @@ import com.thinkaurelius.titan.core.TitanGraph;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.hadoop.metadata.GraphTransaction;
import org.apache.hadoop.metadata.MetadataException;
import org.apache.hadoop.metadata.ParamChecker;
import org.apache.hadoop.metadata.PropertiesUtil;
import org.apache.hadoop.metadata.discovery.graph.DefaultGraphPersistenceStrategy;
import org.apache.hadoop.metadata.discovery.graph.GraphBackedDiscoveryService;
import org.apache.hadoop.metadata.query.Expressions;
import org.apache.hadoop.metadata.query.GremlinQueryResult;
import org.apache.hadoop.metadata.query.HiveLineageQuery;
import org.apache.hadoop.metadata.query.HiveWhereUsedQuery;
import org.apache.hadoop.metadata.repository.MetadataRepository;
......@@ -51,26 +53,32 @@ public class HiveLineageService implements LineageService {
Some.<List<String>>apply(List.<String>fromArray(new String[]{"name"}));
private static final String HIVE_TABLE_TYPE_NAME;
private static final String HIVE_TABLE_COLUMNS_ATTRIBUTE_NAME;
private static final String HIVE_PROCESS_TYPE_NAME;
private static final String HIVE_PROCESS_INPUT_ATTRIBUTE_NAME;
private static final String HIVE_PROCESS_OUTPUT_ATTRIBUTE_NAME;
private static final String HIVE_TABLE_SCHEMA_QUERY;
private static final String HIVE_TABLE_EXISTS_QUERY;
static {
// todo - externalize this using type system - dog food
try {
PropertiesConfiguration conf = PropertiesUtil.getApplicationProperties();
HIVE_TABLE_TYPE_NAME =
conf.getString("metadata.lineage.hive.table.type.name", "hive_table");
HIVE_TABLE_COLUMNS_ATTRIBUTE_NAME =
conf.getString("metadata.lineage.hive.table.column.name", "columns");
conf.getString("metadata.lineage.hive.table.type.name", "DataSet");
HIVE_PROCESS_TYPE_NAME =
conf.getString("metadata.lineage.hive.process.type.name", "hive_process");
conf.getString("metadata.lineage.hive.process.type.name", "Process");
HIVE_PROCESS_INPUT_ATTRIBUTE_NAME =
conf.getString("metadata.lineage.hive.process.inputs.name", "inputTables");
conf.getString("metadata.lineage.hive.process.inputs.name", "inputs");
HIVE_PROCESS_OUTPUT_ATTRIBUTE_NAME =
conf.getString("metadata.lineage.hive.process.outputs.name", "outputTables");
conf.getString("metadata.lineage.hive.process.outputs.name", "outputs");
HIVE_TABLE_SCHEMA_QUERY = conf.getString(
"metadata.lineage.hive.table.schema.query",
"hive_table where name=\"?\", columns");
HIVE_TABLE_EXISTS_QUERY = conf.getString(
"metadata.lineage.hive.table.exists.query",
"from hive_table where name=\"?\"");
} catch (MetadataException e) {
throw new RuntimeException(e);
}
......@@ -100,6 +108,8 @@ public class HiveLineageService implements LineageService {
@GraphTransaction
public String getOutputs(String tableName) throws DiscoveryException {
LOG.info("Fetching lineage outputs for tableName={}", tableName);
ParamChecker.notEmpty(tableName, "table name cannot be null");
validateTableExists(tableName);
HiveWhereUsedQuery outputsQuery = new HiveWhereUsedQuery(
HIVE_TABLE_TYPE_NAME, tableName, HIVE_PROCESS_TYPE_NAME,
......@@ -126,6 +136,8 @@ public class HiveLineageService implements LineageService {
@GraphTransaction
public String getOutputsGraph(String tableName) throws DiscoveryException {
LOG.info("Fetching lineage outputs graph for tableName={}", tableName);
ParamChecker.notEmpty(tableName, "table name cannot be null");
validateTableExists(tableName);
HiveWhereUsedQuery outputsQuery = new HiveWhereUsedQuery(
HIVE_TABLE_TYPE_NAME, tableName, HIVE_PROCESS_TYPE_NAME,
......@@ -145,6 +157,8 @@ public class HiveLineageService implements LineageService {
@GraphTransaction
public String getInputs(String tableName) throws DiscoveryException {
LOG.info("Fetching lineage inputs for tableName={}", tableName);
ParamChecker.notEmpty(tableName, "table name cannot be null");
validateTableExists(tableName);
HiveLineageQuery inputsQuery = new HiveLineageQuery(
HIVE_TABLE_TYPE_NAME, tableName, HIVE_PROCESS_TYPE_NAME,
......@@ -171,6 +185,8 @@ public class HiveLineageService implements LineageService {
@GraphTransaction
public String getInputsGraph(String tableName) throws DiscoveryException {
LOG.info("Fetching lineage inputs graph for tableName={}", tableName);
ParamChecker.notEmpty(tableName, "table name cannot be null");
validateTableExists(tableName);
HiveLineageQuery inputsQuery = new HiveLineageQuery(
HIVE_TABLE_TYPE_NAME, tableName, HIVE_PROCESS_TYPE_NAME,
......@@ -189,12 +205,24 @@ public class HiveLineageService implements LineageService {
@Override
@GraphTransaction
public String getSchema(String tableName) throws DiscoveryException {
// todo - validate if indeed this is a table type and exists
String schemaQuery = HIVE_TABLE_TYPE_NAME
+ " where name=\"" + tableName + "\""
+ ", " + HIVE_TABLE_COLUMNS_ATTRIBUTE_NAME
// + " as column select column.name, column.dataType, column.comment"
;
LOG.info("Fetching schema for tableName={}", tableName);
ParamChecker.notEmpty(tableName, "table name cannot be null");
validateTableExists(tableName);
String schemaQuery = HIVE_TABLE_SCHEMA_QUERY.replace("?", tableName);
return discoveryService.searchByDSL(schemaQuery);
}
/**
* Validates that the given table exists and is indeed a table type.
*
* @param tableName table name
*/
private void validateTableExists(String tableName) throws DiscoveryException {
String tableExistsQuery = HIVE_TABLE_EXISTS_QUERY.replace("?", tableName);
GremlinQueryResult queryResult = discoveryService.evaluate(tableExistsQuery);
if (!(queryResult.rows().length() > 0)) {
throw new IllegalArgumentException(tableName + " does not exist");
}
}
}
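For a hypothetical table name of sales_fact, the existence check above expands to the DSL query below; if the query returns no rows, the IllegalArgumentException is thrown:

    from hive_table where name="sales_fact"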
......@@ -23,8 +23,6 @@ import com.thinkaurelius.titan.core.TitanIndexQuery;
import com.thinkaurelius.titan.core.TitanProperty;
import com.thinkaurelius.titan.core.TitanVertex;
import com.tinkerpop.blueprints.Vertex;
import com.tinkerpop.gremlin.groovy.Gremlin;
import com.tinkerpop.gremlin.java.GremlinPipeline;
import org.apache.hadoop.metadata.GraphTransaction;
import org.apache.hadoop.metadata.MetadataServiceClient;
import org.apache.hadoop.metadata.discovery.DiscoveryException;
......@@ -123,13 +121,18 @@ public class GraphBackedDiscoveryService implements DiscoveryService {
@GraphTransaction
public String searchByDSL(String dslQuery) throws DiscoveryException {
LOG.info("Executing dsl query={}", dslQuery);
GremlinQueryResult queryResult = evaluate(dslQuery);
return queryResult.toJson();
}
public GremlinQueryResult evaluate(String dslQuery) throws DiscoveryException {
LOG.info("Executing dsl query={}", dslQuery);
try {
QueryParser queryParser = new QueryParser();
Either<Parsers.NoSuccess, Expressions.Expression> either = queryParser.apply(dslQuery);
if (either.isRight()) {
Expressions.Expression expression = either.right().get();
GremlinQueryResult queryResult = evaluate(expression);
return queryResult.toJson();
return evaluate(expression);
}
} catch (Exception e) { // unable to catch ExpressionException
throw new DiscoveryException("Invalid expression : " + dslQuery, e);
......
......@@ -104,26 +104,38 @@ public class DefaultMetadataService implements MetadataService {
private static final AttributeDefinition NAME_ATTRIBUTE =
TypesUtil.createRequiredAttrDef("name", DataTypes.STRING_TYPE);
private static final AttributeDefinition DESCRIPTION_ATTRIBUTE =
TypesUtil.createRequiredAttrDef("description", DataTypes.STRING_TYPE);
private static final String[] SUPER_TYPES = {
"DataSet",
"Process",
"Infrastructure",
};
TypesUtil.createOptionalAttrDef("description", DataTypes.STRING_TYPE);
@InterfaceAudience.Private
public void createSuperTypes() throws MetadataException {
if (typeSystem.isRegistered(SUPER_TYPES[0])) {
if (typeSystem.isRegistered(MetadataServiceClient.DATA_SET_SUPER_TYPE)) {
return; // this is already registered
}
for (String superTypeName : SUPER_TYPES) {
HierarchicalTypeDefinition<ClassType> superTypeDefinition =
TypesUtil.createClassTypeDef(superTypeName,
ImmutableList.<String>of(),
NAME_ATTRIBUTE, DESCRIPTION_ATTRIBUTE);
typeSystem.defineClassType(superTypeDefinition);
}
HierarchicalTypeDefinition<ClassType> superTypeDefinition =
TypesUtil.createClassTypeDef(MetadataServiceClient.INFRASTRUCTURE_SUPER_TYPE,
ImmutableList.<String>of(),
NAME_ATTRIBUTE, DESCRIPTION_ATTRIBUTE);
typeSystem.defineClassType(superTypeDefinition);
superTypeDefinition =
TypesUtil.createClassTypeDef(MetadataServiceClient.DATA_SET_SUPER_TYPE,
ImmutableList.<String>of(),
NAME_ATTRIBUTE, DESCRIPTION_ATTRIBUTE);
typeSystem.defineClassType(superTypeDefinition);
superTypeDefinition =
TypesUtil.createClassTypeDef(MetadataServiceClient.PROCESS_SUPER_TYPE,
ImmutableList.<String>of(),
NAME_ATTRIBUTE, DESCRIPTION_ATTRIBUTE,
new AttributeDefinition("inputs",
DataTypes.arrayTypeName(MetadataServiceClient.DATA_SET_SUPER_TYPE),
new Multiplicity(0, Integer.MAX_VALUE, false), false, null),
new AttributeDefinition("outputs",
DataTypes.arrayTypeName(MetadataServiceClient.DATA_SET_SUPER_TYPE),
new Multiplicity(0, Integer.MAX_VALUE, false), false, null)
);
typeSystem.defineClassType(superTypeDefinition);
}
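A minimal sketch of how a model can build on these super types once they are registered. The type name etl_process and the attribute values are hypothetical; the createClassTypeDef, defineClassType and Referenceable APIs are the ones used elsewhere in this diff, and inputTables/outputTables stand for lists of previously created DataSet entity references:

    // Define a class type that inherits name, description, inputs and outputs from Process
    HierarchicalTypeDefinition<ClassType> etlProcessDef =
            TypesUtil.createClassTypeDef("etl_process",
                    ImmutableList.of(MetadataServiceClient.PROCESS_SUPER_TYPE),
                    TypesUtil.createRequiredAttrDef("queryText", DataTypes.STRING_TYPE));
    typeSystem.defineClassType(etlProcessDef);

    // Populate an instance; "inputs"/"outputs" are arrays of DataSet references inherited from Process
    Referenceable process = new Referenceable("etl_process");
    process.set("name", "load sales fact");
    process.set("description", "loads the daily sales fact table");
    process.set("inputs", inputTables);
    process.set("outputs", outputTables);
    process.set("queryText", "insert overwrite table sales_fact ...");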
/**
......
......@@ -37,7 +37,6 @@ import org.apache.hadoop.metadata.typesystem.types.IDataType;
import org.apache.hadoop.metadata.typesystem.types.Multiplicity;
import org.apache.hadoop.metadata.typesystem.types.StructTypeDefinition;
import org.apache.hadoop.metadata.typesystem.types.TraitType;
import org.apache.hadoop.metadata.typesystem.types.TypeSystem;
import org.apache.hadoop.metadata.typesystem.types.TypeUtils;
import org.apache.hadoop.metadata.typesystem.types.utils.TypesUtil;
import org.codehaus.jettison.json.JSONArray;
......@@ -163,6 +162,24 @@ public class HiveLineageServiceTest {
Assert.assertTrue(paths.length() > 0);
}
@Test (expectedExceptions = IllegalArgumentException.class)
public void testGetInputsTableNameNull() throws Exception {
hiveLineageService.getInputs(null);
Assert.fail();
}
@Test (expectedExceptions = IllegalArgumentException.class)
public void testGetInputsTableNameEmpty() throws Exception {
hiveLineageService.getInputs("");
Assert.fail();
}
@Test (expectedExceptions = IllegalArgumentException.class)
public void testGetInputsBadTableName() throws Exception {
hiveLineageService.getInputs("blah");
Assert.fail();
}
@Test
public void testGetInputsGraph() throws Exception {
JSONObject results = new JSONObject(
......@@ -194,6 +211,24 @@ public class HiveLineageServiceTest {
Assert.assertTrue(paths.length() > 0);
}
@Test (expectedExceptions = IllegalArgumentException.class)
public void testGetOutputsTableNameNull() throws Exception {
hiveLineageService.getOutputs(null);
Assert.fail();
}
@Test (expectedExceptions = IllegalArgumentException.class)
public void testGetOutputsTableNameEmpty() throws Exception {
hiveLineageService.getOutputs("");
Assert.fail();
}
@Test (expectedExceptions = IllegalArgumentException.class)
public void testGetOutputsBadTableName() throws Exception {
hiveLineageService.getOutputs("blah");
Assert.fail();
}
@Test
public void testGetOutputsGraph() throws Exception {
JSONObject results = new JSONObject(hiveLineageService.getOutputsGraph("sales_fact"));
......@@ -238,6 +273,24 @@ public class HiveLineageServiceTest {
}
}
@Test (expectedExceptions = IllegalArgumentException.class)
public void testGetSchemaTableNameNull() throws Exception {
hiveLineageService.getSchema(null);
Assert.fail();
}
@Test (expectedExceptions = IllegalArgumentException.class)
public void testGetSchemaTableNameEmpty() throws Exception {
hiveLineageService.getSchema("");
Assert.fail();
}
@Test (expectedExceptions = IllegalArgumentException.class)
public void testGetSchemaBadTableName() throws Exception {
hiveLineageService.getSchema("blah");
Assert.fail();
}
private void setUpTypes() throws Exception {
TypesDef typesDef = createTypeDefinitions();
String typesAsJSON = TypesSerialization.toJson(typesDef);
......@@ -298,12 +351,6 @@ public class HiveLineageServiceTest {
attrDef("userName", DataTypes.STRING_TYPE),
attrDef("startTime", DataTypes.INT_TYPE),
attrDef("endTime", DataTypes.INT_TYPE),
new AttributeDefinition("inputTables",
DataTypes.arrayTypeName(HIVE_TABLE_TYPE),
Multiplicity.COLLECTION, false, null),
new AttributeDefinition("outputTables",
DataTypes.arrayTypeName(HIVE_TABLE_TYPE),
Multiplicity.COLLECTION, false, null),
attrDef("queryText", DataTypes.STRING_TYPE, Multiplicity.REQUIRED),
attrDef("queryPlan", DataTypes.STRING_TYPE, Multiplicity.REQUIRED),
attrDef("queryId", DataTypes.STRING_TYPE, Multiplicity.REQUIRED),
......@@ -504,8 +551,8 @@ public class HiveLineageServiceTest {
referenceable.set("startTime", System.currentTimeMillis());
referenceable.set("endTime", System.currentTimeMillis() + 10000);
referenceable.set("inputTables", inputTables);
referenceable.set("outputTables", outputTables);
referenceable.set("inputs", inputTables);
referenceable.set("outputs", outputTables);
referenceable.set("queryText", queryText);
referenceable.set("queryPlan", queryPlan);
......
......@@ -30,12 +30,13 @@ metadata.graph.index.search.elasticsearch.local-mode=true
######### Hive Lineage Configs #########
metadata.lineage.hive.table.type.name=hive_table
metadata.lineage.hive.column.type.name=hive_column
metadata.lineage.hive.table.column.name=columns
metadata.lineage.hive.process.type.name=hive_process
metadata.lineage.hive.process.inputs.name=inputTables
metadata.lineage.hive.process.outputs.name=outputTables
#metadata.lineage.hive.table.type.name=DataSet
#metadata.lineage.hive.process.type.name=Process
#metadata.lineage.hive.process.inputs.name=inputs
#metadata.lineage.hive.process.outputs.name=outputs
## Schema
#metadata.lineage.hive.table.schema.query=hive_table where name=?, columns
######### Security Properties #########
......
......@@ -30,11 +30,13 @@ metadata.graph.index.search.elasticsearch.create.sleep=2000
######### Hive Lineage Configs #########
# This model reflects the base super types for Data and Process
metadata.lineage.hive.table.type.name=DataSet
metadata.lineage.hive.table.column.name=columns
metadata.lineage.hive.process.type.name=Process
metadata.lineage.hive.process.inputs.name=inputTables
metadata.lineage.hive.process.outputs.name=outputTables
#metadata.lineage.hive.table.type.name=DataSet
#metadata.lineage.hive.process.type.name=Process
#metadata.lineage.hive.process.inputs.name=inputs
#metadata.lineage.hive.process.outputs.name=outputs
## Schema
#metadata.lineage.hive.table.schema.query=hive_table where name=?, columns
######### Security Properties #########
......
......@@ -23,18 +23,3 @@ metadata.graph.storage.backend=inmemory
# Graph Search Index
metadata.graph.index.search.backend=lucene
metadata.graph.index.search.directory=target/data/lucene
######### Hive Lineage Configs #########
metadata.lineage.hive.table.type.name=hive_table
metadata.lineage.hive.column.type.name=hive_column
metadata.lineage.hive.table.column.name=columns
metadata.lineage.hive.process.type.name=hive_process
metadata.lineage.hive.process.inputs.name=inputTables
metadata.lineage.hive.process.outputs.name=outputTables
######### Security Properties #########
# SSL config
metadata.enableTLS=false
......@@ -151,12 +151,6 @@ public class QuickStart {
attrDef("userName", DataTypes.STRING_TYPE),
attrDef("startTime", DataTypes.INT_TYPE),
attrDef("endTime", DataTypes.INT_TYPE),
new AttributeDefinition("inputTables",
DataTypes.arrayTypeName(TABLE_TYPE),
Multiplicity.COLLECTION, false, null),
new AttributeDefinition("outputTables",
DataTypes.arrayTypeName(TABLE_TYPE),
Multiplicity.COLLECTION, false, null),
attrDef("queryText", DataTypes.STRING_TYPE, Multiplicity.REQUIRED),
attrDef("queryPlan", DataTypes.STRING_TYPE, Multiplicity.REQUIRED),
attrDef("queryId", DataTypes.STRING_TYPE, Multiplicity.REQUIRED),
......@@ -366,15 +360,16 @@ public class QuickStart {
String queryId, String queryGraph,
String... traitNames) throws Exception {
Referenceable referenceable = new Referenceable(LOAD_PROCESS_TYPE, traitNames);
// super type attributes
referenceable.set("name", name);
referenceable.set("description", description);
referenceable.set("inputs", inputTables);
referenceable.set("outputs", outputTables);
referenceable.set("user", user);
referenceable.set("startTime", System.currentTimeMillis());
referenceable.set("endTime", System.currentTimeMillis() + 10000);
referenceable.set("inputTables", inputTables);
referenceable.set("outputTables", outputTables);
referenceable.set("queryText", queryText);
referenceable.set("queryPlan", queryPlan);
referenceable.set("queryId", queryId);
......
......@@ -58,41 +58,6 @@ public class HiveLineageResource {
}
/**
* Returns the inputs for a given entity.
*
* @param tableName table name
*/
@GET
@Path("table/{tableName}/inputs")
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
public Response inputs(@Context HttpServletRequest request,
@PathParam("tableName") String tableName) {
LOG.info("Fetching lineage inputs for tableName={}", tableName);
try {
ParamChecker.notEmpty(tableName, "table name cannot be null");
final String jsonResult = lineageService.getInputs(tableName);
JSONObject response = new JSONObject();
response.put(MetadataServiceClient.REQUEST_ID, Servlets.getRequestId());
response.put("tableName", tableName);
response.put(MetadataServiceClient.RESULTS, new JSONObject(jsonResult));
return Response.ok(response).build();
} catch (DiscoveryException | IllegalArgumentException e) {
LOG.error("Unable to get lineage inputs for table {}", tableName, e);
throw new WebApplicationException(
Servlets.getErrorResponse(e, Response.Status.BAD_REQUEST));
} catch (Throwable e) {
LOG.error("Unable to get lineage inputs for table {}", tableName, e);
throw new WebApplicationException(
Servlets.getErrorResponse(e, Response.Status.INTERNAL_SERVER_ERROR));
}
}
/**
* Returns the inputs graph for a given entity.
*
* @param tableName table name
......@@ -127,41 +92,6 @@ public class HiveLineageResource {
}
/**
* Returns the outputs for a given entity.
*
* @param tableName table name
*/
@GET
@Path("table/{tableName}/outputs")
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
public Response outputs(@Context HttpServletRequest request,
@PathParam("tableName") String tableName) {
LOG.info("Fetching lineage outputs for tableName={}", tableName);
try {
ParamChecker.notEmpty(tableName, "table name cannot be null");
final String jsonResult = lineageService.getOutputs(tableName);
JSONObject response = new JSONObject();
response.put(MetadataServiceClient.REQUEST_ID, Servlets.getRequestId());
response.put("tableName", tableName);
response.put(MetadataServiceClient.RESULTS, new JSONObject(jsonResult));
return Response.ok(response).build();
} catch (DiscoveryException | IllegalArgumentException e) {
LOG.error("Unable to get lineage outputs for table {}", tableName, e);
throw new WebApplicationException(
Servlets.getErrorResponse(e, Response.Status.BAD_REQUEST));
} catch (Throwable e) {
LOG.error("Unable to get lineage outputs for table {}", tableName, e);
throw new WebApplicationException(
Servlets.getErrorResponse(e, Response.Status.INTERNAL_SERVER_ERROR));
}
}
/**
* Returns the outputs graph for a given entity.
*
* @param tableName table name
......@@ -172,7 +102,6 @@ public class HiveLineageResource {
@Produces(MediaType.APPLICATION_JSON)
public Response outputsGraph(@Context HttpServletRequest request,
@PathParam("tableName") String tableName) {
LOG.info("Fetching lineage outputs graph for tableName={}", tableName);
try {
......@@ -207,8 +136,6 @@ public class HiveLineageResource {
@Produces(MediaType.APPLICATION_JSON)
public Response schema(@Context HttpServletRequest request,
@PathParam("tableName") String tableName) {
LOG.info("Fetching schema for tableName={}", tableName);
try {
......
......@@ -29,12 +29,14 @@ metadata.graph.index.search.elasticsearch.local-mode=true
metadata.graph.index.search.elasticsearch.create.sleep=2000
######### Hive Lineage Configs #########
metadata.lineage.hive.table.type.name=hive_table
metadata.lineage.hive.column.type.name=hive_column
metadata.lineage.hive.table.column.name=columns
metadata.lineage.hive.process.type.name=hive_process
metadata.lineage.hive.process.inputs.name=inputTables
metadata.lineage.hive.process.outputs.name=outputTables
# This model reflects the base super types for Data and Process
#metadata.lineage.hive.table.type.name=DataSet
#metadata.lineage.hive.process.type.name=Process
#metadata.lineage.hive.process.inputs.name=inputs
#metadata.lineage.hive.process.outputs.name=outputs
## Schema
#metadata.lineage.hive.table.schema.query=hive_table where name=?, columns
######### Security Properties #########
......
......@@ -64,37 +64,6 @@ public class HiveLineageJerseyResourceIT extends BaseResourceIT {
}
@Test
public void testInputs() throws Exception {
WebResource resource = service
.path(BASE_URI)
.path("sales_fact_monthly_mv")
.path("inputs");
ClientResponse clientResponse = resource
.accept(MediaType.APPLICATION_JSON)
.type(MediaType.APPLICATION_JSON)
.method(HttpMethod.GET, ClientResponse.class);
Assert.assertEquals(clientResponse.getStatus(), Response.Status.OK.getStatusCode());
String responseAsString = clientResponse.getEntity(String.class);
Assert.assertNotNull(responseAsString);
System.out.println("inputs = " + responseAsString);
JSONObject response = new JSONObject(responseAsString);
Assert.assertNotNull(response.get(MetadataServiceClient.REQUEST_ID));
JSONObject results = response.getJSONObject(MetadataServiceClient.RESULTS);
Assert.assertNotNull(results);
JSONArray rows = results.getJSONArray("rows");
Assert.assertTrue(rows.length() > 0);
final JSONObject row = rows.getJSONObject(0);
JSONArray paths = row.getJSONArray("path");
Assert.assertTrue(paths.length() > 0);
}
@Test
public void testInputsGraph() throws Exception {
WebResource resource = service
.path(BASE_URI)
......@@ -129,37 +98,6 @@ public class HiveLineageJerseyResourceIT extends BaseResourceIT {
}
@Test
public void testOutputs() throws Exception {
WebResource resource = service
.path(BASE_URI)
.path("sales_fact")
.path("outputs");
ClientResponse clientResponse = resource
.accept(MediaType.APPLICATION_JSON)
.type(MediaType.APPLICATION_JSON)
.method(HttpMethod.GET, ClientResponse.class);
Assert.assertEquals(clientResponse.getStatus(), Response.Status.OK.getStatusCode());
String responseAsString = clientResponse.getEntity(String.class);
Assert.assertNotNull(responseAsString);
System.out.println("outputs = " + responseAsString);
JSONObject response = new JSONObject(responseAsString);
Assert.assertNotNull(response.get(MetadataServiceClient.REQUEST_ID));
JSONObject results = response.getJSONObject(MetadataServiceClient.RESULTS);
Assert.assertNotNull(results);
JSONArray rows = results.getJSONArray("rows");
Assert.assertTrue(rows.length() > 0);
final JSONObject row = rows.getJSONObject(0);
JSONArray paths = row.getJSONArray("path");
Assert.assertTrue(paths.length() > 0);
}
@Test
public void testOutputsGraph() throws Exception {
WebResource resource = service
.path(BASE_URI)
......@@ -228,6 +166,36 @@ public class HiveLineageJerseyResourceIT extends BaseResourceIT {
}
}
@Test
public void testSchemaForEmptyTable() throws Exception {
WebResource resource = service
.path(BASE_URI)
.path("")
.path("schema");
ClientResponse clientResponse = resource
.accept(MediaType.APPLICATION_JSON)
.type(MediaType.APPLICATION_JSON)
.method(HttpMethod.GET, ClientResponse.class);
Assert.assertEquals(clientResponse.getStatus(),
Response.Status.NOT_FOUND.getStatusCode());
}
@Test
public void testSchemaForInvalidTable() throws Exception {
WebResource resource = service
.path(BASE_URI)
.path("blah")
.path("schema");
ClientResponse clientResponse = resource
.accept(MediaType.APPLICATION_JSON)
.type(MediaType.APPLICATION_JSON)
.method(HttpMethod.GET, ClientResponse.class);
Assert.assertEquals(clientResponse.getStatus(),
Response.Status.BAD_REQUEST.getStatusCode());
}
private void setUpTypes() throws Exception {
TypesDef typesDef = createTypeDefinitions();
createType(typesDef);
......@@ -256,9 +224,7 @@ public class HiveLineageJerseyResourceIT extends BaseResourceIT {
);
HierarchicalTypeDefinition<ClassType> tblClsDef =
TypesUtil.createClassTypeDef(HIVE_TABLE_TYPE, null,
attrDef("name", DataTypes.STRING_TYPE),
attrDef("description", DataTypes.STRING_TYPE),
TypesUtil.createClassTypeDef(HIVE_TABLE_TYPE, ImmutableList.of("DataSet"),
attrDef("owner", DataTypes.STRING_TYPE),
attrDef("createTime", DataTypes.INT_TYPE),
attrDef("lastAccessTime", DataTypes.INT_TYPE),
......@@ -272,17 +238,10 @@ public class HiveLineageJerseyResourceIT extends BaseResourceIT {
);
HierarchicalTypeDefinition<ClassType> loadProcessClsDef =
TypesUtil.createClassTypeDef(HIVE_PROCESS_TYPE, null,
attrDef("name", DataTypes.STRING_TYPE),
TypesUtil.createClassTypeDef(HIVE_PROCESS_TYPE, ImmutableList.of("Process"),
attrDef("userName", DataTypes.STRING_TYPE),
attrDef("startTime", DataTypes.INT_TYPE),
attrDef("endTime", DataTypes.INT_TYPE),
new AttributeDefinition("inputTables",
DataTypes.arrayTypeName(HIVE_TABLE_TYPE),
Multiplicity.COLLECTION, false, null),
new AttributeDefinition("outputTables",
DataTypes.arrayTypeName(HIVE_TABLE_TYPE),
Multiplicity.COLLECTION, false, null),
attrDef("queryText", DataTypes.STRING_TYPE, Multiplicity.REQUIRED),
attrDef("queryPlan", DataTypes.STRING_TYPE, Multiplicity.REQUIRED),
attrDef("queryId", DataTypes.STRING_TYPE, Multiplicity.REQUIRED),
......@@ -427,8 +386,8 @@ public class HiveLineageJerseyResourceIT extends BaseResourceIT {
referenceable.set("startTime", System.currentTimeMillis());
referenceable.set("endTime", System.currentTimeMillis() + 10000);
referenceable.set("inputTables", inputTables);
referenceable.set("outputTables", outputTables);
referenceable.set("inputs", inputTables);
referenceable.set("outputs", outputTables);
referenceable.set("queryText", queryText);
referenceable.set("queryPlan", queryPlan);
......@@ -437,4 +396,4 @@ public class HiveLineageJerseyResourceIT extends BaseResourceIT {
return createInstance(referenceable);
}
}
\ No newline at end of file
}