Commit 160b2874 by ashutoshm Committed by Madhan Neethiraj

ATLAS-1665: export optimization to reduce file-size and export-time

parent 537f6e31
......@@ -196,6 +196,7 @@ public class AtlasEntity extends AtlasStruct implements Serializable {
}
sb.append("AtlasEntity{");
super.toString(sb);
sb.append("guid='").append(guid).append('\'');
sb.append(", status=").append(status);
sb.append(", createdBy='").append(createdBy).append('\'');
......@@ -207,7 +208,6 @@ public class AtlasEntity extends AtlasStruct implements Serializable {
AtlasBaseTypeDef.dumpObjects(classifications, sb);
sb.append(']');
sb.append(", ");
super.toString(sb);
sb.append('}');
return sb;
......
......@@ -17,14 +17,6 @@
*/
package org.apache.atlas.repository.store.graph.v1;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.TypeCategory;
......@@ -34,12 +26,26 @@ import org.apache.atlas.model.instance.AtlasStruct;
import org.apache.atlas.repository.store.graph.EntityGraphDiscovery;
import org.apache.atlas.repository.store.graph.EntityGraphDiscoveryContext;
import org.apache.atlas.repository.store.graph.EntityResolver;
import org.apache.atlas.type.*;
import org.apache.atlas.type.AtlasArrayType;
import org.apache.atlas.type.AtlasBuiltInTypes.AtlasObjectIdType;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasMapType;
import org.apache.atlas.type.AtlasStructType;
import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.type.AtlasTypeUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
public class AtlasEntityGraphDiscoveryV1 implements EntityGraphDiscovery {
private static final Logger LOG = LoggerFactory.getLogger(AtlasEntityGraphDiscoveryV1.class);
......
......@@ -159,13 +159,14 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
int progressReportedAtCount = 0;
while (entityStream.hasNext()) {
AtlasEntity entity = entityStream.next();
AtlasEntityWithExtInfo entityWithExtInfo = entityStream.getNextEntityWithExtInfo();
AtlasEntity entity = entityWithExtInfo != null ? entityWithExtInfo.getEntity() : null;
if(entity == null || processedGuids.contains(entity.getGuid())) {
continue;
}
AtlasEntityStreamForImport oneEntityStream = new AtlasEntityStreamForImport(entity, entityStream);
AtlasEntityStreamForImport oneEntityStream = new AtlasEntityStreamForImport(entityWithExtInfo, entityStream);
EntityMutationResponse resp = createOrUpdate(oneEntityStream, false, true);
......@@ -177,7 +178,7 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
updateImportMetrics("entity:%s:updated", resp.getUpdatedEntities(), processedGuids, importResult);
updateImportMetrics("entity:%s:deleted", resp.getDeletedEntities(), processedGuids, importResult);
if ((processedGuids.size() - progressReportedAtCount) > 10) {
if ((processedGuids.size() - progressReportedAtCount) > 1000) {
progressReportedAtCount = processedGuids.size();
LOG.info("bulkImport(): in progress.. number of entities imported: {}", progressReportedAtCount);
......
......@@ -24,8 +24,8 @@ import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
import java.util.Iterator;
public class AtlasEntityStream implements EntityStream {
private final AtlasEntitiesWithExtInfo entitiesWithExtInfo;
private final EntityStream entityStream;
protected final AtlasEntitiesWithExtInfo entitiesWithExtInfo;
protected final EntityStream entityStream;
private Iterator<AtlasEntity> iterator;
......@@ -49,6 +49,12 @@ public class AtlasEntityStream implements EntityStream {
this.entityStream = entityStream;
}
public AtlasEntityStream(AtlasEntityWithExtInfo entityWithExtInfo, EntityStream entityStream) {
this.entitiesWithExtInfo = new AtlasEntitiesWithExtInfo(entityWithExtInfo);
this.iterator = this.entitiesWithExtInfo.getEntities().iterator();
this.entityStream = entityStream;
}
@Override
public boolean hasNext() {
return iterator.hasNext();
......
......@@ -18,17 +18,29 @@
package org.apache.atlas.repository.store.graph.v1;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import java.util.List;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
public class AtlasEntityStreamForImport extends AtlasEntityStream implements EntityImportStream {
public AtlasEntityStreamForImport(AtlasEntity entity) {
super(entity);
public AtlasEntityStreamForImport(AtlasEntityWithExtInfo entityWithExtInfo, EntityStream entityStream) {
super(entityWithExtInfo, entityStream);
}
@Override
public AtlasEntityWithExtInfo getNextEntityWithExtInfo() {
AtlasEntity entity = next();
return entity != null ? new AtlasEntityWithExtInfo(entity, super.entitiesWithExtInfo) : null;
}
@Override
public AtlasEntity getByGuid(String guid) {
AtlasEntity ent = super.entitiesWithExtInfo.getEntity(guid);
if(ent == null && entityStream != null) {
return entityStream.getByGuid(guid);
}
public AtlasEntityStreamForImport(AtlasEntity entity, EntityStream entityStream) {
super(entity, entityStream);
return ent;
}
@Override
......
......@@ -18,7 +18,11 @@
package org.apache.atlas.repository.store.graph.v1;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
public interface EntityImportStream extends EntityStream {
AtlasEntityWithExtInfo getNextEntityWithExtInfo();
void onImportComplete(String guid);
}
......@@ -18,7 +18,6 @@
package org.apache.atlas.repository.store.graph.v1;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasObjectId;
public interface EntityStream {
......
......@@ -19,9 +19,7 @@ package org.apache.atlas.repository.store.graph.v1;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasObjectId;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
......
......@@ -38,11 +38,11 @@ public class AtlasGremlin2QueryProvider extends AtlasGremlinQueryProvider {
case ENTITIES_FOR_TAG_METRIC:
return "g.V().has('__typeName', T.in, g.V().has('__type', 'typeSystem').filter{it.getProperty('__type.category').name() == 'TRAIT'}.'__type.name'.toSet()).groupCount{it.getProperty('__typeName')}.cap.toList()";
case EXPORT_BY_GUID_FULL:
return "g.V('__guid', startGuid).bothE().bothV().has('__guid').__guid.dedup().toList()";
return "g.V('__guid', startGuid).bothE().bothV().has('__guid').transform{[__guid:it.__guid,isProcess:(it.__superTypeNames != null) ? it.__superTypeNames.contains('Process') : false ]}.dedup().toList()";
case EXPORT_BY_GUID_CONNECTED_IN_EDGE:
return "g.V('__guid', startGuid).inE().outV().has('__guid').__guid.dedup().toList()";
return "g.V('__guid', startGuid).inE().outV().has('__guid').transform{[__guid:it.__guid,isProcess:(it.__superTypeNames != null) ? it.__superTypeNames.contains('Process') : false ]}.dedup().toList()";
case EXPORT_BY_GUID_CONNECTED_OUT_EDGE:
return "g.V('__guid', startGuid).outE().inV().has('__guid').__guid.dedup().toList()";
return "g.V('__guid', startGuid).outE().inV().has('__guid').transform{[__guid:it.__guid,isProcess:(it.__superTypeNames != null) ? it.__superTypeNames.contains('Process') : false ]}.dedup().toList()";
case EXPORT_TYPE_STARTS_WITH:
return "g.V().has('__typeName',typeName).filter({it.getProperty(attrName).startsWith(attrValue)}).has('__guid').__guid.toList()";
case EXPORT_TYPE_ENDS_WITH:
......
......@@ -25,6 +25,7 @@ import org.apache.atlas.model.impexp.AtlasExportRequest;
import org.apache.atlas.model.impexp.AtlasExportResult;
import org.apache.atlas.model.instance.AtlasClassification;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
import org.apache.atlas.model.typedef.AtlasClassificationDef;
......@@ -55,14 +56,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.apache.atlas.model.impexp.AtlasExportRequest.OPTION_FETCH_TYPE;
import static org.apache.atlas.model.impexp.AtlasExportRequest.OPTION_ATTR_MATCH_TYPE;
import static org.apache.atlas.model.impexp.AtlasExportRequest.FETCH_TYPE_FULL;
import static org.apache.atlas.model.impexp.AtlasExportRequest.FETCH_TYPE_CONNECTED;
import static org.apache.atlas.model.impexp.AtlasExportRequest.MATCH_TYPE_STARTS_WITH;
import static org.apache.atlas.model.impexp.AtlasExportRequest.MATCH_TYPE_CONTAINS;
import static org.apache.atlas.model.impexp.AtlasExportRequest.MATCH_TYPE_MATCHES;
import static org.apache.atlas.model.impexp.AtlasExportRequest.MATCH_TYPE_ENDS_WITH;
import static org.apache.atlas.model.impexp.AtlasExportRequest.*;
public class ExportService {
private static final Logger LOG = LoggerFactory.getLogger(ExportService.class);
......@@ -119,18 +113,22 @@ public class ExportService {
}
try {
List<AtlasEntity> entities = getStartingEntity(item, context);
List<AtlasEntityWithExtInfo> entities = getStartingEntity(item, context);
for (AtlasEntity entity: entities) {
processEntity(entity, context, TraversalDirection.UNKNOWN);
for (AtlasEntityWithExtInfo entityWithExtInfo : entities) {
processEntity(entityWithExtInfo.getEntity().getGuid(), context);
}
while (!context.guidsToProcessIsEmpty()) {
String guid = context.guidsToProcessRemove(0);
TraversalDirection direction = context.guidDirection.get(guid);
AtlasEntity entity = entityGraphRetriever.toAtlasEntity(guid);
while (!context.guidsToProcess.isEmpty()) {
while (!context.guidsToProcess.isEmpty()) {
String guid = context.guidsToProcess.remove(0);
processEntity(guid, context);
}
processEntity(entity, context, direction);
if (!context.guidsLineageToProcess.isEmpty()) {
context.guidsToProcess.addAll(context.guidsLineageToProcess);
context.guidsLineageToProcess.clear();
}
}
} catch (AtlasBaseException excp) {
context.result.setOperationStatus(AtlasExportResult.OperationStatus.PARTIAL_SUCCESS);
......@@ -143,11 +141,11 @@ public class ExportService {
}
}
private List<AtlasEntity> getStartingEntity(AtlasObjectId item, ExportContext context) throws AtlasBaseException {
List<AtlasEntity> ret = new ArrayList<>();
private List<AtlasEntityWithExtInfo> getStartingEntity(AtlasObjectId item, ExportContext context) throws AtlasBaseException {
List<AtlasEntityWithExtInfo> ret = new ArrayList<>();
if (StringUtils.isNotEmpty(item.getGuid())) {
AtlasEntity entity = entityGraphRetriever.toAtlasEntity(item);
AtlasEntityWithExtInfo entity = entityGraphRetriever.toAtlasEntityWithExtInfo(item);
if (entity != null) {
ret = Collections.singletonList(entity);
......@@ -188,17 +186,17 @@ public class ExportService {
context.bindings.put("attrName", attribute.getQualifiedName());
context.bindings.put("attrValue", attrValue);
List<String> guids = executeGremlinQuery(queryTemplate, context);
List<String> guids = executeGremlinQueryForGuids(queryTemplate, context);
if (CollectionUtils.isNotEmpty(guids)) {
for (String guid : guids) {
AtlasEntity entity = entityGraphRetriever.toAtlasEntity(guid);
AtlasEntityWithExtInfo entityWithExtInfo = entityGraphRetriever.toAtlasEntityWithExtInfo(guid);
if (entity == null) {
if (entityWithExtInfo == null) {
continue;
}
ret.add(entity);
ret.add(entityWithExtInfo);
}
}
......@@ -211,24 +209,37 @@ public class ExportService {
return ret;
}
private void processEntity(AtlasEntity entity, ExportContext context, TraversalDirection direction) throws AtlasBaseException {
private void processEntity(String guid, ExportContext context) throws AtlasBaseException {
if (LOG.isDebugEnabled()) {
LOG.debug("==> processEntity({})", AtlasTypeUtil.getAtlasObjectId(entity));
LOG.debug("==> processEntity({})", guid);
}
if (!context.guidsProcessed.contains(entity.getGuid())) {
context.guidsProcessed.add(entity.getGuid());
context.result.getData().getEntityCreationOrder().add(entity.getGuid());
if (!context.guidsProcessed.contains(guid)) {
TraversalDirection direction = context.guidDirection.get(guid);
AtlasEntityWithExtInfo entityWithExtInfo = entityGraphRetriever.toAtlasEntityWithExtInfo(guid);
context.result.getData().getEntityCreationOrder().add(entityWithExtInfo.getEntity().getGuid());
addEntity(entityWithExtInfo, context);
addTypesAsNeeded(entityWithExtInfo.getEntity().getTypeName(), context);
addClassificationsAsNeeded(entityWithExtInfo.getEntity(), context);
addTypesAsNeeded(entity.getTypeName(), context);
addClassificationsAsNeeded(entity, context);
addEntity(entity, context);
context.guidsProcessed.add(entityWithExtInfo.getEntity().getGuid());
getConntedEntitiesBasedOnOption(entityWithExtInfo.getEntity(), context, direction);
getConntedEntitiesBasedOnOption(entity, context, direction);
if(entityWithExtInfo.getReferredEntities() != null) {
for (AtlasEntity e : entityWithExtInfo.getReferredEntities().values()) {
addTypesAsNeeded(e.getTypeName(), context);
addClassificationsAsNeeded(e, context);
getConntedEntitiesBasedOnOption(e, context, direction);
}
context.guidsProcessed.addAll(entityWithExtInfo.getReferredEntities().keySet());
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("<== processEntity({})", AtlasTypeUtil.getAtlasObjectId(entity));
LOG.debug("<== processEntity({})", guid);
}
}
......@@ -245,7 +256,7 @@ public class ExportService {
}
private void getEntityGuidsForConnectedFetch(AtlasEntity entity, ExportContext context, TraversalDirection direction) throws AtlasBaseException {
if (direction == TraversalDirection.UNKNOWN) {
if (direction == null || direction == TraversalDirection.UNKNOWN) {
getConnectedEntityGuids(entity, context, TraversalDirection.OUTWARD, TraversalDirection.INWARD);
} else {
if (isProcessEntity(entity)) {
......@@ -272,41 +283,35 @@ public class ExportService {
String query = getQueryForTraversalDirection(direction);
if (LOG.isDebugEnabled()) {
LOG.debug("==> getConnectedEntityGuids({}): guidsToProcess {} query {}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcessSize(), query);
LOG.debug("==> getConnectedEntityGuids({}): guidsToProcess {} query {}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcess.size(), query);
}
context.bindings.clear();
context.bindings.put("startGuid", entity.getGuid());
List<String> guids = executeGremlinQuery(query, context);
List<HashMap<String, Object>> result = executeGremlinQuery(query, context);
if (CollectionUtils.isEmpty(guids)) {
if (CollectionUtils.isEmpty(result)) {
continue;
}
for (String guid : guids) {
for (HashMap<String, Object> hashMap : result) {
String guid = (String) hashMap.get("__guid");
TraversalDirection currentDirection = context.guidDirection.get(guid);
boolean isLineage = (boolean) hashMap.get("isProcess");
if (currentDirection == null) {
context.guidDirection.put(guid, direction);
context.addToBeProcessed(isLineage, guid, direction);
if (!context.guidsToProcessContains(guid)) {
context.guidsToProcessAdd(guid);
}
} else if (currentDirection == TraversalDirection.OUTWARD && direction == TraversalDirection.INWARD) {
context.guidDirection.put(guid, direction);
// the entity should be reprocessed to get inward entities
context.guidsProcessed.remove(guid);
if (!context.guidsToProcessContains(guid)) {
context.guidsToProcessAdd(guid);
}
context.addToBeProcessed(isLineage, guid, direction);
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("<== getConnectedEntityGuids({}): found {} guids; guidsToProcess {}", entity.getGuid(), guids.size(), context.guidsToProcessSize());
LOG.debug("<== getConnectedEntityGuids({}): found {} guids; guidsToProcess {}", entity.getGuid(), result.size(), context.guidsToProcess.size());
}
}
}
......@@ -324,7 +329,7 @@ public class ExportService {
private void getEntityGuidsForFullFetch(AtlasEntity entity, ExportContext context) {
if (LOG.isDebugEnabled()) {
LOG.debug("==> getEntityGuidsForFullFetch({}): guidsToProcess {}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcessSize());
LOG.debug("==> getEntityGuidsForFullFetch({}): guidsToProcess {}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcess.size());
}
String query = this.gremlinQueryProvider.getQuery(AtlasGremlinQuery.EXPORT_BY_GUID_FULL);
......@@ -332,36 +337,38 @@ public class ExportService {
context.bindings.clear();
context.bindings.put("startGuid", entity.getGuid());
List<String> result = executeGremlinQuery(query, context);
List<HashMap<String, Object>> result = executeGremlinQuery(query, context);
if (result == null) {
if (CollectionUtils.isEmpty(result)) {
return;
}
for (String guid : result) {
if (!context.guidsProcessed.contains(guid)) {
if (!context.guidsToProcessContains(guid)) {
context.guidsToProcessAdd(guid);
}
for (HashMap<String, Object> hashMap : result) {
String guid = (String) hashMap.get("__guid");
boolean isLineage = (boolean) hashMap.get("isProcess");
context.guidDirection.put(guid, TraversalDirection.BOTH);
if (!context.guidsProcessed.contains(guid)) {
context.addToBeProcessed(isLineage, guid, TraversalDirection.BOTH);
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("<== getEntityGuidsForFullFetch({}): found {} guids; guidsToProcess {}", entity.getGuid(), result.size(), context.guidsToProcessSize());
LOG.debug("<== getEntityGuidsForFullFetch({}): found {} guids; guidsToProcess {}", entity.getGuid(), result.size(), context.guidsToProcess.size());
}
}
private void addEntity(AtlasEntity entity, ExportContext context) throws AtlasBaseException {
private void addEntity(AtlasEntityWithExtInfo entity, ExportContext context) throws AtlasBaseException {
context.sink.add(entity);
context.result.incrementMeticsCounter(String.format("entity:%s", entity.getTypeName()));
context.result.incrementMeticsCounter("entities");
if (context.guidsProcessed.size() % 10 == 0) {
LOG.info("export(): in progress.. number of entities exported: {}", context.guidsProcessed.size());
context.result.incrementMeticsCounter(String.format("entity:%s", entity.getEntity().getTypeName()));
if(entity.getReferredEntities() != null) {
for (AtlasEntity e: entity.getReferredEntities().values()) {
context.result.incrementMeticsCounter(String.format("entity:%s", e.getTypeName()));
}
}
context.result.incrementMeticsCounter("entity:withExtInfo");
context.reportProgress();
}
private void addClassificationsAsNeeded(AtlasEntity entity, ExportContext context) {
......@@ -394,15 +401,23 @@ public class ExportService {
}
}
private List<String> executeGremlinQuery(String query, ExportContext context) {
private List<HashMap<String, Object>> executeGremlinQuery(String query, ExportContext context) {
try {
return (List<String>) atlasGraph.executeGremlinScript(context.scriptEngine, context.bindings, query, false);
return (List<HashMap<String, Object>>) atlasGraph.executeGremlinScript(context.scriptEngine, context.bindings, query, false);
} catch (ScriptException e) {
LOG.error("Script execution failed for query: ", query, e);
return null;
}
}
private List<String> executeGremlinQueryForGuids(String query, ExportContext context) {
try {
return (List<String>) atlasGraph.executeGremlinScript(context.scriptEngine, context.bindings, query, false);
} catch (ScriptException e) {
LOG.error("Script execution failed for query: ", query, e);
return null;
}
}
private enum TraversalDirection {
UNKNOWN,
......@@ -432,11 +447,57 @@ public class ExportService {
}
}
private class UniqueList<T> {
private final List<T> list = new ArrayList<>();
private final Set<T> set = new HashSet<>();
public void add(T e) {
if(set.contains(e)) {
return;
}
list.add(e);
set.add(e);
}
public void addAll(UniqueList<T> uniqueList) {
for (T item : uniqueList.list) {
if(set.contains(item)) continue;
set.add(item);
list.add(item);
}
}
public T remove(int index) {
T e = list.remove(index);
set.remove(e);
return e;
}
public boolean contains(T e) {
return set.contains(e);
}
public int size() {
return list.size();
}
public boolean isEmpty() {
return list.isEmpty();
}
public void clear() {
list.clear();
set.clear();
}
}
private class ExportContext {
final Set<String> guidsProcessed = new HashSet<>();
private final List<String> guidsToProcessList = new ArrayList<>();
private final Set<String> guidsToProcessSet = new HashSet<>();
final UniqueList<String> guidsToProcess = new UniqueList<>();
final UniqueList<String> guidsLineageToProcess = new UniqueList<>();
final Map<String, TraversalDirection> guidDirection = new HashMap<>();
final AtlasExportResult result;
final ZipSink sink;
......@@ -446,6 +507,8 @@ public class ExportService {
private final ExportFetchType fetchType;
private final String matchType;
private int progressReportCount = 0;
ExportContext(AtlasExportResult result, ZipSink sink) {
this.result = result;
this.sink = sink;
......@@ -481,33 +544,30 @@ public class ExportService {
}
public void clear() {
guidsToProcessList.clear();
guidsToProcessSet.clear();
guidsToProcess.clear();
guidsProcessed.clear();
guidDirection.clear();
}
public boolean guidsToProcessIsEmpty() {
return this.guidsToProcessList.isEmpty();
public void addToBeProcessed(boolean isSuperTypeProcess, String guid, TraversalDirection direction) {
if(!isSuperTypeProcess) {
guidsToProcess.add(guid);
}
public String guidsToProcessRemove(int i) {
String s = this.guidsToProcessList.remove(i);
guidsToProcessSet.remove(s);
return s;
if(isSuperTypeProcess) {
guidsLineageToProcess.add(guid);
}
public int guidsToProcessSize() {
return this.guidsToProcessList.size();
guidDirection.put(guid, direction);
}
public boolean guidsToProcessContains(String guid) {
return guidsToProcessSet.contains(guid);
}
public void reportProgress() {
if ((guidsProcessed.size() - progressReportCount) > 1000) {
progressReportCount = guidsProcessed.size();
public void guidsToProcessAdd(String guid) {
this.guidsToProcessList.add(guid);
guidsToProcessSet.add(guid);
LOG.info("export(): in progress.. number of entities exported: {}", this.guidsProcessed.size());
}
}
}
}
......@@ -45,6 +45,11 @@ public class ZipSink {
saveToZip(entity.getGuid(), jsonData);
}
public void add(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) throws AtlasBaseException {
String jsonData = convertToJSON(entityWithExtInfo);
saveToZip(entityWithExtInfo.getEntity().getGuid(), jsonData);
}
public void setResult(AtlasExportResult result) throws AtlasBaseException {
String jsonData = convertToJSON(result);
saveToZip(ZipExportFileNames.ATLAS_EXPORT_INFO_NAME, jsonData);
......
......@@ -17,17 +17,19 @@
*/
package org.apache.atlas.web.resources;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.codehaus.jackson.type.TypeReference;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.apache.atlas.repository.store.graph.v1.EntityImportStream;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.type.TypeReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
......@@ -57,7 +59,7 @@ public class ZipSource implements EntityImportStream {
public AtlasTypesDef getTypesDef() throws AtlasBaseException {
final String fileName = ZipExportFileNames.ATLAS_TYPESDEF_NAME.toString();
String s = getFromCache(fileName);
String s = (String) getFromCache(fileName);
return convertFromJson(AtlasTypesDef.class, s);
}
......@@ -104,9 +106,10 @@ public class ZipSource implements EntityImportStream {
return this.creationOrder;
}
public AtlasEntity getEntity(String guid) throws AtlasBaseException {
String s = getFromCache(guid);
return convertFromJson(AtlasEntity.class, s);
public AtlasEntity.AtlasEntityWithExtInfo getEntityWithExtInfo(String guid) throws AtlasBaseException {
String s = (String) getFromCache(guid);
AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = convertFromJson(AtlasEntity.AtlasEntityWithExtInfo.class, s);
return entityWithExtInfo;
}
private <T> T convertFromJson(TypeReference clazz, String jsonData) throws AtlasBaseException {
......@@ -136,9 +139,7 @@ public class ZipSource implements EntityImportStream {
}
private String getFromCache(String entryName) {
if(!guidEntityJsonMap.containsKey(entryName)) return "";
return guidEntityJsonMap.get(entryName).toString();
return guidEntityJsonMap.get(entryName);
}
public void close() {
......@@ -158,8 +159,15 @@ public class ZipSource implements EntityImportStream {
@Override
public AtlasEntity next() {
AtlasEntityWithExtInfo entityWithExtInfo = getNextEntityWithExtInfo();
return entityWithExtInfo != null ? entityWithExtInfo.getEntity() : null;
}
@Override
public AtlasEntityWithExtInfo getNextEntityWithExtInfo() {
try {
return getEntity(this.iterator.next());
return getEntityWithExtInfo(this.iterator.next());
} catch (AtlasBaseException e) {
e.printStackTrace();
return null;
......@@ -186,10 +194,16 @@ public class ZipSource implements EntityImportStream {
}
}
private AtlasEntity getEntity(String guid) throws AtlasBaseException {
if(guidEntityJsonMap.containsKey(guid)) {
return getEntityWithExtInfo(guid).getEntity();
}
return null;
}
@Override
public void onImportComplete(String guid) {
if(guid != null) {
guidEntityJsonMap.remove(guid);
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment