diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphIndexClient.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphIndexClient.java index 4dd641d..3a64d31 100644 --- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphIndexClient.java +++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphIndexClient.java @@ -135,7 +135,8 @@ public class AtlasJanusGraphIndexClient implements AtlasGraphIndexClient { @Override public Map<String, List<AtlasAggregationEntry>> getAggregatedMetrics(String queryString, Set<String> propertyKeyNames) { - SolrClient solrClient = null; + SolrClient solrClient = null; + AtlasGraphManagement management = graph.getManagementSystem(); try { solrClient = Solr6Index.getSolrClient(); // get solr client using same settings as that of Janus Graph @@ -153,7 +154,6 @@ public class AtlasJanusGraphIndexClient implements AtlasGraphIndexClient { } SolrQuery solrQuery = new SolrQuery(); - AtlasGraphManagement management = graph.getManagementSystem(); Map<String, String> indexFieldName2PropertyKeyNameMap = new HashMap<>(); solrQuery.setQuery(queryString); @@ -192,7 +192,9 @@ public class AtlasJanusGraphIndexClient implements AtlasGraphIndexClient { } } catch (Exception e) { LOG.error("Error enocunted in getting the aggregation metrics. Will return empty agregation.", e); - }finally { + } finally { + graphManagementCommit(management); + Solr6Index.releaseSolrClient(solrClient); } @@ -214,7 +216,7 @@ public class AtlasJanusGraphIndexClient implements AtlasGraphIndexClient { //update the request handler performRequestHandlerAction(collectionName, solrClient, - generatePayLoadForSuggestions(generateSuggestionsString(collectionName, graph.getManagementSystem(), suggestionProperties))); + generatePayLoadForSuggestions(generateSuggestionsString(collectionName, suggestionProperties))); } catch (Throwable t) { String msg = String.format("Error encountered in creating the request handler '%s' for collection '%s'", Constants.TERMS_REQUEST_HANDLER, collectionName); @@ -282,6 +284,24 @@ public class AtlasJanusGraphIndexClient implements AtlasGraphIndexClient { return Collections.EMPTY_LIST; } + private void graphManagementCommit(AtlasGraphManagement management) { + try { + management.commit(); + } catch (Exception ex) { + LOG.warn("Graph transaction management commit failed; attempting rollback: {}", ex); + + graphManagementRollback(management); + } + } + + private void graphManagementRollback(AtlasGraphManagement management) { + try { + management.rollback(); + } catch (Exception ex) { + LOG.warn("Graph transaction management rollback failed: {}", ex); + } + } + @VisibleForTesting static List<String> getTopTerms(Map<String, TermFreq> termsMap) { final List<String> ret; @@ -353,44 +373,54 @@ public class AtlasJanusGraphIndexClient implements AtlasGraphIndexClient { } - private String generateSearchWeightString(AtlasGraphManagement management, String indexName, Map<String, Integer> propertyName2SearchWeightMap) { - StringBuilder searchWeightBuilder = new StringBuilder(); - - for (Map.Entry<String, Integer> entry : propertyName2SearchWeightMap.entrySet()) { - AtlasPropertyKey propertyKey = management.getPropertyKey(entry.getKey()); - String indexFieldName = management.getIndexFieldName(indexName, propertyKey); + private String generateSearchWeightString(String indexName, Map<String, Integer> propertyName2SearchWeightMap) { + StringBuilder searchWeightBuilder = new StringBuilder(); + AtlasGraphManagement management = graph.getManagementSystem(); - searchWeightBuilder.append(" ") - .append(indexFieldName) - .append("^") - .append(entry.getValue().intValue()); + try { + for (Map.Entry<String, Integer> entry : propertyName2SearchWeightMap.entrySet()) { + AtlasPropertyKey propertyKey = management.getPropertyKey(entry.getKey()); + String indexFieldName = management.getIndexFieldName(indexName, propertyKey); + + searchWeightBuilder.append(" ") + .append(indexFieldName) + .append("^") + .append(entry.getValue().intValue()); + } + } finally { + graphManagementCommit(management); } return searchWeightBuilder.toString(); } - private String generateSuggestionsString(String collectionName, AtlasGraphManagement management, List<String> suggestionProperties) { - StringBuilder stringBuilder = new StringBuilder(); + private String generateSuggestionsString(String collectionName, List<String> suggestionProperties) { + StringBuilder ret = new StringBuilder(); + AtlasGraphManagement management = graph.getManagementSystem(); - for(String propertyName: suggestionProperties) { - AtlasPropertyKey propertyKey = management.getPropertyKey(propertyName); - String indexFieldName = management.getIndexFieldName(collectionName, propertyKey); + try { + for (String propertyName : suggestionProperties) { + AtlasPropertyKey propertyKey = management.getPropertyKey(propertyName); + String indexFieldName = management.getIndexFieldName(collectionName, propertyKey); - stringBuilder.append("'").append(indexFieldName).append("', "); + ret.append("'").append(indexFieldName).append("', "); + } + } finally { + graphManagementCommit(management); } - return stringBuilder.toString(); + return ret.toString(); } private V2Response updateFreeTextRequestHandler(SolrClient solrClient, String collectionName, Map<String, Integer> propertyName2SearchWeightMap) throws IOException, SolrServerException, AtlasBaseException { - String searchWeightString = generateSearchWeightString(graph.getManagementSystem(), collectionName, propertyName2SearchWeightMap); + String searchWeightString = generateSearchWeightString(collectionName, propertyName2SearchWeightMap); String payLoadString = generatePayLoadForFreeText("update-requesthandler", searchWeightString); return performRequestHandlerAction(collectionName, solrClient, payLoadString); } private V2Response createFreeTextRequestHandler(SolrClient solrClient, String collectionName, Map<String, Integer> propertyName2SearchWeightMap) throws IOException, SolrServerException, AtlasBaseException { - String searchWeightString = generateSearchWeightString(graph.getManagementSystem(), collectionName, propertyName2SearchWeightMap); + String searchWeightString = generateSearchWeightString(collectionName, propertyName2SearchWeightMap); String payLoadString = generatePayLoadForFreeText("create-requesthandler", searchWeightString); return performRequestHandlerAction(collectionName, solrClient, payLoadString); diff --git a/repository/src/main/java/org/apache/atlas/services/MetricsService.java b/repository/src/main/java/org/apache/atlas/services/MetricsService.java index 8fb68e9..62e81e2 100644 --- a/repository/src/main/java/org/apache/atlas/services/MetricsService.java +++ b/repository/src/main/java/org/apache/atlas/services/MetricsService.java @@ -18,6 +18,7 @@ package org.apache.atlas.services; import org.apache.atlas.annotation.AtlasService; +import org.apache.atlas.annotation.GraphTransaction; import org.apache.atlas.model.instance.AtlasEntity.Status; import org.apache.atlas.model.metrics.AtlasMetrics; import org.apache.atlas.repository.graphdb.AtlasGraph; @@ -74,6 +75,7 @@ public class MetricsService { } @SuppressWarnings("unchecked") + @GraphTransaction public AtlasMetrics getMetrics() { Collection<String> entityDefNames = typeRegistry.getAllEntityDefNames(); Collection<String> classificationDefNames = typeRegistry.getAllClassificationDefNames(); diff --git a/repository/src/main/java/org/apache/atlas/util/AtlasMetricsUtil.java b/repository/src/main/java/org/apache/atlas/util/AtlasMetricsUtil.java index 0c86aff..f658caa 100644 --- a/repository/src/main/java/org/apache/atlas/util/AtlasMetricsUtil.java +++ b/repository/src/main/java/org/apache/atlas/util/AtlasMetricsUtil.java @@ -165,10 +165,15 @@ public class AtlasMetricsUtil { @Override public void run() { graph.query().has(TYPE_NAME_PROPERTY_KEY, TYPE_NAME_INTERNAL).vertices(1); + + graphCommit(); } }, 10, TimeUnit.SECONDS); } catch (Exception e) { LOG.error(e.getMessage()); + + graphRollback(); + return false; } @@ -183,10 +188,15 @@ public class AtlasMetricsUtil { @Override public void run() { graph.indexQuery(Constants.VERTEX_INDEX, query).vertices(0, 1); + + graphCommit(); } }, 10, TimeUnit.SECONDS); } catch (Exception e) { LOG.error(e.getMessage()); + + graphRollback(); + return false; } @@ -228,6 +238,24 @@ public class AtlasMetricsUtil { } } + private void graphCommit() { + try { + graph.commit(); + } catch (Exception ex) { + LOG.warn("Graph transaction commit failed: {}; attempting to rollback graph transaction.", ex); + + graphRollback(); + } + } + + private void graphRollback() { + try { + graph.rollback(); + } catch (Exception ex) { + LOG.warn("Graph transaction rollback failed: {}", ex); + } + } + private String millisToTimeDiff(long msDiff) { StringBuilder sb = new StringBuilder();