Commit 06ade6c4 by Harish Butani

Expressions initial checkin

parent 657b74a4
#+TITLE: Query DSL
#+AUTHOR: Harish Butani
#+EMAIL: hbutani@apache.org
#+LANGUAGE: en
#+INFOJS_OPT: view:showall toc:t ltoc:t mouse:underline path:http://orgmode.org/org-info.js
#+LINK_HOME: http://home.fnal.gov/~neilsen
#+LINK_UP: http://home.fnal.gov/~neilsen/notebook
#+HTML_HEAD: <link rel="stylesheet" type="text/css" href="http://orgmode.org/org-manual.css" />
#+LaTeX_CLASS: smarticle
#+LaTeX_HEADER: \pdfmapfile{/home/neilsen/texmf/fonts/map/dvips/libertine/libertine.map}
#+LaTeX_HEADER: \usepackage[ttscale=.875]{libertine}
#+LaTeX_HEADER: \usepackage{sectsty}
#+LaTeX_HEADER: \sectionfont{\normalfont\scshape}
#+LaTeX_HEADER: \subsectionfont{\normalfont\itshape}
#+EXPORT_SELECT_TAGS: export
#+EXPORT_EXCLUDE_TAGS: noexport
#+OPTIONS: H:2 num:nil toc:nil \n:nil @:t ::t |:t ^:{} _:{} *:t TeX:t LaTeX:t
#+STARTUP: showall
#+OPTIONS: html-postamble:nil
** Example Type Definitions
#+begin_src plantuml :file class_diagram.png
scale 1300 width
note left of Trait : traits are classifications/tags attached to Instances
class Trait
Trait <|-- JDbcAccess
Trait <|-- PII
Trait <|-- Dimension
Trait <|-- Metric
Trait <|-- ETL
class Object
Object --* Trait : traits >
Object <|-- DB
Object <|-- Table
Object <|-- Column
class DB {
name : String
owner : String
}
class StorageDescriptor {
inputFormat : String
outputFormat : String
}
class Column {
name : String
dataType : String
}
class Table {
name: String
db: DB
}
Table -> StorageDescriptor : storageDesc >
Table -> DB : db >
Column *-> StorageDescriptor : storageDesc >
class LoadProcess {
name String
}
LoadProcess -* Table : inputTables >
LoadProcess -> Table : outputTable >
class View {
name String
}
View -* Table : inputTables >
#+end_src
#+CAPTION: ETL and Reporting Scenario Types
#+LABEL: fig:sampleTypeDefs
#+results:
[[file:class_diagram.png]]
** Example Instance Graph
#+begin_src dot :file instanceGraph.png :cmdline -Kdot -Tpng
digraph G {
//size ="6 6";
nodesep=.2;
//rankdir=LR;
ranksep=.25;
node [shape=record fontsize=9];
compound=true;
subgraph cluster0 {
style=bold;
label = "Sales Database"; fontsize=18;
salesDB[label="DB(sales)"]
salesFact[label="Table(sales_fact)" style=filled; color="khaki"]
salesStorage[label="Storage(text,text)"]
sales_time_id[label="time_id" shape="circle" style=filled color="peachpuff"]
sales_product_id[label="product_id" shape="circle" style=filled color="peachpuff"]
sales_customer_id[label="customer_id" shape="circle" style=filled color="peachpuff"]
sales_sales[label="sales" shape="circle" style=filled color="peachpuff"]
sales_sales_metric[label="Metric" style=filled; shape="ellipse" color="turquoise"]
salesFact -> salesDB;
salesFact -> salesStorage;
sales_time_id -> salesStorage;
sales_product_id -> salesStorage;
sales_customer_id -> salesStorage;
sales_sales -> salesStorage;
sales_sales -> sales_sales_metric;
productDim[label="Table(product_dim)" style=filled; color="khaki"]
productStorage[label="Storage(text,text)"]
product_product_id[label="product_id" shape="circle" style=filled color="peachpuff"]
product_product_name[label="product_name" shape="circle" style=filled color="peachpuff"]
product_brand_name[label="brand_name" shape="circle" style=filled color="peachpuff"]
product_dimension[label="Dimension" style=filled; shape="ellipse" color="turquoise"]
productDim -> salesDB;
productDim -> productStorage;
product_product_id -> productStorage;
product_product_name -> productStorage;
product_brand_name -> productStorage;
productDim -> product_dimension;
productDim -> salesFact [style=invis];
timeDim[label="Table(time_dim)" style=filled; color="khaki"]
timeStorage[label="Storage(text,text)"]
time_time_id[label="time_id" shape="circle" style=filled color="peachpuff"]
time_dayOfYear[label="day_of_year" shape="circle" style=filled color="peachpuff"]
time_weekDay[label="week_day" shape="circle" style=filled color="peachpuff"]
time_dimension[label="Dimension" style=filled; shape="ellipse" color="turquoise"]
timeDim -> salesDB;
timeDim -> timeStorage;
time_time_id -> timeStorage;
time_dayOfYear -> timeStorage;
time_weekDay -> timeStorage;
timeDim -> time_dimension;
timeDim -> productDim [style=invis];
customerDim[label="Table(customer_dim)" style=filled; color="khaki"]
customerStorage[label="Storage(text,text)"]
customer_customer_id[label="customer_id" shape="circle" style=filled color="peachpuff"]
customer_name[label="name" shape="circle" style=filled color="peachpuff"]
customer_address[label="address" shape="circle" style=filled color="peachpuff"]
customer_dimension[label="Dimension" style=filled; shape="ellipse" color="turquoise"]
address_pii[label="PII" style=filled; shape="ellipse" color="turquoise"]
customerDim -> salesDB;
customerDim -> customerStorage;
customer_customer_id -> customerStorage;
customer_name -> customerStorage;
customer_address -> customerStorage;
customerDim -> customer_dimension;
customer_address -> address_pii;
customerDim -> timeDim [style=invis];
//{rank=min; salesDB};
{rank=min; salesDB};
};
subgraph cluster1 {
style=bold;
label = "Reporting Database"; fontsize=18;
reportingDB[label="DB(reporting)"]
salesFactDaily[label="Table(sales_daily_mv)" style=filled; color="khaki"]
salesDailyStorage[label="Storage(orc,orc)"]
salesD_time_id[label="time_id" shape="circle" style=filled color="peachpuff"]
salesD_product_id[label="product_id" shape="circle" style=filled color="peachpuff"]
salesD_customer_id[label="customer_id" shape="circle" style=filled color="peachpuff"]
salesD_sales[label="sales" shape="circle" style=filled color="peachpuff"]
salesD_sales_metric[label="Metric" style=filled; shape="ellipse" color="turquoise"]
salesFactDaily -> reportingDB;
salesFactDaily -> salesDailyStorage;
salesD_time_id -> salesDailyStorage;
salesD_product_id -> salesDailyStorage;
salesD_customer_id -> salesDailyStorage;
salesD_sales -> salesDailyStorage;
salesD_sales -> salesD_sales_metric;
salesFactDaily -> reportingDB [style=invis];
productDimView[label="View(product_dim_v)" style=filled; color="khaki"]
productDim -> productDimView [style=dotted];
productDimView_dim[label="Dimension" style=filled; shape="ellipse" color="turquoise"]
productDimView_jdbc[label="JdbcAccess" style=filled; shape="ellipse" color="turquoise"]
productDimView -> productDimView_dim;
productDimView -> productDimView_jdbc;
productDimView -> salesFactDaily [style=invis];
customerDimView[label="View(customer_dim_v)" style=filled; color="khaki"]
customerDim -> customerDimView [style=dotted];
customerDimView_dim[label="Dimension" style=filled; shape="ellipse" color="turquoise"]
customerDimView_jdbc[label="JdbcAccess" style=filled; shape="ellipse" color="turquoise"]
customerDimView -> customerDimView_dim;
customerDimView -> customerDimView_jdbc;
customerDimView -> salesFactDaily [style=invis];
salesMonthly[label="Table(sales_monthly_mv)" style=filled; color="khaki"]
salesMonthlyStorage[label="Storage(orc,orc)"]
salesM_time_id[label="time_id" shape="circle" style=filled color="peachpuff"]
salesM_product_id[label="product_id" shape="circle" style=filled color="peachpuff"]
salesM_customer_id[label="customer_id" shape="circle" style=filled color="peachpuff"]
salesM_sales[label="sales" shape="circle" style=filled color="peachpuff"]
salesM_sales_metric[label="Metric" style=filled; shape="ellipse" color="turquoise"]
salesMonthly -> reportingDB;
salesMonthly -> salesMonthlyStorage;
salesM_time_id -> salesMonthlyStorage;
salesM_product_id -> salesMonthlyStorage;
salesM_customer_id -> salesMonthlyStorage;
salesM_sales -> salesMonthlyStorage;
salesM_sales -> salesM_sales_metric;
salesMonthly -> customerDimView [style=invis];
{rank=min; reportingDB};
};
loadSalesDaily[label="LoadProcess(loadSalesDaily)" style=filled; color="seagreen"; shape="octagon"]
loadSalesDaily_etl[label="ETL" style=filled; shape="ellipse" color="turquoise"]
salesFact -> loadSalesDaily [style=dotted];
timeDim -> loadSalesDaily [style=dotted];
loadSalesDaily -> salesFactDaily [style=dotted];
loadSalesDaily -> loadSalesDaily_etl;
loadSalesMonthly[label="LoadProcess(loadSalesMonthly)" style=filled; color="seagreen"; shape="octagon"]
loadSalesMonthly_etl[label="ETL" style=filled; shape="ellipse" color="turquoise"]
salesFactDaily -> loadSalesMonthly [style=dotted];
timeDim -> loadSalesMonthly [style=dotted];
loadSalesMonthly -> salesMonthly [style=dotted];
loadSalesMonthly -> loadSalesMonthly_etl;
}
#+end_src
#+CAPTION: ETL and Reporting Scenario
#+LABEL: fig:sampleInstanceGraph
#+results:
[[file:instanceGraph.png]]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.metadata.query
import org.apache.hadoop.metadata.MetadataException
import org.apache.hadoop.metadata.types.DataTypes.PrimitiveType
import org.apache.hadoop.metadata.types._
object Expressions {
import TypeUtils._
class ExpressionException(val e: Expression, message: String, cause: Throwable, enableSuppression: Boolean,
writableStackTrace: Boolean)
extends MetadataException(message, cause, enableSuppression, writableStackTrace) {
def this(e: Expression, message: String) {
this(e, message, null, false, false)
}
def this(e: Expression, message: String, cause: Throwable) {
this(e, message, cause, false, false)
}
def this(e: Expression, cause: Throwable) {
this(e, null, cause, false, false)
}
override def getMessage: String = {
val eString = e.toString
s"${super.getMessage}, expression:${if (eString contains "\n") "\n" else " "}$e"
}
}
class UnresolvedException(expr: Expression, function: String) extends
ExpressionException(expr, s"Unresolved $function")
def attachExpression[A](e: Expression, msg: String = "")(f: => A): A = {
try f catch {
case eex: ExpressionException => throw eex
case ex: Exception => throw new ExpressionException(e, msg, ex)
}
}
trait Expression {
self: Product =>
def children: Seq[Expression]
/**
* Returns `true` if the schema for this expression and all its children have been resolved.
* The default logic is that an Expression is resolve if all its children are resolved.
*/
lazy val resolved: Boolean = childrenResolved
/**
* Returns the output [[IDataType[_]] of this expression. Expressions that are unresolved will
* throw if this method is invoked.
*/
def dataType: IDataType[_]
/**
* Returns true if all the children have been resolved.
*/
def childrenResolved = !children.exists(!_.resolved)
/**
* the aliases that are present in this Expression Tree
*/
def namedExpressions: Map[String, Expression] = Map()
def fastEquals(other: Expression): Boolean = {
this.eq(other) || this == other
}
def makeCopy(newArgs: Array[AnyRef]): this.type = attachExpression(this, "makeCopy") {
try {
val defaultCtor = getClass.getConstructors.find(_.getParameterTypes.size != 0).head
defaultCtor.newInstance(newArgs: _*).asInstanceOf[this.type]
} catch {
case e: java.lang.IllegalArgumentException =>
throw new ExpressionException(
this, s"Failed to copy node. Reason: ${e.getMessage}.")
}
}
def transformChildrenDown(rule: PartialFunction[Expression, Expression]): this.type = {
var changed = false
val newArgs = productIterator.map {
case arg: Expression if children contains arg =>
val newChild = arg.asInstanceOf[Expression].transformDown(rule)
if (!(newChild fastEquals arg)) {
changed = true
newChild
} else {
arg
}
case Some(arg: Expression) if children contains arg =>
val newChild = arg.asInstanceOf[Expression].transformDown(rule)
if (!(newChild fastEquals arg)) {
changed = true
Some(newChild)
} else {
Some(arg)
}
case m: Map[_, _] => m
case args: Traversable[_] => args.map {
case arg: Expression if children contains arg =>
val newChild = arg.asInstanceOf[Expression].transformDown(rule)
if (!(newChild fastEquals arg)) {
changed = true
newChild
} else {
arg
}
case other => other
}
case nonChild: AnyRef => nonChild
case null => null
}.toArray
if (changed) makeCopy(newArgs) else this
}
def transformDown(rule: PartialFunction[Expression, Expression]): Expression = {
val afterRule = rule.applyOrElse(this, identity[Expression])
// Check if unchanged and then possibly return old copy to avoid gc churn.
if (this fastEquals afterRule) {
transformChildrenDown(rule)
} else {
afterRule.transformChildrenDown(rule)
}
}
def traverseChildren(traverseFunc: (Expression, PartialFunction[Expression, Unit]) => Unit)
(rule: PartialFunction[Expression, Unit]): Unit = {
productIterator.foreach {
case arg: Expression if children contains arg =>
traverseFunc(arg.asInstanceOf[Expression], rule)
case Some(arg: Expression) if children contains arg =>
traverseFunc(arg.asInstanceOf[Expression], rule)
case m: Map[_, _] => m
case args: Traversable[_] => args.map {
case arg: Expression if children contains arg =>
traverseFunc(arg.asInstanceOf[Expression], rule)
case other => other
}
case nonChild: AnyRef => nonChild
case null => null
}
}
def traverseChildrenDown = traverseChildren(_traverseDown) _
private def _traverseDown(e: Expression, rule: PartialFunction[Expression, Unit]): Unit = {
if (rule.isDefinedAt(e)) {
rule.apply(e)
}
e.traverseChildrenDown(rule)
}
def traverseDown(rule: PartialFunction[Expression, Unit]): Unit = {
_traverseDown(this, rule)
}
def traverseChildrenUp = traverseChildren(_traverseUp) _
private def _traverseUp(e: Expression, rule: PartialFunction[Expression, Unit]): Unit = {
e.traverseChildrenUp(rule)
if (rule.isDefinedAt(e)) {
rule.apply(e)
}
}
def traverseUp(rule: PartialFunction[Expression, Unit]): Unit = {
_traverseUp(this, rule)
}
def transformUp(rule: PartialFunction[Expression, Expression]): Expression = {
val afterRuleOnChildren = transformChildrenUp(rule);
if (this fastEquals afterRuleOnChildren) {
rule.applyOrElse(this, identity[Expression])
} else {
rule.applyOrElse(afterRuleOnChildren, identity[Expression])
}
}
def transformChildrenUp(rule: PartialFunction[Expression, Expression]): this.type = {
var changed = false
val newArgs = productIterator.map {
case arg: Expression if children contains arg =>
val newChild = arg.asInstanceOf[Expression].transformUp(rule)
if (!(newChild fastEquals arg)) {
changed = true
newChild
} else {
arg
}
case Some(arg: Expression) if children contains arg =>
val newChild = arg.asInstanceOf[Expression].transformUp(rule)
if (!(newChild fastEquals arg)) {
changed = true
Some(newChild)
} else {
Some(arg)
}
case m: Map[_, _] => m
case args: Traversable[_] => args.map {
case arg: Expression if children contains arg =>
val newChild = arg.asInstanceOf[Expression].transformUp(rule)
if (!(newChild fastEquals arg)) {
changed = true
newChild
} else {
arg
}
case other => other
}
case nonChild: AnyRef => nonChild
case null => null
}.toArray
if (changed) makeCopy(newArgs) else this
}
/*
* Fluent API methods
*/
def field(fieldName : String) = new UnresolvedFieldExpression(this, fieldName)
def as(alias : String) = new AliasExpression(this, alias)
def arith(op : String)(rightExpr : Expression) = new ArithmeticExpression(op, this, rightExpr)
def + = arith("+")_
def - = arith("-")_
def * = arith("*")_
def / = arith("/")_
def % = arith("%")_
def isTrait(name : String) = new isTraitUnaryExpression(name, this)
def hasField(name : String) = new hasFieldUnaryExpression(name, this)
def compareOp(op : String)(rightExpr : Expression) = new ComparisonExpression(op, this, rightExpr)
def `=` = compareOp("=")_
def `!=` = compareOp("!=")_
def `>` = compareOp(">")_
def `>=` = compareOp(">=")_
def `<` = compareOp("<")_
def `<=` = compareOp("=")_
def logicalOp(op : String)(rightExpr : Expression) = new LogicalExpression(op, List(this, rightExpr))
def and = logicalOp("and")_
def or = logicalOp("or")_
def where(condExpr: Expression) = new FilterExpression(this, condExpr)
def select(selectList: Expression*) = new SelectExpression(this, selectList.toList)
}
trait BinaryNode {
def left: Expression
def right: Expression
def children = Seq(left, right)
}
trait LeafNode {
def children = Nil
}
trait UnaryNode {
def child: Expression
def children = child :: Nil
}
abstract class BinaryExpression extends Expression with BinaryNode {
self: Product =>
def symbol: String
override def toString = s"($left $symbol $right)"
}
case class ClassExpression(clsName: String) extends Expression with LeafNode {
val dataType = typSystem.getDataType(classOf[ClassType], clsName)
override def toString = clsName
}
def _class(name: String): Expression = new ClassExpression(name)
case class TraitExpression(traitName: String) extends Expression with LeafNode {
val dataType = typSystem.getDataType(classOf[TraitType], traitName)
override def toString = traitName
}
def _trait(name: String) = new TraitExpression(name)
case class IdExpression(name: String) extends Expression with LeafNode {
override def toString = name
override lazy val resolved = false
override def dataType = throw new UnresolvedException(this, "id")
}
def id(name: String) = new IdExpression(name)
case class UnresolvedFieldExpression(child: Expression, fieldName: String) extends Expression
with UnaryNode {
override def toString = s"${child}.$fieldName"
override lazy val resolved = false
override def dataType = throw new UnresolvedException(this, "field")
}
case class FieldExpression(fieldName: String, fieldInfo: FieldInfo, child: Option[Expression])
extends Expression {
val children = if (child.isDefined) List(child.get) else Nil
lazy val dataType = fieldInfo.attrInfo.dataType()
override lazy val resolved: Boolean = true
override def toString = {
if (child.isDefined) {
val sep = if (dataType.isInstanceOf[ClassType]) " " else ","
s"${child.get}${sep}$fieldName"
} else {
fieldName
}
}
}
case class AliasExpression(child: Expression, alias: String) extends Expression with UnaryNode {
override def namedExpressions = child.namedExpressions + (alias -> child)
override def toString = s"$child as $alias"
lazy val dataType = {
if (!resolved) {
throw new UnresolvedException(this,
s"datatype. Can not resolve due to unresolved child")
}
child.dataType
}
}
case class BackReference(alias: String, reference: Expression, child: Option[Expression]) extends Expression {
val children = if (child.isDefined) List(child.get) else Nil
val dataType = reference.dataType
override def toString = if (child.isDefined) s"${child.get} $alias" else alias
}
case class Literal[T](dataType: PrimitiveType[T], rawValue: Any) extends Expression with LeafNode {
val value = if (rawValue == null) dataType.nullValue() else dataType.convert(rawValue, Multiplicity.REQUIRED)
override def toString = value match {
case s: String => s""""$s""""
case x => x.toString
}
}
def literal[T](typ: PrimitiveType[T], rawValue: Any) = new Literal[T](typ, rawValue)
def boolean(rawValue: Any) = literal(DataTypes.BOOLEAN_TYPE, rawValue)
def byte(rawValue: Any) = literal(DataTypes.BYTE_TYPE, rawValue)
def short(rawValue: Any) = literal(DataTypes.SHORT_TYPE, rawValue)
def int(rawValue: Any) = literal(DataTypes.INT_TYPE, rawValue)
def long(rawValue: Any) = literal(DataTypes.LONG_TYPE, rawValue)
def float(rawValue: Any) = literal(DataTypes.FLOAT_TYPE, rawValue)
def double(rawValue: Any) = literal(DataTypes.DOUBLE_TYPE, rawValue)
def bigint(rawValue: Any) = literal(DataTypes.BIGINTEGER_TYPE, rawValue)
def bigdecimal(rawValue: Any) = literal(DataTypes.BIGDECIMAL_TYPE, rawValue)
def string(rawValue: Any) = literal(DataTypes.STRING_TYPE, rawValue)
case class ArithmeticExpression(symbol: String,
left: Expression,
right: Expression)
extends BinaryExpression {
lazy val dataType = {
if (!resolved) {
throw new UnresolvedException(this,
s"datatype. Can not resolve due to unresolved children")
}
TypeUtils.combinedType(left.dataType, right.dataType)
}
}
case class isTraitLeafExpression(traitName: String, classExpression: Option[Expression] = None)
extends Expression with LeafNode {
// validate TraitName
try {
typSystem.getDataType(classOf[TraitType], traitName)
} catch {
case me : MetadataException => throw new ExpressionException(this, "not a TraitType", me)
}
override lazy val resolved = classExpression.isDefined
lazy val dataType = {
if (!resolved) {
throw new UnresolvedException(this,
s"cannot resolve isTrait application")
}
if ( !classExpression.get.dataType.isInstanceOf[ClassType]) {
throw new ExpressionException(this,
s"Cannot apply isTrait on ${classExpression.get.dataType.getName}, it is not a ClassType")
}
DataTypes.BOOLEAN_TYPE
}
override def toString = s"${classExpression.getOrElse("")} is $traitName"
}
def isTrait(name: String) = new isTraitLeafExpression(name)
case class isTraitUnaryExpression(traitName: String, child: Expression)
extends Expression with UnaryNode {
// validate TraitName
typSystem.getDataType(classOf[ClassType], traitName)
lazy val dataType = {
if (!resolved) {
throw new UnresolvedException(this,
s"datatype. Can not resolve due to unresolved child")
}
if ( !child.dataType.isInstanceOf[ClassType]) {
throw new ExpressionException(this,
s"Cannot apply isTrait on ${child.dataType.getName}, it is not a ClassType")
}
DataTypes.BOOLEAN_TYPE
}
override def toString = s"$child is $traitName"
}
case class hasFieldLeafExpression(fieldName: String, classExpression: Option[Expression] = None)
extends Expression with LeafNode {
override lazy val resolved = classExpression.isDefined
lazy val dataType = {
if (!resolved) {
throw new UnresolvedException(this,
s"Cannot apply hasField on ${classExpression.get.dataType.getName}, it is not a ClassType")
}
if (classExpression.isDefined && !TypeUtils.fieldMapping(classExpression.get.dataType).isDefined) {
throw new ExpressionException(this, s"Cannot apply hasField on ${classExpression.get.dataType.getName}")
}
DataTypes.BOOLEAN_TYPE
}
override def toString = s"${classExpression.getOrElse("")} has $fieldName"
}
def hasField(name: String) = new hasFieldLeafExpression(name)
case class hasFieldUnaryExpression(fieldName: String, child: Expression)
extends Expression with UnaryNode {
lazy val dataType = {
if (!resolved) {
throw new UnresolvedException(this,
s"datatype. Can not resolve due to unresolved child")
}
if (!TypeUtils.fieldMapping(child.dataType).isDefined) {
throw new MetadataException(s"Cannot apply hasField on ${child.dataType.getName}")
}
DataTypes.BOOLEAN_TYPE
}
override def toString = s"$child has $fieldName"
}
case class ComparisonExpression(symbol: String,
left: Expression,
right: Expression)
extends BinaryExpression {
lazy val dataType = {
if (!resolved) {
throw new UnresolvedException(this,
s"datatype. Can not resolve due to unresolved children")
}
if (left.dataType != DataTypes.STRING_TYPE || right.dataType != DataTypes.STRING_TYPE) {
TypeUtils.combinedType(left.dataType, right.dataType)
}
DataTypes.BOOLEAN_TYPE
}
}
case class LogicalExpression(symbol: String, children: List[Expression])
extends Expression {
assert(children.size > 0)
lazy val dataType = {
if (!resolved) {
throw new UnresolvedException(this,
s"datatype. Can not resolve due to unresolved children")
}
children.foreach { childExpr =>
if (childExpr.dataType != DataTypes.BOOLEAN_TYPE) {
throw new MetadataException(
s"Cannot apply logical operator '$symbol' on input of type '${childExpr.dataType}")
}
}
DataTypes.BOOLEAN_TYPE
}
override def toString = children.mkString("", " and ", "")
}
case class FilterExpression(val child: Expression, val condExpr: Expression) extends Expression {
val children = List(child, condExpr)
lazy val dataType = {
if (!resolved) {
throw new UnresolvedException(this,
s"datatype. Can not resolve due to unresolved children")
}
if (condExpr.dataType != DataTypes.BOOLEAN_TYPE) {
throw new ExpressionException(this, s"Filter condition '$condExpr' is not a boolean expression")
}
child.dataType
}
override def toString = s"$child where $condExpr"
}
val GEN_COL_ALIAS_PREFIX = "_col"
case class SelectExpression(child: Expression, selectList: List[Expression]) extends Expression {
val children = List(child) ::: selectList
lazy val selectListWithAlias = selectList.zipWithIndex map {
case (s: AliasExpression, _) => s
case (x, i) => new AliasExpression(x, s"${GEN_COL_ALIAS_PREFIX}_$i")
}
lazy val dataType = {
if (!resolved) {
throw new UnresolvedException(this,
s"datatype. Can not resolve due to unresolved children")
}
TypeUtils.createStructType(selectListWithAlias)
}
override def toString = s"""$child select ${selectListWithAlias.mkString("", ", ", "")}"""
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.metadata.query
import Expressions._
object QueryProcessor {
def validate(e : Expression) : Expression = {
val e1 = e.transformUp(new Resolver())
e1.traverseUp {
case x : Expression if !x.resolved =>
throw new ExpressionException(x, s"Failed to resolved expression $x")
}
/*
* trigger computation of dataType of expression tree
*/
e1.dataType
e1
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.metadata.query
import Expressions._
class Resolver(srcExpr : Option[Expression] = None, aliases : Map[String, Expression] = Map())
extends PartialFunction[Expression, Expression] {
import TypeUtils._
def isDefinedAt(x: Expression) = true
def apply(e : Expression) : Expression = e match {
case idE@IdExpression(name) => {
val cType = resolveAsClassType(name)
if (cType.isDefined) {
return new ClassExpression(name)
}
val tType = resolveAsTraitType(name)
if (tType.isDefined) {
return new TraitExpression(name)
}
if (srcExpr.isDefined ) {
val fInfo = resolveReference(srcExpr.get.dataType, name)
if ( fInfo.isDefined) {
return new FieldExpression(name, fInfo.get, None)
}
}
val backExpr = aliases.get(name)
if ( backExpr.isDefined) {
return new BackReference(name, backExpr.get, None)
}
idE
}
case f@UnresolvedFieldExpression(child, fieldName) if child.resolved => {
var fInfo : Option[FieldInfo] = None
fInfo = resolveReference(child.dataType, fieldName)
if ( fInfo.isDefined) {
return new FieldExpression(fieldName, fInfo.get, Some(child))
}
f
}
case isTraitLeafExpression(traitName, classExpression)
if srcExpr.isDefined && !classExpression.isDefined =>
isTraitLeafExpression(traitName, srcExpr)
case hasFieldLeafExpression(traitName, classExpression)
if srcExpr.isDefined && !classExpression.isDefined =>
hasFieldLeafExpression(traitName, srcExpr)
case f@FilterExpression(inputExpr, condExpr) if inputExpr.resolved => {
val r = new Resolver(Some(inputExpr), inputExpr.namedExpressions)
return new FilterExpression(inputExpr, condExpr.transformUp(r))
}
case SelectExpression(child, selectList) if child.resolved => {
val r = new Resolver(Some(child), child.namedExpressions)
return new SelectExpression(child, selectList.map{_.transformUp(r)})
}
case x => x
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.metadata.query
import java.util.concurrent.atomic.AtomicInteger
import org.apache.hadoop.metadata.MetadataException
import org.apache.hadoop.metadata.types.DataTypes.PrimitiveType
import org.apache.hadoop.metadata.types._
object TypeUtils {
val typSystem = TypeSystem.getInstance()
def numericTypes : Seq[PrimitiveType[_]] = Seq(DataTypes.BYTE_TYPE,
DataTypes.SHORT_TYPE,
DataTypes.INT_TYPE,
DataTypes.FLOAT_TYPE,
DataTypes.LONG_TYPE,
DataTypes.DOUBLE_TYPE,
DataTypes.BIGINTEGER_TYPE,
DataTypes.BIGDECIMAL_TYPE)
def combinedType(typ1 : IDataType[_], typ2 : IDataType[_]) : PrimitiveType[_] = {
val typ1Idx = if (numericTypes.contains(typ1)) Some(numericTypes.indexOf(typ1)) else None
val typ2Idx = if (numericTypes.contains(typ2)) Some(numericTypes.indexOf(typ2)) else None
if ( typ1Idx.isDefined && typ2Idx.isDefined ) {
val rIdx = math.max(typ1Idx.get, typ2Idx.get)
if ( (typ1 == DataTypes.FLOAT_TYPE && typ2 == DataTypes.LONG_TYPE) ||
(typ1 == DataTypes.LONG_TYPE && typ2 == DataTypes.FLOAT_TYPE) ) {
return DataTypes.DOUBLE_TYPE
}
return numericTypes(rIdx)
}
throw new MetadataException(s"Cannot combine types: ${typ1.getName} and ${typ2.getName}")
}
var tempStructCounter : AtomicInteger = new AtomicInteger(0)
val TEMP_STRUCT_NAME_PREFIX = "__tempQueryResultStruct"
def createStructType(selectExprs : List[Expressions.AliasExpression]) : StructType = {
val aDefs = new Array[AttributeDefinition](selectExprs.size)
selectExprs.zipWithIndex.foreach { t =>
val (e,i) = t
aDefs(i) = new AttributeDefinition(e.alias,e.dataType.getName, Multiplicity.OPTIONAL, false, null)
}
return typSystem.defineQueryResultType(s"${TEMP_STRUCT_NAME_PREFIX}${tempStructCounter.getAndIncrement}",
aDefs:_*);
}
def fieldMapping(iDataType: IDataType[_]) : Option[FieldMapping] = iDataType match {
case c : ClassType => Some(c.fieldMapping())
case t : TraitType => Some(t.fieldMapping())
case s : StructType => Some(s.fieldMapping())
case _ => None
}
import scala.language.existentials
case class FieldInfo(dataType : IDataType[_], attrInfo : AttributeInfo, reverseDataType : IDataType[_] = null) {
def isReverse = reverseDataType != null
}
/**
* Given a ComposedType `t` and a name resolve using the following rules:
* - if `id` is a field in `t` resolve to the field
* - if `id` is the name of a Struct|Class|Trait Type and it has a field that is of type `t` then return that type
*
* For e.g.
* 1. if we have types Table(name : String, cols : List[Column]), Column(name : String) then
* `resolveReference(Table, "cols")` resolves to type Column. So a query can be "Table.cols"
* 2. But if we have Table(name : String), Column(name : String, tbl : Table) then "Table.Column" will resolve
* to type Column
*
* This way the language will support navigation even if the relationship is one-sided.
*
* @param typ
* @param id
* @return
*/
def resolveReference(typ : IDataType[_], id : String) : Option[FieldInfo] = {
val fMap = fieldMapping(typ)
if ( fMap.isDefined) {
if (fMap.get.fields.containsKey(id)) {
return Some(FieldInfo(typ,fMap.get.fields.get(id)))
}
try {
val idTyp = typSystem.getDataType(classOf[IDataType[_]], id)
val idTypFMap = fieldMapping(idTyp)
if (idTypFMap.isDefined) {
import scala.collection.JavaConversions._
val fields: Seq[AttributeInfo] = idTypFMap.get.fields.values().filter { aInfo =>
aInfo.dataType() == typ
}.toSeq
if (fields.size == 1) {
return Some(FieldInfo(typ, fields(0), idTyp))
}
}
} catch {
case _ : MetadataException => None
}
}
None
}
def resolveAsClassType(id : String) : Option[ClassType] = {
try {
Some(typSystem.getDataType(classOf[ClassType], id))
} catch {
case _ : MetadataException => None
}
}
def resolveAsTraitType(id : String) : Option[TraitType] = {
try {
Some(typSystem.getDataType(classOf[TraitType], id))
} catch {
case _ : MetadataException => None
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.metadata.query
import com.google.common.collect.ImmutableList
import org.apache.hadoop.metadata.BaseTest
import org.apache.hadoop.metadata.types._
import org.junit.{Before, Test}
import Expressions._
class ExpressionTest extends BaseTest {
@Before
override def setup {
super.setup
def attrDef(name : String, dT : IDataType[_],
m : Multiplicity = Multiplicity.OPTIONAL,
isComposite: Boolean = false,
reverseAttributeName: String = null) = {
require(name != null)
require(dT != null)
new AttributeDefinition(name, dT.getName, m, isComposite, reverseAttributeName)
}
def dbClsDef = new HierarchicalTypeDefinition[ClassType](classOf[ClassType], "DB", null,
Array(
attrDef("name", DataTypes.STRING_TYPE),
attrDef("owner", DataTypes.STRING_TYPE)
))
def storageDescClsDef = new HierarchicalTypeDefinition[ClassType](classOf[ClassType], "StorageDesc", null,
Array(
attrDef("inputFormat", DataTypes.STRING_TYPE),
attrDef("outputFormat", DataTypes.STRING_TYPE)
))
def columnClsDef = new HierarchicalTypeDefinition[ClassType](classOf[ClassType], "Column", null,
Array(
attrDef("name", DataTypes.STRING_TYPE),
attrDef("dataType", DataTypes.STRING_TYPE),
new AttributeDefinition("sd", "StorageDesc", Multiplicity.REQUIRED, false, null)
))
def tblClsDef = new HierarchicalTypeDefinition[ClassType](classOf[ClassType], "Table", null,
Array(
attrDef("name", DataTypes.STRING_TYPE),
new AttributeDefinition("db", "DB", Multiplicity.REQUIRED, false, null),
new AttributeDefinition("sd", "StorageDesc", Multiplicity.REQUIRED, false, null)
))
def loadProcessClsDef = new HierarchicalTypeDefinition[ClassType](classOf[ClassType], "LoadProcess", null,
Array(
attrDef("name", DataTypes.STRING_TYPE),
new AttributeDefinition("inputTables", "Table", Multiplicity.COLLECTION, false, null),
new AttributeDefinition("outputTable", "Table", Multiplicity.REQUIRED, false, null)
))
def viewClsDef = new HierarchicalTypeDefinition[ClassType](classOf[ClassType], "View", null,
Array(
attrDef("name", DataTypes.STRING_TYPE),
new AttributeDefinition("inputTables", "Table", Multiplicity.COLLECTION, false, null)
))
def dimTraitDef = new HierarchicalTypeDefinition[TraitType](classOf[TraitType], "Dimension", null,
Array[AttributeDefinition]())
def piiTraitDef = new HierarchicalTypeDefinition[TraitType](classOf[TraitType], "PII", null,
Array[AttributeDefinition]())
def metricTraitDef = new HierarchicalTypeDefinition[TraitType](classOf[TraitType], "Metric", null,
Array[AttributeDefinition]())
def etlTraitDef = new HierarchicalTypeDefinition[TraitType](classOf[TraitType], "ETL", null,
Array[AttributeDefinition]())
def jdbcTraitDef = new HierarchicalTypeDefinition[TraitType](classOf[TraitType], "Jdbc", null,
Array[AttributeDefinition]())
getTypeSystem.defineTypes(ImmutableList.of[StructTypeDefinition],
ImmutableList.of[HierarchicalTypeDefinition[TraitType]](dimTraitDef, piiTraitDef,
metricTraitDef, etlTraitDef, jdbcTraitDef),
ImmutableList.of[HierarchicalTypeDefinition[ClassType]](dbClsDef, storageDescClsDef, columnClsDef, tblClsDef,
loadProcessClsDef, viewClsDef))
}
@Test def testClass: Unit = {
val e = QueryProcessor.validate(_class("DB"))
println(e)
}
@Test def testFilter: Unit = {
val e = QueryProcessor.validate(_class("DB").where(id("name").`=`(string("Reporting"))))
println(e)
}
@Test def testSelect: Unit = {
val e = QueryProcessor.validate(_class("DB").where(id("name").`=`(string("Reporting"))).
select(id("name"), id("owner")))
println(e)
}
@Test def testNegTypeTest: Unit = {
try {
val e = QueryProcessor.validate(_class("DB").where(id("name")))
println(e)
} catch {
case e : ExpressionException if e.getMessage.endsWith("expression: DB where name") => ()
}
}
@Test def testIsTrait: Unit = {
val e = QueryProcessor.validate(_class("DB").where(isTrait("Jdbc")))
println(e)
}
@Test def testIsTraitNegative: Unit = {
try {
val e = QueryProcessor.validate(_class("DB").where(isTrait("Jdb")))
println(e)
} catch {
case e : ExpressionException if e.getMessage.endsWith("not a TraitType, expression: is Jdb") => ()
}
}
@Test def testhasField: Unit = {
val e = QueryProcessor.validate(_class("DB").where(hasField("name")))
println(e)
}
@Test def testHasFieldNegative: Unit = {
try {
val e = QueryProcessor.validate(_class("DB").where(hasField("nam")))
println(e)
} catch {
case e : ExpressionException if e.getMessage.endsWith("not a TraitType, expression: is Jdb") => ()
}
}
@Test def testFieldReference: Unit = {
val e = QueryProcessor.validate(_class("DB").field("Table"))
println(e)
}
@Test def testBackReference: Unit = {
val e = QueryProcessor.validate(
_class("DB").as("db").field("Table")).where(id("db").field("name").`=`(string("Reporting")))
println(e)
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment