Commit 7aece73b by Harish Butani

support query evaluation for select, where and join/step clauses

parent 98d4bace
......@@ -240,6 +240,32 @@ object Expressions {
}
/*
* treeString methods
*/
def nodeName = getClass.getSimpleName
def argString: String = productIterator.flatMap {
case e: Expression if children contains e => Nil
case e: Expression if e.toString contains "\n" => s"(${e.simpleString})" :: Nil
case seq: Seq[_] => seq.mkString("[", ",", "]") :: Nil
case set: Set[_] => set.mkString("{", ",", "}") :: Nil
case other => other :: Nil
}.mkString(", ")
/** String representation of this node without any children */
def simpleString = s"$nodeName $argString"
protected def generateTreeString(depth: Int, builder: StringBuilder): StringBuilder = {
builder.append(" " * depth)
builder.append(simpleString)
builder.append("\n")
children.foreach(_.generateTreeString(depth + 1, builder))
builder
}
def treeString = generateTreeString(0, new StringBuilder).toString
/*
* Fluent API methods
*/
def field(fieldName : String) = new UnresolvedFieldExpression(this, fieldName)
......@@ -342,7 +368,7 @@ object Expressions {
case class FieldExpression(fieldName: String, fieldInfo: FieldInfo, child: Option[Expression])
extends Expression {
val children = if (child.isDefined) List(child.get) else Nil
lazy val dataType = fieldInfo.attrInfo.dataType()
lazy val dataType = if (!fieldInfo.isReverse) fieldInfo.attrInfo.dataType() else fieldInfo.reverseDataType
override lazy val resolved: Boolean = true
override def namedExpressions = if ( child.isDefined ) child.get.namedExpressions else Map()
......
......@@ -19,10 +19,16 @@
package org.apache.hadoop.metadata.query
import Expressions._
import org.apache.hadoop.metadata.query.TypeUtils.FieldInfo
import org.apache.hadoop.metadata.types.DataTypes.TypeCategory
import org.apache.hadoop.metadata.types.{AttributeInfo, IDataType}
import scala.collection.immutable.TreeMap
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
case class GremlinQuery(queryStr : String) {
case class GremlinQuery(expr : Expression, queryStr : String) {
}
......@@ -36,15 +42,105 @@ trait GraphPersistenceStrategies {
* Given a dataType and a reference attribute, how is edge labeled
*/
def edgeLabel(iDataType: IDataType[_], aInfo : AttributeInfo) : String
def traitLabel(cls : IDataType[_], traitName : String) : String
def edgeLabel(fInfo : FieldInfo) : String = fInfo match {
case FieldInfo(dataType, aInfo, null) => edgeLabel(dataType, aInfo)
case FieldInfo(dataType, aInfo, reverseDataType) => edgeLabel(reverseDataType, aInfo)
}
def fieldPrefixInSelect : String
def gremlinCompOp(op : ComparisonExpression) = op.symbol match {
case "=" => "T.eq"
case "!=" => "T.neq"
case ">" => "T.gt"
case ">=" => "T.gte"
case "<" => "T.lt"
case "<=" => "T.lte"
case _ => throw new ExpressionException(op, "Comparison operator not supported in Gremlin")
}
}
object GraphPersistenceStrategy1 extends GraphPersistenceStrategies {
val typeAttributeName = "typeName"
def edgeLabel(dataType: IDataType[_], aInfo : AttributeInfo) = s"${dataType}.${aInfo.name}"
def edgeLabel(dataType: IDataType[_], aInfo : AttributeInfo) = s"${dataType.getName}.${aInfo.name}"
val fieldPrefixInSelect = "it"
def traitLabel(cls : IDataType[_], traitName : String) = s"${cls.getName}.$traitName"
}
trait SelectExpressionHandling {
/**
* To aide in gremlinQuery generation add an alias to the input of SelectExpressions
*/
class AddAliasToSelectInput extends PartialFunction[Expression, Expression] {
private var idx = 0
def isDefinedAt(e : Expression) = true
class DecorateFieldWithAlias(aliasE : AliasExpression)
extends PartialFunction[Expression, Expression] {
def isDefinedAt(e : Expression) = true
def apply(e : Expression) = e match {
case fe@FieldExpression(fieldName, fInfo, None) =>
FieldExpression(fieldName, fInfo, Some(BackReference(aliasE.alias, aliasE.child, None)))
case _ => e
}
}
def apply(e : Expression) = e match {
case SelectExpression(_:AliasExpression, _) => e
case SelectExpression(child, selList) => {
idx = idx + 1
val aliasE = AliasExpression(child, s"_src$idx")
SelectExpression(aliasE, selList.map(_.transformUp(new DecorateFieldWithAlias(aliasE))))
}
case _ => e
}
}
def getSelectExpressionSrc(e : Expression) : List[String] = {
val l = ArrayBuffer[String]()
e.traverseUp {
case BackReference(alias, _, _) => l += alias
}
l.toSet.toList
}
def validateSelectExprHaveOneSrc : PartialFunction[Expression, Unit] = {
case SelectExpression(_, selList) => {
selList.foreach { se =>
val srcs = getSelectExpressionSrc(se)
if ( srcs.size > 1 ) {
throw new GremlinTranslationException(se, "Only one src allowed in a Select Expression")
}
}
}
}
def groupSelectExpressionsBySrc(sel :SelectExpression) : mutable.LinkedHashMap[String, List[Expression]] = {
val m = mutable.LinkedHashMap[String, List[Expression]]()
sel.selectListWithAlias.foreach {se =>
val l = getSelectExpressionSrc(se.child)
if ( !m.contains(l(0)) ) {
m(l(0)) = List()
}
m(l(0)) = m(l(0)) :+ se.child
}
m
}
}
class GremlinTranslationException(expr: Expression, reason: String) extends
ExpressionException(expr, s"Unsupported Gremlin translation: $reason")
class GremlinTranslator(expr : Expression,
gPersistenceBehavior : GraphPersistenceStrategies = GraphPersistenceStrategy1) {
gPersistenceBehavior : GraphPersistenceStrategies = GraphPersistenceStrategy1)
extends SelectExpressionHandling {
val wrapAndRule : PartialFunction[Expression, Expression] = {
case f : FilterExpression if !f.condExpr.isInstanceOf[LogicalExpression] =>
......@@ -54,26 +150,102 @@ class GremlinTranslator(expr : Expression,
val validateComparisonForm : PartialFunction[Expression, Unit]= {
case c@ComparisonExpression(_, left, right) =>
if (!left.isInstanceOf[FieldExpression]) {
throw new ExpressionException(c, s"Gremlin transaltion not supported: lhs of comparison is not a field")
throw new GremlinTranslationException(c, s"lhs of comparison is not a field")
}
if (!right.isInstanceOf[Literal[_]]) {
throw new ExpressionException(c,
s"Gremlin transaltion for $c not supported: rhs of comparison is not a literal")
throw new GremlinTranslationException(c,
s"rhs of comparison is not a literal")
}
()
}
private def genQuery(expr : Expression) : String = expr match {
private def genQuery(expr : Expression, inSelect : Boolean) : String = expr match {
case ClassExpression(clsName) => s"""has("${gPersistenceBehavior.typeAttributeName}","$clsName")"""
case TraitExpression(clsName) => s"""has("${gPersistenceBehavior.typeAttributeName}","$clsName")"""
case x => "" //throw new ExpressionException(x, "Gremlin translation not supported")
case fe@FieldExpression(fieldName, fInfo, child) if fe.dataType.getTypeCategory == TypeCategory.PRIMITIVE => {
child match {
case Some(e) => s"${genQuery(e, inSelect)}.$fieldName"
case None => fieldName
}
}
case fe@FieldExpression(fieldName, fInfo, child)
if fe.dataType.getTypeCategory == TypeCategory.CLASS || fe.dataType.getTypeCategory == TypeCategory.STRUCT => {
val direction = if (fInfo.isReverse) "in" else "out"
val edgeLbl = gPersistenceBehavior.edgeLabel(fInfo)
val step = s"""$direction("$edgeLbl")"""
child match {
case Some(e) => s"${genQuery(e, inSelect)}.$step"
case None => step
}
}
case c@ComparisonExpression(symb, f@FieldExpression(fieldName, fInfo, ch), l) => {
ch match {
case Some(child) =>
s"""${genQuery(child, inSelect)}.has("$fieldName", ${gPersistenceBehavior.gremlinCompOp(c)}, $l)"""
case None => s"""has("$fieldName", ${gPersistenceBehavior.gremlinCompOp(c)}, $l)"""
}
}
case fil@FilterExpression(child, condExpr) => {
s"${genQuery(child, inSelect)}.${genQuery(condExpr, inSelect)}"
}
case l@LogicalExpression(symb, children) => {
s"""$symb${children.map("_()." + genQuery(_, inSelect)).mkString("(", ",", ")")}"""
}
case sel@SelectExpression(child, selList) => {
val m = groupSelectExpressionsBySrc(sel)
var srcNamesList : List[String] = List()
var srcExprsList : List[List[String]] = List()
val it = m.iterator
while(it.hasNext) {
val (src, selExprs) = it.next
srcNamesList = srcNamesList :+ s""""$src""""
srcExprsList = srcExprsList :+ selExprs.map { selExpr =>
genQuery(selExpr, true)
}
}
val srcNamesString = srcNamesList.mkString("[", ",", "]")
val srcExprsStringList = srcExprsList.map{_.mkString("[", ",", "]")}
val srcExprsString = srcExprsStringList.foldLeft("")(_ + "{" + _ + "}")
s"${genQuery(child, inSelect)}.select($srcNamesString)$srcExprsString"
}
case BackReference(alias, _, _) =>
if (inSelect) gPersistenceBehavior.fieldPrefixInSelect else s"""back("$alias")"""
case AliasExpression(child, alias) => s"""${genQuery(child, inSelect)}.as("$alias")"""
case isTraitLeafExpression(traitName, Some(clsExp)) =>
s"""out("${gPersistenceBehavior.traitLabel(clsExp.dataType, traitName)}")"""
case isTraitUnaryExpression(traitName, child) =>
s"""out("${gPersistenceBehavior.traitLabel(child.dataType, traitName)}")"""
case hasFieldLeafExpression(fieldName, Some(clsExp)) =>
s"""has("$fieldName")"""
case hasFieldUnaryExpression(fieldName, child) =>
s"""${genQuery(child, inSelect)}.has("$fieldName")"""
case ArithmeticExpression(symb,left,right) => s"${genQuery(left,inSelect)} $symb ${genQuery(right, inSelect)}"
case l : Literal[_] => l.toString
case x => throw new GremlinTranslationException(x, "expression not yet supported")
}
def translate() : GremlinQuery = {
val e1 = expr.transformUp(wrapAndRule)
var e1 = expr.transformUp(wrapAndRule)
e1.traverseUp(validateComparisonForm)
GremlinQuery("g.V." + genQuery(e1) + ".toList()")
e1 = e1.transformUp(new AddAliasToSelectInput)
e1.traverseUp(validateSelectExprHaveOneSrc)
e1 match {
case e1: SelectExpression => GremlinQuery(e1, s"g.V.${genQuery(e1, false)}.toList()")
case e1 => GremlinQuery(e1, s"g.V.${genQuery(e1, false)}.map().toList()")
}
}
/*
* Translation Issues:
* 1. back references in filters. For e.g. testBackreference: 'DB as db Table where (db.name = "Reporting")'
* this is translated to:
* g.V.has("typeName","DB").as("db").in("Table.db").and(_().back("db").has("name", T.eq, "Reporting")).map().toList()
* But the '_().back("db") within the and is ignored, the has condition is applied on the current element.
* The solution is to to do predicate pushdown and apply the filter immediately on top of the referred Expression.
*/
}
\ No newline at end of file
......@@ -24,8 +24,8 @@ import com.thinkaurelius.titan.core.TitanGraph
object QueryProcessor {
def evaluate(e : Expression, g : TitanGraph) : AnyRef = {
validate(e)
val q = new GremlinTranslator(e).translate()
val e1 = validate(e)
val q = new GremlinTranslator(e1).translate()
println(q.queryStr)
new GremlinEvaluator(q, g).evaluate()
}
......
......@@ -60,7 +60,7 @@ class ExpressionTest extends BaseTest {
}
@Test def testIsTrait: Unit = {
val e = QueryProcessor.validate(_class("DB").where(isTrait("Jdbc")))
val e = QueryProcessor.validate(_class("DB").where(isTrait("JdbcAccess")))
println(e)
}
......@@ -112,15 +112,16 @@ class ExpressionTest extends BaseTest {
@Test def testJoinAndSelect1: Unit = {
val e = QueryProcessor.validate(
_class("DB").as("db").field("Table").as("tab").where((id("createTime") + int(1) > int(0))
.and(id("db").field("name").`=`(string("Reporting")))).select(id("db").field("name").as("dbName"), id("tab").field("name").as("tabName"))
_class("DB").as("db").field("Table").as("tab").where((id("db").field("createTime") + int(1) > int(0))
.and(id("db").field("name").`=`(string("Reporting")))).select(id("db").field("name").as("dbName"),
id("tab").field("name").as("tabName"))
)
println(e)
}
@Test def testJoinAndSelect2: Unit = {
val e = QueryProcessor.validate(
_class("DB").as("db").field("Table").as("tab").where((id("createTime") + int(1) > int(0))
_class("DB").as("db").field("Table").as("tab").where((id("db").field("createTime") + int(1) > int(0))
.or(id("db").field("name").`=`(string("Reporting"))))
.select(id("db").field("name").as("dbName"), id("tab").field("name").as("tabName"))
)
......@@ -129,7 +130,7 @@ class ExpressionTest extends BaseTest {
@Test def testJoinAndSelect3: Unit = {
val e = QueryProcessor.validate(
_class("DB").as("db").field("Table").as("tab").where((id("createTime") + int(1) > int(0))
_class("DB").as("db").field("Table").as("tab").where((id("db").field("createTime") + int(1) > int(0))
.and(id("db").field("name").`=`(string("Reporting")))
.or(id("db").hasField("owner")))
.select(id("db").field("name").as("dbName"), id("tab").field("name").as("tabName"))
......@@ -140,7 +141,7 @@ class ExpressionTest extends BaseTest {
@Test def testJoinAndSelect4: Unit = {
val e = QueryProcessor.validate(
_class("DB") as "db" join "Table" as "tab" where (
id("createTime") + int(1) > int(0) and
id("db").field("createTime") + int(1) > int(0) and
( id("db") `.` "name" `=` string("Reporting") ) or
( id("db") hasField "owner" )
) select (
......
......@@ -2,13 +2,18 @@ package org.apache.hadoop.metadata.query
import com.thinkaurelius.titan.core.TitanGraph
import org.apache.hadoop.metadata.query.Expressions._
import org.apache.hadoop.metadata.types.TypeSystem
import org.junit.runner.RunWith
import org.scalatest._
import org.scalatest.junit.JUnitRunner
@RunWith(classOf[JUnitRunner])
class GremlinTest extends FunSuite with BeforeAndAfterAll {
var g: TitanGraph = null
override def beforeAll() {
TypeSystem.getInstance().reset()
QueryTestsUtils.setupTypes
g = QueryTestsUtils.setupTestGraph
}
......@@ -22,5 +27,76 @@ class GremlinTest extends FunSuite with BeforeAndAfterAll {
println(r)
}
test("testFilter") {
val r = QueryProcessor.evaluate(_class("DB").where(id("name").`=`(string("Reporting"))), g)
println(r)
}
test("testSelect") {
val r = QueryProcessor.evaluate(_class("DB").where(id("name").`=`(string("Reporting"))).
select(id("name"), id("owner")), g)
println(r)
}
test("testIsTrait") {
val r = QueryProcessor.evaluate(_class("Table").where(isTrait("Dimension")), g)
println(r)
}
test("testhasField") {
val r = QueryProcessor.evaluate(_class("DB").where(hasField("name")), g)
println(r)
}
test("testFieldReference") {
val r = QueryProcessor.evaluate(_class("DB").field("Table"), g)
println(r)
}
test("testBackReference") {
val r = QueryProcessor.evaluate(
_class("DB").as("db").field("Table").where(id("db").field("name").`=`(string("Reporting"))), g)
println(r)
}
test("testArith") {
val r = QueryProcessor.evaluate(_class("DB").where(id("name").`=`(string("Reporting"))).
select(id("name"), id("createTime") + int(1)), g)
println(r)
}
test("testComparisonLogical") {
val r = QueryProcessor.evaluate(_class("DB").where(id("name").`=`(string("Reporting")).
and(id("createTime") > int(0))), g)
println(r)
}
test("testJoinAndSelect1") {
val r = QueryProcessor.evaluate(
_class("DB").as("db1").where(id("name").`=`(string("Sales"))).field("Table").as("tab").
where((isTrait("Dimension"))).
select(id("db1").field("name").as("dbName"), id("tab").field("name").as("tabName")),g
)
println(r)
}
test("testJoinAndSelect2") {
val r = QueryProcessor.evaluate(
_class("DB").as("db1").where((id("db1").field("createTime") > int(0))
.or(id("name").`=`(string("Reporting")))).field("Table").as("tab")
.select(id("db1").field("name").as("dbName"), id("tab").field("name").as("tabName")), g
)
println(r)
}
test("testJoinAndSelect3") {
val r = QueryProcessor.evaluate(
_class("DB").as("db1").where((id("db1").field("createTime") > int(0))
.and(id("db1").field("name").`=`(string("Reporting")))
.or(id("db1").hasField("owner"))).field("Table").as("tab")
.select(id("db1").field("name").as("dbName"), id("tab").field("name").as("tabName")), g
)
println(r)
}
}
......@@ -78,7 +78,7 @@ object HiveTitanSample {
case class ETL(_id: String = "" + nextVertexId.incrementAndGet()) extends Trait
case class DB(name: String, owner: String, traits : Option[List[Trait]] = None,
case class DB(name: String, owner: String, createTime : Int, traits : Option[List[Trait]] = None,
_id: String = "" + nextVertexId.incrementAndGet()) extends Instance
case class StorageDescriptor(inputFormat : String, outputFormat : String,
......@@ -118,7 +118,7 @@ object HiveTitanSample {
traits : Option[List[Trait]] = None,
_id: String = "" + nextVertexId.incrementAndGet()) extends Instance
val salesDB = DB("Sales", "John ETL")
val salesDB = DB("Sales", "John ETL", 1000)
val salesFact = TableDef("sales_fact",
salesDB,
"TextInputFormat",
......@@ -160,7 +160,7 @@ object HiveTitanSample {
),
Some(List(Dimension())))
val reportingDB = DB("Reporting", "Jane BI")
val reportingDB = DB("Reporting", "Jane BI", 1500)
val salesFactDaily = TableDef("sales_fact_daily_mv",
reportingDB,
"TextInputFormat",
......
......@@ -110,7 +110,7 @@ object QueryTestsUtils extends GraphUtils {
Array[AttributeDefinition]())
def etlTraitDef = new HierarchicalTypeDefinition[TraitType](classOf[TraitType], "ETL", null,
Array[AttributeDefinition]())
def jdbcTraitDef = new HierarchicalTypeDefinition[TraitType](classOf[TraitType], "Jdbc", null,
def jdbcTraitDef = new HierarchicalTypeDefinition[TraitType](classOf[TraitType], "JdbcAccess", null,
Array[AttributeDefinition]())
TypeSystem.getInstance().defineTypes(ImmutableList.of[StructTypeDefinition],
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment