Commit 6fb2a038 by ashutoshm Committed by Madhan Neethiraj

ATLAS-1995: updated entity-lookup-by-unique-attributes to use indexQuery

parent d8b86833
......@@ -168,7 +168,6 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
}
@Override
@GraphTransaction
public ITypedReferenceableInstance getEntityDefinition(String guid) throws RepositoryException, EntityNotFoundException {
return getEntityDefinitions(guid).get(0);
}
......
......@@ -378,6 +378,12 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
}
}
try {
ttr.updateTypes(ret);
} catch (AtlasBaseException e) { // this shouldn't happen, as the types were already validated
LOG.error("failed to update the registry after updating the store", e);
}
if (LOG.isDebugEnabled()) {
LOG.debug("<== AtlasTypeDefGraphStore.createUpdateTypesDef({}, {}): {}", typesToCreate, typesToUpdate, ret);
}
......
......@@ -307,7 +307,7 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
entity.setGuid(guid);
return createOrUpdate(new AtlasEntityStream(updatedEntityInfo), true);
return createOrUpdate(new AtlasEntityStream(updatedEntityInfo), true, false);
}
@Override
......@@ -358,7 +358,7 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
throw new AtlasBaseException(AtlasErrorCode.ATTRIBUTE_UPDATE_NOT_SUPPORTED, attrName, attrType.getTypeName());
}
return createOrUpdate(new AtlasEntityStream(updateEntity), true);
return createOrUpdate(new AtlasEntityStream(updateEntity), true, false);
}
@Override
......
......@@ -18,7 +18,9 @@
package org.apache.atlas.repository.store.graph.v1;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.discovery.SearchProcessor;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.TypeCategory;
import org.apache.atlas.model.instance.AtlasEntity;
......@@ -29,12 +31,14 @@ import org.apache.atlas.repository.graph.GraphHelper;
import org.apache.atlas.repository.graphdb.AtlasEdge;
import org.apache.atlas.repository.graphdb.AtlasElement;
import org.apache.atlas.repository.graphdb.AtlasGraphQuery;
import org.apache.atlas.repository.graphdb.AtlasIndexQuery;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasStructType;
import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
import org.apache.atlas.type.AtlasType;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -43,6 +47,7 @@ import java.util.Collection;
import java.util.Date;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
/**
* Utility methods for Graph.
......@@ -55,6 +60,19 @@ public class AtlasGraphUtilsV1 {
public static final String VERTEX_TYPE = "typeSystem";
public static final String RELATIONSHIPTYPE_EDGE_LABEL = PROPERTY_PREFIX + ".relationshipType";
private static boolean USE_INDEX_QUERY_TO_FIND_ENTITY_BY_UNIQUE_ATTRIBUTES = false;
static {
try {
Configuration conf = ApplicationProperties.get();
USE_INDEX_QUERY_TO_FIND_ENTITY_BY_UNIQUE_ATTRIBUTES = conf.getBoolean("atlas.use.index.query.to.find.entity.by.unique.attributes", USE_INDEX_QUERY_TO_FIND_ENTITY_BY_UNIQUE_ATTRIBUTES);
} catch (Exception excp) {
LOG.error("Error reading configuration", excp);
} finally {
LOG.info("atlas.use.index.query.to.find.entity.by.unique.attributes=" + USE_INDEX_QUERY_TO_FIND_ENTITY_BY_UNIQUE_ATTRIBUTES);
}
}
public static String getTypeDefPropertyKey(AtlasBaseTypeDef typeDef) {
return getTypeDefPropertyKey(typeDef.getName());
......@@ -217,13 +235,22 @@ public class AtlasGraphUtilsV1 {
continue;
}
vertex = AtlasGraphUtilsV1.findByTypeAndPropertyName(entityType.getTypeName(), attribute.getVertexPropertyName(), attrValue);
if (canUseIndexQuery(entityType, attribute.getName())) {
vertex = AtlasGraphUtilsV1.getAtlasVertexFromIndexQuery(entityType, attribute, attrValue);
} else {
vertex = AtlasGraphUtilsV1.findByTypeAndPropertyName(entityType.getTypeName(), attribute.getVertexPropertyName(), attrValue);
if (vertex == null) {
vertex = AtlasGraphUtilsV1.findBySuperTypeAndPropertyName(entityType.getTypeName(), attribute.getVertexPropertyName(), attrValue);
if (vertex == null) {
vertex = AtlasGraphUtilsV1.findBySuperTypeAndPropertyName(entityType.getTypeName(), attribute.getVertexPropertyName(), attrValue);
}
}
if (vertex != null) {
if (LOG.isDebugEnabled()) {
LOG.debug("findByUniqueAttributes(type={}, attrName={}, attrValue={}: found vertex {}",
entityType.getTypeName(), attribute.getName(), attrValue, vertex);
}
break;
}
}
......@@ -366,4 +393,77 @@ public class AtlasGraphUtilsV1 {
public static String getStateAsString(AtlasElement element) {
return element.getProperty(Constants.STATE_PROPERTY_KEY, String.class);
}
private static boolean canUseIndexQuery(AtlasEntityType entityType, String attributeName) {
boolean ret = false;
if (USE_INDEX_QUERY_TO_FIND_ENTITY_BY_UNIQUE_ATTRIBUTES) {
final String typeAndSubTypesQryStr = entityType.getTypeAndAllSubTypesQryStr();
ret = typeAndSubTypesQryStr.length() <= SearchProcessor.MAX_QUERY_STR_LENGTH_TYPES;
if (ret) {
Set<String> indexSet = AtlasGraphProvider.getGraphInstance().getVertexIndexKeys();
try {
ret = indexSet.contains(entityType.getQualifiedAttributeName(attributeName));
}
catch (AtlasBaseException ex) {
ret = false;
}
}
}
return ret;
}
private static AtlasVertex getAtlasVertexFromIndexQuery(AtlasEntityType entityType, AtlasAttribute attribute, Object attrVal) {
String propertyName = attribute.getVertexPropertyName();
AtlasIndexQuery query = getIndexQuery(entityType, propertyName, attrVal.toString());
for (Iterator<AtlasIndexQuery.Result> iter = query.vertices(); iter.hasNext(); ) {
AtlasIndexQuery.Result result = iter.next();
AtlasVertex vertex = result.getVertex();
// skip non-entity vertices, if any got returned
if (vertex == null || !vertex.getPropertyKeys().contains(Constants.GUID_PROPERTY_KEY)) {
continue;
}
// verify the typeName
String typeNameInVertex = getTypeName(vertex);
if (!entityType.getTypeAndAllSubTypes().contains(typeNameInVertex)) {
LOG.warn("incorrect vertex type from index-query: expected='{}'; found='{}'", entityType.getTypeName(), typeNameInVertex);
continue;
}
if (attrVal.getClass() == String.class) {
String s = (String) attrVal;
String vertexVal = vertex.getProperty(propertyName, String.class);
if (!s.equalsIgnoreCase(vertexVal)) {
LOG.warn("incorrect match from index-query for property {}: expected='{}'; found='{}'", propertyName, s, vertexVal);
continue;
}
}
return vertex;
}
return null;
}
private static AtlasIndexQuery getIndexQuery(AtlasEntityType entityType, String propertyName, String value) {
StringBuilder sb = new StringBuilder();
sb.append("v.\"").append(Constants.TYPE_NAME_PROPERTY_KEY).append("\":").append(entityType.getTypeAndAllSubTypesQryStr())
.append(" AND ")
.append("v.\"").append(propertyName).append("\":").append(AtlasAttribute.escapeIndexQueryValue(value))
.append(" AND ")
.append("v.\"").append(Constants.STATE_PROPERTY_KEY).append("\":ACTIVE");
return AtlasGraphProvider.getGraphInstance().indexQuery(Constants.VERTEX_INDEX, sb.toString());
}
}
......@@ -474,7 +474,6 @@ public class GraphBackedMetadataRepositoryTest {
return guid;
}
@GraphTransaction
AtlasVertex getTableEntityVertex() {
AtlasGraph graph = TestUtils.getGraph();
AtlasGraphQuery query = graph.query().has(Constants.ENTITY_TYPE_PROPERTY_KEY, ComparisionOperator.EQUAL, TestUtils.TABLE_TYPE);
......@@ -651,6 +650,7 @@ public class GraphBackedMetadataRepositoryTest {
}
@Test(dependsOnMethods = "testCreateEntity")
@GraphTransaction
public void testGetIdFromVertex() throws Exception {
AtlasVertex tableVertex = getTableEntityVertex();
......@@ -664,6 +664,7 @@ public class GraphBackedMetadataRepositoryTest {
}
@Test(dependsOnMethods = "testCreateEntity")
@GraphTransaction
public void testGetTypeName() throws Exception {
AtlasVertex tableVertex = getTableEntityVertex();
Assert.assertEquals(GraphHelper.getTypeName(tableVertex), TestUtils.TABLE_TYPE);
......
......@@ -41,6 +41,7 @@ import org.apache.atlas.repository.graph.AtlasGraphProvider;
import org.apache.atlas.repository.graph.GraphHelper;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.store.bootstrap.AtlasTypeDefStoreInitializer;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.services.MetadataService;
import org.apache.atlas.store.AtlasTypeDefStore;
......@@ -129,7 +130,11 @@ public abstract class AtlasDeleteHandlerV1Test {
ImmutableList.<AtlasClassificationDef>of(),
ImmutableList.of(mapValueDef, mapOwnerDef));
typeDefStore.createTypesDef(typesDef);
AtlasTypesDef typesToCreate = AtlasTypeDefStoreInitializer.getTypesToCreate(typesDef, typeRegistry);
if (!typesToCreate.isEmpty()) {
typeDefStore.createTypesDef(typesToCreate);
}
compositeMapOwnerType = typeRegistry.getEntityTypeByName("CompositeMapOwner");
compositeMapValueType = typeRegistry.getEntityTypeByName("CompositeMapValue");
......
......@@ -30,6 +30,7 @@ import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.apache.atlas.repository.graph.AtlasGraphProvider;
import org.apache.atlas.repository.graph.GraphBackedSearchIndexer;
import org.apache.atlas.repository.store.bootstrap.AtlasTypeDefStoreInitializer;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.repository.store.graph.AtlasRelationshipStore;
import org.apache.atlas.store.AtlasTypeDefStore;
......@@ -97,8 +98,13 @@ public abstract class AtlasRelationshipStoreV1Test {
}
init();
AtlasTypesDef testTypes = getInverseReferenceTestTypes();
typeDefStore.createTypesDef(testTypes);
AtlasTypesDef typesDef = getInverseReferenceTestTypes();
AtlasTypesDef typesToCreate = AtlasTypeDefStoreInitializer.getTypesToCreate(typesDef, typeRegistry);
if (!typesToCreate.isEmpty()) {
typeDefStore.createTypesDef(typesToCreate);
}
}
@BeforeTest
......
......@@ -42,6 +42,7 @@ import org.apache.atlas.service.Service;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.utils.AtlasPerfTracer;
import org.apache.atlas.web.filters.AuditFilter;
import org.apache.atlas.web.service.ServiceState;
import org.apache.atlas.web.util.DateTimeHelper;
......@@ -71,6 +72,7 @@ import static org.apache.atlas.AtlasClientV2.*;
@Order(4)
public class NotificationHookConsumer implements Service, ActiveStateChangeHandler {
private static final Logger LOG = LoggerFactory.getLogger(NotificationHookConsumer.class);
private static final Logger PERF_LOG = AtlasPerfTracer.getPerfLogger(NotificationHookConsumer.class);
private static final String LOCALHOST = "localhost";
private static Logger FAILED_LOG = LoggerFactory.getLogger("FAILED");
......@@ -236,113 +238,124 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
@VisibleForTesting
void handleMessage(AtlasKafkaMessage<HookNotificationMessage> kafkaMsg) throws AtlasServiceException, AtlasException {
AtlasPerfTracer perf = null;
HookNotificationMessage message = kafkaMsg.getMessage();
String messageUser = message.getUser();
// Used for intermediate conversions during create and update
AtlasEntity.AtlasEntitiesWithExtInfo entities;
for (int numRetries = 0; numRetries < maxRetries; numRetries++) {
if (LOG.isDebugEnabled()) {
LOG.debug("handleMessage({}): attempt {}", message.getType().name(), numRetries);
}
try {
RequestContext requestContext = RequestContext.createContext();
requestContext.setUser(messageUser);
switch (message.getType()) {
case ENTITY_CREATE:
EntityCreateRequest createRequest = (EntityCreateRequest) message;
if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, message.getType().name());
}
if (numRetries == 0) { // audit only on the first attempt
audit(messageUser, CREATE_ENTITY.getMethod(), CREATE_ENTITY.getPath());
}
try {
// Used for intermediate conversions during create and update
AtlasEntity.AtlasEntitiesWithExtInfo entities;
for (int numRetries = 0; numRetries < maxRetries; numRetries++) {
if (LOG.isDebugEnabled()) {
LOG.debug("handleMessage({}): attempt {}", message.getType().name(), numRetries);
}
try {
RequestContext requestContext = RequestContext.createContext();
requestContext.setUser(messageUser);
entities = instanceConverter.toAtlasEntities(createRequest.getEntities());
switch (message.getType()) {
case ENTITY_CREATE:
EntityCreateRequest createRequest = (EntityCreateRequest) message;
atlasEntityStore.createOrUpdate(new AtlasEntityStream(entities), false);
break;
if (numRetries == 0) { // audit only on the first attempt
audit(messageUser, CREATE_ENTITY.getMethod(), CREATE_ENTITY.getPath());
}
case ENTITY_PARTIAL_UPDATE:
final EntityPartialUpdateRequest partialUpdateRequest = (EntityPartialUpdateRequest) message;
entities = instanceConverter.toAtlasEntities(createRequest.getEntities());
if (numRetries == 0) { // audit only on the first attempt
audit(messageUser, UPDATE_ENTITY_BY_ATTRIBUTE.getMethod(),
String.format(UPDATE_ENTITY_BY_ATTRIBUTE.getPath(), partialUpdateRequest.getTypeName()));
}
atlasEntityStore.createOrUpdate(new AtlasEntityStream(entities), false);
break;
Referenceable referenceable = partialUpdateRequest.getEntity();
entities = instanceConverter.toAtlasEntity(referenceable);
case ENTITY_PARTIAL_UPDATE:
final EntityPartialUpdateRequest partialUpdateRequest = (EntityPartialUpdateRequest) message;
AtlasEntityType entityType = typeRegistry.getEntityTypeByName(partialUpdateRequest.getTypeName());
String guid = AtlasGraphUtilsV1.getGuidByUniqueAttributes(entityType, new HashMap<String, Object>() {
{
put(partialUpdateRequest.getAttribute(), partialUpdateRequest.getAttributeValue());
if (numRetries == 0) { // audit only on the first attempt
audit(messageUser, UPDATE_ENTITY_BY_ATTRIBUTE.getMethod(),
String.format(UPDATE_ENTITY_BY_ATTRIBUTE.getPath(), partialUpdateRequest.getTypeName()));
}
});
// There should only be one root entity
entities.getEntities().get(0).setGuid(guid);
Referenceable referenceable = partialUpdateRequest.getEntity();
entities = instanceConverter.toAtlasEntity(referenceable);
atlasEntityStore.createOrUpdate(new AtlasEntityStream(entities), true);
break;
AtlasEntityType entityType = typeRegistry.getEntityTypeByName(partialUpdateRequest.getTypeName());
String guid = AtlasGraphUtilsV1.getGuidByUniqueAttributes(entityType, new HashMap<String, Object>() {
{
put(partialUpdateRequest.getAttribute(), partialUpdateRequest.getAttributeValue());
}
});
case ENTITY_DELETE:
final EntityDeleteRequest deleteRequest = (EntityDeleteRequest) message;
// There should only be one root entity
entities.getEntities().get(0).setGuid(guid);
if (numRetries == 0) { // audit only on the first attempt
audit(messageUser, DELETE_ENTITY_BY_ATTRIBUTE.getMethod(),
String.format(DELETE_ENTITY_BY_ATTRIBUTE.getPath(), deleteRequest.getTypeName()));
}
atlasEntityStore.createOrUpdate(new AtlasEntityStream(entities), true);
break;
try {
AtlasEntityType type = (AtlasEntityType) typeRegistry.getType(deleteRequest.getTypeName());
atlasEntityStore.deleteByUniqueAttributes(type,
new HashMap<String, Object>() {{
put(deleteRequest.getAttribute(), deleteRequest.getAttributeValue());
}});
} catch (ClassCastException cle) {
LOG.error("Failed to do a partial update on Entity");
}
break;
case ENTITY_DELETE:
final EntityDeleteRequest deleteRequest = (EntityDeleteRequest) message;
case ENTITY_FULL_UPDATE:
EntityUpdateRequest updateRequest = (EntityUpdateRequest) message;
if (numRetries == 0) { // audit only on the first attempt
audit(messageUser, DELETE_ENTITY_BY_ATTRIBUTE.getMethod(),
String.format(DELETE_ENTITY_BY_ATTRIBUTE.getPath(), deleteRequest.getTypeName()));
}
if (numRetries == 0) { // audit only on the first attempt
audit(messageUser, UPDATE_ENTITY.getMethod(), UPDATE_ENTITY.getPath());
}
try {
AtlasEntityType type = (AtlasEntityType) typeRegistry.getType(deleteRequest.getTypeName());
atlasEntityStore.deleteByUniqueAttributes(type,
new HashMap<String, Object>() {{
put(deleteRequest.getAttribute(), deleteRequest.getAttributeValue());
}});
} catch (ClassCastException cle) {
LOG.error("Failed to do a partial update on Entity");
}
break;
entities = instanceConverter.toAtlasEntities(updateRequest.getEntities());
atlasEntityStore.createOrUpdate(new AtlasEntityStream(entities), false);
break;
case ENTITY_FULL_UPDATE:
EntityUpdateRequest updateRequest = (EntityUpdateRequest) message;
default:
throw new IllegalStateException("Unknown notification type: " + message.getType().name());
}
if (numRetries == 0) { // audit only on the first attempt
audit(messageUser, UPDATE_ENTITY.getMethod(), UPDATE_ENTITY.getPath());
}
break;
} catch (Throwable e) {
LOG.warn("Error handling message", e);
try {
LOG.info("Sleeping for {} ms before retry", consumerRetryInterval);
Thread.sleep(consumerRetryInterval);
} catch (InterruptedException ie) {
LOG.error("Notification consumer thread sleep interrupted");
}
entities = instanceConverter.toAtlasEntities(updateRequest.getEntities());
atlasEntityStore.createOrUpdate(new AtlasEntityStream(entities), false);
break;
default:
throw new IllegalStateException("Unknown notification type: " + message.getType().name());
}
if (numRetries == (maxRetries - 1)) {
LOG.warn("Max retries exceeded for message {}", message, e);
failedMessages.add(message);
if (failedMessages.size() >= failedMsgCacheSize) {
recordFailedMessages();
break;
} catch (Throwable e) {
LOG.warn("Error handling message", e);
try {
LOG.info("Sleeping for {} ms before retry", consumerRetryInterval);
Thread.sleep(consumerRetryInterval);
} catch (InterruptedException ie) {
LOG.error("Notification consumer thread sleep interrupted");
}
if (numRetries == (maxRetries - 1)) {
LOG.warn("Max retries exceeded for message {}", message, e);
failedMessages.add(message);
if (failedMessages.size() >= failedMsgCacheSize) {
recordFailedMessages();
}
return;
}
return;
} finally {
RequestContext.clear();
RequestContextV1.clear();
}
} finally {
RequestContext.clear();
RequestContextV1.clear();
}
commit(kafkaMsg);
} finally {
AtlasPerfTracer.log(perf);
}
commit(kafkaMsg);
}
private void recordFailedMessages() {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment