Commit 2dd0f070 by Madhan Neethiraj

ATLAS-1684: export should include super-type definitions, import should preserve…

ATLAS-1684: export should include super-type definitions, import should preserve system attribute values
parent 41839141
...@@ -233,6 +233,9 @@ public class AtlasEntityChangeNotifier { ...@@ -233,6 +233,9 @@ public class AtlasEntityChangeNotifier {
} }
AtlasVertex atlasVertex = AtlasGraphUtilsV1.findByGuid(entityId); AtlasVertex atlasVertex = AtlasGraphUtilsV1.findByGuid(entityId);
if(atlasVertex == null) {
return;
}
if (atlasVertex == null) { if (atlasVertex == null) {
LOG.warn("updateFullTextMapping(): no entity exists with guid {}", entityId); LOG.warn("updateFullTextMapping(): no entity exists with guid {}", entityId);
......
...@@ -170,10 +170,6 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore { ...@@ -170,10 +170,6 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
EntityMutationResponse resp = createOrUpdate(oneEntityStream, false, true); EntityMutationResponse resp = createOrUpdate(oneEntityStream, false, true);
if(CollectionUtils.isNotEmpty(entity.getClassifications())) {
addClassifications(entity.getGuid(), entity.getClassifications());
}
updateImportMetrics("entity:%s:created", resp.getCreatedEntities(), processedGuids, importResult); updateImportMetrics("entity:%s:created", resp.getCreatedEntities(), processedGuids, importResult);
updateImportMetrics("entity:%s:updated", resp.getUpdatedEntities(), processedGuids, importResult); updateImportMetrics("entity:%s:updated", resp.getUpdatedEntities(), processedGuids, importResult);
updateImportMetrics("entity:%s:deleted", resp.getDeletedEntities(), processedGuids, importResult); updateImportMetrics("entity:%s:deleted", resp.getDeletedEntities(), processedGuids, importResult);
...@@ -567,6 +563,11 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore { ...@@ -567,6 +563,11 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
context.addCreated(guid, entity, entityType, vertex); context.addCreated(guid, entity, entityType, vertex);
} }
// during import, update the system attributes
if (entityStream instanceof EntityImportStream) {
entityGraphMapper.updateSystemAttributes(vertex, entity);
}
} }
} }
......
...@@ -51,6 +51,7 @@ import org.apache.atlas.type.AtlasTypeUtil; ...@@ -51,6 +51,7 @@ import org.apache.atlas.type.AtlasTypeUtil;
import org.apache.atlas.type.AtlasStructType.AtlasAttribute; import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils; import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -112,6 +113,28 @@ public class EntityGraphMapper { ...@@ -112,6 +113,28 @@ public class EntityGraphMapper {
return ret; return ret;
} }
public void updateSystemAttributes(AtlasVertex vertex, AtlasEntity entity) {
if (entity.getStatus() != null) {
AtlasGraphUtilsV1.setProperty(vertex, Constants.STATE_PROPERTY_KEY, entity.getStatus().name());
}
if (entity.getCreateTime() != null) {
AtlasGraphUtilsV1.setProperty(vertex, Constants.TIMESTAMP_PROPERTY_KEY, entity.getCreateTime().getTime());
}
if (entity.getUpdateTime() != null) {
AtlasGraphUtilsV1.setProperty(vertex, Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY, entity.getUpdateTime().getTime());
}
if (StringUtils.isNotEmpty(entity.getCreatedBy())) {
AtlasGraphUtilsV1.setProperty(vertex, Constants.CREATED_BY_KEY, entity.getCreatedBy());
}
if (StringUtils.isNotEmpty(entity.getUpdatedBy())) {
AtlasGraphUtilsV1.setProperty(vertex, Constants.MODIFIED_BY_KEY, entity.getUpdatedBy());
}
}
public EntityMutationResponse mapAttributesAndClassifications(EntityMutationContext context, final boolean isPartialUpdate, final boolean replaceClassifications) throws AtlasBaseException { public EntityMutationResponse mapAttributesAndClassifications(EntityMutationContext context, final boolean isPartialUpdate, final boolean replaceClassifications) throws AtlasBaseException {
EntityMutationResponse resp = new EntityMutationResponse(); EntityMutationResponse resp = new EntityMutationResponse();
......
...@@ -21,6 +21,7 @@ import org.apache.atlas.AtlasErrorCode; ...@@ -21,6 +21,7 @@ import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.AtlasException; import org.apache.atlas.AtlasException;
import org.apache.atlas.AtlasServiceException; import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.TypeCategory;
import org.apache.atlas.model.impexp.AtlasExportRequest; import org.apache.atlas.model.impexp.AtlasExportRequest;
import org.apache.atlas.model.impexp.AtlasExportResult; import org.apache.atlas.model.impexp.AtlasExportResult;
import org.apache.atlas.model.instance.AtlasClassification; import org.apache.atlas.model.instance.AtlasClassification;
...@@ -29,13 +30,22 @@ import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo; ...@@ -29,13 +30,22 @@ import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.instance.AtlasObjectId; import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.typedef.AtlasBaseTypeDef; import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
import org.apache.atlas.model.typedef.AtlasClassificationDef; import org.apache.atlas.model.typedef.AtlasClassificationDef;
import org.apache.atlas.model.typedef.AtlasEnumDef;
import org.apache.atlas.model.typedef.AtlasEntityDef; import org.apache.atlas.model.typedef.AtlasEntityDef;
import org.apache.atlas.model.typedef.AtlasStructDef;
import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
import org.apache.atlas.model.typedef.AtlasTypesDef; import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.apache.atlas.repository.graph.AtlasGraphProvider; import org.apache.atlas.repository.graph.AtlasGraphProvider;
import org.apache.atlas.repository.graphdb.AtlasGraph; import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.store.graph.v1.EntityGraphRetriever; import org.apache.atlas.repository.store.graph.v1.EntityGraphRetriever;
import org.apache.atlas.type.AtlasArrayType;
import org.apache.atlas.type.AtlasClassificationType;
import org.apache.atlas.type.AtlasEnumType;
import org.apache.atlas.type.AtlasEntityType; import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasMapType;
import org.apache.atlas.type.AtlasStructType;
import org.apache.atlas.type.AtlasStructType.AtlasAttribute; import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.type.AtlasTypeUtil; import org.apache.atlas.type.AtlasTypeUtil;
import org.apache.atlas.util.AtlasGremlinQueryProvider; import org.apache.atlas.util.AtlasGremlinQueryProvider;
...@@ -88,6 +98,32 @@ public class ExportService { ...@@ -88,6 +98,32 @@ public class ExportService {
long endTime = System.currentTimeMillis(); long endTime = System.currentTimeMillis();
AtlasTypesDef typesDef = context.result.getData().getTypesDef();
for (String entityType : context.entityTypes) {
AtlasEntityDef entityDef = typeRegistry.getEntityDefByName(entityType);
typesDef.getEntityDefs().add(entityDef);
}
for (String classificationType : context.classificationTypes) {
AtlasClassificationDef classificationDef = typeRegistry.getClassificationDefByName(classificationType);
typesDef.getClassificationDefs().add(classificationDef);
}
for (String structType : context.structTypes) {
AtlasStructDef structDef = typeRegistry.getStructDefByName(structType);
typesDef.getStructDefs().add(structDef);
}
for (String enumType : context.enumTypes) {
AtlasEnumDef enumDef = typeRegistry.getEnumDefByName(enumType);
typesDef.getEnumDefs().add(enumDef);
}
context.sink.setExportOrder(context.result.getData().getEntityCreationOrder()); context.sink.setExportOrder(context.result.getData().getEntityCreationOrder());
context.sink.setTypesDef(context.result.getData().getTypesDef()); context.sink.setTypesDef(context.result.getData().getTypesDef());
context.result.setData(null); context.result.setData(null);
...@@ -221,16 +257,14 @@ public class ExportService { ...@@ -221,16 +257,14 @@ public class ExportService {
context.result.getData().getEntityCreationOrder().add(entityWithExtInfo.getEntity().getGuid()); context.result.getData().getEntityCreationOrder().add(entityWithExtInfo.getEntity().getGuid());
addEntity(entityWithExtInfo, context); addEntity(entityWithExtInfo, context);
addTypesAsNeeded(entityWithExtInfo.getEntity().getTypeName(), context); addTypes(entityWithExtInfo.getEntity(), context);
addClassificationsAsNeeded(entityWithExtInfo.getEntity(), context);
context.guidsProcessed.add(entityWithExtInfo.getEntity().getGuid()); context.guidsProcessed.add(entityWithExtInfo.getEntity().getGuid());
getConntedEntitiesBasedOnOption(entityWithExtInfo.getEntity(), context, direction); getConntedEntitiesBasedOnOption(entityWithExtInfo.getEntity(), context, direction);
if(entityWithExtInfo.getReferredEntities() != null) { if(entityWithExtInfo.getReferredEntities() != null) {
for (AtlasEntity e : entityWithExtInfo.getReferredEntities().values()) { for (AtlasEntity e : entityWithExtInfo.getReferredEntities().values()) {
addTypesAsNeeded(e.getTypeName(), context); addTypes(e, context);
addClassificationsAsNeeded(e, context);
getConntedEntitiesBasedOnOption(e, context, direction); getConntedEntitiesBasedOnOption(e, context, direction);
} }
...@@ -371,33 +405,114 @@ public class ExportService { ...@@ -371,33 +405,114 @@ public class ExportService {
context.reportProgress(); context.reportProgress();
} }
private void addClassificationsAsNeeded(AtlasEntity entity, ExportContext context) { private void addTypes(AtlasEntity entity, ExportContext context) {
AtlasExportResult result = context.result; addEntityType(entity.getTypeName(), context);
AtlasTypesDef typesDef = result.getData().getTypesDef();
if(CollectionUtils.isNotEmpty(entity.getClassifications())) { if(CollectionUtils.isNotEmpty(entity.getClassifications())) {
for (AtlasClassification c : entity.getClassifications()) { for (AtlasClassification c : entity.getClassifications()) {
if (typesDef.hasClassificationDef(c.getTypeName())) { addClassificationType(c.getTypeName(), context);
continue; }
}
}
private void addType(String typeName, ExportContext context) {
AtlasType type = null;
try {
type = typeRegistry.getType(typeName);
addType(type, context);
} catch (AtlasBaseException excp) {
LOG.error("unknown type {}", typeName);
}
}
private void addEntityType(String typeName, ExportContext context) {
if (!context.entityTypes.contains(typeName)) {
AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName);
addEntityType(entityType, context);
}
}
private void addClassificationType(String typeName, ExportContext context) {
if (!context.classificationTypes.contains(typeName)) {
AtlasClassificationType classificationType = typeRegistry.getClassificationTypeByName(typeName);
addClassificationType(classificationType, context);
}
}
private void addType(AtlasType type, ExportContext context) {
if (type.getTypeCategory() == TypeCategory.PRIMITIVE) {
return;
}
if (type instanceof AtlasArrayType) {
AtlasArrayType arrayType = (AtlasArrayType)type;
addType(arrayType.getElementType(), context);
} else if (type instanceof AtlasMapType) {
AtlasMapType mapType = (AtlasMapType)type;
addType(mapType.getKeyType(), context);
addType(mapType.getValueType(), context);
} else if (type instanceof AtlasEntityType) {
addEntityType((AtlasEntityType)type, context);
} else if (type instanceof AtlasClassificationType) {
addClassificationType((AtlasClassificationType)type, context);
} else if (type instanceof AtlasStructType) {
addStructType((AtlasStructType)type, context);
} else if (type instanceof AtlasEnumType) {
addEnumType((AtlasEnumType)type, context);
}
}
private void addEntityType(AtlasEntityType entityType, ExportContext context) {
if (!context.entityTypes.contains(entityType.getTypeName())) {
context.entityTypes.add(entityType.getTypeName());
addAttributeTypes(entityType, context);
if (CollectionUtils.isNotEmpty(entityType.getAllSuperTypes())) {
for (String superType : entityType.getAllSuperTypes()) {
addEntityType(superType, context);
} }
}
}
}
private void addClassificationType(AtlasClassificationType classificationType, ExportContext context) {
if (!context.classificationTypes.contains(classificationType.getTypeName())) {
context.classificationTypes.add(classificationType.getTypeName());
AtlasClassificationDef cd = typeRegistry.getClassificationDefByName(c.getTypeName()); addAttributeTypes(classificationType, context);
typesDef.getClassificationDefs().add(cd); if (CollectionUtils.isNotEmpty(classificationType.getAllSuperTypes())) {
result.incrementMeticsCounter("typedef:classification"); for (String superType : classificationType.getAllSuperTypes()) {
addClassificationType(superType, context);
}
} }
} }
} }
private void addTypesAsNeeded(String typeName, ExportContext context) { private void addStructType(AtlasStructType structType, ExportContext context) {
AtlasExportResult result = context.result; if (!context.structTypes.contains(structType.getTypeName())) {
AtlasTypesDef typesDef = result.getData().getTypesDef(); context.structTypes.add(structType.getTypeName());
addAttributeTypes(structType, context);
}
}
if(!typesDef.hasEntityDef(typeName)) { private void addEnumType(AtlasEnumType enumType, ExportContext context) {
AtlasEntityDef typeDefinition = typeRegistry.getEntityDefByName(typeName); if (!context.enumTypes.contains(enumType.getTypeName())) {
context.enumTypes.add(enumType.getTypeName());
}
}
typesDef.getEntityDefs().add(typeDefinition); private void addAttributeTypes(AtlasStructType structType, ExportContext context) {
result.incrementMeticsCounter("typedef:" + typeDefinition.getName()); for (AtlasAttributeDef attributeDef : structType.getStructDef().getAttributeDefs()) {
addType(attributeDef.getTypeName(), context);
} }
} }
...@@ -499,6 +614,10 @@ public class ExportService { ...@@ -499,6 +614,10 @@ public class ExportService {
final UniqueList<String> guidsToProcess = new UniqueList<>(); final UniqueList<String> guidsToProcess = new UniqueList<>();
final UniqueList<String> guidsLineageToProcess = new UniqueList<>(); final UniqueList<String> guidsLineageToProcess = new UniqueList<>();
final Map<String, TraversalDirection> guidDirection = new HashMap<>(); final Map<String, TraversalDirection> guidDirection = new HashMap<>();
final Set<String> entityTypes = new HashSet<>();
final Set<String> classificationTypes = new HashSet<>();
final Set<String> structTypes = new HashSet<>();
final Set<String> enumTypes = new HashSet<>();
final AtlasExportResult result; final AtlasExportResult result;
final ZipSink sink; final ZipSink sink;
......
...@@ -120,13 +120,14 @@ public class ImportService { ...@@ -120,13 +120,14 @@ public class ImportService {
private void processTypes(AtlasTypesDef typeDefinitionMap, AtlasImportResult result) throws AtlasBaseException { private void processTypes(AtlasTypesDef typeDefinitionMap, AtlasImportResult result) throws AtlasBaseException {
setGuidToEmpty(typeDefinitionMap); setGuidToEmpty(typeDefinitionMap);
AtlasTypesDef typesToCreate = AtlasTypeDefStoreInitializer.getTypesToCreate(typeDefinitionMap, this.typeRegistry); AtlasTypesDef typesToCreate = AtlasTypeDefStoreInitializer.getTypesToCreate(typeDefinitionMap, this.typeRegistry);
if (!typesToCreate.isEmpty()) { if (!typesToCreate.isEmpty()) {
typeDefStore.createTypesDef(typesToCreate); typeDefStore.createTypesDef(typesToCreate);
}
typeDefStore.updateTypesDef(typeDefinitionMap); updateMetricsForTypesDef(typesToCreate, result);
updateMetricsForTypesDef(typeDefinitionMap, result); }
} }
private void updateMetricsForTypesDef(AtlasTypesDef typeDefinitionMap, AtlasImportResult result) { private void updateMetricsForTypesDef(AtlasTypesDef typeDefinitionMap, AtlasImportResult result) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment