Commit 08b76391 by nikhilbonte Committed by nixonrodrigues

ATLAS-3256 Modify export API to process with relationshipAttributes

parent 1c399cc7
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.impexp;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.typedef.AtlasEntityDef;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.type.AtlasTypeRegistry;
import java.util.HashMap;
import java.util.Map;
/**
 * Chooses an {@link ExtractStrategy} and delegates to it in order to discover the
 * entities related to an entity that is being exported.
 *
 * <p>Three strategies are registered at construction time: a vertex (Gremlin) based one,
 * an incremental one and a relationship-attribute based one. The strategy used for
 * connected/full fetches is picked by {@link #setExtractor(AtlasEntityDef)} and defaults
 * to the vertex based strategy when none has been selected.
 *
 * <p>NOTE(review): the selected strategy is mutable instance state and {@link #close()}
 * asks every strategy to release its resources; confirm with callers that an instance is
 * not shared between concurrent exports or reused after {@code close()}.
 */
public class EntitiesExtractor {
    static final String PROPERTY_GUID = "__guid";

    private static final String VERTEX_BASED_EXTRACT   = "default";
    private static final String INCREMENTAL_EXTRACT    = "incremental";
    private static final String RELATION_BASED_EXTRACT = "relationship";

    private final Map<String, ExtractStrategy> extractors = new HashMap<>();
    private ExtractStrategy extractor;

    /**
     * Registers the available extraction strategies.
     *
     * @param atlasGraph   graph handed to the vertex based and incremental strategies
     * @param typeRegistry type registry handed to the vertex based and relationship based strategies
     */
    public EntitiesExtractor(AtlasGraph atlasGraph, AtlasTypeRegistry typeRegistry) {
        extractors.put(VERTEX_BASED_EXTRACT, new VertexExtractor(atlasGraph, typeRegistry));
        extractors.put(INCREMENTAL_EXTRACT, new IncrementalExportEntityProvider(atlasGraph));
        extractors.put(RELATION_BASED_EXTRACT, new RelationshipAttributesExtractor(typeRegistry));
    }

    /**
     * Runs the fetch that matches {@code context.fetchType} for the given entity.
     *
     * @param entity  entity whose related entities should be discovered
     * @param context export state that receives the discovered guids
     */
    public void get(AtlasEntity entity, ExportService.ExportContext context) {
        if (extractor == null) {
            extractor = extractors.get(VERTEX_BASED_EXTRACT);
        }

        switch (context.fetchType) {
            case CONNECTED:
                extractor.connectedFetch(entity, context);
                break;

            case INCREMENTAL:
                if (context.isHiveDBIncrementalSkipLineage()) {
                    extractors.get(INCREMENTAL_EXTRACT).fullFetch(entity, context);
                    break;
                }
                // Deliberate fall-through: any other incremental export is handled as a full fetch.
            case FULL:
            default:
                extractor.fullFetch(entity, context);
        }
    }

    /**
     * Selects the strategy used by subsequent {@link #get} calls for connected/full fetches.
     *
     * @param atlasEntityDef definition of the starting entity's type; may be {@code null}
     */
    public void setExtractor(AtlasEntityDef atlasEntityDef) {
        extractor = extractUsing(atlasEntityDef);
    }

    /**
     * Closes every registered strategy. A failure in one strategy does not prevent the
     * remaining ones from being closed; the first failure is rethrown afterwards with any
     * later failures attached as suppressed exceptions.
     */
    public void close() {
        RuntimeException firstFailure = null;

        for (ExtractStrategy es : extractors.values()) {
            try {
                es.close();
            } catch (RuntimeException e) {
                if (firstFailure == null) {
                    firstFailure = e;
                } else {
                    firstFailure.addSuppressed(e);
                }
            }
        }

        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /**
     * Picks the relationship based strategy when the type declares relationship attributes,
     * otherwise the vertex based one. A missing definition or a missing (null) attribute
     * list is treated the same as an empty list.
     */
    private ExtractStrategy extractUsing(AtlasEntityDef atlasEntityDef) {
        boolean hasRelationshipAttributes = atlasEntityDef != null
                && atlasEntityDef.getRelationshipAttributeDefs() != null
                && !atlasEntityDef.getRelationshipAttributeDefs().isEmpty();

        return hasRelationshipAttributes
                ? extractors.get(RELATION_BASED_EXTRACT)
                : extractors.get(VERTEX_BASED_EXTRACT);
    }
}
......@@ -25,7 +25,6 @@ import org.apache.atlas.model.impexp.AtlasExportResult;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
import org.apache.atlas.model.typedef.AtlasClassificationDef;
import org.apache.atlas.model.typedef.AtlasEntityDef;
import org.apache.atlas.model.typedef.AtlasEnumDef;
......@@ -35,19 +34,13 @@ import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
import org.apache.atlas.repository.util.UniqueList;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.type.AtlasTypeUtil;
import org.apache.atlas.util.AtlasGremlinQueryProvider;
import org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.inject.Inject;
import javax.script.ScriptEngine;
import javax.script.ScriptException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
......@@ -63,30 +56,23 @@ import static org.apache.atlas.model.impexp.AtlasExportRequest.FETCH_TYPE_INCREM
public class ExportService {
private static final Logger LOG = LoggerFactory.getLogger(ExportService.class);
public static final String PROPERTY_GUID = "__guid";
private static final String PROPERTY_IS_PROCESS = "isProcess";
private final AtlasTypeRegistry typeRegistry;
private final String QUERY_BINDING_START_GUID = "startGuid";
private final StartEntityFetchByExportRequest startEntityFetchByExportRequest;
private final EntitiesExtractor entitiesExtractor;
private AuditsWriter auditsWriter;
private final AtlasGraph atlasGraph;
private final EntityGraphRetriever entityGraphRetriever;
private final AtlasGremlinQueryProvider gremlinQueryProvider;
private ExportTypeProcessor exportTypeProcessor;
private final HdfsPathEntityCreator hdfsPathEntityCreator;
private IncrementalExportEntityProvider incrementalExportEntityProvider;
/**
 * Creates the export service from its injected collaborators.
 *
 * The entity retriever is built from the supplied type registry, the Gremlin query
 * provider is the shared {@code AtlasGremlinQueryProvider.INSTANCE}, and one
 * {@code StartEntityFetchByExportRequest} and one {@code EntitiesExtractor} are created
 * here and kept for the lifetime of this service instance.
 */
@Inject
public ExportService(final AtlasTypeRegistry typeRegistry, AtlasGraph atlasGraph,
AuditsWriter auditsWriter, HdfsPathEntityCreator hdfsPathEntityCreator) {
this.typeRegistry = typeRegistry;
this.entityGraphRetriever = new EntityGraphRetriever(this.typeRegistry);
this.atlasGraph = atlasGraph;
this.gremlinQueryProvider = AtlasGremlinQueryProvider.INSTANCE;
this.auditsWriter = auditsWriter;
this.hdfsPathEntityCreator = hdfsPathEntityCreator;
this.startEntityFetchByExportRequest = new StartEntityFetchByExportRequest(atlasGraph, typeRegistry, AtlasGremlinQueryProvider.INSTANCE);
// NOTE(review): run() calls entitiesExtractor.close() in its finally block, yet this single
// instance is created once here — confirm the extractor remains usable for later runs.
this.entitiesExtractor = new EntitiesExtractor(atlasGraph, typeRegistry);
}
public AtlasExportResult run(ZipSink exportSink, AtlasExportRequest request, String userName, String hostName,
......@@ -95,7 +81,7 @@ public class ExportService {
AtlasExportResult result = new AtlasExportResult(request, userName, requestingIP,
hostName, startTime, getCurrentChangeMarker());
ExportContext context = new ExportContext(atlasGraph, result, exportSink);
ExportContext context = new ExportContext(result, exportSink);
exportTypeProcessor = new ExportTypeProcessor(typeRegistry);
try {
......@@ -109,12 +95,12 @@ public class ExportService {
} catch(Exception ex) {
LOG.error("Operation failed: ", ex);
} finally {
atlasGraph.releaseGremlinScriptEngine(context.scriptEngine);
entitiesExtractor.close();
LOG.info("<== export(user={}, from={}): status {}: changeMarker: {}",
userName, requestingIP, context.result.getOperationStatus(), context.result.getChangeMarker());
context.clear();
result.clear();
incrementalExportEntityProvider = null;
}
return context.result;
......@@ -203,7 +189,9 @@ public class ExportService {
}
private AtlasExportResult.OperationStatus processObjectId(AtlasObjectId item, ExportContext context) {
debugLog("==> processObjectId({})", item);
if (LOG.isDebugEnabled()) {
LOG.debug("==> processObjectId({})", item);
}
try {
List<String> entityGuids = getStartingEntity(item, context);
......@@ -211,9 +199,10 @@ public class ExportService {
return AtlasExportResult.OperationStatus.FAIL;
}
entitiesExtractor.setExtractor(typeRegistry.getEntityDefByName(item.getTypeName()));
for (String guid : entityGuids) {
processEntityGuid(guid, context);
populateEntitesForIncremental(guid, context);
}
while (!context.guidsToProcess.isEmpty()) {
......@@ -227,13 +216,16 @@ public class ExportService {
context.lineageProcessed.addAll(context.lineageToProcess.getList());
context.lineageToProcess.clear();
}
context.isSkipConnectedFetch = false;
}
} catch (AtlasBaseException excp) {
LOG.error("Fetching entity failed for: {}", item, excp);
return AtlasExportResult.OperationStatus.FAIL;
}
debugLog("<== processObjectId({})", item);
if (LOG.isDebugEnabled()) {
LOG.debug("<== processObjectId({})", item);
}
return AtlasExportResult.OperationStatus.SUCCESS;
}
......@@ -245,181 +237,41 @@ public class ExportService {
return startEntityFetchByExportRequest.get(context.result.getRequest(), item);
}
/**
 * Emits a parameterized debug message, doing nothing when debug logging is disabled.
 *
 * @param s      message pattern
 * @param params values substituted into the pattern
 */
private void debugLog(String s, Object... params) {
    if (LOG.isDebugEnabled()) {
        LOG.debug(s, params);
    }
}
private void processEntityGuid(String guid, ExportContext context) throws AtlasBaseException {
debugLog("==> processEntityGuid({})", guid);
if (LOG.isDebugEnabled()) {
LOG.debug("==> processEntityGuid({})", guid);
}
if (context.guidsProcessed.contains(guid)) {
return;
}
TraversalDirection direction = context.guidDirection.get(guid);
AtlasEntityWithExtInfo entityWithExtInfo = entityGraphRetriever.toAtlasEntityWithExtInfo(guid);
processEntity(entityWithExtInfo, context, direction);
debugLog("<== processEntityGuid({})", guid);
processEntity(entityWithExtInfo, context);
if (LOG.isDebugEnabled()) {
LOG.debug("<== processEntityGuid({})", guid);
}
}
public void processEntity(AtlasEntityWithExtInfo entityWithExtInfo,
ExportContext context,
TraversalDirection direction) throws AtlasBaseException {
public void processEntity(AtlasEntityWithExtInfo entityWithExtInfo, ExportContext context) throws AtlasBaseException {
addEntity(entityWithExtInfo, context);
exportTypeProcessor.addTypes(entityWithExtInfo.getEntity(), context);
context.guidsProcessed.add(entityWithExtInfo.getEntity().getGuid());
getConntedEntitiesBasedOnOption(entityWithExtInfo.getEntity(), context, direction);
entitiesExtractor.get(entityWithExtInfo.getEntity(), context);
if (entityWithExtInfo.getReferredEntities() != null) {
for (AtlasEntity e : entityWithExtInfo.getReferredEntities().values()) {
exportTypeProcessor.addTypes(e, context);
getConntedEntitiesBasedOnOption(e, context, direction);
entitiesExtractor.get(e, context);
}
context.guidsProcessed.addAll(entityWithExtInfo.getReferredEntities().keySet());
}
}
/**
 * Dispatches to the connected or full fetch for the entity, depending on the fetch type
 * recorded in the export context.
 *
 * @param entity    entity whose related guids should be collected
 * @param context   export state; supplies the fetch type and receives the guids
 * @param direction traversal direction, used only by the connected fetch
 */
private void getConntedEntitiesBasedOnOption(AtlasEntity entity, ExportContext context, TraversalDirection direction) {
switch (context.fetchType) {
case CONNECTED:
getEntityGuidsForConnectedFetch(entity, context, direction);
break;
case INCREMENTAL:
// Nothing is fetched here when the context reports hive-db incremental with skip-lineage.
if(context.isHiveDBIncrementalSkipLineage()) {
break;
}
// Deliberate fall-through: every other incremental export is treated as a full fetch.
case FULL:
default:
getEntityGuidsForFullFetch(entity, context);
}
}
/**
 * Creates the incremental entity provider and lets it add guids to the context's
 * to-process list. Does nothing when the context does not report hive-db incremental
 * skip-lineage, or when a provider already exists.
 *
 * @param topLevelEntityGuid guid of the starting entity
 * @param context            export state supplying the script engine, change marker and guid list
 */
private void populateEntitesForIncremental(String topLevelEntityGuid, ExportContext context) {
    boolean applicable = context.isHiveDBIncrementalSkipLineage();
    if (!applicable || incrementalExportEntityProvider != null) {
        return;
    }

    incrementalExportEntityProvider = new IncrementalExportEntityProvider(atlasGraph, context.scriptEngine);
    incrementalExportEntityProvider.populate(topLevelEntityGuid, context.changeMarker, context.guidsToProcess);
}
/**
 * Collects connected guids for the entity. With no known direction both outward and
 * inward edges are followed; otherwise the given direction is used, except that process
 * entities are always traversed outward.
 *
 * @param entity    entity to start from
 * @param context   export state receiving the guids
 * @param direction direction recorded for the entity; may be {@code null}
 */
private void getEntityGuidsForConnectedFetch(AtlasEntity entity, ExportContext context, TraversalDirection direction) {
    boolean directionKnown = direction != null && direction != TraversalDirection.UNKNOWN;

    if (!directionKnown) {
        getConnectedEntityGuids(entity, context, TraversalDirection.OUTWARD, TraversalDirection.INWARD);
        return;
    }

    TraversalDirection effectiveDirection = isProcessEntity(entity) ? TraversalDirection.OUTWARD : direction;
    getConnectedEntityGuids(entity, context, effectiveDirection);
}
/**
 * Tells whether the entity's type is a sub-type of the base process type.
 *
 * @param entity entity to inspect
 * @return {@code true} when the registered entity type is a sub-type of {@code ATLAS_TYPE_PROCESS}
 */
private boolean isProcessEntity(AtlasEntity entity) {
    AtlasEntityType registeredType = typeRegistry.getEntityTypeByName(entity.getTypeName());

    return registeredType.isSubTypeOf(AtlasBaseTypeDef.ATLAS_TYPE_PROCESS);
}
/**
 * Runs the connected-edge query once per requested direction and queues the guids it
 * returns. A guid that was previously reached outward and is now reached inward is
 * removed from the processed set and queued again so its inward side gets traversed.
 *
 * @param entity     entity whose guid is bound as the query start
 * @param context    export state holding bindings, direction map and guid queues
 * @param directions directions to traverse; {@code null} results in no work
 */
private void getConnectedEntityGuids(AtlasEntity entity, ExportContext context, TraversalDirection... directions) {
    if (directions == null) {
        return;
    }

    for (TraversalDirection direction : directions) {
        String query = getQueryForTraversalDirection(direction);

        if (LOG.isDebugEnabled()) {
            debugLog("==> getConnectedEntityGuids({}): guidsToProcess {} query {}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcess.size(), query);
        }

        context.bindings.clear();
        context.bindings.put(QUERY_BINDING_START_GUID, entity.getGuid());

        List<Map<String, Object>> rows = executeGremlinQuery(query, context);
        if (CollectionUtils.isEmpty(rows)) {
            continue;
        }

        for (Map<String, Object> row : rows) {
            String             guid           = (String) row.get(PROPERTY_GUID);
            TraversalDirection knownDirection = context.guidDirection.get(guid);
            boolean            isLineage      = (boolean) row.get(PROPERTY_IS_PROCESS);

            if (context.skipLineage && isLineage) {
                continue;
            }

            if (knownDirection == null) {
                context.addToBeProcessed(isLineage, guid, direction);
                continue;
            }

            boolean revisitForInward = knownDirection == TraversalDirection.OUTWARD
                    && direction == TraversalDirection.INWARD;
            if (revisitForInward) {
                // the entity should be reprocessed to get inward entities
                context.guidsProcessed.remove(guid);
                context.addToBeProcessed(isLineage, guid, direction);
            }
        }

        if (LOG.isDebugEnabled()) {
            debugLog("<== getConnectedEntityGuids({}): found {} guids; guidsToProcess {}", entity.getGuid(), rows.size(), context.guidsToProcess.size());
        }
    }
}
/**
 * Returns the Gremlin query text for the given traversal direction: the in-edge query
 * for {@code INWARD}, the out-edge query for every other direction.
 *
 * @param direction direction to traverse
 * @return query text obtained from the Gremlin query provider
 */
private String getQueryForTraversalDirection(TraversalDirection direction) {
    final AtlasGremlinQuery queryId;

    switch (direction) {
        case INWARD:
            queryId = AtlasGremlinQuery.EXPORT_BY_GUID_CONNECTED_IN_EDGE;
            break;

        case OUTWARD:
        default:
            queryId = AtlasGremlinQuery.EXPORT_BY_GUID_CONNECTED_OUT_EDGE;
            break;
    }

    return this.gremlinQueryProvider.getQuery(queryId);
}
/**
 * Runs the full-export query starting at the entity and queues every returned guid that
 * has not been processed yet, skipping lineage guids when the context asks for that.
 *
 * @param entity  entity whose guid is bound as the query start
 * @param context export state holding bindings, processed set and guid queues
 */
private void getEntityGuidsForFullFetch(AtlasEntity entity, ExportContext context) {
    if (LOG.isDebugEnabled()) {
        debugLog("==> getEntityGuidsForFullFetch({}): guidsToProcess {}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcess.size());
    }

    String fullFetchQuery = this.gremlinQueryProvider.getQuery(AtlasGremlinQuery.EXPORT_BY_GUID_FULL);

    context.bindings.clear();
    context.bindings.put(QUERY_BINDING_START_GUID, entity.getGuid());

    List<Map<String, Object>> rows = executeGremlinQuery(fullFetchQuery, context);
    if (CollectionUtils.isEmpty(rows)) {
        return;
    }

    for (Map<String, Object> row : rows) {
        String  guid      = (String) row.get(PROPERTY_GUID);
        boolean isLineage = (boolean) row.get(PROPERTY_IS_PROCESS);

        if (context.getSkipLineage() && isLineage) {
            continue;
        }

        boolean alreadyProcessed = context.guidsProcessed.contains(guid);
        if (!alreadyProcessed) {
            context.addToBeProcessed(isLineage, guid, TraversalDirection.BOTH);
        }
    }

    if (LOG.isDebugEnabled()) {
        debugLog("<== getEntityGuidsForFullFetch({}): found {} guids; guidsToProcess {}",
                entity.getGuid(), rows.size(), context.guidsToProcess.size());
    }
}
private void addEntity(AtlasEntityWithExtInfo entityWithExtInfo, ExportContext context) throws AtlasBaseException {
if(context.sink.hasEntity(entityWithExtInfo.getEntity().getGuid())) {
......@@ -448,15 +300,6 @@ public class ExportService {
context.reportProgress();
}
/**
 * Executes a Gremlin query with the context's script engine and bindings.
 *
 * @param query   Gremlin query text
 * @param context export state supplying the script engine and the bindings
 * @return the query result cast to a list of row maps, or {@code null} when the script fails
 */
@SuppressWarnings("unchecked") // the result is cast to the row shape the callers read
private List<Map<String, Object>> executeGremlinQuery(String query, ExportContext context) {
    try {
        return (List<Map<String, Object>>) atlasGraph.executeGremlinScript(context.scriptEngine, context.bindings, query, false);
    } catch (ScriptException e) {
        // The pattern needs a {} placeholder, otherwise the query text is never written to the log.
        LOG.error("Script execution failed for query: {}", query, e);
        return null;
    }
}
public enum TraversalDirection {
UNKNOWN,
INWARD,
......@@ -493,7 +336,7 @@ public class ExportService {
final UniqueList<String> entityCreationOrder = new UniqueList<>();
final Set<String> guidsProcessed = new HashSet<>();
final private UniqueList<String> guidsToProcess = new UniqueList<>();
final UniqueList<String> guidsToProcess = new UniqueList<>();
final UniqueList<String> lineageToProcess = new UniqueList<>();
final Set<String> lineageProcessed = new HashSet<>();
final Map<String, TraversalDirection> guidDirection = new HashMap<>();
......@@ -505,25 +348,23 @@ public class ExportService {
final AtlasExportResult result;
private final ZipSink sink;
private final ScriptEngine scriptEngine;
private final Map<String, Object> bindings;
private final ExportFetchType fetchType;
private final boolean skipLineage;
private final long changeMarker;
final ExportFetchType fetchType;
final boolean skipLineage;
final long changeMarker;
boolean isSkipConnectedFetch;
private final boolean isHiveDBIncremental;
private int progressReportCount = 0;
ExportContext(AtlasGraph atlasGraph, AtlasExportResult result, ZipSink sink) throws AtlasBaseException {
ExportContext(AtlasExportResult result, ZipSink sink) {
this.result = result;
this.sink = sink;
scriptEngine = atlasGraph.getGremlinScriptEngine();
bindings = new HashMap<>();
fetchType = ExportFetchType.from(result.getRequest().getFetchTypeOptionValue());
skipLineage = result.getRequest().getSkipLineageOptionValue();
this.changeMarker = result.getRequest().getChangeTokenFromOptions();
this.isHiveDBIncremental = checkHiveDBIncrementalSkipLineage(result.getRequest());
this.isSkipConnectedFetch = false;
}
private boolean checkHiveDBIncrementalSkipLineage(AtlasExportRequest request) {
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.impexp;
import org.apache.atlas.model.instance.AtlasEntity;
/**
 * Strategy for discovering the entities related to an entity that is being exported.
 * Implementations report what they find through the supplied export context.
 */
public interface ExtractStrategy {
    /**
     * Discovers related entities for a full export of the given entity.
     *
     * @param entity  entity to start from
     * @param context export state that receives the result
     */
    void fullFetch(AtlasEntity entity, ExportService.ExportContext context);

    /**
     * Discovers related entities for a connected export of the given entity.
     *
     * @param entity  entity to start from
     * @param context export state that receives the result
     */
    void connectedFetch(AtlasEntity entity, ExportService.ExportContext context);

    /**
     * Releases whatever resources the strategy holds; may be a no-op.
     */
    void close();
}
......@@ -18,6 +18,8 @@
package org.apache.atlas.repository.impexp;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.util.UniqueList;
import org.slf4j.Logger;
......@@ -28,11 +30,10 @@ import javax.script.ScriptEngine;
import javax.script.ScriptException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
public class IncrementalExportEntityProvider {
public class IncrementalExportEntityProvider implements ExtractStrategy {
private static final Logger LOG = LoggerFactory.getLogger(IncrementalExportEntityProvider.class);
private static final String QUERY_PARAMETER_START_GUID = "startGuid";
......@@ -50,9 +51,23 @@ public class IncrementalExportEntityProvider {
private ScriptEngine scriptEngine;
@Inject
public IncrementalExportEntityProvider(AtlasGraph atlasGraph, ScriptEngine scriptEngine) {
public IncrementalExportEntityProvider(AtlasGraph atlasGraph) {
this.atlasGraph = atlasGraph;
this.scriptEngine = scriptEngine;
try {
this.scriptEngine = atlasGraph.getGremlinScriptEngine();
} catch (AtlasBaseException e) {
LOG.error("Error instantiating script engine.", e);
}
}
/**
 * Full fetch for the incremental strategy: delegates to {@code populate} with the
 * entity's guid, the context's change marker and the context's to-process list.
 */
@Override
public void fullFetch(AtlasEntity entity, ExportService.ExportContext context) {
    final String startGuid = entity.getGuid();

    populate(startGuid, context.changeMarker, context.guidsToProcess);
}
/**
 * No-op: this strategy performs no work for a connected fetch.
 * NOTE(review): appears intentional, since EntitiesExtractor only invokes fullFetch on
 * this strategy — confirm no other caller relies on connectedFetch here.
 */
@Override
public void connectedFetch(AtlasEntity entity, ExportService.ExportContext context) {
}
public void populate(String dbEntityGuid, long timeStamp, UniqueList<String> guidsToProcess) {
......@@ -63,6 +78,13 @@ public class IncrementalExportEntityProvider {
}
}
/**
 * Hands the script engine back to the graph; does nothing when no engine was obtained.
 */
@Override
public void close() {
    if (scriptEngine == null) {
        return;
    }

    atlasGraph.releaseGremlinScriptEngine(scriptEngine);
}
private void partial(String dbEntityGuid, long timeStamp, UniqueList<String> guidsToProcess) {
guidsToProcess.addAll(fetchGuids(dbEntityGuid, QUERY_TABLE, timeStamp));
guidsToProcess.addAll(fetchGuids(dbEntityGuid, QUERY_SD, timeStamp));
......@@ -98,7 +120,7 @@ public class IncrementalExportEntityProvider {
}
for (Map<String, Object> item : result) {
guids.add((String) item.get(ExportService.PROPERTY_GUID));
guids.add((String) item.get(EntitiesExtractor.PROPERTY_GUID));
}
return guids;
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.impexp;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasRelatedObjectId;
import org.apache.atlas.model.typedef.AtlasEntityDef;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.type.AtlasTypeUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
/**
 * {@link ExtractStrategy} that discovers related entities by reading the relationship
 * attributes already present on the entity, instead of querying the graph.
 */
public class RelationshipAttributesExtractor implements ExtractStrategy {
    private static final Logger LOG = LoggerFactory.getLogger(RelationshipAttributesExtractor.class);

    /** Name of the super type that marks a type as lineage. */
    private static final String PROCESS_TYPE_NAME = "Process";

    private final AtlasTypeRegistry typeRegistry;

    /**
     * @param typeRegistry registry used to look up entity definitions when classifying lineage types
     */
    public RelationshipAttributesExtractor(AtlasTypeRegistry typeRegistry) {
        this.typeRegistry = typeRegistry;
    }

    /**
     * Queues every related object of the entity, except lineage ones when the context
     * asks for lineage to be skipped.
     */
    @Override
    public void fullFetch(AtlasEntity entity, ExportService.ExportContext context) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("==> fullFetch({}): guidsToProcess {}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcess.size());
        }

        for (AtlasRelatedObjectId ar : getRelatedObjectIds(entity)) {
            boolean isLineage = isLineageType(ar.getTypeName());

            if (context.skipLineage && isLineage) {
                continue;
            }

            context.addToBeProcessed(isLineage, ar.getGuid(), ExportService.TraversalDirection.BOTH);
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("<== fullFetch({}): guidsToProcess {}", entity.getGuid(), context.guidsToProcess.size());
        }
    }

    /**
     * Queues related objects of the entity for a connected export. While
     * {@code context.isSkipConnectedFetch} is set only lineage objects are queued. After
     * the entity has been handled the flag is cleared when the entity itself is a lineage
     * type and set otherwise.
     */
    @Override
    public void connectedFetch(AtlasEntity entity, ExportService.ExportContext context) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("==> connectedFetch({}): guidsToProcess {} isSkipConnectedFetch :{}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcess.size(), context.isSkipConnectedFetch);
        }

        for (AtlasRelatedObjectId ar : getRelatedObjectIds(entity)) {
            boolean isLineage = isLineageType(ar.getTypeName());

            if (context.skipLineage && isLineage) {
                continue;
            }

            if (!context.isSkipConnectedFetch || isLineage) {
                context.addToBeProcessed(isLineage, ar.getGuid(), ExportService.TraversalDirection.BOTH);
            }
        }

        context.isSkipConnectedFetch = !isLineageType(entity.getTypeName());

        if (LOG.isDebugEnabled()) {
            // Exit marker; the original logged "==>" here, which made entry and exit indistinguishable.
            LOG.debug("<== connectedFetch({}): guidsToProcess {}, isSkipConnectedFetch :{}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcess.size(), context.isSkipConnectedFetch);
        }
    }

    /** Nothing to release: this strategy holds no external resources. */
    @Override
    public void close() {
    }

    /**
     * Tells whether the named type lists {@code "Process"} among its super types. Types
     * unknown to the registry are reported as non-lineage instead of failing.
     *
     * <p>NOTE(review): only the super types returned by the entity definition are
     * inspected; confirm whether indirect sub-types of Process must be covered as well.
     */
    private boolean isLineageType(String typeName) {
        AtlasEntityDef entityDef = typeRegistry.getEntityDefByName(typeName);

        return entityDef != null
                && entityDef.getSuperTypes() != null
                && entityDef.getSuperTypes().contains(PROCESS_TYPE_NAME);
    }

    /**
     * Flattens the entity's relationship attribute values into a list of related object
     * ids. Single ids and ids held in any kind of collection are collected; every other
     * value is ignored. An entity without relationship attributes yields an empty list.
     */
    private List<AtlasRelatedObjectId> getRelatedObjectIds(AtlasEntity entity) {
        List<AtlasRelatedObjectId> relatedObjectIds = new ArrayList<>();

        if (entity.getRelationshipAttributes() == null) {
            return relatedObjectIds;
        }

        for (Object value : entity.getRelationshipAttributes().values()) {
            if (value instanceof AtlasRelatedObjectId) {
                relatedObjectIds.add((AtlasRelatedObjectId) value);
            } else if (value instanceof Collection) {
                // Iterate rather than cast to List: the value may be a Set, and its
                // elements are not guaranteed to be related object ids.
                for (Object element : (Collection<?>) value) {
                    if (element instanceof AtlasRelatedObjectId) {
                        relatedObjectIds.add((AtlasRelatedObjectId) element);
                    }
                }
            }
        }

        return relatedObjectIds;
    }
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.impexp;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.type.AtlasTypeUtil;
import org.apache.atlas.util.AtlasGremlinQueryProvider;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.script.ScriptEngine;
import javax.script.ScriptException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.apache.atlas.repository.impexp.EntitiesExtractor.PROPERTY_GUID;
/**
 * {@link ExtractStrategy} that discovers related entities by running Gremlin queries
 * against the graph, starting from the guid of the entity being exported.
 *
 * <p>A script engine is obtained from the graph at construction time and handed back in
 * {@link #close()}. The query bindings map is a single instance reused for every query.
 */
public class VertexExtractor implements ExtractStrategy {
    private static final Logger LOG = LoggerFactory.getLogger(VertexExtractor.class);

    private static final String PROPERTY_IS_PROCESS      = "isProcess";
    private static final String QUERY_BINDING_START_GUID = "startGuid";

    private final AtlasGremlinQueryProvider gremlinQueryProvider;
    private final Map<String, Object>       bindings;
    private final AtlasGraph                atlasGraph;
    private final AtlasTypeRegistry         typeRegistry;
    private ScriptEngine                    scriptEngine;

    /**
     * @param atlasGraph   graph used to obtain the script engine and execute queries
     * @param typeRegistry registry used to decide whether an entity is a process
     */
    public VertexExtractor(AtlasGraph atlasGraph, AtlasTypeRegistry typeRegistry) {
        this.atlasGraph   = atlasGraph;
        this.typeRegistry = typeRegistry;

        try {
            this.scriptEngine = atlasGraph.getGremlinScriptEngine();
        } catch (AtlasBaseException e) {
            // Keep the cause in the log; the engine field stays null in this case.
            LOG.error("Script Engine: Instantiation failed!", e);
        }

        this.gremlinQueryProvider = AtlasGremlinQueryProvider.INSTANCE;
        this.bindings             = new HashMap<>();
    }

    /**
     * Runs the full-export query starting at the entity and queues every returned guid
     * that has not been processed yet, skipping lineage guids when the context asks for that.
     */
    @Override
    public void fullFetch(AtlasEntity entity, ExportService.ExportContext context) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("==> fullFetch({}): guidsToProcess {}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcess.size());
        }

        String query = this.gremlinQueryProvider.getQuery(AtlasGremlinQueryProvider.AtlasGremlinQuery.EXPORT_BY_GUID_FULL);

        bindings.clear();
        bindings.put(QUERY_BINDING_START_GUID, entity.getGuid());

        List<Map<String, Object>> result = executeGremlinQuery(query, context);
        if (CollectionUtils.isEmpty(result)) {
            return;
        }

        for (Map<String, Object> hashMap : result) {
            String  guid      = (String) hashMap.get(PROPERTY_GUID);
            boolean isLineage = (boolean) hashMap.get(PROPERTY_IS_PROCESS);

            if (context.getSkipLineage() && isLineage) {
                continue;
            }

            if (!context.guidsProcessed.contains(guid)) {
                context.addToBeProcessed(isLineage, guid, ExportService.TraversalDirection.BOTH);
            }
        }
    }

    /**
     * Collects connected guids for the entity. When no direction is recorded for the
     * entity both outward and inward edges are followed; otherwise the recorded direction
     * is used, except that process entities are always traversed outward.
     */
    @Override
    public void connectedFetch(AtlasEntity entity, ExportService.ExportContext context) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("==> connectedFetch({}): guidsToProcess {}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcess.size());
        }

        ExportService.TraversalDirection direction = context.guidDirection.get(entity.getGuid());

        if (direction == null || direction == ExportService.TraversalDirection.UNKNOWN) {
            getConnectedEntityGuids(entity, context, ExportService.TraversalDirection.OUTWARD, ExportService.TraversalDirection.INWARD);
        } else {
            if (isProcessEntity(entity)) {
                direction = ExportService.TraversalDirection.OUTWARD;
            }

            getConnectedEntityGuids(entity, context, direction);
        }
    }

    /** Hands the script engine back to the graph; does nothing when none was obtained. */
    @Override
    public void close() {
        if (scriptEngine != null) {
            atlasGraph.releaseGremlinScriptEngine(scriptEngine);
        }
    }

    /**
     * Runs the connected-edge query once per requested direction and queues the guids it
     * returns. A guid previously reached outward and now reached inward is removed from
     * the processed set and queued again so that its inward side gets traversed.
     */
    private void getConnectedEntityGuids(AtlasEntity entity, ExportService.ExportContext context, ExportService.TraversalDirection... directions) {
        if (directions == null) {
            return;
        }

        for (ExportService.TraversalDirection direction : directions) {
            String query = getQueryForTraversalDirection(direction);

            bindings.clear();
            bindings.put(QUERY_BINDING_START_GUID, entity.getGuid());

            List<Map<String, Object>> result = executeGremlinQuery(query, context);
            if (CollectionUtils.isEmpty(result)) {
                continue;
            }

            for (Map<String, Object> hashMap : result) {
                String                           guid             = (String) hashMap.get(PROPERTY_GUID);
                ExportService.TraversalDirection currentDirection = context.guidDirection.get(guid);
                boolean                          isLineage        = (boolean) hashMap.get(PROPERTY_IS_PROCESS);

                if (context.skipLineage && isLineage) {
                    continue;
                }

                if (currentDirection == null) {
                    context.addToBeProcessed(isLineage, guid, direction);
                } else if (currentDirection == ExportService.TraversalDirection.OUTWARD && direction == ExportService.TraversalDirection.INWARD) {
                    // the entity should be reprocessed to get inward entities
                    context.guidsProcessed.remove(guid);
                    context.addToBeProcessed(isLineage, guid, direction);
                }
            }
        }
    }

    /** Tells whether the entity's registered type is a sub-type of the base process type. */
    private boolean isProcessEntity(AtlasEntity entity) {
        String          typeName   = entity.getTypeName();
        AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName);

        return entityType.isSubTypeOf(AtlasBaseTypeDef.ATLAS_TYPE_PROCESS);
    }

    /** Returns the in-edge query for {@code INWARD} and the out-edge query for every other direction. */
    private String getQueryForTraversalDirection(ExportService.TraversalDirection direction) {
        switch (direction) {
            case INWARD:
                return this.gremlinQueryProvider.getQuery(AtlasGremlinQueryProvider.AtlasGremlinQuery.EXPORT_BY_GUID_CONNECTED_IN_EDGE);

            default:
            case OUTWARD:
                return this.gremlinQueryProvider.getQuery(AtlasGremlinQueryProvider.AtlasGremlinQuery.EXPORT_BY_GUID_CONNECTED_OUT_EDGE);
        }
    }

    /**
     * Executes the query with this extractor's script engine and current bindings.
     *
     * @param query   Gremlin query text
     * @param context export state; not used by this method
     * @return the query result cast to a list of row maps, or {@code null} when the script fails
     */
    @SuppressWarnings("unchecked") // the result is cast to the row shape the callers read
    private List<Map<String, Object>> executeGremlinQuery(String query, ExportService.ExportContext context) {
        try {
            return (List<Map<String, Object>>) atlasGraph.executeGremlinScript(scriptEngine, bindings, query, false);
        } catch (ScriptException e) {
            // The pattern needs a {} placeholder, otherwise the query text is never written to the log.
            LOG.error("Script execution failed for query: {}", query, e);
            return null;
        }
    }
}
......@@ -63,7 +63,7 @@ public class IncrementalExportEntityProviderTest extends ExportImportTestBase {
verifyCreatedEntities(entityStore, entityGuids, 2);
gremlinScriptEngine = atlasGraph.getGremlinScriptEngine();
incrementalExportEntityProvider = new IncrementalExportEntityProvider(atlasGraph, gremlinScriptEngine);
incrementalExportEntityProvider = new IncrementalExportEntityProvider(atlasGraph);
}
@AfterClass
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.impexp;
import org.apache.atlas.RequestContext;
import org.apache.atlas.TestModules;
import org.apache.atlas.TestUtilsV2;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.impexp.AtlasExportRequest;
import org.apache.atlas.model.impexp.AtlasExportResult;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.repository.graph.AtlasGraphProvider;
import org.apache.atlas.runner.LocalSolrRunner;
import org.apache.atlas.store.AtlasTypeDefStore;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.testng.ITestContext;
import org.testng.annotations.Test;
import org.testng.annotations.Guice;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.AfterClass;
import org.testng.annotations.DataProvider;
import javax.inject.Inject;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.ArrayList;
import java.util.HashMap;
import static org.apache.atlas.graph.GraphSandboxUtil.useLocalSolr;
import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.*;
import static org.testng.Assert.*;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
/**
 * Export tests over the Hive lineage fixture in {@code hive_db_lineage.zip}.
 *
 * <p>The fixture is imported once by {@link #importHiveDb(ZipSource)}; every export test depends
 * on that import. Each test exports either the database or one of its tables with a combination
 * of {@code fetchType} ({@code full} / {@code connected}) and {@code skipLineage}, then checks
 * that the exported archive contains exactly the expected entity GUIDs.
 */
@Guice(modules = TestModules.TestOnlyModule.class)
public class RelationshipAttributesExtractorTest {
    // Values accepted by the "fetchType" export option.
    private static final String EXPORT_FULL      = "full";
    private static final String EXPORT_CONNECTED = "connected";

    // Qualified names of the fixture entities used as export starting points.
    private static final String QUALIFIED_NAME_DB                = "db_test_1@02052019";
    private static final String QUALIFIED_NAME_TABLE_LINEAGE     = "db_test_1.test_tbl_ctas_2@02052019";
    private static final String QUALIFIED_NAME_TABLE_NON_LINEAGE = "db_test_1.test_tbl_1@02052019";

    // GUIDs of the entities the tests expect to find in the exported archives.
    private static final String GUID_DB           = "f0b72ab4-7452-4e42-ac74-2aee7728cce4";
    private static final String GUID_TABLE_1      = "4d5adf00-2c9b-4877-ad23-c41fd7319150";
    private static final String GUID_TABLE_2      = "8d0b834c-61ce-42d8-8f66-6fa51c36bccb";
    private static final String GUID_TABLE_CTAS_2 = "eaec545b-3ac7-4e1b-a497-bd4a2b6434a2";
    private static final String GUID_HIVE_PROCESS = "bd3138b2-f29e-4226-b859-de25eaa1c18b";

    @Inject
    private ImportService importService;

    @Inject
    AtlasTypeRegistry typeRegistry;

    @Inject
    private AtlasTypeDefStore typeDefStore;

    @Inject
    private ExportService exportService;

    /** Loads the base and Hive type models needed by the fixture. */
    @BeforeClass
    public void setup() throws IOException, AtlasBaseException {
        loadBaseModel();
        loadHiveModel();
    }

    /**
     * Resets the request context and sets the test user.
     *
     * <p>NOTE(review): TestNG runs {@code @BeforeTest} once per {@code <test>} element, not before
     * every test method; confirm that this is the intended scope.
     */
    @BeforeTest
    public void setupTest() {
        RequestContext.clear();
        RequestContext.get().setUser(TestUtilsV2.TEST_USER, null);
    }

    /** Cleans up the graph provider and stops the local Solr runner when one is in use. */
    @AfterClass
    public void clear() throws Exception {
        AtlasGraphProvider.cleanup();

        if (useLocalSolr()) {
            LocalSolrRunner.stop();
        }
    }

    /** Supplies the zipped Hive lineage fixture to {@link #importHiveDb(ZipSource)}. */
    @DataProvider(name = "hiveDb")
    public static Object[][] getData(ITestContext context) throws IOException, AtlasBaseException {
        return getZipSource("hive_db_lineage.zip");
    }

    /** Imports the fixture; all export tests depend on this method. */
    @Test(dataProvider = "hiveDb")
    public void importHiveDb(ZipSource zipSource) throws AtlasBaseException, IOException {
        runImportWithNoParameters(importService, zipSource);
    }

    @Test(dependsOnMethods = "importHiveDb")
    public void exportDBFull() throws Exception {
        ZipSource source = runExport(getExportRequestForHiveDb(QUALIFIED_NAME_DB, EXPORT_FULL, false));

        verifyExport(source, GUID_DB, GUID_TABLE_1, GUID_TABLE_2, GUID_TABLE_CTAS_2, GUID_HIVE_PROCESS);
    }

    @Test(dependsOnMethods = "importHiveDb")
    public void exportDBFullSkipLineageFull() throws Exception {
        ZipSource source = runExport(getExportRequestForHiveDb(QUALIFIED_NAME_DB, EXPORT_FULL, true));

        verifyExport(source, GUID_DB, GUID_TABLE_1, GUID_TABLE_2, GUID_TABLE_CTAS_2);
    }

    @Test(dependsOnMethods = "importHiveDb")
    public void exportTableWithLineageFull() throws Exception {
        ZipSource source = runExport(getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_LINEAGE, EXPORT_FULL, false));

        verifyExport(source, GUID_DB, GUID_TABLE_1, GUID_TABLE_2, GUID_TABLE_CTAS_2, GUID_HIVE_PROCESS);
    }

    @Test(dependsOnMethods = "importHiveDb")
    public void exportTableWithLineageSkipLineageFull() throws Exception {
        ZipSource source = runExport(getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_LINEAGE, EXPORT_FULL, true));

        verifyExport(source, GUID_DB, GUID_TABLE_1, GUID_TABLE_2, GUID_TABLE_CTAS_2);
    }

    @Test(dependsOnMethods = "importHiveDb")
    public void exportTableWithoutLineageFull() throws Exception {
        ZipSource source = runExport(getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_NON_LINEAGE, EXPORT_FULL, false));

        verifyExport(source, GUID_DB, GUID_TABLE_1, GUID_TABLE_2, GUID_TABLE_CTAS_2, GUID_HIVE_PROCESS);
    }

    @Test(dependsOnMethods = "importHiveDb")
    public void exportTableWithoutLineageSkipLineageFull() throws Exception {
        ZipSource source = runExport(getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_NON_LINEAGE, EXPORT_FULL, true));

        verifyExport(source, GUID_DB, GUID_TABLE_1, GUID_TABLE_2, GUID_TABLE_CTAS_2);
    }

    @Test(dependsOnMethods = "importHiveDb")
    public void exportDBConn() throws Exception {
        ZipSource source = runExport(getExportRequestForHiveDb(QUALIFIED_NAME_DB, EXPORT_CONNECTED, false));

        verifyExport(source, GUID_DB, GUID_TABLE_1, GUID_TABLE_2, GUID_TABLE_CTAS_2, GUID_HIVE_PROCESS);
    }

    @Test(dependsOnMethods = "importHiveDb")
    public void exportDBSkipLineageConn() throws Exception {
        ZipSource source = runExport(getExportRequestForHiveDb(QUALIFIED_NAME_DB, EXPORT_CONNECTED, true));

        verifyExport(source, GUID_DB, GUID_TABLE_1, GUID_TABLE_2, GUID_TABLE_CTAS_2);
    }

    @Test(dependsOnMethods = "importHiveDb")
    public void exportTableWithLineageConn() throws Exception {
        ZipSource source = runExport(getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_LINEAGE, EXPORT_CONNECTED, false));

        verifyExport(source, GUID_DB, GUID_TABLE_2, GUID_TABLE_CTAS_2, GUID_HIVE_PROCESS);
    }

    @Test(dependsOnMethods = "importHiveDb")
    public void exportTableWithLineageSkipLineageConn() throws Exception {
        ZipSource source = runExport(getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_LINEAGE, EXPORT_CONNECTED, true));

        verifyExport(source, GUID_DB, GUID_TABLE_CTAS_2);
    }

    @Test(dependsOnMethods = "importHiveDb")
    public void exportTableWithoutLineageConn() throws Exception {
        ZipSource source = runExport(getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_NON_LINEAGE, EXPORT_CONNECTED, false));

        verifyExport(source, GUID_DB, GUID_TABLE_1);
    }

    @Test(dependsOnMethods = "importHiveDb")
    public void exportTableWithoutLineageSkipLineageConn() throws Exception {
        ZipSource source = runExport(getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_NON_LINEAGE, EXPORT_CONNECTED, true));

        verifyExport(source, GUID_DB, GUID_TABLE_1);
    }

    private void loadHiveModel() throws IOException, AtlasBaseException {
        loadModelFromJson("1000-Hadoop/1030-hive_model.json", typeDefStore, typeRegistry);
    }

    private void loadBaseModel() throws IOException, AtlasBaseException {
        loadModelFromJson("0000-Area0/0010-base_model.json", typeDefStore, typeRegistry);
    }

    /** Builds an export request that starts from the {@code hive_db} with the given qualified name. */
    private AtlasExportRequest getExportRequestForHiveDb(String hiveDbName, String fetchType, boolean skipLineage) {
        return getExportRequest("hive_db", hiveDbName, fetchType, skipLineage);
    }

    /** Builds an export request that starts from the {@code hive_table} with the given qualified name. */
    private AtlasExportRequest getExportRequestForHiveTable(String hiveTableName, String fetchType, boolean skipLineage) {
        return getExportRequest("hive_table", hiveTableName, fetchType, skipLineage);
    }

    /**
     * Builds an export request for a single entity identified by type and {@code qualifiedName}.
     *
     * @param typeName      entity type of the item to export
     * @param qualifiedName value of the entity's {@code qualifiedName} attribute
     * @param fetchType     value for the {@code fetchType} option
     * @param skipLineage   value for the {@code skipLineage} option
     */
    private AtlasExportRequest getExportRequest(String typeName, String qualifiedName, String fetchType, boolean skipLineage) {
        List<AtlasObjectId> itemsToExport = new ArrayList<>();
        itemsToExport.add(new AtlasObjectId(typeName, "qualifiedName", qualifiedName));

        AtlasExportRequest request = new AtlasExportRequest();
        request.setItemsToExport(itemsToExport);
        request.setOptions(getOptionsMap(fetchType, skipLineage));

        return request;
    }

    /**
     * Builds the export options map.
     *
     * @param fetchType   fetch type; a null or empty value falls back to {@link #EXPORT_FULL}
     * @param skipLineage value stored under the {@code skipLineage} key
     */
    private Map<String, Object> getOptionsMap(String fetchType, boolean skipLineage) {
        Map<String, Object> optionsMap = new HashMap<>();

        // Guard against null as well as empty so a missing fetch type cannot cause an NPE.
        optionsMap.put("fetchType", (fetchType == null || fetchType.isEmpty()) ? EXPORT_FULL : fetchType);
        optionsMap.put("skipLineage", skipLineage);

        return optionsMap;
    }

    /**
     * Runs the export into an in-memory zip and returns a {@link ZipSource} over the result.
     *
     * @param request export request to execute
     * @return source reading the bytes written by the export
     */
    private ZipSource runExport(AtlasExportRequest request) throws AtlasBaseException, IOException {
        final String requestingIP = "1.0.0.0";
        final String hostName     = "localhost";
        final String userName     = "admin";

        ByteArrayOutputStream baos    = new ByteArrayOutputStream();
        ZipSink               zipSink = new ZipSink(baos);

        try {
            exportService.run(zipSink, request, userName, hostName, requestingIP);
        } finally {
            // Close even when the export fails so the sink is never left open.
            zipSink.close();
        }

        return new ZipSource(new ByteArrayInputStream(baos.toByteArray()));
    }

    /**
     * Asserts that an exported archive holds exactly the expected entities.
     *
     * <p>Checks that the creation order is present and has as many entries as there are expected
     * GUIDs, that the Hive process GUID appears in the creation order if and only if it is among
     * the expected GUIDs, and that the entities read from the archive match the expected GUIDs.
     *
     * @param zipSource     exported archive; its entity stream is consumed by this call
     * @param expectedGuids GUIDs that must be present in the archive
     */
    private void verifyExport(ZipSource zipSource, String... expectedGuids) {
        List<String> creationOrder = zipSource.getCreationOrder();

        assertNotNull(creationOrder, "creation order missing from export");
        assertEquals(creationOrder.size(), expectedGuids.length, "unexpected number of entries in creation order");
        assertEquals(creationOrder.contains(GUID_HIVE_PROCESS), isExpected(GUID_HIVE_PROCESS, expectedGuids),
                     "unexpected presence/absence of hive_process " + GUID_HIVE_PROCESS + " in creation order");

        verifyExpectedEntities(getEntityGuids(zipSource), expectedGuids);
    }

    /** Returns true when {@code guid} is one of {@code expectedGuids}. */
    private boolean isExpected(String guid, String... expectedGuids) {
        for (String expected : expectedGuids) {
            if (expected.equals(guid)) {
                return true;
            }
        }

        return false;
    }

    /** Asserts that {@code fileNames} has exactly one entry per expected GUID and contains each of them. */
    private void verifyExpectedEntities(List<String> fileNames, String... guids) {
        assertEquals(fileNames.size(), guids.length, "unexpected number of exported entities");

        for (String guid : guids) {
            assertTrue(fileNames.contains(guid.toLowerCase()), "exported entities do not include " + guid);
        }
    }

    /**
     * Reads every entity from the archive and collects its GUID.
     *
     * @param zipSource archive to read; must contain at least one entity
     * @return GUIDs of the entities in the order the source yields them
     */
    private List<String> getEntityGuids(ZipSource zipSource) {
        List<String> ret = new ArrayList<>();

        assertTrue(zipSource.hasNext(), "export contains no entities");

        while (zipSource.hasNext()) {
            AtlasEntity atlasEntity = zipSource.next();

            assertNotNull(atlasEntity);
            ret.add(atlasEntity.getGuid());
        }

        return ret;
    }
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment