Commit d8d9da29 by Venkatesh Seetharam

Fix Array type mapping with tests. Contributed by Venkatesh Seetharam

parent 22e8a750
......@@ -18,12 +18,12 @@
package org.apache.hadoop.metadata.repository.graph;
import com.google.common.collect.ImmutableCollection;
import com.thinkaurelius.titan.core.TitanGraph;
import com.thinkaurelius.titan.core.TitanProperty;
import com.thinkaurelius.titan.core.TitanVertex;
import com.tinkerpop.blueprints.Direction;
import com.tinkerpop.blueprints.Edge;
import com.tinkerpop.blueprints.Graph;
import com.tinkerpop.blueprints.GraphQuery;
import com.tinkerpop.blueprints.TransactionalGraph;
import com.tinkerpop.blueprints.Vertex;
......@@ -129,18 +129,14 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
final TransactionalGraph transactionalGraph = graphService.getTransactionalGraph();
try {
// todo check if this is a duplicate
transactionalGraph.rollback();
return instanceToGraphMapper.mapTypedInstanceToGraph(typedInstance);
final String guid = instanceToGraphMapper.mapTypedInstanceToGraph(typedInstance);
transactionalGraph.commit(); // commit if there are no errors
return guid;
} catch (MetadataException e) {
transactionalGraph.rollback();
throw new RepositoryException(e);
} finally {
transactionalGraph.commit();
GraphHelper.dumpToLog(transactionalGraph);
}
}
......@@ -148,9 +144,10 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
public ITypedReferenceableInstance getEntityDefinition(String guid) throws RepositoryException {
LOG.info("Retrieving entity with guid={}", guid);
final Graph graph = graphService.getBlueprintsGraph();
final TransactionalGraph transactionalGraph = graphService.getTransactionalGraph();
try {
Vertex instanceVertex = GraphHelper.findVertexByGUID(graph, guid);
transactionalGraph.rollback(); // clean up before starting a query
Vertex instanceVertex = GraphHelper.findVertexByGUID(transactionalGraph, guid);
if (instanceVertex == null) {
LOG.debug("Could not find a vertex for guid {}", guid);
return null;
......@@ -161,8 +158,6 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
} catch (Exception e) {
throw new RepositoryException(e);
} finally {
GraphHelper.dumpToLog(graph);
}
}
......@@ -404,7 +399,6 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
LOG.debug("Mapping instance {} to vertex {} for fields {}",
typedInstance.getTypeName(), instanceVertex, fields);
for (AttributeInfo attributeInfo : fields.values()) {
LOG.debug("mapping attributeInfo {}", attributeInfo);
final IDataType dataType = attributeInfo.dataType();
mapAttributesToVertex(id, typedInstance, instanceVertex,
idToVertexMap, attributeInfo, dataType);
......@@ -416,6 +410,7 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
Map<Id, Vertex> idToVertexMap,
AttributeInfo attributeInfo,
IDataType dataType) throws MetadataException {
LOG.debug("mapping attributeInfo {}", attributeInfo);
final String propertyName = typedInstance.getTypeName() + "." + attributeInfo.name;
switch (dataType.getTypeCategory()) {
......@@ -429,15 +424,19 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
break;
case ARRAY:
// todo - Add to/from json for collections
mapArrayCollectionToVertex(
id, typedInstance, instanceVertex, attributeInfo, idToVertexMap);
break;
case MAP:
// todo - Add to/from json for collections
/*
mapMapCollectionToVertex(
id, typedInstance, instanceVertex, attributeInfo, idToVertexMap);
*/
break;
case STRUCT:
Vertex structInstanceVertex = mapStructInstanceToVertex(id,
Vertex structInstanceVertex = mapStructInstanceToVertex(id, typedInstance,
(ITypedStruct) typedInstance.get(attributeInfo.name),
attributeInfo, idToVertexMap);
// add an edge to the newly created vertex from the parent
......@@ -451,7 +450,8 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
case CLASS:
Id referenceId = (Id) typedInstance.get(attributeInfo.name);
mapClassReferenceAsEdge(
instanceVertex, idToVertexMap, propertyName, referenceId);
instanceVertex, typedInstance, idToVertexMap, propertyName, referenceId,
attributeInfo.dataType().getName());
break;
default:
......@@ -459,14 +459,83 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
}
}
private void mapClassReferenceAsEdge(Vertex instanceVertex, Map<Id, Vertex> idToVertexMap,
String propertyKey, Id id) {
private void mapArrayCollectionToVertex(Id id, ITypedInstance typedInstance,
Vertex instanceVertex,
AttributeInfo attributeInfo,
Map<Id, Vertex> idToVertexMap) throws MetadataException {
LOG.debug("Mapping instance {} to vertex {} for name {}",
typedInstance.getTypeName(), instanceVertex, attributeInfo.name);
ImmutableCollection list = (ImmutableCollection) typedInstance.get(attributeInfo.name);
if (list == null) {
return;
}
String propertyName = typedInstance.getTypeName() + "." + attributeInfo.name;
// todo: move this to the indexer
GraphHelper.createPropertyKey(titanGraph.getManagementSystem(), propertyName);
IDataType elementType = ((DataTypes.ArrayType) attributeInfo.dataType()).getElemType();
StringBuilder buffer = new StringBuilder();
Object[] array = list.toArray();
for (int index = 0; index < array.length; index++) {
String propertyNameWithSuffix = propertyName + "." + index;
buffer.append(propertyNameWithSuffix).append(",");
final Object value = array[index];
switch (elementType.getTypeCategory()) {
case PRIMITIVE:
instanceVertex.setProperty(propertyNameWithSuffix, value);
break;
case ENUM:
instanceVertex.setProperty(propertyNameWithSuffix, value);
break;
case ARRAY:
case MAP:
case TRAIT:
// do nothing
break;
case STRUCT:
Vertex structInstanceVertex = mapStructInstanceToVertex(id, typedInstance,
(ITypedStruct) value, attributeInfo, idToVertexMap);
// add an edge to the newly created vertex from the parent
GraphHelper.addEdge(instanceVertex, structInstanceVertex,
propertyNameWithSuffix);
break;
case CLASS:
Id referenceId = (Id) value;
mapClassReferenceAsEdge(
instanceVertex, typedInstance, idToVertexMap,
propertyNameWithSuffix, referenceId, elementType.getName());
break;
default:
break;
}
}
buffer.setLength(buffer.length() - 1);
// for dereference on way out
instanceVertex.setProperty(propertyName, buffer.toString());
}
private void mapClassReferenceAsEdge(Vertex instanceVertex, ITypedInstance typedInstance,
Map<Id, Vertex> idToVertexMap,
String propertyKey, Id id, String typeName)
throws MetadataException {
if (id != null) {
Vertex referenceVertex;
if (id.isAssigned()) {
referenceVertex = GraphHelper.findVertexByGUID(titanGraph, id.id);
} else {
referenceVertex = idToVertexMap.get(id);
ClassType classType = typeSystem.getDataType(ClassType.class, typeName);
mapInstanceToVertex(id, typedInstance, referenceVertex,
classType.fieldMapping().fields, idToVertexMap);
}
if (referenceVertex != null) {
......@@ -478,7 +547,19 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
}
}
private Vertex mapStructInstanceToVertex(Id id, ITypedStruct structInstance,
/*
private void mapMapCollectionToVertex(ITypedInstance typedInstance,
Vertex instanceVertex,
AttributeInfo attributeInfo,
Map<Id, Vertex> idToVertexMap) throws MetadataException {
DataTypes.MapType collection = (DataTypes.MapType) typedInstance.get(attributeInfo.name);
ImmutableMap map = collection.convert(collection, Multiplicity.COLLECTION);
}
*/
private Vertex mapStructInstanceToVertex(Id id, ITypedInstance typedInstance,
ITypedStruct structInstance,
AttributeInfo attributeInfo,
Map<Id, Vertex> idToVertexMap) throws MetadataException {
// add a new vertex for the struct or trait instance
......@@ -555,23 +636,6 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
}
}
/*
private PropertyKey getPropertyKey(DataTypes.ArrayType arrayType, String propertyName) {
PropertyKey propertyKey = titanGraph.getPropertyKey(propertyName);
if (propertyKey == null) { //First time we have seen it, let's define it
propertyKey = management.makePropertyKey(propertyName)
.dataType(arrayType.getElemType().getClass())
.cardinality(Cardinality.LIST)
.make();
management.buildIndex("by" + propertyName, Vertex.class)
.addKey(propertyKey)
.buildCompositeIndex();
}
return propertyKey;
}
*/
private final class GraphToTypedInstanceMapper {
private ITypedReferenceableInstance mapGraphToTypedInstance(String guid,
......@@ -630,7 +694,7 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
break;
case ARRAY:
// todo - Add to/from json for collections
mapVertexToArrayInstance(instanceVertex, typedInstance, attributeInfo);
break;
case MAP:
......@@ -650,33 +714,117 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
// todo - use ObjectWalker here instead else it can be an infinite loop
// for cross references
String relationshipLabel = typedInstance.getTypeName() + "." + attributeInfo.name;
LOG.debug("Finding edge for {} -> label {} ", instanceVertex, relationshipLabel);
for (Edge edge : instanceVertex.getEdges(Direction.OUT, relationshipLabel)) {
final Vertex referenceVertex = edge.getVertex(Direction.IN);
if (referenceVertex != null) {
final String guid = referenceVertex.getProperty(Constants.GUID_PROPERTY_KEY);
LOG.debug("Found vertex {} for label {} with guid {}",
referenceVertex, relationshipLabel, guid);
if (attributeInfo.isComposite) {
LOG.debug("Found composite, mapping vertex to instance");
typedInstance.set(attributeInfo.name,
mapGraphToTypedInstance(guid, referenceVertex));
} else {
Id referenceId = new Id(guid,
referenceVertex.<Integer>getProperty(Constants.VERSION_PROPERTY_KEY),
attributeInfo.name);
LOG.debug("Found non-composite, adding id {} ", referenceId);
typedInstance.set(attributeInfo.name, referenceId);
}
break;
}
}
Object idOrInstance = mapClassReferenceToVertex(
instanceVertex, attributeInfo, relationshipLabel);
typedInstance.set(attributeInfo.name, idOrInstance);
break;
default:
break;
}
}
}
private Object mapClassReferenceToVertex(Vertex instanceVertex,
AttributeInfo attributeInfo,
String relationshipLabel) throws MetadataException {
LOG.debug("Finding edge for {} -> label {} ", instanceVertex, relationshipLabel);
Iterator<Edge> results = instanceVertex.getEdges(
Direction.OUT, relationshipLabel).iterator();
if (results.hasNext()) {
final Vertex referenceVertex = results.next().getVertex(Direction.IN);
if (referenceVertex != null) {
final String guid = referenceVertex.getProperty(Constants.GUID_PROPERTY_KEY);
LOG.debug("Found vertex {} for label {} with guid {}",
referenceVertex, relationshipLabel, guid);
if (attributeInfo.isComposite) {
LOG.debug("Found composite, mapping vertex to instance");
return mapGraphToTypedInstance(guid, referenceVertex);
} else {
Id referenceId = new Id(guid,
referenceVertex.<Integer>getProperty(Constants.VERSION_PROPERTY_KEY),
attributeInfo.name);
LOG.debug("Found non-composite, adding id {} ", referenceId);
return referenceId;
}
}
}
return null;
}
private void mapVertexToArrayInstance(Vertex instanceVertex, ITypedInstance typedInstance,
AttributeInfo attributeInfo) throws MetadataException {
LOG.debug("mapping vertex {} to array {}", instanceVertex, attributeInfo.name);
DataTypes.ArrayType arrayType = (DataTypes.ArrayType) attributeInfo.dataType();
String propertyName = typedInstance.getTypeName() + "." + attributeInfo.name;
String keys = instanceVertex.getProperty(propertyName);
ArrayList values = new ArrayList();
for (String propertyNameWithSuffix : keys.split(",")) {
switch (arrayType.getElemType().getTypeCategory()) {
case PRIMITIVE:
values.add(instanceVertex.getProperty(propertyNameWithSuffix));
break;
case ENUM:
values.add(instanceVertex.<Integer>getProperty(propertyNameWithSuffix));
break;
case ARRAY:
case MAP:
case TRAIT:
// do nothing
break;
case STRUCT:
ITypedStruct structInstance = getStructInstanceFromVertex(instanceVertex,
arrayType.getElemType(), attributeInfo.name, propertyNameWithSuffix);
values.add(structInstance);
break;
case CLASS:
Object idOrInstance = mapClassReferenceToVertex(
instanceVertex, attributeInfo, propertyNameWithSuffix);
values.add(idOrInstance);
break;
default:
break;
}
}
typedInstance.set(attributeInfo.name, values);
}
private ITypedStruct getStructInstanceFromVertex(Vertex instanceVertex,
IDataType elemType,
String attributeName,
String relationshipLabel) throws MetadataException {
LOG.debug("Finding edge for {} -> label {} ", instanceVertex, relationshipLabel);
Iterator<Edge> results = instanceVertex.getEdges(
Direction.OUT, relationshipLabel).iterator();
Edge edge = results.hasNext() ? results.next() : null;
if (edge == null) {
return null;
}
Vertex structInstanceVertex = edge.getVertex(Direction.IN);
LOG.debug("mapping vertex {} to struct {}", structInstanceVertex, attributeName);
if (structInstanceVertex != null) {
LOG.debug("Found struct instance vertex {}, mapping to instance {} ",
structInstanceVertex, elemType.getName());
StructType structType = typeSystem.getDataType(StructType.class, elemType.getName());
ITypedStruct structInstance = structType.createInstance();
mapVertexToInstance(structInstanceVertex, structInstance,
structType.fieldMapping().fields);
return structInstance;
}
return null;
}
private void mapVertexToStructInstance(Vertex instanceVertex,
......
......@@ -18,6 +18,9 @@
package org.apache.hadoop.metadata.repository.graph;
import com.thinkaurelius.titan.core.Cardinality;
import com.thinkaurelius.titan.core.PropertyKey;
import com.thinkaurelius.titan.core.schema.TitanManagement;
import com.tinkerpop.blueprints.Compare;
import com.tinkerpop.blueprints.Direction;
import com.tinkerpop.blueprints.Edge;
......@@ -88,6 +91,23 @@ public final class GraphHelper {
return results.hasNext() ? results.next() : null;
}
public static PropertyKey createPropertyKey(TitanManagement management, String propertyName) {
PropertyKey propertyKey = management.getPropertyKey(propertyName);
if (propertyKey == null) {
propertyKey = management
.makePropertyKey(propertyName)
.dataType(String.class)
.cardinality(Cardinality.SET)
.make();
management.buildIndex("by" + propertyName, Vertex.class)
.addKey(propertyKey)
.buildCompositeIndex();
}
return propertyKey;
}
public static String vertexString(final Vertex vertex) {
StringBuilder properties = new StringBuilder();
for (String propertyKey : vertex.getPropertyKeys()) {
......
......@@ -20,7 +20,10 @@ package org.apache.hadoop.metadata.repository.graph;
import com.google.common.collect.ImmutableList;
import com.thinkaurelius.titan.core.TitanGraph;
import com.thinkaurelius.titan.core.TitanVertex;
import com.tinkerpop.blueprints.Compare;
import com.tinkerpop.blueprints.Edge;
import com.tinkerpop.blueprints.GraphQuery;
import com.tinkerpop.blueprints.Vertex;
import org.apache.hadoop.metadata.ITypedReferenceableInstance;
import org.apache.hadoop.metadata.Referenceable;
......@@ -37,11 +40,15 @@ import org.apache.hadoop.metadata.types.Multiplicity;
import org.apache.hadoop.metadata.types.StructTypeDefinition;
import org.apache.hadoop.metadata.types.TraitType;
import org.apache.hadoop.metadata.types.TypeSystem;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
import javax.inject.Inject;
import java.util.ArrayList;
import java.util.Iterator;
@Test
@Guice(modules = RepositoryMetadataModule.class)
......@@ -53,9 +60,9 @@ public class GraphRepoMapperTest {
private static final String TABLE_NAME = "bar";
@Inject
TitanGraphService titanGraphService;
private TitanGraphService titanGraphService;
@Inject
GraphBackedMetadataRepository repositoryService;
private GraphBackedMetadataRepository repositoryService;
private TypeSystem typeSystem;
......@@ -71,6 +78,11 @@ public class GraphRepoMapperTest {
createHiveTypes();
}
@AfterMethod
public void tearDown() {
dumpGraph();
}
@Test
public void testSubmitEntity() throws Exception {
Referenceable databaseInstance = new Referenceable(DATABASE_TYPE);
......@@ -91,8 +103,28 @@ public class GraphRepoMapperTest {
ITypedReferenceableInstance table = createHiveTableInstance(dbInstance);
String tableGUID = repositoryService.createEntity(table, TABLE_TYPE);
System.out.println("added table = " + tableGUID);
}
dumpGraph();
@Test (dependsOnMethods = "testSubmitEntity")
public void testGetEntityDefinition() throws Exception {
TitanGraph graph = titanGraphService.getTitanGraph();
GraphQuery query = graph.query()
.has("type", Compare.EQUAL, TABLE_TYPE)
.has("type", Compare.EQUAL, TABLE_TYPE);
Iterator<Vertex> results = query.vertices().iterator();
// returning one since guid should be unique
Vertex tableVertex = results.hasNext() ? results.next() : null;
if (tableVertex == null) {
Assert.fail();
}
String guid = tableVertex.getProperty(Constants.GUID_PROPERTY_KEY);
if (guid == null) {
Assert.fail();
}
ITypedReferenceableInstance table = repositoryService.getEntityDefinition(guid);
System.out.println("*** table = " + table);
}
private void dumpGraph() {
......@@ -100,7 +132,12 @@ public class GraphRepoMapperTest {
System.out.println("*******************Graph Dump****************************");
System.out.println("Vertices of " + graph);
for (Vertex vertex : graph.getVertices()) {
System.out.println(GraphHelper.vertexString(vertex));
// System.out.println(GraphHelper.vertexString(vertex));
System.out.print("vertex = " + vertex + " [");
TitanVertex titanVertex = (TitanVertex) graph.getVertex(vertex.getId());
System.out.print("guid= " + titanVertex.getProperty(Constants.GUID_PROPERTY_KEY));
System.out.print("type= " + titanVertex.getProperty(Constants.ENTITY_TYPE_PROPERTY_KEY));
System.out.println("]");
}
System.out.println("Edges of " + graph);
......@@ -132,23 +169,45 @@ public class GraphRepoMapperTest {
EnumTypeDefinition enumTypeDefinition = new EnumTypeDefinition("tableType", values);
typeSystem.defineEnumType(enumTypeDefinition);
HierarchicalTypeDefinition<ClassType> columnsDefinition =
createClassTypeDef("column_type",
ImmutableList.<String>of(),
createRequiredAttrDef("name", DataTypes.STRING_TYPE),
createRequiredAttrDef("type", DataTypes.STRING_TYPE));
StructTypeDefinition partitionDefinition =
new StructTypeDefinition("partition_type",
new AttributeDefinition[] {
createRequiredAttrDef("name", DataTypes.STRING_TYPE),
});
HierarchicalTypeDefinition<ClassType> tableTypeDefinition =
createClassTypeDef(TABLE_TYPE,
ImmutableList.<String>of(),
createUniqueRequiredAttrDef("name", DataTypes.STRING_TYPE),
createRequiredAttrDef("description", DataTypes.STRING_TYPE),
createRequiredAttrDef("type", DataTypes.STRING_TYPE),
// enum
new AttributeDefinition("tableType", "tableType",
Multiplicity.REQUIRED, false, null),
/*
new AttributeDefinition("columns",
// array of strings
new AttributeDefinition("columnNames",
String.format("array<%s>", DataTypes.STRING_TYPE.getName()),
Multiplicity.COLLECTION, false, null),
*/
// array of classes
new AttributeDefinition("columns",
String.format("array<%s>", "column_type"),
Multiplicity.COLLECTION, true, null),
// array of structs
new AttributeDefinition("partitions",
String.format("array<%s>", "partition_type"),
Multiplicity.COLLECTION, true, null),
// struct reference
new AttributeDefinition("serde1",
"serdeType", Multiplicity.REQUIRED, false, null),
new AttributeDefinition("serde2",
"serdeType", Multiplicity.REQUIRED, false, null),
// class reference
new AttributeDefinition("database",
DATABASE_TYPE, Multiplicity.REQUIRED, true, null));
......@@ -158,9 +217,9 @@ public class GraphRepoMapperTest {
createRequiredAttrDef("tag", DataTypes.STRING_TYPE));
typeSystem.defineTypes(
ImmutableList.of(structTypeDefinition),
ImmutableList.of(structTypeDefinition, partitionDefinition),
ImmutableList.of(classificationTypeDefinition),
ImmutableList.of(databaseTypeDefinition, tableTypeDefinition));
ImmutableList.of(databaseTypeDefinition, columnsDefinition, tableTypeDefinition));
}
private ITypedReferenceableInstance createHiveTableInstance(
......@@ -176,8 +235,15 @@ public class GraphRepoMapperTest {
tableInstance.set("description", "bar table");
tableInstance.set("type", "managed");
tableInstance.set("tableType", 1); // enum
// refer to an existing class
tableInstance.set("database", databaseInstance);
ArrayList<String> columnNames = new ArrayList<>();
columnNames.add("first_name");
columnNames.add("last_name");
tableInstance.set("columnNames", columnNames);
Struct traitInstance = (Struct) tableInstance.getTrait("classification");
traitInstance.set("tag", "foundation_etl");
......@@ -191,6 +257,23 @@ public class GraphRepoMapperTest {
serde2Instance.set("serde", "serde2");
tableInstance.set("serde2", serde2Instance);
ArrayList<Referenceable> columns = new ArrayList<>();
for (int index = 0; index < 5; index++) {
Referenceable columnInstance = new Referenceable("column_type");
columnInstance.set("name", "column_" + index);
columnInstance.set("type", "string");
columns.add(columnInstance);
}
tableInstance.set("columns", columns);
ArrayList<Struct> partitions = new ArrayList<>();
for (int index = 0; index < 5; index++) {
Struct partitionInstance = new Struct("partition_type");
partitionInstance.set("name", "partition_" + index);
partitions.add(partitionInstance);
}
tableInstance.set("partitions", partitions);
ClassType tableType = typeSystem.getDataType(ClassType.class, TABLE_TYPE);
return tableType.convert(tableInstance, Multiplicity.REQUIRED);
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment