Commit b05b8556 by rmani; committed by apoorvnaik

ATLAS-1805: Provide an Atlas hook to send HBase namespace/table/column-family metadata to Atlas

parent d9b2bd06
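To enable the hook introduced by this commit, the coprocessor is registered on the HBase master. A minimal sketch of the hbase-site.xml entry, assuming the shim jar and the atlas-plugin-classloader jar are already on the master's classpath:

<property>
  <name>hbase.coprocessor.master.classes</name>
  <value>org.apache.atlas.hbase.hook.HBaseAtlasCoprocessor</value>
</property>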
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>apache-atlas</artifactId>
<groupId>org.apache.atlas</groupId>
<version>0.8.2-SNAPSHOT</version>
<relativePath>../../</relativePath>
</parent>
<artifactId>hbase-bridge-shim</artifactId>
<description>Apache Atlas HBase Bridge Shim Module</description>
<name>Apache Atlas HBase Bridge Shim</name>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.atlas</groupId>
<artifactId>atlas-plugin-classloader</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>${hbase.version}</version>
</dependency>
</dependencies>
</project>
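The shim module above deliberately depends only on atlas-plugin-classloader and hbase-server: it is the thin jar deployed next to HBase, and it loads the full hbase-bridge implementation through an isolated classloader so that the bridge's dependencies (Scala, Kafka, json4s, and so on) do not leak onto HBase's classpath. Below is a sketch of that delegation pattern, assuming an AtlasPluginClassLoader API with getInstance/activate/deactivate from the atlas-plugin-classloader module; the class and field names are illustrative, not the committed shim source:

// Sketch only, not the committed shim: shows how a shim coprocessor would
// delegate to the real implementation loaded via the plugin classloader.
package org.apache.atlas.hbase.hook;

import org.apache.atlas.plugin.classloader.AtlasPluginClassLoader; // assumed package/API

public class HBaseAtlasCoprocessorShimSketch {
    // fully-qualified name of the implementation class shipped in hbase-bridge (shown later in this diff)
    private static final String IMPL_CLASS_NAME = "org.apache.atlas.hbase.hook.HBaseAtlasCoprocessor";

    private AtlasPluginClassLoader pluginClassLoader;
    private Object impl; // real coprocessor instance, created inside the plugin classloader

    private void init() throws Exception {
        pluginClassLoader = AtlasPluginClassLoader.getInstance("hbase", this.getClass()); // assumed signature
        Class<?> cls = Class.forName(IMPL_CLASS_NAME, true, pluginClassLoader);

        pluginClassLoader.activate();       // make the plugin classloader the thread's context classloader
        try {
            impl = cls.newInstance();       // every observer callback is then forwarded to 'impl'
        } finally {
            pluginClassLoader.deactivate(); // restore the original context classloader
        }
    }
}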
(One file's source diff is omitted from this commit view because it is too large to display.)
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>apache-atlas</artifactId>
<groupId>org.apache.atlas</groupId>
<version>0.8.2-SNAPSHOT</version>
<relativePath>../../</relativePath>
</parent>
<artifactId>hbase-bridge</artifactId>
<description>Apache Atlas HBase Bridge Module</description>
<name>Apache Atlas HBase Bridge</name>
<packaging>jar</packaging>
<properties>
<hbase.version>1.2.1</hbase.version>
<calcite.version>0.9.2-incubating</calcite.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>${hbase.version}</version>
</dependency>
<dependency>
<groupId>org.apache.atlas</groupId>
<artifactId>atlas-client-v1</artifactId>
</dependency>
<dependency>
<groupId>org.apache.atlas</groupId>
<artifactId>atlas-notification</artifactId>
</dependency>
<dependency>
<groupId>org.apache.atlas</groupId>
<artifactId>hdfs-model</artifactId>
</dependency>
<!-- to bring up atlas server for integration tests -->
<dependency>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-bundle</artifactId>
<version>1.19</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.atlas</groupId>
<artifactId>atlas-webapp</artifactId>
<type>war</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-annotations</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-minicluster</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
<version>4.4.6</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-webapp</artifactId>
<version>${jetty.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.atlas</groupId>
<artifactId>atlas-typesystem</artifactId>
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>${hbase.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>${hbase.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-common</artifactId>
<version>${hbase.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-hadoop2-compat</artifactId>
<version>${hbase.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-hadoop-compat</artifactId>
<version>${hbase.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>12.0.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-auth</artifactId>
<version>${hadoop.version}</version>
<scope>compile</scope>
</dependency>
</dependencies>
<profiles>
<profile>
<id>dist</id>
<build>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.0</version>
<executions>
<execution>
<id>scala-compile-first</id>
<phase>process-resources</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>scala-test-compile-first</id>
<phase>process-test-resources</phase>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
<configuration>
<scalaVersion>${scala.version}</scalaVersion>
<recompileMode>incremental</recompileMode>
<useZincServer>true</useZincServer>
<source>1.7</source>
<target>1.7</target>
<args>
<arg>-unchecked</arg>
<arg>-deprecation</arg>
<arg>-feature</arg>
</args>
<jvmArgs>
<jvmArg>-Xmx512m</jvmArg>
<jvmArg>-XX:MaxPermSize=128m</jvmArg>
</jvmArgs>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<id>copy-hook</id>
<phase>package</phase>
<goals>
<goal>copy</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/dependency/hook/hbase/atlas-hbase-plugin-impl</outputDirectory>
<overWriteReleases>false</overWriteReleases>
<overWriteSnapshots>false</overWriteSnapshots>
<overWriteIfNewer>true</overWriteIfNewer>
<artifactItems>
<artifactItem>
<groupId>${project.groupId}</groupId>
<artifactId>${project.artifactId}</artifactId>
<version>${project.version}</version>
</artifactItem>
<artifactItem>
<groupId>org.json4s</groupId>
<artifactId>json4s-native_${scala.binary.version}</artifactId>
<version>${json.version}</version>
</artifactItem>
<artifactItem>
<groupId>org.json4s</groupId>
<artifactId>json4s-core_${scala.binary.version}</artifactId>
<version>${json.version}</version>
</artifactItem>
<artifactItem>
<groupId>org.json4s</groupId>
<artifactId>json4s-ast_${scala.binary.version}</artifactId>
<version>${json.version}</version>
</artifactItem>
<artifactItem>
<groupId>${project.groupId}</groupId>
<artifactId>atlas-client-v1</artifactId>
<version>${project.version}</version>
</artifactItem>
<artifactItem>
<groupId>${project.groupId}</groupId>
<artifactId>atlas-client-common</artifactId>
<version>${project.version}</version>
</artifactItem>
<artifactItem>
<groupId>${project.groupId}</groupId>
<artifactId>atlas-intg</artifactId>
<version>${project.version}</version>
</artifactItem>
<artifactItem>
<groupId>${project.groupId}</groupId>
<artifactId>atlas-typesystem</artifactId>
<version>${project.version}</version>
</artifactItem>
<artifactItem>
<groupId>${project.groupId}</groupId>
<artifactId>atlas-notification</artifactId>
<version>${project.version}</version>
</artifactItem>
<artifactItem>
<groupId>${project.groupId}</groupId>
<artifactId>hdfs-model</artifactId>
<version>${project.version}</version>
</artifactItem>
<artifactItem>
<groupId>${project.groupId}</groupId>
<artifactId>atlas-common</artifactId>
<version>${project.version}</version>
</artifactItem>
<artifactItem>
<groupId>org.scala-lang</groupId>
<artifactId>scala-compiler</artifactId>
<version>${scala.version}</version>
</artifactItem>
<artifactItem>
<groupId>org.scala-lang</groupId>
<artifactId>scala-reflect</artifactId>
<version>${scala.version}</version>
</artifactItem>
<artifactItem>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</artifactItem>
<artifactItem>
<groupId>org.scala-lang</groupId>
<artifactId>scalap</artifactId>
<version>${scala.version}</version>
</artifactItem>
<artifactItem>
<groupId>com.google.inject.extensions</groupId>
<artifactId>guice-multibindings</artifactId>
<version>${guice.version}</version>
</artifactItem>
<artifactItem>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_${scala.binary.version}</artifactId>
<version>${kafka.version}</version>
</artifactItem>
<artifactItem>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</artifactItem>
<artifactItem>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>${gson.version}</version>
</artifactItem>
</artifactItems>
</configuration>
</execution>
<execution>
<id>copy-hook-shim</id>
<phase>package</phase>
<goals>
<goal>copy</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/dependency/hook/hbase</outputDirectory>
<overWriteReleases>false</overWriteReleases>
<overWriteSnapshots>false</overWriteSnapshots>
<overWriteIfNewer>true</overWriteIfNewer>
<artifactItems>
<artifactItem>
<groupId>${project.groupId}</groupId>
<artifactId>hbase-bridge-shim</artifactId>
<version>${project.version}</version>
</artifactItem>
<artifactItem>
<groupId>${project.groupId}</groupId>
<artifactId>atlas-plugin-classloader</artifactId>
<version>${project.version}</version>
</artifactItem>
</artifactItems>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
</profiles>
<build>
<plugins>
<plugin>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-maven-plugin</artifactId>
<configuration>
<skip>${skipTests}</skip>
<!-- only skip integration tests -->
<httpConnector>
<port>31000</port>
<idleTimeout>60000</idleTimeout>
</httpConnector>
<war>../../webapp/target/atlas-webapp-${project.version}.war</war>
<daemon>true</daemon>
<webApp>
<contextPath>/</contextPath>
<descriptor>${project.basedir}/../../webapp/src/test/webapp/WEB-INF/web.xml</descriptor>
<extraClasspath>${project.basedir}/../../webapp/target/test-classes/</extraClasspath>
</webApp>
<useTestScope>true</useTestScope>
<systemProperties>
<systemProperty>
<name>log4j.configuration</name>
<value>file://${project.basedir}/../../distro/src/conf/atlas-log4j.xml</value>
</systemProperty>
<systemProperty>
<name>atlas.log.file</name>
<value>application.log</value>
</systemProperty>
<systemProperty>
<name>atlas.log.dir</name>
<value>${project.build.directory}/logs</value>
</systemProperty>
<systemProperty>
<name>atlas.data</name>
<value>${project.build.directory}/data</value>
</systemProperty>
<systemProperty>
<key>atlas.conf</key>
<value>${project.build.directory}/../../../typesystem/target/test-classes</value>
</systemProperty>
<systemProperty>
<key>atlas.home</key>
<value>${project.basedir}/target</value>
</systemProperty>
</systemProperties>
<stopKey>atlas-stop</stopKey>
<stopPort>31001</stopPort>
<stopWait>${jetty-maven-plugin.stopWait}</stopWait>
</configuration>
<executions>
<execution>
<id>start-jetty</id>
<phase>pre-integration-test</phase>
<goals>
<goal>deploy-war</goal>
</goals>
<configuration>
<daemon>true</daemon>
</configuration>
</execution>
<execution>
<id>stop-jetty</id>
<phase>post-integration-test</phase>
<goals>
<goal>stop</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-site-plugin</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.maven.doxia</groupId>
<artifactId>doxia-module-twiki</artifactId>
<version>1.3</version>
</dependency>
</dependencies>
<executions>
<execution>
<goals>
<goal>site</goal>
</goals>
<phase>prepare-package</phase>
</execution>
</executions>
<configuration>
<generateProjectInfo>false</generateProjectInfo>
<generateReports>false</generateReports>
</configuration>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>1.2.1</version>
<inherited>false</inherited>
<executions>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-resources-plugin</artifactId>
<executions>
<execution>
<id>copy-resources</id>
<phase>validate</phase>
<goals>
<goal>copy-resources</goal>
</goals>
<configuration>
<outputDirectory>${basedir}/target/models</outputDirectory>
<resources>
<resource>
<directory>${basedir}/../models</directory>
<filtering>true</filtering>
</resource>
</resources>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
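For orientation, the two maven-dependency-plugin executions in the dist profile above produce the following layout under target/dependency (jar versions elided):

hook/hbase/
├── hbase-bridge-shim-<version>.jar
├── atlas-plugin-classloader-<version>.jar
└── atlas-hbase-plugin-impl/
    ├── hbase-bridge-<version>.jar
    ├── atlas-client-v1-<version>.jar
    ├── atlas-client-common-<version>.jar
    ├── atlas-intg-<version>.jar
    ├── atlas-typesystem-<version>.jar
    ├── atlas-notification-<version>.jar
    ├── hdfs-model-<version>.jar
    ├── atlas-common-<version>.jar
    └── ... (json4s, scala, guice-multibindings, kafka, gson jars)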
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.hbase.bridge;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.atlas.AtlasConstants;
import org.apache.atlas.hbase.model.HBaseOperationContext;
import org.apache.atlas.hbase.model.HBaseDataTypes;
import org.apache.atlas.hook.AtlasHook;
import org.apache.atlas.notification.hook.HookNotification;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.commons.configuration.Configuration;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ShutdownHookManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
// Atlas hook that registers HBase namespace, table and column-family entities with Atlas
public class HBaseAtlasHook extends AtlasHook {
private static final Logger LOG = LoggerFactory.getLogger(HBaseAtlasHook.class);
public static final String CONF_PREFIX = "atlas.hook.hbase.";
public static final String HOOK_NUM_RETRIES = CONF_PREFIX + "numRetries";
public static final String QUEUE_SIZE = CONF_PREFIX + "queueSize";
public static final String CONF_SYNC = CONF_PREFIX + "synchronous";
private static final String MIN_THREADS = CONF_PREFIX + "minThreads";
private static final String MAX_THREADS = CONF_PREFIX + "maxThreads";
private static final String KEEP_ALIVE_TIME = CONF_PREFIX + "keepAliveTime";
private static final int minThreadsDefault = 5;
private static final int maxThreadsDefault = 5;
private static final int queueSizeDefault = 10000;
private static final long keepAliveTimeDefault = 10;
// WAIT_TIME determines how long we wait for the executor to drain on shutdown before the JVM exits. Pending requests after that will not be sent.
private static final int WAIT_TIME = 3;
private static boolean sync;
private static ExecutorService executor;
public static final String HBASE_CLUSTER_NAME = "atlas.cluster.name";
public static final String DEFAULT_CLUSTER_NAME = "primary";
public static final String ATTR_DESCRIPTION = "description";
public static final String ATTR_ATLAS_ENDPOINT = "atlas.rest.address";
public static final String ATTR_COMMENT = "comment";
public static final String ATTR_PARAMETERS = "parameters";
public static final String ATTR_URI = "uri";
public static final String ATTR_NAMESPACE = "namespace";
public static final String ATTR_TABLE = "table";
public static final String ATTR_COLUMNFAMILIES = "column_families";
public static final String ATTR_CREATE_TIME = "createTime";
public static final String ATTR_MODIFIED_TIME = "modifiedTime";
public static final String ATTR_OWNER = "owner";
public static final String ATTR_NAME = "name";
private static final String REFERENCEABLE_ATTRIBUTE_NAME = "qualifiedName";
private String clusterName = null;
private static volatile HBaseAtlasHook me;
public enum OPERATION {
CREATE_NAMESPACE("create_namespace"),
ALTER_NAMESPACE("alter_namespace"),
DELETE_NAMESPACE("delete_namespace"),
CREATE_TABLE("create_table"),
ALTER_TABLE("alter_table"),
DELETE_TABLE("delete_table"),
CREATE_COLUMN_FAMILY("create_column_family"),
ALTER_COLUMN_FAMILY("alter_column_family"),
DELETE_COLUMN_FAMILY("delete_column_family");
private final String name;
OPERATION(String s) {
name = s;
}
public String getName() {
return name;
}
}
static {
try {
// initialize the async facility to process hook calls. We don't
// want to do this inline since it adds significant overhead to the DDL operation.
int minThreads = atlasProperties.getInt(MIN_THREADS, minThreadsDefault);
int maxThreads = atlasProperties.getInt(MAX_THREADS, maxThreadsDefault);
int queueSize = atlasProperties.getInt(QUEUE_SIZE, queueSizeDefault);
long keepAliveTime = atlasProperties.getLong(KEEP_ALIVE_TIME, keepAliveTimeDefault);
sync = atlasProperties.getBoolean(CONF_SYNC, false);
executor = new ThreadPoolExecutor(minThreads, maxThreads, keepAliveTime, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(queueSize),
new ThreadFactoryBuilder().setNameFormat("Atlas Logger %d").build());
ShutdownHookManager.get().addShutdownHook(new Thread() {
@Override
public void run() {
try {
LOG.info("==> Shutdown of Atlas HBase Hook");
executor.shutdown();
executor.awaitTermination(WAIT_TIME, TimeUnit.SECONDS);
executor = null;
} catch (InterruptedException ie) {
LOG.info("Interrupt received in shutdown.", ie);
} finally {
LOG.info("<== Shutdown of Atlas HBase Hook");
}
// shutdown client
}
}, AtlasConstants.ATLAS_SHUTDOWN_HOOK_PRIORITY);
} catch (Exception e) {
LOG.error("Caught exception initializing the Atlas HBase hook.", e);
}
LOG.info("Created Atlas Hook for HBase");
}
public static HBaseAtlasHook getInstance() {
HBaseAtlasHook ret = me;
if (ret == null) {
try {
synchronized (HBaseAtlasHook.class) {
ret = me;
if (ret == null) {
me = ret = new HBaseAtlasHook(atlasProperties);
}
}
} catch (Exception e) {
LOG.error("Caught exception instantiating the Atlas HBase hook.", e);
}
}
return ret;
}
public HBaseAtlasHook(Configuration atlasProperties) {
this(atlasProperties.getString(HBASE_CLUSTER_NAME, DEFAULT_CLUSTER_NAME));
}
public HBaseAtlasHook(String clusterName) {
this.clusterName = clusterName;
}
@Override
protected String getNumberOfRetriesPropertyKey() {
return HOOK_NUM_RETRIES;
}
public void createAtlasInstances(HBaseOperationContext hbaseOperationContext) {
HBaseAtlasHook.OPERATION operation = hbaseOperationContext.getOperation();
LOG.info("HBaseAtlasHook(operation={})", operation);
switch (operation) {
case CREATE_NAMESPACE:
case ALTER_NAMESPACE:
createOrUpdateNamespaceInstance(hbaseOperationContext);
break;
case DELETE_NAMESPACE:
deleteNameSpaceInstance(hbaseOperationContext);
break;
case CREATE_TABLE:
case ALTER_TABLE:
createOrUpdateTableInstance(hbaseOperationContext);
break;
case DELETE_TABLE:
deleteTableInstance(hbaseOperationContext);
break;
case CREATE_COLUMN_FAMILY:
case ALTER_COLUMN_FAMILY:
createOrUpdateColumnFamilyInstance(hbaseOperationContext);
break;
case DELETE_COLUMN_FAMILY:
deleteColumnFamilyInstance(hbaseOperationContext);
break;
}
}
private void createOrUpdateNamespaceInstance(HBaseOperationContext hbaseOperationContext) {
Referenceable nameSpaceRef = buildNameSpaceRef(hbaseOperationContext);
switch (hbaseOperationContext.getOperation()) {
case CREATE_NAMESPACE:
LOG.info("Create NameSpace {}", nameSpaceRef.get(REFERENCEABLE_ATTRIBUTE_NAME));
hbaseOperationContext.addMessage(new HookNotification.EntityCreateRequest(hbaseOperationContext.getUser(), nameSpaceRef));
break;
case ALTER_NAMESPACE:
LOG.info("Modify NameSpace {}", nameSpaceRef.get(REFERENCEABLE_ATTRIBUTE_NAME));
hbaseOperationContext.addMessage(new HookNotification.EntityUpdateRequest(hbaseOperationContext.getUser(), nameSpaceRef));
break;
}
}
private void deleteNameSpaceInstance(HBaseOperationContext hbaseOperationContext) {
String nameSpaceQualifiedName = getNameSpaceQualifiedName(clusterName, hbaseOperationContext.getNameSpace());
LOG.info("Delete NameSpace {}", nameSpaceQualifiedName);
hbaseOperationContext.addMessage(new HookNotification.EntityDeleteRequest(hbaseOperationContext.getUser(),
HBaseDataTypes.HBASE_NAMESPACE.getName(),
REFERENCEABLE_ATTRIBUTE_NAME,
nameSpaceQualifiedName));
}
private void createOrUpdateTableInstance(HBaseOperationContext hbaseOperationContext) {
Referenceable nameSpaceRef = buildNameSpaceRef(hbaseOperationContext);
Referenceable tableRef = buildTableRef(hbaseOperationContext, nameSpaceRef);
List<Referenceable> columnFamilyRef = buildColumnFamiliesRef(hbaseOperationContext, nameSpaceRef, tableRef);
tableRef.set(ATTR_COLUMNFAMILIES, columnFamilyRef);
switch (hbaseOperationContext.getOperation()) {
case CREATE_TABLE:
LOG.info("Create Table {}", tableRef.get(REFERENCEABLE_ATTRIBUTE_NAME));
hbaseOperationContext.addMessage(new HookNotification.EntityCreateRequest(hbaseOperationContext.getUser(), nameSpaceRef, tableRef));
break;
case ALTER_TABLE:
LOG.info("Modify Table {}", tableRef.get(REFERENCEABLE_ATTRIBUTE_NAME));
hbaseOperationContext.addMessage(new HookNotification.EntityUpdateRequest(hbaseOperationContext.getUser(), nameSpaceRef, tableRef));
break;
}
}
private void deleteTableInstance(HBaseOperationContext hbaseOperationContext) {
TableName tableName = hbaseOperationContext.getTableName();
String tableNameSpace = tableName.getNamespaceAsString();
if (tableNameSpace == null) {
tableNameSpace = tableName.getNameWithNamespaceInclAsString();
}
String tableNameStr = tableName.getNameAsString();
String tableQualifiedName = getTableQualifiedName(clusterName, tableNameSpace, tableNameStr);
LOG.info("Delete Table {}", tableQualifiedName);
hbaseOperationContext.addMessage(new HookNotification.EntityDeleteRequest(hbaseOperationContext.getUser(),
HBaseDataTypes.HBASE_TABLE.getName(),
REFERENCEABLE_ATTRIBUTE_NAME,
tableQualifiedName));
}
private void createOrUpdateColumnFamilyInstance(HBaseOperationContext hbaseOperationContext) {
Referenceable nameSpaceRef = buildNameSpaceRef(hbaseOperationContext);
Referenceable tableRef = buildTableRef(hbaseOperationContext, nameSpaceRef);
Referenceable columnFamilyRef = buildColumnFamilyRef(hbaseOperationContext, hbaseOperationContext.gethColumnDescriptor(), nameSpaceRef, tableRef);
switch (hbaseOperationContext.getOperation()) {
case CREATE_COLUMN_FAMILY:
LOG.info("Create ColumnFamily {}", columnFamilyRef.get(REFERENCEABLE_ATTRIBUTE_NAME));
hbaseOperationContext.addMessage(new HookNotification.EntityCreateRequest(hbaseOperationContext.getUser(), nameSpaceRef, tableRef, columnFamilyRef));
break;
case ALTER_COLUMN_FAMILY:
LOG.info("Alter ColumnFamily {}", columnFamilyRef.get(REFERENCEABLE_ATTRIBUTE_NAME));
hbaseOperationContext.addMessage(new HookNotification.EntityUpdateRequest(hbaseOperationContext.getUser(), nameSpaceRef, tableRef, columnFamilyRef));
break;
}
}
private void deleteColumnFamilyInstance(HBaseOperationContext hbaseOperationContext) {
TableName tableName = hbaseOperationContext.getTableName();
String tableNameSpace = tableName.getNamespaceAsString();
if (tableNameSpace == null) {
tableNameSpace = tableName.getNameWithNamespaceInclAsString();
}
String tableNameStr = tableName.getNameAsString();
String columnFamilyName = hbaseOperationContext.getColummFamily();
String columnFamilyQualifiedName = getColumnFamilyQualifiedName(clusterName, tableNameSpace, tableNameStr, columnFamilyName);
LOG.info("Delete ColumnFamily {}", columnFamilyQualifiedName);
hbaseOperationContext.addMessage(new HookNotification.EntityDeleteRequest(hbaseOperationContext.getUser(),
HBaseDataTypes.HBASE_COLUMN_FAMILY.getName(),
REFERENCEABLE_ATTRIBUTE_NAME,
columnFamilyQualifiedName));
}
/**
* Construct the qualified name used to uniquely identify a ColumnFamily instance in Atlas.
*
* @param clusterName Name of the cluster to which the HBase component belongs
* @param nameSpace Name of the HBase namespace to which the Table belongs
* @param tableName Name of the HBase table
* @param columnFamily Name of the ColumnFamily
* @return Unique qualified name to identify the ColumnFamily instance in Atlas.
*/
public static String getColumnFamilyQualifiedName(String clusterName, String nameSpace, String tableName, String columnFamily) {
return String.format("%s.%s.%s@%s", nameSpace.toLowerCase(), stripNameSpace(tableName.toLowerCase()), columnFamily.toLowerCase(), clusterName);
}
/**
* Construct the qualified name used to uniquely identify a Table instance in Atlas.
*
* @param clusterName Name of the cluster to which the HBase component belongs
* @param nameSpace Name of the HBase namespace to which the Table belongs
* @param tableName Name of the HBase table
* @return Unique qualified name to identify the Table instance in Atlas.
*/
public static String getTableQualifiedName(String clusterName, String nameSpace, String tableName) {
return String.format("%s.%s@%s", nameSpace.toLowerCase(), stripNameSpace(tableName.toLowerCase()), clusterName);
}
/**
* Construct the qualified name used to uniquely identify an HBase NameSpace instance in Atlas.
*
* @param clusterName Name of the cluster to which the HBase component belongs
* @param nameSpace Name of the HBase namespace
* @return Unique qualified name to identify the HBase NameSpace instance in Atlas.
*/
public static String getNameSpaceQualifiedName(String clusterName, String nameSpace) {
return String.format("%s@%s", nameSpace.toLowerCase(), clusterName);
}
private static String stripNameSpace(String tableName) {
return tableName.substring(tableName.indexOf(":") + 1);
}
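// Illustrative outputs of the three helpers above (example values, not from the commit):
//   getNameSpaceQualifiedName("primary", "default")                       -> "default@primary"
//   getTableQualifiedName("primary", "default", "default:webtable")       -> "default.webtable@primary"
//   getColumnFamilyQualifiedName("primary", "default", "webtable", "cf1") -> "default.webtable.cf1@primary"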
private Referenceable buildNameSpaceRef(HBaseOperationContext hbaseOperationContext) {
Referenceable nameSpaceRef = new Referenceable(HBaseDataTypes.HBASE_NAMESPACE.getName());
String nameSpace = null;
NamespaceDescriptor nameSpaceDesc = hbaseOperationContext.getNamespaceDescriptor();
if (nameSpaceDesc != null) {
nameSpace = hbaseOperationContext.getNamespaceDescriptor().getName();
}
if (nameSpace == null) {
nameSpace = hbaseOperationContext.getNameSpace();
}
nameSpaceRef.set(ATTR_NAME, nameSpace);
nameSpaceRef.set(REFERENCEABLE_ATTRIBUTE_NAME, getNameSpaceQualifiedName(clusterName, nameSpace));
nameSpaceRef.set(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, clusterName);
nameSpaceRef.set(ATTR_DESCRIPTION, nameSpace);
nameSpaceRef.set(ATTR_PARAMETERS, hbaseOperationContext.getHbaseConf());
nameSpaceRef.set(ATTR_OWNER, hbaseOperationContext.getOwner());
Date now = new Date(System.currentTimeMillis());
if (OPERATION.CREATE_NAMESPACE.equals(hbaseOperationContext.getOperation())) {
nameSpaceRef.set(ATTR_CREATE_TIME, now);
nameSpaceRef.set(ATTR_MODIFIED_TIME, now);
} else {
nameSpaceRef.set(ATTR_MODIFIED_TIME, now);
}
return nameSpaceRef;
}
private Referenceable buildTableRef(HBaseOperationContext hbaseOperationContext, Referenceable nameSpaceRef) {
Referenceable tableRef = new Referenceable(HBaseDataTypes.HBASE_TABLE.getName());
String tableName = getTableName(hbaseOperationContext);
String tableNameSpace = hbaseOperationContext.getNameSpace();
String tableQualifiedName = getTableQualifiedName(clusterName, tableNameSpace, tableName);
OPERATION operation = hbaseOperationContext.getOperation();
Date now = new Date(System.currentTimeMillis());
tableRef.set(REFERENCEABLE_ATTRIBUTE_NAME, tableQualifiedName);
tableRef.set(ATTR_NAME, tableName);
tableRef.set(ATTR_URI, tableName);
tableRef.set(ATTR_OWNER, hbaseOperationContext.getOwner());
tableRef.set(ATTR_DESCRIPTION, tableName);
tableRef.set(ATTR_PARAMETERS, hbaseOperationContext.getHbaseConf());
switch (operation) {
case CREATE_TABLE:
tableRef.set(ATTR_NAMESPACE, nameSpaceRef);
tableRef.set(ATTR_CREATE_TIME, now);
tableRef.set(ATTR_MODIFIED_TIME, now);
break;
case ALTER_TABLE:
tableRef.set(ATTR_NAMESPACE, nameSpaceRef);
tableRef.set(ATTR_MODIFIED_TIME, now);
break;
default:
tableRef.set(ATTR_NAMESPACE, nameSpaceRef.getId());
break;
}
return tableRef;
}
private List<Referenceable> buildColumnFamiliesRef(HBaseOperationContext hbaseOperationContext, Referenceable nameSpaceRef, Referenceable tableRef) {
List<Referenceable> entities = new ArrayList<>();
HColumnDescriptor[] hColumnDescriptors = hbaseOperationContext.gethColumnDescriptors();
if (hColumnDescriptors != null) {
for (HColumnDescriptor hColumnDescriptor : hColumnDescriptors) {
Referenceable columnFamilyRef = buildColumnFamilyRef(hbaseOperationContext, hColumnDescriptor, nameSpaceRef, tableRef);
entities.add(columnFamilyRef);
}
}
return entities;
}
private Referenceable buildColumnFamilyRef(HBaseOperationContext hbaseOperationContext, HColumnDescriptor hColumnDescriptor, Referenceable nameSpaceRef, Referenceable tableReference) {
Referenceable columnFamilyRef = new Referenceable(HBaseDataTypes.HBASE_COLUMN_FAMILY.getName());
String columnFamilyName = hColumnDescriptor.getNameAsString();
String tableName = (String) tableReference.get(ATTR_NAME);
String namespace = (String) nameSpaceRef.get(ATTR_NAME);
String columnFamilyQualifiedName = getColumnFamilyQualifiedName(clusterName, namespace, tableName, columnFamilyName);
columnFamilyRef.set(ATTR_NAME, columnFamilyName);
columnFamilyRef.set(ATTR_DESCRIPTION, columnFamilyName);
columnFamilyRef.set(REFERENCEABLE_ATTRIBUTE_NAME, columnFamilyQualifiedName);
columnFamilyRef.set(ATTR_OWNER, hbaseOperationContext.getOwner());
Date now = new Date(System.currentTimeMillis());
switch (hbaseOperationContext.getOperation()) {
case CREATE_COLUMN_FAMILY:
columnFamilyRef.set(ATTR_TABLE, tableReference);
columnFamilyRef.set(ATTR_CREATE_TIME, now);
columnFamilyRef.set(ATTR_MODIFIED_TIME, now);
break;
case ALTER_COLUMN_FAMILY:
columnFamilyRef.set(ATTR_TABLE, tableReference);
columnFamilyRef.set(ATTR_MODIFIED_TIME, now);
break;
default:
columnFamilyRef.set(ATTR_TABLE, tableReference.getId());
}
return columnFamilyRef;
}
private String getTableName(HBaseOperationContext hbaseOperationContext) {
HTableDescriptor tableDescriptor = hbaseOperationContext.gethTableDescriptor();
return (tableDescriptor != null) ? tableDescriptor.getNameAsString() : null;
}
private void notifyAsPrivilegedAction(final HBaseOperationContext hbaseOperationContext) {
if (LOG.isDebugEnabled()) {
LOG.debug("==> HBaseAtlasHook.notifyAsPrivilegedAction({})", hbaseOperationContext);
}
final List<HookNotification.HookNotificationMessage> messages = hbaseOperationContext.getMessages();
try {
PrivilegedExceptionAction<Object> privilegedNotify = new PrivilegedExceptionAction<Object>() {
@Override
public Object run() {
notifyEntities(messages);
return hbaseOperationContext;
}
};
// Notify as the 'hbase' service user in doAs mode
UserGroupInformation realUser = hbaseOperationContext.getUgi().getRealUser();
String numberOfMessages = Integer.toString(messages.size());
String operation = hbaseOperationContext.getOperation().toString();
String user = hbaseOperationContext.getUgi().getShortUserName();
if (realUser != null) {
LOG.info("Sending notification for event {} as service user {} #messages {}", operation, realUser.getShortUserName(), numberOfMessages);
realUser.doAs(privilegedNotify);
} else {
LOG.info("Sending notification for event {} as service user {} #messages {}", operation, user, numberOfMessages);
hbaseOperationContext.getUgi().doAs(privilegedNotify);
}
} catch (Throwable e) {
LOG.error("Error during notify {} ", hbaseOperationContext.getOperation(), e);
}
if (LOG.isDebugEnabled()) {
LOG.debug("<== HBaseAtlasHook.notifyAsPrivilegedAction()");
}
}
/**
* Notify atlas of the entity through message. The entity can be a
* complex entity with reference to other entities.
* De-duping of entities is done on server side depending on the
* unique attribute on the entities.
*
* @param messages hook notification messages
*/
protected void notifyEntities(List<HookNotification.HookNotificationMessage> messages) {
final int maxRetries = atlasProperties.getInt(HOOK_NUM_RETRIES, 3);
notifyEntities(messages, maxRetries);
}
public void sendHBaseNameSpaceOperation(final NamespaceDescriptor namespaceDescriptor, final String nameSpace, final OPERATION operation) {
if (LOG.isDebugEnabled()) {
LOG.debug("==> HBaseAtlasHook.sendHBaseNameSpaceOperation()");
}
try {
final UserGroupInformation ugi = getUGI();
HBaseOperationContext hbaseOperationContext = null;
if (executor == null) {
hbaseOperationContext = handleHBaseNameSpaceOperation(namespaceDescriptor, nameSpace, operation);
if (hbaseOperationContext != null) {
notifyAsPrivilegedAction(hbaseOperationContext);
}
if (LOG.isDebugEnabled()) {
LOG.debug("<== HBaseAtlasHook.sendHBaseNameSpaceOperation(){}", hbaseOperationContext);
}
} else {
executor.submit(new Runnable() {
HBaseOperationContext hbaseOperationContext = null;
@Override
public void run() {
if (LOG.isDebugEnabled()) {
LOG.debug("==> HBaseAtlasHook.sendHBaseNameSpaceOperation():executor.submit()");
}
if (ugi != null) {
try {
ugi.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() {
hbaseOperationContext = handleHBaseNameSpaceOperation(namespaceDescriptor, nameSpace, operation);
return hbaseOperationContext;
}
});
notifyAsPrivilegedAction(hbaseOperationContext);
if (LOG.isDebugEnabled()) {
LOG.debug("<== HBaseAtlasHook.sendHBaseNameSpaceOperation(){}", hbaseOperationContext);
}
} catch (Throwable e) {
LOG.error("<== HBaseAtlasHook.sendHBaseNameSpaceOperation(): Atlas hook failed due to error ", e);
}
} else {
LOG.error("<== HBaseAtlasHook.sendHBaseNameSpaceOperation(): Atlas hook failed, UserGroupInformation cannot be NULL!");
}
}
});
}
} catch (Throwable t) {
LOG.error("<== HBaseAtlasHook.sendHBaseNameSpaceOperation(): Submitting to thread pool failed due to error ", t);
}
}
public void sendHBaseTableOperation(final HTableDescriptor hTableDescriptor, final TableName tableName, final OPERATION operation) {
if (LOG.isDebugEnabled()) {
LOG.debug("==> HBaseAtlasHook.sendHBaseTableOperation()");
}
try {
final UserGroupInformation ugi = getUGI();
HBaseOperationContext hbaseOperationContext = null;
if (executor == null) {
hbaseOperationContext = handleHBaseTableOperation(hTableDescriptor, tableName, operation);
if (hbaseOperationContext != null) {
notifyAsPrivilegedAction(hbaseOperationContext);
}
if (LOG.isDebugEnabled()) {
LOG.debug("<== HBaseAtlasHook.sendHBaseTableOperation(){}", hbaseOperationContext);
}
} else {
executor.submit(new Runnable() {
HBaseOperationContext hbaseOperationContext = null;
@Override
public void run() {
if (LOG.isDebugEnabled()) {
LOG.debug("==> HBaseAtlasHook.sendHBaseTableOperation():executor.submit()");
}
if (ugi != null) {
try {
ugi.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() {
hbaseOperationContext = handleHBaseTableOperation(hTableDescriptor, tableName, operation);
return hbaseOperationContext;
}
});
notifyAsPrivilegedAction(hbaseOperationContext);
if (LOG.isDebugEnabled()) {
LOG.debug("<== HBaseAtlasHook.sendHBaseTableOperation(){}", hbaseOperationContext);
}
} catch (Throwable e) {
LOG.error("<== HBaseAtlasHook.sendHBaseTableOperation(): Atlas hook failed due to error ", e);
}
} else {
LOG.error("<== HBaseAtlasHook.sendHBaseTableOperation(): Atlas hook failed, UserGroupInformation cannot be NULL!");
}
}
});
}
} catch (Throwable t) {
LOG.error("<== HBaseAtlasHook.sendHBaseTableOperation(): Submitting to thread pool failed due to error ", t);
}
}
public void sendHBaseColumnFamilyOperation(final HColumnDescriptor hColumnDescriptor, final TableName tableName, final String columnFamily, final OPERATION operation) {
if (LOG.isDebugEnabled()) {
LOG.debug("==> HBaseAtlasHook.sendHBaseColumnFamilyOperation()");
}
try {
final UserGroupInformation ugi = getUGI();
HBaseOperationContext hbaseOperationContext = null;
if (executor == null) {
hbaseOperationContext = handleHBaseColumnFamilyOperation(hColumnDescriptor, tableName, columnFamily, operation);
if (hbaseOperationContext != null) {
notifyAsPrivilegedAction(hbaseOperationContext);
}
if (LOG.isDebugEnabled()) {
LOG.debug("<== HBaseAtlasHook.sendHBaseColumnFamilyOperation(){}", hbaseOperationContext);
}
} else {
executor.submit(new Runnable() {
HBaseOperationContext hbaseOperationContext = null;
@Override
public void run() {
if (LOG.isDebugEnabled()) {
LOG.debug("==> HBaseAtlasHook.sendHBaseColumnFamilyOperation():executor.submit()");
}
if (ugi != null) {
try {
ugi.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() {
hbaseOperationContext = handleHBaseColumnFamilyOperation(hColumnDescriptor, tableName, columnFamily, operation);
return hbaseOperationContext;
}
});
notifyAsPrivilegedAction(hbaseOperationContext);
if (LOG.isDebugEnabled()) {
LOG.debug("<== HBaseAtlasHook.sendHBaseColumnFamilyOperation(){}", hbaseOperationContext);
}
} catch (Throwable e) {
LOG.error("<== HBaseAtlasHook.sendHBaseColumnFamilyOperation(): Atlas hook failed due to error ", e);
}
} else {
LOG.error("<== HBaseAtlasHook.sendHBaseColumnFamilyOperation(): Atlas hook failed, UserGroupInformation cannot be NULL!");
}
}
});
}
} catch (Throwable t) {
LOG.error("<== HBaseAtlasHook.sendHBaseColumnFamilyOperation(): Submitting to thread pool failed due to error ", t);
}
}
private HBaseOperationContext handleHBaseNameSpaceOperation(NamespaceDescriptor namespaceDescriptor, String nameSpace, OPERATION operation) {
if (LOG.isDebugEnabled()) {
LOG.debug("==> HBaseAtlasHook.handleHBaseNameSpaceOperation()");
}
UserGroupInformation ugi = getUGI();
User user = getActiveUser();
String userName = user.getShortName();
HBaseOperationContext hbaseOperationContext = new HBaseOperationContext(namespaceDescriptor, nameSpace, operation, ugi, userName, userName);
createAtlasInstances(hbaseOperationContext);
if (LOG.isDebugEnabled()) {
LOG.debug("<== HBaseAtlasHook.handleHBaseNameSpaceOperation(): {}", hbaseOperationContext);
}
return hbaseOperationContext;
}
private HBaseOperationContext handleHBaseTableOperation(HTableDescriptor hTableDescriptor, TableName tableName, OPERATION operation) {
if (LOG.isDebugEnabled()) {
LOG.debug("==> HBaseAtlasHook.handleHBaseTableOperation()");
}
UserGroupInformation ugi = getUGI();
User user = getActiveUser();
String userName = user.getShortName();
Map<String, String> hbaseConf = null;
String owner = null;
String tableNameSpace = null;
TableName hbaseTableName = null;
HColumnDescriptor[] hColumnDescriptors = null;
if (hTableDescriptor != null) {
owner = hTableDescriptor.getOwnerString();
hbaseConf = hTableDescriptor.getConfiguration();
hbaseTableName = hTableDescriptor.getTableName();
if (hbaseTableName != null) {
tableNameSpace = hbaseTableName.getNamespaceAsString();
if (tableNameSpace == null) {
tableNameSpace = hbaseTableName.getNameWithNamespaceInclAsString();
}
}
}
if (owner == null) {
owner = userName;
}
if (hTableDescriptor != null) {
hColumnDescriptors = hTableDescriptor.getColumnFamilies();
}
HBaseOperationContext hbaseOperationContext = new HBaseOperationContext(tableNameSpace, hTableDescriptor, tableName, hColumnDescriptors, operation, ugi, userName, owner, hbaseConf);
createAtlasInstances(hbaseOperationContext);
if (LOG.isDebugEnabled()) {
LOG.debug("<== HBaseAtlasHook.handleHBaseTableOperation(): {}", hbaseOperationContext);
}
return hbaseOperationContext;
}
private HBaseOperationContext handleHBaseColumnFamilyOperation(HColumnDescriptor hColumnDescriptor, TableName tableName, String columnFamily, OPERATION operation) {
if (LOG.isDebugEnabled()) {
LOG.debug("==> HBaseAtlasHook.handleHBaseColumnFamilyOperation()");
}
UserGroupInformation ugi = getUGI();
User user = getActiveUser();
String userName = user.getShortName();
String owner = userName;
Map<String, String> hbaseConf = null;
String tableNameSpace = tableName.getNamespaceAsString();
if (tableNameSpace == null) {
tableNameSpace = tableName.getNameWithNamespaceInclAsString();
}
if (hColumnDescriptor != null) {
hbaseConf = hColumnDescriptor.getConfiguration();
}
HBaseOperationContext hbaseOperationContext = new HBaseOperationContext(tableNameSpace, tableName, hColumnDescriptor, columnFamily, operation, ugi, userName, owner, hbaseConf);
createAtlasInstances(hbaseOperationContext);
if (LOG.isDebugEnabled()) {
LOG.debug("<== HBaseAtlasHook.handleHBaseColumnFamilyOperation(): {}", hbaseOperationContext);
}
return hbaseOperationContext;
}
private User getActiveUser() {
User user = RpcServer.getRequestUser();
if (user == null) {
// for non-rpc handling, fallback to system user
try {
user = User.getCurrent();
} catch (IOException e) {
LOG.error("Unable to find the current user", e);
user = null;
}
}
return user;
}
private UserGroupInformation getUGI() {
UserGroupInformation ugi = null;
User user = getActiveUser();
try {
ugi = UserGroupInformation.getLoginUser();
} catch (Exception e) {
// login user is not available; fall back to the active user's UGI below
}
if (ugi == null) {
if (user != null) {
ugi = user.getUGI();
}
}
LOG.info("HBaseAtlasHook: UGI: {}", ugi);
return ugi;
}
}
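The configuration constants defined in HBaseAtlasHook above map to these atlas-application.properties keys; a sketch using the defaults hard-coded in the class (the atlas.rest.address value is illustrative):

atlas.cluster.name=primary
atlas.rest.address=http://localhost:21000
atlas.hook.hbase.synchronous=false
atlas.hook.hbase.numRetries=3
atlas.hook.hbase.queueSize=10000
atlas.hook.hbase.minThreads=5
atlas.hook.hbase.maxThreads=5
atlas.hook.hbase.keepAliveTime=10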
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.hbase.hook;
import org.apache.atlas.hbase.bridge.HBaseAtlasHook;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
public class HBaseAtlasCoprocessor extends HBaseAtlasCoprocessorBase {
private static final Logger LOG = LoggerFactory.getLogger(HBaseAtlasCoprocessor.class);
final HBaseAtlasHook hbaseAtlasHook;
public HBaseAtlasCoprocessor() {
hbaseAtlasHook = HBaseAtlasHook.getInstance();
}
@Override
public void postCreateTable(ObserverContext<MasterCoprocessorEnvironment> observerContext, HTableDescriptor hTableDescriptor, HRegionInfo[] hRegionInfos) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("==> HBaseAtlasCoprocessor.postCreateTable()");
}
hbaseAtlasHook.sendHBaseTableOperation(hTableDescriptor, null, HBaseAtlasHook.OPERATION.CREATE_TABLE);
if (LOG.isDebugEnabled()) {
LOG.debug("<== HBaseAtlasCoprocessor.postCreateTable()");
}
}
@Override
public void postDeleteTable(ObserverContext<MasterCoprocessorEnvironment> observerContext, TableName tableName) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("==> HBaseAtlasCoprocessor.postDeleteTable()");
}
hbaseAtlasHook.sendHBaseTableOperation(null, tableName, HBaseAtlasHook.OPERATION.DELETE_TABLE);
if (LOG.isDebugEnabled()) {
LOG.debug("<== HBaseAtlasCoprocessor.postDeleteTable()");
}
}
@Override
public void postModifyTable(ObserverContext<MasterCoprocessorEnvironment> observerContext, TableName tableName, HTableDescriptor hTableDescriptor) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("==> HBaseAtlasCoprocessor.postModifyTable()");
}
hbaseAtlasHook.sendHBaseTableOperation(hTableDescriptor, tableName, HBaseAtlasHook.OPERATION.ALTER_TABLE);
if (LOG.isDebugEnabled()) {
LOG.debug("<== HBaseAtlasCoprocessor.postModifyTable()");
}
}
@Override
public void postAddColumn(ObserverContext<MasterCoprocessorEnvironment> observerContext, TableName tableName, HColumnDescriptor hColumnDescriptor) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("==> HBaseAtlasCoprocessor.postAddColumn()");
}
hbaseAtlasHook.sendHBaseColumnFamilyOperation(hColumnDescriptor, tableName, null, HBaseAtlasHook.OPERATION.CREATE_COLUMN_FAMILY);
if (LOG.isDebugEnabled()) {
LOG.debug("<== HBaseAtlasCoprocessor.postAddColumn()");
}
}
@Override
public void postModifyColumn(ObserverContext<MasterCoprocessorEnvironment> observerContext, TableName tableName, HColumnDescriptor hColumnDescriptor) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("==> HBaseAtlasCoprocessor.postModifyColumn()");
}
hbaseAtlasHook.sendHBaseColumnFamilyOperation(hColumnDescriptor, tableName, null, HBaseAtlasHook.OPERATION.ALTER_COLUMN_FAMILY);
if (LOG.isDebugEnabled()) {
LOG.debug("<== HBaseAtlasCoprocessor.postModifyColumn()");
}
}
@Override
public void postDeleteColumn(ObserverContext<MasterCoprocessorEnvironment> observerContext, TableName tableName, byte[] bytes) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("==> HBaseAtlasCoprocessor.postDeleteColumn()");
}
String columnFamily = Bytes.toString(bytes);
hbaseAtlasHook.sendHBaseColumnFamilyOperation(null, tableName, columnFamily, HBaseAtlasHook.OPERATION.DELETE_COLUMN_FAMILY);
if (LOG.isDebugEnabled()) {
LOG.debug("<== HBaseAtlasCoprocessor.postDeleteColumn()");
}
}
@Override
public void postCreateNamespace(ObserverContext<MasterCoprocessorEnvironment> observerContext, NamespaceDescriptor namespaceDescriptor) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("==> HBaseAtlasCoprocessor.postCreateNamespace()");
}
hbaseAtlasHook.sendHBaseNameSpaceOperation(namespaceDescriptor, null, HBaseAtlasHook.OPERATION.CREATE_NAMESPACE);
if (LOG.isDebugEnabled()) {
LOG.debug("<== HBaseAtlasCoprocessor.postCreateNamespace()");
}
}
@Override
public void postDeleteNamespace(ObserverContext<MasterCoprocessorEnvironment> observerContext, String s) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("==> HBaseAtlasCoprocessor.postDeleteNamespace()");
}
hbaseAtlasHook.sendHBaseNameSpaceOperation(null, s, HBaseAtlasHook.OPERATION.DELETE_NAMESPACE);
if (LOG.isDebugEnabled()) {
LOG.debug("<== HBaseAtlasCoprocessor.postDeleteNamespace()");
}
}
@Override
public void postModifyNamespace(ObserverContext<MasterCoprocessorEnvironment> observerContext, NamespaceDescriptor namespaceDescriptor) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("==> HBaseAtlasCoprocessor.postModifyNamespace()");
}
hbaseAtlasHook.sendHBaseNameSpaceOperation(namespaceDescriptor, null, HBaseAtlasHook.OPERATION.ALTER_NAMESPACE);
if (LOG.isDebugEnabled()) {
LOG.debug("<== HBaseAtlasCoprocessor.postModifyNamespace()");
}
}
}
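With the coprocessor registered on the master, ordinary admin DDL calls fire the post-hooks above and end up as Atlas notifications. A minimal client-side sketch against the HBase 1.2 API (class name and connection settings are illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class AtlasHookSmokeTest {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create(); // picks up hbase-site.xml from the classpath
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Admin admin = conn.getAdmin()) {
            HTableDescriptor table = new HTableDescriptor(TableName.valueOf("default:webtable"));
            table.addFamily(new HColumnDescriptor("cf1"));
            admin.createTable(table);                // fires postCreateTable() -> CREATE_TABLE notification
            admin.disableTable(table.getTableName());
            admin.deleteTable(table.getTableName()); // fires postDeleteTable() -> DELETE_TABLE notification
        }
    }
}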
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.hbase.hook;
import java.io.IOException;
import java.util.List;
import java.util.NavigableSet;
import java.util.Set;
import com.google.common.collect.ImmutableList;
import org.apache.atlas.hook.AtlasHook;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.coprocessor.*;
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.protobuf.generated.*;
import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Quotas;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
import org.apache.hadoop.hbase.regionserver.*;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WALKey;
/**
* This class exists only to keep the main coprocessor class free of the clutter of observer methods that we don't intend to implement.
*
*/
public abstract class HBaseAtlasCoprocessorBase implements MasterObserver, RegionObserver, RegionServerObserver, BulkLoadObserver {
@Override
public void preCreateTable(ObserverContext<MasterCoprocessorEnvironment> observerContext, HTableDescriptor hTableDescriptor, HRegionInfo[] hRegionInfos) throws IOException {
}
@Override
public void preCreateTableHandler(ObserverContext<MasterCoprocessorEnvironment> observerContext, HTableDescriptor hTableDescriptor, HRegionInfo[] hRegionInfos) throws IOException {
}
@Override
public void preDeleteTable(ObserverContext<MasterCoprocessorEnvironment> observerContext, TableName tableName) throws IOException {
}
@Override
public void preDeleteTableHandler(ObserverContext<MasterCoprocessorEnvironment> observerContext, TableName tableName) throws IOException {
}
@Override
public void preTruncateTable(ObserverContext<MasterCoprocessorEnvironment> observerContext, TableName tableName) throws IOException {
}
@Override
public void preTruncateTableHandler(ObserverContext<MasterCoprocessorEnvironment> observerContext, TableName tableName) throws IOException {
}
@Override
public void preModifyTable(ObserverContext<MasterCoprocessorEnvironment> observerContext, TableName tableName, HTableDescriptor hTableDescriptor) throws IOException {
}
@Override
public void preModifyTableHandler(ObserverContext<MasterCoprocessorEnvironment> observerContext, TableName tableName, HTableDescriptor hTableDescriptor) throws IOException {
}
@Override
public void preAddColumn(ObserverContext<MasterCoprocessorEnvironment> observerContext, TableName tableName, HColumnDescriptor hColumnDescriptor) throws IOException {
}
@Override
public void preAddColumnHandler(ObserverContext<MasterCoprocessorEnvironment> observerContext, TableName tableName, HColumnDescriptor hColumnDescriptor) throws IOException {
}
@Override
public void preModifyColumn(ObserverContext<MasterCoprocessorEnvironment> observerContext, TableName tableName, HColumnDescriptor hColumnDescriptor) throws IOException {
}
@Override
public void preModifyColumnHandler(ObserverContext<MasterCoprocessorEnvironment> observerContext, TableName tableName, HColumnDescriptor hColumnDescriptor) throws IOException {
}
@Override
public void preDeleteColumn(ObserverContext<MasterCoprocessorEnvironment> observerContext, TableName tableName, byte[] bytes) throws IOException {
}
@Override
public void preDeleteColumnHandler(ObserverContext<MasterCoprocessorEnvironment> observerContext, TableName tableName, byte[] bytes) throws IOException {
}
@Override
public void preEnableTable(ObserverContext<MasterCoprocessorEnvironment> observerContext, TableName tableName) throws IOException {
}
@Override
public void preEnableTableHandler(ObserverContext<MasterCoprocessorEnvironment> observerContext, TableName tableName) throws IOException {
}
@Override
public void preDisableTable(ObserverContext<MasterCoprocessorEnvironment> observerContext, TableName tableName) throws IOException {
}
@Override
public void preDisableTableHandler(ObserverContext<MasterCoprocessorEnvironment> observerContext, TableName tableName) throws IOException {
}
@Override
public void preMove(ObserverContext<MasterCoprocessorEnvironment> observerContext, HRegionInfo hRegionInfo, ServerName serverName, ServerName serverName1) throws IOException {
}
@Override
public void preListProcedures(ObserverContext<MasterCoprocessorEnvironment> observerContext) throws IOException {
}
@Override
public void preAssign(ObserverContext<MasterCoprocessorEnvironment> observerContext, HRegionInfo hRegionInfo) throws IOException {
}
@Override
public void preUnassign(ObserverContext<MasterCoprocessorEnvironment> observerContext, HRegionInfo hRegionInfo, boolean b) throws IOException {
}
@Override
public void preRegionOffline(ObserverContext<MasterCoprocessorEnvironment> observerContext, HRegionInfo hRegionInfo) throws IOException {
}
@Override
public void preBalance(ObserverContext<MasterCoprocessorEnvironment> observerContext) throws IOException {
}
@Override
public boolean preBalanceSwitch(ObserverContext<MasterCoprocessorEnvironment> observerContext, boolean b) throws IOException {
return b;
}
@Override
public void preShutdown(ObserverContext<MasterCoprocessorEnvironment> observerContext) throws IOException {
}
@Override
public void preStopMaster(ObserverContext<MasterCoprocessorEnvironment> observerContext) throws IOException {
}
@Override
public void preMasterInitialization(ObserverContext<MasterCoprocessorEnvironment> observerContext) throws IOException {
}
@Override
public void preSnapshot(ObserverContext<MasterCoprocessorEnvironment> observerContext, SnapshotDescription snapshotDescription, HTableDescriptor hTableDescriptor) throws IOException {
}
@Override
public void preListSnapshot(ObserverContext<MasterCoprocessorEnvironment> observerContext, SnapshotDescription snapshotDescription) throws IOException {
}
@Override
public void preCloneSnapshot(ObserverContext<MasterCoprocessorEnvironment> observerContext, SnapshotDescription snapshotDescription, HTableDescriptor hTableDescriptor) throws IOException {
}
@Override
public void preRestoreSnapshot(ObserverContext<MasterCoprocessorEnvironment> observerContext, SnapshotDescription snapshotDescription, HTableDescriptor hTableDescriptor) throws IOException {
}
@Override
public void preDeleteSnapshot(ObserverContext<MasterCoprocessorEnvironment> observerContext, SnapshotDescription snapshotDescription) throws IOException {
}
@Override
public void preGetTableDescriptors(ObserverContext<MasterCoprocessorEnvironment> observerContext, List<TableName> list, List<HTableDescriptor> list1) throws IOException {
}
@Override
public void preGetTableDescriptors(ObserverContext<MasterCoprocessorEnvironment> observerContext, List<TableName> list, List<HTableDescriptor> list1, String s) throws IOException {
}
@Override
public void preGetTableNames(ObserverContext<MasterCoprocessorEnvironment> observerContext, List<HTableDescriptor> list, String s) throws IOException {
}
@Override
public void preCreateNamespace(ObserverContext<MasterCoprocessorEnvironment> observerContext, NamespaceDescriptor namespaceDescriptor) throws IOException {
}
@Override
public void preDeleteNamespace(ObserverContext<MasterCoprocessorEnvironment> observerContext, String s) throws IOException {
}
@Override
public void preModifyNamespace(ObserverContext<MasterCoprocessorEnvironment> observerContext, NamespaceDescriptor namespaceDescriptor) throws IOException {
}
@Override
public void preGetNamespaceDescriptor(ObserverContext<MasterCoprocessorEnvironment> observerContext, String s) throws IOException {
}
@Override
public void preListNamespaceDescriptors(ObserverContext<MasterCoprocessorEnvironment> observerContext, List<NamespaceDescriptor> list) throws IOException {
}
@Override
public void preTableFlush(ObserverContext<MasterCoprocessorEnvironment> observerContext, TableName tableName) throws IOException {
}
@Override
public void preSetUserQuota(ObserverContext<MasterCoprocessorEnvironment> observerContext, String s, Quotas quotas) throws IOException {
}
@Override
public void preSetUserQuota(ObserverContext<MasterCoprocessorEnvironment> observerContext, String s, TableName tableName, Quotas quotas) throws IOException {
}
@Override
public void preSetUserQuota(ObserverContext<MasterCoprocessorEnvironment> observerContext, String s, String s1, Quotas quotas) throws IOException {
}
@Override
public void preSetTableQuota(ObserverContext<MasterCoprocessorEnvironment> observerContext, TableName tableName, Quotas quotas) throws IOException {
}
@Override
public void preSetNamespaceQuota(ObserverContext<MasterCoprocessorEnvironment> observerContext, String s, Quotas quotas) throws IOException {
}
@Override
public void start(CoprocessorEnvironment coprocessorEnvironment) throws IOException {
}
@Override
public void stop(CoprocessorEnvironment coprocessorEnvironment) throws IOException {
}
@Override
public void postGetTableDescriptors(ObserverContext<MasterCoprocessorEnvironment> observerContext, List<HTableDescriptor> list) throws IOException {
}
@Override
public void postBalance(ObserverContext<MasterCoprocessorEnvironment> observerContext, List<RegionPlan> list) throws IOException {
}
@Override
public void postBalanceSwitch(ObserverContext<MasterCoprocessorEnvironment> observerContext, boolean b, boolean b1) throws IOException {
}
@Override
public void postGetNamespaceDescriptor(ObserverContext<MasterCoprocessorEnvironment> observerContext, NamespaceDescriptor namespaceDescriptor) throws IOException {
}
@Override
public void postStartMaster(ObserverContext<MasterCoprocessorEnvironment> observerContext) throws IOException {
}
@Override
public void postSnapshot(ObserverContext<MasterCoprocessorEnvironment> observerContext, SnapshotDescription snapshotDescription, HTableDescriptor hTableDescriptor) throws IOException {
}
@Override
public void postSetNamespaceQuota(ObserverContext<MasterCoprocessorEnvironment> observerContext, String s, Quotas quotas) throws IOException {
}
@Override
public void postAbortProcedure(ObserverContext<MasterCoprocessorEnvironment> observerContext) throws IOException {
}
@Override
public void postListProcedures(ObserverContext<MasterCoprocessorEnvironment> observerContext, List<ProcedureInfo> list) throws IOException {
}
@Override
public void postCreateTableHandler(ObserverContext<MasterCoprocessorEnvironment> observerContext, HTableDescriptor hTableDescriptor, HRegionInfo[] hRegionInfos) throws IOException {
}
@Override
public void postDeleteTableHandler(ObserverContext<MasterCoprocessorEnvironment> observerContext, TableName tableName) throws IOException {
}
@Override
public void postTruncateTable(ObserverContext<MasterCoprocessorEnvironment> observerContext, TableName tableName) throws IOException {
}
@Override
public void postTruncateTableHandler(ObserverContext<MasterCoprocessorEnvironment> observerContext, TableName tableName) throws IOException {
}
@Override
public void postModifyTableHandler(ObserverContext<MasterCoprocessorEnvironment> observerContext, TableName tableName, HTableDescriptor hTableDescriptor) throws IOException {
}
@Override
public void postAddColumnHandler(ObserverContext<MasterCoprocessorEnvironment> observerContext, TableName tableName, HColumnDescriptor hColumnDescriptor) throws IOException {
}
@Override
public void postModifyColumnHandler(ObserverContext<MasterCoprocessorEnvironment> observerContext, TableName tableName, HColumnDescriptor hColumnDescriptor) throws IOException {
}
@Override
public void postDeleteColumnHandler(ObserverContext<MasterCoprocessorEnvironment> observerContext, TableName tableName, byte[] bytes) throws IOException {
}
@Override
public void postEnableTable(ObserverContext<MasterCoprocessorEnvironment> observerContext, TableName tableName) throws IOException {
}
@Override
public void postEnableTableHandler(ObserverContext<MasterCoprocessorEnvironment> observerContext, TableName tableName) throws IOException {
}
@Override
public void postDisableTable(ObserverContext<MasterCoprocessorEnvironment> observerContext, TableName tableName) throws IOException {
}
@Override
public void postDisableTableHandler(ObserverContext<MasterCoprocessorEnvironment> observerContext, TableName tableName) throws IOException {
}
@Override
public void postMove(ObserverContext<MasterCoprocessorEnvironment> observerContext, HRegionInfo hRegionInfo, ServerName serverName, ServerName serverName1) throws IOException {
}
@Override
public void postAssign(ObserverContext<MasterCoprocessorEnvironment> observerContext, HRegionInfo hRegionInfo) throws IOException {
}
@Override
public void postUnassign(ObserverContext<MasterCoprocessorEnvironment> observerContext, HRegionInfo hRegionInfo, boolean b) throws IOException {
}
@Override
public void postRegionOffline(ObserverContext<MasterCoprocessorEnvironment> observerContext, HRegionInfo hRegionInfo) throws IOException {
}
@Override
public void postListSnapshot(ObserverContext<MasterCoprocessorEnvironment> observerContext, HBaseProtos.SnapshotDescription snapshotDescription) throws IOException {
}
@Override
public void postCloneSnapshot(ObserverContext<MasterCoprocessorEnvironment> observerContext, HBaseProtos.SnapshotDescription snapshotDescription, HTableDescriptor hTableDescriptor) throws IOException {
}
@Override
public void postRestoreSnapshot(ObserverContext<MasterCoprocessorEnvironment> observerContext, HBaseProtos.SnapshotDescription snapshotDescription, HTableDescriptor hTableDescriptor) throws IOException {
}
@Override
public void postDeleteSnapshot(ObserverContext<MasterCoprocessorEnvironment> observerContext, HBaseProtos.SnapshotDescription snapshotDescription) throws IOException {
}
@Override
public void postGetTableDescriptors(ObserverContext<MasterCoprocessorEnvironment> observerContext, List<TableName> list, List<HTableDescriptor> list1, String s) throws IOException {
}
@Override
public void postGetTableNames(ObserverContext<MasterCoprocessorEnvironment> observerContext, List<HTableDescriptor> list, String s) throws IOException {
}
@Override
public void postListNamespaceDescriptors(ObserverContext<MasterCoprocessorEnvironment> observerContext, List<NamespaceDescriptor> list) throws IOException {
}
@Override
public void postTableFlush(ObserverContext<MasterCoprocessorEnvironment> observerContext, TableName tableName) throws IOException {
}
@Override
public void postSetUserQuota(ObserverContext<MasterCoprocessorEnvironment> observerContext, String s, QuotaProtos.Quotas quotas) throws IOException {
}
@Override
public void postSetUserQuota(ObserverContext<MasterCoprocessorEnvironment> observerContext, String s, TableName tableName, QuotaProtos.Quotas quotas) throws IOException {
}
@Override
public void postSetUserQuota(ObserverContext<MasterCoprocessorEnvironment> observerContext, String s, String s1, QuotaProtos.Quotas quotas) throws IOException {
}
@Override
public void postSetTableQuota(ObserverContext<MasterCoprocessorEnvironment> observerContext, TableName tableName, QuotaProtos.Quotas quotas) throws IOException {
}
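// RegionObserver callbacks below: the data path (gets, puts, scans, flushes,
// compactions, splits) does not touch the namespace/table/column-family
// metadata this hook reports to Atlas, so the overrides are intentional
// no-ops and pass-throughs.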
@Override
public void preOpen(ObserverContext<RegionCoprocessorEnvironment> observerContext) throws IOException {
}
@Override
public void postOpen(ObserverContext<RegionCoprocessorEnvironment> observerContext) {
}
@Override
public void postLogReplay(ObserverContext<RegionCoprocessorEnvironment> observerContext) {
}
@Override
public InternalScanner preFlushScannerOpen(ObserverContext<RegionCoprocessorEnvironment> observerContext, Store store, KeyValueScanner keyValueScanner, InternalScanner internalScanner) throws IOException {
return null;
}
@Override
public void preFlush(ObserverContext<RegionCoprocessorEnvironment> observerContext) throws IOException {
}
@Override
public InternalScanner preFlush(ObserverContext<RegionCoprocessorEnvironment> observerContext, Store store, InternalScanner internalScanner) throws IOException {
return internalScanner;
}
@Override
public void postFlush(ObserverContext<RegionCoprocessorEnvironment> observerContext) throws IOException {
}
@Override
public void postFlush(ObserverContext<RegionCoprocessorEnvironment> observerContext, Store store, StoreFile storeFile) throws IOException {
}
@Override
public void preCompactSelection(ObserverContext<RegionCoprocessorEnvironment> observerContext, Store store, List<StoreFile> list, CompactionRequest compactionRequest) throws IOException {
}
@Override
public void preCompactSelection(ObserverContext<RegionCoprocessorEnvironment> observerContext, Store store, List<StoreFile> list) throws IOException {
}
@Override
public void postCompactSelection(ObserverContext<RegionCoprocessorEnvironment> observerContext, Store store, ImmutableList<StoreFile> immutableList, CompactionRequest compactionRequest) {
}
@Override
public void postCompactSelection(ObserverContext<RegionCoprocessorEnvironment> observerContext, Store store, ImmutableList<StoreFile> immutableList) {
}
@Override
public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> observerContext, Store store, InternalScanner internalScanner, ScanType scanType, CompactionRequest compactionRequest) throws IOException {
return internalScanner;
}
@Override
public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> observerContext, Store store, InternalScanner internalScanner, ScanType scanType) throws IOException {
return internalScanner;
}
@Override
public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> observerContext, Store store, List<? extends KeyValueScanner> list, ScanType scanType, long l, InternalScanner internalScanner, CompactionRequest compactionRequest) throws IOException {
return internalScanner;
}
@Override
public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> observerContext, Store store, List<? extends KeyValueScanner> list, ScanType scanType, long l, InternalScanner internalScanner) throws IOException {
return internalScanner;
}
@Override
public void postCompact(ObserverContext<RegionCoprocessorEnvironment> observerContext, Store store, StoreFile storeFile, CompactionRequest compactionRequest) throws IOException {
}
@Override
public void postCompact(ObserverContext<RegionCoprocessorEnvironment> observerContext, Store store, StoreFile storeFile) throws IOException {
}
@Override
public void preSplit(ObserverContext<RegionCoprocessorEnvironment> observerContext) throws IOException {
}
@Override
public void preSplit(ObserverContext<RegionCoprocessorEnvironment> observerContext, byte[] bytes) throws IOException {
}
@Override
public void postSplit(ObserverContext<RegionCoprocessorEnvironment> observerContext, Region region, Region region1) throws IOException {
}
@Override
public void preSplitBeforePONR(ObserverContext<RegionCoprocessorEnvironment> observerContext, byte[] bytes, List<Mutation> list) throws IOException {
}
@Override
public void preSplitAfterPONR(ObserverContext<RegionCoprocessorEnvironment> observerContext) throws IOException {
}
@Override
public void preRollBackSplit(ObserverContext<RegionCoprocessorEnvironment> observerContext) throws IOException {
}
@Override
public void postRollBackSplit(ObserverContext<RegionCoprocessorEnvironment> observerContext) throws IOException {
}
@Override
public void postCompleteSplit(ObserverContext<RegionCoprocessorEnvironment> observerContext) throws IOException {
}
@Override
public void preClose(ObserverContext<RegionCoprocessorEnvironment> observerContext, boolean b) throws IOException {
}
@Override
public void postClose(ObserverContext<RegionCoprocessorEnvironment> observerContext, boolean b) {
}
@Override
public void preGetClosestRowBefore(ObserverContext<RegionCoprocessorEnvironment> observerContext, byte[] bytes, byte[] bytes1, Result result) throws IOException {
}
@Override
public void postGetClosestRowBefore(ObserverContext<RegionCoprocessorEnvironment> observerContext, byte[] bytes, byte[] bytes1, Result result) throws IOException {
}
@Override
public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> observerContext, Get get, List<Cell> list) throws IOException {
}
@Override
public void postGetOp(ObserverContext<RegionCoprocessorEnvironment> observerContext, Get get, List<Cell> list) throws IOException {
}
@Override
public boolean preExists(ObserverContext<RegionCoprocessorEnvironment> observerContext, Get get, boolean b) throws IOException {
return b;
}
@Override
public boolean postExists(ObserverContext<RegionCoprocessorEnvironment> observerContext, Get get, boolean b) throws IOException {
return b;
}
@Override
public void prePut(ObserverContext<RegionCoprocessorEnvironment> observerContext, Put put, WALEdit walEdit, Durability durability) throws IOException {
}
@Override
public void postPut(ObserverContext<RegionCoprocessorEnvironment> observerContext, Put put, WALEdit walEdit, Durability durability) throws IOException {
}
@Override
public void preDelete(ObserverContext<RegionCoprocessorEnvironment> observerContext, Delete delete, WALEdit walEdit, Durability durability) throws IOException {
}
@Override
public void prePrepareTimeStampForDeleteVersion(ObserverContext<RegionCoprocessorEnvironment> observerContext, Mutation mutation, Cell cell, byte[] bytes, Get get) throws IOException {
}
@Override
public void postDelete(ObserverContext<RegionCoprocessorEnvironment> observerContext, Delete delete, WALEdit walEdit, Durability durability) throws IOException {
}
@Override
public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> observerContext, MiniBatchOperationInProgress<Mutation> miniBatchOperationInProgress) throws IOException {
}
@Override
public void postBatchMutate(ObserverContext<RegionCoprocessorEnvironment> observerContext, MiniBatchOperationInProgress<Mutation> miniBatchOperationInProgress) throws IOException {
}
@Override
public void postStartRegionOperation(ObserverContext<RegionCoprocessorEnvironment> observerContext, Region.Operation operation) throws IOException {
}
@Override
public void postCloseRegionOperation(ObserverContext<RegionCoprocessorEnvironment> observerContext, Region.Operation operation) throws IOException {
}
@Override
public void postBatchMutateIndispensably(ObserverContext<RegionCoprocessorEnvironment> observerContext, MiniBatchOperationInProgress<Mutation> miniBatchOperationInProgress, boolean b) throws IOException {
}
@Override
public boolean preCheckAndPut(ObserverContext<RegionCoprocessorEnvironment> observerContext, byte[] bytes, byte[] bytes1, byte[] bytes2, CompareFilter.CompareOp compareOp, ByteArrayComparable byteArrayComparable, Put put, boolean b) throws IOException {
return b;
}
@Override
public boolean preCheckAndPutAfterRowLock(ObserverContext<RegionCoprocessorEnvironment> observerContext, byte[] bytes, byte[] bytes1, byte[] bytes2, CompareFilter.CompareOp compareOp, ByteArrayComparable byteArrayComparable, Put put, boolean b) throws IOException {
return b; // pass the caller's result through unchanged, like the other check-and-mutate hooks
}
@Override
public boolean postCheckAndPut(ObserverContext<RegionCoprocessorEnvironment> observerContext, byte[] bytes, byte[] bytes1, byte[] bytes2, CompareFilter.CompareOp compareOp, ByteArrayComparable byteArrayComparable, Put put, boolean b) throws IOException {
return b;
}
@Override
public boolean preCheckAndDelete(ObserverContext<RegionCoprocessorEnvironment> observerContext, byte[] bytes, byte[] bytes1, byte[] bytes2, CompareFilter.CompareOp compareOp, ByteArrayComparable byteArrayComparable, Delete delete, boolean b) throws IOException {
return b;
}
@Override
public boolean preCheckAndDeleteAfterRowLock(ObserverContext<RegionCoprocessorEnvironment> observerContext, byte[] bytes, byte[] bytes1, byte[] bytes2, CompareFilter.CompareOp compareOp, ByteArrayComparable byteArrayComparable, Delete delete, boolean b) throws IOException {
return b;
}
@Override
public boolean postCheckAndDelete(ObserverContext<RegionCoprocessorEnvironment> observerContext, byte[] bytes, byte[] bytes1, byte[] bytes2, CompareFilter.CompareOp compareOp, ByteArrayComparable byteArrayComparable, Delete delete, boolean b) throws IOException {
return b; // pass the caller's result through unchanged, like the other check-and-mutate hooks
}
@Override
public long preIncrementColumnValue(ObserverContext<RegionCoprocessorEnvironment> observerContext, byte[] bytes, byte[] bytes1, byte[] bytes2, long l, boolean b) throws IOException {
return l;
}
@Override
public long postIncrementColumnValue(ObserverContext<RegionCoprocessorEnvironment> observerContext, byte[] bytes, byte[] bytes1, byte[] bytes2, long l, boolean b, long l1) throws IOException {
return l;
}
@Override
public Result preAppend(ObserverContext<RegionCoprocessorEnvironment> observerContext, Append append) throws IOException {
return null;
}
@Override
public Result preAppendAfterRowLock(ObserverContext<RegionCoprocessorEnvironment> observerContext, Append append) throws IOException {
return null;
}
@Override
public Result postAppend(ObserverContext<RegionCoprocessorEnvironment> observerContext, Append append, Result result) throws IOException {
return result;
}
@Override
public Result preIncrement(ObserverContext<RegionCoprocessorEnvironment> observerContext, Increment increment) throws IOException {
return null;
}
@Override
public Result preIncrementAfterRowLock(ObserverContext<RegionCoprocessorEnvironment> observerContext, Increment increment) throws IOException {
return null;
}
@Override
public Result postIncrement(ObserverContext<RegionCoprocessorEnvironment> observerContext, Increment increment, Result result) throws IOException {
return result;
}
@Override
public RegionScanner preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> observerContext, Scan scan, RegionScanner regionScanner) throws IOException {
return regionScanner;
}
@Override
public KeyValueScanner preStoreScannerOpen(ObserverContext<RegionCoprocessorEnvironment> observerContext, Store store, Scan scan, NavigableSet<byte[]> navigableSet, KeyValueScanner keyValueScanner) throws IOException {
return keyValueScanner;
}
@Override
public RegionScanner postScannerOpen(ObserverContext<RegionCoprocessorEnvironment> observerContext, Scan scan, RegionScanner regionScanner) throws IOException {
return regionScanner;
}
@Override
public boolean preScannerNext(ObserverContext<RegionCoprocessorEnvironment> observerContext, InternalScanner internalScanner, List<Result> list, int i, boolean b) throws IOException {
return b;
}
@Override
public boolean postScannerNext(ObserverContext<RegionCoprocessorEnvironment> observerContext, InternalScanner internalScanner, List<Result> list, int i, boolean b) throws IOException {
return b;
}
@Override
public boolean postScannerFilterRow(ObserverContext<RegionCoprocessorEnvironment> observerContext, InternalScanner internalScanner, byte[] bytes, int i, short i1, boolean b) throws IOException {
return b;
}
@Override
public void preScannerClose(ObserverContext<RegionCoprocessorEnvironment> observerContext, InternalScanner internalScanner) throws IOException {
}
@Override
public void postScannerClose(ObserverContext<RegionCoprocessorEnvironment> observerContext, InternalScanner internalScanner) throws IOException {
}
@Override
public void preWALRestore(ObserverContext<? extends RegionCoprocessorEnvironment> observerContext, HRegionInfo hRegionInfo, WALKey walKey, WALEdit walEdit) throws IOException {
}
@Override
public void preWALRestore(ObserverContext<RegionCoprocessorEnvironment> observerContext, HRegionInfo hRegionInfo, HLogKey hLogKey, WALEdit walEdit) throws IOException {
}
@Override
public void postWALRestore(ObserverContext<? extends RegionCoprocessorEnvironment> observerContext, HRegionInfo hRegionInfo, WALKey walKey, WALEdit walEdit) throws IOException {
}
@Override
public void postWALRestore(ObserverContext<RegionCoprocessorEnvironment> observerContext, HRegionInfo hRegionInfo, HLogKey hLogKey, WALEdit walEdit) throws IOException {
}
@Override
public void preBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> observerContext, List<Pair<byte[], String>> list) throws IOException {
}
@Override
public boolean postBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> observerContext, List<Pair<byte[], String>> list, boolean b) throws IOException {
return b;
}
@Override
public StoreFile.Reader preStoreFileReaderOpen(ObserverContext<RegionCoprocessorEnvironment> observerContext, FileSystem fileSystem, Path path, FSDataInputStreamWrapper fsDataInputStreamWrapper, long l, CacheConfig cacheConfig, Reference reference, StoreFile.Reader reader) throws IOException {
return reader;
}
@Override
public StoreFile.Reader postStoreFileReaderOpen(ObserverContext<RegionCoprocessorEnvironment> observerContext, FileSystem fileSystem, Path path, FSDataInputStreamWrapper fsDataInputStreamWrapper, long l, CacheConfig cacheConfig, Reference reference, StoreFile.Reader reader) throws IOException {
return reader;
}
@Override
public Cell postMutationBeforeWAL(ObserverContext<RegionCoprocessorEnvironment> observerContext, MutationType mutationType, Mutation mutation, Cell cell, Cell cell1) throws IOException {
return cell;
}
@Override
public DeleteTracker postInstantiateDeleteTracker(ObserverContext<RegionCoprocessorEnvironment> observerContext, DeleteTracker deleteTracker) throws IOException {
return deleteTracker;
}
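// RegionServerObserver callbacks below: region moves, merges, WAL rolls and
// replication change no schema metadata either, so values pass through
// untouched.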
@Override
public void preStopRegionServer(ObserverContext<RegionServerCoprocessorEnvironment> observerContext) throws IOException {
}
@Override
public void preMerge(ObserverContext<RegionServerCoprocessorEnvironment> observerContext, Region region, Region region1) throws IOException {
}
@Override
public void postMerge(ObserverContext<RegionServerCoprocessorEnvironment> observerContext, Region region, Region region1, Region region2) throws IOException {
}
@Override
public void preMergeCommit(ObserverContext<RegionServerCoprocessorEnvironment> observerContext, Region region, Region region1, @MetaMutationAnnotation List<Mutation> list) throws IOException {
}
@Override
public void postMergeCommit(ObserverContext<RegionServerCoprocessorEnvironment> observerContext, Region region, Region region1, Region region2) throws IOException {
}
@Override
public void preRollBackMerge(ObserverContext<RegionServerCoprocessorEnvironment> observerContext, Region region, Region region1) throws IOException {
}
@Override
public void postRollBackMerge(ObserverContext<RegionServerCoprocessorEnvironment> observerContext, Region region, Region region1) throws IOException {
}
@Override
public void preRollWALWriterRequest(ObserverContext<RegionServerCoprocessorEnvironment> observerContext) throws IOException {
}
@Override
public void postRollWALWriterRequest(ObserverContext<RegionServerCoprocessorEnvironment> observerContext) throws IOException {
}
@Override
public ReplicationEndpoint postCreateReplicationEndPoint(ObserverContext<RegionServerCoprocessorEnvironment> observerContext, ReplicationEndpoint replicationEndpoint) {
return replicationEndpoint; // returning null would discard the endpoint; pass it through
}
@Override
public void preReplicateLogEntries(ObserverContext<RegionServerCoprocessorEnvironment> observerContext, List<AdminProtos.WALEntry> list, CellScanner cellScanner) throws IOException {
}
@Override
public void postReplicateLogEntries(ObserverContext<RegionServerCoprocessorEnvironment> observerContext, List<AdminProtos.WALEntry> list, CellScanner cellScanner) throws IOException {
}
@Override
public void prePrepareBulkLoad(ObserverContext<RegionCoprocessorEnvironment> observerContext, SecureBulkLoadProtos.PrepareBulkLoadRequest prepareBulkLoadRequest) throws IOException {
}
@Override
public void preCleanupBulkLoad(ObserverContext<RegionCoprocessorEnvironment> observerContext, SecureBulkLoadProtos.CleanupBulkLoadRequest cleanupBulkLoadRequest) throws IOException {
}
@Override
public void postCreateTable(ObserverContext<MasterCoprocessorEnvironment> observerContext, HTableDescriptor hTableDescriptor, HRegionInfo[] hRegionInfos) throws IOException {
}
@Override
public void postDeleteTable(ObserverContext<MasterCoprocessorEnvironment> observerContext, TableName tableName) throws IOException {
}
@Override
public void postModifyTable(ObserverContext<MasterCoprocessorEnvironment> observerContext, TableName tableName, HTableDescriptor hTableDescriptor) throws IOException {
}
@Override
public void postAddColumn(ObserverContext<MasterCoprocessorEnvironment> observerContext, TableName tableName, HColumnDescriptor hColumnDescriptor) throws IOException {
}
@Override
public void postModifyColumn(ObserverContext<MasterCoprocessorEnvironment> observerContext, TableName tableName, HColumnDescriptor hColumnDescriptor) throws IOException {
}
@Override
public void postDeleteColumn(ObserverContext<MasterCoprocessorEnvironment> observerContext, TableName tableName, byte[] bytes) throws IOException {
}
@Override
public void postCreateNamespace(ObserverContext<MasterCoprocessorEnvironment> observerContext, NamespaceDescriptor namespaceDescriptor) throws IOException {
}
@Override
public void postDeleteNamespace(ObserverContext<MasterCoprocessorEnvironment> observerContext, String s) throws IOException {
}
@Override
public void postModifyNamespace(ObserverContext<MasterCoprocessorEnvironment> observerContext, NamespaceDescriptor namespaceDescriptor) throws IOException {
}
@Override
public void preAbortProcedure(ObserverContext<MasterCoprocessorEnvironment> observerContext, ProcedureExecutor<MasterProcedureEnv> procedureExecutor, long l) throws IOException {
}
}
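// ---------------------------------------------------------------------------
// Illustrative sketch (not part of the patch): of the callbacks above, only
// the master-side DDL ones matter to Atlas. A concrete coprocessor would
// override just those and forward to HBaseAtlasHook; sendHBaseNameSpaceMsg()
// below is a HYPOTHETICAL helper name standing in for whatever the hook
// actually exposes:
//
//     @Override
//     public void postCreateNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
//                                     NamespaceDescriptor ns) throws IOException {
//         hbaseAtlasHook.sendHBaseNameSpaceMsg(ns); // hypothetical helper
//     }
// ---------------------------------------------------------------------------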
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.hbase.model;
/**
* HBase data types for the Atlas model and bridge.
*/
public enum HBaseDataTypes {
// Classes
HBASE_NAMESPACE,
HBASE_TABLE,
HBASE_COLUMN_FAMILY,
HBASE_COLUMN;
public String getName() {
return name().toLowerCase();
}
}
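// Illustrative mapping: the lower-cased enum names line up with the Atlas
// entity type names registered in the typedef JSON shipped with this patch,
// e.g.
//     HBaseDataTypes.HBASE_NAMESPACE.getName()     -> "hbase_namespace"
//     HBaseDataTypes.HBASE_COLUMN_FAMILY.getName() -> "hbase_column_family"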
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.hbase.model;
import org.apache.atlas.hbase.bridge.HBaseAtlasHook;
import org.apache.atlas.notification.hook.HookNotification;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.security.UserGroupInformation;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public class HBaseOperationContext {
private final UserGroupInformation ugi;
private final Map<String, String> hbaseConf;
private final HBaseAtlasHook.OPERATION operation;
private final String user;
private final NamespaceDescriptor namespaceDescriptor;
private final HTableDescriptor hTableDescriptor;
private final HColumnDescriptor[] hColumnDescriptors;
private final TableName tableName;
private final String nameSpace;
private final String columnFamily;
private final String owner;
private final HColumnDescriptor hColumnDescriptor;
private List<HookNotification.HookNotificationMessage> messages = new ArrayList<>();
public HBaseOperationContext(NamespaceDescriptor namespaceDescriptor, String nameSpace, HTableDescriptor hTableDescriptor, TableName tableName, HColumnDescriptor[] hColumnDescriptors,
HColumnDescriptor hColumnDescriptor, String columnFamily, HBaseAtlasHook.OPERATION operation, UserGroupInformation ugi, String user, String owner,
Map<String, String> hbaseConf) {
this.namespaceDescriptor = namespaceDescriptor;
this.nameSpace = nameSpace;
this.hTableDescriptor = hTableDescriptor;
this.tableName = tableName;
this.hColumnDescriptors = hColumnDescriptors;
this.hColumnDescriptor = hColumnDescriptor;
this.columnFamily = columnFamily;
this.operation = operation;
this.ugi = ugi;
this.user = user;
this.owner = owner;
this.hbaseConf = hbaseConf;
}
public HBaseOperationContext(NamespaceDescriptor namespaceDescriptor, String nameSpace, HBaseAtlasHook.OPERATION operation, UserGroupInformation ugi, String user, String owner) {
this(namespaceDescriptor, nameSpace, null, null, null, null, null, operation, ugi, user, owner, null);
}
public HBaseOperationContext(String nameSpace, HTableDescriptor hTableDescriptor, TableName tableName, HColumnDescriptor[] hColumnDescriptor, HBaseAtlasHook.OPERATION operation, UserGroupInformation ugi, String user, String owner, Map<String,String> hbaseConf) {
this(null, nameSpace, hTableDescriptor, tableName, hColumnDescriptor, null, null, operation, ugi, user, owner, hbaseConf);
}
public HBaseOperationContext(String nameSpace, TableName tableName, HColumnDescriptor hColumnDescriptor, String columnFamily, HBaseAtlasHook.OPERATION operation, UserGroupInformation ugi, String user, String owner, Map<String,String> hbaseConf) {
this(null, nameSpace, null, tableName, null, hColumnDescriptor, columnFamily, operation, ugi, user, owner, hbaseConf);
}
public UserGroupInformation getUgi() {
return ugi;
}
public Map<String, String> getHbaseConf() {
return hbaseConf;
}
public String getUser() {
return user;
}
public HBaseAtlasHook.OPERATION getOperation() {
return operation;
}
public NamespaceDescriptor getNamespaceDescriptor() {
return namespaceDescriptor;
}
public HTableDescriptor gethTableDescriptor() {
return hTableDescriptor;
}
public HColumnDescriptor[] gethColumnDescriptors() {
return hColumnDescriptors;
}
public TableName getTableName() {
return tableName;
}
public String getNameSpace() {
return nameSpace;
}
public HColumnDescriptor gethColumnDescriptor() {
return hColumnDescriptor;
}
public String getColummFamily() {
return columnFamily;
}
public void addMessage(HookNotification.HookNotificationMessage message) {
messages.add(message);
}
public String getOwner() {
return owner;
}
public List<HookNotification.HookNotificationMessage> getMessages() {
return messages;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
toString(sb);
return sb.toString();
}
public StringBuilder toString(StringBuilder sb) {
sb.append("HBaseOperationContext={");
sb.append("Operation={").append(operation).append("} ");
sb.append("User={").append(user).append("} ");
if (nameSpace != null) {
sb.append("NameSpace={").append(nameSpace).append("}");
} else if (namespaceDescriptor != null) {
sb.append("NameSpace={").append(namespaceDescriptor).append("}");
}
if (tableName != null) {
sb.append("Table={").append(tableName).append("}");
} else if (hTableDescriptor != null) {
sb.append("Table={").append(hTableDescriptor).append("}");
}
if (columnFamily != null) {
sb.append("Column Family={").append(columnFamily).append("}");
} else if (hColumnDescriptor != null) {
sb.append("Column Family={").append(hColumnDescriptor).append("}");
}
sb.append("Messages={").append(getMessages()).append("} ");
sb.append("}");
return sb;
}
}
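// Illustrative usage (assumptions: HBaseAtlasHook.OPERATION defines a
// CREATE_NAMESPACE constant, and nsDescriptor/ugi/user/owner/message are in
// scope; only members visible in this file are relied on):
//
//     HBaseOperationContext ctx = new HBaseOperationContext(
//             nsDescriptor, nsDescriptor.getName(),
//             HBaseAtlasHook.OPERATION.CREATE_NAMESPACE, ugi, user, owner);
//     ctx.addMessage(message); // queued until the hook drains getMessages()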
<?xml version="1.0" encoding="UTF-8" ?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
<appender name="console" class="org.apache.log4j.ConsoleAppender">
<param name="Target" value="System.out"/>
<layout class="org.apache.log4j.PatternLayout">
<param name="ConversionPattern" value="%d %-5p - [%t:%x] ~ %m (%C{1}:%L)%n"/>
</layout>
</appender>
<appender name="FILE" class="org.apache.log4j.DailyRollingFileAppender">
<param name="File" value="${atlas.log.dir}/${atlas.log.file}"/>
<param name="Append" value="true"/>
<layout class="org.apache.log4j.PatternLayout">
<param name="ConversionPattern" value="%d %-5p - [%t:%x] ~ %m (%C{1}:%L)%n"/>
</layout>
</appender>
<logger name="org.apache.atlas" additivity="false">
<level value="info"/>
<appender-ref ref="FILE"/>
</logger>
<!-- to avoid logs - The configuration log.flush.interval.messages = 1 was supplied but isn't a known config -->
<logger name="org.apache.kafka.common.config.AbstractConfig" additivity="false">
<level value="error"/>
<appender-ref ref="FILE"/>
</logger>
<root>
<priority value="info"/>
<appender-ref ref="FILE"/>
</root>
</log4j:configuration>
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.hbase;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.hbase.bridge.HBaseAtlasHook;
import org.apache.atlas.hbase.model.HBaseDataTypes;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.utils.AuthenticationUtil;
import org.apache.atlas.utils.ParamChecker;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import java.io.IOException;
import java.net.ServerSocket;
import java.util.Iterator;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.fail;
public class HBaseAtlasHookIT {
private static final Logger LOG = LoggerFactory.getLogger(HBaseAtlasHookIT.class);
protected static final String DGI_URL = "http://localhost:31000/";
protected static final String CLUSTER_NAME = "primary";
private static HBaseTestingUtility utility;
private static int port;
private static AtlasClient atlasClient;
@BeforeClass
public void setUp() {
try {
createHBaseCluster();
createAtlasClient();
} catch (Exception e) {
LOG.error("Unable to create Hbase Admin for Testing ", e);
}
}
@AfterClass
public static void cleanup() throws Exception {
LOG.info(" Stopping mini cluster.. ");
utility.shutdownMiniCluster();
}
@Test
public void testCreateNamespace() throws Exception {
final Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "localhost");
conf.set("hbase.zookeeper.property.clientPort", String.valueOf(port));
conf.set("zookeeper.znode.parent", "/hbase-unsecure");
Connection conn = ConnectionFactory.createConnection(conf);
Admin admin = conn.getAdmin();
NamespaceDescriptor ns = NamespaceDescriptor.create("test_namespace").build();
admin.createNamespace(ns);
String nameSpace = assertNameSpaceIsRegistered(ns.getName());
//assert on qualified name
Referenceable nameSpaceRef = getAtlasClient().getEntity(nameSpace);
String nameSpaceQualifiedName = HBaseAtlasHook.getNameSpaceQualifiedName(CLUSTER_NAME, ns.getName());
Assert.assertEquals(nameSpaceRef.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME), nameSpaceQualifiedName);
}
@Test
public void testCreateTable() throws Exception {
final Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "localhost");
conf.set("hbase.zookeeper.property.clientPort", String.valueOf(port));
conf.set("zookeeper.znode.parent", "/hbase-unsecure");
Connection conn = ConnectionFactory.createConnection(conf);
Admin admin = conn.getAdmin();
String namespace = "test_namespace1";
String tablename = "test_table";
// Create a table
if (!admin.tableExists(TableName.valueOf(namespace, tablename))) {
NamespaceDescriptor ns = NamespaceDescriptor.create(namespace).build();
admin.createNamespace(ns);
HTableDescriptor tableDescriptor = new HTableDescriptor(TableName.valueOf(namespace, tablename));
tableDescriptor.addFamily(new HColumnDescriptor("colfam1"));
admin.createTable(tableDescriptor);
}
String table = assertTableIsRegistered(namespace, tablename);
//assert on qualified name
Referenceable tableRef = getAtlasClient().getEntity(table);
String entityName = HBaseAtlasHook.getTableQualifiedName(CLUSTER_NAME, namespace, tablename);
Assert.assertEquals(tableRef.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME), entityName);
}
// Methods for creating HBase
public static void createAtlasClient() {
try {
org.apache.commons.configuration.Configuration configuration = ApplicationProperties.get();
String[] atlasEndPoint = configuration.getStringArray(HBaseAtlasHook.ATTR_ATLAS_ENDPOINT);
configuration.setProperty("atlas.cluster.name", CLUSTER_NAME);
if (atlasEndPoint == null || atlasEndPoint.length == 0) {
atlasEndPoint = new String[]{DGI_URL};
}
Iterator<String> keys = configuration.getKeys();
while (keys.hasNext()) {
String key = keys.next();
LOG.info("{} = {} ", key, configuration.getString(key));
}
if (AuthenticationUtil.isKerberosAuthenticationEnabled()) {
atlasClient = new AtlasClient(configuration, atlasEndPoint);
} else {
atlasClient = new AtlasClient(configuration, atlasEndPoint, new String[]{"admin", "admin"});
}
} catch (Exception e) {
LOG.error("Unable to create AtlasClient for Testing ", e);
}
}
private static int getFreePort() throws IOException {
ServerSocket serverSocket = new ServerSocket(0);
int port = serverSocket.getLocalPort();
serverSocket.close();
return port;
}
public static void createHBaseCluster() throws Exception {
LOG.info("Creating Hbase Admin...");
port = getFreePort();
utility = new HBaseTestingUtility();
utility.getConfiguration().set("test.hbase.zookeeper.property.clientPort", String.valueOf(port));
utility.getConfiguration().set("hbase.master.port", String.valueOf(getFreePort()));
utility.getConfiguration().set("hbase.master.info.port", String.valueOf(getFreePort()));
utility.getConfiguration().set("hbase.regionserver.port", String.valueOf(getFreePort()));
utility.getConfiguration().set("hbase.regionserver.info.port", String.valueOf(getFreePort()));
utility.getConfiguration().set("zookeeper.znode.parent", "/hbase-unsecure");
utility.getConfiguration().set("hbase.table.sanity.checks", "false");
utility.getConfiguration().set("hbase.coprocessor.master.classes",
"org.apache.atlas.hbase.hook.HBaseAtlasCoprocessor");
utility.startMiniCluster();
}
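// Note: outside of HBaseTestingUtility, the hook is wired up the same way by
// setting hbase.coprocessor.master.classes to
// org.apache.atlas.hbase.hook.HBaseAtlasCoprocessor in hbase-site.xml (a
// deployment assumption mirroring the test configuration above).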
public AtlasClient getAtlasClient() {
return atlasClient;
}
protected String assertNameSpaceIsRegistered(String nameSpace) throws Exception {
return assertNameSpaceIsRegistered(nameSpace, null);
}
protected String assertNameSpaceIsRegistered(String nameSpace, HBaseAtlasHookIT.AssertPredicate assertPredicate) throws Exception {
if (LOG.isDebugEnabled()) {
LOG.debug("Searching for nameSpace {}", nameSpace);
}
String nameSpaceQualifiedName = HBaseAtlasHook.getNameSpaceQualifiedName(CLUSTER_NAME, nameSpace);
return assertEntityIsRegistered(HBaseDataTypes.HBASE_NAMESPACE.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
nameSpaceQualifiedName, assertPredicate);
}
protected String assertTableIsRegistered(String nameSpace, String tableName) throws Exception {
return assertTableIsRegistered(nameSpace, tableName, null);
}
protected String assertTableIsRegistered(String nameSpace, String tableName, HBaseAtlasHookIT.AssertPredicate assertPredicate) throws Exception {
if (LOG.isDebugEnabled()) {
LOG.debug("Searching for nameSpace:Table {} {}", nameSpace, tableName);
}
String tableQualifiedName = HBaseAtlasHook.getTableQualifiedName(CLUSTER_NAME, nameSpace, tableName);
return assertEntityIsRegistered(HBaseDataTypes.HBASE_TABLE.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, tableQualifiedName,
assertPredicate);
}
public interface AssertPredicate {
void assertOnEntity(Referenceable entity) throws Exception;
}
public interface Predicate {
/**
* Perform a predicate evaluation.
*
* @throws Exception thrown if the evaluation fails or could not be performed.
*/
void evaluate() throws Exception;
}
protected String assertEntityIsRegistered(final String typeName, final String property, final String value,
final HBaseAtlasHookIT.AssertPredicate assertPredicate) throws Exception {
waitFor(80000, new HBaseAtlasHookIT.Predicate() {
@Override
public void evaluate() throws Exception {
Referenceable entity = atlasClient.getEntity(typeName, property, value);
assertNotNull(entity);
if (assertPredicate != null) {
assertPredicate.assertOnEntity(entity);
}
}
});
Referenceable entity = atlasClient.getEntity(typeName, property, value);
return entity.getId()._getId();
}
/**
* Wait for a condition, expressed via a {@link HBaseAtlasHookIT.Predicate} to become true.
*
* @param timeout maximum time in milliseconds to wait for the predicate to become true.
* @param predicate predicate waiting on.
*/
protected void waitFor(int timeout, HBaseAtlasHookIT.Predicate predicate) throws Exception {
ParamChecker.notNull(predicate, "predicate");
long mustEnd = System.currentTimeMillis() + timeout;
while (true) {
try {
predicate.evaluate();
return;
} catch (Error | Exception e) {
if (System.currentTimeMillis() >= mustEnd) {
fail("Assertions failed. Failing after waiting for timeout " + timeout + " msecs", e);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Waiting up to {} msec as assertion failed", mustEnd - System.currentTimeMillis(), e);
}
Thread.sleep(5000);
}
}
}
}
......@@ -4,6 +4,47 @@
"classificationDefs": [],
"entityDefs": [
{
"name": "hbase_namespace",
"superTypes": [
"DataSet"
],
"typeVersion": "1.0",
"attributeDefs": [
{
"name": "clusterName",
"typeName": "string",
"cardinality": "SINGLE",
"isIndexable": true,
"isOptional": false,
"isUnique": false
},
{
"name": "parameters",
"typeName": "map<string,string>",
"cardinality": "SINGLE",
"isIndexable": false,
"isOptional": true,
"isUnique": false
},
{
"name": "createTime",
"typeName": "date",
"cardinality": "SINGLE",
"isIndexable": false,
"isOptional": true,
"isUnique": false
},
{
"name": "modifiedTime",
"typeName": "date",
"cardinality": "SINGLE",
"isIndexable": false,
"isOptional": true,
"isUnique": false
}
]
},
{
"name": "hbase_table",
"superTypes": [
"DataSet"
......
{
"patches": [
{
"action": "ADD_ATTRIBUTE",
"typeName": "hbase_table",
"applyToVersion": "1.1",
"updateToVersion": "1.2",
"params": null,
"attributeDefs": [
{
"name": "namespace",
"typeName": "hbase_namespace",
"cardinality": "SINGLE",
"isIndexable": false,
"isOptional": true,
"isUnique": false
},
{
"name": "parameters",
"typeName": "map<string,string>",
"cardinality": "SINGLE",
"isIndexable": false,
"isOptional": true,
"isUnique": false
},
{
"name": "createTime",
"typeName": "date",
"cardinality": "SINGLE",
"isIndexable": false,
"isOptional": true,
"isUnique": false
},
{
"name": "modifiedTime",
"typeName": "date",
"cardinality": "SINGLE",
"isIndexable": false,
"isOptional": true,
"isUnique": false
}
]
},
{
"action": "ADD_ATTRIBUTE",
"typeName": "hbase_column_family",
"applyToVersion": "1.0",
"updateToVersion": "1.1",
"params": null,
"attributeDefs": [
{
"name": "createTime",
"typeName": "date",
"cardinality": "SINGLE",
"isIndexable": false,
"isOptional": true,
"isUnique": false
},
{
"name": "modifiedTime",
"typeName": "date",
"cardinality": "SINGLE",
"isIndexable": false,
"isOptional": true,
"isUnique": false
}
]
}
]
}
\ No newline at end of file
......@@ -130,6 +130,17 @@
<outputDirectory>hook</outputDirectory>
</fileSet>
<!-- addons/hbase -->
<fileSet>
<directory>../addons/hbase-bridge/target/dependency/bridge</directory>
<outputDirectory>bridge</outputDirectory>
</fileSet>
<fileSet>
<directory>../addons/hbase-bridge/target/dependency/hook</directory>
<outputDirectory>hook</outputDirectory>
</fileSet>
<!-- addons/falcon -->
<fileSet>
<directory>../addons/falcon-bridge/target/dependency/hook</directory>
......
......@@ -679,7 +679,8 @@
<module>addons/sqoop-bridge</module>
<module>addons/storm-bridge-shim</module>
<module>addons/storm-bridge</module>
<module>addons/hbase-bridge-shim</module>
<module>addons/hbase-bridge</module>
<module>distro</module>
<module>build-tools</module>
</modules>
......@@ -1519,6 +1520,19 @@
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.atlas</groupId>
<artifactId>hbase-bridge</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.atlas</groupId>
<artifactId>hbase-bridge-shim</artifactId>
<version>${project.version}</version>
</dependency>
<!--Scala dependencies-->
<dependency>
<groupId>org.scala-lang</groupId>
......