Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
A
atlas
Project
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
dataplatform
atlas
Commits
01ea65c2
Commit
01ea65c2
authored
Mar 25, 2019
by
Sarath Subramanian
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
ATLAS-3095: Update QuickStartV2 to use relationships
parent
3ba4a3fe
Show whitespace changes
Inline
Side-by-side
Showing
5 changed files
with
364 additions
and
216 deletions
+364
-216
AtlasRelationshipDef.java
.../org/apache/atlas/model/typedef/AtlasRelationshipDef.java
+1
-2
AtlasTypeUtil.java
intg/src/main/java/org/apache/atlas/type/AtlasTypeUtil.java
+35
-0
AtlasGraphUtilsV2.java
...he/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java
+1
-1
QuickStartV2.java
...src/main/java/org/apache/atlas/examples/QuickStartV2.java
+267
-176
QuickStartV2IT.java
...c/test/java/org/apache/atlas/examples/QuickStartV2IT.java
+60
-37
No files found.
intg/src/main/java/org/apache/atlas/model/typedef/AtlasRelationshipDef.java
View file @
01ea65c2
...
...
@@ -152,13 +152,12 @@ public class AtlasRelationshipDef extends AtlasStructDef implements java.io.Seri
*
* The ends are defined as 1 and 2 to avoid implying a direction. So we do not use to and from.
*
* @throws AtlasBaseException
*/
public
AtlasRelationshipDef
(
String
name
,
String
description
,
String
typeVersion
,
RelationshipCategory
relationshipCategory
,
PropagateTags
propagatetags
,
AtlasRelationshipEndDef
endDef1
,
AtlasRelationshipEndDef
endDef2
)
throws
AtlasBaseException
{
AtlasRelationshipEndDef
endDef2
)
{
this
(
name
,
description
,
typeVersion
,
relationshipCategory
,
propagatetags
,
endDef1
,
endDef2
,
new
ArrayList
<
AtlasAttributeDef
>());
}
...
...
intg/src/main/java/org/apache/atlas/type/AtlasTypeUtil.java
View file @
01ea65c2
...
...
@@ -21,6 +21,7 @@ import org.apache.atlas.model.instance.AtlasClassification;
import
org.apache.atlas.model.instance.AtlasEntity
;
import
org.apache.atlas.model.instance.AtlasEntityHeader
;
import
org.apache.atlas.model.instance.AtlasObjectId
;
import
org.apache.atlas.model.instance.AtlasRelatedObjectId
;
import
org.apache.atlas.model.instance.AtlasStruct
;
import
org.apache.atlas.model.typedef.AtlasBaseTypeDef
;
import
org.apache.atlas.model.typedef.AtlasClassificationDef
;
...
...
@@ -276,6 +277,10 @@ public class AtlasTypeUtil {
return
new
AtlasEntityDef
(
name
,
description
,
version
,
Arrays
.
asList
(
attrDefs
),
superTypes
);
}
public
static
AtlasEntityDef
createClassTypeDef
(
String
name
,
String
description
,
String
version
,
Set
<
String
>
superTypes
,
Map
<
String
,
String
>
options
,
AtlasAttributeDef
...
attrDefs
)
{
return
new
AtlasEntityDef
(
name
,
description
,
version
,
Arrays
.
asList
(
attrDefs
),
superTypes
,
options
);
}
public
static
AtlasRelationshipDef
createRelationshipTypeDef
(
String
name
,
String
description
,
String
version
,
...
...
@@ -288,6 +293,10 @@ public class AtlasTypeUtil {
endDef1
,
endDef2
,
Arrays
.
asList
(
attrDefs
));
}
public
static
AtlasRelationshipEndDef
createRelationshipEndDef
(
String
typeName
,
String
name
,
Cardinality
cardinality
,
boolean
isContainer
)
{
return
new
AtlasRelationshipEndDef
(
typeName
,
name
,
cardinality
,
isContainer
);
}
public
static
AtlasTypesDef
getTypesDef
(
List
<
AtlasEnumDef
>
enums
,
List
<
AtlasStructDef
>
structs
,
List
<
AtlasClassificationDef
>
traits
,
...
...
@@ -295,6 +304,14 @@ public class AtlasTypeUtil {
return
new
AtlasTypesDef
(
enums
,
structs
,
traits
,
classes
);
}
public
static
AtlasTypesDef
getTypesDef
(
List
<
AtlasEnumDef
>
enums
,
List
<
AtlasStructDef
>
structs
,
List
<
AtlasClassificationDef
>
traits
,
List
<
AtlasEntityDef
>
classes
,
List
<
AtlasRelationshipDef
>
relations
)
{
return
new
AtlasTypesDef
(
enums
,
structs
,
traits
,
classes
,
relations
);
}
public
static
List
<
AtlasTypeDefHeader
>
toTypeDefHeader
(
AtlasTypesDef
typesDef
)
{
List
<
AtlasTypeDefHeader
>
headerList
=
new
LinkedList
<>();
if
(
CollectionUtils
.
isNotEmpty
(
typesDef
.
getEnumDefs
()))
{
...
...
@@ -360,6 +377,20 @@ public class AtlasTypeUtil {
return
ret
;
}
public
static
Collection
<
AtlasRelatedObjectId
>
toAtlasRelatedObjectIds
(
Collection
<
AtlasEntity
>
entities
)
{
List
<
AtlasRelatedObjectId
>
ret
=
new
ArrayList
<>();
if
(
CollectionUtils
.
isNotEmpty
(
entities
))
{
for
(
AtlasEntity
entity
:
entities
)
{
if
(
entity
!=
null
)
{
ret
.
add
(
toAtlasRelatedObjectId
(
entity
));
}
}
}
return
ret
;
}
public
static
Map
toStructAttributes
(
Map
map
)
{
if
(
map
!=
null
&&
map
.
containsKey
(
"typeName"
)
&&
map
.
containsKey
(
"attributes"
)
&&
map
.
get
(
"attributes"
)
instanceof
Map
)
{
return
(
Map
)
map
.
get
(
"attributes"
);
...
...
@@ -378,6 +409,10 @@ public class AtlasTypeUtil {
return
ret
;
}
public
static
AtlasRelatedObjectId
toAtlasRelatedObjectId
(
AtlasEntity
entity
)
{
return
new
AtlasRelatedObjectId
(
getAtlasObjectId
(
entity
));
}
public
static
AtlasObjectId
getAtlasObjectId
(
AtlasEntity
entity
)
{
return
new
AtlasObjectId
(
entity
.
getGuid
(),
entity
.
getTypeName
());
}
...
...
repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java
View file @
01ea65c2
...
...
@@ -95,7 +95,7 @@ public class AtlasGraphUtilsV2 {
public
static
final
String
VERTEX_TYPE
=
"typeSystem"
;
private
static
boolean
USE_INDEX_QUERY_TO_FIND_ENTITY_BY_UNIQUE_ATTRIBUTES
=
false
;
private
static
boolean
USE_UNIQUE_INDEX_PROPERTY_TO_FIND_ENTITY
=
fals
e
;
private
static
boolean
USE_UNIQUE_INDEX_PROPERTY_TO_FIND_ENTITY
=
tru
e
;
private
static
String
INDEX_SEARCH_PREFIX
;
static
{
...
...
webapp/src/main/java/org/apache/atlas/examples/QuickStartV2.java
View file @
01ea65c2
...
...
@@ -21,7 +21,6 @@ package org.apache.atlas.examples;
import
com.google.common.annotations.VisibleForTesting
;
import
com.sun.jersey.core.util.MultivaluedMapImpl
;
import
org.apache.atlas.ApplicationProperties
;
import
org.apache.atlas.AtlasClient
;
import
org.apache.atlas.AtlasClientV2
;
import
org.apache.atlas.AtlasException
;
import
org.apache.atlas.AtlasServiceException
;
...
...
@@ -38,13 +37,11 @@ import org.apache.atlas.model.instance.EntityMutations.EntityOperation;
import
org.apache.atlas.model.lineage.AtlasLineageInfo
;
import
org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection
;
import
org.apache.atlas.model.lineage.AtlasLineageInfo.LineageRelation
;
import
org.apache.atlas.model.typedef.AtlasBaseTypeDef
;
import
org.apache.atlas.model.typedef.AtlasClassificationDef
;
import
org.apache.atlas.model.typedef.AtlasEntityDef
;
import
org.apache.atlas.model.typedef.Atlas
Enum
Def
;
import
org.apache.atlas.model.typedef.Atlas
StructDef
;
import
org.apache.atlas.model.typedef.Atlas
Relationship
Def
;
import
org.apache.atlas.model.typedef.Atlas
RelationshipDef.PropagateTags
;
import
org.apache.atlas.model.typedef.AtlasTypesDef
;
import
org.apache.atlas.type.AtlasTypeUtil
;
import
org.apache.atlas.utils.AuthenticationUtil
;
import
org.apache.commons.collections.CollectionUtils
;
import
org.apache.commons.configuration.Configuration
;
...
...
@@ -53,9 +50,21 @@ import org.apache.commons.lang.ArrayUtils;
import
javax.ws.rs.core.MultivaluedMap
;
import
java.util.*
;
import
static
org
.
apache
.
atlas
.
model
.
typedef
.
AtlasStructDef
.
AtlasConstraintDef
.
CONSTRAINT_PARAM_ATTRIBUTE
;
import
static
org
.
apache
.
atlas
.
model
.
typedef
.
AtlasStructDef
.
AtlasConstraintDef
.
CONSTRAINT_TYPE_INVERSE_REF
;
import
static
org
.
apache
.
atlas
.
model
.
typedef
.
AtlasStructDef
.
AtlasConstraintDef
.
CONSTRAINT_TYPE_OWNED_REF
;
import
static
java
.
util
.
Arrays
.
asList
;
import
static
org
.
apache
.
atlas
.
AtlasClient
.
REFERENCEABLE_ATTRIBUTE_NAME
;
import
static
org
.
apache
.
atlas
.
model
.
typedef
.
AtlasRelationshipDef
.
RelationshipCategory
.
AGGREGATION
;
import
static
org
.
apache
.
atlas
.
model
.
typedef
.
AtlasRelationshipDef
.
RelationshipCategory
.
COMPOSITION
;
import
static
org
.
apache
.
atlas
.
model
.
typedef
.
AtlasStructDef
.
AtlasAttributeDef
.
Cardinality
.
SET
;
import
static
org
.
apache
.
atlas
.
model
.
typedef
.
AtlasStructDef
.
AtlasAttributeDef
.
Cardinality
.
SINGLE
;
import
static
org
.
apache
.
atlas
.
type
.
AtlasTypeUtil
.
createClassTypeDef
;
import
static
org
.
apache
.
atlas
.
type
.
AtlasTypeUtil
.
createOptionalAttrDef
;
import
static
org
.
apache
.
atlas
.
type
.
AtlasTypeUtil
.
createRelationshipEndDef
;
import
static
org
.
apache
.
atlas
.
type
.
AtlasTypeUtil
.
createRelationshipTypeDef
;
import
static
org
.
apache
.
atlas
.
type
.
AtlasTypeUtil
.
createRequiredAttrDef
;
import
static
org
.
apache
.
atlas
.
type
.
AtlasTypeUtil
.
createTraitTypeDef
;
import
static
org
.
apache
.
atlas
.
type
.
AtlasTypeUtil
.
createUniqueRequiredAttrDef
;
import
static
org
.
apache
.
atlas
.
type
.
AtlasTypeUtil
.
toAtlasRelatedObjectId
;
import
static
org
.
apache
.
atlas
.
type
.
AtlasTypeUtil
.
toAtlasRelatedObjectIds
;
/**
* A driver that sets up sample types and entities using v2 types and entity model for testing purposes.
...
...
@@ -112,9 +121,21 @@ public class QuickStartV2 {
public
static
final
String
LOAD_PROCESS_TYPE
=
"LoadProcess"
;
public
static
final
String
STORAGE_DESC_TYPE
=
"StorageDesc"
;
public
static
final
String
TABLE_DATABASE_TYPE
=
"Table_DB"
;
public
static
final
String
VIEW_DATABASE_TYPE
=
"View_DB"
;
public
static
final
String
VIEW_TABLES_TYPE
=
"View_Tables"
;
public
static
final
String
TABLE_COLUMNS_TYPE
=
"Table_Columns"
;
public
static
final
String
TABLE_STORAGE_DESC_TYPE
=
"Table_StorageDesc"
;
public
static
final
String
VERSION_1
=
"1.0"
;
public
static
final
String
MANAGED_TABLE
=
"Managed"
;
public
static
final
String
EXTERNAL_TABLE
=
"External"
;
public
static
final
String
CLUSTER_SUFFIX
=
"@cl1"
;
public
static
final
String
[]
TYPES
=
{
DATABASE_TYPE
,
TABLE_TYPE
,
STORAGE_DESC_TYPE
,
COLUMN_TYPE
,
LOAD_PROCESS_TYPE
,
VIEW_TYPE
,
JDBC_CLASSIFICATION
,
ETL_CLASSIFICATION
,
METRIC_CLASSIFICATION
,
PII_CLASSIFICATION
,
FACT_CLASSIFICATION
,
DIMENSION_CLASSIFICATION
,
LOGDATA_CLASSIFICATION
};
PII_CLASSIFICATION
,
FACT_CLASSIFICATION
,
DIMENSION_CLASSIFICATION
,
LOGDATA_CLASSIFICATION
,
TABLE_DATABASE_TYPE
,
VIEW_DATABASE_TYPE
,
VIEW_TABLES_TYPE
,
TABLE_COLUMNS_TYPE
,
TABLE_STORAGE_DESC_TYPE
};
public
static
void
main
(
String
[]
args
)
throws
Exception
{
String
[]
basicAuthUsernamePassword
=
null
;
...
...
@@ -129,6 +150,7 @@ public class QuickStartV2 {
@VisibleForTesting
static
void
runQuickstart
(
String
[]
args
,
String
[]
basicAuthUsernamePassword
)
throws
Exception
{
String
[]
urls
=
getServerUrl
(
args
);
QuickStartV2
quickStartV2
;
if
(!
AuthenticationUtil
.
isKerberosAuthenticationEnabled
())
{
...
...
@@ -182,77 +204,90 @@ public class QuickStartV2 {
AtlasTypesDef
atlasTypesDef
=
createTypeDefinitions
();
System
.
out
.
println
(
"\nCreating sample types: "
);
atlasClientV2
.
createAtlasTypeDefs
(
atlasTypesDef
);
verifyTypesCreated
();
}
AtlasTypesDef
createTypeDefinitions
()
throws
Exception
{
AtlasEntityDef
dbType
=
AtlasTypeUtil
.
createClassTypeDef
(
DATABASE_TYPE
,
DATABASE_TYPE
,
"1.0"
,
null
,
AtlasTypeUtil
.
createUniqueRequiredAttrDef
(
"name"
,
"string"
),
AtlasTypeUtil
.
createOptionalAttrDef
(
"description"
,
"string"
),
AtlasTypeUtil
.
createOptionalAttrDef
(
"locationUri"
,
"string"
),
AtlasTypeUtil
.
createOptionalAttrDef
(
"owner"
,
"string"
),
AtlasTypeUtil
.
createOptionalAttrDef
(
"createTime"
,
"long"
));
AtlasEntityDef
sdType
=
AtlasTypeUtil
.
createClassTypeDef
(
STORAGE_DESC_TYPE
,
STORAGE_DESC_TYPE
,
"1.0"
,
null
,
AtlasTypeUtil
.
createOptionalAttrDefWithConstraint
(
"table"
,
TABLE_TYPE
,
CONSTRAINT_TYPE_INVERSE_REF
,
new
HashMap
<
String
,
Object
>()
{{
put
(
CONSTRAINT_PARAM_ATTRIBUTE
,
"sd"
);
}}),
AtlasTypeUtil
.
createOptionalAttrDef
(
"location"
,
"string"
),
AtlasTypeUtil
.
createOptionalAttrDef
(
"inputFormat"
,
"string"
),
AtlasTypeUtil
.
createOptionalAttrDef
(
"outputFormat"
,
"string"
),
AtlasTypeUtil
.
createRequiredAttrDef
(
"compressed"
,
"boolean"
));
AtlasEntityDef
colType
=
AtlasTypeUtil
.
createClassTypeDef
(
COLUMN_TYPE
,
COLUMN_TYPE
,
"1.0"
,
null
,
AtlasTypeUtil
.
createOptionalAttrDef
(
"name"
,
"string"
),
AtlasTypeUtil
.
createOptionalAttrDef
(
"dataType"
,
"string"
),
AtlasTypeUtil
.
createOptionalAttrDef
(
"comment"
,
"string"
),
AtlasTypeUtil
.
createOptionalAttrDefWithConstraint
(
"table"
,
TABLE_TYPE
,
CONSTRAINT_TYPE_INVERSE_REF
,
new
HashMap
<
String
,
Object
>()
{{
put
(
CONSTRAINT_PARAM_ATTRIBUTE
,
"columns"
);
}}));
colType
.
setOptions
(
new
HashMap
<
String
,
String
>()
{{
put
(
"schemaAttributes"
,
"[\"name\", \"description\", \"owner\", \"type\", \"comment\", \"position\"]"
);
}});
AtlasEntityDef
tblType
=
AtlasTypeUtil
.
createClassTypeDef
(
TABLE_TYPE
,
TABLE_TYPE
,
"1.0"
,
Collections
.
singleton
(
"DataSet"
),
AtlasTypeUtil
.
createRequiredAttrDef
(
"db"
,
DATABASE_TYPE
),
AtlasTypeUtil
.
createRequiredAttrDefWithConstraint
(
"sd"
,
STORAGE_DESC_TYPE
,
CONSTRAINT_TYPE_OWNED_REF
,
null
),
AtlasTypeUtil
.
createOptionalAttrDef
(
"owner"
,
"string"
),
AtlasTypeUtil
.
createOptionalAttrDef
(
"createTime"
,
"long"
),
AtlasTypeUtil
.
createOptionalAttrDef
(
"lastAccessTime"
,
"long"
),
AtlasTypeUtil
.
createOptionalAttrDef
(
"retention"
,
"long"
),
AtlasTypeUtil
.
createOptionalAttrDef
(
"viewOriginalText"
,
"string"
),
AtlasTypeUtil
.
createOptionalAttrDef
(
"viewExpandedText"
,
"string"
),
AtlasTypeUtil
.
createOptionalAttrDef
(
"tableType"
,
"string"
),
AtlasTypeUtil
.
createOptionalAttrDef
(
"temporary"
,
"boolean"
),
AtlasTypeUtil
.
createRequiredListAttrDefWithConstraint
(
"columns"
,
AtlasBaseTypeDef
.
getArrayTypeName
(
COLUMN_TYPE
),
CONSTRAINT_TYPE_OWNED_REF
,
null
));
tblType
.
setOptions
(
new
HashMap
<
String
,
String
>()
{{
put
(
"schemaElementsAttribute"
,
"columns"
);
}});
AtlasEntityDef
procType
=
AtlasTypeUtil
.
createClassTypeDef
(
LOAD_PROCESS_TYPE
,
LOAD_PROCESS_TYPE
,
"1.0"
,
Collections
.
singleton
(
"Process"
),
AtlasTypeUtil
.
createOptionalAttrDef
(
"userName"
,
"string"
),
AtlasTypeUtil
.
createOptionalAttrDef
(
"startTime"
,
"long"
),
AtlasTypeUtil
.
createOptionalAttrDef
(
"endTime"
,
"long"
),
AtlasTypeUtil
.
createRequiredAttrDef
(
"queryText"
,
"string"
),
AtlasTypeUtil
.
createRequiredAttrDef
(
"queryPlan"
,
"string"
),
AtlasTypeUtil
.
createRequiredAttrDef
(
"queryId"
,
"string"
),
AtlasTypeUtil
.
createRequiredAttrDef
(
"queryGraph"
,
"string"
));
AtlasEntityDef
viewType
=
AtlasTypeUtil
.
createClassTypeDef
(
VIEW_TYPE
,
VIEW_TYPE
,
"1.0"
,
Collections
.
singleton
(
"DataSet"
),
AtlasTypeUtil
.
createRequiredAttrDef
(
"db"
,
DATABASE_TYPE
),
AtlasTypeUtil
.
createOptionalListAttrDef
(
"inputTables"
,
AtlasBaseTypeDef
.
getArrayTypeName
(
TABLE_TYPE
)));
AtlasClassificationDef
dimClassifDef
=
AtlasTypeUtil
.
createTraitTypeDef
(
DIMENSION_CLASSIFICATION
,
"Dimension Classification"
,
"1.0"
,
Collections
.<
String
>
emptySet
());
AtlasClassificationDef
factClassifDef
=
AtlasTypeUtil
.
createTraitTypeDef
(
FACT_CLASSIFICATION
,
"Fact Classification"
,
"1.0"
,
Collections
.<
String
>
emptySet
());
AtlasClassificationDef
piiClassifDef
=
AtlasTypeUtil
.
createTraitTypeDef
(
PII_CLASSIFICATION
,
"PII Classification"
,
"1.0"
,
Collections
.<
String
>
emptySet
());
AtlasClassificationDef
metricClassifDef
=
AtlasTypeUtil
.
createTraitTypeDef
(
METRIC_CLASSIFICATION
,
"Metric Classification"
,
"1.0"
,
Collections
.<
String
>
emptySet
());
AtlasClassificationDef
etlClassifDef
=
AtlasTypeUtil
.
createTraitTypeDef
(
ETL_CLASSIFICATION
,
"ETL Classification"
,
"1.0"
,
Collections
.<
String
>
emptySet
());
AtlasClassificationDef
jdbcClassifDef
=
AtlasTypeUtil
.
createTraitTypeDef
(
JDBC_CLASSIFICATION
,
"JdbcAccess Classification"
,
"1.0"
,
Collections
.<
String
>
emptySet
());
AtlasClassificationDef
logClassifDef
=
AtlasTypeUtil
.
createTraitTypeDef
(
LOGDATA_CLASSIFICATION
,
"LogData Classification"
,
"1.0"
,
Collections
.<
String
>
emptySet
());
return
AtlasTypeUtil
.
getTypesDef
(
Collections
.<
AtlasEnumDef
>
emptyList
(),
Collections
.<
AtlasStructDef
>
emptyList
(),
Arrays
.
asList
(
dimClassifDef
,
factClassifDef
,
piiClassifDef
,
metricClassifDef
,
etlClassifDef
,
jdbcClassifDef
,
logClassifDef
),
Arrays
.
asList
(
dbType
,
sdType
,
colType
,
tblType
,
procType
,
viewType
));
AtlasTypesDef
createTypeDefinitions
()
{
// Entity-Definitions
AtlasEntityDef
dbTypeDef
=
createClassTypeDef
(
DATABASE_TYPE
,
DATABASE_TYPE
,
VERSION_1
,
Collections
.
singleton
(
"DataSet"
),
createUniqueRequiredAttrDef
(
"name"
,
"string"
),
createOptionalAttrDef
(
"description"
,
"string"
),
createOptionalAttrDef
(
"locationUri"
,
"string"
),
createOptionalAttrDef
(
"owner"
,
"string"
),
createOptionalAttrDef
(
"createTime"
,
"long"
));
AtlasEntityDef
tableTypeDef
=
createClassTypeDef
(
TABLE_TYPE
,
TABLE_TYPE
,
VERSION_1
,
Collections
.
singleton
(
"DataSet"
),
new
HashMap
<
String
,
String
>()
{{
put
(
"schemaElementsAttribute"
,
"columns"
);
}}
,
createOptionalAttrDef
(
"owner"
,
"string"
),
createOptionalAttrDef
(
"createTime"
,
"long"
),
createOptionalAttrDef
(
"lastAccessTime"
,
"long"
),
createOptionalAttrDef
(
"retention"
,
"long"
),
createOptionalAttrDef
(
"viewOriginalText"
,
"string"
),
createOptionalAttrDef
(
"viewExpandedText"
,
"string"
),
createOptionalAttrDef
(
"tableType"
,
"string"
),
createOptionalAttrDef
(
"temporary"
,
"boolean"
));
AtlasEntityDef
colTypeDef
=
createClassTypeDef
(
COLUMN_TYPE
,
COLUMN_TYPE
,
VERSION_1
,
Collections
.
singleton
(
"DataSet"
),
new
HashMap
<
String
,
String
>()
{{
put
(
"schemaAttributes"
,
"[\"name\", \"description\", \"owner\", \"type\", \"comment\", \"position\"]"
);
}},
createOptionalAttrDef
(
"name"
,
"string"
),
createOptionalAttrDef
(
"dataType"
,
"string"
),
createOptionalAttrDef
(
"comment"
,
"string"
));
AtlasEntityDef
sdTypeDef
=
createClassTypeDef
(
STORAGE_DESC_TYPE
,
STORAGE_DESC_TYPE
,
VERSION_1
,
Collections
.
singleton
(
"DataSet"
),
createOptionalAttrDef
(
"location"
,
"string"
),
createOptionalAttrDef
(
"inputFormat"
,
"string"
),
createOptionalAttrDef
(
"outputFormat"
,
"string"
),
createRequiredAttrDef
(
"compressed"
,
"boolean"
));
AtlasEntityDef
processTypeDef
=
createClassTypeDef
(
LOAD_PROCESS_TYPE
,
LOAD_PROCESS_TYPE
,
VERSION_1
,
Collections
.
singleton
(
"Process"
),
createOptionalAttrDef
(
"userName"
,
"string"
),
createOptionalAttrDef
(
"startTime"
,
"long"
),
createOptionalAttrDef
(
"endTime"
,
"long"
),
createRequiredAttrDef
(
"queryText"
,
"string"
),
createRequiredAttrDef
(
"queryPlan"
,
"string"
),
createRequiredAttrDef
(
"queryId"
,
"string"
),
createRequiredAttrDef
(
"queryGraph"
,
"string"
));
AtlasEntityDef
viewTypeDef
=
createClassTypeDef
(
VIEW_TYPE
,
VIEW_TYPE
,
VERSION_1
,
Collections
.
singleton
(
"DataSet"
));
// Relationship-Definitions
AtlasRelationshipDef
tableDatabaseTypeDef
=
createRelationshipTypeDef
(
TABLE_DATABASE_TYPE
,
TABLE_DATABASE_TYPE
,
VERSION_1
,
AGGREGATION
,
PropagateTags
.
NONE
,
createRelationshipEndDef
(
TABLE_TYPE
,
"db"
,
SINGLE
,
false
),
createRelationshipEndDef
(
DATABASE_TYPE
,
"tables"
,
SET
,
true
));
AtlasRelationshipDef
viewDatabaseTypeDef
=
createRelationshipTypeDef
(
VIEW_DATABASE_TYPE
,
VIEW_DATABASE_TYPE
,
VERSION_1
,
AGGREGATION
,
PropagateTags
.
NONE
,
createRelationshipEndDef
(
VIEW_TYPE
,
"db"
,
SINGLE
,
false
),
createRelationshipEndDef
(
DATABASE_TYPE
,
"views"
,
SET
,
true
));
AtlasRelationshipDef
viewTablesTypeDef
=
createRelationshipTypeDef
(
VIEW_TABLES_TYPE
,
VIEW_TABLES_TYPE
,
VERSION_1
,
AGGREGATION
,
PropagateTags
.
NONE
,
createRelationshipEndDef
(
VIEW_TYPE
,
"inputTables"
,
SET
,
true
),
createRelationshipEndDef
(
TABLE_TYPE
,
"view"
,
SINGLE
,
false
));
AtlasRelationshipDef
tableColumnsTypeDef
=
createRelationshipTypeDef
(
TABLE_COLUMNS_TYPE
,
TABLE_COLUMNS_TYPE
,
VERSION_1
,
COMPOSITION
,
PropagateTags
.
NONE
,
createRelationshipEndDef
(
TABLE_TYPE
,
"columns"
,
SET
,
true
),
createRelationshipEndDef
(
COLUMN_TYPE
,
"table"
,
SINGLE
,
false
));
AtlasRelationshipDef
tableStorageDescTypeDef
=
createRelationshipTypeDef
(
TABLE_STORAGE_DESC_TYPE
,
TABLE_STORAGE_DESC_TYPE
,
VERSION_1
,
COMPOSITION
,
PropagateTags
.
NONE
,
createRelationshipEndDef
(
TABLE_TYPE
,
"sd"
,
SINGLE
,
true
),
createRelationshipEndDef
(
STORAGE_DESC_TYPE
,
"table"
,
SINGLE
,
false
));
// Classification-Definitions
AtlasClassificationDef
dimClassifDef
=
createTraitTypeDef
(
DIMENSION_CLASSIFICATION
,
"Dimension Classification"
,
VERSION_1
,
Collections
.
emptySet
());
AtlasClassificationDef
factClassifDef
=
createTraitTypeDef
(
FACT_CLASSIFICATION
,
"Fact Classification"
,
VERSION_1
,
Collections
.
emptySet
());
AtlasClassificationDef
piiClassifDef
=
createTraitTypeDef
(
PII_CLASSIFICATION
,
"PII Classification"
,
VERSION_1
,
Collections
.
emptySet
());
AtlasClassificationDef
metricClassifDef
=
createTraitTypeDef
(
METRIC_CLASSIFICATION
,
"Metric Classification"
,
VERSION_1
,
Collections
.
emptySet
());
AtlasClassificationDef
etlClassifDef
=
createTraitTypeDef
(
ETL_CLASSIFICATION
,
"ETL Classification"
,
VERSION_1
,
Collections
.
emptySet
());
AtlasClassificationDef
jdbcClassifDef
=
createTraitTypeDef
(
JDBC_CLASSIFICATION
,
"JdbcAccess Classification"
,
VERSION_1
,
Collections
.
emptySet
());
AtlasClassificationDef
logClassifDef
=
createTraitTypeDef
(
LOGDATA_CLASSIFICATION
,
"LogData Classification"
,
VERSION_1
,
Collections
.
emptySet
());
List
<
AtlasEntityDef
>
entityDefs
=
asList
(
dbTypeDef
,
sdTypeDef
,
colTypeDef
,
tableTypeDef
,
processTypeDef
,
viewTypeDef
);
List
<
AtlasRelationshipDef
>
relationshipDefs
=
asList
(
tableDatabaseTypeDef
,
viewDatabaseTypeDef
,
viewTablesTypeDef
,
tableColumnsTypeDef
,
tableStorageDescTypeDef
);
List
<
AtlasClassificationDef
>
classificationDefs
=
asList
(
dimClassifDef
,
factClassifDef
,
piiClassifDef
,
metricClassifDef
,
etlClassifDef
,
jdbcClassifDef
,
logClassifDef
);
return
new
AtlasTypesDef
(
Collections
.
emptyList
(),
Collections
.
emptyList
(),
classificationDefs
,
entityDefs
,
relationshipDefs
);
}
void
createEntities
()
throws
Exception
{
...
...
@@ -263,165 +298,207 @@ public class QuickStartV2 {
AtlasEntity
reportingDB
=
createDatabase
(
REPORTING_DB
,
"reporting database"
,
"Jane BI"
,
"hdfs://host:8000/apps/warehouse/reporting"
);
AtlasEntity
logDB
=
createDatabase
(
LOGGING_DB
,
"logging database"
,
"Tim ETL"
,
"hdfs://host:8000/apps/warehouse/logging"
);
// Storage Descriptor entities
AtlasEntity
storageDesc
=
createStorageDescriptor
(
"hdfs://host:8000/apps/warehouse/sales"
,
"TextInputFormat"
,
"TextOutputFormat"
,
true
);
// Column entities
List
<
AtlasEntity
>
salesFactColumns
=
Arrays
.
asList
(
createColumn
(
TIME_ID_COLUMN
,
"int"
,
"time id"
),
// Table entities
AtlasEntity
salesFact
=
createTable
(
SALES_FACT_TABLE
,
"sales fact table"
,
salesDB
,
"Joe"
,
MANAGED_TABLE
,
Arrays
.
asList
(
createColumn
(
TIME_ID_COLUMN
,
"int"
,
"time id"
),
createColumn
(
PRODUCT_ID_COLUMN
,
"int"
,
"product id"
),
createColumn
(
CUSTOMER_ID_COLUMN
,
"int"
,
"customer id"
,
PII_CLASSIFICATION
),
createColumn
(
SALES_COLUMN
,
"double"
,
"product id"
,
METRIC_CLASSIFICATION
));
createColumn
(
SALES_COLUMN
,
"double"
,
"product id"
,
METRIC_CLASSIFICATION
)),
FACT_CLASSIFICATION
);
List
<
AtlasEntity
>
logFactColumns
=
Arrays
.
asList
(
createColumn
(
TIME_ID_COLUMN
,
"int"
,
"time id"
),
createColumn
(
APP_ID_COLUMN
,
"int"
,
"app id"
),
createColumn
(
MACHINE_ID_COLUMN
,
"int"
,
"machine id"
),
createColumn
(
LOG_COLUMN
,
"string"
,
"log data"
,
LOGDATA_CLASSIFICATION
));
List
<
AtlasEntity
>
productDimColumns
=
Arrays
.
asList
(
createColumn
(
PRODUCT_ID_COLUMN
,
"int"
,
"product id"
),
AtlasEntity
productDim
=
createTable
(
PRODUCT_DIM_TABLE
,
"product dimension table"
,
salesDB
,
"John Doe"
,
MANAGED_TABLE
,
Arrays
.
asList
(
createColumn
(
PRODUCT_ID_COLUMN
,
"int"
,
"product id"
),
createColumn
(
PRODUCT_NAME_COLUMN
,
"string"
,
"product name"
),
createColumn
(
BRAND_NAME_COLUMN
,
"int"
,
"brand name"
));
createColumn
(
BRAND_NAME_COLUMN
,
"int"
,
"brand name"
)),
DIMENSION_CLASSIFICATION
);
List
<
AtlasEntity
>
timeDimColumns
=
Arrays
.
asList
(
createColumn
(
TIME_ID_COLUMN
,
"int"
,
"time id"
),
AtlasEntity
customerDim
=
createTable
(
CUSTOMER_DIM_TABLE
,
"customer dimension table"
,
salesDB
,
"fetl"
,
EXTERNAL_TABLE
,
Arrays
.
asList
(
createColumn
(
CUSTOMER_ID_COLUMN
,
"int"
,
"customer id"
,
PII_CLASSIFICATION
),
createColumn
(
NAME_COLUMN
,
"string"
,
"customer name"
,
PII_CLASSIFICATION
),
createColumn
(
ADDRESS_COLUMN
,
"string"
,
"customer address"
,
PII_CLASSIFICATION
)),
DIMENSION_CLASSIFICATION
);
AtlasEntity
timeDim
=
createTable
(
TIME_DIM_TABLE
,
"time dimension table"
,
salesDB
,
"John Doe"
,
EXTERNAL_TABLE
,
Arrays
.
asList
(
createColumn
(
TIME_ID_COLUMN
,
"int"
,
"time id"
),
createColumn
(
DAY_OF_YEAR_COLUMN
,
"int"
,
"day Of Year"
),
createColumn
(
WEEKDAY_COLUMN
,
"int"
,
"week Day"
));
createColumn
(
WEEKDAY_COLUMN
,
"int"
,
"week Day"
)),
DIMENSION_CLASSIFICATION
);
List
<
AtlasEntity
>
customerDimColumns
=
Arrays
.
asList
(
createColumn
(
CUSTOMER_ID_COLUMN
,
"int"
,
"customer id"
,
PII_CLASSIFICATION
),
createColumn
(
NAME_COLUMN
,
"string"
,
"customer name"
,
PII_CLASSIFICATION
),
createColumn
(
ADDRESS_COLUMN
,
"string"
,
"customer address"
,
PII_CLASSIFICATION
));
AtlasEntity
loggingFactDaily
=
createTable
(
LOG_FACT_DAILY_MV_TABLE
,
"log fact daily materialized view"
,
logDB
,
"Tim ETL"
,
MANAGED_TABLE
,
Arrays
.
asList
(
createColumn
(
TIME_ID_COLUMN
,
"int"
,
"time id"
),
createColumn
(
APP_ID_COLUMN
,
"int"
,
"app id"
),
createColumn
(
MACHINE_ID_COLUMN
,
"int"
,
"machine id"
),
createColumn
(
LOG_COLUMN
,
"string"
,
"log data"
,
LOGDATA_CLASSIFICATION
)),
LOGDATA_CLASSIFICATION
);
// Table entities
AtlasEntity
salesFact
=
createTable
(
SALES_FACT_TABLE
,
"sales fact table"
,
salesDB
,
storageDesc
,
"Joe"
,
"Managed"
,
salesFactColumns
,
FACT_CLASSIFICATION
);
AtlasEntity
productDim
=
createTable
(
PRODUCT_DIM_TABLE
,
"product dimension table"
,
salesDB
,
storageDesc
,
"John Doe"
,
"Managed"
,
productDimColumns
,
DIMENSION_CLASSIFICATION
);
AtlasEntity
customerDim
=
createTable
(
CUSTOMER_DIM_TABLE
,
"customer dimension table"
,
salesDB
,
storageDesc
,
"fetl"
,
"External"
,
customerDimColumns
,
DIMENSION_CLASSIFICATION
);
AtlasEntity
timeDim
=
createTable
(
TIME_DIM_TABLE
,
"time dimension table"
,
salesDB
,
storageDesc
,
"John Doe"
,
"External"
,
timeDimColumns
,
DIMENSION_CLASSIFICATION
);
AtlasEntity
loggingFactDaily
=
createTable
(
LOG_FACT_DAILY_MV_TABLE
,
"log fact daily materialized view"
,
logDB
,
storageDesc
,
"Tim ETL"
,
"Managed"
,
logFactColumns
,
LOGDATA_CLASSIFICATION
);
AtlasEntity
loggingFactMonthly
=
createTable
(
LOG_FACT_MONTHLY_MV_TABLE
,
"logging fact monthly materialized view"
,
logDB
,
storageDesc
,
"Tim ETL"
,
"Managed"
,
logFactColumns
,
LOGDATA_CLASSIFICATION
);
AtlasEntity
salesFactDaily
=
createTable
(
SALES_FACT_DAILY_MV_TABLE
,
"sales fact daily materialized view"
,
reportingDB
,
storageDesc
,
"Joe BI"
,
"Managed"
,
salesFactColumns
,
METRIC_CLASSIFICATION
);
AtlasEntity
salesFactMonthly
=
createTable
(
SALES_FACT_MONTHLY_MV_TABLE
,
"sales fact monthly materialized view"
,
reportingDB
,
storageDesc
,
"Jane BI"
,
"Managed"
,
salesFactColumns
,
METRIC_CLASSIFICATION
);
AtlasEntity
loggingFactMonthly
=
createTable
(
LOG_FACT_MONTHLY_MV_TABLE
,
"logging fact monthly materialized view"
,
logDB
,
"Tim ETL"
,
MANAGED_TABLE
,
Arrays
.
asList
(
createColumn
(
TIME_ID_COLUMN
,
"int"
,
"time id"
),
createColumn
(
APP_ID_COLUMN
,
"int"
,
"app id"
),
createColumn
(
MACHINE_ID_COLUMN
,
"int"
,
"machine id"
),
createColumn
(
LOG_COLUMN
,
"string"
,
"log data"
,
LOGDATA_CLASSIFICATION
)),
LOGDATA_CLASSIFICATION
);
AtlasEntity
salesFactDaily
=
createTable
(
SALES_FACT_DAILY_MV_TABLE
,
"sales fact daily materialized view"
,
reportingDB
,
"Joe BI"
,
MANAGED_TABLE
,
Arrays
.
asList
(
createColumn
(
TIME_ID_COLUMN
,
"int"
,
"time id"
),
createColumn
(
PRODUCT_ID_COLUMN
,
"int"
,
"product id"
),
createColumn
(
CUSTOMER_ID_COLUMN
,
"int"
,
"customer id"
,
PII_CLASSIFICATION
),
createColumn
(
SALES_COLUMN
,
"double"
,
"product id"
,
METRIC_CLASSIFICATION
)),
METRIC_CLASSIFICATION
);
AtlasEntity
salesFactMonthly
=
createTable
(
SALES_FACT_MONTHLY_MV_TABLE
,
"sales fact monthly materialized view"
,
reportingDB
,
"Jane BI"
,
MANAGED_TABLE
,
Arrays
.
asList
(
createColumn
(
TIME_ID_COLUMN
,
"int"
,
"time id"
),
createColumn
(
PRODUCT_ID_COLUMN
,
"int"
,
"product id"
),
createColumn
(
CUSTOMER_ID_COLUMN
,
"int"
,
"customer id"
,
PII_CLASSIFICATION
),
createColumn
(
SALES_COLUMN
,
"double"
,
"product id"
,
METRIC_CLASSIFICATION
)),
METRIC_CLASSIFICATION
);
// View entities
createView
(
PRODUCT_DIM_VIEW
,
reportingDB
,
Collections
.
singleton
List
(
productDim
),
DIMENSION_CLASSIFICATION
,
JDBC_CLASSIFICATION
);
createView
(
CUSTOMER_DIM_VIEW
,
reportingDB
,
Collections
.
singleton
List
(
customerDim
),
DIMENSION_CLASSIFICATION
,
JDBC_CLASSIFICATION
);
createView
(
PRODUCT_DIM_VIEW
,
reportingDB
,
as
List
(
productDim
),
DIMENSION_CLASSIFICATION
,
JDBC_CLASSIFICATION
);
createView
(
CUSTOMER_DIM_VIEW
,
reportingDB
,
as
List
(
customerDim
),
DIMENSION_CLASSIFICATION
,
JDBC_CLASSIFICATION
);
// Process entities
createProcess
(
LOAD_SALES_DAILY_PROCESS
,
"hive query for daily summary"
,
"John ETL"
,
Arrays
.
asList
(
salesFact
,
timeDim
),
Collections
.
singleton
List
(
salesFactDaily
),
asList
(
salesFact
,
timeDim
),
as
List
(
salesFactDaily
),
"create table as select "
,
"plan"
,
"id"
,
"graph"
,
ETL_CLASSIFICATION
);
createProcess
(
LOAD_SALES_MONTHLY_PROCESS
,
"hive query for monthly summary"
,
"John ETL"
,
Collections
.
singleton
List
(
salesFactDaily
),
Collections
.
singleton
List
(
salesFactMonthly
),
as
List
(
salesFactDaily
),
as
List
(
salesFactMonthly
),
"create table as select "
,
"plan"
,
"id"
,
"graph"
,
ETL_CLASSIFICATION
);
createProcess
(
LOAD_LOGS_MONTHLY_PROCESS
,
"hive query for monthly summary"
,
"Tim ETL"
,
Collections
.
singleton
List
(
loggingFactDaily
),
Collections
.
singleton
List
(
loggingFactMonthly
),
as
List
(
loggingFactDaily
),
as
List
(
loggingFactMonthly
),
"create table as select "
,
"plan"
,
"id"
,
"graph"
,
ETL_CLASSIFICATION
);
}
private
AtlasEntity
createInstance
(
AtlasEntity
entity
,
String
[]
traitNames
)
throws
Exception
{
private
AtlasEntity
createInstance
(
AtlasEntity
entity
)
throws
Exception
{
return
createInstance
(
new
AtlasEntityWithExtInfo
(
entity
));
}
private
AtlasEntity
createInstance
(
AtlasEntityWithExtInfo
entityWithExtInfo
)
throws
Exception
{
AtlasEntity
ret
=
null
;
EntityMutationResponse
response
=
atlasClientV2
.
createEntity
(
new
AtlasEntityWithExtInfo
(
entity
)
);
EntityMutationResponse
response
=
atlasClientV2
.
createEntity
(
entityWithExtInfo
);
List
<
AtlasEntityHeader
>
entities
=
response
.
getEntitiesByOperation
(
EntityOperation
.
CREATE
);
if
(
CollectionUtils
.
isNotEmpty
(
entities
))
{
AtlasEntityWithExtInfo
getByGuidResponse
=
atlasClientV2
.
getEntityByGuid
(
entities
.
get
(
0
).
getGuid
());
ret
=
getByGuidResponse
.
getEntity
();
System
.
out
.
println
(
"Created entity of type ["
+
ret
.
getTypeName
()
+
"], guid: "
+
ret
.
getGuid
());
}
return
ret
;
}
AtlasEntity
createDatabase
(
String
name
,
String
description
,
String
owner
,
String
locationUri
,
String
...
traitNames
)
throws
Exception
{
AtlasEntity
createDatabase
(
String
name
,
String
description
,
String
owner
,
String
locationUri
,
String
...
classificationNames
)
throws
Exception
{
AtlasEntity
entity
=
new
AtlasEntity
(
DATABASE_TYPE
);
entity
.
setClassifications
(
toAtlasClassifications
(
traitNames
));
// set attributes
entity
.
setAttribute
(
"name"
,
name
);
entity
.
setAttribute
(
REFERENCEABLE_ATTRIBUTE_NAME
,
name
+
CLUSTER_SUFFIX
);
entity
.
setAttribute
(
"description"
,
description
);
entity
.
setAttribute
(
"owner"
,
owner
);
entity
.
setAttribute
(
"locationuri"
,
locationUri
);
entity
.
setAttribute
(
"createTime"
,
System
.
currentTimeMillis
());
return
createInstance
(
entity
,
traitNames
);
// set classifications
entity
.
setClassifications
(
toAtlasClassifications
(
classificationNames
));
return
createInstance
(
entity
);
}
private
List
<
AtlasClassification
>
toAtlasClassifications
(
String
[]
trait
Names
)
{
private
List
<
AtlasClassification
>
toAtlasClassifications
(
String
[]
classification
Names
)
{
List
<
AtlasClassification
>
ret
=
new
ArrayList
<>();
List
<
String
>
traits
=
Arrays
.
asList
(
trait
Names
);
List
<
String
>
classifications
=
asList
(
classification
Names
);
if
(
CollectionUtils
.
isNotEmpty
(
trait
s
))
{
for
(
String
trait
:
trait
s
)
{
ret
.
add
(
new
AtlasClassification
(
trait
));
if
(
CollectionUtils
.
isNotEmpty
(
classification
s
))
{
for
(
String
classificationName
:
classification
s
)
{
ret
.
add
(
new
AtlasClassification
(
classificationName
));
}
}
return
ret
;
}
AtlasEntity
createStorageDescriptor
(
String
location
,
String
inputFormat
,
String
outputFormat
,
boolean
compressed
)
throws
Exception
{
AtlasEntity
entity
=
new
AtlasEntity
(
STORAGE_DESC_TYPE
);
AtlasEntity
createStorageDescriptor
(
String
location
,
String
inputFormat
,
String
outputFormat
,
boolean
compressed
)
{
AtlasEntity
ret
=
new
AtlasEntity
(
STORAGE_DESC_TYPE
);
entity
.
setAttribute
(
"location"
,
location
);
entity
.
setAttribute
(
"inputFormat"
,
inputFormat
);
entity
.
setAttribute
(
"outputFormat"
,
outputFormat
);
entity
.
setAttribute
(
"compressed"
,
compressed
);
ret
.
setAttribute
(
"name"
,
"sd:"
+
location
);
ret
.
setAttribute
(
REFERENCEABLE_ATTRIBUTE_NAME
,
"sd:"
+
location
+
CLUSTER_SUFFIX
);
ret
.
setAttribute
(
"location"
,
location
);
ret
.
setAttribute
(
"inputFormat"
,
inputFormat
);
ret
.
setAttribute
(
"outputFormat"
,
outputFormat
);
ret
.
setAttribute
(
"compressed"
,
compressed
);
return
createInstance
(
entity
,
null
)
;
return
ret
;
}
AtlasEntity
createColumn
(
String
name
,
String
dataType
,
String
comment
,
String
...
traitNames
)
throws
Exception
{
AtlasEntity
createColumn
(
String
name
,
String
dataType
,
String
comment
,
String
...
classificationNames
)
{
AtlasEntity
ret
=
new
AtlasEntity
(
COLUMN_TYPE
);
AtlasEntity
entity
=
new
AtlasEntity
(
COLUMN_TYPE
);
entity
.
setClassifications
(
toAtlasClassifications
(
traitNames
));
entity
.
setAttribute
(
"name"
,
name
);
entity
.
setAttribute
(
"dataType"
,
dataType
);
entity
.
setAttribute
(
"comment"
,
comment
);
// set attributes
ret
.
setAttribute
(
"name"
,
name
);
ret
.
setAttribute
(
REFERENCEABLE_ATTRIBUTE_NAME
,
name
+
CLUSTER_SUFFIX
);
ret
.
setAttribute
(
"dataType"
,
dataType
);
ret
.
setAttribute
(
"comment"
,
comment
);
// set classifications
ret
.
setClassifications
(
toAtlasClassifications
(
classificationNames
));
return
createInstance
(
entity
,
traitNames
)
;
return
ret
;
}
AtlasEntity
createTable
(
String
name
,
String
description
,
AtlasEntity
d
b
,
AtlasEntity
sd
,
String
owner
,
String
tableType
,
List
<
AtlasEntity
>
columns
,
String
...
trait
Names
)
throws
Exception
{
AtlasEntity
e
ntity
=
new
AtlasEntity
(
TABLE_TYPE
);
AtlasEntity
createTable
(
String
name
,
String
description
,
AtlasEntity
d
atabase
,
String
owner
,
String
tableType
,
List
<
AtlasEntity
>
columns
,
String
...
classification
Names
)
throws
Exception
{
AtlasEntity
tblE
ntity
=
new
AtlasEntity
(
TABLE_TYPE
);
entity
.
setClassifications
(
toAtlasClassifications
(
traitNames
));
entity
.
setAttribute
(
"name"
,
name
);
entity
.
setAttribute
(
AtlasClient
.
REFERENCEABLE_ATTRIBUTE_NAME
,
name
);
entity
.
setAttribute
(
"description"
,
description
);
entity
.
setAttribute
(
"owner"
,
owner
);
entity
.
setAttribute
(
"tableType"
,
tableType
);
entity
.
setAttribute
(
"createTime"
,
System
.
currentTimeMillis
());
entity
.
setAttribute
(
"lastAccessTime"
,
System
.
currentTimeMillis
());
entity
.
setAttribute
(
"retention"
,
System
.
currentTimeMillis
());
entity
.
setAttribute
(
"db"
,
AtlasTypeUtil
.
getAtlasObjectId
(
db
));
entity
.
setAttribute
(
"sd"
,
AtlasTypeUtil
.
getAtlasObjectId
(
sd
));
entity
.
setAttribute
(
"columns"
,
AtlasTypeUtil
.
toObjectIds
(
columns
));
// set attributes
tblEntity
.
setAttribute
(
"name"
,
name
);
tblEntity
.
setAttribute
(
REFERENCEABLE_ATTRIBUTE_NAME
,
name
+
CLUSTER_SUFFIX
);
tblEntity
.
setAttribute
(
"description"
,
description
);
tblEntity
.
setAttribute
(
"owner"
,
owner
);
tblEntity
.
setAttribute
(
"tableType"
,
tableType
);
tblEntity
.
setAttribute
(
"createTime"
,
System
.
currentTimeMillis
());
tblEntity
.
setAttribute
(
"lastAccessTime"
,
System
.
currentTimeMillis
());
tblEntity
.
setAttribute
(
"retention"
,
System
.
currentTimeMillis
());
// set relationship attributes
AtlasEntity
storageDesc
=
createStorageDescriptor
(
"hdfs://host:8000/apps/warehouse/sales"
,
"TextInputFormat"
,
"TextOutputFormat"
,
true
);
storageDesc
.
setRelationshipAttribute
(
"table"
,
toAtlasRelatedObjectId
(
tblEntity
));
tblEntity
.
setRelationshipAttribute
(
"db"
,
toAtlasRelatedObjectId
(
database
));
tblEntity
.
setRelationshipAttribute
(
"sd"
,
toAtlasRelatedObjectId
(
storageDesc
));
tblEntity
.
setRelationshipAttribute
(
"columns"
,
toAtlasRelatedObjectIds
(
columns
));
// set classifications
tblEntity
.
setClassifications
(
toAtlasClassifications
(
classificationNames
));
AtlasEntityWithExtInfo
entityWithExtInfo
=
new
AtlasEntityWithExtInfo
();
return
createInstance
(
entity
,
traitNames
);
entityWithExtInfo
.
setEntity
(
tblEntity
);
entityWithExtInfo
.
addReferredEntity
(
storageDesc
);
for
(
AtlasEntity
column
:
columns
)
{
column
.
setRelationshipAttribute
(
"table"
,
toAtlasRelatedObjectId
(
tblEntity
));
entityWithExtInfo
.
addReferredEntity
(
column
);
}
return
createInstance
(
entityWithExtInfo
);
}
AtlasEntity
createProcess
(
String
name
,
String
description
,
String
user
,
List
<
AtlasEntity
>
inputs
,
List
<
AtlasEntity
>
outputs
,
String
queryText
,
String
queryPlan
,
String
queryId
,
String
queryGraph
,
String
...
traitNames
)
throws
Exception
{
String
queryText
,
String
queryPlan
,
String
queryId
,
String
queryGraph
,
String
...
classificationNames
)
throws
Exception
{
AtlasEntity
entity
=
new
AtlasEntity
(
LOAD_PROCESS_TYPE
);
entity
.
setClassifications
(
toAtlasClassifications
(
traitNames
));
entity
.
setAttribute
(
AtlasClient
.
NAME
,
name
);
entity
.
setAttribute
(
AtlasClient
.
REFERENCEABLE_ATTRIBUTE_NAME
,
name
);
// set attributes
entity
.
setAttribute
(
"name"
,
name
);
entity
.
setAttribute
(
REFERENCEABLE_ATTRIBUTE_NAME
,
name
+
CLUSTER_SUFFIX
);
entity
.
setAttribute
(
"description"
,
description
);
entity
.
setAttribute
(
"inputs"
,
inputs
);
entity
.
setAttribute
(
"outputs"
,
outputs
);
entity
.
setAttribute
(
"user"
,
user
);
entity
.
setAttribute
(
"startTime"
,
System
.
currentTimeMillis
());
entity
.
setAttribute
(
"endTime"
,
System
.
currentTimeMillis
()
+
10000
);
...
...
@@ -430,19 +507,31 @@ public class QuickStartV2 {
entity
.
setAttribute
(
"queryId"
,
queryId
);
entity
.
setAttribute
(
"queryGraph"
,
queryGraph
);
return
createInstance
(
entity
,
traitNames
);
// set relationship attributes
entity
.
setRelationshipAttribute
(
"inputs"
,
toAtlasRelatedObjectIds
(
inputs
));
entity
.
setRelationshipAttribute
(
"outputs"
,
toAtlasRelatedObjectIds
(
outputs
));
// set classifications
entity
.
setClassifications
(
toAtlasClassifications
(
classificationNames
));
return
createInstance
(
entity
);
}
AtlasEntity
createView
(
String
name
,
AtlasEntity
d
b
,
List
<
AtlasEntity
>
inputTables
,
String
...
trait
Names
)
throws
Exception
{
AtlasEntity
createView
(
String
name
,
AtlasEntity
d
atabase
,
List
<
AtlasEntity
>
inputTables
,
String
...
classification
Names
)
throws
Exception
{
AtlasEntity
entity
=
new
AtlasEntity
(
VIEW_TYPE
);
entity
.
setClassifications
(
toAtlasClassifications
(
traitNames
));
// set attributes
entity
.
setAttribute
(
"name"
,
name
);
entity
.
setAttribute
(
AtlasClient
.
REFERENCEABLE_ATTRIBUTE_NAME
,
name
);
entity
.
setAttribute
(
"db"
,
db
);
entity
.
setAttribute
(
"inputTables"
,
inputTables
);
entity
.
setAttribute
(
REFERENCEABLE_ATTRIBUTE_NAME
,
name
+
CLUSTER_SUFFIX
);
// set relationship attributes
entity
.
setRelationshipAttribute
(
"db"
,
toAtlasRelatedObjectId
(
database
));
entity
.
setRelationshipAttribute
(
"inputTables"
,
toAtlasRelatedObjectIds
(
inputTables
));
return
createInstance
(
entity
,
traitNames
);
// set classifications
entity
.
setClassifications
(
toAtlasClassifications
(
classificationNames
));
return
createInstance
(
entity
);
}
private
void
verifyTypesCreated
()
throws
Exception
{
...
...
@@ -451,10 +540,12 @@ public class QuickStartV2 {
for
(
String
typeName
:
TYPES
)
{
searchParams
.
clear
();
searchParams
.
add
(
SearchFilter
.
PARAM_NAME
,
typeName
);
SearchFilter
searchFilter
=
new
SearchFilter
(
searchParams
);
AtlasTypesDef
searchDefs
=
atlasClientV2
.
getAllTypeDefs
(
searchFilter
);
assert
(!
searchDefs
.
isEmpty
());
System
.
out
.
println
(
"Created type ["
+
typeName
+
"]"
);
}
}
...
...
@@ -551,10 +642,9 @@ public class QuickStartV2 {
}
private
String
getTableId
(
String
tableName
)
throws
AtlasServiceException
{
Map
<
String
,
String
>
attributes
=
new
HashMap
<>();
attributes
.
put
(
AtlasClient
.
REFERENCEABLE_ATTRIBUTE_NAME
,
tableName
);
Map
<
String
,
String
>
attributes
=
Collections
.
singletonMap
(
REFERENCEABLE_ATTRIBUTE_NAME
,
tableName
+
CLUSTER_SUFFIX
);
AtlasEntity
tableEntity
=
atlasClientV2
.
getEntityByAttribute
(
TABLE_TYPE
,
attributes
).
getEntity
();
return
tableEntity
.
getGuid
();
}
}
\ No newline at end of file
webapp/src/test/java/org/apache/atlas/examples/QuickStartV2IT.java
View file @
01ea65c2
...
...
@@ -18,9 +18,9 @@
package
org
.
apache
.
atlas
.
examples
;
import
org.apache.atlas.AtlasClient
;
import
org.apache.atlas.AtlasServiceException
;
import
org.apache.atlas.model.instance.AtlasClassification
;
import
org.apache.atlas.model.instance.AtlasClassification.AtlasClassifications
;
import
org.apache.atlas.model.instance.AtlasEntity
;
import
org.apache.atlas.model.instance.AtlasEntityHeader
;
import
org.apache.atlas.model.lineage.AtlasLineageInfo
;
...
...
@@ -31,11 +31,24 @@ import org.testng.annotations.BeforeClass;
import
org.testng.annotations.Test
;
import
java.util.ArrayList
;
import
java.util.
HashMap
;
import
java.util.
Collections
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.UUID
;
import
static
org
.
apache
.
atlas
.
AtlasClient
.
REFERENCEABLE_ATTRIBUTE_NAME
;
import
static
org
.
apache
.
atlas
.
examples
.
QuickStartV2
.
CLUSTER_SUFFIX
;
import
static
org
.
apache
.
atlas
.
examples
.
QuickStartV2
.
LOAD_PROCESS_TYPE
;
import
static
org
.
apache
.
atlas
.
examples
.
QuickStartV2
.
LOAD_SALES_DAILY_PROCESS
;
import
static
org
.
apache
.
atlas
.
examples
.
QuickStartV2
.
LOAD_SALES_MONTHLY_PROCESS
;
import
static
org
.
apache
.
atlas
.
examples
.
QuickStartV2
.
PRODUCT_DIM_TABLE
;
import
static
org
.
apache
.
atlas
.
examples
.
QuickStartV2
.
PRODUCT_DIM_VIEW
;
import
static
org
.
apache
.
atlas
.
examples
.
QuickStartV2
.
SALES_DB
;
import
static
org
.
apache
.
atlas
.
examples
.
QuickStartV2
.
SALES_FACT_DAILY_MV_TABLE
;
import
static
org
.
apache
.
atlas
.
examples
.
QuickStartV2
.
SALES_FACT_MONTHLY_MV_TABLE
;
import
static
org
.
apache
.
atlas
.
examples
.
QuickStartV2
.
SALES_FACT_TABLE
;
import
static
org
.
apache
.
atlas
.
examples
.
QuickStartV2
.
TIME_DIM_TABLE
;
import
static
org
.
apache
.
atlas
.
examples
.
QuickStartV2
.
VIEW_TYPE
;
import
static
org
.
testng
.
Assert
.
assertEquals
;
import
static
org
.
testng
.
Assert
.
assertNotNull
;
import
static
org
.
testng
.
AssertJUnit
.
assertTrue
;
...
...
@@ -45,27 +58,30 @@ public class QuickStartV2IT extends BaseResourceIT {
@BeforeClass
public
void
runQuickStart
()
throws
Exception
{
super
.
setUp
();
QuickStartV2
.
runQuickstart
(
new
String
[]{},
new
String
[]{
"admin"
,
"admin"
});
}
@Test
public
void
testDBIsAdded
()
throws
Exception
{
AtlasEntity
db
=
getDB
(
QuickStartV2
.
SALES_DB
);
AtlasEntity
db
=
getDB
(
SALES_DB
);
Map
<
String
,
Object
>
dbAttributes
=
db
.
getAttributes
();
assertEquals
(
QuickStartV2
.
SALES_DB
,
dbAttributes
.
get
(
"name"
));
assertEquals
(
SALES_DB
,
dbAttributes
.
get
(
"name"
));
assertEquals
(
"sales database"
,
dbAttributes
.
get
(
"description"
));
}
private
AtlasEntity
getDB
(
String
dbName
)
throws
AtlasServiceException
{
Map
<
String
,
String
>
attributes
=
new
HashMap
<>();
attributes
.
put
(
"name"
,
dbName
);
Map
<
String
,
String
>
attributes
=
Collections
.
singletonMap
(
REFERENCEABLE_ATTRIBUTE_NAME
,
dbName
+
CLUSTER_SUFFIX
);
AtlasEntity
dbEntity
=
atlasClientV2
.
getEntityByAttribute
(
QuickStartV2
.
DATABASE_TYPE
,
attributes
).
getEntity
();
return
dbEntity
;
}
@Test
public
void
testTablesAreAdded
()
throws
AtlasServiceException
{
AtlasEntity
table
=
getTable
(
QuickStart
.
SALES_FACT_TABLE
);
AtlasEntity
table
=
getTable
(
SALES_FACT_TABLE
);
verifySimpleTableAttributes
(
table
);
verifyDBIsLinkedToTable
(
table
);
...
...
@@ -76,67 +92,72 @@ public class QuickStartV2IT extends BaseResourceIT {
}
private
AtlasEntity
getTable
(
String
tableName
)
throws
AtlasServiceException
{
Map
<
String
,
String
>
attributes
=
new
HashMap
<>();
attributes
.
put
(
AtlasClient
.
REFERENCEABLE_ATTRIBUTE_NAME
,
tableName
);
Map
<
String
,
String
>
attributes
=
Collections
.
singletonMap
(
REFERENCEABLE_ATTRIBUTE_NAME
,
tableName
+
CLUSTER_SUFFIX
);
AtlasEntity
tableEntity
=
atlasClientV2
.
getEntityByAttribute
(
QuickStartV2
.
TABLE_TYPE
,
attributes
).
getEntity
();
return
tableEntity
;
}
private
AtlasEntity
getProcess
(
String
processName
)
throws
AtlasServiceException
{
Map
<
String
,
String
>
attributes
=
new
HashMap
<>(
);
attributes
.
put
(
AtlasClient
.
REFERENCEABLE_ATTRIBUTE_NAME
,
processName
);
AtlasEntity
processEntity
=
atlasClientV2
.
getEntityByAttribute
(
QuickStartV2
.
LOAD_PROCESS_TYPE
,
attributes
).
getEntity
();
Map
<
String
,
String
>
attributes
=
Collections
.
singletonMap
(
REFERENCEABLE_ATTRIBUTE_NAME
,
processName
+
CLUSTER_SUFFIX
);
AtlasEntity
processEntity
=
atlasClientV2
.
getEntityByAttribute
(
LOAD_PROCESS_TYPE
,
attributes
).
getEntity
(
);
return
processEntity
;
}
private
void
verifyTrait
(
AtlasEntity
table
)
throws
AtlasServiceException
{
AtlasClassification
.
AtlasClassifications
classfications
=
atlasClientV2
.
getClassifications
(
table
.
getGuid
());
AtlasClassification
s
classfications
=
atlasClientV2
.
getClassifications
(
table
.
getGuid
());
List
<
AtlasClassification
>
traits
=
classfications
.
getList
();
assertNotNull
(
traits
.
get
(
0
).
getTypeName
());
}
private
void
verifyColumnsAreAddedToTable
(
AtlasEntity
table
)
{
Map
<
String
,
Object
>
tableAttributes
=
table
.
getAttributes
();
Map
<
String
,
Object
>
tableAttributes
=
table
.
get
Relationship
Attributes
();
List
<
Map
>
columns
=
(
List
<
Map
>)
tableAttributes
.
get
(
"columns"
);
assertEquals
(
4
,
columns
.
size
());
for
(
Map
colMap
:
columns
)
{
String
colGuid
=
(
String
)
colMap
.
get
(
"guid"
);
assertNotNull
(
UUID
.
fromString
(
colGuid
));
}
}
private
void
verifyDBIsLinkedToTable
(
AtlasEntity
table
)
throws
AtlasServiceException
{
AtlasEntity
db
=
getDB
(
QuickStartV2
.
SALES_DB
);
Map
<
String
,
Object
>
tableAttributes
=
table
.
getAttributes
();
AtlasEntity
db
=
getDB
(
SALES_DB
);
Map
<
String
,
Object
>
tableAttributes
=
table
.
get
Relationship
Attributes
();
Map
dbFromTable
=
(
Map
)
tableAttributes
.
get
(
"db"
);
assertEquals
(
db
.
getGuid
(),
dbFromTable
.
get
(
"guid"
));
}
private
void
verifySimpleTableAttributes
(
AtlasEntity
table
)
{
Map
<
String
,
Object
>
tableAttributes
=
table
.
getAttributes
();
assertEquals
(
QuickStartV2
.
SALES_FACT_TABLE
,
tableAttributes
.
get
(
"name"
));
assertEquals
(
SALES_FACT_TABLE
,
tableAttributes
.
get
(
"name"
));
assertEquals
(
"sales fact table"
,
tableAttributes
.
get
(
"description"
));
}
@Test
public
void
testProcessIsAdded
()
throws
AtlasServiceException
{
Map
<
String
,
String
>
attributes
=
new
HashMap
<>();
attributes
.
put
(
AtlasClient
.
REFERENCEABLE_ATTRIBUTE_NAME
,
QuickStartV2
.
LOAD_SALES_DAILY_PROCESS
);
AtlasEntity
loadProcess
=
atlasClientV2
.
getEntityByAttribute
(
QuickStartV2
.
LOAD_PROCESS_TYPE
,
attributes
).
getEntity
();
Map
<
String
,
String
>
attributes
=
Collections
.
singletonMap
(
REFERENCEABLE_ATTRIBUTE_NAME
,
LOAD_SALES_DAILY_PROCESS
+
CLUSTER_SUFFIX
);
AtlasEntity
loadProcess
=
atlasClientV2
.
getEntityByAttribute
(
LOAD_PROCESS_TYPE
,
attributes
).
getEntity
();
Map
loadProcessAttribs
=
loadProcess
.
getAttributes
();
assertEquals
(
QuickStartV2
.
LOAD_SALES_DAILY_PROCESS
,
loadProcessAttribs
.
get
(
AtlasClient
.
NAME
));
assertEquals
(
LOAD_SALES_DAILY_PROCESS
,
loadProcessAttribs
.
get
(
NAME
));
assertEquals
(
"hive query for daily summary"
,
loadProcessAttribs
.
get
(
"description"
));
List
inputs
=
(
List
)
loadProcessAttribs
.
get
(
"inputs"
);
List
outputs
=
(
List
)
loadProcessAttribs
.
get
(
"outputs"
);
assertEquals
(
2
,
inputs
.
size
());
String
salesFactTableId
=
getTableId
(
QuickStartV2
.
SALES_FACT_TABLE
);
String
timeDimTableId
=
getTableId
(
QuickStartV2
.
TIME_DIM_TABLE
);
String
salesFactDailyMVId
=
getTableId
(
QuickStartV2
.
SALES_FACT_DAILY_MV_TABLE
);
String
salesFactTableId
=
getTableId
(
SALES_FACT_TABLE
);
String
timeDimTableId
=
getTableId
(
TIME_DIM_TABLE
);
String
salesFactDailyMVId
=
getTableId
(
SALES_FACT_DAILY_MV_TABLE
);
assertEquals
(
salesFactTableId
,
((
Map
)
inputs
.
get
(
0
)).
get
(
"guid"
));
assertEquals
(
timeDimTableId
,
((
Map
)
inputs
.
get
(
1
)).
get
(
"guid"
));
...
...
@@ -153,12 +174,12 @@ public class QuickStartV2IT extends BaseResourceIT {
@Test
public
void
testLineageIsMaintained
()
throws
AtlasServiceException
{
String
salesFactTableId
=
getTableId
(
QuickStartV2
.
SALES_FACT_TABLE
);
String
timeDimTableId
=
getTableId
(
QuickStartV2
.
TIME_DIM_TABLE
);
String
salesFactDailyMVId
=
getTableId
(
QuickStartV2
.
SALES_FACT_DAILY_MV_TABLE
);
String
salesFactMonthlyMvId
=
getTableId
(
QuickStartV2
.
SALES_FACT_MONTHLY_MV_TABLE
);
String
salesDailyProcessId
=
getProcessId
(
QuickStartV2
.
LOAD_SALES_DAILY_PROCESS
);
String
salesMonthlyProcessId
=
getProcessId
(
QuickStartV2
.
LOAD_SALES_MONTHLY_PROCESS
);
String
salesFactTableId
=
getTableId
(
SALES_FACT_TABLE
);
String
timeDimTableId
=
getTableId
(
TIME_DIM_TABLE
);
String
salesFactDailyMVId
=
getTableId
(
SALES_FACT_DAILY_MV_TABLE
);
String
salesFactMonthlyMvId
=
getTableId
(
SALES_FACT_MONTHLY_MV_TABLE
);
String
salesDailyProcessId
=
getProcessId
(
LOAD_SALES_DAILY_PROCESS
);
String
salesMonthlyProcessId
=
getProcessId
(
LOAD_SALES_MONTHLY_PROCESS
);
AtlasLineageInfo
inputLineage
=
atlasClientV2
.
getLineageInfo
(
salesFactDailyMVId
,
LineageDirection
.
BOTH
,
0
);
List
<
LineageRelation
>
relations
=
new
ArrayList
<>(
inputLineage
.
getRelations
());
...
...
@@ -177,15 +198,17 @@ public class QuickStartV2IT extends BaseResourceIT {
@Test
public
void
testViewIsAdded
()
throws
AtlasServiceException
{
Map
<
String
,
String
>
attributes
=
new
HashMap
<>();
attributes
.
put
(
AtlasClient
.
REFERENCEABLE_ATTRIBUTE_NAME
,
QuickStartV2
.
PRODUCT_DIM_VIEW
);
AtlasEntity
view
=
atlasClientV2
.
getEntityByAttribute
(
QuickStartV2
.
VIEW_TYPE
,
attributes
).
getEntity
();
Map
<
String
,
String
>
attributes
=
Collections
.
singletonMap
(
REFERENCEABLE_ATTRIBUTE_NAME
,
PRODUCT_DIM_VIEW
+
CLUSTER_SUFFIX
);
AtlasEntity
view
=
atlasClientV2
.
getEntityByAttribute
(
VIEW_TYPE
,
attributes
).
getEntity
();
Map
<
String
,
Object
>
viewAttributes
=
view
.
getAttributes
();
assertEquals
(
QuickStartV2
.
PRODUCT_DIM_VIEW
,
viewAttributes
.
get
(
AtlasClient
.
NAME
)
);
Map
<
String
,
Object
>
viewRelationshipAttributes
=
view
.
getRelationshipAttributes
(
);
String
productDimId
=
getTable
(
QuickStartV2
.
PRODUCT_DIM_TABLE
).
getGuid
();
List
inputTables
=
(
List
)
viewAttributes
.
get
(
"inputTables"
);
assertEquals
(
PRODUCT_DIM_VIEW
,
viewAttributes
.
get
(
NAME
));
String
productDimId
=
getTable
(
PRODUCT_DIM_TABLE
).
getGuid
();
List
inputTables
=
(
List
)
viewRelationshipAttributes
.
get
(
"inputTables"
);
Map
inputTablesMap
=
(
Map
)
inputTables
.
get
(
0
);
assertEquals
(
productDimId
,
inputTablesMap
.
get
(
"guid"
));
}
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment