Commit faad323e by Suma Shivaprasad

ATLAS-599 HDFS Path Model (sumasai via yhemanth)

parent 6b0f7d82
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- Dependency versions and scopes are managed by the Atlas root POM. -->
    <parent>
        <artifactId>apache-atlas</artifactId>
        <groupId>org.apache.atlas</groupId>
        <version>0.7-incubating-SNAPSHOT</version>
        <relativePath>../../</relativePath>
    </parent>
    <artifactId>hdfs-model</artifactId>
    <description>Apache Atlas FileSystem Model</description>
    <name>Apache Atlas FileSystem Model</name>
    <packaging>jar</packaging>

    <dependencies>
        <!-- Logging -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </dependency>

        <!-- apache atlas core dependencies -->
        <dependency>
            <groupId>org.apache.atlas</groupId>
            <artifactId>atlas-typesystem</artifactId>
        </dependency>

        <dependency>
            <groupId>org.apache.atlas</groupId>
            <artifactId>atlas-client</artifactId>
        </dependency>

        <dependency>
            <groupId>org.apache.atlas</groupId>
            <artifactId>atlas-notification</artifactId>
        </dependency>

        <!-- NOTE(review): mockito-all is only used by tests; scope is presumably
             inherited as 'test' from the parent's dependencyManagement - confirm. -->
        <dependency>
            <groupId>org.mockito</groupId>
            <artifactId>mockito-all</artifactId>
        </dependency>

        <!-- apache hdfs core dependencies -->

        <!-- Testing dependencies -->
        <dependency>
            <groupId>org.testng</groupId>
            <artifactId>testng</artifactId>
        </dependency>

        <!-- to bring up atlas server for integration tests -->
        <dependency>
            <groupId>org.apache.atlas</groupId>
            <artifactId>atlas-webapp</artifactId>
            <type>war</type>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.atlas</groupId>
            <artifactId>atlas-repository</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Builds the twiki-based module documentation during prepare-package. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-site-plugin</artifactId>
                <dependencies>
                    <dependency>
                        <groupId>org.apache.maven.doxia</groupId>
                        <artifactId>doxia-module-twiki</artifactId>
                        <version>1.3</version>
                    </dependency>
                </dependencies>
                <executions>
                    <execution>
                        <goals>
                            <goal>site</goal>
                        </goals>
                        <phase>prepare-package</phase>
                    </execution>
                </executions>
                <configuration>
                    <generateProjectInfo>false</generateProjectInfo>
                    <generateReports>false</generateReports>
                </configuration>
            </plugin>

            <!-- Runs FSDataModelGenerator at package time to dump the FS model
                 type definitions as JSON into target/models/fs_model.json. -->
            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>exec-maven-plugin</artifactId>
                <version>1.2.1</version>
                <inherited>false</inherited>
                <executions>
                    <execution>
                        <configuration>
                            <mainClass>org.apache.atlas.fs.model.FSDataModelGenerator</mainClass>
                            <arguments>
                                <argument>${project.build.directory}/models/fs_model.json</argument>
                            </arguments>
                        </configuration>
                        <phase>package</phase>
                        <goals>
                            <goal>java</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <!-- Compiles the Scala sources (FSDataModel). -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
\ No newline at end of file
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.fs.model;
import org.apache.atlas.addons.ModelDefinitionDump;
import org.apache.atlas.typesystem.TypesDef;
import org.apache.atlas.typesystem.json.TypesSerialization;
import java.io.IOException;
/**
 * Command-line entry point that generates the FS/HDFS data model as JSON.
 *
 * <p>When invoked with exactly one argument, that argument is treated as a
 * file path and the model JSON is dumped there (used by the exec-maven-plugin
 * during the package phase); otherwise the JSON is printed to stdout.</p>
 */
public class FSDataModelGenerator {

    public static void main(String[] args) throws IOException {
        // Run the Scala App body so FSDataModel.typesDef is populated.
        FSDataModel.main(args);

        final TypesDef modelTypes = FSDataModel.typesDef();
        final String modelJson = TypesSerialization.toJson(modelTypes);

        if (args.length != 1) {
            // No output path supplied - echo the model on stdout instead.
            System.out.println("FS Data Model as JSON = " + modelJson);
            return;
        }
        ModelDefinitionDump.dumpModelToFile(args[0], modelJson);
    }
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.fs.model
import org.apache.atlas.{AtlasConstants, AtlasClient}
import org.apache.atlas.typesystem.TypesDef
import org.apache.atlas.typesystem.builders.TypesBuilder
import org.apache.atlas.typesystem.json.TypesSerialization
import org.apache.atlas.typesystem.types.DataTypes.MapType
import org.apache.hadoop.fs.permission.FsAction
import scala.tools.scalap.scalax.rules.scalasig.ClassFileParser.EnumConstValue
/**
* This represents the data model for a HDFS Path
*/
object FSDataModel extends App {

    // Populated by the App body below; read by FSDataModelGenerator.main() after
    // this object's main() has run.
    var typesDef : TypesDef = null

    val typesBuilder = new TypesBuilder
    import typesBuilder._

    typesDef = types {

        // FS DataSet: generic filesystem path, super-type for hdfs_path below.
        _class(FSDataTypes.FS_PATH.toString, List("DataSet", AtlasClient.REFERENCEABLE_SUPER_TYPE)) {
            //fully qualified path/URI to the filesystem path is stored in 'qualifiedName' and 'path'.
            "path" ~ (string, required, indexed)
            "createTime" ~ (date, optional, indexed)
            "modifiedTime" ~ (date, optional, indexed)
            //Is a regular file or a directory. If true, it is a file else a directory
            "isFile" ~ (boolean, optional, indexed)
            //Is a symlink or not
            "isSymlink" ~ (boolean, optional, indexed)
            //Optional and may not be set for a directory
            "fileSize" ~ (long, optional, indexed)
            "owner" ~ (string, optional, indexed)
            "group" ~ (string, optional, indexed)
            "posixPermissions" ~ (FSDataTypes.FS_PERMISSIONS.toString, optional, indexed)
        }

        // Enum of POSIX actions, mirroring Hadoop's FsAction constant names.
        enum(FSDataTypes.FS_ACTION.toString, FsAction.values().map(x => x.name()) : _*)

        // Struct bundling the user/group/other permission triple plus sticky bit.
        struct(FSDataTypes.FS_PERMISSIONS.toString) {
            PosixPermissions.PERM_USER.toString ~ (FSDataTypes.FS_ACTION.toString, required, indexed)
            PosixPermissions.PERM_GROUP.toString ~ (FSDataTypes.FS_ACTION.toString, required, indexed)
            PosixPermissions.PERM_OTHER.toString ~ (FSDataTypes.FS_ACTION.toString, required, indexed)
            PosixPermissions.STICKY_BIT.toString ~ (boolean, required, indexed)
        }

        //HDFS DataSet: extends fs_path with HDFS-specific attributes.
        _class(FSDataTypes.HDFS_PATH.toString, List(FSDataTypes.FS_PATH.toString)) {
            //Making cluster optional since path is already unique containing the namenode URI
            AtlasConstants.CLUSTER_NAME_ATTRIBUTE ~ (string, optional, indexed)
            "numberOfReplicas" ~ (int, optional, indexed)
            "extendedAttributes" ~ (map(string, string), optional, indexed)
        }

        //TODO - ACLs - https://hadoop.apache.org/docs/r2.7.1/hadoop-project-dist/hadoop-hdfs/HdfsPermissionsGuide.html#ACLs_Access_Control_Lists
    }

    // add the types to atlas
    val typesAsJSON = TypesSerialization.toJson(typesDef)
    println("FS Data Model as JSON: ")
    println(typesAsJSON)
}
// Atlas type names registered by FSDataModel. The string values are the type
// names persisted in Atlas; do not rename them without a migration.
object FSDataTypes extends Enumeration {
    type FSDataTypes = Value

    val FS_ACTION = Value("file_action")
    val FS_PATH = Value("fs_path")
    val HDFS_PATH = Value("hdfs_path")
    val FS_PERMISSIONS = Value("fs_permissions")
}
// Attribute names used inside the fs_permissions struct (see FSDataModel).
object PosixPermissions extends Enumeration {
    type PosixPermissions = Value

    val PERM_USER = Value("user")
    val PERM_GROUP = Value("group")
    val PERM_OTHER = Value("others")
    val STICKY_BIT = Value("sticky")
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.fs.model;
import com.thinkaurelius.titan.core.TitanGraph;
import com.thinkaurelius.titan.core.util.TitanCleanup;
import org.apache.atlas.RepositoryMetadataModule;
import org.apache.atlas.repository.graph.GraphProvider;
import org.apache.atlas.services.MetadataService;
import org.apache.atlas.typesystem.TypesDef;
import org.apache.atlas.typesystem.json.TypesSerialization;
import org.apache.atlas.typesystem.types.TypeSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
import scala.Enumeration;
import scala.collection.Iterator;
import javax.inject.Inject;
@Test
@Guice(modules = RepositoryMetadataModule.class)
public class HDFSModelTest {

    public static final Logger LOG = LoggerFactory.getLogger(HDFSModelTest.class);

    private static final String ATLAS_URL = "http://localhost:21000/";

    @Inject
    private MetadataService metadataService;

    @Inject
    private GraphProvider<TitanGraph> graphProvider;

    @BeforeClass
    public void setUp() throws Exception {
        // Nothing to prepare: Guice injects the services above before tests run.
    }

    /**
     * Resets the type system and shuts down the graph after all tests.
     * Each step is best-effort: a failure in one must not prevent the next.
     */
    @AfterClass
    public void tearDown() throws Exception {
        TypeSystem.getInstance().reset();

        try {
            //TODO - Fix failure during shutdown while using BDB
            graphProvider.get().shutdown();
        } catch (Exception e) {
            // Log instead of printStackTrace so failures show up in test logs.
            LOG.warn("Failed to shut down graph", e);
        }

        try {
            TitanCleanup.clear(graphProvider.get());
        } catch (Exception e) {
            LOG.warn("Failed to clear Titan graph", e);
        }
    }

    /**
     * Generates the FS data model, registers it with the metadata service,
     * and verifies every FS type can be fetched back and deserialized.
     */
    @Test
    public void testCreateDataModel() throws Exception {
        FSDataModel.main(new String[]{});

        TypesDef fsTypesDef = FSDataModel.typesDef();
        String fsTypesAsJSON = TypesSerialization.toJson(fsTypesDef);
        LOG.info("fsTypesAsJSON = {}", fsTypesAsJSON);

        metadataService.createType(fsTypesAsJSON);

        // verify types are registered
        final Iterator<Enumeration.Value> valueIterator = FSDataTypes.values().iterator();
        while (valueIterator.hasNext()) {
            final Enumeration.Value typeEnum = valueIterator.next();
            String typeDefStr = metadataService.getTypeDefinition(typeEnum.toString());
            Assert.assertNotNull(typeDefStr);

            TypesDef typesDef = TypesSerialization.fromJson(typeDefStr);
            Assert.assertNotNull(typesDef);
        }
    }
}
\ No newline at end of file
...@@ -64,6 +64,11 @@ ...@@ -64,6 +64,11 @@
<dependency> <dependency>
<groupId>org.apache.atlas</groupId> <groupId>org.apache.atlas</groupId>
<artifactId>hdfs-model</artifactId>
</dependency>
<dependency>
<groupId>org.apache.atlas</groupId>
<artifactId>hive-bridge</artifactId> <artifactId>hive-bridge</artifactId>
</dependency> </dependency>
......
...@@ -26,6 +26,7 @@ import backtype.storm.generated.TopologyInfo; ...@@ -26,6 +26,7 @@ import backtype.storm.generated.TopologyInfo;
import backtype.storm.utils.Utils; import backtype.storm.utils.Utils;
import org.apache.atlas.AtlasClient; import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasConstants; import org.apache.atlas.AtlasConstants;
import org.apache.atlas.fs.model.FSDataTypes;
import org.apache.atlas.hive.bridge.HiveMetaStoreBridge; import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
import org.apache.atlas.hive.model.HiveDataModelGenerator; import org.apache.atlas.hive.model.HiveDataModelGenerator;
import org.apache.atlas.hook.AtlasHook; import org.apache.atlas.hook.AtlasHook;
...@@ -33,6 +34,7 @@ import org.apache.atlas.storm.model.StormDataTypes; ...@@ -33,6 +34,7 @@ import org.apache.atlas.storm.model.StormDataTypes;
import org.apache.atlas.typesystem.Referenceable; import org.apache.atlas.typesystem.Referenceable;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf;
import org.slf4j.Logger; import org.slf4j.Logger;
...@@ -208,14 +210,17 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook { ...@@ -208,14 +210,17 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook {
break; break;
case "HdfsBolt": case "HdfsBolt":
dataSetReferenceable = new Referenceable(StormDataTypes.HDFS_DATA_SET.getName()); dataSetReferenceable = new Referenceable(FSDataTypes.HDFS_PATH().toString());
String hdfsUri = config.get("HdfsBolt.rotationActions") == null String hdfsUri = config.get("HdfsBolt.rotationActions") == null
? config.get("HdfsBolt.fileNameFormat.path") ? config.get("HdfsBolt.fileNameFormat.path")
: config.get("HdfsBolt.rotationActions"); : config.get("HdfsBolt.rotationActions");
final String hdfsPath = config.get("HdfsBolt.fsUrl") + hdfsUri; final String hdfsPathStr = config.get("HdfsBolt.fsUrl") + hdfsUri;
dataSetReferenceable.set("pathURI", hdfsPath); dataSetReferenceable.set(HiveDataModelGenerator.CLUSTER_NAME, getClusterName(stormConf));
dataSetReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, hdfsPathStr);
dataSetReferenceable.set("path", hdfsPathStr);
dataSetReferenceable.set("owner", stormConf.get("hdfs.kerberos.principal")); dataSetReferenceable.set("owner", stormConf.get("hdfs.kerberos.principal"));
dataSetReferenceable.set("name", hdfsPath); final Path hdfsPath = new Path(hdfsPathStr);
dataSetReferenceable.set(AtlasClient.NAME, hdfsPath.getName());
break; break;
case "HiveBolt": case "HiveBolt":
......
...@@ -35,7 +35,6 @@ public enum StormDataTypes { ...@@ -35,7 +35,6 @@ public enum StormDataTypes {
KAFKA_TOPIC, // kafka data set KAFKA_TOPIC, // kafka data set
JMS_TOPIC, // jms data set JMS_TOPIC, // jms data set
HBASE_TABLE, // hbase table data set HBASE_TABLE, // hbase table data set
HDFS_DATA_SET, // HDFS data set
; ;
public String getName() { public String getName() {
......
...@@ -100,15 +100,6 @@ object StormDataModel extends App { ...@@ -100,15 +100,6 @@ object StormDataModel extends App {
"owner" ~ (string, required, indexed) "owner" ~ (string, required, indexed)
} }
// HDFS Data Set
// todo: replace this with a generic data model for HDFS data sets
// todo: should only be used in light of storm
_class(StormDataTypes.HDFS_DATA_SET.getName, List("DataSet")) {
// fully qualified path to file or dir
"pathURI" ~ (string, required, unique, indexed)
"owner" ~ (string, required, indexed)
}
_trait("DataProcessor") { _trait("DataProcessor") {
} }
......
...@@ -434,6 +434,7 @@ ...@@ -434,6 +434,7 @@
<module>dashboard</module> <module>dashboard</module>
<module>webapp</module> <module>webapp</module>
<module>docs</module> <module>docs</module>
<module>addons/hdfs-model</module>
<module>addons/hive-bridge</module> <module>addons/hive-bridge</module>
<module>addons/falcon-bridge</module> <module>addons/falcon-bridge</module>
<module>addons/sqoop-bridge</module> <module>addons/sqoop-bridge</module>
...@@ -1029,6 +1030,12 @@ ...@@ -1029,6 +1030,12 @@
<dependency> <dependency>
<groupId>org.apache.atlas</groupId> <groupId>org.apache.atlas</groupId>
<artifactId>hdfs-model</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.atlas</groupId>
<artifactId>falcon-bridge</artifactId> <artifactId>falcon-bridge</artifactId>
<version>${project.version}</version> <version>${project.version}</version>
</dependency> </dependency>
......
...@@ -13,6 +13,7 @@ ATLAS-409 Atlas will not import avro tables with schema read from a file (dosset ...@@ -13,6 +13,7 @@ ATLAS-409 Atlas will not import avro tables with schema read from a file (dosset
ATLAS-379 Create sqoop and falcon metadata addons (venkatnrangan,bvellanki,sowmyaramesh via shwethags) ATLAS-379 Create sqoop and falcon metadata addons (venkatnrangan,bvellanki,sowmyaramesh via shwethags)
ALL CHANGES: ALL CHANGES:
ATLAS-599 HDFS Path Model (sumasai via yhemanth)
ATLAS-553 Entity mutation - Fix issue with reordering of elements in array<class> with composite references (sumasai via shwethags) ATLAS-553 Entity mutation - Fix issue with reordering of elements in array<class> with composite references (sumasai via shwethags)
ATLAS-513 Admin support for HA (yhemanth via sumasai) ATLAS-513 Admin support for HA (yhemanth via sumasai)
ATLAS-511 Ability to run multiple instances of Atlas Server with automatic failover to one active server (yhemanth via shwethags) ATLAS-511 Ability to run multiple instances of Atlas Server with automatic failover to one active server (yhemanth via shwethags)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment