Commit def9e385 by Madhan Neethiraj

ATLAS-1272: updated types bootstrap to load from new format typedef JSON files

parent 2ea3a455
@@ -355,24 +355,6 @@
         <version>1.2.1</version>
         <inherited>false</inherited>
         <executions>
-          <execution>
-            <configuration>
-              <mainClass>org.apache.atlas.falcon.model.FalconDataModelGenerator</mainClass>
-              <systemProperties>
-                <systemProperty>
-                  <key>atlas.conf</key>
-                  <value>${project.build.directory}/../../../typesystem/target/test-classes</value>
-                </systemProperty>
-              </systemProperties>
-              <arguments>
-                <argument>${project.build.directory}/models/falcon_model.json</argument>
-              </arguments>
-            </configuration>
-            <phase>package</phase>
-            <goals>
-              <goal>java</goal>
-            </goals>
-          </execution>
         </executions>
       </plugin>
     </plugins>
...
@@ -21,11 +21,8 @@ package org.apache.atlas.falcon.bridge;
 import org.apache.atlas.AtlasClient;
 import org.apache.atlas.AtlasConstants;
 import org.apache.atlas.falcon.Util.EventUtil;
-import org.apache.atlas.falcon.model.FalconDataModelGenerator;
 import org.apache.atlas.falcon.model.FalconDataTypes;
-import org.apache.atlas.fs.model.FSDataTypes;
 import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
-import org.apache.atlas.hive.model.HiveDataModelGenerator;
 import org.apache.atlas.hive.model.HiveDataTypes;
 import org.apache.atlas.typesystem.Referenceable;
 import org.apache.commons.collections.CollectionUtils;
@@ -61,6 +58,15 @@ import java.util.Map;
 public class FalconBridge {
     private static final Logger LOG = LoggerFactory.getLogger(FalconBridge.class);
 
+    public static final String COLO = "colo";
+    public static final String TAGS = "tags";
+    public static final String GROUPS = "groups";
+    public static final String PIPELINES = "pipelines";
+    public static final String WFPROPERTIES = "workflow-properties";
+    public static final String RUNSON = "runs-on";
+    public static final String STOREDIN = "stored-in";
+    public static final String FREQUENCY = "frequency";
+
     /**
      * Creates cluster entity
      *
@@ -77,14 +83,14 @@
         clusterRef.set(AtlasClient.DESCRIPTION, cluster.getDescription());
         clusterRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, cluster.getName());
-        clusterRef.set(FalconDataModelGenerator.COLO, cluster.getColo());
+        clusterRef.set(FalconBridge.COLO, cluster.getColo());
 
         if (cluster.getACL() != null) {
             clusterRef.set(AtlasClient.OWNER, cluster.getACL().getGroup());
         }
 
         if (StringUtils.isNotEmpty(cluster.getTags())) {
-            clusterRef.set(FalconDataModelGenerator.TAGS,
+            clusterRef.set(FalconBridge.TAGS,
                     EventUtil.convertKeyValueStringToMap(cluster.getTags()));
         }
@@ -100,19 +106,19 @@
         String feedQualifiedName =
                 getFeedQualifiedName(feed.getName(), (String) clusterReferenceable.get(AtlasClient.NAME));
         feedEntity.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, feedQualifiedName);
-        feedEntity.set(FalconDataModelGenerator.FREQUENCY, feed.getFrequency().toString());
-        feedEntity.set(FalconDataModelGenerator.STOREDIN, clusterReferenceable);
+        feedEntity.set(FalconBridge.FREQUENCY, feed.getFrequency().toString());
+        feedEntity.set(FalconBridge.STOREDIN, clusterReferenceable);
 
         if (feed.getACL() != null) {
             feedEntity.set(AtlasClient.OWNER, feed.getACL().getOwner());
         }
 
         if (StringUtils.isNotEmpty(feed.getTags())) {
-            feedEntity.set(FalconDataModelGenerator.TAGS,
+            feedEntity.set(FalconBridge.TAGS,
                     EventUtil.convertKeyValueStringToMap(feed.getTags()));
         }
 
         if (feed.getGroups() != null) {
-            feedEntity.set(FalconDataModelGenerator.GROUPS, feed.getGroups());
+            feedEntity.set(FalconBridge.GROUPS, feed.getGroups());
         }
 
         return feedEntity;
@@ -165,7 +171,7 @@
             feedCreateEntity.set(AtlasClient.PROCESS_ATTRIBUTE_OUTPUTS, outputs);
         }
 
-        feedCreateEntity.set(FalconDataModelGenerator.STOREDIN, clusterReferenceable);
+        feedCreateEntity.set(FalconBridge.STOREDIN, clusterReferenceable);
         entities.add(feedCreateEntity);
     }
@@ -244,7 +250,7 @@
         processEntity.set(AtlasClient.NAME, process.getName());
         processEntity.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
                 getProcessQualifiedName(process.getName(), cluster.getName()));
-        processEntity.set(FalconDataModelGenerator.FREQUENCY, process.getFrequency().toString());
+        processEntity.set(FalconBridge.FREQUENCY, process.getFrequency().toString());
 
         if (!inputs.isEmpty()) {
             processEntity.set(AtlasClient.PROCESS_ATTRIBUTE_INPUTS, inputs);
@@ -254,7 +260,7 @@
         }
 
         // set cluster
-        processEntity.set(FalconDataModelGenerator.RUNSON, clusterReferenceable);
+        processEntity.set(FalconBridge.RUNSON, clusterReferenceable);
 
         // Set user
         if (process.getACL() != null) {
@@ -262,15 +268,15 @@
         }
 
         if (StringUtils.isNotEmpty(process.getTags())) {
-            processEntity.set(FalconDataModelGenerator.TAGS,
+            processEntity.set(FalconBridge.TAGS,
                     EventUtil.convertKeyValueStringToMap(process.getTags()));
         }
 
         if (process.getPipelines() != null) {
-            processEntity.set(FalconDataModelGenerator.PIPELINES, process.getPipelines());
+            processEntity.set(FalconBridge.PIPELINES, process.getPipelines());
         }
 
-        processEntity.set(FalconDataModelGenerator.WFPROPERTIES,
+        processEntity.set(FalconBridge.WFPROPERTIES,
                 getProcessEntityWFProperties(process.getWorkflow(),
                         process.getName()));
@@ -319,7 +325,7 @@
     private static List<Referenceable> fillHDFSDataSet(final String pathUri, final String clusterName) {
         List<Referenceable> entities = new ArrayList<>();
 
-        Referenceable ref = new Referenceable(FSDataTypes.HDFS_PATH().toString());
+        Referenceable ref = new Referenceable(HiveMetaStoreBridge.HDFS_PATH);
         ref.set("path", pathUri);
 //        Path path = new Path(pathUri);
 //        ref.set("name", path.getName());
@@ -352,7 +358,7 @@
         tableRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
                 HiveMetaStoreBridge.getTableQualifiedName(clusterName, dbName, tableName));
         tableRef.set(AtlasClient.NAME, tableName.toLowerCase());
-        tableRef.set(HiveDataModelGenerator.DB, dbRef);
+        tableRef.set(HiveMetaStoreBridge.DB, dbRef);
         entities.add(tableRef);
 
         return entities;
@@ -364,7 +370,7 @@
         Referenceable clusterRef = new Referenceable(FalconDataTypes.FALCON_CLUSTER.getName());
         clusterRef.set(AtlasClient.NAME, String.format("%s", clusterName));
         clusterRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, clusterName);
-        clusterRef.set(FalconDataModelGenerator.COLO, colo);
+        clusterRef.set(FalconBridge.COLO, colo);
         return clusterRef;
     }
@@ -375,8 +381,8 @@
         feedDatasetRef.set(AtlasClient.NAME, feed.getName());
         feedDatasetRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getFeedQualifiedName(feed.getName(),
                 (String) clusterReference.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME)));
-        feedDatasetRef.set(FalconDataModelGenerator.STOREDIN, clusterReference);
-        feedDatasetRef.set(FalconDataModelGenerator.FREQUENCY, feed.getFrequency());
+        feedDatasetRef.set(FalconBridge.STOREDIN, clusterReference);
+        feedDatasetRef.set(FalconBridge.FREQUENCY, feed.getFrequency());
         return feedDatasetRef;
     }
...
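Note: the attribute-name constants that callers previously pulled from the deleted FalconDataModelGenerator now live on FalconBridge itself. A minimal sketch of post-change caller code, using only identifiers that appear in the diff above (the class and method names of the sketch itself are illustrative, not part of the commit):

    import org.apache.atlas.AtlasClient;
    import org.apache.atlas.falcon.bridge.FalconBridge;
    import org.apache.atlas.falcon.model.FalconDataTypes;
    import org.apache.atlas.typesystem.Referenceable;

    public class FalconClusterRefExample {
        public static Referenceable buildClusterRef(String clusterName, String colo) {
            Referenceable clusterRef = new Referenceable(FalconDataTypes.FALCON_CLUSTER.getName());
            clusterRef.set(AtlasClient.NAME, clusterName);
            clusterRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, clusterName);
            // "colo" attribute: was FalconDataModelGenerator.COLO before this commit
            clusterRef.set(FalconBridge.COLO, colo);
            return clusterRef;
        }
    }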
@@ -23,9 +23,7 @@ import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasClient;
 import org.apache.atlas.AtlasServiceException;
 import org.apache.atlas.falcon.bridge.FalconBridge;
-import org.apache.atlas.falcon.model.FalconDataModelGenerator;
 import org.apache.atlas.falcon.model.FalconDataTypes;
-import org.apache.atlas.fs.model.FSDataTypes;
 import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
 import org.apache.atlas.hive.model.HiveDataTypes;
 import org.apache.atlas.typesystem.Referenceable;
@@ -47,7 +45,6 @@ import org.apache.falcon.entity.v0.feed.Location;
 import org.apache.falcon.entity.v0.feed.LocationType;
 import org.apache.falcon.entity.v0.process.Process;
 import org.apache.falcon.security.CurrentUser;
-import org.apache.hadoop.hive.conf.HiveConf;
 import org.slf4j.Logger;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
@@ -84,24 +81,9 @@ public class FalconHookIT {
         AtlasService service = new AtlasService();
         service.init();
         STORE.registerListener(service);
-        registerFalconDataModel();
         CurrentUser.authenticate(System.getProperty("user.name"));
     }
 
-    private void registerFalconDataModel() throws Exception {
-        if (isDataModelAlreadyRegistered()) {
-            LOG.info("Falcon data model is already registered!");
-            return;
-        }
-
-        HiveMetaStoreBridge hiveMetaStoreBridge = new HiveMetaStoreBridge(ApplicationProperties.get(), new HiveConf(), atlasClient);
-        hiveMetaStoreBridge.registerHiveDataModel();
-
-        FalconDataModelGenerator dataModelGenerator = new FalconDataModelGenerator();
-        LOG.info("Registering Falcon data model");
-        atlasClient.createType(dataModelGenerator.getModelAsJson());
-    }
-
     private boolean isDataModelAlreadyRegistered() throws Exception {
         try {
             atlasClient.getType(FalconDataTypes.FALCON_PROCESS.getName());
@@ -196,7 +178,7 @@
         String inputId = ((List<Id>) processEntity.get("inputs")).get(0).getId()._getId();
         Referenceable pathEntity = atlasClient.getEntity(inputId);
-        assertEquals(pathEntity.getTypeName(), FSDataTypes.HDFS_PATH().toString());
+        assertEquals(pathEntity.getTypeName(), HiveMetaStoreBridge.HDFS_PATH.toString());
 
         List<Location> locations = FeedHelper.getLocations(feedCluster, feed);
         Location dataLocation = FileSystemStorage.getLocation(locations, LocationType.DATA);
@@ -243,7 +225,7 @@
     private void assertFeedAttributes(String feedId) throws Exception {
         Referenceable feedEntity = atlasClient.getEntity(feedId);
         assertEquals(feedEntity.get(AtlasClient.OWNER), "testuser");
-        assertEquals(feedEntity.get(FalconDataModelGenerator.FREQUENCY), "hours(1)");
+        assertEquals(feedEntity.get(FalconBridge.FREQUENCY), "hours(1)");
         assertEquals(feedEntity.get(AtlasClient.DESCRIPTION), "test input");
     }
...
@@ -180,24 +180,6 @@
         <version>1.2.1</version>
         <inherited>false</inherited>
         <executions>
-          <execution>
-            <configuration>
-              <mainClass>org.apache.atlas.fs.model.FSDataModelGenerator</mainClass>
-              <systemProperties>
-                <systemProperty>
-                  <key>atlas.conf</key>
-                  <value>${project.build.directory}/../../../typesystem/src/test/resources/</value>
-                </systemProperty>
-              </systemProperties>
-              <arguments>
-                <argument>${project.build.directory}/models/fs_model.json</argument>
-              </arguments>
-            </configuration>
-            <phase>package</phase>
-            <goals>
-              <goal>java</goal>
-            </goals>
-          </execution>
         </executions>
       </plugin>
...
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.fs.model;
import org.apache.atlas.addons.ModelDefinitionDump;
import org.apache.atlas.typesystem.TypesDef;
import org.apache.atlas.typesystem.json.TypesSerialization;
import java.io.IOException;
public class FSDataModelGenerator {
public static void main(String[] args) throws IOException {
FSDataModel.main(args);
TypesDef typesDef = FSDataModel.typesDef();
String fsTypesAsJSON = TypesSerialization.toJson(typesDef);
if (args.length == 1) {
ModelDefinitionDump.dumpModelToFile(args[0], fsTypesAsJSON);
return;
}
System.out.println("FS Data Model as JSON = " + fsTypesAsJSON);
}
}
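For reference, the generator above dumps the typedef JSON to a file when given exactly one argument, and prints it to stdout otherwise. A hypothetical invocation (the output path is an example only):

    public class GenerateFsModel {
        public static void main(String[] args) throws java.io.IOException {
            // with one argument, FSDataModelGenerator writes the model JSON to that file;
            // with no arguments, it prints the JSON to stdout
            org.apache.atlas.fs.model.FSDataModelGenerator.main(
                    new String[]{"/tmp/models/fs_model.json"});
        }
    }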
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.fs.model
import org.apache.atlas.{AtlasConstants, AtlasClient}
import org.apache.atlas.typesystem.TypesDef
import org.apache.atlas.typesystem.builders.TypesBuilder
import org.apache.atlas.typesystem.json.TypesSerialization
import org.apache.atlas.typesystem.types.DataTypes.MapType
import org.apache.hadoop.fs.permission.FsAction
import scala.tools.scalap.scalax.rules.scalasig.ClassFileParser.EnumConstValue
/**
* This represents the data model for a HDFS Path
*/
object FSDataModel extends App {
val typesBuilder = new TypesBuilder
import typesBuilder._
val typesDef : TypesDef = types {
// FS DataSet
_class(FSDataTypes.FS_PATH.toString, List(AtlasClient.DATA_SET_SUPER_TYPE)) {
//fully qualified path/URI to the filesystem path is stored in 'qualifiedName' and 'path'.
"path" ~ (string, required, indexed)
"createTime" ~ (date, optional, indexed)
"modifiedTime" ~ (date, optional, indexed)
//Is a regular file or a directory. If true, it is a file else a directory
"isFile" ~ (boolean, optional, indexed)
//Is a symlink or not
"isSymlink" ~ (boolean, optional)
//Optional and may not be set for a directory
"fileSize" ~ (long, optional)
"group" ~ (string, optional, indexed)
"posixPermissions" ~ (FSDataTypes.FS_PERMISSIONS.toString, optional, indexed)
}
enum(FSDataTypes.FS_ACTION.toString, FsAction.values().map(x => x.name()) : _*)
struct(FSDataTypes.FS_PERMISSIONS.toString) {
PosixPermissions.PERM_USER.toString ~ (FSDataTypes.FS_ACTION.toString, required, indexed)
PosixPermissions.PERM_GROUP.toString ~ (FSDataTypes.FS_ACTION.toString, required, indexed)
PosixPermissions.PERM_OTHER.toString ~ (FSDataTypes.FS_ACTION.toString, required, indexed)
PosixPermissions.STICKY_BIT.toString ~ (boolean, required, indexed)
}
//HDFS DataSet
_class(FSDataTypes.HDFS_PATH.toString, List(FSDataTypes.FS_PATH.toString)) {
//Making cluster optional since path is already unique containing the namenode URI
AtlasConstants.CLUSTER_NAME_ATTRIBUTE ~ (string, optional, indexed)
"numberOfReplicas" ~ (int, optional, indexed)
"extendedAttributes" ~ (map(string, string), optional)
}
//TODO - ACLs - https://hadoop.apache.org/docs/r2.7.1/hadoop-project-dist/hadoop-hdfs/HdfsPermissionsGuide.html#ACLs_Access_Control_Lists
}
// add the types to atlas
val typesAsJSON = TypesSerialization.toJson(typesDef)
println("FS Data Model as JSON: ")
println(typesAsJSON)
}
object FSDataTypes extends Enumeration {
type FSDataTypes = Value
val FS_ACTION = Value("file_action")
val FS_PATH = Value("fs_path")
val HDFS_PATH = Value("hdfs_path")
val FS_PERMISSIONS = Value("fs_permissions")
}
object PosixPermissions extends Enumeration {
type PosixPermissions = Value
val PERM_USER = Value("user")
val PERM_GROUP = Value("group")
val PERM_OTHER = Value("others")
val STICKY_BIT = Value("sticky")
}
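The enumeration values above are the wire-level type names used by the bridges and in the checked-in JSON models (fs_path, hdfs_path, and so on). A small sketch of how the Java side reads them; Scala object members are called through their generated accessor methods, as in the FSDataTypes.HDFS_PATH().toString() calls elsewhere in this commit:

    public class FsTypeNamesDemo {
        public static void main(String[] args) {
            // prints "fs_path" and "hdfs_path", matching the entityDefs names in the JSON models
            System.out.println(org.apache.atlas.fs.model.FSDataTypes.FS_PATH().toString());
            System.out.println(org.apache.atlas.fs.model.FSDataTypes.HDFS_PATH().toString());
        }
    }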
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.fs.model;
import javax.inject.Inject;
import org.apache.atlas.RepositoryMetadataModule;
import org.apache.atlas.repository.graph.AtlasGraphProvider;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.services.MetadataService;
import org.apache.atlas.typesystem.TypesDef;
import org.apache.atlas.typesystem.json.TypesSerialization;
import org.apache.atlas.typesystem.types.TypeSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
import scala.Enumeration;
import scala.collection.Iterator;
@Test
@Guice(modules = RepositoryMetadataModule.class)
public class HDFSModelTest {
public static final Logger LOG = LoggerFactory.getLogger(HDFSModelTest.class);
private static final String ATLAS_URL = "http://localhost:21000/";
@Inject
private MetadataService metadataService;
@BeforeClass
public void setUp() throws Exception {
}
@AfterClass
public void tearDown() throws Exception {
TypeSystem.getInstance().reset();
AtlasGraphProvider.cleanup();
}
@Test
public void testCreateDataModel() throws Exception {
FSDataModel.main(new String[]{});
TypesDef fsTypesDef = FSDataModel.typesDef();
String fsTypesAsJSON = TypesSerialization.toJson(fsTypesDef);
LOG.info("fsTypesAsJSON = {}", fsTypesAsJSON);
metadataService.createType(fsTypesAsJSON);
// verify types are registered
final Iterator<Enumeration.Value> valueIterator = FSDataTypes.values().iterator();
while (valueIterator.hasNext() ) {
final Enumeration.Value typeEnum = valueIterator.next();
String typeDefStr = metadataService.getTypeDefinition(typeEnum.toString());
Assert.assertNotNull(typeDefStr);
TypesDef typesDef = TypesSerialization.fromJson(typeDefStr);
Assert.assertNotNull(typesDef);
}
}
}
\ No newline at end of file
@@ -399,24 +399,6 @@
         <version>1.2.1</version>
         <inherited>false</inherited>
         <executions>
-          <execution>
-            <configuration>
-              <mainClass>org.apache.atlas.hive.model.HiveDataModelGenerator</mainClass>
-              <systemProperties>
-                <systemProperty>
-                  <key>atlas.conf</key>
-                  <value>${project.build.directory}/../../../typesystem/target/test-classes</value>
-                </systemProperty>
-              </systemProperties>
-              <arguments>
-                <argument>${project.build.directory}/models/hive_model.json</argument>
-              </arguments>
-            </configuration>
-            <phase>package</phase>
-            <goals>
-              <goal>java</goal>
-            </goals>
-          </execution>
         </executions>
       </plugin>
     </plugins>
...
@@ -19,8 +19,6 @@
 package org.apache.atlas.hive.bridge;
 
 import org.apache.atlas.AtlasClient;
-import org.apache.atlas.hive.hook.HiveHook;
-import org.apache.atlas.hive.model.HiveDataModelGenerator;
 import org.apache.atlas.hive.model.HiveDataTypes;
 import org.apache.atlas.typesystem.Referenceable;
 import org.apache.hadoop.hive.ql.hooks.LineageInfo;
@@ -31,7 +29,6 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 
 public class ColumnLineageUtils {
     public static final Logger LOG = LoggerFactory.getLogger(ColumnLineageUtils.class);
@@ -92,7 +89,7 @@
         if (r.getTypeName().equals(HiveDataTypes.HIVE_TABLE.getName())) {
             String qName = (String) r.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME);
             String[] qNameComps = extractComponents(qName);
-            for (Referenceable col : (List<Referenceable>) r.get(HiveDataModelGenerator.COLUMNS)) {
+            for (Referenceable col : (List<Referenceable>) r.get(HiveMetaStoreBridge.COLUMNS)) {
                 String cName = (String) col.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME);
                 String[] colQNameComps = extractComponents(cName);
                 String colQName = colQNameComps[0] + "." + colQNameComps[1] + "." + colQNameComps[2];
...
@@ -25,7 +25,6 @@ import org.apache.atlas.AtlasClient;
 import org.apache.atlas.AtlasConstants;
 import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
 import org.apache.atlas.hive.bridge.ColumnLineageUtils;
-import org.apache.atlas.hive.model.HiveDataModelGenerator;
 import org.apache.atlas.hive.model.HiveDataTypes;
 import org.apache.atlas.hook.AtlasHook;
 import org.apache.atlas.notification.hook.HookNotification;
@@ -411,10 +410,10 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
             Referenceable tableEntity = tables.get(Type.TABLE);
 
             //Reset regular column QF Name to old Name and create a new partial notification request to replace old column QFName to newName to retain any existing traits
-            replaceColumnQFName(event, (List<Referenceable>) tableEntity.get(HiveDataModelGenerator.COLUMNS), oldQualifiedName, newQualifiedName);
+            replaceColumnQFName(event, (List<Referenceable>) tableEntity.get(HiveMetaStoreBridge.COLUMNS), oldQualifiedName, newQualifiedName);
 
             //Reset partition key column QF Name to old Name and create a new partial notification request to replace old column QFName to newName to retain any existing traits
-            replaceColumnQFName(event, (List<Referenceable>) tableEntity.get(HiveDataModelGenerator.PART_COLS), oldQualifiedName, newQualifiedName);
+            replaceColumnQFName(event, (List<Referenceable>) tableEntity.get(HiveMetaStoreBridge.PART_COLS), oldQualifiedName, newQualifiedName);
 
             //Reset SD QF Name to old Name and create a new partial notification request to replace old SD QFName to newName to retain any existing traits
             replaceSDQFName(event, tableEntity, oldQualifiedName, newQualifiedName);
@@ -437,7 +436,7 @@
         ArrayList<String> alias_list = new ArrayList<>();
         alias_list.add(oldTable.getTableName().toLowerCase());
-        newEntity.set(HiveDataModelGenerator.TABLE_ALIAS_LIST, alias_list);
+        newEntity.set(HiveMetaStoreBridge.TABLE_ALIAS_LIST, alias_list);
         event.addMessage(new HookNotification.EntityPartialUpdateRequest(event.getUser(),
                 HiveDataTypes.HIVE_TABLE.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
                 oldTableQFName, newEntity));
@@ -466,7 +465,7 @@
     private Referenceable replaceSDQFName(final HiveEventContext event, Referenceable tableEntity, final String oldTblQFName, final String newTblQFName) {
         //Reset storage desc QF Name to old Name
-        final Referenceable sdRef = ((Referenceable) tableEntity.get(HiveDataModelGenerator.STORAGE_DESC));
+        final Referenceable sdRef = ((Referenceable) tableEntity.get(HiveMetaStoreBridge.STORAGE_DESC));
         sdRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, HiveMetaStoreBridge.getStorageDescQFName(oldTblQFName));
 
         //Replace SD QF name first to retain tags
...
@@ -20,7 +20,6 @@ package org.apache.atlas.hive;
 import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasClient;
-import org.apache.atlas.fs.model.FSDataTypes;
 import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
 import org.apache.atlas.hive.hook.HiveHookIT;
 import org.apache.atlas.hive.model.HiveDataTypes;
@@ -93,7 +92,6 @@ public class HiveITBase {
         }
 
         hiveMetaStoreBridge = new HiveMetaStoreBridge(configuration, conf, atlasClient);
-        hiveMetaStoreBridge.registerHiveDataModel();
 
         HiveConf conf = new HiveConf();
         conf.set("hive.exec.post.hooks", "");
@@ -232,7 +230,7 @@
     private String assertHDFSPathIsRegistered(String path) throws Exception {
         LOG.debug("Searching for hdfs path {}", path);
-        return assertEntityIsRegistered(FSDataTypes.HDFS_PATH().toString(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, path, null);
+        return assertEntityIsRegistered(HiveMetaStoreBridge.HDFS_PATH, AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, path, null);
     }
 
     protected String assertDatabaseIsRegistered(String dbName) throws Exception {
...
@@ -20,7 +20,6 @@ package org.apache.atlas.hive.bridge;
 import org.apache.atlas.AtlasClient;
 import org.apache.atlas.AtlasServiceException;
-import org.apache.atlas.hive.model.HiveDataModelGenerator;
 import org.apache.atlas.hive.model.HiveDataTypes;
 import org.apache.atlas.typesystem.Referenceable;
 import org.apache.hadoop.fs.Path;
@@ -32,7 +31,6 @@ import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.mapred.TextInputFormat;
-import org.codehaus.jettison.json.JSONArray;
 import org.codehaus.jettison.json.JSONException;
 import org.mockito.ArgumentMatcher;
 import org.mockito.Mock;
@@ -110,7 +108,7 @@
         // verify update is called on table
         verify(atlasClient).updateEntity(eq("82e06b34-9151-4023-aa9d-b82103a50e77"),
-                (Referenceable) argThat(new MatchesReferenceableProperty(HiveDataModelGenerator.TABLE_TYPE_ATTR,
+                (Referenceable) argThat(new MatchesReferenceableProperty(HiveMetaStoreBridge.TABLE_TYPE_ATTR,
                         TableType.EXTERNAL_TABLE.name())));
     }
@@ -228,7 +226,7 @@
     private Referenceable createTableReference() {
         Referenceable tableReference = new Referenceable(HiveDataTypes.HIVE_TABLE.getName());
         Referenceable sdReference = new Referenceable(HiveDataTypes.HIVE_STORAGEDESC.getName());
-        tableReference.set(HiveDataModelGenerator.STORAGE_DESC, sdReference);
+        tableReference.set(HiveMetaStoreBridge.STORAGE_DESC, sdReference);
         return tableReference;
     }
...
{
"enumDefs": [],
"structDefs": [],
"classificationDefs": [],
"entityDefs": [
{
"name": "Referenceable",
"superTypes": [],
"typeVersion": "1.0",
"attributeDefs": [
{
"name": "qualifiedName",
"typeName": "string",
"cardinality": "SINGLE",
"isIndexable": true,
"isOptional": false,
"isUnique": true
}
]
},
{
"name": "Asset",
"superTypes": [],
"typeVersion": "1.0",
"attributeDefs": [
{
"name": "name",
"typeName": "string",
"cardinality": "SINGLE",
"isIndexable": true,
"isOptional": false,
"isUnique": false
},
{
"name": "description",
"typeName": "string",
"cardinality": "SINGLE",
"isIndexable": false,
"isOptional": true,
"isUnique": false
},
{
"name": "owner",
"typeName": "string",
"cardinality": "SINGLE",
"isIndexable": true,
"isOptional": true,
"isUnique": false
}
]
},
{
"name": "DataSet",
"superTypes": [
"Referenceable",
"Asset"
],
"typeVersion": "1.0",
"attributeDefs": []
},
{
"name": "Infrastructure",
"superTypes": [
"Referenceable",
"Asset"
],
"typeVersion": "1.0",
"attributeDefs": []
},
{
"name": "Process",
"superTypes": [
"Referenceable",
"Asset"
],
"typeVersion": "1.0",
"attributeDefs": [
{
"name": "inputs",
"typeName": "array<DataSet>",
"cardinality": "SINGLE",
"isIndexable": false,
"isOptional": true,
"isUnique": false
},
{
"name": "outputs",
"typeName": "array<DataSet>",
"cardinality": "SINGLE",
"isIndexable": false,
"isOptional": true,
"isUnique": false
}
]
}
]
}
{
"enumDefs": [
{
"name": "file_action",
"typeVersion": "1.0",
"elementDefs": [
{
"ordinal": 0,
"value": "NONE"
},
{
"ordinal": 1,
"value": "EXECUTE"
},
{
"ordinal": 2,
"value": "WRITE"
},
{
"ordinal": 3,
"value": "WRITE_EXECUTE"
},
{
"ordinal": 4,
"value": "READ"
},
{
"ordinal": 5,
"value": "READ_EXECUTE"
},
{
"ordinal": 6,
"value": "READ_WRITE"
},
{
"ordinal": 7,
"value": "ALL"
}
]
}
],
"structDefs": [
{
"name": "fs_permissions",
"typeVersion": "1.0",
"attributeDefs": [
{
"name": "group",
"typeName": "file_action",
"cardinality": "SINGLE",
"isIndexable": true,
"isOptional": false,
"isUnique": false
},
{
"name": "user",
"typeName": "file_action",
"cardinality": "SINGLE",
"isIndexable": true,
"isOptional": false,
"isUnique": false
},
{
"name": "sticky",
"typeName": "boolean",
"cardinality": "SINGLE",
"isIndexable": true,
"isOptional": false,
"isUnique": false
},
{
"name": "others",
"typeName": "file_action",
"cardinality": "SINGLE",
"isIndexable": true,
"isOptional": false,
"isUnique": false
}
]
}
],
"classificationDefs": [],
"entityDefs": [
{
"name": "fs_path",
"superTypes": [
"DataSet"
],
"typeVersion": "1.0",
"attributeDefs": [
{
"name": "path",
"typeName": "string",
"cardinality": "SINGLE",
"isIndexable": true,
"isOptional": false,
"isUnique": false
},
{
"name": "createTime",
"typeName": "date",
"cardinality": "SINGLE",
"isIndexable": true,
"isOptional": true,
"isUnique": false
},
{
"name": "modifiedTime",
"typeName": "date",
"cardinality": "SINGLE",
"isIndexable": true,
"isOptional": true,
"isUnique": false
},
{
"name": "isFile",
"typeName": "boolean",
"cardinality": "SINGLE",
"isIndexable": true,
"isOptional": true,
"isUnique": false
},
{
"name": "isSymlink",
"typeName": "boolean",
"cardinality": "SINGLE",
"isIndexable": false,
"isOptional": true,
"isUnique": false
},
{
"name": "fileSize",
"typeName": "long",
"cardinality": "SINGLE",
"isIndexable": false,
"isOptional": true,
"isUnique": false
},
{
"name": "group",
"typeName": "string",
"cardinality": "SINGLE",
"isIndexable": true,
"isOptional": true,
"isUnique": false
},
{
"name": "posixPermissions",
"typeName": "fs_permissions",
"cardinality": "SINGLE",
"isIndexable": true,
"isOptional": true,
"isUnique": false
}
]
},
{
"name": "hdfs_path",
"superTypes": [
"fs_path"
],
"typeVersion": "1.0",
"attributeDefs": [
{
"name": "clusterName",
"typeName": "string",
"cardinality": "SINGLE",
"isIndexable": true,
"isOptional": true,
"isUnique": false
},
{
"name": "numberOfReplicas",
"typeName": "int",
"cardinality": "SINGLE",
"isIndexable": true,
"isOptional": true,
"isUnique": false
},
{
"name": "extendedAttributes",
"typeName": "map<string,string>",
"cardinality": "SINGLE",
"isIndexable": false,
"isOptional": true,
"isUnique": false
}
]
}
]
}
{
"enumDefs": [],
"structDefs": [],
"classificationDefs": [],
"entityDefs": [
{
"name": "sqoop_process",
"superTypes": [
"Process"
],
"typeVersion": "1.0",
"attributeDefs": [
{
"name": "operation",
"typeName": "string",
"cardinality": "SINGLE",
"isIndexable": true,
"isOptional": false,
"isUnique": false
},
{
"name": "commandlineOpts",
"typeName": "map<string,string>",
"cardinality": "SINGLE",
"isIndexable": false,
"isOptional": false,
"isUnique": false
},
{
"name": "startTime",
"typeName": "date",
"cardinality": "SINGLE",
"isIndexable": false,
"isOptional": false,
"isUnique": false
},
{
"name": "endTime",
"typeName": "date",
"cardinality": "SINGLE",
"isIndexable": false,
"isOptional": false,
"isUnique": false
},
{
"name": "userName",
"typeName": "string",
"cardinality": "SINGLE",
"isIndexable": true,
"isOptional": true,
"isUnique": false
}
]
},
{
"name": "sqoop_dbdatastore",
"superTypes": [
"DataSet"
],
"typeVersion": "1.0",
"attributeDefs": [
{
"name": "dbStoreType",
"typeName": "string",
"cardinality": "SINGLE",
"isIndexable": true,
"isOptional": false,
"isUnique": false
},
{
"name": "storeUse",
"typeName": "string",
"cardinality": "SINGLE",
"isIndexable": false,
"isOptional": false,
"isUnique": false
},
{
"name": "storeUri",
"typeName": "string",
"cardinality": "SINGLE",
"isIndexable": false,
"isOptional": false,
"isUnique": false
},
{
"name": "source",
"typeName": "string",
"cardinality": "SINGLE",
"isIndexable": true,
"isOptional": true,
"isUnique": false
}
]
}
]
}
{
"enumDefs": [],
"structDefs": [],
"classificationDefs": [],
"entityDefs": [
{
"name": "falcon_feed_replication",
"superTypes": [
"Process"
],
"typeVersion": "1.0",
"attributeDefs": []
},
{
"name": "falcon_cluster",
"superTypes": [
"Infrastructure"
],
"typeVersion": "1.0",
"attributeDefs": [
{
"name": "colo",
"typeName": "string",
"cardinality": "SINGLE",
"isIndexable": true,
"isOptional": false,
"isUnique": false
},
{
"name": "tags",
"typeName": "map<string,string>",
"cardinality": "SINGLE",
"isIndexable": false,
"isOptional": true,
"isUnique": false
}
]
},
{
"name": "falcon_feed",
"superTypes": [
"DataSet"
],
"typeVersion": "1.0",
"attributeDefs": [
{
"name": "frequency",
"typeName": "string",
"cardinality": "SINGLE",
"isIndexable": false,
"isOptional": false,
"isUnique": false
},
{
"name": "stored-in",
"typeName": "falcon_cluster",
"cardinality": "SINGLE",
"isIndexable": true,
"isOptional": false,
"isUnique": false
},
{
"name": "groups",
"typeName": "string",
"cardinality": "SINGLE",
"isIndexable": false,
"isOptional": true,
"isUnique": false
},
{
"name": "tags",
"typeName": "map<string,string>",
"cardinality": "SINGLE",
"isIndexable": false,
"isOptional": true,
"isUnique": false
}
]
},
{
"name": "falcon_process",
"superTypes": [
"Process"
],
"typeVersion": "1.0",
"attributeDefs": [
{
"name": "frequency",
"typeName": "string",
"cardinality": "SINGLE",
"isIndexable": false,
"isOptional": false,
"isUnique": false
},
{
"name": "runs-on",
"typeName": "falcon_cluster",
"cardinality": "SINGLE",
"isIndexable": true,
"isOptional": false,
"isUnique": false
},
{
"name": "tags",
"typeName": "map<string,string>",
"cardinality": "SINGLE",
"isIndexable": false,
"isOptional": true,
"isUnique": false
},
{
"name": "pipelines",
"typeName": "string",
"cardinality": "SINGLE",
"isIndexable": true,
"isOptional": true,
"isUnique": false
},
{
"name": "workflow-properties",
"typeName": "map<string,string>",
"cardinality": "SINGLE",
"isIndexable": false,
"isOptional": true,
"isUnique": false
}
]
},
{
"name": "falcon_feed_creation",
"superTypes": [
"Process"
],
"typeVersion": "1.0",
"attributeDefs": [
{
"name": "stored-in",
"typeName": "falcon_cluster",
"cardinality": "SINGLE",
"isIndexable": true,
"isOptional": false,
"isUnique": false
}
]
}
]
}
{
"enumDefs": [],
"structDefs": [],
"classificationDefs": [],
"entityDefs": [
{
"name": "hbase_table",
"superTypes": [
"DataSet"
],
"typeVersion": "1.0",
"attributeDefs": [
{
"name": "uri",
"typeName": "string",
"cardinality": "SINGLE",
"isIndexable": false,
"isOptional": false,
"isUnique": false
}
]
}
]
}
{
"enumDefs": [],
"structDefs": [],
"classificationDefs": [],
"entityDefs": [
{
"name": "kafka_topic",
"superTypes": [
"DataSet"
],
"typeVersion": "1.0",
"attributeDefs": [
{
"name": "topic",
"typeName": "string",
"cardinality": "SINGLE",
"isIndexable": true,
"isOptional": false,
"isUnique": true
},
{
"name": "uri",
"typeName": "string",
"cardinality": "SINGLE",
"isIndexable": false,
"isOptional": false,
"isUnique": false
}
]
},
{
"name": "jms_topic",
"superTypes": [
"DataSet"
],
"typeVersion": "1.0",
"attributeDefs": [
{
"name": "topic",
"typeName": "string",
"cardinality": "SINGLE",
"isIndexable": true,
"isOptional": false,
"isUnique": true
},
{
"name": "uri",
"typeName": "string",
"cardinality": "SINGLE",
"isIndexable": false,
"isOptional": false,
"isUnique": false
}
]
}
]
}
{
"enumDefs": [],
"structDefs": [],
"classificationDefs": [],
"entityDefs": [
{
"name": "storm_topology",
"superTypes": [
"Process"
],
"typeVersion": "1.0",
"attributeDefs": [
{
"name": "id",
"typeName": "string",
"cardinality": "SINGLE",
"isIndexable": true,
"isOptional": false,
"isUnique": true
},
{
"name": "startTime",
"typeName": "date",
"cardinality": "SINGLE",
"isIndexable": false,
"isOptional": true,
"isUnique": false
},
{
"name": "endTime",
"typeName": "date",
"cardinality": "SINGLE",
"isIndexable": false,
"isOptional": true,
"isUnique": false
},
{
"name": "conf",
"typeName": "map<string,string>",
"cardinality": "SINGLE",
"isIndexable": false,
"isOptional": true,
"isUnique": false
},
{
"name": "clusterName",
"typeName": "string",
"cardinality": "SINGLE",
"isIndexable": true,
"isOptional": true,
"isUnique": false
},
{
"name": "nodes",
"typeName": "array<storm_node>",
"cardinality": "LIST",
"constraintDefs": [
{
"type": "foreignKey"
}
],
"isIndexable": false,
"isOptional": false,
"isUnique": false
}
]
},
{
"name": "storm_node",
"superTypes": [],
"typeVersion": "1.0",
"attributeDefs": [
{
"name": "name",
"typeName": "string",
"cardinality": "SINGLE",
"isIndexable": true,
"isOptional": false,
"isUnique": false
},
{
"name": "description",
"typeName": "string",
"cardinality": "SINGLE",
"isIndexable": true,
"isOptional": true,
"isUnique": false
},
{
"name": "driverClass",
"typeName": "string",
"cardinality": "SINGLE",
"isIndexable": true,
"isOptional": false,
"isUnique": false
},
{
"name": "conf",
"typeName": "map<string,string>",
"cardinality": "SINGLE",
"isIndexable": false,
"isOptional": true,
"isUnique": false
}
]
},
{
"name": "storm_spout",
"superTypes": [
"storm_node"
],
"typeVersion": "1.0",
"attributeDefs": [
{
"name": "outputs",
"typeName": "array<string>",
"cardinality": "LIST",
"isIndexable": false,
"isOptional": false,
"isUnique": false
}
]
},
{
"name": "storm_bolt",
"superTypes": [
"storm_node"
],
"typeVersion": "1.0",
"attributeDefs": [
{
"name": "inputs",
"typeName": "array<string>",
"cardinality": "LIST",
"isIndexable": false,
"isOptional": false,
"isUnique": false
},
{
"name": "outputs",
"typeName": "array<string>",
"cardinality": "SINGLE",
"isIndexable": false,
"isOptional": true,
"isUnique": false
}
]
}
]
}
{
"patches": [
{
"action": "ADD_ATTRIBUTE",
"typeName": "hive_column",
"applyToVersion": "1.0",
"updateToVersion": "1.1",
"params": null,
"attributeDefs": [
{
"name": "position",
"typeName": "int",
"cardinality": "SINGLE",
"isIndexable": false,
"isOptional": true,
"isUnique": false
}
]
}
]
}
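The last JSON block above is a typedef patch rather than a model: it adds an optional position attribute to hive_column when the type is upgraded from version 1.0 to 1.1. A minimal sketch, assuming Jackson for parsing (Atlas presumably applies such patches itself during types bootstrap), of how the patch entries can be read:

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import java.io.File;

    public class TypeDefPatchInspector {
        public static void main(String[] args) throws Exception {
            // args[0]: path to a patch file such as the hive_column patch shown above
            JsonNode root = new ObjectMapper().readTree(new File(args[0]));
            for (JsonNode patch : root.path("patches")) {
                // each patch names an action, a target type, and a version transition
                System.out.println(patch.path("action").asText() + " on "
                        + patch.path("typeName").asText() + ": "
                        + patch.path("applyToVersion").asText() + " -> "
                        + patch.path("updateToVersion").asText());
            }
        }
    }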
@@ -411,24 +411,6 @@
         <version>1.2.1</version>
         <inherited>false</inherited>
         <executions>
-          <execution>
-            <configuration>
-              <mainClass>org.apache.atlas.sqoop.model.SqoopDataModelGenerator</mainClass>
-              <systemProperties>
-                <systemProperty>
-                  <key>atlas.conf</key>
-                  <value>${project.build.directory}/../../../typesystem/target/test-classes</value>
-                </systemProperty>
-              </systemProperties>
-              <arguments>
-                <argument>${project.build.directory}/models/sqoop_model.json</argument>
-              </arguments>
-            </configuration>
-            <phase>package</phase>
-            <goals>
-              <goal>java</goal>
-            </goals>
-          </execution>
         </executions>
       </plugin>
     </plugins>
...
@@ -23,11 +23,9 @@ import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasClient;
 import org.apache.atlas.AtlasConstants;
 import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
-import org.apache.atlas.hive.model.HiveDataModelGenerator;
 import org.apache.atlas.hive.model.HiveDataTypes;
 import org.apache.atlas.hook.AtlasHook;
 import org.apache.atlas.notification.hook.HookNotification;
-import org.apache.atlas.sqoop.model.SqoopDataModelGenerator;
 import org.apache.atlas.sqoop.model.SqoopDataTypes;
 import org.apache.atlas.typesystem.Referenceable;
 import org.apache.commons.configuration.Configuration;
@@ -55,6 +53,20 @@ public class SqoopHook extends SqoopJobDataPublisher {
     public static final String ATLAS_CLUSTER_NAME = "atlas.cluster.name";
     public static final String DEFAULT_CLUSTER_NAME = "primary";
 
+    public static final String USER = "userName";
+    public static final String DB_STORE_TYPE = "dbStoreType";
+    public static final String DB_STORE_USAGE = "storeUse";
+    public static final String SOURCE = "source";
+    public static final String DESCRIPTION = "description";
+    public static final String STORE_URI = "storeUri";
+    public static final String OPERATION = "operation";
+    public static final String START_TIME = "startTime";
+    public static final String END_TIME = "endTime";
+    public static final String CMD_LINE_OPTS = "commandlineOpts";
+    // multiple inputs and outputs for process
+    public static final String INPUTS = "inputs";
+    public static final String OUTPUTS = "outputs";
+
     static {
         org.apache.hadoop.conf.Configuration.addDefaultResource("sqoop-site.xml");
     }
@@ -75,7 +87,7 @@
         tableRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
                 HiveMetaStoreBridge.getTableQualifiedName(clusterName, dbName, tableName));
         tableRef.set(AtlasClient.NAME, tableName.toLowerCase());
-        tableRef.set(HiveDataModelGenerator.DB, dbRef);
+        tableRef.set(HiveMetaStoreBridge.DB, dbRef);
         return tableRef;
     }
@@ -94,11 +106,11 @@
         String name = getSqoopDBStoreName(data);
         storeRef.set(AtlasClient.NAME, name);
         storeRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, name);
-        storeRef.set(SqoopDataModelGenerator.DB_STORE_TYPE, data.getStoreType());
-        storeRef.set(SqoopDataModelGenerator.DB_STORE_USAGE, usage);
-        storeRef.set(SqoopDataModelGenerator.STORE_URI, data.getUrl());
-        storeRef.set(SqoopDataModelGenerator.SOURCE, source);
-        storeRef.set(SqoopDataModelGenerator.DESCRIPTION, "");
+        storeRef.set(SqoopHook.DB_STORE_TYPE, data.getStoreType());
+        storeRef.set(SqoopHook.DB_STORE_USAGE, usage);
+        storeRef.set(SqoopHook.STORE_URI, data.getUrl());
+        storeRef.set(SqoopHook.SOURCE, source);
+        storeRef.set(SqoopHook.DESCRIPTION, "");
         storeRef.set(AtlasClient.OWNER, data.getUser());
         return storeRef;
     }
@@ -109,24 +121,24 @@
         final String sqoopProcessName = getSqoopProcessName(data, clusterName);
         procRef.set(AtlasClient.NAME, sqoopProcessName);
         procRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, sqoopProcessName);
-        procRef.set(SqoopDataModelGenerator.OPERATION, data.getOperation());
+        procRef.set(SqoopHook.OPERATION, data.getOperation());
         if (isImportOperation(data)) {
-            procRef.set(SqoopDataModelGenerator.INPUTS, dbStoreRef);
-            procRef.set(SqoopDataModelGenerator.OUTPUTS, hiveTableRef);
+            procRef.set(SqoopHook.INPUTS, dbStoreRef);
+            procRef.set(SqoopHook.OUTPUTS, hiveTableRef);
         } else {
-            procRef.set(SqoopDataModelGenerator.INPUTS, hiveTableRef);
-            procRef.set(SqoopDataModelGenerator.OUTPUTS, dbStoreRef);
+            procRef.set(SqoopHook.INPUTS, hiveTableRef);
+            procRef.set(SqoopHook.OUTPUTS, dbStoreRef);
         }
-        procRef.set(SqoopDataModelGenerator.USER, data.getUser());
-        procRef.set(SqoopDataModelGenerator.START_TIME, new Date(data.getStartTime()));
-        procRef.set(SqoopDataModelGenerator.END_TIME, new Date(data.getEndTime()));
+        procRef.set(SqoopHook.USER, data.getUser());
+        procRef.set(SqoopHook.START_TIME, new Date(data.getStartTime()));
+        procRef.set(SqoopHook.END_TIME, new Date(data.getEndTime()));
 
         Map<String, String> sqoopOptionsMap = new HashMap<>();
         Properties options = data.getOptions();
         for (Object k : options.keySet()) {
             sqoopOptionsMap.put((String)k, (String) options.get(k));
         }
-        procRef.set(SqoopDataModelGenerator.CMD_LINE_OPTS, sqoopOptionsMap);
+        procRef.set(SqoopHook.CMD_LINE_OPTS, sqoopOptionsMap);
         return procRef;
     }
...
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.sqoop.model;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasException;
import org.apache.atlas.addons.ModelDefinitionDump;
import org.apache.atlas.typesystem.TypesDef;
import org.apache.atlas.typesystem.json.TypesSerialization;
import org.apache.atlas.typesystem.types.AttributeDefinition;
import org.apache.atlas.typesystem.types.ClassType;
import org.apache.atlas.typesystem.types.DataTypes;
import org.apache.atlas.typesystem.types.EnumType;
import org.apache.atlas.typesystem.types.EnumTypeDefinition;
import org.apache.atlas.typesystem.types.HierarchicalTypeDefinition;
import org.apache.atlas.typesystem.types.Multiplicity;
import org.apache.atlas.typesystem.types.StructTypeDefinition;
import org.apache.atlas.typesystem.types.TraitType;
import org.apache.atlas.typesystem.types.utils.TypesUtil;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
/**
* Utility that generates Sqoop data model for both metastore entities and DDL/DML queries.
*/
public class SqoopDataModelGenerator {
private static final Logger LOG = LoggerFactory.getLogger(SqoopDataModelGenerator.class);
private final Map<String, HierarchicalTypeDefinition<ClassType>> classTypeDefinitions;
private final Map<String, EnumTypeDefinition> enumTypeDefinitionMap;
private final Map<String, StructTypeDefinition> structTypeDefinitionMap;
private static final DataTypes.MapType STRING_MAP_TYPE =
new DataTypes.MapType(DataTypes.STRING_TYPE, DataTypes.STRING_TYPE);
public static final String USER = "userName";
public static final String DB_STORE_TYPE = "dbStoreType";
public static final String DB_STORE_USAGE = "storeUse";
public static final String SOURCE = "source";
public static final String DESCRIPTION = "description";
public static final String STORE_URI = "storeUri";
public static final String OPERATION = "operation";
public static final String START_TIME = "startTime";
public static final String END_TIME = "endTime";
public static final String CMD_LINE_OPTS = "commandlineOpts";
// multiple inputs and outputs for process
public static final String INPUTS = "inputs";
public static final String OUTPUTS = "outputs";
public SqoopDataModelGenerator() {
classTypeDefinitions = new HashMap<>();
enumTypeDefinitionMap = new HashMap<>();
structTypeDefinitionMap = new HashMap<>();
}
public void createDataModel() throws AtlasException {
LOG.info("Generating the Sqoop Data Model....");
// enums
// structs
// classes
createSqoopDbStoreClass();
// DDL/DML Process
createSqoopProcessClass();
}
public TypesDef getTypesDef() {
return TypesUtil.getTypesDef(getEnumTypeDefinitions(), getStructTypeDefinitions(), getTraitTypeDefinitions(),
getClassTypeDefinitions());
}
public String getDataModelAsJSON() {
return TypesSerialization.toJson(getTypesDef());
}
public ImmutableList<EnumTypeDefinition> getEnumTypeDefinitions() {
return ImmutableList.copyOf(enumTypeDefinitionMap.values());
}
public ImmutableList<StructTypeDefinition> getStructTypeDefinitions() {
return ImmutableList.copyOf(structTypeDefinitionMap.values());
}
public ImmutableList<HierarchicalTypeDefinition<ClassType>> getClassTypeDefinitions() {
return ImmutableList.copyOf(classTypeDefinitions.values());
}
public ImmutableList<HierarchicalTypeDefinition<TraitType>> getTraitTypeDefinitions() {
return ImmutableList.of();
}
private void createSqoopDbStoreClass() throws AtlasException {
AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{
new AttributeDefinition(DB_STORE_TYPE,
DataTypes.STRING_TYPE.getName(), Multiplicity.REQUIRED, false, false, true, null),
new AttributeDefinition(DB_STORE_USAGE,
DataTypes.STRING_TYPE.getName(), Multiplicity.REQUIRED, false, null),
new AttributeDefinition(STORE_URI,
DataTypes.STRING_TYPE.getName(), Multiplicity.REQUIRED, false, null),
new AttributeDefinition(SOURCE,
DataTypes.STRING_TYPE.getName(), Multiplicity.OPTIONAL, false, false, true, null)
};
HierarchicalTypeDefinition<ClassType> definition =
new HierarchicalTypeDefinition<>(ClassType.class, SqoopDataTypes.SQOOP_DBDATASTORE.getName(), null,
ImmutableSet.of(AtlasClient.DATA_SET_SUPER_TYPE), attributeDefinitions);
classTypeDefinitions.put(SqoopDataTypes.SQOOP_DBDATASTORE.getName(), definition);
LOG.debug("Created definition for " + SqoopDataTypes.SQOOP_DBDATASTORE.getName());
}
private void createSqoopProcessClass() throws AtlasException {
AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{
new AttributeDefinition(OPERATION,
DataTypes.STRING_TYPE.getName(), Multiplicity.REQUIRED, false, false, true, null),
new AttributeDefinition(CMD_LINE_OPTS, STRING_MAP_TYPE.getName(), Multiplicity.REQUIRED, false, null),
new AttributeDefinition(START_TIME, DataTypes.DATE_TYPE.getName(), Multiplicity.REQUIRED, false, null),
new AttributeDefinition(END_TIME, DataTypes.DATE_TYPE.getName(), Multiplicity.REQUIRED, false, null),
new AttributeDefinition(USER,
DataTypes.STRING_TYPE.getName(), Multiplicity.OPTIONAL, false, false, true, null),
};
HierarchicalTypeDefinition<ClassType> definition =
new HierarchicalTypeDefinition<>(ClassType.class, SqoopDataTypes.SQOOP_PROCESS.getName(), null,
ImmutableSet.of(AtlasClient.PROCESS_SUPER_TYPE), attributeDefinitions);
classTypeDefinitions.put(SqoopDataTypes.SQOOP_PROCESS.getName(), definition);
LOG.debug("Created definition for " + SqoopDataTypes.SQOOP_PROCESS.getName());
}
public String getModelAsJson() throws AtlasException {
createDataModel();
return getDataModelAsJSON();
}
public static void main(String[] args) throws Exception {
SqoopDataModelGenerator dataModelGenerator = new SqoopDataModelGenerator();
String modelAsJson = dataModelGenerator.getModelAsJson();
if (args.length == 1) {
ModelDefinitionDump.dumpModelToFile(args[0], modelAsJson);
return;
}
System.out.println("sqoopDataModelAsJSON = " + modelAsJson);
TypesDef typesDef = dataModelGenerator.getTypesDef();
for (EnumTypeDefinition enumType : typesDef.enumTypesAsJavaList()) {
System.out.println(String.format("%s(%s) - values %s", enumType.name, EnumType.class.getSimpleName(),
Arrays.toString(enumType.enumValues)));
}
for (HierarchicalTypeDefinition<ClassType> classType : typesDef.classTypesAsJavaList()) {
System.out.println(
String.format("%s(%s) - super types [%s] - attributes %s", classType.typeName,
ClassType.class.getSimpleName(), StringUtils.join(classType.superTypes, ","),
Arrays.toString(classType.attributeDefinitions)));
}
}
}
...@@ -18,17 +18,13 @@ ...@@ -18,17 +18,13 @@
package org.apache.atlas.sqoop.hook; package org.apache.atlas.sqoop.hook;
import com.sun.jersey.api.client.ClientResponse;
import org.apache.atlas.ApplicationProperties; import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient; import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.hive.bridge.HiveMetaStoreBridge; import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
import org.apache.atlas.hive.model.HiveDataTypes; import org.apache.atlas.hive.model.HiveDataTypes;
import org.apache.atlas.sqoop.model.SqoopDataModelGenerator;
import org.apache.atlas.sqoop.model.SqoopDataTypes; import org.apache.atlas.sqoop.model.SqoopDataTypes;
import org.apache.atlas.utils.AuthenticationUtil; import org.apache.atlas.utils.AuthenticationUtil;
import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.sqoop.SqoopJobDataPublisher; import org.apache.sqoop.SqoopJobDataPublisher;
import org.codehaus.jettison.json.JSONArray; import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONObject; import org.codehaus.jettison.json.JSONObject;
...@@ -54,28 +50,6 @@ public class SqoopHookIT { ...@@ -54,28 +50,6 @@ public class SqoopHookIT {
} else { } else {
atlasClient = new AtlasClient(configuration.getStringArray(HiveMetaStoreBridge.ATLAS_ENDPOINT)); atlasClient = new AtlasClient(configuration.getStringArray(HiveMetaStoreBridge.ATLAS_ENDPOINT));
} }
registerDataModels(atlasClient);
}
private void registerDataModels(AtlasClient client) throws Exception {
// Make sure hive model exists
HiveMetaStoreBridge hiveMetaStoreBridge = new HiveMetaStoreBridge(ApplicationProperties.get(), new HiveConf(), atlasClient);
hiveMetaStoreBridge.registerHiveDataModel();
SqoopDataModelGenerator dataModelGenerator = new SqoopDataModelGenerator();
//Register Sqoop data model if it's not already registered
try {
client.getType(SqoopDataTypes.SQOOP_PROCESS.getName());
LOG.info("Sqoop data model is already registered!");
} catch(AtlasServiceException ase) {
if (ase.getStatus() == ClientResponse.Status.NOT_FOUND) {
//Expected in case types do not exist
LOG.info("Registering Sqoop data model");
client.createType(dataModelGenerator.getModelAsJson());
} else {
throw ase;
}
}
} }
@Test @Test
......
...@@ -529,24 +529,6 @@ ...@@ -529,24 +529,6 @@
<version>1.2.1</version> <version>1.2.1</version>
<inherited>false</inherited> <inherited>false</inherited>
<executions> <executions>
<execution>
<configuration>
<mainClass>org.apache.atlas.storm.model.StormDataModelGenerator</mainClass>
<systemProperties>
<systemProperty>
<key>atlas.conf</key>
<value>${project.build.directory}/../../../typesystem/target/test-classes</value>
</systemProperty>
</systemProperties>
<arguments>
<argument>${project.build.directory}/models/storm_model.json</argument>
</arguments>
</configuration>
<phase>package</phase>
<goals>
<goal>java</goal>
</goals>
</execution>
</executions> </executions>
</plugin> </plugin>
......
...@@ -26,9 +26,7 @@ import org.apache.storm.generated.TopologyInfo; ...@@ -26,9 +26,7 @@ import org.apache.storm.generated.TopologyInfo;
import org.apache.storm.utils.Utils; import org.apache.storm.utils.Utils;
import org.apache.atlas.AtlasClient; import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasConstants; import org.apache.atlas.AtlasConstants;
import org.apache.atlas.fs.model.FSDataTypes;
import org.apache.atlas.hive.bridge.HiveMetaStoreBridge; import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
import org.apache.atlas.hive.model.HiveDataModelGenerator;
import org.apache.atlas.hook.AtlasHook; import org.apache.atlas.hook.AtlasHook;
import org.apache.atlas.storm.model.StormDataTypes; import org.apache.atlas.storm.model.StormDataTypes;
import org.apache.atlas.typesystem.Referenceable; import org.apache.atlas.typesystem.Referenceable;
...@@ -213,7 +211,7 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook { ...@@ -213,7 +211,7 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook {
break; break;
case "HdfsBolt": case "HdfsBolt":
dataSetReferenceable = new Referenceable(FSDataTypes.HDFS_PATH().toString()); dataSetReferenceable = new Referenceable(HiveMetaStoreBridge.HDFS_PATH);
String hdfsUri = config.get("HdfsBolt.rotationActions") == null String hdfsUri = config.get("HdfsBolt.rotationActions") == null
? config.get("HdfsBolt.fileNameFormat.path") ? config.get("HdfsBolt.fileNameFormat.path")
: config.get("HdfsBolt.rotationActions"); : config.get("HdfsBolt.rotationActions");
...@@ -241,7 +239,7 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook { ...@@ -241,7 +239,7 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook {
final String tableQualifiedName = HiveMetaStoreBridge.getTableQualifiedName(clusterName, final String tableQualifiedName = HiveMetaStoreBridge.getTableQualifiedName(clusterName,
databaseName, hiveTableName); databaseName, hiveTableName);
dataSetReferenceable.set(AtlasClient.NAME, hiveTableName); dataSetReferenceable.set(AtlasClient.NAME, hiveTableName);
dataSetReferenceable.set(HiveDataModelGenerator.DB, dbReferenceable); dataSetReferenceable.set(HiveMetaStoreBridge.DB, dbReferenceable);
dataSetReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, tableQualifiedName); dataSetReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, tableQualifiedName);
break; break;
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.storm.model;
import org.apache.atlas.addons.ModelDefinitionDump;
import org.apache.atlas.typesystem.TypesDef;
import org.apache.atlas.typesystem.json.TypesSerialization;
import java.io.IOException;
public class StormDataModelGenerator {
public static void main(String[] args) throws IOException {
StormDataModel.main(new String[]{});
TypesDef typesDef = StormDataModel.typesDef();
String stormTypesAsJSON = TypesSerialization.toJson(typesDef);
if (args.length == 1) {
ModelDefinitionDump.dumpModelToFile(args[0], stormTypesAsJSON);
return;
}
System.out.println("stormTypesAsJSON = " + stormTypesAsJSON);
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.storm.model
import org.apache.atlas.AtlasClient
import org.apache.atlas.typesystem.TypesDef
import org.apache.atlas.typesystem.builders.TypesBuilder
import org.apache.atlas.typesystem.json.TypesSerialization
/**
* This represents the data model for a storm topology.
*/
object StormDataModel extends App {
var typesDef : TypesDef = null
val typesBuilder = new TypesBuilder
import typesBuilder._
typesDef = types {
/**
* Model is represented as:
* Topology is a Process Super Type inheriting inputs/outputs
* Input DataSet(s) => Topology => Output DataSet(s)
* Also, Topology contains the Graph of Nodes
* Topology => Node(s) -> Spouts/Bolts
*/
_class(StormDataTypes.STORM_TOPOLOGY.getName, List(AtlasClient.PROCESS_SUPER_TYPE)) {
"id" ~ (string, required, indexed, unique)
"startTime" ~ date
"endTime" ~ date
"conf" ~ (map(string, string), optional)
"clusterName" ~ (string, optional, indexed)
// Nodes in the Graph
"nodes" ~ (array(StormDataTypes.STORM_NODE.getName), collection, composite)
}
// Base class for DataProducer aka Spouts and
// DataProcessor aka Bolts, also links from Topology
_class(StormDataTypes.STORM_NODE.getName) {
"name" ~ (string, required, indexed)
"description" ~ (string, optional, indexed)
// fully qualified driver java class name
"driverClass" ~ (string, required, indexed)
// spout or bolt configuration NVPs
"conf" ~ (map(string, string), optional)
}
// Data Producer and hence only outputs
_class(StormDataTypes.STORM_SPOUT.getName, List(StormDataTypes.STORM_NODE.getName)) {
// "outputs" ~ (array(StormDataTypes.STORM_NODE.getName), collection, composite)
"outputs" ~ (array(string), collection)
}
// Data Processor and hence both inputs and outputs (inherited from Spout)
_class(StormDataTypes.STORM_BOLT.getName, List(StormDataTypes.STORM_NODE.getName)) {
// "inputs" ~ (array(StormDataTypes.STORM_NODE.getName), collection, composite)
"inputs" ~ (array(string), collection)
"outputs" ~ (array(string), collection, optional)
}
// Kafka Data Set
_class(StormDataTypes.KAFKA_TOPIC.getName, List(AtlasClient.DATA_SET_SUPER_TYPE)) {
"topic" ~ (string, required, unique, indexed)
"uri" ~ (string, required)
}
// JMS Data Set
_class(StormDataTypes.JMS_TOPIC.getName, List(AtlasClient.DATA_SET_SUPER_TYPE)) {
"topic" ~ (string, required, unique, indexed)
"uri" ~ (string, required)
}
// HBase Data Set
_class(StormDataTypes.HBASE_TABLE.getName, List(AtlasClient.DATA_SET_SUPER_TYPE)) {
"uri" ~ (string, required)
}
// Hive table data set already exists in Atlas.
}
// add the types to atlas
val typesAsJSON = TypesSerialization.toJson(typesDef)
println("Storm Data Model as JSON: ")
println(typesAsJSON)
}
...@@ -18,19 +18,11 @@ ...@@ -18,19 +18,11 @@
package org.apache.atlas.storm.hook; package org.apache.atlas.storm.hook;
import com.sun.jersey.api.client.ClientResponse;
import org.apache.atlas.ApplicationProperties; import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient; import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasException;
import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.hive.bridge.HiveMetaStoreBridge; import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
import org.apache.atlas.hive.model.HiveDataModelGenerator;
import org.apache.atlas.hive.model.HiveDataTypes;
import org.apache.atlas.storm.model.StormDataModel;
import org.apache.atlas.storm.model.StormDataTypes; import org.apache.atlas.storm.model.StormDataTypes;
import org.apache.atlas.typesystem.Referenceable; import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.TypesDef;
import org.apache.atlas.typesystem.json.TypesSerialization;
import org.apache.atlas.utils.AuthenticationUtil; import org.apache.atlas.utils.AuthenticationUtil;
import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.Configuration;
import org.apache.storm.ILocalCluster; import org.apache.storm.ILocalCluster;
...@@ -67,37 +59,6 @@ public class StormAtlasHookIT { ...@@ -67,37 +59,6 @@ public class StormAtlasHookIT {
} else { } else {
atlasClient = new AtlasClient(configuration.getStringArray(HiveMetaStoreBridge.ATLAS_ENDPOINT)); atlasClient = new AtlasClient(configuration.getStringArray(HiveMetaStoreBridge.ATLAS_ENDPOINT));
} }
registerDataModel(new HiveDataModelGenerator());
}
private void registerDataModel(HiveDataModelGenerator dataModelGenerator) throws AtlasException,
AtlasServiceException {
try {
atlasClient.getType(HiveDataTypes.HIVE_PROCESS.getName());
LOG.info("Hive data model is already registered! Going ahead with registration of Storm Data model");
} catch(AtlasServiceException ase) {
if (ase.getStatus() == ClientResponse.Status.NOT_FOUND) {
//Expected in case types do not exist
LOG.info("Registering Hive data model");
atlasClient.createType(dataModelGenerator.getModelAsJson());
} else {
throw ase;
}
}
try {
atlasClient.getType(StormDataTypes.STORM_TOPOLOGY.getName());
} catch(AtlasServiceException ase) {
if (ase.getStatus() == ClientResponse.Status.NOT_FOUND) {
LOG.info("Registering Storm/Kafka data model");
StormDataModel.main(new String[]{});
TypesDef typesDef = StormDataModel.typesDef();
String stormTypesAsJSON = TypesSerialization.toJson(typesDef);
LOG.info("stormTypesAsJSON = {}", stormTypesAsJSON);
atlasClient.createType(stormTypesAsJSON);
}
}
} }
...@@ -109,23 +70,6 @@ public class StormAtlasHookIT { ...@@ -109,23 +70,6 @@ public class StormAtlasHookIT {
atlasClient = null; atlasClient = null;
} }
@Test
public void testCreateDataModel() throws Exception {
StormDataModel.main(new String[]{});
TypesDef stormTypesDef = StormDataModel.typesDef();
String stormTypesAsJSON = TypesSerialization.toJson(stormTypesDef);
LOG.info("stormTypesAsJSON = {}", stormTypesAsJSON);
registerDataModel(new HiveDataModelGenerator());
// verify types are registered
for (StormDataTypes stormDataType : StormDataTypes.values()) {
Assert.assertNotNull(atlasClient.getType(stormDataType.getName()));
}
}
@Test (dependsOnMethods = "testCreateDataModel")
public void testAddEntities() throws Exception { public void testAddEntities() throws Exception {
StormTopology stormTopology = StormTestUtil.createTestTopology(); StormTopology stormTopology = StormTestUtil.createTestTopology();
StormTestUtil.submitTopology(stormCluster, TOPOLOGY_NAME, stormTopology); StormTestUtil.submitTopology(stormCluster, TOPOLOGY_NAME, stormTopology);
......
...@@ -106,9 +106,9 @@ ...@@ -106,9 +106,9 @@
<outputDirectory>examples</outputDirectory> <outputDirectory>examples</outputDirectory>
</fileSet> </fileSet>
<!-- addons/hdfs-model --> <!-- out-of-box-models -->
<fileSet> <fileSet>
<directory>../addons/hdfs-model/target/models</directory> <directory>../addons/models</directory>
<outputDirectory>models</outputDirectory> <outputDirectory>models</outputDirectory>
</fileSet> </fileSet>
...@@ -130,55 +130,29 @@ ...@@ -130,55 +130,29 @@
<outputDirectory>hook</outputDirectory> <outputDirectory>hook</outputDirectory>
</fileSet> </fileSet>
<fileSet>
<directory>../addons/hive-bridge/target/models</directory>
<outputDirectory>models</outputDirectory>
</fileSet>
<!-- addons/falcon --> <!-- addons/falcon -->
<fileSet> <fileSet>
<directory>../addons/falcon-bridge/target/dependency/hook</directory> <directory>../addons/falcon-bridge/target/dependency/hook</directory>
<outputDirectory>hook</outputDirectory> <outputDirectory>hook</outputDirectory>
</fileSet> </fileSet>
<fileSet>
<directory>../addons/falcon-bridge/target/models</directory>
<outputDirectory>models</outputDirectory>
</fileSet>
<!-- addons/sqoop --> <!-- addons/sqoop -->
<fileSet> <fileSet>
<directory>../addons/sqoop-bridge/target/dependency/hook</directory> <directory>../addons/sqoop-bridge/target/dependency/hook</directory>
<outputDirectory>hook</outputDirectory> <outputDirectory>hook</outputDirectory>
</fileSet> </fileSet>
<fileSet>
<directory>../addons/sqoop-bridge/target/models</directory>
<outputDirectory>models</outputDirectory>
</fileSet>
<!-- addons/storm --> <!-- addons/storm -->
<fileSet> <fileSet>
<directory>../addons/storm-bridge/target/dependency/hook</directory> <directory>../addons/storm-bridge/target/dependency/hook</directory>
<outputDirectory>hook</outputDirectory> <outputDirectory>hook</outputDirectory>
</fileSet> </fileSet>
<fileSet>
<directory>../addons/storm-bridge/target/models</directory>
<outputDirectory>models</outputDirectory>
</fileSet>
<!-- for kafka topic setup --> <!-- for kafka topic setup -->
<fileSet> <fileSet>
<directory>../notification/target/dependency/hook</directory> <directory>../notification/target/dependency/hook</directory>
<outputDirectory>hook</outputDirectory> <outputDirectory>hook</outputDirectory>
</fileSet> </fileSet>
<!-- for patches -->
<fileSet>
<directory>../addons/hive-bridge/src/patches</directory>
<outputDirectory>models/patches</outputDirectory>
</fileSet>
</fileSets> </fileSets>
<files> <files>
......
...@@ -53,7 +53,9 @@ public enum AtlasErrorCode { ...@@ -53,7 +53,9 @@ public enum AtlasErrorCode {
INTERNAL_ERROR(500, "ATLAS5001E", "Internal server error {0}"), INTERNAL_ERROR(500, "ATLAS5001E", "Internal server error {0}"),
INDEX_CREATION_FAILED(500, "ATLAS5002E", "Index creation failed for {0}"), INDEX_CREATION_FAILED(500, "ATLAS5002E", "Index creation failed for {0}"),
INDEX_ROLLBACK_FAILED(500, "ATLAS5003E", "Index rollback failed for {0}") INDEX_ROLLBACK_FAILED(500, "ATLAS5003E", "Index rollback failed for {0}"),
PATCH_NOT_APPLICABLE_FOR_TYPE(500, "ATLAS5004E", "{0} - invalid patch for type {1}"),
PATCH_FOR_UNKNOWN_TYPE(500, "ATLAS5005E", "{0} - patch references unknown type {1}")
; ;
private String errorCode; private String errorCode;
......
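The two new error codes back the typedef patch framework added later in this commit. A minimal sketch of raising them, assuming the usual AtlasBaseException(errorCode, params...) constructor:

    import org.apache.atlas.AtlasErrorCode;
    import org.apache.atlas.exception.AtlasBaseException;

    // Sketch: fail fast when a patch names an action or type that cannot be resolved.
    static void checkPatchTarget(String patchAction, String typeName, boolean typeKnown)
            throws AtlasBaseException {
        if (!typeKnown) {
            // fills the {0} and {1} placeholders of ATLAS5005E
            throw new AtlasBaseException(AtlasErrorCode.PATCH_FOR_UNKNOWN_TYPE, patchAction, typeName);
        }
    }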
...@@ -279,12 +279,12 @@ public class AtlasStructDef extends AtlasBaseTypeDef implements Serializable { ...@@ -279,12 +279,12 @@ public class AtlasStructDef extends AtlasBaseTypeDef implements Serializable {
List<AtlasConstraintDef> constraintDefs) { List<AtlasConstraintDef> constraintDefs) {
setName(name); setName(name);
setTypeName(typeName); setTypeName(typeName);
setOptional(isOptional); setIsOptional(isOptional);
setCardinality(cardinality); setCardinality(cardinality);
setValuesMinCount(valuesMinCount); setValuesMinCount(valuesMinCount);
setValuesMaxCount(valuesMaxCount); setValuesMaxCount(valuesMaxCount);
setUnique(isUnique); setIsUnique(isUnique);
setIndexable(isIndexable); setIsIndexable(isIndexable);
setConstraintDefs(constraintDefs); setConstraintDefs(constraintDefs);
} }
...@@ -292,12 +292,12 @@ public class AtlasStructDef extends AtlasBaseTypeDef implements Serializable { ...@@ -292,12 +292,12 @@ public class AtlasStructDef extends AtlasBaseTypeDef implements Serializable {
if (other != null) { if (other != null) {
setName(other.getName()); setName(other.getName());
setTypeName(other.getTypeName()); setTypeName(other.getTypeName());
setOptional(other.isOptional()); setIsOptional(other.getIsOptional());
setCardinality(other.getCardinality()); setCardinality(other.getCardinality());
setValuesMinCount(other.getValuesMinCount()); setValuesMinCount(other.getValuesMinCount());
setValuesMaxCount(other.getValuesMaxCount()); setValuesMaxCount(other.getValuesMaxCount());
setUnique(other.isUnique()); setIsUnique(other.getIsUnique());
setIndexable(other.isIndexable()); setIsIndexable(other.getIsIndexable());
setConstraintDefs(other.getConstraintDefs()); setConstraintDefs(other.getConstraintDefs());
} }
} }
...@@ -318,11 +318,11 @@ public class AtlasStructDef extends AtlasBaseTypeDef implements Serializable { ...@@ -318,11 +318,11 @@ public class AtlasStructDef extends AtlasBaseTypeDef implements Serializable {
this.typeName = typeName; this.typeName = typeName;
} }
public boolean isOptional() { public boolean getIsOptional() {
return isOptional; return isOptional;
} }
public void setOptional(boolean optional) { isOptional = optional; } public void setIsOptional(boolean optional) { isOptional = optional; }
public void setCardinality(Cardinality cardinality) { public void setCardinality(Cardinality cardinality) {
this.cardinality = cardinality; this.cardinality = cardinality;
...@@ -348,19 +348,19 @@ public class AtlasStructDef extends AtlasBaseTypeDef implements Serializable { ...@@ -348,19 +348,19 @@ public class AtlasStructDef extends AtlasBaseTypeDef implements Serializable {
this.valuesMaxCount = valuesMaxCount; this.valuesMaxCount = valuesMaxCount;
} }
public boolean isUnique() { public boolean getIsUnique() {
return isUnique; return isUnique;
} }
public void setUnique(boolean unique) { public void setIsUnique(boolean unique) {
isUnique = unique; isUnique = unique;
} }
public boolean isIndexable() { public boolean getIsIndexable() {
return isIndexable; return isIndexable;
} }
public void setIndexable(boolean indexable) { public void setIsIndexable(boolean indexable) {
isIndexable = indexable; isIndexable = indexable;
} }
...@@ -399,7 +399,7 @@ public class AtlasStructDef extends AtlasBaseTypeDef implements Serializable { ...@@ -399,7 +399,7 @@ public class AtlasStructDef extends AtlasBaseTypeDef implements Serializable {
sb.append("AtlasAttributeDef{"); sb.append("AtlasAttributeDef{");
sb.append("name='").append(name).append('\''); sb.append("name='").append(name).append('\'');
sb.append(", typeName='").append(typeName).append('\''); sb.append(", typeName='").append(typeName).append('\'');
sb.append(", isOptional=").append(isOptional); sb.append(", getIsOptional=").append(isOptional);
sb.append(", cardinality=").append(cardinality); sb.append(", cardinality=").append(cardinality);
sb.append(", valuesMinCount=").append(valuesMinCount); sb.append(", valuesMinCount=").append(valuesMinCount);
sb.append(", valuesMaxCount=").append(valuesMaxCount); sb.append(", valuesMaxCount=").append(valuesMaxCount);
......
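The isOptional()/isUnique()/isIndexable() accessors become bean-style getIsOptional()/getIsUnique()/getIsIndexable(), so Jackson serializes the flags as isOptional/isUnique/isIndexable, matching the new-format typedef JSON files this commit bootstraps from (that rationale is inferred from the rename, not stated in the diff). A short sketch of the renamed mutators, assuming the class's no-arg constructor:

    import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;

    AtlasAttributeDef attr = new AtlasAttributeDef();   // assumes the no-arg constructor
    attr.setName("qualifiedName");
    attr.setTypeName("string");
    attr.setIsOptional(false);    // was setOptional(false)
    attr.setIsUnique(true);       // was setUnique(true)
    attr.setIsIndexable(true);    // was setIndexable(true)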
...@@ -19,6 +19,7 @@ package org.apache.atlas.model.typedef; ...@@ -19,6 +19,7 @@ package org.apache.atlas.model.typedef;
import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.CollectionUtils;
import org.codehaus.jackson.annotate.JsonAutoDetect; import org.codehaus.jackson.annotate.JsonAutoDetect;
import org.codehaus.jackson.annotate.JsonIgnore;
import org.codehaus.jackson.annotate.JsonIgnoreProperties; import org.codehaus.jackson.annotate.JsonIgnoreProperties;
import org.codehaus.jackson.map.annotate.JsonSerialize; import org.codehaus.jackson.map.annotate.JsonSerialize;
...@@ -91,6 +92,7 @@ public class AtlasTypesDef { ...@@ -91,6 +92,7 @@ public class AtlasTypesDef {
this.entityDefs = entityDefs; this.entityDefs = entityDefs;
} }
@JsonIgnore
public boolean isEmpty() { public boolean isEmpty() {
return CollectionUtils.isEmpty(enumDefs) && return CollectionUtils.isEmpty(enumDefs) &&
CollectionUtils.isEmpty(structDefs) && CollectionUtils.isEmpty(structDefs) &&
......
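@JsonIgnore keeps the derived isEmpty() getter out of serialized typedef JSON. A quick sketch with the Codehaus Jackson mapper already imported above:

    import org.apache.atlas.model.typedef.AtlasTypesDef;
    import org.codehaus.jackson.map.ObjectMapper;

    public static void main(String[] args) throws java.io.IOException {
        String json = new ObjectMapper().writeValueAsString(new AtlasTypesDef());
        // With getter auto-detection, isEmpty() would otherwise surface as an
        // "empty" property; @JsonIgnore limits the output to the actual def lists.
        System.out.println(json);
    }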
...@@ -198,7 +198,7 @@ public class AtlasStructType extends AtlasType { ...@@ -198,7 +198,7 @@ public class AtlasStructType extends AtlasType {
if (value != null) { if (value != null) {
ret = dataType.validateValue(value, fieldName, messages) && ret; ret = dataType.validateValue(value, fieldName, messages) && ret;
} else if (!attributeDef.isOptional()) { } else if (!attributeDef.getIsOptional()) {
ret = false; ret = false;
messages.add(fieldName + ": mandatory attribute value missing in type " + getTypeName()); messages.add(fieldName + ": mandatory attribute value missing in type " + getTypeName());
...@@ -218,7 +218,7 @@ public class AtlasStructType extends AtlasType { ...@@ -218,7 +218,7 @@ public class AtlasStructType extends AtlasType {
if (value != null) { if (value != null) {
ret = dataType.validateValue(value, fieldName, messages) && ret; ret = dataType.validateValue(value, fieldName, messages) && ret;
} else if (!attributeDef.isOptional()) { } else if (!attributeDef.getIsOptional()) {
ret = false; ret = false;
messages.add(fieldName + ": mandatory attribute value missing in type " + getTypeName()); messages.add(fieldName + ": mandatory attribute value missing in type " + getTypeName());
...@@ -244,7 +244,7 @@ public class AtlasStructType extends AtlasType { ...@@ -244,7 +244,7 @@ public class AtlasStructType extends AtlasType {
Object attributeValue = getNormalizedValue(obj.getAttribute(attributeName), attributeDef); Object attributeValue = getNormalizedValue(obj.getAttribute(attributeName), attributeDef);
obj.setAttribute(attributeName, attributeValue); obj.setAttribute(attributeName, attributeValue);
} else if (!attributeDef.isOptional()) { } else if (!attributeDef.getIsOptional()) {
obj.setAttribute(attributeName, createDefaultValue(attributeDef)); obj.setAttribute(attributeName, createDefaultValue(attributeDef));
} }
} }
...@@ -260,7 +260,7 @@ public class AtlasStructType extends AtlasType { ...@@ -260,7 +260,7 @@ public class AtlasStructType extends AtlasType {
Object attributeValue = getNormalizedValue(obj.get(attributeName), attributeDef); Object attributeValue = getNormalizedValue(obj.get(attributeName), attributeDef);
obj.put(attributeName, attributeValue); obj.put(attributeName, attributeValue);
} else if (!attributeDef.isOptional()) { } else if (!attributeDef.getIsOptional()) {
obj.put(attributeName, createDefaultValue(attributeDef)); obj.put(attributeName, createDefaultValue(attributeDef));
} }
} }
...@@ -276,7 +276,7 @@ public class AtlasStructType extends AtlasType { ...@@ -276,7 +276,7 @@ public class AtlasStructType extends AtlasType {
} }
for (AtlasAttributeDef attributeDef : structDef.getAttributeDefs()) { for (AtlasAttributeDef attributeDef : structDef.getAttributeDefs()) {
if (!attributeDef.isOptional()) { if (!attributeDef.getIsOptional()) {
attributes.put(attributeDef.getName(), createDefaultValue(attributeDef)); attributes.put(attributeDef.getName(), createDefaultValue(attributeDef));
} }
} }
...@@ -310,7 +310,7 @@ public class AtlasStructType extends AtlasType { ...@@ -310,7 +310,7 @@ public class AtlasStructType extends AtlasType {
ret = false; // invalid value ret = false; // invalid value
} }
} }
} else if (!attributeDef.isOptional()) { } else if (!attributeDef.getIsOptional()) {
ret = false; // mandatory attribute not present ret = false; // mandatory attribute not present
} }
...@@ -322,7 +322,7 @@ public class AtlasStructType extends AtlasType { ...@@ -322,7 +322,7 @@ public class AtlasStructType extends AtlasType {
if (attrType != null) { if (attrType != null) {
if (value == null) { if (value == null) {
if (!attributeDef.isOptional()) { if (!attributeDef.getIsOptional()) {
return attrType.createDefaultValue(); return attrType.createDefaultValue();
} }
} else { } else {
......
...@@ -64,6 +64,10 @@ public class AtlasTypeRegistry { ...@@ -64,6 +64,10 @@ public class AtlasTypeRegistry {
public Collection<String> getAllTypeNames() { return registryData.allTypes.getAllTypeNames(); } public Collection<String> getAllTypeNames() { return registryData.allTypes.getAllTypeNames(); }
public boolean isRegisteredType(String typeName) {
return registryData.allTypes.isKnownType(typeName);
}
public AtlasType getType(String typeName) throws AtlasBaseException { public AtlasType getType(String typeName) throws AtlasBaseException {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("==> AtlasTypeRegistry.getType({})", typeName); LOG.debug("==> AtlasTypeRegistry.getType({})", typeName);
...@@ -677,6 +681,10 @@ class TypeCache { ...@@ -677,6 +681,10 @@ class TypeCache {
} }
} }
public boolean isKnownType(String typeName) {
return typeNameMap.containsKey(typeName);
}
public Collection<String> getAllTypeNames() { public Collection<String> getAllTypeNames() {
return Collections.unmodifiableCollection(typeNameMap.keySet()); return Collections.unmodifiableCollection(typeNameMap.keySet());
} }
......
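isRegisteredType() (backed by TypeCache.isKnownType()) lets callers probe the registry instead of catching the exception getType() raises for unknown names. A minimal usage sketch:

    import org.apache.atlas.exception.AtlasBaseException;
    import org.apache.atlas.type.AtlasType;
    import org.apache.atlas.type.AtlasTypeRegistry;

    static AtlasType resolveIfKnown(AtlasTypeRegistry typeRegistry, String typeName)
            throws AtlasBaseException {
        if (!typeRegistry.isRegisteredType(typeName)) {
            return null;    // caller decides how to treat an unknown type
        }
        return typeRegistry.getType(typeName);
    }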
...@@ -44,9 +44,7 @@ import org.apache.atlas.repository.typestore.GraphBackedTypeStore; ...@@ -44,9 +44,7 @@ import org.apache.atlas.repository.typestore.GraphBackedTypeStore;
import org.apache.atlas.repository.typestore.ITypeStore; import org.apache.atlas.repository.typestore.ITypeStore;
import org.apache.atlas.service.Service; import org.apache.atlas.service.Service;
import org.apache.atlas.services.DefaultMetadataService; import org.apache.atlas.services.DefaultMetadataService;
import org.apache.atlas.services.IBootstrapTypesRegistrar;
import org.apache.atlas.services.MetadataService; import org.apache.atlas.services.MetadataService;
import org.apache.atlas.services.ReservedTypesRegistrar;
import org.apache.atlas.store.AtlasTypeDefStore; import org.apache.atlas.store.AtlasTypeDefStore;
import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.typesystem.types.TypeSystem; import org.apache.atlas.typesystem.types.TypeSystem;
...@@ -92,8 +90,6 @@ public class RepositoryMetadataModule extends com.google.inject.AbstractModule { ...@@ -92,8 +90,6 @@ public class RepositoryMetadataModule extends com.google.inject.AbstractModule {
// bind the MetadataService interface to an implementation // bind the MetadataService interface to an implementation
bind(MetadataService.class).to(DefaultMetadataService.class).asEagerSingleton(); bind(MetadataService.class).to(DefaultMetadataService.class).asEagerSingleton();
bind(IBootstrapTypesRegistrar.class).to(ReservedTypesRegistrar.class);
// bind the DiscoveryService interface to an implementation // bind the DiscoveryService interface to an implementation
bind(DiscoveryService.class).to(GraphBackedDiscoveryService.class).asEagerSingleton(); bind(DiscoveryService.class).to(GraphBackedDiscoveryService.class).asEagerSingleton();
......
...@@ -267,8 +267,8 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang ...@@ -267,8 +267,8 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang
AtlasAttributeDef attributeDef) { AtlasAttributeDef attributeDef) {
final String propertyName = GraphHelper.encodePropertyKey(typeName + "." + attributeDef.getName()); final String propertyName = GraphHelper.encodePropertyKey(typeName + "." + attributeDef.getName());
AtlasCardinality cardinality = toAtlasCardinality(attributeDef.getCardinality()); AtlasCardinality cardinality = toAtlasCardinality(attributeDef.getCardinality());
boolean isUnique = attributeDef.isUnique(); boolean isUnique = attributeDef.getIsUnique();
boolean isIndexable = attributeDef.isIndexable(); boolean isIndexable = attributeDef.getIsIndexable();
String attribTypeName = attributeDef.getTypeName(); String attribTypeName = attributeDef.getTypeName();
boolean isBuiltInType = AtlasTypeUtil.isBuiltInType(attribTypeName); boolean isBuiltInType = AtlasTypeUtil.isBuiltInType(attribTypeName);
boolean isArrayType = AtlasTypeUtil.isArrayType(attribTypeName); boolean isArrayType = AtlasTypeUtil.isArrayType(attribTypeName);
......
...@@ -35,6 +35,7 @@ import org.apache.atlas.model.typedef.AtlasEnumDef.AtlasEnumDefs; ...@@ -35,6 +35,7 @@ import org.apache.atlas.model.typedef.AtlasEnumDef.AtlasEnumDefs;
import org.apache.atlas.model.typedef.AtlasStructDef; import org.apache.atlas.model.typedef.AtlasStructDef;
import org.apache.atlas.model.typedef.AtlasStructDef.AtlasStructDefs; import org.apache.atlas.model.typedef.AtlasStructDef.AtlasStructDefs;
import org.apache.atlas.model.typedef.AtlasTypesDef; import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.apache.atlas.repository.store.bootstrap.AtlasTypeDefStoreInitializer;
import org.apache.atlas.repository.util.FilterUtil; import org.apache.atlas.repository.util.FilterUtil;
import org.apache.atlas.store.AtlasTypeDefStore; import org.apache.atlas.store.AtlasTypeDefStore;
import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.type.AtlasTypeRegistry;
...@@ -45,6 +46,7 @@ import org.apache.commons.lang3.StringUtils; ...@@ -45,6 +46,7 @@ import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.File;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
...@@ -89,6 +91,8 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ ...@@ -89,6 +91,8 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
ttr.addTypes(typesDef); ttr.addTypes(typesDef);
typeRegistry.commitTransientTypeRegistry(ttr); typeRegistry.commitTransientTypeRegistry(ttr);
bootstrapTypes();
} }
@Override @Override
...@@ -886,6 +890,15 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ ...@@ -886,6 +890,15 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
LOG.info("Not reacting to a Passive state change"); LOG.info("Not reacting to a Passive state change");
} }
private void bootstrapTypes() {
AtlasTypeDefStoreInitializer storeInitializer = new AtlasTypeDefStoreInitializer();
String atlasHomeDir = System.getProperty("atlas.home");
String typesDirName = (StringUtils.isEmpty(atlasHomeDir) ? "." : atlasHomeDir) + File.separator + "models";
storeInitializer.initializeStore(this, typeRegistry, typesDirName);
}
private void updateTypeRegistryPostCommit(AtlasTransientTypeRegistry ttr) { private void updateTypeRegistryPostCommit(AtlasTransientTypeRegistry ttr) {
new TypeRegistryUpdateHook(ttr); new TypeRegistryUpdateHook(ttr);
} }
......
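bootstrapTypes() is where the new-format model files enter store initialization: typedefs load from ${atlas.home}/models, falling back to ./models when atlas.home is unset. AtlasTypeDefStoreInitializer's internals are not in this hunk, so the loading loop below is an assumption about its behavior:

    import java.io.File;
    import org.apache.commons.lang3.StringUtils;

    // Same directory resolution as bootstrapTypes() above.
    String atlasHomeDir = System.getProperty("atlas.home");
    String typesDirName = (StringUtils.isEmpty(atlasHomeDir) ? "." : atlasHomeDir)
                          + File.separator + "models";

    File[] typeDefFiles = new File(typesDirName).listFiles();
    if (typeDefFiles != null) {                 // dir may be absent on dev setups
        for (File typeDefFile : typeDefFiles) {
            // presumed: parse each *.json into an AtlasTypesDef and create only the
            // typedefs the registry does not already contain
        }
    }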
...@@ -421,7 +421,7 @@ public class AtlasStructDefStoreV1 extends AtlasAbstractDefStoreV1 implements At ...@@ -421,7 +421,7 @@ public class AtlasStructDefStoreV1 extends AtlasAbstractDefStoreV1 implements At
for (AtlasAttributeDef attributeDef : structDef.getAttributeDefs()) { for (AtlasAttributeDef attributeDef : structDef.getAttributeDefs()) {
if (CollectionUtils.isEmpty(currAttrNames) || !currAttrNames.contains(attributeDef.getName())) { if (CollectionUtils.isEmpty(currAttrNames) || !currAttrNames.contains(attributeDef.getName())) {
// new attribute - only allow if optional // new attribute - only allow if optional
if (!attributeDef.isOptional()) { if (!attributeDef.getIsOptional()) {
throw new AtlasBaseException(AtlasErrorCode.CANNOT_ADD_MANDATORY_ATTRIBUTE, structDef.getName(), attributeDef.getName()); throw new AtlasBaseException(AtlasErrorCode.CANNOT_ADD_MANDATORY_ATTRIBUTE, structDef.getName(), attributeDef.getName());
} }
} }
...@@ -510,13 +510,30 @@ public class AtlasStructDefStoreV1 extends AtlasAbstractDefStoreV1 implements At ...@@ -510,13 +510,30 @@ public class AtlasStructDefStoreV1 extends AtlasAbstractDefStoreV1 implements At
attribInfo.put("name", attributeDef.getName()); attribInfo.put("name", attributeDef.getName());
attribInfo.put("dataType", attributeDef.getTypeName()); attribInfo.put("dataType", attributeDef.getTypeName());
attribInfo.put("isUnique", attributeDef.isUnique()); attribInfo.put("isUnique", attributeDef.getIsUnique());
attribInfo.put("isIndexable", attributeDef.isIndexable()); attribInfo.put("isIndexable", attributeDef.getIsIndexable());
attribInfo.put("isComposite", isComposite); attribInfo.put("isComposite", isComposite);
attribInfo.put("reverseAttributeName", reverseAttribName); attribInfo.put("reverseAttributeName", reverseAttribName);
final int lower;
final int upper;
if (attributeDef.getCardinality() == AtlasAttributeDef.Cardinality.SINGLE) {
lower = attributeDef.getIsOptional() ? 0 : 1;
upper = 1;
} else {
if(attributeDef.getIsOptional()) {
lower = 0;
} else {
lower = attributeDef.getValuesMinCount() < 1 ? 1 : attributeDef.getValuesMinCount();
}
upper = attributeDef.getValuesMaxCount() < 2 ? Integer.MAX_VALUE : attributeDef.getValuesMaxCount();
}
Map<String, Object> multiplicity = new HashMap<>(); Map<String, Object> multiplicity = new HashMap<>();
multiplicity.put("lower", attributeDef.getValuesMinCount()); multiplicity.put("lower", lower);
multiplicity.put("upper", attributeDef.getValuesMaxCount()); multiplicity.put("upper", upper);
multiplicity.put("isUnique", AtlasAttributeDef.Cardinality.SET.equals(attributeDef.getCardinality())); multiplicity.put("isUnique", AtlasAttributeDef.Cardinality.SET.equals(attributeDef.getCardinality()));
attribInfo.put("multiplicity", AtlasType.toJson(multiplicity)); attribInfo.put("multiplicity", AtlasType.toJson(multiplicity));
...@@ -532,8 +549,8 @@ public class AtlasStructDefStoreV1 extends AtlasAbstractDefStoreV1 implements At ...@@ -532,8 +549,8 @@ public class AtlasStructDefStoreV1 extends AtlasAbstractDefStoreV1 implements At
ret.setName((String) attribInfo.get("name")); ret.setName((String) attribInfo.get("name"));
ret.setTypeName((String) attribInfo.get("dataType")); ret.setTypeName((String) attribInfo.get("dataType"));
ret.setUnique((Boolean) attribInfo.get("isUnique")); ret.setIsUnique((Boolean) attribInfo.get("isUnique"));
ret.setIndexable((Boolean) attribInfo.get("isIndexable")); ret.setIsIndexable((Boolean) attribInfo.get("isIndexable"));
String attrTypeName = ret.getTypeName(); String attrTypeName = ret.getTypeName();
...@@ -608,10 +625,10 @@ public class AtlasStructDefStoreV1 extends AtlasAbstractDefStoreV1 implements At ...@@ -608,10 +625,10 @@ public class AtlasStructDefStoreV1 extends AtlasAbstractDefStoreV1 implements At
Boolean isUnique = (Boolean) multiplicity.get("isUnique"); Boolean isUnique = (Boolean) multiplicity.get("isUnique");
if (minCount == null || minCount.intValue() == 0) { if (minCount == null || minCount.intValue() == 0) {
ret.setOptional(true); ret.setIsOptional(true);
ret.setValuesMinCount(0); ret.setValuesMinCount(0);
} else { } else {
ret.setOptional(false); ret.setIsOptional(false);
ret.setValuesMinCount(minCount.intValue()); ret.setValuesMinCount(minCount.intValue());
} }
......
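The new lower/upper computation maps the v2 flags (isOptional, cardinality, value counts) onto legacy multiplicity bounds. Restated as a standalone helper so the corner cases are explicit; the method name and int-pair return are illustrative, not Atlas API:

    // lower/upper exactly as computed in the hunk above
    static int[] toLegacyBounds(boolean isOptional, boolean isSingle, int minCount, int maxCount) {
        if (isSingle) {
            return new int[] { isOptional ? 0 : 1, 1 };
        }
        int lower = isOptional ? 0 : Math.max(minCount, 1);
        int upper = (maxCount < 2) ? Integer.MAX_VALUE : maxCount;
        return new int[] { lower, upper };
    }

    // toLegacyBounds(true,  true,  0, 0) -> {0, 1}                 legacy OPTIONAL
    // toLegacyBounds(false, true,  0, 0) -> {1, 1}                 legacy REQUIRED
    // toLegacyBounds(false, false, 0, 0) -> {1, Integer.MAX_VALUE} legacy COLLECTION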
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.services;
import com.google.gson.FieldNamingPolicy;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonDeserializationContext;
import com.google.gson.JsonDeserializer;
import com.google.gson.JsonElement;
import com.google.gson.JsonParseException;
import com.google.gson.JsonSyntaxException;
import org.apache.atlas.services.AtlasTypePatch.PatchContent;
import org.apache.atlas.services.AtlasTypePatch.PatchData;
import org.apache.atlas.services.AtlasTypePatch.PatchResult;
import org.apache.atlas.services.AtlasTypePatch.PatchStatus;
import org.apache.atlas.typesystem.types.Multiplicity;
import org.apache.atlas.typesystem.types.TypeSystem;
import org.apache.atlas.typesystem.types.TypeUpdateException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Type;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class AtlasPatchHandler {
private static final Logger LOG = LoggerFactory.getLogger(AtlasPatchHandler.class);
public static void handlePatches(DefaultMetadataService metadataService, TypeSystem typeSystem) throws TypeUpdateException {
Map<String, AtlasTypePatch> patchHandlerMap = initializePatchHandlerMap(metadataService, typeSystem);
if (patchHandlerMap == null || patchHandlerMap.isEmpty())
return;
String patchDirName = ReservedTypesRegistrar.getTypesDir() + File.separator + "patches";
LOG.info("Checking for any patches to be applied to the type system in " + patchDirName);
File patchDir = new File(patchDirName);
if (!patchDir.exists()) {
LOG.info("Patch directory {} doesn't exist, not applying any patches", patchDirName);
return;
}
File[] patchFiles = patchDir.listFiles();
if (patchFiles == null || patchFiles.length == 0) {
LOG.info("No patch files found in {}, not applying any patches", patchDirName);
return;
}
// Sort the patch files based on file name.
Arrays.sort(patchFiles, new Comparator<File>() {
public int compare(File f1, File f2) {
return String.valueOf(f1.getName()).compareTo(f2.getName());
}
});
LOG.info("Found " + patchFiles.length + " patch files to process.");
int patchNumber = 0;
Gson gson = initializeGson();
AtlasTypePatch typePatch;
for (File patchFile : patchFiles) {
try {
LOG.info("Processing patch file " + (++patchNumber) + " - " + patchFile.getAbsolutePath());
String content = new String(Files.readAllBytes(patchFile.toPath()), StandardCharsets.UTF_8);
PatchContent patchContent = gson.fromJson(content, PatchContent.class);
PatchData[] patchDatas = patchContent.getPatches();
PatchResult result;
int patchCounter = 0;
for (PatchData patch : patchDatas) {
typePatch = patchHandlerMap.get(patch.getAction());
if (typePatch != null) {
result = typePatch.applyPatch(patch);
if (result != null) {
LOG.info(result.getMessage() + " Patch " + (++patchCounter) + " out of " + patchDatas.length + " processed in: " + patchFile.toPath());
if (result.getStatus().equals(PatchStatus.FAILED)) {
throw new TypeUpdateException(result.getMessage() + " patch " + patchNumber + " failed in: " + patchFile.getAbsolutePath());
}
}
}
}
} catch (IOException e) {
throw new TypeUpdateException("Unable to read patch file from " + patchFile.getAbsolutePath());
} catch (JsonSyntaxException e) {
throw new TypeUpdateException("Invalid non-parseable JSON patch file in " + patchFile.getAbsolutePath());
}
}
LOG.info("Processed " + patchFiles.length + " patch files.");
}
private static Map<String, AtlasTypePatch> initializePatchHandlerMap(DefaultMetadataService metadataService, TypeSystem typeSystem) {
Map<String, AtlasTypePatch> patchHandlerMap = new HashMap<String, AtlasTypePatch>();
List<AtlasTypePatch> patchers = new ArrayList<AtlasTypePatch>();
// Register new patch classes here
patchers.add(new AtlasTypeAttributePatch(metadataService, typeSystem));
for (AtlasTypePatch patcher : patchers) {
for (String action : patcher.getSupportedActions()) {
patchHandlerMap.put(action, patcher);
}
}
return patchHandlerMap;
}
public static Gson initializeGson() {
GsonBuilder gsonBuilder = new GsonBuilder();
gsonBuilder.registerTypeAdapter(Multiplicity.class, new MultiplicityDeserializer());
gsonBuilder.setFieldNamingPolicy(FieldNamingPolicy.IDENTITY);
Gson gson = gsonBuilder.create();
return gson;
}
static class MultiplicityDeserializer implements JsonDeserializer<Multiplicity> {
@Override
public Multiplicity deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context)
throws JsonParseException {
String multiplicityString = json.getAsString().toLowerCase();
Multiplicity m = null;
switch (multiplicityString) {
case "optional":
m = Multiplicity.OPTIONAL;
break;
case "required":
m = Multiplicity.REQUIRED;
break;
case "collection":
m = Multiplicity.COLLECTION;
break;
case "set":
m = Multiplicity.SET;
break;
default:
break;
}
return m;
}
}
}
\ No newline at end of file
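For reference, a patch document of the shape PatchContent/PatchData imply, pushed through the same Gson configuration (including the MultiplicityDeserializer above). The action name and attribute values are hypothetical, and the attribute field names assume AttributeDefinition's field naming:

    import com.google.gson.Gson;
    import org.apache.atlas.services.AtlasTypePatch.PatchContent;

    Gson gson = AtlasPatchHandler.initializeGson();
    String patchJson =
          "{ \"patches\": [ {"
        + "    \"action\": \"ADD_ATTRIBUTE\","          // hypothetical action name
        + "    \"typeName\": \"hive_table\","
        + "    \"applyToVersion\": \"1.0\","
        + "    \"updateToVersion\": \"1.1\","
        + "    \"attributeDefinitions\": [ {"
        + "        \"name\": \"temporary\","
        + "        \"dataTypeName\": \"boolean\","
        + "        \"multiplicity\": \"optional\","     // parsed by MultiplicityDeserializer
        + "        \"isComposite\": false,"
        + "        \"isUnique\": false,"
        + "        \"isIndexable\": false"
        + "    } ]"
        + "} ] }";
    PatchContent content = gson.fromJson(patchJson, PatchContent.class);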
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.services;
import org.apache.atlas.typesystem.types.AttributeDefinition;
import org.apache.atlas.typesystem.types.TypeSystem;
import java.util.Map;
public abstract class AtlasTypePatch {
protected final TypeSystem typeSystem;
protected final DefaultMetadataService metadataService;
protected final String[] supportedActions;
protected AtlasTypePatch(DefaultMetadataService metadataService, TypeSystem typeSystem, String[] supportedActions) {
this.metadataService = metadataService;
this.typeSystem = typeSystem;
this.supportedActions = supportedActions;
}
public final String[] getSupportedActions() { return supportedActions; }
public abstract PatchResult applyPatch(PatchData patch);
public enum PatchStatus { SUCCESS, FAILED, SKIPPED }
public class PatchResult {
private String message;
private PatchStatus status;
public PatchResult(String message, PatchStatus status) {
this.message = message;
this.status = status;
}
public String getMessage() { return message; }
public void setMessage(String message) { this.message = message; }
public PatchStatus getStatus() { return status; }
public void setStatus(PatchStatus status) { this.status = status; }
}
/**
* A class to capture patch content.
*/
public class PatchContent {
private PatchData[] patches;
public PatchData[] getPatches() {
return patches;
}
}
public static class PatchData {
private String action;
private String typeName;
private String applyToVersion;
private String updateToVersion;
private Map<String, String> params;
private AttributeDefinition[] attributeDefinitions;
public PatchData(String action, String typeName, String applyToVersion, String updateToVersion, Map<String, String> params, AttributeDefinition[] attributeDefinitions) {
this.action = action;
this.typeName = typeName;
this.applyToVersion = applyToVersion;
this.updateToVersion = updateToVersion;
this.params = params;
this.attributeDefinitions = attributeDefinitions;
}
public String getAction() { return action; }
public String getTypeName() { return typeName; }
public String getApplyToVersion() { return applyToVersion; }
public String getUpdateToVersion() { return updateToVersion; }
public Map<String, String> getParams() { return params; }
public AttributeDefinition[] getAttributeDefinitions() { return attributeDefinitions; }
}
}
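A skeletal subclass showing the contract applyPatch() implementations fulfill; the real AtlasTypeAttributePatch registered by AtlasPatchHandler is not part of this excerpt, so everything here is illustrative:

    import org.apache.atlas.typesystem.types.TypeSystem;

    public class NoOpTypePatch extends AtlasTypePatch {
        public NoOpTypePatch(DefaultMetadataService metadataService, TypeSystem typeSystem) {
            super(metadataService, typeSystem, new String[] { "NO_OP" });   // hypothetical action
        }

        @Override
        public PatchResult applyPatch(PatchData patch) {
            // A real handler would mutate the type named by patch.getTypeName() when its
            // current version matches patch.getApplyToVersion(), then report SUCCESS/FAILED.
            return new PatchResult("Nothing to do for type " + patch.getTypeName(), PatchStatus.SKIPPED);
        }
    }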
...@@ -20,7 +20,6 @@ package org.apache.atlas.services; ...@@ -20,7 +20,6 @@ package org.apache.atlas.services;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.inject.Provider; import com.google.inject.Provider;
import org.apache.atlas.ApplicationProperties; import org.apache.atlas.ApplicationProperties;
...@@ -29,7 +28,6 @@ import org.apache.atlas.AtlasErrorCode; ...@@ -29,7 +28,6 @@ import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.AtlasException; import org.apache.atlas.AtlasException;
import org.apache.atlas.EntityAuditEvent; import org.apache.atlas.EntityAuditEvent;
import org.apache.atlas.RequestContext; import org.apache.atlas.RequestContext;
import org.apache.atlas.classification.InterfaceAudience;
import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.ha.HAConfiguration; import org.apache.atlas.ha.HAConfiguration;
import org.apache.atlas.listener.ActiveStateChangeHandler; import org.apache.atlas.listener.ActiveStateChangeHandler;
...@@ -54,7 +52,6 @@ import org.apache.atlas.typesystem.json.InstanceSerialization; ...@@ -54,7 +52,6 @@ import org.apache.atlas.typesystem.json.InstanceSerialization;
import org.apache.atlas.typesystem.json.TypesSerialization; import org.apache.atlas.typesystem.json.TypesSerialization;
import org.apache.atlas.typesystem.persistence.Id; import org.apache.atlas.typesystem.persistence.Id;
import org.apache.atlas.typesystem.persistence.ReferenceableInstance; import org.apache.atlas.typesystem.persistence.ReferenceableInstance;
import org.apache.atlas.typesystem.types.AttributeDefinition;
import org.apache.atlas.typesystem.types.AttributeInfo; import org.apache.atlas.typesystem.types.AttributeInfo;
import org.apache.atlas.typesystem.types.ClassType; import org.apache.atlas.typesystem.types.ClassType;
import org.apache.atlas.typesystem.types.DataTypes; import org.apache.atlas.typesystem.types.DataTypes;
...@@ -83,9 +80,6 @@ import java.util.Map; ...@@ -83,9 +80,6 @@ import java.util.Map;
import javax.inject.Inject; import javax.inject.Inject;
import javax.inject.Singleton; import javax.inject.Singleton;
import static org.apache.atlas.AtlasClient.PROCESS_ATTRIBUTE_INPUTS;
import static org.apache.atlas.AtlasClient.PROCESS_ATTRIBUTE_OUTPUTS;
/** /**
...@@ -103,35 +97,29 @@ public class DefaultMetadataService implements MetadataService, ActiveStateChang ...@@ -103,35 +97,29 @@ public class DefaultMetadataService implements MetadataService, ActiveStateChang
private final TypeSystem typeSystem; private final TypeSystem typeSystem;
private final MetadataRepository repository; private final MetadataRepository repository;
private final ITypeStore typeStore; private final ITypeStore typeStore;
private IBootstrapTypesRegistrar typesRegistrar;
private final Collection<TypesChangeListener> typeChangeListeners = new LinkedHashSet<>(); private final Collection<TypesChangeListener> typeChangeListeners = new LinkedHashSet<>();
private final Collection<EntityChangeListener> entityChangeListeners = new LinkedHashSet<>(); private final Collection<EntityChangeListener> entityChangeListeners = new LinkedHashSet<>();
private boolean wasInitialized = false;
@Inject @Inject
private EntityAuditRepository auditRepository; private EntityAuditRepository auditRepository;
@Inject @Inject
DefaultMetadataService(final MetadataRepository repository, final ITypeStore typeStore, DefaultMetadataService(final MetadataRepository repository, final ITypeStore typeStore,
final IBootstrapTypesRegistrar typesRegistrar,
final Collection<Provider<TypesChangeListener>> typeListenerProviders, final Collection<Provider<TypesChangeListener>> typeListenerProviders,
final Collection<Provider<EntityChangeListener>> entityListenerProviders, TypeCache typeCache) final Collection<Provider<EntityChangeListener>> entityListenerProviders, TypeCache typeCache)
throws AtlasException { throws AtlasException {
this(repository, typeStore, typesRegistrar, typeListenerProviders, entityListenerProviders, this(repository, typeStore, typeListenerProviders, entityListenerProviders,
TypeSystem.getInstance(), ApplicationProperties.get(), typeCache); TypeSystem.getInstance(), ApplicationProperties.get(), typeCache);
} }
//for testing only //for testing only
public DefaultMetadataService(final MetadataRepository repository, final ITypeStore typeStore, public DefaultMetadataService(final MetadataRepository repository, final ITypeStore typeStore,
final IBootstrapTypesRegistrar typesRegistrar,
final Collection<Provider<TypesChangeListener>> typeListenerProviders, final Collection<Provider<TypesChangeListener>> typeListenerProviders,
final Collection<Provider<EntityChangeListener>> entityListenerProviders, final Collection<Provider<EntityChangeListener>> entityListenerProviders,
final TypeSystem typeSystem, final TypeSystem typeSystem,
final Configuration configuration, TypeCache typeCache) throws AtlasException { final Configuration configuration, TypeCache typeCache) throws AtlasException {
this.typeStore = typeStore; this.typeStore = typeStore;
this.typesRegistrar = typesRegistrar;
this.typeSystem = typeSystem; this.typeSystem = typeSystem;
/** /**
* Ideally a TypeCache implementation should have been injected in the TypeSystemProvider, * Ideally a TypeCache implementation should have been injected in the TypeSystemProvider,
...@@ -163,71 +151,21 @@ public class DefaultMetadataService implements MetadataService, ActiveStateChang ...@@ -163,71 +151,21 @@ public class DefaultMetadataService implements MetadataService, ActiveStateChang
private void restoreTypeSystem() throws AtlasException { private void restoreTypeSystem() throws AtlasException {
LOG.info("Restoring type system from the store"); LOG.info("Restoring type system from the store");
TypesDef typesDef = typeStore.restore(); TypesDef typesDef = typeStore.restore();
if (!wasInitialized) {
LOG.info("Initializing type system for the first time.");
typeSystem.defineTypes(typesDef);
// restore types before creating super types
createSuperTypes();
typesRegistrar.registerTypes(ReservedTypesRegistrar.getTypesDir(), typeSystem, this);
wasInitialized = true;
} else {
LOG.info("Type system was already initialized, refreshing cache.");
refreshCache(typesDef); refreshCache(typesDef);
}
LOG.info("Restored type system from the store"); LOG.info("Restored type system from the store");
} }
private void refreshCache(TypesDef typesDef) throws AtlasException { private void refreshCache(TypesDef typesDef) throws AtlasException {
TypeSystem.TransientTypeSystem transientTypeSystem if (typesDef != null && !typesDef.isEmpty()) {
= typeSystem.createTransientTypeSystem(typesDef, true); TypeSystem.TransientTypeSystem transientTypeSystem = typeSystem.createTransientTypeSystem(typesDef, true);
Map<String, IDataType> typesAdded = transientTypeSystem.getTypesAdded(); Map<String, IDataType> typesAdded = transientTypeSystem.getTypesAdded();
LOG.info("Number of types got from transient type system: " + typesAdded.size()); LOG.info("Number of types got from transient type system: " + typesAdded.size());
typeSystem.commitTypes(typesAdded); typeSystem.commitTypes(typesAdded);
} }
@InterfaceAudience.Private
private void createSuperTypes() throws AtlasException {
HierarchicalTypeDefinition<ClassType> referenceableType = TypesUtil
.createClassTypeDef(AtlasClient.REFERENCEABLE_SUPER_TYPE, ImmutableSet.<String>of(),
new AttributeDefinition(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, DataTypes.STRING_TYPE.getName(), Multiplicity.REQUIRED, false, true, true, null));
createType(referenceableType);
HierarchicalTypeDefinition<ClassType> assetType = TypesUtil
.createClassTypeDef(AtlasClient.ASSET_TYPE, ImmutableSet.<String>of(),
new AttributeDefinition(AtlasClient.NAME, DataTypes.STRING_TYPE.getName(), Multiplicity.REQUIRED, false, false, true, null),
TypesUtil.createOptionalAttrDef(AtlasClient.DESCRIPTION, DataTypes.STRING_TYPE),
new AttributeDefinition(AtlasClient.OWNER, DataTypes.STRING_TYPE.getName(), Multiplicity.OPTIONAL, false, false, true, null));
createType(assetType);
HierarchicalTypeDefinition<ClassType> infraType = TypesUtil
.createClassTypeDef(AtlasClient.INFRASTRUCTURE_SUPER_TYPE,
ImmutableSet.of(AtlasClient.REFERENCEABLE_SUPER_TYPE, AtlasClient.ASSET_TYPE));
createType(infraType);
HierarchicalTypeDefinition<ClassType> datasetType = TypesUtil
.createClassTypeDef(AtlasClient.DATA_SET_SUPER_TYPE,
ImmutableSet.of(AtlasClient.REFERENCEABLE_SUPER_TYPE, AtlasClient.ASSET_TYPE));
createType(datasetType);
HierarchicalTypeDefinition<ClassType> processType = TypesUtil
.createClassTypeDef(AtlasClient.PROCESS_SUPER_TYPE,
ImmutableSet.of(AtlasClient.REFERENCEABLE_SUPER_TYPE, AtlasClient.ASSET_TYPE),
new AttributeDefinition(PROCESS_ATTRIBUTE_INPUTS, DataTypes.arrayTypeName(AtlasClient.DATA_SET_SUPER_TYPE),
Multiplicity.OPTIONAL, false, null),
new AttributeDefinition(PROCESS_ATTRIBUTE_OUTPUTS, DataTypes.arrayTypeName(AtlasClient.DATA_SET_SUPER_TYPE),
Multiplicity.OPTIONAL, false, null));
createType(processType);
}
private void createType(HierarchicalTypeDefinition<ClassType> type) throws AtlasException {
if (!typeSystem.isRegistered(type.typeName)) {
TypesDef typesDef = TypesUtil.getTypesDef(ImmutableList.<EnumTypeDefinition>of(), ImmutableList.<StructTypeDefinition>of(),
ImmutableList.<HierarchicalTypeDefinition<TraitType>>of(),
ImmutableList.of(type));
createType(TypesSerialization.toJson(typesDef));
}
} }
/** /**
......
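With the bootstrap registrar removed, restoreTypeSystem no longer special-cases first-time initialization: it replays whatever the type store returns, and refreshCache now guards against an empty TypesDef. A minimal sketch of the resulting flow, assuming injected ITypeStore and TypeSystem handles (illustrative wiring, not the committed code):

import org.apache.atlas.AtlasException;
import org.apache.atlas.repository.typestore.ITypeStore;
import org.apache.atlas.typesystem.TypesDef;
import org.apache.atlas.typesystem.types.TypeSystem;

// Hypothetical helper mirroring the simplified restore path above.
class RestoreSketch {
    static void restoreAndCommit(ITypeStore store, TypeSystem typeSystem) throws AtlasException {
        TypesDef persisted = store.restore();   // may be empty on a fresh install
        if (persisted != null && !persisted.isEmpty()) {
            // Validate restored defs against already-registered types, then commit them.
            TypeSystem.TransientTypeSystem transientTs =
                    typeSystem.createTransientTypeSystem(persisted, true);
            typeSystem.commitTypes(transientTs.getTypesAdded());
        }
    }
}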
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.services;
import org.apache.atlas.AtlasException;
import org.apache.atlas.typesystem.types.TypeSystem;
public interface IBootstrapTypesRegistrar {
void registerTypes(String typesDirName, TypeSystem typeSystem, MetadataService metadataService)
throws AtlasException;
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.services;
import com.google.common.collect.ImmutableList;
import org.apache.atlas.AtlasException;
import org.apache.atlas.typesystem.TypesDef;
import org.apache.atlas.typesystem.json.TypesSerialization;
import org.apache.atlas.typesystem.types.*;
import org.apache.atlas.typesystem.types.utils.TypesUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
public class ReservedTypesRegistrar implements IBootstrapTypesRegistrar {
private static final Logger LOG = LoggerFactory.getLogger(ReservedTypesRegistrar.class);
static String getTypesDir() {
return System.getProperty("atlas.home") + File.separator + "models";
}
@Override
public void registerTypes(String typesDirName, TypeSystem typeSystem, MetadataService metadataService)
throws AtlasException {
File typesDir = new File(typesDirName);
if (!typesDir.exists()) {
LOG.info("No types directory {} found - not registering any reserved types", typesDirName);
return;
}
File[] typeDefFiles = typesDir.listFiles();
//TODO - Enforce a dependency order among models by definition, rather than by modifiedTime as below
// Workaround - Sort by modifiedTime so models register in dependency order: hdfs first, then hive, which the storm and falcon models need.
// Sorting by time yields the right order because the modules build in the correct order in the pom.
Arrays.sort(typeDefFiles, new Comparator<File>() {
public int compare(File f1, File f2) {
return Long.valueOf(f1.lastModified()).compareTo(f2.lastModified());
}
});
for (File typeDefFile : typeDefFiles) {
try {
if (typeDefFile.isFile()) {
String typeDefJSON = new String(Files.readAllBytes(typeDefFile.toPath()), StandardCharsets.UTF_8);
registerType(typeSystem, metadataService, typeDefFile.getAbsolutePath(), typeDefJSON);
}
} catch (IOException e) {
LOG.error("error while registering types in file " + typeDefFile.getAbsolutePath(), e);
} catch (AtlasException e) {
LOG.error("error while registering types in file " + typeDefFile.getAbsolutePath(), e);
throw e;
}
}
}
void registerType(TypeSystem typeSystem, MetadataService metadataService, String typeDefName, String typeDefJSON)
throws AtlasException {
TypesDef typesDef = null;
try {
typesDef = TypesSerialization.fromJson(typeDefJSON);
} catch (Exception e) {
LOG.error("Error while deserializing JSON in {}", typeDefName);
throw new ReservedTypesRegistrationException("Error while deserializing JSON in " + typeDefName, e);
}
List<HierarchicalTypeDefinition<ClassType>> createClassDefList = new ArrayList<>();
List<HierarchicalTypeDefinition<TraitType>> createTraitDefList = new ArrayList<>();
List<EnumTypeDefinition> createEnumDefList = new ArrayList<>();
List<StructTypeDefinition> createStructDefList = new ArrayList<>();
for (HierarchicalTypeDefinition<ClassType> classTypeDef : typesDef.classTypesAsJavaList()) {
if (!typeSystem.isRegistered(classTypeDef.typeName)) {
LOG.debug("ClassType {} is not registered. Adding to create type list", classTypeDef.typeName);
createClassDefList.add(classTypeDef);
}
}
for (HierarchicalTypeDefinition<TraitType> traitTypeDef : typesDef.traitTypesAsJavaList()) {
if (!typeSystem.isRegistered(traitTypeDef.typeName)) {
LOG.debug("TraitType {} is not registered. Adding to create type list", traitTypeDef.typeName);
createTraitDefList.add(traitTypeDef);
}
}
for (StructTypeDefinition structTypeDef : typesDef.structTypesAsJavaList()) {
if (!typeSystem.isRegistered(structTypeDef.typeName)) {
LOG.debug("StructType {} is not registered. Adding to create type list", structTypeDef.typeName);
createStructDefList.add(structTypeDef);
}
}
for (EnumTypeDefinition enumTypeDef : typesDef.enumTypesAsJavaList()) {
if (!typeSystem.isRegistered(enumTypeDef.name)) {
LOG.debug("EnumType {} is not registered. Adding to create type list", enumTypeDef.name);
createEnumDefList.add(enumTypeDef);
}
}
TypesDef createTypes = TypesUtil.getTypesDef(ImmutableList.copyOf(createEnumDefList), ImmutableList.copyOf(createStructDefList),
ImmutableList.copyOf(createTraitDefList), ImmutableList.copyOf(createClassDefList));
if (!createTypes.isEmpty()) {
String createTypeJSON = TypesSerialization.toJson(createTypes);
if (createTypeJSON != null) {
metadataService.createType(createTypeJSON);
LOG.info("Created types definition JSON {}", createTypeJSON);
}
}
}
}
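On Java 8 the modifiedTime workaround above collapses to a one-liner; a hedged sketch with the same oldest-first ordering (illustrative only — the real fix the TODO asks for is an explicit dependency order between models):

import java.io.File;
import java.util.Arrays;
import java.util.Comparator;

class ModelFileOrderSketch {
    // Equivalent to the anonymous Comparator above: oldest file sorts first.
    static void sortByModifiedTime(File[] typeDefFiles) {
        Arrays.sort(typeDefFiles, Comparator.comparingLong(File::lastModified));
    }
}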
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.services;
public class ReservedTypesRegistrationException extends RuntimeException {
public ReservedTypesRegistrationException(String message, Exception e) {
super(message, e);
}
}
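Taken together, the three classes above form the old bootstrap path that this commit retires: DefaultMetadataService asked the registrar to scan {atlas.home}/models and push each typedef JSON file through MetadataService.createType. A hedged usage sketch (the wiring below is illustrative, not code from this commit):

import java.io.File;
import org.apache.atlas.AtlasException;
import org.apache.atlas.services.IBootstrapTypesRegistrar;
import org.apache.atlas.services.MetadataService;
import org.apache.atlas.services.ReservedTypesRegistrar;
import org.apache.atlas.typesystem.types.TypeSystem;

class OldBootstrapSketch {
    // Illustrative: how the registrar was driven before the IBootstrapTypesRegistrar
    // indirection was removed from DefaultMetadataService.
    static void bootstrap(TypeSystem typeSystem, MetadataService metadataService) throws AtlasException {
        IBootstrapTypesRegistrar registrar = new ReservedTypesRegistrar();
        registrar.registerTypes(System.getProperty("atlas.home") + File.separator + "models",
                typeSystem, metadataService);
    }
}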
...@@ -44,6 +44,9 @@ import org.apache.atlas.typesystem.types.TypeSystem; ...@@ -44,6 +44,9 @@ import org.apache.atlas.typesystem.types.TypeSystem;
import org.apache.atlas.typesystem.types.utils.TypesUtil; import org.apache.atlas.typesystem.types.utils.TypesUtil;
import org.testng.annotations.Guice; import org.testng.annotations.Guice;
import static org.apache.atlas.AtlasClient.PROCESS_ATTRIBUTE_INPUTS;
import static org.apache.atlas.AtlasClient.PROCESS_ATTRIBUTE_OUTPUTS;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
...@@ -67,6 +70,7 @@ public class BaseRepositoryTest { ...@@ -67,6 +70,7 @@ public class BaseRepositoryTest {
protected void setUp() throws Exception { protected void setUp() throws Exception {
//force graph initialization / built in type registration //force graph initialization / built in type registration
TestUtils.getGraph(); TestUtils.getGraph();
setUpDefaultTypes();
setUpTypes(); setUpTypes();
new GraphBackedSearchIndexer(new AtlasTypeRegistry()); new GraphBackedSearchIndexer(new AtlasTypeRegistry());
TestUtils.resetRequestContext(); TestUtils.resetRequestContext();
...@@ -395,4 +399,42 @@ public class BaseRepositoryTest { ...@@ -395,4 +399,42 @@ public class BaseRepositoryTest {
// return the reference to created instance with guid // return the reference to created instance with guid
return new Id(guids.get(guids.size() - 1), 0, referenceable.getTypeName()); return new Id(guids.get(guids.size() - 1), 0, referenceable.getTypeName());
} }
private void setUpDefaultTypes() throws Exception {
TypesDef typesDef = createDefaultTypeDefinitions();
String typesAsJSON = TypesSerialization.toJson(typesDef);
metadataService.createType(typesAsJSON);
}
TypesDef createDefaultTypeDefinitions() {
HierarchicalTypeDefinition<ClassType> referenceableType = TypesUtil
.createClassTypeDef(AtlasClient.REFERENCEABLE_SUPER_TYPE, ImmutableSet.<String>of(),
new AttributeDefinition(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, DataTypes.STRING_TYPE.getName(), Multiplicity.REQUIRED, false, true, true, null));
HierarchicalTypeDefinition<ClassType> assetType = TypesUtil
.createClassTypeDef(AtlasClient.ASSET_TYPE, ImmutableSet.<String>of(),
new AttributeDefinition(AtlasClient.NAME, DataTypes.STRING_TYPE.getName(), Multiplicity.REQUIRED, false, false, true, null),
TypesUtil.createOptionalAttrDef(AtlasClient.DESCRIPTION, DataTypes.STRING_TYPE),
new AttributeDefinition(AtlasClient.OWNER, DataTypes.STRING_TYPE.getName(), Multiplicity.OPTIONAL, false, false, true, null));
HierarchicalTypeDefinition<ClassType> infraType = TypesUtil
.createClassTypeDef(AtlasClient.INFRASTRUCTURE_SUPER_TYPE,
ImmutableSet.of(AtlasClient.REFERENCEABLE_SUPER_TYPE, AtlasClient.ASSET_TYPE));
HierarchicalTypeDefinition<ClassType> datasetType = TypesUtil
.createClassTypeDef(AtlasClient.DATA_SET_SUPER_TYPE,
ImmutableSet.of(AtlasClient.REFERENCEABLE_SUPER_TYPE, AtlasClient.ASSET_TYPE));
HierarchicalTypeDefinition<ClassType> processType = TypesUtil
.createClassTypeDef(AtlasClient.PROCESS_SUPER_TYPE,
ImmutableSet.of(AtlasClient.REFERENCEABLE_SUPER_TYPE, AtlasClient.ASSET_TYPE),
new AttributeDefinition(PROCESS_ATTRIBUTE_INPUTS, DataTypes.arrayTypeName(AtlasClient.DATA_SET_SUPER_TYPE),
Multiplicity.OPTIONAL, false, null),
new AttributeDefinition(PROCESS_ATTRIBUTE_OUTPUTS, DataTypes.arrayTypeName(AtlasClient.DATA_SET_SUPER_TYPE),
Multiplicity.OPTIONAL, false, null));
return TypesUtil.getTypesDef(ImmutableList.<EnumTypeDefinition>of(), ImmutableList.<StructTypeDefinition>of(),
ImmutableList.<HierarchicalTypeDefinition<TraitType>>of(),
ImmutableList.of(referenceableType, assetType, infraType, datasetType, processType));
}
} }
...@@ -33,7 +33,6 @@ import org.apache.atlas.repository.typestore.GraphBackedTypeStore; ...@@ -33,7 +33,6 @@ import org.apache.atlas.repository.typestore.GraphBackedTypeStore;
import org.apache.atlas.repository.typestore.ITypeStore; import org.apache.atlas.repository.typestore.ITypeStore;
import org.apache.atlas.services.DefaultMetadataService; import org.apache.atlas.services.DefaultMetadataService;
import org.apache.atlas.services.MetadataService; import org.apache.atlas.services.MetadataService;
import org.apache.atlas.services.ReservedTypesRegistrar;
import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.typesystem.ITypedReferenceableInstance; import org.apache.atlas.typesystem.ITypedReferenceableInstance;
import org.apache.atlas.typesystem.Referenceable; import org.apache.atlas.typesystem.Referenceable;
...@@ -527,7 +526,6 @@ public final class TestUtils { ...@@ -527,7 +526,6 @@ public final class TestUtils {
ITypeStore typeStore = new GraphBackedTypeStore(); ITypeStore typeStore = new GraphBackedTypeStore();
DefaultMetadataService defaultMetadataService = new DefaultMetadataService(repo, DefaultMetadataService defaultMetadataService = new DefaultMetadataService(repo,
typeStore, typeStore,
new ReservedTypesRegistrar(),
Collections.singletonList(indexerProvider), Collections.singletonList(indexerProvider),
new ArrayList<Provider<EntityChangeListener>>(), TypeSystem.getInstance(), config, typeCache); new ArrayList<Provider<EntityChangeListener>>(), TypeSystem.getInstance(), config, typeCache);
......
...@@ -190,7 +190,7 @@ public class AtlasTypeDefGraphStoreTest { ...@@ -190,7 +190,7 @@ public class AtlasTypeDefGraphStoreTest {
try { try {
existingTypesDef = typeDefStore.searchTypesDef(new SearchFilter()); existingTypesDef = typeDefStore.searchTypesDef(new SearchFilter());
} catch (AtlasBaseException e) { } catch (AtlasBaseException e) {
fail("Shouldn't have failed during Search"); // ignore
} }
assertNotEquals(atlasTypesDef, existingTypesDef, "Types to be created already exist in the system"); assertNotEquals(atlasTypesDef, existingTypesDef, "Types to be created already exist in the system");
...@@ -204,7 +204,7 @@ public class AtlasTypeDefGraphStoreTest { ...@@ -204,7 +204,7 @@ public class AtlasTypeDefGraphStoreTest {
assertTrue(createdTypesDef.getEntityDefs().containsAll(atlasTypesDef.getEntityDefs()), "EntityDef creation failed"); assertTrue(createdTypesDef.getEntityDefs().containsAll(atlasTypesDef.getEntityDefs()), "EntityDef creation failed");
} catch (AtlasBaseException e) { } catch (AtlasBaseException e) {
fail("Creation of Types should've been a success"); fail("Creation of Types should've been a success", e);
} }
} }
......
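Both test tweaks above stop hiding the root cause: the search failure is now ignored outright, and the creation failure is reported through TestNG's two-argument fail overload so the report carries the original stack trace. A minimal sketch of the pattern (riskyOperation is a hypothetical stand-in for the store call):

import static org.testng.Assert.fail;

class FailWithCauseSketch {
    static void riskyOperation() throws Exception { /* hypothetical store call */ }

    void example() {
        try {
            riskyOperation();
        } catch (Exception e) {
            // fail(String, Throwable) preserves the cause in the test report.
            fail("Operation should've been a success", e);
        }
    }
}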
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.services;
import com.google.inject.Provider;
import org.apache.atlas.AtlasException;
import org.apache.atlas.listener.EntityChangeListener;
import org.apache.atlas.listener.TypesChangeListener;
import org.apache.atlas.repository.MetadataRepository;
import org.apache.atlas.repository.typestore.ITypeStore;
import org.apache.atlas.typesystem.types.TypeSystem;
import org.apache.atlas.ha.HAConfiguration;
import org.apache.atlas.typesystem.TypesDef;
import org.apache.atlas.typesystem.types.IDataType;
import org.apache.commons.configuration.Configuration;
import org.mockito.Matchers;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import java.util.ArrayList;
import java.util.HashMap;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
public class DefaultMetadataServiceMockTest {
@Mock
private IBootstrapTypesRegistrar typesRegistrar;
@Mock
private TypeSystem typeSystem;
@Mock
private MetadataRepository metadataRepository;
@Mock
private ITypeStore typeStore;
@Mock
private Configuration configuration;
@BeforeMethod
public void setup() {
MockitoAnnotations.initMocks(this);
}
@Test
public void testShouldInvokeTypesRegistrarOnCreation() throws AtlasException {
when(typeSystem.isRegistered(any(String.class))).thenReturn(true);
when(configuration.containsKey(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true);
when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(false);
DefaultMetadataService defaultMetadataService = new DefaultMetadataService(mock(MetadataRepository.class),
mock(ITypeStore.class),
typesRegistrar, new ArrayList<Provider<TypesChangeListener>>(),
new ArrayList<Provider<EntityChangeListener>>(), typeSystem, configuration, null);
verify(typesRegistrar).registerTypes(ReservedTypesRegistrar.getTypesDir(),
typeSystem, defaultMetadataService);
}
@Test
public void testShouldNotRestoreTypesIfHAIsEnabled() throws AtlasException {
when(configuration.containsKey(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true);
when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true);
new DefaultMetadataService(metadataRepository, typeStore,
typesRegistrar, new ArrayList<Provider<TypesChangeListener>>(),
new ArrayList<Provider<EntityChangeListener>>(), typeSystem, configuration, null);
verifyZeroInteractions(typeStore);
verify(typeSystem, never()).defineTypes(Matchers.<TypesDef>any());
verifyZeroInteractions(typesRegistrar);
}
@Test
public void testShouldRestoreTypeSystemOnServerActive() throws AtlasException {
when(configuration.containsKey(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true);
when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true);
TypesDef typesDef = mock(TypesDef.class);
when(typeStore.restore()).thenReturn(typesDef);
when(typeSystem.isRegistered(any(String.class))).thenReturn(true);
DefaultMetadataService defaultMetadataService = new DefaultMetadataService(metadataRepository,
typeStore,
typesRegistrar, new ArrayList<Provider<TypesChangeListener>>(),
new ArrayList<Provider<EntityChangeListener>>(), typeSystem, configuration, null);
defaultMetadataService.instanceIsActive();
verify(typeStore).restore();
verify(typeSystem).defineTypes(typesDef);
verify(typesRegistrar).registerTypes(ReservedTypesRegistrar.getTypesDir(),
typeSystem, defaultMetadataService);
}
@Test
public void testShouldOnlyRestoreCacheOnServerActiveIfAlreadyDoneOnce() throws AtlasException {
when(configuration.containsKey(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true);
when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true);
TypesDef typesDef = mock(TypesDef.class);
when(typeStore.restore()).thenReturn(typesDef);
when(typeSystem.isRegistered(any(String.class))).thenReturn(true);
TypeSystem.TransientTypeSystem transientTypeSystem = mock(TypeSystem.TransientTypeSystem.class);
HashMap<String, IDataType> typesAdded = new HashMap<>();
when(transientTypeSystem.getTypesAdded()).thenReturn(typesAdded);
when(typeSystem.createTransientTypeSystem(typesDef, true)).
thenReturn(transientTypeSystem);
DefaultMetadataService defaultMetadataService = new DefaultMetadataService(metadataRepository,
typeStore,
typesRegistrar, new ArrayList<Provider<TypesChangeListener>>(),
new ArrayList<Provider<EntityChangeListener>>(), typeSystem, configuration, null);
defaultMetadataService.instanceIsActive();
defaultMetadataService.instanceIsPassive();
defaultMetadataService.instanceIsActive();
verify(typeStore, times(2)).restore();
verify(typeSystem, times(1)).defineTypes(typesDef);
verify(typesRegistrar, times(1)).
registerTypes(ReservedTypesRegistrar.getTypesDir(), typeSystem, defaultMetadataService);
verify(typeSystem, times(1)).createTransientTypeSystem(typesDef, true);
verify(typeSystem, times(1)).commitTypes(typesAdded);
}
}
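The last test above pins down the active/passive contract: defineTypes and the registrar run exactly once, while every re-activation restores from the store and only refreshes the cache. Condensed, the verified sequence looks like this (driver sketch against the service under test, assuming the mocks configured above):

import org.apache.atlas.AtlasException;
import org.apache.atlas.services.DefaultMetadataService;

class FailoverCycleSketch {
    // Driver sketch: the lifecycle the mock test verifies.
    static void failoverCycle(DefaultMetadataService service) throws AtlasException {
        service.instanceIsActive();    // typeStore.restore() + typeSystem.defineTypes(...)
        service.instanceIsPassive();   // server loses leadership
        service.instanceIsActive();    // restore() again, but only refreshCache this time
    }
}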
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.services;
import org.apache.atlas.AtlasException;
import org.apache.atlas.TestUtils;
import org.apache.atlas.typesystem.TypesDef;
import org.apache.atlas.typesystem.json.TypesSerialization;
import org.apache.atlas.typesystem.types.TypeSystem;
import org.apache.atlas.typesystem.types.TypeUtils;
import org.apache.atlas.typesystem.types.utils.TypesUtil;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
public class ReservedTypesRegistrarTest {
@Mock
private TypeSystem typeSystem;
@Mock
private MetadataService metadataService;
@BeforeMethod
public void setup() {
MockitoAnnotations.initMocks(this);
}
@Test
public void testRegistrationWithNoFiles() throws AtlasException {
IBootstrapTypesRegistrar bootstrapTypesRegistrar = new ReservedTypesRegistrar();
bootstrapTypesRegistrar.registerTypes("/some/dir/", typeSystem, metadataService);
verifyZeroInteractions(typeSystem);
}
@Test
public void testRegisterCreatesTypesUsingMetadataService() throws AtlasException {
ReservedTypesRegistrar reservedTypesRegistrar = new ReservedTypesRegistrar();
TypesDef typesDef = TestUtils.defineHiveTypes();
String typesJson = TypesSerialization.toJson(typesDef);
reservedTypesRegistrar.registerType(typeSystem, metadataService, "/some/file/model.json", typesJson);
verify(metadataService).createType(typesJson);
}
@Test(expectedExceptions = ReservedTypesRegistrationException.class)
public void testRegisterFailsIfErrorInJson() throws AtlasException {
ReservedTypesRegistrar reservedTypesRegistrar = new ReservedTypesRegistrar();
reservedTypesRegistrar.registerType(typeSystem, metadataService, "/some/file/model.json", "invalid json");
}
@Test(expectedExceptions = AtlasException.class)
public void testRegisterFailsOnTypeCreationException() throws AtlasException {
ReservedTypesRegistrar reservedTypesRegistrar = new ReservedTypesRegistrar();
TypesDef typesDef = TestUtils.defineHiveTypes();
String typesJson = TypesSerialization.toJson(typesDef);
when(metadataService.createType(typesJson)).thenThrow(new AtlasException("some exception"));
reservedTypesRegistrar.registerType(typeSystem, metadataService, "/some/file/model.json", typesJson);
}
@Test
public void testCreateAndUpdateType() throws AtlasException {
ReservedTypesRegistrar reservedTypesRegistrar = new ReservedTypesRegistrar();
TypesDef typesDef = TestUtils.simpleType();
String typesJson = TypesSerialization.toJson(typesDef);
reservedTypesRegistrar.registerType(typeSystem, metadataService, "/some/file/model.json", typesJson);
verify(metadataService).createType(typesJson);
//test update simple type
TypesDef updatedTypesDef = TestUtils.simpleTypeUpdated();
String updatedTypesJson = TypesSerialization.toJson(updatedTypesDef);
TypesDef simpleTypeUpdatedDiff = TestUtils.simpleTypeUpdatedDiff();
String simpleTypeUpdatedDiffJson = TypesSerialization.toJson(simpleTypeUpdatedDiff);
when(typeSystem.isRegistered("h_type")).thenReturn(true);
when(typeSystem.isRegistered("t_type")).thenReturn(true);
when(typeSystem.isRegistered("s_type")).thenReturn(true);
when(typeSystem.isRegistered("e_type")).thenReturn(true);
reservedTypesRegistrar.registerType(typeSystem, metadataService, "/some/file/model.json", updatedTypesJson);
verify(metadataService).createType(simpleTypeUpdatedDiffJson);
}
}