diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java index a647192..b9e4256 100644 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java @@ -19,10 +19,10 @@ package org.apache.atlas.hive.hook; import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.hive.hook.HiveHook.HiveHookObjectNamesCache; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang.RandomStringUtils; import org.apache.hadoop.hive.metastore.api.Database; -import org.apache.hadoop.hive.ql.hooks.Entity; import org.apache.hadoop.hive.ql.hooks.HookContext; import org.apache.hadoop.hive.ql.hooks.WriteEntity; import org.apache.hadoop.hive.ql.metadata.Hive; @@ -46,12 +46,14 @@ public class AtlasHiveHookContext { private final HookContext hiveContext; private final Hive hive; private final Map<String, AtlasEntity> qNameEntityMap = new HashMap<>(); + private final HiveHookObjectNamesCache knownObjects; - public AtlasHiveHookContext(HiveHook hook, HiveOperation hiveOperation, HookContext hiveContext) throws Exception { + public AtlasHiveHookContext(HiveHook hook, HiveOperation hiveOperation, HookContext hiveContext, HiveHookObjectNamesCache knownObjects) throws Exception { this.hook = hook; this.hiveOperation = hiveOperation; this.hiveContext = hiveContext; this.hive = Hive.get(hiveContext.getConf()); + this.knownObjects = knownObjects; init(); } @@ -102,40 +104,47 @@ public class AtlasHiveHookContext { } public boolean isKnownDatabase(String dbQualifiedName) { - return hook.isKnownDatabase(dbQualifiedName); + return knownObjects != null && dbQualifiedName != null ? 
knownObjects.isKnownDatabase(dbQualifiedName) : false; } public boolean isKnownTable(String tblQualifiedName) { - return hook.isKnownTable(tblQualifiedName); + return knownObjects != null && tblQualifiedName != null ? knownObjects.isKnownTable(tblQualifiedName) : false; } public void addToKnownEntities(Collection<AtlasEntity> entities) { - hook.addToKnownEntities(entities); + if (knownObjects != null && entities != null) { + knownObjects.addToKnownEntities(entities); + } } public void removeFromKnownDatabase(String dbQualifiedName) { - hook.removeFromKnownDatabase(dbQualifiedName); + if (knownObjects != null && dbQualifiedName != null) { + knownObjects.removeFromKnownDatabase(dbQualifiedName); + } } public void removeFromKnownTable(String tblQualifiedName) { - hook.removeFromKnownTable(tblQualifiedName); + if (knownObjects != null && tblQualifiedName != null) { + knownObjects.removeFromKnownTable(tblQualifiedName); + } } private void init() { - // for create and alter operations, remove output entities from 'known' entity cache - String operationName = hiveContext.getOperationName(); - - if (operationName != null && operationName.startsWith("CREATE") || operationName.startsWith("ALTER")) { - if (CollectionUtils.isNotEmpty(hiveContext.getOutputs())) { - for (WriteEntity output : hiveContext.getOutputs()) { - switch (output.getType()) { - case DATABASE: - hook.removeFromKnownDatabase(getQualifiedName(output.getDatabase())); - break; - - case TABLE: - hook.removeFromKnownTable(getQualifiedName(output.getTable())); - break; + if (knownObjects != null) { /* for create and alter operations, remove output entities from 'known' entity cache */ + String operationName = hiveContext.getOperationName(); + + if (operationName != null && (operationName.startsWith("CREATE") || operationName.startsWith("ALTER"))) { /* parenthesized: a null operationName must not reach startsWith("ALTER") */ + if (CollectionUtils.isNotEmpty(hiveContext.getOutputs())) { + for (WriteEntity output : hiveContext.getOutputs()) { + switch (output.getType()) { + case DATABASE: + knownObjects.removeFromKnownDatabase(getQualifiedName(output.getDatabase())); + break; + + 
case TABLE: + knownObjects.removeFromKnownTable(getQualifiedName(output.getTable())); + break; + } } } } diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java index b1ffd1d..19075f6 100644 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java @@ -21,7 +21,6 @@ package org.apache.atlas.hive.hook; import org.apache.atlas.hive.hook.events.*; import org.apache.atlas.hook.AtlasHook; import org.apache.atlas.model.instance.AtlasEntity; -import org.apache.atlas.utils.LruCache; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext; import org.apache.hadoop.hive.ql.hooks.HookContext; @@ -34,7 +33,9 @@ import org.slf4j.LoggerFactory; import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; +import java.util.Set; import static org.apache.atlas.hive.hook.events.BaseHiveEvent.ATTRIBUTE_QUALIFIED_NAME; import static org.apache.atlas.hive.hook.events.BaseHiveEvent.HIVE_TYPE_DB; @@ -44,31 +45,40 @@ import static org.apache.atlas.hive.hook.events.BaseHiveEvent.HIVE_TYPE_TABLE; public class HiveHook extends AtlasHook implements ExecuteWithHookContext { private static final Logger LOG = LoggerFactory.getLogger(HiveHook.class); - public static final String CONF_PREFIX = "atlas.hook.hive."; - public static final String HOOK_DATABASE_NAME_CACHE_COUNT = CONF_PREFIX + "database.name.cache.count"; - public static final String HOOK_TABLE_NAME_CACHE_COUNT = CONF_PREFIX + "table.name.cache.count"; - public static final String CONF_CLUSTER_NAME = "atlas.cluster.name"; + public static final String CONF_PREFIX = "atlas.hook.hive."; + public static final String CONF_CLUSTER_NAME = "atlas.cluster.name"; + public static final String HOOK_NAME_CACHE_ENABLED = 
CONF_PREFIX + "name.cache.enabled"; + public static final String HOOK_NAME_CACHE_DATABASE_COUNT = CONF_PREFIX + "name.cache.database.count"; + public static final String HOOK_NAME_CACHE_TABLE_COUNT = CONF_PREFIX + "name.cache.table.count"; + public static final String HOOK_NAME_CACHE_REBUID_INTERVAL_SEC = CONF_PREFIX + "name.cache.rebuild.interval.seconds"; public static final String DEFAULT_CLUSTER_NAME = "primary"; private static final Map<String, HiveOperation> OPERATION_MAP = new HashMap<>(); - private static final String clusterName; - private static final Map<String, Long> knownDatabases; - private static final Map<String, Long> knownTables; + + private static final String clusterName; + private static final boolean nameCacheEnabled; + private static final int nameCacheDatabaseMaxCount; + private static final int nameCacheTableMaxCount; + private static final int nameCacheRebuildIntervalSeconds; + + private static HiveHookObjectNamesCache knownObjects = null; static { for (HiveOperation hiveOperation : HiveOperation.values()) { OPERATION_MAP.put(hiveOperation.getOperationName(), hiveOperation); } - int dbNameCacheCount = atlasProperties.getInt(HOOK_DATABASE_NAME_CACHE_COUNT, 10000); - int tblNameCacheCount = atlasProperties.getInt(HOOK_TABLE_NAME_CACHE_COUNT, 10000); + clusterName = atlasProperties.getString(CONF_CLUSTER_NAME, DEFAULT_CLUSTER_NAME); + nameCacheEnabled = atlasProperties.getBoolean(HOOK_NAME_CACHE_ENABLED, true); + nameCacheDatabaseMaxCount = atlasProperties.getInt(HOOK_NAME_CACHE_DATABASE_COUNT, 10000); + nameCacheTableMaxCount = atlasProperties.getInt(HOOK_NAME_CACHE_TABLE_COUNT, 10000); + nameCacheRebuildIntervalSeconds = atlasProperties.getInt(HOOK_NAME_CACHE_REBUID_INTERVAL_SEC, 60 * 60); // 60 minutes default - clusterName = atlasProperties.getString(CONF_CLUSTER_NAME, DEFAULT_CLUSTER_NAME); - knownDatabases = dbNameCacheCount > 0 ? 
Collections.synchronizedMap(new LruCache<String, Long>(dbNameCacheCount, 0)) : null; - knownTables = tblNameCacheCount > 0 ? Collections.synchronizedMap(new LruCache<String, Long>(tblNameCacheCount, 0)) : null; + knownObjects = nameCacheEnabled ? new HiveHookObjectNamesCache(nameCacheDatabaseMaxCount, nameCacheTableMaxCount, nameCacheRebuildIntervalSeconds) : null; } + public HiveHook() { } @@ -78,9 +88,15 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { LOG.debug("==> HiveHook.run({})", hookContext.getOperationName()); } + if (knownObjects != null && knownObjects.isCacheExpired()) { + LOG.info("HiveHook.run(): purging cached databaseNames ({}) and tableNames ({})", knownObjects.getCachedDbCount(), knownObjects.getCachedTableCount()); + + knownObjects = new HiveHookObjectNamesCache(nameCacheDatabaseMaxCount, nameCacheTableMaxCount, nameCacheRebuildIntervalSeconds); + } + try { HiveOperation oper = OPERATION_MAP.get(hookContext.getOperationName()); - AtlasHiveHookContext context = new AtlasHiveHookContext(this, oper, hookContext); + AtlasHiveHookContext context = new AtlasHiveHookContext(this, oper, hookContext, knownObjects); BaseHiveEvent event = null; @@ -166,49 +182,72 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { return clusterName; } - public boolean isKnownDatabase(String dbQualifiedName) { - return knownDatabases != null && dbQualifiedName != null ? knownDatabases.containsKey(dbQualifiedName) : false; - } - public boolean isKnownTable(String tblQualifiedName) { - return knownTables != null && tblQualifiedName != null ? 
knownTables.containsKey(tblQualifiedName) : false; - } + public static class HiveHookObjectNamesCache { + private final int dbMaxCacheCount; + private final int tblMaxCacheCount; + private final long cacheExpiryTimeMs; + private final Set<String> knownDatabases; + private final Set<String> knownTables; - public void addToKnownEntities(Collection<AtlasEntity> entities) { - if (knownDatabases != null || knownTables != null) { // caching should be enabled at least for one - if (entities != null) { - for (AtlasEntity entity : entities) { - if (StringUtils.equalsIgnoreCase(entity.getTypeName(), HIVE_TYPE_DB)) { - addToKnownDatabase((String) entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME)); - } else if (StringUtils.equalsIgnoreCase(entity.getTypeName(), HIVE_TYPE_TABLE)) { - addToKnwnTable((String) entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME)); - } + public HiveHookObjectNamesCache(int dbMaxCacheCount, int tblMaxCacheCount, long nameCacheRebuildIntervalSeconds) { + this.dbMaxCacheCount = dbMaxCacheCount; + this.tblMaxCacheCount = tblMaxCacheCount; + this.cacheExpiryTimeMs = nameCacheRebuildIntervalSeconds <= 0 ? 
Long.MAX_VALUE : (System.currentTimeMillis() + (nameCacheRebuildIntervalSeconds * 1000)); + this.knownDatabases = Collections.synchronizedSet(new HashSet<>()); + this.knownTables = Collections.synchronizedSet(new HashSet<>()); + } + + public int getCachedDbCount() { + return knownDatabases.size(); + } + + public int getCachedTableCount() { + return knownTables.size(); + } + + public boolean isCacheExpired() { + return System.currentTimeMillis() > cacheExpiryTimeMs; + } + + public boolean isKnownDatabase(String dbQualifiedName) { + return knownDatabases.contains(dbQualifiedName); + } + + public boolean isKnownTable(String tblQualifiedName) { + return knownTables.contains(tblQualifiedName); + } + + public void addToKnownEntities(Collection<AtlasEntity> entities) { + for (AtlasEntity entity : entities) { + if (StringUtils.equalsIgnoreCase(entity.getTypeName(), HIVE_TYPE_DB)) { + addToKnownDatabase((String) entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME)); + } else if (StringUtils.equalsIgnoreCase(entity.getTypeName(), HIVE_TYPE_TABLE)) { + addToKnownTable((String) entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME)); } } } - } - public void addToKnownDatabase(String dbQualifiedName) { - if (knownDatabases != null && dbQualifiedName != null) { - knownDatabases.put(dbQualifiedName, System.currentTimeMillis()); + public void addToKnownDatabase(String dbQualifiedName) { + if (dbQualifiedName != null && knownDatabases.size() < dbMaxCacheCount) { /* skip null names, as the replaced HiveHook method did */ + knownDatabases.add(dbQualifiedName); + } } - } - public void addToKnwnTable(String tblQualifiedName) { - if (knownTables != null && tblQualifiedName != null) { - knownTables.put(tblQualifiedName, System.currentTimeMillis()); + public void addToKnownTable(String tblQualifiedName) { + if (tblQualifiedName != null && knownTables.size() < tblMaxCacheCount) { /* skip null names, as the replaced HiveHook method did */ + knownTables.add(tblQualifiedName); + } } - } - public void removeFromKnownDatabase(String dbQualifiedName) { - if (knownDatabases != null && dbQualifiedName != null) { + public void removeFromKnownDatabase(String dbQualifiedName) { 
knownDatabases.remove(dbQualifiedName); } - } - public void removeFromKnownTable(String tblQualifiedName) { - if (knownTables != null && tblQualifiedName != null) { - knownTables.remove(tblQualifiedName); + public void removeFromKnownTable(String tblQualifiedName) { + if (tblQualifiedName != null) { + knownTables.remove(tblQualifiedName); + } } } }