Commit a4af9e31 by Shwetha GS

auto registering hive model in hive hook

parent 1e835435
@@ -351,13 +351,15 @@ public class HiveMetaStoreBridge {
         return fieldsList;
     }
 
-    private void registerHiveDataModel() throws Exception {
+    public synchronized void registerHiveDataModel() throws Exception {
         HiveDataModelGenerator dataModelGenerator = new HiveDataModelGenerator();
-        try {
-            getMetadataServiceClient().createType(dataModelGenerator.getModelAsJson());
-        } catch (Exception e) {
-            //Ignore if type is already registered
-            //TODO make createType idempotent
+        MetadataServiceClient dgiClient = getMetadataServiceClient();
+
+        //Register hive data model if its not already registered
+        if (dgiClient.getType(HiveDataTypes.HIVE_PROCESS.getName()) == null) {
+            dgiClient.createType(dataModelGenerator.getModelAsJson());
+        } else {
+            LOG.debug("Hive data model is already registered!");
         }
     }
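Note: the rewrite replaces catch-and-ignore with an explicit existence check, and the method is now synchronized because check-then-create is not atomic: without the lock, two hook threads could both observe getType(...) == null and both call createType. A minimal self-contained sketch of the same idiom (hypothetical TypeRegistrar/TypeClient names, not part of this commit):

// Sketch only: a generic check-then-create guard, assuming a client whose
// getType returns null for a missing type, as MetadataServiceClient now does.
public class TypeRegistrar {
    public interface TypeClient {
        String getType(String typeName) throws Exception; // null if absent
        void createType(String typeJson) throws Exception;
    }

    private final TypeClient client;

    public TypeRegistrar(TypeClient client) {
        this.client = client;
    }

    // synchronized makes the check and the create a single critical section,
    // so at most one caller ever registers the type.
    public synchronized void ensureRegistered(String typeName, String typeJson) throws Exception {
        if (client.getType(typeName) == null) {
            client.createType(typeJson);
        }
    }
}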
@@ -38,8 +38,6 @@ package org.apache.hadoop.metadata.hive.hook;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.antlr.runtime.tree.Tree;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.exec.ExplainTask;
@@ -65,6 +63,8 @@ import org.apache.hadoop.metadata.hive.model.HiveDataTypes;
 import org.apache.hadoop.metadata.typesystem.Referenceable;
 import org.codehaus.jettison.json.JSONArray;
 import org.codehaus.jettison.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.BufferedWriter;
 import java.io.File;
@@ -86,7 +86,8 @@ import java.util.concurrent.TimeUnit;
  */
 public class HiveHook implements ExecuteWithHookContext, HiveSemanticAnalyzerHook {
 
-    private static final Log LOG = LogFactory.getLog(HiveHook.class.getName());
+    private static final Logger LOG = LoggerFactory.getLogger(HiveHook.class);
 
     // wait time determines how long we wait before we exit the jvm on
     // shutdown. Pending requests after that will not be sent.
     private static final int WAIT_TIME = 3;
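Note: moving from commons-logging to SLF4J buys parameterized messages. With {} placeholders the message is only formatted when the level is enabled, whereas string concatenation always runs. A small sketch of the difference (hypothetical class, same Logger API as used above):

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoggingSketch {
    private static final Logger LOG = LoggerFactory.getLogger(LoggingSketch.class);

    public static void main(String[] args) {
        String hookType = "POST_EXEC_HOOK";
        // Concatenation builds the string even when INFO is disabled.
        LOG.info("Entered DGI hook for query hook " + hookType);
        // The placeholder form defers formatting until the level check passes.
        LOG.info("Entered DGI hook for hook type {}", hookType);
    }
}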
@@ -99,6 +100,7 @@ public class HiveHook implements ExecuteWithHookContext, HiveSemanticAnalyzerHook {
     private static final int minThreadsDefault = 5;
     private static final int maxThreadsDefault = 5;
     private static final long keepAliveTimeDefault = 10;
+    private static boolean typesRegistered = false;
 
     static {
         // anything shared should be initialized here and destroyed in the
@@ -174,14 +176,17 @@ public class HiveHook implements ExecuteWithHookContext, HiveSemanticAnalyzerHook {
     }
 
     private void fireAndForget(HookContext hookContext, HiveConf conf) throws Exception {
-        LOG.info("Entered DGI hook for query hook " + hookContext.getHookType());
-        if (hookContext.getHookType() != HookContext.HookType.POST_EXEC_HOOK) {
-            LOG.debug("No-op for query hook " + hookContext.getHookType());
-        }
+        assert hookContext.getHookType() == HookContext.HookType.POST_EXEC_HOOK : "Non-POST_EXEC_HOOK not supported!";
+
+        HiveOperation operation = HiveOperation.valueOf(hookContext.getOperationName());
+        LOG.info("Entered DGI hook for hook type {} operation {}", hookContext.getHookType(), operation);
 
         HiveMetaStoreBridge dgiBridge = new HiveMetaStoreBridge(conf);
-        HiveOperation operation = HiveOperation.valueOf(hookContext.getOperationName());
+        if (!typesRegistered) {
+            dgiBridge.registerHiveDataModel();
+            typesRegistered = true;
+        }
 
         switch (operation) {
             case CREATEDATABASE:
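Note: the static typesRegistered flag is a cheap once-per-JVM guard, not a thread-safe one; two hook threads can both see it false and both call registerHiveDataModel(), which is harmless only because that method is synchronized and idempotent. If exactly-once semantics were required, an AtomicBoolean would do it (sketch, not part of this commit):

import java.util.concurrent.atomic.AtomicBoolean;

public class OnceGuard {
    private static final AtomicBoolean TYPES_REGISTERED = new AtomicBoolean(false);

    // compareAndSet flips false -> true exactly once across all threads,
    // so the body runs at most one time per JVM. Caveat: in this naive
    // sketch the flag stays set even if registration throws.
    public static void registerOnce(Runnable registration) {
        if (TYPES_REGISTERED.compareAndSet(false, true)) {
            registration.run();
        }
    }
}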
@@ -199,7 +204,8 @@ public class HiveHook implements ExecuteWithHookContext, HiveSemanticAnalyzerHook {
                 if (entity.getType() == Entity.Type.TABLE) {
                     Table table = entity.getTable();
-                    //TODO table.getDbName().toLowerCase() is required as hive stores in lowercase, but table.getDbName() is not lowercase
+                    //TODO table.getDbName().toLowerCase() is required as hive stores in lowercase,
+                    // but table.getDbName() is not lowercase
                     Referenceable dbReferenceable = getDatabaseReference(dgiBridge, table.getDbName().toLowerCase());
                     dgiBridge.registerTable(dbReferenceable, table.getDbName(), table.getTableName());
                 }
@@ -215,8 +221,6 @@ public class HiveHook implements ExecuteWithHookContext, HiveSemanticAnalyzerHook {
     }
 
     private void registerCTAS(HiveMetaStoreBridge dgiBridge, HookContext hookContext, HiveConf conf) throws Exception {
-        LOG.debug("Registering CTAS");
-
         Set<ReadEntity> inputs = hookContext.getInputs();
         Set<WriteEntity> outputs = hookContext.getOutputs();
@@ -238,6 +242,7 @@ public class HiveHook implements ExecuteWithHookContext, HiveSemanticAnalyzerHook {
             queryStartTime = plan.getQueryStartTime();
         }
 
+        LOG.debug("Registering CTAS query: {}", queryStr);
         Referenceable processReferenceable = new Referenceable(HiveDataTypes.HIVE_PROCESS.getName());
         processReferenceable.set("processName", operation.getOperationName());
         processReferenceable.set("startTime", queryStartTime);
@@ -40,16 +40,6 @@ public class HiveHookIT {
     @BeforeClass
     public void setUp() throws Exception {
-        //Register hive types
-        HiveDataModelGenerator hiveModel = new HiveDataModelGenerator();
-        String typesAsJson = hiveModel.getModelAsJson();
-        MetadataServiceClient dgiClient = new MetadataServiceClient(DGI_URL);
-        try {
-            dgiClient.createType(typesAsJson);
-        } catch (Exception e) {
-            //ignore if types are already defined
-        }
-
         //Set-up hive session
         HiveConf conf = getHiveConf();
         driver = new Driver(conf);
@@ -100,7 +90,6 @@ public class HiveHookIT {
         //Create table where database doesn't exist, will create database instance as well
         assertDatabaseIsRegistered("default");
     }
-
     @Test
@@ -108,6 +108,21 @@ public class MetadataServiceClient {
         }
     }
 
+    public String getType(String typeName) throws MetadataServiceException {
+        WebResource resource = getResource(API.GET_TYPE, typeName);
+        try {
+            JSONObject response = callAPIWithResource(API.GET_TYPE, resource);
+            return response.getString("definition");
+        } catch (MetadataServiceException e) {
+            if (e.getStatus() == ClientResponse.Status.NOT_FOUND) {
+                return null;
+            }
+            throw e;
+        } catch (JSONException e) {
+            throw new MetadataServiceException(e);
+        }
+    }
+
     /**
      * Register the given type(meta model)
      * @param typeAsJson type definition a jaon
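Note: getType maps an HTTP 404 to a null return, so callers can probe for a type without wrapping every lookup in try/catch. A hedged usage sketch (the server URL is an assumption; the class and method names come from this commit):

// Probe-then-create against the metadata service; the URL is assumed.
MetadataServiceClient client = new MetadataServiceClient("http://localhost:21000");
String definition = client.getType(HiveDataTypes.HIVE_PROCESS.getName());
if (definition == null) {
    // 404 was translated to null: the type is not registered yet
    client.createType(new HiveDataModelGenerator().getModelAsJson());
}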
@@ -216,8 +231,7 @@ public class MetadataServiceClient {
         return resource;
     }
 
-    private JSONObject callAPIWithResource(API api,
-                                           WebResource resource) throws MetadataServiceException {
+    private JSONObject callAPIWithResource(API api, WebResource resource) throws MetadataServiceException {
         return callAPIWithResource(api, resource, null);
     }
@@ -21,6 +21,8 @@ package org.apache.hadoop.metadata;
 import com.sun.jersey.api.client.ClientResponse;
 
 public class MetadataServiceException extends Exception {
+    private ClientResponse.Status status;
+
     public MetadataServiceException(MetadataServiceClient.API api, Exception e) {
         super("Metadata service API " + api + " failed", e);
     }
@@ -28,9 +30,14 @@ public class MetadataServiceException extends Exception {
     public MetadataServiceException(MetadataServiceClient.API api, ClientResponse.Status status) {
         super("Metadata service API " + api + " failed with status " + status.getStatusCode()
                 + "(" + status.getReasonPhrase() + ")");
+        this.status = status;
     }
 
     public MetadataServiceException(Exception e) {
         super(e);
     }
+
+    public ClientResponse.Status getStatus() {
+        return status;
+    }
 }
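Note: carrying the ClientResponse.Status on the exception is what lets getType distinguish "type missing" from a real failure. Callers can branch on it the same way getType does internally (sketch; client is a MetadataServiceClient instance, and constructors that do not set a status leave getStatus() null, which compares as not-found-false here):

try {
    String definition = client.getType("hive_process");
    // use the definition ...
} catch (MetadataServiceException e) {
    if (e.getStatus() == ClientResponse.Status.NOT_FOUND) {
        // expected when the type has not been created yet
    } else {
        throw e; // anything else is a genuine error
    }
}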