Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
A
atlas
Project
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
dataplatform
atlas
Commits
26f3b76b
Commit
26f3b76b
authored
May 18, 2015
by
Harish Butani
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
support retrieving closureQuery result as a Graph
parent
b2e0fe35
Hide whitespace changes
Inline
Side-by-side
Showing
5 changed files
with
179 additions
and
5 deletions
+179
-5
ClosureQuery.scala
...scala/org/apache/hadoop/metadata/query/ClosureQuery.scala
+95
-4
TypeUtils.scala
...in/scala/org/apache/hadoop/metadata/query/TypeUtils.scala
+60
-0
GremlinTest2.scala
...scala/org/apache/hadoop/metadata/query/GremlinTest2.scala
+21
-0
TypeSystem.java
...g/apache/hadoop/metadata/typesystem/types/TypeSystem.java
+2
-0
InstanceSerialization.scala
...doop/metadata/typesystem/json/InstanceSerialization.scala
+1
-1
No files found.
repository/src/main/scala/org/apache/hadoop/metadata/query/ClosureQuery.scala
View file @
26f3b76b
...
@@ -18,10 +18,16 @@
...
@@ -18,10 +18,16 @@
package
org.apache.hadoop.metadata.query
package
org.apache.hadoop.metadata.query
import
java.util
import
Expressions._
import
Expressions._
import
com.thinkaurelius.titan.core.TitanGraph
import
com.thinkaurelius.titan.core.TitanGraph
import
org.apache.hadoop.metadata.typesystem.types.DataTypes
import
org.apache.hadoop.metadata.MetadataException
import
org.apache.hadoop.metadata.typesystem.types.DataTypes.PrimitiveType
import
org.apache.hadoop.metadata.typesystem.ITypedStruct
import
org.apache.hadoop.metadata.typesystem.json.
{
InstanceSerialization
,
Serialization
}
import
org.apache.hadoop.metadata.typesystem.persistence.
{
Id
,
StructInstance
}
import
org.apache.hadoop.metadata.typesystem.types.
{
TypeSystem
,
StructType
,
DataTypes
}
import
org.apache.hadoop.metadata.typesystem.types.DataTypes.
{
MapType
,
PrimitiveType
}
/**
/**
* Represents a Query to compute the closure based on a relationship between entities of a particular type.
* Represents a Query to compute the closure based on a relationship between entities of a particular type.
...
@@ -62,6 +68,9 @@ import org.apache.hadoop.metadata.typesystem.types.DataTypes.PrimitiveType
...
@@ -62,6 +68,9 @@ import org.apache.hadoop.metadata.typesystem.types.DataTypes.PrimitiveType
*/
*/
trait
ClosureQuery
{
trait
ClosureQuery
{
// Local aliases for the source/destination names declared in TypeUtils.GraphResultStruct.
// They are used both as the step aliases in `expr` and, suffixed with "_", as the
// attribute-name prefixes read back when the graph result is assembled.
val
SRC_PREFIX
=
TypeUtils
.
GraphResultStruct
.
SRC_PREFIX
val
DEST_PREFIX
=
TypeUtils
.
GraphResultStruct
.
DEST_PREFIX
sealed
trait
PathAttribute
{
sealed
trait
PathAttribute
{
def
toExpr
:
Expression
=
this
match
{
def
toExpr
:
Expression
=
this
match
{
...
@@ -129,8 +138,8 @@ trait ClosureQuery {
...
@@ -129,8 +138,8 @@ trait ClosureQuery {
def
srcCondition
(
expr
:
Expression
)
:
Expression
=
expr
def
srcCondition
(
expr
:
Expression
)
:
Expression
=
expr
def
expr
:
Expressions.Expression
=
{
def
expr
:
Expressions.Expression
=
{
val
e
=
srcCondition
(
Expressions
.
_class
(
closureType
)).
as
(
"src"
).
loop
(
pathExpr
).
as
(
"dest"
).
val
e
=
srcCondition
(
Expressions
.
_class
(
closureType
)).
as
(
SRC_PREFIX
).
loop
(
pathExpr
).
as
(
DEST_PREFIX
).
select
((
selectExpr
(
"src"
)
++
selectExpr
(
"dest"
))
:_
*
)
select
((
selectExpr
(
SRC_PREFIX
)
++
selectExpr
(
DEST_PREFIX
))
:_
*
)
if
(
withPath
)
e
.
path
else
e
if
(
withPath
)
e
.
path
else
e
}
}
...
@@ -138,6 +147,80 @@ trait ClosureQuery {
...
@@ -138,6 +147,80 @@ trait ClosureQuery {
var
e
=
expr
var
e
=
expr
QueryProcessor
.
evaluate
(
e
,
g
,
persistenceStrategy
)
QueryProcessor
.
evaluate
(
e
,
g
,
persistenceStrategy
)
}
}
/**
 * Evaluates this closure query and packages the result as a graph.
 *
 * For every result row the path is walked from its first to its last element. Each consecutive
 * pair of path elements contributes an adjacency entry (duplicates are skipped) to the edges
 * map. The first and the last path element are added to the vertices map, populated with the
 * attributes selected under the source and destination prefixes respectively.
 *
 * @return a [[GraphResult]] carrying the query string and the populated graph struct
 * @throws ExpressionException if this query was not built with `withPath` enabled
 */
def graph: GraphResult = {

  if (!withPath) {
    throw new ExpressionException(expr, "Graph requested for non Path Query")
  }

  import scala.collection.JavaConverters._

  val res = evaluate()

  val graphResType =
    TypeUtils.GraphResultStruct.createType(res.resultDataType.asInstanceOf[StructType])

  // Struct type of the values held by the vertices map of the graph result.
  val vertexPayloadType = {
    val mT = graphResType.fieldMapping.fields.get(TypeUtils.GraphResultStruct.verticesAttrName).
      dataType().asInstanceOf[MapType]
    mT.getValueType.asInstanceOf[StructType]
  }

  val vertexIdAttr = TypeUtils.GraphResultStruct.vertexIdAttrName

  // Every payload attribute except the id is copied from the result row; the set of names
  // depends only on the payload type, so it is computed once instead of once per vertex.
  val payloadAttrNames =
    vertexPayloadType.fieldMapping.fields.asScala.keys.filter(_ != vertexIdAttr).toList

  // Reads the id string out of an id struct.
  def id(idObj: StructInstance): String = idObj.getString(TypeSystem.ID_STRUCT_ID_ATTRNAME)

  // Builds the payload struct for one vertex: its id plus the row attributes found under attrPrefix.
  def vertexStruct(idObj: StructInstance, resRow: ITypedStruct, attrPrefix: String): StructInstance = {
    val vP = vertexPayloadType.createInstance()
    vP.set(vertexIdAttr, idObj)
    payloadAttrNames.foreach { a =>
      vP.set(a, resRow.get(s"${attrPrefix}$a"))
    }
    vP.asInstanceOf[StructInstance]
  }

  val instance = graphResType.createInstance()
  val vertices = new util.HashMap[String, AnyRef]()
  val edges = new util.HashMap[String, java.util.List[String]]()

  res.rows.map(_.asInstanceOf[StructInstance]).foreach { r =>

    val path =
      r.get(TypeUtils.ResultWithPathStruct.pathAttrName).asInstanceOf[java.util.List[_]].asScala
    // The selected attributes of the row; looked up once and shared by both vertices.
    val resultRow =
      r.get(TypeUtils.ResultWithPathStruct.resultAttrName).asInstanceOf[ITypedStruct]

    // NOTE(review): assumes every path holds at least one element; `head` throws on an
    // empty path, exactly as before - confirm that the evaluator never yields one.
    val srcVertex = path.head.asInstanceOf[StructInstance]

    // Walk the path, recording an edge for each consecutive pair; the fold yields the last vertex.
    val destVertex = path.tail.foldLeft(srcVertex) { (currVertex, n) =>
      val nextVertex = n.asInstanceOf[StructInstance]
      val currId = id(currVertex)
      val nextId = id(nextVertex)
      val adjacent = Option(edges.get(currId)).getOrElse {
        val l = new util.ArrayList[String]()
        edges.put(currId, l)
        l
      }
      if (!adjacent.contains(nextId)) {
        adjacent.add(nextId)
      }
      nextVertex
    }

    vertices.put(id(srcVertex), vertexStruct(srcVertex, resultRow, s"${SRC_PREFIX}_"))
    vertices.put(id(destVertex), vertexStruct(destVertex, resultRow, s"${DEST_PREFIX}_"))
  }

  instance.set(TypeUtils.GraphResultStruct.verticesAttrName, vertices)
  instance.set(TypeUtils.GraphResultStruct.edgesAttrName, edges)
  GraphResult(res.query, instance)
}
}
}
/**
/**
...
@@ -237,4 +320,11 @@ case class HiveWhereUsedQuery(tableTypeName : String,
...
@@ -237,4 +320,11 @@ case class HiveWhereUsedQuery(tableTypeName : String,
ReverseRelation
(
ctasTypeName
,
ctasInputTableAttribute
),
ReverseRelation
(
ctasTypeName
,
ctasInputTableAttribute
),
Relation
(
ctasOutputTableAttribute
)
Relation
(
ctasOutputTableAttribute
)
)
)
}
/**
 * Result of a closure query returned in graph form.
 *
 * @param query  the query string the graph was computed from
 * @param result the struct holding the graph
 */
case class GraphResult(query: String, result: ITypedStruct) {

  /** Renders `result` through `Serialization.toJson`. */
  def toTypedJson = {
    Serialization.toJson(result)
  }

  /** Renders `result` through `InstanceSerialization.toJson`. */
  def toInstanceJson = {
    InstanceSerialization.toJson(result)
  }
}
}
\ No newline at end of file
repository/src/main/scala/org/apache/hadoop/metadata/query/TypeUtils.scala
View file @
26f3b76b
...
@@ -86,6 +86,66 @@ object TypeUtils {
...
@@ -86,6 +86,66 @@ object TypeUtils {
}
}
}
}
/**
 * Structure representing the Closure Graph.
 * Returns:
 * 1. A map of vertexId -> vertex Info (these are the attributes requested in the query).
 * 2. An edges map: each entry is a mapping from a vertexId to the List of adjacent vertexIds.
 *
 * '''The Vertex Map doesn't contain all the vertices in the Graph. Only the ones for which Attributes are
 * available.''' These are the vertices that represent the EntityType whose Closure was requested. For e.g. for
 * Table Lineage the ''vertex map'' will contain information about Tables, but not about ''Load Process'' vertices
 * that connect Tables.
 */
object GraphResultStruct {
  val SRC_PREFIX = "src"
  val DEST_PREFIX = "dest"

  val verticesAttrName = "vertices"
  val edgesAttrName = "edges"
  val vertexIdAttrName = "vertexId"

  /** Type of the edges attribute: a map from a string to an array of strings. */
  lazy val edgesAttrType = typSystem.defineMapType(DataTypes.STRING_TYPE,
    typSystem.defineArrayType(DataTypes.STRING_TYPE))

  /**
   * Defines the query result type that holds the graph form of a path query result.
   *
   * @param resultWithPathType the struct type of the rows of the path query
   * @return a newly defined struct type with a `vertices` and an `edges` attribute
   */
  def createType(resultWithPathType: StructType): StructType = {
    val resultType = resultWithPathType.fieldMapping().fields.get(ResultWithPathStruct.resultAttrName).dataType()

    // The vertex type is defined before the name of the graph type is drawn from the counter;
    // keep this order so the generated temporary type names stay the same.
    val verticesAttrType = typSystem.defineMapType(DataTypes.STRING_TYPE,
      vertexType(resultType.asInstanceOf[StructType]))
    val typName = s"${TEMP_STRUCT_NAME_PREFIX}${tempStructCounter.getAndIncrement}"
    val verticesAttr = new AttributeDefinition(verticesAttrName, verticesAttrType.getName,
      Multiplicity.REQUIRED, false, null)
    val edgesAttr = new AttributeDefinition(edgesAttrName, edgesAttrType.getName,
      Multiplicity.REQUIRED, false, null)

    // Types referenced by the new struct, keyed by their names.
    val m: java.util.HashMap[String, IDataType[_]] = new util.HashMap[String, IDataType[_]]()
    m.put(resultWithPathType.getName, resultWithPathType)
    m.put(resultType.getName, resultType)
    m.put(edgesAttrType.getName, edgesAttrType)
    m.put(verticesAttrType.getName, verticesAttrType)
    typSystem.defineQueryResultType(typName, m, verticesAttr, edgesAttr)
  }

  /**
   * Defines the struct type describing one vertex: a required `vertexId` attribute followed by
   * every attribute of `resultType` whose name starts with the source prefix, with that prefix
   * stripped from the name.
   */
  private def vertexType(resultType: StructType): StructType = {
    import scala.collection.JavaConverters._

    val srcAttrPrefix = s"${SRC_PREFIX}_"

    val idAttr = new AttributeDefinition(vertexIdAttrName, typSystem.getIdType.getStructType.name,
      Multiplicity.REQUIRED, false, null)

    // Filter first, then map the remaining values, so the attribute order matches what the
    // previous filter/mapValues chain produced.
    val selectedAttrs: List[AttributeDefinition] =
      resultType.fieldMapping.fields.asScala.filter(_._1.startsWith(srcAttrPrefix)).values.map { aInfo =>
        new AttributeDefinition(aInfo.name.substring(srcAttrPrefix.length), aInfo.dataType.getName,
          aInfo.multiplicity, aInfo.isComposite, aInfo.reverseAttributeName)
      }.toList

    val attrs = idAttr :: selectedAttrs

    typSystem.defineQueryResultType(s"${TEMP_STRUCT_NAME_PREFIX}${tempStructCounter.getAndIncrement}",
      null,
      attrs: _*)
  }
}
def
fieldMapping
(
iDataType
:
IDataType
[
_
])
:
Option
[
FieldMapping
]
=
iDataType
match
{
def
fieldMapping
(
iDataType
:
IDataType
[
_
])
:
Option
[
FieldMapping
]
=
iDataType
match
{
case
c
:
ClassType
=>
Some
(
c
.
fieldMapping
())
case
c
:
ClassType
=>
Some
(
c
.
fieldMapping
())
case
t
:
TraitType
=>
Some
(
t
.
fieldMapping
())
case
t
:
TraitType
=>
Some
(
t
.
fieldMapping
())
...
...
repository/src/test/scala/org/apache/hadoop/metadata/query/GremlinTest2.scala
View file @
26f3b76b
...
@@ -104,6 +104,17 @@ class GremlinTest2 extends FunSuite with BeforeAndAfterAll with BaseGremlinTest
...
@@ -104,6 +104,17 @@ class GremlinTest2 extends FunSuite with BeforeAndAfterAll with BaseGremlinTest
validateJson
(
r
)
validateJson
(
r
)
}
}
test("testHighLevelLineageReturnGraph") {
  // Lineage of the "sales_fact_monthly_mv" table, requested with paths so it can be turned into a graph.
  val lineageQuery = HiveLineageQuery("Table", "sales_fact_monthly_mv",
    "LoadProcess",
    "inputTables",
    "outputTable",
    None, Some(List("name")), true, GraphPersistenceStrategy1, g)
  val graphResult = lineageQuery.graph

  println(graphResult.toInstanceJson)
  // JSON validation is currently commented out for this test: validateJson(graphResult)
}
test
(
"testHighLevelWhereUsed"
)
{
test
(
"testHighLevelWhereUsed"
)
{
val
r
=
HiveWhereUsedQuery
(
"Table"
,
"sales_fact"
,
val
r
=
HiveWhereUsedQuery
(
"Table"
,
"sales_fact"
,
"LoadProcess"
,
"LoadProcess"
,
...
@@ -113,4 +124,13 @@ class GremlinTest2 extends FunSuite with BeforeAndAfterAll with BaseGremlinTest
...
@@ -113,4 +124,13 @@ class GremlinTest2 extends FunSuite with BeforeAndAfterAll with BaseGremlinTest
validateJson
(
r
)
validateJson
(
r
)
}
}
test("testHighLevelWhereUsedReturnGraph") {
  // Where-used query for the "sales_fact" table, requested with paths so it can be turned into a graph.
  val whereUsedQuery = HiveWhereUsedQuery("Table", "sales_fact",
    "LoadProcess",
    "inputTables",
    "outputTable",
    None, Some(List("name")), true, GraphPersistenceStrategy1, g)
  val graphResult = whereUsedQuery.graph

  println(graphResult.toInstanceJson)
}
}
}
\ No newline at end of file
typesystem/src/main/java/org/apache/hadoop/metadata/typesystem/types/TypeSystem.java
View file @
26f3b76b
...
@@ -653,4 +653,6 @@ public class TypeSystem {
...
@@ -653,4 +653,6 @@ public class TypeSystem {
public
String
idAttrName
()
{
return
ID_ATTRNAME
;}
public
String
idAttrName
()
{
return
ID_ATTRNAME
;}
public
String
typeNameAttrName
()
{
return
TYPENAME_ATTRNAME
;}
public
String
typeNameAttrName
()
{
return
TYPENAME_ATTRNAME
;}
}
}
/**
 * Publicly accessible alias of {@code IdType.ID_ATTRNAME}, exposed at the
 * {@code TypeSystem} level so that callers can refer to the attribute name
 * without going through the {@code IdType} instance.
 */
public
static
final
String
ID_STRUCT_ID_ATTRNAME
=
IdType
.
ID_ATTRNAME
;
}
}
typesystem/src/main/scala/org/apache/hadoop/metadata/typesystem/json/InstanceSerialization.scala
View file @
26f3b76b
...
@@ -278,7 +278,7 @@ object InstanceSerialization {
...
@@ -278,7 +278,7 @@ object InstanceSerialization {
}
}
case
s
:
IStruct
=>
_Struct
(
s
.
getTypeName
,
asScala
(
s
.
getValuesMap
).
asInstanceOf
[
Map
[
String
,
AnyRef
]])
case
s
:
IStruct
=>
_Struct
(
s
.
getTypeName
,
asScala
(
s
.
getValuesMap
).
asInstanceOf
[
Map
[
String
,
AnyRef
]])
case
l
:
java.util.List
[
_
]
=>
l
.
asScala
.
map
(
e
=>
asScala
(
e
)).
toList
case
l
:
java.util.List
[
_
]
=>
l
.
asScala
.
map
(
e
=>
asScala
(
e
)).
toList
case
m
:
java.util.Map
[
_
,
_
]
=>
m
.
asScala
.
map
Values
(
v
=>
asScala
(
v
)).
toMap
case
m
:
java.util.Map
[
_
,
_
]
=>
m
.
asScala
.
map
(
t
=>
(
asScala
(
t
.
_1
),
asScala
(
t
.
_2
)
)).
toMap
case
_
=>
v
case
_
=>
v
}
}
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment