Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
A
atlas
Project
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
dataplatform
atlas
Commits
d1e79fa0
Commit
d1e79fa0
authored
Dec 15, 2017
by
rmani
Committed by
Madhan Neethiraj
Dec 15, 2017
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
ATLAS-2051: Provide a utility to import HBase entities into Atlas
Signed-off-by:
Madhan Neethiraj
<
madhan@apache.org
>
parent
5bd5327c
Hide whitespace changes
Inline
Side-by-side
Showing
6 changed files
with
668 additions
and
1 deletion
+668
-1
.gitignore
.gitignore
+1
-1
pom.xml
addons/hbase-bridge/pom.xml
+45
-0
import-hbase.sh
addons/hbase-bridge/src/bin/import-hbase.sh
+141
-0
ImportHBaseEntities.java
...java/org/apache/atlas/hbase/util/ImportHBaseEntities.java
+101
-0
ImportHBaseEntitiesBase.java
.../org/apache/atlas/hbase/util/ImportHBaseEntitiesBase.java
+372
-0
standalone-package.xml
distro/src/main/assemblies/standalone-package.xml
+8
-0
No files found.
.gitignore
View file @
d1e79fa0
...
@@ -37,7 +37,7 @@ pom.xml.releaseBackup
...
@@ -37,7 +37,7 @@ pom.xml.releaseBackup
maven-eclipse.xml
maven-eclipse.xml
#binary files
#binary files
**/bin/**
#
**/bin/**
!distro/src/bin/**
!distro/src/bin/**
#log files
#log files
...
...
addons/hbase-bridge/pom.xml
View file @
d1e79fa0
...
@@ -61,6 +61,11 @@
...
@@ -61,6 +61,11 @@
<dependency>
<dependency>
<groupId>
org.apache.atlas
</groupId>
<groupId>
org.apache.atlas
</groupId>
<artifactId>
atlas-client-v2
</artifactId>
</dependency>
<dependency>
<groupId>
org.apache.atlas
</groupId>
<artifactId>
atlas-notification
</artifactId>
<artifactId>
atlas-notification
</artifactId>
</dependency>
</dependency>
...
@@ -247,6 +252,11 @@
...
@@ -247,6 +252,11 @@
</artifactItem>
</artifactItem>
<artifactItem>
<artifactItem>
<groupId>
${project.groupId}
</groupId>
<groupId>
${project.groupId}
</groupId>
<artifactId>
atlas-client-v2
</artifactId>
<version>
${project.version}
</version>
</artifactItem>
<artifactItem>
<groupId>
${project.groupId}
</groupId>
<artifactId>
atlas-intg
</artifactId>
<artifactId>
atlas-intg
</artifactId>
<version>
${project.version}
</version>
<version>
${project.version}
</version>
</artifactItem>
</artifactItem>
...
@@ -275,6 +285,41 @@
...
@@ -275,6 +285,41 @@
<artifactId>
kafka-clients
</artifactId>
<artifactId>
kafka-clients
</artifactId>
<version>
${kafka.version}
</version>
<version>
${kafka.version}
</version>
</artifactItem>
</artifactItem>
<artifactItem>
<groupId>
com.sun.jersey.contribs
</groupId>
<artifactId>
jersey-multipart
</artifactId>
<version>
${jersey.version}
</version>
</artifactItem>
<artifactItem>
<groupId>
org.scala-lang
</groupId>
<artifactId>
scala-library
</artifactId>
<version>
${scala.version}
</version>
</artifactItem>
<artifactItem>
<groupId>
com.fasterxml.jackson.core
</groupId>
<artifactId>
jackson-databind
</artifactId>
<version>
${jackson.version}
</version>
</artifactItem>
<artifactItem>
<groupId>
com.fasterxml.jackson.core
</groupId>
<artifactId>
jackson-core
</artifactId>
<version>
${jackson.version}
</version>
</artifactItem>
<artifactItem>
<groupId>
com.fasterxml.jackson.core
</groupId>
<artifactId>
jackson-annotations
</artifactId>
<version>
${jackson.version}
</version>
</artifactItem>
<artifactItem>
<groupId>
commons-configuration
</groupId>
<artifactId>
commons-configuration
</artifactId>
<version>
${commons-conf.version}
</version>
</artifactItem>
<artifactItem>
<groupId>
org.apache.hbase
</groupId>
<artifactId>
hbase-common
</artifactId>
<version>
${hbase.version}
</version>
</artifactItem>
</artifactItems>
</artifactItems>
</configuration>
</configuration>
</execution>
</execution>
...
...
addons/hbase-bridge/src/bin/import-hbase.sh
0 → 100644
View file @
d1e79fa0
#!/bin/bash
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License. See accompanying LICENSE file.
#
# resolve links - $0 may be a softlink
PRG
=
"
${
0
}
"
[[
`
uname
-s
`
==
*
"CYGWIN"
*
]]
&&
CYGWIN
=
true
while
[
-h
"
${
PRG
}
"
]
;
do
ls
=
`
ls
-ld
"
${
PRG
}
"
`
link
=
`
expr
"
$ls
"
:
'.*-> \(.*\)$'
`
if
expr
"
$link
"
:
'/.*'
>
/dev/null
;
then
PRG
=
"
$link
"
else
PRG
=
`
dirname
"
${
PRG
}
"
`
/
"
$link
"
fi
done
echo
">>>>>
$PRG
"
BASEDIR
=
`
dirname
${
PRG
}
`
BASEDIR
=
`
cd
${
BASEDIR
}
/..
;
pwd
`
echo
">>>>>
$BASEDIR
"
allargs
=
$@
if
test
-z
"
${
JAVA_HOME
}
"
then
JAVA_BIN
=
`
which java
`
JAR_BIN
=
`
which jar
`
else
JAVA_BIN
=
"
${
JAVA_HOME
}
/bin/java"
JAR_BIN
=
"
${
JAVA_HOME
}
/bin/jar"
fi
export
JAVA_BIN
if
[
!
-e
"
${
JAVA_BIN
}
"
]
||
[
!
-e
"
${
JAR_BIN
}
"
]
;
then
echo
"
$JAVA_BIN
and/or
$JAR_BIN
not found on the system. Please make sure java and jar commands are available."
exit
1
fi
# Construct Atlas classpath using jars from hook/hbase/atlas-hbase-plugin-impl/ directory.
for
i
in
"
${
BASEDIR
}
/hook/hbase/atlas-hbase-plugin-impl/"
*
.jar
;
do
ATLASCPPATH
=
"
${
ATLASCPPATH
}
:
$i
"
done
# log dir for applications
ATLAS_LOG_DIR
=
"
${
ATLAS_LOG_DIR
:-
$BASEDIR
/logs
}
"
export
ATLAS_LOG_DIR
LOGFILE
=
"
$ATLAS_LOG_DIR
/import-hbase.log"
TIME
=
`
date
+%Y%m%d%H%M%s
`
#Add HBase conf in classpath
if
[
!
-z
"
$HBASE_CONF_DIR
"
]
;
then
HBASE_CONF
=
$HBASE_CONF_DIR
elif
[
!
-z
"
$HBASE_HOME
"
]
;
then
HBASE_CONF
=
"
$HBASE_HOME
/conf"
elif
[
-e
/etc/hbase/conf
]
;
then
HBASE_CONF
=
"/etc/hbase/conf"
else
echo
"Could not find a valid HBASE configuration"
exit
1
fi
echo
Using HBase configuration directory
"[
$HBASE_CONF
]"
if
[
-f
"
${
HBASE_CONF
}
/hbase-env.sh"
]
;
then
.
"
${
HBASE_CONF
}
/hbase-env.sh"
fi
if
[
-z
"
$HBASE_HOME
"
]
;
then
if
[
-d
"
${
BASEDIR
}
/../hbase"
]
;
then
HBASE_HOME
=
${
BASEDIR
}
/../hbase
else
echo
"Please set HBASE_HOME to the root of HBase installation"
exit
1
fi
fi
HBASE_CP
=
"
${
HBASE_CONF
}
"
for
i
in
"
${
HBASE_HOME
}
/lib/"
*
.jar
;
do
HBASE_CP
=
"
${
HBASE_CP
}
:
$i
"
done
#Add hadoop conf in classpath
if
[
!
-z
"
$HADOOP_CLASSPATH
"
]
;
then
HADOOP_CP
=
$HADOOP_CLASSPATH
elif
[
!
-z
"
$HADOOP_HOME
"
]
;
then
HADOOP_CP
=
`
$HADOOP_HOME
/bin/hadoop classpath
`
elif
[
$(
command
-v
hadoop
)
]
;
then
HADOOP_CP
=
`
hadoop classpath
`
echo
$HADOOP_CP
else
echo
"Environment variable HADOOP_CLASSPATH or HADOOP_HOME need to be set"
exit
1
fi
CP
=
"
${
ATLASCPPATH
}
:
${
HBASE_CP
}
:
${
HADOOP_CP
}
"
# If running in cygwin, convert pathnames and classpath to Windows format.
if
[
"
${
CYGWIN
}
"
==
"true"
]
then
ATLAS_LOG_DIR
=
`
cygpath
-w
${
ATLAS_LOG_DIR
}
`
LOGFILE
=
`
cygpath
-w
${
LOGFILE
}
`
HBASE_CP
=
`
cygpath
-w
${
HBASE_CP
}
`
HADOOP_CP
=
`
cygpath
-w
${
HADOOP_CP
}
`
CP
=
`
cygpath
-w
-p
${
CP
}
`
fi
JAVA_PROPERTIES
=
"
$ATLAS_OPTS
-Datlas.log.dir=
$ATLAS_LOG_DIR
-Datlas.log.file=import-hbase.log
-Dlog4j.configuration=atlas-hbase-import-log4j.xml"
shift
while
[[
${
1
}
=
~ ^
\-
D
]]
;
do
JAVA_PROPERTIES
=
"
${
JAVA_PROPERTIES
}
${
1
}
"
shift
done
echo
"Log file for import is
$LOGFILE
"
"
${
JAVA_BIN
}
"
${
JAVA_PROPERTIES
}
-cp
"
${
CP
}
"
org.apache.atlas.hbase.util.ImportHBaseEntities
$allargs
RETVAL
=
$?
[
$RETVAL
-eq
0
]
&&
echo
HBase Data Model imported successfully!!!
[
$RETVAL
-ne
0
]
&&
echo
Failed to import HBase Data Model!!!
addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/util/ImportHBaseEntities.java
0 → 100644
View file @
d1e79fa0
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org
.
apache
.
atlas
.
hbase
.
util
;
import
org.apache.atlas.model.instance.AtlasEntity
;
import
org.apache.atlas.hook.AtlasHookException
;
import
org.apache.commons.lang.ArrayUtils
;
import
org.apache.commons.lang.StringUtils
;
import
org.apache.hadoop.hbase.HColumnDescriptor
;
import
org.apache.hadoop.hbase.HTableDescriptor
;
import
org.apache.hadoop.hbase.NamespaceDescriptor
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
public
class
ImportHBaseEntities
extends
ImportHBaseEntitiesBase
{
private
static
final
Logger
LOG
=
LoggerFactory
.
getLogger
(
ImportHBaseEntities
.
class
);
public
static
void
main
(
String
[]
args
)
throws
AtlasHookException
{
try
{
ImportHBaseEntities
importHBaseEntities
=
new
ImportHBaseEntities
(
args
);
importHBaseEntities
.
execute
();
}
catch
(
Exception
e
)
{
throw
new
AtlasHookException
(
"ImportHBaseEntities failed."
,
e
);
}
}
public
ImportHBaseEntities
(
String
[]
args
)
throws
Exception
{
super
(
args
);
}
public
boolean
execute
()
throws
Exception
{
boolean
ret
=
false
;
if
(
hbaseAdmin
!=
null
)
{
if
(
StringUtils
.
isEmpty
(
namespaceToImport
)
&&
StringUtils
.
isEmpty
(
tableToImport
))
{
NamespaceDescriptor
[]
namespaceDescriptors
=
hbaseAdmin
.
listNamespaceDescriptors
();
if
(!
ArrayUtils
.
isEmpty
(
namespaceDescriptors
))
{
for
(
NamespaceDescriptor
namespaceDescriptor
:
namespaceDescriptors
)
{
String
namespace
=
namespaceDescriptor
.
getName
();
importNameSpace
(
namespace
);
}
}
HTableDescriptor
[]
htds
=
hbaseAdmin
.
listTables
();
if
(!
ArrayUtils
.
isEmpty
(
htds
))
{
for
(
HTableDescriptor
htd
:
htds
)
{
String
tableName
=
htd
.
getNameAsString
();
importTable
(
tableName
);
}
}
ret
=
true
;
}
else
if
(
StringUtils
.
isNotEmpty
(
namespaceToImport
))
{
importNameSpace
(
namespaceToImport
);
ret
=
true
;
}
else
if
(
StringUtils
.
isNotEmpty
(
tableToImport
))
{
importTable
(
tableToImport
);
ret
=
true
;
}
}
return
ret
;
}
public
String
importNameSpace
(
final
String
nameSpace
)
throws
Exception
{
NamespaceDescriptor
namespaceDescriptor
=
hbaseAdmin
.
getNamespaceDescriptor
(
nameSpace
);
createOrUpdateNameSpace
(
namespaceDescriptor
);
return
namespaceDescriptor
.
getName
();
}
public
String
importTable
(
final
String
tableName
)
throws
Exception
{
byte
[]
tblName
=
tableName
.
getBytes
();
HTableDescriptor
htd
=
hbaseAdmin
.
getTableDescriptor
(
tblName
);
String
nsName
=
htd
.
getTableName
().
getNamespaceAsString
();
NamespaceDescriptor
nsDescriptor
=
hbaseAdmin
.
getNamespaceDescriptor
(
nsName
);
AtlasEntity
nsEntity
=
createOrUpdateNameSpace
(
nsDescriptor
);
HColumnDescriptor
[]
hcdts
=
htd
.
getColumnFamilies
();
createOrUpdateTable
(
nsName
,
tableName
,
nsEntity
,
htd
,
hcdts
);
return
htd
.
getTableName
().
getNameAsString
();
}
}
addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/util/ImportHBaseEntitiesBase.java
0 → 100644
View file @
d1e79fa0
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org
.
apache
.
atlas
.
hbase
.
util
;
import
org.apache.atlas.AtlasClientV2
;
import
org.apache.atlas.ApplicationProperties
;
import
org.apache.atlas.model.instance.AtlasEntity
;
import
org.apache.atlas.model.instance.AtlasObjectId
;
import
org.apache.atlas.model.instance.EntityMutationResponse
;
import
org.apache.atlas.model.instance.EntityMutations
;
import
org.apache.atlas.model.instance.AtlasEntityHeader
;
import
org.apache.atlas.type.AtlasTypeUtil
;
import
org.apache.atlas.utils.AuthenticationUtil
;
import
org.apache.commons.cli.BasicParser
;
import
org.apache.commons.cli.CommandLine
;
import
org.apache.commons.cli.CommandLineParser
;
import
org.apache.commons.cli.Options
;
import
org.apache.commons.collections.CollectionUtils
;
import
org.apache.commons.configuration.Configuration
;
import
org.apache.hadoop.hbase.HBaseConfiguration
;
import
org.apache.hadoop.hbase.HColumnDescriptor
;
import
org.apache.hadoop.hbase.HTableDescriptor
;
import
org.apache.hadoop.hbase.NamespaceDescriptor
;
import
org.apache.hadoop.hbase.client.HBaseAdmin
;
import
org.apache.hadoop.security.UserGroupInformation
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
java.util.*
;
public
class
ImportHBaseEntitiesBase
{
private
static
final
Logger
LOG
=
LoggerFactory
.
getLogger
(
ImportHBaseEntitiesBase
.
class
);
static
final
String
NAMESPACE_FLAG
=
"-n"
;
static
final
String
TABLE_FLAG
=
"-t"
;
static
final
String
NAMESPACE_FULL_FLAG
=
"--namespace"
;
static
final
String
TABLE_FULL_FLAG
=
"--tablename"
;
static
final
String
ATLAS_ENDPOINT
=
"atlas.rest.address"
;
static
final
String
DEFAULT_ATLAS_URL
=
"http://localhost:21000/"
;
static
final
String
NAMESPACE_TYPE
=
"hbase_namespace"
;
static
final
String
TABLE_TYPE
=
"hbase_table"
;
static
final
String
COLUMNFAMILY_TYPE
=
"hbase_column_family"
;
static
final
String
HBASE_CLUSTER_NAME
=
"atlas.cluster.name"
;
static
final
String
DEFAULT_CLUSTER_NAME
=
"primary"
;
static
final
String
QUALIFIED_NAME
=
"qualifiedName"
;
static
final
String
NAME
=
"name"
;
static
final
String
URI
=
"uri"
;
static
final
String
OWNER
=
"owner"
;
static
final
String
DESCRIPTION_ATTR
=
"description"
;
static
final
String
CLUSTERNAME
=
"clusterName"
;
static
final
String
NAMESPACE
=
"namespace"
;
static
final
String
TABLE
=
"table"
;
static
final
String
COLUMN_FAMILIES
=
"column_families"
;
protected
final
HBaseAdmin
hbaseAdmin
;
protected
final
boolean
failOnError
;
protected
final
String
namespaceToImport
;
protected
final
String
tableToImport
;
private
final
AtlasClientV2
atlasClientV2
;
private
final
UserGroupInformation
ugi
;
private
final
String
clusterName
;
private
final
HashMap
<
String
,
AtlasEntity
>
nameSpaceCache
=
new
HashMap
<>();
private
final
HashMap
<
String
,
AtlasEntity
>
tableCache
=
new
HashMap
<>();
private
final
HashMap
<
String
,
AtlasEntity
>
columnFamilyCache
=
new
HashMap
<>();
protected
ImportHBaseEntitiesBase
(
String
[]
args
)
throws
Exception
{
checkArgs
(
args
);
Configuration
atlasConf
=
ApplicationProperties
.
get
();
String
[]
urls
=
atlasConf
.
getStringArray
(
ATLAS_ENDPOINT
);
if
(
urls
==
null
||
urls
.
length
==
0
)
{
urls
=
new
String
[]{
DEFAULT_ATLAS_URL
};
}
if
(!
AuthenticationUtil
.
isKerberosAuthenticationEnabled
())
{
String
[]
basicAuthUsernamePassword
=
AuthenticationUtil
.
getBasicAuthenticationInput
();
ugi
=
null
;
atlasClientV2
=
new
AtlasClientV2
(
urls
,
basicAuthUsernamePassword
);
}
else
{
ugi
=
UserGroupInformation
.
getCurrentUser
();
atlasClientV2
=
new
AtlasClientV2
(
ugi
,
ugi
.
getShortUserName
(),
urls
);
}
Options
options
=
new
Options
();
options
.
addOption
(
"n"
,
"namespace"
,
true
,
"namespace"
);
options
.
addOption
(
"t"
,
"table"
,
true
,
"tablename"
);
options
.
addOption
(
"failOnError"
,
false
,
"failOnError"
);
CommandLineParser
parser
=
new
BasicParser
();
CommandLine
cmd
=
parser
.
parse
(
options
,
args
);
clusterName
=
atlasConf
.
getString
(
HBASE_CLUSTER_NAME
,
DEFAULT_CLUSTER_NAME
);
failOnError
=
cmd
.
hasOption
(
"failOnError"
);
namespaceToImport
=
cmd
.
getOptionValue
(
"n"
);
tableToImport
=
cmd
.
getOptionValue
(
"t"
);
org
.
apache
.
hadoop
.
conf
.
Configuration
conf
=
HBaseConfiguration
.
create
();
LOG
.
info
(
"createHBaseClient(): checking HBase availability.."
);
HBaseAdmin
.
checkHBaseAvailable
(
conf
);
LOG
.
info
(
"createHBaseClient(): HBase is available"
);
hbaseAdmin
=
new
HBaseAdmin
(
conf
);
}
protected
AtlasEntity
createOrUpdateNameSpace
(
NamespaceDescriptor
namespaceDescriptor
)
throws
Exception
{
String
nsName
=
namespaceDescriptor
.
getName
();
String
nsQualifiedName
=
getNameSpaceQualifiedName
(
clusterName
,
nsName
);
AtlasEntity
nsEntity
=
findNameSpaceEntityInAtlas
(
nsQualifiedName
);
if
(
nsEntity
==
null
)
{
LOG
.
info
(
"Importing NameSpace: "
+
nsQualifiedName
);
AtlasEntity
entity
=
getNameSpaceEntity
(
nsName
);
nsEntity
=
createEntityInAtlas
(
entity
);
}
return
nsEntity
;
}
protected
AtlasEntity
createOrUpdateTable
(
String
nameSpace
,
String
tableName
,
AtlasEntity
nameSapceEntity
,
HTableDescriptor
htd
,
HColumnDescriptor
[]
hcdts
)
throws
Exception
{
String
owner
=
htd
.
getOwnerString
();
String
tblQualifiedName
=
getTableQualifiedName
(
clusterName
,
nameSpace
,
tableName
);
AtlasEntity
tableEntity
=
findTableEntityInAtlas
(
tblQualifiedName
);
if
(
tableEntity
==
null
)
{
LOG
.
info
(
"Importing Table: "
+
tblQualifiedName
);
AtlasEntity
entity
=
getTableEntity
(
nameSpace
,
tableName
,
owner
,
nameSapceEntity
);
tableEntity
=
createEntityInAtlas
(
entity
);
}
List
<
AtlasEntity
>
cfEntities
=
createOrUpdateColumnFamilies
(
nameSpace
,
tableName
,
owner
,
hcdts
,
tableEntity
);
List
<
AtlasObjectId
>
cfIDs
=
new
ArrayList
<>();
if
(
CollectionUtils
.
isNotEmpty
(
cfEntities
))
{
for
(
AtlasEntity
cfEntity
:
cfEntities
)
{
cfIDs
.
add
(
AtlasTypeUtil
.
getAtlasObjectId
(
cfEntity
));
}
}
tableEntity
.
setAttribute
(
COLUMN_FAMILIES
,
cfIDs
);
return
tableEntity
;
}
protected
List
<
AtlasEntity
>
createOrUpdateColumnFamilies
(
String
nameSpace
,
String
tableName
,
String
owner
,
HColumnDescriptor
[]
hcdts
,
AtlasEntity
tableEntity
)
throws
Exception
{
List
<
AtlasEntity
>
ret
=
new
ArrayList
<>();
if
(
hcdts
!=
null
)
{
AtlasObjectId
tableId
=
AtlasTypeUtil
.
getAtlasObjectId
(
tableEntity
);
for
(
HColumnDescriptor
hcdt
:
hcdts
)
{
String
cfName
=
hcdt
.
getNameAsString
();
String
cfQualifiedName
=
getColumnFamilyQualifiedName
(
clusterName
,
nameSpace
,
tableName
,
cfName
);
AtlasEntity
cfEntity
=
findColumnFamiltyEntityInAtlas
(
cfQualifiedName
);
if
(
cfEntity
==
null
)
{
LOG
.
info
(
"Importing Column-family: "
+
cfQualifiedName
);
AtlasEntity
entity
=
getColumnFamilyEntity
(
nameSpace
,
tableName
,
owner
,
hcdt
,
tableId
);
cfEntity
=
createEntityInAtlas
(
entity
);
}
ret
.
add
(
cfEntity
);
}
}
return
ret
;
}
private
AtlasEntity
findNameSpaceEntityInAtlas
(
String
nsQualifiedName
)
{
AtlasEntity
ret
=
nameSpaceCache
.
get
(
nsQualifiedName
);
if
(
ret
==
null
)
{
try
{
ret
=
findEntityInAtlas
(
NAMESPACE_TYPE
,
nsQualifiedName
);
if
(
ret
!=
null
)
{
nameSpaceCache
.
put
(
nsQualifiedName
,
ret
);
}
}
catch
(
Exception
e
)
{
ret
=
null
;
// entity doesn't exist in Atlas
}
}
return
ret
;
}
private
AtlasEntity
findTableEntityInAtlas
(
String
tableQualifiedName
)
{
AtlasEntity
ret
=
tableCache
.
get
(
tableQualifiedName
);
if
(
ret
==
null
)
{
try
{
ret
=
findEntityInAtlas
(
TABLE_TYPE
,
tableQualifiedName
);
if
(
ret
!=
null
)
{
tableCache
.
put
(
tableQualifiedName
,
ret
);
}
}
catch
(
Exception
e
)
{
ret
=
null
;
// entity doesn't exist in Atlas
}
}
return
ret
;
}
private
AtlasEntity
findColumnFamiltyEntityInAtlas
(
String
columnFamilyQualifiedName
)
{
AtlasEntity
ret
=
columnFamilyCache
.
get
(
columnFamilyQualifiedName
);
if
(
ret
==
null
)
{
try
{
ret
=
findEntityInAtlas
(
COLUMNFAMILY_TYPE
,
columnFamilyQualifiedName
);
if
(
ret
!=
null
)
{
columnFamilyCache
.
put
(
columnFamilyQualifiedName
,
ret
);
}
}
catch
(
Exception
e
)
{
ret
=
null
;
// entity doesn't exist in Atlas
}
}
return
ret
;
}
private
AtlasEntity
findEntityInAtlas
(
String
typeName
,
String
qualifiedName
)
throws
Exception
{
Map
<
String
,
String
>
attributes
=
Collections
.
singletonMap
(
QUALIFIED_NAME
,
qualifiedName
);
return
atlasClientV2
.
getEntityByAttribute
(
typeName
,
attributes
).
getEntity
();
}
private
AtlasEntity
getNameSpaceEntity
(
String
nameSpace
){
AtlasEntity
ret
=
new
AtlasEntity
(
NAMESPACE_TYPE
);
String
qualifiedName
=
getNameSpaceQualifiedName
(
clusterName
,
nameSpace
);
ret
.
setAttribute
(
QUALIFIED_NAME
,
qualifiedName
);
ret
.
setAttribute
(
CLUSTERNAME
,
clusterName
);
ret
.
setAttribute
(
NAME
,
nameSpace
);
ret
.
setAttribute
(
DESCRIPTION_ATTR
,
nameSpace
);
return
ret
;
}
private
AtlasEntity
getTableEntity
(
String
nameSpace
,
String
tableName
,
String
owner
,
AtlasEntity
nameSpaceEntity
)
{
AtlasEntity
ret
=
new
AtlasEntity
(
TABLE_TYPE
);
String
tableQualifiedName
=
getTableQualifiedName
(
clusterName
,
nameSpace
,
tableName
);
ret
.
setAttribute
(
QUALIFIED_NAME
,
tableQualifiedName
);
ret
.
setAttribute
(
CLUSTERNAME
,
clusterName
);
ret
.
setAttribute
(
NAMESPACE
,
AtlasTypeUtil
.
getAtlasObjectId
(
nameSpaceEntity
));
ret
.
setAttribute
(
NAME
,
tableName
);
ret
.
setAttribute
(
DESCRIPTION_ATTR
,
tableName
);
ret
.
setAttribute
(
OWNER
,
owner
);
ret
.
setAttribute
(
URI
,
tableName
);
return
ret
;
}
private
AtlasEntity
getColumnFamilyEntity
(
String
nameSpace
,
String
tableName
,
String
owner
,
HColumnDescriptor
hcdt
,
AtlasObjectId
tableId
){
AtlasEntity
ret
=
new
AtlasEntity
(
COLUMNFAMILY_TYPE
);
String
cfName
=
hcdt
.
getNameAsString
();
String
cfQualifiedName
=
getColumnFamilyQualifiedName
(
clusterName
,
nameSpace
,
tableName
,
cfName
);
ret
.
setAttribute
(
QUALIFIED_NAME
,
cfQualifiedName
);
ret
.
setAttribute
(
CLUSTERNAME
,
clusterName
);
ret
.
setAttribute
(
TABLE
,
tableId
);
ret
.
setAttribute
(
NAME
,
cfName
);
ret
.
setAttribute
(
DESCRIPTION_ATTR
,
cfName
);
ret
.
setAttribute
(
OWNER
,
owner
);
return
ret
;
}
private
AtlasEntity
createEntityInAtlas
(
AtlasEntity
entity
)
throws
Exception
{
AtlasEntity
ret
=
null
;
EntityMutationResponse
response
=
atlasClientV2
.
createEntity
(
new
AtlasEntity
.
AtlasEntityWithExtInfo
(
entity
));
List
<
AtlasEntityHeader
>
entities
=
response
.
getEntitiesByOperation
(
EntityMutations
.
EntityOperation
.
CREATE
);
if
(
CollectionUtils
.
isNotEmpty
(
entities
))
{
AtlasEntity
.
AtlasEntityWithExtInfo
getByGuidResponse
=
atlasClientV2
.
getEntityByGuid
(
entities
.
get
(
0
).
getGuid
());
ret
=
getByGuidResponse
.
getEntity
();
LOG
.
info
(
"Created entity: type="
+
ret
.
getTypeName
()
+
", guid="
+
ret
.
getGuid
());
}
return
ret
;
}
private
void
checkArgs
(
String
[]
args
)
throws
Exception
{
String
option
=
args
.
length
>
0
?
args
[
0
]
:
null
;
String
value
=
args
.
length
>
1
?
args
[
1
]
:
null
;
if
(
option
!=
null
&&
value
==
null
)
{
if
(
option
.
equalsIgnoreCase
(
NAMESPACE_FLAG
)
||
option
.
equalsIgnoreCase
(
NAMESPACE_FULL_FLAG
)
||
option
.
equalsIgnoreCase
(
TABLE_FLAG
)
||
option
.
equalsIgnoreCase
(
TABLE_FULL_FLAG
))
{
System
.
out
.
println
(
"Usage: import-hbase.sh [-n <namespace> OR --namespace <namespace>] [-t <table> OR --table <table>]"
);
throw
new
Exception
(
"Incorrect arguments.."
);
}
}
}
/**
* Construct the qualified name used to uniquely identify a ColumnFamily instance in Atlas.
* @param clusterName Name of the cluster to which the Hbase component belongs
* @param nameSpace Name of the Hbase database to which the Table belongs
* @param tableName Name of the Hbase table
* @param columnFamily Name of the ColumnFamily
* @return Unique qualified name to identify the Table instance in Atlas.
*/
private
static
String
getColumnFamilyQualifiedName
(
String
clusterName
,
String
nameSpace
,
String
tableName
,
String
columnFamily
)
{
tableName
=
stripNameSpace
(
tableName
.
toLowerCase
());
return
String
.
format
(
"%s.%s.%s@%s"
,
nameSpace
.
toLowerCase
(),
tableName
,
columnFamily
.
toLowerCase
(),
clusterName
);
}
/**
* Construct the qualified name used to uniquely identify a Table instance in Atlas.
* @param clusterName Name of the cluster to which the Hbase component belongs
* @param nameSpace Name of the Hbase database to which the Table belongs
* @param tableName Name of the Hbase table
* @return Unique qualified name to identify the Table instance in Atlas.
*/
private
static
String
getTableQualifiedName
(
String
clusterName
,
String
nameSpace
,
String
tableName
)
{
tableName
=
stripNameSpace
(
tableName
.
toLowerCase
());
return
String
.
format
(
"%s.%s@%s"
,
nameSpace
.
toLowerCase
(),
tableName
,
clusterName
);
}
/**
* Construct the qualified name used to uniquely identify a Hbase NameSpace instance in Atlas.
* @param clusterName Name of the cluster to which the Hbase component belongs
* @param nameSpace Name of the NameSpace
* @return Unique qualified name to identify the HBase NameSpace instance in Atlas.
*/
private
static
String
getNameSpaceQualifiedName
(
String
clusterName
,
String
nameSpace
)
{
return
String
.
format
(
"%s@%s"
,
nameSpace
.
toLowerCase
(),
clusterName
);
}
private
static
String
stripNameSpace
(
String
tableName
){
tableName
=
tableName
.
substring
(
tableName
.
indexOf
(
":"
)+
1
);
return
tableName
;
}
}
distro/src/main/assemblies/standalone-package.xml
View file @
d1e79fa0
...
@@ -132,6 +132,14 @@
...
@@ -132,6 +132,14 @@
<!-- addons/hbase -->
<!-- addons/hbase -->
<fileSet>
<fileSet>
<directory>
../addons/hbase-bridge/src/bin
</directory>
<outputDirectory>
hook-bin
</outputDirectory>
<fileMode>
0755
</fileMode>
<directoryMode>
0755
</directoryMode>
</fileSet>
<!-- addons/hbase -->
<fileSet>
<directory>
../addons/hbase-bridge/target/dependency/bridge
</directory>
<directory>
../addons/hbase-bridge/target/dependency/bridge
</directory>
<outputDirectory>
bridge
</outputDirectory>
<outputDirectory>
bridge
</outputDirectory>
</fileSet>
</fileSet>
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment