Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
A
atlas
Project
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
dataplatform
atlas
Commits
7527ca06
Commit
7527ca06
authored
Jan 04, 2019
by
nikhilbonte
Committed by
nixonrodrigues
Jan 29, 2019
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
ATLAS-2810:- Add notifications from RelationshipStore to Kafka ATLAS_ENTITITES topic.
parent
b84ed999
Hide whitespace changes
Inline
Side-by-side
Showing
9 changed files
with
401 additions
and
19 deletions
+401
-19
AtlasConfiguration.java
intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
+1
-0
EntityChangeListenerV2.java
...ava/org/apache/atlas/listener/EntityChangeListenerV2.java
+26
-0
AtlasRelationshipHeader.java
.../apache/atlas/model/instance/AtlasRelationshipHeader.java
+203
-0
EntityNotification.java
...g/apache/atlas/model/notification/EntityNotification.java
+25
-2
EntityAuditListenerV2.java
.../apache/atlas/repository/audit/EntityAuditListenerV2.java
+23
-2
AtlasEntityChangeNotifier.java
.../repository/store/graph/v2/AtlasEntityChangeNotifier.java
+54
-0
AtlasRelationshipStoreV2.java
...s/repository/store/graph/v2/AtlasRelationshipStoreV2.java
+15
-9
EntityGraphRetriever.java
...atlas/repository/store/graph/v2/EntityGraphRetriever.java
+2
-2
EntityNotificationListenerV2.java
...ache/atlas/notification/EntityNotificationListenerV2.java
+52
-4
No files found.
intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
View file @
7527ca06
...
...
@@ -34,6 +34,7 @@ public enum AtlasConfiguration {
QUERY_PARAM_MAX_LENGTH
(
"atlas.query.param.max.length"
,
4
*
1024
),
REST_API_ENABLE_DELETE_TYPE_OVERRIDE
(
"atlas.rest.enable.delete.type.override"
,
false
),
NOTIFICATION_RELATIONSHIPS_ENABLED
(
"atlas.notification.relationships.enabled"
,
false
),
NOTIFICATION_HOOK_TOPIC_NAME
(
"atlas.notification.hook.topic.name"
,
"ATLAS_HOOK"
),
NOTIFICATION_ENTITIES_TOPIC_NAME
(
"atlas.notification.entities.topic.name"
,
"ATLAS_ENTITIES"
),
...
...
intg/src/main/java/org/apache/atlas/listener/EntityChangeListenerV2.java
View file @
7527ca06
...
...
@@ -23,6 +23,7 @@ import org.apache.atlas.model.glossary.AtlasGlossaryTerm;
import
org.apache.atlas.model.instance.AtlasClassification
;
import
org.apache.atlas.model.instance.AtlasEntity
;
import
org.apache.atlas.model.instance.AtlasRelatedObjectId
;
import
org.apache.atlas.model.instance.AtlasRelationship
;
import
java.util.List
;
...
...
@@ -96,4 +97,28 @@ public interface EntityChangeListenerV2 {
* @param entities list of entities to which the term is assigned
*/
void
onTermDeleted
(
AtlasGlossaryTerm
term
,
List
<
AtlasRelatedObjectId
>
entities
)
throws
AtlasBaseException
;
/**
* This is upon adding new relationships to the repository.
*
* @param relationships the created relationships
* @param isImport
*/
void
onRelationshipsAdded
(
List
<
AtlasRelationship
>
relationships
,
boolean
isImport
)
throws
AtlasBaseException
;
/**
* This is upon updating an relationships.
*
* @param relationships the updated relationships
* @param isImport
*/
void
onRelationshipsUpdated
(
List
<
AtlasRelationship
>
relationships
,
boolean
isImport
)
throws
AtlasBaseException
;
/**
* This is upon deleting relationships from the repository.
*
* @param relationships the deleted relationships
* @param isImport
*/
void
onRelationshipsDeleted
(
List
<
AtlasRelationship
>
relationships
,
boolean
isImport
)
throws
AtlasBaseException
;
}
\ No newline at end of file
intg/src/main/java/org/apache/atlas/model/instance/AtlasRelationshipHeader.java
0 → 100644
View file @
7527ca06
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org
.
apache
.
atlas
.
model
.
instance
;
import
com.fasterxml.jackson.annotation.JsonAutoDetect
;
import
com.fasterxml.jackson.annotation.JsonIgnoreProperties
;
import
com.fasterxml.jackson.databind.annotation.JsonSerialize
;
import
org.apache.atlas.model.PList
;
import
org.apache.atlas.model.SearchFilter.SortType
;
import
org.apache.atlas.model.typedef.AtlasRelationshipDef
;
import
javax.xml.bind.annotation.XmlAccessType
;
import
javax.xml.bind.annotation.XmlAccessorType
;
import
javax.xml.bind.annotation.XmlRootElement
;
import
javax.xml.bind.annotation.XmlSeeAlso
;
import
java.io.Serializable
;
import
java.util.List
;
import
java.util.Objects
;
import
static
com
.
fasterxml
.
jackson
.
annotation
.
JsonAutoDetect
.
Visibility
.
NONE
;
import
static
com
.
fasterxml
.
jackson
.
annotation
.
JsonAutoDetect
.
Visibility
.
PUBLIC_ONLY
;
@JsonAutoDetect
(
getterVisibility
=
PUBLIC_ONLY
,
setterVisibility
=
PUBLIC_ONLY
,
fieldVisibility
=
NONE
)
@JsonSerialize
(
include
=
JsonSerialize
.
Inclusion
.
NON_NULL
)
@JsonIgnoreProperties
(
ignoreUnknown
=
true
)
@XmlRootElement
@XmlAccessorType
(
XmlAccessType
.
PROPERTY
)
public
class
AtlasRelationshipHeader
extends
AtlasStruct
implements
Serializable
{
private
static
final
long
serialVersionUID
=
1L
;
private
String
guid
=
null
;
private
AtlasEntity
.
Status
status
=
AtlasEntity
.
Status
.
ACTIVE
;
private
AtlasRelationshipDef
.
PropagateTags
propagateTags
=
AtlasRelationshipDef
.
PropagateTags
.
NONE
;
private
String
label
=
null
;
private
AtlasObjectId
end1
=
null
;
private
AtlasObjectId
end2
=
null
;
public
AtlasRelationshipHeader
()
{
}
public
AtlasRelationshipHeader
(
String
typeName
,
String
guid
)
{
super
(
typeName
);
setGuid
(
guid
);
}
public
AtlasRelationshipHeader
(
String
typeName
,
String
guid
,
AtlasObjectId
end1
,
AtlasObjectId
end2
)
{
this
(
typeName
,
guid
);
setEnd1
(
end1
);
setEnd2
(
end2
);
}
public
AtlasRelationshipHeader
(
String
typeName
,
String
guid
,
AtlasObjectId
end1
,
AtlasObjectId
end2
,
AtlasEntity
.
Status
status
)
{
this
(
typeName
,
guid
,
end1
,
end2
);
setStatus
(
status
);
}
public
AtlasRelationshipHeader
(
AtlasRelationship
relationship
)
{
this
(
relationship
.
getTypeName
(),
relationship
.
getGuid
(),
relationship
.
getEnd1
(),
relationship
.
getEnd2
());
setLabel
(
relationship
.
getLabel
());
switch
(
relationship
.
getStatus
())
{
case
ACTIVE:
setStatus
(
AtlasEntity
.
Status
.
ACTIVE
);
break
;
case
DELETED:
setStatus
(
AtlasEntity
.
Status
.
DELETED
);
break
;
}
}
public
String
getGuid
()
{
return
guid
;
}
public
void
setGuid
(
String
guid
)
{
this
.
guid
=
guid
;
}
public
AtlasEntity
.
Status
getStatus
()
{
return
status
;
}
public
void
setStatus
(
AtlasEntity
.
Status
status
)
{
this
.
status
=
status
;
}
public
void
setPropagateTags
(
AtlasRelationshipDef
.
PropagateTags
propagateTags
)
{
this
.
propagateTags
=
propagateTags
;
}
public
String
getLabel
()
{
return
label
;
}
public
void
setLabel
(
String
label
)
{
this
.
label
=
label
;
}
public
AtlasObjectId
getEnd1
()
{
return
end1
;
}
public
void
setEnd1
(
AtlasObjectId
end1
)
{
this
.
end1
=
end1
;
}
public
AtlasObjectId
getEnd2
()
{
return
end2
;
}
public
void
setEnd2
(
AtlasObjectId
end2
)
{
this
.
end2
=
end2
;
}
public
StringBuilder
toString
(
StringBuilder
sb
)
{
if
(
sb
==
null
)
{
sb
=
new
StringBuilder
();
}
sb
.
append
(
"AtlasRelationshipHeader{"
);
sb
.
append
(
"guid='"
).
append
(
guid
).
append
(
'\''
);
sb
.
append
(
", status="
).
append
(
status
);
sb
.
append
(
", label="
).
append
(
label
);
sb
.
append
(
", propagateTags="
).
append
(
propagateTags
);
sb
.
append
(
", end1="
).
append
(
end1
);
sb
.
append
(
", end2="
).
append
(
end2
);
super
.
toString
(
sb
);
sb
.
append
(
'}'
);
return
sb
;
}
@Override
public
boolean
equals
(
Object
o
)
{
if
(
this
==
o
)
return
true
;
if
(
o
==
null
||
getClass
()
!=
o
.
getClass
())
return
false
;
if
(!
super
.
equals
(
o
))
return
false
;
AtlasRelationshipHeader
that
=
(
AtlasRelationshipHeader
)
o
;
return
Objects
.
equals
(
guid
,
that
.
guid
)
&&
status
==
that
.
status
&&
Objects
.
equals
(
label
,
that
.
label
)
&&
Objects
.
equals
(
propagateTags
,
that
.
propagateTags
)
&&
Objects
.
equals
(
end1
,
that
.
end1
)
&&
Objects
.
equals
(
end2
,
that
.
end2
);
}
@Override
public
int
hashCode
()
{
return
Objects
.
hash
(
super
.
hashCode
(),
guid
,
status
);
}
@Override
public
String
toString
()
{
return
toString
(
new
StringBuilder
()).
toString
();
}
/**
* REST serialization friendly list.
*/
@JsonAutoDetect
(
getterVisibility
=
PUBLIC_ONLY
,
setterVisibility
=
PUBLIC_ONLY
,
fieldVisibility
=
NONE
)
@JsonSerialize
(
include
=
JsonSerialize
.
Inclusion
.
NON_NULL
)
@JsonIgnoreProperties
(
ignoreUnknown
=
true
)
@XmlRootElement
@XmlAccessorType
(
XmlAccessType
.
PROPERTY
)
@XmlSeeAlso
(
AtlasEntity
.
class
)
public
static
class
AtlasRelationshipHeaders
extends
PList
<
AtlasRelationshipHeader
>
{
private
static
final
long
serialVersionUID
=
1L
;
public
AtlasRelationshipHeaders
()
{
super
();
}
public
AtlasRelationshipHeaders
(
List
<
AtlasRelationshipHeader
>
list
)
{
super
(
list
);
}
public
AtlasRelationshipHeaders
(
List
list
,
long
startIndex
,
int
pageSize
,
long
totalCount
,
SortType
sortType
,
String
sortBy
)
{
super
(
list
,
startIndex
,
pageSize
,
totalCount
,
sortType
,
sortBy
);
}
}
}
intg/src/main/java/org/apache/atlas/model/notification/EntityNotification.java
View file @
7527ca06
...
...
@@ -21,6 +21,7 @@ import com.fasterxml.jackson.annotation.JsonAutoDetect;
import
com.fasterxml.jackson.annotation.JsonIgnoreProperties
;
import
com.fasterxml.jackson.databind.annotation.JsonSerialize
;
import
org.apache.atlas.model.instance.AtlasEntityHeader
;
import
org.apache.atlas.model.instance.AtlasRelationshipHeader
;
import
javax.xml.bind.annotation.XmlAccessType
;
import
javax.xml.bind.annotation.XmlAccessorType
;
...
...
@@ -100,15 +101,21 @@ public class EntityNotification implements Serializable {
public
enum
OperationType
{
ENTITY_CREATE
,
ENTITY_UPDATE
,
ENTITY_DELETE
,
CLASSIFICATION_ADD
,
CLASSIFICATION_DELETE
,
CLASSIFICATION_UPDATE
CLASSIFICATION_ADD
,
CLASSIFICATION_DELETE
,
CLASSIFICATION_UPDATE
,
RELATIONSHIP_CREATE
,
RELATIONSHIP_UPDATE
,
RELATIONSHIP_DELETE
}
private
AtlasEntityHeader
entity
;
private
AtlasRelationshipHeader
relationship
;
private
OperationType
operationType
;
private
long
eventTime
;
public
EntityNotificationV2
()
{
this
(
null
,
null
,
System
.
currentTimeMillis
());
super
(
ENTITY_NOTIFICATION_V2
);
setEntity
(
null
);
setOperationType
(
null
);
setEventTime
(
System
.
currentTimeMillis
());
}
public
EntityNotificationV2
(
AtlasEntityHeader
entity
,
OperationType
operationType
)
{
...
...
@@ -123,6 +130,14 @@ public class EntityNotification implements Serializable {
setEventTime
(
eventTime
);
}
public
EntityNotificationV2
(
AtlasRelationshipHeader
relationship
,
OperationType
operationType
,
long
eventTime
)
{
super
(
ENTITY_NOTIFICATION_V2
);
setRelationship
(
relationship
);
setOperationType
(
operationType
);
setEventTime
(
eventTime
);
}
public
AtlasEntityHeader
getEntity
()
{
return
entity
;
}
...
...
@@ -131,6 +146,14 @@ public class EntityNotification implements Serializable {
this
.
entity
=
entity
;
}
public
AtlasRelationshipHeader
getRelationship
()
{
return
relationship
;
}
public
void
setRelationship
(
AtlasRelationshipHeader
relationship
)
{
this
.
relationship
=
relationship
;
}
public
OperationType
getOperationType
()
{
return
operationType
;
}
...
...
repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListenerV2.java
View file @
7527ca06
...
...
@@ -27,6 +27,7 @@ import org.apache.atlas.model.glossary.AtlasGlossaryTerm;
import
org.apache.atlas.model.instance.AtlasClassification
;
import
org.apache.atlas.model.instance.AtlasEntity
;
import
org.apache.atlas.model.instance.AtlasRelatedObjectId
;
import
org.apache.atlas.model.instance.AtlasRelationship
;
import
org.apache.atlas.repository.converters.AtlasInstanceConverter
;
import
org.apache.atlas.type.AtlasEntityType
;
import
org.apache.atlas.type.AtlasStructType.AtlasAttribute
;
...
...
@@ -462,4 +463,25 @@ public class EntityAuditListenerV2 implements EntityChangeListenerV2 {
return
ret
;
}
}
\ No newline at end of file
@Override
public
void
onRelationshipsAdded
(
List
<
AtlasRelationship
>
relationships
,
boolean
isImport
)
throws
AtlasBaseException
{
if
(
LOG
.
isDebugEnabled
())
{
LOG
.
debug
(
"New relationship(s) added to repository("
+
relationships
.
size
()
+
")"
);
}
}
@Override
public
void
onRelationshipsUpdated
(
List
<
AtlasRelationship
>
relationships
,
boolean
isImport
)
throws
AtlasBaseException
{
if
(
LOG
.
isDebugEnabled
())
{
LOG
.
debug
(
"Relationship(s) updated("
+
relationships
.
size
()
+
")"
);
}
}
@Override
public
void
onRelationshipsDeleted
(
List
<
AtlasRelationship
>
relationships
,
boolean
isImport
)
throws
AtlasBaseException
{
if
(
LOG
.
isDebugEnabled
())
{
LOG
.
debug
(
"Relationship(s) deleted from repository("
+
relationships
.
size
()
+
")"
);
}
}
}
repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityChangeNotifier.java
View file @
7527ca06
...
...
@@ -31,8 +31,10 @@ import org.apache.atlas.model.instance.AtlasEntity;
import
org.apache.atlas.model.instance.AtlasEntityHeader
;
import
org.apache.atlas.model.instance.AtlasRelatedObjectId
;
import
org.apache.atlas.model.instance.AtlasRelationship
;
import
org.apache.atlas.model.instance.EntityMutationResponse
;
import
org.apache.atlas.model.instance.EntityMutations.EntityOperation
;
import
org.apache.atlas.model.notification.EntityNotification
;
import
org.apache.atlas.type.AtlasEntityType
;
import
org.apache.atlas.type.AtlasTypeRegistry
;
import
org.apache.atlas.utils.AtlasPerfMetrics.MetricRecorder
;
...
...
@@ -112,6 +114,24 @@ public class AtlasEntityChangeNotifier {
notifyPropagatedEntities
();
}
public
void
notifyRelationshipMutation
(
AtlasRelationship
relationship
,
EntityNotification
.
EntityNotificationV2
.
OperationType
operationType
)
throws
AtlasBaseException
{
if
(
CollectionUtils
.
isEmpty
(
entityChangeListeners
)
||
instanceConverter
==
null
)
{
return
;
}
switch
(
operationType
)
{
case
RELATIONSHIP_CREATE:
notifyRelationshipListeners
(
Collections
.
singletonList
(
relationship
),
EntityOperation
.
CREATE
,
false
);
break
;
case
RELATIONSHIP_UPDATE:
notifyRelationshipListeners
(
Collections
.
singletonList
(
relationship
),
EntityOperation
.
UPDATE
,
false
);
break
;
case
RELATIONSHIP_DELETE:
notifyRelationshipListeners
(
Collections
.
singletonList
(
relationship
),
EntityOperation
.
DELETE
,
false
);
break
;
}
}
public
void
onClassificationAddedToEntity
(
AtlasEntity
entity
,
List
<
AtlasClassification
>
addedClassifications
)
throws
AtlasBaseException
{
if
(
isV2EntityNotificationEnabled
)
{
doFullTextMapping
(
entity
.
getGuid
());
...
...
@@ -284,6 +304,20 @@ public class AtlasEntityChangeNotifier {
}
}
private
void
notifyRelationshipListeners
(
List
<
AtlasRelationship
>
relationships
,
EntityOperation
operation
,
boolean
isImport
)
throws
AtlasBaseException
{
if
(
CollectionUtils
.
isEmpty
(
relationships
))
{
return
;
}
if
(
isV2EntityNotificationEnabled
)
{
notifyV2RelationshipListeners
(
relationships
,
operation
,
isImport
);
return
;
}
LOG
.
warn
(
"Relationships not supported by v1 notifications. {}"
,
relationships
);
}
private
void
notifyV1Listeners
(
List
<
AtlasEntityHeader
>
entityHeaders
,
EntityOperation
operation
,
boolean
isImport
)
throws
AtlasBaseException
{
List
<
Referenceable
>
typedRefInsts
=
toReferenceables
(
entityHeaders
,
operation
);
...
...
@@ -315,10 +349,12 @@ public class AtlasEntityChangeNotifier {
case
CREATE:
listener
.
onEntitiesAdded
(
entities
,
isImport
);
break
;
case
UPDATE:
case
PARTIAL_UPDATE:
listener
.
onEntitiesUpdated
(
entities
,
isImport
);
break
;
case
DELETE:
listener
.
onEntitiesDeleted
(
entities
,
isImport
);
break
;
...
...
@@ -326,6 +362,24 @@ public class AtlasEntityChangeNotifier {
}
}
private
void
notifyV2RelationshipListeners
(
List
<
AtlasRelationship
>
relationships
,
EntityOperation
operation
,
boolean
isImport
)
throws
AtlasBaseException
{
for
(
EntityChangeListenerV2
listener
:
entityChangeListenersV2
)
{
switch
(
operation
)
{
case
CREATE:
listener
.
onRelationshipsAdded
(
relationships
,
isImport
);
break
;
case
UPDATE:
case
PARTIAL_UPDATE:
listener
.
onRelationshipsUpdated
(
relationships
,
isImport
);
break
;
case
DELETE:
listener
.
onRelationshipsDeleted
(
relationships
,
isImport
);
break
;
}
}
}
private
List
<
Referenceable
>
toReferenceables
(
List
<
AtlasEntityHeader
>
entityHeaders
,
EntityOperation
operation
)
throws
AtlasBaseException
{
List
<
Referenceable
>
ret
=
new
ArrayList
<>(
entityHeaders
.
size
());
...
...
repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipStoreV2.java
View file @
7527ca06
...
...
@@ -31,6 +31,7 @@ import org.apache.atlas.model.instance.AtlasEntityHeader;
import
org.apache.atlas.model.instance.AtlasObjectId
;
import
org.apache.atlas.model.instance.AtlasRelationship
;
import
org.apache.atlas.model.instance.AtlasRelationship.AtlasRelationshipWithExtInfo
;
import
org.apache.atlas.model.notification.EntityNotification.EntityNotificationV2.OperationType
;
import
org.apache.atlas.model.typedef.AtlasRelationshipDef
;
import
org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags
;
import
org.apache.atlas.model.typedef.AtlasRelationshipEndDef
;
...
...
@@ -76,6 +77,7 @@ import static org.apache.atlas.repository.Constants.PROVENANCE_TYPE_KEY;
import
static
org
.
apache
.
atlas
.
repository
.
Constants
.
RELATIONSHIPTYPE_TAG_PROPAGATION_KEY
;
import
static
org
.
apache
.
atlas
.
repository
.
Constants
.
RELATIONSHIP_GUID_PROPERTY_KEY
;
import
static
org
.
apache
.
atlas
.
repository
.
Constants
.
VERSION_PROPERTY_KEY
;
import
static
org
.
apache
.
atlas
.
AtlasConfiguration
.
NOTIFICATION_RELATIONSHIPS_ENABLED
;
import
static
org
.
apache
.
atlas
.
repository
.
graph
.
GraphHelper
.
getBlockedClassificationIds
;
...
...
@@ -92,6 +94,7 @@ public class AtlasRelationshipStoreV2 implements AtlasRelationshipStore {
private
static
final
Logger
LOG
=
LoggerFactory
.
getLogger
(
AtlasRelationshipStoreV2
.
class
);
private
static
final
Long
DEFAULT_RELATIONSHIP_VERSION
=
0L
;
private
boolean
notificationsEnabled
=
NOTIFICATION_RELATIONSHIPS_ENABLED
.
getBoolean
();
private
final
AtlasTypeRegistry
typeRegistry
;
private
final
EntityGraphRetriever
entityRetriever
;
...
...
@@ -125,9 +128,6 @@ public class AtlasRelationshipStoreV2 implements AtlasRelationshipStore {
LOG
.
debug
(
"<== create({}): {}"
,
relationship
,
ret
);
}
// notify entities for added/removed classification propagation
entityChangeNotifier
.
notifyPropagatedEntities
();
return
ret
;
}
...
...
@@ -195,9 +195,7 @@ public class AtlasRelationshipStoreV2 implements AtlasRelationshipStore {
validateRelationship
(
end1Vertex
,
end2Vertex
,
edgeType
,
relationship
.
getAttributes
());
AtlasRelationship
ret
=
updateRelationship
(
edge
,
relationship
);
// notify entities for added/removed classification propagation
entityChangeNotifier
.
notifyPropagatedEntities
();
sendNotifications
(
ret
,
OperationType
.
RELATIONSHIP_UPDATE
);
if
(
LOG
.
isDebugEnabled
())
{
LOG
.
debug
(
"<== update({}): {}"
,
relationship
,
ret
);
...
...
@@ -277,8 +275,7 @@ public class AtlasRelationshipStoreV2 implements AtlasRelationshipStore {
deleteDelegate
.
getHandler
().
deleteRelationships
(
Collections
.
singleton
(
edge
),
forceDelete
);
// notify entities for added/removed classification propagation
entityChangeNotifier
.
notifyPropagatedEntities
();
sendNotifications
(
entityRetriever
.
mapEdgeToAtlasRelationship
(
edge
),
OperationType
.
RELATIONSHIP_DELETE
);
if
(
LOG
.
isDebugEnabled
())
{
LOG
.
debug
(
"<== deleteById({}): {}"
,
guid
);
...
...
@@ -408,6 +405,7 @@ public class AtlasRelationshipStoreV2 implements AtlasRelationshipStore {
throw
new
AtlasBaseException
(
AtlasErrorCode
.
INTERNAL_ERROR
,
e
);
}
sendNotifications
(
entityRetriever
.
mapEdgeToAtlasRelationship
(
ret
),
OperationType
.
RELATIONSHIP_CREATE
);
return
ret
;
}
...
...
@@ -498,6 +496,7 @@ public class AtlasRelationshipStoreV2 implements AtlasRelationshipStore {
}
// propagated classifications should contain blocked propagated classification
private
AtlasVertex
validateBlockedPropagatedClassification
(
List
<
AtlasVertex
>
classificationVertices
,
AtlasClassification
classification
)
{
AtlasVertex
ret
=
null
;
...
...
@@ -513,7 +512,6 @@ public class AtlasRelationshipStoreV2 implements AtlasRelationshipStore {
return
ret
;
}
private
void
addToBlockedClassificationIds
(
AtlasEdge
edge
,
List
<
String
>
classificationIds
)
{
if
(
edge
!=
null
)
{
if
(
classificationIds
.
isEmpty
())
{
...
...
@@ -876,4 +874,11 @@ public class AtlasRelationshipStoreV2 implements AtlasRelationshipStore {
return
(
attribute
!=
null
)
?
attribute
.
getRelationshipEdgeLabel
()
:
null
;
}
private
void
sendNotifications
(
AtlasRelationship
ret
,
OperationType
relationshipUpdate
)
throws
AtlasBaseException
{
entityChangeNotifier
.
notifyPropagatedEntities
();
if
(
notificationsEnabled
){
entityChangeNotifier
.
notifyRelationshipMutation
(
ret
,
relationshipUpdate
);
}
}
}
\ No newline at end of file
repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java
View file @
7527ca06
...
...
@@ -1221,8 +1221,8 @@ public class EntityGraphRetriever {
AtlasVertex
end1Vertex
=
edge
.
getOutVertex
();
AtlasVertex
end2Vertex
=
edge
.
getInVertex
();
relationship
.
setEnd1
(
new
AtlasObjectId
(
getGuid
(
end1Vertex
),
getTypeName
(
end1Vertex
)));
relationship
.
setEnd2
(
new
AtlasObjectId
(
getGuid
(
end2Vertex
),
getTypeName
(
end2Vertex
)));
relationship
.
setEnd1
(
new
AtlasObjectId
(
getGuid
(
end1Vertex
),
getTypeName
(
end1Vertex
)
,
getEntityUniqueAttribute
(
end1Vertex
)
));
relationship
.
setEnd2
(
new
AtlasObjectId
(
getGuid
(
end2Vertex
),
getTypeName
(
end2Vertex
)
,
getEntityUniqueAttribute
(
end2Vertex
)
));
relationship
.
setLabel
(
edge
.
getLabel
());
relationship
.
setPropagateTags
(
getPropagateTags
(
edge
));
...
...
webapp/src/main/java/org/apache/atlas/notification/EntityNotificationListenerV2.java
View file @
7527ca06
...
...
@@ -26,6 +26,8 @@ import org.apache.atlas.model.instance.AtlasClassification;
import
org.apache.atlas.model.instance.AtlasEntity
;
import
org.apache.atlas.model.instance.AtlasEntityHeader
;
import
org.apache.atlas.model.instance.AtlasRelatedObjectId
;
import
org.apache.atlas.model.instance.AtlasRelationship
;
import
org.apache.atlas.model.instance.AtlasRelationshipHeader
;
import
org.apache.atlas.model.notification.EntityNotification.EntityNotificationV2
;
import
org.apache.atlas.model.notification.EntityNotification.EntityNotificationV2.OperationType
;
import
org.apache.atlas.type.AtlasClassificationType
;
...
...
@@ -48,8 +50,16 @@ import java.util.List;
import
java.util.Map
;
import
java.util.Set
;
import
static
org
.
apache
.
atlas
.
model
.
notification
.
EntityNotification
.
EntityNotificationV2
.
OperationType
.
CLASSIFICATION_ADD
;
import
static
org
.
apache
.
atlas
.
model
.
notification
.
EntityNotification
.
EntityNotificationV2
.
OperationType
.
CLASSIFICATION_DELETE
;
import
static
org
.
apache
.
atlas
.
model
.
notification
.
EntityNotification
.
EntityNotificationV2
.
OperationType
.
CLASSIFICATION_UPDATE
;
import
static
org
.
apache
.
atlas
.
model
.
notification
.
EntityNotification
.
EntityNotificationV2
.
OperationType
.
ENTITY_CREATE
;
import
static
org
.
apache
.
atlas
.
model
.
notification
.
EntityNotification
.
EntityNotificationV2
.
OperationType
.
ENTITY_DELETE
;
import
static
org
.
apache
.
atlas
.
model
.
notification
.
EntityNotification
.
EntityNotificationV2
.
OperationType
.
ENTITY_UPDATE
;
import
static
org
.
apache
.
atlas
.
model
.
notification
.
EntityNotification
.
EntityNotificationV2
.
OperationType
.
RELATIONSHIP_CREATE
;
import
static
org
.
apache
.
atlas
.
model
.
notification
.
EntityNotification
.
EntityNotificationV2
.
OperationType
.
RELATIONSHIP_DELETE
;
import
static
org
.
apache
.
atlas
.
model
.
notification
.
EntityNotification
.
EntityNotificationV2
.
OperationType
.
RELATIONSHIP_UPDATE
;
import
static
org
.
apache
.
atlas
.
repository
.
graph
.
GraphHelper
.
isInternalType
;
import
static
org
.
apache
.
atlas
.
model
.
notification
.
EntityNotification
.
EntityNotificationV2
.
OperationType
.*;
import
static
org
.
apache
.
atlas
.
repository
.
store
.
graph
.
v2
.
EntityGraphRetriever
.
CREATE_TIME
;
import
static
org
.
apache
.
atlas
.
repository
.
store
.
graph
.
v2
.
EntityGraphRetriever
.
DESCRIPTION
;
import
static
org
.
apache
.
atlas
.
repository
.
store
.
graph
.
v2
.
EntityGraphRetriever
.
NAME
;
...
...
@@ -120,7 +130,6 @@ public class EntityNotificationListenerV2 implements EntityChangeListenerV2 {
private
void
notifyEntityEvents
(
List
<
AtlasEntity
>
entities
,
OperationType
operationType
)
throws
AtlasBaseException
{
MetricRecorder
metric
=
RequestContext
.
get
().
startMetricRecord
(
"entityNotification"
);
List
<
EntityNotificationV2
>
messages
=
new
ArrayList
<>();
for
(
AtlasEntity
entity
:
entities
)
{
...
...
@@ -131,6 +140,27 @@ public class EntityNotificationListenerV2 implements EntityChangeListenerV2 {
messages
.
add
(
new
EntityNotificationV2
(
toNotificationHeader
(
entity
),
operationType
,
RequestContext
.
get
().
getRequestTime
()));
}
sendNotifications
(
operationType
,
messages
);
RequestContext
.
get
().
endMetricRecord
(
metric
);
}
private
void
notifyRelationshipEvents
(
List
<
AtlasRelationship
>
relationships
,
OperationType
operationType
)
throws
AtlasBaseException
{
MetricRecorder
metric
=
RequestContext
.
get
().
startMetricRecord
(
"entityNotification"
);
List
<
EntityNotificationV2
>
messages
=
new
ArrayList
<>();
for
(
AtlasRelationship
relationship
:
relationships
)
{
if
(
isInternalType
(
relationship
.
getTypeName
()))
{
continue
;
}
messages
.
add
(
new
EntityNotificationV2
(
toNotificationHeader
(
relationship
),
operationType
,
RequestContext
.
get
().
getRequestTime
()));
}
sendNotifications
(
operationType
,
messages
);
RequestContext
.
get
().
endMetricRecord
(
metric
);
}
private
void
sendNotifications
(
OperationType
operationType
,
List
<
EntityNotificationV2
>
messages
)
throws
AtlasBaseException
{
if
(!
messages
.
isEmpty
())
{
try
{
notificationSender
.
send
(
messages
);
...
...
@@ -138,8 +168,6 @@ public class EntityNotificationListenerV2 implements EntityChangeListenerV2 {
throw
new
AtlasBaseException
(
AtlasErrorCode
.
ENTITY_NOTIFICATION_FAILED
,
e
,
operationType
.
name
());
}
}
RequestContext
.
get
().
endMetricRecord
(
metric
);
}
private
AtlasEntityHeader
toNotificationHeader
(
AtlasEntity
entity
)
{
...
...
@@ -188,6 +216,10 @@ public class EntityNotificationListenerV2 implements EntityChangeListenerV2 {
return
ret
;
}
private
AtlasRelationshipHeader
toNotificationHeader
(
AtlasRelationship
relationship
)
{
return
new
AtlasRelationshipHeader
(
relationship
);
}
private
void
setAttribute
(
AtlasEntityHeader
entity
,
String
attrName
,
Object
attrValue
)
{
if
(
attrValue
!=
null
)
{
entity
.
setAttribute
(
attrName
,
attrValue
);
...
...
@@ -237,4 +269,19 @@ public class EntityNotificationListenerV2 implements EntityChangeListenerV2 {
return
ret
;
}
@Override
public
void
onRelationshipsAdded
(
List
<
AtlasRelationship
>
relationships
,
boolean
isImport
)
throws
AtlasBaseException
{
notifyRelationshipEvents
(
relationships
,
RELATIONSHIP_CREATE
);
}
@Override
public
void
onRelationshipsUpdated
(
List
<
AtlasRelationship
>
relationships
,
boolean
isImport
)
throws
AtlasBaseException
{
notifyRelationshipEvents
(
relationships
,
RELATIONSHIP_UPDATE
);
}
@Override
public
void
onRelationshipsDeleted
(
List
<
AtlasRelationship
>
relationships
,
boolean
isImport
)
throws
AtlasBaseException
{
notifyRelationshipEvents
(
relationships
,
RELATIONSHIP_DELETE
);
}
}
\ No newline at end of file
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment