Commit 871f02bb by Ashutosh Mestry

ATLAS-2797: Atlas Cluster.

parent 6d0c5c8c
......@@ -100,6 +100,46 @@
]
},
{
"name": "AtlasCluster",
"typeVersion": "1.0",
"superTypes": [
],
"attributeDefs": [
{
"name": "displayName",
"typeName": "string",
"cardinality": "SINGLE",
"isIndexable": true,
"isOptional": false,
"isUnique": false
},
{
"name": "qualifiedName",
"typeName": "string",
"cardinality": "SINGLE",
"isIndexable": true,
"isOptional": false,
"isUnique": true
},
{
"name": "urls",
"typeName": "array<string>",
"cardinality": "SINGLE",
"isIndexable": false,
"isOptional": true,
"isUnique": false
},
{
"name": "additionalInfo",
"typeName": "map<string,string>",
"cardinality": "SINGLE",
"isIndexable": false,
"isOptional": true,
"isUnique": false
}
]
},
{
"name": "__AtlasUserProfile",
"superTypes": [
"__internal"
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.model.clusterinfo;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import org.apache.atlas.model.AtlasBaseModelObject;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE;
import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY;
@JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = PUBLIC_ONLY, fieldVisibility = NONE)
@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
@JsonIgnoreProperties(ignoreUnknown = true)
public class AtlasCluster extends AtlasBaseModelObject implements Serializable {
private static final long serialVersionUID = 1L;
public static final String SYNC_INFO_KEY = "syncInfo";
public static final String OPERATION = "operation";
public static final String NEXT_MODIFIED_TIMESTAMP = "nextModifiedTimestamp";
private String name;
private String qualifiedName;
private Map<String, String> additionalInfo;
private List<String> urls;
public AtlasCluster() {
urls = new ArrayList<>();
}
public AtlasCluster(String name, String qualifiedName) {
this.name = name;
this.qualifiedName = qualifiedName;
}
public void setName(String name) {
this.name = name;
}
public String getName() {
return this.name;
}
public void setAdditionalInfo(Map<String, String> additionalInfo) {
if(this.additionalInfo == null) {
this.additionalInfo = new HashMap<>();
}
this.additionalInfo = additionalInfo;
}
public void setAdditionalInfo(String key, String value) {
if(this.additionalInfo == null) {
this.additionalInfo = new HashMap<>();
}
additionalInfo.put(key, value);
}
public Map<String, String> getAdditionalInfo() {
return this.additionalInfo;
}
public String getAdditionalInfo(String key) {
return additionalInfo.get(key);
}
public String getQualifiedName() {
return qualifiedName;
}
public void setQualifiedName(String qualifiedName) {
this.qualifiedName = qualifiedName;
}
public void setUrls(List<String> urls) {
this.urls = urls;
}
public List<String> getUrls() {
return this.urls;
}
@Override
public StringBuilder toString(StringBuilder sb) {
sb.append(", name=").append(name);
sb.append(", qualifiedName=").append(getQualifiedName());
sb.append(", urls=").append(urls);
sb.append(", additionalInfo=").append(additionalInfo);
sb.append("}");
return sb;
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.clusterinfo;
import org.apache.atlas.annotation.AtlasService;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.clusterinfo.AtlasCluster;
import org.apache.atlas.repository.ogm.DataAccess;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.inject.Inject;
@AtlasService
public class ClusterService {
private static final Logger LOG = LoggerFactory.getLogger(ClusterService.class);
private final DataAccess dataAccess;
@Inject
public ClusterService(DataAccess dataAccess) {
this.dataAccess = dataAccess;
}
public AtlasCluster get(AtlasCluster cluster) {
try {
return dataAccess.load(cluster);
} catch (AtlasBaseException e) {
LOG.error("dataAccess", e);
}
return null;
}
public AtlasCluster save(AtlasCluster clusterInfo) throws AtlasBaseException {
return dataAccess.save(clusterInfo);
}
}
......@@ -17,7 +17,6 @@
*/
package org.apache.atlas.repository.ogm;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.AtlasBaseModelObject;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.repository.Constants;
......@@ -31,16 +30,16 @@ public abstract class AbstractDataTransferObject<T extends AtlasBaseModelObject>
private final Class<T> objectType;
private final String entityTypeName;
protected AbstractDataTransferObject(AtlasTypeRegistry typeRegistry, Class<T> tClass) {
this(typeRegistry, tClass, tClass.getSimpleName());
}
protected AbstractDataTransferObject(AtlasTypeRegistry typeRegistry, Class<T> tClass, String entityTypeName) {
this.typeRegistry = typeRegistry;
this.objectType = tClass;
this.entityTypeName = entityTypeName;
}
protected AbstractDataTransferObject(AtlasTypeRegistry typeRegistry, Class<T> tClass) {
this(typeRegistry, tClass, Constants.INTERNAL_PROPERTY_KEY_PREFIX + tClass.getSimpleName());
}
@Override
public Class getObjectType() {
return objectType;
......@@ -54,7 +53,7 @@ public abstract class AbstractDataTransferObject<T extends AtlasBaseModelObject>
}
protected AtlasEntity getDefaultAtlasEntity(T obj) throws AtlasBaseException {
protected AtlasEntity getDefaultAtlasEntity(T obj) {
AtlasEntity ret = getEntityType().createDefaultValue();
if (obj != null) {
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.ogm;
import org.apache.atlas.model.clusterinfo.AtlasCluster;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.springframework.stereotype.Component;
import javax.inject.Inject;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Component
public class AtlasClusterDTO extends AbstractDataTransferObject<AtlasCluster> {
private final String PROPERTY_CLUSTER_NAME = "displayName";
private final String PROPERTY_QUALIFIED_NAME = "qualifiedName";
private final String PROPERTY_ADDITIONAL_INFO = "additionalInfo";
private final String PROPERTY_URLS = "urls";
@Inject
public AtlasClusterDTO(AtlasTypeRegistry typeRegistry) {
super(typeRegistry, AtlasCluster.class, AtlasCluster.class.getSimpleName());
}
public AtlasCluster from(AtlasEntity entity) {
AtlasCluster cluster = new AtlasCluster();
setGuid(cluster, entity);
cluster.setName((String) entity.getAttribute(PROPERTY_CLUSTER_NAME));
cluster.setQualifiedName((String) entity.getAttribute(PROPERTY_QUALIFIED_NAME));
cluster.setAdditionalInfo((Map<String,String>) entity.getAttribute(PROPERTY_ADDITIONAL_INFO));
cluster.setUrls((List<String>) entity.getAttribute(PROPERTY_URLS));
return cluster;
}
public AtlasCluster from(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) {
return from(entityWithExtInfo.getEntity());
}
@Override
public AtlasEntity toEntity(AtlasCluster obj) {
AtlasEntity entity = getDefaultAtlasEntity(obj);
entity.setAttribute(PROPERTY_CLUSTER_NAME, obj.getName());
entity.setAttribute(PROPERTY_QUALIFIED_NAME, obj.getQualifiedName());
entity.setAttribute(PROPERTY_ADDITIONAL_INFO, obj.getAdditionalInfo());
return entity;
}
@Override
public AtlasEntity.AtlasEntityWithExtInfo toEntityWithExtInfo(AtlasCluster obj) {
return new AtlasEntity.AtlasEntityWithExtInfo(toEntity(obj));
}
@Override
public Map<String, Object> getUniqueAttributes(final AtlasCluster obj) {
return new HashMap<String, Object>() {{
put(PROPERTY_QUALIFIED_NAME, obj.getQualifiedName());
}};
}
}
......@@ -41,6 +41,7 @@ import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.GraphDBMigrator;
import org.apache.atlas.repository.graphdb.janus.migration.GraphDBGraphSONMigrator;
import org.apache.atlas.repository.impexp.ExportService;
import org.apache.atlas.repository.ogm.AtlasClusterDTO;
import org.apache.atlas.repository.ogm.profiles.AtlasSavedSearchDTO;
import org.apache.atlas.repository.ogm.profiles.AtlasUserProfileDTO;
import org.apache.atlas.repository.ogm.DTORegistry;
......@@ -170,6 +171,7 @@ public class TestModules {
availableDTOs.addBinding().to(AtlasGlossaryDTO.class);
availableDTOs.addBinding().to(AtlasGlossaryTermDTO.class);
availableDTOs.addBinding().to(AtlasGlossaryCategoryDTO.class);
availableDTOs.addBinding().to(AtlasClusterDTO.class);
bind(DTORegistry.class).asEagerSingleton();
bind(DataAccess.class).asEagerSingleton();
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.clusterinfo;
import org.apache.atlas.TestModules;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.clusterinfo.AtlasCluster;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.store.AtlasTypeDefStore;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
import javax.inject.Inject;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.loadBaseModel;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
@Guice(modules = TestModules.TestOnlyModule.class)
public class ClusterServiceTest {
private final String TOP_LEVEL_ENTITY_NAME = "db1@cl1";
private final String CLUSTER_NAME = "testCl1";
private final String TARGET_CLUSTER_NAME = "testCl2";
@Inject
private AtlasTypeDefStore typeDefStore;
@Inject
private AtlasTypeRegistry typeRegistry;
@Inject
private ClusterService clusterService;
@BeforeClass
public void setup() throws IOException, AtlasBaseException {
loadBaseModel(typeDefStore, typeRegistry);
}
@Test
public void saveAndRetrieveClusterInfo() throws AtlasBaseException {
AtlasCluster expected = getCluster(CLUSTER_NAME, TOP_LEVEL_ENTITY_NAME, "EXPORT", 0l, TARGET_CLUSTER_NAME);
AtlasCluster expected2 = getCluster(TARGET_CLUSTER_NAME, TOP_LEVEL_ENTITY_NAME, "IMPORT", 0L, TARGET_CLUSTER_NAME);
AtlasCluster expected3 = getCluster(TARGET_CLUSTER_NAME + "_3", TOP_LEVEL_ENTITY_NAME, "IMPORT", 0, TARGET_CLUSTER_NAME);
AtlasCluster actual = clusterService.save(expected);
AtlasCluster actual2 = clusterService.save(expected2);
AtlasCluster actual3 = clusterService.save(expected3);
AtlasCluster actual2x = clusterService.get(expected2);
assertNotNull(actual.getGuid());
assertNotNull(actual2.getGuid());
assertNotEquals(actual.getGuid(), actual2.getGuid());
assertNotEquals(actual2.getGuid(), actual3.getGuid());
assertEquals(actual2.getGuid(), actual2x.getGuid());
assertEquals(actual.getName(), expected.getName());
assertEquals(actual.getQualifiedName(), expected.getQualifiedName());
assertEquals(getAdditionalInfo(actual, TOP_LEVEL_ENTITY_NAME).get(AtlasCluster.OPERATION),
getAdditionalInfo(expected, TOP_LEVEL_ENTITY_NAME).get(AtlasCluster.OPERATION));
assertEquals(getAdditionalInfo(actual, TOP_LEVEL_ENTITY_NAME).get(AtlasCluster.NEXT_MODIFIED_TIMESTAMP),
getAdditionalInfo(expected, TOP_LEVEL_ENTITY_NAME).get(AtlasCluster.NEXT_MODIFIED_TIMESTAMP));
}
private AtlasCluster getCluster(String name, String topLevelEntity, String operation, long nextModifiedTimestamp, String targetClusterName) {
AtlasCluster cluster = new AtlasCluster(name, name);
Map<String, Object> syncMap = new HashMap<>();
syncMap.put("operation", operation);
syncMap.put("nextModifiedTimestamp", nextModifiedTimestamp);
syncMap.put("targetCluster", targetClusterName);
String syncMapJson = AtlasType.toJson(syncMap);
String topLevelEntitySpecificKey = getTopLevelEntitySpecificKey(topLevelEntity);
cluster.setAdditionalInfo(topLevelEntitySpecificKey, syncMapJson);
return cluster;
}
private Map<String, Object> getAdditionalInfo(AtlasCluster cluster, String topLevelEntityName) {
String topLevelEntitySpecificKey = getTopLevelEntitySpecificKey(topLevelEntityName);
assertTrue(cluster.getAdditionalInfo().containsKey(topLevelEntitySpecificKey));
String json = cluster.getAdditionalInfo(topLevelEntitySpecificKey);
return AtlasType.fromJson(json, Map.class);
}
private String getTopLevelEntitySpecificKey(String topLevelEntity) {
return String.format("%s:%s", AtlasCluster.SYNC_INFO_KEY, topLevelEntity);
}
}
......@@ -21,11 +21,16 @@ import com.google.common.collect.Sets;
import org.apache.atlas.RequestContext;
import org.apache.atlas.TestUtilsV2;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.impexp.AtlasExportRequest;
import org.apache.atlas.model.impexp.AtlasExportResult;
import org.apache.atlas.model.impexp.AtlasImportRequest;
import org.apache.atlas.model.impexp.AtlasImportResult;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.apache.atlas.repository.store.bootstrap.AtlasTypeDefStoreInitializer;
import org.apache.atlas.repository.store.graph.v2.AtlasEntityStoreV2;
import org.apache.atlas.repository.store.graph.v2.AtlasEntityStreamForImport;
import org.apache.atlas.store.AtlasTypeDefStore;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeRegistry;
......@@ -35,7 +40,10 @@ import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.SkipException;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
......@@ -141,11 +149,20 @@ public class ZipFileResourceTestUtils {
}
public static Object[][] getZipSource(String fileName) throws IOException {
return new Object[][]{{getZipSourceFrom(fileName)}};
}
public static ZipSource getZipSourceFrom(String fileName) throws IOException {
FileInputStream fs = ZipFileResourceTestUtils.getFileInputStream(fileName);
return new Object[][]{{new ZipSource(fs)}};
return new ZipSource(fs);
}
private static ZipSource getZipSourceFrom(ByteArrayOutputStream baos) throws IOException {
ByteArrayInputStream bis = new ByteArrayInputStream(baos.toByteArray());
ZipSource zipSource = new ZipSource(bis);
return zipSource;
}
public static void verifyImportedEntities(List<String> creationOrder, List<String> processedEntities) {
Set<String> lhs = com.google.common.collect.Sets.newHashSet(creationOrder);
......@@ -169,6 +186,62 @@ public class ZipFileResourceTestUtils {
}
}
public static AtlasTypesDef loadTypes(String entitiesSubDir, String fileName) {
try {
return TestResourceFileUtils.readObjectFromJson(entitiesSubDir, fileName, AtlasTypesDef.class);
} catch (IOException e) {
throw new SkipException(String.format("createTypes: '%s' could not be laoded.", fileName));
}
}
public static AtlasEntity.AtlasEntityWithExtInfo loadEntity(String entitiesSubDir, String fileName) {
try {
return TestResourceFileUtils.readObjectFromJson(entitiesSubDir, fileName, AtlasEntity.AtlasEntityWithExtInfo.class);
} catch (IOException e) {
throw new SkipException(String.format("createTypes: '%s' could not be laoded.", fileName));
}
}
public static void createTypes(AtlasTypeDefStore typeDefStore, String entitiesSubDir, String typesDef) {
try {
typeDefStore.createTypesDef(loadTypes(entitiesSubDir, typesDef));
} catch (AtlasBaseException e) {
throw new SkipException("setup: could not load typesDef.");
}
}
public static void createAtlasEntity(AtlasEntityStoreV2 entityStoreV1, AtlasEntity.AtlasEntityWithExtInfo atlasEntity) {
try {
EntityMutationResponse response = entityStoreV1.createOrUpdateForImport(new AtlasEntityStreamForImport(atlasEntity, null));
assertNotNull(response);
assertTrue((response.getCreatedEntities() != null && response.getCreatedEntities().size() > 0) ||
(response.getMutatedEntities() != null && response.getMutatedEntities().size() > 0));
} catch (AtlasBaseException e) {
throw new SkipException(String.format("createAtlasEntity: could not load '%s'.", atlasEntity.getEntity().getTypeName()));
}
}
public static ZipSource runExportWithParameters(ExportService exportService, AtlasExportRequest request) {
final String requestingIP = "1.0.0.0";
final String hostName = "localhost";
final String userName = "admin";
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ZipSink zipSink = new ZipSink(baos);
AtlasExportResult result = exportService.run(zipSink, request, userName, hostName, requestingIP);
assertEquals(result.getOperationStatus(), AtlasExportResult.OperationStatus.SUCCESS);
zipSink.close();
return getZipSourceFrom(baos);
}
catch(Exception ex) {
throw new SkipException(String.format("runExportWithParameters: %s: failed!", request.toString()));
}
}
private static Map<String,Integer> getImportMetricsForCompare(AtlasImportResult result) {
Map<String, Integer> r = new HashMap<>();
for (Map.Entry<String, Integer> entry : result.getMetrics().entrySet()) {
......@@ -257,4 +330,16 @@ public class ZipFileResourceTestUtils {
verifyImportedMetrics(exportResult, result);
verifyImportedEntities(creationOrder, result.getProcessedEntities());
}
public static void loadBaseModel(AtlasTypeDefStore typeDefStore, AtlasTypeRegistry typeRegistry) throws IOException, AtlasBaseException {
loadModelFromJson("0000-Area0/0010-base_model.json", typeDefStore, typeRegistry);
}
public static void loadFsModel(AtlasTypeDefStore typeDefStore, AtlasTypeRegistry typeRegistry) throws IOException, AtlasBaseException {
loadModelFromJson("1000-Hadoop/0020-fs_model.json", typeDefStore, typeRegistry);
}
public static void loadHiveModel(AtlasTypeDefStore typeDefStore, AtlasTypeRegistry typeRegistry) throws IOException, AtlasBaseException {
loadModelFromJson("1000-Hadoop/0030-hive_model.json", typeDefStore, typeRegistry);
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment