Commit b06e45b8 by Harish Butani

add support for loop expression, add lineage tests

parent 47594923
...@@ -299,6 +299,9 @@ object Expressions { ...@@ -299,6 +299,9 @@ object Expressions {
def where(condExpr: Expression) = new FilterExpression(this, condExpr) def where(condExpr: Expression) = new FilterExpression(this, condExpr)
def select(selectList: Expression*) = new SelectExpression(this, selectList.toList) def select(selectList: Expression*) = new SelectExpression(this, selectList.toList)
/**
 * Wraps this expression in a [[LoopExpression]] that carries no `times` value.
 *
 * @param loopingExpr expression stored as the loop's looping expression
 * @return a LoopExpression whose input is this expression and whose `times` is `None`
 */
def loop(loopingExpr: Expression): LoopExpression =
  LoopExpression(this, loopingExpr, None)

/**
 * Wraps this expression in a [[LoopExpression]] that carries an explicit `times` literal.
 *
 * @param loopingExpr expression stored as the loop's looping expression
 * @param times       integer literal stored as `Some(times)` on the resulting expression
 * @return a LoopExpression whose input is this expression
 */
def loop(loopingExpr: Expression, times: Literal[Int]): LoopExpression =
  LoopExpression(this, loopingExpr, Some(times))
} }
...@@ -624,4 +627,30 @@ object Expressions { ...@@ -624,4 +627,30 @@ object Expressions {
override def toString = s"""$child select ${selectListWithAlias.mkString("", ", ", "")}""" override def toString = s"""$child select ${selectListWithAlias.mkString("", ", ", "")}"""
} }
/**
 * Expression node pairing an `input` expression with a `loopingExpression` and an
 * optional `times` literal.
 *
 * @param input             expression the loop starts from; its data type must be a CLASS type
 * @param loopingExpression expression applied in the loop; its data type must equal `input`'s
 * @param times             optional integer literal; only rendered in `toString` here
 */
case class LoopExpression(val input: Expression, val loopingExpression: Expression,
                          val times : Option[Literal[Int]]) extends Expression {

  val children = List(input, loopingExpression)

  /**
   * Data type of the loop, which is the data type of `input`.
   *
   * Throws UnresolvedException while this expression is not resolved, and
   * ExpressionException when `input` is not a CLASS type or when the data types of
   * `input` and `loopingExpression` differ.
   */
  lazy val dataType = {
    if (!resolved) {
      throw new UnresolvedException(this,
        s"datatype. Can not resolve due to unresolved children")
    }
    if (input.dataType.getTypeCategory != TypeCategory.CLASS) {
      throw new ExpressionException(this, s"Loop Expression applied to type : '${input.dataType.getName}';" +
        " loop can only be applied to Class Expressions")
    }
    if (input.dataType != loopingExpression.dataType) {
      // The message previously ended in "})" because of a stray closing brace.
      throw new ExpressionException(this,
        s"Invalid Loop Expression; input and loopExpression dataTypes don't match: " +
          s"(${input.dataType.getName},${loopingExpression.dataType.getName})")
    }
    input.dataType
  }

  /** Named expressions are taken from `input` only. */
  override def namedExpressions = input.namedExpressions

  /** Renders `<input> loop (<loopingExpression>)`, followed by `times <n>` when `times` is set. */
  override def toString = times match {
    case Some(t) => s"$input loop ($loopingExpression) times ${t.value}"
    case None => s"$input loop ($loopingExpression)"
  }
}
} }
...@@ -91,6 +91,10 @@ trait GraphPersistenceStrategies { ...@@ -91,6 +91,10 @@ trait GraphPersistenceStrategies {
case "<=" => "T.lte" case "<=" => "T.lte"
case _ => throw new ExpressionException(op, "Comparison operator not supported in Gremlin") case _ => throw new ExpressionException(op, "Comparison operator not supported in Gremlin")
} }
/**
 * Builds a closure string of the form
 * `{it.object.'<typeAttributeName>' == '<data type name>'}`.
 *
 * @param dataType type whose name is compared against the type attribute
 * @return the closure text as a String
 */
def loopObjectExpression(dataType : IDataType[_]) = {
  val typeAttr = typeAttributeName
  val expectedTypeName = dataType.getName
  s"{it.object.'$typeAttr' == '$expectedTypeName'}"
}
} }
object GraphPersistenceStrategy1 extends GraphPersistenceStrategies { object GraphPersistenceStrategy1 extends GraphPersistenceStrategies {
......
...@@ -20,6 +20,7 @@ package org.apache.hadoop.metadata.query ...@@ -20,6 +20,7 @@ package org.apache.hadoop.metadata.query
import org.apache.hadoop.metadata.query.Expressions._ import org.apache.hadoop.metadata.query.Expressions._
import org.apache.hadoop.metadata.types.DataTypes.TypeCategory import org.apache.hadoop.metadata.types.DataTypes.TypeCategory
import org.apache.hadoop.metadata.types.IDataType
import scala.collection.mutable import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.ArrayBuffer
...@@ -141,6 +142,16 @@ class GremlinTranslator(expr: Expression, ...@@ -141,6 +142,16 @@ class GremlinTranslator(expr: Expression,
() ()
} }
/**
 * Mutable integer sequence. `i` starts at -1; each call to `next` increments `i`
 * and returns the new value, so the first call returns 0.
 */
class counter {
  var i: Int = -1

  def next: Int = {
    i = i + 1
    i
  }
}
/**
 * Rewrite rule that makes sure the input of every LoopExpression is an AliasExpression.
 *
 * A loop whose input is already an AliasExpression is returned unchanged; any other
 * loop gets its input wrapped in an alias named `_loop<n>`, where `<n>` is the next
 * value of `c`.
 *
 * @param c counter supplying the alias suffixes; a new counter is created when omitted
 */
def addAliasToLoopInput(c : counter = new counter()) : PartialFunction[Expression, Expression] = {
  case aliased @ LoopExpression(_: AliasExpression, _, _) => aliased
  case LoopExpression(inputExpr, loopExpr, t) =>
    val generatedAlias = s"_loop${c.next}"
    LoopExpression(AliasExpression(inputExpr, generatedAlias), loopExpr, t)
}
private def genQuery(expr: Expression, inSelect: Boolean): String = expr match { private def genQuery(expr: Expression, inSelect: Boolean): String = expr match {
case ClassExpression(clsName) => s"""has("${gPersistenceBehavior.typeAttributeName}","$clsName")""" case ClassExpression(clsName) => s"""has("${gPersistenceBehavior.typeAttributeName}","$clsName")"""
case TraitExpression(clsName) => s"""has("${gPersistenceBehavior.typeAttributeName}","$clsName")""" case TraitExpression(clsName) => s"""has("${gPersistenceBehavior.typeAttributeName}","$clsName")"""
...@@ -192,6 +203,14 @@ class GremlinTranslator(expr: Expression, ...@@ -192,6 +203,14 @@ class GremlinTranslator(expr: Expression,
val srcExprsString = srcExprsStringList.foldLeft("")(_ + "{" + _ + "}") val srcExprsString = srcExprsStringList.foldLeft("")(_ + "{" + _ + "}")
s"${genQuery(child, inSelect)}.select($srcNamesString)$srcExprsString" s"${genQuery(child, inSelect)}.select($srcNamesString)$srcExprsString"
} }
case loop@LoopExpression(input, loopExpr, t) => {
val inputQry = genQuery(input, inSelect)
val loopingPathGExpr = genQuery(loopExpr, inSelect)
val loopGExpr = s"""loop("${input.asInstanceOf[AliasExpression].alias}")"""
val untilCriteria = if ( t.isDefined) s"{it.loops < ${t.get.value}}" else "{true}"
val loopObjectGExpr = gPersistenceBehavior.loopObjectExpression(input.dataType)
s"""${inputQry}.${loopingPathGExpr}.${loopGExpr}${untilCriteria}${loopObjectGExpr}"""
}
case BackReference(alias, _, _) => case BackReference(alias, _, _) =>
if (inSelect) gPersistenceBehavior.fieldPrefixInSelect else s"""back("$alias")""" if (inSelect) gPersistenceBehavior.fieldPrefixInSelect else s"""back("$alias")"""
case AliasExpression(child, alias) => s"""${genQuery(child, inSelect)}.as("$alias")""" case AliasExpression(child, alias) => s"""${genQuery(child, inSelect)}.as("$alias")"""
...@@ -215,6 +234,7 @@ class GremlinTranslator(expr: Expression, ...@@ -215,6 +234,7 @@ class GremlinTranslator(expr: Expression,
e1 = e1.transformUp(new AddAliasToSelectInput) e1 = e1.transformUp(new AddAliasToSelectInput)
e1.traverseUp(validateSelectExprHaveOneSrc) e1.traverseUp(validateSelectExprHaveOneSrc)
e1 = e1.transformUp(addAliasToLoopInput())
e1 match { e1 match {
case e1: SelectExpression => { case e1: SelectExpression => {
......
...@@ -21,7 +21,8 @@ package org.apache.hadoop.metadata.query ...@@ -21,7 +21,8 @@ package org.apache.hadoop.metadata.query
import Expressions._ import Expressions._
import org.apache.hadoop.metadata.types.IDataType import org.apache.hadoop.metadata.types.IDataType
class Resolver(srcExpr : Option[Expression] = None, aliases : Map[String, Expression] = Map()) class Resolver(srcExpr : Option[Expression] = None, aliases : Map[String, Expression] = Map(),
connectClassExprToSrc : Boolean = false)
extends PartialFunction[Expression, Expression] { extends PartialFunction[Expression, Expression] {
import TypeUtils._ import TypeUtils._
...@@ -30,6 +31,16 @@ extends PartialFunction[Expression, Expression] { ...@@ -30,6 +31,16 @@ extends PartialFunction[Expression, Expression] {
def apply(e : Expression) : Expression = e match { def apply(e : Expression) : Expression = e match {
case idE@IdExpression(name) => { case idE@IdExpression(name) => {
val backExpr = aliases.get(name)
if ( backExpr.isDefined) {
return new BackReference(name, backExpr.get, None)
}
if (srcExpr.isDefined ) {
val fInfo = resolveReference(srcExpr.get.dataType, name)
if ( fInfo.isDefined) {
return new FieldExpression(name, fInfo.get, None)
}
}
val cType = resolveAsClassType(name) val cType = resolveAsClassType(name)
if (cType.isDefined) { if (cType.isDefined) {
return new ClassExpression(name) return new ClassExpression(name)
...@@ -38,17 +49,14 @@ extends PartialFunction[Expression, Expression] { ...@@ -38,17 +49,14 @@ extends PartialFunction[Expression, Expression] {
if (tType.isDefined) { if (tType.isDefined) {
return new TraitExpression(name) return new TraitExpression(name)
} }
if (srcExpr.isDefined ) { idE
val fInfo = resolveReference(srcExpr.get.dataType, name)
if ( fInfo.isDefined) {
return new FieldExpression(name, fInfo.get, None)
}
} }
val backExpr = aliases.get(name) case ce@ClassExpression(clsName) if connectClassExprToSrc && srcExpr.isDefined => {
if ( backExpr.isDefined) { val fInfo = resolveReference(srcExpr.get.dataType, clsName)
return new BackReference(name, backExpr.get, None) if (fInfo.isDefined) {
return new FieldExpression(clsName, fInfo.get, None)
} }
idE ce
} }
case f@UnresolvedFieldExpression(child, fieldName) if child.resolved => { case f@UnresolvedFieldExpression(child, fieldName) if child.resolved => {
var fInfo : Option[FieldInfo] = None var fInfo : Option[FieldInfo] = None
...@@ -73,6 +81,10 @@ extends PartialFunction[Expression, Expression] { ...@@ -73,6 +81,10 @@ extends PartialFunction[Expression, Expression] {
val r = new Resolver(Some(child), child.namedExpressions) val r = new Resolver(Some(child), child.namedExpressions)
return new SelectExpression(child, selectList.map{_.transformUp(r)}) return new SelectExpression(child, selectList.map{_.transformUp(r)})
} }
case l@LoopExpression(inputExpr, loopExpr, t) if inputExpr.resolved => {
val r = new Resolver(Some(inputExpr), inputExpr.namedExpressions, true)
return new LoopExpression(inputExpr, loopExpr.transformUp(r), t)
}
case x => x case x => x
} }
} }
...@@ -88,7 +100,8 @@ object FieldValidator extends PartialFunction[Expression, Expression] { ...@@ -88,7 +100,8 @@ object FieldValidator extends PartialFunction[Expression, Expression] {
def isSrc(e : Expression) = e.isInstanceOf[ClassExpression] || e.isInstanceOf[TraitExpression] def isSrc(e : Expression) = e.isInstanceOf[ClassExpression] || e.isInstanceOf[TraitExpression]
def validateQualifiedField(srcDataType : IDataType[_]) : PartialFunction[Expression, Expression] = { def validateQualifiedField(srcDataType : IDataType[_]) : PartialFunction[Expression, Expression] = {
case FieldExpression(fNm, fInfo, Some(child)) if (child.children == Nil && child.dataType == srcDataType) => case FieldExpression(fNm, fInfo, Some(child))
if (child.children == Nil && !child.isInstanceOf[BackReference] && child.dataType == srcDataType) =>
FieldExpression(fNm, fInfo, None) FieldExpression(fNm, fInfo, None)
case fe@FieldExpression(fNm, fInfo, Some(child)) if isSrc(child) => case fe@FieldExpression(fNm, fInfo, Some(child)) if isSrc(child) =>
throw new ExpressionException(fe, s"srcType of field doesn't match input type") throw new ExpressionException(fe, s"srcType of field doesn't match input type")
...@@ -102,6 +115,13 @@ object FieldValidator extends PartialFunction[Expression, Expression] { ...@@ -102,6 +115,13 @@ object FieldValidator extends PartialFunction[Expression, Expression] {
throw new ExpressionException(iT, s"srcType of field doesn't match input type") throw new ExpressionException(iT, s"srcType of field doesn't match input type")
} }
/**
 * Check that accepts FieldExpression nodes and rejects every other expression.
 *
 * @param loopExpr loop being validated; reported as the offending expression on failure
 * @return a partial function that yields Unit for field expressions and throws
 *         ExpressionException for anything else
 */
def validateOnlyFieldReferencesInLoopExpressions(loopExpr : LoopExpression)
: PartialFunction[Expression, Unit] = {
  case _: FieldExpression => ()
  case other =>
    val reason =
      s"Loop Expression can only contain field references; '${other.toString}' not supported."
    throw new ExpressionException(loopExpr, reason)
}
def apply(e : Expression) : Expression = e match { def apply(e : Expression) : Expression = e match {
case f@FilterExpression(inputExpr, condExpr) => { case f@FilterExpression(inputExpr, condExpr) => {
val validatedCE = condExpr.transformUp(validateQualifiedField(inputExpr.dataType)) val validatedCE = condExpr.transformUp(validateQualifiedField(inputExpr.dataType))
...@@ -115,6 +135,15 @@ object FieldValidator extends PartialFunction[Expression, Expression] { ...@@ -115,6 +135,15 @@ object FieldValidator extends PartialFunction[Expression, Expression] {
val v = validateQualifiedField(child.dataType) val v = validateQualifiedField(child.dataType)
return new SelectExpression(child, selectList.map{_.transformUp(v)}) return new SelectExpression(child, selectList.map{_.transformUp(v)})
} }
case l@LoopExpression(inputExpr, loopExpr, t) => {
val validatedLE = loopExpr.transformUp(validateQualifiedField(inputExpr.dataType))
val l1 = {
if ( validatedLE.fastEquals(loopExpr) ) l
else new LoopExpression(inputExpr, validatedLE, t)
}
l1.loopingExpression.traverseUp(validateOnlyFieldReferencesInLoopExpressions(l1))
l1
}
case x => x case x => x
} }
} }
\ No newline at end of file
...@@ -165,4 +165,9 @@ class ExpressionTest extends BaseTest { ...@@ -165,4 +165,9 @@ class ExpressionTest extends BaseTest {
) )
println(e) println(e)
} }
/** Validates the query `Table loop (LoadProcess outputTable)` and prints the validated expression. */
@Test def testLineageAll: Unit = {
  val lineageQuery = _class("Table").loop(id("LoadProcess").field("outputTable"))
  val validated = QueryProcessor.validate(lineageQuery)
  println(validated)
}
} }
package org.apache.hadoop.metadata.query
import com.thinkaurelius.titan.core.TitanGraph
import org.apache.hadoop.metadata.query.Expressions._
import org.apache.hadoop.metadata.types.TypeSystem
import org.junit.Test
import org.junit.runner.RunWith
import org.scalatest.{Assertions, BeforeAndAfterAll, FunSuite}
import org.scalatest.junit.JUnitRunner
@RunWith(classOf[JUnitRunner])
class LineageQueryTest extends FunSuite with BeforeAndAfterAll {
// Graph used by every test in this suite: assigned in beforeAll() and shut down in afterAll().
var g: TitanGraph = null
/**
 * Suite setup: resets the TypeSystem singleton, calls QueryTestsUtils.setupTypes and
 * stores the graph returned by QueryTestsUtils.setupTestGraph in `g`.
 */
override def beforeAll(): Unit = {
  val typeSystem = TypeSystem.getInstance()
  typeSystem.reset()
  QueryTestsUtils.setupTypes
  g = QueryTestsUtils.setupTestGraph
}
/**
 * Suite teardown: shuts the test graph down.
 *
 * `g` is still null if setup never assigned it, so the shutdown is skipped in that case
 * rather than throwing a NullPointerException.
 */
override def afterAll(): Unit = {
  Option(g).foreach(_.shutdown())
}
// Matches TypeUtils.TEMP_STRUCT_NAME_PREFIX followed by one or more digits; validateJson
// removes these matches from both the actual and the expected JSON before comparing.
val STRUCT_NAME_REGEX = (TypeUtils.TEMP_STRUCT_NAME_PREFIX + "\\d+").r
// Matches a newline together with any whitespace that follows it; validateJson removes
// these matches as well before comparing.
val PREFIX_SPACES_REGEX = ("\\n\\s*").r
/**
 * Compares a query result's JSON with an expected JSON document, or prints the JSON
 * when no expectation is given.
 *
 * Before the comparison, matches of STRUCT_NAME_REGEX and then of PREFIX_SPACES_REGEX
 * are removed from both documents.
 *
 * @param r        query result whose `toJson` output is checked
 * @param expected expected JSON text; when null, the actual JSON is printed instead
 */
def validateJson(r : GremlinQueryResult, expected : String = null) : Unit = {
  val rJ = r.toJson

  // Strips generated struct names first, then newline-plus-whitespace runs.
  def normalize(json: String): String = {
    val withoutStructNames = STRUCT_NAME_REGEX.replaceAllIn(json, "")
    PREFIX_SPACES_REGEX.replaceAllIn(withoutStructNames, "")
  }

  if (expected == null) {
    println(rJ)
  } else {
    val a = normalize(rJ)
    val b = normalize(expected)
    Assertions.assert(a == b)
  }
}
// Evaluates `LoadProcess inputTables` against the test graph and compares the JSON result
// with the expected document below. The result is serialized once, inside validateJson;
// the previously unused extra `toJson` call has been removed.
test("testInputTables") {
  val r = QueryProcessor.evaluate(_class("LoadProcess").field("inputTables"), g)
  validateJson(r, """{
"query":"LoadProcess inputTables",
"dataType":{
"superTypes":[
],
"hierarchicalMetaTypeName":"org.apache.hadoop.metadata.types.ClassType",
"typeName":"Table",
"attributeDefinitions":[
{
"name":"name",
"dataTypeName":"string",
"multiplicity":{
"lower":0,
"upper":1,
"isUnique":false
},
"isComposite":false,
"isUnique":false,
"isIndexable":true,
"reverseAttributeName":null
},
{
"name":"db",
"dataTypeName":"DB",
"multiplicity":{
"lower":1,
"upper":1,
"isUnique":false
},
"isComposite":false,
"isUnique":false,
"isIndexable":true,
"reverseAttributeName":null
},
{
"name":"sd",
"dataTypeName":"StorageDesc",
"multiplicity":{
"lower":1,
"upper":1,
"isUnique":false
},
"isComposite":false,
"isUnique":false,
"isIndexable":true,
"reverseAttributeName":null
}
]
},
"rows":[
{
"$typeName$":"Table",
"$id$":{
"id":"2048",
"$typeName$":"Table",
"version":0
},
"sd":{
"id":"512",
"$typeName$":"StorageDesc",
"version":0
},
"db":{
"id":"256",
"$typeName$":"DB",
"version":0
},
"name":"sales_fact"
},
{
"$typeName$":"Table",
"$id$":{
"id":"4864",
"$typeName$":"Table",
"version":0
},
"sd":{
"id":"3840",
"$typeName$":"StorageDesc",
"version":0
},
"db":{
"id":"256",
"$typeName$":"DB",
"version":0
},
"name":"time_dim",
"$traits$":{
"Dimension":{
"$typeName$":"Dimension"
}
}
},
{
"$typeName$":"Table",
"$id$":{
"id":"8960",
"$typeName$":"Table",
"version":0
},
"sd":{
"id":"7424",
"$typeName$":"StorageDesc",
"version":0
},
"db":{
"id":"7168",
"$typeName$":"DB",
"version":0
},
"name":"sales_fact_daily_mv"
}
]
}""")
}
// Evaluates `Table LoadProcess outputTable`; with a null expectation validateJson only
// prints the resulting JSON.
test("testLoadProcessOut") {
  val query = _class("Table").field("LoadProcess").field("outputTable")
  val r = QueryProcessor.evaluate(query, g)
  validateJson(r, null)
}
// Evaluates a loop with no `times` argument: starting from Table, looping over
// LoadProcess.outputTable. The expected query string shows the input aliased as `_loop0`.
// The expected rows contain repeated tables (ids 8960 and 12800 each occur more than once).
test("testLineageAll") {
val r = QueryProcessor.evaluate(_class("Table").loop(id("LoadProcess").field("outputTable")), g)
validateJson(r, """{
"query":"Table as _loop0 loop (LoadProcess outputTable)",
"dataType":{
"superTypes":[
],
"hierarchicalMetaTypeName":"org.apache.hadoop.metadata.types.ClassType",
"typeName":"Table",
"attributeDefinitions":[
{
"name":"name",
"dataTypeName":"string",
"multiplicity":{
"lower":0,
"upper":1,
"isUnique":false
},
"isComposite":false,
"isUnique":false,
"isIndexable":true,
"reverseAttributeName":null
},
{
"name":"db",
"dataTypeName":"DB",
"multiplicity":{
"lower":1,
"upper":1,
"isUnique":false
},
"isComposite":false,
"isUnique":false,
"isIndexable":true,
"reverseAttributeName":null
},
{
"name":"sd",
"dataTypeName":"StorageDesc",
"multiplicity":{
"lower":1,
"upper":1,
"isUnique":false
},
"isComposite":false,
"isUnique":false,
"isIndexable":true,
"reverseAttributeName":null
}
]
},
"rows":[
{
"$typeName$":"Table",
"$id$":{
"id":"8960",
"$typeName$":"Table",
"version":0
},
"sd":{
"id":"7424",
"$typeName$":"StorageDesc",
"version":0
},
"db":{
"id":"7168",
"$typeName$":"DB",
"version":0
},
"name":"sales_fact_daily_mv"
},
{
"$typeName$":"Table",
"$id$":{
"id":"12800",
"$typeName$":"Table",
"version":0
},
"sd":{
"id":"11264",
"$typeName$":"StorageDesc",
"version":0
},
"db":{
"id":"7168",
"$typeName$":"DB",
"version":0
},
"name":"sales_fact_monthly_mv"
},
{
"$typeName$":"Table",
"$id$":{
"id":"8960",
"$typeName$":"Table",
"version":0
},
"sd":{
"id":"7424",
"$typeName$":"StorageDesc",
"version":0
},
"db":{
"id":"7168",
"$typeName$":"DB",
"version":0
},
"name":"sales_fact_daily_mv"
},
{
"$typeName$":"Table",
"$id$":{
"id":"12800",
"$typeName$":"Table",
"version":0
},
"sd":{
"id":"11264",
"$typeName$":"StorageDesc",
"version":0
},
"db":{
"id":"7168",
"$typeName$":"DB",
"version":0
},
"name":"sales_fact_monthly_mv"
},
{
"$typeName$":"Table",
"$id$":{
"id":"12800",
"$typeName$":"Table",
"version":0
},
"sd":{
"id":"11264",
"$typeName$":"StorageDesc",
"version":0
},
"db":{
"id":"7168",
"$typeName$":"DB",
"version":0
},
"name":"sales_fact_monthly_mv"
}
]
}""")
}
// Same loop as testLineageAll, but the loop input is aliased `src` and the loop result
// `dest`, and a select projects src.name / dest.name as srcTable / destTable.
// The expected query string keeps the user-supplied `src` alias on the loop input.
// validateJson strips the generated struct name (STRUCT_NAME_REGEX) from both sides, so
// the numeric suffix in "__tempQueryResultStruct2" below does not affect the comparison.
test("testLineageAllSelect") {
val r = QueryProcessor.evaluate(_class("Table").as("src").loop(id("LoadProcess").field("outputTable")).as("dest").
select(id("src").field("name").as("srcTable"), id("dest").field("name").as("destTable")), g)
validateJson(r, """{
"query":"Table as src loop (LoadProcess outputTable) as dest select src.name as srcTable, dest.name as destTable",
"dataType":{
"typeName":"__tempQueryResultStruct2",
"attributeDefinitions":[
{
"name":"srcTable",
"dataTypeName":"string",
"multiplicity":{
"lower":0,
"upper":1,
"isUnique":false
},
"isComposite":false,
"isUnique":false,
"isIndexable":true,
"reverseAttributeName":null
},
{
"name":"destTable",
"dataTypeName":"string",
"multiplicity":{
"lower":0,
"upper":1,
"isUnique":false
},
"isComposite":false,
"isUnique":false,
"isIndexable":true,
"reverseAttributeName":null
}
]
},
"rows":[
{
"$typeName$":"__tempQueryResultStruct2",
"srcTable":"sales_fact",
"destTable":"sales_fact_daily_mv"
},
{
"$typeName$":"__tempQueryResultStruct2",
"srcTable":"sales_fact",
"destTable":"sales_fact_monthly_mv"
},
{
"$typeName$":"__tempQueryResultStruct2",
"srcTable":"time_dim",
"destTable":"sales_fact_daily_mv"
},
{
"$typeName$":"__tempQueryResultStruct2",
"srcTable":"time_dim",
"destTable":"sales_fact_monthly_mv"
},
{
"$typeName$":"__tempQueryResultStruct2",
"srcTable":"sales_fact_daily_mv",
"destTable":"sales_fact_monthly_mv"
}
]
}""")
}
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment