Commit eae97618 by Madhan Neethiraj

ATLAS-2827: fix to handle failure in saving indexable string property of large size - #3

parent 4c6f1d16
......@@ -715,6 +715,7 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {
EntityGraphDiscovery graphDiscoverer = new AtlasEntityGraphDiscoveryV2(typeRegistry, entityStream);
EntityGraphDiscoveryContext discoveryContext = graphDiscoverer.discoverEntities();
EntityMutationContext context = new EntityMutationContext(discoveryContext);
RequestContext requestContext = RequestContext.get();
for (String guid : discoveryContext.getReferencedGuids()) {
AtlasVertex vertex = discoveryContext.getResolvedEntityVertex(guid);
......@@ -734,6 +735,8 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {
if (!StringUtils.equals(guidVertex, guid)) { // if entity was found by unique attribute
entity.setGuid(guidVertex);
requestContext.recordEntityGuidUpdate(entity, guid);
}
context.addUpdated(guid, entity, entityType, vertex);
......@@ -756,6 +759,8 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {
entity.setGuid(generatedGuid);
requestContext.recordEntityGuidUpdate(entity, guid);
context.addCreated(guid, entity, entityType, vertex);
}
......
......@@ -640,11 +640,13 @@ public class EntityGraphMapper {
}
if (trimmedLength < value.length()) {
LOG.warn("Indexed-String-Attribute: {} length is {} characters, trimming to {}", ctx.getAttribute().getQualifiedName(), value.length(), trimmedLength);
LOG.warn("Length of indexed attribute {} is {} characters, longer than safe-limit {}; trimming to {} - attempt #{}", ctx.getAttribute().getQualifiedName(), value.length(), INDEXED_STR_SAFE_LEN, trimmedLength, requestContext.getAttemptCount());
String checksumSuffix = ":" + DigestUtils.shaHex(value); // Storing SHA checksum in case verification is needed after retrieval
ret = value.substring(0, trimmedLength - checksumSuffix.length()) + checksumSuffix;
} else {
LOG.warn("Length of indexed attribute {} is {} characters, longer than safe-limit {}", ctx.getAttribute().getQualifiedName(), value.length(), INDEXED_STR_SAFE_LEN);
}
}
}
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.store.graph.v2;
import org.apache.atlas.exception.AtlasBaseException;
public interface InstanceGraphMapper<T> {
/**
* Map the given type instance to the graph
*
* @param ctx
* @return the value that was mapped to the vertex
* @throws AtlasBaseException
*/
T toGraph(AttributeMutationContext ctx, EntityMutationContext context) throws AtlasBaseException;
}
......@@ -19,6 +19,7 @@
package org.apache.atlas;
import org.apache.atlas.model.instance.AtlasClassification;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.commons.lang.StringUtils;
......@@ -38,6 +39,7 @@ public class RequestContext {
private final Map<String, List<AtlasClassification>> addedPropagations = new HashMap<>();
private final Map<String, List<AtlasClassification>> removedPropagations = new HashMap<>();
private final long requestTime = System.currentTimeMillis();
private List<EntityGuidPair> entityGuidInRequest = null;
private String user;
private Set<String> userGroups;
......@@ -71,6 +73,10 @@ public class RequestContext {
instance.entityCacheV2.clear();
instance.addedPropagations.clear();
instance.removedPropagations.clear();
if (instance.entityGuidInRequest != null) {
instance.entityGuidInRequest.clear();
}
}
CURRENT_CONTEXT.remove();
......@@ -202,4 +208,34 @@ public class RequestContext {
public boolean isDeletedEntity(String guid) {
return deletedEntities.containsKey(guid);
}
public void recordEntityGuidUpdate(AtlasEntity entity, String guidInRequest) {
if (entityGuidInRequest == null) {
entityGuidInRequest = new ArrayList<>();
}
entityGuidInRequest.add(new EntityGuidPair(entity, guidInRequest));
}
public void resetEntityGuidUpdates() {
if (entityGuidInRequest != null) {
for (EntityGuidPair entityGuidPair : entityGuidInRequest) {
entityGuidPair.resetEntityGuid();
}
}
}
public class EntityGuidPair {
private final AtlasEntity entity;
private final String guid;
public EntityGuidPair(AtlasEntity entity, String guid) {
this.entity = entity;
this.guid = guid;
}
public void resetEntityGuid() {
entity.setGuid(guid);
}
}
}
......@@ -521,6 +521,8 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
break;
} catch (Throwable e) {
RequestContext.get().resetEntityGuidUpdates();
LOG.warn("Error handling message", e);
try {
LOG.info("Sleeping for {} ms before retry", consumerRetryInterval);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment