Commit f592043c by Venkatesh Seetharam

Add Hive Lineage API

parent 1380771d
@@ -352,8 +352,9 @@ public class HiveDataModelGenerator {
Multiplicity.OPTIONAL, false, null),
new AttributeDefinition("sd", HiveDataTypes.HIVE_STORAGEDESC.getName(),
Multiplicity.REQUIRED, false, null),
//new AttributeDefinition("columns", String.format("array<%s>", DefinedTypes
// .HIVE_COLUMN.getName()), Multiplicity.COLLECTION, true, null),
new AttributeDefinition("columns",
DataTypes.arrayTypeName(HiveDataTypes.HIVE_COLUMN.getName()),
Multiplicity.COLLECTION, true, null),
new AttributeDefinition("parameters", STRING_MAP_TYPE.getName(),
Multiplicity.OPTIONAL, false, null),
@@ -382,11 +383,11 @@ public class HiveDataModelGenerator {
new AttributeDefinition("sd", HiveDataTypes.HIVE_STORAGEDESC.getName(),
Multiplicity.OPTIONAL, false, null),
new AttributeDefinition("partitionKeys",
String.format("array<%s>", HiveDataTypes.HIVE_COLUMN.getName()),
DataTypes.arrayTypeName(HiveDataTypes.HIVE_COLUMN.getName()),
Multiplicity.OPTIONAL, false, null),
// new AttributeDefinition("columns", // todo - ask venkat
// String.format("array<%s>", HiveDataTypes.HIVE_COLUMN.getName()),
// Multiplicity.COLLECTION, true, null),
new AttributeDefinition("columns",
DataTypes.arrayTypeName(HiveDataTypes.HIVE_COLUMN.getName()),
Multiplicity.COLLECTION, true, null),
new AttributeDefinition("parameters", STRING_MAP_TYPE.getName(),
Multiplicity.OPTIONAL, false, null),
new AttributeDefinition("viewOriginalText", DataTypes.STRING_TYPE.getName(),
@@ -488,11 +489,11 @@ public class HiveDataModelGenerator {
Multiplicity.REQUIRED, false, null),
new AttributeDefinition("userName", DataTypes.STRING_TYPE.getName(),
Multiplicity.REQUIRED, false, null),
new AttributeDefinition("sourceTableNames",
String.format("array<%s>", HiveDataTypes.HIVE_TABLE.getName()),
new AttributeDefinition("inputTables",
DataTypes.arrayTypeName(HiveDataTypes.HIVE_TABLE.getName()),
Multiplicity.OPTIONAL, false, null),
new AttributeDefinition("targetTableNames",
String.format("array<%s>", HiveDataTypes.HIVE_TABLE.getName()),
new AttributeDefinition("outputTables",
DataTypes.arrayTypeName(HiveDataTypes.HIVE_TABLE.getName()),
Multiplicity.OPTIONAL, false, null),
new AttributeDefinition("queryText", DataTypes.STRING_TYPE.getName(),
Multiplicity.REQUIRED, false, null),
......
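For context: throughout this model file the commit swaps String.format("array<%s>", ...) for DataTypes.arrayTypeName(...) with no other change, which implies the helper builds the same "array<...>" type name. A minimal sketch of the assumed equivalence:

// Sketch only; the equivalence is implied by the in-place swap above, not verified here.
String elem = HiveDataTypes.HIVE_TABLE.getName();        // "hive_table"
String viaHelper = DataTypes.arrayTypeName(elem);        // assumed: "array<hive_table>"
String viaFormat = String.format("array<%s>", elem);     // "array<hive_table>"
assert viaHelper.equals(viaFormat);

The sourceTableNames/targetTableNames to inputTables/outputTables rename lines up with the attribute names that HiveLineageService (added below) queries against.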
@@ -26,8 +26,6 @@ import org.apache.hadoop.metadata.MetadataServiceClient;
import org.apache.hadoop.metadata.hive.bridge.HiveMetaStoreBridge;
import org.apache.hadoop.metadata.hive.model.HiveDataModelGenerator;
import org.apache.hadoop.metadata.hive.model.HiveDataTypes;
import org.apache.hadoop.metadata.typesystem.TypesDef;
import org.apache.hadoop.metadata.typesystem.json.TypesSerialization;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONObject;
import org.testng.Assert;
......
@@ -692,7 +692,6 @@
<filtering>false</filtering>
<includes>
<include>application.properties</include>
<include>graph.properties</include>
<include>log4j.xml</include>
</includes>
</resource>
@@ -893,6 +892,7 @@
<artifactId>maven-surefire-plugin</artifactId>
<version>2.7.2</version>
<configuration>
<!--<skipTests>true</skipTests>-->
<forkMode>always</forkMode>
</configuration>
<dependencies>
......
@@ -22,6 +22,8 @@ import com.google.inject.Scopes;
import com.google.inject.throwingproviders.ThrowingProviderBinder;
import com.thinkaurelius.titan.core.TitanGraph;
import org.apache.hadoop.metadata.discovery.DiscoveryService;
import org.apache.hadoop.metadata.discovery.HiveLineageService;
import org.apache.hadoop.metadata.discovery.LineageService;
import org.apache.hadoop.metadata.discovery.SearchIndexer;
import org.apache.hadoop.metadata.discovery.graph.GraphBackedDiscoveryService;
import org.apache.hadoop.metadata.repository.MetadataRepository;
@@ -48,6 +50,7 @@ public class RepositoryMetadataModule extends com.google.inject.AbstractModule {
private Class<? extends MetadataService> metadataService;
private Class<? extends DiscoveryService> discoveryService;
private Class<? extends SearchIndexer> searchIndexer;
private Class<? extends LineageService> lineageService;
public RepositoryMetadataModule() {
// GraphServiceConfigurator gsp = new GraphServiceConfigurator();
@@ -59,6 +62,7 @@ public class RepositoryMetadataModule extends com.google.inject.AbstractModule {
this.metadataService = DefaultMetadataService.class;
this.discoveryService = GraphBackedDiscoveryService.class;
this.searchIndexer = GraphBackedSearchIndexer.class;
this.lineageService = HiveLineageService.class;
}
protected void configure() {
@@ -86,5 +90,7 @@ public class RepositoryMetadataModule extends com.google.inject.AbstractModule {
bind(DiscoveryService.class).to(discoveryService);
bind(SearchIndexer.class).to(searchIndexer);
bind(LineageService.class).to(lineageService);
}
}
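With this binding in place, Guice hands the interface to any consumer. A minimal sketch, assuming the repository's graph configuration is available on the classpath (table name illustrative):

// Resolves LineageService to the HiveLineageService singleton bound above.
Injector injector = Guice.createInjector(new RepositoryMetadataModule());
LineageService lineage = injector.getInstance(LineageService.class);
String inputsJson = lineage.getInputs("sales_fact_monthly_mv");  // throws DiscoveryException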
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.metadata.discovery;
import com.thinkaurelius.titan.core.TitanGraph;
import org.apache.hadoop.metadata.discovery.graph.DefaultGraphPersistenceStrategy;
import org.apache.hadoop.metadata.query.Expressions;
import org.apache.hadoop.metadata.query.GremlinQuery;
import org.apache.hadoop.metadata.query.GremlinTranslator;
import org.apache.hadoop.metadata.query.HiveLineageQuery;
import org.apache.hadoop.metadata.query.HiveWhereUsedQuery;
import org.apache.hadoop.metadata.query.QueryProcessor;
import org.apache.hadoop.metadata.repository.MetadataRepository;
import org.apache.hadoop.metadata.repository.graph.GraphProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.collection.immutable.List;
import javax.inject.Inject;
import javax.inject.Singleton;
/**
* Hive implementation of the lineage service interface.
*/
@Singleton
public class HiveLineageService implements LineageService {
private static final Logger LOG = LoggerFactory.getLogger(HiveLineageService.class);
// todo - externalize these into configuration
private static final String HIVE_TABLE_TYPE_NAME = "hive_table";
private static final String HIVE_PROCESS_TYPE_NAME = "hive_process";
private static final String HIVE_PROCESS_INPUT_ATTRIBUTE_NAME = "inputTables";
private static final String HIVE_PROCESS_OUTPUT_ATTRIBUTE_NAME = "outputTables";
private final TitanGraph titanGraph;
private final DefaultGraphPersistenceStrategy graphPersistenceStrategy;
@Inject
HiveLineageService(GraphProvider<TitanGraph> graphProvider,
MetadataRepository metadataRepository) throws DiscoveryException {
this.titanGraph = graphProvider.get();
this.graphPersistenceStrategy = new DefaultGraphPersistenceStrategy(metadataRepository);
}
/**
* Return the lineage outputs for the given tableName.
*
* @param tableName tableName
* @return Lineage Outputs as JSON
*/
@Override
public String getOutputs(String tableName) throws DiscoveryException {
LOG.info("Fetching lineage outputs for tableName={}", tableName);
try {
HiveWhereUsedQuery outputsQuery = new HiveWhereUsedQuery(
HIVE_TABLE_TYPE_NAME, tableName, HIVE_PROCESS_TYPE_NAME,
HIVE_PROCESS_INPUT_ATTRIBUTE_NAME, HIVE_PROCESS_OUTPUT_ATTRIBUTE_NAME,
Option.empty(), Option.<List<String>>empty(), true,
graphPersistenceStrategy, titanGraph);
Expressions.Expression expression = outputsQuery.expr();
Expressions.Expression validatedExpression = QueryProcessor.validate(expression);
GremlinQuery gremlinQuery = new GremlinTranslator(
validatedExpression, graphPersistenceStrategy).translate();
if (LOG.isDebugEnabled()) {
LOG.debug("Query = {}", validatedExpression);
LOG.debug("Expression Tree = {}", validatedExpression.treeString());
LOG.debug("Gremlin Query = {}", gremlinQuery.queryStr());
}
return outputsQuery.evaluate().toJson();
} catch (Exception e) { // unable to catch ExpressionException
throw new DiscoveryException("Invalid expression", e);
}
}
/**
* Return the lineage inputs for the given tableName.
*
* @param tableName tableName
* @return Lineage Inputs as JSON
*/
@Override
public String getInputs(String tableName) throws DiscoveryException {
LOG.info("Fetching lineage inputs for tableName={}", tableName);
try {
HiveLineageQuery inputsQuery = new HiveLineageQuery(
HIVE_TABLE_TYPE_NAME, tableName, HIVE_PROCESS_TYPE_NAME,
HIVE_PROCESS_INPUT_ATTRIBUTE_NAME, HIVE_PROCESS_OUTPUT_ATTRIBUTE_NAME,
Option.empty(), Option.<List<String>>empty(), true,
graphPersistenceStrategy, titanGraph);
Expressions.Expression expression = inputsQuery.expr();
Expressions.Expression validatedExpression = QueryProcessor.validate(expression);
GremlinQuery gremlinQuery =
new GremlinTranslator(validatedExpression, graphPersistenceStrategy).translate();
if (LOG.isDebugEnabled()) {
LOG.debug("Query = {}", validatedExpression);
LOG.debug("Expression Tree = {}", validatedExpression.treeString());
LOG.debug("Gremlin Query = {}", gremlinQuery.queryStr());
}
return inputsQuery.evaluate().toJson();
} catch (Exception e) { // unable to catch ExpressionException
throw new DiscoveryException("Invalid expression", e);
}
}
}
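Both methods return the raw Gremlin query result serialized to JSON. Assuming it carries the fields the integration test at the end of this commit reads, the payload is shaped roughly like this (values illustrative):

{
  "query": "hive_table where name=\"sales_fact_monthly_mv\" ...",
  "rows": [
    { "name": "sales_fact", "description": "sales fact table" }
  ]
}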
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.metadata.discovery;
/**
* Lineage service interface.
*/
public interface LineageService {
/**
* Return the lineage outputs for the given tableName.
*
* @param tableName tableName
* @return Outputs as JSON
*/
String getOutputs(String tableName) throws DiscoveryException;
/**
* Return the lineage inputs for the given tableName.
*
* @param tableName tableName
* @return Inputs as JSON
*/
String getInputs(String tableName) throws DiscoveryException;
}
@@ -72,6 +72,8 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
LoggerFactory.getLogger(GraphBackedMetadataRepository.class);
private static final String FULL_TEXT_DELIMITER = " ";
private static final String EDGE_LABEL_PREFIX = "__";
private final AtomicInteger ID_SEQ = new AtomicInteger(0);
private final TypedInstanceToGraphMapper instanceToGraphMapper
@@ -114,7 +116,7 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
@Override
public String getEdgeLabel(IDataType<?> dataType, AttributeInfo aInfo) {
return dataType.getName() + "." + aInfo.name;
return EDGE_LABEL_PREFIX + dataType.getName() + "." + aInfo.name;
}
@Override
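A sketch of the label change above (type and attribute names illustrative; the commit does not state the rationale, but the prefix plausibly keeps edge labels distinct from the vertex property keys that share the same base name):

// Before: getEdgeLabel(hiveTableType, columnsAttr) -> "hive_table.columns"
// After:  getEdgeLabel(hiveTableType, columnsAttr) -> "__hive_table.columns"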
@@ -631,7 +633,8 @@
break;
default:
break;
throw new IllegalArgumentException("Unknown type category: "
+ dataType.getTypeCategory());
}
}
@@ -653,10 +656,10 @@
StringBuilder buffer = new StringBuilder();
Object[] array = list.toArray();
for (int index = 0; index < array.length; index++) {
String propertyNameWithSuffix = propertyName + "." + index;
buffer.append(propertyNameWithSuffix).append(",");
mapCollectionEntryToVertex(id, instanceVertex, attributeInfo,
idToVertexMap, elementType, array[index], propertyNameWithSuffix);
String entryId = mapCollectionEntryToVertex(id, instanceVertex,
attributeInfo, idToVertexMap, elementType, array[index],
propertyName, String.valueOf(index));
buffer.append(entryId).append(",");
}
buffer.setLength(buffer.length() - 1);
@@ -672,8 +675,8 @@
LOG.debug("Mapping instance {} to vertex {} for name {}",
typedInstance.getTypeName(), instanceVertex, attributeInfo.name);
@SuppressWarnings("unchecked")
Map<Object, Object> collection = (Map<Object, Object>) typedInstance
.get(attributeInfo.name);
Map<Object, Object> collection =
(Map<Object, Object>) typedInstance.get(attributeInfo.name);
if (collection == null || collection.isEmpty()) {
return;
}
@@ -682,10 +685,10 @@
StringBuilder buffer = new StringBuilder();
IDataType elementType = ((DataTypes.MapType) attributeInfo.dataType()).getValueType();
for (Map.Entry entry : collection.entrySet()) {
String propertyNameWithSuffix = propertyName + "." + entry.getKey();
buffer.append(propertyNameWithSuffix).append(",");
mapCollectionEntryToVertex(id, instanceVertex, attributeInfo,
idToVertexMap, elementType, entry.getValue(), propertyNameWithSuffix);
String entryId = mapCollectionEntryToVertex(id, instanceVertex, attributeInfo,
idToVertexMap, elementType, entry.getValue(),
propertyName, String.valueOf(entry.getKey()));
buffer.append(entryId).append(",");
}
buffer.setLength(buffer.length() - 1);
@@ -693,49 +696,49 @@
instanceVertex.setProperty(propertyName, buffer.toString());
}
private void mapCollectionEntryToVertex(Id id, Vertex instanceVertex,
AttributeInfo attributeInfo,
Map<Id, Vertex> idToVertexMap,
IDataType elementType, Object value,
String propertyName) throws MetadataException {
private String mapCollectionEntryToVertex(Id id, Vertex instanceVertex,
AttributeInfo attributeInfo,
Map<Id, Vertex> idToVertexMap,
IDataType elementType, Object value,
String propertyName,
String key) throws MetadataException {
final String propertyNameWithSuffix = propertyName + "." + key;
final String edgeLabel = EDGE_LABEL_PREFIX + propertyName;
switch (elementType.getTypeCategory()) {
case PRIMITIVE:
instanceVertex.setProperty(propertyName, value);
break;
case ENUM:
instanceVertex.setProperty(propertyName, value);
break;
instanceVertex.setProperty(propertyNameWithSuffix, value);
return propertyNameWithSuffix;
case ARRAY:
case MAP:
case TRAIT:
// do nothing
break;
return null;
case STRUCT:
Vertex structInstanceVertex = mapStructInstanceToVertex(id,
(ITypedStruct) value, attributeInfo, idToVertexMap);
// add an edge to the newly created vertex from the parent
GraphHelper.addEdge(
titanGraph, instanceVertex, structInstanceVertex, propertyName);
break;
Edge structElementEdge = GraphHelper.addEdge(
titanGraph, instanceVertex, structInstanceVertex, edgeLabel);
return propertyName + "." + key + ":" + structElementEdge.getId();
case CLASS:
Id referenceId = (Id) value;
mapClassReferenceAsEdge(
instanceVertex, idToVertexMap, propertyName, referenceId);
break;
String edgeId = mapClassReferenceAsEdge(
instanceVertex, idToVertexMap, edgeLabel, referenceId);
return propertyName + "." + key + ":" + edgeId;
default:
break;
throw new IllegalArgumentException("Unknown type category: "
+ elementType.getTypeCategory());
}
}
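A worked example of the new entry-id scheme under assumed values (edge ids are whatever Titan assigns; property names illustrative), for an array attribute columns on a hive_table instance:

// Primitive/enum entry: the value lives on the instance vertex itself,
//   property "hive_table.columns.0" = <value>  -> entry id "hive_table.columns.0"
// Class/struct entry: an edge labeled "__hive_table.columns" is created and
//   its id is embedded in the entry id         -> "hive_table.columns.0:3x-ab-cd"
// The parent vertex then stores the comma-joined entry ids:
//   "hive_table.columns" = "hive_table.columns.0:3x-ab-cd,hive_table.columns.1:4y-ef-gh"
String entryId = "hive_table.columns.0:3x-ab-cd";                  // illustrative
String edgeId = entryId.substring(entryId.lastIndexOf(":") + 1);   // "3x-ab-cd"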
private void mapClassReferenceAsEdge(Vertex instanceVertex,
Map<Id, Vertex> idToVertexMap,
String propertyKey, Id id) throws MetadataException {
private String mapClassReferenceAsEdge(Vertex instanceVertex,
Map<Id, Vertex> idToVertexMap,
String propertyKey, Id id) throws MetadataException {
if (id != null) {
Vertex referenceVertex;
if (id.isAssigned()) {
@@ -746,9 +749,13 @@
if (referenceVertex != null) {
// add an edge to the class vertex from the instance
GraphHelper.addEdge(titanGraph, instanceVertex, referenceVertex, propertyKey);
Edge edge = GraphHelper.addEdge(
titanGraph, instanceVertex, referenceVertex, propertyKey);
return String.valueOf(edge.getId());
}
}
return null;
}
private Vertex mapStructInstanceToVertex(Id id, ITypedStruct structInstance,
@@ -912,11 +919,13 @@
break;
case ARRAY:
mapVertexToArrayInstance(instanceVertex, typedInstance, attributeInfo);
mapVertexToArrayInstance(
instanceVertex, typedInstance, attributeInfo, vertexPropertyName);
break;
case MAP:
mapVertexToMapInstance(instanceVertex, typedInstance, attributeInfo);
mapVertexToMapInstance(
instanceVertex, typedInstance, attributeInfo, vertexPropertyName);
break;
case STRUCT:
@@ -958,8 +967,7 @@
return mapGraphToTypedInstance(guid, referenceVertex);
} else {
Id referenceId = new Id(guid,
referenceVertex
.<Integer>getProperty(Constants.VERSION_PROPERTY_KEY),
referenceVertex.<Integer>getProperty(Constants.VERSION_PROPERTY_KEY),
dataType.getName());
LOG.debug("Found non-composite, adding id {} ", referenceId);
return referenceId;
@@ -972,9 +980,9 @@
@SuppressWarnings("unchecked")
public void mapVertexToArrayInstance(Vertex instanceVertex, ITypedInstance typedInstance,
AttributeInfo attributeInfo) throws MetadataException {
AttributeInfo attributeInfo,
String propertyName) throws MetadataException {
LOG.debug("mapping vertex {} to array {}", instanceVertex, attributeInfo.name);
String propertyName = typedInstance.getTypeName() + "." + attributeInfo.name;
String keys = instanceVertex.getProperty(propertyName);
if (keys == null || keys.length() == 0) {
return;
@@ -984,8 +992,8 @@
ArrayList values = new ArrayList();
for (String propertyNameWithSuffix : keys.split(",")) {
values.add(mapVertexToCollectionEntry(
instanceVertex, attributeInfo, elementType, propertyNameWithSuffix));
values.add(mapVertexToCollectionEntry(instanceVertex, attributeInfo,
elementType, propertyName, propertyNameWithSuffix));
}
typedInstance.set(attributeInfo.name, values);
@@ -994,9 +1002,13 @@
public Object mapVertexToCollectionEntry(Vertex instanceVertex,
AttributeInfo attributeInfo,
IDataType elementType,
String propertyName,
String propertyNameWithSuffix)
throws MetadataException {
throws MetadataException {
final String edgeLabel = EDGE_LABEL_PREFIX + propertyName;
final String edgeId = propertyNameWithSuffix.substring(
propertyNameWithSuffix.lastIndexOf(":") + 1, propertyNameWithSuffix.length());
switch (elementType.getTypeCategory()) {
case PRIMITIVE:
return instanceVertex.getProperty(propertyNameWithSuffix);
@@ -1012,11 +1024,11 @@
case STRUCT:
return getStructInstanceFromVertex(instanceVertex,
elementType, attributeInfo.name, propertyNameWithSuffix);
elementType, attributeInfo.name, edgeLabel, edgeId);
case CLASS:
return mapClassReferenceToVertex(
instanceVertex, attributeInfo, propertyNameWithSuffix, elementType);
instanceVertex, attributeInfo, edgeLabel, elementType, edgeId);
default:
break;
@@ -1027,9 +1039,9 @@
@SuppressWarnings("unchecked")
private void mapVertexToMapInstance(Vertex instanceVertex, ITypedInstance typedInstance,
AttributeInfo attributeInfo) throws MetadataException {
AttributeInfo attributeInfo,
String propertyName) throws MetadataException {
LOG.debug("mapping vertex {} to array {}", instanceVertex, attributeInfo.name);
String propertyName = typedInstance.getTypeName() + "." + attributeInfo.name;
String keys = instanceVertex.getProperty(propertyName);
if (keys == null || keys.length() == 0) {
return;
@@ -1040,9 +1052,10 @@
HashMap values = new HashMap();
for (String propertyNameWithSuffix : keys.split(",")) {
final String key = propertyNameWithSuffix.substring(
propertyNameWithSuffix.lastIndexOf("."), propertyNameWithSuffix.length());
values.put(key, mapVertexToCollectionEntry(
instanceVertex, attributeInfo, elementType, propertyNameWithSuffix));
propertyNameWithSuffix.lastIndexOf(".") + 1,
propertyNameWithSuffix.lastIndexOf(":"));
values.put(key, mapVertexToCollectionEntry(instanceVertex, attributeInfo,
elementType, propertyName, propertyNameWithSuffix));
}
typedInstance.set(attributeInfo.name, values);
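The key extraction above, worked on an assumed entry id (illustrative):

String entryId = "hive_table.parameters.owner:7k-aa-bb";   // illustrative
String key = entryId.substring(
        entryId.lastIndexOf(".") + 1,   // the + 1 drops the leading dot the old code kept
        entryId.lastIndexOf(":"));      // stop before the edge-id suffix
// key == "owner". Note this assumes a ":" is present, which holds for the
// class/struct entry ids built above but not for bare primitive suffixes.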
@@ -1051,29 +1064,63 @@
private ITypedStruct getStructInstanceFromVertex(Vertex instanceVertex,
IDataType elemType,
String attributeName,
String relationshipLabel)
throws MetadataException {
String relationshipLabel,
String edgeId) throws MetadataException {
LOG.debug("Finding edge for {} -> label {} ", instanceVertex, relationshipLabel);
Iterator<Edge> results = instanceVertex.getEdges(
Direction.OUT, relationshipLabel).iterator();
Edge edge = results.hasNext() ? results.next() : null;
if (edge == null) {
return null;
for (Edge edge : instanceVertex.getEdges(Direction.OUT, relationshipLabel)) {
if (edgeId.equals(String.valueOf(edge.getId()))) {
Vertex structInstanceVertex = edge.getVertex(Direction.IN);
LOG.debug("mapping vertex {} to struct {}", structInstanceVertex,
attributeName);
if (structInstanceVertex != null) {
LOG.debug("Found struct instance vertex {}, mapping to instance {} ",
structInstanceVertex, elemType.getName());
StructType structType = typeSystem
.getDataType(StructType.class, elemType.getName());
ITypedStruct structInstance = structType.createInstance();
mapVertexToInstance(structInstanceVertex, structInstance,
structType.fieldMapping().fields);
return structInstance;
}
break;
}
}
Vertex structInstanceVertex = edge.getVertex(Direction.IN);
LOG.debug("mapping vertex {} to struct {}", structInstanceVertex, attributeName);
return null;
}
if (structInstanceVertex != null) {
LOG.debug("Found struct instance vertex {}, mapping to instance {} ",
structInstanceVertex, elemType.getName());
StructType structType = typeSystem
.getDataType(StructType.class, elemType.getName());
ITypedStruct structInstance = structType.createInstance();
public Object mapClassReferenceToVertex(Vertex instanceVertex,
AttributeInfo attributeInfo,
String relationshipLabel,
IDataType dataType,
String edgeId) throws MetadataException {
LOG.debug("Finding edge for {} -> label {} ", instanceVertex, relationshipLabel);
for (Edge edge : instanceVertex.getEdges(Direction.OUT, relationshipLabel)) {
if (edgeId.equals(String.valueOf(edge.getId()))) {
final Vertex referenceVertex = edge.getVertex(Direction.IN);
if (referenceVertex != null) {
final String guid = referenceVertex
.getProperty(Constants.GUID_PROPERTY_KEY);
LOG.debug("Found vertex {} for label {} with guid {}",
referenceVertex, relationshipLabel, guid);
if (attributeInfo.isComposite) {
LOG.debug("Found composite, mapping vertex to instance");
return mapGraphToTypedInstance(guid, referenceVertex);
} else {
Id referenceId = new Id(guid,
referenceVertex
.<Integer>getProperty(Constants.VERSION_PROPERTY_KEY),
dataType.getName());
LOG.debug("Found non-composite, adding id {} ", referenceId);
return referenceId;
}
}
mapVertexToInstance(structInstanceVertex, structInstance,
structType.fieldMapping().fields);
return structInstance;
break;
}
}
return null;
@@ -1082,7 +1129,7 @@
private void mapVertexToStructInstance(Vertex instanceVertex,
ITypedInstance typedInstance,
AttributeInfo attributeInfo)
throws MetadataException {
throws MetadataException {
LOG.debug("mapping vertex {} to struct {}", instanceVertex, attributeInfo.name);
StructType structType = typeSystem.getDataType(
StructType.class, attributeInfo.dataType().getName());
......
@@ -45,6 +45,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.inject.Inject;
import javax.inject.Singleton;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
@@ -54,6 +55,7 @@ import java.util.Set;
* Simple wrapper over TypeSystem and MetadataRepository services with hooks
* for listening to changes to the repository.
*/
@Singleton
public class DefaultMetadataService implements MetadataService {
private static final Logger LOG =
......
@@ -67,6 +67,11 @@
<appender-ref ref="FILE"/>
</logger>
<logger name="com.google" additivity="false">
<level value="info"/>
<appender-ref ref="FILE"/>
</logger>
<logger name="AUDIT">
<level value="info"/>
<appender-ref ref="AUDIT"/>
......
@@ -84,8 +84,13 @@
<appender-ref ref="FILE"/>
</logger>
<logger name="com.google" additivity="false">
<level value="info"/>
<appender-ref ref="FILE"/>
</logger>
<root>
<priority value="debug"/>
<priority value="info"/>
<appender-ref ref="FILE"/>
</root>
......
@@ -243,8 +243,7 @@
<goal>copy-resources</goal>
</goals>
<configuration>
<outputDirectory>${basedir}/conf
</outputDirectory>
<outputDirectory>${basedir}/conf</outputDirectory>
<resources>
<resource>
<directory>${project.build.directory}/conf</directory>
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.metadata.web.resources;
import com.google.common.base.Preconditions;
import org.apache.hadoop.metadata.MetadataServiceClient;
import org.apache.hadoop.metadata.discovery.DiscoveryException;
import org.apache.hadoop.metadata.discovery.LineageService;
import org.apache.hadoop.metadata.web.util.Servlets;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.inject.Inject;
import javax.inject.Singleton;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
/**
* Jersey Resource for Hive Table Lineage.
*/
@Path("lineage/hive")
@Singleton
public class HiveLineageResource {
private static final Logger LOG = LoggerFactory.getLogger(HiveLineageResource.class);
private final LineageService lineageService;
/**
* Created by the Guice ServletModule and injected with the
* configured LineageService.
*
* @param lineageService lineage service handle
*/
@Inject
public HiveLineageResource(LineageService lineageService) {
this.lineageService = lineageService;
}
/**
* Returns the inputs for a given entity.
*
* @param tableName table name
*/
@GET
@Path("inputs/{tableName}")
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
public Response inputs(@Context HttpServletRequest request,
@PathParam("tableName") String tableName) {
Preconditions.checkNotNull(tableName, "table name cannot be null");
LOG.info("Fetching lineage inputs for tableName={}", tableName);
try {
final String jsonResult = lineageService.getInputs(tableName);
JSONObject response = new JSONObject();
response.put(MetadataServiceClient.REQUEST_ID, Servlets.getRequestId());
response.put("tableName", tableName);
response.put(MetadataServiceClient.RESULTS, new JSONObject(jsonResult));
return Response.ok(response).build();
} catch (DiscoveryException e) {
LOG.error("Unable to get lineage inputs for table {}", tableName, e);
throw new WebApplicationException(
Servlets.getErrorResponse(e, Response.Status.BAD_REQUEST));
} catch (JSONException e) {
LOG.error("Unable to get lineage inputs for table {}", tableName, e);
throw new WebApplicationException(
Servlets.getErrorResponse(e, Response.Status.INTERNAL_SERVER_ERROR));
}
}
/**
* Returns the outputs for a given entity.
*
* @param tableName table name
*/
@GET
@Path("outputs/{tableName}")
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
public Response outputs(@Context HttpServletRequest request,
@PathParam("tableName") String tableName) {
Preconditions.checkNotNull(tableName, "table name cannot be null");
LOG.info("Fetching lineage outputs for tableName={}", tableName);
try {
final String jsonResult = lineageService.getOutputs(tableName);
JSONObject response = new JSONObject();
response.put(MetadataServiceClient.REQUEST_ID, Servlets.getRequestId());
response.put("tableName", tableName);
response.put(MetadataServiceClient.RESULTS, new JSONObject(jsonResult));
return Response.ok(response).build();
} catch (DiscoveryException e) {
LOG.error("Unable to get lineage inputs for table {}", tableName, e);
throw new WebApplicationException(
Servlets.getErrorResponse(e, Response.Status.BAD_REQUEST));
} catch (JSONException e) {
LOG.error("Unable to get lineage inputs for table {}", tableName, e);
throw new WebApplicationException(
Servlets.getErrorResponse(e, Response.Status.INTERNAL_SERVER_ERROR));
}
}
}
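Assuming the usual values of the MetadataServiceClient constants ("requestId" and "results"), a successful call returns an envelope shaped like this (values illustrative):

GET api/metadata/lineage/hive/inputs/sales_fact_monthly_mv

{
  "requestId": "qtp-1234-5 - 9f8e7d6c",
  "tableName": "sales_fact_monthly_mv",
  "results": { "rows": [ ... ] }
}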
@@ -43,12 +43,6 @@ import java.util.Map;
/**
* Jersey Resource for metadata operations.
*
* The entry point for all operations against various aspects of the entities graph.
*
* For instance,
* lineage: given an entity, X, get me the lineage - all entities X is derived from (recursively)
* 'search': find entities generated by Hive processes or that were generated by Sqoop, etc.
*/
@Path("discovery")
@Singleton
......
@@ -64,7 +64,7 @@ public class EntityJerseyResourceIT extends BaseResourceIT {
private static final String DATABASE_TYPE = "hive_database";
private static final String DATABASE_NAME = "foo";
private static final String TABLE_TYPE = "hive_table";
private static final String TABLE_TYPE = "hive_table_type";
private static final String TABLE_NAME = "bar";
private Referenceable tableInstance;
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.metadata.web.resources;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.WebResource;
import org.apache.hadoop.metadata.MetadataServiceClient;
import org.apache.hadoop.metadata.typesystem.Referenceable;
import org.apache.hadoop.metadata.typesystem.TypesDef;
import org.apache.hadoop.metadata.typesystem.persistence.Id;
import org.apache.hadoop.metadata.typesystem.types.AttributeDefinition;
import org.apache.hadoop.metadata.typesystem.types.ClassType;
import org.apache.hadoop.metadata.typesystem.types.DataTypes;
import org.apache.hadoop.metadata.typesystem.types.EnumTypeDefinition;
import org.apache.hadoop.metadata.typesystem.types.HierarchicalTypeDefinition;
import org.apache.hadoop.metadata.typesystem.types.IDataType;
import org.apache.hadoop.metadata.typesystem.types.Multiplicity;
import org.apache.hadoop.metadata.typesystem.types.StructTypeDefinition;
import org.apache.hadoop.metadata.typesystem.types.TraitType;
import org.apache.hadoop.metadata.typesystem.types.TypeUtils;
import org.apache.hadoop.metadata.typesystem.types.utils.TypesUtil;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONObject;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import javax.ws.rs.HttpMethod;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.util.ArrayList;
import java.util.List;
/**
* Hive Lineage Integration Tests.
*/
public class HiveLineageJerseyResourceIT extends BaseResourceIT {
@BeforeClass
public void setUp() throws Exception {
super.setUp();
setUpTypes();
setupInstances();
}
@Test
public void testInputs() throws Exception {
WebResource resource = service
.path("api/metadata/lineage/hive/inputs")
.path("sales_fact_monthly_mv");
ClientResponse clientResponse = resource
.accept(MediaType.APPLICATION_JSON)
.type(MediaType.APPLICATION_JSON)
.method(HttpMethod.GET, ClientResponse.class);
Assert.assertEquals(clientResponse.getStatus(), Response.Status.OK.getStatusCode());
String responseAsString = clientResponse.getEntity(String.class);
Assert.assertNotNull(responseAsString);
System.out.println("inputs = " + responseAsString);
JSONObject response = new JSONObject(responseAsString);
Assert.assertNotNull(response.get(MetadataServiceClient.REQUEST_ID));
JSONObject results = response.getJSONObject(MetadataServiceClient.RESULTS);
Assert.assertNotNull(results);
JSONArray rows = results.getJSONArray("rows");
Assert.assertTrue(rows.length() > 0);
}
@Test
public void testOutputs() throws Exception {
WebResource resource = service
.path("api/metadata/lineage/hive/outputs")
.path("sales_fact");
ClientResponse clientResponse = resource
.accept(MediaType.APPLICATION_JSON)
.type(MediaType.APPLICATION_JSON)
.method(HttpMethod.GET, ClientResponse.class);
Assert.assertEquals(clientResponse.getStatus(), Response.Status.OK.getStatusCode());
String responseAsString = clientResponse.getEntity(String.class);
Assert.assertNotNull(responseAsString);
System.out.println("outputs = " + responseAsString);
JSONObject response = new JSONObject(responseAsString);
Assert.assertNotNull(response.get(MetadataServiceClient.REQUEST_ID));
JSONObject results = response.getJSONObject(MetadataServiceClient.RESULTS);
Assert.assertNotNull(results);
JSONArray rows = results.getJSONArray("rows");
Assert.assertTrue(rows.length() > 0);
}
private void setUpTypes() throws Exception {
TypesDef typesDef = createTypeDefinitions();
createType(typesDef);
}
private static final String HIVE_TABLE_TYPE = "hive_table";
private static final String HIVE_PROCESS_TYPE = "hive_process";
private TypesDef createTypeDefinitions() {
HierarchicalTypeDefinition<ClassType> tblClsDef =
TypesUtil.createClassTypeDef(HIVE_TABLE_TYPE, null,
attrDef("name", DataTypes.STRING_TYPE),
attrDef("description", DataTypes.STRING_TYPE),
attrDef("owner", DataTypes.STRING_TYPE),
attrDef("createTime", DataTypes.INT_TYPE),
attrDef("lastAccessTime", DataTypes.INT_TYPE),
attrDef("tableType", DataTypes.STRING_TYPE),
attrDef("temporary", DataTypes.BOOLEAN_TYPE)
);
HierarchicalTypeDefinition<ClassType> loadProcessClsDef =
TypesUtil.createClassTypeDef(HIVE_PROCESS_TYPE, null,
attrDef("name", DataTypes.STRING_TYPE),
attrDef("userName", DataTypes.STRING_TYPE),
attrDef("startTime", DataTypes.INT_TYPE),
attrDef("endTime", DataTypes.INT_TYPE),
new AttributeDefinition("inputTables",
DataTypes.arrayTypeName(HIVE_TABLE_TYPE),
Multiplicity.COLLECTION, false, null),
new AttributeDefinition("outputTables",
DataTypes.arrayTypeName(HIVE_TABLE_TYPE),
Multiplicity.COLLECTION, false, null),
attrDef("queryText", DataTypes.STRING_TYPE, Multiplicity.REQUIRED),
attrDef("queryPlan", DataTypes.STRING_TYPE, Multiplicity.REQUIRED),
attrDef("queryId", DataTypes.STRING_TYPE, Multiplicity.REQUIRED),
attrDef("queryGraph", DataTypes.STRING_TYPE, Multiplicity.REQUIRED)
);
HierarchicalTypeDefinition<TraitType> dimTraitDef =
TypesUtil.createTraitTypeDef("Dimension", null);
HierarchicalTypeDefinition<TraitType> factTraitDef =
TypesUtil.createTraitTypeDef("Fact", null);
HierarchicalTypeDefinition<TraitType> metricTraitDef =
TypesUtil.createTraitTypeDef("Metric", null);
HierarchicalTypeDefinition<TraitType> etlTraitDef =
TypesUtil.createTraitTypeDef("ETL", null);
return TypeUtils.getTypesDef(
ImmutableList.<EnumTypeDefinition>of(),
ImmutableList.<StructTypeDefinition>of(),
ImmutableList.of(dimTraitDef, factTraitDef, metricTraitDef, etlTraitDef),
ImmutableList.of(tblClsDef, loadProcessClsDef)
);
}
AttributeDefinition attrDef(String name, IDataType dT) {
return attrDef(name, dT, Multiplicity.OPTIONAL, false, null);
}
AttributeDefinition attrDef(String name, IDataType dT, Multiplicity m) {
return attrDef(name, dT, m, false, null);
}
AttributeDefinition attrDef(String name, IDataType dT,
Multiplicity m, boolean isComposite, String reverseAttributeName) {
Preconditions.checkNotNull(name);
Preconditions.checkNotNull(dT);
return new AttributeDefinition(name, dT.getName(), m, isComposite, reverseAttributeName);
}
private void setupInstances() throws Exception {
Referenceable salesFact = table("sales_fact", "sales fact table",
"Joe", "Managed", "Fact");
Referenceable timeDim = table("time_dim", "time dimension table",
"John Doe", "External", "Dimension");
Referenceable salesFactDaily = table("sales_fact_daily_mv",
"sales fact daily materialized view",
"Joe BI", "Managed", "Metric");
Referenceable loadSalesFactDaily = loadProcess("loadSalesDaily", "John ETL",
ImmutableList.of(salesFact, timeDim), ImmutableList.of(salesFactDaily),
"create table as select ", "plan", "id", "graph",
"ETL");
System.out.println("added loadSalesFactDaily = " + loadSalesFactDaily);
Referenceable salesFactMonthly = table("sales_fact_monthly_mv",
"sales fact monthly materialized view",
"Jane BI", "Managed", "Metric");
Referenceable loadSalesFactMonthly = loadProcess("loadSalesMonthly", "John ETL",
ImmutableList.of(salesFactDaily), ImmutableList.of(salesFactMonthly),
"create table as select ", "plan", "id", "graph",
"ETL");
System.out.println("added loadSalesFactMonthly = " + loadSalesFactMonthly);
}
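The fixture wires a small two-hop pipeline, which is why testInputs (on sales_fact_monthly_mv) and testOutputs (on sales_fact) both expect non-empty rows:

sales_fact --+
             +--> loadSalesDaily --> sales_fact_daily_mv --> loadSalesMonthly --> sales_fact_monthly_mv
time_dim ----+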
Referenceable table(String name, String description,
String owner, String tableType,
String... traitNames) throws Exception {
Referenceable referenceable = new Referenceable(HIVE_TABLE_TYPE, traitNames);
referenceable.set("name", name);
referenceable.set("description", description);
referenceable.set("owner", owner);
referenceable.set("tableType", tableType);
referenceable.set("createTime", System.currentTimeMillis());
referenceable.set("lastAccessTime", System.currentTimeMillis());
referenceable.set("retention", System.currentTimeMillis());
return createInstance(referenceable);
}
Referenceable loadProcess(String name, String user,
List<Referenceable> inputTables,
List<Referenceable> outputTables,
String queryText, String queryPlan,
String queryId, String queryGraph,
String... traitNames) throws Exception {
Referenceable referenceable = new Referenceable(HIVE_PROCESS_TYPE, traitNames);
referenceable.set("name", name);
referenceable.set("user", user);
referenceable.set("startTime", System.currentTimeMillis());
referenceable.set("endTime", System.currentTimeMillis() + 10000);
ArrayList<Id> inputTableIds = new ArrayList<>();
for (Referenceable inputTable : inputTables) {
inputTableIds.add(inputTable.getId());
}
referenceable.set("inputTables", inputTableIds);
ArrayList<Id> outputTableIds = new ArrayList<>();
for (Referenceable outputTable : outputTables) {
outputTableIds.add(outputTable.getId());
}
referenceable.set("outputTables", outputTableIds);
referenceable.set("queryText", queryText);
referenceable.set("queryPlan", queryPlan);
referenceable.set("queryId", queryId);
referenceable.set("queryGraph", queryGraph);
return createInstance(referenceable);
}
}
\ No newline at end of file