Commit 90958e83 by Ashutosh Mestry

ATLAS-2804: Export & Import Detailed Audits.

parent 41997f64
...@@ -262,7 +262,7 @@ ...@@ -262,7 +262,7 @@
"typeName": "string", "typeName": "string",
"cardinality": "SINGLE", "cardinality": "SINGLE",
"isIndexable": true, "isIndexable": true,
"isOptional": false, "isOptional": true,
"isUnique": false "isUnique": false
}, },
{ {
......
...@@ -48,6 +48,7 @@ public class AtlasExportRequest implements Serializable { ...@@ -48,6 +48,7 @@ public class AtlasExportRequest implements Serializable {
public static final String OPTION_FETCH_TYPE = "fetchType"; public static final String OPTION_FETCH_TYPE = "fetchType";
public static final String OPTION_ATTR_MATCH_TYPE = "matchType"; public static final String OPTION_ATTR_MATCH_TYPE = "matchType";
public static final String OPTION_KEY_REPLICATED_TO = "replicatedTo";
public static final String FETCH_TYPE_FULL = "full"; public static final String FETCH_TYPE_FULL = "full";
public static final String FETCH_TYPE_CONNECTED = "connected"; public static final String FETCH_TYPE_CONNECTED = "connected";
public static final String MATCH_TYPE_STARTS_WITH = "startsWith"; public static final String MATCH_TYPE_STARTS_WITH = "startsWith";
......
...@@ -61,6 +61,7 @@ public class AtlasExportResult implements Serializable { ...@@ -61,6 +61,7 @@ public class AtlasExportResult implements Serializable {
private Map<String, Integer> metrics; private Map<String, Integer> metrics;
private AtlasExportData data; private AtlasExportData data;
private OperationStatus operationStatus; private OperationStatus operationStatus;
private String sourceClusterName;
public AtlasExportResult() { public AtlasExportResult() {
...@@ -146,6 +147,13 @@ public class AtlasExportResult implements Serializable { ...@@ -146,6 +147,13 @@ public class AtlasExportResult implements Serializable {
public void setMetric(String key, int value) { public void setMetric(String key, int value) {
metrics.put(key, value); metrics.put(key, value);
} }
public String getSourceClusterName() {
return sourceClusterName;
}
public void setSourceClusterName(String sourceClusterName) {
this.sourceClusterName = sourceClusterName;
}
public void incrementMeticsCounter(String key) { public void incrementMeticsCounter(String key) {
incrementMeticsCounter(key, 1); incrementMeticsCounter(key, 1);
...@@ -167,6 +175,7 @@ public class AtlasExportResult implements Serializable { ...@@ -167,6 +175,7 @@ public class AtlasExportResult implements Serializable {
sb.append(", userName='").append(userName).append("'"); sb.append(", userName='").append(userName).append("'");
sb.append(", clientIpAddress='").append(clientIpAddress).append("'"); sb.append(", clientIpAddress='").append(clientIpAddress).append("'");
sb.append(", hostName='").append(hostName).append("'"); sb.append(", hostName='").append(hostName).append("'");
sb.append(", sourceCluster='").append(sourceClusterName).append("'");
sb.append(", timeStamp='").append(timeStamp).append("'"); sb.append(", timeStamp='").append(timeStamp).append("'");
sb.append(", metrics={"); sb.append(", metrics={");
AtlasBaseTypeDef.dumpObjects(metrics, sb); AtlasBaseTypeDef.dumpObjects(metrics, sb);
...@@ -230,12 +239,12 @@ public class AtlasExportResult implements Serializable { ...@@ -230,12 +239,12 @@ public class AtlasExportResult implements Serializable {
sb = new StringBuilder(); sb = new StringBuilder();
} }
sb.append("AtlasExportData{"); sb.append("AtlasExportData {");
sb.append("typesDef={").append(typesDef).append("}"); sb.append(", typesDef={").append(typesDef).append("}");
sb.append("entities={"); sb.append(", entities={");
AtlasBaseTypeDef.dumpObjects(entities, sb); AtlasBaseTypeDef.dumpObjects(entities, sb);
sb.append("}"); sb.append("}");
sb.append("entityCreationOrder={"); sb.append(", entityCreationOrder={");
AtlasBaseTypeDef.dumpObjects(entityCreationOrder, sb); AtlasBaseTypeDef.dumpObjects(entityCreationOrder, sb);
sb.append("}"); sb.append("}");
sb.append("}"); sb.append("}");
......
...@@ -40,6 +40,7 @@ public class AtlasImportRequest implements Serializable { ...@@ -40,6 +40,7 @@ public class AtlasImportRequest implements Serializable {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
public static final String TRANSFORMS_KEY = "transforms"; public static final String TRANSFORMS_KEY = "transforms";
public static final String OPTION_KEY_REPLICATED_FROM = "replicatedFrom";
private static final String START_POSITION_KEY = "startPosition"; private static final String START_POSITION_KEY = "startPosition";
private static final String START_GUID_KEY = "startGuid"; private static final String START_GUID_KEY = "startGuid";
private static final String FILE_NAME_KEY = "fileName"; private static final String FILE_NAME_KEY = "fileName";
......
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
package org.apache.atlas.repository.clusterinfo; package org.apache.atlas.repository.clusterinfo;
import org.apache.atlas.annotation.AtlasService; import org.apache.atlas.annotation.AtlasService;
import org.apache.atlas.annotation.GraphTransaction;
import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.clusterinfo.AtlasCluster; import org.apache.atlas.model.clusterinfo.AtlasCluster;
import org.apache.atlas.repository.ogm.DataAccess; import org.apache.atlas.repository.ogm.DataAccess;
...@@ -48,7 +49,12 @@ public class ClusterService { ...@@ -48,7 +49,12 @@ public class ClusterService {
return null; return null;
} }
public AtlasCluster save(AtlasCluster clusterInfo) throws AtlasBaseException { @GraphTransaction
public AtlasCluster save(AtlasCluster clusterInfo) {
try {
return dataAccess.save(clusterInfo); return dataAccess.save(clusterInfo);
} catch (AtlasBaseException ex) {
return clusterInfo;
}
} }
} }
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.impexp;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasConstants;
import org.apache.atlas.AtlasException;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.clusterinfo.AtlasCluster;
import org.apache.atlas.model.impexp.AtlasExportRequest;
import org.apache.atlas.model.impexp.AtlasExportResult;
import org.apache.atlas.model.impexp.AtlasImportRequest;
import org.apache.atlas.model.impexp.AtlasImportResult;
import org.apache.atlas.model.impexp.ExportImportAuditEntry;
import org.apache.atlas.repository.clusterinfo.ClusterService;
import org.apache.atlas.type.AtlasType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.inject.Inject;
import java.util.Map;
@Component
public class AuditHelper {
private static final Logger LOG = LoggerFactory.getLogger(AuditHelper.class);
private static final String CLUSTER_NAME_DEFAULT = "default";
private ClusterService clusterService;
private ExportImportAuditService auditService;
@Inject
public AuditHelper(ClusterService clusterService, ExportImportAuditService auditService) {
this.clusterService = clusterService;
this.auditService = auditService;
}
public void audit(String userName, AtlasExportResult result, long startTime, long endTime, boolean hadData) throws AtlasBaseException {
AtlasExportRequest request = result.getRequest();
AtlasCluster cluster = saveCluster(getCurrentClusterName());
String targetClusterName = getClusterNameFromOptions(request.getOptions(), AtlasExportRequest.OPTION_KEY_REPLICATED_TO);
addAuditEntry(userName,
cluster.getName(), targetClusterName,
ExportImportAuditEntry.OPERATION_EXPORT,
AtlasType.toJson(result), startTime, endTime, hadData);
}
public void audit(String userName, AtlasImportResult result, long startTime, long endTime, boolean hadData) throws AtlasBaseException {
AtlasImportRequest request = result.getRequest();
AtlasCluster cluster = saveCluster(getCurrentClusterName());
String sourceCluster = getClusterNameFromOptions(request.getOptions(), AtlasImportRequest.OPTION_KEY_REPLICATED_FROM);
addAuditEntry(userName,
sourceCluster, cluster.getName(),
ExportImportAuditEntry.OPERATION_EXPORT, AtlasType.toJson(result), startTime, endTime, hadData);
}
private String getClusterNameFromOptions(Map options, String key) {
return options.containsKey(key)
? (String) options.get(key)
: "";
}
private void addAuditEntry(String userName, String sourceCluster, String targetCluster, String operation,
String result, long startTime, long endTime, boolean hasData) throws AtlasBaseException {
if(!hasData) return;
ExportImportAuditEntry entry = new ExportImportAuditEntry();
entry.setUserName(userName);
entry.setSourceClusterName(sourceCluster);
entry.setTargetClusterName(targetCluster);
entry.setOperation(operation);
entry.setResultSummary(result);
entry.setStartTime(startTime);
entry.setEndTime(endTime);
auditService.save(entry);
LOG.info("addAuditEntry: user: {}, source: {}, target: {}, operation: {}", entry.getUserName(),
entry.getSourceClusterName(), entry.getTargetClusterName(), entry.getOperation());
}
private AtlasCluster saveCluster(String clusterName) throws AtlasBaseException {
AtlasCluster cluster = new AtlasCluster(clusterName, clusterName);
return clusterService.save(cluster);
}
public static String getCurrentClusterName() {
try {
return ApplicationProperties.get().getString(AtlasConstants.CLUSTER_NAME_KEY, CLUSTER_NAME_DEFAULT);
} catch (AtlasException e) {
LOG.error("getCurrentClusterName", e);
}
return "";
}
}
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
package org.apache.atlas.repository.impexp; package org.apache.atlas.repository.impexp;
import org.apache.atlas.annotation.AtlasService; import org.apache.atlas.annotation.AtlasService;
import org.apache.atlas.annotation.GraphTransaction;
import org.apache.atlas.discovery.AtlasDiscoveryService; import org.apache.atlas.discovery.AtlasDiscoveryService;
import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.discovery.AtlasSearchResult; import org.apache.atlas.model.discovery.AtlasSearchResult;
...@@ -47,6 +48,7 @@ public class ExportImportAuditService { ...@@ -47,6 +48,7 @@ public class ExportImportAuditService {
this.discoveryService = discoveryService; this.discoveryService = discoveryService;
} }
@GraphTransaction
public void save(ExportImportAuditEntry entry) throws AtlasBaseException { public void save(ExportImportAuditEntry entry) throws AtlasBaseException {
dataAccess.saveNoLoad(entry); dataAccess.saveNoLoad(entry);
} }
......
...@@ -69,16 +69,18 @@ public class ExportService { ...@@ -69,16 +69,18 @@ public class ExportService {
private static final Logger LOG = LoggerFactory.getLogger(ExportService.class); private static final Logger LOG = LoggerFactory.getLogger(ExportService.class);
private final AtlasTypeRegistry typeRegistry; private final AtlasTypeRegistry typeRegistry;
private AuditHelper auditHelper;
private final AtlasGraph atlasGraph; private final AtlasGraph atlasGraph;
private final EntityGraphRetriever entityGraphRetriever; private final EntityGraphRetriever entityGraphRetriever;
private final AtlasGremlinQueryProvider gremlinQueryProvider; private final AtlasGremlinQueryProvider gremlinQueryProvider;
@Inject @Inject
public ExportService(final AtlasTypeRegistry typeRegistry, AtlasGraph atlasGraph) throws AtlasBaseException { public ExportService(final AtlasTypeRegistry typeRegistry, AtlasGraph atlasGraph, AuditHelper auditHelper) {
this.typeRegistry = typeRegistry; this.typeRegistry = typeRegistry;
this.entityGraphRetriever = new EntityGraphRetriever(this.typeRegistry); this.entityGraphRetriever = new EntityGraphRetriever(this.typeRegistry);
this.atlasGraph = atlasGraph; this.atlasGraph = atlasGraph;
this.gremlinQueryProvider = AtlasGremlinQueryProvider.INSTANCE; this.gremlinQueryProvider = AtlasGremlinQueryProvider.INSTANCE;
this.auditHelper = auditHelper;
} }
public AtlasExportResult run(ZipSink exportSink, AtlasExportRequest request, String userName, String hostName, public AtlasExportResult run(ZipSink exportSink, AtlasExportRequest request, String userName, String hostName,
...@@ -93,7 +95,8 @@ public class ExportService { ...@@ -93,7 +95,8 @@ public class ExportService {
AtlasExportResult.OperationStatus[] statuses = processItems(request, context); AtlasExportResult.OperationStatus[] statuses = processItems(request, context);
processTypesDef(context); processTypesDef(context);
updateSinkWithOperationMetrics(context, statuses, getOperationDuration(startTime)); long endTime = System.currentTimeMillis();
updateSinkWithOperationMetrics(userName, context, statuses, startTime, endTime);
} catch(Exception ex) { } catch(Exception ex) {
LOG.error("Operation failed: ", ex); LOG.error("Operation failed: ", ex);
} finally { } finally {
...@@ -106,10 +109,16 @@ public class ExportService { ...@@ -106,10 +109,16 @@ public class ExportService {
return context.result; return context.result;
} }
private void updateSinkWithOperationMetrics(ExportContext context, AtlasExportResult.OperationStatus[] statuses, int duration) throws AtlasBaseException { private void updateSinkWithOperationMetrics(String userName, ExportContext context,
AtlasExportResult.OperationStatus[] statuses,
long startTime, long endTime) throws AtlasBaseException {
int duration = getOperationDuration(startTime, endTime);
context.result.setSourceClusterName(AuditHelper.getCurrentClusterName());
context.result.getData().getEntityCreationOrder().addAll(context.lineageProcessed); context.result.getData().getEntityCreationOrder().addAll(context.lineageProcessed);
context.sink.setExportOrder(context.result.getData().getEntityCreationOrder()); context.sink.setExportOrder(context.result.getData().getEntityCreationOrder());
context.sink.setTypesDef(context.result.getData().getTypesDef()); context.sink.setTypesDef(context.result.getData().getTypesDef());
auditHelper.audit(userName, context.result, startTime, endTime,
!context.result.getData().getEntityCreationOrder().isEmpty());
clearContextData(context); clearContextData(context);
context.result.setOperationStatus(getOverallOperationStatus(statuses)); context.result.setOperationStatus(getOverallOperationStatus(statuses));
context.result.incrementMeticsCounter("duration", duration); context.result.incrementMeticsCounter("duration", duration);
...@@ -120,8 +129,8 @@ public class ExportService { ...@@ -120,8 +129,8 @@ public class ExportService {
context.result.setData(null); context.result.setData(null);
} }
private int getOperationDuration(long startTime) { private int getOperationDuration(long startTime, long endTime) {
return (int) (System.currentTimeMillis() - startTime); return (int) (endTime - startTime);
} }
private void processTypesDef(ExportContext context) { private void processTypesDef(ExportContext context) {
......
...@@ -47,15 +47,17 @@ public class ImportService { ...@@ -47,15 +47,17 @@ public class ImportService {
private final AtlasTypeDefStore typeDefStore; private final AtlasTypeDefStore typeDefStore;
private final AtlasTypeRegistry typeRegistry; private final AtlasTypeRegistry typeRegistry;
private final BulkImporter bulkImporter; private final BulkImporter bulkImporter;
private AuditHelper auditHelper;
private long startTimestamp; private long startTimestamp;
private long endTimestamp; private long endTimestamp;
@Inject @Inject
public ImportService(AtlasTypeDefStore typeDefStore, AtlasTypeRegistry typeRegistry, BulkImporter bulkImporter) { public ImportService(AtlasTypeDefStore typeDefStore, AtlasTypeRegistry typeRegistry, BulkImporter bulkImporter, AuditHelper auditHelper) {
this.typeDefStore = typeDefStore; this.typeDefStore = typeDefStore;
this.typeRegistry = typeRegistry; this.typeRegistry = typeRegistry;
this.bulkImporter = bulkImporter; this.bulkImporter = bulkImporter;
this.auditHelper = auditHelper;
} }
public AtlasImportResult run(ZipSource source, String userName, public AtlasImportResult run(ZipSource source, String userName,
...@@ -81,8 +83,7 @@ public class ImportService { ...@@ -81,8 +83,7 @@ public class ImportService {
startTimestamp = System.currentTimeMillis(); startTimestamp = System.currentTimeMillis();
processTypes(source.getTypesDef(), result); processTypes(source.getTypesDef(), result);
setStartPosition(request, source); setStartPosition(request, source);
processEntities(source, result); processEntities(userName, source, result);
result.setOperationStatus(AtlasImportResult.OperationStatus.SUCCESS); result.setOperationStatus(AtlasImportResult.OperationStatus.SUCCESS);
} catch (AtlasBaseException excp) { } catch (AtlasBaseException excp) {
...@@ -183,10 +184,15 @@ public class ImportService { ...@@ -183,10 +184,15 @@ public class ImportService {
importTypeDefProcessor.processTypes(typeDefinitionMap, result); importTypeDefProcessor.processTypes(typeDefinitionMap, result);
} }
private void processEntities(ZipSource importSource, AtlasImportResult result) throws AtlasBaseException { private void processEntities(String userName, ZipSource importSource, AtlasImportResult result) throws AtlasBaseException {
this.bulkImporter.bulkImport(importSource, result); this.bulkImporter.bulkImport(importSource, result);
endTimestamp = System.currentTimeMillis(); endTimestamp = System.currentTimeMillis();
result.incrementMeticsCounter("duration", (int) (this.endTimestamp - this.startTimestamp)); result.incrementMeticsCounter("duration", getDuration(this.endTimestamp, this.startTimestamp));
auditHelper.audit(userName, result, startTimestamp, endTimestamp, !importSource.getCreationOrder().isEmpty());
}
private int getDuration(long endTime, long startTime) {
return (int) (endTime - startTime);
} }
} }
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.impexp;
import com.google.inject.Inject;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasConstants;
import org.apache.atlas.AtlasException;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.discovery.AtlasSearchResult;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
public class ExportImportTestBase {
protected void assertAuditEntry(ExportImportAuditService auditService) {
AtlasSearchResult result = null;
try {
result = auditService.get("", "", getCurrentCluster(), "", "", "", 10, 0);
} catch (AtlasBaseException e) {
fail("auditService.get: failed!");
} catch (AtlasException e) {
fail("getCurrentCluster: failed!");
}
assertNotNull(result);
assertNotNull(result.getEntities());
assertTrue(result.getEntities().size() > 0);
}
private String getCurrentCluster() throws AtlasException {
return ApplicationProperties.get().getString(AtlasConstants.CLUSTER_NAME_KEY, "default");
}
}
...@@ -63,9 +63,10 @@ import static org.mockito.Mockito.mock; ...@@ -63,9 +63,10 @@ import static org.mockito.Mockito.mock;
import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue; import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
@Guice(modules = TestModules.TestOnlyModule.class) @Guice(modules = TestModules.TestOnlyModule.class)
public class ExportServiceTest { public class ExportServiceTest extends ExportImportTestBase {
private static final Logger LOG = LoggerFactory.getLogger(ExportServiceTest.class); private static final Logger LOG = LoggerFactory.getLogger(ExportServiceTest.class);
@Inject @Inject
...@@ -76,16 +77,22 @@ public class ExportServiceTest { ...@@ -76,16 +77,22 @@ public class ExportServiceTest {
@Inject @Inject
private EntityGraphMapper graphMapper; private EntityGraphMapper graphMapper;
@Inject @Inject
ExportService exportService; ExportService exportService;
@Inject
private ExportImportAuditService auditService;
private DeleteHandlerV1 deleteHandler = mock(SoftDeleteHandlerV1.class);; private DeleteHandlerV1 deleteHandler = mock(SoftDeleteHandlerV1.class);;
private AtlasEntityChangeNotifier mockChangeNotifier = mock(AtlasEntityChangeNotifier.class); private AtlasEntityChangeNotifier mockChangeNotifier = mock(AtlasEntityChangeNotifier.class);
private AtlasEntityStoreV2 entityStore; private AtlasEntityStoreV2 entityStore;
@BeforeTest @BeforeTest
public void setupTest() { public void setupTest() throws IOException, AtlasBaseException {
RequestContext.clear(); RequestContext.clear();
RequestContext.get().setUser(TestUtilsV2.TEST_USER, null); RequestContext.get().setUser(TestUtilsV2.TEST_USER, null);
ZipFileResourceTestUtils.loadBaseModel(typeDefStore, typeRegistry);
} }
@BeforeClass @BeforeClass
...@@ -108,6 +115,8 @@ public class ExportServiceTest { ...@@ -108,6 +115,8 @@ public class ExportServiceTest {
@AfterClass @AfterClass
public void clear() throws Exception { public void clear() throws Exception {
Thread.sleep(1000);
assertAuditEntry(auditService);
AtlasGraphProvider.cleanup(); AtlasGraphProvider.cleanup();
if (useLocalSolr()) { if (useLocalSolr()) {
...@@ -202,6 +211,7 @@ public class ExportServiceTest { ...@@ -202,6 +211,7 @@ public class ExportServiceTest {
assertEquals(result.getHostName(), hostName); assertEquals(result.getHostName(), hostName);
assertEquals(result.getClientIpAddress(), requestingIP); assertEquals(result.getClientIpAddress(), requestingIP);
assertEquals(request, result.getRequest()); assertEquals(request, result.getRequest());
assertNotNull(result.getSourceClusterName());
} }
@Test @Test
......
...@@ -40,6 +40,7 @@ import org.slf4j.Logger; ...@@ -40,6 +40,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.testng.ITestContext; import org.testng.ITestContext;
import org.testng.annotations.AfterClass; import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterTest;
import org.testng.annotations.BeforeTest; import org.testng.annotations.BeforeTest;
import org.testng.annotations.DataProvider; import org.testng.annotations.DataProvider;
import org.testng.annotations.Guice; import org.testng.annotations.Guice;
...@@ -51,16 +52,22 @@ import java.util.List; ...@@ -51,16 +52,22 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import static org.apache.atlas.graph.GraphSandboxUtil.useLocalSolr; import static org.apache.atlas.graph.GraphSandboxUtil.useLocalSolr;
import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.*; import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.getDefaultImportRequest;
import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.getZipSource;
import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.loadModelFromJson;
import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.loadModelFromResourcesJson;
import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runAndVerifyQuickStart_v1_Import;
import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runImportWithNoParameters;
import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runImportWithParameters;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue; import static org.testng.Assert.assertTrue;
import static org.testng.Assert.assertFalse;
@Guice(modules = TestModules.TestOnlyModule.class) @Guice(modules = TestModules.TestOnlyModule.class)
public class ImportServiceTest { public class ImportServiceTest extends ExportImportTestBase {
private static final Logger LOG = LoggerFactory.getLogger(ImportServiceTest.class); private static final Logger LOG = LoggerFactory.getLogger(ImportServiceTest.class);
private static final int DEFAULT_LIMIT = 25; private static final int DEFAULT_LIMIT = 25;
private final ImportService importService; private final ImportService importService;
...@@ -78,6 +85,9 @@ public class ImportServiceTest { ...@@ -78,6 +85,9 @@ public class ImportServiceTest {
AtlasEntityStore entityStore; AtlasEntityStore entityStore;
@Inject @Inject
private ExportImportAuditService auditService;
@Inject
public ImportServiceTest(ImportService importService) { public ImportServiceTest(ImportService importService) {
this.importService = importService; this.importService = importService;
} }
...@@ -97,6 +107,11 @@ public class ImportServiceTest { ...@@ -97,6 +107,11 @@ public class ImportServiceTest {
} }
} }
@AfterTest
public void postTest() {
assertAuditEntry(auditService);
}
@DataProvider(name = "sales") @DataProvider(name = "sales")
public static Object[][] getDataFromQuickStart_v1_Sales(ITestContext context) throws IOException { public static Object[][] getDataFromQuickStart_v1_Sales(ITestContext context) throws IOException {
return getZipSource("sales-v1-full.zip"); return getZipSource("sales-v1-full.zip");
...@@ -340,7 +355,7 @@ public class ImportServiceTest { ...@@ -340,7 +355,7 @@ public class ImportServiceTest {
@Test @Test
public void importServiceProcessesIOException() { public void importServiceProcessesIOException() {
ImportService importService = new ImportService(typeDefStore, typeRegistry, null); ImportService importService = new ImportService(typeDefStore, typeRegistry, null, null);
AtlasImportRequest req = mock(AtlasImportRequest.class); AtlasImportRequest req = mock(AtlasImportRequest.class);
Answer<Map> answer = invocationOnMock -> { Answer<Map> answer = invocationOnMock -> {
......
...@@ -43,7 +43,7 @@ public class ComplexAttributesTest extends MigrationBaseAsserts { ...@@ -43,7 +43,7 @@ public class ComplexAttributesTest extends MigrationBaseAsserts {
String ENTITY_TYPE = "entity_type"; String ENTITY_TYPE = "entity_type";
String ENTITY_WITH_COMPLEX_COLL_TYPE = "entity_with_complex_collection_attr"; String ENTITY_WITH_COMPLEX_COLL_TYPE = "entity_with_complex_collection_attr";
final int EXPECTED_TOTAL_COUNT = 216; final int EXPECTED_TOTAL_COUNT = 217;
final int EXPECTED_ENTITY_TYPE_COUNT = 16; final int EXPECTED_ENTITY_TYPE_COUNT = 16;
final int EXPECTED_STRUCT_TYPE_COUNT = 3; final int EXPECTED_STRUCT_TYPE_COUNT = 3;
final int EXPECTED_ENTITY_WITH_COMPLEX_COLL_TYPE_COUNT = 1; final int EXPECTED_ENTITY_WITH_COMPLEX_COLL_TYPE_COUNT = 1;
......
...@@ -38,7 +38,7 @@ public class HiveStocksTest extends MigrationBaseAsserts { ...@@ -38,7 +38,7 @@ public class HiveStocksTest extends MigrationBaseAsserts {
@Test @Test
public void migrateStocks() throws AtlasBaseException, IOException { public void migrateStocks() throws AtlasBaseException, IOException {
final int EXPECTED_TOTAL_COUNT = 190; final int EXPECTED_TOTAL_COUNT = 191;
final int EXPECTED_DB_COUNT = 1; final int EXPECTED_DB_COUNT = 1;
final int EXPECTED_TABLE_COUNT = 1; final int EXPECTED_TABLE_COUNT = 1;
final int EXPECTED_COLUMN_COUNT = 7; final int EXPECTED_COLUMN_COUNT = 7;
......
...@@ -28,6 +28,7 @@ import org.apache.atlas.authorize.AtlasPrivilege; ...@@ -28,6 +28,7 @@ import org.apache.atlas.authorize.AtlasPrivilege;
import org.apache.atlas.authorize.AtlasAuthorizationUtils; import org.apache.atlas.authorize.AtlasAuthorizationUtils;
import org.apache.atlas.discovery.SearchContext; import org.apache.atlas.discovery.SearchContext;
import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.clusterinfo.AtlasCluster;
import org.apache.atlas.model.discovery.AtlasSearchResult; import org.apache.atlas.model.discovery.AtlasSearchResult;
import org.apache.atlas.model.impexp.AtlasExportRequest; import org.apache.atlas.model.impexp.AtlasExportRequest;
import org.apache.atlas.model.impexp.AtlasExportResult; import org.apache.atlas.model.impexp.AtlasExportResult;
...@@ -35,6 +36,7 @@ import org.apache.atlas.model.impexp.AtlasImportRequest; ...@@ -35,6 +36,7 @@ import org.apache.atlas.model.impexp.AtlasImportRequest;
import org.apache.atlas.model.impexp.AtlasImportResult; import org.apache.atlas.model.impexp.AtlasImportResult;
import org.apache.atlas.model.impexp.*; import org.apache.atlas.model.impexp.*;
import org.apache.atlas.model.metrics.AtlasMetrics; import org.apache.atlas.model.metrics.AtlasMetrics;
import org.apache.atlas.repository.clusterinfo.ClusterService;
import org.apache.atlas.repository.impexp.ExportService; import org.apache.atlas.repository.impexp.ExportService;
import org.apache.atlas.repository.impexp.ImportService; import org.apache.atlas.repository.impexp.ImportService;
import org.apache.atlas.repository.impexp.ZipSink; import org.apache.atlas.repository.impexp.ZipSink;
...@@ -80,12 +82,19 @@ import javax.ws.rs.core.MediaType; ...@@ -80,12 +82,19 @@ import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response; import javax.ws.rs.core.Response;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.util.*; import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TimeZone;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
/** /**
* Jersey Resource for admin operations * Jersey Resource for admin operations.
*/ */
@Path("admin") @Path("admin")
@Singleton @Singleton
...@@ -121,7 +130,8 @@ public class AdminResource { ...@@ -121,7 +130,8 @@ public class AdminResource {
private final AtlasTypeRegistry typeRegistry; private final AtlasTypeRegistry typeRegistry;
private final MigrationProgressService migrationProgressService; private final MigrationProgressService migrationProgressService;
private final ReentrantLock importExportOperationLock; private final ReentrantLock importExportOperationLock;
private ExportImportAuditService exportImportAuditService; private final ExportImportAuditService exportImportAuditService;
private final ClusterService clusterService;
static { static {
try { try {
...@@ -135,6 +145,7 @@ public class AdminResource { ...@@ -135,6 +145,7 @@ public class AdminResource {
public AdminResource(ServiceState serviceState, MetricsService metricsService, AtlasTypeRegistry typeRegistry, public AdminResource(ServiceState serviceState, MetricsService metricsService, AtlasTypeRegistry typeRegistry,
ExportService exportService, ImportService importService, SearchTracker activeSearches, ExportService exportService, ImportService importService, SearchTracker activeSearches,
MigrationProgressService migrationProgressService, MigrationProgressService migrationProgressService,
ClusterService clusterService,
ExportImportAuditService exportImportAuditService) { ExportImportAuditService exportImportAuditService) {
this.serviceState = serviceState; this.serviceState = serviceState;
this.metricsService = metricsService; this.metricsService = metricsService;
...@@ -143,8 +154,9 @@ public class AdminResource { ...@@ -143,8 +154,9 @@ public class AdminResource {
this.activeSearches = activeSearches; this.activeSearches = activeSearches;
this.typeRegistry = typeRegistry; this.typeRegistry = typeRegistry;
this.migrationProgressService = migrationProgressService; this.migrationProgressService = migrationProgressService;
this.clusterService = clusterService;
this.exportImportAuditService = exportImportAuditService; this.exportImportAuditService = exportImportAuditService;
importExportOperationLock = new ReentrantLock(); this.importExportOperationLock = new ReentrantLock();
} }
/** /**
...@@ -436,6 +448,33 @@ public class AdminResource { ...@@ -436,6 +448,33 @@ public class AdminResource {
return result; return result;
} }
/**
* Fetch details of a cluster.
* @param clusterName name of target cluster with which it is paired
* @param entityQualifiedName qualified name of top level entity
* @return AtlasCluster
* @throws AtlasBaseException
*/
@GET
@Path("/cluster/{clusterName}")
@Consumes(Servlets.JSON_MEDIA_TYPE)
@Produces(Servlets.JSON_MEDIA_TYPE)
public AtlasCluster getCluster(@PathParam("clusterName") String clusterName,
@QueryParam("entity") String entityQualifiedName) throws AtlasBaseException {
AtlasPerfTracer perf = null;
try {
if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "cluster.getCluster(" + clusterName + ")");
}
AtlasCluster cluster = new AtlasCluster(clusterName, clusterName);
return clusterService.get(cluster);
} finally {
AtlasPerfTracer.log(perf);
}
}
@GET @GET
@Path("/expimp/audit") @Path("/expimp/audit")
@Consumes(Servlets.JSON_MEDIA_TYPE) @Consumes(Servlets.JSON_MEDIA_TYPE)
......
...@@ -51,7 +51,7 @@ public class AdminResourceTest { ...@@ -51,7 +51,7 @@ public class AdminResourceTest {
when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.ACTIVE); when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.ACTIVE);
AdminResource adminResource = new AdminResource(serviceState, null, null, null, null, null, null, null); AdminResource adminResource = new AdminResource(serviceState, null, null, null, null, null, null, null, null);
Response response = adminResource.getStatus(); Response response = adminResource.getStatus();
assertEquals(response.getStatus(), HttpServletResponse.SC_OK); assertEquals(response.getStatus(), HttpServletResponse.SC_OK);
JsonNode entity = AtlasJson.parseToV1JsonNode((String) response.getEntity()); JsonNode entity = AtlasJson.parseToV1JsonNode((String) response.getEntity());
...@@ -62,7 +62,7 @@ public class AdminResourceTest { ...@@ -62,7 +62,7 @@ public class AdminResourceTest {
public void testResourceGetsValueFromServiceState() throws IOException { public void testResourceGetsValueFromServiceState() throws IOException {
when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.PASSIVE); when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.PASSIVE);
AdminResource adminResource = new AdminResource(serviceState, null, null, null, null, null, null, null); AdminResource adminResource = new AdminResource(serviceState, null, null, null, null, null, null, null, null);
Response response = adminResource.getStatus(); Response response = adminResource.getStatus();
verify(serviceState).getState(); verify(serviceState).getState();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment