Commit 65d95ebe by Suma Shivaprasad

ATLAS-834 Handle exceptions from HiveHook executor.submit() (sumasai)

parent 56c13aa3
@@ -154,36 +154,39 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
     @Override
     public void run(final HookContext hookContext) throws Exception {
         // clone to avoid concurrent access
-        final HiveConf conf = new HiveConf(hookContext.getConf());
-        final HiveEventContext event = new HiveEventContext();
-        event.setInputs(hookContext.getInputs());
-        event.setOutputs(hookContext.getOutputs());
-        event.setJsonPlan(getQueryPlan(hookContext.getConf(), hookContext.getQueryPlan()));
-        event.setHookType(hookContext.getHookType());
-        event.setUgi(hookContext.getUgi());
-        event.setUser(getUser(hookContext.getUserName()));
-        event.setOperation(OPERATION_MAP.get(hookContext.getOperationName()));
-        event.setQueryId(hookContext.getQueryPlan().getQueryId());
-        event.setQueryStr(hookContext.getQueryPlan().getQueryStr());
-        event.setQueryStartTime(hookContext.getQueryPlan().getQueryStartTime());
-        event.setQueryType(hookContext.getQueryPlan().getQueryPlan().getQueryType());
-        boolean sync = conf.get(CONF_SYNC, "false").equals("true");
-        if (sync) {
-            fireAndForget(event);
-        } else {
-            executor.submit(new Runnable() {
-                @Override
-                public void run() {
-                    try {
-                        fireAndForget(event);
-                    } catch (Throwable e) {
-                        LOG.info("Atlas hook failed", e);
-                    }
-                }
-            });
-        }
+        try {
+            final HiveConf conf = new HiveConf(hookContext.getConf());
+            final HiveEventContext event = new HiveEventContext();
+            event.setInputs(hookContext.getInputs());
+            event.setOutputs(hookContext.getOutputs());
+            event.setJsonPlan(getQueryPlan(hookContext.getConf(), hookContext.getQueryPlan()));
+            event.setHookType(hookContext.getHookType());
+            event.setUgi(hookContext.getUgi());
+            event.setUser(getUser(hookContext.getUserName()));
+            event.setOperation(OPERATION_MAP.get(hookContext.getOperationName()));
+            event.setQueryId(hookContext.getQueryPlan().getQueryId());
+            event.setQueryStr(hookContext.getQueryPlan().getQueryStr());
+            event.setQueryStartTime(hookContext.getQueryPlan().getQueryStartTime());
+            event.setQueryType(hookContext.getQueryPlan().getQueryPlan().getQueryType());
+            boolean sync = conf.get(CONF_SYNC, "false").equals("true");
+            if (sync) {
+                fireAndForget(event);
+            } else {
+                executor.submit(new Runnable() {
+                    @Override
+                    public void run() {
+                        try {
+                            fireAndForget(event);
+                        } catch (Throwable e) {
+                            LOG.error("Atlas hook failed due to error ", e);
+                        }
+                    }
+                });
+            }
+        } catch(Throwable t) {
+            LOG.error("Submitting to thread pool failed due to error ", t);
+        }
     }
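Note (not part of the commit): the new outer try/catch matters because ExecutorService.submit() can fail before the task ever runs, most commonly with a RejectedExecutionException once the executor has been shut down or when a bounded work queue is full under the default abort policy. The minimal, self-contained sketch below (the class name SubmitRejectionDemo is made up for illustration) reproduces that failure mode with the same kind of guard; before this patch, such an exception would have propagated out of the hook's run() method to the caller.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

public class SubmitRejectionDemo {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.shutdown();  // a shut-down pool rejects any further submissions

        try {
            // Unguarded, this submit() throws RejectedExecutionException into the caller.
            executor.submit(new Runnable() {
                @Override
                public void run() {
                    System.out.println("never runs");
                }
            });
        } catch (RejectedExecutionException e) {
            // Same idea as the patch: log the submission failure and move on,
            // so it stays contained inside the hook.
            System.err.println("Submitting to thread pool failed: " + e);
        }
    }
}

The patch itself catches Throwable around the whole block, so failures while building the HiveEventContext (the hookContext accessor chain) are contained as well, not just rejected submissions.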
@@ -22,6 +22,7 @@ ATLAS-409 Atlas will not import avro tables with schema read from a file (dosset
 ATLAS-379 Create sqoop and falcon metadata addons (venkatnrangan,bvellanki,sowmyaramesh via shwethags)
 ALL CHANGES:
+ATLAS-834 Handle exceptions from HiveHook executor.submit() (sumasai)
 ATLAS-635 Process showing old entity name where as actual entity is renamed ( svimal2106 via sumasai )
 ATLAS-823 Atlas should use external HBase and SOLR (tbeerbower via shwethags)
 ATLAS-752 Column renames should retain traits/tags (svimal2106 via shwethags)