Commit f4574531 by Ashutosh Mestry

ATLAS-3212: Importing with import transforms applied to exising entities.

parent b0063fa5
......@@ -25,4 +25,5 @@ import java.lang.annotation.Target;
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface GraphTransaction {
boolean logRollback() default true;
}
......@@ -20,6 +20,7 @@ package org.apache.atlas;
import com.google.common.annotations.VisibleForTesting;
import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;
import org.apache.atlas.annotation.GraphTransaction;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.exception.NotFoundException;
import org.apache.atlas.repository.graphdb.AtlasGraph;
......@@ -63,6 +64,7 @@ public class GraphTransactionInterceptor implements MethodInterceptor {
Method method = invocation.getMethod();
String invokingClass = method.getDeclaringClass().getSimpleName();
String invokedMethodName = method.getName();
boolean logRollback = method.getAnnotation(GraphTransaction.class).logRollback();
boolean isInnerTxn = isTxnOpen.get();
// Outermost txn marks any subsequent transaction as inner
......@@ -99,7 +101,7 @@ public class GraphTransactionInterceptor implements MethodInterceptor {
}
innerFailure.set(true);
} else {
doRollback(t);
doRollback(logRollback, t);
}
throw t;
}
......@@ -159,12 +161,15 @@ public class GraphTransactionInterceptor implements MethodInterceptor {
}
}
private void doRollback(final Throwable t) {
private void doRollback(boolean logRollback, final Throwable t) {
if (logRollback) {
if (logException(t)) {
LOG.error("graph rollback due to exception ", t);
} else {
LOG.error("graph rollback due to exception {}:{}", t.getClass().getSimpleName(), t.getMessage());
}
}
graph.rollback();
}
......
......@@ -276,7 +276,7 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {
}
@Override
@GraphTransaction
@GraphTransaction(logRollback = false)
public EntityMutationResponse createOrUpdateForImport(EntityStream entityStream) throws AtlasBaseException {
return createOrUpdate(entityStream, false, true);
}
......
......@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
......@@ -22,7 +22,6 @@ import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.RequestContext;
import org.apache.atlas.annotation.GraphTransaction;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.impexp.AtlasExportRequest;
import org.apache.atlas.model.impexp.AtlasImportResult;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
......@@ -30,14 +29,12 @@ import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasSchemaViolationException;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.impexp.StartEntityFetchByExportRequest;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.repository.store.graph.BulkImporter;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.util.AtlasGremlinQueryProvider;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -48,7 +45,6 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.apache.atlas.repository.Constants.HISTORICAL_GUID_PROPERTY_KEY;
......@@ -59,15 +55,14 @@ public class BulkImporterImpl implements BulkImporter {
private final AtlasEntityStore entityStore;
private final EntityGraphRetriever entityGraphRetriever;
private final Map<AtlasObjectId, String> objectIdExistingGuidMap;
private final StartEntityFetchByExportRequest startEntityFetchByExportRequest;
private AtlasTypeRegistry typeRegistry;
private final int MAX_ATTEMPTS = 2;
@Inject
public BulkImporterImpl(AtlasGraph atlasGraph, AtlasEntityStore entityStore, AtlasTypeRegistry typeRegistry) {
public BulkImporterImpl(AtlasEntityStore entityStore, AtlasTypeRegistry typeRegistry) {
this.entityStore = entityStore;
this.entityGraphRetriever = new EntityGraphRetriever(typeRegistry);
this.objectIdExistingGuidMap = new HashMap<>();
this.startEntityFetchByExportRequest = new StartEntityFetchByExportRequest(atlasGraph, typeRegistry, AtlasGremlinQueryProvider.INSTANCE);
this.typeRegistry = typeRegistry;
}
@Override
......@@ -80,7 +75,6 @@ public class BulkImporterImpl implements BulkImporter {
throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "no entities to create/update.");
}
fetchExistingEntitiesNotCreatedByImport(importResult.getExportResult().getRequest());
EntityMutationResponse ret = new EntityMutationResponse();
ret.setGuidAssignments(new HashMap<>());
......@@ -98,40 +92,51 @@ public class BulkImporterImpl implements BulkImporter {
continue;
}
AtlasEntityStreamForImport oneEntityStream = new AtlasEntityStreamForImport(entityWithExtInfo, entityStream);
for (int attempt = 0; attempt < MAX_ATTEMPTS; attempt++) {
try {
AtlasEntityStreamForImport oneEntityStream = new AtlasEntityStreamForImport(entityWithExtInfo, null);
EntityMutationResponse resp = entityStore.createOrUpdateForImport(oneEntityStream);
if (resp.getGuidAssignments() != null) {
ret.getGuidAssignments().putAll(resp.getGuidAssignments());
}
currentPercent = updateImportMetrics(entityWithExtInfo, resp, importResult, processedGuids, entityStream.getPosition(), entityImportStreamWithResidualList.getStreamSize(), currentPercent);
currentPercent = updateImportMetrics(entityWithExtInfo, resp, importResult, processedGuids,
entityStream.getPosition(),
entityImportStreamWithResidualList.getStreamSize(),
currentPercent);
entityStream.onImportComplete(entity.getGuid());
break;
} catch (AtlasBaseException e) {
if (!updateResidualList(e, residualList, entityWithExtInfo.getEntity().getGuid())) {
throw e;
}
break;
} catch (AtlasSchemaViolationException e) {
AtlasObjectId objectId = entityGraphRetriever.toAtlasObjectIdWithoutGuid(entity);
if (objectIdExistingGuidMap.containsKey(objectId)) {
updateVertexGuidIfImportingToNonImportedEntity(entity, objectId);
if (LOG.isDebugEnabled()) {
LOG.debug("Entity: {}", entity.getGuid(), e);
}
continue;
if (attempt == 0) {
updateVertexGuid(entity);
} else {
LOG.error("Guid update failed: {}", entityWithExtInfo.getEntity().getGuid());
throw e;
}
catch (Throwable e) {
} catch (Throwable e) {
AtlasBaseException abe = new AtlasBaseException(e);
if (!updateResidualList(abe, residualList, entityWithExtInfo.getEntity().getGuid())) {
throw abe;
}
LOG.warn("Exception: {}", entity.getGuid(), e);
break;
} finally {
RequestContext.get().clearCache();
}
}
}
importResult.getProcessedEntities().addAll(processedGuids);
LOG.info("bulkImport(): done. Total number of entities (including referred entities) imported: {}", processedGuids.size());
......@@ -139,25 +144,21 @@ public class BulkImporterImpl implements BulkImporter {
return ret;
}
private void fetchExistingEntitiesNotCreatedByImport(AtlasExportRequest request) {
List<AtlasObjectId> objectIds = startEntityFetchByExportRequest.get(request);
if (objectIds.isEmpty()) {
return;
}
for (AtlasObjectId objectId : objectIds) {
String existingGuid = objectId.getGuid();
objectId.setGuid(null);
@GraphTransaction
public void updateVertexGuid(AtlasEntity entity) {
String entityGuid = entity.getGuid();
AtlasObjectId objectId = entityGraphRetriever.toAtlasObjectIdWithoutGuid(entity);
objectIdExistingGuidMap.put(objectId, existingGuid);
}
AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName());
String vertexGuid = null;
try {
vertexGuid = AtlasGraphUtilsV2.getGuidByUniqueAttributes(entityType, objectId.getUniqueAttributes());
} catch (AtlasBaseException e) {
LOG.warn("Entity: {}: Does not exist!", objectId);
return;
}
@GraphTransaction
public void updateVertexGuidIfImportingToNonImportedEntity(AtlasEntity entity, AtlasObjectId objectId) {
String entityGuid = entity.getGuid();
String vertexGuid = objectIdExistingGuidMap.get(objectId);
if (vertexGuid.equals(entityGuid)) {
if (StringUtils.isEmpty(vertexGuid) || vertexGuid.equals(entityGuid)) {
return;
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment