Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
A
atlas
Project
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
dataplatform
atlas
Commits
85280ddf
Commit
85280ddf
authored
Nov 09, 2018
by
Sarath Subramanian
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
ATLAS-2959: Update metrics query to use index query instead of graph query for faster response
parent
e272a28e
Show whitespace changes
Inline
Side-by-side
Showing
5 changed files
with
193 additions
and
237 deletions
+193
-237
AtlasIndexQuery.java
.../org/apache/atlas/repository/graphdb/AtlasIndexQuery.java
+6
-0
AtlasJanusIndexQuery.java
.../atlas/repository/graphdb/janus/AtlasJanusIndexQuery.java
+5
-0
MetricsService.java
...c/main/java/org/apache/atlas/services/MetricsService.java
+62
-128
MetricsServiceTest.java
...st/java/org/apache/atlas/services/MetricsServiceTest.java
+120
-109
metrics-entities-data.zip
repository/src/test/resources/metrics-entities-data.zip
+0
-0
No files found.
graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasIndexQuery.java
View file @
85280ddf
...
...
@@ -44,6 +44,12 @@ public interface AtlasIndexQuery<V, E> {
Iterator
<
Result
<
V
,
E
>>
vertices
(
int
offset
,
int
limit
);
/**
* Gets the total count of query results
* @return
*/
Long
vertexTotals
();
/**
* Query result from an index query.
*
* @param <V>
...
...
graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusIndexQuery.java
View file @
85280ddf
...
...
@@ -77,6 +77,11 @@ public class AtlasJanusIndexQuery implements AtlasIndexQuery<AtlasJanusVertex, A
return
Iterators
.
transform
(
results
,
function
);
}
@Override
public
Long
vertexTotals
()
{
return
query
.
vertexTotals
();
}
/**
* Janus implementation of AtlasIndexQuery.Result.
*/
...
...
repository/src/main/java/org/apache/atlas/services/MetricsService.java
View file @
85280ddf
...
...
@@ -17,13 +17,12 @@
*/
package
org
.
apache
.
atlas
.
services
;
import
com.google.common.annotations.VisibleForTesting
;
import
org.apache.atlas.annotation.AtlasService
;
import
org.apache.atlas.
exception.AtlasBaseException
;
import
org.apache.atlas.
model.instance.AtlasEntity.Status
;
import
org.apache.atlas.model.metrics.AtlasMetrics
;
import
org.apache.atlas.repository.graphdb.AtlasGraph
;
import
org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2
;
import
org.apache.atlas.type.AtlasTypeRegistry
;
import
org.apache.atlas.util.AtlasGremlinQueryProvider
;
import
org.apache.commons.collections.CollectionUtils
;
import
org.apache.commons.configuration.Configuration
;
import
org.slf4j.Logger
;
...
...
@@ -32,13 +31,14 @@ import org.slf4j.LoggerFactory;
import
javax.inject.Inject
;
import
java.util.Collection
;
import
java.util.HashMap
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.stream.Collectors
;
import
java.util.stream.IntStream
;
import
static
org
.
apache
.
atlas
.
util
.
AtlasGremlinQueryProvider
.
AtlasGremlinQuery
;
import
static
org
.
apache
.
atlas
.
util
.
AtlasGremlinQueryProvider
.
INSTANCE
;
import
static
org
.
apache
.
atlas
.
discovery
.
SearchProcessor
.
AND_STR
;
import
static
org
.
apache
.
atlas
.
model
.
instance
.
AtlasEntity
.
Status
.
ACTIVE
;
import
static
org
.
apache
.
atlas
.
model
.
instance
.
AtlasEntity
.
Status
.
DELETED
;
import
static
org
.
apache
.
atlas
.
repository
.
Constants
.
ENTITY_TYPE_PROPERTY_KEY
;
import
static
org
.
apache
.
atlas
.
repository
.
Constants
.
STATE_PROPERTY_KEY
;
import
static
org
.
apache
.
atlas
.
repository
.
Constants
.
VERTEX_INDEX
;
@AtlasService
public
class
MetricsService
{
...
...
@@ -53,53 +53,31 @@ public class MetricsService {
// Query names
protected
static
final
String
METRIC_TYPE_COUNT
=
TYPE
+
"Count"
;
protected
static
final
String
METRIC_TYPE_UNUSED_COUNT
=
TYPE
+
"UnusedCount"
;
protected
static
final
String
METRIC_TYPE_ENTITIES
=
TYPE
+
"Entities"
;
protected
static
final
String
METRIC_ENTITY_COUNT
=
ENTITY
+
"Count"
;
protected
static
final
String
METRIC_ENTITY_DELETED
=
ENTITY
+
"Deleted"
;
protected
static
final
String
METRIC_ENTITY_ACTIVE
=
ENTITY
+
"Active"
;
protected
static
final
String
METRIC_TAGGED_ENTITIES
=
ENTITY
+
"Tagged"
;
protected
static
final
String
METRIC_TAGS_PER_ENTITY
=
ENTITY
+
"Tags"
;
protected
static
final
String
METRIC_TAG_COUNT
=
TAG
+
"Count"
;
protected
static
final
String
METRIC_ENTITIES_PER_TAG
=
TAG
+
"Entities"
;
public
static
final
String
METRIC_QUERY_PREFIX
=
"atlas.metric.query."
;
public
static
final
String
METRIC_QUERY_CACHE_TTL
=
"atlas.metric.query.cache.ttlInSecs"
;
public
static
final
String
METRIC_QUERY_GREMLIN_TYPES_BATCH_SIZE
=
"atlas.metric.query.gremlin.typesBatchSize"
;
public
static
final
int
DEFAULT_CACHE_TTL_IN_SECS
=
900
;
public
static
final
int
DEFAULT_GREMLIN_BATCH_SIZE
=
25
;
public
static
final
String
METRIC_COLLECTION_TIME
=
"collectionTime"
;
private
static
Configuration
configuration
=
null
;
private
static
AtlasGremlinQueryProvider
gremlinQueryProvider
=
null
;
private
final
AtlasGraph
atlasGraph
;
private
final
AtlasTypeRegistry
typeRegistry
;
private
final
int
cacheTTLInSecs
;
private
final
int
gremlinBatchSize
;
private
final
String
indexSearchPrefix
=
AtlasGraphUtilsV2
.
getIndexSearchPrefix
()
;
private
AtlasMetrics
cachedMetrics
=
null
;
private
long
cacheExpirationTime
=
0
;
@Inject
public
MetricsService
(
final
Configuration
configuration
,
final
AtlasGraph
graph
,
final
AtlasTypeRegistry
typeRegistry
)
{
this
(
configuration
,
graph
,
typeRegistry
,
INSTANCE
);
}
@VisibleForTesting
MetricsService
(
Configuration
configuration
,
AtlasGraph
graph
,
AtlasTypeRegistry
typeRegistry
,
AtlasGremlinQueryProvider
queryProvider
)
{
MetricsService
.
configuration
=
configuration
;
atlasGraph
=
graph
;
cacheTTLInSecs
=
configuration
!=
null
?
configuration
.
getInt
(
METRIC_QUERY_CACHE_TTL
,
DEFAULT_CACHE_TTL_IN_SECS
)
:
DEFAULT_CACHE_TTL_IN_SECS
;
gremlinBatchSize
=
configuration
!=
null
?
configuration
.
getInt
(
METRIC_QUERY_GREMLIN_TYPES_BATCH_SIZE
,
DEFAULT_GREMLIN_BATCH_SIZE
)
:
DEFAULT_GREMLIN_BATCH_SIZE
;
gremlinQueryProvider
=
queryProvider
;
this
.
atlasGraph
=
graph
;
this
.
cacheTTLInSecs
=
configuration
!=
null
?
configuration
.
getInt
(
METRIC_QUERY_CACHE_TTL
,
DEFAULT_CACHE_TTL_IN_SECS
)
:
DEFAULT_CACHE_TTL_IN_SECS
;
this
.
typeRegistry
=
typeRegistry
;
}
@SuppressWarnings
(
"unchecked"
)
...
...
@@ -107,62 +85,51 @@ public class MetricsService {
if
(
ignoreCache
||
!
isCacheValid
())
{
AtlasMetrics
metrics
=
new
AtlasMetrics
();
int
typeCount
=
0
,
unusedTypeCount
=
0
;
metrics
.
addMetric
(
GENERAL
,
METRIC_TYPE_COUNT
,
getAllTypesCount
());
metrics
.
addMetric
(
GENERAL
,
METRIC_TAG_COUNT
,
getAllTagsCount
());
Collection
<
String
>
typeNames
=
typeRegistry
.
getAllTypeNames
();
if
(
CollectionUtils
.
isNotEmpty
(
typeNames
))
{
typeCount
=
typeNames
.
size
();
}
metrics
.
addMetric
(
GENERAL
,
METRIC_TYPE_COUNT
,
typeCount
);
Map
<
String
,
Long
>
activeCountMap
=
new
HashMap
<>();
Map
<
String
,
Long
>
deletedCountMap
=
new
HashMap
<>();
int
tagCount
=
0
;
// metrics for classifications
Collection
<
String
>
classificationDefNames
=
typeRegistry
.
getAllClassificationDefNames
();
Map
<
String
,
Number
>
activeCountMap
=
new
HashMap
<>();
Map
<
String
,
Number
>
deletedCountMap
=
new
HashMap
<>();
List
<
String
>
classificationDefNames
=
typeRegistry
.
getAllClassificationDefNames
()
.
stream
()
.
map
(
x
->
"'"
+
x
+
"'"
)
.
collect
(
Collectors
.
toList
());
if
(
CollectionUtils
.
isNotEmpty
(
classificationDefNames
))
{
tagCount
=
classificationDefNames
.
size
();
if
(
classificationDefNames
!=
null
)
{
for
(
String
classificationDefName
:
classificationDefNames
)
{
activeCountMap
.
put
(
classificationDefName
,
getTypeCount
(
classificationDefName
,
ACTIVE
));
}
}
metrics
.
addMetric
(
GENERAL
,
METRIC_TAG_COUNT
,
tagCount
);
IntStream
.
range
(
0
,
(
classificationDefNames
.
size
()
+
gremlinBatchSize
-
1
)
/
gremlinBatchSize
)
.
mapToObj
(
i
->
classificationDefNames
.
subList
(
i
*
gremlinBatchSize
,
Math
.
min
(
classificationDefNames
.
size
(),
(
i
+
1
)
*
gremlinBatchSize
)))
.
forEach
(
batch
->
captureCounts
(
batch
,
activeCountMap
,
deletedCountMap
));
List
<
String
>
entityDefNames
=
typeRegistry
.
getAllEntityDefNames
()
.
stream
()
.
map
(
x
->
"'"
+
x
+
"'"
)
.
collect
(
Collectors
.
toList
());
IntStream
.
range
(
0
,
(
entityDefNames
.
size
()
+
gremlinBatchSize
-
1
)
/
gremlinBatchSize
)
.
mapToObj
(
i
->
entityDefNames
.
subList
(
i
*
gremlinBatchSize
,
Math
.
min
(
entityDefNames
.
size
(),
(
i
+
1
)
*
gremlinBatchSize
)))
.
forEach
(
batch
->
captureCounts
(
batch
,
activeCountMap
,
deletedCountMap
));
// metrics for entities
Collection
<
String
>
entityDefNames
=
typeRegistry
.
getAllEntityDefNames
();
int
totalEntities
=
0
;
if
(
entityDefNames
!=
null
)
{
for
(
String
entityDefName
:
entityDefNames
)
{
activeCountMap
.
put
(
entityDefName
,
getTypeCount
(
entityDefName
,
ACTIVE
));
deletedCountMap
.
put
(
entityDefName
,
getTypeCount
(
entityDefName
,
DELETED
));
}
}
Map
<
String
,
Number
>
activeEntityCount
=
new
HashMap
<>();
Map
<
String
,
Number
>
deletedEntityCount
=
new
HashMap
<>();
Map
<
String
,
Long
>
activeEntityCount
=
new
HashMap
<>();
Map
<
String
,
Long
>
deletedEntityCount
=
new
HashMap
<>();
long
unusedTypeCount
=
0
;
long
totalEntities
=
0
;
for
(
String
entityDefName
:
typeRegistry
.
getAllEntityDefNames
())
{
Number
activeCount
=
activeCountMap
.
getOrDefault
(
entityDefName
,
null
);
Number
deletedCount
=
deletedCountMap
.
getOrDefault
(
entityDefName
,
null
);
Long
activeCount
=
activeCountMap
.
get
(
entityDefName
);
Long
deletedCount
=
deletedCountMap
.
get
(
entityDefName
);
if
(
activeCount
!=
null
)
{
if
(
activeCount
>
0
)
{
activeEntityCount
.
put
(
entityDefName
,
activeCount
);
totalEntities
+=
activeCount
.
int
Value
();
totalEntities
+=
activeCount
.
long
Value
();
}
if
(
deletedCount
!=
null
)
{
if
(
deletedCount
>
0
)
{
deletedEntityCount
.
put
(
entityDefName
,
deletedCount
);
totalEntities
+=
deletedCount
.
int
Value
();
totalEntities
+=
deletedCount
.
long
Value
();
}
if
(
activeCount
==
null
&&
deletedCount
==
null
)
{
if
(
activeCount
==
0
&&
deletedCount
==
0
)
{
unusedTypeCount
++;
}
}
...
...
@@ -172,15 +139,17 @@ public class MetricsService {
metrics
.
addMetric
(
ENTITY
,
METRIC_ENTITY_ACTIVE
,
activeEntityCount
);
metrics
.
addMetric
(
ENTITY
,
METRIC_ENTITY_DELETED
,
deletedEntityCount
);
Map
<
String
,
Number
>
taggedEntityCount
=
new
HashMap
<>();
Map
<
String
,
Long
>
taggedEntityCount
=
new
HashMap
<>();
for
(
String
classificationName
:
typeRegistry
.
getAllClassificationDefNames
())
{
Object
count
=
activeCountMap
.
getOrDefault
(
classificationName
,
null
);
if
(
count
!=
null
)
{
taggedEntityCount
.
put
(
classificationName
,
(
Number
)
count
);
Long
count
=
activeCountMap
.
get
(
classificationName
);
if
(
count
>
0
)
{
taggedEntityCount
.
put
(
classificationName
,
count
);
}
}
metrics
.
addMetric
(
TAG
,
METRIC_ENTITIES_PER_TAG
,
taggedEntityCount
);
metrics
.
addMetric
(
TAG
,
METRIC_ENTITIES_PER_TAG
,
taggedEntityCount
);
// Miscellaneous metrics
long
collectionTime
=
System
.
currentTimeMillis
();
...
...
@@ -194,48 +163,25 @@ public class MetricsService {
return
cachedMetrics
;
}
private
void
captureCounts
(
List
<
String
>
typeNames
,
Map
<
String
,
Number
>
activeCountMap
,
Map
<
String
,
Number
>
deletedCountMap
)
{
String
typeNamesAsStr
=
String
.
join
(
","
,
typeNames
);
String
query
=
String
.
format
(
gremlinQueryProvider
.
getQuery
(
AtlasGremlinQuery
.
ENTITY_ACTIVE_METRIC
),
typeNamesAsStr
);
activeCountMap
.
putAll
(
extractCounts
(
query
));
private
Long
getTypeCount
(
String
typeName
,
Status
status
)
{
String
indexQuery
=
indexSearchPrefix
+
"\""
+
ENTITY_TYPE_PROPERTY_KEY
+
"\" : (%s)"
+
AND_STR
+
indexSearchPrefix
+
"\""
+
STATE_PROPERTY_KEY
+
"\" : (%s)"
;
query
=
String
.
format
(
gremlinQueryProvider
.
getQuery
(
AtlasGremlinQuery
.
ENTITY_DELETED_METRIC
),
typeNamesAsStr
);
deletedCountMap
.
putAll
(
extractCounts
(
query
));
indexQuery
=
String
.
format
(
indexQuery
,
typeName
,
status
.
name
());
return
atlasGraph
.
indexQuery
(
VERTEX_INDEX
,
indexQuery
).
vertexTotals
();
}
private
Map
<
String
,
Number
>
extractCounts
(
final
String
query
)
{
Map
<
String
,
Number
>
ret
=
new
HashMap
<>();
try
{
if
(
LOG
.
isDebugEnabled
())
{
LOG
.
debug
(
"Executing query: {}"
,
query
);
}
private
int
getAllTypesCount
()
{
Collection
<
String
>
allTypeNames
=
typeRegistry
.
getAllTypeNames
();
Object
result
=
executeQuery
(
query
);
if
(
result
instanceof
List
)
{
for
(
Object
entry
:
(
List
)
result
)
{
if
(
entry
instanceof
Map
)
{
ret
.
putAll
((
Map
<
String
,
Number
>)
entry
);
}
}
}
else
if
(
result
instanceof
Map
)
{
ret
.
putAll
((
Map
<
String
,
Number
>)
result
);
}
else
{
String
returnClassName
=
result
!=
null
?
result
.
getClass
().
getSimpleName
()
:
"null"
;
LOG
.
warn
(
"Unhandled return type {} for {}. Ignoring"
,
returnClassName
,
query
);
}
}
catch
(
AtlasBaseException
e
)
{
if
(
LOG
.
isDebugEnabled
())
{
LOG
.
debug
(
"Gremlin execution failed for metric {}"
,
query
,
e
);
}
else
{
LOG
.
warn
(
"Gremlin execution failed for metric {}"
,
query
);
}
}
return
ret
;
return
CollectionUtils
.
isNotEmpty
(
allTypeNames
)
?
allTypeNames
.
size
()
:
0
;
}
private
Object
executeQuery
(
final
String
query
)
throws
AtlasBaseException
{
return
atlasGraph
.
executeGremlinScript
(
query
,
false
);
private
int
getAllTagsCount
()
{
Collection
<
String
>
allTagNames
=
typeRegistry
.
getAllClassificationDefNames
();
return
CollectionUtils
.
isNotEmpty
(
allTagNames
)
?
allTagNames
.
size
()
:
0
;
}
private
boolean
isCacheValid
()
{
...
...
@@ -249,15 +195,4 @@ public class MetricsService {
return
valid
;
}
private
static
String
getQuery
(
String
type
,
String
name
,
String
defaultQuery
)
{
String
ret
=
configuration
!=
null
?
configuration
.
getString
(
METRIC_QUERY_PREFIX
+
type
+
"."
+
name
,
defaultQuery
)
:
defaultQuery
;
if
(
LOG
.
isDebugEnabled
())
{
LOG
.
debug
(
"query for {}.{}: {}"
,
type
,
name
,
ret
);
}
return
ret
;
}
}
\ No newline at end of file
repository/src/test/java/org/apache/atlas/services/MetricsServiceTest.java
View file @
85280ddf
...
...
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
*
<p>
* http://www.apache.org/licenses/LICENSE-2.0
*
*
<p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
...
...
@@ -17,146 +17,156 @@
*/
package
org
.
apache
.
atlas
.
services
;
import
org.apache.atlas.RequestContext
;
import
org.apache.atlas.TestModules
;
import
org.apache.atlas.exception.AtlasBaseException
;
import
org.apache.atlas.model.metrics.AtlasMetrics
;
import
org.apache.atlas.repository.graphdb.AtlasGraph
;
import
org.apache.atlas.repository.graph.AtlasGraphProvider
;
import
org.apache.atlas.repository.impexp.ImportService
;
import
org.apache.atlas.repository.impexp.ZipFileResourceTestUtils
;
import
org.apache.atlas.repository.impexp.ZipSource
;
import
org.apache.atlas.runner.LocalSolrRunner
;
import
org.apache.atlas.store.AtlasTypeDefStore
;
import
org.apache.atlas.type.AtlasTypeRegistry
;
import
org.apache.atlas.util.AtlasGremlin3QueryProvider
;
import
org.apache.commons.configuration.Configuration
;
import
org.testng.SkipException
;
import
org.testng.annotations.AfterClass
;
import
org.testng.annotations.BeforeClass
;
import
org.testng.annotations.Guice
;
import
org.testng.annotations.Test
;
import
java.util.ArrayList
;
import
java.util.Arrays
;
import
javax.inject.Inject
;
import
java.io.FileInputStream
;
import
java.io.IOException
;
import
java.util.HashMap
;
import
java.util.List
;
import
java.util.Map
;
import
static
org
.
apache
.
atlas
.
graph
.
GraphSandboxUtil
.
useLocalSolr
;
import
static
org
.
mockito
.
Matchers
.
anyString
;
import
static
org
.
mockito
.
Matchers
.
eq
;
import
static
org
.
mockito
.
Mockito
.*;
import
static
org
.
testng
.
Assert
.
assertEquals
;
import
static
org
.
apache
.
atlas
.
repository
.
impexp
.
ZipFileResourceTestUtils
.
loadModelFromJson
;
import
static
org
.
apache
.
atlas
.
repository
.
impexp
.
ZipFileResourceTestUtils
.
runImportWithNoParameters
;
import
static
org
.
apache
.
atlas
.
services
.
MetricsService
.
ENTITY
;
import
static
org
.
apache
.
atlas
.
services
.
MetricsService
.
GENERAL
;
import
static
org
.
apache
.
atlas
.
services
.
MetricsService
.
METRIC_ENTITIES_PER_TAG
;
import
static
org
.
apache
.
atlas
.
services
.
MetricsService
.
METRIC_ENTITY_ACTIVE
;
import
static
org
.
apache
.
atlas
.
services
.
MetricsService
.
METRIC_ENTITY_COUNT
;
import
static
org
.
apache
.
atlas
.
services
.
MetricsService
.
METRIC_ENTITY_DELETED
;
import
static
org
.
apache
.
atlas
.
services
.
MetricsService
.
METRIC_TAG_COUNT
;
import
static
org
.
apache
.
atlas
.
services
.
MetricsService
.
METRIC_TYPE_COUNT
;
import
static
org
.
apache
.
atlas
.
services
.
MetricsService
.
METRIC_TYPE_UNUSED_COUNT
;
import
static
org
.
apache
.
atlas
.
services
.
MetricsService
.
TAG
;
import
static
org
.
testng
.
Assert
.
assertNotNull
;
import
static
org
.
testng
.
Assert
.
assertEquals
;
@Guice
(
modules
=
TestModules
.
TestOnlyModule
.
class
)
public
class
MetricsServiceTest
{
private
Configuration
mockConfig
=
mock
(
Configuration
.
class
);
private
Configuration
mockConfig1
=
mock
(
Configuration
.
class
);
private
AtlasTypeRegistry
mockTypeRegistry
=
mock
(
AtlasTypeRegistry
.
class
);
private
AtlasGraph
mockGraph
=
mock
(
AtlasGraph
.
class
);
private
AtlasGraph
mockGraph1
=
mock
(
AtlasGraph
.
class
);
public
static
final
String
IMPORT_FILE
=
"metrics-entities-data.zip"
;
@Inject
private
AtlasTypeDefStore
typeDefStore
;
@Inject
private
AtlasTypeRegistry
typeRegistry
;
@Inject
private
ImportService
importService
;
@Inject
private
MetricsService
metricsService
;
private
MetricsService
metricsService1
;
private
List
<
Map
>
activeEntityCountList
=
new
ArrayList
<>();
private
List
<
Map
>
deletedEntityCountList
=
new
ArrayList
<>();
private
final
Map
<
String
,
Long
>
activeEntityMetricsExpected
=
new
HashMap
<
String
,
Long
>()
{{
put
(
"hive_storagedesc"
,
5L
);
put
(
"__ExportImportAuditEntry"
,
1L
);
put
(
"AtlasServer"
,
1L
);
put
(
"hive_column_lineage"
,
8L
);
put
(
"hive_table"
,
5L
);
put
(
"hive_column"
,
13L
);
put
(
"hive_db"
,
2L
);
put
(
"hive_process"
,
3L
);
}};
private
final
Map
<
String
,
Long
>
deletedEntityMetricsExpected
=
new
HashMap
<
String
,
Long
>()
{{
put
(
"hive_storagedesc"
,
1L
);
put
(
"hive_table"
,
1L
);
put
(
"hive_column"
,
2L
);
put
(
"hive_db"
,
1L
);
}};
private
final
Map
<
String
,
Long
>
tagMetricsExpected
=
new
HashMap
<
String
,
Long
>()
{{
put
(
"PII"
,
1L
);
}};
@BeforeClass
public
void
init
()
throws
Exception
{
try
{
Map
<
String
,
Object
>
activeEntityCount
=
new
HashMap
<>();
activeEntityCount
.
put
(
"a"
,
1
);
activeEntityCount
.
put
(
"b"
,
2
);
activeEntityCount
.
put
(
"d"
,
5
);
activeEntityCount
.
put
(
"e"
,
10
);
activeEntityCount
.
put
(
"f"
,
15
);
activeEntityCountList
.
add
(
activeEntityCount
);
Map
<
String
,
Object
>
deletedEntityCount
=
new
HashMap
<>();
deletedEntityCount
.
put
(
"b"
,
5
);
deletedEntityCountList
.
add
(
deletedEntityCount
);
when
(
mockConfig
.
getInt
(
eq
(
MetricsService
.
METRIC_QUERY_GREMLIN_TYPES_BATCH_SIZE
),
anyInt
())).
thenReturn
(
25
);
when
(
mockConfig
.
getInt
(
eq
(
MetricsService
.
METRIC_QUERY_CACHE_TTL
),
anyInt
())).
thenReturn
(
900
);
when
(
mockConfig1
.
getInt
(
eq
(
MetricsService
.
METRIC_QUERY_GREMLIN_TYPES_BATCH_SIZE
),
anyInt
())).
thenReturn
(
2
);
when
(
mockTypeRegistry
.
getAllTypeNames
()).
thenReturn
(
Arrays
.
asList
(
"a"
,
"b"
,
"c"
,
"d"
,
"e"
,
"f"
));
when
(
mockTypeRegistry
.
getAllEntityDefNames
()).
thenReturn
(
Arrays
.
asList
(
"a"
,
"b"
,
"c"
));
when
(
mockTypeRegistry
.
getAllClassificationDefNames
()).
thenReturn
(
Arrays
.
asList
(
"d"
,
"e"
,
"f"
));
setupMockGraph
();
metricsService
=
new
MetricsService
(
mockConfig
,
mockGraph
,
mockTypeRegistry
,
new
AtlasGremlin3QueryProvider
());
metricsService1
=
new
MetricsService
(
mockConfig1
,
mockGraph1
,
mockTypeRegistry
,
new
AtlasGremlin3QueryProvider
());
public
void
setup
()
{
RequestContext
.
clear
();
loadModelFilesAndImportTestData
();
// sleep for sometime for import to complete
sleep
();
}
catch
(
Exception
e
)
{
throw
new
SkipException
(
"MetricsServicesTest: init failed!"
,
e
);
private
void
sleep
()
{
try
{
Thread
.
sleep
(
5000
);
}
catch
(
InterruptedException
e
)
{
e
.
printStackTrace
();
}
}
@AfterClass
public
void
cleanup
()
throws
Exception
{
public
void
clear
()
throws
Exception
{
AtlasGraphProvider
.
cleanup
();
if
(
useLocalSolr
())
{
LocalSolrRunner
.
stop
();
}
}
private
void
setupMockGraph
()
throws
AtlasBaseException
{
if
(
mockGraph
==
null
)
mockGraph
=
mock
(
AtlasGraph
.
class
);
if
(
mockGraph1
==
null
)
mockGraph1
=
mock
(
AtlasGraph
.
class
);
when
(
mockGraph
.
executeGremlinScript
(
anyString
(),
eq
(
false
))).
thenAnswer
(
invocationOnMock
->
{
if
(((
String
)
invocationOnMock
.
getArguments
()[
0
]).
contains
(
"ACTIVE"
))
{
return
activeEntityCountList
;
}
else
{
return
deletedEntityCountList
;
@Test
public
void
testGetMetrics
()
{
AtlasMetrics
metrics
=
metricsService
.
getMetrics
(
true
);
assertNotNull
(
metrics
);
// general metrics
assertEquals
(
metrics
.
getNumericMetric
(
GENERAL
,
METRIC_ENTITY_COUNT
).
intValue
(),
43
);
assertEquals
(
metrics
.
getNumericMetric
(
GENERAL
,
METRIC_TAG_COUNT
).
intValue
(),
1
);
assertEquals
(
metrics
.
getNumericMetric
(
GENERAL
,
METRIC_TYPE_UNUSED_COUNT
).
intValue
(),
10
);
assertEquals
(
metrics
.
getNumericMetric
(
GENERAL
,
METRIC_TYPE_COUNT
).
intValue
(),
44
);
// tag metrics
Map
tagMetricsActual
=
(
Map
)
metrics
.
getMetric
(
TAG
,
METRIC_ENTITIES_PER_TAG
);
Map
activeEntityMetricsActual
=
(
Map
)
metrics
.
getMetric
(
ENTITY
,
METRIC_ENTITY_ACTIVE
);
Map
deletedEntityMetricsActual
=
(
Map
)
metrics
.
getMetric
(
ENTITY
,
METRIC_ENTITY_DELETED
);
assertEquals
(
tagMetricsActual
.
size
(),
1
);
assertEquals
(
activeEntityMetricsActual
.
size
(),
8
);
assertEquals
(
deletedEntityMetricsActual
.
size
(),
4
);
assertEquals
(
tagMetricsActual
,
tagMetricsExpected
);
assertEquals
(
activeEntityMetricsActual
,
activeEntityMetricsExpected
);
assertEquals
(
deletedEntityMetricsActual
,
deletedEntityMetricsExpected
);
}
});
when
(
mockGraph1
.
executeGremlinScript
(
anyString
(),
eq
(
false
))).
thenAnswer
(
invocationOnMock
->
{
if
(((
String
)
invocationOnMock
.
getArguments
()[
0
]).
contains
(
"ACTIVE"
))
{
return
activeEntityCountList
;
}
else
{
return
deletedEntityCountList
;
private
void
loadModelFilesAndImportTestData
()
{
try
{
loadModelFromJson
(
"0000-Area0/0010-base_model.json"
,
typeDefStore
,
typeRegistry
);
loadModelFromJson
(
"0000-Area0/patches/001-base_model_replication_attributes.json"
,
typeDefStore
,
typeRegistry
);
loadModelFromJson
(
"1000-Hadoop/1020-fs_model.json"
,
typeDefStore
,
typeRegistry
);
loadModelFromJson
(
"1000-Hadoop/1030-hive_model.json"
,
typeDefStore
,
typeRegistry
);
loadModelFromJson
(
"1000-Hadoop/patches/001-hive_column_add_position.json"
,
typeDefStore
,
typeRegistry
);
loadModelFromJson
(
"1000-Hadoop/patches/002-hive_column_table_add_options.json"
,
typeDefStore
,
typeRegistry
);
loadModelFromJson
(
"1000-Hadoop/patches/003-hive_column_update_table_remove_constraint.json"
,
typeDefStore
,
typeRegistry
);
runImportWithNoParameters
(
importService
,
getZipSource
(
IMPORT_FILE
));
}
catch
(
AtlasBaseException
|
IOException
e
)
{
throw
new
SkipException
(
"Model loading failed!"
);
}
});
}
@Test
public
void
testGetMetrics
()
throws
InterruptedException
,
AtlasBaseException
{
assertNotNull
(
metricsService
);
AtlasMetrics
metrics
=
metricsService
.
getMetrics
(
false
);
assertNotNull
(
metrics
);
Map
activeMetrics
=
(
Map
)
metrics
.
getMetric
(
"entity"
,
"entityActive"
);
assertNotNull
(
activeMetrics
);
assertEquals
(
activeMetrics
.
get
(
"a"
),
1
);
assertEquals
(
activeMetrics
.
get
(
"b"
),
2
);
Map
deletedMetrics
=
(
Map
)
metrics
.
getMetric
(
"entity"
,
"entityDeleted"
);
assertEquals
(
deletedMetrics
.
get
(
"b"
),
5
);
Number
unusedTypeCount
=
metrics
.
getNumericMetric
(
"general"
,
"typeUnusedCount"
);
assertEquals
(
unusedTypeCount
,
1
);
Number
cCount
=
metrics
.
getNumericMetric
(
"general"
,
"entityCount"
);
assertEquals
(
cCount
,
8
);
Number
aTags
=
(
Number
)
metrics
.
getMetric
(
"general"
,
"tagCount"
);
assertEquals
(
aTags
,
3
);
Map
taggedEntityMetric
=
(
Map
)
metrics
.
getMetric
(
"tag"
,
"tagEntities"
);
assertNotNull
(
taggedEntityMetric
);
assertEquals
(
taggedEntityMetric
.
get
(
"d"
),
5
);
assertEquals
(
taggedEntityMetric
.
get
(
"e"
),
10
);
assertEquals
(
taggedEntityMetric
.
get
(
"f"
),
15
);
// 2 calls for entity types and 2 calls for classification types
verify
(
mockGraph
,
times
(
4
)).
executeGremlinScript
(
anyString
(),
anyBoolean
());
// Test batched calls
metricsService1
.
getMetrics
(
false
);
// 3 classifications, 3 entity types & batch size = 2 and 2 calls per batch, total batches = 4, total calls = 8
// 2 for entity and 2 for classification
verify
(
mockGraph1
,
times
(
8
)).
executeGremlinScript
(
anyString
(),
anyBoolean
());
// Subsequent call within the cache timeout window
metricsService
.
getMetrics
(
false
);
verifyZeroInteractions
(
mockGraph
);
// Now test the cache refresh
Thread
.
sleep
(
6000
);
metricsService
.
getMetrics
(
true
);
verify
(
mockGraph
,
atLeastOnce
()).
executeGremlinScript
(
anyString
(),
anyBoolean
());
public
static
ZipSource
getZipSource
(
String
fileName
)
throws
IOException
,
AtlasBaseException
{
FileInputStream
fs
=
ZipFileResourceTestUtils
.
getFileInputStream
(
fileName
);
return
new
ZipSource
(
fs
);
}
}
\ No newline at end of file
repository/src/test/resources/metrics-entities-data.zip
0 → 100644
View file @
85280ddf
File added
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment