Commit 1a536c2a by Suma Shivaprasad

Merge branch 'master' of https://github.com/hortonworks/metadata into BUG_36928

parents 1776b1f1 54c3c7f0
......@@ -18,10 +18,16 @@
package org.apache.hadoop.metadata.query
import java.util
import Expressions._
import com.thinkaurelius.titan.core.TitanGraph
import org.apache.hadoop.metadata.typesystem.types.DataTypes
import org.apache.hadoop.metadata.typesystem.types.DataTypes.PrimitiveType
import org.apache.hadoop.metadata.MetadataException
import org.apache.hadoop.metadata.typesystem.ITypedStruct
import org.apache.hadoop.metadata.typesystem.json.{InstanceSerialization, Serialization}
import org.apache.hadoop.metadata.typesystem.persistence.{Id, StructInstance}
import org.apache.hadoop.metadata.typesystem.types.{TypeSystem, StructType, DataTypes}
import org.apache.hadoop.metadata.typesystem.types.DataTypes.{MapType, PrimitiveType}
/**
* Represents a Query to compute the closure based on a relationship between entities of a particular type.
......@@ -62,6 +68,9 @@ import org.apache.hadoop.metadata.typesystem.types.DataTypes.PrimitiveType
*/
trait ClosureQuery {
// Alias names for the two ends of the closure, re-exported from TypeUtils.GraphResultStruct so that
// the query expression and the graph construction in this trait share the same constants.
val SRC_PREFIX = TypeUtils.GraphResultStruct.SRC_PREFIX
val DEST_PREFIX = TypeUtils.GraphResultStruct.DEST_PREFIX
sealed trait PathAttribute {
def toExpr : Expression = this match {
......@@ -129,8 +138,8 @@ trait ClosureQuery {
/**
 * Condition applied to the source expression of the closure. This default returns `expr` unchanged.
 * NOTE(review): presumably meant to be overridden by concrete queries to restrict the source
 * entities - confirm against the implementing classes.
 */
def srcCondition(expr : Expression) : Expression = expr
/**
 * Builds the closure expression for this query.
 *
 * Starts from `closureType` (passed through `srcCondition`), aliases it as SRC_PREFIX, loops over
 * `pathExpr`, aliases the loop result as DEST_PREFIX and selects `selectExpr` for both aliases.
 * The aliases use the shared SRC_PREFIX / DEST_PREFIX constants rather than string literals so
 * they stay in step with the attribute prefixes used when building the graph result.
 *
 * @return the select expression, wrapped with `.path` when `withPath` is set
 */
def expr : Expressions.Expression = {
  val e = srcCondition(Expressions._class(closureType)).as(SRC_PREFIX).loop(pathExpr).as(DEST_PREFIX).
    select((selectExpr(SRC_PREFIX) ++ selectExpr(DEST_PREFIX)):_*)
  if (withPath) e.path else e
}
......@@ -138,6 +147,80 @@ trait ClosureQuery {
var e = expr
QueryProcessor.evaluate(e, g, persistenceStrategy)
}
/**
 * Evaluates this closure query and converts its path-annotated rows into a [[GraphResult]].
 *
 * The returned struct carries two maps:
 *  - vertices: vertex id -> payload struct (the vertex id struct plus the selected attributes),
 *    filled only for the first and the last vertex of each row's path;
 *  - edges: vertex id -> list of ids of the vertices that directly follow it on some path.
 *
 * @return the query string of the evaluated result together with the populated graph struct
 * @throws ExpressionException if this query was not built with `withPath`
 */
def graph : GraphResult = {

  if (!withPath) {
    throw new ExpressionException(expr, "Graph requested for non Path Query")
  }

  import scala.collection.JavaConverters._

  val res = evaluate()

  val graphResType = TypeUtils.GraphResultStruct.createType(res.resultDataType.asInstanceOf[StructType])
  val vertexPayloadType = {
    val mT = graphResType.fieldMapping.fields.get(TypeUtils.GraphResultStruct.verticesAttrName).
      dataType().asInstanceOf[MapType]
    mT.getValueType.asInstanceOf[StructType]
  }

  // Attributes copied from a result row into a vertex payload: every payload field except the
  // vertex id, which is set separately. Computed once because it does not depend on the row.
  val payloadAttrNames = vertexPayloadType.fieldMapping.fields.asScala.keys.
    filter(_ != TypeUtils.GraphResultStruct.vertexIdAttrName).toList

  // String id stored inside an Id struct.
  def id(idObj : StructInstance) : String = idObj.getString(TypeSystem.ID_STRUCT_ID_ATTRNAME)

  // Builds the payload for one vertex; the row attribute for payload field `a` is `attrPrefix + a`.
  def vertexStruct(idObj : StructInstance, resRow : ITypedStruct, attrPrefix : String) : StructInstance = {
    val vP = vertexPayloadType.createInstance()
    vP.set(TypeUtils.GraphResultStruct.vertexIdAttrName, idObj)
    payloadAttrNames.foreach { a =>
      vP.set(a, resRow.get(s"${attrPrefix}$a"))
    }
    vP.asInstanceOf[StructInstance]
  }

  val instance = graphResType.createInstance()
  val vertices = new util.HashMap[String, AnyRef]()
  val edges = new util.HashMap[String,java.util.List[String]]()

  /**
   * foreach resultRow
   *   for each Path entry
   *     add an entry in the edges Map
   *   add an entry for the Src Vertex to the vertex Map
   *   add an entry for the Dest Vertex to the vertex Map
   */
  res.rows.map(_.asInstanceOf[StructInstance]).foreach { r =>

    val path = r.get(TypeUtils.ResultWithPathStruct.pathAttrName).asInstanceOf[java.util.List[_]].asScala
    // `head` requires every path to contain at least its source vertex.
    val srcVertex = path.head.asInstanceOf[StructInstance]

    // Walk the path pairwise, recording each hop once; the fold result is the last vertex reached.
    val destVertex = path.tail.foldLeft(srcVertex) { (currVertex, n) =>
      val nextVertex = n.asInstanceOf[StructInstance]
      val currId = id(currVertex)
      val nextId = id(nextVertex)
      val adjacent : java.util.List[String] = Option(edges.get(currId)).getOrElse {
        val l = new util.ArrayList[String]()
        edges.put(currId, l)
        l
      }
      if (!adjacent.contains(nextId)) {
        adjacent.add(nextId)
      }
      nextVertex
    }

    // Both payloads are read from the same result struct, using the src_/dest_ attribute prefixes.
    val resultRow = r.get(TypeUtils.ResultWithPathStruct.resultAttrName).asInstanceOf[ITypedStruct]
    vertices.put(id(srcVertex), vertexStruct(srcVertex, resultRow, s"${SRC_PREFIX}_"))
    vertices.put(id(destVertex), vertexStruct(destVertex, resultRow, s"${DEST_PREFIX}_"))
  }

  instance.set(TypeUtils.GraphResultStruct.verticesAttrName, vertices)
  instance.set(TypeUtils.GraphResultStruct.edgesAttrName, edges)
  GraphResult(res.query, instance)
}
}
/**
......@@ -237,4 +320,11 @@ case class HiveWhereUsedQuery(tableTypeName : String,
ReverseRelation(ctasTypeName, ctasInputTableAttribute),
Relation(ctasOutputTableAttribute)
)
}
/**
 * A closure query result in graph form.
 *
 * @param query  the query string this result was produced for
 * @param result the struct holding the graph data
 */
case class GraphResult(query: String, result : ITypedStruct) {

  /** JSON rendering of `result` produced by `Serialization.toJson`. */
  def toTypedJson = {
    Serialization.toJson(result)
  }

  /** JSON rendering of `result` produced by `InstanceSerialization.toJson`. */
  def toInstanceJson = {
    InstanceSerialization.toJson(result)
  }
}
\ No newline at end of file
......@@ -86,6 +86,66 @@ object TypeUtils {
}
}
/**
* Structure representing the Closure Graph.
* Returns:
* 1. A map of vertexId -> vertex Info(these are the attributes requested in the query)
* 2. An edges map: each entry is a mapping from a vertexId to the List of adjacent vertexIds.
*
* '''The Vertex Map doesn't contain all the vertices in the Graph. Only the ones for which Attributes are
* available.''' These are the vertices that represent the EntityType whose Closure was requested. For e.g. for
* Table Lineage the ''vertex map'' will contain information about Tables, but not about ''Load Process'' vertices
* that connect Tables.
*/
object GraphResultStruct {
  // Alias names for the two ends of a closure path. Result attributes named "<SRC_PREFIX>_<attr>"
  // are the ones turned into vertex attributes by vertexType below.
  val SRC_PREFIX = "src"
  val DEST_PREFIX = "dest"

  // Attribute names of the graph result struct and of the id attribute inside each vertex struct.
  val verticesAttrName = "vertices"
  val edgesAttrName = "edges"
  val vertexIdAttrName = "vertexId"

  // Type of the edges attribute: map of string -> array of string.
  lazy val edgesAttrType = typSystem.defineMapType(DataTypes.STRING_TYPE,
    typSystem.defineArrayType(DataTypes.STRING_TYPE))

  /**
   * Defines a temporary query-result struct type with a required `vertices` map attribute and a
   * required `edges` map attribute.
   *
   * @param resultWithPathType the result-with-path struct type; the data type of its result
   *                           attribute is the source of the vertex attributes
   * @return the newly defined graph result struct type
   */
  def createType(resultWithPathType: StructType): StructType = {
    val resultType = resultWithPathType.fieldMapping().fields.get(ResultWithPathStruct.resultAttrName).dataType()

    val verticesAttrType = typSystem.defineMapType(DataTypes.STRING_TYPE,
      vertexType(resultType.asInstanceOf[StructType]))
    val typName = s"${TEMP_STRUCT_NAME_PREFIX}${tempStructCounter.getAndIncrement}"
    val verticesAttr = new AttributeDefinition(verticesAttrName, verticesAttrType.getName,
      Multiplicity.REQUIRED, false, null)
    val edgesAttr = new AttributeDefinition(edgesAttrName, edgesAttrType.getName, Multiplicity.REQUIRED, false, null)

    // Types handed to defineQueryResultType alongside the new struct definition, keyed by name.
    val m: java.util.HashMap[String, IDataType[_]] = new util.HashMap[String, IDataType[_]]()
    m.put(resultWithPathType.getName, resultWithPathType)
    m.put(resultType.getName, resultType)
    m.put(edgesAttrType.getName, edgesAttrType)
    m.put(verticesAttrType.getName, verticesAttrType)
    typSystem.defineQueryResultType(typName, m, verticesAttr, edgesAttr)
  }

  /**
   * Defines the temporary struct type used for one vertex: a required `vertexId` attribute of the
   * Id struct type, followed by one attribute per "src_"-prefixed field of `resultType` with the
   * prefix stripped from its name.
   */
  private def vertexType(resultType: StructType): StructType = {
    import scala.collection.JavaConverters._

    val srcAttrPrefix = s"${SRC_PREFIX}_"

    // Same traversal order as the filtered field map; only the attribute name is rewritten.
    val srcAttrs: List[AttributeDefinition] =
      resultType.fieldMapping.fields.asScala.filter(_._1.startsWith(srcAttrPrefix)).values.map { aInfo =>
        new AttributeDefinition(aInfo.name.substring(srcAttrPrefix.length), aInfo.dataType.getName,
          aInfo.multiplicity, aInfo.isComposite, aInfo.reverseAttributeName)
      }.toList

    val vertexIdAttr = new AttributeDefinition(vertexIdAttrName, typSystem.getIdType.getStructType.name,
      Multiplicity.REQUIRED, false, null)
    val attrs = vertexIdAttr :: srcAttrs

    typSystem.defineQueryResultType(s"${TEMP_STRUCT_NAME_PREFIX}${tempStructCounter.getAndIncrement}",
      null,
      attrs: _*)
  }
}
def fieldMapping(iDataType: IDataType[_]) : Option[FieldMapping] = iDataType match {
case c : ClassType => Some(c.fieldMapping())
case t : TraitType => Some(t.fieldMapping())
......
......@@ -104,6 +104,17 @@ class GremlinTest2 extends FunSuite with BeforeAndAfterAll with BaseGremlinTest
validateJson(r)
}
test("testHighLevelLineageReturnGraph") {
  // Build the lineage query first, then ask for its graph form.
  val lineageQuery = HiveLineageQuery(
    "Table", "sales_fact_monthly_mv",
    "LoadProcess", "inputTables", "outputTable",
    None, Some(List("name")), true, GraphPersistenceStrategy1, g)
  val lineageGraph = lineageQuery.graph
  // The graph JSON is only printed; validateJson is not applied to it.
  println(lineageGraph.toInstanceJson)
}
test("testHighLevelWhereUsed") {
val r = HiveWhereUsedQuery("Table", "sales_fact",
"LoadProcess",
......@@ -113,4 +124,13 @@ class GremlinTest2 extends FunSuite with BeforeAndAfterAll with BaseGremlinTest
validateJson(r)
}
test("testHighLevelWhereUsedReturnGraph") {
  // Build the where-used query first, then ask for its graph form.
  val whereUsedQuery = HiveWhereUsedQuery(
    "Table", "sales_fact",
    "LoadProcess", "inputTables", "outputTable",
    None, Some(List("name")), true, GraphPersistenceStrategy1, g)
  val whereUsedGraph = whereUsedQuery.graph
  // The graph JSON is only printed for inspection.
  println(whereUsedGraph.toInstanceJson)
}
}
\ No newline at end of file
......@@ -658,4 +658,6 @@ public class TypeSystem {
public String idAttrName() { return ID_ATTRNAME;}
public String typeNameAttrName() { return TYPENAME_ATTRNAME;}
}
/** Name of the id attribute of the Id struct; same value as IdType.ID_ATTRNAME, exposed as a TypeSystem constant. */
public static final String ID_STRUCT_ID_ATTRNAME = IdType.ID_ATTRNAME;
}
......@@ -278,7 +278,7 @@ object InstanceSerialization {
}
case s : IStruct => _Struct(s.getTypeName, asScala(s.getValuesMap).asInstanceOf[Map[String, AnyRef]])
case l : java.util.List[_] => l.asScala.map(e => asScala(e)).toList
case m : java.util.Map[_, _] => m.asScala.mapValues(v => asScala(v)).toMap
case m : java.util.Map[_, _] => m.asScala.map(t => (asScala(t._1), asScala(t._2))).toMap
case _ => v
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment