Commit 24a106b4 by ashutoshm Committed by Madhan Neethiraj

ATLAS-1948: export fix to correct the import order

parent cfb6b84f
...@@ -98,7 +98,6 @@ public class ExportService { ...@@ -98,7 +98,6 @@ public class ExportService {
AtlasExportResult.OperationStatus[] statuses = processItems(request, context); AtlasExportResult.OperationStatus[] statuses = processItems(request, context);
processTypesDef(context); processTypesDef(context);
updateSinkWithOperationMetrics(context, statuses, getOperationDuration(startTime)); updateSinkWithOperationMetrics(context, statuses, getOperationDuration(startTime));
} catch(Exception ex) { } catch(Exception ex) {
LOG.error("Operation failed: ", ex); LOG.error("Operation failed: ", ex);
...@@ -113,6 +112,7 @@ public class ExportService { ...@@ -113,6 +112,7 @@ public class ExportService {
} }
private void updateSinkWithOperationMetrics(ExportContext context, AtlasExportResult.OperationStatus[] statuses, int duration) throws AtlasBaseException { private void updateSinkWithOperationMetrics(ExportContext context, AtlasExportResult.OperationStatus[] statuses, int duration) throws AtlasBaseException {
context.result.getData().getEntityCreationOrder().addAll(context.lineageProcessed);
context.sink.setExportOrder(context.result.getData().getEntityCreationOrder()); context.sink.setExportOrder(context.result.getData().getEntityCreationOrder());
context.sink.setTypesDef(context.result.getData().getTypesDef()); context.sink.setTypesDef(context.result.getData().getTypesDef());
clearContextData(context); clearContextData(context);
...@@ -201,9 +201,10 @@ public class ExportService { ...@@ -201,9 +201,10 @@ public class ExportService {
processEntity(guid, context); processEntity(guid, context);
} }
if (!context.guidsLineageToProcess.isEmpty()) { if (!context.lineageToProcess.isEmpty()) {
context.guidsToProcess.addAll(context.guidsLineageToProcess); context.guidsToProcess.addAll(context.lineageToProcess);
context.guidsLineageToProcess.clear(); context.lineageProcessed.addAll(context.lineageToProcess.getList());
context.lineageToProcess.clear();
} }
} }
} catch (AtlasBaseException excp) { } catch (AtlasBaseException excp) {
...@@ -295,7 +296,9 @@ public class ExportService { ...@@ -295,7 +296,9 @@ public class ExportService {
TraversalDirection direction = context.guidDirection.get(guid); TraversalDirection direction = context.guidDirection.get(guid);
AtlasEntityWithExtInfo entityWithExtInfo = entityGraphRetriever.toAtlasEntityWithExtInfo(guid); AtlasEntityWithExtInfo entityWithExtInfo = entityGraphRetriever.toAtlasEntityWithExtInfo(guid);
if(!context.lineageProcessed.contains(guid)) {
context.result.getData().getEntityCreationOrder().add(entityWithExtInfo.getEntity().getGuid()); context.result.getData().getEntityCreationOrder().add(entityWithExtInfo.getEntity().getGuid());
}
addEntity(entityWithExtInfo, context); addEntity(entityWithExtInfo, context);
addTypes(entityWithExtInfo.getEntity(), context); addTypes(entityWithExtInfo.getEntity(), context);
...@@ -651,13 +654,18 @@ public class ExportService { ...@@ -651,13 +654,18 @@ public class ExportService {
list.clear(); list.clear();
set.clear(); set.clear();
} }
public List<T> getList() {
return list;
}
} }
private class ExportContext { private class ExportContext {
final Set<String> guidsProcessed = new HashSet<>(); final Set<String> guidsProcessed = new HashSet<>();
final UniqueList<String> guidsToProcess = new UniqueList<>(); final UniqueList<String> guidsToProcess = new UniqueList<>();
final UniqueList<String> guidsLineageToProcess = new UniqueList<>(); final UniqueList<String> lineageToProcess = new UniqueList<>();
final Set<String> lineageProcessed = new HashSet<>();
final Map<String, TraversalDirection> guidDirection = new HashMap<>(); final Map<String, TraversalDirection> guidDirection = new HashMap<>();
final Set<String> entityTypes = new HashSet<>(); final Set<String> entityTypes = new HashSet<>();
final Set<String> classificationTypes = new HashSet<>(); final Set<String> classificationTypes = new HashSet<>();
...@@ -719,7 +727,7 @@ public class ExportService { ...@@ -719,7 +727,7 @@ public class ExportService {
} }
if(isSuperTypeProcess) { if(isSuperTypeProcess) {
guidsLineageToProcess.add(guid); lineageToProcess.add(guid);
} }
guidDirection.put(guid, direction); guidDirection.put(guid, direction);
......
...@@ -196,7 +196,7 @@ public class ZipSource implements EntityImportStream { ...@@ -196,7 +196,7 @@ public class ZipSource implements EntityImportStream {
AtlasEntity entity = getEntity(guid); AtlasEntity entity = getEntity(guid);
return entity; return entity;
} catch (AtlasBaseException e) { } catch (AtlasBaseException e) {
e.printStackTrace(); LOG.error("getByGuid: {} failed!", guid, e);
return null; return null;
} }
} }
......
...@@ -6,9 +6,9 @@ ...@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the * to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance * "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at * with the License. You may obtain a copy of the License at
* * <p>
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* * <p>
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
...@@ -161,28 +161,35 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore { ...@@ -161,28 +161,35 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
ret.setGuidAssignments(new HashMap<String, String>()); ret.setGuidAssignments(new HashMap<String, String>());
Set<String> processedGuids = new HashSet<>(); Set<String> processedGuids = new HashSet<>();
int streamSize = entityStream.size();
float currentPercent = 0f; float currentPercent = 0f;
while (entityStream.hasNext()) { List<String> residualList = new ArrayList<>();
AtlasEntityWithExtInfo entityWithExtInfo = entityStream.getNextEntityWithExtInfo(); EntityImportStreamWithResidualList entityImportStreamWithResidualList = new EntityImportStreamWithResidualList(entityStream, residualList);
while (entityImportStreamWithResidualList.hasNext()) {
AtlasEntityWithExtInfo entityWithExtInfo = entityImportStreamWithResidualList.getNextEntityWithExtInfo();
AtlasEntity entity = entityWithExtInfo != null ? entityWithExtInfo.getEntity() : null; AtlasEntity entity = entityWithExtInfo != null ? entityWithExtInfo.getEntity() : null;
if(entity == null || processedGuids.contains(entity.getGuid())) { if (entity == null || processedGuids.contains(entity.getGuid())) {
continue; continue;
} }
AtlasEntityStreamForImport oneEntityStream = new AtlasEntityStreamForImport(entityWithExtInfo, entityStream); AtlasEntityStreamForImport oneEntityStream = new AtlasEntityStreamForImport(entityWithExtInfo, entityStream);
try {
EntityMutationResponse resp = createOrUpdate(oneEntityStream, false, true); EntityMutationResponse resp = createOrUpdate(oneEntityStream, false, true);
currentPercent = updateImportMetrics(entityWithExtInfo, resp, importResult, processedGuids,
entityStream.getPosition(), streamSize, currentPercent);
if (resp.getGuidAssignments() != null) { if (resp.getGuidAssignments() != null) {
ret.getGuidAssignments().putAll(resp.getGuidAssignments()); ret.getGuidAssignments().putAll(resp.getGuidAssignments());
} }
currentPercent = updateImportMetrics(entityWithExtInfo, resp, importResult, processedGuids, entityStream.getPosition(),
entityImportStreamWithResidualList.getStreamSize(), currentPercent);
entityStream.onImportComplete(entity.getGuid()); entityStream.onImportComplete(entity.getGuid());
} catch (AtlasBaseException e) {
if (!updateResidualList(e, residualList, entityWithExtInfo.getEntity().getGuid())) {
throw e;
}
}
} }
importResult.getProcessedEntities().addAll(processedGuids); importResult.getProcessedEntities().addAll(processedGuids);
...@@ -191,12 +198,20 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore { ...@@ -191,12 +198,20 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
return ret; return ret;
} }
private boolean updateResidualList(AtlasBaseException e, List<String> lineageList, String guid) {
if (!e.getAtlasErrorCode().getErrorCode().equals(AtlasErrorCode.INVALID_OBJECT_ID.getErrorCode())) {
return false;
}
lineageList.add(guid);
return true;
}
private float updateImportMetrics(AtlasEntityWithExtInfo currentEntity, private float updateImportMetrics(AtlasEntityWithExtInfo currentEntity,
EntityMutationResponse resp, EntityMutationResponse resp,
AtlasImportResult importResult, AtlasImportResult importResult,
Set<String> processedGuids, Set<String> processedGuids,
int currentIndex, int streamSize, float currentPercent) { int currentIndex, int streamSize, float currentPercent) {
updateImportMetrics("entity:%s:created", resp.getCreatedEntities(), processedGuids, importResult); updateImportMetrics("entity:%s:created", resp.getCreatedEntities(), processedGuids, importResult);
updateImportMetrics("entity:%s:updated", resp.getUpdatedEntities(), processedGuids, importResult); updateImportMetrics("entity:%s:updated", resp.getUpdatedEntities(), processedGuids, importResult);
updateImportMetrics("entity:%s:deleted", resp.getDeletedEntities(), processedGuids, importResult); updateImportMetrics("entity:%s:deleted", resp.getDeletedEntities(), processedGuids, importResult);
...@@ -214,7 +229,7 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore { ...@@ -214,7 +229,7 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
final double tolerance = 0.000001; final double tolerance = 0.000001;
final int MAX_PERCENT = 100; final int MAX_PERCENT = 100;
float percent = (float) ((currentIndex * MAX_PERCENT)/streamSize); float percent = (float) ((currentIndex * MAX_PERCENT) / streamSize);
boolean updateLog = Double.compare(percent, currentPercent) > tolerance; boolean updateLog = Double.compare(percent, currentPercent) > tolerance;
float updatedPercent = (MAX_PERCENT < streamSize) ? percent : float updatedPercent = (MAX_PERCENT < streamSize) ? percent :
((updateLog) ? ++currentPercent : currentPercent); ((updateLog) ? ++currentPercent : currentPercent);
...@@ -232,7 +247,7 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore { ...@@ -232,7 +247,7 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
} }
for (AtlasEntityHeader h : list) { for (AtlasEntityHeader h : list) {
if(processedGuids.contains(h.getGuid())) { if (processedGuids.contains(h.getGuid())) {
continue; continue;
} }
...@@ -704,6 +719,7 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore { ...@@ -704,6 +719,7 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
/** /**
* Validate if classification is not already associated with the entities * Validate if classification is not already associated with the entities
*
* @param guid unique entity id * @param guid unique entity id
* @param classifications list of classifications to be associated * @param classifications list of classifications to be associated
*/ */
...@@ -734,4 +750,43 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore { ...@@ -734,4 +750,43 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
return ret; return ret;
} }
private static class EntityImportStreamWithResidualList {
private final EntityImportStream stream;
private final List<String> residualList;
private boolean navigateResidualList;
private int currentResidualListIndex;
public EntityImportStreamWithResidualList(EntityImportStream stream, List<String> residualList) {
this.stream = stream;
this.residualList = residualList;
this.navigateResidualList = false;
this.currentResidualListIndex = 0;
}
public AtlasEntityWithExtInfo getNextEntityWithExtInfo() {
if (navigateResidualList == false) {
return stream.getNextEntityWithExtInfo();
} else {
stream.setPositionUsingEntityGuid(residualList.get(currentResidualListIndex++));
return stream.getNextEntityWithExtInfo();
}
}
public boolean hasNext() {
if (!navigateResidualList) {
boolean streamHasNext = stream.hasNext();
navigateResidualList = (streamHasNext == false);
return streamHasNext ? streamHasNext : (currentResidualListIndex < residualList.size());
} else {
return (currentResidualListIndex < residualList.size());
}
}
public int getStreamSize() {
return stream.size() + residualList.size();
}
}
} }
...@@ -6,9 +6,9 @@ ...@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the * to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance * "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at * with the License. You may obtain a copy of the License at
* * <p>
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* * <p>
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
...@@ -133,4 +133,22 @@ public class ImportServiceTest { ...@@ -133,4 +133,22 @@ public class ImportServiceTest {
assertNotNull(typeDefStore.getEnumDefByName(newEnumDefName)); assertNotNull(typeDefStore.getEnumDefByName(newEnumDefName));
assertEquals(typeDefStore.getEnumDefByName(newEnumDefName).getElementDefs().size(), 8); assertEquals(typeDefStore.getEnumDefByName(newEnumDefName).getElementDefs().size(), 8);
} }
@DataProvider(name = "ctas")
public static Object[][] getDataFromCtas(ITestContext context) throws IOException {
return getZipSource("ctas.zip");
}
@Test(dataProvider = "ctas")
public void importCTAS(ZipSource zipSource) throws IOException, AtlasBaseException {
loadModelFromJson("0010-base_model.json", typeDefStore, typeRegistry);
loadModelFromJson("0030-hive_model.json", typeDefStore, typeRegistry);
AtlasImportRequest request = getDefaultImportRequest();
runImportWithParameters(getImportService(), getDefaultImportRequest(), zipSource);
}
private ImportService getImportService() {
return new ImportService(typeDefStore, entityStore, typeRegistry);
}
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment