Commit 3959b318 by Madhan Neethiraj

ATLAS-2220: Active state change listener order made predictable

parent 5add1aeb
...@@ -406,4 +406,9 @@ public class HBaseBasedAuditRepository implements Service, EntityAuditRepository ...@@ -406,4 +406,9 @@ public class HBaseBasedAuditRepository implements Service, EntityAuditRepository
public void instanceIsPassive() { public void instanceIsPassive() {
LOG.info("Reacting to passive: No action for now."); LOG.info("Reacting to passive: No action for now.");
} }
@Override
public int getHandlerOrder() {
return HandlerOrder.HBASE_AUDIT_REPOSITORY.getOrder();
}
} }
...@@ -698,6 +698,11 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang ...@@ -698,6 +698,11 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang
} }
@Override @Override
public int getHandlerOrder() {
return HandlerOrder.GRAPH_BACKED_SEARCH_INDEXER.getOrder();
}
@Override
public void onChange(ChangedTypeDefs changedTypeDefs) throws AtlasBaseException { public void onChange(ChangedTypeDefs changedTypeDefs) throws AtlasBaseException {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Processing changed typedefs {}", changedTypeDefs); LOG.debug("Processing changed typedefs {}", changedTypeDefs);
......
...@@ -341,6 +341,11 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler { ...@@ -341,6 +341,11 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler {
LOG.info("<== AtlasTypeDefStoreInitializer.instanceIsPassive()"); LOG.info("<== AtlasTypeDefStoreInitializer.instanceIsPassive()");
} }
@Override
public int getHandlerOrder() {
return HandlerOrder.TYPEDEF_STORE_INITIALIZER.getOrder();
}
private static boolean updateTypeAttributes(AtlasStructDef oldStructDef, AtlasStructDef newStructDef, boolean checkTypeVersion) { private static boolean updateTypeAttributes(AtlasStructDef oldStructDef, AtlasStructDef newStructDef, boolean checkTypeVersion) {
boolean ret = isTypeUpdateApplicable(oldStructDef, newStructDef, checkTypeVersion); boolean ret = isTypeUpdateApplicable(oldStructDef, newStructDef, checkTypeVersion);
......
...@@ -797,6 +797,11 @@ public class DefaultMetadataService implements MetadataService, ActiveStateChang ...@@ -797,6 +797,11 @@ public class DefaultMetadataService implements MetadataService, ActiveStateChang
} }
@Override @Override
public int getHandlerOrder() {
return HandlerOrder.DEFAULT_METADATA_SERVICE.getOrder();
}
@Override
public void onChange(ChangedTypeDefs changedTypeDefs) throws AtlasBaseException { public void onChange(ChangedTypeDefs changedTypeDefs) throws AtlasBaseException {
// All we need here is a restore of the type-system // All we need here is a restore of the type-system
LOG.info("TypeSystem reset invoked by TypeRegistry changes"); LOG.info("TypeSystem reset invoked by TypeRegistry changes");
......
...@@ -26,6 +26,22 @@ import org.apache.atlas.AtlasException; ...@@ -26,6 +26,22 @@ import org.apache.atlas.AtlasException;
* The two state transitions we handle are (1) becoming active and (2) becoming passive. * The two state transitions we handle are (1) becoming active and (2) becoming passive.
*/ */
public interface ActiveStateChangeHandler { public interface ActiveStateChangeHandler {
public enum HandlerOrder {
HBASE_AUDIT_REPOSITORY(0),
GRAPH_BACKED_SEARCH_INDEXER(1),
TYPEDEF_STORE_INITIALIZER(2),
DEFAULT_METADATA_SERVICE(3),
NOTIFICATION_HOOK_CONSUMER(4);
private final int order;
private HandlerOrder(int order) {
this.order = order;
}
public int getOrder() { return order; }
}
/** /**
* Callback that is invoked on an implementor when this instance of Atlas server is declared the leader. * Callback that is invoked on an implementor when this instance of Atlas server is declared the leader.
...@@ -46,4 +62,13 @@ public interface ActiveStateChangeHandler { ...@@ -46,4 +62,13 @@ public interface ActiveStateChangeHandler {
* @throws {@link AtlasException} if anything is wrong on shutdown * @throws {@link AtlasException} if anything is wrong on shutdown
*/ */
void instanceIsPassive() throws AtlasException; void instanceIsPassive() throws AtlasException;
/**
* Defines the order in which the handler should be called.
* When state becomes active, the handler will be called from low order to high
* When state becomes passive, the handler will be called from high order to low
*
*/
int getHandlerOrder();
} }
...@@ -220,6 +220,11 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl ...@@ -220,6 +220,11 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
stop(); stop();
} }
@Override
public int getHandlerOrder() {
return HandlerOrder.NOTIFICATION_HOOK_CONSUMER.getOrder();
}
static class Timer { static class Timer {
public void sleep(int interval) throws InterruptedException { public void sleep(int interval) throws InterruptedException {
Thread.sleep(interval); Thread.sleep(interval);
......
...@@ -34,7 +34,9 @@ import org.springframework.stereotype.Component; ...@@ -34,7 +34,9 @@ import org.springframework.stereotype.Component;
import javax.inject.Inject; import javax.inject.Inject;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Set; import java.util.Set;
/** /**
...@@ -58,7 +60,7 @@ public class ActiveInstanceElectorService implements Service, LeaderLatchListene ...@@ -58,7 +60,7 @@ public class ActiveInstanceElectorService implements Service, LeaderLatchListene
private final ServiceState serviceState; private final ServiceState serviceState;
private final ActiveInstanceState activeInstanceState; private final ActiveInstanceState activeInstanceState;
private Set<ActiveStateChangeHandler> activeStateChangeHandlerProviders; private Set<ActiveStateChangeHandler> activeStateChangeHandlerProviders;
private Collection<ActiveStateChangeHandler> activeStateChangeHandlers; private List<ActiveStateChangeHandler> activeStateChangeHandlers;
private CuratorFactory curatorFactory; private CuratorFactory curatorFactory;
private LeaderLatch leaderLatch; private LeaderLatch leaderLatch;
private String serverId; private String serverId;
...@@ -158,6 +160,17 @@ public class ActiveInstanceElectorService implements Service, LeaderLatchListene ...@@ -158,6 +160,17 @@ public class ActiveInstanceElectorService implements Service, LeaderLatchListene
private void cacheActiveStateChangeHandlers() { private void cacheActiveStateChangeHandlers() {
if (activeStateChangeHandlers.size()==0) { if (activeStateChangeHandlers.size()==0) {
activeStateChangeHandlers.addAll(activeStateChangeHandlerProviders); activeStateChangeHandlers.addAll(activeStateChangeHandlerProviders);
LOG.info("activeStateChangeHandlers(): before reorder: " + activeStateChangeHandlers);
Collections.sort(activeStateChangeHandlers, new Comparator<ActiveStateChangeHandler>() {
@Override
public int compare(ActiveStateChangeHandler lhs, ActiveStateChangeHandler rhs) {
return Integer.compare(lhs.getHandlerOrder(), rhs.getHandlerOrder());
}
});
LOG.info("activeStateChangeHandlers(): after reorder: " + activeStateChangeHandlers);
} }
} }
...@@ -177,9 +190,9 @@ public class ActiveInstanceElectorService implements Service, LeaderLatchListene ...@@ -177,9 +190,9 @@ public class ActiveInstanceElectorService implements Service, LeaderLatchListene
public void notLeader() { public void notLeader() {
LOG.warn("Server instance with server id {} is removed as leader", serverId); LOG.warn("Server instance with server id {} is removed as leader", serverId);
serviceState.becomingPassive(); serviceState.becomingPassive();
for (ActiveStateChangeHandler handler: activeStateChangeHandlers) { for (int idx = activeStateChangeHandlers.size() - 1; idx >= 0; idx--) {
try { try {
handler.instanceIsPassive(); activeStateChangeHandlers.get(idx).instanceIsPassive();
} catch (AtlasException e) { } catch (AtlasException e) {
LOG.error("Error while reacting to passive state.", e); LOG.error("Error while reacting to passive state.", e);
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment