Commit 1bfda02a by Shwetha GS

ATLAS-58 Make hive hook reliable (shwethags)

parent 016e36c0
......@@ -33,7 +33,7 @@
<packaging>jar</packaging>
<properties>
<hive.version>1.2.0</hive.version>
<hive.version>1.2.1</hive.version>
<calcite.version>0.9.2-incubating</calcite.version>
</properties>
......@@ -98,6 +98,11 @@
<artifactId>atlas-client</artifactId>
</dependency>
<dependency>
<groupId>org.apache.atlas</groupId>
<artifactId>atlas-notification</artifactId>
</dependency>
<!-- to bring up atlas server for integration tests -->
<dependency>
<groupId>org.apache.atlas</groupId>
......@@ -149,7 +154,7 @@
</configuration>
</execution>
<execution>
<id>copy</id>
<id>copy-hook</id>
<phase>package</phase>
<goals>
<goal>copy</goal>
......@@ -209,6 +214,16 @@
<version>${project.version}</version>
</artifactItem>
<artifactItem>
<groupId>${project.groupId}</groupId>
<artifactId>atlas-notification</artifactId>
<version>${project.version}</version>
</artifactItem>
<artifactItem>
<groupId>${project.groupId}</groupId>
<artifactId>atlas-common</artifactId>
<version>${project.version}</version>
</artifactItem>
<artifactItem>
<groupId>org.scala-lang</groupId>
<artifactId>scala-compiler</artifactId>
<version>${scala.version}</version>
......@@ -228,6 +243,21 @@
<artifactId>scalap</artifactId>
<version>${scala.version}</version>
</artifactItem>
<artifactItem>
<groupId>com.google.inject.extensions</groupId>
<artifactId>guice-multibindings</artifactId>
<version>${guice.version}</version>
</artifactItem>
<artifactItem>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_${scala.binary.version}</artifactId>
<version>${kafka.version}</version>
</artifactItem>
<artifactItem>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</artifactItem>
</artifactItems>
</configuration>
</execution>
......@@ -253,12 +283,16 @@
<useTestScope>true</useTestScope>
<systemProperties>
<systemProperty>
<name>log4j.configuration</name>
<value>atlas-log4j.xml</value>
</systemProperty>
<systemProperty>
<name>atlas.log.dir</name>
<value>${project.build.directory}/logs</value>
</systemProperty>
</systemProperties>
<stopKey>atlas-stop</stopKey>
<stopPort>41001</stopPort>
<stopPort>21001</stopPort>
</configuration>
<executions>
<execution>
......@@ -302,7 +336,6 @@
<configuration>
<generateProjectInfo>false</generateProjectInfo>
<generateReports>false</generateReports>
<skip>false</skip>
</configuration>
</plugin>
</plugins>
......
......@@ -67,7 +67,7 @@ done
METADATA_LOG_DIR="${METADATA_LOG_DIR:-$BASEDIR/logs}"
export METADATA_LOG_DIR
JAVA_PROPERTIES="$METADATA_OPTS -Datlas.log.dir=$METADATA_LOG_DIR -Datlas.log.file=import-hive.log"
JAVA_PROPERTIES="$METADATA_OPTS -Datlas.log.dir=$METADATA_LOG_DIR -Datlas.log.file=import-hive.log -Dlog4j.configuration=atlas-log4j.xml"
shift
while [[ ${1} =~ ^\-D ]]; do
......
......@@ -64,11 +64,6 @@
</dependency>
<dependency>
<groupId>commons-configuration</groupId>
<artifactId>commons-configuration</artifactId>
</dependency>
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
</dependency>
......
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>apache-atlas</artifactId>
<groupId>org.apache.atlas</groupId>
<version>0.6-incubating-SNAPSHOT</version>
</parent>
<artifactId>atlas-common</artifactId>
<description>Apache Atlas Common</description>
<name>Apache Atlas Common</name>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.atlas</groupId>
<artifactId>atlas-typesystem</artifactId>
</dependency>
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
</dependency>
</dependencies>
</project>
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.service;
import org.apache.atlas.AtlasException;
/**
* Service interface to start any background jobs
*/
public interface Service {
void start() throws AtlasException;
void stop();
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.service;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Set;
/**
* Utility for starting and stopping all services
*/
@Singleton
public class Services {
public static final Logger LOG = LoggerFactory.getLogger(Services.class);
private final Set<Service> services;
@Inject
public Services(Set<Service> services) {
this.services = services;
}
public void start() {
try {
for (Service service : services) {
LOG.debug("Starting service {}", service.getClass().getName());
service.start();
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public void stop() {
for (Service service : services) {
LOG.debug("Stopping service {}", service.getClass().getName());
service.stop();
}
}
}
......@@ -19,7 +19,7 @@
'use strict';
angular.module('dgc.details').factory('DetailsResource', ['$resource', function($resource) {
return $resource('/api/atlas/entities/:id', {}, {
return $resource('/api/atlas/entity/:id', {}, {
get: {
method: 'GET',
transformResponse: function(data) {
......
......@@ -43,7 +43,7 @@ angular.module('dgc.search').controller('SearchController', ['$scope', '$locatio
}, function searchSuccess(response) {
$scope.resultCount = response.count;
$scope.results = response.results;
$scope.resultRows = $scope.results.rows;
$scope.resultRows = $scope.results;
$scope.totalItems = $scope.resultCount;
$scope.transformedResults = {};
$scope.dataTransitioned = false;
......@@ -59,7 +59,7 @@ angular.module('dgc.search').controller('SearchController', ['$scope', '$locatio
} else {
$scope.transformedResults = $scope.resultRows;
}
if ($scope.results.rows)
if ($scope.results)
$scope.searchMessage = $scope.resultCount + ' results matching your search query ' + $scope.query + ' were found';
else
$scope.searchMessage = '0 results matching your search query ' + $scope.query + ' were found';
......
......@@ -57,6 +57,35 @@
</python.path.l>
</properties>
</profile>
<profile>
<id>dist</id>
<activation>
<activeByDefault>false</activeByDefault>
</activation>
<build>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>single</goal>
</goals>
<phase>package</phase>
<configuration>
<descriptors>
<descriptor>src/main/assemblies/standalone-package.xml</descriptor>
<descriptor>src/main/assemblies/src-package.xml</descriptor>
</descriptors>
<finalName>apache-atlas-${project.version}</finalName>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
</profiles>
<build>
......@@ -69,6 +98,7 @@
<executions>
<execution>
<configuration>
<skip>${skipTests}</skip>
<executable>python</executable>
<workingDirectory>src/test/python</workingDirectory>
<arguments>
......@@ -86,26 +116,6 @@
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<inherited>false</inherited>
<configuration>
<descriptors>
<descriptor>src/main/assemblies/standalone-package.xml</descriptor>
<descriptor>src/main/assemblies/src-package.xml</descriptor>
</descriptors>
<finalName>apache-atlas-${project.version}</finalName>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
......
......@@ -24,7 +24,7 @@ import atlas_config as mc
METADATA_LOG_OPTS="-Datlas.log.dir=%s -Datlas.log.file=application.log"
METADATA_COMMAND_OPTS="-Datlas.home=%s"
METADATA_CONFIG_OPTS="-Datlas.conf=%s"
DEFAULT_JVM_OPTS="-Xmx1024m -Dlog4j.configuration=atlas-log4j.xml"
DEFAULT_JVM_OPTS="-Xmx1024m -XX:MaxPermSize=512m -Dlog4j.configuration=atlas-log4j.xml"
def main():
......
......@@ -45,9 +45,16 @@ atlas.graph.index.search.elasticsearch.client-only=false
atlas.graph.index.search.elasticsearch.local-mode=true
atlas.graph.index.search.elasticsearch.create.sleep=2000
######### Notification Configs #########
atlas.notification.embedded=true
atlas.notification.kafka.data=${sys:atlas.home}/data/kafka
atlas.kafka.data=${sys:atlas.home}/data/kafka
atlas.kafka.zookeeper.connect=localhost:9026
atlas.kafka.bootstrap.servers=localhost:9027
atlas.kafka.zookeeper.session.timeout.ms=400
atlas.kafka.zookeeper.sync.time.ms=20
atlas.kafka.auto.commit.interval.ms=1000
######### Hive Lineage Configs #########
# This model reflects the base super types for Data and Process
......
......@@ -50,13 +50,6 @@
</fileSet>
<fileSet>
<directory>../addons/hive-bridge/src/bin</directory>
<outputDirectory>bin</outputDirectory>
<fileMode>0755</fileMode>
<directoryMode>0755</directoryMode>
</fileSet>
<fileSet>
<directory>../logs</directory>
<outputDirectory>logs</outputDirectory>
<directoryMode>0777</directoryMode>
......@@ -85,6 +78,13 @@
<!-- addons/hive -->
<fileSet>
<directory>../addons/hive-bridge/src/bin</directory>
<outputDirectory>bin</outputDirectory>
<fileMode>0755</fileMode>
<directoryMode>0755</directoryMode>
</fileSet>
<fileSet>
<directory>../addons/hive-bridge/target/dependency/bridge</directory>
<outputDirectory>bridge</outputDirectory>
</fileSet>
......@@ -93,12 +93,6 @@
<directory>../addons/hive-bridge/target/dependency/hook</directory>
<outputDirectory>hook</outputDirectory>
</fileSet>
<fileSet>
<directory>../addons/hive-bridge/target/site</directory>
<outputDirectory>docs/hive</outputDirectory>
</fileSet>
</fileSets>
<files>
......
......@@ -51,13 +51,13 @@ class TestMetadata(unittest.TestCase):
'org.apache.atlas.Main',
['-app', 'metadata_home/server/webapp/atlas'],
'metadata_home/conf:metadata_home/server/webapp/atlas/WEB-INF/classes:metadata_home/server/webapp/atlas/WEB-INF/lib\\*:metadata_home/libext\\*',
['-Datlas.log.dir=metadata_home/logs', '-Datlas.log.file=application.log', '-Datlas.home=metadata_home', '-Datlas.conf=metadata_home/conf', '-Xmx1024m', '-Dlog4j.configuration=atlas-log4j.xml'], 'metadata_home/logs')
['-Datlas.log.dir=metadata_home/logs', '-Datlas.log.file=application.log', '-Datlas.home=metadata_home', '-Datlas.conf=metadata_home/conf', '-Xmx1024m', '-XX:MaxPermSize=512m', '-Dlog4j.configuration=atlas-log4j.xml'], 'metadata_home/logs')
else:
java_mock.assert_called_with(
'org.apache.atlas.Main',
['-app', 'metadata_home/server/webapp/atlas'],
'metadata_home/conf:metadata_home/server/webapp/atlas/WEB-INF/classes:metadata_home/server/webapp/atlas/WEB-INF/lib/*:metadata_home/libext/*',
['-Datlas.log.dir=metadata_home/logs', '-Datlas.log.file=application.log', '-Datlas.home=metadata_home', '-Datlas.conf=metadata_home/conf', '-Xmx1024m', '-Dlog4j.configuration=atlas-log4j.xml'], 'metadata_home/logs')
['-Datlas.log.dir=metadata_home/logs', '-Datlas.log.file=application.log', '-Datlas.home=metadata_home', '-Datlas.conf=metadata_home/conf', '-Xmx1024m', '-XX:MaxPermSize=512m', '-Dlog4j.configuration=atlas-log4j.xml'], 'metadata_home/logs')
pass
def test_jar_java_lookups_fail(self):
......
......@@ -40,18 +40,12 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-site-plugin</artifactId>
<version>3.3</version>
<dependencies>
<dependency>
<groupId>org.apache.maven.doxia</groupId>
<artifactId>doxia-module-twiki</artifactId>
<version>1.3</version>
</dependency>
<dependency>
<groupId>org.apache.maven.wagon</groupId>
<artifactId>wagon-ssh-external</artifactId>
<version>2.6</version>
</dependency>
</dependencies>
<executions>
<execution>
......@@ -62,45 +56,28 @@
</execution>
</executions>
<configuration>
<skip>${skipDocs}</skip>
<reportPlugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-project-info-reports-plugin</artifactId>
<version>2.3</version>
<reportSets>
<reportSet>
<reports>
<report>index</report>
<report>project-team</report>
<report>mailing-list</report>
<report>issue-tracking</report>
<report>license</report>
<report>scm</report>
</reports>
</reportSet>
</reportSets>
<configuration>
<dependencyDetailsEnabled>false</dependencyDetailsEnabled>
<dependencyLocationsEnabled>false</dependencyLocationsEnabled>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<version>2.7</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jxr-plugin</artifactId>
<version>2.1</version>
<configuration>
<aggregate>true</aggregate>
</configuration>
</plugin>
</reportPlugins>
<generateProjectInfo>false</generateProjectInfo>
<generateReports>false</generateReports>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-project-info-reports-plugin</artifactId>
<version>2.8.1</version>
<executions>
<execution>
<goals>
<goal>project-team</goal>
<goal>mailing-list</goal>
<goal>cim</goal>
<goal>issue-tracking</goal>
<goal>license</goal>
<goal>scm</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
......@@ -17,7 +17,7 @@
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" name="Metadata and Governance"
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" name="Apache Atlas"
xmlns="http://maven.apache.org/DECORATION/1.3.0"
xsi:schemaLocation="http://maven.apache.org/DECORATION/1.3.0 http://maven.apache.org/xsd/decoration-1.3.0.xsd">
......@@ -100,21 +100,10 @@
<menu name="Documentation">
<!-- current points to latest release -->
<item name="current" href="./0.5.0-incubating/index.html"/>
<item name="current" href="./index.html"/>
<item name="0.5-incubating" href="./0.5.0-incubating/index.html"/>
</menu>
<menu name="Resources">
<item name="Overview" href="index.html"/>
<item name="Getting Started" href="./QuickStart.html"/>
<item name="Architecture" href="./Architecture.html"/>
<item name="Installation" href="./InstallationSteps.html"/>
<item name="Type System" href="./TypeSystem.html"/>
<item name="Configuration" href="./Configuration.html"/>
<item name="Security" href="./Security.html"/>
<item name="Rest API" href="./api/rest.html"/>
</menu>
<menu name="ASF">
<item name="How Apache Works" href="http://www.apache.org/foundation/how-it-works.html"/>
<item name="Foundation" href="http://www.apache.org/foundation/"/>
......
......@@ -2,5 +2,30 @@
---++ Introduction
---++ Metadata High Level Architecture - Overview
<img src="architecture.png" height="400" width="600" />
---++ Atlas High Level Architecture - Overview
<img src="images/twiki/architecture.png" height="400" width="600" />
---++ Bridges
External components like hive/sqoop/storm/falcon should model their taxonomy using the typesystem and register the types with Atlas. For every entity created in the external component, the corresponding entity should be registered in Atlas as well.
This is typically done in a hook which runs in the external component and is called for every entity operation. The hook generally processes the entity asynchronously using a thread pool, to avoid adding latency to the main operation.
The hook can then build the entity and register it using the Atlas REST APIs. However, any API failure, because of a network issue for example, can result in the entity not being registered in Atlas and hence in inconsistent metadata.
Atlas therefore exposes a notification interface that hooks can also use for reliable entity registration. The hook sends a notification message containing the list of entities to be registered; the Atlas service runs a hook consumer that listens to these messages and registers the entities, as sketched below.
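A minimal hook-side sketch (assuming a Guice-injected NotificationInterface as wired up by NotificationModule later in this change; the registerEntity helper and entityJson argument are illustrative):
<verbatim>
@Inject
private NotificationInterface notificationInterface;

// entityJson is the serialized entity to be registered; HOOK messages are
// picked up by NotificationHookConsumer on the Atlas server
void registerEntity(String entityJson) throws NotificationException {
    notificationInterface.send(NotificationInterface.NotificationType.HOOK, entityJson);
}
</verbatim>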
Available bridges are:
* [[hive/Bridge-Hive][Hive Bridge]]
---++ Notification
Notification is used for reliable entity registration from hooks and for entity/type change notifications. Atlas provides Kafka integration by default, but it is possible to plug in other implementations as well. The Atlas service starts an embedded Kafka server by default.
Atlas also provides NotificationHookConsumer, which runs in the Atlas service, listens to messages from the hooks and registers the entities in Atlas.
<img src="images/twiki/notification.png" height="100" width="150" />
---+ Hive Atlas Bridge
Hive metadata can be modelled in Atlas using its Type System. The default modelling is available in org.apache.atlas.hive.model.HiveDataModelGenerator. It defines the following types:
* hive_resource_type(EnumType) - [JAR, FILE, ARCHIVE]
* hive_principal_type(EnumType) - [USER, ROLE, GROUP]
* hive_function_type(EnumType) - [JAVA]
* hive_order(StructType) - [col, order]
* hive_resourceuri(StructType) - [resourceType, uri]
* hive_serde(StructType) - [name, serializationLib, parameters]
* hive_process(ClassType) - [name, startTime, endTime, userName, sourceTableNames, targetTableNames, queryText, queryPlan, queryId, queryGraph]
* hive_function(ClassType) - [functionName, dbName, className, ownerName, ownerType, createTime, functionType, resourceUris]
* hive_type(ClassType) - [name, type1, type2, fields]
* hive_partition(ClassType) - [values, dbName, tableName, createTime, lastAccessTime, sd, parameters]
* hive_storagedesc(ClassType) - [cols, location, inputFormat, outputFormat, compressed, numBuckets, serdeInfo, bucketCols, sortCols, parameters, storedAsSubDirectories]
* hive_index(ClassType) - [indexName, indexHandlerClass, dbName, createTime, lastAccessTime, origTableName, indexTableName, sd, parameters, deferredRebuild]
* hive_role(ClassType) - [roleName, createTime, ownerName]
* hive_column(ClassType) - [name, type, comment]
* hive_db(ClassType) - [name, description, locationUri, parameters, ownerName, ownerType]
* hive_table(ClassType) - [name, dbName, owner, createTime, lastAccessTime, retention, sd, partitionKeys, columns, parameters, viewOriginalText, viewExpandedText, tableType, temporary]
---++ Hive Model
The default hive modelling is available in org.apache.atlas.hive.model.HiveDataModelGenerator. It defines the following types:
<verbatim>
hive_object_type(EnumType) - values [GLOBAL, DATABASE, TABLE, PARTITION, COLUMN]
hive_resource_type(EnumType) - values [JAR, FILE, ARCHIVE]
hive_principal_type(EnumType) - values [USER, ROLE, GROUP]
hive_db(ClassType) - super types [Referenceable] - attributes [name, clusterName, description, locationUri, parameters, ownerName, ownerType]
hive_order(StructType) - attributes [col, order]
hive_resourceuri(StructType) - attributes [resourceType, uri]
hive_serde(StructType) - attributes [name, serializationLib, parameters]
hive_type(ClassType) - super types [] - attributes [name, type1, type2, fields]
hive_storagedesc(ClassType) - super types [Referenceable] - attributes [cols, location, inputFormat, outputFormat, compressed, numBuckets, serdeInfo, bucketCols, sortCols, parameters, storedAsSubDirectories]
hive_role(ClassType) - super types [] - attributes [roleName, createTime, ownerName]
hive_column(ClassType) - super types [Referenceable] - attributes [name, type, comment]
hive_table(ClassType) - super types [DataSet] - attributes [tableName, db, owner, createTime, lastAccessTime, comment, retention, sd, partitionKeys, columns, parameters, viewOriginalText, viewExpandedText, tableType, temporary]
hive_partition(ClassType) - super types [Referenceable] - attributes [values, table, createTime, lastAccessTime, sd, columns, parameters]
hive_process(ClassType) - super types [Process] - attributes [startTime, endTime, userName, operationType, queryText, queryPlan, queryId, queryGraph]
</verbatim>
The entities are created and de-duplicated using a unique qualified name, which provides a namespace and can be used for querying as well (see the example query after this list):
hive_db - attribute qualifiedName - clustername.dbname
hive_table - attribute name - clustername.dbname.tablename
hive_partition - attribute qualifiedName - clustername.dbname.tablename.partitionvalues
hive_process - attribute qualifiedName - queryText
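For example, a table could be looked up through a DSL query on its unique name (the cluster, database and table names here are illustrative):
<verbatim>
hive_table where name="primary.sales.customers"
</verbatim>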
---++ Importing Hive Metadata
org.apache.atlas.hive.bridge.HiveMetaStoreBridge imports the hive metadata into Atlas using the typesystem defined in org.apache.atlas.hive.model.HiveDataModelGenerator. import-hive.sh command can be used to facilitate this.
Set-up the following configs in hive-site.xml of your hive set-up and set environment variable HIVE_CONFIG to the
hive conf directory:
org.apache.atlas.hive.bridge.HiveMetaStoreBridge imports the hive metadata into Atlas using the model defined in org.apache.atlas.hive.model.HiveDataModelGenerator. The import-hive.sh command can be used to facilitate this.
Set up the following configs in hive-site.xml of your hive set-up, and set the environment variable HIVE_CONFIG to the hive conf directory:
* Atlas endpoint - Add the following property with the Atlas endpoint for your set-up
<verbatim>
<property>
......@@ -34,20 +41,20 @@ hive conf directory:
</property>
</verbatim>
Usage: <dgi package>/bin/import-hive.sh. The logs are in <dgi package>/logs/import-hive.log
Usage: <atlas package>/bin/hive/import-hive.sh. The logs are in <atlas package>/logs/import-hive.log
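An illustrative invocation (the conf path is a placeholder for your set-up):
<verbatim>
export HIVE_CONFIG=/etc/hive/conf
<atlas package>/bin/hive/import-hive.sh
</verbatim>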
---++ Hive Hook
Hive supports listeners on hive command execution using hive hooks. This is used to add/update/remove entities in Atlas using the model defined in org.apache.atlas.hive.model.HiveDataModelGenerator.
The hook submits the request to a thread pool executor to avoid blocking the command execution. Follow the these instructions in your hive set-up to add hive hook for Atlas:
* Add org.apache.atlas.hive.hook.HiveHook as post execution hook in hive-site.xml
The hook submits the request to a thread pool executor to avoid blocking the command execution. The thread submits the entities as a message to the notification server, and the Atlas server reads these messages and registers the entities.
Follow these instructions in your hive set-up to add hive hook for Atlas:
* Set-up atlas hook and atlas endpoint in hive-site.xml:
<verbatim>
<property>
<name>hive.exec.post.hooks</name>
<value>org.apache.atlas.hive.hook.HiveHook</value>
</property>
</verbatim>
* Add the following properties in hive-site.xml with the Atlas endpoint for your set-up
<verbatim>
<property>
<name>atlas.rest.address</name>
......@@ -58,15 +65,19 @@ The hook submits the request to a thread pool executor to avoid blocking the com
<value>primary</value>
</property>
</verbatim>
* Add 'export HIVE_AUX_JARS_PATH=<dgi package>/hook/hive' in hive-env.sh
* Add 'export HIVE_AUX_JARS_PATH=<atlas package>/hook/hive' in hive-env.sh
* Copy <atlas package>/conf/application.properties to hive conf directory <hive package>/conf
The following properties in hive-site.xml control the thread pool details:
The following properties in hive-site.xml control the thread pool and notification details:
* atlas.hook.hive.synchronous - boolean, true to run the hook synchronously. default false
* atlas.hook.hive.numRetries - number of retries for notification failure. default 3
* atlas.hook.hive.minThreads - core number of threads. default 5
* atlas.hook.hive.maxThreads - maximum number of threads. default 5
* atlas.hook.hive.keepAliveTime - keep alive time in msecs. default 10
* atlas.hook.hive.synchronous - boolean, true to run the hook synchronously. default false
Refer to [[Configuration][Configuration]] for notification-related configuration.
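For example, to run the hook synchronously and with more retries, hive-site.xml could include (the values shown are illustrative):
<verbatim>
<property>
  <name>atlas.hook.hive.synchronous</name>
  <value>true</value>
</property>
<property>
  <name>atlas.hook.hive.numRetries</name>
  <value>5</value>
</property>
</verbatim>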
---++ Limitations
* Since database name, table name and column names are case insensitive in hive, the corresponding names in entities are lowercase. So, any search APIs should use lowercase while querying on the entity names
* Only the following hive operations are captured by hive hook currently - create database, create table, create view, CTAS, load, import, export, query, alter table rename and alter view rename
* Only the following hive operations are captured by hive hook currently - create database, create table, create view, CTAS, load, import, export, query, alter table rename and alter view rename
\ No newline at end of file
......@@ -49,6 +49,26 @@ atlas.lineage.hive.process.outputs.name=outputs
atlas.lineage.hive.table.schema.query=hive_table where name=?, columns
</verbatim>
---+++ Notification Configs
Refer to http://kafka.apache.org/documentation.html#configuration for Kafka configuration. All Kafka configs should be prefixed with 'atlas.kafka.'
<verbatim>
atlas.notification.embedded=true
atlas.kafka.data=${sys:atlas.home}/data/kafka
atlas.kafka.zookeeper.connect=localhost:9026
atlas.kafka.bootstrap.servers=localhost:9027
atlas.kafka.zookeeper.session.timeout.ms=400
atlas.kafka.zookeeper.sync.time.ms=20
atlas.kafka.auto.commit.interval.ms=1000
</verbatim>
---+++ Client Configs
<verbatim>
atlas.client.readTimeoutMSecs=60000
atlas.client.connectTimeoutMSecs=60000
</verbatim>
---+++ Security Properties
---++++ SSL config
......
......@@ -14,7 +14,7 @@ Once the build successfully completes, artifacts can be packaged for deployment.
<verbatim>
mvn clean package -DskipTests -DskipCheck=true
mvn clean package -Pdist
</verbatim>
......
......@@ -8,12 +8,12 @@ instance graph below.
---+++ Example Type Definitions
<img src="guide-class-diagram.png"/>
<img src="images/twiki/guide-class-diagram.png"/>
---+++ Example Instance Graph
<img src="guide-instance-graph.png"/>
<img src="images/twiki/guide-instance-graph.png"/>
---++ Running the example
......
......@@ -5,10 +5,10 @@
---++ Overview
---+++ Data Types Overview
<img src="data-types.png" height="400" width="600" />
<img src="images/twiki/data-types.png" height="400" width="600" />
---+++ Types Instances Overview
<img src="types-instances.png" height="400" width="600" />
<img src="images/twiki/types-instances.png" height="400" width="600" />
---++ Details
......
......@@ -47,6 +47,8 @@ allows integration with the whole enterprise data ecosystem.
* [[Search][Search]]
* [[security][security]]
* [[Configuration][Configuration]]
* Bridges
* [[Bridge-Hive][Hive Bridge]]
---++ API Documentation
......@@ -56,4 +58,4 @@ allows integration with the whole enterprise data ecosystem.
#LicenseInfo
---+ Licensing Information
Metadata (Atlas) is distributed under [[http://www.apache.org/licenses/LICENSE-2.0][Apache License 2.0]].
Atlas is distributed under [[http://www.apache.org/licenses/LICENSE-2.0][Apache License 2.0]].
......@@ -27,7 +27,7 @@
<version>0.6-incubating-SNAPSHOT</version>
</parent>
<artifactId>atlas-notification</artifactId>
<description>Apache Atlas Client</description>
<description>Apache Atlas Notification</description>
<name>Apache Atlas Notification</name>
<packaging>jar</packaging>
......@@ -39,6 +39,11 @@
<dependency>
<groupId>org.apache.atlas</groupId>
<artifactId>atlas-common</artifactId>
</dependency>
<dependency>
<groupId>org.apache.atlas</groupId>
<artifactId>atlas-typesystem</artifactId>
</dependency>
......@@ -53,8 +58,8 @@
</dependency>
<dependency>
<groupId>commons-configuration</groupId>
<artifactId>commons-configuration</artifactId>
<groupId>com.google.inject.extensions</groupId>
<artifactId>guice-multibindings</artifactId>
</dependency>
<dependency>
......
......@@ -30,6 +30,7 @@ import org.apache.atlas.AtlasException;
import org.apache.atlas.notification.NotificationConsumer;
import org.apache.atlas.notification.NotificationException;
import org.apache.atlas.notification.NotificationInterface;
import org.apache.atlas.service.Service;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationConverter;
import org.apache.kafka.clients.consumer.ConsumerConfig;
......@@ -49,6 +50,9 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
......@@ -57,13 +61,11 @@ import java.util.Properties;
import java.util.concurrent.Future;
@Singleton
public class KafkaNotification extends NotificationInterface {
public class KafkaNotification extends NotificationInterface implements Service {
public static final Logger LOG = LoggerFactory.getLogger(KafkaNotification.class);
public static final String PROPERTY_PREFIX = NotificationInterface.PROPERTY_PREFIX + ".kafka";
public static final String PROPERTY_PREFIX = "atlas.kafka";
private static final int ATLAS_ZK_PORT = 9026;
private static final int ATLAS_KAFKA_PORT = 9027;
private static final String ATLAS_KAFKA_DATA = "data";
public static final String ATLAS_HOOK_TOPIC = "ATLAS_HOOK";
......@@ -92,9 +94,8 @@ public class KafkaNotification extends NotificationInterface {
}
}
@Override
public void initialize(Configuration applicationProperties) throws AtlasException {
super.initialize(applicationProperties);
public KafkaNotification(Configuration applicationProperties) throws AtlasException {
super(applicationProperties);
Configuration subsetConfiguration =
ApplicationProperties.getSubsetConfiguration(applicationProperties, PROPERTY_PREFIX);
properties = ConfigurationConverter.getProperties(subsetConfiguration);
......@@ -118,42 +119,42 @@ public class KafkaNotification extends NotificationInterface {
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY, "roundrobin");
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "smallest");
if (isEmbedded()) {
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + ATLAS_KAFKA_PORT);
properties.setProperty("zookeeper.connect", "localhost:" + ATLAS_ZK_PORT);
}
//todo new APIs not available yet
// consumer = new KafkaConsumer(properties);
// consumer.subscribe(ATLAS_HOOK_TOPIC);
}
@Override
protected void _startService() throws IOException {
startZk();
startKafka();
private URL getURL(String url) throws MalformedURLException {
try {
return new URL(url);
} catch(MalformedURLException e) {
return new URL("http://" + url);
}
}
private String startZk() throws IOException {
//todo read zk endpoint from config
this.factory = NIOServerCnxnFactory.createFactory(new InetSocketAddress("0.0.0.0", ATLAS_ZK_PORT), 1024);
private String startZk() throws IOException, InterruptedException, URISyntaxException {
String zkValue = properties.getProperty("zookeeper.connect");
LOG.debug("Starting zookeeper at {}", zkValue);
URL zkAddress = getURL(zkValue);
this.factory = NIOServerCnxnFactory.createFactory(
new InetSocketAddress(zkAddress.getHost(), zkAddress.getPort()), 1024);
File snapshotDir = constructDir("zk/txn");
File logDir = constructDir("zk/snap");
try {
factory.startup(new ZooKeeperServer(snapshotDir, logDir, 500));
} catch (InterruptedException e) {
throw new IOException(e);
}
factory.startup(new ZooKeeperServer(snapshotDir, logDir, 500));
return factory.getLocalAddress().getAddress().toString();
}
private void startKafka() {
private void startKafka() throws IOException, URISyntaxException {
String kafkaValue = properties.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG);
LOG.debug("Starting kafka at {}", kafkaValue);
URL kafkaAddress = getURL(kafkaValue);
Properties brokerConfig = properties;
brokerConfig.setProperty("broker.id", "1");
//todo read kafka endpoint from config
brokerConfig.setProperty("host.name", "0.0.0.0");
brokerConfig.setProperty("port", String.valueOf(ATLAS_KAFKA_PORT));
brokerConfig.setProperty("host.name", kafkaAddress.getHost());
brokerConfig.setProperty("port", String.valueOf(kafkaAddress.getPort()));
brokerConfig.setProperty("log.dirs", constructDir("kafka").getAbsolutePath());
brokerConfig.setProperty("log.flush.interval.messages", String.valueOf(1));
......@@ -162,6 +163,29 @@ public class KafkaNotification extends NotificationInterface {
LOG.debug("Embedded kafka server started with broker config {}", brokerConfig);
}
@Override
public void start() throws AtlasException {
if (isEmbedded()) {
try {
startZk();
startKafka();
} catch(Exception e) {
throw new AtlasException("Failed to start embedded kafka", e);
}
}
}
@Override
public void stop() {
if (kafkaServer != null) {
kafkaServer.shutdown();
}
if (factory != null) {
factory.shutdown();
}
}
private static class SystemTime implements Time {
@Override
public long milliseconds() {
......@@ -192,29 +216,6 @@ public class KafkaNotification extends NotificationInterface {
}
@Override
public void _shutdown() {
if (producer != null) {
producer.close();
}
if (consumer != null) {
consumer.close();
}
for (ConsumerConnector consumerConnector : consumerConnectors) {
consumerConnector.shutdown();
}
if (kafkaServer != null) {
kafkaServer.shutdown();
}
if (factory != null) {
factory.shutdown();
}
}
@Override
public List<NotificationConsumer> createConsumers(NotificationType type, int numConsumers) {
String topic = topicMap.get(type);
......@@ -261,6 +262,24 @@ public class KafkaNotification extends NotificationInterface {
}
}
@Override
public void close() {
if (producer != null) {
producer.close();
producer = null;
}
if (consumer != null) {
consumer.close();
consumer = null;
}
for (ConsumerConnector consumerConnector : consumerConnectors) {
consumerConnector.shutdown();
}
consumerConnectors.clear();
}
//New API, not used now
private List<String> receive(long timeout) throws NotificationException {
Map<String, ConsumerRecords> recordsMap = consumer.poll(timeout);
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.kafka;
import com.google.inject.Provider;
import com.google.inject.Provides;
import com.google.inject.Singleton;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasException;
import org.apache.commons.configuration.Configuration;
public class KafkaNotificationProvider implements Provider<KafkaNotification> {
@Override
@Provides
@Singleton
public KafkaNotification get() {
try {
Configuration applicationProperties = ApplicationProperties.get();
KafkaNotification instance = new KafkaNotification(applicationProperties);
return instance;
} catch(AtlasException e) {
throw new RuntimeException(e);
}
}
}
......@@ -18,37 +18,43 @@
package org.apache.atlas.notification;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasException;
import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.service.Service;
import org.apache.commons.configuration.Configuration;
import org.codehaus.jettison.json.JSONArray;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class NotificationHookConsumer {
/**
* Consumer of notifications from hooks, e.g., the hive hook
*/
@Singleton
public class NotificationHookConsumer implements Service {
private static final Logger LOG = LoggerFactory.getLogger(NotificationHookConsumer.class);
public static final String CONSUMER_THREADS_PROPERTY = "atlas.notification.hook.numthreads";
public static final String ATLAS_ENDPOINT_PROPERTY = "atlas.rest.address";
@Inject
private static NotificationInterface notificationInterface;
private static ExecutorService executors;
private static AtlasClient atlasClient;
private NotificationInterface notificationInterface;
private ExecutorService executors;
private AtlasClient atlasClient;
public static void start() throws AtlasException {
@Override
public void start() throws AtlasException {
Configuration applicationProperties = ApplicationProperties.get();
notificationInterface.initialize(applicationProperties);
String atlasEndpoint = applicationProperties.getString(ATLAS_ENDPOINT_PROPERTY, "http://localhost:21000");
atlasClient = new AtlasClient(atlasEndpoint);
int numThreads = applicationProperties.getInt(CONSUMER_THREADS_PROPERTY, 2);
int numThreads = applicationProperties.getInt(CONSUMER_THREADS_PROPERTY, 1);
List<NotificationConsumer> consumers =
notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, numThreads);
executors = Executors.newFixedThreadPool(consumers.size());
......@@ -58,12 +64,20 @@ public class NotificationHookConsumer {
}
}
public static void stop() {
notificationInterface.shutdown();
executors.shutdown();
@Override
public void stop() {
//Allow for completion of outstanding work
notificationInterface.close();
try {
if (executors != null && !executors.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
LOG.error("Timed out waiting for consumer threads to shut down, exiting uncleanly");
}
} catch (InterruptedException e) {
LOG.error("Failure in shutting down consumers");
}
}
static class HookConsumer implements Runnable {
class HookConsumer implements Runnable {
private final NotificationConsumer consumer;
public HookConsumer(NotificationConsumer consumerInterface) {
......@@ -74,12 +88,13 @@ public class NotificationHookConsumer {
public void run() {
while(consumer.hasNext()) {
String entityJson = consumer.next();
LOG.debug("Processing message {}", entityJson);
LOG.info("Processing message {}", entityJson);
try {
atlasClient.createEntity(entityJson);
} catch (AtlasServiceException e) {
JSONArray guids = atlasClient.createEntity(new JSONArray(entityJson));
LOG.info("Create entities with guid {}", guids);
} catch (Exception e) {
//todo handle failures
LOG.warn("Error handling message {}", entityJson);
LOG.warn("Error handling message {}", entityJson, e);
}
}
}
......
......@@ -20,7 +20,6 @@ package org.apache.atlas.notification;
import org.apache.atlas.AtlasException;
import org.apache.commons.configuration.Configuration;
import java.io.IOException;
import java.util.List;
public abstract class NotificationInterface {
......@@ -33,26 +32,11 @@ public abstract class NotificationInterface {
HOOK, ENTITIES, TYPES
}
/**
* Initialise
* @param applicationProperties
* @throws AtlasException
*/
public void initialize(Configuration applicationProperties) throws AtlasException {
public NotificationInterface(Configuration applicationProperties) throws AtlasException {
this.embedded = applicationProperties.getBoolean(PROPERTY_EMBEDDED, false);
}
/**
* Start embedded notification service on atlas server
* @throws IOException
*/
public final void startService() throws IOException {
if (embedded) {
_startService();
}
}
/**
* Is the notification service embedded in atlas server
* @return
*/
......@@ -60,18 +44,9 @@ public abstract class NotificationInterface {
return embedded;
}
protected abstract void _startService() throws IOException;
/**
* Shutdown - close all the connections
*/
public final void shutdown() {
_shutdown();
}
protected abstract void _shutdown();
public abstract List<NotificationConsumer> createConsumers(NotificationType type, int numConsumers);
public abstract void send(NotificationType type, String... messages) throws NotificationException;
public abstract void close();
}
......@@ -18,11 +18,21 @@
package org.apache.atlas.notification;
import com.google.inject.AbstractModule;
import com.google.inject.Singleton;
import com.google.inject.multibindings.Multibinder;
import org.apache.atlas.kafka.KafkaNotification;
import org.apache.atlas.kafka.KafkaNotificationProvider;
import org.apache.atlas.service.Service;
public class NotificationModule extends AbstractModule {
@Override
protected void configure() {
bind(NotificationInterface.class).to(KafkaNotification.class).asEagerSingleton();
bind(NotificationInterface.class).to(KafkaNotification.class).in(Singleton.class);
bind(KafkaNotification.class).toProvider(KafkaNotificationProvider.class).in(Singleton.class);
Multibinder<Service> serviceBinder = Multibinder.newSetBinder(binder(), Service.class);
serviceBinder.addBinding().to(KafkaNotification.class);
serviceBinder.addBinding().to(NotificationHookConsumer.class);
}
}
......@@ -18,12 +18,10 @@
package org.apache.atlas.kafka;
import com.google.inject.Inject;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasException;
import org.apache.atlas.notification.NotificationConsumer;
import org.apache.atlas.notification.NotificationInterface;
import org.apache.atlas.notification.NotificationModule;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.RandomStringUtils;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
......@@ -35,18 +33,15 @@ import org.testng.annotations.Test;
public class KafkaNotificationTest {
@Inject
private NotificationInterface kafka;
private KafkaNotification kafka;
@BeforeClass
public void setUp() throws Exception {
Configuration conf = ApplicationProperties.get();
conf.setProperty(KafkaNotification.PROPERTY_PREFIX + ".data", "target/data/kafka" + random());
kafka.initialize(conf);
kafka.startService();
kafka.start();
}
@Test
public void testSendMessage() throws AtlasException {
public void testSendReceiveMessage() throws AtlasException {
String msg1 = "message" + random();
String msg2 = "message" + random();
kafka.send(NotificationInterface.NotificationType.HOOK, msg1, msg2);
......@@ -63,6 +58,6 @@ public class KafkaNotificationTest {
@AfterClass
public void teardown() throws Exception {
kafka.shutdown();
kafka.stop();
}
}
......@@ -392,6 +392,7 @@
</profile>
</profiles>
<modules>
<module>common</module>
<module>typesystem</module>
<module>notification</module>
<module>client</module>
......@@ -972,6 +973,12 @@
<dependency>
<groupId>org.apache.atlas</groupId>
<artifactId>atlas-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.atlas</groupId>
<artifactId>atlas-webapp</artifactId>
<version>${project.version}</version>
<type>war</type>
......@@ -1298,7 +1305,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-site-plugin</artifactId>
<version>3.2</version>
<version>3.3</version>
</plugin>
<plugin>
......@@ -1348,13 +1355,6 @@
<goal>testCompile</goal>
</goals>
</execution>
<execution>
<id>attach-scaladocs</id>
<phase>verify</phase>
<goals>
<goal>doc-jar</goal>
</goals>
</execution>
</executions>
<configuration>
<scalaVersion>${scala.version}</scalaVersion>
......@@ -1428,6 +1428,9 @@
-Xmx1024m -XX:MaxPermSize=512m -Dlog4j.configuration=atlas-log4j.xml
-Djava.net.preferIPv4Stack=true
</argLine>
<excludes>
<exclude>**/*Base*</exclude>
</excludes>
</configuration>
<dependencies>
<dependency>
......
......@@ -4,6 +4,7 @@ Apache Atlas Release Notes
--trunk - unreleased
INCOMPATIBLE CHANGES:
ATLAS-58 Make hive hook reliable (shwethags)
ATLAS-54 Rename configs in hive hook (shwethags)
ATLAS-3 Mixed Index creation fails with Date types (suma.shivaprasad via shwethags)
......
......@@ -32,6 +32,7 @@ public class GraphTransactionInterceptor implements MethodInterceptor {
@Inject
GraphProvider<TitanGraph> graphProvider;
@Override
public Object invoke(MethodInvocation invocation) throws Throwable {
if (titanGraph == null) {
titanGraph = graphProvider.get();
......
......@@ -18,11 +18,11 @@
package org.apache.atlas;
import com.google.inject.Singleton;
import com.google.inject.matcher.Matchers;
import com.google.inject.multibindings.Multibinder;
import com.google.inject.throwingproviders.ThrowingProviderBinder;
import com.thinkaurelius.titan.core.TitanGraph;
import com.tinkerpop.blueprints.Graph;
import org.aopalliance.intercept.MethodInterceptor;
import org.apache.atlas.discovery.DiscoveryService;
import org.apache.atlas.discovery.HiveLineageService;
......@@ -38,6 +38,7 @@ import org.apache.atlas.repository.typestore.GraphBackedTypeStore;
import org.apache.atlas.repository.typestore.ITypeStore;
import org.apache.atlas.services.DefaultMetadataService;
import org.apache.atlas.services.MetadataService;
import org.apache.atlas.typesystem.types.TypeSystem;
/**
* Guice module for Repository module.
......@@ -55,6 +56,8 @@ public class RepositoryMetadataModule extends com.google.inject.AbstractModule {
// bind the MetadataRepositoryService interface to an implementation
bind(MetadataRepository.class).to(GraphBackedMetadataRepository.class).asEagerSingleton();
bind(TypeSystem.class).in(Singleton.class);
// bind the ITypeStore interface to an implementation
bind(ITypeStore.class).to(GraphBackedTypeStore.class).asEagerSingleton();
......
......@@ -21,6 +21,8 @@ package org.apache.atlas.listener;
import org.apache.atlas.AtlasException;
import org.apache.atlas.typesystem.ITypedReferenceableInstance;
import java.util.Collection;
/**
* Entity (a Typed instance) change notification listener.
*/
......@@ -29,10 +31,10 @@ public interface EntityChangeListener {
/**
* This is upon adding a new typed instance to the repository.
*
* @param typedInstance a typed instance
* @param typedInstances collection of typed instances
* @throws AtlasException
*/
void onEntityAdded(ITypedReferenceableInstance typedInstance) throws AtlasException;
void onEntityAdded(Collection<ITypedReferenceableInstance> typedInstances) throws AtlasException;
/**
* This is upon adding a new trait to a typed instance.
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository;
import org.apache.atlas.AtlasException;
import org.apache.atlas.typesystem.IReferenceableInstance;
public class EntityExistsException extends AtlasException {
public EntityExistsException(IReferenceableInstance typedInstance, Exception e) {
super("Model violation for type "+ typedInstance.getTypeName(), e);
}
public EntityExistsException(IReferenceableInstance typedInstance) {
super("Model violation for type "+ typedInstance.getTypeName());
}
}
......@@ -19,7 +19,6 @@
package org.apache.atlas.repository;
import org.apache.atlas.AtlasException;
import org.apache.atlas.typesystem.IReferenceableInstance;
import org.apache.atlas.typesystem.ITypedReferenceableInstance;
import org.apache.atlas.typesystem.ITypedStruct;
import org.apache.atlas.typesystem.types.AttributeInfo;
......@@ -76,11 +75,12 @@ public interface MetadataRepository {
/**
* Creates an entity definition (instance) corresponding to a given type.
*
* @param entity entity (typed instance)
* @param entities entities (typed instances)
* @return a globally unique identifier
* @throws RepositoryException
* @throws EntityExistsException
*/
String createEntity(IReferenceableInstance entity) throws RepositoryException;
String[] createEntities(ITypedReferenceableInstance... entities) throws RepositoryException, EntityExistsException;
/**
* Fetch the complete definition of an entity given its GUID.
......@@ -158,4 +158,13 @@ public interface MetadataRepository {
* @param value property value
*/
void updateEntity(String guid, String property, String value) throws RepositoryException;
/**
* Returns the entity for the given type and qualified name
* @param entityType
* @param attribute
* @param value
* @return entity instance
*/
ITypedReferenceableInstance getEntityDefinition(String entityType, String attribute, String value) throws AtlasException;
}
......@@ -46,34 +46,35 @@ public final class GraphHelper {
}
public static Vertex createVertexWithIdentity(Graph graph, ITypedReferenceableInstance typedInstance,
Set<String> superTypeNames) {
final Vertex vertexWithIdentity =
createVertexWithoutIdentity(graph, typedInstance.getTypeName(), typedInstance.getId(), superTypeNames);
Set<String> superTypeNames) {
final Vertex vertexWithIdentity = createVertexWithoutIdentity(graph, typedInstance.getTypeName(),
typedInstance.getId(), superTypeNames);
// add identity
final String guid = UUID.randomUUID().toString();
vertexWithIdentity.setProperty(Constants.GUID_PROPERTY_KEY, guid);
setProperty(vertexWithIdentity, Constants.GUID_PROPERTY_KEY, guid);
return vertexWithIdentity;
}
public static Vertex createVertexWithoutIdentity(Graph graph, String typeName, Id typedInstanceId,
Set<String> superTypeNames) {
Set<String> superTypeNames) {
LOG.debug("Creating vertex for type {} id {}", typeName, typedInstanceId._getId());
final Vertex vertexWithoutIdentity = graph.addVertex(null);
// add type information
vertexWithoutIdentity.setProperty(Constants.ENTITY_TYPE_PROPERTY_KEY, typeName);
setProperty(vertexWithoutIdentity, Constants.ENTITY_TYPE_PROPERTY_KEY, typeName);
// add super types
for (String superTypeName : superTypeNames) {
((TitanVertex) vertexWithoutIdentity).addProperty(Constants.SUPER_TYPES_PROPERTY_KEY, superTypeName);
addProperty(vertexWithoutIdentity, Constants.SUPER_TYPES_PROPERTY_KEY, superTypeName);
}
// add version information
vertexWithoutIdentity.setProperty(Constants.VERSION_PROPERTY_KEY, typedInstanceId.version);
setProperty(vertexWithoutIdentity, Constants.VERSION_PROPERTY_KEY, typedInstanceId.version);
// add timestamp information
vertexWithoutIdentity.setProperty(Constants.TIMESTAMP_PROPERTY_KEY, System.currentTimeMillis());
setProperty(vertexWithoutIdentity, Constants.TIMESTAMP_PROPERTY_KEY, System.currentTimeMillis());
return vertexWithoutIdentity;
}
......@@ -84,12 +85,12 @@ public final class GraphHelper {
return titanGraph.addEdge(null, fromVertex, toVertex, edgeLabel);
}
public static Vertex findVertexByGUID(TitanGraph titanGraph, String value) {
LOG.debug("Finding vertex for key={}, value={}", Constants.GUID_PROPERTY_KEY, value);
public static Vertex findVertex(TitanGraph titanGraph, String propertyKey, Object value) {
LOG.debug("Finding vertex for {}={}", propertyKey, value);
GraphQuery query = titanGraph.query().has(Constants.GUID_PROPERTY_KEY, value);
GraphQuery query = titanGraph.query().has(propertyKey, value);
Iterator<Vertex> results = query.vertices().iterator();
// returning one since guid should be unique
// returning one since entityType, qualifiedName should be unique
return results.hasNext() ? results.next() : null;
}
......@@ -107,6 +108,16 @@ public final class GraphHelper {
+ edge.getVertex(Direction.IN) + "]";
}
public static void setProperty(Vertex vertex, String propertyName, Object value) {
LOG.debug("Setting property {} = \"{}\" to vertex {}", propertyName, value, vertex);
vertex.setProperty(propertyName, value);
}
public static void addProperty(Vertex vertex, String propertyName, Object value) {
LOG.debug("Adding property {} = \"{}\" to vertex {}", propertyName, value, vertex);
((TitanVertex)vertex).addProperty(propertyName, value);
}
/*
public static void dumpToLog(final Graph graph) {
LOG.debug("*******************Graph Dump****************************");
......
......@@ -66,7 +66,7 @@ public interface MetadataService {
* @param entityDefinition definition
* @return guid
*/
String createEntity(String entityDefinition) throws AtlasException;
String createEntities(String entityDefinition) throws AtlasException;
/**
* Return the definition for the given guid.
......@@ -77,6 +77,16 @@ public interface MetadataService {
String getEntityDefinition(String guid) throws AtlasException;
/**
* Return the definition given type and attribute. The attribute has to be unique attribute for the type
* @param entityType - type name
* @param attribute - attribute name
* @param value - attribute value
* @return
* @throws AtlasException
*/
String getEntityDefinition(String entityType, String attribute, String value) throws AtlasException;
/**
* Return the list of entity names for the given type in the repository.
*
* @param entityType type
......
......@@ -19,11 +19,8 @@
package org.apache.atlas;
import com.thinkaurelius.titan.core.TitanGraph;
import com.thinkaurelius.titan.core.util.TitanCleanup;
import org.apache.atlas.repository.graph.GraphProvider;
import org.apache.atlas.typesystem.types.TypeSystem;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
......
......@@ -85,7 +85,7 @@ public class GraphBackedDiscoveryServiceTest {
ClassType deptType = typeSystem.getDataType(ClassType.class, "Department");
ITypedReferenceableInstance hrDept2 = deptType.convert(hrDept, Multiplicity.REQUIRED);
repositoryService.createEntity(hrDept2);
repositoryService.createEntities(hrDept2);
}
private void setupSampleData() throws ScriptException {
......@@ -303,6 +303,6 @@ public class GraphBackedDiscoveryServiceTest {
ClassType deptType = TypeSystem.getInstance().getDataType(ClassType.class, "D");
ITypedReferenceableInstance typedInstance = deptType.convert(instance, Multiplicity.REQUIRED);
repositoryService.createEntity(typedInstance);
repositoryService.createEntities(typedInstance);
}
}
\ No newline at end of file
......@@ -54,9 +54,6 @@ import org.testng.annotations.Guice;
import org.testng.annotations.Test;
import javax.inject.Inject;
import java.io.File;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
/**
......@@ -534,7 +531,10 @@ public class HiveLineageServiceTest {
String entityJSON = InstanceSerialization.toJson(referenceable, true);
System.out.println("Submitting new entity= " + entityJSON);
String guid = metadataService.createEntity(entityJSON);
JSONArray jsonArray = new JSONArray();
jsonArray.put(entityJSON);
String response = metadataService.createEntities(jsonArray.toString());
String guid = new JSONArray(response).getString(0);
System.out.println("created instance for type " + typeName + ", guid: " + guid);
// return the reference to created instance with guid
......
......@@ -18,172 +18,13 @@
package org.apache.atlas.repository;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.atlas.AtlasException;
import org.apache.atlas.repository.memory.MemRepository;
import org.apache.atlas.typesystem.ITypedReferenceableInstance;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.Struct;
import org.apache.atlas.typesystem.types.AttributeDefinition;
import org.apache.atlas.typesystem.types.ClassType;
import org.apache.atlas.typesystem.types.DataTypes;
import org.apache.atlas.typesystem.types.HierarchicalType;
import org.apache.atlas.typesystem.types.HierarchicalTypeDefinition;
import org.apache.atlas.typesystem.types.IDataType;
import org.apache.atlas.typesystem.types.Multiplicity;
import org.apache.atlas.typesystem.types.StructType;
import org.apache.atlas.typesystem.types.StructTypeDefinition;
import org.apache.atlas.typesystem.types.TraitType;
import org.apache.atlas.typesystem.types.TypeSystem;
import org.apache.atlas.typesystem.types.utils.TypesUtil;
import org.junit.Before;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Date;
import java.util.Map;
public abstract class BaseTest {
public static final String STRUCT_TYPE_1 = "t1";
public static final String STRUCT_TYPE_2 = "t2";
public static final String TEST_DATE = "2014-12-11T02:35:58.440Z";
public static final long TEST_DATE_IN_LONG = 1418265358440L;
protected IRepository repo;
public static Struct createStruct() throws AtlasException {
StructType structType = (StructType) TypeSystem.getInstance().getDataType(StructType.class, STRUCT_TYPE_1);
Struct s = new Struct(structType.getName());
s.set("a", 1);
s.set("b", true);
s.set("c", (byte) 1);
s.set("d", (short) 2);
s.set("e", 1);
s.set("f", 1);
s.set("g", 1L);
s.set("h", 1.0f);
s.set("i", 1.0);
s.set("j", BigInteger.valueOf(1L));
s.set("k", new BigDecimal(1));
s.set("l", new Date(1418265358440L));
s.set("m", Lists.<Integer>asList(Integer.valueOf(1), new Integer[]{Integer.valueOf(1)}));
s.set("n", Lists.<BigDecimal>asList(BigDecimal.valueOf(1.1), new BigDecimal[]{BigDecimal.valueOf(1.1)}));
Map<String, Double> hm = Maps.<String, Double>newHashMap();
hm.put("a", 1.0);
hm.put("b", 2.0);
s.set("o", hm);
return s;
}
protected final TypeSystem getTypeSystem() {
return TypeSystem.getInstance();
}
protected final IRepository getRepository() {
return repo;
}
@Before
public void setup() throws Exception {
TypeSystem ts = TypeSystem.getInstance();
ts.reset();
repo = new MemRepository(ts);
StructType structType =
ts.defineStructType(STRUCT_TYPE_1, true, TypesUtil.createRequiredAttrDef("a", DataTypes.INT_TYPE),
TypesUtil.createOptionalAttrDef("b", DataTypes.BOOLEAN_TYPE),
TypesUtil.createOptionalAttrDef("c", DataTypes.BYTE_TYPE),
TypesUtil.createOptionalAttrDef("d", DataTypes.SHORT_TYPE),
TypesUtil.createOptionalAttrDef("e", DataTypes.INT_TYPE),
TypesUtil.createOptionalAttrDef("f", DataTypes.INT_TYPE),
TypesUtil.createOptionalAttrDef("g", DataTypes.LONG_TYPE),
TypesUtil.createOptionalAttrDef("h", DataTypes.FLOAT_TYPE),
TypesUtil.createOptionalAttrDef("i", DataTypes.DOUBLE_TYPE),
TypesUtil.createOptionalAttrDef("j", DataTypes.BIGINTEGER_TYPE),
TypesUtil.createOptionalAttrDef("k", DataTypes.BIGDECIMAL_TYPE),
TypesUtil.createOptionalAttrDef("l", DataTypes.DATE_TYPE),
TypesUtil.createOptionalAttrDef("m", ts.defineArrayType(DataTypes.INT_TYPE)),
TypesUtil.createOptionalAttrDef("n", ts.defineArrayType(DataTypes.BIGDECIMAL_TYPE)), TypesUtil
.createOptionalAttrDef("o",
ts.defineMapType(DataTypes.STRING_TYPE, DataTypes.DOUBLE_TYPE)));
StructType recursiveStructType =
ts.defineStructType(STRUCT_TYPE_2, true, TypesUtil.createRequiredAttrDef("a", DataTypes.INT_TYPE),
TypesUtil.createOptionalAttrDef("s", STRUCT_TYPE_2));
}
protected Map<String, IDataType> defineTraits(HierarchicalTypeDefinition... tDefs) throws AtlasException {
return getTypeSystem().defineTraitTypes(tDefs);
}
/*
* Class Hierarchy is:
* Department(name : String, employees : Array[Person])
* Person(name : String, department : Department, manager : Manager)
* Manager(subordinates : Array[Person]) extends Person
*
* Persons can have SecurityClearance(level : Int) clearance.
*/
protected void defineDeptEmployeeTypes(TypeSystem ts) throws AtlasException {
HierarchicalTypeDefinition<ClassType> deptTypeDef = TypesUtil
.createClassTypeDef("Department", ImmutableList.<String>of(),
TypesUtil.createRequiredAttrDef("name", DataTypes.STRING_TYPE),
new AttributeDefinition("employees", String.format("array<%s>", "Person"),
Multiplicity.COLLECTION, true, "department"));
HierarchicalTypeDefinition<ClassType> personTypeDef = TypesUtil
.createClassTypeDef("Person", ImmutableList.<String>of(),
TypesUtil.createRequiredAttrDef("name", DataTypes.STRING_TYPE),
new AttributeDefinition("department", "Department", Multiplicity.REQUIRED, false, "employees"),
new AttributeDefinition("manager", "Manager", Multiplicity.OPTIONAL, false, "subordinates"));
HierarchicalTypeDefinition<ClassType> managerTypeDef = TypesUtil
.createClassTypeDef("Manager", ImmutableList.<String>of("Person"),
new AttributeDefinition("subordinates", String.format("array<%s>", "Person"),
Multiplicity.COLLECTION, false, "manager"));
HierarchicalTypeDefinition<TraitType> securityClearanceTypeDef = TypesUtil
.createTraitTypeDef("SecurityClearance", ImmutableList.<String>of(),
TypesUtil.createRequiredAttrDef("level", DataTypes.INT_TYPE));
ts.defineTypes(ImmutableList.<StructTypeDefinition>of(),
ImmutableList.<HierarchicalTypeDefinition<TraitType>>of(securityClearanceTypeDef),
ImmutableList.<HierarchicalTypeDefinition<ClassType>>of(deptTypeDef, personTypeDef, managerTypeDef));
ImmutableList<HierarchicalType> types = ImmutableList
.of(ts.getDataType(HierarchicalType.class, "SecurityClearance"),
ts.getDataType(ClassType.class, "Department"), ts.getDataType(ClassType.class, "Person"),
ts.getDataType(ClassType.class, "Manager"));
repo.defineTypes(types);
}
protected Referenceable createDeptEg1(TypeSystem ts) throws AtlasException {
Referenceable hrDept = new Referenceable("Department");
Referenceable john = new Referenceable("Person");
Referenceable jane = new Referenceable("Manager", "SecurityClearance");
hrDept.set("name", "hr");
john.set("name", "John");
john.set("department", hrDept);
jane.set("name", "Jane");
jane.set("department", hrDept);
john.set("manager", jane);
hrDept.set("employees", ImmutableList.<Referenceable>of(john, jane));
jane.set("subordinates", ImmutableList.<Referenceable>of(john));
jane.getTrait("SecurityClearance").set("level", 1);
ClassType deptType = ts.getDataType(ClassType.class, "Department");
ITypedReferenceableInstance hrDept2 = deptType.convert(hrDept, Multiplicity.REQUIRED);
return hrDept;
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.memory;
import org.apache.atlas.AtlasException;
import org.apache.atlas.repository.BaseTest;
import org.apache.atlas.typesystem.ITypedReferenceableInstance;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.types.ClassType;
import org.apache.atlas.typesystem.types.Multiplicity;
import org.apache.atlas.typesystem.types.TypeSystem;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class ClassTest extends BaseTest {
@Before
public void setup() throws Exception {
super.setup();
}
@Test
public void test1() throws AtlasException {
TypeSystem ts = getTypeSystem();
defineDeptEmployeeTypes(ts);
Referenceable hrDept = createDeptEg1(ts);
ClassType deptType = ts.getDataType(ClassType.class, "Department");
ITypedReferenceableInstance hrDept2 = deptType.convert(hrDept, Multiplicity.REQUIRED);
Assert.assertEquals(hrDept2.toString(), "{\n" +
"\tid : (type: Department, id: <unassigned>)\n" +
"\tname : \thr\n" +
"\temployees : \t[{\n" +
"\tid : (type: Person, id: <unassigned>)\n" +
"\tname : \tJohn\n" +
"\tdepartment : (type: Department, id: <unassigned>)\n" +
"\tmanager : (type: Manager, id: <unassigned>)\n" +
"}, {\n" +
"\tid : (type: Manager, id: <unassigned>)\n" +
"\tsubordinates : \t[{\n" +
"\tid : (type: Person, id: <unassigned>)\n" +
"\tname : \tJohn\n" +
"\tdepartment : (type: Department, id: <unassigned>)\n" +
"\tmanager : (type: Manager, id: <unassigned>)\n" +
"}]\n" +
"\tname : \tJane\n" +
"\tdepartment : (type: Department, id: <unassigned>)\n" +
"\tmanager : <null>\n" +
"\n" +
"\tSecurityClearance : \t{\n" +
"\t\tlevel : \t\t1\n" +
"\t}}]\n" +
"}");
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.memory;
import com.google.common.collect.ImmutableList;
import org.apache.atlas.AtlasException;
import org.apache.atlas.repository.BaseTest;
import org.apache.atlas.typesystem.ITypedReferenceableInstance;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.Struct;
import org.apache.atlas.typesystem.TypesDef;
import org.apache.atlas.typesystem.json.InstanceSerialization$;
import org.apache.atlas.typesystem.json.Serialization$;
import org.apache.atlas.typesystem.json.TypesSerialization$;
import org.apache.atlas.typesystem.types.AttributeDefinition;
import org.apache.atlas.typesystem.types.ClassType;
import org.apache.atlas.typesystem.types.DataTypes;
import org.apache.atlas.typesystem.types.HierarchicalTypeDefinition;
import org.apache.atlas.typesystem.types.Multiplicity;
import org.apache.atlas.typesystem.types.StructTypeDefinition;
import org.apache.atlas.typesystem.types.TraitType;
import org.apache.atlas.typesystem.types.TypeSystem;
import org.apache.atlas.typesystem.types.utils.TypesUtil;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
public class InstanceE2ETest extends BaseTest {
protected List<HierarchicalTypeDefinition> createHiveTypes(TypeSystem typeSystem) throws AtlasException {
ArrayList<HierarchicalTypeDefinition> typeDefinitions = new ArrayList<>();
HierarchicalTypeDefinition<ClassType> databaseTypeDefinition = TypesUtil
.createClassTypeDef("hive_database", ImmutableList.<String>of(),
TypesUtil.createRequiredAttrDef("name", DataTypes.STRING_TYPE),
TypesUtil.createRequiredAttrDef("description", DataTypes.STRING_TYPE));
typeDefinitions.add(databaseTypeDefinition);
HierarchicalTypeDefinition<ClassType> tableTypeDefinition = TypesUtil
.createClassTypeDef("hive_table", ImmutableList.<String>of(),
TypesUtil.createRequiredAttrDef("name", DataTypes.STRING_TYPE),
TypesUtil.createRequiredAttrDef("description", DataTypes.STRING_TYPE),
TypesUtil.createRequiredAttrDef("type", DataTypes.STRING_TYPE),
new AttributeDefinition("hive_database", "hive_database", Multiplicity.REQUIRED, false,
"hive_database"));
typeDefinitions.add(tableTypeDefinition);
HierarchicalTypeDefinition<TraitType> fetlTypeDefinition = TypesUtil
.createTraitTypeDef("hive_fetl", ImmutableList.<String>of(),
TypesUtil.createRequiredAttrDef("level", DataTypes.INT_TYPE));
typeDefinitions.add(fetlTypeDefinition);
typeSystem.defineTypes(ImmutableList.<StructTypeDefinition>of(), ImmutableList.of(fetlTypeDefinition),
ImmutableList.of(databaseTypeDefinition, tableTypeDefinition));
return typeDefinitions;
}
protected Referenceable createHiveTableReferenceable() throws AtlasException {
Referenceable databaseInstance = new Referenceable("hive_database");
databaseInstance.set("name", "hive_database");
databaseInstance.set("description", "foo database");
Referenceable tableInstance = new Referenceable("hive_table", "hive_fetl");
tableInstance.set("name", "t1");
tableInstance.set("description", "bar table");
tableInstance.set("type", "managed");
tableInstance.set("hive_database", databaseInstance);
Struct traitInstance = (Struct) tableInstance.getTrait("hive_fetl");
traitInstance.set("level", 1);
tableInstance.set("hive_fetl", traitInstance);
return tableInstance;
}
protected ITypedReferenceableInstance createHiveTableInstance(TypeSystem typeSystem) throws AtlasException {
ClassType tableType = typeSystem.getDataType(ClassType.class, "hive_table");
return tableType.convert(createHiveTableReferenceable(), Multiplicity.REQUIRED);
}
@Test
public void testType() throws AtlasException {
TypeSystem ts = getTypeSystem();
createHiveTypes(ts);
String jsonStr = TypesSerialization$.MODULE$.toJson(ts, ImmutableList.of("hive_database", "hive_table"));
System.out.println(jsonStr);
TypesDef typesDef1 = TypesSerialization$.MODULE$.fromJson(jsonStr);
System.out.println(typesDef1);
ts.reset();
ts.defineTypes(typesDef1);
jsonStr = TypesSerialization$.MODULE$.toJson(ts, ImmutableList.of("hive_database", "hive_table"));
System.out.println(jsonStr);
}
@Test
public void testInstance() throws AtlasException {
TypeSystem ts = getTypeSystem();
createHiveTypes(ts);
ITypedReferenceableInstance i = createHiveTableInstance(getTypeSystem());
String jsonStr = Serialization$.MODULE$.toJson(i);
System.out.println(jsonStr);
i = Serialization$.MODULE$.fromJson(jsonStr);
System.out.println(i);
}
@Test
public void testInstanceSerialization() throws AtlasException {
TypeSystem ts = getTypeSystem();
createHiveTypes(ts);
Referenceable r = createHiveTableReferenceable();
String jsonStr = InstanceSerialization$.MODULE$.toJson(r, true);
Referenceable r1 = InstanceSerialization$.MODULE$.fromJsonReferenceable(jsonStr, true);
ClassType tableType = ts.getDataType(ClassType.class, "hive_table");
ITypedReferenceableInstance i = tableType.convert(r1, Multiplicity.REQUIRED);
jsonStr = Serialization$.MODULE$.toJson(i);
System.out.println(jsonStr);
i = Serialization$.MODULE$.fromJson(jsonStr);
System.out.println(i);
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.memory;
import org.apache.atlas.AtlasException;
import org.apache.atlas.repository.BaseTest;
import org.apache.atlas.repository.RepositoryException;
import org.apache.atlas.typesystem.ITypedReferenceableInstance;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.persistence.Id;
import org.apache.atlas.typesystem.types.TypeSystem;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class StorageTest extends BaseTest {
@Before
public void setup() throws Exception {
super.setup();
}
@Test
public void test1() throws AtlasException {
TypeSystem ts = getTypeSystem();
defineDeptEmployeeTypes(ts);
Referenceable hrDept = createDeptEg1(ts);
ITypedReferenceableInstance hrDept2 = getRepository().create(hrDept);
ITypedReferenceableInstance hrDept3 = getRepository().get(hrDept2.getId());
Assert.assertEquals(hrDept3.toString(), "{\n" +
"\tid : (type: Department, id: 1)\n" +
"\tname : \thr\n" +
"\temployees : \t[{\n" +
"\tid : (type: Person, id: 2)\n" +
"\tname : \tJohn\n" +
"\tdepartment : (type: Department, id: 1)\n" +
"\tmanager : (type: Manager, id: 3)\n" +
"}, {\n" +
"\tid : (type: Manager, id: 3)\n" +
"\tsubordinates : \t[(type: Person, id: 2)]\n" +
"\tname : \tJane\n" +
"\tdepartment : (type: Department, id: 1)\n" +
"\tmanager : <null>\n" +
"\n" +
"\tSecurityClearance : \t{\n" +
"\t\tlevel : \t\t1\n" +
"\t}}]\n" +
"}");
}
@Test
public void testGetPerson() throws AtlasException {
TypeSystem ts = getTypeSystem();
defineDeptEmployeeTypes(ts);
Referenceable hrDept = createDeptEg1(ts);
ITypedReferenceableInstance hrDept2 = getRepository().create(hrDept);
Id e1Id = new Id(2, 0, "Person");
ITypedReferenceableInstance e1 = getRepository().get(e1Id);
Assert.assertEquals(e1.toString(), "{\n" +
"\tid : (type: Person, id: 2)\n" +
"\tname : \tJohn\n" +
"\tdepartment : (type: Department, id: 1)\n" +
"\tmanager : (type: Manager, id: 3)\n" +
"}");
}
@Test
public void testInvalidTypeName() throws AtlasException {
TypeSystem ts = getTypeSystem();
defineDeptEmployeeTypes(ts);
Referenceable hrDept = createDeptEg1(ts);
ITypedReferenceableInstance hrDept2 = getRepository().create(hrDept);
Id e1Id = new Id(3, 0, "Person");
try {
ITypedReferenceableInstance e1 = getRepository().get(e1Id);
} catch (RepositoryException re) {
RepositoryException me = (RepositoryException) re.getCause();
Assert.assertEquals(me.getMessage(), "Invalid Id (unknown) : (type: Person, id: 3)");
}
}
@Test
public void testGetManager() throws AtlasException {
TypeSystem ts = getTypeSystem();
defineDeptEmployeeTypes(ts);
Referenceable hrDept = createDeptEg1(ts);
ITypedReferenceableInstance hrDept2 = getRepository().create(hrDept);
Id m1Id = new Id(3, 0, "Manager");
ITypedReferenceableInstance m1 = getRepository().get(m1Id);
Assert.assertEquals(m1.toString(), "{\n" +
"\tid : (type: Manager, id: 3)\n" +
"\tsubordinates : \t[(type: Person, id: 2)]\n" +
"\tname : \tJane\n" +
"\tdepartment : (type: Department, id: 1)\n" +
"\tmanager : <null>\n" +
"\n" +
"\tSecurityClearance : \t{\n" +
"\t\tlevel : \t\t1\n" +
"\t}}");
}
}
\ No newline at end of file
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.memory;
import org.apache.atlas.AtlasException;
import org.apache.atlas.repository.BaseTest;
import org.apache.atlas.typesystem.ITypedStruct;
import org.apache.atlas.typesystem.Struct;
import org.apache.atlas.typesystem.json.InstanceSerialization$;
import org.apache.atlas.typesystem.types.Multiplicity;
import org.apache.atlas.typesystem.types.StructType;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class StructTest extends BaseTest {
StructType structType;
StructType recursiveStructType;
@Before
public void setup() throws Exception {
super.setup();
structType = (StructType) getTypeSystem().getDataType(StructType.class, STRUCT_TYPE_1);
recursiveStructType = (StructType) getTypeSystem().getDataType(StructType.class, STRUCT_TYPE_2);
}
@Test
public void test1() throws AtlasException {
Struct s = createStruct();
ITypedStruct ts = structType.convert(s, Multiplicity.REQUIRED);
Assert.assertEquals(ts.toString(), "{\n" +
"\ta : \t1\n" +
"\tb : \ttrue\n" +
"\tc : \t1\n" +
"\td : \t2\n" +
"\te : \t1\n" +
"\tf : \t1\n" +
"\tg : \t1\n" +
"\th : \t1.0\n" +
"\ti : \t1.0\n" +
"\tj : \t1\n" +
"\tk : \t1\n" +
"\tl : \t" + TEST_DATE + "\n" +
"\tm : \t[1, 1]\n" +
"\tn : \t[1.1, 1.1]\n" +
"\to : \t{a=1.0, b=2.0}\n" +
"}");
}
@Test
public void testRecursive() throws AtlasException {
Struct s1 = new Struct(recursiveStructType.getName());
s1.set("a", 1);
Struct s2 = new Struct(recursiveStructType.getName());
s2.set("a", 1);
s2.set("s", s1);
ITypedStruct ts = recursiveStructType.convert(s2, Multiplicity.REQUIRED);
Assert.assertEquals(ts.toString(), "{\n" +
"\ta : \t1\n" +
"\ts : \t{\n" +
"\t\ta : \t\t1\n" +
"\t\ts : <null>\n" +
"\n" +
"\t}\n" +
"}");
}
@Test
public void testSerialization() throws AtlasException {
Struct s = createStruct();
String jsonStr = InstanceSerialization$.MODULE$.toJson(s, true);
Struct s1 = InstanceSerialization$.MODULE$.fromJsonStruct(jsonStr, true);
ITypedStruct ts = structType.convert(s1, Multiplicity.REQUIRED);
Assert.assertEquals(ts.toString(), "{\n" +
"\ta : \t1\n" +
"\tb : \ttrue\n" +
"\tc : \t1\n" +
"\td : \t2\n" +
"\te : \t1\n" +
"\tf : \t1\n" +
"\tg : \t1\n" +
"\th : \t1.0\n" +
"\ti : \t1.0\n" +
"\tj : \t1\n" +
"\tk : \t1\n" +
"\tl : \t" + TEST_DATE + "\n" +
"\tm : \t[1, 1]\n" +
"\tn : \t[1.100000000000000088817841970012523233890533447265625, 1" +
".100000000000000088817841970012523233890533447265625]\n" +
"\to : \t{a=1.0, b=2.0}\n" +
"}");
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.memory;
import com.google.common.collect.ImmutableList;
import org.apache.atlas.AtlasException;
import org.apache.atlas.repository.BaseTest;
import org.apache.atlas.typesystem.IStruct;
import org.apache.atlas.typesystem.ITypedStruct;
import org.apache.atlas.typesystem.Struct;
import org.apache.atlas.typesystem.types.DataTypes;
import org.apache.atlas.typesystem.types.HierarchicalTypeDefinition;
import org.apache.atlas.typesystem.types.Multiplicity;
import org.apache.atlas.typesystem.types.TraitType;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import static org.apache.atlas.typesystem.types.utils.TypesUtil.createOptionalAttrDef;
import static org.apache.atlas.typesystem.types.utils.TypesUtil.createRequiredAttrDef;
import static org.apache.atlas.typesystem.types.utils.TypesUtil.createTraitTypeDef;
public class TraitTest extends BaseTest {
@Before
public void setup() throws Exception {
super.setup();
}
/*
* Type Hierarchy is:
* A(a,b,c,d)
* B(b) extends A
* C(c) extends A
* D(d) extends B,C
*
* - There are a total of 11 fields in an instance of D
* - an attribute that is hidden by a subtype can be referenced by prefixing it with the
* complete path.
* For e.g. the 'b' attribute in A (that is a superType for B) is hidden by the 'b'
* attribute in B.
* So it is available by the name 'A.B.D.b'
*
* - Another way to set attributes is to cast. Casting a 'D' instance to 'B' makes the
* 'A.B.D.b' attribute available as 'A.B.b'. Casting one more time to 'A' makes the
* 'A.B.b' attribute available as 'b'.
*/
@Test
public void test1() throws AtlasException {
HierarchicalTypeDefinition A = createTraitTypeDef("A", null, createRequiredAttrDef("a", DataTypes.INT_TYPE),
createOptionalAttrDef("b", DataTypes.BOOLEAN_TYPE), createOptionalAttrDef("c", DataTypes.BYTE_TYPE),
createOptionalAttrDef("d", DataTypes.SHORT_TYPE));
HierarchicalTypeDefinition B = createTraitTypeDef("B", ImmutableList.<String>of("A"),
createOptionalAttrDef("b", DataTypes.BOOLEAN_TYPE));
HierarchicalTypeDefinition C =
createTraitTypeDef("C", ImmutableList.<String>of("A"), createOptionalAttrDef("c", DataTypes.BYTE_TYPE));
HierarchicalTypeDefinition D = createTraitTypeDef("D", ImmutableList.<String>of("B", "C"),
createOptionalAttrDef("d", DataTypes.SHORT_TYPE));
defineTraits(A, B, C, D);
TraitType DType = (TraitType) getTypeSystem().getDataType(TraitType.class, "D");
Struct s1 = new Struct("D");
s1.set("d", 1);
s1.set("c", 1);
s1.set("b", true);
s1.set("a", 1);
s1.set("A.B.D.b", true);
s1.set("A.B.D.c", 2);
s1.set("A.B.D.d", 2);
s1.set("A.C.D.a", 3);
s1.set("A.C.D.b", false);
s1.set("A.C.D.c", 3);
s1.set("A.C.D.d", 3);
ITypedStruct ts = DType.convert(s1, Multiplicity.REQUIRED);
Assert.assertEquals(ts.toString(), "{\n" +
"\td : \t1\n" +
"\tb : \ttrue\n" +
"\tc : \t1\n" +
"\ta : \t1\n" +
"\tA.B.D.b : \ttrue\n" +
"\tA.B.D.c : \t2\n" +
"\tA.B.D.d : \t2\n" +
"\tA.C.D.a : \t3\n" +
"\tA.C.D.b : \tfalse\n" +
"\tA.C.D.c : \t3\n" +
"\tA.C.D.d : \t3\n" +
"}");
/*
* cast to B and set the 'b' attribute on A.
*/
TraitType BType = (TraitType) getTypeSystem().getDataType(TraitType.class, "B");
IStruct s2 = DType.castAs(ts, "B");
s2.set("A.B.b", false);
Assert.assertEquals(ts.toString(), "{\n" +
"\td : \t1\n" +
"\tb : \ttrue\n" +
"\tc : \t1\n" +
"\ta : \t1\n" +
"\tA.B.D.b : \tfalse\n" +
"\tA.B.D.c : \t2\n" +
"\tA.B.D.d : \t2\n" +
"\tA.C.D.a : \t3\n" +
"\tA.C.D.b : \tfalse\n" +
"\tA.C.D.c : \t3\n" +
"\tA.C.D.d : \t3\n" +
"}");
/*
* cast again to A and set the 'b' attribute on A.
*/
TraitType AType = (TraitType) getTypeSystem().getDataType(TraitType.class, "A");
IStruct s3 = BType.castAs(s2, "A");
s3.set("b", true);
Assert.assertEquals(ts.toString(), "{\n" +
"\td : \t1\n" +
"\tb : \ttrue\n" +
"\tc : \t1\n" +
"\ta : \t1\n" +
"\tA.B.D.b : \ttrue\n" +
"\tA.B.D.c : \t2\n" +
"\tA.B.D.d : \t2\n" +
"\tA.C.D.a : \t3\n" +
"\tA.C.D.b : \tfalse\n" +
"\tA.C.D.c : \t3\n" +
"\tA.C.D.d : \t3\n" +
"}");
}
@Test
public void testRandomOrder() throws AtlasException {
HierarchicalTypeDefinition A = createTraitTypeDef("A", null, createRequiredAttrDef("a", DataTypes.INT_TYPE),
createOptionalAttrDef("b", DataTypes.BOOLEAN_TYPE), createOptionalAttrDef("c", DataTypes.BYTE_TYPE),
createOptionalAttrDef("d", DataTypes.SHORT_TYPE));
HierarchicalTypeDefinition B = createTraitTypeDef("B", ImmutableList.<String>of("A"),
createOptionalAttrDef("b", DataTypes.BOOLEAN_TYPE));
HierarchicalTypeDefinition C =
createTraitTypeDef("C", ImmutableList.<String>of("A"), createOptionalAttrDef("c", DataTypes.BYTE_TYPE));
HierarchicalTypeDefinition D = createTraitTypeDef("D", ImmutableList.<String>of("B", "C"),
createOptionalAttrDef("d", DataTypes.SHORT_TYPE));
defineTraits(B, D, A, C);
TraitType DType = (TraitType) getTypeSystem().getDataType(TraitType.class, "D");
Struct s1 = new Struct("D");
s1.set("d", 1);
s1.set("c", 1);
s1.set("b", true);
s1.set("a", 1);
s1.set("A.B.D.b", true);
s1.set("A.B.D.c", 2);
s1.set("A.B.D.d", 2);
s1.set("A.C.D.a", 3);
s1.set("A.C.D.b", false);
s1.set("A.C.D.c", 3);
s1.set("A.C.D.d", 3);
ITypedStruct ts = DType.convert(s1, Multiplicity.REQUIRED);
Assert.assertEquals(ts.toString(), "{\n" +
"\td : \t1\n" +
"\tb : \ttrue\n" +
"\tc : \t1\n" +
"\ta : \t1\n" +
"\tA.B.D.b : \ttrue\n" +
"\tA.B.D.c : \t2\n" +
"\tA.B.D.d : \t2\n" +
"\tA.C.D.a : \t3\n" +
"\tA.C.D.b : \tfalse\n" +
"\tA.C.D.c : \t3\n" +
"\tA.C.D.d : \t3\n" +
"}");
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.service;
import com.google.inject.Inject;
import com.thinkaurelius.titan.core.TitanGraph;
import com.thinkaurelius.titan.core.util.TitanCleanup;
import org.apache.atlas.RepositoryMetadataModule;
import org.apache.atlas.TestUtils;
import org.apache.atlas.TypeNotFoundException;
import org.apache.atlas.repository.EntityNotFoundException;
import org.apache.atlas.repository.graph.GraphProvider;
import org.apache.atlas.services.MetadataService;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.TypesDef;
import org.apache.atlas.typesystem.json.InstanceSerialization;
import org.apache.atlas.typesystem.json.TypesSerialization;
import org.apache.commons.lang.RandomStringUtils;
import org.codehaus.jettison.json.JSONArray;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
@Guice(modules = RepositoryMetadataModule.class)
public class DefaultMetadataServiceTest {
@Inject
private MetadataService metadataService;
@Inject
private GraphProvider<TitanGraph> graphProvider;
@BeforeClass
public void setUp() throws Exception {
TypesDef typesDef = TestUtils.defineHiveTypes();
try {
metadataService.getTypeDefinition(TestUtils.TABLE_TYPE);
} catch (TypeNotFoundException e) {
metadataService.createType(TypesSerialization.toJson(typesDef));
}
}
@AfterClass
public void shutdown() {
try {
//TODO - Fix failure during shutdown while using BDB
graphProvider.get().shutdown();
} catch(Exception e) {
e.printStackTrace();
}
try {
TitanCleanup.clear(graphProvider.get());
} catch(Exception e) {
e.printStackTrace();
}
}
private String createInstance(Referenceable entity) throws Exception {
String entityjson = InstanceSerialization.toJson(entity, true);
JSONArray entitiesJson = new JSONArray();
entitiesJson.put(entityjson);
String response = metadataService.createEntities(entitiesJson.toString());
return new JSONArray(response).getString(0);
}
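// Illustrative note: createEntities takes a JSON array of serialized entities
// and returns a JSON array of the corresponding guids, which is why the helper
// above wraps a single entity and reads element 0 of the response.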
private Referenceable createDBEntity() {
Referenceable entity = new Referenceable(TestUtils.DATABASE_TYPE);
String dbName = RandomStringUtils.randomAlphanumeric(10);
entity.set("name", dbName);
entity.set("description", "us db");
return entity;
}
@Test
public void testCreateEntityWithUniqueAttribute() throws Exception {
//name is the unique attribute
Referenceable entity = createDBEntity();
String id = createInstance(entity);
//using the same name should succeed, but not create another entity
String newId = createInstance(entity);
Assert.assertEquals(newId, id);
//Same entity, but different qualified name should succeed
entity.set("name", TestUtils.randomString());
newId = createInstance(entity);
Assert.assertNotEquals(newId, id);
}
@Test
public void testCreateEntityWithUniqueAttributeWithReference() throws Exception {
Referenceable db = createDBEntity();
String dbId = createInstance(db);
Referenceable table = new Referenceable(TestUtils.TABLE_TYPE);
table.set("name", TestUtils.randomString());
table.set("description", "random table");
table.set("type", "type");
table.set("tableType", "MANAGED");
table.set("database", db);
createInstance(table);
//table create should re-use the db instance created earlier
String tableDefinitionJson =
metadataService.getEntityDefinition(TestUtils.TABLE_TYPE, "name", (String) table.get("name"));
Referenceable tableDefinition = InstanceSerialization.fromJsonReferenceable(tableDefinitionJson, true);
Referenceable actualDb = (Referenceable) tableDefinition.get("database");
Assert.assertEquals(actualDb.getId().id, dbId);
}
@Test
public void testGetEntityByUniqueAttribute() throws Exception {
Referenceable entity = createDBEntity();
createInstance(entity);
//get entity by valid qualified name
String entityJson = metadataService.getEntityDefinition(TestUtils.DATABASE_TYPE, "name",
(String) entity.get("name"));
Assert.assertNotNull(entityJson);
Referenceable referenceable = InstanceSerialization.fromJsonReferenceable(entityJson, true);
Assert.assertEquals(referenceable.get("name"), entity.get("name"));
//get entity by invalid qualified name
try {
metadataService.getEntityDefinition(TestUtils.DATABASE_TYPE, "name", "random");
Assert.fail("Expected EntityNotFoundException");
} catch (EntityNotFoundException e) {
//expected
}
//get entity by non-unique attribute
try {
metadataService.getEntityDefinition(TestUtils.DATABASE_TYPE, "description",
(String) entity.get("description"));
Assert.fail("Expected IllegalArgumentException");
} catch (IllegalArgumentException e) {
//expected
}
}
}
......@@ -19,14 +19,13 @@
package org.apache.atlas.query
import java.io.File
import java.util.{Date, UUID}
import java.util.concurrent.atomic.AtomicInteger
import java.util.{Date, UUID}
import javax.script.{Bindings, ScriptEngine, ScriptEngineManager}
import com.thinkaurelius.titan.core.TitanGraph
import com.typesafe.config.ConfigFactory
import org.apache.atlas.repository.BaseTest
import org.apache.atlas.typesystem.types.TypeSystem
import org.apache.atlas.TestUtils
import org.apache.commons.io.FileUtils
import scala.collection.mutable.ArrayBuffer
......@@ -140,7 +139,7 @@ object HiveTitanSample {
created: Option[Date] = None) {
val createdDate : Date = created match {
case Some(x) => x
case None => new Date(BaseTest.TEST_DATE_IN_LONG)
case None => new Date(TestUtils.TEST_DATE_IN_LONG)
}
val sd = StorageDescriptor(inputFormat, outputFormat)
val colDefs = columns map { c =>
......
......@@ -127,7 +127,8 @@ object QueryTestsUtils extends GraphUtils {
def jdbcTraitDef = new HierarchicalTypeDefinition[TraitType](classOf[TraitType], "JdbcAccess", null,
Array[AttributeDefinition]())
TypeSystem.getInstance().defineTypes(ImmutableList.of[StructTypeDefinition],
TypeSystem.getInstance().defineTypes(ImmutableList.of[EnumTypeDefinition],
ImmutableList.of[StructTypeDefinition],
ImmutableList.of[HierarchicalTypeDefinition[TraitType]](dimTraitDef, piiTraitDef,
metricTraitDef, etlTraitDef, jdbcTraitDef),
ImmutableList.of[HierarchicalTypeDefinition[ClassType]](dbClsDef, storageDescClsDef, columnClsDef, tblClsDef,
......
......@@ -126,6 +126,11 @@
</dependency>
<dependency>
<groupId>commons-configuration</groupId>
<artifactId>commons-configuration</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas;
public class TypeExistsException extends AtlasException {
public TypeExistsException(String message) {
super(message);
}
}
......@@ -115,7 +115,11 @@ public class StructInstance implements ITypedStruct {
clsType.validateId((Id) val);
cVal = val;
} else {
cVal = i.dataType().convert(val, i.multiplicity);
try {
cVal = i.dataType().convert(val, i.multiplicity);
} catch(ValueConversionException.NullConversionException e) {
throw new ValueConversionException.NullConversionException("For field '" + attrName + "'", e);
}
}
if (cVal == null) {
nullFlags[nullPos] = true;
......
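// Illustrative note (assumption): with the wrapping above, a null value for a
// required attribute now surfaces the field name in the exception chain, e.g.
// "For field 'name'" with the original NullConversionException as its cause,
// instead of a message that only mentions the multiplicity.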
......@@ -25,6 +25,7 @@ public final class AttributeDefinition {
public final String name;
public final String dataTypeName;
public final Multiplicity multiplicity;
//A composite attribute is one whose lifecycle depends on the enclosing type; it is not just a reference
public final boolean isComposite;
public final boolean isUnique;
public final boolean isIndexable;
......
......@@ -27,6 +27,7 @@ import java.util.Map;
public class AttributeInfo {
public final String name;
public final Multiplicity multiplicity;
//A composite attribute is one whose lifecycle depends on the enclosing type; it is not just a reference
public final boolean isComposite;
public final boolean isUnique;
public final boolean isIndexable;
......
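// Illustrative sketch (assumption, reusing the Department/Person model from the
// tests above): the fourth constructor argument of AttributeDefinition is
// isComposite, so this declares employees as owned by their department rather
// than merely referenced:
//
//   new AttributeDefinition("employees", "array<Person>",
//           Multiplicity.COLLECTION, true /* isComposite */, "department");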
......@@ -55,8 +55,8 @@ public class ObjectGraphWalker {
throws AtlasException {
this.typeSystem = typeSystem;
this.nodeProcessor = nodeProcessor;
queue = new LinkedList<IReferenceableInstance>();
processedIds = new HashSet<Id>();
queue = new LinkedList<>();
processedIds = new HashSet<>();
if (start != null) {
visitReferenceableInstance(start);
}
......@@ -194,7 +194,7 @@ public class ObjectGraphWalker {
}
}
public static interface NodeProcessor {
public interface NodeProcessor {
void processNode(Node nd) throws AtlasException;
}
......
......@@ -22,6 +22,7 @@ import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Multimap;
import org.apache.atlas.AtlasException;
import org.apache.atlas.TypeExistsException;
import org.apache.atlas.TypeNotFoundException;
import org.apache.atlas.classification.InterfaceAudience;
import org.apache.atlas.typesystem.TypesDef;
......@@ -169,9 +170,9 @@ public class TypeSystem {
public StructType defineStructType(String name, boolean errorIfExists, AttributeDefinition... attrDefs)
throws AtlasException {
StructTypeDefinition structDef = new StructTypeDefinition(name, attrDefs);
defineTypes(ImmutableList.of(structDef), ImmutableList.<HierarchicalTypeDefinition<TraitType>>of(),
defineTypes(ImmutableList.<EnumTypeDefinition>of(), ImmutableList.of(structDef),
ImmutableList.<HierarchicalTypeDefinition<TraitType>>of(),
ImmutableList.<HierarchicalTypeDefinition<ClassType>>of());
return getDataType(StructType.class, structDef.typeName);
}
......@@ -196,56 +197,50 @@ public class TypeSystem {
}
public TraitType defineTraitType(HierarchicalTypeDefinition<TraitType> traitDef) throws AtlasException {
defineTypes(ImmutableList.<StructTypeDefinition>of(), ImmutableList.of(traitDef),
ImmutableList.<HierarchicalTypeDefinition<ClassType>>of());
defineTypes(ImmutableList.<EnumTypeDefinition>of(), ImmutableList.<StructTypeDefinition>of(),
ImmutableList.of(traitDef), ImmutableList.<HierarchicalTypeDefinition<ClassType>>of());
return getDataType(TraitType.class, traitDef.typeName);
}
public ClassType defineClassType(HierarchicalTypeDefinition<ClassType> classDef) throws AtlasException {
defineTypes(ImmutableList.<StructTypeDefinition>of(), ImmutableList.<HierarchicalTypeDefinition<TraitType>>of(),
ImmutableList.of(classDef));
defineTypes(ImmutableList.<EnumTypeDefinition>of(), ImmutableList.<StructTypeDefinition>of(),
ImmutableList.<HierarchicalTypeDefinition<TraitType>>of(), ImmutableList.of(classDef));
return getDataType(ClassType.class, classDef.typeName);
}
public Map<String, IDataType> defineTraitTypes(HierarchicalTypeDefinition<TraitType>... traitDefs)
throws AtlasException {
TransientTypeSystem transientTypes =
new TransientTypeSystem(ImmutableList.<StructTypeDefinition>of(), ImmutableList.copyOf(traitDefs),
new TransientTypeSystem(ImmutableList.<EnumTypeDefinition>of(),
ImmutableList.<StructTypeDefinition>of(), ImmutableList.copyOf(traitDefs),
ImmutableList.<HierarchicalTypeDefinition<ClassType>>of());
return transientTypes.defineTypes();
}
public Map<String, IDataType> defineClassTypes(HierarchicalTypeDefinition<ClassType>... classDefs)
throws AtlasException {
TransientTypeSystem transientTypes = new TransientTypeSystem(ImmutableList.<StructTypeDefinition>of(),
ImmutableList.<HierarchicalTypeDefinition<TraitType>>of(), ImmutableList.copyOf(classDefs));
TransientTypeSystem transientTypes = new TransientTypeSystem(ImmutableList.<EnumTypeDefinition>of(),
ImmutableList.<StructTypeDefinition>of(), ImmutableList.<HierarchicalTypeDefinition<TraitType>>of(),
ImmutableList.copyOf(classDefs));
return transientTypes.defineTypes();
}
public Map<String, IDataType> defineTypes(TypesDef typesDef) throws AtlasException {
Map<String, IDataType> typesAdded = new HashMap<>();
for (EnumTypeDefinition enumDef : typesDef.enumTypesAsJavaList()) {
typesAdded.put(enumDef.name, defineEnumType(enumDef));
}
ImmutableList<EnumTypeDefinition> enumDefs = ImmutableList.copyOf(typesDef.enumTypesAsJavaList());
ImmutableList<StructTypeDefinition> structDefs = ImmutableList.copyOf(typesDef.structTypesAsJavaList());
ImmutableList<HierarchicalTypeDefinition<TraitType>> traitDefs =
ImmutableList.copyOf(typesDef.traitTypesAsJavaList());
ImmutableList<HierarchicalTypeDefinition<ClassType>> classDefs =
ImmutableList.copyOf(typesDef.classTypesAsJavaList());
typesAdded.putAll(defineTypes(structDefs, traitDefs, classDefs));
return typesAdded;
return defineTypes(enumDefs, structDefs, traitDefs, classDefs);
}
public Map<String, IDataType> defineTypes(ImmutableList<StructTypeDefinition> structDefs,
public Map<String, IDataType> defineTypes(ImmutableList<EnumTypeDefinition> enumDefs,
ImmutableList<StructTypeDefinition> structDefs,
ImmutableList<HierarchicalTypeDefinition<TraitType>> traitDefs,
ImmutableList<HierarchicalTypeDefinition<ClassType>> classDefs) throws AtlasException {
TransientTypeSystem transientTypes = new TransientTypeSystem(structDefs, traitDefs, classDefs);
TransientTypeSystem transientTypes = new TransientTypeSystem(enumDefs, structDefs, traitDefs, classDefs);
Map<String, IDataType> definedTypes = transientTypes.defineTypes();
// LOG.debug("Defined new types " + Arrays.toString(definedTypes.keySet().toArray(new
// String[definedTypes.size()])));
......@@ -307,6 +302,7 @@ public class TypeSystem {
final ImmutableList<StructTypeDefinition> structDefs;
final ImmutableList<HierarchicalTypeDefinition<TraitType>> traitDefs;
final ImmutableList<HierarchicalTypeDefinition<ClassType>> classDefs;
private final ImmutableList<EnumTypeDefinition> enumDefs;
Map<String, StructTypeDefinition> structNameToDefMap = new HashMap<>();
Map<String, HierarchicalTypeDefinition<TraitType>> traitNameToDefMap = new HashMap<>();
Map<String, HierarchicalTypeDefinition<ClassType>> classNameToDefMap = new HashMap<>();
......@@ -318,10 +314,10 @@ public class TypeSystem {
List<DataTypes.MapType> recursiveMapTypes;
TransientTypeSystem(ImmutableList<StructTypeDefinition> structDefs,
ImmutableList<HierarchicalTypeDefinition<TraitType>> traitDefs,
ImmutableList<HierarchicalTypeDefinition<ClassType>> classDefs) {
TransientTypeSystem(ImmutableList<EnumTypeDefinition> enumDefs, ImmutableList<StructTypeDefinition> structDefs,
ImmutableList<HierarchicalTypeDefinition<TraitType>> traitDefs,
ImmutableList<HierarchicalTypeDefinition<ClassType>> classDefs) {
this.enumDefs = enumDefs;
this.structDefs = structDefs;
this.traitDefs = traitDefs;
this.classDefs = classDefs;
......@@ -345,10 +341,22 @@ public class TypeSystem {
* - setup shallow Type instances to facilitate recursive type graphs
*/
private void step1() throws AtlasException {
for (EnumTypeDefinition eDef : enumDefs) {
assert eDef.name != null;
if (types.containsKey(eDef.name)) {
throw new AtlasException(String.format("Redefinition of type %s not supported", eDef.name));
}
EnumType eT = new EnumType(this, eDef.name, eDef.enumValues);
TypeSystem.this.types.put(eDef.name, eT);
typeCategoriesToTypeNamesMap.put(DataTypes.TypeCategory.ENUM, eDef.name);
transientTypes.add(eDef.name);
}
for (StructTypeDefinition sDef : structDefs) {
assert sDef.typeName != null;
if (dataType(sDef.typeName) != null) {
throw new AtlasException(String.format("Cannot redefine type %s", sDef.typeName));
throw new TypeExistsException(String.format("Cannot redefine type %s", sDef.typeName));
}
TypeSystem.this.types
.put(sDef.typeName, new StructType(this, sDef.typeName, sDef.attributeDefinitions.length));
......@@ -359,7 +367,7 @@ public class TypeSystem {
for (HierarchicalTypeDefinition<TraitType> traitDef : traitDefs) {
assert traitDef.typeName != null;
if (types.containsKey(traitDef.typeName)) {
throw new AtlasException(String.format("Cannot redefine type %s", traitDef.typeName));
throw new TypeExistsException(String.format("Cannot redefine type %s", traitDef.typeName));
}
TypeSystem.this.types.put(traitDef.typeName, new TraitType(this, traitDef.typeName, traitDef.superTypes,
......@@ -371,7 +379,7 @@ public class TypeSystem {
for (HierarchicalTypeDefinition<ClassType> classDef : classDefs) {
assert classDef.typeName != null;
if (types.containsKey(classDef.typeName)) {
throw new AtlasException(String.format("Cannot redefine type %s", classDef.typeName));
throw new TypeExistsException(String.format("Cannot redefine type %s", classDef.typeName));
}
TypeSystem.this.types.put(classDef.typeName, new ClassType(this, classDef.typeName, classDef.superTypes,
......@@ -588,7 +596,8 @@ public class TypeSystem {
}
@Override
public Map<String, IDataType> defineTypes(ImmutableList<StructTypeDefinition> structDefs,
public Map<String, IDataType> defineTypes(ImmutableList<EnumTypeDefinition> enumDefs,
ImmutableList<StructTypeDefinition> structDefs,
ImmutableList<HierarchicalTypeDefinition<TraitType>> traitDefs,
ImmutableList<HierarchicalTypeDefinition<ClassType>> classDefs) throws AtlasException {
throw new AtlasException("Internal Error: define type called on TrasientTypeSystem");
......
......@@ -43,10 +43,17 @@ public class ValueConversionException extends AtlasException {
super(msg);
}
protected ValueConversionException(String msg, Exception e) {
super(msg, e);
}
public static class NullConversionException extends ValueConversionException {
public NullConversionException(Multiplicity m) {
super(String.format("Null value not allowed for multiplicty %s", m));
}
public NullConversionException(String msg, Exception e) {
super(msg, e);
}
}
}
......@@ -41,6 +41,7 @@ atlas.graph.index.search.elasticsearch.create.sleep=2000
atlas.graph.index.search.solr.mode=cloud
atlas.graph.index.search.solr.zookeeper-url=${solr.zk.address}
######### Hive Lineage Configs #########
# This model reflects the base super types for Data and Process
#atlas.lineage.hive.table.type.name=DataSet
......@@ -53,8 +54,13 @@ atlas.lineage.hive.table.schema.query.hive_table=hive_table where name='%s'\, co
######### Notification Configs #########
atlas.notification.embedded=true
atlas.notification.implementation=org.apache.atlas.kafka.KafkaNotification
atlas.notification.kafka.data=target/data/kafka
atlas.kafka.zookeeper.connect=localhost:9026
atlas.kafka.bootstrap.servers=localhost:9027
atlas.kafka.data=target/data/kafka
atlas.kafka.zookeeper.session.timeout.ms=400
atlas.kafka.zookeeper.sync.time.ms=20
atlas.kafka.auto.commit.interval.ms=100
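# Illustrative note (assumption): with atlas.notification.embedded=true the test
# setup is expected to start an in-process Kafka broker and Zookeeper on the
# ports configured above, writing broker data under atlas.kafka.data.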
######### Security Properties #########
......
<?xml version="1.0" encoding="UTF-8" ?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
<appender name="console" class="org.apache.log4j.ConsoleAppender">
<param name="Target" value="System.out"/>
<layout class="org.apache.log4j.PatternLayout">
<param name="ConversionPattern" value="%d %-5p - [%t:%x] ~ %m (%c{1}:%L)%n"/>
</layout>
</appender>
<appender name="AUDIT" class="org.apache.log4j.DailyRollingFileAppender">
<param name="File" value="${atlas.log.dir}/audit.log"/>
<param name="Append" value="true"/>
<param name="Threshold" value="debug"/>
<layout class="org.apache.log4j.PatternLayout">
<param name="ConversionPattern" value="%d %x %m%n"/>
</layout>
</appender>
<logger name="org.apache.atlas" additivity="false">
<level value="debug"/>
<appender-ref ref="console"/>
</logger>
<logger name="AUDIT">
<level value="info"/>
<appender-ref ref="console"/>
</logger>
<root>
<priority value="warn"/>
<appender-ref ref="console"/>
</root>
</log4j:configuration>
......@@ -236,7 +236,7 @@ trait TypeHelpers {
@throws(classOf[AtlasException])
def defineClassType(ts: TypeSystem, classDef: HierarchicalTypeDefinition[ClassType]): ClassType = {
ts.defineTypes(ImmutableList.of[StructTypeDefinition],
ts.defineTypes(ImmutableList.of[EnumTypeDefinition], ImmutableList.of[StructTypeDefinition],
ImmutableList.of[HierarchicalTypeDefinition[TraitType]],
ImmutableList.of[HierarchicalTypeDefinition[ClassType]](classDef))
return ts.getDataType(classOf[ClassType], classDef.typeName)
......
......@@ -25,7 +25,7 @@ public class ApplicationPropertiesTest {
@Test
public void testVariables() throws Exception {
Configuration properties = ApplicationProperties.get();
Configuration properties = ApplicationProperties.get(ApplicationProperties.APPLICATION_PROPERTIES);
//plain property without variables
Assert.assertEquals(properties.getString("atlas.service"), "atlas");
......@@ -44,7 +44,7 @@ public class ApplicationPropertiesTest {
@Test
//variable substitutions should work with subset configuration as well
public void testSubset() throws Exception {
Configuration configuration = ApplicationProperties.get();
Configuration configuration = ApplicationProperties.get(ApplicationProperties.APPLICATION_PROPERTIES);
Configuration subConfiguration = configuration.subset("atlas");
Assert.assertEquals(subConfiguration.getString("service"), "atlas");
......
......@@ -28,6 +28,7 @@ import org.apache.atlas.typesystem.types.AttributeDefinition;
import org.apache.atlas.typesystem.types.BaseTest;
import org.apache.atlas.typesystem.types.ClassType;
import org.apache.atlas.typesystem.types.DataTypes;
import org.apache.atlas.typesystem.types.EnumTypeDefinition;
import org.apache.atlas.typesystem.types.HierarchicalTypeDefinition;
import org.apache.atlas.typesystem.types.Multiplicity;
import org.apache.atlas.typesystem.types.StructTypeDefinition;
......@@ -79,9 +80,9 @@ public class SerializationJavaTest extends BaseTest {
createTraitTypeDef("SecurityClearance", ImmutableList.<String>of(),
createRequiredAttrDef("level", DataTypes.INT_TYPE));
ts.defineTypes(ImmutableList.<StructTypeDefinition>of(),
ImmutableList.<HierarchicalTypeDefinition<TraitType>>of(securityClearanceTypeDef),
ImmutableList.<HierarchicalTypeDefinition<ClassType>>of(deptTypeDef, personTypeDef, managerTypeDef));
ts.defineTypes(ImmutableList.<EnumTypeDefinition>of(), ImmutableList.<StructTypeDefinition>of(),
ImmutableList.of(securityClearanceTypeDef),
ImmutableList.of(deptTypeDef, personTypeDef, managerTypeDef));
Referenceable hrDept = new Referenceable("Department");
Referenceable john = new Referenceable("Person");
......@@ -147,8 +148,8 @@ public class SerializationJavaTest extends BaseTest {
createTraitTypeDef("SecurityClearance2", ImmutableList.<String>of(),
createRequiredAttrDef("level", DataTypes.INT_TYPE));
ts.defineTypes(ImmutableList.<StructTypeDefinition>of(),
ImmutableList.<HierarchicalTypeDefinition<TraitType>>of(securityClearanceTypeDef),
ts.defineTypes(ImmutableList.<EnumTypeDefinition>of(), ImmutableList.<StructTypeDefinition>of(),
ImmutableList.of(securityClearanceTypeDef),
ImmutableList.<HierarchicalTypeDefinition<ClassType>>of());
......
......@@ -136,7 +136,8 @@ public abstract class BaseTest {
.createTraitTypeDef("SecurityClearance", ImmutableList.<String>of(),
TypesUtil.createRequiredAttrDef("level", DataTypes.INT_TYPE));
ts.defineTypes(ImmutableList.<StructTypeDefinition>of(), ImmutableList.of(securityClearanceTypeDef),
ts.defineTypes(ImmutableList.<EnumTypeDefinition>of(), ImmutableList.<StructTypeDefinition>of(),
ImmutableList.of(securityClearanceTypeDef),
ImmutableList.of(deptTypeDef, personTypeDef, managerTypeDef));
ImmutableList.of(ts.getDataType(HierarchicalType.class, "SecurityClearance"),
......
......@@ -80,9 +80,10 @@ public class TypeSystemTest extends BaseTest {
HierarchicalTypeDefinition<TraitType> financeTrait =
TypesUtil.createTraitTypeDef("Finance", ImmutableList.<String>of());
getTypeSystem().defineTypes(ImmutableList.<StructTypeDefinition>of(), ImmutableList
.of(classificationTraitDefinition, piiTrait, phiTrait, pciTrait, soxTrait, secTrait,
financeTrait), ImmutableList.<HierarchicalTypeDefinition<ClassType>>of());
getTypeSystem().defineTypes(ImmutableList.<EnumTypeDefinition>of(),
ImmutableList.<StructTypeDefinition>of(),
ImmutableList.of(classificationTraitDefinition, piiTrait, phiTrait, pciTrait, soxTrait, secTrait,
financeTrait), ImmutableList.<HierarchicalTypeDefinition<ClassType>>of());
final ImmutableList<String> traitsNames = getTypeSystem().getTypeNamesByCategory(DataTypes.TypeCategory.TRAIT);
Assert.assertEquals(traitsNames.size(), 7);
......@@ -102,7 +103,6 @@ public class TypeSystemTest extends BaseTest {
String enumType = random();
EnumTypeDefinition orgLevelEnum =
new EnumTypeDefinition(enumType, new EnumValue(random(), 1), new EnumValue(random(), 2));
ts.defineEnumType(orgLevelEnum);
String structName = random();
String attrType = random();
......@@ -117,7 +117,8 @@ public class TypeSystemTest extends BaseTest {
HierarchicalTypeDefinition<TraitType> traitType = createTraitTypeDef(traitName, ImmutableList.<String>of(),
createRequiredAttrDef(attrType, DataTypes.INT_TYPE));
ts.defineTypes(ImmutableList.of(structType), ImmutableList.of(traitType), ImmutableList.of(classType));
ts.defineTypes(ImmutableList.of(orgLevelEnum), ImmutableList.of(structType),
ImmutableList.of(traitType), ImmutableList.of(classType));
}
@Test
......@@ -127,7 +128,7 @@ public class TypeSystemTest extends BaseTest {
HierarchicalTypeDefinition<ClassType> c = TypesUtil.createClassTypeDef("C", ImmutableList.of("B"));
TypeSystem ts = getTypeSystem();
ts.defineTypes(ImmutableList.<StructTypeDefinition>of(),
ts.defineTypes(ImmutableList.<EnumTypeDefinition>of(), ImmutableList.<StructTypeDefinition>of(),
ImmutableList.<HierarchicalTypeDefinition<TraitType>>of(),
ImmutableList.of(a, b, c));
ClassType ac = ts.getDataType(ClassType.class, "a");
......
......@@ -146,7 +146,7 @@ class SerializationTest extends BaseTest {
TypesUtil.createTraitTypeDef("SecurityClearance", ImmutableList.of[String],
TypesUtil.createRequiredAttrDef("level", DataTypes.INT_TYPE))
ts.defineTypes(ImmutableList.of[StructTypeDefinition],
ts.defineTypes(ImmutableList.of[EnumTypeDefinition], ImmutableList.of[StructTypeDefinition],
ImmutableList.of[HierarchicalTypeDefinition[TraitType]](securityClearanceTypeDef),
ImmutableList.of[HierarchicalTypeDefinition[ClassType]](deptTypeDef, personTypeDef, managerTypeDef)
)
......
......@@ -45,7 +45,7 @@ class TypesSerializationTest extends BaseTest with TypeHelpers {
optionalAttr("o", DataTypes.mapTypeName(DataTypes.STRING_TYPE, DataTypes.DOUBLE_TYPE)))
ts.defineTypes(ImmutableList.of[StructTypeDefinition](sDef),
ts.defineTypes(ImmutableList.of[EnumTypeDefinition], ImmutableList.of[StructTypeDefinition](sDef),
ImmutableList.of[HierarchicalTypeDefinition[TraitType]],
ImmutableList.of[HierarchicalTypeDefinition[ClassType]]
)
......@@ -123,7 +123,7 @@ class TypesSerializationTest extends BaseTest with TypeHelpers {
val securityClearanceTypeDef: HierarchicalTypeDefinition[TraitType] = createTraitTypeDef("SecurityClearance", List(),
requiredAttr("level", DataTypes.INT_TYPE)
)
ts.defineTypes(ImmutableList.of[StructTypeDefinition],
ts.defineTypes(ImmutableList.of[EnumTypeDefinition], ImmutableList.of[StructTypeDefinition],
ImmutableList.of[HierarchicalTypeDefinition[TraitType]](securityClearanceTypeDef),
ImmutableList.of[HierarchicalTypeDefinition[ClassType]](deptTypeDef, personTypeDef, managerTypeDef))
......@@ -136,7 +136,7 @@ class TypesSerializationTest extends BaseTest with TypeHelpers {
typesDef1.enumTypes.foreach(ts1.defineEnumType(_))
ts1.defineTypes(ImmutableList.copyOf(typesDef1.structTypes.toArray),
ts1.defineTypes(ImmutableList.of[EnumTypeDefinition], ImmutableList.copyOf(typesDef1.structTypes.toArray),
ImmutableList.copyOf(typesDef1.traitTypes.toArray),
ImmutableList.copyOf(typesDef1.classTypes.toArray)
)
......
......@@ -188,6 +188,11 @@
<dependencies>
<dependency>
<groupId>org.apache.atlas</groupId>
<artifactId>atlas-common</artifactId>
</dependency>
<dependency>
<groupId>org.apache.atlas</groupId>
<artifactId>atlas-typesystem</artifactId>
</dependency>
......@@ -202,6 +207,11 @@
</dependency>
<dependency>
<groupId>org.apache.atlas</groupId>
<artifactId>atlas-notification</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
</dependency>
......@@ -413,6 +423,45 @@
</configuration>
</plugin>
<!-- Running unit tests in pre-integration-test phase after war is built -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<systemProperties>
<user.dir>${project.basedir}</user.dir>
<projectBaseDir>${project.basedir}/..</projectBaseDir>
</systemProperties>
<!--<skipTests>true</skipTests>-->
<forkMode>always</forkMode>
<redirectTestOutputToFile>true</redirectTestOutputToFile>
<argLine>-Djava.awt.headless=true -Dproject.version=${project.version}
-Dhadoop.tmp.dir=${project.build.directory}/tmp-hadoop-${user.name}
-Xmx1024m -XX:MaxPermSize=512m
</argLine>
<excludes>
<exclude>**/*Base*</exclude>
</excludes>
</configuration>
<dependencies>
<dependency>
<groupId>org.apache.maven.surefire</groupId>
<artifactId>surefire-testng</artifactId>
<version>2.18.1</version>
</dependency>
</dependencies>
<executions>
<execution>
<id>default-test</id>
<phase>pre-integration-test</phase>
<goals>
<goal>test</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-maven-plugin</artifactId>
......@@ -435,6 +484,10 @@
<useTestScope>true</useTestScope>
<systemProperties>
<systemProperty>
<name>log4j.configuration</name>
<value>atlas-log4j.xml</value>
</systemProperty>
<systemProperty>
<name>atlas.log.dir</name>
<value>${project.build.directory}/logs</value>
</systemProperty>
......@@ -453,7 +506,7 @@
</systemProperty>
</systemProperties>
<stopKey>atlas-stop</stopKey>
<stopPort>41001</stopPort>
<stopPort>21001</stopPort>
<daemon>${debug.jetty.daemon}</daemon>
<testClassesDirectory>${project.build.directory}/../../webapp/target/test-classes/</testClassesDirectory>
<useTestClasspath>true</useTestClasspath>
......
......@@ -101,6 +101,7 @@ public final class Main {
configuration.setProperty("atlas.enableTLS", String.valueOf(enableTLS));
showStartupInfo(buildConfiguration, enableTLS, appPort);
server = EmbeddedServer.newServer(appPort, appPath, enableTLS);
server.start();
}
......
......@@ -38,7 +38,6 @@ import org.apache.atlas.typesystem.types.TraitType;
import org.apache.atlas.typesystem.types.TypeUtils;
import org.apache.atlas.typesystem.types.utils.TypesUtil;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONObject;
import java.util.List;
......@@ -241,12 +240,11 @@ public class QuickStart {
String entityJSON = InstanceSerialization.toJson(referenceable, true);
System.out.println("Submitting new entity= " + entityJSON);
JSONObject jsonObject = metadataServiceClient.createEntity(entityJSON);
String guid = jsonObject.getString(AtlasClient.GUID);
System.out.println("created instance for type " + typeName + ", guid: " + guid);
JSONArray guids = metadataServiceClient.createEntity(entityJSON);
System.out.println("created instance for type " + typeName + ", guid: " + guids);
// return the Id for created instance with guid
return new Id(guid, referenceable.getId().getVersion(), referenceable.getTypeName());
return new Id(guids.getString(0), referenceable.getId().getVersion(), referenceable.getTypeName());
}
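Note the client contract change here: createEntity now returns a JSONArray of guids rather than a JSONObject, so a caller that submitted a single entity reads index 0. A short sketch of the new convention (names as in QuickStart):

    // One entity submitted, so one guid is expected back; a batch payload
    // would return one guid per entity (assumption: in submission order).
    JSONArray guids = metadataServiceClient.createEntity(entityJSON);
    String guid = guids.getString(0);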
Id database(String name, String description, String owner, String locationUri, String... traitNames)
......@@ -387,11 +385,9 @@ public class QuickStart {
private void search() throws Exception {
for (String dslQuery : getDSLQueries()) {
JSONObject response = metadataServiceClient.searchEntity(dslQuery);
JSONObject results = response.getJSONObject(AtlasClient.RESULTS);
if (!results.isNull("rows")) {
JSONArray rows = results.getJSONArray("rows");
System.out.println("query [" + dslQuery + "] returned [" + rows.length() + "] rows");
JSONArray results = metadataServiceClient.search(dslQuery);
if (results != null) {
System.out.println("query [" + dslQuery + "] returned [" + results.length() + "] rows");
} else {
System.out.println("query [" + dslQuery + "] failed, results:" + results.toString());
}
......
......@@ -33,7 +33,9 @@ import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasException;
import org.apache.atlas.RepositoryMetadataModule;
import org.apache.atlas.notification.NotificationModule;
import org.apache.atlas.repository.graph.GraphProvider;
import org.apache.atlas.service.Services;
import org.apache.atlas.web.filters.AtlasAuthenticationFilter;
import org.apache.atlas.web.filters.AuditFilter;
import org.apache.commons.configuration.Configuration;
......@@ -64,7 +66,8 @@ public class GuiceServletConfig extends GuiceServletContextListener {
* .html
*/
if (injector == null) {
injector = Guice.createInjector(new RepositoryMetadataModule(), new JerseyServletModule() {
injector = Guice.createInjector(new RepositoryMetadataModule(), new NotificationModule(),
new JerseyServletModule() {
@Override
protected void configureServlets() {
filter("/*").through(AuditFilter.class);
......@@ -110,6 +113,14 @@ public class GuiceServletConfig extends GuiceServletContextListener {
// perform login operations
LoginProcessor loginProcessor = new LoginProcessor();
loginProcessor.login();
startServices();
}
protected void startServices() {
LOG.debug("Starting services");
Services services = injector.getInstance(Services.class);
services.start();
}
/**
......@@ -132,6 +143,15 @@ public class GuiceServletConfig extends GuiceServletContextListener {
Provider<GraphProvider<TitanGraph>> graphProvider = injector.getProvider(Key.get(graphProviderType));
final Graph graph = graphProvider.get().get();
graph.shutdown();
//stop services
stopServices();
}
}
protected void stopServices() {
LOG.debug("Stopping services");
Services services = injector.getInstance(Services.class);
services.stop();
}
}
\ No newline at end of file
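Services is the new aggregate that contextInitialized starts and contextDestroyed stops. Given the guice-multibindings dependency added earlier in this commit, a plausible shape is a set-injected collection of Service implementations; the sketch below is hypothetical, not the committed class:

    // Hypothetical sketch (imports assumed: java.util.Set, com.google.inject.Inject).
    public class Services {
        private final Set<Service> services;

        @Inject
        public Services(Set<Service> services) {
            this.services = services; // multibound set of individual services
        }

        public void start() {
            for (Service service : services) {
                service.start(); // e.g. the notification hook consumer
            }
        }

        public void stop() {
            for (Service service : services) {
                service.stop();
            }
        }
    }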
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.web.resources;
import com.google.common.base.Preconditions;
import com.google.inject.Inject;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasException;
import org.apache.atlas.repository.EntityExistsException;
import org.apache.atlas.services.MetadataService;
import org.apache.atlas.typesystem.types.ValueConversionException;
import org.apache.atlas.web.util.Servlets;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.inject.Singleton;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriBuilder;
import javax.ws.rs.core.UriInfo;
import java.net.URI;
import java.util.List;
@Path("entities")
@Singleton
public class EntitiesResource {
private static final Logger LOG = LoggerFactory.getLogger(EntitiesResource.class);
@Inject
private MetadataService metadataService;
@Context
UriInfo uriInfo;
/**
* Submits the entity definitions (instances).
* The body contains the JSONArray of entity json. The service takes care of de-duping the entities based on any
* unique attribute for the given type.
*/
@POST
@Consumes(Servlets.JSON_MEDIA_TYPE)
@Produces(Servlets.JSON_MEDIA_TYPE)
public Response submit(@Context HttpServletRequest request) {
try {
final String entities = Servlets.getRequestPayload(request);
LOG.debug("submitting entities {} ", AtlasClient.toString(new JSONArray(entities)));
final String guids = metadataService.createEntities(entities);
UriBuilder ub = uriInfo.getAbsolutePathBuilder();
URI locationURI = ub.path(guids).build();
JSONObject response = new JSONObject();
response.put(AtlasClient.REQUEST_ID, Servlets.getRequestId());
response.put(AtlasClient.GUID, new JSONArray(guids));
response.put(AtlasClient.DEFINITION, metadataService.getEntityDefinition(new JSONArray(guids).getString(0)));
return Response.created(locationURI).entity(response).build();
} catch(EntityExistsException e) {
LOG.error("Unique constraint violation", e);
throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.CONFLICT));
} catch (ValueConversionException ve) {
LOG.error("Unable to persist entity instance due to a desrialization error ", ve);
throw new WebApplicationException(Servlets.getErrorResponse(ve.getCause(), Response.Status.BAD_REQUEST));
} catch (AtlasException | IllegalArgumentException e) {
LOG.error("Unable to persist entity instance", e);
throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.BAD_REQUEST));
} catch (Throwable e) {
LOG.error("Unable to persist entity instance", e);
throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.INTERNAL_SERVER_ERROR));
}
}
/**
* Gets the list of entities for a given entity type.
*
* @param entityType name of a type which is unique
*/
@GET
@Produces(Servlets.JSON_MEDIA_TYPE)
public Response getEntityListByType(@QueryParam("type") String entityType) {
try {
Preconditions.checkNotNull(entityType, "Entity type cannot be null");
LOG.debug("Fetching entity list for type={} ", entityType);
final List<String> entityList = metadataService.getEntityList(entityType);
JSONObject response = new JSONObject();
response.put(AtlasClient.REQUEST_ID, Servlets.getRequestId());
response.put(AtlasClient.TYPENAME, entityType);
response.put(AtlasClient.RESULTS, new JSONArray(entityList));
response.put(AtlasClient.COUNT, entityList.size());
return Response.ok(response).build();
} catch (NullPointerException e) {
LOG.error("Entity type cannot be null", e);
throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.BAD_REQUEST));
} catch (AtlasException | IllegalArgumentException e) {
LOG.error("Unable to get entity list for type {}", entityType, e);
throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.BAD_REQUEST));
} catch (Throwable e) {
LOG.error("Unable to get entity list for type {}", entityType, e);
throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.INTERNAL_SERVER_ERROR));
}
}
}
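As the submit() javadoc says, the request body is a JSON array of serialized entities. A minimal sketch of assembling that payload with the helpers used elsewhere in this commit (the two entity variables are hypothetical):

    // Each element is one Referenceable serialized to JSON, matching what
    // submit() parses on the server side; the guid array comes back under GUID.
    JSONArray entities = new JSONArray();
    entities.put(InstanceSerialization.toJson(dbEntity, true));
    entities.put(InstanceSerialization.toJson(tableEntity, true));
    String payload = entities.toString();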
......@@ -25,7 +25,6 @@ import org.apache.atlas.ParamChecker;
import org.apache.atlas.TypeNotFoundException;
import org.apache.atlas.repository.EntityNotFoundException;
import org.apache.atlas.services.MetadataService;
import org.apache.atlas.typesystem.types.ValueConversionException;
import org.apache.atlas.web.util.Servlets;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONObject;
......@@ -59,7 +58,7 @@ import java.util.List;
* An entity is an "instance" of a Type. Entities conform to the definition
* of the Type they correspond with.
*/
@Path("entities")
@Path("entity")
@Singleton
public class EntityResource {
......@@ -82,40 +81,6 @@ public class EntityResource {
this.metadataService = metadataService;
}
/**
* Submits an entity definition (instance) corresponding to a given type.
*/
@POST
@Consumes(Servlets.JSON_MEDIA_TYPE)
@Produces(Servlets.JSON_MEDIA_TYPE)
public Response submit(@Context HttpServletRequest request) {
try {
final String entity = Servlets.getRequestPayload(request);
LOG.debug("submitting entity {} ", entity);
final String guid = metadataService.createEntity(entity);
UriBuilder ub = uriInfo.getAbsolutePathBuilder();
URI locationURI = ub.path(guid).build();
JSONObject response = new JSONObject();
response.put(AtlasClient.REQUEST_ID, Servlets.getRequestId());
response.put(AtlasClient.GUID, guid);
response.put(AtlasClient.DEFINITION, metadataService.getEntityDefinition(guid));
return Response.created(locationURI).entity(response).build();
} catch (ValueConversionException ve) {
LOG.error("Unable to persist entity instance due to a desrialization error ", ve);
throw new WebApplicationException(Servlets.getErrorResponse(ve.getCause(), Response.Status.BAD_REQUEST));
} catch (AtlasException | IllegalArgumentException e) {
LOG.error("Unable to persist entity instance", e);
throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.BAD_REQUEST));
} catch (Throwable e) {
LOG.error("Unable to persist entity instance", e);
throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.INTERNAL_SERVER_ERROR));
}
}
/**
* Fetch the complete definition of an entity given its GUID.
......@@ -159,34 +124,47 @@ public class EntityResource {
}
/**
* Gets the list of entities for a given entity type.
* Fetch the complete definition of an entity given its qualified name.
*
* @param entityType name of a type which is unique
* @param entityType name of the entity type
* @param attribute name of a unique attribute of the type
* @param value value of the unique attribute
*/
@GET
@Produces(Servlets.JSON_MEDIA_TYPE)
public Response getEntityListByType(@QueryParam("type") String entityType) {
public Response getEntityDefinitionByAttribute(@QueryParam("type") String entityType,
@QueryParam("property") String attribute,
@QueryParam("value") String value) {
try {
Preconditions.checkNotNull(entityType, "Entity type cannot be null");
LOG.debug("Fetching entity definition for type={}, qualified name={}", entityType, value);
ParamChecker.notEmpty(entityType, "type cannot be null");
ParamChecker.notEmpty(attribute, "attribute name cannot be null");
ParamChecker.notEmpty(value, "attribute value cannot be null");
LOG.debug("Fetching entity list for type={} ", entityType);
final List<String> entityList = metadataService.getEntityList(entityType);
final String entityDefinition = metadataService.getEntityDefinition(entityType, attribute, value);
JSONObject response = new JSONObject();
response.put(AtlasClient.REQUEST_ID, Servlets.getRequestId());
response.put(AtlasClient.TYPENAME, entityType);
response.put(AtlasClient.RESULTS, new JSONArray(entityList));
response.put(AtlasClient.COUNT, entityList.size());
return Response.ok(response).build();
} catch (NullPointerException e) {
LOG.error("Entity type cannot be null", e);
throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.BAD_REQUEST));
Response.Status status = Response.Status.NOT_FOUND;
if (entityDefinition != null) {
response.put(AtlasClient.DEFINITION, entityDefinition);
status = Response.Status.OK;
} else {
response.put(AtlasClient.ERROR, Servlets.escapeJsonString(String.format("An entity with type={%s}, " +
"qualifiedName={%s} does not exist", entityType, value)));
}
return Response.status(status).entity(response).build();
} catch (EntityNotFoundException e) {
LOG.error("An entity with type={} and qualifiedName={} does not exist", entityType, value, e);
throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.NOT_FOUND));
} catch (AtlasException | IllegalArgumentException e) {
LOG.error("Unable to get entity list for type {}", entityType, e);
LOG.error("Bad type={}, qualifiedName={}", entityType, value, e);
throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.BAD_REQUEST));
} catch (Throwable e) {
LOG.error("Unable to get entity list for type {}", entityType, e);
LOG.error("Unable to get instance definition for type={}, qualifiedName={}", entityType, value, e);
throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.INTERNAL_SERVER_ERROR));
}
}
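A hedged sketch of exercising the new lookup with the Jersey 1.x client already used by the webapp tests; the base path and the WebResource named service are assumptions carried over from the existing ITs:

    // Hypothetical test-side call against GET /api/atlas/entity.
    ClientResponse clientResponse = service.path("api/atlas/entity")
            .queryParam("type", "hive_db")     // @QueryParam("type")
            .queryParam("property", "name")    // @QueryParam("property")
            .queryParam("value", dbName)       // @QueryParam("value")
            .accept(Servlets.JSON_MEDIA_TYPE)
            .get(ClientResponse.class);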
......
......@@ -88,28 +88,16 @@ public class MetadataDiscoveryResource {
final String jsonResultStr = discoveryService.searchByDSL(query);
response = new DSLJSONResponseBuilder().results(jsonResultStr).query(query).build();
return Response.ok(response).build();
} catch (IllegalArgumentException e) {
LOG.error("Unable to get entity list for empty query", e);
throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.BAD_REQUEST));
} catch (Throwable throwable) {
LOG.error("Unable to get entity list for query {} using dsl", query, throwable);
try { //fall back to full-text
final String jsonResultStr = discoveryService.searchByFullText(query);
response = new FullTextJSonResponseBuilder().results(jsonResultStr).query(query).build();
} catch (DiscoveryException | IllegalArgumentException e) {
LOG.error("Unable to get entity list for query {}", query, e);
throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.BAD_REQUEST));
} catch (Throwable e) {
LOG.error("Unable to get entity list for query {}", query, e);
throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.INTERNAL_SERVER_ERROR));
}
return searchUsingFullText(query);
}
return Response.ok(response).build();
}
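The inline full-text fallback is factored out into searchUsingFullText. A plausible reconstruction of the extracted helper, assembled from the removed lines above (hypothetical; the committed body may differ):

    // Reconstructed from the removed inline fallback; error mapping unchanged.
    private Response searchUsingFullText(String query) {
        try {
            final String jsonResultStr = discoveryService.searchByFullText(query);
            JSONObject response = new FullTextJSonResponseBuilder().results(jsonResultStr).query(query).build();
            return Response.ok(response).build();
        } catch (DiscoveryException | IllegalArgumentException e) {
            LOG.error("Unable to get entity list for query {}", query, e);
            throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.BAD_REQUEST));
        } catch (Throwable e) {
            LOG.error("Unable to get entity list for query {}", query, e);
            throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.INTERNAL_SERVER_ERROR));
        }
    }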
/**
......@@ -267,7 +255,8 @@ public class MetadataDiscoveryResource {
count(rowsJsonArr.length());
queryType(QUERY_TYPE_DSL);
JSONObject response = super.build();
response.put(AtlasClient.RESULTS, dslResults);
response.put(AtlasClient.RESULTS, rowsJsonArr);
response.put(AtlasClient.DATATYPE, dslResults.get(AtlasClient.DATATYPE));
return response;
}
......
......@@ -21,6 +21,7 @@ package org.apache.atlas.web.resources;
import com.sun.jersey.api.client.ClientResponse;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasException;
import org.apache.atlas.TypeExistsException;
import org.apache.atlas.services.MetadataService;
import org.apache.atlas.typesystem.types.DataTypes;
import org.apache.atlas.web.util.Servlets;
......@@ -96,6 +97,9 @@ public class TypesResource {
response.put(AtlasClient.REQUEST_ID, Servlets.getRequestId());
response.put(AtlasClient.TYPES, typesResponse);
return Response.status(ClientResponse.Status.CREATED).entity(response).build();
} catch (TypeExistsException e) {
LOG.error("Type already exists", e);
throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.CONFLICT));
} catch (AtlasException | IllegalArgumentException e) {
LOG.error("Unable to persist types", e);
throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.BAD_REQUEST));
......
......@@ -20,8 +20,6 @@ package org.apache.atlas.web.service;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasException;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.alias.CredentialProvider;
import org.apache.hadoop.security.alias.CredentialProviderFactory;
......
......@@ -50,7 +50,6 @@
<appender-ref ref="FILE"/>
</logger>
<logger name="AUDIT">
<level value="info"/>
<appender-ref ref="AUDIT"/>
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.notification;
import com.google.inject.Inject;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.json.InstanceSerialization;
import org.apache.atlas.web.resources.BaseResourceIT;
import org.codehaus.jettison.json.JSONArray;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
@Guice(modules = NotificationModule.class)
public class NotificationHookConsumerIT extends BaseResourceIT {
@Inject
private NotificationInterface kafka;
private String dbName;
@BeforeClass
public void setUp() throws Exception {
super.setUp();
createTypeDefinitions();
}
@AfterClass
public void teardown() throws Exception {
kafka.close();
}
private void sendHookMessage(Referenceable entity) throws NotificationException {
String entityJson = InstanceSerialization.toJson(entity, true);
JSONArray jsonArray = new JSONArray();
jsonArray.put(entityJson);
kafka.send(NotificationInterface.NotificationType.HOOK, jsonArray.toString());
}
@Test
public void testConsumeHookMessage() throws Exception {
Referenceable entity = new Referenceable(DATABASE_TYPE);
dbName = "db" + randomString();
entity.set("name", dbName);
entity.set("description", randomString());
sendHookMessage(entity);
waitFor(1000, new Predicate() {
@Override
public boolean evaluate() throws Exception {
JSONArray results =
serviceClient.searchByDSL(String.format("%s where name='%s'", DATABASE_TYPE, dbName));
return results.length() == 1;
}
});
}
@Test (dependsOnMethods = "testConsumeHookMessage")
public void testEntityDeduping() throws Exception {
// Referenceable db = serviceClient.getEntity(DATABASE_TYPE, "name", dbName);
Referenceable db = new Referenceable(DATABASE_TYPE);
db.set("name", dbName);
db.set("description", randomString());
Referenceable table = new Referenceable(HIVE_TABLE_TYPE);
final String tableName = randomString();
table.set("name", tableName);
table.set("db", db);
sendHookMessage(table);
waitFor(1000, new Predicate() {
@Override
public boolean evaluate() throws Exception {
JSONArray results =
serviceClient.searchByDSL(String.format("%s where name='%s'", HIVE_TABLE_TYPE, tableName));
return results.length() == 1;
}
});
JSONArray results =
serviceClient.searchByDSL(String.format("%s where name='%s'", DATABASE_TYPE, dbName));
Assert.assertEquals(results.length(), 1);
}
}
......@@ -21,8 +21,10 @@ import com.google.inject.Provider;
import com.google.inject.TypeLiteral;
import com.thinkaurelius.titan.core.TitanGraph;
import com.thinkaurelius.titan.core.util.TitanCleanup;
import com.tinkerpop.blueprints.Graph;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasException;
import org.apache.atlas.repository.graph.GraphProvider;
import org.apache.commons.configuration.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -31,6 +33,7 @@ import javax.servlet.ServletContextEvent;
public class TestGuiceServletConfig extends GuiceServletConfig {
private static final Logger LOG = LoggerFactory.getLogger(TestGuiceServletConfig.class);
private boolean servicesEnabled;
@Override
public void contextInitialized(ServletContextEvent servletContextEvent) {
......@@ -40,6 +43,8 @@ public class TestGuiceServletConfig extends GuiceServletConfig {
@Override
public void contextDestroyed(ServletContextEvent servletContextEvent) {
super.contextDestroyed(servletContextEvent);
if(injector != null) {
TypeLiteral<GraphProvider<TitanGraph>> graphProviderType = new TypeLiteral<GraphProvider<TitanGraph>>() {};
Provider<GraphProvider<TitanGraph>> graphProvider = injector.getProvider(Key.get(graphProviderType));
......@@ -47,11 +52,30 @@ public class TestGuiceServletConfig extends GuiceServletConfig {
LOG.info("Clearing graph store");
try {
graph.shutdown();
TitanCleanup.clear(graph);
} catch (Exception e) {
LOG.warn("Clearing graph store failed ", e);
}
}
}
@Override
protected void startServices() {
try {
Configuration conf = ApplicationProperties.get();
servicesEnabled = conf.getBoolean("atlas.services.enabled", true);
if (servicesEnabled) {
super.startServices();
}
} catch (AtlasException e) {
throw new RuntimeException(e);
}
}
@Override
protected void stopServices() {
if (servicesEnabled) {
super.stopServices();
}
}
}
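The override keys off the atlas.services.enabled flag, so test deployments that only need the web tier can skip the background services. A hedged sketch of flipping the flag programmatically, assuming the Configuration returned by ApplicationProperties.get() is mutable (commons-configuration instances are):

    // Hypothetical: disable background services before the context starts.
    Configuration conf = ApplicationProperties.get();
    conf.setProperty("atlas.services.enabled", false);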