Commit 0d38165d by Shwetha GS

hive hooks for create table and CTAS

parent 571fd25b
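
For context, HiveHook is a post-execution hook: Hive calls its run(HookContext) method after each statement, and the hook registers the affected database/table entities (and, for CTAS, a lineage process) in DGI. A minimal wiring sketch, assuming the stock hive.exec.post.hooks property and an embedded Driver session like the one the integration test sets up below (the class name and SQL literals here are illustrative, not part of this commit):

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.session.SessionState;

public class HiveHookWiringSketch {
    public static void main(String[] args) throws Exception {
        HiveConf conf = new HiveConf();
        // Standard Hive post-exec hook property; HiveHook's package is not shown in this
        // diff, so the fully qualified name is taken from the class itself.
        conf.set("hive.exec.post.hooks", HiveHook.class.getName());

        SessionState ss = new SessionState(conf, System.getProperty("user.name"));
        SessionState.start(ss);

        Driver driver = new Driver(conf);
        // Each statement triggers HiveHook.run(HookContext), which registers the table
        // (and its database) in DGI; the CTAS also registers a HIVE_PROCESS instance.
        driver.run("create table t(id int, name string)");
        driver.run("create table t2 as select * from t");
    }
}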
......@@ -45,7 +45,7 @@ import java.util.List;
* and registers them in DGI.
*/
public class HiveMetaStoreBridge {
static class Pair<S, T> {
public static class Pair<S, T> {
public S first;
public T second;
......@@ -119,7 +119,7 @@ public class HiveMetaStoreBridge {
String guid = jsonObject.getString(MetadataServiceClient.RESULTS);
LOG.debug("created instance for type " + typeName + ", guid: " + guid);
return new Referenceable(guid, referenceable.getTypeName(), referenceable.getValuesMap());
return new Referenceable(guid, referenceable.getTypeName(), null);
}
private void importTables(String databaseName, Referenceable databaseReferenceable) throws Exception {
......
......@@ -167,50 +167,22 @@ public class HiveHook implements ExecuteWithHookContext, HiveSemanticAnalyzerHoo
HiveMetaStoreBridge dgiBridge = new HiveMetaStoreBridge(conf);
HiveOperation operation = HiveOperation.valueOf(hookContext.getOperationName());
Set<ReadEntity> inputs = hookContext.getInputs();
Set<WriteEntity> outputs = hookContext.getOutputs();
String user = hookContext.getUserName();
String queryId = null;
String queryStr = null;
long queryStartTime = 0;
QueryPlan plan = hookContext.getQueryPlan();
if (plan != null) {
queryId = plan.getQueryId();
queryStr = plan.getQueryString();
queryStartTime = plan.getQueryStartTime();
}
System.out.println(String .format("%s - %s", queryStr, hookContext.getOperationName()));
StringBuffer stringBuffer = new StringBuffer("Inputs - ");
for (ReadEntity entity : inputs) {
stringBuffer = stringBuffer.append(" ").append(entity.getType());
if (entity.getType() == Entity.Type.TABLE) {
stringBuffer = stringBuffer.append(" ").append(entity.getTable().getTableName());
} else if (entity.getType() == Entity.Type.DATABASE) {
stringBuffer = stringBuffer.append(" ").append(entity.getDatabase().getName());
}
}
stringBuffer = stringBuffer.append(" Outputs - ");
switch (operation) {
case CREATEDATABASE:
Set<WriteEntity> outputs = hookContext.getOutputs();
for (WriteEntity entity : outputs) {
stringBuffer = stringBuffer.append(" ").append(entity.getType());
if (entity.getType() == Entity.Type.TABLE) {
stringBuffer = stringBuffer.append(" ").append(entity.getTable().getTableName());
} else if (entity.getType() == Entity.Type.DATABASE) {
stringBuffer = stringBuffer.append(" ").append(entity.getDatabase().getName());
if (entity.getType() == Entity.Type.DATABASE) {
dgiBridge.registerDatabase(entity.getDatabase().getName());
}
}
System.out.println(stringBuffer.toString());
switch (operation) {
case CREATEDATABASE:
String dbName = queryStr.split(" ")[2].trim();
dgiBridge.registerDatabase(dbName);
break;
case CREATETABLE:
outputs = hookContext.getOutputs();
for (WriteEntity entity : outputs) {
if (entity.getType() == Entity.Type.TABLE) {
Table table = entity.getTable();
//TODO table.getDbName().toLowerCase() is required as hive stores in lowercase, but table.getDbName() is not lowercase
Referenceable dbReferenceable = getDatabaseReference(dgiBridge, table.getDbName().toLowerCase());
......@@ -220,35 +192,59 @@ public class HiveHook implements ExecuteWithHookContext, HiveSemanticAnalyzerHoo
break;
case CREATETABLE_AS_SELECT:
Referenceable processReferenceable = new Referenceable(HiveDataTypes.HIVE_PROCESS.name());
registerCTAS(dgiBridge, hookContext);
break;
default:
}
}
private void registerCTAS(HiveMetaStoreBridge dgiBridge, HookContext hookContext) throws Exception {
Set<ReadEntity> inputs = hookContext.getInputs();
Set<WriteEntity> outputs = hookContext.getOutputs();
String user = hookContext.getUserName();
HiveOperation operation = HiveOperation.valueOf(hookContext.getOperationName());
String queryId = null;
String queryStr = null;
long queryStartTime = 0;
QueryPlan plan = hookContext.getQueryPlan();
if (plan != null) {
queryId = plan.getQueryId();
queryStr = plan.getQueryString();
queryStartTime = plan.getQueryStartTime();
}
Referenceable processReferenceable = new Referenceable(HiveDataTypes.HIVE_PROCESS.getName());
processReferenceable.set("processName", operation.getOperationName());
processReferenceable.set("startTime", queryStartTime);
processReferenceable.set("endTime", 0);
processReferenceable.set("userName", user);
List<Referenceable> source = new ArrayList<>();
for (ReadEntity readEntity : inputs) {
if (readEntity.getTyp() == Entity.Type.TABLE) {
source.add(getTableReference(dgiBridge, readEntity.getTable().getTableName()));
Table table = readEntity.getTable();
String dbName = table.getDbName().toLowerCase();
source.add(getTableReference(dgiBridge, dbName, table.getTableName()));
}
}
processReferenceable.set("sourceTableNames", source);
List<Referenceable> target = new ArrayList<>();
for (WriteEntity writeEntity : outputs) {
if (writeEntity.getTyp() == Entity.Type.TABLE) {
target.add(getTableReference(dgiBridge, writeEntity.getTable().getTableName()));
Table table = writeEntity.getTable();
String dbName = table.getDbName().toLowerCase();
target.add(getTableReference(dgiBridge, dbName, table.getTableName()));
}
}
processReferenceable.set("targetTableNames", target);
processReferenceable.set("queryText", queryStr);
processReferenceable.set("queryId", queryId);
//TODO set
processReferenceable.set("queryPlan", "");
processReferenceable.set("queryGraph", "");
processReferenceable.set("endTime", queryStartTime);
processReferenceable.set("queryPlan", "queryPlan");
processReferenceable.set("queryGraph", "queryGraph");
dgiBridge.createInstance(processReferenceable);
break;
default:
}
}
/**
......@@ -264,9 +260,11 @@ public class HiveHook implements ExecuteWithHookContext, HiveSemanticAnalyzerHoo
JSONObject result = dgiClient.search(typeName, "name", dbName);
JSONArray results = (JSONArray) result.get("results");
if (results.length() == 0) {
//Create new instance
return dgiBridge.registerDatabase(dbName);
} else {
String guid = (String) ((JSONObject) results.get(0)).get("guid");
return new Referenceable(guid, typeName, null);
......@@ -274,26 +272,30 @@ public class HiveHook implements ExecuteWithHookContext, HiveSemanticAnalyzerHoo
}
/**
* Gets reference for the table. Throws up if table instance doesn't exist
* Gets reference for the table. Creates new instance if it doesn't exist
* @param dgiBridge
* @param dbName
* @param tableName table name
* @return table reference
* @throws MetadataServiceException
* @throws JSONException
* @throws Exception
*/
private Referenceable getTableReference(HiveMetaStoreBridge dgiBridge, String tableName) throws MetadataServiceException, JSONException {
private Referenceable getTableReference(HiveMetaStoreBridge dgiBridge, String dbName, String tableName) throws Exception {
String typeName = HiveDataTypes.HIVE_TABLE.getName();
MetadataServiceClient dgiClient = dgiBridge.getMetadataServiceClient();
JSONObject result = dgiClient.search(typeName, "tableName", tableName);
JSONArray results = (JSONArray) new JSONObject((String) result.get("results")).get("results");
JSONArray results = (JSONArray) result.get("results");
if (results.length() == 0) {
throw new IllegalArgumentException("There is no entity for " + typeName + " where tableName=" + tableName);
}
Referenceable dbReference = getDatabaseReference(dgiBridge, dbName);
return dgiBridge.registerTable(dbReference, dbName, tableName).first;
} else {
//There should be just one instance with the given name
String guid = (String) new JSONObject((String) results.get(0)).get("guid");
String guid = (String) ((JSONObject) results.get(0)).get("guid");
return new Referenceable(guid, typeName, null);
}
}
//TODO Do we need this??
......
......@@ -59,8 +59,8 @@ public class HiveHookIT {
//Set-up hive session
HiveConf conf = getHiveConf();
driver = new Driver(conf);
ss = new SessionState(conf);
ss = ss.start(conf);
ss = new SessionState(conf, System.getProperty("user.name"));
ss = SessionState.start(ss);
SessionState.setCurrentSessionState(ss);
dgiCLient = new MetadataServiceClient(DGI_URL);
......@@ -87,37 +87,53 @@ public class HiveHookIT {
String dbName = "db" + RandomStringUtils.randomAlphanumeric(5).toLowerCase();
runCommand("create database " + dbName);
String typeName = HiveDataTypes.HIVE_DB.getName();
JSONObject result = dgiCLient.search(typeName, "name", dbName);
JSONArray results = (JSONArray) result.get("results");
Assert.assertEquals(results.length(), 1);
JSONObject resultRow = (JSONObject) results.get(0);
Assert.assertEquals(resultRow.get(typeName + ".name"), dbName);
assertDatabaseIsRegistered(dbName);
}
@Test(enabled = false)
@Test
public void testCreateTable() throws Exception {
String dbName = "db" + RandomStringUtils.randomAlphanumeric(5).toLowerCase();
runCommand("create database " + dbName);
String tableName = "table" + RandomStringUtils.randomAlphanumeric(5);
String queryStr = String.format("create table %s.%s(id int, name string)", dbName, tableName);
runCommand(queryStr);
String tableName = "table" + RandomStringUtils.randomAlphanumeric(5).toLowerCase();
runCommand("create table " + dbName + "." + tableName + "(id int, name string)");
assertTableIsRegistered(tableName);
String defaultTableName = "table" + RandomStringUtils.randomAlphanumeric(5);
runCommand("create table " + defaultTableName + "(id int, name string)");
tableName = "table" + RandomStringUtils.randomAlphanumeric(5).toLowerCase();
runCommand("create table " + tableName + "(id int, name string)");
assertTableIsRegistered(tableName);
//Create table where database doesn't exist, will create database instance as well
assertDatabaseIsRegistered("default");
}
runCommand("select * from " + defaultTableName);
@Test
public void testCTAS() throws Exception {
String tableName = "table" + RandomStringUtils.randomAlphanumeric(5).toLowerCase();
runCommand("create table " + tableName + "(id int, name string)");
runCommand("select * from " + dbName + "." + tableName);
String newTableName = "table" + RandomStringUtils.randomAlphanumeric(5).toLowerCase();
String query = "create table " + newTableName + " as select * from " + tableName;
runCommand(query);
String newTableName = "table" + RandomStringUtils.randomAlphanumeric(5);
assertTableIsRegistered(newTableName);
assertInstanceIsRegistered(HiveDataTypes.HIVE_PROCESS.getName(), "queryText", query);
}
runCommand("create table " + newTableName + " as select * from " + defaultTableName);
private void assertTableIsRegistered(String tableName) throws Exception {
assertInstanceIsRegistered(HiveDataTypes.HIVE_TABLE.getName(), "tableName", tableName);
}
runCommand("create table " + dbName + "." + newTableName + " as select * from " + dbName + "." + tableName);
private void assertDatabaseIsRegistered(String dbName) throws Exception {
assertInstanceIsRegistered(HiveDataTypes.HIVE_DB.getName(), "name", dbName);
}
newTableName = "table" + RandomStringUtils.randomAlphanumeric(5);
runCommand("create table " + newTableName + " as select count(*) from " + defaultTableName);
private void assertInstanceIsRegistered(String typeName, String colName, String colValue) throws Exception {
JSONObject result = dgiCLient.search(typeName, colName, colValue);
JSONArray results = (JSONArray) result.get("results");
Assert.assertEquals(results.length(), 1);
JSONObject resultRow = (JSONObject) results.get(0);
Assert.assertEquals(resultRow.get(typeName + "." + colName), colValue);
}
}
......@@ -146,7 +146,6 @@ public class MetadataServiceClient {
* @throws MetadataServiceException
*/
public JSONObject search(String typeName, String attributeName, Object attributeValue) throws MetadataServiceException {
//TODO replace with DSL when DSL works
String gremlinQuery = String.format("g.V.has(\"typeName\",\"%s\").and(_().has(\"%s.%s\", T.eq, \"%s\")).toList()",
typeName, typeName, attributeName, attributeValue);
return search(gremlinQuery);
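
Both HiveHook and the new HiveHookIT assertions go through this helper, which for now falls back to a raw Gremlin query instead of DSL. A condensed sketch of the lookup path it supports, essentially what getTableReference() above does, intended to sit alongside that method in HiveHook and reuse its imports (the table name is a placeholder):

private Referenceable findTable(HiveMetaStoreBridge dgiBridge, String tableName) throws Exception {
    String typeName = HiveDataTypes.HIVE_TABLE.getName();
    MetadataServiceClient dgiClient = dgiBridge.getMetadataServiceClient();
    // search() wraps the Gremlin query shown above and returns a JSON object whose
    // "results" array holds one row per matching vertex.
    JSONObject result = dgiClient.search(typeName, "tableName", tableName);
    JSONArray results = (JSONArray) result.get("results");
    if (results.length() == 0) {
        return null;  // not registered yet; getTableReference() registers a new instance instead
    }
    // There should be just one instance with the given name; reference it by guid.
    String guid = (String) ((JSONObject) results.get(0)).get("guid");
    return new Referenceable(guid, typeName, null);
}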
......
......@@ -84,11 +84,9 @@ public class GraphBackedDiscoveryService implements DiscoveryService {
Expressions.Expression validatedExpression = QueryProcessor.validate(expression);
GremlinQuery gremlinQuery =
new GremlinTranslator(validatedExpression, graphPersistenceStrategy).translate();
System.out.println("---------------------");
System.out.println("Query = " + validatedExpression);
System.out.println("Expression Tree = " + validatedExpression.treeString());
System.out.println("Gremlin Query = " + gremlinQuery.queryStr());
System.out.println("---------------------");
LOG.debug("Query = " + validatedExpression);
LOG.debug("Expression Tree = " + validatedExpression.treeString());
LOG.debug("Gremlin Query = " + gremlinQuery.queryStr());
return new GremlinEvaluator(gremlinQuery, graphPersistenceStrategy, titanGraph).evaluate();
}
......
......@@ -126,6 +126,10 @@ public class ClassType extends HierarchicalType<ClassType, IReferenceableInstanc
createInstanceWithTraits(id, r, r.getTraits().toArray(new String[0]))
: createInstance(id);
if (id != null && id.isAssigned()) {
return tr;
}
for (Map.Entry<String, AttributeInfo> e : fieldMapping.fields.entrySet()) {
String attrKey = e.getKey();
AttributeInfo i = e.getValue();
......