Commit 831ad014 by Ashutosh Mestry

ATLAS-2806: Using replication attributes during export and import process.

parent 1d8b81d8
{
"patches": [
{
"action": "ADD_ATTRIBUTE",
"typeName": "Referenceable",
"applyToVersion": "1.0",
"updateToVersion": "1.1",
"params": null,
"attributeDefs": [
{
"name": "replicatedFromCluster",
"typeName": "array<AtlasCluster>",
"cardinality": "SET",
"isIndexable": false,
"isOptional": true,
"isUnique": false
},
{
"name": "replicatedToCluster",
"typeName": "array<AtlasCluster>",
"cardinality": "SET",
"isIndexable": false,
"isOptional": true,
"isUnique": false
}
]
}
]
}
...@@ -159,6 +159,14 @@ public final class Constants { ...@@ -159,6 +159,14 @@ public final class Constants {
public static final String VERTEX_ID_IN_IMPORT_KEY = "__vIdInImport"; public static final String VERTEX_ID_IN_IMPORT_KEY = "__vIdInImport";
public static final String EDGE_ID_IN_IMPORT_KEY = "__eIdInImport"; public static final String EDGE_ID_IN_IMPORT_KEY = "__eIdInImport";
/*
 * replication attributes
 */
// Prefix for qualifying attribute names on the Referenceable type — note the
// trailing '.' (presumably concatenated with an attribute name; TODO confirm usage).
public static final String ATTR_NAME_REFERENCEABLE = "Referenceable.";
// Entity attribute recording the cluster(s) an entity was replicated TO (set on export).
public static final String ATTR_NAME_REPLICATED_TO_CLUSTER = "replicatedToCluster";
// Entity attribute recording the cluster(s) an entity was replicated FROM (set on import).
public static final String ATTR_NAME_REPLICATED_FROM_CLUSTER = "replicatedFromCluster";
private Constants() { private Constants() {
} }
......
...@@ -145,6 +145,6 @@ curl -X POST -u adminuser:password -H "Content-Type: application/json" -H "Cache ...@@ -145,6 +145,6 @@ curl -X POST -u adminuser:password -H "Content-Type: application/json" -H "Cache
{ "typeName": "DB", "uniqueAttributes": { "name": "Logging" } { "typeName": "DB", "uniqueAttributes": { "name": "Logging" }
} }
], ],
"options": "full" "options": { "fetchType": "full" }
}' "http://localhost:21000/api/atlas/admin/export" > quickStartDB.zip }' "http://localhost:21000/api/atlas/admin/export" > quickStartDB.zip
</verbatim> </verbatim>
...@@ -28,47 +28,54 @@ import org.apache.atlas.model.impexp.AtlasExportResult; ...@@ -28,47 +28,54 @@ import org.apache.atlas.model.impexp.AtlasExportResult;
import org.apache.atlas.model.impexp.AtlasImportRequest; import org.apache.atlas.model.impexp.AtlasImportRequest;
import org.apache.atlas.model.impexp.AtlasImportResult; import org.apache.atlas.model.impexp.AtlasImportResult;
import org.apache.atlas.model.impexp.ExportImportAuditEntry; import org.apache.atlas.model.impexp.ExportImportAuditEntry;
import org.apache.atlas.repository.clusterinfo.ClusterService; import org.apache.atlas.repository.Constants;
import org.apache.atlas.type.AtlasType; import org.apache.atlas.type.AtlasType;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.inject.Inject; import javax.inject.Inject;
import java.util.List;
import java.util.Map; import java.util.Map;
@Component @Component
public class AuditHelper { public class AuditsWriter {
private static final Logger LOG = LoggerFactory.getLogger(AuditHelper.class); private static final Logger LOG = LoggerFactory.getLogger(AuditsWriter.class);
private static final String CLUSTER_NAME_DEFAULT = "default"; private static final String CLUSTER_NAME_DEFAULT = "default";
private ClusterService clusterService; private ClusterService clusterService;
private ExportImportAuditService auditService; private ExportImportAuditService auditService;
private ExportAudits auditForExport = new ExportAudits();
private ImportAudits auditForImport = new ImportAudits();
@Inject @Inject
public AuditHelper(ClusterService clusterService, ExportImportAuditService auditService) { public AuditsWriter(ClusterService clusterService, ExportImportAuditService auditService) {
this.clusterService = clusterService; this.clusterService = clusterService;
this.auditService = auditService; this.auditService = auditService;
} }
// Records an export audit entry and applies replication-attribute updates for
// the exported entities, delegating to the export-specific collector.
public void write(String userName, AtlasExportResult result, long startTime, long endTime, List<String> entityCreationOrder) throws AtlasBaseException {
auditForExport.add(userName, result, startTime, endTime, entityCreationOrder);
}
public void audit(String userName, AtlasExportResult result, long startTime, long endTime, boolean hadData) throws AtlasBaseException { public void write(String userName, AtlasImportResult result, long startTime, long endTime, List<String> entityCreationOrder) throws AtlasBaseException {
AtlasExportRequest request = result.getRequest(); auditForImport.add(userName, result, startTime, endTime, entityCreationOrder);
AtlasCluster cluster = saveCluster(getCurrentClusterName());
String targetClusterName = getClusterNameFromOptions(request.getOptions(), AtlasExportRequest.OPTION_KEY_REPLICATED_TO);
addAuditEntry(userName,
cluster.getName(), targetClusterName,
ExportImportAuditEntry.OPERATION_EXPORT,
AtlasType.toJson(result), startTime, endTime, hadData);
} }
public void audit(String userName, AtlasImportResult result, long startTime, long endTime, boolean hadData) throws AtlasBaseException { private boolean isReplicationOptionSet(Map<String, ? extends Object> options, String replicatedKey) {
AtlasImportRequest request = result.getRequest(); return options.containsKey(replicatedKey);
AtlasCluster cluster = saveCluster(getCurrentClusterName()); }
String sourceCluster = getClusterNameFromOptions(request.getOptions(), AtlasImportRequest.OPTION_KEY_REPLICATED_FROM);
addAuditEntry(userName, private void updateReplicationAttribute(boolean isReplicationSet, String clusterName,
sourceCluster, cluster.getName(), List<String> exportedGuids,
ExportImportAuditEntry.OPERATION_EXPORT, AtlasType.toJson(result), startTime, endTime, hadData); String attrNameReplicated) throws AtlasBaseException {
if (!isReplicationSet) {
return;
}
AtlasCluster cluster = saveCluster(clusterName);
clusterService.updateEntityWithCluster(cluster, exportedGuids, attrNameReplicated);
} }
private String getClusterNameFromOptions(Map options, String key) { private String getClusterNameFromOptions(Map options, String key) {
...@@ -110,4 +117,69 @@ public class AuditHelper { ...@@ -110,4 +117,69 @@ public class AuditHelper {
return ""; return "";
} }
/**
 * Collects and persists audit details for an export operation. When the export
 * request carries the replicatedTo option, also stamps the exported entities'
 * replicatedToCluster attribute with the target cluster.
 */
private class ExportAudits {
    private AtlasExportRequest request;
    private AtlasCluster cluster;
    private String targetClusterName;
    private String optionKeyReplicatedTo;
    private boolean replicationOptionState;

    /**
     * Writes one audit entry for the export and triggers the replication
     * attribute update.
     *
     * @param entityGuids GUIDs of the exported entities (empty list means the
     *                    export produced no data)
     */
    public void add(String userName, AtlasExportResult result, long startTime, long endTime, List<String> entityGuids) throws AtlasBaseException {
        optionKeyReplicatedTo = AtlasExportRequest.OPTION_KEY_REPLICATED_TO;
        request = result.getRequest();
        cluster = saveCluster(getCurrentClusterName());
        replicationOptionState = isReplicationOptionSet(request.getOptions(), optionKeyReplicatedTo);
        targetClusterName = getClusterNameFromOptions(request.getOptions(), optionKeyReplicatedTo);

        addAuditEntry(userName,
                cluster.getName(), targetClusterName,
                ExportImportAuditEntry.OPERATION_EXPORT,
                AtlasType.toJson(result), startTime, endTime, !entityGuids.isEmpty());

        updateReplicationAttributeForExport(entityGuids);
    }

    // Stamps exported entities with the target cluster; no-op unless the
    // replicatedTo option was present on the request. (The unused request
    // parameter of the original was dropped; the redundant double-check of
    // replicationOptionState is kept only inside updateReplicationAttribute.)
    private void updateReplicationAttributeForExport(List<String> entityGuids) throws AtlasBaseException {
        updateReplicationAttribute(replicationOptionState, targetClusterName, entityGuids, Constants.ATTR_NAME_REPLICATED_TO_CLUSTER);
    }
}
/**
 * Collects and persists audit details for an import operation. When the import
 * request carries the replicatedFrom option, also stamps the imported entities'
 * replicatedFromCluster attribute with the source cluster.
 */
private class ImportAudits {
    private AtlasImportRequest request;
    private boolean replicationOptionState;
    private AtlasCluster cluster;
    private String optionKeyReplicatedFrom;

    /**
     * Writes one audit entry for the import and triggers the replication
     * attribute update.
     *
     * @param entityGuids GUIDs of the imported entities (empty list means the
     *                    import produced no data)
     */
    public void add(String userName, AtlasImportResult result, long startTime, long endTime, List<String> entityGuids) throws AtlasBaseException {
        request = result.getRequest();
        optionKeyReplicatedFrom = AtlasImportRequest.OPTION_KEY_REPLICATED_FROM;
        replicationOptionState = isReplicationOptionSet(request.getOptions(), optionKeyReplicatedFrom);
        cluster = saveCluster(getClusterNameFromOptionsState());

        String sourceCluster = getClusterNameFromOptions(request.getOptions(), optionKeyReplicatedFrom);

        // BUG FIX: this is an import, so the audit entry must be recorded as
        // OPERATION_IMPORT; the original code logged OPERATION_EXPORT here.
        addAuditEntry(userName,
                sourceCluster, cluster.getName(),
                ExportImportAuditEntry.OPERATION_IMPORT, AtlasType.toJson(result), startTime, endTime, !entityGuids.isEmpty());

        updateReplicationAttributeForImport(entityGuids);
    }

    // Stamps imported entities with the cluster they were replicated from;
    // no-op unless the replicatedFrom option was present on the request.
    private void updateReplicationAttributeForImport(List<String> entityGuids) throws AtlasBaseException {
        if (!replicationOptionState) {
            return;
        }

        updateReplicationAttribute(replicationOptionState, cluster.getName(), entityGuids, Constants.ATTR_NAME_REPLICATED_FROM_CLUSTER);
    }

    // When replication is requested the cluster name comes from the request
    // options; otherwise fall back to the current (local) cluster name.
    private String getClusterNameFromOptionsState() {
        return replicationOptionState
                ? getClusterNameFromOptions(request.getOptions(), optionKeyReplicatedFrom)
                : getCurrentClusterName();
    }
}
} }
...@@ -16,27 +16,36 @@ ...@@ -16,27 +16,36 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.atlas.repository.clusterinfo; package org.apache.atlas.repository.impexp;
import org.apache.atlas.annotation.AtlasService; import org.apache.atlas.annotation.AtlasService;
import org.apache.atlas.annotation.GraphTransaction; import org.apache.atlas.annotation.GraphTransaction;
import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.clusterinfo.AtlasCluster; import org.apache.atlas.model.clusterinfo.AtlasCluster;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.repository.ogm.DataAccess; import org.apache.atlas.repository.ogm.DataAccess;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.repository.store.graph.v2.AtlasEntityStream;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import javax.inject.Inject; import javax.inject.Inject;
import java.util.ArrayList;
import java.util.List;
@AtlasService @AtlasService
public class ClusterService { public class ClusterService {
private static final Logger LOG = LoggerFactory.getLogger(ClusterService.class); private static final Logger LOG = LoggerFactory.getLogger(ClusterService.class);
private final DataAccess dataAccess; private final DataAccess dataAccess;
private final AtlasEntityStore entityStore;
@Inject @Inject
public ClusterService(DataAccess dataAccess) { public ClusterService(DataAccess dataAccess, AtlasEntityStore entityStore) {
this.dataAccess = dataAccess; this.dataAccess = dataAccess;
this.entityStore = entityStore;
} }
public AtlasCluster get(AtlasCluster cluster) { public AtlasCluster get(AtlasCluster cluster) {
...@@ -53,8 +62,59 @@ public class ClusterService { ...@@ -53,8 +62,59 @@ public class ClusterService {
public AtlasCluster save(AtlasCluster clusterInfo) { public AtlasCluster save(AtlasCluster clusterInfo) {
try { try {
return dataAccess.save(clusterInfo); return dataAccess.save(clusterInfo);
} catch (AtlasBaseException ex) { } catch (AtlasBaseException e) {
return clusterInfo; return clusterInfo;
} }
} }
/**
 * Appends an object-id reference to the given cluster onto the named
 * (list-valued) attribute of each entity identified by the GUIDs, along with
 * each entity's referred entities, then persists the updates.
 *
 * @param cluster       cluster whose reference is added; must have a GUID
 * @param guids         GUIDs of the entities to update
 * @param attributeName name of the replication attribute to append to
 */
@GraphTransaction
public void updateEntityWithCluster(AtlasCluster cluster, List<String> guids, String attributeName) throws AtlasBaseException {
    // BUG FIX: the original guard only rejected a non-null cluster with an
    // empty GUID; a null cluster fell through and NPE'd in getObjectId below.
    if (cluster == null || StringUtils.isEmpty(cluster.getGuid())) {
        return;
    }

    AtlasObjectId objectId = getObjectId(cluster);
    for (String guid : guids) {
        AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = entityStore.getById(guid);
        updateAttribute(entityWithExtInfo, attributeName, objectId);
        entityStore.createOrUpdate(new AtlasEntityStream(entityWithExtInfo), true);
    }
}
// Builds an object-id reference to the cluster entity; the type name is the
// simple class name of AtlasCluster.
private AtlasObjectId getObjectId(AtlasCluster cluster) {
    String typeName = AtlasCluster.class.getSimpleName();

    return new AtlasObjectId(cluster.getGuid(), typeName);
}
/**
 * Applies the given value to the named attribute on the primary entity and on
 * every referred entity carried alongside it.
 *
 * @param entityWithExtInfo entity (plus referred entities) to update
 * @param propertyName      attribute name to update
 * @param value             value to append to the attribute
 */
private void updateAttribute(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo, String propertyName, Object value) {
    updateAttribute(entityWithExtInfo.getEntity(), propertyName, value);

    for (AtlasEntity referredEntity : entityWithExtInfo.getReferredEntities().values()) {
        updateAttribute(referredEntity, propertyName, value);
    }
}
/**
 * Appends the value to the named multi-valued attribute of the entity.
 * No-op when the entity's type does not define the attribute, or when the
 * attribute currently holds a non-list value. Duplicate values are not added.
 *
 * @param entity       entity to update
 * @param propertyName attribute name to update
 * @param value        value to append
 */
private void updateAttribute(AtlasEntity entity, String propertyName, Object value) {
    if (!entity.hasAttribute(propertyName)) {
        return;
    }

    Object existingValue = entity.getAttribute(propertyName);
    if (existingValue != null && !(existingValue instanceof List)) {
        return; // attribute holds a scalar; nothing safe to append to
    }

    @SuppressWarnings("unchecked") // guarded by the instanceof check above
    List<Object> list = (existingValue == null) ? new ArrayList<>() : (List<Object>) existingValue;

    if (!list.contains(value)) {
        list.add(value);
    }

    entity.setAttribute(propertyName, list);
}
} }
...@@ -69,18 +69,18 @@ public class ExportService { ...@@ -69,18 +69,18 @@ public class ExportService {
private static final Logger LOG = LoggerFactory.getLogger(ExportService.class); private static final Logger LOG = LoggerFactory.getLogger(ExportService.class);
private final AtlasTypeRegistry typeRegistry; private final AtlasTypeRegistry typeRegistry;
private AuditHelper auditHelper; private AuditsWriter auditsWriter;
private final AtlasGraph atlasGraph; private final AtlasGraph atlasGraph;
private final EntityGraphRetriever entityGraphRetriever; private final EntityGraphRetriever entityGraphRetriever;
private final AtlasGremlinQueryProvider gremlinQueryProvider; private final AtlasGremlinQueryProvider gremlinQueryProvider;
@Inject @Inject
public ExportService(final AtlasTypeRegistry typeRegistry, AtlasGraph atlasGraph, AuditHelper auditHelper) { public ExportService(final AtlasTypeRegistry typeRegistry, AtlasGraph atlasGraph, AuditsWriter auditsWriter) {
this.typeRegistry = typeRegistry; this.typeRegistry = typeRegistry;
this.entityGraphRetriever = new EntityGraphRetriever(this.typeRegistry); this.entityGraphRetriever = new EntityGraphRetriever(this.typeRegistry);
this.atlasGraph = atlasGraph; this.atlasGraph = atlasGraph;
this.gremlinQueryProvider = AtlasGremlinQueryProvider.INSTANCE; this.gremlinQueryProvider = AtlasGremlinQueryProvider.INSTANCE;
this.auditHelper = auditHelper; this.auditsWriter = auditsWriter;
} }
public AtlasExportResult run(ZipSink exportSink, AtlasExportRequest request, String userName, String hostName, public AtlasExportResult run(ZipSink exportSink, AtlasExportRequest request, String userName, String hostName,
...@@ -113,12 +113,11 @@ public class ExportService { ...@@ -113,12 +113,11 @@ public class ExportService {
AtlasExportResult.OperationStatus[] statuses, AtlasExportResult.OperationStatus[] statuses,
long startTime, long endTime) throws AtlasBaseException { long startTime, long endTime) throws AtlasBaseException {
int duration = getOperationDuration(startTime, endTime); int duration = getOperationDuration(startTime, endTime);
context.result.setSourceClusterName(AuditHelper.getCurrentClusterName()); context.result.setSourceClusterName(AuditsWriter.getCurrentClusterName());
context.result.getData().getEntityCreationOrder().addAll(context.lineageProcessed); context.result.getData().getEntityCreationOrder().addAll(context.lineageProcessed);
context.sink.setExportOrder(context.result.getData().getEntityCreationOrder()); context.sink.setExportOrder(context.result.getData().getEntityCreationOrder());
context.sink.setTypesDef(context.result.getData().getTypesDef()); context.sink.setTypesDef(context.result.getData().getTypesDef());
auditHelper.audit(userName, context.result, startTime, endTime, auditsWriter.write(userName, context.result, startTime, endTime, context.result.getData().getEntityCreationOrder());
!context.result.getData().getEntityCreationOrder().isEmpty());
clearContextData(context); clearContextData(context);
context.result.setOperationStatus(getOverallOperationStatus(statuses)); context.result.setOperationStatus(getOverallOperationStatus(statuses));
context.result.incrementMeticsCounter("duration", duration); context.result.incrementMeticsCounter("duration", duration);
......
...@@ -47,17 +47,17 @@ public class ImportService { ...@@ -47,17 +47,17 @@ public class ImportService {
private final AtlasTypeDefStore typeDefStore; private final AtlasTypeDefStore typeDefStore;
private final AtlasTypeRegistry typeRegistry; private final AtlasTypeRegistry typeRegistry;
private final BulkImporter bulkImporter; private final BulkImporter bulkImporter;
private AuditHelper auditHelper; private AuditsWriter auditsWriter;
private long startTimestamp; private long startTimestamp;
private long endTimestamp; private long endTimestamp;
@Inject @Inject
public ImportService(AtlasTypeDefStore typeDefStore, AtlasTypeRegistry typeRegistry, BulkImporter bulkImporter, AuditHelper auditHelper) { public ImportService(AtlasTypeDefStore typeDefStore, AtlasTypeRegistry typeRegistry, BulkImporter bulkImporter, AuditsWriter auditsWriter) {
this.typeDefStore = typeDefStore; this.typeDefStore = typeDefStore;
this.typeRegistry = typeRegistry; this.typeRegistry = typeRegistry;
this.bulkImporter = bulkImporter; this.bulkImporter = bulkImporter;
this.auditHelper = auditHelper; this.auditsWriter = auditsWriter;
} }
public AtlasImportResult run(ZipSource source, String userName, public AtlasImportResult run(ZipSource source, String userName,
...@@ -189,7 +189,7 @@ public class ImportService { ...@@ -189,7 +189,7 @@ public class ImportService {
endTimestamp = System.currentTimeMillis(); endTimestamp = System.currentTimeMillis();
result.incrementMeticsCounter("duration", getDuration(this.endTimestamp, this.startTimestamp)); result.incrementMeticsCounter("duration", getDuration(this.endTimestamp, this.startTimestamp));
auditHelper.audit(userName, result, startTimestamp, endTimestamp, !importSource.getCreationOrder().isEmpty()); auditsWriter.write(userName, result, startTimestamp, endTimestamp, importSource.getCreationOrder());
} }
private int getDuration(long endTime, long startTime) { private int getDuration(long endTime, long startTime) {
......
...@@ -22,6 +22,7 @@ import org.apache.atlas.TestModules; ...@@ -22,6 +22,7 @@ import org.apache.atlas.TestModules;
import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.clusterinfo.AtlasCluster; import org.apache.atlas.model.clusterinfo.AtlasCluster;
import org.apache.atlas.repository.store.graph.AtlasEntityStore; import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.repository.impexp.ClusterService;
import org.apache.atlas.store.AtlasTypeDefStore; import org.apache.atlas.store.AtlasTypeDefStore;
import org.apache.atlas.type.AtlasType; import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.type.AtlasTypeRegistry;
......
...@@ -96,7 +96,8 @@ public class ExportImportAuditServiceTest { ...@@ -96,7 +96,8 @@ public class ExportImportAuditServiceTest {
} }
private ExportImportAuditEntry retrieveEntry(ExportImportAuditEntry entry) throws AtlasBaseException { private ExportImportAuditEntry retrieveEntry(ExportImportAuditEntry entry) throws AtlasBaseException, InterruptedException {
Thread.sleep(5000);
AtlasSearchResult result = auditService.get(entry.getUserName(), entry.getOperation(), entry.getSourceClusterName(), AtlasSearchResult result = auditService.get(entry.getUserName(), entry.getOperation(), entry.getSourceClusterName(),
entry.getTargetClusterName(), Long.toString(entry.getStartTime()), "", 10, 0); entry.getTargetClusterName(), Long.toString(entry.getStartTime()), "", 10, 0);
assertNotNull(result); assertNotNull(result);
......
...@@ -24,13 +24,18 @@ import org.apache.atlas.AtlasConstants; ...@@ -24,13 +24,18 @@ import org.apache.atlas.AtlasConstants;
import org.apache.atlas.AtlasException; import org.apache.atlas.AtlasException;
import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.discovery.AtlasSearchResult; import org.apache.atlas.model.discovery.AtlasSearchResult;
import org.apache.atlas.repository.store.graph.v1.DeleteHandlerV1;
import org.apache.atlas.repository.store.graph.v1.SoftDeleteHandlerV1;
import static org.mockito.Mockito.mock;
import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue; import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail; import static org.testng.Assert.fail;
public class ExportImportTestBase { public class ExportImportTestBase {
protected DeleteHandlerV1 deleteHandler = mock(SoftDeleteHandlerV1.class);
protected void assertAuditEntry(ExportImportAuditService auditService) { protected void assertAuditEntry(ExportImportAuditService auditService) {
AtlasSearchResult result = null; AtlasSearchResult result = null;
try { try {
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.impexp;
import com.google.common.collect.ImmutableList;
import org.apache.atlas.RequestContext;
import org.apache.atlas.TestModules;
import org.apache.atlas.TestUtilsV2;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.clusterinfo.AtlasCluster;
import org.apache.atlas.model.impexp.AtlasExportRequest;
import org.apache.atlas.model.impexp.AtlasImportRequest;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.store.graph.v2.AtlasEntityChangeNotifier;
import org.apache.atlas.repository.store.graph.v2.AtlasEntityStoreV2;
import org.apache.atlas.repository.store.graph.v2.EntityGraphMapper;
import org.apache.atlas.store.AtlasTypeDefStore;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.utils.TestResourceFileUtils;
import org.testng.SkipException;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
import javax.inject.Inject;
import java.io.IOException;
import java.util.List;
import static org.apache.atlas.model.impexp.AtlasExportRequest.OPTION_KEY_REPLICATED_TO;
import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.createAtlasEntity;
import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.loadBaseModel;
import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.loadEntity;
import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.loadHiveModel;
import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runExportWithParameters;
import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runImportWithParameters;
import static org.mockito.Mockito.mock;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
/**
 * Verifies that export/import with the replicatedTo/replicatedFrom options
 * stamps the corresponding replication attributes onto the affected entities
 * and records the cluster info.
 */
@Guice(modules = TestModules.TestOnlyModule.class)
public class ReplicationEntityAttributeTest extends ExportImportTestBase {
    private final String ENTITIES_SUB_DIR    = "stocksDB-Entities";
    private final String EXPORT_REQUEST_FILE = "export-replicatedTo";
    private final String IMPORT_REQUEST_FILE = "import-replicatedFrom";

    // GUIDs of the fixture DB and table entities loaded in createEntities().
    private final String DB_GUID    = "1637a33e-6512-447b-ade7-249c8cb5344b";
    private final String TABLE_GUID = "df122fc3-5555-40f8-a30f-3090b8a622f8";

    // Populated from the request JSON files when the requests are read.
    private String REPLICATED_TO_CLUSTER_NAME   = "";
    private String REPLICATED_FROM_CLUSTER_NAME = "";

    @Inject
    AtlasTypeRegistry typeRegistry;

    @Inject
    private AtlasTypeDefStore typeDefStore;

    @Inject
    private EntityGraphMapper graphMapper;

    @Inject
    ExportService exportService;

    @Inject
    ImportService importService;

    @Inject
    ClusterService clusterService;

    private AtlasEntityChangeNotifier mockChangeNotifier = mock(AtlasEntityChangeNotifier.class);
    private AtlasEntityStoreV2 entityStore;
    private ZipSource zipSource;

    @BeforeClass
    public void setup() throws IOException, AtlasBaseException {
        loadBaseModel(typeDefStore, typeRegistry);
        loadHiveModel(typeDefStore, typeRegistry);
        createEntities();
    }

    // Loads the DB and table fixture entities and verifies both are retrievable.
    private void createEntities() {
        entityStore = new AtlasEntityStoreV2(deleteHandler, typeRegistry, mockChangeNotifier, graphMapper);
        createAtlasEntity(entityStore, loadEntity(ENTITIES_SUB_DIR, "db"));
        createAtlasEntity(entityStore, loadEntity(ENTITIES_SUB_DIR, "table-columns"));

        try {
            AtlasEntity.AtlasEntitiesWithExtInfo entities = entityStore.getByIds(ImmutableList.of(DB_GUID, TABLE_GUID));
            assertEquals(entities.getEntities().size(), 2);
        } catch (AtlasBaseException e) {
            throw new SkipException(String.format("getByIds: could not load '%s' & '%s'.", DB_GUID, TABLE_GUID));
        }
    }

    @BeforeMethod
    public void setupTest() {
        RequestContext.clear();
        RequestContext.get().setUser(TestUtilsV2.TEST_USER, null);
    }

    @Test
    public void exportWithReplicationToOption_AddsClusterObjectIdToReplicatedFromAttribute() throws AtlasBaseException {
        final int expectedEntityCount = 2;

        AtlasExportRequest request = getUpdateMetaInfoUpdateRequest();
        zipSource = runExportWithParameters(exportService, request);

        assertNotNull(zipSource);
        assertNotNull(zipSource.getCreationOrder());
        assertEquals(zipSource.getCreationOrder().size(), expectedEntityCount);
        assertClusterInfo(REPLICATED_TO_CLUSTER_NAME);
        assertReplicationAttribute(Constants.ATTR_NAME_REPLICATED_TO_CLUSTER);
    }

    @Test(dependsOnMethods = "exportWithReplicationToOption_AddsClusterObjectIdToReplicatedFromAttribute", enabled = false)
    public void importWithReplicationFromOption_AddsClusterObjectIdToReplicatedFromAttribute() throws AtlasBaseException, IOException {
        AtlasImportRequest request = getImportRequestWithReplicationOption();
        runImportWithParameters(importService, request, zipSource);

        assertClusterInfo(REPLICATED_FROM_CLUSTER_NAME);
        assertReplicationAttribute(Constants.ATTR_NAME_REPLICATED_FROM_CLUSTER);
    }

    // Asserts both fixture entities carry exactly one entry in the given
    // replication attribute.
    private void assertReplicationAttribute(String attrNameReplication) throws AtlasBaseException {
        AtlasEntity.AtlasEntitiesWithExtInfo entities = entityStore.getByIds(ImmutableList.of(DB_GUID, TABLE_GUID));
        for (AtlasEntity e : entities.getEntities()) {
            Object ex = e.getAttribute(attrNameReplication);
            assertNotNull(ex);

            List<String> clusterNameSyncType = (List) ex;
            assertEquals(clusterNameSyncType.size(), 1);
        }
    }

    private void assertClusterInfo(String name) {
        AtlasCluster actual = clusterService.get(new AtlasCluster(name, name));

        assertNotNull(actual);
        assertEquals(actual.getName(), name);
    }

    // Reads the export request from file and forces the replicatedTo option.
    private AtlasExportRequest getUpdateMetaInfoUpdateRequest() {
        AtlasExportRequest request = getExportRequestWithReplicationOption();
        request.getOptions().put(AtlasExportRequest.OPTION_KEY_REPLICATED_TO, REPLICATED_TO_CLUSTER_NAME);

        return request;
    }

    // Collects all entities from the zip source into one ext-info holder and
    // verifies the expected count. (Currently unused; kept as a test helper.)
    private AtlasEntity.AtlasEntityWithExtInfo getEntities(ZipSource source, int expectedCount) {
        AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = new AtlasEntity.AtlasEntityWithExtInfo();
        try {
            int count = 0;
            for (String s : source.getCreationOrder()) {
                AtlasEntity entity = source.getByGuid(s);
                entityWithExtInfo.addReferredEntity(s, entity);
                count++;
            }

            assertEquals(count, expectedCount);
            return entityWithExtInfo;
        } catch (AtlasBaseException e) {
            throw new SkipException("getEntities: failed!");
        }
    }

    private AtlasExportRequest getExportRequestWithReplicationOption() {
        try {
            AtlasExportRequest request = TestResourceFileUtils.readObjectFromJson(ENTITIES_SUB_DIR, EXPORT_REQUEST_FILE, AtlasExportRequest.class);
            REPLICATED_TO_CLUSTER_NAME = (String) request.getOptions().get(OPTION_KEY_REPLICATED_TO);
            return request;
        } catch (IOException e) {
            // BUG FIX: corrected "laoded" typo in the failure message.
            throw new SkipException(String.format("getExportRequestWithReplicationOption: '%s' could not be loaded.", EXPORT_REQUEST_FILE));
        }
    }

    private AtlasImportRequest getImportRequestWithReplicationOption() {
        try {
            AtlasImportRequest request = TestResourceFileUtils.readObjectFromJson(ENTITIES_SUB_DIR, IMPORT_REQUEST_FILE, AtlasImportRequest.class);
            REPLICATED_FROM_CLUSTER_NAME = request.getOptions().get(AtlasImportRequest.OPTION_KEY_REPLICATED_FROM);
            return request;
        } catch (IOException e) {
            // BUG FIX: message named the wrong method and misspelled "loaded".
            throw new SkipException(String.format("getImportRequestWithReplicationOption: '%s' could not be loaded.", IMPORT_REQUEST_FILE));
        }
    }
}
...@@ -27,6 +27,8 @@ import org.apache.atlas.model.impexp.AtlasImportRequest; ...@@ -27,6 +27,8 @@ import org.apache.atlas.model.impexp.AtlasImportRequest;
import org.apache.atlas.model.impexp.AtlasImportResult; import org.apache.atlas.model.impexp.AtlasImportResult;
import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.EntityMutationResponse; import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.model.typedef.AtlasEntityDef;
import org.apache.atlas.model.typedef.AtlasStructDef;
import org.apache.atlas.model.typedef.AtlasTypesDef; import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.apache.atlas.repository.store.bootstrap.AtlasTypeDefStoreInitializer; import org.apache.atlas.repository.store.bootstrap.AtlasTypeDefStoreInitializer;
import org.apache.atlas.repository.store.graph.v2.AtlasEntityStoreV2; import org.apache.atlas.repository.store.graph.v2.AtlasEntityStoreV2;
...@@ -253,6 +255,7 @@ public class ZipFileResourceTestUtils { ...@@ -253,6 +255,7 @@ public class ZipFileResourceTestUtils {
public static void loadModelFromJson(String fileName, AtlasTypeDefStore typeDefStore, AtlasTypeRegistry typeRegistry) throws IOException, AtlasBaseException { public static void loadModelFromJson(String fileName, AtlasTypeDefStore typeDefStore, AtlasTypeRegistry typeRegistry) throws IOException, AtlasBaseException {
AtlasTypesDef typesFromJson = getAtlasTypesDefFromFile(fileName); AtlasTypesDef typesFromJson = getAtlasTypesDefFromFile(fileName);
addReplicationAttributes(typesFromJson);
createTypesAsNeeded(typesFromJson, typeDefStore, typeRegistry); createTypesAsNeeded(typesFromJson, typeDefStore, typeRegistry);
} }
...@@ -264,6 +267,17 @@ public class ZipFileResourceTestUtils { ...@@ -264,6 +267,17 @@ public class ZipFileResourceTestUtils {
} }
} }
/**
 * Injects the replicatedFrom/replicatedTo attribute definitions into the
 * Referenceable entity def, mirroring the runtime typedef patch. No-op when
 * there are no entity defs or the first one is not Referenceable.
 *
 * @param typesFromJson type definitions loaded from a model JSON file
 * @throws IOException if the replication-attribute resource cannot be read
 */
private static void addReplicationAttributes(AtlasTypesDef typesFromJson) throws IOException {
    // BUG FIX: guard against a missing/empty entity-def list before get(0).
    if (typesFromJson.getEntityDefs() == null || typesFromJson.getEntityDefs().isEmpty()) {
        return;
    }

    AtlasEntityDef entityDef = typesFromJson.getEntityDefs().get(0);
    if (!entityDef.getName().equals("Referenceable")) {
        return;
    }

    // The resource holds the 'replicatedFromCluster' definition; the 'To'
    // variant is derived by token substitution.
    String replAttrFromJson = TestResourceFileUtils.getJson("stocksDB-Entities", "replicationAttrs");
    String replAttrToJson   = StringUtils.replace(replAttrFromJson, "From", "To");

    entityDef.addAttribute(AtlasType.fromJson(replAttrFromJson, AtlasStructDef.AtlasAttributeDef.class));
    entityDef.addAttribute(AtlasType.fromJson(replAttrToJson, AtlasStructDef.AtlasAttributeDef.class));
}
public static void loadModelFromResourcesJson(String fileName, AtlasTypeDefStore typeDefStore, AtlasTypeRegistry typeRegistry) throws IOException, AtlasBaseException { public static void loadModelFromResourcesJson(String fileName, AtlasTypeDefStore typeDefStore, AtlasTypeRegistry typeRegistry) throws IOException, AtlasBaseException {
AtlasTypesDef typesFromJson = getAtlasTypesDefFromResourceFile(fileName); AtlasTypesDef typesFromJson = getAtlasTypesDefFromResourceFile(fileName);
createTypesAsNeeded(typesFromJson, typeDefStore, typeRegistry); createTypesAsNeeded(typesFromJson, typeDefStore, typeRegistry);
...@@ -332,14 +346,14 @@ public class ZipFileResourceTestUtils { ...@@ -332,14 +346,14 @@ public class ZipFileResourceTestUtils {
} }
public static void loadBaseModel(AtlasTypeDefStore typeDefStore, AtlasTypeRegistry typeRegistry) throws IOException, AtlasBaseException { public static void loadBaseModel(AtlasTypeDefStore typeDefStore, AtlasTypeRegistry typeRegistry) throws IOException, AtlasBaseException {
loadModelFromJson("0000-Area0/0010-base_model.json", typeDefStore, typeRegistry); loadModelFromJson("0010-base_model.json", typeDefStore, typeRegistry);
} }
public static void loadFsModel(AtlasTypeDefStore typeDefStore, AtlasTypeRegistry typeRegistry) throws IOException, AtlasBaseException { public static void loadFsModel(AtlasTypeDefStore typeDefStore, AtlasTypeRegistry typeRegistry) throws IOException, AtlasBaseException {
loadModelFromJson("1000-Hadoop/0020-fs_model.json", typeDefStore, typeRegistry); loadModelFromJson("1020-fs_model.json", typeDefStore, typeRegistry);
} }
public static void loadHiveModel(AtlasTypeDefStore typeDefStore, AtlasTypeRegistry typeRegistry) throws IOException, AtlasBaseException { public static void loadHiveModel(AtlasTypeDefStore typeDefStore, AtlasTypeRegistry typeRegistry) throws IOException, AtlasBaseException {
loadModelFromJson("1000-Hadoop/0030-hive_model.json", typeDefStore, typeRegistry); loadModelFromJson("1030-hive_model.json", typeDefStore, typeRegistry);
} }
} }
{
"entity": {
"attributes": {
"clusterName": "cl1",
"description": null,
"location": "hdfs://localhost.localdomain:8020/apps/hive/warehouse/stocks_base.db",
"name": "stocks_base",
"owner": "anonymous",
"ownerType": "USER",
"parameters": null,
"qualifiedName": "stocks_base@cl1"
},
"classifications": [],
"createTime": 1528238690000,
"createdBy": "anonymous",
"guid": "1637a33e-6512-447b-ade7-249c8cb5344b",
"status": "ACTIVE",
"typeName": "hive_db",
"updateTime": 1528238690000,
"updatedBy": "anonymous",
"version": 0
},
"referredEntities": {}
}
\ No newline at end of file
{
"itemsToExport": [
{
"typeName": "hive_db", "uniqueAttributes": { "qualifiedName": "stocks_base@cl1" }
}
],
"options": {
"fetchType": "full",
"replicatedTo": "clTarget"
}
}
{
"itemsToExport": [
{
"typeName": "hive_db", "uniqueAttributes": { "qualifiedName": "stocks_base@cl1" }
}
],
"options": {
"fetchType": "incremental",
"skipLineage": true
}
}
{
"name": "replicatedFromCluster",
"typeName": "array<AtlasCluster>",
"cardinality": "SET",
"isIndexable": false,
"isOptional": true,
"isUnique": false
}
{
"entity": {
"attributes": {
"clusterName": "cl1",
"description": null,
"endTime": 1529605199906,
"inputs": [
{
"guid": "df122fc3-5555-40f8-a30f-3090b8a622f8",
"typeName": "hive_table"
}
],
"name": "create view stocks_view as select * from stocks_daily",
"operationType": "CREATEVIEW",
"outputs": [
{
"guid": "c54400d8-79b7-45ba-9c01-b662b36ceb0e",
"typeName": "hive_table"
}
],
"owner": null,
"qualifiedName": "stocks.stocks_view@cl1:1529605199000",
"queryGraph": null,
"queryId": "hive_20180621111959_72f4bec1-585f-4291-bc11-7a184a62a19b",
"queryPlan": "Not Supported",
"queryText": "create view stocks_view as select * from stocks_daily",
"recentQueries": [
"create view stocks_view as select * from stocks_daily"
],
"startTime": 1529605199002,
"userName": "anonymous"
},
"classifications": [],
"createTime": 1529605212691,
"createdBy": "anonymous",
"guid": "6f3b305a-c459-4ae4-b651-aee0deb0685f",
"status": "ACTIVE",
"typeName": "hive_process",
"updateTime": 1529605221342,
"updatedBy": "anonymous",
"version": 0
},
"referredEntities": {}
}
{
"entity": {
"attributes": {
"aliases": null,
"columns": [
{
"guid": "2d3f0239-03cc-41b2-b0cb-c924ff7d096d",
"typeName": "hive_column"
}
],
"comment": null,
"createTime": 1529605199000,
"db": {
"guid": "1637a33e-6512-447b-ade7-249c8cb5344b",
"typeName": "hive_db"
},
"description": null,
"lastAccessTime": 1529605199000,
"name": "stocks_view",
"owner": "anonymous",
"parameters": {
"transient_lastDdlTime": "1529605199"
},
"partitionKeys": null,
"qualifiedName": "stocks.stocks_view@cl1",
"retention": 0,
"sd": {
"guid": "56415119-7cb0-40dd-ace8-1e50efd54991",
"typeName": "hive_storagedesc"
},
"tableType": "VIRTUAL_VIEW",
"temporary": false,
"viewExpandedText": "select `stocks_daily`.`dt`, `stocks_daily`.`open`, `stocks_daily`.`high`, `stocks_daily`.`low`, `stocks_daily`.`close`, `stocks_daily`.`volume`, `stocks_daily`.`adj_close` from `stocks`.`stocks_daily`",
"viewOriginalText": "select * from stocks_daily"
},
"classifications": [],
"createTime": 1529605205448,
"createdBy": "anonymous",
"guid": "c54400d8-79b7-45ba-9c01-b662b36ceb0e",
"status": "ACTIVE",
"typeName": "hive_table",
"updateTime": 1529605221342,
"updatedBy": "anonymous",
"version": 0
},
"referredEntities": {
"2d3f0239-03cc-41b2-b0cb-c924ff7d096d": {
"attributes": {
"comment": null,
"description": null,
"name": "dt",
"owner": "anonymous",
"position": 0,
"qualifiedName": "stocks.stocks_view.dt@cl1",
"table": {
"guid": "c54400d8-79b7-45ba-9c01-b662b36ceb0e",
"typeName": "hive_table"
},
"type": "string"
},
"classifications": [],
"createTime": 1529605205448,
"createdBy": "anonymous",
"guid": "2d3f0239-03cc-41b2-b0cb-c924ff7d096d",
"status": "ACTIVE",
"typeName": "hive_column",
"updateTime": 1529605221342,
"updatedBy": "anonymous",
"version": 0
},
"56415119-7cb0-40dd-ace8-1e50efd54991": {
"attributes": {
"bucketCols": null,
"compressed": false,
"inputFormat": "org.apache.hadoop.mapred.SequenceFileInputFormat",
"location": null,
"numBuckets": -1,
"outputFormat": "org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat",
"parameters": null,
"qualifiedName": "stocks.stocks_view@cl1_storage",
"serdeInfo": {
"attributes": {
"name": null,
"parameters": null,
"serializationLib": null
},
"typeName": "hive_serde"
},
"sortCols": null,
"storedAsSubDirectories": false,
"table": {
"guid": "c54400d8-79b7-45ba-9c01-b662b36ceb0e",
"typeName": "hive_table"
}
},
"classifications": [],
"createTime": 1529605205448,
"createdBy": "anonymous",
"guid": "56415119-7cb0-40dd-ace8-1e50efd54991",
"status": "ACTIVE",
"typeName": "hive_storagedesc",
"updateTime": 1529605221342,
"updatedBy": "anonymous",
"version": 0
}
}
}
{
"classificationDefs": [
{
"attributeDefs": [],
"category": "CLASSIFICATION",
"createTime": 1528240995728,
"createdBy": "admin",
"description": "T1",
"guid": "29fc3eed-e97a-429c-ad9e-45d6ccb99fce",
"name": "T1",
"subTypes": [],
"superTypes": [],
"typeVersion": "1.0",
"updateTime": 1528240995728,
"updatedBy": "admin",
"version": 1
}
],
"entityDefs": [],
"enumDefs": [],
"structDefs": []
}
...@@ -36,7 +36,6 @@ import org.apache.atlas.model.impexp.AtlasImportRequest; ...@@ -36,7 +36,6 @@ import org.apache.atlas.model.impexp.AtlasImportRequest;
import org.apache.atlas.model.impexp.AtlasImportResult; import org.apache.atlas.model.impexp.AtlasImportResult;
import org.apache.atlas.model.impexp.*; import org.apache.atlas.model.impexp.*;
import org.apache.atlas.model.metrics.AtlasMetrics; import org.apache.atlas.model.metrics.AtlasMetrics;
import org.apache.atlas.repository.clusterinfo.ClusterService;
import org.apache.atlas.repository.impexp.ExportService; import org.apache.atlas.repository.impexp.ExportService;
import org.apache.atlas.repository.impexp.ImportService; import org.apache.atlas.repository.impexp.ImportService;
import org.apache.atlas.repository.impexp.ZipSink; import org.apache.atlas.repository.impexp.ZipSink;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment