Commit 80674f0a by Suma Shivaprasad

Merged with master

parents 018bbd62 25a68075
......@@ -35,15 +35,12 @@
<properties>
<hive.version>1.2.0</hive.version>
<calcite.version>0.9.2-incubating</calcite.version>
<hadoop.version>2.6.0</hadoop.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-minikdc</artifactId>
<version>${hadoop.version}</version>
<scope>test</scope>
</dependency>
<!-- Logging -->
......@@ -71,7 +68,6 @@
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-cli</artifactId>
......@@ -101,13 +97,11 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-annotations</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
......@@ -118,7 +112,6 @@
<dependency>
<groupId>org.apache.hadoop.metadata</groupId>
<artifactId>metadata-webapp</artifactId>
<version>${project.version}</version>
<classifier>classes</classifier>
</dependency>
......@@ -127,7 +120,6 @@
<artifactId>jetty</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
......@@ -135,7 +127,6 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<version>2.10</version>
<executions>
<execution>
<id>copy-bridge-dependencies</id>
......@@ -311,6 +302,7 @@
<skip>false</skip>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.felix</groupId>
<artifactId>maven-bundle-plugin</artifactId>
......
......@@ -77,7 +77,11 @@ public class HiveHookIT {
public void testCreateDatabase() throws Exception {
String dbName = "db" + random();
runCommand("create database " + dbName);
assertDatabaseIsRegistered(dbName);
// There should be just one entity per db name
runCommand("drop database " + dbName);
runCommand("create database " + dbName);
assertDatabaseIsRegistered(dbName);
}
......
......@@ -39,9 +39,17 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-annotations</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-minikdc</artifactId>
<version>${hadoop.version}</version>
<scope>test</scope>
</dependency>
<dependency>
......@@ -65,18 +73,6 @@
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-annotations</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
</dependency>
......@@ -87,7 +83,6 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>2.2</version>
<executions>
<execution>
<goals>
......@@ -96,6 +91,7 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.felix</groupId>
<artifactId>maven-bundle-plugin</artifactId>
......
......@@ -255,6 +255,39 @@
</exclusions>
</dependency>
<!-- hadoop -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-annotations</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${hadoop.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-minikdc</artifactId>
<version>${hadoop.version}</version>
<scope>test</scope>
</dependency>
<!-- commons -->
<dependency>
<groupId>commons-configuration</groupId>
......@@ -274,6 +307,12 @@
<version>1.0</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.4</version>
</dependency>
<!-- utilities -->
<dependency>
<groupId>com.google.inject</groupId>
......@@ -288,6 +327,18 @@
</dependency>
<dependency>
<groupId>com.google.inject.extensions</groupId>
<artifactId>guice-servlet</artifactId>
<version>3.0</version>
</dependency>
<dependency>
<groupId>com.sun.jersey.contribs</groupId>
<artifactId>jersey-guice</artifactId>
<version>1.18.3</version>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>2.5</version>
......@@ -453,10 +504,25 @@
<dependency>
<groupId>org.apache.hadoop.metadata</groupId>
<artifactId>metadata-webapp</artifactId>
<version>${project.version}</version>
<classifier>classes</classifier>
</dependency>
<dependency>
<groupId>org.apache.hadoop.metadata</groupId>
<artifactId>metadata-client</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop.metadata</groupId>
<artifactId>metadata-client</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<!--Scala dependencies-->
<dependency>
<groupId>org.scala-lang</groupId>
......@@ -585,6 +651,13 @@
<version>${fastutil.version}</version>
</dependency>
<!-- supports simple auth handler -->
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.2.5</version>
</dependency>
<!--Test dependencies-->
<dependency>
<groupId>org.testng</groupId>
......@@ -695,7 +768,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-war-plugin</artifactId>
<version>2.1.1</version>
<version>2.4</version>
</plugin>
<plugin>
......@@ -731,7 +804,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<version>2.8</version>
<version>2.10</version>
</plugin>
<plugin>
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.metadata;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Retention(RetentionPolicy.RUNTIME) @Target(ElementType.METHOD)
public @interface GraphTransaction {}
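Any method carrying this runtime-retained annotation gets wrapped in a single Titan transaction by the interceptor that follows; the method body no longer calls commit() or rollback() itself. A minimal sketch of a participating method, mirroring the GraphBackedMetadataRepository changes later in this diff:

@Override
@GraphTransaction
public List<String> getTraitNames(String guid) throws RepositoryException {
    LOG.info("Retrieving trait names for entity={}", guid);
    // all graph reads below run inside one transaction: commit on normal
    // return, rollback if anything is thrown
    Vertex instanceVertex = getVertexForGUID(guid);
    return getTraitNames(instanceVertex);
}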
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.metadata;
import com.google.inject.Inject;
import com.thinkaurelius.titan.core.TitanGraph;
import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;
import org.apache.hadoop.metadata.repository.graph.GraphProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class GraphTransactionInterceptor implements MethodInterceptor {
private static final Logger LOG = LoggerFactory.getLogger(GraphTransactionInterceptor.class);
private TitanGraph titanGraph;
@Inject
GraphProvider<TitanGraph> graphProvider;
public Object invoke(MethodInvocation invocation) throws Throwable {
if (titanGraph == null) {
titanGraph = graphProvider.get();
}
try {
LOG.debug("graph rollback to cleanup previous state");
titanGraph.rollback(); //cleanup previous state
Object response = invocation.proceed();
titanGraph.commit();
LOG.debug("graph commit");
return response;
} catch (Throwable t){
titanGraph.rollback();
LOG.debug("graph rollback");
throw t;
}
}
}
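The contract is one transaction per intercepted call: roll back any stale thread-local state on entry, commit if the method returns normally, roll back and rethrow on any Throwable. One consequence worth noting: because every intercepted call begins with rollback(), @GraphTransaction methods should not call one another. A hypothetical sketch of the hazard (writeVertexA/writeVertexB are made-up helpers):

@GraphTransaction
public void outer() {
    writeVertexA();   // uncommitted at this point
    inner();          // interceptor fires again: its rollback() discards vertex A
}

@GraphTransaction
public void inner() {
    writeVertexB();   // only this write survives to the commit
}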
......@@ -19,8 +19,10 @@
package org.apache.hadoop.metadata;
import com.google.inject.Scopes;
import com.google.inject.matcher.Matchers;
import com.google.inject.throwingproviders.ThrowingProviderBinder;
import com.thinkaurelius.titan.core.TitanGraph;
import org.aopalliance.intercept.MethodInterceptor;
import org.apache.hadoop.metadata.discovery.DiscoveryService;
import org.apache.hadoop.metadata.discovery.HiveLineageService;
import org.apache.hadoop.metadata.discovery.LineageService;
......@@ -40,31 +42,7 @@ import org.apache.hadoop.metadata.services.MetadataService;
* Guice module for Repository module.
*/
public class RepositoryMetadataModule extends com.google.inject.AbstractModule {
// Graph Service implementation class
// private Class<? extends GraphService> graphServiceClass;
// MetadataRepositoryService implementation class
private Class<? extends MetadataRepository> metadataRepoClass;
private Class<? extends ITypeStore> typeStore;
private Class<? extends MetadataService> metadataService;
private Class<? extends DiscoveryService> discoveryService;
private Class<? extends SearchIndexer> searchIndexer;
private Class<? extends LineageService> lineageService;
public RepositoryMetadataModule() {
// GraphServiceConfigurator gsp = new GraphServiceConfigurator();
// get the impl classes for the repo and the graph service
// this.graphServiceClass = gsp.getImplClass();
this.metadataRepoClass = GraphBackedMetadataRepository.class;
this.typeStore = GraphBackedTypeStore.class;
this.metadataService = DefaultMetadataService.class;
this.discoveryService = GraphBackedDiscoveryService.class;
this.searchIndexer = GraphBackedSearchIndexer.class;
this.lineageService = HiveLineageService.class;
}
@Override
protected void configure() {
// special wiring for Titan Graph
ThrowingProviderBinder.create(binder())
......@@ -75,22 +53,26 @@ public class RepositoryMetadataModule extends com.google.inject.AbstractModule {
// allow for dynamic binding of the metadata repo & graph service
// bind the MetadataRepositoryService interface to an implementation
bind(MetadataRepository.class).to(metadataRepoClass);
bind(MetadataRepository.class).to(GraphBackedMetadataRepository.class);
// bind the ITypeStore interface to an implementation
bind(ITypeStore.class).to(typeStore);
bind(ITypeStore.class).to(GraphBackedTypeStore.class);
// bind the GraphService interface to an implementation
// bind(GraphService.class).to(graphServiceClass);
// bind the MetadataService interface to an implementation
bind(MetadataService.class).to(metadataService);
bind(MetadataService.class).to(DefaultMetadataService.class);
// bind the DiscoveryService interface to an implementation
bind(DiscoveryService.class).to(discoveryService);
bind(DiscoveryService.class).to(GraphBackedDiscoveryService.class);
bind(SearchIndexer.class).to(GraphBackedSearchIndexer.class);
bind(SearchIndexer.class).to(searchIndexer);
bind(LineageService.class).to(HiveLineageService.class);
bind(LineageService.class).to(lineageService);
MethodInterceptor interceptor = new GraphTransactionInterceptor();
requestInjection(interceptor);
bindInterceptor(Matchers.any(), Matchers.annotatedWith(GraphTransaction.class), interceptor);
}
}
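The last three lines are the standard Guice AOP recipe: the interceptor is constructed with new, outside the injector, so requestInjection() is needed to populate its @Inject GraphProvider field before bindInterceptor() attaches it to every method annotated with @GraphTransaction. Reduced to just that wiring (module name is illustrative):

import com.google.inject.AbstractModule;
import com.google.inject.matcher.Matchers;
import org.aopalliance.intercept.MethodInterceptor;

public class GraphTransactionModule extends AbstractModule {
    @Override
    protected void configure() {
        MethodInterceptor interceptor = new GraphTransactionInterceptor();
        requestInjection(interceptor); // fills the @Inject graphProvider field
        bindInterceptor(Matchers.any(),                                 // in any class
                        Matchers.annotatedWith(GraphTransaction.class), // any annotated method
                        interceptor);
    }
}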
......@@ -20,6 +20,7 @@ package org.apache.hadoop.metadata.discovery;
import com.thinkaurelius.titan.core.TitanGraph;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.hadoop.metadata.GraphTransaction;
import org.apache.hadoop.metadata.MetadataException;
import org.apache.hadoop.metadata.PropertiesUtil;
import org.apache.hadoop.metadata.discovery.graph.DefaultGraphPersistenceStrategy;
......@@ -96,6 +97,7 @@ public class HiveLineageService implements LineageService {
* @return Lineage Outputs as JSON
*/
@Override
@GraphTransaction
public String getOutputs(String tableName) throws DiscoveryException {
LOG.info("Fetching lineage outputs for tableName={}", tableName);
......@@ -121,6 +123,7 @@ public class HiveLineageService implements LineageService {
* @return Outputs Graph as JSON
*/
@Override
@GraphTransaction
public String getOutputsGraph(String tableName) throws DiscoveryException {
LOG.info("Fetching lineage outputs graph for tableName={}", tableName);
......@@ -139,6 +142,7 @@ public class HiveLineageService implements LineageService {
* @return Lineage Inputs as JSON
*/
@Override
@GraphTransaction
public String getInputs(String tableName) throws DiscoveryException {
LOG.info("Fetching lineage inputs for tableName={}", tableName);
......@@ -164,6 +168,7 @@ public class HiveLineageService implements LineageService {
* @return Inputs Graph as JSON
*/
@Override
@GraphTransaction
public String getInputsGraph(String tableName) throws DiscoveryException {
LOG.info("Fetching lineage inputs graph for tableName={}", tableName);
......@@ -182,6 +187,7 @@ public class HiveLineageService implements LineageService {
* @return Schema as JSON
*/
@Override
@GraphTransaction
public String getSchema(String tableName) throws DiscoveryException {
// todo - validate if indeed this is a table type and exists
String schemaQuery = HIVE_TABLE_TYPE_NAME
......
......@@ -22,6 +22,7 @@ import com.thinkaurelius.titan.core.TitanVertex;
import org.apache.hadoop.metadata.MetadataException;
import org.apache.hadoop.metadata.query.Expressions;
import org.apache.hadoop.metadata.query.GraphPersistenceStrategies;
import org.apache.hadoop.metadata.query.GraphPersistenceStrategies$class;
import org.apache.hadoop.metadata.query.TypeUtils;
import org.apache.hadoop.metadata.repository.MetadataRepository;
import org.apache.hadoop.metadata.repository.Constants;
......@@ -168,33 +169,12 @@ public class DefaultGraphPersistenceStrategy implements GraphPersistenceStrategi
@Override
public String gremlinCompOp(Expressions.ComparisonExpression op) {
switch (op.symbol()) {
case "=":
return "T.eq";
case "!=":
return "T.neq";
case ">":
return "T.gt";
case ">=":
return "T.gte";
case "<":
return "T.lt";
case "<=":
return "T.lte";
default:
throw new RuntimeException(("Comparison operator not supported in Gremlin: " + op));
}
return GraphPersistenceStrategies$class.gremlinCompOp(this, op);
}
@Override
public String loopObjectExpression(IDataType<?> dataType) {
return "{it.object." + typeAttributeName() + " == '" + dataType.getName() + "'}";
return GraphPersistenceStrategies$class.loopObjectExpression(this, dataType);
}
@Override
......@@ -206,4 +186,9 @@ public class DefaultGraphPersistenceStrategy implements GraphPersistenceStrategi
@Override
public String idAttributeName() { return metadataRepository.getIdAttributeName(); }
@Override
public String typeTestExpression(String typeName) {
return GraphPersistenceStrategies$class.typeTestExpression(this, typeName);
}
}
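The new GraphPersistenceStrategies$class calls rely on how scalac (2.11 and earlier) compiles concrete trait methods: each body is emitted as a static method on a synthetic <Trait>$class, taking the instance as its first argument. That is what lets this Java class drop its hand-written gremlinCompOp switch and loopObjectExpression body and delegate to the trait's single implementation. Illustrated with a made-up trait:

// Made-up Scala trait for illustration:
//   trait Greeter { def greet(name: String): String = "hello " + name }
// scalac (pre-2.12) emits interface Greeter plus a synthetic Greeter$class with
//   public static String greet(Greeter self, String name)
// so a Java implementor can reuse the default body:
public class JavaGreeter implements Greeter {
    @Override
    public String greet(String name) {
        return Greeter$class.greet(this, name); // delegate to the trait body
    }
}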
......@@ -25,6 +25,7 @@ import com.thinkaurelius.titan.core.TitanVertex;
import com.tinkerpop.blueprints.Vertex;
import com.tinkerpop.gremlin.groovy.Gremlin;
import com.tinkerpop.gremlin.java.GremlinPipeline;
import org.apache.hadoop.metadata.GraphTransaction;
import org.apache.hadoop.metadata.MetadataServiceClient;
import org.apache.hadoop.metadata.discovery.DiscoveryException;
import org.apache.hadoop.metadata.discovery.DiscoveryService;
......@@ -82,6 +83,7 @@ public class GraphBackedDiscoveryService implements DiscoveryService {
//http://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-query-string-query
// .html#query-string-syntax for query syntax
@Override
@GraphTransaction
public String searchByFullText(String query) throws DiscoveryException {
String graphQuery = String.format("v.%s:(%s)", Constants.ENTITY_TEXT_PROPERTY_KEY, query);
LOG.debug("Full text query: {}", graphQuery);
......@@ -118,6 +120,7 @@ public class GraphBackedDiscoveryService implements DiscoveryService {
* @return JSON representing the type and results.
*/
@Override
@GraphTransaction
public String searchByDSL(String dslQuery) throws DiscoveryException {
LOG.info("Executing dsl query={}", dslQuery);
try {
......@@ -155,6 +158,7 @@ public class GraphBackedDiscoveryService implements DiscoveryService {
* @throws org.apache.hadoop.metadata.discovery.DiscoveryException
*/
@Override
@GraphTransaction
public List<Map<String, String>> searchByGremlin(String gremlinQuery)
throws DiscoveryException {
LOG.info("Executing gremlin query={}", gremlinQuery);
......
......@@ -27,6 +27,7 @@ import com.tinkerpop.blueprints.Edge;
import com.tinkerpop.blueprints.GraphQuery;
import com.tinkerpop.blueprints.Vertex;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.metadata.GraphTransaction;
import org.apache.hadoop.metadata.MetadataException;
import org.apache.hadoop.metadata.repository.Constants;
import org.apache.hadoop.metadata.repository.MetadataRepository;
......@@ -137,27 +138,23 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
}
@Override
@GraphTransaction
public String createEntity(IReferenceableInstance typedInstance) throws RepositoryException {
LOG.info("adding entity={}", typedInstance);
try {
titanGraph.rollback();
final String guid = instanceToGraphMapper.mapTypedInstanceToGraph(typedInstance);
titanGraph.commit(); // commit if there are no errors
return guid;
} catch (MetadataException e) {
titanGraph.rollback();
throw new RepositoryException(e);
}
}
@Override
@GraphTransaction
public ITypedReferenceableInstance getEntityDefinition(String guid) throws RepositoryException {
LOG.info("Retrieving entity with guid={}", guid);
try {
titanGraph.rollback(); // clean up before starting a query
Vertex instanceVertex = getVertexForGUID(guid);
LOG.debug("Found a vertex {} for guid {}", instanceVertex, guid);
......@@ -206,9 +203,9 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
* @throws RepositoryException
*/
@Override
@GraphTransaction
public List<String> getTraitNames(String guid) throws RepositoryException {
LOG.info("Retrieving trait names for entity={}", guid);
titanGraph.rollback(); // clean up before starting a query
Vertex instanceVertex = getVertexForGUID(guid);
return getTraitNames(instanceVertex);
}
......@@ -231,6 +228,7 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
* @throws RepositoryException
*/
@Override
@GraphTransaction
public void addTrait(String guid,
ITypedStruct traitInstance) throws RepositoryException {
Preconditions.checkNotNull(traitInstance, "Trait instance cannot be null");
......@@ -238,22 +236,18 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
LOG.info("Adding a new trait={} for entity={}", traitName, guid);
try {
titanGraph.rollback(); // clean up before starting a query
Vertex instanceVertex = getVertexForGUID(guid);
// add the trait instance as a new vertex
final String typeName = getTypeName(instanceVertex);
instanceToGraphMapper.mapTraitInstanceToVertex(traitInstance,
getIdFromVertex(typeName, instanceVertex),
instanceToGraphMapper.mapTraitInstanceToVertex(traitInstance, getIdFromVertex(typeName, instanceVertex),
typeName, instanceVertex, Collections.<Id, Vertex>emptyMap());
// update the traits in entity once adding trait instance is successful
((TitanVertex) instanceVertex)
.addProperty(Constants.TRAIT_NAMES_PROPERTY_KEY, traitName);
titanGraph.commit(); // commit if there are no errors
} catch (MetadataException e) {
titanGraph.rollback();
throw new RepositoryException(e);
}
}
......@@ -266,11 +260,11 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
* @throws RepositoryException
*/
@Override
@GraphTransaction
public void deleteTrait(String guid, String traitNameToBeDeleted)
throws RepositoryException {
LOG.info("Deleting trait={} from entity={}", traitNameToBeDeleted, guid);
try {
titanGraph.rollback(); // clean up before starting a query
Vertex instanceVertex = getVertexForGUID(guid);
List<String> traitNames = getTraitNames(instanceVertex);
......@@ -297,11 +291,8 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
traitNames.remove(traitNameToBeDeleted);
updateTraits(instanceVertex, traitNames);
}
titanGraph.commit(); // commit if there are no errors
}
} catch (Exception e) {
titanGraph.rollback();
throw new RepositoryException(e);
}
}
......@@ -318,11 +309,11 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
}
@Override
@GraphTransaction
public void updateEntity(String guid, String property, String value) throws RepositoryException {
LOG.info("Adding property {} for entity guid {}", property, guid);
try {
titanGraph.rollback(); // clean up before starting a query
Vertex instanceVertex = GraphHelper.findVertexByGUID(titanGraph, guid);
if (instanceVertex == null) {
throw new RepositoryException("Could not find a vertex for guid " + guid);
......@@ -349,11 +340,8 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
instanceToGraphMapper.mapAttributesToVertex(getIdFromVertex(typeName, instanceVertex), instance,
instanceVertex, new HashMap<Id, Vertex>(), attributeInfo, attributeInfo.dataType());
titanGraph.commit();
} catch (Exception e) {
throw new RepositoryException(e);
} finally {
titanGraph.rollback();
}
}
......
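With the interceptor supplying begin/commit/rollback, each repository method shrinks to its business logic; the try/catch that remains only translates the checked MetadataException into RepositoryException. The resulting shape of createEntity, abridged from the hunk above:

@Override
@GraphTransaction
public String createEntity(IReferenceableInstance typedInstance) throws RepositoryException {
    LOG.info("adding entity={}", typedInstance);
    try {
        // transaction boundaries are supplied by GraphTransactionInterceptor
        return instanceToGraphMapper.mapTypedInstanceToGraph(typedInstance);
    } catch (MetadataException e) {
        throw new RepositoryException(e);
    }
}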
......@@ -18,6 +18,7 @@
package org.apache.hadoop.metadata.repository.graph;
import com.google.inject.Provides;
import com.thinkaurelius.titan.core.TitanFactory;
import com.thinkaurelius.titan.core.TitanGraph;
import org.apache.commons.configuration.Configuration;
......@@ -64,6 +65,7 @@ public class TitanGraphProvider implements GraphProvider<TitanGraph> {
@Override
@Singleton
@Provides
public TitanGraph get() {
Configuration config;
try {
......
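Adding @Provides together with @Singleton is meant to make every consumer share one TitanGraph instance, so the interceptor and the repositories commit and roll back the same transaction state. Consumers inject the provider and resolve it lazily, as GraphTransactionInterceptor does (the consumer class below is hypothetical):

public class GraphConsumer {
    @Inject
    private GraphProvider<TitanGraph> graphProvider;

    private TitanGraph titanGraph;

    public TitanGraph graph() {
        if (titanGraph == null) {
            titanGraph = graphProvider.get(); // same singleton instance everywhere
        }
        return titanGraph;
    }
}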
......@@ -25,6 +25,7 @@ import com.tinkerpop.blueprints.Direction;
import com.tinkerpop.blueprints.Edge;
import com.tinkerpop.blueprints.Vertex;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.metadata.GraphTransaction;
import org.apache.hadoop.metadata.MetadataException;
import org.apache.hadoop.metadata.repository.Constants;
import org.apache.hadoop.metadata.repository.graph.GraphProvider;
......@@ -76,40 +77,35 @@ public class GraphBackedTypeStore implements ITypeStore {
}
@Override
@GraphTransaction
public void store(TypeSystem typeSystem, ImmutableList<String> typeNames) throws MetadataException {
try {
ImmutableList<String> coreTypes = typeSystem.getCoreTypes();
titanGraph.rollback(); //Cleanup previous state
for (String typeName : typeNames) {
if (!coreTypes.contains(typeName)) {
IDataType dataType = typeSystem.getDataType(IDataType.class, typeName);
LOG.debug("Processing {}.{} in type store", dataType.getTypeCategory(), dataType.getName());
switch (dataType.getTypeCategory()) {
case ENUM:
storeInGraph((EnumType)dataType);
break;
case STRUCT:
StructType structType = (StructType) dataType;
storeInGraph(typeSystem, dataType.getTypeCategory(), dataType.getName(),
ImmutableList.copyOf(structType.infoToNameMap.keySet()), ImmutableList.<String>of());
break;
case TRAIT:
case CLASS:
HierarchicalType type = (HierarchicalType) dataType;
storeInGraph(typeSystem, dataType.getTypeCategory(), dataType.getName(),
type.immediateAttrs, type.superTypes);
break;
default: //Ignore primitive/collection types as they are covered under references
break;
}
ImmutableList<String> coreTypes = typeSystem.getCoreTypes();
for (String typeName : typeNames) {
if (!coreTypes.contains(typeName)) {
IDataType dataType = typeSystem.getDataType(IDataType.class, typeName);
LOG.debug("Processing {}.{} in type store", dataType.getTypeCategory(), dataType.getName());
switch (dataType.getTypeCategory()) {
case ENUM:
storeInGraph((EnumType)dataType);
break;
case STRUCT:
StructType structType = (StructType) dataType;
storeInGraph(typeSystem, dataType.getTypeCategory(), dataType.getName(),
ImmutableList.copyOf(structType.infoToNameMap.keySet()), ImmutableList.<String>of());
break;
case TRAIT:
case CLASS:
HierarchicalType type = (HierarchicalType) dataType;
storeInGraph(typeSystem, dataType.getTypeCategory(), dataType.getName(),
type.immediateAttrs, type.superTypes);
break;
default: //Ignore primitive/collection types as they are covered under references
break;
}
}
titanGraph.commit();
} finally {
titanGraph.rollback();
}
}
......@@ -206,54 +202,49 @@ public class GraphBackedTypeStore implements ITypeStore {
}
@Override
@GraphTransaction
public TypesDef restore() throws MetadataException {
try {
titanGraph.rollback(); //Cleanup previous state
//Get all vertices for type system
Iterator vertices =
titanGraph.query().has(Constants.VERTEX_TYPE_PROPERTY_KEY, VERTEX_TYPE).vertices().iterator();
ImmutableList.Builder<EnumTypeDefinition> enums = ImmutableList.builder();
ImmutableList.Builder<StructTypeDefinition> structs = ImmutableList.builder();
ImmutableList.Builder<HierarchicalTypeDefinition<ClassType>> classTypes = ImmutableList.builder();
ImmutableList.Builder<HierarchicalTypeDefinition<TraitType>> traits = ImmutableList.builder();
while (vertices.hasNext()) {
Vertex vertex = (Vertex) vertices.next();
DataTypes.TypeCategory typeCategory = vertex.getProperty(Constants.TYPE_CATEGORY_PROPERTY_KEY);
String typeName = vertex.getProperty(Constants.TYPENAME_PROPERTY_KEY);
LOG.info("Restoring type {}.{}", typeCategory, typeName);
switch (typeCategory) {
case ENUM:
enums.add(getEnumType(vertex));
break;
case STRUCT:
AttributeDefinition[] attributes = getAttributes(vertex);
structs.add(new StructTypeDefinition(typeName, attributes));
break;
case CLASS:
ImmutableList<String> superTypes = getSuperTypes(vertex);
attributes = getAttributes(vertex);
classTypes.add(new HierarchicalTypeDefinition(ClassType.class, typeName, superTypes, attributes));
break;
case TRAIT:
superTypes = getSuperTypes(vertex);
attributes = getAttributes(vertex);
traits.add(new HierarchicalTypeDefinition(TraitType.class, typeName, superTypes, attributes));
break;
default:
throw new IllegalArgumentException("Unhandled type category " + typeCategory);
}
//Get all vertices for type system
Iterator vertices =
titanGraph.query().has(Constants.VERTEX_TYPE_PROPERTY_KEY, VERTEX_TYPE).vertices().iterator();
ImmutableList.Builder<EnumTypeDefinition> enums = ImmutableList.builder();
ImmutableList.Builder<StructTypeDefinition> structs = ImmutableList.builder();
ImmutableList.Builder<HierarchicalTypeDefinition<ClassType>> classTypes = ImmutableList.builder();
ImmutableList.Builder<HierarchicalTypeDefinition<TraitType>> traits = ImmutableList.builder();
while (vertices.hasNext()) {
Vertex vertex = (Vertex) vertices.next();
DataTypes.TypeCategory typeCategory = vertex.getProperty(Constants.TYPE_CATEGORY_PROPERTY_KEY);
String typeName = vertex.getProperty(Constants.TYPENAME_PROPERTY_KEY);
LOG.info("Restoring type {}.{}", typeCategory, typeName);
switch (typeCategory) {
case ENUM:
enums.add(getEnumType(vertex));
break;
case STRUCT:
AttributeDefinition[] attributes = getAttributes(vertex);
structs.add(new StructTypeDefinition(typeName, attributes));
break;
case CLASS:
ImmutableList<String> superTypes = getSuperTypes(vertex);
attributes = getAttributes(vertex);
classTypes.add(new HierarchicalTypeDefinition(ClassType.class, typeName, superTypes, attributes));
break;
case TRAIT:
superTypes = getSuperTypes(vertex);
attributes = getAttributes(vertex);
traits.add(new HierarchicalTypeDefinition(TraitType.class, typeName, superTypes, attributes));
break;
default:
throw new IllegalArgumentException("Unhandled type category " + typeCategory);
}
titanGraph.commit();
return TypeUtils.getTypesDef(enums.build(), structs.build(), traits.build(), classTypes.build());
} finally {
titanGraph.rollback();
}
return TypeUtils.getTypesDef(enums.build(), structs.build(), traits.build(), classTypes.build());
}
private EnumTypeDefinition getEnumType(Vertex vertex) {
......
......@@ -26,7 +26,7 @@ import org.apache.commons.lang3.StringEscapeUtils;
import org.apache.hadoop.metadata.MetadataException;
import org.apache.hadoop.metadata.MetadataServiceClient;
import org.apache.hadoop.metadata.ParamChecker;
import org.apache.hadoop.metadata.RepositoryMetadataModule;
import org.apache.hadoop.metadata.classification.InterfaceAudience;
import org.apache.hadoop.metadata.discovery.SearchIndexer;
import org.apache.hadoop.metadata.listener.EntityChangeListener;
import org.apache.hadoop.metadata.listener.TypesChangeListener;
......@@ -41,14 +41,20 @@ import org.apache.hadoop.metadata.typesystem.Referenceable;
import org.apache.hadoop.metadata.typesystem.Struct;
import org.apache.hadoop.metadata.typesystem.TypesDef;
import org.apache.hadoop.metadata.typesystem.json.InstanceSerialization;
import org.apache.hadoop.metadata.typesystem.json.Serialization$;
import org.apache.hadoop.metadata.typesystem.json.TypesSerialization;
import org.apache.hadoop.metadata.typesystem.types.*;
import org.apache.hadoop.metadata.typesystem.types.AttributeDefinition;
import org.apache.hadoop.metadata.typesystem.types.ClassType;
import org.apache.hadoop.metadata.typesystem.types.DataTypes;
import org.apache.hadoop.metadata.typesystem.types.HierarchicalTypeDefinition;
import org.apache.hadoop.metadata.typesystem.types.IDataType;
import org.apache.hadoop.metadata.typesystem.types.Multiplicity;
import org.apache.hadoop.metadata.typesystem.types.TraitType;
import org.apache.hadoop.metadata.typesystem.types.TypeSystem;
import org.apache.hadoop.metadata.typesystem.types.utils.TypesUtil;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.tools.cmd.Meta;
import javax.inject.Inject;
import javax.inject.Singleton;
......@@ -94,12 +100,38 @@ public class DefaultMetadataService implements MetadataService {
try {
TypesDef typesDef = typeStore.restore();
typeSystem.defineTypes(typesDef);
createSuperTypes();
} catch (MetadataException e) {
throw new RuntimeException(e);
}
LOG.info("Restored type system from the store");
}
private static final AttributeDefinition NAME_ATTRIBUTE =
TypesUtil.createRequiredAttrDef("name", DataTypes.STRING_TYPE);
private static final AttributeDefinition DESCRIPTION_ATTRIBUTE =
TypesUtil.createRequiredAttrDef("description", DataTypes.STRING_TYPE);
private static final String[] SUPER_TYPES = {
"DataSet",
"Process",
"Infrastructure",
};
@InterfaceAudience.Private
public void createSuperTypes() throws MetadataException {
if (typeSystem.isRegistered(SUPER_TYPES[0])) {
return; // this is already registered
}
for (String superTypeName : SUPER_TYPES) {
HierarchicalTypeDefinition<ClassType> superTypeDefinition =
TypesUtil.createClassTypeDef(superTypeName,
ImmutableList.<String>of(),
NAME_ATTRIBUTE, DESCRIPTION_ATTRIBUTE);
typeSystem.defineClassType(superTypeDefinition);
}
}
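createSuperTypes() seeds three base model types, each with required name and description attributes; the isRegistered check on the first entry makes the call idempotent across restarts. Model types are expected to extend these instead of redeclaring the attributes, as the HiveLineageServiceTest and QuickStart hunks below do:

// e.g. a table type built on the new DataSet base type
// (the "hive_table" name and "owner" attribute are illustrative):
HierarchicalTypeDefinition<ClassType> tableType =
        TypesUtil.createClassTypeDef("hive_table",
                ImmutableList.of("DataSet"), // inherits name + description
                TypesUtil.createRequiredAttrDef("owner", DataTypes.STRING_TYPE));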
/**
* Creates a new type based on the type system to enable adding
......@@ -123,10 +155,9 @@ public class DefaultMetadataService implements MetadataService {
typeStore.store(typeSystem, ImmutableList.copyOf(typesAdded.keySet()));
onTypesAddedToRepo(typesAdded);
JSONObject response = new JSONObject() {{
return new JSONObject() {{
put(MetadataServiceClient.TYPES, typesAdded.keySet());
}};
return response;
} catch (JSONException e) {
LOG.error("Unable to create response for types={}", typeDefinition, e);
throw new MetadataException("Unable to create response");
......
......@@ -112,8 +112,19 @@ trait GraphPersistenceStrategies {
}
def loopObjectExpression(dataType: IDataType[_]) = {
s"{it.object.'${typeAttributeName}' == '${dataType.getName}'}"
_typeTestExpression(dataType.getName, "it.object")
}
def typeTestExpression(typeName : String) :String = {
_typeTestExpression(typeName, "it")
}
private def _typeTestExpression(typeName: String, itRef: String): String = {
s"""{(${itRef}.'${typeAttributeName}' == '${typeName}') |
|(${itRef}.'${superTypeAttributeName}' ?
|${itRef}.'${superTypeAttributeName}'.contains('${typeName}') : false)}""".
stripMargin.replace(System.getProperty("line.separator"), "")
}
}
object GraphPersistenceStrategy1 extends GraphPersistenceStrategies {
......
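_typeTestExpression centralizes the "is of this type or a subtype" Gremlin test: a vertex matches if its type attribute equals the name, or if its super-types property exists and contains the name. For example (the attribute names below are assumed values of typeAttributeName and superTypeAttributeName):

// typeTestExpression("DataSet"), assuming typeAttributeName = "typeName"
// and superTypeAttributeName = "superTypeNames", yields a single line:
//
//   {(it.'typeName' == 'DataSet') | (it.'superTypeNames' ?
//       it.'superTypeNames'.contains('DataSet') : false)}
//
// GremlinTranslator (next hunk) prefixes this with "filter", replacing the
// two near-identical closures previously inlined for class and trait cases.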
......@@ -185,13 +185,9 @@ class GremlinTranslator(expr: Expression,
private def genQuery(expr: Expression, inSelect: Boolean): String = expr match {
case ClassExpression(clsName) =>
val typeName = gPersistenceBehavior.typeAttributeName
val superTypeName = gPersistenceBehavior.superTypeAttributeName
s"""filter{(it.$typeName == "$clsName") | (it.$superTypeName ? it.$superTypeName.contains("$clsName") : false)}"""
s"""filter${gPersistenceBehavior.typeTestExpression(clsName)}"""
case TraitExpression(clsName) =>
val typeName = gPersistenceBehavior.typeAttributeName
val superTypeName = gPersistenceBehavior.superTypeAttributeName
s"""filter{(it.$typeName == "$clsName") | (it.$superTypeName ? it.$superTypeName.contains("$clsName") : false)}"""
s"""filter${gPersistenceBehavior.typeTestExpression(clsName)}"""
case fe@FieldExpression(fieldName, fInfo, child) if fe.dataType.getTypeCategory == TypeCategory.PRIMITIVE => {
val fN = "\"" + gPersistenceBehavior.fieldNameInVertex(fInfo.dataType, fInfo.attrInfo) + "\""
child match {
......
......@@ -29,7 +29,6 @@ import org.apache.hadoop.metadata.discovery.graph.GraphBackedDiscoveryService;
import org.apache.hadoop.metadata.query.HiveTitanSample;
import org.apache.hadoop.metadata.query.QueryTestsUtils;
import org.apache.hadoop.metadata.repository.graph.GraphBackedMetadataRepository;
import org.apache.hadoop.metadata.repository.graph.GraphBackedSearchIndexer;
import org.apache.hadoop.metadata.repository.graph.GraphHelper;
import org.apache.hadoop.metadata.repository.graph.GraphProvider;
import org.apache.hadoop.metadata.typesystem.ITypedReferenceableInstance;
......
......@@ -57,6 +57,11 @@ import java.util.List;
@Guice(modules = RepositoryMetadataModule.class)
public class HiveLineageServiceTest {
static {
// reset the type system before anything else runs, or the reset would wipe out the super types created during service init
TypeSystem.getInstance().reset();
}
@Inject
private DefaultMetadataService metadataService;
......@@ -71,8 +76,6 @@ public class HiveLineageServiceTest {
@BeforeClass
public void setUp() throws Exception {
TypeSystem.getInstance().reset();
setUpTypes();
setupInstances();
......@@ -280,9 +283,7 @@ public class HiveLineageServiceTest {
);
HierarchicalTypeDefinition<ClassType> tblClsDef =
TypesUtil.createClassTypeDef(HIVE_TABLE_TYPE, null,
attrDef("name", DataTypes.STRING_TYPE),
attrDef("description", DataTypes.STRING_TYPE),
TypesUtil.createClassTypeDef(HIVE_TABLE_TYPE, ImmutableList.of("DataSet"),
attrDef("owner", DataTypes.STRING_TYPE),
attrDef("createTime", DataTypes.INT_TYPE),
attrDef("lastAccessTime", DataTypes.INT_TYPE),
......@@ -298,8 +299,7 @@ public class HiveLineageServiceTest {
);
HierarchicalTypeDefinition<ClassType> loadProcessClsDef =
TypesUtil.createClassTypeDef(HIVE_PROCESS_TYPE, null,
attrDef("name", DataTypes.STRING_TYPE),
TypesUtil.createClassTypeDef(HIVE_PROCESS_TYPE, ImmutableList.of("Process"),
attrDef("userName", DataTypes.STRING_TYPE),
attrDef("startTime", DataTypes.INT_TYPE),
attrDef("endTime", DataTypes.INT_TYPE),
......@@ -401,7 +401,7 @@ public class HiveLineageServiceTest {
"sales fact daily materialized view",
reportingDB, sd, "Joe BI", "Managed", salesFactColumns, "Metric");
loadProcess("loadSalesDaily", "John ETL",
loadProcess("loadSalesDaily", "hive query for daily summary", "John ETL",
ImmutableList.of(salesFact, timeDim), ImmutableList.of(salesFactDaily),
"create table as select ", "plan", "id", "graph",
"ETL");
......@@ -434,7 +434,7 @@ public class HiveLineageServiceTest {
"sales fact monthly materialized view",
reportingDB, sd, "Jane BI", "Managed", salesFactColumns, "Metric");
loadProcess("loadSalesMonthly", "John ETL",
loadProcess("loadSalesMonthly", "hive query for monthly summary", "John ETL",
ImmutableList.of(salesFactDaily), ImmutableList.of(salesFactMonthly),
"create table as select ", "plan", "id", "graph",
"ETL");
......@@ -496,7 +496,7 @@ public class HiveLineageServiceTest {
return createInstance(referenceable);
}
Id loadProcess(String name, String user,
Id loadProcess(String name, String description, String user,
List<Id> inputTables,
List<Id> outputTables,
String queryText, String queryPlan,
......@@ -504,6 +504,7 @@ public class HiveLineageServiceTest {
String... traitNames) throws Exception {
Referenceable referenceable = new Referenceable(HIVE_PROCESS_TYPE, traitNames);
referenceable.set("name", name);
referenceable.set("description", description);
referenceable.set("user", user);
referenceable.set("startTime", System.currentTimeMillis());
referenceable.set("endTime", System.currentTimeMillis() + 10000);
......
......@@ -379,6 +379,30 @@ public class GraphBackedMetadataRepositoryTest {
Assert.assertEquals(row.getString("name"), "Jane");
}
@Test(dependsOnMethods = "testCreateEntity")
public void testBug37860() throws Exception {
String dslQuery =
"hive_table as t where name = 'bar' " +
"database where name = 'foo' and description = 'foo database' select t";
System.out.println("Executing dslQuery = " + dslQuery);
String jsonResults = discoveryService.searchByDSL(dslQuery);
Assert.assertNotNull(jsonResults);
JSONObject results = new JSONObject(jsonResults);
Assert.assertEquals(results.length(), 3);
System.out.println("results = " + results);
Object query = results.get("query");
Assert.assertNotNull(query);
JSONObject dataType = results.getJSONObject("dataType");
Assert.assertNotNull(dataType);
JSONArray rows = results.getJSONArray("rows");
Assert.assertEquals(rows.length(), 1);
}
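The assertions pin down the DSL response envelope: a JSON object with exactly three members, of which rows holds the single matching table. Illustrative shape (values abridged):

// {
//   "query":    "hive_table as t where name = 'bar' database where ... select t",
//   "dataType": { ...type definition of the selected rows... },
//   "rows":     [ { ...the one table named "bar"... } ]
// }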
/**
* Full text search requires GraphBackedSearchIndexer, and GraphBackedSearchIndexer can't be enabled in
* GraphBackedDiscoveryServiceTest because of its test data. So, test for full text search is in
......
......@@ -27,7 +27,6 @@ metadata.graph.index.search.backend=elasticsearch
metadata.graph.index.search.directory=./target/data/es
metadata.graph.index.search.elasticsearch.client-only=false
metadata.graph.index.search.elasticsearch.local-mode=true
metadata.graph.index.search.elasticsearch.create.sleep=1000
######### Hive Lineage Configs #########
......
......@@ -29,14 +29,12 @@ metadata.graph.index.search.elasticsearch.local-mode=true
metadata.graph.index.search.elasticsearch.create.sleep=2000
######### Hive Lineage Configs #########
# This model follows the quick-start guide
metadata.lineage.hive.table.type.name=Table
# This model reflects the base super types for Data and Process
metadata.lineage.hive.table.type.name=DataSet
metadata.lineage.hive.table.column.name=columns
metadata.lineage.hive.process.type.name=LoadProcess
metadata.lineage.hive.process.type.name=Process
metadata.lineage.hive.process.inputs.name=inputTables
metadata.lineage.hive.process.outputs.name=outputTables
#Currently unused
#metadata.lineage.hive.column.type.name=Column
######### Security Properties #########
......
......@@ -48,44 +48,31 @@
<artifactId>metadata-repository</artifactId>
</dependency>
<!--<dependency>-->
<!--<groupId>org.apache.hadoop.metadata</groupId>-->
<!--<artifactId>metadata-client</artifactId>-->
<!--</dependency>-->
<dependency>
<groupId>org.apache.hadoop.metadata</groupId>
<artifactId>metadata-client</artifactId>
<version>${version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-minikdc</artifactId>
<version>${hadoop.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${hadoop.version}</version>
<scope>test</scope>
</dependency>
<!-- supports simple auth handler -->
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.2.5</version>
</dependency>
<dependency>
......@@ -171,19 +158,16 @@
<dependency>
<groupId>com.google.inject.extensions</groupId>
<artifactId>guice-servlet</artifactId>
<version>3.0</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.3.1</version>
</dependency>
<dependency>
<groupId>com.sun.jersey.contribs</groupId>
<artifactId>jersey-guice</artifactId>
<version>1.18.3</version>
</dependency>
<dependency>
......@@ -194,7 +178,6 @@
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.4</version>
</dependency>
</dependencies>
......@@ -225,7 +208,6 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-war-plugin</artifactId>
<version>2.4</version>
<configuration>
<attachClasses>true</attachClasses>
<webResources>
......@@ -362,6 +344,10 @@
<inherited>true</inherited>
<extensions>true</extensions>
</plugin>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
......@@ -128,9 +128,7 @@ public class QuickStart {
);
HierarchicalTypeDefinition<ClassType> tblClsDef =
TypesUtil.createClassTypeDef(TABLE_TYPE, null,
attrDef("name", DataTypes.STRING_TYPE),
attrDef("description", DataTypes.STRING_TYPE),
TypesUtil.createClassTypeDef(TABLE_TYPE, ImmutableList.of("DataSet"),
new AttributeDefinition("db", DATABASE_TYPE,
Multiplicity.REQUIRED, false, null),
new AttributeDefinition("sd", STORAGE_DESC_TYPE,
......@@ -149,8 +147,7 @@ public class QuickStart {
);
HierarchicalTypeDefinition<ClassType> loadProcessClsDef =
TypesUtil.createClassTypeDef(LOAD_PROCESS_TYPE, null,
attrDef("name", DataTypes.STRING_TYPE),
TypesUtil.createClassTypeDef(LOAD_PROCESS_TYPE, ImmutableList.of("Process"),
attrDef("userName", DataTypes.STRING_TYPE),
attrDef("startTime", DataTypes.INT_TYPE),
attrDef("endTime", DataTypes.INT_TYPE),
......@@ -273,7 +270,7 @@ public class QuickStart {
"sales fact daily materialized view", reportingDB, sd,
"Joe BI", "Managed", salesFactColumns, "Metric");
loadProcess("loadSalesDaily", "John ETL",
loadProcess("loadSalesDaily", "hive query for daily summary", "John ETL",
ImmutableList.of(salesFact, timeDim), ImmutableList.of(salesFactDaily),
"create table as select ", "plan", "id", "graph",
"ETL");
......@@ -288,7 +285,7 @@ public class QuickStart {
"sales fact monthly materialized view",
reportingDB, sd, "Jane BI", "Managed", salesFactColumns, "Metric");
loadProcess("loadSalesMonthly", "John ETL",
loadProcess("loadSalesMonthly", "hive query for monthly summary", "John ETL",
ImmutableList.of(salesFactDaily), ImmutableList.of(salesFactMonthly),
"create table as select ", "plan", "id", "graph",
"ETL");
......@@ -362,7 +359,7 @@ public class QuickStart {
return createInstance(referenceable);
}
Id loadProcess(String name, String user,
Id loadProcess(String name, String description, String user,
List<Id> inputTables,
List<Id> outputTables,
String queryText, String queryPlan,
......@@ -370,6 +367,7 @@ public class QuickStart {
String... traitNames) throws Exception {
Referenceable referenceable = new Referenceable(LOAD_PROCESS_TYPE, traitNames);
referenceable.set("name", name);
referenceable.set("description", description);
referenceable.set("user", user);
referenceable.set("startTime", System.currentTimeMillis());
referenceable.set("endTime", System.currentTimeMillis() + 10000);
......@@ -465,6 +463,8 @@ public class QuickStart {
*/
"Table where name=\"sales_fact\", columns",
"Table where name=\"sales_fact\", columns as column select column.name, column.dataType, column.comment",
"from DataSet",
"from Process",
};
}
......