Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
A
atlas
Project
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
dataplatform
atlas
Commits
7f5a665e
Commit
7f5a665e
authored
7 years ago
by
rmani
Committed by
Madhan Neethiraj
7 years ago
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
ATLAS-2525: updated HBase, Hive hooks to enable import namespaces/databases/tables listed in a file
Signed-off-by:
Madhan Neethiraj
<
madhan@apache.org
>
parent
f42c1d9f
Show whitespace changes
Inline
Side-by-side
Showing
5 changed files
with
704 additions
and
521 deletions
+704
-521
import-hbase.sh
addons/hbase-bridge/src/bin/import-hbase.sh
+1
-1
HBaseBridge.java
.../main/java/org/apache/atlas/hbase/bridge/HBaseBridge.java
+661
-0
ImportHBaseEntities.java
...java/org/apache/atlas/hbase/util/ImportHBaseEntities.java
+0
-101
ImportHBaseEntitiesBase.java
.../org/apache/atlas/hbase/util/ImportHBaseEntitiesBase.java
+0
-417
HiveMetaStoreBridge.java
...ava/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
+42
-2
No files found.
addons/hbase-bridge/src/bin/import-hbase.sh
View file @
7f5a665e
...
...
@@ -134,7 +134,7 @@ done
echo
"Log file for import is
$LOGFILE
"
"
${
JAVA_BIN
}
"
${
JAVA_PROPERTIES
}
-cp
"
${
CP
}
"
org.apache.atlas.hbase.
util.ImportHBaseEntities
$allargs
"
${
JAVA_BIN
}
"
${
JAVA_PROPERTIES
}
-cp
"
${
CP
}
"
org.apache.atlas.hbase.
bridge.HBaseBridge
$allargs
RETVAL
=
$?
[
$RETVAL
-eq
0
]
&&
echo
HBase Data Model imported successfully!!!
...
...
This diff is collapsed.
Click to expand it.
addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/bridge/HBaseBridge.java
0 → 100644
View file @
7f5a665e
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org
.
apache
.
atlas
.
hbase
.
bridge
;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClientV2;
import org.apache.atlas.hbase.model.HBaseDataTypes;
import org.apache.atlas.hook.AtlasHookException;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.type.AtlasTypeUtil;
import org.apache.atlas.utils.AuthenticationUtil;
import org.apache.commons.cli.BasicParser;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
public
class
HBaseBridge
{
private
static
final
Logger
LOG
=
LoggerFactory
.
getLogger
(
HBaseBridge
.
class
);
private
static
final
int
EXIT_CODE_SUCCESS
=
0
;
private
static
final
int
EXIT_CODE_FAILED
=
1
;
private
static
final
String
ATLAS_ENDPOINT
=
"atlas.rest.address"
;
private
static
final
String
DEFAULT_ATLAS_URL
=
"http://localhost:21000/"
;
private
static
final
String
HBASE_CLUSTER_NAME
=
"atlas.cluster.name"
;
private
static
final
String
DEFAULT_CLUSTER_NAME
=
"primary"
;
private
static
final
String
QUALIFIED_NAME
=
"qualifiedName"
;
private
static
final
String
NAME
=
"name"
;
private
static
final
String
URI
=
"uri"
;
private
static
final
String
OWNER
=
"owner"
;
private
static
final
String
DESCRIPTION_ATTR
=
"description"
;
private
static
final
String
CLUSTERNAME
=
"clusterName"
;
private
static
final
String
NAMESPACE
=
"namespace"
;
private
static
final
String
TABLE
=
"table"
;
private
static
final
String
COLUMN_FAMILIES
=
"column_families"
;
// table metadata
private
static
final
String
ATTR_TABLE_MAX_FILESIZE
=
"maxFileSize"
;
private
static
final
String
ATTR_TABLE_ISREADONLY
=
"isReadOnly"
;
private
static
final
String
ATTR_TABLE_ISCOMPACTION_ENABLED
=
"isCompactionEnabled"
;
private
static
final
String
ATTR_TABLE_REPLICATION_PER_REGION
=
"replicasPerRegion"
;
private
static
final
String
ATTR_TABLE_DURABLILITY
=
"durability"
;
// column family metadata
private
static
final
String
ATTR_CF_BLOOMFILTER_TYPE
=
"bloomFilterType"
;
private
static
final
String
ATTR_CF_COMPRESSION_TYPE
=
"compressionType"
;
private
static
final
String
ATTR_CF_COMPACTION_COMPRESSION_TYPE
=
"compactionCompressionType"
;
private
static
final
String
ATTR_CF_ENCRYPTION_TYPE
=
"encryptionType"
;
private
static
final
String
ATTR_CF_KEEP_DELETE_CELLS
=
"keepDeletedCells"
;
private
static
final
String
ATTR_CF_MAX_VERSIONS
=
"maxVersions"
;
private
static
final
String
ATTR_CF_MIN_VERSIONS
=
"minVersions"
;
private
static
final
String
ATTR_CF_DATA_BLOCK_ENCODING
=
"dataBlockEncoding"
;
private
static
final
String
ATTR_CF_TTL
=
"ttl"
;
private
static
final
String
ATTR_CF_BLOCK_CACHE_ENABLED
=
"blockCacheEnabled"
;
private
static
final
String
ATTR_CF_CACHED_BLOOM_ON_WRITE
=
"cacheBloomsOnWrite"
;
private
static
final
String
ATTR_CF_CACHED_DATA_ON_WRITE
=
"cacheDataOnWrite"
;
private
static
final
String
ATTR_CF_CACHED_INDEXES_ON_WRITE
=
"cacheIndexesOnWrite"
;
private
static
final
String
ATTR_CF_EVICT_BLOCK_ONCLOSE
=
"evictBlocksOnClose"
;
private
static
final
String
ATTR_CF_PREFETCH_BLOCK_ONOPEN
=
"prefetchBlocksOnOpen"
;
private
static
final
String
ATTRIBUTE_QUALIFIED_NAME
=
"qualifiedName"
;
private
static
final
String
HBASE_NAMESPACE_QUALIFIED_NAME
=
"%s@%s"
;
private
static
final
String
HBASE_TABLE_QUALIFIED_NAME_FORMAT
=
"%s:%s@%s"
;
private
static
final
String
HBASE_COLUMN_FAMILY_QUALIFIED_NAME_FORMAT
=
"%s:%s.%s@%s"
;
private
final
String
clusterName
;
private
final
AtlasClientV2
atlasClientV2
;
private
final
HBaseAdmin
hbaseAdmin
;
public
static
void
main
(
String
[]
args
)
{
int
exitCode
=
EXIT_CODE_FAILED
;
try
{
Options
options
=
new
Options
();
options
.
addOption
(
"n"
,
"namespace"
,
true
,
"namespace"
);
options
.
addOption
(
"t"
,
"table"
,
true
,
"tablename"
);
options
.
addOption
(
"f"
,
"filename"
,
true
,
"filename"
);
CommandLineParser
parser
=
new
BasicParser
();
CommandLine
cmd
=
parser
.
parse
(
options
,
args
);
String
namespaceToImport
=
cmd
.
getOptionValue
(
"n"
);
String
tableToImport
=
cmd
.
getOptionValue
(
"t"
);
String
fileToImport
=
cmd
.
getOptionValue
(
"f"
);
Configuration
atlasConf
=
ApplicationProperties
.
get
();
String
[]
urls
=
atlasConf
.
getStringArray
(
ATLAS_ENDPOINT
);
if
(
urls
==
null
||
urls
.
length
==
0
)
{
urls
=
new
String
[]
{
DEFAULT_ATLAS_URL
};
}
final
AtlasClientV2
atlasClientV2
;
if
(!
AuthenticationUtil
.
isKerberosAuthenticationEnabled
())
{
String
[]
basicAuthUsernamePassword
=
AuthenticationUtil
.
getBasicAuthenticationInput
();
atlasClientV2
=
new
AtlasClientV2
(
urls
,
basicAuthUsernamePassword
);
}
else
{
UserGroupInformation
ugi
=
UserGroupInformation
.
getCurrentUser
();
atlasClientV2
=
new
AtlasClientV2
(
ugi
,
ugi
.
getShortUserName
(),
urls
);
}
HBaseBridge
importer
=
new
HBaseBridge
(
atlasConf
,
atlasClientV2
);
if
(
StringUtils
.
isNotEmpty
(
fileToImport
))
{
File
f
=
new
File
(
fileToImport
);
if
(
f
.
exists
()
&&
f
.
canRead
())
{
BufferedReader
br
=
new
BufferedReader
(
new
FileReader
(
f
));
String
line
=
null
;
while
((
line
=
br
.
readLine
())
!=
null
)
{
String
val
[]
=
line
.
split
(
":"
);
if
(
ArrayUtils
.
isNotEmpty
(
val
))
{
namespaceToImport
=
val
[
0
];
if
(
val
.
length
>
1
)
{
tableToImport
=
val
[
1
];
}
else
{
tableToImport
=
""
;
}
importer
.
importHBaseEntities
(
namespaceToImport
,
tableToImport
);
}
}
exitCode
=
EXIT_CODE_SUCCESS
;
}
else
{
LOG
.
error
(
"Failed to read the file"
);
}
}
else
{
importer
.
importHBaseEntities
(
namespaceToImport
,
tableToImport
);
exitCode
=
EXIT_CODE_SUCCESS
;
}
}
catch
(
ParseException
e
)
{
LOG
.
error
(
"Failed to parse arguments. Error: "
,
e
.
getMessage
());
printUsage
();
}
catch
(
Exception
e
)
{
System
.
out
.
println
(
"ImportHBaseEntities failed. Please check the log file for the detailed error message"
);
LOG
.
error
(
"ImportHBaseEntities failed"
,
e
);
}
System
.
exit
(
exitCode
);
}
    /**
     * Builds a bridge bound to the given Atlas configuration and client.
     * Verifies HBase availability before constructing the admin handle, so a
     * misconfigured cluster fails fast here rather than mid-import.
     *
     * @param atlasConf     Atlas application configuration (cluster name, endpoints)
     * @param atlasClientV2 client used for all entity create/update/lookup calls
     * @throws Exception if HBase is unreachable or the admin handle cannot be created
     */
    public HBaseBridge(Configuration atlasConf, AtlasClientV2 atlasClientV2) throws Exception {
        this.atlasClientV2 = atlasClientV2;
        // cluster name qualifies every entity this bridge creates; defaults to "primary"
        this.clusterName = atlasConf.getString(HBASE_CLUSTER_NAME, DEFAULT_CLUSTER_NAME);

        // fully-qualified type: avoids clashing with org.apache.commons.configuration.Configuration
        org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();

        LOG.info("checking HBase availability..");

        HBaseAdmin.checkHBaseAvailable(conf);

        LOG.info("HBase is available");

        hbaseAdmin = new HBaseAdmin(conf);
    }
private
boolean
importHBaseEntities
(
String
namespaceToImport
,
String
tableToImport
)
throws
Exception
{
boolean
ret
=
false
;
if
(
StringUtils
.
isEmpty
(
namespaceToImport
)
&&
StringUtils
.
isEmpty
(
tableToImport
))
{
// when both NameSpace and Table options are not present
importNameSpaceAndTable
();
ret
=
true
;
}
else
if
(
StringUtils
.
isNotEmpty
(
namespaceToImport
))
{
// When Namespace option is present or both namespace and table options are present
importNameSpaceWithTable
(
namespaceToImport
,
tableToImport
);
ret
=
true
;
}
else
if
(
StringUtils
.
isNotEmpty
(
tableToImport
))
{
importTable
(
tableToImport
);
ret
=
true
;
}
return
ret
;
}
public
void
importNameSpace
(
final
String
nameSpace
)
throws
Exception
{
List
<
NamespaceDescriptor
>
matchingNameSpaceDescriptors
=
getMatchingNameSpaces
(
nameSpace
);
if
(
CollectionUtils
.
isNotEmpty
(
matchingNameSpaceDescriptors
))
{
for
(
NamespaceDescriptor
namespaceDescriptor
:
matchingNameSpaceDescriptors
)
{
createOrUpdateNameSpace
(
namespaceDescriptor
);
}
}
else
{
throw
new
AtlasHookException
(
"No NameSpace found for the given criteria. NameSpace = "
+
nameSpace
);
}
}
public
void
importTable
(
final
String
tableName
)
throws
Exception
{
String
tableNameStr
=
null
;
HTableDescriptor
[]
htds
=
hbaseAdmin
.
listTables
(
Pattern
.
compile
(
tableName
));
if
(
ArrayUtils
.
isNotEmpty
(
htds
))
{
for
(
HTableDescriptor
htd
:
htds
)
{
String
tblNameWithNameSpace
=
htd
.
getTableName
().
getNameWithNamespaceInclAsString
();
String
tblNameWithOutNameSpace
=
htd
.
getTableName
().
getNameAsString
();
if
(
tableName
.
equals
(
tblNameWithNameSpace
))
{
tableNameStr
=
tblNameWithNameSpace
;
}
else
if
(
tableName
.
equals
(
tblNameWithOutNameSpace
))
{
tableNameStr
=
tblNameWithOutNameSpace
;
}
else
{
// when wild cards are used in table name
if
(
tblNameWithNameSpace
!=
null
)
{
tableNameStr
=
tblNameWithNameSpace
;
}
else
if
(
tblNameWithOutNameSpace
!=
null
)
{
tableNameStr
=
tblNameWithOutNameSpace
;
}
}
byte
[]
nsByte
=
htd
.
getTableName
().
getNamespace
();
String
nsName
=
new
String
(
nsByte
);
NamespaceDescriptor
nsDescriptor
=
hbaseAdmin
.
getNamespaceDescriptor
(
nsName
);
AtlasEntityWithExtInfo
entity
=
createOrUpdateNameSpace
(
nsDescriptor
);
HColumnDescriptor
[]
hcdts
=
htd
.
getColumnFamilies
();
createOrUpdateTable
(
nsName
,
tableNameStr
,
entity
.
getEntity
(),
htd
,
hcdts
);
}
}
else
{
throw
new
AtlasHookException
(
"No Table found for the given criteria. Table = "
+
tableName
);
}
}
private
void
importNameSpaceAndTable
()
throws
Exception
{
NamespaceDescriptor
[]
namespaceDescriptors
=
hbaseAdmin
.
listNamespaceDescriptors
();
if
(
ArrayUtils
.
isNotEmpty
(
namespaceDescriptors
))
{
for
(
NamespaceDescriptor
namespaceDescriptor
:
namespaceDescriptors
)
{
String
namespace
=
namespaceDescriptor
.
getName
();
importNameSpace
(
namespace
);
}
}
HTableDescriptor
[]
htds
=
hbaseAdmin
.
listTables
();
if
(
ArrayUtils
.
isNotEmpty
(
htds
))
{
for
(
HTableDescriptor
htd
:
htds
)
{
String
tableName
=
htd
.
getNameAsString
();
importTable
(
tableName
);
}
}
}
private
void
importNameSpaceWithTable
(
String
namespaceToImport
,
String
tableToImport
)
throws
Exception
{
importNameSpace
(
namespaceToImport
);
List
<
HTableDescriptor
>
hTableDescriptors
=
new
ArrayList
<>();
if
(
StringUtils
.
isEmpty
(
tableToImport
))
{
List
<
NamespaceDescriptor
>
matchingNameSpaceDescriptors
=
getMatchingNameSpaces
(
namespaceToImport
);
if
(
CollectionUtils
.
isNotEmpty
(
matchingNameSpaceDescriptors
))
{
hTableDescriptors
=
getTableDescriptors
(
matchingNameSpaceDescriptors
);
}
}
else
{
tableToImport
=
namespaceToImport
+
":"
+
tableToImport
;
HTableDescriptor
[]
htds
=
hbaseAdmin
.
listTables
(
Pattern
.
compile
(
tableToImport
));
hTableDescriptors
.
addAll
(
Arrays
.
asList
(
htds
));
}
if
(
CollectionUtils
.
isNotEmpty
(
hTableDescriptors
))
{
for
(
HTableDescriptor
htd
:
hTableDescriptors
)
{
String
tblName
=
htd
.
getTableName
().
getNameAsString
();
importTable
(
tblName
);
}
}
}
private
List
<
NamespaceDescriptor
>
getMatchingNameSpaces
(
String
nameSpace
)
throws
Exception
{
List
<
NamespaceDescriptor
>
ret
=
new
ArrayList
<>();
NamespaceDescriptor
[]
namespaceDescriptors
=
hbaseAdmin
.
listNamespaceDescriptors
();
for
(
NamespaceDescriptor
namespaceDescriptor:
namespaceDescriptors
){
String
nmSpace
=
namespaceDescriptor
.
getName
();
if
(
nmSpace
.
matches
(
nameSpace
)){
ret
.
add
(
namespaceDescriptor
);
}
}
return
ret
;
}
private
List
<
HTableDescriptor
>
getTableDescriptors
(
List
<
NamespaceDescriptor
>
namespaceDescriptors
)
throws
Exception
{
List
<
HTableDescriptor
>
ret
=
new
ArrayList
<>();
for
(
NamespaceDescriptor
namespaceDescriptor:
namespaceDescriptors
)
{
HTableDescriptor
[]
tableDescriptors
=
hbaseAdmin
.
listTableDescriptorsByNamespace
(
namespaceDescriptor
.
getName
());
ret
.
addAll
(
Arrays
.
asList
(
tableDescriptors
));
}
return
ret
;
}
protected
AtlasEntityWithExtInfo
createOrUpdateNameSpace
(
NamespaceDescriptor
namespaceDescriptor
)
throws
Exception
{
String
nsName
=
namespaceDescriptor
.
getName
();
String
nsQualifiedName
=
getNameSpaceQualifiedName
(
clusterName
,
nsName
);
AtlasEntityWithExtInfo
nsEntity
=
findNameSpaceEntityInAtlas
(
nsQualifiedName
);
if
(
nsEntity
==
null
)
{
LOG
.
info
(
"Importing NameSpace: "
+
nsQualifiedName
);
AtlasEntity
entity
=
getNameSpaceEntity
(
nsName
,
null
);
nsEntity
=
createEntityInAtlas
(
new
AtlasEntityWithExtInfo
(
entity
));
}
else
{
LOG
.
info
(
"NameSpace already present in Atlas. Updating it..: "
+
nsQualifiedName
);
AtlasEntity
entity
=
getNameSpaceEntity
(
nsName
,
nsEntity
.
getEntity
());
nsEntity
.
setEntity
(
entity
);
nsEntity
=
updateEntityInAtlas
(
nsEntity
);
}
return
nsEntity
;
}
protected
AtlasEntityWithExtInfo
createOrUpdateTable
(
String
nameSpace
,
String
tableName
,
AtlasEntity
nameSapceEntity
,
HTableDescriptor
htd
,
HColumnDescriptor
[]
hcdts
)
throws
Exception
{
String
owner
=
htd
.
getOwnerString
();
String
tblQualifiedName
=
getTableQualifiedName
(
clusterName
,
nameSpace
,
tableName
);
AtlasEntityWithExtInfo
ret
=
findTableEntityInAtlas
(
tblQualifiedName
);
if
(
ret
==
null
)
{
LOG
.
info
(
"Importing Table: "
+
tblQualifiedName
);
AtlasEntity
entity
=
getTableEntity
(
nameSpace
,
tableName
,
owner
,
nameSapceEntity
,
htd
,
null
);
ret
=
createEntityInAtlas
(
new
AtlasEntityWithExtInfo
(
entity
));
}
else
{
LOG
.
info
(
"Table already present in Atlas. Updating it..: "
+
tblQualifiedName
);
AtlasEntity
entity
=
getTableEntity
(
nameSpace
,
tableName
,
owner
,
nameSapceEntity
,
htd
,
ret
.
getEntity
());
ret
.
setEntity
(
entity
);
ret
=
updateEntityInAtlas
(
ret
);
}
AtlasEntity
tableEntity
=
ret
.
getEntity
();
if
(
tableEntity
!=
null
)
{
List
<
AtlasEntityWithExtInfo
>
cfEntities
=
createOrUpdateColumnFamilies
(
nameSpace
,
tableName
,
owner
,
hcdts
,
tableEntity
);
List
<
AtlasObjectId
>
cfIDs
=
new
ArrayList
<>();
if
(
CollectionUtils
.
isNotEmpty
(
cfEntities
))
{
for
(
AtlasEntityWithExtInfo
cfEntity
:
cfEntities
)
{
cfIDs
.
add
(
AtlasTypeUtil
.
getAtlasObjectId
(
cfEntity
.
getEntity
()));
}
}
tableEntity
.
setAttribute
(
COLUMN_FAMILIES
,
cfIDs
);
}
return
ret
;
}
protected
List
<
AtlasEntityWithExtInfo
>
createOrUpdateColumnFamilies
(
String
nameSpace
,
String
tableName
,
String
owner
,
HColumnDescriptor
[]
hcdts
,
AtlasEntity
tableEntity
)
throws
Exception
{
List
<
AtlasEntityWithExtInfo
>
ret
=
new
ArrayList
<>();
if
(
hcdts
!=
null
)
{
AtlasObjectId
tableId
=
AtlasTypeUtil
.
getAtlasObjectId
(
tableEntity
);
for
(
HColumnDescriptor
columnFamilyDescriptor
:
hcdts
)
{
String
cfName
=
columnFamilyDescriptor
.
getNameAsString
();
String
cfQualifiedName
=
getColumnFamilyQualifiedName
(
clusterName
,
nameSpace
,
tableName
,
cfName
);
AtlasEntityWithExtInfo
cfEntity
=
findColumnFamiltyEntityInAtlas
(
cfQualifiedName
);
if
(
cfEntity
==
null
)
{
LOG
.
info
(
"Importing Column-family: "
+
cfQualifiedName
);
AtlasEntity
entity
=
getColumnFamilyEntity
(
nameSpace
,
tableName
,
owner
,
columnFamilyDescriptor
,
tableId
,
null
);
cfEntity
=
createEntityInAtlas
(
new
AtlasEntityWithExtInfo
(
entity
));
}
else
{
LOG
.
info
(
"ColumnFamily already present in Atlas. Updating it..: "
+
cfQualifiedName
);
AtlasEntity
entity
=
getColumnFamilyEntity
(
nameSpace
,
tableName
,
owner
,
columnFamilyDescriptor
,
tableId
,
cfEntity
.
getEntity
());
cfEntity
.
setEntity
(
entity
);
cfEntity
=
updateEntityInAtlas
(
cfEntity
);
}
ret
.
add
(
cfEntity
);
}
}
return
ret
;
}
private
AtlasEntityWithExtInfo
findNameSpaceEntityInAtlas
(
String
nsQualifiedName
)
{
AtlasEntityWithExtInfo
ret
=
null
;
try
{
ret
=
findEntityInAtlas
(
HBaseDataTypes
.
HBASE_NAMESPACE
.
getName
(),
nsQualifiedName
);
}
catch
(
Exception
e
)
{
ret
=
null
;
// entity doesn't exist in Atlas
}
return
ret
;
}
private
AtlasEntityWithExtInfo
findTableEntityInAtlas
(
String
tableQualifiedName
)
{
AtlasEntityWithExtInfo
ret
=
null
;
try
{
ret
=
findEntityInAtlas
(
HBaseDataTypes
.
HBASE_TABLE
.
getName
(),
tableQualifiedName
);
}
catch
(
Exception
e
)
{
ret
=
null
;
// entity doesn't exist in Atlas
}
return
ret
;
}
private
AtlasEntityWithExtInfo
findColumnFamiltyEntityInAtlas
(
String
columnFamilyQualifiedName
)
{
AtlasEntityWithExtInfo
ret
=
null
;
try
{
ret
=
findEntityInAtlas
(
HBaseDataTypes
.
HBASE_COLUMN_FAMILY
.
getName
(),
columnFamilyQualifiedName
);
}
catch
(
Exception
e
)
{
ret
=
null
;
// entity doesn't exist in Atlas
}
return
ret
;
}
private
AtlasEntityWithExtInfo
findEntityInAtlas
(
String
typeName
,
String
qualifiedName
)
throws
Exception
{
Map
<
String
,
String
>
attributes
=
Collections
.
singletonMap
(
QUALIFIED_NAME
,
qualifiedName
);
return
atlasClientV2
.
getEntityByAttribute
(
typeName
,
attributes
);
}
private
AtlasEntity
getNameSpaceEntity
(
String
nameSpace
,
AtlasEntity
nsEtity
)
{
AtlasEntity
ret
=
null
;
if
(
nsEtity
==
null
)
{
ret
=
new
AtlasEntity
(
HBaseDataTypes
.
HBASE_NAMESPACE
.
getName
());
}
else
{
ret
=
nsEtity
;
}
String
qualifiedName
=
getNameSpaceQualifiedName
(
clusterName
,
nameSpace
);
ret
.
setAttribute
(
QUALIFIED_NAME
,
qualifiedName
);
ret
.
setAttribute
(
CLUSTERNAME
,
clusterName
);
ret
.
setAttribute
(
NAME
,
nameSpace
);
ret
.
setAttribute
(
DESCRIPTION_ATTR
,
nameSpace
);
return
ret
;
}
private
AtlasEntity
getTableEntity
(
String
nameSpace
,
String
tableName
,
String
owner
,
AtlasEntity
nameSpaceEntity
,
HTableDescriptor
htd
,
AtlasEntity
atlasEntity
)
{
AtlasEntity
ret
=
null
;
if
(
atlasEntity
==
null
)
{
ret
=
new
AtlasEntity
(
HBaseDataTypes
.
HBASE_TABLE
.
getName
());
}
else
{
ret
=
atlasEntity
;
}
String
tableQualifiedName
=
getTableQualifiedName
(
clusterName
,
nameSpace
,
tableName
);
ret
.
setAttribute
(
QUALIFIED_NAME
,
tableQualifiedName
);
ret
.
setAttribute
(
CLUSTERNAME
,
clusterName
);
ret
.
setAttribute
(
NAMESPACE
,
AtlasTypeUtil
.
getAtlasObjectId
(
nameSpaceEntity
));
ret
.
setAttribute
(
NAME
,
tableName
);
ret
.
setAttribute
(
DESCRIPTION_ATTR
,
tableName
);
ret
.
setAttribute
(
OWNER
,
owner
);
ret
.
setAttribute
(
URI
,
tableName
);
ret
.
setAttribute
(
ATTR_TABLE_MAX_FILESIZE
,
htd
.
getMaxFileSize
());
ret
.
setAttribute
(
ATTR_TABLE_REPLICATION_PER_REGION
,
htd
.
getRegionReplication
());
ret
.
setAttribute
(
ATTR_TABLE_ISREADONLY
,
htd
.
isReadOnly
());
ret
.
setAttribute
(
ATTR_TABLE_ISCOMPACTION_ENABLED
,
htd
.
isCompactionEnabled
());
ret
.
setAttribute
(
ATTR_TABLE_DURABLILITY
,
(
htd
.
getDurability
()
!=
null
?
htd
.
getDurability
().
name
()
:
null
));
return
ret
;
}
private
AtlasEntity
getColumnFamilyEntity
(
String
nameSpace
,
String
tableName
,
String
owner
,
HColumnDescriptor
hcdt
,
AtlasObjectId
tableId
,
AtlasEntity
atlasEntity
){
AtlasEntity
ret
=
null
;
if
(
atlasEntity
==
null
)
{
ret
=
new
AtlasEntity
(
HBaseDataTypes
.
HBASE_COLUMN_FAMILY
.
getName
());
}
else
{
ret
=
atlasEntity
;
}
String
cfName
=
hcdt
.
getNameAsString
();
String
cfQualifiedName
=
getColumnFamilyQualifiedName
(
clusterName
,
nameSpace
,
tableName
,
cfName
);
ret
.
setAttribute
(
QUALIFIED_NAME
,
cfQualifiedName
);
ret
.
setAttribute
(
CLUSTERNAME
,
clusterName
);
ret
.
setAttribute
(
TABLE
,
tableId
);
ret
.
setAttribute
(
NAME
,
cfName
);
ret
.
setAttribute
(
DESCRIPTION_ATTR
,
cfName
);
ret
.
setAttribute
(
OWNER
,
owner
);
ret
.
setAttribute
(
ATTR_CF_BLOCK_CACHE_ENABLED
,
hcdt
.
isBlockCacheEnabled
());
ret
.
setAttribute
(
ATTR_CF_BLOOMFILTER_TYPE
,
(
hcdt
.
getBloomFilterType
()
!=
null
?
hcdt
.
getBloomFilterType
().
name
():
null
));
ret
.
setAttribute
(
ATTR_CF_CACHED_BLOOM_ON_WRITE
,
hcdt
.
isCacheBloomsOnWrite
());
ret
.
setAttribute
(
ATTR_CF_CACHED_DATA_ON_WRITE
,
hcdt
.
isCacheDataOnWrite
());
ret
.
setAttribute
(
ATTR_CF_CACHED_INDEXES_ON_WRITE
,
hcdt
.
isCacheIndexesOnWrite
());
ret
.
setAttribute
(
ATTR_CF_COMPACTION_COMPRESSION_TYPE
,
(
hcdt
.
getCompactionCompressionType
()
!=
null
?
hcdt
.
getCompactionCompressionType
().
name
():
null
));
ret
.
setAttribute
(
ATTR_CF_COMPRESSION_TYPE
,
(
hcdt
.
getCompressionType
()
!=
null
?
hcdt
.
getCompressionType
().
name
():
null
));
ret
.
setAttribute
(
ATTR_CF_DATA_BLOCK_ENCODING
,
(
hcdt
.
getDataBlockEncoding
()
!=
null
?
hcdt
.
getDataBlockEncoding
().
name
():
null
));
ret
.
setAttribute
(
ATTR_CF_ENCRYPTION_TYPE
,
hcdt
.
getEncryptionType
());
ret
.
setAttribute
(
ATTR_CF_EVICT_BLOCK_ONCLOSE
,
hcdt
.
isEvictBlocksOnClose
());
ret
.
setAttribute
(
ATTR_CF_KEEP_DELETE_CELLS
,
(
hcdt
.
getKeepDeletedCells
()
!=
null
?
hcdt
.
getKeepDeletedCells
().
name
():
null
));
ret
.
setAttribute
(
ATTR_CF_MAX_VERSIONS
,
hcdt
.
getMaxVersions
());
ret
.
setAttribute
(
ATTR_CF_MIN_VERSIONS
,
hcdt
.
getMinVersions
());
ret
.
setAttribute
(
ATTR_CF_PREFETCH_BLOCK_ONOPEN
,
hcdt
.
isPrefetchBlocksOnOpen
());
ret
.
setAttribute
(
ATTR_CF_TTL
,
hcdt
.
getTimeToLive
());
return
ret
;
}
private
AtlasEntityWithExtInfo
createEntityInAtlas
(
AtlasEntityWithExtInfo
entity
)
throws
Exception
{
AtlasEntityWithExtInfo
ret
=
null
;
EntityMutationResponse
response
=
atlasClientV2
.
createEntity
(
entity
);
List
<
AtlasEntityHeader
>
entities
=
response
.
getCreatedEntities
();
if
(
CollectionUtils
.
isNotEmpty
(
entities
))
{
AtlasEntityWithExtInfo
getByGuidResponse
=
atlasClientV2
.
getEntityByGuid
(
entities
.
get
(
0
).
getGuid
());
ret
=
getByGuidResponse
;
LOG
.
info
(
"Created {} entity: name={}, guid={}"
,
ret
.
getEntity
().
getTypeName
(),
ret
.
getEntity
().
getAttribute
(
ATTRIBUTE_QUALIFIED_NAME
),
ret
.
getEntity
().
getGuid
());
}
return
ret
;
}
private
AtlasEntityWithExtInfo
updateEntityInAtlas
(
AtlasEntityWithExtInfo
entity
)
throws
Exception
{
AtlasEntityWithExtInfo
ret
=
null
;
EntityMutationResponse
response
=
atlasClientV2
.
updateEntity
(
entity
);
if
(
response
!=
null
)
{
List
<
AtlasEntityHeader
>
entities
=
response
.
getUpdatedEntities
();
if
(
CollectionUtils
.
isNotEmpty
(
entities
))
{
AtlasEntityWithExtInfo
getByGuidResponse
=
atlasClientV2
.
getEntityByGuid
(
entities
.
get
(
0
).
getGuid
());
ret
=
getByGuidResponse
;
LOG
.
info
(
"Updated {} entity: name={}, guid={} "
,
ret
.
getEntity
().
getTypeName
(),
ret
.
getEntity
().
getAttribute
(
ATTRIBUTE_QUALIFIED_NAME
),
ret
.
getEntity
().
getGuid
());
}
else
{
LOG
.
info
(
"Entity: name={} "
,
entity
.
toString
()
+
" not updated as it is unchanged from what is in Atlas"
);
ret
=
entity
;
}
}
else
{
LOG
.
info
(
"Entity: name={} "
,
entity
.
toString
()
+
" not updated as it is unchanged from what is in Atlas"
);
ret
=
entity
;
}
return
ret
;
}
/**
* Construct the qualified name used to uniquely identify a ColumnFamily instance in Atlas.
* @param clusterName Name of the cluster to which the Hbase component belongs
* @param nameSpace Name of the Hbase database to which the Table belongs
* @param tableName Name of the Hbase table
* @param columnFamily Name of the ColumnFamily
* @return Unique qualified name to identify the Table instance in Atlas.
*/
private
static
String
getColumnFamilyQualifiedName
(
String
clusterName
,
String
nameSpace
,
String
tableName
,
String
columnFamily
)
{
tableName
=
stripNameSpace
(
tableName
.
toLowerCase
());
return
String
.
format
(
HBASE_COLUMN_FAMILY_QUALIFIED_NAME_FORMAT
,
nameSpace
.
toLowerCase
(),
tableName
,
columnFamily
.
toLowerCase
(),
clusterName
);
}
/**
* Construct the qualified name used to uniquely identify a Table instance in Atlas.
* @param clusterName Name of the cluster to which the Hbase component belongs
* @param nameSpace Name of the Hbase database to which the Table belongs
* @param tableName Name of the Hbase table
* @return Unique qualified name to identify the Table instance in Atlas.
*/
private
static
String
getTableQualifiedName
(
String
clusterName
,
String
nameSpace
,
String
tableName
)
{
tableName
=
stripNameSpace
(
tableName
.
toLowerCase
());
return
String
.
format
(
HBASE_TABLE_QUALIFIED_NAME_FORMAT
,
nameSpace
.
toLowerCase
(),
tableName
,
clusterName
);
}
/**
* Construct the qualified name used to uniquely identify a Hbase NameSpace instance in Atlas.
* @param clusterName Name of the cluster to which the Hbase component belongs
* @param nameSpace Name of the NameSpace
* @return Unique qualified name to identify the HBase NameSpace instance in Atlas.
*/
private
static
String
getNameSpaceQualifiedName
(
String
clusterName
,
String
nameSpace
)
{
return
String
.
format
(
HBASE_NAMESPACE_QUALIFIED_NAME
,
nameSpace
.
toLowerCase
(),
clusterName
);
}
private
static
String
stripNameSpace
(
String
tableName
){
tableName
=
tableName
.
substring
(
tableName
.
indexOf
(
":"
)+
1
);
return
tableName
;
}
private
static
void
printUsage
()
{
System
.
out
.
println
(
"Usage 1: import-hbase.sh [-n <namespace regex> OR --namespace <namespace regex >] [-t <table regex > OR --table <table regex>]"
);
System
.
out
.
println
(
"Usage 2: import-hbase.sh [-f <filename>]"
);
System
.
out
.
println
(
" Format:"
);
System
.
out
.
println
(
" namespace1:tbl1"
);
System
.
out
.
println
(
" namespace1:tbl2"
);
System
.
out
.
println
(
" namespace2:tbl1"
);
}
}
This diff is collapsed.
Click to expand it.
addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/util/ImportHBaseEntities.java
deleted
100644 → 0
View file @
f42c1d9f
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org
.
apache
.
atlas
.
hbase
.
util
;
import
org.apache.atlas.model.instance.AtlasEntity
;
import
org.apache.atlas.hook.AtlasHookException
;
import
org.apache.commons.lang.ArrayUtils
;
import
org.apache.commons.lang.StringUtils
;
import
org.apache.hadoop.hbase.HColumnDescriptor
;
import
org.apache.hadoop.hbase.HTableDescriptor
;
import
org.apache.hadoop.hbase.NamespaceDescriptor
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
/**
 * CLI utility that imports HBase namespaces and tables into Atlas.
 * Argument parsing and the HBase/Atlas connections live in the base class.
 * NOTE(review): this class is deleted by this commit in favor of
 * org.apache.atlas.hbase.bridge.HBaseBridge.
 */
public class ImportHBaseEntities extends ImportHBaseEntitiesBase {
    private static final Logger LOG = LoggerFactory.getLogger(ImportHBaseEntities.class);

    /**
     * Entry point: construct (which parses args in the base class) and run the import.
     * Any failure is wrapped, with cause preserved, in AtlasHookException.
     */
    public static void main(String[] args) throws AtlasHookException {
        try {
            ImportHBaseEntities importHBaseEntities = new ImportHBaseEntities(args);
            importHBaseEntities.execute();
        } catch (Exception e) {
            throw new AtlasHookException("ImportHBaseEntities failed.", e);
        }
    }

    // Base-class constructor parses the command-line flags and initializes
    // hbaseAdmin / namespaceToImport / tableToImport.
    public ImportHBaseEntities(String[] args) throws Exception {
        super(args);
    }

    /**
     * Runs the import selected by the parsed options:
     * no options → all namespaces then all tables; -n → that namespace;
     * -t → that table.
     *
     * @return true when an import path ran; false when hbaseAdmin is null
     *         or no option matched
     */
    public boolean execute() throws Exception {
        boolean ret = false;
        if (hbaseAdmin != null) {
            if (StringUtils.isEmpty(namespaceToImport) && StringUtils.isEmpty(tableToImport)) {
                // no filters: walk every namespace, then every table
                NamespaceDescriptor[] namespaceDescriptors = hbaseAdmin.listNamespaceDescriptors();
                if (!ArrayUtils.isEmpty(namespaceDescriptors)) {
                    for (NamespaceDescriptor namespaceDescriptor : namespaceDescriptors) {
                        String namespace = namespaceDescriptor.getName();
                        importNameSpace(namespace);
                    }
                }
                HTableDescriptor[] htds = hbaseAdmin.listTables();
                if (!ArrayUtils.isEmpty(htds)) {
                    for (HTableDescriptor htd : htds) {
                        String tableName = htd.getNameAsString();
                        importTable(tableName);
                    }
                }
                ret = true;
            } else if (StringUtils.isNotEmpty(namespaceToImport)) {
                importNameSpace(namespaceToImport);
                ret = true;
            } else if (StringUtils.isNotEmpty(tableToImport)) {
                importTable(tableToImport);
                ret = true;
            }
        }
        return ret;
    }

    /**
     * Imports a single namespace (exact name, no regex) into Atlas.
     *
     * @return the namespace name as reported by its descriptor
     */
    public String importNameSpace(final String nameSpace) throws Exception {
        NamespaceDescriptor namespaceDescriptor = hbaseAdmin.getNamespaceDescriptor(nameSpace);
        createOrUpdateNameSpace(namespaceDescriptor);
        return namespaceDescriptor.getName();
    }

    /**
     * Imports a single table (exact name) and its column families into Atlas.
     *
     * @return the table name (without namespace) as reported by its descriptor
     */
    public String importTable(final String tableName) throws Exception {
        // NOTE(review): getBytes() uses the platform default charset — verify
        byte[] tblName = tableName.getBytes();
        HTableDescriptor htd = hbaseAdmin.getTableDescriptor(tblName);
        // NOTE(review): nsName here is the namespace-qualified *table* name;
        // passing it to getNamespaceDescriptor() looks wrong — the replacement
        // HBaseBridge uses getTableName().getNamespace() instead. Confirm.
        String nsName = htd.getTableName().getNameWithNamespaceInclAsString();
        NamespaceDescriptor nsDescriptor = hbaseAdmin.getNamespaceDescriptor(nsName);
        AtlasEntity nsEntity = createOrUpdateNameSpace(nsDescriptor);
        HColumnDescriptor[] hcdts = htd.getColumnFamilies();
        createOrUpdateTable(nsName, tableName, nsEntity, htd, hcdts);
        return htd.getTableName().getNameAsString();
    }
}
This diff is collapsed.
Click to expand it.
addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/util/ImportHBaseEntitiesBase.java
deleted
100644 → 0
View file @
f42c1d9f
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org
.
apache
.
atlas
.
hbase
.
util
;
import
org.apache.atlas.AtlasClientV2
;
import
org.apache.atlas.ApplicationProperties
;
import
org.apache.atlas.model.instance.AtlasEntity
;
import
org.apache.atlas.model.instance.AtlasObjectId
;
import
org.apache.atlas.model.instance.EntityMutationResponse
;
import
org.apache.atlas.model.instance.EntityMutations
;
import
org.apache.atlas.model.instance.AtlasEntityHeader
;
import
org.apache.atlas.type.AtlasTypeUtil
;
import
org.apache.atlas.utils.AuthenticationUtil
;
import
org.apache.commons.cli.BasicParser
;
import
org.apache.commons.cli.CommandLine
;
import
org.apache.commons.cli.CommandLineParser
;
import
org.apache.commons.cli.Options
;
import
org.apache.commons.collections.CollectionUtils
;
import
org.apache.commons.configuration.Configuration
;
import
org.apache.hadoop.hbase.HBaseConfiguration
;
import
org.apache.hadoop.hbase.HColumnDescriptor
;
import
org.apache.hadoop.hbase.HTableDescriptor
;
import
org.apache.hadoop.hbase.NamespaceDescriptor
;
import
org.apache.hadoop.hbase.client.HBaseAdmin
;
import
org.apache.hadoop.security.UserGroupInformation
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
java.util.*
;
/**
 * Base class for importing HBase namespaces, tables and column families into Atlas.
 * Holds the Atlas/HBase client handles, the attribute-name constants used when
 * building Atlas entities, and per-type caches to avoid repeated Atlas lookups.
 */
public class ImportHBaseEntitiesBase {
    private static final Logger LOG = LoggerFactory.getLogger(ImportHBaseEntitiesBase.class);

    // command-line flags accepted by import-hbase.sh (short and long forms)
    static final String NAMESPACE_FLAG      = "-n";
    static final String TABLE_FLAG          = "-t";
    static final String NAMESPACE_FULL_FLAG = "--namespace";
    static final String TABLE_FULL_FLAG     = "--tablename";

    // Atlas connection configuration keys and fallbacks
    static final String ATLAS_ENDPOINT    = "atlas.rest.address";
    static final String DEFAULT_ATLAS_URL = "http://localhost:21000/";

    // Atlas type names for the HBase model
    static final String NAMESPACE_TYPE    = "hbase_namespace";
    static final String TABLE_TYPE        = "hbase_table";
    static final String COLUMNFAMILY_TYPE = "hbase_column_family";

    // cluster-name configuration key and fallback
    static final String HBASE_CLUSTER_NAME   = "atlas.cluster.name";
    static final String DEFAULT_CLUSTER_NAME = "primary";

    // common Atlas entity attribute names
    static final String QUALIFIED_NAME   = "qualifiedName";
    static final String NAME             = "name";
    static final String URI              = "uri";
    static final String OWNER            = "owner";
    static final String DESCRIPTION_ATTR = "description";
    static final String CLUSTERNAME      = "clusterName";
    static final String NAMESPACE        = "namespace";
    static final String TABLE            = "table";
    static final String COLUMN_FAMILIES  = "column_families";

    // column addition metadata
    public static final String ATTR_TABLE_MAX_FILESIZE           = "maxFileSize";
    public static final String ATTR_TABLE_ISREADONLY             = "isReadOnly";
    public static final String ATTR_TABLE_ISCOMPACTION_ENABLED   = "isCompactionEnabled";
    public static final String ATTR_TABLE_REPLICATION_PER_REGION = "replicasPerRegion";
    public static final String ATTR_TABLE_DURABLILITY            = "durability";

    // column family additional metadata
    public static final String ATTR_CF_BLOOMFILTER_TYPE            = "bloomFilterType";
    public static final String ATTR_CF_COMPRESSION_TYPE            = "compressionType";
    public static final String ATTR_CF_COMPACTION_COMPRESSION_TYPE = "compactionCompressionType";
    public static final String ATTR_CF_ENCRYPTION_TYPE             = "encryptionType";
    public static final String ATTR_CF_KEEP_DELETE_CELLS           = "keepDeletedCells";
    public static final String ATTR_CF_MAX_VERSIONS                = "maxVersions";
    public static final String ATTR_CF_MIN_VERSIONS                = "minVersions";
    public static final String ATTR_CF_DATA_BLOCK_ENCODING         = "dataBlockEncoding";
    public static final String ATTR_CF_TTL                         = "ttl";
    public static final String ATTR_CF_BLOCK_CACHE_ENABLED         = "blockCacheEnabled";
    public static final String ATTR_CF_CACHED_BLOOM_ON_WRITE       = "cacheBloomsOnWrite";
    public static final String ATTR_CF_CACHED_DATA_ON_WRITE        = "cacheDataOnWrite";
    public static final String ATTR_CF_CACHED_INDEXES_ON_WRITE     = "cacheIndexesOnWrite";
    public static final String ATTR_CF_EVICT_BLOCK_ONCLOSE         = "evictBlocksOnClose";
    public static final String ATTR_CF_PREFETCH_BLOCK_ONOPEN       = "prefetchBlocksOnOpen";

    // qualifiedName formats: namespace@cluster, namespace:table@cluster, namespace:table.cf@cluster
    public static final String HBASE_NAMESPACE_QUALIFIED_NAME             = "%s@%s";
    public static final String HBASE_TABLE_QUALIFIED_NAME_FORMAT          = "%s:%s@%s";
    public static final String HBASE_COLUMN_FAMILY_QUALIFIED_NAME_FORMAT  = "%s:%s.%s@%s";

    protected final HBaseAdmin hbaseAdmin;        // HBase admin client used to enumerate namespaces/tables
    protected final boolean    failOnError;       // when true, abort import on first failure
    protected final String     namespaceToImport; // -n/--namespace value; null means import all namespaces
    protected final String     tableToImport;     // -t value; null means import all tables

    private final AtlasClientV2        atlasClientV2;
    private final UserGroupInformation ugi;       // null when basic (non-Kerberos) auth is used
    private final String               clusterName;

    // qualifiedName -> entity caches, one per Atlas type, to avoid repeated REST lookups
    private final HashMap<String, AtlasEntity> nameSpaceCache    = new HashMap<>();
    private final HashMap<String, AtlasEntity> tableCache        = new HashMap<>();
    private final HashMap<String, AtlasEntity> columnFamilyCache = new HashMap<>();
/**
 * Initializes the importer from command-line arguments: builds the Atlas client
 * (Kerberos or basic auth), parses the -n/-t/-failOnError options, and verifies
 * that HBase is reachable before creating the admin client.
 *
 * @param args command-line arguments as passed to import-hbase.sh
 * @throws Exception on bad arguments, Atlas configuration problems,
 *                   or when HBase is not available
 */
protected ImportHBaseEntitiesBase(String[] args) throws Exception {
    // fail fast on a flag that is missing its value, before any clients are built
    checkArgs(args);

    Configuration atlasConf = ApplicationProperties.get();
    String[]      urls      = atlasConf.getStringArray(ATLAS_ENDPOINT);

    if (urls == null || urls.length == 0) {
        urls = new String[]{ DEFAULT_ATLAS_URL };
    }

    if (!AuthenticationUtil.isKerberosAuthenticationEnabled()) {
        // basic auth: prompt/read username & password; no UGI needed
        String[] basicAuthUsernamePassword = AuthenticationUtil.getBasicAuthenticationInput();

        ugi           = null;
        atlasClientV2 = new AtlasClientV2(urls, basicAuthUsernamePassword);
    } else {
        // Kerberos: authenticate as the currently logged-in user
        ugi           = UserGroupInformation.getCurrentUser();
        atlasClientV2 = new AtlasClientV2(ugi, ugi.getShortUserName(), urls);
    }

    // NOTE(review): long option here is "--table" although TABLE_FULL_FLAG is "--tablename"
    Options options = new Options();

    options.addOption("n", "namespace", true, "namespace");
    options.addOption("t", "table", true, "tablename");
    options.addOption("failOnError", false, "failOnError");

    CommandLineParser parser = new BasicParser();
    CommandLine       cmd    = parser.parse(options, args);

    clusterName       = atlasConf.getString(HBASE_CLUSTER_NAME, DEFAULT_CLUSTER_NAME);
    failOnError       = cmd.hasOption("failOnError");
    namespaceToImport = cmd.getOptionValue("n");
    tableToImport     = cmd.getOptionValue("t");

    // Hadoop Configuration, fully qualified to avoid clashing with commons Configuration above
    org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();

    LOG.info("createHBaseClient(): checking HBase availability..");

    // throws if the cluster cannot be reached, so hbaseAdmin below is always usable
    HBaseAdmin.checkHBaseAvailable(conf);

    LOG.info("createHBaseClient(): HBase is available");

    hbaseAdmin = new HBaseAdmin(conf);
}
/**
 * Ensures an Atlas entity exists for the given HBase namespace.
 * Returns the cached/looked-up entity when one is already registered,
 * otherwise creates a new hbase_namespace entity in Atlas.
 *
 * @param namespaceDescriptor descriptor of the HBase namespace to import
 * @return the existing or newly created namespace entity
 * @throws Exception if the Atlas create call fails
 */
protected AtlasEntity createOrUpdateNameSpace(NamespaceDescriptor namespaceDescriptor) throws Exception {
    String namespaceName      = namespaceDescriptor.getName();
    String qualifiedName      = getNameSpaceQualifiedName(clusterName, namespaceName);
    AtlasEntity existingEntity = findNameSpaceEntityInAtlas(qualifiedName);

    if (existingEntity != null) {
        return existingEntity;
    }

    LOG.info("Importing NameSpace: " + qualifiedName);

    return createEntityInAtlas(getNameSpaceEntity(namespaceName));
}
/**
 * Ensures an Atlas entity exists for the given HBase table, then imports its
 * column families and attaches their object-ids to the table entity's
 * column_families attribute.
 *
 * @param nameSpace      namespace the table belongs to
 * @param tableName      HBase table name
 * @param nameSapceEntity Atlas entity of the owning namespace (name kept for signature compatibility)
 * @param htd            HBase table descriptor
 * @param hcdts          column-family descriptors of the table
 * @return the existing or newly created table entity
 * @throws Exception if any Atlas create call fails
 */
protected AtlasEntity createOrUpdateTable(String nameSpace, String tableName, AtlasEntity nameSapceEntity, HTableDescriptor htd, HColumnDescriptor[] hcdts) throws Exception {
    String owner         = htd.getOwnerString();
    String qualifiedName = getTableQualifiedName(clusterName, nameSpace, tableName);

    AtlasEntity tableEntity = findTableEntityInAtlas(qualifiedName);

    if (tableEntity == null) {
        LOG.info("Importing Table: " + qualifiedName);

        tableEntity = createEntityInAtlas(getTableEntity(nameSpace, tableName, owner, nameSapceEntity, htd));
    }

    // import the column families and collect their object-ids for the table
    List<AtlasEntity>   columnFamilies      = createOrUpdateColumnFamilies(nameSpace, tableName, owner, hcdts, tableEntity);
    List<AtlasObjectId> columnFamilyIds     = new ArrayList<>();

    if (CollectionUtils.isNotEmpty(columnFamilies)) {
        for (AtlasEntity columnFamily : columnFamilies) {
            columnFamilyIds.add(AtlasTypeUtil.getAtlasObjectId(columnFamily));
        }
    }

    tableEntity.setAttribute(COLUMN_FAMILIES, columnFamilyIds);

    return tableEntity;
}
/**
 * Ensures an Atlas entity exists for every column family of a table,
 * creating any that are not already registered.
 *
 * @param nameSpace   namespace the table belongs to
 * @param tableName   HBase table name
 * @param owner       owner recorded on newly created entities
 * @param hcdts       column-family descriptors; may be null (returns empty list)
 * @param tableEntity Atlas entity of the owning table
 * @return entities for all column families, in descriptor order
 * @throws Exception if any Atlas create call fails
 */
protected List<AtlasEntity> createOrUpdateColumnFamilies(String nameSpace, String tableName, String owner, HColumnDescriptor[] hcdts, AtlasEntity tableEntity) throws Exception {
    List<AtlasEntity> columnFamilies = new ArrayList<>();

    if (hcdts == null) {
        return columnFamilies;
    }

    // resolved once; every column family references the same table object-id
    AtlasObjectId tableId = AtlasTypeUtil.getAtlasObjectId(tableEntity);

    for (HColumnDescriptor descriptor : hcdts) {
        String columnFamilyName = descriptor.getNameAsString();
        String qualifiedName    = getColumnFamilyQualifiedName(clusterName, nameSpace, tableName, columnFamilyName);

        AtlasEntity columnFamily = findColumnFamiltyEntityInAtlas(qualifiedName);

        if (columnFamily == null) {
            LOG.info("Importing Column-family: " + qualifiedName);

            columnFamily = createEntityInAtlas(getColumnFamilyEntity(nameSpace, tableName, owner, descriptor, tableId));
        }

        columnFamilies.add(columnFamily);
    }

    return columnFamilies;
}
/** Looks up a namespace entity by qualifiedName, using the namespace cache. */
private AtlasEntity findNameSpaceEntityInAtlas(String nsQualifiedName) {
    return findInCacheOrAtlas(nameSpaceCache, NAMESPACE_TYPE, nsQualifiedName);
}

/** Looks up a table entity by qualifiedName, using the table cache. */
private AtlasEntity findTableEntityInAtlas(String tableQualifiedName) {
    return findInCacheOrAtlas(tableCache, TABLE_TYPE, tableQualifiedName);
}

/**
 * Looks up a column-family entity by qualifiedName, using the column-family cache.
 * NOTE(review): method name carries a pre-existing typo ("Familty"); kept to avoid
 * breaking callers elsewhere in the class.
 */
private AtlasEntity findColumnFamiltyEntityInAtlas(String columnFamilyQualifiedName) {
    return findInCacheOrAtlas(columnFamilyCache, COLUMNFAMILY_TYPE, columnFamilyQualifiedName);
}

/**
 * Shared cache-then-Atlas lookup used by the three finders above
 * (they previously carried three identical copies of this logic).
 *
 * @param cache         qualifiedName -> entity cache for the given type
 * @param typeName      Atlas type name to query
 * @param qualifiedName unique qualified name of the entity
 * @return the entity, or null when it does not exist in Atlas
 */
private AtlasEntity findInCacheOrAtlas(Map<String, AtlasEntity> cache, String typeName, String qualifiedName) {
    AtlasEntity ret = cache.get(qualifiedName);

    if (ret == null) {
        try {
            ret = findEntityInAtlas(typeName, qualifiedName);

            if (ret != null) {
                cache.put(qualifiedName, ret);
            }
        } catch (Exception e) {
            ret = null; // entity doesn't exist in Atlas
        }
    }

    return ret;
}
/**
 * Fetches an entity from Atlas by type and unique qualifiedName attribute.
 *
 * @param typeName      Atlas type name
 * @param qualifiedName value of the qualifiedName attribute
 * @return the matching entity
 * @throws Exception when no such entity exists or the Atlas call fails
 */
private AtlasEntity findEntityInAtlas(String typeName, String qualifiedName) throws Exception {
    Map<String, String> uniqueAttributes = Collections.singletonMap(QUALIFIED_NAME, qualifiedName);

    AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = atlasClientV2.getEntityByAttribute(typeName, uniqueAttributes);

    return entityWithExtInfo.getEntity();
}
/**
 * Builds (but does not persist) an hbase_namespace entity for the given namespace.
 *
 * @param nameSpace HBase namespace name
 * @return a new, unsaved AtlasEntity populated with the namespace attributes
 */
private AtlasEntity getNameSpaceEntity(String nameSpace) {
    AtlasEntity namespaceEntity = new AtlasEntity(NAMESPACE_TYPE);

    namespaceEntity.setAttribute(QUALIFIED_NAME, getNameSpaceQualifiedName(clusterName, nameSpace));
    namespaceEntity.setAttribute(CLUSTERNAME, clusterName);
    namespaceEntity.setAttribute(NAME, nameSpace);
    // description defaults to the namespace name itself
    namespaceEntity.setAttribute(DESCRIPTION_ATTR, nameSpace);

    return namespaceEntity;
}
/**
 * Builds (but does not persist) an hbase_table entity from a table descriptor.
 *
 * @param nameSpace       namespace the table belongs to
 * @param tableName       HBase table name
 * @param owner           table owner recorded on the entity
 * @param nameSpaceEntity Atlas entity of the owning namespace
 * @param htd             HBase table descriptor supplying the table attributes
 * @return a new, unsaved AtlasEntity populated with the table attributes
 */
private AtlasEntity getTableEntity(String nameSpace, String tableName, String owner, AtlasEntity nameSpaceEntity, HTableDescriptor htd) {
    AtlasEntity tableEntity = new AtlasEntity(TABLE_TYPE);

    // durability is an enum; record its name, or null when unset
    String durabilityName = htd.getDurability() != null ? htd.getDurability().name() : null;

    tableEntity.setAttribute(QUALIFIED_NAME, getTableQualifiedName(clusterName, nameSpace, tableName));
    tableEntity.setAttribute(CLUSTERNAME, clusterName);
    tableEntity.setAttribute(NAMESPACE, AtlasTypeUtil.getAtlasObjectId(nameSpaceEntity));
    tableEntity.setAttribute(NAME, tableName);
    // description and uri both default to the table name
    tableEntity.setAttribute(DESCRIPTION_ATTR, tableName);
    tableEntity.setAttribute(OWNER, owner);
    tableEntity.setAttribute(URI, tableName);
    tableEntity.setAttribute(ATTR_TABLE_MAX_FILESIZE, htd.getMaxFileSize());
    tableEntity.setAttribute(ATTR_TABLE_REPLICATION_PER_REGION, htd.getRegionReplication());
    tableEntity.setAttribute(ATTR_TABLE_ISREADONLY, htd.isReadOnly());
    tableEntity.setAttribute(ATTR_TABLE_ISCOMPACTION_ENABLED, htd.isCompactionEnabled());
    tableEntity.setAttribute(ATTR_TABLE_DURABLILITY, durabilityName);

    return tableEntity;
}
/**
 * Builds (but does not persist) an hbase_column_family entity from a
 * column-family descriptor.
 *
 * @param nameSpace namespace the owning table belongs to
 * @param tableName HBase table name
 * @param owner     owner recorded on the entity
 * @param hcdt      column-family descriptor supplying the attributes
 * @param tableId   Atlas object-id of the owning table entity
 * @return a new, unsaved AtlasEntity populated with the column-family attributes
 */
private AtlasEntity getColumnFamilyEntity(String nameSpace, String tableName, String owner, HColumnDescriptor hcdt, AtlasObjectId tableId) {
    AtlasEntity columnFamily = new AtlasEntity(COLUMNFAMILY_TYPE);
    String      cfName       = hcdt.getNameAsString();

    columnFamily.setAttribute(QUALIFIED_NAME, getColumnFamilyQualifiedName(clusterName, nameSpace, tableName, cfName));
    columnFamily.setAttribute(CLUSTERNAME, clusterName);
    columnFamily.setAttribute(TABLE, tableId);
    columnFamily.setAttribute(NAME, cfName);
    // description defaults to the column-family name
    columnFamily.setAttribute(DESCRIPTION_ATTR, cfName);
    columnFamily.setAttribute(OWNER, owner);
    columnFamily.setAttribute(ATTR_CF_BLOCK_CACHE_ENABLED, hcdt.isBlockCacheEnabled());
    columnFamily.setAttribute(ATTR_CF_BLOOMFILTER_TYPE, enumName(hcdt.getBloomFilterType()));
    columnFamily.setAttribute(ATTR_CF_CACHED_BLOOM_ON_WRITE, hcdt.isCacheBloomsOnWrite());
    columnFamily.setAttribute(ATTR_CF_CACHED_DATA_ON_WRITE, hcdt.isCacheDataOnWrite());
    columnFamily.setAttribute(ATTR_CF_CACHED_INDEXES_ON_WRITE, hcdt.isCacheIndexesOnWrite());
    columnFamily.setAttribute(ATTR_CF_COMPACTION_COMPRESSION_TYPE, enumName(hcdt.getCompactionCompressionType()));
    columnFamily.setAttribute(ATTR_CF_COMPRESSION_TYPE, enumName(hcdt.getCompressionType()));
    columnFamily.setAttribute(ATTR_CF_DATA_BLOCK_ENCODING, enumName(hcdt.getDataBlockEncoding()));
    columnFamily.setAttribute(ATTR_CF_ENCRYPTION_TYPE, hcdt.getEncryptionType());
    columnFamily.setAttribute(ATTR_CF_EVICT_BLOCK_ONCLOSE, hcdt.isEvictBlocksOnClose());
    columnFamily.setAttribute(ATTR_CF_KEEP_DELETE_CELLS, enumName(hcdt.getKeepDeletedCells()));
    columnFamily.setAttribute(ATTR_CF_MAX_VERSIONS, hcdt.getMaxVersions());
    columnFamily.setAttribute(ATTR_CF_MIN_VERSIONS, hcdt.getMinVersions());
    columnFamily.setAttribute(ATTR_CF_PREFETCH_BLOCK_ONOPEN, hcdt.isPrefetchBlocksOnOpen());
    columnFamily.setAttribute(ATTR_CF_TTL, hcdt.getTimeToLive());

    return columnFamily;
}

/** Null-safe enum-to-name conversion: {@code null} in, {@code null} out. */
private static String enumName(Enum<?> value) {
    return value != null ? value.name() : null;
}
/**
 * Creates the given entity in Atlas and returns its fully-populated form,
 * re-fetched by GUID from the create response.
 *
 * @param entity the entity to create
 * @return the created entity as stored in Atlas, or null when the response
 *         carried no CREATE result
 * @throws Exception if the Atlas create or fetch call fails
 */
private AtlasEntity createEntityInAtlas(AtlasEntity entity) throws Exception {
    AtlasEntity createdEntity = null;

    EntityMutationResponse  response = atlasClientV2.createEntity(new AtlasEntity.AtlasEntityWithExtInfo(entity));
    List<AtlasEntityHeader> created  = response.getEntitiesByOperation(EntityMutations.EntityOperation.CREATE);

    if (CollectionUtils.isNotEmpty(created)) {
        // re-read by GUID so the returned entity carries server-assigned state
        String guid = created.get(0).getGuid();

        createdEntity = atlasClientV2.getEntityByGuid(guid).getEntity();

        LOG.info("Created entity: type=" + createdEntity.getTypeName() + ", guid=" + createdEntity.getGuid());
    }

    return createdEntity;
}
/**
 * Validates the raw command-line arguments before option parsing: rejects an
 * invocation whose final argument is a value-taking flag with no value.
 *
 * Fixes over the previous version: the old code only inspected args[0]/args[1],
 * so a trailing value-less flag after other arguments (e.g. "-n ns -t") slipped
 * through; it also checked "--tablename", which is never registered with the
 * parser — the registered long option (and the usage text) is "--table".
 * Both spellings are accepted here for compatibility.
 *
 * @param args raw command-line arguments
 * @throws Exception when a value-taking flag is missing its value
 */
private void checkArgs(String[] args) throws Exception {
    if (args.length == 0) {
        return;
    }

    // a value-taking flag in the last position can have no value after it
    String lastArg = args[args.length - 1];

    if (isValueTakingFlag(lastArg)) {
        System.out.println("Usage: import-hbase.sh [-n <namespace> OR --namespace <namespace>] [-t <table> OR --table <table>]");

        throw new Exception("Incorrect arguments..");
    }
}

/** Returns true when the argument is a flag that requires a following value. */
private static boolean isValueTakingFlag(String arg) {
    return arg.equalsIgnoreCase(NAMESPACE_FLAG)
        || arg.equalsIgnoreCase(NAMESPACE_FULL_FLAG)
        || arg.equalsIgnoreCase(TABLE_FLAG)
        || arg.equalsIgnoreCase(TABLE_FULL_FLAG)
        || arg.equalsIgnoreCase("--table"); // the long form actually registered with the parser
}
/**
 * Construct the qualified name used to uniquely identify a ColumnFamily instance in Atlas.
 * Format: {@code namespace:table.columnFamily@cluster}, lower-cased except the cluster name.
 *
 * @param clusterName  name of the cluster to which the HBase component belongs
 * @param nameSpace    name of the HBase namespace to which the table belongs
 * @param tableName    name of the HBase table (may be namespace-prefixed; the prefix is stripped)
 * @param columnFamily name of the column family
 * @return unique qualified name identifying the column family in Atlas
 */
private static String getColumnFamilyQualifiedName(String clusterName, String nameSpace, String tableName, String columnFamily) {
    String bareTableName = stripNameSpace(tableName.toLowerCase());

    return String.format(HBASE_COLUMN_FAMILY_QUALIFIED_NAME_FORMAT,
                         nameSpace.toLowerCase(), bareTableName, columnFamily.toLowerCase(), clusterName);
}
/**
 * Construct the qualified name used to uniquely identify a Table instance in Atlas.
 * Format: {@code namespace:table@cluster}, lower-cased except the cluster name.
 *
 * @param clusterName name of the cluster to which the HBase component belongs
 * @param nameSpace   name of the HBase namespace to which the table belongs
 * @param tableName   name of the HBase table (may be namespace-prefixed; the prefix is stripped)
 * @return unique qualified name identifying the table in Atlas
 */
private static String getTableQualifiedName(String clusterName, String nameSpace, String tableName) {
    String bareTableName = stripNameSpace(tableName.toLowerCase());

    return String.format(HBASE_TABLE_QUALIFIED_NAME_FORMAT, nameSpace.toLowerCase(), bareTableName, clusterName);
}
/**
 * Construct the qualified name used to uniquely identify an HBase namespace instance in Atlas.
 * Format: {@code namespace@cluster}, with the namespace lower-cased.
 *
 * @param clusterName name of the cluster to which the HBase component belongs
 * @param nameSpace   name of the namespace
 * @return unique qualified name identifying the namespace in Atlas
 */
private static String getNameSpaceQualifiedName(String clusterName, String nameSpace) {
    return String.format(HBASE_NAMESPACE_QUALIFIED_NAME, nameSpace.toLowerCase(), clusterName);
}
/**
 * Strips a leading {@code namespace:} prefix from a table name.
 * Returns the input unchanged when it contains no ':' separator
 * (indexOf yields -1, so the substring starts at 0 — same as the original).
 *
 * @param tableName table name, possibly prefixed with its namespace
 * @return the table name without any namespace prefix
 */
private static String stripNameSpace(String tableName) {
    int separatorPos = tableName.indexOf(":");

    return separatorPos < 0 ? tableName : tableName.substring(separatorPos + 1);
}
}
This diff is collapsed.
Click to expand it.
addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
View file @
7f5a665e
...
...
@@ -44,6 +44,7 @@ import org.apache.commons.cli.CommandLine;
import
org.apache.commons.cli.Options
;
import
org.apache.commons.collections.MapUtils
;
import
org.apache.commons.configuration.Configuration
;
import
org.apache.commons.lang.ArrayUtils
;
import
org.apache.commons.lang.RandomStringUtils
;
import
org.apache.commons.lang.StringUtils
;
import
org.apache.hadoop.fs.Path
;
...
...
@@ -62,6 +63,9 @@ import org.apache.hadoop.security.UserGroupInformation;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
java.io.BufferedReader
;
import
java.io.File
;
import
java.io.FileReader
;
import
java.util.ArrayList
;
import
java.util.Collections
;
import
java.util.List
;
...
...
@@ -102,14 +106,16 @@ public class HiveMetaStoreBridge {
try
{
Options
options
=
new
Options
();
options
.
addOption
(
"d"
,
"database"
,
true
,
"Datab
b
ase name"
);
options
.
addOption
(
"d"
,
"database"
,
true
,
"Database name"
);
options
.
addOption
(
"t"
,
"table"
,
true
,
"Table name"
);
options
.
addOption
(
"f"
,
"filename"
,
true
,
"Filename"
);
options
.
addOption
(
"failOnError"
,
false
,
"failOnError"
);
CommandLine
cmd
=
new
BasicParser
().
parse
(
options
,
args
);
boolean
failOnError
=
cmd
.
hasOption
(
"failOnError"
);
String
databaseToImport
=
cmd
.
getOptionValue
(
"d"
);
String
tableToImport
=
cmd
.
getOptionValue
(
"t"
);
String
fileToImport
=
cmd
.
getOptionValue
(
"f"
);
Configuration
atlasConf
=
ApplicationProperties
.
get
();
String
[]
atlasEndpoint
=
atlasConf
.
getStringArray
(
ATLAS_ENDPOINT
);
...
...
@@ -131,12 +137,40 @@ public class HiveMetaStoreBridge {
HiveMetaStoreBridge
hiveMetaStoreBridge
=
new
HiveMetaStoreBridge
(
atlasConf
,
new
HiveConf
(),
atlasClientV2
);
if
(
StringUtils
.
isNotEmpty
(
fileToImport
))
{
File
f
=
new
File
(
fileToImport
);
if
(
f
.
exists
()
&&
f
.
canRead
())
{
BufferedReader
br
=
new
BufferedReader
(
new
FileReader
(
f
));
String
line
=
null
;
while
((
line
=
br
.
readLine
())
!=
null
)
{
String
val
[]
=
line
.
split
(
":"
);
if
(
ArrayUtils
.
isNotEmpty
(
val
))
{
databaseToImport
=
val
[
0
];
if
(
val
.
length
>
1
)
{
tableToImport
=
val
[
1
];
}
else
{
tableToImport
=
""
;
}
hiveMetaStoreBridge
.
importHiveMetadata
(
databaseToImport
,
tableToImport
,
failOnError
);
}
}
exitCode
=
EXIT_CODE_SUCCESS
;
}
else
{
LOG
.
error
(
"Failed to read the input file: "
+
fileToImport
);
}
}
else
{
hiveMetaStoreBridge
.
importHiveMetadata
(
databaseToImport
,
tableToImport
,
failOnError
);
}
exitCode
=
EXIT_CODE_SUCCESS
;
}
catch
(
ParseException
e
)
{
LOG
.
error
(
"Failed to parse arguments. Error: "
,
e
.
getMessage
());
printUsage
();
}
catch
(
Exception
e
)
{
LOG
.
error
(
"Import failed"
,
e
);
...
...
@@ -157,6 +191,12 @@ public class HiveMetaStoreBridge {
System
.
out
.
println
(
"Usage 3: import-hive.sh"
);
System
.
out
.
println
(
" Imports all databases and tables..."
);
System
.
out
.
println
();
System
.
out
.
println
(
"Usage 4: import-hive.sh -f <filename>"
);
System
.
out
.
println
(
" Imports all databases and tables in the file..."
);
System
.
out
.
println
(
" Format:"
);
System
.
out
.
println
(
" database1:tbl1"
);
System
.
out
.
println
(
" database1:tbl2"
);
System
.
out
.
println
(
" database2:tbl2"
);
System
.
out
.
println
();
}
...
...
This diff is collapsed.
Click to expand it.
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment