Commit f02580f0 by Harish Butani

plumbing for Gremlin translation and evaluation; handle className query

parent 25ff452b
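The change wires three pieces together: GremlinTranslator turns an Expression tree into a GremlinQuery string, GremlinEvaluator runs that string against a TitanGraph through the gremlin-groovy script engine, and QueryProcessor.evaluate chains validation, translation and evaluation. A minimal sketch of the intended flow, assuming a populated TitanGraph named `graph` is already at hand (the GremlinTest below does exactly this against the sample graph built by QueryTestsUtils):

import org.apache.hadoop.metadata.query.Expressions._

// Evaluate a className query end to end: for _class("DB") the translator emits
// g.V.has("typeName","DB").toList(), which the evaluator runs with "g" bound to the graph.
val result = QueryProcessor.evaluate(_class("DB"), graph)
println(result)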
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.metadata.query
import javax.script.{Bindings, ScriptEngine, ScriptEngineManager}
import com.thinkaurelius.titan.core.TitanGraph
class GremlinEvaluator(qry : GremlinQuery, g: TitanGraph) {

  val manager: ScriptEngineManager = new ScriptEngineManager
  val engine: ScriptEngine = manager.getEngineByName("gremlin-groovy")
  val bindings: Bindings = engine.createBindings

  bindings.put("g", g)

  def evaluate() : AnyRef = engine.eval(qry.queryStr, bindings)
}
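For illustration, the evaluator can also be driven with a hand-written query string; a minimal sketch, assuming a TitanGraph instance `graph` whose vertices carry the "typeName" property used by the default persistence strategy:

// Hypothetical usage (not part of this commit): run a raw Gremlin string.
// GremlinEvaluator binds the graph under the name "g", so the query starts from g.V.
val qry = GremlinQuery("""g.V.has("typeName","DB").toList()""")
val rows = new GremlinEvaluator(qry, graph).evaluate()
println(rows)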
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.metadata.query
import Expressions._
import org.apache.hadoop.metadata.types.{AttributeInfo, IDataType}
case class GremlinQuery(queryStr : String) {
}

trait GraphPersistenceStrategies {
  /**
   * Name of the attribute used to store the typeName in a vertex.
   */
  def typeAttributeName : String

  /**
   * Given a dataType and a reference attribute, how the edge is labeled.
   */
  def edgeLabel(iDataType: IDataType[_], aInfo : AttributeInfo) : String
}

object GraphPersistenceStrategy1 extends GraphPersistenceStrategies {
  val typeAttributeName = "typeName"

  def edgeLabel(dataType: IDataType[_], aInfo : AttributeInfo) = s"${dataType}.${aInfo.name}"
}
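// Illustration (not part of this commit): for a hypothetical Table class type with a
// reference attribute "db", GraphPersistenceStrategy1.typeAttributeName is "typeName"
// and edgeLabel(tableType, dbAttrInfo) interpolates the data type and attribute name,
// yielding something like "Table.db" (assuming the IDataType's string form is its type name).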
class GremlinTranslator(expr : Expression,
                        gPersistenceBehavior : GraphPersistenceStrategies = GraphPersistenceStrategy1) {

  val wrapAndRule : PartialFunction[Expression, Expression] = {
    case f : FilterExpression if !f.condExpr.isInstanceOf[LogicalExpression] =>
      FilterExpression(f.child, new LogicalExpression("and", List(f.condExpr)))
  }

  val validateComparisonForm : PartialFunction[Expression, Unit] = {
    case c@ComparisonExpression(_, left, right) =>
      if (!left.isInstanceOf[FieldExpression]) {
        throw new ExpressionException(c, s"Gremlin translation not supported: lhs of comparison is not a field")
      }
      if (!right.isInstanceOf[Literal[_]]) {
        throw new ExpressionException(c,
          s"Gremlin translation for $c not supported: rhs of comparison is not a literal")
      }
      ()
  }

  private def genQuery(expr : Expression) : String = expr match {
    case ClassExpression(clsName) => s"""has("${gPersistenceBehavior.typeAttributeName}","$clsName")"""
    case TraitExpression(clsName) => s"""has("${gPersistenceBehavior.typeAttributeName}","$clsName")"""
    case x => "" //throw new ExpressionException(x, "Gremlin translation not supported")
  }

  def translate() : GremlinQuery = {
    val e1 = expr.transformUp(wrapAndRule)
    e1.traverseUp(validateComparisonForm)
    GremlinQuery("g.V." + genQuery(e1) + ".toList()")
  }
}
\ No newline at end of file
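For the className query handled in this commit, translation is a single has() step; a small sketch of what translate() produces with the default GraphPersistenceStrategy1 (the expression builder _class is the one used in GremlinTest below):

val q = new GremlinTranslator(_class("DB")).translate()
// q.queryStr is g.V.has("typeName","DB").toList()
println(q.queryStr)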
@@ -19,9 +19,17 @@
 package org.apache.hadoop.metadata.query
 import Expressions._
+import com.thinkaurelius.titan.core.TitanGraph
 object QueryProcessor {
+  def evaluate(e : Expression, g : TitanGraph) : AnyRef = {
+    validate(e)
+    val q = new GremlinTranslator(e).translate()
+    println(q.queryStr)
+    new GremlinEvaluator(q, g).evaluate()
+  }
   def validate(e : Expression) : Expression = {
     val e1 = e.transformUp(new Resolver())
...
@@ -30,71 +30,7 @@ class ExpressionTest extends BaseTest {
   override def setup {
     super.setup
-    def attrDef(name : String, dT : IDataType[_],
-                m : Multiplicity = Multiplicity.OPTIONAL,
-                isComposite: Boolean = false,
-                reverseAttributeName: String = null) = {
-      require(name != null)
-      require(dT != null)
-      new AttributeDefinition(name, dT.getName, m, isComposite, reverseAttributeName)
-    }
-    def dbClsDef = new HierarchicalTypeDefinition[ClassType](classOf[ClassType], "DB", null,
-      Array(
-        attrDef("name", DataTypes.STRING_TYPE),
-        attrDef("owner", DataTypes.STRING_TYPE),
-        attrDef("createTime", DataTypes.LONG_TYPE)
-      ))
-    def storageDescClsDef = new HierarchicalTypeDefinition[ClassType](classOf[ClassType], "StorageDesc", null,
-      Array(
-        attrDef("inputFormat", DataTypes.STRING_TYPE),
-        attrDef("outputFormat", DataTypes.STRING_TYPE)
-      ))
-    def columnClsDef = new HierarchicalTypeDefinition[ClassType](classOf[ClassType], "Column", null,
-      Array(
-        attrDef("name", DataTypes.STRING_TYPE),
-        attrDef("dataType", DataTypes.STRING_TYPE),
-        new AttributeDefinition("sd", "StorageDesc", Multiplicity.REQUIRED, false, null)
-      ))
-    def tblClsDef = new HierarchicalTypeDefinition[ClassType](classOf[ClassType], "Table", null,
-      Array(
-        attrDef("name", DataTypes.STRING_TYPE),
-        new AttributeDefinition("db", "DB", Multiplicity.REQUIRED, false, null),
-        new AttributeDefinition("sd", "StorageDesc", Multiplicity.REQUIRED, false, null)
-      ))
-    def loadProcessClsDef = new HierarchicalTypeDefinition[ClassType](classOf[ClassType], "LoadProcess", null,
-      Array(
-        attrDef("name", DataTypes.STRING_TYPE),
-        new AttributeDefinition("inputTables", "Table", Multiplicity.COLLECTION, false, null),
-        new AttributeDefinition("outputTable", "Table", Multiplicity.REQUIRED, false, null)
-      ))
-    def viewClsDef = new HierarchicalTypeDefinition[ClassType](classOf[ClassType], "View", null,
-      Array(
-        attrDef("name", DataTypes.STRING_TYPE),
-        new AttributeDefinition("inputTables", "Table", Multiplicity.COLLECTION, false, null)
-      ))
-    def dimTraitDef = new HierarchicalTypeDefinition[TraitType](classOf[TraitType], "Dimension", null,
-      Array[AttributeDefinition]())
-    def piiTraitDef = new HierarchicalTypeDefinition[TraitType](classOf[TraitType], "PII", null,
-      Array[AttributeDefinition]())
-    def metricTraitDef = new HierarchicalTypeDefinition[TraitType](classOf[TraitType], "Metric", null,
-      Array[AttributeDefinition]())
-    def etlTraitDef = new HierarchicalTypeDefinition[TraitType](classOf[TraitType], "ETL", null,
-      Array[AttributeDefinition]())
-    def jdbcTraitDef = new HierarchicalTypeDefinition[TraitType](classOf[TraitType], "Jdbc", null,
-      Array[AttributeDefinition]())
-    getTypeSystem.defineTypes(ImmutableList.of[StructTypeDefinition],
-      ImmutableList.of[HierarchicalTypeDefinition[TraitType]](dimTraitDef, piiTraitDef,
-        metricTraitDef, etlTraitDef, jdbcTraitDef),
-      ImmutableList.of[HierarchicalTypeDefinition[ClassType]](dbClsDef, storageDescClsDef, columnClsDef, tblClsDef,
-        loadProcessClsDef, viewClsDef))
+    QueryTestsUtils.setupTypes
   }
...
package org.apache.hadoop.metadata.query

import com.thinkaurelius.titan.core.TitanGraph
import org.apache.hadoop.metadata.query.Expressions._
import org.scalatest._

class GremlinTest extends FunSuite with BeforeAndAfterAll {

  var g: TitanGraph = null

  override def beforeAll() {
    QueryTestsUtils.setupTypes
    g = QueryTestsUtils.setupTestGraph
  }

  override def afterAll() {
    g.shutdown()
  }

  test("testClass") {
    val r = QueryProcessor.evaluate(_class("DB"), g)
    println(r)
  }
}
@@ -4,35 +4,12 @@ import java.io.File
 import java.util.concurrent.atomic.AtomicInteger
 import javax.script.{Bindings, ScriptEngine, ScriptEngineManager}
-import com.thinkaurelius.titan.core.{TitanFactory, TitanGraph}
+import com.thinkaurelius.titan.core.TitanGraph
-import com.typesafe.config.{Config, ConfigFactory}
+import com.typesafe.config.ConfigFactory
-import org.apache.commons.configuration.{Configuration, ConfigurationException, MapConfiguration}
 import org.apache.commons.io.FileUtils
 import scala.collection.mutable.ArrayBuffer
-trait GraphUtils {
-  import scala.collection.JavaConversions._
-  def getConfiguration(config : Config) : Configuration = {
-    val keys = config.entrySet().map { _.getKey}
-    val gConfig : java.util.Map[String, String] = new java.util.HashMap[String, String]()
-    keys.foreach { k =>
-      gConfig.put(k, config.getString(k))
-    }
-    return new MapConfiguration(gConfig)
-  }
-  def titanGraph(conf : Config) = {
-    try {
-      TitanFactory.open(getConfiguration(conf))
-    } catch {
-      case e : ConfigurationException => throw new RuntimeException(e)
-    }
-  }
-}
 object HiveTitanSample {
   private var nextVertexId : AtomicInteger = new AtomicInteger(0)
...
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.metadata.query
import java.io.File
import javax.script.{Bindings, ScriptEngine, ScriptEngineManager}
import com.google.common.collect.ImmutableList
import com.thinkaurelius.titan.core.{TitanFactory, TitanGraph}
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.commons.configuration.{ConfigurationException, MapConfiguration, Configuration}
import org.apache.commons.io.FileUtils
import org.apache.hadoop.metadata.types._
trait GraphUtils {

  import scala.collection.JavaConversions._

  def getConfiguration(config : Config) : Configuration = {
    val keys = config.entrySet().map { _.getKey }
    val gConfig : java.util.Map[String, String] = new java.util.HashMap[String, String]()
    keys.foreach { k =>
      gConfig.put(k, config.getString(k))
    }
    return new MapConfiguration(gConfig)
  }

  def titanGraph(conf : Config) = {
    try {
      TitanFactory.open(getConfiguration(conf))
    } catch {
      case e : ConfigurationException => throw new RuntimeException(e)
    }
  }
}
object QueryTestsUtils extends GraphUtils {

  def setupTypes : Unit = {
    def attrDef(name : String, dT : IDataType[_],
                m : Multiplicity = Multiplicity.OPTIONAL,
                isComposite: Boolean = false,
                reverseAttributeName: String = null) = {
      require(name != null)
      require(dT != null)
      new AttributeDefinition(name, dT.getName, m, isComposite, reverseAttributeName)
    }

    def dbClsDef = new HierarchicalTypeDefinition[ClassType](classOf[ClassType], "DB", null,
      Array(
        attrDef("name", DataTypes.STRING_TYPE),
        attrDef("owner", DataTypes.STRING_TYPE),
        attrDef("createTime", DataTypes.LONG_TYPE)
      ))

    def storageDescClsDef = new HierarchicalTypeDefinition[ClassType](classOf[ClassType], "StorageDesc", null,
      Array(
        attrDef("inputFormat", DataTypes.STRING_TYPE),
        attrDef("outputFormat", DataTypes.STRING_TYPE)
      ))

    def columnClsDef = new HierarchicalTypeDefinition[ClassType](classOf[ClassType], "Column", null,
      Array(
        attrDef("name", DataTypes.STRING_TYPE),
        attrDef("dataType", DataTypes.STRING_TYPE),
        new AttributeDefinition("sd", "StorageDesc", Multiplicity.REQUIRED, false, null)
      ))

    def tblClsDef = new HierarchicalTypeDefinition[ClassType](classOf[ClassType], "Table", null,
      Array(
        attrDef("name", DataTypes.STRING_TYPE),
        new AttributeDefinition("db", "DB", Multiplicity.REQUIRED, false, null),
        new AttributeDefinition("sd", "StorageDesc", Multiplicity.REQUIRED, false, null)
      ))

    def loadProcessClsDef = new HierarchicalTypeDefinition[ClassType](classOf[ClassType], "LoadProcess", null,
      Array(
        attrDef("name", DataTypes.STRING_TYPE),
        new AttributeDefinition("inputTables", "Table", Multiplicity.COLLECTION, false, null),
        new AttributeDefinition("outputTable", "Table", Multiplicity.REQUIRED, false, null)
      ))

    def viewClsDef = new HierarchicalTypeDefinition[ClassType](classOf[ClassType], "View", null,
      Array(
        attrDef("name", DataTypes.STRING_TYPE),
        new AttributeDefinition("inputTables", "Table", Multiplicity.COLLECTION, false, null)
      ))

    def dimTraitDef = new HierarchicalTypeDefinition[TraitType](classOf[TraitType], "Dimension", null,
      Array[AttributeDefinition]())

    def piiTraitDef = new HierarchicalTypeDefinition[TraitType](classOf[TraitType], "PII", null,
      Array[AttributeDefinition]())

    def metricTraitDef = new HierarchicalTypeDefinition[TraitType](classOf[TraitType], "Metric", null,
      Array[AttributeDefinition]())

    def etlTraitDef = new HierarchicalTypeDefinition[TraitType](classOf[TraitType], "ETL", null,
      Array[AttributeDefinition]())

    def jdbcTraitDef = new HierarchicalTypeDefinition[TraitType](classOf[TraitType], "Jdbc", null,
      Array[AttributeDefinition]())

    TypeSystem.getInstance().defineTypes(ImmutableList.of[StructTypeDefinition],
      ImmutableList.of[HierarchicalTypeDefinition[TraitType]](dimTraitDef, piiTraitDef,
        metricTraitDef, etlTraitDef, jdbcTraitDef),
      ImmutableList.of[HierarchicalTypeDefinition[ClassType]](dbClsDef, storageDescClsDef, columnClsDef, tblClsDef,
        loadProcessClsDef, viewClsDef))

    ()
  }
  def setupTestGraph : TitanGraph = {
    var conf = ConfigFactory.load()
    conf = conf.getConfig("graphRepo")
    val g = titanGraph(conf)

    val manager: ScriptEngineManager = new ScriptEngineManager
    val engine: ScriptEngine = manager.getEngineByName("gremlin-groovy")
    val bindings: Bindings = engine.createBindings
    bindings.put("g", g)

    val hiveGraphFile = FileUtils.getTempDirectory().getPath.toString + File.separator + System.nanoTime() + ".gson"
    HiveTitanSample.writeGson(hiveGraphFile)
    bindings.put("hiveGraphFile", hiveGraphFile)
    engine.eval("g.loadGraphSON(hiveGraphFile)", bindings)

    g
  }
}
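setupTestGraph reads its Titan settings from the "graphRepo" block of the default Typesafe config. A minimal sketch of such a block, built programmatically here for illustration; the in-memory storage backend is an assumption, not something this commit pins down:

import com.typesafe.config.ConfigFactory

// Hypothetical test configuration: an in-memory Titan backend under the
// "graphRepo" key that setupTestGraph looks up via conf.getConfig("graphRepo").
val conf = ConfigFactory.parseString(
  """graphRepo {
    |  storage.backend = "inmemory"
    |}""".stripMargin)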