Commit 49db4cac by Ashutosh Mestry

ATLAS-3176: Fix for export order regression.

parent 73d2134a
......@@ -20,10 +20,8 @@ package org.apache.atlas.repository.impexp;
import com.google.common.annotations.VisibleForTesting;
import org.apache.atlas.RequestContext;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.TypeCategory;
import org.apache.atlas.model.impexp.AtlasExportRequest;
import org.apache.atlas.model.impexp.AtlasExportResult;
import org.apache.atlas.model.instance.AtlasClassification;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.instance.AtlasObjectId;
......@@ -33,20 +31,11 @@ import org.apache.atlas.model.typedef.AtlasEntityDef;
import org.apache.atlas.model.typedef.AtlasEnumDef;
import org.apache.atlas.model.typedef.AtlasRelationshipDef;
import org.apache.atlas.model.typedef.AtlasStructDef;
import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
import org.apache.atlas.repository.util.UniqueList;
import org.apache.atlas.type.AtlasArrayType;
import org.apache.atlas.type.AtlasClassificationType;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasEnumType;
import org.apache.atlas.type.AtlasMapType;
import org.apache.atlas.type.AtlasRelationshipType;
import org.apache.atlas.type.AtlasStructType;
import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.type.AtlasTypeUtil;
import org.apache.atlas.util.AtlasGremlinQueryProvider;
......@@ -59,9 +48,16 @@ import org.springframework.stereotype.Component;
import javax.inject.Inject;
import javax.script.ScriptEngine;
import javax.script.ScriptException;
import java.util.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.apache.atlas.model.impexp.AtlasExportRequest.*;
import static org.apache.atlas.model.impexp.AtlasExportRequest.FETCH_TYPE_CONNECTED;
import static org.apache.atlas.model.impexp.AtlasExportRequest.FETCH_TYPE_FULL;
import static org.apache.atlas.model.impexp.AtlasExportRequest.FETCH_TYPE_INCREMENTAL;
@Component
public class ExportService {
......@@ -70,7 +66,6 @@ public class ExportService {
public static final String PROPERTY_GUID = "__guid";
private static final String PROPERTY_IS_PROCESS = "isProcess";
private final AtlasTypeRegistry typeRegistry;
private final String QUERY_BINDING_START_GUID = "startGuid";
private final StartEntityFetchByExportRequest startEntityFetchByExportRequest;
......@@ -101,7 +96,7 @@ public class ExportService {
hostName, startTime, getCurrentChangeMarker());
ExportContext context = new ExportContext(atlasGraph, result, exportSink);
exportTypeProcessor = new ExportTypeProcessor(typeRegistry, context);
exportTypeProcessor = new ExportTypeProcessor(typeRegistry);
try {
LOG.info("==> export(user={}, from={})", userName, requestingIP);
......@@ -134,7 +129,6 @@ public class ExportService {
long startTime, long endTime) throws AtlasBaseException {
int duration = getOperationDuration(startTime, endTime);
context.result.setSourceClusterName(AuditsWriter.getCurrentClusterName());
context.addToEntityCreationOrder(context.lineageProcessed);
context.sink.setExportOrder(context.entityCreationOrder.getList());
context.sink.setTypesDef(context.result.getData().getTypesDef());
......@@ -269,19 +263,15 @@ public class ExportService {
TraversalDirection direction = context.guidDirection.get(guid);
AtlasEntityWithExtInfo entityWithExtInfo = entityGraphRetriever.toAtlasEntityWithExtInfo(guid);
processEntity(guid, entityWithExtInfo, context, direction);
processEntity(entityWithExtInfo, context, direction);
debugLog("<== processEntityGuid({})", guid);
}
public void processEntity(String guid, AtlasEntityWithExtInfo entityWithExtInfo,
public void processEntity(AtlasEntityWithExtInfo entityWithExtInfo,
ExportContext context,
TraversalDirection direction) throws AtlasBaseException {
if (!context.lineageProcessed.contains(guid) && context.doesTimestampQualify(entityWithExtInfo.getEntity())) {
context.addToEntityCreationOrder(entityWithExtInfo.getEntity().getGuid());
}
addEntity(entityWithExtInfo, context);
exportTypeProcessor.addTypes(entityWithExtInfo.getEntity(), context);
......@@ -315,7 +305,7 @@ public class ExportService {
}
}
private void populateEntitesForIncremental(String topLevelEntityGuid, ExportContext context) throws AtlasBaseException {
private void populateEntitesForIncremental(String topLevelEntityGuid, ExportContext context) {
if (context.isHiveDBIncrementalSkipLineage() == false || incrementalExportEntityProvider != null) {
return;
}
......@@ -450,7 +440,6 @@ public class ExportService {
} else {
List<AtlasEntity> entities = context.getEntitiesWithModifiedTimestamp(entityWithExtInfo);
for (AtlasEntity e : entities) {
context.addToEntityCreationOrder(e.getGuid());
context.addToSink(new AtlasEntityWithExtInfo(e));
context.result.incrementMeticsCounter(String.format("entity:%s", e.getTypeName()));
}
......@@ -459,142 +448,6 @@ public class ExportService {
context.reportProgress();
}
private void addTypes(AtlasEntity entity, ExportContext context) {
addEntityType(entity.getTypeName(), context);
if(CollectionUtils.isNotEmpty(entity.getClassifications())) {
for (AtlasClassification c : entity.getClassifications()) {
addClassificationType(c.getTypeName(), context);
}
}
}
private void addType(String typeName, ExportContext context) {
AtlasType type = null;
try {
type = typeRegistry.getType(typeName);
addType(type, context);
} catch (AtlasBaseException excp) {
LOG.error("unknown type {}", typeName);
}
}
private void addEntityType(String typeName, ExportContext context) {
if (!context.entityTypes.contains(typeName)) {
AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName);
addEntityType(entityType, context);
}
}
private void addClassificationType(String typeName, ExportContext context) {
if (!context.classificationTypes.contains(typeName)) {
AtlasClassificationType classificationType = typeRegistry.getClassificationTypeByName(typeName);
addClassificationType(classificationType, context);
}
}
private void addType(AtlasType type, ExportContext context) {
if (type.getTypeCategory() == TypeCategory.PRIMITIVE) {
return;
}
if (type instanceof AtlasArrayType) {
AtlasArrayType arrayType = (AtlasArrayType)type;
addType(arrayType.getElementType(), context);
} else if (type instanceof AtlasMapType) {
AtlasMapType mapType = (AtlasMapType)type;
addType(mapType.getKeyType(), context);
addType(mapType.getValueType(), context);
} else if (type instanceof AtlasEntityType) {
addEntityType((AtlasEntityType)type, context);
} else if (type instanceof AtlasClassificationType) {
addClassificationType((AtlasClassificationType)type, context);
} else if (type instanceof AtlasStructType) {
addStructType((AtlasStructType)type, context);
} else if (type instanceof AtlasEnumType) {
addEnumType((AtlasEnumType)type, context);
} else if (type instanceof AtlasRelationshipType) {
addRelationshipType(type.getTypeName(), context);
}
}
private void addEntityType(AtlasEntityType entityType, ExportContext context) {
if (!context.entityTypes.contains(entityType.getTypeName())) {
context.entityTypes.add(entityType.getTypeName());
addAttributeTypes(entityType, context);
addRelationshipTypes(entityType, context);
if (CollectionUtils.isNotEmpty(entityType.getAllSuperTypes())) {
for (String superType : entityType.getAllSuperTypes()) {
addEntityType(superType, context);
}
}
}
}
private void addClassificationType(AtlasClassificationType classificationType, ExportContext context) {
if (!context.classificationTypes.contains(classificationType.getTypeName())) {
context.classificationTypes.add(classificationType.getTypeName());
addAttributeTypes(classificationType, context);
if (CollectionUtils.isNotEmpty(classificationType.getAllSuperTypes())) {
for (String superType : classificationType.getAllSuperTypes()) {
addClassificationType(superType, context);
}
}
}
}
private void addStructType(AtlasStructType structType, ExportContext context) {
if (!context.structTypes.contains(structType.getTypeName())) {
context.structTypes.add(structType.getTypeName());
addAttributeTypes(structType, context);
}
}
private void addEnumType(AtlasEnumType enumType, ExportContext context) {
if (!context.enumTypes.contains(enumType.getTypeName())) {
context.enumTypes.add(enumType.getTypeName());
}
}
private void addRelationshipType(String relationshipTypeName, ExportContext context) {
if (!context.relationshipTypes.contains(relationshipTypeName)) {
AtlasRelationshipType relationshipType = typeRegistry.getRelationshipTypeByName(relationshipTypeName);
if (relationshipType != null) {
context.relationshipTypes.add(relationshipTypeName);
addAttributeTypes(relationshipType, context);
addEntityType(relationshipType.getEnd1Type(), context);
addEntityType(relationshipType.getEnd2Type(), context);
}
}
}
private void addAttributeTypes(AtlasStructType structType, ExportContext context) {
for (AtlasAttributeDef attributeDef : structType.getStructDef().getAttributeDefs()) {
addType(attributeDef.getTypeName(), context);
}
}
private void addRelationshipTypes(AtlasEntityType entityType, ExportContext context) {
for (Map.Entry<String, Map<String, AtlasAttribute>> entry : entityType.getRelationshipAttributes().entrySet()) {
for (String relationshipType : entry.getValue().keySet()) {
addRelationshipType(relationshipType, context);
}
}
}
private List<Map<String, Object>> executeGremlinQuery(String query, ExportContext context) {
try {
return (List<Map<String, Object>>) atlasGraph.executeGremlinScript(context.scriptEngine, context.bindings, query, false);
......@@ -740,6 +593,7 @@ public class ExportService {
}
public void addToSink(AtlasEntityWithExtInfo entityWithExtInfo) throws AtlasBaseException {
addToEntityCreationOrder(entityWithExtInfo.getEntity().getGuid());
sink.add(entityWithExtInfo);
}
......@@ -750,9 +604,5 @@ public class ExportService {
public void addToEntityCreationOrder(String guid) {
entityCreationOrder.add(guid);
}
public void addToEntityCreationOrder(Collection<String> guids) {
entityCreationOrder.addAll(guids);
}
}
}
......@@ -42,11 +42,9 @@ class ExportTypeProcessor {
private static final Logger LOG = LoggerFactory.getLogger(ExportTypeProcessor.class);
private AtlasTypeRegistry typeRegistry;
private final ExportService.ExportContext context;
ExportTypeProcessor(AtlasTypeRegistry typeRegistry, ExportService.ExportContext context) {
ExportTypeProcessor(AtlasTypeRegistry typeRegistry) {
this.typeRegistry = typeRegistry;
this.context = context;
}
public void addTypes(AtlasEntity entity, ExportService.ExportContext context) {
......
......@@ -190,10 +190,10 @@ public class ZipSource implements EntityImportStream {
T t;
try {
t = AtlasType.fromJson(jsonData, clazz);
if(t == null) {
throw new AtlasBaseException("Error converting file to JSON.");
if (t == null) {
LOG.error("Error converting file to JSON.");
return null;
}
} catch (Exception e) {
throw new AtlasBaseException("Error converting file to JSON.", e);
}
......@@ -238,7 +238,7 @@ public class ZipSource implements EntityImportStream {
currentPosition++;
return getEntityWithExtInfo(this.iterator.next());
} catch (AtlasBaseException e) {
LOG.error("getNextEntityWithExtInfo", e);
LOG.warn("getNextEntityWithExtInfo", e);
return null;
}
}
......
......@@ -73,7 +73,7 @@ public class AdminExportImportTestIT extends BaseResourceIT {
@Test(dependsOnMethods = "importData")
public void exportData() throws AtlasServiceException, IOException, AtlasBaseException {
final int EXPECTED_CREATION_ORDER_SIZE = 10;
final int EXPECTED_CREATION_ORDER_SIZE = 6;
AtlasExportRequest request = TestResourceFileUtils.readObjectFromJson(".", EXPORT_REQUEST_FILE, AtlasExportRequest.class);
InputStream exportedStream = atlasClientV2.exportData(request);
......@@ -81,7 +81,7 @@ public class AdminExportImportTestIT extends BaseResourceIT {
ZipSource zs = new ZipSource(exportedStream);
assertNotNull(zs.getExportResult());
assertTrue(zs.getCreationOrder().size() > EXPECTED_CREATION_ORDER_SIZE, "expected creationOrderSize > " + EXPECTED_CREATION_ORDER_SIZE + ", but found " + zs.getCreationOrder().size());
assertTrue(zs.getCreationOrder().size() >= EXPECTED_CREATION_ORDER_SIZE, "expected creationOrderSize > " + EXPECTED_CREATION_ORDER_SIZE + ", but found " + zs.getCreationOrder().size());
}
private void performImport(String fileToImport, int expectedProcessedEntitiesCount) throws AtlasServiceException {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment