Commit b9481410 by Madhan Neethiraj

ATLAS-3262: disabled full-text index population when free-text is enabled i.e. for Solr index store

parent 40081d48
...@@ -196,7 +196,9 @@ public class AtlasJanusGraph implements AtlasGraph<AtlasJanusVertex, AtlasJanusE ...@@ -196,7 +196,9 @@ public class AtlasJanusGraph implements AtlasGraph<AtlasJanusVertex, AtlasJanusE
@Override @Override
public AtlasGraphIndexClient getGraphIndexClient() throws AtlasException { public AtlasGraphIndexClient getGraphIndexClient() throws AtlasException {
try { try {
return new AtlasJanusGraphSolrIndexClient(this); initApplicationProperties();
return new AtlasJanusGraphIndexClient(this, APPLICATION_PROPERTIES);
} catch (Exception e) { } catch (Exception e) {
LOG.error("Error encountered in creating Graph Index Client.", e); LOG.error("Error encountered in creating Graph Index Client.", e);
throw new AtlasException(e); throw new AtlasException(e);
......
...@@ -83,6 +83,10 @@ public class AtlasJanusGraphDatabase implements GraphDatabase<AtlasJanusVertex, ...@@ -83,6 +83,10 @@ public class AtlasJanusGraphDatabase implements GraphDatabase<AtlasJanusVertex,
Configuration configProperties = ApplicationProperties.get(); Configuration configProperties = ApplicationProperties.get();
if (isEmbeddedSolr()) { // AtlasJanusGraphIndexClient.performRequestHandlerAction() fails for embedded-solr; disable freetext until this issue is resolved
configProperties.setProperty(ApplicationProperties.ENABLE_FREETEXT_SEARCH_CONF, false);
}
configProperties.setProperty(SOLR_ZOOKEEPER_URLS, configProperties.getStringArray(SOLR_ZOOKEEPER_URL)); configProperties.setProperty(SOLR_ZOOKEEPER_URLS, configProperties.getStringArray(SOLR_ZOOKEEPER_URL));
Configuration janusConfig = ApplicationProperties.getSubsetConfiguration(configProperties, GRAPH_PREFIX); Configuration janusConfig = ApplicationProperties.getSubsetConfiguration(configProperties, GRAPH_PREFIX);
......
...@@ -22,6 +22,7 @@ import org.apache.atlas.repository.graphdb.AtlasGraph; ...@@ -22,6 +22,7 @@ import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasGraphIndexClient; import org.apache.atlas.repository.graphdb.AtlasGraphIndexClient;
import org.apache.atlas.repository.graphdb.AtlasGraphManagement; import org.apache.atlas.repository.graphdb.AtlasGraphManagement;
import org.apache.atlas.repository.graphdb.AtlasPropertyKey; import org.apache.atlas.repository.graphdb.AtlasPropertyKey;
import org.apache.commons.configuration.Configuration;
import org.apache.solr.client.solrj.SolrClient; import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrRequest; import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServerException; import org.apache.solr.client.solrj.SolrServerException;
...@@ -35,67 +36,98 @@ import java.util.*; ...@@ -35,67 +36,98 @@ import java.util.*;
import static org.apache.atlas.repository.Constants.FREETEXT_REQUEST_HANDLER; import static org.apache.atlas.repository.Constants.FREETEXT_REQUEST_HANDLER;
public class AtlasJanusGraphSolrIndexClient implements AtlasGraphIndexClient { public class AtlasJanusGraphIndexClient implements AtlasGraphIndexClient {
private static final Logger LOG = LoggerFactory.getLogger(AtlasJanusGraphSolrIndexClient.class); private static final Logger LOG = LoggerFactory.getLogger(AtlasJanusGraphIndexClient.class);
private final SolrClient solrClient;
private final AtlasGraph graph; private final AtlasGraph graph;
private final Configuration configuration;
public AtlasJanusGraphSolrIndexClient(AtlasGraph graph) { public AtlasJanusGraphIndexClient(AtlasGraph graph, Configuration configuration) {
// get solr client using same settings as that of Janus Graph
this.solrClient = Solr6Index.getSolrClient();
this.graph = graph; this.graph = graph;
this.configuration = configuration;
if(solrClient == null) {
LOG.warn("Non SOLR index stores are not supported yet.");
}
} }
@Override @Override
public void applySearchWeight(String collectionName, Map<String, Integer> attributeName2SearchWeightMap) { public void applySearchWeight(String collectionName, Map<String, Integer> attributeName2SearchWeightMap) {
SolrClient solrClient = null;
try {
solrClient = Solr6Index.getSolrClient(); // get solr client using same settings as that of Janus Graph
if (solrClient == null) {
LOG.warn("AtlasJanusGraphIndexClient.applySearchWeight(): Non SOLR index stores are not supported yet.");
return;
}
//1) try updating request handler //1) try updating request handler
//2) if update fails, try creating request handler //2) if update fails, try creating request handler
int maxAttempts = configuration != null ? configuration.getInt("index.client.apply.search.weight.max.attempts", 3) : 3;
int retryWaitIntervalMs = configuration != null ? configuration.getInt("index.client.apply.search.weight.retry.interval.ms", 1000) : 1000;
Throwable lastExcp = null;
for (int i = 0; i < maxAttempts; i++) {
if (i > 0) {
LOG.warn("Attempt #{} failed! Waiting for {}ms before retry", i, retryWaitIntervalMs);
try {
Thread.sleep(retryWaitIntervalMs);
} catch (Exception excp) {
// ignore
}
}
try { try {
LOG.info("Attempting to update free text request handler {} for collection {}", FREETEXT_REQUEST_HANDLER, collectionName); LOG.info("Attempting to update free text request handler {} for collection {}", FREETEXT_REQUEST_HANDLER, collectionName);
updateSearchWeights(collectionName, attributeName2SearchWeightMap); updateFreeTextRequestHandler(solrClient, collectionName, attributeName2SearchWeightMap);
LOG.info("Successfully updated free text request handler {} for collection {}..", FREETEXT_REQUEST_HANDLER, collectionName); LOG.info("Successfully updated free text request handler {} for collection {}..", FREETEXT_REQUEST_HANDLER, collectionName);
return; return;
} catch (Throwable t) { } catch (Throwable t) {
LOG.warn("Error encountered in updating request handler {} for collection {}. Attempting to create one", FREETEXT_REQUEST_HANDLER, collectionName, t); lastExcp = t;
LOG.warn("Error encountered in updating request handler {} for collection {}. Will attempt to create one", FREETEXT_REQUEST_HANDLER, collectionName);
} }
try { try {
LOG.info("Attempting to create free text request handler {} for collection {}", FREETEXT_REQUEST_HANDLER, collectionName); LOG.info("Attempting to create free text request handler {} for collection {}", FREETEXT_REQUEST_HANDLER, collectionName);
createFreeTextRequestHandler(collectionName, attributeName2SearchWeightMap); createFreeTextRequestHandler(solrClient, collectionName, attributeName2SearchWeightMap);
LOG.info("Successfully created free text request handler {} for collection {}", FREETEXT_REQUEST_HANDLER, collectionName); LOG.info("Successfully created free text request handler {} for collection {}", FREETEXT_REQUEST_HANDLER, collectionName);
} catch (Throwable t) {
String msg = String.format("Error encountered in creating the request handler '%s' for collection '%s'.", FREETEXT_REQUEST_HANDLER, collectionName);
LOG.error(msg, t); return;
} catch (Throwable t) {
lastExcp = t;
throw new RuntimeException(msg, t); LOG.warn("Error encountered in creating request handler {} for collection {}", FREETEXT_REQUEST_HANDLER, collectionName, t);
} }
} }
private void updateSearchWeights(String collectionName, Map<String, Integer> attributeName2SearchWeightMap) { String msg = String.format("Error encountered in creating/updating request handler %s for collection %s", FREETEXT_REQUEST_HANDLER, collectionName);
try {
updateFreeTextRequestHandler(collectionName, attributeName2SearchWeightMap);
} catch (Throwable t) {
String msg = String.format("Error encountered in updating the request handler '%s' for collection '%s'", FREETEXT_REQUEST_HANDLER, collectionName);
LOG.error(msg, t); throw lastExcp != null ? new RuntimeException(msg, lastExcp) : new RuntimeException(msg);
} finally {
Solr6Index.releaseSolrClient(solrClient);
}
}
private void updateFreeTextRequestHandler(SolrClient solrClient, String collectionName, Map<String, Integer> attributeName2SearchWeightMap) throws IOException, SolrServerException {
String searchWeightString = generateSearchWeightString(graph.getManagementSystem(), collectionName, attributeName2SearchWeightMap);
String payLoadString = generatePayLoadForFreeText("update-requesthandler", FREETEXT_REQUEST_HANDLER, searchWeightString);
throw new RuntimeException(msg, t); performRequestHandlerAction(collectionName, solrClient, payLoadString);
} }
LOG.info("Updated free text request handler for collection {}.", collectionName); private void createFreeTextRequestHandler(SolrClient solrClient, String collectionName, Map<String, Integer> attributeName2SearchWeightMap) throws IOException, SolrServerException {
String searchWeightString = generateSearchWeightString(graph.getManagementSystem(), collectionName, attributeName2SearchWeightMap);
String payLoadString = generatePayLoadForFreeText("create-requesthandler", FREETEXT_REQUEST_HANDLER, searchWeightString);
performRequestHandlerAction(collectionName, solrClient, payLoadString);
} }
private String generateSearchWeightString(AtlasGraphManagement management, String indexName, Map<String, Integer> searchWeightsMap) { private String generateSearchWeightString(AtlasGraphManagement management, String indexName, Map<String, Integer> searchWeightsMap) {
...@@ -116,20 +148,6 @@ public class AtlasJanusGraphSolrIndexClient implements AtlasGraphIndexClient { ...@@ -116,20 +148,6 @@ public class AtlasJanusGraphSolrIndexClient implements AtlasGraphIndexClient {
return searchWeightBuilder.toString(); return searchWeightBuilder.toString();
} }
private void updateFreeTextRequestHandler(String collectionName, Map<String, Integer> attributeName2SearchWeightMap) throws IOException, SolrServerException {
String searchWeightString = generateSearchWeightString(graph.getManagementSystem(), collectionName, attributeName2SearchWeightMap);
String payLoadString = generatePayLoadForFreeText("update-requesthandler", FREETEXT_REQUEST_HANDLER, searchWeightString);
performRequestHandlerAction(collectionName, solrClient, payLoadString);
}
private void createFreeTextRequestHandler(String collectionName, Map<String, Integer> attributeName2SearchWeightMap) throws IOException, SolrServerException {
String searchWeightString = generateSearchWeightString(graph.getManagementSystem(), collectionName, attributeName2SearchWeightMap);
String payLoadString = generatePayLoadForFreeText("create-requesthandler", FREETEXT_REQUEST_HANDLER, searchWeightString);
performRequestHandlerAction(collectionName, solrClient, payLoadString);
}
@VisibleForTesting @VisibleForTesting
static String generatePayLoadForFreeText(String action, String handlerName, String qfValue) { static String generatePayLoadForFreeText(String action, String handlerName, String qfValue) {
return String.format("{" + return String.format("{" +
......
...@@ -137,7 +137,8 @@ public class Solr6Index implements IndexProvider { ...@@ -137,7 +137,8 @@ public class Solr6Index implements IndexProvider {
private static final String DEFAULT_ID_FIELD = "id"; private static final String DEFAULT_ID_FIELD = "id";
private static final char CHROOT_START_CHAR = '/'; private static final char CHROOT_START_CHAR = '/';
private static Configuration config;
private static Solr6Index instance = null;
private enum Mode { private enum Mode {
HTTP, CLOUD; HTTP, CLOUD;
...@@ -181,7 +182,7 @@ public class Solr6Index implements IndexProvider { ...@@ -181,7 +182,7 @@ public class Solr6Index implements IndexProvider {
public Solr6Index(final Configuration config) throws BackendException { public Solr6Index(final Configuration config) throws BackendException {
// Add Kerberos-enabled SolrHttpClientBuilder // Add Kerberos-enabled SolrHttpClientBuilder
HttpClientUtil.setHttpClientBuilder(new Krb5HttpClientBuilder().getBuilder()); HttpClientUtil.setHttpClientBuilder(new Krb5HttpClientBuilder().getBuilder());
initConfiguration(config);
Preconditions.checkArgument(config!=null); Preconditions.checkArgument(config!=null);
configuration = config; configuration = config;
mode = Mode.parse(config.get(SOLR_MODE)); mode = Mode.parse(config.get(SOLR_MODE));
...@@ -200,34 +201,39 @@ public class Solr6Index implements IndexProvider { ...@@ -200,34 +201,39 @@ public class Solr6Index implements IndexProvider {
logger.debug("KERBEROS_ENABLED name is " + KERBEROS_ENABLED.getName() + " and it is" + (KERBEROS_ENABLED.isOption() ? " " : " not") + " an option."); logger.debug("KERBEROS_ENABLED name is " + KERBEROS_ENABLED.getName() + " and it is" + (KERBEROS_ENABLED.isOption() ? " " : " not") + " an option.");
logger.debug("KERBEROS_ENABLED type is " + KERBEROS_ENABLED.getType().name()); logger.debug("KERBEROS_ENABLED type is " + KERBEROS_ENABLED.getType().name());
} }
solrClient = getSolrClient();
} solrClient = createSolrClient();
private static void initConfiguration(Configuration config) { Solr6Index.instance = this;
if(Solr6Index.config == null) {
Solr6Index.config = config;
}
} }
public static SolrClient getSolrClient() { public static SolrClient getSolrClient() {
return Solr6Index.instance != null ? Solr6Index.instance.createSolrClient() : null;
}
public static void releaseSolrClient(SolrClient solrClient) {
if (solrClient != null) {
try {
solrClient.close();
} catch (IOException excp) {
logger.warn("Failed to close SolrClient", excp);
}
}
}
private SolrClient createSolrClient() {
final ModifiableSolrParams clientParams = new ModifiableSolrParams(); final ModifiableSolrParams clientParams = new ModifiableSolrParams();
SolrClient solrClient = null; SolrClient solrClient = null;
if(Solr6Index.config == null) {
logger.error("The solr client is not being used for the indexing purposes."); Mode mode = Mode.parse(configuration.get(SOLR_MODE));
return null;
}
Configuration config = Solr6Index.config;
Mode mode = Mode.parse(config.get(SOLR_MODE));
switch (mode) { switch (mode) {
case CLOUD: case CLOUD:
final CloudSolrClient cloudServer = new CloudSolrClient.Builder() final CloudSolrClient cloudServer = new CloudSolrClient.Builder()
.withLBHttpSolrClientBuilder( .withLBHttpSolrClientBuilder(
new LBHttpSolrClient.Builder() new LBHttpSolrClient.Builder()
.withHttpSolrClientBuilder(new HttpSolrClient.Builder().withInvariantParams(clientParams)) .withHttpSolrClientBuilder(new HttpSolrClient.Builder().withInvariantParams(clientParams))
.withBaseSolrUrls(config.get(HTTP_URLS)) .withBaseSolrUrls(configuration.get(HTTP_URLS))
) )
.withZkHost(getZookeeperURLs(config)) .withZkHost(getZookeeperURLs(configuration))
.sendUpdatesOnlyToShardLeaders() .sendUpdatesOnlyToShardLeaders()
.build(); .build();
cloudServer.connect(); cloudServer.connect();
...@@ -235,14 +241,14 @@ public class Solr6Index implements IndexProvider { ...@@ -235,14 +241,14 @@ public class Solr6Index implements IndexProvider {
logger.info("Created solr client using Cloud based configuration."); logger.info("Created solr client using Cloud based configuration.");
break; break;
case HTTP: case HTTP:
clientParams.add(HttpClientUtil.PROP_ALLOW_COMPRESSION, config.get(HTTP_ALLOW_COMPRESSION).toString()); clientParams.add(HttpClientUtil.PROP_ALLOW_COMPRESSION, configuration.get(HTTP_ALLOW_COMPRESSION).toString());
clientParams.add(HttpClientUtil.PROP_CONNECTION_TIMEOUT, config.get(HTTP_CONNECTION_TIMEOUT).toString()); clientParams.add(HttpClientUtil.PROP_CONNECTION_TIMEOUT, configuration.get(HTTP_CONNECTION_TIMEOUT).toString());
clientParams.add(HttpClientUtil.PROP_MAX_CONNECTIONS_PER_HOST, config.get(HTTP_MAX_CONNECTIONS_PER_HOST).toString()); clientParams.add(HttpClientUtil.PROP_MAX_CONNECTIONS_PER_HOST, configuration.get(HTTP_MAX_CONNECTIONS_PER_HOST).toString());
clientParams.add(HttpClientUtil.PROP_MAX_CONNECTIONS, config.get(HTTP_GLOBAL_MAX_CONNECTIONS).toString()); clientParams.add(HttpClientUtil.PROP_MAX_CONNECTIONS, configuration.get(HTTP_GLOBAL_MAX_CONNECTIONS).toString());
final HttpClient client = HttpClientUtil.createClient(clientParams); final HttpClient client = HttpClientUtil.createClient(clientParams);
solrClient = new LBHttpSolrClient.Builder() solrClient = new LBHttpSolrClient.Builder()
.withHttpClient(client) .withHttpClient(client)
.withBaseSolrUrls(config.get(HTTP_URLS)) .withBaseSolrUrls(configuration.get(HTTP_URLS))
.build(); .build();
logger.info("Created solr client using HTTP based configuration."); logger.info("Created solr client using HTTP based configuration.");
break; break;
......
...@@ -48,8 +48,9 @@ public final class ApplicationProperties extends PropertiesConfiguration { ...@@ -48,8 +48,9 @@ public final class ApplicationProperties extends PropertiesConfiguration {
public static final String STORAGE_BACKEND_CONF = "atlas.graph.storage.backend"; public static final String STORAGE_BACKEND_CONF = "atlas.graph.storage.backend";
public static final String INDEX_BACKEND_CONF = "atlas.graph.index.search.backend"; public static final String INDEX_BACKEND_CONF = "atlas.graph.index.search.backend";
public static final String INDEX_MAP_NAME_CONF = "atlas.graph.index.search.map-name"; public static final String INDEX_MAP_NAME_CONF = "atlas.graph.index.search.map-name";
public static final String FREE_TEXT_INDEX_ENABLED = "atlas.freetext.index.enabled";
public static final String SOLR_WAIT_SEARCHER_CONF = "atlas.graph.index.search.solr.wait-searcher"; public static final String SOLR_WAIT_SEARCHER_CONF = "atlas.graph.index.search.solr.wait-searcher";
public static final String ENABLE_FULLTEXT_SEARCH_CONF = "atlas.search.fulltext.enable";
public static final String ENABLE_FREETEXT_SEARCH_CONF = "atlas.search.freetext.enable";
public static final String GRAPHBD_BACKEND_JANUS = "janus"; public static final String GRAPHBD_BACKEND_JANUS = "janus";
public static final String STORAGE_BACKEND_HBASE = "hbase"; public static final String STORAGE_BACKEND_HBASE = "hbase";
public static final String STORAGE_BACKEND_HBASE2 = "hbase2"; public static final String STORAGE_BACKEND_HBASE2 = "hbase2";
...@@ -57,7 +58,6 @@ public final class ApplicationProperties extends PropertiesConfiguration { ...@@ -57,7 +58,6 @@ public final class ApplicationProperties extends PropertiesConfiguration {
public static final String DEFAULT_GRAPHDB_BACKEND = GRAPHBD_BACKEND_JANUS; public static final String DEFAULT_GRAPHDB_BACKEND = GRAPHBD_BACKEND_JANUS;
public static final boolean DEFAULT_SOLR_WAIT_SEARCHER = true; public static final boolean DEFAULT_SOLR_WAIT_SEARCHER = true;
public static final boolean DEFAULT_INDEX_MAP_NAME = false; public static final boolean DEFAULT_INDEX_MAP_NAME = false;
public static final boolean DEFAULT_FREE_TEXT_INDEX_ENABLED = false;
public static final SimpleEntry<String, String> DB_CACHE_CONF = new SimpleEntry<>("atlas.graph.cache.db-cache", "true"); public static final SimpleEntry<String, String> DB_CACHE_CONF = new SimpleEntry<>("atlas.graph.cache.db-cache", "true");
public static final SimpleEntry<String, String> DB_CACHE_CLEAN_WAIT_CONF = new SimpleEntry<>("atlas.graph.cache.db-cache-clean-wait", "20"); public static final SimpleEntry<String, String> DB_CACHE_CLEAN_WAIT_CONF = new SimpleEntry<>("atlas.graph.cache.db-cache-clean-wait", "20");
......
...@@ -18,35 +18,37 @@ ...@@ -18,35 +18,37 @@
package org.apache.atlas.listener; package org.apache.atlas.listener;
import org.apache.atlas.model.typedef.AtlasBaseTypeDef; import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
import org.apache.atlas.model.typedef.AtlasEntityDef;
import org.apache.commons.collections.CollectionUtils;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
public class ChangedTypeDefs { public class ChangedTypeDefs {
private List<? extends AtlasBaseTypeDef> createTypeDefs; private List<? extends AtlasBaseTypeDef> createdTypeDefs;
private List<? extends AtlasBaseTypeDef> updatedTypeDefs; private List<? extends AtlasBaseTypeDef> updatedTypeDefs;
private List<? extends AtlasBaseTypeDef> deletedTypeDefs; private List<? extends AtlasBaseTypeDef> deletedTypeDefs;
public ChangedTypeDefs(List<? extends AtlasBaseTypeDef> createTypeDefs, public ChangedTypeDefs(List<? extends AtlasBaseTypeDef> createdTypeDefs,
List<? extends AtlasBaseTypeDef> updatedTypeDefs, List<? extends AtlasBaseTypeDef> updatedTypeDefs,
List<? extends AtlasBaseTypeDef> deletedTypeDefs) { List<? extends AtlasBaseTypeDef> deletedTypeDefs) {
this.createTypeDefs = createTypeDefs; this.createdTypeDefs = createdTypeDefs;
this.updatedTypeDefs = updatedTypeDefs; this.updatedTypeDefs = updatedTypeDefs;
this.deletedTypeDefs = deletedTypeDefs; this.deletedTypeDefs = deletedTypeDefs;
} }
public ChangedTypeDefs() { public ChangedTypeDefs() {
createTypeDefs = new ArrayList<>(); createdTypeDefs = new ArrayList<>();
updatedTypeDefs = new ArrayList<>(); updatedTypeDefs = new ArrayList<>();
deletedTypeDefs = new ArrayList<>(); deletedTypeDefs = new ArrayList<>();
} }
public List<? extends AtlasBaseTypeDef> getCreateTypeDefs() { public List<? extends AtlasBaseTypeDef> getCreatedTypeDefs() {
return createTypeDefs; return createdTypeDefs;
} }
public ChangedTypeDefs setCreateTypeDefs(List<? extends AtlasBaseTypeDef> createTypeDefs) { public ChangedTypeDefs setCreatedTypeDefs(List<? extends AtlasBaseTypeDef> createdTypeDefs) {
this.createTypeDefs = createTypeDefs; this.createdTypeDefs = createdTypeDefs;
return this; return this;
} }
...@@ -67,4 +69,24 @@ public class ChangedTypeDefs { ...@@ -67,4 +69,24 @@ public class ChangedTypeDefs {
this.deletedTypeDefs = deletedTypeDefs; this.deletedTypeDefs = deletedTypeDefs;
return this; return this;
} }
public boolean hasEntityDef() {
return hasEntityDef(createdTypeDefs) || hasEntityDef(updatedTypeDefs) || hasEntityDef(deletedTypeDefs);
}
private boolean hasEntityDef(List<? extends AtlasBaseTypeDef> typeDefs) {
boolean ret = false;
if (CollectionUtils.isNotEmpty(typeDefs)) {
for (AtlasBaseTypeDef typeDef : typeDefs) {
if (typeDef instanceof AtlasEntityDef) {
ret = true;
break;
}
}
}
return ret;
}
} }
...@@ -41,7 +41,6 @@ import org.slf4j.LoggerFactory; ...@@ -41,7 +41,6 @@ import org.slf4j.LoggerFactory;
import javax.script.ScriptEngine; import javax.script.ScriptEngine;
import javax.script.ScriptException; import javax.script.ScriptException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
...@@ -82,16 +81,8 @@ public class ClassificationSearchProcessor extends SearchProcessor { ...@@ -82,16 +81,8 @@ public class ClassificationSearchProcessor extends SearchProcessor {
final Set<String> indexAttributes = new HashSet<>(); final Set<String> indexAttributes = new HashSet<>();
final Set<String> graphAttributes = new HashSet<>(); final Set<String> graphAttributes = new HashSet<>();
final Set<String> allAttributes = new HashSet<>(); final Set<String> allAttributes = new HashSet<>();
final Set<String> typeAndSubTypes; final Set<String> typeAndSubTypes = context.getClassificationTypes();
final String typeAndSubTypesQryStr; final String typeAndSubTypesQryStr = context.getClassificationTypesQryStr();
if (context.getSearchParameters().getIncludeSubClassifications()) {
typeAndSubTypes = classificationType.getTypeAndAllSubTypes();
typeAndSubTypesQryStr = classificationType.getTypeAndAllSubTypesQryStr();
} else {
typeAndSubTypes = Collections.singleton(classificationType.getTypeName());
typeAndSubTypesQryStr = classificationType.getTypeQryStr();
}
processSearchAttributes(classificationType, filterCriteria, indexAttributes, graphAttributes, allAttributes); processSearchAttributes(classificationType, filterCriteria, indexAttributes, graphAttributes, allAttributes);
......
...@@ -33,7 +33,6 @@ import org.slf4j.Logger; ...@@ -33,7 +33,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedList; import java.util.LinkedList;
...@@ -65,32 +64,17 @@ public class EntitySearchProcessor extends SearchProcessor { ...@@ -65,32 +64,17 @@ public class EntitySearchProcessor extends SearchProcessor {
final Set<String> indexAttributes = new HashSet<>(); final Set<String> indexAttributes = new HashSet<>();
final Set<String> graphAttributes = new HashSet<>(); final Set<String> graphAttributes = new HashSet<>();
final Set<String> allAttributes = new HashSet<>(); final Set<String> allAttributes = new HashSet<>();
final Set<String> typeAndSubTypes; final Set<String> typeAndSubTypes = context.getEntityTypes();
final String typeAndSubTypesQryStr; final String typeAndSubTypesQryStr = context.getEntityTypesQryStr();
if (context.getSearchParameters().getIncludeSubTypes()) {
typeAndSubTypes = entityType.getTypeAndAllSubTypes();
typeAndSubTypesQryStr = entityType.getTypeAndAllSubTypesQryStr();
} else {
typeAndSubTypes = Collections.singleton(entityType.getTypeName());
typeAndSubTypesQryStr = entityType.getTypeQryStr();
}
final AtlasClassificationType classificationType = context.getClassificationType(); final AtlasClassificationType classificationType = context.getClassificationType();
final Set<String> classificationTypeAndSubTypes = context.getClassificationTypes();
final boolean filterClassification; final boolean filterClassification;
final Set<String> classificationTypeAndSubTypes;
if (classificationType != null) { if (classificationType != null) {
filterClassification = !context.needClassificationProcessor(); filterClassification = !context.needClassificationProcessor();
if (context.getSearchParameters().getIncludeSubClassifications()) {
classificationTypeAndSubTypes = classificationType.getTypeAndAllSubTypes();
} else {
classificationTypeAndSubTypes = Collections.singleton(classificationType.getTypeName());
}
} else { } else {
filterClassification = false; filterClassification = false;
classificationTypeAndSubTypes = Collections.emptySet();
} }
final Predicate typeNamePredicate = SearchPredicateUtil.getINPredicateGenerator() final Predicate typeNamePredicate = SearchPredicateUtil.getINPredicateGenerator()
......
...@@ -20,14 +20,17 @@ package org.apache.atlas.discovery; ...@@ -20,14 +20,17 @@ package org.apache.atlas.discovery;
import org.apache.atlas.model.discovery.SearchParameters; import org.apache.atlas.model.discovery.SearchParameters;
import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.repository.Constants; import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.graph.GraphHelper;
import org.apache.atlas.repository.graphdb.*; import org.apache.atlas.repository.graphdb.*;
import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2; import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
import org.apache.atlas.utils.AtlasPerfTracer; import org.apache.atlas.utils.AtlasPerfTracer;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.*; import java.util.*;
/** /**
* This class is equivalent to legacy FullTextSearchProcessor--except that it uses a better search techniques using SOLR * This class is equivalent to legacy FullTextSearchProcessor--except that it uses a better search techniques using SOLR
* than going through Janus Graph index apis. * than going through Janus Graph index apis.
...@@ -37,7 +40,6 @@ public class FreeTextSearchProcessor extends SearchProcessor { ...@@ -37,7 +40,6 @@ public class FreeTextSearchProcessor extends SearchProcessor {
private static final Logger PERF_LOG = AtlasPerfTracer.getPerfLogger("FreeTextSearchProcessor"); private static final Logger PERF_LOG = AtlasPerfTracer.getPerfLogger("FreeTextSearchProcessor");
public static final String SOLR_QT_PARAMETER = "qt"; public static final String SOLR_QT_PARAMETER = "qt";
public static final String SOLR_REQUEST_HANDLER_NAME = "/freetext"; public static final String SOLR_REQUEST_HANDLER_NAME = "/freetext";
private static final int MAX_TYPES_STRING_SIZE = 1000;
private final AtlasIndexQuery indexQuery; private final AtlasIndexQuery indexQuery;
...@@ -49,50 +51,32 @@ public class FreeTextSearchProcessor extends SearchProcessor { ...@@ -49,50 +51,32 @@ public class FreeTextSearchProcessor extends SearchProcessor {
queryString.append(searchParameters.getQuery()); queryString.append(searchParameters.getQuery());
String queryFields = null; if (StringUtils.isNotEmpty(context.getEntityTypesQryStr()) && context.getEntityTypesQryStr().length() <= MAX_QUERY_STR_LENGTH_TYPES) {
// if search includes entity-type criteria, adding a filter here can help avoid unnecessary queryString.append(AND_STR).append(context.getEntityTypesQryStr());
// processing (and rejection) by subsequent EntitySearchProcessor
if (context.getEntityType() != null) {
String typeString = context.getEntityType().getTypeAndAllSubTypesQryStr();
if (typeString.length() > MAX_TYPES_STRING_SIZE) {
LOG.info("Dropping the use of types string optimization as there are too many types {} for select type {}.", typeString, context.getEntityType().getTypeName());
} else {
LOG.debug("Using the use of types string optimization as there are too many types {} for select type {}.", typeString, context.getEntityType().getTypeName());
final Set<String> types = context.getEntityType().getTypeAndAllSubTypes();
final AtlasGraphManagement managementSystem = context.getGraph().getManagementSystem();
AtlasPropertyKey entityTypeNamePropertyKey = managementSystem.getPropertyKey(AtlasGraphUtilsV2.encodePropertyKey(Constants.ENTITY_TYPE_PROPERTY_KEY));
String encodedPropertyName = managementSystem.getIndexFieldName(Constants.VERTEX_INDEX, entityTypeNamePropertyKey);
StringBuilder typesStringBuilder = new StringBuilder();
for(String typeName: types) {
typesStringBuilder.append(" ").append(typeName);
}
//append the query with type and substypes listed in it
String typesString = typesStringBuilder.toString();
queryString.append(" AND +").append(encodedPropertyName).append(":[");
queryString.append(typesStringBuilder.toString());
queryString.append("]");
} }
if (StringUtils.isNotEmpty(context.getClassificationTypesQryStr()) && context.getClassificationTypesQryStr().length() <= MAX_QUERY_STR_LENGTH_TYPES) {
queryString.append(AND_STR).append(context.getClassificationTypesQryStr());
} }
//just use the query string as is // just use the query string as is
LOG.debug("Using query string '{}'.", queryString); LOG.debug("Using query string '{}'.", queryString);
indexQuery = context.getGraph().indexQuery(prepareGraphIndexQueryParameters(context, queryString)); indexQuery = context.getGraph().indexQuery(prepareGraphIndexQueryParameters(context, queryString));
} }
private GraphIndexQueryParameters prepareGraphIndexQueryParameters(SearchContext context, StringBuilder queryString) { private GraphIndexQueryParameters prepareGraphIndexQueryParameters(SearchContext context, StringBuilder queryString) {
List<AtlasIndexQueryParameter> parameters = new ArrayList<AtlasIndexQueryParameter>(); List<AtlasIndexQueryParameter> parameters = new ArrayList<>();
parameters.add(context.getGraph().indexQueryParameter(SOLR_QT_PARAMETER, SOLR_REQUEST_HANDLER_NAME)); parameters.add(context.getGraph().indexQueryParameter(SOLR_QT_PARAMETER, SOLR_REQUEST_HANDLER_NAME));
return new GraphIndexQueryParameters(Constants.VERTEX_INDEX, queryString.toString(), 0, parameters); return new GraphIndexQueryParameters(Constants.VERTEX_INDEX, queryString.toString(), 0, parameters);
} }
@Override @Override
public List<AtlasVertex> execute() { public List<AtlasVertex> execute() {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("==> FullTextSearchProcessorUsingFreeText.execute({})", context); LOG.debug("==> FreeTextSearchProcessor.execute({})", context);
} }
List<AtlasVertex> ret = new ArrayList<>(); List<AtlasVertex> ret = new ArrayList<>();
...@@ -100,7 +84,7 @@ public class FreeTextSearchProcessor extends SearchProcessor { ...@@ -100,7 +84,7 @@ public class FreeTextSearchProcessor extends SearchProcessor {
AtlasPerfTracer perf = null; AtlasPerfTracer perf = null;
if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) { if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "FullTextSearchProcessorUsingFreeText.execute(" + context + ")"); perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "FreeTextSearchProcessor.execute(" + context + ")");
} }
try { try {
...@@ -136,19 +120,33 @@ public class FreeTextSearchProcessor extends SearchProcessor { ...@@ -136,19 +120,33 @@ public class FreeTextSearchProcessor extends SearchProcessor {
resultCount++; resultCount++;
String entityTypeName = AtlasGraphUtilsV2.getTypeName(vertex);
// skip non-entity vertices // skip non-entity vertices
if (!AtlasGraphUtilsV2.isEntityVertex(vertex)) { if (StringUtils.isEmpty(entityTypeName) || StringUtils.isEmpty(AtlasGraphUtilsV2.getIdFromVertex(vertex))) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("FullTextSearchProcessor.execute(): ignoring non-entity vertex (id={})", vertex.getId()); LOG.debug("FreeTextSearchProcessor.execute(): ignoring non-entity vertex (id={})", vertex.getId());
} }
continue; continue;
} }
if (!context.includeEntityType(entityTypeName)) {
continue;
}
if (activeOnly && AtlasGraphUtilsV2.getState(vertex) != AtlasEntity.Status.ACTIVE) { if (activeOnly && AtlasGraphUtilsV2.getState(vertex) != AtlasEntity.Status.ACTIVE) {
continue; continue;
} }
if (context.getClassificationType() != null) {
List<String> entityClassifications = GraphHelper.getAllTraitNames(vertex);
if (!context.includeClassificationTypes(entityClassifications)) {
continue;
}
}
entityVertices.add(vertex); entityVertices.add(vertex);
} }
...@@ -170,7 +168,7 @@ public class FreeTextSearchProcessor extends SearchProcessor { ...@@ -170,7 +168,7 @@ public class FreeTextSearchProcessor extends SearchProcessor {
} }
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("<== FullTextSearchProcessor.execute({}): ret.size()={}", context, ret.size()); LOG.debug("<== FreeTextSearchProcessor.execute({}): ret.size()={}", context, ret.size());
} }
return ret; return ret;
......
...@@ -18,9 +18,7 @@ ...@@ -18,9 +18,7 @@
package org.apache.atlas.discovery; package org.apache.atlas.discovery;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasErrorCode; import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.AtlasException;
import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.discovery.SearchParameters; import org.apache.atlas.model.discovery.SearchParameters;
import org.apache.atlas.model.discovery.SearchParameters.FilterCriteria; import org.apache.atlas.model.discovery.SearchParameters.FilterCriteria;
...@@ -39,16 +37,13 @@ import org.apache.atlas.type.AtlasEntityType; ...@@ -39,16 +37,13 @@ import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasStructType; import org.apache.atlas.type.AtlasStructType;
import org.apache.atlas.type.AtlasStructType.AtlasAttribute; import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.util.AtlasRepositoryConfiguration;
import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.ArrayList; import java.util.*;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import static org.apache.atlas.model.discovery.SearchParameters.ALL_CLASSIFICATIONS; import static org.apache.atlas.model.discovery.SearchParameters.ALL_CLASSIFICATIONS;
import static org.apache.atlas.model.discovery.SearchParameters.NO_CLASSIFICATIONS; import static org.apache.atlas.model.discovery.SearchParameters.NO_CLASSIFICATIONS;
...@@ -70,21 +65,16 @@ public class SearchContext { ...@@ -70,21 +65,16 @@ public class SearchContext {
private final AtlasClassificationType classificationType; private final AtlasClassificationType classificationType;
private SearchProcessor searchProcessor; private SearchProcessor searchProcessor;
private boolean terminateSearch = false; private boolean terminateSearch = false;
private static boolean isIndexSolrBased = false; private final Set<String> typeAndSubTypes;
private final Set<String> classificationTypeAndSubTypes;
private final String typeAndSubTypesQryStr;
private final String classificationTypeAndSubTypesQryStr;
public final static AtlasClassificationType MATCH_ALL_WILDCARD_CLASSIFICATION = new AtlasClassificationType(new AtlasClassificationDef(WILDCARD_CLASSIFICATIONS)); public final static AtlasClassificationType MATCH_ALL_WILDCARD_CLASSIFICATION = new AtlasClassificationType(new AtlasClassificationDef(WILDCARD_CLASSIFICATIONS));
public final static AtlasClassificationType MATCH_ALL_CLASSIFIED = new AtlasClassificationType(new AtlasClassificationDef(ALL_CLASSIFICATIONS)); public final static AtlasClassificationType MATCH_ALL_CLASSIFIED = new AtlasClassificationType(new AtlasClassificationDef(ALL_CLASSIFICATIONS));
public final static AtlasClassificationType MATCH_ALL_NOT_CLASSIFIED = new AtlasClassificationType(new AtlasClassificationDef(NO_CLASSIFICATIONS)); public final static AtlasClassificationType MATCH_ALL_NOT_CLASSIFIED = new AtlasClassificationType(new AtlasClassificationDef(NO_CLASSIFICATIONS));
static {
try {
isIndexSolrBased = ApplicationProperties.INDEX_BACKEND_SOLR.equalsIgnoreCase(ApplicationProperties.get().getString(ApplicationProperties.INDEX_BACKEND_CONF));
} catch (AtlasException e) {
String msg = String.format("Error encountered in verifying the backend index mode.");
LOG.error(msg, e);
throw new RuntimeException(msg, e);
};
}
public SearchContext(SearchParameters searchParameters, AtlasTypeRegistry typeRegistry, AtlasGraph graph, Set<String> indexedKeys) throws AtlasBaseException { public SearchContext(SearchParameters searchParameters, AtlasTypeRegistry typeRegistry, AtlasGraph graph, Set<String> indexedKeys) throws AtlasBaseException {
String classificationName = searchParameters.getClassification(); String classificationName = searchParameters.getClassification();
...@@ -119,17 +109,48 @@ public class SearchContext { ...@@ -119,17 +109,48 @@ public class SearchContext {
// Invalid attributes will raise an exception with 400 error code // Invalid attributes will raise an exception with 400 error code
validateAttributes(classificationType, searchParameters.getTagFilters()); validateAttributes(classificationType, searchParameters.getTagFilters());
if (entityType != null) {
if (searchParameters.getIncludeSubTypes()) {
typeAndSubTypes = entityType.getTypeAndAllSubTypes();
typeAndSubTypesQryStr = entityType.getTypeAndAllSubTypesQryStr();
} else {
typeAndSubTypes = Collections.singleton(entityType.getTypeName());
typeAndSubTypesQryStr = entityType.getTypeQryStr();
}
} else {
typeAndSubTypes = Collections.emptySet();
typeAndSubTypesQryStr = "";
}
if (classificationType != null) {
if (classificationType == MATCH_ALL_CLASSIFIED || classificationType == MATCH_ALL_NOT_CLASSIFIED || classificationType == MATCH_ALL_WILDCARD_CLASSIFICATION) {
classificationTypeAndSubTypes = Collections.emptySet();
classificationTypeAndSubTypesQryStr = "";
} else if (searchParameters.getIncludeSubClassifications()) {
classificationTypeAndSubTypes = classificationType.getTypeAndAllSubTypes();
classificationTypeAndSubTypesQryStr = classificationType.getTypeAndAllSubTypesQryStr();
} else {
classificationTypeAndSubTypes = Collections.singleton(classificationType.getTypeName());
classificationTypeAndSubTypesQryStr = classificationType.getTypeQryStr();
}
} else {
classificationTypeAndSubTypes = Collections.emptySet();
classificationTypeAndSubTypesQryStr = "";
}
if (glossaryTermVertex != null) { if (glossaryTermVertex != null) {
addProcessor(new TermSearchProcessor(this, getAssignedEntities(glossaryTermVertex))); addProcessor(new TermSearchProcessor(this, getAssignedEntities(glossaryTermVertex)));
} }
if (needFullTextProcessor()) { if (needFullTextProcessor()) {
if(!isFreeTextIndexEnabled()) { if (AtlasRepositoryConfiguration.isFreeTextSearchEnabled()) {
LOG.info("Using Full Text index based search."); LOG.debug("Using Free Text index based search.");
addProcessor(new FullTextSearchProcessor(this));
}else {
LOG.info("Using Free Text index based search.");
addProcessor(new FreeTextSearchProcessor(this)); addProcessor(new FreeTextSearchProcessor(this));
} else {
LOG.debug("Using Full Text index based search.");
addProcessor(new FullTextSearchProcessor(this));
} }
} }
...@@ -157,8 +178,36 @@ public class SearchContext { ...@@ -157,8 +178,36 @@ public class SearchContext {
public AtlasClassificationType getClassificationType() { return classificationType; } public AtlasClassificationType getClassificationType() { return classificationType; }
public Set<String> getEntityTypes() { return typeAndSubTypes; }
public Set<String> getClassificationTypes() { return classificationTypeAndSubTypes; }
public String getEntityTypesQryStr() { return typeAndSubTypesQryStr; }
public String getClassificationTypesQryStr() { return classificationTypeAndSubTypesQryStr; }
public SearchProcessor getSearchProcessor() { return searchProcessor; } public SearchProcessor getSearchProcessor() { return searchProcessor; }
public boolean includeEntityType(String entityType) {
return typeAndSubTypes.isEmpty() || typeAndSubTypes.contains(entityType);
}
public boolean includeClassificationTypes(Collection<String> classificationTypes) {
final boolean ret;
if (classificationType == null) {
ret = true;
} else if (classificationType == MATCH_ALL_NOT_CLASSIFIED) {
ret = CollectionUtils.isEmpty(classificationTypes);
} else if (classificationType == MATCH_ALL_CLASSIFIED || classificationType == MATCH_ALL_WILDCARD_CLASSIFICATION) {
ret = CollectionUtils.isNotEmpty(classificationTypes);
} else {
ret = CollectionUtils.containsAny(classificationTypeAndSubTypes, classificationTypes);
}
return ret;
}
public boolean terminateSearch() { return terminateSearch; } public boolean terminateSearch() { return terminateSearch; }
public void terminateSearch(boolean terminateSearch) { this.terminateSearch = terminateSearch; } public void terminateSearch(boolean terminateSearch) { this.terminateSearch = terminateSearch; }
...@@ -284,18 +333,4 @@ public class SearchContext { ...@@ -284,18 +333,4 @@ public class SearchContext {
private AtlasEntityType getTermEntityType() { private AtlasEntityType getTermEntityType() {
return typeRegistry.getEntityTypeByName(TermSearchProcessor.ATLAS_GLOSSARY_TERM_ENTITY_TYPE); return typeRegistry.getEntityTypeByName(TermSearchProcessor.ATLAS_GLOSSARY_TERM_ENTITY_TYPE);
} }
public static boolean isFreeTextIndexEnabled() {
try {
return isIndexSolrBased() && ApplicationProperties.get().getBoolean(ApplicationProperties.FREE_TEXT_INDEX_ENABLED, ApplicationProperties.DEFAULT_FREE_TEXT_INDEX_ENABLED);
} catch (AtlasException e) {
String msg = String.format("Error encountered in fetching the configuration %s.", ApplicationProperties.FREE_TEXT_INDEX_ENABLED);
LOG.error(msg, e);
throw new RuntimeException(msg, e);
}
}
public static boolean isIndexSolrBased() {
return isIndexSolrBased;
}
} }
...@@ -29,12 +29,8 @@ import org.apache.atlas.repository.graphdb.AtlasGraphQuery; ...@@ -29,12 +29,8 @@ import org.apache.atlas.repository.graphdb.AtlasGraphQuery;
import org.apache.atlas.repository.graphdb.AtlasIndexQuery; import org.apache.atlas.repository.graphdb.AtlasIndexQuery;
import org.apache.atlas.repository.graphdb.AtlasVertex; import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2; import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
import org.apache.atlas.type.AtlasArrayType; import org.apache.atlas.type.*;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasEnumType;
import org.apache.atlas.type.AtlasStructType;
import org.apache.atlas.type.AtlasStructType.AtlasAttribute; import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.util.AtlasGremlinQueryProvider; import org.apache.atlas.util.AtlasGremlinQueryProvider;
import org.apache.atlas.util.SearchPredicateUtil.*; import org.apache.atlas.util.SearchPredicateUtil.*;
import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.CollectionUtils;
...@@ -46,15 +42,7 @@ import org.slf4j.LoggerFactory; ...@@ -46,15 +42,7 @@ import org.slf4j.LoggerFactory;
import java.math.BigDecimal; import java.math.BigDecimal;
import java.math.BigInteger; import java.math.BigInteger;
import java.util.ArrayList; import java.util.*;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import static org.apache.atlas.util.SearchPredicateUtil.*; import static org.apache.atlas.util.SearchPredicateUtil.*;
......
...@@ -165,13 +165,15 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang ...@@ -165,13 +165,15 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Processing changed typedefs {}", changedTypeDefs); LOG.debug("Processing changed typedefs {}", changedTypeDefs);
} }
AtlasGraphManagement management = null; AtlasGraphManagement management = null;
try { try {
management = provider.get().getManagementSystem(); management = provider.get().getManagementSystem();
// Update index for newly created types // Update index for newly created types
if (CollectionUtils.isNotEmpty(changedTypeDefs.getCreateTypeDefs())) { if (CollectionUtils.isNotEmpty(changedTypeDefs.getCreatedTypeDefs())) {
for (AtlasBaseTypeDef typeDef : changedTypeDefs.getCreateTypeDefs()) { for (AtlasBaseTypeDef typeDef : changedTypeDefs.getCreatedTypeDefs()) {
updateIndexForTypeDef(management, typeDef); updateIndexForTypeDef(management, typeDef);
} }
} }
...@@ -196,7 +198,8 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang ...@@ -196,7 +198,8 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang
LOG.error("Failed to update indexes for changed typedefs", e); LOG.error("Failed to update indexes for changed typedefs", e);
attemptRollback(changedTypeDefs, management); attemptRollback(changedTypeDefs, management);
} }
notifyChangeListeners();
notifyChangeListeners(changedTypeDefs);
} }
public Set<String> getVertexIndexKeys() { public Set<String> getVertexIndexKeys() {
...@@ -805,10 +808,10 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang ...@@ -805,10 +808,10 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang
LOG.info("Index creation for type {} complete", typeDef.getName()); LOG.info("Index creation for type {} complete", typeDef.getName());
} }
private void notifyChangeListeners() { private void notifyChangeListeners(ChangedTypeDefs changedTypeDefs) {
for (IndexChangeListener indexChangeListener : indexChangeListeners) { for (IndexChangeListener indexChangeListener : indexChangeListeners) {
try { try {
indexChangeListener.onChange(); indexChangeListener.onChange(changedTypeDefs);
} catch (Throwable t) { } catch (Throwable t) {
LOG.error("Error encountered in notifying the index change listener {}.", indexChangeListener.getClass().getName(), t); LOG.error("Error encountered in notifying the index change listener {}.", indexChangeListener.getClass().getName(), t);
//we need to throw exception if any of the listeners throw execption. //we need to throw exception if any of the listeners throw execption.
......
...@@ -17,6 +17,8 @@ ...@@ -17,6 +17,8 @@
*/ */
package org.apache.atlas.repository.graph; package org.apache.atlas.repository.graph;
import org.apache.atlas.listener.ChangedTypeDefs;
public interface IndexChangeListener { public interface IndexChangeListener {
void onChange(); void onChange(ChangedTypeDefs changedTypeDefs);
} }
...@@ -18,13 +18,14 @@ ...@@ -18,13 +18,14 @@
package org.apache.atlas.repository.graph; package org.apache.atlas.repository.graph;
import org.apache.atlas.AtlasException; import org.apache.atlas.AtlasException;
import org.apache.atlas.discovery.SearchContext; import org.apache.atlas.listener.ChangedTypeDefs;
import org.apache.atlas.model.typedef.AtlasEntityDef; import org.apache.atlas.model.typedef.AtlasEntityDef;
import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef; import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
import org.apache.atlas.repository.Constants; import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.graphdb.AtlasGraph; import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasGraphIndexClient; import org.apache.atlas.repository.graphdb.AtlasGraphIndexClient;
import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.util.AtlasRepositoryConfiguration;
import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -35,6 +36,7 @@ import java.util.Map; ...@@ -35,6 +36,7 @@ import java.util.Map;
import static org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef.DEFAULT_SEARCHWEIGHT; import static org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef.DEFAULT_SEARCHWEIGHT;
import static org.apache.atlas.repository.Constants.CLASSIFICATION_TEXT_KEY; import static org.apache.atlas.repository.Constants.CLASSIFICATION_TEXT_KEY;
import static org.apache.atlas.repository.Constants.TYPE_NAME_PROPERTY_KEY;
/** /**
This is a component that will go through all entity type definitions and create free text index This is a component that will go through all entity type definitions and create free text index
...@@ -45,6 +47,8 @@ public class SolrIndexHelper implements IndexChangeListener { ...@@ -45,6 +47,8 @@ public class SolrIndexHelper implements IndexChangeListener {
private static final Logger LOG = LoggerFactory.getLogger(SolrIndexHelper.class); private static final Logger LOG = LoggerFactory.getLogger(SolrIndexHelper.class);
public static final int DEFAULT_SEARCHWEIGHT_FOR_STRINGS = 3; public static final int DEFAULT_SEARCHWEIGHT_FOR_STRINGS = 3;
public static final int SEARCHWEIGHT_FOR_CLASSIFICATIONS = 10;
public static final int SEARCHWEIGHT_FOR_TYPENAME = 1;
private final AtlasTypeRegistry typeRegistry; private final AtlasTypeRegistry typeRegistry;
...@@ -54,12 +58,9 @@ public class SolrIndexHelper implements IndexChangeListener { ...@@ -54,12 +58,9 @@ public class SolrIndexHelper implements IndexChangeListener {
} }
@Override @Override
public void onChange() { public void onChange(ChangedTypeDefs changedTypeDefs) {
LOG.info("SolrIndexHelper.onChange()"); if (!AtlasRepositoryConfiguration.isFreeTextSearchEnabled() ||
changedTypeDefs == null || !changedTypeDefs.hasEntityDef()) { // nothing to do if there are no changes to entity-defs
if(!SearchContext.isIndexSolrBased()) {
LOG.warn("Not a Solr based index store. Free text search is not supported");
return; return;
} }
...@@ -79,7 +80,8 @@ public class SolrIndexHelper implements IndexChangeListener { ...@@ -79,7 +80,8 @@ public class SolrIndexHelper implements IndexChangeListener {
Map<String, Integer> attributesWithSearchWeights = new HashMap<>(); Map<String, Integer> attributesWithSearchWeights = new HashMap<>();
Collection<AtlasEntityDef> allEntityDefs = typeRegistry.getAllEntityDefs(); Collection<AtlasEntityDef> allEntityDefs = typeRegistry.getAllEntityDefs();
attributesWithSearchWeights.put(CLASSIFICATION_TEXT_KEY,10); attributesWithSearchWeights.put(CLASSIFICATION_TEXT_KEY, SEARCHWEIGHT_FOR_CLASSIFICATIONS);
attributesWithSearchWeights.put(TYPE_NAME_PROPERTY_KEY, SEARCHWEIGHT_FOR_TYPENAME);
if (CollectionUtils.isNotEmpty(allEntityDefs)) { if (CollectionUtils.isNotEmpty(allEntityDefs)) {
for (AtlasEntityDef entityDef : allEntityDefs) { for (AtlasEntityDef entityDef : allEntityDefs) {
...@@ -106,14 +108,14 @@ public class SolrIndexHelper implements IndexChangeListener { ...@@ -106,14 +108,14 @@ public class SolrIndexHelper implements IndexChangeListener {
//this will make the string data searchable like in FullTextIndex Searcher using Free Text searcher. //this will make the string data searchable like in FullTextIndex Searcher using Free Text searcher.
searchWeight = DEFAULT_SEARCHWEIGHT_FOR_STRINGS; searchWeight = DEFAULT_SEARCHWEIGHT_FOR_STRINGS;
} else if (!GraphBackedSearchIndexer.isValidSearchWeight(searchWeight)) { //validate the value provided in the model. } else if (!GraphBackedSearchIndexer.isValidSearchWeight(searchWeight)) { //validate the value provided in the model.
String msg = String.format("Invalid search weight '%d' for attribute %s.%s", searchWeight, entityDef.getName(), attributeName); LOG.warn("Invalid search weight {} for attribute {}.{}. Will use default {}", searchWeight, entityDef.getName(), attributeName, DEFAULT_SEARCHWEIGHT_FOR_STRINGS);
LOG.error(msg);
throw new RuntimeException(msg); searchWeight = DEFAULT_SEARCHWEIGHT_FOR_STRINGS;
} }
LOG.info("Applying search weight {} for attribute {}.{}", searchWeight, entityDef.getName(), attributeName); if (LOG.isDebugEnabled()) {
LOG.debug("Applying search weight {} for attribute {}.{}", searchWeight, entityDef.getName(), attributeName);
}
attributesWithSearchWeights.put(attributeName, searchWeight); attributesWithSearchWeights.put(attributeName, searchWeight);
} }
......
...@@ -51,7 +51,8 @@ public class AtlasPatchManager { ...@@ -51,7 +51,8 @@ public class AtlasPatchManager {
public void applyAll() { public void applyAll() {
final AtlasPatchHandler handlers[] = { final AtlasPatchHandler handlers[] = {
new UniqueAttributePatch(context), new UniqueAttributePatch(context),
new ClassificationTextPatch(context) new ClassificationTextPatch(context),
new FreeTextRequestHandlerPatch(context)
}; };
try { try {
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.patches;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.listener.ChangedTypeDefs;
import org.apache.atlas.model.typedef.AtlasEntityDef;
import org.apache.atlas.repository.graph.SolrIndexHelper;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collection;
import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.APPLIED;
public class FreeTextRequestHandlerPatch extends AtlasPatchHandler {
private static final Logger LOG = LoggerFactory.getLogger(FreeTextRequestHandlerPatch.class);
private static final String PATCH_ID = "JAVA_PATCH_0000_003";
private static final String PATCH_DESCRIPTION = "Creates Solr request handler for use in free-text searches";
private final PatchContext context;
public FreeTextRequestHandlerPatch(PatchContext context) {
super(context.getPatchRegistry(), PATCH_ID, PATCH_DESCRIPTION);
this.context = context;
}
@Override
public void apply() throws AtlasBaseException {
AtlasTypeRegistry typeRegistry = context.getTypeRegistry();
Collection<AtlasEntityDef> entityDefs = typeRegistry.getAllEntityDefs();
if (CollectionUtils.isNotEmpty(entityDefs)) {
SolrIndexHelper indexHelper = new SolrIndexHelper(typeRegistry);
ChangedTypeDefs changedTypeDefs = new ChangedTypeDefs(null, new ArrayList<>(entityDefs), null);
indexHelper.onChange(changedTypeDefs);
}
setStatus(APPLIED);
LOG.info("FreeTextRequestHandlerPatch.apply(): patchId={}, status={}", getPatchId(), getStatus());
}
}
...@@ -481,17 +481,13 @@ public class AtlasEntityChangeNotifier { ...@@ -481,17 +481,13 @@ public class AtlasEntityChangeNotifier {
} }
private void doFullTextMapping(List<AtlasEntityHeader> entityHeaders) { private void doFullTextMapping(List<AtlasEntityHeader> entityHeaders) {
if (CollectionUtils.isEmpty(entityHeaders)) { if(AtlasRepositoryConfiguration.isFreeTextSearchEnabled() || !AtlasRepositoryConfiguration.isFullTextSearchEnabled()) {
return; return;
} }
try { if (CollectionUtils.isEmpty(entityHeaders)) {
if(!AtlasRepositoryConfiguration.isFullTextSearchEnabled()) {
return; return;
} }
} catch (AtlasException e) {
LOG.warn("Unable to determine if FullText is disabled. Proceeding with FullText mapping");
}
MetricRecorder metric = RequestContext.get().startMetricRecord("fullTextMapping"); MetricRecorder metric = RequestContext.get().startMetricRecord("fullTextMapping");
...@@ -520,13 +516,9 @@ public class AtlasEntityChangeNotifier { ...@@ -520,13 +516,9 @@ public class AtlasEntityChangeNotifier {
} }
private void updateFullTextMapping(String entityId, List<AtlasClassification> classifications) { private void updateFullTextMapping(String entityId, List<AtlasClassification> classifications) {
try { if(AtlasRepositoryConfiguration.isFreeTextSearchEnabled() || !AtlasRepositoryConfiguration.isFullTextSearchEnabled()) {
if(!AtlasRepositoryConfiguration.isFullTextSearchEnabled()) {
return; return;
} }
} catch (AtlasException e) {
LOG.warn("Unable to determine if FullText is disabled. Proceeding with FullText mapping");
}
if (StringUtils.isEmpty(entityId) || CollectionUtils.isEmpty(classifications)) { if (StringUtils.isEmpty(entityId) || CollectionUtils.isEmpty(classifications)) {
return; return;
......
...@@ -48,7 +48,6 @@ public class AtlasRepositoryConfiguration { ...@@ -48,7 +48,6 @@ public class AtlasRepositoryConfiguration {
private static final Integer DEFAULT_TYPE_UPDATE_LOCK_MAX_WAIT_TIME_IN_SECONDS = Integer.valueOf(15); private static final Integer DEFAULT_TYPE_UPDATE_LOCK_MAX_WAIT_TIME_IN_SECONDS = Integer.valueOf(15);
private static final String CONFIG_TYPE_UPDATE_LOCK_MAX_WAIT_TIME_IN_SECONDS = "atlas.server.type.update.lock.max.wait.time.seconds"; private static final String CONFIG_TYPE_UPDATE_LOCK_MAX_WAIT_TIME_IN_SECONDS = "atlas.server.type.update.lock.max.wait.time.seconds";
private static final String ENABLE_FULLTEXT_SEARCH_PROPERTY = "atlas.search.fulltext.enable";
private static final String JANUS_GRAPH_DATABASE_IMPLEMENTATION_CLASS = "org.apache.atlas.repository.graphdb.janus.AtlasJanusGraphDatabase"; private static final String JANUS_GRAPH_DATABASE_IMPLEMENTATION_CLASS = "org.apache.atlas.repository.graphdb.janus.AtlasJanusGraphDatabase";
private static final String DEFAULT_GRAPH_DATABASE_IMPLEMENTATION_CLASS = JANUS_GRAPH_DATABASE_IMPLEMENTATION_CLASS; private static final String DEFAULT_GRAPH_DATABASE_IMPLEMENTATION_CLASS = JANUS_GRAPH_DATABASE_IMPLEMENTATION_CLASS;
...@@ -56,13 +55,25 @@ public class AtlasRepositoryConfiguration { ...@@ -56,13 +55,25 @@ public class AtlasRepositoryConfiguration {
private static List<String> skippedOperations = null; private static List<String> skippedOperations = null;
private static final String ENTITY_NOTIFICATION_VERSION_PROPERTY = "atlas.notification.entity.version"; private static final String ENTITY_NOTIFICATION_VERSION_PROPERTY = "atlas.notification.entity.version";
private static boolean isInitialized = false;
private static boolean isFullTextSearchEnabled = true;
private static boolean isFreeTextSearchEnabled = true;
/** /**
* Configures whether the full text vertex property is populated. Turning this off * Configures whether the full text vertex property is populated. Turning this off
* effectively disables full text searches, since all no entities created or updated after * effectively disables full text searches, since all no entities created or updated after
* turning this off will match full text searches. * turning this off will match full text searches.
*/ */
public static boolean isFullTextSearchEnabled() throws AtlasException { public static boolean isFullTextSearchEnabled() {
return ApplicationProperties.get().getBoolean(ENABLE_FULLTEXT_SEARCH_PROPERTY, true); initialize();
return isFullTextSearchEnabled;
}
public static boolean isFreeTextSearchEnabled() {
initialize();
return isFreeTextSearchEnabled;
} }
public static boolean isV2EntityNotificationEnabled() { public static boolean isV2EntityNotificationEnabled() {
...@@ -236,4 +247,26 @@ public class AtlasRepositoryConfiguration { ...@@ -236,4 +247,26 @@ public class AtlasRepositoryConfiguration {
return ret == null ? DEFAULT_TYPE_UPDATE_LOCK_MAX_WAIT_TIME_IN_SECONDS : ret; return ret == null ? DEFAULT_TYPE_UPDATE_LOCK_MAX_WAIT_TIME_IN_SECONDS : ret;
} }
private static void initialize() {
if (!isInitialized) {
try {
isFreeTextSearchEnabled = ApplicationProperties.get().getBoolean(ApplicationProperties.ENABLE_FREETEXT_SEARCH_CONF, true);
if (isFreeTextSearchEnabled) { // currently free-text is supported only for Solr
isFreeTextSearchEnabled = ApplicationProperties.INDEX_BACKEND_SOLR.equalsIgnoreCase(ApplicationProperties.get().getString(ApplicationProperties.INDEX_BACKEND_CONF));
}
if (isFreeTextSearchEnabled) { // if free-text is enabled, disable full-text - to avoid performance penalty
isFullTextSearchEnabled = false;
} else {
isFullTextSearchEnabled = ApplicationProperties.get().getBoolean(ApplicationProperties.ENABLE_FULLTEXT_SEARCH_CONF, true);
}
isInitialized = true;
} catch (AtlasException excp) {
LOG.error("Failed to initialize. isFullTextSearchEnabled={}, isFreeTextSearchEnabled={}", isFullTextSearchEnabled, isFreeTextSearchEnabled, excp);
}
}
}
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment