Commit fa81143d by Shwetha G S

Merge pull request #103 from shwethags/hive-ops

handling all hive query operations in hive hook
parents 1abeba45 3e695bad
......@@ -40,25 +40,6 @@
......@@ -99,6 +80,25 @@
......@@ -30,6 +30,7 @@ import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.metadata.MetadataServiceClient;
import org.apache.hadoop.metadata.MetadataServiceException;
import org.apache.hadoop.metadata.hive.model.HiveDataModelGenerator;
import org.apache.hadoop.metadata.hive.model.HiveDataTypes;
import org.apache.hadoop.metadata.typesystem.Referenceable;
......@@ -91,31 +92,6 @@ public class HiveMetaStoreBridge {
* Gets reference for the database
* @param databaseName
* @param clusterName cluster name
* @return Reference for database if exists, else null
* @throws Exception
private Referenceable getDatabaseReference(String databaseName, String clusterName) throws Exception {
LOG.debug("Getting reference for database {}", databaseName);
String typeName = HiveDataTypes.HIVE_DB.getName();
MetadataServiceClient dgiClient = getMetadataServiceClient();
String dslQuery = String.format("%s where name = '%s' and clusterName = '%s'",
HiveDataTypes.HIVE_DB.getName(), databaseName, clusterName);
JSONArray results = dgiClient.searchByDSL(dslQuery);
if (results.length() == 0) {
return null;
} else {
String guid = getGuidFromDSLResponse(results.getJSONObject(0));
return new Referenceable(guid, typeName, null);
public Referenceable registerDatabase(String databaseName) throws Exception {
Referenceable dbRef = getDatabaseReference(databaseName, clusterName);
if (dbRef == null) {
......@@ -169,6 +145,35 @@ public class HiveMetaStoreBridge {
* Gets reference for the database
* @param databaseName
* @param clusterName cluster name
* @return Reference for database if exists, else null
* @throws Exception
private Referenceable getDatabaseReference(String databaseName, String clusterName) throws Exception {
LOG.debug("Getting reference for database {}", databaseName);
String typeName = HiveDataTypes.HIVE_DB.getName();
String dslQuery = String.format("%s where name = '%s' and clusterName = '%s'", HiveDataTypes.HIVE_DB.getName(),
databaseName, clusterName);
return getEntityReferenceFromDSL(typeName, dslQuery);
private Referenceable getEntityReferenceFromDSL(String typeName, String dslQuery) throws Exception {
MetadataServiceClient dgiClient = getMetadataServiceClient();
JSONArray results = dgiClient.searchByDSL(dslQuery);
if (results.length() == 0) {
return null;
} else {
String guid = getGuidFromDSLResponse(results.getJSONObject(0));
return new Referenceable(guid, typeName, null);
* Gets reference for the table
* @param dbName
......@@ -180,19 +185,47 @@ public class HiveMetaStoreBridge {
LOG.debug("Getting reference for table {}.{}", dbName, tableName);
String typeName = HiveDataTypes.HIVE_TABLE.getName();
MetadataServiceClient dgiClient = getMetadataServiceClient();
String query = String.format("%s where name = '%s', dbName where name = '%s' and clusterName = '%s'",
HiveDataTypes.HIVE_TABLE.getName(), tableName, dbName, clusterName);
JSONArray results = dgiClient.searchByDSL(query);
// String dslQuery = String.format("%s as t where name = '%s' dbName where name = '%s' and "
// + "clusterName = '%s' select t",
// HiveDataTypes.HIVE_TABLE.getName(), tableName, dbName, clusterName);
String dbType = HiveDataTypes.HIVE_DB.getName();
String gremlinQuery = String.format("g.V.has('__typeName', '%s').has('', '%s').as('t').out"
+ "('__%s.dbName').has('', '%s').has('%s.clusterName', '%s').back('t').toList()",
typeName, typeName, tableName, typeName, dbType, dbName, dbType, clusterName);
return getEntityReferenceFromGremlin(typeName, gremlinQuery);
private Referenceable getEntityReferenceFromGremlin(String typeName, String gremlinQuery) throws MetadataServiceException,
JSONException {
MetadataServiceClient client = getMetadataServiceClient();
JSONObject response = client.searchByGremlin(gremlinQuery);
JSONArray results = response.getJSONArray(MetadataServiceClient.RESULTS);
if (results.length() == 0) {
return null;
} else {
//There should be just one instance with the given name
String guid = getGuidFromDSLResponse(results.getJSONObject(0));
LOG.debug("Got reference for table {}.{} = {}", dbName, tableName, guid);
String guid = results.getJSONObject(0).getString("__guid");
return new Referenceable(guid, typeName, null);
private Referenceable getPartitionReference(String dbName, String tableName, List<String> values) throws Exception {
String valuesStr = "['" + StringUtils.join(values, "', '") + "']";
LOG.debug("Getting reference for partition for {}.{} with values {}", dbName, tableName, valuesStr);
String typeName = HiveDataTypes.HIVE_PARTITION.getName();
// String dslQuery = String.format("%s as p where values = %s, tableName where name = '%s', "
// + "dbName where name = '%s' and clusterName = '%s' select p", typeName, valuesStr, tableName,
// dbName, clusterName);
String dbType = HiveDataTypes.HIVE_DB.getName();
String tableType = HiveDataTypes.HIVE_TABLE.getName();
String gremlinQuery = String.format("g.V.has('__typeName', '%s').has('%s.values', %s).as('p')."
+ "out('__%s.tableName').has('', '%s').out('__%s.dbName').has('', '%s')"
+ ".has('%s.clusterName', '%s').back('p').toList()", typeName, typeName, valuesStr, typeName,
tableType, tableName, tableType, dbType, dbName, dbType, clusterName);
return getEntityReferenceFromGremlin(typeName, gremlinQuery);
private String getGuidFromDSLResponse(JSONObject jsonObject) throws JSONException {
......@@ -292,14 +325,27 @@ public class HiveMetaStoreBridge {
//todo should be idempotent
public Referenceable registerPartition(Partition partition) throws Exception {
String dbName = partition.getTable().getDbName();
String tableName = partition.getTable().getTableName();
Referenceable dbRef = registerDatabase(dbName);
Referenceable tableRef = registerTable(dbName, tableName);
Referenceable sdRef = getSDForTable(dbName, tableName);
return importPartition(partition, dbRef, tableRef, sdRef);
private Referenceable importPartition(Partition hivePart,
Referenceable dbReferenceable,
Referenceable tableReferenceable,
Referenceable sdReferenceable) throws Exception {"Importing partition for {}.{} with values {}", dbReferenceable, tableReferenceable,
StringUtils.join(hivePart.getValues(), ","));
Referenceable partRef = new Referenceable(HiveDataTypes.HIVE_PARTITION.getName());
String dbName = hivePart.getTable().getDbName();
String tableName = hivePart.getTable().getTableName();
Referenceable partRef = getPartitionReference(dbName, tableName, hivePart.getValues());
if (partRef == null) {
partRef = new Referenceable(HiveDataTypes.HIVE_PARTITION.getName());
partRef.set("values", hivePart.getValues());
partRef.set("dbName", dbReferenceable);
......@@ -315,8 +361,12 @@ public class HiveMetaStoreBridge {
partRef.set("sd", sdReferenceable);
partRef.set("parameters", hivePart.getParameters());
return createInstance(partRef);
partRef = createInstance(partRef);
} else {"Partition {}.{} with values {} is already registered with id {}", dbName, tableName,
StringUtils.join(hivePart.getValues(), ","), partRef.getId().id);
return partRef;
private void importIndexes(String db, String table,
......@@ -189,17 +189,28 @@ public class HiveHook implements ExecuteWithHookContext {
switch (event.operation) {
Set<WriteEntity> outputs = event.outputs;
for (WriteEntity entity : outputs) {
if (entity.getType() == Entity.Type.DATABASE) {
handleCreateDB(dgiBridge, event);
outputs = event.outputs;
for (WriteEntity entity : outputs) {
handleCreateTable(dgiBridge, event);
case LOAD:
case EXPORT:
case IMPORT:
case QUERY:
registerProcess(dgiBridge, event);
private void handleCreateTable(HiveMetaStoreBridge dgiBridge, HiveEvent event) throws Exception {
for (WriteEntity entity : event.outputs) {
if (entity.getType() == Entity.Type.TABLE) {
Table table = entity.getTable();
......@@ -209,17 +220,17 @@ public class HiveHook implements ExecuteWithHookContext {
dgiBridge.registerTable(dbReferenceable, table.getDbName(), table.getTableName());
registerCTAS(dgiBridge, event);
private void handleCreateDB(HiveMetaStoreBridge dgiBridge, HiveEvent event) throws Exception {
for (WriteEntity entity : event.outputs) {
if (entity.getType() == Entity.Type.DATABASE) {
private void registerCTAS(HiveMetaStoreBridge dgiBridge, HiveEvent event) throws Exception {
private void registerProcess(HiveMetaStoreBridge dgiBridge, HiveEvent event) throws Exception {
Set<ReadEntity> inputs = event.inputs;
Set<WriteEntity> outputs = event.outputs;
......@@ -243,7 +254,7 @@ public class HiveHook implements ExecuteWithHookContext {
processReferenceable.set("userName", event.user);
List<Referenceable> source = new ArrayList<>();
for (ReadEntity readEntity : inputs) {
if (readEntity.getTyp() == Entity.Type.TABLE) {
if (readEntity.getType() == Entity.Type.TABLE) {
Table table = readEntity.getTable();
String dbName = table.getDbName().toLowerCase();
source.add(dgiBridge.registerTable(dbName, table.getTableName()));
......@@ -252,11 +263,14 @@ public class HiveHook implements ExecuteWithHookContext {
processReferenceable.set("inputTables", source);
List<Referenceable> target = new ArrayList<>();
for (WriteEntity writeEntity : outputs) {
if (writeEntity.getTyp() == Entity.Type.TABLE) {
if (writeEntity.getType() == Entity.Type.TABLE || writeEntity.getType() == Entity.Type.PARTITION) {
Table table = writeEntity.getTable();
String dbName = table.getDbName().toLowerCase();
target.add(dgiBridge.registerTable(dbName, table.getTableName()));
if (writeEntity.getType() == Entity.Type.PARTITION) {
processReferenceable.set("outputTables", target);
processReferenceable.set("queryText", queryStr);
......@@ -26,10 +26,13 @@ import org.apache.hadoop.metadata.MetadataServiceClient;
import org.apache.hadoop.metadata.hive.bridge.HiveMetaStoreBridge;
import org.apache.hadoop.metadata.hive.model.HiveDataTypes;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONObject;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
public class HiveHookIT {
private static final String DGI_URL = "http://localhost:21000/";
private static final String CLUSTER_NAME = "test";
......@@ -59,6 +62,9 @@ public class HiveHookIT {
hiveConf.set("javax.jdo.option.ConnectionURL", "jdbc:derby:./target/metastore_db;create=true");
hiveConf.set("hive.hook.dgi.synchronous", "true");
hiveConf.set(HiveMetaStoreBridge.HIVE_CLUSTER_NAME, CLUSTER_NAME);
//weird, hive prepends test_ to table name
hiveConf.set("hive.test.mode", "true");
hiveConf.set("fs.pfile.impl", "org.apache.hadoop.fs.ProxyLocalFileSystem");
return hiveConf;
......@@ -69,7 +75,7 @@ public class HiveHookIT {
public void testCreateDatabase() throws Exception {
String dbName = "db" + RandomStringUtils.randomAlphanumeric(5).toLowerCase();
String dbName = "db" + random();
runCommand("create database " + dbName);
......@@ -77,15 +83,15 @@ public class HiveHookIT {
public void testCreateTable() throws Exception {
String dbName = "db" + RandomStringUtils.randomAlphanumeric(5).toLowerCase();
String dbName = "db" + random();
runCommand("create database " + dbName);
String tableName = "table" + RandomStringUtils.randomAlphanumeric(5).toLowerCase();
String tableName = "table" + random();
runCommand("create table " + dbName + "." + tableName + "(id int, name string)");
assertTableIsRegistered(dbName, tableName);
tableName = "table" + RandomStringUtils.randomAlphanumeric(5).toLowerCase();
runCommand("create table " + tableName + "(id int, name string)");
tableName = "table" + random();
runCommand("create table " + tableName + "(id int, name string) partitioned by(dt string)");
assertTableIsRegistered("default", tableName);
//Create table where database doesn't exist, will create database instance as well
......@@ -94,10 +100,10 @@ public class HiveHookIT {
public void testCTAS() throws Exception {
String tableName = "table" + RandomStringUtils.randomAlphanumeric(5).toLowerCase();
String tableName = "table" + random();
runCommand("create table " + tableName + "(id int, name string)");
String ctasTableName = "table" + RandomStringUtils.randomAlphanumeric(5).toLowerCase();
String ctasTableName = "table" + random();
String query = "create table " + ctasTableName + " as select * from " + tableName;
......@@ -105,24 +111,125 @@ public class HiveHookIT {
public void testCreateView() throws Exception {
String tableName = "table" + random();
runCommand("create table " + tableName + "(id int, name string)");
String viewName = "table" + random();
String query = "create view " + viewName + " as select * from " + tableName;
assertTableIsRegistered("default", viewName);
public void testLoadData() throws Exception {
String tableName = "table" + random();
runCommand("create table test_" + tableName + "(id int, name string)");
String loadFile = file("load");
String query = "load data local inpath 'file://" + loadFile + "' into table " + tableName;
public void testInsert() throws Exception {
String tableName = "table" + random();
runCommand("create table " + tableName + "(id int, name string) partitioned by(dt string)");
String insertTableName = "table" + random();
runCommand("create table test_" + insertTableName + "(name string) partitioned by(dt string)");
String query = "insert into " + insertTableName + " partition(dt = '2015-01-01') select name from "
+ tableName + " where dt = '2015-01-01'";
assertPartitionIsRegistered("default", "test_" + insertTableName, "2015-01-01");
private String random() {
return RandomStringUtils.randomAlphanumeric(5).toLowerCase();
private String file(String tag) throws Exception {
String filename = "./target/" + tag + "-data-" + random();
File file = new File(filename);
return file.getAbsolutePath();
private String mkdir(String tag) throws Exception {
String filename = "./target/" + tag + "-data-" + random();
File file = new File(filename);
return file.getAbsolutePath();
public void testExportImport() throws Exception {
String tableName = "table" + random();
runCommand("create table test_" + tableName + "(name string)");
String filename = "pfile://" + mkdir("export");
String query = "export table " + tableName + " to '" + filename + "'";
tableName = "table" + random();
runCommand("create table " + tableName + "(name string)");
query = "import table " + tableName + " from '" + filename + "'";
public void testSelect() throws Exception {
String tableName = "table" + random();
runCommand("create table " + tableName + "(id int, name string)");
String query = "select * from " + tableName;
private void assertProcessIsRegistered(String queryStr) throws Exception {
String dslQuery = String.format("%s where queryText = '%s'", HiveDataTypes.HIVE_PROCESS.getName(), queryStr);
String dslQuery = String.format("%s where queryText = \"%s\"", HiveDataTypes.HIVE_PROCESS.getName(), queryStr);
private void assertTableIsRegistered(String dbName, String tableName) throws Exception {
String query = String.format("%s where name = '%s', dbName where name = '%s' and clusterName = '%s'",
HiveDataTypes.HIVE_TABLE.getName(), tableName, dbName, CLUSTER_NAME);
private void assertDatabaseIsRegistered(String dbName) throws Exception {
String query = String.format("%s where name = '%s' and clusterName = '%s'", HiveDataTypes.HIVE_DB.getName(),
private void assertPartitionIsRegistered(String dbName, String tableName, String value) throws Exception {
String typeName = HiveDataTypes.HIVE_PARTITION.getName();
String dbType = HiveDataTypes.HIVE_DB.getName();
String tableType = HiveDataTypes.HIVE_TABLE.getName();
String gremlinQuery = String.format("g.V.has('__typeName', '%s').has('%s.values', ['%s']).as('p')."
+ "out('__%s.tableName').has('', '%s').out('__%s.dbName').has('', '%s')"
+ ".has('%s.clusterName', '%s').back('p').toList()", typeName, typeName, value, typeName,
tableType, tableName, tableType, dbType, dbName, dbType, CLUSTER_NAME);
JSONObject response = dgiCLient.searchByGremlin(gremlinQuery);
JSONArray results = response.getJSONArray(MetadataServiceClient.RESULTS);
Assert.assertEquals(results.length(), 1);
private void assertInstanceIsRegistered(String dslQuery) throws Exception{
private void assertEntityIsRegistered(String dslQuery) throws Exception{
JSONArray results = dgiCLient.searchByDSL(dslQuery);
Assert.assertEquals(results.length(), 1);
......@@ -200,7 +200,7 @@ public class MetadataServiceClient {
public Referenceable getEntity(String guid) throws MetadataServiceException {
JSONObject jsonResponse = callAPI(API.GET_ENTITY, null, guid);
try {
String entityInstanceDefinition = jsonResponse.getString(MetadataServiceClient.GUID);
String entityInstanceDefinition = jsonResponse.getString(MetadataServiceClient.DEFINITION);
return InstanceSerialization.fromJsonReferenceable(entityInstanceDefinition, true);
} catch (JSONException e) {
throw new MetadataServiceException(e);
......@@ -101,6 +101,9 @@
<StagingName>Apache Release Distribution Repository</StagingName>
<!-- skips checkstyle and find bugs -->
......@@ -971,6 +974,7 @@
......@@ -988,6 +992,7 @@
<!--debug>true</debug -->
......@@ -23,6 +23,8 @@ import com.thinkaurelius.titan.core.TitanIndexQuery;
import com.thinkaurelius.titan.core.TitanProperty;
import com.thinkaurelius.titan.core.TitanVertex;
import com.tinkerpop.blueprints.Vertex;
import com.tinkerpop.gremlin.groovy.Gremlin;
import org.apache.hadoop.metadata.MetadataServiceClient;
import org.apache.hadoop.metadata.discovery.DiscoveryException;
import org.apache.hadoop.metadata.discovery.DiscoveryService;
......@@ -199,7 +199,7 @@ public class GraphBackedSearchIndexer implements SearchIndexer {
case ENUM:
createVertexMixedIndex(propertyName, Integer.class);
createVertexMixedIndex(propertyName, String.class);
case ARRAY:
......@@ -85,7 +85,7 @@
<priority value="debug"/>
<priority value="info"/>
<appender-ref ref="console"/>
Markdown is supported
0% Try again or attach a new file
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment