Commit 572c5d64 by Ashutosh Mestry

ATLAS-2900: Export (connected): addressed a case where the exported file could not be imported.

parent 0cf62fd8
......@@ -19,8 +19,6 @@ package org.apache.atlas.repository.impexp;
import com.google.common.annotations.VisibleForTesting;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.AtlasException;
import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.RequestContext;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.TypeCategory;
......@@ -135,18 +133,16 @@ public class ExportService {
long startTime, long endTime) throws AtlasBaseException {
int duration = getOperationDuration(startTime, endTime);
context.result.setSourceClusterName(AuditsWriter.getCurrentClusterName());
context.result.getData().getEntityCreationOrder().addAll(context.lineageProcessed);
context.sink.setExportOrder(context.result.getData().getEntityCreationOrder());
context.addToEntityCreationOrder(context.lineageProcessed);
context.sink.setExportOrder(context.entityCreationOrder.getList());
context.sink.setTypesDef(context.result.getData().getTypesDef());
context.result.setOperationStatus(getOverallOperationStatus(statuses));
context.result.incrementMeticsCounter("duration", duration);
auditsWriter.write(userName, context.result, startTime, endTime, context.result.getData().getEntityCreationOrder());
clearContextData(context);
context.sink.setResult(context.result);
}
auditsWriter.write(userName, context.result, startTime, endTime, context.entityCreationOrder.getList());
// Clears the data portion of the export result (after it has been written to
// the audits/sink) and records the trimmed result on the sink.
// NOTE(review): this is a diff view — confirm the exact statement order of the
// committed method against the repository before relying on it.
private void clearContextData(ExportContext context) {
context.result.setData(null);
context.sink.setResult(context.result);
}
private int getOperationDuration(long startTime, long endTime) {
......@@ -375,7 +371,7 @@ public class ExportService {
TraversalDirection direction) throws AtlasBaseException {
if (!context.lineageProcessed.contains(guid) && context.doesTimestampQualify(entityWithExtInfo.getEntity())) {
context.result.getData().getEntityCreationOrder().add(entityWithExtInfo.getEntity().getGuid());
context.addToEntityCreationOrder(entityWithExtInfo.getEntity().getGuid());
}
addEntity(entityWithExtInfo, context);
......@@ -546,7 +542,7 @@ public class ExportService {
} else {
List<AtlasEntity> entities = context.getEntitiesWithModifiedTimestamp(entityWithExtInfo);
for (AtlasEntity e : entities) {
context.result.getData().getEntityCreationOrder().add(e.getGuid());
context.addToEntityCreationOrder(e.getGuid());
context.addToSink(new AtlasEntityWithExtInfo(e));
context.result.incrementMeticsCounter(String.format("entity:%s", e.getTypeName()));
}
......@@ -718,6 +714,7 @@ public class ExportService {
private static final String ATLAS_TYPE_HIVE_DB = "hive_db";
final UniqueList<String> entityCreationOrder = new UniqueList<>();
final Set<String> guidsProcessed = new HashSet<>();
final private UniqueList<String> guidsToProcess = new UniqueList<>();
final UniqueList<String> lineageToProcess = new UniqueList<>();
......@@ -826,5 +823,13 @@ public class ExportService {
// Accessor for the isHiveDBIncremental flag: true when this export is an
// incremental hive_db export (which, per the method name, skips lineage).
public boolean isHiveDBIncrementalSkipLineage() {
return isHiveDBIncremental;
}
// Records a single entity GUID in the export's creation order.
// entityCreationOrder is a UniqueList, so repeated GUIDs are presumably
// de-duplicated — confirm against UniqueList#add.
public void addToEntityCreationOrder(String guid) {
entityCreationOrder.add(guid);
}
// Records every GUID in the given collection in the export's creation order.
// Equivalent to UniqueList#addAll(Collection), which adds one element at a time.
public void addToEntityCreationOrder(Collection<String> guids) {
    for (String guid : guids) {
        entityCreationOrder.add(guid);
    }
}
}
}
......@@ -18,6 +18,7 @@
package org.apache.atlas.repository.util;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
......@@ -44,8 +45,8 @@ public class UniqueList<T> {
}
}
public void addAll(List<T> list) {
for (T item : list) {
// Adds each element of the given collection to this list, routing every
// element through add(T) so the list's uniqueness guarantee is preserved.
public void addAll(Collection<T> collection) {
    collection.forEach(this::add);
}
......
......@@ -27,6 +27,7 @@ import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.impexp.AtlasExportRequest;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.repository.store.graph.v2.AtlasEntityStoreV2;
import org.apache.atlas.repository.util.UniqueList;
import org.apache.atlas.store.AtlasTypeDefStore;
import org.apache.atlas.type.AtlasClassificationType;
import org.apache.atlas.type.AtlasTypeRegistry;
......@@ -39,6 +40,7 @@ import org.testng.annotations.Test;
import javax.inject.Inject;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import static org.apache.atlas.model.impexp.AtlasExportRequest.FETCH_TYPE_INCREMENTAL_CHANGE_MARKER;
......@@ -63,6 +65,7 @@ public class ExportIncrementalTest extends ExportImportTestBase {
private AtlasEntityStoreV2 entityStore;
private final String EXPORT_REQUEST_INCREMENTAL = "export-incremental";
private final String EXPORT_REQUEST_CONNECTED = "export-connected";
private long nextTimestamp;
@BeforeClass
......@@ -158,6 +161,17 @@ public class ExportIncrementalTest extends ExportImportTestBase {
assertNotNull(source);
}
@Test
public void connectedExport() {
    ZipSource source = runExportWithParameters(exportService, getConnected());

    // Guard before dereferencing: the original asserted non-null only AFTER
    // calling source.getCreationOrder(), so a null source would throw an NPE
    // instead of failing the assertion.
    assertNotNull(source);

    List<String> zipCreationOrder = source.getCreationOrder();
    UniqueList<String> creationOrder = new UniqueList<>();
    creationOrder.addAll(zipCreationOrder);

    // UniqueList drops duplicates, so equal sizes prove the creation order
    // written to the ZIP contains no duplicate GUIDs (the ATLAS-2900 fix).
    assertEquals(creationOrder.size(), zipCreationOrder.size());
}
private AtlasExportRequest getIncrementalRequest(long timestamp) {
try {
AtlasExportRequest request = TestResourceFileUtils.readObjectFromJson(ENTITIES_SUB_DIR, EXPORT_REQUEST_INCREMENTAL, AtlasExportRequest.class);
......@@ -168,4 +182,13 @@ public class ExportIncrementalTest extends ExportImportTestBase {
throw new SkipException(String.format("getIncrementalRequest: '%s' could not be laoded.", EXPORT_REQUEST_INCREMENTAL));
}
}
// Loads the "export-connected" request fixture from the test resources;
// skips the calling test when the resource cannot be read.
private AtlasExportRequest getConnected() {
    try {
        return TestResourceFileUtils.readObjectFromJson(ENTITIES_SUB_DIR, EXPORT_REQUEST_CONNECTED, AtlasExportRequest.class);
    } catch (IOException e) {
        // Fixed: message previously named the wrong method ("getIncrementalRequest")
        // and misspelled "loaded"; also preserve the cause instead of dropping it.
        throw new SkipException(String.format("getConnected: '%s' could not be loaded.", EXPORT_REQUEST_CONNECTED), e);
    }
}
}
......@@ -383,7 +383,7 @@ public class ExportServiceTest extends ExportImportTestBase {
private void verifyExportForHrDataForConnected(ZipSource zipSource) throws IOException, AtlasBaseException {
assertNotNull(zipSource.getCreationOrder());
assertTrue(zipSource.getCreationOrder().size() == 2);
assertEquals(zipSource.getCreationOrder().size(), 1);
assertTrue(zipSource.hasNext());
AtlasEntity entity = zipSource.next();
......
{
"itemsToExport": [
{
"typeName": "hive_table", "uniqueAttributes": { "qualifiedName": "stocks_base.stocks_daily@cl1" }
}
],
"options": {
"fetchType": "connected"
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment