diff --git a/typesystem/src/main/scala/org/apache/hadoop/metadata/query/Expressions.scala b/typesystem/src/main/scala/org/apache/hadoop/metadata/query/Expressions.scala
index 42d7af0..986906c 100644
--- a/typesystem/src/main/scala/org/apache/hadoop/metadata/query/Expressions.scala
+++ b/typesystem/src/main/scala/org/apache/hadoop/metadata/query/Expressions.scala
@@ -240,6 +240,32 @@ object Expressions {
     }
 
     /*
+     * treeString methods
+     */
+    def nodeName = getClass.getSimpleName
+
+    def argString: String = productIterator.flatMap {
+        case e: Expression if children contains e => Nil
+        case e: Expression if e.toString contains "\n" => s"(${e.simpleString})" :: Nil
+        case seq: Seq[_] => seq.mkString("[", ",", "]") :: Nil
+        case set: Set[_] => set.mkString("{", ",", "}") :: Nil
+        case other => other :: Nil
+    }.mkString(", ")
+
+    /** String representation of this node without any children */
+    def simpleString = s"$nodeName $argString"
+
+    protected def generateTreeString(depth: Int, builder: StringBuilder): StringBuilder = {
+        builder.append(" " * depth)
+        builder.append(simpleString)
+        builder.append("\n")
+        children.foreach(_.generateTreeString(depth + 1, builder))
+        builder
+    }
+
+    def treeString = generateTreeString(0, new StringBuilder).toString
+
+    /*
      * Fluent API methods
      */
     def field(fieldName : String) = new UnresolvedFieldExpression(this, fieldName)
@@ -342,7 +368,7 @@ object Expressions {
     case class FieldExpression(fieldName: String, fieldInfo: FieldInfo, child: Option[Expression])
         extends Expression {
         val children = if (child.isDefined) List(child.get) else Nil
-        lazy val dataType = fieldInfo.attrInfo.dataType()
+        lazy val dataType = if (!fieldInfo.isReverse) fieldInfo.attrInfo.dataType() else fieldInfo.reverseDataType
         override lazy val resolved: Boolean = true
 
         override def namedExpressions = if ( child.isDefined ) child.get.namedExpressions else Map()
diff --git a/typesystem/src/main/scala/org/apache/hadoop/metadata/query/GremlinQuery.scala b/typesystem/src/main/scala/org/apache/hadoop/metadata/query/GremlinQuery.scala
index 17fa93a..e1d7d11 100644
--- a/typesystem/src/main/scala/org/apache/hadoop/metadata/query/GremlinQuery.scala
+++ b/typesystem/src/main/scala/org/apache/hadoop/metadata/query/GremlinQuery.scala
@@ -19,10 +19,16 @@ package org.apache.hadoop.metadata.query
 
 import Expressions._
+import org.apache.hadoop.metadata.query.TypeUtils.FieldInfo
+import org.apache.hadoop.metadata.types.DataTypes.TypeCategory
 import org.apache.hadoop.metadata.types.{AttributeInfo, IDataType}
+import scala.collection.immutable.TreeMap
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
 
-case class GremlinQuery(queryStr : String) {
+
+case class GremlinQuery(expr : Expression, queryStr : String) {
 
 }
@@ -36,15 +42,105 @@ trait GraphPersistenceStrategies {
     /**
      * Given a dataType and a reference attribute, how is edge labeled
      */
     def edgeLabel(iDataType: IDataType[_], aInfo : AttributeInfo) : String
+
+    def traitLabel(cls : IDataType[_], traitName : String) : String
+
+    def edgeLabel(fInfo : FieldInfo) : String = fInfo match {
+        case FieldInfo(dataType, aInfo, null) => edgeLabel(dataType, aInfo)
+        case FieldInfo(dataType, aInfo, reverseDataType) => edgeLabel(reverseDataType, aInfo)
+    }
+
+    def fieldPrefixInSelect : String
+
+    def gremlinCompOp(op : ComparisonExpression) = op.symbol match {
+        case "=" => "T.eq"
+        case "!=" => "T.neq"
+        case ">" => "T.gt"
+        case ">=" => "T.gte"
+        case "<" => "T.lt"
+        case "<=" => "T.lte"
+        case _ => throw new ExpressionException(op, "Comparison operator not supported in Gremlin")
+    }
 }
 
 object GraphPersistenceStrategy1 extends GraphPersistenceStrategies {
     val typeAttributeName = "typeName"
-    def edgeLabel(dataType: IDataType[_], aInfo : AttributeInfo) = s"${dataType}.${aInfo.name}"
+    def edgeLabel(dataType: IDataType[_], aInfo : AttributeInfo) = s"${dataType.getName}.${aInfo.name}"
+    val fieldPrefixInSelect = "it"
+    def traitLabel(cls : IDataType[_], traitName : String) = s"${cls.getName}.$traitName"
+}
+
+trait SelectExpressionHandling {
+
+    /**
+     * To aid in gremlinQuery generation, add an alias to the input of SelectExpressions
+     */
+    class AddAliasToSelectInput extends PartialFunction[Expression, Expression] {
+
+        private var idx = 0
+
+        def isDefinedAt(e : Expression) = true
+
+        class DecorateFieldWithAlias(aliasE : AliasExpression)
+            extends PartialFunction[Expression, Expression] {
+            def isDefinedAt(e : Expression) = true
+
+            def apply(e : Expression) = e match {
+                case fe@FieldExpression(fieldName, fInfo, None) =>
+                    FieldExpression(fieldName, fInfo, Some(BackReference(aliasE.alias, aliasE.child, None)))
+                case _ => e
+            }
+        }
+
+        def apply(e : Expression) = e match {
+            case SelectExpression(_:AliasExpression, _) => e
+            case SelectExpression(child, selList) => {
+                idx = idx + 1
+                val aliasE = AliasExpression(child, s"_src$idx")
+                SelectExpression(aliasE, selList.map(_.transformUp(new DecorateFieldWithAlias(aliasE))))
+            }
+            case _ => e
+        }
+    }
+
+    def getSelectExpressionSrc(e : Expression) : List[String] = {
+        val l = ArrayBuffer[String]()
+        e.traverseUp {
+            case BackReference(alias, _, _) => l += alias
+        }
+        l.toSet.toList
+    }
+
+    def validateSelectExprHaveOneSrc : PartialFunction[Expression, Unit] = {
+        case SelectExpression(_, selList) => {
+            selList.foreach { se =>
+                val srcs = getSelectExpressionSrc(se)
+                if ( srcs.size > 1 ) {
+                    throw new GremlinTranslationException(se, "Only one src allowed in a Select Expression")
+                }
+            }
+        }
+    }
+
+    def groupSelectExpressionsBySrc(sel : SelectExpression) : mutable.LinkedHashMap[String, List[Expression]] = {
+        val m = mutable.LinkedHashMap[String, List[Expression]]()
+        sel.selectListWithAlias.foreach { se =>
+            val l = getSelectExpressionSrc(se.child)
+            if ( !m.contains(l(0)) ) {
+                m(l(0)) = List()
+            }
+            m(l(0)) = m(l(0)) :+ se.child
+        }
+        m
+    }
+
+}
+
+class GremlinTranslationException(expr: Expression, reason: String) extends
+    ExpressionException(expr, s"Unsupported Gremlin translation: $reason")
 
 class GremlinTranslator(expr : Expression,
-                        gPersistenceBehavior : GraphPersistenceStrategies = GraphPersistenceStrategy1) {
+                        gPersistenceBehavior : GraphPersistenceStrategies = GraphPersistenceStrategy1)
+extends SelectExpressionHandling {
 
     val wrapAndRule : PartialFunction[Expression, Expression] = {
         case f : FilterExpression if !f.condExpr.isInstanceOf[LogicalExpression] =>
@@ -54,26 +150,102 @@ class GremlinTranslator(expr : Expression,
     val validateComparisonForm : PartialFunction[Expression, Unit]= {
         case c@ComparisonExpression(_, left, right) =>
             if (!left.isInstanceOf[FieldExpression]) {
-                throw new ExpressionException(c, s"Gremlin transaltion not supported: lhs of comparison is not a field")
+                throw new GremlinTranslationException(c, s"lhs of comparison is not a field")
             }
             if (!right.isInstanceOf[Literal[_]]) {
-                throw new ExpressionException(c,
-                    s"Gremlin transaltion for $c not supported: rhs of comparison is not a literal")
+                throw new GremlinTranslationException(c,
+                    s"rhs of comparison is not a literal")
             }
             ()
     }
 
-    private def genQuery(expr : Expression) : String = expr match {
+    private def genQuery(expr : Expression, inSelect : Boolean) : String = expr match {
         case ClassExpression(clsName) =>
             s"""has("${gPersistenceBehavior.typeAttributeName}","$clsName")"""
         case TraitExpression(clsName) =>
             s"""has("${gPersistenceBehavior.typeAttributeName}","$clsName")"""
-        case x => "" //throw new ExpressionException(x, "Gremlin translation not supported")
+        case fe@FieldExpression(fieldName, fInfo, child) if fe.dataType.getTypeCategory == TypeCategory.PRIMITIVE => {
+            child match {
+                case Some(e) => s"${genQuery(e, inSelect)}.$fieldName"
+                case None => fieldName
+            }
+        }
+        case fe@FieldExpression(fieldName, fInfo, child)
+            if fe.dataType.getTypeCategory == TypeCategory.CLASS || fe.dataType.getTypeCategory == TypeCategory.STRUCT => {
+            val direction = if (fInfo.isReverse) "in" else "out"
+            val edgeLbl = gPersistenceBehavior.edgeLabel(fInfo)
+            val step = s"""$direction("$edgeLbl")"""
+            child match {
+                case Some(e) => s"${genQuery(e, inSelect)}.$step"
+                case None => step
+            }
+        }
+        case c@ComparisonExpression(symb, f@FieldExpression(fieldName, fInfo, ch), l) => {
+            ch match {
+                case Some(child) =>
+                    s"""${genQuery(child, inSelect)}.has("$fieldName", ${gPersistenceBehavior.gremlinCompOp(c)}, $l)"""
+                case None => s"""has("$fieldName", ${gPersistenceBehavior.gremlinCompOp(c)}, $l)"""
+            }
+        }
+        case fil@FilterExpression(child, condExpr) => {
+            s"${genQuery(child, inSelect)}.${genQuery(condExpr, inSelect)}"
+        }
+        case l@LogicalExpression(symb, children) => {
+            s"""$symb${children.map("_()." + genQuery(_, inSelect)).mkString("(", ",", ")")}"""
+        }
+        case sel@SelectExpression(child, selList) => {
+            val m = groupSelectExpressionsBySrc(sel)
+            var srcNamesList : List[String] = List()
+            var srcExprsList : List[List[String]] = List()
+            val it = m.iterator
+            while(it.hasNext) {
+                val (src, selExprs) = it.next
+                srcNamesList = srcNamesList :+ s""""$src""""
+                srcExprsList = srcExprsList :+ selExprs.map { selExpr =>
+                    genQuery(selExpr, true)
+                }
+            }
+            val srcNamesString = srcNamesList.mkString("[", ",", "]")
+            val srcExprsStringList = srcExprsList.map{_.mkString("[", ",", "]")}
+            val srcExprsString = srcExprsStringList.foldLeft("")(_ + "{" + _ + "}")
+            s"${genQuery(child, inSelect)}.select($srcNamesString)$srcExprsString"
+        }
+        case BackReference(alias, _, _) =>
+            if (inSelect) gPersistenceBehavior.fieldPrefixInSelect else s"""back("$alias")"""
+        case AliasExpression(child, alias) => s"""${genQuery(child, inSelect)}.as("$alias")"""
+        case isTraitLeafExpression(traitName, Some(clsExp)) =>
+            s"""out("${gPersistenceBehavior.traitLabel(clsExp.dataType, traitName)}")"""
+        case isTraitUnaryExpression(traitName, child) =>
+            s"""out("${gPersistenceBehavior.traitLabel(child.dataType, traitName)}")"""
+        case hasFieldLeafExpression(fieldName, Some(clsExp)) =>
+            s"""has("$fieldName")"""
+        case hasFieldUnaryExpression(fieldName, child) =>
+            s"""${genQuery(child, inSelect)}.has("$fieldName")"""
+        case ArithmeticExpression(symb,left,right) => s"${genQuery(left,inSelect)} $symb ${genQuery(right, inSelect)}"
+        case l : Literal[_] => l.toString
+        case x => throw new GremlinTranslationException(x, "expression not yet supported")
     }
 
     def translate() : GremlinQuery = {
-        val e1 = expr.transformUp(wrapAndRule)
+        var e1 = expr.transformUp(wrapAndRule)
         e1.traverseUp(validateComparisonForm)
-        GremlinQuery("g.V." + genQuery(e1) + ".toList()")
+
+        e1 = e1.transformUp(new AddAliasToSelectInput)
+        e1.traverseUp(validateSelectExprHaveOneSrc)
+
+        e1 match {
+            case e1: SelectExpression => GremlinQuery(e1, s"g.V.${genQuery(e1, false)}.toList()")
+            case e1 => GremlinQuery(e1, s"g.V.${genQuery(e1, false)}.map().toList()")
+        }
+    }
+
+    /*
+     * Translation Issues:
+     * 1. back references in filters. E.g. testBackreference: 'DB as db Table where (db.name = "Reporting")'
+     *    this is translated to:
+     *    g.V.has("typeName","DB").as("db").in("Table.db").and(_().back("db").has("name", T.eq, "Reporting")).map().toList()
+     *    But the '_().back("db")' within the 'and' is ignored; the has condition is applied on the current element.
+     *    The solution is to do predicate pushdown and apply the filter immediately on top of the referred Expression.
+     */
+
 }
\ No newline at end of file
diff --git a/typesystem/src/main/scala/org/apache/hadoop/metadata/query/QueryProcessor.scala b/typesystem/src/main/scala/org/apache/hadoop/metadata/query/QueryProcessor.scala
index aba1f3f..ba271d7 100644
--- a/typesystem/src/main/scala/org/apache/hadoop/metadata/query/QueryProcessor.scala
+++ b/typesystem/src/main/scala/org/apache/hadoop/metadata/query/QueryProcessor.scala
@@ -24,8 +24,8 @@ import com.thinkaurelius.titan.core.TitanGraph
 object QueryProcessor {
 
     def evaluate(e : Expression, g : TitanGraph) : AnyRef = {
-        validate(e)
-        val q = new GremlinTranslator(e).translate()
+        val e1 = validate(e)
+        val q = new GremlinTranslator(e1).translate()
         println(q.queryStr)
         new GremlinEvaluator(q, g).evaluate()
     }
diff --git a/typesystem/src/test/scala/org/apache/hadoop/metadata/query/ExpressionTest.scala b/typesystem/src/test/scala/org/apache/hadoop/metadata/query/ExpressionTest.scala
index db5f274..0b564b0 100644
--- a/typesystem/src/test/scala/org/apache/hadoop/metadata/query/ExpressionTest.scala
+++ b/typesystem/src/test/scala/org/apache/hadoop/metadata/query/ExpressionTest.scala
@@ -60,7 +60,7 @@ class ExpressionTest extends BaseTest {
     }
 
     @Test def testIsTrait: Unit = {
-        val e = QueryProcessor.validate(_class("DB").where(isTrait("Jdbc")))
+        val e = QueryProcessor.validate(_class("DB").where(isTrait("JdbcAccess")))
         println(e)
     }
 
@@ -112,15 +112,16 @@ class ExpressionTest extends BaseTest {
 
     @Test def testJoinAndSelect1: Unit = {
         val e = QueryProcessor.validate(
-            _class("DB").as("db").field("Table").as("tab").where((id("createTime") + int(1) > int(0))
-                .and(id("db").field("name").`=`(string("Reporting")))).select(id("db").field("name").as("dbName"), id("tab").field("name").as("tabName"))
+            _class("DB").as("db").field("Table").as("tab").where((id("db").field("createTime") + int(1) > int(0))
+                .and(id("db").field("name").`=`(string("Reporting")))).select(id("db").field("name").as("dbName"),
+                id("tab").field("name").as("tabName"))
         )
         println(e)
     }
 
     @Test def testJoinAndSelect2: Unit = {
         val e = QueryProcessor.validate(
-            _class("DB").as("db").field("Table").as("tab").where((id("createTime") + int(1) > int(0))
+            _class("DB").as("db").field("Table").as("tab").where((id("db").field("createTime") + int(1) > int(0))
                 .or(id("db").field("name").`=`(string("Reporting"))))
                 .select(id("db").field("name").as("dbName"), id("tab").field("name").as("tabName"))
         )
@@ -129,7 +130,7 @@ class ExpressionTest extends BaseTest {
 
     @Test def testJoinAndSelect3: Unit = {
         val e = QueryProcessor.validate(
-            _class("DB").as("db").field("Table").as("tab").where((id("createTime") + int(1) > int(0))
+            _class("DB").as("db").field("Table").as("tab").where((id("db").field("createTime") + int(1) > int(0))
                 .and(id("db").field("name").`=`(string("Reporting")))
                 .or(id("db").hasField("owner")))
                 .select(id("db").field("name").as("dbName"), id("tab").field("name").as("tabName"))
@@ -140,7 +141,7 @@ class ExpressionTest extends BaseTest {
     @Test def testJoinAndSelect4: Unit = {
         val e = QueryProcessor.validate(
             _class("DB") as "db" join "Table" as "tab" where (
-                id("createTime") + int(1) > int(0) and
+                id("db").field("createTime") + int(1) > int(0) and
                     ( id("db") `.` "name" `=` string("Reporting") ) or
                     ( id("db") hasField "owner" )
                 ) select (
diff --git a/typesystem/src/test/scala/org/apache/hadoop/metadata/query/GremlinTest.scala b/typesystem/src/test/scala/org/apache/hadoop/metadata/query/GremlinTest.scala
index 3dc06c8..646241a 100644
--- a/typesystem/src/test/scala/org/apache/hadoop/metadata/query/GremlinTest.scala
+++ b/typesystem/src/test/scala/org/apache/hadoop/metadata/query/GremlinTest.scala
@@ -2,13 +2,18 @@ package org.apache.hadoop.metadata.query
 
 import com.thinkaurelius.titan.core.TitanGraph
 import org.apache.hadoop.metadata.query.Expressions._
+import org.apache.hadoop.metadata.types.TypeSystem
+import org.junit.runner.RunWith
 import org.scalatest._
+import org.scalatest.junit.JUnitRunner
 
+@RunWith(classOf[JUnitRunner])
 class GremlinTest extends FunSuite with BeforeAndAfterAll {
 
     var g: TitanGraph = null
 
     override def beforeAll() {
+        TypeSystem.getInstance().reset()
         QueryTestsUtils.setupTypes
         g = QueryTestsUtils.setupTestGraph
     }
@@ -22,5 +27,76 @@ class GremlinTest extends FunSuite with BeforeAndAfterAll {
         println(r)
     }
 
+    test("testFilter") {
+        val r = QueryProcessor.evaluate(_class("DB").where(id("name").`=`(string("Reporting"))), g)
+        println(r)
+    }
+
+    test("testSelect") {
+        val r = QueryProcessor.evaluate(_class("DB").where(id("name").`=`(string("Reporting"))).
+            select(id("name"), id("owner")), g)
+        println(r)
+    }
+
+    test("testIsTrait") {
+        val r = QueryProcessor.evaluate(_class("Table").where(isTrait("Dimension")), g)
+        println(r)
+    }
+
+    test("testhasField") {
+        val r = QueryProcessor.evaluate(_class("DB").where(hasField("name")), g)
+        println(r)
+    }
+
+    test("testFieldReference") {
+        val r = QueryProcessor.evaluate(_class("DB").field("Table"), g)
+        println(r)
+    }
+
+    test("testBackReference") {
+        val r = QueryProcessor.evaluate(
+            _class("DB").as("db").field("Table").where(id("db").field("name").`=`(string("Reporting"))), g)
+        println(r)
+    }
+
+    test("testArith") {
+        val r = QueryProcessor.evaluate(_class("DB").where(id("name").`=`(string("Reporting"))).
+            select(id("name"), id("createTime") + int(1)), g)
+        println(r)
+    }
+
+    test("testComparisonLogical") {
+        val r = QueryProcessor.evaluate(_class("DB").where(id("name").`=`(string("Reporting")).
+            and(id("createTime") > int(0))), g)
+        println(r)
+    }
+
+    test("testJoinAndSelect1") {
+        val r = QueryProcessor.evaluate(
+            _class("DB").as("db1").where(id("name").`=`(string("Sales"))).field("Table").as("tab").
+                where((isTrait("Dimension"))).
+                select(id("db1").field("name").as("dbName"), id("tab").field("name").as("tabName")),g
+        )
+        println(r)
+    }
+
+    test("testJoinAndSelect2") {
+        val r = QueryProcessor.evaluate(
+            _class("DB").as("db1").where((id("db1").field("createTime") > int(0))
+                .or(id("name").`=`(string("Reporting")))).field("Table").as("tab")
+                .select(id("db1").field("name").as("dbName"), id("tab").field("name").as("tabName")), g
+        )
+        println(r)
+    }
+
+    test("testJoinAndSelect3") {
+        val r = QueryProcessor.evaluate(
+            _class("DB").as("db1").where((id("db1").field("createTime") > int(0))
+                .and(id("db1").field("name").`=`(string("Reporting")))
+                .or(id("db1").hasField("owner"))).field("Table").as("tab")
+                .select(id("db1").field("name").as("dbName"), id("tab").field("name").as("tabName")), g
+        )
+        println(r)
+    }
 }
diff --git a/typesystem/src/test/scala/org/apache/hadoop/metadata/query/HiveTitanSample.scala b/typesystem/src/test/scala/org/apache/hadoop/metadata/query/HiveTitanSample.scala
index 0e76422..c2a5e6f 100644
--- a/typesystem/src/test/scala/org/apache/hadoop/metadata/query/HiveTitanSample.scala
+++ b/typesystem/src/test/scala/org/apache/hadoop/metadata/query/HiveTitanSample.scala
@@ -78,7 +78,7 @@ object HiveTitanSample {
     case class ETL(_id: String = "" + nextVertexId.incrementAndGet()) extends Trait
 
-    case class DB(name: String, owner: String, traits : Option[List[Trait]] = None,
+    case class DB(name: String, owner: String, createTime : Int, traits : Option[List[Trait]] = None,
                   _id: String = "" + nextVertexId.incrementAndGet()) extends Instance
 
     case class StorageDescriptor(inputFormat : String, outputFormat : String,
@@ -118,7 +118,7 @@ object HiveTitanSample {
                       traits : Option[List[Trait]] = None,
                       _id: String = "" + nextVertexId.incrementAndGet()) extends Instance
 
-    val salesDB = DB("Sales", "John ETL")
+    val salesDB = DB("Sales", "John ETL", 1000)
     val salesFact = TableDef("sales_fact",
         salesDB,
         "TextInputFormat",
@@ -160,7 +160,7 @@ object HiveTitanSample {
         ),
         Some(List(Dimension())))
 
-    val reportingDB = DB("Reporting", "Jane BI")
+    val reportingDB = DB("Reporting", "Jane BI", 1500)
     val salesFactDaily = TableDef("sales_fact_daily_mv",
         reportingDB,
         "TextInputFormat",
diff --git a/typesystem/src/test/scala/org/apache/hadoop/metadata/query/QueryTestsUtils.scala b/typesystem/src/test/scala/org/apache/hadoop/metadata/query/QueryTestsUtils.scala
index b7a14c9..20dd6e0 100644
--- a/typesystem/src/test/scala/org/apache/hadoop/metadata/query/QueryTestsUtils.scala
+++ b/typesystem/src/test/scala/org/apache/hadoop/metadata/query/QueryTestsUtils.scala
@@ -110,7 +110,7 @@ object QueryTestsUtils extends GraphUtils {
             Array[AttributeDefinition]())
         def etlTraitDef = new HierarchicalTypeDefinition[TraitType](classOf[TraitType], "ETL", null,
             Array[AttributeDefinition]())
-        def jdbcTraitDef = new HierarchicalTypeDefinition[TraitType](classOf[TraitType], "Jdbc", null,
+        def jdbcTraitDef = new HierarchicalTypeDefinition[TraitType](classOf[TraitType], "JdbcAccess", null,
            Array[AttributeDefinition]())
 
        TypeSystem.getInstance().defineTypes(ImmutableList.of[StructTypeDefinition],