diff --git a/.gitignore b/.gitignore index 2a53de2..90ca276 100755 --- a/.gitignore +++ b/.gitignore @@ -37,7 +37,7 @@ pom.xml.releaseBackup maven-eclipse.xml #binary files -**/bin/** +#**/bin/** !distro/src/bin/** #log files diff --git a/addons/hbase-bridge/pom.xml b/addons/hbase-bridge/pom.xml index 76be506..dd5979a 100644 --- a/addons/hbase-bridge/pom.xml +++ b/addons/hbase-bridge/pom.xml @@ -61,6 +61,11 @@ <dependency> <groupId>org.apache.atlas</groupId> + <artifactId>atlas-client-v2</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.atlas</groupId> <artifactId>atlas-notification</artifactId> </dependency> @@ -247,6 +252,11 @@ </artifactItem> <artifactItem> <groupId>${project.groupId}</groupId> + <artifactId>atlas-client-v2</artifactId> + <version>${project.version}</version> + </artifactItem> + <artifactItem> + <groupId>${project.groupId}</groupId> <artifactId>atlas-intg</artifactId> <version>${project.version}</version> </artifactItem> @@ -275,6 +285,41 @@ <artifactId>kafka-clients</artifactId> <version>${kafka.version}</version> </artifactItem> + <artifactItem> + <groupId>com.sun.jersey.contribs</groupId> + <artifactId>jersey-multipart</artifactId> + <version>${jersey.version}</version> + </artifactItem> + <artifactItem> + <groupId>org.scala-lang</groupId> + <artifactId>scala-library</artifactId> + <version>${scala.version}</version> + </artifactItem> + <artifactItem> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + <version>${jackson.version}</version> + </artifactItem> + <artifactItem> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-core</artifactId> + <version>${jackson.version}</version> + </artifactItem> + <artifactItem> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-annotations</artifactId> + <version>${jackson.version}</version> + </artifactItem> + <artifactItem> + <groupId>commons-configuration</groupId> + <artifactId>commons-configuration</artifactId> + <version>${commons-conf.version}</version> + </artifactItem> + <artifactItem> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-common</artifactId> + <version>${hbase.version}</version> + </artifactItem> </artifactItems> </configuration> </execution> diff --git a/addons/hbase-bridge/src/bin/import-hbase.sh b/addons/hbase-bridge/src/bin/import-hbase.sh new file mode 100644 index 0000000..07a3af4 --- /dev/null +++ b/addons/hbase-bridge/src/bin/import-hbase.sh @@ -0,0 +1,141 @@ +#!/bin/bash +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. See accompanying LICENSE file. +# + +# resolve links - $0 may be a softlink +PRG="${0}" + +[[ `uname -s` == *"CYGWIN"* ]] && CYGWIN=true + +while [ -h "${PRG}" ]; do + ls=`ls -ld "${PRG}"` + link=`expr "$ls" : '.*-> \(.*\)$'` + if expr "$link" : '/.*' > /dev/null; then + PRG="$link" + else + PRG=`dirname "${PRG}"`/"$link" + fi +done + +echo ">>>>> $PRG" + +BASEDIR=`dirname ${PRG}` +BASEDIR=`cd ${BASEDIR}/..;pwd` + +echo ">>>>> $BASEDIR" + +allargs=$@ + +if test -z "${JAVA_HOME}" +then + JAVA_BIN=`which java` + JAR_BIN=`which jar` +else + JAVA_BIN="${JAVA_HOME}/bin/java" + JAR_BIN="${JAVA_HOME}/bin/jar" +fi +export JAVA_BIN + +if [ ! -e "${JAVA_BIN}" ] || [ ! -e "${JAR_BIN}" ]; then + echo "$JAVA_BIN and/or $JAR_BIN not found on the system. Please make sure java and jar commands are available." + exit 1 +fi + +# Construct Atlas classpath using jars from hook/hbase/atlas-hbase-plugin-impl/ directory. +for i in "${BASEDIR}/hook/hbase/atlas-hbase-plugin-impl/"*.jar; do + ATLASCPPATH="${ATLASCPPATH}:$i" +done + +# log dir for applications +ATLAS_LOG_DIR="${ATLAS_LOG_DIR:-$BASEDIR/logs}" +export ATLAS_LOG_DIR +LOGFILE="$ATLAS_LOG_DIR/import-hbase.log" + +TIME=`date +%Y%m%d%H%M%s` + +#Add HBase conf in classpath +if [ ! -z "$HBASE_CONF_DIR" ]; then + HBASE_CONF=$HBASE_CONF_DIR +elif [ ! -z "$HBASE_HOME" ]; then + HBASE_CONF="$HBASE_HOME/conf" +elif [ -e /etc/hbase/conf ]; then + HBASE_CONF="/etc/hbase/conf" +else + echo "Could not find a valid HBASE configuration" + exit 1 +fi + +echo Using HBase configuration directory "[$HBASE_CONF]" + + +if [ -f "${HBASE_CONF}/hbase-env.sh" ]; then + . "${HBASE_CONF}/hbase-env.sh" +fi + +if [ -z "$HBASE_HOME" ]; then + if [ -d "${BASEDIR}/../hbase" ]; then + HBASE_HOME=${BASEDIR}/../hbase + else + echo "Please set HBASE_HOME to the root of HBase installation" + exit 1 + fi +fi + +HBASE_CP="${HBASE_CONF}" + +for i in "${HBASE_HOME}/lib/"*.jar; do + HBASE_CP="${HBASE_CP}:$i" +done + +#Add hadoop conf in classpath +if [ ! -z "$HADOOP_CLASSPATH" ]; then + HADOOP_CP=$HADOOP_CLASSPATH +elif [ ! -z "$HADOOP_HOME" ]; then + HADOOP_CP=`$HADOOP_HOME/bin/hadoop classpath` +elif [ $(command -v hadoop) ]; then + HADOOP_CP=`hadoop classpath` + echo $HADOOP_CP +else + echo "Environment variable HADOOP_CLASSPATH or HADOOP_HOME need to be set" + exit 1 +fi + +CP="${ATLASCPPATH}:${HBASE_CP}:${HADOOP_CP}" + +# If running in cygwin, convert pathnames and classpath to Windows format. +if [ "${CYGWIN}" == "true" ] +then + ATLAS_LOG_DIR=`cygpath -w ${ATLAS_LOG_DIR}` + LOGFILE=`cygpath -w ${LOGFILE}` + HBASE_CP=`cygpath -w ${HBASE_CP}` + HADOOP_CP=`cygpath -w ${HADOOP_CP}` + CP=`cygpath -w -p ${CP}` +fi + +JAVA_PROPERTIES="$ATLAS_OPTS -Datlas.log.dir=$ATLAS_LOG_DIR -Datlas.log.file=import-hbase.log +-Dlog4j.configuration=atlas-hbase-import-log4j.xml" +shift + +while [[ ${1} =~ ^\-D ]]; do + JAVA_PROPERTIES="${JAVA_PROPERTIES} ${1}" + shift +done + +echo "Log file for import is $LOGFILE" + +"${JAVA_BIN}" ${JAVA_PROPERTIES} -cp "${CP}" org.apache.atlas.hbase.util.ImportHBaseEntities $allargs + +RETVAL=$? +[ $RETVAL -eq 0 ] && echo HBase Data Model imported successfully!!! +[ $RETVAL -ne 0 ] && echo Failed to import HBase Data Model!!! diff --git a/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/util/ImportHBaseEntities.java b/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/util/ImportHBaseEntities.java new file mode 100644 index 0000000..cf3a9fc --- /dev/null +++ b/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/util/ImportHBaseEntities.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.hbase.util; + + +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.hook.AtlasHookException; +import org.apache.commons.lang.ArrayUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.NamespaceDescriptor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class ImportHBaseEntities extends ImportHBaseEntitiesBase { + private static final Logger LOG = LoggerFactory.getLogger(ImportHBaseEntities.class); + + public static void main(String[] args) throws AtlasHookException { + try { + ImportHBaseEntities importHBaseEntities = new ImportHBaseEntities(args); + + importHBaseEntities.execute(); + } catch(Exception e) { + throw new AtlasHookException("ImportHBaseEntities failed.", e); + } + } + + public ImportHBaseEntities(String[] args) throws Exception { + super(args); + } + + public boolean execute() throws Exception { + boolean ret = false; + + if (hbaseAdmin != null) { + if (StringUtils.isEmpty(namespaceToImport) && StringUtils.isEmpty(tableToImport)) { + NamespaceDescriptor[] namespaceDescriptors = hbaseAdmin.listNamespaceDescriptors(); + if (!ArrayUtils.isEmpty(namespaceDescriptors)) { + for (NamespaceDescriptor namespaceDescriptor : namespaceDescriptors) { + String namespace = namespaceDescriptor.getName(); + importNameSpace(namespace); + } + } + HTableDescriptor[] htds = hbaseAdmin.listTables(); + if (!ArrayUtils.isEmpty(htds)) { + for (HTableDescriptor htd : htds) { + String tableName = htd.getNameAsString(); + importTable(tableName); + } + } + ret = true; + } else if (StringUtils.isNotEmpty(namespaceToImport)) { + importNameSpace(namespaceToImport); + ret = true; + } else if (StringUtils.isNotEmpty(tableToImport)) { + importTable(tableToImport); + ret = true; + } + } + + return ret; + } + + public String importNameSpace(final String nameSpace) throws Exception { + NamespaceDescriptor namespaceDescriptor = hbaseAdmin.getNamespaceDescriptor(nameSpace); + + createOrUpdateNameSpace(namespaceDescriptor); + + return namespaceDescriptor.getName(); + } + + public String importTable(final String tableName) throws Exception { + byte[] tblName = tableName.getBytes(); + HTableDescriptor htd = hbaseAdmin.getTableDescriptor(tblName); + String nsName = htd.getTableName().getNamespaceAsString(); + NamespaceDescriptor nsDescriptor = hbaseAdmin.getNamespaceDescriptor(nsName); + AtlasEntity nsEntity = createOrUpdateNameSpace(nsDescriptor); + HColumnDescriptor[] hcdts = htd.getColumnFamilies(); + createOrUpdateTable(nsName, tableName, nsEntity, htd, hcdts); + + return htd.getTableName().getNameAsString(); + } +} diff --git a/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/util/ImportHBaseEntitiesBase.java b/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/util/ImportHBaseEntitiesBase.java new file mode 100644 index 0000000..f6d511f --- /dev/null +++ b/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/util/ImportHBaseEntitiesBase.java @@ -0,0 +1,372 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.hbase.util; + +import org.apache.atlas.AtlasClientV2; +import org.apache.atlas.ApplicationProperties; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasObjectId; +import org.apache.atlas.model.instance.EntityMutationResponse; +import org.apache.atlas.model.instance.EntityMutations; +import org.apache.atlas.model.instance.AtlasEntityHeader; +import org.apache.atlas.type.AtlasTypeUtil; +import org.apache.atlas.utils.AuthenticationUtil; +import org.apache.commons.cli.BasicParser; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.Options; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.configuration.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.NamespaceDescriptor; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.security.UserGroupInformation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; + +public class ImportHBaseEntitiesBase { + private static final Logger LOG = LoggerFactory.getLogger(ImportHBaseEntitiesBase.class); + + static final String NAMESPACE_FLAG = "-n"; + static final String TABLE_FLAG = "-t"; + static final String NAMESPACE_FULL_FLAG = "--namespace"; + static final String TABLE_FULL_FLAG = "--tablename"; + static final String ATLAS_ENDPOINT = "atlas.rest.address"; + static final String DEFAULT_ATLAS_URL = "http://localhost:21000/"; + static final String NAMESPACE_TYPE = "hbase_namespace"; + static final String TABLE_TYPE = "hbase_table"; + static final String COLUMNFAMILY_TYPE = "hbase_column_family"; + static final String HBASE_CLUSTER_NAME = "atlas.cluster.name"; + static final String DEFAULT_CLUSTER_NAME = "primary"; + static final String QUALIFIED_NAME = "qualifiedName"; + static final String NAME = "name"; + static final String URI = "uri"; + static final String OWNER = "owner"; + static final String DESCRIPTION_ATTR = "description"; + static final String CLUSTERNAME = "clusterName"; + static final String NAMESPACE = "namespace"; + static final String TABLE = "table"; + static final String COLUMN_FAMILIES = "column_families"; + + + protected final HBaseAdmin hbaseAdmin; + protected final boolean failOnError; + protected final String namespaceToImport; + protected final String tableToImport; + private final AtlasClientV2 atlasClientV2; + private final UserGroupInformation ugi; + private final String clusterName; + private final HashMap<String, AtlasEntity> nameSpaceCache = new HashMap<>(); + private final HashMap<String, AtlasEntity> tableCache = new HashMap<>(); + private final HashMap<String, AtlasEntity> columnFamilyCache = new HashMap<>(); + + + protected ImportHBaseEntitiesBase(String[] args) throws Exception { + checkArgs(args); + + Configuration atlasConf = ApplicationProperties.get(); + String[] urls = atlasConf.getStringArray(ATLAS_ENDPOINT); + + if (urls == null || urls.length == 0) { + urls = new String[]{DEFAULT_ATLAS_URL}; + } + + if (!AuthenticationUtil.isKerberosAuthenticationEnabled()) { + String[] basicAuthUsernamePassword = AuthenticationUtil.getBasicAuthenticationInput(); + + ugi = null; + atlasClientV2 = new AtlasClientV2(urls, basicAuthUsernamePassword); + } else { + ugi = UserGroupInformation.getCurrentUser(); + atlasClientV2 = new AtlasClientV2(ugi, ugi.getShortUserName(), urls); + } + + Options options = new Options(); + options.addOption("n","namespace", true, "namespace"); + options.addOption("t", "table", true, "tablename"); + options.addOption("failOnError", false, "failOnError"); + + CommandLineParser parser = new BasicParser(); + CommandLine cmd = parser.parse(options, args); + + clusterName = atlasConf.getString(HBASE_CLUSTER_NAME, DEFAULT_CLUSTER_NAME); + failOnError = cmd.hasOption("failOnError"); + namespaceToImport = cmd.getOptionValue("n"); + tableToImport = cmd.getOptionValue("t"); + + org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create(); + + LOG.info("createHBaseClient(): checking HBase availability.."); + + HBaseAdmin.checkHBaseAvailable(conf); + + LOG.info("createHBaseClient(): HBase is available"); + + hbaseAdmin = new HBaseAdmin(conf); + } + + protected AtlasEntity createOrUpdateNameSpace(NamespaceDescriptor namespaceDescriptor) throws Exception { + String nsName = namespaceDescriptor.getName(); + String nsQualifiedName = getNameSpaceQualifiedName(clusterName, nsName); + AtlasEntity nsEntity = findNameSpaceEntityInAtlas(nsQualifiedName); + + if (nsEntity == null) { + LOG.info("Importing NameSpace: " + nsQualifiedName); + + AtlasEntity entity = getNameSpaceEntity(nsName); + + nsEntity = createEntityInAtlas(entity); + } + + return nsEntity; + } + + protected AtlasEntity createOrUpdateTable(String nameSpace, String tableName, AtlasEntity nameSapceEntity, HTableDescriptor htd, HColumnDescriptor[] hcdts) throws Exception { + String owner = htd.getOwnerString(); + String tblQualifiedName = getTableQualifiedName(clusterName, nameSpace, tableName); + AtlasEntity tableEntity = findTableEntityInAtlas(tblQualifiedName); + + if (tableEntity == null) { + LOG.info("Importing Table: " + tblQualifiedName); + + AtlasEntity entity = getTableEntity(nameSpace, tableName, owner, nameSapceEntity); + + tableEntity = createEntityInAtlas(entity); + } + + List<AtlasEntity> cfEntities = createOrUpdateColumnFamilies(nameSpace, tableName, owner, hcdts, tableEntity); + List<AtlasObjectId> cfIDs = new ArrayList<>(); + + if (CollectionUtils.isNotEmpty(cfEntities)) { + for (AtlasEntity cfEntity : cfEntities) { + cfIDs.add(AtlasTypeUtil.getAtlasObjectId(cfEntity)); + } + } + + tableEntity.setAttribute(COLUMN_FAMILIES, cfIDs); + + return tableEntity; + } + + protected List<AtlasEntity> createOrUpdateColumnFamilies(String nameSpace, String tableName, String owner, HColumnDescriptor[] hcdts , AtlasEntity tableEntity) throws Exception { + List<AtlasEntity> ret = new ArrayList<>(); + + if (hcdts != null) { + AtlasObjectId tableId = AtlasTypeUtil.getAtlasObjectId(tableEntity); + + for (HColumnDescriptor hcdt : hcdts) { + String cfName = hcdt.getNameAsString(); + String cfQualifiedName = getColumnFamilyQualifiedName(clusterName, nameSpace, tableName, cfName); + AtlasEntity cfEntity = findColumnFamiltyEntityInAtlas(cfQualifiedName); + + if (cfEntity == null) { + LOG.info("Importing Column-family: " + cfQualifiedName); + + AtlasEntity entity = getColumnFamilyEntity(nameSpace, tableName, owner, hcdt, tableId); + + cfEntity = createEntityInAtlas(entity); + } + + ret.add(cfEntity); + } + } + + return ret; + } + + private AtlasEntity findNameSpaceEntityInAtlas(String nsQualifiedName) { + AtlasEntity ret = nameSpaceCache.get(nsQualifiedName); + + if (ret == null) { + try { + ret = findEntityInAtlas(NAMESPACE_TYPE, nsQualifiedName); + + if (ret != null) { + nameSpaceCache.put(nsQualifiedName, ret); + } + } catch (Exception e) { + ret = null; // entity doesn't exist in Atlas + } + } + + return ret; + } + + private AtlasEntity findTableEntityInAtlas(String tableQualifiedName) { + AtlasEntity ret = tableCache.get(tableQualifiedName); + + if (ret == null) { + try { + ret = findEntityInAtlas(TABLE_TYPE, tableQualifiedName); + + if (ret != null) { + tableCache.put(tableQualifiedName, ret); + } + } catch (Exception e) { + ret = null; // entity doesn't exist in Atlas + } + } + + return ret; + } + + private AtlasEntity findColumnFamiltyEntityInAtlas(String columnFamilyQualifiedName) { + AtlasEntity ret = columnFamilyCache.get(columnFamilyQualifiedName); + + if (ret == null) { + try { + ret = findEntityInAtlas(COLUMNFAMILY_TYPE, columnFamilyQualifiedName); + + if (ret != null) { + columnFamilyCache.put(columnFamilyQualifiedName, ret); + } + } catch (Exception e) { + ret = null; // entity doesn't exist in Atlas + } + } + + return ret; + } + + private AtlasEntity findEntityInAtlas(String typeName, String qualifiedName) throws Exception { + Map<String, String> attributes = Collections.singletonMap(QUALIFIED_NAME, qualifiedName); + + return atlasClientV2.getEntityByAttribute(typeName, attributes).getEntity(); + } + + private AtlasEntity getNameSpaceEntity(String nameSpace){ + AtlasEntity ret = new AtlasEntity(NAMESPACE_TYPE); + String qualifiedName = getNameSpaceQualifiedName(clusterName, nameSpace); + + ret.setAttribute(QUALIFIED_NAME, qualifiedName); + ret.setAttribute(CLUSTERNAME, clusterName); + ret.setAttribute(NAME, nameSpace); + ret.setAttribute(DESCRIPTION_ATTR, nameSpace); + + return ret; + } + + private AtlasEntity getTableEntity(String nameSpace, String tableName, String owner, AtlasEntity nameSpaceEntity) { + AtlasEntity ret = new AtlasEntity(TABLE_TYPE); + String tableQualifiedName = getTableQualifiedName(clusterName, nameSpace, tableName); + + ret.setAttribute(QUALIFIED_NAME, tableQualifiedName); + ret.setAttribute(CLUSTERNAME, clusterName); + ret.setAttribute(NAMESPACE, AtlasTypeUtil.getAtlasObjectId(nameSpaceEntity)); + ret.setAttribute(NAME, tableName); + ret.setAttribute(DESCRIPTION_ATTR, tableName); + ret.setAttribute(OWNER, owner); + ret.setAttribute(URI, tableName); + + return ret; + } + + private AtlasEntity getColumnFamilyEntity(String nameSpace, String tableName, String owner, HColumnDescriptor hcdt, AtlasObjectId tableId){ + AtlasEntity ret = new AtlasEntity(COLUMNFAMILY_TYPE); + String cfName = hcdt.getNameAsString(); + String cfQualifiedName = getColumnFamilyQualifiedName(clusterName, nameSpace, tableName, cfName); + + ret.setAttribute(QUALIFIED_NAME, cfQualifiedName); + ret.setAttribute(CLUSTERNAME, clusterName); + ret.setAttribute(TABLE, tableId); + ret.setAttribute(NAME, cfName); + ret.setAttribute(DESCRIPTION_ATTR, cfName); + ret.setAttribute(OWNER, owner); + + return ret; + } + + private AtlasEntity createEntityInAtlas(AtlasEntity entity) throws Exception { + AtlasEntity ret = null; + EntityMutationResponse response = atlasClientV2.createEntity(new AtlasEntity.AtlasEntityWithExtInfo(entity)); + List<AtlasEntityHeader> entities = response.getEntitiesByOperation(EntityMutations.EntityOperation.CREATE); + + if (CollectionUtils.isNotEmpty(entities)) { + AtlasEntity.AtlasEntityWithExtInfo getByGuidResponse = atlasClientV2.getEntityByGuid(entities.get(0).getGuid()); + + ret = getByGuidResponse.getEntity(); + + LOG.info ("Created entity: type=" + ret.getTypeName() + ", guid=" + ret.getGuid()); + } + + return ret; + } + + private void checkArgs(String[] args) throws Exception { + String option = args.length > 0 ? args[0] : null; + String value = args.length > 1 ? args[1] : null; + + if (option != null && value == null) { + if (option.equalsIgnoreCase(NAMESPACE_FLAG) || option.equalsIgnoreCase(NAMESPACE_FULL_FLAG) || + option.equalsIgnoreCase(TABLE_FLAG) || option.equalsIgnoreCase(TABLE_FULL_FLAG)) { + + System.out.println("Usage: import-hbase.sh [-n <namespace> OR --namespace <namespace>] [-t <table> OR --table <table>]"); + + throw new Exception("Incorrect arguments.."); + } + } + } + + /** + * Construct the qualified name used to uniquely identify a ColumnFamily instance in Atlas. + * @param clusterName Name of the cluster to which the Hbase component belongs + * @param nameSpace Name of the Hbase database to which the Table belongs + * @param tableName Name of the Hbase table + * @param columnFamily Name of the ColumnFamily + * @return Unique qualified name to identify the Table instance in Atlas. + */ + private static String getColumnFamilyQualifiedName(String clusterName, String nameSpace, String tableName, String columnFamily) { + tableName = stripNameSpace(tableName.toLowerCase()); + + return String.format("%s.%s.%s@%s", nameSpace.toLowerCase(), tableName, columnFamily.toLowerCase(), clusterName); + } + + /** + * Construct the qualified name used to uniquely identify a Table instance in Atlas. + * @param clusterName Name of the cluster to which the Hbase component belongs + * @param nameSpace Name of the Hbase database to which the Table belongs + * @param tableName Name of the Hbase table + * @return Unique qualified name to identify the Table instance in Atlas. + */ + private static String getTableQualifiedName(String clusterName, String nameSpace, String tableName) { + tableName = stripNameSpace(tableName.toLowerCase()); + + return String.format("%s.%s@%s", nameSpace.toLowerCase(), tableName, clusterName); + } + + /** + * Construct the qualified name used to uniquely identify a Hbase NameSpace instance in Atlas. + * @param clusterName Name of the cluster to which the Hbase component belongs + * @param nameSpace Name of the NameSpace + * @return Unique qualified name to identify the HBase NameSpace instance in Atlas. + */ + private static String getNameSpaceQualifiedName(String clusterName, String nameSpace) { + return String.format("%s@%s", nameSpace.toLowerCase(), clusterName); + } + + private static String stripNameSpace(String tableName){ + tableName = tableName.substring(tableName.indexOf(":")+1); + + return tableName; + } +} diff --git a/distro/src/main/assemblies/standalone-package.xml b/distro/src/main/assemblies/standalone-package.xml index 052b28b..1881082 100755 --- a/distro/src/main/assemblies/standalone-package.xml +++ b/distro/src/main/assemblies/standalone-package.xml @@ -132,6 +132,14 @@ <!-- addons/hbase --> <fileSet> + <directory>../addons/hbase-bridge/src/bin</directory> + <outputDirectory>hook-bin</outputDirectory> + <fileMode>0755</fileMode> + <directoryMode>0755</directoryMode> + </fileSet> + + <!-- addons/hbase --> + <fileSet> <directory>../addons/hbase-bridge/target/dependency/bridge</directory> <outputDirectory>bridge</outputDirectory> </fileSet>