Commit e2ba3051 by Madhan Neethiraj, committed by Sarath Subramanian

ATLAS-3566: improvements in upgrade patches, to avoid full-scan

parent c789f2d7
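The core of the change: previously ConcurrentPatchProcessor.execute() walked every vertex id in the graph and handed each one to the patch, so even a patch that touches only a few types paid for a full graph scan. With this commit each patch reports the vertices it actually needs through a new submitVerticesToUpdate(WorkItemManager) callback, typically backed by an indexed query on the entity type-name property. A condensed before/after sketch of the producer logic (illustrative only, not code from the commit; graph, typeRegistry and manager stand in for the processor's graph handle, type registry and work-item manager):

// before: produce every vertex id in the graph, regardless of type
Iterable<Object> allVertexIds = graph.query().vertexIds();

for (Object vertexId : allVertexIds) {
    manager.checkProduce((Long) vertexId);
}

// after: produce only vertices of the types the patch cares about,
// found through the indexed type-name property instead of a full scan
for (AtlasEntityType entityType : typeRegistry.getAllEntityTypes()) {
    Iterable<Object> typedVertexIds = graph.query().has(Constants.ENTITY_TYPE_PROPERTY_KEY, entityType.getTypeName()).vertexIds();

    for (Object vertexId : typedVertexIds) {
        manager.checkProduce((Long) vertexId);
    }
}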
ClassificationTextPatch.java

@@ -18,11 +18,22 @@
 package org.apache.atlas.repository.patches;
 
 import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.pc.WorkItemManager;
+import org.apache.atlas.repository.Constants;
+import org.apache.atlas.repository.graphdb.AtlasEdge;
+import org.apache.atlas.repository.graphdb.AtlasEdgeDirection;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
 import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.apache.atlas.type.AtlasClassificationType;
 import org.apache.atlas.type.AtlasEntityType;
+import org.apache.atlas.type.AtlasTypeRegistry;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+
 import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.APPLIED;
 
 public class ClassificationTextPatch extends AtlasPatchHandler {

@@ -57,16 +68,54 @@ public class ClassificationTextPatch extends AtlasPatchHandler {
     }
 
     @Override
-    protected void processVertexItem(Long vertexId, AtlasVertex vertex, String typeName, AtlasEntityType entityType) throws AtlasBaseException {
-        processItem(vertexId, vertex, typeName, entityType);
+    protected void prepareForExecution() {
+        //do nothing
     }
 
     @Override
-    protected void prepareForExecution() {
-        //do nothing
+    public void submitVerticesToUpdate(WorkItemManager manager) {
+        AtlasTypeRegistry typeRegistry = getTypeRegistry();
+        AtlasGraph graph = getGraph();
+        Set<Long> vertexIds = new HashSet<>();
+
+        for (AtlasClassificationType classificationType : typeRegistry.getAllClassificationTypes()) {
+            LOG.info("finding classification of type {}", classificationType.getTypeName());
+
+            Iterable<AtlasVertex> iterable = graph.query().has(Constants.ENTITY_TYPE_PROPERTY_KEY, classificationType.getTypeName()).vertices();
+            int count = 0;
+
+            for (Iterator<AtlasVertex> iter = iterable.iterator(); iter.hasNext(); ) {
+                AtlasVertex classificationVertex = iter.next();
+                Iterable<AtlasEdge> edges = classificationVertex.getEdges(AtlasEdgeDirection.IN);
+
+                for (AtlasEdge edge : edges) {
+                    AtlasVertex entityVertex = edge.getOutVertex();
+                    Long vertexId = (Long) entityVertex.getId();
+
+                    if (vertexIds.contains(vertexId)) {
+                        continue;
+                    }
+
+                    vertexIds.add(vertexId);
+
+                    manager.checkProduce(vertexId);
+                }
+
+                count++;
+            }
+
+            LOG.info("found {} classification of type {}", count, classificationType.getTypeName());
+        }
+
+        LOG.info("found {} entities with classifications", vertexIds.size());
+    }
+
+    @Override
+    protected void processVertexItem(Long vertexId, AtlasVertex vertex, String typeName, AtlasEntityType entityType) throws AtlasBaseException {
+        processItem(vertexId, vertex, typeName, entityType);
     }
 
-    protected void processItem(Long vertexId, AtlasVertex vertex, String typeName, AtlasEntityType entityType) throws AtlasBaseException {
+    private void processItem(Long vertexId, AtlasVertex vertex, String typeName, AtlasEntityType entityType) throws AtlasBaseException {
         if(LOG.isDebugEnabled()) {
            LOG.debug("processItem(typeName={}, vertexId={})", typeName, vertexId);
         }
...
ConcurrentPatchProcessor.java

@@ -18,7 +18,6 @@
 package org.apache.atlas.repository.patches;
 
 import org.apache.atlas.ApplicationProperties;
-import org.apache.atlas.AtlasException;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.model.instance.AtlasEntity;
 import org.apache.atlas.pc.WorkItemBuilder;

@@ -30,47 +29,37 @@ import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
 import org.apache.atlas.repository.store.graph.v2.EntityGraphMapper;
 import org.apache.atlas.type.AtlasEntityType;
 import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.commons.configuration.Configuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Iterator;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.atomic.AtomicLong;
 
 public abstract class ConcurrentPatchProcessor {
     private static final Logger LOG = LoggerFactory.getLogger(ConcurrentPatchProcessor.class);
 
     private static final String NUM_WORKERS_PROPERTY = "atlas.patch.numWorkers";
     private static final String BATCH_SIZE_PROPERTY = "atlas.patch.batchSize";
     private static final String ATLAS_SOLR_SHARDS = "ATLAS_SOLR_SHARDS";
     private static final String WORKER_NAME_PREFIX = "patchWorkItem";
     private static final int NUM_WORKERS;
     private static final int BATCH_SIZE;
 
     private final EntityGraphMapper entityGraphMapper;
-
-    public AtlasGraph getGraph() {
-        return graph;
-    }
-
-    public GraphBackedSearchIndexer getIndexer() {
-        return indexer;
-    }
-
-    public AtlasTypeRegistry getTypeRegistry() {
-        return typeRegistry;
-    }
-
-    private final AtlasGraph graph;
+    private final AtlasGraph graph;
     private final GraphBackedSearchIndexer indexer;
     private final AtlasTypeRegistry typeRegistry;
 
     static {
         int numWorkers = 3;
         int batchSize = 300;
 
         try {
-            numWorkers = ApplicationProperties.get().getInt(NUM_WORKERS_PROPERTY, getDefaultNumWorkers());
-            batchSize = ApplicationProperties.get().getInt(BATCH_SIZE_PROPERTY, 300);
+            Configuration config = ApplicationProperties.get();
+
+            numWorkers = config.getInt(NUM_WORKERS_PROPERTY, config.getInt(ATLAS_SOLR_SHARDS, 1) * 3);
+            batchSize = config.getInt(BATCH_SIZE_PROPERTY, 300);
 
             LOG.info("UniqueAttributePatch: {}={}, {}={}", NUM_WORKERS_PROPERTY, numWorkers, BATCH_SIZE_PROPERTY, batchSize);
         } catch (Exception e) {

@@ -81,10 +70,6 @@ public abstract class ConcurrentPatchProcessor {
         BATCH_SIZE = batchSize;
     }
 
-    private static int getDefaultNumWorkers() throws AtlasException {
-        return ApplicationProperties.get().getInt(ATLAS_SOLR_SHARDS, 1) * 3;
-    }
-
     public ConcurrentPatchProcessor(PatchContext context) {
         this.graph = context.getGraph();
         this.indexer = context.getIndexer();

@@ -95,21 +80,34 @@ public abstract class ConcurrentPatchProcessor {
     public EntityGraphMapper getEntityGraphMapper() {
         return entityGraphMapper;
     }
 
+    public AtlasGraph getGraph() {
+        return graph;
+    }
+
+    public GraphBackedSearchIndexer getIndexer() {
+        return indexer;
+    }
+
+    public AtlasTypeRegistry getTypeRegistry() {
+        return typeRegistry;
+    }
+
     public void apply() throws AtlasBaseException {
         prepareForExecution();
         execute();
     }
 
+    protected abstract void prepareForExecution() throws AtlasBaseException;
+
+    protected abstract void submitVerticesToUpdate(WorkItemManager manager);
+
+    protected abstract void processVertexItem(Long vertexId, AtlasVertex vertex, String typeName, AtlasEntityType entityType) throws AtlasBaseException;
+
     private void execute() {
-        Iterable<Object> iterable = graph.query().vertexIds();
-        WorkItemManager manager = new WorkItemManager(
-                new ConsumerBuilder(graph, typeRegistry, this), WORKER_NAME_PREFIX,
-                BATCH_SIZE, NUM_WORKERS, false);
+        WorkItemManager manager = new WorkItemManager(new ConsumerBuilder(graph, typeRegistry, this),
+                                                      WORKER_NAME_PREFIX, BATCH_SIZE, NUM_WORKERS, false);
 
         try {
-            for (Iterator<Object> iter = iterable.iterator(); iter.hasNext(); ) {
-                Object vertexId = iter.next();
-                submitForProcessing((Long) vertexId, manager);
-            }
+            submitVerticesToUpdate(manager);
 
             manager.drain();
         } finally {

@@ -121,10 +119,6 @@ public abstract class ConcurrentPatchProcessor {
         }
     }
 
-    private void submitForProcessing(Long vertexId, WorkItemManager manager) {
-        manager.checkProduce(vertexId);
-    }
-
     private static class ConsumerBuilder implements WorkItemBuilder<Consumer, Long> {
         private final AtlasTypeRegistry typeRegistry;
         private final AtlasGraph graph;

@@ -228,7 +222,4 @@ public abstract class ConcurrentPatchProcessor {
             }
         }
     }
-
-    protected abstract void processVertexItem(Long vertexId, AtlasVertex vertex, String typeName, AtlasEntityType entityType) throws AtlasBaseException;
-    protected abstract void prepareForExecution() throws AtlasBaseException;
 }
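After this refactoring ConcurrentPatchProcessor owns the worker pool and the per-vertex dispatch, while each concrete patch implements three hooks: prepareForExecution() for one-time setup, submitVerticesToUpdate(manager) to produce the vertex ids that need fixing, and processVertexItem(...) for the per-vertex work done by the consumers. A hypothetical subclass to illustrate the contract (SampleTypePatchProcessor and the type name "sample_type" are made up for this sketch; the graph and work-item calls are the ones used by the patches in this commit):

package org.apache.atlas.repository.patches;

import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.pc.WorkItemManager;
import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.type.AtlasEntityType;

public class SampleTypePatchProcessor extends ConcurrentPatchProcessor {
    public SampleTypePatchProcessor(PatchContext context) {
        super(context);
    }

    @Override
    protected void prepareForExecution() {
        // one-time setup (index creation, attribute registration) before workers start
    }

    @Override
    public void submitVerticesToUpdate(WorkItemManager manager) {
        // enumerate only the vertices this patch must touch and hand their ids to the work-item manager
        Iterable<Object> vertexIds = getGraph().query().has(Constants.ENTITY_TYPE_PROPERTY_KEY, "sample_type").vertexIds();

        for (Object vertexId : vertexIds) {
            manager.checkProduce((Long) vertexId);
        }
    }

    @Override
    protected void processVertexItem(Long vertexId, AtlasVertex vertex, String typeName, AtlasEntityType entityType) throws AtlasBaseException {
        // per-vertex fix-up, executed concurrently by the consumers created in execute()
    }
}

apply() remains the entry point: it calls prepareForExecution(), then execute() creates the WorkItemManager, invokes submitVerticesToUpdate(manager), and drains the queue once all ids have been produced.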
UniqueAttributePatch.java

@@ -17,25 +17,24 @@
  */
 package org.apache.atlas.repository.patches;
 
-import org.apache.atlas.ApplicationProperties;
-import org.apache.atlas.AtlasException;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
+import org.apache.atlas.pc.WorkItemManager;
+import org.apache.atlas.repository.Constants;
 import org.apache.atlas.repository.IndexException;
 import org.apache.atlas.repository.graph.GraphBackedSearchIndexer.UniqueKind;
-import org.apache.atlas.repository.graphdb.AtlasCardinality;
-import org.apache.atlas.repository.graphdb.AtlasGraphManagement;
-import org.apache.atlas.repository.graphdb.AtlasSchemaViolationException;
-import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.apache.atlas.repository.graphdb.*;
 import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
 import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
 import org.apache.atlas.type.AtlasEntityType;
 import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
+import org.apache.atlas.type.AtlasTypeRegistry;
 import org.apache.commons.collections.CollectionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Collection;
+import java.util.Iterator;
 
 import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.APPLIED;
 import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.getIdFromVertex;

@@ -66,31 +65,37 @@ public class UniqueAttributePatch extends AtlasPatchHandler {
     }
 
     public static class UniqueAttributePatchProcessor extends ConcurrentPatchProcessor {
-        private static final String NUM_WORKERS_PROPERTY = "atlas.patch.unique_attribute_patch.numWorkers";
-        private static final String BATCH_SIZE_PROPERTY = "atlas.patch.unique_attribute_patch.batchSize";
-        private static final String ATLAS_SOLR_SHARDS = "ATLAS_SOLR_SHARDS";
-        private static final int NUM_WORKERS;
-        private static final int BATCH_SIZE;
-
-        static {
-            int numWorkers = 3;
-            int batchSize = 300;
-
-            try {
-                numWorkers = ApplicationProperties.get().getInt(NUM_WORKERS_PROPERTY, getDefaultNumWorkers());
-                batchSize = ApplicationProperties.get().getInt(BATCH_SIZE_PROPERTY, 300);
-
-                LOG.info("UniqueAttributePatch: {}={}, {}={}", NUM_WORKERS_PROPERTY, numWorkers, BATCH_SIZE_PROPERTY, batchSize);
-            } catch (Exception e) {
-                LOG.error("Error retrieving configuration.", e);
-            }
-
-            NUM_WORKERS = numWorkers;
-            BATCH_SIZE = batchSize;
-        }
-
         public UniqueAttributePatchProcessor(PatchContext context) {
             super(context);
         }
 
+        @Override
+        protected void prepareForExecution() {
+            //create the new attribute for all unique attributes.
+            createIndexForUniqueAttributes();
+        }
+
+        @Override
+        public void submitVerticesToUpdate(WorkItemManager manager) {
+            AtlasTypeRegistry typeRegistry = getTypeRegistry();
+            AtlasGraph graph = getGraph();
+
+            for (AtlasEntityType entityType : typeRegistry.getAllEntityTypes()) {
+                LOG.info("finding entities of type {}", entityType.getTypeName());
+
+                Iterable<Object> iterable = graph.query().has(Constants.ENTITY_TYPE_PROPERTY_KEY, entityType.getTypeName()).vertexIds();
+                int count = 0;
+
+                for (Iterator<Object> iter = iterable.iterator(); iter.hasNext(); ) {
+                    Object vertexId = iter.next();
+
+                    manager.checkProduce((Long) vertexId);
+
+                    count++;
+                }
+
+                LOG.info("found {} entities of type {}", count, entityType.getTypeName());
+            }
+        }
 
         @Override

@@ -99,12 +104,6 @@ public class UniqueAttributePatch extends AtlasPatchHandler {
             processItem(vertexId, vertex, typeName, entityType);
         }
 
-        @Override
-        protected void prepareForExecution() {
-            //create the new attribute for all unique attributes.
-            createIndexForUniqueAttributes();
-        }
-
         private void createIndexForUniqueAttributes() {
             for (AtlasEntityType entityType : getTypeRegistry().getAllEntityTypes()) {

@@ -157,10 +156,6 @@ public class UniqueAttributePatch extends AtlasPatchHandler {
             }
         }
 
-        private static int getDefaultNumWorkers() throws AtlasException {
-            return ApplicationProperties.get().getInt(ATLAS_SOLR_SHARDS, 1) * 3;
-        }
-
         protected void processItem(Long vertexId, AtlasVertex vertex, String typeName, AtlasEntityType entityType) {
             LOG.debug("processItem(typeName={}, vertexId={})", typeName, vertexId);
...
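A side effect of the cleanup: UniqueAttributePatchProcessor no longer reads its own atlas.patch.unique_attribute_patch.* properties or carries its own getDefaultNumWorkers() helper; worker-pool sizing now lives only in ConcurrentPatchProcessor's static initializer. A simplified sketch of that resolution order (the holder class is hypothetical; the property names and defaults are the ones visible in the diff above):

import org.apache.atlas.ApplicationProperties;
import org.apache.commons.configuration.Configuration;

public final class PatchWorkerSizing {    // illustrative holder, not an Atlas class
    static int numWorkers = 3;            // defaults used when configuration can't be read
    static int batchSize = 300;

    static {
        try {
            Configuration config = ApplicationProperties.get();

            // atlas.patch.numWorkers wins; otherwise 3 workers per Solr shard (ATLAS_SOLR_SHARDS, default 1)
            numWorkers = config.getInt("atlas.patch.numWorkers", config.getInt("ATLAS_SOLR_SHARDS", 1) * 3);
            batchSize = config.getInt("atlas.patch.batchSize", 300);
        } catch (Exception e) {
            // keep the defaults, as ConcurrentPatchProcessor does on configuration errors
        }
    }
}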