Commit 9cdc25ee by a760104

Initial commit of HiveHook, written by Aetna.

TODO: add more parser functionality around ASTNode, specifically for partitions, drops, updates, and alters.
parent 82209db0
<factorypath>
<factorypathentry kind="PLUGIN" id="com.ibm.jee.annotations.processor" enabled="true" runInBatchMode="false"/>
<factorypathentry kind="PLUGIN" id="com.ibm.etools.javaee.cdi.ext.ui" enabled="false" runInBatchMode="false"/>
<factorypathentry kind="PLUGIN" id="com.ibm.jaxrs.annotations.processor" enabled="false" runInBatchMode="false"/>
<factorypathentry kind="PLUGIN" id="org.eclipse.jst.ws.annotations.core" enabled="true" runInBatchMode="false"/>
</factorypath>
<?xml version="1.0"?>
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.hadoop.metadata</groupId>
<artifactId>metadata-governance</artifactId>
<version>0.1-incubating-SNAPSHOT</version>
</parent>
<groupId>org.apache.hadoop.metadata</groupId>
<artifactId>metadata-hivehook</artifactId>
<version>0.1-incubating-SNAPSHOT</version>
<name>metadata-hivehook</name>
<packaging>jar</packaging>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<repositories>
<repository>
<id>central</id>
<!-- This repository should be listed first: Maven then tries Central before the others, which makes dependency resolution faster -->
<name>Maven Repository</name>
<url>https://repo1.maven.org/maven2</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
<repository>
<id>apache-repo</id>
<name>Apache Repository</name>
<url>https://repository.apache.org/content/repositories/releases</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<fork>true</fork>
<source>1.6</source>
<target>1.6</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>2.3.2</version>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-common</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-common</artifactId>
<version>0.13.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.2.2</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>0.13.1</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
</dependency>
</dependencies>
</dependencyManagement>
</project>
package com.aetna.hadoop.dgc.hive;
import java.io.Serializable;
import java.util.List;
import java.util.ArrayList;
public class HiveLineageBean implements Serializable {
/**
*
*/
private static final long serialVersionUID = 1L;
public String queryId;
public String user;
public String queryStartTime;
public String queryEndTime;
public String query;
public String tableName;
public String tableLocation;
ArrayList<SourceTables> sourceTables;
ArrayList<QueryColumns> queryColumns;
ArrayList<WhereClause> whereClause;
ArrayList<CreateColumns> createColumns;
ArrayList<GroupBy> groupBy;
ArrayList<GroupBy> orderBy;
public String getQueryId() {
return this.queryId ;
}
public void setQueryId(String queryId) {
this.queryId = queryId;
}
public String getTableName() {
return this.tableName ;
}
public void setTableName(String tableName) {
this.tableName = tableName;
}
public String getTableLocation() {
return this.tableLocation ;
}
public void setTableLocation(String tableLocation) {
this.tableLocation = tableLocation;
}
public String getUser() {
return this.user ;
}
public void setUser(String user) {
this.user = user;
}
public String getQueryStartTime() {
return this.queryStartTime ;
}
public void setQueryStartTime(String queryStartTime) {
this.queryStartTime = queryStartTime;
}
public String getQueryEndTime() {
return this.queryEndTime ;
}
public void setQueryEndTime(String queryEndTime) {
this.queryEndTime = queryEndTime;
}
public String getQuery() {
return this.query ;
}
public void setQuery(String query) {
this.query = query;
}
public ArrayList<SourceTables> getSourceTables() {
return this.sourceTables ;
}
public void setSourceTables(ArrayList<SourceTables> sourceTables) {
this.sourceTables = sourceTables;
}
public ArrayList<QueryColumns> getQueryColumns() {
return this.queryColumns ;
}
public void setQueryColumns(ArrayList<QueryColumns> queryColumns) {
this.queryColumns = queryColumns;
}
public ArrayList<WhereClause> getWhereClause() {
return this.whereClause ;
}
public void setWhereClause(ArrayList<WhereClause> whereClause) {
this.whereClause = whereClause;
}
public ArrayList<GroupBy> getGroupBy() {
return this.groupBy ;
}
public void setGroupBy(ArrayList<GroupBy> groupBy) {
this.groupBy = groupBy;
}
public class SourceTables {
public String tableName;
public String tableAlias;
public String databaseName;
public String getTableName() {
return this.tableName ;
}
public void setTableName(String tableName) {
this.tableName = tableName;
}
public String getTableAlias() {
return this.tableAlias ;
}
public void setTableAlias(String tableAlias) {
this.tableAlias = tableAlias;
}
public String getDatabaseName() {
return this.databaseName ;
}
public void setDatabaseName(String databaseName) {
this.databaseName = databaseName;
}
}
public class QueryColumns {
public String tbAliasOrName;
public String columnName;
public String columnAlias;
public String columnFunction;
public String getTbAliasOrName() {
return this.tbAliasOrName ;
}
public void setTbAliasOrName(String tbAliasOrName) {
this.tbAliasOrName = tbAliasOrName;
}
public String getColumnName() {
return this.columnName ;
}
public void setColumnName(String columnName) {
this.columnName = columnName;
}
public String getColumnAlias() {
return this.columnAlias ;
}
public void setColumnAlias(String columnAlias) {
this.columnAlias = columnAlias;
}
public String getColumnFunction() {
return this.columnFunction ;
}
public void setColumnFunction(String columnFunction) {
this.columnFunction = columnFunction;
}
}
public class GroupBy {
public String tbAliasOrName;
public String columnName;
public String getTbAliasOrName() {
return this.tbAliasOrName ;
}
public void setTbAliasOrName(String tbAliasOrName) {
this.tbAliasOrName = tbAliasOrName;
}
public String getColumnName() {
return this.columnName ;
}
public void setColumnName(String columnName) {
this.columnName = columnName;
}
}
public class WhereClause {
public String tbAliasOrName;
public String columnCondition;
public String columnName;
public String columnOperator;
public String columnValue;
public String getColumnCondition() {
return this.columnCondition ;
}
public void setColumnCondition(String columnCondition) {
this.columnCondition = columnCondition;
}
public String getTbAliasOrName() {
return this.tbAliasOrName ;
}
public void setTbAliasOrName(String tbAliasOrName) {
this.tbAliasOrName = tbAliasOrName;
}
public String getColumnName() {
return this.columnName ;
}
public void setColumnName(String columnName) {
this.columnName = columnName;
}
public String getColumnOperator() {
return this.columnOperator ;
}
public void setColumnOperator(String columnOperator) {
this.columnOperator = columnOperator;
}
public String getColumnValue() {
return this.columnValue ;
}
public void setColumnValue(String columnValue) {
this.columnValue = columnValue;
}
}
public ArrayList<CreateColumns> getCreateColumns() {
return this.createColumns ;
}
public void setCreateColumns(ArrayList<CreateColumns> createColumns) {
this.createColumns = createColumns;
}
public class CreateColumns {
public String columnName;
public String columnType;
public String getColumnName() {
return this.columnName ;
}
public void setColumnName(String columnName) {
this.columnName = columnName;
}
public String getColumnType() {
return this.columnType ;
}
public void setColumnType(String columnType) {
this.columnType = columnType;
}
}
}
package com.aetna.hadoop.dgc.hive;
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Stack;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
import org.apache.hadoop.hive.ql.lib.Dispatcher;
import org.apache.hadoop.hive.ql.lib.GraphWalker;
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.lib.NodeProcessor;
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
import org.apache.hadoop.hive.ql.lib.Rule;
import org.apache.hadoop.hive.ql.parse.ASTNode;
import org.apache.hadoop.hive.ql.parse.HiveParser;
import org.apache.hadoop.hive.ql.parse.ParseDriver;
import org.apache.hadoop.hive.ql.parse.ParseException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.log4j.ConsoleAppender;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.log4j.PatternLayout;
import com.aetna.hadoop.dgc.hive.HiveLineageBean.CreateColumns;
import com.aetna.hadoop.dgc.hive.HiveLineageBean.GroupBy;
import com.aetna.hadoop.dgc.hive.HiveLineageBean.QueryColumns;
import com.aetna.hadoop.dgc.hive.HiveLineageBean.SourceTables;
import com.aetna.hadoop.dgc.hive.HiveLineageBean.WhereClause;
import com.google.gson.Gson;
/**
*
* This class extracts lineage info from a SQL query. It parses the query and
* collects the source tables, query columns, WHERE predicates, GROUP BY columns,
* and CREATE TABLE details into a HiveLineageBean.
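*
* Hypothetical usage sketch (not part of the original commit; exception handling omitted):
*   HiveLineageInfo info = new HiveLineageInfo();
*   info.getLineageInfo("select a.col1 from db1.tab1 a where a.col2 = '1'");
*   String json = new Gson().toJson(info.getHLBean());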
*
*/
public class HiveLineageInfo implements NodeProcessor {
private final Log LOG = LogFactory.getLog(HiveLineageInfo.class.getName());
public Map<Integer, String> queryMap;
public Integer counter = 0;
public HiveLineageBean hlb = new HiveLineageBean();
public ArrayList<SourceTables> sourceTables;
public ArrayList<QueryColumns> queryColumns;
public ArrayList<GroupBy> groupBy;
public ArrayList<WhereClause> whereClause;
public ArrayList<CreateColumns> createColumns;
/**
* @return Custom HiveLineageBean data to be passed to the GSON parser
*/
public HiveLineageBean getHLBean() {
return hlb;
}
/**
* Implements the process method for the NodeProcessor interface.
*/
public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx, Object... nodeOutputs) throws SemanticException {
ASTNode pt = (ASTNode) nd;
/*
* Dispatch on the token type of the current AST node and collect the matching
* lineage details: source tables, query columns, WHERE predicates, GROUP BY
* columns, and CREATE TABLE metadata.
*/
switch (pt.getToken().getType()) {
case HiveParser.TOK_TABREF:
sourceTables = new ArrayList<SourceTables>();
LOG.debug("From Table Dump: "+pt.dump());
fromTableDump(pt);
this.hlb.setSourceTables(sourceTables);
break;
case HiveParser.TOK_SELECT:
queryColumns = new ArrayList<QueryColumns>();
LOG.debug("Column Dump: "+pt.dump());
columnTableDump(pt);
this.hlb.setQueryColumns(queryColumns);
break;
case HiveParser.TOK_WHERE:
whereClause = new ArrayList<WhereClause>();
LOG.debug("WHERE CLAUSE DUMP: "+pt.dump());
whereDump(pt);
this.hlb.setWhereClause(whereClause);
break;
case HiveParser.TOK_GROUPBY:
groupBy = new ArrayList<GroupBy>();
LOG.debug("GROUPBY CLAUSE DUMP: "+pt.dump());
groupByDump(pt);
this.hlb.setGroupBy(groupBy);
break;
case HiveParser.TOK_CREATETABLE:
createColumns = new ArrayList<CreateColumns>();
LOG.debug("CREATABLE DUMP: "+pt.dump());
createTableDump(pt);
break;
}
return null;
}
/**
* Walks the WHERE tree; called by the process walker.
*/
public void whereDump(ASTNode nodeIn) {
counter = 0;
wdump(nodeIn);
}
/**
* Walks the WHERE tree; called by whereDump.
*/
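/*
* Illustrative sketch (an assumption, not taken from the original source): for a
* predicate such as WHERE a.col1 = '1', the Hive parser yields roughly
* (TOK_WHERE (= (. (TOK_TABLE_OR_COL a) col1) '1')), so wdump() fires on the
* TOK_TABLE_OR_COL node and reads the table alias, column name, operator and
* literal value from its parent nodes, as coded below.
*/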
private void wdump(ASTNode nodeIn) {
boolean parseChild = true;
if (nodeIn.getType() == HiveParser.TOK_TABLE_OR_COL) {
WhereClause whreClse = hlb.new WhereClause();
if (nodeIn.getParent().getText().equalsIgnoreCase(".")) {
ASTNode checkOrAnd = (ASTNode) nodeIn.getParent().getParent().getChild(1).getParent().getParent();
if (checkOrAnd.getType() == HiveParser.KW_AND || checkOrAnd.getType() == HiveParser.KW_OR) {
LOG.info("WHERE:: "+checkOrAnd.getText());
whreClse.setColumnOperator(checkOrAnd.getText());
}
LOG.info("Table Alias:: "+nodeIn.getChild(0).getText());
whreClse.setTbAliasOrName(nodeIn.getChild(0).getText());
LOG.info("Delimiter:: "+nodeIn.getParent().getText());
LOG.info("Column:: "+nodeIn.getParent().getChild(1).getText());
whreClse.setColumnName(nodeIn.getParent().getChild(1).getText());
LOG.info("Column Qualifer:: "+nodeIn.getParent().getParent().getChild(1).getParent().getText());
whreClse.setColumnOperator(nodeIn.getParent().getParent().getChild(1).getParent().getText());
LOG.info("Column Value:: "+nodeIn.getParent().getParent().getChild(1).getText());
whreClse.setColumnValue(nodeIn.getParent().getParent().getChild(1).getText());
} else {
ASTNode checkOrAnd = (ASTNode) nodeIn.getParent().getParent().getChild(1).getParent();
if (checkOrAnd.getType() == HiveParser.KW_AND || checkOrAnd.getType() == HiveParser.KW_OR) {
LOG.info("WHERE:: "+checkOrAnd.getText());
whreClse.setColumnOperator(checkOrAnd.getText());
}
LOG.info("Column:: = "+nodeIn.getChild(0).getText());
whreClse.setColumnName(nodeIn.getChild(0).getText());
//LOG.info("Delimiter "+nodeIn.getParent().getText());
LOG.info("Column Qualifer:: "+nodeIn.getParent().getChild(1).getParent().getText());
whreClse.setColumnOperator(nodeIn.getParent().getChild(1).getParent().getText());
LOG.info("Column Value:: "+nodeIn.getParent().getChild(1).getText());
whreClse.setColumnValue(nodeIn.getParent().getChild(1).getText());
}
whereClause.add(whreClse);
}
if (parseChild) {
int childCount = nodeIn.getChildCount();
if (childCount != 0 ){
for (int numr = 0; numr < childCount; numr++) {
wdump((ASTNode)nodeIn.getChild(numr));
}
}
}
}
/**
* Walks the GROUP BY tree; called by the process walker.
*/
public void groupByDump(ASTNode nodeIn) {
counter = 0;
gdump(nodeIn);
}
/**
* Walks the GROUP BY tree; called by groupByDump.
*/
private void gdump(ASTNode nodeIn) {
boolean parseChild = true;
if (nodeIn.getType() == HiveParser.TOK_TABLE_OR_COL) {
GroupBy grpBy = hlb.new GroupBy();
ASTNode parentNode = (ASTNode) nodeIn.getParent();
if (parentNode.getText().equalsIgnoreCase(".")) {
LOG.info("GroupBy TableAlias: "+nodeIn.getChild(0).getText());
grpBy.setTbAliasOrName(nodeIn.getChild(0).getText());
LOG.info("GroupBy Column:: "+parentNode.getChild(1).getText());
grpBy.setColumnName(parentNode.getChild(1).getText());
} else {
LOG.info("GroupBy Column: "+nodeIn.getChild(0).getText());
grpBy.setColumnName(nodeIn.getChild(0).getText());
}
groupBy.add(grpBy);
}
if (parseChild) {
int childCount = nodeIn.getChildCount();
if (childCount != 0 ){
for (int numr = 0; numr < childCount; numr++) {
gdump((ASTNode)nodeIn.getChild(numr));
}
}
}
}
/**
* Walks the CREATE TABLE tree; called by the process walker.
*/
public void createTableDump(ASTNode nodeIn) {
counter = 0;
if (nodeIn.getFirstChildWithType(HiveParser.TOK_TABNAME) != null) {
LOG.info("Create TableName:: "+nodeIn.getFirstChildWithType(HiveParser.TOK_TABNAME).getText());
hlb.setTableName(nodeIn.getFirstChildWithType(HiveParser.TOK_TABNAME).getChild(0).getText());
}
if (nodeIn.getFirstChildWithType(HiveParser.TOK_TABLELOCATION) != null) {
LOG.info("Create Table Location:: "+nodeIn.getFirstChildWithType(HiveParser.TOK_TABLELOCATION).getText());
hlb.setTableLocation(nodeIn.getFirstChildWithType(HiveParser.TOK_TABLELOCATION).getChild(0).getText());
}
if (nodeIn.getFirstChildWithType(HiveParser.TOK_TABCOLLIST) != null ) {
ctdump((ASTNode)nodeIn.getFirstChildWithType(HiveParser.TOK_TABCOLLIST).getParent());
hlb.setCreateColumns(createColumns);
}
}
/**
* Walks the CREATE TABLE tree; called by createTableDump.
*/
private void ctdump(ASTNode nodeIn) {
boolean parseChild = true;
if (nodeIn.getType() == HiveParser.TOK_TABCOL) {
CreateColumns crtClmns = hlb.new CreateColumns();
LOG.info("Create Column Name:: "+nodeIn.getChild(0).getText());
crtClmns.setColumnName(nodeIn.getChild(0).getText());
LOG.info("Create Column Type:: "+nodeIn.getChild(1).getText());
crtClmns.setColumnType(nodeIn.getChild(1).getText());
createColumns.add(crtClmns);
}
if (parseChild) {
int childCount = nodeIn.getChildCount();
if (childCount != 0 ){
for (int numr = 0; numr < childCount; numr++) {
ctdump((ASTNode)nodeIn.getChild(numr));
}
}
}
}
/**
* Walks the FROM-table tree; called by the process walker.
*/
public void fromTableDump(ASTNode nodeIn) {
counter = 0;
ftdump(nodeIn);
}
/**
* Walks the FROM-table tree; called by fromTableDump.
*/
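/*
* Note (an assumption based on Hive's FROM-clause grammar): a TOK_TABREF node
* carries a TOK_TABNAME child whose children are either (dbName, tableName) or
* just (tableName); an optional second child of TOK_TABREF is the table alias.
* ftdump() below reads those child positions directly.
*/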
private void ftdump(ASTNode nodeIn) {
boolean parseChild = true;
if (nodeIn.getType() == HiveParser.TOK_TABNAME && nodeIn.getParent().getType() == HiveParser.TOK_TABREF) {
SourceTables hlbSbls = hlb.new SourceTables();
if (nodeIn.getChildCount() == 2) {
LOG.info("From DBName:: "+nodeIn.getChild(0).getText());
hlbSbls.setDatabaseName(nodeIn.getChild(0).getText());
LOG.info("From TableName:: "+nodeIn.getChild(1).getText());
hlbSbls.setTableName(nodeIn.getChild(1).getText());
} else {
LOG.info("From TableName:: "+nodeIn.getChild(0).getText());
hlbSbls.setTableName(nodeIn.getChild(0).getText());
}
if (nodeIn.getType() == HiveParser.TOK_TABNAME && nodeIn.getParent().getChild(1) != null) {
LOG.info("From DB/Table Alias:: "+nodeIn.getParent().getChild(1).getText());
hlbSbls.setTableAlias(nodeIn.getParent().getChild(1).getText());
}
sourceTables.add(hlbSbls);
}
if (parseChild) {
int childCount = nodeIn.getChildCount();
if (childCount != 0 ){
for (int numr = 0; numr < childCount; numr++) {
ftdump((ASTNode)nodeIn.getChild(numr));
}
}
}
}
/**
* Walks the column tree; called by the process walker.
*/
public void columnTableDump(ASTNode nodeIn) {
counter = 0;
clmnTdump(nodeIn);
}
/**
* Walks the column tree; called by columnTableDump.
*/
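/*
* Note (an assumption, for illustration): for a select item such as t.col1 AS c1,
* the TOK_SELEXPR subtree holds (. (TOK_TABLE_OR_COL t) col1) plus an alias child
* c1; for a call such as sum(col1), the enclosing TOK_FUNCTION node's first child
* is the function name. clmnTdump() below reads those positions.
*/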
private void clmnTdump(ASTNode nodeIn) {
boolean parseChild = true;
if (nodeIn.getType() == HiveParser.TOK_TABLE_OR_COL && nodeIn.getAncestor(HiveParser.TOK_SELEXPR) != null ) {
QueryColumns qclmns = hlb.new QueryColumns();
if (nodeIn.getAncestor(HiveParser.TOK_FUNCTION) != null && nodeIn.getAncestor(HiveParser.TOK_SELEXPR) != null) {
LOG.info("Function Query:: "+nodeIn.getAncestor(HiveParser.TOK_FUNCTION).getChild(0).getText());
qclmns.setColumnFunction(nodeIn.getAncestor(HiveParser.TOK_FUNCTION).getChild(0).getText());
}
if (nodeIn.getParent().getText().equalsIgnoreCase(".")) {
LOG.info("Table Name/Alias:: "+nodeIn.getChild(0).getText());
qclmns.setTbAliasOrName(nodeIn.getChild(0).getText());
LOG.info("Column:: "+nodeIn.getParent().getChild(1).getText());
qclmns.setColumnName(nodeIn.getParent().getChild(1).getText());
if (nodeIn.getAncestor(HiveParser.TOK_SELEXPR).getChild(1) != null) {
LOG.info("Column Alias:: "+nodeIn.getAncestor(HiveParser.TOK_SELEXPR).getChild(1).getText());
qclmns.setColumnAlias(nodeIn.getAncestor(HiveParser.TOK_SELEXPR).getChild(1).getText());
}
} else {
LOG.info("Column:: "+nodeIn.getChild(0).getText());
qclmns.setColumnName(nodeIn.getChild(0).getText());
if (nodeIn.getParent().getChild(1) != null) {
LOG.info("Column Alias:: "+nodeIn.getParent().getChild(1).getText());
qclmns.setColumnAlias(nodeIn.getParent().getChild(1).getText());
}
}
if (qclmns.getColumnName() != null) {
queryColumns.add(qclmns);
}
}
if (parseChild) {
int childCount = nodeIn.getChildCount();
if (childCount != 0 ){
for (int numr = 0; numr < childCount; numr++) {
clmnTdump((ASTNode)nodeIn.getChild(numr));
}
}
}
}
/**
* Parses the given query and gathers its lineage info.
*
* @param query
* @throws ParseException
*/
public void getLineageInfo(String query) throws ParseException,
SemanticException {
/*
* Get the AST tree
*/
ParseDriver pd = new ParseDriver();
ASTNode tree = pd.parse(query);
LOG.info("DUMP TREE: "+tree.dump());
while ((tree.getToken() == null) && (tree.getChildCount() > 0)) {
tree = (ASTNode) tree.getChild(0);
}
/*
* initialize Event Processor and dispatcher.
*/
// Create a walker which walks the AST in a DFS manner while maintaining
// the node stack.
Map<Rule, NodeProcessor> rules = new LinkedHashMap<Rule, NodeProcessor>();
// The dispatcher fires the processor corresponding to the closest matching
// rule and passes the context along
Dispatcher disp = new DefaultRuleDispatcher(this, rules, null);
GraphWalker ogw = new DefaultGraphWalker(disp);
// Create a list of top-level nodes
ArrayList<Node> topNodes = new ArrayList<Node>();
topNodes.add(tree);
ogw.startWalking(topNodes, null);
}
// Main method to run tests and print the JSON (Gson) feed for a query.
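// Hypothetical invocation (classpath entries are placeholders; the Hive and Hadoop
// client jars must also be present):
//   java -cp metadata-hivehook-0.1-incubating-SNAPSHOT.jar:<hive-and-hadoop-jars> \
//     com.aetna.hadoop.dgc.hive.HiveLineageInfo "select col1 from tab1"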
public static void main(String[] args) throws IOException, ParseException,
SemanticException {
String query = args[0];
ConsoleAppender console = new ConsoleAppender(); //create appender
//configure the appender
String PATTERN = "%d [%p|%c|%C{1}] %m%n";
console.setLayout(new PatternLayout(PATTERN));
console.setThreshold(Level.DEBUG);
console.activateOptions();
//add appender to any Logger (here is root)
Logger.getRootLogger().addAppender(console);
LogManager.getRootLogger().setLevel(Level.DEBUG);
HiveLineageInfo lep = new HiveLineageInfo();
lep.getLineageInfo(query);
Gson gson = new Gson();
String jsonOut = gson.toJson(lep.getHLBean());
System.out.println("GSON/JSON Generate :: "+jsonOut);
}
}
package com.aetna.hadoop.dgc.hive;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.gson.Gson;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.QueryPlan;
import org.apache.hadoop.hive.ql.exec.ExplainTask;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext;
import org.apache.hadoop.hive.ql.hooks.HookContext;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.util.StringUtils;
//import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
//import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
//import org.apache.hadoop.yarn.client.api.TimelineClient;
//import org.apache.hadoop.yarn.conf.YarnConfiguration;
/**
* DGC Hook sends query + plan info to the DGCCollector Service. To enable it (Hadoop 2.4 and up), set
* hive.exec.pre.hooks, hive.exec.post.hooks, and/or hive.exec.failure.hooks to include this class.
* See the example configuration below.
*/
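/*
* Example configuration (a sketch; hive.exec.post.hooks is a standard Hive setting,
* and the value assumes this class is on the Hive classpath):
*
*   <property>
*     <name>hive.exec.post.hooks</name>
*     <value>com.aetna.hadoop.dgc.hive.Hook</value>
*   </property>
*
* The same class name can also be added to hive.exec.pre.hooks and
* hive.exec.failure.hooks, as described above.
*/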
public class Hook implements ExecuteWithHookContext {
private static final Log LOG = LogFactory.getLog(Hook.class.getName());
//private static TimelineClient timelineClient;
private enum EntityTypes { HIVE_QUERY_ID };
private enum EventTypes { QUERY_SUBMITTED, QUERY_COMPLETED };
private enum OtherInfoTypes { QUERY, STATUS, TEZ, MAPRED };
private enum PrimaryFilterTypes { user };
private static final int WAIT_TIME = 3;
private HiveLineageBean hlb;
@Override
public void run(HookContext hookContext) throws Exception {
long currentTime = System.currentTimeMillis();
try {
QueryPlan plan = hookContext.getQueryPlan();
if (plan == null) {
return;
}
ExplainTask explain = new ExplainTask();
explain.initialize(hookContext.getConf(), plan, null);
List<Task<?>> rootTasks = plan.getRootTasks();
String queryId = plan.getQueryId();
String queryStartTime = plan.getQueryStartTime().toString();
String user = hookContext.getUgi().getUserName();
String query = plan.getQueryStr();
int numMrJobs = Utilities.getMRTasks(plan.getRootTasks()).size();
int numTezJobs = Utilities.getTezTasks(plan.getRootTasks()).size();
switch(hookContext.getHookType()) {
case PRE_EXEC_HOOK:
Set<ReadEntity> db = hookContext.getInputs();
for (Object o : db) {
LOG.error("DB:Table="+o.toString());
}
break;
case POST_EXEC_HOOK:
currentTime = System.currentTimeMillis();
HiveLineageInfo lep = new HiveLineageInfo();
lep.getLineageInfo(query);
hlb=lep.getHLBean();
hlb.setQueryEndTime(Long.toString(currentTime));
hlb.setQueryId(queryId);
hlb.setQuery(query);
hlb.setUser(user);
hlb.setQueryStartTime(queryStartTime);
fireAndForget(hookContext.getConf(), hlb, queryId);
break;
case ON_FAILURE_HOOK:
// ignore
break;
default:
//ignore
break;
}
} catch (Exception e) {
LOG.info("Failed to submit plan to DGC: " + StringUtils.stringifyException(e));
}
}
public void fireAndForget(Configuration conf, HiveLineageBean hookData, String queryId) throws Exception {
String postUri = "http://167.69.111.50:20810/HiveHookCollector/HookServlet";
if (conf.getTrimmed("aetna.hive.hook") != null) {
postUri = conf.getTrimmed("aetna.hive.hook");
}
Gson gson = new Gson();
String gsonString = gson.toJson(hookData);
System.out.println("GSON String: "+gsonString);
String encodedGsonQuery = URLEncoder.encode(gsonString, "UTF-8");
String encodedQueryId = URLEncoder.encode(queryId, "UTF-8");
String postData = "hookdata=" + encodedGsonQuery+"&queryid="+encodedQueryId;
// Create a trust manager that does not validate certificate chains
if (postUri.contains("https:")) {
TrustManager[] trustAllCerts = new TrustManager[]{
new X509TrustManager() {
public java.security.cert.X509Certificate[] getAcceptedIssuers() {
return null;
}
public void checkClientTrusted(
java.security.cert.X509Certificate[] certs, String authType) {
}
public void checkServerTrusted(
java.security.cert.X509Certificate[] certs, String authType) {
}
}
};
// Install the all-trusting trust manager
try {
SSLContext sc = SSLContext.getInstance("SSL");
sc.init(null, trustAllCerts, new java.security.SecureRandom());
HttpsURLConnection.setDefaultSSLSocketFactory(sc.getSocketFactory());
} catch (Exception e) {
e.printStackTrace();
}
}
URL url = new URL(postUri);
System.out.println("Post URI: "+postUri);
// HttpsURLConnection extends HttpURLConnection, so one code path serves both schemes;
// the all-trusting SSL socket factory installed above is used for https URLs.
HttpURLConnection urlcon = (HttpURLConnection) url.openConnection();
urlcon.setRequestMethod("POST");
urlcon.setRequestProperty("X-Requested-By", "HiveHook");
urlcon.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");
urlcon.setUseCaches(false);
urlcon.setDoInput(true);
urlcon.setDoOutput(true);
DataOutputStream wr = new DataOutputStream(urlcon.getOutputStream());
System.out.println("PostString: "+postData);
wr.write(postData.getBytes());
wr.flush();
wr.close();
InputStream is = urlcon.getInputStream();
InputStreamReader isr = new InputStreamReader(is);
int numCharsRead;
char[] charArray = new char[1024];
StringBuffer sb = new StringBuffer();
while ((numCharsRead = isr.read(charArray)) > 0) {
sb.append(charArray, 0, numCharsRead);
}
String result = sb.toString();
System.out.println("Post Response: "+result);
isr.close();
is.close();
urlcon.disconnect();
}
}
@@ -315,6 +315,7 @@
<module>repository</module>
<module>webapp</module>
<module>docs</module>
<module>metadata-hivehook</module>
</modules>
<repositories>
<factorypath>
<factorypathentry kind="PLUGIN" id="com.ibm.jaxrs.annotations.processor" enabled="true" runInBatchMode="false"/>
<factorypathentry kind="PLUGIN" id="com.ibm.jee.annotations.processor" enabled="true" runInBatchMode="false"/>
<factorypathentry kind="PLUGIN" id="com.ibm.etools.javaee.cdi.ext.ui" enabled="false" runInBatchMode="false"/>
<factorypathentry kind="PLUGIN" id="org.eclipse.jst.ws.annotations.core" enabled="true" runInBatchMode="false"/>
</factorypath>