Commit 1643827e by Dennis Fusaro
parents b9cae771 d1335fc9
@@ -18,9 +18,9 @@
 package org.apache.hadoop.metadata.services;
 
-import com.thinkaurelius.titan.core.PropertyKey;
 import com.thinkaurelius.titan.core.TitanFactory;
 import com.thinkaurelius.titan.core.TitanGraph;
+import com.thinkaurelius.titan.core.schema.TitanGraphIndex;
 import com.thinkaurelius.titan.core.schema.TitanManagement;
 import com.tinkerpop.blueprints.Edge;
 import com.tinkerpop.blueprints.Graph;
@@ -35,7 +35,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Set;
 
 /**
@@ -50,6 +53,8 @@ public class TitanGraphService implements GraphService {
      * Constant for the configuration property that indicates the prefix.
      */
     private static final String METADATA_PREFIX = "metadata.graph.";
+    private static final String INDEXER_PREFIX = "metadata.indexer.vertex.";
+    private static final List<String> acceptedTypes = Arrays.asList("String","Int","Long");
 
     private TitanGraph titanGraph;
     private Set<String> vertexIndexedKeys;
@@ -104,70 +109,88 @@ public class TitanGraphService implements GraphService {
         return graphConfig;
     }
 
+    private static Configuration getConfiguration(String filename, String prefix) throws ConfigurationException {
+        PropertiesConfiguration configProperties = new PropertiesConfiguration(filename);
+
+        Configuration graphConfig = new PropertiesConfiguration();
+
+        final Iterator<String> iterator = configProperties.getKeys();
+        while (iterator.hasNext()) {
+            String key = iterator.next();
+            if (key.startsWith(prefix)) {
+                String value = (String) configProperties.getProperty(key);
+                key = key.substring(prefix.length());
+                graphConfig.setProperty(key, value);
+            }
+        }
+
+        return graphConfig;
+    }
+
     protected TitanGraph initializeGraphDB(Configuration graphConfig) {
         LOG.info("Initializing titanGraph db");
         return TitanFactory.open(graphConfig);
     }
 
-    protected void createIndicesForVertexKeys() {
+    protected void createIndicesForVertexKeys() throws ConfigurationException {
         if (!titanGraph.getIndexedKeys(Vertex.class).isEmpty()) {
             LOG.info("Indexes already exist for titanGraph");
            return;
         }
 
-        LOG.info("Indexes does not exist, Creating indexes for titanGraph");
-        // todo - add index for vertex and edge property keys
-        // Titan index backend does not support Mixed Index - must use a separate backend for that.
-        // Using composite for now. Literal matches only. Using Global Titan index.
-        TitanManagement mgmt = titanGraph.getManagementSystem();
-        // This is the first run try-it-out index setup for property keys.
-        // These were pulled from the Hive bridge. Edge indices to come.
-        PropertyKey desc = mgmt.makePropertyKey("DESC").dataType(String.class).make();
-        mgmt.buildIndex("byDesc",Vertex.class).addKey(desc).buildCompositeIndex();
-        PropertyKey dbLoc = mgmt.makePropertyKey("DB_LOCATION_URI").dataType(String.class).make();
-        mgmt.buildIndex("byDbloc",Vertex.class).addKey(dbLoc).buildCompositeIndex();
-        PropertyKey name = mgmt.makePropertyKey("NAME").dataType(String.class).make();
-        mgmt.buildIndex("byName",Vertex.class).addKey(name).buildCompositeIndex();
-        PropertyKey tableName = mgmt.makePropertyKey("TBL_NAME").dataType(String.class).make();
-        mgmt.buildIndex("byTableName",Vertex.class).addKey(tableName).buildCompositeIndex();
-        PropertyKey tableType = mgmt.makePropertyKey("TBL_TYPE").dataType(String.class).make();
-        mgmt.buildIndex("byTableType",Vertex.class).addKey(tableType).buildCompositeIndex();
-        PropertyKey createTime = mgmt.makePropertyKey("CREATE_TIME").dataType(Long.class).make();
-        mgmt.buildIndex("byCreateTime",Vertex.class).addKey(createTime).buildCompositeIndex();
-        PropertyKey colName = mgmt.makePropertyKey("COLUMN_NAME").dataType(String.class).make();
-        mgmt.buildIndex("byColName",Vertex.class).addKey(colName).buildCompositeIndex();
-        PropertyKey typeName = mgmt.makePropertyKey("TYPE_NAME").dataType(String.class).make();
-        mgmt.buildIndex("byTypeName",Vertex.class).addKey(typeName).buildCompositeIndex();
-
-        /* More attributes from the Hive bridge.
-        PropertyKey ownerName = mgmt.makePropertyKey("OWNER_NAME").dataType(String.class).make();
-        mgmt.buildIndex("byOwnerName",Vertex.class).addKey(ownerName).buildCompositeIndex();
-        PropertyKey lastAccess = mgmt.makePropertyKey("LAST_ACCESS_TIME").dataType(Long.class).make();
-        mgmt.buildIndex("byLastAccess",Vertex.class).addKey(lastAccess).buildCompositeIndex();
-        PropertyKey viewExpandedText = mgmt.makePropertyKey("VIEW_EXPANDED_TEXT").dataType(String.class).make();
-        mgmt.buildIndex("byExpandedText",Vertex.class).addKey(viewExpandedText).buildCompositeIndex();
-        PropertyKey viewOrigText= mgmt.makePropertyKey("VIEW_ORIGINAL_TEXT").dataType(String.class).make();
-        mgmt.buildIndex("byOrigText",Vertex.class).addKey(viewOrigText).buildCompositeIndex();
-        PropertyKey comment = mgmt.makePropertyKey("COMMENT").dataType(Integer.class).make();
-        mgmt.buildIndex("byComment",Vertex.class).addKey(comment).buildCompositeIndex();
-        */
-
-        mgmt.commit();
+        LOG.info("Indexes do not exist, Creating indexes for titanGraph using indexer.properties.");
+
+        Configuration indexConfig = getConfiguration("indexer.properties", INDEXER_PREFIX);
+
+        TitanManagement mgmt = titanGraph.getManagementSystem();
+        mgmt.buildIndex("mainIndex", Vertex.class).buildMixedIndex("search");
+        TitanGraphIndex graphIndex = mgmt.getGraphIndex("mainIndex");
+
+        // Properties are formatted: prop_name:type;prop_name:type
+        // E.g. Name:String;Date:Long
+        if (!indexConfig.isEmpty()) {
+
+            // Get a list of property names to iterate through...
+            List<String> propList = new ArrayList<String>();
+
+            Iterator<String> it = indexConfig.getKeys("property.name");
+
+            while (it.hasNext()) {
+                propList.add(it.next());
+            }
+
+            it = propList.iterator();
+            while (it.hasNext()) {
+
+                // Pull the property name and index, so we can register the name and look up the type.
+                String prop = it.next().toString();
+                String index = prop.substring(prop.lastIndexOf(".") + 1);
+                String type = null;
+                prop = indexConfig.getProperty(prop).toString();
+
+                // Look up the type for the specified property name.
+                if (indexConfig.containsKey("property.type." + index)) {
+                    type = indexConfig.getProperty("property.type." + index).toString();
+                } else {
+                    throw new ConfigurationException("No type specified for property " + index + " in indexer.properties.");
+                }
+
+                // Is the type submitted one of the approved ones?
+                if (!acceptedTypes.contains(type)) {
+                    throw new ConfigurationException("The type provided in indexer.properties for property " + prop
+                            + " is not supported. Supported types are: " + acceptedTypes.toString());
+                }
+
+                // Add the key.
+                LOG.info("Adding property: " + prop + " to index as type: " + type);
+                mgmt.addIndexKey(graphIndex, mgmt.makePropertyKey(prop).dataType(type.getClass()).make());
+            }
+
+            mgmt.commit();
+            LOG.info("Index creation complete.");
+        }
     }
 
 /**
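A caveat on the new index loop: by the time the key is added, type is the java.lang.String read from indexer.properties, so type.getClass() always evaluates to String.class and every key is registered as a String property, regardless of a configured "Int" or "Long". A minimal Scala sketch of the intended lookup, using a hypothetical IndexTypes helper that is not part of this commit:

// Hypothetical helper (not in this commit): map the configured type name to
// the class Titan expects, instead of calling type.getClass(), which is
// always java.lang.String.
object IndexTypes {
  private val supported: Map[String, Class[_]] = Map(
    "String" -> classOf[String],
    "Int"    -> classOf[java.lang.Integer],
    "Long"   -> classOf[java.lang.Long])

  def classFor(typeName: String): Class[_] =
    supported.getOrElse(typeName,
      throw new IllegalArgumentException("Unsupported index type: " + typeName))
}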
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# This is formatted as follows:
# metadata.indexer.vertex.property.name.<index>=<Property Name>
# metadata.indexer.vertex.property.type.<index>=<Data Type>
metadata.indexer.vertex.property.name.0=DESC
metadata.indexer.vertex.property.type.0=String
metadata.indexer.vertex.property.name.1=DB_LOCATION_URI
metadata.indexer.vertex.property.type.1=String
metadata.indexer.vertex.property.name.2=NAME
metadata.indexer.vertex.property.type.2=String
metadata.indexer.vertex.property.name.3=OWNER_NAME
metadata.indexer.vertex.property.type.3=String
metadata.indexer.vertex.property.name.4=TBL_NAME
metadata.indexer.vertex.property.type.4=String
metadata.indexer.vertex.property.name.5=COMMENT
metadata.indexer.vertex.property.type.5=String
metadata.indexer.vertex.property.name.6=COLUMN_NAME
metadata.indexer.vertex.property.type.6=String
metadata.indexer.vertex.property.name.7=TYPE_NAME
metadata.indexer.vertex.property.type.7=String
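For reference, the prefix-stripping that getConfiguration() performs by hand is also what commons-configuration's subset() provides. A minimal Scala sketch, assuming indexer.properties is on the classpath:

import org.apache.commons.configuration.PropertiesConfiguration

// Minimal sketch: subset("metadata.indexer.vertex") yields keys such as
// property.name.0 -> DESC and property.type.0 -> String.
object IndexerConfigDemo extends App {
  val conf = new PropertiesConfiguration("indexer.properties")
  val vertexConf = conf.subset("metadata.indexer.vertex")
  val keys = vertexConf.getKeys
  while (keys.hasNext) {
    val key = keys.next().toString
    println(key + " -> " + vertexConf.getString(key))
  }
}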
@@ -160,7 +160,6 @@
         <groupId>com.google.guava</groupId>
         <artifactId>guava</artifactId>
         <version>${guava.version}</version>
-        <scope>provided</scope>
     </dependency>
     <dependency>
         <groupId>junit</groupId>
@@ -64,13 +64,15 @@ public class DiscoverInstances implements ObjectGraphWalker.NodeProcessor {
             if ( !idToNewIdMap.containsKey(id)) {
                 idToNewIdMap.put(id, repository.newId(id.className));
             }
-            if ( idToInstanceMap.containsKey(ref)) {
+            if ( ref != null && idToInstanceMap.containsKey(ref)) {
                 // Oops
                 throw new RepositoryException(
                         String.format("Unexpected internal error: Id %s processed again", id));
             }
+            if ( ref != null ) {
                 idToInstanceMap.put(id, ref);
             }
         }
     }
+    }
 }
@@ -81,7 +81,7 @@ public class ClassType extends HierarchicalType<ClassType, IReferenceableInstanc
         if ( val != null ) {
             if ( val instanceof ITypedReferenceableInstance) {
                 ITypedReferenceableInstance tr = (ITypedReferenceableInstance) val;
-                if ( tr.getTypeName() != getName() ) {
+                if ( !tr.getTypeName().equals(getName()) ) {
                     /*
                      * If val is a subType instance; invoke convert on it.
                      */
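The ClassType change above replaces Java's != (reference comparison) with equals() (value comparison); interned string literals can hide this bug until a type name arrives from parsing or concatenation. A small Scala sketch of the distinction, where ne is reference inequality (Java's !=) and == delegates to equals():

object EqualityDemo extends App {
  val declared = "Person"
  val parsed = new String("Person") // same value, different object
  println(declared == parsed)       // true: value equality, like equals()
  println(declared ne parsed)       // true: distinct references, like Java's !=
}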
 akka {
-  //loggers = [akka.event.slf4j.Slf4jLogger]
-  loglevel = debug
-  actor {
-    debug {
-      receive = on
-      lifecycle = on
-    }
-  }
-  test {
-    timefactor = 1
-  }
-}
-
-spray {
-  can {
-    server {
-      server-header = "Metadata Service"
+  loglevel = DEBUG
+  stdout-loglevel = DEBUG
+  event-handlers = ["akka.event.Logging$DefaultLogger"]
+  default-dispatcher {
+    fork-join-executor {
+      parallelism-min = 8
     }
   }
 }
 
-app {
-  interface="localhost"
-  port= 8080
+http {
+  host = "0.0.0.0"
+  host = ${?HOST}
+  port = 9140
+  port = ${?PORT}
 }
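In the new configuration, the repeated assignment port = ${?PORT} is the HOCON idiom for an optional override: it only replaces the default when the substitution resolves, for example from a PORT environment variable. A minimal Scala sketch with Typesafe Config:

import com.typesafe.config.ConfigFactory

object ConfigOverrideDemo extends App {
  val conf = ConfigFactory.parseString(
    "http { port = 9140\n port = ${?PORT} }").resolve()
  // Prints 9140 unless a PORT environment variable is set.
  println(conf.getInt("http.port"))
}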
@@ -18,18 +18,15 @@
 package org.apache.hadoop.metadata.json
 
-import org.apache.jute.compiler.JLong
-import org.apache.hadoop.metadata.types.DataTypes.{MapType, TypeCategory, ArrayType}
 import org.apache.hadoop.metadata._
+import org.apache.hadoop.metadata.storage.{Id, ReferenceableInstance}
+import org.apache.hadoop.metadata.types.DataTypes.{ArrayType, MapType, TypeCategory}
 import org.apache.hadoop.metadata.types._
-import org.apache.hadoop.metadata.storage.{ReferenceableInstance, Id}
 import org.json4s.JsonAST.JInt
 import org.json4s._
 import org.json4s.native.Serialization.{write => swrite, _}
-import org.json4s.reflect.{ScalaType, Reflector}
-import java.util.regex.Pattern
-import java.util.Date
-import collection.JavaConversions._
+import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
 
 class BigDecimalSerializer extends CustomSerializer[java.math.BigDecimal](format => ( {
@@ -50,6 +47,15 @@ class IdSerializer extends CustomSerializer[Id](format => ( {
   case JObject(JField("id", JInt(id)) ::
     JField(Serialization.STRUCT_TYPE_FIELD_NAME, JString(className)) ::
     JField("version", JInt(version)) :: Nil) => new Id(id.toLong, version.toInt, className)
+  case JObject(JField(Serialization.STRUCT_TYPE_FIELD_NAME, JString(className)) ::
+    JField("id", JInt(id)) ::
+    JField("version", JInt(version)) :: Nil) => new Id(id.toLong, version.toInt, className)
+  case JObject(JField("id", JString(id)) ::
+    JField(Serialization.STRUCT_TYPE_FIELD_NAME, JString(className)) ::
+    JField("version", JString(version)) :: Nil) => new Id(id.toLong, version.toInt, className)
+  case JObject(JField(Serialization.STRUCT_TYPE_FIELD_NAME, JString(className)) ::
+    JField("id", JString(id)) ::
+    JField("version", JString(version)) :: Nil) => new Id(id.toLong, version.toInt, className)
 }, {
   case id: Id => JObject(JField("id", JInt(id.id)),
     JField(Serialization.STRUCT_TYPE_FIELD_NAME, JString(id.className)),
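The added IdSerializer cases accept both field orders and string-encoded id/version values. Note the patterns still match an exact three-field list, so objects carrying extra fields fall through. A short usage sketch, assuming this lives alongside the serializers above and that Serialization.STRUCT_TYPE_FIELD_NAME is "$typeName$", as in the sample JSON later in this commit:

import org.json4s._
import org.json4s.native.JsonMethods.parse
import org.json4s.native.{Serialization => Json4s}
import org.apache.hadoop.metadata.storage.Id

// Sketch: exercises the new JString case of IdSerializer.
object IdParseDemo extends App {
  implicit val formats: Formats = Json4s.formats(NoTypeHints) + new IdSerializer
  val id = parse("""{"id":"42", "$typeName$":"Person", "version":"0"}""").extract[Id]
  println(id)
}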
@@ -57,17 +63,19 @@ class IdSerializer extends CustomSerializer[Id](format => ( {
   }
 ))
 
-class TypedStructSerializer extends Serializer[ITypedStruct] {
+class TypedStructSerializer(val typSystem : Option[MetadataService] = None) extends Serializer[ITypedStruct] {
+
+  def currentMdSvc = typSystem.getOrElse(MetadataService.getCurrentService())
 
   def deserialize(implicit format: Formats) = {
     case (TypeInfo(clazz, ptype), json) if classOf[ITypedStruct].isAssignableFrom(clazz) => json match {
       case JObject(fs) =>
         val(typ, fields) = fs.partition(f => f._1 == Serialization.STRUCT_TYPE_FIELD_NAME)
         val typName = typ(0)._2.asInstanceOf[JString].s
-        val sT = MetadataService.getCurrentTypeSystem().getDataType(
+        val sT = currentMdSvc.getTypeSystem.getDataType(
           classOf[IConstructableType[IStruct, ITypedStruct]], typName).asInstanceOf[IConstructableType[IStruct, ITypedStruct]]
         val s = sT.createInstance()
-        Serialization.deserializeFields(sT, s, fields)
+        Serialization.deserializeFields(currentMdSvc, sT, s, fields)
         s
       case x => throw new MappingException("Can't convert " + x + " to TypedStruct")
     }
@@ -88,7 +96,10 @@ class TypedStructSerializer extends Serializer[ITypedStruct] {
   }
 }
 
-class TypedReferenceableInstanceSerializer extends Serializer[ITypedReferenceableInstance] {
+class TypedReferenceableInstanceSerializer(val typSystem : Option[MetadataService] = None)
+  extends Serializer[ITypedReferenceableInstance] {
+
+  def currentMdSvc = typSystem.getOrElse(MetadataService.getCurrentService())
 
   def deserialize(implicit format: Formats) = {
     case (TypeInfo(clazz, ptype), json) if classOf[ITypedReferenceableInstance].isAssignableFrom(clazz) => json match {
@@ -120,11 +131,11 @@ class TypedReferenceableInstanceSerializer extends Serializer[ITypedReferenceabl
       }
 
       val typName = typField.get._2.asInstanceOf[JString].s
-      val sT = MetadataService.getCurrentTypeSystem().getDataType(
+      val sT = currentMdSvc.getTypeSystem.getDataType(
         classOf[ClassType], typName).asInstanceOf[ClassType]
       val id = Serialization.deserializeId(idField.get._2)
       val s = sT.createInstance(id, traitNames:_*)
-      Serialization.deserializeFields(sT, s, fields)
+      Serialization.deserializeFields(currentMdSvc, sT, s, fields)
 
       traitsField.map { t =>
         val tObj :JObject = t._2.asInstanceOf[JObject]
@@ -132,10 +143,10 @@ class TypedReferenceableInstanceSerializer extends Serializer[ITypedReferenceabl
           val tName : String = oTrait._1
           val traitJObj : JObject = oTrait._2.asInstanceOf[JObject]
           val traitObj = s.getTrait(tName).asInstanceOf[ITypedStruct]
-          val tT = MetadataService.getCurrentTypeSystem().getDataType(
+          val tT = currentMdSvc.getTypeSystem.getDataType(
            classOf[TraitType], traitObj.getTypeName).asInstanceOf[TraitType]
           val(tTyp, tFields) = traitJObj.obj.partition(f => f._1 == Serialization.STRUCT_TYPE_FIELD_NAME)
-          Serialization.deserializeFields(tT, traitObj, tFields)
+          Serialization.deserializeFields(currentMdSvc, tT, traitObj, tFields)
         }
       }
@@ -217,9 +228,12 @@ object Serialization {
     }
   }.toList.map(_.asInstanceOf[JField])
 
-  def deserializeFields[T <: ITypedInstance](sT : IConstructableType[_, T],
+  def deserializeFields[T <: ITypedInstance](currentMdSvc: MetadataService,
+                                             sT : IConstructableType[_, T],
                                              s : T, fields : List[JField] )(implicit format: Formats)
-  = fields.foreach { f =>
+  = {
+    MetadataService.setCurrentService(currentMdSvc)
+    fields.foreach { f =>
     val fName = f._1
     val fInfo = sT.fieldMapping.fields(fName)
     if ( fInfo != null ) {
@@ -236,6 +250,7 @@ object Serialization {
         s.set(fName, Serialization.extract(fInfo.dataType(), v))
       }
     }
+  }
 
   def deserializeId(value : JValue)(implicit format: Formats) = value match {
     case JObject(JField("id", JInt(id)) ::
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.metadata.tools.simpleserver
import akka.actor.{Props, ActorSystem}
import akka.io.IO
import com.typesafe.config.ConfigFactory
import org.apache.hadoop.metadata.storage.memory.MemRepository
import org.apache.hadoop.metadata.types.TypeSystem
import spray.can.Http
/**
* A Simple Spray based server to test the TypeSystem and MemRepository.
*
* @example {{{
* -- Using the [[ https://github.com/jakubroztocil/httpie Httpie tool]]
*
* http GET localhost:9140/listTypeNames
* pbpaste | http PUT localhost:9140/defineTypes
* http GET localhost:9140/typeDetails typeNames:='["Department", "Person", "Manager"]'
*
* pbpaste | http PUT localhost:9140/createInstance
* pbpaste | http GET localhost:9140/getInstance
* }}}
*
* - On the Mac, pbpaste emits whatever is currently on the clipboard. Copy the contents of resources/sampleTypes.json.
* - For createInstance, resources/sampleInstance.json is an example.
* - For getInstance, send an Id back; you can copy the output from createInstance.
*
*/
object Main extends App {
val config = ConfigFactory.load()
val host = config.getString("http.host")
val port = config.getInt("http.port")
implicit val system = ActorSystem("metadataservice")
val typSys = new TypeSystem
val memRepo = new MemRepository(typSys)
val api = system.actorOf(Props(new RestInterface(typSys, memRepo)), "httpInterface")
IO(Http) ! Http.Bind(listener = api, interface = host, port = port)
}
\ No newline at end of file
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.metadata.tools.simpleserver
import akka.actor._
import akka.util.Timeout
import com.google.common.collect.ImmutableList
import org.apache.hadoop.metadata.{MetadataService, ITypedReferenceableInstance}
import org.apache.hadoop.metadata.json._
import org.apache.hadoop.metadata.storage.memory.MemRepository
import org.apache.hadoop.metadata.types._
import org.json4s.{Formats, NoTypeHints}
import spray.httpx.Json4sSupport
import org.apache.hadoop.metadata.storage.Id
import scala.concurrent.duration._
class MetadataActor(val typeSystem: TypeSystem, val memRepository : MemRepository) extends Actor with ActorLogging {
import org.apache.hadoop.metadata.tools.simpleserver.MetadataProtocol._
import scala.collection.JavaConversions._
import scala.language.postfixOps
implicit val timeout = Timeout(5 seconds)
def receive = {
case ListTypeNames() =>
sender ! TypeNames(typeSystem.getTypeNames.toList)
case GetTypeDetails(typeNames) =>
val typesDef = TypesSerialization.convertToTypesDef(typeSystem, (d : IDataType[_]) => typeNames.contains(d.getName))
sender ! TypeDetails(typesDef)
case DefineTypes(typesDef : TypesDef) =>
typesDef.enumTypes.foreach(typeSystem.defineEnumType(_))
typeSystem.defineTypes(ImmutableList.copyOf(typesDef.structTypes.toArray),
ImmutableList.copyOf(typesDef.traitTypes.toArray),
ImmutableList.copyOf(typesDef.classTypes.toArray))
var newTypes : List[HierarchicalType[_ <: HierarchicalType[_ <: AnyRef, _], _]] = Nil
typesDef.traitTypes.foreach { tDef =>
val nm = tDef.typeName
newTypes = newTypes :+
typeSystem.getDataType(classOf[HierarchicalType[_ <: HierarchicalType[_ <: AnyRef, _], _]], nm)
}
typesDef.classTypes.foreach { tDef =>
val nm = tDef.typeName
newTypes = newTypes :+
typeSystem.getDataType(classOf[HierarchicalType[_ <: HierarchicalType[_ <: AnyRef, _], _]], nm)
}
memRepository.defineTypes(newTypes)
sender ! TypesCreated
case CreateInstance(i) =>
MetadataService.setCurrentService(new MetadataService(memRepository, typeSystem))
val r = memRepository.create(i)
sender ! InstanceCreated(r.getId)
case GetInstance(id) =>
MetadataService.setCurrentService(new MetadataService(memRepository, typeSystem))
val r = memRepository.get(id)
sender ! InstanceDetails(r)
}
}
object MetadataProtocol {
case class ListTypeNames()
case class TypeNames(typeNames : List[String])
case class GetTypeDetails(typeNames : List[String])
case class TypeDetails(types : TypesDef)
case class DefineTypes(types : TypesDef)
case class TypesCreated()
case class CreateInstance(i: ITypedReferenceableInstance)
case class InstanceCreated(id : Id)
case class GetInstance(id : Id)
case class InstanceDetails(i: ITypedReferenceableInstance)
}
trait Json4sProtocol extends Json4sSupport {
val typeSystem : TypeSystem
val memRepository : MemRepository
implicit def json4sFormats: Formats =
org.json4s.native.Serialization.formats(NoTypeHints) + new MultiplicitySerializer +
new TypedStructSerializer(Some(new MetadataService(memRepository, typeSystem))) +
new TypedReferenceableInstanceSerializer(Some(new MetadataService(memRepository, typeSystem))) +
new BigDecimalSerializer + new BigIntegerSerializer + new IdSerializer
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.metadata.tools.simpleserver
import akka.actor._
import akka.util.Timeout
import org.apache.hadoop.metadata.{MetadataService, ITypedReferenceableInstance}
import org.apache.hadoop.metadata.json.TypesDef
import org.apache.hadoop.metadata.storage.Id
import org.apache.hadoop.metadata.storage.memory.MemRepository
import org.apache.hadoop.metadata.types.TypeSystem
import spray.http.StatusCodes
import spray.routing._
import scala.concurrent.duration._
class Responder(val typeSystem: TypeSystem, val memRepository : MemRepository,
requestContext:RequestContext, mdSvc:ActorRef) extends Actor with Json4sProtocol with ActorLogging {
import org.apache.hadoop.metadata.tools.simpleserver.MetadataProtocol._
def receive = {
case typNames:TypeNames =>
requestContext.complete(StatusCodes.OK, typNames)
self ! PoisonPill
case tD:TypeDetails =>
requestContext.complete(StatusCodes.OK, tD)
self ! PoisonPill
case TypesCreated =>
requestContext.complete(StatusCodes.OK)
self ! PoisonPill
case InstanceCreated(id) =>
requestContext.complete(StatusCodes.OK, id)
case InstanceDetails(i) =>
requestContext.complete(StatusCodes.OK, i)
}
}
class RestInterface(val typeSystem: TypeSystem, val memRepository : MemRepository) extends HttpServiceActor
with RestApi {
def receive = runRoute(routes)
}
trait RestApi extends HttpService with Json4sProtocol with ActorLogging { actor: Actor =>
import MetadataProtocol._
import scala.language.postfixOps
import scala.concurrent.ExecutionContext.Implicits.global
val typeSystem : TypeSystem
val memRepository : MemRepository
implicit val timeout = Timeout(10 seconds)
import akka.pattern.{ask, pipe}
val mdSvc = context.actorOf(Props(new MetadataActor(typeSystem, memRepository)))
def routes: Route =
path("listTypeNames") {
get { requestContext =>
val responder: ActorRef = createResponder(requestContext)
mdSvc.ask(ListTypeNames()).pipeTo(responder)
}
} ~
path("typeDetails") {
get {
entity(as[GetTypeDetails]) { typeDetails => requestContext =>
val responder = createResponder(requestContext)
mdSvc.ask(typeDetails).pipeTo(responder)
}
}
} ~
path("defineTypes") {
put {
entity(as[TypesDef]) { typesDef => requestContext =>
val responder = createResponder(requestContext)
mdSvc.ask(DefineTypes(typesDef)).pipeTo(responder)
}
}
} ~
path("createInstance") {
put {
entity(as[ITypedReferenceableInstance]) { i => requestContext =>
val responder = createResponder(requestContext)
mdSvc.ask(CreateInstance(i)).pipeTo(responder)
}
}
} ~
path("getInstance") {
get {
entity(as[Id]) { id => requestContext =>
val responder = createResponder(requestContext)
mdSvc.ask(GetInstance(id)).pipeTo(responder)
}
}
}
def createResponder(requestContext:RequestContext) = {
context.actorOf(Props(new Responder(typeSystem, memRepository, requestContext, mdSvc)))
}
}
\ No newline at end of file
{
"$typeName$":"Department",
"$id$":{
"id":-1420494283853484000,
"$typeName$":"Department",
"version":0
},
"employees":[{
"$typeName$":"Person",
"$id$":{
"id":-1420494283853508000,
"$typeName$":"Person",
"version":0
},
"manager":{
"id":-1420494283853511000,
"$typeName$":"Manager",
"version":0
},
"department":{
"id":-1420494283853484000,
"$typeName$":"Department",
"version":0
},
"name":"John"
},{
"$typeName$":"Manager",
"$id$":{
"id":-1420494283853511000,
"$typeName$":"Manager",
"version":0
},
"manager":null,
"subordinates":[{
"$typeName$":"Person",
"$id$":{
"id":-1420494283853508000,
"$typeName$":"Person",
"version":0
},
"manager":{
"id":-1420494283853511000,
"$typeName$":"Manager",
"version":0
},
"department":{
"id":-1420494283853484000,
"$typeName$":"Department",
"version":0
},
"name":"John"
}],
"department":{
"id":-1420494283853484000,
"$typeName$":"Department",
"version":0
},
"name":"Jane",
"$traits$":{
"SecurityClearance":{
"$typeName$":"SecurityClearance",
"level":1
}
}
}],
"name":"hr"
}
\ No newline at end of file
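A caution on the id values above: magnitudes around 1.4e18 fit a 64-bit long but exceed the 2^53 range where IEEE doubles are exact, so a double-based JSON parser would silently round them (the trailing zeros in these sample ids suggest they may already have passed through one). json4s parses integers as BigInt-backed JInt, which round-trips them safely; a quick Scala check:

import org.json4s._
import org.json4s.native.JsonMethods.parse

object BigIdDemo extends App {
  val JInt(id) = parse("""{"id": -1420494283853484000}""") \ "id"
  println(id.isValidLong) // true: safe to feed into Id(id.toLong, ...)
}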
{
"enumTypes":[
{
"name":"HiveObjectType",
"enumValues":[
{
"value":"GLOBAL",
"ordinal":1
},
{
"value":"DATABASE",
"ordinal":2
},
{
"value":"TABLE",
"ordinal":3
},
{
"value":"PARTITION",
"ordinal":4
},
{
"value":"COLUMN",
"ordinal":5
}
]
},
{
"name":"LockLevel",
"enumValues":[
{
"value":"DB",
"ordinal":1
},
{
"value":"TABLE",
"ordinal":2
},
{
"value":"PARTITION",
"ordinal":3
}
]
},
{
"name":"TxnState",
"enumValues":[
{
"value":"COMMITTED",
"ordinal":1
},
{
"value":"ABORTED",
"ordinal":2
},
{
"value":"OPEN",
"ordinal":3
}
]
},
{
"name":"PrincipalType",
"enumValues":[
{
"value":"USER",
"ordinal":1
},
{
"value":"ROLE",
"ordinal":2
},
{
"value":"GROUP",
"ordinal":3
}
]
}
],
"structTypes":[
{
"typeName":"t2",
"attributeDefinitions":[
{
"name":"a",
"dataTypeName":"int",
"multiplicity":"required",
"isComposite":false,
"reverseAttributeName":null
},
{
"name":"s",
"dataTypeName":"t2",
"multiplicity":"optional",
"isComposite":false,
"reverseAttributeName":null
}
]
},
{
"typeName":"t1",
"attributeDefinitions":[
{
"name":"a",
"dataTypeName":"int",
"multiplicity":"required",
"isComposite":false,
"reverseAttributeName":null
},
{
"name":"b",
"dataTypeName":"boolean",
"multiplicity":"optional",
"isComposite":false,
"reverseAttributeName":null
},
{
"name":"c",
"dataTypeName":"byte",
"multiplicity":"optional",
"isComposite":false,
"reverseAttributeName":null
},
{
"name":"d",
"dataTypeName":"short",
"multiplicity":"optional",
"isComposite":false,
"reverseAttributeName":null
},
{
"name":"e",
"dataTypeName":"int",
"multiplicity":"optional",
"isComposite":false,
"reverseAttributeName":null
},
{
"name":"f",
"dataTypeName":"int",
"multiplicity":"optional",
"isComposite":false,
"reverseAttributeName":null
},
{
"name":"g",
"dataTypeName":"long",
"multiplicity":"optional",
"isComposite":false,
"reverseAttributeName":null
},
{
"name":"h",
"dataTypeName":"float",
"multiplicity":"optional",
"isComposite":false,
"reverseAttributeName":null
},
{
"name":"i",
"dataTypeName":"double",
"multiplicity":"optional",
"isComposite":false,
"reverseAttributeName":null
},
{
"name":"j",
"dataTypeName":"biginteger",
"multiplicity":"optional",
"isComposite":false,
"reverseAttributeName":null
},
{
"name":"k",
"dataTypeName":"bigdecimal",
"multiplicity":"optional",
"isComposite":false,
"reverseAttributeName":null
},
{
"name":"l",
"dataTypeName":"date",
"multiplicity":"optional",
"isComposite":false,
"reverseAttributeName":null
},
{
"name":"m",
"dataTypeName":"array<int>",
"multiplicity":"optional",
"isComposite":false,
"reverseAttributeName":null
},
{
"name":"n",
"dataTypeName":"array<bigdecimal>",
"multiplicity":"optional",
"isComposite":false,
"reverseAttributeName":null
},
{
"name":"o",
"dataTypeName":"map<string,double>",
"multiplicity":"optional",
"isComposite":false,
"reverseAttributeName":null
}
]
},
{
"typeName":"ts1",
"attributeDefinitions":[
{
"name":"a",
"dataTypeName":"int",
"multiplicity":"required",
"isComposite":false,
"reverseAttributeName":null
},
{
"name":"b",
"dataTypeName":"boolean",
"multiplicity":"optional",
"isComposite":false,
"reverseAttributeName":null
},
{
"name":"c",
"dataTypeName":"byte",
"multiplicity":"optional",
"isComposite":false,
"reverseAttributeName":null
},
{
"name":"d",
"dataTypeName":"short",
"multiplicity":"optional",
"isComposite":false,
"reverseAttributeName":null
},
{
"name":"e",
"dataTypeName":"int",
"multiplicity":"optional",
"isComposite":false,
"reverseAttributeName":null
},
{
"name":"f",
"dataTypeName":"int",
"multiplicity":"optional",
"isComposite":false,
"reverseAttributeName":null
},
{
"name":"g",
"dataTypeName":"long",
"multiplicity":"optional",
"isComposite":false,
"reverseAttributeName":null
},
{
"name":"h",
"dataTypeName":"float",
"multiplicity":"optional",
"isComposite":false,
"reverseAttributeName":null
},
{
"name":"i",
"dataTypeName":"double",
"multiplicity":"optional",
"isComposite":false,
"reverseAttributeName":null
},
{
"name":"j",
"dataTypeName":"biginteger",
"multiplicity":"optional",
"isComposite":false,
"reverseAttributeName":null
},
{
"name":"k",
"dataTypeName":"bigdecimal",
"multiplicity":"optional",
"isComposite":false,
"reverseAttributeName":null
},
{
"name":"l",
"dataTypeName":"date",
"multiplicity":"optional",
"isComposite":false,
"reverseAttributeName":null
},
{
"name":"m",
"dataTypeName":"array<int>",
"multiplicity":"optional",
"isComposite":false,
"reverseAttributeName":null
},
{
"name":"n",
"dataTypeName":"array<bigdecimal>",
"multiplicity":"optional",
"isComposite":false,
"reverseAttributeName":null
},
{
"name":"o",
"dataTypeName":"map<string,double>",
"multiplicity":"optional",
"isComposite":false,
"reverseAttributeName":null
}
]
}
],
"traitTypes":[
{
"superTypes":[
"B",
"C"
],
"hierarchicalMetaTypeName":"org.apache.hadoop.metadata.types.TraitType",
"typeName":"D",
"attributeDefinitions":[
{
"name":"d",
"dataTypeName":"short",
"multiplicity":"optional",
"isComposite":false,
"reverseAttributeName":null
}
]
},
{
"superTypes":[
],
"hierarchicalMetaTypeName":"org.apache.hadoop.metadata.types.TraitType",
"typeName":"A",
"attributeDefinitions":[
{
"name":"a",
"dataTypeName":"int",
"multiplicity":"required",
"isComposite":false,
"reverseAttributeName":null
},
{
"name":"b",
"dataTypeName":"boolean",
"multiplicity":"optional",
"isComposite":false,
"reverseAttributeName":null
},
{
"name":"c",
"dataTypeName":"byte",
"multiplicity":"optional",
"isComposite":false,
"reverseAttributeName":null
},
{
"name":"d",
"dataTypeName":"short",
"multiplicity":"optional",
"isComposite":false,
"reverseAttributeName":null
}
]
},
{
"superTypes":[
"A"
],
"hierarchicalMetaTypeName":"org.apache.hadoop.metadata.types.TraitType",
"typeName":"B",
"attributeDefinitions":[
{
"name":"b",
"dataTypeName":"boolean",
"multiplicity":"optional",
"isComposite":false,
"reverseAttributeName":null
}
]
},
{
"superTypes":[
"A"
],
"hierarchicalMetaTypeName":"org.apache.hadoop.metadata.types.TraitType",
"typeName":"C",
"attributeDefinitions":[
{
"name":"c",
"dataTypeName":"byte",
"multiplicity":"optional",
"isComposite":false,
"reverseAttributeName":null
}
]
},
{
"superTypes":[
],
"hierarchicalMetaTypeName":"org.apache.hadoop.metadata.types.TraitType",
"typeName":"SecurityClearance",
"attributeDefinitions":[
{
"name":"level",
"dataTypeName":"int",
"multiplicity":"required",
"isComposite":false,
"reverseAttributeName":null
}
]
}
],
"classTypes":[
{
"superTypes":[
"Person"
],
"hierarchicalMetaTypeName":"org.apache.hadoop.metadata.types.ClassType",
"typeName":"Manager",
"attributeDefinitions":[
{
"name":"subordinates",
"dataTypeName":"array<Person>",
"multiplicity":"collection",
"isComposite":false,
"reverseAttributeName":"manager"
}
]
},
{
"superTypes":[
],
"hierarchicalMetaTypeName":"org.apache.hadoop.metadata.types.ClassType",
"typeName":"Department",
"attributeDefinitions":[
{
"name":"name",
"dataTypeName":"string",
"multiplicity":"required",
"isComposite":false,
"reverseAttributeName":null
},
{
"name":"employees",
"dataTypeName":"array<Person>",
"multiplicity":"collection",
"isComposite":true,
"reverseAttributeName":"department"
}
]
},
{
"superTypes":[
],
"hierarchicalMetaTypeName":"org.apache.hadoop.metadata.types.ClassType",
"typeName":"t4",
"attributeDefinitions":[
{
"name":"a",
"dataTypeName":"int",
"multiplicity":"required",
"isComposite":false,
"reverseAttributeName":null
},
{
"name":"b",
"dataTypeName":"boolean",
"multiplicity":"optional",
"isComposite":false,
"reverseAttributeName":null
},
{
"name":"c",
"dataTypeName":"byte",
"multiplicity":"optional",
"isComposite":false,
"reverseAttributeName":null
},
{
"name":"d",
"dataTypeName":"short",
"multiplicity":"optional",
"isComposite":false,
"reverseAttributeName":null
},
{
"name":"enum1",
"dataTypeName":"HiveObjectType",
"multiplicity":"optional",
"isComposite":false,
"reverseAttributeName":null
},
{
"name":"e",
"dataTypeName":"int",
"multiplicity":"optional",
"isComposite":false,
"reverseAttributeName":null
},
{
"name":"f",
"dataTypeName":"int",
"multiplicity":"optional",
"isComposite":false,
"reverseAttributeName":null
},
{
"name":"g",
"dataTypeName":"long",
"multiplicity":"optional",
"isComposite":false,
"reverseAttributeName":null
},
{
"name":"enum2",
"dataTypeName":"PrincipalType",
"multiplicity":"optional",
"isComposite":false,
"reverseAttributeName":null
},
{
"name":"h",
"dataTypeName":"float",
"multiplicity":"optional",
"isComposite":false,
"reverseAttributeName":null
},
{
"name":"i",
"dataTypeName":"double",
"multiplicity":"optional",
"isComposite":false,
"reverseAttributeName":null
},
{
"name":"j",
"dataTypeName":"biginteger",
"multiplicity":"optional",
"isComposite":false,
"reverseAttributeName":null
},
{
"name":"k",
"dataTypeName":"bigdecimal",
"multiplicity":"optional",
"isComposite":false,
"reverseAttributeName":null
},
{
"name":"enum3",
"dataTypeName":"TxnState",
"multiplicity":"optional",
"isComposite":false,
"reverseAttributeName":null
},
{
"name":"l",
"dataTypeName":"date",
"multiplicity":"optional",
"isComposite":false,
"reverseAttributeName":null
},
{
"name":"m",
"dataTypeName":"array<int>",
"multiplicity":"optional",
"isComposite":false,
"reverseAttributeName":null
},
{
"name":"n",
"dataTypeName":"array<bigdecimal>",
"multiplicity":"optional",
"isComposite":false,
"reverseAttributeName":null
},
{
"name":"o",
"dataTypeName":"map<string,double>",
"multiplicity":"optional",
"isComposite":false,
"reverseAttributeName":null
},
{
"name":"enum4",
"dataTypeName":"LockLevel",
"multiplicity":"optional",
"isComposite":false,
"reverseAttributeName":null
}
]
},
{
"superTypes":[
],
"hierarchicalMetaTypeName":"org.apache.hadoop.metadata.types.ClassType",
"typeName":"Person",
"attributeDefinitions":[
{
"name":"name",
"dataTypeName":"string",
"multiplicity":"required",
"isComposite":false,
"reverseAttributeName":null
},
{
"name":"department",
"dataTypeName":"Department",
"multiplicity":"required",
"isComposite":false,
"reverseAttributeName":"employees"
},
{
"name":"manager",
"dataTypeName":"Manager",
"multiplicity":"optional",
"isComposite":false,
"reverseAttributeName":"subordinates"
}
]
}
]
}
@@ -146,45 +146,3 @@ class TypesSerializationTest extends BaseTest with TypeHelpers {
     Assert.assertEquals(typesDef1, typesDef2)
   }
 }
-
-trait TypeHelpers {
-  def requiredAttr(name: String, dataType: IDataType[_]) =
-    new AttributeDefinition(name, dataType.getName, Multiplicity.REQUIRED, false, null)
-
-  def optionalAttr(name: String, dataTypeName: String) =
-    new AttributeDefinition(name, dataTypeName, Multiplicity.OPTIONAL, false, null)
-
-  def optionalAttr(name: String, dataType: IDataType[_]) =
-    new AttributeDefinition(name, dataType.getName, Multiplicity.OPTIONAL, false, null)
-
-  def structDef(name : String, attrs : AttributeDefinition*) = {
-    new StructTypeDefinition(name, attrs.toArray)
-  }
-
-  def defineTraits(ts: TypeSystem, tDefs: HierarchicalTypeDefinition[TraitType]*) = {
-    ts.defineTraitTypes(tDefs:_*)
-  }
-
-  def createTraitTypeDef(name: String, superTypes: Seq[String], attrDefs: AttributeDefinition*):
-  HierarchicalTypeDefinition[TraitType] = {
-    val sts = ImmutableList.copyOf(superTypes.toArray)
-    return new HierarchicalTypeDefinition[TraitType](classOf[TraitType], name,
-      sts, attrDefs.toArray)
-  }
-
-  def createClassTypeDef(name: String, superTypes: Seq[String], attrDefs: AttributeDefinition*):
-  HierarchicalTypeDefinition[ClassType] = {
-    val sts = ImmutableList.copyOf(superTypes.toArray)
-    return new HierarchicalTypeDefinition[ClassType](classOf[ClassType], name,
-      sts, attrDefs.toArray)
-  }
-
-  @throws(classOf[MetadataException])
-  def defineClassType(ts : TypeSystem, classDef: HierarchicalTypeDefinition[ClassType]): ClassType = {
-    ts.defineTypes(ImmutableList.of[StructTypeDefinition],
-      ImmutableList.of[HierarchicalTypeDefinition[TraitType]],
-      ImmutableList.of[HierarchicalTypeDefinition[ClassType]](classDef))
-    return ts.getDataType(classOf[ClassType], classDef.typeName)
-  }
-}