Commit a8ef4901 by Harish Butani

evaluator returns ITypedInstances: add support for resultsets whose type is TypeCategory.CLASS

parent 39c84bd6
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.metadata.query
import com.thinkaurelius.titan.core.TitanVertex
import com.tinkerpop.blueprints.Direction
import org.apache.hadoop.metadata.types.DataTypes._
import org.apache.hadoop.metadata.{ITypedInstance, ITypedReferenceableInstance}
import org.apache.hadoop.metadata.query.Expressions.{ExpressionException, ComparisonExpression}
import org.apache.hadoop.metadata.query.TypeUtils.FieldInfo
import org.apache.hadoop.metadata.storage.Id
import org.apache.hadoop.metadata.types._
/**
* Represents the Bridge between the QueryProcessor and the Graph Persistence scheme used.
* Some of the behaviors captured are:
* - how is type and id information stored in the Vertex that represents an [[ITypedReferenceableInstance]]
* - how are edges representing trait and attribute relationships labelled.
* - how are attribute names mapped to Property Keys in Vertices.
*
* This is a work in progress.
*/
trait GraphPersistenceStrategies {
/**
* Name of attribute used to store typeName in vertex
*/
def typeAttributeName : String
/**
* Given a dataType and a reference attribute, how is edge labeled
*/
def edgeLabel(iDataType: IDataType[_], aInfo : AttributeInfo) : String
def traitLabel(cls : IDataType[_], traitName : String) : String
/**
* The propertyKey used to store the attribute in a Graph Vertex.
* @param dataType
* @param aInfo
* @return
*/
def fieldNameInVertex(dataType : IDataType[_], aInfo : AttributeInfo) : String
/**
* from a vertex for an [[ITypedReferenceableInstance]] get the traits that it has.
* @param v
* @return
*/
def traitNames(v : TitanVertex) : Seq[String]
def edgeLabel(fInfo : FieldInfo) : String = fInfo match {
case FieldInfo(dataType, aInfo, null) => edgeLabel(dataType, aInfo)
case FieldInfo(dataType, aInfo, reverseDataType) => edgeLabel(reverseDataType, aInfo)
}
def fieldPrefixInSelect : String
/**
* extract the Id from a Vertex.
* @param dataTypeNm the dataType of the instance that the given vertex represents
* @param v
* @return
*/
def getIdFromVertex(dataTypeNm : String, v : TitanVertex) : Id
/**
* construct a [[ITypedReferenceableInstance]] from its vertex
*
* @param dataType
* @param v
* @return
*/
def constructClassInstance(dataType : ClassType, v : TitanVertex) : ITypedReferenceableInstance
def gremlinCompOp(op : ComparisonExpression) = op.symbol match {
case "=" => "T.eq"
case "!=" => "T.neq"
case ">" => "T.gt"
case ">=" => "T.gte"
case "<" => "T.lt"
case "<=" => "T.lte"
case _ => throw new ExpressionException(op, "Comparison operator not supported in Gremlin")
}
}
object GraphPersistenceStrategy1 extends GraphPersistenceStrategies {
val typeAttributeName = "typeName"
def edgeLabel(dataType: IDataType[_], aInfo : AttributeInfo) = s"${dataType.getName}.${aInfo.name}"
val fieldPrefixInSelect = "it"
def traitLabel(cls : IDataType[_], traitName : String) = s"${cls.getName}.$traitName"
def fieldNameInVertex(dataType : IDataType[_], aInfo : AttributeInfo) = aInfo.name
def getIdFromVertex(dataTypeNm : String, v : TitanVertex) : Id =
new Id(v.getId.toString, 0, dataTypeNm)
def traitNames(v : TitanVertex) : Seq[String] = {
val s = v.getProperty[String]("traitNames")
if ( s != null ) {
Seq[String](s.split(","):_*)
} else {
Seq()
}
}
def loadStructInstance(dataType : IConstructableType[_,_ <: ITypedInstance],
typInstance : ITypedInstance, v : TitanVertex) : Unit = {
import scala.collection.JavaConversions._
dataType.fieldMapping().fields.foreach { t =>
val fName = t._1
val aInfo = t._2
loadAttribute(dataType, aInfo, typInstance, v)
}
}
def constructClassInstance(dataType : ClassType, v : TitanVertex) : ITypedReferenceableInstance = {
val id = getIdFromVertex(dataType.name, v)
val tNms = traitNames(v)
val cInstance = dataType.createInstance(id, tNms:_*)
// load traits
tNms.foreach { tNm =>
val tLabel = traitLabel(dataType, tNm)
val edges = v.getEdges(Direction.OUT, tLabel)
val tVertex = edges.iterator().next().getVertex(Direction.IN).asInstanceOf[TitanVertex]
val tType = TypeSystem.getInstance().getDataType[TraitType](classOf[TraitType], tNm)
val tInstance = cInstance.getTrait(tNm).asInstanceOf[ITypedInstance]
loadStructInstance(tType, tInstance, tVertex)
}
loadStructInstance(dataType, cInstance, v)
cInstance
}
def loadAttribute(dataType : IDataType[_], aInfo : AttributeInfo, i : ITypedInstance, v : TitanVertex) : Unit = {
aInfo.dataType.getTypeCategory match {
case DataTypes.TypeCategory.PRIMITIVE => loadPrimitiveAttribute(dataType, aInfo, i, v)
case DataTypes.TypeCategory.ENUM => loadEnumAttribute(dataType, aInfo, i, v)
case DataTypes.TypeCategory.ARRAY =>
throw new UnsupportedOperationException(s"load for ${aInfo.dataType()} not supported")
case DataTypes.TypeCategory.MAP =>
throw new UnsupportedOperationException(s"load for ${aInfo.dataType()} not supported")
case DataTypes.TypeCategory.STRUCT => loadStructAttribute(dataType, aInfo, i, v)
case DataTypes.TypeCategory.TRAIT =>
throw new UnsupportedOperationException(s"load for ${aInfo.dataType()} not supported")
case DataTypes.TypeCategory.CLASS => loadStructAttribute(dataType, aInfo, i, v)
}
}
private def loadEnumAttribute(dataType : IDataType[_], aInfo : AttributeInfo, i : ITypedInstance, v : TitanVertex)
: Unit = {
val fName = fieldNameInVertex(dataType, aInfo)
i.setInt(aInfo.name, v.getProperty[java.lang.Integer](fName))
}
private def loadPrimitiveAttribute(dataType : IDataType[_], aInfo : AttributeInfo,
i : ITypedInstance, v : TitanVertex) : Unit = {
val fName = fieldNameInVertex(dataType, aInfo)
aInfo.dataType() match {
case x : BooleanType => i.setBoolean(aInfo.name, v.getProperty[java.lang.Boolean](fName))
case x : ByteType => i.setByte(aInfo.name, v.getProperty[java.lang.Byte](fName))
case x : ShortType => i.setShort(aInfo.name, v.getProperty[java.lang.Short](fName))
case x : IntType => i.setInt(aInfo.name, v.getProperty[java.lang.Integer](fName))
case x : LongType => i.setLong(aInfo.name, v.getProperty[java.lang.Long](fName))
case x : FloatType => i.setFloat(aInfo.name, v.getProperty[java.lang.Float](fName))
case x : DoubleType => i.setDouble(aInfo.name, v.getProperty[java.lang.Double](fName))
case x : StringType => i.setString(aInfo.name, v.getProperty[java.lang.String](fName))
case _ => throw new UnsupportedOperationException(s"load for ${aInfo.dataType()} not supported")
}
}
private def loadStructAttribute(dataType : IDataType[_], aInfo : AttributeInfo,
i : ITypedInstance, v : TitanVertex) : Unit = {
val eLabel = edgeLabel(FieldInfo(dataType, aInfo, null))
val edges = v.getEdges(Direction.OUT, eLabel)
val sVertex = edges.iterator().next().getVertex(Direction.IN).asInstanceOf[TitanVertex]
if ( aInfo.dataType().getTypeCategory == DataTypes.TypeCategory.STRUCT ) {
val sType = aInfo.dataType().asInstanceOf[StructType]
val sInstance = sType.createInstance()
loadStructInstance(sType, sInstance, sVertex)
i.set(aInfo.name, sInstance)
} else {
val cInstance = constructClassInstance(aInfo.dataType().asInstanceOf[ClassType], sVertex)
i.set(aInfo.name, cInstance)
}
}
}
...@@ -20,14 +20,36 @@ package org.apache.hadoop.metadata.query ...@@ -20,14 +20,36 @@ package org.apache.hadoop.metadata.query
import javax.script.{Bindings, ScriptEngine, ScriptEngineManager} import javax.script.{Bindings, ScriptEngine, ScriptEngineManager}
import com.thinkaurelius.titan.core.TitanGraph import com.thinkaurelius.titan.core.{TitanVertex, TitanGraph}
import org.apache.hadoop.metadata.ITypedInstance
import org.apache.hadoop.metadata.types.{ClassType, IConstructableType}
import scala.language.existentials
class GremlinEvaluator(qry : GremlinQuery, g: TitanGraph) { case class GremlinQueryResult(qry : GremlinQuery,
resultDataType : IConstructableType[_, _ <: ITypedInstance],
rows : List[ITypedInstance])
class GremlinEvaluator(qry : GremlinQuery, persistenceStrategy : GraphPersistenceStrategies, g: TitanGraph) {
val manager: ScriptEngineManager = new ScriptEngineManager val manager: ScriptEngineManager = new ScriptEngineManager
val engine: ScriptEngine = manager.getEngineByName("gremlin-groovy") val engine: ScriptEngine = manager.getEngineByName("gremlin-groovy")
val bindings: Bindings = engine.createBindings val bindings: Bindings = engine.createBindings
bindings.put("g", g) bindings.put("g", g)
def evaluate() : AnyRef = engine.eval(qry.queryStr, bindings) def evaluate() : GremlinQueryResult = {
import scala.collection.JavaConversions._
val rType = qry.expr.dataType
val rawRes = engine.eval(qry.queryStr, bindings)
if ( rType.isInstanceOf[ClassType]) {
val dType = rType.asInstanceOf[ClassType]
val rows = rawRes.asInstanceOf[java.util.List[TitanVertex]].map { v =>
persistenceStrategy.constructClassInstance(dType, v)
}
GremlinQueryResult(qry, dType, rows.toList)
} else {
null
}
}
} }
...@@ -18,59 +18,19 @@ ...@@ -18,59 +18,19 @@
package org.apache.hadoop.metadata.query package org.apache.hadoop.metadata.query
import Expressions._ import org.apache.hadoop.metadata.query.Expressions._
import org.apache.hadoop.metadata.query.TypeUtils.FieldInfo
import org.apache.hadoop.metadata.types.DataTypes.TypeCategory import org.apache.hadoop.metadata.types.DataTypes.TypeCategory
import org.apache.hadoop.metadata.types.{AttributeInfo, IDataType}
import scala.collection.immutable.TreeMap
import scala.collection.mutable import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.ArrayBuffer
case class GremlinQuery(expr : Expression, queryStr : String) { case class GremlinQuery(expr: Expression, queryStr: String) {
} }
trait GraphPersistenceStrategies {
/**
* Name of attribute used to store typeName in vertex
*/
def typeAttributeName : String
/**
* Given a dataType and a reference attribute, how is edge labeled
*/
def edgeLabel(iDataType: IDataType[_], aInfo : AttributeInfo) : String
def traitLabel(cls : IDataType[_], traitName : String) : String
def edgeLabel(fInfo : FieldInfo) : String = fInfo match {
case FieldInfo(dataType, aInfo, null) => edgeLabel(dataType, aInfo)
case FieldInfo(dataType, aInfo, reverseDataType) => edgeLabel(reverseDataType, aInfo)
}
def fieldPrefixInSelect : String
def gremlinCompOp(op : ComparisonExpression) = op.symbol match {
case "=" => "T.eq"
case "!=" => "T.neq"
case ">" => "T.gt"
case ">=" => "T.gte"
case "<" => "T.lt"
case "<=" => "T.lte"
case _ => throw new ExpressionException(op, "Comparison operator not supported in Gremlin")
}
}
object GraphPersistenceStrategy1 extends GraphPersistenceStrategies {
val typeAttributeName = "typeName"
def edgeLabel(dataType: IDataType[_], aInfo : AttributeInfo) = s"${dataType.getName}.${aInfo.name}"
val fieldPrefixInSelect = "it"
def traitLabel(cls : IDataType[_], traitName : String) = s"${cls.getName}.$traitName"
}
trait SelectExpressionHandling { trait SelectExpressionHandling {
/** /**
* To aide in gremlinQuery generation add an alias to the input of SelectExpressions * To aide in gremlinQuery generation add an alias to the input of SelectExpressions
*/ */
...@@ -78,21 +38,21 @@ trait SelectExpressionHandling { ...@@ -78,21 +38,21 @@ trait SelectExpressionHandling {
private var idx = 0 private var idx = 0
def isDefinedAt(e : Expression) = true def isDefinedAt(e: Expression) = true
class DecorateFieldWithAlias(aliasE : AliasExpression) class DecorateFieldWithAlias(aliasE: AliasExpression)
extends PartialFunction[Expression, Expression] { extends PartialFunction[Expression, Expression] {
def isDefinedAt(e : Expression) = true def isDefinedAt(e: Expression) = true
def apply(e : Expression) = e match { def apply(e: Expression) = e match {
case fe@FieldExpression(fieldName, fInfo, None) => case fe@FieldExpression(fieldName, fInfo, None) =>
FieldExpression(fieldName, fInfo, Some(BackReference(aliasE.alias, aliasE.child, None))) FieldExpression(fieldName, fInfo, Some(BackReference(aliasE.alias, aliasE.child, None)))
case _ => e case _ => e
} }
} }
def apply(e : Expression) = e match { def apply(e: Expression) = e match {
case SelectExpression(_:AliasExpression, _) => e case SelectExpression(_: AliasExpression, _) => e
case SelectExpression(child, selList) => { case SelectExpression(child, selList) => {
idx = idx + 1 idx = idx + 1
val aliasE = AliasExpression(child, s"_src$idx") val aliasE = AliasExpression(child, s"_src$idx")
...@@ -102,7 +62,7 @@ trait SelectExpressionHandling { ...@@ -102,7 +62,7 @@ trait SelectExpressionHandling {
} }
} }
def getSelectExpressionSrc(e : Expression) : List[String] = { def getSelectExpressionSrc(e: Expression): List[String] = {
val l = ArrayBuffer[String]() val l = ArrayBuffer[String]()
e.traverseUp { e.traverseUp {
case BackReference(alias, _, _) => l += alias case BackReference(alias, _, _) => l += alias
...@@ -110,22 +70,22 @@ trait SelectExpressionHandling { ...@@ -110,22 +70,22 @@ trait SelectExpressionHandling {
l.toSet.toList l.toSet.toList
} }
def validateSelectExprHaveOneSrc : PartialFunction[Expression, Unit] = { def validateSelectExprHaveOneSrc: PartialFunction[Expression, Unit] = {
case SelectExpression(_, selList) => { case SelectExpression(_, selList) => {
selList.foreach { se => selList.foreach { se =>
val srcs = getSelectExpressionSrc(se) val srcs = getSelectExpressionSrc(se)
if ( srcs.size > 1 ) { if (srcs.size > 1) {
throw new GremlinTranslationException(se, "Only one src allowed in a Select Expression") throw new GremlinTranslationException(se, "Only one src allowed in a Select Expression")
} }
} }
} }
} }
def groupSelectExpressionsBySrc(sel :SelectExpression) : mutable.LinkedHashMap[String, List[Expression]] = { def groupSelectExpressionsBySrc(sel: SelectExpression): mutable.LinkedHashMap[String, List[Expression]] = {
val m = mutable.LinkedHashMap[String, List[Expression]]() val m = mutable.LinkedHashMap[String, List[Expression]]()
sel.selectListWithAlias.foreach {se => sel.selectListWithAlias.foreach { se =>
val l = getSelectExpressionSrc(se.child) val l = getSelectExpressionSrc(se.child)
if ( !m.contains(l(0)) ) { if (!m.contains(l(0))) {
m(l(0)) = List() m(l(0)) = List()
} }
m(l(0)) = m(l(0)) :+ se.child m(l(0)) = m(l(0)) :+ se.child
...@@ -136,18 +96,18 @@ trait SelectExpressionHandling { ...@@ -136,18 +96,18 @@ trait SelectExpressionHandling {
} }
class GremlinTranslationException(expr: Expression, reason: String) extends class GremlinTranslationException(expr: Expression, reason: String) extends
ExpressionException(expr, s"Unsupported Gremlin translation: $reason") ExpressionException(expr, s"Unsupported Gremlin translation: $reason")
class GremlinTranslator(expr : Expression, class GremlinTranslator(expr: Expression,
gPersistenceBehavior : GraphPersistenceStrategies = GraphPersistenceStrategy1) gPersistenceBehavior: GraphPersistenceStrategies)
extends SelectExpressionHandling { extends SelectExpressionHandling {
val wrapAndRule : PartialFunction[Expression, Expression] = { val wrapAndRule: PartialFunction[Expression, Expression] = {
case f : FilterExpression if !f.condExpr.isInstanceOf[LogicalExpression] => case f: FilterExpression if !f.condExpr.isInstanceOf[LogicalExpression] =>
FilterExpression(f.child, new LogicalExpression("and", List(f.condExpr))) FilterExpression(f.child, new LogicalExpression("and", List(f.condExpr)))
} }
val validateComparisonForm : PartialFunction[Expression, Unit]= { val validateComparisonForm: PartialFunction[Expression, Unit] = {
case c@ComparisonExpression(_, left, right) => case c@ComparisonExpression(_, left, right) =>
if (!left.isInstanceOf[FieldExpression]) { if (!left.isInstanceOf[FieldExpression]) {
throw new GremlinTranslationException(c, s"lhs of comparison is not a field") throw new GremlinTranslationException(c, s"lhs of comparison is not a field")
...@@ -159,7 +119,7 @@ extends SelectExpressionHandling { ...@@ -159,7 +119,7 @@ extends SelectExpressionHandling {
() ()
} }
private def genQuery(expr : Expression, inSelect : Boolean) : String = expr match { private def genQuery(expr: Expression, inSelect: Boolean): String = expr match {
case ClassExpression(clsName) => s"""has("${gPersistenceBehavior.typeAttributeName}","$clsName")""" case ClassExpression(clsName) => s"""has("${gPersistenceBehavior.typeAttributeName}","$clsName")"""
case TraitExpression(clsName) => s"""has("${gPersistenceBehavior.typeAttributeName}","$clsName")""" case TraitExpression(clsName) => s"""has("${gPersistenceBehavior.typeAttributeName}","$clsName")"""
case fe@FieldExpression(fieldName, fInfo, child) if fe.dataType.getTypeCategory == TypeCategory.PRIMITIVE => { case fe@FieldExpression(fieldName, fInfo, child) if fe.dataType.getTypeCategory == TypeCategory.PRIMITIVE => {
...@@ -193,10 +153,10 @@ extends SelectExpressionHandling { ...@@ -193,10 +153,10 @@ extends SelectExpressionHandling {
} }
case sel@SelectExpression(child, selList) => { case sel@SelectExpression(child, selList) => {
val m = groupSelectExpressionsBySrc(sel) val m = groupSelectExpressionsBySrc(sel)
var srcNamesList : List[String] = List() var srcNamesList: List[String] = List()
var srcExprsList : List[List[String]] = List() var srcExprsList: List[List[String]] = List()
val it = m.iterator val it = m.iterator
while(it.hasNext) { while (it.hasNext) {
val (src, selExprs) = it.next val (src, selExprs) = it.next
srcNamesList = srcNamesList :+ s""""$src"""" srcNamesList = srcNamesList :+ s""""$src""""
srcExprsList = srcExprsList :+ selExprs.map { selExpr => srcExprsList = srcExprsList :+ selExprs.map { selExpr =>
...@@ -204,7 +164,9 @@ extends SelectExpressionHandling { ...@@ -204,7 +164,9 @@ extends SelectExpressionHandling {
} }
} }
val srcNamesString = srcNamesList.mkString("[", ",", "]") val srcNamesString = srcNamesList.mkString("[", ",", "]")
val srcExprsStringList = srcExprsList.map{_.mkString("[", ",", "]")} val srcExprsStringList = srcExprsList.map {
_.mkString("[", ",", "]")
}
val srcExprsString = srcExprsStringList.foldLeft("")(_ + "{" + _ + "}") val srcExprsString = srcExprsStringList.foldLeft("")(_ + "{" + _ + "}")
s"${genQuery(child, inSelect)}.select($srcNamesString)$srcExprsString" s"${genQuery(child, inSelect)}.select($srcNamesString)$srcExprsString"
} }
...@@ -219,12 +181,12 @@ extends SelectExpressionHandling { ...@@ -219,12 +181,12 @@ extends SelectExpressionHandling {
s"""has("$fieldName")""" s"""has("$fieldName")"""
case hasFieldUnaryExpression(fieldName, child) => case hasFieldUnaryExpression(fieldName, child) =>
s"""${genQuery(child, inSelect)}.has("$fieldName")""" s"""${genQuery(child, inSelect)}.has("$fieldName")"""
case ArithmeticExpression(symb,left,right) => s"${genQuery(left,inSelect)} $symb ${genQuery(right, inSelect)}" case ArithmeticExpression(symb, left, right) => s"${genQuery(left, inSelect)} $symb ${genQuery(right, inSelect)}"
case l : Literal[_] => l.toString case l: Literal[_] => l.toString
case x => throw new GremlinTranslationException(x, "expression not yet supported") case x => throw new GremlinTranslationException(x, "expression not yet supported")
} }
def translate() : GremlinQuery = { def translate(): GremlinQuery = {
var e1 = expr.transformUp(wrapAndRule) var e1 = expr.transformUp(wrapAndRule)
e1.traverseUp(validateComparisonForm) e1.traverseUp(validateComparisonForm)
...@@ -234,7 +196,7 @@ extends SelectExpressionHandling { ...@@ -234,7 +196,7 @@ extends SelectExpressionHandling {
e1 match { e1 match {
case e1: SelectExpression => GremlinQuery(e1, s"g.V.${genQuery(e1, false)}.toList()") case e1: SelectExpression => GremlinQuery(e1, s"g.V.${genQuery(e1, false)}.toList()")
case e1 => GremlinQuery(e1, s"g.V.${genQuery(e1, false)}.map().toList()") case e1 => GremlinQuery(e1, s"g.V.${genQuery(e1, false)}.toList()")
} }
} }
......
...@@ -23,11 +23,16 @@ import com.thinkaurelius.titan.core.TitanGraph ...@@ -23,11 +23,16 @@ import com.thinkaurelius.titan.core.TitanGraph
object QueryProcessor { object QueryProcessor {
def evaluate(e : Expression, g : TitanGraph) : AnyRef = { def evaluate(e : Expression, g : TitanGraph) : GremlinQueryResult = {
val gP = GraphPersistenceStrategy1
val e1 = validate(e) val e1 = validate(e)
val q = new GremlinTranslator(e1).translate() val q = new GremlinTranslator(e1, gP).translate()
println(q.queryStr) // println("---------------------")
new GremlinEvaluator(q, g).evaluate() // println("Query: " + e1)
// println("Expression Tree:\n" + e1.treeString)
// println("Gremlin Query: " + q.queryStr)
// println("---------------------")
new GremlinEvaluator(q, gP, g).evaluate()
} }
def validate(e : Expression) : Expression = { def validate(e : Expression) : Expression = {
......
...@@ -46,6 +46,16 @@ object HiveTitanSample { ...@@ -46,6 +46,16 @@ object HiveTitanSample {
} }
} }
this.getClass.getDeclaredFields filter (_.getName == "traits") foreach { f =>
f.setAccessible(true)
var traits = f.get(this).asInstanceOf[Option[List[Trait]]]
if ( traits.isDefined ) {
val fV = traits.get.map(_.getClass.getSimpleName).mkString(",")
sb.append(s""", "traitNames" : "$fV"""")
}
}
sb.append("}") sb.append("}")
vertices += sb.toString() vertices += sb.toString()
} }
...@@ -84,11 +94,11 @@ object HiveTitanSample { ...@@ -84,11 +94,11 @@ object HiveTitanSample {
case class StorageDescriptor(inputFormat : String, outputFormat : String, case class StorageDescriptor(inputFormat : String, outputFormat : String,
_id: String = "" + nextVertexId.incrementAndGet()) extends Struct _id: String = "" + nextVertexId.incrementAndGet()) extends Struct
case class Column(name : String, dataType : String, storageDesc : StorageDescriptor, case class Column(name : String, dataType : String, sd : StorageDescriptor,
traits : Option[List[Trait]] = None, traits : Option[List[Trait]] = None,
_id: String = "" + nextVertexId.incrementAndGet()) extends Instance _id: String = "" + nextVertexId.incrementAndGet()) extends Instance
case class Table(name: String, db: DB, storageDesc : StorageDescriptor, case class Table(name: String, db: DB, sd : StorageDescriptor,
traits : Option[List[Trait]] = None, traits : Option[List[Trait]] = None,
_id: String = "" + nextVertexId.incrementAndGet()) extends Instance _id: String = "" + nextVertexId.incrementAndGet()) extends Instance
...@@ -247,31 +257,31 @@ val GremlinQueries = List( ...@@ -247,31 +257,31 @@ val GremlinQueries = List(
// 5. List all tables that are Dimensions and have the TextInputFormat // 5. List all tables that are Dimensions and have the TextInputFormat
""" """
g.V.as("v").and(_().outE("Table.Dimension"), _().out("Table.storageDesc").has("inputFormat", "TextInputFormat")).name g.V.as("v").and(_().outE("Table.Dimension"), _().out("Table.sd").has("inputFormat", "TextInputFormat")).name
""".stripMargin, """.stripMargin,
// 6. List all tables that are Dimensions or have the TextInputFormat // 6. List all tables that are Dimensions or have the TextInputFormat
""" """
g.V.as("v").or(_().outE("Table.Dimension"), _().out("Table.storageDesc").has("inputFormat", "TextInputFormat")).name g.V.as("v").or(_().outE("Table.Dimension"), _().out("Table.sd").has("inputFormat", "TextInputFormat")).name
""".stripMargin, """.stripMargin,
// 7. List tables that have at least 1 PII column // 7. List tables that have at least 1 PII column
""" """
g.V.has("typeName", "Table").as("tab").out("Table.storageDesc").in("Column.storageDesc").as("column"). \ g.V.has("typeName", "Table").as("tab").out("Table.sd").in("Column.sd").as("column"). \
out("Column.PII").select.groupBy{it.getColumn("tab")}{it.getColumn("column")}{[ "c" : it.size]}.cap.scatter.filter{it.value.c > 0}. \ out("Column.PII").select.groupBy{it.getColumn("tab")}{it.getColumn("column")}{[ "c" : it.size]}.cap.scatter.filter{it.value.c > 0}. \
transform{it.key}.name """.stripMargin transform{it.key}.name """.stripMargin
// 7.a from Table as tab -> g.V.has("typeName", "Table").as("tab") // 7.a from Table as tab -> g.V.has("typeName", "Table").as("tab")
// 7.b storageDesc.Column as column -> out("Table.storageDesc").in("Column.storageDesc").as("column") // 7.b sd.Column as column -> out("Table.sd").in("Column.sd").as("column")
// 7.c is PII -> out("Column.PII") // 7.c is PII -> out("Column.PII")
// 7.d select tab, column -> select{it}{it} // 7.d select tab, column -> select{it}{it}
// 7.e groupBy tab compute count(column) as c // 7.e groupBy tab compute count(column) as c
// 7.f where c > 0 // 7.f where c > 0
// 7.a Alias(Type("Table"), "tab") // 7.a Alias(Type("Table"), "tab")
// 7b. Field("storageDesc", Alias(Type("Table"), "tab")) // 7b. Field("sd", Alias(Type("Table"), "tab"))
// Alias(Field("Column", Field("storageDesc", Alias(Type("Table"), "tab"))), "column") // Alias(Field("Column", Field("sd", Alias(Type("Table"), "tab"))), "column")
// 7.c Filter(is("PII"), Alias(Field("Column", Field("storageDesc", Alias(Type("Table"), "tab"))), "column")) // 7.c Filter(is("PII"), Alias(Field("Column", Field("sd", Alias(Type("Table"), "tab"))), "column"))
// 7.d // 7.d
) )
} }
......
...@@ -66,7 +66,7 @@ object QueryTestsUtils extends GraphUtils { ...@@ -66,7 +66,7 @@ object QueryTestsUtils extends GraphUtils {
Array( Array(
attrDef("name", DataTypes.STRING_TYPE), attrDef("name", DataTypes.STRING_TYPE),
attrDef("owner", DataTypes.STRING_TYPE), attrDef("owner", DataTypes.STRING_TYPE),
attrDef("createTime", DataTypes.LONG_TYPE) attrDef("createTime", DataTypes.INT_TYPE)
)) ))
def storageDescClsDef = new HierarchicalTypeDefinition[ClassType](classOf[ClassType], "StorageDesc", null, def storageDescClsDef = new HierarchicalTypeDefinition[ClassType](classOf[ClassType], "StorageDesc", null,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment