Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
A
atlas
Project
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
dataplatform
atlas
Commits
fc17b38e
Commit
fc17b38e
authored
7 years ago
by
Sarath Subramanian
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
ATLAS-2580: Add unit tests for classification propagation and blocked classification propagation
parent
710a507d
master
No related merge requests found
Show whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
595 additions
and
0 deletions
+595
-0
ClassificationPropagationTest.java
...ository/tagpropagation/ClassificationPropagationTest.java
+595
-0
No files found.
repository/src/test/java/org/apache/atlas/repository/tagpropagation/ClassificationPropagationTest.java
0 → 100644
View file @
fc17b38e
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org
.
apache
.
atlas
.
repository
.
tagpropagation
;
import
org.apache.atlas.RequestContextV1
;
import
org.apache.atlas.TestModules
;
import
org.apache.atlas.discovery.AtlasLineageService
;
import
org.apache.atlas.exception.AtlasBaseException
;
import
org.apache.atlas.model.instance.AtlasClassification
;
import
org.apache.atlas.model.instance.AtlasEntity
;
import
org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo
;
import
org.apache.atlas.model.instance.AtlasRelationship
;
import
org.apache.atlas.model.lineage.AtlasLineageInfo
;
import
org.apache.atlas.model.lineage.AtlasLineageInfo.LineageRelation
;
import
org.apache.atlas.model.typedef.AtlasClassificationDef
;
import
org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef
;
import
org.apache.atlas.model.typedef.AtlasTypesDef
;
import
org.apache.atlas.repository.graph.AtlasGraphProvider
;
import
org.apache.atlas.repository.impexp.ImportService
;
import
org.apache.atlas.repository.impexp.ZipFileResourceTestUtils
;
import
org.apache.atlas.repository.impexp.ZipSource
;
import
org.apache.atlas.repository.store.graph.AtlasEntityStore
;
import
org.apache.atlas.repository.store.graph.AtlasRelationshipStore
;
import
org.apache.atlas.runner.LocalSolrRunner
;
import
org.apache.atlas.store.AtlasTypeDefStore
;
import
org.apache.atlas.type.AtlasTypeRegistry
;
import
org.apache.commons.collections.CollectionUtils
;
import
org.testng.SkipException
;
import
org.testng.annotations.AfterClass
;
import
org.testng.annotations.BeforeClass
;
import
org.testng.annotations.Guice
;
import
org.testng.annotations.Test
;
import
javax.inject.Inject
;
import
java.io.FileInputStream
;
import
java.io.IOException
;
import
java.util.Arrays
;
import
java.util.Collections
;
import
java.util.HashMap
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.Set
;
import
static
org
.
apache
.
atlas
.
graph
.
GraphSandboxUtil
.
useLocalSolr
;
import
static
org
.
apache
.
atlas
.
model
.
lineage
.
AtlasLineageInfo
.
LineageDirection
;
import
static
org
.
apache
.
atlas
.
model
.
typedef
.
AtlasRelationshipDef
.
PropagateTags
.
BOTH
;
import
static
org
.
apache
.
atlas
.
model
.
typedef
.
AtlasRelationshipDef
.
PropagateTags
.
NONE
;
import
static
org
.
apache
.
atlas
.
model
.
typedef
.
AtlasRelationshipDef
.
PropagateTags
.
ONE_TO_TWO
;
import
static
org
.
apache
.
atlas
.
model
.
typedef
.
AtlasRelationshipDef
.
PropagateTags
.
TWO_TO_ONE
;
import
static
org
.
apache
.
atlas
.
repository
.
impexp
.
ZipFileResourceTestUtils
.
loadModelFromJson
;
import
static
org
.
apache
.
atlas
.
repository
.
impexp
.
ZipFileResourceTestUtils
.
runImportWithNoParameters
;
import
static
org
.
testng
.
Assert
.
assertEquals
;
import
static
org
.
testng
.
Assert
.
assertNotNull
;
import
static
org
.
testng
.
Assert
.
assertTrue
;
import
static
org
.
testng
.
Assert
.
fail
;
@Guice
(
modules
=
TestModules
.
TestOnlyModule
.
class
)
public
class
ClassificationPropagationTest
{
public
static
final
String
HDFS_PATH_EMPLOYEES
=
"HDFS_PATH_EMPLOYEES"
;
public
static
final
String
EMPLOYEES1_TABLE
=
"EMPLOYEES1_TABLE"
;
public
static
final
String
EMPLOYEES2_TABLE
=
"EMPLOYEES2_TABLE"
;
public
static
final
String
EMPLOYEES_UNION_TABLE
=
"EMPLOYEES_UNION_TABLE"
;
public
static
final
String
EMPLOYEES1_PROCESS
=
"EMPLOYEES1_PROCESS"
;
public
static
final
String
EMPLOYEES2_PROCESS
=
"EMPLOYEES2_PROCESS"
;
public
static
final
String
EMPLOYEES_UNION_PROCESS
=
"EMPLOYEES_UNION_PROCESS"
;
@Inject
private
AtlasTypeDefStore
typeDefStore
;
@Inject
private
AtlasTypeRegistry
typeRegistry
;
@Inject
private
AtlasEntityStore
entityStore
;
@Inject
private
AtlasRelationshipStore
relationshipStore
;
@Inject
private
ImportService
importService
;
@Inject
private
AtlasLineageService
lineageService
;
private
Map
<
String
,
String
>
entitiesMap
;
private
AtlasLineageInfo
lineageInfo
;
@BeforeClass
public
void
setup
()
{
RequestContextV1
.
clear
();
loadModelFilesAndImportTestData
();
}
@AfterClass
public
void
clear
()
throws
Exception
{
AtlasGraphProvider
.
cleanup
();
if
(
useLocalSolr
())
{
LocalSolrRunner
.
stop
();
}
}
/** This test uses the lineage graph:
*
[Process1] ----> [Employees1]
/ \
/ \
[hdfs_employees] [Process3] ----> [ EmployeesUnion ]
\ /
\ /
[Process2] ----> [Employees2]
*/
@Test
public
void
addClassification_PropagateFalse
()
throws
AtlasBaseException
{
AtlasEntity
hdfs_employees
=
getEntity
(
HDFS_PATH_EMPLOYEES
);
AtlasClassification
tag2
=
new
AtlasClassification
(
"tag2"
);
tag2
.
setPropagate
(
false
);
tag2
.
setEntityGuid
(
hdfs_employees
.
getGuid
());
// add classification with propagate to 'false'
addClassification
(
hdfs_employees
,
tag2
);
List
<
String
>
propagatedToEntities
=
Arrays
.
asList
(
EMPLOYEES1_PROCESS
,
EMPLOYEES2_PROCESS
,
EMPLOYEES1_TABLE
,
EMPLOYEES2_TABLE
,
EMPLOYEES_UNION_PROCESS
,
EMPLOYEES_UNION_TABLE
);
assertClassificationNotExistInEntities
(
propagatedToEntities
,
tag2
);
}
@Test
(
dependsOnMethods
=
{
"addClassification_PropagateFalse"
})
public
void
updateClassification_PropagateFalseToTrue
()
throws
AtlasBaseException
{
AtlasEntity
hdfs_employees
=
getEntity
(
HDFS_PATH_EMPLOYEES
);
AtlasClassification
tag2
=
new
AtlasClassification
(
"tag2"
);
tag2
.
setEntityGuid
(
hdfs_employees
.
getGuid
());
//update tag2 propagate to 'true'
tag2
=
getClassification
(
hdfs_employees
,
tag2
);
tag2
.
setPropagate
(
true
);
updateClassifications
(
hdfs_employees
,
tag2
);
List
<
String
>
propagatedToEntities
=
Arrays
.
asList
(
EMPLOYEES1_PROCESS
,
EMPLOYEES2_PROCESS
,
EMPLOYEES1_TABLE
,
EMPLOYEES2_TABLE
,
EMPLOYEES_UNION_PROCESS
,
EMPLOYEES_UNION_TABLE
);
assertClassificationExistInEntities
(
propagatedToEntities
,
tag2
);
deleteClassification
(
hdfs_employees
,
tag2
);
}
@Test
(
dependsOnMethods
=
{
"updateClassification_PropagateFalseToTrue"
})
public
void
addClassification_PropagateTrue
()
throws
AtlasBaseException
{
AtlasEntity
hdfs_employees
=
getEntity
(
HDFS_PATH_EMPLOYEES
);
AtlasClassification
tag1
=
new
AtlasClassification
(
"tag1"
);
tag1
.
setPropagate
(
true
);
tag1
.
setEntityGuid
(
hdfs_employees
.
getGuid
());
// add classification with propagate flag to 'true'
addClassification
(
hdfs_employees
,
tag1
);
List
<
String
>
propagatedToEntities
=
Arrays
.
asList
(
EMPLOYEES1_PROCESS
,
EMPLOYEES2_PROCESS
,
EMPLOYEES1_TABLE
,
EMPLOYEES2_TABLE
,
EMPLOYEES_UNION_PROCESS
,
EMPLOYEES_UNION_TABLE
);
assertClassificationExistInEntities
(
propagatedToEntities
,
tag1
);
}
@Test
(
dependsOnMethods
=
{
"addClassification_PropagateTrue"
})
public
void
updateClassification_PropagateTrueToFalse
()
throws
AtlasBaseException
{
AtlasEntity
hdfs_employees
=
getEntity
(
HDFS_PATH_EMPLOYEES
);
AtlasClassification
tag1
=
new
AtlasClassification
(
"tag1"
);
tag1
.
setEntityGuid
(
hdfs_employees
.
getGuid
());
List
<
String
>
propagatedToEntities
=
Arrays
.
asList
(
EMPLOYEES1_PROCESS
,
EMPLOYEES2_PROCESS
,
EMPLOYEES1_TABLE
,
EMPLOYEES2_TABLE
,
EMPLOYEES_UNION_PROCESS
,
EMPLOYEES_UNION_TABLE
);
// update propagate flag to 'false'
tag1
=
getClassification
(
hdfs_employees
,
tag1
);
tag1
.
setPropagate
(
false
);
updateClassifications
(
hdfs_employees
,
tag1
);
assertClassificationNotExistInEntities
(
propagatedToEntities
,
tag1
);
}
@Test
(
dependsOnMethods
=
{
"updateClassification_PropagateTrueToFalse"
})
public
void
deleteClassification_PropagateTrue
()
throws
AtlasBaseException
{
AtlasEntity
hdfs_employees
=
getEntity
(
HDFS_PATH_EMPLOYEES
);
AtlasClassification
tag1
=
new
AtlasClassification
(
"tag1"
);
tag1
.
setPropagate
(
true
);
tag1
.
setEntityGuid
(
hdfs_employees
.
getGuid
());
deleteClassification
(
hdfs_employees
,
tag1
);
List
<
String
>
propagatedToEntities
=
Arrays
.
asList
(
EMPLOYEES1_PROCESS
,
EMPLOYEES2_PROCESS
,
EMPLOYEES1_TABLE
,
EMPLOYEES2_TABLE
,
EMPLOYEES_UNION_PROCESS
,
EMPLOYEES_UNION_TABLE
);
assertClassificationNotExistInEntities
(
propagatedToEntities
,
tag1
);
}
@Test
(
dependsOnMethods
=
{
"deleteClassification_PropagateTrue"
})
public
void
propagateSameTagFromDifferentEntities
()
throws
AtlasBaseException
{
// add tag1 to hdfs_employees
AtlasEntity
hdfs_employees
=
getEntity
(
HDFS_PATH_EMPLOYEES
);
AtlasClassification
tag1
=
new
AtlasClassification
(
"tag1"
);
tag1
.
setPropagate
(
true
);
tag1
.
setEntityGuid
(
hdfs_employees
.
getGuid
());
addClassification
(
hdfs_employees
,
tag1
);
// add tag1 to employees2
AtlasEntity
employees2_table
=
getEntity
(
EMPLOYEES2_TABLE
);
tag1
=
new
AtlasClassification
(
"tag1"
);
tag1
.
setPropagate
(
true
);
tag1
.
setEntityGuid
(
employees2_table
.
getGuid
());
addClassification
(
employees2_table
,
tag1
);
// employees_union table should have two tags 'tag1' propagated from hdfs_employees and employees2 table
AtlasEntity
employees_union_table
=
getEntity
(
EMPLOYEES_UNION_TABLE
);
List
<
AtlasClassification
>
classifications
=
employees_union_table
.
getClassifications
();
assertNotNull
(
classifications
);
assertEquals
(
classifications
.
size
(),
2
);
// assert same tag propagated from hdfs_employees and employees2
assertEquals
(
classifications
.
get
(
0
).
getTypeName
(),
tag1
.
getTypeName
());
assertEquals
(
classifications
.
get
(
1
).
getTypeName
(),
tag1
.
getTypeName
());
if
(
classifications
.
get
(
0
).
getEntityGuid
().
equals
(
hdfs_employees
.
getGuid
()))
{
assertEquals
(
classifications
.
get
(
1
).
getEntityGuid
(),
employees2_table
.
getGuid
());
}
if
(
classifications
.
get
(
0
).
getEntityGuid
().
equals
(
employees2_table
.
getGuid
()))
{
assertEquals
(
classifications
.
get
(
1
).
getEntityGuid
(),
hdfs_employees
.
getGuid
());
}
// cleanup
deleteClassification
(
hdfs_employees
,
tag1
);
deleteClassification
(
employees2_table
,
tag1
);
}
@Test
(
dependsOnMethods
=
{
"propagateSameTagFromDifferentEntities"
})
public
void
updatePropagateTagsValue
()
throws
AtlasBaseException
{
AtlasEntity
hdfs_employees
=
getEntity
(
HDFS_PATH_EMPLOYEES
);
AtlasEntity
employees2_table
=
getEntity
(
EMPLOYEES2_TABLE
);
AtlasEntity
employees_union_process
=
getEntity
(
EMPLOYEES_UNION_PROCESS
);
AtlasEntity
employees_union_table
=
getEntity
(
EMPLOYEES_UNION_TABLE
);
AtlasClassification
tag1
=
new
AtlasClassification
(
"tag1"
);
tag1
.
setPropagate
(
true
);
tag1
.
setEntityGuid
(
hdfs_employees
.
getGuid
());
AtlasClassification
tag2
=
new
AtlasClassification
(
"tag2"
);
tag1
.
setPropagate
(
true
);
tag2
.
setEntityGuid
(
employees2_table
.
getGuid
());
AtlasClassification
tag3
=
new
AtlasClassification
(
"tag3"
);
tag1
.
setPropagate
(
true
);
tag3
.
setEntityGuid
(
employees_union_process
.
getGuid
());
AtlasClassification
tag4
=
new
AtlasClassification
(
"tag4"
);
tag1
.
setPropagate
(
true
);
tag4
.
setEntityGuid
(
employees_union_table
.
getGuid
());
// add tag1 to hdfs_employees, tag2 to employees2, tag3 to process3, tag4 to employees_union
addClassification
(
hdfs_employees
,
tag1
);
addClassification
(
employees2_table
,
tag2
);
addClassification
(
employees_union_process
,
tag3
);
addClassification
(
employees_union_table
,
tag4
);
//validate if tag1, tag2, tag3 propagated to employees_union table
assertClassificationExistInEntity
(
EMPLOYEES_UNION_TABLE
,
tag1
);
assertClassificationExistInEntity
(
EMPLOYEES_UNION_TABLE
,
tag2
);
assertClassificationExistInEntity
(
EMPLOYEES_UNION_TABLE
,
tag3
);
assertClassificationExistInEntity
(
EMPLOYEES_UNION_TABLE
,
tag4
);
// change propagation between employees2 -> process3 from TWO_TO_ONE to NONE
AtlasRelationship
employees2_process_relationship
=
getRelationship
(
EMPLOYEES2_TABLE
,
EMPLOYEES_UNION_PROCESS
);
assertEquals
(
employees2_process_relationship
.
getPropagateTags
(),
TWO_TO_ONE
);
employees2_process_relationship
.
setPropagateTags
(
NONE
);
relationshipStore
.
update
(
employees2_process_relationship
);
// validate tag1 propagated to employees_union through other path
assertClassificationExistInEntity
(
EMPLOYEES_UNION_TABLE
,
tag1
);
// validate tag2 is no more propagated to employees_union
assertClassificationNotExistInEntity
(
EMPLOYEES_UNION_TABLE
,
tag2
);
// change propagation between employees2 -> process3 from NONE to TWO_TO_ONE
employees2_process_relationship
=
getRelationship
(
EMPLOYEES2_TABLE
,
EMPLOYEES_UNION_PROCESS
);
assertEquals
(
employees2_process_relationship
.
getPropagateTags
(),
NONE
);
employees2_process_relationship
.
setPropagateTags
(
TWO_TO_ONE
);
relationshipStore
.
update
(
employees2_process_relationship
);
// validate tag2 is propagated to employees_union
assertClassificationExistInEntity
(
EMPLOYEES_UNION_TABLE
,
tag2
);
//update propagation to BOTH for edge process3 --> employee_union
AtlasRelationship
process3_employee_union_relationship
=
getRelationship
(
EMPLOYEES_UNION_PROCESS
,
EMPLOYEES_UNION_TABLE
);
assertEquals
(
process3_employee_union_relationship
.
getPropagateTags
(),
ONE_TO_TWO
);
process3_employee_union_relationship
.
setPropagateTags
(
BOTH
);
relationshipStore
.
update
(
process3_employee_union_relationship
);
// process3 should get 'tag4' from employee_union and employee_union should get tag3 from process3 (BOTH)
assertClassificationExistInEntity
(
EMPLOYEES_UNION_PROCESS
,
tag4
);
assertClassificationExistInEntity
(
EMPLOYEES_UNION_TABLE
,
tag3
);
//update propagation to ONE_TO_TWO for edge process3 --> employee_union
process3_employee_union_relationship
.
setPropagateTags
(
ONE_TO_TWO
);
relationshipStore
.
update
(
process3_employee_union_relationship
);
assertClassificationNotExistInEntity
(
EMPLOYEES_UNION_PROCESS
,
tag4
);
//cleanup
deleteClassification
(
hdfs_employees
,
tag1
);
deleteClassification
(
employees2_table
,
tag2
);
deleteClassification
(
employees_union_process
,
tag3
);
deleteClassification
(
employees_union_table
,
tag4
);
}
@Test
(
dependsOnMethods
=
{
"updatePropagateTagsValue"
})
public
void
addBlockedPropagatedClassifications
()
throws
AtlasBaseException
{
AtlasEntity
hdfs_path
=
getEntity
(
HDFS_PATH_EMPLOYEES
);
AtlasEntity
employees1
=
getEntity
(
EMPLOYEES1_TABLE
);
AtlasEntity
employees2
=
getEntity
(
EMPLOYEES2_TABLE
);
AtlasEntity
employees_union
=
getEntity
(
EMPLOYEES_UNION_TABLE
);
AtlasClassification
PII_tag1
=
new
AtlasClassification
(
"PII"
);
PII_tag1
.
setPropagate
(
true
);
PII_tag1
.
setAttribute
(
"type"
,
"from hdfs_path entity"
);
PII_tag1
.
setAttribute
(
"valid"
,
true
);
AtlasClassification
PII_tag2
=
new
AtlasClassification
(
"PII"
);
PII_tag2
.
setPropagate
(
true
);
PII_tag2
.
setAttribute
(
"type"
,
"from employees1 entity"
);
PII_tag2
.
setAttribute
(
"valid"
,
true
);
AtlasClassification
PII_tag3
=
new
AtlasClassification
(
"PII"
);
PII_tag3
.
setPropagate
(
true
);
PII_tag3
.
setAttribute
(
"type"
,
"from employees2 entity"
);
PII_tag3
.
setAttribute
(
"valid"
,
true
);
AtlasClassification
PII_tag4
=
new
AtlasClassification
(
"PII"
);
PII_tag4
.
setPropagate
(
true
);
PII_tag4
.
setAttribute
(
"type"
,
"from employees_union entity"
);
PII_tag4
.
setAttribute
(
"valid"
,
true
);
// add PII to hdfs_path, employees1, employees2 and employee_union
addClassification
(
hdfs_path
,
PII_tag1
);
addClassification
(
employees1
,
PII_tag2
);
addClassification
(
employees2
,
PII_tag3
);
// check 4 PII tags exists in employee_union table
assertClassificationExistInEntity
(
EMPLOYEES_UNION_TABLE
,
PII_tag1
);
assertClassificationExistInEntity
(
EMPLOYEES_UNION_TABLE
,
PII_tag2
);
assertClassificationExistInEntity
(
EMPLOYEES_UNION_TABLE
,
PII_tag3
);
AtlasRelationship
process3_employee_union_relationship
=
getRelationship
(
EMPLOYEES_UNION_PROCESS
,
EMPLOYEES_UNION_TABLE
);
List
<
AtlasClassification
>
propagatedClassifications
=
process3_employee_union_relationship
.
getPropagatedClassifications
();
List
<
AtlasClassification
>
blockedClassifications
=
process3_employee_union_relationship
.
getBlockedPropagatedClassifications
();
assertNotNull
(
propagatedClassifications
);
assertTrue
(
propagatedClassifications
.
contains
(
PII_tag1
));
assertTrue
(
propagatedClassifications
.
contains
(
PII_tag2
));
assertTrue
(
propagatedClassifications
.
contains
(
PII_tag3
));
assertTrue
(
blockedClassifications
.
isEmpty
());
// block PII tag propagating from employees1 and employees2
process3_employee_union_relationship
.
setBlockedPropagatedClassifications
(
Arrays
.
asList
(
PII_tag2
,
PII_tag3
));
relationshipStore
.
update
(
process3_employee_union_relationship
);
process3_employee_union_relationship
=
getRelationship
(
EMPLOYEES_UNION_PROCESS
,
EMPLOYEES_UNION_TABLE
);
propagatedClassifications
=
process3_employee_union_relationship
.
getPropagatedClassifications
();
blockedClassifications
=
process3_employee_union_relationship
.
getBlockedPropagatedClassifications
();
assertTrue
(
propagatedClassifications
.
contains
(
PII_tag1
));
assertTrue
(!
blockedClassifications
.
isEmpty
());
assertTrue
(
blockedClassifications
.
contains
(
PII_tag2
));
assertTrue
(
blockedClassifications
.
contains
(
PII_tag3
));
assertClassificationNotExistInEntity
(
EMPLOYEES_UNION_TABLE
,
PII_tag2
);
assertClassificationNotExistInEntity
(
EMPLOYEES_UNION_TABLE
,
PII_tag3
);
// assert only PII from hdfs_path is propagated to employees_union, PII from employees1 and employees2 is blocked.
assertEquals
(
getEntity
(
EMPLOYEES_UNION_TABLE
).
getClassifications
().
size
(),
1
);
assertClassificationExistInEntity
(
EMPLOYEES_UNION_TABLE
,
PII_tag1
);
}
@Test
(
dependsOnMethods
=
{
"addBlockedPropagatedClassifications"
})
public
void
removeBlockedPropagatedClassifications
()
throws
AtlasBaseException
{
AtlasEntity
hdfs_path
=
getEntity
(
HDFS_PATH_EMPLOYEES
);
AtlasEntity
employees1
=
getEntity
(
EMPLOYEES1_TABLE
);
AtlasEntity
employees2
=
getEntity
(
EMPLOYEES2_TABLE
);
AtlasClassification
PII_tag1
=
new
AtlasClassification
(
"PII"
);
PII_tag1
.
setPropagate
(
true
);
PII_tag1
.
setEntityGuid
(
hdfs_path
.
getGuid
());
PII_tag1
.
setAttribute
(
"type"
,
"from hdfs_path entity"
);
PII_tag1
.
setAttribute
(
"valid"
,
true
);
AtlasClassification
PII_tag2
=
new
AtlasClassification
(
"PII"
);
PII_tag2
.
setPropagate
(
true
);
PII_tag2
.
setEntityGuid
(
employees1
.
getGuid
());
PII_tag2
.
setAttribute
(
"type"
,
"from employees1 entity"
);
PII_tag2
.
setAttribute
(
"valid"
,
true
);
AtlasClassification
PII_tag3
=
new
AtlasClassification
(
"PII"
);
PII_tag3
.
setPropagate
(
true
);
PII_tag3
.
setEntityGuid
(
employees2
.
getGuid
());
PII_tag3
.
setAttribute
(
"type"
,
"from employees2 entity"
);
PII_tag3
.
setAttribute
(
"valid"
,
true
);
AtlasRelationship
process3_employee_union_relationship
=
getRelationship
(
EMPLOYEES_UNION_PROCESS
,
EMPLOYEES_UNION_TABLE
);
// remove blocked propagated classification entry for PII (from employees2) - allow PII from employees2 to propagate to employee_union
process3_employee_union_relationship
.
setBlockedPropagatedClassifications
(
Arrays
.
asList
(
PII_tag3
));
relationshipStore
.
update
(
process3_employee_union_relationship
);
process3_employee_union_relationship
=
getRelationship
(
EMPLOYEES_UNION_PROCESS
,
EMPLOYEES_UNION_TABLE
);
List
<
AtlasClassification
>
propagatedClassifications
=
process3_employee_union_relationship
.
getPropagatedClassifications
();
List
<
AtlasClassification
>
blockedClassifications
=
process3_employee_union_relationship
.
getBlockedPropagatedClassifications
();
assertClassificationExistInList
(
propagatedClassifications
,
PII_tag1
);
assertClassificationExistInList
(
propagatedClassifications
,
PII_tag2
);
assertClassificationExistInList
(
blockedClassifications
,
PII_tag3
);
// remove all blocked propagated classification entry
process3_employee_union_relationship
.
setBlockedPropagatedClassifications
(
Collections
.
emptyList
());
relationshipStore
.
update
(
process3_employee_union_relationship
);
process3_employee_union_relationship
=
getRelationship
(
EMPLOYEES_UNION_PROCESS
,
EMPLOYEES_UNION_TABLE
);
propagatedClassifications
=
process3_employee_union_relationship
.
getPropagatedClassifications
();
blockedClassifications
=
process3_employee_union_relationship
.
getBlockedPropagatedClassifications
();
assertClassificationExistInList
(
propagatedClassifications
,
PII_tag1
);
assertClassificationExistInList
(
propagatedClassifications
,
PII_tag2
);
assertClassificationExistInList
(
propagatedClassifications
,
PII_tag3
);
assertTrue
(
blockedClassifications
.
isEmpty
());
}
private
void
assertClassificationExistInList
(
List
<
AtlasClassification
>
classifications
,
AtlasClassification
classification
)
{
String
classificationName
=
classification
.
getTypeName
();
String
entityGuid
=
classification
.
getEntityGuid
();
boolean
foundClassification
=
false
;
for
(
AtlasClassification
c
:
classifications
)
{
if
(
c
.
getTypeName
().
equals
(
classificationName
)
&&
c
.
getEntityGuid
().
equals
(
entityGuid
))
{
foundClassification
=
true
;
}
}
if
(!
foundClassification
)
{
fail
(
"Propagated classification is not present in classifications list!"
);
}
}
private
void
assertClassificationExistInEntities
(
List
<
String
>
entityNames
,
AtlasClassification
classification
)
throws
AtlasBaseException
{
for
(
String
entityName
:
entityNames
)
{
assertClassificationExistInEntity
(
entityName
,
classification
);
}
}
private
void
assertClassificationExistInEntity
(
String
entityName
,
AtlasClassification
classification
)
throws
AtlasBaseException
{
List
<
AtlasClassification
>
classifications
=
getEntity
(
entityName
).
getClassifications
();
String
classificationName
=
classification
.
getTypeName
();
String
entityGuid
=
classification
.
getEntityGuid
();
if
(
CollectionUtils
.
isNotEmpty
(
classifications
))
{
boolean
foundClassification
=
false
;
for
(
AtlasClassification
c
:
classifications
)
{
if
(
c
.
getTypeName
().
equals
(
classificationName
)
&&
c
.
getEntityGuid
().
equals
(
entityGuid
))
{
foundClassification
=
true
;
}
}
if
(!
foundClassification
)
{
fail
(
"Propagated classification is not present in entity!"
);
}
}
}
private
void
assertClassificationNotExistInEntities
(
List
<
String
>
entityNames
,
AtlasClassification
classification
)
throws
AtlasBaseException
{
for
(
String
entityName
:
entityNames
)
{
assertClassificationNotExistInEntity
(
entityName
,
classification
);
}
}
private
void
assertClassificationNotExistInEntity
(
String
entityName
,
AtlasClassification
classification
)
throws
AtlasBaseException
{
List
<
AtlasClassification
>
classifications
=
getEntity
(
entityName
).
getClassifications
();
String
classificationName
=
classification
.
getTypeName
();
String
entityGuid
=
classification
.
getEntityGuid
();
if
(
CollectionUtils
.
isNotEmpty
(
classifications
))
{
for
(
AtlasClassification
c
:
classifications
)
{
if
(
c
.
getTypeName
().
equals
(
classificationName
)
&&
c
.
getEntityGuid
().
equals
(
entityGuid
))
{
fail
(
"Propagated classification should not be present in entity!"
);
}
}
}
}
private
void
loadModelFilesAndImportTestData
()
{
try
{
loadModelFromJson
(
"0000-Area0/0010-base_model.json"
,
typeDefStore
,
typeRegistry
);
loadModelFromJson
(
"1000-Hadoop/1020-fs_model.json"
,
typeDefStore
,
typeRegistry
);
loadModelFromJson
(
"1000-Hadoop/1030-hive_model.json"
,
typeDefStore
,
typeRegistry
);
loadModelFromJson
(
"1000-Hadoop/patches/001-hive_column_add_position.json"
,
typeDefStore
,
typeRegistry
);
loadModelFromJson
(
"1000-Hadoop/patches/002-hive_column_table_add_options.json"
,
typeDefStore
,
typeRegistry
);
loadModelFromJson
(
"1000-Hadoop/patches/003-hive_column_update_table_remove_constraint.json"
,
typeDefStore
,
typeRegistry
);
loadSampleClassificationDefs
();
runImportWithNoParameters
(
importService
,
getZipSource
(
"tag-propagation-data.zip"
));
initializeEntitiesMap
();
}
catch
(
AtlasBaseException
|
IOException
e
)
{
throw
new
SkipException
(
"Model loading failed!"
);
}
}
public
static
ZipSource
getZipSource
(
String
fileName
)
throws
IOException
{
FileInputStream
fs
=
ZipFileResourceTestUtils
.
getFileInputStream
(
fileName
);
return
new
ZipSource
(
fs
);
}
private
void
loadSampleClassificationDefs
()
throws
AtlasBaseException
{
AtlasClassificationDef
tag1
=
new
AtlasClassificationDef
(
"tag1"
);
AtlasClassificationDef
tag2
=
new
AtlasClassificationDef
(
"tag2"
);
AtlasClassificationDef
tag3
=
new
AtlasClassificationDef
(
"tag3"
);
AtlasClassificationDef
tag4
=
new
AtlasClassificationDef
(
"tag4"
);
AtlasClassificationDef
PII
=
new
AtlasClassificationDef
(
"PII"
);
PII
.
addAttribute
(
new
AtlasAttributeDef
(
"type"
,
"string"
));
PII
.
addAttribute
(
new
AtlasAttributeDef
(
"valid"
,
"boolean"
));
typeDefStore
.
createTypesDef
(
new
AtlasTypesDef
(
Collections
.
emptyList
(),
Collections
.
emptyList
(),
Arrays
.
asList
(
tag1
,
tag2
,
tag3
,
tag4
,
PII
),
Collections
.
emptyList
(),
Collections
.
emptyList
()));
}
private
void
initializeEntitiesMap
()
throws
AtlasBaseException
{
entitiesMap
=
new
HashMap
<>();
entitiesMap
.
put
(
HDFS_PATH_EMPLOYEES
,
"a3955120-ac17-426f-a4af-972ec8690e5f"
);
entitiesMap
.
put
(
EMPLOYEES1_TABLE
,
"cdf0040e-739e-4590-a137-964d10e73573"
);
entitiesMap
.
put
(
EMPLOYEES2_TABLE
,
"0a3e66b6-472c-48b3-8453-abdd24f9494f"
);
entitiesMap
.
put
(
EMPLOYEES_UNION_TABLE
,
"1ceac963-1a2b-476a-a269-10396187d406"
);
entitiesMap
.
put
(
EMPLOYEES1_PROCESS
,
"26dae763-85b7-40af-8516-71056d91d2de"
);
entitiesMap
.
put
(
EMPLOYEES2_PROCESS
,
"c0201260-dbeb-45f4-930d-5129eab31dc9"
);
entitiesMap
.
put
(
EMPLOYEES_UNION_PROCESS
,
"470a2d1e-b1fd-47de-8f2d-8dfd0a0275a7"
);
lineageInfo
=
lineageService
.
getAtlasLineageInfo
(
entitiesMap
.
get
(
HDFS_PATH_EMPLOYEES
),
LineageDirection
.
BOTH
,
3
);
}
private
AtlasEntity
getEntity
(
String
entityName
)
throws
AtlasBaseException
{
String
entityGuid
=
entitiesMap
.
get
(
entityName
);
AtlasEntityWithExtInfo
entityWithExtInfo
=
entityStore
.
getById
(
entityGuid
);
return
entityWithExtInfo
.
getEntity
();
}
private
AtlasClassification
getClassification
(
AtlasEntity
hdfs_employees
,
AtlasClassification
tag2
)
throws
AtlasBaseException
{
return
entityStore
.
getClassification
(
hdfs_employees
.
getGuid
(),
tag2
.
getTypeName
());
}
private
void
addClassification
(
AtlasEntity
entity
,
AtlasClassification
classification
)
throws
AtlasBaseException
{
addClassifications
(
entity
,
Collections
.
singletonList
(
classification
));
}
private
void
addClassifications
(
AtlasEntity
entity
,
List
<
AtlasClassification
>
classifications
)
throws
AtlasBaseException
{
entityStore
.
addClassifications
(
entity
.
getGuid
(),
classifications
);
}
private
void
updateClassifications
(
AtlasEntity
entity
,
AtlasClassification
classification
)
throws
AtlasBaseException
{
updateClassifications
(
entity
,
Collections
.
singletonList
(
classification
));
}
private
void
updateClassifications
(
AtlasEntity
entity
,
List
<
AtlasClassification
>
classifications
)
throws
AtlasBaseException
{
entityStore
.
updateClassifications
(
entity
.
getGuid
(),
classifications
);
}
private
void
deleteClassification
(
AtlasEntity
entity
,
AtlasClassification
classification
)
throws
AtlasBaseException
{
deleteClassifications
(
entity
,
Collections
.
singletonList
(
classification
.
getTypeName
()));
}
private
void
deleteClassifications
(
AtlasEntity
entity
,
List
<
String
>
classificationNames
)
throws
AtlasBaseException
{
entityStore
.
deleteClassifications
(
entity
.
getGuid
(),
classificationNames
);
}
private
AtlasRelationship
getRelationship
(
String
fromEntityName
,
String
toEntityName
)
throws
AtlasBaseException
{
String
fromEntityId
=
entitiesMap
.
get
(
fromEntityName
);
String
toEntityId
=
entitiesMap
.
get
(
toEntityName
);
Set
<
LineageRelation
>
relations
=
lineageInfo
.
getRelations
();
String
relationshipGuid
=
null
;
for
(
AtlasLineageInfo
.
LineageRelation
relation
:
relations
)
{
if
(
relation
.
getFromEntityId
().
equals
(
fromEntityId
)
&&
relation
.
getToEntityId
().
equals
(
toEntityId
))
{
relationshipGuid
=
relation
.
getRelationshipId
();
}
}
return
relationshipStore
.
getById
(
relationshipGuid
);
}
}
\ No newline at end of file
This diff is collapsed.
Click to expand it.
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment