Commit a52112d8 by Shwetha GS

ATLAS-247 Hive Column level lineage (rhbutani,svimal2106 via shwethags)

parent 4c56c61f
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.atlas.hive.bridge;

import org.apache.atlas.AtlasClient;
import org.apache.atlas.hive.hook.HiveHook;
import org.apache.atlas.hive.model.HiveDataModelGenerator;
import org.apache.atlas.hive.model.HiveDataTypes;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.hadoop.hive.ql.hooks.LineageInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
public class ColumnLineageUtils {
    public static final Logger LOG = LoggerFactory.getLogger(ColumnLineageUtils.class);

    public static class HiveColumnLineageInfo {
        public final String dependencyType;
        public final String expr;
        public final String inputColumn;

        HiveColumnLineageInfo(LineageInfo.Dependency d, String inputCol) {
            dependencyType = d.getType().name();
            expr = d.getExpr();
            inputColumn = inputCol;
        }

        @Override
        public String toString() {
            return inputColumn;
        }
    }
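
    /**
     * Builds the "db.table.column" qualified name for the output column of a
     * lineage dependency entry.
     */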
    public static String getQualifiedName(LineageInfo.DependencyKey key) {
        String db = key.getDataContainer().getTable().getDbName();
        String table = key.getDataContainer().getTable().getTableName();
        String col = key.getFieldSchema().getName();
        return db + "." + table + "." + col;
    }
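
    /**
     * Flattens Hive's LineageInfo into a map from an output column's qualified
     * name to the list of base input columns it was derived from. When a
     * dependency has no base column (e.g. count(*)), the input is recorded at
     * table granularity ("db.table").
     */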
    public static Map<String, List<HiveColumnLineageInfo>> buildLineageMap(LineageInfo lInfo) {
        Map<String, List<HiveColumnLineageInfo>> m = new HashMap<>();
        for (Map.Entry<LineageInfo.DependencyKey, LineageInfo.Dependency> e : lInfo.entrySet()) {
            List<HiveColumnLineageInfo> l = new ArrayList<>();
            String k = getQualifiedName(e.getKey());
            for (LineageInfo.BaseColumnInfo iCol : e.getValue().getBaseCols()) {
                String db = iCol.getTabAlias().getTable().getDbName();
                String table = iCol.getTabAlias().getTable().getTableName();
                String colQualifiedName = iCol.getColumn() == null
                        ? db + "." + table
                        : db + "." + table + "." + iCol.getColumn().getName();
                l.add(new HiveColumnLineageInfo(e.getValue(), colQualifiedName));
            }
            LOG.debug("Setting lineage --> Input: {} ==> Output: {}", l, k);
            m.put(k, l);
        }
        return m;
    }
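
    /**
     * Splits a qualified name on '.' and drops any cluster suffix from the last
     * component; Atlas qualified names may end in "@clusterName" (e.g.
     * "default.t.id@primary").
     */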
    static String[] extractComponents(String qualifiedName) {
        String[] comps = qualifiedName.split("\\.");
        int lastIdx = comps.length - 1;
        int atLoc = comps[lastIdx].indexOf('@');
        if (atLoc > 0) {
            comps[lastIdx] = comps[lastIdx].substring(0, atLoc);
        }
        return comps;
    }
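
    /**
     * If the referenceable is a hive_table, indexes the table under "db.table"
     * and each of its columns under "db.table.column" so that lineage keys can
     * later be resolved to entity references.
     */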
    static void populateColumnReferenceableMap(Map<String, Referenceable> m, Referenceable r) {
        if (r.getTypeName().equals(HiveDataTypes.HIVE_TABLE.getName())) {
            String qName = (String) r.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME);
            String[] qNameComps = extractComponents(qName);
            for (Referenceable col : (List<Referenceable>) r.get(HiveDataModelGenerator.COLUMNS)) {
                String cName = (String) col.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME);
                String[] colQNameComps = extractComponents(cName);
                String colQName = colQNameComps[0] + "." + colQNameComps[1] + "." + colQNameComps[2];
                m.put(colQName, col);
            }
            String tableQName = qNameComps[0] + "." + qNameComps[1];
            m.put(tableQName, r);
        }
    }
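
    /**
     * Builds a single lookup of table and column referenceables, keyed by
     * qualified name, from the process's input and output data sets.
     */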
    public static Map<String, Referenceable> buildColumnReferenceableMap(List<Referenceable> inputs,
                                                                         List<Referenceable> outputs) {
        Map<String, Referenceable> m = new HashMap<>();
        for (Referenceable r : inputs) {
            populateColumnReferenceableMap(m, r);
        }
        for (Referenceable r : outputs) {
            populateColumnReferenceableMap(m, r);
        }
        return m;
    }
}
@@ -24,6 +24,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasConstants;
import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
import org.apache.atlas.hive.bridge.ColumnLineageUtils;
import org.apache.atlas.hive.model.HiveDataModelGenerator;
import org.apache.atlas.hive.model.HiveDataTypes;
import org.apache.atlas.hook.AtlasHook;
@@ -39,6 +40,7 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.ql.QueryPlan;
import org.apache.hadoop.hive.ql.exec.ExplainTask;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.hooks.LineageInfo;
import org.apache.hadoop.hive.ql.hooks.Entity;
import org.apache.hadoop.hive.ql.hooks.Entity.Type;
import org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext;
@@ -55,6 +57,7 @@ import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.MalformedURLException;
import java.net.URI;
import java.util.ArrayList;
@@ -182,6 +185,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
        event.setQueryStr(hookContext.getQueryPlan().getQueryStr());
        event.setQueryStartTime(hookContext.getQueryPlan().getQueryStartTime());
        event.setQueryType(hookContext.getQueryPlan().getQueryPlan().getQueryType());
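        // getLinfo() exposes Hive's LineageInfo for this query; it carries data only
        // when the Hive version includes column-lineage support (see HIVE-13112).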
        event.setLineageInfo(hookContext.getLinfo());

        if (executor == null) {
            fireAndForget(event);
@@ -616,7 +620,21 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
        if (source.size() > 0 || target.size() > 0) {
            Referenceable processReferenceable = getProcessReferenceable(dgiBridge, event, sortedHiveInputs, sortedHiveOutputs, source, target);

            // set up column lineage
            List<Referenceable> sourceList = new ArrayList<>(source.values());
            List<Referenceable> targetList = new ArrayList<>(target.values());
            List<Referenceable> colLineageProcessInstances = new ArrayList<>();
            try {
                Map<String, Referenceable> columnQNameToRef =
                        ColumnLineageUtils.buildColumnReferenceableMap(sourceList, targetList);
                colLineageProcessInstances = createColumnLineageProcessInstances(processReferenceable,
                        event.lineageInfo,
                        columnQNameToRef);
            } catch (Exception e) {
                LOG.warn("Column lineage process setup failed", e);
            }

            // the table-level process entity goes first, followed by its column lineage processes
            colLineageProcessInstances.add(0, processReferenceable);
            entities.addAll(colLineageProcessInstances);

            event.addMessage(new HookNotification.EntityUpdateRequest(event.getUser(), new ArrayList<>(entities)));
        } else {
            LOG.info("Skipped query {} since it has no inputs or outputs", event.getQueryStr());
@@ -773,6 +791,51 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
        return processReferenceable;
    }
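
    /**
     * For each output column that has lineage, creates one column lineage
     * process instance linking the resolved input column references to the
     * output column, with a "query" back-reference to the owning hive_process.
     * Columns that cannot be resolved in the reference map are skipped.
     */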
    private List<Referenceable> createColumnLineageProcessInstances(
            Referenceable processRefObj,
            Map<String, List<ColumnLineageUtils.HiveColumnLineageInfo>> lineageInfo,
            Map<String, Referenceable> columnQNameToRef) {
        List<Referenceable> l = new ArrayList<>();
        for (Map.Entry<String, List<ColumnLineageUtils.HiveColumnLineageInfo>> e : lineageInfo.entrySet()) {
            Referenceable destCol = columnQNameToRef.get(e.getKey());
            if (destCol == null) {
                LOG.debug("Couldn't find output column {}", e.getKey());
                continue;
            }

            List<Referenceable> outRef = new ArrayList<>();
            outRef.add(destCol);

            List<Referenceable> inputRefs = new ArrayList<>();
            for (ColumnLineageUtils.HiveColumnLineageInfo cLI : e.getValue()) {
                Referenceable srcCol = columnQNameToRef.get(cLI.inputColumn);
                if (srcCol == null) {
                    LOG.debug("Couldn't find input column {}", cLI.inputColumn);
                    continue;
                }
                inputRefs.add(srcCol);
            }

            if (inputRefs.size() > 0) {
                Referenceable r = new Referenceable(HiveDataTypes.HIVE_COLUMN_LINEAGE.getName());
                r.set("name", processRefObj.get(AtlasClient.NAME) + ":" + outRef.get(0).get(AtlasClient.NAME));
                r.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, processRefObj.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME) + ":" + outRef.get(0).get(AtlasClient.NAME));
                r.set("inputs", inputRefs);
                r.set("outputs", outRef);
                r.set("query", processRefObj);
                r.set("dependencyType", e.getValue().get(0).dependencyType);
                r.set("expression", e.getValue().get(0).expr);
                l.add(r);
            } else {
                LOG.debug("No input references found for lineage of column {}", destCol.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME));
            }
        }
        return l;
    }
    @VisibleForTesting
    static String getProcessQualifiedName(HiveMetaStoreBridge dgiBridge, HiveEventContext eventContext,
                                          final SortedSet<ReadEntity> sortedHiveInputs,
@@ -930,6 +993,8 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
        private String queryStr;
        private Long queryStartTime;

        public Map<String, List<ColumnLineageUtils.HiveColumnLineageInfo>> lineageInfo;

        private List<HookNotification.HookNotificationMessage> messages = new ArrayList<>();
        private String queryType;
@@ -978,6 +1043,15 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
            this.queryType = queryType;
        }
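
        // Converts Hive's LineageInfo into the qualified-name keyed map used when
        // registering column lineage; failures are logged and swallowed so the hook
        // never interferes with query execution.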
        public void setLineageInfo(LineageInfo lineageInfo) {
            try {
                this.lineageInfo = ColumnLineageUtils.buildLineageMap(lineageInfo);
                LOG.debug("Column lineage map => {}", this.lineageInfo.entrySet());
            } catch (Exception e) {
                LOG.warn("Column lineage map build failed", e);
            }
        }
        public Set<ReadEntity> getInputs() {
            return inputs;
        }
@@ -20,7 +20,6 @@ package org.apache.atlas.hive.model;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasConstants;
import org.apache.atlas.AtlasException;
@@ -107,6 +106,7 @@ public class HiveDataModelGenerator {
        // DDL/DML Process
        createProcessClass();

        createColumnLineageClass();
    }

    public TypesDef getTypesDef() {
@@ -328,4 +328,23 @@ public class HiveDataModelGenerator {
}
}
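
    /**
     * Registers the column lineage class type. It extends
     * AtlasClient.PROCESS_SUPER_TYPE (which supplies the inputs/outputs
     * attributes) and adds a required back-reference to the owning hive_process
     * plus the dependency type and optional expression reported by Hive.
     */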
    private void createColumnLineageClass() throws AtlasException {
        AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{
                new AttributeDefinition("query", HiveDataTypes.HIVE_PROCESS.getName(),
                        Multiplicity.REQUIRED, false, null),
                new AttributeDefinition("dependencyType", DataTypes.STRING_TYPE.getName(),
                        Multiplicity.REQUIRED, false, null),
                new AttributeDefinition("expression", DataTypes.STRING_TYPE.getName(),
                        Multiplicity.OPTIONAL, false, null)
        };

        HierarchicalTypeDefinition<ClassType> definition =
                new HierarchicalTypeDefinition<>(ClassType.class, HiveDataTypes.HIVE_COLUMN_LINEAGE.getName(), null,
                        ImmutableSet.of(AtlasClient.PROCESS_SUPER_TYPE), attributeDefinitions);
        classTypeDefinitions.put(HiveDataTypes.HIVE_COLUMN_LINEAGE.getName(), definition);
        LOG.debug("Created definition for {}", HiveDataTypes.HIVE_COLUMN_LINEAGE.getName());
    }
}
@@ -42,7 +42,8 @@ public enum HiveDataTypes {
    HIVE_INDEX,
    HIVE_ROLE,
    HIVE_TYPE,
    HIVE_PROCESS,
    HIVE_COLUMN_LINEAGE
    // HIVE_VIEW,
    ;
@@ -45,6 +45,7 @@ import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.plan.HiveOperation;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.security.UserGroupInformation;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
@@ -54,18 +55,7 @@ import org.testng.annotations.Test;
import java.io.File;
import java.text.ParseException;
import java.util.*;
import static org.apache.atlas.AtlasClient.NAME;
import static org.apache.atlas.hive.hook.HiveHook.IO_SEP;
@@ -320,6 +310,7 @@ public class HiveHookIT extends HiveITBase {
        assertProcessIsRegistered(constructEvent(query, HiveOperation.CREATETABLE_AS_SELECT, readEntities, writeEntities));
        assertTableIsRegistered(DEFAULT_DB, ctasTableName);
    }

    private HiveHook.HiveEventContext constructEvent(String query, HiveOperation op, Set<ReadEntity> inputs, Set<WriteEntity> outputs) {
@@ -1116,6 +1107,83 @@ public class HiveHookIT extends HiveITBase {
);
}
    /*
       This test is disabled by default.
       Reason: Atlas builds against Hive 1.2.x, and HIVE-13112, the patch that enables column-level
       lineage, has not been committed to the Hive 1.2.x branch. The test fails when Hive does not
       supply lineage information. Once HIVE-13112 is committed to branch 1.2.x, the test can be
       enabled. Track HIVE-14706 for the status of column lineage availability in later Hive
       versions (2.1.x).
    */
    @Test(enabled = false)
    public void testColumnLevelLineage() throws Exception {
        String sourceTable = "table" + random();
        runCommand("create table " + sourceTable + "(a int, b int)");
        String sourceTableGUID = assertTableIsRegistered(DEFAULT_DB, sourceTable);
        String a_guid = assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(
                HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, sourceTable), "a"));
        String b_guid = assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(
                HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, sourceTable), "b"));

        String ctasTableName = "table" + random();
        String query = "create table " + ctasTableName + " as " +
                "select sum(a+b) as a, count(*) as b from " + sourceTable;
        runCommand(query);

        String dest_a_guid = assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(
                HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, ctasTableName), "a"));
        String dest_b_guid = assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(
                HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, ctasTableName), "b"));

        final Set<ReadEntity> inputs = getInputs(sourceTable, Entity.Type.TABLE);
        final Set<WriteEntity> outputs = getOutputs(ctasTableName, Entity.Type.TABLE);
        HiveHook.HiveEventContext event = constructEvent(query, HiveOperation.CREATETABLE_AS_SELECT, inputs, outputs);
        assertProcessIsRegistered(event);
        assertTableIsRegistered(DEFAULT_DB, ctasTableName);

        String processQName = sortEventsAndGetProcessQualifiedName(event);
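
        // For "select sum(a+b) as a, count(*) as b": output column a derives from source
        // columns a and b, while count(*) has no base columns, so output column b's lineage
        // input is recorded at table granularity (the source table entity).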
        List<String> aLineageInputs = Arrays.asList(a_guid, b_guid);
        String aLineageProcessName = processQName + ":" + "a";
        LOG.debug("Searching for column lineage process {}", aLineageProcessName);
        String guid = assertEntityIsRegistered(HiveDataTypes.HIVE_COLUMN_LINEAGE.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, aLineageProcessName, null);
        List<Id> processInputs = (List<Id>) atlasClient.getEntity(guid).get("inputs");
        List<String> processInputsAsString = new ArrayList<>();
        for (Id input : processInputs) {
            processInputsAsString.add(input._getId());
        }
        Collections.sort(processInputsAsString);
        Collections.sort(aLineageInputs);
        Assert.assertEquals(processInputsAsString, aLineageInputs);

        List<String> bLineageInputs = Arrays.asList(sourceTableGUID);
        String bLineageProcessName = processQName + ":" + "b";
        LOG.debug("Searching for column lineage process {}", bLineageProcessName);
        String guid1 = assertEntityIsRegistered(HiveDataTypes.HIVE_COLUMN_LINEAGE.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, bLineageProcessName, null);
        List<Id> bProcessInputs = (List<Id>) atlasClient.getEntity(guid1).get("inputs");
        List<String> bProcessInputsAsString = new ArrayList<>();
        for (Id input : bProcessInputs) {
            bProcessInputsAsString.add(input._getId());
        }
        Collections.sort(bProcessInputsAsString);
        Collections.sort(bLineageInputs);
        Assert.assertEquals(bProcessInputsAsString, bLineageInputs);
        // Test lineage API response
        JSONObject response = atlasClient.getInputGraphForEntity(dest_a_guid);
        JSONObject vertices = response.getJSONObject("values").getJSONObject("vertices");
        JSONObject dest_a_val = (JSONObject) vertices.get(dest_a_guid);
        JSONObject src_a_val = (JSONObject) vertices.get(a_guid);
        JSONObject src_b_val = (JSONObject) vertices.get(b_guid);
        Assert.assertNotNull(dest_a_val);
        Assert.assertNotNull(src_a_val);
        Assert.assertNotNull(src_b_val);

        JSONObject b_response = atlasClient.getInputGraphForEntity(dest_b_guid);
        JSONObject b_vertices = b_response.getJSONObject("values").getJSONObject("vertices");
        JSONObject b_val = (JSONObject) b_vertices.get(dest_b_guid);
        JSONObject src_tbl_val = (JSONObject) b_vertices.get(sourceTableGUID);
        Assert.assertNotNull(b_val);
        Assert.assertNotNull(src_tbl_val);
    }
    @Test
    public void testTruncateTable() throws Exception {
        String tableName = createTable(false);
@@ -1620,19 +1688,22 @@ public class HiveHookIT extends HiveITBase {
}
}
    private String sortEventsAndGetProcessQualifiedName(final HiveHook.HiveEventContext event) throws HiveException {
        SortedSet<ReadEntity> sortedHiveInputs = event.getInputs() == null ? null : new TreeSet<ReadEntity>(entityComparator);
        SortedSet<WriteEntity> sortedHiveOutputs = event.getOutputs() == null ? null : new TreeSet<WriteEntity>(entityComparator);
        if (event.getInputs() != null) {
            sortedHiveInputs.addAll(event.getInputs());
        }
        if (event.getOutputs() != null) {
            sortedHiveOutputs.addAll(event.getOutputs());
        }
        return getProcessQualifiedName(hiveMetaStoreBridge, event, sortedHiveInputs, sortedHiveOutputs, getSortedProcessDataSets(event.getInputs()), getSortedProcessDataSets(event.getOutputs()));
    }

    private String assertProcessIsRegistered(final HiveHook.HiveEventContext event) throws Exception {
        try {
            String processQFName = sortEventsAndGetProcessQualifiedName(event);
            LOG.debug("Searching for process with query {}", processQFName);
            return assertEntityIsRegistered(HiveDataTypes.HIVE_PROCESS.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, processQFName, new AssertPredicate() {
                @Override
@@ -9,6 +9,7 @@ ATLAS-1060 Add composite indexes for exact match performance improvements for al
ATLAS-1127 Modify creation and modification timestamps to Date instead of Long(sumasai)
ALL CHANGES:
ATLAS-247 Hive Column level lineage (rhbutani,svimal2106 via shwethags)
ATLAS-1184 ReservedTypesRegistrar checks for existence of 1st class type (svimal2106 via shwethags)
ATLAS-1199 Atlas UI not loading after fresh build due to jquery-asBreadcrumbs plugin upgrade (kevalbhatt via shwethags)
ATLAS-1174 Framework to apply updates to types in the type-system (sarath.kum4r@gmail.com via shwethags)