Commit 5196fd71 by lina.li Committed by Sarath Subramanian

ATLAS-3230: Change Impala Hook API Names

parent 51290693
...@@ -19,8 +19,8 @@ ...@@ -19,8 +19,8 @@
package org.apache.atlas.impala.hook; package org.apache.atlas.impala.hook;
import org.apache.atlas.plugin.classloader.AtlasPluginClassLoader; import org.apache.atlas.plugin.classloader.AtlasPluginClassLoader;
import org.apache.impala.hooks.PostQueryHookContext; import org.apache.impala.hooks.QueryCompleteContext;
import org.apache.impala.hooks.QueryExecHook; import org.apache.impala.hooks.QueryEventHook;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -28,7 +28,7 @@ import org.slf4j.LoggerFactory; ...@@ -28,7 +28,7 @@ import org.slf4j.LoggerFactory;
* This class is used to convert lineage records from Impala to lineage notifications and * This class is used to convert lineage records from Impala to lineage notifications and
* send them to Atlas. * send them to Atlas.
*/ */
public class ImpalaLineageHook implements QueryExecHook { public class ImpalaLineageHook implements QueryEventHook {
private static final Logger LOG = LoggerFactory.getLogger(ImpalaLineageHook.class); private static final Logger LOG = LoggerFactory.getLogger(ImpalaLineageHook.class);
private static final String ATLAS_PLUGIN_TYPE_IMPALA = "impala"; private static final String ATLAS_PLUGIN_TYPE_IMPALA = "impala";
...@@ -36,20 +36,20 @@ public class ImpalaLineageHook implements QueryExecHook { ...@@ -36,20 +36,20 @@ public class ImpalaLineageHook implements QueryExecHook {
"org.apache.atlas.impala.hook.ImpalaHook"; "org.apache.atlas.impala.hook.ImpalaHook";
private AtlasPluginClassLoader atlasPluginClassLoader = null; private AtlasPluginClassLoader atlasPluginClassLoader = null;
private QueryExecHook impalaLineageHookImpl; private QueryEventHook impalaLineageHookImpl;
public ImpalaLineageHook() { public ImpalaLineageHook() {
} }
/** /**
* Execute Impala post-hook * Execute Impala hook
*/ */
public void postQueryExecute(PostQueryHookContext context) { public void onQueryComplete(QueryCompleteContext context) {
LOG.debug("==> ImpalaLineageHook.postQueryExecute()"); LOG.debug("==> ImpalaLineageHook.onQueryComplete()");
try { try {
activatePluginClassLoader(); activatePluginClassLoader();
impalaLineageHookImpl.postQueryExecute(context); impalaLineageHookImpl.onQueryComplete(context);
} catch (Exception ex) { } catch (Exception ex) {
String errorMessage = String.format("Error in processing impala lineage: {}", context.getLineageGraph()); String errorMessage = String.format("Error in processing impala lineage: {}", context.getLineageGraph());
LOG.error(errorMessage, ex); LOG.error(errorMessage, ex);
...@@ -57,33 +57,33 @@ public class ImpalaLineageHook implements QueryExecHook { ...@@ -57,33 +57,33 @@ public class ImpalaLineageHook implements QueryExecHook {
deactivatePluginClassLoader(); deactivatePluginClassLoader();
} }
LOG.debug("<== ImpalaLineageHook.postQueryExecute()"); LOG.debug("<== ImpalaLineageHook.onQueryComplete()");
} }
/** /**
* Initialization of Impala post-execution hook * Initialization of Impala hook
*/ */
public void impalaStartup() { public void onImpalaStartup() {
LOG.debug("==> ImpalaLineageHook.impalaStartup()"); LOG.debug("==> ImpalaLineageHook.onImpalaStartup()");
try { try {
atlasPluginClassLoader = AtlasPluginClassLoader.getInstance(ATLAS_PLUGIN_TYPE_IMPALA, this.getClass()); atlasPluginClassLoader = AtlasPluginClassLoader.getInstance(ATLAS_PLUGIN_TYPE_IMPALA, this.getClass());
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
Class<QueryExecHook> cls = (Class<QueryExecHook>) Class Class<QueryEventHook> cls = (Class<QueryEventHook>) Class
.forName(ATLAS_IMPALA_LINEAGE_HOOK_IMPL_CLASSNAME, true, atlasPluginClassLoader); .forName(ATLAS_IMPALA_LINEAGE_HOOK_IMPL_CLASSNAME, true, atlasPluginClassLoader);
activatePluginClassLoader(); activatePluginClassLoader();
impalaLineageHookImpl = cls.newInstance(); impalaLineageHookImpl = cls.newInstance();
impalaLineageHookImpl.impalaStartup(); impalaLineageHookImpl.onImpalaStartup();
} catch (Exception excp) { } catch (Exception excp) {
LOG.error("Error instantiating Atlas hook implementation for Impala lineage", excp); LOG.error("Error instantiating Atlas hook implementation for Impala lineage", excp);
} finally { } finally {
deactivatePluginClassLoader(); deactivatePluginClassLoader();
} }
LOG.debug("<== ImpalaLineageHook.impalaStartup()"); LOG.debug("<== ImpalaLineageHook.onImpalaStartup()");
} }
private void activatePluginClassLoader() { private void activatePluginClassLoader() {
......
...@@ -18,20 +18,20 @@ ...@@ -18,20 +18,20 @@
package org.apache.atlas.impala.hook; package org.apache.atlas.impala.hook;
import org.apache.impala.hooks.PostQueryHookContext; import org.apache.impala.hooks.QueryCompleteContext;
import org.apache.impala.hooks.QueryExecHook; import org.apache.impala.hooks.QueryEventHook;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
public class ImpalaHook implements QueryExecHook { public class ImpalaHook implements QueryEventHook {
private static final Logger LOG = LoggerFactory.getLogger(ImpalaHook.class); private static final Logger LOG = LoggerFactory.getLogger(ImpalaHook.class);
private ImpalaLineageHook lineageHook; private ImpalaLineageHook lineageHook;
/** /**
* Execute Impala post-hook * Execute Impala hook
*/ */
public void postQueryExecute(PostQueryHookContext context) { public void onQueryComplete(QueryCompleteContext context) {
try { try {
lineageHook.process(context.getLineageGraph()); lineageHook.process(context.getLineageGraph());
} catch (Exception ex) { } catch (Exception ex) {
...@@ -41,9 +41,9 @@ public class ImpalaHook implements QueryExecHook { ...@@ -41,9 +41,9 @@ public class ImpalaHook implements QueryExecHook {
} }
/** /**
* Initialization of Impala post-execution hook * Initialization of Impala hook
*/ */
public void impalaStartup() { public void onImpalaStartup() {
lineageHook = new ImpalaLineageHook(); lineageHook = new ImpalaLineageHook();
} }
} }
...@@ -563,7 +563,7 @@ public abstract class BaseImpalaEvent { ...@@ -563,7 +563,7 @@ public abstract class BaseImpalaEvent {
LineageVertexMetadata metadata = vertex.getMetadata(); LineageVertexMetadata metadata = vertex.getMetadata();
if (metadata != null) { if (metadata != null) {
return metadata.getCreateTime(); return metadata.getTableCreateTime();
} }
} }
......
...@@ -270,7 +270,7 @@ public class CreateImpalaProcess extends BaseImpalaEvent { ...@@ -270,7 +270,7 @@ public class CreateImpalaProcess extends BaseImpalaEvent {
ImpalaNode tableNode = vertexNameMap.get(tableName); ImpalaNode tableNode = vertexNameMap.get(tableName);
if (tableNode == null) { if (tableNode == null) {
tableNode = createTableNode(tableName, metadata.getCreateTime()); tableNode = createTableNode(tableName, metadata.getTableCreateTime());
vertexNameMap.put(tableName, tableNode); vertexNameMap.put(tableName, tableNode);
} }
} }
......
...@@ -36,13 +36,13 @@ public class LineageVertexMetadata { ...@@ -36,13 +36,13 @@ public class LineageVertexMetadata {
private String tableName; private String tableName;
// the create time of the table. Its unit is in seconds. // the create time of the table. Its unit is in seconds.
private Long createTime; private Long tableCreateTime;
public String getTableName() { return tableName; } public String getTableName() { return tableName; }
public Long getCreateTime() { return createTime; } public Long getTableCreateTime() { return tableCreateTime; }
public void setTableName(String tableName) { this.tableName = tableName; } public void setTableName(String tableName) { this.tableName = tableName; }
public void setCreateTime(Long createTime) { this.createTime = createTime; } public void setTableCreateTime(Long createTime) { this.tableCreateTime = createTime; }
} }
...@@ -32,7 +32,7 @@ ...@@ -32,7 +32,7 @@
"vertexId":"db_4.view_1.count", "vertexId":"db_4.view_1.count",
"metadata": { "metadata": {
"tableName": "db_4.view_1", "tableName": "db_4.view_1",
"createTime": 1554750072 "tableCreateTime": 1554750072
} }
}, },
{ {
...@@ -41,7 +41,7 @@ ...@@ -41,7 +41,7 @@
"vertexId":"db_4.table_1.count", "vertexId":"db_4.table_1.count",
"metadata": { "metadata": {
"tableName": "db_4.table_1", "tableName": "db_4.table_1",
"createTime": 1554750070 "tableCreateTime": 1554750070
} }
}, },
{ {
...@@ -50,7 +50,7 @@ ...@@ -50,7 +50,7 @@
"vertexId":"db_4.view_1.id", "vertexId":"db_4.view_1.id",
"metadata": { "metadata": {
"tableName": "db_4.view_1", "tableName": "db_4.view_1",
"createTime": 1554750072 "tableCreateTime": 1554750072
} }
}, },
{ {
...@@ -59,7 +59,7 @@ ...@@ -59,7 +59,7 @@
"vertexId":"db_4.table_1.id", "vertexId":"db_4.table_1.id",
"metadata": { "metadata": {
"tableName": "db_4.table_1", "tableName": "db_4.table_1",
"createTime": 1554750070 "tableCreateTime": 1554750070
} }
} }
] ]
......
...@@ -32,7 +32,7 @@ ...@@ -32,7 +32,7 @@
"vertexId":"db_3.table_2.count", "vertexId":"db_3.table_2.count",
"metadata": { "metadata": {
"tableName": "db_3.table_2", "tableName": "db_3.table_2",
"createTime": 1554750072 "tableCreateTime": 1554750072
} }
}, },
{ {
...@@ -41,7 +41,7 @@ ...@@ -41,7 +41,7 @@
"vertexId":"db_3.table_1.count", "vertexId":"db_3.table_1.count",
"metadata": { "metadata": {
"tableName": "db_3.table_1", "tableName": "db_3.table_1",
"createTime": 1554750070 "tableCreateTime": 1554750070
} }
}, },
{ {
...@@ -50,7 +50,7 @@ ...@@ -50,7 +50,7 @@
"vertexId":"db_3.table_2.id", "vertexId":"db_3.table_2.id",
"metadata": { "metadata": {
"tableName": "db_3.table_2", "tableName": "db_3.table_2",
"createTime": 1554750072 "tableCreateTime": 1554750072
} }
}, },
{ {
...@@ -59,7 +59,7 @@ ...@@ -59,7 +59,7 @@
"vertexId":"db_3.table_1.id", "vertexId":"db_3.table_1.id",
"metadata": { "metadata": {
"tableName": "db_3.table_1", "tableName": "db_3.table_1",
"createTime": 1554750070 "tableCreateTime": 1554750070
} }
} }
] ]
......
...@@ -32,7 +32,7 @@ ...@@ -32,7 +32,7 @@
"vertexId":"db_1.view_1.count", "vertexId":"db_1.view_1.count",
"metadata": { "metadata": {
"tableName": "db_1.view_1", "tableName": "db_1.view_1",
"createTime": 1554750072 "tableCreateTime": 1554750072
} }
}, },
{ {
...@@ -41,7 +41,7 @@ ...@@ -41,7 +41,7 @@
"vertexId":"db_1.table_1.count", "vertexId":"db_1.table_1.count",
"metadata": { "metadata": {
"tableName": "db_1.table_1", "tableName": "db_1.table_1",
"createTime": 1554750070 "tableCreateTime": 1554750070
} }
}, },
{ {
...@@ -50,7 +50,7 @@ ...@@ -50,7 +50,7 @@
"vertexId":"db_1.view_1.id", "vertexId":"db_1.view_1.id",
"metadata": { "metadata": {
"tableName": "db_1.view_1", "tableName": "db_1.view_1",
"createTime": 1554750072 "tableCreateTime": 1554750072
} }
}, },
{ {
...@@ -59,7 +59,7 @@ ...@@ -59,7 +59,7 @@
"vertexId":"db_1.table_1.id", "vertexId":"db_1.table_1.id",
"metadata": { "metadata": {
"tableName": "db_1.table_1", "tableName": "db_1.table_1",
"createTime": 1554750070 "tableCreateTime": 1554750070
} }
} }
] ]
......
...@@ -40,7 +40,7 @@ ...@@ -40,7 +40,7 @@
"vertexId":"db_5.table_2.count", "vertexId":"db_5.table_2.count",
"metadata": { "metadata": {
"tableName": "db_5.table_2", "tableName": "db_5.table_2",
"createTime": 1554750072 "tableCreateTime": 1554750072
} }
}, },
{ {
...@@ -49,7 +49,7 @@ ...@@ -49,7 +49,7 @@
"vertexId":"db_5.table_1.count", "vertexId":"db_5.table_1.count",
"metadata": { "metadata": {
"tableName": "db_5.table_1", "tableName": "db_5.table_1",
"createTime": 1554750070 "tableCreateTime": 1554750070
} }
}, },
{ {
...@@ -58,7 +58,7 @@ ...@@ -58,7 +58,7 @@
"vertexId":"db_5.table_2.id", "vertexId":"db_5.table_2.id",
"metadata": { "metadata": {
"tableName": "db_5.table_2", "tableName": "db_5.table_2",
"createTime": 1554750072 "tableCreateTime": 1554750072
} }
}, },
{ {
...@@ -67,7 +67,7 @@ ...@@ -67,7 +67,7 @@
"vertexId":"db_5.table_1.id", "vertexId":"db_5.table_1.id",
"metadata": { "metadata": {
"tableName": "db_5.table_1", "tableName": "db_5.table_1",
"createTime": 1554750070 "tableCreateTime": 1554750070
} }
}, },
{ {
...@@ -76,7 +76,7 @@ ...@@ -76,7 +76,7 @@
"vertexId":"db_5.table_2.int_col", "vertexId":"db_5.table_2.int_col",
"metadata": { "metadata": {
"tableName": "db_5.table_2", "tableName": "db_5.table_2",
"createTime": 1554750072 "tableCreateTime": 1554750072
} }
} }
] ]
......
...@@ -20,36 +20,37 @@ package org.apache.impala.hooks; ...@@ -20,36 +20,37 @@ package org.apache.impala.hooks;
import java.util.Objects; import java.util.Objects;
/** /**
* {@link PostQueryHookContext} encapsulates immutable information sent from the * {@link QueryCompleteContext} encapsulates immutable information sent from the
* BE to a post-query hook. * BE to a post-query hook.
*/ */
public class PostQueryHookContext { public class QueryCompleteContext {
private final String lineageGraph; private final String lineageGraph_;
public PostQueryHookContext(String lineageGraph) { public QueryCompleteContext(String lineageGraph) {
this.lineageGraph = Objects.requireNonNull(lineageGraph); lineageGraph_ = Objects.requireNonNull(lineageGraph);
} }
/** /**
* Returns the lineage graph sent from the backend during * Returns the lineage graph sent from the backend during
* {@link QueryExecHook#postQueryExecute(PostQueryHookContext)}. This graph * {@link QueryEventHook#onQueryComplete(QueryCompleteContext)}. This graph
* object will generally contain more information than it did when it was * object will generally contain more information than it did when it was
* first constructed in the frontend, because the backend will have filled * first constructed in the frontend, because the backend will have filled
* in additional information. * in additional information.
* <p> * <p>
* The returned object is serilized json string of the graph sent from the backend. * The returned object is a JSON representation of the lineage graph object
* for the query. The details of the JSON translation are not provided here
* as this is meant to be a temporary feature, and the String format will
* be changed to something more strongly-typed in the future.
* </p> * </p>
* *
* @return lineage graph from the query that executed * @return lineage graph from the query that executed
*/ */
public String getLineageGraph() { public String getLineageGraph() { return lineageGraph_; }
return lineageGraph;
}
@Override @Override
public String toString() { public String toString() {
return "PostQueryHookContext{" + return "QueryCompleteContext{" +
"lineageGraph='" + lineageGraph + '\'' + "lineageGraph='" + lineageGraph_ + '\'' +
'}'; '}';
} }
} }
...@@ -17,13 +17,11 @@ ...@@ -17,13 +17,11 @@
*/ */
package org.apache.impala.hooks; package org.apache.impala.hooks;
/** /**
* {@link QueryExecHook} is the interface for implementations that * {@link QueryEventHook} is the interface for implementations that
* can hook into supported places in Impala query execution. * can hook into supported events in Impala query execution.
*/ */
public interface QueryExecHook { public interface QueryEventHook {
/** /**
* Hook method invoked when the Impala daemon starts up. * Hook method invoked when the Impala daemon starts up.
* <p> * <p>
...@@ -38,7 +36,7 @@ public interface QueryExecHook { ...@@ -38,7 +36,7 @@ public interface QueryExecHook {
* throw them. * throw them.
* </p> * </p>
*/ */
void impalaStartup(); void onImpalaStartup();
/** /**
* Hook method invoked asynchronously when a (qualifying) Impala query * Hook method invoked asynchronously when a (qualifying) Impala query
...@@ -49,12 +47,70 @@ public interface QueryExecHook { ...@@ -49,12 +47,70 @@ public interface QueryExecHook {
* </p> * </p>
* <h3>Error-Handling</h3> * <h3>Error-Handling</h3>
* <p> * <p>
* Any {@link Exception} thrown from this method will only be caught * Any {@link Throwable} thrown from this method will only be caught
* and logged and will not affect the result of any query. * and logged and will not affect the result of any query. Hook implementations
* should make a best-effort to handle their own exceptions.
* </p>
* <h3>Important:</h3>
* <p>
* This hook is actually invoked when the query is <i>unregistered</i>,
* which may happen a long time after the query has executed.
* e.g. the following sequence is possible:
* <ol>
* <li>User executes query from Hue.
* <li>User goes home for weekend, leaving Hue tab open in browser
* <li>If we're lucky, the session timeout expires after some amount of idle time.
* <li>The query gets unregistered, lineage record gets logged
* </ol>
* </p>
* <h3>Service Guarantees</h3>
*
* Impala makes the following guarantees about how this method is executed
* with respect to other implementations that may be registered:
*
* <h4>Hooks are executed asynchronously</h4>
*
* All hook execution happens asynchronously of the query that triggered
* them. Hooks may still be executing after the query response has returned
* to the caller. Additionally, hooks may execute concurrently if the
* hook executor thread size is configured appropriately.
*
* <h4>Hook Invocation is in Configuration Order</h4>
*
* The <i>submission</i> of the hook execution tasks occurs in the order
* that the hooks were defined in configuration. This generally means that
* hooks will <i>start</i> executing in order, but there are no guarantees
* about finishing order.
* <p>
* For example, if configured with {@code query_event_hook_classes=hook1,hook2,hook3},
* then hook1 will start before hook2, and hook2 will start before hook3.
* If you need to guarantee that hook1 <i>completes</i> before hook2 starts, then
* you should specify {@code query_event_hook_nthreads=1} for serial hook
* execution.
* </p> * </p>
* *
* <h4>Hook Execution Blocks</h4>
*
* A hook will block the thread it executes on until it completes. If a hook hangs,
* then the thread also hangs. Impala (currently) will not check for hanging hooks to
* take any action. This means that if you have {@code query_event_hook_nthreads}
* less than the number of hooks, then 1 hook may effectively block others from
* executing.
*
* <h4>Hook Exceptions are non-fatal</h4>
*
* Any exception thrown from this hook method will be logged and ignored. Therefore,
* an exception in 1 hook will not affect another hook (when no shared resources are
* involved).
*
* <h4>Hook Execution may end abruptly at Impala shutdown</h4>
*
* If a hook is still executing when Impala is shutdown, there are no guarantees
* that it will complete execution before being killed.
*
*
* @param context object containing the post execution context * @param context object containing the post execution context
* of the query * of the query
*/ */
void postQueryExecute(PostQueryHookContext context); void onQueryComplete(QueryCompleteContext context);
} }
Markdown is supported
0% Try again or attach a new file
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment