Commit 045c0c07 by lina.li Committed by Sarath Subramanian

ATLAS-3080: Integration with Impala Hook API

parent 25c3bf9a
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>apache-atlas</artifactId>
<groupId>org.apache.atlas</groupId>
<version>3.0.0-SNAPSHOT</version>
<relativePath>../../</relativePath>
</parent>
<artifactId>impala-bridge-shim</artifactId>
<description>Apache Atlas Impala Bridge Shim Module</description>
<name>Apache Atlas Impala Bridge Shim</name>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.atlas</groupId>
<artifactId>atlas-plugin-classloader</artifactId>
</dependency>
<dependency>
<groupId>org.apache.atlas</groupId>
<artifactId>impala-hook-api</artifactId>
</dependency>
</dependencies>
</project>
\ No newline at end of file
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.impala.hook;
import org.apache.atlas.plugin.classloader.AtlasPluginClassLoader;
import org.apache.impala.hooks.PostQueryHookContext;
import org.apache.impala.hooks.QueryExecHook;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * {@link QueryExecHook} shim that forwards Impala hook callbacks to the Atlas hook
 * implementation ({@code org.apache.atlas.impala.hook.ImpalaHook}).
 * <p>
 * The implementation class is loaded through an {@link AtlasPluginClassLoader} in
 * {@link #impalaStartup()}, and every delegated call is wrapped in an
 * activate/deactivate pair on that class loader.
 */
public class ImpalaLineageHook implements QueryExecHook {
    private static final Logger LOG = LoggerFactory.getLogger(ImpalaLineageHook.class);

    private static final String ATLAS_PLUGIN_TYPE_IMPALA = "impala";
    private static final String ATLAS_IMPALA_LINEAGE_HOOK_IMPL_CLASSNAME =
            "org.apache.atlas.impala.hook.ImpalaHook";

    // Both stay null until impalaStartup() succeeds.
    private AtlasPluginClassLoader atlasPluginClassLoader = null;
    private QueryExecHook impalaLineageHookImpl;

    public ImpalaLineageHook() {
    }

    /**
     * Execute Impala post-hook by delegating to the loaded implementation.
     * Any exception raised while delegating is logged and not propagated.
     *
     * @param context the post-query context handed over by Impala
     */
    public void postQueryExecute(PostQueryHookContext context) {
        LOG.debug("==> ImpalaLineageHook.postQueryExecute()");

        try {
            activatePluginClassLoader();
            impalaLineageHookImpl.postQueryExecute(context);
        } catch (Exception ex) {
            // "{}" is an SLF4J placeholder, not a String.format specifier: let the logger
            // substitute it. A trailing Throwable argument is logged as the exception.
            // Guard the context so that logging itself cannot throw from the catch block.
            LOG.error("Error in processing impala lineage: {}",
                    context != null ? context.getLineageGraph() : null, ex);
        } finally {
            deactivatePluginClassLoader();
        }

        LOG.debug("<== ImpalaLineageHook.postQueryExecute()");
    }

    /**
     * Initialization of Impala post-execution hook: obtains the plugin class loader,
     * instantiates the implementation class through it and forwards the startup call.
     * Failures are logged and not propagated.
     */
    public void impalaStartup() {
        LOG.debug("==> ImpalaLineageHook.impalaStartup()");

        try {
            atlasPluginClassLoader = AtlasPluginClassLoader.getInstance(ATLAS_PLUGIN_TYPE_IMPALA, this.getClass());

            @SuppressWarnings("unchecked")
            Class<QueryExecHook> cls = (Class<QueryExecHook>) Class
                    .forName(ATLAS_IMPALA_LINEAGE_HOOK_IMPL_CLASSNAME, true, atlasPluginClassLoader);

            activatePluginClassLoader();

            impalaLineageHookImpl = cls.newInstance();
            impalaLineageHookImpl.impalaStartup();
        } catch (Exception excp) {
            LOG.error("Error instantiating Atlas hook implementation for Impala lineage", excp);
        } finally {
            deactivatePluginClassLoader();
        }

        LOG.debug("<== ImpalaLineageHook.impalaStartup()");
    }

    // Activates the plugin class loader if it has been obtained.
    private void activatePluginClassLoader() {
        if (atlasPluginClassLoader != null) {
            atlasPluginClassLoader.activate();
        }
    }

    // Deactivates the plugin class loader if it has been obtained.
    private void deactivatePluginClassLoader() {
        if (atlasPluginClassLoader != null) {
            atlasPluginClassLoader.deactivate();
        }
    }
}
\ No newline at end of file
......@@ -53,7 +53,12 @@
<artifactId>atlas-notification</artifactId>
</dependency>
<!-- to bring up atlas server for integration tests -->
<dependency>
<groupId>org.apache.atlas</groupId>
<artifactId>impala-hook-api</artifactId>
</dependency>
<!-- to bring up atlas server for integration tests -->
<dependency>
<groupId>org.apache.atlas</groupId>
<artifactId>atlas-client-v2</artifactId>
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.impala.hook;
import org.apache.impala.hooks.PostQueryHookContext;
import org.apache.impala.hooks.QueryExecHook;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * {@link QueryExecHook} implementation that hands the lineage graph of each finished
 * query to an {@link ImpalaLineageHook}.
 */
public class ImpalaHook implements QueryExecHook {
    private static final Logger LOG = LoggerFactory.getLogger(ImpalaHook.class);

    // Created in impalaStartup(); null until then.
    private ImpalaLineageHook lineageHook;

    /**
     * Execute Impala post-hook: processes the lineage graph carried by the context.
     * Any exception raised while processing is logged and not propagated.
     *
     * @param context the post-query context handed over by Impala
     */
    public void postQueryExecute(PostQueryHookContext context) {
        try {
            lineageHook.process(context.getLineageGraph());
        } catch (Exception ex) {
            // "{}" is an SLF4J placeholder, not a String.format specifier: let the logger
            // substitute it. A trailing Throwable argument is logged as the exception.
            // Guard the context so that logging itself cannot throw from the catch block.
            LOG.error("Error in processing impala lineage: {}",
                    context != null ? context.getLineageGraph() : null, ex);
        }
    }

    /**
     * Initialization of Impala post-execution hook: creates the lineage hook used by
     * {@link #postQueryExecute(PostQueryHookContext)}.
     */
    public void impalaStartup() {
        lineageHook = new ImpalaLineageHook();
    }
}
......@@ -25,7 +25,6 @@ import java.io.IOException;
import org.apache.atlas.hook.AtlasHook;
import org.apache.atlas.impala.hook.events.BaseImpalaEvent;
import org.apache.atlas.impala.hook.events.CreateImpalaProcess;
import org.apache.atlas.impala.model.IImpalaLineageHook;
import org.apache.atlas.impala.model.ImpalaOperationType;
import org.apache.atlas.impala.model.ImpalaQuery;
import org.apache.atlas.type.AtlasType;
......@@ -37,7 +36,7 @@ import javax.security.auth.Subject;
import javax.security.auth.kerberos.KerberosPrincipal;
import java.util.HashSet;
public class ImpalaLineageHook extends AtlasHook implements IImpalaLineageHook {
public class ImpalaLineageHook extends AtlasHook {
private static final Logger LOG = LoggerFactory.getLogger(ImpalaLineageHook.class);
public static final String ATLAS_ENDPOINT = "atlas.rest.address";
public static final String REALM_SEPARATOR = "@";
......@@ -61,11 +60,21 @@ public class ImpalaLineageHook extends AtlasHook implements IImpalaLineageHook {
}
/**
 * Processes one Impala lineage record given as a JSON string.
 * An empty input is skipped with a warning; otherwise the string is deserialized
 * into an {@link ImpalaQuery} and handed to {@link #process(ImpalaQuery)}.
 *
 * @param impalaQueryString serialized lineage record
 * @throws Exception if processing the deserialized record fails
 */
public void process(String impalaQueryString) throws Exception {
    if (!StringUtils.isEmpty(impalaQueryString)) {
        ImpalaQuery parsedQuery = AtlasType.fromJson(impalaQueryString, ImpalaQuery.class);

        process(parsedQuery);
    } else {
        LOG.warn("==> ImpalaLineageHook.process skips because the impalaQueryString is empty <==");
    }
}
public void process(ImpalaQuery lineageQuery) throws Exception {
if (lineageQuery == null) {
LOG.warn("==> ImpalaLineageHook.process skips because the query object is null <==");
return;
}
if (StringUtils.isEmpty(lineageQuery.getQueryText())) {
LOG.warn("==> ImpalaLineageHook.process skips because the query text is empty <==");
return;
......
......@@ -36,6 +36,7 @@ import org.apache.atlas.impala.model.ImpalaNode;
import org.apache.atlas.impala.model.ImpalaOperationType;
import org.apache.atlas.impala.model.ImpalaVertexType;
import org.apache.atlas.impala.model.LineageVertex;
import org.apache.atlas.impala.model.LineageVertexMetadata;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityExtInfo;
......@@ -82,13 +83,7 @@ public abstract class BaseImpalaEvent {
public static final String ATTRIBUTE_DEPENDENCY_TYPE = "dependencyType";
public static final long MILLIS_CONVERT_FACTOR = 1000;
public static final Map<Integer, String> OWNER_TYPE_TO_ENUM_VALUE = new HashMap<>();
static {
OWNER_TYPE_TO_ENUM_VALUE.put(1, "USER");
OWNER_TYPE_TO_ENUM_VALUE.put(2, "ROLE");
OWNER_TYPE_TO_ENUM_VALUE.put(3, "GROUP");
}
protected final AtlasImpalaHookContext context;
protected final Map<String, ImpalaNode> vertexNameMap;
......@@ -109,6 +104,18 @@ public abstract class BaseImpalaEvent {
public String getUserName() { return context.getUserName(); }
/**
 * Returns the table name for a vertex. For a column vertex that carries metadata the
 * table name stored in that metadata is used; in every other case the name is derived
 * from the vertex id via {@link #getTableNameFromColumn(String)}.
 *
 * @param vertex the lineage vertex to inspect
 * @return the table name resolved for the vertex
 */
public String getTableNameFromVertex(LineageVertex vertex) {
    // Metadata is only consulted for column vertices.
    LineageVertexMetadata tableMetadata =
            (vertex.getVertexType() == ImpalaVertexType.COLUMN) ? vertex.getMetadata() : null;

    if (tableMetadata == null) {
        return getTableNameFromColumn(vertex.getVertexId());
    }

    return tableMetadata.getTableName();
}
/**
 * Resolves the table name for the given column name by delegating to the hook context.
 *
 * @param columnName the column name to resolve
 * @return whatever the context returns for that column name
 */
public String getTableNameFromColumn(String columnName) {
return context.getTableNameFromColumn(columnName);
}
......@@ -453,6 +460,11 @@ public abstract class BaseImpalaEvent {
return ret;
}
/**
 * Returns the create time of a table node, computed from the node's own vertex.
 *
 * @param table the table node
 * @return the create time of the table, in milliseconds
 */
public static long getTableCreateTime(ImpalaNode table) {
    LineageVertex ownVertex = table.getOwnVertex();

    return getTableCreateTime(ownVertex);
}
......@@ -460,6 +472,7 @@ public abstract class BaseImpalaEvent {
public static long getTableCreateTime(LineageVertex tableVertex) {
Long createTime = tableVertex.getCreateTime();
if (createTime != null) {
// the time unit of vertex is in seconds. Convert to milliseconds before sending to Atlas.
return createTime.longValue() * MILLIS_CONVERT_FACTOR;
} else {
return System.currentTimeMillis();
......@@ -511,8 +524,11 @@ public abstract class BaseImpalaEvent {
ret.setAttribute(ATTRIBUTE_OUTPUTS, getObjectIds(outputs));
ret.setAttribute(ATTRIBUTE_NAME, queryStr);
ret.setAttribute(ATTRIBUTE_OPERATION_TYPE, context.getImpalaOperationType());
ret.setAttribute(ATTRIBUTE_START_TIME, context.getLineageQuery().getTimestamp());
ret.setAttribute(ATTRIBUTE_END_TIME, System.currentTimeMillis());
// the unit of timestamp from lineage record is in seconds. Convert to milliseconds to Atlas
ret.setAttribute(ATTRIBUTE_START_TIME, context.getLineageQuery().getTimestamp() * BaseImpalaEvent.MILLIS_CONVERT_FACTOR);
ret.setAttribute(ATTRIBUTE_END_TIME, context.getLineageQuery().getEndTime() * BaseImpalaEvent.MILLIS_CONVERT_FACTOR);
ret.setAttribute(ATTRIBUTE_USER_NAME, getUserName());
ret.setAttribute(ATTRIBUTE_QUERY_TEXT, queryStr);
ret.setAttribute(ATTRIBUTE_QUERY_ID, context.getLineageQuery().getQueryId());
......@@ -529,4 +545,37 @@ public abstract class BaseImpalaEvent {
entitiesWithExtInfo.compact();
}
/**
 * Returns the create time, in seconds, to use for a vertex.
 * <p>
 * Lookup order: the vertex's own createTime; for a column vertex, the createTime held
 * in its table metadata; otherwise the current time. The current time is also used
 * when the vertex is null or when the metadata carries no createTime, so the result
 * is never null.
 *
 * @param vertex the vertex to inspect, may be null
 * @return create time in seconds (the unit used by the lineage vertex)
 */
protected Long getCreateTimeInVertex(LineageVertex vertex) {
    if (vertex != null) {
        Long createTime = vertex.getCreateTime();

        if (createTime != null) {
            return createTime;
        }

        if (vertex.getVertexType() == ImpalaVertexType.COLUMN) {
            LineageVertexMetadata metadata = vertex.getMetadata();
            Long tableCreateTime = (metadata != null) ? metadata.getCreateTime() : null;

            // Metadata without a createTime must not leak a null to the caller.
            if (tableCreateTime != null) {
                return tableCreateTime;
            }
        }
    }

    // No create time available anywhere: fall back to "now", converted to seconds.
    return System.currentTimeMillis() / MILLIS_CONVERT_FACTOR;
}
/**
 * Builds a table node around a freshly created table vertex.
 * The vertex gets its type, vertex id (the table name) and create time set; its numeric
 * id is left unset because the vertex is not referred to by any lineage edge.
 *
 * @param tableName  name used as the vertex id
 * @param createTime create time stored on the vertex
 * @return a new node wrapping the created vertex
 */
protected ImpalaNode createTableNode(String tableName, Long createTime) {
    LineageVertex syntheticVertex = new LineageVertex();

    syntheticVertex.setVertexId(tableName);
    syntheticVertex.setCreateTime(createTime);
    syntheticVertex.setVertexType(ImpalaVertexType.TABLE);

    return new ImpalaNode(syntheticVertex);
}
}
......@@ -33,6 +33,7 @@ import org.apache.atlas.impala.model.ImpalaVertexType;
import org.apache.atlas.impala.model.LineageEdge;
import org.apache.atlas.impala.model.ImpalaQuery;
import org.apache.atlas.impala.model.LineageVertex;
import org.apache.atlas.impala.model.LineageVertexMetadata;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
import org.apache.atlas.model.notification.HookNotification;
......@@ -192,7 +193,6 @@ public class CreateImpalaProcess extends BaseImpalaEvent {
AtlasEntity columnLineageProcess = new AtlasEntity(ImpalaDataType.IMPALA_COLUMN_LINEAGE.getName());
// TODO: when there are multiple target IDs, should we use first column name or all of their name?
String columnQualifiedName = (String)impalaProcess.getAttribute(ATTRIBUTE_QUALIFIED_NAME) +
AtlasImpalaHookContext.QNAME_SEP_PROCESS + outputColumns.get(0).getAttribute(ATTRIBUTE_NAME);
columnLineageProcess.setAttribute(ATTRIBUTE_NAME, columnQualifiedName);
......@@ -233,8 +233,7 @@ public class CreateImpalaProcess extends BaseImpalaEvent {
// get vertex map with key being its id and
// ImpalaNode map with its own vertex's vertexId as its key
for (LineageVertex vertex : lineageQuery.getVertices()) {
verticesMap.put(vertex.getId(), vertex);
vertexNameMap.put(vertex.getVertexId(), new ImpalaNode(vertex));
updateVertexMap(vertex);
}
// get set of source ID and set of target Id
......@@ -254,6 +253,29 @@ public class CreateImpalaProcess extends BaseImpalaEvent {
outputNodes.addAll(outputMap.values());
}
/**
 * Registers a vertex in the internal maps: by id in {@code verticesMap} and by vertex id
 * in {@code vertexNameMap}. For a column vertex that carries metadata, a node for its
 * table is also registered in {@code vertexNameMap} unless one is already present.
 */
private void updateVertexMap(LineageVertex vertex) {
    verticesMap.put(vertex.getId(), vertex);
    vertexNameMap.put(vertex.getVertexId(), new ImpalaNode(vertex));

    // Only column vertices can contribute a table node.
    if (vertex.getVertexType() != ImpalaVertexType.COLUMN) {
        return;
    }

    LineageVertexMetadata tableMetadata = vertex.getMetadata();

    if (tableMetadata == null) {
        return;
    }

    String owningTable = tableMetadata.getTableName();

    if (vertexNameMap.get(owningTable) == null) {
        vertexNameMap.put(owningTable, createTableNode(owningTable, tableMetadata.getCreateTime()));
    }
}
/**
* From the list of Ids and Id to Vertices map, generate the Table name to ImpalaNode map.
* @param idSet the list of Ids. They are from lineage edges
......@@ -274,10 +296,11 @@ public class CreateImpalaProcess extends BaseImpalaEvent {
if (ImpalaVertexType.COLUMN.equals(vertex.getVertexType())) {
// add column to its table node
String tableName = getTableNameFromColumn(vertex.getVertexId());
String tableName = getTableNameFromVertex(vertex);
if (tableName == null) {
LOG.warn("cannot find tableName for vertex with id: {}, column name : {}",
id, vertex.getVertexId() == null? "null" : vertex.getVertexId());
continue;
}
......@@ -289,7 +312,9 @@ public class CreateImpalaProcess extends BaseImpalaEvent {
if (tableNode == null) {
LOG.warn("cannot find table node for vertex with id: {}, column name : {}",
id, vertex.getVertexId());
continue;
tableNode = createTableNode(tableName, getCreateTimeInVertex(null));
vertexNameMap.put(tableName, tableNode);
}
returnTableMap.put(tableName, tableNode);
......
......@@ -43,7 +43,10 @@ public class LineageVertex {
// specify the name of the entity
private String vertexId;
// It is optional, and could be null. It is only set if the verType is "TABLE"
// It is optional, and could be null. It is only set if the entity is a column, and this field contains metadata of its table.
private LineageVertexMetadata metadata;
// It is optional. Its unit is seconds.
private Long createTime;
public Long getId() { return id; }
......@@ -56,6 +59,10 @@ public class LineageVertex {
return vertexId;
}
/**
 * @return the optional metadata of this vertex; may be null
 */
public LineageVertexMetadata getMetadata() {
return metadata;
}
// Returns the stored createTime field as-is.
public Long getCreateTime() { return createTime; }
public void setId(Long id) {
......@@ -70,5 +77,7 @@ public class LineageVertex {
this.vertexId = vertexId;
}
// Stores the given metadata on this vertex, replacing any previous value.
public void setMetadata(LineageVertexMetadata metadata) { this.metadata = metadata; }
// Stores the given create time on this vertex, replacing any previous value.
public void setCreateTime(Long createTime) { this.createTime = createTime; }
}
\ No newline at end of file
......@@ -18,11 +18,31 @@
package org.apache.atlas.impala.model;
import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE;
import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
/**
* Define the interface to process Impala lineage record
* This represents optional metadata in Impala's lineage vertex entity.
*/
public interface IImpalaLineageHook {
@JsonAutoDetect(getterVisibility=PUBLIC_ONLY, setterVisibility=PUBLIC_ONLY, fieldVisibility=NONE)
@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
@JsonIgnoreProperties(ignoreUnknown=true)
public class LineageVertexMetadata {
// specify the name of the table
private String tableName;
// the create time of the table. Its unit is in seconds.
private Long createTime;
public String getTableName() { return tableName; }
public Long getCreateTime() { return createTime; }
public void setTableName(String tableName) { this.tableName = tableName; }
// The input is a serialized string of an Impala lineage record
void process(String impalaQueryString) throws Exception;
}
\ No newline at end of file
public void setCreateTime(Long createTime) { this.createTime = createTime; }
}
......@@ -25,8 +25,10 @@ import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.fail;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClientV2;
import org.apache.atlas.impala.hook.AtlasImpalaHookContext;
......@@ -116,9 +118,58 @@ public class ImpalaLineageITBase {
return (String) entity.getGuid();
}
/**
 * Looks up entities of the given type by a list of candidate qualified names and runs
 * the supplied check on the result, retrying through {@code waitForWithReturn} with an
 * 80000 msec timeout.
 *
 * @param typeName         entity type to query
 * @param processQFNames   candidate qualified names, one attribute map is built per name
 * @param assertPredicates check applied to the returned entities, may be null
 * @return the value returned by {@code assertPredicates}, or null when none was given
 * @throws Exception if the wait is interrupted or the lookup keeps failing
 */
protected String assertEntityIsRegistered(final String typeName, List<String> processQFNames,
        final AssertPredicates assertPredicates) throws Exception {
    final List<Map<String, String>> attributesList = new ArrayList<>();

    for (String qualifiedName : processQFNames) {
        attributesList.add(Collections.singletonMap(ATTRIBUTE_QUALIFIED_NAME, qualifiedName));
    }

    PredicateWithReturn lookup = new PredicateWithReturn() {
        @Override
        public String evaluate() throws Exception {
            List<AtlasEntity> found =
                    atlasClientV2.getEntitiesByAttribute(typeName, attributesList).getEntities();

            assertNotNull(found);

            return (assertPredicates == null) ? null : assertPredicates.assertOnEntities(found);
        }
    };

    return waitForWithReturn(80000, lookup);
}
/**
 * Asserts that an Impala process entity exists under one of the candidate qualified
 * names and that the first entry of its recent-queries attribute equals the expected
 * query, ignoring case.
 *
 * @param processQFNames candidate qualified names of the process
 * @param queryString    expected query text
 * @return the guid of the first matching entity
 * @throws Exception any failure from the lookup; it is logged and rethrown
 */
protected String assertProcessIsRegistered(List<String> processQFNames, String queryString) throws Exception {
    AssertPredicates matchesRecentQuery = new AssertPredicates() {
        @Override
        public String assertOnEntities(final List<AtlasEntity> entities) throws Exception {
            for (AtlasEntity candidate : entities) {
                List<String> recentQueries = (List<String>) candidate.getAttribute(ATTRIBUTE_RECENT_QUERIES);
                String firstRecentQuery = recentQueries.get(0);

                if (queryString.equalsIgnoreCase(firstRecentQuery)) {
                    return candidate.getGuid();
                }
            }

            throw new IllegalStateException("Not found entity with matching query");
        }
    };

    try {
        LOG.debug("Searching for process with query {}", queryString);

        return assertEntityIsRegistered(ImpalaDataType.IMPALA_PROCESS.getName(), processQFNames, matchesRecentQuery);
    } catch (Exception e) {
        LOG.error("Exception : ", e);
        throw e;
    }
}
protected String assertProcessIsRegistered(String processQFName, String queryString) throws Exception {
try {
LOG.debug("Searching for process with query {}", processQFName);
LOG.debug("Searching for process with qualified name {} and query {}", processQFName, queryString);
return assertEntityIsRegistered(ImpalaDataType.IMPALA_PROCESS.getName(), ATTRIBUTE_QUALIFIED_NAME, processQFName, new AssertPredicate() {
@Override
......@@ -180,6 +231,20 @@ public class ImpalaLineageITBase {
void assertOnEntity(AtlasEntity entity) throws Exception;
}
/**
 * Check applied to a list of entities, producing a String result.
 */
public interface AssertPredicates {
/**
 * @param entities the entities to check
 * @return a String result chosen by the implementation
 * @throws Exception if the check fails
 */
String assertOnEntities(List<AtlasEntity> entities) throws Exception;
}
/**
 * A predicate whose evaluation yields a String result instead of a boolean.
 */
public interface PredicateWithReturn {
/**
 * Perform a predicate evaluation.
 *
 * @return the String result of the evaluation.
 * @throws Exception thrown if the predicate evaluation could not evaluate.
 */
String evaluate() throws Exception;
}
public interface Predicate {
/**
* Perform a predicate evaluation.
......@@ -214,6 +279,29 @@ public class ImpalaLineageITBase {
}
}
/**
 * Repeatedly evaluates a {@link PredicateWithReturn} until it returns without throwing,
 * and hands back its result. After a failed attempt the method sleeps 5000 msec before
 * retrying; once the timeout has elapsed the next failure fails the test.
 *
 * @param timeout maximum time in milliseconds to keep retrying.
 * @param predicate predicate to evaluate; must not be null.
 * @return the value returned by the first evaluation that does not throw.
 * @throws Exception if the sleep between attempts is interrupted.
 */
protected String waitForWithReturn(int timeout, PredicateWithReturn predicate) throws Exception {
    ParamChecker.notNull(predicate, "predicate");

    final long deadline = System.currentTimeMillis() + timeout;

    for (;;) {
        try {
            return predicate.evaluate();
        } catch (Error | Exception failure) {
            // Errors are caught too, so that failed assertions are retried as well.
            boolean timedOut = System.currentTimeMillis() >= deadline;

            if (timedOut) {
                fail("Assertions failed. Failing after waiting for timeout " + timeout + " msecs", failure);
            }

            LOG.debug("Waiting up to {} msec as assertion failed", deadline - System.currentTimeMillis(), failure);
            Thread.sleep(5000);
        }
    }
}
public static String lower(String str) {
if (StringUtils.isEmpty(str)) {
return null;
......
......@@ -21,6 +21,7 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.atlas.impala.hook.AtlasImpalaHookContext;
import org.apache.atlas.impala.hook.ImpalaLineageHook;
import org.apache.atlas.impala.hook.events.BaseImpalaEvent;
import org.apache.atlas.impala.model.ImpalaQuery;
import org.testng.annotations.Test;
......@@ -28,8 +29,6 @@ public class ImpalaLineageToolIT extends ImpalaLineageITBase {
public static final long TABLE_CREATE_TIME_SOURCE = 1554750070;
public static final long TABLE_CREATE_TIME = 1554750072;
private static String dir = System.getProperty("user.dir") + "/src/test/resources/";
private static String IMPALA = dir + "impala3.json";
private static String IMPALA_WAL = dir + "WALimpala.wal";
/**
* This tests
......@@ -39,6 +38,11 @@ public class ImpalaLineageToolIT extends ImpalaLineageITBase {
*/
@Test
public void testCreateViewFromFile() {
// this file contains a single lineage record for "create view".
// It has table vertex with createTime
String IMPALA = dir + "impalaCreateView.json";
String IMPALA_WAL = dir + "WALimpala.wal";
List<ImpalaQuery> lineageList = new ArrayList<>();
ImpalaLineageHook impalaLineageHook = new ImpalaLineageHook();
......@@ -79,6 +83,68 @@ public class ImpalaLineageToolIT extends ImpalaLineageITBase {
/**
 * This tests
 * 1) ImpalaLineageTool can parse one lineage file that contains "create view" command lineage,
 *    but there is no table vertex with createTime.
 * 2) Lineage is sent to Atlas
 * 3) Atlas can get this lineage from Atlas
 *
 * @throws Exception if setup, import or verification fails; failures are propagated so
 *                   that the test is reported as failed instead of being swallowed
 */
@Test
public void testCreateViewNoCreateTimeFromFile() throws Exception {
    // this file contains a single lineage record for "create view".
    // there is no table vertex with createTime, which is lineage record generated by Impala
    // originally. The table create time is hard-coded before Impala fixes this issue.
    String IMPALA = dir + "impalaCreateViewNoCreateTime.json";
    String IMPALA_WAL = dir + "WALimpala.wal";

    ImpalaLineageHook impalaLineageHook = new ImpalaLineageHook();

    // create database and tables to simulate Impala behavior that Impala updates metadata
    // to HMS and HMSHook sends the metadata to Atlas, which has to happen before
    // Atlas can handle lineage notification
    String dbName = "db_2";
    createDatabase(dbName);

    String sourceTableName = "table_1";
    createTable(dbName, sourceTableName,"(id string, count int)", false);

    String targetTableName = "view_1";
    createTable(dbName, targetTableName,"(count int, id string)", false);

    // process lineage record, and send corresponding notification to Atlas
    String[] args = new String[]{"-d", "./", "-p", "impala"};
    ImpalaLineageTool toolInstance = new ImpalaLineageTool(args);

    // Primitive longs: boxed Long values must not be compared with == / !=.
    long beforeCreateTime = System.currentTimeMillis() / BaseImpalaEvent.MILLIS_CONVERT_FACTOR;
    toolInstance.importHImpalaEntities(impalaLineageHook, IMPALA, IMPALA_WAL);
    long afterCreateTime = System.currentTimeMillis() / BaseImpalaEvent.MILLIS_CONVERT_FACTOR;

    String processQFNameWithoutTime =
        dbName + "." + targetTableName + AtlasImpalaHookContext.QNAME_SEP_CLUSTER_NAME +
            CLUSTER_NAME + AtlasImpalaHookContext.QNAME_SEP_PROCESS;
    processQFNameWithoutTime = processQFNameWithoutTime.toLowerCase();

    // There is no createTime in the lineage record, so the exact process qualified name is
    // unknown: the hook falls back to the current time in seconds. Every second between
    // "before" and "after" the import is therefore a valid candidate, each exactly once.
    List<String> processQFNames = new ArrayList<>();
    for (long createTimeSec = beforeCreateTime; createTimeSec <= afterCreateTime; createTimeSec++) {
        processQFNames.add(processQFNameWithoutTime + (createTimeSec * BaseImpalaEvent.MILLIS_CONVERT_FACTOR));
    }

    // verify the process is saved in Atlas for the given query under one of the candidates.
    assertProcessIsRegistered(processQFNames,"create view " + dbName + "." + targetTableName + " as select count, id from " + dbName + "." + sourceTableName);
}
/**
* This tests
* 1) ImpalaLineageTool can parse one lineage file that contains "create table as select" command lineage,
* there is table vertex with createTime.
* 2) Lineage is sent to Atlas
......@@ -86,7 +152,7 @@ public class ImpalaLineageToolIT extends ImpalaLineageITBase {
*/
@Test
public void testCreateTableAsSelectFromFile() throws Exception {
String IMPALA = dir + "impala5.json";
String IMPALA = dir + "impalaCreateTableAsSelect.json";
String IMPALA_WAL = dir + "WALimpala.wal";
ImpalaLineageHook impalaLineageHook = new ImpalaLineageHook();
......@@ -130,7 +196,7 @@ public class ImpalaLineageToolIT extends ImpalaLineageITBase {
*/
@Test
public void testAlterViewAsSelectFromFile() throws Exception {
String IMPALA = dir + "impala6.json";
String IMPALA = dir + "impalaAlterViewAsSelect.json";
String IMPALA_WAL = dir + "WALimpala.wal";
ImpalaLineageHook impalaLineageHook = new ImpalaLineageHook();
......@@ -174,7 +240,7 @@ public class ImpalaLineageToolIT extends ImpalaLineageITBase {
*/
@Test
public void testInsertIntoAsSelectFromFile() throws Exception {
String IMPALA = dir + "impala7.json";
String IMPALA = dir + "impalaInsertIntoAsSelect.json";
String IMPALA_WAL = dir + "WALimpala.wal";
ImpalaLineageHook impalaLineageHook = new ImpalaLineageHook();
......
{
"queryText": "INSERT INTO TABLE db_wlwspnwgfp.tbl_wlwspnwgfp_7 VALUES (1, 'foo', 'foo', 'foo', 'foo', 1), (2, 'foo', 'foo', 'foo', 'foo', 2)",
"queryId": "4c4cb5dc22194e20:c440f5fe00000000",
"hash": "d936d531989bd0f46d636bd05fc2540c",
"user": "impala@GCE.ABCDEFGH.COM",
"timestamp": 1553528501,
"endTime": 1553528505,
"edges": [
{
"sources": [],
"targets": [
0
],
"edgeType": "PROJECTION"
},
{
"sources": [],
"targets": [
1
],
"edgeType": "PROJECTION"
},
{
"sources": [],
"targets": [
2
],
"edgeType": "PROJECTION"
},
{
"sources": [],
"targets": [
3
],
"edgeType": "PROJECTION"
},
{
"sources": [],
"targets": [
4
],
"edgeType": "PROJECTION"
},
{
"sources": [],
"targets": [
5
],
"edgeType": "PROJECTION"
}
],
"vertices": [
{
"id": 0,
"vertexType": "COLUMN",
"vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_7.col_tbl_wlwspnwgfp_7_id"
},
{
"id": 1,
"vertexType": "COLUMN",
"vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_7.col_tbl_wlwspnwgfp_7_name"
},
{
"id": 2,
"vertexType": "COLUMN",
"vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_7.col_tbl_wlwspnwgfp_7_street"
},
{
"id": 3,
"vertexType": "COLUMN",
"vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_7.col_tbl_wlwspnwgfp_7_city"
},
{
"id": 4,
"vertexType": "COLUMN",
"vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_7.col_tbl_wlwspnwgfp_7_state"
},
{
"id": 5,
"vertexType": "COLUMN",
"vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_7.col_tbl_wlwspnwgfp_7_zipcode"
}
]
}
\ No newline at end of file
{
"queryText": "with a AS (SELECT * FROM db_wlwspnwgfp.tbl_wlwspnwgfp_1), b AS (SELECT * FROM db_wlwspnwgfp.tbl_wlwspnwgfp_2), c AS (SELECT * FROM db_wlwspnwgfp.tbl_wlwspnwgfp_3), d AS (SELECT * FROM db_wlwspnwgfp.tbl_wlwspnwgfp_4), e AS (SELECT * FROM db_wlwspnwgfp.tbl_wlwspnwgfp_5), f AS (SELECT * FROM db_wlwspnwgfp.tbl_wlwspnwgfp_6) INSERT INTO table db_wlwspnwgfp.tbl_wlwspnwgfp_7 SELECT * FROM a UNION SELECT * FROM b UNION SELECT * FROM c UNION SELECT * FROM d UNION SELECT * FROM e UNION SELECT * FROM f",
"queryId": "b9423a1de88a33c3:997879c000000000",
"hash": "a6ff7959c66c23499346eef791c66439",
"user": "impala@GCE.ABCDEFGH.COM",
"timestamp": 1553528521,
"endTime": 1553528525,
"edges": [
{
"sources": [
1,
2,
3,
4,
5,
6
],
"targets": [
0
],
"edgeType": "PROJECTION"
},
{
"sources": [
8,
9,
10,
11,
12,
13
],
"targets": [
7
],
"edgeType": "PROJECTION"
},
{
"sources": [
15,
16,
17,
18,
19,
20
],
"targets": [
14
],
"edgeType": "PROJECTION"
},
{
"sources": [
22,
23,
24,
25,
26,
27
],
"targets": [
21
],
"edgeType": "PROJECTION"
},
{
"sources": [
29,
30,
31,
32,
33,
34
],
"targets": [
28
],
"edgeType": "PROJECTION"
},
{
"sources": [
36,
37,
38,
39,
40,
41
],
"targets": [
35
],
"edgeType": "PROJECTION"
}
],
"vertices": [
{
"id": 0,
"vertexType": "COLUMN",
"vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_7.col_tbl_wlwspnwgfp_7_id"
},
{
"id": 1,
"vertexType": "COLUMN",
"vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_1.col_tbl_wlwspnwgfp_1_id"
},
{
"id": 2,
"vertexType": "COLUMN",
"vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_2.col_tbl_wlwspnwgfp_2_id"
},
{
"id": 3,
"vertexType": "COLUMN",
"vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_3.col_tbl_wlwspnwgfp_3_id"
},
{
"id": 4,
"vertexType": "COLUMN",
"vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_4.col_tbl_wlwspnwgfp_4_id"
},
{
"id": 5,
"vertexType": "COLUMN",
"vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_5.col_tbl_wlwspnwgfp_5_id"
},
{
"id": 6,
"vertexType": "COLUMN",
"vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_6.col_tbl_wlwspnwgfp_6_id"
},
{
"id": 7,
"vertexType": "COLUMN",
"vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_7.col_tbl_wlwspnwgfp_7_name"
},
{
"id": 8,
"vertexType": "COLUMN",
"vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_1.col_tbl_wlwspnwgfp_1_name"
},
{
"id": 9,
"vertexType": "COLUMN",
"vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_2.col_tbl_wlwspnwgfp_2_name"
},
{
"id": 10,
"vertexType": "COLUMN",
"vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_3.col_tbl_wlwspnwgfp_3_name"
},
{
"id": 11,
"vertexType": "COLUMN",
"vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_4.col_tbl_wlwspnwgfp_4_name"
},
{
"id": 12,
"vertexType": "COLUMN",
"vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_5.col_tbl_wlwspnwgfp_5_name"
},
{
"id": 13,
"vertexType": "COLUMN",
"vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_6.col_tbl_wlwspnwgfp_6_name"
},
{
"id": 14,
"vertexType": "COLUMN",
"vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_7.col_tbl_wlwspnwgfp_7_street"
},
{
"id": 15,
"vertexType": "COLUMN",
"vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_1.col_tbl_wlwspnwgfp_1_street"
},
{
"id": 16,
"vertexType": "COLUMN",
"vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_2.col_tbl_wlwspnwgfp_2_street"
},
{
"id": 17,
"vertexType": "COLUMN",
"vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_3.col_tbl_wlwspnwgfp_3_street"
},
{
"id": 18,
"vertexType": "COLUMN",
"vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_4.col_tbl_wlwspnwgfp_4_street"
},
{
"id": 19,
"vertexType": "COLUMN",
"vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_5.col_tbl_wlwspnwgfp_5_street"
},
{
"id": 20,
"vertexType": "COLUMN",
"vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_6.col_tbl_wlwspnwgfp_6_street"
},
{
"id": 21,
"vertexType": "COLUMN",
"vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_7.col_tbl_wlwspnwgfp_7_city"
},
{
"id": 22,
"vertexType": "COLUMN",
"vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_1.col_tbl_wlwspnwgfp_1_city"
},
{
"id": 23,
"vertexType": "COLUMN",
"vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_2.col_tbl_wlwspnwgfp_2_city"
},
{
"id": 24,
"vertexType": "COLUMN",
"vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_3.col_tbl_wlwspnwgfp_3_city"
},
{
"id": 25,
"vertexType": "COLUMN",
"vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_4.col_tbl_wlwspnwgfp_4_city"
},
{
"id": 26,
"vertexType": "COLUMN",
"vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_5.col_tbl_wlwspnwgfp_5_city"
},
{
"id": 27,
"vertexType": "COLUMN",
"vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_6.col_tbl_wlwspnwgfp_6_city"
},
{
"id": 28,
"vertexType": "COLUMN",
"vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_7.col_tbl_wlwspnwgfp_7_state"
},
{
"id": 29,
"vertexType": "COLUMN",
"vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_1.col_tbl_wlwspnwgfp_1_state"
},
{
"id": 30,
"vertexType": "COLUMN",
"vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_2.col_tbl_wlwspnwgfp_2_state"
},
{
"id": 31,
"vertexType": "COLUMN",
"vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_3.col_tbl_wlwspnwgfp_3_state"
},
{
"id": 32,
"vertexType": "COLUMN",
"vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_4.col_tbl_wlwspnwgfp_4_state"
},
{
"id": 33,
"vertexType": "COLUMN",
"vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_5.col_tbl_wlwspnwgfp_5_state"
},
{
"id": 34,
"vertexType": "COLUMN",
"vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_6.col_tbl_wlwspnwgfp_6_state"
},
{
"id": 35,
"vertexType": "COLUMN",
"vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_7.col_tbl_wlwspnwgfp_7_zipcode"
},
{
"id": 36,
"vertexType": "COLUMN",
"vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_1.col_tbl_wlwspnwgfp_1_zipcode"
},
{
"id": 37,
"vertexType": "COLUMN",
"vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_2.col_tbl_wlwspnwgfp_2_zipcode"
},
{
"id": 38,
"vertexType": "COLUMN",
"vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_3.col_tbl_wlwspnwgfp_3_zipcode"
},
{
"id": 39,
"vertexType": "COLUMN",
"vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_4.col_tbl_wlwspnwgfp_4_zipcode"
},
{
"id": 40,
"vertexType": "COLUMN",
"vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_5.col_tbl_wlwspnwgfp_5_zipcode"
},
{
"id": 41,
"vertexType": "COLUMN",
"vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_6.col_tbl_wlwspnwgfp_6_zipcode"
}
]
}
\ No newline at end of file
......@@ -29,34 +29,38 @@
{
"id":0,
"vertexType":"COLUMN",
"vertexId":"db_4.view_1.count"
"vertexId":"db_4.view_1.count",
"metadata": {
"tableName": "db_4.view_1",
"createTime": 1554750072
}
},
{
"id":1,
"vertexType":"COLUMN",
"vertexId":"db_4.table_1.count"
"vertexId":"db_4.table_1.count",
"metadata": {
"tableName": "db_4.table_1",
"createTime": 1554750070
}
},
{
"id":2,
"vertexType":"COLUMN",
"vertexId":"db_4.view_1.id"
"vertexId":"db_4.view_1.id",
"metadata": {
"tableName": "db_4.view_1",
"createTime": 1554750072
}
},
{
"id":3,
"vertexType":"COLUMN",
"vertexId":"db_4.table_1.id"
},
{
"id":4,
"vertexType":"TABLE",
"vertexId":"db_4.table_1",
"createTime":1554750070
},
{
"id":5,
"vertexType":"TABLE",
"vertexId":"db_4.view_1",
"createTime":1554750072
"vertexId":"db_4.table_1.id",
"metadata": {
"tableName": "db_4.table_1",
"createTime": 1554750070
}
}
]
}
\ No newline at end of file
......@@ -29,34 +29,38 @@
{
"id":0,
"vertexType":"COLUMN",
"vertexId":"db_3.table_2.count"
"vertexId":"db_3.table_2.count",
"metadata": {
"tableName": "db_3.table_2",
"createTime": 1554750072
}
},
{
"id":1,
"vertexType":"COLUMN",
"vertexId":"db_3.table_1.count"
"vertexId":"db_3.table_1.count",
"metadata": {
"tableName": "db_3.table_1",
"createTime": 1554750070
}
},
{
"id":2,
"vertexType":"COLUMN",
"vertexId":"db_3.table_2.id"
"vertexId":"db_3.table_2.id",
"metadata": {
"tableName": "db_3.table_2",
"createTime": 1554750072
}
},
{
"id":3,
"vertexType":"COLUMN",
"vertexId":"db_3.table_1.id"
},
{
"id":4,
"vertexType":"TABLE",
"vertexId":"db_3.table_1",
"createTime":1554750070
},
{
"id":5,
"vertexType":"TABLE",
"vertexId":"db_3.table_2",
"createTime":1554750072
"vertexId":"db_3.table_1.id",
"metadata": {
"tableName": "db_3.table_1",
"createTime": 1554750070
}
}
]
}
\ No newline at end of file
{
"queryText":"create view db_1.view_1 as select count, id from db_1.table_1",
"queryId":"3a441d0c130962f8:7f634aec00000000",
"hash":"64ff0425ccdfaada53e3f2fd76f566f7",
"user":"admin",
"timestamp":1554750072,
"endTime":1554750554,
"edges":[
{
"sources":[
1
],
"targets":[
0
],
"edgeType":"PROJECTION"
},
{
"sources":[
3
],
"targets":[
2
],
"edgeType":"PROJECTION"
}
],
"vertices":[
{
"id":0,
"vertexType":"COLUMN",
"vertexId":"db_1.view_1.count",
"metadata": {
"tableName": "db_1.view_1",
"createTime": 1554750072
}
},
{
"id":1,
"vertexType":"COLUMN",
"vertexId":"db_1.table_1.count",
"metadata": {
"tableName": "db_1.table_1",
"createTime": 1554750070
}
},
{
"id":2,
"vertexType":"COLUMN",
"vertexId":"db_1.view_1.id",
"metadata": {
"tableName": "db_1.view_1",
"createTime": 1554750072
}
},
{
"id":3,
"vertexType":"COLUMN",
"vertexId":"db_1.table_1.id",
"metadata": {
"tableName": "db_1.table_1",
"createTime": 1554750070
}
}
]
}
\ No newline at end of file
{
"queryText":"create view db_1.view_1 as select count, id from db_1.table_1",
"queryId":"3a441d0c130962f8:7f634aec00000000",
"queryText":"create view db_2.view_1 as select count, id from db_2.table_1",
"queryId":"5a441d0c130962f8:7f634aec00000000",
"hash":"64ff0425ccdfaada53e3f2fd76f566f7",
"user":"admin",
"timestamp":1554750072,
......@@ -29,34 +29,22 @@
{
"id":0,
"vertexType":"COLUMN",
"vertexId":"db_1.view_1.count"
"vertexId":"db_2.view_1.count"
},
{
"id":1,
"vertexType":"COLUMN",
"vertexId":"db_1.table_1.count"
"vertexId":"db_2.table_1.count"
},
{
"id":2,
"vertexType":"COLUMN",
"vertexId":"db_1.view_1.id"
"vertexId":"db_2.view_1.id"
},
{
"id":3,
"vertexType":"COLUMN",
"vertexId":"db_1.table_1.id"
},
{
"id":4,
"vertexType":"TABLE",
"vertexId":"db_1.table_1",
"createTime":1554750070
},
{
"id":5,
"vertexType":"TABLE",
"vertexId":"db_1.view_1",
"createTime":1554750072
"vertexId":"db_2.table_1.id"
}
]
}
\ No newline at end of file
......@@ -28,7 +28,7 @@
"sources":[
],
"targets":[
6
4
],
"edgeType":"PROJECTION"
}
......@@ -37,39 +37,47 @@
{
"id":0,
"vertexType":"COLUMN",
"vertexId":"db_5.table_2.count"
"vertexId":"db_5.table_2.count",
"metadata": {
"tableName": "db_5.table_2",
"createTime": 1554750072
}
},
{
"id":1,
"vertexType":"COLUMN",
"vertexId":"db_5.table_1.count"
"vertexId":"db_5.table_1.count",
"metadata": {
"tableName": "db_5.table_1",
"createTime": 1554750070
}
},
{
"id":2,
"vertexType":"COLUMN",
"vertexId":"db_5.table_2.id"
"vertexId":"db_5.table_2.id",
"metadata": {
"tableName": "db_5.table_2",
"createTime": 1554750072
}
},
{
"id":3,
"vertexType":"COLUMN",
"vertexId":"db_5.table_1.id"
"vertexId":"db_5.table_1.id",
"metadata": {
"tableName": "db_5.table_1",
"createTime": 1554750070
}
},
{
"id":4,
"vertexType":"TABLE",
"vertexId":"db_5.table_1",
"createTime":1554750070
},
{
"id":5,
"vertexType":"TABLE",
"vertexId":"db_5.table_2",
"createTime":1554750072
},
{
"id":6,
"vertexType":"COLUMN",
"vertexId":"db_5.table_2.int_col"
"vertexId":"db_5.table_2.int_col",
"metadata": {
"tableName": "db_5.table_2",
"createTime": 1554750072
}
}
]
}
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>apache-atlas</artifactId>
<groupId>org.apache.atlas</groupId>
<version>3.0.0-SNAPSHOT</version>
<relativePath>../../</relativePath>
</parent>
<artifactId>impala-hook-api</artifactId>
<description>Apache Atlas Impala Hook API Module</description>
<name>Apache Atlas Impala Hook API</name>
<packaging>jar</packaging>
</project>
\ No newline at end of file
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.impala.hooks;
import java.util.Objects;
/**
 * {@link PostQueryHookContext} encapsulates immutable information sent from the
 * backend (BE) to a post-query hook.
 * <p>
 * The only state carried is the lineage graph, held as the string that was
 * passed to the constructor.
 * </p>
 */
public class PostQueryHookContext {
    private final String lineageGraph;

    /**
     * Creates a context wrapping the given lineage graph.
     *
     * @param lineageGraph serialized JSON string of the lineage graph; must not
     *                     be {@code null}
     * @throws NullPointerException if {@code lineageGraph} is {@code null}
     */
    public PostQueryHookContext(String lineageGraph) {
        // Name the argument so that a null failure is self-explanatory in logs
        // instead of surfacing as an NPE with no message.
        this.lineageGraph = Objects.requireNonNull(lineageGraph, "lineageGraph");
    }

    /**
     * Returns the lineage graph sent from the backend during
     * {@link QueryExecHook#postQueryExecute(PostQueryHookContext)}. This graph
     * object will generally contain more information than it did when it was
     * first constructed in the frontend, because the backend will have filled
     * in additional information.
     * <p>
     * The returned object is the serialized JSON string of the graph sent from
     * the backend.
     * </p>
     *
     * @return lineage graph from the query that executed; never {@code null}
     */
    public String getLineageGraph() {
        return lineageGraph;
    }

    /**
     * Returns a debug representation that embeds the full lineage graph string
     * between single quotes.
     */
    @Override
    public String toString() {
        return "PostQueryHookContext{" +
                "lineageGraph='" + lineageGraph + '\'' +
                '}';
    }
}
\ No newline at end of file
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.impala.hooks;
/**
* {@link QueryExecHook} is the interface for implementations that
* can hook into supported places in Impala query execution.
* <p>
* Two hook points are declared: daemon startup ({@link #impalaStartup()})
* and post-query execution ({@link #postQueryExecute(PostQueryHookContext)}).
* </p>
*/
public interface QueryExecHook {
/**
* Hook method invoked when the Impala daemon starts up.
* <p>
* This method will block completion of daemon startup, so you should
* execute any long-running actions asynchronously.
* </p>
* <h3>Error-Handling</h3>
* <p>
* Any {@link Exception} thrown from this method will effectively fail
* Impala startup with an error. Implementations should handle all
* exceptions as gracefully as they can, even if the end result is to
* throw them.
* </p>
*/
void impalaStartup();
/**
* Hook method invoked asynchronously when a (qualifying) Impala query
* has executed, but before it has returned.
* <p>
* This method will not block the invoking or subsequent queries,
* but may block future hook invocations if it runs for too long.
* </p>
* <h3>Error-Handling</h3>
* <p>
* Any {@link Exception} thrown from this method will only be caught
* and logged and will not affect the result of any query.
* </p>
*
* @param context object containing the post execution context
* of the query; its lineage graph is available through
* {@link PostQueryHookContext#getLineageGraph()}
*/
void postQueryExecute(PostQueryHookContext context);
}
\ No newline at end of file
......@@ -784,7 +784,9 @@
<module>addons/hbase-testing-util</module>
<module>addons/kafka-bridge</module>
<module>tools/classification-updater</module>
<module>addons/impala-hook-api</module>
<module>addons/impala-bridge</module>
<module>addons/impala-bridge-shim</module>
<module>distro</module>
</modules>
......@@ -1519,6 +1521,24 @@
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.atlas</groupId>
<artifactId>impala-hook-api</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.atlas</groupId>
<artifactId>impala-bridge</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.atlas</groupId>
<artifactId>impala-bridge-shim</artifactId>
<version>${project.version}</version>
</dependency>
<!-- API documentation -->
<dependency>
<groupId>com.webcohesion.enunciate</groupId>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment