Commit 0bb18f08 by Ashutosh Mestry

ATLAS-3143: PatchFx: Memory usage and performance improvement.

parent efc4bebc
...@@ -96,6 +96,27 @@ public interface AtlasGraphQuery<V, E> { ...@@ -96,6 +96,27 @@ public interface AtlasGraphQuery<V, E> {
*/ */
Iterable<AtlasVertex<V, E>> vertices(int offset, int limit); Iterable<AtlasVertex<V, E>> vertices(int offset, int limit);
/**
* Executes the query and returns IDs of matching vertices.
* @return
*/
Iterable<Object> vertexIds();
/**
* Executes the query and returns IDs of the matching vertices from given offset till the max limit
* @param limit max number of vertices
* @return
*/
Iterable<Object> vertexIds(int limit);
/**
* Executes the query and returns IDs of the matching vertices from given offset till the max limit
* @param offset starting offset
* @param limit max number of vertices
* @return
*/
Iterable<Object> vertexIds(int offset, int limit);
/** /**
* Adds a predicate that the returned vertices must have the specified * Adds a predicate that the returned vertices must have the specified
......
...@@ -77,6 +77,27 @@ public interface NativeTinkerpopGraphQuery<V, E> { ...@@ -77,6 +77,27 @@ public interface NativeTinkerpopGraphQuery<V, E> {
*/ */
Iterable<AtlasVertex<V, E>> vertices(int offset, int limit); Iterable<AtlasVertex<V, E>> vertices(int offset, int limit);
/**
* Executes the graph query.
* @return
*/
Iterable<Object> vertexIds();
/**
* Executes graph query
* @param limit Max vertices to return
* @return
*/
Iterable<Object> vertexIds(int limit);
/**
* Executes graph query
* @param offset Starting offset
* @param limit Max vertices to return
* @return
*/
Iterable<Object> vertexIds(int offset, int limit);
/** /**
* Adds an in condition to the query. * Adds an in condition to the query.
......
...@@ -237,6 +237,63 @@ public abstract class TinkerpopGraphQuery<V, E> implements AtlasGraphQuery<V, E> ...@@ -237,6 +237,63 @@ public abstract class TinkerpopGraphQuery<V, E> implements AtlasGraphQuery<V, E>
} }
@Override @Override
public Iterable<Object> vertexIds() {
if (LOG.isDebugEnabled()) {
LOG.debug("Executing: " + queryCondition);
}
// Compute the overall result by combining the results of all the AndConditions (nested within OR) together.
Set<Object> result = new HashSet<>();
for(AndCondition andExpr : queryCondition.getAndTerms()) {
NativeTinkerpopGraphQuery<V, E> andQuery = andExpr.create(getQueryFactory());
for(Object vertexId : andQuery.vertexIds()) {
result.add(vertexId);
}
}
return result;
}
@Override
public Iterable<Object> vertexIds(int limit) {
return vertexIds(0, limit);
}
@Override
public Iterable<Object> vertexIds(int offset, int limit) {
if (LOG.isDebugEnabled()) {
LOG.debug("Executing: " + queryCondition);
}
Preconditions.checkArgument(offset >= 0, "Offset must be non-negative");
Preconditions.checkArgument(limit >= 0, "Limit must be non-negative");
// Compute the overall result by combining the results of all the AndConditions (nested within OR) together.
Set<Object> result = new HashSet<>();
long resultIdx = 0;
for(AndCondition andExpr : queryCondition.getAndTerms()) {
if (result.size() == limit) {
break;
}
NativeTinkerpopGraphQuery<V, E> andQuery = andExpr.create(getQueryFactory());
for(Object vertexId : andQuery.vertexIds(offset + limit)) {
if (resultIdx >= offset) {
result.add(vertexId);
if (result.size() == limit) {
break;
}
}
resultIdx++;
}
}
return result;
}
@Override
public AtlasGraphQuery<V, E> has(String propertyKey, QueryOperator operator, public AtlasGraphQuery<V, E> has(String propertyKey, QueryOperator operator,
Object value) { Object value) {
queryCondition.andWith(new HasPredicate(propertyKey, operator, value)); queryCondition.andWith(new HasPredicate(propertyKey, operator, value));
......
...@@ -158,6 +158,64 @@ public class NativeJanusGraphQuery implements NativeTinkerpopGraphQuery<AtlasJan ...@@ -158,6 +158,64 @@ public class NativeJanusGraphQuery implements NativeTinkerpopGraphQuery<AtlasJan
} }
@Override @Override
public Iterable<Object> vertexIds() {
Set<Object> result = new HashSet<>();
Iterable<JanusGraphVertex> it = query.vertices();
for (Iterator<? extends Vertex> iter = it.iterator(); iter.hasNext(); ) {
result.add(iter.next().id());
}
return result;
}
@Override
public Iterable<Object> vertexIds(int limit) {
Set<Object> result = new HashSet<>(limit);
Iterable<JanusGraphVertex> it = query.limit(limit).vertices();
if (LOG.isDebugEnabled()) {
if (query instanceof GraphCentricQueryBuilder) {
LOG.debug("NativeJanusGraphQuery.vertices({}): resultSize={}, {}", limit, getCountForDebugLog(it), ((GraphCentricQueryBuilder) query).constructQuery(ElementCategory.VERTEX));
} else {
LOG.debug("NativeJanusGraphQuery.vertices({}): resultSize={}, {}", limit, getCountForDebugLog(it), query);
}
}
for (Iterator<? extends Vertex> iter = it.iterator(); iter.hasNext(); ) {
result.add(iter.next().id());
}
return result;
}
@Override
public Iterable<Object> vertexIds(int offset, int limit) {
Set<Object> result = new HashSet<>(limit);
Iterable<JanusGraphVertex> it = query.limit(offset + limit).vertices();
if (LOG.isDebugEnabled()) {
if (query instanceof GraphCentricQueryBuilder) {
LOG.debug("NativeJanusGraphQuery.vertices({}, {}): resultSize={}, {}", offset, limit, getCountForDebugLog(it), ((GraphCentricQueryBuilder) query).constructQuery(ElementCategory.VERTEX));
} else {
LOG.debug("NativeJanusGraphQuery.vertices({}, {}): resultSize={}, {}", offset, limit, getCountForDebugLog(it), query);
}
}
Iterator<? extends Vertex> iter = it.iterator();
for (long resultIdx = 0; iter.hasNext() && result.size() < limit; resultIdx++) {
if (resultIdx < offset) {
continue;
}
result.add(iter.next().id());
}
return result;
}
@Override
public void in(String propertyName, Collection<? extends Object> values) { public void in(String propertyName, Collection<? extends Object> values) {
query.has(propertyName, Contain.IN, values); query.has(propertyName, Contain.IN, values);
......
...@@ -30,7 +30,7 @@ import java.util.concurrent.atomic.AtomicLong; ...@@ -30,7 +30,7 @@ import java.util.concurrent.atomic.AtomicLong;
public abstract class WorkItemConsumer<T> implements Runnable { public abstract class WorkItemConsumer<T> implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(WorkItemConsumer.class); private static final Logger LOG = LoggerFactory.getLogger(WorkItemConsumer.class);
private static final int POLLING_DURATION_SECONDS = 30; private static final int POLLING_DURATION_SECONDS = 5;
private static final int DEFAULT_COMMIT_TIME_IN_MS = 15000; private static final int DEFAULT_COMMIT_TIME_IN_MS = 15000;
private final BlockingQueue<T> queue; private final BlockingQueue<T> queue;
...@@ -50,7 +50,7 @@ public abstract class WorkItemConsumer<T> implements Runnable { ...@@ -50,7 +50,7 @@ public abstract class WorkItemConsumer<T> implements Runnable {
T item = queue.poll(POLLING_DURATION_SECONDS, TimeUnit.SECONDS); T item = queue.poll(POLLING_DURATION_SECONDS, TimeUnit.SECONDS);
if (item == null) { if (item == null) {
LOG.warn("WorkItemConsumer.run(): no more items found in the queue. Will exit after committing"); LOG.debug("WorkItemConsumer.run(): no more items found in the queue. Will exit after committing");
commitDirty(); commitDirty();
......
...@@ -41,8 +41,8 @@ public abstract class AtlasPatchHandler { ...@@ -41,8 +41,8 @@ public abstract class AtlasPatchHandler {
private void register() { private void register() {
PatchStatus patchStatus = getStatus(); PatchStatus patchStatus = getStatus();
if (patchStatus == UNKNOWN) { if (patchStatus == null || patchStatus == UNKNOWN) {
patchRegistry.register(patchId, patchDescription, JAVA_PATCH_TYPE, getStatus()); patchRegistry.register(patchId, patchDescription, JAVA_PATCH_TYPE, "apply", UNKNOWN);
} }
} }
......
...@@ -59,8 +59,16 @@ public class AtlasPatchRegistry { ...@@ -59,8 +59,16 @@ public class AtlasPatchRegistry {
private final AtlasGraph graph; private final AtlasGraph graph;
public AtlasPatchRegistry(AtlasGraph graph) { public AtlasPatchRegistry(AtlasGraph graph) {
LOG.info("AtlasPatchRegistry: initializing..");
this.graph = graph; this.graph = graph;
this.patchNameStatusMap = getPatchNameStatusForAllRegistered(graph); this.patchNameStatusMap = getPatchNameStatusForAllRegistered(graph);
LOG.info("AtlasPatchRegistry: found {} patches", patchNameStatusMap.size());
for (Map.Entry<String, PatchStatus> entry : patchNameStatusMap.entrySet()) {
LOG.info("AtlasPatchRegistry: patchId={}, status={}", entry.getKey(), entry.getValue());
}
} }
public boolean isApplicable(String incomingId, String patchFile, int index) { public boolean isApplicable(String incomingId, String patchFile, int index) {
...@@ -83,8 +91,8 @@ public class AtlasPatchRegistry { ...@@ -83,8 +91,8 @@ public class AtlasPatchRegistry {
return patchNameStatusMap.get(id); return patchNameStatusMap.get(id);
} }
public void register(String patchId, String description, String action, PatchStatus patchStatus) { public void register(String patchId, String description, String patchType, String action, PatchStatus patchStatus) {
createOrUpdatePatchVertex(graph, patchId, description, action, patchStatus); createOrUpdatePatchVertex(graph, patchId, description, patchType, action, patchStatus);
} }
public void updateStatus(String patchId, PatchStatus patchStatus) { public void updateStatus(String patchId, PatchStatus patchStatus) {
...@@ -118,14 +126,14 @@ public class AtlasPatchRegistry { ...@@ -118,14 +126,14 @@ public class AtlasPatchRegistry {
return getAllPatches(graph); return getAllPatches(graph);
} }
private void createOrUpdatePatchVertex(AtlasGraph graph, String patchId, private void createOrUpdatePatchVertex(AtlasGraph graph, String patchId, String description,
String description, String action, PatchStatus patchStatus) { String patchType, String action, PatchStatus patchStatus) {
boolean isPatchRegistered = MapUtils.isNotEmpty(patchNameStatusMap) && patchNameStatusMap.containsKey(patchId); boolean isPatchRegistered = MapUtils.isNotEmpty(patchNameStatusMap) && patchNameStatusMap.containsKey(patchId);
AtlasVertex patchVertex = isPatchRegistered ? findByPatchId(patchId) : graph.addVertex(); AtlasVertex patchVertex = isPatchRegistered ? findByPatchId(patchId) : graph.addVertex();
setEncodedProperty(patchVertex, PATCH_ID_PROPERTY_KEY, patchId); setEncodedProperty(patchVertex, PATCH_ID_PROPERTY_KEY, patchId);
setEncodedProperty(patchVertex, PATCH_DESCRIPTION_PROPERTY_KEY, description); setEncodedProperty(patchVertex, PATCH_DESCRIPTION_PROPERTY_KEY, description);
setEncodedProperty(patchVertex, PATCH_TYPE_PROPERTY_KEY, TYPEDEF_PATCH_TYPE); setEncodedProperty(patchVertex, PATCH_TYPE_PROPERTY_KEY, patchType);
setEncodedProperty(patchVertex, PATCH_ACTION_PROPERTY_KEY, action); setEncodedProperty(patchVertex, PATCH_ACTION_PROPERTY_KEY, action);
setEncodedProperty(patchVertex, PATCH_STATE_PROPERTY_KEY, patchStatus.toString()); setEncodedProperty(patchVertex, PATCH_STATE_PROPERTY_KEY, patchStatus.toString());
setEncodedProperty(patchVertex, TIMESTAMP_PROPERTY_KEY, RequestContext.get().getRequestTime()); setEncodedProperty(patchVertex, TIMESTAMP_PROPERTY_KEY, RequestContext.get().getRequestTime());
......
...@@ -27,19 +27,22 @@ import org.apache.atlas.pc.WorkItemManager; ...@@ -27,19 +27,22 @@ import org.apache.atlas.pc.WorkItemManager;
import org.apache.atlas.repository.IndexException; import org.apache.atlas.repository.IndexException;
import org.apache.atlas.repository.graph.GraphBackedSearchIndexer; import org.apache.atlas.repository.graph.GraphBackedSearchIndexer;
import org.apache.atlas.repository.graph.GraphBackedSearchIndexer.UniqueKind; import org.apache.atlas.repository.graph.GraphBackedSearchIndexer.UniqueKind;
import org.apache.atlas.repository.graphdb.*; import org.apache.atlas.repository.graphdb.AtlasCardinality;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasGraphManagement;
import org.apache.atlas.repository.graphdb.AtlasSchemaViolationException;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2; import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever; import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
import org.apache.atlas.type.AtlasEntityType; import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasStructType.AtlasAttribute; import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.Collection; import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
...@@ -52,72 +55,23 @@ public class UniqueAttributePatch extends AtlasPatchHandler { ...@@ -52,72 +55,23 @@ public class UniqueAttributePatch extends AtlasPatchHandler {
private static final String PATCH_ID = "JAVA_PATCH_0000_001"; private static final String PATCH_ID = "JAVA_PATCH_0000_001";
private static final String PATCH_DESCRIPTION = "Add __u_ property for each unique attribute of active entities"; private static final String PATCH_DESCRIPTION = "Add __u_ property for each unique attribute of active entities";
private final AtlasGraph graph; private final PatchContext context;
private final GraphBackedSearchIndexer indexer;
private final AtlasTypeRegistry typeRegistry;
public UniqueAttributePatch(PatchContext context) { public UniqueAttributePatch(PatchContext context) {
super(context.getPatchRegistry(), PATCH_ID, PATCH_DESCRIPTION); super(context.getPatchRegistry(), PATCH_ID, PATCH_DESCRIPTION);
this.graph = context.getGraph(); this.context = context;
this.indexer = context.getIndexer();
this.typeRegistry = context.getTypeRegistry();
} }
@Override @Override
public void apply() { public void apply() {
TypeNameAttributeCache typeNameAttributeCache = registerUniqueAttributeForTypes(); UniqueAttributePatchProcessor patchProcessor = new UniqueAttributePatchProcessor(context);
UniqueAttributePatchProcessor patchProcessor = new UniqueAttributePatchProcessor(this.graph);
patchProcessor.apply(typeNameAttributeCache.getAll()); patchProcessor.apply();
setStatus(APPLIED); setStatus(APPLIED);
LOG.info("UniqueAttributePatch: {}; status: {}", getPatchId(), getStatus()); LOG.info("UniqueAttributePatch.apply(): patchId={}, status={}", getPatchId(), getStatus());
}
private TypeNameAttributeCache registerUniqueAttributeForTypes() {
TypeNameAttributeCache ret = new TypeNameAttributeCache();
for (AtlasEntityType entityType : typeRegistry.getAllEntityTypes()) {
createIndexForUniqueAttributes(entityType.getTypeName(), entityType.getUniqAttributes().values());
ret.add(entityType, entityType.getUniqAttributes().values());
}
return ret;
}
private boolean createIndexForUniqueAttributes(String typeName, Collection<AtlasAttribute> attributes) {
try {
AtlasGraphManagement management = graph.getManagementSystem();
for (AtlasAttribute attribute : attributes) {
String uniquePropertyName = attribute.getVertexUniquePropertyName();
if (management.getPropertyKey(uniquePropertyName) != null) {
continue;
}
AtlasAttributeDef attributeDef = attribute.getAttributeDef();
boolean isIndexable = attributeDef.getIsIndexable();
String attribTypeName = attributeDef.getTypeName();
Class propertyClass = indexer.getPrimitiveClass(attribTypeName);
AtlasCardinality cardinality = indexer.toAtlasCardinality(attributeDef.getCardinality());
indexer.createVertexIndex(management, uniquePropertyName, UniqueKind.PER_TYPE_UNIQUE, propertyClass, cardinality, isIndexable, true);
}
indexer.commit(management);
graph.commit();
LOG.info("Unique attributes: type: {}: Registered!", typeName);
return true;
} catch (IndexException e) {
LOG.error("Error creating index: type: {}", typeName, e);
return false;
}
} }
public static class UniqueAttributePatchProcessor { public static class UniqueAttributePatchProcessor {
...@@ -127,7 +81,9 @@ public class UniqueAttributePatch extends AtlasPatchHandler { ...@@ -127,7 +81,9 @@ public class UniqueAttributePatch extends AtlasPatchHandler {
private static final int NUM_WORKERS; private static final int NUM_WORKERS;
private static final int BATCH_SIZE; private static final int BATCH_SIZE;
private final AtlasGraph graph; private final AtlasGraph graph;
private final GraphBackedSearchIndexer indexer;
private final AtlasTypeRegistry typeRegistry;
static { static {
int numWorkers = 3; int numWorkers = 3;
...@@ -146,45 +102,78 @@ public class UniqueAttributePatch extends AtlasPatchHandler { ...@@ -146,45 +102,78 @@ public class UniqueAttributePatch extends AtlasPatchHandler {
BATCH_SIZE = batchSize; BATCH_SIZE = batchSize;
} }
public UniqueAttributePatchProcessor(AtlasGraph graph) { public UniqueAttributePatchProcessor(PatchContext context) {
this.graph = graph; this.graph = context.getGraph();
this.indexer = context.getIndexer();
this.typeRegistry = context.getTypeRegistry();
} }
public void apply(final Map<String, Collection<AtlasAttribute>> typeUniqueAttributeCache) { public void apply() {
WorkItemManager manager = null; createIndexForUniqueAttributes();
addUniqueAttributeToAllVertices();
}
private void addUniqueAttributeToAllVertices() {
Iterable<Object> iterable = graph.query().vertexIds();
WorkItemManager manager = new WorkItemManager(new ConsumerBuilder(graph, typeRegistry), BATCH_SIZE, NUM_WORKERS);
try { try {
Iterator<AtlasVertex> iterator = graph.getVertices().iterator(); for (Iterator<Object> iter = iterable.iterator(); iter.hasNext(); ) {
Object vertexId = iter.next();
submitForProcessing((Long) vertexId, manager);
}
if (iterator.hasNext()) { manager.drain();
manager = new WorkItemManager<>(new ConsumerBuilder(graph), BATCH_SIZE, NUM_WORKERS); } finally {
try {
manager.shutdown();
} catch (InterruptedException e) {
LOG.error("UniqueAttributePatchProcessor.apply(): interrupted during WorkItemManager shutdown", e);
}
}
}
LOG.info("Processing: Started..."); private void createIndexForUniqueAttributes() {
for (AtlasEntityType entityType : typeRegistry.getAllEntityTypes()) {
while (iterator.hasNext()) { String typeName = entityType.getTypeName();
AtlasVertex vertex = iterator.next(); Collection<AtlasAttribute> uniqAttributes = entityType.getUniqAttributes().values();
if (!AtlasGraphUtilsV2.isEntityVertex(vertex)) { if (CollectionUtils.isEmpty(uniqAttributes)) {
continue; LOG.info("UniqueAttributePatchProcessor.apply(): no unique attribute for entity-type {}", typeName);
}
String typeName = AtlasGraphUtilsV2.getTypeName(vertex); continue;
}
submitForProcessing(typeName, vertex, manager, typeUniqueAttributeCache.get(typeName)); createIndexForUniqueAttributes(typeName, uniqAttributes);
} }
}
manager.drain(); private void createIndexForUniqueAttributes(String typeName, Collection<AtlasAttribute> attributes) {
} try {
} catch (Exception ex) { AtlasGraphManagement management = graph.getManagementSystem();
LOG.error("Error: ", ex);
} finally { for (AtlasAttribute attribute : attributes) {
if (manager != null) { String uniquePropertyName = attribute.getVertexUniquePropertyName();
try {
manager.shutdown(); if (management.getPropertyKey(uniquePropertyName) != null) {
} catch (InterruptedException e) { continue;
LOG.error("Interrupted", e);
} }
AtlasAttributeDef attributeDef = attribute.getAttributeDef();
boolean isIndexable = attributeDef.getIsIndexable();
String attribTypeName = attributeDef.getTypeName();
Class propertyClass = indexer.getPrimitiveClass(attribTypeName);
AtlasCardinality cardinality = indexer.toAtlasCardinality(attributeDef.getCardinality());
indexer.createVertexIndex(management, uniquePropertyName, UniqueKind.PER_TYPE_UNIQUE, propertyClass, cardinality, isIndexable, true);
} }
indexer.commit(management);
graph.commit();
LOG.info("Unique attributes: type: {}: Registered!", typeName);
} catch (IndexException e) {
LOG.error("Error creating index: type: {}", typeName, e);
} }
} }
...@@ -192,35 +181,37 @@ public class UniqueAttributePatch extends AtlasPatchHandler { ...@@ -192,35 +181,37 @@ public class UniqueAttributePatch extends AtlasPatchHandler {
return ApplicationProperties.get().getInt(ATLAS_SOLR_SHARDS, 1) * 3; return ApplicationProperties.get().getInt(ATLAS_SOLR_SHARDS, 1) * 3;
} }
private void submitForProcessing(String typeName, AtlasVertex vertex, WorkItemManager manager, Collection<AtlasAttribute> uniqAttributes) { private void submitForProcessing(Long vertexId, WorkItemManager manager) {
WorkItem workItem = new WorkItem(typeName, (Long) vertex.getId(), uniqAttributes); manager.checkProduce(vertexId);
manager.checkProduce(workItem);
} }
private static class ConsumerBuilder implements WorkItemBuilder<Consumer, Long> {
private final AtlasTypeRegistry typeRegistry;
private final AtlasGraph graph;
private static class WorkItem { public ConsumerBuilder(AtlasGraph graph, AtlasTypeRegistry typeRegistry) {
private final String typeName; this.graph = graph;
private final long id; this.typeRegistry = typeRegistry;
private final Collection<AtlasAttribute> uniqueAttributeValues; }
public WorkItem(String typeName, long id, Collection<AtlasAttribute> uniqueAttributeValues) { @Override
this.typeName = typeName; public Consumer build(BlockingQueue<Long> queue) {
this.id = id; return new Consumer(graph, typeRegistry, queue);
this.uniqueAttributeValues = uniqueAttributeValues;
} }
} }
private static class Consumer extends WorkItemConsumer<WorkItem> { private static class Consumer extends WorkItemConsumer<Long> {
private static int MAX_COMMIT_RETRY_COUNT = 3; private int MAX_COMMIT_RETRY_COUNT = 3;
private final AtlasGraph graph; private final AtlasGraph graph;
private final AtlasTypeRegistry typeRegistry;
private final AtomicLong counter; private final AtomicLong counter;
public Consumer(AtlasGraph graph, BlockingQueue<WorkItem> queue) { public Consumer(AtlasGraph graph, AtlasTypeRegistry typeRegistry, BlockingQueue<Long> queue) {
super(queue); super(queue);
this.graph = graph; this.graph = graph;
this.typeRegistry = typeRegistry;
this.counter = new AtomicLong(0); this.counter = new AtomicLong(0);
} }
...@@ -261,19 +252,11 @@ public class UniqueAttributePatch extends AtlasPatchHandler { ...@@ -261,19 +252,11 @@ public class UniqueAttributePatch extends AtlasPatchHandler {
} }
@Override @Override
protected void processItem(WorkItem wi) { protected void processItem(Long vertexId) {
counter.incrementAndGet(); AtlasVertex vertex = graph.getVertex(Long.toString(vertexId));
String typeName = wi.typeName;
if(wi.uniqueAttributeValues == null) {
return;
}
AtlasVertex vertex = graph.getVertex(Long.toString(wi.id));
if (vertex == null) { if (vertex == null) {
LOG.warn("processItem: AtlasVertex with id: ({}): not found!", wi.id); LOG.warn("processItem(vertexId={}): AtlasVertex not found!", vertexId);
return; return;
} }
...@@ -282,23 +265,30 @@ public class UniqueAttributePatch extends AtlasPatchHandler { ...@@ -282,23 +265,30 @@ public class UniqueAttributePatch extends AtlasPatchHandler {
return; return;
} }
AtlasEntity.Status status = AtlasGraphUtilsV2.getState(vertex); if (AtlasGraphUtilsV2.getState(vertex) != AtlasEntity.Status.ACTIVE) {
return;
}
if (status != AtlasEntity.Status.ACTIVE) { String typeName = AtlasGraphUtilsV2.getTypeName(vertex);
AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName);
if (entityType == null) {
return; return;
} }
processItem(vertexId, vertex, typeName, entityType);
}
private void processItem(Long vertexId, AtlasVertex vertex, String typeName, AtlasEntityType entityType) {
try { try {
LOG.debug("processItem: {}", wi.id); counter.incrementAndGet();
LOG.debug("processItem(typeName={}, vertexId={})", typeName, vertexId);
for (AtlasAttribute attribute : wi.uniqueAttributeValues) { for (AtlasAttribute attribute : entityType.getUniqAttributes().values()) {
String uniquePropertyKey = attribute.getVertexUniquePropertyName(); String uniquePropertyKey = attribute.getVertexUniquePropertyName();
Collection<? extends String> propertyKeys = vertex.getPropertyKeys(); Collection<? extends String> propertyKeys = vertex.getPropertyKeys();
Object uniqAttrValue = null; Object uniqAttrValue = null;
if (propertyKeys != null && propertyKeys.contains(uniquePropertyKey)) { if (propertyKeys == null || !propertyKeys.contains(uniquePropertyKey)) {
LOG.debug("processItem: {}: Skipped!", wi.id);
} else {
try { try {
String propertyKey = attribute.getVertexPropertyName(); String propertyKey = attribute.getVertexPropertyName();
...@@ -307,50 +297,18 @@ public class UniqueAttributePatch extends AtlasPatchHandler { ...@@ -307,50 +297,18 @@ public class UniqueAttributePatch extends AtlasPatchHandler {
AtlasGraphUtilsV2.setEncodedProperty(vertex, uniquePropertyKey, uniqAttrValue); AtlasGraphUtilsV2.setEncodedProperty(vertex, uniquePropertyKey, uniqAttrValue);
} catch(AtlasSchemaViolationException ex) { } catch(AtlasSchemaViolationException ex) {
LOG.error("Duplicates detected: {}:{}:{}", typeName, uniqAttrValue, getIdFromVertex(vertex)); LOG.error("Duplicates detected: {}:{}:{}", typeName, uniqAttrValue, getIdFromVertex(vertex));
vertex.removeProperty(uniquePropertyKey);
} }
} }
commit();
} }
LOG.debug("processItem: {}: Done!", wi.id); LOG.debug("processItem(typeName={}, vertexId={}): Done!", typeName, vertexId);
} catch (Exception ex) { } catch (Exception ex) {
LOG.error("Error found: {}: {}", typeName, wi.id, ex); LOG.error("processItem(typeName={}, vertexId={}): failed!", typeName, vertexId, ex);
} finally {
commit();
} }
} }
} }
private class ConsumerBuilder implements WorkItemBuilder<Consumer, WorkItem> {
private final AtlasGraph graph;
public ConsumerBuilder(AtlasGraph graph) {
this.graph = graph;
}
@Override
public Consumer build(BlockingQueue<WorkItem> queue) {
return new Consumer(graph, queue);
}
}
}
public static class TypeNameAttributeCache {
private Map<String, Collection<AtlasAttribute>> typeUniqueAttributeCache = new HashMap<>();
public void add(AtlasEntityType entityType, Collection<AtlasAttribute> values) {
typeUniqueAttributeCache.put(entityType.getTypeName(), values);
}
public Collection<AtlasAttribute> get(String typeName) {
return typeUniqueAttributeCache.get(typeName);
}
public boolean has(String typeName) {
return typeUniqueAttributeCache.containsKey(typeName);
}
public Map<String, Collection<AtlasAttribute>> getAll() {
return typeUniqueAttributeCache;
}
} }
} }
...@@ -76,6 +76,7 @@ import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ ...@@ -76,6 +76,7 @@ import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_
import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.APPLIED; import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.APPLIED;
import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.FAILED; import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.FAILED;
import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.SKIPPED; import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.SKIPPED;
import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.UNKNOWN;
/** /**
* Class that handles initial loading of models and patches into typedef store * Class that handles initial loading of models and patches into typedef store
...@@ -475,7 +476,7 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler { ...@@ -475,7 +476,7 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler {
patch.getId(), status.toString(), patch.getAction(), patchFile); patch.getId(), status.toString(), patch.getAction(), patchFile);
} }
patchRegistry.register(patch.id, patch.description, patch.action, status); patchRegistry.register(patch.id, patch.description, TYPEDEF_PATCH_TYPE, patch.action, status);
LOG.info("{} (status: {}; action: {}) in file: {}", patch.getId(), status.toString(), patch.getAction(), patchFile); LOG.info("{} (status: {}; action: {}) in file: {}", patch.getId(), status.toString(), patch.getAction(), patchFile);
} else { } else {
LOG.info("{} in file: {} already {}. Ignoring.", patch.getId(), patchFile, patchRegistry.getStatus(patch.getId()).toString()); LOG.info("{} in file: {} already {}. Ignoring.", patch.getId(), patchFile, patchRegistry.getStatus(patch.getId()).toString());
...@@ -783,7 +784,7 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler { ...@@ -783,7 +784,7 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler {
public PatchStatus applyPatch(TypeDefPatch patch) throws AtlasBaseException { public PatchStatus applyPatch(TypeDefPatch patch) throws AtlasBaseException {
String typeName = patch.getTypeName(); String typeName = patch.getTypeName();
AtlasBaseTypeDef typeDef = typeRegistry.getTypeDefByName(typeName); AtlasBaseTypeDef typeDef = typeRegistry.getTypeDefByName(typeName);
PatchStatus ret = null; PatchStatus ret = UNKNOWN;
if (typeDef == null) { if (typeDef == null) {
throw new AtlasBaseException(AtlasErrorCode.PATCH_FOR_UNKNOWN_TYPE, patch.getAction(), typeName); throw new AtlasBaseException(AtlasErrorCode.PATCH_FOR_UNKNOWN_TYPE, patch.getAction(), typeName);
......
...@@ -27,6 +27,7 @@ import org.testng.annotations.Test; ...@@ -27,6 +27,7 @@ import org.testng.annotations.Test;
import javax.inject.Inject; import javax.inject.Inject;
import static org.apache.atlas.repository.store.bootstrap.AtlasTypeDefStoreInitializer.TYPEDEF_PATCH_TYPE;
import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNotNull;
...@@ -46,7 +47,7 @@ public class AtlasPatchRegistryTest { ...@@ -46,7 +47,7 @@ public class AtlasPatchRegistryTest {
public void registerPatch() { public void registerPatch() {
AtlasPatchRegistry registry = new AtlasPatchRegistry(graph); AtlasPatchRegistry registry = new AtlasPatchRegistry(graph);
registry.register("1", "test patch", "apply", AtlasPatch.PatchStatus.UNKNOWN); registry.register("1", "test patch", TYPEDEF_PATCH_TYPE, "apply", AtlasPatch.PatchStatus.UNKNOWN);
assertPatches(registry, 1); assertPatches(registry, 1);
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment