diff --git a/hivehook/src/main/java/com/aetna/hadoop/dgc/hive/HiveLineageBean.java b/hivehook/src/main/java/com/aetna/hadoop/dgc/hive/HiveLineageBean.java
index 9a17c05..734f87d 100644
--- a/hivehook/src/main/java/com/aetna/hadoop/dgc/hive/HiveLineageBean.java
+++ b/hivehook/src/main/java/com/aetna/hadoop/dgc/hive/HiveLineageBean.java
@@ -11,12 +11,16 @@ public class HiveLineageBean implements Serializable {
      */
     private static final long serialVersionUID = 1L;
     public String queryId;
+    public String hiveId;
     public String user;
     public String queryStartTime;
     public String queryEndTime;
     public String query;
     public String tableName;
     public String tableLocation;
+    public boolean success;
+    public boolean failed;
+    public String executionEngine;
     ArrayList<SourceTables> sourceTables;
     ArrayList<QueryColumns> queryColumns;
     ArrayList<WhereClause> whereClause;
@@ -34,6 +38,38 @@ public class HiveLineageBean implements Serializable {
         this.queryId = queryId;
     }
 
+    public String getExecutionEngine() {
+        return this.executionEngine ;
+    }
+
+    public void setExecutionEngine(String executionEngine) {
+        this.executionEngine = executionEngine;
+    }
+
+    public String getHiveId() {
+        return this.hiveId ;
+    }
+
+    public void setHiveId(String hiveId) {
+        this.hiveId = hiveId;
+    }
+
+    public boolean getSuccess() {
+        return this.success ;
+    }
+
+    public void setSuccess(boolean success) {
+        this.success = success;
+    }
+
+    public boolean getFailed() {
+        return this.failed ;
+    }
+
+    public void setFailed(boolean failed) {
+        this.failed = failed;
+    }
+
     public String getTableName() {
         return this.tableName ;
 
diff --git a/hivehook/src/main/java/com/aetna/hadoop/dgc/hive/Hook.java b/hivehook/src/main/java/com/aetna/hadoop/dgc/hive/Hook.java
index 8ae5597..a16c73f 100644
--- a/hivehook/src/main/java/com/aetna/hadoop/dgc/hive/Hook.java
+++ b/hivehook/src/main/java/com/aetna/hadoop/dgc/hive/Hook.java
@@ -15,6 +15,7 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Set;
+
 import javax.net.ssl.HttpsURLConnection;
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.TrustManager;
@@ -28,9 +29,11 @@ import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.exec.ExplainTask;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.tez.TezTask;
 import org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext;
 import org.apache.hadoop.hive.ql.hooks.HookContext;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
+import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.util.StringUtils;
 //import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
 //import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
@@ -56,6 +59,7 @@ public class Hook implements ExecuteWithHookContext {
   @Override
   public void run(HookContext hookContext) throws Exception {
     long currentTime = System.currentTimeMillis();
+    String executionEngine = null;
     try {
       QueryPlan plan = hookContext.getQueryPlan();
       if (plan == null) {
@@ -70,6 +74,18 @@ public class Hook implements ExecuteWithHookContext {
       String query = plan.getQueryStr();
       int numMrJobs = Utilities.getMRTasks(plan.getRootTasks()).size();
       int numTezJobs = Utilities.getTezTasks(plan.getRootTasks()).size();
+      String hiveId = explain.getId();
+      SessionState sess = SessionState.get();
+
+      if (numTezJobs > 0) {
+        executionEngine="tez";
+      }
+      if (numMrJobs > 0) {
+        executionEngine="mr";
+
+      }
+      hiveId = sess.getSessionId();
+
       switch(hookContext.getHookType()) {
       case PRE_EXEC_HOOK:
@@ -77,23 +93,76 @@ public class Hook implements ExecuteWithHookContext {
         for (Object o : db) {
           LOG.error("DB:Table="+o.toString());
         }
+
+        currentTime = System.currentTimeMillis();
+        HiveLineageInfo lep_pre = new HiveLineageInfo();
+        lep_pre.getLineageInfo(query);
+        hlb=lep_pre.getHLBean();
+        hlb.setQueryEndTime(Long.toString(currentTime));
+        hlb.setQueryId(queryId);
+        hlb.setQuery(query);
+        hlb.setUser(user);
+        hlb.setHiveId(hiveId);
+        hlb.setSuccess(false);
+        if (executionEngine.equalsIgnoreCase("mr")) {
+          hlb.setExecutionEngine("mapreduce");
+        }
+        if (executionEngine.equalsIgnoreCase("tez")) {
+          hlb.setExecutionEngine("tez");
+        }
+        if (executionEngine.equalsIgnoreCase("spark")) {
+          hlb.setExecutionEngine("spark");
+        }
+        hlb.setQueryStartTime(queryStartTime);
+        fireAndForget(hookContext.getConf(), hlb, queryId);
         break;
       case POST_EXEC_HOOK:
         currentTime = System.currentTimeMillis();
-        HiveLineageInfo lep = new HiveLineageInfo();
-        lep.getLineageInfo(query);
-        hlb=lep.getHLBean();
+        HiveLineageInfo lep_post = new HiveLineageInfo();
+        lep_post.getLineageInfo(query);
+        hlb=lep_post.getHLBean();
         hlb.setQueryEndTime(Long.toString(currentTime));
         hlb.setQueryId(queryId);
         hlb.setQuery(query);
         hlb.setUser(user);
         hlb.setQueryStartTime(queryStartTime);
+        hlb.setSuccess(true);
+        hlb.setHiveId(hiveId);
+        if (executionEngine.equalsIgnoreCase("mr")) {
+          hlb.setExecutionEngine("mapreduce");
+        }
+        if (executionEngine.equalsIgnoreCase("tez")) {
+          hlb.setExecutionEngine("tez");
+        }
+        if (executionEngine.equalsIgnoreCase("spark")) {
+          hlb.setExecutionEngine("spark");
+        }
         fireAndForget(hookContext.getConf(), hlb, queryId);
         break;
       case ON_FAILURE_HOOK:
-        // ignore
+        HiveLineageInfo lep_failed = new HiveLineageInfo();
+        lep_failed.getLineageInfo(query);
+        hlb=lep_failed.getHLBean();
+        hlb.setQueryEndTime(Long.toString(currentTime));
+        hlb.setQueryId(queryId);
+        hlb.setQuery(query);
+        hlb.setUser(user);
+        hlb.setQueryStartTime(queryStartTime);
+        hlb.setSuccess(false);
+        hlb.setFailed(true);
+        hlb.setHiveId(hiveId);
+        if (executionEngine.equalsIgnoreCase("mr")) {
+          hlb.setExecutionEngine("mapreduce");
+        }
+        if (executionEngine.equalsIgnoreCase("tez")) {
+          hlb.setExecutionEngine("tez");
+        }
+        if (executionEngine.equalsIgnoreCase("spark")) {
+          hlb.setExecutionEngine("spark");
+        }
+        fireAndForget(hookContext.getConf(), hlb, queryId);
         break;
       default:
         //ignore
@@ -111,7 +180,7 @@ public class Hook implements ExecuteWithHookContext {
     }
     Gson gson = new Gson();
     String gsonString = gson.toJson(hookData);
-    System.out.println("GSON String: "+gsonString);
+    LOG.debug("GSON String: "+gsonString);
     String encodedGsonQuery = URLEncoder.encode(gsonString, "UTF-8");
     String encodedQueryId = URLEncoder.encode(queryId, "UTF-8");
     String postData = "hookdata=" + encodedGsonQuery+"&queryid="+encodedQueryId;
@@ -140,7 +209,7 @@ public class Hook implements ExecuteWithHookContext {
       }
     }
     URL url = new URL(postUri);
-    System.out.println("Post URI: "+postUri);
+    LOG.debug("Post URI: "+postUri);
     DataOutputStream wr = null;
     //HttpURLConnection urlcon = null;
     if (postUri.contains("https:")) {
@@ -153,7 +222,7 @@
       urlcon.setDoInput(true);
       urlcon.setDoOutput(true);
       wr = new DataOutputStream (urlcon.getOutputStream());
-      System.out.println("PostString: "+postData);
+      LOG.debug("PostString: "+postData);
       //wr.writeBytes(postString.);
       wr.write(postData.getBytes());
 
@@ -172,7 +241,7 @@ public class Hook implements ExecuteWithHookContext {
       }
 
       String result = sb.toString();
-      System.out.println("Post Response: "+result);
+      LOG.debug("Post Response: "+result);
       isr.close();
       is.close();
       urlcon.disconnect();
@@ -186,7 +255,7 @@ public class Hook implements ExecuteWithHookContext {
       urlcon.setDoInput(true);
       urlcon.setDoOutput(true);
       wr = new DataOutputStream (urlcon.getOutputStream());
-      System.out.println("PostString: "+postData);
+      LOG.debug("PostString: "+postData);
       //wr.writeBytes(postString.);
       wr.write(postData.getBytes());
 
@@ -205,7 +274,7 @@ public class Hook implements ExecuteWithHookContext {
       }
 
       String result = sb.toString();
-      System.out.println("Post Response: "+result);
+      LOG.debug("Post Response: "+result);
       isr.close();
       is.close();
       urlcon.disconnect();
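
Note (not part of the patch): for the PRE_EXEC_HOOK, POST_EXEC_HOOK and ON_FAILURE_HOOK branches above to be invoked, the hook class has to be registered for all three Hive hook points. The following is only a minimal wiring sketch, assuming the hook jar is already on Hive's classpath; the property names are the standard Hive hook settings, while the HookWiring class itself is purely illustrative and could equally be replaced by entries in hive-site.xml or "set" commands in the CLI.

    import org.apache.hadoop.hive.conf.HiveConf;

    public class HookWiring {
        public static void main(String[] args) {
            // Register com.aetna.hadoop.dgc.hive.Hook for the pre-, post- and
            // failure-execution hook points handled in the switch above.
            HiveConf conf = new HiveConf();
            conf.set("hive.exec.pre.hooks", "com.aetna.hadoop.dgc.hive.Hook");
            conf.set("hive.exec.post.hooks", "com.aetna.hadoop.dgc.hive.Hook");
            conf.set("hive.exec.failure.hooks", "com.aetna.hadoop.dgc.hive.Hook");
            // Sanity check: print the registered post-execution hooks.
            System.out.println(conf.get("hive.exec.post.hooks"));
        }
    }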