Commit 45c22514 by Shwetha GS

added table rename hive operation

parent 381bc538
......@@ -156,9 +156,7 @@ public class HiveHook implements ExecuteWithHookContext {
event.queryPlan = hookContext.getQueryPlan();
event.hookType = hookContext.getHookType();
//todo throws NPE
// event.jsonPlan = getQueryPlan(event);
event.jsonPlan = new JSONObject();
event.jsonPlan = getQueryPlan(event);
if (debug) {
fireAndForget(event);
......@@ -205,10 +203,53 @@ public class HiveHook implements ExecuteWithHookContext {
registerProcess(dgiBridge, event);
break;
case ALTERTABLE_RENAME:
case ALTERVIEW_RENAME:
renameTable(dgiBridge, event);
break;
case ALTERVIEW_AS:
//update inputs/outputs?
break;
case ALTERTABLE_ADDCOLS:
case ALTERTABLE_REPLACECOLS:
case ALTERTABLE_RENAMECOL:
break;
default:
}
}
private void renameTable(HiveMetaStoreBridge dgiBridge, HiveEvent event) throws Exception {
//crappy, no easy of getting new name
assert event.inputs != null && event.inputs.size() == 1;
assert event.outputs != null && event.outputs.size() > 0;
Table oldTable = event.inputs.iterator().next().getTable();
Table newTable = null;
for (WriteEntity writeEntity : event.outputs) {
if (writeEntity.getType() == Entity.Type.TABLE) {
Table table = writeEntity.getTable();
if (table.getDbName().equals(oldTable.getDbName()) && !table.getTableName()
.equals(oldTable.getTableName())) {
newTable = table;
break;
}
}
}
if (newTable == null) {
LOG.warn("Failed to deduct new name for " + event.queryPlan.getQueryStr());
return;
}
Referenceable dbReferenceable = dgiBridge.registerDatabase(oldTable.getDbName().toLowerCase());
Referenceable tableReferenceable =
dgiBridge.registerTable(dbReferenceable, oldTable.getDbName(), oldTable.getTableName());
dgiBridge.getMetadataServiceClient().updateEntity(tableReferenceable.getId()._getId(), "name",
newTable.getTableName());
}
private void handleCreateTable(HiveMetaStoreBridge dgiBridge, HiveEvent event) throws Exception {
for (WriteEntity entity : event.outputs) {
if (entity.getType() == Entity.Type.TABLE) {
......@@ -259,6 +300,9 @@ public class HiveHook implements ExecuteWithHookContext {
String dbName = table.getDbName().toLowerCase();
source.add(dgiBridge.registerTable(dbName, table.getTableName()));
}
if (readEntity.getType() == Entity.Type.PARTITION) {
dgiBridge.registerPartition(readEntity.getPartition());
}
}
processReferenceable.set("inputTables", source);
List<Referenceable> target = new ArrayList<>();
......@@ -285,9 +329,14 @@ public class HiveHook implements ExecuteWithHookContext {
private JSONObject getQueryPlan(HiveEvent event) throws Exception {
ExplainTask explain = new ExplainTask();
explain.initialize(event.conf, event.queryPlan, null);
List<Task<?>> rootTasks = event.queryPlan.getRootTasks();
return explain.getJSONPlan(null, null, rootTasks, event.queryPlan.getFetchTask(), true, false, false);
try {
ExplainTask explain = new ExplainTask();
explain.initialize(event.conf, event.queryPlan, null);
List<Task<?>> rootTasks = event.queryPlan.getRootTasks();
return explain.getJSONPlan(null, null, rootTasks, event.queryPlan.getFetchTask(), true, false, false);
} catch(Exception e) {
LOG.warn("Failed to get queryplan", e);
return new JSONObject();
}
}
}
......@@ -38,6 +38,7 @@ import java.util.Map;
public class HiveHookIT {
private static final String DGI_URL = "http://localhost:21000/";
private static final String CLUSTER_NAME = "test";
public static final String DEFAULT_DB = "default";
private Driver driver;
private MetadataServiceClient dgiCLient;
private SessionState ss;
......@@ -92,53 +93,70 @@ public class HiveHookIT {
assertDatabaseIsRegistered(dbName);
}
@Test
public void testCreateTable() throws Exception {
String dbName = "db" + random();
private String dbName() {
return "db" + random();
}
private String createDatabase() throws Exception {
String dbName = dbName();
runCommand("create database " + dbName);
return dbName;
}
private String tableName() {
return "table" + random();
}
private String createTable() throws Exception {
return createTable(true);
}
String tableName = "table" + random();
private String createTable(boolean partition) throws Exception {
String tableName = tableName();
runCommand("create table " + tableName + "(id int, name string)" + (partition ? " partitioned by(dt string)"
: ""));
return tableName;
}
@Test
public void testCreateTable() throws Exception {
String tableName = tableName();
String dbName = createDatabase();
runCommand("create table " + dbName + "." + tableName + "(id int, name string)");
assertTableIsRegistered(dbName, tableName);
tableName = "table" + random();
runCommand("create table " + tableName + "(id int, name string) partitioned by(dt string)");
assertTableIsRegistered("default", tableName);
tableName = createTable();
assertTableIsRegistered(DEFAULT_DB, tableName);
//Create table where database doesn't exist, will create database instance as well
assertDatabaseIsRegistered("default");
assertDatabaseIsRegistered(DEFAULT_DB);
}
@Test
public void testCTAS() throws Exception {
String tableName = "table" + random();
runCommand("create table " + tableName + "(id int, name string)");
String tableName = createTable();
String ctasTableName = "table" + random();
String query = "create table " + ctasTableName + " as select * from " + tableName;
runCommand(query);
assertTableIsRegistered("default", ctasTableName);
assertTableIsRegistered(DEFAULT_DB, ctasTableName);
assertProcessIsRegistered(query);
}
@Test
public void testCreateView() throws Exception {
String tableName = "table" + random();
runCommand("create table " + tableName + "(id int, name string)");
String viewName = "table" + random();
String tableName = createTable();
String viewName = tableName();
String query = "create view " + viewName + " as select * from " + tableName;
runCommand(query);
assertTableIsRegistered("default", viewName);
assertTableIsRegistered(DEFAULT_DB, viewName);
assertProcessIsRegistered(query);
}
@Test
public void testLoadData() throws Exception {
String tableName = "table" + random();
runCommand("create table " + tableName + "(id int, name string)");
String tableName = createTable(false);
String loadFile = file("load");
String query = "load data local inpath 'file://" + loadFile + "' into table " + tableName;
......@@ -149,18 +167,14 @@ public class HiveHookIT {
@Test
public void testInsert() throws Exception {
String tableName = "table" + random();
runCommand("create table " + tableName + "(id int, name string) partitioned by(dt string)");
String insertTableName = "table" + random();
runCommand("create table " + insertTableName + "(name string) partitioned by(dt string)");
String query = "insert into " + insertTableName + " partition(dt = '2015-01-01') select name from "
String tableName = createTable();
String insertTableName = createTable();
String query = "insert into " + insertTableName + " partition(dt = '2015-01-01') select id, name from "
+ tableName + " where dt = '2015-01-01'";
runCommand(query);
assertProcessIsRegistered(query);
assertPartitionIsRegistered("default", insertTableName, "2015-01-01");
assertPartitionIsRegistered(DEFAULT_DB, insertTableName, "2015-01-01");
}
private String random() {
......@@ -183,16 +197,14 @@ public class HiveHookIT {
@Test
public void testExportImport() throws Exception {
String tableName = "table" + random();
runCommand("create table " + tableName + "(name string)");
String tableName = createTable(false);
String filename = "pfile://" + mkdir("export");
String query = "export table " + tableName + " to '" + filename + "'";
runCommand(query);
assertProcessIsRegistered(query);
tableName = "table" + random();
runCommand("create table " + tableName + "(name string)");
tableName = createTable(false);
query = "import table " + tableName + " from '" + filename + "'";
runCommand(query);
......@@ -201,29 +213,61 @@ public class HiveHookIT {
@Test
public void testSelect() throws Exception {
String tableName = "table" + random();
runCommand("create table " + tableName + "(id int, name string)");
String tableName = createTable();
String query = "select * from " + tableName;
runCommand(query);
assertProcessIsRegistered(query);
}
@Test
public void testAlterTable() throws Exception {
String tableName = createTable();
String newName = tableName();
String query = "alter table " + tableName + " rename to " + newName;
runCommand(query);
assertTableIsRegistered(DEFAULT_DB, newName);
assertTableIsNotRegistered(DEFAULT_DB, tableName);
}
@Test
public void testAlterView() throws Exception {
String tableName = createTable();
String viewName = tableName();
String newName = tableName();
String query = "create view " + viewName + " as select * from " + tableName;
runCommand(query);
query = "alter view " + viewName + " rename to " + newName;
runCommand(query);
assertTableIsRegistered(DEFAULT_DB, newName);
assertTableIsNotRegistered(DEFAULT_DB, viewName);
}
private void assertProcessIsRegistered(String queryStr) throws Exception {
String dslQuery = String.format("%s where queryText = \"%s\"", HiveDataTypes.HIVE_PROCESS.getName(), queryStr);
assertEntityIsRegistered(dslQuery);
assertEntityIsRegistered(dslQuery, true);
}
private void assertTableIsRegistered(String dbName, String tableName) throws Exception {
private String assertTableIsRegistered(String dbName, String tableName) throws Exception {
return assertTableIsRegistered(dbName, tableName, true);
}
private String assertTableIsNotRegistered(String dbName, String tableName) throws Exception {
return assertTableIsRegistered(dbName, tableName, false);
}
private String assertTableIsRegistered(String dbName, String tableName, boolean registered) throws Exception {
String query = String.format("%s as t where name = '%s', dbName where name = '%s' and clusterName = '%s'"
+ " select t", HiveDataTypes.HIVE_TABLE.getName(), tableName, dbName, CLUSTER_NAME);
assertEntityIsRegistered(query);
+ " select t", HiveDataTypes.HIVE_TABLE.getName(), tableName, dbName, CLUSTER_NAME);
return assertEntityIsRegistered(query, registered);
}
private String assertDatabaseIsRegistered(String dbName) throws Exception {
String query = String.format("%s where name = '%s' and clusterName = '%s'", HiveDataTypes.HIVE_DB.getName(),
dbName, CLUSTER_NAME);
return assertEntityIsRegistered(query);
return assertEntityIsRegistered(query, true);
}
private void assertPartitionIsRegistered(String dbName, String tableName, String value) throws Exception {
......@@ -240,14 +284,19 @@ public class HiveHookIT {
Assert.assertEquals(results.length(), 1);
}
private String assertEntityIsRegistered(String dslQuery) throws Exception{
private String assertEntityIsRegistered(String dslQuery, boolean registered) throws Exception{
JSONArray results = dgiCLient.searchByDSL(dslQuery);
Assert.assertEquals(results.length(), 1);
JSONObject row = results.getJSONObject(0);
if (row.has("$id$")) {
return row.getJSONObject("$id$").getString("id");
if (registered) {
Assert.assertEquals(results.length(), 1);
JSONObject row = results.getJSONObject(0);
if (row.has("$id$")) {
return row.getJSONObject("$id$").getString("id");
} else {
return row.getJSONObject("_col_0").getString("id");
}
} else {
return row.getJSONObject("_col_0").getString("id");
Assert.assertEquals(results.length(), 0);
return null;
}
}
}
......@@ -34,7 +34,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.ws.rs.HttpMethod;
import javax.ws.rs.POST;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriBuilder;
......@@ -69,6 +68,8 @@ public class MetadataServiceClient {
public static final String QUERY = "query";
public static final String QUERY_TYPE = "queryType";
public static final String ATTRIBUTE_NAME = "property";
public static final String ATTRIBUTE_VALUE = "value";
private WebResource service;
......@@ -207,6 +208,19 @@ public class MetadataServiceClient {
}
}
/**
* Updates property for the entity corresponding to guid
* @param guid
* @param property
* @param value
*/
public JSONObject updateEntity(String guid, String property, String value) throws MetadataServiceException {
WebResource resource = getResource(API.UPDATE_ENTITY, guid);
resource = resource.queryParam(ATTRIBUTE_NAME, property);
resource = resource.queryParam(ATTRIBUTE_VALUE, value);
return callAPIWithResource(API.UPDATE_ENTITY, resource);
}
public JSONObject searchEntity(String searchQuery) throws MetadataServiceException {
WebResource resource = getResource(API.SEARCH);
resource = resource.queryParam(QUERY, searchQuery);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment