Commit 6e9d6948, authored Jun 04, 2015 by Suma S

Merge pull request #126 from shwethags/hive

de-duping on query string in hive hook

Parents: 2270d05f 21109f1e

The hook now normalizes a query's text and looks up an existing HIVE_PROCESS entity for it before creating a new one, so re-running the same query no longer registers duplicate process entities.

Showing 4 changed files with 98 additions and 46 deletions:
HiveMetaStoreBridge.java   ...ache/hadoop/metadata/hive/bridge/HiveMetaStoreBridge.java   (+15 -1)
HiveHook.java              ...n/java/org/apache/hadoop/metadata/hive/hook/HiveHook.java   (+50 -36)
HiveHookIT.java            ...java/org/apache/hadoop/metadata/hive/hook/HiveHookIT.java   (+25 -3)
GraphBackedTypeStore.java  ...p/metadata/repository/typestore/GraphBackedTypeStore.java   (+8 -6)
addons/hive-bridge/src/main/java/org/apache/hadoop/metadata/hive/bridge/HiveMetaStoreBridge.java

@@ -18,6 +18,7 @@
 package org.apache.hadoop.metadata.hive.bridge;

+import org.apache.commons.lang.StringEscapeUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.Database;
...

@@ -158,11 +159,24 @@ public class HiveMetaStoreBridge {
         LOG.debug("Getting reference for database {}", databaseName);
         String typeName = HiveDataTypes.HIVE_DB.getName();
-        String dslQuery = String.format("%s where name = '%s' and clusterName = '%s'", HiveDataTypes.HIVE_DB.getName(),
+        String dslQuery = String.format("%s where name = '%s' and clusterName = '%s'", typeName,
                 databaseName.toLowerCase(), clusterName);
         return getEntityReferenceFromDSL(typeName, dslQuery);
     }

+    public Referenceable getProcessReference(String queryStr) throws Exception {
+        LOG.debug("Getting reference for process with query {}", queryStr);
+        String typeName = HiveDataTypes.HIVE_PROCESS.getName();
+        //todo enable DSL
+        // String dslQuery = String.format("%s where queryText = \"%s\"", typeName, queryStr);
+        // return getEntityReferenceFromDSL(typeName, dslQuery);
+        String gremlinQuery = String.format("g.V.has('__typeName', '%s').has('%s.queryText', \"%s\").toList()",
+                typeName, typeName, StringEscapeUtils.escapeJava(queryStr));
+        return getEntityReferenceFromGremlin(typeName, gremlinQuery);
+    }
+
     private Referenceable getEntityReferenceFromDSL(String typeName, String dslQuery) throws Exception {
         MetadataServiceClient dgiClient = getMetadataServiceClient();
         JSONArray results = dgiClient.searchByDSL(dslQuery);
...
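The new getProcessReference() embeds the query text inside a double-quoted Gremlin string literal, so it escapes it with StringEscapeUtils.escapeJava first. A minimal standalone sketch of that construction (the class name and values below are made up for illustration; only String.format and StringEscapeUtils.escapeJava mirror what the bridge actually calls):

import org.apache.commons.lang.StringEscapeUtils;

public class GremlinQuerySketch {
    public static void main(String[] args) {
        String typeName = "hive_process";        // made-up type name for illustration
        String queryStr = "select \"a\" from t"; // query text that itself contains quotes
        // escapeJava turns " into \" (and \ into \\), keeping the Gremlin string literal well-formed
        String gremlinQuery = String.format(
                "g.V.has('__typeName', '%s').has('%s.queryText', \"%s\").toList()",
                typeName, typeName, StringEscapeUtils.escapeJava(queryStr));
        System.out.println(gremlinQuery);
        // -> g.V.has('__typeName', 'hive_process').has('hive_process.queryText', "select \"a\" from t").toList()
    }
}

Without the escaping, a quote inside the query text would terminate the Gremlin literal early and break the lookup.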
addons/hive-bridge/src/main/java/org/apache/hadoop/metadata/hive/hook/HiveHook.java

@@ -37,6 +37,8 @@ package org.apache.hadoop.metadata.hive.hook;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.commons.lang.StringEscapeUtils;
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.exec.ExplainTask;
...

@@ -271,6 +273,13 @@ public class HiveHook implements ExecuteWithHookContext {
         }
     }

+    private String normalize(String str) {
+        if (StringUtils.isEmpty(str)) {
+            return null;
+        }
+        return str.toLowerCase().trim();
+    }
+
     private void registerProcess(HiveMetaStoreBridge dgiBridge, HiveEvent event) throws Exception {
         Set<ReadEntity> inputs = event.inputs;
         Set<WriteEntity> outputs = event.outputs;
...

@@ -285,48 +294,53 @@ public class HiveHook implements ExecuteWithHookContext {
         }

         String queryId = event.queryPlan.getQueryId();
-        String queryStr = event.queryPlan.getQueryStr();
+        String queryStr = normalize(event.queryPlan.getQueryStr());
         long queryStartTime = event.queryPlan.getQueryStartTime();

         LOG.debug("Registering CTAS query: {}", queryStr);
-        Referenceable processReferenceable = new Referenceable(HiveDataTypes.HIVE_PROCESS.getName());
-        processReferenceable.set("name", event.operation.getOperationName());
-        processReferenceable.set("startTime", queryStartTime);
-        processReferenceable.set("userName", event.user);
-        List<Referenceable> source = new ArrayList<>();
-        for (ReadEntity readEntity : inputs) {
-            if (readEntity.getType() == Entity.Type.TABLE) {
-                Table table = readEntity.getTable();
-                String dbName = table.getDbName();
-                source.add(dgiBridge.registerTable(dbName, table.getTableName()));
-            }
-            if (readEntity.getType() == Entity.Type.PARTITION) {
-                dgiBridge.registerPartition(readEntity.getPartition());
-            }
-        }
-        processReferenceable.set("inputs", source);
-        List<Referenceable> target = new ArrayList<>();
-        for (WriteEntity writeEntity : outputs) {
-            if (writeEntity.getType() == Entity.Type.TABLE || writeEntity.getType() == Entity.Type.PARTITION) {
-                Table table = writeEntity.getTable();
-                String dbName = table.getDbName();
-                target.add(dgiBridge.registerTable(dbName, table.getTableName()));
-            }
-            if (writeEntity.getType() == Entity.Type.PARTITION) {
-                dgiBridge.registerPartition(writeEntity.getPartition());
-            }
-        }
-        processReferenceable.set("outputs", target);
-        processReferenceable.set("queryText", queryStr);
-        processReferenceable.set("queryId", queryId);
-        processReferenceable.set("queryPlan", event.jsonPlan.toString());
-        processReferenceable.set("endTime", System.currentTimeMillis());
-        //TODO set
-        processReferenceable.set("queryGraph", "queryGraph");
-        dgiBridge.createInstance(processReferenceable);
+        Referenceable processReferenceable = dgiBridge.getProcessReference(queryStr);
+        if (processReferenceable == null) {
+            processReferenceable = new Referenceable(HiveDataTypes.HIVE_PROCESS.getName());
+            processReferenceable.set("name", event.operation.getOperationName());
+            processReferenceable.set("startTime", queryStartTime);
+            processReferenceable.set("userName", event.user);
+            List<Referenceable> source = new ArrayList<>();
+            for (ReadEntity readEntity : inputs) {
+                if (readEntity.getType() == Entity.Type.TABLE) {
+                    Table table = readEntity.getTable();
+                    String dbName = table.getDbName();
+                    source.add(dgiBridge.registerTable(dbName, table.getTableName()));
+                }
+                if (readEntity.getType() == Entity.Type.PARTITION) {
+                    dgiBridge.registerPartition(readEntity.getPartition());
+                }
+            }
+            processReferenceable.set("inputs", source);
+            List<Referenceable> target = new ArrayList<>();
+            for (WriteEntity writeEntity : outputs) {
+                if (writeEntity.getType() == Entity.Type.TABLE || writeEntity.getType() == Entity.Type.PARTITION) {
+                    Table table = writeEntity.getTable();
+                    String dbName = table.getDbName();
+                    target.add(dgiBridge.registerTable(dbName, table.getTableName()));
+                }
+                if (writeEntity.getType() == Entity.Type.PARTITION) {
+                    dgiBridge.registerPartition(writeEntity.getPartition());
+                }
+            }
+            processReferenceable.set("outputs", target);
+            processReferenceable.set("queryText", queryStr);
+            processReferenceable.set("queryId", queryId);
+            processReferenceable.set("queryPlan", event.jsonPlan.toString());
+            processReferenceable.set("endTime", System.currentTimeMillis());
+            //TODO set
+            processReferenceable.set("queryGraph", "queryGraph");
+            dgiBridge.createInstance(processReferenceable);
+        } else {
+            LOG.debug("Query {} is already registered", queryStr);
+        }
     }
...
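Taken together, normalize() and the lookup-before-create guard are what de-duplicate processes: case and whitespace variants of a query map to one canonical key, and a HIVE_PROCESS entity is only created when no entity with that queryText exists yet. A self-contained sketch of the pattern, with a plain HashMap standing in for the metadata store (DedupSketch, registry, and register() are illustrative stand-ins, not the hook's actual API):

import java.util.HashMap;
import java.util.Map;

public class DedupSketch {
    private final Map<String, String> registry = new HashMap<>(); // stand-in for the metadata store

    // mirrors HiveHook.normalize(): empty -> null, else lowercase and trim
    private String normalize(String str) {
        if (str == null || str.isEmpty()) {
            return null;
        }
        return str.toLowerCase().trim();
    }

    // look up first, create only on a miss -- the same shape as registerProcess() after this commit
    public String register(String rawQuery) {
        String queryStr = normalize(rawQuery);
        String existing = registry.get(queryStr);  // stand-in for dgiBridge.getProcessReference()
        if (existing == null) {
            existing = "process:" + queryStr;      // stand-in for creating the Referenceable
            registry.put(queryStr, existing);
        }
        return existing;
    }

    public static void main(String[] args) {
        DedupSketch sketch = new DedupSketch();
        String a = sketch.register("SELECT * FROM t");
        String b = sketch.register("select * from t ");
        System.out.println(a.equals(b)); // true: both variants resolve to one process entity
    }
}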
addons/hive-bridge/src/test/java/org/apache/hadoop/metadata/hive/hook/HiveHookIT.java

@@ -19,6 +19,8 @@
 package org.apache.hadoop.metadata.hive.hook;

 import org.apache.commons.lang.RandomStringUtils;
+import org.apache.commons.lang.StringEscapeUtils;
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.ql.Driver;
...

@@ -222,7 +224,7 @@ public class HiveHookIT {
         String tableName = createTable(false);
         String filename = "pfile://" + mkdir("export");
-        String query = "export table " + tableName + " to '" + filename + "'";
+        String query = "export table " + tableName + " to \"" + filename + "\"";
         runCommand(query);
         assertProcessIsRegistered(query);
...

@@ -239,6 +241,11 @@ public class HiveHookIT {
         String query = "select * from " + tableName;
         runCommand(query);
         assertProcessIsRegistered(query);
+
+        //single entity per query
+        query = "SELECT * from " + tableName.toUpperCase();
+        runCommand(query);
+        assertProcessIsRegistered(query);
     }

     @Test
...

@@ -268,8 +275,23 @@ public class HiveHookIT {
     }

     private void assertProcessIsRegistered(String queryStr) throws Exception {
-        String dslQuery = String.format("%s where queryText = \"%s\"", HiveDataTypes.HIVE_PROCESS.getName(), queryStr);
-        assertEntityIsRegistered(dslQuery, true);
+        // String dslQuery = String.format("%s where queryText = \"%s\"", HiveDataTypes.HIVE_PROCESS.getName(),
+        //         normalize(queryStr));
+        // assertEntityIsRegistered(dslQuery, true);
+        //todo replace with DSL
+        String typeName = HiveDataTypes.HIVE_PROCESS.getName();
+        String gremlinQuery = String.format("g.V.has('__typeName', '%s').has('%s.queryText', \"%s\").toList()",
+                typeName, typeName, normalize(queryStr));
+        JSONObject response = dgiCLient.searchByGremlin(gremlinQuery);
+        JSONArray results = response.getJSONArray(MetadataServiceClient.RESULTS);
+        Assert.assertEquals(results.length(), 1);
+    }
+
+    private String normalize(String str) {
+        if (StringUtils.isEmpty(str)) {
+            return null;
+        }
+        return StringEscapeUtils.escapeJava(str.toLowerCase());
     }

     private String assertTableIsRegistered(String dbName, String tableName) throws Exception {
...
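Note that the test's normalize() differs from the hook's on purpose: the hook lowercases and trims the query before storing it as queryText, while the test lowercases and then Java-escapes it so the value can sit inside the double-quoted literal of the Gremlin lookup. A small side-by-side sketch of the two transformations (hookNormalize/testNormalize are illustrative names and the example query is made up):

import org.apache.commons.lang.StringEscapeUtils;

public class NormalizeSketch {
    // the hook's variant: what gets stored on the entity as queryText
    static String hookNormalize(String s) { return s.toLowerCase().trim(); }

    // the test's variant: lowercase, then escape for embedding in a double-quoted Gremlin literal
    static String testNormalize(String s) { return StringEscapeUtils.escapeJava(s.toLowerCase()); }

    public static void main(String[] args) {
        String query = "EXPORT TABLE t TO \"pfile:///tmp/export\""; // made-up query
        System.out.println(hookNormalize(query)); // export table t to "pfile:///tmp/export"
        System.out.println(testNormalize(query)); // export table t to \"pfile:///tmp/export\"
    }
}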
repository/src/main/java/org/apache/hadoop/metadata/repository/typestore/GraphBackedTypeStore.java

@@ -281,12 +281,14 @@ public class GraphBackedTypeStore implements ITypeStore {
     private AttributeDefinition[] getAttributes(Vertex vertex, String typeName) throws MetadataException {
         List<AttributeDefinition> attributes = new ArrayList<>();
         List<String> attrNames = vertex.getProperty(getPropertyKey(typeName));
-        for (String attrName : attrNames) {
-            try {
-                String propertyKey = getPropertyKey(typeName, attrName);
-                attributes.add(AttributeInfo.fromJson((String) vertex.getProperty(propertyKey)));
-            } catch (JSONException e) {
-                throw new MetadataException(e);
+        if (attrNames != null) {
+            for (String attrName : attrNames) {
+                try {
+                    String propertyKey = getPropertyKey(typeName, attrName);
+                    attributes.add(AttributeInfo.fromJson((String) vertex.getProperty(propertyKey)));
+                } catch (JSONException e) {
+                    throw new MetadataException(e);
+                }
             }
         }
         return attributes.toArray(new AttributeDefinition[attributes.size()]);
...
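The added null check guards the case where vertex.getProperty(...) returns null, presumably a type stored with no attributes, which would make the old unguarded for-each throw a NullPointerException. A minimal standalone illustration of the guard, with a HashMap standing in for the graph vertex (NullGuardSketch and the property key are made-up names):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class NullGuardSketch {
    public static void main(String[] args) {
        Map<String, List<String>> vertex = new HashMap<>();        // stand-in for the graph vertex
        List<String> attrNames = vertex.get("hive_db.attrNames");  // absent property -> null

        List<String> attributes = new ArrayList<>();
        // for (String attrName : attrNames) { ... }  // pre-fix shape: throws NullPointerException
        if (attrNames != null) {                      // post-fix shape: tolerate a missing attribute list
            for (String attrName : attrNames) {
                attributes.add(attrName);
            }
        }
        System.out.println(attributes); // []
    }
}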