dataplatform / atlas / Commits / a2ccfb9f

Commit a2ccfb9f, authored 5 years ago by Ashutosh Mestry

    ATLAS-3320: Import Service. Support concurrent ingest.

parent e159f767

Showing 31 changed files with 1383 additions and 45 deletions.
Changed files:

  +1    -1    ...pache/atlas/repository/graphdb/janus/AtlasJanusGraph.java
  +1    -0    intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
  +40   -3    ...ava/org/apache/atlas/model/impexp/AtlasImportRequest.java
  +4    -7    intg/src/main/java/org/apache/atlas/pc/WorkItemConsumer.java
  +5    -4    intg/src/main/java/org/apache/atlas/pc/WorkItemManager.java
  +4    -0    ...in/java/org/apache/atlas/GraphTransactionInterceptor.java
  +2    -1    ...java/org/apache/atlas/repository/impexp/AuditsWriter.java
  +19   -3    ...ava/org/apache/atlas/repository/impexp/ImportService.java
  +4    -0    ...rg/apache/atlas/repository/impexp/ZipExportFileNames.java
  +269  -0    ...a/org/apache/atlas/repository/impexp/ZipSourceDirect.java
  +51   -7    .../atlas/repository/migration/ZipFileMigrationImporter.java
  +2    -2    ...apache/atlas/repository/patches/UniqueAttributePatch.java
  +8    -0    ...apache/atlas/repository/store/graph/AtlasEntityStore.java
  +9    -2    ...e/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
  +4    -0    ...s/repository/store/graph/v2/AtlasRelationshipStoreV2.java
  +0    -0    ...che/atlas/repository/store/graph/v2/BulkImporterImpl.java
  +28   -13   ...he/atlas/repository/store/graph/v2/EntityGraphMapper.java
  +28   -0    .../repository/store/graph/v2/bulkimport/ImportStrategy.java
  +122  -0    ...repository/store/graph/v2/bulkimport/MigrationImport.java
  +0    -0    ...s/repository/store/graph/v2/bulkimport/RegularImport.java
  +213  -0    ...pository/store/graph/v2/bulkimport/pc/EntityConsumer.java
  +50   -0    ...y/store/graph/v2/bulkimport/pc/EntityConsumerBuilder.java
  +130  -0    ...y/store/graph/v2/bulkimport/pc/EntityCreationManager.java
  +131  -0    ...pository/store/graph/v2/bulkimport/pc/StatusReporter.java
  +16   -0    ...org/apache/atlas/repository/impexp/ImportServiceTest.java
  +77   -0    ...g/apache/atlas/repository/impexp/MigrationImportTest.java
  +99   -0    ...rg/apache/atlas/repository/impexp/StatusReporterTest.java
  +61   -0    ...ava/org/apache/atlas/repository/impexp/ZipDirectTest.java
  +5    -2    ...che/atlas/repository/impexp/ZipFileResourceTestUtils.java
  +0    -0    repository/src/test/resources/zip-direct-1.zip
  +0    -0    repository/src/test/resources/zip-direct-2.zip
graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraph.java

@@ -116,7 +116,7 @@ public class AtlasJanusGraph implements AtlasGraph<AtlasJanusVertex, AtlasJanusE
             }
         }

-        janusGraph = (StandardJanusGraph) AtlasJanusGraphDatabase.getGraphInstance();
+        janusGraph = (StandardJanusGraph) graphInstance;
     }

     @Override
intg/src/main/java/org/apache/atlas/AtlasConfiguration.java

@@ -64,6 +64,7 @@ public enum AtlasConfiguration {
     CUSTOM_ATTRIBUTE_VALUE_MAX_LENGTH("atlas.custom.attribute.value.max.length", 500),
     LABEL_MAX_LENGTH("atlas.entity.label.max.length", 50),
     IMPORT_TEMP_DIRECTORY("atlas.import.temp.directory", ""),
+    MIGRATION_IMPORT_START_POSITION("atlas.migration.import.start.position", 0),
     LINEAGE_USING_GREMLIN("atlas.lineage.query.use.gremlin", false);

     private static final Configuration APPLICATION_PROPERTIES;
intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java

@@ -44,10 +44,16 @@ public class AtlasImportRequest implements Serializable {
     public  static final String TRANSFORMS_KEY               = "transforms";
     public  static final String TRANSFORMERS_KEY             = "transformers";
     public  static final String OPTION_KEY_REPLICATED_FROM   = "replicatedFrom";
-    private static final String START_POSITION_KEY           = "startPosition";
+    public  static final String OPTION_KEY_MIGRATION         = "migration";
+    public  static final String OPTION_KEY_NUM_WORKERS       = "numWorkers";
+    public  static final String OPTION_KEY_BATCH_SIZE        = "batchSize";
+    public  static final String OPTION_KEY_FORMAT            = "format";
+    public  static final String OPTION_KEY_FORMAT_ZIP_DIRECT = "zipDirect";
+    public  static final String START_POSITION_KEY           = "startPosition";
     private static final String START_GUID_KEY               = "startGuid";
     private static final String FILE_NAME_KEY                = "fileName";
     private static final String UPDATE_TYPE_DEFINITION_KEY   = "updateTypeDefinition";
+    private static final String OPTION_KEY_STREAM_SIZE       = "size";

     private Map<String, String> options;

@@ -108,7 +114,7 @@ public class AtlasImportRequest implements Serializable {
             return null;
         }

-        return (String) this.options.get(key);
+        return this.options.get(key);
     }

     @JsonIgnore

@@ -121,10 +127,41 @@ public class AtlasImportRequest implements Serializable {
         return isReplicationOptionSet() ? options.get(OPTION_KEY_REPLICATED_FROM) : StringUtils.EMPTY;
     }

+    @JsonIgnore
+    public int getOptionKeyNumWorkers() {
+        return getOptionsValue(OPTION_KEY_NUM_WORKERS, 1);
+    }
+
+    @JsonIgnore
+    public int getOptionKeyBatchSize() {
+        return getOptionsValue(OPTION_KEY_BATCH_SIZE, 1);
+    }
+
+    private int getOptionsValue(String optionKeyBatchSize, int defaultValue) {
+        String optionsValue = getOptionForKey(optionKeyBatchSize);
+
+        return StringUtils.isEmpty(optionsValue) ? defaultValue : Integer.valueOf(optionsValue);
+    }
+
     @JsonAnySetter
     public void setOption(String key, String value) {
         if (null == options) {
             options = new HashMap<>();
         }

         options.put(key, value);
-    }}
+    }
+
+    public void setSizeOption(int size) {
+        setOption(OPTION_KEY_STREAM_SIZE, Integer.toString(size));
+    }
+
+    public int getSizeOption() {
+        if (!this.options.containsKey(OPTION_KEY_STREAM_SIZE)) {
+            return 1;
+        }
+
+        return Integer.valueOf(this.options.get(OPTION_KEY_STREAM_SIZE));
+    }
 }
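The commit teaches AtlasImportRequest to carry integer-valued options (numWorkers, batchSize, size) as strings in its options map, falling back to a default when a key is absent or empty. A stand-alone sketch of that parsing pattern, with hypothetical class and method names (this is not the Atlas API itself):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the string-backed integer option pattern used by the import request:
// options live in a Map<String, String>; missing or empty values yield a default.
public class ImportOptions {
    public static int getIntOption(Map<String, String> options, String key, int defaultValue) {
        String value = (options == null) ? null : options.get(key);

        return (value == null || value.isEmpty()) ? defaultValue : Integer.parseInt(value);
    }

    public static void main(String[] args) {
        Map<String, String> options = new HashMap<>();
        options.put("numWorkers", "4");
        options.put("batchSize", "100");

        System.out.println(getIntOption(options, "numWorkers", 1)); // 4
        System.out.println(getIntOption(options, "batchSize", 1));  // 100
        System.out.println(getIntOption(options, "size", 1));       // 1 (default)
    }
}
```

Defaulting to 1 mirrors the diff's behavior: a request with no concurrency options degenerates to a single worker processing one entity at a time.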
intg/src/main/java/org/apache/atlas/pc/WorkItemConsumer.java

@@ -21,6 +21,7 @@ package org.apache.atlas.pc;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

+import java.util.Queue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;

@@ -37,7 +38,7 @@ public abstract class WorkItemConsumer<T> implements Runnable {
     private final AtomicBoolean  isDirty           = new AtomicBoolean(false);
     private final AtomicLong     maxCommitTimeInMs = new AtomicLong(DEFAULT_COMMIT_TIME_IN_MS);
     private       CountDownLatch countdownLatch;
-    private       BlockingQueue<Object> results;
+    private       Queue<Object>  results;

     public WorkItemConsumer(BlockingQueue<T> queue) {
         this.queue = queue;

@@ -101,11 +102,7 @@ public abstract class WorkItemConsumer<T> implements Runnable {
     protected abstract void processItem(T item);

     protected void addResult(Object value) {
-        try {
-            results.put(value);
-        } catch (InterruptedException e) {
-            LOG.error("Interrupted while adding result: {}", value);
-        }
+        results.add(value);
     }

     protected void updateCommitTime(long commitTime) {

@@ -118,7 +115,7 @@ public abstract class WorkItemConsumer<T> implements Runnable {
         this.countdownLatch = countdownLatch;
     }

-    public <V> void setResults(BlockingQueue<Object> queue) {
+    public <V> void setResults(Queue<Object> queue) {
         this.results = queue;
     }
 }
intg/src/main/java/org/apache/atlas/pc/WorkItemManager.java

@@ -23,6 +23,7 @@ import org.slf4j.LoggerFactory;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Queue;
 import java.util.concurrent.*;

 public class WorkItemManager<T, U extends WorkItemConsumer> {

@@ -33,7 +34,7 @@ public class WorkItemManager<T, U extends WorkItemConsumer> {
     private final ExecutorService service;
     private final List<U>         consumers = new ArrayList<>();
     private       CountDownLatch  countdownLatch;
-    private       BlockingQueue<Object> resultsQueue;
+    private       Queue<Object>   resultsQueue;

     public WorkItemManager(WorkItemBuilder builder, String namePrefix, int batchSize, int numWorkers, boolean collectResults) {
         this.numWorkers = numWorkers;

@@ -49,13 +50,13 @@ public class WorkItemManager<T, U extends WorkItemConsumer> {
         this(builder, "workItemConsumer", batchSize, numWorkers, false);
     }

-    public void setResultsCollection(BlockingQueue<Object> resultsQueue) {
+    public void setResultsCollection(Queue<Object> resultsQueue) {
         this.resultsQueue = resultsQueue;
     }

     private void createConsumers(WorkItemBuilder builder, int numWorkers, boolean collectResults) {
         if (collectResults) {
-            setResultsCollection(new LinkedBlockingQueue<>());
+            setResultsCollection(new ConcurrentLinkedQueue<>());
         }

         for (int i = 0; i < numWorkers; i++) {

@@ -124,7 +125,7 @@ public class WorkItemManager<T, U extends WorkItemConsumer> {
         LOG.info("WorkItemManager: Shutdown done!");
     }

-    public BlockingQueue getResults() {
+    public Queue getResults() {
         return this.resultsQueue;
     }
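In WorkItemConsumer and WorkItemManager the commit swaps the results collection from a LinkedBlockingQueue to a ConcurrentLinkedQueue: add() on a ConcurrentLinkedQueue is lock-free, unbounded, and never throws InterruptedException, so the consumer's addResult() no longer needs a try/catch around a blocking put(). A minimal stand-alone sketch of that design choice (the class here is illustrative, not Atlas code):

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Demonstrates collecting results from several worker threads into a
// ConcurrentLinkedQueue: adds never block and need no interruption handling.
public class ResultsQueueDemo {
    private final Queue<Object> results = new ConcurrentLinkedQueue<>();

    public void addResult(Object value) {
        results.add(value); // lock-free, unbounded; cannot stall the worker thread
    }

    public int resultCount() {
        return results.size();
    }

    public static void main(String[] args) throws InterruptedException {
        ResultsQueueDemo demo    = new ResultsQueueDemo();
        Thread[]         workers = new Thread[4];

        for (int i = 0; i < workers.length; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    demo.addResult(j);
                }
            });
            workers[i].start();
        }

        for (Thread w : workers) {
            w.join();
        }

        System.out.println(demo.resultCount()); // 4000
    }
}
```

The trade-off is that an unbounded non-blocking queue gives up backpressure; that fits a results channel that is drained after the workers finish, rather than a producer/consumer handoff.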
repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java

@@ -199,6 +199,10 @@ public class GraphTransactionInterceptor implements MethodInterceptor {
         return cache.get(guid);
     }

+    public static void clearCache() {
+        guidVertexCache.get().clear();
+    }
+
     boolean logException(Throwable t) {
         if (t instanceof AtlasBaseException) {
             Response.Status httpCode = ((AtlasBaseException) t).getAtlasErrorCode().getHttpCode();
repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java

@@ -247,7 +247,8 @@ public class AuditsWriter {
         }

         updateReplicationAttribute(replicationOptionState, sourceServerName, sourceServerFullName, entityGuids,
-                Constants.ATTR_NAME_REPLICATED_FROM, result.getExportResult().getChangeMarker());
+                Constants.ATTR_NAME_REPLICATED_FROM,
+                (result.getExportResult() != null) ? result.getExportResult().getChangeMarker() : 0);
     }

     public void add(String userName, String sourceCluster, long startTime,
repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java

@@ -92,7 +92,7 @@ public class ImportService {
             request = new AtlasImportRequest();
         }

-        EntityImportStream source = createZipSource(inputStream, AtlasConfiguration.IMPORT_TEMP_DIRECTORY.getString());
+        EntityImportStream source = createZipSource(request, inputStream, AtlasConfiguration.IMPORT_TEMP_DIRECTORY.getString());

         return run(source, request, userName, hostName, requestingIP);
     }

@@ -248,8 +248,18 @@ public class ImportService {
         return (int) (endTime - startTime);
     }

-    private EntityImportStream createZipSource(InputStream inputStream, String configuredTemporaryDirectory) throws AtlasBaseException {
+    private EntityImportStream createZipSource(AtlasImportRequest request, InputStream inputStream, String configuredTemporaryDirectory) throws AtlasBaseException {
         try {
+            if (request.getOptions().containsKey(AtlasImportRequest.OPTION_KEY_MIGRATION)) {
+                LOG.info("Migration mode: Detected...", request.getOptions().get("size"));
+
+                return getZipDirectEntityImportStream(request, inputStream);
+            }
+
+            if (request.getOptions().containsKey(AtlasImportRequest.OPTION_KEY_FORMAT) &&
+                    request.getOptions().get(AtlasImportRequest.OPTION_KEY_FORMAT).equals(AtlasImportRequest.OPTION_KEY_FORMAT_ZIP_DIRECT)) {
+                return getZipDirectEntityImportStream(request, inputStream);
+            }
+
             if (StringUtils.isEmpty(configuredTemporaryDirectory)) {
                 return new ZipSource(inputStream);
             }

@@ -260,9 +270,15 @@ public class ImportService {
         }
     }

+    private EntityImportStream getZipDirectEntityImportStream(AtlasImportRequest request, InputStream inputStream) throws IOException, AtlasBaseException {
+        ZipSourceDirect zipSourceDirect = new ZipSourceDirect(inputStream, request.getSizeOption());
+
+        LOG.info("Using ZipSourceDirect: Size: {} entities", zipSourceDirect.size());
+
+        return zipSourceDirect;
+    }
+
     @VisibleForTesting
     boolean checkHiveTableIncrementalSkipLineage(AtlasImportRequest importRequest, AtlasExportRequest exportRequest) {
-        if (CollectionUtils.isEmpty(exportRequest.getItemsToExport())) {
+        if (exportRequest == null || CollectionUtils.isEmpty(exportRequest.getItemsToExport())) {
             return false;
         }
repository/src/main/java/org/apache/atlas/repository/impexp/ZipExportFileNames.java

@@ -31,4 +31,8 @@ public enum ZipExportFileNames {
     public String toString() {
         return this.name;
     }
+
+    public String toEntryFileName() {
+        return this.name + ".json";
+    }
 }
repository/src/main/java/org/apache/atlas/repository/impexp/ZipSourceDirect.java (new file, mode 100644)

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.atlas.repository.impexp;

import org.apache.atlas.entitytransform.BaseEntityHandler;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.impexp.AtlasExportResult;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.apache.atlas.repository.store.graph.v2.EntityImportStream;
import org.apache.atlas.type.AtlasType;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;

import static org.apache.atlas.AtlasErrorCode.IMPORT_ATTEMPTING_EMPTY_ZIP;

public class ZipSourceDirect implements EntityImportStream {
    private static final Logger LOG = LoggerFactory.getLogger(ZipSourceDirect.class);

    private final ZipInputStream          zipInputStream;
    private       int                     currentPosition;
    private       ImportTransforms        importTransform;
    private       List<BaseEntityHandler> entityHandlers;
    private       AtlasTypesDef           typesDef;
    private       ZipEntry                zipEntryNext;
    private       int                     streamSize = 1;

    public ZipSourceDirect(InputStream inputStream, int streamSize) throws IOException, AtlasBaseException {
        this.zipInputStream = new ZipInputStream(inputStream);
        this.streamSize     = streamSize;

        prepareStreamForFetch();
    }

    @Override
    public ImportTransforms getImportTransform() {
        return this.importTransform;
    }

    @Override
    public void setImportTransform(ImportTransforms importTransform) {
        this.importTransform = importTransform;
    }

    @Override
    public List<BaseEntityHandler> getEntityHandlers() {
        return entityHandlers;
    }

    @Override
    public void setEntityHandlers(List<BaseEntityHandler> entityHandlers) {
        this.entityHandlers = entityHandlers;
    }

    @Override
    public AtlasTypesDef getTypesDef() throws AtlasBaseException {
        return this.typesDef;
    }

    @Override
    public AtlasExportResult getExportResult() throws AtlasBaseException {
        return new AtlasExportResult();
    }

    @Override
    public List<String> getCreationOrder() {
        return new ArrayList<>();
    }

    @Override
    public int getPosition() {
        return currentPosition;
    }

    @Override
    public AtlasEntity.AtlasEntityWithExtInfo getEntityWithExtInfo(String json) throws AtlasBaseException {
        if (StringUtils.isEmpty(json)) {
            return null;
        }

        AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = convertFromJson(AtlasEntity.AtlasEntityWithExtInfo.class, json);

        if (importTransform != null) {
            entityWithExtInfo = importTransform.apply(entityWithExtInfo);
        }

        if (entityHandlers != null) {
            applyTransformers(entityWithExtInfo);
        }

        return entityWithExtInfo;
    }

    @Override
    public boolean hasNext() {
        return (this.zipEntryNext != null
                && !zipEntryNext.getName().equals(ZipExportFileNames.ATLAS_EXPORT_ORDER_NAME.toEntryFileName())
                && !zipEntryNext.getName().equals(ZipExportFileNames.ATLAS_EXPORT_INFO_NAME.toEntryFileName()));
    }

    @Override
    public AtlasEntity next() {
        AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = getNextEntityWithExtInfo();

        return entityWithExtInfo != null ? entityWithExtInfo.getEntity() : null;
    }

    @Override
    public AtlasEntity.AtlasEntityWithExtInfo getNextEntityWithExtInfo() {
        try {
            if (hasNext()) {
                String json = moveNext();

                return getEntityWithExtInfo(json);
            }
        } catch (AtlasBaseException e) {
            LOG.error("getNextEntityWithExtInfo", e);
        }

        return null;
    }

    @Override
    public void reset() {
        currentPosition = 0;
    }

    @Override
    public AtlasEntity getByGuid(String guid) {
        try {
            return getEntity(guid);
        } catch (AtlasBaseException e) {
            LOG.error("getByGuid: {} failed!", guid, e);

            return null;
        }
    }

    @Override
    public void onImportComplete(String guid) {
    }

    @Override
    public void setPosition(int index) {
        try {
            for (int i = 0; i < index; i++) {
                moveNextEntry();
            }
        } catch (IOException e) {
            LOG.error("Error setting position: {}. Position may be beyond the stream size.", index);
        }
    }

    @Override
    public void setPositionUsingEntityGuid(String guid) {
    }

    @Override
    public void close() {
    }

    private void applyTransformers(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) {
        if (entityWithExtInfo == null) {
            return;
        }

        transform(entityWithExtInfo.getEntity());

        if (MapUtils.isNotEmpty(entityWithExtInfo.getReferredEntities())) {
            for (AtlasEntity e : entityWithExtInfo.getReferredEntities().values()) {
                transform(e);
            }
        }
    }

    private void transform(AtlasEntity e) {
        for (BaseEntityHandler handler : entityHandlers) {
            handler.transform(e);
        }
    }

    private <T> T convertFromJson(Class<T> clazz, String jsonData) throws AtlasBaseException {
        try {
            return AtlasType.fromJson(jsonData, clazz);
        } catch (Exception e) {
            throw new AtlasBaseException("Error converting file to JSON.", e);
        }
    }

    private AtlasEntity getEntity(String guid) throws AtlasBaseException {
        AtlasEntity.AtlasEntityWithExtInfo extInfo = getEntityWithExtInfo(guid);

        return (extInfo != null) ? extInfo.getEntity() : null;
    }

    public int size() {
        return this.streamSize;
    }

    private String moveNext() {
        try {
            moveNextEntry();

            return getJsonPayloadFromZipEntryStream(this.zipInputStream);
        } catch (IOException e) {
            LOG.error("moveNext failed!", e);
        }

        return null;
    }

    private void moveNextEntry() throws IOException {
        this.zipEntryNext = this.zipInputStream.getNextEntry();
        this.currentPosition++;
    }

    private void prepareStreamForFetch() throws AtlasBaseException, IOException {
        moveNextEntry();

        if (this.zipEntryNext == null) {
            throw new AtlasBaseException(IMPORT_ATTEMPTING_EMPTY_ZIP, "Attempting to import empty ZIP.");
        }

        if (this.zipEntryNext.getName().equals(ZipExportFileNames.ATLAS_TYPESDEF_NAME.toEntryFileName())) {
            String json = getJsonPayloadFromZipEntryStream(this.zipInputStream);

            this.typesDef = AtlasType.fromJson(json, AtlasTypesDef.class);
        }
    }

    private String getJsonPayloadFromZipEntryStream(ZipInputStream zipInputStream) {
        try {
            final int             BUFFER_LENGTH = 4096;
            byte[]                buf           = new byte[BUFFER_LENGTH];
            int                   n;
            ByteArrayOutputStream bos           = new ByteArrayOutputStream();

            while ((n = zipInputStream.read(buf, 0, BUFFER_LENGTH)) > -1) {
                bos.write(buf, 0, n);
            }

            return bos.toString();
        } catch (IOException ex) {
            LOG.error("Error fetching string from entry!", ex);
        }

        return null;
    }
}
repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java

@@ -24,6 +24,7 @@ import org.apache.atlas.RequestContext;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.model.impexp.AtlasImportRequest;
 import org.apache.atlas.repository.impexp.ImportService;
+import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -32,11 +33,20 @@ import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.InetAddress;
+import java.util.zip.ZipFile;
+
+import static org.apache.atlas.AtlasConfiguration.MIGRATION_IMPORT_START_POSITION;

 public class ZipFileMigrationImporter implements Runnable {
     private static final Logger LOG = LoggerFactory.getLogger(ZipFileMigrationImporter.class);

-    private static String ENV_USER_NAME = "user.name";
+    private static final String APPLICATION_PROPERTY_MIGRATION_NUMER_OF_WORKERS = "atlas.migration.mode.workers";
+    private static final String APPLICATION_PROPERTY_MIGRATION_BATCH_SIZE       = "atlas.migration.mode.batch.size";
+    private static final String DEFAULT_NUMBER_OF_WORKDERS                      = "4";
+    private static final String DEFAULT_BATCH_SIZE                              = "100";
+    private static final String ZIP_FILE_COMMENT                                = "streamSize";
+
+    private final static String ENV_USER_NAME = "user.name";

     private final ImportService importService;
     private final String        fileToImport;

@@ -52,7 +62,8 @@ public class ZipFileMigrationImporter implements Runnable {
             FileWatcher fileWatcher = new FileWatcher(fileToImport);
             fileWatcher.start();

-            performImport(new FileInputStream(new File(fileToImport)));
+            int streamSize = getStreamSizeFromComment(fileToImport);
+            performImport(new FileInputStream(new File(fileToImport)), streamSize);
         } catch (IOException e) {
             LOG.error("Migration Import: IO Error!", e);
         } catch (AtlasBaseException e) {

@@ -60,19 +71,44 @@ public class ZipFileMigrationImporter implements Runnable {
         }
     }

-    private void performImport(InputStream fs) throws AtlasBaseException {
+    private int getStreamSizeFromComment(String fileToImport) {
+        int ret = 1;
+        try {
+            ZipFile zipFile           = new ZipFile(fileToImport);
+            String  streamSizeComment = zipFile.getComment();
+
+            ret = processZipFileStreamSizeComment(streamSizeComment);
+            zipFile.close();
+        } catch (IOException e) {
+            LOG.error("Error opening ZIP file: {}", fileToImport, e);
+        }
+
+        return ret;
+    }
+
+    private int processZipFileStreamSizeComment(String streamSizeComment) {
+        if (!StringUtils.isNotEmpty(streamSizeComment) || !StringUtils.startsWith(streamSizeComment, ZIP_FILE_COMMENT)) {
+            return 1;
+        }
+
+        String s = StringUtils.substringAfter(streamSizeComment, ":");
+        LOG.debug("ZipFileMigrationImporter: streamSize: {}", streamSizeComment);
+
+        return Integer.valueOf(s);
+    }
+
+    private void performImport(InputStream fs, int streamSize) throws AtlasBaseException {
         try {
             LOG.info("Migration Import: {}: Starting...", fileToImport);

             RequestContext.get().setUser(getUserNameFromEnvironment(), null);

-            importService.run(fs, getImportRequest(),
+            importService.run(fs, getImportRequest(streamSize),
                     getUserNameFromEnvironment(),
                     InetAddress.getLocalHost().getHostName(),
                     InetAddress.getLocalHost().getHostAddress());
         } catch (Exception ex) {
-            LOG.error("Error loading zip for migration", ex);
+            LOG.error("Migration Import: Error loading zip for migration!", ex);
             throw new AtlasBaseException(ex);
         } finally {
             LOG.info("Migration Import: {}: Done!", fileToImport);

@@ -83,8 +119,16 @@ public class ZipFileMigrationImporter implements Runnable {
         return System.getProperty(ENV_USER_NAME);
     }

-    private AtlasImportRequest getImportRequest() throws AtlasException {
-        return new AtlasImportRequest();
+    private AtlasImportRequest getImportRequest(int streamSize) throws AtlasException {
+        AtlasImportRequest request = new AtlasImportRequest();
+
+        request.setSizeOption(streamSize);
+        request.setOption(AtlasImportRequest.OPTION_KEY_MIGRATION, "true");
+        request.setOption(AtlasImportRequest.OPTION_KEY_NUM_WORKERS, getPropertyValue(APPLICATION_PROPERTY_MIGRATION_NUMER_OF_WORKERS, DEFAULT_NUMBER_OF_WORKDERS));
+        request.setOption(AtlasImportRequest.OPTION_KEY_BATCH_SIZE, getPropertyValue(APPLICATION_PROPERTY_MIGRATION_BATCH_SIZE, DEFAULT_BATCH_SIZE));
+        request.setOption(AtlasImportRequest.START_POSITION_KEY, Integer.toString(MIGRATION_IMPORT_START_POSITION.getInt()));
+
+        return request;
     }

     private String getPropertyValue(String property, String defaultValue) throws AtlasException {
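The migration importer reads the entity count from the ZIP archive's comment field, which the exporter writes as "streamSize:&lt;n&gt;"; anything missing or malformed falls back to 1. A stand-alone sketch of that parsing convention (class name and the defensive try/catch are my additions; the Atlas code uses commons-lang StringUtils instead):

```java
// Parses the "streamSize:<n>" archive-comment convention used to pass the
// number of entities in a migration export to the importer.
public class StreamSizeComment {
    private static final String ZIP_FILE_COMMENT = "streamSize";

    public static int parse(String comment) {
        // Missing comment, or one not written by the exporter: default to 1.
        if (comment == null || !comment.startsWith(ZIP_FILE_COMMENT)) {
            return 1;
        }

        String s = comment.substring(comment.indexOf(':') + 1);

        try {
            return Integer.parseInt(s.trim());
        } catch (NumberFormatException e) {
            return 1; // malformed count: fall back rather than fail the import
        }
    }

    public static void main(String[] args) {
        System.out.println(parse("streamSize:269")); // 269
        System.out.println(parse(null));             // 1
        System.out.println(parse("other comment"));  // 1
    }
}
```

Carrying the size in the archive comment lets ZipSourceDirect report progress over a forward-only ZipInputStream without first scanning the whole archive.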
repository/src/main/java/org/apache/atlas/repository/patches/UniqueAttributePatch.java

@@ -81,9 +81,9 @@ public class UniqueAttributePatch extends AtlasPatchHandler {
             AtlasGraph graph = getGraph();

             for (AtlasEntityType entityType : typeRegistry.getAllEntityTypes()) {
-                LOG.info("finding entities of type {}", entityType.getTypeName());
+                LOG.info("finding entities of type: {}", entityType.getTypeName());

                 Iterable<Object> iterable = graph.query().has(Constants.ENTITY_TYPE_PROPERTY_KEY, entityType.getTypeName()).vertexIds();
+                LOG.info("found entities of type: {}", entityType.getTypeName());

                 int count = 0;
                 for (Iterator<Object> iter = iterable.iterator(); iter.hasNext(); ) {
repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java

@@ -150,6 +150,14 @@ public interface AtlasEntityStore {
     EntityMutationResponse createOrUpdateForImport(EntityStream entityStream) throws AtlasBaseException;

+    /**
+     * Create or update entities with parameters necessary for import process without commit. Caller will have to take care of commit.
+     * @param entityStream AtlasEntityStream
+     * @return EntityMutationResponse Entity mutations operations with the corresponding set of entities on which these operations were performed
+     * @throws AtlasBaseException
+     */
+    EntityMutationResponse createOrUpdateForImportNoCommit(EntityStream entityStream) throws AtlasBaseException;
+
     /**
      * Update a single entity
      * @param objectId          ID of the entity
      * @param updatedEntityInfo updated entity information
repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java

@@ -332,6 +332,11 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {
     }

+    @Override
+    public EntityMutationResponse createOrUpdateForImportNoCommit(EntityStream entityStream) throws AtlasBaseException {
+        return createOrUpdate(entityStream, false, true, true);
+    }
+
     @Override
     @GraphTransaction
     public EntityMutationResponse updateEntity(AtlasObjectId objectId, AtlasEntityWithExtInfo updatedEntityInfo, boolean isPartialUpdate) throws AtlasBaseException {
         if (LOG.isDebugEnabled()) {

@@ -1210,8 +1215,10 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {
         ret.setGuidAssignments(context.getGuidAssignments());

-        // Notify the change listeners
-        entityChangeNotifier.onEntitiesMutated(ret, RequestContext.get().isImportInProgress());
+        if (!RequestContext.get().isImportInProgress()) {
+            // Notify the change listeners
+            entityChangeNotifier.onEntitiesMutated(ret, RequestContext.get().isImportInProgress());
+        }

         if (LOG.isDebugEnabled()) {
             LOG.debug("<== createOrUpdate()");
repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipStoreV2.java

@@ -929,6 +929,10 @@ public class AtlasRelationshipStoreV2 implements AtlasRelationshipStore {
     }

     private void sendNotifications(AtlasRelationship ret, OperationType relationshipUpdate) throws AtlasBaseException {
+        if (entityChangeNotifier == null) {
+            return;
+        }
+
         entityChangeNotifier.notifyPropagatedEntities();
         if (notificationsEnabled) {
             entityChangeNotifier.notifyRelationshipMutation(ret, relationshipUpdate);
repository/src/main/java/org/apache/atlas/repository/store/graph/v2/BulkImporterImpl.java

(diff collapsed; not captured in this page)
repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
View file @
a2ccfb9f
...
...
@@ -361,7 +361,9 @@ public class EntityGraphMapper {
updateLabels
(
vertex
,
labels
);
entityChangeNotifier
.
onLabelsUpdatedFromEntity
(
GraphHelper
.
getGuid
(
vertex
),
addedLabels
,
removedLabels
);
if
(
entityChangeNotifier
!=
null
)
{
entityChangeNotifier
.
onLabelsUpdatedFromEntity
(
GraphHelper
.
getGuid
(
vertex
),
addedLabels
,
removedLabels
);
}
}
public
void
addLabels
(
AtlasVertex
vertex
,
Set
<
String
>
labels
)
throws
AtlasBaseException
{
...
...
@@ -378,7 +380,10 @@ public class EntityGraphMapper {
if
(!
updatedLabels
.
equals
(
existingLabels
))
{
updateLabels
(
vertex
,
updatedLabels
);
updatedLabels
.
removeAll
(
existingLabels
);
entityChangeNotifier
.
onLabelsUpdatedFromEntity
(
GraphHelper
.
getGuid
(
vertex
),
updatedLabels
,
null
);
if
(
entityChangeNotifier
!=
null
)
{
entityChangeNotifier
.
onLabelsUpdatedFromEntity
(
GraphHelper
.
getGuid
(
vertex
),
updatedLabels
,
null
);
}
}
}
}
...
...
@@ -395,7 +400,10 @@ public class EntityGraphMapper {
if
(!
updatedLabels
.
equals
(
existingLabels
))
{
updateLabels
(
vertex
,
updatedLabels
);
existingLabels
.
removeAll
(
updatedLabels
);
entityChangeNotifier
.
onLabelsUpdatedFromEntity
(
GraphHelper
.
getGuid
(
vertex
),
null
,
existingLabels
);
if
(
entityChangeNotifier
!=
null
)
{
entityChangeNotifier
.
onLabelsUpdatedFromEntity
(
GraphHelper
.
getGuid
(
vertex
),
null
,
existingLabels
);
}
}
}
}
...
...
@@ -1948,7 +1956,9 @@ public class EntityGraphMapper {
             Set<AtlasVertex>  vertices           = addedClassifications.get(classification);
             List<AtlasEntity> propagatedEntities = updateClassificationText(classification, vertices);

-            entityChangeNotifier.onClassificationsAddedToEntities(propagatedEntities, Collections.singletonList(classification));
+            if (entityChangeNotifier != null) {
+                entityChangeNotifier.onClassificationsAddedToEntities(propagatedEntities, Collections.singletonList(classification));
+            }
         }

         RequestContext.get().endMetricRecord(metric);
...
...
@@ -2056,7 +2066,10 @@ public class EntityGraphMapper {
AtlasEntity
entity
=
updateClassificationText
(
entry
.
getKey
());
List
<
AtlasClassification
>
deletedClassificationNames
=
entry
.
getValue
();
entityChangeNotifier
.
onClassificationDeletedFromEntity
(
entity
,
deletedClassificationNames
);
if
(
entityChangeNotifier
!=
null
)
{
entityChangeNotifier
.
onClassificationDeletedFromEntity
(
entity
,
deletedClassificationNames
);
}
}
}
...
...
@@ -2283,17 +2296,19 @@ public class EntityGraphMapper {
             notificationVertices.addAll(entitiesToPropagateTo);
         }

-        for (AtlasVertex vertex : notificationVertices) {
-            String      entityGuid = GraphHelper.getGuid(vertex);
-            AtlasEntity entity     = instanceConverter.getAndCacheEntity(entityGuid, ENTITY_CHANGE_NOTIFY_IGNORE_RELATIONSHIP_ATTRIBUTES);
+        if (entityChangeNotifier != null) {
+            for (AtlasVertex vertex : notificationVertices) {
+                String      entityGuid = GraphHelper.getGuid(vertex);
+                AtlasEntity entity     = instanceConverter.getAndCacheEntity(entityGuid, ENTITY_CHANGE_NOTIFY_IGNORE_RELATIONSHIP_ATTRIBUTES);

-            if (isActive(entity)) {
-                vertex.setProperty(CLASSIFICATION_TEXT_KEY, fullTextMapperV2.getClassificationTextForEntity(entity));
-                entityChangeNotifier.onClassificationUpdatedToEntity(entity, updatedClassifications);
+                if (isActive(entity)) {
+                    vertex.setProperty(CLASSIFICATION_TEXT_KEY, fullTextMapperV2.getClassificationTextForEntity(entity));
+                    entityChangeNotifier.onClassificationUpdatedToEntity(entity, updatedClassifications);
+                }
             }
         }

-        if (MapUtils.isNotEmpty(removedPropagations)) {
+        if (entityChangeNotifier != null && MapUtils.isNotEmpty(removedPropagations)) {
             for (AtlasClassification classification : removedPropagations.keySet()) {
                 List<AtlasVertex> propagatedVertices = removedPropagations.get(classification);
                 List<AtlasEntity> propagatedEntities = updateClassificationText(classification, propagatedVertices);
...
...
@@ -2526,7 +2541,7 @@ public class EntityGraphMapper {
     private List<AtlasEntity> updateClassificationText(AtlasClassification classification, Collection<AtlasVertex> propagatedVertices) throws AtlasBaseException {
         List<AtlasEntity> propagatedEntities = new ArrayList<>();

-        if (CollectionUtils.isNotEmpty(propagatedVertices)) {
+        if (fullTextMapperV2 != null && CollectionUtils.isNotEmpty(propagatedVertices)) {
             for (AtlasVertex vertex : propagatedVertices) {
                 AtlasEntity entity = instanceConverter.getAndCacheEntity(GraphHelper.getGuid(vertex), ENTITY_CHANGE_NOTIFY_IGNORE_RELATIONSHIP_ATTRIBUTES);
...
...
repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/ImportStrategy.java (new file)
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.store.graph.v2.bulkimport;

import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.impexp.AtlasImportResult;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.repository.store.graph.v2.EntityImportStream;

public abstract class ImportStrategy {
    public abstract EntityMutationResponse run(EntityImportStream entityStream, AtlasImportResult importResult) throws AtlasBaseException;
}
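The import path chooses an `ImportStrategy` subclass (the `MigrationImport` added below vs. the regular path) based on the request options. A minimal, self-contained sketch of that strategy-pattern dispatch; all class names here are simplified stand-ins for illustration, not the Atlas types:

```java
import java.util.Map;

// Illustrative stand-ins for ImportStrategy and its two subclasses.
abstract class Strategy {
    abstract String run(String payload);
}

class MigrationStrategy extends Strategy {
    String run(String payload) { return "migration:" + payload; }
}

class RegularStrategy extends Strategy {
    String run(String payload) { return "regular:" + payload; }
}

public class ImportStrategySketch {
    // Pick a strategy from request options, similar in spirit to how the
    // import service can select MigrationImport when "migration" is set.
    public static Strategy select(Map<String, String> options) {
        boolean migration = "true".equals(options.get("migration"));
        return migration ? new MigrationStrategy() : new RegularStrategy();
    }

    public static void main(String[] args) {
        System.out.println(select(Map.of("migration", "true")).run("db1")); // migration:db1
        System.out.println(select(Map.of()).run("db1"));                    // regular:db1
    }
}
```

The abstract base keeps the caller independent of which ingest path runs, which is what lets this commit add a concurrent migration path without touching existing import callers.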
repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/MigrationImport.java (new file)
package org.apache.atlas.repository.store.graph.v2.bulkimport;

import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.impexp.AtlasImportResult;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.repository.converters.AtlasFormatConverters;
import org.apache.atlas.repository.converters.AtlasInstanceConverter;
import org.apache.atlas.repository.graph.AtlasGraphProvider;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.repository.store.graph.AtlasRelationshipStore;
import org.apache.atlas.repository.store.graph.v1.DeleteHandlerDelegate;
import org.apache.atlas.repository.store.graph.v2.AtlasEntityStoreV2;
import org.apache.atlas.repository.store.graph.v2.AtlasRelationshipStoreV2;
import org.apache.atlas.repository.store.graph.v2.EntityGraphMapper;
import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
import org.apache.atlas.repository.store.graph.v2.EntityImportStream;
import org.apache.atlas.repository.store.graph.v2.bulkimport.pc.EntityConsumerBuilder;
import org.apache.atlas.repository.store.graph.v2.bulkimport.pc.EntityCreationManager;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MigrationImport extends ImportStrategy {
    private static final Logger LOG = LoggerFactory.getLogger(MigrationImport.class);

    private final AtlasTypeRegistry typeRegistry;
    private AtlasGraph           atlasGraph;
    private EntityGraphRetriever entityGraphRetriever;
    private EntityGraphMapper    entityGraphMapper;
    private AtlasEntityStore     entityStore;

    public MigrationImport(AtlasGraphProvider atlasGraphProvider, AtlasTypeRegistry typeRegistry) {
        this.typeRegistry = typeRegistry;
        setupEntityStore(atlasGraphProvider, typeRegistry);
        LOG.info("MigrationImport: Using bulkLoading...");
    }

    public EntityMutationResponse run(EntityImportStream entityStream, AtlasImportResult importResult) throws AtlasBaseException {
        if (entityStream == null || !entityStream.hasNext()) {
            throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "no entities to create/update.");
        }

        if (importResult.getRequest() == null) {
            throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "importResult should contain request");
        }

        int                    index      = 0;
        int                    streamSize = entityStream.size();
        EntityMutationResponse ret        = new EntityMutationResponse();
        EntityCreationManager creationManager = createEntityCreationManager(atlasGraph, importResult, streamSize);

        try {
            LOG.info("Migration Import: Size: {}: Starting...", streamSize);
            index = creationManager.read(entityStream);
            creationManager.drain();
            creationManager.extractResults();
        } catch (Exception ex) {
            LOG.error("Migration Import: Error: Current position: {}", index, ex);
        } finally {
            shutdownEntityCreationManager(creationManager);
        }

        LOG.info("Migration Import: Size: {}: Done!", streamSize);
        return ret;
    }

    private EntityCreationManager createEntityCreationManager(AtlasGraph threadedAtlasGraph, AtlasImportResult importResult, int streamSize) {
        int batchSize  = importResult.getRequest().getOptionKeyBatchSize();
        int numWorkers = getNumWorkers(importResult.getRequest().getOptionKeyNumWorkers());

        EntityConsumerBuilder consumerBuilder =
                new EntityConsumerBuilder(threadedAtlasGraph, entityStore, entityGraphRetriever, typeRegistry, batchSize);

        return new EntityCreationManager(consumerBuilder, batchSize, numWorkers, importResult, streamSize);
    }

    private static int getNumWorkers(int numWorkersFromOptions) {
        int ret = (numWorkersFromOptions > 0) ? numWorkersFromOptions : 1;
        LOG.info("Migration Import: Setting numWorkers: {}", ret);
        return ret;
    }

    private void setupEntityStore(AtlasGraphProvider atlasGraphProvider, AtlasTypeRegistry typeRegistry) {
        this.entityGraphRetriever = new EntityGraphRetriever(typeRegistry);
        this.atlasGraph           = atlasGraphProvider.getBulkLoading();

        DeleteHandlerDelegate  deleteDelegate    = new DeleteHandlerDelegate(typeRegistry);
        AtlasRelationshipStore relationshipStore = new AtlasRelationshipStoreV2(typeRegistry, deleteDelegate, null);
        AtlasFormatConverters  formatConverters  = new AtlasFormatConverters(typeRegistry);
        AtlasInstanceConverter instanceConverter = new AtlasInstanceConverter(typeRegistry, formatConverters);

        this.entityGraphMapper = new EntityGraphMapper(deleteDelegate, typeRegistry, atlasGraph, relationshipStore, null, instanceConverter, null);
        this.entityStore       = new AtlasEntityStoreV2(deleteDelegate, typeRegistry, null, entityGraphMapper);
    }

    private void shutdownEntityCreationManager(EntityCreationManager creationManager) {
        try {
            creationManager.shutdown();
        } catch (InterruptedException e) {
            LOG.error("Migration Import: Shutdown: Interrupted!", e);
        }
    }
}
repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/RegularImport.java (new file; diff collapsed, not shown)
repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumer.java (new file)
package org.apache.atlas.repository.store.graph.v2.bulkimport.pc;

import org.apache.atlas.GraphTransactionInterceptor;
import org.apache.atlas.RequestContext;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.pc.WorkItemConsumer;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasSchemaViolationException;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.repository.store.graph.v2.AtlasEntityStreamForImport;
import org.apache.atlas.repository.store.graph.v2.BulkImporterImpl;
import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

public class EntityConsumer extends WorkItemConsumer<AtlasEntity.AtlasEntityWithExtInfo> {
    private static final Logger LOG = LoggerFactory.getLogger(EntityConsumer.class);

    private static final int MAX_COMMIT_RETRY_COUNT = 3;

    private final int batchSize;
    private AtomicLong counter      = new AtomicLong(1);
    private AtomicLong currentBatch = new AtomicLong(1);

    private final AtlasGraph           atlasGraph;
    private final AtlasEntityStore     entityStoreV2;
    private final AtlasTypeRegistry    typeRegistry;
    private final EntityGraphRetriever entityGraphRetriever;

    private List<AtlasEntity.AtlasEntityWithExtInfo> entityBuffer = new ArrayList<>();
    private List<EntityMutationResponse>             localResults = new ArrayList<>();

    public EntityConsumer(AtlasGraph atlasGraph, AtlasEntityStore entityStore,
                          EntityGraphRetriever entityGraphRetriever, AtlasTypeRegistry typeRegistry,
                          BlockingQueue queue, int batchSize) {
        super(queue);

        this.atlasGraph           = atlasGraph;
        this.entityStoreV2        = entityStore;
        this.entityGraphRetriever = entityGraphRetriever;
        this.typeRegistry         = typeRegistry;
        this.batchSize            = batchSize;
    }

    @Override
    protected void processItem(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) {
        int delta = (MapUtils.isEmpty(entityWithExtInfo.getReferredEntities())
                ? 1
                : entityWithExtInfo.getReferredEntities().size()) + 1;

        long currentCount = counter.addAndGet(delta);
        currentBatch.addAndGet(delta);
        entityBuffer.add(entityWithExtInfo);

        try {
            processEntity(entityWithExtInfo, currentCount);
            attemptCommit();
        } catch (Exception e) {
            LOG.info("Data loss: Please re-submit!", e);
        }
    }

    private void processEntity(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo, long currentCount) {
        try {
            RequestContext.get().setImportInProgress(true);

            AtlasEntityStreamForImport oneEntityStream = new AtlasEntityStreamForImport(entityWithExtInfo, null);

            LOG.debug("Processing: {}", currentCount);
            EntityMutationResponse result = entityStoreV2.createOrUpdateForImportNoCommit(oneEntityStream);
            localResults.add(result);
        } catch (AtlasBaseException e) {
            addResult(entityWithExtInfo.getEntity().getGuid());
            LOG.warn("Exception: {}", entityWithExtInfo.getEntity().getGuid(), e);
        } catch (AtlasSchemaViolationException e) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Entity: {}", entityWithExtInfo.getEntity().getGuid(), e);
            }

            BulkImporterImpl.updateVertexGuid(typeRegistry, entityGraphRetriever, entityWithExtInfo.getEntity());
        }
    }

    private void attemptCommit() {
        if (currentBatch.get() < batchSize) {
            return;
        }

        doCommit();
    }

    @Override
    protected void doCommit() {
        for (int retryCount = 1; retryCount <= MAX_COMMIT_RETRY_COUNT; retryCount++) {
            if (commitWithRetry(retryCount)) {
                return;
            }
        }

        LOG.error("Retries exceeded! Potential data loss! Please correct data and re-attempt. Buffer: {}: Counter: {}", entityBuffer.size(), counter.get());
        clear();
    }

    @Override
    protected void commitDirty() {
        super.commitDirty();
        LOG.info("Total: Commit: {}", counter.get());
        counter.set(0);
    }

    private boolean commitWithRetry(int retryCount) {
        try {
            atlasGraph.commit();

            if (LOG.isDebugEnabled()) {
                LOG.debug("Commit: Done!: Buffer: {}: Batch: {}: Counter: {}", entityBuffer.size(), currentBatch.get(), counter.get());
            }

            dispatchResults();
            return true;
        } catch (Exception ex) {
            rollbackPauseRetry(retryCount, ex);
            return false;
        }
    }

    private void rollbackPauseRetry(int retryCount, Exception ex) {
        atlasGraph.rollback();
        clearCache();

        LOG.error("Rollback: Done! Buffer: {}: Counter: {}: Retry count: {}", entityBuffer.size(), counter.get(), retryCount);
        pause(retryCount);

        if (ex.getClass().getName().endsWith("JanusGraphException") && retryCount >= MAX_COMMIT_RETRY_COUNT) {
            LOG.warn("Commit error! Will pause and retry: Buffer: {}: Counter: {}: Retry count: {}", entityBuffer.size(), counter.get(), retryCount, ex);
        } else {
            LOG.info("Will pause and retry: Buffer: {}: Counter: {}: Retry count: {}", entityBuffer.size(), counter.get(), retryCount);
        }

        retryProcessEntity(retryCount);
    }

    private void retryProcessEntity(int retryCount) {
        LOG.info("Replaying: Starting!: Buffer: {}: Retry count: {}", entityBuffer.size(), retryCount);

        for (AtlasEntity.AtlasEntityWithExtInfo e : entityBuffer) {
            processEntity(e, counter.get());
        }

        LOG.info("Replaying: Done!: Buffer: {}: Retry count: {}", entityBuffer.size(), retryCount);
    }

    private void dispatchResults() {
        localResults.stream().forEach(x -> {
            addResultsFromResponse(x.getCreatedEntities());
            addResultsFromResponse(x.getUpdatedEntities());
            addResultsFromResponse(x.getDeletedEntities());
        });

        clear();
    }

    private void pause(int retryCount) {
        try {
            Thread.sleep(1000 * retryCount);
        } catch (InterruptedException e) {
            LOG.error("pause: Interrupted!", e);
        }
    }

    private void addResultsFromResponse(List<AtlasEntityHeader> entities) {
        if (CollectionUtils.isEmpty(entities)) {
            return;
        }

        for (AtlasEntityHeader eh : entities) {
            addResult(eh.getGuid());
        }
    }

    private void clear() {
        localResults.clear();
        entityBuffer.clear();
        clearCache();
        currentBatch.set(0);
    }

    private void clearCache() {
        GraphTransactionInterceptor.clearCache();
        RequestContext.get().clearCache();
    }
}
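The core of `EntityConsumer` is its commit loop: `doCommit()` tries the graph commit up to `MAX_COMMIT_RETRY_COUNT` times, rolling back and pausing for a linearly growing interval between attempts before replaying the buffered entities. A stripped-down sketch of that retry pattern, with a fake transaction standing in for the graph (the helper names here are illustrative, not the Atlas API; the pause is shortened from the 1000 ms per retry used above):

```java
import java.util.function.Supplier;

public class CommitRetry {
    static final int  MAX_COMMIT_RETRY_COUNT = 3;  // same constant as EntityConsumer
    static final long PAUSE_MILLIS           = 10; // EntityConsumer sleeps 1000 ms per retry

    // Try the commit up to MAX_COMMIT_RETRY_COUNT times; on failure roll back,
    // pause for retryCount * PAUSE_MILLIS, and try again -- mirroring
    // doCommit()/commitWithRetry()/pause() above.
    public static boolean commitWithRetries(Supplier<Boolean> commit, Runnable rollback) throws InterruptedException {
        for (int retryCount = 1; retryCount <= MAX_COMMIT_RETRY_COUNT; retryCount++) {
            if (commit.get()) {
                return true;                         // committed: done
            }

            rollback.run();                          // real code also clears caches and replays the buffer
            Thread.sleep(PAUSE_MILLIS * retryCount); // linear back-off
        }

        return false; // caller logs "Retries exceeded!" and clears its buffer
    }

    public static void main(String[] args) throws InterruptedException {
        int[] attempts = {0};

        // Fake transaction that fails twice, then succeeds on the third try.
        boolean ok = commitWithRetries(() -> ++attempts[0] >= 3, () -> {});
        System.out.println(ok + " after " + attempts[0] + " attempts"); // true after 3 attempts
    }
}
```

Because each consumer owns its own batch and transaction, a failed commit only replays that consumer's buffered entities; other workers keep ingesting.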
repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumerBuilder.java (new file)
package org.apache.atlas.repository.store.graph.v2.bulkimport.pc;

import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.pc.WorkItemBuilder;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
import org.apache.atlas.type.AtlasTypeRegistry;

import java.util.concurrent.BlockingQueue;

public class EntityConsumerBuilder implements WorkItemBuilder<EntityConsumer, AtlasEntity.AtlasEntityWithExtInfo> {
    private AtlasGraph       atlasGraph;
    private AtlasEntityStore entityStore;
    private final EntityGraphRetriever entityGraphRetriever;
    private final AtlasTypeRegistry    typeRegistry;
    private int batchSize;

    public EntityConsumerBuilder(AtlasGraph atlasGraph, AtlasEntityStore entityStore,
                                 EntityGraphRetriever entityGraphRetriever, AtlasTypeRegistry typeRegistry, int batchSize) {
        this.atlasGraph           = atlasGraph;
        this.entityStore          = entityStore;
        this.entityGraphRetriever = entityGraphRetriever;
        this.typeRegistry         = typeRegistry;
        this.batchSize            = batchSize;
    }

    @Override
    public EntityConsumer build(BlockingQueue<AtlasEntity.AtlasEntityWithExtInfo> queue) {
        return new EntityConsumer(atlasGraph, entityStore, entityGraphRetriever, typeRegistry, queue, this.batchSize);
    }
}
repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityCreationManager.java (new file)
package org.apache.atlas.repository.store.graph.v2.bulkimport.pc;

import org.apache.atlas.model.impexp.AtlasImportResult;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.pc.WorkItemBuilder;
import org.apache.atlas.pc.WorkItemManager;
import org.apache.atlas.repository.store.graph.v2.BulkImporterImpl;
import org.apache.atlas.repository.store.graph.v2.EntityImportStream;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EntityCreationManager<AtlasEntityWithExtInfo> extends WorkItemManager {
    private static final Logger LOG           = LoggerFactory.getLogger(EntityCreationManager.class);
    private static final String WORKER_PREFIX = "migration-import";

    private final StatusReporter<String, String> statusReporter;
    private final AtlasImportResult importResult;
    private final int  streamSize;
    private final long STATUS_REPORT_TIMEOUT_DURATION = 5 * 60 * 1000; // 5 min

    private String currentTypeName;
    private float  currentPercent;

    public EntityCreationManager(WorkItemBuilder builder, int batchSize, int numWorkers, AtlasImportResult importResult, int streamSize) {
        super(builder, WORKER_PREFIX, batchSize, numWorkers, true);

        this.importResult   = importResult;
        this.streamSize     = streamSize;
        this.statusReporter = new StatusReporter<>(STATUS_REPORT_TIMEOUT_DURATION);
    }

    public int read(EntityImportStream entityStream) {
        int currentIndex = 0;
        AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo;

        while ((entityWithExtInfo = entityStream.getNextEntityWithExtInfo()) != null) {
            AtlasEntity entity = entityWithExtInfo != null ? entityWithExtInfo.getEntity() : null;

            if (entity == null) {
                continue;
            }

            try {
                produce(currentIndex++, entity.getTypeName(), entityWithExtInfo);
            } catch (Throwable e) {
                LOG.warn("Exception: {}", entity.getGuid(), e);
                break;
            }
        }

        return currentIndex;
    }

    private void produce(int currentIndex, String typeName, AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) {
        String previousTypeName = getCurrentTypeName();

        if (StringUtils.isNotEmpty(typeName) && StringUtils.isNotEmpty(previousTypeName) && !StringUtils.equals(previousTypeName, typeName)) {
            LOG.info("Waiting: '{}' to complete...", previousTypeName);
            super.drain();
            LOG.info("Switching entity type processing: From: '{}' To: '{}'...", previousTypeName, typeName);
        }

        setCurrentTypeName(typeName);
        statusReporter.produced(entityWithExtInfo.getEntity().getGuid(),
                String.format("%s:%s", entityWithExtInfo.getEntity().getTypeName(), currentIndex));

        super.checkProduce(entityWithExtInfo);
        extractResults();
    }

    public void extractResults() {
        Object result;

        while ((result = getResults().poll()) != null) {
            statusReporter.processed((String) result);
        }

        logStatus();
    }

    private void logStatus() {
        String ack = statusReporter.ack();

        if (StringUtils.isEmpty(ack)) {
            return;
        }

        String[] split = ack.split(":");

        if (split.length < 2) {
            return;
        }

        importResult.incrementMeticsCounter(split[0]);

        this.currentPercent = updateImportMetrics(split[0], Integer.parseInt(split[1]), getStreamSize(), getCurrentPercent());
    }

    private static float updateImportMetrics(String typeNameGuid, int currentIndex, int streamSize, float currentPercent) {
        String lastEntityImported = String.format("entity:last-imported:%s:(%s)", typeNameGuid, currentIndex);

        return BulkImporterImpl.updateImportProgress(LOG, currentIndex, streamSize, currentPercent, lastEntityImported);
    }

    private String getCurrentTypeName() {
        return this.currentTypeName;
    }

    private void setCurrentTypeName(String typeName) {
        this.currentTypeName = typeName;
    }

    private float getCurrentPercent() {
        return this.currentPercent;
    }

    private int getStreamSize() {
        return this.streamSize;
    }
}
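`EntityCreationManager.produce()` drains all in-flight work whenever the incoming entity's type changes, so every entity of one type is fully persisted before the next type begins (which matters when later types reference earlier ones). The same drain-between-groups idea with a plain `ExecutorService`; this is a hypothetical sketch, not the Atlas `WorkItemManager` API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class DrainBetweenGroups {
    // Process "type:id" items on a pool, but wait for all in-flight work
    // whenever the type changes -- mirroring produce(), which calls drain()
    // before switching entity types. Returns the completion order.
    public static List<String> process(List<String> items, int workers) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        ConcurrentLinkedQueue<String> completionOrder = new ConcurrentLinkedQueue<>();
        List<Future<?>> inFlight = new ArrayList<>();
        String currentType = null;

        try {
            for (String item : items) {
                String type = item.split(":")[0];

                if (currentType != null && !currentType.equals(type)) {
                    for (Future<?> f : inFlight) { f.get(); } // "drain": wait for previous type
                    inFlight.clear();
                }

                currentType = type;
                inFlight.add(pool.submit(() -> completionOrder.add(item)));
            }

            for (Future<?> f : inFlight) { f.get(); }
        } finally {
            pool.shutdown();
        }

        return new ArrayList<>(completionOrder);
    }

    public static void main(String[] args) throws Exception {
        // Both "db" items always complete before either "table" item starts.
        List<String> order = process(List.of("db:1", "db:2", "table:3", "table:4"), 2);
        System.out.println(order.size()); // 4
    }
}
```

This assumes the input stream is already grouped by type, as the zip-backed `EntityImportStream` is; with interleaved types the drain would degenerate into serial processing.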
repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/StatusReporter.java (new file)
package org.apache.atlas.repository.store.graph.v2.bulkimport.pc;

import org.apache.atlas.v1.typesystem.types.utils.TypesUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

public class StatusReporter<T, U> {
    private static final Logger LOG = LoggerFactory.getLogger(StatusReporter.class);

    private Map<T, U> producedItems = new LinkedHashMap<>();
    private Set<T>    processedSet  = new HashSet<>();
    private TypesUtil.Pair<T, Long> watchedItem;
    private final long timeOut;

    public StatusReporter(long timeOut) {
        this.timeOut = timeOut;
    }

    public void produced(T item, U index) {
        this.producedItems.put(item, index);
    }

    public void processed(T item) {
        this.processedSet.add(item);
    }

    public void processed(T[] index) {
        this.processedSet.addAll(Arrays.asList(index));
    }

    public U ack() {
        U ack = null;
        U ret;
        Map.Entry<T, U> firstElement;

        do {
            firstElement = getFirstElement(this.producedItems);
            ret          = completionIndex(firstElement);

            if (ret != null) {
                ack = ret;
            }
        } while (ret != null);

        return addToWatchIfNeeded(ack, firstElement);
    }

    private U addToWatchIfNeeded(U ack, Map.Entry<T, U> firstElement) {
        if (ack == null && firstElement != null) {
            ack = addToWatch(firstElement.getKey());
        } else {
            resetWatchItem();
        }

        return ack;
    }

    private void resetWatchItem() {
        this.watchedItem = null;
    }

    private U addToWatch(T key) {
        createNewWatchItem(key);

        if (!hasTimedOut(this.watchedItem)) {
            return null;
        }

        T producedItemKey = this.watchedItem.left;
        resetWatchItem();

        LOG.warn("Item: {}: Was produced but not successfully processed!", producedItemKey);
        return this.producedItems.get(producedItemKey);
    }

    private void createNewWatchItem(T key) {
        if (this.watchedItem != null) {
            return;
        }

        this.watchedItem = new TypesUtil.Pair<T, Long>(key, System.currentTimeMillis());
    }

    private boolean hasTimedOut(TypesUtil.Pair<T, Long> watchedItem) {
        if (watchedItem == null) {
            return false;
        }

        return (System.currentTimeMillis() - watchedItem.right) >= timeOut;
    }

    private Map.Entry<T, U> getFirstElement(Map<T, U> map) {
        if (map.isEmpty()) {
            return null;
        }

        return map.entrySet().iterator().next();
    }

    private U completionIndex(Map.Entry<T, U> lookFor) {
        U ack = null;

        if (lookFor == null || !processedSet.contains(lookFor.getKey())) {
            return ack;
        }

        ack = lookFor.getValue();
        producedItems.remove(lookFor.getKey());
        processedSet.remove(lookFor.getKey()); // remove by key; removing the Map.Entry itself would be a no-op on Set<T>
        return ack;
    }
}
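`StatusReporter` keeps produced items in insertion order and only acknowledges the index of the newest item whose entire prefix has been processed, so a restart can resume from the last acknowledged position even though workers finish out of order. A tiny self-contained re-implementation of that head-of-line ack idea (simplified sketch: the timeout watch on a stuck head item is omitted):

```java
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Simplified in-order acknowledgment tracker: ack() returns the index of the
// newest item such that it and every item produced before it are processed.
public class InOrderAck<T, U> {
    private final Map<T, U> produced      = new LinkedHashMap<>(); // insertion order matters
    private final Set<T>    processedKeys = new HashSet<>();

    public void produced(T key, U index) { produced.put(key, index); }
    public void processed(T key)         { processedKeys.add(key); }

    public U ack() {
        U ack = null;
        Iterator<Map.Entry<T, U>> it = produced.entrySet().iterator();

        while (it.hasNext()) {
            Map.Entry<T, U> head = it.next();

            if (!processedKeys.contains(head.getKey())) {
                break; // head-of-line item not done yet: stop here
            }

            ack = head.getValue();
            processedKeys.remove(head.getKey());
            it.remove();
        }

        return ack;
    }

    public static void main(String[] args) {
        InOrderAck<String, Integer> r = new InOrderAck<>();
        r.produced("a", 1); r.produced("b", 2); r.produced("c", 3);

        r.processed("b");            // out of order: "a" still pending
        System.out.println(r.ack()); // null
        r.processed("a");
        System.out.println(r.ack()); // 2 ("a" and "b" are both done)
    }
}
```

The `LinkedHashMap` is what makes this work: iteration order is production order, so the first unprocessed entry marks the resumable boundary.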
repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java
...
...
@@ -136,6 +136,11 @@ public class ImportServiceTest extends ExportImportTestBase {
         return getZipSource("dup_col_deleted.zip");
     }

+    @DataProvider(name = "zipDirect1")
+    public static Object[][] getZipDirect(ITestContext context) throws IOException, AtlasBaseException {
+        return getZipSource("dup_col_deleted.zip");
+    }
+
     @Test(dataProvider = "sales")
     public void importDB1(InputStream inputStream) throws AtlasBaseException, IOException {
         loadBaseModel();
...
...
@@ -530,6 +535,17 @@ public class ImportServiceTest extends ExportImportTestBase {
         }
     }

+    @Test(dataProvider = "zipDirect1")
+    public void zipSourceDirect(InputStream inputStream) throws IOException, AtlasBaseException {
+        loadBaseModel();
+        loadFsModel();
+        loadHiveModel();
+
+        runImportWithNoParameters(importService, inputStream);
+    }
+
     private AtlasImportRequest getImportRequest(String replicatedFrom) {
         AtlasImportRequest importRequest = getDefaultImportRequest();
...
...
repository/src/test/java/org/apache/atlas/repository/impexp/MigrationImportTest.java (new file)
package org.apache.atlas.repository.impexp;

import com.google.inject.Inject;
import org.apache.atlas.TestModules;
import org.apache.atlas.discovery.EntityDiscoveryService;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.impexp.AtlasImportRequest;
import org.apache.atlas.model.impexp.AtlasImportResult;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.repository.store.graph.v2.bulkimport.MigrationImport;
import org.apache.atlas.store.AtlasTypeDefStore;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;

import java.io.IOException;
import java.io.InputStream;

import static org.testng.Assert.assertNotNull;

@Guice(modules = TestModules.TestOnlyModule.class)
public class MigrationImportTest extends ExportImportTestBase {
    private final ImportService importService;

    @Inject
    AtlasTypeRegistry typeRegistry;

    @Inject
    private AtlasTypeDefStore typeDefStore;

    @Inject
    private EntityDiscoveryService discoveryService;

    @Inject
    AtlasEntityStore entityStore;

    @Inject
    AtlasGraph atlasGraph;

    @Inject
    public MigrationImportTest(ImportService importService) {
        this.importService = importService;
    }

    @Test
    public void simpleImport() throws IOException, AtlasBaseException {
        InputStream inputStream = ZipFileResourceTestUtils.getFileInputStream("zip-direct-2.zip");

        AtlasImportRequest importRequest = new AtlasImportRequest();
        importRequest.setOption("migration", "true");

        AtlasImportResult result = importService.run(inputStream, importRequest, null, null, null);
        assertNotNull(result);
    }
}
repository/src/test/java/org/apache/atlas/repository/impexp/StatusReporterTest.java
0 → 100644
View file @
a2ccfb9f
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.impexp;

import org.apache.atlas.repository.store.graph.v2.bulkimport.pc.StatusReporter;
import org.testng.annotations.Test;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;

public class StatusReporterTest {
    @Test
    public void noneProducedNoneReported() {
        StatusReporter<Integer, Integer> statusReporter = new StatusReporter<>(100);
        assertNull(statusReporter.ack());
    }

    @Test
    public void producedButNotAcknowledged() {
        StatusReporter<Integer, Integer> statusReporter = createStatusReportWithItems();
        assertNull(statusReporter.ack());
    }

    @Test
    public void producedAcknowledged() {
        StatusReporter<Integer, Integer> statusReporter = createStatusReportWithItems();
        statusReporter.processed(1);

        assertEquals(java.util.Optional.of(100).get(), statusReporter.ack());
    }

    @Test
    public void producedAcknowledgeMaxAvailableInSequence() {
        StatusReporter<Integer, Integer> statusReporter = createStatusReportWithItems();
        statusReporter.processed(new Integer[]{1, 3, 5});

        assertEquals(java.util.Optional.of(100).get(), statusReporter.ack());
    }

    @Test
    public void producedAcknowledgeMaxAvailableInSequence2() {
        StatusReporter<Integer, Integer> statusReporter = createStatusReportWithItems();
        statusReporter.processed(new Integer[]{1, 2, 3, 6, 5});

        assertEquals(java.util.Optional.of(300).get(), statusReporter.ack());
    }

    @Test
    public void producedSetDisjointWithAckSet() {
        StatusReporter<Integer, Integer> statusReporter = new StatusReporter(100);
        statusReporter.produced(11, 1000);
        statusReporter.produced(12, 2000);
        statusReporter.produced(13, 3000);
        statusReporter.processed(new Integer[]{1, 11, 12, 13});

        assertEquals(java.util.Optional.of(3000).get(), statusReporter.ack());
    }

    @Test
    public void missingAck() throws InterruptedException {
        StatusReporter<Integer, Integer> statusReporter = createStatusReportWithItems(2, 3, 4);
        assertNull(statusReporter.ack());

        Thread.sleep(1002);
        assertEquals(java.util.Optional.of(100).get(), statusReporter.ack());
    }

    private StatusReporter<Integer, Integer> createStatusReportWithItems(Integer... processed) {
        StatusReporter<Integer, Integer> statusReporter = new StatusReporter(1000);
        statusReporter.produced(1, 100);
        statusReporter.produced(2, 200);
        statusReporter.produced(3, 300);
        statusReporter.produced(4, 400);
        statusReporter.produced(5, 500);
        statusReporter.produced(6, 600);
        statusReporter.processed(processed);

        return statusReporter;
    }
}
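The tests above pin down the contract of the new `StatusReporter`: producers register work items with `produced(index, ackValue)`, consumers mark them with `processed(index)`, and `ack()` returns the ack value of the last item in the longest fully-processed contiguous prefix of produced items (or null if the head item is still outstanding). The following is an illustrative reimplementation of that prefix-acknowledgment idea, not the actual Atlas class — the class and method names mirror `StatusReporter`, but details such as the timeout handling exercised by `missingAck` are omitted:

```java
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Minimal sketch of the produced/processed/ack contract, assuming the
// semantics implied by StatusReporterTest above. Not the Atlas implementation.
class StatusReporterSketch<T, U> {
    // Insertion order records production order.
    private final Map<T, U> produced = new LinkedHashMap<>();
    private final Set<T> processed = new HashSet<>();

    public void produced(T index, U ackValue) {
        produced.put(index, ackValue);
    }

    @SafeVarargs
    public final void processed(T... indices) {
        for (T index : indices) {
            processed.add(index);
        }
    }

    // Walk produced items in order; consume the contiguous prefix that has
    // been processed and return the ack value of its last member, or null
    // if the head of the queue is still unprocessed.
    public U ack() {
        U result = null;
        Iterator<Map.Entry<T, U>> it = produced.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<T, U> entry = it.next();
            if (!processed.contains(entry.getKey())) {
                break;
            }
            result = entry.getValue();
            it.remove();
        }
        return result;
    }

    public static void main(String[] args) {
        StatusReporterSketch<Integer, Integer> reporter = new StatusReporterSketch<>();
        reporter.produced(1, 100);
        reporter.produced(2, 200);
        reporter.produced(3, 300);
        reporter.produced(4, 400);

        reporter.processed(1, 3);             // item 2 still outstanding
        System.out.println(reporter.ack());   // prefix stops at 1 -> 100

        reporter.processed(2, 4);
        System.out.println(reporter.ack());   // prefix now 2,3,4 -> 400
    }
}
```

This mirrors why `producedAcknowledgeMaxAvailableInSequence2` expects 300: with items 1..6 produced and {1, 2, 3, 6, 5} processed, the contiguous processed prefix ends at item 3.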
repository/src/test/java/org/apache/atlas/repository/impexp/ZipDirectTest.java
0 → 100644
View file @
a2ccfb9f
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.impexp;

import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.impexp.AtlasExportResult;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.testng.annotations.Test;

import java.io.IOException;
import java.io.InputStream;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;

public class ZipDirectTest {
    @Test(expectedExceptions = AtlasBaseException.class)
    public void loadFileEmpty() throws IOException, AtlasBaseException {
        InputStream inputStream = ZipFileResourceTestUtils.getFileInputStream("zip-direct-1.zip");
        new ZipSourceDirect(inputStream, 1);
    }

    @Test
    public void loadFile() throws IOException, AtlasBaseException {
        final int EXPECTED_ENTITY_COUNT = 3434;

        InputStream inputStream = ZipFileResourceTestUtils.getFileInputStream("zip-direct-2.zip");
        ZipSourceDirect zipSourceDirect = new ZipSourceDirect(inputStream, EXPECTED_ENTITY_COUNT);

        assertNotNull(zipSourceDirect);
        assertNotNull(zipSourceDirect.getTypesDef());
        assertTrue(zipSourceDirect.getTypesDef().getEntityDefs().size() > 0);
        assertNotNull(zipSourceDirect.getExportResult());

        int count = 0;
        AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo;
        while ((entityWithExtInfo = zipSourceDirect.getNextEntityWithExtInfo()) != null) {
            assertNotNull(entityWithExtInfo);
            count++;
        }

        assertEquals(count, EXPECTED_ENTITY_COUNT);
    }
}
repository/src/test/java/org/apache/atlas/repository/impexp/ZipFileResourceTestUtils.java
View file @
a2ccfb9f
...
...
@@ -317,7 +317,9 @@ public class ZipFileResourceTestUtils {
        }
    }

    public static AtlasImportRequest getDefaultImportRequest() {
-        return new AtlasImportRequest();
+        AtlasImportRequest atlasImportRequest = new AtlasImportRequest();
+        atlasImportRequest.setOption("migration", "true");
+        return atlasImportRequest;
    }
...
...
@@ -336,7 +338,8 @@ public class ZipFileResourceTestUtils {
        final String hostName = "localhost";
        final String userName = "admin";

-        AtlasImportResult result = importService.run(inputStream, userName, hostName, requestingIP);
+        AtlasImportRequest request = getDefaultImportRequest();
+        AtlasImportResult result = runImportWithParameters(importService, request, inputStream);

        assertEquals(result.getOperationStatus(), AtlasImportResult.OperationStatus.SUCCESS);
        return result;
    }
...
...
This diff is collapsed.
Click to expand it.
repository/src/test/resources/zip-direct-1.zip
0 → 100644
View file @
a2ccfb9f
File added
repository/src/test/resources/zip-direct-2.zip
0 → 100644
View file @
a2ccfb9f
File added