Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
A
atlas
Project
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
dataplatform
atlas
Commits
1c781deb
Commit
1c781deb
authored
Aug 29, 2019
by
nikhilbonte
Committed by
nixonrodrigues
Sep 20, 2019
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
ATLAS-3416 Import API: delete non-exported hive_table for table level replication
Signed-off-by:
nixonrodrigues
<
nixon@apache.org
>
parent
6b100410
Show whitespace changes
Inline
Side-by-side
Showing
6 changed files
with
459 additions
and
10 deletions
+459
-10
ExportImportAuditEntry.java
...org/apache/atlas/model/impexp/ExportImportAuditEntry.java
+1
-0
AuditsWriter.java
...java/org/apache/atlas/repository/impexp/AuditsWriter.java
+22
-0
ImportService.java
...ava/org/apache/atlas/repository/impexp/ImportService.java
+36
-4
TableReplicationRequestProcessor.java
...s/repository/impexp/TableReplicationRequestProcessor.java
+181
-0
ImportServiceTest.java
...org/apache/atlas/repository/impexp/ImportServiceTest.java
+73
-6
TableReplicationRequestProcessorTest.java
...pository/impexp/TableReplicationRequestProcessorTest.java
+146
-0
No files found.
intg/src/main/java/org/apache/atlas/model/impexp/ExportImportAuditEntry.java
View file @
1c781deb
...
@@ -35,6 +35,7 @@ public class ExportImportAuditEntry extends AtlasBaseModelObject implements Seri
...
@@ -35,6 +35,7 @@ public class ExportImportAuditEntry extends AtlasBaseModelObject implements Seri
private
static
final
long
serialVersionUID
=
1L
;
private
static
final
long
serialVersionUID
=
1L
;
public
static
final
String
OPERATION_EXPORT
=
"EXPORT"
;
public
static
final
String
OPERATION_EXPORT
=
"EXPORT"
;
public
static
final
String
OPERATION_IMPORT
=
"IMPORT"
;
public
static
final
String
OPERATION_IMPORT
=
"IMPORT"
;
public
static
final
String
OPERATION_IMPORT_DELETE_REPL
=
"IMPORT_DELETE_REPL"
;
private
String
userName
;
private
String
userName
;
private
String
operation
;
private
String
operation
;
...
...
repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java
View file @
1c781deb
...
@@ -45,6 +45,7 @@ import org.springframework.util.CollectionUtils;
...
@@ -45,6 +45,7 @@ import org.springframework.util.CollectionUtils;
import
javax.inject.Inject
;
import
javax.inject.Inject
;
import
java.util.Collections
;
import
java.util.Collections
;
import
java.util.List
;
import
java.util.List
;
import
java.util.Set
;
@Component
@Component
public
class
AuditsWriter
{
public
class
AuditsWriter
{
...
@@ -68,6 +69,10 @@ public class AuditsWriter {
...
@@ -68,6 +69,10 @@ public class AuditsWriter {
this
.
auditService
=
auditService
;
this
.
auditService
=
auditService
;
}
}
public
AtlasServerService
getAtlasServerService
()
{
return
atlasServerService
;
}
public
void
write
(
String
userName
,
AtlasExportResult
result
,
public
void
write
(
String
userName
,
AtlasExportResult
result
,
long
startTime
,
long
endTime
,
long
startTime
,
long
endTime
,
List
<
String
>
entityCreationOrder
)
throws
AtlasBaseException
{
List
<
String
>
entityCreationOrder
)
throws
AtlasBaseException
{
...
@@ -80,6 +85,12 @@ public class AuditsWriter {
...
@@ -80,6 +85,12 @@ public class AuditsWriter {
auditForImport
.
add
(
userName
,
result
,
startTime
,
endTime
,
entityCreationOrder
);
auditForImport
.
add
(
userName
,
result
,
startTime
,
endTime
,
entityCreationOrder
);
}
}
public
void
write
(
String
userName
,
String
sourceCluster
,
long
startTime
,
long
endTime
,
Set
<
String
>
entityCreationOrder
)
throws
AtlasBaseException
{
auditForImport
.
add
(
userName
,
sourceCluster
,
startTime
,
endTime
,
entityCreationOrder
);
}
private
void
updateReplicationAttribute
(
boolean
isReplicationSet
,
private
void
updateReplicationAttribute
(
boolean
isReplicationSet
,
String
serverName
,
String
serverFullName
,
String
serverName
,
String
serverFullName
,
List
<
String
>
exportedGuids
,
List
<
String
>
exportedGuids
,
...
@@ -238,5 +249,16 @@ public class AuditsWriter {
...
@@ -238,5 +249,16 @@ public class AuditsWriter {
updateReplicationAttribute
(
replicationOptionState
,
sourceServerName
,
sourceServerFullName
,
entityGuids
,
updateReplicationAttribute
(
replicationOptionState
,
sourceServerName
,
sourceServerFullName
,
entityGuids
,
Constants
.
ATTR_NAME_REPLICATED_FROM
,
result
.
getExportResult
().
getChangeMarker
());
Constants
.
ATTR_NAME_REPLICATED_FROM
,
result
.
getExportResult
().
getChangeMarker
());
}
}
public
void
add
(
String
userName
,
String
sourceCluster
,
long
startTime
,
long
endTime
,
Set
<
String
>
entityGuids
)
throws
AtlasBaseException
{
sourceServerName
=
getServerNameFromFullName
(
sourceCluster
);
auditService
.
add
(
userName
,
sourceServerName
,
getCurrentClusterName
(),
ExportImportAuditEntry
.
OPERATION_IMPORT_DELETE_REPL
,
AtlasType
.
toJson
(
entityGuids
),
startTime
,
endTime
,
!
entityGuids
.
isEmpty
());
}
}
}
}
}
repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
View file @
1c781deb
...
@@ -24,8 +24,10 @@ import org.apache.atlas.RequestContext;
...
@@ -24,8 +24,10 @@ import org.apache.atlas.RequestContext;
import
org.apache.atlas.entitytransform.BaseEntityHandler
;
import
org.apache.atlas.entitytransform.BaseEntityHandler
;
import
org.apache.atlas.entitytransform.TransformerContext
;
import
org.apache.atlas.entitytransform.TransformerContext
;
import
org.apache.atlas.exception.AtlasBaseException
;
import
org.apache.atlas.exception.AtlasBaseException
;
import
org.apache.atlas.model.impexp.AtlasExportRequest
;
import
org.apache.atlas.model.impexp.AtlasImportRequest
;
import
org.apache.atlas.model.impexp.AtlasImportRequest
;
import
org.apache.atlas.model.impexp.AtlasImportResult
;
import
org.apache.atlas.model.impexp.AtlasImportResult
;
import
org.apache.atlas.model.instance.AtlasObjectId
;
import
org.apache.atlas.model.typedef.AtlasTypesDef
;
import
org.apache.atlas.model.typedef.AtlasTypesDef
;
import
org.apache.atlas.repository.store.graph.BulkImporter
;
import
org.apache.atlas.repository.store.graph.BulkImporter
;
import
org.apache.atlas.repository.store.graph.v2.EntityImportStream
;
import
org.apache.atlas.repository.store.graph.v2.EntityImportStream
;
...
@@ -54,24 +56,28 @@ import static org.apache.atlas.model.impexp.AtlasImportRequest.TRANSFORMS_KEY;
...
@@ -54,24 +56,28 @@ import static org.apache.atlas.model.impexp.AtlasImportRequest.TRANSFORMS_KEY;
public
class
ImportService
{
public
class
ImportService
{
private
static
final
Logger
LOG
=
LoggerFactory
.
getLogger
(
ImportService
.
class
);
private
static
final
Logger
LOG
=
LoggerFactory
.
getLogger
(
ImportService
.
class
);
private
static
final
String
ATLAS_TYPE_HIVE_TABLE
=
"hive_table"
;
private
final
AtlasTypeDefStore
typeDefStore
;
private
final
AtlasTypeDefStore
typeDefStore
;
private
final
AtlasTypeRegistry
typeRegistry
;
private
final
AtlasTypeRegistry
typeRegistry
;
private
final
BulkImporter
bulkImporter
;
private
final
BulkImporter
bulkImporter
;
private
final
AuditsWriter
auditsWriter
;
private
final
AuditsWriter
auditsWriter
;
private
final
ImportTransformsShaper
importTransformsShaper
;
private
final
ImportTransformsShaper
importTransformsShaper
;
private
TableReplicationRequestProcessor
tableReplicationRequestProcessor
;
private
long
startTimestamp
;
private
long
startTimestamp
;
private
long
endTimestamp
;
private
long
endTimestamp
;
@Inject
@Inject
public
ImportService
(
AtlasTypeDefStore
typeDefStore
,
AtlasTypeRegistry
typeRegistry
,
BulkImporter
bulkImporter
,
public
ImportService
(
AtlasTypeDefStore
typeDefStore
,
AtlasTypeRegistry
typeRegistry
,
BulkImporter
bulkImporter
,
AuditsWriter
auditsWriter
,
AuditsWriter
auditsWriter
,
ImportTransformsShaper
importTransformsShaper
,
ImportTransformsShaper
importTransformsShape
r
)
{
TableReplicationRequestProcessor
tableReplicationRequestProcesso
r
)
{
this
.
typeDefStore
=
typeDefStore
;
this
.
typeDefStore
=
typeDefStore
;
this
.
typeRegistry
=
typeRegistry
;
this
.
typeRegistry
=
typeRegistry
;
this
.
bulkImporter
=
bulkImporter
;
this
.
bulkImporter
=
bulkImporter
;
this
.
auditsWriter
=
auditsWriter
;
this
.
auditsWriter
=
auditsWriter
;
this
.
importTransformsShaper
=
importTransformsShaper
;
this
.
importTransformsShaper
=
importTransformsShaper
;
this
.
tableReplicationRequestProcessor
=
tableReplicationRequestProcessor
;
}
}
public
AtlasImportResult
run
(
InputStream
inputStream
,
String
userName
,
public
AtlasImportResult
run
(
InputStream
inputStream
,
String
userName
,
...
@@ -109,7 +115,11 @@ public class ImportService {
...
@@ -109,7 +115,11 @@ public class ImportService {
startTimestamp
=
System
.
currentTimeMillis
();
startTimestamp
=
System
.
currentTimeMillis
();
processTypes
(
source
.
getTypesDef
(),
result
);
processTypes
(
source
.
getTypesDef
(),
result
);
setStartPosition
(
request
,
source
);
setStartPosition
(
request
,
source
);
processEntities
(
userName
,
source
,
result
);
processEntities
(
userName
,
source
,
result
);
processReplicationDeletion
(
source
.
getExportResult
().
getRequest
(),
request
);
}
catch
(
AtlasBaseException
excp
)
{
}
catch
(
AtlasBaseException
excp
)
{
LOG
.
error
(
"import(user={}, from={}): failed"
,
userName
,
requestingIP
,
excp
);
LOG
.
error
(
"import(user={}, from={}): failed"
,
userName
,
requestingIP
,
excp
);
...
@@ -228,6 +238,12 @@ public class ImportService {
...
@@ -228,6 +238,12 @@ public class ImportService {
auditsWriter
.
write
(
userName
,
result
,
startTimestamp
,
endTimestamp
,
importSource
.
getCreationOrder
());
auditsWriter
.
write
(
userName
,
result
,
startTimestamp
,
endTimestamp
,
importSource
.
getCreationOrder
());
}
}
private
void
processReplicationDeletion
(
AtlasExportRequest
exportRequest
,
AtlasImportRequest
importRequest
)
throws
AtlasBaseException
{
if
(
checkHiveTableIncrementalSkipLineage
(
importRequest
,
exportRequest
))
{
tableReplicationRequestProcessor
.
process
(
exportRequest
,
importRequest
);
}
}
private
int
getDuration
(
long
endTime
,
long
startTime
)
{
private
int
getDuration
(
long
endTime
,
long
startTime
)
{
return
(
int
)
(
endTime
-
startTime
);
return
(
int
)
(
endTime
-
startTime
);
}
}
...
@@ -239,9 +255,25 @@ public class ImportService {
...
@@ -239,9 +255,25 @@ public class ImportService {
}
}
return
new
ZipSourceWithBackingDirectory
(
inputStream
,
configuredTemporaryDirectory
);
return
new
ZipSourceWithBackingDirectory
(
inputStream
,
configuredTemporaryDirectory
);
}
}
catch
(
IOException
ex
)
{
catch
(
IOException
ex
)
{
throw
new
AtlasBaseException
(
ex
);
throw
new
AtlasBaseException
(
ex
);
}
}
}
}
@VisibleForTesting
boolean
checkHiveTableIncrementalSkipLineage
(
AtlasImportRequest
importRequest
,
AtlasExportRequest
exportRequest
)
{
if
(
CollectionUtils
.
isEmpty
(
exportRequest
.
getItemsToExport
()))
{
return
false
;
}
for
(
AtlasObjectId
itemToExport
:
exportRequest
.
getItemsToExport
())
{
if
(!
itemToExport
.
getTypeName
().
equalsIgnoreCase
(
ATLAS_TYPE_HIVE_TABLE
)){
return
false
;
}
}
return
importRequest
.
isReplicationOptionSet
()
&&
exportRequest
.
isReplicationOptionSet
()
&&
exportRequest
.
getFetchTypeOptionValue
().
equalsIgnoreCase
(
AtlasExportRequest
.
FETCH_TYPE_INCREMENTAL
)
&&
exportRequest
.
getSkipLineageOptionValue
();
}
}
}
repository/src/main/java/org/apache/atlas/repository/impexp/TableReplicationRequestProcessor.java
0 → 100644
View file @
1c781deb
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org
.
apache
.
atlas
.
repository
.
impexp
;
import
org.apache.atlas.authorize.AtlasAuthorizationUtils
;
import
org.apache.atlas.discovery.AtlasDiscoveryService
;
import
org.apache.atlas.exception.AtlasBaseException
;
import
org.apache.atlas.model.discovery.AtlasSearchResult
;
import
org.apache.atlas.model.discovery.SearchParameters
;
import
org.apache.atlas.model.impexp.AtlasExportRequest
;
import
org.apache.atlas.model.impexp.AtlasImportRequest
;
import
org.apache.atlas.model.instance.AtlasEntityHeader
;
import
org.apache.atlas.model.instance.AtlasObjectId
;
import
org.apache.atlas.repository.store.graph.AtlasEntityStore
;
import
org.apache.atlas.type.AtlasTypeRegistry
;
import
org.apache.commons.collections.CollectionUtils
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
org.springframework.stereotype.Component
;
import
javax.inject.Inject
;
import
java.util.Collections
;
import
java.util.List
;
import
java.util.Set
;
import
java.util.Map
;
import
java.util.ArrayList
;
import
java.util.HashSet
;
@Component
public
class
TableReplicationRequestProcessor
{
private
static
final
Logger
LOG
=
LoggerFactory
.
getLogger
(
TableReplicationRequestProcessor
.
class
);
private
static
final
String
QUERY_DB_NAME_EQUALS
=
"where db.name='%s'"
;
private
static
final
String
ATTR_NAME_KEY
=
"name"
;
private
static
final
String
TYPE_HIVE_TABLE
=
"hive_table"
;
private
static
final
String
ATTR_QUALIFIED_NAME_KEY
=
"qualifiedName"
;
private
static
final
String
REPLICATED_TAG_NAME
=
"%s_replicated"
;
private
long
startTstamp
;
private
long
endTstamp
;
private
AuditsWriter
auditsWriter
;
private
AtlasEntityStore
entityStore
;
private
AtlasTypeRegistry
typeRegistry
;
private
AtlasDiscoveryService
discoveryService
;
@Inject
public
TableReplicationRequestProcessor
(
AuditsWriter
auditsWriter
,
AtlasEntityStore
entityStore
,
AtlasDiscoveryService
atlasDiscoveryService
,
AtlasTypeRegistry
typeRegistry
)
{
this
.
auditsWriter
=
auditsWriter
;
this
.
entityStore
=
entityStore
;
this
.
typeRegistry
=
typeRegistry
;
this
.
discoveryService
=
atlasDiscoveryService
;
}
public
void
process
(
AtlasExportRequest
exportRequest
,
AtlasImportRequest
importRequest
)
throws
AtlasBaseException
{
startTstamp
=
System
.
currentTimeMillis
();
LOG
.
info
(
"process: deleting entities with type hive_table which are not imported."
);
String
sourceCluster
=
importRequest
.
getOptionKeyReplicatedFrom
();
List
<
String
>
qualifiedNames
=
getQualifiedNamesFromRequest
(
exportRequest
);
List
<
String
>
safeGUIDs
=
getEntitiesFromQualifiedNames
(
qualifiedNames
);
String
dbName
=
getDbName
(
safeGUIDs
.
get
(
0
));
Set
<
String
>
guidsToDelete
=
getGuidsToDelete
(
dbName
,
safeGUIDs
,
sourceCluster
);
deleteTables
(
sourceCluster
,
guidsToDelete
);
}
private
List
<
String
>
getQualifiedNamesFromRequest
(
AtlasExportRequest
exportRequest
){
List
<
String
>
qualifiedNames
=
new
ArrayList
<>();
for
(
AtlasObjectId
objectId
:
exportRequest
.
getItemsToExport
())
{
qualifiedNames
.
add
(
objectId
.
getUniqueAttributes
().
get
(
ATTR_QUALIFIED_NAME_KEY
).
toString
());
}
return
qualifiedNames
;
}
private
List
<
String
>
getEntitiesFromQualifiedNames
(
List
<
String
>
qualifiedNames
)
throws
AtlasBaseException
{
List
<
String
>
safeGUIDs
=
new
ArrayList
<>();
for
(
String
qualifiedName
:
qualifiedNames
)
{
String
guid
=
getGuidByUniqueAttributes
(
Collections
.
singletonMap
(
ATTR_QUALIFIED_NAME_KEY
,
qualifiedName
));
safeGUIDs
.
add
(
guid
);
}
return
safeGUIDs
;
}
private
String
getGuidByUniqueAttributes
(
Map
<
String
,
Object
>
uniqueAttributes
)
throws
AtlasBaseException
{
return
entityStore
.
getGuidByUniqueAttributes
(
typeRegistry
.
getEntityTypeByName
(
TYPE_HIVE_TABLE
),
uniqueAttributes
);
}
private
String
getDbName
(
String
tableGuid
)
throws
AtlasBaseException
{
String
dbGuid
=
AuditsWriter
.
ReplKeyGuidFinder
.
get
(
typeRegistry
,
entityStore
,
tableGuid
);
return
(
String
)
entityStore
.
getById
(
dbGuid
).
getEntity
().
getAttribute
(
ATTR_NAME_KEY
);
}
private
Set
<
String
>
getGuidsToDelete
(
String
dbName
,
List
<
String
>
excludeGUIDs
,
String
sourceCluster
)
throws
AtlasBaseException
{
SearchParameters
parameters
=
getSearchParameters
(
dbName
,
sourceCluster
);
Set
<
String
>
unsafeGUIDs
=
new
HashSet
<>();
final
int
max
=
10000
;
int
fetchedSize
=
0
;
int
i
=
0
;
parameters
.
setLimit
(
max
);
while
(
fetchedSize
==
(
max
*
i
))
{
if
(
LOG
.
isDebugEnabled
())
{
LOG
.
debug
(
"i={}, fetchedSize={}, unsafeGUIDs.size()={}"
,
i
,
fetchedSize
,
unsafeGUIDs
.
size
());
}
int
offset
=
max
*
i
;
parameters
.
setOffset
(
offset
);
AtlasSearchResult
searchResult
=
discoveryService
.
searchWithParameters
(
parameters
);
if
(
CollectionUtils
.
isEmpty
(
searchResult
.
getEntities
()))
{
break
;
}
for
(
AtlasEntityHeader
entityHeader
:
searchResult
.
getEntities
())
{
String
guid
=
entityHeader
.
getGuid
();
if
(!
excludeGUIDs
.
contains
(
guid
))
{
unsafeGUIDs
.
add
(
guid
);
}
}
fetchedSize
=
searchResult
.
getEntities
().
size
();
i
++;
}
return
unsafeGUIDs
;
}
private
SearchParameters
getSearchParameters
(
String
dbName
,
String
sourceCluster
)
{
String
query
=
String
.
format
(
QUERY_DB_NAME_EQUALS
,
dbName
);
SearchParameters
parameters
=
new
SearchParameters
();
parameters
.
setExcludeDeletedEntities
(
false
);
parameters
.
setTypeName
(
TYPE_HIVE_TABLE
);
parameters
.
setExcludeDeletedEntities
(
true
);
parameters
.
setClassification
(
String
.
format
(
REPLICATED_TAG_NAME
,
sourceCluster
));
parameters
.
setAttributes
(
new
HashSet
<
String
>(){{
add
(
AtlasImportRequest
.
OPTION_KEY_REPLICATED_FROM
);
}});
parameters
.
setQuery
(
query
);
return
parameters
;
}
private
void
deleteTables
(
String
sourceCluster
,
Set
<
String
>
guidsToDelete
)
throws
AtlasBaseException
{
if
(!
CollectionUtils
.
isEmpty
(
guidsToDelete
))
{
entityStore
.
deleteByIds
(
new
ArrayList
<>(
guidsToDelete
));
endTstamp
=
System
.
currentTimeMillis
();
createAuditEntry
(
sourceCluster
,
guidsToDelete
);
}
}
private
void
createAuditEntry
(
String
sourceCluster
,
Set
<
String
>
guidsToDelete
)
throws
AtlasBaseException
{
auditsWriter
.
write
(
AtlasAuthorizationUtils
.
getCurrentUserName
(),
sourceCluster
,
startTstamp
,
endTstamp
,
guidsToDelete
);
if
(
LOG
.
isDebugEnabled
())
{
LOG
.
debug
(
"Deleted entities => {}"
,
guidsToDelete
);
}
}
}
repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java
View file @
1c781deb
...
@@ -24,9 +24,11 @@ import org.apache.atlas.TestModules;
...
@@ -24,9 +24,11 @@ import org.apache.atlas.TestModules;
import
org.apache.atlas.TestUtilsV2
;
import
org.apache.atlas.TestUtilsV2
;
import
org.apache.atlas.discovery.EntityDiscoveryService
;
import
org.apache.atlas.discovery.EntityDiscoveryService
;
import
org.apache.atlas.exception.AtlasBaseException
;
import
org.apache.atlas.exception.AtlasBaseException
;
import
org.apache.atlas.model.impexp.AtlasExportRequest
;
import
org.apache.atlas.model.impexp.AtlasImportRequest
;
import
org.apache.atlas.model.impexp.AtlasImportRequest
;
import
org.apache.atlas.model.instance.AtlasEntity
;
import
org.apache.atlas.model.instance.AtlasEntity
;
import
org.apache.atlas.model.instance.AtlasEntityHeader
;
import
org.apache.atlas.model.instance.AtlasEntityHeader
;
import
org.apache.atlas.model.instance.AtlasObjectId
;
import
org.apache.atlas.model.instance.AtlasRelatedObjectId
;
import
org.apache.atlas.model.instance.AtlasRelatedObjectId
;
import
org.apache.atlas.model.instance.EntityMutationResponse
;
import
org.apache.atlas.model.instance.EntityMutationResponse
;
import
org.apache.atlas.repository.Constants
;
import
org.apache.atlas.repository.Constants
;
...
@@ -40,10 +42,7 @@ import org.apache.atlas.store.AtlasTypeDefStore;
...
@@ -40,10 +42,7 @@ import org.apache.atlas.store.AtlasTypeDefStore;
import
org.apache.atlas.type.AtlasClassificationType
;
import
org.apache.atlas.type.AtlasClassificationType
;
import
org.apache.atlas.type.AtlasTypeRegistry
;
import
org.apache.atlas.type.AtlasTypeRegistry
;
import
org.apache.commons.lang.StringUtils
;
import
org.apache.commons.lang.StringUtils
;
import
org.apache.tinkerpop.shaded.kryo.io.Input
;
import
org.mockito.stubbing.Answer
;
import
org.mockito.stubbing.Answer
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
org.testng.ITestContext
;
import
org.testng.ITestContext
;
import
org.testng.annotations.AfterClass
;
import
org.testng.annotations.AfterClass
;
import
org.testng.annotations.AfterTest
;
import
org.testng.annotations.AfterTest
;
...
@@ -54,6 +53,7 @@ import org.testng.annotations.Test;
...
@@ -54,6 +53,7 @@ import org.testng.annotations.Test;
import
java.io.IOException
;
import
java.io.IOException
;
import
java.io.InputStream
;
import
java.io.InputStream
;
import
java.util.ArrayList
;
import
java.util.HashMap
;
import
java.util.HashMap
;
import
java.util.List
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.Map
;
...
@@ -61,11 +61,17 @@ import java.util.Map;
...
@@ -61,11 +61,17 @@ import java.util.Map;
import
static
org
.
apache
.
atlas
.
graph
.
GraphSandboxUtil
.
useLocalSolr
;
import
static
org
.
apache
.
atlas
.
graph
.
GraphSandboxUtil
.
useLocalSolr
;
import
static
org
.
apache
.
atlas
.
repository
.
impexp
.
ZipFileResourceTestUtils
.
getDefaultImportRequest
;
import
static
org
.
apache
.
atlas
.
repository
.
impexp
.
ZipFileResourceTestUtils
.
getDefaultImportRequest
;
import
static
org
.
apache
.
atlas
.
repository
.
impexp
.
ZipFileResourceTestUtils
.
getZipSource
;
import
static
org
.
apache
.
atlas
.
repository
.
impexp
.
ZipFileResourceTestUtils
.
getZipSource
;
import
static
org
.
apache
.
atlas
.
repository
.
impexp
.
ZipFileResourceTestUtils
.
getInputStreamFrom
;
import
static
org
.
apache
.
atlas
.
repository
.
impexp
.
ZipFileResourceTestUtils
.
loadModelFromJson
;
import
static
org
.
apache
.
atlas
.
repository
.
impexp
.
ZipFileResourceTestUtils
.
loadModelFromJson
;
import
static
org
.
apache
.
atlas
.
repository
.
impexp
.
ZipFileResourceTestUtils
.
loadModelFromResourcesJson
;
import
static
org
.
apache
.
atlas
.
repository
.
impexp
.
ZipFileResourceTestUtils
.
loadModelFromResourcesJson
;
import
static
org
.
apache
.
atlas
.
repository
.
impexp
.
ZipFileResourceTestUtils
.
runAndVerifyQuickStart_v1_Import
;
import
static
org
.
apache
.
atlas
.
repository
.
impexp
.
ZipFileResourceTestUtils
.
runAndVerifyQuickStart_v1_Import
;
import
static
org
.
apache
.
atlas
.
repository
.
impexp
.
ZipFileResourceTestUtils
.
runImportWithNoParameters
;
import
static
org
.
apache
.
atlas
.
repository
.
impexp
.
ZipFileResourceTestUtils
.
runImportWithNoParameters
;
import
static
org
.
apache
.
atlas
.
repository
.
impexp
.
ZipFileResourceTestUtils
.
runImportWithParameters
;
import
static
org
.
apache
.
atlas
.
repository
.
impexp
.
ZipFileResourceTestUtils
.
runImportWithParameters
;
import
static
org
.
apache
.
atlas
.
model
.
impexp
.
AtlasExportRequest
.
OPTION_FETCH_TYPE
;
import
static
org
.
apache
.
atlas
.
model
.
impexp
.
AtlasExportRequest
.
OPTION_KEY_REPLICATED_TO
;
import
static
org
.
apache
.
atlas
.
model
.
impexp
.
AtlasExportRequest
.
OPTION_SKIP_LINEAGE
;
import
static
org
.
apache
.
atlas
.
model
.
impexp
.
AtlasExportRequest
.
FETCH_TYPE_FULL
;
import
static
org
.
apache
.
atlas
.
model
.
impexp
.
AtlasExportRequest
.
FETCH_TYPE_INCREMENTAL
;
import
static
org
.
mockito
.
Mockito
.
mock
;
import
static
org
.
mockito
.
Mockito
.
mock
;
import
static
org
.
mockito
.
Mockito
.
when
;
import
static
org
.
mockito
.
Mockito
.
when
;
import
static
org
.
testng
.
Assert
.
assertEquals
;
import
static
org
.
testng
.
Assert
.
assertEquals
;
...
@@ -392,7 +398,7 @@ public class ImportServiceTest extends ExportImportTestBase {
...
@@ -392,7 +398,7 @@ public class ImportServiceTest extends ExportImportTestBase {
@Test
@Test
public
void
importServiceProcessesIOException
()
{
public
void
importServiceProcessesIOException
()
{
ImportService
importService
=
new
ImportService
(
typeDefStore
,
typeRegistry
,
null
,
null
,
null
);
ImportService
importService
=
new
ImportService
(
typeDefStore
,
typeRegistry
,
null
,
null
,
null
,
null
);
AtlasImportRequest
req
=
mock
(
AtlasImportRequest
.
class
);
AtlasImportRequest
req
=
mock
(
AtlasImportRequest
.
class
);
Answer
<
Map
>
answer
=
invocationOnMock
->
{
Answer
<
Map
>
answer
=
invocationOnMock
->
{
...
@@ -461,9 +467,70 @@ public class ImportServiceTest extends ExportImportTestBase {
...
@@ -461,9 +467,70 @@ public class ImportServiceTest extends ExportImportTestBase {
assertEquals
(
importTransforms
.
getTransforms
().
get
(
"hive_table"
).
get
(
"qualifiedName"
).
size
(),
2
);
assertEquals
(
importTransforms
.
getTransforms
().
get
(
"hive_table"
).
get
(
"qualifiedName"
).
size
(),
2
);
}
}
@Test
(
expectedExceptions
=
AtlasBaseException
.
class
)
@Test
(
expectedExceptions
=
AtlasBaseException
.
class
)
public
void
importEmptyZip
()
throws
IOException
,
AtlasBaseException
{
public
void
importEmptyZip
()
throws
IOException
,
AtlasBaseException
{
new
ZipSource
((
InputStream
)
getZipSource
(
"empty.zip"
)[
0
][
0
]);
new
ZipSource
(
getInputStreamFrom
(
"empty.zip"
));
}
@Test
public
void
testCheckHiveTableIncrementalSkipLineage
()
{
AtlasImportRequest
importRequest
;
AtlasExportRequest
exportRequest
;
importRequest
=
getImportRequest
(
"cl1"
);
exportRequest
=
getExportRequest
(
FETCH_TYPE_INCREMENTAL
,
"cl2"
,
true
,
getItemsToExport
(
"hive_table"
,
"hive_table"
));
assertTrue
(
importService
.
checkHiveTableIncrementalSkipLineage
(
importRequest
,
exportRequest
));
exportRequest
=
getExportRequest
(
FETCH_TYPE_INCREMENTAL
,
"cl2"
,
true
,
getItemsToExport
(
"hive_table"
,
"hive_db"
,
"hive_table"
));
assertFalse
(
importService
.
checkHiveTableIncrementalSkipLineage
(
importRequest
,
exportRequest
));
exportRequest
=
getExportRequest
(
FETCH_TYPE_FULL
,
"cl2"
,
true
,
getItemsToExport
(
"hive_table"
,
"hive_table"
));
assertFalse
(
importService
.
checkHiveTableIncrementalSkipLineage
(
importRequest
,
exportRequest
));
exportRequest
=
getExportRequest
(
FETCH_TYPE_FULL
,
""
,
true
,
getItemsToExport
(
"hive_table"
,
"hive_table"
));
assertFalse
(
importService
.
checkHiveTableIncrementalSkipLineage
(
importRequest
,
exportRequest
));
importRequest
=
getImportRequest
(
""
);
exportRequest
=
getExportRequest
(
FETCH_TYPE_INCREMENTAL
,
"cl2"
,
true
,
getItemsToExport
(
"hive_table"
,
"hive_table"
));
assertFalse
(
importService
.
checkHiveTableIncrementalSkipLineage
(
importRequest
,
exportRequest
));
}
private
AtlasImportRequest
getImportRequest
(
String
replicatedFrom
){
AtlasImportRequest
importRequest
=
getDefaultImportRequest
();
if
(!
StringUtils
.
isEmpty
(
replicatedFrom
))
{
importRequest
.
setOption
(
AtlasImportRequest
.
OPTION_KEY_REPLICATED_FROM
,
replicatedFrom
);
}
return
importRequest
;
}
private
AtlasExportRequest
getExportRequest
(
String
fetchType
,
String
replicatedTo
,
boolean
skipLineage
,
List
<
AtlasObjectId
>
itemsToExport
){
AtlasExportRequest
request
=
new
AtlasExportRequest
();
request
.
setOptions
(
getOptionsMap
(
fetchType
,
replicatedTo
,
skipLineage
));
request
.
setItemsToExport
(
itemsToExport
);
return
request
;
}
private
List
<
AtlasObjectId
>
getItemsToExport
(
String
...
typeNames
){
List
<
AtlasObjectId
>
itemsToExport
=
new
ArrayList
<>();
for
(
String
typeName
:
typeNames
)
{
itemsToExport
.
add
(
new
AtlasObjectId
(
typeName
,
"qualifiedName"
,
"db.table@cluster"
));
}
return
itemsToExport
;
}
private
Map
<
String
,
Object
>
getOptionsMap
(
String
fetchType
,
String
replicatedTo
,
boolean
skipLineage
){
Map
<
String
,
Object
>
options
=
new
HashMap
<>();
if
(!
StringUtils
.
isEmpty
(
fetchType
))
{
options
.
put
(
OPTION_FETCH_TYPE
,
fetchType
);
}
if
(!
StringUtils
.
isEmpty
(
replicatedTo
))
{
options
.
put
(
OPTION_KEY_REPLICATED_TO
,
replicatedTo
);
}
options
.
put
(
OPTION_SKIP_LINEAGE
,
skipLineage
);
return
options
;
}
}
}
}
repository/src/test/java/org/apache/atlas/repository/impexp/TableReplicationRequestProcessorTest.java
0 → 100644
View file @
1c781deb
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org
.
apache
.
atlas
.
repository
.
impexp
;
import
com.google.inject.Inject
;
import
org.apache.atlas.RequestContext
;
import
org.apache.atlas.TestModules
;
import
org.apache.atlas.TestUtilsV2
;
import
org.apache.atlas.exception.AtlasBaseException
;
import
org.apache.atlas.model.impexp.AtlasImportRequest
;
import
org.apache.atlas.model.impexp.ExportImportAuditEntry
;
import
org.apache.atlas.repository.graph.AtlasGraphProvider
;
import
org.apache.atlas.repository.store.graph.AtlasEntityStore
;
import
org.apache.atlas.runner.LocalSolrRunner
;
import
org.apache.atlas.store.AtlasTypeDefStore
;
import
org.apache.atlas.type.AtlasType
;
import
org.apache.atlas.type.AtlasTypeRegistry
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
org.testng.ITestContext
;
import
org.testng.SkipException
;
import
org.testng.annotations.Guice
;
import
org.testng.annotations.Test
;
import
org.testng.annotations.AfterClass
;
import
org.testng.annotations.BeforeTest
;
import
org.testng.annotations.DataProvider
;
import
java.io.IOException
;
import
java.io.InputStream
;
import
java.util.List
;
import
static
org
.
apache
.
atlas
.
graph
.
GraphSandboxUtil
.
useLocalSolr
;
import
static
org
.
apache
.
atlas
.
repository
.
impexp
.
ZipFileResourceTestUtils
.
getDefaultImportRequest
;
import
static
org
.
apache
.
atlas
.
repository
.
impexp
.
ZipFileResourceTestUtils
.
getZipSource
;
import
static
org
.
apache
.
atlas
.
repository
.
impexp
.
ZipFileResourceTestUtils
.
getInputStreamFrom
;
import
static
org
.
apache
.
atlas
.
repository
.
impexp
.
ZipFileResourceTestUtils
.
runImportWithParameters
;
import
static
org
.
testng
.
Assert
.
assertTrue
;
import
static
org
.
testng
.
Assert
.
assertFalse
;
import
static
org
.
testng
.
Assert
.
assertNotNull
;
@Guice
(
modules
=
TestModules
.
TestOnlyModule
.
class
)
public
class
TableReplicationRequestProcessorTest
extends
ExportImportTestBase
{
private
static
final
Logger
LOG
=
LoggerFactory
.
getLogger
(
TableReplicationRequestProcessorTest
.
class
);
private
static
final
String
ENTITY_GUID_REPLICATED
=
"718a6d12-35a8-4731-aff8-3a64637a43a3"
;
private
static
final
String
ENTITY_GUID_NOT_REPLICATED_1
=
"e19e5683-d9ae-436a-af1e-0873582d0f1e"
;
private
static
final
String
ENTITY_GUID_NOT_REPLICATED_2
=
"2e28ae34-576e-4a8b-be48-cf5f925d7b15"
;
private
static
final
String
REPL_FROM
=
"cl1"
;
private
static
final
String
REPL_TRANSFORMER
=
"[{\"conditions\":{\"__entity\":\"topLevel: \"},"
+
"\"action\":{\"__entity\":\"ADD_CLASSIFICATION: cl1_replicated\"}},"
+
"{\"action\":{\"__entity.replicatedTo\":\"CLEAR:\",\"__entity.replicatedFrom\":\"CLEAR:\"}},"
+
"{\"conditions\":{\"hive_db.clusterName\":\"EQUALS: cl1\"},\"action\":{\"hive_db.clusterName\":\"SET: cl2\"}},"
+
"{\"conditions\":{\"hive_db.location\":\"STARTS_WITH_IGNORE_CASE: file:///\"},"
+
"\"action\":{\"hive_db.location\":\"REPLACE_PREFIX: = :file:///=file:///\"}},"
+
"{\"conditions\":{\"hive_storagedesc.location\":\"STARTS_WITH_IGNORE_CASE: file:///\"},"
+
"\"action\":{\"hive_storagedesc.location\":\"REPLACE_PREFIX: = :file:///=file:///\"}}]"
;
@Inject
private
ImportService
importService
;
@Inject
private
AtlasTypeRegistry
typeRegistry
;
@Inject
private
AtlasEntityStore
entityStore
;
@Inject
private
ExportImportAuditService
auditService
;
@Inject
private
AtlasTypeDefStore
typeDefStore
;
@BeforeTest
public
void
setupTest
()
throws
IOException
,
AtlasBaseException
{
RequestContext
.
clear
();
RequestContext
.
get
().
setUser
(
TestUtilsV2
.
TEST_USER
,
null
);
basicSetup
(
typeDefStore
,
typeRegistry
);
}
@AfterClass
public
void
clear
()
throws
Exception
{
AtlasGraphProvider
.
cleanup
();
if
(
useLocalSolr
())
{
LocalSolrRunner
.
stop
();
}
}
@DataProvider
(
name
=
"source1"
)
public
static
Object
[][]
getData1
(
ITestContext
context
)
throws
IOException
,
AtlasBaseException
{
return
getZipSource
(
"repl_exp_1.zip"
);
}
public
static
InputStream
getData2
()
{
return
getInputStreamFrom
(
"repl_exp_2.zip"
);
}
@Test
(
dataProvider
=
"source1"
)
public
void
importWithIsReplTrue
(
InputStream
zipSource
)
throws
AtlasBaseException
,
IOException
{
AtlasImportRequest
atlasImportRequest
=
getDefaultImportRequest
();
atlasImportRequest
.
setOption
(
"replicatedFrom"
,
REPL_FROM
);
atlasImportRequest
.
setOption
(
"transformers"
,
REPL_TRANSFORMER
);
runImportWithParameters
(
importService
,
atlasImportRequest
,
zipSource
);
runImportWithParameters
(
importService
,
atlasImportRequest
,
getData2
());
assertAuditEntry
();
}
private
void
assertAuditEntry
()
{
pauseForIndexCreation
();
List
<
ExportImportAuditEntry
>
result
;
try
{
result
=
auditService
.
get
(
""
,
"IMPORT_DELETE_REPL"
,
""
,
""
,
""
,
10
,
0
);
}
catch
(
Exception
e
)
{
throw
new
SkipException
(
"audit entries not retrieved."
);
}
assertNotNull
(
result
);
assertTrue
(
result
.
size
()
>
0
);
List
<
String
>
deletedGuids
=
AtlasType
.
fromJson
(
result
.
get
(
0
).
getResultSummary
(),
List
.
class
);
assertNotNull
(
deletedGuids
);
assertFalse
(
deletedGuids
.
contains
(
ENTITY_GUID_REPLICATED
));
assertTrue
(
deletedGuids
.
contains
(
ENTITY_GUID_NOT_REPLICATED_1
));
assertTrue
(
deletedGuids
.
contains
(
ENTITY_GUID_NOT_REPLICATED_2
));
}
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment