Commit 2b38550b by Ashutosh Mestry

ATLAS-3737: Refactoring: Support multiple instances of AtlasGraph

parent 40fa099c
......@@ -319,8 +319,8 @@ public class AtlasJanusGraph implements AtlasGraph<AtlasJanusVertex, AtlasJanusE
}
}
private JanusGraph getGraph() {
return getGraphInstance();
public JanusGraph getGraph() {
return this.janusGraph;
}
@Override
......
......@@ -56,7 +56,7 @@ public class NativeJanusGraphQuery implements NativeTinkerpopGraphQuery<AtlasJan
private JanusGraphQuery<?> query;
public NativeJanusGraphQuery(AtlasJanusGraph graph) {
this.query = AtlasJanusGraphDatabase.getGraphInstance().query();
this.query = graph.getGraph().query();
this.graph = graph;
}
......
......@@ -99,7 +99,7 @@ public class EntityDiscoveryService implements AtlasDiscoveryService {
SearchTracker searchTracker,
UserProfileService userProfileService) throws AtlasException {
this.graph = graph;
this.entityRetriever = new EntityGraphRetriever(typeRegistry);
this.entityRetriever = new EntityGraphRetriever(this.graph, typeRegistry);
this.indexer = indexer;
this.searchTracker = searchTracker;
this.gremlinQueryProvider = AtlasGremlinQueryProvider.INSTANCE;
......
......@@ -94,7 +94,7 @@ public class EntityLineageService implements AtlasLineageService {
EntityLineageService(AtlasTypeRegistry typeRegistry, AtlasGraph atlasGraph) {
this.graph = atlasGraph;
this.gremlinQueryProvider = AtlasGremlinQueryProvider.INSTANCE;
this.entityRetriever = new EntityGraphRetriever(typeRegistry);
this.entityRetriever = new EntityGraphRetriever(atlasGraph, typeRegistry);
this.atlasTypeRegistry = typeRegistry;
}
......@@ -259,7 +259,7 @@ public class EntityLineageService implements AtlasLineageService {
}
if (isDataSet) {
AtlasVertex datasetVertex = AtlasGraphUtilsV2.findByGuid(guid);
AtlasVertex datasetVertex = AtlasGraphUtilsV2.findByGuid(this.graph, guid);
if (direction == INPUT || direction == BOTH) {
traverseEdges(datasetVertex, true, depth, ret);
......@@ -269,7 +269,7 @@ public class EntityLineageService implements AtlasLineageService {
traverseEdges(datasetVertex, false, depth, ret);
}
} else {
AtlasVertex processVertex = AtlasGraphUtilsV2.findByGuid(guid);
AtlasVertex processVertex = AtlasGraphUtilsV2.findByGuid(this.graph, guid);
// make one hop to the next dataset vertices from process vertex and traverse with 'depth = depth - 1'
if (direction == INPUT || direction == BOTH) {
......
......@@ -34,6 +34,7 @@ import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.model.instance.EntityMutations.EntityOperation;
import org.apache.atlas.model.instance.GuidMapping;
import org.apache.atlas.model.legacy.EntityResult;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.atlas.v1.model.instance.Struct;
......@@ -68,11 +69,11 @@ public class AtlasInstanceConverter {
private final EntityGraphRetriever entityGraphRetrieverIgnoreRelationshipAttrs;
@Inject
public AtlasInstanceConverter(AtlasTypeRegistry typeRegistry, AtlasFormatConverters instanceFormatters) {
public AtlasInstanceConverter(AtlasGraph graph, AtlasTypeRegistry typeRegistry, AtlasFormatConverters instanceFormatters) {
this.typeRegistry = typeRegistry;
this.instanceFormatters = instanceFormatters;
this.entityGraphRetriever = new EntityGraphRetriever(typeRegistry);
this.entityGraphRetrieverIgnoreRelationshipAttrs = new EntityGraphRetriever(typeRegistry, true);
this.entityGraphRetriever = new EntityGraphRetriever(graph, typeRegistry);
this.entityGraphRetrieverIgnoreRelationshipAttrs = new EntityGraphRetriever(graph, typeRegistry, true);
}
public Referenceable[] getReferenceables(Collection<AtlasEntity> entities) throws AtlasBaseException {
......
......@@ -25,6 +25,7 @@ import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityExtInfo;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.instance.AtlasStruct;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
import org.apache.atlas.type.AtlasArrayType;
import org.apache.atlas.type.AtlasBuiltInTypes;
......@@ -68,13 +69,13 @@ public class FullTextMapperV2 implements IFullTextMapper {
@Inject
public FullTextMapperV2(AtlasTypeRegistry typeRegistry, Configuration configuration) {
public FullTextMapperV2(AtlasGraph atlasGraph, AtlasTypeRegistry typeRegistry, Configuration configuration) {
this.typeRegistry = typeRegistry;
this.configuration = configuration;
followReferences = this.configuration != null && this.configuration.getBoolean(FULL_TEXT_FOLLOW_REFERENCES, false);
// If followReferences = false then ignore relationship attr loading
entityGraphRetriever = new EntityGraphRetriever(typeRegistry, !followReferences);
entityGraphRetriever = new EntityGraphRetriever(atlasGraph, typeRegistry, !followReferences);
}
/**
......
......@@ -30,8 +30,11 @@ import org.apache.atlas.listener.ActiveStateChangeHandler;
import org.apache.atlas.listener.ChangedTypeDefs;
import org.apache.atlas.listener.TypeDefChangeListener;
import org.apache.atlas.model.TypeCategory;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
import org.apache.atlas.model.typedef.AtlasEntityDef;
import org.apache.atlas.model.typedef.AtlasEnumDef;
import org.apache.atlas.model.typedef.AtlasRelationshipDef;
import org.apache.atlas.model.typedef.AtlasStructDef;
import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
import org.apache.atlas.repository.Constants;
......@@ -41,8 +44,12 @@ import org.apache.atlas.repository.graphdb.*;
import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
import org.apache.atlas.type.*;
import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.type.AtlasTypeUtil;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.configuration.Configuration;
import org.apache.solr.common.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
......@@ -192,6 +199,10 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang
//resolve index fields names for the new entity attributes.
resolveIndexFieldNames(management, changedTypeDefs);
createEdgeLabels(management, changedTypeDefs.getCreatedTypeDefs());
createEdgeLabels(management, changedTypeDefs.getUpdatedTypeDefs());
//Commit indexes
commit(management);
} catch (RepositoryException | IndexException e) {
......@@ -349,6 +360,13 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang
// create fulltext indexes
createFullTextIndex(management, ENTITY_TEXT_PROPERTY_KEY, String.class, SINGLE);
createPropertyKey(management, IS_PROXY_KEY, Boolean.class, SINGLE);
createPropertyKey(management, PROVENANCE_TYPE_KEY, Integer.class, SINGLE);
createPropertyKey(management, HOME_ID_KEY, String.class, SINGLE);
createEdgeLabel(management, Constants.TERM_ASSIGNMENT_LABEL);
createEdgeLabel(management, Constants.CLASSIFICATION_LABEL);
commit(management);
LOG.info("Index creation for global keys complete.");
......@@ -689,6 +707,14 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang
String label = Constants.INTERNAL_PROPERTY_KEY_PREFIX + propertyName;
createEdgeLabelUsingLabelName(management, label);
}
private void createEdgeLabelUsingLabelName(final AtlasGraphManagement management, final String label) {
if (StringUtils.isEmpty(label)) {
return;
}
org.apache.atlas.repository.graphdb.AtlasEdgeLabel edgeLabel = management.getEdgeLabel(label);
if (edgeLabel == null) {
......@@ -999,4 +1025,47 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang
}
}
private void createEdgeLabels(AtlasGraphManagement management, List<? extends AtlasBaseTypeDef> typeDefs) {
if (CollectionUtils.isEmpty(typeDefs)) {
return;
}
for (AtlasBaseTypeDef typeDef : typeDefs) {
if (typeDef instanceof AtlasEntityDef) {
AtlasEntityDef entityDef = (AtlasEntityDef) typeDef;
createEdgeLabelsForStruct(management, entityDef);
} else if (typeDef instanceof AtlasRelationshipDef) {
createEdgeLabels(management, (AtlasRelationshipDef) typeDef);
}
}
}
private void createEdgeLabelsForStruct(AtlasGraphManagement management, AtlasEntityDef entityDef) {
try {
AtlasType type = typeRegistry.getType(entityDef.getName());
if (!(type instanceof AtlasEntityType)) {
return;
}
AtlasEntityType entityType = (AtlasEntityType) type;
for (AtlasAttributeDef attributeDef : entityDef.getAttributeDefs()) {
AtlasAttribute attribute = entityType.getAttribute(attributeDef.getName());
if (attribute.getAttributeType().getTypeCategory() == TypeCategory.STRUCT) {
String relationshipLabel = attribute.getRelationshipEdgeLabel();
createEdgeLabelUsingLabelName(management, relationshipLabel);
}
}
} catch (AtlasBaseException e) {
LOG.error("Error fetching type: {}", entityDef.getName(), e);
}
}
private void createEdgeLabels(AtlasGraphManagement management, AtlasRelationshipDef relationshipDef) {
String relationshipTypeName = relationshipDef.getName();
AtlasRelationshipType relationshipType = typeRegistry.getRelationshipTypeByName(relationshipTypeName);
String relationshipLabel = relationshipType.getRelationshipLabel();
createEdgeLabelUsingLabelName(management, relationshipLabel);
}
}
......@@ -95,18 +95,13 @@ public final class GraphHelper {
public static final String RETRY_DELAY = "atlas.graph.storage.retry.sleeptime.ms";
public static final String DEFAULT_REMOVE_PROPAGATIONS_ON_ENTITY_DELETE = "atlas.graph.remove.propagations.default";
private final AtlasGremlinQueryProvider queryProvider = AtlasGremlinQueryProvider.INSTANCE;
private static volatile GraphHelper INSTANCE;
private AtlasGraph graph;
private static int maxRetries;
private static long retrySleepTimeMillis;
private static boolean removePropagations;
private int maxRetries = 3;
private long retrySleepTimeMillis = 1000;
private boolean removePropagations = false;
@VisibleForTesting
GraphHelper(AtlasGraph graph) {
public GraphHelper(AtlasGraph graph) {
this.graph = graph;
try {
maxRetries = ApplicationProperties.get().getInt(RETRY_COUNT, 3);
......@@ -117,69 +112,10 @@ public final class GraphHelper {
}
}
public static GraphHelper getInstance() {
if ( INSTANCE == null) {
synchronized (GraphHelper.class) {
if (INSTANCE == null) {
INSTANCE = new GraphHelper(AtlasGraphProvider.getGraphInstance());
}
}
}
return INSTANCE;
}
@VisibleForTesting
static GraphHelper getInstance(AtlasGraph graph) {
if ( INSTANCE == null) {
synchronized (GraphHelper.class) {
if (INSTANCE == null) {
INSTANCE = new GraphHelper(graph);
}
}
}
return INSTANCE;
}
public static boolean isTermEntityEdge(AtlasEdge edge) {
return StringUtils.equals(edge.getLabel(), TERM_ASSIGNMENT_LABEL);
}
public AtlasVertex createVertexWithIdentity(Referenceable typedInstance, Set<String> superTypeNames) {
final String guid = UUID.randomUUID().toString();
final AtlasVertex vertexWithIdentity = createVertexWithoutIdentity(typedInstance.getTypeName(),
new Id(guid, 0, typedInstance.getTypeName()), superTypeNames);
// add identity
AtlasGraphUtilsV2.setEncodedProperty(vertexWithIdentity, Constants.GUID_PROPERTY_KEY, guid);
// add version information
AtlasGraphUtilsV2.setEncodedProperty(vertexWithIdentity, Constants.VERSION_PROPERTY_KEY, Long.valueOf(typedInstance.getId().getVersion()));
return vertexWithIdentity;
}
public AtlasVertex createVertexWithoutIdentity(String typeName, Id typedInstanceId, Set<String> superTypeNames) {
if (LOG.isDebugEnabled()) {
LOG.debug("Creating AtlasVertex for type {} id {}", typeName, typedInstanceId != null ? typedInstanceId._getId() : null);
}
final AtlasVertex ret = graph.addVertex();
AtlasGraphUtilsV2.setEncodedProperty(ret, ENTITY_TYPE_PROPERTY_KEY, typeName);
AtlasGraphUtilsV2.setEncodedProperty(ret, STATE_PROPERTY_KEY, ACTIVE.name());
AtlasGraphUtilsV2.setEncodedProperty(ret, TIMESTAMP_PROPERTY_KEY, RequestContext.get().getRequestTime());
AtlasGraphUtilsV2.setEncodedProperty(ret, MODIFICATION_TIMESTAMP_PROPERTY_KEY, RequestContext.get().getRequestTime());
AtlasGraphUtilsV2.setEncodedProperty(ret, CREATED_BY_KEY, RequestContext.get().getUser());
AtlasGraphUtilsV2.setEncodedProperty(ret, MODIFIED_BY_KEY, RequestContext.get().getUser());
for (String superTypeName : superTypeNames) {
AtlasGraphUtilsV2.addEncodedProperty(ret, SUPER_TYPES_PROPERTY_KEY, superTypeName);
}
return ret;
}
public AtlasEdge addClassificationEdge(AtlasVertex entityVertex, AtlasVertex classificationVertex, boolean isPropagated) {
AtlasEdge ret = addEdge(entityVertex, classificationVertex, CLASSIFICATION_LABEL);
......@@ -1193,6 +1129,14 @@ public final class GraphHelper {
return ret;
}
public AtlasGraph getGraph() {
return this.graph;
}
public Boolean getDefaultRemovePropagations() {
return this.removePropagations;
}
/**
* Guid and AtlasVertex combo
*/
......@@ -1663,10 +1607,6 @@ public final class GraphHelper {
return StringUtils.equals(getGuid(vertexB), getGuid(vertexA));
}
public static boolean getDefaultRemovePropagations() {
return removePropagations;
}
public static String getDelimitedClassificationNames(Collection<String> classificationNames) {
String ret = null;
......
......@@ -66,14 +66,14 @@ public class ExportService {
private final HdfsPathEntityCreator hdfsPathEntityCreator;
@Inject
public ExportService(final AtlasTypeRegistry typeRegistry, AtlasGraph atlasGraph,
public ExportService(final AtlasTypeRegistry typeRegistry, AtlasGraph graph,
AuditsWriter auditsWriter, HdfsPathEntityCreator hdfsPathEntityCreator) {
this.typeRegistry = typeRegistry;
this.entityGraphRetriever = new EntityGraphRetriever(this.typeRegistry);
this.entityGraphRetriever = new EntityGraphRetriever(graph, this.typeRegistry);
this.auditsWriter = auditsWriter;
this.hdfsPathEntityCreator = hdfsPathEntityCreator;
this.startEntityFetchByExportRequest = new StartEntityFetchByExportRequest(atlasGraph, typeRegistry, AtlasGremlinQueryProvider.INSTANCE);
this.entitiesExtractor = new EntitiesExtractor(atlasGraph, typeRegistry);
this.startEntityFetchByExportRequest = new StartEntityFetchByExportRequest(graph, typeRegistry, AtlasGremlinQueryProvider.INSTANCE);
this.entitiesExtractor = new EntitiesExtractor(graph, typeRegistry);
}
public AtlasExportResult run(ZipSink exportSink, AtlasExportRequest request, String userName, String hostName,
......
......@@ -19,6 +19,7 @@
package org.apache.atlas.repository.store.graph.v1;
import org.apache.atlas.RequestContext;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.store.DeleteType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.util.AtlasRepositoryConfiguration;
......@@ -37,11 +38,13 @@ public class DeleteHandlerDelegate {
private final SoftDeleteHandlerV1 softDeleteHandler;
private final HardDeleteHandlerV1 hardDeleteHandler;
private final DeleteHandlerV1 defaultHandler;
private final AtlasGraph atlasGraph;
@Inject
public DeleteHandlerDelegate(AtlasTypeRegistry typeRegistry) {
this.softDeleteHandler = new SoftDeleteHandlerV1(typeRegistry);
this.hardDeleteHandler = new HardDeleteHandlerV1(typeRegistry);
public DeleteHandlerDelegate(AtlasGraph atlasGraph, AtlasTypeRegistry typeRegistry) {
this.atlasGraph = atlasGraph;
this.softDeleteHandler = new SoftDeleteHandlerV1(atlasGraph, typeRegistry);
this.hardDeleteHandler = new HardDeleteHandlerV1(atlasGraph, typeRegistry);
this.defaultHandler = getDefaultConfiguredHandler(typeRegistry);
}
......@@ -74,7 +77,7 @@ public class DeleteHandlerDelegate {
LOG.info("Default delete handler set to: {}", handlerFromProperties.getName());
ret = (DeleteHandlerV1) handlerFromProperties.getConstructor(AtlasTypeRegistry.class).newInstance(typeRegistry);
ret = (DeleteHandlerV1) handlerFromProperties.getConstructor(AtlasGraph.class, AtlasTypeRegistry.class).newInstance(this.atlasGraph, typeRegistry);
} catch (Exception ex) {
LOG.error("Error instantiating default delete handler. Defaulting to: {}", softDeleteHandler.getClass().getName(), ex);
......
......@@ -32,6 +32,7 @@ import org.apache.atlas.repository.graph.AtlasEdgeLabel;
import org.apache.atlas.repository.graph.GraphHelper;
import org.apache.atlas.repository.graphdb.AtlasEdge;
import org.apache.atlas.repository.graphdb.AtlasEdgeDirection;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
......@@ -62,15 +63,17 @@ import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelation
public abstract class DeleteHandlerV1 {
public static final Logger LOG = LoggerFactory.getLogger(DeleteHandlerV1.class);
protected static final GraphHelper graphHelper = GraphHelper.getInstance();
protected final GraphHelper graphHelper;
private final AtlasTypeRegistry typeRegistry;
private final EntityGraphRetriever entityRetriever;
private final boolean shouldUpdateInverseReferences;
private final boolean softDelete;
public DeleteHandlerV1(AtlasTypeRegistry typeRegistry, boolean shouldUpdateInverseReference, boolean softDelete) {
public DeleteHandlerV1(AtlasGraph graph, AtlasTypeRegistry typeRegistry, boolean shouldUpdateInverseReference, boolean softDelete) {
this.typeRegistry = typeRegistry;
this.entityRetriever = new EntityGraphRetriever(typeRegistry);
this.graphHelper = new GraphHelper(graph);
this.entityRetriever = new EntityGraphRetriever(graph, typeRegistry);
this.shouldUpdateInverseReferences = shouldUpdateInverseReference;
this.softDelete = softDelete;
}
......@@ -210,7 +213,7 @@ public abstract class DeleteHandlerV1 {
if (attributeInfo.getAttributeDef().isSoftReferenced()) {
String softRefVal = vertex.getProperty(attributeInfo.getVertexPropertyName(), String.class);
AtlasObjectId refObjId = AtlasEntityUtil.parseSoftRefValue(softRefVal);
AtlasVertex refVertex = refObjId != null ? AtlasGraphUtilsV2.findByGuid(refObjId.getGuid()) : null;
AtlasVertex refVertex = refObjId != null ? AtlasGraphUtilsV2.findByGuid(this.graphHelper.getGraph(), refObjId.getGuid()) : null;
if (refVertex != null) {
vertices.push(refVertex);
......@@ -244,7 +247,7 @@ public abstract class DeleteHandlerV1 {
if (CollectionUtils.isNotEmpty(refObjIds)) {
for (AtlasObjectId refObjId : refObjIds) {
AtlasVertex refVertex = AtlasGraphUtilsV2.findByGuid(refObjId.getGuid());
AtlasVertex refVertex = AtlasGraphUtilsV2.findByGuid(this.graphHelper.getGraph(), refObjId.getGuid());
if (refVertex != null) {
vertices.push(refVertex);
......@@ -257,7 +260,7 @@ public abstract class DeleteHandlerV1 {
if (MapUtils.isNotEmpty(refObjIds)) {
for (AtlasObjectId refObjId : refObjIds.values()) {
AtlasVertex refVertex = AtlasGraphUtilsV2.findByGuid(refObjId.getGuid());
AtlasVertex refVertex = AtlasGraphUtilsV2.findByGuid(this.graphHelper.getGraph(), refObjId.getGuid());
if (refVertex != null) {
vertices.push(refVertex);
......
......@@ -22,6 +22,7 @@ import org.apache.atlas.annotation.ConditionalOnAtlasProperty;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.repository.graph.GraphHelper;
import org.apache.atlas.repository.graphdb.AtlasEdge;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.springframework.stereotype.Component;
......@@ -33,8 +34,8 @@ import javax.inject.Inject;
public class HardDeleteHandlerV1 extends DeleteHandlerV1 {
@Inject
public HardDeleteHandlerV1(AtlasTypeRegistry typeRegistry) {
super(typeRegistry, true, false);
public HardDeleteHandlerV1(AtlasGraph graph, AtlasTypeRegistry typeRegistry) {
super(graph, typeRegistry, true, false);
}
@Override
......
......@@ -22,6 +22,7 @@ import org.apache.atlas.RequestContext;
import org.apache.atlas.model.instance.AtlasEntity.Status;
import org.apache.atlas.repository.graph.GraphHelper;
import org.apache.atlas.repository.graphdb.AtlasEdge;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
import org.apache.atlas.type.AtlasTypeRegistry;
......@@ -36,8 +37,8 @@ import static org.apache.atlas.repository.Constants.STATE_PROPERTY_KEY;
public class SoftDeleteHandlerV1 extends DeleteHandlerV1 {
@Inject
public SoftDeleteHandlerV1(AtlasTypeRegistry typeRegistry) {
super(typeRegistry, false, true);
public SoftDeleteHandlerV1(AtlasGraph graph, AtlasTypeRegistry typeRegistry) {
super(graph, typeRegistry, false, true);
}
@Override
......
......@@ -24,6 +24,7 @@ import org.apache.atlas.model.TypeCategory;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.instance.AtlasStruct;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.store.graph.EntityGraphDiscovery;
import org.apache.atlas.repository.store.graph.EntityGraphDiscoveryContext;
import org.apache.atlas.repository.store.graph.EntityResolver;
......@@ -56,11 +57,13 @@ import static org.apache.atlas.repository.store.graph.v2.EntityGraphMapper.valid
public class AtlasEntityGraphDiscoveryV2 implements EntityGraphDiscovery {
private static final Logger LOG = LoggerFactory.getLogger(AtlasEntityGraphDiscoveryV2.class);
private final AtlasGraph graph;
private final AtlasTypeRegistry typeRegistry;
private final EntityGraphDiscoveryContext discoveryContext;
private final EntityGraphMapper entityGraphMapper;
public AtlasEntityGraphDiscoveryV2(AtlasTypeRegistry typeRegistry, EntityStream entityStream, EntityGraphMapper entityGraphMapper) {
public AtlasEntityGraphDiscoveryV2(AtlasGraph graph, AtlasTypeRegistry typeRegistry, EntityStream entityStream, EntityGraphMapper entityGraphMapper) {
this.graph = graph;
this.typeRegistry = typeRegistry;
this.discoveryContext = new EntityGraphDiscoveryContext(typeRegistry, entityStream);
this.entityGraphMapper = entityGraphMapper;
......@@ -189,8 +192,8 @@ public class AtlasEntityGraphDiscoveryV2 implements EntityGraphDiscovery {
protected void resolveReferences() throws AtlasBaseException {
MetricRecorder metric = RequestContext.get().startMetricRecord("resolveReferences");
EntityResolver[] entityResolvers = new EntityResolver[] { new IDBasedEntityResolver(typeRegistry, entityGraphMapper),
new UniqAttrBasedEntityResolver(typeRegistry, entityGraphMapper)
EntityResolver[] entityResolvers = new EntityResolver[] { new IDBasedEntityResolver(this.graph, typeRegistry),
new UniqAttrBasedEntityResolver(this.graph, typeRegistry, entityGraphMapper)
};
for (EntityResolver resolver : entityResolvers) {
......
......@@ -40,6 +40,7 @@ import org.apache.atlas.repository.RepositoryException;
import org.apache.atlas.repository.graph.GraphHelper;
import org.apache.atlas.repository.graphdb.AtlasEdge;
import org.apache.atlas.repository.graphdb.AtlasEdgeDirection;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.store.graph.AtlasRelationshipStore;
import org.apache.atlas.repository.store.graph.v1.DeleteHandlerDelegate;
......@@ -91,20 +92,23 @@ import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.getTy
@Component
public class AtlasRelationshipStoreV2 implements AtlasRelationshipStore {
private static final Logger LOG = LoggerFactory.getLogger(AtlasRelationshipStoreV2.class);
private static final Long DEFAULT_RELATIONSHIP_VERSION = 0L;
private final AtlasGraph graph;
private boolean notificationsEnabled = NOTIFICATION_RELATIONSHIPS_ENABLED.getBoolean();
private final AtlasTypeRegistry typeRegistry;
private final EntityGraphRetriever entityRetriever;
private final DeleteHandlerDelegate deleteDelegate;
private final GraphHelper graphHelper = GraphHelper.getInstance();
private final GraphHelper graphHelper;
private final IAtlasEntityChangeNotifier entityChangeNotifier;
@Inject
public AtlasRelationshipStoreV2(AtlasTypeRegistry typeRegistry, DeleteHandlerDelegate deleteDelegate, IAtlasEntityChangeNotifier entityChangeNotifier) {
public AtlasRelationshipStoreV2(AtlasGraph graph, AtlasTypeRegistry typeRegistry, DeleteHandlerDelegate deleteDelegate, IAtlasEntityChangeNotifier entityChangeNotifier) {
this.graph = graph;
this.typeRegistry = typeRegistry;
this.entityRetriever = new EntityGraphRetriever(typeRegistry);
this.graphHelper = new GraphHelper(graph);
this.entityRetriever = new EntityGraphRetriever(graph, typeRegistry);
this.deleteDelegate = deleteDelegate;
this.entityChangeNotifier = entityChangeNotifier;
}
......@@ -274,7 +278,7 @@ public class AtlasRelationshipStoreV2 implements AtlasRelationshipStore {
throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIP_ALREADY_DELETED, guid);
}
String relationShipType = GraphHelper.getTypeName(edge);
String relationShipType = graphHelper.getTypeName(edge);
AtlasEntityHeader end1Entity = entityRetriever.toAtlasEntityHeaderWithClassifications(edge.getOutVertex());
AtlasEntityHeader end2Entity = entityRetriever.toAtlasEntityHeaderWithClassifications(edge.getInVertex());
......@@ -721,7 +725,7 @@ public class AtlasRelationshipStoreV2 implements AtlasRelationshipStore {
String guid = end.getGuid();
String typeName = end.getTypeName();
Map<String, Object> uniqueAttributes = end.getUniqueAttributes();
AtlasVertex endVertex = AtlasGraphUtilsV2.findByGuid(guid);
AtlasVertex endVertex = AtlasGraphUtilsV2.findByGuid(this.graph, guid);
if (!AtlasTypeUtil.isValidGuid(guid) || endVertex == null) {
throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, guid);
......@@ -729,7 +733,7 @@ public class AtlasRelationshipStoreV2 implements AtlasRelationshipStore {
} else if (MapUtils.isNotEmpty(uniqueAttributes)) {
AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName);
if (AtlasGraphUtilsV2.findByUniqueAttributes(entityType, uniqueAttributes) == null) {
if (AtlasGraphUtilsV2.findByUniqueAttributes(this.graph, entityType, uniqueAttributes) == null) {
throw new AtlasBaseException(AtlasErrorCode.INSTANCE_BY_UNIQUE_ATTRIBUTE_NOT_FOUND, typeName, uniqueAttributes.toString());
}
} else {
......@@ -801,11 +805,11 @@ public class AtlasRelationshipStoreV2 implements AtlasRelationshipStore {
AtlasVertex ret = null;
if (StringUtils.isNotEmpty(endPoint.getGuid())) {
ret = AtlasGraphUtilsV2.findByGuid(endPoint.getGuid());
ret = AtlasGraphUtilsV2.findByGuid(this.graph, endPoint.getGuid());
} else if (StringUtils.isNotEmpty(endPoint.getTypeName()) && MapUtils.isNotEmpty(endPoint.getUniqueAttributes())) {
AtlasEntityType entityType = typeRegistry.getEntityTypeByName(endPoint.getTypeName());
ret = AtlasGraphUtilsV2.findByUniqueAttributes(entityType, endPoint.getUniqueAttributes());
ret = AtlasGraphUtilsV2.findByUniqueAttributes(this.graph, entityType, endPoint.getUniqueAttributes());
}
return ret;
......@@ -888,7 +892,7 @@ public class AtlasRelationshipStoreV2 implements AtlasRelationshipStore {
String typeName = objectId.getTypeName();
if (StringUtils.isBlank(typeName)) {
typeName = AtlasGraphUtilsV2.getTypeNameFromGuid(objectId.getGuid());
typeName = AtlasGraphUtilsV2.getTypeNameFromGuid(this.graph, objectId.getGuid());
}
return typeName;
......
......@@ -26,6 +26,7 @@ import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.repository.graph.AtlasGraphProvider;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.repository.store.graph.BulkImporter;
......@@ -51,10 +52,12 @@ public class BulkImporterImpl implements BulkImporter {
private static final Logger LOG = LoggerFactory.getLogger(AtlasEntityStoreV2.class);
private final AtlasEntityStore entityStore;
private final AtlasGraph atlasGraph;
private AtlasTypeRegistry typeRegistry;
@Inject
public BulkImporterImpl(AtlasEntityStore entityStore, AtlasTypeRegistry typeRegistry) {
public BulkImporterImpl(AtlasGraph atlasGraph, AtlasEntityStore entityStore, AtlasTypeRegistry typeRegistry) {
this.atlasGraph = atlasGraph;
this.entityStore = entityStore;
this.typeRegistry = typeRegistry;
}
......@@ -65,9 +68,9 @@ public class BulkImporterImpl implements BulkImporter {
if (importResult.getRequest().getOptions() != null &&
importResult.getRequest().getOptions().containsKey(AtlasImportRequest.OPTION_KEY_MIGRATION)) {
importStrategy = new MigrationImport(new AtlasGraphProvider(), this.typeRegistry);
importStrategy = new MigrationImport(this.atlasGraph, new AtlasGraphProvider(), this.typeRegistry);
} else {
importStrategy = new RegularImport(this.entityStore, this.typeRegistry);
importStrategy = new RegularImport(this.atlasGraph, this.entityStore, this.typeRegistry);
}
LOG.info("BulkImportImpl: {}", importStrategy.getClass().getSimpleName());
......@@ -121,14 +124,14 @@ public class BulkImporterImpl implements BulkImporter {
}
}
public static void updateVertexGuid(AtlasTypeRegistry typeRegistry, EntityGraphRetriever entityGraphRetriever, AtlasEntity entity) {
public static void updateVertexGuid(AtlasGraph atlasGraph, AtlasTypeRegistry typeRegistry, EntityGraphRetriever entityGraphRetriever, AtlasEntity entity) {
String entityGuid = entity.getGuid();
AtlasObjectId objectId = entityGraphRetriever.toAtlasObjectIdWithoutGuid(entity);
AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName());
String vertexGuid = null;
try {
vertexGuid = AtlasGraphUtilsV2.getGuidByUniqueAttributes(entityType, objectId.getUniqueAttributes());
vertexGuid = AtlasGraphUtilsV2.getGuidByUniqueAttributes(atlasGraph, entityType, objectId.getUniqueAttributes());
} catch (AtlasBaseException e) {
LOG.warn("Entity: {}: Does not exist!", objectId);
return;
......@@ -138,7 +141,7 @@ public class BulkImporterImpl implements BulkImporter {
return;
}
AtlasVertex v = AtlasGraphUtilsV2.findByGuid(vertexGuid);
AtlasVertex v = AtlasGraphUtilsV2.findByGuid(atlasGraph, vertexGuid);
if (v == null) {
return;
}
......
......@@ -23,6 +23,8 @@ import org.apache.atlas.model.instance.AtlasClassification;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.AtlasEntityHeaders;
import org.apache.atlas.repository.audit.EntityAuditRepository;
import org.apache.atlas.repository.graph.AtlasGraphProvider;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.type.AtlasEntityType;
......@@ -49,11 +51,15 @@ public class ClassificationAssociator {
private final EntityAuditRepository auditRepository;
private final EntityGraphRetriever entityRetriever;
public Retriever(AtlasTypeRegistry typeRegistry, EntityAuditRepository auditRepository) {
this.entityRetriever = new EntityGraphRetriever(typeRegistry);
public Retriever(AtlasGraph graph, AtlasTypeRegistry typeRegistry, EntityAuditRepository auditRepository) {
this.entityRetriever = new EntityGraphRetriever(graph, typeRegistry);
this.auditRepository = auditRepository;
}
public Retriever(AtlasTypeRegistry typeRegistry, EntityAuditRepository auditRepository) {
this(AtlasGraphProvider.getGraphInstance(), typeRegistry, auditRepository);
}
Retriever(EntityGraphRetriever entityGraphRetriever, EntityAuditRepository auditRepository) {
this.entityRetriever = entityGraphRetriever;
this.auditRepository = auditRepository;
......@@ -104,16 +110,21 @@ public class ClassificationAssociator {
static final String PROCESS_DELETE = "Delete";
static final String JSONIFY_STRING_FORMAT = "\"%s\",";
private final AtlasGraph graph;
private final AtlasTypeRegistry typeRegistry;
private final AtlasEntityStore entitiesStore;
private final EntityGraphRetriever entityRetriever;
private StringBuilder actionSummary = new StringBuilder();
private Map<String, String> typeNameUniqueAttributeNameMap = new HashMap<>();
private final StringBuilder actionSummary = new StringBuilder();
public Updater(AtlasTypeRegistry typeRegistry, AtlasEntityStore entitiesStore) {
public Updater(AtlasGraph graph, AtlasTypeRegistry typeRegistry, AtlasEntityStore entitiesStore) {
this.graph = graph;
this.typeRegistry = typeRegistry;
this.entitiesStore = entitiesStore;
entityRetriever = new EntityGraphRetriever(typeRegistry);
entityRetriever = new EntityGraphRetriever(graph, typeRegistry);
}
public Updater(AtlasTypeRegistry typeRegistry, AtlasEntityStore entitiesStore) {
this(AtlasGraphProvider.getGraphInstance(), typeRegistry, entitiesStore);
}
public String setClassifications(Map<String, AtlasEntityHeader> map) {
......@@ -246,7 +257,7 @@ public class ClassificationAssociator {
AtlasEntityHeader getByUniqueAttributes(AtlasEntityType entityType, String qualifiedName, Map<String, Object> attrValues) {
try {
AtlasVertex vertex = AtlasGraphUtilsV2.findByUniqueAttributes(entityType, attrValues);
AtlasVertex vertex = AtlasGraphUtilsV2.findByUniqueAttributes(this.graph, entityType, attrValues);
if (vertex == null) {
return null;
}
......
......@@ -43,6 +43,7 @@ import org.apache.atlas.repository.graph.GraphHelper;
import org.apache.atlas.repository.graphdb.AtlasEdge;
import org.apache.atlas.repository.graphdb.AtlasEdgeDirection;
import org.apache.atlas.repository.graphdb.AtlasElement;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.type.AtlasArrayType;
import org.apache.atlas.type.AtlasBuiltInTypes.AtlasObjectIdType;
......@@ -131,18 +132,21 @@ public class EntityGraphRetriever {
public static final String QUALIFIED_NAME = "qualifiedName";
private static final TypeReference<List<TimeBoundary>> TIME_BOUNDARIES_LIST_TYPE = new TypeReference<List<TimeBoundary>>() {};
private static final GraphHelper graphHelper = GraphHelper.getInstance();
private final GraphHelper graphHelper;
private final AtlasTypeRegistry typeRegistry;
private final boolean ignoreRelationshipAttr;
private final AtlasGraph graph;
@Inject
public EntityGraphRetriever(AtlasTypeRegistry typeRegistry) {
this(typeRegistry, false);
public EntityGraphRetriever(AtlasGraph graph, AtlasTypeRegistry typeRegistry) {
this(graph, typeRegistry, false);
}
public EntityGraphRetriever(AtlasTypeRegistry typeRegistry, boolean ignoreRelationshipAttr) {
public EntityGraphRetriever(AtlasGraph graph, AtlasTypeRegistry typeRegistry, boolean ignoreRelationshipAttr) {
this.graph = graph;
this.graphHelper = new GraphHelper(graph);
this.typeRegistry = typeRegistry;
this.ignoreRelationshipAttr = ignoreRelationshipAttr;
}
......@@ -365,7 +369,7 @@ public class EntityGraphRetriever {
}
public AtlasVertex getEntityVertex(String guid) throws AtlasBaseException {
AtlasVertex ret = AtlasGraphUtilsV2.findByGuid(guid);
AtlasVertex ret = AtlasGraphUtilsV2.findByGuid(this.graph, guid);
if (ret == null) {
throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, guid);
......@@ -419,7 +423,7 @@ public class EntityGraphRetriever {
if (entityType != null) {
for (Map<String, Object> uniqAttributes : uniqueAttributesList) {
try {
AtlasVertex vertex = AtlasGraphUtilsV2.getVertexByUniqueAttributes(entityType, uniqAttributes);
AtlasVertex vertex = AtlasGraphUtilsV2.getVertexByUniqueAttributes(this.graph, entityType, uniqAttributes);
if (vertex != null) {
AtlasEntity entity = mapVertexToAtlasEntity(vertex, ret, isMinExtInfo);
......@@ -442,7 +446,7 @@ public class EntityGraphRetriever {
public void evaluateClassificationPropagation(AtlasVertex classificationVertex, List<AtlasVertex> entitiesToAddPropagation, List<AtlasVertex> entitiesToRemovePropagation) {
if (classificationVertex != null) {
String entityGuid = getClassificationEntityGuid(classificationVertex);
AtlasVertex entityVertex = AtlasGraphUtilsV2.findByGuid(entityGuid);
AtlasVertex entityVertex = AtlasGraphUtilsV2.findByGuid(this.graph, entityGuid);
String classificationId = classificationVertex.getIdForDisplay();
List<AtlasVertex> propagatedEntities = getAllPropagatedEntityVertices(classificationVertex);
List<AtlasVertex> impactedEntities = getImpactedVerticesV2(entityVertex, null, classificationId);
......@@ -472,7 +476,7 @@ public class EntityGraphRetriever {
for (AtlasVertex classificationVertex : classificationVertices) {
String classificationId = classificationVertex.getIdForDisplay();
String sourceEntityId = getClassificationEntityGuid(classificationVertex);
AtlasVertex sourceEntityVertex = AtlasGraphUtilsV2.findByGuid(sourceEntityId);
AtlasVertex sourceEntityVertex = AtlasGraphUtilsV2.findByGuid(this.graph, sourceEntityId);
List<AtlasVertex> entitiesPropagatingTo = getImpactedVerticesV2(sourceEntityVertex, relationshipGuidToExclude, classificationId);
ret.put(classificationVertex, entitiesPropagatingTo);
......@@ -582,12 +586,12 @@ public class EntityGraphRetriever {
}
if (AtlasTypeUtil.isAssignedGuid(objId)) {
ret = AtlasGraphUtilsV2.findByGuid(objId.getGuid());
ret = AtlasGraphUtilsV2.findByGuid(this.graph, objId.getGuid());
} else {
AtlasEntityType entityType = typeRegistry.getEntityTypeByName(objId.getTypeName());
Map<String, Object> uniqAttributes = objId.getUniqueAttributes();
ret = AtlasGraphUtilsV2.getVertexByUniqueAttributes(entityType, uniqAttributes);
ret = AtlasGraphUtilsV2.getVertexByUniqueAttributes(this.graph, entityType, uniqAttributes);
}
if (ret == null) {
......@@ -828,10 +832,10 @@ public class EntityGraphRetriever {
return ret;
}
public List<AtlasTermAssignmentHeader> mapAssignedTerms(AtlasVertex entityVertex) throws AtlasBaseException {
public List<AtlasTermAssignmentHeader> mapAssignedTerms(AtlasVertex entityVertex) {
List<AtlasTermAssignmentHeader> ret = new ArrayList<>();
Iterable edges = entityVertex.query().direction(AtlasEdgeDirection.IN).label(TERM_ASSIGNMENT_LABEL).edges();
Iterable edges = entityVertex.query().direction(AtlasEdgeDirection.IN).label(Constants.INTERNAL_PROPERTY_KEY_PREFIX + TERM_ASSIGNMENT_LABEL).edges();
if (edges != null) {
for (final AtlasEdge edge : (Iterable<AtlasEdge>) edges) {
......
......@@ -28,13 +28,13 @@ import org.apache.atlas.repository.graph.AtlasGraphProvider;
import org.apache.atlas.repository.graph.GraphHelper;
import org.apache.atlas.repository.graphdb.AtlasEdge;
import org.apache.atlas.repository.graphdb.AtlasEdgeDirection;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasGraphQuery;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.type.AtlasClassificationType;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
......@@ -55,13 +55,15 @@ import static org.apache.atlas.repository.graph.GraphHelper.getDelimitedClassifi
public final class EntityStateChecker {
private static final Logger LOG = LoggerFactory.getLogger(EntityStateChecker.class);
private final AtlasGraph graph;
private final AtlasTypeRegistry typeRegistry;
private final EntityGraphRetriever entityRetriever;
@Inject
public EntityStateChecker(AtlasTypeRegistry typeRegistry) {
public EntityStateChecker(AtlasGraph graph, AtlasTypeRegistry typeRegistry) {
this.graph = graph;
this.typeRegistry = typeRegistry;
this.entityRetriever = new EntityGraphRetriever(typeRegistry);
this.entityRetriever = new EntityGraphRetriever(graph, typeRegistry);
}
......@@ -140,7 +142,7 @@ public final class EntityStateChecker {
* @throws AtlasBaseException
*/
public AtlasEntityState checkEntityState(String guid, boolean fixIssues, AtlasCheckStateResult result) throws AtlasBaseException {
AtlasVertex entityVertex = AtlasGraphUtilsV2.findByGuid(guid);
AtlasVertex entityVertex = AtlasGraphUtilsV2.findByGuid(this.graph, guid);
if (entityVertex == null) {
throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, guid);
......
......@@ -22,6 +22,7 @@ import org.apache.atlas.RequestContext;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.TypeCategory;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.store.graph.EntityGraphDiscoveryContext;
import org.apache.atlas.repository.store.graph.EntityResolver;
......@@ -35,12 +36,12 @@ import org.slf4j.LoggerFactory;
public class IDBasedEntityResolver implements EntityResolver {
private static final Logger LOG = LoggerFactory.getLogger(IDBasedEntityResolver.class);
private final AtlasGraph graph;
private final AtlasTypeRegistry typeRegistry;
private final EntityGraphMapper entityGraphMapper;
public IDBasedEntityResolver(AtlasTypeRegistry typeRegistry, EntityGraphMapper entityGraphMapper) {
public IDBasedEntityResolver(AtlasGraph graph, AtlasTypeRegistry typeRegistry) {
this.graph = graph;
this.typeRegistry = typeRegistry;
this.entityGraphMapper = entityGraphMapper;
}
public EntityGraphDiscoveryContext resolveEntityReferences(EntityGraphDiscoveryContext context) throws AtlasBaseException {
......@@ -52,7 +53,7 @@ public class IDBasedEntityResolver implements EntityResolver {
for (String guid : context.getReferencedGuids()) {
boolean isAssignedGuid = AtlasTypeUtil.isAssignedGuid(guid);
AtlasVertex vertex = isAssignedGuid ? AtlasGraphUtilsV2.findByGuid(guid) : null;
AtlasVertex vertex = isAssignedGuid ? AtlasGraphUtilsV2.findByGuid(this.graph, guid) : null;
if (vertex == null && !RequestContext.get().isImportInProgress()) { // if not found in the store, look if the entity is present in the stream
AtlasEntity entity = entityStream.getByGuid(guid);
......@@ -64,7 +65,7 @@ public class IDBasedEntityResolver implements EntityResolver {
throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_INVALID, TypeCategory.ENTITY.name(), entity.getTypeName());
}
vertex = AtlasGraphUtilsV2.findByUniqueAttributes(entityType, entity.getAttributes());
vertex = AtlasGraphUtilsV2.findByUniqueAttributes(this.graph, entityType, entity.getAttributes());
} else if (!isAssignedGuid) { // for local-guids, entity must be in the stream
throw new AtlasBaseException(AtlasErrorCode.REFERENCED_ENTITY_NOT_FOUND, guid);
}
......
......@@ -22,6 +22,7 @@ import org.apache.atlas.RequestContext;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.TypeCategory;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.store.graph.EntityGraphDiscoveryContext;
import org.apache.atlas.repository.store.graph.EntityResolver;
......@@ -36,10 +37,12 @@ import java.util.List;
public class UniqAttrBasedEntityResolver implements EntityResolver {
private static final Logger LOG = LoggerFactory.getLogger(UniqAttrBasedEntityResolver.class);
private final AtlasGraph graph;
private final AtlasTypeRegistry typeRegistry;
private final EntityGraphMapper entityGraphMapper;
public UniqAttrBasedEntityResolver(AtlasTypeRegistry typeRegistry, EntityGraphMapper entityGraphMapper) {
public UniqAttrBasedEntityResolver(AtlasGraph graph, AtlasTypeRegistry typeRegistry, EntityGraphMapper entityGraphMapper) {
this.graph = graph;
this.typeRegistry = typeRegistry;
this.entityGraphMapper = entityGraphMapper;
}
......@@ -61,7 +64,7 @@ public class UniqAttrBasedEntityResolver implements EntityResolver {
throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_INVALID, TypeCategory.ENTITY.name(), objId.getTypeName());
}
AtlasVertex vertex = AtlasGraphUtilsV2.findByUniqueAttributes(entityType, objId.getUniqueAttributes());
AtlasVertex vertex = AtlasGraphUtilsV2.findByUniqueAttributes(this.graph, entityType, objId.getUniqueAttributes());
if (vertex == null && RequestContext.get().isCreateShellEntityForNonExistingReference()) {
vertex = entityGraphMapper.createShellEntityVertex(objId, context);
......
......@@ -28,7 +28,6 @@ import org.apache.atlas.repository.converters.AtlasInstanceConverter;
import org.apache.atlas.repository.graph.AtlasGraphProvider;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.migration.DataMigrationStatusService;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.repository.store.graph.AtlasRelationshipStore;
import org.apache.atlas.repository.store.graph.v1.DeleteHandlerDelegate;
import org.apache.atlas.repository.store.graph.v2.AtlasEntityStoreV2;
......@@ -46,15 +45,14 @@ import org.slf4j.LoggerFactory;
public class MigrationImport extends ImportStrategy {
private static final Logger LOG = LoggerFactory.getLogger(MigrationImport.class);
private final AtlasGraph graph;
private final AtlasGraphProvider graphProvider;
private final AtlasTypeRegistry typeRegistry;
private AtlasGraph atlasGraph;
private EntityGraphRetriever entityGraphRetriever;
private EntityGraphMapper entityGraphMapper;
private AtlasEntityStore entityStore;
public MigrationImport(AtlasGraphProvider atlasGraphProvider, AtlasTypeRegistry typeRegistry) {
public MigrationImport(AtlasGraph graph, AtlasGraphProvider graphProvider, AtlasTypeRegistry typeRegistry) {
this.graph = graph;
this.graphProvider = graphProvider;
this.typeRegistry = typeRegistry;
setupEntityStore(atlasGraphProvider, typeRegistry);
LOG.info("MigrationImport: Using bulkLoading...");
}
......@@ -72,7 +70,7 @@ public class MigrationImport extends ImportStrategy {
long index = 0;
int streamSize = entityStream.size();
EntityMutationResponse ret = new EntityMutationResponse();
EntityCreationManager creationManager = createEntityCreationManager(atlasGraph, importResult, dataMigrationStatusService);
EntityCreationManager creationManager = createEntityCreationManager(importResult, dataMigrationStatusService);
try {
LOG.info("Migration Import: Size: {}: Starting...", streamSize);
......@@ -95,16 +93,24 @@ public class MigrationImport extends ImportStrategy {
return dataMigrationStatusService;
}
private EntityCreationManager createEntityCreationManager(AtlasGraph threadedAtlasGraph,
AtlasImportResult importResult,
private EntityCreationManager createEntityCreationManager(AtlasImportResult importResult,
DataMigrationStatusService dataMigrationStatusService) {
atlasGraph = threadedAtlasGraph;
AtlasGraph graphBulk = graphProvider.getBulkLoading();
EntityGraphRetriever entityGraphRetriever = new EntityGraphRetriever(this.graph, typeRegistry);
EntityGraphRetriever entityGraphRetrieverBulk = new EntityGraphRetriever(graphBulk, typeRegistry);
AtlasEntityStoreV2 entityStore = createEntityStore(this.graph, typeRegistry);
AtlasEntityStoreV2 entityStoreBulk = createEntityStore(graphBulk, typeRegistry);
int batchSize = importResult.getRequest().getOptionKeyBatchSize();
int numWorkers = getNumWorkers(importResult.getRequest().getOptionKeyNumWorkers());
EntityConsumerBuilder consumerBuilder =
new EntityConsumerBuilder(threadedAtlasGraph, entityStore, entityGraphRetriever, typeRegistry, batchSize);
new EntityConsumerBuilder(typeRegistry, this.graph, entityStore, entityGraphRetriever, graphBulk,
entityStoreBulk, entityGraphRetrieverBulk, batchSize);
LOG.info("MigrationImport: EntityCreationManager: Created!");
return new EntityCreationManager(consumerBuilder, batchSize, numWorkers, importResult, dataMigrationStatusService);
}
......@@ -114,17 +120,17 @@ public class MigrationImport extends ImportStrategy {
return ret;
}
private void setupEntityStore(AtlasGraphProvider atlasGraphProvider, AtlasTypeRegistry typeRegistry) {
this.entityGraphRetriever = new EntityGraphRetriever(typeRegistry);
this.atlasGraph = atlasGraphProvider.getBulkLoading();
DeleteHandlerDelegate deleteDelegate = new DeleteHandlerDelegate(typeRegistry);
private AtlasEntityStoreV2 createEntityStore(AtlasGraph graph, AtlasTypeRegistry typeRegistry) {
FullTextMapperV2Nop fullTextMapperV2 = new FullTextMapperV2Nop();
IAtlasEntityChangeNotifier entityChangeNotifier = new EntityChangeNotifierNop();
AtlasRelationshipStore relationshipStore = new AtlasRelationshipStoreV2(typeRegistry, deleteDelegate, entityChangeNotifier);
DeleteHandlerDelegate deleteDelegate = new DeleteHandlerDelegate(graph, typeRegistry);
AtlasFormatConverters formatConverters = new AtlasFormatConverters(typeRegistry);
AtlasInstanceConverter instanceConverter = new AtlasInstanceConverter(typeRegistry, formatConverters);
this.entityGraphMapper = new EntityGraphMapper(deleteDelegate, typeRegistry, atlasGraph, relationshipStore, entityChangeNotifier, instanceConverter, new FullTextMapperV2Nop());
this.entityStore = new AtlasEntityStoreV2(deleteDelegate, typeRegistry, entityChangeNotifier, entityGraphMapper);
AtlasInstanceConverter instanceConverter = new AtlasInstanceConverter(graph, typeRegistry, formatConverters);
AtlasRelationshipStore relationshipStore = new AtlasRelationshipStoreV2(graph, typeRegistry, deleteDelegate, entityChangeNotifier);
EntityGraphMapper entityGraphMapper = new EntityGraphMapper(deleteDelegate, typeRegistry, graph, relationshipStore, entityChangeNotifier, instanceConverter, fullTextMapperV2);
return new AtlasEntityStoreV2(graph, deleteDelegate, typeRegistry, entityChangeNotifier, entityGraphMapper);
}
private void shutdownEntityCreationManager(EntityCreationManager creationManager) {
......
......@@ -31,6 +31,7 @@ import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasSchemaViolationException;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
......@@ -57,15 +58,18 @@ import static org.apache.atlas.repository.store.graph.v2.BulkImporterImpl.update
public class RegularImport extends ImportStrategy {
private static final Logger LOG = LoggerFactory.getLogger(RegularImport.class);
private static final int MAX_ATTEMPTS = 3;
private final AtlasGraph graph;
private final AtlasEntityStore entityStore;
private final AtlasTypeRegistry typeRegistry;
private final EntityGraphRetriever entityGraphRetriever;
private boolean directoryBasedImportConfigured;
public RegularImport(AtlasEntityStore entityStore, AtlasTypeRegistry typeRegistry) {
public RegularImport(AtlasGraph graph, AtlasEntityStore entityStore, AtlasTypeRegistry typeRegistry) {
this.graph = graph;
this.entityStore = entityStore;
this.typeRegistry = typeRegistry;
this.entityGraphRetriever = new EntityGraphRetriever(typeRegistry);
this.entityGraphRetriever = new EntityGraphRetriever(graph, typeRegistry);
this.directoryBasedImportConfigured = StringUtils.isNotEmpty(AtlasConfiguration.IMPORT_TEMP_DIRECTORY.getString());
}
......@@ -156,7 +160,7 @@ public class RegularImport extends ImportStrategy {
AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName());
String vertexGuid = null;
try {
vertexGuid = AtlasGraphUtilsV2.getGuidByUniqueAttributes(entityType, objectId.getUniqueAttributes());
vertexGuid = AtlasGraphUtilsV2.getGuidByUniqueAttributes(this.graph, entityType, objectId.getUniqueAttributes());
} catch (AtlasBaseException e) {
LOG.warn("Entity: {}: Does not exist!", objectId);
return;
......@@ -166,7 +170,7 @@ public class RegularImport extends ImportStrategy {
return;
}
AtlasVertex v = AtlasGraphUtilsV2.findByGuid(vertexGuid);
AtlasVertex v = AtlasGraphUtilsV2.findByGuid(this.graph, vertexGuid);
if (v == null) {
return;
}
......
......@@ -30,7 +30,9 @@ import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.repository.store.graph.v2.AtlasEntityStreamForImport;
import org.apache.atlas.repository.store.graph.v2.BulkImporterImpl;
import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
import org.apache.atlas.repository.store.graph.v2.EntityStream;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.utils.AtlasPerfMetrics;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.slf4j.Logger;
......@@ -49,23 +51,30 @@ public class EntityConsumer extends WorkItemConsumer<AtlasEntity.AtlasEntityWith
private AtomicLong counter = new AtomicLong(1);
private AtomicLong currentBatch = new AtomicLong(1);
private final AtlasGraph atlasGraph;
private final AtlasEntityStore entityStoreV2;
private AtlasGraph atlasGraph;
private final AtlasEntityStore entityStore;
private final AtlasGraph atlasGraphBulk;
private final AtlasEntityStore entityStoreBulk;
private final AtlasTypeRegistry typeRegistry;
private final EntityGraphRetriever entityGraphRetriever;
private final EntityGraphRetriever entityRetrieverBulk;
private List<AtlasEntity.AtlasEntityWithExtInfo> entityBuffer = new ArrayList<>();
private List<EntityMutationResponse> localResults = new ArrayList<>();
public EntityConsumer(AtlasGraph atlasGraph, AtlasEntityStore entityStore,
EntityGraphRetriever entityGraphRetriever, AtlasTypeRegistry typeRegistry,
public EntityConsumer(AtlasTypeRegistry typeRegistry,
AtlasGraph atlasGraph, AtlasEntityStore entityStore,
AtlasGraph atlasGraphBulk, AtlasEntityStore entityStoreBulk, EntityGraphRetriever entityRetrieverBulk,
BlockingQueue queue, int batchSize) {
super(queue);
this.typeRegistry = typeRegistry;
this.atlasGraph = atlasGraph;
this.entityStoreV2 = entityStore;
this.entityGraphRetriever = entityGraphRetriever;
this.typeRegistry = typeRegistry;
this.entityStore = entityStore;
this.atlasGraphBulk = atlasGraphBulk;
this.entityStoreBulk = entityStoreBulk;
this.entityRetrieverBulk = entityRetrieverBulk;
this.batchSize = batchSize;
}
......@@ -77,7 +86,6 @@ public class EntityConsumer extends WorkItemConsumer<AtlasEntity.AtlasEntityWith
long currentCount = counter.addAndGet(delta);
currentBatch.addAndGet(delta);
entityBuffer.add(entityWithExtInfo);
try {
processEntity(entityWithExtInfo, currentCount);
......@@ -88,24 +96,68 @@ public class EntityConsumer extends WorkItemConsumer<AtlasEntity.AtlasEntityWith
}
private void processEntity(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo, long currentCount) {
try {
RequestContext.get().setImportInProgress(true);
RequestContext.get().setCreateShellEntityForNonExistingReference(true);
AtlasEntityStreamForImport oneEntityStream = new AtlasEntityStreamForImport(entityWithExtInfo, null);
try {
LOG.debug("Processing: {}", currentCount);
EntityMutationResponse result = entityStoreV2.createOrUpdateForImportNoCommit(oneEntityStream);
localResults.add(result);
importUsingBulkEntityStore(entityWithExtInfo);
} catch (IllegalStateException | IllegalArgumentException e) {
LOG.warn("{}: {} - {}", e.getClass().getSimpleName(), entityWithExtInfo.getEntity().getTypeName(), entityWithExtInfo.getEntity().getGuid(), e);
importUsingRegularEntityStore(entityWithExtInfo, e);
} catch (AtlasBaseException e) {
addResult(entityWithExtInfo.getEntity().getGuid());
LOG.warn("Exception: {}", entityWithExtInfo.getEntity().getGuid(), e);
LOG.warn("AtlasBaseException: {} - {}", entityWithExtInfo.getEntity().getTypeName(), entityWithExtInfo.getEntity().getGuid(), e);
} catch (AtlasSchemaViolationException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Entity: {}", entityWithExtInfo.getEntity().getGuid(), e);
}
BulkImporterImpl.updateVertexGuid(typeRegistry, entityGraphRetriever, entityWithExtInfo.getEntity());
BulkImporterImpl.updateVertexGuid(this.atlasGraphBulk, typeRegistry, entityRetrieverBulk, entityWithExtInfo.getEntity());
}
}
private void importUsingBulkEntityStore(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) throws AtlasBaseException {
EntityStream oneEntityStream = new AtlasEntityStreamForImport(entityWithExtInfo, null);
EntityMutationResponse result = entityStoreBulk.createOrUpdateForImportNoCommit(oneEntityStream);
localResults.add(result);
entityBuffer.add(entityWithExtInfo);
}
private void importUsingRegularEntityStore(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo, Exception ex) {
commitValidatedEntities(ex);
performRegularImport(entityWithExtInfo);
}
private void performRegularImport(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) {
synchronized (atlasGraph) {
try {
LOG.info("Regular: EntityStore: {}: Starting...", this.counter.get());
AtlasEntityStreamForImport oneEntityStream = new AtlasEntityStreamForImport(entityWithExtInfo, null);
EntityMutationResponse result = this.entityStore.createOrUpdateForImportNoCommit(oneEntityStream);
atlasGraph.commit();
localResults.add(result);
dispatchResults();
} catch (Exception e) {
atlasGraph.rollback();
LOG.error("Regular: EntityStore: Rollback!: Entity creation using regular (non-bulk) failed! Please correct entity and re-submit!", e);
} finally {
LOG.info("Regular: EntityStore: {}: Commit: Done!", this.counter.get());
atlasGraph.commit();
addResult(entityWithExtInfo.getEntity().getGuid());
clear();
LOG.info("Regular: EntityStore: {}: Done!", this.counter.get());
}
}
}
private void commitValidatedEntities(Exception ex) {
try {
LOG.info("Validated Entities: Commit: Starting...");
rollbackPauseRetry(1, ex);
doCommit();
}
finally {
LOG.info("Validated Entities: Commit: Done!");
}
}
......@@ -137,8 +189,10 @@ public class EntityConsumer extends WorkItemConsumer<AtlasEntity.AtlasEntityWith
}
private boolean commitWithRetry(int retryCount) {
AtlasPerfMetrics.MetricRecorder metric = RequestContext.get().startMetricRecord("commitWithRetry");
try {
atlasGraph.commit();
atlasGraphBulk.commit();
if (LOG.isDebugEnabled()) {
LOG.debug("Commit: Done!: Buffer: {}: Batch: {}: Counter: {}", entityBuffer.size(), currentBatch.get(), counter.get());
}
......@@ -148,14 +202,15 @@ public class EntityConsumer extends WorkItemConsumer<AtlasEntity.AtlasEntityWith
} catch (Exception ex) {
rollbackPauseRetry(retryCount, ex);
return false;
} finally {
RequestContext.get().endMetricRecord(metric);
}
}
private void rollbackPauseRetry(int retryCount, Exception ex) {
atlasGraph.rollback();
clearCache();
bulkGraphRollback(retryCount);
LOG.error("Rollback: Done! Buffer: {}: Counter: {}: Retry count: {}", entityBuffer.size(), counter.get(), retryCount);
LOG.warn("Rollback: Done! Buffer: {}: Counter: {}: Retry count: {}", entityBuffer.size(), counter.get(), retryCount);
pause(retryCount);
String exceptionClass = ex.getClass().getSimpleName();
if (!exceptionClass.equals("JanusGraphException") && !exceptionClass.equals("PermanentLockingException")) {
......@@ -164,14 +219,27 @@ public class EntityConsumer extends WorkItemConsumer<AtlasEntity.AtlasEntityWith
retryProcessEntity(retryCount);
}
private void bulkGraphRollback(int retryCount) {
try {
atlasGraphBulk.rollback();
clearCache();
} catch (Exception e) {
LOG.error("Rollback: Exception! Buffer: {}: Counter: {}: Retry count: {}", entityBuffer.size(), counter.get(), retryCount);
}
}
private void retryProcessEntity(int retryCount) {
if (LOG.isDebugEnabled() || retryCount > 1) {
LOG.info("Replaying: Starting!: Buffer: {}: Retry count: {}", entityBuffer.size(), retryCount);
}
for (AtlasEntity.AtlasEntityWithExtInfo e : entityBuffer) {
List<AtlasEntity.AtlasEntityWithExtInfo> localBuffer = new ArrayList<>(entityBuffer);
entityBuffer.clear();
for (AtlasEntity.AtlasEntityWithExtInfo e : localBuffer) {
processEntity(e, counter.get());
}
LOG.info("Replaying: Done!: Buffer: {}: Retry count: {}", entityBuffer.size(), retryCount);
}
......
......@@ -22,6 +22,7 @@ import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.pc.WorkItemBuilder;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.repository.store.graph.v2.AtlasEntityStoreV2;
import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
import org.apache.atlas.type.AtlasTypeRegistry;
......@@ -30,21 +31,33 @@ import java.util.concurrent.BlockingQueue;
public class EntityConsumerBuilder implements WorkItemBuilder<EntityConsumer, AtlasEntity.AtlasEntityWithExtInfo> {
private AtlasGraph atlasGraph;
private AtlasEntityStore entityStore;
private final EntityGraphRetriever entityGraphRetriever;
private AtlasGraph atlasGraphBulk;
private AtlasEntityStore entityStoreBulk;
private final EntityGraphRetriever entityRetriever;
private final AtlasTypeRegistry typeRegistry;
private EntityGraphRetriever entityRetrieverBulk;
private int batchSize;
public EntityConsumerBuilder(AtlasGraph atlasGraph, AtlasEntityStore entityStore,
EntityGraphRetriever entityGraphRetriever, AtlasTypeRegistry typeRegistry, int batchSize) {
public EntityConsumerBuilder(AtlasTypeRegistry typeRegistry, AtlasGraph atlasGraph, AtlasEntityStoreV2 entityStore, EntityGraphRetriever entityRetriever,
AtlasGraph atlasGraphBulk, AtlasEntityStoreV2 entityStoreBulk, EntityGraphRetriever entityRetrieverBulk,
int batchSize) {
this.typeRegistry = typeRegistry;
this.atlasGraph = atlasGraph;
this.entityStore = entityStore;
this.entityGraphRetriever = entityGraphRetriever;
this.typeRegistry = typeRegistry;
this.entityRetriever = entityRetriever;
this.atlasGraphBulk = atlasGraphBulk;
this.entityStoreBulk = entityStoreBulk;
this.entityRetrieverBulk = entityRetrieverBulk;
this.batchSize = batchSize;
}
@Override
public EntityConsumer build(BlockingQueue<AtlasEntity.AtlasEntityWithExtInfo> queue) {
return new EntityConsumer(atlasGraph, entityStore, entityGraphRetriever, typeRegistry, queue, this.batchSize);
return new EntityConsumer(typeRegistry, atlasGraph, entityStore,
atlasGraphBulk, entityStoreBulk, entityRetrieverBulk,
queue, this.batchSize);
}
}
......@@ -26,6 +26,7 @@ import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.impexp.AtlasExportRequest;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.repository.AtlasTestBase;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.store.graph.v1.DeleteHandlerDelegate;
import org.apache.atlas.repository.store.graph.v2.AtlasEntityChangeNotifier;
import org.apache.atlas.repository.store.graph.v2.AtlasEntityStoreV2;
......@@ -66,6 +67,9 @@ public class ExportSkipLineageTest extends AtlasTestBase {
@Inject
ExportService exportService;
@Inject
AtlasGraph atlasGraph;
private DeleteHandlerDelegate deleteDelegate = mock(DeleteHandlerDelegate.class);
private AtlasEntityChangeNotifier mockChangeNotifier = mock(AtlasEntityChangeNotifier.class);
private AtlasEntityStoreV2 entityStore;
......@@ -76,7 +80,7 @@ public class ExportSkipLineageTest extends AtlasTestBase {
loadHiveModel(typeDefStore, typeRegistry);
RequestContext.get().setImportInProgress(true);
entityStore = new AtlasEntityStoreV2(deleteDelegate, typeRegistry, mockChangeNotifier, graphMapper);
entityStore = new AtlasEntityStoreV2(atlasGraph, deleteDelegate, typeRegistry, mockChangeNotifier, graphMapper);
createEntities(entityStore, ENTITIES_SUB_DIR, new String[]{"db", "table-columns", "table-view", "table-table-lineage"});
final String[] entityGuids = {DB_GUID, TABLE_GUID, TABLE_TABLE_GUID, TABLE_VIEW_GUID};
verifyCreatedEntities(entityStore, entityGuids, 4);
......
......@@ -51,9 +51,9 @@ public class HiveParititionTest extends MigrationBaseAsserts {
assertTypeCountNameGuid("hive_db", 1, "parts_db", "ae30d78b-51b4-42ab-9436-8d60c8f68b95");
assertTypeCountNameGuid("hive_process", 1, "", "");
assertEdges("hive_db", "parts_db", AtlasEdgeDirection.IN,1, 1, "");
assertEdges("hive_table", "t1", AtlasEdgeDirection.OUT, 1, 1, "hive_table_db");
assertEdges("hive_table", "tv1", AtlasEdgeDirection.OUT, 1, 1, "hive_table_db");
assertEdges("hive_db", "parts_db", AtlasEdgeDirection.IN, 1, "");
assertEdges("hive_table", "t1", AtlasEdgeDirection.OUT, 1, "hive_table_db");
assertEdges("hive_table", "tv1", AtlasEdgeDirection.OUT, 1, "hive_table_db");
assertMigrationStatus(EXPECTED_TOTAL_COUNT);
}
......
......@@ -59,9 +59,9 @@ public class HiveStocksTest extends MigrationBaseAsserts {
assertTypeCountNameGuid("hive_storagedesc", 1, "", "294290d8-4498-4677-973c-c266d594b039");
assertTypeCountNameGuid("Tag1", 1, "", "");
assertEdges(getVertex("hive_db", "stocks").getEdges(AtlasEdgeDirection.IN).iterator(),1, 1, "");
assertEdges(getVertex("hive_table", "stocks_daily").getEdges(AtlasEdgeDirection.OUT).iterator(), 1, 1, "hive_table_db");
assertEdges(getVertex("hive_column", "high").getEdges(AtlasEdgeDirection.OUT).iterator(), 1,1, "hive_table_columns");
assertEdges(getVertex("hive_db", "stocks").getEdges(AtlasEdgeDirection.IN).iterator(), 1, "");
assertEdges(getVertex("hive_table", "stocks_daily").getEdges(AtlasEdgeDirection.OUT).iterator(), 1, "hive_table_db");
assertEdges(getVertex("hive_column", "high").getEdges(AtlasEdgeDirection.OUT).iterator(), 1, "hive_table_columns");
assertMigrationStatus(EXPECTED_TOTAL_COUNT);
}
......
......@@ -133,25 +133,33 @@ public class MigrationBaseAsserts {
return iterator.hasNext() ? iterator.next() : null;
}
protected void assertEdges(String typeName, String assetName, AtlasEdgeDirection edgeDirection, int startIdx, int expectedItems, String edgeTypeName) {
assertEdges(getVertex(typeName, assetName).getEdges(edgeDirection).iterator(),startIdx, expectedItems, edgeTypeName);
protected void assertEdges(String typeName, String assetName, AtlasEdgeDirection edgeDirection, int expectedItems, String edgeTypeName) {
Iterator edgeIterator = getVertex(typeName, assetName).getEdges(edgeDirection).iterator();
assertEdges(edgeIterator, expectedItems, edgeTypeName);
}
protected void assertEdges(Iterator<AtlasEdge> results, int startIdx, int expectedItems, String edgeTypeName) {
protected void assertEdges(Iterator<AtlasEdge> results, int expectedItems, String edgeTypeNameExpected) {
int count = 0;
AtlasEdge e = null;
for (Iterator<AtlasEdge> it = results; it.hasNext() && count < startIdx; count++) {
boolean searchedEdgeFound = false;
for (Iterator<AtlasEdge> it = results; it.hasNext();) {
e = it.next();
String typeName = AtlasGraphUtilsV2.getEncodedProperty(e, TYPE_NAME_PROPERTY, String.class);
searchedEdgeFound = StringUtils.isEmpty(edgeTypeNameExpected) || typeName.equals(edgeTypeNameExpected);
if (searchedEdgeFound) {
count++;
break;
}
}
assertNotNull(AtlasGraphUtilsV2.getEncodedProperty(e, R_GUID_PROPERTY_NAME, Object.class));
assertNotNull(AtlasGraphUtilsV2.getEncodedProperty(e, "tagPropagation", Object.class));
if(StringUtils.isNotEmpty(edgeTypeName)) {
assertEquals(AtlasGraphUtilsV2.getEncodedProperty(e, TYPE_NAME_PROPERTY, Object.class), edgeTypeName, edgeTypeName);
if(StringUtils.isNotEmpty(edgeTypeNameExpected)) {
assertTrue(searchedEdgeFound, edgeTypeNameExpected);
}
assertEquals(count, expectedItems, String.format("%s", edgeTypeName));
assertEquals(count, expectedItems, String.format("%s", edgeTypeNameExpected));
}
protected void assertEdgesWithLabel(Iterator<AtlasEdge> results, int startIdx, String edgeTypeName) {
......
......@@ -125,7 +125,7 @@ public class AtlasEntityStoreV2Test extends AtlasEntityTestBase {
}
@BeforeTest
public void init() throws Exception {
entityStore = new AtlasEntityStoreV2(deleteDelegate, typeRegistry, mockChangeNotifier, graphMapper);
entityStore = new AtlasEntityStoreV2(graph, deleteDelegate, typeRegistry, mockChangeNotifier, graphMapper);
RequestContext.clear();
RequestContext.get().setUser(TestUtilsV2.TEST_USER, null);
......
......@@ -107,7 +107,7 @@ public class AtlasEntityTestBase {
@BeforeTest
public void init() throws Exception {
entityStore = new AtlasEntityStoreV2(deleteDelegate, typeRegistry, mockChangeNotifier, graphMapper);
entityStore = new AtlasEntityStoreV2(graph, deleteDelegate, typeRegistry, mockChangeNotifier, graphMapper);
RequestContext.clear();
RequestContext.get().setUser(TestUtilsV2.TEST_USER, null);
......
......@@ -32,6 +32,7 @@ import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.apache.atlas.repository.graph.AtlasGraphProvider;
import org.apache.atlas.repository.graph.GraphBackedSearchIndexer;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.store.bootstrap.AtlasTypeDefStoreInitializer;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.repository.store.graph.AtlasRelationshipStore;
......@@ -86,6 +87,9 @@ public abstract class AtlasRelationshipStoreV2Test {
@Inject
AtlasEntityChangeNotifier entityNotifier;
@Inject
AtlasGraph atlasGraph;
AtlasEntityStore entityStore;
AtlasRelationshipStore relationshipStore;
AtlasEntityChangeNotifier mockChangeNotifier = mock(AtlasEntityChangeNotifier.class);
......@@ -124,8 +128,8 @@ public abstract class AtlasRelationshipStoreV2Test {
@BeforeTest
public void init() throws Exception {
entityStore = new AtlasEntityStoreV2(deleteDelegate, typeRegistry, mockChangeNotifier, graphMapper);
relationshipStore = new AtlasRelationshipStoreV2(typeRegistry, deleteDelegate, entityNotifier);
entityStore = new AtlasEntityStoreV2(atlasGraph, deleteDelegate, typeRegistry, mockChangeNotifier, graphMapper);
relationshipStore = new AtlasRelationshipStoreV2(atlasGraph, typeRegistry, deleteDelegate, entityNotifier);
RequestContext.clear();
RequestContext.get().setUser(TestUtilsV2.TEST_USER, null);
......
......@@ -20,15 +20,13 @@ package org.apache.atlas.repository.store.graph.v2;
import com.fasterxml.jackson.core.type.TypeReference;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.AtlasEntityHeaders;
import org.apache.atlas.model.typedef.AtlasEntityDef;
import org.apache.atlas.model.typedef.AtlasStructDef;
import org.apache.atlas.repository.audit.EntityAuditRepository;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasStructType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.utils.AtlasJson;
import org.apache.atlas.utils.TestResourceFileUtils;
......@@ -37,7 +35,6 @@ import org.elasticsearch.common.util.CollectionUtils;
import org.testng.annotations.Test;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
......@@ -65,13 +62,13 @@ public class ClassificationAssociatorTest {
private class ClassificationAssociatorUpdaterForSpy extends ClassificationAssociator.Updater {
private final String entityFileName;
public ClassificationAssociatorUpdaterForSpy(AtlasTypeRegistry typeRegistry, AtlasEntityStore entitiesStore) {
super(typeRegistry, entitiesStore);
public ClassificationAssociatorUpdaterForSpy(AtlasGraph atlasGraph, AtlasTypeRegistry typeRegistry, AtlasEntityStore entitiesStore) {
super(atlasGraph, typeRegistry, entitiesStore);
this.entityFileName = StringUtils.EMPTY;
}
public ClassificationAssociatorUpdaterForSpy(AtlasTypeRegistry typeRegistry, AtlasEntityStore entitiesStore, String entityFileName) {
super(typeRegistry, entitiesStore);
public ClassificationAssociatorUpdaterForSpy(AtlasGraph atlasGraph, AtlasTypeRegistry typeRegistry, AtlasEntityStore entitiesStore, String entityFileName) {
super(atlasGraph, typeRegistry, entitiesStore);
this.entityFileName = entityFileName;
}
......@@ -142,7 +139,8 @@ public class ClassificationAssociatorTest {
AtlasTypeRegistry typeRegistry = mock(AtlasTypeRegistry.class);
when(typeRegistry.getEntityTypeByName(anyString())).thenReturn(null);
ClassificationAssociator.Updater updater = new ClassificationAssociator.Updater(typeRegistry, entitiesStore);
AtlasGraph atlasGraph = mock(AtlasGraph.class);
ClassificationAssociator.Updater updater = new ClassificationAssociator.Updater(atlasGraph, typeRegistry, entitiesStore);
String summary = updater.setClassifications(entityHeaderMap.getGuidHeaderMap());
assertTrue(summary.contains("hive_"));
......@@ -155,11 +153,12 @@ public class ClassificationAssociatorTest {
AtlasEntityType hiveTable = mock(AtlasEntityType.class);
AtlasEntityStore entitiesStore = mock(AtlasEntityStore.class);
AtlasTypeRegistry typeRegistry = mock(AtlasTypeRegistry.class);
AtlasGraph atlasGraph = mock(AtlasGraph.class);
when(typeRegistry.getEntityTypeByName(anyString())).thenReturn(hiveTable);
when(hiveTable.getTypeName()).thenReturn("hive_column");
ClassificationAssociatorUpdaterForSpy updater = new ClassificationAssociatorUpdaterForSpy(typeRegistry, entitiesStore);
ClassificationAssociatorUpdaterForSpy updater = new ClassificationAssociatorUpdaterForSpy(atlasGraph, typeRegistry, entitiesStore);
String summary = updater.setClassifications(entityHeaderMap.getGuidHeaderMap());
TypeReference<String[]> typeReference = new TypeReference<String[]>() {};
......@@ -178,8 +177,9 @@ public class ClassificationAssociatorTest {
AtlasTypeRegistry typeRegistry = new AtlasTypeRegistry();
AtlasTypeRegistry.AtlasTransientTypeRegistry ttr = typeRegistry.lockTypeRegistryForUpdate();
ttr.addTypes(CollectionUtils.newSingletonArrayList(ed));
AtlasGraph atlasGraph = mock(AtlasGraph.class);
ClassificationAssociatorUpdaterForSpy updater = new ClassificationAssociatorUpdaterForSpy(ttr, entitiesStore, "col-entity-PII");
ClassificationAssociatorUpdaterForSpy updater = new ClassificationAssociatorUpdaterForSpy(atlasGraph, ttr, entitiesStore, "col-entity-PII");
String summary = updater.setClassifications(entityHeaderMap.getGuidHeaderMap());
TypeReference<String[]> typeReference = new TypeReference<String[]>() {};
......@@ -233,11 +233,12 @@ public class ClassificationAssociatorTest {
AtlasEntityType hiveTable = mock(AtlasEntityType.class);
AtlasEntityStore entitiesStore = mock(AtlasEntityStore.class);
AtlasTypeRegistry typeRegistry = mock(AtlasTypeRegistry.class);
AtlasGraph atlasGraph = mock(AtlasGraph.class);
when(typeRegistry.getEntityTypeByName(anyString())).thenReturn(hiveTable);
when(hiveTable.getTypeName()).thenReturn("hive_column");
ClassificationAssociatorUpdaterForSpy updater = new ClassificationAssociatorUpdaterForSpy(typeRegistry, entitiesStore, entityFileName);
ClassificationAssociatorUpdaterForSpy updater = new ClassificationAssociatorUpdaterForSpy(atlasGraph, typeRegistry, entitiesStore, entityFileName);
String summary = updater.setClassifications(entityHeaderMap.getGuidHeaderMap());
TypeReference<String[]> typeReference = new TypeReference<String[]>() {};
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment