Commit 6170063c by ashutoshm, committed by Madhan Neethiraj

ATLAS-1503: implementation of import REST API; optimization of export API implementation

parent 68c55925
......@@ -21,6 +21,9 @@ import java.io.IOException;
import java.io.OutputStream;
import java.util.Set;
import javax.script.Bindings;
import javax.script.ScriptEngine;
import javax.script.ScriptEngineManager;
import javax.script.ScriptException;
import org.apache.atlas.groovy.GroovyExpression;
......@@ -265,6 +268,22 @@ public interface AtlasGraph<V, E> {
Object executeGremlinScript(String query, boolean isPath) throws ScriptException;
/**
* Executes a Gremlin script using a ScriptEngine provided by the caller and returns the result.
* This is useful for scenarios where an operation executes a large number of queries, since the
* engine is created once and reused for every call.
*
* @param scriptEngine ScriptEngine initialized by the caller
* @param bindings bindings used by the ScriptEngine; a binding for the graph instance is added if not already present
* @param query the Gremlin query to execute
* @param isPath whether this is a path query
*
* @return the result from executing the script
*
* @throws ScriptException
*/
Object executeGremlinScript(ScriptEngine scriptEngine, Bindings bindings, String query, boolean isPath) throws ScriptException;
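A minimal usage sketch of the new overload (not part of this commit): the caller builds one engine and one bindings map up front and reuses them across many queries, avoiding a ScriptEngineManager lookup per call. The graph and queries variables, and the "gremlin-groovy" engine name, are assumptions for illustration.

import javax.script.Bindings;
import javax.script.ScriptEngine;
import javax.script.ScriptEngineManager;

// assumed in scope: AtlasGraph<?, ?> graph; Iterable<String> queries
ScriptEngine engine = new ScriptEngineManager().getEngineByName("gremlin-groovy");
Bindings bindings = engine.createBindings();

for (String query : queries) {
    // the Titan0Graph implementation adds the "g" binding if it is missing
    Object result = graph.executeGremlinScript(engine, bindings, query, false); // throws ScriptException
    // ... process result
}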
/**
* Convenience method to check whether the given property is
* a multi-property.
*
......@@ -272,6 +291,4 @@ public interface AtlasGraph<V, E> {
* @return
*/
boolean isMultiProperty(String name);
}
......@@ -264,6 +264,10 @@ public class Titan0Graph implements AtlasGraph<Titan0Vertex, Titan0Edge> {
public Object executeGremlinScript(String query, boolean isPath) throws ScriptException {
Object result = executeGremlinScript(query);
return convertGremlinScriptResult(isPath, result);
}
private Object convertGremlinScriptResult(boolean isPath, Object result) {
if (isPath) {
List<Object> path = convertPathQueryResultToList(result);
......@@ -277,6 +281,17 @@ public class Titan0Graph implements AtlasGraph<Titan0Vertex, Titan0Edge> {
}
}
@Override
public Object executeGremlinScript(ScriptEngine scriptEngine, Bindings bindings, String query, boolean isPath) throws ScriptException {
if (!bindings.containsKey("g")) {
bindings.put("g", getGraph());
}
Object result = scriptEngine.eval(query, bindings);
return convertGremlinScriptResult(isPath, result);
}
private Object executeGremlinScript(String gremlinQuery) throws ScriptException {
ScriptEngineManager manager = new ScriptEngineManager();
......@@ -351,7 +366,6 @@ public class Titan0Graph implements AtlasGraph<Titan0Vertex, Titan0Edge> {
});
}
@Override
public boolean isMultiProperty(String propertyName) {
return multiProperties.contains(propertyName);
......
......@@ -29,6 +29,7 @@ import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlRootElement;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
......@@ -187,15 +188,15 @@ public class AtlasExportResult implements Serializable {
public static class AtlasExportData implements Serializable{
private static final long serialVersionUID = 1L;
private AtlasTypesDef typesDef;
private Map<String, AtlasEntity> entities;
private Map<String, List<String>> entityCreationOrder;
private AtlasTypesDef typesDef;
private Map<String, AtlasEntity> entities;
private List<String> entityCreationOrder;
public AtlasExportData() {
typesDef = new AtlasTypesDef();
entities = new HashMap<>();
entityCreationOrder = new HashMap<>();
entityCreationOrder = new ArrayList<>();
}
public AtlasTypesDef getTypesDef() { return typesDef; }
......@@ -206,25 +207,11 @@ public class AtlasExportResult implements Serializable {
public void setEntities(Map<String, AtlasEntity> entities) { this.entities = entities; }
public Map<String, List<String>> getEntityCreationOrder() { return entityCreationOrder; }
public List<String> getEntityCreationOrder() { return entityCreationOrder; }
public void setEntityCreationOrder(Map<String, List<String>> entityCreationOrder) { this.entityCreationOrder = entityCreationOrder; }
public void setEntityCreationOrder(List<String> entityCreationOrder) { this.entityCreationOrder = entityCreationOrder; }
public void clear() {
if (entityCreationOrder != null) {
entityCreationOrder.clear();
}
if (typesDef != null) {
typesDef.clear();
}
if (entities != null) {
entities.clear();
}
}
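With entityCreationOrder now a flat List<String>, producing and consuming the creation order is a plain iteration. A minimal sketch (GUID values illustrative):

AtlasExportResult.AtlasExportData exportData = new AtlasExportResult.AtlasExportData();
exportData.getEntityCreationOrder().add("98a6c2e3-94f3-4e5a-9f53-ec8b4f7c2a11");
exportData.getEntityCreationOrder().add("f31b0a14-6c2d-4d28-b2a7-5f0f1f4f9e20");

for (String guid : exportData.getEntityCreationOrder()) {
    System.out.println(guid); // import replays entities in exactly this order
}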
public StringBuilder toString(StringBuilder sb) {
if (sb == null) {
sb = new StringBuilder();
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.model.impexp;
import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
import org.codehaus.jackson.annotate.JsonAutoDetect;
import org.codehaus.jackson.annotate.JsonIgnoreProperties;
import org.codehaus.jackson.map.annotate.JsonSerialize;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlRootElement;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import static org.codehaus.jackson.annotate.JsonAutoDetect.Visibility.NONE;
import static org.codehaus.jackson.annotate.JsonAutoDetect.Visibility.PUBLIC_ONLY;
@JsonAutoDetect(getterVisibility=PUBLIC_ONLY, setterVisibility=PUBLIC_ONLY, fieldVisibility=NONE)
@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
@JsonIgnoreProperties(ignoreUnknown=true)
@XmlRootElement
@XmlAccessorType(XmlAccessType.PROPERTY)
public class AtlasImportRequest implements Serializable {
private static final long serialVersionUID = 1L;
private Map<String, Object> options;
public AtlasImportRequest() {
this.options = new HashMap<>();
}
public AtlasImportRequest(Map<String, Object> options) {
this.options = options;
}
public Map<String, Object> getOptions() { return options; }
public void setOptions(Map<String, Object> options) { this.options = options; }
public StringBuilder toString(StringBuilder sb) {
if (sb == null) {
sb = new StringBuilder();
}
sb.append("AtlasImportRequest{");
sb.append("options={");
AtlasBaseTypeDef.dumpObjects(options, sb);
sb.append("}");
sb.append("}");
return sb;
}
@Override
public String toString() {
return toString(new StringBuilder()).toString();
}
}
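A minimal sketch of building a request programmatically; "FILENAME" is the option key that ImportService.run() below reads for file-based imports (path illustrative):

import java.util.HashMap;
import java.util.Map;

Map<String, Object> options = new HashMap<>();
options.put("FILENAME", "/tmp/atlas-export.zip"); // read by ImportService.run()
AtlasImportRequest request = new AtlasImportRequest(options);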
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.model.impexp;
import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
import org.codehaus.jackson.annotate.JsonAutoDetect;
import org.codehaus.jackson.annotate.JsonIgnoreProperties;
import org.codehaus.jackson.map.annotate.JsonSerialize;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlRootElement;
import java.util.HashMap;
import java.util.Map;
import static org.codehaus.jackson.annotate.JsonAutoDetect.Visibility.NONE;
import static org.codehaus.jackson.annotate.JsonAutoDetect.Visibility.PUBLIC_ONLY;
@JsonAutoDetect(getterVisibility=PUBLIC_ONLY, setterVisibility=PUBLIC_ONLY, fieldVisibility=NONE)
@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
@JsonIgnoreProperties(ignoreUnknown=true)
@XmlRootElement
@XmlAccessorType(XmlAccessType.PROPERTY)
public class AtlasImportResult implements Serializable {
private static final long serialVersionUID = 1L;
public enum OperationStatus {
SUCCESS, PARTIAL_SUCCESS, FAIL
}
private AtlasImportRequest request;
private String userName;
private String clientIpAddress;
private String hostName;
private long timeStamp;
private Map<String, Integer> metrics;
private OperationStatus operationStatus;
public AtlasImportResult() {
this(null, null, null, null, System.currentTimeMillis());
}
public AtlasImportResult(AtlasImportRequest request, String userName,
String clientIpAddress, String hostName, long timeStamp) {
this.request = request;
this.userName = userName;
this.clientIpAddress = clientIpAddress;
this.hostName = hostName;
this.timeStamp = timeStamp;
this.metrics = new HashMap<>();
this.operationStatus = OperationStatus.FAIL;
}
public AtlasImportRequest getRequest() {
return request;
}
public void setRequest(AtlasImportRequest request) {
this.request = request;
}
public String getUserName() {
return userName;
}
public void setUserName(String userName) {
this.userName = userName;
}
public String getClientIpAddress() {
return clientIpAddress;
}
public void setClientIpAddress(String clientIpAddress) {
this.clientIpAddress = clientIpAddress;
}
public String getHostName() {
return hostName;
}
public void setHostName(String hostName) {
this.hostName = hostName;
}
public long getTimeStamp() {
return timeStamp;
}
public void setTimeStamp(long timeStamp) {
this.timeStamp = timeStamp;
}
public Map<String, Integer> getMetrics() {
return metrics;
}
public void setMetrics(Map<String, Integer> metrics) {
this.metrics = metrics;
}
public OperationStatus getOperationStatus() {
return operationStatus;
}
public void setOperationStatus(OperationStatus operationStatus) {
this.operationStatus = operationStatus;
}
public void incrementMeticsCounter(String key) {
incrementMeticsCounter(key, 1);
}
public void incrementMeticsCounter(String key, int incrementBy) {
int currentValue = metrics.containsKey(key) ? metrics.get(key) : 0;
metrics.put(key, currentValue + incrementBy);
}
public StringBuilder toString(StringBuilder sb) {
if (sb == null) {
sb = new StringBuilder();
}
sb.append("AtlasImportResult{");
sb.append("request={").append(request).append("}");
sb.append(", userName='").append(userName).append("'");
sb.append(", clientIpAddress='").append(clientIpAddress).append("'");
sb.append(", hostName='").append(hostName).append("'");
sb.append(", timeStamp='").append(timeStamp).append("'");
sb.append(", metrics={");
AtlasBaseTypeDef.dumpObjects(metrics, sb);
sb.append("}");
sb.append(", operationStatus='").append(operationStatus).append("'");
sb.append("}");
return sb;
}
@Override
public String toString() {
return toString(new StringBuilder()).toString();
}
}
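A short sketch of the result lifecycle, as ImportService below uses it: the status starts as FAIL from the constructor and is flipped to SUCCESS only after all types and entities are processed (argument values illustrative):

AtlasImportResult result = new AtlasImportResult(request, "admin", "127.0.0.1", "localhost", System.currentTimeMillis());
result.incrementMeticsCounter("Entities", 42); // method name spelling is as in this commit
result.setOperationStatus(AtlasImportResult.OperationStatus.SUCCESS);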
......@@ -21,9 +21,7 @@ package org.apache.atlas.repository.store.graph.v1;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.AtlasException;
import org.apache.atlas.GraphTransaction;
import org.apache.atlas.RequestContext;
import org.apache.atlas.RequestContextV1;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.instance.AtlasClassification;
......@@ -33,7 +31,6 @@ import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.model.instance.EntityMutations;
import org.apache.atlas.repository.graph.GraphHelper;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.repository.store.graph.EntityGraphDiscovery;
......@@ -46,10 +43,8 @@ import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
......@@ -303,7 +298,11 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName());
//Create vertices which do not exist in the repository
vertex = entityGraphMapper.createVertex(entity);
if ((entityStream instanceof EntityImportStream) && AtlasEntity.isAssigned(entity.getGuid())) {
vertex = entityGraphMapper.createVertexWithGuid(entity, entity.getGuid());
} else {
vertex = entityGraphMapper.createVertex(entity);
}
discoveryContext.addResolvedGuid(guid, vertex);
......
......@@ -84,6 +84,11 @@ public class EntityGraphMapper {
}
public AtlasVertex createVertex(AtlasEntity entity) {
final String guid = UUID.randomUUID().toString();
return createVertexWithGuid(entity, guid);
}
public AtlasVertex createVertexWithGuid(AtlasEntity entity, String guid) {
if (LOG.isDebugEnabled()) {
LOG.debug("==> createVertex({})", entity.getTypeName());
}
......@@ -96,8 +101,6 @@ public class EntityGraphMapper {
AtlasGraphUtilsV1.addProperty(ret, Constants.SUPER_TYPES_PROPERTY_KEY, superTypeName);
}
final String guid = UUID.randomUUID().toString();
AtlasGraphUtilsV1.setProperty(ret, Constants.GUID_PROPERTY_KEY, guid);
AtlasGraphUtilsV1.setProperty(ret, Constants.VERSION_PROPERTY_KEY, getEntityVersion(entity));
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.store.graph.v1;
public interface EntityImportStream extends EntityStream {
}
......@@ -21,7 +21,6 @@ import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.TypeCategory;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.repository.graph.GraphHelper;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.store.graph.EntityGraphDiscoveryContext;
import org.apache.atlas.repository.store.graph.EntityResolver;
......@@ -34,7 +33,6 @@ import org.slf4j.LoggerFactory;
public class IDBasedEntityResolver implements EntityResolver {
private static final Logger LOG = LoggerFactory.getLogger(IDBasedEntityResolver.class);
private final GraphHelper graphHelper = GraphHelper.getInstance();
private final AtlasTypeRegistry typeRegistry;
public IDBasedEntityResolver(AtlasTypeRegistry typeRegistry) {
......@@ -49,8 +47,8 @@ public class IDBasedEntityResolver implements EntityResolver {
EntityStream entityStream = context.getEntityStream();
for (String guid : context.getReferencedGuids()) {
boolean isAssignedGuid = AtlasEntity.isAssigned(guid);
AtlasVertex vertex = isAssignedGuid ? AtlasGraphUtilsV1.findByGuid(guid) : null;
boolean isAssignedGuid = AtlasEntity.isAssigned(guid);
AtlasVertex vertex = isAssignedGuid ? AtlasGraphUtilsV1.findByGuid(guid) : null;
if (vertex == null) { // if not found in the store, look if the entity is present in the stream
AtlasEntity entity = entityStream.getByGuid(guid);
......@@ -71,7 +69,7 @@ public class IDBasedEntityResolver implements EntityResolver {
if (vertex != null) {
context.addResolvedGuid(guid, vertex);
} else {
if (isAssignedGuid) {
if (isAssignedGuid && !(entityStream instanceof EntityImportStream)) {
throw new AtlasBaseException(AtlasErrorCode.REFERENCED_ENTITY_NOT_FOUND, guid);
} else {
context.addLocalGuidReference(guid);
......@@ -81,4 +79,4 @@ public class IDBasedEntityResolver implements EntityResolver {
return context;
}
}
}
\ No newline at end of file
......@@ -21,15 +21,18 @@ package org.apache.atlas.web.resources;
import com.google.inject.Inject;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasException;
import org.apache.atlas.discovery.DiscoveryService;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.impexp.AtlasExportRequest;
import org.apache.atlas.model.impexp.AtlasExportResult;
import org.apache.atlas.model.impexp.AtlasImportRequest;
import org.apache.atlas.model.impexp.AtlasImportResult;
import org.apache.atlas.model.metrics.AtlasMetrics;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.services.MetricsService;
import org.apache.atlas.authorize.AtlasActionTypes;
import org.apache.atlas.authorize.AtlasResourceTypes;
import org.apache.atlas.authorize.simple.AtlasAuthorizationUtils;
import org.apache.atlas.store.AtlasTypeDefStore;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.web.filters.AtlasCSRFPreventionFilter;
import org.apache.atlas.web.service.ServiceState;
......@@ -53,6 +56,7 @@ import javax.ws.rs.*;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.*;
......@@ -85,16 +89,19 @@ public class AdminResource {
private final ServiceState serviceState;
private final MetricsService metricsService;
private final DiscoveryService discoveryService;
private final AtlasTypeRegistry typeRegistry;
private final AtlasTypeDefStore typesDefStore;
private final AtlasEntityStore entityStore;
@Inject
public AdminResource(ServiceState serviceState, MetricsService metricsService,
DiscoveryService discoveryService, AtlasTypeRegistry typeRegistry) {
AtlasTypeRegistry typeRegistry, AtlasTypeDefStore typeDefStore,
AtlasEntityStore entityStore) {
this.serviceState = serviceState;
this.metricsService = metricsService;
this.discoveryService = discoveryService;
this.typeRegistry = typeRegistry;
this.typesDefStore = typeDefStore;
this.entityStore = entityStore;
}
/**
......@@ -272,6 +279,10 @@ public class AdminResource {
@Path("/export")
@Consumes(Servlets.JSON_MEDIA_TYPE)
public Response export(AtlasExportRequest request) throws AtlasBaseException {
if (LOG.isDebugEnabled()) {
LOG.debug("==> AdminResource.export()");
}
ZipSink exportSink = null;
try {
exportSink = new ZipSink();
......@@ -292,16 +303,85 @@ public class AdminResource {
outStream.flush();
return Response.ok().build();
} catch (AtlasException | IOException ex) {
LOG.error("export() failed", ex);
} catch (IOException excp) {
LOG.error("export() failed", excp);
throw toAtlasBaseException(new AtlasException(ex));
throw new AtlasBaseException(excp);
} finally {
if (exportSink != null)
if (exportSink != null) {
exportSink.close();
}
if (LOG.isDebugEnabled()) {
LOG.debug("<== AdminResource.export()");
}
}
}
@POST
@Path("/import")
@Produces(Servlets.JSON_MEDIA_TYPE)
@Consumes(Servlets.BINARY)
public AtlasImportResult importData(byte[] bytes) throws AtlasBaseException {
if (LOG.isDebugEnabled()) {
LOG.debug("==> AdminResource.importData(bytes.length={})", bytes.length);
}
AtlasImportResult result;
try {
AtlasImportRequest request = new AtlasImportRequest(Servlets.getParameterMap(httpServletRequest));
ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes);
ImportService importService = new ImportService(this.typesDefStore, this.entityStore);
ZipSource zipSource = new ZipSource(inputStream);
result = importService.run(zipSource, request, Servlets.getUserName(httpServletRequest),
Servlets.getHostName(httpServletRequest),
Servlets.getRequestIpAddress(httpServletRequest));
} catch (Exception excp) {
LOG.error("importData(binary) failed", excp);
throw new AtlasBaseException(excp);
} finally {
if (LOG.isDebugEnabled()) {
LOG.debug("<== AdminResource.importData(binary)");
}
}
return result;
}
@POST
@Path("/importfile")
@Produces(Servlets.JSON_MEDIA_TYPE)
public AtlasImportResult importFile() throws AtlasBaseException {
if (LOG.isDebugEnabled()) {
LOG.debug("==> AdminResource.importFile()");
}
AtlasImportResult result;
try {
AtlasImportRequest request = new AtlasImportRequest(Servlets.getParameterMap(httpServletRequest));
ImportService importService = new ImportService(this.typesDefStore, this.entityStore);
result = importService.run(request, Servlets.getUserName(httpServletRequest),
Servlets.getHostName(httpServletRequest),
Servlets.getRequestIpAddress(httpServletRequest));
} catch (Exception excp) {
LOG.error("importFile() failed", excp);
throw new AtlasBaseException(excp);
} finally {
if (LOG.isDebugEnabled()) {
LOG.debug("<== AdminResource.importFile()");
}
}
return result;
}
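A hypothetical client sketch for the two endpoints added above. The server URL and port are assumptions, and Servlets.BINARY is assumed to resolve to application/octet-stream; neither is confirmed by this diff.

import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Paths;

// POST the export zip as the raw request body to /admin/import
byte[] zipBytes = Files.readAllBytes(Paths.get("/tmp/atlas-export.zip"));
URL url = new URL("http://localhost:21000/api/atlas/admin/import");
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setRequestMethod("POST");
conn.setDoOutput(true);
conn.setRequestProperty("Content-Type", "application/octet-stream");
try (OutputStream out = conn.getOutputStream()) {
    out.write(zipBytes);
}
System.out.println("HTTP " + conn.getResponseCode()); // response body: AtlasImportResult as JSON

// /admin/importfile instead takes the file path as a query parameter, e.g.
// POST http://localhost:21000/api/atlas/admin/importfile?FILENAME=/tmp/atlas-export.zip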
private String getEditableEntityTypes(PropertiesConfiguration config) {
String ret = DEFAULT_EDITABLE_ENTITY_TYPES;
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.web.resources;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.model.impexp.AtlasImportRequest;
import org.apache.atlas.model.impexp.AtlasImportResult;
import org.apache.atlas.model.typedef.*;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.commons.io.FileUtils;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.store.AtlasTypeDefStore;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
import java.util.List;
public class ImportService {
private static final Logger LOG = LoggerFactory.getLogger(ImportService.class);
private final AtlasTypeDefStore typeDefStore;
private final AtlasEntityStore entityStore;
public ImportService(final AtlasTypeDefStore typeDefStore, final AtlasEntityStore entityStore) {
this.typeDefStore = typeDefStore;
this.entityStore = entityStore;
}
public AtlasImportResult run(ZipSource source, AtlasImportRequest request, String userName,
String hostName, String requestingIP) throws AtlasBaseException {
AtlasImportResult result = new AtlasImportResult(request, userName, requestingIP, hostName, System.currentTimeMillis());
try {
LOG.info("==> import(user={}, from={})", userName, requestingIP);
processTypes(source.getTypesDef(), result);
processEntities(source, result);
result.setOperationStatus(AtlasImportResult.OperationStatus.SUCCESS);
} catch (AtlasBaseException excp) {
LOG.error("import(user={}, from={}): failed", userName, requestingIP, excp);
throw excp;
} catch (Exception excp) {
LOG.error("import(user={}, from={}): failed", userName, requestingIP, excp);
throw new AtlasBaseException(excp);
} finally {
try {
source.close();
} catch (IOException e) {
// ignore
}
LOG.info("<== import(user={}, from={}): status={}", userName, requestingIP, result.getOperationStatus());
}
return result;
}
public AtlasImportResult run(AtlasImportRequest request, String userName, String hostName, String requestingIP)
throws AtlasBaseException {
String fileName = (String)request.getOptions().get("FILENAME");
if (StringUtils.isBlank(fileName)) {
throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "FILENAME parameter not found");
}
AtlasImportResult result = null;
try {
LOG.info("==> import(user={}, from={}, fileName={})", userName, requestingIP, fileName);
File file = new File(fileName);
ZipSource source = new ZipSource(new ByteArrayInputStream(FileUtils.readFileToByteArray(file)));
result = run(source, request, userName, hostName, requestingIP);
} catch (AtlasBaseException excp) {
LOG.error("import(user={}, from={}, fileName={}): failed", userName, requestingIP, excp);
throw excp;
} catch (FileNotFoundException excp) {
LOG.error("import(user={}, from={}, fileName={}): file not found", userName, requestingIP, excp);
throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, fileName + ": file not found");
} catch (Exception excp) {
LOG.error("import(user={}, from={}, fileName={}): failed", userName, requestingIP, excp);
throw new AtlasBaseException(excp);
} finally {
LOG.info("<== import(user={}, from={}, fileName={}): status={}", userName, requestingIP, fileName,
(result == null ? AtlasImportResult.OperationStatus.FAIL : result.getOperationStatus()));
}
return result;
}
private void processTypes(AtlasTypesDef typeDefinitionMap, AtlasImportResult result) throws AtlasBaseException {
setGuidToEmpty(typeDefinitionMap.getEntityDefs());
typeDefStore.updateTypesDef(typeDefinitionMap);
result.incrementMeticsCounter("Enum(s)", typeDefinitionMap.getEnumDefs().size());
result.incrementMeticsCounter("Struct(s)", typeDefinitionMap.getStructDefs().size());
result.incrementMeticsCounter("Classification(s)", typeDefinitionMap.getClassificationDefs().size());
result.incrementMeticsCounter("Entity definition(s)", typeDefinitionMap.getEntityDefs().size());
}
private void setGuidToEmpty(List<AtlasEntityDef> entityDefList) {
for (AtlasEntityDef edf: entityDefList) {
edf.setGuid("");
}
}
private void processEntities(ZipSource importSource, AtlasImportResult result) throws AtlasBaseException {
this.entityStore.createOrUpdate(importSource, false);
result.incrementMeticsCounter("Entities", importSource.getCreationOrder().size());
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.web.resources;
public enum ZipExportFileNames {
ATLAS_EXPORT_INFO_NAME("atlas-export-info"),
ATLAS_EXPORT_ORDER_NAME("atlas-export-order"),
ATLAS_TYPESDEF_NAME("atlas-typesdef");
public final String name;
ZipExportFileNames(String name) {
this.name = name;
}
@Override
public String toString() {
return this.name;
}
}
......@@ -54,21 +54,18 @@ public class ZipSink {
}
public void setResult(AtlasExportResult result) throws AtlasBaseException {
final String fileName = "atlas-export-info";
String jsonData = convertToJSON(result);
saveToZip(fileName, jsonData);
saveToZip(ZipExportFileNames.ATLAS_EXPORT_INFO_NAME, jsonData);
}
public void setTypesDef(AtlasTypesDef typesDef) throws AtlasBaseException {
final String fileName = "atlas-typesdef";
String jsonData = convertToJSON(typesDef);
saveToZip(fileName, jsonData);
saveToZip(ZipExportFileNames.ATLAS_TYPESDEF_NAME, jsonData);
}
public void setExportOrder(Map<String, List<String>> result) throws AtlasBaseException {
final String fileName = "atlas-export-order";
public void setExportOrder(List<String> result) throws AtlasBaseException {
String jsonData = convertToJSON(result);
saveToZip(fileName, jsonData);
saveToZip(ZipExportFileNames.ATLAS_EXPORT_ORDER_NAME, jsonData);
}
public void writeTo(OutputStream stream) throws IOException {
......@@ -90,9 +87,13 @@ public class ZipSink {
return AtlasType.toJson(entity);
}
private void saveToZip(ZipExportFileNames fileName, String jsonData) throws AtlasBaseException {
saveToZip(fileName.toString(), jsonData);
}
private void saveToZip(String fileName, String jsonData) throws AtlasBaseException {
try {
addToZipStream(fileName + ".json", jsonData);
addToZipStream(fileName + ".json", jsonData);
} catch (IOException e) {
throw new AtlasBaseException(String.format("Error writing file %s.", fileName), e);
}
......
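To make the archive contract concrete: ZipSink writes the three well-known files, and per ZipSource.get() below, entity entries are expected as <guid>.json. A small sketch (not part of this commit, path illustrative) that lists the entries of a produced archive:

import java.io.FileInputStream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;

try (ZipInputStream zis = new ZipInputStream(new FileInputStream("/tmp/atlas-export.zip"))) {
    for (ZipEntry entry; (entry = zis.getNextEntry()) != null; ) {
        System.out.println(entry.getName());
        // typical output:
        //   atlas-export-info.json
        //   atlas-export-order.json
        //   atlas-typesdef.json
        //   98a6c2e3-94f3-4e5a-9f53-ec8b4f7c2a11.json   (one per entity)
    }
}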
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.web.resources;
import org.codehaus.jackson.type.TypeReference;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.impexp.AtlasExportResult;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.apache.atlas.repository.store.graph.v1.EntityImportStream;
import org.codehaus.jackson.map.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
import java.util.Iterator;
import java.util.List;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
public class ZipSource implements EntityImportStream {
private static final Logger LOG = LoggerFactory.getLogger(ZipSource.class);
private final ByteArrayInputStream inputStream;
private List<String> creationOrder;
private Iterator<String> iterator;
public ZipSource(ByteArrayInputStream inputStream) {
this.inputStream = inputStream;
this.setCreationOrder();
}
public AtlasTypesDef getTypesDef() throws AtlasBaseException {
final String fileName = ZipExportFileNames.ATLAS_TYPESDEF_NAME.toString();
try {
String s = get(fileName);
return convertFromJson(AtlasTypesDef.class, s);
} catch (IOException e) {
LOG.error(String.format("Error retrieving '%s' from zip.", fileName), e);
return null;
}
}
public AtlasExportResult getExportResult() throws AtlasBaseException {
String fileName = ZipExportFileNames.ATLAS_EXPORT_INFO_NAME.toString();
try {
String s = get(fileName);
return convertFromJson(AtlasExportResult.class, s);
} catch (IOException e) {
LOG.error(String.format("Error retrieving '%s' from zip.", fileName), e);
return null;
}
}
private void setCreationOrder() {
String fileName = ZipExportFileNames.ATLAS_EXPORT_ORDER_NAME.toString();
try {
String s = get(fileName);
this.creationOrder = convertFromJson(new TypeReference<List<String>>(){}, s);
this.iterator = this.creationOrder.iterator();
} catch (IOException | AtlasBaseException e) {
LOG.error(String.format("Error retrieving '%s' from zip.", fileName), e);
}
}
public List<String> getCreationOrder() throws AtlasBaseException {
return this.creationOrder;
}
public AtlasEntity getEntity(String guid) throws AtlasBaseException {
try {
String s = get(guid);
return convertFromJson(AtlasEntity.class, s);
} catch (IOException e) {
LOG.error(String.format("Error retrieving '%s' from zip.", guid), e);
return null;
}
}
private String get(String entryName) throws IOException {
String ret = "";
inputStream.reset();
ZipInputStream zipInputStream = new ZipInputStream(inputStream);
ZipEntry zipEntry = zipInputStream.getNextEntry();
entryName = entryName + ".json";
while (zipEntry != null) {
if (zipEntry.getName().equals(entryName)) {
break;
}
zipEntry = zipInputStream.getNextEntry();
}
if (zipEntry != null) {
ByteArrayOutputStream os = new ByteArrayOutputStream();
byte[] buf = new byte[1024];
int n = 0;
while ((n = zipInputStream.read(buf, 0, 1024)) > -1) {
os.write(buf, 0, n);
}
ret = os.toString();
} else {
LOG.warn("{}: no such entry in zip file", entryName);
}
zipInputStream.close();
return ret;
}
private <T> T convertFromJson(TypeReference clazz, String jsonData) throws AtlasBaseException {
try {
ObjectMapper mapper = new ObjectMapper();
return mapper.readValue(jsonData, clazz);
} catch (Exception e) {
throw new AtlasBaseException("Error converting file to JSON.", e);
}
}
private <T> T convertFromJson(Class<T> clazz, String jsonData) throws AtlasBaseException {
try {
ObjectMapper mapper = new ObjectMapper();
return mapper.readValue(jsonData, clazz);
} catch (Exception e) {
throw new AtlasBaseException("Error converting file to JSON.", e);
}
}
public void close() throws IOException {
inputStream.close();
}
@Override
public boolean hasNext() {
return this.iterator.hasNext();
}
@Override
public AtlasEntity next() {
try {
return getEntity(this.iterator.next());
} catch (AtlasBaseException e) {
LOG.error("next(): failed to retrieve entity", e);
return null;
}
}
@Override
public void reset() {
try {
getCreationOrder();
this.iterator = this.creationOrder.iterator();
} catch (AtlasBaseException e) {
LOG.error("reset", e);
}
}
@Override
public AtlasEntity getByGuid(String guid) {
try {
return getEntity(guid);
} catch (AtlasBaseException e) {
LOG.error("getByGuid({}): failed to retrieve entity", guid, e);
return null;
}
}
}
......@@ -21,6 +21,7 @@ package org.apache.atlas.web.util;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.LocalServletRequest;
import org.apache.atlas.utils.ParamChecker;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringEscapeUtils;
import org.apache.commons.lang3.StringUtils;
......@@ -41,7 +42,9 @@ import java.io.StringWriter;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* Utility functions for dealing with servlets.
......@@ -193,4 +196,23 @@ public final class Servlets {
public static String getUserName(HttpServletRequest httpServletRequest) throws IOException {
return httpServletRequest.getRemoteUser();
}
public static Map<String, Object> getParameterMap(HttpServletRequest request) {
Map<String, Object> attributes = new HashMap<>();
if (MapUtils.isNotEmpty(request.getParameterMap())) {
for (Map.Entry<String, String[]> e : request.getParameterMap().entrySet()) {
String key = e.getKey();
if (key != null) {
String[] values = e.getValue();
String value = values != null && values.length > 0 ? values[0] : null;
attributes.put(key, value);
}
}
}
return attributes;
}
}
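A short sketch of the flattening behavior: multi-valued servlet parameters keep only their first value, so a call like POST /api/atlas/admin/importfile?FILENAME=/tmp/export.zip yields a single-valued map (request object assumed in scope):

Map<String, Object> params = Servlets.getParameterMap(httpServletRequest);
String fileName = (String) params.get("FILENAME"); // "/tmp/export.zip"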
......@@ -48,7 +48,7 @@ public class AdminResourceTest {
when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.ACTIVE);
AdminResource adminResource = new AdminResource(serviceState, null, null, null);
AdminResource adminResource = new AdminResource(serviceState, null, null, null, null);
Response response = adminResource.getStatus();
assertEquals(response.getStatus(), HttpServletResponse.SC_OK);
JSONObject entity = (JSONObject) response.getEntity();
......@@ -59,7 +59,7 @@ public class AdminResourceTest {
public void testResourceGetsValueFromServiceState() throws JSONException {
when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.PASSIVE);
AdminResource adminResource = new AdminResource(serviceState, null, null, null);
AdminResource adminResource = new AdminResource(serviceState, null, null, null, null);
Response response = adminResource.getStatus();
verify(serviceState).getState();
......