Commit 1d85e95f by ashutoshm, committed by Madhan Neethiraj

ATLAS-1503: optimization of import API implementation

parent 852a7118
......@@ -88,6 +88,7 @@ public enum AtlasErrorCode {
INDEX_CREATION_FAILED(500, "ATLAS5002E", "Index creation failed for {0}"),
INDEX_ROLLBACK_FAILED(500, "ATLAS5003E", "Index rollback failed for {0}"),
FAILED_TO_OBTAIN_TYPE_UPDATE_LOCK(500, "ATLAS5004E", "Failed to get the lock; another type update might be in progress. Please try again"),
FAILED_TO_OBTAIN_IMPORT_EXPORT_LOCK(500, "ATLAS5005E", "Another import or export is in progress. Please try again"),
NOTIFICATION_FAILED(500, "ATLAS5006E", "Failed to notify for change {0}");
private String errorCode;
......
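The hunk above registers a new error code for concurrent import/export attempts. A minimal sketch of the enum pattern in use, with stand-in names (DemoErrorCode is illustrative, not the Atlas class): each constant pairs an HTTP status with a stable code string and a MessageFormat template.

import java.text.MessageFormat;

// Illustrative only: mirrors the AtlasErrorCode pattern, not the Atlas source itself.
enum DemoErrorCode {
    FAILED_TO_OBTAIN_IMPORT_EXPORT_LOCK(500, "DEMO5005E", "Another import or export is in progress. Please try again"),
    INDEX_CREATION_FAILED(500, "DEMO5002E", "Index creation failed for {0}");

    private final int    httpCode;
    private final String errorCode;
    private final String messageTemplate;

    DemoErrorCode(int httpCode, String errorCode, String messageTemplate) {
        this.httpCode        = httpCode;
        this.errorCode       = errorCode;
        this.messageTemplate = messageTemplate;
    }

    public String getFormattedMessage(Object... args) {
        return MessageFormat.format(messageTemplate, args); // fills {0}, {1}, ... placeholders
    }
}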
......@@ -26,7 +26,9 @@ import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlRootElement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.codehaus.jackson.annotate.JsonAutoDetect.Visibility.NONE;
......@@ -50,6 +52,7 @@ public class AtlasImportResult {
private String hostName;
private long timeStamp;
private Map<String, Integer> metrics;
private List<String> processedEntities;
private OperationStatus operationStatus;
public AtlasImportResult() {
......@@ -65,6 +68,7 @@ public class AtlasImportResult {
this.timeStamp = timeStamp;
this.metrics = new HashMap<>();
this.operationStatus = OperationStatus.FAIL;
this.processedEntities = new ArrayList<>();
}
public AtlasImportRequest getRequest() {
......@@ -133,6 +137,10 @@ public class AtlasImportResult {
metrics.put(key, currentValue + incrementBy);
}
public void setProcessedEntities(List<String> processedEntities) { this.processedEntities = processedEntities; }
public List<String> getProcessedEntities() { return this.processedEntities; }
public StringBuilder toString(StringBuilder sb) {
if (sb == null) {
sb = new StringBuilder();
......@@ -149,6 +157,9 @@ public class AtlasImportResult {
sb.append("}");
sb.append(", operationStatus='").append(operationStatus).append("'");
sb.append(", processedEntities=[");
AtlasBaseTypeDef.dumpObjects(processedEntities, sb);
sb.append("]");
sb.append("}");
return sb;
......
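The new processedEntities list and the existing metrics map together record what an import touched. A hedged sketch of how a caller might populate them, assuming the getters and setters shown above (the surrounding class here is a simplified stand-in, not AtlasImportResult itself):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified stand-in for AtlasImportResult's bookkeeping.
class ImportBookkeepingSketch {
    private final Map<String, Integer> metrics           = new HashMap<>();
    private final List<String>         processedEntities = new ArrayList<>();

    void incrementMetricsCounter(String key, int incrementBy) {
        metrics.put(key, metrics.getOrDefault(key, 0) + incrementBy); // same shape as the method above
    }

    void recordProcessed(String guid) {
        processedEntities.add(guid); // mirrors setProcessedEntities/getProcessedEntities usage
    }
}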
......@@ -46,7 +46,7 @@ public class EntityMutations implements Serializable {
CREATE,
UPDATE,
PARTIAL_UPDATE,
DELETE,
DELETE
}
public static final class EntityMutation implements Serializable {
......
......@@ -562,6 +562,7 @@ public final class TestUtilsV2 {
AtlasTypeUtil.createUniqueRequiredAttrDef(NAME, "string"),
AtlasTypeUtil.createOptionalAttrDef("isReplicated", "boolean"),
AtlasTypeUtil.createOptionalAttrDef("created", "string"),
AtlasTypeUtil.createOptionalAttrDef("parameters", "map<string,string>"),
AtlasTypeUtil.createRequiredAttrDef("description", "string"));
......
......@@ -19,6 +19,7 @@ package org.apache.atlas.repository.store.graph;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.impexp.AtlasImportResult;
import org.apache.atlas.model.instance.AtlasClassification;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
......@@ -69,6 +70,14 @@ public interface AtlasEntityStore {
EntityMutationResponse createOrUpdate(EntityStream entityStream, boolean isPartialUpdate) throws AtlasBaseException;
/**
* Create or update entities in the stream, using repeated commits of connected entities
* @param entityStream AtlasEntityStream
* @param importResult AtlasImportResult updated with import metrics and processed entity GUIDs
* @return EntityMutationResponse mutation operations and the corresponding entities they were performed on
* @throws AtlasBaseException
*/
EntityMutationResponse bulkImport(EntityStream entityStream, AtlasImportResult importResult) throws AtlasBaseException;
/**
* Update a single entity
* @param entityType type of the entity
* @param uniqAttributes Attributes that uniquely identify the entity
......
......@@ -24,13 +24,10 @@ import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.GraphTransaction;
import org.apache.atlas.RequestContextV1;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.instance.AtlasClassification;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.impexp.AtlasImportResult;
import org.apache.atlas.model.instance.*;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.model.instance.EntityMutations;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.repository.store.graph.EntityGraphDiscovery;
......@@ -43,10 +40,9 @@ import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.*;
import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.*;
@Singleton
......@@ -130,6 +126,65 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
}
@Override
public EntityMutationResponse bulkImport(EntityStream entityStream, AtlasImportResult importResult) throws AtlasBaseException {
if (LOG.isDebugEnabled()) {
LOG.debug("==> bulkImport()");
}
if (entityStream == null || !entityStream.hasNext()) {
throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "no entities to create/update.");
}
EntityMutationResponse ret = new EntityMutationResponse();
ret.setGuidAssignments(new HashMap<String, String>());
Set<String> processedGuids = new HashSet<>();
int progressReportedAtCount = 0;
while (entityStream.hasNext()) {
AtlasEntity entity = entityStream.next();
if(processedGuids.contains(entity.getGuid())) {
continue;
}
AtlasEntityStreamForImport oneEntityStream = new AtlasEntityStreamForImport(entity, entityStream);
EntityMutationResponse resp = createOrUpdate(oneEntityStream, false);
updateImportMetrics("entity:%s:created", resp.getCreatedEntities(), processedGuids, importResult);
updateImportMetrics("entity:%s:updated", resp.getUpdatedEntities(), processedGuids, importResult);
updateImportMetrics("entity:%s:deleted", resp.getDeletedEntities(), processedGuids, importResult);
if ((processedGuids.size() - progressReportedAtCount) > 10) {
progressReportedAtCount = processedGuids.size();
LOG.info("bulkImport(): in progress.. number of entities imported: {}", progressReportedAtCount);
}
if (resp.getGuidAssignments() != null) {
ret.getGuidAssignments().putAll(resp.getGuidAssignments());
}
}
importResult.getProcessedEntities().addAll(processedGuids);
LOG.info("bulkImport(): done. Number of entities imported: {}", processedGuids.size());
return ret;
}
private void updateImportMetrics(String prefix, List<AtlasEntityHeader> list, Set<String> processedGuids, AtlasImportResult importResult) {
if (list == null) {
return;
}
for (AtlasEntityHeader h : list) {
processedGuids.add(h.getGuid());
importResult.incrementMeticsCounter(String.format(prefix, h.getTypeName()));
}
}
@Override
@GraphTransaction
public EntityMutationResponse createOrUpdate(EntityStream entityStream, boolean isPartialUpdate) throws AtlasBaseException {
if (LOG.isDebugEnabled()) {
......@@ -323,11 +378,11 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
deleteHandler.deleteEntities(deletionCandidates);
RequestContextV1 req = RequestContextV1.get();
for (AtlasObjectId id : req.getDeletedEntityIds()) {
response.addEntity(EntityMutations.EntityOperation.DELETE, EntityGraphMapper.constructHeader(id));
response.addEntity(DELETE, EntityGraphMapper.constructHeader(id));
}
for (AtlasObjectId id : req.getUpdatedEntityIds()) {
response.addEntity(EntityMutations.EntityOperation.UPDATE, EntityGraphMapper.constructHeader(id));
response.addEntity(UPDATE, EntityGraphMapper.constructHeader(id));
}
return response;
......
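The bulkImport() implementation above commits one connected sub-graph per iteration rather than the whole stream in a single transaction, de-duplicates work via the processed-GUID set, and reports progress roughly every ten entities. A condensed, self-contained sketch of that loop, with stand-in types (EntityFeed and the String GUIDs are illustrative, not the Atlas EntityStream API):

import java.util.HashSet;
import java.util.Set;

class BulkImportLoopSketch {
    interface EntityFeed {
        boolean hasNext();
        String nextGuid();
    }

    static Set<String> run(EntityFeed feed) {
        Set<String> processedGuids = new HashSet<>();
        int progressReportedAtCount = 0;

        while (feed.hasNext()) {
            String guid = feed.nextGuid();

            if (processedGuids.contains(guid)) {
                continue; // already persisted as part of an earlier entity's connected sub-graph
            }

            // createOrUpdate(...) would run here, committing this entity and its
            // connected entities as one transaction.
            processedGuids.add(guid);

            if ((processedGuids.size() - progressReportedAtCount) > 10) {
                progressReportedAtCount = processedGuids.size();
                System.out.printf("bulkImport(): in progress... entities imported: %d%n", progressReportedAtCount);
            }
        }
        return processedGuids;
    }
}

Committing per sub-graph keeps transaction sizes bounded and means a failure late in a large import does not roll back everything already persisted.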
......@@ -24,11 +24,10 @@ import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
import java.util.Iterator;
public class AtlasEntityStream implements EntityStream {
private AtlasEntitiesWithExtInfo entitiesWithExtInfo = new AtlasEntitiesWithExtInfo();
private final AtlasEntitiesWithExtInfo entitiesWithExtInfo;
private final EntityStream entityStream;
private Iterator<AtlasEntity> iterator;
public AtlasEntityStream() {
}
public AtlasEntityStream(AtlasEntity entity) {
this(new AtlasEntitiesWithExtInfo(entity));
......@@ -41,6 +40,13 @@ public class AtlasEntityStream implements EntityStream {
public AtlasEntityStream(AtlasEntitiesWithExtInfo entitiesWithExtInfo) {
this.entitiesWithExtInfo = entitiesWithExtInfo;
this.iterator = this.entitiesWithExtInfo.getEntities().iterator();
this.entityStream = null;
}
public AtlasEntityStream(AtlasEntity entity, EntityStream entityStream) {
this.entitiesWithExtInfo = new AtlasEntitiesWithExtInfo(entity);
this.iterator = this.entitiesWithExtInfo.getEntities().iterator();
this.entityStream = entityStream;
}
@Override
......@@ -60,7 +66,7 @@ public class AtlasEntityStream implements EntityStream {
@Override
public AtlasEntity getByGuid(String guid) {
return entitiesWithExtInfo.getEntity(guid);
return entityStream != null ? entityStream.getByGuid(guid) : entitiesWithExtInfo.getEntity(guid);
}
@Override
......
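The constructor change above lets a single-entity stream fall back to the enclosing stream when getByGuid() is asked for a referenced entity it does not hold locally. A minimal sketch of that delegation, with stand-in types:

import java.util.HashMap;
import java.util.Map;

// Illustrative only: mirrors the getByGuid() fallback in AtlasEntityStream.
class DelegatingLookupSketch {
    interface Lookup { String getByGuid(String guid); }

    static Lookup withFallback(Map<String, String> local, Lookup fallback) {
        // Delegate to the wrapped stream when present; otherwise resolve locally,
        // matching "entityStream != null ? entityStream.getByGuid(guid) : ..." above.
        return guid -> (fallback != null) ? fallback.getByGuid(guid) : local.get(guid);
    }

    public static void main(String[] args) {
        Map<String, String> local = new HashMap<>();
        local.put("guid-1", "entity-1");
        Lookup lookup = withFallback(local, null);
        System.out.println(lookup.getByGuid("guid-1")); // entity-1
    }
}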
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.store.graph.v1;
import org.apache.atlas.model.instance.AtlasEntity;
public class AtlasEntityStreamForImport extends AtlasEntityStream implements EntityImportStream {
public AtlasEntityStreamForImport(AtlasEntity entity) {
super(entity);
}
public AtlasEntityStreamForImport(AtlasEntity entity, EntityStream entityStream) {
super(entity, entityStream);
}
}
......@@ -739,28 +739,18 @@ public class EntityGraphMapper {
private AtlasEntityHeader constructHeader(AtlasEntity entity, final AtlasEntityType type, AtlasVertex vertex) {
//TODO - enhance to return only selective attributes
AtlasEntityHeader header = new AtlasEntityHeader(entity.getTypeName(), AtlasGraphUtilsV1.getIdFromVertex(vertex), entity.getAttributes());
final Map<String, AtlasStructType.AtlasAttribute> allAttributes = type.getAllAttributes();
for (String attribute : allAttributes.keySet()) {
AtlasType attributeType = allAttributes.get(attribute).getAttributeType();
AtlasAttributeDef attributeDef = allAttributes.get(attribute).getAttributeDef();
if ( header.getAttribute(attribute) == null && (TypeCategory.PRIMITIVE == attributeType.getTypeCategory())) {
if ( attributeDef.getIsOptional()) {
header.setAttribute(attribute, attributeType.createOptionalDefaultValue());
} else {
header.setAttribute(attribute, attributeType.createDefaultValue());
}
}
AtlasEntityHeader header = new AtlasEntityHeader(entity.getTypeName());
header.setGuid(AtlasGraphUtilsV1.getIdFromVertex(vertex));
for (AtlasAttribute attribute : type.getUniqAttributes().values()) {
header.setAttribute(attribute.getName(), entity.getAttribute(attribute.getName()));
}
return header;
}
public static AtlasEntityHeader constructHeader(AtlasObjectId id) {
AtlasEntityHeader entity = new AtlasEntityHeader(id.getTypeName());
entity.setGuid(id.getGuid());
return entity;
return new AtlasEntityHeader(id.getTypeName(), id.getGuid(), id.getUniqueAttributes());
}
}
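constructHeader() above was slimmed down to carry only the entity's unique attributes instead of defaulting every primitive attribute. A sketch of the same idea over plain maps (the types here are stand-ins, not the Atlas classes):

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

class HeaderSketch {
    // Only identity (unique) attributes travel in the header; everything else is
    // fetched from the store on demand, which keeps mutation responses small.
    static Map<String, Object> constructHeader(Map<String, Object> entityAttributes, Set<String> uniqueAttributeNames) {
        Map<String, Object> header = new HashMap<>();
        for (String name : uniqueAttributeNames) {
            header.put(name, entityAttributes.get(name));
        }
        return header;
    }
}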
......@@ -74,6 +74,7 @@ import static org.apache.atlas.TestUtils.NAME;
import static org.apache.atlas.TestUtils.randomString;
import static org.apache.atlas.TestUtilsV2.TABLE_TYPE;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
@Guice(modules = RepositoryMetadataModule.class)
public class AtlasEntityStoreV1Test {
......@@ -199,8 +200,8 @@ public class AtlasEntityStoreV1Test {
init();
EntityMutationResponse response = entityStore.createOrUpdate(new AtlasEntityStream(entitiesInfo), false);
AtlasEntityHeader updatedTable = response.getFirstUpdatedEntityByTypeName(tableEntity.getTypeName());
validateEntity(entitiesInfo, getEntityFromStore(updatedTable));
AtlasEntityHeader updatedTableHeader = response.getFirstUpdatedEntityByTypeName(tableEntity.getTypeName());
validateEntity(entitiesInfo, getEntityFromStore(updatedTableHeader));
//Complete update. Add array elements - col3,col4
AtlasEntity col3 = TestUtilsV2.createColumnEntity(tableEntity);
......@@ -219,8 +220,8 @@ public class AtlasEntityStoreV1Test {
init();
response = entityStore.createOrUpdate(new AtlasEntityStream(entitiesInfo), false);
updatedTable = response.getFirstUpdatedEntityByTypeName(tableEntity.getTypeName());
validateEntity(entitiesInfo, getEntityFromStore(updatedTable));
updatedTableHeader = response.getFirstUpdatedEntityByTypeName(tableEntity.getTypeName());
validateEntity(entitiesInfo, getEntityFromStore(updatedTableHeader));
//Swap elements
columns.clear();
......@@ -231,8 +232,10 @@ public class AtlasEntityStoreV1Test {
init();
response = entityStore.createOrUpdate(new AtlasEntityStream(entitiesInfo), false);
updatedTable = response.getFirstUpdatedEntityByTypeName(tableEntity.getTypeName());
Assert.assertEquals(((List<AtlasObjectId>) updatedTable.getAttribute(COLUMNS_ATTR_NAME)).size(), 2);
updatedTableHeader = response.getFirstUpdatedEntityByTypeName(tableEntity.getTypeName());
AtlasEntity updatedEntity = getEntityFromStore(updatedTableHeader);
// deleted columns are also included in "columns" attribute
Assert.assertTrue(((List<AtlasObjectId>) updatedEntity.getAttribute(COLUMNS_ATTR_NAME)).size() >= 2);
assertEquals(response.getEntitiesByOperation(EntityMutations.EntityOperation.DELETE).size(), 2); // col1, col2 are deleted
......@@ -242,8 +245,8 @@ public class AtlasEntityStoreV1Test {
init();
response = entityStore.createOrUpdate(new AtlasEntityStream(entitiesInfo), false);
updatedTable = response.getFirstUpdatedEntityByTypeName(tableEntity.getTypeName());
validateEntity(entitiesInfo, getEntityFromStore(updatedTable));
updatedTableHeader = response.getFirstUpdatedEntityByTypeName(tableEntity.getTypeName());
validateEntity(entitiesInfo, getEntityFromStore(updatedTableHeader));
assertEquals(response.getEntitiesByOperation(EntityMutations.EntityOperation.DELETE).size(), 2);
}
......@@ -261,9 +264,10 @@ public class AtlasEntityStoreV1Test {
EntityMutationResponse response = entityStore.createOrUpdate(new AtlasEntityStream(entitiesInfo), false);
AtlasEntityHeader tableDefinition1 = response.getFirstUpdatedEntityByTypeName(TABLE_TYPE);
validateEntity(entitiesInfo, getEntityFromStore(tableDefinition1));
AtlasEntity updatedTableDef1 = getEntityFromStore(tableDefinition1);
validateEntity(entitiesInfo, updatedTableDef1);
Assert.assertTrue(partsMap.get("part0").equals(((Map<String, AtlasStruct>) tableDefinition1.getAttribute("partitionsMap")).get("part0")));
Assert.assertTrue(partsMap.get("part0").equals(((Map<String, AtlasStruct>) updatedTableDef1.getAttribute("partitionsMap")).get("part0")));
//update map - add a map key
partsMap.put("part1", new AtlasStruct(TestUtils.PARTITION_STRUCT_TYPE, TestUtilsV2.NAME, "test1"));
......@@ -273,10 +277,11 @@ public class AtlasEntityStoreV1Test {
response = entityStore.createOrUpdate(new AtlasEntityStream(entitiesInfo), false);
AtlasEntityHeader tableDefinition2 = response.getFirstUpdatedEntityByTypeName(TABLE_TYPE);
validateEntity(entitiesInfo, getEntityFromStore(tableDefinition2));
AtlasEntity updatedTableDef2 = getEntityFromStore(tableDefinition2);
validateEntity(entitiesInfo, updatedTableDef2);
assertEquals(((Map<String, AtlasStruct>) tableDefinition2.getAttribute("partitionsMap")).size(), 2);
Assert.assertTrue(partsMap.get("part1").equals(((Map<String, AtlasStruct>) tableDefinition2.getAttribute("partitionsMap")).get("part1")));
assertEquals(((Map<String, AtlasStruct>) updatedTableDef2.getAttribute("partitionsMap")).size(), 2);
Assert.assertTrue(partsMap.get("part1").equals(((Map<String, AtlasStruct>) updatedTableDef2.getAttribute("partitionsMap")).get("part1")));
//update map - remove a key and add another key
partsMap.remove("part0");
......@@ -287,11 +292,12 @@ public class AtlasEntityStoreV1Test {
response = entityStore.createOrUpdate(new AtlasEntityStream(entitiesInfo), false);
AtlasEntityHeader tableDefinition3 = response.getFirstUpdatedEntityByTypeName(TABLE_TYPE);
validateEntity(entitiesInfo, getEntityFromStore(tableDefinition3));
AtlasEntity updatedTableDef3 = getEntityFromStore(tableDefinition3);
validateEntity(entitiesInfo, updatedTableDef3);
assertEquals(((Map<String, AtlasStruct>) tableDefinition3.getAttribute("partitionsMap")).size(), 2);
Assert.assertNull(((Map<String, AtlasStruct>) tableDefinition3.getAttribute("partitionsMap")).get("part0"));
Assert.assertTrue(partsMap.get("part2").equals(((Map<String, AtlasStruct>) tableDefinition3.getAttribute("partitionsMap")).get("part2")));
assertEquals(((Map<String, AtlasStruct>) updatedTableDef3.getAttribute("partitionsMap")).size(), 2);
Assert.assertNull(((Map<String, AtlasStruct>) updatedTableDef3.getAttribute("partitionsMap")).get("part0"));
Assert.assertTrue(partsMap.get("part2").equals(((Map<String, AtlasStruct>) updatedTableDef3.getAttribute("partitionsMap")).get("part2")));
//update struct value for existing map key
AtlasStruct partition2 = partsMap.get("part2");
......@@ -301,11 +307,12 @@ public class AtlasEntityStoreV1Test {
response = entityStore.createOrUpdate(new AtlasEntityStream(entitiesInfo), false);
AtlasEntityHeader tableDefinition4 = response.getFirstUpdatedEntityByTypeName(TABLE_TYPE);
validateEntity(entitiesInfo, getEntityFromStore(tableDefinition4));
AtlasEntity updatedTableDef4 = getEntityFromStore(tableDefinition4);
validateEntity(entitiesInfo, updatedTableDef4);
assertEquals(((Map<String, AtlasStruct>) tableDefinition4.getAttribute("partitionsMap")).size(), 2);
Assert.assertNull(((Map<String, AtlasStruct>) tableDefinition4.getAttribute("partitionsMap")).get("part0"));
Assert.assertTrue(partsMap.get("part2").equals(((Map<String, AtlasStruct>) tableDefinition4.getAttribute("partitionsMap")).get("part2")));
assertEquals(((Map<String, AtlasStruct>) updatedTableDef4.getAttribute("partitionsMap")).size(), 2);
Assert.assertNull(((Map<String, AtlasStruct>) updatedTableDef4.getAttribute("partitionsMap")).get("part0"));
Assert.assertTrue(partsMap.get("part2").equals(((Map<String, AtlasStruct>) updatedTableDef4.getAttribute("partitionsMap")).get("part2")));
//Test map pointing to a class
......@@ -523,8 +530,9 @@ public class AtlasEntityStoreV1Test {
response = entityStore.createOrUpdate(new InMemoryMapEntityStream(tableCloneMap), false);
final AtlasEntityHeader tableDefinition = response.getFirstUpdatedEntityByTypeName(TABLE_TYPE);
Assert.assertNotNull(tableDefinition.getAttribute("database"));
Assert.assertEquals(((AtlasObjectId) tableDefinition.getAttribute("database")).getGuid(), dbCreated.getGuid());
AtlasEntity updatedTableDefinition = getEntityFromStore(tableDefinition);
Assert.assertNotNull(updatedTableDefinition.getAttribute("database"));
Assert.assertEquals(((AtlasObjectId) updatedTableDefinition.getAttribute("database")).getGuid(), dbCreated.getGuid());
}
@Test
......@@ -534,7 +542,7 @@ public class AtlasEntityStoreV1Test {
init();
EntityMutationResponse response = entityStore.createOrUpdate(new AtlasEntityStream(dbEntity), false);
AtlasEntityHeader firstEntityCreated = response.getFirstCreatedEntityByTypeName(TestUtilsV2.DATABASE_TYPE);
AtlasEntity firstEntityCreated = getEntityFromStore(response.getFirstCreatedEntityByTypeName(TestUtilsV2.DATABASE_TYPE));
//The optional boolean attribute should have a non-null value
final String isReplicatedAttr = "isReplicated";
......@@ -552,7 +560,7 @@ public class AtlasEntityStoreV1Test {
init();
response = entityStore.createOrUpdate(new AtlasEntityStream(dbEntity), false);
AtlasEntityHeader firstEntityUpdated = response.getFirstUpdatedEntityByTypeName(TestUtilsV2.DATABASE_TYPE);
AtlasEntity firstEntityUpdated = getEntityFromStore(response.getFirstUpdatedEntityByTypeName(TestUtilsV2.DATABASE_TYPE));
Assert.assertNotNull(firstEntityUpdated);
Assert.assertNotNull(firstEntityUpdated.getAttribute(isReplicatedAttr));
......@@ -736,8 +744,9 @@ public class AtlasEntityStoreV1Test {
tblHeader = response.getFirstEntityPartialUpdated();
AtlasEntity updatedTblEntity = getEntityFromStore(tblHeader);
columns = (List<AtlasObjectId>) tblHeader.getAttribute(TestUtilsV2.COLUMNS_ATTR_NAME);
assertEquals(columns.size(), 2);
columns = (List<AtlasObjectId>) updatedTblEntity.getAttribute(TestUtilsV2.COLUMNS_ATTR_NAME);
// deleted columns are included in the attribute; hence use >=
assertTrue(columns.size() >= 2);
}
@Test
......@@ -867,7 +876,7 @@ public class AtlasEntityStoreV1Test {
if (MapUtils.isNotEmpty(expectedMap)) {
Assert.assertTrue(MapUtils.isNotEmpty(actualMap));
//actual map could have deleted entities. Hence size may not match.
// deleted entries are included in the attribute; hence use >=
Assert.assertTrue(actualMap.size() >= expectedMap.size());
for (Object key : expectedMap.keySet()) {
......
......@@ -20,7 +20,7 @@ package org.apache.atlas.web.resources;
import com.google.inject.Inject;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasException;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.impexp.AtlasExportRequest;
import org.apache.atlas.model.impexp.AtlasExportResult;
......@@ -59,6 +59,7 @@ import javax.ws.rs.core.Response;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.locks.ReentrantLock;
import static org.apache.atlas.repository.converters.AtlasInstanceConverter.toAtlasBaseException;
......@@ -76,6 +77,8 @@ public class AdminResource {
@Context
private HttpServletResponse httpServletResponse;
private final ReentrantLock importExportOperationLock;
private static final String isCSRF_ENABLED = "atlas.rest-csrf.enabled";
private static final String BROWSER_USER_AGENT_PARAM = "atlas.rest-csrf.browser-useragents-regex";
private static final String CUSTOM_METHODS_TO_IGNORE_PARAM = "atlas.rest-csrf.methods-to-ignore";
......@@ -102,6 +105,7 @@ public class AdminResource {
this.typeRegistry = typeRegistry;
this.typesDefStore = typeDefStore;
this.entityStore = entityStore;
this.importExportOperationLock = new ReentrantLock();
}
/**
......@@ -275,6 +279,10 @@ public class AdminResource {
return metrics;
}
private void releaseExportImportLock() {
importExportOperationLock.unlock();
}
@POST
@Path("/export")
@Consumes(Servlets.JSON_MEDIA_TYPE)
......@@ -283,6 +291,8 @@ public class AdminResource {
LOG.debug("==> AdminResource.export()");
}
acquireExportImportLock("export");
ZipSink exportSink = null;
try {
exportSink = new ZipSink();
......@@ -308,6 +318,8 @@ public class AdminResource {
throw new AtlasBaseException(excp);
} finally {
releaseExportImportLock();
if (exportSink != null) {
exportSink.close();
}
......@@ -327,6 +339,8 @@ public class AdminResource {
LOG.debug("==> AdminResource.importData(bytes.length={})", bytes.length);
}
acquireExportImportLock("import");
AtlasImportResult result;
try {
......@@ -344,6 +358,8 @@ public class AdminResource {
throw new AtlasBaseException(excp);
} finally {
releaseExportImportLock();
if (LOG.isDebugEnabled()) {
LOG.debug("<== AdminResource.importData(binary)");
}
......@@ -360,6 +376,8 @@ public class AdminResource {
LOG.debug("==> AdminResource.importFile()");
}
acquireExportImportLock("importFile");
AtlasImportResult result;
try {
......@@ -374,6 +392,8 @@ public class AdminResource {
throw new AtlasBaseException(excp);
} finally {
releaseExportImportLock();
if (LOG.isDebugEnabled()) {
LOG.debug("<== AdminResource.importFile()");
}
......@@ -407,4 +427,15 @@ public class AdminResource {
return ret;
}
private void acquireExportImportLock(String activity) throws AtlasBaseException {
// tryLock() makes check-and-acquire atomic; checking isLocked() and then calling
// lock() would let a second request block behind the first instead of failing fast.
if (!importExportOperationLock.tryLock()) {
LOG.warn("Another export or import is currently in progress; aborting this {} request on thread {}", activity, Thread.currentThread().getName());
throw new AtlasBaseException(AtlasErrorCode.FAILED_TO_OBTAIN_IMPORT_EXPORT_LOCK);
}
}
}
}
\ No newline at end of file
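The AdminResource changes above serialize import and export behind a single ReentrantLock, acquired before each handler's try block and released in its finally block. A minimal sketch of that gate, assuming tryLock() for an atomic check-and-acquire (names are illustrative, not AdminResource's API):

import java.util.concurrent.locks.ReentrantLock;

class ImportExportGate {
    private final ReentrantLock lock = new ReentrantLock();

    void run(String activity, Runnable operation) {
        // Fail fast instead of queueing behind a long-running operation.
        if (!lock.tryLock()) {
            throw new IllegalStateException("Another import or export is in progress: " + activity);
        }
        try {
            operation.run();
        } finally {
            lock.unlock(); // mirrors releaseExportImportLock() in the finally blocks above
        }
    }
}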
......@@ -75,7 +75,7 @@ public class ExportService {
public AtlasExportResult run(ZipSink exportSink, AtlasExportRequest request, String userName, String hostName,
String requestingIP) throws AtlasBaseException {
long startTimestamp = System.currentTimeMillis();
ExportContext context = new ExportContext(new AtlasExportResult(request, userName, hostName, requestingIP,
System.currentTimeMillis()), exportSink);
......@@ -90,6 +90,9 @@ public class ExportService {
context.sink.setTypesDef(context.result.getData().getTypesDef());
context.result.setData(null);
context.result.setOperationStatus(AtlasExportResult.OperationStatus.SUCCESS);
long endTimestamp = System.currentTimeMillis();
context.result.incrementMeticsCounter("duration", (int) (endTimestamp - startTimestamp));
context.sink.setResult(context.result);
} catch(Exception ex) {
LOG.error("Operation failed: ", ex);
......@@ -175,7 +178,7 @@ public class ExportService {
context.sink.add(entity);
context.result.incrementMeticsCounter(String.format("entity:%s", entity.getTypeName()));
context.result.incrementMeticsCounter("Entities");
context.result.incrementMeticsCounter("entities");
if (context.guidsProcessed.size() % 10 == 0) {
LOG.info("export(): in progress.. number of entities exported: {}", context.guidsProcessed.size());
......@@ -195,7 +198,7 @@ public class ExportService {
AtlasClassificationDef cd = typeRegistry.getClassificationDefByName(c.getTypeName());
typesDef.getClassificationDefs().add(cd);
result.incrementMeticsCounter("Classification");
result.incrementMeticsCounter("typedef:classification");
}
}
}
......@@ -208,7 +211,7 @@ public class ExportService {
AtlasEntityDef typeDefinition = typeRegistry.getEntityDefByName(typeName);
typesDef.getEntityDefs().add(typeDefinition);
result.incrementMeticsCounter("Type(s)");
result.incrementMeticsCounter("typedef:" + typeDefinition.getName());
}
}
......
......@@ -20,6 +20,7 @@ package org.apache.atlas.web.resources;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.model.impexp.AtlasImportRequest;
import org.apache.atlas.model.impexp.AtlasImportResult;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.model.typedef.*;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.commons.io.FileUtils;
......@@ -31,6 +32,7 @@ import org.slf4j.LoggerFactory;
import java.io.*;
import java.util.List;
import java.util.concurrent.TimeUnit;
public class ImportService {
......@@ -39,6 +41,9 @@ public class ImportService {
private final AtlasTypeDefStore typeDefStore;
private final AtlasEntityStore entityStore;
private long startTimestamp;
private long endTimestamp;
public ImportService(final AtlasTypeDefStore typeDefStore, final AtlasEntityStore entityStore) {
this.typeDefStore = typeDefStore;
......@@ -52,6 +57,7 @@ public class ImportService {
try {
LOG.info("==> import(user={}, from={})", userName, requestingIP);
startTimestamp = System.currentTimeMillis();
processTypes(source.getTypesDef(), result);
processEntities(source, result);
......@@ -65,12 +71,7 @@ public class ImportService {
throw new AtlasBaseException(excp);
} finally {
try {
source.close();
} catch (IOException e) {
// ignore
}
LOG.info("<== import(user={}, from={}): status={}", userName, requestingIP, result.getOperationStatus());
}
......@@ -118,10 +119,14 @@ public class ImportService {
setGuidToEmpty(typeDefinitionMap.getEntityDefs());
typeDefStore.updateTypesDef(typeDefinitionMap);
result.incrementMeticsCounter("Enum(s)", typeDefinitionMap.getEnumDefs().size());
result.incrementMeticsCounter("Struct(s)", typeDefinitionMap.getStructDefs().size());
result.incrementMeticsCounter("Classification(s)", typeDefinitionMap.getClassificationDefs().size());
result.incrementMeticsCounter("Entity definition(s)", typeDefinitionMap.getEntityDefs().size());
updateMetricsForTypesDef(typeDefinitionMap, result);
}
private void updateMetricsForTypesDef(AtlasTypesDef typeDefinitionMap, AtlasImportResult result) {
result.incrementMeticsCounter("typedef:classification", typeDefinitionMap.getClassificationDefs().size());
result.incrementMeticsCounter("typedef:enum", typeDefinitionMap.getEnumDefs().size());
result.incrementMeticsCounter("typedef:entitydef", typeDefinitionMap.getEntityDefs().size());
result.incrementMeticsCounter("typedef:struct", typeDefinitionMap.getStructDefs().size());
}
private void setGuidToEmpty(List<AtlasEntityDef> entityDefList) {
......@@ -131,7 +136,9 @@ public class ImportService {
}
private void processEntities(ZipSource importSource, AtlasImportResult result) throws AtlasBaseException {
this.entityStore.createOrUpdate(importSource, false);
result.incrementMeticsCounter("Entities", importSource.getCreationOrder().size());
this.entityStore.bulkImport(importSource, result);
endTimestamp = System.currentTimeMillis();
result.incrementMeticsCounter("Duration", (int) (this.endTimestamp - this.startTimestamp));
}
}
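updateMetricsForTypesDef() above normalizes metric keys to a "typedef:<kind>" scheme, consistent with the "entity:<type>:<operation>" keys used by bulkImport(). A sketch of the same convention over a plain map (a stand-in for AtlasImportResult's metrics):

import java.util.HashMap;
import java.util.Map;

class TypeDefMetricsSketch {
    static void update(Map<String, Integer> metrics, int enums, int structs, int classifications, int entityDefs) {
        metrics.merge("typedef:enum", enums, Integer::sum);
        metrics.merge("typedef:struct", structs, Integer::sum);
        metrics.merge("typedef:classification", classifications, Integer::sum);
        metrics.merge("typedef:entitydef", entityDefs, Integer::sum);
    }
}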
......@@ -19,7 +19,6 @@ package org.apache.atlas.web.resources;
import org.codehaus.jackson.type.TypeReference;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.impexp.AtlasExportResult;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.apache.atlas.repository.store.graph.v1.EntityImportStream;
......@@ -28,8 +27,10 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
......@@ -40,100 +41,70 @@ public class ZipSource implements EntityImportStream {
private final ByteArrayInputStream inputStream;
private List<String> creationOrder;
private Iterator<String> iterator;
private Map<String, String> guidEntityJsonMap;
public ZipSource(ByteArrayInputStream inputStream) {
public ZipSource(ByteArrayInputStream inputStream) throws IOException {
this.inputStream = inputStream;
guidEntityJsonMap = new HashMap<>();
updateGuidZipEntryMap();
this.setCreationOrder();
}
public AtlasTypesDef getTypesDef() throws AtlasBaseException {
final String fileName = ZipExportFileNames.ATLAS_TYPESDEF_NAME.toString();
try {
String s = get(fileName);
String s = getFromCache(fileName);
return convertFromJson(AtlasTypesDef.class, s);
} catch (IOException e) {
LOG.error(String.format("Error retrieving '%s' from zip.", fileName), e);
return null;
}
}
public AtlasExportResult getExportResult() throws AtlasBaseException {
String fileName = ZipExportFileNames.ATLAS_EXPORT_INFO_NAME.toString();
try {
String s = get(fileName);
return convertFromJson(AtlasExportResult.class, s);
} catch (IOException e) {
LOG.error(String.format("Error retrieving '%s' from zip.", fileName), e);
return null;
}
}
private void setCreationOrder() {
String fileName = ZipExportFileNames.ATLAS_EXPORT_ORDER_NAME.toString();
try {
String s = get(fileName);
String s = getFromCache(fileName);
this.creationOrder = convertFromJson(new TypeReference<List<String>>(){}, s);
this.iterator = this.creationOrder.iterator();
} catch (IOException e) {
LOG.error(String.format("Error retrieving '%s' from zip.", fileName), e);
} catch (AtlasBaseException e) {
LOG.error(String.format("Error retrieving '%s' from zip.", fileName), e);
}
}
public List<String> getCreationOrder() throws AtlasBaseException {
return this.creationOrder;
}
public AtlasEntity getEntity(String guid) throws AtlasBaseException {
try {
String s = get(guid);
return convertFromJson(AtlasEntity.class, s);
} catch (IOException e) {
LOG.error(String.format("Error retrieving '%s' from zip.", guid), e);
return null;
}
}
private String get(String entryName) throws IOException {
String ret = "";
private void updateGuidZipEntryMap() throws IOException {
inputStream.reset();
ZipInputStream zipInputStream = new ZipInputStream(inputStream);
ZipEntry zipEntry = zipInputStream.getNextEntry();
entryName = entryName + ".json";
while (zipEntry != null) {
if (zipEntry.getName().equals(entryName)) {
break;
}
String entryName = zipEntry.getName().replace(".json", "");
zipEntry = zipInputStream.getNextEntry();
}
if (guidEntityJsonMap.containsKey(entryName)) continue;
if (zipEntry == null) continue;
if (zipEntry != null) {
ByteArrayOutputStream os = new ByteArrayOutputStream();
byte[] buf = new byte[1024];
int n = 0;
ByteArrayOutputStream bos = new ByteArrayOutputStream();
while ((n = zipInputStream.read(buf, 0, 1024)) > -1) {
os.write(buf, 0, n);
bos.write(buf, 0, n);
}
ret = os.toString();
} else {
LOG.warn("{}: no such entry in zip file", entryName);
guidEntityJsonMap.put(entryName, bos.toString());
zipEntry = zipInputStream.getNextEntry();
}
zipInputStream.close();
}
return ret;
public List<String> getCreationOrder() throws AtlasBaseException {
return this.creationOrder;
}
public AtlasEntity getEntity(String guid) throws AtlasBaseException {
String s = getFromCache(guid);
return convertFromJson(AtlasEntity.class, s);
}
private <T> T convertFromJson(TypeReference clazz, String jsonData) throws AtlasBaseException {
......@@ -158,8 +129,20 @@ public class ZipSource implements EntityImportStream {
}
}
public void close() throws IOException {
private String getFromCache(String entryName) {
if (!guidEntityJsonMap.containsKey(entryName)) return "";
return guidEntityJsonMap.get(entryName);
}
public void close() {
try {
inputStream.close();
guidEntityJsonMap.clear();
}
catch (IOException ex) {
LOG.warn("Error closing streams", ex);
}
}
@Override
......
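The core of this optimization in ZipSource: updateGuidZipEntryMap() scans the archive once and caches every entry's JSON by GUID, replacing the old get() that re-read the zip stream on every lookup. A self-contained sketch of that single-pass cache (names are illustrative):

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;

class ZipCacheSketch {
    // Read every zip entry once into a name -> content map, instead of rescanning
    // the archive for each lookup.
    static Map<String, String> buildCache(InputStream in) throws IOException {
        Map<String, String> cache = new HashMap<>();
        ZipInputStream zis = new ZipInputStream(in);
        for (ZipEntry e = zis.getNextEntry(); e != null; e = zis.getNextEntry()) {
            String name = e.getName().replace(".json", "");
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            byte[] buf = new byte[1024];
            for (int n; (n = zis.read(buf)) > -1; ) {
                bos.write(buf, 0, n);
            }
            cache.put(name, bos.toString("UTF-8"));
        }
        zis.close();
        return cache;
    }
}

Trading memory for I/O this way is what makes repeated getByGuid() lookups during bulkImport() cheap.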