Commit dc9e2689 by Ashutosh Mestry

ATLAS-4015: Add Re-indexing as JAVA_PATCH.

parent b7c219a5
......@@ -172,4 +172,12 @@ public interface AtlasGraphManagement {
* Set consistency to ConsistencyModifier.LOCK for all vertex and edge indexes.
*/
void updateUniqueIndexesForConsistencyLock();
/***
* Re-index elements.
* @param indexName: Name of the index that needs to be operated on.
* @param elements: Elements to be re-indexed.
* @throws Exception
*/
void reindex(String indexName, List<AtlasElement> elements) throws Exception;
}
......@@ -19,14 +19,22 @@ package org.apache.atlas.repository.graphdb.janus;
import com.google.common.base.Preconditions;
import org.apache.atlas.AtlasConfiguration;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.repository.graphdb.AtlasEdgeDirection;
import org.apache.atlas.repository.graphdb.AtlasElement;
import org.apache.tinkerpop.gremlin.structure.Direction;
import org.apache.tinkerpop.gremlin.structure.Element;
import org.janusgraph.core.Cardinality;
import org.janusgraph.core.EdgeLabel;
import org.janusgraph.core.JanusGraphElement;
import org.janusgraph.core.PropertyKey;
import org.janusgraph.core.schema.*;
import org.janusgraph.core.schema.JanusGraphManagement.IndexBuilder;
import org.janusgraph.diskstorage.BackendTransaction;
import org.janusgraph.diskstorage.indexing.IndexEntry;
import org.janusgraph.graphdb.database.IndexSerializer;
import org.janusgraph.graphdb.database.StandardJanusGraph;
import org.janusgraph.graphdb.database.management.ManagementSystem;
import org.janusgraph.graphdb.internal.Token;
import org.apache.atlas.repository.graphdb.AtlasCardinality;
import org.apache.atlas.repository.graphdb.AtlasEdgeLabel;
......@@ -36,11 +44,16 @@ import org.apache.atlas.repository.graphdb.AtlasPropertyKey;
import org.apache.commons.lang.StringUtils;
import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.janusgraph.graphdb.transaction.StandardJanusGraphTx;
import org.janusgraph.graphdb.types.IndexType;
import org.janusgraph.graphdb.types.MixedIndexType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
......@@ -308,4 +321,55 @@ public class AtlasJanusGraphManagement implements AtlasGraphManagement {
LOG.info("setConsistency: {}: {}: Done!", elementType.getSimpleName(), count);
}
}
@Override
public void reindex(String indexName, List<AtlasElement> elements) throws Exception {
try {
JanusGraphIndex index = management.getGraphIndex(indexName);
if (index == null || !(management instanceof ManagementSystem) || !(graph.getGraph() instanceof StandardJanusGraph)) {
LOG.error("Could not retrieve index for name: {} ", indexName);
return;
}
ManagementSystem managementSystem = (ManagementSystem) management;
IndexType indexType = managementSystem.getSchemaVertex(index).asIndexType();
if (!(indexType instanceof MixedIndexType)) {
LOG.warn("Index: {}: Not of MixedIndexType ", indexName);
return;
}
IndexSerializer indexSerializer = ((StandardJanusGraph) graph.getGraph()).getIndexSerializer();
reindexElement(managementSystem, indexSerializer, (MixedIndexType) indexType, elements);
} catch (Exception exception) {
throw exception;
} finally {
management.commit();
}
}
private void reindexElement(ManagementSystem managementSystem, IndexSerializer indexSerializer, MixedIndexType indexType, List<AtlasElement> elements) throws Exception {
Map<String, Map<String, List<IndexEntry>>> documentsPerStore = new HashMap<>();
StandardJanusGraphTx tx = managementSystem.getWrappedTx();
BackendTransaction txHandle = tx.getTxHandle();
try {
JanusGraphElement janusGraphElement = null;
for (AtlasElement element : elements) {
try {
if (element == null || element.getWrappedElement() == null) {
continue;
}
janusGraphElement = element.getWrappedElement();
indexSerializer.reindexElement(janusGraphElement, indexType, documentsPerStore);
} catch (Exception e) {
LOG.warn("{}: Exception: {}:{}", indexType.getName(), e.getClass().getSimpleName(), e.getMessage());
}
}
} finally {
if (txHandle != null) {
txHandle.getIndexTransaction(indexType.getBackingIndexName()).restore(documentsPerStore);
}
}
}
}
......@@ -73,7 +73,8 @@ public enum AtlasConfiguration {
LINEAGE_USING_GREMLIN("atlas.lineage.query.use.gremlin", false),
HTTP_HEADER_SERVER_VALUE("atlas.http.header.server.value","Apache Atlas"),
STORAGE_CONSISTENCY_LOCK_ENABLED("atlas.graph.storage.consistency-lock.enabled", true);
STORAGE_CONSISTENCY_LOCK_ENABLED("atlas.graph.storage.consistency-lock.enabled", true),
REINDEX_PATCH_ENABLED("atlas.patch.reindex.enabled", false);
private static final Configuration APPLICATION_PROPERTIES;
......
......@@ -54,7 +54,8 @@ public class AtlasPatchManager {
new ClassificationTextPatch(context),
new FreeTextRequestHandlerPatch(context),
new SuggestionsRequestHandlerPatch(context),
new IndexConsistencyPatch(context)
new IndexConsistencyPatch(context),
new ReIndexPatch(context)
};
try {
......
......@@ -43,8 +43,8 @@ public abstract class ConcurrentPatchProcessor {
private static final String BATCH_SIZE_PROPERTY = "atlas.patch.batchSize";
private static final String ATLAS_SOLR_SHARDS = "ATLAS_SOLR_SHARDS";
private static final String WORKER_NAME_PREFIX = "patchWorkItem";
private static final int NUM_WORKERS;
private static final int BATCH_SIZE;
public static final int NUM_WORKERS;
public static final int BATCH_SIZE;
private final EntityGraphMapper entityGraphMapper;
private final AtlasGraph graph;
......@@ -61,7 +61,7 @@ public abstract class ConcurrentPatchProcessor {
numWorkers = config.getInt(NUM_WORKERS_PROPERTY, config.getInt(ATLAS_SOLR_SHARDS, 1) * 3);
batchSize = config.getInt(BATCH_SIZE_PROPERTY, 300);
LOG.info("UniqueAttributePatch: {}={}, {}={}", NUM_WORKERS_PROPERTY, numWorkers, BATCH_SIZE_PROPERTY, batchSize);
LOG.info("ConcurrentPatchProcessor: {}={}, {}={}", NUM_WORKERS_PROPERTY, numWorkers, BATCH_SIZE_PROPERTY, batchSize);
} catch (Exception e) {
LOG.error("Error retrieving configuration.", e);
}
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.patches;
import org.apache.atlas.AtlasConfiguration;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.pc.WorkItemBuilder;
import org.apache.atlas.pc.WorkItemConsumer;
import org.apache.atlas.pc.WorkItemManager;
import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.graphdb.AtlasElement;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.UNKNOWN;
public class ReIndexPatch extends AtlasPatchHandler {
private static final Logger LOG = LoggerFactory.getLogger(ReIndexPatch.class);
private static final String PATCH_ID = "JAVA_PATCH_0000_006";
private static final String PATCH_DESCRIPTION = "Performs reindex on all the indexes.";
private final PatchContext context;
public ReIndexPatch(PatchContext context) {
super(context.getPatchRegistry(), PATCH_ID, PATCH_DESCRIPTION);
this.context = context;
}
@Override
public void apply() throws AtlasBaseException {
if (AtlasConfiguration.REINDEX_PATCH_ENABLED.getBoolean() == false) {
return;
}
try {
LOG.info("ReIndexPatch: Starting...");
ReindexPatchProcessor reindexPatchProcessor = new ReindexPatchProcessor(context);
reindexPatchProcessor.repairVertices();
reindexPatchProcessor.repairEdges();
} catch (Exception exception) {
LOG.error("Error while reindexing.", exception);
} finally {
LOG.info("ReIndexPatch: Done!");
}
setStatus(UNKNOWN);
LOG.info("ReIndexPatch.apply(): patchId={}, status={}", getPatchId(), getStatus());
}
public static class ReindexPatchProcessor {
private static String[] vertexIndexNames = new String[]{ Constants.VERTEX_INDEX, Constants.FULLTEXT_INDEX };
private static String[] edgeIndexNames = new String[]{ Constants.EDGE_INDEX };
private static String WORKER_PREFIX = "reindex";
private PatchContext context;
public ReindexPatchProcessor(PatchContext context) {
this.context = context;
}
public void repairVertices() {
repairElements(ReindexPatchProcessor::vertices, vertexIndexNames);
}
public void repairEdges() {
repairElements(ReindexPatchProcessor::edges, edgeIndexNames);
}
private void repairElements(BiConsumer<WorkItemManager, AtlasGraph> action, String[] indexNames) {
WorkItemManager manager = new WorkItemManager(new ReindexConsumerBuilder(context.getGraph(), indexNames),
WORKER_PREFIX, ConcurrentPatchProcessor.BATCH_SIZE, ConcurrentPatchProcessor.NUM_WORKERS, false);
try {
LOG.info("repairElements.execute(): {}: Starting...", indexNames);
action.accept(manager, context.getGraph());
manager.drain();
} finally {
try {
manager.shutdown();
} catch (InterruptedException e) {
LOG.error("repairEdges.execute(): interrupted during WorkItemManager shutdown.", e);
}
LOG.info("repairElements.execute(): {}: Done!", indexNames);
}
}
private static void edges(WorkItemManager manager, AtlasGraph graph) {
Iterable<Object> iterable = graph.getEdges();
for (Iterator<Object> iter = iterable.iterator(); iter.hasNext(); ) {
manager.checkProduce(iter.next());
}
}
private static void vertices(WorkItemManager manager, AtlasGraph graph) {
Iterable<AtlasVertex> iterable = graph.getVertices();
for (Iterator<AtlasVertex> iter = iterable.iterator(); iter.hasNext(); ) {
AtlasVertex vertex = iter.next();
manager.checkProduce(vertex);
}
}
}
private static class ReindexConsumerBuilder implements WorkItemBuilder<ReindexConsumer, AtlasElement> {
private AtlasGraph graph;
private String[] indexNames;
public ReindexConsumerBuilder(AtlasGraph graph, String[] indexNames) {
this.graph = graph;
this.indexNames = indexNames;
}
@Override
public ReindexConsumer build(BlockingQueue queue) {
return new ReindexConsumer(queue, this.graph, this.indexNames);
}
}
private static class ReindexConsumer extends WorkItemConsumer<AtlasElement> {
private List<AtlasElement> list = new ArrayList();
private AtlasGraph graph;
private String[] indexNames;
private final AtomicLong counter;
public ReindexConsumer(BlockingQueue queue, AtlasGraph graph, String[] indexNames) {
super(queue);
this.graph = graph;
this.indexNames = indexNames;
this.counter = new AtomicLong(0);
}
@Override
protected void doCommit() {
if (list.size() >= ConcurrentPatchProcessor.BATCH_SIZE) {
attemptCommit();
}
}
@Override
protected void commitDirty() {
attemptCommit();
LOG.info("Total: Commit: {}", counter.get());
super.commitDirty();
}
private void attemptCommit() {
for (String indexName : indexNames) {
try {
this.graph.getManagementSystem().reindex(indexName, list);
}
catch (IllegalStateException e) {
LOG.error("IllegalStateException: Exception", e);
return;
}
catch (Exception exception) {
LOG.error("Exception: {}", indexName, exception);
}
}
list.clear();
LOG.info("Processed: {}", counter.get());
}
@Override
protected void processItem(AtlasElement item) {
counter.incrementAndGet();
list.add(item);
commit();
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment