Commit 82aeb21d by Harish Butani

add support for evaluation of Path Expressions

parent 77b11f26
...@@ -731,7 +731,7 @@ object Expressions { ...@@ -731,7 +731,7 @@ object Expressions {
throw new UnresolvedException(this, throw new UnresolvedException(this,
s"datatype. Can not resolve due to unresolved child") s"datatype. Can not resolve due to unresolved child")
} }
TypeUtils.ResultWithPathStruct.createType(child.dataType) TypeUtils.ResultWithPathStruct.createType(this, child.dataType)
} }
override def toString = s"$child withPath" override def toString = s"$child withPath"
......
...@@ -22,6 +22,7 @@ import javax.script.{Bindings, ScriptEngine, ScriptEngineManager} ...@@ -22,6 +22,7 @@ import javax.script.{Bindings, ScriptEngine, ScriptEngineManager}
import com.thinkaurelius.titan.core.TitanGraph import com.thinkaurelius.titan.core.TitanGraph
import com.tinkerpop.pipes.util.structures.Row import com.tinkerpop.pipes.util.structures.Row
import org.apache.hadoop.metadata.query.TypeUtils.ResultWithPathStruct
import org.apache.hadoop.metadata.typesystem.json._ import org.apache.hadoop.metadata.typesystem.json._
import org.apache.hadoop.metadata.typesystem.types._ import org.apache.hadoop.metadata.typesystem.types._
import org.json4s._ import org.json4s._
...@@ -42,30 +43,71 @@ class GremlinEvaluator(qry: GremlinQuery, persistenceStrategy: GraphPersistenceS ...@@ -42,30 +43,71 @@ class GremlinEvaluator(qry: GremlinQuery, persistenceStrategy: GraphPersistenceS
val bindings: Bindings = engine.createBindings val bindings: Bindings = engine.createBindings
bindings.put("g", g) bindings.put("g", g)
/**
*
* @param gResultObj is the object returned from gremlin. This must be a List
* @param qryResultObj is the object constructed for the output w/o the Path.
* @return a ResultWithPathStruct
*/
def addPathStruct(gResultObj : AnyRef, qryResultObj : Any) : Any = {
if ( !qry.isPathExpresion) {
qryResultObj
} else {
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
val iPaths = gResultObj.asInstanceOf[java.util.List[AnyRef]].init
val oPaths = iPaths.map { p =>
persistenceStrategy.constructInstance(TypeSystem.getInstance().getIdType.getStructType, p)
}.toList.asJava
val sType = qry.expr.dataType.asInstanceOf[StructType]
val sInstance = sType.createInstance()
sInstance.set(ResultWithPathStruct.pathAttrName, oPaths)
sInstance.set(ResultWithPathStruct.resultAttrName, qryResultObj)
sInstance
}
}
def instanceObject(v : AnyRef) : AnyRef = {
if ( qry.isPathExpresion ) {
import scala.collection.JavaConversions._
v.asInstanceOf[java.util.List[AnyRef]].last
} else {
v
}
}
def evaluate(): GremlinQueryResult = { def evaluate(): GremlinQueryResult = {
import scala.collection.JavaConversions._ import scala.collection.JavaConversions._
val rType = qry.expr.dataType val rType = qry.expr.dataType
val oType = if (qry.isPathExpresion) qry.expr.children(0).dataType else rType
val rawRes = engine.eval(qry.queryStr, bindings) val rawRes = engine.eval(qry.queryStr, bindings)
if (!qry.hasSelectList) { if (!qry.hasSelectList) {
val rows = rawRes.asInstanceOf[java.util.List[AnyRef]].map { v => val rows = rawRes.asInstanceOf[java.util.List[AnyRef]].map { v =>
persistenceStrategy.constructInstance(rType, v) val iV = instanceObject(v)
val o = persistenceStrategy.constructInstance(oType, iV)
addPathStruct(v, o)
} }
GremlinQueryResult(qry.expr.toString, rType, rows.toList) GremlinQueryResult(qry.expr.toString, rType, rows.toList)
} else { } else {
val sType = rType.asInstanceOf[StructType] val sType = oType.asInstanceOf[StructType]
val rows = rawRes.asInstanceOf[java.util.List[Row[java.util.List[AnyRef]]]].map { r => val rows = rawRes.asInstanceOf[java.util.List[AnyRef]].map { r =>
val rV = instanceObject(r).asInstanceOf[Row[java.util.List[AnyRef]]]
val sInstance = sType.createInstance() val sInstance = sType.createInstance()
val selExpr = qry.expr.asInstanceOf[Expressions.SelectExpression] val selExpr =
(if (qry.isPathExpresion) qry.expr.children(0) else qry.expr).
asInstanceOf[Expressions.SelectExpression]
selExpr.selectListWithAlias.foreach { aE => selExpr.selectListWithAlias.foreach { aE =>
val cName = aE.alias val cName = aE.alias
val (src, idx) = qry.resultMaping(cName) val (src, idx) = qry.resultMaping(cName)
val v = r.getColumn(src).get(idx) val v = rV.getColumn(src).get(idx)
sInstance.set(cName, persistenceStrategy.constructInstance(aE.dataType, v)) sInstance.set(cName, persistenceStrategy.constructInstance(aE.dataType, v))
} }
sInstance sInstance
addPathStruct(r, sInstance)
} }
GremlinQueryResult(qry.expr.toString, sType, rows.toList) GremlinQueryResult(qry.expr.toString, rType, rows.toList)
} }
} }
......
...@@ -29,6 +29,8 @@ import scala.collection.mutable.ArrayBuffer ...@@ -29,6 +29,8 @@ import scala.collection.mutable.ArrayBuffer
case class GremlinQuery(expr: Expression, queryStr: String, resultMaping: Map[String, (String, Int)]) { case class GremlinQuery(expr: Expression, queryStr: String, resultMaping: Map[String, (String, Int)]) {
def hasSelectList = resultMaping != null def hasSelectList = resultMaping != null
def isPathExpresion = expr.isInstanceOf[PathExpression]
} }
trait SelectExpressionHandling { trait SelectExpressionHandling {
...@@ -270,6 +272,9 @@ class GremlinTranslator(expr: Expression, ...@@ -270,6 +272,9 @@ class GremlinTranslator(expr: Expression,
case in@InstanceExpression(child) => { case in@InstanceExpression(child) => {
s"${genQuery(child, inSelect)}" s"${genQuery(child, inSelect)}"
} }
case pe@PathExpression(child) => {
s"${genQuery(child, inSelect)}.path"
}
case x => throw new GremlinTranslationException(x, "expression not yet supported") case x => throw new GremlinTranslationException(x, "expression not yet supported")
} }
...@@ -289,6 +294,10 @@ class GremlinTranslator(expr: Expression, ...@@ -289,6 +294,10 @@ class GremlinTranslator(expr: Expression,
val rMap = buildResultMapping(e1) val rMap = buildResultMapping(e1)
GremlinQuery(e1, s"g.V.${genQuery(e1, false)}.toList()", rMap) GremlinQuery(e1, s"g.V.${genQuery(e1, false)}.toList()", rMap)
} }
case pe@PathExpression(se@SelectExpression(child, selectList)) => {
val rMap = buildResultMapping(se)
GremlinQuery(e1, s"g.V.${genQuery(pe, false)}.toList()", rMap)
}
case e1 => GremlinQuery(e1, s"g.V.${genQuery(e1, false)}.toList()", null) case e1 => GremlinQuery(e1, s"g.V.${genQuery(e1, false)}.toList()", null)
} }
......
...@@ -18,9 +18,11 @@ ...@@ -18,9 +18,11 @@
package org.apache.hadoop.metadata.query package org.apache.hadoop.metadata.query
import java.util
import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicInteger
import org.apache.hadoop.metadata.MetadataException import org.apache.hadoop.metadata.MetadataException
import org.apache.hadoop.metadata.query.Expressions.{SelectExpression, PathExpression}
import org.apache.hadoop.metadata.typesystem.types.DataTypes.{ArrayType, PrimitiveType, TypeCategory} import org.apache.hadoop.metadata.typesystem.types.DataTypes.{ArrayType, PrimitiveType, TypeCategory}
import org.apache.hadoop.metadata.typesystem.types._ import org.apache.hadoop.metadata.typesystem.types._
...@@ -62,6 +64,7 @@ object TypeUtils { ...@@ -62,6 +64,7 @@ object TypeUtils {
aDefs(i) = new AttributeDefinition(e.alias,e.dataType.getName, Multiplicity.OPTIONAL, false, null) aDefs(i) = new AttributeDefinition(e.alias,e.dataType.getName, Multiplicity.OPTIONAL, false, null)
} }
return typSystem.defineQueryResultType(s"${TEMP_STRUCT_NAME_PREFIX}${tempStructCounter.getAndIncrement}", return typSystem.defineQueryResultType(s"${TEMP_STRUCT_NAME_PREFIX}${tempStructCounter.getAndIncrement}",
null,
aDefs:_*); aDefs:_*);
} }
...@@ -72,10 +75,14 @@ object TypeUtils { ...@@ -72,10 +75,14 @@ object TypeUtils {
val pathAttr = new AttributeDefinition(pathAttrName, pathAttrType, Multiplicity.COLLECTION, false, null) val pathAttr = new AttributeDefinition(pathAttrName, pathAttrType, Multiplicity.COLLECTION, false, null)
def createType(resultType : IDataType[_]) : StructType = { def createType(pE : PathExpression, resultType : IDataType[_]) : StructType = {
val resultAttr = new AttributeDefinition(resultAttrName, pathAttrType, Multiplicity.REQUIRED, false, null) val resultAttr = new AttributeDefinition(resultAttrName, resultType.getName, Multiplicity.REQUIRED, false, null)
val typName = s"${TEMP_STRUCT_NAME_PREFIX}${tempStructCounter.getAndIncrement}" val typName = s"${TEMP_STRUCT_NAME_PREFIX}${tempStructCounter.getAndIncrement}"
typSystem.defineQueryResultType(typName, pathAttr, resultAttr); val m : java.util.HashMap[String, IDataType[_]] = new util.HashMap[String, IDataType[_]]()
if ( pE.child.isInstanceOf[SelectExpression]) {
m.put(pE.child.dataType.getName, pE.child.dataType)
}
typSystem.defineQueryResultType(typName, m, pathAttr, resultAttr);
} }
} }
......
...@@ -66,4 +66,15 @@ class GremlinTest2 extends FunSuite with BeforeAndAfterAll with BaseGremlinTest ...@@ -66,4 +66,15 @@ class GremlinTest2 extends FunSuite with BeforeAndAfterAll with BaseGremlinTest
validateJson(r) validateJson(r)
} }
test("testLineageWithPath") {
val r = QueryProcessor.evaluate(_class("Table").loop(id("LoadProcess").field("outputTable")).path(), g)
validateJson(r)
}
test("testLineageAllSelectWithPath") {
val r = QueryProcessor.evaluate(_class("Table").as("src").loop(id("LoadProcess").field("outputTable")).as("dest").
select(id("src").field("name").as("srcTable"), id("dest").field("name").as("destTable")).path(), g)
validateJson(r)
}
} }
\ No newline at end of file
...@@ -20,6 +20,8 @@ package org.apache.hadoop.metadata.typesystem.types; ...@@ -20,6 +20,8 @@ package org.apache.hadoop.metadata.typesystem.types;
import org.apache.hadoop.metadata.MetadataException; import org.apache.hadoop.metadata.MetadataException;
import java.util.Map;
public class AttributeInfo { public class AttributeInfo {
public final String name; public final String name;
public final Multiplicity multiplicity; public final Multiplicity multiplicity;
...@@ -33,10 +35,11 @@ public class AttributeInfo { ...@@ -33,10 +35,11 @@ public class AttributeInfo {
public final String reverseAttributeName; public final String reverseAttributeName;
private IDataType dataType; private IDataType dataType;
AttributeInfo(TypeSystem t, AttributeDefinition def) throws MetadataException { AttributeInfo(TypeSystem t, AttributeDefinition def, Map<String, IDataType> tempTypes) throws MetadataException {
TypeUtils.validateName(def.name); TypeUtils.validateName(def.name);
this.name = def.name; this.name = def.name;
this.dataType = t.getDataType(IDataType.class, def.dataTypeName); this.dataType = (tempTypes != null && tempTypes.containsKey(def.dataTypeName)) ?
tempTypes.get(def.dataTypeName) : t.getDataType(IDataType.class, def.dataTypeName);
this.multiplicity = def.multiplicity; this.multiplicity = def.multiplicity;
this.isComposite = def.isComposite; this.isComposite = def.isComposite;
this.isUnique = def.isUnique; this.isUnique = def.isUnique;
......
...@@ -168,12 +168,13 @@ public class TypeSystem { ...@@ -168,12 +168,13 @@ public class TypeSystem {
* @throws MetadataException * @throws MetadataException
*/ */
public StructType defineQueryResultType(String name, public StructType defineQueryResultType(String name,
Map<String, IDataType> tempTypes,
AttributeDefinition... attrDefs) AttributeDefinition... attrDefs)
throws MetadataException { throws MetadataException {
AttributeInfo[] infos = new AttributeInfo[attrDefs.length]; AttributeInfo[] infos = new AttributeInfo[attrDefs.length];
for (int i = 0; i < attrDefs.length; i++) { for (int i = 0; i < attrDefs.length; i++) {
infos[i] = new AttributeInfo(this, attrDefs[i]); infos[i] = new AttributeInfo(this, attrDefs[i], tempTypes);
} }
return new StructType(this, name, null, infos); return new StructType(this, name, null, infos);
...@@ -414,7 +415,7 @@ public class TypeSystem { ...@@ -414,7 +415,7 @@ public class TypeSystem {
private AttributeInfo constructAttributeInfo(AttributeDefinition attrDef) private AttributeInfo constructAttributeInfo(AttributeDefinition attrDef)
throws MetadataException { throws MetadataException {
AttributeInfo info = new AttributeInfo(this, attrDef); AttributeInfo info = new AttributeInfo(this, attrDef, null);
if (transientTypes.contains(attrDef.dataTypeName)) { if (transientTypes.contains(attrDef.dataTypeName)) {
recursiveRefs.add(info); recursiveRefs.add(info);
} }
...@@ -606,8 +607,8 @@ public class TypeSystem { ...@@ -606,8 +607,8 @@ public class TypeSystem {
DataTypes.STRING_TYPE.getName(), Multiplicity.REQUIRED, false, null); DataTypes.STRING_TYPE.getName(), Multiplicity.REQUIRED, false, null);
try { try {
AttributeInfo[] infos = new AttributeInfo[2]; AttributeInfo[] infos = new AttributeInfo[2];
infos[0] = new AttributeInfo(TypeSystem.this, idAttr); infos[0] = new AttributeInfo(TypeSystem.this, idAttr, null);
infos[1] = new AttributeInfo(TypeSystem.this, typNmAttr); infos[1] = new AttributeInfo(TypeSystem.this, typNmAttr, null);
StructType type = new StructType(TypeSystem.this, TYP_NAME, null, infos); StructType type = new StructType(TypeSystem.this, TYP_NAME, null, infos);
TypeSystem.this.types.put(TYP_NAME, type); TypeSystem.this.types.put(TYP_NAME, type);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment