Commit 18745cf4 by ashutoshm Committed by Madhan Neethiraj

ATLAS-1944: Implemented ShutdownableThread for HookConsumer

parent b0470f50
......@@ -19,6 +19,7 @@ package org.apache.atlas.notification;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import kafka.utils.ShutdownableThread;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasException;
import org.apache.atlas.AtlasServiceException;
......@@ -28,7 +29,11 @@ import org.apache.atlas.ha.HAConfiguration;
import org.apache.atlas.kafka.AtlasKafkaMessage;
import org.apache.atlas.listener.ActiveStateChangeHandler;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.notification.hook.HookNotification.EntityCreateRequest;
import org.apache.atlas.notification.hook.HookNotification.EntityDeleteRequest;
import org.apache.atlas.notification.hook.HookNotification.EntityPartialUpdateRequest;
import org.apache.atlas.notification.hook.HookNotification.EntityUpdateRequest;
import org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage;
import org.apache.atlas.repository.converters.AtlasInstanceConverter;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.repository.store.graph.v1.AtlasEntityStream;
......@@ -41,11 +46,12 @@ import org.apache.atlas.web.filters.AuditFilter;
import org.apache.atlas.web.service.ServiceState;
import org.apache.atlas.web.util.DateTimeHelper;
import org.apache.commons.configuration.Configuration;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import org.apache.kafka.common.TopicPartition;
import javax.inject.Inject;
import java.util.ArrayList;
import java.util.Date;
......@@ -56,14 +62,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.atlas.AtlasClientV2.CREATE_ENTITY;
import static org.apache.atlas.AtlasClientV2.DELETE_ENTITY_BY_ATTRIBUTE;
import static org.apache.atlas.AtlasClientV2.UPDATE_ENTITY;
import static org.apache.atlas.AtlasClientV2.UPDATE_ENTITY_BY_ATTRIBUTE;
import static org.apache.atlas.notification.hook.HookNotification.EntityCreateRequest;
import static org.apache.atlas.notification.hook.HookNotification.EntityDeleteRequest;
import static org.apache.atlas.notification.hook.HookNotification.EntityUpdateRequest;
import static org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage;
import static org.apache.atlas.AtlasClientV2.*;
/**
* Consumer of notifications from hooks e.g., hive hook etc.
......@@ -80,7 +79,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
public static final String CONSUMER_THREADS_PROPERTY = "atlas.notification.hook.numthreads";
public static final String CONSUMER_RETRIES_PROPERTY = "atlas.notification.hook.maxretries";
public static final String CONSUMER_FAILEDCACHESIZE_PROPERTY = "atlas.notification.hook.failedcachesize";
public static final String CONSUMER_RETRY_INTERVAL="atlas.notification.consumer.retry.interval";
public static final String CONSUMER_RETRY_INTERVAL = "atlas.notification.consumer.retry.interval";
public static final int SERVER_READY_WAIT_TIME_MS = 1000;
private final AtlasEntityStore atlasEntityStore;
......@@ -177,7 +176,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
/**
* Start Kafka consumer threads that read from Kafka topic when server is activated.
*
* <p>
* Since the consumers create / update entities to the shared backend store, only the active instance
* should perform this activity. Hence, these threads are started only on server activation.
*/
......@@ -189,7 +188,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
/**
* Stop Kafka consumer threads that read from Kafka topic when server is de-activated.
*
* <p>
* Since the consumers create / update entities to the shared backend store, only the active instance
* should perform this activity. Hence, these threads are stopped only on server deactivation.
*/
......@@ -205,18 +204,18 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
}
}
class HookConsumer implements Runnable {
class HookConsumer extends ShutdownableThread {
private final NotificationConsumer<HookNotificationMessage> consumer;
private final AtomicBoolean shouldRun = new AtomicBoolean(false);
private List<HookNotificationMessage> failedMessages = new ArrayList<>();
public HookConsumer(NotificationConsumer<HookNotificationMessage> consumer) {
super("atlas-hook-consumer-thread", false);
this.consumer = consumer;
}
@Override
public void run() {
public void doWork() {
shouldRun.set(true);
if (!serverAvailable(new NotificationHookConsumer.Timer())) {
......@@ -226,7 +225,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
while (shouldRun.get()) {
try {
List<AtlasKafkaMessage<HookNotificationMessage>> messages = consumer.receive(1000L);
for (AtlasKafkaMessage<HookNotificationMessage> msg : messages){
for (AtlasKafkaMessage<HookNotificationMessage> msg : messages) {
handleMessage(msg);
}
} catch (Throwable t) {
......@@ -267,15 +266,17 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
if (numRetries == 0) { // audit only on the first attempt
audit(messageUser, UPDATE_ENTITY_BY_ATTRIBUTE.getMethod(),
String.format(UPDATE_ENTITY_BY_ATTRIBUTE.getPath(), partialUpdateRequest.getTypeName()));
String.format(UPDATE_ENTITY_BY_ATTRIBUTE.getPath(), partialUpdateRequest.getTypeName()));
}
Referenceable referenceable = partialUpdateRequest.getEntity();
entities = instanceConverter.toAtlasEntity(referenceable);
AtlasEntityType entityType = typeRegistry.getEntityTypeByName(partialUpdateRequest.getTypeName());
String guid = AtlasGraphUtilsV1.getGuidByUniqueAttributes(entityType, new HashMap<String, Object>(){
{ put(partialUpdateRequest.getAttribute(), partialUpdateRequest.getAttributeValue()); }
String guid = AtlasGraphUtilsV1.getGuidByUniqueAttributes(entityType, new HashMap<String, Object>() {
{
put(partialUpdateRequest.getAttribute(), partialUpdateRequest.getAttributeValue());
}
});
// There should only be one root entity
......@@ -289,13 +290,15 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
if (numRetries == 0) { // audit only on the first attempt
audit(messageUser, DELETE_ENTITY_BY_ATTRIBUTE.getMethod(),
String.format(DELETE_ENTITY_BY_ATTRIBUTE.getPath(), deleteRequest.getTypeName()));
String.format(DELETE_ENTITY_BY_ATTRIBUTE.getPath(), deleteRequest.getTypeName()));
}
try {
AtlasEntityType type = (AtlasEntityType) typeRegistry.getType(deleteRequest.getTypeName());
atlasEntityStore.deleteByUniqueAttributes(type,
new HashMap<String, Object>() {{ put(deleteRequest.getAttribute(), deleteRequest.getAttributeValue()); }});
new HashMap<String, Object>() {{
put(deleteRequest.getAttribute(), deleteRequest.getAttributeValue());
}});
} catch (ClassCastException cle) {
LOG.error("Failed to do a partial update on Entity");
}
......@@ -319,10 +322,10 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
break;
} catch (Throwable e) {
LOG.warn("Error handling message", e);
try{
try {
LOG.info("Sleeping for {} ms before retry", consumerRetryInterval);
Thread.sleep(consumerRetryInterval);
}catch (InterruptedException ie){
} catch (InterruptedException ie) {
LOG.error("Notification consumer thread sleep interrupted");
}
......@@ -379,9 +382,12 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
return true;
}
public void stop() {
@Override
public void shutdown() {
super.initiateShutdown();
shouldRun.set(false);
consumer.close();
super.awaitShutdown();
}
}
......@@ -393,4 +399,4 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
AuditFilter.audit(messageUser, THREADNAME_PREFIX, method, LOCALHOST, path, LOCALHOST,
DateTimeHelper.formatDateUTC(new Date()));
}
}
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment