Commit fea30b88 by ashutoshm Committed by Sarath Subramanian

ATLAS-3168: PatchFix: Support for HA Mode

parent 5c9a699a
...@@ -53,6 +53,7 @@ import org.apache.hadoop.hbase.util.Bytes; ...@@ -53,6 +53,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.core.annotation.Order;
import javax.inject.Singleton; import javax.inject.Singleton;
import java.io.Closeable; import java.io.Closeable;
...@@ -84,6 +85,7 @@ import java.util.Set; ...@@ -84,6 +85,7 @@ import java.util.Set;
@Singleton @Singleton
@Component @Component
@ConditionalOnAtlasProperty(property = "atlas.EntityAuditRepository.impl", isDefault = true) @ConditionalOnAtlasProperty(property = "atlas.EntityAuditRepository.impl", isDefault = true)
@Order(0)
public class HBaseBasedAuditRepository extends AbstractStorageBasedAuditRepository { public class HBaseBasedAuditRepository extends AbstractStorageBasedAuditRepository {
private static final Logger LOG = LoggerFactory.getLogger(HBaseBasedAuditRepository.class); private static final Logger LOG = LoggerFactory.getLogger(HBaseBasedAuditRepository.class);
......
...@@ -58,6 +58,7 @@ import org.apache.commons.configuration.Configuration; ...@@ -58,6 +58,7 @@ import org.apache.commons.configuration.Configuration;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.core.annotation.Order;
import javax.inject.Inject; import javax.inject.Inject;
import java.math.BigDecimal; import java.math.BigDecimal;
...@@ -85,6 +86,7 @@ import static org.apache.atlas.type.AtlasTypeUtil.isMapType; ...@@ -85,6 +86,7 @@ import static org.apache.atlas.type.AtlasTypeUtil.isMapType;
* Adds index for properties of a given type when its added before any instances are added. * Adds index for properties of a given type when its added before any instances are added.
*/ */
@Component @Component
@Order(1)
public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChangeHandler, TypeDefChangeListener { public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChangeHandler, TypeDefChangeListener {
private static final Logger LOG = LoggerFactory.getLogger(GraphBackedSearchIndexer.class); private static final Logger LOG = LoggerFactory.getLogger(GraphBackedSearchIndexer.class);
......
...@@ -18,8 +18,13 @@ ...@@ -18,8 +18,13 @@
package org.apache.atlas.repository.patches; package org.apache.atlas.repository.patches;
import javafx.application.Application;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasException; import org.apache.atlas.AtlasException;
import org.apache.atlas.ha.HAConfiguration;
import org.apache.atlas.listener.ActiveStateChangeHandler;
import org.apache.atlas.service.Service; import org.apache.atlas.service.Service;
import org.apache.commons.configuration.Configuration;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.core.annotation.Order; import org.springframework.core.annotation.Order;
...@@ -28,13 +33,12 @@ import org.springframework.stereotype.Component; ...@@ -28,13 +33,12 @@ import org.springframework.stereotype.Component;
import javax.inject.Inject; import javax.inject.Inject;
@Component @Component
@Order(2) @Order(3)
public class AtlasPatchService implements Service { public class AtlasPatchService implements Service, ActiveStateChangeHandler {
private static final Logger LOG = LoggerFactory.getLogger(AtlasPatchService.class); private static final Logger LOG = LoggerFactory.getLogger(AtlasPatchService.class);
private final AtlasPatchManager patchManager; private final AtlasPatchManager patchManager;
@Inject @Inject
public AtlasPatchService(AtlasPatchManager patchManager) { public AtlasPatchService(AtlasPatchManager patchManager) {
this.patchManager = patchManager; this.patchManager = patchManager;
...@@ -44,11 +48,38 @@ public class AtlasPatchService implements Service { ...@@ -44,11 +48,38 @@ public class AtlasPatchService implements Service {
public void start() throws AtlasException { public void start() throws AtlasException {
LOG.info("PatchService: Started."); LOG.info("PatchService: Started.");
patchManager.applyAll(); startInternal(ApplicationProperties.get());
}
void startInternal(Configuration configuration) {
if (!HAConfiguration.isHAEnabled(configuration)) {
instanceIsActive();
}
} }
@Override @Override
public void stop() throws AtlasException { public void stop() {
LOG.info("PatchService: Stopped."); LOG.info("PatchService: Stopped.");
} }
@Override
public void instanceIsActive() {
try {
LOG.info("PatchService: Applying patches...");
patchManager.applyAll();
}
catch (Exception ex) {
LOG.error("PatchService: Applying patches: Failed!", ex);
}
}
@Override
public void instanceIsPassive() {
LOG.info("Reacting to passive: No action for now.");
}
@Override
public int getHandlerOrder() {
return HandlerOrder.ATLAS_PATCH_SERVICE.getOrder();
}
} }
...@@ -55,6 +55,7 @@ import org.apache.commons.lang.ObjectUtils; ...@@ -55,6 +55,7 @@ import org.apache.commons.lang.ObjectUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
...@@ -82,6 +83,7 @@ import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.UNKNOWN; ...@@ -82,6 +83,7 @@ import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.UNKNOWN;
* Class that handles initial loading of models and patches into typedef store * Class that handles initial loading of models and patches into typedef store
*/ */
@Service @Service
@Order(2)
public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler { public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler {
public static final Logger LOG = LoggerFactory.getLogger(AtlasTypeDefStoreInitializer.class); public static final Logger LOG = LoggerFactory.getLogger(AtlasTypeDefStoreInitializer.class);
public static final String PATCHES_FOLDER_NAME = "patches"; public static final String PATCHES_FOLDER_NAME = "patches";
...@@ -93,7 +95,7 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler { ...@@ -93,7 +95,7 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler {
private final AtlasTypeDefStore typeDefStore; private final AtlasTypeDefStore typeDefStore;
private final AtlasTypeRegistry typeRegistry; private final AtlasTypeRegistry typeRegistry;
private final Configuration conf; private final Configuration conf;
private final AtlasPatchRegistry patchRegistry; private final AtlasGraph graph;
@Inject @Inject
public AtlasTypeDefStoreInitializer(AtlasTypeDefStore typeDefStore, AtlasTypeRegistry typeRegistry, public AtlasTypeDefStoreInitializer(AtlasTypeDefStore typeDefStore, AtlasTypeRegistry typeRegistry,
...@@ -101,24 +103,15 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler { ...@@ -101,24 +103,15 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler {
this.typeDefStore = typeDefStore; this.typeDefStore = typeDefStore;
this.typeRegistry = typeRegistry; this.typeRegistry = typeRegistry;
this.conf = conf; this.conf = conf;
this.patchRegistry = new AtlasPatchRegistry(graph); this.graph = graph;
} }
@PostConstruct @PostConstruct
public void init() throws AtlasBaseException { public void init() {
LOG.info("==> AtlasTypeDefStoreInitializer.init()"); LOG.info("==> AtlasTypeDefStoreInitializer.init()");
if (!HAConfiguration.isHAEnabled(conf)) { if (!HAConfiguration.isHAEnabled(conf)) {
typeDefStore.init(); startInternal();
loadBootstrapTypeDefs();
try {
AtlasAuthorizerFactory.getAtlasAuthorizer();
} catch (Throwable t) {
LOG.error("AtlasTypeDefStoreInitializer.init(): Unable to obtain AtlasAuthorizer", t);
}
} else {
LOG.info("AtlasTypeDefStoreInitializer.init(): deferring type loading until instance activation");
} }
LOG.info("<== AtlasTypeDefStoreInitializer.init()"); LOG.info("<== AtlasTypeDefStoreInitializer.init()");
...@@ -348,12 +341,17 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler { ...@@ -348,12 +341,17 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler {
} }
@Override @Override
public void instanceIsActive() throws AtlasException { public void instanceIsActive() {
LOG.info("==> AtlasTypeDefStoreInitializer.instanceIsActive()"); LOG.info("==> AtlasTypeDefStoreInitializer.instanceIsActive()");
startInternal();
LOG.info("<== AtlasTypeDefStoreInitializer.instanceIsActive()");
}
private void startInternal() {
try { try {
typeDefStore.init(); typeDefStore.init();
loadBootstrapTypeDefs(); loadBootstrapTypeDefs();
try { try {
...@@ -364,8 +362,6 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler { ...@@ -364,8 +362,6 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler {
} catch (AtlasBaseException e) { } catch (AtlasBaseException e) {
LOG.error("Failed to init after becoming active", e); LOG.error("Failed to init after becoming active", e);
} }
LOG.info("<== AtlasTypeDefStoreInitializer.instanceIsActive()");
} }
@Override @Override
...@@ -414,6 +410,7 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler { ...@@ -414,6 +410,7 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler {
String typePatchesDirName = typesDirName + File.separator + PATCHES_FOLDER_NAME; String typePatchesDirName = typesDirName + File.separator + PATCHES_FOLDER_NAME;
File typePatchesDir = new File(typePatchesDirName); File typePatchesDir = new File(typePatchesDirName);
File[] typePatchFiles = typePatchesDir.exists() ? typePatchesDir.listFiles() : null; File[] typePatchFiles = typePatchesDir.exists() ? typePatchesDir.listFiles() : null;
AtlasPatchRegistry patchRegistry = new AtlasPatchRegistry(graph);
if (typePatchFiles == null || typePatchFiles.length == 0) { if (typePatchFiles == null || typePatchFiles.length == 0) {
LOG.info("Type patches directory {} does not exist or not readable or has no patches", typePatchesDirName); LOG.info("Type patches directory {} does not exist or not readable or has no patches", typePatchesDirName);
......
...@@ -30,8 +30,9 @@ public interface ActiveStateChangeHandler { ...@@ -30,8 +30,9 @@ public interface ActiveStateChangeHandler {
AUDIT_REPOSITORY(0), AUDIT_REPOSITORY(0),
GRAPH_BACKED_SEARCH_INDEXER(1), GRAPH_BACKED_SEARCH_INDEXER(1),
TYPEDEF_STORE_INITIALIZER(2), TYPEDEF_STORE_INITIALIZER(2),
DEFAULT_METADATA_SERVICE(3), ATLAS_PATCH_SERVICE(3),
NOTIFICATION_HOOK_CONSUMER(4); DEFAULT_METADATA_SERVICE(4),
NOTIFICATION_HOOK_CONSUMER(5);
private final int order; private final int order;
......
...@@ -53,7 +53,9 @@ import java.util.Set; ...@@ -53,7 +53,9 @@ import java.util.Set;
*/ */
@Component @Component
@Order(1) //
// This should be called the last, leaving it without the @Order(Integer.MAX_VALUE) will make it get
// called after all services have their start called.
public class ActiveInstanceElectorService implements Service, LeaderLatchListener { public class ActiveInstanceElectorService implements Service, LeaderLatchListener {
private static final Logger LOG = LoggerFactory.getLogger(ActiveInstanceElectorService.class); private static final Logger LOG = LoggerFactory.getLogger(ActiveInstanceElectorService.class);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment