From 26f3b76b8db68e8f414b6ccb5f6e25e9d219e092 Mon Sep 17 00:00:00 2001
From: Harish Butani <hbutani@hortonworks.com>
Date: Mon, 18 May 2015 18:17:10 -0700
Subject: [PATCH] support retriving closureQuery result as a Graph

---
 repository/src/main/scala/org/apache/hadoop/metadata/query/ClosureQuery.scala                    | 98 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++----
 repository/src/main/scala/org/apache/hadoop/metadata/query/TypeUtils.scala                       | 60 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 repository/src/test/scala/org/apache/hadoop/metadata/query/GremlinTest2.scala                    | 20 ++++++++++++++++++++
 typesystem/src/main/java/org/apache/hadoop/metadata/typesystem/types/TypeSystem.java             |  2 ++
 typesystem/src/main/scala/org/apache/hadoop/metadata/typesystem/json/InstanceSerialization.scala |  2 +-
 5 files changed, 177 insertions(+), 5 deletions(-)

diff --git a/repository/src/main/scala/org/apache/hadoop/metadata/query/ClosureQuery.scala b/repository/src/main/scala/org/apache/hadoop/metadata/query/ClosureQuery.scala
index dab19d4..7ebbf64 100755
--- a/repository/src/main/scala/org/apache/hadoop/metadata/query/ClosureQuery.scala
+++ b/repository/src/main/scala/org/apache/hadoop/metadata/query/ClosureQuery.scala
@@ -18,10 +18,16 @@
 
 package org.apache.hadoop.metadata.query
 
+import java.util
+
 import Expressions._
 import com.thinkaurelius.titan.core.TitanGraph
-import org.apache.hadoop.metadata.typesystem.types.DataTypes
-import org.apache.hadoop.metadata.typesystem.types.DataTypes.PrimitiveType
+import org.apache.hadoop.metadata.MetadataException
+import org.apache.hadoop.metadata.typesystem.ITypedStruct
+import org.apache.hadoop.metadata.typesystem.json.{InstanceSerialization, Serialization}
+import org.apache.hadoop.metadata.typesystem.persistence.{Id, StructInstance}
+import org.apache.hadoop.metadata.typesystem.types.{TypeSystem, StructType, DataTypes}
+import org.apache.hadoop.metadata.typesystem.types.DataTypes.{MapType, PrimitiveType}
 
 /**
  * Represents a Query to compute the closure based on a relationship between entities of a particular type.
@@ -62,6 +68,9 @@ import org.apache.hadoop.metadata.typesystem.types.DataTypes.PrimitiveType
  */
 trait ClosureQuery {
 
+  val SRC_PREFIX = TypeUtils.GraphResultStruct.SRC_PREFIX
+  val DEST_PREFIX = TypeUtils.GraphResultStruct.DEST_PREFIX
+
   sealed trait PathAttribute {
 
     def toExpr : Expression = this match {
@@ -129,8 +138,8 @@ trait ClosureQuery {
   def srcCondition(expr : Expression) : Expression = expr
 
   def expr : Expressions.Expression = {
-    val e = srcCondition(Expressions._class(closureType)).as("src").loop(pathExpr).as("dest").
-      select((selectExpr("src") ++ selectExpr("dest")):_*)
+    val e = srcCondition(Expressions._class(closureType)).as(SRC_PREFIX).loop(pathExpr).as(DEST_PREFIX).
+      select((selectExpr(SRC_PREFIX) ++ selectExpr(DEST_PREFIX)):_*)
     if (withPath) e.path else e
   }
 
@@ -138,6 +147,80 @@ trait ClosureQuery {
     var e = expr
     QueryProcessor.evaluate(e, g, persistenceStrategy)
   }
+
+  def graph : GraphResult = {
+
+    if (!withPath) {
+      throw new ExpressionException(expr, "Graph requested for non Path Query")
+    }
+
+    import scala.collection.JavaConverters._
+
+    val res = evaluate()
+
+    val graphResType = TypeUtils.GraphResultStruct.createType(res.resultDataType.asInstanceOf[StructType])
+    val vertexPayloadType = {
+      val mT = graphResType.fieldMapping.fields.get(TypeUtils.GraphResultStruct.verticesAttrName).
+        dataType().asInstanceOf[MapType]
+      mT.getValueType.asInstanceOf[StructType]
+    }
+
+    def id(idObj : StructInstance) : String = idObj.getString(TypeSystem.ID_STRUCT_ID_ATTRNAME)
+
+    def vertexStruct(idObj : StructInstance, resRow : ITypedStruct, attrPrefix : String) : StructInstance = {
+      val vP = vertexPayloadType.createInstance()
+      vP.set(TypeUtils.GraphResultStruct.vertexIdAttrName, idObj)
+      vertexPayloadType.fieldMapping.fields.asScala.keys.
+        filter(_ != TypeUtils.GraphResultStruct.vertexIdAttrName).foreach{a =>
+        vP.set(a, resRow.get(s"${attrPrefix}$a"))
+      }
+      vP.asInstanceOf[StructInstance]
+    }
+
+    val instance = graphResType.createInstance()
+    val vertices = new util.HashMap[String, AnyRef]()
+    val edges = new util.HashMap[String,java.util.List[String]]()
+
+    /**
+     * foreach resultRow
+     *   for each Path entry
+     *     add an entry in the edges Map
+     *   add an entry for the Src Vertex to the vertex Map
+     *   add an entry for the Dest Vertex to the vertex Map
+     */
+    res.rows.map(_.asInstanceOf[StructInstance]).foreach { r =>
+
+      val path = r.get(TypeUtils.ResultWithPathStruct.pathAttrName).asInstanceOf[java.util.List[_]].asScala
+      val srcVertex = path.head.asInstanceOf[StructInstance]
+
+      var currVertex = srcVertex
+      path.tail.foreach { n =>
+        val nextVertex = n.asInstanceOf[StructInstance]
+        val iList = if (!edges.containsKey(id(currVertex))) {
+          val l = new util.ArrayList[String]()
+          edges.put(id(currVertex), l)
+          l
+        } else {
+          edges.get(id(currVertex))
+        }
+        if ( !iList.contains(id(nextVertex))) {
+          iList.add(id(nextVertex))
+        }
+        currVertex = nextVertex
+      }
+      val vertex = r.get(TypeUtils.ResultWithPathStruct.resultAttrName)
+      vertices.put(id(srcVertex), vertexStruct(srcVertex,
+        r.get(TypeUtils.ResultWithPathStruct.resultAttrName).asInstanceOf[ITypedStruct],
+        s"${SRC_PREFIX}_"))
+      vertices.put(id(currVertex), vertexStruct(currVertex,
+        r.get(TypeUtils.ResultWithPathStruct.resultAttrName).asInstanceOf[ITypedStruct],
+        s"${DEST_PREFIX}_"))
+    }
+
+    instance.set(TypeUtils.GraphResultStruct.verticesAttrName, vertices)
+    instance.set(TypeUtils.GraphResultStruct.edgesAttrName, edges)
+    GraphResult(res.query, instance)
+  }
 }
 
 /**
@@ -237,4 +320,11 @@ case class HiveWhereUsedQuery(tableTypeName : String,
     ReverseRelation(ctasTypeName, ctasInputTableAttribute),
     Relation(ctasOutputTableAttribute)
   )
+}
+
+case class GraphResult(query: String, result : ITypedStruct) {
+
+  def toTypedJson = Serialization.toJson(result)
+
+  def toInstanceJson = InstanceSerialization.toJson(result)
 }
\ No newline at end of file
diff --git a/repository/src/main/scala/org/apache/hadoop/metadata/query/TypeUtils.scala b/repository/src/main/scala/org/apache/hadoop/metadata/query/TypeUtils.scala
index 7c30dca..a7e4a6d 100755
--- a/repository/src/main/scala/org/apache/hadoop/metadata/query/TypeUtils.scala
+++ b/repository/src/main/scala/org/apache/hadoop/metadata/query/TypeUtils.scala
@@ -86,6 +86,66 @@ object TypeUtils {
       }
     }
 
+  /**
+   * Structure representing the Closure Graph.
+   * Returns:
+   * 1. A map of vertexId -> vertex Info(these are the attributes requested in the query)
+   * 2. A edges map: each entry is a mapping from an vertexId to the List of adjacent vertexIds.
+   *
+   * '''The Vertex Map doesn't contain all the vertices in the Graph. Only the ones for which Attributes are
+   * available.''' These are the vertices that represent the EntityType whose Closure was requested. For e.g. for
+   * Table Lineage the ''vertex map'' will contain information  about Tables, but not about ''Load Process'' vertices
+   * that connect Tables.
+   */
+  object GraphResultStruct {
+    val SRC_PREFIX = "src"
+    val DEST_PREFIX = "dest"
+
+    val verticesAttrName = "vertices"
+    val edgesAttrName = "edges"
+    val vertexIdAttrName = "vertexId"
+
+    lazy val edgesAttrType = typSystem.defineMapType(DataTypes.STRING_TYPE,
+      typSystem.defineArrayType(DataTypes.STRING_TYPE))
+
+    def createType(resultWithPathType: StructType): StructType = {
+      val resultType = resultWithPathType.fieldMapping().fields.get(ResultWithPathStruct.resultAttrName).dataType()
+
+      val verticesAttrType = typSystem.defineMapType(DataTypes.STRING_TYPE,
+        vertexType(resultType.asInstanceOf[StructType]))
+      val typName = s"${TEMP_STRUCT_NAME_PREFIX}${tempStructCounter.getAndIncrement}"
+      val verticesAttr = new AttributeDefinition(verticesAttrName, verticesAttrType.getName,
+        Multiplicity.REQUIRED, false, null)
+      val edgesAttr = new AttributeDefinition(edgesAttrName, edgesAttrType.getName, Multiplicity.REQUIRED, false, null)
+
+      val m: java.util.HashMap[String, IDataType[_]] = new util.HashMap[String, IDataType[_]]()
+      m.put(resultWithPathType.getName, resultWithPathType)
+      m.put(resultType.getName, resultType)
+      m.put(edgesAttrType.getName, edgesAttrType)
+      m.put(verticesAttrType.getName, verticesAttrType)
+      typSystem.defineQueryResultType(typName, m, verticesAttr, edgesAttr)
+    }
+
+    private def vertexType(resultType: StructType): StructType = {
+
+      import scala.collection.JavaConverters._
+
+      var attrs: List[AttributeDefinition] =
+        resultType.fieldMapping.fields.asScala.filter(_._1.startsWith(s"${SRC_PREFIX}_")).mapValues { aInfo =>
+
+        new AttributeDefinition(aInfo.name.substring(s"${SRC_PREFIX}_".length), aInfo.dataType.getName,
+          aInfo.multiplicity, aInfo.isComposite, aInfo.reverseAttributeName)
+      }.values.toList
+
+      attrs = new AttributeDefinition(vertexIdAttrName, typSystem.getIdType.getStructType.name,
+        Multiplicity.REQUIRED, false, null) :: attrs
+
+      return typSystem.defineQueryResultType(s"${TEMP_STRUCT_NAME_PREFIX}${tempStructCounter.getAndIncrement}",
+        null,
+        attrs: _*)
+    }
+  }
+
     def fieldMapping(iDataType: IDataType[_]) : Option[FieldMapping] = iDataType match {
         case c : ClassType => Some(c.fieldMapping())
         case t : TraitType => Some(t.fieldMapping())
diff --git a/repository/src/test/scala/org/apache/hadoop/metadata/query/GremlinTest2.scala b/repository/src/test/scala/org/apache/hadoop/metadata/query/GremlinTest2.scala
index 32b94a9..4eb5b1a 100755
--- a/repository/src/test/scala/org/apache/hadoop/metadata/query/GremlinTest2.scala
+++ b/repository/src/test/scala/org/apache/hadoop/metadata/query/GremlinTest2.scala
@@ -104,6 +104,17 @@ class GremlinTest2 extends FunSuite with BeforeAndAfterAll with BaseGremlinTest 
     validateJson(r)
   }
 
+  test("testHighLevelLineageReturnGraph") {
+    val r = HiveLineageQuery("Table", "sales_fact_monthly_mv",
+      "LoadProcess",
+      "inputTables",
+      "outputTable",
+      None, Some(List("name")), true, GraphPersistenceStrategy1, g).graph
+
+    println(r.toInstanceJson)
+    //validateJson(r)
+  }
+
   test("testHighLevelWhereUsed") {
     val r = HiveWhereUsedQuery("Table", "sales_fact",
       "LoadProcess",
@@ -113,4 +124,13 @@ class GremlinTest2 extends FunSuite with BeforeAndAfterAll with BaseGremlinTest 
     validateJson(r)
   }
 
+  test("testHighLevelWhereUsedReturnGraph") {
+    val r = HiveWhereUsedQuery("Table", "sales_fact",
+      "LoadProcess",
+      "inputTables",
+      "outputTable",
+      None, Some(List("name")), true, GraphPersistenceStrategy1, g).graph
+    println(r.toInstanceJson)
+  }
+
 }
\ No newline at end of file
diff --git a/typesystem/src/main/java/org/apache/hadoop/metadata/typesystem/types/TypeSystem.java b/typesystem/src/main/java/org/apache/hadoop/metadata/typesystem/types/TypeSystem.java
index 61c60dc..6fbce65 100755
--- a/typesystem/src/main/java/org/apache/hadoop/metadata/typesystem/types/TypeSystem.java
+++ b/typesystem/src/main/java/org/apache/hadoop/metadata/typesystem/types/TypeSystem.java
@@ -653,4 +653,6 @@ public class TypeSystem {
         public String idAttrName() { return ID_ATTRNAME;}
         public String typeNameAttrName() { return TYPENAME_ATTRNAME;}
     }
+
+    public static final String ID_STRUCT_ID_ATTRNAME = IdType.ID_ATTRNAME;
 }
diff --git a/typesystem/src/main/scala/org/apache/hadoop/metadata/typesystem/json/InstanceSerialization.scala b/typesystem/src/main/scala/org/apache/hadoop/metadata/typesystem/json/InstanceSerialization.scala
index 13b320a..5909129 100755
--- a/typesystem/src/main/scala/org/apache/hadoop/metadata/typesystem/json/InstanceSerialization.scala
+++ b/typesystem/src/main/scala/org/apache/hadoop/metadata/typesystem/json/InstanceSerialization.scala
@@ -278,7 +278,7 @@ object InstanceSerialization {
     }
     case s : IStruct => _Struct(s.getTypeName, asScala(s.getValuesMap).asInstanceOf[Map[String, AnyRef]])
     case l : java.util.List[_] => l.asScala.map(e => asScala(e)).toList
-    case m : java.util.Map[_, _] => m.asScala.mapValues(v => asScala(v)).toMap
+    case m : java.util.Map[_, _] => m.asScala.map(t => (asScala(t._1), asScala(t._2))).toMap
     case _ => v
   }
 
--
libgit2 0.27.1