Commit fdafb359 by Shwetha GS

ATLAS-179 Atlas hook causes mem leak and hive server 2 crashes (shwethags)

parent 04d1c132
@@ -75,6 +75,7 @@ public class HiveHook implements ExecuteWithHookContext {
     private static final String MAX_THREADS = CONF_PREFIX + "maxThreads";
     private static final String KEEP_ALIVE_TIME = CONF_PREFIX + "keepAliveTime";
     public static final String CONF_SYNC = CONF_PREFIX + "synchronous";
+    public static final String QUEUE_SIZE = CONF_PREFIX + "queueSize";
     public static final String HOOK_NUM_RETRIES = CONF_PREFIX + "numRetries";
@@ -86,27 +87,30 @@ public class HiveHook implements ExecuteWithHookContext {
     private static final int minThreadsDefault = 5;
     private static final int maxThreadsDefault = 5;
     private static final long keepAliveTimeDefault = 10;
+    private static final int queueSizeDefault = 10000;
     private static boolean typesRegistered = false;
     private static Configuration atlasProperties;
     class HiveEvent {
-        public HiveConf conf;
         public Set<ReadEntity> inputs;
         public Set<WriteEntity> outputs;
         public String user;
         public UserGroupInformation ugi;
         public HiveOperation operation;
-        public QueryPlan queryPlan;
         public HookContext.HookType hookType;
         public JSONObject jsonPlan;
+        public String queryId;
+        public String queryStr;
+        public Long queryStartTime;
     }
     @Inject
     private static NotificationInterface notifInterface;
+    private static final HiveConf hiveConf;
     static {
         try {
             atlasProperties = ApplicationProperties.get(ApplicationProperties.CLIENT_PROPERTIES);
@@ -116,9 +120,10 @@ public class HiveHook implements ExecuteWithHookContext {
             int minThreads = atlasProperties.getInt(MIN_THREADS, minThreadsDefault);
             int maxThreads = atlasProperties.getInt(MAX_THREADS, maxThreadsDefault);
             long keepAliveTime = atlasProperties.getLong(KEEP_ALIVE_TIME, keepAliveTimeDefault);
+            int queueSize = atlasProperties.getInt(QUEUE_SIZE, queueSizeDefault);
             executor = new ThreadPoolExecutor(minThreads, maxThreads, keepAliveTime, TimeUnit.MILLISECONDS,
-                    new LinkedBlockingQueue<Runnable>(),
+                    new LinkedBlockingQueue<Runnable>(queueSize),
                    new ThreadFactoryBuilder().setNameFormat("Atlas Logger %d").build());
             Runtime.getRuntime().addShutdownHook(new Thread() {
@@ -141,6 +146,8 @@ public class HiveHook implements ExecuteWithHookContext {
             Injector injector = Guice.createInjector(new NotificationModule());
             notifInterface = injector.getInstance(NotificationInterface.class);
+            hiveConf = new HiveConf();
             LOG.info("Created Atlas Hook");
     }
@@ -150,17 +157,18 @@ public class HiveHook implements ExecuteWithHookContext {
         final HiveEvent event = new HiveEvent();
         final HiveConf conf = new HiveConf(hookContext.getConf());
-        event.conf = conf;
         event.inputs = hookContext.getInputs();
         event.outputs = hookContext.getOutputs();
         event.user = hookContext.getUserName() == null ? hookContext.getUgi().getUserName() : hookContext.getUserName();
         event.ugi = hookContext.getUgi();
         event.operation = HiveOperation.valueOf(hookContext.getOperationName());
-        event.queryPlan = hookContext.getQueryPlan();
         event.hookType = hookContext.getHookType();
+        event.queryId = hookContext.getQueryPlan().getQueryId();
+        event.queryStr = hookContext.getQueryPlan().getQueryStr();
+        event.queryStartTime = hookContext.getQueryPlan().getQueryStartTime();
-        event.jsonPlan = getQueryPlan(event);
+        event.jsonPlan = getQueryPlan(hookContext.getConf(), hookContext.getQueryPlan());
         boolean sync = conf.get(CONF_SYNC, "false").equals("true");
         if (sync) {
@@ -183,7 +191,8 @@ public class HiveHook implements ExecuteWithHookContext {
         assert event.hookType == HookContext.HookType.POST_EXEC_HOOK : "Non-POST_EXEC_HOOK not supported!";
         LOG.info("Entered Atlas hook for hook type {} operation {}", event.hookType, event.operation);
-        HiveMetaStoreBridge dgiBridge = new HiveMetaStoreBridge(event.conf, atlasProperties, event.user, event.ugi);
+        HiveMetaStoreBridge dgiBridge = new HiveMetaStoreBridge(hiveConf, atlasProperties, event.user, event.ugi);
         if (!typesRegistered) {
             dgiBridge.registerHiveDataModel();
@@ -245,7 +254,7 @@ public class HiveHook implements ExecuteWithHookContext {
             }
         }
         if (newTable == null) {
-            LOG.warn("Failed to deduct new name for " + event.queryPlan.getQueryStr());
+            LOG.warn("Failed to deduct new name for " + event.queryStr);
             return;
         }
@@ -353,20 +362,18 @@ public class HiveHook implements ExecuteWithHookContext {
             LOG.info("Explain statement. Skipping...");
         }
-        if (event.queryPlan == null) {
+        if (event.queryId == null) {
             LOG.info("Query plan is missing. Skipping...");
         }
-        String queryId = event.queryPlan.getQueryId();
-        String queryStr = normalize(event.queryPlan.getQueryStr());
-        long queryStartTime = event.queryPlan.getQueryStartTime();
+        String queryStr = normalize(event.queryStr);
         LOG.debug("Registering query: {}", queryStr);
         List<Referenceable> entities = new ArrayList<>();
         Referenceable processReferenceable = new Referenceable(HiveDataTypes.HIVE_PROCESS.getName());
         processReferenceable.set("name", queryStr);
         processReferenceable.set("operationType", event.operation.getOperationName());
-        processReferenceable.set("startTime", queryStartTime);
+        processReferenceable.set("startTime", event.queryStartTime);
         processReferenceable.set("userName", event.user);
         List<Referenceable> source = new ArrayList<>();
@@ -389,7 +396,7 @@ public class HiveHook implements ExecuteWithHookContext {
         }
         processReferenceable.set("outputs", target);
         processReferenceable.set("queryText", queryStr);
-        processReferenceable.set("queryId", queryId);
+        processReferenceable.set("queryId", event.queryId);
         processReferenceable.set("queryPlan", event.jsonPlan.toString());
         processReferenceable.set("endTime", System.currentTimeMillis());
@@ -400,12 +407,12 @@ public class HiveHook implements ExecuteWithHookContext {
     }
-    private JSONObject getQueryPlan(HiveEvent event) throws Exception {
+    private JSONObject getQueryPlan(HiveConf hiveConf, QueryPlan queryPlan) throws Exception {
         try {
             ExplainTask explain = new ExplainTask();
-            explain.initialize(event.conf, event.queryPlan, null);
-            List<Task<?>> rootTasks = event.queryPlan.getRootTasks();
-            return explain.getJSONPlan(null, null, rootTasks, event.queryPlan.getFetchTask(), true, false, false);
+            explain.initialize(hiveConf, queryPlan, null);
+            List<Task<?>> rootTasks = queryPlan.getRootTasks();
+            return explain.getJSONPlan(null, null, rootTasks, queryPlan.getFetchTask(), true, false, false);
         } catch (Exception e) {
             LOG.info("Failed to get queryplan", e);
             return new JSONObject();
...
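The heart of the fix above is twofold: queued events now carry only the small strings they need (query id, query string, start time) instead of a reference to the whole QueryPlan, and the executor's work queue is bounded. Below is a minimal, self-contained sketch of why the bound matters; the class name and task shape are illustrative, not from the patch. Since the patch passes no RejectedExecutionHandler, the default AbortPolicy applies, so a full queue surfaces as a RejectedExecutionException at submit time rather than as tasks accumulating on the heap until HiveServer2 runs out of memory.

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Illustrative demo class; not part of the Atlas patch.
public class BoundedQueueDemo {
    public static void main(String[] args) {
        // Same shape as the hook's executor: 5 core threads, 5 max, bounded queue of 10000.
        ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5, 10, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(10000));
        int submitted = 0;
        try {
            // Tasks block long enough that submissions outpace completions.
            for (int i = 0; i < 20000; i++, submitted++) {
                executor.submit(() -> {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException ignored) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
        } catch (RejectedExecutionException e) {
            // Default AbortPolicy: the queue is full, so the task is refused instead of
            // being buffered indefinitely on the heap.
            System.err.println("rejected after " + submitted + " accepted submissions");
        } finally {
            executor.shutdownNow();
        }
    }
}

With 5 worker threads and a capacity of 10000, the demo accepts 10005 tasks and rejects the next one; an unbounded queue would instead keep absorbing Runnables for as long as the heap holds out.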
@@ -54,7 +54,7 @@ public class HiveHookIT {
     @BeforeClass
     public void setUp() throws Exception {
         //Set-up hive session
-        HiveConf conf = createHiveConf();
+        HiveConf conf = new HiveConf();
         driver = new Driver(conf);
         ss = new SessionState(conf, System.getProperty("user.name"));
         ss = SessionState.start(ss);
@@ -63,24 +63,6 @@ public class HiveHookIT {
         dgiCLient = new AtlasClient(DGI_URL);
     }
-    public static HiveConf createHiveConf() {
-        return createHiveConf(DGI_URL);
-    }
-    public static HiveConf createHiveConf(String atlasEndpoint) {
-        HiveConf hiveConf = new HiveConf(HiveHookIT.class);
-        hiveConf.setVar(HiveConf.ConfVars.PREEXECHOOKS, "");
-        hiveConf.setVar(HiveConf.ConfVars.POSTEXECHOOKS, HiveHook.class.getName());
-        hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
-        hiveConf.setVar(HiveConf.ConfVars.METASTOREWAREHOUSE, System.getProperty("user.dir") + "/target/metastore");
-        hiveConf.setVar(HiveConf.ConfVars.METASTORECONNECTURLKEY, "jdbc:derby:./target/metastore_db;create=true");
-        hiveConf.set(HiveMetaStoreBridge.HIVE_CLUSTER_NAME, CLUSTER_NAME);
-        hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODE, true); //to not use hdfs
-        hiveConf.setVar(HiveConf.ConfVars.HIVETESTMODEPREFIX, "");
-        hiveConf.set("fs.pfile.impl", "org.apache.hadoop.fs.ProxyLocalFileSystem");
-        return hiveConf;
-    }
     private void runCommand(String cmd) throws Exception {
         ss.setCommandType(null);
         driver.run(cmd);
...
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
drop table if exists t;
create table t(a int, b string);
drop table if exists t2;
create table t2 as select * from t;
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<configuration>
<property>
<name>hive.exec.post.hooks</name>
<value>org.apache.atlas.hive.hook.HiveHook</value>
</property>
<property>
<name>hive.support.concurrency</name>
<value>false</value>
</property>
<property>
<name>hive.metastore.warehouse.dir</name>
<value>${user.dir}/target/metastore</value>
</property>
<property>
<name>atlas.rest.address</name>
<value>http://localhost:21000/</value>
</property>
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:derby:${user.dir}/target/metastore_db;create=true</value>
</property>
<property>
<name>atlas.hook.hive.synchronous</name>
<value>true</value>
</property>
<property>
<name>atlas.cluster.name</name>
<value>test</value>
</property>
<property>
<name>fs.pfile.impl</name>
<value>org.apache.hadoop.fs.ProxyLocalFileSystem</value>
</property>
</configuration>
\ No newline at end of file
@@ -28,7 +28,7 @@
     <version>0.6-incubating-SNAPSHOT</version>
 </parent>
-<artifactId>dashboard</artifactId>
+<artifactId>atlas-dashboard</artifactId>
 <description>Apache Atlas UI Application</description>
 <name>Apache Atlas UI</name>
 <packaging>war</packaging>
...
@@ -29,52 +29,53 @@ hive_process - attribute qualifiedName - queryText
 ---++ Importing Hive Metadata
 org.apache.atlas.hive.bridge.HiveMetaStoreBridge imports the hive metadata into Atlas using the model defined in org.apache.atlas.hive.model.HiveDataModelGenerator. import-hive.sh command can be used to facilitate this.
-Set-up the following configs in hive-site.xml of your hive set-up and set environment variable HIVE_CONF_DIR to the hive conf directory:
-   * Atlas endpoint - Add the following property with the Atlas endpoint for your set-up
+Set-up the following configs in <atlas-conf>/client.properties and set environment variable HIVE_CONFIG to the hive conf directory:
 <verbatim>
 <property>
   <name>atlas.rest.address</name>
   <value>http://localhost:21000/</value>
 </property>
 <property>
   <name>atlas.cluster.name</name>
   <value>primary</value>
 </property>
 </verbatim>
-Usage: <atlas package>/bin/hive/import-hive.sh. The logs are in <atlas package>/logs/import-hive.log
+Usage: <atlas package>/bin/import-hive.sh. The logs are in <atlas package>/logs/import-hive.log
 ---++ Hive Hook
 Hive supports listeners on hive command execution using hive hooks. This is used to add/update/remove entities in Atlas using the model defined in org.apache.atlas.hive.model.HiveDataModelGenerator.
 The hook submits the request to a thread pool executor to avoid blocking the command execution. The thread submits the entities as message to the notification server and atlas server reads these messages and registers the entities.
 Follow these instructions in your hive set-up to add hive hook for Atlas:
-   * Set-up atlas hook and atlas endpoint in hive-site.xml:
+   * Set-up atlas hook in hive-site.xml of your hive configuration:
 <verbatim>
 <property>
   <name>hive.exec.post.hooks</name>
   <value>org.apache.atlas.hive.hook.HiveHook</value>
 </property>
 </verbatim>
-<verbatim>
-<property>
+   * Add 'export HIVE_AUX_JARS_PATH=<atlas package>/hook/hive' in hive-env.sh of your hive configuration
+   * Set-up the following configs in <atlas-conf>/client.properties
+<verbatim>
+<property>
   <name>atlas.rest.address</name>
   <value>http://localhost:21000/</value>
 </property>
 <property>
   <name>atlas.cluster.name</name>
   <value>primary</value>
 </property>
 </verbatim>
-   * Add 'export HIVE_AUX_JARS_PATH=<atlas package>/hook/hive' in hive-env.sh
-   * Copy <atlas package>/conf/application.properties to hive conf directory <hive package>/conf
+   * Copy <atlas-conf>/client.properties and <atlas-conf>/application.properties to the hive conf directory.
-The following properties in hive-site.xml control the thread pool and notification details:
+The following properties in <atlas-conf>/client.properties control the thread pool and notification details:
    * atlas.hook.hive.synchronous - boolean, true to run the hook synchronously. default false
    * atlas.hook.hive.numRetries - number of retries for notification failure. default 3
    * atlas.hook.hive.minThreads - core number of threads. default 5
    * atlas.hook.hive.maxThreads - maximum number of threads. default 5
    * atlas.hook.hive.keepAliveTime - keep alive time in msecs. default 10
+   * atlas.hook.hive.queueSize - queue size for the threadpool. default 10000
 Refer [[Configuration][Configuration]] for notification related configurations
...
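As a worked example of the tuning knobs the doc now lists, a <atlas-conf>/client.properties that simply spells out the documented defaults would read as follows; the values are the defaults quoted above, not tuning recommendations:

<verbatim>
atlas.hook.hive.synchronous=false
atlas.hook.hive.numRetries=3
atlas.hook.hive.minThreads=5
atlas.hook.hive.maxThreads=5
atlas.hook.hive.keepAliveTime=10
atlas.hook.hive.queueSize=10000
</verbatim>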
@@ -9,6 +9,7 @@ ATLAS-54 Rename configs in hive hook (shwethags)
 ATLAS-3 Mixed Index creation fails with Date types (sumasai via shwethags)
 ALL CHANGES:
+ATLAS-179 Atlas hook causes mem leak and hive server 2 crashes (shwethags)
 ATLAS-212 Remove test class usage of hive configuration property "atlas.rest.address" (jspeidel via shwethags)
 ATLAS-159 UI generated files should be target (sanjayp via sumasai)
 ATLAS-188 Provide Ability to Add Tag to Entity (sanjayp via sumasai)
...