Commit ece78ba0 by Madhan Neethiraj

ATLAS-2872: updated HiveHook to purge name cache periodically

(cherry picked from commit 110db1a40b84b9a5ad2590233ee3f03eeae2ec78)
(cherry picked from commit 52719937ad971b9ae25148a0e57a8ec3e94dc146)
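This change replaces the hook-wide, never-purged LruCache maps of known database/table names with a HiveHookObjectNamesCache that is discarded and rebuilt once a configurable interval elapses, so stale entries cannot linger indefinitely. The behaviour is controlled by four new properties (defaults as wired up in the static initializer in the diff below); a sample fragment for the hook's configuration file (typically atlas-application.properties):

```properties
# enable/disable the hook-side name cache (default: true)
atlas.hook.hive.name.cache.enabled=true

# maximum number of database / table qualified names to cache (default: 10000 each)
atlas.hook.hive.name.cache.database.count=10000
atlas.hook.hive.name.cache.table.count=10000

# purge and rebuild the cache after this many seconds; <= 0 disables expiry (default: 3600)
atlas.hook.hive.name.cache.rebuild.interval.seconds=3600
```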
parent a064e092
@@ -19,10 +19,10 @@
package org.apache.atlas.hive.hook;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.hive.hook.HiveHook.HiveHookObjectNamesCache;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.ql.hooks.Entity;
import org.apache.hadoop.hive.ql.hooks.HookContext;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.metadata.Hive;
@@ -46,12 +46,14 @@ public class AtlasHiveHookContext {
private final HookContext hiveContext;
private final Hive hive;
private final Map<String, AtlasEntity> qNameEntityMap = new HashMap<>();
private final HiveHookObjectNamesCache knownObjects;
public AtlasHiveHookContext(HiveHook hook, HiveOperation hiveOperation, HookContext hiveContext) throws Exception {
public AtlasHiveHookContext(HiveHook hook, HiveOperation hiveOperation, HookContext hiveContext, HiveHookObjectNamesCache knownObjects) throws Exception {
this.hook = hook;
this.hiveOperation = hiveOperation;
this.hiveContext = hiveContext;
this.hive = Hive.get(hiveContext.getConf());
this.knownObjects = knownObjects;
init();
}
@@ -102,40 +104,47 @@ public class AtlasHiveHookContext {
}
public boolean isKnownDatabase(String dbQualifiedName) {
return hook.isKnownDatabase(dbQualifiedName);
return knownObjects != null && dbQualifiedName != null ? knownObjects.isKnownDatabase(dbQualifiedName) : false;
}
public boolean isKnownTable(String tblQualifiedName) {
return hook.isKnownTable(tblQualifiedName);
return knownObjects != null && tblQualifiedName != null ? knownObjects.isKnownTable(tblQualifiedName) : false;
}
public void addToKnownEntities(Collection<AtlasEntity> entities) {
hook.addToKnownEntities(entities);
if (knownObjects != null && entities != null) {
knownObjects.addToKnownEntities(entities);
}
}
public void removeFromKnownDatabase(String dbQualifiedName) {
hook.removeFromKnownDatabase(dbQualifiedName);
if (knownObjects != null && dbQualifiedName != null) {
knownObjects.removeFromKnownDatabase(dbQualifiedName);
}
}
public void removeFromKnownTable(String tblQualifiedName) {
hook.removeFromKnownTable(tblQualifiedName);
if (knownObjects != null && tblQualifiedName != null) {
knownObjects.removeFromKnownTable(tblQualifiedName);
}
}
private void init() {
// for create and alter operations, remove output entities from 'known' entity cache
String operationName = hiveContext.getOperationName();
if (operationName != null && (operationName.startsWith("CREATE") || operationName.startsWith("ALTER"))) {
if (CollectionUtils.isNotEmpty(hiveContext.getOutputs())) {
for (WriteEntity output : hiveContext.getOutputs()) {
switch (output.getType()) {
case DATABASE:
hook.removeFromKnownDatabase(getQualifiedName(output.getDatabase()));
break;
case TABLE:
hook.removeFromKnownTable(getQualifiedName(output.getTable()));
break;
if (knownObjects != null) {
String operationName = hiveContext.getOperationName();
if (operationName != null && (operationName.startsWith("CREATE") || operationName.startsWith("ALTER"))) {
if (CollectionUtils.isNotEmpty(hiveContext.getOutputs())) {
for (WriteEntity output : hiveContext.getOutputs()) {
switch (output.getType()) {
case DATABASE:
knownObjects.removeFromKnownDatabase(getQualifiedName(output.getDatabase()));
break;
case TABLE:
knownObjects.removeFromKnownTable(getQualifiedName(output.getTable()));
break;
}
}
}
}
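The init() hunk above moves the CREATE/ALTER eviction behind the knownObjects null check: output databases and tables of such operations are dropped from the cache so that the next event re-publishes their (possibly changed) metadata. A statement-level sketch of that contract, using a hypothetical qualified name that is not from this commit:

```java
// Eviction contract: a CREATE/ALTER that outputs an object must invalidate
// its cache entry so a later hook invocation re-notifies Atlas.
HiveHookObjectNamesCache cache = new HiveHookObjectNamesCache(10000, 10000, 3600);

cache.addToKnownTable("sales.orders@primary");       // cached after a first notification
assert cache.isKnownTable("sales.orders@primary");   // later events may skip re-publishing

// an ALTER TABLE on sales.orders reaches init(), which in effect calls:
cache.removeFromKnownTable("sales.orders@primary");
assert !cache.isKnownTable("sales.orders@primary");  // the next event re-notifies Atlas
```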
@@ -21,7 +21,6 @@ package org.apache.atlas.hive.hook;
import org.apache.atlas.hive.hook.events.*;
import org.apache.atlas.hook.AtlasHook;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.utils.LruCache;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext;
import org.apache.hadoop.hive.ql.hooks.HookContext;
@@ -34,7 +33,9 @@ import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import static org.apache.atlas.hive.hook.events.BaseHiveEvent.ATTRIBUTE_QUALIFIED_NAME;
import static org.apache.atlas.hive.hook.events.BaseHiveEvent.HIVE_TYPE_DB;
@@ -44,31 +45,40 @@ import static org.apache.atlas.hive.hook.events.BaseHiveEvent.HIVE_TYPE_TABLE;
public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
private static final Logger LOG = LoggerFactory.getLogger(HiveHook.class);
public static final String CONF_PREFIX = "atlas.hook.hive.";
public static final String HOOK_DATABASE_NAME_CACHE_COUNT = CONF_PREFIX + "database.name.cache.count";
public static final String HOOK_TABLE_NAME_CACHE_COUNT = CONF_PREFIX + "table.name.cache.count";
public static final String CONF_CLUSTER_NAME = "atlas.cluster.name";
public static final String CONF_PREFIX = "atlas.hook.hive.";
public static final String CONF_CLUSTER_NAME = "atlas.cluster.name";
public static final String HOOK_NAME_CACHE_ENABLED = CONF_PREFIX + "name.cache.enabled";
public static final String HOOK_NAME_CACHE_DATABASE_COUNT = CONF_PREFIX + "name.cache.database.count";
public static final String HOOK_NAME_CACHE_TABLE_COUNT = CONF_PREFIX + "name.cache.table.count";
public static final String HOOK_NAME_CACHE_REBUID_INTERVAL_SEC = CONF_PREFIX + "name.cache.rebuild.interval.seconds";
public static final String DEFAULT_CLUSTER_NAME = "primary";
private static final Map<String, HiveOperation> OPERATION_MAP = new HashMap<>();
private static final String clusterName;
private static final Map<String, Long> knownDatabases;
private static final Map<String, Long> knownTables;
private static final String clusterName;
private static final boolean nameCacheEnabled;
private static final int nameCacheDatabaseMaxCount;
private static final int nameCacheTableMaxCount;
private static final int nameCacheRebuildIntervalSeconds;
private static HiveHookObjectNamesCache knownObjects = null;
static {
for (HiveOperation hiveOperation : HiveOperation.values()) {
OPERATION_MAP.put(hiveOperation.getOperationName(), hiveOperation);
}
int dbNameCacheCount = atlasProperties.getInt(HOOK_DATABASE_NAME_CACHE_COUNT, 10000);
int tblNameCacheCount = atlasProperties.getInt(HOOK_TABLE_NAME_CACHE_COUNT, 10000);
clusterName = atlasProperties.getString(CONF_CLUSTER_NAME, DEFAULT_CLUSTER_NAME);
nameCacheEnabled = atlasProperties.getBoolean(HOOK_NAME_CACHE_ENABLED, true);
nameCacheDatabaseMaxCount = atlasProperties.getInt(HOOK_NAME_CACHE_DATABASE_COUNT, 10000);
nameCacheTableMaxCount = atlasProperties.getInt(HOOK_NAME_CACHE_TABLE_COUNT, 10000);
nameCacheRebuildIntervalSeconds = atlasProperties.getInt(HOOK_NAME_CACHE_REBUID_INTERVAL_SEC, 60 * 60); // 60 minutes default
clusterName = atlasProperties.getString(CONF_CLUSTER_NAME, DEFAULT_CLUSTER_NAME);
knownDatabases = dbNameCacheCount > 0 ? Collections.synchronizedMap(new LruCache<String, Long>(dbNameCacheCount, 0)) : null;
knownTables = tblNameCacheCount > 0 ? Collections.synchronizedMap(new LruCache<String, Long>(tblNameCacheCount, 0)) : null;
knownObjects = nameCacheEnabled ? new HiveHookObjectNamesCache(nameCacheDatabaseMaxCount, nameCacheTableMaxCount, nameCacheRebuildIntervalSeconds) : null;
}
public HiveHook() {
}
@@ -78,9 +88,15 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
LOG.debug("==> HiveHook.run({})", hookContext.getOperationName());
}
if (knownObjects != null && knownObjects.isCacheExpired()) {
LOG.info("HiveHook.run(): purging cached databaseNames ({}) and tableNames ({})", knownObjects.getCachedDbCount(), knownObjects.getCachedTableCount());
knownObjects = new HiveHookObjectNamesCache(nameCacheDatabaseMaxCount, nameCacheTableMaxCount, nameCacheRebuildIntervalSeconds);
}
try {
HiveOperation oper = OPERATION_MAP.get(hookContext.getOperationName());
AtlasHiveHookContext context = new AtlasHiveHookContext(this, oper, hookContext);
AtlasHiveHookContext context = new AtlasHiveHookContext(this, oper, hookContext, knownObjects);
BaseHiveEvent event = null;
@@ -166,49 +182,72 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
return clusterName;
}
public boolean isKnownDatabase(String dbQualifiedName) {
return knownDatabases != null && dbQualifiedName != null ? knownDatabases.containsKey(dbQualifiedName) : false;
}
public boolean isKnownTable(String tblQualifiedName) {
return knownTables != null && tblQualifiedName != null ? knownTables.containsKey(tblQualifiedName) : false;
}
public static class HiveHookObjectNamesCache {
private final int dbMaxCacheCount;
private final int tblMaxCacheCount;
private final long cacheExpiryTimeMs;
private final Set<String> knownDatabases;
private final Set<String> knownTables;
public void addToKnownEntities(Collection<AtlasEntity> entities) {
if (knownDatabases != null || knownTables != null) { // caching should be enabled at least for one
if (entities != null) {
for (AtlasEntity entity : entities) {
if (StringUtils.equalsIgnoreCase(entity.getTypeName(), HIVE_TYPE_DB)) {
addToKnownDatabase((String) entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
} else if (StringUtils.equalsIgnoreCase(entity.getTypeName(), HIVE_TYPE_TABLE)) {
addToKnwnTable((String) entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
}
public HiveHookObjectNamesCache(int dbMaxCacheCount, int tblMaxCacheCount, long nameCacheRebuildIntervalSeconds) {
this.dbMaxCacheCount = dbMaxCacheCount;
this.tblMaxCacheCount = tblMaxCacheCount;
this.cacheExpiryTimeMs = nameCacheRebuildIntervalSeconds <= 0 ? Long.MAX_VALUE : (System.currentTimeMillis() + (nameCacheRebuildIntervalSeconds * 1000));
this.knownDatabases = Collections.synchronizedSet(new HashSet<>());
this.knownTables = Collections.synchronizedSet(new HashSet<>());
}
public int getCachedDbCount() {
return knownDatabases.size();
}
public int getCachedTableCount() {
return knownTables.size();
}
public boolean isCacheExpired() {
return System.currentTimeMillis() > cacheExpiryTimeMs;
}
public boolean isKnownDatabase(String dbQualifiedName) {
return knownDatabases.contains(dbQualifiedName);
}
public boolean isKnownTable(String tblQualifiedName) {
return knownTables.contains(tblQualifiedName);
}
public void addToKnownEntities(Collection<AtlasEntity> entities) {
for (AtlasEntity entity : entities) {
if (StringUtils.equalsIgnoreCase(entity.getTypeName(), HIVE_TYPE_DB)) {
addToKnownDatabase((String) entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
} else if (StringUtils.equalsIgnoreCase(entity.getTypeName(), HIVE_TYPE_TABLE)) {
addToKnownTable((String) entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
}
}
}
}
public void addToKnownDatabase(String dbQualifiedName) {
if (knownDatabases != null && dbQualifiedName != null) {
knownDatabases.put(dbQualifiedName, System.currentTimeMillis());
public void addToKnownDatabase(String dbQualifiedName) {
if (knownDatabases.size() < dbMaxCacheCount) {
knownDatabases.add(dbQualifiedName);
}
}
}
public void addToKnwnTable(String tblQualifiedName) {
if (knownTables != null && tblQualifiedName != null) {
knownTables.put(tblQualifiedName, System.currentTimeMillis());
public void addToKnownTable(String tblQualifiedName) {
if (knownTables.size() < tblMaxCacheCount) {
knownTables.add(tblQualifiedName);
}
}
}
public void removeFromKnownDatabase(String dbQualifiedName) {
if (knownDatabases != null && dbQualifiedName != null) {
public void removeFromKnownDatabase(String dbQualifiedName) {
knownDatabases.remove(dbQualifiedName);
}
}
public void removeFromKnownTable(String tblQualifiedName) {
if (knownTables != null && tblQualifiedName != null) {
knownTables.remove(tblQualifiedName);
public void removeFromKnownTable(String tblQualifiedName) {
if (tblQualifiedName != null) {
knownTables.remove(tblQualifiedName);
}
}
}
}
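Two behavioural notes on the new cache. Unlike the LruCache it replaces, HiveHookObjectNamesCache never evicts on overflow: addToKnownDatabase/addToKnownTable simply stop caching once the configured maximum is reached. Expiry is likewise handled wholesale: HiveHook.run() replaces the entire cache object when isCacheExpired() returns true, rather than pruning entries one by one. A minimal, runnable demonstration of the expiry path, with a 1-second rebuild interval chosen only for the demo:

```java
import org.apache.atlas.hive.hook.HiveHook.HiveHookObjectNamesCache;

public class NameCacheExpiryDemo {                        // hypothetical demo class
    public static void main(String[] args) throws InterruptedException {
        // rebuild interval of 1 second, shortened only for this demonstration
        HiveHookObjectNamesCache cache = new HiveHookObjectNamesCache(10000, 10000, 1);

        cache.addToKnownDatabase("sales@primary");        // hypothetical qualified name
        Thread.sleep(1500);                               // let the rebuild interval elapse

        if (cache.isCacheExpired()) {
            // mirrors HiveHook.run(): the expired cache is dropped and recreated
            cache = new HiveHookObjectNamesCache(10000, 10000, 1);
        }

        assert !cache.isKnownDatabase("sales@primary");   // the fresh cache is empty
    }
}
```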