Commit 386d8d38 by Venkatesh Seetharam

ISSUE-35 Add typed instance put/get end to end. Contributed by Venkatesh Seetharam

parent 8285e3b9
......@@ -83,8 +83,10 @@ public class DefaultMetadataService implements MetadataService {
return response;
} catch (ParseException e) {
LOG.error("Unable to parse JSON for type {}", typeName, e);
throw new MetadataException("validation failed for: " + typeName);
} catch (JSONException e) {
LOG.error("Unable to persist type {}", typeName, e);
throw new MetadataException("Unable to create response for: " + typeName);
}
}
......@@ -147,6 +149,7 @@ public class DefaultMetadataService implements MetadataService {
Serialization$.MODULE$.fromJson(entityDefinition);
return repository.createEntity(entityInstance, entityType);
} catch (ParseException e) {
LOG.error("Unable to parse JSON {} for type {}", entityDefinition, entityType, e);
throw new MetadataException("validation failed for: " + entityType);
}
}
......@@ -154,7 +157,9 @@ public class DefaultMetadataService implements MetadataService {
private void validateEntity(String entity, String entityType) throws ParseException {
Preconditions.checkNotNull(entity, "entity cannot be null");
Preconditions.checkNotNull(entityType, "entity type cannot be null");
JSONValue.parseWithException(entity);
// todo: this is failing for instances but not types
// JSONValue.parseWithException(entity);
}
/**
......@@ -167,21 +172,9 @@ public class DefaultMetadataService implements MetadataService {
public String getEntityDefinition(String guid) throws MetadataException {
final ITypedReferenceableInstance instance =
repository.getEntityDefinition(guid);
return Serialization$.MODULE$.toJson(instance);
}
/**
* Return the definition for the given entity name and type.
*
* @param entityName name
* @param entityType type
* @return entity definition as JSON
*/
@Override
public String getEntityDefinition(String entityName,
String entityType) throws MetadataException {
throw new UnsupportedOperationException();
return instance == null
? null
: Serialization$.MODULE$.toJson(instance);
}
/**
......
......@@ -18,8 +18,12 @@
package org.apache.hadoop.metadata.services;
import com.thinkaurelius.titan.core.TitanProperty;
import com.thinkaurelius.titan.core.TitanVertex;
import com.tinkerpop.blueprints.Direction;
import com.tinkerpop.blueprints.Edge;
import com.tinkerpop.blueprints.Graph;
import com.tinkerpop.blueprints.GraphQuery;
import com.tinkerpop.blueprints.TransactionalGraph;
import com.tinkerpop.blueprints.Vertex;
import org.apache.hadoop.metadata.IReferenceableInstance;
......@@ -36,20 +40,25 @@ import org.apache.hadoop.metadata.types.DataTypes;
import org.apache.hadoop.metadata.types.IDataType;
import org.apache.hadoop.metadata.types.Multiplicity;
import org.apache.hadoop.metadata.types.ObjectGraphWalker;
import org.apache.hadoop.metadata.types.StructType;
import org.apache.hadoop.metadata.types.TraitType;
import org.apache.hadoop.metadata.types.TypeSystem;
import org.apache.hadoop.metadata.util.GraphUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.inject.Inject;
import java.io.IOException;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import javax.inject.Inject;
/**
* An implementation backed by a Graph database provided
......@@ -60,23 +69,24 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
private static final Logger LOG =
LoggerFactory.getLogger(GraphBackedMetadataRepository.class);
private static final String GUID_PROPERTY_KEY = "guid";
private static final String TIMESTAMP_PROPERTY_KEY = "timestamp";
private static final String GUID_PROPERTY_KEY = "GUID";
private static final String ENTITY_TYPE_PROPERTY_KEY = "entityType";
private static final String VERSION_PROPERTY_KEY = "version";
private static final String TRAIT_PROPERTY_SUFFIX = "trait.";
private final AtomicInteger ID_SEQ = new AtomicInteger(0);
// todo: remove this
private final ConcurrentHashMap<String, ITypedReferenceableInstance> instances;
private final TypedInstanceToGraphMapper instanceToGraphMapper
= new TypedInstanceToGraphMapper();
private final GraphToTypedInstanceMapper graphToInstanceMapper
= new GraphToTypedInstanceMapper();
private final GraphService graphService;
private final TypeSystem typeSystem;
@Inject
GraphBackedMetadataRepository(GraphService graphService) throws MetadataException {
this.instances = new ConcurrentHashMap<>();
this.graphService = graphService;
this.typeSystem = TypeSystem.getInstance();
}
......@@ -121,6 +131,111 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
transactionalGraph.rollback();
return instanceToGraphMapper.mapTypedInstanceToGraph(entity, transactionalGraph);
} catch (MetadataException e) {
transactionalGraph.rollback();
throw new RepositoryException(e);
} finally {
transactionalGraph.commit();
}
}
@Override
public ITypedReferenceableInstance getEntityDefinition(String guid) throws RepositoryException {
LOG.info("Retrieving entity with guid={}", guid);
final Graph graph = graphService.getBlueprintsGraph();
try {
GraphQuery query = graph.query().has(GUID_PROPERTY_KEY, guid);
Iterator<Vertex> results = query.vertices().iterator();
// returning one since name/type is unique
Vertex instanceVertex = results.hasNext() ? results.next() : null;
if (instanceVertex == null) {
return null;
}
return graphToInstanceMapper.mapGraphToTypedInstance(guid, instanceVertex);
} catch (Exception e) {
throw new RepositoryException(e);
}
}
@Override
public List<String> getEntityList(String entityType) throws RepositoryException {
LOG.info("Retrieving entity list for type={}", entityType);
return Collections.emptyList();
}
private final class EntityProcessor implements ObjectGraphWalker.NodeProcessor {
public final Map<Id, Id> idToNewIdMap;
public final Map<Id, IReferenceableInstance> idToInstanceMap;
public final Map<Id, Vertex> idToVertexMap;
public EntityProcessor() {
idToNewIdMap = new HashMap<>();
idToInstanceMap = new HashMap<>();
idToVertexMap = new HashMap<>();
}
@Override
public void processNode(ObjectGraphWalker.Node nd) throws MetadataException {
IReferenceableInstance ref = null;
Id id = null;
if (nd.attributeName == null) {
ref = (IReferenceableInstance) nd.instance;
id = ref.getId();
} else if (nd.aInfo.dataType().getTypeCategory() == DataTypes.TypeCategory.CLASS) {
if (nd.value != null && (nd.value instanceof Id)) {
id = (Id) nd.value;
}
}
if (id != null) {
if (id.isUnassigned()) {
if (!idToNewIdMap.containsKey(id)) {
idToNewIdMap.put(id, new Id(ID_SEQ.getAndIncrement(), 0, id.className));
}
if (ref != null && idToInstanceMap.containsKey(id)) { // Oops
throw new RepositoryException(String.format(
"Unexpected internal error: Id %s processed again", id));
}
if (ref != null) {
idToInstanceMap.put(id, ref);
}
}
}
}
public void createVerticesForClassTypes(TransactionalGraph transactionalGraph,
List<ITypedReferenceableInstance> newInstances) {
for (ITypedReferenceableInstance typedInstance : newInstances) {
final Vertex instanceVertex = transactionalGraph.addVertex(null);
instanceVertex.setProperty(ENTITY_TYPE_PROPERTY_KEY, typedInstance.getTypeName());
// entityVertex.setProperty("entityName", instance.getString("name"));
final String guid = UUID.randomUUID().toString();
instanceVertex.setProperty(GUID_PROPERTY_KEY, guid);
final Id typedInstanceId = typedInstance.getId();
instanceVertex.setProperty(VERSION_PROPERTY_KEY, typedInstanceId.version);
idToVertexMap.put(typedInstanceId, instanceVertex);
}
}
}
private final class TypedInstanceToGraphMapper {
private String mapTypedInstanceToGraph(IReferenceableInstance entity,
TransactionalGraph transactionalGraph)
throws MetadataException {
EntityProcessor entityProcessor = new EntityProcessor();
try {
new ObjectGraphWalker(typeSystem, entityProcessor, entity).walk();
......@@ -128,74 +243,102 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
throw new RepositoryException("TypeSystem error when walking the ObjectGraph", me);
}
List<ITypedReferenceableInstance> newInstances = discoverInstances(entityProcessor);
entityProcessor.createVerticesForClasses(transactionalGraph, newInstances);
return addDiscoveredInstances(entity, entityProcessor, newInstances);
List<ITypedReferenceableInstance> newTypedInstances = discoverInstances(entityProcessor);
entityProcessor.createVerticesForClassTypes(transactionalGraph, newTypedInstances);
return addDiscoveredInstances(entity, entityProcessor, newTypedInstances);
}
} catch (MetadataException e) {
transactionalGraph.rollback();
throw new RepositoryException(e);
} finally {
transactionalGraph.commit();
/*
* Step 2: Traverse oldIdToInstance map create newInstances :
* List[ITypedReferenceableInstance]
* - create a ITypedReferenceableInstance.
* replace any old References ( ids or object references) with new Ids.
*/
private List<ITypedReferenceableInstance> discoverInstances(EntityProcessor entityProcessor)
throws RepositoryException {
List<ITypedReferenceableInstance> newTypedInstances = new ArrayList<>();
for (IReferenceableInstance transientInstance : entityProcessor.idToInstanceMap.values()) {
LOG.debug("instance {}", transientInstance);
try {
ClassType cT = typeSystem.getDataType(
ClassType.class, transientInstance.getTypeName());
ITypedReferenceableInstance newInstance = cT.convert(
transientInstance, Multiplicity.REQUIRED);
newTypedInstances.add(newInstance);
/*
* Now replace old references with new Ids
*/
MapIds mapIds = new MapIds(entityProcessor.idToNewIdMap);
new ObjectGraphWalker(typeSystem, mapIds, newTypedInstances).walk();
} catch (MetadataException me) {
throw new RepositoryException(
String.format("Failed to create Instance(id = %s",
transientInstance.getId()), me);
}
}
return newTypedInstances;
}
private String addDiscoveredInstances(IReferenceableInstance entity,
EntityProcessor entityProcessor,
List<ITypedReferenceableInstance> newInstances)
List<ITypedReferenceableInstance> newTypedInstances)
throws MetadataException {
String guid = null;
for (ITypedReferenceableInstance instance : newInstances) { // Traverse over newInstances
for (ITypedReferenceableInstance typedInstance : newTypedInstances) { // Traverse over newInstances
Id id = instance.getId();
Id id = typedInstance.getId();
if (id == null) {
// oops
throw new RepositoryException("id cannot be null");
}
Vertex entityVertex = entityProcessor.idToVertexMap.get(id);
instances.put((String) entityVertex.getProperty(GUID_PROPERTY_KEY), instance);
Vertex instanceVertex = entityProcessor.idToVertexMap.get(id);
// add the attributes for the instance
final Map<String, AttributeInfo> fields = instance.fieldMapping().fields;
final Map<String, AttributeInfo> fields = typedInstance.fieldMapping().fields;
addInstanceToVertex(instance, entityVertex, fields,
addInstanceToVertex(typedInstance, instanceVertex, fields,
entityProcessor.idToVertexMap);
for (String traitName : instance.getTraits()) {
ITypedStruct traitInstance = (ITypedStruct) instance.getTrait(traitName);
for (String traitName : typedInstance.getTraits()) {
((TitanVertex) instanceVertex).addProperty("traits", traitName);
ITypedStruct traitInstance = (ITypedStruct) typedInstance.getTrait(traitName);
// add the attributes for the trait instance
entityVertex.setProperty(TRAIT_PROPERTY_SUFFIX + traitName, traitName);
addInstanceToVertex(traitInstance, entityVertex,
instanceVertex.setProperty(TRAIT_PROPERTY_SUFFIX + traitName, traitName);
addInstanceToVertex(traitInstance, instanceVertex,
traitInstance.fieldMapping().fields,
entityProcessor.idToVertexMap);
}
if (instance.getId() == entity.getId()) {
guid = entityVertex.getProperty(GUID_PROPERTY_KEY);
if (typedInstance.getId() == entity.getId()) {
guid = instanceVertex.getProperty(GUID_PROPERTY_KEY);
}
}
return guid;
}
private void addInstanceToVertex(ITypedInstance instance, Vertex entityVertex,
private void addInstanceToVertex(ITypedInstance typedInstance, Vertex instanceVertex,
Map<String, AttributeInfo> fields,
Map<Id, Vertex> idToVertexMap) throws MetadataException {
for (AttributeInfo attributeInfo : fields.values()) {
System.out.println("*** attributeInfo = " + attributeInfo);
final IDataType dataType = attributeInfo.dataType();
Object attributeValue = instance.get(attributeInfo.name);
Object attributeValue = typedInstance.get(attributeInfo.name);
switch (dataType.getTypeCategory()) {
case PRIMITIVE:
addPrimitiveToVertex(instance, entityVertex, attributeInfo);
addPrimitiveToVertex(typedInstance, instanceVertex, attributeInfo);
break;
case ENUM:
addToVertex(entityVertex, attributeInfo.name,
instance.getInt(attributeInfo.name));
addToVertex(instanceVertex, attributeInfo.name,
typedInstance.getInt(attributeInfo.name));
break;
case ARRAY:
......@@ -208,21 +351,21 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
case STRUCT:
ITypedStruct structInstance = (ITypedStruct) attributeValue;
addInstanceToVertex(structInstance, entityVertex,
addInstanceToVertex(structInstance, instanceVertex,
structInstance.fieldMapping().fields, idToVertexMap);
break;
case TRAIT:
ITypedStruct traitInstance = (ITypedStruct) attributeValue;
addInstanceToVertex(traitInstance, entityVertex,
addInstanceToVertex(traitInstance, instanceVertex,
traitInstance.fieldMapping().fields, idToVertexMap);
break;
case CLASS:
Id id = (Id) instance.get(attributeInfo.name);
Id id = (Id) typedInstance.get(attributeInfo.name);
if (id != null) {
Vertex referenceVertex = idToVertexMap.get(id);
addEdge(entityVertex, referenceVertex, "references");
GraphUtils.addEdge(instanceVertex, referenceVertex, id.id);
}
break;
......@@ -232,173 +375,182 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
}
}
protected Edge addEdge(Vertex fromVertex, Vertex toVertex, String edgeLabel) {
return addEdge(fromVertex, toVertex, edgeLabel, null);
}
protected Edge addEdge(Vertex fromVertex, Vertex toVertex,
String edgeLabel, String timestamp) {
Edge edge = findEdge(fromVertex, toVertex, edgeLabel);
Edge edgeToVertex = edge != null ? edge : fromVertex.addEdge(edgeLabel, toVertex);
if (timestamp != null) {
edgeToVertex.setProperty(TIMESTAMP_PROPERTY_KEY, timestamp);
}
return edgeToVertex;
}
protected Edge findEdge(Vertex fromVertex, Vertex toVertex, String edgeLabel) {
return findEdge(fromVertex, toVertex.getProperty(GUID_PROPERTY_KEY), edgeLabel);
}
protected Edge findEdge(Vertex fromVertex, Object toVertexName, String edgeLabel) {
Edge edgeToFind = null;
for (Edge edge : fromVertex.getEdges(Direction.OUT, edgeLabel)) {
if (edge.getVertex(Direction.IN).getProperty(GUID_PROPERTY_KEY).equals(toVertexName)) {
edgeToFind = edge;
break;
}
}
return edgeToFind;
}
/*
* Step 2: Traverse oldIdToInstance map create newInstances :
* List[ITypedReferenceableInstance]
* - create a ITypedReferenceableInstance.
* replace any old References ( ids or object references) with new Ids.
*/
private List<ITypedReferenceableInstance> discoverInstances(EntityProcessor entityProcessor)
throws RepositoryException {
List<ITypedReferenceableInstance> newInstances = new ArrayList<>();
for (IReferenceableInstance transientInstance : entityProcessor.idToInstanceMap.values()) {
LOG.debug("instance {}", transientInstance);
try {
ClassType cT = typeSystem.getDataType(
ClassType.class, transientInstance.getTypeName());
ITypedReferenceableInstance newInstance = cT.convert(
transientInstance, Multiplicity.REQUIRED);
newInstances.add(newInstance);
/*
* Now replace old references with new Ids
*/
MapIds mapIds = new MapIds(entityProcessor.idToNewIdMap);
new ObjectGraphWalker(typeSystem, mapIds, newInstances).walk();
} catch (MetadataException me) {
throw new RepositoryException(
String.format("Failed to create Instance(id = %s",
transientInstance.getId()), me);
}
}
return newInstances;
}
private void addPrimitiveToVertex(ITypedInstance instance,
Vertex entityVertex,
private void addPrimitiveToVertex(ITypedInstance typedInstance,
Vertex instanceVertex,
AttributeInfo attributeInfo) throws MetadataException {
if (instance.get(attributeInfo.name) == null) { // add only if instance has this attribute
if (typedInstance.get(attributeInfo.name) == null) { // add only if instance has this attribute
return;
}
if (attributeInfo.dataType() == DataTypes.STRING_TYPE) {
entityVertex.setProperty(attributeInfo.name, instance.getString(attributeInfo.name));
instanceVertex.setProperty(attributeInfo.name,
typedInstance.getString(attributeInfo.name));
} else if (attributeInfo.dataType() == DataTypes.SHORT_TYPE) {
entityVertex.setProperty(attributeInfo.name, instance.getShort(attributeInfo.name));
instanceVertex.setProperty(attributeInfo.name,
typedInstance.getShort(attributeInfo.name));
} else if (attributeInfo.dataType() == DataTypes.INT_TYPE) {
entityVertex.setProperty(attributeInfo.name, instance.getInt(attributeInfo.name));
instanceVertex.setProperty(attributeInfo.name,
typedInstance.getInt(attributeInfo.name));
} else if (attributeInfo.dataType() == DataTypes.BIGINTEGER_TYPE) {
entityVertex.setProperty(attributeInfo.name, instance.getBigInt(attributeInfo.name));
instanceVertex.setProperty(attributeInfo.name,
typedInstance.getBigInt(attributeInfo.name));
} else if (attributeInfo.dataType() == DataTypes.BOOLEAN_TYPE) {
entityVertex.setProperty(attributeInfo.name, instance.getBoolean(attributeInfo.name));
instanceVertex.setProperty(attributeInfo.name,
typedInstance.getBoolean(attributeInfo.name));
} else if (attributeInfo.dataType() == DataTypes.BYTE_TYPE) {
entityVertex.setProperty(attributeInfo.name, instance.getByte(attributeInfo.name));
instanceVertex.setProperty(attributeInfo.name,
typedInstance.getByte(attributeInfo.name));
} else if (attributeInfo.dataType() == DataTypes.LONG_TYPE) {
entityVertex.setProperty(attributeInfo.name, instance.getLong(attributeInfo.name));
instanceVertex.setProperty(attributeInfo.name,
typedInstance.getLong(attributeInfo.name));
} else if (attributeInfo.dataType() == DataTypes.FLOAT_TYPE) {
entityVertex.setProperty(attributeInfo.name, instance.getFloat(attributeInfo.name));
instanceVertex.setProperty(attributeInfo.name,
typedInstance.getFloat(attributeInfo.name));
} else if (attributeInfo.dataType() == DataTypes.DOUBLE_TYPE) {
entityVertex.setProperty(attributeInfo.name, instance.getDouble(attributeInfo.name));
instanceVertex.setProperty(attributeInfo.name,
typedInstance.getDouble(attributeInfo.name));
} else if (attributeInfo.dataType() == DataTypes.BIGDECIMAL_TYPE) {
entityVertex.setProperty(attributeInfo.name, instance.getBigDecimal(attributeInfo.name));
instanceVertex.setProperty(attributeInfo.name,
typedInstance.getBigDecimal(attributeInfo.name));
}
}
public static void addToVertex(Vertex entityVertex, String name, int value) {
entityVertex.setProperty(name, value);
public void addToVertex(Vertex instanceVertex, String name, int value) {
instanceVertex.setProperty(name, value);
}
@Override
public ITypedReferenceableInstance getEntityDefinition(String guid) throws RepositoryException {
LOG.info("Retrieving entity with guid={}", guid);
return instances.get(guid);
}
@Override
public List<String> getEntityList(String entityType) throws RepositoryException {
LOG.info("Retrieving entity list for type={}", entityType);
return Collections.emptyList();
private final class GraphToTypedInstanceMapper {
private ITypedReferenceableInstance mapGraphToTypedInstance(String guid,
Vertex instanceVertex)
throws MetadataException {
String typeName = instanceVertex.getProperty(ENTITY_TYPE_PROPERTY_KEY);
List<String> traits = new ArrayList<>();
for (TitanProperty property : ((TitanVertex) instanceVertex).getProperties( "traits")) {
traits.add((String) property.getValue());
}
private final class EntityProcessor implements ObjectGraphWalker.NodeProcessor {
Id id = new Id(guid, instanceVertex.<Integer>getProperty("version"), typeName);
public final Map<Id, Id> idToNewIdMap;
public final Map<Id, IReferenceableInstance> idToInstanceMap;
public final Map<Id, Vertex> idToVertexMap;
ClassType classType = typeSystem.getDataType(ClassType.class, typeName);
ITypedReferenceableInstance typedInstance = classType.createInstance(
id, traits.toArray(new String[traits.size()]));
public EntityProcessor() {
idToNewIdMap = new HashMap<>();
idToInstanceMap = new HashMap<>();
idToVertexMap = new HashMap<>();
graphToInstanceMapper.mapVertexToInstance(
instanceVertex, typedInstance, classType.fieldMapping().fields);
return typedInstance;
}
@Override
public void processNode(ObjectGraphWalker.Node nd) throws MetadataException {
IReferenceableInstance ref = null;
Id id = null;
private void mapVertexToInstance(Vertex instanceVertex, ITypedInstance typedInstance,
Map<String, AttributeInfo> fields) throws MetadataException {
for (AttributeInfo attributeInfo : fields.values()) {
System.out.println("*** attributeInfo = " + attributeInfo);
final IDataType dataType = attributeInfo.dataType();
if (nd.attributeName == null) {
ref = (IReferenceableInstance) nd.instance;
id = ref.getId();
} else if (nd.aInfo.dataType().getTypeCategory() == DataTypes.TypeCategory.CLASS) {
if (nd.value != null && (nd.value instanceof Id)) {
id = (Id) nd.value;
}
}
switch (dataType.getTypeCategory()) {
case PRIMITIVE:
mapVertexToInstance(instanceVertex, typedInstance, attributeInfo);
break; // add only if vertex has this attribute
if (id != null) {
if (id.isUnassigned()) {
if (!idToNewIdMap.containsKey(id)) {
idToNewIdMap.put(id, new Id(ID_SEQ.getAndIncrement(), 0, id.className));
}
case ENUM:
// EnumType enumType = typeSystem.getDataType(
// EnumType.class, attributeInfo.name);
// todo - is this enough
typedInstance.setInt(attributeInfo.name,
instanceVertex.<Integer>getProperty(attributeInfo.name));
break;
if (ref != null && idToInstanceMap.containsKey(id)) { // Oops
throw new RepositoryException(String.format(
"Unexpected internal error: Id %s processed again", id));
case ARRAY:
// todo - Add to/from json for collections
break;
case MAP:
// todo - Add to/from json for collections
break;
case STRUCT:
StructType structType = typeSystem.getDataType(
StructType.class, attributeInfo.name);
ITypedStruct structInstance = structType.createInstance();
typedInstance.set(attributeInfo.name, structInstance);
mapVertexToInstance(instanceVertex, structInstance,
structInstance.fieldMapping().fields);
break;
case TRAIT:
TraitType traitType = typeSystem.getDataType(
TraitType.class, attributeInfo.name);
ITypedStruct traitInstance = (ITypedStruct)
((ITypedReferenceableInstance) typedInstance).getTrait(attributeInfo.name);
typedInstance.set(attributeInfo.name, traitInstance);
mapVertexToInstance(instanceVertex, traitInstance,
traitType.fieldMapping().fields);
break;
case CLASS:
Id referenceId = null;
for (Edge edge : instanceVertex.getEdges(Direction.IN)) {
final Vertex vertex = edge.getVertex(Direction.OUT);
if (vertex.getProperty(ENTITY_TYPE_PROPERTY_KEY).equals(attributeInfo.name)) {
referenceId = new Id(vertex.<String>getProperty(GUID_PROPERTY_KEY),
vertex.<Integer>getProperty("version"),
attributeInfo.name);
break;
}
}
if (ref != null) {
idToInstanceMap.put(id, ref);
if (referenceId != null) {
typedInstance.set(attributeInfo.name, referenceId);
}
break;
default:
break;
}
}
}
public void createVerticesForClasses(TransactionalGraph transactionalGraph,
List<ITypedReferenceableInstance> newInstances) {
for (ITypedReferenceableInstance instance : newInstances) {
final Vertex entityVertex = transactionalGraph.addVertex(null);
entityVertex.setProperty(ENTITY_TYPE_PROPERTY_KEY, instance.getTypeName());
// entityVertex.setProperty("entityName", instance.getString("name"));
final String guid = UUID.randomUUID().toString();
entityVertex.setProperty(GUID_PROPERTY_KEY, guid);
private void mapVertexToInstance(Vertex instanceVertex,
ITypedInstance typedInstance,
AttributeInfo attributeInfo) throws MetadataException {
if (instanceVertex.getProperty(attributeInfo.name) == null) {
return;
}
idToVertexMap.put(instance.getId(), entityVertex);
if (attributeInfo.dataType() == DataTypes.STRING_TYPE) {
typedInstance.setString(attributeInfo.name,
instanceVertex.<String>getProperty(attributeInfo.name));
} else if (attributeInfo.dataType() == DataTypes.SHORT_TYPE) {
typedInstance.setShort(attributeInfo.name,
instanceVertex.<Short>getProperty(attributeInfo.name));
} else if (attributeInfo.dataType() == DataTypes.INT_TYPE) {
typedInstance.setInt(attributeInfo.name,
instanceVertex.<Integer>getProperty(attributeInfo.name));
} else if (attributeInfo.dataType() == DataTypes.BIGINTEGER_TYPE) {
typedInstance.setBigInt(attributeInfo.name,
instanceVertex.<BigInteger>getProperty(attributeInfo.name));
} else if (attributeInfo.dataType() == DataTypes.BOOLEAN_TYPE) {
typedInstance.setBoolean(attributeInfo.name,
instanceVertex.<Boolean>getProperty(attributeInfo.name));
} else if (attributeInfo.dataType() == DataTypes.BYTE_TYPE) {
typedInstance.setByte(attributeInfo.name,
instanceVertex.<Byte>getProperty(attributeInfo.name));
} else if (attributeInfo.dataType() == DataTypes.LONG_TYPE) {
typedInstance.setLong(attributeInfo.name,
instanceVertex.<Long>getProperty(attributeInfo.name));
} else if (attributeInfo.dataType() == DataTypes.FLOAT_TYPE) {
typedInstance.setFloat(attributeInfo.name,
instanceVertex.<Float>getProperty(attributeInfo.name));
} else if (attributeInfo.dataType() == DataTypes.DOUBLE_TYPE) {
typedInstance.setDouble(attributeInfo.name,
instanceVertex.<Double>getProperty(attributeInfo.name));
} else if (attributeInfo.dataType() == DataTypes.BIGDECIMAL_TYPE) {
typedInstance.setBigDecimal(attributeInfo.name,
instanceVertex.<BigDecimal>getProperty(attributeInfo.name));
}
}
}
......
......@@ -73,15 +73,6 @@ public interface MetadataService extends Service {
String getEntityDefinition(String guid) throws MetadataException;
/**
* Return the definition for the given entity name and type.
*
* @param entityName name
* @param entityType type
* @return entity definition as JSON
*/
String getEntityDefinition(String entityName, String entityType) throws MetadataException;
/**
* Return the list of entity names for the given type in the repository.
*
* @param entityType type
......
......@@ -23,9 +23,45 @@ public final class GraphUtils {
private static final Logger LOG = LoggerFactory.getLogger(GraphUtils.class);
private static final String GUID_PROPERTY_KEY = "guid";
private static final String TIMESTAMP_PROPERTY_KEY = "timestamp";
private GraphUtils() {
}
public static Edge addEdge(Vertex fromVertex, Vertex toVertex, String edgeLabel) {
return addEdge(fromVertex, toVertex, edgeLabel, null);
}
public static Edge addEdge(Vertex fromVertex, Vertex toVertex,
String edgeLabel, String timestamp) {
Edge edge = findEdge(fromVertex, toVertex, edgeLabel);
Edge edgeToVertex = edge != null ? edge : fromVertex.addEdge(edgeLabel, toVertex);
if (timestamp != null) {
edgeToVertex.setProperty(TIMESTAMP_PROPERTY_KEY, timestamp);
}
return edgeToVertex;
}
public static Edge findEdge(Vertex fromVertex, Vertex toVertex, String edgeLabel) {
return findEdge(fromVertex, toVertex.getProperty(GUID_PROPERTY_KEY), edgeLabel);
}
public static Edge findEdge(Vertex fromVertex, Object toVertexName, String edgeLabel) {
Edge edgeToFind = null;
for (Edge edge : fromVertex.getEdges(Direction.OUT, edgeLabel)) {
if (edge.getVertex(Direction.IN).getProperty(
GUID_PROPERTY_KEY).equals(toVertexName)) {
edgeToFind = edge;
break;
}
}
return edgeToFind;
}
public static Vertex findVertex(Graph blueprintsGraph,
String guid) {
LOG.debug("Finding vertex for: guid={}", guid);
......
......@@ -67,22 +67,23 @@ public class EntityResource {
}
@POST
@Path("submit/{entityType}")
@Path("submit/{typeName}")
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
public Response submit(@Context HttpServletRequest request,
@PathParam("entityType") final String entityType) {
@PathParam("typeName") final String typeName) {
try {
final String entity = Servlets.getRequestPayload(request);
System.out.println("entity = " + entity);
LOG.debug("submitting entity {} ", entity);
final String guid = metadataService.createEntity(entity, entityType);
final String guid = metadataService.createEntity(typeName, entity);
JSONObject response = new JSONObject();
response.put("GUID", guid);
response.put("requestId", Thread.currentThread().getName());
return Response.ok(response).build();
} catch (Exception e) {
LOG.error("Unable to persist instance for type {}", typeName, e);
throw new WebApplicationException(
Servlets.getErrorResponse(e, Response.Status.BAD_REQUEST));
}
......@@ -109,8 +110,7 @@ public class EntityResource {
return Response.status(status).entity(response).build();
} catch (Exception e) {
LOG.error("Action failed: {}\nError: {}",
Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
LOG.error("Unable to get instance definition for GUID {}", guid, e);
throw new WebApplicationException(e, Response
.status(Response.Status.INTERNAL_SERVER_ERROR)
.entity(e.getMessage())
......
......@@ -10,16 +10,21 @@ import org.apache.hadoop.metadata.types.HierarchicalTypeDefinition;
import org.apache.hadoop.metadata.types.IDataType;
import org.apache.hadoop.metadata.types.Multiplicity;
import org.apache.hadoop.metadata.types.TraitType;
import org.apache.hadoop.metadata.types.TypeSystem;
import org.testng.annotations.BeforeClass;
import javax.ws.rs.core.UriBuilder;
public class BaseResourceIT {
public abstract class BaseResourceIT {
protected TypeSystem typeSystem;
protected WebResource service;
@BeforeClass
public void setUp() throws Exception {
typeSystem = TypeSystem.getInstance();
typeSystem.reset();
String baseUrl = "http://localhost:21000/";
DefaultClientConfig config = new DefaultClientConfig();
......
......@@ -18,54 +18,78 @@
package org.apache.hadoop.metadata.web.resources;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import com.google.common.collect.ImmutableList;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.WebResource;
import org.apache.hadoop.metadata.ITypedReferenceableInstance;
import org.apache.hadoop.metadata.Referenceable;
import org.apache.hadoop.metadata.Struct;
import org.apache.hadoop.metadata.json.Serialization$;
import org.apache.hadoop.metadata.json.TypesSerialization;
import org.apache.hadoop.metadata.types.AttributeDefinition;
import org.apache.hadoop.metadata.types.ClassType;
import org.apache.hadoop.metadata.types.DataTypes;
import org.apache.hadoop.metadata.types.HierarchicalTypeDefinition;
import org.apache.hadoop.metadata.types.Multiplicity;
import org.apache.hadoop.metadata.types.StructTypeDefinition;
import org.apache.hadoop.metadata.types.TraitType;
import org.codehaus.jettison.json.JSONObject;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import javax.ws.rs.HttpMethod;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import org.json.simple.JSONValue;
import org.testng.Assert;
import org.testng.annotations.Test;
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.WebResource;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
/**
* Integration tests for Entity Jersey Resource.
*/
@Test (enabled = false)
public class EntityJerseyResourceIT extends BaseResourceIT {
private static final String ENTITY_NAME = "clicks-table";
private static final String ENTITY_TYPE = "hive-table";
private static final String DATABASE_NAME = "ads";
private static final String TABLE_NAME = "clicks-table";
private static final String DATABASE_TYPE = "hive_database";
private static final String DATABASE_NAME = "foo";
private static final String TABLE_TYPE = "hive_table";
private static final String TABLE_NAME = "bar";
private static final String TRAIT_TYPE = "hive_fetl";
private String guid;
@BeforeClass
public void setUp() throws Exception {
super.setUp();
List<HierarchicalTypeDefinition> typeDefinitions = createHiveTypes();
submitTypes(typeDefinitions);
}
@Test
public void testSubmitEntity() throws Exception {
ITypedReferenceableInstance tableInstance = createHiveTableInstance();
@Test (enabled = false)
public void testSubmitEntity() {
String entityStream = getTestEntityJSON();
JsonParser parser = new JsonParser();
String instanceAsJSON = Serialization$.MODULE$.toJson(tableInstance);
WebResource resource = service
.path("api/metadata/entities/submit")
.path(ENTITY_TYPE);
.path(TABLE_TYPE);
ClientResponse clientResponse = resource
.accept(MediaType.APPLICATION_JSON)
.type(MediaType.APPLICATION_JSON)
.method(HttpMethod.POST, ClientResponse.class, entityStream);
.method(HttpMethod.POST, ClientResponse.class, instanceAsJSON);
Assert.assertEquals(clientResponse.getStatus(), Response.Status.OK.getStatusCode());
String response = clientResponse.getEntity(String.class);
Assert.assertNotNull(response);
String responseAsString = clientResponse.getEntity(String.class);
Assert.assertNotNull(responseAsString);
JSONObject response = new JSONObject(responseAsString);
Assert.assertNotNull(response.get("requestId"));
JsonElement elem = parser.parse(response);
String guid = elem.getAsJsonObject().get("GUID").getAsString();
guid = response.get("GUID").toString();
Assert.assertNotNull(guid);
try {
Assert.assertNotNull(UUID.fromString(guid));
......@@ -74,12 +98,11 @@ public class EntityJerseyResourceIT extends BaseResourceIT {
}
}
@Test (dependsOnMethods = "testSubmitEntity", enabled = false)
@Test (dependsOnMethods = "testSubmitEntity")
public void testGetEntityDefinition() {
WebResource resource = service
.path("api/metadata/entities/definition")
.path(ENTITY_TYPE)
.path(ENTITY_NAME);
.path(guid);
ClientResponse clientResponse = resource
.accept(MediaType.APPLICATION_JSON)
......@@ -90,20 +113,10 @@ public class EntityJerseyResourceIT extends BaseResourceIT {
System.out.println("response = " + response);
}
private static String getTestEntityJSON() {
Map<String, String> props = new HashMap<>();
props.put("entityName", ENTITY_NAME);
props.put("entityType", ENTITY_TYPE);
props.put("database", DATABASE_NAME);
props.put("table", TABLE_NAME);
return JSONValue.toJSONString(props);
}
@Test (enabled = false)
@Test
public void testGetInvalidEntityDefinition() {
WebResource resource = service
.path("api/metadata/entities/definition")
.path(ENTITY_TYPE)
.path("blah");
ClientResponse clientResponse = resource
......@@ -119,7 +132,7 @@ public class EntityJerseyResourceIT extends BaseResourceIT {
public void testGetEntityList() {
ClientResponse clientResponse = service
.path("api/metadata/entities/list/")
.path(ENTITY_TYPE)
.path(TABLE_TYPE)
.accept(MediaType.APPLICATION_JSON)
.type(MediaType.APPLICATION_JSON)
.method(HttpMethod.GET, ClientResponse.class);
......@@ -139,4 +152,81 @@ public class EntityJerseyResourceIT extends BaseResourceIT {
String response = clientResponse.getEntity(String.class);
System.out.println("response = " + response);
}
private List<HierarchicalTypeDefinition> createHiveTypes() throws Exception {
ArrayList<HierarchicalTypeDefinition> typeDefinitions = new ArrayList<>();
HierarchicalTypeDefinition<ClassType> databaseTypeDefinition =
createClassTypeDef(DATABASE_TYPE,
ImmutableList.<String>of(),
createRequiredAttrDef("name", DataTypes.STRING_TYPE),
createRequiredAttrDef("description", DataTypes.STRING_TYPE));
typeDefinitions.add(databaseTypeDefinition);
HierarchicalTypeDefinition<ClassType> tableTypeDefinition =
createClassTypeDef(TABLE_TYPE,
ImmutableList.<String>of(),
createRequiredAttrDef("name", DataTypes.STRING_TYPE),
createRequiredAttrDef("description", DataTypes.STRING_TYPE),
createRequiredAttrDef("type", DataTypes.STRING_TYPE),
new AttributeDefinition(DATABASE_TYPE,
DATABASE_TYPE, Multiplicity.REQUIRED, true, DATABASE_TYPE));
typeDefinitions.add(tableTypeDefinition);
HierarchicalTypeDefinition<TraitType> fetlTypeDefinition =
createTraitTypeDef(TRAIT_TYPE,
ImmutableList.<String>of(),
createRequiredAttrDef("level", DataTypes.INT_TYPE));
typeDefinitions.add(fetlTypeDefinition);
typeSystem.defineTypes(
ImmutableList.<StructTypeDefinition>of(),
ImmutableList.of(fetlTypeDefinition),
ImmutableList.of(databaseTypeDefinition, tableTypeDefinition));
return typeDefinitions;
}
private void submitTypes(List<HierarchicalTypeDefinition> typeDefinitions) throws Exception {
for (HierarchicalTypeDefinition typeDefinition : typeDefinitions) {
String typesAsJSON = TypesSerialization.toJson(
typeSystem, typeDefinition.typeName);
WebResource resource = service
.path("api/metadata/types/submit")
.path(typeDefinition.typeName);
ClientResponse clientResponse = resource
.accept(MediaType.APPLICATION_JSON)
.type(MediaType.APPLICATION_JSON)
.method(HttpMethod.POST, ClientResponse.class, typesAsJSON);
Assert.assertEquals(clientResponse.getStatus(), Response.Status.OK.getStatusCode());
String responseAsString = clientResponse.getEntity(String.class);
Assert.assertNotNull(responseAsString);
JSONObject response = new JSONObject(responseAsString);
Assert.assertEquals(response.get("typeName"), typeDefinition.typeName);
Assert.assertNotNull(response.get("types"));
Assert.assertNotNull(response.get("requestId"));
}
}
protected ITypedReferenceableInstance createHiveTableInstance() throws Exception {
Referenceable databaseInstance = new Referenceable(DATABASE_TYPE);
databaseInstance.set("name", DATABASE_NAME);
databaseInstance.set("description", "foo database");
Referenceable tableInstance = new Referenceable(TABLE_TYPE, TRAIT_TYPE);
tableInstance.set("name", TABLE_NAME);
tableInstance.set("description", "bar table");
tableInstance.set("type", "managed");
tableInstance.set(DATABASE_TYPE, databaseInstance);
Struct traitInstance = (Struct) tableInstance.getTrait(TRAIT_TYPE);
traitInstance.set("level", 1);
ClassType tableType = typeSystem.getDataType(ClassType.class, TABLE_TYPE);
return tableType.convert(tableInstance, Multiplicity.REQUIRED);
}
}
......@@ -21,7 +21,6 @@ package org.apache.hadoop.metadata.web.resources;
import com.google.common.collect.ImmutableList;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.WebResource;
import org.apache.hadoop.metadata.MetadataException;
import org.apache.hadoop.metadata.json.TypesSerialization;
import org.apache.hadoop.metadata.types.AttributeDefinition;
import org.apache.hadoop.metadata.types.ClassType;
......@@ -30,7 +29,6 @@ import org.apache.hadoop.metadata.types.HierarchicalTypeDefinition;
import org.apache.hadoop.metadata.types.Multiplicity;
import org.apache.hadoop.metadata.types.StructTypeDefinition;
import org.apache.hadoop.metadata.types.TraitType;
import org.apache.hadoop.metadata.types.TypeSystem;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONObject;
import org.testng.Assert;
......@@ -49,14 +47,12 @@ import java.util.List;
*/
public class TypesJerseyResourceIT extends BaseResourceIT {
private TypeSystem typeSystem;
private List<HierarchicalTypeDefinition> typeDefinitions;
@BeforeClass
public void setUp() throws Exception {
super.setUp();
typeSystem = TypeSystem.getInstance();
typeDefinitions = createHiveTypes();
}
......@@ -151,11 +147,12 @@ public class TypesJerseyResourceIT extends BaseResourceIT {
Assert.assertNotNull(list);
}
private List<HierarchicalTypeDefinition> createHiveTypes() throws MetadataException {
private List<HierarchicalTypeDefinition> createHiveTypes() throws Exception {
ArrayList<HierarchicalTypeDefinition> typeDefinitions = new ArrayList<>();
HierarchicalTypeDefinition<ClassType> databaseTypeDefinition =
createClassTypeDef("database", ImmutableList.<String>of(),
createClassTypeDef("database",
ImmutableList.<String>of(),
createRequiredAttrDef("name", DataTypes.STRING_TYPE),
createRequiredAttrDef("description", DataTypes.STRING_TYPE));
typeDefinitions.add(databaseTypeDefinition);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment