Commit 2bd6f469 by Sarath Subramanian

ATLAS-3276: Fix stale transactions in atlas due to ATLAS-3246 (Free-text search)

parent a097bed3
......@@ -136,6 +136,7 @@ public class AtlasJanusGraphIndexClient implements AtlasGraphIndexClient {
@Override
public Map<String, List<AtlasAggregationEntry>> getAggregatedMetrics(String queryString, Set<String> propertyKeyNames) {
SolrClient solrClient = null;
AtlasGraphManagement management = graph.getManagementSystem();
try {
solrClient = Solr6Index.getSolrClient(); // get solr client using same settings as that of Janus Graph
......@@ -153,7 +154,6 @@ public class AtlasJanusGraphIndexClient implements AtlasGraphIndexClient {
}
SolrQuery solrQuery = new SolrQuery();
AtlasGraphManagement management = graph.getManagementSystem();
Map<String, String> indexFieldName2PropertyKeyNameMap = new HashMap<>();
solrQuery.setQuery(queryString);
......@@ -192,7 +192,9 @@ public class AtlasJanusGraphIndexClient implements AtlasGraphIndexClient {
}
} catch (Exception e) {
LOG.error("Error enocunted in getting the aggregation metrics. Will return empty agregation.", e);
}finally {
} finally {
graphManagementCommit(management);
Solr6Index.releaseSolrClient(solrClient);
}
......@@ -214,7 +216,7 @@ public class AtlasJanusGraphIndexClient implements AtlasGraphIndexClient {
//update the request handler
performRequestHandlerAction(collectionName, solrClient,
generatePayLoadForSuggestions(generateSuggestionsString(collectionName, graph.getManagementSystem(), suggestionProperties)));
generatePayLoadForSuggestions(generateSuggestionsString(collectionName, suggestionProperties)));
} catch (Throwable t) {
String msg = String.format("Error encountered in creating the request handler '%s' for collection '%s'", Constants.TERMS_REQUEST_HANDLER, collectionName);
......@@ -282,6 +284,24 @@ public class AtlasJanusGraphIndexClient implements AtlasGraphIndexClient {
return Collections.EMPTY_LIST;
}
private void graphManagementCommit(AtlasGraphManagement management) {
try {
management.commit();
} catch (Exception ex) {
LOG.warn("Graph transaction management commit failed; attempting rollback: {}", ex);
graphManagementRollback(management);
}
}
private void graphManagementRollback(AtlasGraphManagement management) {
try {
management.rollback();
} catch (Exception ex) {
LOG.warn("Graph transaction management rollback failed: {}", ex);
}
}
@VisibleForTesting
static List<String> getTopTerms(Map<String, TermFreq> termsMap) {
final List<String> ret;
......@@ -353,9 +373,11 @@ public class AtlasJanusGraphIndexClient implements AtlasGraphIndexClient {
}
private String generateSearchWeightString(AtlasGraphManagement management, String indexName, Map<String, Integer> propertyName2SearchWeightMap) {
private String generateSearchWeightString(String indexName, Map<String, Integer> propertyName2SearchWeightMap) {
StringBuilder searchWeightBuilder = new StringBuilder();
AtlasGraphManagement management = graph.getManagementSystem();
try {
for (Map.Entry<String, Integer> entry : propertyName2SearchWeightMap.entrySet()) {
AtlasPropertyKey propertyKey = management.getPropertyKey(entry.getKey());
String indexFieldName = management.getIndexFieldName(indexName, propertyKey);
......@@ -365,32 +387,40 @@ public class AtlasJanusGraphIndexClient implements AtlasGraphIndexClient {
.append("^")
.append(entry.getValue().intValue());
}
} finally {
graphManagementCommit(management);
}
return searchWeightBuilder.toString();
}
private String generateSuggestionsString(String collectionName, AtlasGraphManagement management, List<String> suggestionProperties) {
StringBuilder stringBuilder = new StringBuilder();
private String generateSuggestionsString(String collectionName, List<String> suggestionProperties) {
StringBuilder ret = new StringBuilder();
AtlasGraphManagement management = graph.getManagementSystem();
for(String propertyName: suggestionProperties) {
try {
for (String propertyName : suggestionProperties) {
AtlasPropertyKey propertyKey = management.getPropertyKey(propertyName);
String indexFieldName = management.getIndexFieldName(collectionName, propertyKey);
stringBuilder.append("'").append(indexFieldName).append("', ");
ret.append("'").append(indexFieldName).append("', ");
}
} finally {
graphManagementCommit(management);
}
return stringBuilder.toString();
return ret.toString();
}
private V2Response updateFreeTextRequestHandler(SolrClient solrClient, String collectionName, Map<String, Integer> propertyName2SearchWeightMap) throws IOException, SolrServerException, AtlasBaseException {
String searchWeightString = generateSearchWeightString(graph.getManagementSystem(), collectionName, propertyName2SearchWeightMap);
String searchWeightString = generateSearchWeightString(collectionName, propertyName2SearchWeightMap);
String payLoadString = generatePayLoadForFreeText("update-requesthandler", searchWeightString);
return performRequestHandlerAction(collectionName, solrClient, payLoadString);
}
private V2Response createFreeTextRequestHandler(SolrClient solrClient, String collectionName, Map<String, Integer> propertyName2SearchWeightMap) throws IOException, SolrServerException, AtlasBaseException {
String searchWeightString = generateSearchWeightString(graph.getManagementSystem(), collectionName, propertyName2SearchWeightMap);
String searchWeightString = generateSearchWeightString(collectionName, propertyName2SearchWeightMap);
String payLoadString = generatePayLoadForFreeText("create-requesthandler", searchWeightString);
return performRequestHandlerAction(collectionName, solrClient, payLoadString);
......
......@@ -18,6 +18,7 @@
package org.apache.atlas.services;
import org.apache.atlas.annotation.AtlasService;
import org.apache.atlas.annotation.GraphTransaction;
import org.apache.atlas.model.instance.AtlasEntity.Status;
import org.apache.atlas.model.metrics.AtlasMetrics;
import org.apache.atlas.repository.graphdb.AtlasGraph;
......@@ -74,6 +75,7 @@ public class MetricsService {
}
@SuppressWarnings("unchecked")
@GraphTransaction
public AtlasMetrics getMetrics() {
Collection<String> entityDefNames = typeRegistry.getAllEntityDefNames();
Collection<String> classificationDefNames = typeRegistry.getAllClassificationDefNames();
......
......@@ -165,10 +165,15 @@ public class AtlasMetricsUtil {
@Override
public void run() {
graph.query().has(TYPE_NAME_PROPERTY_KEY, TYPE_NAME_INTERNAL).vertices(1);
graphCommit();
}
}, 10, TimeUnit.SECONDS);
} catch (Exception e) {
LOG.error(e.getMessage());
graphRollback();
return false;
}
......@@ -183,10 +188,15 @@ public class AtlasMetricsUtil {
@Override
public void run() {
graph.indexQuery(Constants.VERTEX_INDEX, query).vertices(0, 1);
graphCommit();
}
}, 10, TimeUnit.SECONDS);
} catch (Exception e) {
LOG.error(e.getMessage());
graphRollback();
return false;
}
......@@ -228,6 +238,24 @@ public class AtlasMetricsUtil {
}
}
private void graphCommit() {
try {
graph.commit();
} catch (Exception ex) {
LOG.warn("Graph transaction commit failed: {}; attempting to rollback graph transaction.", ex);
graphRollback();
}
}
private void graphRollback() {
try {
graph.rollback();
} catch (Exception ex) {
LOG.warn("Graph transaction rollback failed: {}", ex);
}
}
private String millisToTimeDiff(long msDiff) {
StringBuilder sb = new StringBuilder();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment