Commit 13675409 by skoritala Committed by Madhan Neethiraj

ATLAS-3270: added a rest end point to get suggestions for a given search-prefix

parent f2cb641e
......@@ -40,11 +40,15 @@ public final class Constants {
public static final String RELATIONSHIP_GUID_PROPERTY_KEY = encodePropertyKey(RELATIONSHIP_PROPERTY_KEY_PREFIX + GUID_PROPERTY_KEY);
public static final String HISTORICAL_GUID_PROPERTY_KEY = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "historicalGuids");
public static final String FREETEXT_REQUEST_HANDLER = "/freetext";
public static final String TERMS_REQUEST_HANDLER = "/terms";
/**
* Entity type name property key.
*/
public static final String ENTITY_TYPE_PROPERTY_KEY = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "typeName");
public static final String TYPE_NAME_INTERNAL = INTERNAL_PROPERTY_KEY_PREFIX + "internal";
public static final String ASSET_OWNER_PROPERTY_KEY = "Asset.owner";
/**
* Entity type's super types property key.
......
......@@ -17,7 +17,11 @@
*/
package org.apache.atlas.repository.graphdb;
import org.apache.atlas.model.discovery.AtlasAggregationEntry;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* Represents a graph client work with indices used by Jansgraph.
......@@ -25,10 +29,31 @@ import java.util.Map;
public interface AtlasGraphIndexClient {
/**
* The implementers should apply the search weights for the passed in attributes.
* Gets aggregated metrics for the given query string and aggregation field names.
* @param queryString the query string whose aggregation metrics need to be retrieved.
* @param propertyKeyNames the set of aggregation fields.
* @return A map of aggregation field to value-count pairs.
*/
Map<String, List<AtlasAggregationEntry>> getAggregatedMetrics(String queryString, Set<String> propertyKeyNames);
/**
* Returns top 5 suggestions for the given prefix string.
* @param prefixString the prefix string whose value needs to be retrieved.
* @return top 5 suggestion strings with prefix String
*/
List<String> getSuggestions(String prefixString);
/**
* The implementers should apply the search weights for the passed in properties.
* @param collectionName the name of the collection for which the search weight needs to be applied
* @param attributeName2SearchWeightMap the map containing search weights from attribute name to search weights.
* @param propertyName2SearchWeightMap the map containing search weights from property name to search weights.
*/
void applySearchWeight(String collectionName, Map<String, Integer> attributeName2SearchWeightMap);
void applySearchWeight(String collectionName, Map<String, Integer> propertyName2SearchWeightMap);
/**
* The implementors should take the passed in list of suggestion properties for suggestions functionality.
* @param collectionName the name of the collection to which the suggestions properties should be applied to.
* @param suggestionProperties the list of suggestion properties.
*/
void applySuggestionFields(String collectionName, List<String> suggestionProperties);
}
......@@ -19,17 +19,26 @@ package org.apache.atlas.repository.graphdb.janus;
import com.google.common.annotations.VisibleForTesting;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.discovery.AtlasAggregationEntry;
import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasGraphIndexClient;
import org.apache.atlas.repository.graphdb.AtlasGraphManagement;
import org.apache.atlas.repository.graphdb.AtlasPropertyKey;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.configuration.Configuration;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.request.V2Request;
import org.apache.solr.client.solrj.response.V2Response;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.client.solrj.response.FacetField;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.client.solrj.response.TermsResponse;
import org.apache.solr.common.params.CommonParams;
import org.janusgraph.diskstorage.solr.Solr6Index;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -38,10 +47,14 @@ import java.io.IOException;
import java.util.*;
import static org.apache.atlas.repository.Constants.FREETEXT_REQUEST_HANDLER;
import static org.apache.atlas.repository.Constants.VERTEX_INDEX;
public class AtlasJanusGraphIndexClient implements AtlasGraphIndexClient {
private static final Logger LOG = LoggerFactory.getLogger(AtlasJanusGraphIndexClient.class);
private static final FreqComparator FREQ_COMPARATOR = new FreqComparator();
private static final int DEFAULT_SUGGESTION_COUNT = 5;
private final AtlasGraph graph;
private final Configuration configuration;
......@@ -52,7 +65,7 @@ public class AtlasJanusGraphIndexClient implements AtlasGraphIndexClient {
}
@Override
public void applySearchWeight(String collectionName, Map<String, Integer> attributeName2SearchWeightMap) {
public void applySearchWeight(String collectionName, Map<String, Integer> propertyName2SearchWeightMap) {
SolrClient solrClient = null;
try {
......@@ -85,7 +98,8 @@ public class AtlasJanusGraphIndexClient implements AtlasGraphIndexClient {
try {
LOG.info("Attempting to update free text request handler {} for collection {}", FREETEXT_REQUEST_HANDLER, collectionName);
updateFreeTextRequestHandler(solrClient, collectionName, attributeName2SearchWeightMap);
updateFreeTextRequestHandler(solrClient, collectionName, propertyName2SearchWeightMap);
LOG.info("Successfully updated free text request handler {} for collection {}..", FREETEXT_REQUEST_HANDLER, collectionName);
return;
......@@ -98,7 +112,7 @@ public class AtlasJanusGraphIndexClient implements AtlasGraphIndexClient {
try {
LOG.info("Attempting to create free text request handler {} for collection {}", FREETEXT_REQUEST_HANDLER, collectionName);
createFreeTextRequestHandler(solrClient, collectionName, attributeName2SearchWeightMap);
createFreeTextRequestHandler(solrClient, collectionName, propertyName2SearchWeightMap);
LOG.info("Successfully created free text request handler {} for collection {}", FREETEXT_REQUEST_HANDLER, collectionName);
return;
......@@ -114,74 +128,190 @@ public class AtlasJanusGraphIndexClient implements AtlasGraphIndexClient {
throw lastExcp != null ? new RuntimeException(msg, lastExcp) : new RuntimeException(msg);
} finally {
LOG.debug("Releasing the solr client from usage.");
Solr6Index.releaseSolrClient(solrClient);
}
}
private V2Response validateResponseForSuccess(V2Response v2Response) throws AtlasBaseException {
if(v2Response == null) {
String msg = "Received in valid response .";
LOG.error(msg);
throw new AtlasBaseException(msg);
@Override
public Map<String, List<AtlasAggregationEntry>> getAggregatedMetrics(String queryString, Set<String> propertyKeyNames) {
SolrClient solrClient = null;
try {
solrClient = Solr6Index.getSolrClient(); // get solr client using same settings as that of Janus Graph
if (solrClient == null) {
LOG.warn("The indexing system is not solr based. Will return empty Aggregation metrics.");
return Collections.EMPTY_MAP;
}
if(LOG.isDebugEnabled()) {
LOG.debug("V2 Response is {}", v2Response.toString());
if (propertyKeyNames.size() <= 0) {
LOG.warn("There no fields provided for aggregation purpose.");
return Collections.EMPTY_MAP;
}
NamedList<Object> response = v2Response.getResponse();
Object errorMessages = response.get("errorMessages");
if(errorMessages != null) {
LOG.error("Error encountered in performing response handler action.");
List<Object> errorObjects = (List<Object>) errorMessages;
Map<Object, Object> errObject = (Map<Object, Object>) errorObjects.get(0);
List<String> msgs = (List<String>) errObject.get("errorMessages");
StringBuilder sb = new StringBuilder();
for(String msg: msgs) {
sb.append(msg);
SolrQuery solrQuery = new SolrQuery();
AtlasGraphManagement management = graph.getManagementSystem();
Map<String, String> indexFieldName2PropertyKeyNameMap = new HashMap<>();
solrQuery.setQuery(queryString);
solrQuery.setRequestHandler(FREETEXT_REQUEST_HANDLER);
for (String propertyName : propertyKeyNames) {
AtlasPropertyKey propertyKey = management.getPropertyKey(propertyName);
String indexFieldName = management.getIndexFieldName(VERTEX_INDEX, propertyKey);
indexFieldName2PropertyKeyNameMap.put(indexFieldName, propertyName);
solrQuery.addFacetField(indexFieldName);
}
String errors = sb.toString();
String msg = String.format("Error encountered in performing response handler action. %s.", errors);
LOG.error(msg);
throw new AtlasBaseException(msg);
} else {
LOG.debug("Successfully performed response handler action. V2 Response is {}", v2Response.toString());
QueryResponse queryResponse = solrClient.query(VERTEX_INDEX, solrQuery);
List<FacetField> facetFields = queryResponse == null ? null : queryResponse.getFacetFields();
if (CollectionUtils.isNotEmpty(facetFields)) {
Map<String, List<AtlasAggregationEntry>> ret = new HashMap<>();
for (FacetField facetField : facetFields) {
String indexFieldName = facetField.getName();
List<AtlasAggregationEntry> entries = new ArrayList<>(facetField.getValueCount());
List<FacetField.Count> values = facetField.getValues();
for (FacetField.Count count : values) {
entries.add(new AtlasAggregationEntry(count.getName(), count.getCount()));
}
return v2Response;
String propertyKeyName = indexFieldName2PropertyKeyNameMap.get(indexFieldName);
ret.put(propertyKeyName, entries);
}
private V2Response updateFreeTextRequestHandler(SolrClient solrClient, String collectionName, Map<String, Integer> attributeName2SearchWeightMap) throws IOException, SolrServerException, AtlasBaseException {
String searchWeightString = generateSearchWeightString(graph.getManagementSystem(), collectionName, attributeName2SearchWeightMap);
String payLoadString = generatePayLoadForFreeText("update-requesthandler", FREETEXT_REQUEST_HANDLER, searchWeightString);
return ret;
}
} catch (Exception e) {
LOG.error("Error enocunted in getting the aggregation metrics. Will return empty agregation.", e);
}finally {
Solr6Index.releaseSolrClient(solrClient);
}
return performRequestHandlerAction(collectionName, solrClient, payLoadString);
return Collections.EMPTY_MAP;
}
@Override
public void applySuggestionFields(String collectionName, List<String> suggestionProperties) {
SolrClient solrClient = null;
try {
solrClient = Solr6Index.getSolrClient(); // get solr client using same settings as that of Janus Graph
if (solrClient == null) {
LOG.warn("The indexing system is not solr based. Suggestions feature will not be available.");
return;
}
private V2Response createFreeTextRequestHandler(SolrClient solrClient, String collectionName, Map<String, Integer> attributeName2SearchWeightMap) throws IOException, SolrServerException, AtlasBaseException {
String searchWeightString = generateSearchWeightString(graph.getManagementSystem(), collectionName, attributeName2SearchWeightMap);
String payLoadString = generatePayLoadForFreeText("create-requesthandler", FREETEXT_REQUEST_HANDLER, searchWeightString);
//update the request handler
performRequestHandlerAction(collectionName, solrClient,
generatePayLoadForSuggestions(generateSuggestionsString(collectionName, graph.getManagementSystem(), suggestionProperties)));
} catch (Throwable t) {
String msg = String.format("Error encountered in creating the request handler '%s' for collection '%s'", Constants.TERMS_REQUEST_HANDLER, collectionName);
return performRequestHandlerAction(collectionName, solrClient, payLoadString);
LOG.error(msg, t);
} finally {
Solr6Index.releaseSolrClient(solrClient);
}
private String generateSearchWeightString(AtlasGraphManagement management, String indexName, Map<String, Integer> searchWeightsMap) {
StringBuilder searchWeightBuilder = new StringBuilder();
Set<Map.Entry<String, Integer>> searchWeightFields = searchWeightsMap.entrySet();
LOG.info("Applied suggestion fields request handler for collection {}.", collectionName);
}
for (Map.Entry<String, Integer> entry : searchWeightFields) {
AtlasPropertyKey propertyKey = management.getPropertyKey(entry.getKey());
String indexFieldName = management.getIndexFieldName(indexName, propertyKey);
@Override
public List<String> getSuggestions(String prefixString) {
SolrClient solrClient = null;
searchWeightBuilder.append(" ")
.append(indexFieldName)
.append("^")
.append(entry.getValue().intValue());
try {
solrClient = Solr6Index.getSolrClient(); // get solr client using same settings as that of Janus Graph
if (solrClient == null) {
LOG.warn("The indexing system is not solr based. Suggestions feature will not be available.");
return Collections.EMPTY_LIST;
}
return searchWeightBuilder.toString();
SolrQuery solrQuery = new SolrQuery();
solrQuery.setRequestHandler(Constants.TERMS_REQUEST_HANDLER)
.setParam("terms.prefix", prefixString)
.setParam(CommonParams.OMIT_HEADER, true);
QueryResponse queryResponse = solrClient.query(VERTEX_INDEX, solrQuery);
TermsResponse termsResponse = queryResponse == null? null: queryResponse.getTermsResponse();
if(termsResponse == null) {
LOG.info("Received null for terms response. Will return no suggestions.");
return Collections.EMPTY_LIST;
}
Map<String, TermFreq> termsMap = new HashMap<>();
for (List<TermsResponse.Term> fieldTerms : termsResponse.getTermMap().values()) {
for (TermsResponse.Term fieldTerm : fieldTerms) {
TermFreq term = termsMap.get(fieldTerm.getTerm());
if (term == null) {
term = new TermFreq(fieldTerm.getTerm(), fieldTerm.getFrequency());
termsMap.put(term.getTerm(), term);
} else {
term.addFreq(fieldTerm.getFrequency());
}
}
}
return getTopTerms(termsMap);
} catch (SolrServerException | IOException e) {
String msg = String.format("Error encountered in generating the suggestions. Ignoring the error", e);
LOG.error(msg);
} finally {
Solr6Index.releaseSolrClient(solrClient);
}
return Collections.EMPTY_LIST;
}
@VisibleForTesting
static List<String> getTopTerms(Map<String, TermFreq> termsMap) {
final List<String> ret;
if (MapUtils.isNotEmpty(termsMap)) {
// Collect top high frequency terms.
PriorityQueue<TermFreq> termsQueue = new PriorityQueue(termsMap.size(), FREQ_COMPARATOR);
for (TermFreq term : termsMap.values()) {
termsQueue.add(term);
}
ret = new ArrayList<>(DEFAULT_SUGGESTION_COUNT);
while (!termsQueue.isEmpty()) {
ret.add(termsQueue.poll().getTerm());
if (ret.size() >= DEFAULT_SUGGESTION_COUNT) {
break;
}
}
} else {
ret = Collections.EMPTY_LIST;
}
return ret;
}
@VisibleForTesting
static String generatePayLoadForFreeText(String action, String handlerName, String qfValue) {
static String generatePayLoadForFreeText(String action, String qfValue) {
return String.format("{" +
" %s : { " +
" 'name' : '%s', " +
......@@ -196,15 +326,157 @@ public class AtlasJanusGraphIndexClient implements AtlasGraphIndexClient {
" 'lowercaseOperators': true , " +
" }" +
" }" +
"}", action, handlerName, qfValue);
"}", action, FREETEXT_REQUEST_HANDLER, qfValue);
}
@VisibleForTesting
String generatePayLoadForSuggestions(String suggestionFieldsString) {
return String.format("{\n" +
" update-requesthandler : { \n" +
" 'name' : '%s', \n" +
" 'class': 'solr.SearchHandler' , \n" +
" 'startup': 'lazy' ,\n" +
" 'defaults': " + "{ \n" +
" 'terms': true , \n" +
" 'distrib': false , \n" +
" 'terms.limit': 5 , \n" +
" 'terms.fl' : \n" +
" [\n" +
" %s \n" +
" ] \n" +
" }\n" +
" 'components': " + "[ \n" +
" 'terms' \n" +
" ] \n" +
" } \n" +
"}", Constants.TERMS_REQUEST_HANDLER, suggestionFieldsString);
}
private V2Response performRequestHandlerAction(String collectionName, SolrClient solrClient,
String actionPayLoad) throws IOException, SolrServerException, AtlasBaseException {
private String generateSearchWeightString(AtlasGraphManagement management, String indexName, Map<String, Integer> propertyName2SearchWeightMap) {
StringBuilder searchWeightBuilder = new StringBuilder();
for (Map.Entry<String, Integer> entry : propertyName2SearchWeightMap.entrySet()) {
AtlasPropertyKey propertyKey = management.getPropertyKey(entry.getKey());
String indexFieldName = management.getIndexFieldName(indexName, propertyKey);
searchWeightBuilder.append(" ")
.append(indexFieldName)
.append("^")
.append(entry.getValue().intValue());
}
return searchWeightBuilder.toString();
}
private String generateSuggestionsString(String collectionName, AtlasGraphManagement management, List<String> suggestionProperties) {
StringBuilder stringBuilder = new StringBuilder();
for(String propertyName: suggestionProperties) {
AtlasPropertyKey propertyKey = management.getPropertyKey(propertyName);
String indexFieldName = management.getIndexFieldName(collectionName, propertyKey);
stringBuilder.append("'").append(indexFieldName).append("', ");
}
return stringBuilder.toString();
}
private V2Response updateFreeTextRequestHandler(SolrClient solrClient, String collectionName, Map<String, Integer> propertyName2SearchWeightMap) throws IOException, SolrServerException, AtlasBaseException {
String searchWeightString = generateSearchWeightString(graph.getManagementSystem(), collectionName, propertyName2SearchWeightMap);
String payLoadString = generatePayLoadForFreeText("update-requesthandler", searchWeightString);
return performRequestHandlerAction(collectionName, solrClient, payLoadString);
}
private V2Response createFreeTextRequestHandler(SolrClient solrClient, String collectionName, Map<String, Integer> propertyName2SearchWeightMap) throws IOException, SolrServerException, AtlasBaseException {
String searchWeightString = generateSearchWeightString(graph.getManagementSystem(), collectionName, propertyName2SearchWeightMap);
String payLoadString = generatePayLoadForFreeText("create-requesthandler", searchWeightString);
return performRequestHandlerAction(collectionName, solrClient, payLoadString);
}
private V2Response performRequestHandlerAction(String collectionName, SolrClient solrClient, String actionPayLoad)
throws IOException, SolrServerException, AtlasBaseException {
V2Request v2Request = new V2Request.Builder(String.format("/collections/%s/config", collectionName))
.withMethod(SolrRequest.METHOD.POST)
.withPayload(actionPayLoad)
.build();
return validateResponseForSuccess(v2Request.process(solrClient));
}
private V2Response validateResponseForSuccess(V2Response v2Response) throws AtlasBaseException {
if(v2Response == null) {
String msg = "Received null response .";
LOG.error(msg);
throw new AtlasBaseException(msg);
}
if (LOG.isDebugEnabled()) {
LOG.debug("V2 Response is {}", v2Response.toString());
}
NamedList<Object> response = v2Response.getResponse();
if(response != null) {
Object errorMessages = response.get("errorMessages");
if(errorMessages != null) {
LOG.error("Error encountered in performing request handler create/update");
List<Object> errorObjects = (List<Object>) errorMessages;
Map<Object, Object> errObject = (Map<Object, Object>) errorObjects.get(0);
List<String> msgs = (List<String>) errObject.get("errorMessages");
StringBuilder sb = new StringBuilder();
for(String msg: msgs) {
sb.append(msg);
}
String errors = sb.toString();
String msg = String.format("Error encountered in performing response handler action. %s.", errors);
LOG.error(msg);
throw new AtlasBaseException(msg);
} else {
if(LOG.isDebugEnabled()) {
LOG.debug("Successfully performed response handler action. V2 Response is {}", v2Response.toString());
}
}
} else {
if(LOG.isDebugEnabled()) {
LOG.debug("Did not receive any response from SOLR.");
}
}
return v2Response;
}
static final class TermFreq {
private final String term;
private long freq;
public TermFreq(String term, long freq) {
this.term = term;
this.freq = freq;
}
public final String getTerm() { return term; }
public final long getFreq() { return freq; }
public final void addFreq(long val) { freq += val; }
}
static class FreqComparator implements Comparator<TermFreq> {
@Override
public int compare(TermFreq lhs, TermFreq rhs) {
return Long.compare(rhs.getFreq(), lhs.getFreq());
}
}
}
......@@ -235,16 +235,21 @@ public class Solr6Index implements IndexProvider {
if (solrClient != null) {
try {
solrClient.close();
if(logger.isDebugEnabled()) {
logger.debug("Closed the solr client successfully.");
}
} catch (IOException excp) {
logger.warn("Failed to close SolrClient", excp);
logger.warn("Failed to close SolrClient.", excp);
}
}
logger.debug("Closed the solr client successfully.");
} else {
if(logger.isDebugEnabled()) {
logger.debug("Ignoring the closing of solr client as it is owned by Solr6Index.");
return;
}
}
}
private SolrClient createSolrClient() {
final ModifiableSolrParams clientParams = new ModifiableSolrParams();
SolrClient solrClient = null;
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.graphdb.janus;
import org.testng.Assert;
import org.testng.annotations.Test;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class AtlasJanusGraphIndexClientTest {
@Test
public void testGetTop5TermsAsendingInput() {
Map<String, AtlasJanusGraphIndexClient.TermFreq> terms = generateTerms( 10, 12, 15);
List<String> top5Terms = AtlasJanusGraphIndexClient.getTopTerms(terms);
validateOrder(top5Terms, 2,1,0);
}
@Test
public void testGetTop5TermsAsendingInput2() {
Map<String, AtlasJanusGraphIndexClient.TermFreq> terms = generateTerms( 10, 12, 15, 20, 25, 26, 30, 40);
List<String> top5Terms = AtlasJanusGraphIndexClient.getTopTerms(terms);
validateOrder(top5Terms, 7, 6, 5, 4, 3);
}
@Test
public void testGetTop5TermsDescendingInput() {
Map<String, AtlasJanusGraphIndexClient.TermFreq> terms = generateTerms( 10, 9, 8);
List<String> top5Terms = AtlasJanusGraphIndexClient.getTopTerms(terms);
validateOrder(top5Terms, 0, 1, 2);
}
@Test
public void testGetTop5TermsDescendingInput2() {
Map<String, AtlasJanusGraphIndexClient.TermFreq> terms = generateTerms( 10, 9, 8, 7, 6, 5, 4, 3, 2);
List<String> top5Terms = AtlasJanusGraphIndexClient.getTopTerms(terms);
validateOrder(top5Terms, 0, 1, 2, 3, 4);
}
@Test
public void testGetTop5TermsRandom() {
Map<String, AtlasJanusGraphIndexClient.TermFreq> terms = generateTerms( 10, 19, 28, 27, 16, 1, 30, 3, 36);
List<String> top5Terms = AtlasJanusGraphIndexClient.getTopTerms(terms);
//10, 19, 28, 27, 16, 1, 30, 3, 36
//0, 1, 2, 3, 4, 5, 6, 7, 8
validateOrder(top5Terms, 8, 6, 2, 3, 1);
}
@Test
public void testGetTop5TermsRandom2() {
Map<String, AtlasJanusGraphIndexClient.TermFreq> terms = generateTerms( 36, 19, 28, 27, 16, 1, 30, 3, 10);
List<String> top5Terms = AtlasJanusGraphIndexClient.getTopTerms(terms);
//36, 19, 28, 27, 16, 1, 30, 3, 10
//0, 1, 2, 3, 4, 5, 6, 7, 8
validateOrder(top5Terms, 0, 6, 2, 3, 1);
}
@Test
public void testGetTop5TermsRandom3() {
Map<String, AtlasJanusGraphIndexClient.TermFreq> terms = generateTerms( 36, 36, 28, 27, 16, 1, 30, 3, 10);
List<String> top5Terms = AtlasJanusGraphIndexClient.getTopTerms(terms);
//36, 36, 28, 27, 16, 1, 30, 3, 10
//0, 1, 2, 3, 4, 5, 6, 7, 8
validateOrder(top5Terms, 0, 1, 6, 2, 3);
}
@Test
public void testGetTop5TermsRandom4() {
Map<String, AtlasJanusGraphIndexClient.TermFreq> terms = generateTerms( 10, 10, 28, 27, 16, 1, 30, 36, 36);
List<String> top5Terms = AtlasJanusGraphIndexClient.getTopTerms(terms);
//10, 10, 28, 27, 16, 1, 30, 36, 36
//0, 1, 2, 3, 4, 5, 6, 7, 8
validateOrder(top5Terms, 7, 8, 6, 2, 3);
}
@Test
public void testGetTop5TermsRandom5() {
Map<String, AtlasJanusGraphIndexClient.TermFreq> terms = generateTerms( 36, 10, 28, 27, 16, 1, 30, 36, 36);
List<String> top5Terms = AtlasJanusGraphIndexClient.getTopTerms(terms);
//36, 10, 28, 27, 16, 1, 30, 36, 36
//0, 1, 2, 3, 4, 5, 6, 7, 8
validateOrder(top5Terms, 0, 7, 8, 6, 2);
}
private void validateOrder(List<String> topTerms, int ... indices) {
Assert.assertEquals(topTerms.size(), indices.length);
int i = 0;
for(String term: topTerms) {
Assert.assertEquals(Integer.toString(indices[i++]), term);
}
Assert.assertEquals(topTerms.size(), indices.length);
}
private Map<String, AtlasJanusGraphIndexClient.TermFreq> generateTerms(int ... termFreqs) {
int i =0;
Map<String, AtlasJanusGraphIndexClient.TermFreq> terms = new HashMap<>();
for(int count: termFreqs) {
AtlasJanusGraphIndexClient.TermFreq termFreq1 = new AtlasJanusGraphIndexClient.TermFreq(Integer.toString(i++), count);
terms.put(termFreq1.getTerm(), termFreq1);
}
return terms;
}
}
\ No newline at end of file
......@@ -51,6 +51,7 @@ public final class ApplicationProperties extends PropertiesConfiguration {
public static final String SOLR_WAIT_SEARCHER_CONF = "atlas.graph.index.search.solr.wait-searcher";
public static final String ENABLE_FULLTEXT_SEARCH_CONF = "atlas.search.fulltext.enable";
public static final String ENABLE_FREETEXT_SEARCH_CONF = "atlas.search.freetext.enable";
public static final String ATLAS_RUN_MODE = "atlas.run.mode";
public static final String GRAPHBD_BACKEND_JANUS = "janus";
public static final String STORAGE_BACKEND_HBASE = "hbase";
public static final String STORAGE_BACKEND_HBASE2 = "hbase2";
......@@ -58,6 +59,7 @@ public final class ApplicationProperties extends PropertiesConfiguration {
public static final String DEFAULT_GRAPHDB_BACKEND = GRAPHBD_BACKEND_JANUS;
public static final boolean DEFAULT_SOLR_WAIT_SEARCHER = true;
public static final boolean DEFAULT_INDEX_MAP_NAME = false;
public static final AtlasRunMode DEFAULT_ATLAS_RUN_MODE = AtlasRunMode.PROD;
public static final SimpleEntry<String, String> DB_CACHE_CONF = new SimpleEntry<>("atlas.graph.cache.db-cache", "true");
public static final SimpleEntry<String, String> DB_CACHE_CLEAN_WAIT_CONF = new SimpleEntry<>("atlas.graph.cache.db-cache-clean-wait", "20");
......@@ -67,6 +69,11 @@ public final class ApplicationProperties extends PropertiesConfiguration {
private static volatile Configuration instance = null;
public enum AtlasRunMode {
PROD,
DEV
}
private ApplicationProperties(URL url) throws ConfigurationException {
super(url);
}
......@@ -263,6 +270,8 @@ public final class ApplicationProperties extends PropertiesConfiguration {
}
private void setDefaults() {
AtlasRunMode runMode = AtlasRunMode.valueOf(getString(ATLAS_RUN_MODE, DEFAULT_ATLAS_RUN_MODE.name()));
// setting value for 'atlas.graphdb.backend' (default = 'janus')
String graphDbBackend = getString(GRAPHDB_BACKEND_CONF);
......@@ -298,6 +307,10 @@ public final class ApplicationProperties extends PropertiesConfiguration {
// set the following if indexing backend is 'solr'
if (indexBackend.equalsIgnoreCase(INDEX_BACKEND_SOLR)) {
LOG.info("Atlas is running in MODE: {}.", runMode.name());
if(runMode == AtlasRunMode.PROD) {
//we do not want these configurations to be over written in Production mode.
clearPropertyDirect(SOLR_WAIT_SEARCHER_CONF);
addPropertyDirect(SOLR_WAIT_SEARCHER_CONF, DEFAULT_SOLR_WAIT_SEARCHER);
LOG.info("Setting solr-wait-searcher property '" + DEFAULT_SOLR_WAIT_SEARCHER + "'");
......@@ -306,6 +319,7 @@ public final class ApplicationProperties extends PropertiesConfiguration {
addPropertyDirect(INDEX_MAP_NAME_CONF, DEFAULT_INDEX_MAP_NAME);
LOG.info("Setting index.search.map-name property '" + DEFAULT_INDEX_MAP_NAME + "'");
}
}
setDbCacheConfDefaults();
}
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.model.discovery;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlRootElement;
import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE;
import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY;
/**
* An instance of an entity - like hive_table, hive_database.
*/
@JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = PUBLIC_ONLY, fieldVisibility = NONE)
@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
@JsonIgnoreProperties(ignoreUnknown = true)
@XmlRootElement
@XmlAccessorType(XmlAccessType.PROPERTY)
public class AtlasAggregationEntry {
private String name;
private long count;
public AtlasAggregationEntry() {
}
public AtlasAggregationEntry(String value, long count) {
this.name = value;
this.count = count;
}
public String getName() {
return name;
}
public long getCount() {
return count;
}
public void setCount(long count) {
this.count = count;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
AtlasAggregationEntry that = (AtlasAggregationEntry) o;
return name.equals(that.name);
}
@Override
public int hashCode() {
return name.hashCode();
}
}
\ No newline at end of file
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.model.discovery;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import java.util.*;
import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE;
import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY;
@JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = PUBLIC_ONLY, fieldVisibility = NONE)
@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
@JsonIgnoreProperties(ignoreUnknown = true)
public class AtlasQuickSearchResult {
private AtlasSearchResult searchResults;
private Map<String, List<AtlasAggregationEntry>> aggregationMetrics;
public AtlasQuickSearchResult() {
}
public AtlasQuickSearchResult(AtlasSearchResult searchResults, Map<String, List<AtlasAggregationEntry>> aggregationMetrics) {
this.searchResults = searchResults;
this.aggregationMetrics = aggregationMetrics;
}
public Map<String, List<AtlasAggregationEntry>> getAggregationMetrics() {
return aggregationMetrics;
}
public void setAggregationMetrics(Map<String, List<AtlasAggregationEntry>> aggregationMetrics) {
this.aggregationMetrics = aggregationMetrics;
}
public AtlasSearchResult getSearchResults() {
return searchResults;
}
public void setSearchResults(AtlasSearchResult searchResults) {
this.searchResults = searchResults;
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.model.discovery;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import java.util.List;
import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE;
import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY;
@JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = PUBLIC_ONLY, fieldVisibility = NONE)
@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
@JsonIgnoreProperties(ignoreUnknown = true)
public class AtlasSuggestionsResult {
private List<String> suggestions;
private String prefixString;
public AtlasSuggestionsResult(String prefixString) {
this.prefixString = prefixString;
}
public List<String> getSuggestions() {
return suggestions;
}
public void setSuggestions(List<String> suggestions) {
this.suggestions = suggestions;
}
public String getPrefixString() {
return prefixString;
}
public void setPrefixString(String prefixString) {
this.prefixString = prefixString;
}
}
......@@ -21,7 +21,9 @@ package org.apache.atlas.discovery;
import org.apache.atlas.SortOrder;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.discovery.AtlasQuickSearchResult;
import org.apache.atlas.model.discovery.AtlasSearchResult;
import org.apache.atlas.model.discovery.AtlasSuggestionsResult;
import org.apache.atlas.model.discovery.SearchParameters;
import org.apache.atlas.model.profile.AtlasUserSavedSearch;
......@@ -139,4 +141,13 @@ public interface AtlasDiscoveryService {
* @throws AtlasBaseException
*/
void deleteSavedSearch(String currentUser, String guid) throws AtlasBaseException;
AtlasQuickSearchResult quickSearchWithParameters(SearchParameters searchParameters) throws AtlasBaseException;
/**
* Should return top 5 suggestion strings for the given prefix.
* @param prefixString the prefix string
* @return top 5 suggestion strings for the given prefix.
*/
AtlasSuggestionsResult getSuggestions(String prefixString);
}
......@@ -26,11 +26,14 @@ import org.apache.atlas.annotation.GraphTransaction;
import org.apache.atlas.authorize.AtlasAuthorizationUtils;
import org.apache.atlas.authorize.AtlasSearchResultScrubRequest;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.discovery.AtlasQuickSearchResult;
import org.apache.atlas.model.discovery.AtlasSearchResult;
import org.apache.atlas.model.discovery.AtlasSearchResult.AtlasFullTextResult;
import org.apache.atlas.model.discovery.AtlasSearchResult.AtlasQueryType;
import org.apache.atlas.model.discovery.AtlasSearchResult.AttributeSearchResult;
import org.apache.atlas.model.discovery.AtlasSuggestionsResult;
import org.apache.atlas.model.discovery.SearchParameters;
import org.apache.atlas.model.discovery.AtlasAggregationEntry;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.profile.AtlasUserSavedSearch;
......@@ -51,7 +54,6 @@ import org.apache.atlas.type.AtlasArrayType;
import org.apache.atlas.type.AtlasBuiltInTypes.AtlasObjectIdType;
import org.apache.atlas.type.AtlasClassificationType;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasMapType;
import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeRegistry;
......@@ -76,9 +78,6 @@ import static org.apache.atlas.AtlasErrorCode.DISCOVERY_QUERY_FAILED;
import static org.apache.atlas.AtlasErrorCode.UNKNOWN_TYPENAME;
import static org.apache.atlas.SortOrder.ASCENDING;
import static org.apache.atlas.SortOrder.DESCENDING;
import static org.apache.atlas.model.TypeCategory.ARRAY;
import static org.apache.atlas.model.TypeCategory.MAP;
import static org.apache.atlas.model.TypeCategory.OBJECT_ID_TYPE;
import static org.apache.atlas.model.instance.AtlasEntity.Status.ACTIVE;
import static org.apache.atlas.model.instance.AtlasEntity.Status.DELETED;
import static org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery.*;
......@@ -99,6 +98,7 @@ public class EntityDiscoveryService implements AtlasDiscoveryService {
private final int maxTagsLengthInIdxQuery;
private final String indexSearchPrefix;
private final UserProfileService userProfileService;
private final SuggestionsProvider suggestionsProvider;
@Inject
EntityDiscoveryService(AtlasTypeRegistry typeRegistry,
......@@ -115,6 +115,7 @@ public class EntityDiscoveryService implements AtlasDiscoveryService {
this.maxTagsLengthInIdxQuery = ApplicationProperties.get().getInt(Constants.INDEX_SEARCH_TAGS_MAX_QUERY_STR_LENGTH, 512);
this.indexSearchPrefix = AtlasGraphUtilsV2.getIndexSearchPrefix();
this.userProfileService = userProfileService;
this.suggestionsProvider = new SuggestionsProviderImpl(graph);
}
@Override
......@@ -420,17 +421,49 @@ public class EntityDiscoveryService implements AtlasDiscoveryService {
@Override
@GraphTransaction
public AtlasQuickSearchResult quickSearchWithParameters(SearchParameters searchParameters) throws AtlasBaseException {
SearchContext searchContext = new SearchContext(searchParameters, typeRegistry, graph, indexer.getVertexIndexKeys());
if(LOG.isDebugEnabled()) {
LOG.debug("Generating the search results for the query {} .", searchContext.getSearchParameters().getQuery());
}
AtlasSearchResult searchResult = searchWithSearchContext(searchContext);
if(LOG.isDebugEnabled()) {
LOG.debug("Generating the aggregated metrics for the query {} .", searchContext.getSearchParameters().getQuery());
}
SearchAggregator searchAggregator = new SearchAggregatorImpl(searchContext);
Map<String, List<AtlasAggregationEntry>> aggregatedMetrics = searchAggregator.getAggregatedMetrics();
AtlasQuickSearchResult ret = new AtlasQuickSearchResult(searchResult, aggregatedMetrics);
return ret;
}
@Override
@GraphTransaction
public AtlasSuggestionsResult getSuggestions(String prefixString) {
return suggestionsProvider.getSuggestions(prefixString);
}
@Override
@GraphTransaction
public AtlasSearchResult searchWithParameters(SearchParameters searchParameters) throws AtlasBaseException {
return searchWithSearchContext(new SearchContext(searchParameters, typeRegistry, graph, indexer.getVertexIndexKeys()));
}
private AtlasSearchResult searchWithSearchContext(SearchContext searchContext) throws AtlasBaseException {
SearchParameters searchParameters = searchContext.getSearchParameters();
AtlasSearchResult ret = new AtlasSearchResult(searchParameters);
final QueryParams params = QueryParams.getNormalizedParams(searchParameters.getLimit(),searchParameters.getOffset());
String searchID = searchTracker.add(searchContext); // For future cancellations
searchParameters.setLimit(params.limit());
searchParameters.setOffset(params.offset());
SearchContext context = new SearchContext(searchParameters, typeRegistry, graph, indexer.getVertexIndexKeys());
String searchID = searchTracker.add(context); // For future cancellations
try {
List<AtlasVertex> resultList = context.getSearchProcessor().execute();
List<AtlasVertex> resultList = searchContext.getSearchProcessor().execute();
// By default any attribute that shows up in the search parameter should be sent back in the response
// If additional values are requested then the entityAttributes will be a superset of the all search attributes
......@@ -442,11 +475,11 @@ public class EntityDiscoveryService implements AtlasDiscoveryService {
resultAttributes.addAll(searchParameters.getAttributes());
}
if (CollectionUtils.isNotEmpty(context.getEntityAttributes())) {
resultAttributes.addAll(context.getEntityAttributes());
if (CollectionUtils.isNotEmpty(searchContext.getEntityAttributes())) {
resultAttributes.addAll(searchContext.getEntityAttributes());
}
AtlasEntityType entityType = context.getEntityType();
AtlasEntityType entityType = searchContext.getEntityType();
if (entityType != null) {
for (String resultAttribute : resultAttributes) {
AtlasAttribute attribute = entityType.getAttribute(resultAttribute);
......
......@@ -25,6 +25,7 @@ import org.apache.atlas.repository.graphdb.*;
import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
import org.apache.atlas.utils.AtlasPerfTracer;
import org.apache.commons.lang.StringUtils;
import org.apache.solr.common.params.CommonParams;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -38,7 +39,7 @@ import java.util.*;
public class FreeTextSearchProcessor extends SearchProcessor {
private static final Logger LOG = LoggerFactory.getLogger(FreeTextSearchProcessor.class);
private static final Logger PERF_LOG = AtlasPerfTracer.getPerfLogger("FreeTextSearchProcessor");
public static final String SOLR_QT_PARAMETER = "qt";
public static final String SOLR_QT_PARAMETER = CommonParams.QT;
public static final String SOLR_REQUEST_HANDLER_NAME = "/freetext";
private final AtlasIndexQuery indexQuery;
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.discovery;
import org.apache.atlas.model.discovery.AtlasAggregationEntry;
import java.util.List;
import java.util.Map;
public interface SearchAggregator {
Map<String, List<AtlasAggregationEntry>> getAggregatedMetrics();
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.discovery;
import org.apache.atlas.AtlasException;
import org.apache.atlas.model.discovery.AtlasAggregationEntry;
import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
public class SearchAggregatorImpl implements SearchAggregator {
private static final Logger LOG = LoggerFactory.getLogger(SearchAggregatorImpl.class);
private final SearchContext searchContext;
public SearchAggregatorImpl(SearchContext searchContext) {
this.searchContext = searchContext;
}
public Map<String, List<AtlasAggregationEntry>> getAggregatedMetrics() {
String queryString = searchContext.getSearchParameters().getQuery();
AtlasGraph atlasGraph = searchContext.getGraph();
Set<String> aggregationFields = new HashSet<>();
List<PostProcessor> postProcessors = new ArrayList<>();
aggregationFields.add(Constants.ENTITY_TYPE_PROPERTY_KEY);
aggregationFields.add(Constants.ASSET_OWNER_PROPERTY_KEY);
postProcessors.add(new ServiceTypeAggregator(searchContext.getTypeRegistry()));
try {
Map<String, List<AtlasAggregationEntry>> aggregatedMetrics = atlasGraph.getGraphIndexClient().getAggregatedMetrics(queryString, aggregationFields);
Set<String> aggregationMetricNames = aggregatedMetrics.keySet();
for(String aggregationMetricName: aggregationMetricNames) {
for(PostProcessor postProcessor: postProcessors) {
if(postProcessor.needsProcessing(aggregationMetricName)) {
postProcessor.prepareForMetric(aggregationMetricName);
for(AtlasAggregationEntry aggregationEntry: aggregatedMetrics.get(aggregationMetricName)) {
postProcessor.process(aggregationEntry);
}
postProcessor.handleMetricCompletion(aggregationMetricName);
}
}
}
for(PostProcessor postProcessor: postProcessors) {
postProcessor.handleCompletion(aggregatedMetrics);
}
// remove entries with 0 counts
for (List<AtlasAggregationEntry> entries : aggregatedMetrics.values()) {
for (ListIterator<AtlasAggregationEntry> iter = entries.listIterator(); iter.hasNext(); ) {
AtlasAggregationEntry entry = iter.next();
if (entry.getCount() <= 0) {
iter.remove();
}
}
}
return aggregatedMetrics;
} catch (AtlasException e) {
LOG.error("Error encountered in post processing stage of aggrgation metrics collection. Empty metrics will be returned.", e);
return Collections.EMPTY_MAP;
}
}
static interface PostProcessor {
boolean needsProcessing(String metricName);
void prepareForMetric(String metricName);
void process(AtlasAggregationEntry aggregationEntry);
void handleMetricCompletion(String metricName);
void handleCompletion(Map<String, List<AtlasAggregationEntry>> aggregatedMetrics);
}
static class ServiceTypeAggregator implements PostProcessor {
private static final String SERVICE_TYPE = "ServiceType";
private final AtlasTypeRegistry typeRegistry;
private List<AtlasAggregationEntry> entries;
private Map<String, AtlasAggregationEntry> entityType2MetricsMap;
public ServiceTypeAggregator(AtlasTypeRegistry typeRegistry) {
this.typeRegistry = typeRegistry;
}
@Override
public boolean needsProcessing(String metricName) {
return Constants.ENTITY_TYPE_PROPERTY_KEY.equals(metricName);
}
@Override
public void prepareForMetric(String metricName) {
Map<String, AtlasAggregationEntry> serviceName2MetricsMap = new HashMap<>();
entries = new ArrayList<>();
//prepare the service map to aggregations
for(String serviceName: typeRegistry.getAllServiceTypes()) {
AtlasAggregationEntry serviceMetrics = new AtlasAggregationEntry(serviceName, 0);
serviceName2MetricsMap.put(serviceName, serviceMetrics);
entries.add(serviceMetrics);
}
//prepare the map from entity type to aggregations
entityType2MetricsMap = new HashMap<>();
for(AtlasEntityType entityType: typeRegistry.getAllEntityTypes()) {
String serviceName = entityType.getServiceType();
entityType2MetricsMap.put(entityType.getTypeName(), serviceName2MetricsMap.get(serviceName));
}
}
@Override
public void process(AtlasAggregationEntry aggregationEntryForType) {
String entityType = aggregationEntryForType.getName();
AtlasAggregationEntry atlasAggregationEntryForService = entityType2MetricsMap.get(entityType);
//atlasAggregationEntryForService can be null--classifications for e.g.
if (atlasAggregationEntryForService != null) {
atlasAggregationEntryForService.setCount(atlasAggregationEntryForService.getCount() + aggregationEntryForType.getCount());
}
}
@Override
public void handleMetricCompletion(String metricName) {
//do nothing
}
@Override
public void handleCompletion(Map<String, List<AtlasAggregationEntry>> aggregatedMetrics) {
aggregatedMetrics.put(SERVICE_TYPE, entries);
}
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.discovery;
import org.apache.atlas.model.discovery.AtlasSuggestionsResult;
public interface SuggestionsProvider {
AtlasSuggestionsResult getSuggestions(String prefixString);
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.discovery;
import org.apache.atlas.AtlasException;
import org.apache.atlas.model.discovery.AtlasSuggestionsResult;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasGraphIndexClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections;
public class SuggestionsProviderImpl implements SuggestionsProvider {
private static final Logger LOG = LoggerFactory.getLogger(SuggestionsProviderImpl.class);
private final AtlasGraph graph;
public SuggestionsProviderImpl(AtlasGraph graph) {
this.graph = graph;
}
@Override
public AtlasSuggestionsResult getSuggestions(String prefixString) {
AtlasSuggestionsResult result = new AtlasSuggestionsResult(prefixString);
try {
AtlasGraphIndexClient graphIndexClient = graph.getGraphIndexClient();
result.setSuggestions(graphIndexClient.getSuggestions(prefixString));
} catch (AtlasException e) {
LOG.error("Error encountered in performing quick suggestions. Will return no suggestions.", e);
result.setSuggestions(Collections.EMPTY_LIST);
}
return result;
}
}
......@@ -30,9 +30,7 @@ import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.*;
import static org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef.DEFAULT_SEARCHWEIGHT;
import static org.apache.atlas.repository.Constants.CLASSIFICATION_TEXT_KEY;
......@@ -50,6 +48,8 @@ public class SolrIndexHelper implements IndexChangeListener {
public static final int SEARCHWEIGHT_FOR_CLASSIFICATIONS = 10;
public static final int SEARCHWEIGHT_FOR_TYPENAME = 1;
private static final int MIN_SEARCH_WEIGHT_FOR_SUGGESTIONS = 8;
private final AtlasTypeRegistry typeRegistry;
......@@ -67,40 +67,60 @@ public class SolrIndexHelper implements IndexChangeListener {
try {
AtlasGraph atlasGraph = AtlasGraphProvider.getGraphInstance();
AtlasGraphIndexClient atlasGraphIndexClient = atlasGraph.getGraphIndexClient();
Map<String, Integer> attributeName2SearchWeightMap = getAttributesWithSearchWeights();
Map<String, Integer> propertyName2SearchWeightMap = gePropertiesWithSearchWeights();
atlasGraphIndexClient.applySearchWeight(Constants.VERTEX_INDEX, propertyName2SearchWeightMap);
atlasGraphIndexClient.applySuggestionFields(Constants.VERTEX_INDEX, getPropertiesForSuggestions(propertyName2SearchWeightMap));
atlasGraphIndexClient.applySearchWeight(Constants.VERTEX_INDEX, attributeName2SearchWeightMap);
} catch (AtlasException e) {
LOG.error("Error encountered in handling type system change notification.", e);
throw new RuntimeException("Error encountered in handling type system change notification.", e);
}
}
private Map<String, Integer> getAttributesWithSearchWeights() {
Map<String, Integer> attributesWithSearchWeights = new HashMap<>();
private List<String> getPropertiesForSuggestions(Map<String, Integer> propertyName2SearchWeightMap) {
List<String> propertiesForSuggestions = new ArrayList<>();
for(Map.Entry<String, Integer> entry: propertyName2SearchWeightMap.entrySet()) {
if(entry.getValue().intValue() >= MIN_SEARCH_WEIGHT_FOR_SUGGESTIONS) {
String propertyName = entry.getKey();
if (LOG.isDebugEnabled()) {
LOG.debug("Adding the property {} for suggestions.", propertyName);
}
propertiesForSuggestions.add(propertyName);
}
}
return propertiesForSuggestions;
}
private Map<String, Integer> gePropertiesWithSearchWeights() {
Map<String, Integer> propertiesWithSearchWeights = new HashMap<>();
Collection<AtlasEntityDef> allEntityDefs = typeRegistry.getAllEntityDefs();
attributesWithSearchWeights.put(CLASSIFICATION_TEXT_KEY, SEARCHWEIGHT_FOR_CLASSIFICATIONS);
attributesWithSearchWeights.put(TYPE_NAME_PROPERTY_KEY, SEARCHWEIGHT_FOR_TYPENAME);
propertiesWithSearchWeights.put(CLASSIFICATION_TEXT_KEY, SEARCHWEIGHT_FOR_CLASSIFICATIONS);
propertiesWithSearchWeights.put(TYPE_NAME_PROPERTY_KEY, SEARCHWEIGHT_FOR_TYPENAME);
if (CollectionUtils.isNotEmpty(allEntityDefs)) {
for (AtlasEntityDef entityDef : allEntityDefs) {
processEntity(attributesWithSearchWeights, entityDef);
processEntity(propertiesWithSearchWeights, entityDef);
}
}
return attributesWithSearchWeights;
return propertiesWithSearchWeights;
}
private void processEntity(Map<String, Integer> attributesWithSearchWeights, AtlasEntityDef entityDef) {
private void processEntity(Map<String, Integer> propertiesWithSearchWeights, AtlasEntityDef entityDef) {
for (AtlasAttributeDef attributeDef : entityDef.getAttributeDefs()) {
processAttributeDefinition(attributesWithSearchWeights, entityDef, attributeDef);
processAttributeDefinition(propertiesWithSearchWeights, entityDef, attributeDef);
}
}
private void processAttributeDefinition(Map<String, Integer> attributesWithSearchWeights, AtlasEntityDef entityDef, AtlasAttributeDef attributeDef) {
private void processAttributeDefinition(Map<String, Integer> propertiesWithSearchWeights, AtlasEntityDef entityDef, AtlasAttributeDef attributeDef) {
if (GraphBackedSearchIndexer.isStringAttribute(attributeDef)) {
final String attributeName = GraphBackedSearchIndexer.getEncodedPropertyName(entityDef.getName(), attributeDef);
final String propertyName = GraphBackedSearchIndexer.getEncodedPropertyName(entityDef.getName(), attributeDef);
int searchWeight = attributeDef.getSearchWeight();
if (searchWeight == DEFAULT_SEARCHWEIGHT) {
......@@ -108,16 +128,16 @@ public class SolrIndexHelper implements IndexChangeListener {
//this will make the string data searchable like in FullTextIndex Searcher using Free Text searcher.
searchWeight = DEFAULT_SEARCHWEIGHT_FOR_STRINGS;
} else if (!GraphBackedSearchIndexer.isValidSearchWeight(searchWeight)) { //validate the value provided in the model.
LOG.warn("Invalid search weight {} for attribute {}.{}. Will use default {}", searchWeight, entityDef.getName(), attributeName, DEFAULT_SEARCHWEIGHT_FOR_STRINGS);
LOG.warn("Invalid search weight {} for attribute {}.{}. Will use default {}", searchWeight, entityDef.getName(), propertyName, DEFAULT_SEARCHWEIGHT_FOR_STRINGS);
searchWeight = DEFAULT_SEARCHWEIGHT_FOR_STRINGS;
}
if (LOG.isDebugEnabled()) {
LOG.debug("Applying search weight {} for attribute {}.{}", searchWeight, entityDef.getName(), attributeName);
LOG.debug("Applying search weight {} for attribute {}.{}", searchWeight, entityDef.getName(), propertyName);
}
attributesWithSearchWeights.put(attributeName, searchWeight);
propertiesWithSearchWeights.put(propertyName, searchWeight);
}
}
}
\ No newline at end of file
......@@ -22,7 +22,9 @@ import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.SortOrder;
import org.apache.atlas.discovery.AtlasDiscoveryService;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.discovery.AtlasQuickSearchResult;
import org.apache.atlas.model.discovery.AtlasSearchResult;
import org.apache.atlas.model.discovery.AtlasSuggestionsResult;
import org.apache.atlas.model.discovery.SearchParameters;
import org.apache.atlas.model.discovery.SearchParameters.FilterCriteria;
import org.apache.atlas.model.profile.AtlasUserSavedSearch;
......@@ -559,6 +561,60 @@ public class DiscoveryREST {
}
}
/**
* Attribute based search for entities satisfying the search parameters
*@return Atlas search result
* @throws AtlasBaseException
* @HTTP 200 On successful search
* @HTTP 400 Tag/Entity doesn't exist or Tag/entity filter is present without tag/type name
*/
@Path("/quick")
@GET
public AtlasQuickSearchResult searchUsingFreeText(@QueryParam("query") String query,
@QueryParam("excludeDeletedEntities") boolean excludeDeletedEntities,
@QueryParam("limit") int limit,
@QueryParam("offset") int offset) throws AtlasBaseException {
if (StringUtils.isNotEmpty(query) && query.length() > maxFullTextQueryLength) {
throw new AtlasBaseException(AtlasErrorCode.INVALID_QUERY_LENGTH, Constants.MAX_FULLTEXT_QUERY_STR_LENGTH);
}
AtlasPerfTracer perf = null;
try {
if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "DiscoveryREST.quick(" + query + "," +
"excludeDeletedEntities:" + excludeDeletedEntities + "," + limit + "," + offset + ")");
}
SearchParameters searchParameters = new SearchParameters();
searchParameters.setQuery(query);
searchParameters.setExcludeDeletedEntities(excludeDeletedEntities);
searchParameters.setLimit(limit);
searchParameters.setOffset(offset);
return atlasDiscoveryService.quickSearchWithParameters(searchParameters);
} finally {
AtlasPerfTracer.log(perf);
}
}
@Path("suggestions")
@GET
public AtlasSuggestionsResult getSuggestions(@QueryParam("prefixString") String prefixString) throws AtlasBaseException {
AtlasPerfTracer perf = null;
try {
if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "DiscoveryREST.getSuggestions(" + prefixString + ")");
}
return atlasDiscoveryService.getSuggestions(prefixString);
} finally {
AtlasPerfTracer.log(perf);
}
}
private boolean isEmpty(SearchParameters.FilterCriteria filterCriteria) {
return filterCriteria == null ||
(StringUtils.isEmpty(filterCriteria.getAttributeName()) && CollectionUtils.isEmpty(filterCriteria.getCriterion()));
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment