dataplatform / atlas / Commits

Commit e4991a54
Authored May 30, 2015 by Shwetha GS
    added table rename hive operation

Parent: 6bc7f5c7
Showing 3 changed files with 155 additions and 43 deletions:

    HiveHook.java (...n/java/org/apache/hadoop/metadata/hive/hook/HiveHook.java): +52 -3
    HiveHookIT.java (...java/org/apache/hadoop/metadata/hive/hook/HiveHookIT.java): +88 -39
    MetadataServiceClient.java (...ava/org/apache/hadoop/metadata/MetadataServiceClient.java): +15 -1
addons/hive-bridge/src/main/java/org/apache/hadoop/metadata/hive/hook/HiveHook.java
@@ -156,9 +156,7 @@ public class HiveHook implements ExecuteWithHookContext {
         event.queryPlan = hookContext.getQueryPlan();
         event.hookType = hookContext.getHookType();
-        //todo throws NPE
-        // event.jsonPlan = getQueryPlan(event);
-        event.jsonPlan = new JSONObject();
+        event.jsonPlan = getQueryPlan(event);

         if (debug) {
             fireAndForget(event);
@@ -205,10 +203,53 @@ public class HiveHook implements ExecuteWithHookContext {
                 registerProcess(dgiBridge, event);
                 break;

+            case ALTERTABLE_RENAME:
+            case ALTERVIEW_RENAME:
+                renameTable(dgiBridge, event);
+                break;
+
+            case ALTERVIEW_AS:
+                //update inputs/outputs?
+                break;
+
+            case ALTERTABLE_ADDCOLS:
+            case ALTERTABLE_REPLACECOLS:
+            case ALTERTABLE_RENAMECOL:
+                break;
+
             default:
             }
         }

+    private void renameTable(HiveMetaStoreBridge dgiBridge, HiveEvent event) throws Exception {
+        //crappy, no easy of getting new name
+        assert event.inputs != null && event.inputs.size() == 1;
+        assert event.outputs != null && event.outputs.size() > 0;
+
+        Table oldTable = event.inputs.iterator().next().getTable();
+        Table newTable = null;
+        for (WriteEntity writeEntity : event.outputs) {
+            if (writeEntity.getType() == Entity.Type.TABLE) {
+                Table table = writeEntity.getTable();
+                if (table.getDbName().equals(oldTable.getDbName())
+                        && !table.getTableName().equals(oldTable.getTableName())) {
+                    newTable = table;
+                    break;
+                }
+            }
+        }
+        if (newTable == null) {
+            LOG.warn("Failed to deduct new name for " + event.queryPlan.getQueryStr());
+            return;
+        }
+
+        Referenceable dbReferenceable = dgiBridge.registerDatabase(oldTable.getDbName().toLowerCase());
+        Referenceable tableReferenceable =
+                dgiBridge.registerTable(dbReferenceable, oldTable.getDbName(), oldTable.getTableName());
+        dgiBridge.getMetadataServiceClient().updateEntity(tableReferenceable.getId()._getId(), "name",
+                newTable.getTableName());
+    }
+
     private void handleCreateTable(HiveMetaStoreBridge dgiBridge, HiveEvent event) throws Exception {
         for (WriteEntity entity : event.outputs) {
             if (entity.getType() == Entity.Type.TABLE) {
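A note on the new renameTable() above: Hive does not hand the hook the new table name directly, so the hook deduces it by scanning the write entities for a table in the same database with a different name. A minimal, standalone sketch of that deduction, using plain stand-in types rather than the actual Hive/Atlas classes:

import java.util.Arrays;
import java.util.List;

public class RenameDeductionSketch {

    static final class TableRef {
        final String dbName;
        final String tableName;

        TableRef(String dbName, String tableName) {
            this.dbName = dbName;
            this.tableName = tableName;
        }
    }

    // Mirrors the loop in renameTable(): among the outputs of ALTER TABLE ... RENAME,
    // the renamed table is the one in the same database whose name differs from the input.
    static TableRef deduceNewTable(TableRef oldTable, List<TableRef> outputs) {
        for (TableRef candidate : outputs) {
            if (candidate.dbName.equals(oldTable.dbName)
                    && !candidate.tableName.equals(oldTable.tableName)) {
                return candidate;
            }
        }
        return null; // the hook logs a warning and bails out in this case
    }

    public static void main(String[] args) {
        TableRef oldTable = new TableRef("default", "t1");
        List<TableRef> outputs = Arrays.asList(
                new TableRef("default", "t1"),
                new TableRef("default", "t2"));
        System.out.println(deduceNewTable(oldTable, outputs).tableName); // prints "t2"
    }
}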
@@ -259,6 +300,9 @@ public class HiveHook implements ExecuteWithHookContext {
                 String dbName = table.getDbName().toLowerCase();
                 source.add(dgiBridge.registerTable(dbName, table.getTableName()));
             }
+            if (readEntity.getType() == Entity.Type.PARTITION) {
+                dgiBridge.registerPartition(readEntity.getPartition());
+            }
         }
         processReferenceable.set("inputTables", source);

         List<Referenceable> target = new ArrayList<>();
@@ -285,9 +329,14 @@ public class HiveHook implements ExecuteWithHookContext {
     private JSONObject getQueryPlan(HiveEvent event) throws Exception {
+        try {
             ExplainTask explain = new ExplainTask();
             explain.initialize(event.conf, event.queryPlan, null);
             List<Task<?>> rootTasks = event.queryPlan.getRootTasks();
             return explain.getJSONPlan(null, null, rootTasks, event.queryPlan.getFetchTask(), true, false, false);
+        } catch (Exception e) {
+            LOG.warn("Failed to get queryplan", e);
+            return new JSONObject();
+        }
     }
 }
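For context, and not part of this commit: HiveHook implements ExecuteWithHookContext, so it only runs if it is registered as a Hive post-execution hook. A hedged sketch of wiring it up programmatically, assuming Hive's standard hive.exec.post.hooks property (deployments normally set this in hive-site.xml instead):

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.session.SessionState;

public class HookWiringSketch {
    public static void main(String[] args) throws Exception {
        HiveConf conf = new HiveConf();
        // Register the hook so that handled operations (including the new rename path) reach it.
        conf.setVar(HiveConf.ConfVars.POSTEXECHOOKS, "org.apache.hadoop.metadata.hive.hook.HiveHook");

        SessionState.start(new SessionState(conf));
        Driver driver = new Driver(conf);
        driver.run("alter table old_table rename to new_table"); // triggers the ALTERTABLE_RENAME case
    }
}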
addons/hive-bridge/src/test/java/org/apache/hadoop/metadata/hive/hook/HiveHookIT.java

@@ -38,6 +38,7 @@ import java.util.Map;
 public class HiveHookIT {
     private static final String DGI_URL = "http://localhost:21000/";
     private static final String CLUSTER_NAME = "test";
+    public static final String DEFAULT_DB = "default";
     private Driver driver;
     private MetadataServiceClient dgiCLient;
     private SessionState ss;
@@ -92,53 +93,70 @@ public class HiveHookIT {
         assertDatabaseIsRegistered(dbName);
     }

-    @Test
-    public void testCreateTable() throws Exception {
-        String dbName = "db" + random();
-        String tableName = "table" + random();
+    private String dbName() {
+        return "db" + random();
+    }
+
+    private String createDatabase() throws Exception {
+        String dbName = dbName();
+        runCommand("create database " + dbName);
+        return dbName;
+    }
+
+    private String tableName() {
+        return "table" + random();
+    }
+
+    private String createTable() throws Exception {
+        return createTable(true);
+    }
+
+    private String createTable(boolean partition) throws Exception {
+        String tableName = tableName();
+        runCommand("create table " + tableName + "(id int, name string)"
+                + (partition ? " partitioned by(dt string)" : ""));
+        return tableName;
+    }
+
+    @Test
+    public void testCreateTable() throws Exception {
+        String tableName = tableName();
+        String dbName = createDatabase();
         runCommand("create table " + dbName + "." + tableName + "(id int, name string)");
         assertTableIsRegistered(dbName, tableName);

-        tableName = "table" + random();
-        runCommand("create table " + tableName + "(id int, name string) partitioned by(dt string)");
-        assertTableIsRegistered("default", tableName);
+        tableName = createTable();
+        assertTableIsRegistered(DEFAULT_DB, tableName);

         //Create table where database doesn't exist, will create database instance as well
-        assertDatabaseIsRegistered("default");
+        assertDatabaseIsRegistered(DEFAULT_DB);
     }

     @Test
     public void testCTAS() throws Exception {
-        String tableName = "table" + random();
-        runCommand("create table " + tableName + "(id int, name string)");
+        String tableName = createTable();
         String ctasTableName = "table" + random();
         String query = "create table " + ctasTableName + " as select * from " + tableName;
         runCommand(query);
-        assertTableIsRegistered("default", ctasTableName);
+        assertTableIsRegistered(DEFAULT_DB, ctasTableName);
         assertProcessIsRegistered(query);
     }

     @Test
     public void testCreateView() throws Exception {
-        String tableName = "table" + random();
-        runCommand("create table " + tableName + "(id int, name string)");
-        String viewName = "table" + random();
+        String tableName = createTable();
+        String viewName = tableName();
         String query = "create view " + viewName + " as select * from " + tableName;
         runCommand(query);
-        assertTableIsRegistered("default", viewName);
+        assertTableIsRegistered(DEFAULT_DB, viewName);
         assertProcessIsRegistered(query);
     }

     @Test
     public void testLoadData() throws Exception {
-        String tableName = "table" + random();
-        runCommand("create table " + tableName + "(id int, name string)");
+        String tableName = createTable(false);

         String loadFile = file("load");
         String query = "load data local inpath 'file://" + loadFile + "' into table " + tableName;
@@ -149,18 +167,14 @@ public class HiveHookIT {
     @Test
     public void testInsert() throws Exception {
-        String tableName = "table" + random();
-        runCommand("create table " + tableName + "(id int, name string) partitioned by(dt string)");
-
-        String insertTableName = "table" + random();
-        runCommand("create table " + insertTableName + "(name string) partitioned by(dt string)");
-
-        String query = "insert into " + insertTableName + " partition(dt = '2015-01-01') select name from "
+        String tableName = createTable();
+        String insertTableName = createTable();
+        String query = "insert into " + insertTableName + " partition(dt = '2015-01-01') select id, name from "
                 + tableName + " where dt = '2015-01-01'";

         runCommand(query);
         assertProcessIsRegistered(query);
-        assertPartitionIsRegistered("default", insertTableName, "2015-01-01");
+        assertPartitionIsRegistered(DEFAULT_DB, insertTableName, "2015-01-01");
     }

     private String random() {
@@ -183,16 +197,14 @@ public class HiveHookIT {
     @Test
     public void testExportImport() throws Exception {
-        String tableName = "table" + random();
-        runCommand("create table " + tableName + "(name string)");
+        String tableName = createTable(false);

         String filename = "pfile://" + mkdir("export");
         String query = "export table " + tableName + " to '" + filename + "'";
         runCommand(query);
         assertProcessIsRegistered(query);

-        tableName = "table" + random();
-        runCommand("create table " + tableName + "(name string)");
+        tableName = createTable(false);

         query = "import table " + tableName + " from '" + filename + "'";
         runCommand(query);
@@ -201,29 +213,61 @@ public class HiveHookIT {
     @Test
     public void testSelect() throws Exception {
-        String tableName = "table" + random();
-        runCommand("create table " + tableName + "(id int, name string)");
+        String tableName = createTable();
         String query = "select * from " + tableName;
         runCommand(query);
         assertProcessIsRegistered(query);
     }

+    @Test
+    public void testAlterTable() throws Exception {
+        String tableName = createTable();
+        String newName = tableName();
+        String query = "alter table " + tableName + " rename to " + newName;
+        runCommand(query);
+
+        assertTableIsRegistered(DEFAULT_DB, newName);
+        assertTableIsNotRegistered(DEFAULT_DB, tableName);
+    }
+
+    @Test
+    public void testAlterView() throws Exception {
+        String tableName = createTable();
+        String viewName = tableName();
+        String newName = tableName();
+        String query = "create view " + viewName + " as select * from " + tableName;
+        runCommand(query);
+
+        query = "alter view " + viewName + " rename to " + newName;
+        runCommand(query);
+
+        assertTableIsRegistered(DEFAULT_DB, newName);
+        assertTableIsNotRegistered(DEFAULT_DB, viewName);
+    }
+
     private void assertProcessIsRegistered(String queryStr) throws Exception {
         String dslQuery = String.format("%s where queryText = \"%s\"", HiveDataTypes.HIVE_PROCESS.getName(), queryStr);
-        assertEntityIsRegistered(dslQuery);
+        assertEntityIsRegistered(dslQuery, true);
     }

-    private void assertTableIsRegistered(String dbName, String tableName) throws Exception {
+    private String assertTableIsRegistered(String dbName, String tableName) throws Exception {
+        return assertTableIsRegistered(dbName, tableName, true);
+    }
+
+    private String assertTableIsNotRegistered(String dbName, String tableName) throws Exception {
+        return assertTableIsRegistered(dbName, tableName, false);
+    }
+
+    private String assertTableIsRegistered(String dbName, String tableName, boolean registered) throws Exception {
         String query = String.format("%s as t where name = '%s', dbName where name = '%s' and clusterName = '%s'"
                 + " select t", HiveDataTypes.HIVE_TABLE.getName(), tableName, dbName, CLUSTER_NAME);
-        assertEntityIsRegistered(query);
+        return assertEntityIsRegistered(query, registered);
     }

     private String assertDatabaseIsRegistered(String dbName) throws Exception {
         String query = String.format("%s where name = '%s' and clusterName = '%s'", HiveDataTypes.HIVE_DB.getName(),
                 dbName, CLUSTER_NAME);
-        return assertEntityIsRegistered(query);
+        return assertEntityIsRegistered(query, true);
     }

     private void assertPartitionIsRegistered(String dbName, String tableName, String value) throws Exception {
@@ -240,8 +284,9 @@ public class HiveHookIT {
         Assert.assertEquals(results.length(), 1);
     }

-    private String assertEntityIsRegistered(String dslQuery) throws Exception {
+    private String assertEntityIsRegistered(String dslQuery, boolean registered) throws Exception {
         JSONArray results = dgiCLient.searchByDSL(dslQuery);
+        if (registered) {
             Assert.assertEquals(results.length(), 1);
             JSONObject row = results.getJSONObject(0);
             if (row.has("$id$")) {
@@ -249,5 +294,9 @@ public class HiveHookIT {
         } else {
             return row.getJSONObject("_col_0").getString("id");
         }
+        } else {
+            Assert.assertEquals(results.length(), 0);
+            return null;
+        }
     }
 }
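For concreteness, assertTableIsRegistered() above boils down to a DSL search string like the one printed below. The literal type name "hive_table" is an assumption about what HiveDataTypes.HIVE_TABLE.getName() resolves to, and the table, database, and cluster values are hypothetical:

public class DslQuerySketch {
    public static void main(String[] args) {
        // Hypothetical values: table "t2" in database "default" on cluster "test".
        String query = String.format(
                "%s as t where name = '%s', dbName where name = '%s' and clusterName = '%s' select t",
                "hive_table", "t2", "default", "test");
        System.out.println(query);
        // hive_table as t where name = 't2', dbName where name = 'default' and clusterName = 'test' select t
    }
}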
client/src/main/java/org/apache/hadoop/metadata/MetadataServiceClient.java

@@ -34,7 +34,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 import javax.ws.rs.HttpMethod;
-import javax.ws.rs.POST;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.UriBuilder;
@@ -69,6 +68,8 @@ public class MetadataServiceClient {
     public static final String QUERY = "query";
     public static final String QUERY_TYPE = "queryType";
+    public static final String ATTRIBUTE_NAME = "property";
+    public static final String ATTRIBUTE_VALUE = "value";

     private WebResource service;
@@ -207,6 +208,19 @@ public class MetadataServiceClient {
         }
     }

+    /**
+     * Updates property for the entity corresponding to guid
+     * @param guid
+     * @param property
+     * @param value
+     */
+    public JSONObject updateEntity(String guid, String property, String value) throws MetadataServiceException {
+        WebResource resource = getResource(API.UPDATE_ENTITY, guid);
+        resource = resource.queryParam(ATTRIBUTE_NAME, property);
+        resource = resource.queryParam(ATTRIBUTE_VALUE, value);
+        return callAPIWithResource(API.UPDATE_ENTITY, resource);
+    }
+
     public JSONObject searchEntity(String searchQuery) throws MetadataServiceException {
         WebResource resource = getResource(API.SEARCH);
         resource = resource.queryParam(QUERY, searchQuery);
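A hedged usage sketch of the new updateEntity() client call from outside the hook. The base-URL constructor mirrors the DGI_URL used in HiveHookIT and the Jettison JSONObject import is an assumption about the client's return type; the guid would come from an earlier entity search:

import org.apache.hadoop.metadata.MetadataServiceClient;
import org.codehaus.jettison.json.JSONObject;

public class UpdateEntityExample {
    public static void main(String[] args) throws Exception {
        MetadataServiceClient client = new MetadataServiceClient("http://localhost:21000/");
        String guid = args[0]; // guid of the hive_table entity to update
        // Overwrite the "name" attribute, which is how HiveHook records a table rename.
        JSONObject response = client.updateEntity(guid, "name", "new_table_name");
        System.out.println(response);
    }
}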