Commit 47894075 by Suma S

Merge pull request #120 from shwethags/ts

added table rename hive operation
parents 36a1f612 e4991a54
...@@ -156,9 +156,7 @@ public class HiveHook implements ExecuteWithHookContext { ...@@ -156,9 +156,7 @@ public class HiveHook implements ExecuteWithHookContext {
event.queryPlan = hookContext.getQueryPlan(); event.queryPlan = hookContext.getQueryPlan();
event.hookType = hookContext.getHookType(); event.hookType = hookContext.getHookType();
//todo throws NPE event.jsonPlan = getQueryPlan(event);
// event.jsonPlan = getQueryPlan(event);
event.jsonPlan = new JSONObject();
if (debug) { if (debug) {
fireAndForget(event); fireAndForget(event);
...@@ -205,10 +203,53 @@ public class HiveHook implements ExecuteWithHookContext { ...@@ -205,10 +203,53 @@ public class HiveHook implements ExecuteWithHookContext {
registerProcess(dgiBridge, event); registerProcess(dgiBridge, event);
break; break;
case ALTERTABLE_RENAME:
case ALTERVIEW_RENAME:
renameTable(dgiBridge, event);
break;
case ALTERVIEW_AS:
//update inputs/outputs?
break;
case ALTERTABLE_ADDCOLS:
case ALTERTABLE_REPLACECOLS:
case ALTERTABLE_RENAMECOL:
break;
default: default:
} }
} }
private void renameTable(HiveMetaStoreBridge dgiBridge, HiveEvent event) throws Exception {
//crappy, no easy of getting new name
assert event.inputs != null && event.inputs.size() == 1;
assert event.outputs != null && event.outputs.size() > 0;
Table oldTable = event.inputs.iterator().next().getTable();
Table newTable = null;
for (WriteEntity writeEntity : event.outputs) {
if (writeEntity.getType() == Entity.Type.TABLE) {
Table table = writeEntity.getTable();
if (table.getDbName().equals(oldTable.getDbName()) && !table.getTableName()
.equals(oldTable.getTableName())) {
newTable = table;
break;
}
}
}
if (newTable == null) {
LOG.warn("Failed to deduct new name for " + event.queryPlan.getQueryStr());
return;
}
Referenceable dbReferenceable = dgiBridge.registerDatabase(oldTable.getDbName().toLowerCase());
Referenceable tableReferenceable =
dgiBridge.registerTable(dbReferenceable, oldTable.getDbName(), oldTable.getTableName());
dgiBridge.getMetadataServiceClient().updateEntity(tableReferenceable.getId()._getId(), "name",
newTable.getTableName());
}
private void handleCreateTable(HiveMetaStoreBridge dgiBridge, HiveEvent event) throws Exception { private void handleCreateTable(HiveMetaStoreBridge dgiBridge, HiveEvent event) throws Exception {
for (WriteEntity entity : event.outputs) { for (WriteEntity entity : event.outputs) {
if (entity.getType() == Entity.Type.TABLE) { if (entity.getType() == Entity.Type.TABLE) {
...@@ -259,6 +300,9 @@ public class HiveHook implements ExecuteWithHookContext { ...@@ -259,6 +300,9 @@ public class HiveHook implements ExecuteWithHookContext {
String dbName = table.getDbName().toLowerCase(); String dbName = table.getDbName().toLowerCase();
source.add(dgiBridge.registerTable(dbName, table.getTableName())); source.add(dgiBridge.registerTable(dbName, table.getTableName()));
} }
if (readEntity.getType() == Entity.Type.PARTITION) {
dgiBridge.registerPartition(readEntity.getPartition());
}
} }
processReferenceable.set("inputTables", source); processReferenceable.set("inputTables", source);
List<Referenceable> target = new ArrayList<>(); List<Referenceable> target = new ArrayList<>();
...@@ -285,9 +329,14 @@ public class HiveHook implements ExecuteWithHookContext { ...@@ -285,9 +329,14 @@ public class HiveHook implements ExecuteWithHookContext {
private JSONObject getQueryPlan(HiveEvent event) throws Exception { private JSONObject getQueryPlan(HiveEvent event) throws Exception {
ExplainTask explain = new ExplainTask(); try {
explain.initialize(event.conf, event.queryPlan, null); ExplainTask explain = new ExplainTask();
List<Task<?>> rootTasks = event.queryPlan.getRootTasks(); explain.initialize(event.conf, event.queryPlan, null);
return explain.getJSONPlan(null, null, rootTasks, event.queryPlan.getFetchTask(), true, false, false); List<Task<?>> rootTasks = event.queryPlan.getRootTasks();
return explain.getJSONPlan(null, null, rootTasks, event.queryPlan.getFetchTask(), true, false, false);
} catch(Exception e) {
LOG.warn("Failed to get queryplan", e);
return new JSONObject();
}
} }
} }
...@@ -38,6 +38,7 @@ import java.util.Map; ...@@ -38,6 +38,7 @@ import java.util.Map;
public class HiveHookIT { public class HiveHookIT {
private static final String DGI_URL = "http://localhost:21000/"; private static final String DGI_URL = "http://localhost:21000/";
private static final String CLUSTER_NAME = "test"; private static final String CLUSTER_NAME = "test";
public static final String DEFAULT_DB = "default";
private Driver driver; private Driver driver;
private MetadataServiceClient dgiCLient; private MetadataServiceClient dgiCLient;
private SessionState ss; private SessionState ss;
...@@ -92,53 +93,70 @@ public class HiveHookIT { ...@@ -92,53 +93,70 @@ public class HiveHookIT {
assertDatabaseIsRegistered(dbName); assertDatabaseIsRegistered(dbName);
} }
@Test private String dbName() {
public void testCreateTable() throws Exception { return "db" + random();
String dbName = "db" + random(); }
private String createDatabase() throws Exception {
String dbName = dbName();
runCommand("create database " + dbName); runCommand("create database " + dbName);
return dbName;
}
private String tableName() {
return "table" + random();
}
private String createTable() throws Exception {
return createTable(true);
}
String tableName = "table" + random(); private String createTable(boolean partition) throws Exception {
String tableName = tableName();
runCommand("create table " + tableName + "(id int, name string)" + (partition ? " partitioned by(dt string)"
: ""));
return tableName;
}
@Test
public void testCreateTable() throws Exception {
String tableName = tableName();
String dbName = createDatabase();
runCommand("create table " + dbName + "." + tableName + "(id int, name string)"); runCommand("create table " + dbName + "." + tableName + "(id int, name string)");
assertTableIsRegistered(dbName, tableName); assertTableIsRegistered(dbName, tableName);
tableName = "table" + random(); tableName = createTable();
runCommand("create table " + tableName + "(id int, name string) partitioned by(dt string)"); assertTableIsRegistered(DEFAULT_DB, tableName);
assertTableIsRegistered("default", tableName);
//Create table where database doesn't exist, will create database instance as well //Create table where database doesn't exist, will create database instance as well
assertDatabaseIsRegistered("default"); assertDatabaseIsRegistered(DEFAULT_DB);
} }
@Test @Test
public void testCTAS() throws Exception { public void testCTAS() throws Exception {
String tableName = "table" + random(); String tableName = createTable();
runCommand("create table " + tableName + "(id int, name string)");
String ctasTableName = "table" + random(); String ctasTableName = "table" + random();
String query = "create table " + ctasTableName + " as select * from " + tableName; String query = "create table " + ctasTableName + " as select * from " + tableName;
runCommand(query); runCommand(query);
assertTableIsRegistered("default", ctasTableName); assertTableIsRegistered(DEFAULT_DB, ctasTableName);
assertProcessIsRegistered(query); assertProcessIsRegistered(query);
} }
@Test @Test
public void testCreateView() throws Exception { public void testCreateView() throws Exception {
String tableName = "table" + random(); String tableName = createTable();
runCommand("create table " + tableName + "(id int, name string)"); String viewName = tableName();
String viewName = "table" + random();
String query = "create view " + viewName + " as select * from " + tableName; String query = "create view " + viewName + " as select * from " + tableName;
runCommand(query); runCommand(query);
assertTableIsRegistered("default", viewName); assertTableIsRegistered(DEFAULT_DB, viewName);
assertProcessIsRegistered(query); assertProcessIsRegistered(query);
} }
@Test @Test
public void testLoadData() throws Exception { public void testLoadData() throws Exception {
String tableName = "table" + random(); String tableName = createTable(false);
runCommand("create table " + tableName + "(id int, name string)");
String loadFile = file("load"); String loadFile = file("load");
String query = "load data local inpath 'file://" + loadFile + "' into table " + tableName; String query = "load data local inpath 'file://" + loadFile + "' into table " + tableName;
...@@ -149,18 +167,14 @@ public class HiveHookIT { ...@@ -149,18 +167,14 @@ public class HiveHookIT {
@Test @Test
public void testInsert() throws Exception { public void testInsert() throws Exception {
String tableName = "table" + random(); String tableName = createTable();
runCommand("create table " + tableName + "(id int, name string) partitioned by(dt string)"); String insertTableName = createTable();
String query = "insert into " + insertTableName + " partition(dt = '2015-01-01') select id, name from "
String insertTableName = "table" + random();
runCommand("create table " + insertTableName + "(name string) partitioned by(dt string)");
String query = "insert into " + insertTableName + " partition(dt = '2015-01-01') select name from "
+ tableName + " where dt = '2015-01-01'"; + tableName + " where dt = '2015-01-01'";
runCommand(query); runCommand(query);
assertProcessIsRegistered(query); assertProcessIsRegistered(query);
assertPartitionIsRegistered("default", insertTableName, "2015-01-01"); assertPartitionIsRegistered(DEFAULT_DB, insertTableName, "2015-01-01");
} }
private String random() { private String random() {
...@@ -183,16 +197,14 @@ public class HiveHookIT { ...@@ -183,16 +197,14 @@ public class HiveHookIT {
@Test @Test
public void testExportImport() throws Exception { public void testExportImport() throws Exception {
String tableName = "table" + random(); String tableName = createTable(false);
runCommand("create table " + tableName + "(name string)");
String filename = "pfile://" + mkdir("export"); String filename = "pfile://" + mkdir("export");
String query = "export table " + tableName + " to '" + filename + "'"; String query = "export table " + tableName + " to '" + filename + "'";
runCommand(query); runCommand(query);
assertProcessIsRegistered(query); assertProcessIsRegistered(query);
tableName = "table" + random(); tableName = createTable(false);
runCommand("create table " + tableName + "(name string)");
query = "import table " + tableName + " from '" + filename + "'"; query = "import table " + tableName + " from '" + filename + "'";
runCommand(query); runCommand(query);
...@@ -201,29 +213,61 @@ public class HiveHookIT { ...@@ -201,29 +213,61 @@ public class HiveHookIT {
@Test @Test
public void testSelect() throws Exception { public void testSelect() throws Exception {
String tableName = "table" + random(); String tableName = createTable();
runCommand("create table " + tableName + "(id int, name string)");
String query = "select * from " + tableName; String query = "select * from " + tableName;
runCommand(query); runCommand(query);
assertProcessIsRegistered(query); assertProcessIsRegistered(query);
} }
@Test
public void testAlterTable() throws Exception {
String tableName = createTable();
String newName = tableName();
String query = "alter table " + tableName + " rename to " + newName;
runCommand(query);
assertTableIsRegistered(DEFAULT_DB, newName);
assertTableIsNotRegistered(DEFAULT_DB, tableName);
}
@Test
public void testAlterView() throws Exception {
String tableName = createTable();
String viewName = tableName();
String newName = tableName();
String query = "create view " + viewName + " as select * from " + tableName;
runCommand(query);
query = "alter view " + viewName + " rename to " + newName;
runCommand(query);
assertTableIsRegistered(DEFAULT_DB, newName);
assertTableIsNotRegistered(DEFAULT_DB, viewName);
}
private void assertProcessIsRegistered(String queryStr) throws Exception { private void assertProcessIsRegistered(String queryStr) throws Exception {
String dslQuery = String.format("%s where queryText = \"%s\"", HiveDataTypes.HIVE_PROCESS.getName(), queryStr); String dslQuery = String.format("%s where queryText = \"%s\"", HiveDataTypes.HIVE_PROCESS.getName(), queryStr);
assertEntityIsRegistered(dslQuery); assertEntityIsRegistered(dslQuery, true);
} }
private void assertTableIsRegistered(String dbName, String tableName) throws Exception { private String assertTableIsRegistered(String dbName, String tableName) throws Exception {
return assertTableIsRegistered(dbName, tableName, true);
}
private String assertTableIsNotRegistered(String dbName, String tableName) throws Exception {
return assertTableIsRegistered(dbName, tableName, false);
}
private String assertTableIsRegistered(String dbName, String tableName, boolean registered) throws Exception {
String query = String.format("%s as t where name = '%s', dbName where name = '%s' and clusterName = '%s'" String query = String.format("%s as t where name = '%s', dbName where name = '%s' and clusterName = '%s'"
+ " select t", HiveDataTypes.HIVE_TABLE.getName(), tableName, dbName, CLUSTER_NAME); + " select t", HiveDataTypes.HIVE_TABLE.getName(), tableName, dbName, CLUSTER_NAME);
assertEntityIsRegistered(query); return assertEntityIsRegistered(query, registered);
} }
private String assertDatabaseIsRegistered(String dbName) throws Exception { private String assertDatabaseIsRegistered(String dbName) throws Exception {
String query = String.format("%s where name = '%s' and clusterName = '%s'", HiveDataTypes.HIVE_DB.getName(), String query = String.format("%s where name = '%s' and clusterName = '%s'", HiveDataTypes.HIVE_DB.getName(),
dbName, CLUSTER_NAME); dbName, CLUSTER_NAME);
return assertEntityIsRegistered(query); return assertEntityIsRegistered(query, true);
} }
private void assertPartitionIsRegistered(String dbName, String tableName, String value) throws Exception { private void assertPartitionIsRegistered(String dbName, String tableName, String value) throws Exception {
...@@ -240,14 +284,19 @@ public class HiveHookIT { ...@@ -240,14 +284,19 @@ public class HiveHookIT {
Assert.assertEquals(results.length(), 1); Assert.assertEquals(results.length(), 1);
} }
private String assertEntityIsRegistered(String dslQuery) throws Exception{ private String assertEntityIsRegistered(String dslQuery, boolean registered) throws Exception{
JSONArray results = dgiCLient.searchByDSL(dslQuery); JSONArray results = dgiCLient.searchByDSL(dslQuery);
Assert.assertEquals(results.length(), 1); if (registered) {
JSONObject row = results.getJSONObject(0); Assert.assertEquals(results.length(), 1);
if (row.has("$id$")) { JSONObject row = results.getJSONObject(0);
return row.getJSONObject("$id$").getString("id"); if (row.has("$id$")) {
return row.getJSONObject("$id$").getString("id");
} else {
return row.getJSONObject("_col_0").getString("id");
}
} else { } else {
return row.getJSONObject("_col_0").getString("id"); Assert.assertEquals(results.length(), 0);
return null;
} }
} }
} }
...@@ -34,7 +34,6 @@ import org.slf4j.Logger; ...@@ -34,7 +34,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import javax.ws.rs.HttpMethod; import javax.ws.rs.HttpMethod;
import javax.ws.rs.POST;
import javax.ws.rs.core.MediaType; import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response; import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriBuilder; import javax.ws.rs.core.UriBuilder;
...@@ -69,6 +68,8 @@ public class MetadataServiceClient { ...@@ -69,6 +68,8 @@ public class MetadataServiceClient {
public static final String QUERY = "query"; public static final String QUERY = "query";
public static final String QUERY_TYPE = "queryType"; public static final String QUERY_TYPE = "queryType";
public static final String ATTRIBUTE_NAME = "property";
public static final String ATTRIBUTE_VALUE = "value";
private WebResource service; private WebResource service;
...@@ -207,6 +208,19 @@ public class MetadataServiceClient { ...@@ -207,6 +208,19 @@ public class MetadataServiceClient {
} }
} }
/**
* Updates property for the entity corresponding to guid
* @param guid
* @param property
* @param value
*/
public JSONObject updateEntity(String guid, String property, String value) throws MetadataServiceException {
WebResource resource = getResource(API.UPDATE_ENTITY, guid);
resource = resource.queryParam(ATTRIBUTE_NAME, property);
resource = resource.queryParam(ATTRIBUTE_VALUE, value);
return callAPIWithResource(API.UPDATE_ENTITY, resource);
}
public JSONObject searchEntity(String searchQuery) throws MetadataServiceException { public JSONObject searchEntity(String searchQuery) throws MetadataServiceException {
WebResource resource = getResource(API.SEARCH); WebResource resource = getResource(API.SEARCH);
resource = resource.queryParam(QUERY, searchQuery); resource = resource.queryParam(QUERY, searchQuery);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment