Commit 40826d14 by Shwetha GS

added cluster name attribute to hive db

parent 114fa269
@@ -33,7 +33,7 @@
     <packaging>jar</packaging>
     <properties>
-        <hive.version>1.1.0</hive.version>
+        <hive.version>1.2.0</hive.version>
         <calcite.version>0.9.2-incubating</calcite.version>
         <hadoop.version>2.6.0</hadoop.version>
     </properties>
......
@@ -32,13 +32,10 @@ import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.metadata.MetadataServiceClient;
 import org.apache.hadoop.metadata.hive.model.HiveDataModelGenerator;
 import org.apache.hadoop.metadata.hive.model.HiveDataTypes;
-import org.apache.hadoop.metadata.typesystem.ITypedReferenceableInstance;
 import org.apache.hadoop.metadata.typesystem.Referenceable;
 import org.apache.hadoop.metadata.typesystem.Struct;
 import org.apache.hadoop.metadata.typesystem.json.InstanceSerialization;
-import org.apache.hadoop.metadata.typesystem.json.Serialization;
 import org.apache.hadoop.metadata.typesystem.persistence.Id;
-import org.apache.hadoop.metadata.typesystem.types.TypeSystem;
 import org.codehaus.jettison.json.JSONArray;
 import org.codehaus.jettison.json.JSONException;
 import org.codehaus.jettison.json.JSONObject;
@@ -55,20 +52,9 @@ import java.util.Set;
  */
 public class HiveMetaStoreBridge {
     private static final String DEFAULT_DGI_URL = "http://localhost:21000/";
+    public static final String HIVE_CLUSTER_NAME = "hive.cluster.name";
+    public static final String DEFAULT_CLUSTER_NAME = "primary";
+    private final String clusterName;
-    public static class Pair<S, T> {
-        public S first;
-        public T second;
-        public Pair(S first, T second) {
-            this.first = first;
-            this.second = second;
-        }
-        public static <S, T> Pair of(S first, T second) {
-            return new Pair(first, second);
-        }
-    }

     public static final String DGI_URL_PROPERTY = "hive.hook.dgi.url";
@@ -82,6 +68,7 @@ public class HiveMetaStoreBridge {
      * @param hiveConf
      */
    public HiveMetaStoreBridge(HiveConf hiveConf) throws Exception {
+        clusterName = hiveConf.get(HIVE_CLUSTER_NAME, DEFAULT_CLUSTER_NAME);
        hiveClient = Hive.get(hiveConf);
        metadataServiceClient = new MetadataServiceClient(hiveConf.get(DGI_URL_PROPERTY, DEFAULT_DGI_URL));
    }
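The constructor now resolves the cluster name from hive.cluster.name (defaulting to "primary") alongside the existing hive.hook.dgi.url endpoint. A minimal sketch of driving the bridge with these properties; the constants and the registerHiveDataModel/registerDatabase calls come from this patch, while the standalone driver class itself is hypothetical:

```java
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.metadata.hive.bridge.HiveMetaStoreBridge;

public class ImportHiveExample {
    public static void main(String[] args) throws Exception {
        // Resolve hive-site.xml from the classpath, then set the two properties
        // the bridge constructor reads (cluster name and DGI endpoint).
        HiveConf hiveConf = new HiveConf();
        hiveConf.set(HiveMetaStoreBridge.HIVE_CLUSTER_NAME, "primary");
        hiveConf.set(HiveMetaStoreBridge.DGI_URL_PROPERTY, "http://localhost:21000/");

        HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(hiveConf);
        bridge.registerHiveDataModel();      // register the Hive types once
        bridge.registerDatabase("default");  // creates the hive_db entity with clusterName set
    }
}
```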
@@ -107,16 +94,20 @@
     /**
      * Gets reference for the database
      *
-     * @param dbName database name
+     * @param databaseName
+     * @param clusterName cluster name
      * @return Reference for database if exists, else null
      * @throws Exception
      */
-    private Referenceable getDatabaseReference(String dbName) throws Exception {
-        LOG.debug("Getting reference for database {}", dbName);
+    private Referenceable getDatabaseReference(String databaseName, String clusterName) throws Exception {
+        LOG.debug("Getting reference for database {}", databaseName);
         String typeName = HiveDataTypes.HIVE_DB.getName();
         MetadataServiceClient dgiClient = getMetadataServiceClient();
-        JSONArray results = dgiClient.rawSearch(typeName, "name", dbName);
+        String dslQuery = String.format("%s where name = '%s' and clusterName = '%s'",
+                HiveDataTypes.HIVE_DB.getName(), databaseName, clusterName);
+        JSONArray results = dgiClient.searchByDSL(dslQuery);
         if (results.length() == 0) {
             return null;
         } else {
@@ -126,13 +117,14 @@
     }

     public Referenceable registerDatabase(String databaseName) throws Exception {
-        Referenceable dbRef = getDatabaseReference(databaseName);
+        Referenceable dbRef = getDatabaseReference(databaseName, clusterName);
         if (dbRef == null) {
             LOG.info("Importing objects from databaseName : " + databaseName);
             Database hiveDB = hiveClient.getDatabase(databaseName);
             dbRef = new Referenceable(HiveDataTypes.HIVE_DB.getName());
             dbRef.set("name", hiveDB.getName());
+            dbRef.set("clusterName", clusterName);
             dbRef.set("description", hiveDB.getDescription());
             dbRef.set("locationUri", hiveDB.getLocationUri());
             dbRef.set("parameters", hiveDB.getParameters());
@@ -168,7 +160,7 @@
         Referenceable tableReferenceable = registerTable(databaseReferenceable, databaseName, tableName);

         // Import Partitions
-        Referenceable sdReferenceable = getSDForTable(databaseReferenceable, tableName);
+        Referenceable sdReferenceable = getSDForTable(databaseName, tableName);
         importPartitions(databaseName, tableName, databaseReferenceable, tableReferenceable, sdReferenceable);

         // Import Indexes
@@ -179,28 +171,26 @@
     /**
      * Gets reference for the table
      *
-     * @param dbRef
+     * @param dbName
      * @param tableName table name
      * @return table reference if exists, else null
      * @throws Exception
      */
-    private Referenceable getTableReference(Referenceable dbRef, String tableName) throws Exception {
-        LOG.debug("Getting reference for table {}.{}", dbRef, tableName);
+    private Referenceable getTableReference(String dbName, String tableName) throws Exception {
+        LOG.debug("Getting reference for table {}.{}", dbName, tableName);
         String typeName = HiveDataTypes.HIVE_TABLE.getName();
         MetadataServiceClient dgiClient = getMetadataServiceClient();
-        //todo DSL support for reference doesn't work. is the usage right?
-        // String query = String.format("%s where dbName = \"%s\" and tableName = \"%s\"", typeName, dbRef.getId().id,
-        // tableName);
-        String query = String.format("%s where name = \"%s\"", typeName, tableName);
+        String query = String.format("%s where name = '%s', dbName where name = '%s' and clusterName = '%s'",
+                HiveDataTypes.HIVE_TABLE.getName(), tableName, dbName, clusterName);
         JSONArray results = dgiClient.searchByDSL(query);
         if (results.length() == 0) {
             return null;
         } else {
             //There should be just one instance with the given name
             String guid = getGuidFromDSLResponse(results.getJSONObject(0));
-            LOG.debug("Got reference for table {}.{} = {}", dbRef, tableName, guid);
+            LOG.debug("Got reference for table {}.{} = {}", dbName, tableName, guid);
             return new Referenceable(guid, typeName, null);
         }
     }
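The table lookup above chains a second clause on the referenced dbName so the match is scoped to one database on one cluster. Purely as an illustration of what the format string expands to (the literal type name hive_table stands in for HiveDataTypes.HIVE_TABLE.getName() and is an assumption, as are the table, database and cluster values):

```java
public class TableQueryExample {
    public static void main(String[] args) {
        // Expand the same format string used in getTableReference(); "hive_table"
        // is assumed here in place of HiveDataTypes.HIVE_TABLE.getName().
        String query = String.format("%s where name = '%s', dbName where name = '%s' and clusterName = '%s'",
                "hive_table", "t1", "default", "primary");
        System.out.println(query);
        // prints: hive_table where name = 't1', dbName where name = 'default' and clusterName = 'primary'
    }
}
```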
@@ -209,10 +199,10 @@
         return jsonObject.getJSONObject("$id$").getString("id");
     }

-    private Referenceable getSDForTable(Referenceable dbRef, String tableName) throws Exception {
-        Referenceable tableRef = getTableReference(dbRef, tableName);
+    private Referenceable getSDForTable(String dbName, String tableName) throws Exception {
+        Referenceable tableRef = getTableReference(dbName, tableName);
         if (tableRef == null) {
-            throw new IllegalArgumentException("Table " + dbRef + "." + tableName + " doesn't exist");
+            throw new IllegalArgumentException("Table " + dbName + "." + tableName + " doesn't exist");
         }
         MetadataServiceClient dgiClient = getMetadataServiceClient();
@@ -228,7 +218,7 @@
     public Referenceable registerTable(Referenceable dbReference, String dbName, String tableName) throws Exception {
         LOG.info("Attempting to register table [" + tableName + "]");
-        Referenceable tableRef = getTableReference(dbReference, tableName);
+        Referenceable tableRef = getTableReference(dbName, tableName);
         if (tableRef == null) {
             LOG.info("Importing objects from " + dbName + "." + tableName);
......
@@ -37,7 +37,6 @@ package org.apache.hadoop.metadata.hive.hook;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.antlr.runtime.tree.Tree;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.exec.ExplainTask;
@@ -48,33 +47,16 @@ import org.apache.hadoop.hive.ql.hooks.HookContext;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.metadata.Table;
-import org.apache.hadoop.hive.ql.parse.ASTNode;
-import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
-import org.apache.hadoop.hive.ql.parse.HiveParser;
-import org.apache.hadoop.hive.ql.parse.HiveSemanticAnalyzerHook;
-import org.apache.hadoop.hive.ql.parse.HiveSemanticAnalyzerHookContext;
-import org.apache.hadoop.hive.ql.parse.ParseDriver;
-import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.plan.ExplainWork;
 import org.apache.hadoop.hive.ql.plan.HiveOperation;
-import org.apache.hadoop.metadata.MetadataServiceClient;
 import org.apache.hadoop.metadata.hive.bridge.HiveMetaStoreBridge;
 import org.apache.hadoop.metadata.hive.model.HiveDataTypes;
 import org.apache.hadoop.metadata.typesystem.Referenceable;
-import org.codehaus.jettison.json.JSONArray;
-import org.codehaus.jettison.json.JSONObject;
+import org.json.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileWriter;
-import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -84,7 +66,7 @@ import java.util.concurrent.TimeUnit;
 /**
  * DgiHook sends lineage information to the DgiSever.
  */
-public class HiveHook implements ExecuteWithHookContext, HiveSemanticAnalyzerHook {
+public class HiveHook implements ExecuteWithHookContext {

     private static final Logger LOG = LoggerFactory.getLogger(HiveHook.class);
@@ -115,17 +97,12 @@ public class HiveHook implements ExecuteWithHookContext, HiveSemanticAnalyzerHook
         int maxThreads = hiveConf.getInt(MAX_THREADS, maxThreadsDefault);
         long keepAliveTime = hiveConf.getLong(KEEP_ALIVE_TIME, keepAliveTimeDefault);

-        executor = new ThreadPoolExecutor(minThreads, maxThreads,
-                keepAliveTime, TimeUnit.MILLISECONDS,
+        executor = new ThreadPoolExecutor(minThreads, maxThreads, keepAliveTime, TimeUnit.MILLISECONDS,
                 new LinkedBlockingQueue<Runnable>(),
-                new ThreadFactoryBuilder()
-                        .setDaemon(true)
-                        .setNameFormat("DGI Logger %d")
-                        .build());
+                new ThreadFactoryBuilder().setDaemon(true).setNameFormat("DGI Logger %d").build());

         try {
-            Runtime.getRuntime().addShutdownHook(
-                    new Thread() {
+            Runtime.getRuntime().addShutdownHook(new Thread() {
                 @Override
                 public void run() {
                     try {
@@ -137,8 +114,7 @@ public class HiveHook implements ExecuteWithHookContext, HiveSemanticAnalyzerHook
                     }
                     // shutdown client
                 }
-            }
-            );
+            });
         } catch (IllegalStateException is) {
             LOG.info("Attempting to send msg while shutdown in progress.");
         }
@@ -146,6 +122,19 @@ public class HiveHook implements ExecuteWithHookContext, HiveSemanticAnalyzerHook
         LOG.info("Created DGI Hook");
     }

+    class HiveEvent {
+        public HiveConf conf;
+        public Set<ReadEntity> inputs;
+        public Set<WriteEntity> outputs;
+        public String user;
+        public HiveOperation operation;
+        public QueryPlan queryPlan;
+        public HookContext.HookType hookType;
+        public JSONObject jsonPlan;
+    }
+
     @Override
     public void run(final HookContext hookContext) throws Exception {
         if (executor == null) {
@@ -154,44 +143,53 @@ public class HiveHook implements ExecuteWithHookContext, HiveSemanticAnalyzerHook
         }

         // clone to avoid concurrent access
+        final HiveEvent event = new HiveEvent();
         final HiveConf conf = new HiveConf(hookContext.getConf());
         boolean debug = conf.get("hive.hook.dgi.synchronous", "false").equals("true");
+        event.conf = conf;
+        event.inputs = hookContext.getInputs();
+        event.outputs = hookContext.getOutputs();
+        event.user = hookContext.getUserName() == null ? hookContext.getUgi().getUserName() : hookContext.getUserName();
+        event.operation = HiveOperation.valueOf(hookContext.getOperationName());
+        event.queryPlan = hookContext.getQueryPlan();
+        event.hookType = hookContext.getHookType();
+        //todo throws NPE
+        // event.jsonPlan = getQueryPlan(event);
+        event.jsonPlan = new JSONObject();

         if (debug) {
-            fireAndForget(hookContext, conf);
+            fireAndForget(event);
         } else {
-            executor.submit(
-                    new Runnable() {
+            executor.submit(new Runnable() {
                 @Override
                 public void run() {
                     try {
-                        fireAndForget(hookContext, conf);
+                        fireAndForget(event);
                     } catch (Throwable e) {
                         LOG.info("DGI hook failed", e);
                     }
                 }
-            }
-            );
+            });
         }
     }

-    private void fireAndForget(HookContext hookContext, HiveConf conf) throws Exception {
-        assert hookContext.getHookType() == HookContext.HookType.POST_EXEC_HOOK : "Non-POST_EXEC_HOOK not supported!";
+    private void fireAndForget(HiveEvent event) throws Exception {
+        assert event.hookType == HookContext.HookType.POST_EXEC_HOOK : "Non-POST_EXEC_HOOK not supported!";

-        LOG.info("Entered DGI hook for hook type {} operation {}", hookContext.getHookType(),
-                hookContext.getOperationName());
-        HiveOperation operation = HiveOperation.valueOf(hookContext.getOperationName());
-        HiveMetaStoreBridge dgiBridge = new HiveMetaStoreBridge(conf);
+        LOG.info("Entered DGI hook for hook type {} operation {}", event.hookType, event.operation);
+        HiveMetaStoreBridge dgiBridge = new HiveMetaStoreBridge(event.conf);

         if (!typesRegistered) {
             dgiBridge.registerHiveDataModel();
             typesRegistered = true;
         }

-        switch (operation) {
+        switch (event.operation) {
         case CREATEDATABASE:
-            Set<WriteEntity> outputs = hookContext.getOutputs();
+            Set<WriteEntity> outputs = event.outputs;
             for (WriteEntity entity : outputs) {
                 if (entity.getType() == Entity.Type.DATABASE) {
                     dgiBridge.registerDatabase(entity.getDatabase().getName());
@@ -200,7 +198,7 @@ public class HiveHook implements ExecuteWithHookContext, HiveSemanticAnalyzerHook
             break;

         case CREATETABLE:
-            outputs = hookContext.getOutputs();
+            outputs = event.outputs;
             for (WriteEntity entity : outputs) {
                 if (entity.getType() == Entity.Type.TABLE) {
@@ -214,41 +212,35 @@ public class HiveHook implements ExecuteWithHookContext, HiveSemanticAnalyzerHook
             break;

         case CREATETABLE_AS_SELECT:
-            registerCTAS(dgiBridge, hookContext, conf);
+            registerCTAS(dgiBridge, event);
             break;

         default:
         }
     }

-    private void registerCTAS(HiveMetaStoreBridge dgiBridge, HookContext hookContext, HiveConf conf) throws Exception {
-        Set<ReadEntity> inputs = hookContext.getInputs();
-        Set<WriteEntity> outputs = hookContext.getOutputs();
+    private void registerCTAS(HiveMetaStoreBridge dgiBridge, HiveEvent event) throws Exception {
+        Set<ReadEntity> inputs = event.inputs;
+        Set<WriteEntity> outputs = event.outputs;

         //Even explain CTAS has operation name as CREATETABLE_AS_SELECT
         if (inputs.isEmpty() && outputs.isEmpty()) {
             LOG.info("Explain statement. Skipping...");
         }

-        //todo hookContext.getUserName() is null in hdp sandbox 2.2.4
-        String user = hookContext.getUserName() == null ? System.getProperty("user.name") : hookContext.getUserName();
-        HiveOperation operation = HiveOperation.valueOf(hookContext.getOperationName());
-        String queryId = null;
-        String queryStr = null;
-        long queryStartTime = 0;
-        QueryPlan plan = hookContext.getQueryPlan();
-        if (plan != null) {
-            queryId = plan.getQueryId();
-            queryStr = plan.getQueryString();
-            queryStartTime = plan.getQueryStartTime();
+        if (event.queryPlan == null) {
+            LOG.info("Query plan is missing. Skipping...");
         }

+        String queryId = event.queryPlan.getQueryId();
+        String queryStr = event.queryPlan.getQueryStr();
+        long queryStartTime = event.queryPlan.getQueryStartTime();
+
         LOG.debug("Registering CTAS query: {}", queryStr);
         Referenceable processReferenceable = new Referenceable(HiveDataTypes.HIVE_PROCESS.getName());
-        processReferenceable.set("name", operation.getOperationName());
+        processReferenceable.set("name", event.operation.getOperationName());
         processReferenceable.set("startTime", queryStartTime);
-        processReferenceable.set("userName", user);
+        processReferenceable.set("userName", event.user);
         List<Referenceable> source = new ArrayList<>();
         for (ReadEntity readEntity : inputs) {
             if (readEntity.getTyp() == Entity.Type.TABLE) {
@@ -269,7 +261,7 @@ public class HiveHook implements ExecuteWithHookContext, HiveSemanticAnalyzerHook
         processReferenceable.set("outputTables", target);
         processReferenceable.set("queryText", queryStr);
         processReferenceable.set("queryId", queryId);
-        processReferenceable.set("queryPlan", getQueryPlan(hookContext, conf));
+        processReferenceable.set("queryPlan", event.jsonPlan.toString());
         processReferenceable.set("endTime", System.currentTimeMillis());

         //TODO set
@@ -278,234 +270,10 @@ public class HiveHook implements ExecuteWithHookContext, HiveSemanticAnalyzerHook
     }

-    private String getQueryPlan(HookContext hookContext, HiveConf conf) throws Exception {
-        //We need to somehow get the sem associated with the plan and use it here.
-        MySemanticAnaylzer sem = new MySemanticAnaylzer(conf);
-        QueryPlan queryPlan = hookContext.getQueryPlan();
-        sem.setInputs(queryPlan.getInputs());
-        ExplainWork ew = new ExplainWork(null, null, queryPlan.getRootTasks(), queryPlan.getFetchTask(), null, sem,
-                false, true, false, false, false);
+    private JSONObject getQueryPlan(HiveEvent event) throws Exception {
         ExplainTask explain = new ExplainTask();
-        explain.initialize(conf, queryPlan, null);
-        org.json.JSONObject explainPlan = explain.getJSONPlan(null, ew);
-        return explainPlan.toString();
+        explain.initialize(event.conf, event.queryPlan, null);
+        List<Task<?>> rootTasks = event.queryPlan.getRootTasks();
+        return explain.getJSONPlan(null, null, rootTasks, event.queryPlan.getFetchTask(), true, false, false);
     }
private void analyzeHiveParseTree(ASTNode ast) {
String astStr = ast.dump();
Tree tab = ast.getChild(0);
String fullTableName;
boolean isExternal = false;
boolean isTemporary = false;
String inputFormat = null;
String outputFormat = null;
String serde = null;
String storageHandler = null;
String likeTableName = null;
String comment = null;
Tree ctasNode = null;
Tree rowFormatNode = null;
String location = null;
Map<String, String> serdeProps = new HashMap<>();
try {
BufferedWriter fw = new BufferedWriter(
new FileWriter(new File("/tmp/dgi/", "ASTDump"), true));
fw.write("Full AST Dump" + astStr);
switch (ast.getToken().getType()) {
case HiveParser.TOK_CREATETABLE:
if (tab.getType() != HiveParser.TOK_TABNAME ||
(tab.getChildCount() != 1 && tab.getChildCount() != 2)) {
LOG.error("Ignoring malformed Create table statement");
}
if (tab.getChildCount() == 2) {
String dbName = BaseSemanticAnalyzer
.unescapeIdentifier(tab.getChild(0).getText());
String tableName = BaseSemanticAnalyzer
.unescapeIdentifier(tab.getChild(1).getText());
fullTableName = dbName + "." + tableName;
} else {
fullTableName = BaseSemanticAnalyzer
.unescapeIdentifier(tab.getChild(0).getText());
}
LOG.info("Creating table " + fullTableName);
int numCh = ast.getChildCount();
for (int num = 1; num < numCh; num++) {
ASTNode child = (ASTNode) ast.getChild(num);
// Handle storage format
switch (child.getToken().getType()) {
case HiveParser.TOK_TABLEFILEFORMAT:
if (child.getChildCount() < 2) {
throw new SemanticException(
"Incomplete specification of File Format. " +
"You must provide InputFormat, OutputFormat.");
}
inputFormat = BaseSemanticAnalyzer
.unescapeSQLString(child.getChild(0).getText());
outputFormat = BaseSemanticAnalyzer
.unescapeSQLString(child.getChild(1).getText());
if (child.getChildCount() == 3) {
serde = BaseSemanticAnalyzer
.unescapeSQLString(child.getChild(2).getText());
}
break;
case HiveParser.TOK_STORAGEHANDLER:
storageHandler = BaseSemanticAnalyzer
.unescapeSQLString(child.getChild(0).getText());
if (child.getChildCount() == 2) {
BaseSemanticAnalyzer.readProps(
(ASTNode) (child.getChild(1).getChild(0)),
serdeProps);
}
break;
case HiveParser.TOK_FILEFORMAT_GENERIC:
ASTNode grandChild = (ASTNode) child.getChild(0);
String name = (grandChild == null ? "" : grandChild.getText())
.trim().toUpperCase();
if (name.isEmpty()) {
LOG.error("File format in STORED AS clause is empty");
break;
}
break;
}
switch (child.getToken().getType()) {
case HiveParser.KW_EXTERNAL:
isExternal = true;
break;
case HiveParser.KW_TEMPORARY:
isTemporary = true;
break;
case HiveParser.TOK_LIKETABLE:
if (child.getChildCount() > 0) {
likeTableName = BaseSemanticAnalyzer
.getUnescapedName((ASTNode) child.getChild(0));
}
break;
case HiveParser.TOK_QUERY:
ctasNode = child;
break;
case HiveParser.TOK_TABLECOMMENT:
comment = BaseSemanticAnalyzer
.unescapeSQLString(child.getChild(0).getText());
break;
case HiveParser.TOK_TABLEPARTCOLS:
case HiveParser.TOK_TABCOLLIST:
case HiveParser.TOK_ALTERTABLE_BUCKETS:
break;
case HiveParser.TOK_TABLEROWFORMAT:
rowFormatNode = child;
break;
case HiveParser.TOK_TABLELOCATION:
location = BaseSemanticAnalyzer
.unescapeSQLString(child.getChild(0).getText());
break;
case HiveParser.TOK_TABLEPROPERTIES:
break;
case HiveParser.TOK_TABLESERIALIZER:
child = (ASTNode) child.getChild(0);
serde = BaseSemanticAnalyzer
.unescapeSQLString(child.getChild(0).getText());
break;
case HiveParser.TOK_TABLESKEWED:
break;
default:
throw new AssertionError("Unknown token: " + child.getToken());
}
}
StringBuilder sb = new StringBuilder(1024);
sb.append("Full table name: ").append(fullTableName).append('\n');
sb.append("\tisTemporary: ").append(isTemporary).append('\n');
sb.append("\tIsExternal: ").append(isExternal).append('\n');
if (inputFormat != null) {
sb.append("\tinputFormat: ").append(inputFormat).append('\n');
}
if (outputFormat != null) {
sb.append("\toutputFormat: ").append(outputFormat).append('\n');
}
if (serde != null) {
sb.append("\tserde: ").append(serde).append('\n');
}
if (storageHandler != null) {
sb.append("\tstorageHandler: ").append(storageHandler).append('\n');
}
if (likeTableName != null) {
sb.append("\tlikeTableName: ").append(likeTableName);
}
if (comment != null) {
sb.append("\tcomment: ").append(comment);
}
if (location != null) {
sb.append("\tlocation: ").append(location);
}
if (ctasNode != null) {
sb.append("\tctasNode: ").append(((ASTNode) ctasNode).dump());
}
if (rowFormatNode != null) {
sb.append("\trowFormatNode: ").append(((ASTNode) rowFormatNode).dump());
}
fw.write(sb.toString());
}
fw.flush();
fw.close();
} catch (Exception e) {
LOG.error("Unable to log logical plan to file", e);
}
}
private void parseQuery(String sqlText) throws Exception {
ParseDriver parseDriver = new ParseDriver();
ASTNode node = parseDriver.parse(sqlText);
analyzeHiveParseTree(node);
}
/**
* This is an attempt to use the parser. Sematnic issues are not handled here.
* <p/>
* Trying to recompile the query runs into some issues in the preExec
* hook but we need to make sure all the semantic issues are handled. May be we should save the AST in the
* Semantic analyzer and have it available in the preExec hook so that we walk with it freely.
*
* @param context
* @param ast
* @return
* @throws SemanticException
*/
@Override
public ASTNode preAnalyze(HiveSemanticAnalyzerHookContext context, ASTNode ast)
throws SemanticException {
analyzeHiveParseTree(ast);
return ast;
}
@Override
public void postAnalyze(HiveSemanticAnalyzerHookContext context,
List<Task<? extends Serializable>> rootTasks) throws SemanticException {
}
private class MySemanticAnaylzer extends BaseSemanticAnalyzer {
public MySemanticAnaylzer(HiveConf conf) throws SemanticException {
super(conf);
}
public void analyzeInternal(ASTNode ast) throws SemanticException {
throw new RuntimeException("Not implemented");
}
public void setInputs(HashSet<ReadEntity> inputs) {
this.inputs = inputs;
}
    }
}
@@ -280,6 +280,8 @@
         AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{
                 new AttributeDefinition("name", DataTypes.STRING_TYPE.getName(),
                         Multiplicity.REQUIRED, false, null),
+                new AttributeDefinition("clusterName", DataTypes.STRING_TYPE.getName(),
+                        Multiplicity.REQUIRED, false, null),
                 new AttributeDefinition("description", DataTypes.STRING_TYPE.getName(),
                         Multiplicity.OPTIONAL, false, null),
                 new AttributeDefinition("locationUri", DataTypes.STRING_TYPE.getName(),
@@ -322,8 +324,6 @@
         AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{
                 new AttributeDefinition("name", DataTypes.STRING_TYPE.getName(),
                         Multiplicity.REQUIRED, false, null),
-                //new AttributeDefinition("type", DefinedTypes.HIVE_TYPE.getName(), Multiplicity
-                //        .REQUIRED, false, null),
                 new AttributeDefinition("type", DataTypes.STRING_TYPE.getName(),
                         Multiplicity.REQUIRED, false, null),
                 new AttributeDefinition("comment", DataTypes.STRING_TYPE.getName(),
......
@@ -29,6 +29,10 @@ hive conf directory:
     <name>hive.hook.dgi.url</name>
     <value>http://localhost:21000/</value>
   </property>
+  <property>
+    <name>hive.cluster.name</name>
+    <value>primary</value>
+  </property>
 </verbatim>
 Usage: <dgi package>/bin/import-hive.sh. The logs are in <dgi package>/logs/import-hive.log
@@ -44,12 +48,16 @@ The hook submits the request to a thread pool executor to avoid blocking the command execution
     <value>org.apache.hadoop.metadata.hive.hook.HiveHook</value>
   </property>
 </verbatim>
-   * Add the following property in hive-ste.xml with the DGI endpoint for your set-up
+   * Add the following properties in hive-ste.xml with the DGI endpoint for your set-up
 <verbatim>
   <property>
     <name>hive.hook.dgi.url</name>
     <value>http://localhost:21000/</value>
   </property>
+  <property>
+    <name>hive.cluster.name</name>
+    <value>primary</value>
+  </property>
 </verbatim>
   * Add 'export HIVE_AUX_JARS_PATH=<dgi package>/hook/hive' in hive-env.sh
......
@@ -24,16 +24,15 @@ import org.apache.hadoop.hive.ql.Driver;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.metadata.MetadataServiceClient;
 import org.apache.hadoop.metadata.hive.bridge.HiveMetaStoreBridge;
-import org.apache.hadoop.metadata.hive.model.HiveDataModelGenerator;
 import org.apache.hadoop.metadata.hive.model.HiveDataTypes;
 import org.codehaus.jettison.json.JSONArray;
-import org.codehaus.jettison.json.JSONObject;
 import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;

 public class HiveHookIT {
     private static final String DGI_URL = "http://localhost:21000/";
+    private static final String CLUSTER_NAME = "test";
     private Driver driver;
     private MetadataServiceClient dgiCLient;
     private SessionState ss;
@@ -59,6 +58,7 @@ public class HiveHookIT {
         hiveConf.set(HiveMetaStoreBridge.DGI_URL_PROPERTY, DGI_URL);
         hiveConf.set("javax.jdo.option.ConnectionURL", "jdbc:derby:./target/metastore_db;create=true");
         hiveConf.set("hive.hook.dgi.synchronous", "true");
+        hiveConf.set(HiveMetaStoreBridge.HIVE_CLUSTER_NAME, CLUSTER_NAME);
         return hiveConf;
     }
@@ -82,11 +82,11 @@
         String tableName = "table" + RandomStringUtils.randomAlphanumeric(5).toLowerCase();
         runCommand("create table " + dbName + "." + tableName + "(id int, name string)");
-        assertTableIsRegistered(tableName);
+        assertTableIsRegistered(dbName, tableName);

         tableName = "table" + RandomStringUtils.randomAlphanumeric(5).toLowerCase();
         runCommand("create table " + tableName + "(id int, name string)");
-        assertTableIsRegistered(tableName);
+        assertTableIsRegistered("default", tableName);

         //Create table where database doesn't exist, will create database instance as well
         assertDatabaseIsRegistered("default");
@@ -97,24 +97,33 @@
         String tableName = "table" + RandomStringUtils.randomAlphanumeric(5).toLowerCase();
         runCommand("create table " + tableName + "(id int, name string)");

-        String newTableName = "table" + RandomStringUtils.randomAlphanumeric(5).toLowerCase();
-        String query = "create table " + newTableName + " as select * from " + tableName;
+        String ctasTableName = "table" + RandomStringUtils.randomAlphanumeric(5).toLowerCase();
+        String query = "create table " + ctasTableName + " as select * from " + tableName;
         runCommand(query);

-        assertTableIsRegistered(newTableName);
-        assertInstanceIsRegistered(HiveDataTypes.HIVE_PROCESS.getName(), "queryText", query);
+        assertTableIsRegistered("default", ctasTableName);
+        assertProcessIsRegistered(query);
     }

-    private void assertTableIsRegistered(String tableName) throws Exception {
-        assertInstanceIsRegistered(HiveDataTypes.HIVE_TABLE.getName(), "name", tableName);
+    private void assertProcessIsRegistered(String queryStr) throws Exception {
+        String dslQuery = String.format("%s where queryText = '%s'", HiveDataTypes.HIVE_PROCESS.getName(), queryStr);
+        assertInstanceIsRegistered(dslQuery);
+    }
+
+    private void assertTableIsRegistered(String dbName, String tableName) throws Exception {
+        String query = String.format("%s where name = '%s', dbName where name = '%s' and clusterName = '%s'",
+                HiveDataTypes.HIVE_TABLE.getName(), tableName, dbName, CLUSTER_NAME);
+        assertInstanceIsRegistered(query);
     }

     private void assertDatabaseIsRegistered(String dbName) throws Exception {
-        assertInstanceIsRegistered(HiveDataTypes.HIVE_DB.getName(), "name", dbName);
+        String query = String.format("%s where name = '%s' and clusterName = '%s'", HiveDataTypes.HIVE_DB.getName(),
+                dbName, CLUSTER_NAME);
+        assertInstanceIsRegistered(query);
     }

-    private void assertInstanceIsRegistered(String typeName, String colName, String colValue) throws Exception{
-        JSONArray results = dgiCLient.rawSearch(typeName, colName, colValue);
+    private void assertInstanceIsRegistered(String dslQuery) throws Exception{
+        JSONArray results = dgiCLient.searchByDSL(dslQuery);
         Assert.assertEquals(results.length(), 1);
     }
 }
@@ -205,40 +205,6 @@ public class SSLAndKerberosHiveHookIT extends BaseSSLAndKerberosTest {
         assertDatabaseIsRegistered(dbName);
     }
@Test
public void testCreateTable() throws Exception {
String dbName = "db" + RandomStringUtils.randomAlphanumeric(5).toLowerCase();
runCommand("create database " + dbName);
String tableName = "table" + RandomStringUtils.randomAlphanumeric(5).toLowerCase();
runCommand("create table " + dbName + "." + tableName + "(id int, name string)");
assertTableIsRegistered(tableName);
tableName = "table" + RandomStringUtils.randomAlphanumeric(5).toLowerCase();
runCommand("create table " + tableName + "(id int, name string)");
assertTableIsRegistered(tableName);
//Create table where database doesn't exist, will create database instance as well
assertDatabaseIsRegistered("default");
}
@Test
public void testCTAS() throws Exception {
String tableName = "table" + RandomStringUtils.randomAlphanumeric(5).toLowerCase();
runCommand("create table " + tableName + "(id int, name string)");
String newTableName = "table" + RandomStringUtils.randomAlphanumeric(5).toLowerCase();
String query = "create table " + newTableName + " as select * from " + tableName;
runCommand(query);
assertTableIsRegistered(newTableName);
assertInstanceIsRegistered(HiveDataTypes.HIVE_PROCESS.getName(), "queryText", query);
}
private void assertTableIsRegistered(String tableName) throws Exception {
assertInstanceIsRegistered(HiveDataTypes.HIVE_TABLE.getName(), "name", tableName);
}
     private void assertDatabaseIsRegistered(String dbName) throws Exception {
         assertInstanceIsRegistered(HiveDataTypes.HIVE_DB.getName(), "name", dbName);
     }
......
@@ -208,40 +208,6 @@ public class SSLHiveHookIT {
         assertDatabaseIsRegistered(dbName);
     }
@Test
public void testCreateTable() throws Exception {
String dbName = "db" + RandomStringUtils.randomAlphanumeric(5).toLowerCase();
runCommand("create database " + dbName);
String tableName = "table" + RandomStringUtils.randomAlphanumeric(5).toLowerCase();
runCommand("create table " + dbName + "." + tableName + "(id int, name string)");
assertTableIsRegistered(tableName);
tableName = "table" + RandomStringUtils.randomAlphanumeric(5).toLowerCase();
runCommand("create table " + tableName + "(id int, name string)");
assertTableIsRegistered(tableName);
//Create table where database doesn't exist, will create database instance as well
assertDatabaseIsRegistered("default");
}
@Test
public void testCTAS() throws Exception {
String tableName = "table" + RandomStringUtils.randomAlphanumeric(5).toLowerCase();
runCommand("create table " + tableName + "(id int, name string)");
String newTableName = "table" + RandomStringUtils.randomAlphanumeric(5).toLowerCase();
String query = "create table " + newTableName + " as select * from " + tableName;
runCommand(query);
assertTableIsRegistered(newTableName);
assertInstanceIsRegistered(HiveDataTypes.HIVE_PROCESS.getName(), "queryText", query);
}
private void assertTableIsRegistered(String tableName) throws Exception {
assertInstanceIsRegistered(HiveDataTypes.HIVE_TABLE.getName(), "name", tableName);
}
     private void assertDatabaseIsRegistered(String dbName) throws Exception {
         assertInstanceIsRegistered(HiveDataTypes.HIVE_DB.getName(), "name", dbName);
     }
......
@@ -21,6 +21,7 @@ package org.apache.hadoop.metadata.typesystem.types;
 import com.google.common.collect.ImmutableCollection;
 import com.google.common.collect.ImmutableMap;
 import org.apache.hadoop.metadata.MetadataException;
+import scala.math.BigInt;

 public class EnumType extends AbstractDataType<EnumValue> {
@@ -54,7 +55,7 @@ public class EnumType extends AbstractDataType<EnumValue> {
         EnumValue e = null;
         if (val instanceof EnumValue) {
             e = valueMap.get(((EnumValue)val).value);
-        } else if ( val instanceof Integer) {
+        } else if ( val instanceof Integer || val instanceof BigInt) {
             e = ordinalMap.get(val);
         } else if ( val instanceof String) {
             e = valueMap.get(val);
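The EnumType change lets ordinal lookups accept scala.math.BigInt in addition to Integer, presumably because the Scala-based typesystem can deserialize numeric ordinals as BigInt rather than boxed Java integers. A minimal sketch of the idea, independent of the project's EnumType (the ordinal table and class are hypothetical; it relies only on scala.math.BigInt extending java.lang.Number):

```java
import java.math.BigInteger;
import java.util.HashMap;
import java.util.Map;

public class OrdinalLookupSketch {
    // Hypothetical ordinal table standing in for an enum type's ordinal map.
    private static final Map<Integer, String> ORDINALS = new HashMap<>();
    static {
        ORDINALS.put(1, "MANAGED_TABLE");
        ORDINALS.put(2, "EXTERNAL_TABLE");
    }

    static String fromOrdinal(Object val) {
        // scala.math.BigInt extends java.lang.Number, so both Integer and BigInt
        // ordinals can be normalised to int before the map lookup.
        if (val instanceof Number) {
            return ORDINALS.get(((Number) val).intValue());
        }
        throw new IllegalArgumentException("Not a numeric ordinal: " + val);
    }

    public static void main(String[] args) {
        System.out.println(fromOrdinal(1));                                            // plain Integer
        System.out.println(fromOrdinal(new scala.math.BigInt(BigInteger.valueOf(2)))); // BigInt from Scala
    }
}
```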
......