Commit f0f22765 by Venkatesh Seetharam

Add tests for search indexing. Contributed by Venkatesh Seetharam

parent 4301264e
......@@ -156,7 +156,6 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
@Override
public List<String> getEntityList(String entityType) throws RepositoryException {
LOG.info("Retrieving entity list for type={}", entityType);
// todo - replace this with index based query
GraphQuery query = graphService.getBlueprintsGraph().query()
.has(Constants.ENTITY_TYPE_PROPERTY_KEY, entityType);
Iterator<Vertex> results = query.vertices().iterator();
......@@ -171,23 +170,6 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
}
return entityList;
/*
TitanIndexQuery query = titanGraph.indexQuery(Constants.VERTEX_INDEX,
"v." + Constants.ENTITY_TYPE_PROPERTY_KEY + ":(" + entityType + ")");
Iterator<TitanIndexQuery.Result<Vertex>> results = query.vertices().iterator();
if (!results.hasNext()) {
return Collections.emptyList();
}
ArrayList<String> entityList = new ArrayList<>();
while (results.hasNext()) {
Vertex vertex = results.next().getElement();
entityList.add(vertex.<String>getProperty(Constants.GUID_PROPERTY_KEY));
}
return entityList;
*/
}
private final class EntityProcessor implements ObjectGraphWalker.NodeProcessor {
......
......@@ -70,23 +70,20 @@ public class GraphBackedSearchIndexer implements SearchIndexer {
}
LOG.info("Indexes do not exist, Creating indexes for titanGraph.");
try {
management.buildIndex(Constants.VERTEX_INDEX, Vertex.class)
.buildMixedIndex(Constants.BACKING_INDEX);
management.buildIndex(Constants.EDGE_INDEX, Edge.class)
.buildMixedIndex(Constants.BACKING_INDEX);
// create a composite index for guid as its unique
createCompositeIndex(management, Constants.GUID_INDEX,
Constants.GUID_PROPERTY_KEY, String.class, true);
// create a composite and mixed index for type since it can be combined with other keys
createCompositeIndex(management, Constants.ENTITY_TYPE_INDEX,
Constants.ENTITY_TYPE_PROPERTY_KEY, String.class, false);
createVertexMixedIndex(management, Constants.ENTITY_TYPE_PROPERTY_KEY, String.class);
} finally {
management.commit();
}
management.buildIndex(Constants.VERTEX_INDEX, Vertex.class)
.buildMixedIndex(Constants.BACKING_INDEX);
management.buildIndex(Constants.EDGE_INDEX, Edge.class)
.buildMixedIndex(Constants.BACKING_INDEX);
management.commit();
// create a composite index for guid as its unique
createCompositeIndex(Constants.GUID_INDEX,
Constants.GUID_PROPERTY_KEY, String.class, true);
// create a composite and mixed index for type since it can be combined with other keys
createCompositeIndex(Constants.ENTITY_TYPE_INDEX,
Constants.ENTITY_TYPE_PROPERTY_KEY, String.class, false);
createVertexMixedIndex(Constants.ENTITY_TYPE_PROPERTY_KEY, String.class);
LOG.info("Index creation for global keys complete.");
}
......@@ -102,19 +99,18 @@ public class GraphBackedSearchIndexer implements SearchIndexer {
public void onAdd(String typeName, IDataType dataType) throws MetadataException {
LOG.info("Creating indexes for type name={}, definition={}", typeName, dataType);
TitanManagement management = titanGraph.getManagementSystem();
try {
addIndexForType(management, dataType);
management.commit();
addIndexForType(dataType);
LOG.info("Index creation for type {} complete", typeName);
} catch (Exception e) {
LOG.error("Error creating index for type {}", dataType, e);
management.rollback();
} catch (Throwable throwable) {
// gets handle to currently open transaction
titanGraph.getManagementSystem().rollback();
LOG.error("Error creating index for type {}", dataType, throwable);
}
}
private void addIndexForType(TitanManagement management, IDataType dataType) {
private void addIndexForType(IDataType dataType) {
switch (dataType.getTypeCategory()) {
case PRIMITIVE:
case ENUM:
......@@ -125,17 +121,17 @@ public class GraphBackedSearchIndexer implements SearchIndexer {
case STRUCT:
StructType structType = (StructType) dataType;
createIndexForFields(management, structType, structType.fieldMapping().fields);
createIndexForFields(structType, structType.fieldMapping().fields);
break;
case TRAIT:
TraitType traitType = (TraitType) dataType;
createIndexForFields(management, traitType, traitType.fieldMapping().fields);
createIndexForFields(traitType, traitType.fieldMapping().fields);
break;
case CLASS:
ClassType classType = (ClassType) dataType;
createIndexForFields(management, classType, classType.fieldMapping().fields);
createIndexForFields(classType, classType.fieldMapping().fields);
break;
default:
......@@ -143,37 +139,35 @@ public class GraphBackedSearchIndexer implements SearchIndexer {
}
}
private void createIndexForFields(TitanManagement management,
IDataType dataType, Map<String, AttributeInfo> fields) {
private void createIndexForFields(IDataType dataType, Map<String, AttributeInfo> fields) {
for (AttributeInfo field : fields.values()) {
if (field.isIndexable) {
createIndexForAttribute(management, dataType.getName(), field);
createIndexForAttribute(dataType.getName(), field);
}
}
}
private void createIndexForAttribute(TitanManagement management,
String typeName, AttributeInfo field) {
private void createIndexForAttribute(String typeName, AttributeInfo field) {
final String propertyName = typeName + "." + field.name;
switch (field.dataType().getTypeCategory()) {
case PRIMITIVE:
createVertexMixedIndex(
management, propertyName, getPrimitiveClass(field.dataType()));
propertyName, getPrimitiveClass(field.dataType()));
break;
case ENUM:
createVertexMixedIndex(management, propertyName, Integer.class);
createVertexMixedIndex(propertyName, Integer.class);
break;
case ARRAY:
case MAP:
// index the property holder for element names
createVertexMixedIndex(management, propertyName, String.class);
createVertexMixedIndex(propertyName, String.class);
break;
case STRUCT:
StructType structType = (StructType) field.dataType();
createIndexForFields(management, structType, structType.fieldMapping().fields);
createIndexForFields(structType, structType.fieldMapping().fields);
break;
case TRAIT:
......@@ -182,7 +176,7 @@ public class GraphBackedSearchIndexer implements SearchIndexer {
case CLASS:
// this is only A reference, index the attribute for edge
createEdgeMixedIndex(management, propertyName);
createEdgeMixedIndex(propertyName);
break;
default:
......@@ -216,9 +210,9 @@ public class GraphBackedSearchIndexer implements SearchIndexer {
throw new IllegalArgumentException("unknown data type " + dataType);
}
private static PropertyKey createCompositeIndex(TitanManagement management, String indexName,
String propertyName, Class propertyClass,
boolean isUnique) {
private PropertyKey createCompositeIndex(String indexName, String propertyName,
Class propertyClass, boolean isUnique) {
TitanManagement management = titanGraph.getManagementSystem();
PropertyKey propertyKey = management.getPropertyKey(propertyName);
if (propertyKey == null) {
propertyKey = management
......@@ -235,30 +229,39 @@ public class GraphBackedSearchIndexer implements SearchIndexer {
}
indexBuilder.buildCompositeIndex();
management.commit();
}
LOG.info("Created index for property {} in composite index {}", propertyName, indexName);
return propertyKey;
}
private static PropertyKey createVertexMixedIndex(TitanManagement management,
String propertyName, Class propertyClass) {
private PropertyKey createVertexMixedIndex(String propertyName, Class propertyClass) {
TitanManagement management = titanGraph.getManagementSystem();
PropertyKey propertyKey = management.getPropertyKey(propertyName);
if (propertyKey == null) {
propertyKey = management
.makePropertyKey(propertyName)
.dataType(propertyClass)
.make();
}
TitanGraphIndex vertexIndex = management.getGraphIndex(Constants.VERTEX_INDEX);
management.addIndexKey(vertexIndex, propertyKey);
TitanGraphIndex vertexIndex = management.getGraphIndex(Constants.VERTEX_INDEX);
management.addIndexKey(vertexIndex, propertyKey);
management.commit();
LOG.info("Created mixed vertex index for property {}", propertyName);
}
return propertyKey;
}
private static void createEdgeMixedIndex(TitanManagement management,
String propertyName) {
EdgeLabel edgeLabel = management.makeEdgeLabel(propertyName).make();
management.buildEdgeIndex(edgeLabel, propertyName, Direction.BOTH, Order.DEFAULT);
private void createEdgeMixedIndex(String propertyName) {
TitanManagement management = titanGraph.getManagementSystem();
EdgeLabel edgeLabel = management.getEdgeLabel(propertyName);
if (edgeLabel == null) {
edgeLabel = management.makeEdgeLabel(propertyName).make();
management.buildEdgeIndex(edgeLabel, propertyName, Direction.BOTH, Order.DEFAULT);
management.commit();
LOG.info("Created index for edge label {}", propertyName);
}
}
}
......@@ -115,6 +115,7 @@ public class TitanGraphService implements GraphService {
* @throws ConfigurationException
*/
// TODO move this functionality to the SearchIndexer?
@Deprecated
protected void createIndicesForVertexKeys() throws ConfigurationException {
if (!titanGraph.getIndexedKeys(Vertex.class).isEmpty()) {
LOG.info("Indexes already exist for titanGraph");
......
......@@ -74,6 +74,11 @@
<appender-ref ref="FILE"/>
</logger>
<logger name="org.elasticsearch" additivity="false">
<level value="warn"/>
<appender-ref ref="FILE"/>
</logger>
<root>
<priority value="debug"/>
<appender-ref ref="console"/>
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.metadata.repository.graph;
import com.google.common.collect.ImmutableList;
import com.thinkaurelius.titan.core.TitanGraph;
import com.thinkaurelius.titan.core.TitanIndexQuery;
import com.tinkerpop.blueprints.Compare;
import com.tinkerpop.blueprints.GraphQuery;
import com.tinkerpop.blueprints.Vertex;
import org.apache.hadoop.metadata.ITypedReferenceableInstance;
import org.apache.hadoop.metadata.Referenceable;
import org.apache.hadoop.metadata.RepositoryMetadataModule;
import org.apache.hadoop.metadata.Struct;
import org.apache.hadoop.metadata.types.AttributeDefinition;
import org.apache.hadoop.metadata.types.ClassType;
import org.apache.hadoop.metadata.types.DataTypes;
import org.apache.hadoop.metadata.types.EnumType;
import org.apache.hadoop.metadata.types.EnumTypeDefinition;
import org.apache.hadoop.metadata.types.EnumValue;
import org.apache.hadoop.metadata.types.HierarchicalTypeDefinition;
import org.apache.hadoop.metadata.types.IDataType;
import org.apache.hadoop.metadata.types.Multiplicity;
import org.apache.hadoop.metadata.types.StructTypeDefinition;
import org.apache.hadoop.metadata.types.TraitType;
import org.apache.hadoop.metadata.types.TypeSystem;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
import javax.inject.Inject;
import java.util.ArrayList;
import java.util.Map;
@Test
@Guice(modules = RepositoryMetadataModule.class)
public class GraphRepoMapperScaleTest {
private static final String DATABASE_TYPE = "hive_database_type";
private static final String DATABASE_NAME = "foo";
private static final String TABLE_TYPE = "hive_table_type";
private static final String TABLE_NAME = "bar";
@Inject
private TitanGraphService titanGraphService;
@Inject
private GraphBackedMetadataRepository repositoryService;
private GraphBackedSearchIndexer searchIndexer;
private TypeSystem typeSystem;
private String dbGUID;
@BeforeClass
public void setUp() throws Exception {
// start the injected graph service
titanGraphService.start();
// start the injected repository service
repositoryService.start();
searchIndexer = new GraphBackedSearchIndexer(titanGraphService);
typeSystem = TypeSystem.getInstance();
createHiveTypes();
}
@Test
public void testSubmitEntity() throws Exception {
Referenceable databaseInstance = new Referenceable(DATABASE_TYPE);
databaseInstance.set("name", DATABASE_NAME);
databaseInstance.set("description", "foo database");
// System.out.println("databaseInstance = " + databaseInstance);
ClassType dbType = typeSystem.getDataType(ClassType.class, DATABASE_TYPE);
ITypedReferenceableInstance db = dbType.convert(databaseInstance, Multiplicity.REQUIRED);
dbGUID = repositoryService.createEntity(db, DATABASE_TYPE);
Referenceable dbInstance = new Referenceable(
dbGUID, DATABASE_TYPE, databaseInstance.getValuesMap());
for (int index = 0; index < 1000; index ++) {
ITypedReferenceableInstance table = createHiveTableInstance(dbInstance, index);
repositoryService.createEntity(table, TABLE_TYPE);
}
}
@Test (dependsOnMethods = "testSubmitEntity")
public void testSearchIndex() throws Exception {
searchWithOutIndex(Constants.GUID_PROPERTY_KEY, dbGUID);
searchWithOutIndex(Constants.ENTITY_TYPE_PROPERTY_KEY, "hive_column_type");
searchWithOutIndex(Constants.ENTITY_TYPE_PROPERTY_KEY, TABLE_TYPE);
searchWithOutIndex("hive_table_type.name", "bar-999");
searchWithIndex("hive_table_type.name", "bar-999");
for (int index = 500; index < 600; index ++) {
searchWithIndex("hive_table_type.name", "bar-" + index);
}
}
private void searchWithOutIndex(String key, String value) {
TitanGraph graph = titanGraphService.getTitanGraph();
long start = System.currentTimeMillis();
int count = 0;
try {
GraphQuery query = graph.query()
.has(key, Compare.EQUAL, value);
for (Vertex ignored : query.vertices()) {
count++;
}
} finally {
System.out.println("Search on [" + key + "=" + value + "] returned results: " + count
+ ", took " + (System.currentTimeMillis() - start) + " ms");
}
}
private void searchWithIndex(String key, String value) {
TitanGraph graph = titanGraphService.getTitanGraph();
long start = System.currentTimeMillis();
int count = 0;
try {
String queryString = "v.\"" + key + "\":(" + value + ")";
TitanIndexQuery query = graph.indexQuery(Constants.VERTEX_INDEX, queryString);
for (TitanIndexQuery.Result<Vertex> ignored : query.vertices()) {
count++;
}
} finally {
System.out.println("Search on [" + key + "=" + value + "] returned results: " + count
+ ", took " + (System.currentTimeMillis() - start) + " ms");
}
}
private void createHiveTypes() throws Exception {
HierarchicalTypeDefinition<ClassType> databaseTypeDefinition =
createClassTypeDef(DATABASE_TYPE,
ImmutableList.<String>of(),
createUniqueRequiredAttrDef("name", DataTypes.STRING_TYPE),
createRequiredAttrDef("description", DataTypes.STRING_TYPE));
StructTypeDefinition structTypeDefinition =
new StructTypeDefinition("hive_serde_type",
new AttributeDefinition[] {
createRequiredAttrDef("name", DataTypes.STRING_TYPE),
createRequiredAttrDef("serde", DataTypes.STRING_TYPE)
});
EnumValue values[] = {
new EnumValue("MANAGED", 1),
new EnumValue("EXTERNAL", 2),
};
EnumTypeDefinition enumTypeDefinition = new EnumTypeDefinition("table_type", values);
EnumType enumType = typeSystem.defineEnumType(enumTypeDefinition);
searchIndexer.onAdd("table_type", enumType);
HierarchicalTypeDefinition<ClassType> columnsDefinition =
createClassTypeDef("hive_column_type",
ImmutableList.<String>of(),
createRequiredAttrDef("name", DataTypes.STRING_TYPE),
createRequiredAttrDef("type", DataTypes.STRING_TYPE));
StructTypeDefinition partitionDefinition =
new StructTypeDefinition("hive_partition_type",
new AttributeDefinition[] {
createRequiredAttrDef("name", DataTypes.STRING_TYPE),
});
HierarchicalTypeDefinition<ClassType> tableTypeDefinition =
createClassTypeDef(TABLE_TYPE,
ImmutableList.<String>of(),
createUniqueRequiredAttrDef("name", DataTypes.STRING_TYPE),
createRequiredAttrDef("description", DataTypes.STRING_TYPE),
createRequiredAttrDef("type", DataTypes.STRING_TYPE),
// enum
new AttributeDefinition("tableType", "table_type",
Multiplicity.REQUIRED, false, null),
// array of strings
new AttributeDefinition("columnNames",
String.format("array<%s>", DataTypes.STRING_TYPE.getName()),
Multiplicity.COLLECTION, false, null),
// array of classes
new AttributeDefinition("columns",
String.format("array<%s>", "hive_column_type"),
Multiplicity.COLLECTION, true, null),
// array of structs
new AttributeDefinition("partitions",
String.format("array<%s>", "hive_partition_type"),
Multiplicity.COLLECTION, true, null),
// struct reference
new AttributeDefinition("serde1",
"hive_serde_type", Multiplicity.REQUIRED, false, null),
new AttributeDefinition("serde2",
"hive_serde_type", Multiplicity.REQUIRED, false, null),
// class reference
new AttributeDefinition("database",
DATABASE_TYPE, Multiplicity.REQUIRED, true, null));
HierarchicalTypeDefinition<TraitType> classificationTypeDefinition =
createTraitTypeDef("pii_type", ImmutableList.<String>of());
Map<String, IDataType> types = typeSystem.defineTypes(
ImmutableList.of(structTypeDefinition, partitionDefinition),
ImmutableList.of(classificationTypeDefinition),
ImmutableList.of(databaseTypeDefinition, columnsDefinition, tableTypeDefinition));
for (Map.Entry<String, IDataType> entry : types.entrySet()) {
searchIndexer.onAdd(entry.getKey(), entry.getValue());
}
}
private ITypedReferenceableInstance createHiveTableInstance(
Referenceable databaseInstance, int uberIndex) throws Exception {
Referenceable tableInstance = new Referenceable(TABLE_TYPE, "pii_type");
tableInstance.set("name", TABLE_NAME + "-" + uberIndex);
tableInstance.set("description", "bar table" + "-" + uberIndex);
tableInstance.set("type", "managed");
tableInstance.set("tableType", 1); // enum
// refer to an existing class
tableInstance.set("database", databaseInstance);
ArrayList<String> columnNames = new ArrayList<>();
columnNames.add("first_name" + "-" + uberIndex);
columnNames.add("last_name" + "-" + uberIndex);
tableInstance.set("columnNames", columnNames);
Struct serde1Instance = new Struct("hive_serde_type");
serde1Instance.set("name", "serde1" + "-" + uberIndex);
serde1Instance.set("serde", "serde1" + "-" + uberIndex);
tableInstance.set("serde1", serde1Instance);
Struct serde2Instance = new Struct("hive_serde_type");
serde2Instance.set("name", "serde2" + "-" + uberIndex);
serde2Instance.set("serde", "serde2" + "-" + uberIndex);
tableInstance.set("serde2", serde2Instance);
ArrayList<Referenceable> columns = new ArrayList<>();
for (int index = 0; index < 5; index++) {
Referenceable columnInstance = new Referenceable("hive_column_type");
columnInstance.set("name", "column_" + "-" + uberIndex + "-" + index);
columnInstance.set("type", "string");
columns.add(columnInstance);
}
tableInstance.set("columns", columns);
ArrayList<Struct> partitions = new ArrayList<>();
for (int index = 0; index < 5; index++) {
Struct partitionInstance = new Struct("hive_partition_type");
partitionInstance.set("name", "partition_" + "-" + uberIndex + "-" + index);
partitions.add(partitionInstance);
}
tableInstance.set("partitions", partitions);
ClassType tableType = typeSystem.getDataType(ClassType.class, TABLE_TYPE);
return tableType.convert(tableInstance, Multiplicity.REQUIRED);
}
protected AttributeDefinition createUniqueRequiredAttrDef(String name,
IDataType dataType) {
return new AttributeDefinition(name, dataType.getName(),
Multiplicity.REQUIRED, false, true, true, null);
}
protected AttributeDefinition createRequiredAttrDef(String name,
IDataType dataType) {
return new AttributeDefinition(name, dataType.getName(),
Multiplicity.REQUIRED, false, null);
}
@SuppressWarnings("unchecked")
protected HierarchicalTypeDefinition<TraitType> createTraitTypeDef(
String name, ImmutableList<String> superTypes, AttributeDefinition... attrDefs) {
return new HierarchicalTypeDefinition(TraitType.class, name, superTypes, attrDefs);
}
@SuppressWarnings("unchecked")
protected HierarchicalTypeDefinition<ClassType> createClassTypeDef(
String name, ImmutableList<String> superTypes, AttributeDefinition... attrDefs) {
return new HierarchicalTypeDefinition(ClassType.class, name, superTypes, attrDefs);
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment