Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
A
atlas
Project
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
dataplatform
atlas
Commits
479b9ab1
Commit
479b9ab1
authored
6 years ago
by
apoorvnaik
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
ATLAS-2667: Enhance GraphTransactionInterceptor to deal with nested/inner commits
Change-Id: I9ea29deb9aea226f077f4d008d459fdb3ac6663f
parent
6e7aa6ed
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
83 additions
and
19 deletions
+83
-19
GraphTransactionInterceptor.java
...in/java/org/apache/atlas/GraphTransactionInterceptor.java
+83
-19
No files found.
repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java
View file @
479b9ab1
...
...
@@ -21,14 +21,15 @@ import com.google.common.annotations.VisibleForTesting;
import
org.aopalliance.intercept.MethodInterceptor
;
import
org.aopalliance.intercept.MethodInvocation
;
import
org.apache.atlas.exception.AtlasBaseException
;
import
org.apache.atlas.repository.graphdb.AtlasGraph
;
import
org.apache.atlas.exception.NotFoundException
;
import
org.apache.atlas.repository.graphdb.AtlasGraph
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
org.springframework.stereotype.Component
;
import
javax.inject.Inject
;
import
javax.ws.rs.core.Response
;
import
java.lang.reflect.Method
;
import
java.util.ArrayList
;
import
java.util.Collections
;
import
java.util.List
;
...
...
@@ -41,8 +42,10 @@ public class GraphTransactionInterceptor implements MethodInterceptor {
private
static
final
Logger
LOG
=
LoggerFactory
.
getLogger
(
GraphTransactionInterceptor
.
class
);
@VisibleForTesting
private
static
final
ObjectUpdateSynchronizer
OBJECT_UPDATE_SYNCHRONIZER
=
new
ObjectUpdateSynchronizer
();
private
static
final
ThreadLocal
<
List
<
PostTransactionHook
>>
postTransactionHooks
=
new
ThreadLocal
<>();
private
static
final
ObjectUpdateSynchronizer
OBJECT_UPDATE_SYNCHRONIZER
=
new
ObjectUpdateSynchronizer
();
private
static
final
ThreadLocal
<
List
<
PostTransactionHook
>>
postTransactionHooks
=
new
ThreadLocal
<>();
private
static
final
ThreadLocal
<
Boolean
>
isTxnOpen
=
ThreadLocal
.
withInitial
(()
->
Boolean
.
FALSE
);
private
static
final
ThreadLocal
<
Boolean
>
innerFailure
=
ThreadLocal
.
withInitial
(()
->
Boolean
.
FALSE
);
private
final
AtlasGraph
graph
;
...
...
@@ -53,39 +56,72 @@ public class GraphTransactionInterceptor implements MethodInterceptor {
@Override
public
Object
invoke
(
MethodInvocation
invocation
)
throws
Throwable
{
Method
method
=
invocation
.
getMethod
();
String
invokingClass
=
method
.
getDeclaringClass
().
getSimpleName
();
String
invokedMethodName
=
method
.
getName
();
boolean
isInnerTxn
=
isTxnOpen
.
get
();
// Outermost txn marks any subsequent transaction as inner
isTxnOpen
.
set
(
Boolean
.
TRUE
);
if
(
LOG
.
isDebugEnabled
()
&&
isInnerTxn
)
{
LOG
.
debug
(
"Txn entry-point {}.{} is inner txn. Commit/Rollback will be ignored"
,
invokingClass
,
invokedMethodName
);
}
boolean
isSuccess
=
false
;
try
{
try
{
Object
response
=
invocation
.
proceed
();
graph
.
commit
();
isSuccess
=
true
;
if
(
LOG
.
isDebugEnabled
())
{
LOG
.
debug
(
"graph commit"
);
if
(
isInnerTxn
)
{
if
(
LOG
.
isDebugEnabled
())
{
LOG
.
debug
(
"Ignoring commit for nested/inner transaction {}.{}"
,
invokingClass
,
invokedMethodName
);
}
}
else
{
doCommitOrRollback
(
invokingClass
,
invokedMethodName
);
}
isSuccess
=
!
innerFailure
.
get
();
return
response
;
}
catch
(
Throwable
t
)
{
if
(
logException
(
t
))
{
LOG
.
error
(
"graph rollback due to exception "
,
t
);
if
(
isInnerTxn
)
{
if
(
LOG
.
isDebugEnabled
())
{
LOG
.
debug
(
"Ignoring rollback for nested/inner transaction {}.{}"
,
invokingClass
,
invokedMethodName
);
}
innerFailure
.
set
(
true
);
}
else
{
LOG
.
error
(
"graph rollback due to exception {}:{}"
,
t
.
getClass
().
getSimpleName
(),
t
.
getMessage
()
);
doRollback
(
t
);
}
graph
.
rollback
();
throw
t
;
}
}
finally
{
List
<
PostTransactionHook
>
trxHooks
=
postTransactionHooks
.
get
();
// Only outer txn can mark as closed
if
(!
isInnerTxn
)
{
if
(
LOG
.
isDebugEnabled
())
{
LOG
.
debug
(
"Closing outer txn"
);
}
// Reset the boolean flags
isTxnOpen
.
set
(
Boolean
.
FALSE
);
innerFailure
.
set
(
Boolean
.
FALSE
);
List
<
PostTransactionHook
>
trxHooks
=
postTransactionHooks
.
get
();
if
(
trxHooks
!=
null
)
{
postTransactionHooks
.
remove
();
if
(
trxHooks
!=
null
)
{
if
(
LOG
.
isDebugEnabled
())
{
LOG
.
debug
(
"Processing post-txn hooks"
);
}
postTransactionHooks
.
remove
();
for
(
PostTransactionHook
trxHook
:
trxHooks
)
{
try
{
trxHook
.
onComplete
(
isSuccess
);
}
catch
(
Throwable
t
)
{
LOG
.
error
(
"postTransactionHook failed"
,
t
);
for
(
PostTransactionHook
trxHook
:
trxHooks
)
{
try
{
trxHook
.
onComplete
(
isSuccess
);
}
catch
(
Throwable
t
)
{
LOG
.
error
(
"postTransactionHook failed"
,
t
);
}
}
}
}
...
...
@@ -94,6 +130,34 @@ public class GraphTransactionInterceptor implements MethodInterceptor {
}
}
private
void
doCommitOrRollback
(
final
String
invokingClass
,
final
String
invokedMethodName
)
{
if
(
innerFailure
.
get
())
{
if
(
LOG
.
isDebugEnabled
())
{
LOG
.
debug
(
"Inner/Nested call threw exception. Rollback on txn entry-point, {}.{}"
,
invokingClass
,
invokedMethodName
);
}
graph
.
rollback
();
}
else
{
doCommit
(
invokingClass
,
invokedMethodName
);
}
}
private
void
doCommit
(
final
String
invokingClass
,
final
String
invokedMethodName
)
{
graph
.
commit
();
if
(
LOG
.
isDebugEnabled
())
{
LOG
.
debug
(
"Graph commit txn {}.{}"
,
invokingClass
,
invokedMethodName
);
}
}
private
void
doRollback
(
final
Throwable
t
)
{
if
(
logException
(
t
))
{
LOG
.
error
(
"graph rollback due to exception "
,
t
);
}
else
{
LOG
.
error
(
"graph rollback due to exception {}:{}"
,
t
.
getClass
().
getSimpleName
(),
t
.
getMessage
());
}
graph
.
rollback
();
}
public
static
void
lockObjectAndReleasePostCommit
(
final
String
guid
)
{
OBJECT_UPDATE_SYNCHRONIZER
.
lockObject
(
guid
);
}
...
...
This diff is collapsed.
Click to expand it.
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment