Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
A
atlas
Project
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
dataplatform
atlas
Commits
fad2822f
Commit
fad2822f
authored
May 07, 2019
by
Ashutosh Mestry
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
ATLAS-3193: Import to existing entity.
parent
5892de67
Show whitespace changes
Inline
Side-by-side
Showing
10 changed files
with
466 additions
and
123 deletions
+466
-123
Constants.java
.../src/main/java/org/apache/atlas/repository/Constants.java
+1
-0
GraphBackedSearchIndexer.java
...ache/atlas/repository/graph/GraphBackedSearchIndexer.java
+2
-0
ExportService.java
...ava/org/apache/atlas/repository/impexp/ExportService.java
+7
-118
ImportService.java
...ava/org/apache/atlas/repository/impexp/ImportService.java
+1
-1
StartEntityFetchByExportRequest.java
...as/repository/impexp/StartEntityFetchByExportRequest.java
+207
-0
BulkImporterImpl.java
...che/atlas/repository/store/graph/v2/BulkImporterImpl.java
+81
-3
EntityGraphRetriever.java
...atlas/repository/store/graph/v2/EntityGraphRetriever.java
+7
-0
ImportServiceTest.java
...org/apache/atlas/repository/impexp/ImportServiceTest.java
+36
-0
StartEntityFetchByExportRequestTest.java
...epository/impexp/StartEntityFetchByExportRequestTest.java
+112
-0
BulkImportPercentTest.java
...tlas/repository/store/graph/v2/BulkImportPercentTest.java
+12
-1
No files found.
common/src/main/java/org/apache/atlas/repository/Constants.java
View file @
fad2822f
...
...
@@ -38,6 +38,7 @@ public final class Constants {
public
static
final
String
RELATIONSHIP_PROPERTY_KEY_PREFIX
=
"_r"
;
public
static
final
String
GUID_PROPERTY_KEY
=
encodePropertyKey
(
INTERNAL_PROPERTY_KEY_PREFIX
+
"guid"
);
public
static
final
String
RELATIONSHIP_GUID_PROPERTY_KEY
=
encodePropertyKey
(
RELATIONSHIP_PROPERTY_KEY_PREFIX
+
GUID_PROPERTY_KEY
);
public
static
final
String
HISTORICAL_GUID_PROPERTY_KEY
=
encodePropertyKey
(
INTERNAL_PROPERTY_KEY_PREFIX
+
"historicalGuids"
);
/**
* Entity type name property key.
...
...
repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
View file @
fad2822f
...
...
@@ -267,6 +267,8 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang
// create vertex indexes
createVertexIndex
(
management
,
GUID_PROPERTY_KEY
,
UniqueKind
.
GLOBAL_UNIQUE
,
String
.
class
,
SINGLE
,
true
,
false
);
createVertexIndex
(
management
,
HISTORICAL_GUID_PROPERTY_KEY
,
UniqueKind
.
GLOBAL_UNIQUE
,
String
.
class
,
SINGLE
,
true
,
false
);
createVertexIndex
(
management
,
TYPENAME_PROPERTY_KEY
,
UniqueKind
.
GLOBAL_UNIQUE
,
String
.
class
,
SINGLE
,
true
,
false
);
createVertexIndex
(
management
,
TYPESERVICETYPE_PROPERTY_KEY
,
UniqueKind
.
NONE
,
String
.
class
,
SINGLE
,
true
,
false
);
createVertexIndex
(
management
,
VERTEX_TYPE_PROPERTY_KEY
,
UniqueKind
.
NONE
,
String
.
class
,
SINGLE
,
true
,
false
);
...
...
repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
View file @
fad2822f
...
...
@@ -18,7 +18,6 @@
package
org
.
apache
.
atlas
.
repository
.
impexp
;
import
com.google.common.annotations.VisibleForTesting
;
import
org.apache.atlas.AtlasErrorCode
;
import
org.apache.atlas.RequestContext
;
import
org.apache.atlas.exception.AtlasBaseException
;
import
org.apache.atlas.model.TypeCategory
;
...
...
@@ -53,8 +52,6 @@ import org.apache.atlas.type.AtlasTypeUtil;
import
org.apache.atlas.util.AtlasGremlinQueryProvider
;
import
org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery
;
import
org.apache.commons.collections.CollectionUtils
;
import
org.apache.commons.collections.MapUtils
;
import
org.apache.commons.lang3.StringUtils
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
org.springframework.stereotype.Component
;
...
...
@@ -76,6 +73,7 @@ public class ExportService {
private
final
AtlasTypeRegistry
typeRegistry
;
private
final
String
QUERY_BINDING_START_GUID
=
"startGuid"
;
private
final
StartEntityFetchByExportRequest
startEntityFetchByExportRequest
;
private
AuditsWriter
auditsWriter
;
private
final
AtlasGraph
atlasGraph
;
private
final
EntityGraphRetriever
entityGraphRetriever
;
...
...
@@ -93,6 +91,7 @@ public class ExportService {
this
.
gremlinQueryProvider
=
AtlasGremlinQueryProvider
.
INSTANCE
;
this
.
auditsWriter
=
auditsWriter
;
this
.
hdfsPathEntityCreator
=
hdfsPathEntityCreator
;
this
.
startEntityFetchByExportRequest
=
new
StartEntityFetchByExportRequest
(
atlasGraph
,
typeRegistry
,
AtlasGremlinQueryProvider
.
INSTANCE
);
}
public
AtlasExportResult
run
(
ZipSink
exportSink
,
AtlasExportRequest
request
,
String
userName
,
String
hostName
,
...
...
@@ -244,119 +243,20 @@ public class ExportService {
return
AtlasExportResult
.
OperationStatus
.
SUCCESS
;
}
private
void
debugLog
(
String
s
,
Object
...
params
)
{
if
(!
LOG
.
isDebugEnabled
())
{
return
;
}
LOG
.
debug
(
s
,
params
);
}
private
List
<
String
>
getStartingEntity
(
AtlasObjectId
item
,
ExportContext
context
)
throws
AtlasBaseException
{
List
<
String
>
ret
=
null
;
if
(
item
.
getTypeName
().
equalsIgnoreCase
(
HdfsPathEntityCreator
.
HDFS_PATH_TYPE
))
{
hdfsPathEntityCreator
.
getCreateEntity
(
item
);
}
if
(
StringUtils
.
isNotEmpty
(
item
.
getGuid
()))
{
ret
=
Collections
.
singletonList
(
item
.
getGuid
());
}
else
if
(
StringUtils
.
equalsIgnoreCase
(
context
.
matchType
,
MATCH_TYPE_FOR_TYPE
)
&&
StringUtils
.
isNotEmpty
(
item
.
getTypeName
()))
{
ret
=
getStartingEntityForMatchTypeForType
(
item
,
context
);
}
else
if
(
StringUtils
.
isNotEmpty
(
item
.
getTypeName
())
&&
MapUtils
.
isNotEmpty
(
item
.
getUniqueAttributes
()))
{
ret
=
getStartingEntityUsingQueryTemplate
(
item
,
context
,
ret
);
}
if
(
ret
==
null
)
{
ret
=
Collections
.
emptyList
();
}
logInfoStartingEntitiesFound
(
item
,
context
,
ret
);
return
ret
;
return
startEntityFetchByExportRequest
.
get
(
context
.
result
.
getRequest
(),
item
);
}
private
List
<
String
>
getStartingEntityUsingQueryTemplate
(
AtlasObjectId
item
,
ExportContext
context
,
List
<
String
>
ret
)
throws
AtlasBaseException
{
final
String
queryTemplate
=
getQueryTemplateForMatchType
(
context
);
final
String
typeName
=
item
.
getTypeName
();
final
AtlasEntityType
entityType
=
typeRegistry
.
getEntityTypeByName
(
typeName
);
if
(
entityType
==
null
)
{
throw
new
AtlasBaseException
(
AtlasErrorCode
.
UNKNOWN_TYPENAME
,
typeName
);
}
for
(
Map
.
Entry
<
String
,
Object
>
e
:
item
.
getUniqueAttributes
().
entrySet
())
{
String
attrName
=
e
.
getKey
();
Object
attrValue
=
e
.
getValue
();
AtlasAttribute
attribute
=
entityType
.
getAttribute
(
attrName
);
if
(
attribute
==
null
||
attrValue
==
null
)
{
continue
;
}
setupBindingsForTypeNameAttrNameAttrValue
(
context
,
typeName
,
attrValue
,
attribute
);
List
<
String
>
guids
=
executeGremlinQueryForGuids
(
queryTemplate
,
context
);
if
(
CollectionUtils
.
isNotEmpty
(
guids
))
{
if
(
ret
==
null
)
{
ret
=
new
ArrayList
<>();
}
for
(
String
guid
:
guids
)
{
if
(!
ret
.
contains
(
guid
))
{
ret
.
add
(
guid
);
}
}
}
}
return
ret
;
}
private
List
<
String
>
getStartingEntityForMatchTypeForType
(
AtlasObjectId
item
,
ExportContext
context
)
{
setupBindingsForTypeName
(
context
,
item
.
getTypeName
());
return
executeGremlinQueryForGuids
(
getQueryTemplateForMatchType
(
context
),
context
);
}
private
void
logInfoStartingEntitiesFound
(
AtlasObjectId
item
,
ExportContext
context
,
List
<
String
>
ret
)
{
LOG
.
info
(
"export(item={}; matchType={}, fetchType={}): found {} entities: options: {}"
,
item
,
context
.
matchType
,
context
.
fetchType
,
ret
.
size
(),
AtlasType
.
toJson
(
context
.
result
.
getRequest
()));
}
private
void
setupBindingsForTypeName
(
ExportContext
context
,
String
typeName
)
{
context
.
bindings
.
clear
();
context
.
bindings
.
put
(
"typeName"
,
new
HashSet
<
String
>(
Arrays
.
asList
(
StringUtils
.
split
(
typeName
,
","
))));
}
private
void
setupBindingsForTypeNameAttrNameAttrValue
(
ExportContext
context
,
String
typeName
,
Object
attrValue
,
AtlasAttribute
attribute
)
{
context
.
bindings
.
clear
();
context
.
bindings
.
put
(
"typeName"
,
typeName
);
context
.
bindings
.
put
(
"attrName"
,
attribute
.
getQualifiedName
());
context
.
bindings
.
put
(
"attrValue"
,
attrValue
);
}
private
String
getQueryTemplateForMatchType
(
ExportContext
context
)
{
if
(
StringUtils
.
equalsIgnoreCase
(
context
.
matchType
,
MATCH_TYPE_STARTS_WITH
))
{
return
gremlinQueryProvider
.
getQuery
(
AtlasGremlinQuery
.
EXPORT_TYPE_STARTS_WITH
);
}
if
(
StringUtils
.
equalsIgnoreCase
(
context
.
matchType
,
MATCH_TYPE_ENDS_WITH
))
{
return
gremlinQueryProvider
.
getQuery
(
AtlasGremlinQuery
.
EXPORT_TYPE_ENDS_WITH
);
}
if
(
StringUtils
.
equalsIgnoreCase
(
context
.
matchType
,
MATCH_TYPE_CONTAINS
))
{
return
gremlinQueryProvider
.
getQuery
(
AtlasGremlinQuery
.
EXPORT_TYPE_CONTAINS
);
}
if
(
StringUtils
.
equalsIgnoreCase
(
context
.
matchType
,
MATCH_TYPE_MATCHES
))
{
return
gremlinQueryProvider
.
getQuery
(
AtlasGremlinQuery
.
EXPORT_TYPE_MATCHES
);
}
if
(
StringUtils
.
equalsIgnoreCase
(
context
.
matchType
,
MATCH_TYPE_FOR_TYPE
))
{
return
gremlinQueryProvider
.
getQuery
(
AtlasGremlinQuery
.
EXPORT_TYPE_ALL_FOR_TYPE
);
private
void
debugLog
(
String
s
,
Object
...
params
)
{
if
(!
LOG
.
isDebugEnabled
())
{
return
;
}
return
gremlinQueryProvider
.
getQuery
(
AtlasGremlinQuery
.
EXPORT_TYPE_DEFAULT
);
LOG
.
debug
(
s
,
params
);
}
private
void
processEntityGuid
(
String
guid
,
ExportContext
context
)
throws
AtlasBaseException
{
...
...
@@ -704,15 +604,6 @@ public class ExportService {
}
}
private
List
<
String
>
executeGremlinQueryForGuids
(
String
query
,
ExportContext
context
)
{
try
{
return
(
List
<
String
>)
atlasGraph
.
executeGremlinScript
(
context
.
scriptEngine
,
context
.
bindings
,
query
,
false
);
}
catch
(
ScriptException
e
)
{
LOG
.
error
(
"Script execution failed for query: "
,
query
,
e
);
return
null
;
}
}
public
enum
TraversalDirection
{
UNKNOWN
,
INWARD
,
...
...
@@ -764,7 +655,6 @@ public class ExportService {
private
final
ScriptEngine
scriptEngine
;
private
final
Map
<
String
,
Object
>
bindings
;
private
final
ExportFetchType
fetchType
;
private
final
String
matchType
;
private
final
boolean
skipLineage
;
private
final
long
changeMarker
;
private
final
boolean
isHiveDBIncremental
;
...
...
@@ -778,7 +668,6 @@ public class ExportService {
scriptEngine
=
atlasGraph
.
getGremlinScriptEngine
();
bindings
=
new
HashMap
<>();
fetchType
=
ExportFetchType
.
from
(
result
.
getRequest
().
getFetchTypeOptionValue
());
matchType
=
result
.
getRequest
().
getMatchTypeOptionValue
();
skipLineage
=
result
.
getRequest
().
getSkipLineageOptionValue
();
this
.
changeMarker
=
result
.
getRequest
().
getChangeTokenFromOptions
();
this
.
isHiveDBIncremental
=
checkHiveDBIncrementalSkipLineage
(
result
.
getRequest
());
...
...
repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
View file @
fad2822f
...
...
@@ -215,11 +215,11 @@ public class ImportService {
}
private
void
processEntities
(
String
userName
,
ZipSource
importSource
,
AtlasImportResult
result
)
throws
AtlasBaseException
{
result
.
setExportResult
(
importSource
.
getExportResult
());
this
.
bulkImporter
.
bulkImport
(
importSource
,
result
);
endTimestamp
=
System
.
currentTimeMillis
();
result
.
incrementMeticsCounter
(
"duration"
,
getDuration
(
this
.
endTimestamp
,
this
.
startTimestamp
));
result
.
setExportResult
(
importSource
.
getExportResult
());
result
.
setOperationStatus
(
AtlasImportResult
.
OperationStatus
.
SUCCESS
);
auditsWriter
.
write
(
userName
,
result
,
startTimestamp
,
endTimestamp
,
importSource
.
getCreationOrder
());
...
...
repository/src/main/java/org/apache/atlas/repository/impexp/StartEntityFetchByExportRequest.java
0 → 100644
View file @
fad2822f
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org
.
apache
.
atlas
.
repository
.
impexp
;
import
com.google.common.annotations.VisibleForTesting
;
import
org.apache.atlas.AtlasErrorCode
;
import
org.apache.atlas.exception.AtlasBaseException
;
import
org.apache.atlas.model.impexp.AtlasExportRequest
;
import
org.apache.atlas.model.instance.AtlasObjectId
;
import
org.apache.atlas.repository.graphdb.AtlasGraph
;
import
org.apache.atlas.type.AtlasEntityType
;
import
org.apache.atlas.type.AtlasStructType
;
import
org.apache.atlas.type.AtlasType
;
import
org.apache.atlas.type.AtlasTypeRegistry
;
import
org.apache.atlas.util.AtlasGremlinQueryProvider
;
import
org.apache.commons.collections.CollectionUtils
;
import
org.apache.commons.collections.MapUtils
;
import
org.apache.commons.lang3.StringUtils
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
javax.script.ScriptEngine
;
import
javax.script.ScriptException
;
import
java.util.ArrayList
;
import
java.util.Arrays
;
import
java.util.HashMap
;
import
java.util.HashSet
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.Set
;
import
static
org
.
apache
.
atlas
.
model
.
impexp
.
AtlasExportRequest
.
MATCH_TYPE_CONTAINS
;
import
static
org
.
apache
.
atlas
.
model
.
impexp
.
AtlasExportRequest
.
MATCH_TYPE_ENDS_WITH
;
import
static
org
.
apache
.
atlas
.
model
.
impexp
.
AtlasExportRequest
.
MATCH_TYPE_FOR_TYPE
;
import
static
org
.
apache
.
atlas
.
model
.
impexp
.
AtlasExportRequest
.
MATCH_TYPE_MATCHES
;
import
static
org
.
apache
.
atlas
.
model
.
impexp
.
AtlasExportRequest
.
MATCH_TYPE_STARTS_WITH
;
public
class
StartEntityFetchByExportRequest
{
private
static
final
Logger
LOG
=
LoggerFactory
.
getLogger
(
StartEntityFetchByExportRequest
.
class
);
static
final
String
DEFAULT_MATCH
=
"*"
;
static
final
String
BINDING_PARAMETER_TYPENAME
=
"typeName"
;
static
final
String
BINDING_PARAMETER_ATTR_NAME
=
"attrName"
;
static
final
String
BINDING_PARAMTER_ATTR_VALUE
=
"attrValue"
;
private
AtlasGraph
atlasGraph
;
private
AtlasTypeRegistry
typeRegistry
;
private
Map
<
String
,
String
>
matchTypeQuery
;
public
StartEntityFetchByExportRequest
(
AtlasGraph
atlasGraph
,
AtlasTypeRegistry
typeRegistry
,
AtlasGremlinQueryProvider
gremlinQueryProvider
)
{
this
.
typeRegistry
=
typeRegistry
;
this
.
atlasGraph
=
atlasGraph
;
initMatchTypeQueryMap
(
gremlinQueryProvider
);
}
private
void
initMatchTypeQueryMap
(
AtlasGremlinQueryProvider
gremlinQueryProvider
)
{
matchTypeQuery
=
new
HashMap
<
String
,
String
>()
{{
put
(
MATCH_TYPE_STARTS_WITH
,
gremlinQueryProvider
.
getQuery
(
AtlasGremlinQueryProvider
.
AtlasGremlinQuery
.
EXPORT_TYPE_STARTS_WITH
));
put
(
MATCH_TYPE_ENDS_WITH
,
gremlinQueryProvider
.
getQuery
(
AtlasGremlinQueryProvider
.
AtlasGremlinQuery
.
EXPORT_TYPE_ENDS_WITH
));
put
(
MATCH_TYPE_CONTAINS
,
gremlinQueryProvider
.
getQuery
(
AtlasGremlinQueryProvider
.
AtlasGremlinQuery
.
EXPORT_TYPE_CONTAINS
));
put
(
MATCH_TYPE_MATCHES
,
gremlinQueryProvider
.
getQuery
(
AtlasGremlinQueryProvider
.
AtlasGremlinQuery
.
EXPORT_TYPE_MATCHES
));
put
(
MATCH_TYPE_FOR_TYPE
,
gremlinQueryProvider
.
getQuery
(
AtlasGremlinQueryProvider
.
AtlasGremlinQuery
.
EXPORT_TYPE_ALL_FOR_TYPE
));
put
(
DEFAULT_MATCH
,
gremlinQueryProvider
.
getQuery
(
AtlasGremlinQueryProvider
.
AtlasGremlinQuery
.
EXPORT_TYPE_DEFAULT
));
}};
}
public
List
<
AtlasObjectId
>
get
(
AtlasExportRequest
exportRequest
)
{
List
<
AtlasObjectId
>
list
=
new
ArrayList
<>();
for
(
AtlasObjectId
objectId
:
exportRequest
.
getItemsToExport
())
{
List
<
String
>
guids
=
get
(
exportRequest
,
objectId
);
if
(
guids
.
isEmpty
())
{
continue
;
}
objectId
.
setGuid
(
guids
.
get
(
0
));
list
.
add
(
objectId
);
}
return
list
;
}
public
List
<
String
>
get
(
AtlasExportRequest
exportRequest
,
AtlasObjectId
item
)
{
List
<
String
>
ret
=
new
ArrayList
<>();
String
matchType
=
exportRequest
.
getMatchTypeOptionValue
();
try
{
if
(
StringUtils
.
isNotEmpty
(
item
.
getGuid
()))
{
ret
.
add
(
item
.
getGuid
());
return
ret
;
}
if
(
StringUtils
.
equalsIgnoreCase
(
matchType
,
MATCH_TYPE_FOR_TYPE
)
&&
StringUtils
.
isNotEmpty
(
item
.
getTypeName
()))
{
ret
=
getEntitiesForMatchTypeType
(
item
,
matchType
);
return
ret
;
}
if
(
StringUtils
.
isNotEmpty
(
item
.
getTypeName
())
&&
MapUtils
.
isNotEmpty
(
item
.
getUniqueAttributes
()))
{
ret
=
getEntitiesForMatchTypeUsingUniqueAttributes
(
item
,
matchType
);
return
ret
;
}
}
catch
(
AtlasBaseException
ex
)
{
LOG
.
error
(
"Error fetching starting entity for: {}"
,
item
,
ex
);
}
finally
{
LOG
.
info
(
"export(item={}; matchType={}, fetchType={}): found {} entities: options: {}"
,
item
,
exportRequest
.
getMatchTypeOptionValue
(),
exportRequest
.
getFetchTypeOptionValue
(),
ret
.
size
(),
AtlasType
.
toJson
(
exportRequest
));
}
return
ret
;
}
private
List
<
String
>
getEntitiesForMatchTypeUsingUniqueAttributes
(
AtlasObjectId
item
,
String
matchType
)
throws
AtlasBaseException
{
final
String
queryTemplate
=
getQueryTemplateForMatchType
(
matchType
);
final
String
typeName
=
item
.
getTypeName
();
final
AtlasEntityType
entityType
=
typeRegistry
.
getEntityTypeByName
(
typeName
);
Set
<
String
>
ret
=
new
HashSet
<>();
if
(
entityType
==
null
)
{
throw
new
AtlasBaseException
(
AtlasErrorCode
.
UNKNOWN_TYPENAME
,
typeName
);
}
for
(
Map
.
Entry
<
String
,
Object
>
e
:
item
.
getUniqueAttributes
().
entrySet
())
{
String
attrName
=
e
.
getKey
();
Object
attrValue
=
e
.
getValue
();
AtlasStructType
.
AtlasAttribute
attribute
=
entityType
.
getAttribute
(
attrName
);
if
(
attribute
==
null
||
attrValue
==
null
)
{
continue
;
}
List
<
String
>
guids
=
executeGremlinQuery
(
queryTemplate
,
getBindingsForObjectId
(
typeName
,
attribute
.
getQualifiedName
(),
e
.
getValue
()));
if
(!
CollectionUtils
.
isNotEmpty
(
guids
))
{
continue
;
}
ret
.
addAll
(
guids
);
}
return
new
ArrayList
<>(
ret
);
}
private
List
<
String
>
getEntitiesForMatchTypeType
(
AtlasObjectId
item
,
String
matchType
)
{
return
executeGremlinQuery
(
getQueryTemplateForMatchType
(
matchType
),
getBindingsForTypeName
(
item
.
getTypeName
()));
}
@VisibleForTesting
String
getQueryTemplateForMatchType
(
String
matchType
)
{
return
matchTypeQuery
.
containsKey
(
matchType
)
?
matchTypeQuery
.
get
(
matchType
)
:
matchTypeQuery
.
get
(
DEFAULT_MATCH
);
}
private
HashMap
<
String
,
Object
>
getBindingsForTypeName
(
String
typeName
)
{
return
new
HashMap
<
String
,
Object
>()
{{
put
(
BINDING_PARAMETER_TYPENAME
,
new
HashSet
<
String
>(
Arrays
.
asList
(
StringUtils
.
split
(
typeName
,
","
))));
}};
}
private
HashMap
<
String
,
Object
>
getBindingsForObjectId
(
String
typeName
,
String
attrName
,
Object
attrValue
)
{
return
new
HashMap
<
String
,
Object
>()
{{
put
(
BINDING_PARAMETER_TYPENAME
,
typeName
);
put
(
BINDING_PARAMETER_ATTR_NAME
,
attrName
);
put
(
BINDING_PARAMTER_ATTR_VALUE
,
attrValue
);
}};
}
@VisibleForTesting
List
<
String
>
executeGremlinQuery
(
String
query
,
Map
<
String
,
Object
>
bindings
)
{
try
{
return
(
List
<
String
>)
atlasGraph
.
executeGremlinScript
(
getScriptEngine
(),
bindings
,
query
,
false
);
}
catch
(
ScriptException
e
)
{
LOG
.
error
(
"Script execution failed for query: "
,
query
,
e
);
return
null
;
}
}
public
ScriptEngine
getScriptEngine
()
{
try
{
return
atlasGraph
.
getGremlinScriptEngine
();
}
catch
(
AtlasBaseException
e
)
{
LOG
.
error
(
"Error initializing script engine."
,
e
);
}
return
null
;
}
}
repository/src/main/java/org/apache/atlas/repository/store/graph/v2/BulkImporterImpl.java
View file @
fad2822f
...
...
@@ -20,14 +20,25 @@ package org.apache.atlas.repository.store.graph.v2;
import
com.google.common.annotations.VisibleForTesting
;
import
org.apache.atlas.AtlasErrorCode
;
import
org.apache.atlas.RequestContext
;
import
org.apache.atlas.annotation.GraphTransaction
;
import
org.apache.atlas.exception.AtlasBaseException
;
import
org.apache.atlas.model.impexp.AtlasExportRequest
;
import
org.apache.atlas.model.impexp.AtlasImportResult
;
import
org.apache.atlas.model.instance.AtlasEntity
;
import
org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo
;
import
org.apache.atlas.model.instance.AtlasEntityHeader
;
import
org.apache.atlas.model.instance.AtlasObjectId
;
import
org.apache.atlas.model.instance.EntityMutationResponse
;
import
org.apache.atlas.repository.Constants
;
import
org.apache.atlas.repository.graphdb.AtlasGraph
;
import
org.apache.atlas.repository.graphdb.AtlasSchemaViolationException
;
import
org.apache.atlas.repository.graphdb.AtlasVertex
;
import
org.apache.atlas.repository.impexp.StartEntityFetchByExportRequest
;
import
org.apache.atlas.repository.store.graph.AtlasEntityStore
;
import
org.apache.atlas.repository.store.graph.BulkImporter
;
import
org.apache.atlas.type.AtlasTypeRegistry
;
import
org.apache.atlas.util.AtlasGremlinQueryProvider
;
import
org.apache.commons.lang.StringUtils
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
org.springframework.stereotype.Component
;
...
...
@@ -37,17 +48,26 @@ import java.util.ArrayList;
import
java.util.HashMap
;
import
java.util.HashSet
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.Set
;
import
static
org
.
apache
.
atlas
.
repository
.
Constants
.
HISTORICAL_GUID_PROPERTY_KEY
;
@Component
public
class
BulkImporterImpl
implements
BulkImporter
{
private
static
final
Logger
LOG
=
LoggerFactory
.
getLogger
(
AtlasEntityStoreV2
.
class
);
private
final
AtlasEntityStore
entityStore
;
private
final
EntityGraphRetriever
entityGraphRetriever
;
private
final
Map
<
AtlasObjectId
,
String
>
objectIdExistingGuidMap
;
private
final
StartEntityFetchByExportRequest
startEntityFetchByExportRequest
;
@Inject
public
BulkImporterImpl
(
Atlas
EntityStore
entityStore
)
{
public
BulkImporterImpl
(
Atlas
Graph
atlasGraph
,
AtlasEntityStore
entityStore
,
AtlasTypeRegistry
typeRegistry
)
{
this
.
entityStore
=
entityStore
;
this
.
entityGraphRetriever
=
new
EntityGraphRetriever
(
typeRegistry
);
this
.
objectIdExistingGuidMap
=
new
HashMap
<>();
this
.
startEntityFetchByExportRequest
=
new
StartEntityFetchByExportRequest
(
atlasGraph
,
typeRegistry
,
AtlasGremlinQueryProvider
.
INSTANCE
);
}
@Override
...
...
@@ -60,8 +80,9 @@ public class BulkImporterImpl implements BulkImporter {
throw
new
AtlasBaseException
(
AtlasErrorCode
.
INVALID_PARAMETERS
,
"no entities to create/update."
);
}
fetchExistingEntitiesNotCreatedByImport
(
importResult
.
getExportResult
().
getRequest
());
EntityMutationResponse
ret
=
new
EntityMutationResponse
();
ret
.
setGuidAssignments
(
new
HashMap
<
String
,
String
>());
ret
.
setGuidAssignments
(
new
HashMap
<>());
Set
<
String
>
processedGuids
=
new
HashSet
<>();
float
currentPercent
=
0
f
;
...
...
@@ -93,7 +114,15 @@ public class BulkImporterImpl implements BulkImporter {
if
(!
updateResidualList
(
e
,
residualList
,
entityWithExtInfo
.
getEntity
().
getGuid
()))
{
throw
e
;
}
}
catch
(
Throwable
e
)
{
}
catch
(
AtlasSchemaViolationException
e
)
{
AtlasObjectId
objectId
=
entityGraphRetriever
.
toAtlasObjectIdWithoutGuid
(
entity
);
if
(
objectIdExistingGuidMap
.
containsKey
(
objectId
))
{
updateVertexGuidIfImportingToNonImportedEntity
(
entity
,
objectId
);
}
continue
;
}
catch
(
Throwable
e
)
{
AtlasBaseException
abe
=
new
AtlasBaseException
(
e
);
if
(!
updateResidualList
(
abe
,
residualList
,
entityWithExtInfo
.
getEntity
().
getGuid
()))
{
...
...
@@ -110,6 +139,55 @@ public class BulkImporterImpl implements BulkImporter {
return
ret
;
}
private
void
fetchExistingEntitiesNotCreatedByImport
(
AtlasExportRequest
request
)
{
List
<
AtlasObjectId
>
objectIds
=
startEntityFetchByExportRequest
.
get
(
request
);
if
(
objectIds
.
isEmpty
())
{
return
;
}
for
(
AtlasObjectId
objectId
:
objectIds
)
{
String
existingGuid
=
objectId
.
getGuid
();
objectId
.
setGuid
(
null
);
objectIdExistingGuidMap
.
put
(
objectId
,
existingGuid
);
}
}
@GraphTransaction
public
void
updateVertexGuidIfImportingToNonImportedEntity
(
AtlasEntity
entity
,
AtlasObjectId
objectId
)
{
String
entityGuid
=
entity
.
getGuid
();
String
vertexGuid
=
objectIdExistingGuidMap
.
get
(
objectId
);
if
(
vertexGuid
.
equals
(
entityGuid
))
{
return
;
}
AtlasVertex
v
=
AtlasGraphUtilsV2
.
findByGuid
(
vertexGuid
);
if
(
v
==
null
)
{
return
;
}
addHistoricalGuid
(
v
,
vertexGuid
);
AtlasGraphUtilsV2
.
setProperty
(
v
,
Constants
.
GUID_PROPERTY_KEY
,
entityGuid
);
LOG
.
warn
(
"GUID Updated: Entity: {}: from: {}: to: {}"
,
objectId
,
vertexGuid
,
entity
.
getGuid
());
}
private
void
addHistoricalGuid
(
AtlasVertex
v
,
String
vertexGuid
)
{
String
existingJson
=
AtlasGraphUtilsV2
.
getProperty
(
v
,
HISTORICAL_GUID_PROPERTY_KEY
,
String
.
class
);
AtlasGraphUtilsV2
.
setProperty
(
v
,
HISTORICAL_GUID_PROPERTY_KEY
,
getJsonArray
(
existingJson
,
vertexGuid
));
}
@VisibleForTesting
static
String
getJsonArray
(
String
json
,
String
vertexGuid
)
{
String
quotedGuid
=
String
.
format
(
"\"%s\""
,
vertexGuid
);
if
(
StringUtils
.
isEmpty
(
json
))
{
json
=
String
.
format
(
"[%s]"
,
quotedGuid
);
}
else
{
json
=
json
.
replace
(
"]"
,
""
).
concat
(
","
).
concat
(
quotedGuid
).
concat
(
"]"
);
}
return
json
;
}
private
boolean
updateResidualList
(
AtlasBaseException
e
,
List
<
String
>
lineageList
,
String
guid
)
{
if
(!
e
.
getAtlasErrorCode
().
getErrorCode
().
equals
(
AtlasErrorCode
.
INVALID_OBJECT_ID
.
getErrorCode
()))
{
...
...
repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java
View file @
fad2822f
...
...
@@ -280,6 +280,13 @@ public class EntityGraphRetriever {
return
ret
;
}
public
AtlasObjectId
toAtlasObjectIdWithoutGuid
(
AtlasEntity
entity
)
{
AtlasObjectId
objectId
=
toAtlasObjectId
(
entity
);
objectId
.
setGuid
(
null
);
return
objectId
;
}
public
AtlasClassification
toAtlasClassification
(
AtlasVertex
classificationVertex
)
throws
AtlasBaseException
{
AtlasClassification
ret
=
new
AtlasClassification
(
getTypeName
(
classificationVertex
));
...
...
repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java
View file @
fad2822f
...
...
@@ -28,8 +28,13 @@ import org.apache.atlas.model.impexp.AtlasImportRequest;
import
org.apache.atlas.model.instance.AtlasEntity
;
import
org.apache.atlas.model.instance.AtlasEntityHeader
;
import
org.apache.atlas.model.instance.AtlasRelatedObjectId
;
import
org.apache.atlas.model.instance.EntityMutationResponse
;
import
org.apache.atlas.repository.Constants
;
import
org.apache.atlas.repository.graph.AtlasGraphProvider
;
import
org.apache.atlas.repository.graphdb.AtlasVertex
;
import
org.apache.atlas.repository.store.graph.AtlasEntityStore
;
import
org.apache.atlas.repository.store.graph.v2.AtlasEntityStream
;
import
org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2
;
import
org.apache.atlas.runner.LocalSolrRunner
;
import
org.apache.atlas.store.AtlasTypeDefStore
;
import
org.apache.atlas.type.AtlasClassificationType
;
...
...
@@ -63,6 +68,7 @@ import static org.mockito.Mockito.mock;
import
static
org
.
mockito
.
Mockito
.
when
;
import
static
org
.
testng
.
Assert
.
assertEquals
;
import
static
org
.
testng
.
Assert
.
assertFalse
;
import
static
org
.
testng
.
Assert
.
assertNotEquals
;
import
static
org
.
testng
.
Assert
.
assertNotNull
;
import
static
org
.
testng
.
Assert
.
assertTrue
;
...
...
@@ -253,6 +259,36 @@ public class ImportServiceTest extends ExportImportTestBase {
assertFalse
(
entity
.
getClassifications
().
get
(
1
).
getEntityGuid
().
equalsIgnoreCase
(
entity
.
getGuid
()));
}
@Test
(
dataProvider
=
"stocks-legacy"
)
public
void
importExistingTopLevelEntity
(
ZipSource
zipSource
)
throws
IOException
,
AtlasBaseException
{
loadBaseModel
();
loadFsModel
();
loadHiveModel
();
AtlasEntity
db
=
new
AtlasEntity
(
"hive_db"
,
"name"
,
"stocks"
);
db
.
setAttribute
(
"clusterName"
,
"cl1"
);
db
.
setAttribute
(
"qualifiedName"
,
"stocks@cl1"
);
AtlasEntity
.
AtlasEntitiesWithExtInfo
entitiesWithExtInfo
=
new
AtlasEntity
.
AtlasEntitiesWithExtInfo
();
entitiesWithExtInfo
.
addEntity
(
db
);
AtlasEntityStream
entityStream
=
new
AtlasEntityStream
(
entitiesWithExtInfo
);
EntityMutationResponse
createResponse
=
entityStore
.
createOrUpdate
(
entityStream
,
false
);
assertNotNull
(
createResponse
);
String
preImportGuid
=
createResponse
.
getCreatedEntities
().
get
(
0
).
getGuid
();
runImportWithNoParameters
(
importService
,
zipSource
);
AtlasVertex
v
=
AtlasGraphUtilsV2
.
findByGuid
(
"886c5e9c-3ac6-40be-8201-fb0cebb64783"
);
assertNotNull
(
v
);
String
postImportGuid
=
AtlasGraphUtilsV2
.
getIdFromVertex
(
v
);
assertNotEquals
(
preImportGuid
,
postImportGuid
);
String
historicalGuids
=
v
.
getProperty
(
Constants
.
HISTORICAL_GUID_PROPERTY_KEY
,
String
.
class
);
assertTrue
(
historicalGuids
.
contains
(
preImportGuid
));
}
@DataProvider
(
name
=
"stocks-glossary"
)
public
static
Object
[][]
getDataFromGlossary
(
ITestContext
context
)
throws
IOException
,
AtlasBaseException
{
return
getZipSource
(
"stocks-glossary.zip"
);
...
...
repository/src/test/java/org/apache/atlas/repository/impexp/StartEntityFetchByExportRequestTest.java
0 → 100644
View file @
fad2822f
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org
.
apache
.
atlas
.
repository
.
impexp
;
import
org.apache.atlas.TestModules
;
import
org.apache.atlas.exception.AtlasBaseException
;
import
org.apache.atlas.model.impexp.AtlasExportRequest
;
import
org.apache.atlas.model.instance.AtlasObjectId
;
import
org.apache.atlas.repository.graphdb.AtlasGraph
;
import
org.apache.atlas.store.AtlasTypeDefStore
;
import
org.apache.atlas.type.AtlasType
;
import
org.apache.atlas.type.AtlasTypeRegistry
;
import
org.apache.atlas.util.AtlasGremlin3QueryProvider
;
import
org.testng.annotations.BeforeClass
;
import
org.testng.annotations.Guice
;
import
org.testng.annotations.Test
;
import
javax.inject.Inject
;
import
java.io.IOException
;
import
java.util.Collections
;
import
java.util.List
;
import
java.util.Map
;
import
static
org
.
apache
.
atlas
.
repository
.
impexp
.
StartEntityFetchByExportRequest
.
BINDING_PARAMETER_ATTR_NAME
;
import
static
org
.
apache
.
atlas
.
repository
.
impexp
.
StartEntityFetchByExportRequest
.
BINDING_PARAMETER_TYPENAME
;
import
static
org
.
apache
.
atlas
.
repository
.
impexp
.
StartEntityFetchByExportRequest
.
BINDING_PARAMTER_ATTR_VALUE
;
import
static
org
.
testng
.
Assert
.
assertEquals
;
@Guice
(
modules
=
TestModules
.
TestOnlyModule
.
class
)
public
class
StartEntityFetchByExportRequestTest
extends
ExportImportTestBase
{
@Inject
private
AtlasGraph
atlasGraph
;
@Inject
private
AtlasTypeRegistry
typeRegistry
;
@Inject
private
AtlasTypeDefStore
typeDefStore
;
private
AtlasGremlin3QueryProvider
atlasGremlin3QueryProvider
;
private
StartEntityFetchByExportRequestSpy
startEntityFetchByExportRequestSpy
;
private
class
StartEntityFetchByExportRequestSpy
extends
StartEntityFetchByExportRequest
{
String
generatedQuery
;
Map
<
String
,
Object
>
suppliedBindingsMap
;
public
StartEntityFetchByExportRequestSpy
(
AtlasGraph
atlasGraph
,
AtlasTypeRegistry
typeRegistry
)
{
super
(
atlasGraph
,
typeRegistry
,
atlasGremlin3QueryProvider
);
}
@Override
List
<
String
>
executeGremlinQuery
(
String
query
,
Map
<
String
,
Object
>
bindings
)
{
this
.
generatedQuery
=
query
;
this
.
suppliedBindingsMap
=
bindings
;
return
Collections
.
EMPTY_LIST
;
}
public
String
getGeneratedQuery
()
{
return
generatedQuery
;
}
public
Map
<
String
,
Object
>
getSuppliedBindingsMap
()
{
return
suppliedBindingsMap
;
}
}
@BeforeClass
void
setup
()
throws
IOException
,
AtlasBaseException
{
super
.
basicSetup
(
typeDefStore
,
typeRegistry
);
atlasGremlin3QueryProvider
=
new
AtlasGremlin3QueryProvider
();
startEntityFetchByExportRequestSpy
=
new
StartEntityFetchByExportRequestSpy
(
atlasGraph
,
typeRegistry
);
}
@Test
public
void
fetchTypeGuid
()
{
String
exportRequestJson
=
"{ \"itemsToExport\": [ { \"typeName\": \"hive_db\", \"guid\": \"111-222-333\" } ]}"
;
AtlasExportRequest
exportRequest
=
AtlasType
.
fromJson
(
exportRequestJson
,
AtlasExportRequest
.
class
);
List
<
AtlasObjectId
>
objectGuidMap
=
startEntityFetchByExportRequestSpy
.
get
(
exportRequest
);
assertEquals
(
objectGuidMap
.
get
(
0
).
getGuid
(),
"111-222-333"
);
}
@Test
public
void
fetchTypeUniqueAttributes
()
{
String
exportRequestJson
=
"{ \"itemsToExport\": [ { \"typeName\": \"hive_db\", \"uniqueAttributes\": {\"qualifiedName\": \"stocks@cl1\"} } ]}"
;
AtlasExportRequest
exportRequest
=
AtlasType
.
fromJson
(
exportRequestJson
,
AtlasExportRequest
.
class
);
startEntityFetchByExportRequestSpy
.
get
(
exportRequest
);
assertEquals
(
startEntityFetchByExportRequestSpy
.
getGeneratedQuery
(),
startEntityFetchByExportRequestSpy
.
getQueryTemplateForMatchType
(
exportRequest
.
getMatchTypeOptionValue
()));
assertEquals
(
startEntityFetchByExportRequestSpy
.
getSuppliedBindingsMap
().
get
(
BINDING_PARAMETER_TYPENAME
),
"hive_db"
);
assertEquals
(
startEntityFetchByExportRequestSpy
.
getSuppliedBindingsMap
().
get
(
BINDING_PARAMETER_ATTR_NAME
),
"Referenceable.qualifiedName"
);
assertEquals
(
startEntityFetchByExportRequestSpy
.
getSuppliedBindingsMap
().
get
(
BINDING_PARAMTER_ATTR_VALUE
),
"stocks@cl1"
);
}
}
repository/src/test/java/org/apache/atlas/repository/store/graph/v2/
AtlasEntityStoreV2
BulkImportPercentTest.java
→
repository/src/test/java/org/apache/atlas/repository/store/graph/v2/BulkImportPercentTest.java
View file @
fad2822f
...
...
@@ -32,7 +32,7 @@ import static org.mockito.Mockito.mock;
import
static
org
.
testng
.
Assert
.
assertEquals
;
import
static
org
.
testng
.
Assert
.
assertTrue
;
public
class
AtlasEntityStoreV2
BulkImportPercentTest
{
public
class
BulkImportPercentTest
{
private
final
int
MAX_PERCENT
=
100
;
private
final
float
MAX_PERCENT_FLOAT
=
100.0
F
;
...
...
@@ -174,4 +174,15 @@ public class AtlasEntityStoreV2BulkImportPercentTest {
}
return
expected
;
}
@Test
public
void
jsonArrayTest
()
{
String
t1
=
"123-abcd"
;
String
t2
=
"456-efgh"
;
String
jsonArray
=
BulkImporterImpl
.
getJsonArray
(
null
,
t1
);
assertEquals
(
jsonArray
,
String
.
format
(
"[\"%s\"]"
,
t1
));
assertEquals
(
BulkImporterImpl
.
getJsonArray
(
jsonArray
,
t2
),
String
.
format
(
"[\"%s\",\"%s\"]"
,
t1
,
t2
));
}
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment