Commit e3935d5f by Shwetha GS

using guice AOP for graph transaction

parent 84c8636d
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.metadata;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Retention(RetentionPolicy.RUNTIME) @Target(ElementType.METHOD)
public @interface GraphTransaction {}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.metadata;
import com.google.inject.Inject;
import com.thinkaurelius.titan.core.TitanGraph;
import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;
import org.apache.hadoop.metadata.repository.graph.GraphProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class GraphTransactionInterceptor implements MethodInterceptor {
private static final Logger LOG = LoggerFactory.getLogger(GraphTransactionInterceptor.class);
private TitanGraph titanGraph;
@Inject
GraphProvider<TitanGraph> graphProvider;
public Object invoke(MethodInvocation invocation) throws Throwable {
if (titanGraph == null) {
titanGraph = graphProvider.get();
}
try {
LOG.debug("graph rollback to cleanup previous state");
titanGraph.rollback(); //cleanup previous state
Object response = invocation.proceed();
titanGraph.commit();
LOG.debug("graph commit");
return response;
} catch (Throwable t){
titanGraph.rollback();
LOG.debug("graph rollback");
throw t;
}
}
}
......@@ -19,8 +19,10 @@
package org.apache.hadoop.metadata;
import com.google.inject.Scopes;
import com.google.inject.matcher.Matchers;
import com.google.inject.throwingproviders.ThrowingProviderBinder;
import com.thinkaurelius.titan.core.TitanGraph;
import org.aopalliance.intercept.MethodInterceptor;
import org.apache.hadoop.metadata.discovery.DiscoveryService;
import org.apache.hadoop.metadata.discovery.HiveLineageService;
import org.apache.hadoop.metadata.discovery.LineageService;
......@@ -40,31 +42,7 @@ import org.apache.hadoop.metadata.services.MetadataService;
* Guice module for Repository module.
*/
public class RepositoryMetadataModule extends com.google.inject.AbstractModule {
// Graph Service implementation class
// private Class<? extends GraphService> graphServiceClass;
// MetadataRepositoryService implementation class
private Class<? extends MetadataRepository> metadataRepoClass;
private Class<? extends ITypeStore> typeStore;
private Class<? extends MetadataService> metadataService;
private Class<? extends DiscoveryService> discoveryService;
private Class<? extends SearchIndexer> searchIndexer;
private Class<? extends LineageService> lineageService;
public RepositoryMetadataModule() {
// GraphServiceConfigurator gsp = new GraphServiceConfigurator();
// get the impl classes for the repo and the graph service
// this.graphServiceClass = gsp.getImplClass();
this.metadataRepoClass = GraphBackedMetadataRepository.class;
this.typeStore = GraphBackedTypeStore.class;
this.metadataService = DefaultMetadataService.class;
this.discoveryService = GraphBackedDiscoveryService.class;
this.searchIndexer = GraphBackedSearchIndexer.class;
this.lineageService = HiveLineageService.class;
}
@Override
protected void configure() {
// special wiring for Titan Graph
ThrowingProviderBinder.create(binder())
......@@ -75,22 +53,26 @@ public class RepositoryMetadataModule extends com.google.inject.AbstractModule {
// allow for dynamic binding of the metadata repo & graph service
// bind the MetadataRepositoryService interface to an implementation
bind(MetadataRepository.class).to(metadataRepoClass);
bind(MetadataRepository.class).to(GraphBackedMetadataRepository.class);
// bind the ITypeStore interface to an implementation
bind(ITypeStore.class).to(typeStore);
bind(ITypeStore.class).to(GraphBackedTypeStore.class);
// bind the GraphService interface to an implementation
// bind(GraphService.class).to(graphServiceClass);
// bind the MetadataService interface to an implementation
bind(MetadataService.class).to(metadataService);
bind(MetadataService.class).to(DefaultMetadataService.class);
// bind the DiscoveryService interface to an implementation
bind(DiscoveryService.class).to(discoveryService);
bind(DiscoveryService.class).to(GraphBackedDiscoveryService.class);
bind(SearchIndexer.class).to(GraphBackedSearchIndexer.class);
bind(SearchIndexer.class).to(searchIndexer);
bind(LineageService.class).to(HiveLineageService.class);
bind(LineageService.class).to(lineageService);
MethodInterceptor interceptor = new GraphTransactionInterceptor();
requestInjection(interceptor);
bindInterceptor(Matchers.any(), Matchers.annotatedWith(GraphTransaction.class), interceptor);
}
}
......@@ -20,6 +20,7 @@ package org.apache.hadoop.metadata.discovery;
import com.thinkaurelius.titan.core.TitanGraph;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.hadoop.metadata.GraphTransaction;
import org.apache.hadoop.metadata.MetadataException;
import org.apache.hadoop.metadata.PropertiesUtil;
import org.apache.hadoop.metadata.discovery.graph.DefaultGraphPersistenceStrategy;
......@@ -96,6 +97,7 @@ public class HiveLineageService implements LineageService {
* @return Lineage Outputs as JSON
*/
@Override
@GraphTransaction
public String getOutputs(String tableName) throws DiscoveryException {
LOG.info("Fetching lineage outputs for tableName={}", tableName);
......@@ -121,6 +123,7 @@ public class HiveLineageService implements LineageService {
* @return Outputs Graph as JSON
*/
@Override
@GraphTransaction
public String getOutputsGraph(String tableName) throws DiscoveryException {
LOG.info("Fetching lineage outputs graph for tableName={}", tableName);
......@@ -139,6 +142,7 @@ public class HiveLineageService implements LineageService {
* @return Lineage Inputs as JSON
*/
@Override
@GraphTransaction
public String getInputs(String tableName) throws DiscoveryException {
LOG.info("Fetching lineage inputs for tableName={}", tableName);
......@@ -164,6 +168,7 @@ public class HiveLineageService implements LineageService {
* @return Inputs Graph as JSON
*/
@Override
@GraphTransaction
public String getInputsGraph(String tableName) throws DiscoveryException {
LOG.info("Fetching lineage inputs graph for tableName={}", tableName);
......@@ -182,6 +187,7 @@ public class HiveLineageService implements LineageService {
* @return Schema as JSON
*/
@Override
@GraphTransaction
public String getSchema(String tableName) throws DiscoveryException {
// todo - validate if indeed this is a table type and exists
String schemaQuery = HIVE_TABLE_TYPE_NAME
......
......@@ -25,6 +25,7 @@ import com.thinkaurelius.titan.core.TitanVertex;
import com.tinkerpop.blueprints.Vertex;
import com.tinkerpop.gremlin.groovy.Gremlin;
import com.tinkerpop.gremlin.java.GremlinPipeline;
import org.apache.hadoop.metadata.GraphTransaction;
import org.apache.hadoop.metadata.MetadataServiceClient;
import org.apache.hadoop.metadata.discovery.DiscoveryException;
import org.apache.hadoop.metadata.discovery.DiscoveryService;
......@@ -82,6 +83,7 @@ public class GraphBackedDiscoveryService implements DiscoveryService {
//http://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-query-string-query
// .html#query-string-syntax for query syntax
@Override
// @GraphTransaction
public String searchByFullText(String query) throws DiscoveryException {
String graphQuery = String.format("v.%s:(%s)", Constants.ENTITY_TEXT_PROPERTY_KEY, query);
LOG.debug("Full text query: {}", graphQuery);
......@@ -118,6 +120,7 @@ public class GraphBackedDiscoveryService implements DiscoveryService {
* @return JSON representing the type and results.
*/
@Override
@GraphTransaction
public String searchByDSL(String dslQuery) throws DiscoveryException {
LOG.info("Executing dsl query={}", dslQuery);
try {
......@@ -155,6 +158,7 @@ public class GraphBackedDiscoveryService implements DiscoveryService {
* @throws org.apache.hadoop.metadata.discovery.DiscoveryException
*/
@Override
@GraphTransaction
public List<Map<String, String>> searchByGremlin(String gremlinQuery)
throws DiscoveryException {
LOG.info("Executing gremlin query={}", gremlinQuery);
......
......@@ -27,6 +27,7 @@ import com.tinkerpop.blueprints.Edge;
import com.tinkerpop.blueprints.GraphQuery;
import com.tinkerpop.blueprints.Vertex;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.metadata.GraphTransaction;
import org.apache.hadoop.metadata.MetadataException;
import org.apache.hadoop.metadata.repository.Constants;
import org.apache.hadoop.metadata.repository.MetadataRepository;
......@@ -137,27 +138,23 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
}
@Override
@GraphTransaction
public String createEntity(IReferenceableInstance typedInstance) throws RepositoryException {
LOG.info("adding entity={}", typedInstance);
try {
titanGraph.rollback();
final String guid = instanceToGraphMapper.mapTypedInstanceToGraph(typedInstance);
titanGraph.commit(); // commit if there are no errors
return guid;
} catch (MetadataException e) {
titanGraph.rollback();
throw new RepositoryException(e);
}
}
@Override
@GraphTransaction
public ITypedReferenceableInstance getEntityDefinition(String guid) throws RepositoryException {
LOG.info("Retrieving entity with guid={}", guid);
try {
titanGraph.rollback(); // clean up before starting a query
Vertex instanceVertex = getVertexForGUID(guid);
LOG.debug("Found a vertex {} for guid {}", instanceVertex, guid);
......@@ -206,9 +203,9 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
* @throws RepositoryException
*/
@Override
@GraphTransaction
public List<String> getTraitNames(String guid) throws RepositoryException {
LOG.info("Retrieving trait names for entity={}", guid);
titanGraph.rollback(); // clean up before starting a query
Vertex instanceVertex = getVertexForGUID(guid);
return getTraitNames(instanceVertex);
}
......@@ -231,6 +228,7 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
* @throws RepositoryException
*/
@Override
@GraphTransaction
public void addTrait(String guid,
ITypedStruct traitInstance) throws RepositoryException {
Preconditions.checkNotNull(traitInstance, "Trait instance cannot be null");
......@@ -238,22 +236,18 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
LOG.info("Adding a new trait={} for entity={}", traitName, guid);
try {
titanGraph.rollback(); // clean up before starting a query
Vertex instanceVertex = getVertexForGUID(guid);
// add the trait instance as a new vertex
final String typeName = getTypeName(instanceVertex);
instanceToGraphMapper.mapTraitInstanceToVertex(traitInstance,
getIdFromVertex(typeName, instanceVertex),
instanceToGraphMapper.mapTraitInstanceToVertex(traitInstance, getIdFromVertex(typeName, instanceVertex),
typeName, instanceVertex, Collections.<Id, Vertex>emptyMap());
// update the traits in entity once adding trait instance is successful
((TitanVertex) instanceVertex)
.addProperty(Constants.TRAIT_NAMES_PROPERTY_KEY, traitName);
titanGraph.commit(); // commit if there are no errors
} catch (MetadataException e) {
titanGraph.rollback();
throw new RepositoryException(e);
}
}
......@@ -266,11 +260,11 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
* @throws RepositoryException
*/
@Override
@GraphTransaction
public void deleteTrait(String guid, String traitNameToBeDeleted)
throws RepositoryException {
LOG.info("Deleting trait={} from entity={}", traitNameToBeDeleted, guid);
try {
titanGraph.rollback(); // clean up before starting a query
Vertex instanceVertex = getVertexForGUID(guid);
List<String> traitNames = getTraitNames(instanceVertex);
......@@ -297,11 +291,8 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
traitNames.remove(traitNameToBeDeleted);
updateTraits(instanceVertex, traitNames);
}
titanGraph.commit(); // commit if there are no errors
}
} catch (Exception e) {
titanGraph.rollback();
throw new RepositoryException(e);
}
}
......@@ -318,11 +309,11 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
}
@Override
@GraphTransaction
public void updateEntity(String guid, String property, String value) throws RepositoryException {
LOG.info("Adding property {} for entity guid {}", property, guid);
try {
titanGraph.rollback(); // clean up before starting a query
Vertex instanceVertex = GraphHelper.findVertexByGUID(titanGraph, guid);
if (instanceVertex == null) {
throw new RepositoryException("Could not find a vertex for guid " + guid);
......@@ -349,11 +340,8 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
instanceToGraphMapper.mapAttributesToVertex(getIdFromVertex(typeName, instanceVertex), instance,
instanceVertex, new HashMap<Id, Vertex>(), attributeInfo, attributeInfo.dataType());
titanGraph.commit();
} catch (Exception e) {
throw new RepositoryException(e);
} finally {
titanGraph.rollback();
}
}
......
......@@ -18,6 +18,7 @@
package org.apache.hadoop.metadata.repository.graph;
import com.google.inject.Provides;
import com.thinkaurelius.titan.core.TitanFactory;
import com.thinkaurelius.titan.core.TitanGraph;
import org.apache.commons.configuration.Configuration;
......@@ -64,6 +65,7 @@ public class TitanGraphProvider implements GraphProvider<TitanGraph> {
@Override
@Singleton
@Provides
public TitanGraph get() {
Configuration config;
try {
......
......@@ -25,6 +25,7 @@ import com.tinkerpop.blueprints.Direction;
import com.tinkerpop.blueprints.Edge;
import com.tinkerpop.blueprints.Vertex;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.metadata.GraphTransaction;
import org.apache.hadoop.metadata.MetadataException;
import org.apache.hadoop.metadata.repository.Constants;
import org.apache.hadoop.metadata.repository.graph.GraphProvider;
......@@ -76,10 +77,9 @@ public class GraphBackedTypeStore implements ITypeStore {
}
@Override
@GraphTransaction
public void store(TypeSystem typeSystem, ImmutableList<String> typeNames) throws MetadataException {
try {
ImmutableList<String> coreTypes = typeSystem.getCoreTypes();
titanGraph.rollback(); //Cleanup previous state
for (String typeName : typeNames) {
if (!coreTypes.contains(typeName)) {
IDataType dataType = typeSystem.getDataType(IDataType.class, typeName);
......@@ -107,10 +107,6 @@ public class GraphBackedTypeStore implements ITypeStore {
}
}
}
titanGraph.commit();
} finally {
titanGraph.rollback();
}
}
private void storeInGraph(EnumType dataType) {
......@@ -206,9 +202,8 @@ public class GraphBackedTypeStore implements ITypeStore {
}
@Override
@GraphTransaction
public TypesDef restore() throws MetadataException {
try {
titanGraph.rollback(); //Cleanup previous state
//Get all vertices for type system
Iterator vertices =
titanGraph.query().has(Constants.VERTEX_TYPE_PROPERTY_KEY, VERTEX_TYPE).vertices().iterator();
......@@ -249,11 +244,7 @@ public class GraphBackedTypeStore implements ITypeStore {
throw new IllegalArgumentException("Unhandled type category " + typeCategory);
}
}
titanGraph.commit();
return TypeUtils.getTypesDef(enums.build(), structs.build(), traits.build(), classTypes.build());
} finally {
titanGraph.rollback();
}
}
private EnumTypeDefinition getEnumType(Vertex vertex) {
......
......@@ -27,7 +27,6 @@ metadata.graph.index.search.backend=elasticsearch
metadata.graph.index.search.directory=./target/data/es
metadata.graph.index.search.elasticsearch.client-only=false
metadata.graph.index.search.elasticsearch.local-mode=true
metadata.graph.index.search.elasticsearch.create.sleep=1000
######### Hive Lineage Configs #########
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment