Commit af99895b by Harish Butani

flesh out Hive lineage query implementations

parent fb3f6b46
...@@ -18,7 +18,10 @@ ...@@ -18,7 +18,10 @@
package org.apache.hadoop.metadata.query package org.apache.hadoop.metadata.query
import Expressions._
import com.thinkaurelius.titan.core.TitanGraph import com.thinkaurelius.titan.core.TitanGraph
import org.apache.hadoop.metadata.typesystem.types.DataTypes
import org.apache.hadoop.metadata.typesystem.types.DataTypes.PrimitiveType
/** /**
* Represents a Query to compute the closure based on a relationship between entities of a particular type. * Represents a Query to compute the closure based on a relationship between entities of a particular type.
...@@ -59,9 +62,20 @@ import com.thinkaurelius.titan.core.TitanGraph ...@@ -59,9 +62,20 @@ import com.thinkaurelius.titan.core.TitanGraph
*/ */
trait ClosureQuery { trait ClosureQuery {
sealed trait PathAttribute sealed trait PathAttribute {
case class ReverseRelation(typeName : String, attributeName : String)
case class Relation(attributeName : String) def toExpr : Expression = this match {
case r : Relation => id(r.attributeName)
case rr : ReverseRelation => id(rr.typeName)
}
def toFieldName : String = this match {
case r : Relation => r.attributeName
case rr : ReverseRelation => rr.typeName
}
}
case class ReverseRelation(typeName : String, attributeName : String) extends PathAttribute
case class Relation(attributeName : String) extends PathAttribute
/** /**
* Type on whose instances the closure needs to be computed * Type on whose instances the closure needs to be computed
...@@ -71,9 +85,8 @@ trait ClosureQuery { ...@@ -71,9 +85,8 @@ trait ClosureQuery {
/** /**
* specify how instances are related. * specify how instances are related.
* @param attributePath
*/ */
def closureRelation(attributePath : List[PathAttribute]) def closureRelation : List[PathAttribute]
/** /**
* The maximum hops between related instances. A [[None]] implies there is no maximum. * The maximum hops between related instances. A [[None]] implies there is no maximum.
...@@ -97,7 +110,54 @@ trait ClosureQuery { ...@@ -97,7 +110,54 @@ trait ClosureQuery {
def persistenceStrategy: GraphPersistenceStrategies def persistenceStrategy: GraphPersistenceStrategies
def g: TitanGraph def g: TitanGraph
def evaluate(): GremlinQueryResult = ??? def pathExpr : Expressions.Expression = {
closureRelation.tail.foldLeft(closureRelation.head.toExpr)((b,a) => b.field(a.toFieldName))
}
/**
 * Builds the select-list expressions for the instance bound to `alias`.
 *
 * When `selectAttributes` is defined, one expression is produced per attribute: the field
 * access `alias.attribute`, aliased as `<alias>_<attribute>`. When it is not defined, the
 * result is a single expression referring to `alias` itself.
 *
 * @param alias the alias of the instance to project from
 * @return the expressions to pass to `select` for this alias
 */
def selectExpr(alias : String) : List[Expression] = {
  selectAttributes match {
    case Some(attributes) =>
      attributes.map { attribute =>
        id(alias).field(attribute).as(s"${alias}_$attribute")
      }
    case None =>
      List(id(alias))
  }
}
/**
 * Hook that allows a filter to be added for the closureType.
 *
 * It is applied to the closure type's class expression before that expression is aliased
 * as "src". This default implementation adds no filter.
 *
 * @param expr the expression to which a filter may be added
 * @return the (possibly filtered) expression; here, `expr` unchanged
 */
def srcCondition(expr : Expression) : Expression = {
  // Default: no additional condition.
  expr
}
/**
 * Assembles the expression for this closure query.
 *
 * The class expression for `closureType` is passed through `srcCondition` and aliased as
 * "src"; `pathExpr` is applied via `loop` and the result aliased as "dest"; finally the
 * projections for "src" and "dest" (see `selectExpr`) are selected. When `withPath` is set,
 * `.path` is applied to the resulting expression.
 *
 * @return the assembled query expression
 */
def expr : Expressions.Expression = {
  val source = srcCondition(Expressions._class(closureType)).as("src")
  val closure = source.loop(pathExpr).as("dest")
  val projections = selectExpr("src") ++ selectExpr("dest")
  val selected = closure.select(projections:_*)
  if (withPath) {
    selected.path
  } else {
    selected
  }
}
/**
 * Evaluates this closure query.
 *
 * Builds the query expression via `expr` and hands it, together with the graph `g` and the
 * `persistenceStrategy`, to `QueryProcessor.evaluate`.
 *
 * @return the result returned by `QueryProcessor.evaluate`
 */
def evaluate(): GremlinQueryResult = {
  // The expression is never reassigned, so it is bound immutably.
  val e = expr
  QueryProcessor.evaluate(e, g, persistenceStrategy)
}
}
/**
* Closure for a single instance. Instance is specified by an ''attributeToSelectInstance'' and the value
* for the attribute.
*
* @tparam T the type of the value of the attribute used to select the instance
*/
trait SingleInstanceClosureQuery[T] extends ClosureQuery {
/** Name of the attribute that is compared against `instanceValue` in `srcCondition`. */
def attributeToSelectInstance : String
/** Primitive type used to build the literal for `instanceValue` in `srcCondition`. */
def attributeTyp : PrimitiveType[T]
/** Value that `attributeToSelectInstance` must equal in `srcCondition`. */
def instanceValue : T
/**
 * Adds a `where` condition to `expr` requiring `attributeToSelectInstance` to equal
 * `instanceValue` (as a literal of type `attributeTyp`).
 *
 * @param expr the expression to which the condition is added
 * @return `expr` with the equality condition applied via `where`
 */
override def srcCondition(expr : Expression) : Expression = {
  val attribute = Expressions.id(attributeToSelectInstance)
  val expected = Expressions.literal(attributeTyp, instanceValue)
  val matchesInstance = attribute.`=`(expected)
  expr.where(matchesInstance)
}
} }
/** /**
...@@ -115,6 +175,7 @@ trait ClosureQuery { ...@@ -115,6 +175,7 @@ trait ClosureQuery {
* @param g as needed to evaluate the Closure Query. * @param g as needed to evaluate the Closure Query.
*/ */
case class HiveLineageQuery(tableTypeName : String, case class HiveLineageQuery(tableTypeName : String,
tableName : String,
ctasTypeName : String, ctasTypeName : String,
ctasInputTableAttribute : String, ctasInputTableAttribute : String,
ctasOutputTableAttribute : String, ctasOutputTableAttribute : String,
...@@ -123,11 +184,16 @@ case class HiveLineageQuery(tableTypeName : String, ...@@ -123,11 +184,16 @@ case class HiveLineageQuery(tableTypeName : String,
withPath : Boolean, withPath : Boolean,
persistenceStrategy: GraphPersistenceStrategies, persistenceStrategy: GraphPersistenceStrategies,
g: TitanGraph g: TitanGraph
) extends ClosureQuery { ) extends SingleInstanceClosureQuery[String] {
def closureType : String = tableTypeName val closureType : String = tableTypeName
def closureRelation(attributePath : List[PathAttribute]) = List( val attributeToSelectInstance = "name"
val attributeTyp = DataTypes.STRING_TYPE
val instanceValue = tableName
lazy val closureRelation = List(
ReverseRelation(ctasTypeName, ctasOutputTableAttribute), ReverseRelation(ctasTypeName, ctasOutputTableAttribute),
Relation(ctasInputTableAttribute) Relation(ctasInputTableAttribute)
) )
...@@ -149,6 +215,7 @@ case class HiveLineageQuery(tableTypeName : String, ...@@ -149,6 +215,7 @@ case class HiveLineageQuery(tableTypeName : String,
* @param g as needed to evaluate the Closure Query. * @param g as needed to evaluate the Closure Query.
*/ */
case class HiveWhereUsedQuery(tableTypeName : String, case class HiveWhereUsedQuery(tableTypeName : String,
tableName : String,
ctasTypeName : String, ctasTypeName : String,
ctasInputTableAttribute : String, ctasInputTableAttribute : String,
ctasOutputTableAttribute : String, ctasOutputTableAttribute : String,
...@@ -157,11 +224,16 @@ case class HiveWhereUsedQuery(tableTypeName : String, ...@@ -157,11 +224,16 @@ case class HiveWhereUsedQuery(tableTypeName : String,
withPath : Boolean, withPath : Boolean,
persistenceStrategy: GraphPersistenceStrategies, persistenceStrategy: GraphPersistenceStrategies,
g: TitanGraph g: TitanGraph
) extends ClosureQuery { ) extends SingleInstanceClosureQuery[String] {
val closureType : String = tableTypeName
val attributeToSelectInstance = "name"
val attributeTyp = DataTypes.STRING_TYPE
def closureType : String = tableTypeName val instanceValue = tableName
def closureRelation(attributePath : List[PathAttribute]) = List( lazy val closureRelation = List(
ReverseRelation(ctasTypeName, ctasInputTableAttribute), ReverseRelation(ctasTypeName, ctasInputTableAttribute),
Relation(ctasOutputTableAttribute) Relation(ctasOutputTableAttribute)
) )
......
...@@ -23,15 +23,15 @@ import org.apache.hadoop.metadata.query.Expressions._ ...@@ -23,15 +23,15 @@ import org.apache.hadoop.metadata.query.Expressions._
object QueryProcessor { object QueryProcessor {
def evaluate(e: Expression, g: TitanGraph): GremlinQueryResult = { def evaluate(e: Expression, g: TitanGraph, gP : GraphPersistenceStrategies = GraphPersistenceStrategy1):
val gP = GraphPersistenceStrategy1 GremlinQueryResult = {
val e1 = validate(e) val e1 = validate(e)
val q = new GremlinTranslator(e1, gP).translate() val q = new GremlinTranslator(e1, gP).translate()
// println("---------------------") // println("---------------------")
// println("Query: " + e1) // println("Query: " + e1)
// println("Expression Tree:\n" + e1.treeString) // println("Expression Tree:\n" + e1.treeString)
// println("Gremlin Query: " + q.queryStr) // println("Gremlin Query: " + q.queryStr)
// println("---------------------") // println("---------------------")
new GremlinEvaluator(q, gP, g).evaluate() new GremlinEvaluator(q, gP, g).evaluate()
} }
......
...@@ -86,4 +86,22 @@ class GremlinTest2 extends FunSuite with BeforeAndAfterAll with BaseGremlinTest ...@@ -86,4 +86,22 @@ class GremlinTest2 extends FunSuite with BeforeAndAfterAll with BaseGremlinTest
validateJson(r) validateJson(r)
} }
// test("testLineageAllSelectWithPathFromParser2") {
// val p = new QueryParser
// val e = p("Table as src loop (LoadProcess inputTables) as dest " +
// "select src.name as srcTable, dest.name as destTable withPath").right.get
// //Table as src loop (LoadProcess where LoadProcess.outputTable) as dest select src.name as srcTable, dest.name as destTable withPath
// val r = QueryProcessor.evaluate(e, g)
// validateJson(r)
// }
//
// test("testHighLevelLineage") {
// val r = HiveLineageQuery("Table", "sales_daily_mv",
// "LoadProcess",
// "inputTables",
// "outputTable",
// None, Some(List("name")), true, GraphPersistenceStrategy1, g).evaluate()
// validateJson(r)
// }
} }
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment