Commit 639671ca by Venkatesh Seetharam

ISSUE-25 Serialize entity (type instance) into graph db. Contributed by Venkatesh Seetharam

parent 3c350780
......@@ -587,6 +587,12 @@
<dependency>
<groupId>org.apache.hadoop.metadata</groupId>
<artifactId>metadata-typesystem</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop.metadata</groupId>
<artifactId>metadata-repository</artifactId>
<version>${project.version}</version>
</dependency>
......
......@@ -73,6 +73,11 @@
</dependency>
<dependency>
<groupId>org.apache.hadoop.metadata</groupId>
<artifactId>metadata-typesystem</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</dependency>
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.metadata.repository.graph;
import com.thinkaurelius.titan.core.TitanFactory;
import com.thinkaurelius.titan.core.TitanGraph;
import com.tinkerpop.blueprints.Direction;
import com.tinkerpop.blueprints.Edge;
import com.tinkerpop.blueprints.Vertex;
public class TitanBootstrap {
private final TitanGraph graph;
public TitanBootstrap() {
graph = TitanFactory.build().set("storage.backend", "inmemory").open();
}
public TitanGraph getGraph() {
return graph;
}
public Vertex createVertex() {
return graph.addVertex(null);
}
public static String vertexString(final Vertex vertex) {
StringBuilder properties = new StringBuilder();
for (String propertyKey : vertex.getPropertyKeys()) {
properties.append(propertyKey)
.append("=").append(vertex.getProperty(propertyKey))
.append(", ");
}
return "v[" + vertex.getId() + "], Properties[" + properties + "]";
}
public static String edgeString(final Edge edge) {
return "e[" + edge.getLabel() + "], ["
+ edge.getVertex(Direction.OUT).getProperty("name")
+ " -> " + edge.getLabel() + " -> "
+ edge.getVertex(Direction.IN).getProperty("name")
+ "]";
}
public static void main(String[] args) {
TitanBootstrap bootstrap = new TitanBootstrap();
TitanGraph graph = bootstrap.getGraph();
try {
Vertex harish = bootstrap.createVertex();
harish.setProperty("name", "harish");
Vertex venkatesh = bootstrap.createVertex();
venkatesh.setProperty("name", "venkatesh");
harish.addEdge("buddy", venkatesh);
for (Vertex v : graph.getVertices()) {
System.out.println("v = " + vertexString(v));
for (Edge e : v.getEdges(Direction.OUT)) {
System.out.println("e = " + edgeString(e));
}
}
} finally {
graph.shutdown();
}
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.metadata.services;
import org.apache.hadoop.metadata.ITypedReferenceableInstance;
import org.apache.hadoop.metadata.MetadataException;
import org.apache.hadoop.metadata.json.Serialization$;
import org.apache.hadoop.metadata.service.Services;
import org.apache.hadoop.metadata.types.TypeSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.List;
public class DefaultMetadataService implements MetadataService {
private static final Logger LOG =
LoggerFactory.getLogger(DefaultMetadataService.class);
public static final String NAME = DefaultMetadataService.class.getSimpleName();
private TypeSystem typeSystem;
private MetadataRepositoryService repositoryService;
/**
* Creates a new type based on the type system to enable adding
* entities (instances for types).
*
* @param typeName name for this type, must be unique
* @param typeDefinition definition as json
* @return a unique id for this type
*/
@Override
public String createType(String typeName, String typeDefinition) throws MetadataException {
return null;
}
/**
* Return the definition for the given type.
*
* @param typeName name for this type, must be unique
* @return type definition as JSON
*/
@Override
public String getTypeDefinition(String typeName) throws MetadataException {
return null;
}
/**
* Return the list of types in the repository.
*
* @return list of type names in the repository
*/
@Override
public List<String> getTypeNamesList() throws MetadataException {
return null;
}
/**
* Creates an entity, instance of the type.
*
* @param entityType type
* @param entityDefinition definition
* @return guid
*/
@Override
public String createEntity(String entityType,
String entityDefinition) throws MetadataException {
ITypedReferenceableInstance entityInstance =
Serialization$.MODULE$.fromJson(entityDefinition);
return repositoryService.createEntity(entityInstance, entityType);
}
/**
* Return the definition for the given guid.
*
* @param guid guid
* @return entity definition as JSON
*/
@Override
public String getEntityDefinition(String guid) throws MetadataException {
return null;
}
/**
* Return the definition for the given entity name and type.
*
* @param entityName name
* @param entityType type
* @return entity definition as JSON
*/
@Override
public String getEntityDefinition(String entityName,
String entityType) throws MetadataException {
throw new UnsupportedOperationException();
}
/**
* Return the list of entity names for the given type in the repository.
*
* @param entityType type
* @return list of entity names for the given type in the repository
*/
@Override
public List<String> getEntityNamesList(String entityType) throws MetadataException {
throw new UnsupportedOperationException();
}
/**
* Name of the service.
*
* @return name of the service
*/
@Override
public String getName() {
return NAME;
}
/**
* Starts the service. This method blocks until the service has completely started.
*
* @throws Exception
*/
@Override
public void start() throws Exception {
LOG.info("Initializing the Metadata service");
if (Services.get().isRegistered(TitanGraphService.NAME)) {
DefaultTypesService typesService = Services.get().getService(DefaultTypesService.NAME);
typeSystem = typesService.getTypeSystem();
} else {
throw new RuntimeException("Types service is not initialized");
}
if (Services.get().isRegistered(TitanGraphService.NAME)) {
repositoryService = Services.get().getService(GraphBackedMetadataRepositoryService.NAME);
} else {
throw new RuntimeException("repository service is not initialized");
}
}
/**
* Stops the service. This method blocks until the service has completely shut down.
*/
@Override
public void stop() {
// do nothing
repositoryService = null;
}
/**
* A version of stop() that is designed to be usable in Java7 closure
* clauses.
* Implementation classes MUST relay this directly to {@link #stop()}
*
* @throws java.io.IOException never
* @throws RuntimeException on any failure during the stop operation
*/
@Override
public void close() throws IOException {
stop();
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.metadata.services;
import org.apache.hadoop.metadata.types.TypeSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
public class DefaultTypesService implements TypesService {
private static final Logger LOG =
LoggerFactory.getLogger(DefaultTypesService.class);
public static final String NAME = DefaultTypesService.class.getSimpleName();
private TypeSystem typeSystem;
@Override
public TypeSystem getTypeSystem() {
assert typeSystem != null;
return typeSystem;
}
/**
* Name of the service.
*
* @return name of the service
*/
@Override
public String getName() {
return NAME;
}
/**
* Starts the service. This method blocks until the service has completely started.
*
* @throws Exception
*/
@Override
public void start() throws Exception {
LOG.info("Initializing the type system");
typeSystem = new TypeSystem();
}
/**
* Stops the service. This method blocks until the service has completely shut down.
*/
@Override
public void stop() {
}
/**
* A version of stop() that is designed to be usable in Java7 closure
* clauses.
* Implementation classes MUST relay this directly to {@link #stop()}
*
* @throws java.io.IOException never
* @throws RuntimeException on any failure during the stop operation
*/
@Override
public void close() throws IOException {
}
}
......@@ -18,21 +18,38 @@
package org.apache.hadoop.metadata.services;
import com.google.common.base.Preconditions;
import com.tinkerpop.blueprints.Graph;
import com.tinkerpop.blueprints.Direction;
import com.tinkerpop.blueprints.Edge;
import com.tinkerpop.blueprints.TransactionalGraph;
import com.tinkerpop.blueprints.Vertex;
import org.apache.hadoop.metadata.IReferenceableInstance;
import org.apache.hadoop.metadata.ITypedInstance;
import org.apache.hadoop.metadata.ITypedReferenceableInstance;
import org.apache.hadoop.metadata.ITypedStruct;
import org.apache.hadoop.metadata.MetadataException;
import org.apache.hadoop.metadata.service.Services;
import org.apache.hadoop.metadata.util.GraphUtils;
import org.json.simple.JSONValue;
import org.apache.hadoop.metadata.storage.Id;
import org.apache.hadoop.metadata.storage.MapIds;
import org.apache.hadoop.metadata.storage.RepositoryException;
import org.apache.hadoop.metadata.types.AttributeInfo;
import org.apache.hadoop.metadata.types.ClassType;
import org.apache.hadoop.metadata.types.DataTypes;
import org.apache.hadoop.metadata.types.IDataType;
import org.apache.hadoop.metadata.types.Multiplicity;
import org.apache.hadoop.metadata.types.ObjectGraphWalker;
import org.apache.hadoop.metadata.types.TypeSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
/**
* An implementation backed by Titan Graph DB.
......@@ -43,7 +60,19 @@ public class GraphBackedMetadataRepositoryService implements MetadataRepositoryS
LoggerFactory.getLogger(GraphBackedMetadataRepositoryService.class);
public static final String NAME = GraphBackedMetadataRepositoryService.class.getSimpleName();
private static final String GUID_PROPERTY_KEY = "guid";
private static final String TIMESTAMP_PROPERTY_KEY = "timestamp";
private static final String ENTITY_TYPE_PROPERTY_KEY = "entityType";
private static final String TRAIT_PROPERTY_SUFFIX = "trait.";
private final AtomicInteger ID_SEQ = new AtomicInteger(0);
// private ConcurrentHashMap<String, ITypedReferenceableInstance> types;
private ConcurrentHashMap<String, ITypedReferenceableInstance> instances;
private GraphService graphService;
private TypeSystem typeSystem;
/**
* Name of the service.
......@@ -67,6 +96,15 @@ public class GraphBackedMetadataRepositoryService implements MetadataRepositoryS
} else {
throw new RuntimeException("graph service is not initialized");
}
if (Services.get().isRegistered(DefaultTypesService.NAME)) {
DefaultTypesService typesService = Services.get().getService(DefaultTypesService.NAME);
typeSystem = typesService.getTypeSystem();
} else {
throw new RuntimeException("Types service is not initialized");
}
instances = new ConcurrentHashMap<>();
}
/**
......@@ -91,61 +129,301 @@ public class GraphBackedMetadataRepositoryService implements MetadataRepositoryS
stop();
}
private Graph getBlueprintsGraph() {
return graphService.getBlueprintsGraph();
}
private TransactionalGraph getTransactionalGraph() {
return graphService.getTransactionalGraph();
}
@Override
public String submitEntity(String entity, String entityType) {
public String createEntity(IReferenceableInstance entity,
String entityType) throws RepositoryException {
LOG.info("adding entity={} type={}", entity, entityType);
@SuppressWarnings("unchecked")
Map<String, String> properties = (Map<String, String>) JSONValue.parse(entity);
final String entityName = properties.get("entityName");
Preconditions.checkNotNull(entityName, "entity name cannot be null");
// todo check if this is a duplicate
final String guid = UUID.randomUUID().toString();
final TransactionalGraph transactionalGraph = getTransactionalGraph();
try {
// todo check if this is a duplicate
transactionalGraph.rollback();
Vertex entityVertex = transactionalGraph.addVertex(null);
entityVertex.setProperty("guid", guid);
entityVertex.setProperty("entityName", entityName);
entityVertex.setProperty("entityType", entityType);
for (Map.Entry<String, String> entry : properties.entrySet()) {
entityVertex.setProperty(entry.getKey(), entry.getValue());
EntityProcessor entityProcessor = new EntityProcessor();
try {
new ObjectGraphWalker(typeSystem, entityProcessor, entity).walk();
} catch (MetadataException me) {
throw new RepositoryException("TypeSystem error when walking the ObjectGraph", me);
}
} catch (Exception e) {
List<ITypedReferenceableInstance> newInstances = discoverInstances(entityProcessor);
entityProcessor.createVerticesForClasses(transactionalGraph, newInstances);
return addDiscoveredInstances(entity, entityProcessor, newInstances);
} catch (MetadataException e) {
transactionalGraph.rollback();
throw new RepositoryException(e);
} finally {
transactionalGraph.commit();
}
}
private String addDiscoveredInstances(IReferenceableInstance entity,
EntityProcessor entityProcessor,
List<ITypedReferenceableInstance> newInstances)
throws MetadataException {
String guid = null;
for (ITypedReferenceableInstance instance : newInstances) { // Traverse over newInstances
Id id = instance.getId();
if (id == null) {
// oops
throw new RepositoryException("id cannot be null");
}
Vertex entityVertex = entityProcessor.idToVertexMap.get(id);
instances.put((String) entityVertex.getProperty(GUID_PROPERTY_KEY), instance);
// add the attributes for the instance
final Map<String, AttributeInfo> fields = instance.fieldMapping().fields;
addInstanceToVertex(instance, entityVertex, fields,
entityProcessor.idToVertexMap);
for (String traitName : instance.getTraits()) {
ITypedStruct traitInstance = (ITypedStruct) instance.getTrait(traitName);
// add the attributes for the trait instance
entityVertex.setProperty(TRAIT_PROPERTY_SUFFIX + traitName, traitName);
addInstanceToVertex(traitInstance, entityVertex,
traitInstance.fieldMapping().fields,
entityProcessor.idToVertexMap);
}
if (instance.getId() == entity.getId()) {
guid = entityVertex.getProperty(GUID_PROPERTY_KEY);
}
}
return guid;
}
@Override
public String getEntityDefinition(String entityName, String entityType) {
LOG.info("Retrieving entity name={} type={}", entityName, entityType);
Vertex entityVertex = GraphUtils.findVertex(getBlueprintsGraph(), entityName, entityType);
if (entityVertex == null) {
return null;
private void addInstanceToVertex(ITypedInstance instance, Vertex entityVertex,
Map<String, AttributeInfo> fields,
Map<Id, Vertex> idToVertexMap) throws MetadataException {
for (AttributeInfo attributeInfo : fields.values()) {
System.out.println("*** attributeInfo = " + attributeInfo);
final IDataType dataType = attributeInfo.dataType();
String attributeName = attributeInfo.name;
Object attributeValue = instance.get(attributeInfo.name);
switch (dataType.getTypeCategory()) {
case PRIMITIVE:
addPrimitiveToVertex(instance, entityVertex, attributeInfo);
break;
case ENUM:
addToVertex(entityVertex, attributeInfo.name,
instance.getInt(attributeInfo.name));
break;
case ARRAY:
// todo - Add to/from json for collections
break;
case MAP:
// todo - Add to/from json for collections
break;
case STRUCT:
ITypedStruct structInstance = (ITypedStruct) attributeValue;
addInstanceToVertex(structInstance, entityVertex,
structInstance.fieldMapping().fields, idToVertexMap);
break;
case TRAIT:
ITypedStruct traitInstance = (ITypedStruct) attributeValue;
addInstanceToVertex(traitInstance, entityVertex,
traitInstance.fieldMapping().fields, idToVertexMap);
break;
case CLASS:
Id id = (Id) instance.get(attributeName);
if (id != null) {
Vertex referenceVertex = idToVertexMap.get(id);
addEdge(entityVertex, referenceVertex, "references");
}
break;
default:
break;
}
}
}
protected Edge addEdge(Vertex fromVertex, Vertex toVertex, String edgeLabel) {
return addEdge(fromVertex, toVertex, edgeLabel, null);
}
protected Edge addEdge(Vertex fromVertex, Vertex toVertex,
String edgeLabel, String timestamp) {
Edge edge = findEdge(fromVertex, toVertex, edgeLabel);
Edge edgeToVertex = edge != null ? edge : fromVertex.addEdge(edgeLabel, toVertex);
if (timestamp != null) {
edgeToVertex.setProperty(TIMESTAMP_PROPERTY_KEY, timestamp);
}
return edgeToVertex;
}
Map<String, String> properties = GraphUtils.extractProperties(entityVertex);
return JSONValue.toJSONString(properties);
protected Edge findEdge(Vertex fromVertex, Vertex toVertex, String edgeLabel) {
return findEdge(fromVertex, toVertex.getProperty(GUID_PROPERTY_KEY), edgeLabel);
}
protected Edge findEdge(Vertex fromVertex, Object toVertexName, String edgeLabel) {
Edge edgeToFind = null;
for (Edge edge : fromVertex.getEdges(Direction.OUT, edgeLabel)) {
if (edge.getVertex(Direction.IN).getProperty(GUID_PROPERTY_KEY).equals(toVertexName)) {
edgeToFind = edge;
break;
}
}
return edgeToFind;
}
/*
* Step 2: Traverse oldIdToInstance map create newInstances :
* List[ITypedReferenceableInstance]
* - create a ITypedReferenceableInstance.
* replace any old References ( ids or object references) with new Ids.
*/
private List<ITypedReferenceableInstance> discoverInstances(EntityProcessor entityProcessor)
throws RepositoryException {
List<ITypedReferenceableInstance> newInstances = new ArrayList<>();
for (IReferenceableInstance transientInstance : entityProcessor.idToInstanceMap.values()) {
LOG.debug("instance {}", transientInstance);
try {
ClassType cT = typeSystem.getDataType(
ClassType.class, transientInstance.getTypeName());
ITypedReferenceableInstance newInstance = cT.convert(
transientInstance, Multiplicity.REQUIRED);
newInstances.add(newInstance);
/*
* Now replace old references with new Ids
*/
MapIds mapIds = new MapIds(entityProcessor.idToNewIdMap);
new ObjectGraphWalker(typeSystem, mapIds, newInstances).walk();
} catch (MetadataException me) {
throw new RepositoryException(
String.format("Failed to create Instance(id = %s",
transientInstance.getId()), me);
}
}
return newInstances;
}
private void addPrimitiveToVertex(ITypedInstance instance,
Vertex entityVertex,
AttributeInfo attributeInfo) throws MetadataException {
if (instance.get(attributeInfo.name) == null) { // add only if instance has this attribute
return;
}
if (attributeInfo.dataType() == DataTypes.STRING_TYPE) {
entityVertex.setProperty(attributeInfo.name, instance.getString(attributeInfo.name));
} else if (attributeInfo.dataType() == DataTypes.SHORT_TYPE) {
entityVertex.setProperty(attributeInfo.name, instance.getShort(attributeInfo.name));
} else if (attributeInfo.dataType() == DataTypes.INT_TYPE) {
entityVertex.setProperty(attributeInfo.name, instance.getInt(attributeInfo.name));
} else if (attributeInfo.dataType() == DataTypes.BIGINTEGER_TYPE) {
entityVertex.setProperty(attributeInfo.name, instance.getBigInt(attributeInfo.name));
} else if (attributeInfo.dataType() == DataTypes.BOOLEAN_TYPE) {
entityVertex.setProperty(attributeInfo.name, instance.getBoolean(attributeInfo.name));
} else if (attributeInfo.dataType() == DataTypes.BYTE_TYPE) {
entityVertex.setProperty(attributeInfo.name, instance.getByte(attributeInfo.name));
} else if (attributeInfo.dataType() == DataTypes.LONG_TYPE) {
entityVertex.setProperty(attributeInfo.name, instance.getLong(attributeInfo.name));
} else if (attributeInfo.dataType() == DataTypes.FLOAT_TYPE) {
entityVertex.setProperty(attributeInfo.name, instance.getFloat(attributeInfo.name));
} else if (attributeInfo.dataType() == DataTypes.DOUBLE_TYPE) {
entityVertex.setProperty(attributeInfo.name, instance.getDouble(attributeInfo.name));
} else if (attributeInfo.dataType() == DataTypes.BIGDECIMAL_TYPE) {
entityVertex.setProperty(attributeInfo.name, instance.getBigDecimal(attributeInfo.name));
}
}
public static void addToVertex(Vertex entityVertex, String name, int value) {
entityVertex.setProperty(name, value);
}
@Override
public ITypedReferenceableInstance getEntityDefinition(String guid) throws RepositoryException {
LOG.info("Retrieving entity with guid={}", guid);
return instances.get(guid);
}
@Override
public List<String> getEntityList(String entityType) {
public List<String> getEntityList(String entityType) throws RepositoryException {
LOG.info("Retrieving entity list for type={}", entityType);
return Collections.emptyList();
}
private final class EntityProcessor implements ObjectGraphWalker.NodeProcessor {
public final Map<Id, Id> idToNewIdMap;
public final Map<Id, IReferenceableInstance> idToInstanceMap;
public final Map<Id, Vertex> idToVertexMap;
public EntityProcessor() {
idToNewIdMap = new HashMap<>();
idToInstanceMap = new HashMap<>();
idToVertexMap = new HashMap<>();
}
@Override
public void processNode(ObjectGraphWalker.Node nd) throws MetadataException {
IReferenceableInstance ref = null;
Id id = null;
if (nd.attributeName == null) {
ref = (IReferenceableInstance) nd.instance;
id = ref.getId();
} else if (nd.aInfo.dataType().getTypeCategory() == DataTypes.TypeCategory.CLASS) {
if (nd.value != null && (nd.value instanceof Id)) {
id = (Id) nd.value;
}
}
if (id != null) {
if (id.isUnassigned()) {
if (!idToNewIdMap.containsKey(id)) {
idToNewIdMap.put(id, new Id(ID_SEQ.getAndIncrement(), 0, id.className));
}
if (ref != null && idToInstanceMap.containsKey(id)) { // Oops
throw new RepositoryException(String.format(
"Unexpected internal error: Id %s processed again", id));
}
if (ref != null) {
idToInstanceMap.put(id, ref);
}
}
}
}
public void createVerticesForClasses(TransactionalGraph transactionalGraph,
List<ITypedReferenceableInstance> newInstances) {
for (ITypedReferenceableInstance instance : newInstances) {
final Vertex entityVertex = transactionalGraph.addVertex(null);
entityVertex.setProperty(ENTITY_TYPE_PROPERTY_KEY, instance.getTypeName());
// entityVertex.setProperty("entityName", instance.getString("name"));
final String guid = UUID.randomUUID().toString();
entityVertex.setProperty(GUID_PROPERTY_KEY, guid);
idToVertexMap.put(instance.getId(), entityVertex);
}
}
}
}
......@@ -18,7 +18,10 @@
package org.apache.hadoop.metadata.services;
import org.apache.hadoop.metadata.IReferenceableInstance;
import org.apache.hadoop.metadata.ITypedReferenceableInstance;
import org.apache.hadoop.metadata.service.Service;
import org.apache.hadoop.metadata.storage.RepositoryException;
import java.util.List;
......@@ -27,9 +30,10 @@ import java.util.List;
*/
public interface MetadataRepositoryService extends Service {
String submitEntity(String entity, String entityType);
String createEntity(IReferenceableInstance entity,
String entityType) throws RepositoryException;
String getEntityDefinition(String entityName, String entityType);
ITypedReferenceableInstance getEntityDefinition(String guid) throws RepositoryException;
List<String> getEntityList(String entityType);
List<String> getEntityList(String entityType) throws RepositoryException;
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.metadata.services;
import org.apache.hadoop.metadata.MetadataException;
import org.apache.hadoop.metadata.service.Service;
import java.util.List;
public interface MetadataService extends Service {
/**
* Creates a new type based on the type system to enable adding
* entities (instances for types).
*
* @param typeName name for this type, must be unique
* @param typeDefinition definition as json
* @return a unique id for this type
*/
String createType(String typeName, String typeDefinition) throws MetadataException;
/**
* Return the definition for the given type.
*
* @param typeName name for this type, must be unique
* @return type definition as JSON
*/
String getTypeDefinition(String typeName) throws MetadataException;
/**
* Return the list of types in the repository.
*
* @return list of type names in the repository
*/
List<String> getTypeNamesList() throws MetadataException;
/**
* Creates an entity, instance of the type.
*
* @param entityType type
* @param entityDefinition definition
* @return guid
*/
String createEntity(String entityType, String entityDefinition) throws MetadataException;
/**
* Return the definition for the given guid.
*
* @param guid guid
* @return entity definition as JSON
*/
String getEntityDefinition(String guid) throws MetadataException;
/**
* Return the definition for the given entity name and type.
*
* @param entityName name
* @param entityType type
* @return entity definition as JSON
*/
String getEntityDefinition(String entityName, String entityType) throws MetadataException;
/**
* Return the list of entity names for the given type in the repository.
*
* @param entityType type
* @return list of entity names for the given type in the repository
*/
List<String> getEntityNamesList(String entityType) throws MetadataException;
}
......@@ -142,12 +142,13 @@ public class TitanGraphService implements GraphService {
LOG.info("Indexes do not exist, Creating indexes for titanGraph using indexer.properties.");
Configuration indexConfig = getConfiguration("indexer.properties", INDEXER_PREFIX);
TitanManagement mgmt = titanGraph.getManagementSystem();
mgmt.buildIndex("mainIndex", Vertex.class).buildMixedIndex("search");
TitanGraphIndex graphIndex = mgmt.getGraphIndex("mainIndex");
mgmt.addIndexKey(graphIndex, mgmt.makePropertyKey("guid").dataType(String.class).make());
Configuration indexConfig = getConfiguration("indexer.properties", INDEXER_PREFIX);
// Properties are formatted: prop_name:type;prop_name:type
// E.g. Name:String;Date:Long
if (!indexConfig.isEmpty()) {
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.metadata.services;
import org.apache.hadoop.metadata.service.Service;
import org.apache.hadoop.metadata.types.TypeSystem;
public interface TypesService extends Service {
TypeSystem getTypeSystem();
}
......@@ -5,6 +5,10 @@ import com.tinkerpop.blueprints.Edge;
import com.tinkerpop.blueprints.Graph;
import com.tinkerpop.blueprints.GraphQuery;
import com.tinkerpop.blueprints.Vertex;
import com.tinkerpop.blueprints.util.io.graphson.GraphSONMode;
import com.tinkerpop.blueprints.util.io.graphson.GraphSONUtility;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -23,6 +27,16 @@ public final class GraphUtils {
}
public static Vertex findVertex(Graph blueprintsGraph,
String guid) {
LOG.debug("Finding vertex for: guid={}", guid);
GraphQuery query = blueprintsGraph.query().has("guid", guid);
Iterator<Vertex> results = query.vertices().iterator();
// returning one since name/type is unique
return results.hasNext() ? results.next() : null;
}
public static Vertex findVertex(Graph blueprintsGraph,
String entityName, String entityType) {
LOG.debug("Finding vertex for: name={}, type={}", entityName, entityType);
......@@ -54,6 +68,10 @@ public final class GraphUtils {
return "v[" + vertex.getId() + "], Properties[" + properties + "]";
}
public static JSONObject vertexJSON(final Vertex vertex) throws JSONException {
return GraphSONUtility.jsonFromElement(vertex, null, GraphSONMode.NORMAL);
}
public static String edgeString(final Edge edge) {
return "e[" + edge.getLabel() + "], ["
+ edge.getVertex(Direction.OUT).getProperty("name")
......
......@@ -69,6 +69,11 @@
<appender-ref ref="FILE"/>
</logger>
<logger name="com.thinkaurelius.titan" additivity="false">
<level value="warn"/>
<appender-ref ref="FILE"/>
</logger>
<root>
<priority value="debug"/>
<appender-ref ref="console"/>
......
package org.apache.hadoop.metadata.services;
import com.google.common.collect.ImmutableList;
import com.thinkaurelius.titan.core.TitanGraph;
import com.tinkerpop.blueprints.Direction;
import com.tinkerpop.blueprints.Edge;
import com.tinkerpop.blueprints.Vertex;
import org.apache.hadoop.metadata.ITypedReferenceableInstance;
import org.apache.hadoop.metadata.MetadataException;
import org.apache.hadoop.metadata.MetadataService;
import org.apache.hadoop.metadata.Referenceable;
import org.apache.hadoop.metadata.service.Services;
import org.json.simple.JSONValue;
import org.apache.hadoop.metadata.storage.memory.MemRepository;
import org.apache.hadoop.metadata.types.AttributeDefinition;
import org.apache.hadoop.metadata.types.ClassType;
import org.apache.hadoop.metadata.types.DataTypes;
import org.apache.hadoop.metadata.types.HierarchicalType;
import org.apache.hadoop.metadata.types.HierarchicalTypeDefinition;
import org.apache.hadoop.metadata.types.IDataType;
import org.apache.hadoop.metadata.types.Multiplicity;
import org.apache.hadoop.metadata.types.StructTypeDefinition;
import org.apache.hadoop.metadata.types.TraitType;
import org.apache.hadoop.metadata.types.TypeSystem;
import org.apache.hadoop.metadata.util.GraphUtils;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class GraphBackedMetadataRepositoryServiceTest {
private static final String ENTITY_NAME = "clicks-table";
private static final String ENTITY_TYPE = "hive-table";
private static final String DATABASE_NAME = "ads";
private static final String TABLE_NAME = "clicks-table";
private TitanGraphService titanGraphService;
private GraphBackedMetadataRepositoryService repositoryService;
protected org.apache.hadoop.metadata.MetadataService ms;
private String guid;
@BeforeClass
public void setUp() throws Exception {
......@@ -27,9 +44,21 @@ public class GraphBackedMetadataRepositoryServiceTest {
titanGraphService.start();
Services.get().register(titanGraphService);
DefaultTypesService typesService = new DefaultTypesService();
typesService.start();
Services.get().register(typesService);
TypeSystem ts = typesService.getTypeSystem();
repositoryService = new GraphBackedMetadataRepositoryService();
repositoryService.start();
Services.get().register(repositoryService);
// todo - only used for types
MemRepository mr = new MemRepository(ts);
ms = new org.apache.hadoop.metadata.MetadataService(mr, ts);
MetadataService.setCurrentService(ms);
defineDeptEmployeeTypes(ts);
}
@AfterClass
......@@ -48,35 +77,37 @@ public class GraphBackedMetadataRepositoryServiceTest {
@Test
public void testSubmitEntity() throws Exception {
String entityStream = getTestEntityJSON();
String guid = repositoryService.submitEntity(entityStream, ENTITY_TYPE);
TypeSystem typeSystem = MetadataService.getCurrentService().getTypeSystem();
Referenceable hrDept = createDeptEg1(typeSystem);
ClassType deptType = typeSystem.getDataType(ClassType.class, "Department");
ITypedReferenceableInstance hrDept2 = deptType.convert(hrDept, Multiplicity.REQUIRED);
guid = repositoryService.createEntity(hrDept2, ENTITY_TYPE);
Assert.assertNotNull(guid);
dumpGraph();
}
private String getTestEntityJSON() {
Map<String, String> props = new HashMap<>();
props.put("entityName", ENTITY_NAME);
props.put("entityType", ENTITY_TYPE);
props.put("database", DATABASE_NAME);
props.put("table", TABLE_NAME);
return JSONValue.toJSONString(props);
private void dumpGraph() {
TitanGraph graph = titanGraphService.getTitanGraph();
for (Vertex v : graph.getVertices()) {
// System.out.println("****v = " + GraphUtils.vertexString(v));
System.out.println("v = " + v);
for (Edge e : v.getEdges(Direction.OUT)) {
System.out.println("****e = " + GraphUtils.edgeString(e));
}
}
}
@Test (dependsOnMethods = "testSubmitEntity")
@Test(dependsOnMethods = "testSubmitEntity")
public void testGetEntityDefinition() throws Exception {
String entity = repositoryService.getEntityDefinition(ENTITY_NAME, ENTITY_TYPE);
Map<String, String> entityProperties =
(Map<String, String>) JSONValue.parseWithException(entity);
Assert.assertNotNull(entityProperties.get("guid"));
Assert.assertEquals(entityProperties.get("entityName"), ENTITY_NAME);
Assert.assertEquals(entityProperties.get("entityType"), ENTITY_TYPE);
Assert.assertEquals(entityProperties.get("database"), DATABASE_NAME);
Assert.assertEquals(entityProperties.get("table"), TABLE_NAME);
ITypedReferenceableInstance entity = repositoryService.getEntityDefinition(guid);
Assert.assertNotNull(entity);
}
@Test
public void testGetEntityDefinitionNonExistent() throws Exception {
String entity = repositoryService.getEntityDefinition("blah", "blah");
ITypedReferenceableInstance entity = repositoryService.getEntityDefinition("blah");
Assert.assertNull(entity);
}
......@@ -87,7 +118,7 @@ public class GraphBackedMetadataRepositoryServiceTest {
Assert.assertEquals(entityList.size(), 0); // as this is not implemented yet
}
@Test (expectedExceptions = RuntimeException.class)
@Test(expectedExceptions = RuntimeException.class)
public void testStartWithOutGraphServiceRegistration() throws Exception {
try {
Services.get().reset();
......@@ -100,4 +131,101 @@ public class GraphBackedMetadataRepositoryServiceTest {
Services.get().register(repositoryService);
}
}
/*
* Class Hierarchy is:
* Department(name : String, employees : Array[Person])
* Person(name : String, department : Department, manager : Manager)
* Manager(subordinates : Array[Person]) extends Person
*
* Persons can have SecurityClearance(level : Int) clearance.
*/
protected void defineDeptEmployeeTypes(TypeSystem ts) throws MetadataException {
HierarchicalTypeDefinition<ClassType> deptTypeDef =
createClassTypeDef("Department", ImmutableList.<String>of(),
createRequiredAttrDef("name", DataTypes.STRING_TYPE),
new AttributeDefinition("employees",
String.format("array<%s>", "Person"), Multiplicity.COLLECTION, true,
"department")
);
HierarchicalTypeDefinition<ClassType> personTypeDef = createClassTypeDef("Person",
ImmutableList.<String>of(),
createRequiredAttrDef("name", DataTypes.STRING_TYPE),
new AttributeDefinition("department",
"Department", Multiplicity.REQUIRED, false, "employees"),
new AttributeDefinition("manager",
"Manager", Multiplicity.OPTIONAL, false, "subordinates")
);
HierarchicalTypeDefinition<ClassType> managerTypeDef = createClassTypeDef("Manager",
ImmutableList.<String>of("Person"),
new AttributeDefinition("subordinates",
String.format("array<%s>", "Person"), Multiplicity.COLLECTION, false,
"manager")
);
HierarchicalTypeDefinition<TraitType> securityClearanceTypeDef = createTraitTypeDef(
"SecurityClearance",
ImmutableList.<String>of(),
createRequiredAttrDef("level", DataTypes.INT_TYPE)
);
ts.defineTypes(ImmutableList.<StructTypeDefinition>of(),
ImmutableList.<HierarchicalTypeDefinition<TraitType>>of(securityClearanceTypeDef),
ImmutableList.<HierarchicalTypeDefinition<ClassType>>of(deptTypeDef, personTypeDef,
managerTypeDef));
ImmutableList<HierarchicalType> types = ImmutableList.of(
ts.getDataType(HierarchicalType.class, "SecurityClearance"),
ts.getDataType(ClassType.class, "Department"),
ts.getDataType(ClassType.class, "Person"),
ts.getDataType(ClassType.class, "Manager")
);
ms.getRepository().defineTypes(types);
}
protected Referenceable createDeptEg1(TypeSystem ts) throws MetadataException {
Referenceable hrDept = new Referenceable("Department");
Referenceable john = new Referenceable("Person");
Referenceable jane = new Referenceable("Manager", "SecurityClearance");
hrDept.set("name", "hr");
john.set("name", "John");
john.set("department", hrDept);
jane.set("name", "Jane");
jane.set("department", hrDept);
john.set("manager", jane);
hrDept.set("employees", ImmutableList.<Referenceable>of(john, jane));
jane.set("subordinates", ImmutableList.<Referenceable>of(john));
jane.getTrait("SecurityClearance").set("level", 1);
ClassType deptType = ts.getDataType(ClassType.class, "Department");
ITypedReferenceableInstance hrDept2 = deptType.convert(hrDept, Multiplicity.REQUIRED);
return hrDept;
}
public static AttributeDefinition createRequiredAttrDef(String name,
IDataType dataType) {
return new AttributeDefinition(name, dataType.getName(), Multiplicity.REQUIRED, false, null);
}
@SuppressWarnings("unchecked")
protected HierarchicalTypeDefinition<TraitType> createTraitTypeDef(
String name, ImmutableList<String> superTypes, AttributeDefinition... attrDefs) {
return new HierarchicalTypeDefinition(TraitType.class, name, superTypes, attrDefs);
}
@SuppressWarnings("unchecked")
protected HierarchicalTypeDefinition<ClassType> createClassTypeDef(
String name, ImmutableList<String> superTypes, AttributeDefinition... attrDefs) {
return new HierarchicalTypeDefinition(ClassType.class, name, superTypes, attrDefs);
}
}
......@@ -57,12 +57,12 @@ public class TitanGraphServiceTest {
@Test
public void testGetVertexIndexedKeys() throws Exception {
Assert.assertNotNull(titanGraphService.getVertexIndexedKeys());
Assert.assertEquals(titanGraphService.getVertexIndexedKeys().size(), 8);
Assert.assertTrue(titanGraphService.getVertexIndexedKeys().size() > 0);
}
@Test
public void testGetEdgeIndexedKeys() throws Exception {
Assert.assertNotNull(titanGraphService.getEdgeIndexedKeys());
Assert.assertEquals(titanGraphService.getEdgeIndexedKeys().size(), 8);
Assert.assertTrue(titanGraphService.getEdgeIndexedKeys().size() > 0);
}
}
\ No newline at end of file
......@@ -24,20 +24,14 @@ import org.apache.hadoop.metadata.IReferenceableInstance;
import org.apache.hadoop.metadata.MetadataException;
import org.apache.hadoop.metadata.types.DataTypes;
import org.apache.hadoop.metadata.types.ObjectGraphWalker;
import org.apache.hadoop.metadata.IReferenceableInstance;
import org.apache.hadoop.metadata.MetadataException;
import org.apache.hadoop.metadata.types.DataTypes;
import org.apache.hadoop.metadata.types.ObjectGraphWalker;
import java.util.Map;
public class MapIds implements ObjectGraphWalker.NodeProcessor {
final IRepository repository;
final Map<Id, Id> idToNewIdMap;
public MapIds(IRepository repository, Map<Id, Id> idToNewIdMap) {
this.repository = repository;
public MapIds(Map<Id, Id> idToNewIdMap) {
this.idToNewIdMap = idToNewIdMap;
}
......
......@@ -142,7 +142,7 @@ public class MemRepository implements IRepository {
/*
* Now replace old references with new Ids
*/
MapIds mapIds = new MapIds(this, discoverInstances.idToNewIdMap);
MapIds mapIds = new MapIds(discoverInstances.idToNewIdMap);
new ObjectGraphWalker(typeSystem, mapIds, newInstances).walk();
} catch (MetadataException me) {
......
......@@ -49,4 +49,15 @@ public class AttributeInfo {
void setDataType(IDataType dT) {
dataType = dT;
}
@Override
public String toString() {
return "AttributeInfo{" +
"name='" + name + '\'' +
", dataType=" + dataType +
", multiplicity=" + multiplicity +
", isComposite=" + isComposite +
", reverseAttributeName='" + reverseAttributeName + '\'' +
'}';
}
}
......@@ -20,13 +20,16 @@ package org.apache.hadoop.metadata.web.resources;
import com.google.common.base.Preconditions;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.metadata.MetadataException;
import org.apache.hadoop.metadata.service.Services;
import org.apache.hadoop.metadata.services.GraphBackedMetadataRepositoryService;
import org.apache.hadoop.metadata.services.MetadataRepositoryService;
import org.apache.hadoop.metadata.services.DefaultMetadataService;
import org.apache.hadoop.metadata.services.MetadataService;
import org.apache.hadoop.metadata.web.util.Servlets;
import org.codehaus.jettison.json.JSONObject;
import org.json.simple.JSONValue;
import org.json.simple.parser.ParseException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
......@@ -54,11 +57,13 @@ import java.io.StringWriter;
@Path("entities")
public class EntityResource {
private MetadataRepositoryService repositoryService;
private static final Logger LOG = LoggerFactory.getLogger(EntityResource.class);
private MetadataService metadataService;
public EntityResource() {
repositoryService = Services.get().getService(GraphBackedMetadataRepositoryService.NAME);
if (repositoryService == null) {
metadataService = Services.get().getService(DefaultMetadataService.NAME);
if (metadataService == null) {
throw new RuntimeException("graph service is not initialized");
}
}
......@@ -74,7 +79,7 @@ public class EntityResource {
System.out.println("entity = " + entity);
validateEntity(entity, entityType);
final String guid = repositoryService.submitEntity(entity, entityType);
final String guid = metadataService.createEntity(entity, entityType);
JSONObject response = new JSONObject();
response.put("GUID", guid);
......@@ -102,8 +107,22 @@ public class EntityResource {
@Path("definition/{guid}")
@Produces(MediaType.APPLICATION_JSON)
public Response getEntityDefinition(@PathParam("guid") String guid) {
Preconditions.checkNotNull(guid, "guid cannot be null");
return Response.ok().build();
try {
final String entityDefinition = metadataService.getEntityDefinition(guid);
return (entityDefinition == null)
? Response.status(Response.Status.NOT_FOUND).build()
: Response.ok(entityDefinition).build();
} catch (MetadataException e) {
LOG.error("Action failed: {}\nError: {}",
Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
throw new WebApplicationException(e, Response
.status(Response.Status.INTERNAL_SERVER_ERROR)
.entity(e.getMessage())
.type(MediaType.APPLICATION_JSON)
.build());
}
}
@GET
......@@ -111,10 +130,16 @@ public class EntityResource {
@Produces(MediaType.APPLICATION_JSON)
public Response getEntityDefinition(@PathParam("entityType") String entityType,
@PathParam("entityName") String entityName) {
final String entityDefinition = repositoryService.getEntityDefinition(entityName, entityType);
return (entityDefinition == null)
? Response.status(Response.Status.NOT_FOUND).build()
: Response.ok(entityDefinition).build();
return Response.status(Response.Status.SERVICE_UNAVAILABLE).build();
}
@GET
@Path("list/{entityType}")
@Produces(MediaType.APPLICATION_JSON)
public Response getEntityList(@PathParam("entityType") String entityType,
@DefaultValue("0") @QueryParam("offset") Integer offset,
@QueryParam("numResults") Integer resultsPerPage) {
return Response.ok().build();
}
@POST
......@@ -161,13 +186,4 @@ public class EntityResource {
@PathParam("entityName") String entityName) {
return Response.ok().build();
}
@GET
@Path("list/{entityType}")
@Produces(MediaType.APPLICATION_JSON)
public Response getEntityList(@PathParam("entityType") String entityType,
@DefaultValue("0") @QueryParam("offset") Integer offset,
@QueryParam("numResults") Integer resultsPerPage) {
return Response.ok().build();
}
}
......@@ -42,6 +42,14 @@ public class TypesResource {
return Response.ok().build();
}
@GET
@Path("definition/{type}")
@Produces(MediaType.APPLICATION_JSON)
public Response getDefinition(@Context HttpServletRequest request,
@PathParam("type") String type) {
return Response.ok().build();
}
@DELETE
@Path("delete/{type}")
@Produces(MediaType.APPLICATION_JSON)
......
......@@ -17,7 +17,8 @@
#
application.services=org.apache.hadoop.metadata.services.TitanGraphService,\
org.apache.hadoop.metadata.services.GraphBackedMetadataRepositoryService
org.apache.hadoop.metadata.services.GraphBackedMetadataRepositoryService,\
org.apache.hadoop.metadata.services.DefaultMetadataService
######### Implementation classes #########
......
......@@ -38,13 +38,14 @@ public class GraphRepositoryServiceIT {
Services.get().reset();
}
/*
@Test
public void testRepository() throws Exception {
GraphBackedMetadataRepositoryService repositoryService =
Services.get().getService(GraphBackedMetadataRepositoryService.NAME);
String entityStream = getTestEntityJSON();
String guid = repositoryService.submitEntity(entityStream, ENTITY_TYPE);
String guid = repositoryService.createEntity(entityStream, ENTITY_TYPE);
Assert.assertNotNull(guid);
String entity = repositoryService.getEntityDefinition(ENTITY_NAME, ENTITY_TYPE);
......@@ -66,4 +67,5 @@ public class GraphRepositoryServiceIT {
props.put("table", TABLE_NAME);
return JSONValue.toJSONString(props);
}
*/
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment