Commit 3de30f55 by Ashutosh Mestry

ATLAS-3799: EntityConumer only adds entity GUIDs that are added when they were produced.

parent 214f2a76
......@@ -59,7 +59,7 @@ public class EntityConsumer extends WorkItemConsumer<AtlasEntity.AtlasEntityWith
private final EntityGraphRetriever entityRetrieverBulk;
private List<AtlasEntity.AtlasEntityWithExtInfo> entityBuffer = new ArrayList<>();
private List<EntityMutationResponse> localResults = new ArrayList<>();
private List<String> localResults = new ArrayList<>();
public EntityConsumer(AtlasTypeRegistry typeRegistry,
AtlasGraph atlasGraph, AtlasEntityStore entityStore,
......@@ -119,7 +119,7 @@ public class EntityConsumer extends WorkItemConsumer<AtlasEntity.AtlasEntityWith
private void importUsingBulkEntityStore(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) throws AtlasBaseException {
EntityStream oneEntityStream = new AtlasEntityStreamForImport(entityWithExtInfo, null);
EntityMutationResponse result = entityStoreBulk.createOrUpdateForImportNoCommit(oneEntityStream);
localResults.add(result);
localResults.add(entityWithExtInfo.getEntity().getGuid());
entityBuffer.add(entityWithExtInfo);
}
......@@ -133,9 +133,9 @@ public class EntityConsumer extends WorkItemConsumer<AtlasEntity.AtlasEntityWith
try {
LOG.info("Regular: EntityStore: {}: Starting...", this.counter.get());
AtlasEntityStreamForImport oneEntityStream = new AtlasEntityStreamForImport(entityWithExtInfo, null);
EntityMutationResponse result = this.entityStore.createOrUpdateForImportNoCommit(oneEntityStream);
this.entityStore.createOrUpdateForImportNoCommit(oneEntityStream);
atlasGraph.commit();
localResults.add(result);
localResults.add(entityWithExtInfo.getEntity().getGuid());
dispatchResults();
} catch (Exception e) {
atlasGraph.rollback();
......@@ -244,12 +244,7 @@ public class EntityConsumer extends WorkItemConsumer<AtlasEntity.AtlasEntityWith
}
private void dispatchResults() {
localResults.stream().forEach(x -> {
addResultsFromResponse(x.getCreatedEntities());
addResultsFromResponse(x.getUpdatedEntities());
addResultsFromResponse(x.getDeletedEntities());
});
localResults.stream().forEach(x -> addResult(x));
clear();
}
......@@ -261,16 +256,6 @@ public class EntityConsumer extends WorkItemConsumer<AtlasEntity.AtlasEntityWith
}
}
private void addResultsFromResponse(List<AtlasEntityHeader> entities) {
if (CollectionUtils.isEmpty(entities)) {
return;
}
for (AtlasEntityHeader eh : entities) {
addResult(eh.getGuid());
}
}
private void clear() {
localResults.clear();
entityBuffer.clear();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment