Commit 0174bac0 by Sarath Subramanian

ATLAS-3148: Implement Hive Metastore hook for Atlas

parent 0bb18f08
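For context on how the new listener gets wired in (not shown in this diff): Hive's metastore instantiates every class listed in the hive.metastore.event.listeners property as a MetaStoreEventListener and passes it the metastore Configuration. Below is a minimal sketch of that registration, assuming only the class name introduced in this commit; in practice the property is set in hive-site.xml rather than in code.

// Illustrative sketch, not part of this commit: registering the Atlas metastore hook.
import org.apache.hadoop.conf.Configuration;

public class RegisterAtlasMetastoreHookSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Hive reads this property and constructs each listed MetaStoreEventListener with the
        // metastore Configuration -- which is what invokes HiveMetastoreHook(Configuration) below.
        conf.set("hive.metastore.event.listeners", "org.apache.atlas.hive.hook.HiveMetastoreHook");
        System.out.println(conf.get("hive.metastore.event.listeners"));
    }
}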
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.hive.hook;
import org.apache.atlas.plugin.classloader.AtlasPluginClassLoader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.MetaStoreEventListener;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.events.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Hive Metastore hook to capture DDL operations for Atlas entity registration.
*/
public class HiveMetastoreHook extends MetaStoreEventListener {
private static final String ATLAS_PLUGIN_TYPE = "hive";
private static final String ATLAS_HIVE_METASTORE_HOOK_IMPL_CLASSNAME = "org.apache.atlas.hive.hook.HiveMetastoreHookImpl";
public static final Logger LOG = LoggerFactory.getLogger(HiveMetastoreHook.class);
private AtlasPluginClassLoader atlasPluginClassLoader = null;
private MetaStoreEventListener atlasMetastoreHookImpl = null;
private Configuration config;
public HiveMetastoreHook(Configuration config) {
super(config);
this.config = config;
this.initialize();
}
private void initialize() {
if (LOG.isDebugEnabled()) {
LOG.debug("==> HiveMetastoreHook.initialize()");
}
try {
atlasPluginClassLoader = AtlasPluginClassLoader.getInstance(ATLAS_PLUGIN_TYPE, this.getClass());
@SuppressWarnings("unchecked")
Class<MetaStoreEventListener> cls = (Class<MetaStoreEventListener>)
Class.forName(ATLAS_HIVE_METASTORE_HOOK_IMPL_CLASSNAME, true, atlasPluginClassLoader);
activatePluginClassLoader();
atlasMetastoreHookImpl = cls.getDeclaredConstructor(Configuration.class).newInstance(config);
} catch (Exception ex) {
LOG.error("Error instantiating Atlas hook implementation", ex);
} finally {
deactivatePluginClassLoader();
}
if (LOG.isDebugEnabled()) {
LOG.debug("<== HiveMetastoreHook.initialize()");
}
}
@Override
public void onCreateTable(CreateTableEvent tableEvent) throws MetaException {
if (LOG.isDebugEnabled()) {
LOG.debug("==> HiveMetastoreHook.onCreateTable()");
}
try {
activatePluginClassLoader();
atlasMetastoreHookImpl.onCreateTable(tableEvent);
} finally {
deactivatePluginClassLoader();
}
if (LOG.isDebugEnabled()) {
LOG.debug("<== HiveMetastoreHook.onCreateTable()");
}
}
@Override
public void onDropTable(DropTableEvent tableEvent) throws MetaException {
if (LOG.isDebugEnabled()) {
LOG.debug("==> HiveMetastoreHook.onDropTable()");
}
try {
activatePluginClassLoader();
atlasMetastoreHookImpl.onDropTable(tableEvent);
} finally {
deactivatePluginClassLoader();
}
if (LOG.isDebugEnabled()) {
LOG.debug("<== HiveMetastoreHook.onDropTable()");
}
}
@Override
public void onAlterTable(AlterTableEvent tableEvent) throws MetaException {
if (LOG.isDebugEnabled()) {
LOG.debug("==> HiveMetastoreHook.onAlterTable()");
}
try {
activatePluginClassLoader();
atlasMetastoreHookImpl.onAlterTable(tableEvent);
} finally {
deactivatePluginClassLoader();
}
if (LOG.isDebugEnabled()) {
LOG.debug("<== HiveMetastoreHook.onAlterTable()");
}
}
@Override
public void onCreateDatabase(CreateDatabaseEvent dbEvent) throws MetaException {
if (LOG.isDebugEnabled()) {
LOG.debug("==> HiveMetastoreHook.onCreateDatabase()");
}
try {
activatePluginClassLoader();
atlasMetastoreHookImpl.onCreateDatabase(dbEvent);
} finally {
deactivatePluginClassLoader();
}
if (LOG.isDebugEnabled()) {
LOG.debug("<== HiveMetastoreHook.onCreateDatabase()");
}
}
@Override
public void onDropDatabase(DropDatabaseEvent dbEvent) throws MetaException {
if (LOG.isDebugEnabled()) {
LOG.debug("==> HiveMetastoreHook.onDropDatabase()");
}
try {
activatePluginClassLoader();
atlasMetastoreHookImpl.onDropDatabase(dbEvent);
} finally {
deactivatePluginClassLoader();
}
if (LOG.isDebugEnabled()) {
LOG.debug("<== HiveMetastoreHook.onDropDatabase()");
}
}
@Override
public void onAlterDatabase(AlterDatabaseEvent dbEvent) throws MetaException {
if (LOG.isDebugEnabled()) {
LOG.debug("==> HiveMetastoreHook.onAlterDatabase()");
}
try {
activatePluginClassLoader();
atlasMetastoreHookImpl.onAlterDatabase(dbEvent);
} finally {
deactivatePluginClassLoader();
}
if (LOG.isDebugEnabled()) {
LOG.debug("<== HiveMetastoreHook.onAlterDatabase()");
}
}
private void activatePluginClassLoader() {
if (atlasPluginClassLoader != null) {
atlasPluginClassLoader.activate();
}
}
private void deactivatePluginClassLoader() {
if (atlasPluginClassLoader != null) {
atlasPluginClassLoader.deactivate();
}
}
}
\ No newline at end of file
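Each callback above repeats the same activate / delegate / deactivate sequence around the plugin class loader, so the delegated call runs with Atlas classes visible and the metastore's own class loader is restored afterwards. A hedged sketch of how that pattern could be factored into one helper inside the class above; it uses only methods already present in this file, and the MetastoreCall interface is introduced here purely for illustration.

// Illustrative sketch, not part of this commit: wraps a delegated call in
// activatePluginClassLoader()/deactivatePluginClassLoader(), as each callback above does inline.
@FunctionalInterface
interface MetastoreCall {
    void run() throws MetaException;
}

private void delegateWithPluginClassLoader(MetastoreCall call) throws MetaException {
    activatePluginClassLoader();
    try {
        call.run();
    } finally {
        deactivatePluginClassLoader();
    }
}

// onCreateTable(), for example, could then delegate as:
//     delegateWithPluginClassLoader(() -> atlasMetastoreHookImpl.onCreateTable(tableEvent));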
......@@ -19,21 +19,25 @@
package org.apache.atlas.hive.hook;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.hive.hook.HiveMetastoreHookImpl.HiveMetastoreHook;
import org.apache.atlas.hive.hook.HiveHook.PreprocessAction;
import org.apache.atlas.hive.hook.HiveHook.HiveHookObjectNamesCache;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.hadoop.hive.metastore.IHMSHandler;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.events.*;
import org.apache.hadoop.hive.ql.hooks.HookContext;
import org.apache.hadoop.hive.ql.hooks.LineageInfo;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.plan.HiveOperation;
import org.apache.hadoop.hive.ql.session.SessionState;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.*;
import static org.apache.atlas.hive.hook.events.BaseHiveEvent.toTable;
public class AtlasHiveHookContext {
......@@ -41,6 +45,8 @@ public class AtlasHiveHookContext {
public static final char QNAME_SEP_ENTITY_NAME = '.';
public static final char QNAME_SEP_PROCESS = ':';
public static final String TEMP_TABLE_PREFIX = "_temp-";
public static final String CREATE_OPERATION = "CREATE";
public static final String ALTER_OPERATION = "ALTER";
private final HiveHook hook;
private final HiveOperation hiveOperation;
......@@ -48,17 +54,58 @@ public class AtlasHiveHookContext {
private final Hive hive;
private final Map<String, AtlasEntity> qNameEntityMap = new HashMap<>();
private final HiveHookObjectNamesCache knownObjects;
private final HiveMetastoreHook metastoreHook;
private final ListenerEvent metastoreEvent;
private final IHMSHandler metastoreHandler;
public AtlasHiveHookContext(HiveHook hook, HiveOperation hiveOperation, HookContext hiveContext,
HiveHookObjectNamesCache knownObjects) throws Exception {
this(hook, hiveOperation, hiveContext, knownObjects, null, null);
}
public AtlasHiveHookContext(HiveHook hook, HiveOperation hiveOperation, HookContext hiveContext, HiveHookObjectNamesCache knownObjects) throws Exception {
this.hook = hook;
this.hiveOperation = hiveOperation;
this.hiveContext = hiveContext;
this.hive = Hive.get(hiveContext.getConf());
this.knownObjects = knownObjects;
public AtlasHiveHookContext(HiveHook hook, HiveOperation hiveOperation, HiveHookObjectNamesCache knownObjects,
HiveMetastoreHook metastoreHook, ListenerEvent listenerEvent) throws Exception {
this(hook, hiveOperation, null, knownObjects, metastoreHook, listenerEvent);
}
public AtlasHiveHookContext(HiveHook hook, HiveOperation hiveOperation, HookContext hiveContext, HiveHookObjectNamesCache knownObjects,
HiveMetastoreHook metastoreHook, ListenerEvent listenerEvent) throws Exception {
this.hook = hook;
this.hiveOperation = hiveOperation;
this.hiveContext = hiveContext;
this.hive = hiveContext != null ? Hive.get(hiveContext.getConf()) : null;
this.knownObjects = knownObjects;
this.metastoreHook = metastoreHook;
this.metastoreEvent = listenerEvent;
this.metastoreHandler = (listenerEvent != null) ? metastoreEvent.getIHMSHandler() : null;
init();
}
public boolean isMetastoreHook() {
return metastoreHook != null;
}
public ListenerEvent getMetastoreEvent() {
return metastoreEvent;
}
public IHMSHandler getMetastoreHandler() {
return metastoreHandler;
}
public Set<ReadEntity> getInputs() {
return hiveContext != null ? hiveContext.getInputs() : Collections.emptySet();
}
public Set<WriteEntity> getOutputs() {
return hiveContext != null ? hiveContext.getOutputs() : Collections.emptySet();
}
public LineageInfo getLineageInfo() {
return hiveContext != null ? hiveContext.getLinfo() : null;
}
public HookContext getHiveContext() {
return hiveContext;
}
......@@ -147,24 +194,59 @@ public class AtlasHiveHookContext {
}
private void init() {
if (knownObjects != null) {
String operationName = hiveContext.getOperationName();
if (operationName != null && operationName.startsWith("CREATE") || operationName.startsWith("ALTER")) {
if (CollectionUtils.isNotEmpty(hiveContext.getOutputs())) {
for (WriteEntity output : hiveContext.getOutputs()) {
switch (output.getType()) {
case DATABASE:
knownObjects.removeFromKnownDatabase(getQualifiedName(output.getDatabase()));
break;
case TABLE:
knownObjects.removeFromKnownTable(getQualifiedName(output.getTable()));
break;
}
String operation = hiveOperation.getOperationName();
if (knownObjects == null || !isCreateAlterOperation(operation)) {
return;
}
List<Database> databases = new ArrayList<>();
List<Table> tables = new ArrayList<>();
if (isMetastoreHook()) {
switch (hiveOperation) {
case CREATEDATABASE:
databases.add(((CreateDatabaseEvent) metastoreEvent).getDatabase());
break;
case ALTERDATABASE:
databases.add(((AlterDatabaseEvent) metastoreEvent).getOldDatabase());
databases.add(((AlterDatabaseEvent) metastoreEvent).getNewDatabase());
break;
case CREATETABLE:
tables.add(toTable(((CreateTableEvent) metastoreEvent).getTable()));
break;
case ALTERTABLE_PROPERTIES:
case ALTERTABLE_RENAME:
case ALTERTABLE_RENAMECOL:
tables.add(toTable(((AlterTableEvent) metastoreEvent).getOldTable()));
tables.add(toTable(((AlterTableEvent) metastoreEvent).getNewTable()));
break;
}
} else {
if (getOutputs() != null) {
for (WriteEntity output : hiveContext.getOutputs()) {
switch (output.getType()) {
case DATABASE:
databases.add(output.getDatabase());
break;
case TABLE:
tables.add(output.getTable());
break;
}
}
}
}
for (Database database : databases) {
knownObjects.removeFromKnownDatabase(getQualifiedName(database));
}
for (Table table : tables) {
knownObjects.removeFromKnownTable(getQualifiedName(table));
}
}
private static boolean isCreateAlterOperation(String operationName) {
return operationName != null && (operationName.startsWith(CREATE_OPERATION) || operationName.startsWith(ALTER_OPERATION));
}
}
}
\ No newline at end of file
......@@ -146,17 +146,10 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
LOG.debug("==> HiveHook.run({})", hookContext.getOperationName());
}
if (knownObjects != null && knownObjects.isCacheExpired()) {
LOG.info("HiveHook.run(): purging cached databaseNames ({}) and tableNames ({})", knownObjects.getCachedDbCount(), knownObjects.getCachedTableCount());
knownObjects = new HiveHookObjectNamesCache(nameCacheDatabaseMaxCount, nameCacheTableMaxCount, nameCacheRebuildIntervalSeconds);
}
try {
HiveOperation oper = OPERATION_MAP.get(hookContext.getOperationName());
AtlasHiveHookContext context = new AtlasHiveHookContext(this, oper, hookContext, knownObjects);
BaseHiveEvent event = null;
AtlasHiveHookContext context = new AtlasHiveHookContext(this, oper, hookContext, getKnownObjects());
BaseHiveEvent event = null;
switch (oper) {
case CREATEDATABASE:
......@@ -169,6 +162,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
case ALTERDATABASE:
case ALTERDATABASE_OWNER:
case ALTERDATABASE_LOCATION:
event = new AlterDatabase(context);
break;
......@@ -288,6 +282,15 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
return ret;
}
public static HiveHookObjectNamesCache getKnownObjects() {
if (knownObjects != null && knownObjects.isCacheExpired()) {
LOG.info("HiveHook.run(): purging cached databaseNames ({}) and tableNames ({})", knownObjects.getCachedDbCount(), knownObjects.getCachedTableCount());
knownObjects = new HiveHook.HiveHookObjectNamesCache(nameCacheDatabaseMaxCount, nameCacheTableMaxCount, nameCacheRebuildIntervalSeconds);
}
return knownObjects;
}
public static class HiveHookObjectNamesCache {
private final int dbMaxCacheCount;
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.hive.hook;
import org.apache.atlas.hive.hook.events.*;
import org.apache.atlas.hook.AtlasHook;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.MetaStoreEventListener;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.events.*;
import org.apache.hadoop.hive.metastore.utils.SecurityUtils;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.plan.HiveOperation;
import org.apache.hadoop.hive.shims.Utils;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.atlas.hive.hook.events.AlterTableRenameCol.findRenamedColumn;
import static org.apache.atlas.hive.hook.events.BaseHiveEvent.toTable;
import static org.apache.hadoop.hive.ql.plan.HiveOperation.*;
public class HiveMetastoreHookImpl extends MetaStoreEventListener {
private static final Logger LOG = LoggerFactory.getLogger(HiveMetastoreHookImpl.class);
private final HiveHook hiveHook;
private final HiveMetastoreHook hook;
public HiveMetastoreHookImpl(Configuration config) {
super(config);
this.hiveHook = new HiveHook();
this.hook = new HiveMetastoreHook();
}
@Override
public void onCreateDatabase(CreateDatabaseEvent dbEvent) {
HiveOperationContext context = new HiveOperationContext(CREATEDATABASE, dbEvent);
hook.handleEvent(context);
}
@Override
public void onDropDatabase(DropDatabaseEvent dbEvent) {
HiveOperationContext context = new HiveOperationContext(DROPDATABASE, dbEvent);
hook.handleEvent(context);
}
@Override
public void onAlterDatabase(AlterDatabaseEvent dbEvent) {
HiveOperationContext context = new HiveOperationContext(ALTERDATABASE, dbEvent);
hook.handleEvent(context);
}
@Override
public void onCreateTable(CreateTableEvent tableEvent) {
HiveOperationContext context = new HiveOperationContext(CREATETABLE, tableEvent);
hook.handleEvent(context);
}
@Override
public void onDropTable(DropTableEvent tableEvent) {
HiveOperationContext context = new HiveOperationContext(DROPTABLE, tableEvent);
hook.handleEvent(context);
}
@Override
public void onAlterTable(AlterTableEvent tableEvent) {
HiveOperationContext context = new HiveOperationContext(tableEvent);
Table oldTable = toTable(tableEvent.getOldTable());
Table newTable = toTable(tableEvent.getNewTable());
if (isTableRename(oldTable, newTable)) {
context.setOperation(ALTERTABLE_RENAME);
} else if (isColumnRename(oldTable, newTable, context)) {
context.setOperation(ALTERTABLE_RENAMECOL);
} else {
context.setOperation(ALTERTABLE_PROPERTIES); // map other alter table operations to ALTERTABLE_PROPERTIES
}
hook.handleEvent(context);
}
public class HiveMetastoreHook extends AtlasHook {
public HiveMetastoreHook() {
}
public void handleEvent(HiveOperationContext operContext) {
ListenerEvent listenerEvent = operContext.getEvent();
if (!listenerEvent.getStatus()) {
return;
}
try {
HiveOperation oper = operContext.getOperation();
AtlasHiveHookContext context = new AtlasHiveHookContext(hiveHook, oper, hiveHook.getKnownObjects(), this, listenerEvent);
BaseHiveEvent event = null;
switch (oper) {
case CREATEDATABASE:
event = new CreateDatabase(context);
break;
case DROPDATABASE:
event = new DropDatabase(context);
break;
case ALTERDATABASE:
event = new AlterDatabase(context);
break;
case CREATETABLE:
event = new CreateTable(context, true);
break;
case DROPTABLE:
event = new DropTable(context);
break;
case ALTERTABLE_PROPERTIES:
event = new AlterTable(context);
break;
case ALTERTABLE_RENAME:
event = new AlterTableRename(context);
break;
case ALTERTABLE_RENAMECOL:
FieldSchema columnOld = operContext.getColumnOld();
FieldSchema columnNew = operContext.getColumnNew();
event = new AlterTableRenameCol(columnOld, columnNew, context);
break;
default:
if (LOG.isDebugEnabled()) {
LOG.debug("HiveMetastoreHook.handleEvent({}): operation ignored.", listenerEvent);
}
break;
}
if (event != null) {
final UserGroupInformation ugi = SecurityUtils.getUGI() == null ? Utils.getUGI() : SecurityUtils.getUGI();
super.notifyEntities(event.getNotificationMessages(), ugi);
}
} catch (Throwable t) {
LOG.error("HiveMetastoreHook.handleEvent({}): failed to process operation {}", listenerEvent, t);
}
}
}
private static boolean isTableRename(Table oldTable, Table newTable) {
String oldTableName = oldTable.getTableName();
String newTableName = newTable.getTableName();
return !StringUtils.equalsIgnoreCase(oldTableName, newTableName);
}
private static boolean isColumnRename(Table oldTable, Table newTable, HiveOperationContext context) {
FieldSchema columnOld = findRenamedColumn(oldTable, newTable);
FieldSchema columnNew = findRenamedColumn(newTable, oldTable);
boolean isColumnRename = columnOld != null && columnNew != null;
if (isColumnRename) {
context.setColumnOld(columnOld);
context.setColumnNew(columnNew);
}
return isColumnRename;
}
}
\ No newline at end of file
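The isTableRename/isColumnRename helpers above decide which ALTERTABLE_* operation an AlterTableEvent maps to: a changed table name means ALTERTABLE_RENAME, a single column present on only one side of the old/new column lists means ALTERTABLE_RENAMECOL, and everything else falls back to ALTERTABLE_PROPERTIES. Below is a self-contained sketch of that column-difference logic on hypothetical column lists; FieldSchema is the metastore thrift class already imported above, while the data and class name are made up for illustration.

// Illustrative sketch, not part of this commit: the asymmetric-difference idea behind
// findRenamedColumn(), demonstrated on plain FieldSchema lists.
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.hive.metastore.api.FieldSchema;

public class RenameDetectionSketch {
    public static void main(String[] args) {
        List<FieldSchema> oldCols = Arrays.asList(new FieldSchema("id", "int", null),
                                                  new FieldSchema("name", "string", null));
        List<FieldSchema> newCols = Arrays.asList(new FieldSchema("id", "int", null),
                                                  new FieldSchema("full_name", "string", null));

        // Column present in the old list but missing from the new list => renamed-from column ("name").
        FieldSchema renamedFrom = oldCols.stream().filter(c -> !newCols.contains(c)).findFirst().orElse(null);

        // Column present in the new list but missing from the old list => renamed-to column ("full_name").
        FieldSchema renamedTo = newCols.stream().filter(c -> !oldCols.contains(c)).findFirst().orElse(null);

        // Both non-null => the hook treats the event as ALTERTABLE_RENAMECOL.
        System.out.println(renamedFrom.getName() + " -> " + renamedTo.getName());
    }
}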
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.hive.hook;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.events.ListenerEvent;
import org.apache.hadoop.hive.ql.plan.HiveOperation;
public class HiveOperationContext {
HiveOperation operation;
ListenerEvent event;
FieldSchema columnOld;
FieldSchema columnNew;
public HiveOperationContext(ListenerEvent event) {
this(null, event);
}
public HiveOperationContext(HiveOperation operation, ListenerEvent event) {
setOperation(operation);
setEvent(event);
setColumnOld(null);
setColumnNew(null);
}
public ListenerEvent getEvent() {
return event;
}
public void setEvent(ListenerEvent event) {
this.event = event;
}
public HiveOperation getOperation() {
return operation;
}
public void setOperation(HiveOperation operation) {
this.operation = operation;
}
public FieldSchema getColumnOld() {
return columnOld;
}
public void setColumnOld(FieldSchema columnOld) {
this.columnOld = columnOld;
}
public FieldSchema getColumnNew() {
return columnNew;
}
public void setColumnNew(FieldSchema columnNew) {
this.columnNew = columnNew;
}
}
......@@ -19,15 +19,22 @@
package org.apache.atlas.hive.hook.events;
import org.apache.atlas.hive.hook.AtlasHiveHookContext;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
import org.apache.atlas.model.notification.HookNotification;
import org.apache.atlas.model.notification.HookNotification.EntityUpdateRequestV2;
import org.apache.commons.collections.CollectionUtils;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.events.AlterDatabaseEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.List;
public class AlterDatabase extends CreateDatabase {
private static final Logger LOG = LoggerFactory.getLogger(AlterDatabase.class);
public AlterDatabase(AtlasHiveHookContext context) {
super(context);
}
......@@ -35,7 +42,7 @@ public class AlterDatabase extends CreateDatabase {
@Override
public List<HookNotification> getNotificationMessages() throws Exception {
List<HookNotification> ret = null;
AtlasEntitiesWithExtInfo entities = getEntities();
AtlasEntitiesWithExtInfo entities = context.isMetastoreHook() ? getHiveMetastoreEntities() : getHiveEntities();
if (entities != null && CollectionUtils.isNotEmpty(entities.getEntities())) {
ret = Collections.singletonList(new EntityUpdateRequestV2(getUserName(), entities));
......@@ -43,4 +50,27 @@ public class AlterDatabase extends CreateDatabase {
return ret;
}
}
public AtlasEntitiesWithExtInfo getHiveMetastoreEntities() throws Exception {
AtlasEntitiesWithExtInfo ret = new AtlasEntitiesWithExtInfo();
AlterDatabaseEvent dbEvent = (AlterDatabaseEvent) context.getMetastoreEvent();
Database oldDb = dbEvent.getOldDatabase();
Database newDb = dbEvent.getNewDatabase();
if (newDb != null) {
AtlasEntity dbEntity = toDbEntity(newDb);
ret.addEntity(dbEntity);
} else {
LOG.error("AlterDatabase.getEntities(): failed to retrieve db");
}
addProcessedEntities(ret);
return ret;
}
public AtlasEntitiesWithExtInfo getHiveEntities() throws Exception {
return super.getHiveEntities();
}
}
\ No newline at end of file
......@@ -35,7 +35,7 @@ public class AlterTable extends CreateTable {
@Override
public List<HookNotification> getNotificationMessages() throws Exception {
List<HookNotification> ret = null;
AtlasEntitiesWithExtInfo entities = getEntities();
AtlasEntitiesWithExtInfo entities = context.isMetastoreHook() ? getHiveMetastoreEntities() : getHiveEntities();
if (entities != null && CollectionUtils.isNotEmpty(entities.getEntities())) {
ret = Collections.singletonList(new EntityUpdateRequestV2(getUserName(), entities));
......@@ -43,4 +43,4 @@ public class AlterTable extends CreateTable {
return ret;
}
}
}
\ No newline at end of file
......@@ -29,6 +29,7 @@ import org.apache.atlas.model.notification.HookNotification.EntityPartialUpdateR
import org.apache.atlas.model.notification.HookNotification.EntityUpdateRequestV2;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hive.metastore.events.AlterTableEvent;
import org.apache.hadoop.hive.ql.hooks.Entity;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.metadata.Table;
......@@ -49,24 +50,48 @@ public class AlterTableRename extends BaseHiveEvent {
@Override
public List<HookNotification> getNotificationMessages() throws Exception {
return context.isMetastoreHook() ? getHiveMetastoreMessages() : getHiveMessages();
}
public List<HookNotification> getHiveMetastoreMessages() throws Exception {
List<HookNotification> ret = new ArrayList<>();
AlterTableEvent tblEvent = (AlterTableEvent) context.getMetastoreEvent();
Table oldTable = toTable(tblEvent.getOldTable());
Table newTable = toTable(tblEvent.getNewTable());
if (newTable == null) {
LOG.error("AlterTableRename: renamed table not found in outputs list");
return ret;
}
processTables(oldTable, newTable, ret);
return ret;
}
public List<HookNotification> getHiveMessages() throws Exception {
List<HookNotification> ret = new ArrayList<>();
Table oldTable;
Table newTable;
if (CollectionUtils.isEmpty(getHiveContext().getInputs())) {
if (CollectionUtils.isEmpty(getInputs())) {
LOG.error("AlterTableRename: old-table not found in inputs list");
return ret;
}
Table oldTable = getHiveContext().getInputs().iterator().next().getTable();
Table newTable = null;
oldTable = getInputs().iterator().next().getTable();
newTable = null;
if (CollectionUtils.isNotEmpty(getHiveContext().getOutputs())) {
for (WriteEntity entity : getHiveContext().getOutputs()) {
if (CollectionUtils.isNotEmpty(getOutputs())) {
for (WriteEntity entity : getOutputs()) {
if (entity.getType() == Entity.Type.TABLE) {
newTable = entity.getTable();
// Hive includes both the old and new table names in the outputs; skip the entry that matches the old table.
if (StringUtils.equalsIgnoreCase(newTable.getDbName(), oldTable.getDbName()) && StringUtils.equalsIgnoreCase(newTable.getTableName(), oldTable.getTableName())) {
if (StringUtils.equalsIgnoreCase(newTable.getDbName(), oldTable.getDbName()) &&
StringUtils.equalsIgnoreCase(newTable.getTableName(), oldTable.getTableName())) {
newTable = null;
continue;
......@@ -85,11 +110,17 @@ public class AlterTableRename extends BaseHiveEvent {
return ret;
}
processTables(oldTable, newTable, ret);
return ret;
}
private void processTables(Table oldTable, Table newTable, List<HookNotification> ret) throws Exception {
AtlasEntityWithExtInfo oldTableEntity = toTableEntity(oldTable);
AtlasEntityWithExtInfo renamedTableEntity = toTableEntity(newTable);
if (oldTableEntity == null || renamedTableEntity == null) {
return ret;
return;
}
// first update with oldTable info, so that the table will be created if it is not present in Atlas
......@@ -110,14 +141,13 @@ public class AlterTableRename extends BaseHiveEvent {
// set previous name as the alias
renamedTableEntity.getEntity().setAttribute(ATTRIBUTE_ALIASES, Collections.singletonList(oldTable.getTableName()));
AtlasObjectId oldTableId = new AtlasObjectId(oldTableEntity.getEntity().getTypeName(), ATTRIBUTE_QUALIFIED_NAME, oldTableEntity.getEntity().getAttribute(ATTRIBUTE_QUALIFIED_NAME));
String oldTableQualifiedName = (String) oldTableEntity.getEntity().getAttribute(ATTRIBUTE_QUALIFIED_NAME);
AtlasObjectId oldTableId = new AtlasObjectId(oldTableEntity.getEntity().getTypeName(), ATTRIBUTE_QUALIFIED_NAME, oldTableQualifiedName);
// update qualifiedName and other attributes (like params - which include lastModifiedTime, lastModifiedBy) of the table
ret.add(new EntityPartialUpdateRequestV2(getUserName(), oldTableId, renamedTableEntity));
context.removeFromKnownTable((String) oldTableEntity.getEntity().getAttribute(ATTRIBUTE_QUALIFIED_NAME));
return ret;
context.removeFromKnownTable(oldTableQualifiedName);
}
private void renameColumns(List<AtlasObjectId> columns, AtlasEntityExtInfo oldEntityExtInfo, String newTableQualifiedName, List<HookNotification> notifications) {
......
......@@ -26,6 +26,7 @@ import org.apache.atlas.model.notification.HookNotification;
import org.apache.atlas.model.notification.HookNotification.EntityPartialUpdateRequestV2;
import org.apache.commons.collections.CollectionUtils;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.events.AlterTableEvent;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -34,64 +35,79 @@ import java.util.ArrayList;
import java.util.List;
public class AlterTableRenameCol extends AlterTable {
private static final Logger LOG = LoggerFactory.getLogger(AlterTableRenameCol.class);
private static final Logger LOG = LoggerFactory.getLogger(AlterTableRenameCol.class);
private final FieldSchema columnOld;
private final FieldSchema columnNew;
public AlterTableRenameCol(AtlasHiveHookContext context) {
this(null, null, context);
}
public AlterTableRenameCol(FieldSchema columnOld, FieldSchema columnNew, AtlasHiveHookContext context) {
super(context);
this.columnOld = columnOld;
this.columnNew = columnNew;
}
@Override
public List<HookNotification> getNotificationMessages() throws Exception {
if (CollectionUtils.isEmpty(getHiveContext().getInputs())) {
return context.isMetastoreHook() ? getHiveMetastoreMessages() : getHiveMessages();
}
public List<HookNotification> getHiveMetastoreMessages() throws Exception {
List<HookNotification> baseMsgs = super.getNotificationMessages();
List<HookNotification> ret = new ArrayList<>(baseMsgs);
AlterTableEvent tblEvent = (AlterTableEvent) context.getMetastoreEvent();
Table oldTable = toTable(tblEvent.getOldTable());
Table newTable = toTable(tblEvent.getNewTable());
processColumns(oldTable, newTable, ret);
return ret;
}
public List<HookNotification> getHiveMessages() throws Exception {
List<HookNotification> baseMsgs = super.getNotificationMessages();
List<HookNotification> ret = new ArrayList<>(baseMsgs);
if (CollectionUtils.isEmpty(getInputs())) {
LOG.error("AlterTableRenameCol: old-table not found in inputs list");
return null;
}
if (CollectionUtils.isEmpty(getHiveContext().getOutputs())) {
if (CollectionUtils.isEmpty(getOutputs())) {
LOG.error("AlterTableRenameCol: new-table not found in outputs list");
return null;
}
List<HookNotification> baseMsgs = super.getNotificationMessages();
if (CollectionUtils.isEmpty(baseMsgs)) {
LOG.debug("Skipped processing of column-rename (on a temporary table?)");
return null;
}
List<HookNotification> ret = new ArrayList<>(baseMsgs);
Table oldTable = getHiveContext().getInputs().iterator().next().getTable();
Table newTable = getHiveContext().getOutputs().iterator().next().getTable();
newTable = getHive().getTable(newTable.getDbName(), newTable.getTableName());
List<FieldSchema> oldColumns = oldTable.getCols();
List<FieldSchema> newColumns = newTable.getCols();
FieldSchema changedColumnOld = null;
FieldSchema changedColumnNew = null;
for (FieldSchema oldColumn : oldColumns) {
if (!newColumns.contains(oldColumn)) {
changedColumnOld = oldColumn;
Table oldTable = getInputs().iterator().next().getTable();
Table newTable = getOutputs().iterator().next().getTable();
break;
}
if (newTable != null) {
newTable = getHive().getTable(newTable.getDbName(), newTable.getTableName());
}
for (FieldSchema newColumn : newColumns) {
if (!oldColumns.contains(newColumn)) {
changedColumnNew = newColumn;
processColumns(oldTable, newTable, ret);
break;
}
}
return ret;
}
private void processColumns(Table oldTable, Table newTable, List<HookNotification> ret) {
FieldSchema changedColumnOld = (columnOld == null) ? findRenamedColumn(oldTable, newTable) : columnOld;
FieldSchema changedColumnNew = (columnNew == null) ? findRenamedColumn(newTable, oldTable) : columnNew;
if (changedColumnOld != null && changedColumnNew != null) {
AtlasObjectId oldColumnId = new AtlasObjectId(HIVE_TYPE_COLUMN, ATTRIBUTE_QUALIFIED_NAME, getQualifiedName(oldTable, changedColumnOld));
AtlasEntity newColumn = new AtlasEntity(HIVE_TYPE_COLUMN);
AtlasEntity newColumn = new AtlasEntity(HIVE_TYPE_COLUMN);
newColumn.setAttribute(ATTRIBUTE_NAME, changedColumnNew.getName());
newColumn.setAttribute(ATTRIBUTE_QUALIFIED_NAME, getQualifiedName(newTable, changedColumnNew));
......@@ -100,7 +116,21 @@ public class AlterTableRenameCol extends AlterTable {
} else {
LOG.error("AlterTableRenameCol: no renamed column detected");
}
}
public static FieldSchema findRenamedColumn(Table inputTable, Table outputTable) {
FieldSchema ret = null;
List<FieldSchema> inputColumns = inputTable.getCols();
List<FieldSchema> outputColumns = outputTable.getCols();
for (FieldSchema inputColumn : inputColumns) {
if (!outputColumns.contains(inputColumn)) {
ret = inputColumn;
break;
}
}
return ret;
}
}
}
\ No newline at end of file
......@@ -37,11 +37,10 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Order;
import org.apache.hadoop.hive.metastore.api.SerDeInfo;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.ql.hooks.Entity;
import org.apache.hadoop.hive.ql.hooks.HookContext;
import org.apache.hadoop.hive.metastore.utils.SecurityUtils;
import org.apache.hadoop.hive.ql.hooks.*;
import org.apache.hadoop.hive.ql.hooks.LineageInfo.BaseColumnInfo;
import org.apache.hadoop.hive.ql.hooks.LineageInfo.DependencyKey;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.plan.HiveOperation;
......@@ -328,7 +327,8 @@ public abstract class BaseHiveEvent {
}
protected AtlasEntity toTableEntity(Table table, AtlasEntityExtInfo entityExtInfo) throws Exception {
AtlasEntity dbEntity = toDbEntity(getHive().getDatabase(table.getDbName()));
Database db = getDatabases(table.getDbName());
AtlasEntity dbEntity = toDbEntity(db);
if (entityExtInfo != null) {
if (dbEntity != null) {
......@@ -594,8 +594,7 @@ public abstract class BaseHiveEvent {
protected AtlasEntity getHiveProcessEntity(List<AtlasEntity> inputs, List<AtlasEntity> outputs) throws Exception {
AtlasEntity ret = new AtlasEntity(HIVE_TYPE_PROCESS);
HookContext hookContext = getHiveContext();
String queryStr = hookContext.getQueryPlan().getQueryStr();
String queryStr = getQueryString();
if (queryStr != null) {
queryStr = queryStr.toLowerCase().trim();
......@@ -605,12 +604,12 @@ public abstract class BaseHiveEvent {
ret.setAttribute(ATTRIBUTE_INPUTS, getObjectIds(inputs));
ret.setAttribute(ATTRIBUTE_OUTPUTS, getObjectIds(outputs));
ret.setAttribute(ATTRIBUTE_NAME, queryStr);
ret.setAttribute(ATTRIBUTE_OPERATION_TYPE, hookContext.getOperationName());
ret.setAttribute(ATTRIBUTE_START_TIME, hookContext.getQueryPlan().getQueryStartTime());
ret.setAttribute(ATTRIBUTE_OPERATION_TYPE, getOperationName());
ret.setAttribute(ATTRIBUTE_START_TIME, getQueryStartTime());
ret.setAttribute(ATTRIBUTE_END_TIME, System.currentTimeMillis());
ret.setAttribute(ATTRIBUTE_USER_NAME, getUserName());
ret.setAttribute(ATTRIBUTE_QUERY_TEXT, queryStr);
ret.setAttribute(ATTRIBUTE_QUERY_ID, hookContext.getQueryPlan().getQuery().getQueryId());
ret.setAttribute(ATTRIBUTE_QUERY_ID, getQueryId());
ret.setAttribute(ATTRIBUTE_QUERY_PLAN, "Not Supported");
ret.setAttribute(ATTRIBUTE_RECENT_QUERIES, Collections.singletonList(queryStr));
......@@ -621,34 +620,86 @@ public abstract class BaseHiveEvent {
return context.getClusterName();
}
protected Database getDatabases(String dbName) throws Exception {
return context.isMetastoreHook() ? context.getMetastoreHandler().get_database(dbName) :
context.getHive().getDatabase(dbName);
}
protected Hive getHive() {
return context.getHive();
}
protected HookContext getHiveContext() {
return context.getHiveContext();
protected Set<ReadEntity> getInputs() {
return context != null ? context.getInputs() : Collections.emptySet();
}
protected String getUserName() {
String ret = getHiveContext().getUserName();
protected Set<WriteEntity> getOutputs() {
return context != null ? context.getOutputs() : Collections.emptySet();
}
if (StringUtils.isEmpty(ret)) {
UserGroupInformation ugi = getHiveContext().getUgi();
protected LineageInfo getLineageInfo() {
return context != null ? context.getLineageInfo() : null;
}
protected String getQueryString() {
return isHiveContextValid() ? context.getHiveContext().getQueryPlan().getQueryStr() : null;
}
protected String getOperationName() {
return isHiveContextValid() ? context.getHiveContext().getOperationName() : null;
}
protected String getHiveUserName() {
return isHiveContextValid() ? context.getHiveContext().getUserName() : null;
}
protected UserGroupInformation getUgi() {
return isHiveContextValid() ? context.getHiveContext().getUgi() : null;
}
protected Long getQueryStartTime() {
return isHiveContextValid() ? context.getHiveContext().getQueryPlan().getQueryStartTime() : null;
}
protected String getQueryId() {
return isHiveContextValid() ? context.getHiveContext().getQueryPlan().getQueryId() : null;
}
private boolean isHiveContextValid() {
return context != null && context.getHiveContext() != null;
}
protected String getUserName() {
String ret = null;
UserGroupInformation ugi = null;
if (ugi != null) {
ret = ugi.getShortUserName();
if (context.isMetastoreHook()) {
try {
ugi = SecurityUtils.getUGI();
} catch (Exception e) {
// ignore; fall back to UserGroupInformation.getCurrentUser() below
}
} else {
ret = getHiveUserName();
if (StringUtils.isEmpty(ret)) {
try {
ret = UserGroupInformation.getCurrentUser().getShortUserName();
} catch (IOException e) {
LOG.warn("Failed for UserGroupInformation.getCurrentUser() ", e);
ret = System.getProperty("user.name");
}
ugi = getUgi();
}
}
if (ugi != null) {
ret = ugi.getShortUserName();
}
if (StringUtils.isEmpty(ret)) {
try {
ret = UserGroupInformation.getCurrentUser().getShortUserName();
} catch (IOException e) {
LOG.warn("Failed for UserGroupInformation.getCurrentUser() ", e);
ret = System.getProperty("user.name");
}
}
return ret;
}
......@@ -757,7 +808,7 @@ public abstract class BaseHiveEvent {
operation == HiveOperation.CREATEVIEW ||
operation == HiveOperation.ALTERVIEW_AS ||
operation == HiveOperation.ALTERTABLE_LOCATION) {
List<? extends Entity> sortedEntities = new ArrayList<>(getHiveContext().getOutputs());
List<? extends Entity> sortedEntities = new ArrayList<>(getOutputs());
Collections.sort(sortedEntities, entityComparator);
......@@ -774,15 +825,23 @@ public abstract class BaseHiveEvent {
}
}
StringBuilder sb = new StringBuilder(getHiveContext().getOperationName());
String qualifiedName = null;
String operationName = getOperationName();
boolean ignoreHDFSPaths = ignoreHDFSPathsinProcessQualifiedName();
if (operationName != null) {
StringBuilder sb = new StringBuilder(operationName);
addToProcessQualifiedName(sb, getHiveContext().getInputs(), ignoreHDFSPaths);
sb.append("->");
addToProcessQualifiedName(sb, getHiveContext().getOutputs(), ignoreHDFSPaths);
boolean ignoreHDFSPaths = ignoreHDFSPathsinProcessQualifiedName();
return sb.toString();
addToProcessQualifiedName(sb, getInputs(), ignoreHDFSPaths);
sb.append("->");
addToProcessQualifiedName(sb, getOutputs(), ignoreHDFSPaths);
qualifiedName = sb.toString();
}
return qualifiedName;
}
protected AtlasEntity toReferencedHBaseTable(Table table, AtlasEntitiesWithExtInfo entities) {
......@@ -836,9 +895,9 @@ public abstract class BaseHiveEvent {
switch (context.getHiveOperation()) {
case LOAD:
case IMPORT:
return hasPartitionEntity(getHiveContext().getOutputs());
return hasPartitionEntity(getOutputs());
case EXPORT:
return hasPartitionEntity(getHiveContext().getInputs());
return hasPartitionEntity(getInputs());
case QUERY:
return true;
}
......@@ -1006,4 +1065,8 @@ public abstract class BaseHiveEvent {
return hbaseTableName;
}
}
public static Table toTable(org.apache.hadoop.hive.metastore.api.Table table) {
return new Table(table);
}
}
......@@ -25,6 +25,7 @@ import org.apache.atlas.model.notification.HookNotification;
import org.apache.atlas.model.notification.HookNotification.EntityCreateRequestV2;
import org.apache.commons.collections.CollectionUtils;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.events.CreateDatabaseEvent;
import org.apache.hadoop.hive.ql.hooks.Entity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -32,6 +33,8 @@ import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.List;
import static org.apache.hadoop.hive.ql.hooks.Entity.Type.DATABASE;
public class CreateDatabase extends BaseHiveEvent {
private static final Logger LOG = LoggerFactory.getLogger(CreateDatabase.class);
......@@ -42,7 +45,7 @@ public class CreateDatabase extends BaseHiveEvent {
@Override
public List<HookNotification> getNotificationMessages() throws Exception {
List<HookNotification> ret = null;
AtlasEntitiesWithExtInfo entities = getEntities();
AtlasEntitiesWithExtInfo entities = context.isMetastoreHook() ? getHiveMetastoreEntities() : getHiveEntities();
if (entities != null && CollectionUtils.isNotEmpty(entities.getEntities())) {
ret = Collections.singletonList(new EntityCreateRequestV2(getUserName(), entities));
......@@ -51,11 +54,29 @@ public class CreateDatabase extends BaseHiveEvent {
return ret;
}
public AtlasEntitiesWithExtInfo getEntities() throws Exception {
public AtlasEntitiesWithExtInfo getHiveMetastoreEntities() throws Exception {
AtlasEntitiesWithExtInfo ret = new AtlasEntitiesWithExtInfo();
CreateDatabaseEvent dbEvent = (CreateDatabaseEvent) context.getMetastoreEvent();
Database db = dbEvent.getDatabase();
if (db != null) {
AtlasEntity dbEntity = toDbEntity(db);
ret.addEntity(dbEntity);
} else {
LOG.error("CreateDatabase.getEntities(): failed to retrieve db");
}
addProcessedEntities(ret);
return ret;
}
public AtlasEntitiesWithExtInfo getHiveEntities() throws Exception {
AtlasEntitiesWithExtInfo ret = new AtlasEntitiesWithExtInfo();
for (Entity entity : getHiveContext().getOutputs()) {
if (entity.getType() == Entity.Type.DATABASE) {
for (Entity entity : getOutputs()) {
if (entity.getType() == DATABASE) {
Database db = entity.getDatabase();
if (db != null) {
......@@ -76,4 +97,4 @@ public class CreateDatabase extends BaseHiveEvent {
return ret;
}
}
}
\ No newline at end of file
......@@ -72,13 +72,12 @@ public class CreateHiveProcess extends BaseHiveEvent {
if (!skipProcess()) {
List<AtlasEntity> inputs = new ArrayList<>();
List<AtlasEntity> outputs = new ArrayList<>();
HookContext hiveContext = getHiveContext();
Set<String> processedNames = new HashSet<>();
ret = new AtlasEntitiesWithExtInfo();
if (hiveContext.getInputs() != null) {
for (ReadEntity input : hiveContext.getInputs()) {
if (getInputs() != null) {
for (ReadEntity input : getInputs()) {
String qualifiedName = getQualifiedName(input);
if (qualifiedName == null || !processedNames.add(qualifiedName)) {
......@@ -97,8 +96,8 @@ public class CreateHiveProcess extends BaseHiveEvent {
}
}
if (hiveContext.getOutputs() != null) {
for (WriteEntity output : hiveContext.getOutputs()) {
if (getOutputs() != null) {
for (WriteEntity output : getOutputs()) {
String qualifiedName = getQualifiedName(output);
if (qualifiedName == null || !processedNames.add(qualifiedName)) {
......@@ -130,7 +129,7 @@ public class CreateHiveProcess extends BaseHiveEvent {
}
private void processColumnLineage(AtlasEntity hiveProcess, AtlasEntitiesWithExtInfo entities) {
LineageInfo lineageInfo = getHiveContext().getLinfo();
LineageInfo lineageInfo = getLineageInfo();
if (lineageInfo == null || CollectionUtils.isEmpty(lineageInfo.entrySet())) {
return;
......@@ -235,8 +234,8 @@ public class CreateHiveProcess extends BaseHiveEvent {
private boolean skipProcess() {
Set<ReadEntity> inputs = getHiveContext().getInputs();
Set<WriteEntity> outputs = getHiveContext().getOutputs();
Set<ReadEntity> inputs = getInputs();
Set<WriteEntity> outputs = getOutputs();
boolean ret = CollectionUtils.isEmpty(inputs) && CollectionUtils.isEmpty(outputs);
......
......@@ -24,14 +24,19 @@ import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
import org.apache.atlas.model.notification.HookNotification;
import org.apache.atlas.model.notification.HookNotification.EntityCreateRequestV2;
import org.apache.commons.collections.CollectionUtils;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.events.AlterTableEvent;
import org.apache.hadoop.hive.metastore.events.CreateTableEvent;
import org.apache.hadoop.hive.metastore.events.ListenerEvent;
import org.apache.hadoop.hive.ql.hooks.Entity;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.plan.HiveOperation;
import java.util.Collections;
import java.util.List;
import static org.apache.hadoop.hive.metastore.TableType.EXTERNAL_TABLE;
import static org.apache.hadoop.hive.ql.plan.HiveOperation.*;
public class CreateTable extends BaseHiveEvent {
private final boolean skipTempTables;
......@@ -44,7 +49,7 @@ public class CreateTable extends BaseHiveEvent {
@Override
public List<HookNotification> getNotificationMessages() throws Exception {
List<HookNotification> ret = null;
AtlasEntitiesWithExtInfo entities = getEntities();
AtlasEntitiesWithExtInfo entities = context.isMetastoreHook() ? getHiveMetastoreEntities() : getHiveEntities();
if (entities != null && CollectionUtils.isNotEmpty(entities.getEntities())) {
ret = Collections.singletonList(new EntityCreateRequestV2(getUserName(), entities));
......@@ -53,31 +58,62 @@ public class CreateTable extends BaseHiveEvent {
return ret;
}
public AtlasEntitiesWithExtInfo getEntities() throws Exception {
public AtlasEntitiesWithExtInfo getHiveMetastoreEntities() throws Exception {
AtlasEntitiesWithExtInfo ret = new AtlasEntitiesWithExtInfo();
Database db = null;
Table table = null;
ListenerEvent event = context.getMetastoreEvent();
HiveOperation oper = context.getHiveOperation();
Table table;
if (isAlterTable(oper)) {
table = toTable(((AlterTableEvent) event).getNewTable());
} else {
table = toTable(((CreateTableEvent) event).getTable());
}
if (skipTemporaryTable(table)) {
table = null;
}
processTable(table, ret);
for (Entity entity : getHiveContext().getOutputs()) {
if (entity.getType() == Entity.Type.TABLE) {
table = entity.getTable();
addProcessedEntities(ret);
return ret;
}
public AtlasEntitiesWithExtInfo getHiveEntities() throws Exception {
AtlasEntitiesWithExtInfo ret = new AtlasEntitiesWithExtInfo();
Table table = null;
if (table != null) {
db = getHive().getDatabase(table.getDbName());
table = getHive().getTable(table.getDbName(), table.getTableName());
if (CollectionUtils.isNotEmpty(getOutputs())) {
for (Entity entity : getOutputs()) {
if (entity.getType() == Entity.Type.TABLE) {
table = entity.getTable();
if (table != null) {
// If its an external table, even though the temp table skip flag is on, we create the table since we need the HDFS path to temp table lineage.
if (skipTempTables && table.isTemporary() && !TableType.EXTERNAL_TABLE.equals(table.getTableType())) {
table = null;
} else {
break;
table = getHive().getTable(table.getDbName(), table.getTableName());
if (table != null) {
if (skipTemporaryTable(table)) {
table = null;
} else {
break;
}
}
}
}
}
}
processTable(table, ret);
addProcessedEntities(ret);
return ret;
}
// create process entities for lineages from HBase/HDFS to hive table
private void processTable(Table table, AtlasEntitiesWithExtInfo ret) throws Exception {
if (table != null) {
AtlasEntity tblEntity = toTableEntity(table, ret);
......@@ -89,7 +125,7 @@ public class CreateTable extends BaseHiveEvent {
if (hbaseTableEntity != null) {
final AtlasEntity processEntity;
if (TableType.EXTERNAL_TABLE.equals(table.getTableType())) {
if (EXTERNAL_TABLE.equals(table.getTableType())) {
processEntity = getHiveProcessEntity(Collections.singletonList(hbaseTableEntity), Collections.singletonList(tblEntity));
} else {
processEntity = getHiveProcessEntity(Collections.singletonList(tblEntity), Collections.singletonList(hbaseTableEntity));
......@@ -98,7 +134,7 @@ public class CreateTable extends BaseHiveEvent {
ret.addEntity(processEntity);
}
} else {
if (TableType.EXTERNAL_TABLE.equals(table.getTableType())) {
if (EXTERNAL_TABLE.equals(table.getTableType())) {
AtlasEntity hdfsPathEntity = getPathEntity(table.getDataLocation(), ret);
AtlasEntity processEntity = getHiveProcessEntity(Collections.singletonList(hdfsPathEntity), Collections.singletonList(tblEntity));
......@@ -108,9 +144,14 @@ public class CreateTable extends BaseHiveEvent {
}
}
}
}
addProcessedEntities(ret);
private static boolean isAlterTable(HiveOperation oper) {
return (oper == ALTERTABLE_PROPERTIES || oper == ALTERTABLE_RENAME || oper == ALTERTABLE_RENAMECOL);
}
return ret;
private boolean skipTemporaryTable(Table table) {
// If it's an external table, create it even when the temp-table skip flag is on, since we need the HDFS path to table lineage.
return table != null && skipTempTables && table.isTemporary() && !EXTERNAL_TABLE.equals(table.getTableType());
}
}
......@@ -23,21 +23,25 @@ import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.notification.HookNotification;
import org.apache.atlas.model.notification.HookNotification.EntityDeleteRequestV2;
import org.apache.commons.collections.CollectionUtils;
import org.apache.hadoop.hive.metastore.events.DropDatabaseEvent;
import org.apache.hadoop.hive.ql.hooks.Entity;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import static org.apache.hadoop.hive.ql.hooks.Entity.Type.DATABASE;
import static org.apache.hadoop.hive.ql.hooks.Entity.Type.TABLE;
public class DropDatabase extends BaseHiveEvent {
public DropDatabase(AtlasHiveHookContext context) {
super(context);
}
@Override
public List<HookNotification> getNotificationMessages() throws Exception {
public List<HookNotification> getNotificationMessages() {
List<HookNotification> ret = null;
List<AtlasObjectId> entities = getEntities();
List<AtlasObjectId> entities = context.isMetastoreHook() ? getHiveMetastoreEntities() : getHiveEntities();
if (CollectionUtils.isNotEmpty(entities)) {
ret = new ArrayList<>(entities.size());
......@@ -50,27 +54,40 @@ public class DropDatabase extends BaseHiveEvent {
return ret;
}
public List<AtlasObjectId> getEntities() throws Exception {
private List<AtlasObjectId> getHiveMetastoreEntities() {
List<AtlasObjectId> ret = new ArrayList<>();
DropDatabaseEvent dbEvent = (DropDatabaseEvent) context.getMetastoreEvent();
String dbQName = getQualifiedName(dbEvent.getDatabase());
AtlasObjectId dbId = new AtlasObjectId(HIVE_TYPE_DB, ATTRIBUTE_QUALIFIED_NAME, dbQName);
context.removeFromKnownDatabase(dbQName);
ret.add(dbId);
return ret;
}
private List<AtlasObjectId> getHiveEntities() {
List<AtlasObjectId> ret = new ArrayList<>();
for (Entity entity : getHiveContext().getOutputs()) {
if (entity.getType() == Entity.Type.DATABASE) {
for (Entity entity : getOutputs()) {
if (entity.getType() == DATABASE) {
String dbQName = getQualifiedName(entity.getDatabase());
AtlasObjectId dbId = new AtlasObjectId(HIVE_TYPE_DB, ATTRIBUTE_QUALIFIED_NAME, dbQName);
context.removeFromKnownDatabase(dbQName);
ret.add(dbId);
} else if (entity.getType() == Entity.Type.TABLE) {
} else if (entity.getType() == TABLE) {
String tblQName = getQualifiedName(entity.getTable());
AtlasObjectId dbId = new AtlasObjectId(HIVE_TYPE_TABLE, ATTRIBUTE_QUALIFIED_NAME, tblQName);
AtlasObjectId tblId = new AtlasObjectId(HIVE_TYPE_TABLE, ATTRIBUTE_QUALIFIED_NAME, tblQName);
context.removeFromKnownTable(tblQName);
ret.add(dbId);
ret.add(tblId);
}
}
return ret;
}
}
}
\ No newline at end of file
......@@ -23,7 +23,9 @@ import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.notification.HookNotification;
import org.apache.atlas.model.notification.HookNotification.EntityDeleteRequestV2;
import org.apache.commons.collections.CollectionUtils;
import org.apache.hadoop.hive.metastore.events.DropTableEvent;
import org.apache.hadoop.hive.ql.hooks.Entity;
import org.apache.hadoop.hive.ql.metadata.Table;
import java.util.ArrayList;
import java.util.Collections;
......@@ -35,9 +37,9 @@ public class DropTable extends BaseHiveEvent {
}
@Override
public List<HookNotification> getNotificationMessages() throws Exception {
public List<HookNotification> getNotificationMessages() {
List<HookNotification> ret = null;
List<AtlasObjectId> entities = getEntities();
List<AtlasObjectId> entities = context.isMetastoreHook() ? getHiveMetastoreEntities() : getHiveEntities();
if (CollectionUtils.isNotEmpty(entities)) {
ret = new ArrayList<>(entities.size());
......@@ -50,20 +52,34 @@ public class DropTable extends BaseHiveEvent {
return ret;
}
public List<AtlasObjectId> getEntities() throws Exception {
public List<AtlasObjectId> getHiveMetastoreEntities() {
List<AtlasObjectId> ret = new ArrayList<>();
DropTableEvent tblEvent = (DropTableEvent) context.getMetastoreEvent();
Table table = new Table(tblEvent.getTable());
String tblQName = getQualifiedName(table);
AtlasObjectId tblId = new AtlasObjectId(HIVE_TYPE_TABLE, ATTRIBUTE_QUALIFIED_NAME, tblQName);
context.removeFromKnownTable(tblQName);
ret.add(tblId);
return ret;
}
public List<AtlasObjectId> getHiveEntities() {
List<AtlasObjectId> ret = new ArrayList<>();
for (Entity entity : getHiveContext().getOutputs()) {
for (Entity entity : getOutputs()) {
if (entity.getType() == Entity.Type.TABLE) {
String tblQName = getQualifiedName(entity.getTable());
AtlasObjectId dbId = new AtlasObjectId(HIVE_TYPE_TABLE, ATTRIBUTE_QUALIFIED_NAME, tblQName);
AtlasObjectId tblId = new AtlasObjectId(HIVE_TYPE_TABLE, ATTRIBUTE_QUALIFIED_NAME, tblQName);
context.removeFromKnownTable(tblQName);
ret.add(dbId);
ret.add(tblId);
}
}
return ret;
}
}
}
\ No newline at end of file