Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
A
atlas
Project
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
dataplatform
atlas
Commits
511c8867
Commit
511c8867
authored
Jan 16, 2017
by
Sarath Subramanian
Committed by
Madhan Neethiraj
Jan 19, 2017
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
ATLAS-1463: option to exclude specific entity attributes in audit records
Signed-off-by:
Madhan Neethiraj
<
madhan@apache.org
>
parent
75bcccd1
Hide whitespace changes
Inline
Side-by-side
Showing
6 changed files
with
300 additions
and
21 deletions
+300
-21
EntityAuditListener.java
...rg/apache/atlas/repository/audit/EntityAuditListener.java
+205
-17
EntityAuditRepository.java
.../apache/atlas/repository/audit/EntityAuditRepository.java
+14
-0
HBaseBasedAuditRepository.java
...che/atlas/repository/audit/HBaseBasedAuditRepository.java
+57
-4
InMemoryEntityAuditRepository.java
...atlas/repository/audit/InMemoryEntityAuditRepository.java
+10
-0
NoopEntityAuditRepository.java
...che/atlas/repository/audit/NoopEntityAuditRepository.java
+10
-0
StructInstance.java
...g/apache/atlas/typesystem/persistence/StructInstance.java
+4
-0
No files found.
repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListener.java
View file @
511c8867
...
...
@@ -21,20 +21,32 @@ package org.apache.atlas.repository.audit;
import
com.google.inject.Inject
;
import
org.apache.atlas.AtlasException
;
import
org.apache.atlas.EntityAuditEvent
;
import
org.apache.atlas.EntityAuditEvent.EntityAuditAction
;
import
org.apache.atlas.RequestContext
;
import
org.apache.atlas.listener.EntityChangeListener
;
import
org.apache.atlas.typesystem.IReferenceableInstance
;
import
org.apache.atlas.typesystem.IStruct
;
import
org.apache.atlas.typesystem.ITypedReferenceableInstance
;
import
org.apache.atlas.typesystem.json.InstanceSerialization
;
import
org.apache.atlas.typesystem.types.AttributeInfo
;
import
org.apache.commons.collections.CollectionUtils
;
import
org.apache.commons.collections.MapUtils
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
java.nio.charset.StandardCharsets
;
import
java.util.ArrayList
;
import
java.util.Collection
;
import
java.util.HashMap
;
import
java.util.List
;
import
java.util.Map
;
/**
* Listener on entity create/update/delete, tag add/delete. Adds the corresponding audit event to the audit repository.
*/
public
class
EntityAuditListener
implements
EntityChangeListener
{
private
static
final
Logger
LOG
=
LoggerFactory
.
getLogger
(
EntityAuditListener
.
class
);
private
EntityAuditRepository
auditRepository
;
@Inject
...
...
@@ -46,44 +58,41 @@ public class EntityAuditListener implements EntityChangeListener {
public
void
onEntitiesAdded
(
Collection
<
ITypedReferenceableInstance
>
entities
)
throws
AtlasException
{
List
<
EntityAuditEvent
>
events
=
new
ArrayList
<>();
long
currentTime
=
RequestContext
.
get
().
getRequestTime
();
for
(
ITypedReferenceableInstance
entity
:
entities
)
{
EntityAuditEvent
event
=
createEvent
(
entity
,
currentTime
,
EntityAuditEvent
.
EntityAuditAction
.
ENTITY_CREATE
,
"Created: "
+
InstanceSerialization
.
toJson
(
entity
,
true
));
EntityAuditEvent
event
=
createEvent
(
entity
,
currentTime
,
EntityAuditAction
.
ENTITY_CREATE
);
events
.
add
(
event
);
}
auditRepository
.
putEvents
(
events
);
}
private
EntityAuditEvent
createEvent
(
ITypedReferenceableInstance
entity
,
long
ts
,
EntityAuditEvent
.
EntityAuditAction
action
,
String
details
)
throws
AtlasException
{
return
new
EntityAuditEvent
(
entity
.
getId
().
_getId
(),
ts
,
RequestContext
.
get
().
getUser
(),
action
,
details
,
entity
);
auditRepository
.
putEvents
(
events
);
}
@Override
public
void
onEntitiesUpdated
(
Collection
<
ITypedReferenceableInstance
>
entities
)
throws
AtlasException
{
List
<
EntityAuditEvent
>
events
=
new
ArrayList
<>();
long
currentTime
=
RequestContext
.
get
().
getRequestTime
();
for
(
ITypedReferenceableInstance
entity
:
entities
)
{
EntityAuditEvent
event
=
createEvent
(
entity
,
currentTime
,
EntityAuditEvent
.
EntityAuditAction
.
ENTITY_UPDATE
,
"Updated: "
+
InstanceSerialization
.
toJson
(
entity
,
true
));
EntityAuditEvent
event
=
createEvent
(
entity
,
currentTime
,
EntityAuditAction
.
ENTITY_UPDATE
);
events
.
add
(
event
);
}
auditRepository
.
putEvents
(
events
);
}
@Override
public
void
onTraitAdded
(
ITypedReferenceableInstance
entity
,
IStruct
trait
)
throws
AtlasException
{
EntityAuditEvent
event
=
createEvent
(
entity
,
RequestContext
.
get
().
getRequestTime
(),
EntityAuditEvent
.
EntityAuditAction
.
TAG_ADD
,
"Added trait: "
+
InstanceSerialization
.
toJson
(
trait
,
true
));
EntityAuditEvent
event
=
createEvent
(
entity
,
RequestContext
.
get
().
getRequestTime
(),
EntityAuditAction
.
TAG_ADD
,
"Added trait: "
+
InstanceSerialization
.
toJson
(
trait
,
true
));
auditRepository
.
putEvents
(
event
);
}
@Override
public
void
onTraitDeleted
(
ITypedReferenceableInstance
entity
,
String
traitName
)
throws
AtlasException
{
EntityAuditEvent
event
=
createEvent
(
entity
,
RequestContext
.
get
().
getRequestTime
(),
EntityAuditEvent
.
EntityAuditAction
.
TAG_DELETE
,
"Deleted trait: "
+
traitName
);
EntityAuditEvent
event
=
createEvent
(
entity
,
RequestContext
.
get
().
getRequestTime
(),
EntityAuditAction
.
TAG_DELETE
,
"Deleted trait: "
+
traitName
);
auditRepository
.
putEvents
(
event
);
}
...
...
@@ -91,11 +100,190 @@ public class EntityAuditListener implements EntityChangeListener {
public
void
onEntitiesDeleted
(
Collection
<
ITypedReferenceableInstance
>
entities
)
throws
AtlasException
{
List
<
EntityAuditEvent
>
events
=
new
ArrayList
<>();
long
currentTime
=
RequestContext
.
get
().
getRequestTime
();
for
(
ITypedReferenceableInstance
entity
:
entities
)
{
EntityAuditEvent
event
=
createEvent
(
entity
,
currentTime
,
EntityAuditEvent
.
EntityAuditAction
.
ENTITY_DELETE
,
"Deleted entity"
);
EntityAuditEvent
event
=
createEvent
(
entity
,
currentTime
,
EntityAuditAction
.
ENTITY_DELETE
,
"Deleted entity"
);
events
.
add
(
event
);
}
auditRepository
.
putEvents
(
events
);
}
private
EntityAuditEvent
createEvent
(
ITypedReferenceableInstance
entity
,
long
ts
,
EntityAuditAction
action
)
throws
AtlasException
{
String
detail
=
getAuditEventDetail
(
entity
,
action
);
return
createEvent
(
entity
,
ts
,
action
,
detail
);
}
private
EntityAuditEvent
createEvent
(
ITypedReferenceableInstance
entity
,
long
ts
,
EntityAuditAction
action
,
String
details
)
throws
AtlasException
{
return
new
EntityAuditEvent
(
entity
.
getId
().
_getId
(),
ts
,
RequestContext
.
get
().
getUser
(),
action
,
details
,
entity
);
}
private
String
getAuditEventDetail
(
ITypedReferenceableInstance
entity
,
EntityAuditAction
action
)
throws
AtlasException
{
Map
<
String
,
Object
>
prunedAttributes
=
pruneEntityAttributesForAudit
(
entity
);
String
auditPrefix
=
getAuditPrefix
(
action
);
String
auditString
=
auditPrefix
+
InstanceSerialization
.
toJson
(
entity
,
true
);
byte
[]
auditBytes
=
auditString
.
getBytes
(
StandardCharsets
.
UTF_8
);
long
auditSize
=
auditBytes
!=
null
?
auditBytes
.
length
:
0
;
long
auditMaxSize
=
auditRepository
.
repositoryMaxSize
();
if
(
auditMaxSize
>=
0
&&
auditSize
>
auditMaxSize
)
{
// don't store attributes in audit
LOG
.
warn
(
"audit record too long: entityType={}, guid={}, size={}; maxSize={}. entity attribute values not stored in audit"
,
entity
.
getTypeName
(),
entity
.
getId
().
_getId
(),
auditSize
,
auditMaxSize
);
Map
<
String
,
Object
>
attrValues
=
entity
.
getValuesMap
();
clearAttributeValues
(
entity
);
auditString
=
auditPrefix
+
InstanceSerialization
.
toJson
(
entity
,
true
);
addAttributeValues
(
entity
,
attrValues
);
}
restoreEntityAttributes
(
entity
,
prunedAttributes
);
return
auditString
;
}
private
void
clearAttributeValues
(
IReferenceableInstance
entity
)
throws
AtlasException
{
Map
<
String
,
Object
>
attributesMap
=
entity
.
getValuesMap
();
if
(
MapUtils
.
isNotEmpty
(
attributesMap
))
{
for
(
String
attribute
:
attributesMap
.
keySet
())
{
entity
.
setNull
(
attribute
);
}
}
}
private
void
addAttributeValues
(
ITypedReferenceableInstance
entity
,
Map
<
String
,
Object
>
attributesMap
)
throws
AtlasException
{
if
(
MapUtils
.
isNotEmpty
(
attributesMap
))
{
for
(
String
attr
:
attributesMap
.
keySet
())
{
entity
.
set
(
attr
,
attributesMap
.
get
(
attr
));
}
}
}
private
Map
<
String
,
Object
>
pruneEntityAttributesForAudit
(
ITypedReferenceableInstance
entity
)
throws
AtlasException
{
Map
<
String
,
Object
>
ret
=
null
;
Map
<
String
,
Object
>
entityAttributes
=
entity
.
getValuesMap
();
List
<
String
>
excludeAttributes
=
auditRepository
.
getAuditExcludeAttributes
(
entity
.
getTypeName
());
if
(
CollectionUtils
.
isNotEmpty
(
excludeAttributes
)
&&
MapUtils
.
isNotEmpty
(
entityAttributes
))
{
Map
<
String
,
AttributeInfo
>
attributeInfoMap
=
entity
.
fieldMapping
().
fields
;
for
(
String
attrName
:
entityAttributes
.
keySet
())
{
Object
attrValue
=
entityAttributes
.
get
(
attrName
);
AttributeInfo
attrInfo
=
attributeInfoMap
.
get
(
attrName
);
if
(
excludeAttributes
.
contains
(
attrName
))
{
if
(
ret
==
null
)
{
ret
=
new
HashMap
<>();
}
ret
.
put
(
attrName
,
attrValue
);
entity
.
setNull
(
attrName
);
}
else
if
(
attrInfo
.
isComposite
)
{
if
(
attrValue
instanceof
Collection
)
{
for
(
Object
attribute
:
(
Collection
)
attrValue
)
{
if
(
attribute
instanceof
ITypedReferenceableInstance
)
{
ITypedReferenceableInstance
attrInstance
=
(
ITypedReferenceableInstance
)
attribute
;
Map
<
String
,
Object
>
prunedAttrs
=
pruneEntityAttributesForAudit
(
attrInstance
);
if
(
MapUtils
.
isNotEmpty
(
prunedAttrs
))
{
if
(
ret
==
null
)
{
ret
=
new
HashMap
<>();
}
ret
.
put
(
attrInstance
.
getId
().
_getId
(),
prunedAttrs
);
}
}
}
}
else
if
(
attrValue
instanceof
ITypedReferenceableInstance
)
{
ITypedReferenceableInstance
attrInstance
=
(
ITypedReferenceableInstance
)
attrValue
;
Map
<
String
,
Object
>
prunedAttrs
=
pruneEntityAttributesForAudit
(
attrInstance
);
if
(
MapUtils
.
isNotEmpty
(
prunedAttrs
))
{
if
(
ret
==
null
)
{
ret
=
new
HashMap
<>();
}
ret
.
put
(
attrInstance
.
getId
().
_getId
(),
prunedAttrs
);
}
}
}
}
}
return
ret
;
}
private
void
restoreEntityAttributes
(
ITypedReferenceableInstance
entity
,
Map
<
String
,
Object
>
prunedAttributes
)
throws
AtlasException
{
if
(
MapUtils
.
isEmpty
(
prunedAttributes
))
{
return
;
}
Map
<
String
,
Object
>
entityAttributes
=
entity
.
getValuesMap
();
if
(
MapUtils
.
isNotEmpty
(
entityAttributes
))
{
Map
<
String
,
AttributeInfo
>
attributeInfoMap
=
entity
.
fieldMapping
().
fields
;
for
(
String
attrName
:
entityAttributes
.
keySet
())
{
Object
attrValue
=
entityAttributes
.
get
(
attrName
);
AttributeInfo
attrInfo
=
attributeInfoMap
.
get
(
attrName
);
if
(
prunedAttributes
.
containsKey
(
attrName
))
{
entity
.
set
(
attrName
,
prunedAttributes
.
get
(
attrName
));
}
else
if
(
attrInfo
.
isComposite
)
{
if
(
attrValue
instanceof
Collection
)
{
for
(
Object
attributeEntity
:
(
Collection
)
attrValue
)
{
if
(
attributeEntity
instanceof
ITypedReferenceableInstance
)
{
ITypedReferenceableInstance
attrInstance
=
(
ITypedReferenceableInstance
)
attributeEntity
;
Object
obj
=
prunedAttributes
.
get
(
attrInstance
.
getId
().
_getId
());
if
(
obj
instanceof
Map
)
{
restoreEntityAttributes
(
attrInstance
,
(
Map
)
obj
);
}
}
}
}
else
if
(
attrValue
instanceof
ITypedReferenceableInstance
)
{
ITypedReferenceableInstance
attrInstance
=
(
ITypedReferenceableInstance
)
attrValue
;
Object
obj
=
prunedAttributes
.
get
(
attrInstance
.
getId
().
_getId
());
if
(
obj
instanceof
Map
)
{
restoreEntityAttributes
(
attrInstance
,
(
Map
)
obj
);
}
}
}
}
}
}
private
String
getAuditPrefix
(
EntityAuditAction
action
)
{
final
String
ret
;
switch
(
action
)
{
case
ENTITY_CREATE:
ret
=
"Created: "
;
break
;
case
ENTITY_UPDATE:
ret
=
"Updated: "
;
break
;
case
ENTITY_DELETE:
ret
=
"Deleted: "
;
break
;
case
TAG_ADD:
ret
=
"Added trait: "
;
break
;
case
TAG_DELETE:
ret
=
"Deleted trait: "
;
break
;
default
:
ret
=
"Unknown: "
;
}
return
ret
;
}
}
repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditRepository.java
View file @
511c8867
...
...
@@ -50,4 +50,18 @@ public interface EntityAuditRepository {
* @throws AtlasException
*/
List
<
EntityAuditEvent
>
listEvents
(
String
entityId
,
String
startKey
,
short
n
)
throws
AtlasException
;
/**
* Returns maximum allowed repository size per EntityAuditEvent
* @throws AtlasException
*/
long
repositoryMaxSize
()
throws
AtlasException
;
/**
* list of attributes to be excluded when storing in audit repo.
* @param entityType type of entity
* @return list of attribute names to be excluded
* @throws AtlasException
*/
List
<
String
>
getAuditExcludeAttributes
(
String
entityType
)
throws
AtlasException
;
}
repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java
View file @
511c8867
...
...
@@ -52,8 +52,10 @@ import java.io.Closeable;
import
java.io.IOException
;
import
java.util.ArrayList
;
import
java.util.Arrays
;
import
java.util.HashMap
;
import
java.util.Iterator
;
import
java.util.List
;
import
java.util.Map
;
/**
* HBase based repository for entity audit events
...
...
@@ -74,9 +76,6 @@ public class HBaseBasedAuditRepository implements Service, EntityAuditRepository
public
static
final
String
CONFIG_PREFIX
=
"atlas.audit"
;
public
static
final
String
CONFIG_TABLE_NAME
=
CONFIG_PREFIX
+
".hbase.tablename"
;
public
static
final
String
DEFAULT_TABLE_NAME
=
"ATLAS_ENTITY_AUDIT_EVENTS"
;
private
static
final
String
FIELD_SEPARATOR
=
":"
;
public
static
final
String
CONFIG_PERSIST_ENTITY_DEFINITION
=
CONFIG_PREFIX
+
".persistEntityDefinition"
;
public
static
final
byte
[]
COLUMN_FAMILY
=
Bytes
.
toBytes
(
"dt"
);
...
...
@@ -85,7 +84,15 @@ public class HBaseBasedAuditRepository implements Service, EntityAuditRepository
public
static
final
byte
[]
COLUMN_USER
=
Bytes
.
toBytes
(
"u"
);
public
static
final
byte
[]
COLUMN_DEFINITION
=
Bytes
.
toBytes
(
"f"
);
private
static
boolean
persistEntityDefinition
;
private
static
final
String
AUDIT_REPOSITORY_MAX_SIZE_PROPERTY
=
"atlas.hbase.client.keyvalue.maxsize"
;
private
static
final
String
AUDIT_EXCLUDE_ATTRIBUTE_PROPERTY
=
"atlas.audit.hbase.entity"
;
private
static
final
String
FIELD_SEPARATOR
=
":"
;
private
static
final
long
ATLAS_HBASE_KEYVALUE_DEFAULT_SIZE
=
1024
*
1024
;
private
static
Configuration
APPLICATION_PROPERTIES
=
null
;
private
static
boolean
persistEntityDefinition
;
private
Map
<
String
,
List
<
String
>>
auditExcludedAttributesCache
=
new
HashMap
<>();
static
{
try
{
...
...
@@ -219,6 +226,52 @@ public class HBaseBasedAuditRepository implements Service, EntityAuditRepository
}
}
@Override
public
long
repositoryMaxSize
()
throws
AtlasException
{
long
ret
;
initApplicationProperties
();
if
(
APPLICATION_PROPERTIES
==
null
)
{
ret
=
ATLAS_HBASE_KEYVALUE_DEFAULT_SIZE
;
}
else
{
ret
=
APPLICATION_PROPERTIES
.
getLong
(
AUDIT_REPOSITORY_MAX_SIZE_PROPERTY
,
ATLAS_HBASE_KEYVALUE_DEFAULT_SIZE
);
}
return
ret
;
}
@Override
public
List
<
String
>
getAuditExcludeAttributes
(
String
entityType
)
throws
AtlasException
{
List
<
String
>
ret
=
null
;
initApplicationProperties
();
if
(
auditExcludedAttributesCache
.
containsKey
(
entityType
))
{
ret
=
auditExcludedAttributesCache
.
get
(
entityType
);
}
else
if
(
APPLICATION_PROPERTIES
!=
null
)
{
String
[]
excludeAttributes
=
APPLICATION_PROPERTIES
.
getStringArray
(
AUDIT_EXCLUDE_ATTRIBUTE_PROPERTY
+
"."
+
entityType
+
"."
+
"attributes.exclude"
);
if
(
excludeAttributes
!=
null
)
{
ret
=
Arrays
.
asList
(
excludeAttributes
);
}
auditExcludedAttributesCache
.
put
(
entityType
,
ret
);
}
return
ret
;
}
private
void
initApplicationProperties
()
{
if
(
APPLICATION_PROPERTIES
==
null
)
{
try
{
APPLICATION_PROPERTIES
=
ApplicationProperties
.
get
();
}
catch
(
AtlasException
ex
)
{
// ignore
}
}
}
private
String
getResultString
(
Result
result
,
byte
[]
columnName
)
{
byte
[]
rawValue
=
result
.
getValue
(
COLUMN_FAMILY
,
columnName
);
if
(
rawValue
!=
null
)
{
...
...
repository/src/main/java/org/apache/atlas/repository/audit/InMemoryEntityAuditRepository.java
View file @
511c8867
...
...
@@ -66,4 +66,14 @@ public class InMemoryEntityAuditRepository implements EntityAuditRepository {
}
return
events
;
}
@Override
public
long
repositoryMaxSize
()
throws
AtlasException
{
return
-
1
;
}
@Override
public
List
<
String
>
getAuditExcludeAttributes
(
String
entityType
)
throws
AtlasException
{
return
null
;
}
}
repository/src/main/java/org/apache/atlas/repository/audit/NoopEntityAuditRepository.java
View file @
511c8867
...
...
@@ -47,4 +47,14 @@ public class NoopEntityAuditRepository implements EntityAuditRepository {
throws
AtlasException
{
return
Collections
.
emptyList
();
}
@Override
public
long
repositoryMaxSize
()
throws
AtlasException
{
return
-
1
;
}
@Override
public
List
<
String
>
getAuditExcludeAttributes
(
String
entityType
)
throws
AtlasException
{
return
null
;
}
}
typesystem/src/main/java/org/apache/atlas/typesystem/persistence/StructInstance.java
View file @
511c8867
...
...
@@ -254,6 +254,10 @@ public class StructInstance implements ITypedStruct {
bigDecimals
[
pos
]
=
null
;
}
else
if
(
i
.
dataType
()
==
DataTypes
.
DATE_TYPE
)
{
dates
[
pos
]
=
null
;
}
else
if
(
i
.
dataType
()
==
DataTypes
.
INT_TYPE
)
{
ints
[
pos
]
=
0
;
}
else
if
(
i
.
dataType
()
==
DataTypes
.
BOOLEAN_TYPE
)
{
bools
[
pos
]
=
false
;
}
else
if
(
i
.
dataType
()
==
DataTypes
.
STRING_TYPE
)
{
strings
[
pos
]
=
null
;
}
else
if
(
i
.
dataType
().
getTypeCategory
()
==
DataTypes
.
TypeCategory
.
ARRAY
)
{
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment