Commit efc4bebc by Ashutosh Mestry Committed by Madhan Neethiraj

ATLAS-3132: performance improvements in UniqueAttributesPatch

parent 93e629fa
......@@ -30,31 +30,35 @@ import java.util.concurrent.atomic.AtomicLong;
public abstract class WorkItemConsumer<T> implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(WorkItemConsumer.class);
private static final int POLLING_DURATION_SECONDS = 5;
private static final int POLLING_DURATION_SECONDS = 30;
private static final int DEFAULT_COMMIT_TIME_IN_MS = 15000;
private final BlockingQueue<T> queue;
private AtomicBoolean isDirty = new AtomicBoolean(false);
private AtomicLong maxCommitTimeInMs = new AtomicLong(0);
private CountDownLatch countdownLatch;
private BlockingQueue<Object> results;
private final AtomicBoolean isDirty = new AtomicBoolean(false);
private final AtomicLong maxCommitTimeInMs = new AtomicLong(DEFAULT_COMMIT_TIME_IN_MS);
private CountDownLatch countdownLatch;
private BlockingQueue<Object> results;
public WorkItemConsumer(BlockingQueue<T> queue) {
this.queue = queue;
this.queue = queue;
this.countdownLatch = null;
}
public void run() {
try {
while (!Thread.currentThread().isInterrupted()) {
T item = queue.poll(POLLING_DURATION_SECONDS, TimeUnit.SECONDS);
if (item == null) {
LOG.warn("WorkItemConsumer.run(): no more items found in the queue. Will exit after committing");
commitDirty();
return;
}
isDirty.set(true);
processItem(item);
}
} catch (InterruptedException e) {
......@@ -67,6 +71,7 @@ public abstract class WorkItemConsumer<T> implements Runnable {
public long getMaxCommitTimeInMs() {
long commitTime = this.maxCommitTimeInMs.get();
return ((commitTime > DEFAULT_COMMIT_TIME_IN_MS) ? commitTime : DEFAULT_COMMIT_TIME_IN_MS);
}
......
......@@ -33,20 +33,20 @@ public class WorkItemManager<T, U extends WorkItemConsumer> {
private final ExecutorService service;
private final List<U> consumers = new ArrayList<>();
private CountDownLatch countdownLatch;
private BlockingQueue<Object> resultsQueue;
private BlockingQueue<Object> resultsQueue;
public WorkItemManager(WorkItemBuilder builder, String namePrefix, int batchSize, int numWorkers, boolean collectResults) {
this.numWorkers = numWorkers;
workQueue = new LinkedBlockingQueue<>(batchSize * numWorkers);
service = Executors.newFixedThreadPool(numWorkers,
new ThreadFactoryBuilder().setNameFormat(namePrefix + "-%d").build());
this.workQueue = new LinkedBlockingQueue<>(batchSize * numWorkers);
this.service = Executors.newFixedThreadPool(numWorkers, new ThreadFactoryBuilder().setNameFormat(namePrefix + "-%d").build());
createConsumers(builder, numWorkers, collectResults);
execute();
start();
}
public WorkItemManager(WorkItemBuilder builder, int batchSize, int numWorkers) {
this(builder, "workItem", batchSize, numWorkers, false);
this(builder, "workItemConsumer", batchSize, numWorkers, false);
}
public void setResultsCollection(BlockingQueue<Object> resultsQueue) {
......@@ -60,6 +60,7 @@ public class WorkItemManager<T, U extends WorkItemConsumer> {
for (int i = 0; i < numWorkers; i++) {
U c = (U) builder.build(workQueue);
consumers.add(c);
if (collectResults) {
......@@ -68,10 +69,12 @@ public class WorkItemManager<T, U extends WorkItemConsumer> {
}
}
private void execute() {
public void start() {
this.countdownLatch = new CountDownLatch(numWorkers);
for (U c : consumers) {
c.setCountDownLatch(countdownLatch);
service.execute(c);
}
}
......@@ -85,9 +88,14 @@ public class WorkItemManager<T, U extends WorkItemConsumer> {
}
public void checkProduce(T item) {
if (countdownLatch.getCount() == 0) {
execute();
if (countdownLatch.getCount() < numWorkers) {
LOG.info("Fewer workers detected: {}", countdownLatch.getCount());
drain();
start();
}
produce(item);
}
......
......@@ -49,7 +49,7 @@ import java.util.concurrent.TimeUnit;
@Component
@Order(2)
@Order(3)
public class EmbeddedKafkaServer implements Service {
public static final Logger LOG = LoggerFactory.getLogger(EmbeddedKafkaServer.class);
......
......@@ -48,7 +48,7 @@ import java.util.concurrent.Future;
* Kafka specific access point to the Atlas notification framework.
*/
@Component
@Order(3)
@Order(4)
public class KafkaNotification extends AbstractNotification implements Service {
public static final Logger LOG = LoggerFactory.getLogger(KafkaNotification.class);
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.patches;
import org.apache.atlas.RequestContext;
import org.apache.atlas.model.patches.AtlasPatch.PatchStatus;
import org.apache.atlas.repository.graph.GraphBackedSearchIndexer;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.commons.collections.MapUtils;
import java.util.Map;
import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.UNKNOWN;
import static org.apache.atlas.repository.Constants.CREATED_BY_KEY;
import static org.apache.atlas.repository.Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.MODIFIED_BY_KEY;
import static org.apache.atlas.repository.Constants.PATCH_DESCRIPTION_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.PATCH_ID_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.PATCH_STATE_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.PATCH_TYPE_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.TIMESTAMP_PROPERTY_KEY;
import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.findByPatchId;
import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.setEncodedProperty;
import static org.apache.atlas.repository.store.graph.v2.AtlasTypeDefGraphStoreV2.getCurrentUser;
public abstract class AtlasJavaPatchHandler {
public final AtlasGraph graph;
public final AtlasTypeRegistry typeRegistry;
public final Map<String, PatchStatus> patchesRegistry;
public final EntityGraphRetriever entityRetriever;
public final GraphBackedSearchIndexer indexer;
public final PatchContext context;
public final String patchId;
public final String patchDescription;
private PatchStatus patchStatus;
public static final String JAVA_PATCH_TYPE = "JAVA_PATCH";
public AtlasJavaPatchHandler(PatchContext context, String patchId, String patchDescription) {
this.context = context;
this.graph = context.getGraph();
this.typeRegistry = context.getTypeRegistry();
this.indexer = context.getIndexer();
this.patchesRegistry = context.getPatchesRegistry();
this.patchId = patchId;
this.patchDescription = patchDescription;
this.patchStatus = getPatchStatus(patchesRegistry);
this.entityRetriever = new EntityGraphRetriever(typeRegistry);
init();
}
private void init() {
PatchStatus patchStatus = getPatchStatus();
if (patchStatus == UNKNOWN) {
AtlasVertex patchVertex = graph.addVertex();
setEncodedProperty(patchVertex, PATCH_ID_PROPERTY_KEY, patchId);
setEncodedProperty(patchVertex, PATCH_DESCRIPTION_PROPERTY_KEY, patchDescription);
setEncodedProperty(patchVertex, PATCH_TYPE_PROPERTY_KEY, JAVA_PATCH_TYPE);
setEncodedProperty(patchVertex, PATCH_STATE_PROPERTY_KEY, getPatchStatus().toString());
setEncodedProperty(patchVertex, TIMESTAMP_PROPERTY_KEY, RequestContext.get().getRequestTime());
setEncodedProperty(patchVertex, MODIFICATION_TIMESTAMP_PROPERTY_KEY, RequestContext.get().getRequestTime());
setEncodedProperty(patchVertex, CREATED_BY_KEY, getCurrentUser());
setEncodedProperty(patchVertex, MODIFIED_BY_KEY, getCurrentUser());
addToPatchesRegistry(patchId, getPatchStatus());
}
graph.commit();
}
private PatchStatus getPatchStatus(Map<String, PatchStatus> patchesRegistry) {
PatchStatus ret = UNKNOWN;
if (MapUtils.isNotEmpty(patchesRegistry) && patchesRegistry.containsKey(patchId)) {
ret = patchesRegistry.get(patchId);
}
return ret;
}
public void updatePatchVertex(PatchStatus patchStatus) {
AtlasVertex patchVertex = findByPatchId(patchId);
if (patchVertex != null) {
setEncodedProperty(patchVertex, PATCH_STATE_PROPERTY_KEY, patchStatus.toString());
setEncodedProperty(patchVertex, MODIFICATION_TIMESTAMP_PROPERTY_KEY, RequestContext.get().getRequestTime());
setEncodedProperty(patchVertex, MODIFIED_BY_KEY, getCurrentUser());
addToPatchesRegistry(getPatchId(), getPatchStatus());
}
graph.commit();
}
public PatchStatus getPatchStatus() {
return patchStatus;
}
public void addToPatchesRegistry(String patchId, PatchStatus status) {
getPatchesRegistry().put(patchId, status);
}
public void setPatchStatus(PatchStatus patchStatus) {
this.patchStatus = patchStatus;
}
public String getPatchId() {
return patchId;
}
public Map<String, PatchStatus> getPatchesRegistry() {
return patchesRegistry;
}
public abstract void applyPatch();
}
\ No newline at end of file
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.patches;
import org.apache.atlas.model.patches.AtlasPatch.PatchStatus;
import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.UNKNOWN;
public abstract class AtlasPatchHandler {
public static final String JAVA_PATCH_TYPE = "JAVA_PATCH";
private final String patchId;
private final String patchDescription;
private final AtlasPatchRegistry patchRegistry;
private PatchStatus status;
public AtlasPatchHandler(AtlasPatchRegistry patchRegistry, String patchId, String patchDescription) {
this.patchId = patchId;
this.patchDescription = patchDescription;
this.patchRegistry = patchRegistry;
this.status = getStatusFromRegistry();
register();
}
private void register() {
PatchStatus patchStatus = getStatus();
if (patchStatus == UNKNOWN) {
patchRegistry.register(patchId, patchDescription, JAVA_PATCH_TYPE, getStatus());
}
}
public PatchStatus getStatusFromRegistry() {
return patchRegistry.getStatus(patchId);
}
public PatchStatus getStatus() {
return status;
}
public void setStatus(PatchStatus status) {
this.status = status;
patchRegistry.updateStatus(patchId, status);
}
public String getPatchId() {
return patchId;
}
public abstract void apply();
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.patches;
import org.apache.atlas.model.patches.AtlasPatch;
import org.apache.atlas.model.patches.AtlasPatch.PatchStatus;
import org.apache.atlas.repository.graph.GraphBackedSearchIndexer;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.inject.Inject;
import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.APPLIED;
import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.SKIPPED;
@Component
public class AtlasPatchManager {
private static final Logger LOG = LoggerFactory.getLogger(AtlasPatchManager.class);
private final PatchContext context;
@Inject
public AtlasPatchManager(AtlasGraph atlasGraph, AtlasTypeRegistry typeRegistry, GraphBackedSearchIndexer indexer) {
this.context = new PatchContext(atlasGraph, typeRegistry, indexer);
}
public AtlasPatch.AtlasPatches getAllPatches() {
return context.getPatchRegistry().getAllPatches();
}
public void applyAll() {
final AtlasPatchHandler handlers[] = {
new UniqueAttributePatch(context)
};
try {
for (AtlasPatchHandler handler : handlers) {
PatchStatus patchStatus = handler.getStatusFromRegistry();
if (patchStatus == APPLIED || patchStatus == SKIPPED) {
LOG.info("Ignoring java handler: {}; status: {}", handler.getPatchId(), patchStatus);
} else {
LOG.info("Applying java handler: {}; status: {}", handler.getPatchId(), patchStatus);
handler.apply();
}
}
}
catch (Exception ex) {
LOG.error("Error applying patches.", ex);
}
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.patches;
import org.apache.atlas.RequestContext;
import org.apache.atlas.model.patches.AtlasPatch;
import org.apache.atlas.model.patches.AtlasPatch.AtlasPatches;
import org.apache.atlas.model.patches.AtlasPatch.PatchStatus;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasIndexQuery;
import org.apache.atlas.repository.graphdb.AtlasIndexQuery.Result;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
import org.apache.atlas.repository.store.graph.v2.AtlasTypeDefGraphStoreV2;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.FAILED;
import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.UNKNOWN;
import static org.apache.atlas.repository.Constants.*;
import static org.apache.atlas.repository.graph.AtlasGraphProvider.getGraphInstance;
import static org.apache.atlas.repository.store.bootstrap.AtlasTypeDefStoreInitializer.TYPEDEF_PATCH_TYPE;
import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.getEncodedProperty;
import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.getIndexSearchPrefix;
import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.setEncodedProperty;
import static org.apache.atlas.repository.store.graph.v2.AtlasTypeDefGraphStoreV2.getCurrentUser;
public class AtlasPatchRegistry {
private static final Logger LOG = LoggerFactory.getLogger(AtlasPatchRegistry.class);
private final Map<String, PatchStatus> patchNameStatusMap;
private final AtlasGraph graph;
public AtlasPatchRegistry(AtlasGraph graph) {
this.graph = graph;
this.patchNameStatusMap = getPatchNameStatusForAllRegistered(graph);
}
public boolean isApplicable(String incomingId, String patchFile, int index) {
String patchId = getId(incomingId, patchFile, index);
if (MapUtils.isEmpty(patchNameStatusMap) || !patchNameStatusMap.containsKey(patchId)) {
return true;
}
PatchStatus status = patchNameStatusMap.get(patchId);
if (status == FAILED || status == UNKNOWN) {
return true;
}
return false;
}
public PatchStatus getStatus(String id) {
return patchNameStatusMap.get(id);
}
public void register(String patchId, String description, String action, PatchStatus patchStatus) {
createOrUpdatePatchVertex(graph, patchId, description, action, patchStatus);
}
public void updateStatus(String patchId, PatchStatus patchStatus) {
AtlasVertex patchVertex = findByPatchId(patchId);
if (patchVertex == null) {
return;
}
setEncodedProperty(patchVertex, PATCH_STATE_PROPERTY_KEY, patchStatus.toString());
setEncodedProperty(patchVertex, MODIFICATION_TIMESTAMP_PROPERTY_KEY, RequestContext.get().getRequestTime());
setEncodedProperty(patchVertex, MODIFIED_BY_KEY, getCurrentUser());
setEncodedProperty(patchVertex, PATCH_STATE_PROPERTY_KEY, patchStatus.toString());
patchNameStatusMap.put(patchId, patchStatus);
graph.commit();
}
private static String getId(String incomingId, String patchFile, int index) {
String patchId = incomingId;
if (StringUtils.isEmpty(patchId)) {
return String.format("%s_%s", patchFile, index);
}
return patchId;
}
public AtlasPatches getAllPatches() {
return getAllPatches(graph);
}
private void createOrUpdatePatchVertex(AtlasGraph graph, String patchId,
String description, String action, PatchStatus patchStatus) {
boolean isPatchRegistered = MapUtils.isNotEmpty(patchNameStatusMap) && patchNameStatusMap.containsKey(patchId);
AtlasVertex patchVertex = isPatchRegistered ? findByPatchId(patchId) : graph.addVertex();
setEncodedProperty(patchVertex, PATCH_ID_PROPERTY_KEY, patchId);
setEncodedProperty(patchVertex, PATCH_DESCRIPTION_PROPERTY_KEY, description);
setEncodedProperty(patchVertex, PATCH_TYPE_PROPERTY_KEY, TYPEDEF_PATCH_TYPE);
setEncodedProperty(patchVertex, PATCH_ACTION_PROPERTY_KEY, action);
setEncodedProperty(patchVertex, PATCH_STATE_PROPERTY_KEY, patchStatus.toString());
setEncodedProperty(patchVertex, TIMESTAMP_PROPERTY_KEY, RequestContext.get().getRequestTime());
setEncodedProperty(patchVertex, MODIFICATION_TIMESTAMP_PROPERTY_KEY, RequestContext.get().getRequestTime());
setEncodedProperty(patchVertex, CREATED_BY_KEY, AtlasTypeDefGraphStoreV2.getCurrentUser());
setEncodedProperty(patchVertex, MODIFIED_BY_KEY, AtlasTypeDefGraphStoreV2.getCurrentUser());
graph.commit();
}
private static Map<String, PatchStatus> getPatchNameStatusForAllRegistered(AtlasGraph graph) {
Map<String, PatchStatus> ret = new HashMap<>();
AtlasPatches patches = getAllPatches(graph);
for (AtlasPatch patch : patches.getPatches()) {
String patchId = patch.getId();
PatchStatus patchStatus = patch.getStatus();
if (patchId != null && patchStatus != null) {
ret.put(patchId, patchStatus);
}
}
return ret;
}
private static AtlasPatches getAllPatches(AtlasGraph graph) {
List<AtlasPatch> ret = new ArrayList<>();
String idxQueryString = getIndexSearchPrefix() + "\"" + PATCH_ID_PROPERTY_KEY + "\" : (*)";
AtlasIndexQuery idxQuery = graph.indexQuery(VERTEX_INDEX, idxQueryString);
try {
Iterator<Result<Object, Object>> results = idxQuery.vertices();
while (results != null && results.hasNext()) {
AtlasVertex patchVertex = results.next().getVertex();
AtlasPatch patch = toAtlasPatch(patchVertex);
ret.add(patch);
}
if (CollectionUtils.isNotEmpty(ret)) {
Collections.sort(ret, Comparator.comparing(AtlasPatch::getId));
}
} catch (Throwable t) {
LOG.warn("getAllPatches(): Returned empty result!");
}
graph.commit();
return new AtlasPatches(ret);
}
private static AtlasPatch toAtlasPatch(AtlasVertex vertex) {
AtlasPatch ret = new AtlasPatch();
ret.setId(getEncodedProperty(vertex, PATCH_ID_PROPERTY_KEY, String.class));
ret.setDescription(getEncodedProperty(vertex, PATCH_DESCRIPTION_PROPERTY_KEY, String.class));
ret.setType(getEncodedProperty(vertex, PATCH_TYPE_PROPERTY_KEY, String.class));
ret.setAction(getEncodedProperty(vertex, PATCH_ACTION_PROPERTY_KEY, String.class));
ret.setCreatedBy(getEncodedProperty(vertex, CREATED_BY_KEY, String.class));
ret.setUpdatedBy(getEncodedProperty(vertex, MODIFIED_BY_KEY, String.class));
ret.setCreatedTime(getEncodedProperty(vertex, TIMESTAMP_PROPERTY_KEY, Long.class));
ret.setUpdatedTime(getEncodedProperty(vertex, MODIFICATION_TIMESTAMP_PROPERTY_KEY, Long.class));
ret.setStatus(getPatchStatus(vertex));
return ret;
}
private static AtlasVertex findByPatchId(String patchId) {
AtlasVertex ret = null;
String indexQuery = getIndexSearchPrefix() + "\"" + PATCH_ID_PROPERTY_KEY + "\" : (" + patchId + ")";
Iterator<Result<Object, Object>> results = getGraphInstance().indexQuery(VERTEX_INDEX, indexQuery).vertices();
while (results != null && results.hasNext()) {
ret = results.next().getVertex();
if (ret != null) {
break;
}
}
return ret;
}
private static PatchStatus getPatchStatus(AtlasVertex vertex) {
String patchStatus = AtlasGraphUtilsV2.getEncodedProperty(vertex, PATCH_STATE_PROPERTY_KEY, String.class);
return patchStatus != null ? PatchStatus.valueOf(patchStatus) : UNKNOWN;
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.patches;
import org.apache.atlas.AtlasException;
import org.apache.atlas.service.Service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import javax.inject.Inject;
@Component
@Order(2)
public class AtlasPatchService implements Service {
private static final Logger LOG = LoggerFactory.getLogger(AtlasPatchService.class);
private final AtlasPatchManager patchManager;
@Inject
public AtlasPatchService(AtlasPatchManager patchManager) {
this.patchManager = patchManager;
}
@Override
public void start() throws AtlasException {
LOG.info("PatchService: Started.");
patchManager.applyAll();
}
@Override
public void stop() throws AtlasException {
LOG.info("PatchService: Stopped.");
}
}
......@@ -6,39 +6,33 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.patches;
import org.apache.atlas.model.patches.AtlasPatch.PatchStatus;
import org.apache.atlas.repository.graph.GraphBackedSearchIndexer;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.type.AtlasTypeRegistry;
import java.util.Map;
/**
* Patch context for typedef and java patches.
*/
public class PatchContext {
private final AtlasGraph graph;
private final AtlasTypeRegistry typeRegistry;
private final GraphBackedSearchIndexer indexer;
private final Map<String, PatchStatus> patchesRegistry;
public PatchContext(AtlasGraph graph, AtlasTypeRegistry typeRegistry, GraphBackedSearchIndexer indexer,
Map<String, PatchStatus> patchesRegistry) {
this.graph = graph;
this.typeRegistry = typeRegistry;
this.indexer = indexer;
this.patchesRegistry = patchesRegistry;
private final AtlasPatchRegistry patchRegistry;
public PatchContext(AtlasGraph graph, AtlasTypeRegistry typeRegistry, GraphBackedSearchIndexer indexer) {
this.graph = graph;
this.typeRegistry = typeRegistry;
this.indexer = indexer;
this.patchRegistry = new AtlasPatchRegistry(this.graph);
}
public AtlasGraph getGraph() {
......@@ -53,7 +47,7 @@ public class PatchContext {
return indexer;
}
public Map<String, PatchStatus> getPatchesRegistry() {
return patchesRegistry;
public AtlasPatchRegistry getPatchRegistry() {
return patchRegistry;
}
}
\ No newline at end of file
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.patches;
import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
import org.apache.atlas.repository.IndexException;
import org.apache.atlas.repository.graph.GraphBackedSearchIndexer.UniqueKind;
import org.apache.atlas.repository.graphdb.AtlasCardinality;
import org.apache.atlas.repository.graphdb.AtlasGraphManagement;
import org.apache.atlas.repository.graphdb.AtlasIndexQuery.Result;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
import org.apache.commons.collections.MapUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.APPLIED;
import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.FAILED;
import static org.apache.atlas.repository.graph.GraphHelper.getGuid;
import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.findActiveEntityVerticesByType;
import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.setEncodedProperty;
public class UniqueAttributePatchHandler extends AtlasJavaPatchHandler {
private static final String PATCH_ID = "JAVA_PATCH_0000_001";
private static final String PATCH_DESCRIPTION = "Add new vertex property for each unique attribute of active entities";
private static final Logger LOG = LoggerFactory.getLogger(UniqueAttributePatchHandler.class);
public UniqueAttributePatchHandler(PatchContext context) {
super(context, PATCH_ID, PATCH_DESCRIPTION);
}
@Override
public void applyPatch() {
Collection<AtlasEntityType> allEntityTypes = typeRegistry.getAllEntityTypes();
boolean patchFailed = false;
for (AtlasEntityType entityType : allEntityTypes) {
String typeName = entityType.getTypeName();
Map<String, AtlasAttribute> uniqAttributes = entityType.getUniqAttributes();
int patchAppliedCount = 0;
LOG.info("Applying java patch: {} for type: {}", getPatchId(), typeName);
if (MapUtils.isNotEmpty(uniqAttributes)) {
Collection<AtlasAttribute> attributes = uniqAttributes.values();
try {
// register unique attribute property keys in graph
registerUniqueAttrPropertyKeys(attributes);
Iterator<Result<Object, Object>> iterator = findActiveEntityVerticesByType(typeName);
int entityCount = 0;
while (iterator != null && iterator.hasNext()) {
AtlasVertex entityVertex = iterator.next().getVertex();
boolean patchApplied = false;
entityCount++;
for (AtlasAttribute attribute : attributes) {
String uniquePropertyKey = attribute.getVertexUniquePropertyName();
Collection<? extends String> propertyKeys = entityVertex.getPropertyKeys();
if (!propertyKeys.contains(uniquePropertyKey)) {
String propertyKey = attribute.getVertexPropertyName();
AtlasAttributeDef attributeDef = attribute.getAttributeDef();
Object uniqAttrValue = entityRetriever.mapVertexToPrimitive(entityVertex, propertyKey, attributeDef);
// add the unique attribute property to vertex
setEncodedProperty(entityVertex, uniquePropertyKey, uniqAttrValue);
try {
graph.commit();
patchApplied = true;
if (LOG.isDebugEnabled()) {
LOG.debug("{}: Added unique attribute property: {} to entity: {} ({})",
PATCH_ID, uniquePropertyKey, getGuid(entityVertex), typeName);
}
} catch (Throwable t) {
LOG.warn("Java patch ({}): failed to update entity guid: {}; typeName: {}; attrName: {}; attrValue: {}",
getPatchId(), getGuid(entityVertex), typeName, attribute.getName(), uniqAttrValue);
continue;
}
}
}
if (patchApplied) {
patchAppliedCount++;
}
if (entityCount % 1000 == 0) {
LOG.info("Java patch: {} : applied {}; processed {} {} entities.", getPatchId(), patchAppliedCount, entityCount, typeName);
}
}
} catch (IndexException e) {
LOG.error("Java patch: {} failed! error: {}", getPatchId(), e);
patchFailed = true;
break;
}
}
LOG.info("Applied java patch ({}) for type: {}; Total processed: {}", getPatchId(), typeName, patchAppliedCount);
}
if (patchFailed) {
setPatchStatus(FAILED);
} else {
setPatchStatus(APPLIED);
}
LOG.info("Applied java patch: {}; status: {}", getPatchId(), getPatchStatus());
updatePatchVertex(getPatchStatus());
}
private void registerUniqueAttrPropertyKeys(Collection<AtlasAttribute> attributes) throws IndexException {
AtlasGraphManagement management = graph.getManagementSystem();
for (AtlasAttribute attribute : attributes) {
String uniquePropertyName = attribute.getVertexUniquePropertyName();
boolean uniquePropertyNameExists = management.getPropertyKey(uniquePropertyName) != null;
if (!uniquePropertyNameExists) {
AtlasAttributeDef attributeDef = attribute.getAttributeDef();
boolean isIndexable = attributeDef.getIsIndexable();
String attribTypeName = attributeDef.getTypeName();
Class propertyClass = indexer.getPrimitiveClass(attribTypeName);
AtlasCardinality cardinality = indexer.toAtlasCardinality(attributeDef.getCardinality());
indexer.createVertexIndex(management, uniquePropertyName, UniqueKind.NONE, propertyClass, cardinality, isIndexable, true);
}
}
//Commit indexes
indexer.commit(management);
graph.commit();
}
}
\ No newline at end of file
......@@ -947,7 +947,7 @@ public class EntityGraphRetriever {
return ret;
}
public Object mapVertexToPrimitive(AtlasElement entityVertex, final String vertexPropertyName, AtlasAttributeDef attrDef) {
public static Object mapVertexToPrimitive(AtlasElement entityVertex, final String vertexPropertyName, AtlasAttributeDef attrDef) {
Object ret = null;
if (AtlasGraphUtilsV2.getEncodedProperty(entityVertex, vertexPropertyName, Object.class) == null) {
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.patches;
import org.apache.atlas.TestModules;
import org.apache.atlas.model.patches.AtlasPatch;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.patches.AtlasPatchRegistry;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
import javax.inject.Inject;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
@Guice(modules = TestModules.TestOnlyModule.class)
public class AtlasPatchRegistryTest {
@Inject
private AtlasGraph graph;
@Test
public void noPatchesRegistered() {
AtlasPatchRegistry registry = new AtlasPatchRegistry(graph);
assertPatches(registry, 0);
}
@Test(dependsOnMethods = "noPatchesRegistered")
public void registerPatch() {
AtlasPatchRegistry registry = new AtlasPatchRegistry(graph);
registry.register("1", "test patch", "apply", AtlasPatch.PatchStatus.UNKNOWN);
assertPatches(registry, 1);
}
@Test(dependsOnMethods = "registerPatch")
public void updateStatusForPatch() {
final AtlasPatch.PatchStatus expectedStatus = AtlasPatch.PatchStatus.APPLIED;
String patchId = "1";
AtlasPatchRegistry registry = new AtlasPatchRegistry(graph);
registry.updateStatus(patchId, expectedStatus);
AtlasPatch.AtlasPatches patches = assertPatches(registry, 1);
assertEquals(patches.getPatches().get(0).getId(), patchId);
assertEquals(patches.getPatches().get(0).getStatus(), expectedStatus);
}
private AtlasPatch.AtlasPatches assertPatches(AtlasPatchRegistry registry, int i) {
AtlasPatch.AtlasPatches patches = registry.getAllPatches();
assertNotNull(patches);
assertEquals(patches.getPatches().size(), i);
return patches;
}
}
......@@ -100,7 +100,7 @@ import static org.apache.atlas.notification.preprocessor.EntityPreprocessor.TYPE
* Consumer of notifications from hooks e.g., hive hook etc.
*/
@Component
@Order(4)
@Order(5)
@DependsOn(value = {"atlasTypeDefStoreInitializer", "atlasTypeDefGraphStoreV2"})
public class NotificationHookConsumer implements Service, ActiveStateChangeHandler {
private static final Logger LOG = LoggerFactory.getLogger(NotificationHookConsumer.class);
......
......@@ -23,16 +23,16 @@ import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.authorize.AtlasAdminAccessRequest;
import org.apache.atlas.authorize.AtlasAuthorizationUtils;
import org.apache.atlas.authorize.AtlasEntityAccessRequest;
import org.apache.atlas.authorize.AtlasPrivilege;
import org.apache.atlas.authorize.AtlasAuthorizationUtils;
import org.apache.atlas.discovery.SearchContext;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.impexp.AtlasServer;
import org.apache.atlas.model.impexp.AtlasExportRequest;
import org.apache.atlas.model.impexp.AtlasExportResult;
import org.apache.atlas.model.impexp.AtlasImportRequest;
import org.apache.atlas.model.impexp.AtlasImportResult;
import org.apache.atlas.model.impexp.AtlasServer;
import org.apache.atlas.model.impexp.ExportImportAuditEntry;
import org.apache.atlas.model.impexp.MigrationStatus;
import org.apache.atlas.model.instance.AtlasCheckStateRequest;
......@@ -46,14 +46,14 @@ import org.apache.atlas.repository.impexp.ImportService;
import org.apache.atlas.repository.impexp.MigrationProgressService;
import org.apache.atlas.repository.impexp.ZipSink;
import org.apache.atlas.repository.impexp.ZipSource;
import org.apache.atlas.repository.patches.AtlasPatchManager;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
import org.apache.atlas.services.MetricsService;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.util.SearchTracker;
import org.apache.atlas.utils.AtlasPerfTracer;
import org.apache.atlas.utils.AtlasJson;
import org.apache.atlas.utils.AtlasPerfTracer;
import org.apache.atlas.web.filters.AtlasCSRFPreventionFilter;
import org.apache.atlas.web.service.ServiceState;
import org.apache.atlas.web.util.Servlets;
......@@ -138,6 +138,7 @@ public class AdminResource {
private final ExportImportAuditService exportImportAuditService;
private final AtlasServerService atlasServerService;
private final AtlasEntityStore entityStore;
private final AtlasPatchManager patchManager;
static {
try {
......@@ -152,7 +153,8 @@ public class AdminResource {
ExportService exportService, ImportService importService, SearchTracker activeSearches,
MigrationProgressService migrationProgressService,
AtlasServerService serverService,
ExportImportAuditService exportImportAuditService, AtlasEntityStore entityStore) {
ExportImportAuditService exportImportAuditService, AtlasEntityStore entityStore,
AtlasPatchManager patchManager) {
this.serviceState = serviceState;
this.metricsService = metricsService;
this.exportService = exportService;
......@@ -164,6 +166,7 @@ public class AdminResource {
this.entityStore = entityStore;
this.exportImportAuditService = exportImportAuditService;
this.importExportOperationLock = new ReentrantLock();
this.patchManager = patchManager;
}
/**
......@@ -564,7 +567,7 @@ public class AdminResource {
LOG.debug("==> AdminResource.getAtlasPatches()");
}
AtlasPatches ret = AtlasGraphUtilsV2.getAllPatches();
AtlasPatches ret = patchManager.getAllPatches();
if (LOG.isDebugEnabled()) {
LOG.debug("<== AdminResource.getAtlasPatches()");
......
......@@ -51,7 +51,7 @@ public class AdminResourceTest {
when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.ACTIVE);
AdminResource adminResource = new AdminResource(serviceState, null, null, null, null, null, null, null, null, null);
AdminResource adminResource = new AdminResource(serviceState, null, null, null, null, null, null, null, null, null, null);
Response response = adminResource.getStatus();
assertEquals(response.getStatus(), HttpServletResponse.SC_OK);
JsonNode entity = AtlasJson.parseToV1JsonNode((String) response.getEntity());
......@@ -62,7 +62,7 @@ public class AdminResourceTest {
public void testResourceGetsValueFromServiceState() throws IOException {
when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.PASSIVE);
AdminResource adminResource = new AdminResource(serviceState, null, null, null, null, null, null, null, null, null);
AdminResource adminResource = new AdminResource(serviceState, null, null, null, null, null, null, null, null, null, null);
Response response = adminResource.getStatus();
verify(serviceState).getState();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment