Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
A
atlas
Project
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
dataplatform
atlas
Commits
54c3c7f0
Commit
54c3c7f0
authored
9 years ago
by
Harish Butani
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
support retriving closureQuery result as a Graph
parent
f4547da3
Show whitespace changes
Inline
Side-by-side
Showing
5 changed files
with
179 additions
and
5 deletions
+179
-5
ClosureQuery.scala
...scala/org/apache/hadoop/metadata/query/ClosureQuery.scala
+95
-4
TypeUtils.scala
...in/scala/org/apache/hadoop/metadata/query/TypeUtils.scala
+60
-0
GremlinTest2.scala
...scala/org/apache/hadoop/metadata/query/GremlinTest2.scala
+21
-0
TypeSystem.java
...g/apache/hadoop/metadata/typesystem/types/TypeSystem.java
+2
-0
InstanceSerialization.scala
...doop/metadata/typesystem/json/InstanceSerialization.scala
+1
-1
No files found.
repository/src/main/scala/org/apache/hadoop/metadata/query/ClosureQuery.scala
View file @
54c3c7f0
...
...
@@ -18,10 +18,16 @@
package
org.apache.hadoop.metadata.query
import
java.util
import
Expressions._
import
com.thinkaurelius.titan.core.TitanGraph
import
org.apache.hadoop.metadata.typesystem.types.DataTypes
import
org.apache.hadoop.metadata.typesystem.types.DataTypes.PrimitiveType
import
org.apache.hadoop.metadata.MetadataException
import
org.apache.hadoop.metadata.typesystem.ITypedStruct
import
org.apache.hadoop.metadata.typesystem.json.
{
InstanceSerialization
,
Serialization
}
import
org.apache.hadoop.metadata.typesystem.persistence.
{
Id
,
StructInstance
}
import
org.apache.hadoop.metadata.typesystem.types.
{
TypeSystem
,
StructType
,
DataTypes
}
import
org.apache.hadoop.metadata.typesystem.types.DataTypes.
{
MapType
,
PrimitiveType
}
/**
* Represents a Query to compute the closure based on a relationship between entities of a particular type.
...
...
@@ -62,6 +68,9 @@ import org.apache.hadoop.metadata.typesystem.types.DataTypes.PrimitiveType
*/
trait
ClosureQuery
{
val
SRC_PREFIX
=
TypeUtils
.
GraphResultStruct
.
SRC_PREFIX
val
DEST_PREFIX
=
TypeUtils
.
GraphResultStruct
.
DEST_PREFIX
sealed
trait
PathAttribute
{
def
toExpr
:
Expression
=
this
match
{
...
...
@@ -129,8 +138,8 @@ trait ClosureQuery {
def
srcCondition
(
expr
:
Expression
)
:
Expression
=
expr
def
expr
:
Expressions.Expression
=
{
val
e
=
srcCondition
(
Expressions
.
_class
(
closureType
)).
as
(
"src"
).
loop
(
pathExpr
).
as
(
"dest"
).
select
((
selectExpr
(
"src"
)
++
selectExpr
(
"dest"
))
:_
*
)
val
e
=
srcCondition
(
Expressions
.
_class
(
closureType
)).
as
(
SRC_PREFIX
).
loop
(
pathExpr
).
as
(
DEST_PREFIX
).
select
((
selectExpr
(
SRC_PREFIX
)
++
selectExpr
(
DEST_PREFIX
))
:_
*
)
if
(
withPath
)
e
.
path
else
e
}
...
...
@@ -138,6 +147,80 @@ trait ClosureQuery {
var
e
=
expr
QueryProcessor
.
evaluate
(
e
,
g
,
persistenceStrategy
)
}
def
graph
:
GraphResult
=
{
if
(!
withPath
)
{
throw
new
ExpressionException
(
expr
,
"Graph requested for non Path Query"
)
}
import
scala.collection.JavaConverters._
val
res
=
evaluate
()
val
graphResType
=
TypeUtils
.
GraphResultStruct
.
createType
(
res
.
resultDataType
.
asInstanceOf
[
StructType
])
val
vertexPayloadType
=
{
val
mT
=
graphResType
.
fieldMapping
.
fields
.
get
(
TypeUtils
.
GraphResultStruct
.
verticesAttrName
).
dataType
().
asInstanceOf
[
MapType
]
mT
.
getValueType
.
asInstanceOf
[
StructType
]
}
def
id
(
idObj
:
StructInstance
)
:
String
=
idObj
.
getString
(
TypeSystem
.
ID_STRUCT_ID_ATTRNAME
)
def
vertexStruct
(
idObj
:
StructInstance
,
resRow
:
ITypedStruct
,
attrPrefix
:
String
)
:
StructInstance
=
{
val
vP
=
vertexPayloadType
.
createInstance
()
vP
.
set
(
TypeUtils
.
GraphResultStruct
.
vertexIdAttrName
,
idObj
)
vertexPayloadType
.
fieldMapping
.
fields
.
asScala
.
keys
.
filter
(
_
!=
TypeUtils
.
GraphResultStruct
.
vertexIdAttrName
).
foreach
{
a
=>
vP
.
set
(
a
,
resRow
.
get
(
s
"${attrPrefix}$a"
))
}
vP
.
asInstanceOf
[
StructInstance
]
}
val
instance
=
graphResType
.
createInstance
()
val
vertices
=
new
util
.
HashMap
[
String
,
AnyRef
]()
val
edges
=
new
util
.
HashMap
[
String
,
java.util.List
[
String
]]()
/**
* foreach resultRow
* for each Path entry
* add an entry in the edges Map
* add an entry for the Src Vertex to the vertex Map
* add an entry for the Dest Vertex to the vertex Map
*/
res
.
rows
.
map
(
_
.
asInstanceOf
[
StructInstance
]).
foreach
{
r
=>
val
path
=
r
.
get
(
TypeUtils
.
ResultWithPathStruct
.
pathAttrName
).
asInstanceOf
[
java.util.List
[
_
]].
asScala
val
srcVertex
=
path
.
head
.
asInstanceOf
[
StructInstance
]
var
currVertex
=
srcVertex
path
.
tail
.
foreach
{
n
=>
val
nextVertex
=
n
.
asInstanceOf
[
StructInstance
]
val
iList
=
if
(!
edges
.
containsKey
(
id
(
currVertex
)))
{
val
l
=
new
util
.
ArrayList
[
String
]()
edges
.
put
(
id
(
currVertex
),
l
)
l
}
else
{
edges
.
get
(
id
(
currVertex
))
}
if
(
!
iList
.
contains
(
id
(
nextVertex
)))
{
iList
.
add
(
id
(
nextVertex
))
}
currVertex
=
nextVertex
}
val
vertex
=
r
.
get
(
TypeUtils
.
ResultWithPathStruct
.
resultAttrName
)
vertices
.
put
(
id
(
srcVertex
),
vertexStruct
(
srcVertex
,
r
.
get
(
TypeUtils
.
ResultWithPathStruct
.
resultAttrName
).
asInstanceOf
[
ITypedStruct
],
s
"${SRC_PREFIX}_"
))
vertices
.
put
(
id
(
currVertex
),
vertexStruct
(
currVertex
,
r
.
get
(
TypeUtils
.
ResultWithPathStruct
.
resultAttrName
).
asInstanceOf
[
ITypedStruct
],
s
"${DEST_PREFIX}_"
))
}
instance
.
set
(
TypeUtils
.
GraphResultStruct
.
verticesAttrName
,
vertices
)
instance
.
set
(
TypeUtils
.
GraphResultStruct
.
edgesAttrName
,
edges
)
GraphResult
(
res
.
query
,
instance
)
}
}
/**
...
...
@@ -238,3 +321,10 @@ case class HiveWhereUsedQuery(tableTypeName : String,
Relation
(
ctasOutputTableAttribute
)
)
}
case
class
GraphResult
(
query
:
String
,
result
:
ITypedStruct
)
{
def
toTypedJson
=
Serialization
.
toJson
(
result
)
def
toInstanceJson
=
InstanceSerialization
.
toJson
(
result
)
}
\ No newline at end of file
This diff is collapsed.
Click to expand it.
repository/src/main/scala/org/apache/hadoop/metadata/query/TypeUtils.scala
View file @
54c3c7f0
...
...
@@ -86,6 +86,66 @@ object TypeUtils {
}
}
/**
* Structure representing the Closure Graph.
* Returns:
* 1. A map of vertexId -> vertex Info(these are the attributes requested in the query)
* 2. A edges map: each entry is a mapping from an vertexId to the List of adjacent vertexIds.
*
* '''The Vertex Map doesn't contain all the vertices in the Graph. Only the ones for which Attributes are
* available.''' These are the vertices that represent the EntityType whose Closure was requested. For e.g. for
* Table Lineage the ''vertex map'' will contain information about Tables, but not about ''Load Process'' vertices
* that connect Tables.
*/
object
GraphResultStruct
{
val
SRC_PREFIX
=
"src"
val
DEST_PREFIX
=
"dest"
val
verticesAttrName
=
"vertices"
val
edgesAttrName
=
"edges"
val
vertexIdAttrName
=
"vertexId"
lazy
val
edgesAttrType
=
typSystem
.
defineMapType
(
DataTypes
.
STRING_TYPE
,
typSystem
.
defineArrayType
(
DataTypes
.
STRING_TYPE
))
def
createType
(
resultWithPathType
:
StructType
)
:
StructType
=
{
val
resultType
=
resultWithPathType
.
fieldMapping
().
fields
.
get
(
ResultWithPathStruct
.
resultAttrName
).
dataType
()
val
verticesAttrType
=
typSystem
.
defineMapType
(
DataTypes
.
STRING_TYPE
,
vertexType
(
resultType
.
asInstanceOf
[
StructType
]))
val
typName
=
s
"${TEMP_STRUCT_NAME_PREFIX}${tempStructCounter.getAndIncrement}"
val
verticesAttr
=
new
AttributeDefinition
(
verticesAttrName
,
verticesAttrType
.
getName
,
Multiplicity
.
REQUIRED
,
false
,
null
)
val
edgesAttr
=
new
AttributeDefinition
(
edgesAttrName
,
edgesAttrType
.
getName
,
Multiplicity
.
REQUIRED
,
false
,
null
)
val
m
:
java.util.HashMap
[
String
,
IDataType
[
_
]]
=
new
util
.
HashMap
[
String
,
IDataType
[
_
]]()
m
.
put
(
resultWithPathType
.
getName
,
resultWithPathType
)
m
.
put
(
resultType
.
getName
,
resultType
)
m
.
put
(
edgesAttrType
.
getName
,
edgesAttrType
)
m
.
put
(
verticesAttrType
.
getName
,
verticesAttrType
)
typSystem
.
defineQueryResultType
(
typName
,
m
,
verticesAttr
,
edgesAttr
)
}
private
def
vertexType
(
resultType
:
StructType
)
:
StructType
=
{
import
scala.collection.JavaConverters._
var
attrs
:
List
[
AttributeDefinition
]
=
resultType
.
fieldMapping
.
fields
.
asScala
.
filter
(
_
.
_1
.
startsWith
(
s
"${SRC_PREFIX}_"
)).
mapValues
{
aInfo
=>
new
AttributeDefinition
(
aInfo
.
name
.
substring
(
s
"${SRC_PREFIX}_"
.
length
),
aInfo
.
dataType
.
getName
,
aInfo
.
multiplicity
,
aInfo
.
isComposite
,
aInfo
.
reverseAttributeName
)
}.
values
.
toList
attrs
=
new
AttributeDefinition
(
vertexIdAttrName
,
typSystem
.
getIdType
.
getStructType
.
name
,
Multiplicity
.
REQUIRED
,
false
,
null
)
::
attrs
return
typSystem
.
defineQueryResultType
(
s
"${TEMP_STRUCT_NAME_PREFIX}${tempStructCounter.getAndIncrement}"
,
null
,
attrs
:
_
*
)
}
}
def
fieldMapping
(
iDataType
:
IDataType
[
_
])
:
Option
[
FieldMapping
]
=
iDataType
match
{
case
c
:
ClassType
=>
Some
(
c
.
fieldMapping
())
case
t
:
TraitType
=>
Some
(
t
.
fieldMapping
())
...
...
This diff is collapsed.
Click to expand it.
repository/src/test/scala/org/apache/hadoop/metadata/query/GremlinTest2.scala
View file @
54c3c7f0
...
...
@@ -104,6 +104,17 @@ class GremlinTest2 extends FunSuite with BeforeAndAfterAll with BaseGremlinTest
validateJson
(
r
)
}
test
(
"testHighLevelLineageReturnGraph"
)
{
val
r
=
HiveLineageQuery
(
"Table"
,
"sales_fact_monthly_mv"
,
"LoadProcess"
,
"inputTables"
,
"outputTable"
,
None
,
Some
(
List
(
"name"
)),
true
,
GraphPersistenceStrategy1
,
g
).
graph
println
(
r
.
toInstanceJson
)
//validateJson(r)
}
test
(
"testHighLevelWhereUsed"
)
{
val
r
=
HiveWhereUsedQuery
(
"Table"
,
"sales_fact"
,
"LoadProcess"
,
...
...
@@ -113,4 +124,13 @@ class GremlinTest2 extends FunSuite with BeforeAndAfterAll with BaseGremlinTest
validateJson
(
r
)
}
test
(
"testHighLevelWhereUsedReturnGraph"
)
{
val
r
=
HiveWhereUsedQuery
(
"Table"
,
"sales_fact"
,
"LoadProcess"
,
"inputTables"
,
"outputTable"
,
None
,
Some
(
List
(
"name"
)),
true
,
GraphPersistenceStrategy1
,
g
).
graph
println
(
r
.
toInstanceJson
)
}
}
\ No newline at end of file
This diff is collapsed.
Click to expand it.
typesystem/src/main/java/org/apache/hadoop/metadata/typesystem/types/TypeSystem.java
View file @
54c3c7f0
...
...
@@ -658,4 +658,6 @@ public class TypeSystem {
public
String
idAttrName
()
{
return
ID_ATTRNAME
;}
public
String
typeNameAttrName
()
{
return
TYPENAME_ATTRNAME
;}
}
public
static
final
String
ID_STRUCT_ID_ATTRNAME
=
IdType
.
ID_ATTRNAME
;
}
This diff is collapsed.
Click to expand it.
typesystem/src/main/scala/org/apache/hadoop/metadata/typesystem/json/InstanceSerialization.scala
View file @
54c3c7f0
...
...
@@ -278,7 +278,7 @@ object InstanceSerialization {
}
case
s
:
IStruct
=>
_Struct
(
s
.
getTypeName
,
asScala
(
s
.
getValuesMap
).
asInstanceOf
[
Map
[
String
,
AnyRef
]])
case
l
:
java.util.List
[
_
]
=>
l
.
asScala
.
map
(
e
=>
asScala
(
e
)).
toList
case
m
:
java.util.Map
[
_
,
_
]
=>
m
.
asScala
.
map
Values
(
v
=>
asScala
(
v
)).
toMap
case
m
:
java.util.Map
[
_
,
_
]
=>
m
.
asScala
.
map
(
t
=>
(
asScala
(
t
.
_1
),
asScala
(
t
.
_2
)
)).
toMap
case
_
=>
v
}
...
...
This diff is collapsed.
Click to expand it.
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment