Commit 879eda63 by Venkatesh Seetharam

Add schema API to Hive and minor refactoring

parent b1e6c379
@@ -19,13 +19,13 @@
 package org.apache.hadoop.metadata.discovery;

 import com.thinkaurelius.titan.core.TitanGraph;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.hadoop.metadata.discovery.graph.DefaultGraphPersistenceStrategy;
+import org.apache.hadoop.metadata.discovery.graph.GraphBackedDiscoveryService;
 import org.apache.hadoop.metadata.query.Expressions;
-import org.apache.hadoop.metadata.query.GremlinQuery;
-import org.apache.hadoop.metadata.query.GremlinTranslator;
 import org.apache.hadoop.metadata.query.HiveLineageQuery;
 import org.apache.hadoop.metadata.query.HiveWhereUsedQuery;
-import org.apache.hadoop.metadata.query.QueryProcessor;
 import org.apache.hadoop.metadata.repository.MetadataRepository;
 import org.apache.hadoop.metadata.repository.graph.GraphProvider;
 import org.slf4j.Logger;
@@ -45,23 +45,47 @@ public class HiveLineageService implements LineageService {

     private static final Logger LOG = LoggerFactory.getLogger(HiveLineageService.class);

-    // todo - externalize these into configuration
-    private static final String HIVE_TABLE_TYPE_NAME = "hive_table";
-    private static final String HIVE_PROCESS_TYPE_NAME = "hive_process";
-    private static final String HIVE_PROCESS_INPUT_ATTRIBUTE_NAME = "inputTables";
-    private static final String HIVE_PROCESS_OUTPUT_ATTRIBUTE_NAME = "outputTables";
     private static final Option<List<String>> SELECT_ATTRIBUTES =
             Some.<List<String>>apply(List.<String>fromArray(new String[]{"name"}));

+    private static final String HIVE_TABLE_TYPE_NAME;
+    private static final String HIVE_TABLE_COLUMNS_ATTRIBUTE_NAME;
+    private static final String HIVE_PROCESS_TYPE_NAME;
+    private static final String HIVE_PROCESS_INPUT_ATTRIBUTE_NAME;
+    private static final String HIVE_PROCESS_OUTPUT_ATTRIBUTE_NAME;
+
+    static {
+        // todo - externalize this using type system - dog food
+        try {
+            PropertiesConfiguration conf = new PropertiesConfiguration("application.properties");
+            HIVE_TABLE_TYPE_NAME =
+                    conf.getString("metadata.lineage.hive.table.type.name", "hive_table");
+            HIVE_TABLE_COLUMNS_ATTRIBUTE_NAME =
+                    conf.getString("metadata.lineage.hive.table.column.name", "columns");
+            HIVE_PROCESS_TYPE_NAME =
+                    conf.getString("metadata.lineage.hive.process.type.name", "hive_process");
+            HIVE_PROCESS_INPUT_ATTRIBUTE_NAME =
+                    conf.getString("metadata.lineage.hive.process.inputs.name", "inputTables");
+            HIVE_PROCESS_OUTPUT_ATTRIBUTE_NAME =
+                    conf.getString("metadata.lineage.hive.process.outputs.name", "outputTables");
+        } catch (ConfigurationException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
     private final TitanGraph titanGraph;
     private final DefaultGraphPersistenceStrategy graphPersistenceStrategy;
+    private final GraphBackedDiscoveryService discoveryService;

     @Inject
     HiveLineageService(GraphProvider<TitanGraph> graphProvider,
-                       MetadataRepository metadataRepository) throws DiscoveryException {
+                       MetadataRepository metadataRepository,
+                       GraphBackedDiscoveryService discoveryService) throws DiscoveryException {
         this.titanGraph = graphProvider.get();
         this.graphPersistenceStrategy = new DefaultGraphPersistenceStrategy(metadataRepository);
+        this.discoveryService = discoveryService;
     }

     /**
@@ -82,16 +106,7 @@ public class HiveLineageService implements LineageService {
                     graphPersistenceStrategy, titanGraph);
             Expressions.Expression expression = outputsQuery.expr();
-            Expressions.Expression validatedExpression = QueryProcessor.validate(expression);
-            GremlinQuery gremlinQuery = new GremlinTranslator(
-                    validatedExpression, graphPersistenceStrategy).translate();
-            if (LOG.isDebugEnabled()) {
-                System.out.println("Query = " + validatedExpression);
-                System.out.println("Expression Tree = " + validatedExpression.treeString());
-                System.out.println("Gremlin Query = " + gremlinQuery.queryStr());
-            }
-            return outputsQuery.evaluate().toJson();
+            return discoveryService.evaluate(expression).toJson();
         } catch (Exception e) { // unable to catch ExpressionException
             throw new DiscoveryException("Invalid expression", e);
         }
@@ -115,18 +130,25 @@ public class HiveLineageService implements LineageService {
                     graphPersistenceStrategy, titanGraph);
             Expressions.Expression expression = inputsQuery.expr();
-            Expressions.Expression validatedExpression = QueryProcessor.validate(expression);
-            GremlinQuery gremlinQuery =
-                    new GremlinTranslator(validatedExpression, graphPersistenceStrategy).translate();
-            if (LOG.isDebugEnabled()) {
-                System.out.println("Query = " + validatedExpression);
-                System.out.println("Expression Tree = " + validatedExpression.treeString());
-                System.out.println("Gremlin Query = " + gremlinQuery.queryStr());
-            }
-            return inputsQuery.evaluate().toJson();
+            return discoveryService.evaluate(expression).toJson();
         } catch (Exception e) { // unable to catch ExpressionException
             throw new DiscoveryException("Invalid expression", e);
         }
     }
+
+    /**
+     * Return the schema for the given tableName.
+     *
+     * @param tableName tableName
+     * @return Schema as JSON
+     */
+    @Override
+    public String getSchema(String tableName) throws DiscoveryException {
+        // todo - validate if indeed this is a table type and exists
+        String schemaQuery = HIVE_TABLE_TYPE_NAME
+                + " where name=\"" + tableName + "\", "
+                + HIVE_TABLE_COLUMNS_ATTRIBUTE_NAME;
+        // + " as column select column.name, column.dataType, column.comment";
+        return discoveryService.searchByDSL(schemaQuery);
+    }
 }
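
Note: with the default property values resolved by the static block above, getSchema("sales_fact") builds a DSL query of the shape sketched below. This is an illustrative sketch, not part of the diff; the commented-out select clause in the method suggests a column-level projection is planned but not yet enabled.

    // Illustration only (not in this commit): the DSL string that
    // getSchema("sales_fact") produces with the defaults hive_table / columns:
    String schemaQuery = "hive_table where name=\"sales_fact\", columns";
    String columnsJson = discoveryService.searchByDSL(schemaQuery);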
@@ -38,4 +38,12 @@ public interface LineageService {
      * @return Inputs as JSON
      */
     String getInputs(String tableName) throws DiscoveryException;
+
+    /**
+     * Return the schema for the given tableName.
+     *
+     * @param tableName tableName
+     * @return Schema as JSON
+     */
+    String getSchema(String tableName) throws DiscoveryException;
 }
@@ -43,6 +43,7 @@ import scala.util.Either;
 import scala.util.parsing.combinator.Parsers;

 import javax.inject.Inject;
+import javax.inject.Singleton;
 import javax.script.Bindings;
 import javax.script.ScriptEngine;
 import javax.script.ScriptEngineManager;
@@ -56,6 +57,7 @@ import java.util.Map;
 /**
  * Graph backed implementation of Search.
  */
+@Singleton
 public class GraphBackedDiscoveryService implements DiscoveryService {

     private static final Logger LOG = LoggerFactory.getLogger(GraphBackedDiscoveryService.class);
@@ -72,7 +74,10 @@ public class GraphBackedDiscoveryService implements DiscoveryService {
     @Override
     public String searchByFullText(String query) throws DiscoveryException {
-        Iterator iterator = titanGraph.query().has(Constants.ENTITY_TEXT_PROPERTY_KEY, Text.CONTAINS, query).vertices().iterator();
+        Iterator iterator = titanGraph.query()
+                .has(Constants.ENTITY_TEXT_PROPERTY_KEY, Text.CONTAINS, query)
+                .vertices()
+                .iterator();
         JsonArray results = new JsonArray();
         while (iterator.hasNext()) {
             Vertex vertex = (Vertex) iterator.next();
@@ -111,13 +116,13 @@ public class GraphBackedDiscoveryService implements DiscoveryService {
         throw new DiscoveryException("Invalid expression : " + dslQuery);
     }

-    private GremlinQueryResult evaluate(Expressions.Expression expression) {
+    public GremlinQueryResult evaluate(Expressions.Expression expression) {
         Expressions.Expression validatedExpression = QueryProcessor.validate(expression);
         GremlinQuery gremlinQuery =
                 new GremlinTranslator(validatedExpression, graphPersistenceStrategy).translate();
-        LOG.debug("Query = " + validatedExpression);
-        LOG.debug("Expression Tree = " + validatedExpression.treeString());
-        LOG.debug("Gremlin Query = " + gremlinQuery.queryStr());
+        LOG.debug("Query = {}", validatedExpression);
+        LOG.debug("Expression Tree = {}", validatedExpression.treeString());
+        LOG.debug("Gremlin Query = {}", gremlinQuery.queryStr());
         return new GremlinEvaluator(gremlinQuery, graphPersistenceStrategy, titanGraph).evaluate();
     }
...
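
Note: switching from a guarded System.out block to parameterized slf4j calls means the message string is only formatted when DEBUG is enabled. One caveat worth knowing (illustration, not part of the diff): the argument expressions themselves are still evaluated eagerly.

    // Formatting of "Gremlin Query = {}" is deferred until DEBUG is on...
    LOG.debug("Gremlin Query = {}", gremlinQuery.queryStr());
    // ...but queryStr() itself still runs at every call site; re-adding an
    // isDebugEnabled() guard would be needed to skip that work as well.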
@@ -50,6 +50,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 import javax.inject.Inject;
+import javax.inject.Singleton;
 import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.util.ArrayList;
@@ -64,6 +65,7 @@ import java.util.concurrent.atomic.AtomicInteger;
  * An implementation backed by a Graph database provided
  * as a Graph Service.
  */
+@Singleton
 public class GraphBackedMetadataRepository implements MetadataRepository {

     private static final Logger LOG =
@@ -431,7 +433,10 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
         return guid;
     }

-    private void addFullTextProperty(EntityProcessor entityProcessor, List<ITypedReferenceableInstance> newTypedInstances) throws MetadataException {
+    private void addFullTextProperty(EntityProcessor entityProcessor,
+                                     List<ITypedReferenceableInstance> newTypedInstances)
+            throws MetadataException {
         for (ITypedReferenceableInstance typedInstance : newTypedInstances) { // Traverse
             Id id = typedInstance.getId();
             Vertex instanceVertex = entityProcessor.idToVertexMap.get(id);
@@ -440,13 +445,16 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
         }
     }

-    private String getFullText(Vertex instanceVertex, boolean followReferences) throws MetadataException {
+    private String getFullText(Vertex instanceVertex,
+                               boolean followReferences) throws MetadataException {
         String guid = instanceVertex.getProperty(Constants.GUID_PROPERTY_KEY);
-        ITypedReferenceableInstance typedReference = graphToInstanceMapper.mapGraphToTypedInstance(guid, instanceVertex);
+        ITypedReferenceableInstance typedReference =
+                graphToInstanceMapper.mapGraphToTypedInstance(guid, instanceVertex);
         return getFullText(typedReference, followReferences);
     }

-    private String getFullText(ITypedInstance typedInstance, boolean followReferences) throws MetadataException {
+    private String getFullText(ITypedInstance typedInstance,
+                               boolean followReferences) throws MetadataException {
         StringBuilder fullText = new StringBuilder();
         for (AttributeInfo attributeInfo : typedInstance.fieldMapping().fields.values()) {
             Object attrValue = typedInstance.get(attributeInfo.name);
@@ -502,8 +510,7 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
     private List<ITypedReferenceableInstance> discoverInstances(EntityProcessor entityProcessor)
             throws RepositoryException {
         List<ITypedReferenceableInstance> newTypedInstances = new ArrayList<>();
-        for (IReferenceableInstance transientInstance : entityProcessor.idToInstanceMap
-                .values()) {
+        for (IReferenceableInstance transientInstance : entityProcessor.idToInstanceMap.values()) {
             LOG.debug("Discovered instance {}", transientInstance.getTypeName());
             try {
                 ClassType cT = typeSystem.getDataType(
@@ -529,10 +536,10 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
     private String addDiscoveredInstances(IReferenceableInstance entity,
                                           EntityProcessor entityProcessor,
                                           List<ITypedReferenceableInstance> newTypedInstances)
-        throws MetadataException {
+            throws MetadataException {
         String typedInstanceGUID = null;
-        for (ITypedReferenceableInstance typedInstance : newTypedInstances) { // Traverse
-            // over newInstances
+        for (ITypedReferenceableInstance typedInstance : newTypedInstances) { // Traverse over newInstances
             LOG.debug("Adding typed instance {}", typedInstance.getTypeName());
             Id id = typedInstance.getId();
...
@@ -19,6 +19,11 @@
 package org.apache.hadoop.metadata;

 import com.google.common.collect.ImmutableList;
+import com.thinkaurelius.titan.core.TitanGraph;
+import com.tinkerpop.blueprints.Edge;
+import com.tinkerpop.blueprints.Vertex;
+import com.tinkerpop.blueprints.util.io.graphson.GraphSONWriter;
+import org.apache.hadoop.metadata.repository.graph.GraphHelper;
 import org.apache.hadoop.metadata.typesystem.ITypedReferenceableInstance;
 import org.apache.hadoop.metadata.typesystem.Referenceable;
 import org.apache.hadoop.metadata.typesystem.types.AttributeDefinition;
@@ -34,6 +39,8 @@ import org.apache.hadoop.metadata.typesystem.types.TraitType;
 import org.apache.hadoop.metadata.typesystem.types.TypeSystem;
 import org.testng.Assert;

+import java.io.File;
+
 import static org.apache.hadoop.metadata.typesystem.types.utils.TypesUtil.createClassTypeDef;
 import static org.apache.hadoop.metadata.typesystem.types.utils.TypesUtil.createOptionalAttrDef;
 import static org.apache.hadoop.metadata.typesystem.types.utils.TypesUtil.createRequiredAttrDef;
@@ -49,6 +56,31 @@ public final class TestUtils {
     }

     /**
+     * Dumps the graph in GSON format in the path returned.
+     *
+     * @param titanGraph handle to graph
+     * @return path to the dump file
+     * @throws Exception
+     */
+    public static String dumpGraph(TitanGraph titanGraph) throws Exception {
+        File tempFile = File.createTempFile("graph", ".gson");
+        System.out.println("tempFile.getPath() = " + tempFile.getPath());
+        GraphSONWriter.outputGraph(titanGraph, tempFile.getPath());
+
+        System.out.println("Vertices:");
+        for (Vertex vertex : titanGraph.getVertices()) {
+            System.out.println(GraphHelper.vertexString(vertex));
+        }
+
+        System.out.println("Edges:");
+        for (Edge edge : titanGraph.getEdges()) {
+            System.out.println(GraphHelper.edgeString(edge));
+        }
+
+        return tempFile.getPath();
+    }
+
+    /**
      * Class Hierarchy is:
      * Department(name : String, employees : Array[Person])
      * Person(name : String, department : Department, manager : Manager)
...
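
Note: a minimal usage sketch for the new dumpGraph helper (illustration only; the titanGraph handle is assumed to come from a test's injected GraphProvider):

    // Dump the repository graph as GraphSON, plus a vertex/edge listing on stdout:
    String dumpPath = TestUtils.dumpGraph(titanGraph);
    System.out.println("graph dumped to " + dumpPath);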
@@ -27,9 +27,16 @@ metadata.graph.index.search.elasticsearch.client-only=false
 metadata.graph.index.search.elasticsearch.local-mode=true

+######### Hive Lineage Configs #########
+metadata.lineage.hive.table.type.name=hive_table
+metadata.lineage.hive.column.type.name=hive_column
+metadata.lineage.hive.table.column.name=columns
+metadata.lineage.hive.process.type.name=hive_process
+metadata.lineage.hive.process.inputs.name=inputTables
+metadata.lineage.hive.process.outputs.name=outputTables
+
 ######### Security Properties #########

 # SSL config
 metadata.enableTLS=false
-
-######### Security Properties #########
@@ -28,6 +28,16 @@ metadata.graph.index.search.elasticsearch.client-only=false
 metadata.graph.index.search.elasticsearch.local-mode=true

+######### Hive Lineage Configs #########
+# This models follows the quick-start guide
+metadata.lineage.hive.table.type.name=Table
+metadata.lineage.hive.column.type.name=Column
+metadata.lineage.hive.table.column.name=columns
+metadata.lineage.hive.process.type.name=LoadProcess
+metadata.lineage.hive.process.inputs.name=inputTables
+metadata.lineage.hive.process.outputs.name=outputTables
+
 ######### Security Properties #########

 # SSL config
...
@@ -40,7 +40,6 @@ import org.apache.hadoop.metadata.typesystem.types.utils.TypesUtil;
 import org.codehaus.jettison.json.JSONArray;
 import org.codehaus.jettison.json.JSONObject;

-import java.util.ArrayList;
 import java.util.List;

 /**
@@ -77,7 +76,7 @@ public class QuickStart {
     private static final String COLUMN_TYPE = "Column";
     private static final String TABLE_TYPE = "Table";
     private static final String VIEW_TYPE = "View";
-    private static final String LOAD_PROCESS_TYPE = "hive_process";
+    private static final String LOAD_PROCESS_TYPE = "LoadProcess";
     private static final String STORAGE_DESC_TYPE = "StorageDesc";

     private static final String[] TYPES = {
@@ -135,7 +134,7 @@ public class QuickStart {
             new AttributeDefinition("db", DATABASE_TYPE,
                     Multiplicity.REQUIRED, false, null),
             new AttributeDefinition("sd", STORAGE_DESC_TYPE,
-                    Multiplicity.REQUIRED, false, null),
+                    Multiplicity.OPTIONAL, false, null),
             attrDef("owner", DataTypes.STRING_TYPE),
             attrDef("createTime", DataTypes.INT_TYPE),
             attrDef("lastAccessTime", DataTypes.INT_TYPE),
@@ -228,36 +227,40 @@ public class QuickStart {
         Referenceable sd = rawStorageDescriptor("hdfs://host:8000/apps/warehouse/sales",
                 "TextInputFormat", "TextOutputFormat", true);

-        ArrayList<Referenceable> salesFactColumns = new ArrayList<>();
-        salesFactColumns.add(rawColumn("time_id", "int", "time id"));
-        salesFactColumns.add(rawColumn("product_id", "int", "product id"));
-        salesFactColumns.add(rawColumn("customer_id", "int", "customer id", "PII"));
-        salesFactColumns.add(rawColumn("sales", "double", "product id", "Metric"));
+        List<Referenceable> salesFactColumns = ImmutableList.of(
+                rawColumn("time_id", "int", "time id"),
+                rawColumn("product_id", "int", "product id"),
+                rawColumn("customer_id", "int", "customer id", "PII"),
+                rawColumn("sales", "double", "product id", "Metric")
+        );

         Id salesFact = table("sales_fact", "sales fact table",
                 salesDB, sd, "Joe", "Managed", salesFactColumns, "Fact");

-        ArrayList<Referenceable> productDimColumns = new ArrayList<>();
-        productDimColumns.add(rawColumn("product_id", "int", "product id"));
-        productDimColumns.add(rawColumn("product_name", "string", "product name"));
-        productDimColumns.add(rawColumn("brand_name", "int", "brand name"));
+        List<Referenceable> productDimColumns = ImmutableList.of(
+                rawColumn("product_id", "int", "product id"),
+                rawColumn("product_name", "string", "product name"),
+                rawColumn("brand_name", "int", "brand name")
+        );

         Id productDim = table("product_dim", "product dimension table",
                 salesDB, sd, "John Doe", "Managed", productDimColumns, "Dimension");

-        ArrayList<Referenceable> timeDimColumns = new ArrayList<>();
-        timeDimColumns.add(rawColumn("time_id", "int", "time id"));
-        timeDimColumns.add(rawColumn("dayOfYear", "int", "day Of Year"));
-        timeDimColumns.add(rawColumn("weekDay", "int", "week Day"));
+        List<Referenceable> timeDimColumns = ImmutableList.of(
+                rawColumn("time_id", "int", "time id"),
+                rawColumn("dayOfYear", "int", "day Of Year"),
+                rawColumn("weekDay", "int", "week Day")
+        );

         Id timeDim = table("time_dim", "time dimension table",
                 salesDB, sd, "John Doe", "External", timeDimColumns, "Dimension");

-        ArrayList<Referenceable> customerDimColumns = new ArrayList<>();
-        customerDimColumns.add(rawColumn("customer_id", "int", "customer id", "PII"));
-        customerDimColumns.add(rawColumn("name", "string", "customer name", "PII"));
-        customerDimColumns.add(rawColumn("address", "string", "customer address", "PII"));
+        List<Referenceable> customerDimColumns = ImmutableList.of(
+                rawColumn("customer_id", "int", "customer id", "PII"),
+                rawColumn("name", "string", "customer name", "PII"),
+                rawColumn("address", "string", "customer address", "PII")
+        );

         Id customerDim = table("customer_dim", "customer dimension table",
                 salesDB, sd, "fetl", "External", customerDimColumns, "Dimension");
@@ -270,29 +273,25 @@ public class QuickStart {
                 "sales fact daily materialized view", reportingDB, sd,
                 "Joe BI", "Managed", salesFactColumns, "Metric");

-        Id loadSalesFactDaily = loadProcess("loadSalesDaily", "John ETL",
+        loadProcess("loadSalesDaily", "John ETL",
                 ImmutableList.of(salesFact, timeDim), ImmutableList.of(salesFactDaily),
                 "create table as select ", "plan", "id", "graph",
                 "ETL");
-        System.out.println("added loadSalesFactDaily = " + loadSalesFactDaily);

-        Id productDimView = view("product_dim_view", reportingDB,
+        view("product_dim_view", reportingDB,
                 ImmutableList.of(productDim), "Dimension", "JdbcAccess");
-        System.out.println("added productDimView = " + productDimView);

-        Id customerDimView = view("customer_dim_view", reportingDB,
+        view("customer_dim_view", reportingDB,
                 ImmutableList.of(customerDim), "Dimension", "JdbcAccess");
-        System.out.println("added customerDimView = " + customerDimView);

         Id salesFactMonthly = table("sales_fact_monthly_mv",
                 "sales fact monthly materialized view",
                 reportingDB, sd, "Jane BI", "Managed", salesFactColumns, "Metric");

-        Id loadSalesFactMonthly = loadProcess("loadSalesMonthly", "John ETL",
+        loadProcess("loadSalesMonthly", "John ETL",
                 ImmutableList.of(salesFactDaily), ImmutableList.of(salesFactMonthly),
                 "create table as select ", "plan", "id", "graph",
                 "ETL");
-        System.out.println("added loadSalesFactMonthly = " + loadSalesFactMonthly);
     }

     private Id createInstance(Referenceable referenceable) throws Exception {
@@ -357,7 +356,8 @@ public class QuickStart {
         referenceable.set("lastAccessTime", System.currentTimeMillis());
         referenceable.set("retention", System.currentTimeMillis());
         referenceable.set("db", dbId);
-        referenceable.set("sd", sd);
+        // todo: fix this bug with object walker
+        // referenceable.set("sd", sd);
         referenceable.set("columns", columns);

         return createInstance(referenceable);
@@ -464,6 +464,7 @@ public class QuickStart {
             "Table as _loop0 loop (hive_process outputTables) withPath",
             "Table as src loop (hive_process outputTables) as dest select src.name as srcTable, dest.name as destTable withPath",
             */
+            "Table where name=\"sales_fact\", columns",
             "Table where name=\"sales_fact\", columns as column select column.name, column.dataType, column.comment",
         };
     }
...
@@ -130,4 +130,38 @@ public class HiveLineageResource {
                     Servlets.getErrorResponse(e, Response.Status.INTERNAL_SERVER_ERROR));
         }
     }
+
+    /**
+     * Return the schema for the given tableName.
+     *
+     * @param tableName table name
+     */
+    @GET
+    @Path("schema/{tableName}")
+    @Consumes(MediaType.APPLICATION_JSON)
+    @Produces(MediaType.APPLICATION_JSON)
+    public Response schema(@Context HttpServletRequest request,
+                           @PathParam("tableName") String tableName) {
+        Preconditions.checkNotNull(tableName, "table name cannot be null");
+        LOG.info("Fetching schema for tableName={}", tableName);
+
+        try {
+            final String jsonResult = lineageService.getSchema(tableName);
+
+            JSONObject response = new JSONObject();
+            response.put(MetadataServiceClient.REQUEST_ID, Servlets.getRequestId());
+            response.put("tableName", tableName);
+            response.put(MetadataServiceClient.RESULTS, new JSONObject(jsonResult));
+
+            return Response.ok(response).build();
+        } catch (DiscoveryException e) {
+            LOG.error("Unable to get schema for table {}", tableName, e);
+            throw new WebApplicationException(
+                    Servlets.getErrorResponse(e, Response.Status.BAD_REQUEST));
+        } catch (JSONException e) {
+            LOG.error("Unable to get schema for table {}", tableName, e);
+            throw new WebApplicationException(
+                    Servlets.getErrorResponse(e, Response.Status.INTERNAL_SERVER_ERROR));
+        }
+    }
 }
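
Note: a sketch of calling the new endpoint in the Jersey-client style used by the integration tests further down. Illustration only; the api/metadata/lineage/hive prefix is an assumption based on the resource class name, since this hunk shows only the schema/{tableName} sub-path.

    // Assumed resource prefix; 'service' is the WebResource from BaseResourceIT:
    ClientResponse clientResponse = service
            .path("api/metadata/lineage/hive/schema")
            .path("sales_fact")
            .accept(MediaType.APPLICATION_JSON)
            .get(ClientResponse.class);
    JSONObject response = new JSONObject(clientResponse.getEntity(String.class));
    // the body carries the requestId, tableName and results keys set above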
@@ -25,9 +25,16 @@ metadata.graph.index.search.backend=lucene
 metadata.graph.index.search.directory=webapp/target/data/lucene

+######### Hive Lineage Configs #########
+metadata.lineage.hive.table.type.name=hive_table
+metadata.lineage.hive.column.type.name=hive_column
+metadata.lineage.hive.table.column.name=columns
+metadata.lineage.hive.process.type.name=hive_process
+metadata.lineage.hive.process.inputs.name=inputTables
+metadata.lineage.hive.process.outputs.name=outputTables
+
 ######### Security Properties #########

 # SSL config
 metadata.enableTLS=false
-
-######### Security Properties #########
@@ -27,6 +27,7 @@ import org.apache.hadoop.metadata.typesystem.Referenceable;
 import org.apache.hadoop.metadata.typesystem.TypesDef;
 import org.apache.hadoop.metadata.typesystem.json.InstanceSerialization;
 import org.apache.hadoop.metadata.typesystem.json.TypesSerialization;
+import org.apache.hadoop.metadata.typesystem.persistence.Id;
 import org.codehaus.jettison.json.JSONObject;
 import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
@@ -44,7 +45,7 @@ public abstract class BaseResourceIT {
     protected WebResource service;
     protected MetadataServiceClient serviceClient;

-    public static String baseUrl = "http://localhost:21000/";;
+    public static String baseUrl = "http://localhost:21000/";

     @BeforeClass
     public void setUp() throws Exception {
@@ -80,7 +81,7 @@ public abstract class BaseResourceIT {
         Assert.assertNotNull(response.get(MetadataServiceClient.REQUEST_ID));
     }

-    protected Referenceable createInstance(Referenceable referenceable) throws Exception {
+    protected Id createInstance(Referenceable referenceable) throws Exception {
         String typeName = referenceable.getTypeName();
         System.out.println("creating instance of type " + typeName);
@@ -91,6 +92,6 @@ public abstract class BaseResourceIT {
         System.out.println("created instance for type " + typeName + ", guid: " + guid);

         // return the reference to created instance with guid
-        return new Referenceable(guid, referenceable.getTypeName(), referenceable.getValuesMap());
+        return new Id(guid, 0, referenceable.getTypeName());
     }
 }
@@ -68,6 +68,7 @@ public class EntityJerseyResourceIT extends BaseResourceIT {
     private static final String TABLE_NAME = "bar";

     private Referenceable tableInstance;
+    private Id tableId;

     @BeforeClass
     public void setUp() throws Exception {
@@ -79,8 +80,9 @@ public class EntityJerseyResourceIT extends BaseResourceIT {
     @Test
     public void testSubmitEntity() throws Exception {
         tableInstance = createHiveTableInstance();
+        tableId = createInstance(tableInstance);

-        String guid = getGuid(tableInstance);
+        final String guid = tableId._getId();
         try {
             Assert.assertNotNull(UUID.fromString(guid));
         } catch (IllegalArgumentException e) {
@@ -88,18 +90,9 @@ public class EntityJerseyResourceIT extends BaseResourceIT {
         }
     }

-    private String getGuid(Referenceable referenceable) throws Exception {
-        Id id = referenceable.getId();
-        Assert.assertNotNull(id);
-
-        String guid = id.id;
-        Assert.assertNotNull(guid);
-        return guid;
-    }
-
     @Test (dependsOnMethods = "testSubmitEntity")
     public void testAddProperty() throws Exception {
-        String guid = getGuid(tableInstance);
+        final String guid = tableId._getId();
         //add property
         String description = "bar table - new desc";
         ClientResponse clientResponse = addProperty(guid, "description", description);
@@ -131,21 +124,18 @@ public class EntityJerseyResourceIT extends BaseResourceIT {
         databaseInstance.set("name", "newdb");
         databaseInstance.set("description", "new database");

-        // ClassType classType = typeSystem.getDataType(ClassType.class, DATABASE_TYPE);
-        // ITypedReferenceableInstance dbInstance = classType.convert(databaseInstance, Multiplicity.REQUIRED);
-
-        Referenceable dbInstance = createInstance(databaseInstance);
-        String dbId = getGuid(dbInstance);
+        Id dbInstance = createInstance(databaseInstance);
+        String dbId = dbInstance._getId();

         //Add reference property
-        String guid = getGuid(tableInstance);
+        final String guid = tableId._getId();
         ClientResponse clientResponse = addProperty(guid, "database", dbId);
         Assert.assertEquals(clientResponse.getStatus(), Response.Status.OK.getStatusCode());
     }

     @Test(dependsOnMethods = "testSubmitEntity")
     public void testGetEntityDefinition() throws Exception {
-        String guid = getGuid(tableInstance);
+        final String guid = tableId._getId();
         ClientResponse clientResponse = getEntityDefinition(guid);
         Assert.assertEquals(clientResponse.getStatus(), Response.Status.OK.getStatusCode());
@@ -274,7 +264,7 @@ public class EntityJerseyResourceIT extends BaseResourceIT {
     @Test (dependsOnMethods = "testSubmitEntity")
     public void testGetTraitNames() throws Exception {
-        String guid = getGuid(tableInstance);
+        final String guid = tableId._getId();
         ClientResponse clientResponse = service
                 .path("api/metadata/entities/traits/list")
                 .path(guid)
@@ -307,7 +297,7 @@ public class EntityJerseyResourceIT extends BaseResourceIT {
         String traitInstanceAsJSON = InstanceSerialization.toJson(traitInstance, true);
         LOG.debug("traitInstanceAsJSON = " + traitInstanceAsJSON);

-        String guid = getGuid(tableInstance);
+        final String guid = tableId._getId();
         ClientResponse clientResponse = service
                 .path("api/metadata/entities/traits/add")
                 .path(guid)
@@ -350,7 +340,7 @@ public class EntityJerseyResourceIT extends BaseResourceIT {
     @Test (dependsOnMethods = "testAddTrait")
     public void testDeleteTrait() throws Exception {
         final String traitName = "PII_Trait";
-        final String guid = getGuid(tableInstance);
+        final String guid = tableId._getId();

         ClientResponse clientResponse = service
                 .path("api/metadata/entities/traits/delete")
@@ -478,6 +468,6 @@ public class EntityJerseyResourceIT extends BaseResourceIT {
         List<String> traits = tableInstance.getTraits();
         Assert.assertEquals(traits.size(), 7);

-        return createInstance(tableInstance);
+        return tableInstance;
     }
 }
@@ -25,6 +25,7 @@ import org.apache.hadoop.metadata.MetadataServiceClient;
 import org.apache.hadoop.metadata.typesystem.Referenceable;
 import org.apache.hadoop.metadata.typesystem.Struct;
 import org.apache.hadoop.metadata.typesystem.TypesDef;
+import org.apache.hadoop.metadata.typesystem.persistence.Id;
 import org.apache.hadoop.metadata.typesystem.types.ClassType;
 import org.apache.hadoop.metadata.typesystem.types.DataTypes;
 import org.apache.hadoop.metadata.typesystem.types.EnumTypeDefinition;
@@ -169,7 +170,7 @@ public class MetadataDiscoveryJerseyResourceIT extends BaseResourceIT {
                 ImmutableList.<String>of(),
                 TypesUtil.createRequiredAttrDef("tag", DataTypes.STRING_TYPE));
         HierarchicalTypeDefinition<TraitType> piiTrait =
-                TypesUtil.createTraitTypeDef("PII", ImmutableList.<String>of());
+                TypesUtil.createTraitTypeDef("PII_TYPE", ImmutableList.<String>of());
         HierarchicalTypeDefinition<TraitType> phiTrait =
                 TypesUtil.createTraitTypeDef("PHI", ImmutableList.<String>of());
         HierarchicalTypeDefinition<TraitType> pciTrait =
@@ -190,9 +191,9 @@ public class MetadataDiscoveryJerseyResourceIT extends BaseResourceIT {
         createType(typesDef);
     }

-    private Referenceable createInstance() throws Exception {
+    private Id createInstance() throws Exception {
         Referenceable entityInstance = new Referenceable("dsl_test_type",
-                "Classification", "PII", "PHI", "PCI", "SOX", "SEC", "Finance");
+                "Classification", "PII_TYPE", "PHI", "PCI", "SOX", "SEC", "Finance");
         entityInstance.set("name", "foo name");
         entityInstance.set("description", "bar description");
...