Commit 525082ba by ashutoshm Committed by Madhan Neethiraj

ATLAS-1618: updated export to support scope option - full/connected

parent 9bddaeb3
...@@ -18,12 +18,6 @@ ...@@ -18,12 +18,6 @@
package org.apache.atlas.model.impexp; package org.apache.atlas.model.impexp;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.atlas.model.instance.AtlasObjectId; import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.typedef.AtlasBaseTypeDef; import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
import org.codehaus.jackson.annotate.JsonAutoDetect; import org.codehaus.jackson.annotate.JsonAutoDetect;
...@@ -33,6 +27,11 @@ import org.codehaus.jackson.map.annotate.JsonSerialize; ...@@ -33,6 +27,11 @@ import org.codehaus.jackson.map.annotate.JsonSerialize;
import javax.xml.bind.annotation.XmlAccessType; import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType; import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlRootElement; import javax.xml.bind.annotation.XmlRootElement;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.codehaus.jackson.annotate.JsonAutoDetect.Visibility.NONE; import static org.codehaus.jackson.annotate.JsonAutoDetect.Visibility.NONE;
import static org.codehaus.jackson.annotate.JsonAutoDetect.Visibility.PUBLIC_ONLY; import static org.codehaus.jackson.annotate.JsonAutoDetect.Visibility.PUBLIC_ONLY;
...@@ -44,12 +43,17 @@ import static org.codehaus.jackson.annotate.JsonAutoDetect.Visibility.PUBLIC_ONL ...@@ -44,12 +43,17 @@ import static org.codehaus.jackson.annotate.JsonAutoDetect.Visibility.PUBLIC_ONL
@XmlRootElement @XmlRootElement
@XmlAccessorType(XmlAccessType.PROPERTY) @XmlAccessorType(XmlAccessType.PROPERTY)
public class AtlasExportRequest implements Serializable { public class AtlasExportRequest implements Serializable {
private static final long serialVersionUID = 1L;
public static final String EXPORT_PARAM_OPTION = "FORMAT"; private static final long serialVersionUID = 1L;
public static final String EXPORT_PARAM_OPTION_FORMAT_JSON = "JSON";
public static final String EXPORT_PARAM_OPTION_FORMAT_ZIP = "ZIP";
public static final String OPTION_FETCH_TYPE = "fetchType";
public static final String OPTION_ATTR_MATCH_TYPE = "matchType";
public static final String FETCH_TYPE_FULL = "full";
public static final String FETCH_TYPE_CONNECTED = "connected";
public static final String MATCH_TYPE_STARTS_WITH = "startsWith";
public static final String MATCH_TYPE_ENDS_WITH = "endsWith";
public static final String MATCH_TYPE_CONTAINS = "contains";
public static final String MATCH_TYPE_MATCHES = "matches";
private List<AtlasObjectId> itemsToExport = new ArrayList<>(); private List<AtlasObjectId> itemsToExport = new ArrayList<>();
private Map<String, Object> options = new HashMap<>(); private Map<String, Object> options = new HashMap<>();
......
...@@ -23,9 +23,10 @@ import org.apache.atlas.exception.AtlasBaseException; ...@@ -23,9 +23,10 @@ import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.impexp.AtlasImportResult; import org.apache.atlas.model.impexp.AtlasImportResult;
import org.apache.atlas.model.instance.AtlasClassification; import org.apache.atlas.model.instance.AtlasClassification;
import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo; import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.instance.EntityMutationResponse; import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.repository.store.graph.v1.EntityImportStream;
import org.apache.atlas.repository.store.graph.v1.EntityStream; import org.apache.atlas.repository.store.graph.v1.EntityStream;
import org.apache.atlas.type.AtlasEntityType; import org.apache.atlas.type.AtlasEntityType;
...@@ -76,7 +77,7 @@ public interface AtlasEntityStore { ...@@ -76,7 +77,7 @@ public interface AtlasEntityStore {
* @return EntityMutationResponse Entity mutations operations with the corresponding set of entities on which these operations were performed * @return EntityMutationResponse Entity mutations operations with the corresponding set of entities on which these operations were performed
* @throws AtlasBaseException * @throws AtlasBaseException
*/ */
EntityMutationResponse bulkImport(EntityStream entityStream, AtlasImportResult importResult) throws AtlasBaseException; EntityMutationResponse bulkImport(EntityImportStream entityStream, AtlasImportResult importResult) throws AtlasBaseException;
/** /**
* Update a single entity * Update a single entity
......
...@@ -142,7 +142,7 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore { ...@@ -142,7 +142,7 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
} }
@Override @Override
public EntityMutationResponse bulkImport(EntityStream entityStream, AtlasImportResult importResult) throws AtlasBaseException { public EntityMutationResponse bulkImport(EntityImportStream entityStream, AtlasImportResult importResult) throws AtlasBaseException {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("==> bulkImport()"); LOG.debug("==> bulkImport()");
} }
...@@ -168,6 +168,10 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore { ...@@ -168,6 +168,10 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
EntityMutationResponse resp = createOrUpdate(oneEntityStream, false, true); EntityMutationResponse resp = createOrUpdate(oneEntityStream, false, true);
if(CollectionUtils.isNotEmpty(entity.getClassifications())) {
addClassifications(entity.getGuid(), entity.getClassifications());
}
updateImportMetrics("entity:%s:created", resp.getCreatedEntities(), processedGuids, importResult); updateImportMetrics("entity:%s:created", resp.getCreatedEntities(), processedGuids, importResult);
updateImportMetrics("entity:%s:updated", resp.getUpdatedEntities(), processedGuids, importResult); updateImportMetrics("entity:%s:updated", resp.getUpdatedEntities(), processedGuids, importResult);
updateImportMetrics("entity:%s:deleted", resp.getDeletedEntities(), processedGuids, importResult); updateImportMetrics("entity:%s:deleted", resp.getDeletedEntities(), processedGuids, importResult);
...@@ -181,6 +185,8 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore { ...@@ -181,6 +185,8 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
if (resp.getGuidAssignments() != null) { if (resp.getGuidAssignments() != null) {
ret.getGuidAssignments().putAll(resp.getGuidAssignments()); ret.getGuidAssignments().putAll(resp.getGuidAssignments());
} }
entityStream.onImportComplete(entity.getGuid());
} }
importResult.getProcessedEntities().addAll(processedGuids); importResult.getProcessedEntities().addAll(processedGuids);
......
...@@ -18,6 +18,9 @@ ...@@ -18,6 +18,9 @@
package org.apache.atlas.repository.store.graph.v1; package org.apache.atlas.repository.store.graph.v1;
import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import java.util.List;
public class AtlasEntityStreamForImport extends AtlasEntityStream implements EntityImportStream { public class AtlasEntityStreamForImport extends AtlasEntityStream implements EntityImportStream {
public AtlasEntityStreamForImport(AtlasEntity entity) { public AtlasEntityStreamForImport(AtlasEntity entity) {
...@@ -27,4 +30,9 @@ public class AtlasEntityStreamForImport extends AtlasEntityStream implements Ent ...@@ -27,4 +30,9 @@ public class AtlasEntityStreamForImport extends AtlasEntityStream implements Ent
public AtlasEntityStreamForImport(AtlasEntity entity, EntityStream entityStream) { public AtlasEntityStreamForImport(AtlasEntity entity, EntityStream entityStream) {
super(entity, entityStream); super(entity, entityStream);
} }
@Override
public void onImportComplete(String guid) {
}
} }
...@@ -17,5 +17,8 @@ ...@@ -17,5 +17,8 @@
*/ */
package org.apache.atlas.repository.store.graph.v1; package org.apache.atlas.repository.store.graph.v1;
public interface EntityImportStream extends EntityStream { public interface EntityImportStream extends EntityStream {
void onImportComplete(String guid);
} }
...@@ -37,8 +37,12 @@ public class AtlasGremlin2QueryProvider extends AtlasGremlinQueryProvider { ...@@ -37,8 +37,12 @@ public class AtlasGremlin2QueryProvider extends AtlasGremlinQueryProvider {
return "g.V().has('__superTypeNames', T.in, ['Referenceable']).has('__traitNames').count()"; return "g.V().has('__superTypeNames', T.in, ['Referenceable']).has('__traitNames').count()";
case ENTITIES_FOR_TAG_METRIC: case ENTITIES_FOR_TAG_METRIC:
return "g.V().has('__typeName', T.in, g.V().has('__type', 'typeSystem').filter{it.'__type.category'.name() == 'TRAIT'}.'__type.name'.toSet()).groupCount{it.'__typeName'}.cap.toList()"; return "g.V().has('__typeName', T.in, g.V().has('__type', 'typeSystem').filter{it.'__type.category'.name() == 'TRAIT'}.'__type.name'.toSet()).groupCount{it.'__typeName'}.cap.toList()";
case EXPORT_BY_GUID: case EXPORT_BY_GUID_FULL:
return "g.V('__guid', startGuid).bothE().bothV().has('__guid').__guid.dedup().toList()"; return "g.V('__guid', startGuid).bothE().bothV().has('__guid').__guid.dedup().toList()";
case EXPORT_BY_GUID_CONNECTED_IN_EDGE:
return "g.V('__guid', startGuid).inE().outV().has('__guid').__guid.dedup().toList()";
case EXPORT_BY_GUID_CONNECTED_OUT_EDGE:
return "g.V('__guid', startGuid).outE().inV().has('__guid').__guid.dedup().toList()";
case EXPORT_TYPE_STARTS_WITH: case EXPORT_TYPE_STARTS_WITH:
return "g.V().has('__typeName','%s').filter({it.'%s'.startsWith(attrValue)}).has('__guid').__guid.toList()"; return "g.V().has('__typeName','%s').filter({it.'%s'.startsWith(attrValue)}).has('__guid').__guid.toList()";
case EXPORT_TYPE_ENDS_WITH: case EXPORT_TYPE_ENDS_WITH:
......
...@@ -42,7 +42,9 @@ public abstract class AtlasGremlinQueryProvider { ...@@ -42,7 +42,9 @@ public abstract class AtlasGremlinQueryProvider {
ENTITIES_FOR_TAG_METRIC, ENTITIES_FOR_TAG_METRIC,
// Import Export related Queries // Import Export related Queries
EXPORT_BY_GUID, EXPORT_BY_GUID_FULL,
EXPORT_BY_GUID_CONNECTED_IN_EDGE,
EXPORT_BY_GUID_CONNECTED_OUT_EDGE,
EXPORT_TYPE_STARTS_WITH, EXPORT_TYPE_STARTS_WITH,
EXPORT_TYPE_ENDS_WITH, EXPORT_TYPE_ENDS_WITH,
EXPORT_TYPE_CONTAINS, EXPORT_TYPE_CONTAINS,
......
...@@ -22,12 +22,7 @@ import com.google.common.collect.ImmutableList; ...@@ -22,12 +22,7 @@ import com.google.common.collect.ImmutableList;
import org.apache.atlas.AtlasException; import org.apache.atlas.AtlasException;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.*;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher; import java.util.regex.Matcher;
import java.util.regex.Pattern; import java.util.regex.Pattern;
...@@ -82,6 +77,22 @@ public class TypeUtils { ...@@ -82,6 +77,22 @@ public class TypeUtils {
public static <L, R> Pair<L, R> of(L left, R right) { public static <L, R> Pair<L, R> of(L left, R right) {
return new Pair<>(left, right); return new Pair<>(left, right);
} }
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Pair p = (Pair)o;
return Objects.equals(left, p.left) && Objects.equals(right, p.right);
}
public int hashCode() { return Objects.hash(left, right); }
} }
/** /**
......
...@@ -22,7 +22,9 @@ import com.google.inject.Inject; ...@@ -22,7 +22,9 @@ import com.google.inject.Inject;
import org.apache.atlas.ApplicationProperties; import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient; import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasErrorCode; import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.AtlasException; import org.apache.atlas.authorize.AtlasActionTypes;
import org.apache.atlas.authorize.AtlasResourceTypes;
import org.apache.atlas.authorize.simple.AtlasAuthorizationUtils;
import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.impexp.AtlasExportRequest; import org.apache.atlas.model.impexp.AtlasExportRequest;
import org.apache.atlas.model.impexp.AtlasExportResult; import org.apache.atlas.model.impexp.AtlasExportResult;
...@@ -31,14 +33,12 @@ import org.apache.atlas.model.impexp.AtlasImportResult; ...@@ -31,14 +33,12 @@ import org.apache.atlas.model.impexp.AtlasImportResult;
import org.apache.atlas.model.metrics.AtlasMetrics; import org.apache.atlas.model.metrics.AtlasMetrics;
import org.apache.atlas.repository.store.graph.AtlasEntityStore; import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.services.MetricsService; import org.apache.atlas.services.MetricsService;
import org.apache.atlas.authorize.AtlasActionTypes;
import org.apache.atlas.authorize.AtlasResourceTypes;
import org.apache.atlas.authorize.simple.AtlasAuthorizationUtils;
import org.apache.atlas.store.AtlasTypeDefStore; import org.apache.atlas.store.AtlasTypeDefStore;
import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.web.filters.AtlasCSRFPreventionFilter; import org.apache.atlas.web.filters.AtlasCSRFPreventionFilter;
import org.apache.atlas.web.service.ServiceState; import org.apache.atlas.web.service.ServiceState;
import org.apache.atlas.web.util.Servlets; import org.apache.atlas.web.util.Servlets;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
...@@ -54,13 +54,7 @@ import javax.inject.Singleton; ...@@ -54,13 +54,7 @@ import javax.inject.Singleton;
import javax.servlet.ServletOutputStream; import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse; import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.Consumes; import javax.ws.rs.*;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Context; import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType; import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response; import javax.ws.rs.core.Response;
...@@ -70,9 +64,6 @@ import java.util.Collection; ...@@ -70,9 +64,6 @@ import java.util.Collection;
import java.util.HashSet; import java.util.HashSet;
import java.util.Set; import java.util.Set;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.configuration.Configuration;
import static org.apache.atlas.repository.converters.AtlasInstanceConverter.toAtlasBaseException;
/** /**
...@@ -369,7 +360,7 @@ public class AdminResource { ...@@ -369,7 +360,7 @@ public class AdminResource {
try { try {
AtlasImportRequest request = new AtlasImportRequest(Servlets.getParameterMap(httpServletRequest)); AtlasImportRequest request = new AtlasImportRequest(Servlets.getParameterMap(httpServletRequest));
ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes); ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes);
ImportService importService = new ImportService(this.typesDefStore, this.entityStore); ImportService importService = new ImportService(this.typesDefStore, this.entityStore, this.typeRegistry);
ZipSource zipSource = new ZipSource(inputStream); ZipSource zipSource = new ZipSource(inputStream);
...@@ -405,7 +396,7 @@ public class AdminResource { ...@@ -405,7 +396,7 @@ public class AdminResource {
try { try {
AtlasImportRequest request = new AtlasImportRequest(Servlets.getParameterMap(httpServletRequest)); AtlasImportRequest request = new AtlasImportRequest(Servlets.getParameterMap(httpServletRequest));
ImportService importService = new ImportService(this.typesDefStore, this.entityStore); ImportService importService = new ImportService(this.typesDefStore, this.entityStore, this.typeRegistry);
result = importService.run(request, Servlets.getUserName(httpServletRequest), result = importService.run(request, Servlets.getUserName(httpServletRequest),
Servlets.getHostName(httpServletRequest), Servlets.getHostName(httpServletRequest),
......
...@@ -27,6 +27,7 @@ import org.apache.atlas.model.impexp.AtlasExportResult; ...@@ -27,6 +27,7 @@ import org.apache.atlas.model.impexp.AtlasExportResult;
import org.apache.atlas.model.instance.AtlasClassification; import org.apache.atlas.model.instance.AtlasClassification;
import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasObjectId; import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
import org.apache.atlas.model.typedef.AtlasClassificationDef; import org.apache.atlas.model.typedef.AtlasClassificationDef;
import org.apache.atlas.model.typedef.AtlasEntityDef; import org.apache.atlas.model.typedef.AtlasEntityDef;
import org.apache.atlas.model.typedef.AtlasTypesDef; import org.apache.atlas.model.typedef.AtlasTypesDef;
...@@ -47,65 +48,44 @@ import org.slf4j.LoggerFactory; ...@@ -47,65 +48,44 @@ import org.slf4j.LoggerFactory;
import javax.script.Bindings; import javax.script.Bindings;
import javax.script.ScriptContext; import javax.script.ScriptContext;
import javax.script.ScriptEngine;
import javax.script.ScriptException; import javax.script.ScriptException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import static org.apache.atlas.model.impexp.AtlasExportRequest.OPTION_FETCH_TYPE;
import static org.apache.atlas.model.impexp.AtlasExportRequest.OPTION_ATTR_MATCH_TYPE;
import static org.apache.atlas.model.impexp.AtlasExportRequest.FETCH_TYPE_FULL;
import static org.apache.atlas.model.impexp.AtlasExportRequest.FETCH_TYPE_CONNECTED;
import static org.apache.atlas.model.impexp.AtlasExportRequest.MATCH_TYPE_STARTS_WITH;
import static org.apache.atlas.model.impexp.AtlasExportRequest.MATCH_TYPE_CONTAINS;
import static org.apache.atlas.model.impexp.AtlasExportRequest.MATCH_TYPE_MATCHES;
import static org.apache.atlas.model.impexp.AtlasExportRequest.MATCH_TYPE_ENDS_WITH;
public class ExportService { public class ExportService {
private static final Logger LOG = LoggerFactory.getLogger(ExportService.class); private static final Logger LOG = LoggerFactory.getLogger(ExportService.class);
public static final String OPTION_ATTR_MATCH_TYPE = "matchType"; private final AtlasTypeRegistry typeRegistry;
public static final String MATCH_TYPE_STARTS_WITH = "startsWith"; private final AtlasGraph atlasGraph;
public static final String MATCH_TYPE_ENDS_WITH = "endsWith"; private final EntityGraphRetriever entityGraphRetriever;
public static final String MATCH_TYPE_CONTAINS = "contains";
public static final String MATCH_TYPE_MATCHES = "matches";
private final AtlasTypeRegistry typeRegistry;
private final AtlasGraph atlasGraph;
private final EntityGraphRetriever entityGraphRetriever;
private final AtlasGremlinQueryProvider gremlinQueryProvider; private final AtlasGremlinQueryProvider gremlinQueryProvider;
// query engine support
private final ScriptEngine scriptEngine;
private final Bindings bindings;
public ExportService(final AtlasTypeRegistry typeRegistry) throws AtlasBaseException { public ExportService(final AtlasTypeRegistry typeRegistry) throws AtlasBaseException {
this.typeRegistry = typeRegistry; this.typeRegistry = typeRegistry;
this.entityGraphRetriever = new EntityGraphRetriever(this.typeRegistry); this.entityGraphRetriever = new EntityGraphRetriever(this.typeRegistry);
this.atlasGraph = AtlasGraphProvider.getGraphInstance(); this.atlasGraph = AtlasGraphProvider.getGraphInstance();
this.gremlinQueryProvider = AtlasGremlinQueryProvider.INSTANCE; this.gremlinQueryProvider = AtlasGremlinQueryProvider.INSTANCE;
this.scriptEngine = new GremlinGroovyScriptEngine();
//Do not cache script compilations due to memory implications
scriptEngine.getContext().setAttribute("#jsr223.groovy.engine.keep.globals", "phantom", ScriptContext.ENGINE_SCOPE);
bindings = scriptEngine.getBindings(ScriptContext.ENGINE_SCOPE);
}
private class ExportContext {
final Set<String> guidsProcessed = new HashSet<>();
final List<String> guidsToProcess = new ArrayList<>();
final AtlasExportResult result;
final ZipSink sink;
ExportContext(AtlasExportResult result, ZipSink sink) {
this.result = result;
this.sink = sink;
}
} }
public AtlasExportResult run(ZipSink exportSink, AtlasExportRequest request, String userName, String hostName, public AtlasExportResult run(ZipSink exportSink, AtlasExportRequest request, String userName, String hostName,
String requestingIP) throws AtlasBaseException { String requestingIP) throws AtlasBaseException {
long startTimestamp = System.currentTimeMillis(); long startTime = System.currentTimeMillis();
ExportContext context = new ExportContext(new AtlasExportResult(request, userName, hostName, requestingIP, AtlasExportResult result = new AtlasExportResult(request, userName, hostName, requestingIP, startTime);
System.currentTimeMillis()), exportSink); ExportContext context = new ExportContext(result, exportSink);
try { try {
LOG.info("==> export(user={}, from={})", userName, requestingIP); LOG.info("==> export(user={}, from={})", userName, requestingIP);
...@@ -114,13 +94,14 @@ public class ExportService { ...@@ -114,13 +94,14 @@ public class ExportService {
processObjectId(item, context); processObjectId(item, context);
} }
long endTime = System.currentTimeMillis();
context.sink.setExportOrder(context.result.getData().getEntityCreationOrder()); context.sink.setExportOrder(context.result.getData().getEntityCreationOrder());
context.sink.setTypesDef(context.result.getData().getTypesDef()); context.sink.setTypesDef(context.result.getData().getTypesDef());
context.result.setData(null); context.result.setData(null);
context.result.setOperationStatus(AtlasExportResult.OperationStatus.SUCCESS); context.result.setOperationStatus(AtlasExportResult.OperationStatus.SUCCESS);
context.result.incrementMeticsCounter("duration", (int) (endTime - startTime));
long endTimestamp = System.currentTimeMillis();
context.result.incrementMeticsCounter("duration", (int) (endTimestamp - startTimestamp));
context.sink.setResult(context.result); context.sink.setResult(context.result);
} catch(Exception ex) { } catch(Exception ex) {
LOG.error("Operation failed: ", ex); LOG.error("Operation failed: ", ex);
...@@ -140,15 +121,15 @@ public class ExportService { ...@@ -140,15 +121,15 @@ public class ExportService {
List<AtlasEntity> entities = getStartingEntity(item, context); List<AtlasEntity> entities = getStartingEntity(item, context);
for (AtlasEntity entity: entities) { for (AtlasEntity entity: entities) {
processEntity(entity, context); processEntity(entity, context, TraversalDirection.UNKNOWN);
} }
while (!context.guidsToProcess.isEmpty()) { while (!context.guidsToProcess.isEmpty()) {
String guid = context.guidsToProcess.remove(0); String guid = context.guidsToProcess.remove(0);
TraversalDirection direction = context.guidDirection.get(guid);
AtlasEntity entity = entityGraphRetriever.toAtlasEntity(guid);
AtlasEntity e = entityGraphRetriever.toAtlasEntity(guid); processEntity(entity, context, direction);
processEntity(e, context);
} }
} catch (AtlasBaseException excp) { } catch (AtlasBaseException excp) {
context.result.setOperationStatus(AtlasExportResult.OperationStatus.PARTIAL_SUCCESS); context.result.setOperationStatus(AtlasExportResult.OperationStatus.PARTIAL_SUCCESS);
...@@ -178,23 +159,14 @@ public class ExportService { ...@@ -178,23 +159,14 @@ public class ExportService {
throw new AtlasBaseException(AtlasErrorCode.UNKNOWN_TYPENAME, typeName); throw new AtlasBaseException(AtlasErrorCode.UNKNOWN_TYPENAME, typeName);
} }
AtlasExportRequest request = context.result.getRequest();
String matchType = null;
if (MapUtils.isNotEmpty(request.getOptions())) {
if (request.getOptions().get(OPTION_ATTR_MATCH_TYPE) != null) {
matchType = request.getOptions().get(OPTION_ATTR_MATCH_TYPE).toString();
}
}
final String queryTemplate; final String queryTemplate;
if (StringUtils.equalsIgnoreCase(matchType, MATCH_TYPE_STARTS_WITH)) { if (StringUtils.equalsIgnoreCase(context.matchType, MATCH_TYPE_STARTS_WITH)) {
queryTemplate = gremlinQueryProvider.getQuery(AtlasGremlinQuery.EXPORT_TYPE_STARTS_WITH); queryTemplate = gremlinQueryProvider.getQuery(AtlasGremlinQuery.EXPORT_TYPE_STARTS_WITH);
} else if (StringUtils.equalsIgnoreCase(matchType, MATCH_TYPE_ENDS_WITH)) { } else if (StringUtils.equalsIgnoreCase(context.matchType, MATCH_TYPE_ENDS_WITH)) {
queryTemplate = gremlinQueryProvider.getQuery(AtlasGremlinQuery.EXPORT_TYPE_ENDS_WITH); queryTemplate = gremlinQueryProvider.getQuery(AtlasGremlinQuery.EXPORT_TYPE_ENDS_WITH);
} else if (StringUtils.equalsIgnoreCase(matchType, MATCH_TYPE_CONTAINS)) { } else if (StringUtils.equalsIgnoreCase(context.matchType, MATCH_TYPE_CONTAINS)) {
queryTemplate = gremlinQueryProvider.getQuery(AtlasGremlinQuery.EXPORT_TYPE_CONTAINS); queryTemplate = gremlinQueryProvider.getQuery(AtlasGremlinQuery.EXPORT_TYPE_CONTAINS);
} else if (StringUtils.equalsIgnoreCase(matchType, MATCH_TYPE_MATCHES)) { } else if (StringUtils.equalsIgnoreCase(context.matchType, MATCH_TYPE_MATCHES)) {
queryTemplate = gremlinQueryProvider.getQuery(AtlasGremlinQuery.EXPORT_TYPE_MATCHES); queryTemplate = gremlinQueryProvider.getQuery(AtlasGremlinQuery.EXPORT_TYPE_MATCHES);
} else { // default } else { // default
queryTemplate = gremlinQueryProvider.getQuery(AtlasGremlinQuery.EXPORT_TYPE_DEFAULT); queryTemplate = gremlinQueryProvider.getQuery(AtlasGremlinQuery.EXPORT_TYPE_DEFAULT);
...@@ -211,7 +183,7 @@ public class ExportService { ...@@ -211,7 +183,7 @@ public class ExportService {
} }
String query = String.format(queryTemplate, typeName, attribute.getQualifiedName()); String query = String.format(queryTemplate, typeName, attribute.getQualifiedName());
List<String> guids = executeGremlinScriptFor(query, "attrValue", attrValue.toString()); List<String> guids = executeGremlinQuery(query, "attrValue", attrValue.toString(), context);
if (CollectionUtils.isNotEmpty(guids)) { if (CollectionUtils.isNotEmpty(guids)) {
for (String guid : guids) { for (String guid : guids) {
...@@ -228,13 +200,13 @@ public class ExportService { ...@@ -228,13 +200,13 @@ public class ExportService {
break; break;
} }
LOG.info("export(item={}; matchType={}): found {} entities", item, matchType, ret.size()); LOG.info("export(item={}; matchType={}, fetchType={}): found {} entities", item, context.matchType, context.fetchType, ret.size());
} }
return ret; return ret;
} }
private void processEntity(AtlasEntity entity, ExportContext context) throws AtlasBaseException { private void processEntity(AtlasEntity entity, ExportContext context, TraversalDirection direction) throws AtlasBaseException {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("==> processEntity({})", AtlasTypeUtil.getAtlasObjectId(entity)); LOG.debug("==> processEntity({})", AtlasTypeUtil.getAtlasObjectId(entity));
} }
...@@ -247,7 +219,7 @@ public class ExportService { ...@@ -247,7 +219,7 @@ public class ExportService {
addClassificationsAsNeeded(entity, context); addClassificationsAsNeeded(entity, context);
addEntity(entity, context); addEntity(entity, context);
getConnectedEntityGuids(entity, context); getConntedEntitiesBasedOnOption(entity, context, direction);
} }
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
...@@ -255,26 +227,125 @@ public class ExportService { ...@@ -255,26 +227,125 @@ public class ExportService {
} }
} }
private void getConnectedEntityGuids(AtlasEntity entity, ExportContext context) { private void getConntedEntitiesBasedOnOption(AtlasEntity entity, ExportContext context, TraversalDirection direction) throws AtlasBaseException {
switch (context.fetchType) {
case CONNECTED:
getEntityGuidsForConnectedFetch(entity, context, direction);
break;
case FULL:
default:
getEntityGuidsForFullFetch(entity, context);
}
}
private void getEntityGuidsForConnectedFetch(AtlasEntity entity, ExportContext context, TraversalDirection direction) throws AtlasBaseException {
if (direction == TraversalDirection.UNKNOWN) {
getConnectedEntityGuids(entity, context, TraversalDirection.OUTWARD, TraversalDirection.OUTWARD);
} else {
if (isProcessEntity(entity)) {
direction = TraversalDirection.OUTWARD;
}
getConnectedEntityGuids(entity, context, direction);
}
}
private boolean isProcessEntity(AtlasEntity entity) throws AtlasBaseException {
String typeName = entity.getTypeName();
AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName);
return entityType.isSubTypeOf(AtlasBaseTypeDef.ATLAS_TYPE_PROCESS);
}
private void getConnectedEntityGuids(AtlasEntity entity, ExportContext context, TraversalDirection... directions) {
if(directions == null) {
return;
}
try {
for (TraversalDirection direction : directions) {
String query = getQueryForTraversalDirection(direction);
if (LOG.isDebugEnabled()) {
LOG.debug("==> getConnectedEntityGuids({}): guidsToProcess {} query {}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcess.size(), query);
}
List<String> guids = executeGremlinQuery(query, entity.getGuid(), context);
if (CollectionUtils.isEmpty(guids)) {
continue;
}
for (String guid : guids) {
TraversalDirection currentDirection = context.guidDirection.get(guid);
if (currentDirection == null) {
context.guidDirection.put(guid, direction);
if (!context.guidsToProcess.contains(guid)) {
context.guidsToProcess.add(guid);
}
} else if (currentDirection == TraversalDirection.OUTWARD && direction == TraversalDirection.INWARD) {
context.guidDirection.put(guid, direction);
// the entity should be reprocessed to get inward entities
context.guidsProcessed.remove(guid);
if (!context.guidsToProcess.contains(guid)) {
context.guidsToProcess.add(guid);
}
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("<== getConnectedEntityGuids({}): found {} guids; guidsToProcess {}", entity.getGuid(), guids.size(), context.guidsToProcess.size());
}
}
} catch (ScriptException e) {
LOG.error("Child entities could not be added for %s", entity.getGuid());
}
}
private String getQueryForTraversalDirection(TraversalDirection direction) {
switch (direction) {
case INWARD:
return this.gremlinQueryProvider.getQuery(AtlasGremlinQuery.EXPORT_BY_GUID_CONNECTED_IN_EDGE);
default:
case OUTWARD:
return this.gremlinQueryProvider.getQuery(AtlasGremlinQuery.EXPORT_BY_GUID_CONNECTED_OUT_EDGE);
}
}
private void getEntityGuidsForFullFetch(AtlasEntity entity, ExportContext context) {
try { try {
String query = this.gremlinQueryProvider.getQuery(AtlasGremlinQuery.EXPORT_BY_GUID_FULL);
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("==> getConnectedEntityGuids({}): guidsToProcess {}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcess.size()); LOG.debug("==> getEntityGuidsForFullFetch({}): guidsToProcess {}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcess.size());
} }
List<String> result = executeGremlinScriptForHive(entity.getGuid()); List<String> result = executeGremlinQuery(query, entity.getGuid(), context);
if(result == null) {
if (result == null) {
return; return;
} }
for (String guid : result) { for (String guid : result) {
if (!context.guidsProcessed.contains(guid)) { if (!context.guidsProcessed.contains(guid)) {
context.guidsToProcess.add(guid); if (!context.guidsToProcess.contains(guid)) {
context.guidsToProcess.add(guid);
}
context.guidDirection.put(guid, TraversalDirection.BOTH);
} }
} }
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("<== getConnectedEntityGuids({}): found {} guids; guidsToProcess {}", entity.getGuid(), result.size(), context.guidsToProcess.size()); LOG.debug("<== getEntityGuidsForFullFetch({}): found {} guids; guidsToProcess {}", entity.getGuid(), result.size(), context.guidsToProcess.size());
} }
} catch (ScriptException e) { } catch (ScriptException e) {
LOG.error("Child entities could not be added for %s", entity.getGuid()); LOG.error("Child entities could not be added for %s", entity.getGuid());
...@@ -322,16 +393,19 @@ public class ExportService { ...@@ -322,16 +393,19 @@ public class ExportService {
} }
} }
private List<String> executeGremlinScriptForHive(String guid) throws ScriptException { private List<String> executeGremlinQuery(String query, String guid, ExportContext context) throws ScriptException {
String queryByGuid = gremlinQueryProvider.getQuery(AtlasGremlinQuery.EXPORT_BY_GUID); context.bindings.put("startGuid", guid);
return executeGremlinScriptFor(queryByGuid, "startGuid", guid); return (List<String>) atlasGraph.executeGremlinScript(context.scriptEngine,
context.bindings,
query,
false);
} }
private List<String> executeGremlinScriptFor(String query, String parameterName, String parameterValue) { private List<String> executeGremlinQuery(String query, String parameterName, String parameterValue, ExportContext context) {
bindings.put(parameterName, parameterValue); context.bindings.put(parameterName, parameterValue);
try { try {
return (List<String>) atlasGraph.executeGremlinScript(this.scriptEngine, return (List<String>) atlasGraph.executeGremlinScript(context.scriptEngine,
this.bindings, context.bindings,
query, query,
false); false);
} catch (ScriptException e) { } catch (ScriptException e) {
...@@ -339,4 +413,87 @@ public class ExportService { ...@@ -339,4 +413,87 @@ public class ExportService {
return null; return null;
} }
} }
private enum TraversalDirection {
UNKNOWN,
INWARD,
OUTWARD,
BOTH;
}
public enum ExportFetchType {
FULL(FETCH_TYPE_FULL),
CONNECTED(FETCH_TYPE_CONNECTED);
final String str;
ExportFetchType(String s) {
this.str = s;
}
public static final ExportFetchType from(String s) {
for (ExportFetchType b : ExportFetchType.values()) {
if (b.str.equalsIgnoreCase(s)) {
return b;
}
}
return FULL;
}
}
private class ExportContext {
final Set<String> guidsProcessed = new HashSet<>();
final List<String> guidsToProcess = new ArrayList<>();
final Map<String, TraversalDirection> guidDirection = new HashMap<>();
final AtlasExportResult result;
final ZipSink sink;
private final GremlinGroovyScriptEngine scriptEngine;
private final Bindings bindings;
private final ExportFetchType fetchType;
private final String matchType;
ExportContext(AtlasExportResult result, ZipSink sink) {
this.result = result;
this.sink = sink;
this.scriptEngine = new GremlinGroovyScriptEngine();
//Do not cache script compilations due to memory implications
scriptEngine.getContext().setAttribute("#jsr223.groovy.engine.keep.globals",
"phantom",
ScriptContext.ENGINE_SCOPE);
bindings = scriptEngine.getBindings(ScriptContext.ENGINE_SCOPE);
fetchType = getFetchType(result.getRequest());
matchType = getMatchType(result.getRequest());
}
private ExportFetchType getFetchType(AtlasExportRequest request) {
Object fetchOption = request.getOptions() != null ? request.getOptions().get(OPTION_FETCH_TYPE) : null;
if (fetchOption instanceof String) {
return ExportFetchType.from((String) fetchOption);
} else if (fetchOption instanceof ExportFetchType) {
return (ExportFetchType) fetchOption;
}
return ExportFetchType.FULL;
}
private String getMatchType(AtlasExportRequest request) {
String matchType = null;
if (MapUtils.isNotEmpty(request.getOptions())) {
if (request.getOptions().get(OPTION_ATTR_MATCH_TYPE) != null) {
matchType = request.getOptions().get(OPTION_ATTR_MATCH_TYPE).toString();
}
}
return matchType;
}
}
} }
...@@ -18,21 +18,22 @@ ...@@ -18,21 +18,22 @@
package org.apache.atlas.web.resources; package org.apache.atlas.web.resources;
import org.apache.atlas.AtlasErrorCode; import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.impexp.AtlasImportRequest; import org.apache.atlas.model.impexp.AtlasImportRequest;
import org.apache.atlas.model.impexp.AtlasImportResult; import org.apache.atlas.model.impexp.AtlasImportResult;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.model.typedef.*; import org.apache.atlas.model.typedef.*;
import org.apache.atlas.repository.store.bootstrap.AtlasTypeDefStoreInitializer;
import org.apache.atlas.repository.store.graph.AtlasEntityStore; import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.commons.io.FileUtils;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.store.AtlasTypeDefStore; import org.apache.atlas.store.AtlasTypeDefStore;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.*; import java.io.ByteArrayInputStream;
import java.util.List; import java.io.File;
import java.util.concurrent.TimeUnit; import java.io.FileNotFoundException;
public class ImportService { public class ImportService {
...@@ -40,14 +41,16 @@ public class ImportService { ...@@ -40,14 +41,16 @@ public class ImportService {
private final AtlasTypeDefStore typeDefStore; private final AtlasTypeDefStore typeDefStore;
private final AtlasEntityStore entityStore; private final AtlasEntityStore entityStore;
private final AtlasTypeRegistry typeRegistry;
private long startTimestamp; private long startTimestamp;
private long endTimestamp; private long endTimestamp;
public ImportService(final AtlasTypeDefStore typeDefStore, final AtlasEntityStore entityStore) { public ImportService(final AtlasTypeDefStore typeDefStore, final AtlasEntityStore entityStore, AtlasTypeRegistry typeRegistry) {
this.typeDefStore = typeDefStore; this.typeDefStore = typeDefStore;
this.entityStore = entityStore; this.entityStore = entityStore;
this.typeRegistry = typeRegistry;
} }
public AtlasImportResult run(ZipSource source, AtlasImportRequest request, String userName, public AtlasImportResult run(ZipSource source, AtlasImportRequest request, String userName,
...@@ -116,9 +119,13 @@ public class ImportService { ...@@ -116,9 +119,13 @@ public class ImportService {
} }
private void processTypes(AtlasTypesDef typeDefinitionMap, AtlasImportResult result) throws AtlasBaseException { private void processTypes(AtlasTypesDef typeDefinitionMap, AtlasImportResult result) throws AtlasBaseException {
setGuidToEmpty(typeDefinitionMap.getEntityDefs()); setGuidToEmpty(typeDefinitionMap);
typeDefStore.updateTypesDef(typeDefinitionMap); AtlasTypesDef typesToCreate = AtlasTypeDefStoreInitializer.getTypesToCreate(typeDefinitionMap, this.typeRegistry);
if (!typesToCreate.isEmpty()) {
typeDefStore.createTypesDef(typesToCreate);
}
typeDefStore.updateTypesDef(typeDefinitionMap);
updateMetricsForTypesDef(typeDefinitionMap, result); updateMetricsForTypesDef(typeDefinitionMap, result);
} }
...@@ -129,9 +136,21 @@ public class ImportService { ...@@ -129,9 +136,21 @@ public class ImportService {
result.incrementMeticsCounter("typedef:struct", typeDefinitionMap.getStructDefs().size()); result.incrementMeticsCounter("typedef:struct", typeDefinitionMap.getStructDefs().size());
} }
private void setGuidToEmpty(List<AtlasEntityDef> entityDefList) { private void setGuidToEmpty(AtlasTypesDef typesDef) {
for (AtlasEntityDef edf: entityDefList) { for (AtlasEntityDef def: typesDef.getEntityDefs()) {
edf.setGuid(""); def.setGuid(null);
}
for (AtlasClassificationDef def: typesDef.getClassificationDefs()) {
def.setGuid(null);
}
for (AtlasEnumDef def: typesDef.getEnumDefs()) {
def.setGuid(null);
}
for (AtlasStructDef def: typesDef.getStructDefs()) {
def.setGuid(null);
} }
} }
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
*/ */
package org.apache.atlas.web.resources; package org.apache.atlas.web.resources;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.codehaus.jackson.type.TypeReference; import org.codehaus.jackson.type.TypeReference;
import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.instance.AtlasEntity;
...@@ -184,4 +185,11 @@ public class ZipSource implements EntityImportStream { ...@@ -184,4 +185,11 @@ public class ZipSource implements EntityImportStream {
return null; return null;
} }
} }
@Override
public void onImportComplete(String guid) {
if(guid != null) {
guidEntityJsonMap.remove(guid);
}
}
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment