Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
A
atlas
Project
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
dataplatform
atlas
Commits
54042d35
Commit
54042d35
authored
5 years ago
by
Ashutosh Mestry
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
DataMigration: Automatic resume.
parent
a2ccfb9f
Hide whitespace changes
Inline
Side-by-side
Showing
9 changed files
with
404 additions
and
24 deletions
+404
-24
0010-base_model.json
addons/models/0000-Area0/0010-base_model.json
+50
-0
MigrationImportStatus.java
...g/apache/atlas/model/migration/MigrationImportStatus.java
+98
-0
DataMigrationService.java
...ache/atlas/repository/migration/DataMigrationService.java
+2
-2
DataMigrationStatusService.java
...tlas/repository/migration/DataMigrationStatusService.java
+104
-0
ZipFileMigrationImporter.java
.../atlas/repository/migration/ZipFileMigrationImporter.java
+24
-9
MigrationImportStatusDTO.java
...apache/atlas/repository/ogm/MigrationImportStatusDTO.java
+103
-0
BulkImporterImpl.java
...che/atlas/repository/store/graph/v2/BulkImporterImpl.java
+5
-3
MigrationImport.java
...repository/store/graph/v2/bulkimport/MigrationImport.java
+8
-5
EntityCreationManager.java
...y/store/graph/v2/bulkimport/pc/EntityCreationManager.java
+10
-5
No files found.
addons/models/0000-Area0/0010-base_model.json
View file @
54042d35
...
...
@@ -256,6 +256,56 @@
]
},
{
"name"
:
"__MigrationImportStatus"
,
"superTypes"
:
[
"__internal"
],
"serviceType"
:
"atlas_core"
,
"typeVersion"
:
"1.0"
,
"attributeDefs"
:
[
{
"name"
:
"name"
,
"typeName"
:
"string"
,
"cardinality"
:
"SINGLE"
,
"isIndexable"
:
true
,
"isOptional"
:
false
,
"isUnique"
:
true
},
{
"name"
:
"size"
,
"typeName"
:
"int"
,
"cardinality"
:
"SINGLE"
,
"isIndexable"
:
true
,
"isOptional"
:
true
,
"isUnique"
:
false
},
{
"name"
:
"position"
,
"typeName"
:
"string"
,
"cardinality"
:
"SINGLE"
,
"isIndexable"
:
true
,
"isOptional"
:
true
,
"isUnique"
:
false
},
{
"name"
:
"startTime"
,
"typeName"
:
"long"
,
"cardinality"
:
"SINGLE"
,
"isIndexable"
:
true
,
"isOptional"
:
true
,
"isUnique"
:
false
},
{
"name"
:
"endTime"
,
"typeName"
:
"long"
,
"cardinality"
:
"SINGLE"
,
"isIndexable"
:
true
,
"isOptional"
:
true
,
"isUnique"
:
false
}
]
},
{
"name"
:
"__AtlasUserSavedSearch"
,
"superTypes"
:
[
"__internal"
...
...
This diff is collapsed.
Click to expand it.
intg/src/main/java/org/apache/atlas/model/migration/MigrationImportStatus.java
0 → 100644
View file @
54042d35
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org
.
apache
.
atlas
.
model
.
migration
;
import
com.fasterxml.jackson.annotation.JsonAutoDetect
;
import
com.fasterxml.jackson.annotation.JsonIgnoreProperties
;
import
com.fasterxml.jackson.databind.annotation.JsonSerialize
;
import
org.apache.atlas.model.AtlasBaseModelObject
;
import
java.io.Serializable
;
import
java.util.Date
;
import
static
com
.
fasterxml
.
jackson
.
annotation
.
JsonAutoDetect
.
Visibility
.
NONE
;
import
static
com
.
fasterxml
.
jackson
.
annotation
.
JsonAutoDetect
.
Visibility
.
PUBLIC_ONLY
;
@JsonAutoDetect
(
getterVisibility
=
PUBLIC_ONLY
,
setterVisibility
=
PUBLIC_ONLY
,
fieldVisibility
=
NONE
)
@JsonSerialize
(
include
=
JsonSerialize
.
Inclusion
.
NON_NULL
)
@JsonIgnoreProperties
(
ignoreUnknown
=
true
)
public
class
MigrationImportStatus
extends
AtlasBaseModelObject
implements
Serializable
{
private
String
name
;
private
int
size
;
private
long
startTime
;
private
long
endTime
;
private
String
position
;
public
MigrationImportStatus
()
{
}
public
MigrationImportStatus
(
String
name
)
{
this
.
name
=
name
;
}
public
String
getName
()
{
return
name
;
}
public
void
setName
(
String
name
)
{
this
.
name
=
name
;
}
public
int
getSize
()
{
return
size
;
}
public
void
setSize
(
int
size
)
{
this
.
size
=
size
;
}
public
long
getStartTime
()
{
return
startTime
;
}
public
void
setStartTime
(
long
startTime
)
{
this
.
startTime
=
startTime
;
}
public
long
getEndTime
()
{
return
endTime
;
}
public
void
setEndTime
(
long
endTime
)
{
this
.
endTime
=
endTime
;
}
public
void
setPosition
(
String
position
)
{
this
.
position
=
position
;
}
public
String
getPosition
()
{
return
this
.
position
;
}
@Override
protected
StringBuilder
toString
(
StringBuilder
sb
)
{
sb
.
append
(
", name="
).
append
(
name
);
sb
.
append
(
", size="
).
append
(
size
);
sb
.
append
(
", startTime="
).
append
(
startTime
);
sb
.
append
(
", endTime="
).
append
(
endTime
);
return
sb
;
}
}
This diff is collapsed.
Click to expand it.
repository/src/main/java/org/apache/atlas/repository/migration/DataMigrationService.java
View file @
54042d35
...
...
@@ -60,14 +60,14 @@ public class DataMigrationService implements Service {
@Inject
public
DataMigrationService
(
GraphDBMigrator
migrator
,
AtlasTypeDefStore
typeDefStore
,
Configuration
configuration
,
GraphBackedSearchIndexer
indexer
,
AtlasTypeDefStoreInitializer
storeInitializer
,
AtlasTypeRegistry
typeRegistry
,
ImportService
importService
)
{
AtlasTypeRegistry
typeRegistry
,
ImportService
importService
,
DataMigrationStatusService
dataMigrationStatusService
)
{
this
.
configuration
=
configuration
;
String
fileName
=
getFileName
();
boolean
zipFileBasedMigrationImport
=
StringUtils
.
endsWithIgnoreCase
(
fileName
,
FILE_EXTENSION_ZIP
);
this
.
thread
=
(
zipFileBasedMigrationImport
)
?
new
Thread
(
new
ZipFileMigrationImporter
(
importService
,
fileName
),
"zipFileBasedMigrationImporter"
)
?
new
Thread
(
new
ZipFileMigrationImporter
(
importService
,
fileName
,
dataMigrationStatusService
),
"zipFileBasedMigrationImporter"
)
:
new
Thread
(
new
FileImporter
(
migrator
,
typeDefStore
,
typeRegistry
,
storeInitializer
,
fileName
,
indexer
));
}
...
...
This diff is collapsed.
Click to expand it.
repository/src/main/java/org/apache/atlas/repository/migration/DataMigrationStatusService.java
0 → 100644
View file @
54042d35
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org
.
apache
.
atlas
.
repository
.
migration
;
import
org.apache.atlas.annotation.AtlasService
;
import
org.apache.atlas.exception.AtlasBaseException
;
import
org.apache.atlas.model.migration.MigrationImportStatus
;
import
org.apache.atlas.repository.ogm.DataAccess
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
javax.inject.Inject
;
@AtlasService
public
class
DataMigrationStatusService
{
private
static
final
Logger
LOG
=
LoggerFactory
.
getLogger
(
DataMigrationStatusService
.
class
);
private
final
DataAccess
dataAccess
;
private
MigrationImportStatus
status
;
@Inject
public
DataMigrationStatusService
(
DataAccess
dataAccess
)
{
this
.
dataAccess
=
dataAccess
;
}
public
MigrationImportStatus
getCreate
(
MigrationImportStatus
status
)
{
try
{
this
.
status
=
this
.
dataAccess
.
load
(
status
);
this
.
status
.
setSize
(
status
.
getSize
());
this
.
status
.
setStartTime
(
status
.
getStartTime
());
this
.
status
=
dataAccess
.
save
(
this
.
status
);
}
catch
(
Exception
ex
)
{
LOG
.
info
(
"DataMigrationStatusService: Setting status: {}..."
,
status
.
getName
());
try
{
this
.
status
=
dataAccess
.
save
(
status
);
}
catch
(
AtlasBaseException
e
)
{
LOG
.
info
(
"DataMigrationStatusService: Error saving status: {}..."
,
status
.
getName
());
}
}
return
this
.
status
;
}
public
MigrationImportStatus
get
()
{
return
this
.
status
;
}
public
MigrationImportStatus
getByName
(
String
name
)
throws
AtlasBaseException
{
MigrationImportStatus
status
=
new
MigrationImportStatus
(
name
);
return
dataAccess
.
load
(
status
);
}
public
void
deleteStatus
()
throws
AtlasBaseException
{
if
(
this
.
status
==
null
)
{
return
;
}
MigrationImportStatus
status
=
getByName
(
this
.
status
.
getName
());
dataAccess
.
delete
(
status
.
getGuid
());
}
public
void
savePosition
(
String
position
)
{
this
.
status
.
setPosition
(
position
);
try
{
this
.
dataAccess
.
saveNoLoad
(
this
.
status
);
}
catch
(
AtlasBaseException
e
)
{
LOG
.
error
(
"Error saving status: {}"
,
position
,
e
);
}
}
public
void
setEndTime
()
{
this
.
status
.
setEndTime
(
System
.
currentTimeMillis
());
try
{
this
.
dataAccess
.
saveNoLoad
(
this
.
status
);
}
catch
(
AtlasBaseException
e
)
{
LOG
.
error
(
"Error saving status: endTime"
,
e
);
}
}
public
MigrationImportStatus
createGet
(
String
fileToImport
,
int
streamSize
)
{
MigrationImportStatus
status
=
new
MigrationImportStatus
(
fileToImport
);
status
.
setSize
(
streamSize
);
return
getCreate
(
status
);
}
}
This diff is collapsed.
Click to expand it.
repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java
View file @
54042d35
...
...
@@ -23,6 +23,7 @@ import org.apache.atlas.AtlasException;
import
org.apache.atlas.RequestContext
;
import
org.apache.atlas.exception.AtlasBaseException
;
import
org.apache.atlas.model.impexp.AtlasImportRequest
;
import
org.apache.atlas.model.migration.MigrationImportStatus
;
import
org.apache.atlas.repository.impexp.ImportService
;
import
org.apache.commons.lang.StringUtils
;
import
org.slf4j.Logger
;
...
...
@@ -50,20 +51,23 @@ public class ZipFileMigrationImporter implements Runnable {
private
final
ImportService
importService
;
private
final
String
fileToImport
;
private
DataMigrationStatusService
dataMigrationStatusService
;
public
ZipFileMigrationImporter
(
ImportService
importService
,
String
fileName
)
{
public
ZipFileMigrationImporter
(
ImportService
importService
,
String
fileName
,
DataMigrationStatusService
dataMigrationStatusService
)
{
this
.
importService
=
importService
;
this
.
fileToImport
=
fileName
;
this
.
dataMigrationStatusService
=
dataMigrationStatusService
;
}
@Override
public
void
run
()
{
try
{
FileWatcher
fileWatcher
=
new
FileWatcher
(
fileToImport
);
fileWatcher
.
start
();
detectFileToImport
();
int
streamSize
=
getStreamSizeFromComment
(
fileToImport
);
performImport
(
new
FileInputStream
(
new
File
(
fileToImport
)),
streamSize
);
MigrationImportStatus
status
=
dataMigrationStatusService
.
createGet
(
fileToImport
,
streamSize
);
performImport
(
new
FileInputStream
(
new
File
(
fileToImport
)),
status
.
getPosition
(),
streamSize
);
dataMigrationStatusService
.
setEndTime
();
}
catch
(
IOException
e
)
{
LOG
.
error
(
"Migration Import: IO Error!"
,
e
);
}
catch
(
AtlasBaseException
e
)
{
...
...
@@ -71,6 +75,11 @@ public class ZipFileMigrationImporter implements Runnable {
}
}
private
void
detectFileToImport
()
throws
IOException
{
FileWatcher
fileWatcher
=
new
FileWatcher
(
fileToImport
);
fileWatcher
.
start
();
}
private
int
getStreamSizeFromComment
(
String
fileToImport
)
{
int
ret
=
1
;
try
{
...
...
@@ -96,13 +105,13 @@ public class ZipFileMigrationImporter implements Runnable {
return
Integer
.
valueOf
(
s
);
}
private
void
performImport
(
InputStream
fs
,
int
streamSize
)
throws
AtlasBaseException
{
private
void
performImport
(
InputStream
fs
,
String
position
,
int
streamSize
)
throws
AtlasBaseException
{
try
{
LOG
.
info
(
"Migration Import: {}:
Starting..."
,
fileToImport
);
LOG
.
info
(
"Migration Import: {}:
Position: {}: Starting..."
,
fileToImport
,
position
);
RequestContext
.
get
().
setUser
(
getUserNameFromEnvironment
(),
null
);
importService
.
run
(
fs
,
getImportRequest
(
streamSize
),
importService
.
run
(
fs
,
getImportRequest
(
streamSize
,
position
),
getUserNameFromEnvironment
(),
InetAddress
.
getLocalHost
().
getHostName
(),
InetAddress
.
getLocalHost
().
getHostAddress
());
...
...
@@ -112,6 +121,7 @@ public class ZipFileMigrationImporter implements Runnable {
throw
new
AtlasBaseException
(
ex
);
}
finally
{
LOG
.
info
(
"Migration Import: {}: Done!"
,
fileToImport
);
dataMigrationStatusService
.
deleteStatus
();
}
}
...
...
@@ -119,14 +129,19 @@ public class ZipFileMigrationImporter implements Runnable {
return
System
.
getProperty
(
ENV_USER_NAME
);
}
private
AtlasImportRequest
getImportRequest
(
int
streamSize
)
throws
AtlasException
{
private
AtlasImportRequest
getImportRequest
(
int
streamSize
,
String
position
)
throws
AtlasException
{
AtlasImportRequest
request
=
new
AtlasImportRequest
();
request
.
setSizeOption
(
streamSize
);
request
.
setOption
(
AtlasImportRequest
.
OPTION_KEY_MIGRATION
,
"true"
);
request
.
setOption
(
AtlasImportRequest
.
OPTION_KEY_NUM_WORKERS
,
getPropertyValue
(
APPLICATION_PROPERTY_MIGRATION_NUMER_OF_WORKERS
,
DEFAULT_NUMBER_OF_WORKDERS
));
request
.
setOption
(
AtlasImportRequest
.
OPTION_KEY_BATCH_SIZE
,
getPropertyValue
(
APPLICATION_PROPERTY_MIGRATION_BATCH_SIZE
,
DEFAULT_BATCH_SIZE
));
request
.
setOption
(
AtlasImportRequest
.
START_POSITION_KEY
,
Integer
.
toString
(
MIGRATION_IMPORT_START_POSITION
.
getInt
()));
request
.
setOption
(
AtlasImportRequest
.
START_POSITION_KEY
,
(
StringUtils
.
isEmpty
(
position
)
?
Integer
.
toString
(
MIGRATION_IMPORT_START_POSITION
.
getInt
())
:
position
)
);
return
request
;
}
...
...
This diff is collapsed.
Click to expand it.
repository/src/main/java/org/apache/atlas/repository/ogm/MigrationImportStatusDTO.java
0 → 100644
View file @
54042d35
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org
.
apache
.
atlas
.
repository
.
ogm
;
import
org.apache.atlas.exception.AtlasBaseException
;
import
org.apache.atlas.model.instance.AtlasEntity
;
import
org.apache.atlas.model.migration.MigrationImportStatus
;
import
org.apache.atlas.repository.Constants
;
import
org.apache.atlas.type.AtlasTypeRegistry
;
import
org.springframework.stereotype.Component
;
import
javax.inject.Inject
;
import
java.util.Arrays
;
import
java.util.Collections
;
import
java.util.HashSet
;
import
java.util.Map
;
import
java.util.Set
;
@Component
public
class
MigrationImportStatusDTO
extends
AbstractDataTransferObject
<
MigrationImportStatus
>
{
public
static
final
String
PROPERTY_NAME
=
"name"
;
public
static
final
String
PROPERTY_SIZE
=
"size"
;
public
static
final
String
PROPERTY_POSITION
=
"position"
;
public
static
final
String
PROPERTY_START_TIME
=
"startTime"
;
public
static
final
String
PROPERTY_END_TIME
=
"endTime"
;
public
static
final
String
PROPERTY_ADDITIONAL_INFO
=
"additionalInfo"
;
private
static
final
Set
<
String
>
ATTRIBUTE_NAMES
=
new
HashSet
<>(
Arrays
.
asList
(
PROPERTY_NAME
,
PROPERTY_SIZE
,
PROPERTY_POSITION
,
PROPERTY_START_TIME
,
PROPERTY_END_TIME
,
PROPERTY_ADDITIONAL_INFO
));
@Inject
public
MigrationImportStatusDTO
(
AtlasTypeRegistry
typeRegistry
)
{
super
(
typeRegistry
,
MigrationImportStatus
.
class
,
Constants
.
INTERNAL_PROPERTY_KEY_PREFIX
+
MigrationImportStatus
.
class
.
getSimpleName
());
}
public
static
Set
<
String
>
getAttributes
()
{
return
ATTRIBUTE_NAMES
;
}
public
static
MigrationImportStatus
from
(
String
guid
,
Map
<
String
,
Object
>
attributes
)
{
MigrationImportStatus
entry
=
new
MigrationImportStatus
();
entry
.
setGuid
(
guid
);
entry
.
setName
((
String
)
attributes
.
get
(
PROPERTY_NAME
));
entry
.
setSize
((
int
)
attributes
.
get
(
PROPERTY_SIZE
));
entry
.
setPosition
((
String
)
attributes
.
get
(
PROPERTY_POSITION
));
entry
.
setStartTime
((
long
)
attributes
.
get
(
PROPERTY_START_TIME
));
entry
.
setEndTime
((
long
)
attributes
.
get
(
PROPERTY_END_TIME
));
return
entry
;
}
@Override
public
MigrationImportStatus
from
(
AtlasEntity
entity
)
{
return
from
(
entity
.
getGuid
(),
entity
.
getAttributes
());
}
@Override
public
MigrationImportStatus
from
(
AtlasEntity
.
AtlasEntityWithExtInfo
entityWithExtInfo
)
{
return
from
(
entityWithExtInfo
.
getEntity
());
}
@Override
public
AtlasEntity
toEntity
(
MigrationImportStatus
obj
)
{
AtlasEntity
entity
=
getDefaultAtlasEntity
(
obj
);
entity
.
setAttribute
(
PROPERTY_NAME
,
obj
.
getName
());
entity
.
setAttribute
(
PROPERTY_SIZE
,
obj
.
getSize
());
entity
.
setAttribute
(
PROPERTY_POSITION
,
obj
.
getPosition
());
entity
.
setAttribute
(
PROPERTY_START_TIME
,
obj
.
getStartTime
());
entity
.
setAttribute
(
PROPERTY_END_TIME
,
obj
.
getEndTime
());
return
entity
;
}
@Override
public
AtlasEntity
.
AtlasEntityWithExtInfo
toEntityWithExtInfo
(
MigrationImportStatus
obj
)
throws
AtlasBaseException
{
return
new
AtlasEntity
.
AtlasEntityWithExtInfo
(
toEntity
(
obj
));
}
@Override
public
Map
<
String
,
Object
>
getUniqueAttributes
(
final
MigrationImportStatus
obj
)
{
return
Collections
.
singletonMap
(
PROPERTY_NAME
,
obj
.
getName
());
}
}
This diff is collapsed.
Click to expand it.
repository/src/main/java/org/apache/atlas/repository/store/graph/v2/BulkImporterImpl.java
View file @
54042d35
...
...
@@ -26,8 +26,8 @@ import org.apache.atlas.model.instance.AtlasEntityHeader;
import
org.apache.atlas.model.instance.AtlasObjectId
;
import
org.apache.atlas.model.instance.EntityMutationResponse
;
import
org.apache.atlas.repository.graph.AtlasGraphProvider
;
import
org.apache.atlas.repository.graphdb.AtlasGraph
;
import
org.apache.atlas.repository.graphdb.AtlasVertex
;
import
org.apache.atlas.repository.migration.DataMigrationStatusService
;
import
org.apache.atlas.repository.store.graph.AtlasEntityStore
;
import
org.apache.atlas.repository.store.graph.BulkImporter
;
import
org.apache.atlas.repository.store.graph.v2.bulkimport.ImportStrategy
;
...
...
@@ -53,11 +53,13 @@ public class BulkImporterImpl implements BulkImporter {
private
final
AtlasEntityStore
entityStore
;
private
AtlasTypeRegistry
typeRegistry
;
private
DataMigrationStatusService
dataMigrationStatusService
;
@Inject
public
BulkImporterImpl
(
AtlasEntityStore
entityStore
,
AtlasTypeRegistry
typeRegistry
)
{
public
BulkImporterImpl
(
AtlasEntityStore
entityStore
,
AtlasTypeRegistry
typeRegistry
,
DataMigrationStatusService
dataMigrationStatusService
)
{
this
.
entityStore
=
entityStore
;
this
.
typeRegistry
=
typeRegistry
;
this
.
dataMigrationStatusService
=
dataMigrationStatusService
;
}
@Override
...
...
@@ -65,7 +67,7 @@ public class BulkImporterImpl implements BulkImporter {
ImportStrategy
importStrategy
=
(
importResult
.
getRequest
().
getOptions
()
!=
null
&&
importResult
.
getRequest
().
getOptions
().
containsKey
(
AtlasImportRequest
.
OPTION_KEY_MIGRATION
))
?
new
MigrationImport
(
new
AtlasGraphProvider
(),
this
.
typeRegistry
)
?
new
MigrationImport
(
new
AtlasGraphProvider
(),
this
.
typeRegistry
,
dataMigrationStatusService
)
:
new
RegularImport
(
this
.
entityStore
,
this
.
typeRegistry
);
LOG
.
info
(
"BulkImportImpl: {}"
,
importStrategy
.
getClass
().
getSimpleName
());
...
...
This diff is collapsed.
Click to expand it.
repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/MigrationImport.java
View file @
54042d35
...
...
@@ -26,6 +26,7 @@ import org.apache.atlas.repository.converters.AtlasFormatConverters;
import
org.apache.atlas.repository.converters.AtlasInstanceConverter
;
import
org.apache.atlas.repository.graph.AtlasGraphProvider
;
import
org.apache.atlas.repository.graphdb.AtlasGraph
;
import
org.apache.atlas.repository.migration.DataMigrationStatusService
;
import
org.apache.atlas.repository.store.graph.AtlasEntityStore
;
import
org.apache.atlas.repository.store.graph.AtlasRelationshipStore
;
import
org.apache.atlas.repository.store.graph.v1.DeleteHandlerDelegate
;
...
...
@@ -44,14 +45,16 @@ public class MigrationImport extends ImportStrategy {
private
static
final
Logger
LOG
=
LoggerFactory
.
getLogger
(
MigrationImport
.
class
);
private
final
AtlasTypeRegistry
typeRegistry
;
private
final
DataMigrationStatusService
dataMigrationStatusService
;
private
AtlasGraph
atlasGraph
;
private
EntityGraphRetriever
entityGraphRetriever
;
private
EntityGraphMapper
entityGraphMapper
;
private
AtlasEntityStore
entityStore
;
public
MigrationImport
(
AtlasGraphProvider
atlasGraphProvider
,
AtlasTypeRegistry
typeRegistry
)
{
public
MigrationImport
(
AtlasGraphProvider
atlasGraphProvider
,
AtlasTypeRegistry
typeRegistry
,
DataMigrationStatusService
dataMigrationStatusService
)
{
this
.
typeRegistry
=
typeRegistry
;
setupEntityStore
(
atlasGraphProvider
,
typeRegistry
);
this
.
dataMigrationStatusService
=
dataMigrationStatusService
;
LOG
.
info
(
"MigrationImport: Using bulkLoading..."
);
}
...
...
@@ -67,11 +70,11 @@ public class MigrationImport extends ImportStrategy {
int
index
=
0
;
int
streamSize
=
entityStream
.
size
();
EntityMutationResponse
ret
=
new
EntityMutationResponse
();
EntityCreationManager
creationManager
=
createEntityCreationManager
(
atlasGraph
,
importResult
,
streamSize
);
EntityCreationManager
creationManager
=
createEntityCreationManager
(
atlasGraph
,
dataMigrationStatusService
,
importResult
,
streamSize
);
try
{
LOG
.
info
(
"Migration Import: Size: {}: Starting..."
,
streamSize
);
index
=
creationManager
.
read
(
entityStream
);
index
=
creationManager
.
read
(
entityStream
,
importResult
.
getRequest
().
getStartPosition
()
);
creationManager
.
drain
();
creationManager
.
extractResults
();
}
catch
(
Exception
ex
)
{
...
...
@@ -84,14 +87,14 @@ public class MigrationImport extends ImportStrategy {
return
ret
;
}
private
EntityCreationManager
createEntityCreationManager
(
AtlasGraph
threadedAtlasGraph
,
AtlasImportResult
importResult
,
int
streamSize
)
{
private
EntityCreationManager
createEntityCreationManager
(
AtlasGraph
threadedAtlasGraph
,
DataMigrationStatusService
dataMigrationStatusService
,
AtlasImportResult
importResult
,
int
streamSize
)
{
int
batchSize
=
importResult
.
getRequest
().
getOptionKeyBatchSize
();
int
numWorkers
=
getNumWorkers
(
importResult
.
getRequest
().
getOptionKeyNumWorkers
());
EntityConsumerBuilder
consumerBuilder
=
new
EntityConsumerBuilder
(
threadedAtlasGraph
,
entityStore
,
entityGraphRetriever
,
typeRegistry
,
batchSize
);
return
new
EntityCreationManager
(
consumerBuilder
,
batchSize
,
numWorkers
,
importResult
,
streamSize
);
return
new
EntityCreationManager
(
consumerBuilder
,
batchSize
,
numWorkers
,
dataMigrationStatusService
,
importResult
,
streamSize
);
}
private
static
int
getNumWorkers
(
int
numWorkersFromOptions
)
{
...
...
This diff is collapsed.
Click to expand it.
repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityCreationManager.java
View file @
54042d35
...
...
@@ -22,6 +22,7 @@ import org.apache.atlas.model.impexp.AtlasImportResult;
import
org.apache.atlas.model.instance.AtlasEntity
;
import
org.apache.atlas.pc.WorkItemBuilder
;
import
org.apache.atlas.pc.WorkItemManager
;
import
org.apache.atlas.repository.migration.DataMigrationStatusService
;
import
org.apache.atlas.repository.store.graph.v2.BulkImporterImpl
;
import
org.apache.atlas.repository.store.graph.v2.EntityImportStream
;
import
org.apache.commons.lang.StringUtils
;
...
...
@@ -30,25 +31,27 @@ import org.slf4j.LoggerFactory;
public
class
EntityCreationManager
<
AtlasEntityWithExtInfo
>
extends
WorkItemManager
{
private
static
final
Logger
LOG
=
LoggerFactory
.
getLogger
(
EntityCreationManager
.
class
);
private
static
final
long
STATUS_REPORT_TIMEOUT_DURATION
=
5
*
60
*
1000
;
// 5 min
private
static
final
String
WORKER_PREFIX
=
"migration-import"
;
private
final
StatusReporter
<
String
,
String
>
statusReporter
;
private
final
DataMigrationStatusService
dataMigrationStatusService
;
private
final
AtlasImportResult
importResult
;
private
final
int
streamSize
;
private
final
long
STATUS_REPORT_TIMEOUT_DURATION
=
5
*
60
*
1000
;
// 5 min
private
String
currentTypeName
;
private
float
currentPercent
;
public
EntityCreationManager
(
WorkItemBuilder
builder
,
int
batchSize
,
int
numWorkers
,
AtlasImportResult
importResult
,
int
streamSize
)
{
public
EntityCreationManager
(
WorkItemBuilder
builder
,
int
batchSize
,
int
numWorkers
,
DataMigrationStatusService
dataMigrationStatusService
,
AtlasImportResult
importResult
,
int
streamSize
)
{
super
(
builder
,
WORKER_PREFIX
,
batchSize
,
numWorkers
,
true
);
this
.
dataMigrationStatusService
=
dataMigrationStatusService
;
this
.
importResult
=
importResult
;
this
.
streamSize
=
streamSize
;
this
.
statusReporter
=
new
StatusReporter
<>(
STATUS_REPORT_TIMEOUT_DURATION
);
}
public
int
read
(
EntityImportStream
entityStream
)
{
int
currentIndex
=
0
;
public
int
read
(
EntityImportStream
entityStream
,
String
startPosition
)
{
int
currentIndex
=
StringUtils
.
isEmpty
(
startPosition
)
?
0
:
Integer
.
valueOf
(
startPosition
)
;
AtlasEntity
.
AtlasEntityWithExtInfo
entityWithExtInfo
;
while
((
entityWithExtInfo
=
entityStream
.
getNextEntityWithExtInfo
())
!=
null
)
{
AtlasEntity
entity
=
entityWithExtInfo
!=
null
?
entityWithExtInfo
.
getEntity
()
:
null
;
...
...
@@ -103,8 +106,10 @@ public class EntityCreationManager<AtlasEntityWithExtInfo> extends WorkItemManag
return
;
}
String
currentPosition
=
split
[
1
];
dataMigrationStatusService
.
savePosition
(
currentPosition
);
importResult
.
incrementMeticsCounter
(
split
[
0
]);
this
.
currentPercent
=
updateImportMetrics
(
split
[
0
],
Integer
.
parseInt
(
split
[
1
]
),
getStreamSize
(),
getCurrentPercent
());
this
.
currentPercent
=
updateImportMetrics
(
split
[
0
],
Integer
.
parseInt
(
currentPosition
),
getStreamSize
(),
getCurrentPercent
());
}
private
static
float
updateImportMetrics
(
String
typeNameGuid
,
int
currentIndex
,
int
streamSize
,
float
currentPercent
)
{
...
...
This diff is collapsed.
Click to expand it.
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment