Commit 4c6f1d16 by Madhan Neethiraj

ATLAS-2827: fix to handle failure in saving indexable string property of large size - #2

parent f31a7c63
......@@ -38,6 +38,8 @@ public enum AtlasConfiguration {
NOTIFICATION_SPLIT_MESSAGE_SEGMENTS_WAIT_TIME_SECONDS("atlas.notification.split.message.segments.wait.time.seconds", 15 * 60),
NOTIFICATION_SPLIT_MESSAGE_BUFFER_PURGE_INTERVAL_SECONDS("atlas.notification.split.message.buffer.purge.interval.seconds", 5 * 60),
GRAPHSTORE_INDEXED_STRING_SAFE_LENGTH("atlas.graphstore.indexed.string.safe.length", Short.MAX_VALUE), // based on org.apache.hadoop.hbase.client.Mutation.checkRow()
//search configuration
SEARCH_MAX_LIMIT("atlas.search.maxlimit", 10000),
SEARCH_DEFAULT_LIMIT("atlas.search.defaultlimit", 100);
......
......@@ -18,6 +18,7 @@
package org.apache.atlas.repository.store.graph.v2;
import org.apache.atlas.AtlasConfiguration;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.RequestContext;
import org.apache.atlas.exception.AtlasBaseException;
......@@ -35,7 +36,6 @@ import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.model.instance.EntityMutations.EntityOperation;
import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef.Cardinality;
import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.RepositoryException;
import org.apache.atlas.repository.converters.AtlasInstanceConverter;
import org.apache.atlas.repository.graph.GraphHelper;
......@@ -87,7 +87,8 @@ import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelation
@Component
public class EntityGraphMapper {
private static final Logger LOG = LoggerFactory.getLogger(EntityGraphMapper.class);
private static final int INDEXED_STR_MAX_ALLOWED_LEN = 33482223;
private static final int INDEXED_STR_SAFE_LEN = AtlasConfiguration.GRAPHSTORE_INDEXED_STRING_SAFE_LENGTH.getInt();
private final GraphHelper graphHelper = GraphHelper.getInstance();
private final AtlasGraph graph;
......@@ -619,13 +620,32 @@ public class EntityGraphMapper {
// Janus bug, when an indexed string attribute has a value longer than a certain length then the reverse indexed key generated by JanusGraph
// exceeds the HBase row length's hard limit (Short.MAX). This trimming and hashing procedure is to circumvent that limitation
if (ret != null && isIndexableStrAttr) {
String value = (String) ctx.getValue();
String value = ret.toString();
if (value.length() > INDEXED_STR_SAFE_LEN) {
RequestContext requestContext = RequestContext.get();
final int trimmedLength;
if (requestContext.getAttemptCount() <= 1) { // if this is the first attempt, try saving as it is; trim on retry
trimmedLength = value.length();
} else if (requestContext.getAttemptCount() >= requestContext.getMaxAttempts()) { // if this is the last attempt, set to 'safe_len'
trimmedLength = INDEXED_STR_SAFE_LEN;
} else if (requestContext.getAttemptCount() == 2) { // based on experimentation, string length of 4 times 'safe_len' succeeds
trimmedLength = Math.min(4 * INDEXED_STR_SAFE_LEN, value.length());
} else if (requestContext.getAttemptCount() == 3) { // if length of 4 times 'safe_len' failed, try twice 'safe_len'
trimmedLength = Math.min(2 * INDEXED_STR_SAFE_LEN, value.length());
} else { // if twice the 'safe_len' failed, trim to 'safe_len'
trimmedLength = INDEXED_STR_SAFE_LEN;
}
if (trimmedLength < value.length()) {
LOG.warn("Indexed-String-Attribute: {} length is {} characters, trimming to {}", ctx.getAttribute().getQualifiedName(), value.length(), trimmedLength);
if (value.length() > INDEXED_STR_MAX_ALLOWED_LEN) {
LOG.warn("Indexed-String-Attribute: {} exceeds {} characters, trimming and appending checksum",
ctx.getAttribute().getQualifiedName(), INDEXED_STR_MAX_ALLOWED_LEN);
String sha256Hex = DigestUtils.shaHex(value); // Storing SHA checksum in case verification is needed after retrieval
ret = value.substring(0, (INDEXED_STR_MAX_ALLOWED_LEN - 1) - sha256Hex.length()) + ":" + sha256Hex;
String checksumSuffix = ":" + DigestUtils.shaHex(value); // Storing SHA checksum in case verification is needed after retrieval
ret = value.substring(0, trimmedLength - checksumSuffix.length()) + checksumSuffix;
}
}
}
......
......@@ -42,6 +42,8 @@ public class RequestContext {
private String user;
private Set<String> userGroups;
private String clientIPAddress;
private int maxAttempts = 1;
private int attemptCount = 1;
private RequestContext() {
......@@ -95,6 +97,23 @@ public class RequestContext {
this.clientIPAddress = clientIPAddress;
}
public int getMaxAttempts() {
return maxAttempts;
}
public void setMaxAttempts(int maxAttempts) {
this.maxAttempts = maxAttempts;
}
public int getAttemptCount() {
return attemptCount;
}
public void setAttemptCount(int attemptCount) {
this.attemptCount = attemptCount;
}
public void recordEntityUpdate(AtlasObjectId entity) {
if (entity != null && entity.getGuid() != null) {
updatedEntities.put(entity.getGuid(), entity);
......
......@@ -376,6 +376,9 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
try {
RequestContext requestContext = RequestContext.get();
requestContext.setAttemptCount(numRetries + 1);
requestContext.setMaxAttempts(maxRetries);
requestContext.setUser(messageUser, null);
switch (message.getType()) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment