Commit 64c7a3be, authored 6 years ago by skoritala; committed by Ashutosh Mestry 6 years ago
ATLAS-3251: Implement Patch to populate classification text for legacy data.
Signed-off-by: Ashutosh Mestry <amestry@hortonworks.com>
parent 4b6380fe
Showing 7 changed files with 371 additions and 176 deletions
...rg/apache/atlas/repository/patches/AtlasPatchHandler.java  +2 -1
...rg/apache/atlas/repository/patches/AtlasPatchManager.java  +5 -3
...che/atlas/repository/patches/ClassificationTextPatch.java  +81 -0
...he/atlas/repository/patches/ConcurrentPatchProcessor.java  +233 -0
...ava/org/apache/atlas/repository/patches/PatchContext.java  +5 -1
...apache/atlas/repository/patches/UniqueAttributePatch.java  +34 -166
...he/atlas/repository/store/graph/v2/EntityGraphMapper.java  +11 -5
repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchHandler.java
@@ -17,6 +17,7 @@
  */
 package org.apache.atlas.repository.patches;
+import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.model.patches.AtlasPatch.PatchStatus;
 import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.UNKNOWN;
@@ -64,5 +65,5 @@ public abstract class AtlasPatchHandler {
         return patchId;
     }
-    public abstract void apply();
+    public abstract void apply() throws AtlasBaseException;
 }
repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchManager.java
@@ -22,6 +22,7 @@ import org.apache.atlas.model.patches.AtlasPatch;
 import org.apache.atlas.model.patches.AtlasPatch.PatchStatus;
 import org.apache.atlas.repository.graph.GraphBackedSearchIndexer;
 import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.store.graph.v2.EntityGraphMapper;
 import org.apache.atlas.type.AtlasTypeRegistry;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -39,8 +40,8 @@ public class AtlasPatchManager {
     private final PatchContext context;
     @Inject
-    public AtlasPatchManager(AtlasGraph atlasGraph, AtlasTypeRegistry typeRegistry, GraphBackedSearchIndexer indexer) {
-        this.context = new PatchContext(atlasGraph, typeRegistry, indexer);
+    public AtlasPatchManager(AtlasGraph atlasGraph, AtlasTypeRegistry typeRegistry, GraphBackedSearchIndexer indexer, EntityGraphMapper entityGraphMapper) {
+        this.context = new PatchContext(atlasGraph, typeRegistry, indexer, entityGraphMapper);
     }
     public AtlasPatch.AtlasPatches getAllPatches() {
@@ -49,7 +50,8 @@ public class AtlasPatchManager {
     public void applyAll() {
         final AtlasPatchHandler handlers[] = {
-                new UniqueAttributePatch(context)
+                new UniqueAttributePatch(context),
+                new ClassificationTextPatch(context)
         };
         try {
repository/src/main/java/org/apache/atlas/repository/patches/ClassificationTextPatch.java
0 → 100644
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.atlas.repository.patches;

import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.type.AtlasEntityType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.APPLIED;

public class ClassificationTextPatch extends AtlasPatchHandler {
    private static final Logger LOG = LoggerFactory.getLogger(ClassificationTextPatch.class);

    private static final String PATCH_ID = "JAVA_PATCH_0000_002";
    private static final String PATCH_DESCRIPTION = "Populates Classification Text attribute for entities from classifications applied on them.";

    private final PatchContext context;

    public ClassificationTextPatch(PatchContext context) {
        super(context.getPatchRegistry(), PATCH_ID, PATCH_DESCRIPTION);

        this.context = context;
    }

    @Override
    public void apply() throws AtlasBaseException {
        ConcurrentPatchProcessor patchProcessor = new ClassificationTextPatchProcessor(context);

        patchProcessor.apply();

        setStatus(APPLIED);

        LOG.info("ClassificationTextPatch.apply(): patchId={}, status={}", getPatchId(), getStatus());
    }

    public static class ClassificationTextPatchProcessor extends ConcurrentPatchProcessor {
        public ClassificationTextPatchProcessor(PatchContext context) {
            super(context);
        }

        @Override
        protected void processVertexItem(Long vertexId, AtlasVertex vertex, String typeName, AtlasEntityType entityType) throws AtlasBaseException {
            processItem(vertexId, vertex, typeName, entityType);
        }

        @Override
        protected void prepareForExecution() {
            //do nothing
        }

        protected void processItem(Long vertexId, AtlasVertex vertex, String typeName, AtlasEntityType entityType) throws AtlasBaseException {
            if (LOG.isDebugEnabled()) {
                LOG.debug("processItem(typeName={}, vertexId={})", typeName, vertexId);
            }

            getEntityGraphMapper().updateClassificationText(vertex);

            if (LOG.isDebugEnabled()) {
                LOG.debug("processItem(typeName={}, vertexId={}): Done!", typeName, vertexId);
            }
        }
    }
}
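The patch above only supplies two hooks (`prepareForExecution` and `processVertexItem`) and leaves iteration and threading to its base class. A minimal standalone sketch of that template-method arrangement follows. It assumes a plain `ExecutorService` in place of Atlas's `WorkItemManager`, and the names `VertexProcessorSketch`, `TextPatchSketch`, and the `"text-for-"` value are hypothetical, chosen only for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical base class: owns setup, fan-out, and shutdown.
abstract class VertexProcessorSketch {
    private final int numWorkers;

    VertexProcessorSketch(int numWorkers) {
        this.numWorkers = numWorkers;
    }

    // Template method: one-time preparation, then each id is handed to a worker.
    final void apply(List<Long> vertexIds) {
        prepareForExecution();

        ExecutorService pool = Executors.newFixedThreadPool(numWorkers);
        for (Long id : vertexIds) {
            pool.execute(() -> processVertexItem(id));
        }
        pool.shutdown();
        try {
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for callers
        }
    }

    protected abstract void prepareForExecution();

    protected abstract void processVertexItem(Long vertexId);
}

// Hypothetical subclass: only the per-vertex work is specific to the patch.
class TextPatchSketch extends VertexProcessorSketch {
    final Map<Long, String> classificationText = new ConcurrentHashMap<>();

    TextPatchSketch(int numWorkers) {
        super(numWorkers);
    }

    @Override
    protected void prepareForExecution() {
        // nothing to prepare in this sketch
    }

    @Override
    protected void processVertexItem(Long vertexId) {
        classificationText.put(vertexId, "text-for-" + vertexId);
    }

    public static void main(String[] args) {
        TextPatchSketch patch = new TextPatchSketch(3);
        patch.apply(Arrays.asList(1L, 2L, 3L, 4L));
        System.out.println(patch.classificationText.size() + " vertices processed");
    }
}
```

Keeping the fan-out in the base class means a further patch would only need to override the two hooks, which appears to be the motivation for this refactoring.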
repository/src/main/java/org/apache/atlas/repository/patches/ConcurrentPatchProcessor.java
0 → 100644
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.atlas.repository.patches;

import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasException;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.pc.WorkItemBuilder;
import org.apache.atlas.pc.WorkItemConsumer;
import org.apache.atlas.pc.WorkItemManager;
import org.apache.atlas.repository.graph.GraphBackedSearchIndexer;
import org.apache.atlas.repository.graphdb.*;
import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
import org.apache.atlas.repository.store.graph.v2.EntityGraphMapper;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

public abstract class ConcurrentPatchProcessor {
    private static final Logger LOG = LoggerFactory.getLogger(ConcurrentPatchProcessor.class);

    private static final String NUM_WORKERS_PROPERTY = "atlas.patch.numWorkers";
    private static final String BATCH_SIZE_PROPERTY = "atlas.patch.batchSize";
    private static final String ATLAS_SOLR_SHARDS = "ATLAS_SOLR_SHARDS";
    private static final String WORKER_NAME_PREFIX = "patchWorkItem";
    private static final int NUM_WORKERS;
    private static final int BATCH_SIZE;

    private final EntityGraphMapper entityGraphMapper;
    private final AtlasGraph graph;
    private final GraphBackedSearchIndexer indexer;
    private final AtlasTypeRegistry typeRegistry;

    public AtlasGraph getGraph() {
        return graph;
    }

    public GraphBackedSearchIndexer getIndexer() {
        return indexer;
    }

    public AtlasTypeRegistry getTypeRegistry() {
        return typeRegistry;
    }

    // Read worker count and batch size once; fall back to 3 workers and 300 items if configuration cannot be read.
    static {
        int numWorkers = 3;
        int batchSize = 300;

        try {
            numWorkers = ApplicationProperties.get().getInt(NUM_WORKERS_PROPERTY, getDefaultNumWorkers());
            batchSize = ApplicationProperties.get().getInt(BATCH_SIZE_PROPERTY, 300);

            LOG.info("ConcurrentPatchProcessor: {}={}, {}={}", NUM_WORKERS_PROPERTY, numWorkers, BATCH_SIZE_PROPERTY, batchSize);
        } catch (Exception e) {
            LOG.error("Error retrieving configuration.", e);
        }

        NUM_WORKERS = numWorkers;
        BATCH_SIZE = batchSize;
    }

    // Default: three workers per Solr shard.
    private static int getDefaultNumWorkers() throws AtlasException {
        return ApplicationProperties.get().getInt(ATLAS_SOLR_SHARDS, 1) * 3;
    }

    public ConcurrentPatchProcessor(PatchContext context) {
        this.graph = context.getGraph();
        this.indexer = context.getIndexer();
        this.typeRegistry = context.getTypeRegistry();
        this.entityGraphMapper = context.getEntityGraphMapper();
    }

    public EntityGraphMapper getEntityGraphMapper() {
        return entityGraphMapper;
    }

    public void apply() throws AtlasBaseException {
        prepareForExecution();
        execute();
    }

    // Submits every vertex id in the graph to the worker pool, then waits for the queue to drain.
    private void execute() {
        Iterable<Object> iterable = graph.query().vertexIds();
        WorkItemManager manager = new WorkItemManager(new ConsumerBuilder(graph, typeRegistry, this), WORKER_NAME_PREFIX, BATCH_SIZE, NUM_WORKERS, false);

        try {
            for (Iterator<Object> iter = iterable.iterator(); iter.hasNext(); ) {
                Object vertexId = iter.next();

                submitForProcessing((Long) vertexId, manager);
            }

            manager.drain();
        } finally {
            try {
                manager.shutdown();
            } catch (InterruptedException e) {
                LOG.error("ConcurrentPatchProcessor.execute(): interrupted during WorkItemManager shutdown.", e);
            }
        }
    }

    private void submitForProcessing(Long vertexId, WorkItemManager manager) {
        manager.checkProduce(vertexId);
    }

    private static class ConsumerBuilder implements WorkItemBuilder<Consumer, Long> {
        private final AtlasTypeRegistry typeRegistry;
        private final AtlasGraph graph;
        private final ConcurrentPatchProcessor patchItemProcessor;

        public ConsumerBuilder(AtlasGraph graph, AtlasTypeRegistry typeRegistry, ConcurrentPatchProcessor patchItemProcessor) {
            this.graph = graph;
            this.typeRegistry = typeRegistry;
            this.patchItemProcessor = patchItemProcessor;
        }

        @Override
        public Consumer build(BlockingQueue<Long> queue) {
            return new Consumer(graph, typeRegistry, queue, patchItemProcessor);
        }
    }

    private static class Consumer extends WorkItemConsumer<Long> {
        private static final int MAX_COMMIT_RETRY_COUNT = 3;

        private final AtlasGraph graph;
        private final AtlasTypeRegistry typeRegistry;
        private final AtomicLong counter;
        private final ConcurrentPatchProcessor individualItemProcessor;

        public Consumer(AtlasGraph graph, AtlasTypeRegistry typeRegistry, BlockingQueue<Long> queue, ConcurrentPatchProcessor individualItemProcessor) {
            super(queue);

            this.graph = graph;
            this.typeRegistry = typeRegistry;
            this.counter = new AtomicLong(0);
            this.individualItemProcessor = individualItemProcessor;
        }

        // Commit once every BATCH_SIZE processed items.
        @Override
        protected void doCommit() {
            if (counter.get() % BATCH_SIZE == 0) {
                LOG.info("Processed: {}", counter.get());

                attemptCommit();
            }
        }

        @Override
        protected void commitDirty() {
            attemptCommit();

            LOG.info("Total: Commit: {}", counter.get());

            super.commitDirty();
        }

        // Retries a failed commit up to MAX_COMMIT_RETRY_COUNT times, pausing 300 ms * attempt between tries.
        private void attemptCommit() {
            for (int retryCount = 1; retryCount <= MAX_COMMIT_RETRY_COUNT; retryCount++) {
                try {
                    graph.commit();

                    break;
                } catch (Exception ex) {
                    LOG.error("Commit exception: attempt {}", retryCount, ex);

                    try {
                        Thread.sleep(300 * retryCount);
                    } catch (InterruptedException e) {
                        LOG.error("Commit exception: Pause: Interrupted!", e);
                    }
                }
            }
        }

        // Skips missing vertices, type vertices, non-ACTIVE entities, and unknown entity types.
        @Override
        protected void processItem(Long vertexId) {
            counter.incrementAndGet();
            AtlasVertex vertex = graph.getVertex(Long.toString(vertexId));

            if (vertex == null) {
                LOG.warn("processItem(vertexId={}): AtlasVertex not found!", vertexId);
                return;
            }

            if (AtlasGraphUtilsV2.isTypeVertex(vertex)) {
                return;
            }

            if (AtlasGraphUtilsV2.getState(vertex) != AtlasEntity.Status.ACTIVE) {
                return;
            }

            String typeName = AtlasGraphUtilsV2.getTypeName(vertex);
            AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName);
            if (entityType == null) {
                return;
            }

            try {
                individualItemProcessor.processVertexItem(vertexId, vertex, typeName, entityType);
            } catch (AtlasBaseException e) {
                LOG.error("Error processing: {}", vertexId, e);
            }
        }
    }

    protected abstract void processVertexItem(Long vertexId, AtlasVertex vertex, String typeName, AtlasEntityType entityType) throws AtlasBaseException;

    protected abstract void prepareForExecution() throws AtlasBaseException;
}
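The `attemptCommit()` loop above retries a failed commit up to three times and sleeps `300 * retryCount` milliseconds after each failure. A minimal standalone sketch of that bounded retry with linearly growing pause follows. The class `CommitRetrySketch`, the `Action` interface, and the return-value convention are hypothetical, and the base delay is a parameter here only so the sketch can run quickly.

```java
import java.util.concurrent.atomic.AtomicInteger;

class CommitRetrySketch {
    /** Hypothetical stand-in for graph.commit(); may throw on failure. */
    interface Action {
        void run() throws Exception;
    }

    /**
     * Runs the action up to maxRetries times, pausing baseDelayMs * attempt
     * after each failure. Returns the attempt number that succeeded, or -1
     * if every attempt failed or the thread was interrupted while pausing.
     */
    static int attempt(Action action, int maxRetries, long baseDelayMs) {
        for (int retry = 1; retry <= maxRetries; retry++) {
            try {
                action.run();
                return retry;
            } catch (Exception ex) {
                try {
                    Thread.sleep(baseDelayMs * retry);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt(); // keep the interrupt visible to callers
                    return -1;
                }
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        // Fails twice with a simulated transient error, then succeeds.
        int succeededOn = attempt(() -> {
            if (calls.incrementAndGet() < 3) {
                throw new IllegalStateException("transient failure");
            }
        }, 3, 1L);
        System.out.println("succeeded on attempt " + succeededOn);
    }
}
```

Unlike the commit's loop, this sketch reports the outcome to the caller and stops retrying when interrupted; whether that is desirable for the patch is a design choice the commit does not discuss.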
repository/src/main/java/org/apache/atlas/repository/patches/PatchContext.java
@@ -20,6 +20,7 @@ package org.apache.atlas.repository.patches;
 import org.apache.atlas.repository.graph.GraphBackedSearchIndexer;
 import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.store.graph.v2.EntityGraphMapper;
 import org.apache.atlas.type.AtlasTypeRegistry;
 public class PatchContext {
@@ -27,12 +28,14 @@ public class PatchContext {
     private final AtlasTypeRegistry typeRegistry;
     private final GraphBackedSearchIndexer indexer;
     private final AtlasPatchRegistry patchRegistry;
+    private final EntityGraphMapper entityGraphMapper;
-    public PatchContext(AtlasGraph graph, AtlasTypeRegistry typeRegistry, GraphBackedSearchIndexer indexer) {
+    public PatchContext(AtlasGraph graph, AtlasTypeRegistry typeRegistry, GraphBackedSearchIndexer indexer, EntityGraphMapper entityGraphMapper) {
         this.graph = graph;
         this.typeRegistry = typeRegistry;
         this.indexer = indexer;
         this.patchRegistry = new AtlasPatchRegistry(this.graph);
+        this.entityGraphMapper = entityGraphMapper;
     }
     public AtlasGraph getGraph() {
@@ -50,4 +53,5 @@ public class PatchContext {
     public AtlasPatchRegistry getPatchRegistry() {
         return patchRegistry;
     }
+    public EntityGraphMapper getEntityGraphMapper() { return entityGraphMapper;}
 }
repository/src/main/java/org/apache/atlas/repository/patches/UniqueAttributePatch.java
@@ -19,16 +19,11 @@ package org.apache.atlas.repository.patches;
 import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasException;
-import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
-import org.apache.atlas.pc.WorkItemBuilder;
-import org.apache.atlas.pc.WorkItemConsumer;
-import org.apache.atlas.pc.WorkItemManager;
 import org.apache.atlas.repository.IndexException;
-import org.apache.atlas.repository.graph.GraphBackedSearchIndexer;
 import org.apache.atlas.repository.graph.GraphBackedSearchIndexer.UniqueKind;
 import org.apache.atlas.repository.graphdb.AtlasCardinality;
-import org.apache.atlas.repository.graphdb.AtlasGraph;
 import org.apache.atlas.repository.graphdb.AtlasGraphManagement;
 import org.apache.atlas.repository.graphdb.AtlasSchemaViolationException;
 import org.apache.atlas.repository.graphdb.AtlasVertex;
@@ -36,15 +31,11 @@ import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
 import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
 import org.apache.atlas.type.AtlasEntityType;
 import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
-import org.apache.atlas.type.AtlasTypeRegistry;
 import org.apache.commons.collections.CollectionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.util.Collection;
-import java.util.Iterator;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.atomic.AtomicLong;
 import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.APPLIED;
 import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.getIdFromVertex;
@@ -64,8 +55,8 @@ public class UniqueAttributePatch extends AtlasPatchHandler {
     }
     @Override
-    public void apply() {
-        UniqueAttributePatchProcessor patchProcessor = new UniqueAttributePatchProcessor(context);
+    public void apply() throws AtlasBaseException {
+        ConcurrentPatchProcessor patchProcessor = new UniqueAttributePatchProcessor(context);
         patchProcessor.apply();
@@ -74,17 +65,13 @@ public class UniqueAttributePatch extends AtlasPatchHandler {
         LOG.info("UniqueAttributePatch.apply(): patchId={}, status={}", getPatchId(), getStatus());
     }
-    public static class UniqueAttributePatchProcessor {
+    public static class UniqueAttributePatchProcessor extends ConcurrentPatchProcessor {
         private static final String NUM_WORKERS_PROPERTY = "atlas.patch.unique_attribute_patch.numWorkers";
         private static final String BATCH_SIZE_PROPERTY = "atlas.patch.unique_attribute_patch.batchSize";
         private static final String ATLAS_SOLR_SHARDS = "ATLAS_SOLR_SHARDS";
         private static final int NUM_WORKERS;
         private static final int BATCH_SIZE;
-        private final AtlasGraph graph;
-        private final GraphBackedSearchIndexer indexer;
-        private final AtlasTypeRegistry typeRegistry;
         static {
             int numWorkers = 3;
             int batchSize = 300;
@@ -103,37 +90,23 @@ public class UniqueAttributePatch extends AtlasPatchHandler {
         }
         public UniqueAttributePatchProcessor(PatchContext context) {
-            this.graph = context.getGraph();
-            this.indexer = context.getIndexer();
-            this.typeRegistry = context.getTypeRegistry();
+            super(context);
         }
-        public void apply() {
-            createIndexForUniqueAttributes();
-            addUniqueAttributeToAllVertices();
+        @Override
+        protected void processVertexItem(Long vertexId, AtlasVertex vertex, String typeName, AtlasEntityType entityType) {
+            //process the vertex
+            processItem(vertexId, vertex, typeName, entityType);
         }
-        private void addUniqueAttributeToAllVertices() {
-            Iterable<Object> iterable = graph.query().vertexIds();
-            WorkItemManager manager = new WorkItemManager(new ConsumerBuilder(graph, typeRegistry), BATCH_SIZE, NUM_WORKERS);
-            try {
-                for (Iterator<Object> iter = iterable.iterator(); iter.hasNext(); ) {
-                    Object vertexId = iter.next();
-                    submitForProcessing((Long) vertexId, manager);
-                }
-                manager.drain();
-            } finally {
-                try {
-                    manager.shutdown();
-                } catch (InterruptedException e) {
-                    LOG.error("UniqueAttributePatchProcessor.apply(): interrupted during WorkItemManager shutdown", e);
-                }
-            }
+        @Override
+        protected void prepareForExecution() {
+            //create the new attribute for all unique attributes.
+            createIndexForUniqueAttributes();
         }
         private void createIndexForUniqueAttributes() {
-            for (AtlasEntityType entityType : typeRegistry.getAllEntityTypes()) {
+            for (AtlasEntityType entityType : getTypeRegistry().getAllEntityTypes()) {
                 String typeName = entityType.getTypeName();
                 Collection<AtlasAttribute> uniqAttributes = entityType.getUniqAttributes().values();
@@ -150,7 +123,7 @@ public class UniqueAttributePatch extends AtlasPatchHandler {
         private void createIndexForUniqueAttributes(String typeName, Collection<AtlasAttribute> attributes) {
             try {
-                AtlasGraphManagement management = graph.getManagementSystem();
+                AtlasGraphManagement management = getGraph().getManagementSystem();
                 for (AtlasAttribute attribute : attributes) {
                     String uniquePropertyName = attribute.getVertexUniquePropertyName();
@@ -162,14 +135,14 @@ public class UniqueAttributePatch extends AtlasPatchHandler {
                     AtlasAttributeDef attributeDef = attribute.getAttributeDef();
                     boolean isIndexable = attributeDef.getIsIndexable();
                     String attribTypeName = attributeDef.getTypeName();
-                    Class propertyClass = indexer.getPrimitiveClass(attribTypeName);
-                    AtlasCardinality cardinality = indexer.toAtlasCardinality(attributeDef.getCardinality());
+                    Class propertyClass = getIndexer().getPrimitiveClass(attribTypeName);
+                    AtlasCardinality cardinality = getIndexer().toAtlasCardinality(attributeDef.getCardinality());
-                    indexer.createVertexIndex(management, uniquePropertyName, UniqueKind.PER_TYPE_UNIQUE, propertyClass, cardinality, isIndexable, true);
+                    getIndexer().createVertexIndex(management, uniquePropertyName, UniqueKind.PER_TYPE_UNIQUE, propertyClass, cardinality, isIndexable, true);
                 }
-                indexer.commit(management);
-                graph.commit();
+                getIndexer().commit(management);
+                getGraph().commit();
                 LOG.info("Unique attributes: type: {}: Registered!", typeName);
             } catch (IndexException e) {
@@ -181,134 +154,29 @@ public class UniqueAttributePatch extends AtlasPatchHandler {
             return ApplicationProperties.get().getInt(ATLAS_SOLR_SHARDS, 1) * 3;
         }
-        private void submitForProcessing(Long vertexId, WorkItemManager manager) {
-            manager.checkProduce(vertexId);
-        }
-        private static class ConsumerBuilder implements WorkItemBuilder<Consumer, Long> {
-            private final AtlasTypeRegistry typeRegistry;
-            private final AtlasGraph graph;
-            public ConsumerBuilder(AtlasGraph graph, AtlasTypeRegistry typeRegistry) {
-                this.graph = graph;
-                this.typeRegistry = typeRegistry;
-            }
-            @Override
-            public Consumer build(BlockingQueue<Long> queue) {
-                return new Consumer(graph, typeRegistry, queue);
-            }
-        }
-        private static class Consumer extends WorkItemConsumer<Long> {
-            private int MAX_COMMIT_RETRY_COUNT = 3;
-            private final AtlasGraph graph;
-            private final AtlasTypeRegistry typeRegistry;
-            private final AtomicLong counter;
-            public Consumer(AtlasGraph graph, AtlasTypeRegistry typeRegistry, BlockingQueue<Long> queue) {
-                super(queue);
-                this.graph = graph;
-                this.typeRegistry = typeRegistry;
-                this.counter = new AtomicLong(0);
-            }
-            @Override
-            protected void doCommit() {
-                if (counter.get() % BATCH_SIZE == 0) {
-                    LOG.info("Processed: {}", counter.get());
-                    attemptCommit();
-                }
-            }
-            @Override
-            protected void commitDirty() {
-                attemptCommit();
-                LOG.info("Total: Commit: {}", counter.get());
-                super.commitDirty();
-            }
-            private void attemptCommit() {
-                for (int retryCount = 1; retryCount <= MAX_COMMIT_RETRY_COUNT; retryCount++) {
-                    try {
-                        graph.commit();
-                        break;
-                    } catch (Exception ex) {
-                        LOG.error("Commit exception: ", retryCount, ex);
-                        try {
-                            Thread.currentThread().sleep(300 * retryCount);
-                        } catch (InterruptedException e) {
-                            LOG.error("Commit exception: Pause: Interrputed!", e);
-                        }
-                    }
-                }
-            }
-            @Override
-            protected void processItem(Long vertexId) {
-                AtlasVertex vertex = graph.getVertex(Long.toString(vertexId));
-                if (vertex == null) {
-                    LOG.warn("processItem(vertexId={}): AtlasVertex not found!", vertexId);
-                    return;
-                }
-                if (AtlasGraphUtilsV2.isTypeVertex(vertex)) {
-                    return;
-                }
-                if (AtlasGraphUtilsV2.getState(vertex) != AtlasEntity.Status.ACTIVE) {
-                    return;
-                }
-                String typeName = AtlasGraphUtilsV2.getTypeName(vertex);
-                AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName);
-                if (entityType == null) {
-                    return;
-                }
-                processItem(vertexId, vertex, typeName, entityType);
-            }
-            private void processItem(Long vertexId, AtlasVertex vertex, String typeName, AtlasEntityType entityType) {
-                try {
-                    counter.incrementAndGet();
-                    LOG.debug("processItem(typeName={}, vertexId={})", typeName, vertexId);
-                    for (AtlasAttribute attribute : entityType.getUniqAttributes().values()) {
-                        String uniquePropertyKey = attribute.getVertexUniquePropertyName();
-                        Collection<? extends String> propertyKeys = vertex.getPropertyKeys();
-                        Object uniqAttrValue = null;
-                        if (propertyKeys == null || !propertyKeys.contains(uniquePropertyKey)) {
-                            try {
-                                String propertyKey = attribute.getVertexPropertyName();
-                                uniqAttrValue = EntityGraphRetriever.mapVertexToPrimitive(vertex, propertyKey, attribute.getAttributeDef());
-                                AtlasGraphUtilsV2.setEncodedProperty(vertex, uniquePropertyKey, uniqAttrValue);
-                            } catch (AtlasSchemaViolationException ex) {
-                                LOG.error("Duplicates detected: {}:{}:{}", typeName, uniqAttrValue, getIdFromVertex(vertex));
-                                vertex.removeProperty(uniquePropertyKey);
-                            }
-                        }
-                    }
-                    LOG.debug("processItem(typeName={}, vertexId={}): Done!", typeName, vertexId);
-                } catch (Exception ex) {
-                    LOG.error("processItem(typeName={}, vertexId={}): failed!", typeName, vertexId, ex);
-                } finally {
-                    commit();
-                }
-            }
-        }
+        protected void processItem(Long vertexId, AtlasVertex vertex, String typeName, AtlasEntityType entityType) {
+            LOG.debug("processItem(typeName={}, vertexId={})", typeName, vertexId);
+            for (AtlasAttribute attribute : entityType.getUniqAttributes().values()) {
+                String uniquePropertyKey = attribute.getVertexUniquePropertyName();
+                Collection<? extends String> propertyKeys = vertex.getPropertyKeys();
+                Object uniqAttrValue = null;
+                if (propertyKeys == null || !propertyKeys.contains(uniquePropertyKey)) {
+                    try {
+                        String propertyKey = attribute.getVertexPropertyName();
+                        uniqAttrValue = EntityGraphRetriever.mapVertexToPrimitive(vertex, propertyKey, attribute.getAttributeDef());
+                        AtlasGraphUtilsV2.setEncodedProperty(vertex, uniquePropertyKey, uniqAttrValue);
+                    } catch (AtlasSchemaViolationException ex) {
+                        LOG.error("Duplicates detected: {}:{}:{}", typeName, uniqAttrValue, getIdFromVertex(vertex));
+                        vertex.removeProperty(uniquePropertyKey);
+                    }
+                }
+            }
+            LOG.debug("processItem(typeName={}, vertexId={}): Done!", typeName, vertexId);
+        }
     }
 }
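The retained `processItem` logic backfills the per-type unique property only on vertices that lack it, and removes the property again when the graph reports a duplicate. A minimal standalone sketch of that backfill-and-roll-back step follows. It assumes an in-memory `Map` in place of an `AtlasVertex` and a `Set` in place of the unique index; `UniqueBackfillSketch`, `DuplicateException`, and the `__u_` key prefix are hypothetical names, not Atlas APIs.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class UniqueBackfillSketch {
    /** Hypothetical stand-in for AtlasSchemaViolationException. */
    static class DuplicateException extends RuntimeException {
    }

    // Values already claimed for the unique property; simulates a unique index.
    private final Set<Object> uniqueIndex = new HashSet<>();

    /** Sets the unique property, failing if another vertex already holds the value. */
    private void setUnique(Map<String, Object> vertex, String key, Object value) {
        vertex.put(key, value);
        if (!uniqueIndex.add(value)) {
            throw new DuplicateException();
        }
    }

    /** Returns true if the unique property is present after the call. */
    boolean backfill(Map<String, Object> vertex, String attrKey) {
        String uniqueKey = "__u_" + attrKey; // hypothetical naming scheme
        if (vertex.containsKey(uniqueKey)) {
            return true; // already populated, nothing to do
        }
        try {
            setUnique(vertex, uniqueKey, vertex.get(attrKey));
            return true;
        } catch (DuplicateException ex) {
            vertex.remove(uniqueKey); // roll back, as the patch does on a violation
            return false;
        }
    }

    public static void main(String[] args) {
        UniqueBackfillSketch sketch = new UniqueBackfillSketch();
        Map<String, Object> first = new HashMap<>();
        first.put("qualifiedName", "db.t1");
        Map<String, Object> duplicate = new HashMap<>(first); // same attribute value

        System.out.println("first backfilled: " + sketch.backfill(first, "qualifiedName"));
        System.out.println("duplicate backfilled: " + sketch.backfill(duplicate, "qualifiedName"));
    }
}
```

Because the presence check comes first, running the backfill again over already-patched vertices is a no-op in this sketch.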
repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
@@ -1675,16 +1675,22 @@ public class EntityGraphMapper {
         updateModificationMetadata(entityVertex);
         for (Map.Entry<AtlasVertex, List<AtlasClassification>> entry : removedClassifications.entrySet()) {
-            AtlasVertex vertex = entry.getKey();
-            String guid = GraphHelper.getGuid(vertex);
-            AtlasEntity entity = instanceConverter.getAndCacheEntity(guid);
-            vertex.setProperty(CLASSIFICATION_TEXT_KEY, fullTextMapperV2.getClassificationTextForEntity(entity));
-            List<AtlasClassification> deletedClassificationNames = entry.getValue();
+            AtlasEntity entity = updateClassificationText(entry.getKey());
+            List<AtlasClassification> deletedClassificationNames = entry.getValue();
             entityChangeNotifier.onClassificationDeletedFromEntity(entity, deletedClassificationNames);
         }
     }
+    public AtlasEntity updateClassificationText(AtlasVertex vertex) throws AtlasBaseException {
+        String guid = GraphHelper.getGuid(vertex);
+        AtlasEntity entity = instanceConverter.getAndCacheEntity(guid);
+        vertex.setProperty(CLASSIFICATION_TEXT_KEY, fullTextMapperV2.getClassificationTextForEntity(entity));
+        return entity;
+    }
     public void updateClassifications(EntityMutationContext context, String guid, List<AtlasClassification> classifications) throws AtlasBaseException {
         if (CollectionUtils.isEmpty(classifications)) {
             throw new AtlasBaseException(AtlasErrorCode.INVALID_CLASSIFICATION_PARAMS, "update", guid);
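This hunk extracts the classification-text update into a public `updateClassificationText(vertex)` method, so the delete path and the new patch share one implementation. A minimal standalone sketch of that extract-and-reuse step follows. The text format used here (names joined by a space) and the property key value are assumptions for illustration only; the diff does not show what `getClassificationTextForEntity` actually produces, and `ClassificationTextSketch` is a hypothetical class.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ClassificationTextSketch {
    // Hypothetical key value; the real constant's value is not shown in the diff.
    static final String CLASSIFICATION_TEXT_KEY = "classificationText";

    /** Assumed format: classification names joined by a single space. */
    static String classificationText(List<String> classificationNames) {
        return String.join(" ", classificationNames);
    }

    /** Extracted helper: recompute the text, store it on the vertex, return it. */
    static String updateClassificationText(Map<String, Object> vertex, List<String> classificationNames) {
        String text = classificationText(classificationNames);
        vertex.put(CLASSIFICATION_TEXT_KEY, text);
        return text;
    }

    public static void main(String[] args) {
        Map<String, Object> vertex = new HashMap<>();
        // Both a "classification removed" path and a one-off patch could call the same helper.
        updateClassificationText(vertex, Arrays.asList("PII", "Sensitive"));
        System.out.println(vertex.get(CLASSIFICATION_TEXT_KEY));
    }
}
```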