Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
A
atlas
Project
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
dataplatform
atlas
Commits
4985b9f2
Commit
4985b9f2
authored
Jan 23, 2019
by
Ashutosh Mestry
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
ATLAS-3015: Classification Updater tool.
parent
a9aa5b0e
Hide whitespace changes
Inline
Side-by-side
Showing
4 changed files
with
756 additions
and
0 deletions
+756
-0
pom.xml
tools/classification-updater/pom.xml
+114
-0
BulkFetchAndUpdate.java
.../main/java/org/apache/atlas/tools/BulkFetchAndUpdate.java
+510
-0
atlas-log4j.xml
...classification-updater/src/main/resources/atlas-log4j.xml
+42
-0
update-classifications.sh
...tion-updater/src/main/resources/update-classifications.sh
+90
-0
No files found.
tools/classification-updater/pom.xml
0 → 100644
View file @
4985b9f2
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project
xmlns=
"http://maven.apache.org/POM/4.0.0"
xmlns:xsi=
"http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation=
"http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
>
<modelVersion>
4.0.0
</modelVersion>
<parent>
<artifactId>
apache-atlas
</artifactId>
<groupId>
org.apache.atlas
</groupId>
<version>
2.0.0-SNAPSHOT
</version>
<relativePath>
../../
</relativePath>
</parent>
<artifactId>
atlas-classification-updater
</artifactId>
<description>
Apache Atlas classification updater Module
</description>
<name>
Apache Atlas classification updater
</name>
<packaging>
jar
</packaging>
<properties>
<calcite.version>
0.9.2-incubating
</calcite.version>
</properties>
<dependencies>
<!-- Logging -->
<dependency>
<groupId>
org.slf4j
</groupId>
<artifactId>
slf4j-api
</artifactId>
</dependency>
<dependency>
<groupId>
org.slf4j
</groupId>
<artifactId>
slf4j-log4j12
</artifactId>
</dependency>
<dependency>
<groupId>
org.apache.atlas
</groupId>
<artifactId>
atlas-client-v1
</artifactId>
</dependency>
<dependency>
<groupId>
org.apache.atlas
</groupId>
<artifactId>
atlas-client-v2
</artifactId>
<version>
${project.version}
</version>
</dependency>
<dependency>
<groupId>
org.apache.atlas
</groupId>
<artifactId>
atlas-notification
</artifactId>
</dependency>
<dependency>
<groupId>
org.apache.atlas
</groupId>
<artifactId>
hdfs-model
</artifactId>
</dependency>
<!-- to bring up atlas server for integration tests -->
<dependency>
<groupId>
com.fasterxml.jackson.core
</groupId>
<artifactId>
jackson-databind
</artifactId>
<version>
${jackson.version}
</version>
</dependency>
<dependency>
<groupId>
org.testng
</groupId>
<artifactId>
testng
</artifactId>
</dependency>
<dependency>
<groupId>
org.mockito
</groupId>
<artifactId>
mockito-all
</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>
org.apache.maven.plugins
</groupId>
<artifactId>
maven-jar-plugin
</artifactId>
<version>
2.4
</version>
<executions>
<execution>
<goals>
<goal>
test-jar
</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>
org.apache.maven.plugins
</groupId>
<artifactId>
maven-compiler-plugin
</artifactId>
<configuration>
<source>
1.8
</source>
<target>
1.8
</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
tools/classification-updater/src/main/java/org/apache/atlas/tools/BulkFetchAndUpdate.java
0 → 100644
View file @
4985b9f2
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org
.
apache
.
atlas
.
tools
;
import
com.sun.jersey.core.util.MultivaluedMapImpl
;
import
org.apache.atlas.ApplicationProperties
;
import
org.apache.atlas.AtlasClientV2
;
import
org.apache.atlas.AtlasException
;
import
org.apache.atlas.AtlasServiceException
;
import
org.apache.atlas.model.SearchFilter
;
import
org.apache.atlas.model.instance.AtlasClassification
;
import
org.apache.atlas.model.instance.AtlasEntityHeader
;
import
org.apache.atlas.model.instance.AtlasEntityHeaders
;
import
org.apache.atlas.model.typedef.AtlasClassificationDef
;
import
org.apache.atlas.model.typedef.AtlasTypesDef
;
import
org.apache.atlas.type.AtlasType
;
import
org.apache.atlas.utils.AtlasJson
;
import
org.apache.atlas.utils.AuthenticationUtil
;
import
org.apache.commons.cli.CommandLine
;
import
org.apache.commons.cli.DefaultParser
;
import
org.apache.commons.cli.Options
;
import
org.apache.commons.cli.ParseException
;
import
org.apache.commons.configuration.Configuration
;
import
org.apache.commons.lang3.StringUtils
;
import
org.apache.hadoop.security.UserGroupInformation
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
javax.ws.rs.core.MultivaluedMap
;
import
java.io.BufferedReader
;
import
java.io.File
;
import
java.io.FileNotFoundException
;
import
java.io.FileReader
;
import
java.io.FileWriter
;
import
java.io.IOException
;
import
java.text.SimpleDateFormat
;
import
java.util.Collections
;
import
java.util.HashMap
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.TimeZone
;
import
java.util.function.Consumer
;
import
static
org
.
apache
.
atlas
.
model
.
instance
.
AtlasEntity
.
Status
.
DELETED
;
public
class
BulkFetchAndUpdate
{
private
static
final
Logger
LOG
=
LoggerFactory
.
getLogger
(
BulkFetchAndUpdate
.
class
);
private
static
final
String
DATE_FORMAT_SUPPORTED
=
"yyyy-MM-dd'T'HH:mm:ss"
;
private
static
final
String
OPTION_FROM
=
"f"
;
private
static
final
String
APPLICATION_PROPERTY_ATLAS_ENDPOINT
=
"atlas.rest.address"
;
private
static
final
String
SYSTEM_PROPERTY_USER_DIR
=
"user.dir"
;
private
static
final
String
STEP_PREPARE
=
"prepare"
;
private
static
final
String
STEP_UPDATE
=
"update"
;
private
static
final
int
EXIT_CODE_SUCCESS
=
0
;
private
static
final
int
EXIT_CODE_FAILED
=
1
;
private
static
final
String
DEFAULT_ATLAS_URL
=
"http://localhost:21000/"
;
private
static
final
String
FILE_CLASSIFICATION_DEFS
=
"classification-definitions.json"
;
private
static
final
String
FILE_ENTITY_HEADERS
=
"entity-headers.json"
;
private
final
static
String
[]
filesToUse
=
new
String
[]
{
FILE_CLASSIFICATION_DEFS
,
FILE_ENTITY_HEADERS
};
public
static
void
main
(
String
[]
args
)
{
int
exitCode
=
EXIT_CODE_FAILED
;
try
{
long
fromTimestamp
=
0L
;
CommandLine
cmd
=
getCommandLine
(
args
);
String
stepToExecute
=
cmd
.
getOptionValue
(
"s"
).
trim
();
String
uid
=
cmd
.
getOptionValue
(
"u"
);
String
pwd
=
cmd
.
getOptionValue
(
"p"
);
String
directory
=
cmd
.
getOptionValue
(
"d"
);
String
fromTime
=
cmd
.
getOptionValue
(
OPTION_FROM
);
String
basePath
=
getDirectory
(
directory
);
displayCrLf
(
basePath
);
String
[]
atlasEndpoint
=
getAtlasRESTUrl
();
if
(
atlasEndpoint
==
null
||
atlasEndpoint
.
length
==
0
)
{
atlasEndpoint
=
new
String
[]{
DEFAULT_ATLAS_URL
};
}
if
(
StringUtils
.
equals
(
stepToExecute
,
STEP_PREPARE
))
{
if
(
StringUtils
.
isEmpty
(
fromTime
))
{
displayCrLf
(
"'fromTime' is empty"
+
fromTime
);
printUsage
();
return
;
}
fromTimestamp
=
getTimestamp
(
fromTime
);
displayCrLf
(
"fromTimestamp: "
+
fromTimestamp
);
if
(
fromTimestamp
==
0L
)
{
printUsage
();
return
;
}
}
process
(
stepToExecute
,
basePath
,
atlasEndpoint
,
uid
,
pwd
,
fromTimestamp
);
exitCode
=
EXIT_CODE_SUCCESS
;
}
catch
(
ParseException
e
)
{
LOG
.
error
(
"Failed to parse arguments. Error: "
,
e
.
getMessage
());
printUsage
();
}
catch
(
Exception
e
)
{
LOG
.
error
(
"Failed!"
,
e
);
displayCrLf
(
"Failed: "
+
e
.
getMessage
());
}
System
.
exit
(
exitCode
);
}
private
static
long
getTimestamp
(
String
str
)
{
try
{
if
(
StringUtils
.
isEmpty
(
str
))
{
return
0
;
}
TimeZone
utc
=
TimeZone
.
getDefault
();
SimpleDateFormat
simpleDateFormat
=
new
SimpleDateFormat
(
DATE_FORMAT_SUPPORTED
);
simpleDateFormat
.
setTimeZone
(
utc
);
return
simpleDateFormat
.
parse
(
str
).
getTime
();
}
catch
(
java
.
text
.
ParseException
e
)
{
displayCrLf
(
"Unsupported date format: "
+
str
);
return
0
;
}
}
private
static
void
process
(
String
stepToExecute
,
String
basePath
,
String
[]
atlasEndpoint
,
String
uid
,
String
pwd
,
long
fromTimestamp
)
throws
Exception
{
AtlasClientV2
atlasClientV2
=
getAtlasClientV2
(
atlasEndpoint
,
new
String
[]{
uid
,
pwd
});
switch
(
stepToExecute
)
{
case
STEP_PREPARE:
{
Preparer
p
=
new
Preparer
(
atlasClientV2
);
p
.
run
(
basePath
,
fromTimestamp
);
}
break
;
case
STEP_UPDATE:
{
Updater
u
=
new
Updater
(
atlasClientV2
);
u
.
run
(
basePath
);
}
break
;
default
:
printUsage
();
break
;
}
}
private
static
String
getDirectory
(
String
directory
)
{
String
basePath
=
System
.
getProperty
(
SYSTEM_PROPERTY_USER_DIR
)
+
File
.
separatorChar
;
if
(
StringUtils
.
isNotEmpty
(
directory
)
&&
checkDirectoryExists
(
directory
))
{
basePath
=
directory
+
File
.
separatorChar
;
}
else
{
display
(
"Using directory: "
);
}
return
basePath
;
}
private
static
CommandLine
getCommandLine
(
String
[]
args
)
throws
ParseException
{
Options
options
=
new
Options
();
options
.
addRequiredOption
(
"s"
,
"step"
,
true
,
"Step to run."
);
options
.
addOption
(
"u"
,
"user"
,
true
,
"User name."
);
options
.
addOption
(
"p"
,
"password"
,
true
,
"Password name."
);
options
.
addOption
(
"d"
,
"dir"
,
true
,
"Directory for reading/writing data."
);
options
.
addOption
(
OPTION_FROM
,
"fromDate"
,
true
,
"Date, in YYYY-MM-DD format, from where to start reading."
);
return
new
DefaultParser
().
parse
(
options
,
args
);
}
private
static
void
printUsage
()
{
System
.
out
.
println
();
displayCrLf
(
"Usage: classification-updater.sh [-s <step>] [-f <from time>] [-t <optional: to time>] [-d <dir>]"
);
displayCrLf
(
" step: Specify which step to execute:"
);
displayCrLf
(
" prepare: prepare classifications and associated entities."
);
displayCrLf
(
" update: update classifications and entities."
);
displayCrLf
(
" dir: [optional] Directory where read/write will happen."
);
displayCrLf
(
" If not specified, current directory will be used."
);
displayCrLf
(
" from: [mandatory for 'prepare' step, optional for 'update' step] Date, in YYYY-MM-DD format, from where audits need to be read."
);
displayCrLf
(
" If not specified, current directory will be used."
);
System
.
out
.
println
();
}
private
static
String
[]
getAtlasRESTUrl
()
{
Configuration
atlasConf
=
null
;
try
{
atlasConf
=
ApplicationProperties
.
get
();
return
atlasConf
.
getStringArray
(
APPLICATION_PROPERTY_ATLAS_ENDPOINT
);
}
catch
(
AtlasException
e
)
{
return
new
String
[]{
DEFAULT_ATLAS_URL
};
}
}
private
static
AtlasClientV2
getAtlasClientV2
(
String
[]
atlasEndpoint
,
String
[]
uidPwdFromCommandLine
)
throws
IOException
{
AtlasClientV2
atlasClientV2
;
if
(!
AuthenticationUtil
.
isKerberosAuthenticationEnabled
())
{
String
[]
uidPwd
=
(
uidPwdFromCommandLine
[
0
]
==
null
||
uidPwdFromCommandLine
[
1
]
==
null
)
?
AuthenticationUtil
.
getBasicAuthenticationInput
()
:
uidPwdFromCommandLine
;
atlasClientV2
=
new
AtlasClientV2
(
atlasEndpoint
,
uidPwd
);
}
else
{
UserGroupInformation
ugi
=
UserGroupInformation
.
getCurrentUser
();
atlasClientV2
=
new
AtlasClientV2
(
ugi
,
ugi
.
getShortUserName
(),
atlasEndpoint
);
}
return
atlasClientV2
;
}
private
static
void
displayCrLf
(
String
...
formatMessage
)
{
displayFn
(
System
.
out
::
println
,
formatMessage
);
}
private
static
void
display
(
String
...
formatMessage
)
{
displayFn
(
System
.
out
::
print
,
formatMessage
);
}
private
static
void
displayFn
(
Consumer
<
String
>
fn
,
String
...
formatMessage
)
{
if
(
formatMessage
.
length
==
1
)
{
fn
.
accept
(
formatMessage
[
0
]);
}
else
{
fn
.
accept
(
String
.
format
(
formatMessage
[
0
],
formatMessage
[
1
]));
}
}
private
static
void
closeReader
(
BufferedReader
bufferedReader
)
{
try
{
if
(
bufferedReader
==
null
)
{
return
;
}
bufferedReader
.
close
();
}
catch
(
IOException
ex
)
{
LOG
.
error
(
"closeReader"
,
ex
);
}
}
private
static
BufferedReader
getBufferedReader
(
String
basePath
,
String
fileName
)
throws
FileNotFoundException
{
return
new
BufferedReader
(
new
FileReader
(
basePath
+
fileName
));
}
private
static
boolean
fileCheck
(
String
basePath
,
String
[]
files
,
boolean
existCheck
)
{
boolean
ret
=
true
;
for
(
String
f
:
files
)
{
ret
=
ret
&&
fileCheck
(
basePath
,
f
,
existCheck
);
}
return
ret
;
}
private
static
boolean
fileCheck
(
String
basePath
,
String
file
,
boolean
existCheck
)
{
String
errorMessage
=
existCheck
?
"does not exist"
:
"exists"
;
if
(
checkFileExists
(
basePath
+
file
)
!=
existCheck
)
{
displayCrLf
(
String
.
format
(
"File '%s' %s!"
,
basePath
+
file
,
errorMessage
));
return
false
;
}
return
true
;
}
private
static
boolean
checkFileExists
(
String
fileName
)
{
File
f
=
new
File
(
fileName
);
return
f
.
exists
()
&&
!
f
.
isDirectory
();
}
private
static
boolean
checkDirectoryExists
(
String
fileName
)
{
File
f
=
new
File
(
fileName
);
return
f
.
exists
()
&&
f
.
isDirectory
();
}
private
static
FileWriter
getFileWriter
(
String
basePath
,
String
fileName
)
throws
IOException
{
String
filePath
=
basePath
+
fileName
;
displayCrLf
(
"Creating %s"
,
filePath
);
return
new
FileWriter
(
filePath
,
true
);
}
private
static
class
Preparer
{
private
static
final
String
ATTR_NAME_QUALIFIED_NAME
=
"qualifiedName"
;
private
AtlasClientV2
atlasClientV2
;
public
Preparer
(
AtlasClientV2
atlasClientV2
)
{
this
.
atlasClientV2
=
atlasClientV2
;
}
public
void
run
(
String
basePath
,
long
fromTimestamp
)
throws
Exception
{
if
(!
fileCheck
(
basePath
,
filesToUse
,
false
))
return
;
displayCrLf
(
"Starting: from: "
+
fromTimestamp
+
" to: "
+
"current time ("
+
System
.
currentTimeMillis
()
+
")..."
);
writeClassificationDefs
(
basePath
,
FILE_CLASSIFICATION_DEFS
,
getAllClassificationsDefs
());
writeEntityHeaders
(
basePath
,
FILE_ENTITY_HEADERS
,
fromTimestamp
);
displayCrLf
(
"Done!"
);
}
private
void
writeClassificationDefs
(
String
basePath
,
String
fileName
,
List
<
AtlasClassificationDef
>
classificationDefs
)
throws
IOException
{
FileWriter
fileWriter
=
null
;
try
{
fileWriter
=
getFileWriter
(
basePath
,
fileName
);
for
(
AtlasClassificationDef
classificationDef
:
classificationDefs
)
{
try
{
classificationDef
.
setGuid
(
null
);
String
json
=
AtlasType
.
toJson
(
classificationDef
);
fileWriter
.
write
(
json
+
"\n"
);
}
catch
(
Exception
e
)
{
LOG
.
error
(
"Error writing classifications: {}"
,
e
);
displayCrLf
(
"Error writing classifications."
);
}
}
}
finally
{
if
(
fileWriter
!=
null
)
{
fileWriter
.
close
();
}
}
}
private
void
writeEntityHeaders
(
String
basePath
,
String
fileName
,
long
fromTimestamp
)
throws
AtlasServiceException
,
IOException
{
FileWriter
fileWriter
=
null
;
try
{
fileWriter
=
getFileWriter
(
basePath
,
fileName
);
}
catch
(
IOException
e
)
{
LOG
.
error
(
"Error opening {}/{}"
,
basePath
,
fileName
,
e
);
displayCrLf
(
"Error opening: %"
,
basePath
+
File
.
separatorChar
+
fileName
);
return
;
}
try
{
AtlasEntityHeaders
response
=
atlasClientV2
.
getEntityHeaders
(
fromTimestamp
);
int
guidHeaderMapSize
=
response
.
getGuidHeaderMap
().
size
();
try
{
displayCrLf
(
"Read entities: "
+
guidHeaderMapSize
);
AtlasEntityHeaders
updatedHeaders
=
removeEntityGuids
(
response
);
fileWriter
.
write
(
AtlasType
.
toJson
(
updatedHeaders
));
displayCrLf
(
"Writing entities: "
+
updatedHeaders
.
getGuidHeaderMap
().
size
());
}
catch
(
Exception
e
)
{
LOG
.
error
(
"Error writing: {}"
,
guidHeaderMapSize
,
e
);
displayCrLf
(
"Error writing: "
+
e
.
toString
());
}
}
finally
{
if
(
fileWriter
!=
null
)
{
fileWriter
.
close
();
}
}
}
private
AtlasEntityHeaders
removeEntityGuids
(
AtlasEntityHeaders
headers
)
{
Map
<
String
,
AtlasEntityHeader
>
qualifiedNameHeaderMap
=
new
HashMap
<>();
for
(
AtlasEntityHeader
header
:
headers
.
getGuidHeaderMap
().
values
())
{
String
qualifiedName
=
getQualifiedName
(
header
);
displayCrLf
(
"Processing: "
+
qualifiedName
);
if
(
header
.
getStatus
()
==
DELETED
)
{
continue
;
}
header
.
setGuid
(
null
);
if
(
header
.
getClassifications
().
size
()
==
0
)
{
continue
;
}
boolean
keyFound
=
qualifiedNameHeaderMap
.
containsKey
(
qualifiedName
);
if
(!
keyFound
)
{
qualifiedNameHeaderMap
.
put
(
qualifiedName
,
header
);
}
AtlasEntityHeader
currentHeader
=
qualifiedNameHeaderMap
.
get
(
qualifiedName
);
for
(
AtlasClassification
c
:
header
.
getClassifications
())
{
c
.
setEntityGuid
(
null
);
if
(
keyFound
)
{
boolean
found
=
currentHeader
.
getClassifications
().
stream
().
anyMatch
(
ox
->
ox
.
getTypeName
().
equals
(
c
.
getTypeName
()));
if
(!
found
)
{
currentHeader
.
getClassifications
().
add
(
c
);
}
else
{
displayCrLf
(
"Ignoring: "
+
c
.
toString
());
LOG
.
warn
(
"Ignoring: {}"
,
AtlasJson
.
toJson
(
c
));
}
}
}
displayCrLf
(
"Processing: "
+
qualifiedName
);
}
displayCrLf
(
"Processed: "
+
qualifiedNameHeaderMap
.
size
());
headers
.
setGuidHeaderMap
(
qualifiedNameHeaderMap
);
return
headers
;
}
private
String
getQualifiedName
(
AtlasEntityHeader
header
)
{
return
(
String
)
header
.
getAttribute
(
ATTR_NAME_QUALIFIED_NAME
);
}
private
List
<
AtlasClassificationDef
>
getAllClassificationsDefs
()
throws
Exception
{
MultivaluedMap
<
String
,
String
>
searchParams
=
new
MultivaluedMapImpl
();
searchParams
.
add
(
SearchFilter
.
PARAM_TYPE
,
"CLASSIFICATION"
);
SearchFilter
searchFilter
=
new
SearchFilter
(
searchParams
);
AtlasTypesDef
typesDef
=
atlasClientV2
.
getAllTypeDefs
(
searchFilter
);
displayCrLf
(
"Found classifications: "
+
typesDef
.
getClassificationDefs
().
size
());
return
typesDef
.
getClassificationDefs
();
}
}
private
static
class
Updater
{
private
AtlasClientV2
atlasClientV2
;
public
Updater
(
AtlasClientV2
atlasClientV2
)
{
this
.
atlasClientV2
=
atlasClientV2
;
}
public
void
run
(
String
basePath
)
throws
Exception
{
if
(!
fileCheck
(
basePath
,
filesToUse
,
true
))
return
;
displayCrLf
(
"Starting..."
);
readAndCreateOrUpdateClassificationDefs
(
basePath
,
FILE_CLASSIFICATION_DEFS
);
readEntityUpdates
(
basePath
,
FILE_ENTITY_HEADERS
);
displayCrLf
(
"Done!"
);
}
private
void
readEntityUpdates
(
String
basePath
,
String
fileName
)
throws
IOException
{
BufferedReader
bufferedReader
=
null
;
try
{
bufferedReader
=
getBufferedReader
(
basePath
,
fileName
);
String
json
=
bufferedReader
.
readLine
();
if
(
StringUtils
.
isEmpty
(
json
))
{
displayCrLf
(
"Empty file encountered: %s"
,
fileName
);
return
;
}
AtlasEntityHeaders
response
=
AtlasType
.
fromJson
(
json
,
AtlasEntityHeaders
.
class
);
displayCrLf
(
"Found :"
+
response
.
getGuidHeaderMap
().
size
());
String
output
=
atlasClientV2
.
setClassifications
(
response
);
displayCrLf
(
output
);
}
catch
(
AtlasServiceException
e
)
{
displayCrLf
(
"Error updating. Please see log for details."
);
LOG
.
error
(
"Error updating. {}"
,
e
);
}
finally
{
closeReader
(
bufferedReader
);
}
}
private
void
readAndCreateOrUpdateClassificationDefs
(
String
basePath
,
String
fileName
)
throws
Exception
{
BufferedReader
bufferedReader
=
null
;
try
{
bufferedReader
=
getBufferedReader
(
basePath
,
fileName
);
for
(
String
cd
;
(
cd
=
bufferedReader
.
readLine
())
!=
null
;
)
{
AtlasClassificationDef
classificationDef
=
AtlasType
.
fromJson
(
cd
,
AtlasClassificationDef
.
class
);
createOrUpdateClassification
(
classificationDef
);
}
}
finally
{
closeReader
(
bufferedReader
);
}
}
private
void
createOrUpdateClassification
(
AtlasClassificationDef
classificationDef
)
{
String
name
=
classificationDef
.
getName
();
AtlasTypesDef
typesDef
=
new
AtlasTypesDef
(
null
,
null
,
Collections
.
singletonList
(
classificationDef
),
null
,
null
);
try
{
display
(
"%s -> "
,
name
);
atlasClientV2
.
createAtlasTypeDefs
(
typesDef
);
displayCrLf
(
" [Done]"
);
}
catch
(
AtlasServiceException
e
)
{
LOG
.
error
(
"{} skipped!"
,
name
,
e
);
displayCrLf
(
" [Skipped]"
,
name
);
updateClassification
(
classificationDef
);
}
}
private
void
updateClassification
(
AtlasClassificationDef
classificationDef
)
{
String
name
=
classificationDef
.
getName
();
AtlasTypesDef
typesDef
=
new
AtlasTypesDef
(
null
,
null
,
Collections
.
singletonList
(
classificationDef
),
null
,
null
);
try
{
display
(
"Update: %s -> "
,
name
);
atlasClientV2
.
updateAtlasTypeDefs
(
typesDef
);
displayCrLf
(
" [Done]"
);
}
catch
(
AtlasServiceException
e
)
{
LOG
.
error
(
"{} skipped!"
,
name
,
e
);
displayCrLf
(
" [Skipped]"
,
name
);
}
}
}
}
tools/classification-updater/src/main/resources/atlas-log4j.xml
0 → 100644
View file @
4985b9f2
<?xml version="1.0" encoding="UTF-8" ?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
<log4j:configuration
xmlns:log4j=
"http://jakarta.apache.org/log4j/"
>
<appender
name=
"FILE"
class=
"org.apache.log4j.RollingFileAppender"
>
<param
name=
"File"
value=
"/var/log/atlas/classification-updater.log"
/>
<param
name=
"Append"
value=
"true"
/>
<param
name=
"maxFileSize"
value=
"100MB"
/>
<param
name=
"maxBackupIndex"
value=
"20"
/>
<layout
class=
"org.apache.log4j.PatternLayout"
>
<param
name=
"ConversionPattern"
value=
"%d %-5p - [%t:%x] ~ %m (%C{1}:%L)%n"
/>
</layout>
</appender>
<logger
name=
"org.apache.atlas.tools.BulkFetchAndUpdate"
additivity=
"false"
>
<level
value=
"info"
/>
<appender-ref
ref=
"FILE"
/>
</logger>
<root>
<priority
value=
"warn"
/>
<appender-ref
ref=
"FILE"
/>
</root>
</log4j:configuration>
tools/classification-updater/src/main/resources/update-classifications.sh
0 → 100644
View file @
4985b9f2
#!/bin/bash
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License. See accompanying LICENSE file.
#
# resolve links - $0 may be a softlink
PRG
=
"
${
0
}
"
[[
`
uname
-s
`
==
*
"CYGWIN"
*
]]
&&
CYGWIN
=
true
while
[
-h
"
${
PRG
}
"
]
;
do
ls
=
`
ls
-ld
"
${
PRG
}
"
`
link
=
`
expr
"
$ls
"
:
'.*-> \(.*\)$'
`
if
expr
"
$link
"
:
'/.*'
>
/dev/null
;
then
PRG
=
"
$link
"
else
PRG
=
`
dirname
"
${
PRG
}
"
`
/
"
$link
"
fi
done
BASEDIR
=
`
dirname
${
PRG
}
`
if
test
-z
"
${
JAVA_HOME
}
"
then
JAVA_BIN
=
`
which java
`
JAR_BIN
=
`
which jar
`
else
JAVA_BIN
=
"
${
JAVA_HOME
}
/bin/java"
JAR_BIN
=
"
${
JAVA_HOME
}
/bin/jar"
fi
export
JAVA_BIN
if
[
!
-e
"
${
JAVA_BIN
}
"
]
||
[
!
-e
"
${
JAR_BIN
}
"
]
;
then
echo
"
$JAVA_BIN
and/or
$JAR_BIN
not found on the system. Please make sure java and jar commands are available."
exit
1
fi
# Construct ATLAS_CONF where atlas-properties reside
export
ATLAS_CONF
=
/usr/hdp/current/atlas-server/conf/
# log dir for applications
ATLAS_LOG_DIR
=
"/var/log/atlas"
ATLAS_LOG_FILE
=
"classification-updater.log"
LOG_CONFIG
=
"
${
BASEDIR
}
/atlas-log4j.xml"
# Construct Atlas classpath.
for
i
in
"/usr/hdp/current/atlas-server/server/webapp/atlas/WEB-INF/lib/"
*
.jar
;
do
ATLASCPPATH
=
"
${
ATLASCPPATH
}
:
$i
"
done
for
i
in
"
${
BASEDIR
}
/"
*
.jar
;
do
ATLASCPPATH
=
"
${
ATLASCPPATH
}
:
$i
"
done
echo
"Logging:
${
ATLAS_LOG_DIR
}
/
${
ATLAS_LOG_FILE
}
"
echo
"Log config:
${
LOG_CONFIG
}
"
TIME
=
`
date
+%Y%m%d%H%M%s
`
CP
=
"
${
ATLASCPPATH
}
:
${
ATLAS_CONF
}
"
# If running in cygwin, convert pathnames and classpath to Windows format.
if
[
"
${
CYGWIN
}
"
==
"true"
]
then
ATLAS_LOG_DIR
=
`
cygpath
-w
${
ATLAS_LOG_DIR
}
`
ATLAS_LOG_FILE
=
`
cygpath
-w
${
ATLAS_LOG_FILE
}
`
CP
=
`
cygpath
-w
-p
${
CP
}
`
fi
JAVA_PROPERTIES
=
"
$ATLAS_OPTS
-Datlas.log.dir=
$ATLAS_LOG_DIR
-Datlas.log.file=
$ATLAS_LOG_FILE
-Dlog4j.configuration=file://
$LOG_CONFIG
"
IMPORT_ARGS
=
$@
JVM_ARGS
=
JAVA_PROPERTIES
=
"
${
JAVA_PROPERTIES
}
${
JVM_ARGS
}
"
"
${
JAVA_BIN
}
"
${
JAVA_PROPERTIES
}
-cp
"
${
CP
}
"
org.apache.atlas.tools.BulkFetchAndUpdate
$IMPORT_ARGS
RETVAL
=
$?
[
$RETVAL
-eq
0
]
&&
echo
Done!
[
$RETVAL
-ne
0
]
&&
echo
Failed!
exit
$RETVAL
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment