Commit 515130cc by ashutoshm Committed by Madhan Neethiraj

ATLAS-1503: update export to specify objects-to-export using attribute value

parent 0c1d599d
...@@ -20,11 +20,10 @@ package org.apache.atlas; ...@@ -20,11 +20,10 @@ package org.apache.atlas;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import javax.ws.rs.core.Response;
import java.text.MessageFormat; import java.text.MessageFormat;
import java.util.Arrays; import java.util.Arrays;
import javax.ws.rs.core.Response;
public enum AtlasErrorCode { public enum AtlasErrorCode {
NO_SEARCH_RESULTS(204, "ATLAS2041E", "Given search filter {0} did not yield any results"), NO_SEARCH_RESULTS(204, "ATLAS2041E", "Given search filter {0} did not yield any results"),
...@@ -90,7 +89,10 @@ public enum AtlasErrorCode { ...@@ -90,7 +89,10 @@ public enum AtlasErrorCode {
DISCOVERY_QUERY_FAILED(500, "ATLAS5004E", "Discovery query failed {0}"), DISCOVERY_QUERY_FAILED(500, "ATLAS5004E", "Discovery query failed {0}"),
FAILED_TO_OBTAIN_TYPE_UPDATE_LOCK(500, "ATLAS5005E", "Failed to get the lock; another type update might be in progress. Please try again"), FAILED_TO_OBTAIN_TYPE_UPDATE_LOCK(500, "ATLAS5005E", "Failed to get the lock; another type update might be in progress. Please try again"),
FAILED_TO_OBTAIN_IMPORT_EXPORT_LOCK(500, "ATLAS5006E", "Another import or export is in progress. Please try again"), FAILED_TO_OBTAIN_IMPORT_EXPORT_LOCK(500, "ATLAS5006E", "Another import or export is in progress. Please try again"),
NOTIFICATION_FAILED(500, "ATLAS5007E", "Failed to notify for change {0}"); NOTIFICATION_FAILED(500, "ATLAS5007E", "Failed to notify for change {0}"),
GREMLIN_GROOVY_SCRIPT_ENGINE_FAILED(500, "ATLAS5008E", "scriptEngine cannot be initialized for: {0}"),
JSON_ERROR_OBJECT_MAPPER_NULL_RETURNED(500, "ATLAS5009E", "ObjectMapper.readValue returned NULL for class: {0}"),
GREMLIN_SCRIPT_EXECUTION_FAILED(500, "ATLAS5010E", "Script execution failed for: {0}");
private String errorCode; private String errorCode;
private String errorMessage; private String errorMessage;
......
...@@ -23,18 +23,13 @@ import com.google.inject.Singleton; ...@@ -23,18 +23,13 @@ import com.google.inject.Singleton;
import org.apache.atlas.AtlasErrorCode; import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.AtlasException; import org.apache.atlas.AtlasException;
import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.listener.EntityChangeListener;
import org.apache.atlas.model.instance.AtlasEntityHeader; import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.EntityMutationResponse; import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.model.instance.EntityMutations.EntityOperation; import org.apache.atlas.model.instance.EntityMutations.EntityOperation;
import org.apache.atlas.listener.EntityChangeListener;
import org.apache.atlas.repository.Constants; import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.converters.AtlasInstanceConverter; import org.apache.atlas.repository.converters.AtlasInstanceConverter;
import org.apache.atlas.repository.graph.AtlasGraphProvider; import org.apache.atlas.repository.graph.*;
import org.apache.atlas.repository.graph.DeleteHandler;
import org.apache.atlas.repository.graph.FullTextMapper;
import org.apache.atlas.repository.graph.GraphHelper;
import org.apache.atlas.repository.graph.GraphToTypedInstanceMapper;
import org.apache.atlas.repository.graph.TypedInstanceToGraphMapper;
import org.apache.atlas.repository.graphdb.AtlasVertex; import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.typesystem.ITypedReferenceableInstance; import org.apache.atlas.typesystem.ITypedReferenceableInstance;
import org.apache.atlas.util.AtlasRepositoryConfiguration; import org.apache.atlas.util.AtlasRepositoryConfiguration;
...@@ -46,11 +41,6 @@ import java.util.ArrayList; ...@@ -46,11 +41,6 @@ import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.CREATE;
import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.DELETE;
import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.PARTIAL_UPDATE;
import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.UPDATE;
@Singleton @Singleton
public class AtlasEntityChangeNotifier { public class AtlasEntityChangeNotifier {
...@@ -157,11 +147,17 @@ public class AtlasEntityChangeNotifier { ...@@ -157,11 +147,17 @@ public class AtlasEntityChangeNotifier {
for (AtlasEntityHeader atlasEntityHeader : atlasEntityHeaders) { for (AtlasEntityHeader atlasEntityHeader : atlasEntityHeaders) {
AtlasVertex atlasVertex = AtlasGraphUtilsV1.findByGuid(atlasEntityHeader.getGuid()); AtlasVertex atlasVertex = AtlasGraphUtilsV1.findByGuid(atlasEntityHeader.getGuid());
if(atlasVertex == null) {
continue;
}
try { try {
String fullText = fullTextMapper.mapRecursive(atlasVertex, true); String fullText = fullTextMapper.mapRecursive(atlasVertex, true);
GraphHelper.setProperty(atlasVertex, Constants.ENTITY_TEXT_PROPERTY_KEY, fullText); GraphHelper.setProperty(atlasVertex, Constants.ENTITY_TEXT_PROPERTY_KEY, fullText);
} catch (AtlasException e) { } catch (AtlasException e) {
LOG.error("FullText mapping failed for Vertex[ guid = {} ]", atlasEntityHeader.getGuid()); LOG.error("FullText mapping failed for Vertex[ guid = {} ]", atlasEntityHeader.getGuid(), e);
} }
} }
} }
......
...@@ -160,7 +160,7 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore { ...@@ -160,7 +160,7 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
while (entityStream.hasNext()) { while (entityStream.hasNext()) {
AtlasEntity entity = entityStream.next(); AtlasEntity entity = entityStream.next();
if(processedGuids.contains(entity.getGuid())) { if(entity == null || processedGuids.contains(entity.getGuid())) {
continue; continue;
} }
......
...@@ -377,7 +377,6 @@ ...@@ -377,7 +377,6 @@
<groupId>com.webcohesion.enunciate</groupId> <groupId>com.webcohesion.enunciate</groupId>
<artifactId>enunciate-core-annotations</artifactId> <artifactId>enunciate-core-annotations</artifactId>
</dependency> </dependency>
</dependencies> </dependencies>
<build> <build>
......
...@@ -17,48 +17,73 @@ ...@@ -17,48 +17,73 @@
*/ */
package org.apache.atlas.web.resources; package org.apache.atlas.web.resources;
import com.tinkerpop.gremlin.groovy.jsr223.GremlinGroovyScriptEngine;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.AtlasException;
import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.impexp.AtlasExportRequest;
import org.apache.atlas.model.impexp.AtlasExportResult;
import org.apache.atlas.model.instance.AtlasClassification; import org.apache.atlas.model.instance.AtlasClassification;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasObjectId; import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.typedef.AtlasClassificationDef;
import org.apache.atlas.model.typedef.AtlasEntityDef;
import org.apache.atlas.model.typedef.AtlasTypesDef; import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.apache.atlas.repository.graph.AtlasGraphProvider; import org.apache.atlas.repository.graph.AtlasGraphProvider;
import org.apache.atlas.repository.graphdb.AtlasGraph; import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.store.graph.v1.EntityGraphRetriever; import org.apache.atlas.repository.store.graph.v1.EntityGraphRetriever;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.AtlasException;
import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.impexp.*;
import org.apache.atlas.model.typedef.AtlasClassificationDef;
import org.apache.atlas.model.typedef.AtlasEntityDef;
import org.apache.atlas.type.AtlasTypeUtil; import org.apache.atlas.type.AtlasTypeUtil;
import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import javax.script.*; import javax.script.Bindings;
import javax.script.ScriptContext;
import javax.script.ScriptEngine;
import javax.script.ScriptException;
import java.util.*; import java.util.*;
public class ExportService { public class ExportService {
private static final Logger LOG = LoggerFactory.getLogger(ExportService.class); private static final Logger LOG = LoggerFactory.getLogger(ExportService.class);
public static final String OPTION_ATTR_MATCH_TYPE = "matchType";
public static final String MATCH_TYPE_STARTS_WITH = "startsWith";
public static final String MATCH_TYPE_ENDS_WITH = "endsWith";
public static final String MATCH_TYPE_CONTAINS = "contains";
public static final String MATCH_TYPE_MATCHES = "matches";
private final AtlasTypeRegistry typeRegistry; private final AtlasTypeRegistry typeRegistry;
private final AtlasGraph atlasGraph; private final AtlasGraph atlasGraph;
private final EntityGraphRetriever entityGraphRetriever; private final EntityGraphRetriever entityGraphRetriever;
// query engine support // query engine support
private ScriptEngineManager scriptEngineManager; private final ScriptEngine scriptEngine;
private ScriptEngine scriptEngine; private final Bindings bindings;
private Bindings bindings; private final String queryByGuid = "g.V('__guid', startGuid).bothE().bothV().has('__guid').__guid.dedup().toList()";
private final String gremlinQuery = "g.V('__guid', startGuid).bothE().bothV().has('__guid').__guid.dedup().toList()"; final private String queryByAttrEquals = "g.V().has('__typeName','%s').has('%s', attrValue).has('__guid').__guid.toList()";
final private String queryByAttrStartWith = "g.V().has('__typeName','%s').filter({it.'%s'.startsWith(attrValue)}).has('__guid').__guid.toList()";
public ExportService(final AtlasTypeRegistry typeRegistry) { final private String queryByAttrEndsWith = "g.V().has('__typeName','%s').filter({it.'%s'.endsWith(attrValue)}).has('__guid').__guid.toList()";
final private String queryByAttrContains = "g.V().has('__typeName','%s').filter({it.'%s'.contains(attrValue)}).has('__guid').__guid.toList()";
final private String queryByAttrMatches = "g.V().has('__typeName','%s').filter({it.'%s'.matches(attrValue)}).has('__guid').__guid.toList()";
public ExportService(final AtlasTypeRegistry typeRegistry) throws AtlasBaseException {
this.typeRegistry = typeRegistry; this.typeRegistry = typeRegistry;
this.entityGraphRetriever = new EntityGraphRetriever(this.typeRegistry); this.entityGraphRetriever = new EntityGraphRetriever(this.typeRegistry);
this.atlasGraph = AtlasGraphProvider.getGraphInstance(); this.atlasGraph = AtlasGraphProvider.getGraphInstance();
initScriptEngine(); this.scriptEngine = new GremlinGroovyScriptEngine();
//Do not cache script compilations due to memory implications
scriptEngine.getContext().setAttribute("#jsr223.groovy.engine.keep.globals", "phantom", ScriptContext.ENGINE_SCOPE);
bindings = scriptEngine.getBindings(ScriptContext.ENGINE_SCOPE);
} }
private class ExportContext { private class ExportContext {
...@@ -109,16 +134,18 @@ public class ExportService { ...@@ -109,16 +134,18 @@ public class ExportService {
} }
try { try {
AtlasEntity entity = entityGraphRetriever.toAtlasEntity(item); List<AtlasEntity> entities = getStartingEntity(item, context);
processEntity(entity, context); for (AtlasEntity entity: entities) {
processEntity(entity, context);
}
while (!context.guidsToProcess.isEmpty()) { while (!context.guidsToProcess.isEmpty()) {
String guid = context.guidsToProcess.remove(0); String guid = context.guidsToProcess.remove(0);
entity = entityGraphRetriever.toAtlasEntity(guid); AtlasEntity e = entityGraphRetriever.toAtlasEntity(guid);
processEntity(entity, context); processEntity(e, context);
} }
} catch (AtlasBaseException excp) { } catch (AtlasBaseException excp) {
context.result.setOperationStatus(AtlasExportResult.OperationStatus.PARTIAL_SUCCESS); context.result.setOperationStatus(AtlasExportResult.OperationStatus.PARTIAL_SUCCESS);
...@@ -131,19 +158,92 @@ public class ExportService { ...@@ -131,19 +158,92 @@ public class ExportService {
} }
} }
private List<AtlasEntity> getStartingEntity(AtlasObjectId item, ExportContext context) throws AtlasBaseException {
List<AtlasEntity> ret = new ArrayList<>();
if (StringUtils.isNotEmpty(item.getGuid())) {
AtlasEntity entity = entityGraphRetriever.toAtlasEntity(item);
if (entity != null) {
ret = Collections.singletonList(entity);
}
} else if (StringUtils.isNotEmpty(item.getTypeName()) && MapUtils.isNotEmpty(item.getUniqueAttributes())) {
String typeName = item.getTypeName();
AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName);
if (entityType == null) {
throw new AtlasBaseException(AtlasErrorCode.UNKNOWN_TYPENAME, typeName);
}
AtlasExportRequest request = context.result.getRequest();
String matchType = null;
if (MapUtils.isNotEmpty(request.getOptions())) {
if (request.getOptions().get(OPTION_ATTR_MATCH_TYPE) != null) {
matchType = request.getOptions().get(OPTION_ATTR_MATCH_TYPE).toString();
}
}
final String queryTemplate;
if (StringUtils.equalsIgnoreCase(matchType, MATCH_TYPE_STARTS_WITH)) {
queryTemplate = queryByAttrStartWith;
} else if (StringUtils.equalsIgnoreCase(matchType, MATCH_TYPE_ENDS_WITH)) {
queryTemplate = queryByAttrEndsWith;
} else if (StringUtils.equalsIgnoreCase(matchType, MATCH_TYPE_CONTAINS)) {
queryTemplate = queryByAttrContains;
} else if (StringUtils.equalsIgnoreCase(matchType, MATCH_TYPE_MATCHES)) {
queryTemplate = queryByAttrMatches;
} else { // default
queryTemplate = queryByAttrEquals;
}
for (Map.Entry<String, Object> e : item.getUniqueAttributes().entrySet()) {
String attrName = e.getKey();
Object attrValue = e.getValue();
AtlasAttribute attribute = entityType.getAttribute(attrName);
if (attribute == null || attrValue == null) {
continue;
}
String query = String.format(queryTemplate, typeName, attribute.getQualifiedName());
List<String> guids = executeGremlinScriptFor(query, "attrValue", attrValue.toString());
if (CollectionUtils.isNotEmpty(guids)) {
for (String guid : guids) {
AtlasEntity entity = entityGraphRetriever.toAtlasEntity(guid);
if (entity == null) {
continue;
}
ret.add(entity);
}
}
break;
}
LOG.info("export(item={}; matchType={}): found {} entities", item, matchType, ret.size());
}
return ret;
}
private void processEntity(AtlasEntity entity, ExportContext context) throws AtlasBaseException { private void processEntity(AtlasEntity entity, ExportContext context) throws AtlasBaseException {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("==> processEntity({})", AtlasTypeUtil.getAtlasObjectId(entity)); LOG.debug("==> processEntity({})", AtlasTypeUtil.getAtlasObjectId(entity));
} }
if (!context.guidsProcessed.contains(entity.getGuid())) { if (!context.guidsProcessed.contains(entity.getGuid())) {
context.guidsProcessed.add(entity.getGuid());
context.result.getData().getEntityCreationOrder().add(entity.getGuid());
addTypesAsNeeded(entity.getTypeName(), context); addTypesAsNeeded(entity.getTypeName(), context);
addClassificationsAsNeeded(entity, context); addClassificationsAsNeeded(entity, context);
addEntity(entity, context); addEntity(entity, context);
context.guidsProcessed.add(entity.getGuid());
context.result.getData().getEntityCreationOrder().add(entity.getGuid());
getConnectedEntityGuids(entity, context); getConnectedEntityGuids(entity, context);
} }
...@@ -159,7 +259,11 @@ public class ExportService { ...@@ -159,7 +259,11 @@ public class ExportService {
LOG.debug("==> getConnectedEntityGuids({}): guidsToProcess {}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcess.size()); LOG.debug("==> getConnectedEntityGuids({}): guidsToProcess {}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcess.size());
} }
List<String> result = executeGremlinScriptFor(entity.getGuid()); List<String> result = executeGremlinScriptForHive(entity.getGuid());
if(result == null) {
return;
}
for (String guid : result) { for (String guid : result) {
if (!context.guidsProcessed.contains(guid)) { if (!context.guidsProcessed.contains(guid)) {
context.guidsToProcess.add(guid); context.guidsToProcess.add(guid);
...@@ -215,22 +319,20 @@ public class ExportService { ...@@ -215,22 +319,20 @@ public class ExportService {
} }
} }
private List<String> executeGremlinScriptFor(String guid) throws ScriptException { private List<String> executeGremlinScriptForHive(String guid) throws ScriptException {
return executeGremlinScriptFor(this.queryByGuid, "startGuid", guid);
bindings.put("startGuid", guid);
return (List<String>) atlasGraph.executeGremlinScript(this.scriptEngine, this.bindings, this.gremlinQuery, false);
} }
private void initScriptEngine() { private List<String> executeGremlinScriptFor(String query, String parameterName, String parameterValue) {
if (scriptEngineManager != null) { bindings.put(parameterName, parameterValue);
return; try {
return (List<String>) atlasGraph.executeGremlinScript(this.scriptEngine,
this.bindings,
query,
false);
} catch (ScriptException e) {
LOG.error("Script execution failed for query: ", query, e);
return null;
} }
scriptEngineManager = new ScriptEngineManager();
scriptEngine = scriptEngineManager.getEngineByName("gremlin-groovy");
bindings = scriptEngine.createBindings();
//Do not cache script compilations due to memory implications
scriptEngine.getContext().setAttribute("#jsr223.groovy.engine.keep.globals", "phantom", ScriptContext.ENGINE_SCOPE);
} }
} }
...@@ -34,6 +34,8 @@ import java.util.Map; ...@@ -34,6 +34,8 @@ import java.util.Map;
import java.util.zip.ZipEntry; import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream; import java.util.zip.ZipInputStream;
import static org.apache.atlas.AtlasErrorCode.JSON_ERROR_OBJECT_MAPPER_NULL_RETURNED;
public class ZipSource implements EntityImportStream { public class ZipSource implements EntityImportStream {
private static final Logger LOG = LoggerFactory.getLogger(ZipSource.class); private static final Logger LOG = LoggerFactory.getLogger(ZipSource.class);
...@@ -80,7 +82,6 @@ public class ZipSource implements EntityImportStream { ...@@ -80,7 +82,6 @@ public class ZipSource implements EntityImportStream {
String entryName = zipEntry.getName().replace(".json", ""); String entryName = zipEntry.getName().replace(".json", "");
if (guidEntityJsonMap.containsKey(entryName)) continue; if (guidEntityJsonMap.containsKey(entryName)) continue;
if (zipEntry == null) continue;
byte[] buf = new byte[1024]; byte[] buf = new byte[1024];
...@@ -111,8 +112,12 @@ public class ZipSource implements EntityImportStream { ...@@ -111,8 +112,12 @@ public class ZipSource implements EntityImportStream {
try { try {
ObjectMapper mapper = new ObjectMapper(); ObjectMapper mapper = new ObjectMapper();
return mapper.readValue(jsonData, clazz); T ret = mapper.readValue(jsonData, clazz);
if(ret == null) {
throw new AtlasBaseException(JSON_ERROR_OBJECT_MAPPER_NULL_RETURNED, clazz.toString());
}
return ret;
} catch (Exception e) { } catch (Exception e) {
throw new AtlasBaseException("Error converting file to JSON.", e); throw new AtlasBaseException("Error converting file to JSON.", e);
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment