Commit 384c3358 by Madhan Neethiraj Committed by Vimal Sharma

Atlas changes to support Hive hook for Hive2

parent cc503701
...@@ -25,7 +25,10 @@ import org.apache.hadoop.hive.ql.hooks.LineageInfo; ...@@ -25,7 +25,10 @@ import org.apache.hadoop.hive.ql.hooks.LineageInfo;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
...@@ -62,18 +65,55 @@ public class ColumnLineageUtils { ...@@ -62,18 +65,55 @@ public class ColumnLineageUtils {
for (Map.Entry<LineageInfo.DependencyKey, LineageInfo.Dependency> e : lInfo.entrySet()) { for (Map.Entry<LineageInfo.DependencyKey, LineageInfo.Dependency> e : lInfo.entrySet()) {
List<HiveColumnLineageInfo> l = new ArrayList<>(); List<HiveColumnLineageInfo> l = new ArrayList<>();
String k = getQualifiedName(e.getKey()); String k = getQualifiedName(e.getKey());
for (LineageInfo.BaseColumnInfo iCol : e.getValue().getBaseCols()) {
String db = iCol.getTabAlias().getTable().getDbName(); if (LOG.isDebugEnabled()) {
String table = iCol.getTabAlias().getTable().getTableName(); LOG.debug("buildLineageMap(): key={}; value={}", e.getKey(), e.getValue());
String colQualifiedName = iCol.getColumn() == null ? db + "." + table : db + "." + table + "." + iCol.getColumn().getName(); }
l.add(new HiveColumnLineageInfo(e.getValue(), colQualifiedName));
Collection<LineageInfo.BaseColumnInfo> baseCols = getBaseCols(e.getValue());
if (baseCols != null) {
for (LineageInfo.BaseColumnInfo iCol : baseCols) {
String db = iCol.getTabAlias().getTable().getDbName();
String table = iCol.getTabAlias().getTable().getTableName();
String colQualifiedName = iCol.getColumn() == null ? db + "." + table : db + "." + table + "." + iCol.getColumn().getName();
l.add(new HiveColumnLineageInfo(e.getValue(), colQualifiedName));
}
if (LOG.isDebugEnabled()) {
LOG.debug("Setting lineage --> Input: {} ==> Output : {}", l, k);
}
m.put(k, l);
} }
LOG.debug("Setting lineage --> Input: {} ==> Output : {}", l, k);
m.put(k, l);
} }
return m; return m;
} }
static Collection<LineageInfo.BaseColumnInfo> getBaseCols(LineageInfo.Dependency lInfoDep) {
Collection<LineageInfo.BaseColumnInfo> ret = null;
if (lInfoDep != null) {
try {
Method getBaseColsMethod = lInfoDep.getClass().getMethod("getBaseCols");
Object retGetBaseCols = getBaseColsMethod.invoke(lInfoDep);
if (retGetBaseCols != null) {
if (retGetBaseCols instanceof Collection) {
ret = (Collection) retGetBaseCols;
} else {
LOG.warn("{}: unexpected return type from LineageInfo.Dependency.getBaseCols(), expected type {}",
retGetBaseCols.getClass().getName(), "Collection");
}
}
} catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException ex) {
LOG.warn("getBaseCols()", ex);
}
}
return ret;
}
static String[] extractComponents(String qualifiedName) { static String[] extractComponents(String qualifiedName) {
String[] comps = qualifiedName.split("\\."); String[] comps = qualifiedName.split("\\.");
int lastIdx = comps.length - 1; int lastIdx = comps.length - 1;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment