Commit 3468c2b4 by Ashutosh Mestry Committed by Sarath Subramanian

ATLAS-3874: NotificationHookConsumer: Concurrent Message Processing

Change-Id: Ia5ccc6aeea2b0c44493636243b7627d16ad32c8f Signed-off-by: 's avatarSarath Subramanian <sarath@apache.org>
parent 250c2eaf
......@@ -96,11 +96,17 @@ public final class EntityGraphDiscoveryContext {
}
public AtlasVertex getResolvedEntityVertex(AtlasObjectId objId) {
if (objId instanceof AtlasRelatedObjectId) {
if (resolvedIdsByUniqAttribs.containsKey(objId)) {
return getAtlasVertexFromResolvedIdsByAttribs(objId);
} else if (objId instanceof AtlasRelatedObjectId) {
objId = new AtlasObjectId(objId.getGuid(), objId.getTypeName(), objId.getUniqueAttributes());
}
AtlasVertex vertex = resolvedIdsByUniqAttribs.get(objId);
return getAtlasVertexFromResolvedIdsByAttribs(objId);
}
private AtlasVertex getAtlasVertexFromResolvedIdsByAttribs(AtlasObjectId objId) {
AtlasVertex vertex = resolvedIdsByUniqAttribs.get(objId);
// check also for sub-types; ref={typeName=Asset; guid=abcd} should match {typeName=hive_table; guid=abcd}
if (vertex == null) {
final AtlasEntityType entityType = typeRegistry.getEntityTypeByName(objId.getTypeName());
......
......@@ -117,6 +117,8 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
private static final String TYPE_HIVE_COLUMN_LINEAGE = "hive_column_lineage";
private static final String ATTRIBUTE_INPUTS = "inputs";
private static final String ATTRIBUTE_QUALIFIED_NAME = "qualifiedName";
private static final String EXCEPTION_CLASS_NAME_JANUSGRAPH_EXCEPTION = "JanusGraphException";
private static final String EXCEPTION_CLASS_NAME_PERMANENTLOCKING_EXCEPTION = "PermanentLockingException";
// from org.apache.hadoop.hive.ql.parse.SemanticAnalyzer
public static final String DUMMY_DATABASE = "_dummy_database";
......@@ -624,6 +626,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
}
// Used for intermediate conversions during create and update
String exceptionClassName = StringUtils.EMPTY;
for (int numRetries = 0; numRetries < maxRetries; numRetries++) {
if (LOG.isDebugEnabled()) {
LOG.debug("handleMessage({}): attempt {}", message.getType().name(), numRetries);
......@@ -783,9 +786,15 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
throw new IllegalStateException("Unknown notification type: " + message.getType().name());
}
if (StringUtils.isNotEmpty(exceptionClassName)) {
LOG.warn("{}: Pausing & retry: Try: {}: Pause: {} ms. Handled!",
exceptionClassName, numRetries, adaptiveWaiter.waitDuration);
exceptionClassName = StringUtils.EMPTY;
}
break;
} catch (Throwable e) {
RequestContext.get().resetEntityGuidUpdates();
exceptionClassName = e.getClass().getSimpleName();
if (numRetries == (maxRetries - 1)) {
String strMessage = AbstractNotification.getMessageJson(message);
......@@ -800,6 +809,13 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
recordFailedMessages();
}
return;
} else if (e instanceof org.apache.atlas.repository.graphdb.AtlasSchemaViolationException
|| exceptionClassName.equals(EXCEPTION_CLASS_NAME_JANUSGRAPH_EXCEPTION)
|| exceptionClassName.equals(EXCEPTION_CLASS_NAME_PERMANENTLOCKING_EXCEPTION)) {
LOG.warn("{}: Pausing & retry: Try: {}: Pause: {} ms. {}",
exceptionClassName, numRetries, adaptiveWaiter.waitDuration, e.getMessage());
adaptiveWaiter.pause((Exception) e);
} else {
LOG.warn("Error handling message", e);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment