Commit f7fe3750 by Suma Shivaprasad

Fixed indexing failures for specific types and due to missing transaction…

Fixed indexing failures for specific types and due to missing transaction closures. Todo - Nested transactions with write lock on graph and nested read transaction in DMS.createType are not working with bdb
parent ed6b9bf4
...@@ -44,7 +44,7 @@ public class GraphTransactionInterceptor implements MethodInterceptor { ...@@ -44,7 +44,7 @@ public class GraphTransactionInterceptor implements MethodInterceptor {
return response; return response;
} catch (Throwable t){ } catch (Throwable t){
titanGraph.rollback(); titanGraph.rollback();
LOG.debug("graph rollback"); LOG.error("graph rollback due to exception ", t);
throw t; throw t;
} }
} }
......
...@@ -42,6 +42,7 @@ import org.apache.hadoop.metadata.services.MetadataService; ...@@ -42,6 +42,7 @@ import org.apache.hadoop.metadata.services.MetadataService;
* Guice module for Repository module. * Guice module for Repository module.
*/ */
public class RepositoryMetadataModule extends com.google.inject.AbstractModule { public class RepositoryMetadataModule extends com.google.inject.AbstractModule {
@Override @Override
protected void configure() { protected void configure() {
// special wiring for Titan Graph // special wiring for Titan Graph
...@@ -67,7 +68,7 @@ public class RepositoryMetadataModule extends com.google.inject.AbstractModule { ...@@ -67,7 +68,7 @@ public class RepositoryMetadataModule extends com.google.inject.AbstractModule {
// bind the DiscoveryService interface to an implementation // bind the DiscoveryService interface to an implementation
bind(DiscoveryService.class).to(GraphBackedDiscoveryService.class).asEagerSingleton(); bind(DiscoveryService.class).to(GraphBackedDiscoveryService.class).asEagerSingleton();
bind(SearchIndexer.class).to(GraphBackedSearchIndexer.class).asEagerSingleton(); bind(SearchIndexer.class).to(GraphBackedSearchIndexer.class);
bind(LineageService.class).to(HiveLineageService.class).asEagerSingleton(); bind(LineageService.class).to(HiveLineageService.class).asEagerSingleton();
...@@ -75,4 +76,5 @@ public class RepositoryMetadataModule extends com.google.inject.AbstractModule { ...@@ -75,4 +76,5 @@ public class RepositoryMetadataModule extends com.google.inject.AbstractModule {
requestInjection(interceptor); requestInjection(interceptor);
bindInterceptor(Matchers.any(), Matchers.annotatedWith(GraphTransaction.class), interceptor); bindInterceptor(Matchers.any(), Matchers.annotatedWith(GraphTransaction.class), interceptor);
} }
} }
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
package org.apache.hadoop.metadata.discovery; package org.apache.hadoop.metadata.discovery;
import org.apache.hadoop.metadata.listener.TypesChangeListener; import org.apache.hadoop.metadata.listener.TypesChangeListener;
import org.apache.hadoop.metadata.repository.IndexException;
/** /**
...@@ -26,4 +27,11 @@ import org.apache.hadoop.metadata.listener.TypesChangeListener; ...@@ -26,4 +27,11 @@ import org.apache.hadoop.metadata.listener.TypesChangeListener;
*/ */
public interface SearchIndexer extends TypesChangeListener { public interface SearchIndexer extends TypesChangeListener {
/* Commit the indexes */
void commit() throws IndexException;
/* RollBack the index */
void rollback() throws IndexException;
} }
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
package org.apache.hadoop.metadata.discovery.graph; package org.apache.hadoop.metadata.discovery.graph;
import com.thinkaurelius.titan.core.TitanVertex; import com.thinkaurelius.titan.core.TitanVertex;
import org.apache.hadoop.metadata.GraphTransaction;
import org.apache.hadoop.metadata.MetadataException; import org.apache.hadoop.metadata.MetadataException;
import org.apache.hadoop.metadata.query.Expressions; import org.apache.hadoop.metadata.query.Expressions;
import org.apache.hadoop.metadata.query.GraphPersistenceStrategies; import org.apache.hadoop.metadata.query.GraphPersistenceStrategies;
...@@ -90,6 +91,7 @@ public class DefaultGraphPersistenceStrategy implements GraphPersistenceStrategi ...@@ -90,6 +91,7 @@ public class DefaultGraphPersistenceStrategy implements GraphPersistenceStrategi
} }
@Override @Override
@GraphTransaction
public Id getIdFromVertex(String dataTypeName, TitanVertex vertex) { public Id getIdFromVertex(String dataTypeName, TitanVertex vertex) {
return metadataRepository.getIdFromVertex(dataTypeName, vertex); return metadataRepository.getIdFromVertex(dataTypeName, vertex);
} }
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.metadata.repository;
public class IndexCreationException extends IndexException {
public IndexCreationException() {
}
public IndexCreationException(String message) {
super(message);
}
public IndexCreationException(String message, Throwable cause) {
super(message, cause);
}
public IndexCreationException(Throwable cause) {
super(cause);
}
public IndexCreationException(String message, Throwable cause, boolean enableSuppression,
boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.metadata.repository;
import org.apache.hadoop.metadata.MetadataException;
public class IndexException extends MetadataException {
public IndexException() {
}
public IndexException(String message) {
super(message);
}
public IndexException(String message, Throwable cause) {
super(message, cause);
}
public IndexException(Throwable cause) {
super(cause);
}
public IndexException(String message, Throwable cause, boolean enableSuppression,
boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}
...@@ -181,6 +181,7 @@ public class GraphBackedMetadataRepository implements MetadataRepository { ...@@ -181,6 +181,7 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
} }
@Override @Override
@GraphTransaction
public List<String> getEntityList(String entityType) throws RepositoryException { public List<String> getEntityList(String entityType) throws RepositoryException {
LOG.info("Retrieving entity list for type={}", entityType); LOG.info("Retrieving entity list for type={}", entityType);
GraphQuery query = titanGraph.query() GraphQuery query = titanGraph.query()
...@@ -421,7 +422,7 @@ public class GraphBackedMetadataRepository implements MetadataRepository { ...@@ -421,7 +422,7 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
} }
} }
public void createVerticesForClassTypes( private void createVerticesForClassTypes(
List<ITypedReferenceableInstance> newInstances) throws MetadataException { List<ITypedReferenceableInstance> newInstances) throws MetadataException {
for (ITypedReferenceableInstance typedInstance : newInstances) { for (ITypedReferenceableInstance typedInstance : newInstances) {
final Id id = typedInstance.getId(); final Id id = typedInstance.getId();
...@@ -927,6 +928,7 @@ public class GraphBackedMetadataRepository implements MetadataRepository { ...@@ -927,6 +928,7 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
public final class GraphToTypedInstanceMapper { public final class GraphToTypedInstanceMapper {
@GraphTransaction
public ITypedReferenceableInstance mapGraphToTypedInstance(String guid, public ITypedReferenceableInstance mapGraphToTypedInstance(String guid,
Vertex instanceVertex) Vertex instanceVertex)
throws MetadataException { throws MetadataException {
...@@ -950,7 +952,7 @@ public class GraphBackedMetadataRepository implements MetadataRepository { ...@@ -950,7 +952,7 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
return typedInstance; return typedInstance;
} }
public void mapVertexToInstanceTraits(Vertex instanceVertex, private void mapVertexToInstanceTraits(Vertex instanceVertex,
ITypedReferenceableInstance typedInstance, ITypedReferenceableInstance typedInstance,
List<String> traits) throws MetadataException { List<String> traits) throws MetadataException {
for (String traitName : traits) { for (String traitName : traits) {
...@@ -961,6 +963,7 @@ public class GraphBackedMetadataRepository implements MetadataRepository { ...@@ -961,6 +963,7 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
} }
} }
@GraphTransaction
public void mapVertexToInstance(Vertex instanceVertex, ITypedInstance typedInstance, public void mapVertexToInstance(Vertex instanceVertex, ITypedInstance typedInstance,
Map<String, AttributeInfo> fields) Map<String, AttributeInfo> fields)
throws MetadataException { throws MetadataException {
...@@ -972,7 +975,8 @@ public class GraphBackedMetadataRepository implements MetadataRepository { ...@@ -972,7 +975,8 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
} }
} }
public void mapVertexToAttribute(Vertex instanceVertex, ITypedInstance typedInstance,
private void mapVertexToAttribute(Vertex instanceVertex, ITypedInstance typedInstance,
AttributeInfo attributeInfo) throws MetadataException { AttributeInfo attributeInfo) throws MetadataException {
LOG.debug("Mapping attributeInfo {}", attributeInfo.name); LOG.debug("Mapping attributeInfo {}", attributeInfo.name);
final IDataType dataType = attributeInfo.dataType(); final IDataType dataType = attributeInfo.dataType();
...@@ -1022,7 +1026,7 @@ public class GraphBackedMetadataRepository implements MetadataRepository { ...@@ -1022,7 +1026,7 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
} }
} }
public Object mapClassReferenceToVertex(Vertex instanceVertex, private Object mapClassReferenceToVertex(Vertex instanceVertex,
AttributeInfo attributeInfo, AttributeInfo attributeInfo,
String relationshipLabel, String relationshipLabel,
IDataType dataType) throws MetadataException { IDataType dataType) throws MetadataException {
...@@ -1052,7 +1056,7 @@ public class GraphBackedMetadataRepository implements MetadataRepository { ...@@ -1052,7 +1056,7 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public void mapVertexToArrayInstance(Vertex instanceVertex, ITypedInstance typedInstance, private void mapVertexToArrayInstance(Vertex instanceVertex, ITypedInstance typedInstance,
AttributeInfo attributeInfo, AttributeInfo attributeInfo,
String propertyName) throws MetadataException { String propertyName) throws MetadataException {
LOG.debug("mapping vertex {} to array {}", instanceVertex, attributeInfo.name); LOG.debug("mapping vertex {} to array {}", instanceVertex, attributeInfo.name);
...@@ -1072,7 +1076,7 @@ public class GraphBackedMetadataRepository implements MetadataRepository { ...@@ -1072,7 +1076,7 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
typedInstance.set(attributeInfo.name, values); typedInstance.set(attributeInfo.name, values);
} }
public Object mapVertexToCollectionEntry(Vertex instanceVertex, private Object mapVertexToCollectionEntry(Vertex instanceVertex,
AttributeInfo attributeInfo, AttributeInfo attributeInfo,
IDataType elementType, Object value, String propertyName) IDataType elementType, Object value, String propertyName)
throws MetadataException { throws MetadataException {
...@@ -1156,7 +1160,7 @@ public class GraphBackedMetadataRepository implements MetadataRepository { ...@@ -1156,7 +1160,7 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
return null; return null;
} }
public Object mapClassReferenceToVertex(Vertex instanceVertex, private Object mapClassReferenceToVertex(Vertex instanceVertex,
AttributeInfo attributeInfo, AttributeInfo attributeInfo,
String relationshipLabel, String relationshipLabel,
IDataType dataType, IDataType dataType,
...@@ -1223,7 +1227,7 @@ public class GraphBackedMetadataRepository implements MetadataRepository { ...@@ -1223,7 +1227,7 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
traitName, traitType, traitInstance); traitName, traitType, traitInstance);
} }
public void mapVertexToTraitInstance(Vertex instanceVertex, String typedInstanceTypeName, private void mapVertexToTraitInstance(Vertex instanceVertex, String typedInstanceTypeName,
String traitName, TraitType traitType, String traitName, TraitType traitType,
ITypedStruct traitInstance) throws MetadataException { ITypedStruct traitInstance) throws MetadataException {
String relationshipLabel = getEdgeLabel(typedInstanceTypeName, traitName); String relationshipLabel = getEdgeLabel(typedInstanceTypeName, traitName);
......
...@@ -18,21 +18,18 @@ ...@@ -18,21 +18,18 @@
package org.apache.hadoop.metadata.repository.graph; package org.apache.hadoop.metadata.repository.graph;
import com.google.inject.Singleton;
import com.thinkaurelius.titan.core.Cardinality; import com.thinkaurelius.titan.core.Cardinality;
import com.thinkaurelius.titan.core.EdgeLabel;
import com.thinkaurelius.titan.core.Order;
import com.thinkaurelius.titan.core.PropertyKey; import com.thinkaurelius.titan.core.PropertyKey;
import com.thinkaurelius.titan.core.TitanGraph; import com.thinkaurelius.titan.core.TitanGraph;
import com.thinkaurelius.titan.core.schema.Mapping; import com.thinkaurelius.titan.core.schema.Mapping;
import com.thinkaurelius.titan.core.schema.TitanGraphIndex; import com.thinkaurelius.titan.core.schema.TitanGraphIndex;
import com.thinkaurelius.titan.core.schema.TitanManagement; import com.thinkaurelius.titan.core.schema.TitanManagement;
import com.tinkerpop.blueprints.Direction;
import com.tinkerpop.blueprints.Edge; import com.tinkerpop.blueprints.Edge;
import com.tinkerpop.blueprints.Vertex; import com.tinkerpop.blueprints.Vertex;
import org.apache.hadoop.metadata.MetadataException; import org.apache.hadoop.metadata.MetadataException;
import org.apache.hadoop.metadata.discovery.SearchIndexer; import org.apache.hadoop.metadata.discovery.SearchIndexer;
import org.apache.hadoop.metadata.repository.Constants; import org.apache.hadoop.metadata.repository.Constants;
import org.apache.hadoop.metadata.repository.IndexException;
import org.apache.hadoop.metadata.repository.RepositoryException; import org.apache.hadoop.metadata.repository.RepositoryException;
import org.apache.hadoop.metadata.typesystem.types.AttributeInfo; import org.apache.hadoop.metadata.typesystem.types.AttributeInfo;
import org.apache.hadoop.metadata.typesystem.types.ClassType; import org.apache.hadoop.metadata.typesystem.types.ClassType;
...@@ -52,19 +49,24 @@ import java.util.Map; ...@@ -52,19 +49,24 @@ import java.util.Map;
/** /**
* Adds index for properties of a given type when its added before any instances are added. * Adds index for properties of a given type when its added before any instances are added.
*/ */
@Singleton
public class GraphBackedSearchIndexer implements SearchIndexer { public class GraphBackedSearchIndexer implements SearchIndexer {
private static final Logger LOG = LoggerFactory.getLogger(GraphBackedSearchIndexer.class); private static final Logger LOG = LoggerFactory.getLogger(GraphBackedSearchIndexer.class);
private final TitanGraph titanGraph; private final TitanGraph titanGraph;
private TitanManagement management;
@Inject @Inject
public GraphBackedSearchIndexer(GraphProvider<TitanGraph> graphProvider) public GraphBackedSearchIndexer(GraphProvider<TitanGraph> graphProvider)
throws RepositoryException { throws RepositoryException {
this.titanGraph = graphProvider.get(); this.titanGraph = graphProvider.get();
/* Create the transaction for indexing.
* Commit/rollback is expected to be called from the caller.
*/
management = titanGraph.getManagementSystem();
initialize(); initialize();
} }
...@@ -72,18 +74,17 @@ public class GraphBackedSearchIndexer implements SearchIndexer { ...@@ -72,18 +74,17 @@ public class GraphBackedSearchIndexer implements SearchIndexer {
* Initializes the indices for the graph - create indices for Global Vertex Keys * Initializes the indices for the graph - create indices for Global Vertex Keys
*/ */
private void initialize() { private void initialize() {
TitanManagement management = titanGraph.getManagementSystem();
if (management.containsPropertyKey(Constants.GUID_PROPERTY_KEY)) { if (management.containsPropertyKey(Constants.GUID_PROPERTY_KEY)) {
LOG.info("Global indexes already exist for graph"); LOG.info("Global indexes already exist for graph");
return; return;
} }
/* This is called only once, which is the first time Atlas types are made indexable .*/
LOG.info("Indexes do not exist, Creating indexes for titanGraph."); LOG.info("Indexes do not exist, Creating indexes for titanGraph.");
management.buildIndex(Constants.VERTEX_INDEX, Vertex.class) management.buildIndex(Constants.VERTEX_INDEX, Vertex.class)
.buildMixedIndex(Constants.BACKING_INDEX); .buildMixedIndex(Constants.BACKING_INDEX);
management.buildIndex(Constants.EDGE_INDEX, Edge.class) management.buildIndex(Constants.EDGE_INDEX, Edge.class)
.buildMixedIndex(Constants.BACKING_INDEX); .buildMixedIndex(Constants.BACKING_INDEX);
management.commit();
// create a composite index for guid as its unique // create a composite index for guid as its unique
createCompositeIndex(Constants.GUID_INDEX, createCompositeIndex(Constants.GUID_INDEX,
...@@ -108,18 +109,20 @@ public class GraphBackedSearchIndexer implements SearchIndexer { ...@@ -108,18 +109,20 @@ public class GraphBackedSearchIndexer implements SearchIndexer {
//Indexes for graph backed type system store //Indexes for graph backed type system store
createTypeStoreIndexes(); createTypeStoreIndexes();
management.commit();
//Make sure we acquire another transaction after commit for subsequent indexing
management = titanGraph.getManagementSystem();
LOG.info("Index creation for global keys complete."); LOG.info("Index creation for global keys complete.");
} }
private void createFullTextIndex() { private void createFullTextIndex() {
TitanManagement management = titanGraph.getManagementSystem();
PropertyKey fullText = PropertyKey fullText =
management.makePropertyKey(Constants.ENTITY_TEXT_PROPERTY_KEY).dataType(String.class).make(); management.makePropertyKey(Constants.ENTITY_TEXT_PROPERTY_KEY).dataType(String.class).make();
management.buildIndex(Constants.FULLTEXT_INDEX, Vertex.class) management.buildIndex(Constants.FULLTEXT_INDEX, Vertex.class)
.addKey(fullText, com.thinkaurelius.titan.core.schema.Parameter.of("mapping", Mapping.TEXT)) .addKey(fullText, com.thinkaurelius.titan.core.schema.Parameter.of("mapping", Mapping.TEXT))
.buildMixedIndex(Constants.BACKING_INDEX); .buildMixedIndex(Constants.BACKING_INDEX);
management.commit();
LOG.info("Created mixed index for {}", Constants.ENTITY_TEXT_PROPERTY_KEY); LOG.info("Created mixed index for {}", Constants.ENTITY_TEXT_PROPERTY_KEY);
} }
...@@ -151,8 +154,8 @@ public class GraphBackedSearchIndexer implements SearchIndexer { ...@@ -151,8 +154,8 @@ public class GraphBackedSearchIndexer implements SearchIndexer {
} catch (Throwable throwable) { } catch (Throwable throwable) {
// gets handle to currently open transaction // gets handle to currently open transaction
titanGraph.getManagementSystem().rollback();
LOG.error("Error creating index for type {}", dataType, throwable); LOG.error("Error creating index for type {}", dataType, throwable);
throw new IndexException("Error while creating index for type " + dataType, throwable);
} }
} }
...@@ -285,7 +288,6 @@ public class GraphBackedSearchIndexer implements SearchIndexer { ...@@ -285,7 +288,6 @@ public class GraphBackedSearchIndexer implements SearchIndexer {
private PropertyKey createCompositeIndex(String indexName, private PropertyKey createCompositeIndex(String indexName,
String propertyName, Class propertyClass, String propertyName, Class propertyClass,
boolean isUnique, Cardinality cardinality) { boolean isUnique, Cardinality cardinality) {
TitanManagement management = titanGraph.getManagementSystem();
PropertyKey propertyKey = management.getPropertyKey(propertyName); PropertyKey propertyKey = management.getPropertyKey(propertyName);
if (propertyKey == null) { if (propertyKey == null) {
propertyKey = management propertyKey = management
...@@ -303,7 +305,6 @@ public class GraphBackedSearchIndexer implements SearchIndexer { ...@@ -303,7 +305,6 @@ public class GraphBackedSearchIndexer implements SearchIndexer {
} }
indexBuilder.buildCompositeIndex(); indexBuilder.buildCompositeIndex();
management.commit();
LOG.info("Created index for property {} in composite index {}", propertyName, indexName); LOG.info("Created index for property {} in composite index {}", propertyName, indexName);
} }
...@@ -312,7 +313,7 @@ public class GraphBackedSearchIndexer implements SearchIndexer { ...@@ -312,7 +313,7 @@ public class GraphBackedSearchIndexer implements SearchIndexer {
} }
private PropertyKey createVertexMixedIndex(String propertyName, Class propertyClass) { private PropertyKey createVertexMixedIndex(String propertyName, Class propertyClass) {
TitanManagement management = titanGraph.getManagementSystem();
PropertyKey propertyKey = management.getPropertyKey(propertyName); PropertyKey propertyKey = management.getPropertyKey(propertyName);
if (propertyKey == null) { if (propertyKey == null) {
// ignored cardinality as Can only index single-valued property keys on vertices // ignored cardinality as Can only index single-valued property keys on vertices
...@@ -321,15 +322,18 @@ public class GraphBackedSearchIndexer implements SearchIndexer { ...@@ -321,15 +322,18 @@ public class GraphBackedSearchIndexer implements SearchIndexer {
.dataType(propertyClass) .dataType(propertyClass)
.make(); .make();
if (propertyClass == Boolean.class) { if (!checkIfMixedIndexApplicable(propertyClass)) {
LOG.debug("Creating composite index for property {} of type {} ", propertyName, propertyClass.getName());
//Use standard index as backing index only supports string, int and geo types //Use standard index as backing index only supports string, int and geo types
management.buildIndex(propertyName, Vertex.class).addKey(propertyKey).buildCompositeIndex(); management.buildIndex(propertyName, Vertex.class).addKey(propertyKey).buildCompositeIndex();
management.commit(); LOG.debug("Created composite index for property {} of type {} ", propertyName, propertyClass.getName());
} else { } else {
//Use backing index //Use backing index
LOG.debug("Creating backing index for property {} of type {} ", propertyName, propertyClass.getName());
TitanGraphIndex vertexIndex = management.getGraphIndex(Constants.VERTEX_INDEX); TitanGraphIndex vertexIndex = management.getGraphIndex(Constants.VERTEX_INDEX);
management.addIndexKey(vertexIndex, propertyKey); management.addIndexKey(vertexIndex, propertyKey);
management.commit(); LOG.debug("Created backing index for property {} of type {} ", propertyName, propertyClass.getName());
} }
LOG.info("Created mixed vertex index for property {}", propertyName); LOG.info("Created mixed vertex index for property {}", propertyName);
} }
...@@ -337,15 +341,41 @@ public class GraphBackedSearchIndexer implements SearchIndexer { ...@@ -337,15 +341,41 @@ public class GraphBackedSearchIndexer implements SearchIndexer {
return propertyKey; return propertyKey;
} }
private boolean checkIfMixedIndexApplicable(Class propertyClass) {
//TODO - Check why date types are failing in ES/Solr
if (propertyClass == Boolean.class || propertyClass == BigDecimal.class || propertyClass == BigInteger.class || propertyClass == Date.class) {
return false;
}
return true;
}
@Override
public void commit() throws IndexException {
try {
management.commit();
} catch(Exception e) {
LOG.error("Index commit failed" , e);
throw new IndexException("Index commit failed " , e);
}
}
@Override
public void rollback() throws IndexException {
try {
management.rollback();
} catch(Exception e) {
LOG.error("Index rollback failed " , e);
throw new IndexException("Index rollback failed " , e);
}
}
/* Commenting this out since we do not need an index for edge label here /* Commenting this out since we do not need an index for edge label here
private void createEdgeMixedIndex(String propertyName) { private void createEdgeMixedIndex(String propertyName) {
TitanManagement management = titanGraph.getManagementSystem();
EdgeLabel edgeLabel = management.getEdgeLabel(propertyName); EdgeLabel edgeLabel = management.getEdgeLabel(propertyName);
if (edgeLabel == null) { if (edgeLabel == null) {
edgeLabel = management.makeEdgeLabel(propertyName).make(); edgeLabel = management.makeEdgeLabel(propertyName).make();
management.buildEdgeIndex(edgeLabel, propertyName, Direction.BOTH, Order.DEFAULT); management.buildEdgeIndex(edgeLabel, propertyName, Direction.BOTH, Order.DEFAULT);
management.commit();
LOG.info("Created index for edge label {}", propertyName); LOG.info("Created index for edge label {}", propertyName);
} }
} */ }*/
} }
...@@ -75,6 +75,7 @@ public class GraphBackedTypeStore implements ITypeStore { ...@@ -75,6 +75,7 @@ public class GraphBackedTypeStore implements ITypeStore {
} }
@Override @Override
@GraphTransaction
public void store(TypeSystem typeSystem, ImmutableList<String> typeNames) throws MetadataException { public void store(TypeSystem typeSystem, ImmutableList<String> typeNames) throws MetadataException {
ImmutableList<String> coreTypes = typeSystem.getCoreTypes(); ImmutableList<String> coreTypes = typeSystem.getCoreTypes();
for (String typeName : typeNames) { for (String typeName : typeNames) {
......
...@@ -20,14 +20,19 @@ package org.apache.hadoop.metadata.services; ...@@ -20,14 +20,19 @@ package org.apache.hadoop.metadata.services;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Provider;
import org.apache.hadoop.metadata.GraphTransaction; import org.apache.hadoop.metadata.GraphTransaction;
import org.apache.hadoop.metadata.MetadataException; import org.apache.hadoop.metadata.MetadataException;
import org.apache.hadoop.metadata.MetadataServiceClient; import org.apache.hadoop.metadata.MetadataServiceClient;
import org.apache.hadoop.metadata.ParamChecker; import org.apache.hadoop.metadata.ParamChecker;
import org.apache.hadoop.metadata.RepositoryMetadataModule;
import org.apache.hadoop.metadata.classification.InterfaceAudience; import org.apache.hadoop.metadata.classification.InterfaceAudience;
import org.apache.hadoop.metadata.discovery.SearchIndexer; import org.apache.hadoop.metadata.discovery.SearchIndexer;
import org.apache.hadoop.metadata.listener.EntityChangeListener; import org.apache.hadoop.metadata.listener.EntityChangeListener;
import org.apache.hadoop.metadata.listener.TypesChangeListener; import org.apache.hadoop.metadata.repository.IndexCreationException;
import org.apache.hadoop.metadata.repository.IndexException;
import org.apache.hadoop.metadata.repository.MetadataRepository; import org.apache.hadoop.metadata.repository.MetadataRepository;
import org.apache.hadoop.metadata.repository.typestore.ITypeStore; import org.apache.hadoop.metadata.repository.typestore.ITypeStore;
import org.apache.hadoop.metadata.typesystem.ITypedReferenceableInstance; import org.apache.hadoop.metadata.typesystem.ITypedReferenceableInstance;
...@@ -53,6 +58,8 @@ import org.slf4j.LoggerFactory; ...@@ -53,6 +58,8 @@ import org.slf4j.LoggerFactory;
import javax.inject.Inject; import javax.inject.Inject;
import javax.inject.Singleton; import javax.inject.Singleton;
import java.io.IOException;
import java.text.ParseException;
import java.util.LinkedHashSet; import java.util.LinkedHashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
...@@ -68,23 +75,23 @@ public class DefaultMetadataService implements MetadataService { ...@@ -68,23 +75,23 @@ public class DefaultMetadataService implements MetadataService {
private static final Logger LOG = private static final Logger LOG =
LoggerFactory.getLogger(DefaultMetadataService.class); LoggerFactory.getLogger(DefaultMetadataService.class);
private final Set<TypesChangeListener> typesChangeListeners = new LinkedHashSet<>();
private final Set<EntityChangeListener> entityChangeListeners private final Set<EntityChangeListener> entityChangeListeners
= new LinkedHashSet<>(); = new LinkedHashSet<>();
private final TypeSystem typeSystem; private final TypeSystem typeSystem;
private final MetadataRepository repository; private final MetadataRepository repository;
private final ITypeStore typeStore; private final ITypeStore typeStore;
private final Provider<SearchIndexer> searchIndexProvider;
@Inject @Inject
DefaultMetadataService(MetadataRepository repository, DefaultMetadataService(MetadataRepository repository,
SearchIndexer searchIndexer, ITypeStore typeStore) throws MetadataException { Provider<SearchIndexer> searchIndexProvider, ITypeStore typeStore) throws MetadataException {
this.typeStore = typeStore; this.typeStore = typeStore;
this.typeSystem = TypeSystem.getInstance(); this.typeSystem = TypeSystem.getInstance();
this.repository = repository; this.repository = repository;
this.searchIndexProvider = searchIndexProvider;
restoreTypeSystem(); restoreTypeSystem();
registerListener(searchIndexer);
} }
private void restoreTypeSystem() { private void restoreTypeSystem() {
...@@ -146,19 +153,18 @@ public class DefaultMetadataService implements MetadataService { ...@@ -146,19 +153,18 @@ public class DefaultMetadataService implements MetadataService {
* @return a unique id for this type * @return a unique id for this type
*/ */
@Override @Override
@GraphTransaction
public JSONObject createType(String typeDefinition) throws MetadataException { public JSONObject createType(String typeDefinition) throws MetadataException {
ParamChecker.notEmpty(typeDefinition, "type definition cannot be empty"); ParamChecker.notEmpty(typeDefinition, "type definition cannot be empty");
TypesDef typesDef; TypesDef typesDef;
try { try {
typesDef = TypesSerialization.fromJson(typeDefinition); typesDef = TypesSerialization.fromJson(typeDefinition);
if(typesDef.isEmpty()) { if (typesDef.isEmpty()) {
throw new MetadataException("Invalid type definition"); throw new MetadataException("Invalid type definition");
} }
} catch (Exception e) { } catch (Exception e) {
LOG.error("Unable to deserialize json={}", typeDefinition, e); LOG.error("Unable to deserialize json={}", typeDefinition, e);
throw new IllegalArgumentException("Unable to deserialize json"); throw new IllegalArgumentException("Unable to deserialize json ", e);
} }
try { try {
...@@ -167,9 +173,9 @@ public class DefaultMetadataService implements MetadataService { ...@@ -167,9 +173,9 @@ public class DefaultMetadataService implements MetadataService {
try { try {
typeStore.store(typeSystem, ImmutableList.copyOf(typesAdded.keySet())); typeStore.store(typeSystem, ImmutableList.copyOf(typesAdded.keySet()));
onTypesAddedToRepo(typesAdded); onTypesAddedToRepo(typesAdded);
} catch(Throwable t) { } catch (Throwable t) {
typeSystem.removeTypes(ImmutableList.copyOf(typesAdded.keySet())); typeSystem.removeTypes(typesAdded);
throw new MetadataException(t); throw new MetadataException("Unable to persist types ", t);
} }
return new JSONObject() {{ return new JSONObject() {{
...@@ -177,7 +183,7 @@ public class DefaultMetadataService implements MetadataService { ...@@ -177,7 +183,7 @@ public class DefaultMetadataService implements MetadataService {
}}; }};
} catch (JSONException e) { } catch (JSONException e) {
LOG.error("Unable to create response for types={}", typeDefinition, e); LOG.error("Unable to create response for types={}", typeDefinition, e);
throw new MetadataException("Unable to create response"); throw new MetadataException("Unable to create response ", e);
} }
} }
...@@ -374,20 +380,18 @@ public class DefaultMetadataService implements MetadataService { ...@@ -374,20 +380,18 @@ public class DefaultMetadataService implements MetadataService {
} }
private void onTypesAddedToRepo(Map<String, IDataType> typesAdded) throws MetadataException { private void onTypesAddedToRepo(Map<String, IDataType> typesAdded) throws MetadataException {
for (TypesChangeListener listener : typesChangeListeners) { final SearchIndexer indexer = searchIndexProvider.get();
try {
for (Map.Entry<String, IDataType> entry : typesAdded.entrySet()) { for (Map.Entry<String, IDataType> entry : typesAdded.entrySet()) {
listener.onAdd(entry.getKey(), entry.getValue()); indexer.onAdd(entry.getKey(), entry.getValue());
}
} }
indexer.commit();
} catch(IndexCreationException ice) {
indexer.rollback();
throw ice;
} }
public void registerListener(TypesChangeListener listener) {
typesChangeListeners.add(listener);
} }
public void unregisterListener(TypesChangeListener listener) {
typesChangeListeners.remove(listener);
}
private void onEntityAddedToRepo(ITypedReferenceableInstance typedInstance) private void onEntityAddedToRepo(ITypedReferenceableInstance typedInstance)
throws MetadataException { throws MetadataException {
...@@ -418,4 +422,5 @@ public class DefaultMetadataService implements MetadataService { ...@@ -418,4 +422,5 @@ public class DefaultMetadataService implements MetadataService {
public void unregisterListener(EntityChangeListener listener) { public void unregisterListener(EntityChangeListener listener) {
entityChangeListeners.remove(listener); entityChangeListeners.remove(listener);
} }
} }
\ No newline at end of file
...@@ -6,9 +6,9 @@ ...@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the * to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance * "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at * with the License. You may obtain a copy of the License at
* * <p/>
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* * <p/>
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
...@@ -19,11 +19,24 @@ ...@@ -19,11 +19,24 @@
package org.apache.hadoop.metadata.repository.graph; package org.apache.hadoop.metadata.repository.graph;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.thinkaurelius.titan.core.TitanFactory;
import com.thinkaurelius.titan.core.TitanGraph; import com.thinkaurelius.titan.core.TitanGraph;
import com.thinkaurelius.titan.core.TitanIndexQuery; import com.thinkaurelius.titan.core.TitanIndexQuery;
import com.thinkaurelius.titan.core.schema.TitanGraphIndex;
import com.thinkaurelius.titan.diskstorage.BackendException;
import com.thinkaurelius.titan.diskstorage.configuration.Configuration;
import com.thinkaurelius.titan.diskstorage.configuration.ModifiableConfiguration;
import com.thinkaurelius.titan.diskstorage.configuration.ReadConfiguration;
import com.thinkaurelius.titan.diskstorage.configuration.backend.CommonsConfiguration;
import com.thinkaurelius.titan.graphdb.configuration.GraphDatabaseConfiguration;
import com.tinkerpop.blueprints.Compare; import com.tinkerpop.blueprints.Compare;
import com.tinkerpop.blueprints.GraphQuery; import com.tinkerpop.blueprints.GraphQuery;
import com.tinkerpop.blueprints.Vertex; import com.tinkerpop.blueprints.Vertex;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.metadata.GraphTransaction;
import org.apache.hadoop.metadata.MetadataException;
import org.apache.hadoop.metadata.PropertiesUtil;
import org.apache.hadoop.metadata.RepositoryMetadataModule; import org.apache.hadoop.metadata.RepositoryMetadataModule;
import org.apache.hadoop.metadata.repository.Constants; import org.apache.hadoop.metadata.repository.Constants;
import org.apache.hadoop.metadata.typesystem.ITypedReferenceableInstance; import org.apache.hadoop.metadata.typesystem.ITypedReferenceableInstance;
...@@ -42,16 +55,20 @@ import org.apache.hadoop.metadata.typesystem.types.StructTypeDefinition; ...@@ -42,16 +55,20 @@ import org.apache.hadoop.metadata.typesystem.types.StructTypeDefinition;
import org.apache.hadoop.metadata.typesystem.types.TraitType; import org.apache.hadoop.metadata.typesystem.types.TraitType;
import org.apache.hadoop.metadata.typesystem.types.TypeSystem; import org.apache.hadoop.metadata.typesystem.types.TypeSystem;
import org.apache.hadoop.metadata.typesystem.types.utils.TypesUtil; import org.apache.hadoop.metadata.typesystem.types.utils.TypesUtil;
import org.junit.AfterClass;
import org.testng.annotations.BeforeClass; import org.testng.annotations.BeforeClass;
import org.testng.annotations.Guice; import org.testng.annotations.Guice;
import org.testng.annotations.Test; import org.testng.annotations.Test;
import javax.inject.Inject; import javax.inject.Inject;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map; import java.util.Map;
import java.util.Random;
@Test @Test
@Guice(modules = RepositoryMetadataModule.class)
public class GraphRepoMapperScaleTest { public class GraphRepoMapperScaleTest {
private static final String DATABASE_TYPE = "hive_database_type"; private static final String DATABASE_TYPE = "hive_database_type";
...@@ -59,9 +76,39 @@ public class GraphRepoMapperScaleTest { ...@@ -59,9 +76,39 @@ public class GraphRepoMapperScaleTest {
private static final String TABLE_TYPE = "hive_table_type"; private static final String TABLE_TYPE = "hive_table_type";
private static final String TABLE_NAME = "bar"; private static final String TABLE_NAME = "bar";
@Inject private static final String INDEX_DIR = System.getProperty("java.io.tmpdir", "/tmp") + "/titan-schema-test" + new Random().nextLong();
private GraphProvider<TitanGraph> graphProvider;
@Inject private GraphProvider<TitanGraph> graphProvider = new GraphProvider<TitanGraph>() {
private TitanGraph graph = null;
//Ensure separate directory for graph provider to avoid issues with index merging
@Override
public TitanGraph get() {
try {
if (graph == null) {
synchronized (GraphRepoMapperScaleTest.class) {
if (graph == null) {
ReadConfiguration config = new CommonsConfiguration() {{
set("storage.backend", "inmemory");
set("index.search.directory", INDEX_DIR);
set("index.search.backend", "elasticsearch");
set("index.search.elasticsearch.local-mode", "true");
set("index.search.elasticsearch.client-only", "false");
}};
GraphDatabaseConfiguration graphconfig = new GraphDatabaseConfiguration(config);
graphconfig.getBackend().clearStorage();
graph = TitanFactory.open(config);
}
}
}
} catch (BackendException e) {
e.printStackTrace();
}
return graph;
}
};
private GraphBackedMetadataRepository repositoryService; private GraphBackedMetadataRepository repositoryService;
private GraphBackedSearchIndexer searchIndexer; private GraphBackedSearchIndexer searchIndexer;
...@@ -69,7 +116,11 @@ public class GraphRepoMapperScaleTest { ...@@ -69,7 +116,11 @@ public class GraphRepoMapperScaleTest {
private String dbGUID; private String dbGUID;
@BeforeClass @BeforeClass
@GraphTransaction
public void setUp() throws Exception { public void setUp() throws Exception {
//Make sure we can cleanup the index directory
repositoryService = new GraphBackedMetadataRepository(graphProvider);
searchIndexer = new GraphBackedSearchIndexer(graphProvider); searchIndexer = new GraphBackedSearchIndexer(graphProvider);
typeSystem = TypeSystem.getInstance(); typeSystem = TypeSystem.getInstance();
...@@ -77,6 +128,16 @@ public class GraphRepoMapperScaleTest { ...@@ -77,6 +128,16 @@ public class GraphRepoMapperScaleTest {
createHiveTypes(); createHiveTypes();
} }
@AfterClass
public void tearDown() throws Exception {
graphProvider.get().shutdown();
try {
FileUtils.deleteDirectory(new File(INDEX_DIR));
} catch(IOException ioe) {
System.err.println("Failed to cleanup index directory");
}
}
@Test @Test
public void testSubmitEntity() throws Exception { public void testSubmitEntity() throws Exception {
Referenceable databaseInstance = new Referenceable(DATABASE_TYPE); Referenceable databaseInstance = new Referenceable(DATABASE_TYPE);
...@@ -220,6 +281,7 @@ public class GraphRepoMapperScaleTest { ...@@ -220,6 +281,7 @@ public class GraphRepoMapperScaleTest {
for (Map.Entry<String, IDataType> entry : types.entrySet()) { for (Map.Entry<String, IDataType> entry : types.entrySet()) {
searchIndexer.onAdd(entry.getKey(), entry.getValue()); searchIndexer.onAdd(entry.getKey(), entry.getValue());
} }
searchIndexer.commit();
} }
private ITypedReferenceableInstance createHiveTableInstance( private ITypedReferenceableInstance createHiveTableInstance(
...@@ -270,3 +332,4 @@ public class GraphRepoMapperScaleTest { ...@@ -270,3 +332,4 @@ public class GraphRepoMapperScaleTest {
return tableType.convert(tableInstance, Multiplicity.REQUIRED); return tableType.convert(tableInstance, Multiplicity.REQUIRED);
} }
} }
...@@ -81,7 +81,6 @@ public class GraphBackedTypeStoreTest { ...@@ -81,7 +81,6 @@ public class GraphBackedTypeStoreTest {
} }
@Test (dependsOnMethods = "testStore") @Test (dependsOnMethods = "testStore")
@GraphTransaction
public void testRestore() throws Exception { public void testRestore() throws Exception {
TypesDef types = typeStore.restore(); TypesDef types = typeStore.restore();
......
...@@ -307,8 +307,13 @@ public class TypeSystem { ...@@ -307,8 +307,13 @@ public class TypeSystem {
return false; return false;
} }
public void removeTypes(ImmutableList<String> typeNames) { public void removeTypes(Map<String, IDataType> typeNames) {
for(String typeName : typeNames.keySet()) {
types.remove(typeName);
IDataType dataType = typeNames.get(typeName);
final DataTypes.TypeCategory typeCategory = dataType.getTypeCategory();
typeCategoriesToTypeNamesMap.get(typeCategory).remove(typeName);
}
} }
class TransientTypeSystem extends TypeSystem { class TransientTypeSystem extends TypeSystem {
......
...@@ -217,6 +217,9 @@ public class TypesJerseyResourceIT extends BaseResourceIT { ...@@ -217,6 +217,9 @@ public class TypesJerseyResourceIT extends BaseResourceIT {
ImmutableList.<String>of(), ImmutableList.<String>of(),
TypesUtil.createUniqueRequiredAttrDef("name", DataTypes.STRING_TYPE), TypesUtil.createUniqueRequiredAttrDef("name", DataTypes.STRING_TYPE),
TypesUtil.createRequiredAttrDef("description", DataTypes.STRING_TYPE), TypesUtil.createRequiredAttrDef("description", DataTypes.STRING_TYPE),
TypesUtil.createOptionalAttrDef("columnNames", DataTypes.arrayTypeName(DataTypes.STRING_TYPE)),
TypesUtil.createOptionalAttrDef("created", DataTypes.DATE_TYPE),
TypesUtil.createOptionalAttrDef("parameters", DataTypes.mapTypeName(DataTypes.STRING_TYPE, DataTypes.STRING_TYPE)),
TypesUtil.createRequiredAttrDef("type", DataTypes.STRING_TYPE), TypesUtil.createRequiredAttrDef("type", DataTypes.STRING_TYPE),
new AttributeDefinition("database", new AttributeDefinition("database",
"database", Multiplicity.REQUIRED, false, "database")); "database", Multiplicity.REQUIRED, false, "database"));
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment