Commit 21197b67 by a760104

Commit hivehook changes: add executionEngine type, change log.info to

log.debug, grab hive.session.id so that it can be linked to a running job if needed.
parent b8308a31
......@@ -11,12 +11,16 @@ public class HiveLineageBean implements Serializable {
*/
private static final long serialVersionUID = 1L;
public String queryId;
public String hiveId;
public String user;
public String queryStartTime;
public String queryEndTime;
public String query;
public String tableName;
public String tableLocation;
public boolean success;
public boolean failed;
public String executionEngine;
ArrayList<SourceTables> sourceTables;
ArrayList<QueryColumns> queryColumns;
ArrayList<WhereClause> whereClause;
......@@ -34,6 +38,38 @@ public class HiveLineageBean implements Serializable {
this.queryId = queryId;
}
public String getExecutionEngine() {
return this.executionEngine ;
}
public void setExecutionEngine(String executionEngine) {
this.executionEngine = executionEngine;
}
public String getHiveId() {
return this.hiveId ;
}
public void setHiveId(String hiveId) {
this.hiveId = hiveId;
}
public boolean getSuccess() {
return this.success ;
}
public void setSuccess(boolean success) {
this.success = success;
}
public boolean getFailed() {
return this.failed ;
}
public void setFailed(boolean failed) {
this.failed = failed;
}
public String getTableName() {
return this.tableName ;
......
......@@ -15,6 +15,7 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
......@@ -28,9 +29,11 @@ import org.apache.hadoop.hive.ql.QueryPlan;
import org.apache.hadoop.hive.ql.exec.ExplainTask;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.tez.TezTask;
import org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext;
import org.apache.hadoop.hive.ql.hooks.HookContext;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.util.StringUtils;
//import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
//import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
......@@ -56,6 +59,7 @@ public class Hook implements ExecuteWithHookContext {
@Override
public void run(HookContext hookContext) throws Exception {
long currentTime = System.currentTimeMillis();
String executionEngine = null;
try {
QueryPlan plan = hookContext.getQueryPlan();
if (plan == null) {
......@@ -70,6 +74,18 @@ public class Hook implements ExecuteWithHookContext {
String query = plan.getQueryStr();
int numMrJobs = Utilities.getMRTasks(plan.getRootTasks()).size();
int numTezJobs = Utilities.getTezTasks(plan.getRootTasks()).size();
String hiveId = explain.getId();
SessionState sess = SessionState.get();
if (numTezJobs > 0) {
executionEngine="tez";
}
if (numMrJobs > 0) {
executionEngine="mr";
}
hiveId = sess.getSessionId();
switch(hookContext.getHookType()) {
case PRE_EXEC_HOOK:
......@@ -78,22 +94,75 @@ public class Hook implements ExecuteWithHookContext {
LOG.error("DB:Table="+o.toString());
}
currentTime = System.currentTimeMillis();
HiveLineageInfo lep_pre = new HiveLineageInfo();
lep_pre.getLineageInfo(query);
hlb=lep_pre.getHLBean();
hlb.setQueryEndTime(Long.toString(currentTime));
hlb.setQueryId(queryId);
hlb.setQuery(query);
hlb.setUser(user);
hlb.setHiveId(hiveId);
hlb.setSuccess(false);
if (executionEngine.equalsIgnoreCase("mr")) {
hlb.setExecutionEngine("mapreduce");
}
if (executionEngine.equalsIgnoreCase("tez")) {
hlb.setExecutionEngine("tez");
}
if (executionEngine.equalsIgnoreCase("spark")) {
hlb.setExecutionEngine("spark");
}
hlb.setQueryStartTime(queryStartTime);
fireAndForget(hookContext.getConf(), hlb, queryId);
break;
case POST_EXEC_HOOK:
currentTime = System.currentTimeMillis();
HiveLineageInfo lep = new HiveLineageInfo();
lep.getLineageInfo(query);
hlb=lep.getHLBean();
HiveLineageInfo lep_post = new HiveLineageInfo();
lep_post.getLineageInfo(query);
hlb=lep_post.getHLBean();
hlb.setQueryEndTime(Long.toString(currentTime));
hlb.setQueryId(queryId);
hlb.setQuery(query);
hlb.setUser(user);
hlb.setQueryStartTime(queryStartTime);
hlb.setSuccess(true);
hlb.setHiveId(hiveId);
if (executionEngine.equalsIgnoreCase("mr")) {
hlb.setExecutionEngine("mapreduce");
}
if (executionEngine.equalsIgnoreCase("tez")) {
hlb.setExecutionEngine("tez");
}
if (executionEngine.equalsIgnoreCase("spark")) {
hlb.setExecutionEngine("spark");
}
fireAndForget(hookContext.getConf(), hlb, queryId);
break;
case ON_FAILURE_HOOK:
// ignore
HiveLineageInfo lep_failed = new HiveLineageInfo();
lep_failed.getLineageInfo(query);
hlb=lep_failed.getHLBean();
hlb.setQueryEndTime(Long.toString(currentTime));
hlb.setQueryId(queryId);
hlb.setQuery(query);
hlb.setUser(user);
hlb.setQueryStartTime(queryStartTime);
hlb.setSuccess(false);
hlb.setFailed(true);
hlb.setHiveId(hiveId);
if (executionEngine.equalsIgnoreCase("mr")) {
hlb.setExecutionEngine("mapreduce");
}
if (executionEngine.equalsIgnoreCase("tez")) {
hlb.setExecutionEngine("tez");
}
if (executionEngine.equalsIgnoreCase("spark")) {
hlb.setExecutionEngine("spark");
}
fireAndForget(hookContext.getConf(), hlb, queryId);
break;
default:
//ignore
......@@ -111,7 +180,7 @@ public class Hook implements ExecuteWithHookContext {
}
Gson gson = new Gson();
String gsonString = gson.toJson(hookData);
System.out.println("GSON String: "+gsonString);
LOG.debug("GSON String: "+gsonString);
String encodedGsonQuery = URLEncoder.encode(gsonString, "UTF-8");
String encodedQueryId = URLEncoder.encode(queryId, "UTF-8");
String postData = "hookdata=" + encodedGsonQuery+"&queryid="+encodedQueryId;
......@@ -140,7 +209,7 @@ public class Hook implements ExecuteWithHookContext {
}
}
URL url = new URL(postUri);
System.out.println("Post URI: "+postUri);
LOG.debug("Post URI: "+postUri);
DataOutputStream wr = null;
//HttpURLConnection urlcon = null;
if (postUri.contains("https:")) {
......@@ -153,7 +222,7 @@ public class Hook implements ExecuteWithHookContext {
urlcon.setDoInput(true);
urlcon.setDoOutput(true);
wr = new DataOutputStream (urlcon.getOutputStream());
System.out.println("PostString: "+postData);
LOG.debug("PostString: "+postData);
//wr.writeBytes(postString.);
wr.write(postData.getBytes());
......@@ -172,7 +241,7 @@ public class Hook implements ExecuteWithHookContext {
}
String result = sb.toString();
System.out.println("Post Response: "+result);
LOG.debug("Post Response: "+result);
isr.close();
is.close();
urlcon.disconnect();
......@@ -186,7 +255,7 @@ public class Hook implements ExecuteWithHookContext {
urlcon.setDoInput(true);
urlcon.setDoOutput(true);
wr = new DataOutputStream (urlcon.getOutputStream());
System.out.println("PostString: "+postData);
LOG.debug("PostString: "+postData);
//wr.writeBytes(postString.);
wr.write(postData.getBytes());
......@@ -205,7 +274,7 @@ public class Hook implements ExecuteWithHookContext {
}
String result = sb.toString();
System.out.println("Post Response: "+result);
LOG.debug("Post Response: "+result);
isr.close();
is.close();
urlcon.disconnect();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment